[Webinar] Kafka Re-Architected for the Cloud with Kora Engine → Register Today!

We ❤️ syslogs: Real-time syslog processing with Apache Kafka and KSQL – Part 3: Enriching events with external data

Get started with Confluent, for free

Watch demo: Kafka streaming in 10 minutes

作成者 :

Using KSQL, the SQL streaming engine for Apache Kafka®, it’s straightforward to build streaming data pipelines that filter, aggregate, and enrich inbound data. The data could be from numerous sources, including applications, log files, databases, and IoT and network devices. In the two previous articles (1 | 2) in this series I’ve shown how to use Apache Kafka and its Connect API to ingest syslog data from multiple sources. Using KSQL, it’s easy to filter it as well as use aggregations to drive simple anomaly detection. The output from KSQL stream processing is a Kafka topic, which can you stream with Kafka Connect to a data store such as S3, or use to drive applications directly such as push-notification alerting.

Streaming syslog and MongoDB data into Kafka and processing it with KSQL

In this article we’re going to conclude our fun with syslog data by looking at how we can enrich inbound streams of syslog data with reference information from elsewhere to produce a real-time enriched data stream. The syslog data in this example comes from various servers and network devices, and the additional information with which we’re going to enrich it is from MongoDB, which happens to be the datastore used by Ubiquiti network devices. With the enriched data we’re going to drive some real-time analytics through Elasticsearch and Kibana, as well as trigger push notifications based on activity of certain devices on the network.

Streaming syslog data from Ubiquiti devices

Streaming syslogdata into Kafka from Ubiquiti

I’m a big fan of Ubiquiti’s network devices, including their Unifi access points. I could wax lyrical here as to why, but I will instead refer you to Troy Hunt’s thorough write-up of Ubiquiti. One of the nice characteristics of them is that there is a ton of configuration that you can do to do them, of the sort that typical consumer-grade networking equipment just doesn’t support.

Configuring the Unifi controller to write syslog data for all devices is easily done through the GUI:

Unifi control panel syslog setting

The syslog server and port should be those where you’re running the Kafka Connect syslog agent (see here for setup instructions). Run the console consumer against the topic to validate that you’re getting data:

$ kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic syslog

{"date":{"long":1522059085000},"facility":{"int":3},"host":{"string":"("U7PG2,xxxxxxxxxx,v3.7.40.6115")"},"level":{"int":6},"message":{"string":"("U7PG2,xxxxxxxxxx,v3.7.40.6115") hostapd: ath3: STA xx:xx:xx:xx:xx:xx IEEE 802.11: associated\n"},"charset":{"string":"UTF-8"},"remote_address":{"string":"/192.168.10.71:45584"},"hostname":{"string":"192.168.10.71"}}

Working with Ubiquiti syslog data in Apache Kafka and KSQL

From the KSQL prompt, we can inspect the topic, using PRINT:

ksql> PRINT 'syslog';
Format:AVRO
3/4/18 9:14:35 AM GMT, /(/192.168.10.87:50442, {"date": 1520154874000, "facility": 3, "host": "asgard02", "level": 5, "message": "asgard02 syslogd[4134]: --- syslogd restarted ---\n", "charset": "UTF-8", "remote_address": "/192.168.10.87:50442", "hostname": "192.168.10.87"}
3/4/18 9:14:35 AM GMT, /(/192.168.10.87:50442, {"date": 1520154874000, "facility": 5, "host": "asgard02", "level": 5, "message": "asgard02 syslogd[4134]: Configuration Notice:\n\tASL Module \"com.apple.cdscheduler\" claims selected messages.\n\tThose messages may not appear in standard system log files or in the ASL database.\n", "charset": "UTF-8", "remote_address": "/192.168.10.87:50442", "hostname": "192.168.10.87"}

You’ll note that KSQL determines automagically that the data is in Avro format, and displays the timestamp, key, and message value. Declaring a KSQL stream on top of these syslog events is simple:

ksql> CREATE STREAM SYSLOG WITH (KAFKA_TOPIC='syslog',VALUE_FORMAT='AVRO');

Looking at the data some more, we can inspect the host names:

ksql> SELECT HOST FROM SYSLOG WHERE HOST !='asgard02' LIMIT 50;
("U7PG2,f09fc2000000,v3.7.40.6115")
rpi-03
proxmox01
asgard02
("BZ2,24a43c000000,v3.7.40.6115")

Whilst most devices on my network use a standard hostname when sending their syslog data, Ubiquiti ones send a hostname that looks like this:

(\"U7PG2,f09fc2000000,v3.7.40.6115\")

Where this appears to be a concatenation of device model, MAC, and firmware version. I can use this ‘fingerprint’ to pick out all syslog traffic relating to these devices in KSQL:

ksql> SELECT MESSAGE FROM SYSLOG WHERE HOST LIKE '(%';
("U7PG2,f09fc2000000,v3.7.40.6115") hostapd: ath0: STA 60:01:94:xx:xx:xx IEEE 802.11: disassociated
("U7PG2,f09fc2000000,v3.7.40.6115") kernel: [7142078.740000]  ieee80211_ioctl_kickmac[14643]
("U7PG2,f09fc2000000,v3.7.40.6115") libubnt[9577]: dpi.dpi_stainfo_notify(): dpi not enable
("U7PG2,f09fc2000000,v3.7.40.6115") libubnt[9577]: wevent.ubnt_custom_event(): EVENT_STA_LEAVE ath0: 60:01:94:xx:xx:xx / 0

(NB: a better pattern would be (%,%,%) but KSQL currently only supports single wildcard LIKE predicates)

Filter syslog data with KSQL

Let’s persist this stream of Ubiquiti syslog data to make it easier to work with in subsequent querying and processing:

ksql> CREATE STREAM UBNT_SYSLOG AS SELECT * FROM SYSLOG WHERE HOST LIKE '(%';

Message ---------------------------- Stream created and running ---------------------------- ksql> DESCRIBE UBNT_SYSLOG;

Field | Type -------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) DATE | BIGINT FACILITY | INTEGER HOST | VARCHAR(STRING) LEVEL | INTEGER MESSAGE | VARCHAR(STRING) CHARSET | VARCHAR(STRING) REMOTE_ADDRESS | VARCHAR(STRING) HOSTNAME | VARCHAR(STRING) -------------------------------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>; ksql>

From this stream, we can examine our data and look for certain conditions and events—using standard SQL predicates. For example, we can see devices connecting to an Access Point:

ksql> SELECT MESSAGE from UBNT_SYSLOG WHERE MESSAGE LIKE '% associated%';
("U7PG2,f09fc2000000,v3.7.40.6115") hostapd: ath3: STA 38:f7:3d:xx:xx:xx IEEE 802.11: associated

So this in itself it pretty neat, as we can filter in realtime the syslog data that’s coming in. But…how about being able to identify the access point and user device names? At the moment we can only see their MAC addresses. Read on to see how!

Accessing Ubiquiti data from MongoDB via KSQL and Debezium

Streaming syslogdata into Kafka from Ubiquiti

To get additional metadata from our network devices, including things like the access point and user device names, we’re going to stream this data from its source into Kafka. Ubiquiti uses MongoDB to hold this data, and there’s a great change-data-capture (CDC) tool built on Kafka Connect called Debezium that supports several source databases—including MongoDB! We can use this to stream the current contents of the database into a Kafka topic, as well as all and every subsequent changes to that data. This means that in effect we maintain a live mirror of our MongoDB database in Kafka itself. We don’t need to do some kind of convoluted lookups from Kafka out to a database each time we want to look up some reference data, since we have now that reference data within Kafka itself.

For the purposes of this blog, I cloned my MongoDB instance from the Ubiquiti controller onto a separate server. For a real-life example then we’d use the live controller itself – since working against a clone means that any new changes to the data (such as new access points being created) won’t be available to us.

You can use a tool such as Robo 3T to explore the data that Ubiqiuti has within it:

Robo 3T showing sample of Ubiquiti’s MongoDB data

Setting up Debezium to stream the data from MongoDB is straightforward – you can follow the steps in this guide. After doing it, you’ll have a bunch of Kafka topics, each reflecting the data in each of the MongoDB collections. The two that we’re interested in for this article are:

  • device – metadata for access points
  • user – metadata for user devices (i.e. stuff that connects to the access points—phones, computers, etc)

Let’s take a look at the data we’ve brought in, using KSQL. Create a stream over the inbound Device data topic (which is metadata about the access points), and use the EXTRACTJSONFIELD function to show specific fields:

ksql> CREATE STREAM DEVICE_RAW (after VARCHAR) WITH (KAFKA_TOPIC='ubnt.ace.device',VALUE_FORMAT='AVRO');
ksql> SELECT EXTRACTJSONFIELD(after,'$.name'),EXTRACTJSONFIELD(after,'$.ip') FROM DEVICE_RAW;
Unifi AP - Study | 192.168.10.68
Unifi AP - Attic | 192.168.10.67
ubnt.moffatt.me | 77.102.5.159
Unifi AP - Pantry | 192.168.10.71

Now let’s declare all the columns of interest in our schema, and using CREATE STREAM AS SELECT (“CSAS”) generate an Avro topic based on the source stream. This resulting topic will hold not only the transformed data that’s currently on the source topic from MongoDB, but also any subsequent changes to that data.

ksql> CREATE STREAM UBNT_DEVICE_SRC AS \
SELECT EXTRACTJSONFIELD(after,'$._id') as id, \
EXTRACTJSONFIELD(after,'$.name') as name, \
EXTRACTJSONFIELD(after,'$.ip') as ip, \
EXTRACTJSONFIELD(after,'$.mac') as mac, \
EXTRACTJSONFIELD(after,'$.model') as model, \
EXTRACTJSONFIELD(after,'$.version') as version \
FROM DEVICE_RAW;

ksql> SELECT id, name, ip, mac, model, version FROM UBNT_DEVICE_SRC; {"$oid":"58385328e4b001431e4e497a"} | Unifi AP - Study | 192.168.10.68 | xx:xx:xx:xx:xx:xx | BZ2 | 3.7.40.6115 {"$oid":"583854cde4b001431e4e4982"} | Unifi AP - Attic | 192.168.10.67 | xx:xx:xx:xx:xx:xx | BZ2 | 3.7.40.6115 {"$oid":"58b3fb48e4b0b79e50242621"} | ubnt.moffatt.me | 77.102.5.159 | xx:xx:xx:xx:xx:xx | UGW3 | 4.3.34.4943823 {"$oid":"58b406f1e4b0e334d74c46e4"} | Unifi AP - Pantry | 192.168.10.71 | xx:xx:xx:xx:xx:xx | U7PG2 | 3.7.40.6115

Because we want to join this data (which is, in effect, reference data) we need to ensure that the topic’s messages are keyed on the column on which we want to join, which is the IP address.

Let’s see what the key currently is, courtesy of the system column ROWKEY:

ksql> SELECT ROWKEY FROM UBNT_DEVICE_SRC;
{"id":"{ \"$oid\" : \"58b3fb48e4b0b79e50242621\"}"}
{"id":"{ \"$oid\" : \"58385328e4b001431e4e497a\"}"}
{"id":"{ \"$oid\" : \"58b406f1e4b0e334d74c46e4\"}"}

The source topic is keyed on the id field, which traces back to the source MongoDB collection. We want to rekey the topic to use ip instead. KSQL is a simple way to do this, using the PARTITION BY clause:

ksql> CREATE STREAM UBNT_DEVICE_SRC_REKEY AS SELECT * FROM UBNT_DEVICE_SRC PARTITION BY IP;

Message ---------------------------- Stream created and running ----------------------------

Let’s verify that the message key is now the same as the ip field in the message value:

ksql> SELECT ROWKEY, ip FROM UBNT_DEVICE_SRC_REKEY;
192.168.10.68 | 192.168.10.68
77.102.5.159 | 77.102.5.159
192.168.10.67 | 192.168.10.67

Perfect! The final part of this is to take the resulting Kafka topic from this KSQL streaming transformation and use it as the basis for our KSQL table which will be joined to the syslog data.

ksql> CREATE TABLE UBNT_DEVICE WITH (KAFKA_TOPIC='UBNT_DEVICE_SRC_REKEY',VALUE_FORMAT='AVRO',KEY='IP');

It’s crucial that our table’s topic messages are keyed on the join column which we will be using, so let’s verify again that it is indeed the case in our new table:

ksql> SELECT ROWKEY, IP, NAME FROM UBNT_DEVICE;
192.168.10.68 | 192.168.10.68 | Unifi AP - Study
192.168.10.71 | 192.168.10.71 | Unifi AP - Pantry
192.168.10.67 | 192.168.10.67 | Unifi AP - Attic
77.102.5.159 | 77.102.5.159 | ubnt.moffatt.me

Q: Why did we create a STREAM of device data, and then a TABLE? When should we use STREAM and when should we use TABLE?

A: Great question! Logically, we are using the device data as a TABLE. That is, we want to join an inbound stream of events to our device data in order to enrich it. We want to know for a given key, what the corresponding values are. For a given device row, what’s its name, it’s model, it’s version etc. So it is most definitely a TABLE.

But, in order to join to a TABLE, that TABLE must be keyed on the join column. And as we saw from inspecting ROWKEY above, this was not the case. So we utilised KSQL’s powerful re-keying functionality to rekey the topic automagically. And to do that, we treat the inbound data as a STREAM. Why? Because it’s simply an inbound Kafka topic of events, partitioned on one column and on which we want to partition another. Each event (in this context, a change to the source devicedata on MongoDB) simply needs re-routing to the output topic with the new partitioning key.

Joining Ubiquiti syslog events with access point reference data

Join syslog data to reference data with KSQL

Having defined our inbound stream of Ubiquiti events (captured through Kafka Connect’s syslog connector), and device reference data sourced from MongoDB (snapshotted and changes both streamed through Kafka Connect from Debezium), we can join the two, which is remarkably straightforward to anyone familiar with SQL. Here we’re showing each syslog message, enriched with the name of the access point from which it originated:

ksql> SELECT D.NAME, L.MESSAGE \
FROM UBNT_SYSLOG L \
LEFT JOIN UBNT_DEVICE D \
ON L.HOSTNAME=D.IP;

Unifi AP - Pantry | ("U7PG2,f09fc2000000,v3.7.40.6115") kernel: [5255656.050000] ieee80211_ioctl_set_ratelimit: node with aid 8 and mac cc:2d:b7:xx:xx:xx has been tagged non rate-limiting Unifi AP - Attic | ("BZ2,dc9fdbxxxxxx,v3.7.40.6115") hostapd: ath1: STA 3c:2e:f9:xx:xx:xx IEEE 802.11: associated Unifi AP - Attic | ("BZ2,dc9fdbxxxxxx,v3.7.40.6115") syslog: dpi.dpi_stainfo_notify(): dpi not enable

In the syslog data from Ubiquiti is a whole wealth of events, many of them more low-level than we may be interested in. One of the things that is useful to track is user devices connecting to access points, and we can easily expose this here using a SQL predicate. We can also expose the timestamp at which this happened, made human-readable with the TIMESTAMPTOSTRING function:

ksql> SELECT TIMESTAMPTOSTRING(L.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS CONNECT_TS, D.NAME, L.MESSAGE \
FROM UBNT_SYSLOG L \
LEFT JOIN UBNT_DEVICE D \
ON L.HOSTNAME=D.IP \
WHERE L.MESSAGE LIKE '% associated%';

2018-03-28 09:37:33 | Unifi AP - Attic | ("BZ2,dc9fdbxxxxxx,v3.7.40.6115") hostapd: ath1: STA 3c:2e:f9:xx:xx:xx IEEE 802.11: associated 2018-03-27 17:15:11 | Unifi AP - Study | ("BZ2,24a43cde91a0,v3.7.40.6115") hostapd: ath1: STA fc:a1:83:xx:xx:xx IEEE 802.11: associated

So this is pretty cool, as we can now see as events occur more information about them—in this case the access point that a device is joining. Let’s persist it as a stream, and add in the derivation of the MAC address of the connecting device, extracted using the SUBSTRING function:

ksql> CREATE STREAM UBNT_SYSLOG_AP_CONNECTS AS \
SELECT D.NAME AS AP_NAME, \
SUBSTRING(L.MESSAGE,53,70) as USER_MAC \
FROM UBNT_SYSLOG L \
LEFT JOIN UBNT_DEVICE D \
ON L.HOSTNAME=D.IP \
WHERE L.MESSAGE LIKE '% associated%';

Message ---------------------------- Stream created and running ---------------------------- ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , AP_NAME, USER_MAC </span> FROM UBNT_SYSLOG_AP_CONNECTS; 2018-03-28 09:37:33 | Unifi AP - Attic | 3c:2e:f9:xx:xx:xx 2018-03-27 17:15:11 | Unifi AP - Study | fc:a1:83:xx:xx:xx

But what about the user device—the third column in the above output? All we have is a MAC address (obfuscated in the examples above). Wouldn’t it be nice to see the name of the device as well as the access point? Well, you know what’s coming now!

Joining Ubiquiti syslog events with access point and device reference data

Enrich syslog data to lookup data with KSQL

In the same way as we saw above how to manipulate the inbound stream of snapshotted device (access point) data and subsequent changes (streamed from MongoDB via Debezium), we’ll do the same here for the device information, which is found in the user collection. The steps are:

  1. Declare a raw STREAM on source user MongoDB collection data, in which all data resides in the after field
    ksql> CREATE STREAM USER_RAW (AFTER VARCHAR) WITH (KAFKA_TOPIC='ubnt.ace.user',VALUE_FORMAT='AVRO');
    

    Message ---------------- Stream created ----------------

  2. Declare a derived STREAM in which columns are extracted from the source after field
    ksql> CREATE STREAM UBNT_USER_SRC AS SELECT EXTRACTJSONFIELD(after,'$.mac') as mac, \
    EXTRACTJSONFIELD(after,'$.oui') as oui, \
    EXTRACTJSONFIELD(after,'$.name') as name, \
    EXTRACTJSONFIELD(after,'$.hostname') as hostname, \
    EXTRACTJSONFIELD(after,'$.is_guest') as is_guest \
    FROM USER_RAW;
    

    Message ---------------------------- Stream created and running ----------------------------

  3. Re-key the STREAM by join column (MAC address)
    ksql> CREATE STREAM UBNT_USER_SRC_REKEY AS SELECT * FROM UBNT_USER_SRC PARTITION BY MAC;
    

    Message ---------------------------- Stream created and running ----------------------------

  4. Declare a TABLE on top of the Kafka topic resulting from the re-key operation
    ksql> CREATE TABLE UBNT_USER WITH (KAFKA_TOPIC='UBNT_USER_SRC_REKEY',VALUE_FORMAT='AVRO',KEY='MAC');
    

    Message --------------- Table created ---------------

  5. Check that the message key (ROWKEY) matches the intended join column (MAC):
    ksql> SELECT ROWKEY, MAC, NAME FROM UBNT_USER;
    b8:27:eb:xx:xx:xx | b8:27:eb:xx:xx:xx | rpi-01.moffatt.me
    ac:bc:32:xx:xx:xx | ac:bc:32:xx:xx:xx | asgard
    1e:7e:60:xx:xx:xx | 1e:7e:60:xx:xx:xx | media02.moffatt.me
    fc:a1:83:xx:xx:xx | fc:a1:83:xx:xx:xx | Fire 01 (Red)
    60:01:94:xx:xx:xx | 60:01:94:xx:xx:xx | Attic lights plug
    00:04:20:xx:xx:xx | 00:04:20:xx:xx:xx | Squeezebox - Kitchen
    b8:ac:6f:xx:xx:xx | b8:ac:6f:xx:xx:xx | crashplan.moffatt.me

Now we can join between Ubiquiti syslog events and reference information for both access points (persisted above in the UBNT_SYSLOG_AP_CONNECTS stream), and user devices (UBNT_USER):

ksql> SELECT TIMESTAMPTOSTRING(L.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS CONNECT_TS, \
L.AP_NAME, U.NAME AS USER_DEVICE_NAME , U.HOSTNAME as HOSTNAME, U.IS_GUEST  \
FROM UBNT_SYSLOG_AP_CONNECTS L \
LEFT JOIN UBNT_USER U \
ON L.USER_MAC = U.MAC ;
2018-06-11 15:05:06 | Unifi AP - Study | null | ItStillnsWhenIP | false
2018-06-11 15:05:06 | Unifi AP - Study | null | ItStillnsWhenIP | false
2018-06-11 15:13:46 | Unifi AP - Study | Fire 02 (Yellow) | amazon-ca6091cfe | false

So every time a user’s device connects to an access point, we get to see the name of the access point, the name of the user device, and the type of the device. We’ll persist this as a stream, because we’ve not quite finished this exploration yet!

ksql> CREATE STREAM UBNT_AP_USER_DEVICE_CONNECTS AS \
SELECT TIMESTAMPTOSTRING(L.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS CONNECT_TS, \
L.AP_NAME, U.NAME AS USER_DEVICE_NAME , U.HOSTNAME as HOSTNAME, U.IS_GUEST  \
FROM UBNT_SYSLOG_AP_CONNECTS L \
LEFT JOIN UBNT_USER U \
ON L.USER_MAC = U.MAC ;

Streaming enriched syslog data from Kafka to Elasticsearch with Kafka Connect

Taking this enriched stream of data we can use a tool such as Elasticsearch with Kibana on top to provide an easy visualisation of the real-time data, as well as aggregate analysis based upon it:

Kibana dashboard showing summary of Ubiquiti data streamed from Apache Kafka and KSQL

Streaming the Kafka topic to Elasticsearch is easy using the Kafka Connect Elasticsearch plugin. Take a look at the Elasticsearch mapping template and Kafka Connect configuration I am using.

Enriched streams of data are valuable for analysis that we want to consume and look at, but even more valuable is event-driven alerting on conditions that we’re interested in. We saw in the previous article how we can use something like a simple Python script to drive push-based notifications in response to events on a Kafka topic. Using this pattern we can use a further KSQL expression to send notifications to a topic when a connection is made to an access point from a certain type of device. For example, if I wanted to track whenever my wifi-enabled plugs (yes, really!) reconnect to an access point, I can filter on the device type of “Espressi”:

ksql> SELECT CONNECT_TS, AP_NAME, USER_DEVICE_NAME \
FROM UBNT_AP_USER_DEVICE_CONNECTS \
WHERE DEVICE_TYPE='Espressi';
2018-03-04 14:07:43 | Unifi AP - Study | Wifi Plug - Sitting Room
2018-03-04 14:07:48 | Unifi AP - Study | Attic lights plug
2018-03-04 14:07:42 | Unifi AP - Study | Wifi Plug - Sitting Room
2018-03-04 14:07:42 | Unifi AP - Study | Study light

Filtering streaming data with KSQL to trigger push notification to Slack

Persisting this to a target stream that the python application is listening to results in a nice push notification every time the device reconnects…

CREATE STREAM KSQL_NOTIFY_DEVICE WITH (VALUE_FORMAT='JSON') AS \
SELECT CONNECT_TS + ' 🛰  📟 ' + USER_DEVICE_NAME + ' connected to ' + AP_NAME AS TEXT, \
       'device_connections' AS CHANNEL \
FROM UBNT_AP_USER_DEVICE_CONNECTS5 \
WHERE DEVICE_TYPE='Espressi';

…on both desktop:

Slack message via KSQL when a device connects to an access point

…and mobile

Recap

We’ve come a long way! Let’s remind ourselves what we’ve built:

Streaming syslog and MongoDB data into Kafka and processing it with KSQL

  1. Streaming syslog events from multiple devices into Kafka
  2. Streaming contents and subsequent changes from MongoDB into Kafka
  3. Filtering syslog events to just include those from Ubiquiti devices
  4. Enriching Ubiquiti syslog events with both (i) access point name and (ii) user device name and device type
  5. Streaming enriched Ubiquiti syslog data to Elasticsearch for analysis in Kibana
  6. Filtering Ubiquiti syslog data to include only wifi connections from devices which are wifi-plugs
  7. Triggering push notification from Kafka topic to Slack

Phew!

Summary

In this mini-series of blog articles we’ve seen the power of Apache Kafka—including Kafka Connect—and KSQL to build applications and analytics on Apache Kafka as a streaming platform. Using KSQL, it’s simple to create scalable real-time stream processing applications, using just SQL—no other coding required! Kafka Connect provides a configuration-file based method for powerful streaming integration between sources of data into Kafka, and from Kafka out to targets such as Elasticsearch.

If you’re interested in learning more, you can:

  • Robin is a principal developer advocate at Confluent, the company founded by the original creators of Apache Kafka, as well as an Oracle Groundbreaker Ambassador. His career has always involved data, from the old worlds of COBOL and DB2, through the worlds of Oracle and Hadoop, and into the current world with Kafka. His particular interests are analytics, systems architecture, performance testing and optimization. He blogs at http://cnfl.io/rmoff and http://rmoff.net/ and can be found tweeting grumpy geek thoughts as @rmoff. Outside of work he enjoys drinking good beer and eating fried breakfasts, although generally not at the same time.

Get started with Confluent, for free

Watch demo: Kafka streaming in 10 minutes

このブログ記事は気に入りましたか?今すぐ共有