Spring Cloud Stream 集成

任务本身可能很有用,但将任务集成到更大的生态系统中可以让它 对于更复杂的处理和编排很有用。本节 介绍了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。spring-doc.cadn.net.cn

17. 从 Spring Cloud Stream 启动任务

您可以从流启动任务。为此,请创建一个侦听消息的 sink ,其中包含一个TaskLaunchRequest作为其有效载荷。这TaskLaunchRequest包含:spring-doc.cadn.net.cn

如果有效负载是不同的类型,则 sink 会引发异常。

例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源并创建一个GenericMessage,其中包含TaskLaunchRequest并发送 将消息发送到其 output 通道。然后,任务接收器将从其 输入 channnel,然后启动任务。spring-doc.cadn.net.cn

要创建 taskSink,您只需创建一个 Spring Boot 应用程序,其中包含EnableTaskLauncherannotation 中,如以下示例所示:spring-doc.cadn.net.cn

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskSinkApplication.class, args);
    }
}

样本 模块中包含一个示例 Sink 和 Processor。安装 这些示例复制到本地 Maven 存储库中,从spring-cloud-task-samples目录中的skipInstall属性设置为false如 如以下示例所示:spring-doc.cadn.net.cn

mvn clean installspring-doc.cadn.net.cn

maven.remoteRepositories.springRepo.url属性必须设置为位置 的 Über-jar 所在的远程仓库。如果未设置,则没有远程 存储库,因此它仅依赖于本地存储库。

17.1. Spring Cloud 数据流

要在 Spring Cloud Data Flow 中创建流,必须先注册 Task Sink 我们创建的应用程序。在以下示例中,我们将注册 Processor 和 使用 Spring Cloud Data Flow shell 接收示例应用程序:spring-doc.cadn.net.cn

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示了如何从 Spring Cloud Data Flow shell 创建流:spring-doc.cadn.net.cn

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

18. Spring Cloud 任务事件

Spring Cloud Task 提供了通过 Spring Cloud Stream 发出事件的能力 channel。任务侦听器是 用于发布TaskExecution在名为task-events.此功能 会自动连接到任何具有spring-cloud-stream,spring-cloud-stream-<binder>, 以及其 Classpath 上的已定义任务。spring-doc.cadn.net.cn

要禁用事件发出侦听器,请将spring.cloud.task.events.enabledproperty 设置为false.

定义适当的类路径后,以下任务会发出TaskExecution作为 事件task-eventschannel (在任务的开始和结束时):spring-doc.cadn.net.cn

@SpringBootApplication
public class TaskEventsApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskEventsApplication.class, args);
    }

    @Configuration
    public static class TaskConfiguration {

        @Bean
        public CommandLineRunner commandLineRunner() {
            return new CommandLineRunner() {
                @Override
                public void run(String... args) throws Exception {
                    System.out.println("The CommandLineRunner was executed");
                }
            };
        }
    }
}
还需要 Binder 实现位于 Classpath 上。
可以在 samples 模块中找到一个示例任务事件应用程序 Spring Cloud Task Project 的 Spring Cloud 任务项目,这里

18.1. 禁用特定任务事件

要禁用任务事件,您可以设置spring.cloud.task.events.enabledproperty 设置为false.spring-doc.cadn.net.cn

19. Spring Batch 事件

通过任务执行 Spring Batch 作业时,可以将 Spring Cloud 任务配置为 根据 Spring Batch 中提供的 Spring Batch 侦听器发出信息性消息。 具体来说,以下 Spring Batch 侦听器会自动配置到每个批处理作业中 并在通过 Spring 运行时在关联的 Spring Cloud Stream 通道上发出消息 云任务:spring-doc.cadn.net.cn

这些侦听器会自动配置为任何AbstractJob当适当 beans (一个Job以及TaskLifecycleListener) 存在于上下文中。configuration 设置为 listen to these events 的处理方式与绑定到任何其他 Spring 的方式相同 Cloud Stream 频道已完成。我们的任务(运行批处理作业的任务)充当Source,而侦听应用程序充当ProcessorSink.spring-doc.cadn.net.cn

例如,让应用程序侦听job-execution-events渠道 用于作业的启动和停止。要配置侦听应用程序,您可以 将 input 配置为job-execution-events如下:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.input.destination=job-execution-eventsspring-doc.cadn.net.cn

还需要 Binder 实现位于 Classpath 上。
可以在 samples 模块中找到一个示例批处理事件应用程序 Spring Cloud Task Project 的 Spring Cloud 任务项目,这里

19.1. 向不同的频道发送 Batch 事件

Spring Cloud Task 为批处理事件提供的选项之一是能够更改 特定侦听器可以向其发送消息的通道。为此,请使用 以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>.例如 如果StepExecutionListener需要将其消息发送到另一个名为my-step-execution-events而不是默认的step-execution-events中,您可以添加 以下配置:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.step-execution-events.destination=my-step-execution-eventsspring-doc.cadn.net.cn

19.2. 禁用 Batch 事件

要禁用所有批处理事件的侦听器功能,请使用以下命令 配置:spring-doc.cadn.net.cn

spring.cloud.task.batch.events.enabled=falsespring-doc.cadn.net.cn

要禁用特定批处理事件,请使用以下配置:spring-doc.cadn.net.cn

spring.cloud.task.batch.events.<batch event listener>.enabled=false:spring-doc.cadn.net.cn

以下清单显示了您可以禁用的各个侦听器:spring-doc.cadn.net.cn

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

19.3. Batch 事件的 Emit Order

默认情况下,批处理事件具有Ordered.LOWEST_PRECEDENCE.要更改此值(对于 example,设置为 5 ),使用以下配置:spring-doc.cadn.net.cn

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5