对于最新的稳定版本,请使用 spring-cloud-stream 4.2.0spring-doc.cadn.net.cn

记录序列化和反序列化

Kafka Streams Binder 允许您以两种方式序列化和反序列化记录。 一个是 Kafka 提供的原生序列化和反序列化工具,另一个是 Spring Cloud Stream 框架的消息转换能力。 让我们看看一些细节。spring-doc.cadn.net.cn

入站反序列化

键总是使用本机 Serdes 反序列化。spring-doc.cadn.net.cn

对于值,默认情况下,入站上的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams Binder 默认行为的重大更改,其中反序列化是由框架完成的。spring-doc.cadn.net.cn

Kafka Streams Binder 将尝试推断匹配Serde类型,方法是查看java.util.function.Function|Consumer. 这是它与 Serdes 匹配的顺序。spring-doc.cadn.net.cn

  • 如果应用程序提供Serde如果返回类型使用传入 key 或 value 类型的实际类型进行参数化,则它将使用它Serde用于入站反序列化。 例如,如果您在应用程序中有以下内容,则 Binder 会检测到KStream与在Serde豆。 它将使用它进行入站反序列化。spring-doc.cadn.net.cn

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下来,它查看类型,并查看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是 Binder 将尝试从 Kafka Streams 匹配的 Serde 类型。spring-doc.cadn.net.cn

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,Binder 假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这非常有用,因为 Binder 会在内部推断它们以更正 Java 类型。 在回退到JsonSerde不过,Binder 会默认检查Serde设置,以查看它是否为Serde它可以与传入的 KStream 类型匹配。spring-doc.cadn.net.cn

如果上述策略均无效,则应用程序必须提供Serdes 通过配置。 这可以通过两种方式进行配置 - binding 或 default。spring-doc.cadn.net.cn

首先,Binder 将查看Serde在绑定级别提供。 例如,如果您有以下处理器,spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然后,您可以提供 Binding 级别Serde使用以下内容:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您提供Serde如上所述,每个 input binding,这将具有更高的优先级,并且 Binder 将远离任何Serde推理。

如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此作。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。 由于原生解码是默认的,为了让 Spring Cloud Stream 反序列化入站值对象,你需要显式禁用原生解码。spring-doc.cadn.net.cn

例如,如果你有与上面相同的 BiFunction 处理器,那么spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false您需要单独禁用所有 inputs 的本机解码。否则,本机解码仍将应用于您未禁用的那些。spring-doc.cadn.net.cn

默认情况下,Spring Cloud Stream 将使用application/json作为内容类型,并使用适当的 JSON 消息转换器。 您可以通过使用以下属性和适当的MessageConverter豆。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.contentType

出站序列化

出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站上的序列化由 Kafka 本地处理。 在 Binder 的 3.0 版本之前,这是由框架本身完成的。spring-doc.cadn.net.cn

出站的键始终由 Kafka 使用匹配的Serde这是由 Binder 推断的。 如果它无法推断出键的类型,则需要使用 configuration 指定。spring-doc.cadn.net.cn

值 serdes 是使用用于入站反序列化的相同规则推断的。 首先,它进行匹配以查看出站类型是否来自应用程序中提供的 bean。 如果不匹配,它会检查它是否与Serde被 Kafka 公开,例如 -Integer,Long,Short,Double,Float,byte[],UUIDString. 如果这不起作用,那么它会回退到JsonSerde由 Spring Kafka 项目提供,但首先看一下默认的Serde配置以查看是否存在匹配项。 请记住,所有这些都对应用程序是透明的。 如果这些都不起作用,则用户必须提供Serde以按配置使用。spring-doc.cadn.net.cn

假设您正在使用相同的BiFunction处理器。然后,您可以按如下方式配置出站键/值 Serdes。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推理失败,并且没有提供绑定级别的 Serdes,则 Binder 将回退到JsonSerde,但请查看匹配项的默认 Serdes。spring-doc.cadn.net.cn

默认 serdes 的配置方式与上面相同,在 deserialization 下描述。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serdespring-doc.cadn.net.cn

如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。 同样,如果 Binder 能够推断出Serdetypes 中,则无需进行此配置。spring-doc.cadn.net.cn

如果您不需要 Kafka 提供的原生编码,但想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认的。 例如,如果你有与上面相同的 BiFunction 处理器,那么spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false在分支的情况下,您需要单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的那些。spring-doc.cadn.net.cn

当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json作为内容类型,并使用适当的 JSON 消息转换器。 您可以通过以下属性和相应的MessageConverter豆。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-out-0.contentType

当禁用本机编码/解码时, binder 不会像本机 Serdes 那样进行任何推理。 应用程序需要显式提供所有配置选项。 因此,在编写 Spring Cloud Stream Kafka Streams 应用程序时,通常建议保留默认的反序列化/序列化选项,并坚持使用 Kafka Streams 提供的原生反/序列化。 您必须使用框架提供的消息转换功能的一种情况是,当上游生产者使用特定的序列化策略时。 在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。 当依赖默认的Serde机制中,应用程序必须确保 Binder 有办法使用适当的Serde,否则可能会失败。spring-doc.cadn.net.cn

值得一提的是,上面概述的数据取消/序列化方法仅适用于处理器的边缘,即 - 入站和出站。 您的业务逻辑可能仍需要调用明确需要的 Kafka Streams APISerde对象。 这些仍然是应用程序的责任,必须由开发人员进行相应的处理。spring-doc.cadn.net.cn