此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

发送消息

本节介绍如何发送消息。spring-doc.cadn.net.cn

KafkaTemplate

本节介绍如何使用KafkaTemplate以发送消息。spring-doc.cadn.net.cn

概述

KafkaTemplate包装生产者并提供将数据发送到 Kafka 主题的便捷方法。 下面的清单显示了KafkaTemplate:spring-doc.cadn.net.cn

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

有关更多详细信息,请参阅 Javadocspring-doc.cadn.net.cn

在版本 3.0 中,之前返回ListenableFuture已更改为 returnCompletableFuture. 为了方便迁移,2.9 版本添加了一种方法usingCompletableFuture()它提供了相同的方法CompletableFuture返回类型;此方法不再可用。

sendDefaultAPI 要求已向模板提供默认主题。spring-doc.cadn.net.cn

API 接收timestamp作为参数,并将此时间戳存储在记录中。 用户提供的时间戳的存储方式取决于在 Kafka 主题上配置的时间戳类型。 如果主题配置为使用CREATE_TIME时,将记录用户指定的时间戳(如果未指定,则生成)。 如果主题配置为使用LOG_APPEND_TIME时,将忽略用户指定的时间戳,并且代理会添加本地代理时间。spring-doc.cadn.net.cn

metricspartitionsFormethods 委托给底层Producer. 这executemethod 提供对底层Producer.spring-doc.cadn.net.cn

要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从版本 2.5 开始,您现在可以覆盖工厂的ProducerConfig属性创建来自同一工厂的不同生产者配置的模板。spring-doc.cadn.net.cn

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,类型为ProducerFactory<?, ?>(例如由 Spring Boot 自动配置的那个)可以用不同的狭窄泛型类型引用。spring-doc.cadn.net.cn

您还可以使用标准的<bean/>定义。spring-doc.cadn.net.cn

然后,要使用模板,您可以调用其方法之一。spring-doc.cadn.net.cn

当您使用带有Message<?>参数,则主题、分区、键和时间戳信息在消息标头中提供,其中包括以下项目:spring-doc.cadn.net.cn

消息有效负载是数据。spring-doc.cadn.net.cn

或者,您可以配置KafkaTemplate替换为ProducerListener要获取包含发送结果(成功或失败)的异步回调,而不是等待Future以完成。 下面的清单显示了ProducerListener接口:spring-doc.cadn.net.cn

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下,模板配置了LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何作。spring-doc.cadn.net.cn

为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。spring-doc.cadn.net.cn

请注意,send 方法返回一个CompletableFuture<SendResult>. 您可以向侦听器注册回调,以异步接收发送结果。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult具有两个属性,一个ProducerRecordRecordMetadata. 有关这些对象的信息,请参阅 Kafka API 文档。spring-doc.cadn.net.cn

Throwable可以转换为KafkaProducerException;其producerRecord属性包含失败的记录。spring-doc.cadn.net.cn

如果你想阻塞发送线程等待结果,你可以调用 future 的get()方法;建议使用带有 timeout 的方法。 如果您已将linger.ms,您可能希望调用flush()之前,或者为了方便起见,模板有一个带有autoFlush参数,该参数使模板flush()在每次发送时。 仅当您已设置linger.msproducer 属性,并希望立即发送部分批处理。spring-doc.cadn.net.cn

例子

本节介绍向 Kafka 发送消息的示例:spring-doc.cadn.net.cn

示例 1.非阻塞 (异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
阻止 (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

请注意,ExecutionExceptionKafkaProducerException使用producerRecord财产。spring-doc.cadn.net.cn

RoutingKafkaTemplate

从版本 2.5 开始,您可以使用RoutingKafkaTemplate在运行时根据目标选择创建者topic名字。spring-doc.cadn.net.cn

路由模板不支持事务,execute,flushmetrics作,因为这些作的主题未知。

该模板需要java.util.regex.PatternProducerFactory<Object, Object>实例。 此映射应排序(例如LinkedHashMap),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。spring-doc.cadn.net.cn

以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用同一模板发送到不同的主题,每个主题使用不同的值序列化器。spring-doc.cadn.net.cn

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

相应的@KafkaListener的 显示在 Annotation Properties 中。spring-doc.cadn.net.cn

有关实现类似结果的另一种技术,但具有将不同类型的发送到同一主题的附加功能,请参阅 Delegating Serializer 和 Deserializerspring-doc.cadn.net.cn

DefaultKafkaProducerFactory

KafkaTemplate一个ProducerFactory用于创建 Producer。spring-doc.cadn.net.cn

不使用 Transaction 时,默认情况下,DefaultKafkaProducerFactory创建一个供所有客户端使用的单例 Producer,如KafkaProducerJavaDocs 的 Java 文档。 但是,如果您调用flush()在模板上,这可能会导致使用同一生产者的其他线程出现延迟。 从版本 2.3 开始,DefaultKafkaProducerFactory具有新属性producerPerThread. 当设置为true,工厂将为每个线程创建(并缓存)一个单独的 producer,以避免此问题。spring-doc.cadn.net.cn

什么时候producerPerThreadtrue,用户代码必须调用closeThreadBoundProducer()在工厂中,当不再需要生产者时。 这将以物理方式关闭 producer 并将其从ThreadLocal. 叫reset()destroy()不会清理这些 producer。

创建DefaultKafkaProducerFactory、键和/或值Serializer可以通过调用仅接受属性 Map 的构造函数从配置中获取类(参见KafkaTemplate) 或Serializer实例可以传递给DefaultKafkaProducerFactoryconstructor (在这种情况下,所有Producer共享相同的实例)。 或者,您可以提供Supplier<Serializer>s(从版本 2.3 开始),该 S 将用于获取单独的Serializer实例Producer:spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从版本 2.5.10 开始,您现在可以在创建工厂后更新 producer 属性。 这可能很有用,例如,如果您必须在凭证更改后更新 SSL 密钥/信任存储位置。 这些更改不会影响现有的生产者实例;叫reset()关闭任何现有生产者,以便使用新属性创建新的生产者。 注意:您不能将事务性生产者工厂更改为非事务性工厂,反之亦然。spring-doc.cadn.net.cn

现在提供了两种新方法:spring-doc.cadn.net.cn

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从版本 2.8 开始,如果你提供序列化器作为对象(在构造函数中或通过 setter),工厂将调用configure()方法配置它们。spring-doc.cadn.net.cn

ReplyingKafkaTemplate

版本 2.1.3 引入了KafkaTemplate提供请求/回复语义。 该类名为ReplyingKafkaTemplate并且有两个额外的方法;方法签名如下:spring-doc.cadn.net.cn

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是一个CompletableFuture该 ID 将异步填充 result(或 Exception,对于 Timeout)。 结果还具有sendFutureproperty 中调用的KafkaTemplate.send(). 您可以使用此 future 来确定 send作的结果。spring-doc.cadn.net.cn

在版本 3.0 中,这些方法返回的 futures(及其sendFutureproperties) 已更改为CompletableFutures 而不是ListenableFutures.

如果使用第一种方法,或者replyTimeoutargument 是null中,模板的defaultReplyTimeout属性(默认为 5 秒)。spring-doc.cadn.net.cn

从版本 2.8.8 开始,模板有一个新方法waitForAssignment. 如果回复容器配置了auto.offset.reset=latest以避免在初始化容器之前发送请求和回复。spring-doc.cadn.net.cn

当使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的pollTimeout属性,因为在第一次轮询完成之前不会发送通知。

以下 Spring Boot 应用程序显示了如何使用该功能的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

请注意,我们可以使用 Boot 的自动配置的容器工厂来创建回复容器。spring-doc.cadn.net.cn

如果 non-tanky 反序列化器用于回复,请考虑使用ErrorHandlingDeserializer委托给你配置的 deserializer。 配置后,RequestReplyFuture将破例完成,您可以赶上ExecutionException,使用DeserializationException在其cause财产。spring-doc.cadn.net.cn

从版本 2.6.7 开始,除了检测DeserializationExceptions,模板将调用replyErrorChecker函数(如果提供)。 如果它返回异常,则 future 将异常完成。spring-doc.cadn.net.cn

下面是一个示例:spring-doc.cadn.net.cn

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

该模板会设置一个标头(名为KafkaHeaders.CORRELATION_ID默认情况下),该回调必须由服务器端回显。spring-doc.cadn.net.cn

在这种情况下,以下@KafkaListener应用程序响应:spring-doc.cadn.net.cn

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListenerinfrastructure 回显相关 ID 并确定回复主题。spring-doc.cadn.net.cn

转发侦听器结果@SendTo了解有关发送回复的更多信息。 模板使用默认标头KafKaHeaders.REPLY_TOPIC以指示回复所指向的主题。spring-doc.cadn.net.cn

从版本 2.2 开始,模板会尝试从配置的回复容器中检测回复主题或分区。 如果容器配置为侦听单个主题或单个TopicPartitionOffset,它用于设置回复标头。 如果容器配置为其他方式,则用户必须设置回复标头。 在这种情况下,INFO日志消息在初始化期间写入。 以下示例使用KafkaHeaders.REPLY_TOPIC:spring-doc.cadn.net.cn

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

使用单个回复进行配置时TopicPartitionOffset,您可以对多个模板使用相同的回复主题,只要每个实例侦听不同的分区即可。 使用单个回复主题进行配置时,每个实例必须使用不同的group.id. 在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例会找到相关 ID。 这对于自动扩展可能很有用,但会产生额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。 使用此设置时,我们建议您将模板的sharedReplyTopictrue,这会降低对 DEBUG 而不是默认 ERROR 的意外回复的日志记录级别。spring-doc.cadn.net.cn

以下是配置回复容器以使用相同的共享回复主题的示例:spring-doc.cadn.net.cn

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多个客户端实例,并且未按照上一段所述配置它们,则每个实例都需要一个专用的回复主题。 另一种方法是将KafkaHeaders.REPLY_PARTITION并为每个实例使用专用分区。 这Header包含一个四字节的 int (big-endian)。 服务器必须使用此标头将回复路由到正确的分区 (@KafkaListener执行此作)。 但是,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过使用TopicPartitionOffset在其ContainerPropertiesconstructor 的 API 中)。
DefaultKafkaHeaderMapper要求 Jackson 位于 Classpath 上(对于@KafkaListener). 如果不可用,则消息转换器没有标头映射器,因此您必须配置MessagingMessageConverter替换为SimpleKafkaHeaderMapper,如前所述。

默认情况下,使用 3 个标头:spring-doc.cadn.net.cn

这些标头名称由@KafkaListenerinfrastructure 来路由回复。spring-doc.cadn.net.cn

从版本 2.3 开始,您可以自定义标头名称 - 该模板有 3 个属性correlationHeaderName,replyTopicHeaderNamereplyPartitionHeaderName. 如果您的服务器不是 Spring 应用程序(或者不使用@KafkaListener).spring-doc.cadn.net.cn

相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标头中,则从版本 3.0 开始,您可以配置自定义correlationHeaderName在侦听器容器工厂上,该标头将被回显。 以前,侦听器必须回显自定义关联标头。

请求/回复Message<?>s

版本 2.7 向ReplyingKafkaTemplate发送和接收spring-messagingMessage<?>抽象化:spring-doc.cadn.net.cn

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

这些将使用模板的默认值replyTimeout,还有一些重载版本可以在方法调用中超时。spring-doc.cadn.net.cn

在版本 3.0 中,这些方法返回的 futures(及其sendFutureproperties) 已更改为CompletableFutures 而不是ListenableFutures.

如果使用者的Deserializer或模板的MessageConverter可以通过回复消息中的配置或类型元数据,在没有任何附加信息的情况下转换有效负载。spring-doc.cadn.net.cn

如果需要为 return 类型提供类型信息,请使用第二种方法来帮助消息转换器。 这也允许同一个模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:spring-doc.cadn.net.cn

模板 Bean
@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
使用模板
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

回复类型 message<?>

@KafkaListener返回Message<?>,对于 2.5 之前的版本,必须填充回复主题和相关 ID 标头。 在此示例中,我们使用请求中的回复主题标头:spring-doc.cadn.net.cn

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这还显示了如何在回复记录上设置键。spring-doc.cadn.net.cn

从版本 2.5 开始,框架将检测这些标头是否缺失,并使用主题填充它们 - 由@SendTo值或传入的KafkaHeaders.REPLY_TOPIC标头(如果存在)。 它还将回显传入的KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION(如果存在)。spring-doc.cadn.net.cn

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

聚合多个回复

中的 模板ReplyingKafkaTemplate严格适用于单个请求/答复方案。 对于单个消息的多个接收者返回回复的情况,您可以使用AggregatingReplyingKafkaTemplate. 这是 Scatter-Gather Enterprise Integration Pattern 的客户端实现。spring-doc.cadn.net.cn

ReplyingKafkaTemplateAggregatingReplyingKafkaTemplateconstructor 需要一个 producer 工厂和一个 Listener 容器来接收回复;它有第三个参数BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy每次收到回复时都会查阅;当谓词返回trueConsumerRecords 用于完成FuturesendAndReceive方法。spring-doc.cadn.net.cn

还有一个附加属性returnPartialOnTimeout(默认为 false)。 当此项设置为true,而不是用KafkaReplyTimeoutException,则部分结果正常补全 future(只要至少收到一条回复记录)。spring-doc.cadn.net.cn

从版本 2.3.5 开始,谓词也会在超时后调用(如果returnPartialOnTimeouttrue). 第一个参数是当前记录列表;第二个是true如果此调用是由于超时。 谓词可以修改记录列表。spring-doc.cadn.net.cn

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

请注意,返回类型是ConsumerRecord其值是ConsumerRecords. “外部”ConsumerRecord不是“真实”记录,它由模板合成,作为为请求收到的实际回复记录的持有者。 当发生正常发布时(发布策略返回 true),主题设置为aggregatedResults;如果returnPartialOnTimeout为 true,并且发生超时(并且至少收到了一条回复记录),则主题设置为partialResultsAfterTimeout. 该模板为这些 “topic” 名称提供常量静态变量:spring-doc.cadn.net.cn

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

真实ConsumerRecords 在Collection包含从中接收回复的实际主题。spring-doc.cadn.net.cn

回复的侦听器容器必须配置为AckMode.MANUALAckMode.MANUAL_IMMEDIATE;Consumer 属性enable.auto.commit必须是false(自 2.3 版起的默认值)。 为避免任何丢失消息的可能性,模板仅在未完成请求为零时(即当 release 策略释放最后一个未完成的请求时)提交 offsets。 重新平衡后,可能会出现重复的回复投放;对于任何正在进行的请求,这些都将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。
如果您使用ErrorHandlingDeserializer使用此聚合模板时,框架不会自动检测DeserializationExceptions. 相反,记录(带有null值)将完整返回,但 Headers 中存在反序列化异常。 建议应用程序调用 Utility 方法ReplyingKafkaTemplate.checkDeserialization()方法来确定是否发生了反序列化异常。 有关更多信息,请参阅其 JavaDocs。 这replyErrorChecker也不要求此聚合模板;您应该对回复的每个元素执行检查。