项读取和写入
ItemReaders 和 ItemWriters
所有批处理都可以用最简单的形式描述为大量读取
的数据,执行某种类型的计算或转换,并写入结果
外。Spring Batch 提供了三个关键接口来帮助执行批量读取和写入:ItemReader
,ItemProcessor
和ItemWriter
.
ItemReader
虽然概念很简单,但ItemReader
是提供来自多个
不同类型的输入。最常见的示例包括:
-
平面文件:平面文件项读取器从平面文件中读取数据行,通常 描述其数据字段由文件中的固定位置定义或分隔的记录 通过一些特殊字符(如逗号)。
-
XML:XML
ItemReaders
独立于用于解析的技术处理 XML, 映射和验证对象。输入数据允许验证 XML 文件 针对 XSD 架构。 -
Database:访问数据库资源以返回可映射到 对象进行处理。默认 SQL
ItemReader
implementations 调用RowMapper
要返回对象,请跟踪当前行(如果需要重新启动),请存储 Basic 统计信息,并提供一些事务增强功能,稍后将对此进行说明。
还有更多可能性,但本章我们将重点介绍基本的可能性。一个
所有可用的完整列表ItemReader
实现可以在附录 A 中找到。
ItemReader
是泛型
input作,如以下接口定义所示:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
这read
method 定义了ItemReader
.调用它
返回一个项目或null
如果没有更多项目。一个项目可能表示
文件、数据库中的一行或 XML 文件中的元素。通常预期
这些对象将映射到可用的域对象(例如Trade
,Foo
或其他),但
是合同中没有要求这样做的。
预计ItemReader
interface 仅转发。
但是,如果底层资源是事务性的(例如 JMS 队列),则调用read
可能会在 rollback 场景中的后续调用中返回相同的 logical item。是的
还值得注意的是,缺少 Item 来处理ItemReader
不会导致
异常。例如,数据库ItemReader
配置了
返回 0 个结果的 query 返回null
在第一次调用read
.
ItemWriter
ItemWriter
在功能上类似于ItemReader
但使用逆运算。
资源仍然需要定位、打开和关闭,但它们的不同之处在于ItemWriter
写出,而不是读入。对于数据库或队列,
这些作可以是 INSERTS、UPDATES 或 SENDS。的序列化格式
输出特定于每个批处理作业。
与 一样ItemReader
,ItemWriter
是一个相当通用的接口,如下面的接口定义所示:
public interface ItemWriter<T> {
void write(Chunk<? extends T> items) throws Exception;
}
与 一样read
上ItemReader
,write
提供了ItemWriter
.它
尝试写出传入的项列表,只要该列表处于打开状态。因为它是
通常期望将项目一起“批处理”到一个块中,然后输出
interface 接受 Item 列表,而不是 Item 本身。写出
list 中,任何可能需要的刷新都可以在从写入返回之前执行
方法。例如,如果写入 Hibernate DAO,则可以多次调用 write,
每个项目一个。然后,编写器可以调用flush
在之前的 Hibernate 会话中
返回。
ItemStream
双ItemReaders
和ItemWriters
很好地服务于他们的个人目的,但有一个
他们俩都关心需要另一个接口。一般来说,作为
作为批处理作业范围的一部分,读取器和写入器需要打开、关闭,并且
需要一种持久化状态的机制。这ItemStream
interface 就是为了达到这个目的,
如以下示例所示:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在描述每种方法之前,我们应该提到ExecutionContext
.的客户端ItemReader
也实现ItemStream
应该调用open
在对read
,以打开任何资源(如文件)或获取连接。类似的
限制适用于ItemWriter
实现ItemStream
.如
第 2 章,如果在ExecutionContext
,它可用于启动
这ItemReader
或ItemWriter
位于其初始状态以外的位置。相反close
以确保安全释放 open 期间分配的所有资源。update
主要是为了确保当前持有的任何状态都加载到
提供的ExecutionContext
.此方法在提交之前调用,以确保
当前状态在提交之前保留在数据库中。
在特殊情况下,客户端的ItemStream
是一个Step
(来自春季
Batch Core)、一个ExecutionContext
为每个 StepExecution 创建,以允许用户
存储特定执行的状态,如果
一样JobInstance
将再次启动。对于熟悉 Quartz 的人来说,语义
与 Quartz 非常相似JobDataMap
.
委托模式和注册步骤
请注意,CompositeItemWriter
是委托模式的一个示例,即
常见于 Spring Batch 中。委托本身可以实现回调接口
如StepListener
.如果它们这样做,并且它们是否与 Spring 结合使用
Batch Core 作为Step
在Job
,那么他们几乎肯定需要
手动注册到Step
.直接
连接到Step
如果它实现了ItemStream
或StepListener
接口。但是,由于Step
,
它们需要作为侦听器或流注入(或两者兼而有之,如果合适)。
下面的示例演示如何在 XML 中将委托作为流注入:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
下面的示例演示如何在 XML 中将委托作为流注入:
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(compositeItemWriter())
.stream(barWriter())
.build();
}
@Bean
public CustomCompositeItemWriter compositeItemWriter() {
CustomCompositeItemWriter writer = new CustomCompositeItemWriter();
writer.setDelegate(barWriter());
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter();
}
平面文件
交换批量数据的最常见机制之一一直是平面 文件。与 XML 不同,XML 有一个公认的标准来定义其结构 (XSD) 中,任何读取平面文件的人都必须提前了解文件的确切情况 结构。通常,所有平面文件都分为两种类型:分隔和固定长度。 分隔文件是指字段由分隔符(如逗号)分隔的文件。 Fixed Length 文件具有具有固定长度的字段。
这FieldSet
在 Spring Batch 中处理平面文件时,无论是用于 Importing 还是
output 中,最重要的类之一是FieldSet
.许多架构和
库包含用于帮助您从文件中读入的抽象,但它们通常
返回一个String
或String
对象。这真的只能让你成功一半
那里。一个FieldSet
是 Spring Batch 的抽象,用于启用字段的绑定
文件资源。它允许开发人员以与
他们将使用数据库输入。一个FieldSet
在概念上类似于 JDBCResultSet
.一个FieldSet
只需要一个参数:一个String
令牌数组。
(可选)您还可以配置字段的名称,以便字段可以
按索引或名称访问,如ResultSet
,如下所示
例:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
还有更多选项FieldSet
接口,例如Date
长BigDecimal
等。最大的优势FieldSet
是它提供
平面文件输入的一致解析。而不是每个批处理作业在
可能意想不到的方式,它可以是一致的,无论是在处理由
format 异常,或者在执行简单的数据转换时。
FlatFileItemReader
平面文件是最多包含二维(表格)数据的任何类型的文件。
在 Spring Batch 框架中读取平面文件由名为FlatFileItemReader
,它提供读取和解析平面的基本功能
文件。的两个最重要的必需依赖项FlatFileItemReader
是Resource
和LineMapper
.这LineMapper
界面将在 Next 中进行更多探讨
部分。resource 属性表示 Spring CoreResource
.文档
解释如何创建这种类型的 bean 可以在 Spring 中找到
框架,第 5 章。资源。因此,本指南不详细介绍
创建Resource
objects beyond 显示了以下简单示例:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由 Enterprise Application Integration (EAI) 管理 基础结构,其中为外部接口建立了放置区以移动文件 从 FTP 位置到批处理位置,反之亦然。文件移动实用程序 超出了 Spring Batch 架构的范围,但对于 Batch 来说并不罕见 Job Streams 将文件移动实用程序作为 Job Stream 中的步骤包含在内。批次 架构只需要知道如何找到要处理的文件。Spring Batch 从此起点开始将数据馈送到管道的过程。但是, Spring 集成提供了许多 这些类型的服务。
中的其他属性FlatFileItemReader
让您进一步指定数据的情况
解释,如下表所述:
财产 | 类型 | 描述 |
---|---|---|
评论 |
字符串 |
指定指示注释行的行前缀。 |
编码 |
字符串 |
指定要使用的文本编码。默认值为 |
线映射器 |
|
将 |
linesToSkip (行到 Skip) |
int |
文件顶部要忽略的行数。 |
recordSeparatorPolicy 的 |
RecordSeparatorPolicy 的 |
用于确定行尾的位置 并执行诸如 continue 之类的作,以 if 结尾的行位于带引号的字符串内。 |
资源 |
|
要从中读取的资源。 |
skippedLines回调 |
LineCallbackHandler 线回调处理程序 |
将
文件中要跳过的行。如果 |
严格 |
布尔 |
在 strict 模式下,读取器在 |
LineMapper
与 一样RowMapper
,它采用一个低级结构,例如ResultSet
并返回
一Object
,平面文件处理需要相同的结构来转换String
线
转换为Object
,如以下接口定义所示:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
基本协定是,给定当前行及其行号
associated,则 Mapper 应返回一个结果域对象。这类似于RowMapper
,因为每行都与其行号相关联,就像ResultSet
与其行号相关联。这允许将行号绑定到
生成的 domain 对象,用于身份比较或提供更多信息的日志记录。然而
与RowMapper
这LineMapper
被赋予一条原始行,如上所述,该行仅
让你成功一半。该行必须被标记成FieldSet
,然后可以是
映射到对象,如本文档后面所述。
LineTokenizer
用于将一行 input 转换为FieldSet
是必需的,因为有
可以是需要转换为FieldSet
.在
Spring Batch 中,这个接口是LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
一个LineTokenizer
是这样的,给定一行输入(理论上String
可以包含多行)、一个FieldSet
表示线是
返回。这FieldSet
然后可以传递给FieldSetMapper
.Spring Batch 包含
以下LineTokenizer
实现:
-
DelimitedLineTokenizer
:用于记录中的字段由 定界符。最常见的分隔符是逗号,但通常使用竖线或分号 也。 -
FixedLengthTokenizer
:用于记录中的字段均为“固定 宽度”。必须为每种记录类型定义每个字段的宽度。 -
PatternMatchingCompositeLineTokenizer
:确定哪个LineTokenizer
在 应该通过检查模式来在特定行上使用分词器。
FieldSetMapper
这FieldSetMapper
interface 定义了一个方法mapFieldSet
,它采用FieldSet
object 并将其内容映射到对象。此对象可以是自定义 DTO、
domain 对象或数组,具体取决于作业的需要。这FieldSetMapper
是
与LineTokenizer
翻译资源中的一行数据
转换为所需类型的对象,如以下接口定义所示:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
使用的模式与RowMapper
使用者JdbcTemplate
.
DefaultLineMapper
现在,用于读取平面文件的基本接口已经定义,它变为 明确需要三个基本步骤:
-
从文件中读取一行。
-
将
String
行插入LineTokenizer#tokenize()
方法检索FieldSet
. -
将
FieldSet
从分词化返回到FieldSetMapper
,返回 结果ItemReader#read()
方法。
上面描述的两个接口代表两个独立的任务:将一行转换为FieldSet
并将FieldSet
传递给域对象。因为LineTokenizer
匹配LineMapper
(一行)和FieldSetMapper
匹配LineMapper
,这是一个默认实现,它
同时使用LineTokenizer
以及FieldSetMapper
。这DefaultLineMapper
,
如下面的类定义所示,表示大多数用户需要的行为:
public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
上述功能在默认实现中提供,而不是构建 导入到 Reader 本身中(就像在框架的早期版本中所做的那样),以允许用户 在控制解析过程方面具有更大的灵活性,尤其是在访问 RAW 时 线路。
简单分隔文件读取示例
以下示例说明了如何使用实际域方案读取平面文件。 此特定批处理作业从以下文件中读取 football players:
ID,lastName,firstName,position,birthYear,debutYear "AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996", "AbduRa00,Abdullah,Rabih,rb,1975,1999", "AberWa00,Abercrombie,Walter,rb,1959,1982", "AbraDa00,Abramowicz,Danny,wr,1945,1967", "AdamBo00,Adams,Bob,te,1946,1969", "AdamCh00,Adams,Charlie,wr,1979,2003"
此文件的内容映射到以下内容Player
domain 对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
要将FieldSet
转换为Player
object、FieldSetMapper
这又回到了玩家的需求
,如以下示例所示:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后,可以通过正确构造FlatFileItemReader
并调用read
,如以下示例所示:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
每次调用read
返回一个新的Player
object 从文件中的每一行中获取。当文件末尾为
达到null
返回。
按名称映射字段
两者都允许一项额外的功能DelimitedLineTokenizer
和FixedLengthTokenizer
它在功能上类似于
JDBCResultSet
.字段的名称可以注入到其中任何一个LineTokenizer
实现来提高 Map 函数的可读性。
首先,将平面文件中所有字段的列名注入到分词器中。
如以下示例所示:
tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});
一个FieldSetMapper
可以按如下方式使用此信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if (fs == null) {
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
将 FieldSet 自动映射到 Domain Objects
对于许多人来说,必须编写一个特定的FieldSetMapper
和写作一样繁琐
一个特定的RowMapper
对于JdbcTemplate
.Spring Batch 通过提供
一个FieldSetMapper
通过将字段名称与 setter 匹配来自动映射字段
在对象上使用 JavaBean 规范。
再次以足球为例,BeanWrapperFieldSetMapper
配置如下所示
以下 XML 代码段:
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
再次以足球为例,BeanWrapperFieldSetMapper
配置如下所示
以下 Java 代码段:
@Bean
public FieldSetMapper fieldSetMapper() {
BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
fieldSetMapper.setPrototypeBeanName("player");
return fieldSetMapper;
}
@Bean
@Scope("prototype")
public Player player() {
return new Player();
}
对于FieldSet
,映射器会在新的
实例的Player
对象(因此,需要原型范围)在
Spring 容器查找与属性名称匹配的 setter 的方式相同。每个都可用
字段中的FieldSet
映射,结果的Player
object 返回,没有
需要代码。
固定长度文件格式
到目前为止,仅对分隔文件进行了详细讨论。然而,它们代表 只有一半的文件读取图片。许多使用平面文件的组织都使用固定的 length 格式。固定长度文件示例如下:
UK21341EAH4121131.11customer1 UK21341EAH4221232.11customer2 UK21341EAH4321333.11customer3 UK21341EAH4421434.11customer4 UK21341EAH4521535.11customer5
虽然这看起来像一个大字段,但它实际上代表了 4 个不同的字段:
-
ISIN: 所订购商品的唯一标识符 - 长度为 12 个字符。
-
数量:所订购商品的数量 - 3 个字符长。
-
价格:商品的价格 - 5 个字符长。
-
客户:订购商品的客户的 ID - 长度为 9 个字符。
在配置FixedLengthLineTokenizer
,则必须提供这些长度中的每一个
以范围的形式。
以下示例显示了如何为FixedLengthLineTokenizer
在
XML:
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
因为FixedLengthLineTokenizer
使用相同的LineTokenizer
interface 设置为
前面讨论过,它返回相同的FieldSet
就像使用了分隔符一样。这
允许使用相同的方法处理其输出,例如使用BeanWrapperFieldSetMapper
.
支持范围的上述语法需要专用属性编辑器 |
以下示例显示了如何为FixedLengthLineTokenizer
在
Java:
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
tokenizer.setColumns(new Range(1, 12),
new Range(13, 15),
new Range(16, 20),
new Range(21, 29));
return tokenizer;
}
因为FixedLengthLineTokenizer
使用相同的LineTokenizer
interface 设置为
上面讨论过,它会返回相同的FieldSet
就像使用了分隔符一样。这
允许使用相同的方法来处理其输出,例如使用BeanWrapperFieldSetMapper
.
单个文件中的多个记录类型
到目前为止,所有的文件读取示例都对 为简单起见:文件中的所有记录都具有相同的格式。但是,这可能会 并非总是如此。文件可能包含具有不同 格式,这些格式需要以不同的方式进行标记化并映射到不同的对象。这 以下文件摘录说明了这一点:
USER;Smith;Peter;;T;20014539;F LINEA;1044391041ABC037.49G201XX1383.12H LINEB;2134776319DEF422.99M005LI
在这个文件中,我们有三种类型的记录,“USER”、“LINEA”和“LINEB”。“USER” 行
对应于一个User
对象。“LINEA” 和 “LINEB” 都对应Line
对象
尽管 “LINEA” 比 “LINEB” 有更多的信息。
这ItemReader
单独读取每一行,但我们必须指定不同的LineTokenizer
和FieldSetMapper
对象,以便ItemWriter
接收
正确的项目。这PatternMatchingCompositeLineMapper
通过允许地图
的模式更改为LineTokenizers
和 patterns 更改为FieldSetMappers
进行配置。
以下示例显示了如何为FixedLengthLineTokenizer
在
XML:
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
PatternMatchingCompositeLineMapper lineMapper =
new PatternMatchingCompositeLineMapper();
Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
tokenizers.put("USER*", userTokenizer());
tokenizers.put("LINEA*", lineATokenizer());
tokenizers.put("LINEB*", lineBTokenizer());
lineMapper.setTokenizers(tokenizers);
Map<String, FieldSetMapper> mappers = new HashMap<>(2);
mappers.put("USER*", userFieldSetMapper());
mappers.put("LINE*", lineFieldSetMapper());
lineMapper.setFieldSetMappers(mappers);
return lineMapper;
}
在此示例中,“LINEA” 和 “LINEB” 具有单独的LineTokenizer
实例,但它们都使用
一样FieldSetMapper
.
这PatternMatchingCompositeLineMapper
使用PatternMatcher#match
方法
以便为每行选择正确的代表。这PatternMatcher
允许
两个具有特殊含义的通配符:问号 (“?”) 只匹配一个
字符,而星号 (“*”) 匹配零个或多个字符。请注意,在
前面的配置,所有模式都以星号结尾,从而有效地使它们
前缀。这PatternMatcher
始终匹配最具体的模式
可能,无论配置中的顺序如何。因此,如果 “LINE*” 和 “LINEA*” 是
两者都列为模式,“LINEA” 将匹配模式 “LINEA*”,而 “LINEB” 将匹配
模式 “LINE*”。此外,单个星号 (“*”) 可以通过匹配
任何其他模式不匹配的任何行。
下面的示例演示如何匹配 XML 中任何其他模式都不匹配的行:
<entry key="*" value-ref="defaultLineTokenizer" />
下面的示例展示了如何匹配 Java 中任何其他模式都不匹配的行:
...
tokenizers.put("*", defaultLineTokenizer());
...
还有一个PatternMatchingCompositeLineTokenizer
可用于分词
独自。
平面文件包含每个记录跨越多行的记录也很常见。自
处理这种情况,则需要更复杂的策略。演示
常见模式可以在multiLineRecords
样本。
平面文件中的异常处理
在许多情况下,对行进行标记可能会导致引发异常。多
平面文件不完美,并且包含格式不正确的记录。许多用户选择
在记录问题、原始行和行时跳过这些错误的行
数。这些日志稍后可以手动检查,也可以由另一个批处理作业检查。对于这个
原因,Spring Batch 提供了用于处理解析异常的异常层次结构:FlatFileParseException
和FlatFileFormatException
.FlatFileParseException
是
由FlatFileItemReader
在尝试读取
文件。FlatFileFormatException
由LineTokenizer
接口,并指示在分词时遇到更具体的错误。
IncorrectTokenCountException
双DelimitedLineTokenizer
和FixedLengthLineTokenizer
能够指定
可用于创建FieldSet
.但是,如果列数
names 与在对行进行分词时找到的列数不匹配,则FieldSet
无法创建,并且IncorrectTokenCountException
被抛出,其中包含
遇到的令牌数和预期数量,如以下示例所示:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
因为分词器配置了 4 个列名,但在
该文件、IncorrectTokenCountException
被抛出。
IncorrectLineLengthException
以固定长度格式格式化的文件时,解析时有其他要求 因为,与分隔格式不同,每列都必须严格遵守其预定义的 宽度。如果总行长不等于此列的最大宽度值,则 Exception 的 Exception 引发,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上述分词器的配置范围为:1-5、6-10 和 11-15。因此
线路的总长度为 15。但是,在前面的示例中,长度为 5 的行
传入,导致IncorrectLineLengthException
被扔出去。抛出一个
exception 而不是仅映射第一列,从而允许处理
行更早失败,并且包含的信息比失败时包含的信息多,而
尝试读取FieldSetMapper
.但是,在某些情况下,
线路的长度并不总是恒定的。因此,验证行长度可以
通过 'strict' 属性关闭,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
前面的示例与前面的示例几乎相同,只是tokenizer.setStrict(false)
被叫来了。此设置告诉分词器不强制执行
line length。一个FieldSet
现在已正确创建,并且
返回。但是,它仅包含其余值的空令牌。
FlatFileItemWriter
写出到平面文件与从文件读入存在相同的问题 必须克服。步骤必须能够在 交易方式。
LineAggregator
正如LineTokenizer
interface 是必需的,才能将 Item 转换为String
,文件写入必须能够将多个字段聚合成一个字符串
用于写入文件。在 Spring Batch 中,这是LineAggregator
,显示在
接口定义如下:
public interface LineAggregator<T> {
public String aggregate(T item);
}
这LineAggregator
是LineTokenizer
.LineTokenizer
采用String
并返回一个FieldSet
而LineAggregator
采用item
并返回一个String
.
PassThroughLineAggregator
最基本的LineAggregator
interface 是PassThroughLineAggregator
,它假定对象已经是一个字符串,或者
它的字符串表示形式可以写入,如下面的代码所示:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
如果直接控制创建字符串
必需的,但FlatFileItemWriter
,例如 transaction 和 restart
支持是必要的。
简化文件编写示例
现在,LineAggregator
interface 及其最基本的实现PassThroughLineAggregator
,已经定义过,基本的编写流程可以是
解释:
-
需要写入的对象被传递给
LineAggregator
为了获得String
. -
返回的
String
将写入配置文件。
以下摘录自FlatFileItemWriter
在代码中表示:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
在 XML 中,配置的简单示例可能如下所示:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
在 Java 中,一个简单的配置示例可能如下所示:
@Bean
public FlatFileItemWriter itemWriter() {
return new FlatFileItemWriterBuilder<Foo>()
.name("itemWriter")
.resource(new FileSystemResource("target/test-outputs/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
FieldExtractor
前面的示例对于写入文件的最基本用途可能很有用。
但是,大多数FlatFileItemWriter
有一个需要
写出,因此必须转换为一行。在文件读取中,以下内容为
必填:
-
从文件中读取一行。
-
将该行传递到
LineTokenizer#tokenize()
方法,为了检索FieldSet
. -
将
FieldSet
从分词化返回到FieldSetMapper
,返回 结果ItemReader#read()
方法。
文件写入具有类似但相反的步骤:
-
将要写入的项传递给写入器。
-
将项上的字段转换为数组。
-
将结果数组聚合为一行。
因为框架无法知道对象的哪些字段需要
被写出,则FieldExtractor
才能完成将
item 添加到数组中,如以下接口定义所示:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
的FieldExtractor
interface 应从字段
中,可以在
元素或作为固定宽度线条的一部分。
PassThroughFieldExtractor
在许多情况下,集合(例如数组、Collection
或FieldSet
,
需要写出来。从这些集合类型之一中“提取”数组非常
简单。为此,请将集合转换为数组。因此,PassThroughFieldExtractor
应该在此方案中使用。需要注意的是,如果
传入的对象不是一种 collection 类型,则PassThroughFieldExtractor
返回仅包含要提取的项的数组。
BeanWrapperFieldExtractor
与BeanWrapperFieldSetMapper
在文件读取部分中描述的是
通常最好配置如何将域对象转换为对象数组,而不是
而不是自己编写转换。这BeanWrapperFieldExtractor
提供了这个
功能,如以下示例所示:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
此提取器实现只有一个必需的属性:要
地图。正如BeanWrapperFieldSetMapper
需要字段名称来映射FieldSet
对于所提供对象上的 setter,则BeanWrapperFieldExtractor
需要名字
映射到 getter 以创建对象数组。值得注意的是,
names 确定数组中字段的顺序。
分隔文件写入示例
最基本的平面文件格式是所有字段都由分隔符分隔的格式。
这可以使用DelimitedLineAggregator
.以下示例编写
输出一个简单的 domain 对象,该对象表示客户账户的信用:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
由于正在使用域对象,因此FieldExtractor
interface 以及要使用的分隔符。
以下示例演示如何使用FieldExtractor
在 XML 中使用分隔符:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
以下示例演示如何使用FieldExtractor
在 Java 中使用分隔符:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
在前面的示例中,BeanWrapperFieldExtractor
前面已描述
chapter 用于将 name 和 credit 字段转换为CustomerCredit
导入到对象中
数组,然后在每个字段之间用逗号写出。
也可以使用FlatFileItemWriterBuilder.DelimitedBuilder
自
自动创建BeanWrapperFieldExtractor
和DelimitedLineAggregator
如以下示例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.delimited()
.delimiter("|")
.names(new String[] {"name", "credit"})
.build();
}
固定宽度文件写入示例
Delimited 不是平面文件格式的唯一类型。许多人喜欢使用 set width for
在字段之间划定的每一列,这通常称为“固定宽度”。
Spring Batch 在文件写入中支持此功能,使用FormatterLineAggregator
.
使用相同的CustomerCredit
domain 对象,则可以配置为
在 XML 中遵循:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
使用相同的CustomerCredit
domain 对象,则可以配置为
在 Java 中遵循:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
前面的大多数示例看起来应该很熟悉。但是,格式 property 是 new。
以下示例显示了 XML 中的 format 属性:
<property name="format" value="%-9s%-2.0f" />
以下示例显示了 Java 中的 format 属性:
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...
底层实现是使用相同的Formatter
作为 Java 5 的一部分添加。Java 的Formatter
基于printf
C 编程的功能
语言。有关如何配置格式化程序的最详细信息,请参见
Formatter 的 Javadoc。
也可以使用FlatFileItemWriterBuilder.FormattedBuilder
自
自动创建BeanWrapperFieldExtractor
和FormatterLineAggregator
如以下示例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.formatted()
.format("%-9s%-2.0f")
.names(new String[] {"name", "credit"})
.build();
}
处理文件创建
FlatFileItemReader
与文件资源的关系非常简单。当读者
初始化后,它会打开文件(如果存在),如果不存在,则引发异常。
文件写入并不是那么简单。乍一看,它似乎很相似
应该存在 Straightforward ContractFlatFileItemWriter
:如果文件已
exists,抛出一个异常,如果没有,则创建它并开始编写。然而
可能会重新启动Job
可能会导致问题。在正常的重启场景中,
contract is reversed:如果文件存在,则从最后一个已知 good 开始写入该文件
position 的 Position,如果没有,则引发异常。但是,如果文件名
因为这个工作总是一样的吗?在这种情况下,如果文件
存在,除非是重新启动。由于这种可能性,FlatFileItemWriter
包含属性shouldDeleteIfExists
.将此属性设置为 true 会导致
打开 Writer 时要删除的同名现有文件。
XML 项读取器和写入器
Spring Batch 为读取 XML 记录和 将它们映射到 Java 对象以及将 Java 对象写入 XML 记录。
对流式处理 XML 的约束
StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理 处理要求(DOM 一次将整个输入加载到内存中,并且 SAX 控制 解析过程,允许用户仅提供回调)。 |
我们需要考虑 XML 输入和输出在 Spring Batch 中是如何工作的。首先,有一个
一些与文件读取和写入不同但在 Spring Batch 中通用的概念
XML 处理。使用 XML 处理时,不是记录行 (FieldSet
实例)需要
要进行标记化,则假定 XML 资源是“片段”的集合
对应于单个记录,如下图所示:

在上述场景中,'trade' 标签被定义为 'root element'。万事 在 '<trade>' 和 '</trade>' 之间被视为一个 “片段”。Spring Batch 使用 Object/XML Mapping (OXM) 将片段绑定到对象。但是,Spring Batch 不是 与任何特定的 XML 绑定技术相关联。典型的用途是委托给 Spring OXM,它 为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 是可选的,您可以选择实现特定于 Spring Batch 的接口 如果需要。与 OXM 支持的技术的关系显示在 下图:

通过介绍 OXM 以及如何使用 XML 片段来表示记录,我们 现在可以更仔细地检查读者和作者。
StaxEventItemReader
这StaxEventItemReader
配置为处理
记录。首先,考虑以下一组 XML 记录,这些记录
这StaxEventItemReader
可以处理:
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
为了能够处理 XML 记录,需要满足以下条件:
-
根元素名称:构成 对象。示例配置通过 trade 的值演示了这一点。
-
Resource:表示要读取的文件的 Spring Resource。
-
Unmarshaller
:Spring OXM 提供的解组工具,用于映射 XML fragment 复制到对象。
以下示例演示如何定义StaxEventItemReader
,该
元素命名为trade
,则data/iosample/input/input.xml
和 unmarshaller
叫tradeMarshaller
在 XML 中:
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
以下示例演示如何定义StaxEventItemReader
,该
元素命名为trade
,则data/iosample/input/input.xml
和 unmarshaller
叫tradeMarshaller
在 Java 中:
@Bean
public StaxEventItemReader itemReader() {
return new StaxEventItemReaderBuilder<Trade>()
.name("itemReader")
.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(tradeMarshaller())
.build();
}
请注意,在此示例中,我们选择使用XStreamMarshaller
,它接受
作为 Map 传入的别名,第一个键和值是 fragment 的名称
(即根元素)和要绑定的对象类型。然后,类似于FieldSet
这
映射到 Object Type 中字段的其他元素的名称描述为
map 中的键/值对。在配置文件中,我们可以使用 Spring 配置
实用程序来描述所需的别名。
以下示例演示如何在 XML 中描述别名:
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
以下示例演示如何在 Java 中描述别名:
@Bean
public XStreamMarshaller tradeMarshaller() {
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
return marshaller;
}
在输入时,读取器会读取 XML 资源,直到它识别出一个新的片段是
即将开始。默认情况下,读取器会匹配元素名称,以识别新的
fragment 即将启动。Reader 从
片段并将文档传递给反序列化器(通常是 Spring 周围的包装器
OXM 公司Unmarshaller
) 将 XML 映射到 Java 对象。
总之,此过程类似于以下 Java 代码,它使用 injection 提供的 Spring 配置:
StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true;
Trade trade = null;
while (hasNext) {
trade = xmlStaxEventItemReader.read();
if (trade == null) {
hasNext = false;
}
else {
System.out.println(trade);
}
}
StaxEventItemWriter
Output 与 input 对称工作。这StaxEventItemWriter
需要一个Resource
一个
marshaller 和rootTagName
.将 Java 对象传递给编组处理程序(通常为
标准 Spring OXM Marshaller),它写入Resource
通过使用自定义事件
编写器,该 Writer 会过滤StartDocument
和EndDocument
为每个
fragment 的 fragment 中。
下面的 XML 示例使用MarshallingEventWriterSerializer
:
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="rootTagName" value="trade" />
<property name="overwriteOutput" value="true" />
</bean>
以下 Java 示例使用MarshallingEventWriterSerializer
:
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
return new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(tradeMarshaller())
.resource(outputResource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
}
上述配置设置了三个必需的属性,并设置了可选的overwriteOutput=true
attrbute,用于指定
可以覆盖现有文件。
下面的 XML 示例使用与读取示例中使用的相同的封送处理程序 在本章前面显示:
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
下面的 Java 示例使用与读取示例中使用的相同的编组处理程序 在本章前面显示:
@Bean
public XStreamMarshaller customerCreditMarshaller() {
XStreamMarshaller marshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
marshaller.setAliases(aliases);
return marshaller;
}
为了用 Java 示例进行总结,下面的代码说明了所有要点 讨论,演示了所需属性的编程设置:
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
StaxEventItemWriter staxItemWriter =
new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(marshaller)
.resource(resource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
staxItemWriter.afterPropertiesSet();
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);
JSON 项读取器和写入器
Spring Batch 支持读取和写入以下格式的 JSON 资源:
[
{
"isin": "123",
"quantity": 1,
"price": 1.2,
"customer": "foo"
},
{
"isin": "456",
"quantity": 2,
"price": 1.4,
"customer": "bar"
}
]
假设 JSON 资源是对应于 单个项目。Spring Batch 不绑定到任何特定的 JSON 库。
JsonItemReader
这JsonItemReader
将 JSON 解析和绑定委托给org.springframework.batch.item.json.JsonObjectReader
接口。此接口
旨在通过使用流式处理 API 读取 JSON 对象来实现
成块。目前提供了两种实现:
为了能够处理 JSON 记录,需要满足以下条件:
-
Resource
:一个 Spring Resource,表示要读取的 JSON 文件。 -
JsonObjectReader
:一个 JSON 对象读取器,用于解析 JSON 对象并将其绑定到项目
以下示例演示如何定义JsonItemReader
它与
previous JSON 资源org/springframework/batch/item/json/trades.json
以及JsonObjectReader
基于 Jackson:
@Bean
public JsonItemReader<Trade> jsonItemReader() {
return new JsonItemReaderBuilder<Trade>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonItemReader")
.build();
}
JsonFileItemWriter
这JsonFileItemWriter
将项的封送委托给org.springframework.batch.item.json.JsonObjectMarshaller
接口。合同
的 JSON 是获取一个对象并将其编组为 JSONString
.
目前提供了两种实现:
为了能够写入 JSON 记录,需要满足以下条件:
-
Resource
: 弹簧Resource
,表示要写入的 JSON 文件 -
JsonObjectMarshaller
:将对象编组器编组为 JSON 格式
以下示例演示如何定义JsonFileItemWriter
:
@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
return new JsonFileItemWriterBuilder<Trade>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonFileItemWriter")
.build();
}
多文件输入
在单个文件中处理多个文件是一个常见的要求Step
.假设
文件都具有相同的格式,MultiResourceItemReader
支持这种类型的
input 进行 XML 和平面文件处理。请考虑目录中的以下文件:
file-1.txt file-2.txt ignored.txt
file-1.txt 和 file-2.txt 的格式相同,出于业务原因,应为
一起处理。这MultiResourceItemReader
可用于读取两个文件
使用通配符。
以下示例演示如何读取 XML 中带有通配符的文件:
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
以下示例演示如何在 Java 中读取带有通配符的文件:
@Bean
public MultiResourceItemReader multiResourceReader() {
return new MultiResourceItemReaderBuilder<Foo>()
.delegate(flatFileItemReader())
.resources(resources())
.build();
}
引用的委托是一个简单的FlatFileItemReader
.上面的配置是
来自两个文件的 input,处理回滚和重启场景。应该注意的是,
与任何ItemReader
,添加额外的输入(在本例中为文件)可能会导致
重新启动时出现问题。建议批处理作业与自己的个人作业一起使用
目录,直到成功完成。
输入资源使用MultiResourceItemReader#setComparator(Comparator) 确保在重启方案中的作业运行之间保留资源顺序。 |
数据库
与大多数企业应用程序样式一样,数据库是 批。但是,由于 系统必须使用的数据集。如果 SQL 语句返回 100 万行,则 结果集可能会将所有返回的结果保存在内存中,直到读取完所有行。 Spring Batch 为此问题提供了两种类型的解决方案:
基于游标ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法。
因为它是数据库对 “流” 关系数据问题的解决方案。这
JavaResultSet
class 本质上是一种面向对象的机制,用于作
光标。一个ResultSet
将光标保留到当前数据行。叫next
在ResultSet
将此光标移动到下一行。Spring Batch 基于游标ItemReader
实现在初始化时打开一个游标,并将游标向前移动一行
每次调用read
,返回可用于处理的映射对象。这close
method 以确保释放所有资源。Spring 核心JdbcTemplate
通过使用回调模式将
所有行ResultSet
和 close ,然后再将控制权返回给方法调用方。
但是,在 Batch 中,这必须等到步骤完成。下图显示了
基于游标的ItemReader
工程。请注意,虽然示例
使用 SQL(因为 SQL 是如此广为人知),任何技术都可以实现基本的
方法。

此示例说明了基本模式。给定一个 'FOO' 表,它有三列:ID
,NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。这
将光标的开头(第 1 行)放在 ID 2 上。此行的结果应为
完全映射Foo
对象。叫read()
再次将光标移动到下一行,
即Foo
ID 为 3。这些读取的结果将在每个read
,允许对对象进行垃圾回收(假设没有实例变量
维护对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它有效
直接使用ResultSet
并需要针对连接运行 SQL 语句
从DataSource
.以下数据库架构用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人喜欢对每一行使用域对象,因此以下示例使用
实现RowMapper
接口将CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为JdbcCursorItemReader
共享关键接口JdbcTemplate
,它很有用
请参阅有关如何读取此数据的示例JdbcTemplate
,以便对其进行对比
使用ItemReader
.对于此示例,假设 中有 1,000 行
这CUSTOMER
数据库。第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段后,customerCredits
列表包含 1,000 个CustomerCredit
对象。在查询方法中,从DataSource
,则提供的 SQL 将针对它运行,并且mapRow
method 的调用请求
每个ResultSet
.将此与JdbcCursorItemReader
,如以下示例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段后,计数器等于 1,000。如果上面的代码有
将返回的customerCredit
导入到列表中,结果将恰好是
与JdbcTemplate
例。然而,这个ItemReader
是它允许 “streamed” 项目。这read
method 可以调用一次,则 Item
可以通过ItemWriter
,然后可以通过read
.这允许以 'chunks' 的形式完成项目读取和写入并提交
定期,这是高性能批处理的本质。此外,它
易于配置为注入 Spring BatchStep
.
以下示例演示如何将ItemReader
转换为Step
在 XML 中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例演示如何将ItemReader
转换为Step
在 Java 中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
其他属性
因为在 Java 中打开游标有很多不同的选项,所以有很多
属性JdbcCursorItemReader
,如下所述
桌子:
忽略警告 |
确定是记录 SQLWarnings 还是导致异常。
默认值为 |
fetchSize (获取大小) |
为 JDBC 驱动程序提供有关应获取的行数的提示
当 |
最大行数 |
设置基础的最大行数限制 |
查询超时 |
设置驱动程序等待 |
verifyCursorPosition |
因为 |
saveState |
指示是否应将读取器的状态保存在 |
driverSupportsAbsolute |
指示 JDBC 驱动程序是否支持
在 |
setUseSharedExtendedConnection |
指示连接
used for cursor 应该被所有其他处理使用,从而共享相同的
交易。如果此项设置为 |
HibernateCursorItemReader
就像普通的 Spring 用户会做出是否使用 ORM 的重要决定一样
解决方案,这会影响它们是否使用JdbcTemplate
或HibernateTemplate
,Spring Batch 用户具有相同的选项。HibernateCursorItemReader
是游标技术的 Hibernate 实现。
Hibernate 的批量使用一直存在相当大的争议。这主要是因为
Hibernate 最初是为了支持在线应用程序样式而开发的。然而,那
并不意味着它不能用于批处理。最简单的解决方法
这个问题是使用StatelessSession
而不是标准会话。这将删除
Hibernate 采用的所有缓存和脏检查,这可能会导致
Batch 方案。有关 stateless 和 normal 之间区别的更多信息
Hibernate 会话,请参阅特定 Hibernate 版本的文档。这HibernateCursorItemReader
允许您声明一个 HQL 语句并传入一个SessionFactory
,它将每次调用传回一个项目以读取相同的基本
时尚作为JdbcCursorItemReader
.以下示例配置使用相同的
'customer credit' 示例作为 JDBC 读取器:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
此配置ItemReader
返回CustomerCredit
对象
如JdbcCursorItemReader
,假设 Hibernate 映射文件已经
为Customer
桌子。'useStatelessSession' 属性默认
设置为 true,但在此处添加是为了引起人们对打开或关闭它的能力的注意。
还值得注意的是,底层游标的 fetch size 可以使用setFetchSize
财产。与 一样JdbcCursorItemReader
,配置为
简单。
下面的示例展示了如何注入一个 HibernateItemReader
在 XML 中:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
下面的示例展示了如何注入一个 HibernateItemReader
在 Java 中:
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
StoredProcedureItemReader
有时需要使用存储过程来获取游标数据。这StoredProcedureItemReader
其工作原理类似于JdbcCursorItemReader
,只不过,改为
运行查询以获取游标时,它会运行返回游标的存储过程。
存储过程可以通过三种不同的方式返回游标:
-
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为作为 out 参数返回的 ref 游标(由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
以下 XML 示例配置使用与前面相同的 'customer credit' 示例 例子:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下 Java 示例配置使用与 前面的例子:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
前面的示例依赖于存储过程来提供ResultSet
作为
返回结果(前面的选项 1)。
如果存储过程返回ref-cursor
(选项 2),那么我们需要提供
返回的 out 参数的位置ref-cursor
.
以下示例显示了如何使用第一个参数作为 XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例显示了如何使用第一个参数作为 Java:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
如果游标是从存储函数返回的(选项 3),我们需要将
property “function” 设置为true
.它默认为false
.
以下示例显示了 property totrue
在 XML 中:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例显示了 property totrue
在 Java 中:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
在所有这些情况下,我们都需要定义一个RowMapper
以及DataSource
和
实际过程名称。
如果存储过程或函数接受参数,则必须声明它们并
使用parameters
财产。以下示例(对于 Oracle)声明了三个
参数。第一个是out
参数返回 ref-cursor,而
第二个和第三个是采用 type 为INTEGER
.
以下示例演示如何在 XML 中使用参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
以下示例演示如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
除了参数声明之外,我们还需要指定一个PreparedStatementSetter
设置调用的参数值的实现。这与
这JdbcCursorItemReader
以上。Additional Properties 中列出的所有其他属性都适用于StoredProcedureItemReader
也。
寻呼ItemReader
实现
使用数据库游标的另一种方法是运行多个查询,其中每个查询 获取结果的一部分。我们将此部分称为页面。每个查询必须 指定 起始行号 和我们希望在页面中返回的行数。
JdbcPagingItemReader
分页的一个实现ItemReader
是JdbcPagingItemReader
.这JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供 SQL
用于检索构成页面的行的查询。由于每个数据库都有自己的
策略来提供分页支持,我们需要使用不同的PagingQueryProvider
对于每个支持的数据库类型。还有SqlPagingQueryProviderFactoryBean
,它会自动检测正在使用的数据库并确定适当的PagingQueryProvider
实现。这简化了配置,并且是
推荐的最佳实践。
这SqlPagingQueryProviderFactoryBean
要求您指定select
子句和from
第。您还可以提供可选的where
第。这些条款和
必填sortKey
用于构建 SQL 语句。
在sortKey 以保证
执行之间不会丢失任何数据。 |
打开读取器后,它会将每次调用传回一个项目read
在同一
与其他时尚一样的基本时尚ItemReader
.分页发生在幕后,当
需要额外的行。
以下 XML 示例配置使用与
基于游标ItemReaders
如前所示:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
以下 Java 示例配置使用与
基于游标ItemReaders
如前所示:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
此配置ItemReader
返回CustomerCredit
对象使用RowMapper
,
必须指定。'pageSize' 属性确定读取的实体数量
从数据库进行每个查询运行。
'parameterValues' 属性可用于指定Map
of 参数值的
查询。如果您在where
子句,则每个条目的键应
匹配命名参数的名称。如果您使用传统的 '?' 占位符,则
key 应该是占位符的编号,从 1 开始。
JpaPagingItemReader
分页的另一种实现ItemReader
是JpaPagingItemReader
.JPA 执行
没有类似于 Hibernate 的概念StatelessSession
,所以我们不得不使用其他
功能。由于 JPA 支持分页,因此这是很自然的
在使用 JPA 进行批处理时的选择。读取每个页面后,
实体将分离,并且持久化上下文将被清除,以允许实体
在页面处理完毕后进行垃圾回收。
这JpaPagingItemReader
允许您声明一个 JPQL 语句并传入一个EntityManagerFactory
.然后,它每次调用传回一个项目以读取相同的基本
时尚无双ItemReader
.分页发生在幕后,当额外的
entities 是必需的。
以下 XML 示例配置使用与 前面显示的 JDBC 读取器:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
以下 Java 示例配置使用与 前面显示的 JDBC 读取器:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
此配置ItemReader
返回CustomerCredit
对象,其方式与
描述JdbcPagingItemReader
上,假设CustomerCredit
object 具有
正确的 JPA 注释或 ORM 映射文件。'pageSize' 属性确定
每次查询执行从数据库中读取的实体数。
数据库 ItemWriters
虽然平面文件和 XML 文件都有特定的ItemWriter
实例,则没有确切的等效项
在数据库世界中。这是因为交易提供了所有需要的功能。ItemWriter
实现对于 Files 来说是必需的,因为它们必须像事务一样运行,
跟踪写入的项目并在适当的时候进行冲洗或清理。
数据库不需要此功能,因为写入已经包含在
交易。用户可以创建自己的 DAO 来实现ItemWriter
interface 或
使用自定义中的一个ItemWriter
这是为通用处理问题编写的。也
方式,它们应该可以正常工作。需要注意的一件事是性能
以及通过批处理输出提供的错误处理功能。这是最
常见,当使用 Hibernate 作为ItemWriter
但在使用
JDBC 批处理模式。批处理数据库输出没有任何固有的缺陷,假设我们
请小心刷新,并且数据中没有错误。但是,任何
写入可能会引起混淆,因为无法知道是哪个单独的项目引起的
异常,或者即使任何单个项目负责,如
下图:

如果在写入之前缓冲了项,则在缓冲区
flushed 的 SET 文件。例如,假设每个块写入 20 个项目,
第 15 项会引发DataIntegrityViolationException
.就Step
就 20 项全部写入成功,因为无法知道
错误会发生,直到它们被实际写入为止。一次Session#flush()
称为
buffer 被清空并命中异常。此时,没有任何Step
可以做到。必须回滚事务。通常,此异常可能会导致
要跳过的项目(取决于跳过/重试策略),然后不写入
再。但是,在批处理场景中,无法知道是哪个项目导致了
问题。发生故障时,正在写入整个缓冲区。唯一的方法
解决此问题是在每个项后刷新,如下图所示:

这是一个常见的用例,尤其是在使用 Hibernate 时,以及
的实现ItemWriter
是在每次调用write()
.这样做允许
为了可靠地跳过项目,Spring Batch 在内部负责
对ItemWriter
在错误之后。
重用现有服务
批处理系统通常与其他应用程序样式结合使用。最
common 是一个在线系统,但它也可能支持集成甚至胖客户端
应用程序,方法是移动每个应用程序样式使用的必要批量数据。对于这个
原因,许多用户希望重用现有的 DAO 或其他服务
他们的批处理作业。Spring 容器本身通过允许任何
necessary 类。但是,在某些情况下,现有服务
需要充当ItemReader
或ItemWriter
,要么满足
另一个 Spring Batch 类,或者因为它确实是 mainItemReader
对于一个步骤。是的
为每个需要包装的服务编写一个 Adapter 类相当简单,但是
因为它是一个常见的问题,所以 Spring Batch 提供了实现:ItemReaderAdapter
和ItemWriterAdapter
.这两个类都实现了标准的 Spring
方法,并且设置起来相当简单。
下面的 XML 示例使用ItemReaderAdapter
:
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
以下 Java 示例使用ItemReaderAdapter
:
@Bean
public ItemReaderAdapter itemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(fooService());
reader.setTargetMethod("generateFoo");
return reader;
}
@Bean
public FooService fooService() {
return new FooService();
}
需要注意的一点是,targetMethod
必须相同
作为read
:筋疲力尽时返回null
.否则,它将返回一个Object
.其他任何阻止框架知道处理何时结束、
导致无限循环或错误失败,具体取决于实现
的ItemWriter
.
下面的 XML 示例使用ItemWriterAdapter
:
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
以下 Java 示例使用ItemWriterAdapter
:
@Bean
public ItemWriterAdapter itemWriter() {
ItemWriterAdapter writer = new ItemWriterAdapter();
writer.setTargetObject(fooService());
writer.setTargetMethod("processFoo");
return writer;
}
@Bean
public FooService fooService() {
return new FooService();
}
防止状态持久性
默认情况下,所有ItemReader
和ItemWriter
implementations 会存储其当前的
state 中的ExecutionContext
在提交之前。但是,这可能并不总是如此
所需的行为。例如,许多开发人员选择将他们的数据库读取器
'rerunnable' 使用进程指示器。将额外的列添加到输入数据中,以
指示是否已处理。当正在读取特定记录时(或
written) 的 processed 标志从false
自true
.然后,SQL 语句可以
在where
子句,例如where PROCESSED_IND = false
,
从而确保在重新启动时仅返回未处理的记录。在
在这种情况下,最好不要存储任何状态,例如当前行号,
因为它在重新启动时无关紧要。因此,所有读取器和写入器都包含
'saveState' 属性。
下面的 Bean 定义显示了如何防止 XML 中的状态持久性:
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
以下 bean 定义显示了如何防止 Java 中的状态持久性:
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PlayerSummary>()
.dataSource(dataSource)
.rowMapper(new PlayerSummaryMapper())
.saveState(false)
.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
+ "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
+ "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
+ "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
+ "from games, players where players.player_id ="
+ "games.player_id group by games.player_id, games.year_no")
.build();
}
这ItemReader
configured 不会在ExecutionContext
为
它参与的任何执行。
创建自定义 ItemReader 和 ItemWriters
到目前为止,本章已经讨论了 Spring 中读写的基本契约
Batch 和一些常见的实现来实现。然而,这些都是公平的
generic,并且有许多可能的情况不是开箱即用的
实现。本节使用一个简单的示例来说明如何创建自定义ItemReader
和ItemWriter
implementation 并正确实现他们的 Contract。这ItemReader
还实现了ItemStream
,为了说明如何使 Reader 或
writer 可重启。
习惯ItemReader
例
在此示例中,我们创建了一个简单的ItemReader
实现
从提供的列表中读取。我们从实现最基本的 Contract 开始ItemReader
这read
方法,如以下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类采用一个项目列表,并一次返回一个项目,并删除每个项目
从列表中。当列表为空时,它会返回null
,从而满足最基本的
的要求ItemReader
,如以下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使ItemReader
可重启
最后的挑战是使ItemReader
可重启。目前,如果处理是
interrupted 并重新开始,则ItemReader
必须从头开始。这是
实际上在许多情况下都有效,但有时最好使用批处理作业
从上次中断处重新开始。关键的判别因素通常是 Reader 是否是有状态的
或无状态。无状态读取器不需要担心可重启性,但
Stateful 必须尝试在 restart 时重建其最后一个已知状态。因此,
我们建议您尽可能保持自定义读取器无状态,因此您不必担心
关于可重启性。
如果您确实需要存储 state,则ItemStream
interface 应该使用:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用ItemStream
update
method,则ItemReader
存储在提供的ExecutionContext
键为 'current.index'。当ItemStream
open
方法时,调用ExecutionContext
检查以查看它是否
包含具有该键的条目。如果找到该键,则当前索引将移动到
那个位置。这是一个相当微不足道的示例,但它仍然符合通用合同:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
最ItemReaders
具有更复杂的重启逻辑。这JdbcCursorItemReader
,例如,将最后处理的行的行 ID 存储在
光标。
还值得注意的是,在ExecutionContext
不应为
琐碎。那是因为相同的ExecutionContext
用于所有ItemStreams
在
一个Step
.在大多数情况下,只需在键前加上类名就足够了
以保证唯一性。但是,在极少数情况下,两个相同类型的ItemStream
用于同一步骤(如果需要两个文件
output),则需要更独特的名称。出于这个原因,许多 Spring BatchItemReader
和ItemWriter
实现具有setName()
属性,允许此
key name 被覆盖。
习惯ItemWriter
例
实现自定义ItemWriter
在许多方面与ItemReader
例
但差异足以证明它自己的例子。但是,添加
可重启性本质上是相同的,因此本例不涉及。与ItemReader
example,一个List
是为了使示例尽可能简单
可能:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使ItemWriter
可重启
要使ItemWriter
重新启动的,我们将遵循与ItemReader
,添加并实现ItemStream
接口同步
执行上下文。在此示例中,我们可能必须计算处理的项目数
并将其添加为页脚记录。如果需要这样做,我们可以实施ItemStream
在我们的ItemWriter
以便 counter 从执行中重建
context 的 URL 中。
在许多实际情况下,自定义ItemWriters
也委托给另一个编写器,该编写器本身
是可重新启动的(例如,在写入文件时),否则它会写入
事务资源,因此不需要重新启动,因为它是无状态的。
当你有一个有状态的 writer 时,你可能应该确保实现ItemStream
如
以及ItemWriter
.还要记住,作者的客户端需要注意
这ItemStream
,因此您可能需要在配置中将其注册为流。
Item Reader 和 Writer 实现
在本节中,我们将向您介绍尚未阅读的读者和作家 在前面的部分中讨论过。
装饰
在某些情况下,用户需要将专门的行为附加到预先存在的ItemReader
.Spring Batch 提供了一些开箱即用的装饰器,这些装饰器可以添加
additional 行为添加到您的ItemReader
和ItemWriter
实现。
Spring Batch 包括以下装饰器:
SynchronizedItemStreamReader
当使用ItemReader
不是线程安全的,Spring Batch 提供了SynchronizedItemStreamReader
decorator 的 intent 函数,该 API 可用于将ItemReader
线程安全。Spring Batch 提供了一个SynchronizedItemStreamReaderBuilder
构造
的SynchronizedItemStreamReader
.
例如,FlatFileItemReader
不是线程安全的,不能在
多线程步骤。此读取器可以用SynchronizedItemStreamReader
以便在多线程步骤中安全地使用它。下面是一个如何装饰的示例
这样的读者:
@Bean
public SynchronizedItemStreamReader<Person> itemReader() {
FlatFileItemReader<Person> flatFileItemReader = new FlatFileItemReaderBuilder<Person>()
// set reader properties
.build();
return new SynchronizedItemStreamReaderBuilder<Person>()
.delegate(flatFileItemReader)
.build();
}
SingleItemPeekableItemReader
Spring Batch 包含一个装饰器,该装饰器将 peek 方法添加到ItemReader
.这个 peek
方法允许用户提前速览一个项目。对速览的重复调用将返回相同的结果
item 返回的 Item 中,这是从read
方法。Spring Batch 提供了一个SingleItemPeekableItemReaderBuilder
要构造SingleItemPeekableItemReader
.
SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为它不会 可以在多个线程中遵循 PEEK。只有一个线程偷看 将在下一次调用 READ 时获取该项目。 |
SynchronizedItemStreamWriter
当使用ItemWriter
不是线程安全的,Spring Batch 提供了SynchronizedItemStreamWriter
decorator 的 intent 函数,该 API 可用于将ItemWriter
线程安全。Spring Batch 提供了一个SynchronizedItemStreamWriterBuilder
构造
的SynchronizedItemStreamWriter
.
例如,FlatFileItemWriter
不是线程安全的,不能在
多线程步骤。这个编写器可以用SynchronizedItemStreamWriter
以便在多线程步骤中安全地使用它。下面是一个如何装饰的示例
这样的作家:
@Bean
public SynchronizedItemStreamWriter<Person> itemWriter() {
FlatFileItemWriter<Person> flatFileItemWriter = new FlatFileItemWriterBuilder<Person>()
// set writer properties
.build();
return new SynchronizedItemStreamWriterBuilder<Person>()
.delegate(flatFileItemWriter)
.build();
}
MultiResourceItemWriter
这MultiResourceItemWriter
将ResourceAwareItemWriterItemStream
并创建一个新的
output 资源(当当前资源中写入的项数超过itemCountLimitPerResource
.Spring Batch 提供了一个MultiResourceItemWriterBuilder
自
构造MultiResourceItemWriter
.
消息传递 Readers 和 Writers
Spring Batch 为常用的消息传递系统提供了以下读取器和写入器:
AmqpItemReader
这AmqpItemReader
是一个ItemReader
它使用AmqpTemplate
接收或转换
来自 Exchange 的消息。Spring Batch 提供了一个AmqpItemReaderBuilder
构造
的AmqpItemReader
.
AmqpItemWriter
这AmqpItemWriter
是一个ItemWriter
它使用AmqpTemplate
将消息发送到
一个 AMQP 交易所。如果
提供的AmqpTemplate
.Spring Batch 提供了一个AmqpItemWriterBuilder
自
构造AmqpItemWriter
.
JmsItemReader
这JmsItemReader
是一个ItemReader
对于使用JmsTemplate
.模板
应具有默认目标,该目标用于为read()
方法。Spring Batch 提供了一个JmsItemReaderBuilder
要构造JmsItemReader
.
JmsItemWriter
这JmsItemWriter
是一个ItemWriter
对于使用JmsTemplate
.模板
应具有默认目标,用于将项目发送到write(List)
.Spring
Batch 提供了一个JmsItemWriterBuilder
要构造JmsItemWriter
.
数据库读取器
Spring Batch 提供以下数据库读取器:
Neo4jItemReader
这Neo4jItemReader
是一个ItemReader
从图形数据库 Neo4j 中读取对象
通过使用分页技术。Spring Batch 提供了一个Neo4jItemReaderBuilder
自
构造Neo4jItemReader
.
MongoItemReader
这MongoItemReader
是一个ItemReader
使用
paging 技术。Spring Batch 提供了一个MongoItemReaderBuilder
构造
实例的MongoItemReader
.
HibernateCursorItemReader
这HibernateCursorItemReader
是一个ItemStreamReader
用于读取数据库记录
构建在 Hibernate 之上。它执行 HQL 查询,然后在初始化时迭代
作为read()
方法,依次返回一个对象
对应于当前行。Spring Batch 提供了一个HibernateCursorItemReaderBuilder
要构造HibernateCursorItemReader
.
数据库写入器
Spring Batch 提供以下数据库编写器:
Neo4jItemWriter
这Neo4jItemWriter
是一个ItemWriter
写入 Neo4j 数据库的实现。
Spring Batch 提供了一个Neo4jItemWriterBuilder
要构造Neo4jItemWriter
.
MongoItemWriter
这MongoItemWriter
是一个ItemWriter
写入 MongoDB 存储的实现
使用 Spring Data 的MongoOperations
.Spring Batch 提供了一个MongoItemWriterBuilder
要构造MongoItemWriter
.
RepositoryItemWriter
这RepositoryItemWriter
是一个ItemWriter
wrapper 的CrudRepository
从 Spring 开始
数据。Spring Batch 提供了一个RepositoryItemWriterBuilder
要构造
这RepositoryItemWriter
.
HibernateItemWriter
这HibernateItemWriter
是一个ItemWriter
,它使用 Hibernate 会话来保存或
更新不属于当前 Hibernate 会话的实体。Spring Batch 提供
一个HibernateItemWriterBuilder
要构造HibernateItemWriter
.
专业读者
Spring Batch 提供了以下专用阅读器:
LdifReader
这LdifReader
从Resource
,
解析它们,并返回一个LdapAttribute
object for eachread
执行。Spring Batch
提供LdifReaderBuilder
要构造LdifReader
.
专业作家
Spring Batch 提供了以下专门的编写器: