此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

错误处理

Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。 有关此支持的详细信息,请参阅此处。 Apache Kafka Streams 提供了两种开箱即用的反序列化异常处理程序 -LogAndContinueExceptionHandlerLogAndFailExceptionHandler. 顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。LogAndFailExceptionHandler是默认的反序列化异常处理程序。spring-doc.cadn.net.cn

在 Binder 中处理反序列化异常

Kafka Streams Binder 允许使用以下属性指定上述反序列化异常处理程序。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两个反序列化异常处理程序之外,Binder 还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 以下是启用此 DLQ 异常处理程序的方法。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

当设置了上述属性时,所有 disserialization error 中的记录都会自动发送到 DLQ 主题。spring-doc.cadn.net.cn

您可以设置发布 DLQ 消息的主题名称,如下所示。spring-doc.cadn.net.cn

您可以为DlqDestinationResolver这是一个功能性接口。DlqDestinationResolver需要ConsumerRecord和异常作为输入,然后允许指定主题名称作为输出。 通过访问 KafkaConsumerRecord,可以在BiFunction.spring-doc.cadn.net.cn

以下是为DlqDestinationResolver.spring-doc.cadn.net.cn

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在 为DlqDestinationResolver是 Binder 中的 Provisioner 不会自动为应用程序创建主题。 这是因为 Binders 无法推断实现可能发送到的所有 DLQ 主题的名称。 因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保事先创建这些主题。spring-doc.cadn.net.cn

如果DlqDestinationResolver作为 Bean 存在于应用程序中,则具有更高的优先级。 如果您不想遵循此方法,而是使用 configuration 提供静态 DLQ 名称,则可以设置以下属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果设置了此选项,则错误记录将发送到主题custom-dlq. 如果应用程序未使用上述任一策略,则它将创建一个名称为error.<input-topic-name>.<application-id>. 例如,如果绑定的目标主题是inputTopic应用程序 ID 为process-applicationId,则默认 DLQ 主题为error.inputTopic.process-applicationId. 如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建一个 DLQ 主题。spring-doc.cadn.net.cn

每个输入使用者绑定的 DLQ

物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。 这意味着,如果同一应用程序中有多个函数,则此属性将应用于所有函数。 但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用 Binders 为每个输入使用者绑定提供的更精细的 DLQ 控件。spring-doc.cadn.net.cn

如果您有以下处理器,spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且您只想在第一个输入绑定上启用 DLQ,在第二个绑定上启用 skipAndContinue,那么您可以在使用者上执行此作,如下所示。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinuespring-doc.cadn.net.cn

以这种方式设置反序列化异常处理程序的优先级高于在 Binder 级别设置。spring-doc.cadn.net.cn

DLQ 分区

默认情况下,使用与原始记录相同的分区将记录发布到 Dead-Letter 主题。 这意味着 Dead-Letter 主题必须至少具有与原始记录一样多的分区。spring-doc.cadn.net.cn

要更改此行为,请添加DlqPartitionFunctionimplementation 作为@Bean添加到应用程序上下文中。 只能存在一个这样的 bean。 该函数随 consumer 组(大多数情况下与应用程序 ID 相同)提供,失败的ConsumerRecord和例外。 例如,如果您始终希望路由到分区 0,则可以使用:spring-doc.cadn.net.cn

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions属性设置为 1(并且 Binder 的minPartitionCount等于1),则无需提供DlqPartitionFunction;框架将始终使用分区 0。 如果将使用者绑定的dlqPartitionsproperty 的值设置为1(或 Binder 的minPartitionCount大于1),您必须提供DlqPartitionFunctionbean,即使分区计数与原始主题的分区计数相同。

在 Kafka Streams Binder 中使用异常处理功能时,需要记住几点。spring-doc.cadn.net.cn

  • 物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。 这意味着,如果同一应用程序中有多个函数,则此属性将应用于所有函数。spring-doc.cadn.net.cn

  • 反序列化的异常处理与本机反序列化和框架提供的消息转换一致。spring-doc.cadn.net.cn

在 Binder 中处理 Production 异常

与上述对反序列化异常处理程序的支持不同,Binders 不提供此类处理 生产异常的一类机制。 但是,您仍然可以使用StreamsBuilderFactoryBeanCustomizer,您可以在下面的后续部分中找到有关的更多详细信息。spring-doc.cadn.net.cn

运行时错误处理

当涉及到处理来自应用程序代码的错误时,即来自业务逻辑执行的错误,通常由应用程序来处理。 因为,Kafka Streams Binder 无法干扰应用程序代码。 但是,为了使应用程序更轻松一些,Binder 提供了一个方便的RecordRecoverableProcessor,使用 it,您可以指定要如何处理应用程序级别的错误。spring-doc.cadn.net.cn

请考虑以下代码。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

如果您的mapcall 会抛出一个异常,你有责任处理该错误。 这是RecordRecoverableProcessor变得很方便。 默认情况下,RecordRecoverableProcessor)将简单地记录错误并让应用程序继续。 假设您希望将失败的记录发布到 DLT,而不是在应用程序中处理它。 在这种情况下,您必须使用RecordRecoverableProcessorDltAwareProcessor. 这是你如何做到这一点。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
					throw new RuntimeException("error");
				}, "hello-dlt-1", dltPublishingContext));
}

原始mapcall now 已作为 的一部分被移动KStream#processmethod 调用,它接受ProcessorSupplier. 然后,我们传入自定义DltAwareProcessor,它能够发布到 DLT。 的构造函数DltAwareProcessor上面采用三个参数 - 一个Function,它接受输入记录,然后是业务逻辑作作为Functionbody、DLT 主题,最后是DltPublishingContext. 当Function’s lambda expression throws an exception, the `DltAwareProcessor将输入记录发送到 DLT。 这DltPublishingContext提供DltAwareProcessor必要的发布基础结构 Bean。 这DltPublishingContext由 Binder 自动配置,以便您可以将其直接注入到应用程序中。spring-doc.cadn.net.cn

如果您不希望 Binder 将失败的记录发布到 DLT,则必须使用RecordRecoverableProcessor直接而不是DltAwareProcessor. 您可以提供自己的恢复程序作为BiConsumer接受Record和 exception 作为参数。 假设这样一个场景:您不想将记录发送到 DLT,而只是记录消息并继续。 下面是如何实现该目标的示例。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
					throw new RuntimeException("error");
				},
                (record, exception) -> {
                  // Handle the record
                }));
}

在这种情况下,当记录失败时,RecordRecoverableProcessor使用用户提供的 recoverer,它是一个BiConsumer将失败的记录和引发的异常作为参数。spring-doc.cadn.net.cn

在 DltAwareProcessor 中处理记录键

使用DltAwareProcessor,如果要将记录键发送到 DLT 主题,则需要在 DLT 绑定上设置适当的序列化程序。 这是因为,DltAwareProcessor使用StreamBridge它使用常规的 Kafka Binder(基于消息通道),默认情况下使用ByteArraySerializer用于键。 对于记录值,Spring Cloud Stream 将有效负载转换为适当的byte[];但是,键并非如此,因为它只是将标头中收到的内容作为键传递。 如果您提供的是非字节数组键,则可能会导致类强制转换异常,为避免这种情况,您需要在 DLT 绑定上设置序列化程序,如下所示。spring-doc.cadn.net.cn

假设 DLT 目标是hello-dlt-1记录键为 String 数据类型。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer