此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

状态存储

当使用高级 DSL 时,Kafka Streams 会自动创建状态存储,并进行适当的调用以触发状态存储。spring-doc.cadn.net.cn

如果要具体化传入的KTablebinding 作为命名状态存储,那么您可以使用以下策略来实现此目的。spring-doc.cadn.net.cn

假设您有以下函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后,通过设置以下属性,传入的KTable数据将被具体化到命名的状态存储中。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在应用程序中将自定义状态存储定义为 bean,这些 bean 将被 Binder 检测并添加到 Kafka Streams 构建器中。 尤其是在使用处理器 API 时,需要手动注册状态存储。 为此,您可以在应用程序中将 StateStore 创建为 Bean。 以下是定义此类 bean 的示例。spring-doc.cadn.net.cn

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

然后,应用程序可以直接访问这些状态存储。spring-doc.cadn.net.cn

在引导过程中,上述 bean 将由 Binder 处理并传递给 Streams 构建器对象。spring-doc.cadn.net.cn

访问 state store:spring-doc.cadn.net.cn

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

这在注册全局 state store 时不起作用。 要注册全局状态存储,请参阅以下有关自定义的部分StreamsBuilderFactoryBean.spring-doc.cadn.net.cn