事务支持

事务支持

本章介绍了 Spring 集成对事务的支持。 它涵盖以下主题:spring-doc.cadn.net.cn

了解消息流中的事务

Spring 集成公开了几个钩子来解决消息流的事务需求。 为了更好地理解这些钩子以及如何从中受益,我们必须首先重新审视可用于启动消息流的 6 种机制,并了解如何在每种机制中满足这些流的事务需求。spring-doc.cadn.net.cn

以下 6 种机制启动消息流(本手册中提供了每种机制的详细信息):spring-doc.cadn.net.cn

  • Gateway proxy(网关代理):基本的消息传递网关。spring-doc.cadn.net.cn

  • 消息通道:与MessageChannel方法(例如channel.send(message)).spring-doc.cadn.net.cn

  • 消息发布者:在 Spring Bean 上启动消息流作为方法调用的副产品的方法。spring-doc.cadn.net.cn

  • 入站通道适配器和网关:基于将第三方系统与 Spring Integration 消息传递系统(例如[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel).spring-doc.cadn.net.cn

  • Scheduler:根据预配置的调度程序分发的事件来启动消息流的方法。spring-doc.cadn.net.cn

  • Poller:与调度器类似,这是根据预配置的 Poller 分发的调度或基于间隔的事件来启动消息流的方法。spring-doc.cadn.net.cn

我们可以将这六种机制分为两大类:spring-doc.cadn.net.cn

  • 由用户进程发起的消息流:此类别中的示例场景是调用网关方法或显式发送Message更改为MessageChannel. 换句话说,这些消息流依赖于要启动的第三方进程(例如,您编写的某些代码)。spring-doc.cadn.net.cn

  • 守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以使用轮询的消息启动新的消息流,或者调度程序通过创建新消息并在预定义的时间启动消息流来调度进程。spring-doc.cadn.net.cn

显然是网关代理MessageChannel.send(…​)MessagePublisher都属于第一类,入站适配器和网关、调度程序和 Poller 属于第二类。spring-doc.cadn.net.cn

那么,如何满足每个类别中各种场景中的事务需求,以及 Spring Integration 是否需要为特定场景的事务提供明确的东西呢? 或者,您可以使用 Spring 的事务支持吗?spring-doc.cadn.net.cn

Spring 本身为事务 Management 提供了一流的支持。 因此,我们的目标不是提供新的东西,而是使用 Spring 从其现有的事务支持中受益。 换句话说,作为一个框架,我们必须将钩子公开给 Spring 的事务管理功能。 但是,由于 Spring Integration 配置基于 Spring 配置,因此我们不需要总是公开这些钩子,因为 Spring 已经公开了它们。 毕竟,每个 Spring 集成组件都是一个 Spring Bean。spring-doc.cadn.net.cn

考虑到这个目标,我们可以再次考虑两种情况:由用户进程发起的消息流和由守护进程发起的消息流。spring-doc.cadn.net.cn

由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的通常事务配置的约束。 因此,Spring 集成不需要显式配置它们来支持事务。 事务可以而且应该通过 Spring 的标准事务支持来启动。 Spring 集成消息流自然地遵循组件的事务语义,因为它本身是由 Spring 配置的。 例如,网关或服务激活器方法可以注释为@TransactionalTransactionInterceptor可以在 XML 配置中定义,其中包含指向应为事务性的特定方法的切入点表达式。 最重要的是,在这些情况下,您可以完全控制事务配置和边界。spring-doc.cadn.net.cn

但是,当涉及到由 daemon 进程启动的消息流时,情况略有不同。 尽管这些流程由开发人员配置,但并不直接涉及要启动的人工或其他流程。 这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。 例如,我们可以让调度程序在每个星期五晚上启动一个消息流。 我们还可以配置一个触发器,每秒启动一次消息流,依此类推。 因此,我们需要一种方法让这些基于触发器的流程知道我们打算使生成的消息流成为事务性的,以便在启动新的消息流时可以创建一个 Transaction 上下文。 换句话说,我们需要公开一些事务配置,但仅限于委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。spring-doc.cadn.net.cn

Poller 事务支持

Spring 集成为 Poller 提供事务支持。 Poller 是一种特殊类型的组件,因为在 Poller 任务中,我们可以调用receive()针对本身是事务性的资源,因此包括receive()调用事务的边界,这允许它在任务失败时回滚。 如果我们要添加对 channels 的相同支持,则添加的事务将影响所有以send()叫。 这为事务划分提供了相当广泛的范围,而没有任何充分的理由,特别是当 Spring 已经提供了几种方法来满足任何下游组件的事务需求时。 但是,receive()方法被包含在事务边界中是 Poller 的“重要理由”。spring-doc.cadn.net.cn

任何时候配置 Poller 时,都可以使用transactionalchild 元素及其属性,如下例所示:spring-doc.cadn.net.cn

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

前面的配置看起来类似于本机 Spring 事务配置。 您仍然必须提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定 'transaction-manager' 属性,则默认为名为 'transactionManager' 的 Bean)。 在内部,该进程被包装在 Spring 的本机事务中,其中TransactionInterceptor负责处理交易。 有关如何配置事务管理器、事务管理器的类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息的更多信息,请参见 Spring 框架参考指南spring-doc.cadn.net.cn

在前面的配置中,此 Poller 启动的所有消息流都是事务性的。 有关 Poller 的事务配置的更多信息和详细信息,请参见Polling and Transactionsspring-doc.cadn.net.cn

除了事务之外,在运行 Poller 时,你可能需要解决更多的横切关注点。 为了帮助解决这个问题,poller 元素接受一个<advice-chain>child 元素,它允许您定义要应用于 Poller 的自定义通知实例链。 (有关更多详细信息,请参阅 Pollable Message Source。 在 Spring Integration 2.0 中,Poller 经历了重构工作,现在使用代理机制来解决事务关注点以及其他横切关注点。 这项工作产生的重大变化之一是,我们使<transactional><advice-chain>元素是互斥的。 这背后的基本原理是,如果您需要多个建议,其中一个是 Transaction advice,您可以将其包含在<advice-chain>与以前一样方便,但具有更多的控制权,因为您现在可以选择按所需的顺序放置建议。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

前面的示例显示了 Spring Transaction 建议 (txAdvice) 并将其包含在<advice-chain>由 Poller 定义。 如果只需要解决 Poller 的事务性问题,您仍然可以使用<transactional>元素。spring-doc.cadn.net.cn

事务边界

另一个重要因素是 Message 流中 Transactions 的边界。 当事务启动时,事务上下文将绑定到当前线程。 因此,无论您的 Message 流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。 一旦你通过引入 Pollable ChannelExecutor Channel 来打破它,或者在某些服务中手动启动一个新线程,事务边界也会被打破。 从本质上讲,Transaction 将就在那里结束,如果在线程之间成功进行了切换,则该流将被视为成功,并且将发送 COMMIT 信号,即使该流将继续并且仍可能导致下游某个地方出现 Exception。 如果这样的流是同步的,那么该 Exception 可能会被抛回到 Message 流的发起者,该发起者也是事务上下文的发起者,并且该事务将导致 ROLLBACK。 中间立场是在打破线程边界的任何点使用事务通道。 例如,您可以使用委托给事务性 MessageStore 策略的 Queue-backed Channel,也可以使用 JMS 支持的通道。spring-doc.cadn.net.cn

事务同步

在某些环境中,它有助于将作与包含整个流的事务同步。 例如,考虑一个<file:inbound-channel-adapter/>在执行大量数据库更新的流程开始时。 如果事务提交,我们可能需要将文件移动到success目录中,而我们可能希望将其移动到failure目录(如果事务回滚)。spring-doc.cadn.net.cn

Spring Integration 2.2 引入了将这些作与事务同步的功能。 此外,您还可以配置PseudoTransactionManager如果您没有 “真实” 事务,但仍希望对成功或失败执行不同的作。 有关更多信息,请参阅伪交易spring-doc.cadn.net.cn

下面的清单显示了此功能的关键 strategy 接口:spring-doc.cadn.net.cn

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

工厂负责创建一个TransactionSynchronization对象。 您可以实现自己的 API,也可以使用框架提供的 API:DefaultTransactionSynchronizationFactory. 此实现返回一个TransactionSynchronization委托给TransactionSynchronizationProcessor:ExpressionEvaluatingTransactionSynchronizationProcessor. 此处理器支持三种 SpEL 表达式:beforeCommitExpression,afterCommitExpressionafterRollbackExpression.spring-doc.cadn.net.cn

对于熟悉交易的人来说,这些作应该是不言自明的。 在每种情况下,#rootvariable 是原始的Message. 在某些情况下,其他 SPEL 变量可用,具体取决于MessageSource正在被轮询者轮询。 例如,MongoDbMessageSource提供#mongoTemplate变量,该变量引用消息源的MongoTemplate. 同样,RedisStoreMessageSource提供#store变量,该变量引用RedisStore由 The Poll 创建。spring-doc.cadn.net.cn

要为特定的 Poller 启用该功能,可以提供对TransactionSynchronizationFactory在 Poller 的<transactional/>元素。synchronization-factory属性。spring-doc.cadn.net.cn

从版本 5.0 开始, Spring 集成提供了PassThroughTransactionSynchronizationFactory,默认情况下,当 NoTransactionSynchronizationFactory已配置,但TransactionInterceptor存在于建议链中。 当使用任何开箱即用的TransactionSynchronizationFactory实现中,轮询端点将轮询消息绑定到当前事务上下文,并将其作为failedMessageMessagingException如果在事务建议之后抛出异常。 当使用未实现的自定义交易通知时TransactionInterceptor中,您可以显式配置PassThroughTransactionSynchronizationFactory以实现此行为。 在任何一种情况下,MessagingExceptionErrorMessage发送到errorChannel,原因是 advice 抛出的原始异常。 以前,ErrorMessage的有效负载是通知引发的原始异常,并且未提供对failedMessage信息,因此很难确定事务提交问题的原因。spring-doc.cadn.net.cn

为了简化这些组件的配置, Spring 集成为默认工厂提供了名称空间支持。 下面的示例展示了如何使用名称空间来配置文件入站通道适配器:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SPEL 评估的结果将作为有效负载发送到committedChannelrolledBackChannel(在本例中,这将是Boolean.TRUEBoolean.FALSE— 其结果java.io.File.renameTo()method 调用)。spring-doc.cadn.net.cn

如果你希望发送整个有效负载以进行进一步的 Spring 集成处理,请使用'payload'表达式。spring-doc.cadn.net.cn

请务必了解这会将作与事务同步。 它不会使本质上不是事务性的资源实际上是事务性的。 相反,事务(无论是 JDBC 还是其他事务)在轮询之前启动,并在流完成时提交或回滚,然后是同步作。spring-doc.cadn.net.cn

如果您提供自定义TransactionSynchronizationFactory,它负责创建一个资源同步,导致 Bound 的资源在事务完成时自动解绑。 默认的TransactionSynchronizationFactory为此,请返回ResourceHolderSynchronization,使用默认的shouldUnbindAtCompletion()返回true.spring-doc.cadn.net.cn

除了after-commitafter-rollback表达 式before-commit也受支持。 在这种情况下,如果评估(或下游处理)引发异常,则事务将回滚而不是提交。spring-doc.cadn.net.cn

伪交易

阅读 Transaction Synchronization 部分后,您可能会认为在流完成时执行这些 “成功” 或 “失败”作会很有用,即使 Poller 下游没有 “真正的” 事务资源(例如 JDBC)。 例如,考虑一个 “<file:inbound-channel-adapter/>” 后跟一个 “<ftp:outbout-channel-adapter/>”。 这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。spring-doc.cadn.net.cn

为了提供此功能,框架提供了一个PseudoTransactionManager,即使不涉及实际的事务资源,也可以启用上述配置。 如果流程正常完成,则beforeCommitafterCommit调用 synchronizations。 失败时,afterRollback调用 synchronization。 因为它不是真正的事务,所以不会发生实际的提交或回滚。 伪交易是用于启用同步功能的工具。spring-doc.cadn.net.cn

要使用PseudoTransactionManager,您可以将其定义为 <bean/>,就像配置真正的事务管理器一样。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

反应式事务

从版本 5.3 开始,ReactiveTransactionManager也可以与TransactionInterceptor返回反应式类型的端点的建议。 这包括MessageSourceReactiveMessageHandlerimplementations (例如ReactiveMongoDbMessageSource),这会生成一条带有FluxMono有效载荷。 所有其他生成回复的消息处理程序实现都可以依赖于ReactiveTransactionManager当他们的 reply payload 也是某种 reactive 类型时。spring-doc.cadn.net.cn