Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能与在发生协议错误或代理故障时的恢复和自动重新连接有关。 我们已经在本指南中看到了所有相关组件,但将它们全部汇集在一起并单独调用功能和恢复方案应该会有所帮助。
主要的重新连接功能由其本身启用。
使用自动申报功能通常也是有益的。
此外,如果您关心保证交付,您可能还需要在 和 中使用标志(如果您自己做 acks,则使用手动)。CachingConnectionFactory
RabbitAdmin
channelTransacted
RabbitTemplate
SimpleMessageListenerContainer
AcknowledgeMode.AUTO
SimpleMessageListenerContainer
自动声明交换、队列和绑定
该组件可以在启动时声明交换、队列和绑定。
它通过 .
因此,如果代理在启动时不存在,则无关紧要。
第一次使用 a(例如,
通过发送消息),侦听器将触发并应用管理功能。
在侦听器中执行自动声明的另一个好处是,如果连接因任何原因(例如,
代理死亡、网络故障等),当重新建立连接时,它们将再次应用。RabbitAdmin
ConnectionListener
Connection
以这种方式声明的队列必须具有固定的名称 - 由实例的框架显式声明或生成。
匿名队列是非持久性、独占性和自动删除的。AnonymousQueue |
仅当缓存模式为(默认值)时,才会执行自动声明。
之所以存在此限制,是因为独占队列和自动删除队列绑定到连接。CachingConnectionFactory CHANNEL |
从 V2.2.2 开始,将在实际处理声明之前检测类型的 Bean 并应用该函数。
例如,在新参数(属性)在框架中具有第一类支持之前,这很有用。RabbitAdmin
DeclarableCustomizer
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
在不提供对 Bean 定义的直接访问的项目中,它也很有用。Declarable
另请参阅 RabbitMQ 自动连接/拓扑恢复。
以这种方式声明的队列必须具有固定的名称 - 由实例的框架显式声明或生成。
匿名队列是非持久性、独占性和自动删除的。AnonymousQueue |
仅当缓存模式为(默认值)时,才会执行自动声明。
之所以存在此限制,是因为独占队列和自动删除队列绑定到连接。CachingConnectionFactory CHANNEL |
同步操作中的失败和重试选项
如果在使用(例如)时以同步顺序失去与代理的连接,Spring AMQP 会抛出一个(通常,但并非总是如此)。
我们不会试图隐瞒存在问题的事实,因此您必须能够捕获并响应异常。
如果您怀疑连接丢失(这不是您的错),最简单的办法是再次尝试该操作。
您可以手动执行此操作,也可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。RabbitTemplate
AmqpException
AmqpIOException
Spring Retry 提供了几个 AOP 拦截器,并提供了极大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。
Spring AMQP 还提供了一些方便的工厂 bean,用于以 AMQP 用例的方便形式创建 Spring Retry 拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。
有关详细信息,请参阅 Javadoc 和 和 的属性。
如果没有事务或事务在重试回调中启动事务,则无状态重试是合适的。
请注意,无状态重试比有状态重试更易于配置和分析,但如果存在必须回滚或肯定要回滚的持续事务,则通常不合适。
在事务中间断开的连接应具有与回滚相同的效果。
因此,对于事务在堆栈上层启动的重新连接,有状态重试通常是最佳选择。
有状态重试需要一种机制来唯一标识消息。
最简单的方法是让发送方在 message 属性中放置一个唯一值。
提供的消息转换器提供了一个选项来执行此操作:您可以设置为 。
否则,可以将实现注入到拦截器中。
密钥生成器必须为每条消息返回一个唯一的密钥。
在 2.0 版之前的版本中,提供了 a。
它使没有属性的消息只能重试一次(忽略重试设置)。
不再提供此建议,因为与版本 1.2 一起,其功能内置于拦截器和消息侦听器容器中。StatefulRetryOperationsInterceptor
StatelessRetryOperationsInterceptor
MessageId
createMessageIds
true
MessageKeyGenerator
MissingMessageIdAdvice
messageId
spring-retry
为了向后兼容,默认情况下(重试一次后),具有 null 消息 ID 的消息对使用者来说是致命的(使用者已停止)。
若要复制 提供的功能,可以将该属性设置为 on the listener 容器。
使用该设置时,使用者将继续运行,并且消息被拒绝(重试一次后)。
它被丢弃或路由到死信队列(如果配置了一个)。MissingMessageIdAdvice statefulRetryFatalWithNullMessageId false |
从版本 1.3 开始,提供了一个构建器 API,以帮助使用 Java(在类中)组装这些拦截器。
以下示例演示如何执行此操作:@Configuration
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以这种方式配置重试功能的子集。
更高级的功能需要将 a 配置为 Spring bean。
请参阅 Spring Retry Javadoc 以获取有关可用策略及其配置的完整信息。RetryTemplate
为了向后兼容,默认情况下(重试一次后),具有 null 消息 ID 的消息对使用者来说是致命的(使用者已停止)。
若要复制 提供的功能,可以将该属性设置为 on the listener 容器。
使用该设置时,使用者将继续运行,并且消息被拒绝(重试一次后)。
它被丢弃或路由到死信队列(如果配置了一个)。MissingMessageIdAdvice statefulRetryFatalWithNullMessageId false |
使用批处理侦听器重试
不建议使用批处理侦听器配置重试,除非批处理是由生产者在单个记录中创建的。 有关使用者和生产者创建的批处理的信息,请参阅批处理消息。 对于使用者创建的批处理,框架不知道批处理中的哪条消息导致了失败,因此在重试用尽后无法恢复。 对于生产者创建的批处理,由于实际上只有一条消息失败,因此可以恢复整个消息。 应用程序可能希望通过设置引发异常的索引属性来通知自定义恢复程序在批处理中发生故障的位置。
批处理侦听器的重试恢复器必须实现 。MessageBatchRecoverer
消息侦听器和异步案例
如果由于业务异常而失败,则异常由消息侦听器容器处理,然后该容器返回侦听另一条消息。
如果故障是由断开的连接(不是业务异常)引起的,则必须取消并重新启动为侦听器收集消息的使用者。
无缝处理此问题,并留下一个日志,说明侦听器正在重新启动。
事实上,它无休止地循环,试图重新启动消费者。
只有当消费者的行为确实非常糟糕时,它才会放弃。
一个副作用是,如果代理在容器启动时关闭,它会继续尝试,直到可以建立连接。MessageListener
SimpleMessageListenerContainer
与协议错误和断开连接相比,业务异常处理可能需要更多的思考和一些自定义配置,尤其是在使用事务或容器确认器时。
在 2.8.x 之前,RabbitMQ 没有对死信行为的定义。
因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无休止地重新传递。
为了限制客户重新交付的次数,一种选择是在听众的建议链中。
拦截器可以有一个恢复回调,用于实现自定义死信操作 - 任何适合您的特定环境的操作。StatefulRetryOperationsInterceptor
另一种方法是将容器的属性设置为 。
这会导致丢弃所有失败的消息。
当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换。defaultRequeueRejected
false
或者,您可以抛出一个 .
这样做会阻止消息重新排队,而不管属性的设置如何。AmqpRejectAndDontRequeueException
defaultRequeueRejected
从 2.1 版开始,引入了 an 来执行完全相反的逻辑:无论属性的设置如何,消息都将重新排队。ImmediateRequeueAmqpException
defaultRequeueRejected
通常,这两种技术的组合使用。
您可以在建议链中使用 a 和 a 来抛出 .
当所有重试都用尽时,将调用 。
正是这样做的。
默认值使用错误消息并发出消息。StatefulRetryOperationsInterceptor
MessageRecoverer
AmqpRejectAndDontRequeueException
MessageRecover
RejectAndDontRequeueRecoverer
MessageRecoverer
WARN
从版本 1.3 开始,将提供一个新功能,以允许在重试耗尽后发布失败的消息。RepublishMessageRecoverer
当恢复器使用最后一个异常时,消息将被确认,并且不会由代理发送到死信交换(如果已配置)。
在使用者端使用时,接收到的消息位于 message 属性中。
在本例中,是 .
这意味着经纪人的交付模式。
从版本 2.0 开始,您可以配置 for to set into the message to republish if if .
默认情况下,它使用默认值 - 。RepublishMessageRecoverer deliveryMode receivedDeliveryMode deliveryMode null NON_PERSISTENT RepublishMessageRecoverer deliveryMode null MessageProperties MessageDeliveryMode.PERSISTENT |
以下示例演示如何将 a 设置为恢复器:RepublishMessageRecoverer
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
在消息头中发布包含其他信息的消息,例如异常消息、堆栈跟踪、原始交换和路由密钥。
可以通过创建子类并覆盖 来添加其他标头。
(或任何其他属性)也可以在 中更改,如以下示例所示:RepublishMessageRecoverer
additionalHeaders()
deliveryMode
additionalHeaders()
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从版本 2.0.5 开始,如果堆栈跟踪太大,则可能会被截断;这是因为所有标头都必须适合单个帧。
默认情况下,如果堆栈跟踪会导致少于 20,000 字节(“余量”)可用于其他标头,则它将被截断。
这可以通过设置恢复器的属性来调整,如果您需要更多或更少的空间用于其他标头。
从版本 2.1.13、2.2.3 开始,异常消息将包含在此计算中,并且将使用以下算法最大化堆栈跟踪量:frameMaxHeadroom
-
如果单独的堆栈跟踪超过限制,则异常消息标头将被截断为 97 字节以上,并且堆栈跟踪也会被截断。
…
-
如果堆栈跟踪很小,则消息将被截断(加号)以适应可用字节(但堆栈跟踪本身中的消息将被截断为 97 字节加)。
…
…
每当发生任何类型的截断时,都会记录原始异常以保留完整信息。 在增强标头后执行计算,以便在表达式中使用异常类型等信息。
从版本 2.4.8 开始,错误交换和路由密钥可以作为 SpEL 表达式提供,其中 是评估的根对象。Message
从版本 2.3.3 开始,提供了一个新的子类;这支持两种样式的发布者确认,并将等待确认后再返回(如果未确认或返回消息,则引发异常)。RepublishMessageRecovererWithConfirms
如果 confirm 类型为 ,则子类还将检测是否返回消息并抛出 ;如果出版物被否定确认,它将抛出一个 .CORRELATED
AmqpMessageReturnedException
AmqpNackReceivedException
如果 confirm 类型为 ,则子类将在通道上调用该方法。SIMPLE
waitForConfirmsOrDie
有关确认和退货的更多信息,请参阅发布商确认和退货。
从版本 2.1 开始,将添加 an 以抛出 ,这会通知侦听器容器对当前失败的消息重新排队。ImmediateRequeueMessageRecoverer
ImmediateRequeueAmqpException
在使用者端使用时,接收到的消息位于 message 属性中。
在本例中,是 .
这意味着经纪人的交付模式。
从版本 2.0 开始,您可以配置 for to set into the message to republish if if .
默认情况下,它使用默认值 - 。RepublishMessageRecoverer deliveryMode receivedDeliveryMode deliveryMode null NON_PERSISTENT RepublishMessageRecoverer deliveryMode null MessageProperties MessageDeliveryMode.PERSISTENT |
Spring Retry 的异常分类
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。
默认配置将重试所有异常。
鉴于用户异常被包装在 中,我们需要确保分类检查异常原因。
默认分类器仅查看顶级异常。ListenerExecutionFailedException
从 Spring Retry 1.0.3 开始,该属性名为 (default: )。
当 时,它会遍历异常原因,直到找到匹配项或没有原因。BinaryExceptionClassifier
traverseCauses
false
true
若要使用此分类器进行重试,可以使用 created 和构造函数一起使用,该构造函数采用最大尝试次数、实例数和布尔值 (),并将此策略注入 .SimpleRetryPolicy
Map
Exception
traverseCauses
RetryTemplate