对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

连接到 Kafka

从版本 2.5 开始,这些都扩展了KafkaResourceFactory. 这允许在运行时通过添加Supplier<String>添加到他们的配置中:setBootstrapServersSupplier(() -> …​). 将对所有新连接调用此函数以获取服务器列表。 Consumer 和 Producer 通常存在很长时间。 要关闭现有 Producer,请调用reset()DefaultKafkaProducerFactory. 要关闭现有 Consumer,请调用stop()(然后start()) 在KafkaListenerEndpointRegistry和/或stop()start()在任何其他侦听器容器 bean 上。spring-doc.cadn.net.cn

为方便起见,该框架还提供了一个ABSwitchCluster它支持两组引导服务器;其中 1 个随时处于活动状态。 配置ABSwitchCluster并将其添加到 producer 和 consumer 工厂中,并且KafkaAdmin,通过调用setBootstrapServersSupplier(). 当您想要切换时,调用primary()secondary()并调用reset()在生产商工厂建立新的连接;对于消费者,stop()start()all listener containers 的 Listener 容器。 使用@KafkaListeners,stop()start()KafkaListenerEndpointRegistry豆。spring-doc.cadn.net.cn

有关更多信息,请参阅 Javadocs。spring-doc.cadn.net.cn

Factory 侦听器

从版本 2.5 开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory可以配置Listener在创建或关闭创建者或使用方时接收通知。spring-doc.cadn.net.cn

Producer Factory 侦听器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
Consumer Factory 侦听器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id是通过附加client-id属性(从metrics()创建后)到工厂beanName属性,以..spring-doc.cadn.net.cn

例如,这些侦听器可用于创建和绑定 MicrometerKafkaClientMetrics实例(并在客户端关闭时关闭它)。spring-doc.cadn.net.cn

该框架提供的侦听器正是这样做的;请参阅 Micrometer 本机度量。spring-doc.cadn.net.cn