此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

使用反应式 Kafka Binder 的基本示例

在本节中,我们将展示一些使用反应式 Binder 编写反应式 Kafka 应用程序的基本代码片段以及相关详细信息。spring-doc.cadn.net.cn

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以使用上述upppercase函数,其中包含基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka) 以及反应式 Kafka Binder (spring-cloud-stream-binder-kafka-reactive),这是本节讨论的主题。 当将此函数与常规 Kafka Binder 一起使用时,尽管您在应用程序中使用反应式类型(即在uppercasefunction),则您只能在函数执行过程中获取反应流。 在函数的执行上下文之外,没有反应式的好处,因为底层 Binders 不是基于反应式堆栈的。 因此,尽管这看起来像是带来了一个完整的端到端反应式堆栈,但此应用程序只是部分反应式的。spring-doc.cadn.net.cn

现在假设你正在为 Kafka 使用适当的反应式 Binder -spring-cloud-stream-binder-kafka-reactive与上述函数的应用程序。 这个 binder 实现将提供完整的反应式好处,从链的顶端消费到底层的发布。 这是因为底层 Binder 构建在 Reactor Kafka 的核心 API 之上。 在消费者方面,它使用 KafkaReceiver,它是 Kafka 消费者的反应式实现。 同样,在生产者方面,它使用 KafkaSender API,这是 Kafka 生产者的反应式实现。 由于反应式 Kafka Binder 的基础建立在适当的反应式 Kafka API 之上,因此应用程序可以获得使用反应式技术的全部好处。 使用此反应式 Kafka Binder 时,应用程序内置了自动背压等功能。spring-doc.cadn.net.cn

从版本 4.0.2 开始,您可以自定义ReceiverOptionsSenderOptions通过提供一个或多个ReceiverOptionsCustomizerSenderOptionsCustomizerbeans 的 bean 中。 他们是BiFunctions 接收绑定名称和初始选项,并返回自定义选项。 接口扩展Ordered因此,当存在多个定制器时,将按所需的顺序应用定制器。spring-doc.cadn.net.cn

默认情况下,Binder 不会提交偏移量。 从版本 4.0.2 开始,KafkaHeaders.ACKNOWLEDGMENTheader 包含一个ReceiverOffset对象,它允许您通过调用其acknowledge()commit()方法。
@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

请参阅reactor-kafka文档和 JavaDocs 了解更多信息。spring-doc.cadn.net.cn

此外,从版本 4.0.3 开始,Kafka 使用者属性reactiveAtmostOnce可以设置为trueBinder 将在处理每次轮询返回的记录之前自动提交偏移量。 此外,从版本 4.0.3 开始,您可以设置 consumer 属性reactiveAutoCommittrueBinder 将在处理每次轮询返回的记录后自动提交偏移量。 在这些情况下,确认标头不存在。spring-doc.cadn.net.cn

4.0.2 也提供了reactiveAutoCommit,但实现不正确,它的行为类似于reactiveAtMostOnce.

以下是如何使用reaciveAutoCommit.spring-doc.cadn.net.cn

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

请注意,reactor-kafka返回Flux<Flux<ConsumerRecord<?, ?>>>使用 Auto Commit 时。 鉴于 Spring 无法访问 inner flux 的内容,因此应用程序必须处理本机ConsumerRecord;没有对内容应用消息转换或转换服务。 这需要使用本机解码(通过指定Deserializer)返回所需类型的记录键/值。spring-doc.cadn.net.cn