This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!

Retry and Dead Letter Processing

By default, when you configure retry (for example, maxAttempts) and enableDlq in a consumer binding, these functions are performed within the binder, with no participation by the listener container or Kafka consumer.

There are situations where it is preferable to move this functionality to the listener container, such as:

  • The aggregate of retries and delays will exceed the consumer’s max.poll.interval.ms property, potentially causing a partition rebalance.

  • You wish to publish the dead letter to a different Kafka cluster.

  • You wish to add retry listeners to the error handler.

  • …

To move this functionality from the binder to the container, define a @Bean of type ListenerContainerWithDlqAndRetryCustomizer. This interface has the following methods:

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return false to disable retries and DLQ in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

The destination resolver and BackOff are created from the binding properties (if configured). The KafkaTemplate uses configuration from the spring.kafka.… properties. You can then use these to create a custom error handler and dead letter publisher; for example:

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

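            // Customize only the destination whose total retry time is too long for the binder.
            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                // Publish failed records using the resolver built from the binding properties.
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                // Retry in the container with the binding's BackOff, then hand over to the recoverer.
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }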
        }

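        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            // Return false only for the destination customized in configure(...),
            // so every other destination keeps retries and DLQ in the binding.
            return !destinationName.equals("topicWithLongTotalRetryConfig");
        }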

    };
}

Now, a rebalance is a risk only if a single retry delay exceeds the consumer’s max.poll.interval.ms property, rather than the aggregate of all retries and delays.
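To make the difference concrete, the following is a minimal sketch in plain Java (no Spring or Kafka dependencies) that adds up the delays of a capped exponential back-off and compares them with max.poll.interval.ms. The class name RetryBudget, its methods, and the retry settings (10 attempts, 30 s initial delay, multiplier 2.0, 120 s cap) are hypothetical values chosen for illustration; 300000 ms is the Kafka consumer default for max.poll.interval.ms. The sketch ignores the time spent in the listener itself and only approximates how a back-off policy spaces attempts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RetryBudget {

    /** Delays (ms) between consecutive delivery attempts under a capped exponential back-off. */
    public static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        // maxAttempts deliveries are separated by maxAttempts - 1 delays.
        for (int i = 1; i < maxAttempts; i++) {
            result.add(Math.min((long) next, maxMs));
            next = Math.min(next * multiplier, (double) maxMs);
        }
        return result;
    }

    /** Sum of all delays (ms). */
    public static long total(List<Long> delays) {
        long sum = 0L;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // Kafka consumer default for max.poll.interval.ms

        // Hypothetical retry settings, chosen only for illustration.
        List<Long> d = delays(10, 30_000L, 2.0, 120_000L);

        System.out.println("delays (ms): " + d);
        System.out.println("aggregate delay (ms): " + total(d));
        // Retries in the binder: the aggregate is what matters.
        System.out.println("aggregate exceeds max.poll.interval.ms: " + (total(d) > maxPollIntervalMs));
        // Retries in the container: only the largest single delay matters.
        System.out.println("largest single delay exceeds max.poll.interval.ms: "
                + (Collections.max(d) > maxPollIntervalMs));
    }
}
```

With these assumed settings, the nine delays sum to 930000 ms, well above the poll interval, while the largest single delay is 120000 ms and stays below it.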

When working with several binders, the ListenerContainerWithDlqAndRetryCustomizer bean gets overridden by the DefaultBinderFactory. For the bean to apply, you need to use a BinderCustomizer to set the container customizer (see [binder-customizer]):

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
        }
        else if (binder instanceof KStreamBinder) {
            // ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            // ...
        }
    };
}