通过消息启动 Batch 作业

使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选项:spring-doc.cadn.net.cn

例如,您可能希望使用CommandLineJobRunner当通过 使用 shell 脚本。或者,您可以使用JobOperator直接(例如,使用 Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢 更复杂的用例?也许您需要轮询远程 (S)FTP server 检索 Batch Job 或应用程序的数据 必须同时支持多个不同的数据源。为 例如,您不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源。也许输入文件的其他转换是 在调用 Spring Batch 之前需要。spring-doc.cadn.net.cn

因此,执行批处理作业会更强大 通过使用 Spring Integration 及其众多适配器。例如 您可以使用 File Inbound Channel Adapter 来 监视文件系统中的目录,并以 一旦输入文件到达。此外,您还可以创建 Spring 使用多个不同适配器的集成流 从多个来源摄取批处理作业的数据 同时仅使用配置。实现所有这些 Spring Integration 的场景很容易,因为它允许 解耦的、事件驱动的JobLauncher.spring-doc.cadn.net.cn

Spring Batch 集成提供了JobLaunchingMessageHandler类 用于启动批处理作业。的JobLaunchingMessageHandler由 Spring Integration 消息,其有效负载为JobLaunchRequest.这个类是Job启动并围绕JobParameters他们是 启动 Batch 作业所必需的。spring-doc.cadn.net.cn

下图显示了典型的 Spring 集成 启动 Batch 作业所需的消息流。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。spring-doc.cadn.net.cn

启动 Batch Job
图 1.启动 Batch Job

将文件转换为 JobLaunchRequest

以下示例将文件转换为JobLaunchRequest:spring-doc.cadn.net.cn

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

The JobExecution Response

When a batch job is being executed, a JobExecution instance is returned. You can use this instance to determine the status of an execution. If a JobExecution is able to be created successfully, it is always returned, regardless of whether or not the actual execution is successful.spring-doc.cadn.net.cn

The exact behavior on how the JobExecution instance is returned depends on the provided TaskExecutor. If a synchronous (single-threaded) TaskExecutor implementation is used, the JobExecution response is returned only after the job completes. When using an asynchronous TaskExecutor, the JobExecution instance is returned immediately. You can then take the id of JobExecution instance (with JobExecution.getJobId()) and query the JobRepository for the job’s updated status using the JobExplorer. For more information, see Querying the Repository.spring-doc.cadn.net.cn

Spring Batch Integration Configuration

Consider a case where someone needs to create a file inbound-channel-adapter to listen for CSV files in the provided directory, hand them off to a transformer (FileMessageToJobRequest), launch the job through the job launching gateway, and log the output of the JobExecution with the logging-channel-adapter.spring-doc.cadn.net.cn

The following example shows how that common case can be configured in Java:spring-doc.cadn.net.cn

Java Configuration
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

The following example shows how that common case can be configured in XML:spring-doc.cadn.net.cn

XML Configuration
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

Example ItemReader Configuration

Now that we are polling for files and launching jobs, we need to configure our Spring Batch ItemReader (for example) to use the files found at the location defined by the job parameter called "input.file.name", as the following bean configuration shows:spring-doc.cadn.net.cn

The following Java example shows the necessary bean configuration:spring-doc.cadn.net.cn

Java Configuration
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

The following XML example shows the necessary bean configuration:spring-doc.cadn.net.cn

XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

The main points of interest in the preceding example are injecting the value of #{jobParameters['input.file.name']} as the Resource property value and setting the ItemReader bean to have step scope. Setting the bean to have step scope takes advantage of the late binding support, which allows access to the jobParameters variable.spring-doc.cadn.net.cn