此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

“Tombstone”记录的 Null 负载和日志压缩

当您使用 Log Compaction 时,您可以使用nullpayloads 来标识密钥的删除。spring-doc.cadn.net.cn

您还可以接收null值,例如Deserializer那可能会回来null当它无法反序列化值时。spring-doc.cadn.net.cn

要发送nullpayload 使用KafkaTemplate中,您可以将 null 传递到send()方法。 一个例外是send(Message<?> message)变体。 因为spring-messaging Message<?>不能有nullpayload 中,您可以使用名为KafkaNull,框架发送null. 为方便起见,静态KafkaNull.INSTANCEspring-doc.cadn.net.cn

当您使用消息侦听器容器时,收到的ConsumerRecord具有null value().spring-doc.cadn.net.cn

要配置@KafkaListener处理nullpayload 中,您必须使用@Payloadannotation 替换为required = false. 如果它是压缩日志的逻辑删除消息,则通常还需要 key,以便您的应用程序可以确定哪个 key 是”deleted". 以下示例显示了此类配置:spring-doc.cadn.net.cn

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当您使用类级别@KafkaListener具有多个@KafkaHandler方法,则需要一些额外的配置。 具体来说,您需要一个@KafkaHandler方法替换为KafkaNull有效载荷。 以下示例显示如何配置一个:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

请注意,参数是nullKafkaNull.spring-doc.cadn.net.cn

此功能需要使用KafkaNullAwarePayloadArgumentResolver框架将在使用默认MessageHandlerMethodFactory. 使用自定义MessageHandlerMethodFactory添加自定义HandlerMethodArgumentResolver@KafkaListener.