此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

转发侦听器结果@SendTo

从版本 2.0 开始,如果您还注解了@KafkaListener替换为@SendTo注解,并且方法调用返回一个结果,则结果将转发到@SendTo.spring-doc.cadn.net.cn

@SendTovalue 可以有多种形式:spring-doc.cadn.net.cn

从版本 2.1.11 和 2.2.1 开始,属性占位符在@SendTo值。spring-doc.cadn.net.cn

表达式 evaluation 的结果必须是String,这表示主题名称。 以下示例显示了各种使用方法@SendTo:spring-doc.cadn.net.cn

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
为了支持@SendTo,必须为侦听器容器工厂提供KafkaTemplate(在其replyTemplate属性),该属性用于发送回复。 这应该是一个KafkaTemplate而不是ReplyingKafkaTemplate在客户端用于请求/回复处理。 使用 Spring Boot 时,它会将模板自动配置到工厂中;在配置您自己的工厂时,必须按照以下示例所示进行设置。

从版本 2.2 开始,您可以添加ReplyHeadersConfigurer添加到侦听器容器工厂。 将查询此信息以确定要在回复消息中设置的标头。 以下示例演示如何添加ReplyHeadersConfigurer:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果需要,您还可以添加更多标题。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

当您使用@SendTo,您必须配置ConcurrentKafkaListenerContainerFactory替换为KafkaTemplate在其replyTemplate属性来执行发送。 Spring Boot 将自动连接其自动配置的模板(如果存在单个实例,则为任何模板)。spring-doc.cadn.net.cn

除非你使用请求/回复语义,否则只有简单的send(topic, value)方法,因此您可能希望创建一个子类来生成分区或键。 以下示例显示了如何执行此作:
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果 listener 方法返回Message<?>Collection<Message<?>>,listener 方法负责设置回复的消息标头。 例如,在处理来自ReplyingKafkaTemplate,您可以执行以下作:spring-doc.cadn.net.cn

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

当使用请求/回复语义时,发送方可以请求目标分区。spring-doc.cadn.net.cn

您可以对@KafkaListenermethod 替换为@SendTo即使没有返回任何结果。 这是为了允许配置errorHandler,可以将有关失败消息传送的信息转发到某个主题。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

有关更多信息,请参阅处理异常spring-doc.cadn.net.cn

如果侦听器方法返回Iterable,默认情况下,将发送每个元素的一条记录作为值。 从版本 2.3.5 开始,将splitIterables属性@KafkaListenerfalse整个结果将作为单个ProducerRecord. 这需要在回复模板的 producer 配置中使用合适的序列化器。 但是,如果回复是Iterable<Message<?>>该属性将被忽略,每条消息将单独发送。