Batch 的领域语言
Batch 的域语言
对于任何有经验的批处理架构师来说,批处理的总体概念
Spring Batch 应该是熟悉和舒适的。有 “Jobs” 和 “Steps” 以及
开发人员提供的名为ItemReader
和ItemWriter
.然而
由于 Spring 模式、作、模板、回调和惯用语,因此有
以下机会:
-
对明确关注点分离的依从性显著提高。
-
清晰描述的架构层和作为接口提供的服务。
-
简单的默认实现,允许快速采用和易于使用 开箱即用。
-
显著增强了可扩展性。
下图是批处理参考体系结构的简化版本,该体系结构 已经使用了几十年。它概述了构成 batch processing 的 domain language 进行批处理。此体系结构框架是一个蓝图,它具有以下 已通过最近几代的数十年实施得到验证 平台(COBOL/Mainframe、C/Unix 和现在的 Java/anywhere)。JCL 和 COBOL 开发人员 可能和 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了 layers、components 和 technical 的物理实现 服务常见于健壮、可维护的系统,用于解决 使用基础设施和扩展创建从简单到复杂的批处理应用程序 以满足非常复杂的加工需求。

上图突出显示了构成
Spring Batch 中。Job 有一对多步骤,每个步骤都只有一个ItemReader
,
一ItemProcessor
和一个ItemWriter
.需要启动一个作业(使用JobLauncher
),并且需要将有关当前正在运行的进程的元数据存储在JobRepository
).
工作
本节介绍与批处理作业概念相关的构造型。一个Job
是一个
封装整个批处理过程的实体。与其他 Spring 一样
projects 中,一个Job
与 XML 配置文件或基于 Java 的
配置。此配置可称为 “job configuration”。然而Job
只是整个层次结构的顶部,如下图所示:

在 Spring Batch 中,Job
只是一个容器Step
实例。它结合了多个
在逻辑上属于流中并允许配置属性的步骤
global 到所有步骤,例如可重启性。作业配置包含:
-
作业的简单名称。
-
的定义和排序
Step
实例。 -
作业是否可重新启动。
对于使用 Java 配置的用户,Spring Batch 提供了
Job 接口中,采用SimpleJob
类,这会创建一些标准的
功能Job
.当使用基于 java 的配置时,一个
builders 可用于实例化Job
,如下所示
例:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于使用 XML 配置的用户, Spring Batch 提供了Job
接口以SimpleJob
类,这会创建一些标准的
功能Job
.但是,批处理命名空间抽象化了
直接实例化它。相反,<job>
元素,如
以下示例:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance 实例
一个JobInstance
是指逻辑作业运行的概念。考虑一个批处理作业,该
应该在一天结束时运行一次,例如 'EndOfDay'Job
从前面
图。有一个 'EndOfDay' 作业,但Job
必须是
单独跟踪。对于此 job,有一个逻辑JobInstance
每天。
例如,有 1 月 1 日运行、1 月 2 日运行,依此类推。如果 1 月 1 日
run 第一次失败,第二天再次运行,它仍然是 1 月 1 日的运行。
(通常,这也与它正在处理的数据相对应,即 1 月
第 1 次运行处理 1 月 1 日的数据)。因此,每个JobInstance
可以有多个
执行次数 (JobExecution
将在本章后面更详细地讨论),并且只有
一JobInstance
对应于特定的Job
并识别JobParameters
能
在给定时间运行。
的定义JobInstance
与要加载的数据完全无关。
这完全取决于ItemReader
实现来确定数据的加载方式。为
例如,在 EndOfDay 场景中,数据上可能有一列指示
数据所属的“生效日期”或“计划日期”。所以,1 月 1 日运行
将仅加载第 1 次运行中的数据,而 1 月 2 日运行将仅使用
2nd. 因为这个决定可能是一个商业决策,所以它留给了ItemReader
来决定。但是,使用相同的JobInstance
确定是否
'state'(即ExecutionContext
,本章稍后将对此进行讨论)
使用以前的执行。使用新的JobInstance
表示“从
beginning“,使用现有实例通常意味着”从您离开的地方开始”
关闭”。
JobParameters (作业参数)
经过讨论JobInstance
以及它与约伯记有何不同,这是自然而然的问题
is: “How is one”JobInstance
与另一个区别开来?答案是:JobParameters
.一个JobParameters
object 包含一组用于启动批处理的参数
工作。它们可用于识别,甚至在运行期间用作参考数据,如
如下图所示:

在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 1 日
对于 1 月 2 日,实际上只有一个Job
,但它有两个JobParameter
对象:
一个以 Job 参数 01-01-2017 启动,另一个以
参数为 01-02-2017。因此,合约可以定义为:JobInstance
= Job
+ 识别JobParameters
.这使开发人员能够有效地控制JobInstance
定义,因为它们控制传入的参数。
并非所有作业参数都需要用于识别JobInstance .默认情况下,它们会这样做。但是,框架也允许提交
的Job 的参数对JobInstance . |
JobExecution (任务执行)
一个JobExecution
是指单次尝试运行 Job 的技术概念。一
执行可能以 failure 或 success 结束,但JobInstance
对应于给定的
除非执行成功完成,否则不会认为执行已完成。
使用 EndOfDayJob
前面作为示例,请考虑JobInstance
为
2017 年 1 月 1 日,第一次运行时失败。如果使用相同的
将作业参数标识为首次运行 (01-01-2017),则新增JobExecution
是
创建。然而,仍然只有一个JobInstance
.
一个Job
定义作业是什么以及如何执行,而JobInstance
是一个
Purely Organizational 对象将执行分组在一起,主要是为了启用正确的
restart semantics 的 Semantics 中。一个JobExecution
,但是,它是
实际发生在运行期间,并且包含更多必须控制的属性
和 persisted,如下表所示:
财产 |
定义 |
地位 |
一个 |
startTime (开始时间) |
一个 |
结束时间 |
一个 |
exitStatus |
这 |
创建时间 |
一个 |
lastUpdated |
一个 |
executionContext |
包含需要在 执行。 |
failureExceptions 异常 |
在执行 |
这些属性很重要,因为它们是持久的,可以完全用于 确定执行的状态。例如,如果 01-01 的 EndOfDay 作业为 在晚上 9:00 执行,在 9:30 失败,则在批处理中创建以下条目 元数据表:
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
识别 |
1 |
日期 |
附表。日期 |
2017-01-01 |
真 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
为了清楚起见,可能已缩写或删除列名,并且 格式。 |
现在作业已失败,假设问题花了一整夜才出现
确定,因此 'Batch 窗口' 现在已关闭。进一步假设窗口
从晚上 9:00 开始,作业在 01-01 中再次启动,从中断处开始,然后
在 9:30 成功完成。因为现在是第二天,所以 01-02 作业必须为
运行,之后在 9:31 开始,并在正常时间完成
时间 10:30。没有要求一个JobInstance
被踢出
另一个,除非两个 Job 有可能尝试访问相同的数据,
导致数据库级别的锁定问题。这完全取决于调度程序
确定Job
应该运行。因为它们是分开的JobInstances
Spring
Batch 不会尝试阻止它们并发运行。(尝试运行
相同JobInstance
而另一个已在运行会导致JobExecutionAlreadyRunningException
被抛出)。现在应该有一个额外的条目
在这两个JobInstance
和JobParameters
表中的JobExecution
表,如下表所示:
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
2 |
结束工作 |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
识别 |
1 |
日期 |
附表。日期 |
2017-01-01 00:00:00 |
真 |
2 |
日期 |
附表。日期 |
2017-01-01 00:00:00 |
真 |
3 |
日期 |
附表。日期 |
2017-01-02 00:00:00 |
真 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
完成 |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
完成 |
为了清楚起见,可能已缩写或删除列名,并且 格式。 |
步
一个Step
是一个域对象,它封装了批处理的独立顺序阶段
工作。因此,每个 Job 都完全由一个或多个步骤组成。一个Step
包含
定义和控制实际批处理所需的所有信息。这
必然是一个模糊的描述,因为任何给定的Step
位于
开发者编写Job
.一个Step
可以像
开发者的愿望。一个简单的Step
可能会将数据从文件加载到数据库中,
需要很少或不需要代码(取决于使用的实现)。更复杂的Step
可能具有作为处理的一部分应用的复杂业务规则。如
替换为Job
一个Step
有一个单独的StepExecution
与唯一的JobExecution
,如下图所示:

StepExecution 执行
一个StepExecution
表示执行Step
.新的StepExecution
每次创建Step
运行,类似于JobExecution
.但是,如果步骤失败
执行,因为该步骤在失败之前,则不会保留任何执行。一个StepExecution
仅在其Step
实际上已启动。
Step
执行由StepExecution
类。每次执行
包含对其相应步骤的引用,并且JobExecution
和交易相关
数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤
execution 包含一个ExecutionContext
,其中包含开发人员需要
在批处理运行中保留,例如所需的统计信息或状态信息
重新启动。下表列出了StepExecution
:
财产 |
定义 |
地位 |
一个 |
startTime (开始时间) |
一个 |
结束时间 |
一个 |
exitStatus |
这 |
executionContext |
包含需要在 执行。 |
读取计数 |
已成功读取的项目数。 |
writeCount |
已成功写入的项目数。 |
提交计数 |
已为此执行提交的事务数。 |
回滚计数 |
业务事务由 |
readSkipCount |
次数 |
processSkipCount |
次数 |
filterCount 过滤器计数 |
已被 |
writeSkipCount |
次数 |
ExecutionContext
一ExecutionContext
表示持久化的键/值对的集合,
由框架控制,以便允许开发人员在某个位置存储持久化
state 的 state 的StepExecution
object 或JobExecution
对象。对于那些
熟悉 Quartz,它与 JobDataMap 非常相似。最好的使用示例是
便于重启。以平面文件输入为例,同时处理单个
行,框架会定期持久化ExecutionContext
在提交点。行为
so 允许ItemReader
存储其状态,以防在运行期间发生致命错误
或者即使停电。所需要做的就是输入当前的行数
读入上下文,如以下示例所示,框架将执行
休息:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
使用 EndOfDay 示例中的Job
Stereotypes 部分为例,假设有
是将文件加载到数据库中的一个步骤 'loadData'。在第一次运行失败后,
元数据表将类似于以下示例:
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
日期 |
附表。日期 |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
地位 |
1 |
1 |
loadData (加载数据) |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{件数=40321} |
在上述情况下,Step
运行 30 分钟并处理了 40,321 个“碎片”,其中
将表示文件中的行。此值在每次
commit 的 Commit 中,可以包含与ExecutionContext
.在提交之前收到通知需要满足以下条件之一StepListener
implementations(或ItemStream
),将更详细地讨论
在本指南的后面部分。与前面的示例一样,假设Job
是
第二天重新启动。重新启动时,ExecutionContext
之
最后一次运行将从数据库中重构。当ItemReader
打开时,它可以
检查它在上下文中是否有任何 stored 状态,并从那里初始化自身,
如以下示例所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在上述代码运行后,当前行为 40,322,允许Step
从上次中断的地方重新开始。这ExecutionContext
也可用于
需要保留的有关运行本身的统计信息。例如,如果平面文件
包含跨多行存在的处理订单,则可能需要
store 已处理的订单数量(这与
行读取),以便可以在Step
替换为总数
正文中处理的订单数。框架为开发人员处理将此存储在
order 以正确地将其范围限定为单个JobInstance
.这可能非常困难
了解是否存在ExecutionContext
应该使用或不使用。例如,使用
'EndOfDay' 示例中,当 01-01 运行第二次再次开始时,
框架认识到它是相同的JobInstance
和单个Step
基础
拉取ExecutionContext
并将其(作为StepExecution
) 到Step
本身。相反,对于 01-02 运行,框架
识别出它是一个不同的实例,因此必须将空上下文传递给Step
.框架为
developer,以确保在正确的时间向他们提供状态。这也很重要
请注意,恰好有一个ExecutionContext
存在每StepExecution
在任何给定时间。
的客户端ExecutionContext
应该小心,因为这会创建一个共享的
键空间。因此,在输入值时应小心,以确保没有数据
覆盖。但是,Step
在上下文中绝对不存储任何数据,因此没有
对框架产生不利影响的方式。
同样重要的是要注意,至少有一个ExecutionContext
每JobExecution
并且每个StepExecution
.例如,请考虑以下
代码片段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
正如评论中所指出的,ecStep
不等于ecJob
.他们是两个不同的ExecutionContexts
.范围限定为Step
保存在Step
,而作用域为 Job 的 Job 则保存在每个Step
执行。
在ExecutionContext ,则所有非临时条目都必须为Serializable .
执行上下文的正确序列化为步骤和作业的重启功能奠定了基础。
如果您使用的键或值本身不可序列化,则需要
采用定制的序列化方法。无法序列化执行上下文
可能会危及状态持久化过程,使失败的作业无法正确恢复。 |
JobRepository 仓库
JobRepository
是上面提到的所有 Stereotype 的持久化机制。
它为JobLauncher
,Job
和Step
实现。当Job
首次启动时,一个JobExecution
从存储库获取,并且在
执行过程,StepExecution
和JobExecution
实现是持久化的
通过将它们传递到存储库。
Spring Batch XML 命名空间支持配置JobRepository
实例
使用<job-repository>
标签中,如以下示例所示:
<job-repository id="jobRepository"/>
使用 Java 配置时,@EnableBatchProcessing
注解提供了一个JobRepository
作为自动配置的开箱即用组件之一。
作业Starters
JobLauncher
表示用于启动Job
使用一组给定的JobParameters
,如以下示例所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
预计 implementations 会获得有效的JobExecution
从JobRepository
并执行Job
.
项目读取者
ItemReader
是一个抽象,表示Step
一
item 的 intent 值。当ItemReader
已经用尽了它所能提供的物品,它
通过返回null
.有关ItemReader
interface 及其
可以在 Readers And Writers 中找到各种实现。
项目写入器
ItemWriter
是一个抽象,表示Step
、一个批处理或块
的项目数。通常,ItemWriter
不知道它应该输入的内容
receive next,并且只知道在其当前调用中传递的 Item。更多
有关ItemWriter
接口及其各种实现可以在 Readers And Writers 中找到。
Item Processor
ItemProcessor
是表示项的业务处理的抽象。
虽然ItemReader
读取一个项目,而ItemWriter
写入它们时,ItemProcessor
提供用于转换或应用其他业务处理的访问点。
如果在处理 Item 时确定 Item 无效,则返回null
表示不应写出该项。有关ItemProcessor
接口可以在 Readers And Writers 中找到。
Batch 命名空间
前面列出的许多域概念都需要在 Spring 中配置ApplicationContext
.虽然上述接口的实现可以是
在标准 bean 定义中使用,为了方便
配置,如以下示例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>
只要已声明批处理命名空间,就可以使用其任何元素。更多
有关配置 Job 的信息,请参阅 配置 和
Running a Job (运行作业)。有关配置Step
可以在 配置步骤.