此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.2.0spring-doc.cadn.net.cn

混合高级 DSL 和低级处理器 API

Kafka Streams 提供两种 API 变体。 它有一个更高级别的 DSL,类似于 API,您可以在其中链接许多函数式程序员可能熟悉的各种作。 Kafka Streams 还提供对低级处理器 API 的访问。 处理器 API 虽然非常强大,并且能够在较低级别控制事物,但本质上是必不可少的。 Kafka Streams Binders for Spring Cloud Stream 允许您使用高级 DSL 或混合使用 DSL 和处理器 API。 混合使用这两种变体为您提供了很多选项来控制应用程序中的各种用例。 应用程序可以使用transformprocess方法 API 调用来访问处理器 API。spring-doc.cadn.net.cn

下面我们看看如何使用process应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

下面是一个使用transform应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

processAPI 方法调用是终端作,而transformAPI 是非终端的,它为您提供了一个潜在的转换KStream使用它,您可以使用 DSL 或处理器 API 继续进行进一步处理。spring-doc.cadn.net.cn