对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
Apache Kafka Streams 支持
从版本 1.1.4 开始, Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要从 Spring 应用程序中使用它,kafka-streams
jar 必须存在于 Classpath 中。
它是 Spring for Apache Kafka 项目的可选依赖项,不能传递下载。
基本
参考 Apache Kafka Streams 文档建议了以下使用 API 的方法:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
所以,我们有两个主要组件:
-
StreamsBuilder
:使用 API 构建KStream
(或KTable
) 实例。 -
KafkaStreams
:管理这些实例的生命周期。
都KStream 实例暴露于KafkaStreams 实例StreamsBuilder 同时启动和停止,即使它们具有不同的逻辑。
换句话说,由StreamsBuilder 与单个生命周期控制相关联。
一旦KafkaStreams 实例已被streams.close() ,则无法重新启动。
相反,一个新的KafkaStreams 实例才能重新启动流处理。 |
Spring 管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean
.
这是一个AbstractFactoryBean
实现来公开StreamsBuilder
singleton 实例作为 bean。
下面的示例创建这样的 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration object 而不是StreamsConfig . |
这StreamsBuilderFactoryBean
还实现了SmartLifecycle
要管理内部KafkaStreams
实例。
与 Kafka Streams API 类似,您必须定义KStream
实例,然后再启动KafkaStreams
.
这也适用于 Spring API for Kafka Streams。
因此,当您使用 defaultautoStartup = true
在StreamsBuilderFactoryBean
,您必须声明KStream
实例StreamsBuilder
在刷新应用程序上下文之前。
例如KStream
可以是常规的 bean 定义,而使用 Kafka Streams API 不会产生任何影响。
以下示例显示了如何执行此作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果要手动控制生命周期(例如,按某个条件停止和启动),可以引用StreamsBuilderFactoryBean
bean 直接使用工厂 bean () 前缀。
因为&
StreamsBuilderFactoryBean
使用其内部KafkaStreams
实例,则可以安全地停止并再次重新启动它。
新的KafkaStreams
在每个start()
.
您也可以考虑使用不同的StreamsBuilderFactoryBean
实例(如果您想控制KStream
实例。
您还可以指定KafkaStreams.StateListener
,Thread.UncaughtExceptionHandler
和StateRestoreListener
选项StreamsBuilderFactoryBean
,这些 API 被委托给内部的KafkaStreams
实例。
此外,除了间接地打开这些选项之外StreamsBuilderFactoryBean
,从版本 2.1.5 开始,您可以使用KafkaStreamsCustomizer
回调接口配置内部KafkaStreams
实例。
请注意,KafkaStreamsCustomizer
覆盖StreamsBuilderFactoryBean
.
如果您需要执行一些KafkaStreams
作,您可以访问该内部KafkaStreams
实例StreamsBuilderFactoryBean.getKafkaStreams()
.
您可以自动装配StreamsBuilderFactoryBean
bean by type,但你应该确保在 bean 定义中使用 full 类型,如下例所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,您也可以添加@Qualifier
for injection by name(如果使用接口 bean 定义)。
以下示例显示了如何执行此作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 有一个新属性infrastructureCustomizer
带类型KafkaStreamsInfrastructureCustomizer
;这允许自定义StreamsBuilder
(例如,添加状态存储)和/或Topology
在创建流之前。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供默认的 no-op 实现,以避免在不需要一种方法时必须同时实现这两种方法。
一个CompositeKafkaStreamsInfrastructureCustomizer
,用于需要应用多个定制器的情况。
KafkaStreams Micrometer 支持
在 2.5.3 版本引入,您可以配置KafkaStreamsMicrometerListener
要自动注册千分尺,请使用KafkaStreams
由工厂 Bean 管理的对象:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时对数据进行序列化和反序列化, Spring for Apache Kafka 提供了一个JsonSerde
实现,委托给JsonSerializer
和JsonDeserializer
在序列化、反序列化和消息转换中进行了介绍。
这JsonSerde
implementation 通过其构造函数(Target Type 或ObjectMapper
).
在以下示例中,我们使用JsonSerde
要序列化和反序列化Cat
payload 的 GitHub 的JsonSerde
可以在需要实例的地方以类似的方式使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
用KafkaStreamBrancher
这KafkaStreamBrancher
类引入了一种更方便的方式来在KStream
.
请考虑以下示例,该示例不使用KafkaStreamBrancher
:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用KafkaStreamBrancher
:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,请使用StreamsBuilderFactoryBean
需要KafkaStreamsConfiguration
实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration object,而不是作为StreamsConfig . |
为了在大多数情况下避免使用样板代码,尤其是在开发微服务时, Spring for Apache Kafka 提供了@EnableKafkaStreams
注解,您应该将其放置在@Configuration
类。
您只需声明一个KafkaStreamsConfiguration
名为defaultKafkaStreamsConfig
.
一个StreamsBuilderFactoryBean
Bean 中名为defaultKafkaStreamsBuilder
)会自动在 application 上下文中声明。
您可以声明并使用任何额外的StreamsBuilderFactoryBean
Beans也是如此。
您可以通过提供实现StreamsBuilderFactoryBeanConfigurer
.
如果有多个这样的 bean,它们将根据其Ordered.order
财产。
清理和停止配置
当工厂停止时,KafkaStreams.close()
使用 2 个参数调用 :
-
closeTimeout :等待线程关闭的时间(默认为
DEFAULT_CLOSE_TIMEOUT
设置为 10 秒)。可使用StreamsBuilderFactoryBean.setCloseTimeout()
. -
leaveGroupOnClose :触发来自群组的消费者离开调用(默认为
false
).可使用StreamsBuilderFactoryBean.setLeaveGroupOnClose()
.
默认情况下,当工厂 Bean 停止时,KafkaStreams.cleanUp()
方法。
从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用CleanupConfig
对象,该对象具有允许您控制cleanUp()
method 在start()
或stop()
或者两者都不是。
从版本 2.7 开始,默认值是永不清理本地状态。
标头扩充器
版本 3.0 添加了HeaderEnricherProcessor
的扩展ContextualProcessor
;提供与已弃用的HeaderEnricher
它实现了已弃用的Transformer
接口。
这可用于在流处理中添加标头;标头值是 SPEL 表达式;表达式求值的根对象有 3 个属性:
-
record
-这org.apache.kafka.streams.processor.api.Record
(key
,value
,timestamp
,headers
) -
key
- 当前记录的 Key -
value
- 当前记录的值 -
context
-这ProcessorContext
,允许访问当前记录元数据
表达式必须返回byte[]
或String
(将转换为byte[]
用UTF-8
).
要在流中使用扩充器:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改key
或value
;它只是添加标题。
每条记录都需要一个新实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
下面是一个简单的示例,添加了一个 Literal 标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
版本 3.0 添加了MessagingProcessor
的扩展ContextualProcessor
,提供与已弃用的MessagingTransformer
它实现了已弃用的Transformer
接口。
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。
transformer 需要MessagingFunction
.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring 集成使用其GatewayProxyFactoryBean
.
它还需要一个MessagingMessageConverter
将键、值和元数据(包括标头)与 Spring 消息传递相互转换Message<?>
.
看[从KStream
] 了解更多信息。
从反序列化异常中恢复
版本 2.3 引入了RecoveringDeserializationExceptionHandler
当发生反序列化异常时,它可以采取一些作。
请参阅 Kafka 文档DeserializationExceptionHandler
,其中RecoveringDeserializationExceptionHandler
是一种实现。
这RecoveringDeserializationExceptionHandler
配置了ConsumerRecordRecoverer
实现。
该框架提供了DeadLetterPublishingRecoverer
这会将失败的记录发送到死信主题。
有关此恢复程序的更多信息,请参阅 Publishing Dead-letter Records。
要配置恢复器,请将以下属性添加到您的 streams 配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()
bean 可以是你自己的ConsumerRecordRecoverer
.
交互式查询支持
从版本 3.2 开始, Spring for Apache Kafka 提供了在 Kafka Streams 中进行交互式查询所需的基本工具。
交互式查询在有状态 Kafka Streams 应用程序中非常有用,因为它们提供了一种持续查询应用程序中有状态存储的方法。
因此,如果应用程序想要实现所考虑的系统的当前视图,交互式查询提供了一种实现此目的的方法。
要了解有关交互式查询的更多信息,请参阅此文。
Spring 中对 Apache Kafka 的支持以一个名为KafkaStreamsInteractiveQueryService
这是 Kafka Streams 库中交互式查询 API 的门面。
应用程序可以将此服务的实例创建为 bean,然后使用它来按其名称检索状态存储。
以下代码片段显示了一个示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假设 Kafka Streams 应用程序具有一个名为app-store
,则可以通过KafkStreamsInteractiveQuery
API 如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦应用程序获得对状态存储的访问权限,它就可以从中查询键值信息。
在这种情况下,应用程序使用的状态存储是只读键值存储。
Kafka Streams 应用程序还可以使用其他类型的状态存储。
例如,如果应用程序更喜欢查询基于窗口的存储,它可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后检索它。
因此,用于检索KafkaStreamsInteractiveQueryService
具有泛型存储类型签名,以便最终用户可以分配适当的类型。
下面是来自 API 的类型签名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
调用此方法时,用户可以指定请求正确的 state store 类型,就像我们在上面的例子中所做的那样。
重试状态存储检索
尝试使用KafkaStreamsInteractiveQueryService
,则可能会因各种原因找不到 state store。
如果这些原因是暂时的,KafkaStreamsInteractiveQueryService
提供了一个选项,通过允许注入自定义RetryTemplate
.
默认情况下,RetryTemmplate
用于KafkaStreamsInteractiveQueryService
使用最多 3 次尝试,固定回退为 1 秒。
以下是注入自定义RetryTemmplate
到KafkaStreamsInteractiveQueryService
最大尝试次数为 10 次。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查询远程状态存储
上面显示的用于检索状态存储的 API -retrieveQueryableStore
适用于本地可用的键值 state 存储。
在生产设置中,Kafka Streams 应用程序很可能根据分区数量进行分布。
如果一个主题有四个分区,并且有四个相同的 Kafka Streams 处理器实例正在运行,则每个实例可能负责处理该主题的单个分区。
在这种情况下,调用retrieveQueryableStore
可能无法给出实例正在查找的正确结果,尽管它可能会返回有效的存储。
假设具有四个分区的主题包含有关各种键的数据,并且单个分区始终负责特定键。
如果调用retrieveQueryableStore
正在查找有关此实例不托管的密钥的信息,则它不会收到任何数据。
这是因为当前的 Kafka Streams 实例对此密钥一无所知。
要解决此问题,调用实例首先需要确保它们具有托管特定密钥的 Kafka Streams 处理器实例的主机信息。
这可以从同一application.id
如下。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例代码中,调用实例正在查询特定键12345
从名为app-store
.
API 还需要一个相应的密钥序列化器,在本例中为IntegerSerializer
.
Kafka Streams 会查看同一application.id
并尝试查找托管此特定键的实例,
找到后,它会将该主机信息作为HostInfo
对象。
API 如下所示:
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
当使用同一application.id
在这样的分布式方式中,应用程序应该提供一个 RPC 层,其中状态存储可以通过 RPC 端点(如 REST 端点)进行查询。
有关此内容的更多详细信息,请参阅此文章。
使用 Spring for Apache Kafka 时,使用 spring-web 技术添加基于 Spring 的 REST 端点非常容易。
一旦有了 REST 端点,就可以使用它从任何 Kafka Streams 实例查询状态存储,前提是HostInfo
实例知道密钥的托管位置。
如果托管实例的密钥是当前实例,则应用程序不需要调用 RPC 机制,而是进行 JVM 内调用。
但是,问题在于应用程序可能不知道进行调用的实例是托管密钥的位置,因为特定服务器可能会因使用者重新平衡而丢失分区。
要解决此问题,请执行以下作KafkaStreamsInteractiveQueryService
提供便捷的 API,通过 API 方法查询当前主机信息getCurrentKafkaStreamsApplicationHostInfo()
返回当前的HostInfo
.
这个想法是,应用程序可以首先获取有关密钥保存位置的信息,然后比较HostInfo
替换为 one about the current instance。
如果HostInfo
data 匹配项,然后它可以通过retrieveQueryableStore
,否则请选择 RPC 选项。
Kafka Streams 示例
以下示例结合了我们在本章中介绍的各种主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}