此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
请求/回复消息
这AmqpTemplate
还提供多种sendAndReceive
方法接受前面描述的单向发送作 (exchange
,routingKey
和Message
).
这些方法对于请求-回复场景非常有用,因为它们处理必要的reply-to
属性,并且可以在内部为此目的创建的独占队列上侦听回复消息。
类似的请求-回复方法也可用,其中MessageConverter
将同时应用于请求和回复。
这些方法被命名为convertSendAndReceive
.
请参阅Javadoc 的AmqpTemplate
了解更多详情。
从版本 1.5.0 开始,每个sendAndReceive
method 变体有一个重载版本,该版本采用CorrelationData
.
与正确配置的连接工厂一起,这允许接收作发送端的发布者确认。
请参阅相关的发布者确认和返回以及Javadoc 的RabbitOperations
了解更多信息。
从版本 2.0 开始,这些方法有变体 (convertSendAndReceiveAsType
),该ParameterizedTypeReference
参数来转换复杂的返回类型。
模板必须配置有SmartMessageConverter
.
看从Message
跟RabbitTemplate
了解更多信息。
从版本 2.1 开始,您可以配置RabbitTemplate
使用noLocalReplyConsumer
选项来控制noLocal
标志。
这是false
默认情况下。
回复超时
默认情况下,发送和接收方法会在 5 秒后超时并返回 null。
您可以通过设置replyTimeout
财产。
从版本 1.5 开始,如果将mandatory
property 设置为true
(或mandatory-expression
计算结果为true
对于特定消息),如果消息无法传送到队列,则AmqpMessageReturnedException
被抛出。
此异常具有returnedMessage
,replyCode
和replyText
属性以及exchange
和routingKey
用于发送。
此功能使用发布者返回。
您可以通过设置publisherReturns 自true 在CachingConnectionFactory (请参阅 发布者确认并返回)。
此外,您不得注册自己的ReturnCallback 使用RabbitTemplate . |
从版本 2.1.2 开始,replyTimedOut
方法,让子类被告知超时,以便它们可以清理任何保留状态。
从版本 2.0.11 和 2.1.3 开始,当您使用默认的DirectReplyToMessageListenerContainer
中,您可以通过设置模板的replyErrorHandler
财产。
对于任何失败的投放,如延迟回复和在没有关联标头的情况下收到的消息,都会调用此错误处理程序。
传入的异常是一个ListenerExecutionFailedException
,它有一个failedMessage
财产。
RabbitMQ 直接回复
从版本 3.4.0 开始,RabbitMQ 服务器支持直接回复。
这消除了固定回复队列的主要原因(以避免需要为每个请求创建临时队列)。
从 Spring AMQP 版本 1.4.1 开始,默认情况下使用直接回复(如果服务器支持)而不是创建临时回复队列。
当 noreplyQueue 提供(或者它设置为 nameamq.rabbitmq.reply-to )、RabbitTemplate 自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。
使用直接回复时,reply-listener 不是必需的,也不应进行配置。 |
命名队列仍支持回复侦听器(除了amq.rabbitmq.reply-to
)、允许控制回复并发等。
从版本 1.6 开始,如果您希望为每个
reply,将useTemporaryReplyQueues
property 设置为true
.
如果您将replyAddress
.
您可以通过子类化来更改指示是否使用直接回复的条件RabbitTemplate
并覆盖useDirectReplyTo()
以检查不同的标准。
该方法仅在发送第一个请求时调用一次。
在 2.0 版本之前,RabbitTemplate
为每个请求创建一个新的使用者,并在收到回复(或超时)时取消该使用者。
现在,模板使用DirectReplyToMessageListenerContainer
相反,让消费者被重用。
该模板仍然负责关联回复,因此不存在延迟回复发送给其他发件人的危险。
如果要恢复到之前的行为,请将useDirectReplyToContainer
(direct-reply-to-container
)属性设置为 false。
这AsyncRabbitTemplate
没有这样的选项。
它总是使用DirectReplyToContainer
for reply。
从版本 2.3.7 开始,模板具有一个新属性useChannelForCorrelation
.
当这是true
,则服务器不必将相关 ID 从请求消息标头复制到回复消息。
相反,用于发送请求的通道用于将回复与请求相关联。
Message Correlation With A Reply Queue
当使用固定回复队列(除了amq.rabbitmq.reply-to
),则必须提供关联数据,以便将回复与请求相关联。
请参阅 RabbitMQ 远程过程调用 (RPC)。
默认情况下,标准的correlationId
属性用于保存关联数据。
但是,如果您希望使用自定义属性来保存关联数据,则可以设置correlation-key
<rabbit-template/> 上的属性。
将属性显式设置为correlationId
与省略 attribute 相同。
客户端和服务器必须对关联数据使用相同的标头。
Spring AMQP 版本 1.1 使用名为spring_reply_correlation 对于此数据。
如果您希望在当前版本中恢复此行为(可能是为了保持与使用 1.1 的其他应用程序的兼容性),则必须将属性设置为spring_reply_correlation . |
默认情况下,模板会生成自己的相关 ID(忽略任何用户提供的值)。
如果您希望使用自己的关联 ID,请将RabbitTemplate
实例的userCorrelationId
property 设置为true
.
相关 ID 必须是唯一的,以避免为请求返回错误回复的可能性。 |
回复侦听器容器
当使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。
但是,可以在模板上配置单个回复队列,这样可以更高效,并且还允许您在该队列上设置参数。
但是,在这种情况下,您还必须提供 <reply-listener/> 子元素。
此元素为回复队列提供侦听器容器,模板是侦听器。
元素上允许的所有 <listener-container/> 消息侦听器容器配置属性都允许,但connection-factory
和message-converter
,这些参数继承自模板的配置。
如果您运行应用程序的多个实例或使用多个RabbitTemplate 实例,您必须为每个实例使用唯一的回复队列。
RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都将争夺回复,而不必收到自己的回复。 |
下面的示例定义了一个带有连接工厂的 rabbit 模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享一个通道。 因此,请求和回复不会在同一事务中执行(如果是事务性的)。
在 1.5.0 版本之前,reply-address 属性不可用。
回复始终使用默认 exchange 进行路由,并且reply-queue name 作为路由密钥。
这仍然是默认值,但您现在可以指定新的reply-address 属性。
这reply-address 可以包含格式为<exchange>/<routingKey> 并且回复将路由到指定的 Exchange,并路由到与路由密钥绑定的队列。
这reply-address 的优先级高于reply-queue .
当只有reply-address 正在使用中,则<reply-listener> 必须配置为单独的<listener-container> 元件。
这reply-address 和reply-queue (或queues 属性<listener-container> ) 必须在逻辑上引用同一队列。 |
通过此配置,一个SimpleListenerContainer
用于接收回复,其中RabbitTemplate
作为MessageListener
.
当使用<rabbit:template/>
namespace 元素,如前面的示例所示,解析器将模板中的容器和连线定义为侦听器。
当模板不使用固定的replyQueue (或者正在使用直接回复 — 请参阅 RabbitMQ 直接回复),则不需要侦听器容器。
直接reply-to 是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。 |
如果您定义RabbitTemplate
作为<bean/>
或使用@Configuration
类将其定义为@Bean
或者,当您以编程方式创建模板时,您需要自己定义并连接回复侦听器容器。
如果您未能执行此作,则模板永远不会收到回复,并最终超时并返回 null 作为对sendAndReceive
方法。
从版本 1.5 开始,RabbitTemplate
检测是否已
配置为MessageListener
以接收回复。
否则,尝试使用回复地址发送和接收消息
fail 替换为IllegalStateException
(因为从未收到回复)。
此外,如果简单的replyAddress
(队列名称),则回复侦听器容器将验证它是否正在侦听
添加到具有相同名称的队列中。
如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法执行此检查。
当您自己连接回复侦听器和模板时,请务必确保模板的replyAddress 和容器的queues (或queueNames ) 属性引用同一队列。
模板将回复地址插入到出站邮件中replyTo 财产。 |
下面的清单显示了如何手动连接 bean 的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的RabbitTemplate
此测试用例显示了与固定回复队列相连的 “远程” 侦听器容器,该容器处理请求并返回回复。
当回复超时 (replyTimeout )、sendAndReceive() 方法返回 null。 |
在版本 1.3.6 之前,仅记录超时消息的延迟回复。
现在,如果收到延迟的回复,则会被拒绝(模板会抛出一个AmqpRejectAndDontRequeueException
).
如果回复队列配置为将被拒绝的消息发送到死信交换,则可以检索回复以供以后分析。
为此,请将队列绑定到配置的死信交换,其路由键等于回复队列的名称。
有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。
您还可以查看FixedReplyQueueDeadLetterTests
test case 为例。
异步 Rabbit 模板
版本 1.6 引入了AsyncRabbitTemplate
.
这具有类似的sendAndReceive
(以及convertSendAndReceive
) 方法更改为AmqpTemplate
.
但是,它们不是阻塞,而是返回一个CompletableFuture
.
这sendAndReceive
方法返回一个RabbitMessageFuture
.
这convertSendAndReceive
方法返回一个RabbitConverterFuture
.
您可以稍后通过调用get()
on the future,或者您可以注册一个与 result 异步调用的回调。
下面的清单显示了这两种方法:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果mandatory
设置了该 API 的 API API 的 API API 中,并且无法传递消息,则 future 会抛出一个ExecutionException
原因为AmqpMessageReturnedException
,它封装了返回的消息和有关返回的信息。
如果enableConfirms
设置,则 future 具有一个名为confirm
,它本身就是一个CompletableFuture<Boolean>
跟true
表示发布成功。
如果确认 future 为false
这RabbitFuture
具有另一个名为nackCause
,其中包含失败的原因(如果可用)。
如果在回复后收到发布者确认,则会丢弃发布者确认,因为回复意味着发布成功。 |
您可以设置receiveTimeout
属性来超时回复(默认为30000
- 30 秒)。
如果发生超时,则 future 将以AmqpReplyTimeoutException
.
该模板实现SmartLifecycle
.
在有待处理回复时停止模板会导致Future
要取消的实例。
从版本 2.0 开始,异步模板现在支持直接回复,而不是配置的回复队列。 要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅 RabbitMQ 直接回复 以将直接回复与同步RabbitTemplate
.
版本 2.0 引入了这些方法的变体 (convertSendAndReceiveAsType
),该ParameterizedTypeReference
参数来转换复杂的返回类型。
您必须配置底层RabbitTemplate
替换为SmartMessageConverter
.
看从Message
跟RabbitTemplate
了解更多信息。
使用 AMQP 的 Spring 远程处理
Spring 远程处理不再受支持,因为该功能已从 Spring Framework 中删除。
用sendAndReceive
作RabbitTemplate
(客户端)和@RabbitListener
相反。