此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

编程模型的辅助设备

单个应用程序中的多个 Kafka Streams 处理器

Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以按以下方式进行申请。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,Binder 将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream,需要激活哪些功能。 以下是激活功能的方法。spring-doc.cadn.net.cn

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcessspring-doc.cadn.net.cn

如果您希望某些功能不立即激活,您可以将其从此列表中删除。spring-doc.cadn.net.cn

当您拥有单个 Kafka Streams 处理器和其他类型的Function同一应用程序中的 bean,通过不同的 Binders 处理(例如,基于常规 Kafka Message Channel Binders 的函数 Bean)spring-doc.cadn.net.cn

Kafka Streams 应用程序 ID

应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必需属性。 Spring Cloud Stream Kafka Streams Binder 允许您以多种方式配置此应用程序 ID。spring-doc.cadn.net.cn

如果应用程序中只有一个处理器,则可以使用以下属性在 Binder 级别进行设置:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

为方便起见,如果您只有一个处理器,您还可以使用spring.application.name作为属性来委托应用程序 ID。spring-doc.cadn.net.cn

如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。spring-doc.cadn.net.cn

例如,假设您有以下功能。spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,您可以使用以下 Binder 级别属性为每个应用程序设置应用程序 ID。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.process.applicationIdspring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationIdspring-doc.cadn.net.cn

对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。 但是,如果您使用的是函数模型,则在上面看到的 binder 级别设置每个函数要容易得多。spring-doc.cadn.net.cn

对于生产部署,强烈建议通过配置显式指定应用程序 ID。 如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。spring-doc.cadn.net.cn

如果应用程序未提供应用程序 ID,则在这种情况下,Binder 将为您自动生成静态应用程序 ID。 这在开发方案中很方便,因为它避免了显式提供应用程序 ID 的需要。 以这种方式生成的应用程序 ID 在应用程序重启时将是静态的。 对于函数模型,生成的应用程序 ID 将是函数 Bean 名称,后跟文字applicationID,例如process-applicationID如果process如果函数 Bean Name.spring-doc.cadn.net.cn

应用程序 ID 设置摘要

  • 默认情况下,Binder 将为每个函数方法自动生成应用程序 ID。spring-doc.cadn.net.cn

  • 如果您有单个处理器,则可以使用spring.kafka.streams.applicationId,spring.application.namespring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

  • 如果您有多个处理器,则可以使用属性 -spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.spring-doc.cadn.net.cn

使用函数式样式覆盖 Binder 生成的默认绑定名称

默认情况下,当使用函数式样式时,Binders 使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果要覆盖这些绑定名称,可以通过指定以下属性来实现。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.<default binding name>.默认绑定名称是 Binder 生成的原始绑定名称。spring-doc.cadn.net.cn

例如,假设您有这个函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 将生成带有名称process-in-0,process-in-1process-out-0. 现在,如果你想把它们完全改成别的,也许是更多特定于域的绑定名称,那么你可以按如下方式进行。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=usersspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=regionsspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-out-0=clicksspring-doc.cadn.net.cn

之后,您必须在这些新绑定名称上设置所有绑定级别属性。spring-doc.cadn.net.cn

请记住,对于上述函数式编程模型,在大多数情况下,遵守默认绑定名称是有意义的。 您可能仍然希望进行此覆盖的唯一原因是,当您拥有大量配置属性并且希望将绑定映射到对域更友好的内容时。spring-doc.cadn.net.cn

设置 Bootstrap 服务器配置

运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。 如果您不提供此信息,则 Binder 会期望您以默认localhost:9092. 如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。spring-doc.cadn.net.cn

当涉及到 Binder 级别属性时,您是否使用通过常规 Kafka Binder 提供的 broker 属性并不重要 -spring.cloud.stream.kafka.binder.brokers. Kafka Streams Binder 将首先检查是否设置了 Kafka Streams Binder 特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers.spring-doc.cadn.net.cn