对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
暂停和恢复侦听器容器
添加了版本 2.1.3pause()
和resume()
方法传递给侦听器容器。
以前,您可以在ConsumerAwareMessageListener
并通过监听ListenerContainerIdleEvent
,它提供对Consumer
对象。
虽然您可以使用事件侦听器在空闲容器中暂停使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。
要安全地暂停和恢复使用者,您应该使用pause
和resume
方法。
一个pause()
在下一个poll()
;一个resume()
在当前poll()
返回。
当容器暂停时,它将继续poll()
使用者,如果正在使用组管理,则避免再平衡,但它不检索任何记录。
有关更多信息,请参阅 Kafka 文档。
从版本 2.1.5 开始,您可以调用isPauseRequested()
以查看pause()
已被调用。
但是,使用者可能实际上尚未暂停。isConsumerPaused()
如果所有Consumer
实例实际上已暂停。
此外(也是从 2.1.5 开始),ConsumerPausedEvent
和ConsumerResumedEvent
实例与容器一起发布,作为source
property 和TopicPartition
实例中涉及的partitions
财产。
从版本 2.9 开始,一个新的 container 属性pauseImmediate
,如果设置为 true,则会导致暂停在处理当前记录后生效。
默认情况下,暂停在处理了上一次轮询中的所有记录后生效。
请参阅 [pauseImmediate]。
以下简单的 Spring Boot 应用程序通过使用容器注册表来获取对@KafkaListener
方法的容器并暂停或恢复其使用者以及接收相应的事件:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
下面的清单显示了前面示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2