此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

配置选项

本节包含 Kafka Streams Binders 使用的配置选项。spring-doc.cadn.net.cn

有关与 Binder 相关的常见配置选项和属性,请参阅核心文档spring-doc.cadn.net.cn

Kafka Streams Binder 属性

以下属性在 Binder 级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.在 Kafka Streams Binder 中重复使用的任何 Kafka Binder 提供的属性都必须以spring.cloud.stream.kafka.streams.binder而不是spring.cloud.stream.kafka.binder. 此规则的唯一例外是在定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。spring-doc.cadn.net.cn

配置

Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以spring.cloud.stream.kafka.streams.binder.. 以下是使用此属性的一些示例。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfigApache Kafka Streams 文档中的 JavaDocs。 您可以从中设置的所有配置StreamsConfig可以通过此设置。 使用此属性时,它适用于整个应用程序,因为这是 Binder 级别属性。 如果应用程序中有多个处理器,则所有处理器都将获取这些属性。 对于像application.id,这将成为问题,因此您必须仔细检查属性StreamsConfig使用此 Binder 级别进行映射configuration财产。spring-doc.cadn.net.cn

functions.<function-bean-name>.applicationId

仅适用于功能式处理器。 这可用于设置应用程序中每个函数的应用程序 ID。 在有多个函数的情况下,这是设置应用程序 ID 的便捷方法。spring-doc.cadn.net.cn

functions.<function-bean-name>.configuration

仅适用于功能式处理器。 Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于 Binder 级别configuration属性,但此级别的configuration属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定功能限制对配置的访问时,您可能希望使用它。 都StreamsConfigproperties 可以在此处使用。spring-doc.cadn.net.cn

经纪人

代理 URLspring-doc.cadn.net.cn

违约:localhostspring-doc.cadn.net.cn

zk节点

Zookeeper 网址spring-doc.cadn.net.cn

违约:localhostspring-doc.cadn.net.cn

deserializationExceptionHandler

反序列化错误处理程序类型。 此处理程序在 Binder 级别应用,因此应用于应用程序中的所有 input 绑定。 有一种方法可以在 Consumer 绑定级别以更精细的方式控制它。 可能的值为 -logAndContinue,logAndFail,skipAndContinuesendToDlqspring-doc.cadn.net.cn

违约:logAndFailspring-doc.cadn.net.cn

应用程序 ID

在 Binder 级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个功能,则应以不同的方式设置应用程序 ID。 请参阅上文,其中详细讨论了设置应用程序 ID。spring-doc.cadn.net.cn

Default:application 将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。spring-doc.cadn.net.cn

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大尝试次数。spring-doc.cadn.net.cn

默认值:1spring-doc.cadn.net.cn

stateStoreRetry.backoffPeriod

重试时尝试连接到 state store 时的 Backoff period。spring-doc.cadn.net.cn

默认值:1000 毫秒spring-doc.cadn.net.cn

consumer属性

Binder 级别的任意使用者属性。spring-doc.cadn.net.cn

producer属性

Binder 级别的任意 producer 属性。spring-doc.cadn.net.cn

includeStoppedProcessorsForHealthCheck

当处理器的 bindings 通过 actuator 停止时,默认情况下,此处理器将不参与健康检查。 将此属性设置为true为所有处理器启用运行状况检查,包括当前通过 Bindings Actuator 端点停止的处理器。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

Kafka Streams 创建者属性

以下属性仅适用于 Kafka Streams 创建者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.为方便起见,如果有多个输出绑定,并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer..spring-doc.cadn.net.cn

keySerde

要使用的 Key Serdespring-doc.cadn.net.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cadn.net.cn

valueSerde

value serde to usespring-doc.cadn.net.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cadn.net.cn

useNativeEncoding

启用/禁用本机编码的标志spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

streamPartitionerBeanName

要在使用者处使用的自定义出站分区器 bean 名称。 应用程序可以提供自定义StreamPartitioner作为 Spring bean,并且此 bean 的名称可以提供给生产者使用,而不是默认名称。spring-doc.cadn.net.cn

Default:请参阅上面关于出站分区支持的讨论。spring-doc.cadn.net.cn

produced作为

处理器生成到的 sink 组件的自定义名称。spring-doc.cadn.net.cn

聋子:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

Kafka Streams 消费者属性

以下属性可供 Kafka Streams 使用者使用,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.为方便起见,如果有多个 input 绑定,并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer..spring-doc.cadn.net.cn

应用程序 ID

设置 application.id per input binding。spring-doc.cadn.net.cn

默认值:见上文。spring-doc.cadn.net.cn

keySerde

要使用的 Key Serdespring-doc.cadn.net.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cadn.net.cn

valueSerde

value serde to usespring-doc.cadn.net.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cadn.net.cn

materialized作为

state store 在使用传入的 KTable 类型时实现spring-doc.cadn.net.cn

违约:none.spring-doc.cadn.net.cn

useNative解码

启用/禁用本机解码的标志spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

dlq名称

DLQ 主题名称。spring-doc.cadn.net.cn

默认值:请参阅上面对错误处理和 DLQ 的讨论。spring-doc.cadn.net.cn

startOffset

如果没有要消耗的已提交偏移量,则从 offset 开始。 这主要在使用者第一次使用某个主题时使用。 Kafka Streams 使用earliest作为 default 策略,并且 Binder 使用相同的 default。 这可以被覆盖为latest使用此属性。spring-doc.cadn.net.cn

违约:earliest.spring-doc.cadn.net.cn

注意:使用resetOffsets对 Consumer 没有任何影响 Kafka Streams Binder。 与基于消息通道的 Binder 不同,Kafka Streams Binder 不会按需开始或结束。spring-doc.cadn.net.cn

deserializationExceptionHandler

反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面描述的 Binder 级别属性。 可能的值为 -logAndContinue,logAndFail,skipAndContinuesendToDlqspring-doc.cadn.net.cn

违约:logAndFailspring-doc.cadn.net.cn

时间戳ExtractorBeanName

要在使用者处使用的特定时间戳提取器 bean 名称。 应用程序可以提供TimestampExtractor作为 Spring bean,并且这个 bean 的名称可以提供给消费者使用,而不是默认的。spring-doc.cadn.net.cn

Default:请参阅上面关于时间戳提取器的讨论。spring-doc.cadn.net.cn

eventTypes 事件类型

此绑定支持的事件类型的逗号分隔列表。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

eventTypeHeaderKey 事件类型

Event type 标头键。spring-doc.cadn.net.cn

违约:event_typespring-doc.cadn.net.cn

consumedAs

处理器从其中使用的源组件的自定义名称。spring-doc.cadn.net.cn

聋子:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

关于并发的特别说明

在 Kafka Streams 中,您可以使用num.stream.threads财产。 为此,您可以使用各种configuration上面在 Binder、Functions、Producer 或 Consumer Level 下描述的选项。 您还可以使用concurrency核心 Spring Cloud Stream 为此目的提供的属性。 使用时,您需要在消费者上使用它。 当您有多个 input binding(输入绑定)时,请在第一个 input binding(输入绑定)上设置此项。 例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency,它将被翻译为num.stream.threads由 Binder 提供。 如果您有多个处理器,并且一个处理器定义了绑定级别并发,但其他处理器没有定义,则那些没有绑定级别并发的处理器将默认恢复为通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads. 如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。spring-doc.cadn.net.cn