对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
消息侦听器容器
二MessageListenerContainer
提供了 implementations:
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
这KafkaMessageListenerContainer
接收来自单个线程上所有主题或分区的所有消息。
这ConcurrentMessageListenerContainer
委托人KafkaMessageListenerContainer
实例来提供多线程消耗。
从版本 2.2.7 开始,您可以添加RecordInterceptor
到侦听器容器;它将在调用侦听器之前调用,以允许检查或修改记录。
如果侦听器返回 null,则不会调用侦听器。
从版本 2.7 开始,它有额外的方法,这些方法在 listener 退出后调用(通常,或通过抛出异常)。
此外,从版本 2.7 开始,现在有一个BatchInterceptor
,为 Batch 侦听器提供类似的功能。
此外,ConsumerAwareRecordInterceptor
(以及BatchInterceptor
) 提供对Consumer<?, ?>
.
例如,这可能用于访问侦听器中的使用者指标。
你不应该在这些拦截器中执行任何影响消费者位置和/或提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器更改了记录(通过创建新记录),则topic ,partition 和offset 必须保持不变,以避免意外的副作用,例如记录丢失。 |
这CompositeRecordInterceptor
和CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从版本 2.8 开始,当使用事务时,在事务开始之前调用拦截器。
您可以设置侦听器容器的interceptBeforeTx
property 设置为false
在事务启动后调用拦截器。
从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是KafkaAwareTransactionManager
s.
例如,这允许拦截器参与由容器启动的 JDBC 事务。
从版本 2.3.8、2.4.6 开始,ConcurrentMessageListenerContainer
现在支持在并发大于 1 时进行静态成员身份。
这group.instance.id
后缀为-n
跟n
起价1
.
这与增加的session.timeout.ms
可用于减少再平衡事件,例如,在重新启动应用程序实例时。
用KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一个ConsumerFactory
以及有关主题和分区以及其他配置的信息,在ContainerProperties
对象。ContainerProperties
具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数采用TopicPartitionOffset
参数显式指示容器使用哪些分区(使用 Consumerassign()
方法)和可选的 initial offset 来获取。
默认情况下,正值是绝对偏移量。
默认情况下,负值是相对于分区中的当前最后一个偏移量。
的构造函数TopicPartitionOffset
这需要额外的boolean
参数。
如果这是true
,则初始偏移量(正或负)相对于此使用者的当前位置。
偏移量在容器启动时应用。
第二个选项采用一组主题,Kafka 根据group.id
property — 在组中分布分区。
第三个使用正则表达式Pattern
以选择主题。
要分配MessageListener
添加到容器中,您可以使用ContainerProps.setMessageListener
方法。
以下示例显示了如何执行此作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,在创建DefaultKafkaConsumerFactory
,使用仅接受上述属性的构造函数意味着 key 和 valueDeserializer
类是从 Configuration 中选取的。
或者Deserializer
实例可以传递给DefaultKafkaConsumerFactory
constructor 的 intent 和/或 value,在这种情况下,所有 Consumer 共享相同的实例。
另一种选择是提供Supplier<Deserializer>
s(从版本 2.3 开始),该 S 将用于获取单独的Deserializer
实例Consumer
:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请参阅 Javadoc 以获取ContainerProperties
了解有关您可以设置的各种属性的更多信息。
从 2.1.1 版本开始,一个名为logContainerConfig
可用。
什么时候true
和INFO
启用日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移提交的日志记录在DEBUG
日志记录级别。
从版本 2.1.2 开始,在ContainerProperties
叫commitLogLevel
用于指定这些消息的日志级别。
例如,要将日志级别更改为INFO
,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
.
从版本 2.2 开始,一个名为missingTopicsFatal
已添加(默认:false
从 2.3.4 开始)。
如果代理上不存在任何已配置的主题,这将阻止容器启动。
如果容器配置为侦听主题模式 (regex),则它不适用。
以前,容器线程在consumer.poll()
方法等待主题出现,同时记录许多消息。
除了日志之外,没有迹象表明存在问题。
从版本 2.8 开始,新的 container 属性authExceptionRetryInterval
已引入。
这会导致容器在获取任何消息后重试获取消息AuthenticationException
或AuthorizationException
从KafkaConsumer
.
例如,当已配置的用户被拒绝读取特定主题的访问权限或凭证不正确时,可能会发生这种情况。
定义authExceptionRetryInterval
允许容器在授予适当权限时进行恢复。
默认情况下,未配置间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。 |
从版本 2.8 开始,在创建 Consumer Factory 时,如果你提供反序列化器作为对象(在构造函数中或通过 setter),工厂将调用configure()
方法配置它们。
用ConcurrentMessageListenerContainer
单个构造函数类似于KafkaListenerContainer
构造 函数。
下面的清单显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还具有concurrency
财产。
例如container.setConcurrency(3)
创建 3 个KafkaMessageListenerContainer
实例。
对于第一个构造函数,Kafka 使用其组管理功能在使用者之间分配分区。
侦听多个主题时,默认分区分布可能不是您所期望的。
例如,如果您有 3 个主题,每个主题有 5 个分区,并且您希望使用 使用 Spring Boot 时,可以按如下方式分配 set 策略:
|
当容器属性配置为TopicPartitionOffset
s、ConcurrentMessageListenerContainer
分发TopicPartitionOffset
跨委托的实例KafkaMessageListenerContainer
实例。
如果,比如 6 个TopicPartitionOffset
实例,并且concurrency
是3
;每个容器获取两个分区。
五人TopicPartitionOffset
实例中,两个容器获得两个分区,第三个容器获得一个分区。
如果concurrency
大于TopicPartitions
这concurrency
向下调整,以便每个容器获得一个分区。
这client.id property(如果已设置)附加-n 哪里n 是与并发对应的使用者实例。
启用 JMX 时,需要为 MBean 提供唯一名称。 |
从版本 1.3 开始,MessageListenerContainer
提供对底层KafkaConsumer
.
在ConcurrentMessageListenerContainer
这metrics()
method 返回所有目标的量度KafkaMessageListenerContainer
实例。
这些指标分为Map<MetricName, ? extends Metric>
由client-id
为底层KafkaConsumer
.
从版本 2.3 开始,ContainerProperties
提供了一个idleBetweenPolls
选项,让侦听器容器中的 main 循环在KafkaConsumer.poll()
调用。
从提供的选项中选择实际睡眠间隔作为最小值,并且max.poll.interval.ms
consumer config 和 current records 批处理时间。
提交偏移量
提供了几个选项来提交偏移量。
如果enable.auto.commit
consumer 属性是true
,Kafka 会根据其配置自动提交偏移量。
如果是false
,容器支持多个AckMode
设置(在下一个列表中描述)。
默认的AckMode
是BATCH
.
从版本 2.3 开始,框架将enable.auto.commit
自false
除非在配置中明确设置。
以前,Kafka 默认值 (true
) 。
消费者poll()
method 返回一个或多个ConsumerRecords
.
这MessageListener
。
以下列表描述了容器对每个AckMode
(当交易未被使用时):
-
RECORD
:当侦听器在处理记录后返回时提交偏移量。 -
BATCH
:当poll()
已处理。 -
TIME
:当poll()
已处理,只要ackTime
since the last commit 已超过。 -
COUNT
:当poll()
已处理,只要ackCount
自上次提交以来已收到记录。 -
COUNT_TIME
:似TIME
和COUNT
,但如果任一条件为true
. -
MANUAL
:消息侦听器负责acknowledge()
这Acknowledgment
. 之后,与BATCH
被应用。 -
MANUAL_IMMEDIATE
:当Acknowledgment.acknowledge()
method 由侦听器调用。
使用事务时,偏移量将发送到事务,语义等效于RECORD
或BATCH
,具体取决于侦听器类型(记录或批处理)。
MANUAL 和MANUAL_IMMEDIATE 要求侦听器是AcknowledgingMessageListener 或BatchAcknowledgingMessageListener .
请参见消息侦听器。 |
根据syncCommits
container 属性、commitSync()
或commitAsync()
方法。syncCommits
是true
默认情况下;另请参阅setSyncCommitTimeout
.
看setCommitCallback
获取异步提交的结果;默认回调是LoggingCommitCallback
记录错误(和调试级别的成功)。
因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
成为false
.
从版本 2.3 开始,它会无条件地将其设置为 false,除非在 consumer 工厂中特别设置或容器的 consumer 属性覆盖。
这Acknowledgment
方法如下:
public interface Acknowledgment {
void acknowledge();
}
此方法使侦听器可以控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment
interface 有两个额外的方法nack(long sleep)
和nack(int index, long sleep)
.
第一个用于记录侦听器,第二个用于批处理侦听器。
为您的侦听器类型调用错误的方法将抛出IllegalStateException
.
如果要提交部分 batch,请使用nack() ,使用事务时,将AckMode 自MANUAL ;调用nack() 会将成功处理的记录的偏移量发送到交易。 |
nack() 只能在调用侦听器的使用者线程上调用。 |
nack() 在使用 Out of Order Commits 时不允许。 |
对于记录侦听器,当nack()
被调用,则提交任何待处理的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次poll()
.
使用者可以在重新交付之前暂停,方法是将sleep
论点。
这与在容器配置了DefaultErrorHandler
.
nack() 在指定的休眠持续时间内暂停整个侦听器,包括所有分配的分区。 |
使用批处理侦听器时,您可以指定发生故障的批处理中的索引。
什么时候nack()
调用时,将在索引之前为记录提交偏移量,并在失败和丢弃的记录的分区上执行查找,以便在下一个poll()
.
有关更多信息,请参阅容器错误处理程序。
使用者在休眠期间暂停,以便我们继续轮询 broker 以保持使用者处于活动状态。
实际休眠时间及其分辨率取决于容器的pollTimeout 默认为 5 秒。
最小睡眠时间等于pollTimeout 并且所有的睡眠时间都是它的倍数。
对于较短的休眠时间,或者为了提高其准确性,请考虑减少容器的pollTimeout . |
从版本 3.0.10 开始,批处理侦听器可以使用acknowledge(index)
在Acknowledgment
论点。
调用此方法时,将提交索引处记录的偏移量(以及所有以前的记录)。
叫acknowledge()
执行部分批处理提交后,将提交批处理剩余部分的偏移量。
以下限制适用:
-
AckMode.MANUAL_IMMEDIATE
是必需的 -
必须在侦听器线程上调用该方法
-
侦听器必须使用
List
而不是原始的ConsumerRecords
-
索引必须在列表元素的范围内
-
索引必须大于上一次调用中使用的索引
这些限制是强制性的,该方法将抛出一个IllegalArgumentException
或IllegalStateException
,具体取决于冲突。
侦听器容器自动启动
侦听器容器实现SmartLifecycle
和autoStartup
是true
默认情况下。
容器在后期阶段 (Integer.MAX-VALUE - 100
).
实现SmartLifecycle
来处理来自侦听器的数据,应在早期阶段启动。
这- 100
为后续阶段留出空间,使组件能够在容器之后自动启动。