此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
使用 Batches
从版本 3.0 开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为true
,则轮询 Kafka 收到的所有记录Consumer
将以List<?>
添加到 listener 方法中。
否则,将一次使用一条记录调用该方法。
批处理的大小由 Kafka 使用者属性控制max.poll.records
,fetch.min.bytes
,fetch.max.wait.ms
;有关更多信息,请参阅 Kafka 文档。
接收 batch 时,允许使用以下类型签名:
List<Person>
Message<List<Person>>
在第一个选项List<Person>
,则侦听器不会获取任何消息标头。
如果第二个类型签名 (Message<List<Person>>
),则可以访问标头;但是,所有标头仍采用Collection
.
我们来看以下示例。
假设Message
包含一个包含 10 个的列表Person
对象。
这MessageHeaders
的Message
包含标题的映射,其中 Key 作为标题名称,Value 作为 List。
此列表包含该标头的标头值,其顺序与有效负载列表相同。
因此,应用程序需要从MessageHeaders
map 的 PAYLOAD 列表。
请注意,以List<Message<Person>>
在批处理模式下使用时不允许。
从 version 开始4.0.2
,则 Binder 在批处理模式下使用时支持 DLQ 功能。
请记住,在处于批处理模式的使用者绑定上使用 DLQ 时,从上一次轮询收到的所有记录都将传递到 DLQ 主题。
使用批处理模式时,不支持在 Binder 中重试,因此maxAttempts 将被覆盖为 1。
您可以配置DefaultErrorHandler (使用ListenerContainerCustomizer ) 实现与在 Binder 中重试类似的功能。
您也可以使用手册AckMode 并调用Ackowledgment.nack(index, sleep) 提交部分批处理的偏移量并重新交付剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
接收KafkaNull 对象,则 Received 列表将包含相应KafkaNull 对象。
这对两者都是正确的List<Person> 和Message<List<Person>> style 类型签名。 |
在批处理模式下使用时的可观察性
批量消费记录时,不直接支持观察追踪传播功能。
这是因为 Kafka Binders 使用的 Spring for Apache Kafka 库不支持对批处理侦听器进行跟踪;它仅支持 Record 侦听器。
在批量侦听器中,收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。
由于批处理中的记录之间可能没有任何关联,因此框架无法对跟踪它们做出任何假设,例如将它们作为单个跟踪 ID 提供等。
如果使用Message<List<String>>
,然后,您可以获得一个名为kafka_batchConvertedHeaders
,其中包含一个列表,其中包含与有效负载具有相同条目数的列表。
此列表具有Map
,其中包含跟踪标头。
但是,由应用程序正确迭代此内容并开始观察。