通过消息启动 Batch 作业
使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选项:
-
在命令行中,使用
CommandLineJobRunner
-
以编程方式,使用
JobOperator.start()
或JobLauncher.run()
例如,您可能希望使用CommandLineJobRunner
当通过
使用 shell 脚本。或者,您可以使用JobOperator
直接(例如,使用
Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢
更复杂的用例?也许您需要轮询远程 (S)FTP
server 检索 Batch Job 或应用程序的数据
必须同时支持多个不同的数据源。为
例如,您不仅可以从 Web 接收数据文件,还可以从
FTP 和其他来源。也许输入文件的其他转换是
在调用 Spring Batch 之前需要。
因此,执行批处理作业会更强大
通过使用 Spring Integration 及其众多适配器。例如
您可以使用 File Inbound Channel Adapter 来
监视文件系统中的目录,并以
一旦输入文件到达。此外,您还可以创建 Spring
使用多个不同适配器的集成流
从多个来源摄取批处理作业的数据
同时仅使用配置。实现所有这些
Spring Integration 的场景很容易,因为它允许
解耦的、事件驱动的JobLauncher
.
Spring Batch 集成提供了JobLaunchingMessageHandler
类
用于启动批处理作业。的JobLaunchingMessageHandler
由
Spring Integration 消息,其有效负载为JobLaunchRequest
.这个类是Job
启动并围绕JobParameters
他们是
启动 Batch 作业所必需的。
下图显示了典型的 Spring 集成 启动 Batch 作业所需的消息流。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。

将文件转换为 JobLaunchRequest
以下示例将文件转换为JobLaunchRequest
:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
The JobExecution Response
When a batch job is being executed, a
JobExecution
instance is returned. You can use this
instance to determine the status of an execution. If
a JobExecution
is able to be created
successfully, it is always returned, regardless of whether
or not the actual execution is successful.
The exact behavior on how the JobExecution
instance is returned depends on the provided
TaskExecutor
. If a
synchronous
(single-threaded)
TaskExecutor
implementation is used, the
JobExecution
response is returned only
after
the job completes. When using an
asynchronous
TaskExecutor
, the
JobExecution
instance is returned
immediately. You can then take the id
of
JobExecution
instance
(with JobExecution.getJobId()
) and query the
JobRepository
for the job’s updated status
using the JobExplorer
. For more
information, see
Querying the Repository.
Spring Batch Integration Configuration
Consider a case where someone needs to create a file inbound-channel-adapter
to listen
for CSV files in the provided directory, hand them off to a transformer
(FileMessageToJobRequest
), launch the job through the job launching gateway, and
log the output of the JobExecution
with the logging-channel-adapter
.
-
Java
-
XML
The following example shows how that common case can be configured in Java:
Java Configuration
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
The following example shows how that common case can be configured in XML:
XML Configuration
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
Example ItemReader Configuration
Now that we are polling for files and launching jobs, we need to configure our Spring
Batch ItemReader
(for example) to use the files found at the location defined by the job
parameter called "input.file.name", as the following bean configuration shows:
-
Java
-
XML
The following Java example shows the necessary bean configuration:
Java Configuration
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
The following XML example shows the necessary bean configuration:
XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
The main points of interest in the preceding example are injecting the value of
#{jobParameters['input.file.name']}
as the Resource property value and setting the ItemReader
bean
to have step scope. Setting the bean to have step scope takes advantage of
the late binding support, which allows access to the
jobParameters
variable.