对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
Apache Kafka Streams 支持
从版本 1.1.4 开始, Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要从 Spring 应用程序中使用它,kafka-streams
jar 必须存在于 Classpath 中。
它是 Spring for Apache Kafka 项目的可选依赖项,不能传递下载。
基本
参考 Apache Kafka Streams 文档建议了以下使用 API 的方法:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
所以,我们有两个主要组件:
-
StreamsBuilder
:使用 API 构建KStream
(或KTable
) 实例。 -
KafkaStreams
:管理这些实例的生命周期。
都KStream 实例暴露于KafkaStreams 实例StreamsBuilder 同时启动和停止,即使它们具有不同的逻辑。
换句话说,由StreamsBuilder 与单个生命周期控制相关联。
一旦KafkaStreams 实例已被streams.close() ,则无法重新启动。
相反,一个新的KafkaStreams 实例才能重新启动流处理。 |
Spring 管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean
.
这是一个AbstractFactoryBean
实现来公开StreamsBuilder
singleton 实例作为 bean。
下面的示例创建这样的 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration object 而不是StreamsConfig . |
这StreamsBuilderFactoryBean
还实现了SmartLifecycle
要管理内部KafkaStreams
实例。
与 Kafka Streams API 类似,您必须定义KStream
实例,然后再启动KafkaStreams
.
这也适用于 Spring API for Kafka Streams。
因此,当您使用 defaultautoStartup = true
在StreamsBuilderFactoryBean
,您必须声明KStream
实例StreamsBuilder
在刷新应用程序上下文之前。
例如KStream
可以是常规的 bean 定义,而使用 Kafka Streams API 不会产生任何影响。
以下示例显示了如何执行此作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果要手动控制生命周期(例如,按某个条件停止和启动),可以引用StreamsBuilderFactoryBean
bean 直接使用工厂 bean () 前缀。
因为&
StreamsBuilderFactoryBean
使用其内部KafkaStreams
实例,则可以安全地停止并再次重新启动它。
新的KafkaStreams
在每个start()
.
您也可以考虑使用不同的StreamsBuilderFactoryBean
实例(如果您想控制KStream
实例。
您还可以指定KafkaStreams.StateListener
,Thread.UncaughtExceptionHandler
和StateRestoreListener
选项StreamsBuilderFactoryBean
,这些 API 被委托给内部的KafkaStreams
实例。
此外,除了间接地打开这些选项之外StreamsBuilderFactoryBean
,从版本 2.1.5 开始,您可以使用KafkaStreamsCustomizer
回调接口配置内部KafkaStreams
实例。
请注意,KafkaStreamsCustomizer
覆盖StreamsBuilderFactoryBean
.
如果您需要执行一些KafkaStreams
作,您可以访问该内部KafkaStreams
实例StreamsBuilderFactoryBean.getKafkaStreams()
.
您可以自动装配StreamsBuilderFactoryBean
bean by type,但你应该确保在 bean 定义中使用 full 类型,如下例所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,您也可以添加@Qualifier
for injection by name(如果使用接口 bean 定义)。
以下示例显示了如何执行此作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 有一个新属性infrastructureCustomizer
带类型KafkaStreamsInfrastructureCustomizer
;这允许自定义StreamsBuilder
(例如,添加状态存储)和/或Topology
在创建流之前。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供默认的 no-op 实现,以避免在不需要一种方法时必须同时实现这两种方法。
一个CompositeKafkaStreamsInfrastructureCustomizer
,用于需要应用多个定制器的情况。
KafkaStreams Micrometer 支持
在 2.5.3 版本引入,您可以配置KafkaStreamsMicrometerListener
要自动注册千分尺,请使用KafkaStreams
由工厂 Bean 管理的对象:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时对数据进行序列化和反序列化, Spring for Apache Kafka 提供了一个JsonSerde
实现,委托给JsonSerializer
和JsonDeserializer
在序列化、反序列化和消息转换中进行了介绍。
这JsonSerde
implementation 通过其构造函数(Target Type 或ObjectMapper
).
在以下示例中,我们使用JsonSerde
要序列化和反序列化Cat
payload 的 GitHub 的JsonSerde
可以在需要实例的地方以类似的方式使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
用KafkaStreamBrancher
这KafkaStreamBrancher
类引入了一种更方便的方式来在KStream
.
请考虑以下示例,该示例不使用KafkaStreamBrancher
:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用KafkaStreamBrancher
:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,请使用StreamsBuilderFactoryBean
需要KafkaStreamsConfiguration
实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration object,而不是作为StreamsConfig . |
为了在大多数情况下避免使用样板代码,尤其是在开发微服务时, Spring for Apache Kafka 提供了@EnableKafkaStreams
注解,您应该将其放置在@Configuration
类。
您只需声明一个KafkaStreamsConfiguration
名为defaultKafkaStreamsConfig
.
一个StreamsBuilderFactoryBean
Bean 中名为defaultKafkaStreamsBuilder
)会自动在 application 上下文中声明。
您可以声明并使用任何额外的StreamsBuilderFactoryBean
Beans也是如此。
您可以通过提供实现StreamsBuilderFactoryBeanConfigurer
.
如果有多个这样的 bean,它们将根据其Ordered.order
财产。
默认情况下,当工厂 Bean 停止时,KafkaStreams.cleanUp()
方法。
从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用CleanupConfig
对象,该对象具有允许您控制cleanUp()
method 在start()
或stop()
或者两者都不是。
从版本 2.7 开始,默认值是永不清理本地状态。
标头扩充器
版本 3.0 添加了HeaderEnricherProcessor
的扩展ContextualProcessor
;提供与已弃用的HeaderEnricher
它实现了已弃用的Transformer
接口。
这可用于在流处理中添加标头;标头值是 SPEL 表达式;表达式求值的根对象有 3 个属性:
-
record
-这org.apache.kafka.streams.processor.api.Record
(key
,value
,timestamp
,headers
) -
key
- 当前记录的 Key -
value
- 当前记录的值 -
context
-这ProcessorContext
,允许访问当前记录元数据
表达式必须返回byte[]
或String
(将转换为byte[]
用UTF-8
).
要在流中使用扩充器:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改key
或value
;它只是添加标题。
每条记录都需要一个新实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
下面是一个简单的示例,添加了一个 Literal 标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
版本 3.0 添加了MessagingProcessor
的扩展ContextualProcessor
,提供与已弃用的MessagingTransformer
它实现了已弃用的Transformer
接口。
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。
transformer 需要MessagingFunction
.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring 集成使用其GatewayProxyFactoryBean
.
它还需要一个MessagingMessageConverter
将键、值和元数据(包括标头)与 Spring 消息传递相互转换Message<?>
.
看[从KStream
] 了解更多信息。
从反序列化异常中恢复
版本 2.3 引入了RecoveringDeserializationExceptionHandler
当发生反序列化异常时,它可以采取一些作。
请参阅 Kafka 文档DeserializationExceptionHandler
,其中RecoveringDeserializationExceptionHandler
是一种实现。
这RecoveringDeserializationExceptionHandler
配置了ConsumerRecordRecoverer
实现。
该框架提供了DeadLetterPublishingRecoverer
这会将失败的记录发送到死信主题。
有关此恢复程序的更多信息,请参阅 Publishing Dead-letter Records。
要配置恢复器,请将以下属性添加到您的 streams 配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()
bean 可以是你自己的ConsumerRecordRecoverer
.
Kafka Streams 示例
以下示例结合了我们在本章中介绍的所有主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}