此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

使用 Batches

从版本 3.0 开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,则轮询 Kafka 收到的所有记录Consumer将以List<?>添加到 listener 方法中。 否则,将一次使用一条记录调用该方法。 批处理的大小由 Kafka 使用者属性控制max.poll.records,fetch.min.bytes,fetch.max.wait.ms;有关更多信息,请参阅 Kafka 文档。spring-doc.cadn.net.cn

接收 batch 时,允许使用以下类型签名:spring-doc.cadn.net.cn

List<Person>
Message<List<Person>>

在第一个选项List<Person>,则侦听器不会获取任何消息标头。 如果第二个类型签名 (Message<List<Person>>),则可以访问标头;但是,所有标头仍采用Collection. 我们来看以下示例。spring-doc.cadn.net.cn

假设Message包含一个包含 10 个的列表Person对象。 这MessageHeadersMessage包含标题的映射,其中 Key 作为标题名称,Value 作为 List。 此列表包含该标头的标头值,其顺序与有效负载列表相同。 因此,应用程序需要从MessageHeadersmap 的 PAYLOAD 列表。spring-doc.cadn.net.cn

请注意,以List<Message<Person>>在批处理模式下使用时不允许。spring-doc.cadn.net.cn

从 version 开始4.0.2,则 Binder 在批处理模式下使用时支持 DLQ 功能。 请记住,在处于批处理模式的使用者绑定上使用 DLQ 时,从上一次轮询收到的所有记录都将传递到 DLQ 主题。spring-doc.cadn.net.cn

使用批处理模式时,不支持在 Binder 中重试,因此maxAttempts将被覆盖为 1。 您可以配置DefaultErrorHandler(使用ListenerContainerCustomizer) 实现与在 Binder 中重试类似的功能。 您也可以使用手册AckMode并调用Ackowledgment.nack(index, sleep)提交部分批处理的偏移量并重新交付剩余记录。 有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档
接收KafkaNull对象,则 Received 列表将包含相应KafkaNull对象。 这对两者都是正确的List<Person>Message<List<Person>>style 类型签名。

在批处理模式下使用时的可观察性

批量消费记录时,不直接支持观察追踪传播功能。 这是因为 Kafka Binders 使用的 Spring for Apache Kafka 库不支持对批处理侦听器进行跟踪;它仅支持 Record 侦听器。 在批量侦听器中,收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。 由于批处理中的记录之间可能没有任何关联,因此框架无法对跟踪它们做出任何假设,例如将它们作为单个跟踪 ID 提供等。 如果使用Message<List<String>>,然后,您可以获得一个名为kafka_batchConvertedHeaders,其中包含一个列表,其中包含与有效负载具有相同条目数的列表。 此列表具有Map,其中包含跟踪标头。 但是,由应用程序正确迭代此内容并开始观察。spring-doc.cadn.net.cn