
KSQL UDFs and UDAFs Made Easy

Mitch Seymour

One of KSQL’s most powerful features is allowing users to build their own KSQL functions for processing real-time streams of data. These functions can be invoked on individual messages (user-defined functions or UDFs) or used to perform aggregations on groups of messages (user-defined aggregate functions or UDAFs).

The previous blog post How to Build a UDF and/or UDAF in KSQL 5.0 discussed some key steps for building and deploying a custom KSQL UDF/UDAF. Now with Confluent Platform 5.3.0, creating custom KSQL functions is even easier when you leverage Maven, a tool for building and managing dependencies in Java projects.

Confluent Platform 5.3.0 adds a new Maven archetype called the KSQL UDF / UDAF Quickstart that will allow you to quickly bootstrap your own UDF/UDAF without having to copy and paste example code, add the boilerplate for building an uber JAR, or perform other tedious tasks that would otherwise be required for setting up a new project. Maven archetypes are used to create project templates, so we found them to be a great vehicle for getting developers up and running quickly with custom KSQL functions.

In addition to discussing how the KSQL UDF / UDAF Quickstart can be used, we will also demonstrate how to convert the generated Maven project to a Gradle project with a single command. Gradle is another build automation tool that many developers prefer over Maven. Those developers can bootstrap new UDF projects using the Maven archetype and then convert to the build system of their choice for further development.

So, without further ado, let’s get started.

Using the Maven archetype

In order to use the KSQL UDF / UDAF Quickstart for bootstrapping a custom KSQL function, we need to have Maven installed. You can check to see if Maven is installed by running the following command:

$ mvn --version

If Maven is not installed, follow the official installation instructions. Next, add the Maven repositories from Confluent to your ~/.m2/settings.xml file:

<settings>
   <profiles>
      <profile>
         <id>myprofile</id>
         <repositories>
            <!-- Confluent releases -->  
            <repository>
               <id>confluent</id>
               <url>https://packages.confluent.io/maven/</url>
            </repository>

            <!-- further repository entries here -->
         </repositories>   
      </profile>
    </profiles> 

   <activeProfiles>
      <activeProfile>myprofile</activeProfile>
   </activeProfiles>
</settings>
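
To confirm that the profile is active, you can optionally print Maven's effective settings (the help:effective-settings goal of the standard Maven Help Plugin) and check that the Confluent repository appears in the output:

$ mvn help:effective-settings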

More information about Maven repositories is available in the official Maven documentation. Once Maven is installed and the repositories have been added, generating a new UDF/UDAF project is simple. First, run the following command:

$ mvn archetype:generate -X \
    -DarchetypeGroupId=io.confluent.ksql \
    -DarchetypeArtifactId=ksql-udf-quickstart \
    -DarchetypeVersion=5.3.0

You will be asked to provide some information about your project. An example configuration is shown below (feel free to update the following with values that are appropriate for your own project):

Define value for property 'groupId':  com.example.ksql.functions 
Define value for property 'artifactId':  my-udf
Define value for property 'version':  0.1.0
Define value for property 'package':  com.example.ksql.functions
Define value for property 'author':  Mitch Seymour

Once you’ve confirmed the configuration (e.g., by simply hitting <ENTER> when prompted to do so), the above command will create a new project with the following directory structure. (Note: The actual directory structure may vary depending on the groupId and artifactId parameters that you specified earlier).

my-udf/
├── dependency-reduced-pom.xml
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── ksql
    │   │               └── functions
    │   │                   ├── ReverseUdf.java
    │   │                   └── SummaryStatsUdaf.java
    │   └── resources
    └── test
        └── java
            └── com
                └── example
                    └── ksql
                        └── functions
                            ├── ReverseUdfTests.java
                            └── SummaryStatsUdafTests.java

In the next section, we will explore the example KSQL functions generated by the archetype and learn how to deploy these functions to our KSQL server.

Example KSQL UDF and UDAF

The archetype includes one example UDF (REVERSE) and one example UDAF (SUMMARY_STATS), which are defined in the following files, respectively: ReverseUdf.java and SummaryStatsUdaf.java. Let’s start by taking a look at ReverseUdf.java.

ReverseUdf.java

package com.example.ksql.functions;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "reverse",
    description = "Example UDF that reverses an object",
    version = "0.1.0",
    author = ""
)
public class ReverseUdf {
  @Udf(description = "Reverse a string")
  public String reverseString(
      @UdfParameter(value = "source", description = "the value to reverse")
      final String source
  ) {
    return new StringBuilder(source).reverse().toString();
  }

  @Udf(description = "Reverse an integer")
  public String reverseInt(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Integer source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  @Udf(description = "Reverse a long")
  public String reverseLong(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Long source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  @Udf(description = "Reverse a double")
  public String reverseDouble(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Double source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }
}

This example UDF can be used for reversing strings and numerics, and it is already fully functional and ready to deploy. One key item this particular UDF showcases is the ability for a KSQL function to support multiple method signatures. Our REVERSE function (defined above) can reverse a String, Long, Integer, or Double since we provided methods for each of these operations. This example UDF is somewhat trivial, but the point of this archetype is to allow you to easily replace the code here with your own code, and then simply follow the build and deployment steps described later in this article to start using your own UDF.
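
Replacing the example logic with your own function keeps the same overall shape: annotate the class with @UdfDescription and each method variation with @Udf. The sketch below is hypothetical (the MASK_LEFT function and all of its names are invented for illustration; they are not part of the archetype):

MaskLeftUdf.java (hypothetical)

package com.example.ksql.functions;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "mask_left",
    description = "Hypothetical UDF that masks all but the last n characters of a string",
    version = "0.1.0",
    author = ""
)
public class MaskLeftUdf {

  @Udf(description = "Mask all but the last n characters of a string")
  public String maskLeft(
      @UdfParameter(value = "source", description = "the value to mask")
      final String source,
      @UdfParameter(value = "n", description = "how many trailing characters to leave unmasked")
      final Integer n
  ) {
    if (source == null || n == null) {
      return null;
    }
    // keep at most n trailing characters and mask everything before them
    final int keep = Math.max(0, Math.min(n, source.length()));
    final StringBuilder masked = new StringBuilder();
    for (int i = 0; i < source.length() - keep; i++) {
      masked.append('*');
    }
    return masked.append(source.substring(source.length() - keep)).toString();
  }
}

Once built and deployed (see the steps below), such a function could be invoked like any other, e.g., MASK_LEFT(username, 4).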

As mentioned earlier, the archetype also includes an example UDAF. Unlike UDFs, which operate on a single row at a time, UDAFs can be used for computing aggregates against multiple rows of data. Let’s take a look at the example UDAF (called SUMMARY_STATS) and see how it works.

SummaryStatsUdaf.java

package com.example.ksql.functions;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.HashMap;
import java.util.Map;

@UdafDescription(
    name = "summary_stats",
    description = "Example UDAF that computes some summary stats for a stream of doubles",
    version = "0.1.0",
    author = ""
)
public final class SummaryStatsUdaf {

  private SummaryStatsUdaf() {
  }

  @UdafFactory(description = "compute summary stats for doubles")
  // Can be used with stream aggregations. The input of our aggregation will be doubles,
  // and the output will be a map
  public static Udaf<Double, Map<String, Double>> createUdaf() {

    return new Udaf<Double, Map<String, Double>>() {

      /**
       * Specify an initial value for our aggregation
       *
       * @return the initial state of the aggregate.
       */
      @Override
      public Map<String, Double> initialize() {
        final Map<String, Double> stats = new HashMap<>();
        stats.put("mean", 0.0);
        stats.put("sample_size", 0.0);
        stats.put("sum", 0.0);
        return stats;
      }

      /**
       * Perform the aggregation whenever a new record appears in our stream.
       *
       * @param newValue the new value to add to the {@code aggregateValue}.
       * @param aggregateValue the current aggregate.
       * @return the new aggregate value.
       */
      @Override
      public Map<String, Double> aggregate(
          final Double newValue,
          final Map<String, Double> aggregateValue
      ) {
        final Double sampleSize = 1.0 + aggregateValue
            .getOrDefault("sample_size", 0.0);

        final Double sum = newValue + aggregateValue
            .getOrDefault("sum", 0.0);
  
        // calculate the new aggregate
        aggregateValue.put("mean", sum / sampleSize);
        aggregateValue.put("sample_size", sampleSize);
        aggregateValue.put("sum", sum);
        return aggregateValue;
      }

      /**
       * Called to merge two aggregates together.
       *
       * @param aggOne the first aggregate
       * @param aggTwo the second aggregate
       * @return the merged result
       */
      @Override
      public Map<String, Double> merge(
          final Map<String, Double> aggOne,
          final Map<String, Double> aggTwo
      ) {
        final Double sampleSize =
            aggOne.getOrDefault("sample_size", 0.0) + aggTwo.getOrDefault("sample_size", 0.0);
        final Double sum =
            aggOne.getOrDefault("sum", 0.0) + aggTwo.getOrDefault("sum", 0.0);

        // calculate the new aggregate
        final Map<String, Double> newAggregate = new HashMap<>();
        newAggregate.put("mean", sum / sampleSize);
        newAggregate.put("sample_size", sampleSize);
        newAggregate.put("sum", sum);
        return newAggregate;
      }
    };
  }
}

This UDAF may seem complicated at first, but it’s really just performing some basic math and storing the results in a Map object. Returning a Map is one way to return multiple values from a KSQL function. When adapting the example above for your own UDAF, take note of the following methods (a stripped-down sketch of the same structure follows this list):

  • initialize: used to specify the initial value of your aggregation
  • aggregate: performs the actual aggregation by combining the current row’s value (i.e., the newValue argument) with the current aggregate value (i.e., the aggregateValue argument) to produce a new aggregate
  • merge: describes how to merge two aggregations into one (e.g., when using session windows)
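
To make that structure easier to see in isolation, here is a stripped-down, hypothetical UDAF that implements the same three methods with as little math as possible (the RUNNING_SUM function is invented for illustration and is not part of the archetype):

package com.example.ksql.functions;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;

@UdafDescription(
    name = "running_sum",
    description = "Hypothetical UDAF that sums a stream of doubles",
    version = "0.1.0",
    author = ""
)
public final class RunningSumUdaf {

  private RunningSumUdaf() {
  }

  @UdafFactory(description = "compute a running sum of doubles")
  public static Udaf<Double, Double> createUdaf() {
    return new Udaf<Double, Double>() {

      // the sum of zero rows
      @Override
      public Double initialize() {
        return 0.0;
      }

      // fold each new row into the running sum
      @Override
      public Double aggregate(final Double newValue, final Double aggregateValue) {
        return aggregateValue + newValue;
      }

      // two partial aggregates (e.g., from different session windows) combine by addition
      @Override
      public Double merge(final Double aggOne, final Double aggTwo) {
        return aggOne + aggTwo;
      }
    };
  }
}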

Building and deploying KSQL functions

Once you’ve replaced the example UDF/UDAF logic with your own (or, if you’d like, just use the example UDF/UDAF for the rest of this tutorial), then it’s time to deploy your KSQL functions to a KSQL server. To begin, build the project by running the following command in the project root directory:

$ mvn clean package

Note: The archetype includes some default unit tests, so if you have already changed the example code at this point, add the -DskipTests flag to the command above (we’ll cover tests in the next section, so we can skip them for now).

The above command will drop a JAR in the target/ directory. For example, if your artifactId is my-udf, then the command will have created a file named target/my-udf-0.1.0.jar.
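
If you want to sanity-check the artifact before deploying it, you can list the contents of the uber JAR (assuming the JDK’s jar tool is on your PATH) and verify that your function classes are included:

$ jar tf target/my-udf-0.1.0.jar | grep functions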

Now, simply copy this JAR file to the KSQL extension directory (see the ksql.extension.dir property in the ksql-server.properties file) and restart your KSQL server so that it can pick up the new JAR containing your custom KSQL function.
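
For example, if ksql.extension.dir points at /tmp/ext (the directory shown in the DESCRIBE FUNCTION output later in this post), the copy step is just:

$ cp target/my-udf-0.1.0.jar /tmp/ext/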

# stop the KSQL server cleanly using the following command
$ <path-to-confluent>/bin/ksql-server-stop

# restart the KSQL server so that we can use our newly deployed KSQL functions
$ <path-to-confluent>/bin/ksql-server-start config/ksql-server.properties

Restarting is not only required for KSQL to recognize new functions, but also to recognize any updates you have made to existing functions. Once KSQL has finished restarting and has connected to a running Apache Kafka® cluster, you can verify that the new functions exist by running the DESCRIBE FUNCTION command from the CLI:

ksql> DESCRIBE FUNCTION REVERSE ;

Name        : REVERSE
Author      : 
Version     : 0.1.0
Overview    : Example UDF that reverses an object
Type        : scalar
Jar         : /tmp/ext/my-udf-0.1.0.jar
Variations  :

	Variation   : REVERSE(source INT)
	Returns     : VARCHAR
	Description : Reverse an integer
	source      : the value to reverse

	Variation   : REVERSE(source VARCHAR)
	Returns     : VARCHAR
	Description : Reverse a string
	source      : the value to reverse

	Variation   : REVERSE(source DOUBLE)
	Returns     : VARCHAR
	Description : Reverse a double
	source      : the value to reverse

	Variation   : REVERSE(source BIGINT)
	Returns     : VARCHAR
	Description : Reverse a long
	source      : the value to reverse


ksql> DESCRIBE FUNCTION SUMMARY_STATS ;

Name        : SUMMARY_STATS
Author      : mitch
Version     : 0.1.0
Overview    : Example UDAF that computes some summary stats for a stream of doubles
Type        : aggregate
Jar         : /tmp/ext/my-udf-0.1.0.jar
Variations  :

	Variation   : SUMMARY_STATS(DOUBLE)
	Returns     : MAP<VARCHAR,DOUBLE>
	Description : compute summary stats for doubles

Finally, let’s invoke our new UDF/UDAF. For this example, we’ll assume there’s a topic named api_logs in our Kafka cluster. You can create this dummy topic by using the kafka-topics console script:

# assumes the `kafka-topics` script is on your $PATH
$ kafka-topics --create \
    --zookeeper localhost:2181 \
    --topic api_logs \
    --replication-factor 1 \
    --partitions 4

Created topic "api_logs".

With the api_logs topic created, we can now create a KSQL STREAM using the following command:

ksql> CREATE STREAM api_logs (username VARCHAR, endpoint VARCHAR, response_code INT, response_time DOUBLE) \
WITH (kafka_topic='api_logs', value_format='JSON');

 Message
----------------
 Stream created
----------------

At this point, invoking our UDF/UDAF is simply a matter of adding it to our KSQL query:

ksql> SELECT username, REVERSE(username), endpoint, SUMMARY_STATS(response_time) \
FROM api_logs \
GROUP BY username, REVERSE(username), endpoint ;

The above command will execute a continuous query in the KSQL CLI. In another tab, we can produce some dummy records to the api_logs topic using the kafkacat utility:

$ echo '{"username": "mseymour", "endpoint": "index.html", "response_code": 200, "response_time": 400}' | kafkacat -q -b localhost:9092 -t api_logs -P
$ echo '{"username": "mseymour", "endpoint": "index.html", "response_code": 200, "response_time": 900}' | kafkacat -q -b localhost:9092 -t api_logs -P

Back inside the KSQL CLI, you should see the following output:

mseymour | ruomyesm | index.html | {sample_size=1.0, mean=400.0, sum=400.0}
mseymour | ruomyesm | index.html | {sample_size=2.0, mean=650.0, sum=1300.0}

Automated testing of KSQL functions

In addition to including an example UDF and UDAF implementation, the KSQL UDF / UDAF Quickstart includes unit tests that demonstrate how to test your custom KSQL functions. These tests live in the src/test/java/ directory and rely on the JUnit 5 testing platform, which is automatically included when you create a project from the quick start. Whenever you replace the example KSQL functions with your own code, you will need to update the included unit tests as well.

Before we learn how to execute the tests, let’s first see what they look like. The first unit test we’ll review ensures the REVERSE UDF returns the expected results.

ReverseUdfTests.java

package com.example.ksql.functions;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/**
 * Example class that demonstrates how to unit test UDFs.
 */
public class ReverseUdfTests {

  @ParameterizedTest(name = "reverse({0})= {1}")
  @CsvSource({
    "hello, olleh",
    "world, dlrow",
  })
  void reverseString(final String source, final String expectedResult) {
    final ReverseUdf reverse = new ReverseUdf();
    final String actualResult = reverse.reverseString(source);
    assertEquals(expectedResult, actualResult, source + " reversed should equal " + expectedResult);
  }
}

As you can see in the code above, our testing methodology is relatively straightforward. First, we use a parameter provider called @CsvSource (included in the JUnit 5 testing library) to specify multiple test cases with their corresponding parameters and expected result values. The first value in each CSV string (hello and world) represents the parameter that we want to pass to our UDF (ReverseUdf). The second value in each CSV string represents the expected result of the test (since ReverseUdf is responsible for reversing objects, the expected result in this test case is a reversed string).

Now that we’ve defined our parameters, we simply instantiate a ReverseUdf instance, invoke the appropriate method for reversing a string (reverseString) with our test parameters, and check the result with assertEquals. This method of instantiating a KSQL function and invoking its methods directly in a test is a good way to prevent accidental regressions as you iterate on your code in the future.

A keen eye may have noticed that our ReverseUdf is capable of reversing many types of objects, yet the included unit tests only cover the reversal of strings. We will leave the additional test implementations as an exercise for the reader.
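
As a starting point for that exercise, a test for the integer variation could follow the exact same pattern. The sketch below is hypothetical and not part of the generated project:

  @ParameterizedTest(name = "reverse({0})= {1}")
  @CsvSource({
    "1234, 4321",
    "100, 001",
  })
  void reverseInt(final Integer source, final String expectedResult) {
    final ReverseUdf reverse = new ReverseUdf();
    final String actualResult = reverse.reverseInt(source);
    assertEquals(expectedResult, actualResult, source + " reversed should equal " + expectedResult);
  }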

Let’s move on to the unit tests for SummaryStatsUdaf.

SummaryStatsUdafTests.java

package com.example.ksql.functions;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import io.confluent.ksql.function.udaf.Udaf;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

/**
 * Example class that demonstrates how to unit test UDAFs.
 */
public class SummaryStatsUdafTests {

  @Test
  void mergeAggregates() {
    final Udaf<Double, Map<String, Double>> udaf = SummaryStatsUdaf.createUdaf();
    final Map<String, Double> mergedAggregate = udaf.merge(
      // (sample_size, sum, mean)
      aggregate(3.0, 3300.0, 1100.0),
      aggregate(7.0, 6700.0, 957.143)
    );

    final Map<String, Double> expectedResult = aggregate(10.0, 10000.0, 1000.0);
    assertEquals(expectedResult, mergedAggregate);
  }

  @ParameterizedTest
  @MethodSource("aggSources")
  void calculateSummaryStats(
      final Double newValue,
      final Map<String, Double> currentAggregate,
      final Map<String, Double> expectedResult
    ) {
    final Udaf<Double, Map<String, Double>> udaf = SummaryStatsUdaf.createUdaf();
    assertEquals(expectedResult, udaf.aggregate(newValue, currentAggregate));
  }

  // The rest of this file is omitted for brevity. Note that @MethodSource
  // expects a static method that supplies the test arguments, along these lines:
  static Stream<Arguments> aggSources() {
    // each entry supplies (newValue, currentAggregate, expectedResult)
    return Stream.of(
        arguments(100.0, aggregate(1.0, 100.0, 100.0), aggregate(2.0, 200.0, 100.0))
    );
  }

  // helper for building an aggregate map from (sample_size, sum, mean),
  // matching the shape produced by SummaryStatsUdaf
  private static Map<String, Double> aggregate(
      final Double sampleSize,
      final Double sum,
      final Double mean
  ) {
    final Map<String, Double> stats = new HashMap<>();
    stats.put("sample_size", sampleSize);
    stats.put("sum", sum);
    stats.put("mean", mean);
    return stats;
  }

}

The testing methodology for UDAFs is similar to that for UDFs: we instantiate our UDAF, call the appropriate methods (in this case, aggregate and merge) with a set of predefined parameters, and check the output against the expected results. One minor difference between the ReverseUdf test we saw earlier and the SummaryStatsUdaf test above is that the latter uses a different mechanism for generating test parameters: instead of @CsvSource, it uses the @MethodSource provider. This is a minor implementation detail, and I encourage you to look at the example code yourself to see exactly how it works. The important takeaway is that testing both UDFs and UDAFs is simple using the methods discussed above.

Running the tests

Finally, we’re ready to execute the unit tests. Simply run the following command to execute the test cases:

$ mvn test

If all goes well, you should see the following output:

[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
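
During iterative development, you may prefer to run a single test class instead of the full suite. The Maven Surefire Plugin’s standard test filter supports this:

$ mvn test -Dtest=ReverseUdfTests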

Converting from Maven to a Gradle project

Bootstrapping your custom KSQL functions from Confluent’s Maven archetype doesn’t mean you also have to use Maven as your build tool. In fact, Gradle is often preferred, and converting your Maven project to a Gradle project is easy. Simply run the following command in the root project directory to generate a build.gradle file for your project:

$ gradle init --type pom

Now, feel free to delete the pom.xml and make all future build modifications in build.gradle instead. Be aware that gradle init translates dependencies and basic project metadata, but plugin configuration (such as the uber-JAR build that the archetype sets up with the Maven Shade Plugin) may not carry over, so review the generated build before relying on it.

What’s next?

Now that you know how to quickly bootstrap your next KSQL UDF/UDAF project, you can start building your own custom KSQL functions with minimal effort. A couple of next steps you may want to pursue include adding unit tests for your new code and, if your function might be useful to others, sharing it with the community.

For an in-depth look at custom KSQL functions, including UDFs that leverage embedded machine learning models, remote APIs, and more, you can check out my presentation from Kafka Summit London: The Exciting Frontier of Custom KSQL Functions.

Hojjat Jafarpour’s session from Kafka Summit San Francisco, UDF/UDAF: The Extensibility Framework for KSQL, may be of interest as well.


Mitch Seymour is a senior data systems engineer at Mailchimp. Using Kafka Streams and KSQL, his team builds stream processing applications to support data science and business intelligence initiatives across the company. Outside of work, he contributes to open source software, plays retro video games, and runs a non-profit called Puplift to help animal welfare organizations with their technological needs.
