此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

有选择地手动启动 Kafka Streams 处理器

虽然上面列出的方法将无条件地应用自动启动false到应用程序中的所有 Kafka Streams 处理器StreamsBuilderFactoryManager,通常希望仅单独选择的 Kafka Streams 处理器不自动启动。 例如,假设您的应用程序中有三个不同的功能(处理器),对于其中一个处理器,您不希望在应用程序启动过程中启动它。 这是这种情况的一个例子。spring-doc.cadn.net.cn

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上面的场景中,如果你将spring.kafka.streams.auto-startupfalse,则在应用程序启动期间,任何处理器都不会自动启动。 在这种情况下,您必须如上所述通过调用start()在底层StreamsBuilderFactoryManager. 但是,如果我们有一个用例,即只选择性地禁用一个处理器,那么你必须将auto-startup在该处理器的单个绑定上。 让我们假设我们不希望我们的process3函数自动启动。 这是一个BiFunction具有两个输入绑定 -process3-in-0process3-in-1. 为了避免此处理器的自动启动,您可以选择这些 input binding中的任何一个,并将auto-startup在他们身上。 选择哪种装订并不重要;如果您愿意,您可以设置auto-startupfalse对他们两个来说,但一个就足够了。 因为它们共享同一个工厂 bean,所以您不必在两个绑定上将 autoStartup 设置为 false,但为了清楚起见,这样做可能是有意义的。spring-doc.cadn.net.cn

以下是 Spring Cloud Stream 属性,可用于禁用此处理器的自动启动。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然后,您可以使用 REST 端点或使用BindingsEndpointAPI 中,如下所示。 为此,您需要确保 Spring Boot 执行器依赖于 Classpath。spring-doc.cadn.net.cn

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

有关此机制的更多详细信息,请参阅参考文档中的此部分spring-doc.cadn.net.cn

通过禁用auto-startup如本节所述,请注意,这仅适用于 Consumer Bindings。 换句话说,如果您使用 producer 绑定,process3-out-0,这在禁用处理器的自动启动方面没有任何影响,尽管此 producer 绑定使用相同的StreamsBuilderFactoryBean作为使用者绑定。