
Announcing ksqlDB 0.10.0

We’re excited to announce the release of ksqlDB 0.10.0, available now in the standalone distribution and on Confluent Cloud! This version includes a first-class Java client, improved Apache Kafka® key support, and a slew of new built-in functions. We’ll highlight some of the major changes, but see the changelog for a detailed list of all improvements and bug fixes.

Java client

One of the best things about ksqlDB is that it allows you to use language-neutral SQL to author stream processing programs. But what about when you need to connect your processed data back to your application? In the past, ksqlDB’s answer to this was its REST API. Although this is useful, there was room for something better. What everyone really wants is a first-class client to interact with ksqlDB from their favorite programming language.

In this release, we’re excited to announce the Java client, the first in a series of new clients that we will be rolling out. The client builds on a huge amount of work we’ve done over the past six months to rework how our networked server API behaves. The Java client currently supports pull and push queries, as well as inserting new rows of data into existing ksqlDB streams. In an upcoming release, the client will be enhanced to also support persistent queries and admin operations such as listing streams, tables, and topics.

Let’s see it in action.
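The examples that follow assume a connected Client instance. Here’s a minimal sketch of how one might be created (the host and port are placeholders for your own ksqlDB server):

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;

// Placeholder connection details for a locally running ksqlDB server
ClientOptions options = ClientOptions.create()
    .setHost("localhost")
    .setPort(8088);

Client client = Client.create(options);

// ... issue queries and inserts via `client` ...

// Close the client to free its resources when you're done
client.close();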

Push and pull query support

ksqlDB supports querying your data in two different ways: push and pull. Push queries allow your application to issue a query and subscribe to the results as they change in real time. Pull queries allow your client to fetch the current state of a materialized view at a point in time.

The client supports both consuming query result rows in a streaming fashion via Reactive Streams and polling for result rows synchronously.

To consume result rows in a streaming fashion, use the streamQuery() method and subscribe a Reactive Streams Subscriber to the result.

client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;")
    .thenAccept(streamedQueryResult -> {
      System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());
      
      RowSubscriber subscriber = new RowSubscriber();
      streamedQueryResult.subscribe(subscriber);
    }).exceptionally(e -> {
      System.out.println("Request failed: " + e);
      return null;
    });

In the code snippet above, RowSubscriber is an example implementation of a Reactive Streams Subscriber for consuming query result rows.
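For reference, here’s one minimal sketch of what such a Subscriber might look like (this particular RowSubscriber is illustrative rather than part of the client library):

import io.confluent.ksql.api.client.Row;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class RowSubscriber implements Subscriber<Row> {

  private Subscription subscription;

  @Override
  public synchronized void onSubscribe(Subscription subscription) {
    System.out.println("Subscriber is subscribed.");
    this.subscription = subscription;
    // Request the first row
    subscription.request(1);
  }

  @Override
  public synchronized void onNext(Row row) {
    System.out.println("Received a row: " + row.values());
    // Request the next row once this one has been handled
    subscription.request(1);
  }

  @Override
  public synchronized void onError(Throwable t) {
    System.out.println("Received an error: " + t);
  }

  @Override
  public synchronized void onComplete() {
    System.out.println("Query has ended.");
  }
}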

The streamQuery() method may also be used to synchronously poll for results one at a time:

StreamedQueryResult streamedQueryResult = client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;").get();

for (int i = 0; i < 10; i++) {
  // Block until a new row is available
  Row row = streamedQueryResult.poll();
  if (row != null) {
    System.out.println("Received a row!");
    System.out.println("Row: " + row.values());
  } else {
    System.out.println("Query has ended.");
  }
}

Rather than polling for result rows one at a time, the executeQuery() method can be used to block until all result rows have arrived. This method is useful for pull queries and for push queries with a LIMIT clause.

String pullQuery = "SELECT * FROM MY_MATERIALIZED_TABLE WHERE KEY_FIELD='some_key';";
BatchedQueryResult batchedQueryResult = client.executeQuery(pullQuery);

// Wait for query result
List<Row> resultRows = batchedQueryResult.get();

System.out.println("Received results. Num rows: " + resultRows.size());
for (Row row : resultRows) {
  System.out.println("Row: " + row.values());
}

Inserting new rows

Similar to producing data with a plain Kafka client or using the INSERT INTO ... VALUES statement, the client also supports inserting rows into existing streams. Here’s an example of using the client to insert a new row into an existing stream ORDERS with the schema (ROWKEY VARCHAR KEY, ORDER_ID BIGINT, PRODUCT_ID VARCHAR, USER_ID VARCHAR):

KsqlObject row = new KsqlObject()
    .put("ROWKEY", "k1")
    .put("ORDER_ID", 12345678L)
    .put("PRODUCT_ID", "UAC-222-19234")
    .put("USER_ID", "User_321");

client.insertInto("ORDERS", row).get();

Get started with the Java client today!

Enhanced key support

ksqlDB tightly integrates with Kafka to make it easy to work with your existing events. But in the past, ksqlDB had strict limitations on the key namespace of Kafka’s records. This wasn’t ideal because people don’t simply use keys for partitioning; they also store important data in them.

Our goal is to make it simple for you to process all of your Kafka data with ksqlDB. To enable that, we’re pleased to announce a sweeping set of new features. Note that some of the changes are not backward compatible with previous versions, so you may need to update some of your statements.

Any key name

ksqlDB 0.8.0 saw the introduction of non-string key support. This release builds on that progress by removing the vestigial restriction that all key columns must be named ROWKEY: key columns can now have any name, just like any other column. This is important because it allows you to work with all of your Kafka data in one logical, column-oriented model.

In addition to being able to explicitly provide your own name for key columns when declaring a stream or table, the name of the key column in derived streams and tables is now itself derived from the query. For example, a query with a GROUP BY on column_1 will have a key column matching the type and name of the grouping column. More details can be found in KLIP-24.
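For example (the stream, table, and column names here are illustrative), a table created by grouping on USER_ID now has a key column named USER_ID rather than ROWKEY:

CREATE TABLE ORDERS_BY_USER AS
  SELECT USER_ID, COUNT(*) AS ORDER_COUNT
  FROM ORDERS
  GROUP BY USER_ID;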

Removal of WITH KEY syntax

Previous versions allowed you to provide the name of a value column that was a copy of the ROWKEY key column. This information acted as an optimization hint and allowed queries to use more friendly column names. The introduction of the “any key name” feature renders this syntax redundant. Version 0.10.0 drops support for this syntax.
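As a rough before-and-after sketch (topic and column names are made up), a stream that previously relied on the KEY property in its WITH clause now names its key column directly:

-- Before 0.10.0: implicit ROWKEY key column, with a value column flagged as its copy
CREATE STREAM USERS (USER_ID VARCHAR, NAME VARCHAR)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY='USER_ID');

-- 0.10.0: name the key column directly in the schema; the KEY property is no longer supported
CREATE STREAM USERS (USER_ID VARCHAR KEY, NAME VARCHAR)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');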

Primary keys and keyless streams

Previously, ksqlDB would add an implicit key column called ROWKEY to any stream or table that did not explicitly declare one, leaving users to wonder where the column came from. ksqlDB 0.10.0 now requires all tables to declare a primary key column, and streams that do not declare a key column are simply created without one. More details can be found in KLIP-29.
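As a sketch (names are again illustrative), a table now declares its key with PRIMARY KEY, while a stream that declares no key column is keyless:

-- Tables must declare a primary key column
CREATE TABLE USERS (USER_ID VARCHAR PRIMARY KEY, NAME VARCHAR)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');

-- Streams may omit the key column entirely
CREATE STREAM PAGEVIEWS (VIEWTIME BIGINT, PAGE_ID VARCHAR)
  WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='JSON');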

New built-in functions

We’ve introduced a plethora of new built-in functions to make it easier to write your stream processing applications. Here is a quick tour of the new additions:

Special shoutout to mateuszmrozewski for contributing the INSTR UDF and hpgrahsl for contributing the ARRAY_JOIN UDF.
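As a rough illustration of those two functions (the stream and columns here are hypothetical), INSTR returns the position of a substring within a string, and ARRAY_JOIN flattens an array into a single delimited string:

SELECT INSTR(MESSAGE, 'error') AS ERROR_POS,
       ARRAY_JOIN(TAGS, ', ') AS TAG_LIST
FROM LOGS
EMIT CHANGES;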

Get started today

For the full list of fixes and improvements, see the changelog.

If you haven’t already, join our #ksqldb Confluent Community Slack channel, follow us on Twitter, and get started with ksqlDB today!

Steven Zhang is a software engineer on the ksqlDB Team at Confluent. He graduated with a bachelor’s degree from Johns Hopkins University in spring of 2019 and joined Confluent soon after.
