此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
Kafka Binder 侦听器容器定制器
Spring Cloud Stream 通过使用定制器为消息侦听器容器提供了强大的定制选项。
本节介绍可用于 Kafka 的定制器接口:ListenerContainerCustomizer
,其特定于 Kafka 的扩展KafkaListenerContainerCustomizer
和专用的ListenerContainerWithDlqAndRetryCustomizer
.
ListenerContainerCustomizer
这ListenerContainerCustomizer
是 Spring Cloud Stream 中的一个通用接口,允许自定义消息侦听器容器。
用法
要使用ListenerContainerCustomizer
,请在您的配置中创建一个实现此接口的 Bean:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}
这ListenerContainerCustomizer
interface 定义以下方法:
void configure(C container, String destinationName, String group);
-
container
:要自定义的消息侦听器容器。 -
destinationName
:目标 (主题) 的名称。 -
group
:消费组 ID。
KafkaListenerContainerCustomizer
这KafkaListenerContainerCustomizer
接口扩展ListenerContainerCustomizer
修改侦听器容器的行为,并提供对特定于绑定的扩展 Kafka 使用者属性的访问。
用法
要使用KafkaListenerContainerCustomizer
,请在您的配置中创建一个实现此接口的 Bean:
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}
这KafkaListenerContainerCustomizer
interface 添加了以下方法:
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
}
此方法扩展了 baseconfigure
方法中附加参数:
-
extendedConsumerProperties
:扩展的使用者属性,包括 Kafka 特定的属性。
ListenerContainerWithDlqAndRetryCustomizer
这ListenerContainerWithDlqAndRetryCustomizer
界面为涉及死信队列 (DLQ) 和重试机制的场景提供了其他自定义选项。
用法
要使用ListenerContainerWithDlqAndRetryCustomizer
,请在您的配置中创建一个实现此接口的 Bean:
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}
这ListenerContainerWithDlqAndRetryCustomizer
interface 定义以下方法:
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
-
container
:要自定义的 Kafka 侦听器容器。 -
destinationName
:目标 (主题) 的名称。 -
group
:消费组 ID。 -
dlqDestinationResolver
:用于解析失败记录的 DLQ 目标的函数。 -
backOff
:重试的回退策略。 -
extendedConsumerProperties
:扩展的使用者属性,包括 Kafka 特定的属性。
总结
-
ListenerContainerWithDlqAndRetryCustomizer
如果启用了 DLQ,则使用。 -
KafkaListenerContainerCustomizer
用于不带 DLQ 的特定于 Kafka 的自定义。 -
基地
ListenerContainerCustomizer
用于通用定制。
这种分层方法允许在 Spring Cloud Stream 应用程序中灵活、具体地自定义 Kafka 侦听器容器。