此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1

以下 Spring 应用程序事件由侦听器容器及其使用者发布:

  • ConsumerStartingEvent:在消费者线程首次启动时发布,在开始轮询之前发布。

  • ConsumerStartedEvent:在使用者即将开始轮询时发布。

  • ConsumerFailedToStartEvent:如果容器属性中未发布,则已发布。 此事件可能表示配置的任务执行程序没有足够的线程来支持使用它的容器及其并发性。 发生这种情况时,还会记录一条错误消息。ConsumerStartingEventconsumerStartTimeout

  • ListenerContainerIdleEvent:在未收到任何消息时发布(如果已配置)。idleInterval

  • ListenerContainerNoLongerIdleEvent:在先前发布 .ListenerContainerIdleEvent

  • ListenerContainerPartitionIdleEvent:在未收到来自该分区的消息时发布(如果已配置)。idlePartitionEventInterval

  • ListenerContainerPartitionNoLongerIdleEvent:当从以前发布过 .ListenerContainerPartitionIdleEvent

  • NonResponsiveConsumerEvent:当使用者在方法中似乎被阻止时发布。poll

  • ConsumerPartitionPausedEvent:当分区暂停时,由每个使用者发布。

  • ConsumerPartitionResumedEvent:由每个使用者在恢复分区时发布。

  • ConsumerPausedEvent:容器暂停时由每个使用者发布。

  • ConsumerResumedEvent:由每个使用者在容器恢复时发布。

  • ConsumerStoppingEvent:由每个消费者在停止前发布。

  • ConsumerStoppedEvent:在使用者关闭后发布。 请参阅线程安全

  • ConsumerRetryAuthEvent:当使用者的身份验证或授权失败且正在重试时发布。

  • ConsumerRetryAuthSuccessfulEvent:在成功重试身份验证或授权时发布。只有在之前有过的情况下才会发生。ConsumerRetryAuthEvent

  • ContainerStoppedEvent:在所有使用者都停止时发布。

默认情况下,应用程序上下文的事件多播程序在调用线程上调用事件侦听器。 如果将多播程序更改为使用异步执行程序,则当事件包含对使用者的引用时,不得调用任何方法。Consumer

具有以下属性:ListenerContainerIdleEvent

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

  • id:侦听器 ID(或容器 Bean 名称)。

  • idleTime:发布事件时容器处于空闲状态的时间。

  • topicPartitions:生成事件时为容器分配的主题和分区。

  • consumer:对 Kafka 对象的引用。 例如,如果以前调用了使用者的方法,则在收到事件时可以调用。Consumerpause()resume()

  • paused:容器当前是否处于暂停状态。 有关详细信息,请参阅暂停和恢复侦听器容器

具有相同的属性,但 和 除外。ListenerContainerNoLongerIdleEventidleTimepaused

具有以下属性:ListenerContainerPartitionIdleEvent

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

  • id:侦听器 ID(或容器 Bean 名称)。

  • idleTime:发布事件时,时间分区消耗处于空闲状态。

  • topicPartition:触发事件的主题和分区。

  • consumer:对 Kafka 对象的引用。 例如,如果以前调用了使用者的方法,则在收到事件时可以调用。Consumerpause()resume()

  • paused:该使用者的分区消耗当前是否已暂停。 有关详细信息,请参阅暂停和恢复侦听器容器

具有相同的属性,但 和 除外。ListenerContainerPartitionNoLongerIdleEventidleTimepaused

具有以下属性:NonResponsiveConsumerEvent

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

  • id:侦听器 ID(或容器 Bean 名称)。

  • timeSinceLastPoll:容器上次调用之前的时间。poll()

  • topicPartitions:生成事件时为容器分配的主题和分区。

  • consumer:对 Kafka 对象的引用。 例如,如果以前调用了使用者的方法,则在收到事件时可以调用。Consumerpause()resume()

  • paused:容器当前是否处于暂停状态。 有关详细信息,请参阅暂停和恢复侦听器容器

、 和 事件具有以下属性:ConsumerPausedEventConsumerResumedEventConsumerStopping

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

  • partitions:涉及的实例。TopicPartition

事件具有以下属性:ConsumerPartitionPausedEventConsumerPartitionResumedEvent

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

  • partition:涉及的实例。TopicPartition

该事件具有以下属性:ConsumerRetryAuthEvent

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

  • reason:

    • AUTHENTICATION- 由于身份验证异常,事件已发布。

    • AUTHORIZATION- 由于授权异常,事件已发布。

、 、 、 和 事件具有以下属性:ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent

  • source:发布事件的侦听器容器实例。

  • container:侦听器容器或父侦听器容器(如果源容器是子容器)。

所有容器(无论是子容器还是父容器)都发布 . 对于父容器,源和容器属性是相同的。ContainerStoppedEvent

此外,还具有以下附加属性:ConsumerStoppedEvent

  • reason:

    • NORMAL- 消费者正常停止(容器已停止)。

    • ERROR- 一个被扔了。java.lang.Error

    • FENCED- 事务生产者被围起来,容器属性为 。stopContainerWhenFencedtrue

    • AUTH- 一个 OR 被抛出,但未配置。AuthenticationExceptionAuthorizationExceptionauthExceptionRetryInterval

    • NO_OFFSET- 分区没有偏移量,策略为 。auto.offset.resetnone

在遇到此类情况后,可以使用此事件重新启动容器:

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}
默认情况下,应用程序上下文的事件多播程序在调用线程上调用事件侦听器。 如果将多播程序更改为使用异步执行程序,则当事件包含对使用者的引用时,不得调用任何方法。Consumer

检测空闲和无响应的使用者

虽然效率很高,但异步使用者的一个问题是检测它们何时处于空闲状态。 如果一段时间内没有消息到达,您可能需要执行一些操作。

您可以将侦听器容器配置为在一段时间后没有消息传递时发布。 当容器处于空闲状态时,每毫秒发布一个事件。ListenerContainerIdleEventidleEventInterval

若要配置此功能,请在容器上设置。 以下示例演示如何执行此操作:idleEventInterval

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例演示如何设置 for :idleEventInterval@KafkaListener

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。

如果由于某种原因,使用者方法未退出,则不会收到任何消息,并且无法生成空闲事件(这是无法访问代理时早期版本的问题)。 在这种情况下,如果轮询未在属性中返回,则容器会发布。 默认情况下,此检查在每个容器中每 30 秒执行一次。 您可以通过在配置侦听器容器时设置(默认为 30 秒)和(默认为 3.0)属性来修改此行为。 应大于以避免由于争用条件而产生虚假事件。 接收此类事件后,您可以停止容器,从而唤醒使用者,使其可以停止。poll()kafka-clientsNonResponsiveConsumerEvent3xpollTimeoutmonitorIntervalnoPollThresholdContainerPropertiesnoPollThreshold1.0

从版本 2.6.2 开始,如果容器已发布 ,它将在随后收到记录时发布 。ListenerContainerIdleEventListenerContainerNoLongerIdleEvent

事件消耗

您可以通过实现来捕获这些事件 - 一个普通的侦听器或一个缩小到仅接收此特定事件的侦听器。 您还可以使用 Spring Framework 4.2 中引入的 。ApplicationListener@EventListener

下一个示例合并到一个类中。 您应该了解应用程序侦听器会获取所有容器的事件,因此,如果要根据哪个容器处于空闲状态执行特定操作,则可能需要检查侦听器 ID。 您也可以将 's 用于此目的。@KafkaListener@EventListener@EventListenercondition

有关事件属性的信息,请参阅应用程序事件

该事件通常发布在使用者线程上,因此可以安全地与对象交互。Consumer

以下示例同时使用 和 :@KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件侦听器可以查看所有容器的事件。 因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件范围。 由于为支持并发而创建的容器,因此实际容器的名称为 其中 是支持并发的每个实例的唯一值。 这就是我们在条件中使用的原因。@KafkaListenerid-nnstartsWith
如果希望使用 idle 事件停止 lister 容器,则不应调用调用侦听器的线程。 这样做会导致延迟和不必要的日志消息。 相反,应将事件移交给其他线程,然后该线程可以停止容器。 此外,如果容器实例是子容器,则不应使用容器实例。 应改为停止并发容器。container.stop()stop()

空闲时的当前位置

请注意,当检测到空闲时,您可以通过在侦听器中实现来获取当前位置。 在搜索中查看。ConsumerSeekAwareonIdleContainer()

事件侦听器可以查看所有容器的事件。 因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件范围。 由于为支持并发而创建的容器,因此实际容器的名称为 其中 是支持并发的每个实例的唯一值。 这就是我们在条件中使用的原因。@KafkaListenerid-nnstartsWith
如果希望使用 idle 事件停止 lister 容器,则不应调用调用侦听器的线程。 这样做会导致延迟和不必要的日志消息。 相反,应将事件移交给其他线程,然后该线程可以停止容器。 此外,如果容器实例是子容器,则不应使用容器实例。 应改为停止并发容器。container.stop()stop()