参考指南
本指南介绍了 Spring Cloud Stream Binder 的 Apache Kafka 实现。 它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定结构的信息。 此外,本指南还介绍了 Spring Cloud Stream 的 Kafka Streams 绑定能力。
1. Apache Kafka 活页夹
1.1. 用法
要使用 Apache Kafka 活页夹,您需要添加spring-cloud-stream-binder-kafka
作为 Spring Cloud Stream 应用程序的依赖项,如以下示例中的 Maven 所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka Starter,如下面的 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了 Apache Kafka Binder 如何运行的简化图:

Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。 使用者组直接映射到相同的 Apache Kafka 概念。 分区也直接映射到 Apache Kafka 分区。
该 Binder 当前使用 Apache Kafkakafka-clients
版本2.3.1
.
此客户端可以与较旧的代理通信(请参阅 Kafka 文档),但某些功能可能不可用。
例如,对于早于 0.11.x.x 的版本,不支持本机标头。
此外,0.11.x.x 不支持autoAddPartitions
财产。
1.3. 配置选项
本节包含 Apache Kafka 活页夹使用的配置选项。
有关与 Binder 相关的常见配置选项和属性,请参阅核心文档。
1.3.1. Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka Binder 连接到的代理列表。
违约:
localhost
. - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允许指定带或不带端口信息的主机(例如host1,host2:port2
). 这将在 broker 列表中未配置端口时设置默认端口。违约:
9092
. - spring.cloud.stream.kafka.binder.configuration
-
传递给 Binder 创建的所有客户端的客户端属性(生产者和使用者)的键/值映射。 由于这些属性由生产者和使用者都使用,因此应仅限于通用属性(例如,安全性设置)。 通过此配置提供的未知 Kafka 生产者或使用者属性将被过滤掉,不允许传播。 这里的 properties 取代了 boot 中设置的任何 properties。
默认值:空地图。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端使用者属性的键/值映射。 除了支持已知的 Kafka 使用者属性外,这里还允许未知的使用者属性。 这里的属性会取代在 boot 和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.headers
-
由 Binder 传输的自定义标头的列表。 仅当⇐使用
kafka-clients
版本 < 0.11.0.0。较新版本本身支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout的
-
等待获取分区信息的时间,以秒为单位。 如果此计时器过期,则运行状况报告为 down。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的 ack 数。 请参阅创建者的 Kafka 文档
acks
财产。违约:
1
. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在以下情况下有效
autoCreateTopics
或autoAddPartitions
已设置。 Binder 在其生成或使用数据的主题上配置的全局最小分区数。 它可以被partitionCount
设置或instanceCount * concurrency
创建器的设置(如果其中一个更大)。违约:
1
. - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里还允许未知的生产者属性。 这里的属性会取代在 boot 和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
处于活动状态。 可以在每个绑定上覆盖。违约:
1
. - spring.cloud.stream.kafka.binder.autoCreate主题
-
如果设置为
true
,Binder 会自动创建新主题。 如果设置为false
,则 Binder 依赖于已配置的主题。 在后一种情况下,如果主题不存在,则 Binder 无法启动。此设置独立于 auto.create.topics.enable
设置,并且不会影响它。 如果服务器设置为自动创建主题,则可以使用默认代理设置将它们作为元数据检索请求的一部分创建。违约:
true
. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,Binder 会根据需要创建新分区。 如果设置为false
,则 Binder 依赖于已配置的主题的分区大小。 如果目标主题的分区计数小于预期值,则 Binder 无法启动。违约:
false
. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在 Binder 中启用事务。看
transaction.id
和 Kafka 文档中的 Transactionsspring-kafka
文档。 启用事务后,单个producer
属性将被忽略,并且所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能。违约
null
(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务 Binder 中生成者的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
以及 Kafka Producer Properties 以及所有 Binders 支持的常规 producer 属性。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
一个 bean 的 bean 名称
KafkaHeaderMapper
用于映射spring-messaging
标头与 Kafka 标头之间的匹配。 例如,如果您希望在BinderHeaderMapper
对 Headers 使用 JSON 反序列化的 bean。 如果此自定义BinderHeaderMapper
Bean 不可用于使用此属性的 Binder,则 Binder 将查找名称为kafkaBinderHeaderMapper
那是BinderHeaderMapper
在回退到默认值之前BinderHeaderMapper
由 Binder 创建。默认值:none。
1.3.2. Kafka Consumer 属性
为避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.consumer.<property>=<value> . |
以下属性仅适用于 Kafka 使用者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
.
- admin.configuration 的
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且在未来版本中将删除对它的支持。 - admin.replicas-分配
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且在未来版本中将删除对它的支持。 - admin.replication-factor (管理员复制因子)
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且在未来版本中将删除对它的支持。 - autoRebalance已启用
-
什么时候
true
,主题分区将在使用者组的成员之间自动重新平衡。 什么时候false
,每个使用者都会根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
. 这需要spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。 的spring.cloud.stream.instanceCount
在这种情况下,property 通常必须大于 1。违约:
true
. - ackEachRecord 的
-
什么时候
autoCommitOffset
是true
,此设置指示是否在处理每条记录后提交偏移量。 默认情况下,偏移量在consumer.poll()
已处理。 轮询返回的记录数可以通过max.poll.records
Kafka 属性,该属性通过使用者configuration
财产。 将此项设置为true
可能会导致性能下降,但这样做会降低发生故障时重新传送记录的可能性。 另请参阅 BinderrequiredAcks
property,这也会影响 commit offset 的性能。违约:
false
. - autoCommitOffset
-
是否在处理消息时自动提交偏移量。 如果设置为
false
、带有键kafka_acknowledgment
的类型org.springframework.kafka.support.Acknowledgment
header 出现在入站邮件中。 应用程序可以使用此标头来确认消息。 有关详细信息,请参阅 examples 部分。 当此属性设置为false
,Kafka Binder 将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
应用程序负责确认记录。 另请参阅ackEachRecord
.违约:
true
. - autoCommitOnError
-
仅在以下情况下有效
autoCommitOffset
设置为true
. 如果设置为false
,它会抑制对导致错误的邮件的自动提交,并仅对成功的邮件进行提交。它允许流在持续失败的情况下自动从上次成功处理的消息重播。 如果设置为true
,它始终自动提交(如果启用了自动提交)。 如果未设置(默认值),则它实际上具有与enableDlq
,如果错误消息被发送到 DLQ,则自动提交错误消息,否则不提交错误消息。默认值:未设置。
- resetOffsets
-
是否将 Consumer 的 Offset 重置为 startOffset 提供的值。 如果
KafkaRebalanceListener
提供;参见使用 KafkaRebalanceListener。违约:
false
. - startOffset
-
新组的起始偏移量。 允许的值:
earliest
和latest
. 如果为使用者 'binding' 显式设置了使用者组(通过spring.cloud.stream.bindings.<channelName>.group
),则 'startOffset' 设置为earliest
.否则,它设置为latest
对于anonymous
Consumer 组。 另请参阅resetOffsets
(在此列表的前面)。默认值:null(相当于
earliest
). - enableDlq
-
当设置为 true 时,它将为使用者启用 DLQ 行为。 默认情况下,导致错误的消息将转发到名为
error.<destination>.<group>
. DLQ 主题名称可以通过设置dlqName
财产。 这为更常见的 Kafka 重放场景提供了一个替代选项,用于错误数量相对较少并且重放整个原始主题可能太麻烦的情况。 有关更多信息,请参阅 死信主题处理 。 从版本 2.0 开始,发送到 DLQ 主题的消息通过以下标头进行了增强:x-original-topic
,x-exception-message
和x-exception-stacktrace
如byte[]
. 默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅 Dead-Letter Topic Partition Selection 。不允许destinationIsPattern
是true
.违约:
false
. - dlq分区
-
什么时候
enableDlq
为 true,并且未设置此属性,则会创建一个分区数与主主题相同数量的死信主题。 通常,死信记录会发送到与原始记录相同的死信主题中的分区。 此行为可以更改;请参阅 Dead-Letter Topic Partition Selection。 如果此属性设置为1
并且没有DqlPartitionFunction
bean,则所有死信记录都将写入 partition0
. 如果此属性大于1
,您必须提供DlqPartitionFunction
豆。 请注意,实际的分区计数受 Binder 的minPartitionCount
财产。违约:
none
- 配置
-
使用包含通用 Kafka 使用者属性的键/值对进行映射。 除了具有 Kafka 使用者属性之外,还可以在此处传递其他配置属性。 例如,应用程序所需的一些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
.默认值:空地图。
- dlq名称
-
用于接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,则导致错误的消息将转发到名为
error.<destination>.<group>
). - dlqProducerProperties
-
使用此功能,可以设置特定于 DLQ 的创建者属性。 通过 kafka producer 属性提供的所有属性都可以通过此属性进行设置。 当 consumer 上启用了原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
.默认值:默认 Kafka 生产者属性。
- standardHeaders 的
-
指示入站通道适配器填充哪些标准头。 允许的值:
none
,id
,timestamp
或both
. 如果使用本机反序列化并且接收消息的第一个组件需要id
(例如配置为使用 JDBC 消息存储的聚合器)。违约:
none
- converterBean名称
-
实现
RecordMessageConverter
.在入站通道适配器中使用,以替换默认的MessagingMessageConverter
.违约:
null
- idleEventInterval
-
指示最近未收到任何消息的事件之间的时间间隔(以毫秒为单位)。 使用
ApplicationListener<ListenerContainerIdleEvent>
以接收这些事件。 有关使用示例,请参阅示例:暂停和恢复使用者。违约:
30000
- destinationIsPattern
-
如果为 true,则目标被视为正则表达式
Pattern
用于匹配 broker 的主题名称。 如果为 true,则不预置主题,并且enableDlq
是不允许的,因为 Binder 在预置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由 consumer 属性metadata.max.age.ms
,该指标(在撰写本文时)默认为 300000 毫秒(5 分钟)。 这可以使用configuration
属性。违约:
false
- topic.properties
-
一个
Map
预置新主题时使用的 Kafka 主题属性,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:none。
- topic.replicas-assignment
-
副本分配的 Map<Integer、List<Integer>>,键是分区,值是分配。 在预置新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:none。
- topic.replication-factor
-
预置主题时要使用的复制因子。覆盖 binder 范围的设置。 如果 Ignored
replicas-assignments
存在。默认值:none (使用 binder 范围的默认值 1)。
- 轮询超时
-
用于轮询使用者中的轮询的超时。
默认值:5 秒。
- 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的 Binder 的事务管理器。 如果要将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的 Just Once 使用和生成,Consumer 和 Producer 绑定都必须使用相同的事务管理器进行配置。默认值:none。
1.3.3. 消费 Batch
从版本 3.0 开始,当spring.cloud.stream.binding.<name>.consumer.batch-mode
设置为true
,则轮询 Kafka 收到的所有记录Consumer
将以List<?>
添加到 listener 方法中。
否则,将一次使用一条记录调用该方法。
批处理的大小由 Kafka 使用者属性控制max.poll.records
,min.fetch.bytes
,fetch.max.wait.ms
;有关更多信息,请参阅 Kafka 文档。
使用批处理模式时,不支持在 Binder 中重试,因此maxAttempts 将被覆盖为 1。
您可以配置SeekToCurrentBatchErrorHandler (使用ListenerContainerCustomizer ) 实现与在 Binder 中重试类似的功能。
您也可以使用手册AckMode 并调用Ackowledgment.nack(index, sleep) 提交部分批处理的偏移量并重新交付剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
1.3.4. Kafka 生产者属性
为避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.producer.<property>=<value> . |
以下属性仅适用于 Kafka 生产者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.
.
- admin.configuration 的
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且在未来版本中将删除对它的支持。 - admin.replicas-分配
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且在未来版本中将删除对它的支持。 - admin.replication-factor (管理员复制因子)
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且在未来版本中将删除对它的支持。 - 缓冲区大小
-
Kafka 创建者在发送之前尝试批处理的数据量的上限(以字节为单位)。
违约:
16384
. - 同步
-
生产者是否同步。
违约:
false
. - 发送超时表达式
-
根据传出消息评估的 SPEL 表达式,用于评估启用同步发布时等待确认的时间,例如,
headers['mySendTimeout']
. 超时的值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前计算表达式。违约:
none
. - batch超时
-
在发送消息之前,创建者等待多长时间以允许更多消息在同一批次中累积。 (通常,创建者根本不等待,而只是发送在上一次发送过程中累积的所有消息。非零值可能会以延迟为代价增加吞吐量。
违约:
0
. - messageKey表达式
-
根据用于填充生成的 Kafka 消息的键的传出消息计算的 SpEL 表达式,例如,
headers['myKey']
. 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前计算表达式。违约:
none
. - headerPatterns 的
-
一个逗号分隔的简单模式列表,用于匹配要映射到 Kafka 的 Spring 消息传递标头
Headers
在ProducerRecord
. 模式可以以通配符(星号)开头或结尾。 模式可以通过在!
. 匹配在第一个匹配项 (正或负) 之后停止。 例如!ask,as*
将通过ash
但不是ask
.id
和timestamp
永远不会映射。默认值:(所有标头 - 除了
*
id
和timestamp
) - 配置
-
Map 包含通用 Kafka 生产者属性的键/值对。
默认值:空地图。
- topic.properties
-
一个
Map
预置新主题时使用的 Kafka 主题属性,例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的 Map<Integer、List<Integer>>,键是分区,值是分配。 在预置新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:none。
- topic.replication-factor
-
预置主题时要使用的复制因子。覆盖 binder 范围的设置。 如果 Ignored
replicas-assignments
存在。默认值:none (使用 binder 范围的默认值 1)。
- useTopicHeader
-
设置为
true
要使用KafkaHeaders.TOPIC
message 标头。 如果标头不存在,则使用默认绑定目标。 违约:false
. - recordMetadataChannel
-
一个 bean 的 bean 名称
MessageChannel
应将成功的发送结果发送到哪个位置;该 Bean 必须存在于应用程序上下文中。 发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA
. 标头包含一个RecordMetadata
Kafka 客户端提供的 object;它包括主题中写入记录的 partition 和 offset。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送将转到 producer 错误通道(如果已配置);请参见错误通道。 默认值:null
+
Kafka Binder 使用partitionCount 设置创建具有给定分区计数的主题(与minPartitionCount ,两者中的最大值是正在使用的值)。
在配置两者时要小心minPartitionCount 对于活页夹和partitionCount 对于应用程序,因为使用了较大的值。
如果已存在分区计数较小的主题,并且autoAddPartitions 已禁用(默认值),则 Binder 无法启动。
如果已存在分区计数较小的主题,并且autoAddPartitions ,则会添加新的分区。
如果已存在分区数大于最大值 (minPartitionCount 或partitionCount ),则使用现有分区计数。 |
- 压缩
-
将
compression.type
producer 属性。 支持的值包括none
,gzip
,snappy
和lz4
. 如果您覆盖kafka-clients
jar 更改为 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用zstd
压缩, 使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
.违约:
none
. - 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的 Binder 的事务管理器。 如果要将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的 Just Once 使用和生成,Consumer 和 Producer 绑定都必须使用相同的事务管理器进行配置。默认值:none。
1.3.5. 使用示例
在本节中,我们将展示上述属性在特定情况下的用法。
示例:设置autoCommitOffset
自false
和依赖手动确认
此示例说明了如何在使用者应用程序中手动确认偏移量。
此示例要求spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
设置为false
.
使用相应的 input channel name 作为示例。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。
要利用此功能,请遵循 Apache Kafka 文档中的准则以及 Confluent 文档中的 Kafka 0.9 安全准则。
使用spring.cloud.stream.kafka.binder.configuration
选项为 Binder 创建的所有客户端设置安全属性。
例如,要将security.protocol
自SASL_SSL
,设置以下属性:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性都可以以类似的方式设置。
使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。
Spring Cloud Stream 支持通过使用 JAAS 配置文件和使用 Spring Boot 属性将 JAAS 配置信息传递给应用程序。
使用 JAAS 配置文件
可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。 以下示例显示了如何使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为拥有 JAAS 配置文件的替代方法, Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。
以下属性可用于配置 Kafka 客户端的登录上下文:
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登录模块名称。在正常情况下不需要设置。
违约:
com.sun.security.auth.module.Krb5LoginModule
. - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
违约:
required
. - spring.cloud.stream.kafka.binder.jaas.options
-
使用包含登录模块选项的键/值对进行映射。
默认值:空地图。
以下示例显示了如何使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例表示以下 JAAS 文件的等效项:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果代理上已存在所需的主题或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。
不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。
如果-Djava.security.auth.login.config system 属性已存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。 |
使用autoCreateTopics 和autoAddPartitions 与 Kerberos 一起。
通常,应用程序可能会使用在 Kafka 和 Zookeeper 中没有管理权限的委托人。
因此,依赖 Spring Cloud Stream 创建/修改 Topic 可能会失败。
在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。 |
示例:暂停和恢复使用者
如果您希望暂停消费但不会导致分区重新平衡,则可以暂停和恢复使用者。
这可以通过添加Consumer
作为@StreamListener
.
要恢复,您需要一个ApplicationListener
为ListenerContainerIdleEvent
实例。
事件的发布频率由idleEventInterval
财产。
由于使用者不是线程安全的,因此必须在调用线程上调用这些方法。
以下简单应用程序演示如何暂停和恢复:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
1.4. 事务性 Binder
通过设置spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值,例如tx-
.
在处理器应用程序中使用时,使用者启动事务;在 Consumer 线程上发送的任何记录都参与同一个事务。
当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。
通用的 producer 工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能;将忽略单个绑定 Kafka 生产者属性。
事务不支持正常的 Binder 重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将被回滚。
启用重试后(通用属性maxAttempts 大于零),retry 属性用于配置DefaultAfterRollbackProcessor 在容器级别启用重试。
同样,此功能不是在事务中发布死信记录,而是再次通过DefaultAfterRollbackProcessor 它在主事务回滚后运行。 |
如果您希望在源应用程序中使用事务,或者从某个任意线程中为仅限生产者事务(例如@Scheduled
方法),您必须获取对事务性生产者工厂的引用,并定义一个KafkaTransactionManager
bean 使用它。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用BinderFactory
;用null
当只配置了一个 Binder 时。
如果配置了多个 Binder,请使用 Binder 名称获取引用。
一旦我们有了对 binder 的引用,我们就可以获得对ProducerFactory
并创建一个事务管理器。
然后你会使用普通的 Spring 事务支持,例如TransactionTemplate
或@Transactional
例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与来自其他事务管理器的事务同步,请使用ChainedTransactionManager
.
如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix . |
1.5. 错误通道
从版本 1.3 开始,Binder 无条件地将异常发送到每个 Consumer 目标的错误通道,并且还可以配置为将异步 producer send failures发送到 error 通道。 有关更多信息,请参见[spring-cloud-stream-overview-error-handling]。
的ErrorMessage
因为发送失败是一个KafkaSendFailureException
具有属性:
-
failedMessage
: Spring 消息传递Message<?>
未能发送。 -
record
: 原始ProducerRecord
它是从failedMessage
不会自动处理生产者异常(例如发送到死信队列)。 您可以在自己的 Spring 集成流中使用这些异常。
1.6. Kafka 指标
Kafka Binder 模块公开了以下指标:
spring.cloud.stream.binder.kafka.offset
:此指标表示给定使用者组尚未从给定 Binder 的主题中使用多少条消息。
提供的指标基于 Mircometer 指标库。该指标包含使用者组信息、主题以及已提交偏移量与主题上最新偏移量的实际滞后。
此指标对于向 PaaS 平台提供自动扩展反馈特别有用。
1.7. Tombstone Records(空记录值)
使用压缩主题时,带有null
值(也称为逻辑删除记录)表示删除键。
要在@StreamListener
方法,则必须将该参数标记为 not required 才能接收null
value 参数。
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
1.8. 使用 KafkaRebalanceListener
应用程序可能希望在最初分配分区时将主题/分区查找到任意偏移量,或在消费者上执行其他作。
从版本 2.1 开始,如果您提供单个KafkaRebalanceListener
bean 时,它将被连接到所有 Kafka 消费者绑定中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
您无法设置resetOffsets
consumer 属性设置为true
当您提供 Rebalance 侦听器时。
1.9. 死信主题处理
1.9.1. 死信主题分区选择
默认情况下,使用与原始记录相同的分区将记录发布到 Dead-Letter 主题。 这意味着 Dead-Letter 主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加DlqPartitionFunction
implementation 作为@Bean
添加到应用程序上下文中。
只能存在一个这样的 bean。
该函数随 consumer 组一起提供,失败的ConsumerRecord
和例外。
例如,如果您始终希望路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(并且 Binder 的minPartitionCount 等于1 ),则无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions property 的值设置为1 (或 Binder 的minPartitionCount 大于1 ),您必须提供DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
1.9.2. 处理死信主题中的记录
由于框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将消息路由回原始主题。 但是,如果问题是永久性问题,则可能会导致无限循环。 本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在三次尝试后将它们移动到 “parking lot” 主题。 该应用程序是另一个spring-cloud-stream应用程序,它从死信主题中读取。 当 5 秒内未收到任何消息时,它将终止。
这些示例假定原始目标是so8400out
且 Consumer 组为so8400
.
有几种策略需要考虑:
-
请考虑仅在主应用程序未运行时运行 rerouting。 否则,暂时性错误的重试将很快用完。
-
或者,使用两阶段方法:使用此应用程序路由到第三个主题,使用另一个应用程序从那里路由回主要主题。
以下代码清单显示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
1.10. 使用 Kafka Binder 进行分区
Apache Kafka 原生支持主题分区。
有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应发送到同一分区)。
以下示例显示了如何配置生产者和使用者端:
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
必须预置主题以具有足够的分区,以实现所有使用者组所需的并发性。
上述配置最多支持 12 个 Consumer 实例(如果concurrency 是 2,如果并发数为 3,则为 4,依此类推)。
通常,最好 “过度预置” 分区,以允许将来使用者或并发性的增加。 |
上述配置使用默认分区 (key.hashCode() % partitionCount ).
这可能会也可能不会提供适当平衡的算法,具体取决于键值。
您可以使用partitionSelectorExpression 或partitionSelectorClass 性能。 |
由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 跨实例分配分区。
Spring Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息发送到的分区 ID:
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。
Kafka 重新平衡分区分配。
如果实例计数 (或instance count * concurrency
) 超过分区数,则部分消费者处于空闲状态。
2. Kafka Streams 活页夹
2.1. 用法
要使用 Kafka Streams Binders,您只需使用以下 maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
为 Kafka Streams Binder 引导新项目的一种快速方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示

2.2. 概述
Spring Cloud Stream 包括一个专门为 Apache Kafka Streams 绑定设计的 Binder 实现。 通过这种原生集成, Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。
Kafka Streams Binder 实现建立在 Spring for Apache Kafka 项目提供的基础之上。
Kafka Streams Binder 为 Kafka Streams 中的三种主要类型提供了绑定功能 -KStream
,KTable
和GlobalKTable
.
Kafka Streams 应用程序通常遵循一个模型,在该模型中,从入站主题中读取记录,应用业务逻辑,然后将转换后的记录写入出站主题。 或者,也可以定义没有出站目标的 Processor 应用程序。
在以下部分中,我们将了解 Spring Cloud Stream 与 Kafka Streams 集成的详细信息。
2.3. 编程模型
当使用 Kafka Streams Binder 提供的编程模型时,高级 Streams DSL 以及更高级别和较低级别 Processor-API 的混合都可以用作选项。
当混合使用更高级别和较低级别的 API 时,这通常是通过调用transform
或process
API 方法KStream
.
2.3.1. 函数式风格
从 Spring Cloud Stream 开始3.0.0
Kafka Streams Binder 允许使用 Java 8 中提供的函数式编程样式来设计和开发应用程序。
这意味着应用程序可以简洁地表示为类型的 lambda 表达式java.util.function.Function
或java.util.function.Consumer
.
让我们举一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
尽管简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。
这是一个没有出站绑定,只有一个入站绑定的使用者应用程序。
应用程序使用数据,它只是记录来自KStream
key 和 value 的 VALUE 值。
该应用程序包含SpringBootApplication
annotation 和标记为Bean
.
bean 方法的类型为java.util.function.Consumer
其参数化为KStream
.
然后,在实现中,我们返回一个 Consumer 对象,它本质上是一个 lambda 表达式。
在 lambda 表达式中,提供了用于处理数据的代码。
在此应用程序中,有一个类型为KStream
.
Binder 为应用程序创建此绑定,其名称为process-in-0
,即函数 Bean 名称的名称,后跟一个破折号 () 和文本-
in
后跟另一个短划线,然后是参数的序号位置。
您可以使用此绑定名称来设置其他属性,例如 destination。
例如spring.cloud.stream.bindings.process-in-0.destination=my-topic
.
如果未在绑定上设置 destination 属性,则会创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。 |
一旦构建为 uber-jar(例如kstream-consumer-app.jar
),您可以运行上述示例,如下所示。
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这是另一个示例,其中它是一个同时具有输入和输出绑定的完整处理器。 这是一个经典的字数统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个单词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
同样,这是一个完整的 Spring Boot 应用程序。此处与第一个应用程序的区别在于 bean 方法的类型为java.util.function.Function
.
第一个参数化类型的Function
用于输入KStream
第二个用于输出。
在方法主体中,提供了一个 lambda 表达式,其类型为Function
作为实现,给出了实际的业务逻辑。
与前面讨论的基于 Consumer 的应用程序类似,此处的输入绑定命名为process-in-0
默认情况下。对于输出,绑定名称也会自动设置为process-out-0
.
一旦构建为 uber-jar(例如wordcount-processor.jar
),您可以运行上述示例,如下所示。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将使用来自 Kafka 主题的消息words
,计算结果将发布到 output
主题counts
.
Spring Cloud Stream 将确保来自传入和传出主题的消息都自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 在处理器中是必需的。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。
我们上面看到的两个示例有一个KStream
input 绑定。在这两种情况下,绑定都接收来自单个主题的记录。
如果要将多个主题多路复用为单个KStream
绑定,您可以在下面提供逗号分隔的 Kafka 主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果要将主题与常规 exression 匹配,还可以提供主题模式作为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多重要的 Kafka Streams 应用程序经常通过多个绑定使用来自多个主题的数据。
例如,一个主题被消耗为Kstream
另一个KTable
或GlobalKTable
.
应用程序可能希望以表类型接收数据的原因有很多。
考虑一个用例,其中底层主题是通过数据库中的变更数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。
如果应用程序指定数据需要绑定为KTable
或GlobalKTable
,则 Kafka Streams Binder 会将目标正确绑定到KTable
或GlobalKTable
并使其可供应用程序作。
我们将了解如何在 Kafka Streams Binder 中处理多个输入绑定的几种不同场景。
Kafka Streams Binder 中的 BiFunction
下面是一个示例,其中有两个 inputs 和一个 output。在这种情况下,应用程序可以利用java.util.function.BiFunction
.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
在这里,基本主题与前面的示例相同,但这里我们有两个输入。
Java 的BiFunction
support 用于将输入绑定到所需的目标。
Binder 为输入生成的默认绑定名称为process-in-0
和process-in-1
分别。默认输出绑定为process-out-0
.
在此示例中,第一个参数BiFunction
绑定为KStream
对于第一个输入,第二个参数被绑定为KTable
对于第二个输入。
Kafka Streams Binder 中的 BiConsumer
如果有两个 inputs,但没有 outputs,在这种情况下,我们可以使用java.util.function.BiConsumer
如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超出两个输入
如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,Binders 允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加了函数式编程支持,Java 现在允许您编写柯里化函数。 Spring Cloud Stream Kafka Streams Binder 可以利用此功能来启用多个 Importing 绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面介绍的绑定模型的详细信息。
在这个模型中,我们在入站上有 3 个部分应用的函数。我们把它们称为f(x)
,f(y)
和f(z)
.
如果我们在真正的数学函数的意义上扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
.
这x
variable 代表KStream<Long, Order>
这y
variable 代表GlobalKTable<Long, Customer>
和z
variable 代表GlobalKTable<Long, Product>
.
第一个函数f(x)
具有应用程序的第一个输入绑定 (KStream<Long, Order>
),其输出为函数 f(y)。
函数f(y)
具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>
),它的输出是另一个函数f(z)
.
函数的输入f(z)
是应用程序的第三个输入 (GlobalKTable<Long, Product>
),其输出为KStream<Long, EnrichedOrder>
这是应用程序的最终输出绑定。
来自三个偏函数的输入,它们是KStream
,GlobalKTable
,GlobalKTable
分别在方法主体中可供您使用,用于将业务逻辑作为 Lambda 表达式的一部分实施。
输入绑定命名为enrichOrder-in-0
,enrichOrder-in-1
和enrichOrder-in-2
分别。输出绑定命名为enrichOrder-out-0
.
使用柯里化函数,您几乎可以拥有任意数量的 Importing。但是,请记住,在 Java 中,除了较少数量的输入和部分应用的函数之外,任何其他内容都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过合理较少的数量,并且您希望使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。
多个输出绑定
Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。
使用多个输出绑定时,您需要提供一个 KStream 数组 (KStream[]
) 作为出站返回类型。
下面是一个示例:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
编程模型保持不变,但出站参数化类型为KStream[]
.
默认输出绑定名称为process-out-0
,process-out-1
,process-out-2
分别。
Binder 生成三个 output bindings 的原因是,它检测返回的KStream
数组。
Kafka Streams 的基于函数的编程样式摘要
总之,下表显示了可在函数范例中使用的各种选项。
输入数量 | 输出数量 | 要使用的组件 |
---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
如果此表中有多个输出,则类型将简单地变为
KStream[]
.
2.3.2. 命令式编程模型。
尽管上面概述的函数式编程模型是首选方法,但您仍然可以使用经典的StreamListener
基于方法。
以下是一些示例。
以下是使用StreamListener
.
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
如您所见,这有点冗长,因为您需要提供EnableBinding
和其他额外的注释,如StreamListener
和SendTo
使其成为一个完整的应用程序。EnableBinding
是指定包含绑定的 Binding 接口的位置。
在这种情况下,我们使用的是 stockKafkaStreamsProcessor
Binding 接口,该接口具有以下 Contract。
public interface KafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}
Binder 将为 input 创建绑定KStream
和输出KStream
因为您使用的是包含这些声明的 Binding 接口。
除了函数式样式中提供的编程模型的明显差异之外,这里需要提到的一点是,绑定名称是您在绑定接口中指定的名称。
例如,在上面的应用程序中,由于我们使用KafkaStreamsProcessor
,则绑定名称为input
和output
.
Binding 属性需要使用这些名称。例如spring.cloud.stream.bindings.input.destination
,spring.cloud.stream.bindings.output.destination
等。
请记住,这与函数式样式有根本的不同,因为 Binders 会为应用程序生成绑定名称。
这是因为应用程序在功能模型中不使用EnableBinding
.
这是另一个 sink 示例,其中有两个 inputs。
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
以下是StreamListener
等效的 相同BiFunction
基于处理器。
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
最后,这是StreamListener
相当于具有三个 inputs 和 curried 函数的应用程序。
@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
@Input("input-1") KStream<Long, Order> ordersStream,
@Input("input-"2) GlobalKTable<Long, Customer> customers,
@Input("input-3") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
customers, (orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
你可能会注意到,上面两个例子更加冗长,因为除了提供EnableBinding
,您还需要编写自己的自定义绑定接口。
使用功能模型,您可以避免所有这些仪式细节。
在我们继续查看 Kafka Streams Binder 提供的通用编程模型之前,这里是StreamListener
版本。
EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
回顾一下,我们已经回顾了使用 Kafka Streams Binder 时的各种编程模型选择。
Binder 为KStream
,KTable
和GlobalKTable
在输入上。KTable
和GlobalKTable
绑定仅在 input 上可用。
Binder 支持KStream
.
Kafka Streams Binder 编程模型的结果是,Binder 为您提供了灵活性,您可以使用功能齐全的编程模型或使用StreamListener
基于命令式方法。
2.4. 编程模型的辅助设备
2.4.1. 单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以按以下方式进行申请。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,Binder 将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream,需要激活哪些功能。 以下是激活功能的方法。
spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些功能不立即激活,您可以将其从此列表中删除。
当您拥有单个 Kafka Streams 处理器和其他类型的Function
同一应用程序中的 bean,通过不同的 Binders 处理(例如,基于常规 Kafka Message Channel Binders 的函数 Bean)
2.4.2. Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必需属性。 Spring Cloud Stream Kafka Streams Binder 允许您以多种方式配置此应用程序 ID。
如果您只有一个处理器或StreamListener
在应用程序中,您可以使用以下属性在 Binder 级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId
.
为方便起见,如果您只有一个处理器,您还可以使用spring.application.name
作为属性来委托应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。
例如,假设您有以下功能。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下 Binder 级别属性为每个应用程序设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
在StreamListener
,您需要在处理器上的第一个 input binding 上设置此项。
例如,假设您有以下两个StreamListener
基于处理器。
@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
...
}
然后,您必须使用以下 binding 属性为此设置应用程序 ID。
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId
和
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。 但是,如果您使用的是函数模型,则在上面看到的 binder 级别设置每个函数要容易得多。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。 如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,则在这种情况下,Binder 将为您自动生成静态应用程序 ID。
这在开发方案中很方便,因为它避免了显式提供应用程序 ID 的需要。
以这种方式生成的应用程序 ID 在应用程序重启时将是静态的。
对于函数模型,生成的应用程序 ID 将是函数 Bean 名称,后跟文字applicationID
,例如process-applicationID
如果process
如果函数 Bean Name.
在StreamListener
,而不是使用函数 Bean 名称,而是使用包含类名称,后跟方法名称,然后是文本applicationId
.
应用程序 ID 设置摘要
-
默认情况下,binder 将为每个函数自动生成应用程序 ID,或者
StreamListener
方法。 -
如果您有单个处理器,则可以使用
spring.kafka.streams.applicationId
,spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
. -
如果您有多个处理器,则可以使用属性 -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
. 在StreamListener
,这可以使用spring.cloud.stream.kafka.streams.bindings.input.applicationId
,假设输入绑定名称为input
.
2.4.3. 使用函数式样式覆盖 Binder 生成的默认绑定名称
默认情况下,当使用函数式样式时,Binders 使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果要覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
.默认绑定名称是 Binder 生成的原始绑定名称。
例如,假设您有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成带有名称process-in-0
,process-in-1
和process-out-0
.
现在,如果你想把它们完全改成别的,也许是更多特定于域的绑定名称,那么你可以按如下方式进行。
springc.cloud.stream.function.bindings.process-in-0=users
springc.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新绑定名称上设置所有绑定级别属性。
请记住,对于上述函数式编程模型,在大多数情况下,遵守默认绑定名称是有意义的。 您可能仍然希望进行此覆盖的唯一原因是,当您拥有大量配置属性并且希望将绑定映射到对域更友好的内容时。
2.4.4. 设置 bootstrap 服务器配置
运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。
如果您不提供此信息,则 Binder 会期望您以默认localhost:9092
.
如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。
-
使用 boot 属性 -
spring.kafka.bootstrapServers
-
Binder 级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到 Binder 级别属性时,您是否使用通过常规 Kafka Binder 提供的 broker 属性并不重要 -spring.cloud.stream.kafka.binder.brokers
.
Kafka Streams Binder 将首先检查是否设置了 Kafka Streams Binder 特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers
.
2.5. 记录序列化和反序列化
Kafka Streams Binder 允许您以两种方式序列化和反序列化记录。 一个是 Kafka 提供的原生序列化和反序列化工具,另一个是 Spring Cloud Stream 框架的消息转换能力。 让我们看看一些细节。
2.5.1. 入站反序列化
键总是使用本机 Serdes 反序列化。
对于值,默认情况下,入站上的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams Binder 默认行为的重大更改,其中反序列化是由框架完成的。
Kafka Streams Binder 将尝试推断匹配Serde
类型,方法是查看java.util.function.Function|Consumer
或StreamListener
.
这是它与 Serdes 匹配的顺序。
-
如果应用程序提供
Serde
如果返回类型使用传入 key 或 value 类型的实际类型进行参数化,则它将使用它Serde
用于入站反序列化。 例如,如果您在应用程序中有以下内容,则 Binder 会检测到KStream
与在Serde
豆。 它将使用它进行入站反序列化。
@Bean
public Serde<Foo() customSerde{
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,并查看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是 Binder 将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,Binder 假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这非常有用,因为 Binder 会在内部推断它们以更正 Java 类型。 在回退到
JsonSerde
不过,Binder 会默认检查Serde`s set in the Kafka Streams configuration to see if it is a `Serde
它可以与传入的 KStream 类型匹配。
如果上述策略均无效,则应用程序必须通过配置提供 'Serde'。 这可以通过两种方式进行配置 - binding 或 default。
首先,Binder 将查看Serde
在绑定级别提供。
例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以提供 Binding 级别Serde
使用以下内容:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您提供Serde 如上所述,每个 input binding,这将具有更高的优先级,并且 Binder 将远离任何Serde 推理。 |
如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。 由于原生解码是默认的,为了让 Spring Cloud Stream 反序列化入站值对象,你需要显式禁用原生解码。
例如,如果你有与上面相同的 BiFunction 处理器,那么spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要单独禁用所有 inputs 的本机解码。否则,本机解码仍将应用于您未禁用的那些。
默认情况下,Spring Cloud Stream 将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。
您可以通过使用以下属性和适当的MessageConverter
豆。
spring.cloud.stream.bindings.process-in-0.contentType
2.5.2. 出站序列化
出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站上的序列化由 Kafka 本地处理。 在 Binder 的 3.0 版本之前,这是由框架本身完成的。
出站的键始终由 Kafka 使用匹配的Serde
这是由 Binder 推断的。
如果它无法推断出键的类型,则需要使用 configuration 指定。
值 serdes 是使用用于入站反序列化的相同规则推断的。
首先,它进行匹配以查看出站类型是否来自应用程序中提供的 bean。
如果不匹配,它会检查它是否与Serde
被 Kafka 公开,例如 -Integer
,Long
,Short
,Double
,Float
,byte[]
,UUID
和String
.
如果这不起作用,那么它会回退到JsonSerde
由 Spring Kafka 项目提供,但首先看一下默认的Serde
配置以查看是否存在匹配项。
请记住,所有这些都对应用程序是透明的。
如果这些都不起作用,则用户必须提供Serde
以按配置使用。
假设您正在使用相同的BiFunction
处理器。然后,您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推理失败,并且没有提供绑定级别的 Serdes,则 Binder 将回退到JsonSerde
,但请查看匹配项的默认 Serdes。
默认 serdes 的配置方式与上面相同,在 deserialization 下描述。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。
同样,如果 Binder 能够推断出Serde
types 中,则无需进行此配置。
如果您不需要 Kafka 提供的原生编码,但想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认的。
例如,如果你有与上面相同的 BiFunction 处理器,那么spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的那些。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。
您可以通过以下属性和相应的MessageConverter
豆。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用本机编码/解码时, binder 不会像本机 Serdes 那样进行任何推理。
应用程序需要显式提供所有配置选项。
因此,在编写 Spring Cloud Stream Kafka Streams 应用程序时,通常建议保留默认的反序列化/序列化选项,并坚持使用 Kafka Streams 提供的原生反/序列化。
您必须使用框架提供的消息转换功能的一种情况是,当上游生产者使用特定的序列化策略时。
在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。
当依赖默认的Serde
机制中,应用程序必须确保 Binder 有办法使用适当的Serde
,否则可能会失败。
值得一提的是,上面概述的数据取消/序列化方法仅适用于处理器的边缘,即 - 入站和出站。
您的业务逻辑可能仍需要调用明确需要的 Kafka Streams APISerde
对象。
这些仍然是应用程序的责任,必须由开发人员进行相应的处理。
2.6. 错误处理
Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。
有关此支持的详细信息,请参阅此处。
Apache Kafka Streams 提供了两种开箱即用的反序列化异常处理程序 -LogAndContinueExceptionHandler
和LogAndFailExceptionHandler
.
顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理程序。
2.6.1. 在 Binder 中处理反序列化异常
Kafka Streams Binder 允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序之外,Binder 还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 以下是启用此 DLQ 异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
当设置了上述属性时,所有 disserialization error 中的记录都会自动发送到 DLQ 主题。
您可以设置发布 DLQ 消息的主题名称,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此选项,则错误记录将发送到主题custom-dlq
.
如果未设置,则它将创建一个名称为error.<input-topic-name>.<application-id>
.
例如,如果绑定的目标主题是inputTopic
应用程序 ID 为process-applicationId
,则默认 DLQ 主题为error.inputTopic.process-applicationId
.
如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建一个 DLQ 主题。
2.6.2. 每个输入使用者绑定的 DLQ
物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。
这意味着,如果存在多个函数或StreamListener
方法,则此属性将应用于所有这些方法。
但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用 Binders 为每个输入使用者绑定提供的更精细的 DLQ 控件。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ,在第二个绑定上启用 logAndSkip,那么您可以在使用者上执行此作,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip
以这种方式设置反序列化异常处理程序的优先级高于在 Binder 级别设置。
2.6.3. DLQ 分区
默认情况下,使用与原始记录相同的分区将记录发布到 Dead-Letter 主题。 这意味着 Dead-Letter 主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加DlqPartitionFunction
implementation 作为@Bean
添加到应用程序上下文中。
只能存在一个这样的 bean。
该函数随 consumer 组(大多数情况下与应用程序 ID 相同)提供,失败的ConsumerRecord
和例外。
例如,如果您始终希望路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(并且 Binder 的minPartitionCount 等于1 ),则无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions property 的值设置为1 (或 Binder 的minPartitionCount 大于1 ),您必须提供DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
在 Kafka Streams Binder 中使用异常处理功能时,需要记住几点。
-
物业
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。 这意味着,如果存在多个函数或StreamListener
方法,则此属性将应用于所有这些方法。 -
反序列化的异常处理与本机反序列化和框架提供的消息转换一致。
2.6.4. 在 Binder 中处理 production 异常
与上述对反序列化异常处理程序的支持不同,Binders 不提供此类处理 生产异常的一类机制。
但是,您仍然可以使用StreamsBuilderFactoryBean
Customizer,您可以在下面的后续部分中找到有关的更多详细信息。
2.7. 状态存储
当使用高级 DSL 时,Kafka Streams 会自动创建状态存储,并进行适当的调用以触发状态存储。
如果要具体化传入的KTable
binding 作为命名状态存储,那么您可以使用以下策略来实现此目的。
假设您有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后,通过设置以下属性,传入的KTable
数据将被具体化到命名的状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,这些 bean 将被 Binder 检测并添加到 Kafka Streams 构建器中。 尤其是在使用处理器 API 时,需要手动注册状态存储。 为此,您可以在应用程序中将 StateStore 创建为 Bean。 以下是定义此类 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后,应用程序可以直接访问这些状态存储。
在引导过程中,上述 bean 将由 Binder 处理并传递给 Streams 构建器对象。
访问 state store:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
这在注册全局 state store 时不起作用。
要注册全局状态存储,请参阅以下有关自定义的部分StreamsBuilderFactoryBean
.
2.8. 交互式查询
Kafka Streams Binder API 公开了一个名为InteractiveQueryService
以交互方式查询 State Store。
您可以在应用程序中将其作为 Spring bean 进行访问。从应用程序访问此 bean 的一种简单方法是autowire
豆子。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦你获得了对这个 bean 的访问权限,那么你就可以查询你感兴趣的特定 state-store。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,用于检索存储的上述方法调用可能会失败。 例如,它可能仍处于初始化 state store 的过程中。 在这种情况下,重试此作将非常有用。 Kafka Streams Binder 提供了一种简单的重试机制来适应这种情况。
以下是可用于控制此重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认为
1
. -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果有多个 kafka streams 应用程序实例正在运行,则在以交互方式查询它们之前,您需要确定哪个应用程序实例托管您正在查询的特定密钥。InteractiveQueryService
API 提供了用于识别主机信息的方法。
要使其正常工作,您必须配置属性application.server
如下:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
2.9. 健康指标
运行状况指示器需要依赖项spring-boot-starter-actuator
.对于 maven,请使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层流线程的状态。
Spring Cloud Stream 定义了一个属性management.health.binders.enabled
以启用运行状况指示器。请参阅 Spring Cloud Stream 文档。
运行状况指示器为每个流线程的元数据提供以下详细信息:
-
线程名称
-
线程状态:
CREATED
,RUNNING
,PARTITIONS_REVOKED
,PARTITIONS_ASSIGNED
,PENDING_SHUTDOWN
或DEAD
-
活动任务:任务 ID 和分区
-
备用任务:任务 ID 和分区
默认情况下,只有全局状态可见 (UP
或DOWN
).为了显示详细信息,属性management.endpoint.health.show-details
必须设置为ALWAYS
或WHEN_AUTHORIZED
.
有关运行状况信息的更多详细信息,请参阅 Spring Boot Actuator 文档。
运行状况指示器的状态为UP 如果注册的所有 Kafka 线程都在RUNNING 州。 |
由于 Kafka Streams Binder 中有三个单独的 Binder (KStream
,KTable
和GlobalKTable
),它们都会报告运行状况。
启用show-details
,则报告的某些信息可能是多余的。
当同一应用程序中存在多个 Kafka Streams 处理器时,将报告所有处理器的运行状况检查,并按 Kafka Streams 的应用程序 ID 进行分类。
2.10. 访问 Kafka Streams 指标
Spring Cloud Stream Kafka Streams Binder 提供了一种基本机制,用于访问通过千分尺导出的 Kafka Streams 指标MeterRegistry
.
Kafka Streams 指标可通过KafkaStreams#metrics()
由 Binder 导出到此 Meter 注册表。
导出的指标来自使用者、创建者、管理员客户端和流本身。
Binder 导出的指标的格式为:指标组名称,后跟一个点,然后是实际的指标名称。 原始指标信息中的所有破折号都将替换为点。
例如,指标名称network-io-total
从指标组consumer-metrics
在千分尺注册表中为consumer.metrics.network.io.total
.
同样,该指标commit-total
从stream-metrics
可用作stream.metrics.commit.total
.
如果同一应用程序中有多个 Kafka Streams 处理器,则指标名称前面将加上 Kafka Streams 的相应应用程序 ID。
在这种情况下,应用程序 ID 将保持原样,即不会将破折号转换为点等。
例如,如果第一个处理器的应用程序 ID 为processor-1
,则指标名称network-io-total
从指标组consumer-metrics
在千分尺注册表中为processor-1.consumer.metrics.network.io.total
.
您可以通过编程方式访问 MicrometerMeterRegistry
在应用程序中,然后遍历可用的仪表或使用 Spring Boot actuator 通过 REST 端点访问指标。
通过 Boot actuator 端点访问时,请确保将metrics
前往住宿management.endpoints.web.exposure.include
.
然后,您可以访问/acutator/metrics
以获取所有可用量度的列表,然后可以通过同一 URI (/actuator/metrics/<metric-name>
).
超出 info 级别指标的任何内容KafkaStreams#metrics()
,(例如调试级别指标)仍然只能通过 JMX 在设置metrics.recording.level
自DEBUG
.
默认情况下,Kafka Streams 将此级别设置为INFO
.有关更多详细信息,请参阅 Kafka Streams 文档中的此部分。
在未来的发行版中,Binder 可能支持通过 Micrometer 导出这些 DEBUG 级别度量。
2.11. 混合高级 DSL 和低级处理器 API
Kafka Streams 提供两种 API 变体。
它有一个更高级别的 DSL,类似于 API,您可以在其中链接许多函数式程序员可能熟悉的各种作。
Kafka Streams 还提供对低级处理器 API 的访问。
处理器 API 虽然非常强大,并且能够在较低级别控制事物,但本质上是必不可少的。
Kafka Streams Binders for Spring Cloud Stream 允许您使用高级 DSL 或混合使用 DSL 和处理器 API。
混合使用这两种变体为您提供了很多选项来控制应用程序中的各种用例。
应用程序可以使用transform
或process
方法 API 调用来访问处理器 API。
下面我们看看如何使用process
应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
下面是一个使用transform
应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
这process
API 方法调用是终端作,而transform
API 是非终端的,它为您提供了一个潜在的转换KStream
使用它,您可以使用 DSL 或处理器 API 继续进行进一步处理。
2.12. 出站的分区支持
Kafka Streams 处理器通常会将处理后的输出发送到出站 Kafka 主题中。
如果出站主题是分区的,并且处理器需要将传出数据发送到特定的分区中,则应用程序需要提供StreamPartitioner
.
有关更多详细信息,请参阅 StreamPartitioner。
让我们看一些例子。
这是我们已经多次看到的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
以下是输出绑定目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题outputTopic
有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认分区策略,根据特定用例,这可能不是您想要的结果。
假设你想发送任何匹配到spring
到分区 0,cloud
到分区 1,stream
到分区 2,其他所有内容都到分区 3。
这是您需要在应用程序中执行的作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。 因此,如果需要,您可以实施复杂的分区策略。
您还需要提供此 bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
2.13. StreamsBuilderFactoryBean 定制器
通常需要自定义StreamsBuilderFactoryBean
这会创建KafkaStreams
对象。
基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean
.
您可以使用StreamsBuilderFactoryBeanCustomizer
要自定义StreamsBuilderFactoryBean
本身。
然后,一旦您可以访问StreamsBuilderFactoryBean
通过此定制器,您可以自定义相应的KafkaStreams
用KafkaStreamsCustomzier
.
这两个定制器都是 Spring for Apache Kafka 项目的一部分。
以下是使用StreamsBuilderFactoryBeanCustomizer
.
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上显示了您可以执行的作来自定义StreamsBuilderFactoryBean
.
您基本上可以从StreamsBuilderFactoryBean
进行自定义。
此定制器将在工厂 Bean 启动之前由 Binders 调用。
访问StreamsBuilderFactoryBean
,您还可以自定义底层KafkaStreams
对象。
这是执行此作的蓝图。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将由StreamsBuilderFactoryBeabn
就在底层KafkaStreams
开始。
只能有一个StreamsBuilderFactoryBeanCustomizer
在整个应用程序中。
那么我们如何考虑多个 Kafka Streams 处理器,因为每个处理器都由单独的 Kafka Streams 处理器备份StreamsBuilderFactoryBean
对象?
在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
2.13.1. 使用 Customizer 注册全局状态存储
如上所述,Binders 没有提供将全局状态存储注册为功能的第一类方法。 为此,您需要使用定制器。 这是如何做到这一点的。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
同样,如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder
通过过滤掉其他StreamsBuilderFactoryBean
使用上述应用程序 ID 的对象。
2.13.2. 使用 customizer 注册生产异常处理程序
在错误处理部分中,我们指出了 Binder 没有提供处理生产异常的第一类方法。
虽然是这种情况,您仍然可以使用StreamsBuilderFacotryBean
customizer 来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同样,如果您有多个处理器,则可能需要根据正确的StreamsBuilderFactoryBean
.
您还可以使用 configuration 属性添加此类 production 异常处理程序(有关更多信息,请参见下文),但如果您选择使用编程方法,则这是一个选项。
2.14. 时间戳提取器
Kafka Streams 允许您根据各种时间戳概念控制使用者记录的处理。
默认情况下,Kafka Streams 会提取嵌入在使用者记录中的时间戳元数据。
您可以通过提供不同的TimestampExtractor
implementation per input binding。
以下是有关如何执行此作的一些详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后你设置上面的TimestampExtractor
每个使用者绑定的 bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果您跳过用于设置自定义时间戳提取器的输入使用者绑定,则该使用者将使用默认设置。
2.15. 基于 Kafka Streams 的 Binder 和常规 Kafka Binder 的多 Binder
您可以有一个应用程序,其中同时具有基于常规 Kafka Binder 的函数/使用者/供应商和基于 Kafka Streams 的处理器。 但是,您不能在单个函数或使用者中混合使用它们。
下面是一个示例,其中同一应用程序中有两个基于 Binder 的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
以下是配置中的相关部分:
spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果你有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,例如常规的process
同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2。
然后,您必须使用 Spring Cloud Stream 提供的多 Binder 工具。
以下是您的配置在这种情况下可能会发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意上述配置。
我们有两种 Binder,但总共有 3 个 Binder,第一种是基于集群 1 的常规 Kafka Binder(kafka1
),然后是另一个基于集群 2 的 Kafka Binder (kafka2
),最后是kstream
一 (kafka3
).
应用程序中的第一个处理器从kafka1
并发布到kafka2
其中,两个 Binders 都基于常规的 Kafka Binder,但集群不同。
第二个处理器是 Kafka Streams 处理器,它使用来自kafka3
它与kafka2
,但 Binder 类型不同。
由于 Kafka Streams 系列活页夹中提供了三种不同的活页夹类型: -kstream
,ktable
和globalktable
- 如果您的应用程序具有基于这些 Binder 中的任意一个的多个绑定,则需要将其显式提供为 Binder 类型。
例如,如果您有如下处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,必须在多 Binder 场景中配置此 ID,如下所示。 请注意,只有当您有一个真正的多 Binder 方案,其中有多个处理器在单个应用程序中处理多个集群时,才需要这样做。 在这种情况下,需要显式地为 Binders 提供 bindings,以区别于其他处理器的 Binder 类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
2.16. 状态清理
默认情况下,Kafkastreams.cleanup()
方法在绑定停止时调用。
请参阅 Spring Kafka 文档。
要修改此行为,只需添加一个CleanupConfig
@Bean
(配置为在 start、stop 或 non-not 时清理)到应用程序上下文;该 bean 将被检测到并连接到工厂 bean。
2.17. Kafka Streams 拓扑可视化
Kafka Streams Binder 提供了以下 Actuator 终端节点,用于检索拓扑描述,您可以使用外部工具可视化拓扑。
/actuator/topology
/actuator/topology/<applicaiton-id of the processor>
您需要包含来自 Spring Boot 的 actuator 和 Web 依赖项才能访问这些端点。
此外,您还需要添加topology
自management.endpoints.web.exposure.include
财产。
默认情况下,topology
endpoint 已禁用。
2.18. 配置选项
本节包含 Kafka Streams Binders 使用的配置选项。
有关与 Binder 相关的常见配置选项和属性,请参阅核心文档。
2.18.1. Kafka Streams Binder 属性
以下属性在 Binder 级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.
- 配置
-
Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以
spring.cloud.stream.kafka.streams.binder.
. 以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfig
Apache Kafka Streams 文档中的 JavaDocs。
您可以从中设置的所有配置StreamsConfig
可以通过此设置。
使用此属性时,它适用于整个应用程序,因为这是 Binder 级别属性。
如果应用程序中有多个处理器,则所有处理器都将获取这些属性。
对于像application.id
,这将成为问题,因此您必须仔细检查属性StreamsConfig
使用此 Binder 级别进行映射configuration
财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能式处理器。 这可用于设置应用程序中每个函数的应用程序 ID。 在有多个函数的情况下,这是设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能式处理器。 Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于 Binder 级别
configuration
属性,但此级别的configuration
属性仅针对命名函数。 当您有多个处理器并且想要根据特定功能限制对配置的访问时,您可能希望使用它。 都StreamsConfig
properties 可以在此处使用。 - 经纪人
-
代理 URL
违约:
localhost
- zk节点
-
Zookeeper 网址
违约:
localhost
- deserializationExceptionHandler
-
反序列化错误处理程序类型。 此处理程序在 Binder 级别应用,因此应用于应用程序中的所有 input 绑定。 有一种方法可以在 Consumer 绑定级别以更精细的方式控制它。 可能的值为 -
logAndContinue
,logAndFail
或sendToDlq
违约:
logAndFail
- 应用程序 ID
-
在 Binder 级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数或
StreamListener
方法,则应用程序 ID 应以不同的方式设置。 请参阅上文,其中详细讨论了设置应用程序 ID。Default:application 将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到 state store 时的 Backoff period。
默认值:1000 毫秒
2.18.2. Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 创建者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为方便起见,如果有多个输出绑定,并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.
.
- keySerde
-
要使用的 Key Serde
默认:请参阅上面关于消息取消/序列化的讨论
- valueSerde
-
value serde to use
默认:请参阅上面关于消息取消/序列化的讨论
- useNativeEncoding
-
启用/禁用本机编码的标志
违约:
true
.
streamPartitionerBeanName 的
要在使用者处使用的自定义出站分区器 bean 名称。
应用程序可以提供自定义StreamPartitioner
作为 Spring bean,并且此 bean 的名称可以提供给生产者使用,而不是默认名称。
+ Default:请参阅上面关于出站分区支持的讨论。
2.18.3. Kafka Streams 消费者属性
以下属性可供 Kafka Streams 使用者使用,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为方便起见,如果有多个 input 绑定,并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.
.
- 应用程序 ID
-
设置 application.id per input binding。这仅适用于
StreamListener
based processors,对于基于函数的处理器,请参阅上面概述的其他方法。默认值:见上文。
- keySerde
-
要使用的 Key Serde
默认:请参阅上面关于消息取消/序列化的讨论
- valueSerde
-
value serde to use
默认:请参阅上面关于消息取消/序列化的讨论
- materialized作为
-
state store 在使用传入的 KTable 类型时实现
违约:
none
. - useNative解码
-
启用/禁用本机解码的标志
违约:
true
. - dlq名称
-
DLQ 主题名称。
默认值:请参阅上面对错误处理和 DLQ 的讨论。
- startOffset
-
如果没有要消耗的已提交偏移量,则从 offset 开始。 这主要在使用者第一次使用某个主题时使用。 Kafka Streams 使用
earliest
作为 default 策略,并且 Binder 使用相同的 default。 这可以被覆盖为latest
使用此属性。违约:
earliest
.
注意:使用resetOffsets
对 Consumer 没有任何影响 Kafka Streams Binder。
与基于消息通道的 Binder 不同,Kafka Streams Binder 不会按需开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面描述的 Binder 级别属性。 可能的值为 -
logAndContinue
,logAndFail
或sendToDlq
违约:
logAndFail
- 时间戳ExtractorBeanName
-
要在使用者处使用的特定时间戳提取器 bean 名称。 应用程序可以提供
TimestampExtractor
作为 Spring bean,并且这个 bean 的名称可以提供给消费者使用,而不是默认的。Default:请参阅上面关于时间戳提取器的讨论。
2.18.4. 关于并发的特别说明
在 Kafka Streams 中,您可以使用num.stream.threads
财产。
为此,您可以使用各种configuration
上面在 Binder、Functions、Producer 或 Consumer Level 下描述的选项。
您还可以使用concurrency
核心 Spring Cloud Stream 为此目的提供的属性。
使用时,您需要在消费者上使用它。
当您在函数或StreamListener
,请在第一个 Input Binding 上设置此项。
例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency
,它将被翻译为num.stream.threads
由 Binder 提供。