此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

编程模型

当使用 Kafka Streams Binder 提供的编程模型时,高级 Streams DSL 以及更高级别和较低级别 Processor-API 的混合都可以用作选项。 当混合使用更高级别和较低级别的 API 时,这通常是通过调用transformprocessAPI 方法KStream.spring-doc.cadn.net.cn

功能风格

从 Spring Cloud Stream 开始3.0.0Kafka Streams Binder 允许使用 Java 8 中提供的函数式编程样式来设计和开发应用程序。 这意味着应用程序可以简洁地表示为类型的 lambda 表达式java.util.function.Functionjava.util.function.Consumer.spring-doc.cadn.net.cn

让我们举一个非常基本的例子。spring-doc.cadn.net.cn

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

尽管简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。 这是一个没有出站绑定,只有一个入站绑定的使用者应用程序。 应用程序使用数据,它只是记录来自KStreamkey 和 value 的 VALUE 值。 该应用程序包含SpringBootApplicationannotation 和标记为Bean. bean 方法的类型为java.util.function.Consumer其参数化为KStream. 然后,在实现中,我们返回一个 Consumer 对象,它本质上是一个 lambda 表达式。 在 lambda 表达式中,提供了用于处理数据的代码。spring-doc.cadn.net.cn

在此应用程序中,有一个类型为KStream. Binder 为应用程序创建此绑定,其名称为process-in-0,即函数 Bean 名称的名称,后跟一个破折号 () 和文本-in后跟另一个短划线,然后是参数的序号位置。 您可以使用此绑定名称来设置其他属性,例如 destination。 例如spring.cloud.stream.bindings.process-in-0.destination=my-topic.spring-doc.cadn.net.cn

如果未在绑定上设置 destination 属性,则会创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。

一旦构建为 uber-jar(例如kstream-consumer-app.jar),您可以运行上述示例,如下所示。spring-doc.cadn.net.cn

如果应用程序选择使用 Spring 的Component注解,则 Binder 也支持该模型。 上面的函数式 bean 可以重写如下。spring-doc.cadn.net.cn

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这是另一个示例,其中它是一个同时具有输入和输出绑定的完整处理器。 这是一个经典的字数统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个单词的出现次数。spring-doc.cadn.net.cn

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

同样,这是一个完整的 Spring Boot 应用程序。此处与第一个应用程序的区别在于 bean 方法的类型为java.util.function.Function. 第一个参数化类型的Function用于输入KStream第二个用于输出。 在方法主体中,提供了一个 lambda 表达式,其类型为Function作为实现,给出了实际的业务逻辑。 与前面讨论的基于 Consumer 的应用程序类似,此处的输入绑定命名为process-in-0默认情况下。对于输出,绑定名称也会自动设置为process-out-0.spring-doc.cadn.net.cn

一旦构建为 uber-jar(例如wordcount-processor.jar),您可以运行上述示例,如下所示。spring-doc.cadn.net.cn

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将使用来自 Kafka 主题的消息words,计算结果将发布到 output 主题counts.spring-doc.cadn.net.cn

Spring Cloud Stream 将确保来自传入和传出主题的消息都自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 在处理器中是必需的。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。spring-doc.cadn.net.cn

我们上面看到的两个示例有一个KStreaminput 绑定。在这两种情况下,绑定都接收来自单个主题的记录。 如果要将多个主题多路复用为单个KStream绑定,您可以在下面提供逗号分隔的 Kafka 主题作为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3spring-doc.cadn.net.cn

此外,如果要将主题与常规 exression 匹配,还可以提供主题模式作为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=input.*spring-doc.cadn.net.cn

多个输入绑定

许多重要的 Kafka Streams 应用程序经常通过多个绑定使用来自多个主题的数据。 例如,一个主题被消耗为Kstream另一个KTableGlobalKTable. 应用程序可能希望以表类型接收数据的原因有很多。 考虑一个用例,其中底层主题是通过数据库中的变更数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。 如果应用程序指定数据需要绑定为KTableGlobalKTable,则 Kafka Streams Binder 会将目标正确绑定到KTableGlobalKTable并使其可供应用程序作。 我们将了解如何在 Kafka Streams Binder 中处理多个输入绑定的几种不同场景。spring-doc.cadn.net.cn

Kafka Streams Binder 中的 BiFunction

下面是一个示例,其中有两个 inputs 和一个 output。在这种情况下,应用程序可以利用java.util.function.BiFunction.spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

在这里,基本主题与前面的示例相同,但这里我们有两个输入。 Java 的BiFunctionsupport 用于将输入绑定到所需的目标。 Binder 为输入生成的默认绑定名称为process-in-0process-in-1分别。默认输出绑定为process-out-0. 在此示例中,第一个参数BiFunction绑定为KStream对于第一个输入,第二个参数被绑定为KTable对于第二个输入。spring-doc.cadn.net.cn

Kafka Streams Binder 中的 BiConsumer

如果有两个 inputs,但没有 outputs,在这种情况下,我们可以使用java.util.function.BiConsumer如下所示。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

超出两个输入

如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,Binders 允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加了函数式编程支持,Java 现在允许您编写柯里化函数。 Spring Cloud Stream Kafka Streams Binder 可以利用此功能来启用多个 Importing 绑定。spring-doc.cadn.net.cn

让我们看一个例子。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面介绍的绑定模型的详细信息。 在这个模型中,我们在入站上有 3 个部分应用的函数。我们把它们称为f(x),f(y)f(z). 如果我们在真正的数学函数的意义上扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>. 这xvariable 代表KStream<Long, Order>yvariable 代表GlobalKTable<Long, Customer>zvariable 代表GlobalKTable<Long, Product>. 第一个函数f(x)具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出为函数 f(y)。 函数f(y)具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>),它的输出是另一个函数f(z). 函数的输入f(z)是应用程序的第三个输入 (GlobalKTable<Long, Product>),其输出为KStream<Long, EnrichedOrder>这是应用程序的最终输出绑定。 来自三个偏函数的输入,它们是KStream,GlobalKTable,GlobalKTable分别在方法主体中可供您使用,用于将业务逻辑作为 Lambda 表达式的一部分实施。spring-doc.cadn.net.cn

输入绑定命名为enrichOrder-in-0,enrichOrder-in-1enrichOrder-in-2分别。输出绑定命名为enrichOrder-out-0.spring-doc.cadn.net.cn

使用柯里化函数,您几乎可以拥有任意数量的 Importing。但是,请记住,在 Java 中,除了较少数量的输入和部分应用的函数之外,任何其他内容都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过合理较少的输入绑定,并且您希望使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。spring-doc.cadn.net.cn

输出绑定

Kafka Streams Binder 允许KStreamKTable作为输出绑定。 在后台,Binder 使用tomethod 开启KStream将结果记录发送到 Output 主题。 如果应用程序提供了KTable作为函数中的输出,Binder 仍然使用这种技术,方法是将tomethod 的KStream.spring-doc.cadn.net.cn

例如,以下两个函数都可以工作:spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

多个输出绑定

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。 使用多个输出绑定时,您需要提供一个 KStream 数组 (KStream[]) 作为出站返回类型。spring-doc.cadn.net.cn

下面是一个示例:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但出站参数化类型为KStream[]. 默认输出绑定名称为process-out-0,process-out-1,process-out-2分别用于上述函数。 Binder 生成三个 output bindings 的原因是,它检测返回的KStream数组设置为 3。 请注意,在此示例中,我们提供了一个noDefaultBranch();如果我们使用了defaultBranch()相反,这将需要一个额外的输出绑定,本质上是返回一个KStream长度为 4 的数组。spring-doc.cadn.net.cn

Kafka Streams 的基于函数的编程样式摘要

总之,下表显示了可在函数范例中使用的各种选项。spring-doc.cadn.net.cn

输入数量 输出数量 要使用的组件

1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.Consumerspring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.BiConsumerspring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.Functionspring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.BiFunctionspring-doc.cadn.net.cn

>= 3spring-doc.cadn.net.cn

0..nspring-doc.cadn.net.cn

使用柯里化函数spring-doc.cadn.net.cn

Kafka Streams Binder 中的函数组合

Kafka Streams Binder 支持线性拓扑的最小形式的函数组合。 使用 Java 函数式 API 支持,您可以编写多个函数,然后使用andThen方法。 例如,假设您有以下两个函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使 Binder 中没有函数组合支持,您也可以按如下方式组合这两个函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然后,您可以提供表单的定义spring.cloud.function.definition=foo;bar;composed. 使用 Binder 中的函数组合支持,您无需编写第三个函数,在其中执行显式函数组合。spring-doc.cadn.net.cn

您可以简单地执行以下作:spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar

您甚至可以这样做:spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar;foo;bar

在此示例中,组合函数的默认绑定名称将变为foobar-in-0foobar-out-0.spring-doc.cadn.net.cn

Kafka Streams Bincer 中函数组合的限制

当您拥有java.util.function.Functionbean,它可以由另一个函数或多个函数组成。 同一个函数 Bean 可以用java.util.function.Consumer也。在这种情况下, consumer 是最后一个组合的组件。 一个函数可以由多个函数组成,然后以java.util.function.Consumer豆子。spring-doc.cadn.net.cn

当编写java.util.function.BiFunctionBiFunction必须是定义中的第一个函数。 组合实体必须是java.util.function.Functionjava.util.funciton.Consumer. 换句话说,您不能将BiFunctionBean 编写 Bean 的 Bean 文件,然后与另一个 Bean 组合BiFunction.spring-doc.cadn.net.cn

不能使用BiConsumer或定义,其中Consumer是第一个组件。 你也不能使用输出为数组 (KStream[]for branching),除非这是定义中的最后一个组件。spring-doc.cadn.net.cn

第一个FunctionBiFunction也可以在函数定义中使用柯里化形式。 例如,以下情况是可能的。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可以是curriedFoo|bar. 在后台,Binder 将为柯里化函数创建两个输入绑定,并根据定义中的最终函数创建一个输出绑定。 在这种情况下,默认输入绑定将是curriedFoobar-in-0curriedFoobar-in-1. 此示例的默认输出绑定变为curriedFoobar-out-0.spring-doc.cadn.net.cn

使用时特别说明KTable作为函数组合中的输出

假设您有以下两个函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

你可以将它们组合为foo|bar,但请记住,第二个函数 (bar在这种情况下)必须具有KTable作为输入,自第一个函数 (foo) 具有KTable作为输出。spring-doc.cadn.net.cn