对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
新增功能
自 3.1 以来 3.2 中的新增功能
本节介绍从版本 3.1 到版本 3.2 所做的更改。 有关早期版本的更改,请参阅更改历史记录。
Kafka 客户端版本
此版本需要 3.7.0kafka-clients
.
3.7.0 版本的 Kafka 客户端引入了新的消费组协议。
有关更多详细信息及其限制,请参阅 KIP-848。
新的使用者组协议是早期访问版本,不打算在生产中使用。
在此版本中,仅建议用于测试目的。
因此, Spring for Apache Kafka 仅在kafka-client
本身。
默认情况下, Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过group.protocol
属性。
测试支持更改
这kraft
模式在EmbeddedKafka
默认情况下,想要使用kraft
mode 必须启用它。
这是由于在使用EmbeddedKafka
在kraft
模式,尤其是在测试新的消费者组协议时。
新的消费组协议仅在kraft
模式,因此,在测试新协议时,需要针对真实的 Kafka 集群进行,而不是基于KafkaClusterTestKit
哪EmbeddedKafka
所依据的。
此外,在运行多个KafkaListener
方法与EmbeddedKafka
在kraft
模式。
在这些问题得到解决之前,kraft
默认开启EmbeddedKafka
将保持为false
.
Kafka Streams 交互式查询支持
新的 APIKafkaStreamsInteractiveQuerySupport
用于访问 Kafka Streams 交互式查询中使用的可查询存储。
有关更多详细信息,请参阅 Kafka Streams Interactive Support 。
TransactionIdSuffixStrategy
新的TransactionIdSuffixStrategy
引入了 interface 来管理transactional.id
后缀。
默认实现为DefaultTransactionIdSuffixStrategy
当设置maxCache
大于零可以重复使用transactional.id
在特定范围内,否则将通过递增 counter 动态生成后缀。
有关更多信息,请参阅Fixed TransactionIdSuffix。
异步 @KafkaListener 返回
@KafkaListener
(以及@KafkaHandler
) 方法现在可以返回异步返回类型,包括CompletableFuture<?>
,Mono<?>
和 Kotlinsuspend
功能。
有关更多信息,请参阅 Async Returns 。
根据引发的异常将消息路由到自定义 DLT
现在可以根据异常的类型将消息重定向到自定义 DLT,该异常在消息处理过程中引发。
重定向规则通过RetryableTopic.exceptionBasedDltRouting
或RetryTopicConfigurationBuilder.dltRoutingRules
.
自定义 DLT 以及其他重试和死信主题会自动创建。
有关更多信息,请参阅根据引发的异常将消息路由到自定义 DLT。
弃用 ContainerProperties transactionManager 属性
弃用transactionManager
property 中ContainerProperties
赞成KafkaAwareTransactionManager
,与常规PlatformTransactionManager
.请参阅 ContainerProperties 和事务同步。
回滚处理后
新的AfterRollbackProcessor
应用程序接口processBatch
。
有关更多信息,请参阅 After-rollback Processor 。
更改 SameIntervalTopicReuseStrategy 默认值@RetryableTopic
改变@RetryableTopic
财产SameIntervalTopicReuseStrategy
default 值设置为SINGLE_TOPIC
.
请参阅 maxInterval Exponential Delay 的单个主题。
非阻塞重试支持类级别@KafkaListener
非阻塞重试支持对 Class 进行@KafkaListener。 请参阅 Non-blocking Retries。
支持 RetryTopicConfigurationProvider 中类的进程@RetryableTopic。
提供新的公共 API 以查找RetryTopicConfiguration
.
请参阅 查找 RetryTopicConfiguration
RetryTopicConfigurer 支持进程 MultiMethodKafkaListenerEndpoint。
这RetryTopicConfigurer
支持流程和注册MultiMethodKafkaListenerEndpoint
.
这MultiMethodKafkaListenerEndpoint
提供getter/setter
对于属性defaultMethod
和methods
.
修改EndpointCustomizer
严格来说MethodKafkaListenerEndpoint
类型。
这EndpointHandlerMethod
添加新构造函数为提供的 bean 构造一个实例。
提供 new classEndpointHandlerMultiMethod
处理程序 multi 方法,用于重试端点。
新的 API 方法,用于根据用户提供的函数寻找偏移量
ConsumerCallback
提供了一个新的 API,用于根据用户定义的函数查找偏移量,该函数将使用者中的当前偏移量作为参数。
有关更多详细信息,请参阅 Seek API Docs 。
@PartitionOffset SeekPosition 支持
添加seekPosition
property 设置为@PartitionOffset
支持TopicPartitionOffset.SeekPosition
.
有关更多详细信息,请参阅 manual-assignment 。
TopicPartitionOffset 中的新构造函数,它接受一个函数来计算要查找的偏移量
TopicPartitionOffset
具有一个新的构造函数,该构造函数采用用户提供的函数来计算要查找的偏移量。
使用此构造函数时,框架使用当前使用者偏移位置的 input 参数调用函数。
有关更多详细信息,请参阅 Seek API Docs 。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义应用程序名称的 Spring Boot 应用程序,现在使用此名称 作为某些客户端类型的自动生成的客户端 ID 的默认前缀。 有关更多详细信息,请参阅默认客户端 ID 前缀。
增强了 MessageListenerContainers 的检索
ListenerContainerRegistry
提供两个新的 API 动态查找和筛选MessageListenerContainer
实例。getListenerContainersMatching(Predicate<String> idMatcher)
按 ID 进行筛选,另一个是getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
按 ID 和容器属性进行筛选。
看@KafkaListener
生命周期管理的 API 文档了解更多信息。
通过提供更多跟踪标签来增强观察
KafkaTemplateObservation
提供更多的跟踪标签(低基数)。KafkaListenerObservation
提供了新的 API 来查找高基数 Key 名称和更多跟踪标签(高基数或低基数)。
参见千分尺观察