对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
本节介绍如何发送消息。
用KafkaTemplate
本节介绍如何使用发送消息。KafkaTemplate
概述
包装一个生产者,并提供将数据发送到 Kafka 主题的便捷方法。
以下列表显示了以下方法:KafkaTemplate
KafkaTemplate
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
有关更多详细信息,请参阅 Javadoc。
在版本 3.0 中,以前返回的方法已更改为 return .
为了便于迁移,2.9 版本添加了一个方法,该方法提供具有返回类型的相同方法;此方法不再可用。ListenableFuture CompletableFuture usingCompletableFuture() CompletableFuture |
API 要求已向模板提供默认主题。sendDefault
API 将 a 作为参数,并将此时间戳存储在记录中。
用户提供的时间戳的存储方式取决于 Kafka 主题上配置的时间戳类型。
如果主题配置为 使用 ,则会记录用户指定的时间戳(如果未指定,则生成时间戳)。
如果主题配置为 使用 ,则忽略用户指定的时间戳,并将代理添加本地代理时间。timestamp
CREATE_TIME
LOG_APPEND_TIME
若要使用模板,可以配置生产者工厂,并在模板的构造函数中提供该工厂。 以下示例演示如何执行此操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本 2.5 开始,您现在可以覆盖工厂的属性,以创建具有同一工厂不同生产者配置的模板。ProducerConfig
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,类型(例如Spring Boot自动配置的bean)可以使用不同的缩小泛型类型进行引用。ProducerFactory<?, ?>
您还可以使用标准定义配置模板。<bean/>
然后,若要使用该模板,可以调用其方法之一。
将方法与参数一起使用时,主题、分区、键和时间戳信息将在包含以下项的消息头中提供:Message<?>
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION
-
KafkaHeaders.KEY
-
KafkaHeaders.TIMESTAMP
消息有效负载是数据。
或者,您可以配置 with a 以获取包含发送结果(成功或失败)的异步回调,而不是等待 完成。
以下列表显示了接口的定义:KafkaTemplate
ProducerListener
Future
ProducerListener
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板配置了 ,该模板会记录错误,并且在发送成功时不执行任何操作。LoggingProducerListener
为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。
请注意,send 方法返回 .
您可以向监听器注册回调,以异步接收发送结果。
以下示例演示如何执行此操作:CompletableFuture<SendResult>
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult
具有两个属性,a 和 。
有关这些对象的信息,请参阅 Kafka API 文档。ProducerRecord
RecordMetadata
可以强制转换为 ;其属性包含失败的记录。Throwable
KafkaProducerException
failedProducerRecord
如果您希望阻止发送线程以等待结果,则可以调用 future 的方法;建议使用带有超时的方法。
如果设置了 ,则可能希望在等待之前调用,或者为方便起见,模板具有一个构造函数,该构造函数具有一个参数,该参数会导致模板在每次发送时调用。
仅当设置了 producer 属性并希望立即发送部分批处理时,才需要刷新。get()
linger.ms
flush()
autoFlush
flush()
linger.ms
例子
本节显示了向 Kafka 发送消息的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,原因与属性有关。ExecutionException
KafkaProducerException
failedProducerRecord
在版本 3.0 中,以前返回的方法已更改为 return .
为了便于迁移,2.9 版本添加了一个方法,该方法提供具有返回类型的相同方法;此方法不再可用。ListenableFuture CompletableFuture usingCompletableFuture() CompletableFuture |
用RoutingKafkaTemplate
从版本 2.5 开始,可以使用 在运行时根据目标名称选择生产者。RoutingKafkaTemplate
topic
路由模板不支持事务、、 或操作,因为这些操作的主题未知。execute flush metrics |
该模板需要实例的映射。
这张地图应该按顺序排列(例如 a),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。java.util.regex.Pattern
ProducerFactory<Object, Object>
LinkedHashMap
以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化程序。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
此示例的对应 s 显示在“注释属性”中。@KafkaListener
有关实现类似结果的另一种技术,但具有将不同类型发送到同一主题的附加功能,请参阅委派序列化程序和反序列化程序。
路由模板不支持事务、、 或操作,因为这些操作的主题未知。execute flush metrics |
用DefaultKafkaProducerFactory
如使用 KafkaTemplate
中所示,a 用于创建创建者。ProducerFactory
不使用 Transactions 时,缺省情况下,会按照 JavaDocs 中的建议创建所有客户机使用的单例生产者。
但是,如果调用模板,则可能会导致使用同一生产者的其他线程出现延迟。
从版本 2.3 开始,具有新属性 。
设置为 时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。DefaultKafkaProducerFactory
KafkaProducer
flush()
DefaultKafkaProducerFactory
producerPerThread
true
当 时,当不再需要生产者时,用户代码必须调用工厂。
这将在物理上关闭生产者并将其从 .
打电话或不会清理这些生产者。producerPerThread true closeThreadBoundProducer() ThreadLocal reset() destroy() |
创建 时,可以通过调用仅接受属性映射的构造函数从配置中获取键和/或值类(请参阅使用 KafkaTemplate
中的示例),或者可以将实例传递给构造函数(在这种情况下,所有 s 共享相同的实例)。
或者,您可以提供 s(从 2.3 版开始),用于获取每个实例的单独实例:DefaultKafkaProducerFactory
Serializer
Serializer
DefaultKafkaProducerFactory
Producer
Supplier<Serializer>
Serializer
Producer
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,您现在可以在创建工厂后更新生产者属性。
例如,如果您必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。
这些更改不会影响现有的生产者实例;调用以关闭任何现有生产者,以便使用新属性创建新的生产者。
注意:不能将事务性生产者工厂更改为非事务性生产者工厂,反之亦然。reset()
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从版本 2.8 开始,如果将序列化程序作为对象提供(在构造函数中或通过 setter),工厂将调用该方法以使用配置属性配置它们。configure()
当 时,当不再需要生产者时,用户代码必须调用工厂。
这将在物理上关闭生产者并将其从 .
打电话或不会清理这些生产者。producerPerThread true closeThreadBoundProducer() ThreadLocal reset() destroy() |
用ReplyingKafkaTemplate
版本 2.1.3 引入了一个子类来提供请求/回复语义。
该类已命名,并具有两个附加方法;下面显示了方法签名:KafkaTemplate
ReplyingKafkaTemplate
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另请参阅使用 Message<?>
s 请求/回复)。
结果是异步填充的结果(或超时的异常)。
结果还具有一个属性,该属性是调用 的结果。
您可以使用此将来确定发送操作的结果。CompletableFuture
sendFuture
KafkaTemplate.send()
在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFuture CompletableFuture ListenableFuture |
如果使用第一种方法,或者参数为 ,则使用模板的属性(默认为 5 秒)。replyTimeout
null
defaultReplyTimeout
从版本 2.8.8 开始,模板具有新方法。
如果应答容器配置为避免在容器初始化之前发送请求和应答,则此功能非常有用。waitForAssignment
auto.offset.reset=latest
使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的属性,因为在第一次轮询完成之前不会发送通知。pollTimeout |
以下 Spring Boot 应用程序显示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 自动配置的容器工厂来创建回复容器。
如果将非平凡的反序列化程序用于回复,请考虑使用委托给配置的反序列化程序的 ErrorHandlingDeserializer
。
这样配置后,将异常完成,您可以在其属性中捕获 。RequestReplyFuture
ExecutionException
DeserializationException
cause
从版本 2.6.7 开始,除了检测 s 之外,模板还将调用该函数(如果提供)。
如果它返回异常,则将异常完成未来。DeserializationException
replyErrorChecker
下面是一个示例:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
模板设置一个标头(默认命名),服务器端必须回显该标头。KafkaHeaders.CORRELATION_ID
在这种情况下,以下应用程序响应:@KafkaListener
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
基础结构回显相关 ID 并确定回复主题。@KafkaListener
有关发送回复的更多信息,请参阅使用 @SendTo
转发侦听器结果。
该模板使用默认标题来指示回复所指向的主题。KafKaHeaders.REPLY_TOPIC
从版本 2.2 开始,模板会尝试从配置的回复容器中检测回复主题或分区。
如果容器配置为侦听单个主题或单个主题,则用于设置回复标头。
如果以其他方式配置容器,则用户必须设置回复标头。
在这种情况下,将在初始化期间写入日志消息。
以下示例使用:TopicPartitionOffset
INFO
KafkaHeaders.REPLY_TOPIC
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
使用单个应答进行配置时,可以对多个模板使用相同的应答主题,只要每个实例侦听不同的分区即可。
使用单个回复主题进行配置时,每个实例必须使用不同的 .
在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关 ID。
这对于自动缩放可能很有用,但会带来额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。
使用此设置时,建议将模板设置为 ,这样可以减少对 DEBUG 的意外回复的日志记录级别,而不是默认的 ERROR。TopicPartitionOffset
group.id
sharedReplyTopic
true
以下是将回复容器配置为使用同一共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例,并且未按照上一段所述进行配置,则每个实例都需要一个专用的回复主题。
另一种方法是为每个实例设置并使用专用分区。
包含一个四字节的 int (big-endian)。
服务器必须使用此标头将回复路由到正确的分区(执行此操作)。
但是,在这种情况下,应答容器不得使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过在其构造函数中使用 a)。KafkaHeaders.REPLY_PARTITION Header @KafkaListener TopicPartitionOffset ContainerProperties |
要求 Jackson 位于类路径上(对于 )。
如果它不可用,则消息转换器没有标头映射器,因此必须使用 配置 ,如前所述。DefaultKafkaHeaderMapper @KafkaListener MessagingMessageConverter SimpleKafkaHeaderMapper |
默认情况下,使用 3 个标头:
-
KafkaHeaders.CORRELATION_ID
- 用于将回复与请求相关联 -
KafkaHeaders.REPLY_TOPIC
- 用于告诉服务器在哪里回复 -
KafkaHeaders.REPLY_PARTITION
- (可选)用于告诉服务器要回复哪个分区
基础结构使用这些标头名称来路由回复。@KafkaListener
从版本 2.3 开始,您可以自定义标头名称 - 模板有 3 个属性 、 和 。
如果您的服务器不是 Spring 应用程序(或不使用 )。correlationHeaderName
replyTopicHeaderName
replyPartitionHeaderName
@KafkaListener
相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标头中,从 3.0 版开始,您可以在侦听器容器工厂上配置自定义,并且该标头将被回显。
以前,侦听器必须回显自定义关联标头。correlationHeaderName |
请求/回复 sMessage<?>
版本 2.7 在发送和接收 的抽象中添加了方法:ReplyingKafkaTemplate
spring-messaging
Message<?>
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认值,还有一些重载版本可以在方法调用中超时。replyTimeout
在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFuture CompletableFuture ListenableFuture |
如果使用者或模板可以通过回复消息中的配置或类型元数据转换有效负载,而无需任何其他信息,请使用第一种方法。Deserializer
MessageConverter
如果需要提供返回类型的类型信息,请使用第二种方法,以帮助消息转换器。 这也允许同一模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:
-
Java
-
Kotlin
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
-
Java
-
Kotlin
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFuture CompletableFuture ListenableFuture |
使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的属性,因为在第一次轮询完成之前不会发送通知。pollTimeout |
如果您有多个客户端实例,并且未按照上一段所述进行配置,则每个实例都需要一个专用的回复主题。
另一种方法是为每个实例设置并使用专用分区。
包含一个四字节的 int (big-endian)。
服务器必须使用此标头将回复路由到正确的分区(执行此操作)。
但是,在这种情况下,应答容器不得使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过在其构造函数中使用 a)。KafkaHeaders.REPLY_PARTITION Header @KafkaListener TopicPartitionOffset ContainerProperties |
要求 Jackson 位于类路径上(对于 )。
如果它不可用,则消息转换器没有标头映射器,因此必须使用 配置 ,如前所述。DefaultKafkaHeaderMapper @KafkaListener MessagingMessageConverter SimpleKafkaHeaderMapper |
相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标头中,从 3.0 版开始,您可以在侦听器容器工厂上配置自定义,并且该标头将被回显。
以前,侦听器必须回显自定义关联标头。correlationHeaderName |
在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFuture CompletableFuture ListenableFuture |
回复类型 message<?>
当返回 2.5 之前版本的 时,需要填充回复主题和相关 ID 标头。
在此示例中,我们使用请求中的回复主题标头:@KafkaListener
Message<?>
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这还显示了如何在回复记录上设置密钥。
从版本 2.5 开始,框架将检测这些标头是否缺失,并使用主题填充它们 - 根据值确定的主题或传入标头(如果存在)。
它还将回显传入的 和 (如果存在)。@SendTo
KafkaHeaders.REPLY_TOPIC
KafkaHeaders.CORRELATION_ID
KafkaHeaders.REPLY_PARTITION
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
聚合多个回复
Using ReplyingKafkaTemplate
中的模板严格适用于单个请求/应答方案。
对于单个消息的多个接收方返回回复的情况,可以使用 .
这是 Scatter-Gather Enterprise Integration Pattern 的客户端实现。AggregatingReplyingKafkaTemplate
与 一样,构造函数需要生产者工厂和侦听器容器来接收回复;它有第三个参数,每次收到回复时都会参考该参数;当谓词返回时,S 的集合用于完成方法返回的 。ReplyingKafkaTemplate
AggregatingReplyingKafkaTemplate
BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
true
ConsumerRecord
Future
sendAndReceive
还有一个附加属性(默认值为 false)。
当 设置为 时,部分结果通常不会用 完成未来,而是正常完成未来(只要至少收到一条回复记录)。returnPartialOnTimeout
true
KafkaReplyTimeoutException
从版本 2.3.5 开始,谓词也会在超时后调用(如果为 )。
第一个参数是当前记录列表;第二种情况是此调用是否由于超时而导致。
谓词可以修改记录列表。returnPartialOnTimeout
true
true
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是 a,其值是 s 的集合。
“外部”不是“真实”记录,它是由模板合成的,作为请求收到的实际回复记录的持有者。
当发生正常发布(发布策略返回 true)时,主题设置为 ;如果为 true,并且发生超时(并且至少已收到一条回复记录),则主题设置为 。
该模板为这些“主题”名称提供了常量静态变量:ConsumerRecord
ConsumerRecord
ConsumerRecord
aggregatedResults
returnPartialOnTimeout
partialResultsAfterTimeout
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
中的实数 s 包含从中接收回复的实际主题。ConsumerRecord
Collection
应答的侦听器容器必须配置为 或 ;consumer 属性必须是(自 2.3 版起的默认值)。
为了避免丢失消息的可能性,模板仅在未完成的请求为零时(即发布策略释放最后一个未完成的请求)时提交偏移量。
重新平衡后,可能会重复发送回复;对于任何飞行中的请求,这些都将被忽略;当收到已发布的回复的重复回复时,您可能会看到错误日志消息。AckMode.MANUAL AckMode.MANUAL_IMMEDIATE enable.auto.commit false |
如果将 ErrorHandlingDeserializer 与此聚合模板一起使用,则框架不会自动检测 s。
相反,记录(带值)将原封不动地返回,标头中的反序列化异常。
建议应用程序调用实用工具方法方法来确定是否发生反序列化异常。
有关更多信息,请参阅其 JavaDocs。
此聚合模板也不需要 ;您应该对回复的每个元素执行检查。DeserializationException null ReplyingKafkaTemplate.checkDeserialization() replyErrorChecker |
应答的侦听器容器必须配置为 或 ;consumer 属性必须是(自 2.3 版起的默认值)。
为了避免丢失消息的可能性,模板仅在未完成的请求为零时(即发布策略释放最后一个未完成的请求)时提交偏移量。
重新平衡后,可能会重复发送回复;对于任何飞行中的请求,这些都将被忽略;当收到已发布的回复的重复回复时,您可能会看到错误日志消息。AckMode.MANUAL AckMode.MANUAL_IMMEDIATE enable.auto.commit false |
如果将 ErrorHandlingDeserializer 与此聚合模板一起使用,则框架不会自动检测 s。
相反,记录(带值)将原封不动地返回,标头中的反序列化异常。
建议应用程序调用实用工具方法方法来确定是否发生反序列化异常。
有关更多信息,请参阅其 JavaDocs。
此聚合模板也不需要 ;您应该对回复的每个元素执行检查。DeserializationException null ReplyingKafkaTemplate.checkDeserialization() replyErrorChecker |