此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

StreamsBuilderFactoryBean 配置器

通常需要自定义StreamsBuilderFactoryBean这会创建KafkaStreams对象。 基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean. 您可以使用StreamsBuilderFactoryBeanConfigurer要自定义StreamsBuilderFactoryBean本身。 然后,一旦您可以访问StreamsBuilderFactoryBean通过这个 Configurer,你可以自定义相应的KafkaStreamsKafkaStreamsCustomzier. 这两个定制器都是 Spring for Apache Kafka 项目的一部分。spring-doc.cadn.net.cn

以下是使用StreamsBuilderFactoryBeanConfigurer.spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

以上显示了您可以执行的作来自定义StreamsBuilderFactoryBean. 您基本上可以从StreamsBuilderFactoryBean进行自定义。 此定制器将在工厂 Bean 启动之前由 Binders 调用。spring-doc.cadn.net.cn

访问StreamsBuilderFactoryBean,您还可以自定义底层KafkaStreams对象。 这是执行此作的蓝图。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer将由StreamsBuilderFactoryBeabn就在底层KafkaStreams开始。spring-doc.cadn.net.cn

只能有一个StreamsBuilderFactoryBeanConfigurer在整个应用程序中。 那么我们如何考虑多个 Kafka Streams 处理器,因为每个处理器都由单独的 Kafka Streams 处理器备份StreamsBuilderFactoryBean对象? 在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些过滤器。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储

如上所述,Binders 没有提供将全局状态存储注册为功能的第一类方法。 为此,您需要通过StreamsBuilderFactoryBeanConfigurer. 这是如何做到这一点的。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

任何自定义StreamsBuilder必须通过KafkaStreamsInfrastructureCustomizer如上所示。 如果StreamsBuilderFactoryBean#getObject()以获取对StreamsBuilderobject,它可能无法工作,因为 bean 可能在初始化中,因此会遇到一些循环依赖问题。spring-doc.cadn.net.cn

如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder通过过滤掉其他StreamsBuilderFactoryBean使用上述应用程序 ID 的对象。spring-doc.cadn.net.cn

使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理程序

在错误处理部分中,我们指出了 Binder 没有提供处理生产异常的第一类方法。 虽然是这种情况,您仍然可以使用StreamsBuilderFacotryBeancustomizer 来注册生产异常处理程序。见下文。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

同样,如果您有多个处理器,则可能需要根据正确的StreamsBuilderFactoryBean. 您还可以使用 configuration 属性添加此类 production 异常处理程序(有关更多信息,请参见下文),但如果您选择使用编程方法,则这是一个选项。spring-doc.cadn.net.cn