此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
StreamsBuilderFactoryBean 配置器
通常需要自定义StreamsBuilderFactoryBean
这会创建KafkaStreams
对象。
基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean
.
您可以使用StreamsBuilderFactoryBeanConfigurer
要自定义StreamsBuilderFactoryBean
本身。
然后,一旦您可以访问StreamsBuilderFactoryBean
通过这个 Configurer,你可以自定义相应的KafkaStreams
用KafkaStreamsCustomzier
.
这两个定制器都是 Spring for Apache Kafka 项目的一部分。
以下是使用StreamsBuilderFactoryBeanConfigurer
.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上显示了您可以执行的作来自定义StreamsBuilderFactoryBean
.
您基本上可以从StreamsBuilderFactoryBean
进行自定义。
此定制器将在工厂 Bean 启动之前由 Binders 调用。
访问StreamsBuilderFactoryBean
,您还可以自定义底层KafkaStreams
对象。
这是执行此作的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将由StreamsBuilderFactoryBeabn
就在底层KafkaStreams
开始。
只能有一个StreamsBuilderFactoryBeanConfigurer
在整个应用程序中。
那么我们如何考虑多个 Kafka Streams 处理器,因为每个处理器都由单独的 Kafka Streams 处理器备份StreamsBuilderFactoryBean
对象?
在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储
如上所述,Binders 没有提供将全局状态存储注册为功能的第一类方法。
为此,您需要通过StreamsBuilderFactoryBeanConfigurer
.
这是如何做到这一点的。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
任何自定义StreamsBuilder
必须通过KafkaStreamsInfrastructureCustomizer
如上所示。
如果StreamsBuilderFactoryBean#getObject()
以获取对StreamsBuilder
object,它可能无法工作,因为 bean 可能在初始化中,因此会遇到一些循环依赖问题。
如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder
通过过滤掉其他StreamsBuilderFactoryBean
使用上述应用程序 ID 的对象。
使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理程序
在错误处理部分中,我们指出了 Binder 没有提供处理生产异常的第一类方法。
虽然是这种情况,您仍然可以使用StreamsBuilderFacotryBean
customizer 来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同样,如果您有多个处理器,则可能需要根据正确的StreamsBuilderFactoryBean
.
您还可以使用 configuration 属性添加此类 production 异常处理程序(有关更多信息,请参见下文),但如果您选择使用编程方法,则这是一个选项。