对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
开始@KafkaListener
序列中的 s
一个常见的用例是在另一个侦听器使用主题中的所有记录后启动侦听器。
例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。
从版本 2.7.3 开始,新组件ContainerGroupSequencer
已引入。
它使用@KafkaListener
的containerGroup
属性将容器分组在一起,并在当前组中的所有容器都处于空闲状态时启动下一个组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在这里,我们有 4 个监听器,分为两组,g1
和g2
.
在应用程序上下文初始化期间,排序器将autoStartup
属性设置为false
.
它还将idleEventInterval
对于任何容器(尚未设置),设置为提供的值(在本例中为 5000 毫秒)。
然后,当 sequencer 由应用程序上下文启动时,将启动第一组中的容器。
如ListenerContainerIdleEvent
s,则每个容器中的每个子容器都将被停止。
当ConcurrentMessageListenerContainer
已停止,则父容器将停止。
当一个组中的所有容器都已停止时,将启动下一个组中的容器。
组中的组或容器的数量没有限制。
默认情况下,最终组 (g2
上图)在空闲时不会停止。
要修改该行为,请将stopLastGroupWhenIdle
自true
在 Sequencer 上。
顺便说一句,以前每个组中的容器被添加到类型为Collection<MessageListenerContainer>
其中 Bean 名称是containerGroup
.
这些集合现在已被弃用,取而代之的是 bean 类型的ContainerGroup
其中 Bean 名称是组名称,后缀为.group
;在上面的示例中,将有 2 个 beang1.group
和g2.group
.
这Collection
Bean 将在将来的发行版中删除。