此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

@KafkaListener生命周期管理

@KafkaListener注释不是应用程序上下文中的 bean。 相反,它们被注册到类型为KafkaListenerEndpointRegistry. 这个 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何具有autoStartup设置为true. 所有容器工厂创建的所有容器必须位于同一phase. 有关更多信息,请参阅 Listener Container Auto Startup。 您可以使用注册表以编程方式管理生命周期。 启动或停止注册表将启动或停止所有已注册的容器。 或者,您可以使用其id属性。 您可以设置autoStartup在注解上,这将覆盖在 Container Factory 中配置的默认设置。 您可以从应用程序上下文(例如自动布线)获取对 bean 的引用,以管理其已注册的容器。 以下示例说明如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表仅维护其管理的容器的生命周期;声明为 bean 的容器不受注册表管理,可以从应用程序上下文中获取。 可以通过调用注册表的getListenerContainers()方法。 版本 2.2.5 添加了便捷方法getAllListenerContainers(),它返回所有容器的集合,包括由 Registry 管理的容器和声明为 bean 的容器。 返回的集合将包括任何已初始化的原型 bean,但它不会初始化任何惰性 bean 声明。spring-doc.cadn.net.cn

在刷新应用程序上下文后注册的端点将立即启动,无论其autoStartup属性,以遵守SmartLifecycle合同,其中autoStartup仅在应用程序上下文初始化期间考虑。 延迟注册的一个示例是 Bean 的@KafkaListener在 prototype 范围内,在初始化上下文后创建实例。 从版本 2.8.7 开始,您可以设置注册表的alwaysStartAfterRefreshproperty 设置为false然后是容器的autoStartup属性将定义容器是否启动。

从 KafkaListenerEndpointRegistry 检索 MessageListenerContainers

KafkaListenerEndpointRegistry提供检索方法MessageListenerContainer实例来适应一系列管理方案:spring-doc.cadn.net.cn

All Containers:对于覆盖所有侦听器容器的作,请使用getListenerContainers()检索综合集合。spring-doc.cadn.net.cn

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

按 ID 指定容器:要管理单个容器,getListenerContainer(String id)启用按其 ID 检索。spring-doc.cadn.net.cn

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

动态容器过滤:在 3.2 版中引入,两个重载getListenerContainersMatching方法允许对容器进行精细选择。 一种方法采用Predicate<String>作为参数进行基于 ID 的筛选,而另一个则采用BiPredicate<String, MessageListenerContainer>对于可能包含容器属性或状态作为参数的更高级条件。spring-doc.cadn.net.cn

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

利用这些方法有效地管理和查询MessageListenerContainer实例。spring-doc.cadn.net.cn