此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
暂停和恢复侦听器容器
添加了版本 2.1.3pause()
和resume()
方法传递给侦听器容器。
以前,您可以在ConsumerAwareMessageListener
并通过监听ListenerContainerIdleEvent
,它提供对Consumer
对象。
虽然您可以使用事件侦听器在空闲容器中暂停使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。
要安全地暂停和恢复使用者,您应该使用pause
和resume
方法。
一个pause()
在下一个poll()
;一个resume()
在当前poll()
返回。
当容器暂停时,它将继续poll()
使用者,如果正在使用组管理,则避免再平衡,但它不检索任何记录。
有关更多信息,请参阅 Kafka 文档。
从版本 2.1.5 开始,您可以调用isPauseRequested()
以查看pause()
已被调用。
但是,使用者可能实际上尚未暂停。isConsumerPaused()
如果所有Consumer
实例实际上已暂停。
此外(也是从 2.1.5 开始),ConsumerPausedEvent
和ConsumerResumedEvent
实例与容器一起发布,作为source
property 和TopicPartition
实例中涉及的partitions
财产。
从版本 2.9 开始,一个新的 container 属性pauseImmediate
,如果设置为 true,则会导致暂停在处理当前记录后生效。
默认情况下,暂停在处理了上一次轮询中的所有记录后生效。
请参阅 pauseImmediate。
以下简单的 Spring Boot 应用程序通过使用容器注册表来获取对@KafkaListener
方法的容器并暂停或恢复其使用者以及接收相应的事件:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
下面的清单显示了前面示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2