对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
转发侦听器结果@SendTo
从版本 2.0 开始,如果您还注解了@KafkaListener
替换为@SendTo
注解,并且方法调用返回一个结果,则结果将转发到@SendTo
.
这@SendTo
value 可以有多种形式:
-
@SendTo("someTopic")
路由到 Literal 主题。 -
@SendTo("#{someExpression}")
路由到通过在应用程序上下文初始化期间计算表达式一次来确定的主题。 -
@SendTo("!{someExpression}")
路由到通过在运行时评估表达式确定的主题。 这#root
用于评估的 object 具有三个属性:-
request
:入站ConsumerRecord
(或ConsumerRecords
对象进行批处理侦听器)。 -
source
:这org.springframework.messaging.Message<?>
从request
. -
result
:该方法返回结果。
-
-
@SendTo
(无属性):这被视为!{source.headers['kafka_replyTopic']}
(自版本 2.1.3 起)。
从版本 2.1.11 和 2.2.1 开始,属性占位符在@SendTo
值。
表达式 evaluation 的结果必须是String
,这表示主题名称。
以下示例显示了各种使用方法@SendTo
:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持@SendTo ,必须为侦听器容器工厂提供KafkaTemplate (在其replyTemplate 属性),该属性用于发送回复。
这应该是一个KafkaTemplate 而不是ReplyingKafkaTemplate 在客户端用于请求/回复处理。
使用 Spring Boot 时,它会将模板自动配置到工厂中;在配置您自己的工厂时,必须按照以下示例所示进行设置。 |
从版本 2.2 开始,您可以添加ReplyHeadersConfigurer
添加到侦听器容器工厂。
将查询此信息以确定要在回复消息中设置的标头。
以下示例演示如何添加ReplyHeadersConfigurer
:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果需要,您还可以添加更多标题。 以下示例显示了如何执行此作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用@SendTo
,您必须配置ConcurrentKafkaListenerContainerFactory
替换为KafkaTemplate
在其replyTemplate
属性来执行发送。
Spring Boot 将自动连接其自动配置的模板(如果存在单个实例,则为任何模板)。
除非你使用请求/回复语义,否则只有简单的send(topic, value) 方法,因此您可能希望创建一个子类来生成分区或键。
以下示例显示了如何执行此作: |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果 listener 方法返回
|
当使用请求/回复语义时,发送方可以请求目标分区。
您可以对
有关更多信息,请参阅处理异常。 |
如果侦听器方法返回Iterable ,默认情况下,将发送每个元素的一条记录作为值。
从版本 2.3.5 开始,将splitIterables 属性@KafkaListener 自false 整个结果将作为单个ProducerRecord .
这需要在回复模板的 producer 配置中使用合适的序列化器。
但是,如果回复是Iterable<Message<?>> 该属性将被忽略,每条消息将单独发送。 |