对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
向侦听器容器添加了版本 2.1.3 和方法。
以前,您可以在 a 中暂停使用者,然后通过侦听 a 来恢复它,这会提供对对象的访问。
虽然可以使用事件侦听器暂停空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。
若要安全地暂停和恢复使用者,应在侦听器容器上使用 and 方法。
A 在下一个之前生效;A 在当前返回后立即生效。
当容器暂停时,它会继续到使用者,从而避免在使用组管理时重新平衡,但它不会检索任何记录。
有关详细信息,请参阅 Kafka 文档。pause()
resume()
ConsumerAwareMessageListener
ListenerContainerIdleEvent
Consumer
pause
resume
pause()
poll()
resume()
poll()
poll()
从版本 2.1.5 开始,您可以调用以查看是否已被调用。
但是,消费者可能还没有真正停下来。 如果所有实例实际上都已暂停,则返回 true。isPauseRequested()
pause()
isConsumerPaused()
Consumer
此外(也是从 2.1.5 开始),实例以容器作为属性和属性中涉及的实例发布。ConsumerPausedEvent
ConsumerResumedEvent
source
TopicPartition
partitions
从版本 2.9 开始,当新的容器属性设置为 true 时,暂停在处理当前记录后生效。
默认情况下,当处理完上一次轮询的所有记录时,暂停将生效。
请参阅 [pauseImmediate]。pauseImmediate
以下简单的 Spring Boot 应用程序演示了如何使用容器注册表获取对方法容器的引用,并暂停或恢复其使用者以及接收相应的事件:@KafkaListener
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
下面的列表显示了上述示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2