此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

Kafka Streams 应用程序中基于事件类型的路由

Kafka Streams Binders 不支持基于常规消息通道的 Binders 中可用的路由函数。 但是,Kafka Streams Binder 仍然通过入站记录上的事件类型记录标头提供路由功能。spring-doc.cadn.net.cn

要启用基于事件类型的路由,应用程序必须提供以下属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.spring-doc.cadn.net.cn

这可以是逗号分隔的值。spring-doc.cadn.net.cn

例如,假设我们有这个函数:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我们还假设,如果传入记录的事件类型为foobar. 这可以用eventTypes属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,barspring-doc.cadn.net.cn

现在,当应用程序运行时,Binder 会检查标头的每个传入记录event_type并查看它是否将 value 设置为foobar. 如果找不到其中任何一个,则将跳过函数执行。spring-doc.cadn.net.cn

默认情况下,Binder 期望记录标头键为event_type,但可以按绑定更改。 例如,如果我们想将此绑定上的标头键更改为my_event而不是默认值,可以按如下所示进行更改。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.spring-doc.cadn.net.cn

在 Kafkfa Streams Binder 中使用事件路由功能时,它使用字节数组Serde反序列化所有传入的记录。 如果记录标头与事件类型匹配,则只有它使用实际的Serde使用已配置的或推断的Serde. 如果您在绑定上设置反序列化异常处理程序,这会引入问题,因为预期的反序列化仅发生在堆栈中,从而导致意外错误。 为了解决此问题,您可以在绑定上设置以下属性,以强制 Binder 使用 configured 或 inferredSerde而不是字节数组Serde.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEventsspring-doc.cadn.net.cn

这样,应用程序可以在使用事件路由功能时立即检测到反序列化问题,并可以做出适当的处理决策。spring-doc.cadn.net.cn