对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
从版本 1.1.4 开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要在 Spring 应用程序中使用它,jar 必须存在于类路径中。
它是 Spring for Apache Kafka 项目的可选依赖项,不会以可传递方式下载。kafka-streams
基本
参考 Apache Kafka Streams 文档建议了以下 API 使用方式:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我们有两个主要组成部分:
-
StreamsBuilder
:使用用于构建(或)实例的 API。KStream
KTable
-
KafkaStreams
:管理这些实例的生命周期。
单个实例向实例公开的所有实例都会同时启动和停止,即使它们具有不同的逻辑。
换言之,由 a 定义的所有流都与单个生命周期控件相关联。
实例一旦被关闭,就无法重新启动。
相反,必须创建一个新实例来重新启动流处理。KStream KafkaStreams StreamsBuilder StreamsBuilder KafkaStreams streams.close() KafkaStreams |
单个实例向实例公开的所有实例都会同时启动和停止,即使它们具有不同的逻辑。
换言之,由 a 定义的所有流都与单个生命周期控件相关联。
实例一旦被关闭,就无法重新启动。
相反,必须创建一个新实例来重新启动流处理。KStream KafkaStreams StreamsBuilder StreamsBuilder KafkaStreams streams.close() KafkaStreams |
弹簧管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 .
这是一个将单例实例公开为 Bean 的实现。
下面的示例创建这样的 Bean:StreamsBuilderFactoryBean
AbstractFactoryBean
StreamsBuilder
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在以对象的形式提供,而不是 .KafkaStreamsConfiguration StreamsConfig |
还实现了管理内部实例的生命周期。
与 Kafka Streams API 类似,您必须在启动 .
这也适用于 Spring API for Kafka Streams。
因此,在 上使用 default 时,必须在刷新应用程序上下文之前声明 上的实例。
例如,可以是常规的 Bean 定义,而使用 Kafka Streams API 时不会产生任何影响。
以下示例演示如何执行此操作:StreamsBuilderFactoryBean
SmartLifecycle
KafkaStreams
KStream
KafkaStreams
autoStartup = true
StreamsBuilderFactoryBean
KStream
StreamsBuilder
KStream
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果要手动控制生命周期(例如,在某个条件下停止和启动),可以使用工厂 bean () 前缀直接引用 Bean。
由于使用其内部实例,因此可以安全地停止并重新启动它。
在每个 .
如果要单独控制实例的生命周期,还可以考虑使用不同的实例。StreamsBuilderFactoryBean
&
StreamsBuilderFactoryBean
KafkaStreams
KafkaStreams
start()
StreamsBuilderFactoryBean
KStream
您还可以在 上指定 、 和 选项,这些选项将委托给内部实例。
此外,从版本 2.1.5 开始,除了间接设置这些选项外,还可以使用回调接口来配置内部实例。
请注意,这将覆盖 提供的选项。
如果需要直接执行某些操作,可以使用 访问该内部实例。
您可以按类型自动连接 Bean,但应确保在 Bean 定义中使用完整类型,如以下示例所示:KafkaStreams.StateListener
Thread.UncaughtExceptionHandler
StateRestoreListener
StreamsBuilderFactoryBean
KafkaStreams
StreamsBuilderFactoryBean
KafkaStreamsCustomizer
KafkaStreams
KafkaStreamsCustomizer
StreamsBuilderFactoryBean
KafkaStreams
KafkaStreams
StreamsBuilderFactoryBean.getKafkaStreams()
StreamsBuilderFactoryBean
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果使用接口 Bean 定义,则可以按名称添加注入。
以下示例演示如何执行此操作:@Qualifier
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从 2.4.1 版开始,工厂 Bean 具有 type 的新属性;这允许在创建流之前自定义(例如添加状态存储)和/或 。infrastructureCustomizer
KafkaStreamsInfrastructureCustomizer
StreamsBuilder
Topology
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供了默认的无操作实现,以避免在不需要时必须实现这两种方法。
当您需要应用多个定制器时,将提供 A。CompositeKafkaStreamsInfrastructureCustomizer
从版本 2.2 开始,流配置现在以对象的形式提供,而不是 .KafkaStreamsConfiguration StreamsConfig |
KafkaStreams 千分尺支持
在 V2.5.3 中引入,您可以配置 a 以自动注册由工厂 Bean 管理的对象的千分尺:KafkaStreamsMicrometerListener
KafkaStreams
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
流 JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了一个使用 JSON 的实现,委托给序列化、反序列化和消息转换中所述的 和。
该实现通过其构造函数(目标类型或 )提供相同的配置选项。
在以下示例中,我们使用 to 序列化和反序列化 Kafka 流的有效负载(只要需要实例,就可以以类似的方式使用 ):JsonSerde
JsonSerializer
JsonDeserializer
JsonSerde
ObjectMapper
JsonSerde
Cat
JsonSerde
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/使用者工厂中使用时,可以使用 Fluent API,从而简化配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
用KafkaStreamBrancher
该类引入了一种更方便的方法,用于在 之上构建条件分支。KafkaStreamBrancher
KStream
请考虑以下不使用 :KafkaStreamBrancher
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用:KafkaStreamBrancher
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,需要一个实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。StreamsBuilderFactoryBean
KafkaStreamsConfiguration
从版本 2.2 开始,流配置现在以对象的形式提供,而不是以 .KafkaStreamsConfiguration StreamsConfig |
为了避免在大多数情况下使用样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了注释,您应该将其放在类上。
您只需要声明一个名为 的 Bean 即可。
在应用程序上下文中自动声明名为 的 Bean。
您也可以声明和使用任何其他 bean。
您可以通过提供实现 的 Bean 来执行该 Bean 的其他定制。
如果有多个这样的豆子,它们将根据它们的属性进行应用。@EnableKafkaStreams
@Configuration
KafkaStreamsConfiguration
defaultKafkaStreamsConfig
StreamsBuilderFactoryBean
defaultKafkaStreamsBuilder
StreamsBuilderFactoryBean
StreamsBuilderFactoryBeanConfigurer
Ordered.order
缺省情况下,当工厂 Bean 停止时,将调用该方法。
从 V2.1.2 开始,工厂 Bean 具有其他构造函数,采用具有属性的对象来控制方法是在期间调用还是两者都不调用。
从版本 2.7 开始,默认设置是从不清理本地状态。KafkaStreams.cleanUp()
CleanupConfig
cleanUp()
start()
stop()
从版本 2.2 开始,流配置现在以对象的形式提供,而不是以 .KafkaStreamsConfiguration StreamsConfig |
标头 Enricher
版本 3.0 添加了 ;提供与实现已弃用接口的已弃用接口相同的功能。
这可用于在流处理中添加标头;标头值是 SpEL 表达式;表达式计算的根对象具有 3 个属性:HeaderEnricherProcessor
ContextualProcessor
HeaderEnricher
Transformer
-
record
- (、、、org.apache.kafka.streams.processor.api.Record
key
value
timestamp
headers
) -
key
- 当前记录的键 -
value
- 当前记录的值 -
context
- 允许访问当前记录元数据ProcessorContext
表达式必须返回 a 或 a(将转换为 using )。byte[]
String
byte[]
UTF-8
要在流中使用 enricher,请执行以下操作:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改 或 ;它只是添加标题。key
value
每条记录都需要一个新实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
下面是一个简单的示例,添加一个文本标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
每条记录都需要一个新实例。 |
MessagingProcessor
版本 3.0 添加了 的扩展,提供与实现已弃用接口的已弃用接口相同的功能。
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(如 Spring Integration 流)进行交互。
变压器需要实现 。MessagingProcessor
ContextualProcessor
MessagingTransformer
Transformer
MessagingFunction
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其 .
它还需要将键、值和元数据(包括标头)转换为Spring Messaging/从Spring Messaging转换。
有关更多信息,请参见 [从 KStream
调用 Spring 集成流]。GatewayProxyFactoryBean
MessagingMessageConverter
Message<?>
从反序列化异常中恢复
版本 2.3 引入了在发生反序列化异常时可以执行某些操作的功能。
请参阅 的 Kafka 文档,其中 是实现。
配置了实现。
该框架提供了将失败的记录发送到死信主题的 WHICH 。
有关此恢复器的详细信息,请参阅发布死信记录。RecoveringDeserializationExceptionHandler
DeserializationExceptionHandler
RecoveringDeserializationExceptionHandler
RecoveringDeserializationExceptionHandler
ConsumerRecordRecoverer
DeadLetterPublishingRecoverer
要配置恢复器,请将以下属性添加到流配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,bean 也可以是你自己的实现。recoverer()
ConsumerRecordRecoverer
Kafka 流示例
以下示例结合了本章中介绍的所有主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}