此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
使用反应式 Kafka Binder 的基本示例
在本节中,我们将展示一些使用反应式 Binder 编写反应式 Kafka 应用程序的基本代码片段以及相关详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以使用上述upppercase
函数,其中包含基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka
) 以及反应式 Kafka Binder (spring-cloud-stream-binder-kafka-reactive
),这是本节讨论的主题。
当将此函数与常规 Kafka Binder 一起使用时,尽管您在应用程序中使用反应式类型(即在uppercase
function),则您只能在函数执行过程中获取反应流。
在函数的执行上下文之外,没有反应式的好处,因为底层 Binders 不是基于反应式堆栈的。
因此,尽管这看起来像是带来了一个完整的端到端反应式堆栈,但此应用程序只是部分反应式的。
现在假设你正在为 Kafka 使用适当的反应式 Binder -spring-cloud-stream-binder-kafka-reactive
与上述函数的应用程序。
这个 binder 实现将提供完整的反应式好处,从链的顶端消费到底层的发布。
这是因为底层 Binder 构建在 Reactor Kafka 的核心 API 之上。
在消费者方面,它使用 KafkaReceiver,它是 Kafka 消费者的反应式实现。
同样,在生产者方面,它使用 KafkaSender API,这是 Kafka 生产者的反应式实现。
由于反应式 Kafka Binder 的基础建立在适当的反应式 Kafka API 之上,因此应用程序可以获得使用反应式技术的全部好处。
使用此反应式 Kafka Binder 时,应用程序内置了自动背压等功能。
从版本 4.0.2 开始,您可以自定义ReceiverOptions
和SenderOptions
通过提供一个或多个ReceiverOptionsCustomizer
或SenderOptionsCustomizer
beans 的 bean 中。
他们是BiFunction
s 接收绑定名称和初始选项,并返回自定义选项。
接口扩展Ordered
因此,当存在多个定制器时,将按所需的顺序应用定制器。
默认情况下,Binder 不会提交偏移量。
从版本 4.0.2 开始,KafkaHeaders.ACKNOWLEDGMENT header 包含一个ReceiverOffset 对象,它允许您通过调用其acknowledge() 或commit() 方法。 |
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
请参阅reactor-kafka
文档和 JavaDocs 了解更多信息。
此外,从版本 4.0.3 开始,Kafka 使用者属性reactiveAtmostOnce
可以设置为true
Binder 将在处理每次轮询返回的记录之前自动提交偏移量。
此外,从版本 4.0.3 开始,您可以设置 consumer 属性reactiveAutoCommit
自true
Binder 将在处理每次轮询返回的记录后自动提交偏移量。
在这些情况下,确认标头不存在。
4.0.2 也提供了reactiveAutoCommit ,但实现不正确,它的行为类似于reactiveAtMostOnce . |
以下是如何使用reaciveAutoCommit
.
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,reactor-kafka
返回Flux<Flux<ConsumerRecord<?, ?>>>
使用 Auto Commit 时。
鉴于 Spring 无法访问 inner flux 的内容,因此应用程序必须处理本机ConsumerRecord
;没有对内容应用消息转换或转换服务。
这需要使用本机解码(通过指定Deserializer
)返回所需类型的记录键/值。