对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
“Tombstone”记录的 Null 负载和日志压缩
当您使用 Log Compaction 时,您可以使用null
payloads 来标识密钥的删除。
您还可以接收null
值,例如Deserializer
那可能会回来null
当它无法反序列化值时。
要发送null
payload 使用KafkaTemplate
中,您可以将 null 传递到send()
方法。
一个例外是send(Message<?> message)
变体。
因为spring-messaging
Message<?>
不能有null
payload 中,您可以使用名为KafkaNull
,框架发送null
.
为方便起见,静态KafkaNull.INSTANCE
。
当您使用消息侦听器容器时,收到的ConsumerRecord
具有null
value()
.
要配置@KafkaListener
处理null
payload 中,您必须使用@Payload
annotation 替换为required = false
.
如果它是压缩日志的逻辑删除消息,则通常还需要 key,以便您的应用程序可以确定哪个 key 是”deleted
".
以下示例显示了此类配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别@KafkaListener
具有多个@KafkaHandler
方法,则需要一些额外的配置。
具体来说,您需要一个@KafkaHandler
方法替换为KafkaNull
有效载荷。
以下示例显示如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是null
不KafkaNull
.
请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver 框架将在使用默认MessageHandlerMethodFactory .
使用自定义MessageHandlerMethodFactory 看添加自定义HandlerMethodArgumentResolver 自@KafkaListener . |