此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

筛选消息

在某些情况下,例如重新平衡,可能会重新传送已处理的消息。 框架无法知道此类消息是否已被处理。 这是一个应用程序级函数。 这被称为幂等接收器模式,Spring 集成提供了它的实现spring-doc.cadn.net.cn

Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter类,它可以包装您的MessageListener. 此类采用RecordFilterStrategy其中,您可以实现filter方法来表示消息是重复的,应该被丢弃。 这有一个名为ackDiscarded,它指示适配器是否应确认丢弃的记录。 是的false默认情况下。spring-doc.cadn.net.cn

当您使用@KafkaListener中,将RecordFilterStrategy(并且可选ackDiscarded),以便将侦听器包装在适当的筛选适配器中。spring-doc.cadn.net.cn

此外,一个FilteringBatchMessageListenerAdapter,用于使用批处理消息侦听器spring-doc.cadn.net.cn

FilteringBatchMessageListenerAdapter如果您的@KafkaListener接收一个ConsumerRecords<?, ?>而不是List<ConsumerRecord<?, ?>>因为ConsumerRecords是不可变的。

从版本 2.8.4 开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy通过使用filterListener 注解的属性。spring-doc.cadn.net.cn

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}