Rebalancing Listeners
ContainerProperties has a property called consumerRebalanceListener, which takes an implementation of the Kafka client’s ConsumerRebalanceListener interface.
If this property is not provided, the container configures a logging listener that logs rebalance events at the INFO level.
The framework also adds a sub-interface, ConsumerAwareRebalanceListener.
The following listing shows the ConsumerAwareRebalanceListener interface definition:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}
Notice that there are two callbacks when partitions are revoked. The first is called immediately. The second is called after any pending offsets are committed. This is useful if you wish to maintain offsets in some external repository, as the following example shows:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ... (for each partition in partitions)
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ... (for each partition in partitions)
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }

});
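The store(...) and offsetTracker helpers in the example above are left undefined. As a rough illustration only, the following sketch shows one way such an external repository could look, using a hypothetical in-memory class (InMemoryOffsetStore is an invented name, not part of the framework). It assumes the stored value is the consumer position, that is, the offset of the next record to read, so no "+ 1" is needed when seeking. A real implementation would typically write to a database or another durable store.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for an external offset repository. */
final class InMemoryOffsetStore {

    // Keys use the "topic-partition" form (for example "orders-0"),
    // which is what TopicPartition.toString() produces.
    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    /** Records the position (offset of the next record to read) for a partition. */
    void store(String topicPartition, long position) {
        positions.put(topicPartition, position);
    }

    /** Returns the stored position, or empty if nothing was stored for the partition. */
    OptionalLong lookup(String topicPartition) {
        Long position = positions.get(topicPartition);
        return position == null ? OptionalLong.empty() : OptionalLong.of(position);
    }

    // Possible use inside the listener callbacks (assumption, shown as comments only):
    //   onPartitionsRevokedAfterCommit: offsets.store(tp.toString(), consumer.position(tp));
    //   onPartitionsAssigned:           offsets.lookup(tp.toString())
    //                                          .ifPresent(pos -> consumer.seek(tp, pos));
}
```

If you store the offset of the last processed record instead of the position, seek to that value plus one, as the example above does.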
Starting with version 2.4, a new method onPartitionsLost() has been added (similar to a method with the same name in ConsumerRebalanceListener).
The default implementation on ConsumerRebalanceListener simply calls onPartitionsRevoked.
The default implementation on ConsumerAwareRebalanceListener does nothing.
When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call onPartitionsRevoked from onPartitionsLost .
If you implement ConsumerRebalanceListener, you should override the default onPartitionsLost method.
This is because the listener container will call its own onPartitionsRevoked from its implementation of onPartitionsLost after calling the method on your implementation.
If your implementation delegates to the default behavior, onPartitionsRevoked will be called twice each time the Consumer calls that method on the container’s listener.
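The double invocation described above can be illustrated with a toy model. The types below (ToyRebalanceListener, ToyContainerListener, LostCallbackDemo) are hypothetical simplifications written for this sketch; they are not the Kafka or Spring classes and only mimic the call order described in this section.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for the Kafka client's listener interface (not the real type). */
interface ToyRebalanceListener {

    void onPartitionsRevoked(List<String> partitions);

    // Mirrors the client-side default described above: "lost" delegates to "revoked".
    default void onPartitionsLost(List<String> partitions) {
        onPartitionsRevoked(partitions);
    }
}

/** Simplified stand-in for the container's own listener, using the call order described above. */
final class ToyContainerListener {

    private final ToyRebalanceListener userListener;

    ToyContainerListener(ToyRebalanceListener userListener) {
        this.userListener = userListener;
    }

    void onPartitionsRevoked(List<String> partitions) {
        userListener.onPartitionsRevoked(partitions);
    }

    void onPartitionsLost(List<String> partitions) {
        userListener.onPartitionsLost(partitions); // the user's callback runs first
        onPartitionsRevoked(partitions);           // then the container's own revoked handling
    }
}

final class LostCallbackDemo {

    /** Returns how often the user's onPartitionsRevoked runs for a single "lost" event. */
    static int revokedCallsPerLostEvent(boolean overrideLost) {
        AtomicInteger revokedCalls = new AtomicInteger();
        ToyRebalanceListener listener;
        if (overrideLost) {
            listener = new ToyRebalanceListener() {
                @Override
                public void onPartitionsRevoked(List<String> partitions) {
                    revokedCalls.incrementAndGet();
                }

                @Override
                public void onPartitionsLost(List<String> partitions) {
                    // Clean up state for the lost partitions here; do not call onPartitionsRevoked.
                }
            };
        } else {
            // Only onPartitionsRevoked is implemented, so the delegating default stays in place.
            listener = partitions -> revokedCalls.incrementAndGet();
        }
        new ToyContainerListener(listener).onPartitionsLost(List.of("orders-0"));
        return revokedCalls.get();
    }
}
```

In this model, LostCallbackDemo.revokedCallsPerLostEvent(false) returns 2 because the delegating default is kept, while revokedCallsPerLostEvent(true) returns 1.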
Kafka 4.0 Consumer Rebalance Protocol
Spring for Apache Kafka 4.0 supports Apache Kafka 4.0’s new consumer rebalance protocol (KIP-848), which enhances performance with server-driven, incremental partition assignments. This reduces rebalancing downtime for consumer groups.
To enable the new protocol, configure the group.protocol property:
spring.kafka.consumer.properties.group.protocol=consumer
Keep in mind that the above property is a Spring Boot property.
If you are not using Spring Boot, set it programmatically in the consumer configuration:
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
The new protocol works seamlessly with ConsumerAwareRebalanceListener.
Due to incremental rebalancing, onPartitionsAssigned may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol.
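Because each callback may carry only part of the assignment, listener code should not treat the argument of onPartitionsAssigned as the complete set of owned partitions. The following is a minimal sketch of a helper that accumulates the incremental callbacks into a running view. AssignmentTracker is a hypothetical name introduced for illustration; it is generic so that it could be used with TopicPartition in a real listener.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that keeps a running view of the partitions currently owned. */
final class AssignmentTracker<P> {

    private final Set<P> owned = ConcurrentHashMap.newKeySet();

    /** Call from onPartitionsAssigned; the argument may hold only the newly added partitions. */
    void assigned(Collection<P> added) {
        owned.addAll(added);
    }

    /** Call from the revoked or lost callbacks with the partitions being given up. */
    void released(Collection<P> removed) {
        owned.removeAll(removed);
    }

    /** Returns an immutable snapshot of the partitions owned right now. */
    Set<P> current() {
        return Set.copyOf(owned);
    }
}
```

When a Consumer reference is available in the callback, consumer.assignment() also reports the full current assignment, so a tracker like this is mainly useful for code that runs outside the consumer thread.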
The new protocol uses server-side partition assignments, ignoring client-side custom assignors set via spring.kafka.consumer.partition-assignment-strategy.
A warning is logged if a custom assignor is detected.
To use custom assignors, set group.protocol=classic (which is the default if you don’t specify a value for group.protocol).
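For applications that configure the consumer programmatically, the classic protocol and a client-side assignor can be set together in the consumer properties. The sketch below is only an illustration: ClassicProtocolProps is a hypothetical helper, and the assignor class name passed to it is whatever implementation you supply. The key partition.assignment.strategy is the Kafka consumer configuration property for client-side assignors.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds consumer properties for the classic protocol. */
final class ClassicProtocolProps {

    /** Keeps the classic protocol so that the given client-side assignor is honored. */
    static Map<String, Object> withAssignor(String assignorClassName) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.protocol", "classic");
        // Kafka consumer config key for the client-side assignor class name(s).
        props.put("partition.assignment.strategy", assignorClassName);
        return props;
    }
}
```

The resulting map, together with the usual bootstrap server and deserializer settings, can be passed to DefaultKafkaConsumerFactory, for example with ClassicProtocolProps.withAssignor("com.example.MyAssignor"), where com.example.MyAssignor is a placeholder class name.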