对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
了解消息流中的事务
Spring Integration 公开了几个钩子来满足消息流的事务需求。 为了更好地理解这些钩子以及如何从中受益,我们必须首先重新审视可用于启动消息流的六种机制,并了解如何在每种机制中满足这些流的事务需求。
以下六种机制启动消息流(本手册中提供了每种机制的详细信息):
-
网关代理:基本消息传递网关。
-
消息通道:与方法的直接交互(例如,)。
MessageChannel
channel.send(message)
-
消息发布者:作为 Spring Bean 上方法调用的副产品启动消息流的方法。
-
入站通道适配器和网关:基于将第三方系统与 Spring Integration 消息传递系统连接来启动消息流的方法(例如,)。
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel
-
调度程序:根据预配置调度程序分发的调度事件来启动消息流的方法。
-
轮询器:与调度器类似,这是根据预配置的轮询器分发的调度或基于间隔的事件启动消息流的方法。
我们可以将这六种机制分为两大类:
-
由用户进程启动的消息流:此类别中的示例方案是调用网关方法或显式向 . 换言之,这些消息流依赖于要启动的第三方进程(例如您编写的某些代码)。
Message
MessageChannel
-
由守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以使用轮询消息启动新消息流,或者调度程序通过创建新消息并在预定义时间启动消息流来调度进程。
显然,网关代理和都属于第一类,入站适配器和网关、调度程序和轮询器属于第二类。MessageChannel.send(…)
MessagePublisher
那么,如何满足每个类别中各种场景中的事务需求,Spring Integration 是否需要为特定场景的事务提供明确的东西? 或者你可以用 Spring 的事务支持来代替吗?
Spring 本身为事务管理提供了一流的支持。 因此,我们的目标不是提供新的东西,而是使用 Spring 从其现有的事务支持中受益。 换句话说,作为一个框架,我们必须向 Spring 的事务管理功能公开钩子。 但是,由于 Spring Integration 配置是基于 Spring 配置的,因此我们不需要总是暴露这些钩子,因为 Spring 已经暴露了它们。 毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
考虑到这个目标,我们可以再次考虑两种情况:由用户进程启动的消息流和由守护进程启动的消息流。
由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的通常事务配置的约束。
因此,Spring Integration 不需要显式配置它们来支持事务。
交易可以而且应该通过 Spring 的标准交易支持启动。
Spring 集成消息流自然遵循组件的事务语义,因为它本身是由 Spring 配置的。
例如,网关或服务激活器方法可以用 进行批注,或者可以在 XML 配置中使用指向应为事务的特定方法的切入表达式进行定义。
最重要的是,在这些方案中,您可以完全控制事务配置和边界。@Transactional
TransactionInterceptor
但是,当涉及到由守护进程启动的消息流时,情况就有些不同了。 尽管由开发人员配置,但这些流并不直接涉及要启动的人工或其他进程。 这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。 例如,我们可以让调度程序在每周五晚上启动消息流。 我们还可以配置一个触发器,每秒启动一次消息流,依此类推。 因此,我们需要一种方法让这些基于触发器的进程知道我们的意图,即使生成的消息流成为事务性的,以便在启动新消息流时可以创建事务上下文。 换句话说,我们需要公开一些事务配置,但只够委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。
轮询器事务支持
Spring Integration 为轮询器提供事务支持。
轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以对本身是事务性的资源进行调用,从而将调用包含在事务边界中,以便在任务失败时回滚。
如果我们要添加对通道的相同支持,则添加的事务将影响从调用开始的所有下游组件。
这为事务划分提供了相当广泛的范围,没有任何充分的理由,特别是当 Spring 已经提供了几种方法来满足下游任何组件的事务需求时。
但是,包含在事务边界中的方法是轮询器的“有力理由”。receive()
receive()
send()
receive()
每当配置轮询器时,都可以使用子元素及其属性提供事务配置,如以下示例所示:transactional
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
前面的配置看起来类似于本机 Spring 事务配置。
您仍必须提供对事务管理器的引用,并指定事务属性或依赖缺省值(例如,如果未指定“transaction-manager”属性,则默认为名为“transactionManager”的 Bean)。
在内部,该进程被包装在 Spring 的本机事务中,其中负责处理事务。
有关如何配置事务管理器、事务管理器类型(如 JTA、Datasource 等)以及与事务配置相关的其他详细信息的更多信息,请参阅 Spring Framework 参考指南。TransactionInterceptor
使用上述配置,此轮询器启动的所有消息流都是事务性的。 有关轮询器事务配置的详细信息和详细信息,请参阅轮询和事务。
除了事务之外,在运行轮询器时,您可能需要解决更多跨领域问题。
为此,轮询器元素接受一个子元素,该子元素允许您定义要应用于轮询器的自定义建议实例链。
(有关详细信息,请参阅可轮询消息源。
在 Spring Integration 2.0 中,轮询器经历了重构工作,现在使用代理机制来解决事务问题以及其他跨领域问题。
从这项工作中演变而来的重大变化之一是,我们使 和 元素相互排斥。
这背后的基本原理是,如果您需要多个建议,其中一个是交易建议,您可以将其包含在与以前相同的便利性中,但具有更多的控制权,因为您现在可以选择将建议按所需的顺序放置。
以下示例演示如何执行此操作:<advice-chain>
<transactional>
<advice-chain>
<advice-chain>
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
前面的示例显示了Spring Transaction advice()的基于XML的基本配置,并将其包含在轮询器定义的配置中。
如果您只需要解决轮询器的事务问题,您仍然可以使用该元素作为方便。txAdvice
<advice-chain>
<transactional>
事务边界
另一个重要因素是消息流中事务的边界。 启动事务时,事务上下文绑定到当前线程。 因此,无论您的消息流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。 一旦您通过引入 Pollable Channel 或 Executor Channel 来破坏它,或者在某些服务中手动启动新线程,事务边界也将被打破。 从本质上讲,事务将在那里结束,如果线程之间发生了成功的交接,则该流将被视为成功,并且将发送 COMMIT 信号,即使该流将继续并且仍可能导致下游某处的异常。 如果这样的流是同步的,那么该异常可以被抛回给消息流的发起者,该发起者也是事务上下文的发起者,并且事务将导致回滚。 中间立场是在线程边界被破坏的任何点使用事务通道。 例如,可以使用委托给事务性 MessageStore 策略的队列支持的通道,也可以使用 JMS 支持的通道。
事务同步
在某些环境中,它有助于将操作与包含整个流的事务同步。
例如,考虑在执行多个数据库更新的流的开头。
如果事务提交,我们可能希望将文件移动到目录,而如果事务回滚,我们可能希望将其移动到目录。<file:inbound-channel-adapter/>
success
failure
Spring Integration 2.2 引入了将这些操作与事务同步的功能。
此外,如果您没有“真实”事务,但仍希望在成功或失败时执行不同的操作,则可以配置。
有关详细信息,请参阅伪事务。PseudoTransactionManager
以下列表显示了此功能的关键策略接口:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建 TransactionSynchronization
对象。
您可以实现自己的框架,也可以使用框架提供的框架:。
此实现返回一个委托给默认实现 : 的实现。
此处理器支持三个 SpEL 表达式:、 和 。DefaultTransactionSynchronizationFactory
TransactionSynchronization
TransactionSynchronizationProcessor
ExpressionEvaluatingTransactionSynchronizationProcessor
beforeCommitExpression
afterCommitExpression
afterRollbackExpression
对于熟悉交易的人来说,这些操作应该是不言自明的。
在每种情况下,变量都是原始的 。
在某些情况下,其他 SpEL 变量可用,具体取决于轮询器正在轮询的内容。
例如,提供变量,该变量引用消息源的 .
同样,提供变量,该变量引用由轮询创建的变量。#root
Message
MessageSource
MongoDbMessageSource
#mongoTemplate
MongoTemplate
RedisStoreMessageSource
#store
RedisStore
若要为特定轮询器启用该功能,可以使用该属性提供对 on 轮询器元素的引用。TransactionSynchronizationFactory
<transactional/>
synchronization-factory
从版本 5.0 开始,Spring Integration 提供 ,默认情况下,当未配置但建议链中存在某种类型的建议时,该 将应用于轮询端点。
使用任何现成的实现时,轮询终结点将轮询消息绑定到当前事务上下文,并在事务建议后引发异常时将其作为 in 提供。
使用未实现的自定义事务建议时,可以显式配置 a 来实现此行为。
无论哪种情况,都会成为发送到 的 的有效负载,原因是建议引发的原始异常。
以前,有效负载是建议抛出的原始异常,并且不提供对信息的引用,因此很难确定事务提交问题的原因。PassThroughTransactionSynchronizationFactory
TransactionSynchronizationFactory
TransactionInterceptor
TransactionSynchronizationFactory
failedMessage
MessagingException
TransactionInterceptor
PassThroughTransactionSynchronizationFactory
MessagingException
ErrorMessage
errorChannel
ErrorMessage
failedMessage
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。 以下示例演示如何使用命名空间配置文件入站通道适配器:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果将作为有效负载发送到 or(在本例中,这将是 or — 方法调用的结果)。committedChannel
rolledBackChannel
Boolean.TRUE
Boolean.FALSE
java.io.File.renameTo()
如果您希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用“有效负载”表达式。
重要的是要了解这会将操作与事务同步。 它不会使本质上不是事务性的资源实际上成为事务性的。 相反,事务(无论是 JDBC 还是其他事务)在轮询之前启动,并在流完成时提交或回滚,然后是同步操作。 如果提供自定义 ,则它负责创建资源同步,该同步会导致绑定的资源在事务完成时自动解绑。
默认值通过返回 的子类来实现,默认值返回 。 |
除了 和 表达式之外,还支持。
在这种情况下,如果评估(或下游处理)引发异常,则会回滚事务,而不是提交事务。after-commit
after-rollback
before-commit
重要的是要了解这会将操作与事务同步。 它不会使本质上不是事务性的资源实际上成为事务性的。 相反,事务(无论是 JDBC 还是其他事务)在轮询之前启动,并在流完成时提交或回滚,然后是同步操作。 如果提供自定义 ,则它负责创建资源同步,该同步会导致绑定的资源在事务完成时自动解绑。
默认值通过返回 的子类来实现,默认值返回 。 |
伪交易
阅读“事务同步”部分后,您可能会认为在流完成时执行这些“成功”或“失败”操作会很有用,即使轮询器下游没有“实际”事务资源(如 JDBC)。 例如,考虑“<file:inbound-channel-adapter/>”后跟“<ftp:outbout-channel-adapter/>”。 这些组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。
为了提供此功能,该框架提供了一个 ,即使不涉及真正的事务资源,也可以启用上述配置。
如果流程正常完成,则调用 和 同步。
失败时,将调用同步。
因为它不是真正的事务,所以不会发生实际的提交或回滚。
伪事务是用于启用同步功能的工具。PseudoTransactionManager
beforeCommit
afterCommit
afterRollback
要使用 ,您可以将其定义为 <bean/>,就像配置真正的事务管理器一样。
以下示例演示如何执行此操作:PseudoTransactionManager
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
反应式事务
从版本 5.3 开始,还可以将 a 与返回响应式类型的端点的建议一起使用。
这包括 和 实现(例如 ),它们生成带有 或 有效负载的消息。
当其回复有效负载也是某种反应式类型时,所有其他生成回复的消息处理程序实现都可以依赖 a。ReactiveTransactionManager
TransactionInterceptor
MessageSource
ReactiveMessageHandler
ReactiveMongoDbMessageSource
Flux
Mono
ReactiveTransactionManager