Static Membership is an enhancement to the current rebalance protocol that aims to reduce the downtime caused by excessive and unnecessary rebalances for general Apache Kafka® client implementations. This applies to Kafka consumers, Kafka Connect, and Kafka Streams. To get a better grasp on the rebalance protocol, we’ll examine this concept in depth and explain what it means. If you already know what a Kafka rebalance is, feel free to jump directly to the following section to save time: When do we trigger an unnecessary rebalance?
A Kafka rebalance is a distributed protocol for client-side applications to process a common set of resources in a dynamic group. Two primary goals for this protocol are:
Take a Kafka consumer, for example. A group of Kafka consumers read input data from Kafka through subscriptions, and topic partitions are their shared unit of tasks. Given three consumers (C1, C2, and C3) and two topics (T1 and T2) with three partitions each, the subscriptions would appear as follows:
C1: T1, T2
C2: T2
C3: T1
The rebalance protocol ensures that C1 and C2 take non-overlapping assignments from topic T2*, and the same goes for C1 and C3 with topic T1. A valid assignment looks like this:
C1: t1-p1, t2-p1
C2: t2-p2, t2-p3
C3: t1-p2, t1-p3
*Note that the consumer does not check whether the assignment returned from the assignor respects these rules. If your customized assignor assigns a partition to multiple owners, the assignment is still silently accepted and causes double fetching. Strictly speaking, only the built-in rebalance assignors obey this rule for resource isolation.
However, the assignment below is not allowed, as it introduces overlapping assignments:
C1: t1-p1, t2-p1
C2: t2-p1, t2-p2, t2-p3
C3: t1-p2, t1-p3
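Since the consumer does not validate assignments itself, a customized assignor may want to check its own output. Here is a minimal sketch of such a check, applied to the two assignments above. The helper name `find_overlaps` and the plain-dictionary representation are assumptions made for illustration; they are not part of any Kafka client API.

```python
from collections import defaultdict


def find_overlaps(assignment):
    """Return every partition that is claimed by more than one consumer."""
    owners = defaultdict(list)
    for consumer, partitions in assignment.items():
        for partition in partitions:
            owners[partition].append(consumer)
    return {p: c for p, c in owners.items() if len(c) > 1}


valid = {
    "C1": ["t1-p1", "t2-p1"],
    "C2": ["t2-p2", "t2-p3"],
    "C3": ["t1-p2", "t1-p3"],
}
invalid = {
    "C1": ["t1-p1", "t2-p1"],
    "C2": ["t2-p1", "t2-p2", "t2-p3"],
    "C3": ["t1-p2", "t1-p3"],
}

print(find_overlaps(valid))    # {}
print(find_overlaps(invalid))  # {'t2-p1': ['C1', 'C2']}
```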
The rebalance protocol also needs to properly handle membership changes. For the above case, if a new member C4 subscribing to T2 joins, the rebalance protocol will try to adjust the load within the group:
C1: t1-p1, t2-p1
C2: t2-p3
C3: t1-p2, t1-p3
C4: t2-p2
In summary, the rebalance protocol needs to “balance” the load within a client group as it scales, while making the task ownership safe at the same time. Similar to most distributed consensus algorithms, Kafka takes a two-phase approach. For simplicity, we’ll stick to the Kafka consumer for now.
The endpoint that consumers commit progress to is called a group coordinator, which is hosted on a designated broker. It also serves as the centralized manager of group rebalances. When the group starts rebalancing, the group coordinator first switches its state to rebalance so that all interacting consumers are notified to rejoin the group. Once all the members rejoin, or the coordinator waits long enough and reaches the rebalance timeout, the group proceeds to another stage called sync, which officially announces the formation of a valid consumer group. To distinguish members who fall out of the group during this process, each successful rebalance increments a counter called generation ID and propagates its value to all the joined members, so that out-of-generation members can be fenced.
In the sync stage, the group coordinator replies to all members with the latest generation information. Specifically, it nominates one of the members as the leader and replies to the leader with encoded membership and subscription metadata.
The leader completes the assignment based on membership and topic metadata information, and replies to the coordinator with the assignment information. During this period, all the followers are required to send a sync group request to get their actual assignments and go into a wait pool until the leader finishes transmitting the assignment to the coordinator. Upon receiving the assignment, the coordinator transitions the group from sync to stable. All pending and upcoming follower sync requests will be answered with their individual assignments.
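To make the two phases more concrete, here is a toy model of the flow described above: members rejoin, the generation ID is bumped, followers wait in sync until the leader delivers the assignment, and stale generations are fenced. Everything in it (the class name `ToyGroupCoordinator`, the string return values, and picking the first joiner as leader) is an assumption for illustration. It leaves out the rebalance timeout and is not how the broker is actually implemented.

```python
class ToyGroupCoordinator:
    """Simplified join/sync state machine for a single consumer group."""

    def __init__(self, expected_members):
        self.expected = set(expected_members)
        self.state = "rebalance"
        self.generation = 0
        self.joined = []
        self.assignments = {}

    def join(self, member):
        if member not in self.joined:
            self.joined.append(member)
        # Once every expected member has rejoined, bump the generation
        # and move on to the sync stage.
        if self.state == "rebalance" and set(self.joined) == self.expected:
            self.generation += 1
            self.state = "sync"

    def sync(self, member, generation, assignments=None):
        if self.state == "rebalance":
            return "rebalancing"
        if generation != self.generation:
            return "fenced"  # out-of-generation member
        # Assumption: the first joiner acts as the leader in this sketch.
        if member == self.joined[0] and assignments is not None:
            self.assignments = dict(assignments)
            self.state = "stable"
        if self.state != "stable":
            return "pending"  # follower waits for the leader's assignment
        return self.assignments.get(member, [])


coordinator = ToyGroupCoordinator(["C1", "C2"])
coordinator.join("C1")
coordinator.join("C2")
print(coordinator.state, coordinator.generation)  # sync 1
print(coordinator.sync("C2", 1))                  # pending
plan = {"C1": ["t2-p1"], "C2": ["t2-p2", "t2-p3"]}
print(coordinator.sync("C1", 1, plan))            # ['t2-p1']
print(coordinator.sync("C2", 1))                  # ['t2-p2', 't2-p3']
print(coordinator.sync("C2", 0))                  # fenced
```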
Here, we describe two demo cases: one is an actual rebalance walkthrough, and the other is the high-level state machine. Note that in the sync stage, we can always fall back to rebalance mode if rebalance conditions are triggered, such as adding a new member, topic partition expansion, etc.
The rebalance protocol is very effective at balancing task processing load in real time and letting users freely scale their applications, but it is a rather heavy operation as well, requiring the entire consumer group to stop working temporarily. Members are expected to revoke ongoing assignments and initialize new assignments at the start and end of each rebalance. Such operations take overhead, especially for stateful operations where the task needs to first restore a local state from its backup topic before serving.
Essentially, a rebalance kicks in when the following conditions are met:
In the real world, there are many scenarios where a group coordinator triggers unnecessary rebalances that are detrimental to application performance. The first case is transient member timeout. To understand this, we need to first introduce two concepts: consumer heartbeat and session timeout.
A Kafka consumer maintains a background thread to periodically send heartbeat requests to the coordinator to indicate its liveness. The consumer configuration called session.timeout.ms defines how long the coordinator waits after the member’s last heartbeat before assuming the member has failed. When this value is set too low, a network jitter or a long garbage collection (GC) might fail the liveness check, causing the group coordinator to remove this member and begin rebalancing. The solution is simple: instead of using the default 10-second session timeout, set it to a larger value to greatly reduce transient failure-caused rebalances.
Note that the longer you set the session timeout to, the longer partial unavailability you will have when a consumer actually fails. We will explain how to choose this value in a later section on how to opt into Static Membership.
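The tradeoff can be illustrated with a small liveness check. This is only a sketch of the rule described above, not the coordinator's actual code: the function name `expired_members` and the timestamps are made up for the example.

```python
def expired_members(last_heartbeat_ms, now_ms, session_timeout_ms):
    """Return members whose last heartbeat is older than the session timeout."""
    return sorted(
        member
        for member, last_seen in last_heartbeat_ms.items()
        if now_ms - last_seen > session_timeout_ms
    )


# Hypothetical scenario: C2 has been silent for 12 seconds, for example
# because of a long GC pause, while C1 and C3 are heartbeating normally.
last_heartbeat_ms = {"C1": 29_000, "C2": 18_000, "C3": 28_500}
now_ms = 30_000

# With a 10-second session timeout, C2 is removed and a rebalance starts.
print(expired_members(last_heartbeat_ms, now_ms, session_timeout_ms=10_000))  # ['C2']
# With a 30-second session timeout, the pause is tolerated.
print(expired_members(last_heartbeat_ms, now_ms, session_timeout_ms=30_000))  # []
```

The same arithmetic shows the downside: with the larger timeout, a consumer that has really crashed keeps its partitions unprocessed for up to 30 seconds before the coordinator notices.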
From time to time, we need to restart our application, deploy new code, or perform a rollback. In the worst case, these operations may cause a lot of rebalances. When a consumer instance shuts down, it sends a leave group request to the group coordinator, which removes it from the group and triggers a rebalance. When that consumer resumes after a bounce, it sends a join group request to the group coordinator, triggering another rebalance.
During a rolling bounce procedure, consecutive rebalances are triggered as instances are shut down and resumed, and partitions are reassigned back and forth. The final assignment result is purely random and incurs a large cost for task shuffling and reinitialization.
How about letting members choose not to leave the group? Not an option either. To understand why, we need to talk about the member ID for a moment.
When a new member joins the group, the request contains no membership information. The group coordinator will assign a universally unique identifier (UUID) to this member as its member ID, put the ID in the cache, and embed this information in its response to the member. Within this consumer’s lifecycle, it could reuse the same member ID without the coordinator triggering a rebalance when it rejoins, except in edge cases such as leader rejoining.
Going back to the rolling bounce scenario, a restarted member will erase in-memory membership information and rejoin the group without member ID or generation ID. Since the rejoining consumer would be recognized as a completely new member of the group, the group coordinator does not guarantee that its old assignment will be assigned back. As you can see, a member leaving the group is not the root cause for unnecessary task shuffling—the loss of identity is.
Static Membership, unlike Dynamic Membership, aims to persist member identity across multiple generations of the group. The goal here is to reuse the same subscription information and make the old members “recognizable” to the coordinator. Static Membership introduces a new consumer configuration called group.instance.id, which is configured by users to uniquely identify their consumer instances. Although the coordinator-assigned member ID gets lost during restart, the coordinator will still recognize this member based on the provided group instance ID in the join request. Therefore, the same assignment is guaranteed.
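The difference between the two modes can be sketched with a toy coordinator that counts rebalances. This is a simplified model under stated assumptions: the class `ToyCoordinator` and its fields are hypothetical, the assignor is replaced by a hand-written assignment, and session expiry of the old dynamic member is not modelled.

```python
import uuid


class ToyCoordinator:
    """Tracks assignments by member ID and remembers static members."""

    def __init__(self):
        self.assignments = {}  # member ID -> partitions
        self.static_ids = {}   # group.instance.id -> latest member ID
        self.rebalances = 0

    def join(self, group_instance_id=None):
        """Handle a join request that carries no member ID."""
        new_id = str(uuid.uuid4())
        old_id = self.static_ids.get(group_instance_id) if group_instance_id else None
        if old_id is not None:
            # Known static member: move its old assignment to the new member ID
            # without rebalancing.
            self.assignments[new_id] = self.assignments.pop(old_id)
        else:
            # Unknown member: the group has to rebalance.
            self.assignments[new_id] = []
            self.rebalances += 1
        if group_instance_id:
            self.static_ids[group_instance_id] = new_id
        return new_id


# Static member: the restart is recognized through its group instance ID.
static_group = ToyCoordinator()
before = static_group.join(group_instance_id="consumer-1")
static_group.assignments[before] = ["t1-p1", "t2-p1"]  # stand-in for the assignor
after = static_group.join(group_instance_id="consumer-1")  # restart
print(static_group.rebalances)          # 1
print(static_group.assignments[after])  # ['t1-p1', 't2-p1']

# Dynamic member: the restart looks like a brand-new member.
dynamic_group = ToyCoordinator()
dynamic_group.join()
dynamic_group.join()  # restart
print(dynamic_group.rebalances)  # 2
```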
Static Membership is extremely friendly with cloud application setups, because nowadays deployment technologies such as Kubernetes are very self-contained for managing the health of applications. To heal a dead or ill-performing consumer, Kubernetes could easily bring down the relevant instance and spin up a new one using the same instance ID. With a cloud management framework, the group coordinator’s client health check is ongoing.
Below is a quick demo of how Static Membership works.
Since the Apache Kafka 2.3 release, Static Membership has become generally available for the community. Here are the instructions if you want to be an alpha user:
Set inter.broker.protocol.version to 2.3 or higher in order to enable this feature.
Set the group.instance.id configuration to a unique ID for your consumer. If you are a Kafka Streams user, use the same configuration for your stream instance.
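On the client side, the resulting configuration could look roughly like the sketch below. The property names group.instance.id and session.timeout.ms are the ones discussed in this post. The helper `build_consumer_config`, the group name, the instance names, and the 30-second timeout are all illustrative assumptions, and how the properties are handed to the consumer depends on the client library you use.

```python
def build_consumer_config(instance_name):
    """Build consumer properties for one static member (illustrative values)."""
    return {
        "group.id": "orders-processor",     # hypothetical group name
        # Must be unique per instance and stable across restarts.
        "group.instance.id": instance_name,
        # Assumed value: long enough to cover a typical restart.
        "session.timeout.ms": 30_000,
    }


# One configuration per instance, for example one per pod in a deployment.
configs = [build_consumer_config(f"orders-consumer-{i}") for i in range(3)]
instance_ids = [c["group.instance.id"] for c in configs]
print(instance_ids)  # ['orders-consumer-0', 'orders-consumer-1', 'orders-consumer-2']
```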
Static Membership only works as expected if these instructions are followed. We have nonetheless made some preventative efforts to reduce the potential risk of human error.
Sometimes a user can forget to upgrade a broker. When the consumer first gets started, it acquires the API version of the designated broker. If the client is configured with a group instance ID and the broker is on an older version, the application will crash immediately, as the broker has no support for Static Membership yet.
If a user fails to configure the group instance ID uniquely, meaning that there are two or more members configured with the same instance ID, fencing logic comes into play. When a known static member rejoins without a member ID, the coordinator generates a new UUID to reply to this member as its new member ID. At the same time, the group coordinator maintains a mapping from the instance ID to the latest assigned member ID. If a known static member rejoins with a valid member ID that doesn’t match the cached ID, it immediately gets fenced by the coordinator response. This eliminates the risk of concurrent processing for duplicate static members.
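The fencing rule can be sketched as follows. This is a toy model of the mapping described above, not the broker's code: `StaticMemberRegistry` and `FencedError` are hypothetical names, and as a simplification an unknown instance ID that arrives with a member ID is also treated as fenced.

```python
import uuid


class FencedError(Exception):
    """Raised when a member ID no longer matches the cached one."""


class StaticMemberRegistry:
    def __init__(self):
        self.latest_member_id = {}  # group.instance.id -> latest member ID

    def join(self, group_instance_id, member_id=None):
        if member_id is None:
            # Rejoin without a member ID: issue a new one and cache it.
            new_id = str(uuid.uuid4())
            self.latest_member_id[group_instance_id] = new_id
            return new_id
        if member_id != self.latest_member_id.get(group_instance_id):
            # A newer instance with the same instance ID has taken over.
            raise FencedError(f"{group_instance_id} was fenced")
        return member_id


registry = StaticMemberRegistry()
id_a = registry.join("consumer-1")  # process A starts
id_b = registry.join("consumer-1")  # process B is misconfigured with the same ID

print(registry.join("consumer-1", id_b) == id_b)  # True
try:
    registry.join("consumer-1", id_a)  # A rejoins with its stale member ID
except FencedError as error:
    print(error)  # consumer-1 was fenced
```

Only the most recent holder of the instance ID keeps working, so the two processes never own the same partitions at the same time.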
In this very first version, we expect bugs that may invalidate the processing semantics or hinder the fencing logic. Some of them have been addressed in the trunk, such as KAFKA-8715, and we are still actively working on finding more issues.
There are still many details we haven’t covered in this blog post, like how this effort compares with Incremental Cooperative Rebalancing, how Static Membership helps with a non-sticky assignment strategy, and tooling support around the new protocol. If you’re interested, I cover all this and more in my session with Liquan Pei at Kafka Summit San Francisco titled Static Membership: Rebalance Strategy Designed for the Cloud.
This work has been ongoing for over a year, with many iterations and huge support from my colleagues, past colleagues, and community friends. I owe a big thank you to all of you, especially Guozhang Wang, Jason Gustafson, Liquan Pei, Yu Yang, Shawn Nguyen, Matthias J. Sax, John Roesler, Mayuresh Gharat, Dong Lin, and Mike Freyberger.
Boyang Chen is a software engineer at Confluent and a committer on Apache Kafka. He works on various technical initiatives, including Kafka Streams, exactly-once semantics, Apache ZooKeeper removal, and more. Previously, Boyang worked at Pinterest as a software engineer on the Ads Infrastructure Team, where he tackled various mission-critical challenges and rebuilt the whole budgeting and pacing pipeline.