对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
连接到 Kafka
从版本 2.5 开始,这些都扩展了KafkaResourceFactory
.
这允许在运行时通过添加Supplier<String>
添加到他们的配置中:setBootstrapServersSupplier(() -> …)
.
将对所有新连接调用此函数以获取服务器列表。
Consumer 和 Producer 通常存在很长时间。
要关闭现有 Producer,请调用reset()
在DefaultKafkaProducerFactory
.
要关闭现有 Consumer,请调用stop()
(然后start()
) 在KafkaListenerEndpointRegistry
和/或stop()
和start()
在任何其他侦听器容器 bean 上。
为方便起见,该框架还提供了一个ABSwitchCluster
它支持两组引导服务器;其中 1 个随时处于活动状态。
配置ABSwitchCluster
并将其添加到 producer 和 consumer 工厂中,并且KafkaAdmin
,通过调用setBootstrapServersSupplier()
.
当您想要切换时,调用primary()
或secondary()
并调用reset()
在生产商工厂建立新的连接;对于消费者,stop()
和start()
all listener containers 的 Listener 容器。
使用@KafkaListener
s,stop()
和start()
这KafkaListenerEndpointRegistry
豆。
有关更多信息,请参阅 Javadocs。
Factory 侦听器
从版本 2.5 开始,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
可以配置Listener
在创建或关闭创建者或使用方时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
是通过附加client-id
属性(从metrics()
创建后)到工厂beanName
属性,以.
.
例如,这些侦听器可用于创建和绑定 MicrometerKafkaClientMetrics
实例(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;请参阅 Micrometer 本机度量。