此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

死信主题处理

启用 DLQ

要启用 DLQ,基于 Kafka Binder 的应用程序必须通过属性spring.cloud.stream.bindings.<binding-name>.group. 匿名使用者组(即应用程序未明确提供组)无法启用 DLQ 功能。spring-doc.cadn.net.cn

当应用程序想要将错误记录发送到 DLQ 主题时,该应用程序必须启用 DLQ 功能,因为默认情况下未启用此功能。 要启用 DLQ,请通过属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq必须设置为 true。spring-doc.cadn.net.cn

启用 DLQ 后,在处理过程中发生错误并且所有重试都将根据spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts属性,则该记录将被发送到 DLQ 主题。spring-doc.cadn.net.cn

默认情况下,max-attempts属性设置为 3。 什么时候max-attemptsproperty 大于1,并且启用了 DLQ,则您将看到重试遵循max-attempts财产。 如果未启用 dlq(这是默认设置),则max-attemptsproperty 对重试的处理方式没有任何影响。 在这种情况下,重试将回退到 Spring for Apache Kafka 中的容器默认值,即10重试。 如果应用程序想要在禁用 DLQ 时完全禁用重试,则设置max-attemptsproperty 设置为1将不起作用。 要在这种情况下完全禁用重试,您需要提供ListenerContainerCustomizer,然后使用适当的Backoff设置。 下面是一个示例。spring-doc.cadn.net.cn

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> {
		var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
		container.setCommonErrorHandler(commonErrorHandler);
	};
}

这样,默认容器行为将被禁用,并且不会尝试重试。 如上所述,启用 DLQ 时,Binder 设置将具有优先权。spring-doc.cadn.net.cn

处理死信主题中的记录

由于框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将消息路由回原始主题。 但是,如果问题是永久性问题,则可能会导致无限循环。 本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在三次尝试后将它们移动到 “parking lot” 主题。 该应用程序是另一个spring-cloud-stream应用程序,它从死信主题中读取。 当 5 秒内未收到任何消息时,它会退出。spring-doc.cadn.net.cn

这些示例假定原始目标是so8400out且 Consumer 组为so8400.spring-doc.cadn.net.cn

有几种策略需要考虑:spring-doc.cadn.net.cn

  • 请考虑仅在主应用程序未运行时运行 rerouting。 否则,暂时性错误的重试将很快用完。spring-doc.cadn.net.cn

  • 或者,使用两阶段方法:使用此应用程序路由到第三个主题,使用另一个应用程序从那里路由回主要主题。spring-doc.cadn.net.cn

以下代码清单显示了示例应用程序:spring-doc.cadn.net.cn

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
应用
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}