此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1

表 1. 性能ContainerProperties
财产 违约 描述

ackCount

1

提交挂起偏移量之前的记录数,当 为 或 .ackModeCOUNTCOUNT_TIME

adviceChain

null

一连串的对象(例如 围绕建议)包装消息侦听器,按顺序调用。AdviceMethodInterceptor

ackMode

控制提交偏移量的频率 - 请参阅提交偏移量。

ackTime

5000

当 为 或 时,提交挂起偏移量的时间(以毫秒为单位)。ackModeTIMECOUNT_TIME

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在外派时承诺初始职位;默认情况下,仅当 是 时才会提交初始偏移量,即使存在事务管理器,它也不会在事务中运行。 有关可用选项的更多信息,请参见 JavaDocs。ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatestContainerProperties.AssignmentCommitOption

asyncAcks

false

启用无序提交(请参阅手动提交偏移量);使用者将暂停并推迟提交,直到填补空白。

authExceptionRetryInterval

null

如果不为 null,则当 Kafka 客户端抛出 or 时,a 在轮询之间休眠。 如果为 null,则此类异常被视为致命异常,容器将停止。DurationAuthenticationExceptionAuthorizationException

batchRecoverAfterRollback

false

设置为要启用批量恢复,请参阅回滚处理器之后true

clientId

(空字符串)

使用者属性的前缀。 覆盖消费工厂属性;在并发容器中,作为每个使用者实例的后缀添加。client.idclient.id-n

checkDeserExWhenKeyNull

设置为在收到 a 时始终检查标头。 当使用者代码无法确定是否已配置时(例如使用委派反序列化程序时),很有用。trueDeserializationExceptionnullkeyErrorHandlingDeserializer

checkDeserExWhenValueNull

设置为在收到 a 时始终检查标头。 当使用者代码无法确定是否已配置时(例如使用委派反序列化程序时),很有用。trueDeserializationExceptionnullvalueErrorHandlingDeserializer

commitCallback

null

如果存在,并且是提交完成后调用的回调。syncCommitsfalse

commitLogLevel

调试

与提交偏移量相关的日志的日志记录级别。

consumerRebalanceListener

null

再平衡侦听器;请参阅重新平衡侦听器

commitRetries

3

使用 set 设置为 true 时的重试次数。 默认值为 3(总共 4 次尝试)。RetriableCommitFailedExceptionsyncCommits

consumerStartTimeout

30年代

在记录错误之前等待使用者启动的时间;例如,如果您使用线程不足的任务执行程序,则可能会发生这种情况。

deliveryAttemptHeader

false

请参阅投放尝试标头

eosMode

V2

Exactly Once 语义模式;请参阅 Exactly Once Semantics

fixTxOffsets

false

当使用由事务生产者生成的记录时,并且使用者位于分区的末尾,由于用于指示事务提交/回滚的伪记录,并且可能存在回滚记录,因此可能会错误地将滞后报告为大于零。 这在功能上不会影响消费者,但一些用户表示担心“滞后”不为零。 将此属性设置为,容器将更正此类错误报告的偏移量。 该检查在下一次轮询之前执行,以避免显著增加提交处理的复杂性。 在撰写本文时,仅当使用者配置为 1 且大于 1 时,才会纠正滞后。 有关详细信息,请参阅 KAFKA-10683trueisolation.level=read_committedmax.poll.records

groupId

null

覆盖消费者财产;由 or 属性自动设置。group.id@KafkaListeneridgroupId

idleBeforeDataMultiplier

5.0

在收到任何记录之前应用乘数。 收到记录后,将不再应用乘数。 从 2.8 版开始可用。idleEventInterval

idleBetweenPolls

0

用于通过在投票之间休眠线程来减慢交付速度。 处理一批记录加上此值的时间必须小于使用者属性。max.poll.interval.ms

idleEventInterval

null

设置后,启用 s 的发布,请参阅应用程序事件检测空闲和无响应使用者。 另请参见。ListenerContainerIdleEventidleBeforeDataMultiplier

idlePartitionEventInterval

null

设置后,启用 s 的发布,请参阅应用程序事件检测空闲和无响应使用者ListenerContainerIdlePartitionEvent

kafkaConsumerProperties

没有

用于覆盖在使用者工厂上配置的任何任意使用者属性。

kafkaAwareTransactionManager

null

请参阅交易记录。

listenerTaskExecutor

SimpleAsyncTaskExecutor

用于运行使用者线程的任务执行器。 默认执行程序创建名为 的线程;其中,名称是 Bean 名称;with the name 是后缀为 bean 名称,其中 n 为每个子容器递增。<name>-C-nKafkaMessageListenerContainerConcurrentMessageListenerContainer-n

logContainerConfig

false

设置为在 INFO 级别记录所有容器属性。true

messageListener

null

消息侦听器。

micrometerEnabled

true

是否为消费类线程维护千分尺计时器。

micrometerTags

要添加到千分米指标的静态标签图。

micrometerTagsProvider

null

根据消费者记录提供动态标签的函数。

missingTopicsFatal

false

当 true 时,如果代理上不存在配置的主题,则阻止容器启动。

monitorInterval

30年代

多久检查一次使用者线程的状态。 请参见 和 。NonResponsiveConsumerEventnoPollThresholdpollTimeout

noPollThreshold

3.0

乘以以确定是否发布 . 看。pollTimeOutNonResponsiveConsumerEventmonitorInterval

observationConvention

null

设置后,根据消费者记录中的信息向计时器和跟踪添加动态标签。

observationEnabled

false

设置为允许通过千分尺进行观察。true

offsetAndMetadataProvider

null

的提供程序 ;默认情况下,提供程序使用空元数据创建偏移量和元数据。提供程序提供了一种自定义元数据的方法。OffsetAndMetadata

onlyLogRecordMetadata

false

设置为记录完整的使用者记录(错误、调试日志等),而不仅仅是 。falsetopic-partition@offset

pauseImmediate

false

当容器暂停时,在当前记录之后停止处理,而不是在处理上一个轮询的所有记录后停止处理;其余记录保留在内存中,并在恢复容器时传递给侦听器。

pollTimeout

5000

超时以毫秒为单位传递。Consumer.poll()

pollTimeoutWhilePaused

100

当容器处于暂停状态时传递的超时(以毫秒为单位)。Consumer.poll()

restartAfterAuthExceptions

如果容器因授权/身份验证异常而停止,则为 True 以重新启动容器。

scheduler

ThreadPoolTaskScheduler

运行使用者监视任务的调度程序。

shutdownTimeout

10000

在所有使用者停止之前和发布容器停止事件之前阻止方法的最长时间(以毫秒为单位)。stop()

stopContainerWhenFenced

false

如果抛出 ,请停止侦听器容器。 有关详细信息,请参阅回滚后处理器ProducerFencedException

stopImmediate

false

当容器停止时,请在当前记录之后停止处理,而不是在处理上一个轮询中的所有记录之后停止处理。

subBatchPerPartition

请参见 desc.

使用批处理侦听器时,如果是 ,则调用侦听器,并将轮询结果拆分为子批处理,每个分区一个子批处理。 违约。truefalse

syncCommitTimeout

null

使用 when 的超时时间为 。 如果未设置,容器将尝试确定使用者属性并使用它;否则将使用 60 秒。syncCommitstruedefault.api.timeout.ms

syncCommits

true

是否使用同步或异步提交进行偏移;看。commitCallback

主题主题模式主题分区

不适用

配置的主题、主题模式或显式分配的主题/分区。 互斥;必须至少提供一份;由构造函数强制执行。ContainerProperties

transactionManager

null

自 3.2 起不推荐使用,请参见 [kafkaAwareTransactionManager]其他事务管理器

表 2. 性能AbstractListenerContainer
财产 违约 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

在事务回滚后调用的 An 。AfterRollbackProcessor

applicationEventPublisher

应用程序上下文

事件发布者。

batchErrorHandler

请参见 desc.

已弃用 - 请参阅。commonErrorHandler

batchInterceptor

null

在调用批处理侦听器之前将 a 设置为调用;不适用于录音侦听器。 另请参见。BatchInterceptorinterceptBeforeTx

beanName

Bean 名称

容器的 Bean 名称;后缀为 for 子容器。-n

commonErrorHandler

请参见 desc.

DefaultErrorHandler或者当使用 A 时提供 A。 请参阅容器错误处理程序nulltransactionManagerDefaultAfterRollbackProcessor

containerProperties

ContainerProperties

容器属性实例。

groupId

请参见 desc.

如果存在,则为消费品工厂的属性。containerProperties.groupIdgroup.id

interceptBeforeTx

true

确定是在事务开始之前还是之后调用。recordInterceptor

listenerId

请参见 desc.

用户配置容器的 Bean 名称或 s 的属性。id@KafkaListener

listenerInfo

要在标头中填充的值。 对于 ,此值是从属性中获取的。 此标头可以在各种位置使用,例如 ,以及侦听器代码本身。KafkaHeaders.LISTENER_INFO@KafkaListenerinfoRecordInterceptorRecordFilterStrategy

pauseRequested

(只读)

如果已请求使用者暂停,则为 True。

recordInterceptor

null

在调用记录侦听器之前设置 a to call;不适用于批处理侦听器。 另请参见。RecordInterceptorinterceptBeforeTx

topicCheckTimeout

30年代

当容器属性为 时,操作完成的等待时间(以秒为单位)。missingTopicsFataltruedescribeTopics

表 3. 性能KafkaMessageListenerContainer
财产 违约 描述

assignedPartitions

(只读)

当前分配给此容器的分区(显式或未显式)。

assignedPartitionsByClientId

(只读)

当前分配给此容器的分区(显式或未显式)。

clientIdSuffix

null

由并发容器用于为每个子容器的使用者提供唯一的 .client.id

containerPaused

不适用

如果已请求暂停,并且使用者实际上已暂停,则为 true。

表 4. 性能ConcurrentMessageListenerContainer
财产 违约 描述

alwaysClientIdSuffix

true

设置为 false 可禁止在 仅为 1 时向 consumer 属性添加后缀。client.idconcurrency

assignedPartitions

(只读)

当前分配给此容器的子 s(显式或未显式)的分区的聚合。KafkaMessageListenerContainer

assignedPartitionsByClientId

(只读)

当前分配给此容器的子容器的分区(显式或不显式),由子容器的使用者属性键控。KafkaMessageListenerContainerclient.id

concurrency

1

要管理的子项数。KafkaMessageListenerContainer

containerPaused

不适用

如果已请求暂停,并且所有子容器的使用者实际上都已暂停,则为 true。

containers

不适用

对所有子项的引用。KafkaMessageListenerContainer