处理异常
本节介绍如何处理在使用 Spring for Apache Kafka 时可能出现的各种异常。
侦听器错误处理程序
从版本 2.0 开始,@KafkaListener
annotation 具有一个新属性:errorHandler
.
您可以使用errorHandler
要提供KafkaListenerErrorHandler
实现。
这个功能接口有一个方法,如下面的清单所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问 spring-messagingMessage<?>
对象和侦听器抛出的异常,该异常包装在ListenerExecutionFailedException
.
错误处理程序可以引发原始异常或新异常,该异常将引发到容器中。
错误处理程序返回的任何内容都将被忽略。
从版本 2.7 开始,您可以设置rawRecordHeader
属性MessagingMessageConverter
和BatchMessagingMessageConverter
这会导致 RAWConsumerRecord
添加到已转换的Message<?>
在KafkaHeaders.RAW_DATA
页眉。
这很有用,例如,如果您希望使用DeadLetterPublishingRecoverer
在侦听器错误处理程序中。
它可能用于请求/回复方案,即您希望在一定次数的重试后,在死信主题中捕获失败的记录后,将失败结果发送给发件人。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (ConsumerAwareListenerErrorHandler
) 访问消费者对象,通过以下方法访问该对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口 (ManualAckListenerErrorHandler
) 提供对Acknowledgment
使用手动时的对象AckMode
s.
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一种情况下,您都不应该对使用者执行任何查找,因为容器不会知道它们。
容器错误处理程序
从版本 2.8 开始,旧版ErrorHandler
和BatchErrorHandler
interfaces 已被新的CommonErrorHandler
.
这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。CommonErrorHandler
提供了替换大多数旧版框架错误处理程序实现的实现。
看将自定义旧版错误处理程序实现迁移到CommonErrorHandler
有关将自定义错误处理程序迁移到 的信息CommonErrorHandler
.
使用事务时,默认情况下不会配置错误处理程序,以便异常将回滚事务。
事务容器的错误处理由AfterRollbackProcessor
.
如果您在使用事务时提供自定义错误处理程序,并且您希望回滚事务,它必须引发异常。
此接口有一个 default 方法isAckAfterHandle()
它由容器调用,以确定如果错误处理程序返回而不引发异常,是否应提交偏移量;默认情况下,它返回 true。
通常,当错误未被“处理”时(例如,在执行 seek作之后),框架提供的错误处理程序将引发异常。
默认情况下,此类异常由容器记录在ERROR
水平。
所有框架错误处理程序都扩展了KafkaExceptionLogLevelAware
,它允许您控制记录这些异常的级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定要用于容器工厂中所有侦听器的全局错误处理程序。 以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注释的侦听器方法引发异常,则会将其抛出到容器中,并根据容器配置处理消息。
容器在调用错误处理程序之前提交任何待处理的偏移量提交。
如果您使用的是 Spring Boot,则只需将错误处理程序添加为@Bean
和 Boot 会将其添加到自动配置的工厂中。
退避处理程序
错误处理程序(如 DefaultErrorHandler)使用BackOff
以确定在重试投放之前要等待的时间。
从版本 2.9 开始,您可以配置自定义BackOffHandler
.
默认处理程序只是暂停线程,直到 back off 时间过去(或容器停止)。
该框架还提供了ContainerPausingBackOffHandler
这将暂停侦听器容器,直到 Back Off 时间过去,然后恢复容器。
当延迟时间超过max.poll.interval.ms
consumer 属性。
请注意,实际回退时间的分辨率会受到pollTimeout
container 属性。
DefaultErrorHandler
这个新的错误处理程序将SeekToCurrentErrorHandler
和RecoveringBatchErrorHandler
,它们现在已成为多个版本的默认错误处理程序。
一个区别是批处理侦听器的回退行为(当BatchListenerFailedException
)等效于 Retrying Complete Batches。
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与查找未处理的记录偏移量相同的语义,如下所述,但实际上没有查找。
相反,记录由侦听器容器保留,并在错误处理程序退出后(以及在执行单个暂停poll() ,以保持使用者的活力;如果 Non-Blocking Retries 或ContainerPausingBackOffHandler 正在使用,则暂停可能会延伸到多个轮询)。
错误处理程序将结果返回到容器,指示是否可以重新提交当前失败的记录,或者它是否已恢复,然后不会再次将其发送到侦听器。
要启用此模式,请设置属性seekAfterError 自false . |
错误处理程序可以恢复 (跳过) 不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在ERROR
级别)。
您可以使用自定义 recoverer (BiConsumer
) 和BackOff
,它控制每个尝试之间的传递尝试和延迟。
使用FixedBackOff
跟FixedBackOff.UNLIMITED_ATTEMPTS
导致 (有效) 无限重试。
以下示例配置三次尝试后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。
例如,使用@KafkaListener
container factory 中,您可以添加DefaultErrorHandler
如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这将重试传输最多 2 次(3 次传输尝试),后退 1 秒,而不是默认配置 (FixedBackOff(0L, 9)
).
在重试次数用尽后,只会记录失败。
例如,如果poll
返回 6 条记录(每个分区 0、1、2 各 2 条),侦听器在第 4 条记录上引发异常,容器通过提交前三条消息的偏移量来确认前三条消息。
这DefaultErrorHandler
寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。
下一个poll()
返回 3 条未处理的记录。
如果AckMode
是BATCH
,容器会在调用错误处理程序之前提交前两个分区的偏移量。
对于批处理侦听器,侦听器必须抛出一个BatchListenerFailedException
指示批处理中的哪些记录失败。
事件的顺序是:
-
在索引之前提交记录的偏移量。
-
如果重试次数未用尽,则执行 seek作,以便重新传送所有剩余记录(包括失败的记录)。
-
如果重试次数已用尽,请尝试恢复失败的记录(仅限默认日志)并执行查找,以便重新传递剩余记录(不包括失败的记录)。 已提交已恢复记录的偏移量。
-
如果重试已用尽且恢复失败,则执行查找,就像重试未用尽一样。
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与上面讨论的查找未处理的记录偏移量相同的语义,但实际上没有查找。
相反,错误处理程序会创建一个新的ConsumerRecords<?, ?> 仅包含未处理的记录,这些记录随后将提交给侦听器(在执行单个暂停poll() ,以保持使用者处于活动状态)。
要启用此模式,请设置属性seekAfterError 自false . |
默认 recoverer 会在重试次数用尽后记录失败的记录。
您可以使用自定义恢复器,或者框架提供的恢复器,例如DeadLetterPublishingRecoverer
.
当使用 POJO 批处理侦听器(例如List<Thing>
),并且您没有完整的 consumer 记录要添加到异常中,则只需添加失败记录的索引即可:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置了AckMode.MANUAL_IMMEDIATE
,则可以将错误处理程序配置为提交已恢复记录的偏移量;将commitRecovered
property 设置为true
.
另请参阅发布死信记录。
使用事务时,类似的功能由DefaultAfterRollbackProcessor
.
请参阅 After-rollback Processor (回滚后处理器)。
这DefaultErrorHandler
将某些异常视为致命异常,并且对于此类异常,将跳过重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
请参阅 Javadocs 以获取DefaultErrorHandler.addNotRetryableException()
和DefaultErrorHandler.setClassifications()
有关更多信息,以及spring-retry
BinaryExceptionClassifier
.
下面是一个将IllegalArgumentException
对于不可重试的异常:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
从版本 2.8.10 开始,添加了批处理侦听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 JavaDocs。
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
如果 recoverer 发生故障,则BackOff 将默认重置,并且重新投放将再次通过回退,然后再再次尝试恢复。
要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailure 自false . |
您可以为错误处理程序提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
要确定BackOff
若要根据失败的记录和/或异常使用:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,则处理程序的默认值BackOff
将被使用。
设置resetStateOnExceptionChange
自true
并且重试序列将重新启动(包括选择新的BackOff
(如果已配置)如果异常类型在两次失败之间发生更改。
什么时候false
(版本 2.9 之前的默认值),则不考虑异常类型。
从版本 2.9 开始,现在是true
默认情况下。
另请参阅 Delivery Attempts 标头。
使用批处理错误处理程序的转换错误
从版本 2.8 开始,批处理侦听器现在可以正确处理转换错误,当使用MessageConverter
替换为ByteArrayDeserializer
一个BytesDeserializer
或StringDeserializer
以及DefaultErrorHandler
.
当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头中,类似于ErrorHandlingDeserializer
.
一个ConversionException
s 在侦听器中可用,因此侦听器可以抛出BatchListenerFailedException
指示发生 conversion 异常的第一个索引。
例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批处理
现在是DefaultErrorHandler
对于批处理侦听器,其中侦听器抛出除BatchListenerFailedException
.
无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录具有相同的顺序。
因此,不可能轻松维护批处理的重试状态。
这FallbackBatchErrorHandler
采用以下方法。
如果批处理侦听器引发的异常不是BatchListenerFailedException
,则从内存中的记录批次执行重试。
为了避免在延长的重试序列期间发生重新平衡,错误处理程序会暂停使用者,在每次重试时在休眠前轮询使用者以进行回退,然后再次调用侦听器。
如果/当重试次数用尽时,ConsumerRecordRecoverer
为批处理中的每条记录调用。
如果 recoverer 引发异常,或者线程在休眠期间中断,则将在下一次轮询时重新传递该批记录。
在退出之前,无论结果如何,使用者都会恢复。
此机制不能用于事务。 |
在等待BackOff
interval 中,错误处理程序将以短暂的 sleep 循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许 sleep 在stop()
而不是造成延迟。
容器停止错误处理程序
这CommonContainerStoppingErrorHandler
如果侦听器引发异常,则停止容器。
对于记录侦听器,当AckMode
是RECORD
,则提交已处理记录的偏移量。
对于记录侦听器,当AckMode
是任何手动值,则会提交已确认记录的偏移量。
对于记录侦听器,当AckMode
是BATCH
,或者对于批处理侦听器,则在重新启动容器时重放整个批处理。
容器停止后,将ListenerExecutionFailedException
被抛出。
这是为了使事务回滚(如果启用了事务)。
委派错误处理程序
这CommonDelegatingErrorHandler
可以委托给不同的错误处理程序,具体取决于异常类型。
例如,您可能希望调用DefaultErrorHandler
对于大多数例外情况,或者CommonContainerStoppingErrorHandler
对于其他人来说。
所有委托必须共享相同的兼容属性 (ackAfterHandle
,seekAfterError
…).
对 Record 和 Batch 侦听器使用不同的常见错误处理程序
如果您希望对记录和批处理侦听器使用不同的错误处理策略,则CommonMixedErrorHandler
允许为每个侦听器类型配置特定的错误处理程序。
常见错误处理程序摘要
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
旧版错误处理程序及其替换
旧版错误处理程序 | 更换 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
不可更换、使用 |
|
|
|
无替代品,使用 |
将自定义旧版错误处理程序实现迁移到CommonErrorHandler
请参阅 JavaDocsCommonErrorHandler
.
要将ErrorHandler
或ConsumerAwareErrorHandler
implementation 中,您应该实现handleOne()
然后离开seeksAfterHandle()
返回false
(默认)。
您还应该实现handleOtherException()
处理发生在记录处理范围之外的异常(例如,消费者错误)。
要将RemainingRecordsErrorHandler
implementation 中,您应该实现handleRemaining()
并覆盖seeksAfterHandle()
返回true
(错误处理程序必须执行必要的查找)。
您还应该实现handleOtherException()
- 处理发生在记录处理范围之外的异常(例如,消费者错误)。
要替换任何BatchErrorHandler
implementation 中,您应该实现handleBatch()
您还应该实现handleOtherException()
- 处理发生在记录处理范围之外的异常(例如,消费者错误)。
回滚处理器之后
使用事务时,如果侦听器引发异常(并且错误处理程序(如果存在)引发异常),则事务将回滚。
默认情况下,任何未处理的记录(包括失败的记录)都会在下次轮询时重新获取。
这是通过执行seek
作中的DefaultAfterRollbackProcessor
.
使用批处理侦听器,将重新处理整个记录批次(容器不知道批处理中的哪条记录失败)。
要修改此行为,您可以使用自定义AfterRollbackProcessor
.
例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,也许是通过将其发布到死信主题。
从版本 2.2 开始,DefaultAfterRollbackProcessor
现在可以恢复 (跳过) 不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在ERROR
级别)。
您可以使用自定义恢复器 (BiConsumer
) 和最大失败次数。
设置maxFailures
属性设置为负数会导致无限次重试。
以下示例配置三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,您可以通过配置DefaultErrorHandler
.
请参阅 容器错误处理程序。
从版本 3.2 开始,Recovery 现在可以恢复(跳过)一直失败的整批记录。
设置ContainerProperties.setBatchRecoverAfterRollback(true)
以启用此功能。
默认行为,批处理侦听器无法进行恢复,因为框架不知道批处理中的哪些记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。 |
另请参阅发布死信记录。
从版本 2.2.5 开始,DefaultAfterRollbackProcessor
可以在新事务中调用(在失败的事务回滚后启动)。
然后,如果您使用的是DeadLetterPublishingRecoverer
要发布失败的记录,处理器会将 Recovered 记录在原始 topic/partition 中的 offset 发送到 transaction。
要启用此功能,请将commitRecovered
和kafkaTemplate
属性DefaultAfterRollbackProcessor
.
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则BackOff 将默认重置,并且重新投放将再次通过回退,然后再再次尝试恢复。
在早期版本中,BackOff 未重置,并在下次失败时重新尝试恢复。
要恢复到之前的行为,请将处理器的resetStateOnRecoveryFailure property 设置为false . |
从版本 2.6 开始,您现在可以为处理器提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
要确定BackOff
若要根据失败的记录和/或异常使用:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,则处理器的默认值BackOff
将被使用。
从版本 2.6.3 开始,将resetStateOnExceptionChange
自true
并且重试序列将重新启动(包括选择新的BackOff
(如果已配置)如果异常类型在两次失败之间发生更改。
默认情况下,不考虑异常类型。
从版本 2.3.1 开始,类似于DefaultErrorHandler
这DefaultAfterRollbackProcessor
将某些异常视为致命异常,并且对于此类异常,将跳过重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
请参阅 Javadocs 以获取DefaultAfterRollbackProcessor.setClassifications()
有关更多信息,以及spring-retry
BinaryExceptionClassifier
.
下面是一个将IllegalArgumentException
对于不可重试的异常:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅 Delivery Attempts 标头。
使用电流kafka-clients ,容器无法检测ProducerFencedException 是由再平衡引起的,或者如果生产者的transactional.id 已因超时或到期而被撤销。
因为,在大多数情况下,它是由再平衡引起的,所以容器不会调用AfterRollbackProcessor (因为寻找分区是不合适的,因为我们不再被分配它们)。
如果您确保超时时间足够大,可以处理每个事务并定期执行“空”事务(例如,通过ListenerContainerIdleEvent ),您可以避免由于超时和过期而导致的屏蔽。
或者,您可以将stopContainerWhenFenced container 属性设置为true 容器将停止,避免记录丢失。
您可以使用ConsumerStoppedEvent 并检查Reason 属性FENCED 以检测此情况。
由于该事件还引用了容器,因此您可以使用此事件重新启动容器。 |
从版本 2.7 开始,在等待BackOff
interval 中,错误处理程序将以短暂的 sleep 循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许 sleep 在stop()
而不是造成延迟。
从版本 2.7 开始,处理器可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 JavaDocs。
Delivery Attempts 标头
以下内容仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用ErrorHandler
或AfterRollbackProcessor
实现DeliveryAttemptAware
,则可以启用添加KafkaHeaders.DELIVERY_ATTEMPT
标头 (kafka_deliveryAttempt
) 添加到记录中。
此标头的值是从 1 开始的递增整数。
当接收到原始的ConsumerRecord<?, ?>
整数位于byte[4]
.
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
使用@KafkaListener
使用DefaultKafkaHeaderMapper
或SimpleKafkaHeaderMapper
,可以通过添加@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
作为 listener 方法的参数。
要启用此标头的填充,请设置 container 属性deliveryAttemptHeader
自true
.
默认情况下,它是禁用的,以避免查找每条记录的状态并添加标头的 (小) 开销。
这DefaultErrorHandler
和DefaultAfterRollbackProcessor
支持此功能。
批量侦听器的 Delivery Attempts Header
处理时ConsumerRecord
使用BatchListener
这KafkaHeaders.DELIVERY_ATTEMPT
header 可以以不同的方式出现SingleRecordListener
.
从版本 3.3 开始,如果要将KafkaHeaders.DELIVERY_ATTEMPT
标头添加到ConsumerRecord
使用BatchListener
中,将DeliveryAttemptAwareRetryListener
作为RetryListener
在ErrorHandler
.
请参考下面的代码。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然后,每当批处理无法完成时,DeliveryAttemptAwareRetryListener
将注入一个KafkaHeaders.DELIVERY_ATTMPT
标头添加到ConsumerRecord
.
侦听器信息报头
在某些情况下,能够知道侦听器在哪个容器中运行非常有用。
从版本 2.8.4 开始,您现在可以设置listenerInfo
属性,或将info
属性@KafkaListener
注解。
然后,容器会将此 API 添加到KafkaListener.LISTENER_INFO
所有传入邮件的标头;然后,它可以用于 Record interceptor、filters 等,或者用于侦听器本身。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
当用于RecordInterceptor
或RecordFilterStrategy
实现中,标头在 Consumer Record 中作为字节数组,使用KafkaListenerAnnotationBeanPostProcessor
的charSet
财产。
标头映射器还会转换为String
创建MessageHeaders
从使用者记录中,并且从不将此标头映射到出站记录上。
对于 POJO 批处理侦听器,从版本 2.8.6 开始,标头被复制到批处理的每个成员中,并且也可用作单个String
参数。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理侦听器具有过滤器,并且过滤器导致空批次,则需要添加required = false 到@Header 参数,因为该信息不可用于空批处理。 |
如果您收到List<Message<Thing>>
该信息位于KafkaHeaders.LISTENER_INFO
标头Message<?>
.
有关使用批处理的更多信息,请参阅批处理侦听器。
发布死信记录
您可以配置DefaultErrorHandler
和DefaultAfterRollbackProcessor
当达到记录的最大失败次数时,使用 Record Recoverer。
该框架提供了DeadLetterPublishingRecoverer
,这会将失败的消息发布到另一个主题。
恢复器需要一个KafkaTemplate<Object, Object>
,用于发送记录。
您还可以选择使用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
,调用该函数用于解析目标 topic 和 partition。
默认情况下,死信记录将发送到名为<originalTopic>-dlt (原始主题名称后缀为-dlt ) 并添加到与原始记录相同的分区。
因此,当您使用默认解析程序时,死信主题必须至少具有与原始主题一样多的分区。
|
如果返回的TopicPartition
具有负分区,则分区未在ProducerRecord
,因此 Kafka 会选择该分区。
从版本 2.2.4 开始,任何ListenerExecutionFailedException
(例如,在@KafkaListener
方法)通过groupId
财产。
这允许目标解析器使用它,以及ConsumerRecord
以选择 Dead letter 主题。
以下示例显示如何连接自定义目标解析程序:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录通过以下标头进行增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:Exception 类名(通常为ListenerExecutionFailedException
,但可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:异常原因类名(如果存在)(自版本 2.8 起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:异常类名(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常堆栈跟踪(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:处理记录失败的原始消费者组(自 2.8 版本起)。
Key 异常仅由DeserializationException
s 所以没有DLT_KEY_EXCEPTION_CAUSE_FQCN
.
有两种机制可以添加更多标头。
-
将 recoverer 和 override 子类化
createProducerRecord()
-叫super.createProducerRecord()
并添加更多标头。 -
提供
BiFunction
要接收 Consumer 记录和异常,请返回Headers
对象;从那里的 Headers 将被复制到最终的 producer 记录;另请参阅 管理死信记录标头。 用setHeadersFunction()
要设置BiFunction
.
第二个版本更易于实现,但第一个版本具有更多信息,包括已组装的标准标头。
从版本 2.3 开始,当与ErrorHandlingDeserializer
,发布者将恢复记录value()
,则设置为无法反序列化的原始值。
以前,value()
为 null,并且用户代码必须解码DeserializationException
从消息标头。
此外,您还可以提供多个KafkaTemplate
s 给出版商;例如,如果要发布byte[]
从DeserializationException
,以及使用与成功反序列化的记录不同的序列化程序的值。
以下是使用KafkaTemplate
使用String
和byte[]
序列化器:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来查找适合value()
即将发布。
一个LinkedHashMap
,以便按顺序检查键。
发布时null
值,并且有多个模板,则 recoverer 将查找Void
类;如果不存在,则values().iterator()
将被使用。
从 2.7 开始,您可以使用setFailIfSendResultIsError
方法,以便在消息发布失败时引发异常。
您还可以使用setWaitForSendResultTimeout
.
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则BackOff 将默认重置,并且重新投放将再次通过回退,然后再再次尝试恢复。
在早期版本中,BackOff 未重置,并在下次失败时重新尝试恢复。
要恢复到之前的行为,请将错误处理程序的resetStateOnRecoveryFailure property 设置为false . |
从版本 2.6.3 开始,将resetStateOnExceptionChange
自true
并且重试序列将重新启动(包括选择新的BackOff
(如果已配置)如果异常类型在两次失败之间发生更改。
默认情况下,不考虑异常类型。
从版本 2.3 开始,recoverer 也可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复。
这ErrorHandlingDeserializer
在 Headers 中添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
(使用 Java 序列化)。
默认情况下,这些标头不会保留在发布到死信主题的邮件中。
从版本 2.7 开始,如果 key 和 value 都失败了反序列化,则两者的原始值都会填充到发送到 DLT 的记录中。
如果传入的记录彼此依赖,但可能无序到达,那么将失败的记录重新发布到原始主题的尾部(多次)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题。
以下错误处理程序配置将完全执行此作:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,recoverer 检查目标解析器选择的分区是否确实存在。
如果该分区不存在,则ProducerRecord
设置为null
,允许KafkaProducer
以选择分区。
您可以通过设置verifyPartition
property 设置为false
.
从版本 3.1 开始,将logRecoveryRecord
property 设置为true
将记录恢复记录和异常。
管理死信记录标头
-
appendOriginalHeaders
(默认true
) -
stripPreviousExceptionHeaders
(默认true
自 2.8 版本起)
Apache Kafka 支持多个同名的标头;要获取 “latest” 值,您可以使用headers.lastHeader(headerName)
;要获取多个标头的迭代器,请使用headers.headers(headerName).iterator()
.
当重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为RecordTooLargeException
);对于异常标头尤其如此,尤其是对于 Stack Trace 标头。
使用这两个属性的原因是,虽然您可能希望只保留最后一个异常信息,但您可能希望保留记录在每次失败时传递的主题的历史记录。
appendOriginalHeaders
应用于所有名为ORIGINAL
而stripPreviousExceptionHeaders
应用于所有名为EXCEPTION
.
从版本 2.8.4 开始,您现在可以控制将哪些标准标头添加到输出记录中。
请参阅enum HeadersToAdd
对于默认添加的(当前)10 个标准头文件的通用名称(这些不是实际的头文件名称,只是一个抽象;实际的头文件名称由getHeaderNames()
子类可以覆盖的方法。
要排除标头,请使用excludeHeaders()
方法;例如,要禁止在标头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加ExceptionHeadersCreator
;这也会禁用所有标准异常标头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过addHeadersFunction
方法。
这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。
另请参阅使用非阻塞重试的 Failure Header Management。
ExponentialBackOffWithMaxRetries
实现
Spring Framework 提供了许多BackOff
实现。
默认情况下,ExponentialBackOff
将无限期重试;要在重试尝试一定次数后放弃,需要计算maxElapsedTime
.
从版本 2.7.3 开始, Spring for Apache Kafka 提供了ExponentialBackOffWithMaxRetries
它是一个子类,它接收maxRetries
属性并自动计算maxElapsedTime
,这样方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
此作将在1, 2, 4, 8, 10, 10
秒,然后再调用 recoverer。