对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

实施消费者再平衡

Kafka 客户端现在支持触发强制再平衡的选项。 从 version 开始3.1.2,Spring for Apache Kafka 提供了一个选项,可以通过消息侦听器容器在 Kafka 使用者上调用此 API。 调用此 API 时,它只是提醒 Kafka 使用者触发强制再平衡;实际的再平衡只会作为下一次poll()操作。 如果已经有正在进行的再平衡,则调用强制再平衡是 NO-OP。 调用方必须等待当前重新平衡完成,然后才能调用另一个重新平衡。 请参阅 javadocs 以获取enforceRebalance了解更多详情。spring-doc.cadn.net.cn

以下代码片段显示了使用消息侦听器容器强制执行再平衡的本质。spring-doc.cadn.net.cn

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

如上面的代码所示,应用程序使用KafkaListenerEndpointRegistry要访问消息侦听器容器,然后调用enforceRebalnceAPI 上。 调用enforceRebalance在侦听器容器上,它将调用委托给底层 Kafka 使用者。 Kafka 使用者将触发再平衡,作为下一个poll()操作。spring-doc.cadn.net.cn