此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
配置选项
本节包含 Kafka Streams Binders 使用的配置选项。
有关与 Binder 相关的常见配置选项和属性,请参阅核心文档。
Kafka Streams Binder 属性
以下属性在 Binder 级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.
在 Kafka Streams Binder 中重复使用的任何 Kafka Binder 提供的属性都必须以spring.cloud.stream.kafka.streams.binder
而不是spring.cloud.stream.kafka.binder
.
此规则的唯一例外是在定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。
- 配置
-
Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以
spring.cloud.stream.kafka.streams.binder.
. 以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfig
Apache Kafka Streams 文档中的 JavaDocs。
您可以从中设置的所有配置StreamsConfig
可以通过此设置。
使用此属性时,它适用于整个应用程序,因为这是 Binder 级别属性。
如果应用程序中有多个处理器,则所有处理器都将获取这些属性。
对于像application.id
,这将成为问题,因此您必须仔细检查属性StreamsConfig
使用此 Binder 级别进行映射configuration
财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能式处理器。 这可用于设置应用程序中每个函数的应用程序 ID。 在有多个函数的情况下,这是设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能式处理器。 Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于 Binder 级别
configuration
属性,但此级别的configuration
属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定功能限制对配置的访问时,您可能希望使用它。 都StreamsConfig
properties 可以在此处使用。 - 经纪人
-
代理 URL
违约:
localhost
- zk节点
-
Zookeeper 网址
违约:
localhost
- deserializationExceptionHandler
-
反序列化错误处理程序类型。 此处理程序在 Binder 级别应用,因此应用于应用程序中的所有 input 绑定。 有一种方法可以在 Consumer 绑定级别以更精细的方式控制它。 可能的值为 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
违约:
logAndFail
- 应用程序 ID
-
在 Binder 级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个功能,则应以不同的方式设置应用程序 ID。 请参阅上文,其中详细讨论了设置应用程序 ID。
Default:application 将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到 state store 时的 Backoff period。
默认值:1000 毫秒
- consumer属性
-
Binder 级别的任意使用者属性。
- producer属性
-
Binder 级别的任意 producer 属性。
- includeStoppedProcessorsForHealthCheck
-
当处理器的 bindings 通过 actuator 停止时,默认情况下,此处理器将不参与健康检查。 将此属性设置为
true
为所有处理器启用运行状况检查,包括当前通过 Bindings Actuator 端点停止的处理器。默认值:false
Kafka Streams 创建者属性
以下属性仅适用于 Kafka Streams 创建者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为方便起见,如果有多个输出绑定,并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.
.
- keySerde
-
要使用的 Key Serde
默认:请参阅上面关于消息取消/序列化的讨论
- valueSerde
-
value serde to use
默认:请参阅上面关于消息取消/序列化的讨论
- useNativeEncoding
-
启用/禁用本机编码的标志
违约:
true
. - streamPartitionerBeanName
-
要在使用者处使用的自定义出站分区器 bean 名称。 应用程序可以提供自定义
StreamPartitioner
作为 Spring bean,并且此 bean 的名称可以提供给生产者使用,而不是默认名称。Default:请参阅上面关于出站分区支持的讨论。
- produced作为
-
处理器生成到的 sink 组件的自定义名称。
聋子:
none
(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性可供 Kafka Streams 使用者使用,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为方便起见,如果有多个 input 绑定,并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.
.
- 应用程序 ID
-
设置 application.id per input binding。
默认值:见上文。
- keySerde
-
要使用的 Key Serde
默认:请参阅上面关于消息取消/序列化的讨论
- valueSerde
-
value serde to use
默认:请参阅上面关于消息取消/序列化的讨论
- materialized作为
-
state store 在使用传入的 KTable 类型时实现
违约:
none
. - useNative解码
-
启用/禁用本机解码的标志
违约:
true
. - dlq名称
-
DLQ 主题名称。
默认值:请参阅上面对错误处理和 DLQ 的讨论。
- startOffset
-
如果没有要消耗的已提交偏移量,则从 offset 开始。 这主要在使用者第一次使用某个主题时使用。 Kafka Streams 使用
earliest
作为 default 策略,并且 Binder 使用相同的 default。 这可以被覆盖为latest
使用此属性。违约:
earliest
.
注意:使用resetOffsets
对 Consumer 没有任何影响 Kafka Streams Binder。
与基于消息通道的 Binder 不同,Kafka Streams Binder 不会按需开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面描述的 Binder 级别属性。 可能的值为 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
违约:
logAndFail
- 时间戳ExtractorBeanName
-
要在使用者处使用的特定时间戳提取器 bean 名称。 应用程序可以提供
TimestampExtractor
作为 Spring bean,并且这个 bean 的名称可以提供给消费者使用,而不是默认的。Default:请参阅上面关于时间戳提取器的讨论。
- eventTypes 事件类型
-
此绑定支持的事件类型的逗号分隔列表。
违约:
none
- eventTypeHeaderKey 事件类型
-
Event type 标头键。
违约:
event_type
- consumedAs
-
处理器从其中使用的源组件的自定义名称。
聋子:
none
(由 Kafka Streams 生成)
关于并发的特别说明
在 Kafka Streams 中,您可以使用num.stream.threads
财产。
为此,您可以使用各种configuration
上面在 Binder、Functions、Producer 或 Consumer Level 下描述的选项。
您还可以使用concurrency
核心 Spring Cloud Stream 为此目的提供的属性。
使用时,您需要在消费者上使用它。
当您有多个 input binding(输入绑定)时,请在第一个 input binding(输入绑定)上设置此项。
例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency
,它将被翻译为num.stream.threads
由 Binder 提供。
如果您有多个处理器,并且一个处理器定义了绑定级别并发,但其他处理器没有定义,则那些没有绑定级别并发的处理器将默认恢复为通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
.
如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。