4. 参考资料
参考文档的这一部分详细介绍了构成 Spring for Apache Kafka 的各种组件。 主要章节介绍了使用 Spring 开发 Kafka 应用程序的核心类。
4.1. 使用 Spring for Apache Kafka
本节详细解释了影响使用 Spring for Apache Kafka 的各种问题。 有关快速但不太详细的介绍,请参阅 快速浏览。
4.1.1. 连接到 Kafka
从版本 2.5 开始,这些都扩展了KafkaResourceFactory
.
这允许在运行时通过添加Supplier<String>
添加到他们的配置中:setBootstrapServersSupplier(() → …)
.
将对所有新连接调用此函数以获取服务器列表。
Consumer 和 Producer 通常存在很长时间。
要关闭现有 Producer,请调用reset()
在DefaultKafkaProducerFactory
.
要关闭现有 Consumer,请调用stop()
(然后start()
) 在KafkaListenerEndpointRegistry
和/或stop()
和start()
在任何其他侦听器容器 bean 上。
为方便起见,该框架还提供了一个ABSwitchCluster
它支持两组引导服务器;其中 1 个随时处于活动状态。
配置ABSwitchCluster
并将其添加到 producer 和 consumer 工厂中,并且KafkaAdmin
,通过调用setBootstrapServersSupplier()
.
当您想要切换时,调用primary()
或secondary()
并调用reset()
在生产商工厂建立新的连接;对于消费者,stop()
和start()
all listener containers 的 Listener 容器。
使用@KafkaListener
s,stop()
和start()
这KafkaListenerEndpointRegistry
豆。
有关更多信息,请参阅 Javadocs。
Factory 侦听器
从版本 2.5 开始,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
可以配置Listener
在创建或关闭创建者或使用方时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
是通过附加client-id
属性(从metrics()
创建后)到工厂beanName
属性,以.
.
例如,这些侦听器可用于创建和绑定 MicrometerKafkaClientMetrics
实例(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;请参阅 Micrometer 本机度量。
4.1.2. 配置 Topic
如果您定义了KafkaAdmin
bean 时,它可以自动向 broker 添加主题。
为此,您可以添加NewTopic
@Bean
对于每个主题添加到应用程序上下文。
版本 2.3 引入了一个新类TopicBuilder
使此类 bean 的创建更加方便。
以下示例显示了如何执行此作:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从版本 2.6 开始,您可以省略partitions()
和/或replicas()
代理默认值将应用于这些属性。
代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464。
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从版本 2.7 开始,您可以声明多个NewTopic
s 的KafkaAdmin.NewTopics
bean 定义:
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用 Spring Boot 时,KafkaAdmin bean 会自动注册,因此您只需要NewTopic (和/或NewTopics ) @Bean s. |
默认情况下,如果代理不可用,则会记录一条消息,但会继续加载上下文。
您可以通过编程方式调用管理员的initialize()
方法稍后重试。
如果您希望这种情况被视为致命的,请将管理员的fatalIfBrokerNotAvailable
property 设置为true
.
然后,上下文无法初始化。
如果 broker 支持它(1.0.0 或更高版本),则如果发现现有主题的分区数少于NewTopic.numPartitions . |
从版本 2.7 开始,KafkaAdmin
提供在运行时创建和检查主题的方法。
-
createOrModifyTopics
-
describeTopics
要获得更高级的功能,您可以使用AdminClient
径直。
以下示例显示了如何执行此作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
从版本 2.9.10、3.0.9 开始,您可以提供Predicate<NewTopic>
,这可用于确定特定的NewTopic
应考虑创建或修改 bean。
这很有用,例如,如果您有多个KafkaAdmin
实例指向不同的集群,并且您希望选择应由每个管理员创建或修改的主题。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));
4.1.3. 发送消息
本节介绍如何发送消息。
用KafkaTemplate
本节介绍如何使用KafkaTemplate
以发送消息。
概述
这KafkaTemplate
包装生产者并提供将数据发送到 Kafka 主题的便捷方法。
下面的清单显示了KafkaTemplate
:
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
有关更多详细信息,请参阅 Javadoc。
在版本 3.0 中,之前返回ListenableFuture 已更改为 returnCompletableFuture .
为了方便迁移,2.9 版本添加了一种方法usingCompletableFuture() 它提供了相同的方法CompletableFuture 返回类型;此方法不再可用。 |
这sendDefault
API 要求已向模板提供默认主题。
API 接收timestamp
作为参数,并将此时间戳存储在记录中。
用户提供的时间戳的存储方式取决于在 Kafka 主题上配置的时间戳类型。
如果主题配置为使用CREATE_TIME
,则记录用户指定的时间戳(如果未指定,则生成)。
如果主题配置为使用LOG_APPEND_TIME
时,将忽略用户指定的时间戳,并且代理会添加本地代理时间。
要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它。 以下示例显示了如何执行此作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本 2.5 开始,您现在可以覆盖工厂的ProducerConfig
属性创建来自同一工厂的不同生产者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,类型为ProducerFactory<?, ?>
(例如由 Spring Boot 自动配置的那个)可以用不同的狭窄泛型类型引用。
您还可以使用标准的<bean/>
定义。
然后,要使用模板,您可以调用其方法之一。
当您使用带有Message<?>
parameter 中,主题、分区和密钥信息在消息头中提供,其中包括以下项目:
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION
-
KafkaHeaders.KEY
-
KafkaHeaders.TIMESTAMP
消息有效负载是数据。
或者,您可以配置KafkaTemplate
替换为ProducerListener
要获取包含发送结果(成功或失败)的异步回调,而不是等待Future
以完成。
下面的清单显示了ProducerListener
接口:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板配置了LoggingProducerListener
,它会记录错误,并且在发送成功时不执行任何作。
为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。
请注意,send 方法返回一个CompletableFuture<SendResult>
.
您可以向侦听器注册回调,以异步接收发送结果。
以下示例显示了如何执行此作:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult
具有两个属性,一个ProducerRecord
和RecordMetadata
.
有关这些对象的信息,请参阅 Kafka API 文档。
这Throwable
可以转换为KafkaProducerException
;其failedProducerRecord
属性包含失败的记录。
如果你想阻塞发送线程等待结果,你可以调用 future 的get()
方法;建议使用带有 timeout 的方法。
如果您已将linger.ms
,您可能希望调用flush()
之前,或者为了方便起见,模板有一个带有autoFlush
参数,该参数使模板flush()
在每次发送时。
仅当您已设置linger.ms
producer 属性,并希望立即发送部分批处理。
例子
本节介绍向 Kafka 发送消息的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,ExecutionException
是KafkaProducerException
使用failedProducerRecord
财产。
用RoutingKafkaTemplate
从版本 2.5 开始,您可以使用RoutingKafkaTemplate
在运行时根据目标选择创建者topic
名字。
路由模板不支持事务,execute ,flush 或metrics 作,因为这些作的主题未知。 |
该模板需要java.util.regex.Pattern
自ProducerFactory<Object, Object>
实例。
此映射应排序(例如LinkedHashMap
),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。
以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用同一模板发送到不同的主题,每个主题使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
相应的@KafkaListener
的 显示在 Annotation Properties 中。
有关实现类似结果的另一种技术,但具有将不同类型的发送到同一主题的附加功能,请参阅 Delegating Serializer 和 Deserializer。
用DefaultKafkaProducerFactory
如用KafkaTemplate
一个ProducerFactory
用于创建 Producer。
不使用 Transaction 时,默认情况下,DefaultKafkaProducerFactory
创建一个供所有客户端使用的单例 Producer,如KafkaProducer
JavaDocs 的 Java 文档。
但是,如果您调用flush()
在模板上,这可能会导致使用同一生产者的其他线程出现延迟。
从版本 2.3 开始,DefaultKafkaProducerFactory
具有新属性producerPerThread
.
当设置为true
,工厂将为每个线程创建(并缓存)一个单独的 producer,以避免此问题。
什么时候producerPerThread 是true ,用户代码必须调用closeThreadBoundProducer() 在工厂中,当不再需要生产者时。
这将以物理方式关闭 producer 并将其从ThreadLocal .
叫reset() 或destroy() 不会清理这些 producer。 |
创建DefaultKafkaProducerFactory
、键和/或值Serializer
可以通过调用仅接受属性 Map 的构造函数从配置中获取类(参见用KafkaTemplate
) 或Serializer
实例可以传递给DefaultKafkaProducerFactory
constructor (在这种情况下,所有Producer
共享相同的实例)。
或者,您可以提供Supplier<Serializer>
s(从版本 2.3 开始),该 S 将用于获取单独的Serializer
实例Producer
:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,您现在可以在创建工厂后更新 producer 属性。
这可能很有用,例如,如果您必须在凭证更改后更新 SSL 密钥/信任存储位置。
这些更改不会影响现有的生产者实例;叫reset()
关闭任何现有生产者,以便使用新属性创建新的生产者。
注意:您不能将事务性生产者工厂更改为非事务性工厂,反之亦然。
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从版本 2.8 开始,如果你提供序列化器作为对象(在构造函数中或通过 setter),工厂将调用configure()
方法配置它们。
用ReplyingKafkaTemplate
版本 2.1.3 引入了KafkaTemplate
提供请求/回复语义。
该类名为ReplyingKafkaTemplate
并且有两个额外的方法;方法签名如下:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另请参阅请求/回复Message<?>
s).
结果是一个CompletableFuture
,该 ID 将异步填充 result(或 exception,对于 Timeout)。
结果还具有sendFuture
property 中调用的KafkaTemplate.send()
.
您可以使用此 future 来确定 send作的结果。
在版本 3.0 中,这些方法返回的 futures(及其sendFuture properties) 已更改为CompletableFuture s 而不是ListenableFuture s. |
如果使用第一种方法,或者replyTimeout
argument 是null
中,模板的defaultReplyTimeout
属性(默认为 5 秒)。
从版本 2.8.8 开始,模板有一个新方法waitForAssignment
.
如果回复容器配置了auto.offset.reset=latest
以避免在初始化容器之前发送请求和回复。
当使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的pollTimeout 属性,因为在第一次轮询完成之前不会发送通知。 |
以下 Spring Boot 应用程序显示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 的自动配置的容器工厂来创建回复容器。
如果 non-tanky 反序列化器用于回复,请考虑使用ErrorHandlingDeserializer
委托给你配置的 deserializer。
配置后,RequestReplyFuture
将破例完成,您可以赶上ExecutionException
,使用DeserializationException
在其cause
财产。
从版本 2.6.7 开始,除了检测DeserializationException
s,模板将调用replyErrorChecker
函数(如果提供)。
如果它返回异常,则 future 将异常完成。
下面是一个示例:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
该模板会设置一个标头(名为KafkaHeaders.CORRELATION_ID
默认情况下),该回调必须由服务器端回显。
在这种情况下,以下@KafkaListener
应用程序响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
这@KafkaListener
infrastructure 回显相关 ID 并确定回复主题。
看转发侦听器结果@SendTo
了解有关发送回复的更多信息。
模板使用默认标头KafKaHeaders.REPLY_TOPIC
以指示回复所指向的主题。
从版本 2.2 开始,模板会尝试从配置的回复容器中检测回复主题或分区。
如果容器配置为侦听单个主题或单个TopicPartitionOffset
,它用于设置回复标头。
如果容器配置为其他方式,则用户必须设置回复标头。
在这种情况下,INFO
日志消息在初始化期间写入。
以下示例使用KafkaHeaders.REPLY_TOPIC
:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
使用单个回复进行配置时TopicPartitionOffset
,您可以对多个模板使用相同的回复主题,只要每个实例侦听不同的分区即可。
使用单个回复主题进行配置时,每个实例必须使用不同的group.id
.
在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例会找到相关 ID。
这对于自动扩展可能很有用,但会产生额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。
使用此设置时,我们建议您将模板的sharedReplyTopic
自true
,这会降低对 DEBUG 而不是默认 ERROR 的意外回复的日志记录级别。
以下是配置回复容器以使用相同的共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例,并且未按照上一段所述配置它们,则每个实例都需要一个专用的回复主题。
另一种方法是将KafkaHeaders.REPLY_PARTITION 并为每个实例使用专用分区。
这Header 包含一个四字节的 int (big-endian)。
服务器必须使用此标头将回复路由到正确的分区 (@KafkaListener 执行此作)。
但是,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过使用TopicPartitionOffset 在其ContainerProperties constructor 的 API 中)。 |
这DefaultKafkaHeaderMapper 要求 Jackson 位于 Classpath 上(对于@KafkaListener ).
如果不可用,则消息转换器没有标头映射器,因此您必须配置MessagingMessageConverter 替换为SimpleKafkaHeaderMapper ,如前所述。 |
默认情况下,使用 3 个标头:
-
KafkaHeaders.CORRELATION_ID
- 用于将回复与请求相关联 -
KafkaHeaders.REPLY_TOPIC
- 用于告诉服务器在哪里回复 -
KafkaHeaders.REPLY_PARTITION
- (可选)用于告诉服务器要回复哪个分区
这些标头名称由@KafkaListener
infrastructure 来路由回复。
从版本 2.3 开始,您可以自定义标头名称 - 该模板有 3 个属性correlationHeaderName
,replyTopicHeaderName
和replyPartitionHeaderName
.
如果您的服务器不是 Spring 应用程序(或者不使用@KafkaListener
).
相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标头中,则从版本 3.0 开始,您可以配置自定义correlationHeaderName 在侦听器容器工厂上,该标头将被回显。
以前,侦听器必须回显自定义关联标头。 |
请求/回复Message<?>
s
版本 2.7 向ReplyingKafkaTemplate
发送和接收spring-messaging
的Message<?>
抽象化:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认值replyTimeout
,还有一些重载版本可以在方法调用中超时。
在版本 3.0 中,这些方法返回的 futures(及其sendFuture properties) 已更改为CompletableFuture s 而不是ListenableFuture s. |
如果使用者的Deserializer
或模板的MessageConverter
可以通过回复消息中的配置或类型元数据,在没有任何附加信息的情况下转换有效负载。
如果需要为 return 类型提供类型信息,请使用第二种方法来帮助消息转换器。 这也允许同一个模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回复类型 message<?>
当@KafkaListener
返回Message<?>
,对于 2.5 之前的版本,必须填充回复主题和相关 ID 标头。
在此示例中,我们使用请求中的回复主题标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这还显示了如何在回复记录上设置键。
从版本 2.5 开始,框架将检测这些标头是否缺失,并使用主题填充它们 - 由@SendTo
值或传入的KafkaHeaders.REPLY_TOPIC
标头(如果存在)。
它还将回显传入的KafkaHeaders.CORRELATION_ID
和KafkaHeaders.REPLY_PARTITION
(如果存在)。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
聚合多个回复
中的 模板用ReplyingKafkaTemplate
严格适用于单个请求/答复方案。
对于单个消息的多个接收者返回回复的情况,您可以使用AggregatingReplyingKafkaTemplate
.
这是 Scatter-Gather Enterprise Integration Pattern 的客户端实现。
与ReplyingKafkaTemplate
这AggregatingReplyingKafkaTemplate
constructor 需要一个 producer 工厂和一个 Listener 容器来接收回复;它有第三个参数BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
每次收到回复时都会查阅;当谓词返回true
、ConsumerRecord
s 用于完成Future
由sendAndReceive
方法。
还有一个附加属性returnPartialOnTimeout
(默认为 false)。
当此项设置为true
,而不是用KafkaReplyTimeoutException
,则部分结果正常补全 future(只要至少收到一条回复记录)。
从版本 2.3.5 开始,谓词也会在超时后调用(如果returnPartialOnTimeout
是true
).
第一个参数是当前记录列表;第二个是true
如果此调用是由于超时。
谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是ConsumerRecord
其值是ConsumerRecord
s.
“外部”ConsumerRecord
不是“真实”记录,它由模板合成,作为为请求收到的实际回复记录的持有者。
当发生正常发布时(发布策略返回 true),主题设置为aggregatedResults
;如果returnPartialOnTimeout
为 true,并且发生超时(并且至少收到了一条回复记录),则主题设置为partialResultsAfterTimeout
.
该模板为这些 “topic” 名称提供常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
真实ConsumerRecord
s 在Collection
包含从中接收回复的实际主题。
回复的侦听器容器必须配置为AckMode.MANUAL 或AckMode.MANUAL_IMMEDIATE ;Consumer 属性enable.auto.commit 必须是false (自 2.3 版起的默认值)。
为避免丢失消息的可能性,模板仅在未完成请求为零时(即当发布策略释放最后一个未完成的请求时)提交偏移量。
重新平衡后,可能会出现重复的回复投放;对于任何正在进行的请求,这些都将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。 |
如果您使用ErrorHandlingDeserializer 使用此聚合模板时,框架不会自动检测DeserializationException s.
相反,记录(带有null 值)将完整返回,但 Headers 中存在反序列化异常。
建议应用程序调用 Utility 方法ReplyingKafkaTemplate.checkDeserialization() 方法来确定是否发生了反序列化异常。
有关更多信息,请参阅其 javadocs。
这replyErrorChecker 也不要求此聚合模板;您应该对回复的每个元素执行检查。 |
4.1.4. 接收消息
您可以通过配置MessageListenerContainer
并提供消息侦听器,或使用@KafkaListener
注解。
消息侦听器
使用消息侦听器容器时,必须提供侦听器以接收数据。 消息侦听器目前支持 8 个接口。 下面的清单显示了这些接口:
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
1 | 使用此接口处理单个ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。 |
2 | 使用此接口处理单个ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。 |
3 | 使用此接口处理单个ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。
访问Consumer 对象。 |
4 | 使用此接口处理单个ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。
访问Consumer 对象。 |
5 | 使用此接口处理所有ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。AckMode.RECORD 在使用此接口时不受支持,因为侦听器将获得完整的批处理。 |
6 | 使用此接口处理所有ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。 |
7 | 使用此接口处理所有ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。AckMode.RECORD 在使用此接口时不受支持,因为侦听器将获得完整的批处理。
访问Consumer 对象。 |
8 | 使用此接口处理所有ConsumerRecord 从 Kafka 使用者接收的实例poll() 作。
访问Consumer 对象。 |
这Consumer object 不是线程安全的。
您只能在调用侦听器的线程上调用其方法。 |
您不应执行任何Consumer<?, ?> 影响消费者位置和/或在您的侦听器中提交的偏移量的方法;容器需要管理此类信息。 |
消息侦听器容器
二MessageListenerContainer
提供了 implementations:
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
这KafkaMessageListenerContainer
接收来自单个线程上所有主题或分区的所有消息。
这ConcurrentMessageListenerContainer
委托人KafkaMessageListenerContainer
实例来提供多线程消耗。
从版本 2.2.7 开始,您可以添加RecordInterceptor
到侦听器容器;它将在调用侦听器之前调用,以允许检查或修改记录。
如果侦听器返回 null,则不会调用侦听器。
从版本 2.7 开始,它有额外的方法,这些方法在 listener 退出后调用(通常,或通过抛出异常)。
此外,从版本 2.7 开始,现在有一个BatchInterceptor
,为 Batch 侦听器提供类似的功能。
此外,ConsumerAwareRecordInterceptor
(以及BatchInterceptor
) 提供对Consumer<?, ?>
.
例如,这可能用于访问侦听器中的使用者指标。
你不应该在这些拦截器中执行任何影响消费者位置和/或提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器更改了记录(通过创建新记录),则topic ,partition 和offset 必须保持不变,以避免意外的副作用,例如记录丢失。 |
这CompositeRecordInterceptor
和CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从版本 2.8 开始,当使用事务时,在事务开始之前调用拦截器。
您可以设置侦听器容器的interceptBeforeTx
property 设置为false
在事务启动后调用拦截器。
从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是KafkaAwareTransactionManager
s.
例如,这允许拦截器参与由容器启动的 JDBC 事务。
从版本 2.3.8、2.4.6 开始,ConcurrentMessageListenerContainer
现在支持在并发大于 1 时进行静态成员身份。
这group.instance.id
后缀为-n
跟n
起价1
.
这与增加的session.timeout.ms
可用于减少再平衡事件,例如,在重新启动应用程序实例时。
用KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一个ConsumerFactory
以及有关主题和分区以及其他配置的信息,在ContainerProperties
对象。ContainerProperties
具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数采用TopicPartitionOffset
参数显式指示容器使用哪些分区(使用 Consumerassign()
方法)和可选的 initial offset 来获取。
默认情况下,正值是绝对偏移量。
默认情况下,负值是相对于分区中的当前最后一个偏移量。
的构造函数TopicPartitionOffset
这需要额外的boolean
参数。
如果这是true
,则初始偏移量(正或负)相对于此使用者的当前位置。
偏移量在容器启动时应用。
第二个选项采用一组主题,Kafka 根据group.id
property — 在组中分布分区。
第三个使用正则表达式Pattern
以选择主题。
要分配MessageListener
添加到容器中,您可以使用ContainerProps.setMessageListener
方法。
以下示例显示了如何执行此作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,在创建DefaultKafkaConsumerFactory
,使用仅接受上述属性的构造函数意味着 key 和 valueDeserializer
类是从 Configuration 中选取的。
或者Deserializer
实例可以传递给DefaultKafkaConsumerFactory
constructor 的 intent 和/或 value,在这种情况下,所有 Consumer 共享相同的实例。
另一种选择是提供Supplier<Deserializer>
s(从版本 2.3 开始),该 S 将用于获取单独的Deserializer
实例Consumer
:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请参阅 Javadoc 以获取ContainerProperties
了解有关您可以设置的各种属性的更多信息。
从 2.1.1 版本开始,一个名为logContainerConfig
可用。
什么时候true
和INFO
启用日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移提交的日志记录在DEBUG
日志记录级别。
从版本 2.1.2 开始,在ContainerProperties
叫commitLogLevel
用于指定这些消息的日志级别。
例如,要将日志级别更改为INFO
,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
.
从版本 2.2 开始,一个名为missingTopicsFatal
已添加(默认:false
从 2.3.4 开始)。
如果代理上不存在任何已配置的主题,这将阻止容器启动。
如果容器配置为侦听主题模式 (regex),则它不适用。
以前,容器线程在consumer.poll()
方法等待主题出现,同时记录许多消息。
除了日志之外,没有迹象表明存在问题。
从版本 2.8 开始,新的 container 属性authExceptionRetryInterval
已引入。
这会导致容器在获取任何消息后重试获取消息AuthenticationException
或AuthorizationException
从KafkaConsumer
.
例如,当已配置的用户被拒绝读取特定主题的访问权限或凭证不正确时,可能会发生这种情况。
定义authExceptionRetryInterval
允许容器在授予适当权限时进行恢复。
默认情况下,未配置间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。 |
从版本 2.8 开始,在创建 Consumer Factory 时,如果你提供反序列化器作为对象(在构造函数中或通过 setter),工厂将调用configure()
方法配置它们。
用ConcurrentMessageListenerContainer
单个构造函数类似于KafkaListenerContainer
构造 函数。
下面的清单显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还具有concurrency
财产。
例如container.setConcurrency(3)
创建 3 个KafkaMessageListenerContainer
实例。
对于第一个构造函数,Kafka 使用其组管理功能在使用者之间分配分区。
侦听多个主题时,默认分区分布可能不是您所期望的。
例如,如果您有 3 个主题,每个主题有 5 个分区,并且您希望使用 使用 Spring Boot 时,可以按如下方式分配 set 策略:
|
当容器属性配置为TopicPartitionOffset
s、ConcurrentMessageListenerContainer
分发TopicPartitionOffset
跨委托的实例KafkaMessageListenerContainer
实例。
如果,比如 6 个TopicPartitionOffset
实例,并且concurrency
是3
;每个容器获取两个分区。
五人TopicPartitionOffset
实例中,两个容器获得两个分区,第三个容器获得一个分区。
如果concurrency
大于TopicPartitions
这concurrency
向下调整,以便每个容器获得一个分区。
这client.id property(如果已设置)附加-n 哪里n 是与并发对应的使用者实例。
启用 JMX 时,需要为 MBean 提供唯一名称。 |
从版本 1.3 开始,MessageListenerContainer
提供对底层KafkaConsumer
.
在ConcurrentMessageListenerContainer
这metrics()
method 返回所有目标的量度KafkaMessageListenerContainer
实例。
这些指标分为Map<MetricName, ? extends Metric>
由client-id
为底层KafkaConsumer
.
从版本 2.3 开始,ContainerProperties
提供了一个idleBetweenPolls
选项,让侦听器容器中的 main 循环在KafkaConsumer.poll()
调用。
从提供的选项中选择实际睡眠间隔作为最小值,并且max.poll.interval.ms
consumer config 和 current records 批处理时间。
提交偏移量
提供了几个选项来提交偏移量。
如果enable.auto.commit
consumer 属性是true
,Kafka 会根据其配置自动提交偏移量。
如果是false
,容器支持多个AckMode
设置(在下一个列表中描述)。
默认的AckMode
是BATCH
.
从版本 2.3 开始,框架将enable.auto.commit
自false
除非在配置中明确设置。
以前,Kafka 默认值 (true
) 。
消费者poll()
method 返回一个或多个ConsumerRecords
.
这MessageListener
。
以下列表描述了容器对每个AckMode
(当交易未被使用时):
-
RECORD
:当侦听器在处理记录后返回时提交偏移量。 -
BATCH
:当poll()
已处理。 -
TIME
:当poll()
已处理,只要ackTime
since the last commit 已超过。 -
COUNT
:当poll()
已处理,只要ackCount
自上次提交以来已收到记录。 -
COUNT_TIME
:似TIME
和COUNT
,但如果任一条件为true
. -
MANUAL
:消息侦听器负责acknowledge()
这Acknowledgment
. 之后,与BATCH
被应用。 -
MANUAL_IMMEDIATE
:当Acknowledgment.acknowledge()
method 由侦听器调用。
使用事务时,偏移量将发送到事务,语义等效于RECORD
或BATCH
,具体取决于侦听器类型(记录或批处理)。
MANUAL 和MANUAL_IMMEDIATE 要求侦听器是AcknowledgingMessageListener 或BatchAcknowledgingMessageListener .
请参见消息侦听器。 |
根据syncCommits
container 属性、commitSync()
或commitAsync()
方法。syncCommits
是true
默认情况下;另请参阅setSyncCommitTimeout
.
看setCommitCallback
获取异步提交的结果;默认回调是LoggingCommitCallback
记录错误(和调试级别的成功)。
因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
成为false
.
从版本 2.3 开始,它会无条件地将其设置为 false,除非在 consumer 工厂中特别设置或容器的 consumer 属性覆盖。
这Acknowledgment
方法如下:
public interface Acknowledgment {
void acknowledge();
}
此方法使侦听器可以控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment
interface 有两个额外的方法nack(long sleep)
和nack(int index, long sleep)
.
第一个用于记录侦听器,第二个用于批处理侦听器。
为您的侦听器类型调用错误的方法将抛出IllegalStateException
.
如果要提交部分 batch,请使用nack() ,使用事务时,将AckMode 自MANUAL ;调用nack() 会将成功处理的记录的偏移量发送到交易。 |
nack() 只能在调用侦听器的使用者线程上调用。 |
nack() 在使用 Out of Order Commits 时不允许。 |
对于记录侦听器,当nack()
被调用,则提交任何待处理的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次poll()
.
使用者可以在重新交付之前暂停,方法是将sleep
论点。
这与在容器配置了DefaultErrorHandler
.
使用批处理侦听器时,您可以指定发生故障的批处理中的索引。
什么时候nack()
调用时,将在索引之前为记录提交偏移量,并在失败和丢弃的记录的分区上执行查找,以便在下一个poll()
.
有关更多信息,请参阅容器错误处理程序。
使用者在休眠期间暂停,以便我们继续轮询 broker 以保持使用者处于活动状态。
实际休眠时间及其分辨率取决于容器的pollTimeout 默认为 5 秒。
最小睡眠时间等于pollTimeout 并且所有的睡眠时间都是它的倍数。
对于较短的休眠时间,或者为了提高其准确性,请考虑减少容器的pollTimeout . |
从版本 3.0.10 开始,批处理侦听器可以使用acknowledge(index)
在Acknowledgment
论点。
调用此方法时,将提交索引处记录的偏移量(以及所有以前的记录)。
叫acknowledge()
执行部分批处理提交后,将提交批处理剩余部分的偏移量。
以下限制适用:
-
AckMode.MANUAL_IMMEDIATE
是必需的 -
必须在侦听器线程上调用该方法
-
侦听器必须使用
List
而不是原始的ConsumerRecords
-
索引必须在列表元素的范围内
-
索引必须大于上一次调用中使用的索引
这些限制是强制性的,该方法将抛出一个IllegalArgumentException
或IllegalStateException
,具体取决于冲突。
手动提交偏移量
通常,使用AckMode.MANUAL
或AckMode.MANUAL_IMMEDIATE
,必须按顺序确认确认,因为 Kafka 不维护每条记录的状态,只维护每个组/分区的已提交偏移量。
从版本 2.8 开始,您现在可以设置 container 属性asyncAcks
,这允许以任何顺序确认轮询返回的记录的确认。
侦听器容器将延迟无序提交,直到收到缺少的确认。
使用者将被暂停(不传送新记录),直到上一次轮询的所有偏移量都已提交。
虽然此功能允许应用程序异步处理记录,但应理解,它增加了失败后重复交付的可能性。 |
@KafkaListener
注解
这@KafkaListener
annotation 用于将 Bean 方法指定为侦听器容器的侦听器。
该 bean 被包装在一个MessagingMessageListenerAdapter
配置了各种功能,例如转换器,用于在必要时转换数据以匹配方法参数。
您可以使用 SPEL 配置 Comments 上的大多数属性#{…}
或属性占位符 (${…}
).
有关更多信息,请参阅 Javadoc。
录制侦听器
这@KafkaListener
annotation 为简单的 POJO 侦听器提供了一种机制。
以下示例演示如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
此机制需要一个@EnableKafka
注释添加到您的@Configuration
类和侦听器容器工厂,用于配置底层ConcurrentMessageListenerContainer
.
默认情况下,名称为kafkaListenerContainerFactory
是意料之中的。
以下示例演示如何使用ConcurrentMessageListenerContainer
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
请注意,要设置容器属性,必须使用getContainerProperties()
方法。
它用作注入到容器中的实际属性的模板。
从版本 2.1.1 开始,您现在可以将client.id
属性。
这clientIdPrefix
后缀为-n
哪里n
是一个整数,表示使用并发时的容器编号。
从版本 2.2 开始,您现在可以覆盖容器工厂的concurrency
和autoStartup
properties 的 PROPERTIES 来使用注解本身的 properties。
属性可以是简单值、属性占位符或 SPEL 表达式。
以下示例显示了如何执行此作:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
您还可以使用显式主题和分区(以及可选的其初始偏移量)配置 POJO 侦听器。 以下示例显示了如何执行此作:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在partitions
或partitionOffsets
属性,但不能同时共享两者。
与大多数 Comments 属性一样,您可以使用 SPEL 表达式;有关如何生成大型分区列表的示例,请参阅手动分配所有分区。
从版本 2.5.5 开始,您可以将初始偏移量应用于所有分配的分区:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
通配符表示*
partitions
属性。
只能有一个@PartitionOffset
每个@TopicPartition
.
此外,当监听器实现ConsumerSeekAware
,onPartitionsAssigned
现在被调用,即使在使用手动分配时也是如此。
例如,这允许在当时执行任何任意 seek作。
从版本 2.6.4 开始,您可以指定以逗号分隔的分区列表或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
范围是非独占的;上面的示例将分配 partition0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
.
指定初始偏移量时,可以使用相同的技术:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量将应用于所有 6 个分区。
手动确认
使用手动时AckMode
,您还可以为侦听器提供Acknowledgment
.
以下示例还显示了如何使用其他容器工厂。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,有关记录的元数据可从消息标头中获得。 您可以使用以下报头名称来检索邮件的报头:
-
KafkaHeaders.OFFSET
-
KafkaHeaders.RECEIVED_KEY
-
KafkaHeaders.RECEIVED_TOPIC
-
KafkaHeaders.RECEIVED_PARTITION
-
KafkaHeaders.RECEIVED_TIMESTAMP
-
KafkaHeaders.TIMESTAMP_TYPE
从版本 2.5 开始,RECEIVED_KEY
不存在,如果传入记录具有null
钥匙;以前,标头中填充了null
价值。
此更改是为了使框架与spring-messaging
约定,其中null
值标头不存在。
以下示例演示如何使用标头:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
参数注释 (@Payload ,@Header ) 必须在 listener 方法的具体实现上指定;如果在接口上定义了它们,则不会检测到它们。 |
从版本 2.5 开始,您可以在ConsumerRecordMetadata
参数。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含来自ConsumerRecord
但 Key 和 Value 除外。
Batch 侦听器
从版本 1.1 开始,您可以配置@KafkaListener
方法来接收从 Consumer Poll 接收的整批 Consumer 记录。
批处理侦听器不支持非阻塞重试。 |
要配置侦听器容器工厂创建批处理侦听器,您可以设置batchListener
财产。
以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
从版本 2.8 开始,您可以覆盖工厂的batchListener propery 使用batch 属性@KafkaListener 注解。
这与对 Container Error Handlers 的更改一起,允许将同一工厂用于记录侦听器和批处理侦听器。 |
从版本 2.9.6 开始,容器工厂为recordMessageConverter 和batchMessageConverter 性能。
以前,只有一个属性messageConverter 该 API 适用于记录侦听器和批处理侦听器。 |
以下示例显示如何接收有效负载列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
topic 、 partition 、 offset 等在与有效负载并行的标头中可用。 以下示例演示如何使用标头:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您也可以收到List
之Message<?>
对象,但它必须是唯一的参数(除了可选的Acknowledgment
、使用手动提交时,和/或Consumer<?, ?>
parameters) 的 API 中。
以下示例显示了如何执行此作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不会对有效负载执行任何转换。
如果BatchMessagingMessageConverter
配置了RecordMessageConverter
中,您还可以将泛型类型添加到Message
参数,并且有效负载会进行转换。
有关更多信息,请参阅使用 Batch Listeners 进行有效负载转换。
您还可以接收ConsumerRecord<?, ?>
对象,但它必须是唯一的参数(除了可选的Acknowledgment
、使用手动提交和Consumer<?, ?>
parameters) 的 API 中。
以下示例显示了如何执行此作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从版本 2.2 开始,侦听器可以接收完整的ConsumerRecords<?, ?>
对象返回的poll()
方法,让侦听器访问其他方法,例如partitions()
(返回TopicPartition
实例)和records(TopicPartition)
(获取选择性记录)。
同样,这必须是唯一的参数(除了可选的Acknowledgment
、使用手动提交或Consumer<?, ?>
parameters) 的 API 中。
以下示例显示了如何执行此作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂具有RecordFilterStrategy configured,则ConsumerRecords<?, ?> listeners,带有WARN 日志消息。
如果<List<?>> 使用侦听器的形式。
默认情况下,一次筛选一个记录;从版本 2.8 开始,您可以覆盖filterBatch 以在一次调用中筛选整个批处理。 |
注释属性
从版本 2.0 开始,id
property(如果存在)用作 Kafka 使用者group.id
属性,覆盖 Consumer Factory 中的 configured 属性(如果存在)。
您还可以设置groupId
显式或设置idIsGroup
设置为 false 以恢复之前使用 Consumer Factory 的行为group.id
.
您可以在大多数 Comments 属性中使用属性占位符或 SPEL 表达式,如下例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从版本 2.1.2 开始,SPEL 表达式支持特殊令牌:__listener
.
它是一个伪 Bean 名称,表示存在此 Comments 的当前 Bean 实例。
请考虑以下示例:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
给定上一个示例中的 bean,然后我们可以使用以下内容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果,在不太可能的情况下,您有一个名为__listener
中,您可以使用beanRef
属性。
以下示例显示了如何执行此作:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
从版本 2.2.4 开始,您可以直接在 Comments 上指定 Kafka consumer 属性,这些属性将覆盖在 consumer 工厂中配置的具有相同名称的任何属性。您不能指定group.id
和client.id
properties 以这种方式;他们将被忽视;使用groupId
和clientIdPrefix
annotation 属性。
这些属性被指定为具有普通 Java 的单个字符串Properties
文件格式:foo:bar
,foo=bar
或foo bar
.
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是 中示例的相应侦听器的示例用RoutingKafkaTemplate
.
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
获取 Consumergroup.id
在多个容器中运行相同的侦听器代码时,能够确定哪个容器(由其group.id
consumer property) 记录的来源。
您可以调用KafkaUtils.getConsumerGroupId()
在侦听器线程上执行此作。
或者,您可以在 method 参数中访问组 ID。
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
这在接收List<?> 的记录。
它在接收ConsumerRecords<?, ?> 论点。
使用KafkaUtils 机制。 |
容器线程命名
一个TaskExecutor
用于调用 Consumer 和 Listener。
您可以通过设置consumerExecutor
容器的ContainerProperties
.
使用池化执行程序时,请确保有足够的线程可用于处理使用它们的所有容器之间的并发。
使用ConcurrentMessageListenerContainer
中,来自执行程序的线程将用于每个使用者 (concurrency
).
如果您未提供使用者执行程序,则SimpleAsyncTaskExecutor
用于每个容器。
此执行程序创建名称类似于<beanName>-C-<n>
.
对于ConcurrentMessageListenerContainer
这<beanName>
线程名称的一部分变为<beanName>-m
哪里m
表示 Consumer 实例。n
每次启动容器时递增。
因此,使用 bean 名称container
,此容器中的线程将被命名为container-0-C-1
,container-1-C-1
等,在容器首次启动后;container-0-C-2
,container-1-C-2
等,在 stop 和随后的 start 之后。
从 version 开始3.0.1
,您现在可以更改线程的名称,而不管使用哪个执行程序。
将AbstractMessageListenerContainer.changeConsumerThreadName
property 设置为true
和AbstractMessageListenerContainer.threadNameSupplier
将调用以获取线程名称。
这是一个Function<MessageListenerContainer, String>
,默认实现返回container.getListenerId()
.
@KafkaListener
作为元注释
从版本 2.2 开始,您现在可以使用@KafkaListener
作为 meta 注释。
以下示例显示了如何执行此作:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
您必须为以下至少一个topics
,topicPattern
或topicPartitions
(而且,通常id
或groupId
除非您指定了group.id
在 Consumer Factory 配置中)。
以下示例显示了如何执行此作:
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
@KafkaListener
在类上
当您使用@KafkaListener
在类级别,您必须指定@KafkaHandler
在方法级别。
在传送消息时,转换后的消息负载类型用于确定要调用的方法。
以下示例显示了如何执行此作:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从版本 2.1.3 开始,您可以指定@KafkaHandler
method 作为默认方法,如果其他方法不匹配,则调用该方法。
最多可以指定一种方法。
使用@KafkaHandler
方法,则有效负载必须已转换为 domain 对象(以便可以执行匹配)。
使用自定义反序列化器JsonDeserializer
或JsonMessageConverter
及其TypePrecedence
设置为TYPE_ID
.
有关更多信息,请参见 序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认的@KafkaHandler 无法接收离散标头;它必须使用ConsumerRecordMetadata 如 Consumer Record Metadata 中所述。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是String
;这topic
parameter 还将获得对object
.
如果您需要 default 方法中有关记录的元数据,请使用以下命令:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
@KafkaListener
属性修改
从版本 2.7.2 开始,您现在可以在创建容器之前以编程方式修改注释属性。
为此,请添加一个或多个KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer
添加到应用程序上下文中。AnnotationEnhancer
是一个BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>
并且必须返回 attributes 的 Map。
属性值可以包含 SPEL 和/或属性占位符;在执行任何解析之前调用增强器。
如果存在多个增强器,并且它们实现Ordered
,它们将按顺序调用。
AnnotationEnhancer 必须声明 bean 定义static 因为它们在应用程序上下文生命周期的早期就是必需的。 |
示例如下:
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
@KafkaListener
生命周期管理
为@KafkaListener
注释不是应用程序上下文中的 bean。
相反,它们被注册到类型为KafkaListenerEndpointRegistry
.
这个 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何具有autoStartup
设置为true
.
所有容器工厂创建的所有容器必须位于同一phase
.
有关更多信息,请参阅 Listener Container Auto Startup。
您可以使用注册表以编程方式管理生命周期。
启动或停止注册表将启动或停止所有已注册的容器。
或者,您可以使用其id
属性。
您可以设置autoStartup
在注解上,这将覆盖在 Container Factory 中配置的默认设置。
您可以从应用程序上下文(例如自动布线)获取对 bean 的引用,以管理其已注册的容器。
以下示例说明如何执行此作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表仅维护其管理的容器的生命周期;声明为 bean 的容器不受注册表管理,可以从应用程序上下文中获取。
可以通过调用注册表的getListenerContainers()
方法。
版本 2.2.5 添加了便捷方法getAllListenerContainers()
,它返回所有容器的集合,包括由 Registry 管理的容器和声明为 bean 的容器。
返回的集合将包括任何已初始化的原型 bean,但它不会初始化任何惰性 bean 声明。
在刷新应用程序上下文后注册的端点将立即启动,无论其autoStartup 属性,以遵守SmartLifecycle 合同,其中autoStartup 仅在应用程序上下文初始化期间考虑。
延迟注册的一个示例是 Bean 的@KafkaListener 在 prototype 范围内,在初始化上下文后创建实例。
从版本 2.8.7 开始,您可以设置注册表的alwaysStartAfterRefresh property 设置为false 然后是容器的autoStartup 属性将定义容器是否启动。 |
@KafkaListener
@Payload
验证
从版本 2.2 开始,现在可以更轻松地添加Validator
验证@KafkaListener
@Payload
参数。
以前,您必须配置自定义DefaultMessageHandlerMethodFactory
并将其添加到注册商。
现在,您可以将验证器添加到注册商本身。
以下代码演示如何执行此作:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
当您将 Spring Boot 与验证Starters一起使用时,一个LocalValidatorFactoryBean 是自动配置的,如下例所示: |
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
以下示例演示如何验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
从版本 2.5.11 开始,验证现在适用于@KafkaHandler
方法。
看@KafkaListener
在类上.
重新平衡侦听器
ContainerProperties
具有一个名为consumerRebalanceListener
,它采用 Kafka 客户端的ConsumerRebalanceListener
接口。
如果未提供此属性,则容器会配置一个日志记录侦听器,该侦听器在INFO
水平。
框架还添加了一个子接口ConsumerAwareRebalanceListener
.
下面的清单显示了ConsumerAwareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,当 partitions 被撤销时,有两个回调。 第一个会立即调用。 第二个 is 在提交任何待处理的偏移量后调用。 如果您希望在某些外部存储库中维护偏移量,这非常有用,如下例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本 2.4 开始,新方法onPartitionsLost() 已添加(类似于ConsumerRebalanceLister ).
默认实现ConsumerRebalanceLister 只需调用onPartionsRevoked .
默认实现ConsumerAwareRebalanceListener 什么都不做。
当为侦听器容器提供自定义侦听器(任一类型)时,您的实现必须不要调用onPartitionsRevoked 从onPartitionsLost .
如果您实现ConsumerRebalanceListener 您应该覆盖 default 方法。
这是因为侦听器容器将调用自己的onPartitionsRevoked 从其onPartitionsLost 在对实现调用 方法后。
如果 implementation delegate to default 行为,onPartitionsRevoked 将调用两次,每次Consumer 在容器的侦听器上调用该方法。 |
转发侦听器结果@SendTo
从版本 2.0 开始,如果您还注解了@KafkaListener
替换为@SendTo
注解,并且方法调用返回一个结果,则结果将转发到@SendTo
.
这@SendTo
value 可以有多种形式:
-
@SendTo("someTopic")
路由到 Literal 主题 -
@SendTo("#{someExpression}")
路由到通过在应用程序上下文初始化期间计算表达式一次来确定的主题。 -
@SendTo("!{someExpression}")
路由到通过在运行时评估表达式确定的主题。 这#root
用于评估的 object 具有三个属性:-
request
:入站ConsumerRecord
(或ConsumerRecords
对象 -
source
:这org.springframework.messaging.Message<?>
从request
. -
result
:该方法返回结果。
-
-
@SendTo
(无属性):这被视为!{source.headers['kafka_replyTopic']}
(自版本 2.1.3 起)。
从版本 2.1.11 和 2.2.1 开始,属性占位符在@SendTo
值。
表达式 evaluation 的结果必须是String
,这表示主题名称。
以下示例显示了各种使用方法@SendTo
:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持@SendTo ,必须为侦听器容器工厂提供KafkaTemplate (在其replyTemplate 属性),该属性用于发送回复。
这应该是一个KafkaTemplate 而不是ReplyingKafkaTemplate 在客户端用于请求/回复处理。
使用 Spring Boot 时,boot 会自动将模板配置到工厂中;在配置您自己的工厂时,必须按照以下示例所示进行设置。 |
从版本 2.2 开始,您可以添加ReplyHeadersConfigurer
添加到侦听器容器工厂。
将查询此信息以确定要在回复消息中设置的标头。
以下示例演示如何添加ReplyHeadersConfigurer
:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果需要,您还可以添加更多标题。 以下示例显示了如何执行此作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用@SendTo
,您必须配置ConcurrentKafkaListenerContainerFactory
替换为KafkaTemplate
在其replyTemplate
属性来执行发送。
Spring Boot 将自动连接其自动配置的模板(如果存在单个实例,则为 any)。
除非你使用请求/回复语义,否则只有简单的send(topic, value) 方法,因此您可能希望创建一个子类来生成分区或键。
以下示例显示了如何执行此作: |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果 listener 方法返回
|
当使用请求/回复语义时,发送方可以请求目标分区。
您可以对
有关更多信息,请参阅处理异常。 |
如果侦听器方法返回Iterable ,默认情况下,将发送每个元素的一条记录作为值。
从版本 2.3.5 开始,将splitIterables 属性@KafkaListener 自false 整个结果将作为单个ProducerRecord .
这需要在回复模板的 producer 配置中使用合适的序列化器。
但是,如果回复是Iterable<Message<?>> 该属性将被忽略,每条消息将单独发送。 |
筛选消息
Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter
类,它可以包装您的MessageListener
.
此类采用RecordFilterStrategy
其中,您可以实现filter
方法来表示消息是重复的,应该被丢弃。
这有一个名为ackDiscarded
,它指示适配器是否应确认丢弃的记录。
是的false
默认情况下。
当您使用@KafkaListener
中,将RecordFilterStrategy
(并且可选ackDiscarded
),以便将侦听器包装在适当的筛选适配器中。
此外,一个FilteringBatchMessageListenerAdapter
,用于使用批处理消息侦听器。
这FilteringBatchMessageListenerAdapter 如果您的@KafkaListener 接收一个ConsumerRecords<?, ?> 而不是List<ConsumerRecord<?, ?>> 因为ConsumerRecords 是不可变的。 |
从版本 2.8.4 开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy
通过使用filter
Listener 注解的属性。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
重试投放
请参阅DefaultErrorHandler
在 Handling Exceptions 中。
开始@KafkaListener
序列中的 s
一个常见的用例是在另一个侦听器使用主题中的所有记录后启动侦听器。
例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。
从版本 2.7.3 开始,新组件ContainerGroupSequencer
已引入。
它使用@KafkaListener
containerGroup
属性将容器分组在一起,并在当前组中的所有容器都处于空闲状态时启动下一个组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在这里,我们有 4 个监听器,分为两组,g1
和g2
.
在应用程序上下文初始化期间,排序器将autoStartup
属性设置为false
.
它还将idleEventInterval
对于任何容器(尚未设置),设置为提供的值(在本例中为 5000 毫秒)。
然后,当 sequencer 由应用程序上下文启动时,将启动第一组中的容器。
如ListenerContainerIdleEvent
s,则每个容器中的每个子容器都将被停止。
当ConcurrentMessageListenerContainer
已停止,则父容器将停止。
当一个组中的所有容器都已停止时,将启动下一个组中的容器。
组中的组或容器的数量没有限制。
默认情况下,最终组 (g2
上图)在空闲时不会停止。
要修改该行为,请将stopLastGroupWhenIdle
自true
在 Sequencer 上。
顺便说一句;以前,每个组中的容器都被添加到类型为Collection<MessageListenerContainer>
其中 Bean 名称是containerGroup
.
这些集合现在已被弃用,取而代之的是 bean 类型的ContainerGroup
其中 Bean 名称是组名称,后缀为.group
;在上面的示例中,将有 2 个 beang1.group
和g2.group
.
这Collection
Bean 将在将来的发行版中删除。
用KafkaTemplate
接收
本节介绍如何使用KafkaTemplate
以接收消息。
从版本 2.8 开始,该模板有四个receive()
方法:
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
如您所见,您需要知道需要检索的记录的分区和偏移量;一个新的Consumer
为每个作创建 (和 Closed)。
使用最后两种方法,将单独检索每条记录,并将结果组合成ConsumerRecords
对象。
创建TopicPartitionOffset
s 的请求,则仅支持正的绝对偏移量。
4.1.5. 侦听器容器属性
财产 | 违约 | 描述 |
---|---|---|
1 |
提交待处理偏移量之前的记录数,当 |
|
|
一条 |
|
Batch |
控制 offset 的提交频率 - 请参阅 Committing Offsets。 |
|
5000 |
在 |
|
LATEST_ONLY _NO_TX |
是否在分配时提交初始位置;默认情况下,仅当 |
|
假 |
启用无序提交(请参阅 手动提交偏移量);使用者将暂停,提交将延迟,直到填补间隙。 |
|
|
当不为 null 时,一个 |
|
(空字符串) |
的 |
|
假 |
设置为 |
|
假 |
设置为 |
|
|
当存在且 |
|
|
一个 |
|
调试 |
与提交偏移量相关的日志的日志记录级别。 |
|
|
再平衡侦听器;请参阅 重新平衡侦听器。 |
|
30 秒 |
在记录错误之前等待使用者启动的时间;例如,如果您使用的 Task Executor 线程不足,则可能会发生这种情况。 |
|
|
用于运行使用者线程的任务执行程序。
默认执行程序会创建名为 |
|
|
请参阅 Delivery Attempts 标头。 |
|
|
Exactly Once 语义模式;请参阅 Exactly Once 语义。 |
|
|
当使用事务创建者生成的记录时,如果使用者位于分区的末尾,则滞后可能会错误地报告为大于零,因为伪记录用于指示事务提交/回滚,并且可能存在回滚记录。
这在功能上不会影响消费者,但一些用户表示担心 “滞后” 不为零。
将此属性设置为 |
|
|
覆盖使用者 |
|
5.0 |
乘数 |
|
0 |
用于通过在轮询之间休眠线程来减慢投放速度。
处理一批记录的时间加上此值必须小于 |
|
|
设置后,将启用 |
|
|
设置后,将启用 |
|
没有 |
用于覆盖在 Consumer Factory 上配置的任何任意 Consumer 属性。 |
|
|
设置为 true 可在 INFO 级别记录所有容器属性。 |
|
|
消息侦听器。 |
|
|
是否维护使用者线程的 Micrometer 计时器。 |
|
空 |
要添加到微米度量的静态标签的映射。 |
|
|
一个基于使用者记录提供动态标签的函数。 |
|
|
如果代理上不存在已配置的主题,则为 true 时阻止容器启动。 |
|
30 秒 |
检查使用者线程状态的频率 |
|
3.0 |
乘以 |
|
|
设置为 false 以记录完整的使用者记录(错误、调试日志等),而不仅仅是 |
|
|
当容器暂停时,在当前记录之后停止处理,而不是在处理上一次轮询的所有记录之后停止处理;其余记录保留在内存中,并将在容器恢复时传递给侦听器。 |
|
5000 |
传入的超时 |
|
100 |
传入的超时 |
|
假 |
如果容器因授权/身份验证异常而停止,则为 True,则重新启动容器。 |
|
|
要在其上运行使用者监视器任务的计划程序。 |
|
10000 |
阻止 |
|
|
如果 |
|
|
当容器停止时,请在当前记录之后停止处理,而不是在处理上一次轮询的所有记录之后停止处理。 |
|
见 desc. |
使用批处理侦听器时,如果这是 |
|
|
在以下情况下使用的超时 |
|
|
是否对偏移量使用 sync 或 async commits;看 |
|
不适用 |
配置的主题、主题模式或显式分配的主题/分区。
互斥;必须至少提供一个;强制执行者 |
|
|
请参阅 事务。 |
财产 | 违约 | 描述 |
---|---|---|
|
一 |
|
应用程序上下文 |
事件发布者。 |
|
见 desc. |
已弃用 - 请参阅 |
|
|
设置 |
|
Bean 名称 |
容器的 Bean 名称;后缀为 |
|
见 desc. |
|
|
|
容器 properties 实例。 |
|
见 desc. |
已弃用 - 请参阅 |
|
见 desc. |
已弃用 - 请参阅 |
|
见 desc. |
这 |
|
|
确定 |
|
见 desc. |
用户配置的容器的 Bean 名称或 |
|
零 |
要在 |
|
(只读) |
如果已请求使用者暂停,则为 True。 |
|
|
设置 |
|
30 秒 |
当 |
财产 | 违约 | 描述 |
---|---|---|
(只读) |
当前分配给此容器的分区(显式或不显式)。 |
|
(只读) |
当前分配给此容器的分区(显式或不显式)。 |
|
|
由并发容器使用,为每个子容器的使用者提供唯一的 |
|
不适用 |
如果已请求暂停,并且使用者实际上已暂停,则为 True。 |
财产 | 违约 | 描述 |
---|---|---|
|
设置为 false 可禁止向 |
|
(只读) |
当前分配给此容器的子级的分区的聚合 |
|
(只读) |
当前分配给此容器的子级的分区 |
|
1 |
子级数 |
|
不适用 |
如果已请求暂停,并且所有子容器的使用者实际上都已暂停,则为 True。 |
|
不适用 |
对所有子项的引用 |
4.1.6. 动态创建容器
有几种技术可用于在运行时创建侦听器容器。 本节探讨了其中的一些技术。
MessageListener 实现
如果你直接实现自己的侦听器,你可以简单地使用容器工厂为该侦听器创建一个原始容器:
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
class MyListener : MessageListener<String?, String?> {
override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}
}
private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
原型 Bean
带有 Comments 的方法的容器@KafkaListener
可以通过将 bean 声明为 prototype 来动态创建:
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
fun listen(`in`: String?) {
println(`in`)
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
侦听器必须具有唯一的 ID。
从版本 2.8.9 开始,KafkaListenerEndpointRegistry 有新方法unregisterListenerContainer(String id) 以允许您重复使用 ID。
取消注册容器不会stop() 容器,您必须自己执行此作。 |
4.1.7. 应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
-
ConsumerStartingEvent
- 在 Consumer 线程首次启动时发布,在它开始轮询之前。 -
ConsumerStartedEvent
- 在消费者即将开始轮询时发布。 -
ConsumerFailedToStartEvent
- 如果没有,则发布ConsumerStartingEvent
发布在consumerStartTimeout
container 属性。 此事件可能表示配置的任务执行程序没有足够的线程来支持使用它的容器及其并发性。 出现此情况时,还会记录错误消息。 -
ListenerContainerIdleEvent
:在 中未收到任何消息时发布idleInterval
(如果已配置)。 -
ListenerContainerNoLongerIdleEvent
:在之前发布ListenerContainerIdleEvent
. -
ListenerContainerPartitionIdleEvent
:在未收到来自该分区的消息时发布idlePartitionEventInterval
(如果已配置)。 -
ListenerContainerPartitionNoLongerIdleEvent
:当从之前发布了ListenerContainerPartitionIdleEvent
. -
NonResponsiveConsumerEvent
:当使用者似乎在poll
方法。 -
ConsumerPartitionPausedEvent
:当分区暂停时,由每个使用者发布。 -
ConsumerPartitionResumedEvent
:每个 Consumer 在恢复分区时发布的 -
ConsumerPausedEvent
:在容器暂停时由每个使用者发布。 -
ConsumerResumedEvent
:容器恢复时由每个使用者发布。 -
ConsumerStoppingEvent
:由每个使用者在停止前发布。 -
ConsumerStoppedEvent
:在 Consumer 关闭后发布。 请参见线程安全。 -
ConsumerRetryAuthEvent
:当消费者的身份验证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent
:成功重试身份验证或授权时发布。只能发生在存在ConsumerRetryAuthEvent
以前。 -
ContainerStoppedEvent
:当所有使用者都已停止时发布。
默认情况下,应用程序上下文的事件 multicaster 在调用线程上调用事件侦听器。
如果将 multicaster 更改为使用异步执行程序,则不得调用任何Consumer 方法。 |
这ListenerContainerIdleEvent
具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时容器处于空闲状态的时间。 -
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对 Kafka 的引用Consumer
对象。 例如,如果使用者的pause()
方法,它可以resume()
当收到事件时。 -
paused
:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerNoLongerIdleEvent
具有相同的属性,但idleTime
和paused
.
这ListenerContainerPartitionIdleEvent
具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时分区消耗处于空闲状态的时间。 -
topicPartition
:触发事件的主题和分区。 -
consumer
:对 Kafka 的引用Consumer
对象。 例如,如果使用者的pause()
方法,它可以resume()
当收到事件时。 -
paused
:该使用者的分区使用当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerPartitionNoLongerIdleEvent
具有相同的属性,但idleTime
和paused
.
这NonResponsiveConsumerEvent
具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll
:容器上次调用之前的时间poll()
. -
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对 Kafka 的引用Consumer
对象。 例如,如果使用者的pause()
方法,它可以resume()
当收到事件时。 -
paused
:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ConsumerPausedEvent
,ConsumerResumedEvent
和ConsumerStopping
事件具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partitions
:这TopicPartition
涉及的实例。
这ConsumerPartitionPausedEvent
,ConsumerPartitionResumedEvent
事件具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partition
:这TopicPartition
实例。
这ConsumerRetryAuthEvent
event 具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
reason
-
AUTHENTICATION
- 由于身份验证异常,事件已发布。 -
AUTHORIZATION
- 事件因授权异常而发布。
-
这ConsumerStartingEvent
,ConsumerStartingEvent
,ConsumerFailedToStartEvent
,ConsumerStoppedEvent
,ConsumerRetryAuthSuccessfulEvent
和ContainerStoppedEvent
事件具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。
所有容器(无论是子容器还是父容器)发布ContainerStoppedEvent
.
对于父容器,source 和 container 属性相同。
此外,ConsumerStoppedEvent
具有以下附加属性:
-
reason
-
NORMAL
- 消费者正常停止 (容器已停止)。 -
ERROR
-一个java.lang.Error
被抛出。 -
FENCED
- 事务性生产者被隔离,并且stopContainerWhenFenced
container 属性为true
. -
AUTH
-一AuthenticationException
或AuthorizationException
被抛出,并且authExceptionRetryInterval
未配置。 -
NO_OFFSET
- 分区没有偏移量,并且auto.offset.reset
policy 为none
.
-
在出现此类情况后,您可以使用此事件重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者
虽然效率很高,但异步使用者的一个问题是检测它们何时处于空闲状态。 如果在一段时间内没有消息到达,您可能需要采取一些措施。
您可以配置侦听器容器以发布ListenerContainerIdleEvent
当一段时间过去了,没有消息传递时。
当容器处于空闲状态时,每idleEventInterval
毫秒。
要配置此功能,请将idleEventInterval
在容器上。
以下示例显示了如何执行此作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
return container;
}
以下示例演示如何设置idleEventInterval
对于@KafkaListener
:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在上述每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果出于某种原因,消费者poll()
方法未退出,则不会收到任何消息,并且无法生成空闲事件(这是早期版本的kafka-clients
当无法访问代理时)。
在这种情况下,容器会发布一个NonResponsiveConsumerEvent
如果轮询未返回3x
这pollTimeout
财产。
默认情况下,此检查在每个容器中每 30 秒执行一次。
您可以通过设置monitorInterval
(默认为 30 秒)和noPollThreshold
(默认为 3.0)属性中的ContainerProperties
配置 Listener 容器时。
这noPollThreshold
应大于1.0
以避免由于争用条件而获得虚假事件。
接收此类事件可让您停止容器,从而唤醒使用者,使其可以停止。
从版本 2.6.2 开始,如果容器发布了ListenerContainerIdleEvent
,它将发布一个ListenerContainerNoLongerIdleEvent
当随后收到记录时。
事件消耗
您可以通过实现ApplicationListener
— 一般侦听器或缩小范围以仅接收此特定事件的侦听器。
您还可以使用@EventListener
,引入于 Spring Framework 4.2 中。
下一个示例结合了@KafkaListener
和@EventListener
合并到单个类中。
您应该了解应用程序侦听器会获取所有容器的事件,因此,如果您想根据哪个容器处于空闲状态来执行特定作,则可能需要检查侦听器 ID。
您还可以使用@EventListener
condition
为此目的。
有关事件属性的信息,请参阅应用程序事件。
该事件通常在使用者线程上发布,因此可以安全地与Consumer
对象。
以下示例同时使用@KafkaListener
和@EventListener
:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器查看所有容器的事件。
因此,在前面的示例中,我们根据侦听器 ID 缩小接收的事件范围。
由于为@KafkaListener 支持并发,实际的容器命名为id-n 其中,n 是每个实例的唯一值,以支持并发。
这就是我们使用startsWith 在条件。 |
如果您希望使用 idle 事件来停止 lister 容器,则不应调用container.stop() 在调用侦听器的线程上。
这样做会导致延迟和不必要的日志消息。
相反,您应该将事件移交给其他线程,然后该线程可以停止容器。
此外,您不应stop() 容器实例(如果它是子容器)。
您应该改为停止并发容器。 |
空闲时的当前位置
注意,当检测到空闲时,可以通过实现ConsumerSeekAware
在你的侦听器中。
看onIdleContainer()
在 Seek to a specific offset 中。
4.1.8. Topic/Partition 初始偏移量
有几种方法可以设置分区的初始偏移量。
手动分配分区时,您可以在配置的TopicPartitionOffset
参数(请参阅 消息侦听器容器)。
您也可以随时寻找特定的偏移量。
当您在 broker 分配分区的位置使用组管理时:
-
对于新的
group.id
,则初始偏移量由auto.offset.reset
Consumer 属性 (earliest
或latest
). -
对于现有组 ID,初始偏移量是该组 ID 的当前偏移量。 但是,您可以在初始化期间(或之后的任何时间)寻找特定的偏移量。
4.1.9. 寻找特定的偏移量
为了 seek,您的侦听器必须实现ConsumerSeekAware
,该方法包括以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
这registerSeekCallback
在启动容器时以及分配分区时调用。
在初始化后的任意时间进行 seek 时,您应该使用此回调。
您应该保存对回调的引用。
如果您在多个容器(或在ConcurrentMessageListenerContainer
),您应该将回调存储在ThreadLocal
或由侦听器键控的其他结构Thread
.
使用群组管理时,onPartitionsAssigned
在分配分区时调用。
例如,您可以使用此方法通过调用 callback 来设置 partition 的初始偏移量。
您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅下面的示例)。
您必须使用 callback 参数,而不是传递给registerSeekCallback
.
从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。
onPartitionsRevoked
在容器停止或 Kafka 撤销分配时调用。
您应该丢弃此线程的回调并删除与已撤销分区的任何关联。
该回调有以下方法:
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection=<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection=<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seekRelative
在版本 2.3 中添加,以执行相对查找。
-
offset
negative 和toCurrent
false
- 相对于分区末尾查找。 -
offset
positive 和toCurrent
false
- 相对于分区的开头查找。 -
offset
negative 和toCurrent
true
- 相对于当前位置查找 (倒带)。 -
offset
positive 和toCurrent
true
- 相对于当前位置 seek (Fast Forward)。
这seekToTimestamp
method 也在 2.3 版中被添加。
在onIdleContainer 或onPartitionsAssigned 方法,则第二种方法是首选,因为在对使用者的offsetsForTimes 方法。
当从其他位置调用时,容器将收集所有时间戳查找请求,并对offsetsForTimes . |
您还可以从onIdleContainer()
当检测到空闲容器时。
有关如何启用空闲容器检测的信息,请参阅检测空闲和无响应的使用者。
这seekToBeginning 接受集合的方法非常有用,例如,在处理压缩的主题时,并且您希望在每次启动应用程序时都寻找开头: |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意查找,请使用registerSeekCallback
对于适当的线程。
这是一个简单的 Spring Boot 应用程序,它演示了如何使用回调;它向主题发送 10 条记录;打<Enter>
使所有分区都查找到开头。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了简化作,版本 2.3 添加了AbstractConsumerSeekAware
class 来跟踪要用于主题/分区的回调。
以下示例演示如何在每次容器空闲时在每个分区中查找处理的最后一条记录。
它还具有允许任意外部调用按一条记录倒带分区的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
版本 2.6 向 abstract 类添加了便捷方法:
-
seekToBeginning()
- 查找所有已分配的分区到开头 -
seekToEnd()
- 查找所有已分配的分区到末尾 -
seekToTimestamp(long time)
- 将所有分配的分区查找到该时间戳表示的偏移量。
例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
}
}
4.1.10. 容器工厂
如@KafkaListener
注解一个ConcurrentKafkaListenerContainerFactory
用于为带注释的方法创建容器。
从版本 2.2 开始,您可以使用同一个工厂创建任何ConcurrentMessageListenerContainer
.
如果要创建多个具有相似属性的容器,或者希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能很有用。
创建容器后,您可以进一步修改其属性,其中许多属性是使用container.getContainerProperties()
.
以下示例将ConcurrentMessageListenerContainer
:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以这种方式创建的容器不会添加到终端节点注册表中。
它们应该被创建为@Bean 定义,以便将它们注册到应用程序上下文中。 |
从版本 2.3.4 开始,您可以添加ContainerCustomizer
到工厂,以便在创建和配置每个容器后进一步配置每个容器。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
4.1.11. 线程安全
使用并发消息侦听器容器时,将在所有使用者线程上调用单个侦听器实例。 因此,侦听器需要是线程安全的,最好使用无状态侦听器。 如果无法使侦听器线程安全,或者添加同步会显著降低添加并发的好处,则可以使用以下几种技术之一:
-
用
n
容器concurrency=1
将原型作用域MessageListener
bean 的 Bean 中,以便每个容器都获得自己的实例(这在使用@KafkaListener
). -
保持 state 在
ThreadLocal<?>
实例。 -
让单例侦听器委托给在
SimpleThreadScope
(或类似范围)。
为了便于清理线程状态(对于前面列表中的第二项和第三项),从版本 2.2 开始,侦听器容器会发布一个ConsumerStoppedEvent
当每个线程退出时。
您可以通过ApplicationListener
或@EventListener
删除方法ThreadLocal<?>
instances 或remove()
线程范围的 bean。
请注意,SimpleThreadScope
不会销毁具有销毁接口(例如DisposableBean
),因此您应该destroy()
实例。
默认情况下,应用程序上下文的事件 multicaster 在调用线程上调用事件侦听器。 如果将 multicaster 更改为使用异步执行程序,则线程清理无效。 |
4.1.12. 监控
监视侦听器性能
从版本 2.3 开始,侦听器容器将自动创建和更新 MicrometerTimer
s 表示侦听器,如果Micrometer
在类路径上检测到,并且单个MeterRegistry
存在于应用程序上下文中。
可以通过设置ContainerProperty
micrometerEnabled
自false
.
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器被命名为spring.kafka.listener
并具有以下标签:
-
name
:(容器 Bean 名称) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
您可以使用ContainerProperties
micrometerTags
财产。
从版本 2.9.8、3.0.6 开始,您可以在ContainerProperties
micrometerTagsProvider
;该函数接收ConsumerRecord<?, ?>
并返回可以基于该记录的标签,并与micrometerTags
.
使用并发容器,将为每个线程创建计时器,并且name 标签的后缀为-n 其中 n 是0 自concurrency-1 . |
监控 KafkaTemplate 性能
从版本 2.5 开始,模板将自动创建并更新 MicrometerTimer
s 用于发送作,如果Micrometer
在类路径上检测到,并且单个MeterRegistry
存在于应用程序上下文中。
可以通过设置模板的micrometerEnabled
property 设置为false
.
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器被命名为spring.kafka.template
并具有以下标签:
-
name
:(模板 Bean 名称) -
result
:success
或failure
-
exception
:none
或失败的异常类名称
您可以使用模板的micrometerTags
财产。
从版本 2.9.8、3.0.6 开始,您可以提供KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
财产;该函数接收ProducerRecord<?, ?>
并返回可以基于该记录的标签,并与micrometerTags
.
Micrometer 原生指标
从版本 2.5 开始,框架提供了 Factory Listeners 来管理 MicrometerKafkaClientMetrics
实例。
要启用此功能,只需将侦听器添加到您的 producer 和 consumer 工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
消费者/生产者id
传递给侦听器的 Tag 将添加到带有 Tag Name 的计量器标签中spring.id
.
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count()
为StreamsBuilderFactoryBean
- 请参阅 KafkaStreams Micrometer 支持。
千分尺观察
从 3.0 版本开始,现在支持使用 Micrometer 进行观察,因为KafkaTemplate
和侦听器容器。
设置observationEnabled
自true
在KafkaTemplate
和ContainerProperties
使观察成为可能;这将禁用 Micrometer Timers,因为计时器现在将随每个观测一起管理。
有关更多信息,请参阅 Micrometer Tracing 。
要向计时器/跟踪添加标签,请配置自定义KafkaTemplateObservationConvention
或KafkaListenerObservationConvention
分别添加到模板或侦听器容器中。
默认实现会添加bean.name
标签用于模板观察项,而listener.id
标记。
你可以子类化DefaultKafkaTemplateObservationConvention
或DefaultKafkaListenerObservationConvention
或提供全新的实现。
请参阅千分尺观测文档,了解记录的默认观测值的详细信息。
从版本 3.0.6 开始,您可以根据使用者或创建者记录中的信息向计时器和跟踪添加动态标签。
为此,请添加自定义KafkaListenerObservationConvention
和/或KafkaTemplateObservationConvention
添加到侦听器容器属性中,或者KafkaTemplate
分别。
这record
属性都包含ConsumerRecord
或ProducerRecord
分别。
4.1.13. 事务
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加支持:
-
KafkaTransactionManager
:与常规 Spring 事务支持 (@Transactional
,TransactionTemplate
等)。 -
事务
KafkaMessageListenerContainer
-
本地事务
KafkaTemplate
-
与其他事务管理器的事务同步
通过提供DefaultKafkaProducerFactory
替换为transactionIdPrefix
.
在这种情况下,而不是管理单个共享的Producer
,工厂维护事务性生产者的缓存。
当用户调用close()
在 producer 上,它被返回到缓存以供重用,而不是实际关闭。
这transactional.id
property 是transactionIdPrefix
+ n
哪里n
开头为0
,并且对于每个新生产者都会递增。
在早期版本的 Spring for Apache Kafka 中,transactional.id
对于由具有基于记录的侦听器的侦听器容器启动的事务,以不同方式生成,以支持隔离僵尸,这不再需要,使用EOSMode.V2
是从 3.0 开始的唯一选项。
对于使用多个实例运行的应用程序,transactionIdPrefix
每个实例必须唯一。
另请参阅 Exactly Once 语义。
另请参阅transactionIdPrefix
.
使用 Spring Boot,只需要将spring.kafka.producer.transaction-id-prefix
property - Boot 将自动配置KafkaTransactionManager
bean 并将其连接到侦听器容器中。
从版本 2.5.8 开始,您现在可以配置maxAge 属性。
当使用可能为代理的transactional.id.expiration.ms .
使用电流kafka-clients ,这可能会导致ProducerFencedException 没有再平衡。
通过设置maxAge 更改为小于transactional.id.expiration.ms ,如果 producer 已超过其最大年龄,则工厂将刷新 Producer。 |
用KafkaTransactionManager
这KafkaTransactionManager
是 Spring Framework 的PlatformTransactionManager
.
在其构造函数中提供了对 producer 工厂的引用。
如果您提供自定义生产者工厂,则它必须支持事务。
看ProducerFactory.transactionCapable()
.
您可以使用KafkaTransactionManager
使用常规的 Spring 事务支持 (@Transactional
,TransactionTemplate
等)。
如果事务处于活动状态,则任何KafkaTemplate
在事务范围内执行的作使用事务的Producer
.
管理器提交或回滚事务,具体取决于成功或失败。
您必须配置KafkaTemplate
以使用相同的ProducerFactory
作为事务管理器。
事务同步
本节引用仅限生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅Using Consumer-Initiated Transaction。
如果要将记录发送到 kafka 并执行一些数据库更新,则可以使用常规的 Spring 事务管理,例如,使用DataSourceTransactionManager
.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
用于@Transactional
annotation 启动事务,并且KafkaTemplate
将事务与该事务管理器同步;每次发送都将参与该交易。
当方法退出时,数据库事务将提交,然后是 Kafka 事务。
如果你希望以相反的顺序执行提交(Kafka 优先),请使用 nested@Transactional
方法,并将外部方法配置为使用DataSourceTransactionManager
,而 inner 方法配置为使用KafkaTransactionManager
.
有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅与其他事务管理器的 Kafka 事务示例。
从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务上的提交失败(在主事务提交之后),将向调用方抛出异常。 以前,这会被静默忽略(在 debug 时记录)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。 |
使用使用者发起的事务
这ChainedKafkaTransactionManager
从 2.7 版本开始,现已弃用;请参阅 JavaDocs 的 super classChainedTransactionManager
了解更多信息。
相反,请使用KafkaTransactionManager
以启动 Kafka 事务,并使用@Transactional
以启动另一个事务。
有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅使用其他事务管理器的 Kafka 事务示例。
KafkaTemplate
本地事务
您可以使用KafkaTemplate
在本地事务中执行一系列作。
以下示例显示了如何执行此作:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身 (this
).
如果回调正常退出,则提交事务。
如果引发异常,则回滚事务。
如果存在KafkaTransactionManager (或 synchronized) 事务,则不使用它。
相反,使用新的 “嵌套” 事务。 |
transactionIdPrefix
跟EOSMode.V2
(又名BETA
),则唯一支持的模式,则不再需要使用相同的transactional.id
,即使对于消费者发起的交易;事实上,它在每个实例上都必须是唯一的,这与生产者发起的事务相同。
此属性在每个应用程序实例上必须具有不同的值。
KafkaTemplate
事务性和非事务性发布
通常,当KafkaTemplate
是事务性的(配置了支持事务的 producer 工厂),事务是必需的。
该交易可以通过TransactionTemplate
一个@Transactional
方法, 调用executeInTransaction
或由侦听器容器配置时,如果配置了KafkaTransactionManager
.
任何在事务范围之外使用模板的尝试都会导致模板抛出IllegalStateException
.
从版本 2.4.3 开始,你可以将模板的allowNonTransactional
property 设置为true
.
在这种情况下,模板将允许作在没有事务的情况下运行,方法是调用ProducerFactory
的createNonTransactionalProducer()
方法;Producer 将像往常一样被缓存或线程绑定,以便重用。
看用DefaultKafkaProducerFactory
.
使用 Batch 侦听器的事务
当侦听器在使用事务时失败时,AfterRollbackProcessor
调用以在回滚发生后执行一些作。
当使用默认的AfterRollbackProcessor
使用 Record 侦听器时,将执行 seek 以重新传递失败的记录。
但是,使用批处理侦听器时,将重新交付整个批处理,因为框架不知道批处理中的哪条记录失败。
有关更多信息,请参阅 After-rollback Processor 。
使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时的故障。这BatchToRecordAdapter
.
当具有batchListener
设置为 true 时,将配置有BatchToRecordAdapter
,则一次使用一条记录调用侦听器。
这将启用批处理中的错误处理,同时仍可以停止处理整个批处理,具体取决于异常类型。
默认的BatchToRecordAdapter
,可以使用标准的ConsumerRecordRecoverer
例如DeadLetterPublishingRecoverer
.
以下测试用例配置代码段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}
4.1.14. 恰好一次语义
您可以为侦听器容器提供KafkaAwareTransactionManager
实例。
配置后,容器会在调用侦听器之前启动事务。
任何KafkaTemplate
侦听器执行的作参与事务。
如果侦听器成功处理了记录(或多条记录),则在使用BatchMessageListener
),容器使用producer.sendOffsetsToTransaction()
),在事务管理器提交事务之前。
如果侦听器引发异常,则回滚事务并重新定位使用者,以便可以在下一次轮询时检索回滚的记录。
有关更多信息以及处理反复失败的记录,请参阅 After-rollback Processor 。
使用事务可实现 Exactly Once 语义 (EOS)。
这意味着,对于read→process-write
sequence 时,可以保证 sequence 只完成一次。
(读取和进程具有至少 1 次语义)。
Spring for Apache Kafka 版本 3.0 及更高版本仅支持EOSMode.V2
:
-
V2
- 又名 fetch-offset-request fencing (自 2.5 版本起)
这要求代理版本为 2.5 或更高版本。 |
带模式V2
,则无需为每个group.id/topic/partition
因为使用者元数据与偏移量一起发送到事务,并且代理可以使用该信息来确定创建者是否被隔离。
有关更多信息,请参阅 KIP-447。
V2
之前BETA
;这EOSMode
已更改以使框架与 KIP-732 保持一致。
4.1.15. 将 Spring Bean 连接到生产者/消费者拦截器
Apache Kafka 提供了一种向生产者和使用者添加拦截器的机制。
这些对象由 Kafka 而不是 Spring 管理,因此普通的 Spring 依赖注入不适用于依赖的 Spring Bean 中的连接。
但是,您可以使用拦截器手动连接这些依赖项config()
方法。
下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖的 bean 添加到配置属性中来做到这一点。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}
}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
结果:
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test
4.1.16. 在 Spring 中管理的 Producer 拦截器
从版本 3.0.0 开始,当涉及到 producer 拦截器时,你可以让 Spring 直接将其作为 bean 进行管理,而不是将拦截器的类名提供给 Apache Kafka 生产者配置。
如果你使用这种方法,那么你需要将这个 producer 拦截器 设置为 onKafkaTemplate
.
下面是一个使用相同MyProducerInterceptor
,但更改为不使用 internal config 属性。
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private final SomeBean bean;
public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}
在发送记录之前,onSend
调用生产者拦截器的方法。
服务器发送发布数据的确认后,onAcknowledgement
方法。
这onAcknowledgement
在创建者调用任何用户回调之前调用。
如果您有多个通过 Spring 管理的此类生产者拦截器,这些拦截器需要在KafkaTemplate
,您需要使用CompositeProducerInterceptor
相反。CompositeProducerInterceptor
允许按顺序添加单个 producer 拦截器。
底层ProducerInterceptor
实现按照添加到CompositeProducerInterceptor
.
4.1.17. 暂停和恢复侦听器容器
添加了版本 2.1.3pause()
和resume()
方法传递给侦听器容器。
以前,您可以在ConsumerAwareMessageListener
并通过监听ListenerContainerIdleEvent
,它提供对Consumer
对象。
虽然您可以使用事件侦听器在空闲容器中暂停使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。
要安全地暂停和恢复使用者,您应该使用pause
和resume
方法。
一个pause()
在下一个poll()
;一个resume()
在当前poll()
返回。
当容器暂停时,它将继续poll()
使用者,如果正在使用组管理,则避免再平衡,但它不检索任何记录。
有关更多信息,请参阅 Kafka 文档。
从版本 2.1.5 开始,您可以调用isPauseRequested()
以查看pause()
已被调用。
但是,使用者可能实际上尚未暂停。isConsumerPaused()
如果所有Consumer
实例实际上已暂停。
此外(也是从 2.1.5 开始),ConsumerPausedEvent
和ConsumerResumedEvent
实例与容器一起发布,作为source
property 和TopicPartition
实例中涉及的partitions
财产。
从版本 2.9 开始,一个新的 container 属性pauseImmediate
,如果设置为 true,则会导致暂停在处理当前记录后生效。
默认情况下,暂停在处理了上一次轮询中的所有记录后生效。
请参阅 [pauseImmediate]。
以下简单的 Spring Boot 应用程序通过使用容器注册表来获取对@KafkaListener
方法的容器并暂停或恢复其使用者以及接收相应的事件:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
下面的清单显示了前面示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
4.1.18. 暂停和恢复侦听器容器上的分区
从版本 2.7 开始,您可以使用pausePartition(TopicPartition topicPartition)
和resumePartition(TopicPartition topicPartition)
方法。
暂停和恢复分别发生在poll()
类似于pause()
和resume()
方法。
这isPartitionPauseRequested()
method 如果已请求该分区的 pause,则返回 true。
这isPartitionPaused()
method 如果该分区已有效暂停,则返回 true。
同样从 2.7 版本开始ConsumerPartitionPausedEvent
和ConsumerPartitionResumedEvent
实例与容器一起发布,作为source
property 和TopicPartition
实例。
4.1.19. 序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。
它与org.apache.kafka.common.serialization.Serializer<T>
和org.apache.kafka.common.serialization.Deserializer<T>
abstractions 的 API。
同时,我们可以使用Producer
或Consumer
configuration 属性。
以下示例显示了如何执行此作:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,KafkaConsumer
(因此,KafkaProducer
) 提供重载的
要接受的构造函数Serializer
和Deserializer
instances 的keys
和values
分别。
当您使用此 API 时,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
还提供属性(通过构造函数或 setter 方法)来注入自定义Serializer
和Deserializer
实例到目标Producer
或Consumer
.
此外,您还可以传入Supplier<Serializer>
或Supplier<Deserializer>
实例 - 这些Supplier
s 在创建每个Producer
或Consumer
.
字符串序列化
从 2.5 版本开始, Spring for Apache Kafka 提供了ToStringSerializer
和ParseStringDeserializer
使用实体的 String 表示的类。
他们依赖于方法toString
还有一些Function<String>
或BiFunction<String, Headers>
解析实例的 String 并填充属性。
通常,这会在类上调用一些静态方法,例如parse
:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer
配置为传达有关记录中序列化实体的类型信息Headers
.
您可以通过设置addTypeInfo
属性设置为 false。
此信息可由ParseStringDeserializer
在接收方。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
要在ToStringSerializer
(将addTypeInfo
属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置Charset
用于转换String
往返byte[]
默认值为UTF-8
.
你可以使用 parser 方法的名称来配置反序列化器ConsumerConfig
性能:
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完全限定名称,后跟方法名称,用句点分隔.
.
该方法必须是静态的,并且具有(String, Headers)
或(String)
.
一个ToFromStringSerde
,用于 Kafka Streams。
JSON 格式
Spring for Apache Kafka 还提供JsonSerializer
和JsonDeserializer
基于
Jackson JSON 对象映射器。
这JsonSerializer
允许将任何 Java 对象编写为 JSONbyte[]
.
这JsonDeserializer
需要额外的Class<?> targetType
参数以允许对 consumedbyte[]
添加到正确的目标对象中。
以下示例演示如何创建JsonDeserializer
:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以自定义两者JsonSerializer
和JsonDeserializer
替换为ObjectMapper
.
您还可以扩展它们以实现configure(Map<String, ?> configs, boolean isKey)
方法。
从版本 2.3 开始,默认情况下,所有 JSON 感知组件都使用JacksonUtils.enhancedObjectMapper()
实例,它附带了MapperFeature.DEFAULT_VIEW_INCLUSION
和DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
功能已禁用。
此外,此类实例还提供了用于自定义数据类型的众所周知的模块,例如 Java 时间和 Kotlin 支持。
看JacksonUtils.enhancedObjectMapper()
JavaDocs 了解更多信息。
此方法还会注册一个org.springframework.kafka.support.JacksonMimeTypeModule
为org.springframework.util.MimeType
对象序列化为纯字符串,以便通过网络实现平台间兼容性。
一个JacksonMimeTypeModule
可以在应用程序上下文中注册为 bean,并且它将被自动配置到Spring BootObjectMapper
实例.
同样从版本 2.3 开始,JsonDeserializer
提供TypeReference
的构造函数,以便更好地处理目标泛型容器类型。
从版本 2.1 开始,您可以在 record 中传达类型信息Headers
,允许处理多种类型。
此外,您可以使用以下 Kafka 属性配置序列化程序和反序列化程序。
如果您提供了Serializer
和Deserializer
instances 的KafkaConsumer
和KafkaProducer
分别。
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
要在JsonSerializer
(将addTypeInfo
属性)。 -
JsonSerializer.TYPE_MAPPINGS
(默认empty
):请参阅映射类型。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
忽略序列化程序设置的标头。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
保留序列化程序设置的标头。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则用于键反序列化的回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化值的回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(默认java.util
,java.lang
):允许反序列化的包模式的逗号分隔列表。 表示 deserialize all。*
-
JsonDeserializer.TYPE_MAPPINGS
(默认empty
):请参阅映射类型。 -
JsonDeserializer.KEY_TYPE_METHOD
(默认empty
):请参阅使用方法确定类型。 -
JsonDeserializer.VALUE_TYPE_METHOD
(默认empty
):请参阅使用方法确定类型。
从版本 2.2 开始,类型信息 Headers(如果由序列化器添加)由反序列化器删除。
您可以通过设置removeTypeHeaders
property 设置为false
,可以直接在 deserializer 上或使用前面描述的 configuration 属性。
从版本 2.8 开始,如果你以编程方式构造序列化器或反序列化器,如 编程构造 中所示,只要你没有显式设置任何属性(使用set*() 方法或使用 Fluent API)。
以前,以编程方式创建时,从不应用配置属性;如果您直接在对象上显式设置属性,则仍然会出现这种情况。 |
映射类型
从版本 2.2 开始,在使用 JSON 时,您现在可以使用前面列表中的属性来提供类型映射。
以前,您必须在序列化器和反序列化器中自定义类型映射器。
映射由逗号分隔的token:className
对。
在出站时,有效负载的类名将映射到相应的令牌。
在入站时,type 标头中的令牌将映射到相应的类名。
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
相应的对象必须兼容。 |
如果使用 Spring Boot,则可以在application.properties
(或 yaml)文件。
以下示例显示了如何执行此作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单配置。
对于更高级的配置(例如使用自定义的
还提供了 Setter,作为使用这些构造函数的替代方法。 |
从版本 2.2 开始,你可以显式地将反序列化器配置为使用提供的目标类型,并通过使用具有布尔值的重载构造函数之一来忽略 Headers 中的类型信息useHeadersIfPresent
(即true
默认情况下)。
以下示例显示了如何执行此作:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从版本 2.5 开始,您现在可以通过 properties 配置反序列化器,以调用方法来确定目标类型。
如果存在,这将覆盖上面讨论的任何其他技术。
如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他 Headers 反序列化为不同的类型,那么这可能很有用。
将这些属性设置为方法名称 - 一个完全限定的类名,后跟方法名称,用句点分隔.
.
该方法必须声明为public static
具有三个签名之一(String topic, byte[] data, Headers headers)
,(byte[] data, Headers headers)
或(byte[] data)
并返回 JacksonJavaType
.
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意标头或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用JsonPath
或类似的,但是,用于确定类型的测试越简单,过程的效率就越高。
以下是以编程方式创建反序列化器的示例(在构造函数中为 Consumer Factory 提供反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以编程方式提供类型映射,类似于使用方法确定类型,请使用typeFunction
财产。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用 Fluent API 来配置属性,或者使用set*()
方法,工厂将使用配置属性配置序列化器/反序列化器;请参阅配置属性。
委托序列化器和反序列化器
使用标头
版本 2.3 引入了DelegatingSerializer
和DelegatingDeserializer
,这允许生成和使用具有不同键和/或值类型的记录。
创建者必须设置一个 headerDelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
设置为选择器值,该值用于选择用于值的序列化程序,并将DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
对于密钥;如果未找到匹配项,则IllegalStateException
被抛出。
对于传入记录,反序列化器使用相同的 Headers 来选择要使用的反序列化器;如果未找到匹配项或标头不存在,则 RAWbyte[]
返回。
您可以将 selector 的映射配置为Serializer
/ Deserializer
通过构造函数,或者您可以通过 Kafka 生产者/消费者属性使用键DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
.
对于序列化程序,producer 属性可以是Map<String, Object>
其中 key 是 selector,值是Serializer
instance、序列化器Class
或类名。
该属性也可以是逗号分隔的映射条目的 String,如下所示。
对于反序列化器,consumer 属性可以是Map<String, Object>
其中 key 是 selector,值是Deserializer
instance、一个反序列化器Class
或类名。
该属性也可以是逗号分隔的映射条目的 String,如下所示。
要使用 properties 进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,创建者会将DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
header 设置为thing1
或thing2
.
此技术支持向同一主题(或不同主题)发送不同类型的内容。
从版本 2.5.1 开始,如果类型(键或值)是Serdes (Long ,Integer 等)。
相反,序列化程序会将标头设置为类型的类名。
没有必要为这些类型配置 serializer 或 deserializers,它们将动态创建(一次)。 |
有关将不同类型的 CAN 发送到不同主题的另一种技术,请参阅用RoutingKafkaTemplate
.
按类型
版本 2.8 引入了DelegatingByTypeSerializer
.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,你可以配置序列化器来检查是否可以从目标对象分配 map 键,这在委托序列化器可以序列化子类时很有用。
在这种情况下,如果存在友好匹配项,则排序的Map
,例如LinkedHashMap
应该提供。
按主题
从版本 2.8 开始,DelegatingByTopicSerializer
和DelegatingByTopicDeserializer
允许根据 topic name 选择 serializer/deserializer。
正则表达式Pattern
s 用于查找要使用的实例。
可以使用构造函数或通过属性(以逗号分隔的pattern:serializer
).
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
用KEY_SERIALIZATION_TOPIC_CONFIG
当将其用于键时。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
.
附加属性DelegatingByTopicSerialization.CASE_SENSITIVE
(默认true
),当设置为false
使 Topic lookup 不区分大小写。
重试 Deserializer
这RetryingDeserializer
使用委托Deserializer
和RetryTemplate
在反序列化期间,当委托可能出现暂时性错误(如网络问题)时重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
请参阅 spring-retry 项目,了解RetryTemplate
使用 retry policy、back off policy 等。
Spring 消息传递消息转换
尽管Serializer
和Deserializer
API 与低级 Kafka 相比非常简单灵活Consumer
和Producer
perspective,那么在使用@KafkaListener
或 Spring Integration 的 Apache Kafka 支持。
让您轻松转换org.springframework.messaging.Message
,Spring for Apache Kafka 提供了一个MessageConverter
abstraction 替换为MessagingMessageConverter
implementation 及其JsonMessageConverter
(和子类)自定义。
您可以注入MessageConverter
转换为KafkaTemplate
实例,以及使用AbstractKafkaListenerContainerFactory
bean 定义@KafkaListener.containerFactory()
财产。
以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为@Bean
Spring Boot 自动配置会将其连接到自动配置的模板和容器工厂中。
当您使用@KafkaListener
,则参数类型将提供给消息转换器以协助进行转换。
只有当 |
在消费者端,您可以配置 在生产者端,当您使用 Spring Integration 或
同样,使用 为方便起见,从版本 2.3 开始,框架还提供了一个 |
从版本 2.7.1 开始,可以将消息有效负载转换委托给spring-messaging
SmartMessageConverter
;例如,这使得转换可以基于MessageHeaders.CONTENT_TYPE
页眉。
这KafkaMessageConverter.fromMessage() 方法将出站转化转换为ProducerRecord 将消息负载设在ProducerRecord.value() 财产。
这KafkaMessageConverter.toMessage() method 用于入站转化ConsumerRecord 其中 payload 是ConsumerRecord.value() 财产。
这SmartMessageConverter.toMessage() method 创建新的出站Message<?> 从Message 传递给'fromMessage()'(通常由KafkaTemplate.send(Message<?> msg) ).
同样,在KafkaMessageConverter.toMessage() 方法,在转换器创建新的Message<?> 从ConsumerRecord 这SmartMessageConverter.fromMessage() 方法,然后使用新转换的有效负载创建最终入站消息。
在任何一种情况下,如果SmartMessageConverter 返回null ,则使用原始消息。 |
当在KafkaTemplate
和 Listener Container Factory 中,您可以配置SmartMessageConverter
通过调用setMessagingConverter()
在模板上,通过contentMessageConverter
属性@KafkaListener
方法。
例子:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 接口
从版本 2.1.1 开始,你可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。 这允许对数据进行非常有选择性的低耦合绑定,包括从 JSON 文档中的多个位置查找值。 例如,可以将以下接口定义为消息有效负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,将使用访问器方法在接收到的 JSON 文档中查找属性名称作为字段。
这@JsonPath
expression 允许自定义值查找,甚至定义多个 JSON 路径表达式,以便从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用ProjectingMessageConverter
配置了适当的委托转换器(用于出站转换和转换非投影接口)。
您还必须添加spring-data:spring-data-commons
和com.jayway.jsonpath:json-path
添加到 class 路径。
当用作@KafkaListener
方法,接口类型会像往常一样自动传递给转换器。
用ErrorHandlingDeserializer
当反序列化器无法反序列化消息时, Spring 无法处理该问题,因为它发生在poll()
返回。
为了解决这个问题,ErrorHandlingDeserializer
已引入。
此 deserializer 委托给真正的 deserializer (key 或 value)。
如果委托无法反序列化记录内容,则ErrorHandlingDeserializer
返回null
value 和DeserializationException
在包含 Cause 和 Raw 字节的标头中。
当您使用记录级别的MessageListener
,如果ConsumerRecord
包含一个DeserializationException
标头中,容器的ErrorHandler
调用失败的ConsumerRecord
.
记录不会传递给侦听器。
或者,您也可以配置ErrorHandlingDeserializer
要通过提供failedDeserializationFunction
,它是一个Function<FailedDeserializationInfo, T>
.
调用此函数以创建T
,它以通常的方式传递给侦听器。
类型为FailedDeserializationInfo
,其中包含所有上下文信息。
您可以找到DeserializationException
(作为序列化的 Java 对象)在 Headers 中。
请参阅 Javadoc 以获取ErrorHandlingDeserializer
了解更多信息。
您可以使用DefaultKafkaConsumerFactory
接受 key 和 value 的构造函数Deserializer
对象和连线ErrorHandlingDeserializer
实例。
或者,您可以使用使用者配置属性(由ErrorHandlingDeserializer
) 实例化委托。
属性名称包括ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
.
属性值可以是类或类名。
以下示例显示如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用failedDeserializationFunction
.
public class BadFoo extends Foo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
...
如果使用者配置了ErrorHandlingDeserializer 配置KafkaTemplate 及其 producer 及其序列化器,该序列化器可以处理普通对象和原始对象byte[] 值,这是由反序列化异常引起的。
模板的 generic value type 应为Object .
一种方法是使用DelegatingByTypeSerializer ;示例如下: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当使用ErrorHandlingDeserializer
使用批处理侦听器时,必须检查消息标头中的反序列化异常。
当与DefaultBatchErrorHandler
,您可以使用该标头来确定异常失败的记录,并通过BatchListenerFailedException
.
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可用于将标头转换为DeserializationException
.
消费List<ConsumerRecord<?, ?>
,SerializationUtils.getExceptionFromHeader()
替换为:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还使用DeadLetterPublishingRecoverer ,则为DeserializationException 将具有record.value() 的类型byte[] ;这不应该被序列化。
考虑使用DelegatingByTypeSerializer 配置为使用ByteArraySerializer 为byte[] 以及所有其他类型的普通序列化程序(Json、Avro 等)。 |
使用批量侦听器进行有效负载转换
您还可以使用JsonMessageConverter
在BatchMessagingMessageConverter
在使用 Batch Listener Container Factory 时转换 Batch 消息。
有关更多信息,请参见序列化、反序列化和消息转换和 Spring 消息传递消息转换。
默认情况下,转换的类型是从 listener 参数推断的。
如果配置JsonMessageConverter
替换为DefaultJackson2TypeMapper
它有它的TypePrecedence
设置为TYPE_ID
(而不是默认的INFERRED
),则转换器会改用 Headers 中的 type 信息(如果存在)。
例如,这允许使用接口而不是具体类来声明侦听器方法。
此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。
当您使用类级别@KafkaListener
实例其中,有效负载必须已转换,以确定要调用的方法。
下面的示例创建使用此方法的 bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,要使其正常工作,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理标头。
如果批处理转换器具有支持它的记录转换器,则您还可以接收一个消息列表,其中的有效负载根据泛型类型进行转换。 以下示例显示了如何执行此作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
...
}
ConversionService
定制
从版本 2.1.1 开始,org.springframework.core.convert.ConversionService
默认使用o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory
为了解析用于调用侦听器方法的参数,所有实现以下任何接口的 bean 都提供了该方法:
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这使您可以进一步自定义侦听器反序列化,而无需更改ConsumerFactory
和KafkaListenerContainerFactory
.
设置自定义MessageHandlerMethodFactory 在KafkaListenerEndpointRegistrar 通过KafkaListenerConfigurer Bean 禁用此功能。 |
添加自定义HandlerMethodArgumentResolver
自@KafkaListener
从版本 2.4.2 开始,您可以添加自己的HandlerMethodArgumentResolver
并解析自定义方法参数。
您只需实施KafkaListenerConfigurer
和使用方法setCustomMethodArgumentResolvers()
来自类KafkaListenerEndpointRegistrar
.
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过添加自定义MessageHandlerMethodFactory
到KafkaListenerEndpointRegistrar
豆。
如果您执行此作,并且您的应用程序需要处理逻辑删除记录,则使用null
value()
(例如,来自压缩的主题),您应该添加一个KafkaNullAwarePayloadArgumentResolver
到工厂;它必须是最后一个解析器,因为它支持所有类型,并且可以匹配没有@Payload
注解。
如果您使用的是DefaultMessageHandlerMethodFactory
,将此解析程序设置为最后一个自定义解析程序;工厂将确保此解析器在标准PayloadMethodArgumentResolver
,它不知道KafkaNull
负载。
4.1.20. 消息头
0.11.0.0 客户端引入了对消息中标头的支持。
从版本 2.0 开始, Spring for Apache Kafka 现在支持将这些 Headers 映射到和从spring-messaging
MessageHeaders
.
映射的先前版本ConsumerRecord 和ProducerRecord 到 spring-messagingMessage<?> ,其中 value 属性映射到payload 和其他属性 (topic ,partition 等)映射到标头。
情况仍然如此,但现在可以映射其他(任意)标头。 |
Apache Kafka 标头具有一个简单的 API,如以下接口定义所示:
public interface Header {
String key();
byte[] value();
}
这KafkaHeaderMapper
策略用于在 Kafka 之间映射标头条目Headers
和MessageHeaders
.
其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
这SimpleKafkaHeaderMapper
将原始标头映射为byte[]
,以及用于转换为String
值。
这DefaultKafkaHeaderMapper
将 key 映射到MessageHeaders
标头名称,为了支持出站消息的丰富标头类型,将执行 JSON 转换。
一个“特殊”报头(键为spring_json_header_types
) 包含<key>:<type>
.
此标头在入站端使用,以提供每个标头值到原始类型的适当转换。
在入站端,所有 KafkaHeader
实例映射到MessageHeaders
.
在出站端,默认情况下,所有MessageHeaders
映射,但id
,timestamp
以及映射到ConsumerRecord
性能。
您可以通过向映射器提供模式来指定要为出站消息映射的标头。 下面的清单显示了一些示例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用默认的 JacksonObjectMapper 并映射大多数 headers,如示例前面所讨论的那样。 |
2 | 使用提供的 JacksonObjectMapper 并映射大多数 headers,如示例前面所讨论的那样。 |
3 | 使用默认的 JacksonObjectMapper 并根据提供的模式映射标头。 |
4 | 使用提供的 JacksonObjectMapper 并根据提供的模式映射标头。 |
模式相当简单,可以包含一个前导通配符 ()、尾随通配符或两者(例如
.cat.*
).
您可以使用前导!
.
与标头名称匹配的第一个模式(无论是正的还是负的)获胜。
当您提供自己的模式时,我们建议包括!id
和!timestamp
,因为这些标头在入站端是只读的。
默认情况下,映射器仅反序列化java.lang 和java.util .
您可以通过添加受信任的软件包来信任其他(或所有)软件包addTrustedPackages 方法。
如果您收到来自不受信任的来源的消息,您可能希望只添加您信任的那些包。
要信任所有包,您可以使用mapper.addTrustedPackages("*") . |
映射String 当与不知道映射器的 JSON 格式的系统通信时,原始形式的标头值非常有用。 |
从版本 2.2.5 开始,您可以指定某些字符串值 Headers 不应使用 JSON 进行 Map,而是应映射到原始 Headers 或从原始 Mapbyte[]
.
这AbstractKafkaHeaderMapper
具有新属性;mapAllStringsOut
当设置为 true 时,所有字符串值标头都将转换为byte[]
使用charset
属性(默认UTF-8
).
此外,还有一个属性rawMappedHeaders
,它是header name : boolean
;如果映射包含标头名称,并且标头包含String
值,它将被映射为 RAWbyte[]
使用 charset.
此映射还用于映射原始传入byte[]
headers 设置为String
当且仅当映射值中的布尔值为true
.
如果布尔值为false
,或者标头名称不在映射中,并且带有true
值,则传入的 Headers 会简单地映射为原始的未映射 Header。
以下测试用例说明了此机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个 Headers 映射器都会映射所有入站 Headers。 从版本 2.8.8 开始,模式也可以应用于入站映射。 要创建用于入站映射的映射器,请在相应的映射器上使用静态方法之一:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以abc
并包括所有其他
默认情况下,DefaultKafkaHeaderMapper
用于MessagingMessageConverter
和BatchMessagingMessageConverter
,只要 Jackson 在 class 路径上。
使用批处理转换器,转换后的标头在KafkaHeaders.BATCH_CONVERTED_HEADERS
作为List<Map<String, Object>>
其中,列表位置的 map 对应于 payload 中的数据位置。
如果没有转换器(因为 Jackson 不存在,或者它被显式设置为null
),则使用者记录中的标头在KafkaHeaders.NATIVE_HEADERS
页眉。
此标头是一个Headers
object(或List<Headers>
如果是 Batch Converter),其中列表中的位置对应于 payload 中的数据位置)。
某些类型不适合 JSON 序列化,并且简单的toString() 序列化可能是这些类型的首选。
这DefaultKafkaHeaderMapper 有一个名为addToStringClasses() 这允许您提供应以这种方式处理出站映射的类的名称。
在入站映射期间,它们被映射为String .
默认情况下,只有org.springframework.util.MimeType 和org.springframework.http.MediaType 以这种方式映射。 |
从版本 2.3 开始,简化了 String 值 Headers 的处理。
默认情况下,此类标头不再是 JSON 编码的(即它们没有封闭"…" 添加)。
该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String(从byte[] ).
mapper 可以处理(解码)旧版本生成的 headers(它会检查前导);这样,使用 2.3 的应用程序可以使用旧版本中的记录。" |
为了与早期版本兼容,请将encodeStrings 自true ,如果使用 2.3 的版本生成的记录可能被使用早期版本的应用程序使用。
当所有应用程序都使用 2.3 或更高版本时,可以将该属性保留为默认值false . |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 bean 配置到自动配置的KafkaTemplate
;否则,您应该将此转换器添加到模板中。
4.1.21. 'Tombstone' 记录的 Null 有效负载和日志压缩
当您使用 Log Compaction 时,您可以使用null
payloads 来标识密钥的删除。
您还可以接收null
值,例如Deserializer
那可能会回来null
当它无法反序列化值时。
要发送null
payload 使用KafkaTemplate
中,您可以将 null 传递到send()
方法。
一个例外是send(Message<?> message)
变体。
因为spring-messaging
Message<?>
不能有null
payload 中,您可以使用名为KafkaNull
,框架发送null
.
为方便起见,静态KafkaNull.INSTANCE
。
当您使用消息侦听器容器时,收到的ConsumerRecord
具有null
value()
.
要配置@KafkaListener
处理null
payload 中,您必须使用@Payload
annotation 替换为required = false
.
如果它是压缩日志的逻辑删除消息,您通常还需要该密钥,以便您的应用程序可以确定哪个密钥被“删除”。
以下示例显示了此类配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别@KafkaListener
具有多个@KafkaHandler
方法,则需要一些额外的配置。
具体来说,您需要一个@KafkaHandler
方法替换为KafkaNull
有效载荷。
以下示例显示如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是null
不KafkaNull
.
请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver 框架将在使用默认MessageHandlerMethodFactory .
使用自定义MessageHandlerMethodFactory 看添加自定义HandlerMethodArgumentResolver 自@KafkaListener . |
4.1.22. 处理异常
本节介绍如何处理在使用 Spring for Apache Kafka 时可能出现的各种异常。
侦听器错误处理程序
从版本 2.0 开始,@KafkaListener
annotation 具有一个新属性:errorHandler
.
您可以使用errorHandler
要提供KafkaListenerErrorHandler
实现。
这个功能接口有一个方法,如下面的清单所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问 spring-messagingMessage<?>
对象和侦听器抛出的异常,该异常包装在ListenerExecutionFailedException
.
错误处理程序可以引发原始异常或新异常,该异常将引发到容器中。
错误处理程序返回的任何内容都将被忽略。
从版本 2.7 开始,您可以设置rawRecordHeader
属性MessagingMessageConverter
和BatchMessagingMessageConverter
这会导致 RAWConsumerRecord
添加到已转换的Message<?>
在KafkaHeaders.RAW_DATA
页眉。
这很有用,例如,如果您希望使用DeadLetterPublishingRecoverer
在侦听器错误处理程序中。
它可能用于请求/回复方案,即您希望在一定次数的重试后,在死信主题中捕获失败的记录后,将失败结果发送给发件人。
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (ConsumerAwareListenerErrorHandler
) 访问消费者对象,通过以下方法访问该 Consumer 对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口 (ManualAckListenerErrorHandler
) 提供对Acknowledgment
使用手动时的对象AckMode
s.
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一种情况下,您都不应该对使用者执行任何查找,因为容器不会知道它们。
容器错误处理程序
从版本 2.8 开始,旧版ErrorHandler
和BatchErrorHandler
interfaces 已被新的CommonErrorHandler
.
这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。CommonErrorHandler
提供了替换大多数旧版框架错误处理程序实现的实现,并弃用了旧版错误处理程序。
侦听器容器和侦听器容器工厂仍支持旧式接口;它们将在将来的发行版中弃用。
看将自定义旧版错误处理程序实现迁移到CommonErrorHandler
有关将自定义错误处理程序迁移到 的信息CommonErrorHandler
.
使用事务时,默认情况下不会配置错误处理程序,以便异常将回滚事务。
事务容器的错误处理由AfterRollbackProcessor
.
如果您在使用事务时提供自定义错误处理程序,并且您希望回滚事务,它必须引发异常。
此接口有一个 default 方法isAckAfterHandle()
它由容器调用,以确定如果错误处理程序返回而不引发异常,是否应提交偏移量;默认情况下,它返回 true。
通常,当错误未被“处理”时(例如,在执行 seek作之后),框架提供的错误处理程序将引发异常。
默认情况下,此类异常由容器记录在ERROR
水平。
所有框架错误处理程序都扩展了KafkaExceptionLogLevelAware
,它允许您控制记录这些异常的级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定要用于容器工厂中所有侦听器的全局错误处理程序。 以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注释的侦听器方法引发异常,则会将其抛出到容器中,并根据容器配置处理消息。
容器在调用错误处理程序之前提交任何待处理的偏移量提交。
如果您使用的是 Spring Boot,则只需将错误处理程序添加为@Bean
和 Boot 会将其添加到自动配置的工厂中。
退避处理程序
错误处理程序(如 DefaultErrorHandler)使用BackOff
以确定在重试投放之前要等待的时间。
从版本 2.9 开始,您可以配置自定义BackOffHandler
.
默认处理程序只是暂停线程,直到 back off 时间过去(或容器停止)。
该框架还提供了ContainerPausingBackOffHandler
这将暂停侦听器容器,直到 Back Off 时间过去,然后恢复容器。
当延迟时间超过max.poll.interval.ms
consumer 属性。
请注意,实际回退时间的分辨率会受到pollTimeout
container 属性。
DefaultErrorHandler
这个新的错误处理程序将SeekToCurrentErrorHandler
和RecoveringBatchErrorHandler
,它们现在已成为多个版本的默认错误处理程序。
一个区别是批处理侦听器的回退行为(当BatchListenerFailedException
)等效于 Retrying Complete Batches。
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与查找未处理的记录偏移量相同的语义,如下所述,但实际上没有查找。
相反,记录由侦听器容器保留,并在错误处理程序退出后(以及在执行单个暂停poll() ,以保持使用者的活力;如果 Non-Blocking Retries 或ContainerPausingBackOffHandler 正在使用,则暂停可能会延伸到多个轮询)。
错误处理程序将结果返回到容器,指示是否可以重新提交当前失败的记录,或者它是否已恢复,然后不会再次将其发送到侦听器。
要启用此模式,请设置属性seekAfterError 自false . |
错误处理程序可以恢复 (跳过) 不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在ERROR
级别)。
您可以使用自定义 recoverer (BiConsumer
) 和BackOff
,它控制每个尝试之间的传递尝试和延迟。
使用FixedBackOff
跟FixedBackOff.UNLIMITED_ATTEMPTS
导致 (有效) 无限重试。
以下示例配置三次尝试后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。
例如,使用@KafkaListener
container factory 中,您可以添加DefaultErrorHandler
如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这将重试传输最多 2 次(3 次传输尝试),后退 1 秒,而不是默认配置 (FixedBackOff(0L, 9)
).
在重试次数用尽后,只会记录失败。
举个例子;如果poll
返回 6 条记录(每个分区 0、1、2 各 2 条),侦听器在第 4 条记录上引发异常,容器通过提交前三条消息的偏移量来确认前三条消息。
这DefaultErrorHandler
寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。
下一个poll()
返回 3 条未处理的记录。
如果AckMode
是BATCH
,容器会在调用错误处理程序之前提交前两个分区的偏移量。
对于批处理侦听器,侦听器必须抛出一个BatchListenerFailedException
指示批处理中的哪些记录失败。
事件的顺序是:
-
在索引之前提交记录的偏移量。
-
如果重试次数未用尽,则执行 seek作,以便重新传送所有剩余记录(包括失败的记录)。
-
如果重试次数已用尽,请尝试恢复失败的记录(仅限默认日志)并执行查找,以便重新传递剩余记录(不包括失败的记录)。 已提交已恢复记录的偏移量
-
如果重试已用尽且恢复失败,则执行查找,就像重试未用尽一样。
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与上面讨论的查找未处理的记录偏移量相同的语义,但实际上没有查找。
相反,错误处理程序会创建一个新的ConsumerRecords<?, ?> 仅包含未处理的记录,这些记录随后将提交给侦听器(在执行单个暂停后poll() ,以保持使用者处于活动状态)。
要启用此模式,请设置属性seekAfterError 自false . |
默认 recoverer 会在重试次数用尽后记录失败的记录。
您可以使用自定义恢复器,或者框架提供的恢复器,例如DeadLetterPublishingRecoverer
.
当使用 POJO 批处理侦听器(例如List<Thing>
),并且您没有完整的 consumer 记录要添加到异常中,则只需添加失败记录的索引即可:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置了AckMode.MANUAL_IMMEDIATE
,则可以将错误处理程序配置为提交已恢复记录的偏移量;将commitRecovered
property 设置为true
.
另请参阅发布死信记录。
使用事务时,类似的功能由DefaultAfterRollbackProcessor
.
请参阅 After-rollback Processor (回滚后处理器)。
这DefaultErrorHandler
将某些异常视为致命异常,并且对于此类异常,将跳过重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
请参阅 Javadocs 以获取DefaultErrorHandler.addNotRetryableException()
和DefaultErrorHandler.setClassifications()
有关更多信息,以及spring-retry
BinaryExceptionClassifier
.
下面是一个将IllegalArgumentException
对于不可重试的异常:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
从版本 2.8.10 开始,添加了批处理侦听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
如果 recoverer 发生故障,则BackOff 将默认重置,并且重新投放将再次通过回退,然后再再次尝试恢复。
要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailure 自false . |
您可以为错误处理程序提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
要确定BackOff
若要根据失败的记录和/或异常使用:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,则处理程序的默认值BackOff
将被使用。
设置resetStateOnExceptionChange
自true
并且重试序列将重新启动(包括选择新的BackOff
(如果已配置)如果异常类型在两次失败之间发生更改。
什么时候false
(版本 2.9 之前的默认值),则不考虑异常类型。
从版本 2.9 开始,现在是true
默认情况下。
另请参阅 Delivery Attempts 标头。
使用批处理错误处理程序的转换错误
从版本 2.8 开始,批处理侦听器现在可以正确处理转换错误,当使用MessageConverter
替换为ByteArrayDeserializer
一个BytesDeserializer
或StringDeserializer
以及DefaultErrorHandler
.
当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头中,类似于ErrorHandlingDeserializer
.
一个ConversionException
s 在侦听器中可用,因此侦听器可以抛出BatchListenerFailedException
指示发生 conversion 异常的第一个索引。
例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批处理
现在是DefaultErrorHandler
对于批处理侦听器,其中侦听器抛出除BatchListenerFailedException
.
无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录具有相同的顺序。
因此,不可能轻松维护批处理的重试状态。
这FallbackBatchErrorHandler
采用以下方法。
如果批处理侦听器引发的异常不是BatchListenerFailedException
,则从内存中的记录批次执行重试。
为了避免在延长的重试序列期间发生重新平衡,错误处理程序会暂停使用者,在每次重试时在休眠前轮询使用者以进行回退,然后再次调用侦听器。
如果/当重试次数用尽时,ConsumerRecordRecoverer
为批处理中的每条记录调用。
如果 recoverer 引发异常,或者线程在休眠期间中断,则将在下一次轮询时重新传递该批记录。
在退出之前,无论结果如何,使用者都会恢复。
此机制不能用于事务。 |
在等待BackOff
interval 中,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许睡眠在stop()
而不是造成延迟。
容器停止错误处理程序
这CommonContainerStoppingErrorHandler
如果侦听器引发异常,则停止容器。
对于记录侦听器,当AckMode
是RECORD
,则提交已处理记录的偏移量。
对于记录侦听器,当AckMode
是任何手动值,则会提交已确认记录的偏移量。
对于记录侦听器,wWhen 使用AckMode
是BATCH
,或者对于批处理侦听器,则在重新启动容器时重放整个批处理。
容器停止后,将ListenerExecutionFailedException
被抛出。
这是为了使事务回滚(如果启用了事务)。
委派错误处理程序
这CommonDelegatingErrorHandler
可以委托给不同的错误处理程序,具体取决于异常类型。
例如,您可能希望调用DefaultErrorHandler
对于大多数例外情况,或者CommonContainerStoppingErrorHandler
对于其他人来说。
对 Record 和 Batch 侦听器使用不同的常见错误处理程序
如果您希望对记录和批处理侦听器使用不同的错误处理策略,则CommonMixedErrorHandler
允许为每个侦听器类型配置特定的错误处理程序。
常见错误处理程序摘要
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
旧版错误处理程序及其替换
旧版错误处理程序 | 更换 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
不可更换、使用 |
|
|
|
无替代品 - 使用 |
将自定义旧版错误处理程序实现迁移到CommonErrorHandler
请参阅CommonErrorHandler
.
要将ErrorHandler
或ConsumerAwareErrorHandler
implementation 中,您应该实现handleOne()
然后离开seeksAfterHandle()
返回false
(默认)。
您还应该实现handleOtherException()
- 处理发生在记录处理范围之外的异常(例如,消费者错误)。
要将RemainingRecordsErrorHandler
implementation 中,您应该实现handleRemaining()
并覆盖seeksAfterHandle()
返回true
(错误处理程序必须执行必要的查找)。
您还应该实现handleOtherException()
- 处理发生在记录处理范围之外的异常(例如,消费者错误)。
要替换任何BatchErrorHandler
implementation 中,您应该实现handleBatch()
您还应该实现handleOtherException()
- 处理发生在记录处理范围之外的异常(例如,消费者错误)。
回滚后处理器
使用事务时,如果侦听器引发异常(并且错误处理程序(如果存在)引发异常),则事务将回滚。
默认情况下,任何未处理的记录(包括失败的记录)都会在下次轮询时重新获取。
这是通过执行seek
作中的DefaultAfterRollbackProcessor
.
使用批处理侦听器,将重新处理整个记录批次(容器不知道批处理中的哪条记录失败)。
要修改此行为,您可以使用自定义AfterRollbackProcessor
.
例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,也许是通过将其发布到死信主题。
从版本 2.2 开始,DefaultAfterRollbackProcessor
现在可以恢复 (跳过) 不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在ERROR
级别)。
您可以使用自定义恢复器 (BiConsumer
) 和最大失败次数。
设置maxFailures
属性设置为负数会导致无限次重试。
以下示例配置三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,您可以通过配置DefaultErrorHandler
.
请参阅 容器错误处理程序。
批处理侦听器无法进行恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。 |
另请参阅发布死信记录。
从版本 2.2.5 开始,DefaultAfterRollbackProcessor
可以在新事务中调用(在失败的事务回滚后启动)。
然后,如果您使用的是DeadLetterPublishingRecoverer
要发布失败的记录,处理器会将 Recovered 记录在原始 topic/partition 中的 offset 发送到 transaction。
要启用此功能,请将commitRecovered
和kafkaTemplate
属性DefaultAfterRollbackProcessor
.
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则BackOff 将默认重置,并且重新投放将再次通过回退,然后再再次尝试恢复。
在早期版本中,BackOff 未重置,并在下次失败时重新尝试恢复。
要恢复到之前的行为,请将处理器的resetStateOnRecoveryFailure property 设置为false . |
从版本 2.6 开始,您现在可以为处理器提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
要确定BackOff
若要根据失败的记录和/或异常使用:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,则处理器的默认值BackOff
将被使用。
从版本 2.6.3 开始,将resetStateOnExceptionChange
自true
并且重试序列将重新启动(包括选择新的BackOff
(如果已配置)如果异常类型在两次失败之间发生更改。
默认情况下,不考虑异常类型。
从版本 2.3.1 开始,类似于DefaultErrorHandler
这DefaultAfterRollbackProcessor
将某些异常视为致命异常,并且对于此类异常,将跳过重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
请参阅 Javadocs 以获取DefaultAfterRollbackProcessor.setClassifications()
有关更多信息,以及spring-retry
BinaryExceptionClassifier
.
下面是一个将IllegalArgumentException
对于不可重试的异常:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅 Delivery Attempts 标头。
使用电流kafka-clients ,容器无法检测ProducerFencedException 是由再平衡引起的,或者如果生产者的transactional.id 已因超时或到期而被撤销。
因为,在大多数情况下,它是由再平衡引起的,所以容器不会调用AfterRollbackProcessor (因为寻找分区是不合适的,因为我们不再被分配它们)。
如果您确保超时时间足够大,可以处理每个事务并定期执行“空”事务(例如,通过ListenerContainerIdleEvent ),您可以避免由于超时和过期而导致的屏蔽。
或者,您可以将stopContainerWhenFenced container 属性设置为true 容器将停止,避免记录丢失。
您可以使用ConsumerStoppedEvent 并检查Reason 属性FENCED 以检测此情况。
由于该事件还引用了容器,因此您可以使用此事件重新启动容器。 |
从版本 2.7 开始,在等待BackOff
interval 中,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许睡眠在stop()
而不是造成延迟。
从版本 2.7 开始,处理器可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
Delivery Attempts 标头
以下内容仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用ErrorHandler
或AfterRollbackProcessor
实现DeliveryAttemptAware
,则可以启用添加KafkaHeaders.DELIVERY_ATTEMPT
标头 (kafka_deliveryAttempt
) 添加到记录中。
此标头的值是从 1 开始的递增整数。
当接收到原始的ConsumerRecord<?, ?>
整数位于byte[4]
.
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt()
使用@KafkaListener
使用DefaultKafkaHeaderMapper
或SimpleKafkaHeaderMapper
,可以通过添加@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
作为 listener 方法的参数。
要启用此标头的填充,请设置 container 属性deliveryAttemptHeader
自true
.
默认情况下,它是禁用的,以避免查找每条记录的状态并添加标头的 (小) 开销。
这DefaultErrorHandler
和DefaultAfterRollbackProcessor
支持此功能。
侦听器信息报头
在某些情况下,能够知道侦听器在哪个容器中运行非常有用。
从版本 2.8.4 开始,您现在可以设置listenerInfo
属性,或将info
属性@KafkaListener
注解。
然后,容器会将此 API 添加到KafkaListener.LISTENER_INFO
所有传入邮件的标头;然后,它可以用于 Record interceptor、filters 等,或者用于侦听器本身。
@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen2(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
当用于RecordInterceptor
或RecordFilterStrategy
实现中,标头在 Consumer Record 中作为字节数组,使用KafkaListenerAnnotationBeanPostProcessor
的charSet
财产。
标头映射器还会转换为String
创建MessageHeaders
从使用者记录中,并且从不将此标头映射到出站记录上。
对于 POJO 批处理侦听器,从版本 2.8.6 开始,标头被复制到批处理的每个成员中,并且也可用作单个String
参数。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理侦听器具有过滤器,并且过滤器导致空批次,则需要添加required = false 到@Header 参数,因为该信息不可用于空批处理。 |
如果您收到List<Message<Thing>>
该信息位于KafkaHeaders.LISTENER_INFO
标头Message<?>
.
有关使用批处理的更多信息,请参阅批处理侦听器。
发布死信记录
您可以配置DefaultErrorHandler
和DefaultAfterRollbackProcessor
当达到记录的最大失败次数时,使用 Record Recoverer。
该框架提供了DeadLetterPublishingRecoverer
,这会将失败的消息发布到另一个主题。
恢复器需要一个KafkaTemplate<Object, Object>
,用于发送记录。
您还可以选择使用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
,调用该函数用于解析目标 topic 和 partition。
默认情况下,死信记录将发送到名为<originalTopic>.DLT (原始主题名称后缀为.DLT ) 并添加到与原始记录相同的分区。
因此,当您使用默认解析程序时,死信主题必须至少具有与原始主题一样多的分区。
|
如果返回的TopicPartition
具有负分区,则分区未在ProducerRecord
,因此 Kafka 会选择该分区。
从版本 2.2.4 开始,任何ListenerExecutionFailedException
(例如,在@KafkaListener
方法)通过groupId
财产。
这允许目标解析器使用它,以及ConsumerRecord
以选择 Dead letter 主题。
以下示例显示如何连接自定义目标解析程序:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录通过以下标头进行了增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:Exception 类名(通常为ListenerExecutionFailedException
,但可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:异常原因类名(如果存在)(自版本 2.8 起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:异常类名(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常堆栈跟踪(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:处理记录失败的原始消费者组(自 2.8 版本起)。
Key 异常仅由DeserializationException
s 所以没有DLT_KEY_EXCEPTION_CAUSE_FQCN
.
有两种机制可以添加更多标头。
-
将 recoverer 和 override 子类化
createProducerRecord()
-叫super.createProducerRecord()
并添加更多标头。 -
提供
BiFunction
要接收 Consumer 记录和异常,请返回Headers
对象;从那里的 Headers 将被复制到最终的 producer 记录;另请参阅 管理死信记录标头。 用setHeadersFunction()
要设置BiFunction
.
第二个版本更易于实现,但第一个版本具有更多信息,包括已组装的标准标头。
从版本 2.3 开始,当与ErrorHandlingDeserializer
,发布者将恢复记录value()
,则设置为无法反序列化的原始值。
以前,value()
为 null,并且用户代码必须解码DeserializationException
从消息标头。
此外,您还可以提供多个KafkaTemplate
s 给出版商;例如,如果要发布byte[]
从DeserializationException
,以及使用与成功反序列化的记录不同的序列化程序的值。
以下是使用KafkaTemplate
使用String
和byte[]
序列化器:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来查找适合value()
即将发布。
一个LinkedHashMap
,以便按顺序检查键。
发布时null
值,当有多个模板时,Recoverer 将查找Void
类;如果不存在,则values().iterator()
将被使用。
从 2.7 开始,您可以使用setFailIfSendResultIsError
方法,以便在消息发布失败时引发异常。
您还可以使用setWaitForSendResultTimeout
.
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则BackOff 将默认重置,并且重新投放将再次通过回退,然后再再次尝试恢复。
在早期版本中,BackOff 未重置,并在下次失败时重新尝试恢复。
要恢复到之前的行为,请将错误处理程序的resetStateOnRecoveryFailure property 设置为false . |
从版本 2.6.3 开始,将resetStateOnExceptionChange
自true
并且重试序列将重新启动(包括选择新的BackOff
(如果已配置)如果异常类型在两次失败之间发生更改。
默认情况下,不考虑异常类型。
从版本 2.3 开始,recoverer 也可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复。
这ErrorHandlingDeserializer
在 Headers 中添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
(使用 Java 序列化)。
默认情况下,这些标头不会保留在发布到死信主题的邮件中。
从版本 2.7 开始,如果 key 和 value 都失败了反序列化,则两者的原始值都会填充到发送到 DLT 的记录中。
如果传入的记录彼此依赖,但可能无序到达,那么将失败的记录重新发布到原始主题的尾部(多次)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题。
以下错误处理程序配置将完全执行此作:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,recoverer 检查目标解析器选择的分区是否确实存在。
如果该分区不存在,则ProducerRecord
设置为null
,允许KafkaProducer
以选择分区。
您可以通过设置verifyPartition
property 设置为false
.
管理死信记录标头
-
appendOriginalHeaders
(默认true
) -
stripPreviousExceptionHeaders
(默认true
自 2.8 版本起)
Apache Kafka 支持多个同名的标头;要获取 “latest” 值,您可以使用headers.lastHeader(headerName)
;要获取多个标头的迭代器,请使用headers.headers(headerName).iterator()
.
当重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为RecordTooLargeException
);对于异常标头尤其如此,尤其是对于 Stack Trace 标头。
使用这两个属性的原因是,虽然您可能希望只保留最后一个异常信息,但您可能希望保留记录在每次失败时传递的主题的历史记录。
appendOriginalHeaders
应用于所有名为ORIGINAL
而stripPreviousExceptionHeaders
应用于所有名为EXCEPTION
.
从版本 2.8.4 开始,您现在可以控制将哪些标准标头添加到输出记录中。
请参阅enum HeadersToAdd
对于默认添加的(当前)10 个标准头文件的通用名称(这些不是实际的头文件名称,只是一个抽象;实际的头文件名称由getHeaderNames()
子类可以覆盖的方法。
要排除标头,请使用excludeHeaders()
方法;例如,要禁止在标头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加ExceptionHeadersCreator
;这也会禁用所有标准异常标头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过addHeadersFunction
方法。
这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。
另请参阅使用非阻塞重试的 Failure Header Management。
ExponentialBackOffWithMaxRetries
实现
Spring Framework 提供了许多BackOff
实现。
默认情况下,ExponentialBackOff
将无限期重试;要在重试尝试一定次数后放弃,需要计算maxElapsedTime
.
从版本 2.7.3 开始, Spring for Apache Kafka 提供了ExponentialBackOffWithMaxRetries
它是一个子类,它接收maxRetries
属性并自动计算maxElapsedTime
,这样方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
此作将在1, 2, 4, 8, 10, 10
秒,然后再调用 recoverer。
4.1.23. JAAS 和 Kerberos
从版本 2.0 开始,KafkaJaasLoginModuleInitializer
类以帮助进行 Kerberos 配置。
您可以使用所需的配置将此 bean 添加到应用程序上下文中。
下面的示例配置了这样的 bean:
@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
jaasConfig.setControlFlag("REQUIRED");
Map<String, String> options = new HashMap<>();
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
options.put("principal", "[email protected]");
jaasConfig.setOptions(options);
return jaasConfig;
}
4.2. 非阻塞重试
版本 2.9 更改了引导基础结构 bean 的机制;请参阅配置 了解现在引导该功能所需的两种机制。
使用 Kafka 实现非阻塞重试/DLT 功能通常需要设置额外的主题并创建和配置相应的侦听器。
从 2.7 开始,Spring for Apache Kafka 通过@RetryableTopic
annotation 和RetryTopicConfiguration
类来简化该引导。
Batch 侦听器不支持非阻塞重试。 |
4.2.1. 模式如何工作
如果消息处理失败,则消息将转发到具有 back off 时间戳的重试主题。 然后,重试主题使用者会检查时间戳,如果时间戳未过期,则暂停该主题分区的消耗。 到期时,将恢复分区消耗,并再次使用消息。 如果消息处理再次失败,则消息将被转发到下一个重试主题,并重复该模式,直到处理成功或尝试用尽,并将消息发送到死信主题(如果已配置)。
举个例子,如果你有一个 “main-topic” 主题,并且想要设置非阻塞重试,指数退避为 1000 毫秒,最大尝试次数为 2 和 4,它将创建 main-topic-retry-1000、main-topic-retry-2000、main-topic-retry-4000 和 main-topic-dlt 主题并配置相应的使用者。 该框架还负责创建主题以及设置和配置侦听器。
使用此策略,您将失去 Kafka 对该主题的排序保证。 |
您可以设置AckMode 模式,但RECORD 建议。 |
目前,此功能不支持类级别@KafkaListener 附注 |
使用手册时AckMode
跟asyncAcks
设置为 true,则DefaultErrorHandler
必须配置seekAfterError
设置为false
.
从版本 2.9.10、3.0.8 开始,对于此类配置,这将无条件设置为 true。
在早期版本中,需要覆盖RetryConfigurationSupport.configureCustomizers()
method 将属性设置为true
.
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
}
此外,在这些版本之前,使用默认 (日志记录) DLT 处理程序与任何类型的手册都不兼容AckMode
,无论asyncAcks
财产。
4.2.3. 配置
从版本 2.9 开始,对于默认配置,@EnableKafkaRetryTopic
注解应该在@Configuration
annotated 类。
这使该功能能够正确引导,并允许注入一些功能的组件,以便在运行时进行查找。
没有必要同时添加@EnableKafka ,如果添加此批注,则因为@EnableKafkaRetryTopic 使用@EnableKafka . |
此外,从该版本开始,要对功能的组件和全局功能进行更高级的配置,RetryTopicConfigurationSupport
类应该在@Configuration
class 的 API 和适当的方法。
有关更多详细信息,请参阅配置全局设置和功能。
默认情况下,重试主题的容器将具有与主容器相同的并发性。
从版本 3.0 开始,您可以设置不同的concurrency
对于重试容器(在注解上,或在RetryConfigurationBuilder
).
只能使用上述技术中的一种,并且只能使用一种@Configuration 类可以扩展RetryTopicConfigurationSupport . |
使用@RetryableTopic
注解
要为@KafkaListener
annotated 方法,您只需添加@RetryableTopic
注解,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和使用者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
您可以在同一类中指定一个方法来处理 DLT 消息,方法是使用@DltHandler
注解。
如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录消耗。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定 kafkaTemplate 名称,则名称为defaultRetryTopicKafkaTemplate 将被查找。
如果未找到 bean,则会引发异常。 |
从版本 3.0 开始,@RetryableTopic
annotation 可以用作自定义 annotation 上的元 Annotation;例如:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
用RetryTopicConfiguration
豆
您还可以通过创建RetryTopicConfiguration
bean 中的@Configuration
annotated 类。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置注释有 '@KafkaListener' 的方法中的所有主题创建重试主题和 dlt 以及相应的使用者。这KafkaTemplate
instance 是消息转发所必需的。
为了实现对如何处理每个主题的非阻塞重试的更精细控制,需要多个RetryTopicConfiguration
bean 的 bean 中。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 dlt 的使用者将被分配到一个使用者组,该使用者组的组 ID 是您在groupId 参数的@KafkaListener 注释。
如果您不提供任何 URL,它们都将属于同一个组,并且对重试主题进行再平衡将导致对主主题进行不必要的再平衡。 |
如果使用者配置了ErrorHandlingDeserializer ,要处理 deserilialization 异常,请务必配置KafkaTemplate 及其 producer 及其序列化器,该序列化器可以处理普通对象和原始对象byte[] 值,这是由反序列化异常引起的。
模板的 generic value type 应为Object .
一种方法是使用DelegatingByTypeSerializer ;示例如下: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener Annotations 可以用于同一主题,无论是否手动分配分区以及非阻塞重试,但给定主题只能使用一种配置。
最好使用单个RetryTopicConfiguration bean 用于配置此类主题;如果多个@RetryableTopic 注释用于同一主题,则所有注释都应该具有相同的值,否则其中一个注释将应用于该主题的所有侦听器,而其他注释的值将被忽略。 |
配置全局设置和功能
从 2.9 开始,之前用于配置组件的 bean 覆盖方法已被删除(由于上述 API 的实验性质,没有弃用)。
这不会更改RetryTopicConfiguration
Bean 方法 - 仅基础结构组件的配置。
现在,RetryTopicConfigurationSupport
类应该扩展为 (single)@Configuration
class 和正确的方法被覆盖。
示例如下:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
使用此配置方法时,@EnableKafkaRetryTopic 不应使用 annotation 来防止上下文由于重复的 bean 而无法启动。
使用简单的@EnableKafka annotation 来代替。 |
什么时候autoCreateTopics
为 true,则将使用指定数量的分区和复制因子创建 main 和 retry 主题。
从版本 3.0 开始,默认复制因子为-1
,表示使用 broker default。
如果您的代理版本低于 2.4,则需要设置一个显式值。
要覆盖特定主题(例如主题或 DLT)的这些值,只需添加NewTopic
@Bean
具有必需的属性;这将覆盖 Auto Creation 属性。
默认情况下,使用接收记录的原始分区将记录发布到重试主题。 如果重试主题的分区数少于主主题的分区数,则应适当配置框架;下面是一个示例。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
该函数的参数是使用者记录和下一个主题的名称。
您可以返回特定的分区编号,或者null
以指示KafkaProducer
应确定分区。
默认情况下,当记录通过重试主题转换时,将保留重试标头的所有值(尝试次数、时间戳)。
从版本 2.9.6 开始,如果你只想保留这些 Headers 的最后一个值,请使用configureDeadLetterPublishingContainerFactory()
方法将工厂的retainAllRetryHeaderValues
property 设置为false
.
4.2.4. 编程构建
该功能旨在与@KafkaListener
;但是,一些用户要求提供有关如何以编程方式配置非阻塞重试的信息。
下面的 Spring Boot 应用程序提供了如何执行此作的示例。
@SpringBootApplication
public class Application extends RetryTopicConfigurationSupport {
public static void main(String[] args) {
SpringApplication.run(2Application.class, args);
}
@Bean
RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
return RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(4)
.autoCreateTopicsWith(2, (short) 1)
.create(template);
}
@Bean
TaskScheduler scheduler() {
return new ThreadPoolTaskScheduler();
}
@Bean
@Order(0)
SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
Listener listener, KafkaListenerEndpointRegistry registry) {
return () -> {
KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
EndpointProcessor endpointProcessor = endpoint -> {
// customize as needed (e.g. apply attributes to retry endpoints).
if (!endpoint.equals(mainEndpoint)) {
endpoint.setConcurrency(1);
}
// these are required
endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
endpoint.setTopics("topic");
endpoint.setId("id");
endpoint.setGroupId("group");
};
mainEndpoint.setBean(listener);
try {
mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
}
catch (NoSuchMethodException | SecurityException ex) {
throw new IllegalStateException(ex);
}
mainEndpoint.setConcurrency(2);
mainEndpoint.setTopics("topic");
mainEndpoint.setId("id");
mainEndpoint.setGroupId("group");
configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
"kafkaListenerContainerFactory");
};
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic", "test");
};
}
}
@Component
class Listener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(KafkaUtils.format(record));
throw new RuntimeException("test");
}
}
只有在刷新应用程序上下文之前处理配置时,才会自动创建主题,如上例所示。 要在运行时配置容器,需要使用其他技术创建主题。 |
4.2.5. 功能
大多数功能都可用于@RetryableTopic
annotation 和RetryTopicConfiguration
豆。
BackOff 配置
BackOff 配置依赖于BackOffPolicy
接口中Spring Retry
项目。
它包括:
-
固定退后
-
指数回退
-
Random Exponential Back Off(随机指数回退)
-
统一随机回退
-
无退缩
-
自定义回退
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(4)
.create(template);
}
您还可以提供 Spring Retry 的SleepingBackOffPolicy
接口:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认的退避策略是FixedBackOffPolicy 最大尝试 3 次,间隔为 1000 毫秒。 |
默认最大延迟为 30 秒ExponentialBackOffPolicy .
如果您的回退策略需要值大于该值的延迟,请相应地调整 maxDelay 属性。 |
第一次尝试计入maxAttempts ,因此,如果您提供maxAttempts 值为 4,则原始尝试加上 3 次重试。 |
全局超时
您可以为重试过程设置全局超时。 如果达到该时间,则下次使用者引发异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则直接结束处理。
@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2000)
.timeoutAfter(5000)
.create(template);
}
默认值是没有设置超时,这也可以通过提供 -1 作为 timeout 值来实现。 |
异常分类器
您可以指定要重试的异常和不重试的异常。 您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认行为是对所有异常重试,而不是遍历原因。 |
从 2.8.3 开始,有一个全局的致命异常列表,这将导致记录被发送到 DLT 而无需任何重试。
有关致命异常的默认列表,请参见DefaultErrorHandler。
您可以通过覆盖configureNonBlockingRetries
方法中的@Configuration
类,该类扩展了RetryTopicConfigurationSupport
.
有关更多信息,请参阅配置全局设置和功能。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表即可。 |
包含和排除主题
您可以决定哪些主题将由RetryTopicConfiguration
通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法的 bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
主题自动创建
除非另有说明,否则框架将使用NewTopic
由KafkaAdmin
豆。
您可以指定分区数和用于创建主题的复制因子,并且可以关闭此功能。
从版本 3.0 开始,默认复制因子为-1
,表示使用 broker default。
如果您的代理版本低于 2.4,则需要设置一个显式值。
请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,主题是使用一个分区和复制因子 -1 自动创建的(意味着使用代理默认值)。 如果您的代理版本低于 2.4,则需要设置一个显式值。 |
失败标头管理
在考虑如何管理失败标头(原始标头和异常标头)时,框架将DeadLetterPublishingRecover
以决定是附加还是替换标头。
默认情况下,它会显式设置appendOriginalHeaders
自false
和叶子stripPreviousExceptionHeaders
设置为DeadLetterPublishingRecover
.
这意味着只有第一个 “original” 和最后一个 exception headers 会保留 default configuration。 这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。
有关更多信息,请参阅 管理死信记录标题 。
要重新配置框架以对这些属性使用不同的设置,请配置DeadLetterPublishingRecoverer
Customizer 通过覆盖configureCustomizers
方法中的@Configuration
类,该类扩展了RetryTopicConfigurationSupport
.
有关更多详细信息,请参阅配置全局设置和功能。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从版本 2.8.4 开始,如果您希望添加自定义 headers(除了工厂添加的重试信息 headers 之外,您还可以添加一个headersFunction
到工厂 -factory.setHeadersFunction((rec, ex) → { … })
默认情况下,添加的任何标头都是累积的 - Kafka 标头可以包含多个值。
从版本 2.9.5 开始,如果Headers
返回的函数包含DeadLetterPublishingRecoverer.SingleRecordHeader
,则将删除该标头的任何现有值,仅保留新的单个值。
自定义 DeadLetterPublishingRecoverer
从 Failure Header Management 中可以看出,可以自定义默认的DeadLetterPublishingRecoverer
由框架创建的实例。
但是,对于某些用例,有必要将DeadLetterPublishingRecoverer
,例如,要覆盖createProducerRecord()
修改发送到 retry (或 dead-letter) 主题的内容。
从版本 3.0.9 开始,您可以覆盖RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()
方法提供DeadLetterPublisherCreator
实例,例如:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议您在构建自定义实例时使用提供的解析程序。
4.2.6. 组合阻塞和非阻塞重试
从 2.8.4 开始,您可以将框架配置为同时使用阻塞和非阻塞重试。
例如,您可以有一组异常,这些异常也可能在下一条记录上触发错误,例如DatabaseAccessException
,因此您可以在将同一记录发送到重试主题或直接发送到 DLT 之前重试几次。
要配置阻塞重试,请覆盖configureBlockingRetries
方法中的@Configuration
类,该类扩展了RetryTopicConfigurationSupport
并添加要重试的异常以及BackOff
以供使用。
默认的BackOff
是一个FixedBackOff
没有延迟和 9 次尝试。
有关更多信息,请参阅配置全局设置和功能。
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
.backOff(new FixedBackOff(3000, 5));
}
结合全局可重试主题的致命异常分类,您可以针对所需的任何行为配置框架,例如让某些异常同时触发阻塞和非阻塞重试,仅触发一种或另一种,或者直接进入 DLT 而不进行任何类型的重试。 |
下面是两个配置协同工作的示例:
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
.backOff(new FixedBackOff(50, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(ShouldSkipBothRetriesException.class);
}
在此示例中:
-
ShouldRetryOnlyBlockingException.class
将仅通过阻止重试,如果所有重试都失败,将直接进入 DLT。 -
ShouldRetryViaBothException.class
将通过阻止重试,如果所有阻止重试都失败,则会转发到下一个重试主题,以进行另一组尝试。 -
ShouldSkipBothRetriesException.class
永远不会以任何方式重试,如果第一次处理尝试失败,则会直接转到 DLT。
请注意,阻止重试行为是允许列表 - 您添加确实要以这种方式重试的例外;而非阻塞重试分类是针对 FATAL 异常的,因此是 denylist - 您添加的异常不想执行非阻塞重试,而是直接发送到 DLT。 |
非阻塞异常分类行为还取决于特定主题的配置。 |
4.2.7. 访问 Delivery Attempts
要访问阻止和非阻止送达尝试,请将这些标头添加到您的@KafkaListener
方法签名:
@Header(KafkaHeaders.DELIVERY_ATTEMPT) int blockingAttempts,
@Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) Integer nonBlockingAttempts
只有当您将ContainerProperties
[deliveryAttemptHeader] 设置为true
.
请注意,非阻塞尝试将是null
进行初始投放。
从版本 3.0.10 开始,一个方便的KafkaMessageHeaderAccessor
以允许对这些标头进行更简单的访问;该访问器可以作为 listener 方法的参数提供:
@RetryableTopic(backoff = @Backoff(...)) @KafkaListener(id = "dh1", topics = "dh1") void listen(Thing thing, KafkaMessageHeaderAccessor accessor) { ... }
用accessor.getBlockingRetryDeliveryAttempt()
和accessor.getNonBlockingRetryDeliveryAttempt()
以获取值。
访问器将抛出IllegalStateException
如果未启用阻止重试;对于非阻塞重试,访问器返回1
进行初始投放。
4.2.8. 主题命名
重试主题和 DLT 的命名方式是在主主题后加上提供的或默认值,并附加该主题的延迟或索引。
例子:
“my-topic” → “my-topic-retry-0”, “my-topic-retry-1”, ..., “my-topic-dlt”
“my-other-topic” → “my-topic-myRetrySuffix-1000”, “my-topic-myRetrySuffix-2000”, ..., “my-topic-myDltSuffix”。
默认行为是为每次尝试创建单独的重试主题,并附加索引值:retry-0、retry-1、...、retry-n。
因此,默认情况下,重试主题的数量是配置的maxAttempts 减 1。 |
您可以配置后缀,选择是附加尝试索引还是延迟,在使用固定回退时使用单个重试主题,以及在使用指数回退时对具有 maxInterval 的尝试使用单个重试主题。
重试主题和 Dlt 后缀
您可以指定 retry 和 dlt 主题将使用的后缀。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
默认后缀为 “-retry” 和 “-dlt”,分别用于重试主题和 dlt。 |
附加主题的索引或延迟
您可以在后缀后附加主题的索引或 delay 值。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
默认行为是使用延迟值作为后缀,但具有多个主题的固定延迟配置除外,在这种情况下,主题以主题的索引为后缀。 |
固定延迟重试的单个主题
如果您使用的是固定延迟策略,例如FixedBackOffPolicy
或NoBackOffPolicy
您可以使用单个主题来完成非阻塞重试。
本主题将以 provided 或 default 后缀为后缀,并且不会附加 index 或 delay 值。
上一个FixedDelayStrategy 现已弃用,可替换为SameIntervalTopicReuseStrategy . |
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0、retry-1、... |
maxInterval Exponential Delay 的单个主题
如果您使用的是指数退避策略 (ExponentialBackOffPolicy
),您可以使用单个重试主题来完成其延迟为配置的尝试的非阻塞重试maxInterval
.
此 “final” 重试主题将以 provided 或 default 后缀为后缀,并且将具有 index 或maxInterval
值。
通过选择使用单个主题进行重试,并使用maxInterval delay 时,配置长时间持续重试的指数重试策略可能变得更加可行,因为在这种方法中,您不需要大量的主题。 |
默认行为是使用等于配置的maxAttempts
减 1,并且在使用指数退避时,重试主题以 delay 值为后缀,最后一个重试主题(对应于maxInterval
delay)的 Delay 作为后缀。
例如,当使用initialInterval=1000
,multiplier=2
和maxInterval=16000
,为了继续尝试一小时,需要配置maxAttempts
设置为 229,默认情况下,所需的重试主题为:
-
-重试 1000
-
-重试 2000
-
-重试 4000
-
-重试 8000
-
-重试-16000-0
-
-重试 16000-1
-
-重试-16000-2
-
…
-
-重试-16000-224
当使用在相同间隔内重复使用重试主题的策略时,在上面的相同配置中,所需的重试主题为:
-
-重试 1000
-
-重试 2000
-
-重试 4000
-
-重试 8000
-
-重试 16000
这将是未来发行版中的默认值。
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 16000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
自定义命名策略
更复杂的命名策略可以通过注册一个实现RetryTopicNamesProviderFactory
.
默认实现为SuffixingRetryTopicNamesProviderFactory
并且可以通过以下方式注册不同的 implementation:
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
例如,除了标准后缀之外,以下实现还为 retry/dl 主题名称添加了前缀:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if(properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}
4.2.9. 多个侦听器,相同的主题
从版本 3.0 开始,现在可以在同一主题上配置多个侦听器。 为此,您必须使用自定义主题命名来隔离重试主题。 最好通过一个例子来说明这一点:
@RetryableTopic(...
retryTopicSuffix = "-listener1", dltTopicSuffix = "-listener1-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(id = "listener1", groupId = "group1", topics = TWO_LISTENERS_TOPIC, ...)
void listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
...
}
@RetryableTopic(...
retryTopicSuffix = "-listener2", dltTopicSuffix = "-listener2-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(id = "listener2", groupId = "group2", topics = TWO_LISTENERS_TOPIC, ...)
void listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
...
}
这topicSuffixingStrategy
是可选的。
框架将为每个侦听器配置和使用一组单独的重试主题。
4.2.10. 分布式账本策略
该框架提供了一些使用 DLT 的策略。 您可以提供 DLT 处理方法、使用默认日志记录方法或根本不使用 DLT。 此外,您还可以选择在 DLT 处理失败时会发生什么情况。
DLT 加工方法
您可以指定用于处理主题的 DLT 的方法,以及处理失败时的行为。
为此,您可以使用@DltHandler
注解,其中@RetryableTopic
注释。
请注意,相同的方法将用于所有@RetryableTopic
该类中带注释的方法。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 处理程序方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,将应处理 DLT 消息的 Bean 名称和方法名称作为参数传递。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果未提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod。 |
从版本 2.8 开始,如果你根本不想在这个应用程序中使用 DLT,包括通过默认处理程序(或者你希望延迟使用),你可以控制 DLT 容器是否启动,独立于容器工厂的autoStartup
财产。
使用@RetryableTopic
注解中,将autoStartDltHandler
property 设置为false
;使用 Configuration Builder 时,请使用autoStartDltHandler(false)
.
您可以稍后通过KafkaListenerEndpointRegistry
.
DLT 失败行为
如果 DLT 处理失败,有两种可能的行为可用:ALWAYS_RETRY_ON_ERROR
和FAIL_ON_ERROR
.
在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者结束执行而不转发消息。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是ALWAYS_RETRY_ON_ERROR . |
从版本 2.8.3 开始,ALWAYS_RETRY_ON_ERROR 如果记录导致引发致命异常,则不会将记录路由回 DLT,
例如DeserializationException 因为,通常,此类异常将始终被抛出。 |
被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
您可以使用DestinationTopicResolver
豆。
有关更多信息,请参阅 Exception Classifier。
配置无 DLT
该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,在用尽重试后,处理将结束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}
4.2.11. 指定 ListenerContainerFactory
默认情况下,RetryTopic 配置将使用@KafkaListener
注释,但您可以指定一个不同的 Bean Bean 来用于创建 Retry Topic 和 DLT Listener 容器。
对于@RetryableTopic
注解中,您可以提供工厂的 bean 名称,并使用RetryTopicConfiguration
bean,您可以提供 bean 名称或实例本身。
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory(factory)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory("my-retry-topic-factory")
.create(template);
}
从 2.8.3 开始,您可以对可重试和不可重试的主题使用相同的工厂。 |
如果您需要将出厂配置行为恢复到之前的 2.8.3,您可以覆盖configureRetryTopicConfigurer
方法@Configuration
类,该类扩展了RetryTopicConfigurationSupport
如配置全局设置和功能中所述,并将useLegacyFactoryConfigurer
自true
如:
@Override
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
return rtc -> rtc.useLegacyFactoryConfigurer(true);
}
4.2.12. 在运行时访问 Topic 的信息
从 2.9 开始,您可以通过在运行时通过注入提供的DestinationTopicContainer
豆。
此接口提供了在链中查找下一个主题的方法,或者在 DLT 中查找主题(如果已配置),以及有用的属性,例如主题的名称、延迟和类型。
作为一个实际用例示例,您可以使用此类信息,以便控制台应用程序可以在处理失败的原因(例如错误/不一致状态)得到解决后,将记录从 DLT 重新发送到链中的第一个重试主题。
这DestinationTopic 由DestinationTopicContainer#getNextDestinationTopicFor() method 对应于 input 主题的链中注册的下一个主题。
消息将转发到的实际主题可能会因不同的因素而有所不同,例如异常分类、尝试次数或单主题固定延迟策略。
使用DestinationTopicResolver 界面,如果你需要权衡这些因素。 |
4.2.13. 更改 KafkaBackOffException 日志记录级别
当重试主题中的消息不应使用时,会使用KafkaBackOffException
被抛出。
默认情况下,此类异常记录在DEBUG
级别,但您可以通过在ListenerContainerFactoryConfigurer
在@Configuration
类。
例如,要将日志记录级别更改为 WARN,您可以添加:
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(defaultErrorHandler ->
defaultErrorHandler.setLogLevel(KafkaException.Level.WARN))
}
4.3. Apache Kafka Streams 支持
从版本 1.1.4 开始, Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要从 Spring 应用程序中使用它,kafka-streams
jar 必须存在于 Classpath 中。
它是 Spring for Apache Kafka 项目的可选依赖项,不能传递下载。
4.3.1. 基础
参考 Apache Kafka Streams 文档建议了以下使用 API 的方法:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
所以,我们有两个主要组件:
-
StreamsBuilder
:使用 API 构建KStream
(或KTable
) 实例。 -
KafkaStreams
:管理这些实例的生命周期。
都KStream 实例暴露于KafkaStreams 实例StreamsBuilder 同时启动和停止,即使它们具有不同的逻辑。
换句话说,由StreamsBuilder 与单个生命周期控制相关联。
一旦KafkaStreams 实例已被streams.close() ,则无法重新启动。
相反,一个新的KafkaStreams 实例才能重新启动流处理。 |
4.3.2. Spring 管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean
.
这是一个AbstractFactoryBean
实现来公开StreamsBuilder
singleton 实例作为 bean。
下面的示例创建这样的 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration object 而不是StreamsConfig . |
这StreamsBuilderFactoryBean
还实现了SmartLifecycle
要管理内部KafkaStreams
实例。
与 Kafka Streams API 类似,您必须定义KStream
实例,然后再启动KafkaStreams
.
这也适用于 Spring API for Kafka Streams。
因此,当您使用 defaultautoStartup = true
在StreamsBuilderFactoryBean
,您必须声明KStream
实例StreamsBuilder
在刷新应用程序上下文之前。
例如KStream
可以是常规的 bean 定义,而使用 Kafka Streams API 不会产生任何影响。
以下示例显示了如何执行此作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果要手动控制生命周期(例如,按某个条件停止和启动),可以引用StreamsBuilderFactoryBean
bean 直接使用工厂 bean () 前缀。
因为&
StreamsBuilderFactoryBean
使用其内部KafkaStreams
实例,则可以安全地停止并再次重新启动它。
新的KafkaStreams
在每个start()
.
您也可以考虑使用不同的StreamsBuilderFactoryBean
实例(如果您想控制KStream
实例。
您还可以指定KafkaStreams.StateListener
,Thread.UncaughtExceptionHandler
和StateRestoreListener
选项StreamsBuilderFactoryBean
,这些 API 被委托给内部的KafkaStreams
实例。
此外,除了间接地打开这些选项之外StreamsBuilderFactoryBean
,从版本 2.1.5 开始,您可以使用KafkaStreamsCustomizer
回调接口配置内部KafkaStreams
实例。
请注意,KafkaStreamsCustomizer
覆盖StreamsBuilderFactoryBean
.
如果您需要执行一些KafkaStreams
作,您可以访问该内部KafkaStreams
实例StreamsBuilderFactoryBean.getKafkaStreams()
.
您可以自动装配StreamsBuilderFactoryBean
bean by type,但你应该确保在 bean 定义中使用 full 类型,如下例所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,您也可以添加@Qualifier
for injection by name(如果使用接口 bean 定义)。
以下示例显示了如何执行此作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 有一个新属性infrastructureCustomizer
带类型KafkaStreamsInfrastructureCustomizer
;这允许自定义StreamsBuilder
(例如,添加状态存储)和/或Topology
在创建流之前。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供默认的 no-op 实现,以避免在不需要一种方法时必须同时实现这两种方法。
一个CompositeKafkaStreamsInfrastructureCustomizer
,用于需要应用多个定制器的情况。
4.3.3. Kafka Streams 千分尺支持
在 2.5.3 版本引入,您可以配置KafkaStreamsMicrometerListener
要自动注册千分尺,请使用KafkaStreams
由工厂 Bean 管理的对象:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
4.3.4. 流 JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时对数据进行序列化和反序列化, Spring for Apache Kafka 提供了一个JsonSerde
实现,委托给JsonSerializer
和JsonDeserializer
在序列化、反序列化和消息转换中进行了介绍。
这JsonSerde
implementation 通过其构造函数(Target Type 或ObjectMapper
).
在以下示例中,我们使用JsonSerde
要序列化和反序列化Cat
payload 的 GitHub 的JsonSerde
可以在需要实例的地方以类似的方式使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
stream.through(new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
4.3.5. 使用KafkaStreamBrancher
这KafkaStreamBrancher
类引入了一种更方便的方式来在KStream
.
请考虑以下示例,该示例不使用KafkaStreamBrancher
:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用KafkaStreamBrancher
:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
4.3.6. 配置
要配置 Kafka Streams 环境,请使用StreamsBuilderFactoryBean
需要KafkaStreamsConfiguration
实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration object,而不是作为StreamsConfig . |
为了在大多数情况下避免使用样板代码,尤其是在开发微服务时, Spring for Apache Kafka 提供了@EnableKafkaStreams
注解,您应该将其放置在@Configuration
类。
您只需声明一个KafkaStreamsConfiguration
名为defaultKafkaStreamsConfig
.
一个StreamsBuilderFactoryBean
Bean 中名为defaultKafkaStreamsBuilder
)会自动在 application 上下文中声明。
您可以声明并使用任何额外的StreamsBuilderFactoryBean
Beans也是如此。
您可以通过提供实现StreamsBuilderFactoryBeanConfigurer
.
如果有多个这样的 bean,它们将根据其Ordered.order
财产。
默认情况下,当工厂 Bean 停止时,KafkaStreams.cleanUp()
方法。
从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用CleanupConfig
对象,该对象具有允许您控制cleanUp()
method 在start()
或stop()
或者两者都不是。
从版本 2.7 开始,默认值是永不清理本地状态。
4.3.7. Header Enricher
版本 3.0 添加了HeaderEnricherProcessor
的扩展ContextualProcessor
;提供与已弃用的HeaderEnricher
它实现了已弃用的Transformer
接口。
这可用于在流处理中添加标头;标头值是 SPEL 表达式;表达式求值的根对象有 3 个属性:
-
record
-这org.apache.kafka.streams.processor.api.Record
(key
,value
,timestamp
,headers
) -
key
- 当前记录的 Key -
value
- 当前记录的值 -
context
-这ProcessorContext
,允许访问当前记录元数据
表达式必须返回byte[]
或String
(将转换为byte[]
用UTF-8
).
要在流中使用扩充器:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改key
或value
;它只是添加标题。
每条记录都需要一个新实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
下面是一个简单的示例,添加了一个 Literal 标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
4.3.8.MessagingProcessor
版本 3.0 添加了MessagingProcessor
的扩展ContextualProcessor
;提供与已弃用的MessagingTransformer
它实现了已弃用的Transformer
接口。
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。
transformer 需要MessagingFunction
.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring 集成使用其GatewayProxyFactoryBean
.
它还需要一个MessagingMessageConverter
将键、值和元数据(包括标头)与 Spring 消息传递相互转换Message<?>
.
看[从KStream
] 了解更多信息。
4.3.9. 从反序列化异常中恢复
版本 2.3 引入了RecoveringDeserializationExceptionHandler
当发生反序列化异常时,它可以采取一些作。
请参阅 Kafka 文档DeserializationExceptionHandler
,其中RecoveringDeserializationExceptionHandler
是一种实现。
这RecoveringDeserializationExceptionHandler
配置了ConsumerRecordRecoverer
实现。
该框架提供了DeadLetterPublishingRecoverer
这会将失败的记录发送到死信主题。
有关此恢复程序的更多信息,请参阅 Publishing Dead-letter Records。
要配置恢复器,请将以下属性添加到您的 streams 配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()
bean 可以是你自己的ConsumerRecordRecoverer
.
4.3.10. Kafka Streams 示例
以下示例结合了我们在本章中介绍的所有主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}
4.4. 测试应用程序
这spring-kafka-test
jar 包含一些有用的实用程序,可帮助测试您的应用程序。
4.4.1. KafkaTestUtils
o.s.kafka.test.utils.KafkaTestUtils
提供了许多 static 帮助程序方法来使用记录、检索各种记录偏移量等。
有关完整详细信息,请参阅其 Javadocs。
4.4.2. JUnit
o.s.kafka.test.utils.KafkaTestUtils
还提供了一些静态方法来设置 producer 和 consumer 属性。
下面的清单显示了这些方法签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
从版本 2.5 开始, 使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。
如果由于某种原因无法做到这一点,请注意, |
一个 JUnit 4@Rule
wrapper 的EmbeddedKafkaBroker
用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。
(@EmbeddedKafka有关使用@EmbeddedKafka
使用 JUnit 5)。
下面的清单显示了这些方法的签名:
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
这EmbeddedKafkaBroker
class 有一个 Utility 方法,允许您使用它创建的所有主题。
以下示例演示如何使用它:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
这KafkaTestUtils
具有一些实用程序方法,用于从 Consumer 那里获取结果。
下面的清单显示了这些方法签名:
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下示例演示如何使用KafkaTestUtils
:
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由EmbeddedKafkaBroker
中,名为spring.embedded.kafka.brokers
设置为 Kafka 代理的地址,并且名为spring.embedded.zookeeper.connect
设置为 Zookeeper 的地址。
方便的常数 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS
和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT
) 为此属性提供。
而不是 defaultspring.embedded.kafka.brokers
system 属性,Kafka broker 的地址可以暴露给任何任意且方便的属性。
为此,一个spring.embedded.kafka.brokers.property
(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY
) 系统属性。
例如,使用 Spring Boot 时,使用spring.kafka.bootstrap-servers
configuration 属性应分别为自动配置 Kafka 客户端设置。
因此,在随机端口上使用嵌入式 Kafka 运行测试之前,我们可以设置spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
作为系统属性,并将EmbeddedKafkaBroker
将使用它来公开其代理地址。
现在,这是此属性的默认值(从版本 3.0.10 开始)。
使用EmbeddedKafkaBroker.brokerProperties(Map<String, String>)
中,您可以为 Kafka 服务器提供其他属性。
有关可能的代理属性的更多信息,请参阅 Kafka Config。
4.4.3. 配置主题
以下示例配置创建名为cat
和hat
有 5 个分区时,一个名为thing1
具有 10 个分区,并且有一个名为thing2
有 15 个分区:
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,addTopics
将在出现问题时抛出异常(例如添加已存在的 Topic)。
版本 2.6 添加了该方法的新版本,该方法返回Map<String, Exception>
;key 是 topic name,值为null
以获得成功,或者Exception
失败。
4.4.4. 对多个测试类使用相同的 broker
您可以将同一个 broker 用于多个测试类,类似于以下内容:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
这假定 Spring Boot 环境,并且嵌入式代理替换了 bootstrap servers 属性。
然后,在每个测试类中,您可以使用类似于以下内容的内容:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果您不使用 Spring Boot,则可以使用broker.getBrokersAsString()
.
前面的示例没有提供在所有测试完成后关闭代理的机制。
例如,如果您在 Gradle 守护进程中运行测试,这可能是一个问题。
在这种情况下,您不应该使用此技术,或者您应该使用某些方法来调用destroy() 在EmbeddedKafkaBroker 当您的测试完成时。 |
从版本 3.0 开始,框架公开了GlobalEmbeddedKafkaTestExecutionListener
用于 JUnit 平台;默认情况下,它是禁用的。
这需要 JUnit Platform 1.8 或更高版本。
此侦听器的目的是启动一个全局EmbeddedKafkaBroker
对于整个测试计划,并在计划结束时停止它。
要启用此侦听器,从而为项目中的所有测试提供单个全局嵌入式 Kafka 集群,spring.kafka.global.embedded.enabled
property 必须设置为true
通过系统属性或 JUnit Platform 配置。
此外,还可以提供以下属性:
-
spring.kafka.embedded.count
- 要管理的 Kafka 代理数量; -
spring.kafka.embedded.ports
- 每个 Kafka 代理启动的端口(逗号分隔值),0
如果 random port 是首选端口;值的数量必须等于count
上述; -
spring.kafka.embedded.topics
- 要在启动的 Kafka 集群中创建的主题(逗号分隔值); -
spring.kafka.embedded.partitions
- 要为创建的主题预置的分区数; -
spring.kafka.embedded.broker.properties.location
- 其他 Kafka 代理配置属性的文件位置;此属性的值必须遵循 Spring 资源抽象模式。
从本质上讲,这些属性模拟了一些@EmbeddedKafka
属性。
有关配置属性以及如何提供这些属性的更多信息,请参阅 JUnit 5 用户指南。
例如,spring.embedded.kafka.brokers.property=my.bootstrap-servers
条目可以添加到junit-platform.properties
文件。
从版本 3.0.10 开始,代理会自动将其设置为spring.kafka.bootstrap-servers
,用于使用 Spring Boot 应用程序进行测试。
建议不要将全局嵌入式 Kafka 和 per-test 类合并到单个测试套件中。 它们都共享相同的系统属性,因此很可能会导致意外行为。 |
spring-kafka-test 具有传递依赖项junit-jupiter-api 和junit-platform-launcher (后者支持 Global Embedded Broker)。
如果您希望使用嵌入式代理但不使用 JUnit,则可能希望排除这些依赖项。 |
4.4.5. @EmbeddedKafka 注解
我们通常建议您将规则用作@ClassRule
以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。
从版本 2.0 开始,如果使用 Spring 的测试应用程序上下文缓存,还可以声明EmbeddedKafkaBroker
bean,因此单个代理可以跨多个测试类使用。
为方便起见,我们提供了一个名为@EmbeddedKafka
要注册EmbeddedKafkaBroker
豆。
以下示例演示如何使用它:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从版本 2.2.4 开始,您还可以使用@EmbeddedKafka
注解指定 Kafka ports 属性。
以下示例将topics
,brokerProperties
和brokerPropertiesLocation
的属性@EmbeddedKafka
支持属性占位符解析:
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的示例中,属性 placeholders${kafka.topics.another-topic}
,${kafka.broker.logs-dir}
和${kafka.broker.port}
从 Spring 解析Environment
.
此外,代理属性是从broker.properties
classpath 资源由brokerPropertiesLocation
.
属性占位符为brokerPropertiesLocation
URL 以及在资源中找到的任何属性占位符。
属性定义brokerProperties
覆盖 中找到的属性brokerPropertiesLocation
.
您可以使用@EmbeddedKafka
使用 JUnit 4 或 JUnit 5 进行注释。
4.4.6. @EmbeddedKafka 使用 JUnit5 进行注释
从版本 2.3 开始,有两种方法可以使用@EmbeddedKafka
注释。
当与@SpringJunitConfig
注解中,嵌入式代理将添加到 Test Application 上下文中。
您可以在类或方法级别将代理自动连接到测试中,以获取代理地址列表。
当不使用 spring 测试上下文时,EmbdeddedKafkaCondition
创建代理;该条件包括一个参数解析器,因此您可以在测试方法中访问代理...
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
如果类带有 Comments@EmbeddedBroker
也不使用ExtendedWith(SpringExtension.class)
.@SpringJunitConfig
和@SpringBootTest
是如此元注释,并且当这些 Comments 中的任何一个也存在时,将使用基于上下文的 broker。
当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了该属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。 |
4.4.7. 嵌入式代理@SpringBootTest
附注
Spring Initializr 现在会自动添加spring-kafka-test
对项目配置的 test 作用域中的依赖项。
如果您的应用程序在
|
在 Spring Boot 应用程序测试中,有几种方法可以使用嵌入式代理。
他们包括:
JUnit4 类规则
以下示例说明如何使用 JUnit4 类规则创建嵌入式代理:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖 broker list 属性以设置 Boot 的属性。
@EmbeddedKafka
Annotation 或EmbeddedKafkaBroker
豆
以下示例演示如何使用@EmbeddedKafka
Annotation 创建嵌入式代理:
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
这bootstrapServersProperty 会自动设置为spring.kafka.bootstrap-servers ,默认情况下,从版本 3.0.10 开始。 |
4.4.8. Hamcrest 匹配器
这o.s.kafka.test.hamcrest.KafkaMatchers
提供以下匹配器:
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
4.4.9. AssertJ 条件
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
4.4.10. 示例
以下示例汇集了本章中介绍的大部分主题:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的示例使用 Hamcrest 匹配程序。
跟AssertJ
,最后一部分类似于以下代码:
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
4.4.11. 模拟 Consumer 和 Producer
这kafka-clients
library 提供MockConsumer
和MockProducer
类进行测试。
如果您希望在某些测试中使用这些类,并且带有侦听器容器或KafkaTemplate
分别从 3.0.7 版本开始,框架现在提供MockConsumerFactory
和MockProducerFactory
实现。
这些工厂可以在侦听器容器和模板中使用,而不是默认工厂,后者需要运行(或嵌入)代理。
下面是一个返回单个使用者的简单实现示例:
@Bean
ConsumerFactory<String, String> consumerFactory() {
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
List<TopicPartition> topicPartitions = Arrays.asList(topicPartition0);
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
.toMap(Function.identity(), tp -> 0L));
consumer.updateBeginningOffsets(beginningOffsets);
consumer.schedulePollTask(() -> {
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
new RecordHeaders(), Optional.empty()));
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
new RecordHeaders(), Optional.empty()));
});
return new MockConsumerFactory(() -> consumer);
}
如果您希望使用并发进行测试,则Supplier
lambda 每次都需要创建一个新实例。
使用MockProducerFactory
,有两个构造函数;一个用于创建简单工厂,另一个用于创建支持事务的工厂。
以下是示例:
@Bean
ProducerFactory<String, String> nonTransFactory() {
return new MockProducerFactory<>(() ->
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}
@Bean
ProducerFactory<String, String> transFactory() {
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
mockProducer.initTransactions();
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
请注意,在第二种情况下,lambda 是BiFunction<Boolean, String>
其中,如果调用方需要事务性生产者,则第一个参数为 true;可选的第二个参数包含事务 ID。
这可以是默认值(如构造函数中提供),也可以被KafkaTransactionManager
(或KafkaTemplate
对于本地事务),如果已配置。
如果您希望使用不同的 ID,则会提供事务 IDMockProducer
基于此值。
如果您在多线程环境中使用创建器,则BiFunction
应返回多个生产者(可能使用ThreadLocal
).
事务MockProducer 必须通过调用initTransaction() . |