此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

重试关键业务逻辑

在某些情况下,您可能希望重试对应用程序至关重要的业务逻辑部分。 可能存在对关系数据库的外部调用或从 Kafka Streams 处理器调用 REST 终端节点。 这些调用可能会因各种原因而失败,例如网络问题或远程服务不可用。 更常见的是,如果您可以再次尝试,这些失败可能会自行解决。 默认情况下,Kafka Streams Binder 会创建RetryTemplatebean 的所有输入绑定。spring-doc.cadn.net.cn

如果函数具有以下签名,spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

使用默认绑定名称,RetryTemplate将注册为process-in-0-RetryTemplate. 这遵循了绑定名称 (process-in-0) 后跟文本-RetryTemplate. 在多个 input bindings 的情况下,将有一个单独的RetryTemplate每个绑定可用的 bean。 如果有自定义RetryTemplateBean 在应用程序中可用,并通过spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName,则它优先于任何输入绑定级别重试模板配置属性。spring-doc.cadn.net.cn

一旦RetryTemplate从绑定到应用程序中,它可用于重试应用程序的任何关键部分。 下面是一个示例:spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

或者,您可以使用自定义RetryTemplate如下。spring-doc.cadn.net.cn

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

请注意,当重试次数用尽时,默认情况下,将引发最后一个异常,从而导致处理器终止。 如果您希望处理异常并继续处理,可以将 RecoveryCallback 添加到execute方法: 下面是一个示例。spring-doc.cadn.net.cn

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

请参阅 Spring Retry 项目,了解有关 RetryTemplate、重试策略、回退策略等的更多信息。spring-doc.cadn.net.cn