此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

具有基于 Kafka Streams 的 Binders 和常规 Kafka Binder 的多 Binders

您可以有一个应用程序,其中同时具有基于常规 Kafka Binder 的函数/使用者/供应商和基于 Kafka Streams 的处理器。 但是,您不能在单个函数或使用者中混合使用它们。spring-doc.cadn.net.cn

下面是一个示例,其中同一应用程序中有两个基于 Binder 的组件。spring-doc.cadn.net.cn

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

以下是配置中的相关部分:spring-doc.cadn.net.cn

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果你有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,例如常规的process同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2。 然后,您必须使用 Spring Cloud Stream 提供的多 Binder 工具。spring-doc.cadn.net.cn

以下是您的配置在这种情况下可能会发生的变化。spring-doc.cadn.net.cn

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

注意上述配置。 我们有两种 Binder,但总共有 3 个 Binder,第一种是基于集群 1 的常规 Kafka Binder(kafka1),然后是另一个基于集群 2 的 Kafka Binder (kafka2),最后是kstream一 (kafka3). 应用程序中的第一个处理器从kafka1并发布到kafka2其中,两个 Binders 都基于常规的 Kafka Binder,但集群不同。 第二个处理器是 Kafka Streams 处理器,它使用来自kafka3它与kafka2,但 Binder 类型不同。spring-doc.cadn.net.cn

由于 Kafka Streams 系列活页夹中提供了三种不同的活页夹类型: -kstream,ktableglobalktable- 如果您的应用程序具有基于这些 Binder 中的任意一个的多个绑定,则需要将其显式提供为 Binder 类型。spring-doc.cadn.net.cn

例如,如果您有如下处理器,spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

然后,必须在多 Binder 场景中配置此 ID,如下所示。 请注意,只有当您有一个真正的多 Binder 方案,其中有多个处理器在单个应用程序中处理多个集群时,才需要这样做。 在这种情况下,需要显式地为 Binders 提供 bindings,以区别于其他处理器的 Binder 类型和集群。spring-doc.cadn.net.cn

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.