此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
编程模型的辅助设备
单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以按以下方式进行申请。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,Binder 将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream,需要激活哪些功能。 以下是激活功能的方法。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些功能不立即激活,您可以将其从此列表中删除。
当您拥有单个 Kafka Streams 处理器和其他类型的Function
同一应用程序中的 bean,通过不同的 Binders 处理(例如,基于常规 Kafka Message Channel Binders 的函数 Bean)
Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必需属性。 Spring Cloud Stream Kafka Streams Binder 允许您以多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器,则可以使用以下属性在 Binder 级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId
.
为方便起见,如果您只有一个处理器,您还可以使用spring.application.name
作为属性来委托应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。
例如,假设您有以下功能。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下 Binder 级别属性为每个应用程序设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。 但是,如果您使用的是函数模型,则在上面看到的 binder 级别设置每个函数要容易得多。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。 如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,则在这种情况下,Binder 将为您自动生成静态应用程序 ID。
这在开发方案中很方便,因为它避免了显式提供应用程序 ID 的需要。
以这种方式生成的应用程序 ID 在应用程序重启时将是静态的。
对于函数模型,生成的应用程序 ID 将是函数 Bean 名称,后跟文字applicationID
,例如process-applicationID
如果process
如果函数 Bean Name.
应用程序 ID 设置摘要
-
默认情况下,Binder 将为每个函数方法自动生成应用程序 ID。
-
如果您有单个处理器,则可以使用
spring.kafka.streams.applicationId
,spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
. -
如果您有多个处理器,则可以使用属性 -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
.
使用函数式样式覆盖 Binder 生成的默认绑定名称
默认情况下,当使用函数式样式时,Binders 使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果要覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
.默认绑定名称是 Binder 生成的原始绑定名称。
例如,假设您有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成带有名称process-in-0
,process-in-1
和process-out-0
.
现在,如果你想把它们完全改成别的,也许是更多特定于域的绑定名称,那么你可以按如下方式进行。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新绑定名称上设置所有绑定级别属性。
请记住,对于上述函数式编程模型,在大多数情况下,遵守默认绑定名称是有意义的。 您可能仍然希望进行此覆盖的唯一原因是,当您拥有大量配置属性并且希望将绑定映射到对域更友好的内容时。
设置 Bootstrap 服务器配置
运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。
如果您不提供此信息,则 Binder 会期望您以默认localhost:9092
.
如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。
-
使用 boot 属性 -
spring.kafka.bootstrapServers
-
Binder 级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到 Binder 级别属性时,您是否使用通过常规 Kafka Binder 提供的 broker 属性并不重要 -spring.cloud.stream.kafka.binder.brokers
.
Kafka Streams Binder 将首先检查是否设置了 Kafka Streams Binder 特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers
.