交易
Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,这些语义可以通过声明方式进行选择,正如 Spring 事务的现有用户所熟悉的那样。 这使得许多(如果不是最常见的)消息传递模式易于实现。
有两种方法可以向框架发出所需的事务语义信号。
在这两个RabbitTemplate
和SimpleMessageListenerContainer
,则有一个标志channelTransacted
which、iftrue
,告诉框架使用事务通道,并通过提交或回滚(取决于结果)结束所有作(发送或接收),并发出回滚信号的异常。
另一个信号是使用 Spring 的PlatformTransactionManager
implementations 作为正在进行的作的上下文。
如果在框架发送或接收消息时已经有一个事务正在进行中,并且channelTransacted
flag 为true
,则消息收发事务的提交或回滚将推迟到当前事务结束。
如果channelTransacted
flag 为false
,没有事务语义适用于消息传递作(它是自动确认的)。
这channelTransacted
flag 是配置时间设置。
它在创建 AMQP 组件时(通常在应用程序启动时)声明和处理一次。
外部事务原则上是动态的,因为系统在运行时响应当前线程状态。
但是,在实践中,当事务以声明方式分层到应用程序上时,它通常也是一种配置设置。
对于具有RabbitTemplate
,外部事务由调用者根据口味(通常的 Spring 事务模型)以声明式或命令式方式提供。
以下示例显示了一种声明式方法(通常是首选的,因为它是非侵入性的),其中模板已配置了channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,String
payload 在标记为@Transactional
.
如果数据库处理失败并出现异常,则传入消息将返回给代理,并且不会发送传出消息。
这适用于使用RabbitTemplate
在事务方法链中(例如,除非Channel
直接作以提前提交事务)。
对于具有SimpleMessageListenerContainer
,如果需要外部事务,则必须由容器在设置侦听器时请求该事务。
为了表示需要外部事务,用户提供了PlatformTransactionManager
添加到容器。
以下示例显示了如何执行此作:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事务管理器被添加为从另一个 bean 定义注入的依赖项(未显示),并且channelTransacted
flag 也被设置为true
.
其效果是,如果侦听器失败并出现异常,则事务将回滚,并且消息也会返回给代理。
值得注意的是,如果事务提交失败(例如,由于
数据库约束错误或连接问题),则 AMQP 事务也会回滚,并将消息返回给代理。
这有时被称为“尽力而为的 1 阶段提交”,是一种非常强大的可靠消息传递模式。
如果channelTransacted
flag 设置为false
(默认值)在前面的示例中,仍将为侦听器提供外部事务,但所有消息传递作都将自动确认,因此效果是即使在业务作回滚时也提交消息传递作。
条件回滚
在 1.6.6 版本之前,将回滚规则添加到容器的transactionAttribute
当使用外部事务管理器(例如 JDBC)时,没有任何效果。
异常始终回滚事务。
此外,当在容器的通知链中使用事务通知时,条件回滚不是很有用,因为所有侦听器异常都包装在一个ListenerExecutionFailedException
.
第一个问题已得到纠正,规则现已正确应用。
此外,ListenerFailedRuleBasedTransactionAttribute
。
它是RuleBasedTransactionAttribute
,唯一的区别是它知道ListenerExecutionFailedException
并使用规则的此类异常的原因。
此 transaction 属性可以直接在容器中使用,也可以通过 transaction advice 使用。
以下示例使用此规则:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于回滚已接收消息的说明
AMQP 事务仅适用于发送到 broker 的消息和 ack。
因此,当 Spring 事务回滚并且已经收到消息时, Spring AMQP 不仅必须回滚事务,还必须手动拒绝消息(有点诀窍,但这不是规范所说的)。
对消息拒绝执行的作独立于事务,并且取决于defaultRequeueRejected
属性(默认值:true
).
有关拒绝失败消息的更多信息,请参阅消息侦听器和异步情况。
有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义。
在 RabbitMQ 2.7.0 之前,此类消息(以及在通道关闭或中止时未确认的任何消息)将放在 Rabbit 代理队列的后面。 从 2.7.0 开始,被拒绝的消息将排在队列的前面,其方式与 JMS 回滚消息类似。 |
以前,事务回滚时的消息重新排队在本地事务之间不一致,当TransactionManager 。
在前一种情况下,正常的重新排队逻辑 (AmqpRejectAndDontRequeueException 或defaultRequeueRejected=false ) 应用(请参阅 消息侦听器和异步情况)。
使用事务管理器,消息在回滚时无条件地重新排队。
从版本 2.0 开始,行为是一致的,并且在这两种情况下都应用了正常的 requeue 逻辑。
要恢复到之前的行为,您可以将容器的alwaysRequeueWithTxManagerRollback property 设置为true .
请参阅 消息侦听器容器配置。 |
用RabbitTransactionManager
RabbitTransactionManager是在外部事务中执行 Rabbit作并与外部事务同步的替代方法。
此事务管理器是PlatformTransactionManager
接口,并且应该与单个 Rabbit 一起使用ConnectionFactory
.
此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。 |
需要应用程序代码来检索事务性 Rabbit 资源,方法是ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
而不是标准Connection.createChannel()
调用,并创建后续通道。
当使用 Spring AMQP 的RabbitTemplate时,它将自动检测线程绑定的Channel并自动参与其事务。
使用 Java 配置,您可以使用以下 bean 设置新的RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您更喜欢 XML 配置,则可以在 XML Application Context 文件中声明以下 Bean:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将 RabbitMQ 事务与其他事务(例如 DBMS)同步可提供“尽力而为的一阶段提交”语义。
RabbitMQ 事务可能会在事务同步的完成后阶段提交失败。
这由spring-tx
infrastructure 作为错误,但不会向调用代码引发异常。
从版本 2.3.10 开始,您可以调用ConnectionUtils.checkAfterCompletion()
在事务在处理事务的同一线程上提交之后。
如果未发生异常,它将简单地返回;否则,它将抛出一个AfterCompletionFailedException
它将具有一个表示完成同步状态的属性。
通过调用ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
;这是一个全局标志,适用于所有线程。