If you adopt a streaming platform such as Apache Kafka, one of the most important questions to answer is: what topics are you going to use? In particular, if you have a bunch of different events that you want to publish to Kafka as messages, do you put them in the same topic, or do you split them across different topics?
The most important function of a topic is to allow a consumer to specify which subset of messages it wants to consume. At the one extreme, putting absolutely all your data in a single topic is probably a bad idea, since it would mean consumers have no way of selecting the events of interest—they would just get everything. At the other extreme, having millions of different topics is also a bad idea, since each topic in Kafka has a cost, and thus having a large number of topics will harm performance.
Actually, from a performance point of view, it’s the number of partitions that matters. But since each topic in Kafka has at least one partition, if you have n topics, you inevitably have at least n partitions. A while ago, Jun Rao wrote a blog post explaining the cost of having many partitions (end-to-end latency, file descriptors, memory overhead, recovery time after a failure). As a rule of thumb, if you care about latency, you should probably aim for (order of magnitude) hundreds of topic-partitions per broker node. If you have tens of thousands, or even thousands of partitions per node, your latency will suffer.
That performance argument provides some guidance for designing your topic structure: if you’re finding yourself with many thousands of topics, it would be advisable to merge some of the fine-grained, low-throughput topics into coarser-grained topics, and thus reduce the proliferation of partitions.
However, performance is not the end of the story. Even more important, in my opinion, are the data integrity and data modelling aspects of your topic structure. We will discuss those in the rest of this article.
The common wisdom (according to several conversations I’ve had, and according to a mailing list thread) seems to be: put all events of the same type in the same topic, and use different topics for different event types. That line of thinking is reminiscent of relational databases, where a table is a collection of records with the same type (i.e. the same set of columns), so we have an analogy between a relational table and a Kafka topic.
The Confluent Schema Registry has traditionally reinforced this pattern, because it encourages you to use the same Avro schema for all messages in a topic. That schema can be evolved while maintaining compatibility (e.g. by adding optional fields), but ultimately all messages have been expected to conform to a certain record type. We’ll revisit this later in the post, after we’ve covered some more background.
For some types of streaming data, such as logged activity events, it makes sense to require that all messages in the same topic conform to the same schema. However, some people are using Kafka for more database-like purposes, such as event sourcing, or exchanging data between microservices. In this context, I believe it’s less important to define a topic as a grouping of messages with the same schema. Much more important is the fact that Kafka maintains ordering of messages within a topic-partition.
Imagine a scenario in which you have some entity (say, a customer), and many different things can happen to that entity: a customer is created, a customer changes their address, a customer adds a new credit card to their account, a customer makes a customer support enquiry, a customer pays an invoice, a customer closes their account.
The order of those events matters. For example, we might expect that a customer is created before anything else can happen to a customer, and we might expect that after a customer closes their account nothing more will happen to them. When using Kafka, you can preserve the order of those events by putting them all in the same partition. In this example, you would use the customer ID as the partitioning key, and then put all these different events in the same topic. They must be in the same topic because different topics mean different partitions, and ordering is not preserved across partitions.
If you did use different topics for (say) the
customerInvoicePaid events, then a consumer of those topics may see the events in a nonsensical order. For example, the consumer may see an address change for a customer that does not exist (because it has not yet been created, since the corresponding
customerCreated event has been delayed).
The risk of reordering is particularly high if a consumer is shut down for a while, perhaps for maintenance or to deploy a new version. While the consumer is stopped, events continue to be published, and those events are stored in the selected topic-partition on the Kafka brokers. When the consumer starts up again, it consumes the backlog of events from all of its input partitions. If the consumer has only one input, that’s no problem: the pending events are simply processed sequentially in the order they are stored. But if the consumer has several input topics, it will pick input topics to read in some arbitrary order. It may read all of the pending events from one input topic before it reads the backlog on another input topic, or it may interleave the inputs in some way.
Thus, if you put the
customerInvoicePaid events in three separate topics, the consumer may well see all of the
customerAddressChanged events before it sees any of the
customerCreated events. And so it is likely that the consumer will see a
customerAddressChanged event for a customer that, according to its view of the world, has not yet been created.
You might be tempted to attach a timestamp to each message and use that for event ordering. That might just about work if you are importing events into a data warehouse, where you can order the events after the fact. But in a stream process, timestamps are not enough: if you get an event with a certain timestamp, you don’t know whether you still need to wait for some previous event with a lower timestamp, or if all previous events have arrived and you’re ready to process the event. And relying on clock synchronisation generally leads to nightmares; for more detail on the problems with clocks, I refer you to Chapter 8 of my book.
Given that background, I will propose some rules of thumb to help you figure out which things to put in the same topic, and which things to split into separate topics:
If you are using a data encoding such as JSON, without a statically defined schema, you can easily put many different event types in the same topic. However, if you are using a schema-based encoding such as Avro, a bit more thought is needed to handle multiple event types in a single topic.
As mentioned above, the Avro-based Confluent Schema Registry for Kafka currently relies on the assumption that there is one schema for each topic (or rather, one schema for the key and one for the value of a message). You can register new versions of a schema, and the registry checks that the schema changes are forward and backward compatible. A nice thing about this design is that you can have different producers and consumers using different schema versions at the same time, and they still remain compatible with each other.
More precisely, when Confluent’s Avro serializer registers a schema in the registry, it does so under a subject name. By default, that subject is
<topic>-key for message keys and
<topic>-value for message values. The schema registry then checks the mutual compatibility of all schemas that are registered under a particular subject.
I have recently made a patch to the Avro serializer that makes the compatibility check more flexible. The patch adds two new configuration options:
key.subject.name.strategy (which defines how to construct the subject name for message keys), and
value.subject.name.strategy (how to construct the subject name for message values). The options can take one of the following values:
io.confluent.kafka.serializers.subject.TopicNameStrategy(default): The subject name for message keys is
<topic>-valuefor message values. This means that the schemas of all messages in the topic must be compatible with each other.
io.confluent.kafka.serializers.subject.RecordNameStrategy: The subject name is the fully-qualified name of the Avro record type of the message. Thus, the schema registry checks the compatibility for a particular record type, regardless of topic. This setting allows any number of different event types in the same topic.
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: The subject name is
<topic>is the Kafka topic name, and
<type>is the fully-qualified name of the Avro record type of the message. This setting also allows any number of event types in the same topic, and further constrains the compatibility check to the current topic only.
With this new feature, you can easily and cleanly put all the different events for a particular entity in the same topic. Now you can freely choose the granularity of topics based on the criteria above, and not be limited to a single event type per topic.