常见模式

常见批处理模式

一些 Batch Job 可以完全从 Spring Batch 中的现成组件组装。 例如,ItemReaderItemWriterimplementation 可以配置为 涵盖广泛的场景。但是,在大多数情况下,自定义代码必须为 写。应用程序开发人员的主要 API 入口点是TaskletItemReaderItemWriter和各种侦听器接口。最简单的批处理 作业可以使用来自 Spring Batch 的现成 ImportingItemReader,但它通常是 在处理和写入中存在需要开发人员的自定义关注点 要实现ItemWriterItemProcessor.spring-doc.cadn.net.cn

在本章中,我们提供了自定义业务逻辑中常见模式的一些示例。 这些示例主要包含侦听器接口。应该注意的是,ItemReaderItemWriter如果合适,也可以实现侦听器接口。spring-doc.cadn.net.cn

记录项处理和失败

一个常见的用例是需要对步骤中的错误进行特殊处理,逐项, 也许是登录到特殊通道或将记录插入数据库。一个 面向块Step(从 Step Factory Bean 创建)允许用户实现此用途 case 替换为简单的ItemReadListener对于 错误read以及一个ItemWriteListener为 错误write.以下代码片段说明了一个侦听器,该侦听器将两个 read 和写入失败:spring-doc.cadn.net.cn

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

实施此侦听器后,必须将其注册到一个步骤中。spring-doc.cadn.net.cn

以下示例演示如何在 XML 中注册一个步骤的侦听器:spring-doc.cadn.net.cn

XML 配置
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>

以下示例显示了如何向步骤 Java 注册侦听器:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("simpleStep")
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}
如果你的监听器在onError()方法,它必须在 将要回滚的事务。如果您需要使用事务性 资源(例如数据库)位于onError()方法,请考虑添加一个声明式 transaction 传递给该方法(有关详细信息,请参见 Spring Core 参考指南),并为其 propagation 属性的值为REQUIRES_NEW.

出于业务原因手动停止作业

Spring Batch 提供了一个stop()方法通过JobOperator接口,但这是 真正供作员使用,而不是供应用程序程序员使用。有时,它是 从企业内部停止任务执行更方便或更有意义 逻辑。spring-doc.cadn.net.cn

最简单的方法是抛出一个RuntimeException(两者都未重试 无限期地跳过)。例如,可以使用自定义异常类型,如下所示 在以下示例中:spring-doc.cadn.net.cn

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

阻止步骤执行的另一种简单方法是返回nullItemReader,如以下示例所示:spring-doc.cadn.net.cn

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前面的示例实际上依赖于存在默认实现的事实 的CompletionPolicy策略,当项目要 processed 是null.可以实施更复杂的完成策略,并且 注入到Step通过SimpleStepFactoryBean.spring-doc.cadn.net.cn

以下示例显示了如何将完成策略注入到 XML 中的步骤中:spring-doc.cadn.net.cn

XML 配置
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

以下示例显示了如何将完成策略注入 Java 中的步骤:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("simpleStep")
				.<String, String>chunk(new SpecialCompletionPolicy())
				.reader(reader())
				.writer(writer())
				.build();
}

另一种方法是在StepExecution,它由Step在框架中实现 Item Processing。实现这个 或者,我们需要访问当前的StepExecution,这可以通过 实现StepListener并将其注册到Step.以下示例 显示设置标志的侦听器:spring-doc.cadn.net.cn

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

设置标志后,默认行为是步骤抛出JobInterruptedException.此行为可以通过StepInterruptionPolicy.但是,唯一的选择是抛出或不抛出异常 所以这总是一个不正常的工作结局。spring-doc.cadn.net.cn

添加页脚记录

通常,在写入平面文件时,必须在 文件,在所有处理完成后。这可以使用FlatFileFooterCallback由 Spring Batch 提供的接口。这FlatFileFooterCallback(及其对应的FlatFileHeaderCallback) 是FlatFileItemWriter,并且可以添加到项写入器中。spring-doc.cadn.net.cn

以下示例演示如何使用FlatFileHeaderCallbackFlatFileFooterCallback在 XML 中:spring-doc.cadn.net.cn

XML 配置
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

以下示例演示如何使用FlatFileHeaderCallbackFlatFileFooterCallback在 Java 中:spring-doc.cadn.net.cn

Java 配置
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

页脚回调接口只有一个方法,当页脚必须为 写入,如以下接口定义所示:spring-doc.cadn.net.cn

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

编写摘要页脚

涉及页脚记录的一个常见要求是在 output 进程,并将此信息附加到文件末尾。此页脚通常 用作文件的摘要或提供校验和。spring-doc.cadn.net.cn

例如,如果批处理作业正在写入Traderecords 添加到平面文件中,并且有一个 要求所有Trades放置在页脚中,则 以后ItemWriterimplementation 可以使用:spring-doc.cadn.net.cn

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(List<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter存储一个totalAmount值,该值随amount从每个Tradeitem 写入。在最后一个Trade时,框架会调用writeFooter,它将totalAmount放入文件中。请注意,write方法 使用临时变量chunkTotal,该Tradeamounts 的 Quants。这样做是为了确保,如果write方法、totalAmount保持不变。它仅在write方法中,一旦我们保证不会抛出异常,我们就会更新totalAmount.spring-doc.cadn.net.cn

为了让writeFooter方法,则TradeItemWriter(其中 实现FlatFileFooterCallback) 必须连接到FlatFileItemWriter作为footerCallback.spring-doc.cadn.net.cn

以下示例显示了如何连接TradeItemWriter在 XML 中:spring-doc.cadn.net.cn

XML 配置
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

以下示例显示了如何连接TradeItemWriter在 Java 中:spring-doc.cadn.net.cn

Java 配置
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

这种方式TradeItemWriter到目前为止,只有在以下情况下才能正确运行 这Step不可重新启动。这是因为该类是有状态的(因为它存储了totalAmount),但totalAmount不会持久化到数据库中。因此,它 在重新启动时无法检索。为了使此类可重启, 这ItemStreaminterface 应该与方法openupdate,如以下示例所示:spring-doc.cadn.net.cn

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法存储最新版本的totalAmountExecutionContext就在该对象被持久化到数据库之前。open 方法 检索任何现有的totalAmountExecutionContext并将其用作 开始处理,允许TradeItemWriter要在 restart 时拾取,其中 它上次离开了Step运行。spring-doc.cadn.net.cn

驱动基于查询的 ItemReader

关于 reader 和 writer 的章节中,使用 讨论了分页。许多数据库供应商(比如 DB2)都非常悲观 如果正在读取的表也需要由 在线应用程序的其他部分。此外,打开光标 大型数据集可能会导致某些供应商的数据库出现问题。因此,许多 项目更喜欢使用 'Driving Query' 方法来读取数据。这种方法有效 通过迭代 key,而不是需要返回的整个对象,作为 下图说明:spring-doc.cadn.net.cn

驱动查询作业
图 1.驱动查询作业

如您所见,上图中显示的示例使用与原来的 'FOO' 表相同的 在基于游标的示例中使用。但是,不是选择整行,而是只选择 在 SQL 语句中选择了 ID。因此,而不是FOO返回的对象 从readInteger返回。然后,此数字可用于查询 'details',这是一个完整的Foo对象,如下图所示:spring-doc.cadn.net.cn

驱动查询示例
图 2.驱动查询示例

ItemProcessor应用于转换从 driving 查询获取的 Key 转换为完整的Foo对象。现有的 DAO 可用于查询基于完整对象的 在键上。spring-doc.cadn.net.cn

多行记录

虽然平面文件通常每个记录都局限于单个 行,则文件可能具有跨多行且多个 格式。以下文件摘录显示了这种安排的示例:spring-doc.cadn.net.cn

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

以 'HEA' 开头的行和以 'FOT' 开头的行之间的所有内容都是 被视为 1 条记录。为了 正确处理这种情况:spring-doc.cadn.net.cn

  • 不是一次读取一条记录,ItemReader必须阅读 multi-line record 作为一个组,以便可以将其传递给ItemWriter完整。spring-doc.cadn.net.cn

  • 每种行类型可能需要以不同的方式进行标记。spring-doc.cadn.net.cn

因为一条记录跨越多行,而且我们可能不知道有多少行 有,ItemReader必须小心,始终读取整个记录。为了 执行此作,自定义ItemReader应作为FlatFileItemReader.spring-doc.cadn.net.cn

以下示例演示如何实现自定义ItemReader在 XML 中:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

以下示例演示如何实现自定义ItemReader在 Java 中:spring-doc.cadn.net.cn

Java 配置
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

确保每一行都正确地标记化,这对于 fixed-length 输入时,将PatternMatchingCompositeLineTokenizer可用于 委托FlatFileItemReader.看FlatFileItemReader在 Reader 和 作家章节了解更多详情。然后,委托读者使用PassThroughFieldSetMapper要交付FieldSet对于每行返回换行ItemReader.spring-doc.cadn.net.cn

以下示例说明如何确保在 XML 中正确标记每一行:spring-doc.cadn.net.cn

XML 内容
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

以下示例说明如何确保每 X 行在 Java 中正确标记化:spring-doc.cadn.net.cn

Java 内容
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

这个包装器必须能够识别记录的结尾,这样它才能持续地 叫read()在其委托上,直到到达末尾。对于读取的每一行, wrapper 构建要返回的项目。到达页脚后,项目可以 退回以交付给ItemProcessorItemWriter,如 以下示例:spring-doc.cadn.net.cn

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

执行系统命令

许多批处理作业要求从批处理作业中调用外部命令。 这样的过程可以由调度程序单独启动,但 有关运行的常见元数据将丢失。此外,多步骤作业也将 还需要拆分为多个 Job。spring-doc.cadn.net.cn

由于需求非常普遍,Spring Batch 提供了一个Taskletimplementation for 调用 system 命令。spring-doc.cadn.net.cn

以下示例演示如何在 XML 中调用外部命令:spring-doc.cadn.net.cn

XML 配置
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

以下示例演示如何在 Java 中调用外部命令:spring-doc.cadn.net.cn

Java 配置
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

在未找到输入时处理步骤完成

在许多批处理方案中,在数据库或文件中找不到要处理的行不是 特殊。这Step只是认为没有找到任何工作并以 0 完成 items 已读取。所有ItemReaderSpring 中开箱即用的实现 Batch 默认使用此方法。如果未写出任何内容,这可能会导致一些混淆 即使存在输入(如果文件命名错误或类似名称,通常会发生这种情况 问题出现)。因此,应检查元数据本身以确定如何 框架发现很多工作要处理。但是,如果未找到输入 被认为是例外的?在这种情况下,以编程方式检查元数据中是否有项目 处理并导致失败是最好的解决方案。因为这是一个常见的用例, Spring Batch 提供了具有此功能的侦听器,如 的类定义NoWorkFoundStepExecutionListener:spring-doc.cadn.net.cn

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前面的StepExecutionListener检查readCount属性的StepExecution在 'afterStep' 阶段确定是否未读取任何项目。如果那 在这种情况下,则退出代码FAILED返回,表示Step应该失败。 否则null返回,这不会影响Step.spring-doc.cadn.net.cn

将数据传递给 Future Steps

将信息从一个步骤传递到另一个步骤通常很有用。这可以通过以下方式完成 这ExecutionContext.问题是有两个ExecutionContexts:一个位于Step级别,一个位于Job水平。这Step ExecutionContext仅保持为 long 作为 step 执行,而Job ExecutionContext贯穿整个Job.上 另一方面,Step ExecutionContext每次Step提交一个 块,而Job ExecutionContext仅在每个Step.spring-doc.cadn.net.cn

这种分离的结果是,所有数据都必须放在Step ExecutionContext虽然Step正在执行。这样做可确保数据 妥善存放,而Step运行。如果数据存储到Job ExecutionContext, ,则在Step执行。如果Step失败,则该数据将丢失。spring-doc.cadn.net.cn

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(List<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

使数据可供将来使用Steps,它必须被“提升”到Job ExecutionContext在步骤完成后。Spring Batch 提供了ExecutionContextPromotionListener为此目的。必须配置 Listener 的 key 与ExecutionContext这必须得到推广。它可以 此外,还可以选择配置一个退出代码模式列表,该列表的升级 应该发生 (COMPLETED是默认值)。与所有侦听器一样,它必须注册 在Step.spring-doc.cadn.net.cn

以下示例演示如何将步骤提升为Job ExecutionContext在 XML 中:spring-doc.cadn.net.cn

XML 配置
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

以下示例演示如何将步骤提升为Job ExecutionContext在 Java 中:spring-doc.cadn.net.cn

Java 配置
@Bean
public Job job1() {
	return this.jobBuilderFactory.get("job1")
				.start(step1())
				.next(step1())
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

最后,必须从Job ExecutionContext,如图所示 在以下示例中:spring-doc.cadn.net.cn

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(List<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}