还提供了多种方法,这些方法接受前面为单向发送操作(、 和 )描述的相同参数选项。
这些方法对于请求-答复方案非常有用,因为它们在发送之前处理必要属性的配置,并且可以在内部为此目的创建的独占队列上侦听回复消息。AmqpTemplate
sendAndReceive
exchange
routingKey
Message
reply-to
类似的请求-回复方法也可用于同时应用于请求和回复。
这些方法被命名为 。
有关更多详细信息,请参阅 AmqpTemplate
的 Javadoc。MessageConverter
convertSendAndReceive
从版本 1.5.0 开始,每个方法变体都有一个重载版本,该版本需要 .
与正确配置的连接工厂一起,这样就可以接收操作的发送方的发布者确认。
有关更多信息,请参见 Correlated Publisher Confirms and Returns 和 Javadoc for RabbitOperations
。sendAndReceive
CorrelationData
从版本 2.0 开始,这些方法 () 的变体采用额外的参数来转换复杂的返回类型。
模板必须配置为 .
有关详细信息,请参阅使用 RabbitTemplate
从消息
转换。convertSendAndReceiveAsType
ParameterizedTypeReference
SmartMessageConverter
从版本 2.1 开始,您可以配置带有选项来控制回复使用者的标志。
这是默认设置。RabbitTemplate
noLocalReplyConsumer
noLocal
false
回复超时
默认情况下,send 和 receive 方法在 5 秒后超时并返回 null。
可以通过设置属性来修改此行为。
从版本 1.5 开始,如果将属性设置为(或特定消息的计算结果),则如果消息无法传递到队列,则会引发 an。
此异常具有 、 和 属性,以及用于发送的 and。replyTimeout
mandatory
true
mandatory-expression
true
AmqpMessageReturnedException
returnedMessage
replyCode
replyText
exchange
routingKey
此功能使用发布者返回。
您可以通过将 设置为 on 来启用它(请参阅发布者确认和返回)。
此外,您不得在 .publisherReturns true CachingConnectionFactory ReturnCallback RabbitTemplate |
从版本 2.1.2 开始,添加了一个方法,允许子类被告知超时,以便它们可以清理任何保留的状态。replyTimedOut
从版本 2.0.11 和 2.1.3 开始,当您使用默认 时,可以通过设置模板的属性来添加错误处理程序。
对于任何失败的传递,例如延迟回复和没有相关标头的邮件,都会调用此错误处理程序。
传入的异常是 ,它有一个属性。DirectReplyToMessageListenerContainer
replyErrorHandler
ListenerExecutionFailedException
failedMessage
此功能使用发布者返回。
您可以通过将 设置为 on 来启用它(请参阅发布者确认和返回)。
此外,您不得在 .publisherReturns true CachingConnectionFactory ReturnCallback RabbitTemplate |
RabbitMQ 直接回复
从版本 3.4.0 开始,RabbitMQ 服务器支持直接回复。
这消除了固定回复队列的主要原因(以避免需要为每个请求创建临时队列)。
从 Spring AMQP 版本 1.4.1 开始,默认情况下使用直接回复(如果服务器支持),而不是创建临时回复队列。
如果提供 no(或将其设置为名称为 ),则会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。
使用直接回复时,不需要 a,也不应配置。replyQueue amq.rabbitmq.reply-to RabbitTemplate reply-listener |
命名队列(除 )仍支持回复侦听器,允许控制回复并发等。amq.rabbitmq.reply-to
从版本 1.6 开始,如果您希望对每个队列使用临时的、独占的自动删除队列
回复,将属性设置为 。
如果设置 .useTemporaryReplyQueues
true
replyAddress
您可以通过子类化和覆盖来更改指示是否使用直接回复的条件,以检查不同的条件。
该方法仅在发送第一个请求时调用一次。RabbitTemplate
useDirectReplyTo()
在版本 2.0 之前,为每个请求创建一个新的使用者,并在收到回复(或超时)时取消使用者。
现在,模板改用 a,让使用者可以重用。
该模板仍然负责关联回复,因此不存在延迟回复发送给其他发件人的危险。
如果要恢复到以前的行为,请将 (使用 XML 配置时)属性设置为 false。RabbitTemplate
DirectReplyToMessageListenerContainer
useDirectReplyToContainer
direct-reply-to-container
没有这样的选项。
当使用直接回复时,它始终使用 a 作为回复。AsyncRabbitTemplate
DirectReplyToContainer
从版本 2.3.7 开始,模板具有新属性。
如果为 ,则服务器不必将相关 ID 从请求消息标头复制到回复消息。
相反,用于发送请求的通道用于将回复与请求相关联。useChannelForCorrelation
true
从版本 3.4.0 开始,RabbitMQ 服务器支持直接回复。
这消除了固定回复队列的主要原因(以避免需要为每个请求创建临时队列)。
从 Spring AMQP 版本 1.4.1 开始,默认情况下使用直接回复(如果服务器支持),而不是创建临时回复队列。
如果提供 no(或将其设置为名称为 ),则会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。
使用直接回复时,不需要 a,也不应配置。replyQueue amq.rabbitmq.reply-to RabbitTemplate reply-listener |
邮件与回复队列的关联
使用固定应答队列(除 之外)时,必须提供关联数据,以便应答可以与请求相关联。
请参阅 RabbitMQ 远程过程调用 (RPC)。
默认情况下,标准属性用于保存相关数据。
但是,如果您希望使用自定义属性来保存相关数据,则可以在 <rabbit-template/> 上设置该属性。
显式将属性设置为与省略属性相同。
客户端和服务器必须对关联数据使用相同的标头。amq.rabbitmq.reply-to
correlationId
correlation-key
correlationId
Spring AMQP 版本 1.1 使用了为此数据调用的自定义属性。
如果希望在当前版本中恢复此行为(可能是为了保持与使用 1.1 的另一个应用程序的兼容性),则必须将属性设置为 。spring_reply_correlation spring_reply_correlation |
默认情况下,模板会生成自己的相关 ID(忽略任何用户提供的值)。
如果要使用自己的关联 ID,请将实例的属性设置为 。RabbitTemplate
userCorrelationId
true
相关 ID 必须是唯一的,以避免为请求返回错误回复的可能性。 |
Spring AMQP 版本 1.1 使用了为此数据调用的自定义属性。
如果希望在当前版本中恢复此行为(可能是为了保持与使用 1.1 的另一个应用程序的兼容性),则必须将属性设置为 。spring_reply_correlation spring_reply_correlation |
相关 ID 必须是唯一的,以避免为请求返回错误回复的可能性。 |
回复侦听器容器
使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。
但是,可以在模板上配置单个回复队列,这样可以更高效,还可以在该队列上设置参数。
但是,在这种情况下,还必须提供 <reply-listener/> 子元素。
此元素为应答队列提供侦听器容器,模板是侦听器。
<listener-container/> 上允许的所有消息侦听器容器配置属性都允许在元素上使用,但 和 除外,这些属性继承自模板的配置。connection-factory
message-converter
如果运行应用程序的多个实例或使用多个实例,则必须为每个实例使用唯一的应答队列。
RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺回复,而不一定收到自己的回复。RabbitTemplate |
以下示例定义了一个具有连接工厂的 rabbit 模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享连接工厂,但它们不共享通道。 因此,请求和应答不会在同一事务中执行(如果是事务性的)。
在版本 1.5.0 之前,该属性不可用。
回复始终使用默认交换和名称作为路由键进行路由。
这仍然是默认值,但您现在可以指定新属性。
可以包含带有表单的地址,并且应答被路由到指定的交换,并路由到与路由键绑定的队列。
优先于 。
仅使用时,必须将其配置为单独的组件。
和 (或 上的 ) 属性必须在逻辑上引用同一队列。reply-address reply-queue reply-address reply-address <exchange>/<routingKey> reply-address reply-queue reply-address <reply-listener> <listener-container> reply-address reply-queue queues <listener-container> |
在此配置中,a 用于接收回复,其中 .
使用命名空间元素定义模板时(如前面的示例所示),解析器将模板中的容器和线路定义为侦听器。SimpleListenerContainer
RabbitTemplate
MessageListener
<rabbit:template/>
当模板不使用固定的(或正在使用直接回复 — 请参阅 RabbitMQ 直接回复)时,不需要侦听器容器。
直接是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。replyQueue reply-to |
如果将 you 定义为 或使用类将其定义为 或在以编程方式创建模板时,则需要自行定义和连接回复侦听器容器。
如果不执行此操作,模板将永远不会收到回复,最终会超时并返回 null 作为对方法调用的回复。RabbitTemplate
<bean/>
@Configuration
@Bean
sendAndReceive
从版本 1.5 开始,检测它是否已经
配置为接收回复。
如果没有,则尝试发送和接收带有回复地址的邮件
失败,并显示 (因为从未收到回复)。RabbitTemplate
MessageListener
IllegalStateException
此外,如果使用简单(队列名称),则应答侦听器容器将验证它是否正在侦听
到具有相同名称的队列。
如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法执行此检查。replyAddress
在连接回复侦听器和模板时,请务必确保模板和容器的(或)属性引用同一队列。
该模板将答复地址插入到出站邮件属性中。replyAddress queues queueNames replyTo |
以下列表显示了如何手动连接 Bean 的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
此测试用例中显示了具有固定回复队列的连线的完整示例,以及处理请求并返回回复的“远程”侦听器容器。RabbitTemplate
当回复超时 () 时,方法返回 null。replyTimeout sendAndReceive() |
在版本 1.3.6 之前,仅记录超时消息的延迟回复。
现在,如果收到延迟回复,则会被拒绝(模板会抛出 )。
如果回复队列配置为将被拒绝的邮件发送到死信交换,则可以检索回复以供以后分析。
为此,请使用与应答队列名称相等的路由密钥将队列绑定到配置的死信交换。AmqpRejectAndDontRequeueException
有关配置死字的更多信息,请参阅 RabbitMQ 死信文档。
您还可以查看测试用例以获取示例。FixedReplyQueueDeadLetterTests
如果运行应用程序的多个实例或使用多个实例,则必须为每个实例使用唯一的应答队列。
RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺回复,而不一定收到自己的回复。RabbitTemplate |
在版本 1.5.0 之前,该属性不可用。
回复始终使用默认交换和名称作为路由键进行路由。
这仍然是默认值,但您现在可以指定新属性。
可以包含带有表单的地址,并且应答被路由到指定的交换,并路由到与路由键绑定的队列。
优先于 。
仅使用时,必须将其配置为单独的组件。
和 (或 上的 ) 属性必须在逻辑上引用同一队列。reply-address reply-queue reply-address reply-address <exchange>/<routingKey> reply-address reply-queue reply-address <reply-listener> <listener-container> reply-address reply-queue queues <listener-container> |
当模板不使用固定的(或正在使用直接回复 — 请参阅 RabbitMQ 直接回复)时,不需要侦听器容器。
直接是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。replyQueue reply-to |
在连接回复侦听器和模板时,请务必确保模板和容器的(或)属性引用同一队列。
该模板将答复地址插入到出站邮件属性中。replyAddress queues queueNames replyTo |
当回复超时 () 时,方法返回 null。replyTimeout sendAndReceive() |
异步兔子模板
版本 1.6 引入了 .
这与 AmqpTemplate
上的方法类似(和)。
但是,它们不会阻塞,而是返回 .AsyncRabbitTemplate
sendAndReceive
convertSendAndReceive
CompletableFuture
这些方法返回一个 .
这些方法返回一个 .sendAndReceive
RabbitMessageFuture
convertSendAndReceive
RabbitConverterFuture
您可以稍后通过调用 future 来同步检索结果,也可以注册与结果异步调用的回调。
以下列表显示了这两种方法:get()
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了消息,但无法传递消息,则 future 将抛出一个原因为 ,它封装了返回的消息和有关返回的信息。mandatory
ExecutionException
AmqpMessageReturnedException
如果设置了 ,则 future 有一个名为 的属性,该属性本身就是一个,表示发布成功。
如果确认的将来是 ,则还有一个名为 的属性,该属性包含失败的原因(如果可用)。enableConfirms
confirm
CompletableFuture<Boolean>
true
false
RabbitFuture
nackCause
如果在回复后收到发布者确认,则会丢弃该确认,因为回复意味着发布成功。 |
您可以将模板上的属性设置为超时回复(默认为 - 30 秒)。
如果发生超时,则 future 将使用 .receiveTimeout
30000
AmqpReplyTimeoutException
模板实现 .
在有待处理的答复时停止模板会导致取消待处理的实例。SmartLifecycle
Future
从版本 2.0 开始,异步模板现在支持直接回复,而不是配置的回复队列。 若要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅 RabbitMQ Direct reply-to 以将 direct reply-to 与同步 .RabbitTemplate
版本 2.0 引入了这些方法 () 的变体,这些变体采用额外的参数来转换复杂的返回类型。
您必须使用 .
有关详细信息,请参阅使用 RabbitTemplate
从消息
转换。convertSendAndReceiveAsType
ParameterizedTypeReference
RabbitTemplate
SmartMessageConverter
从版本 3.0 开始,这些方法现在返回 s 而不是 s。AsyncRabbitTemplate CompletableFuture ListenableFuture |
如果在回复后收到发布者确认,则会丢弃该确认,因为回复意味着发布成功。 |
从版本 3.0 开始,这些方法现在返回 s 而不是 s。AsyncRabbitTemplate CompletableFuture ListenableFuture |
使用 AMQP 进行 Spring Remoting
不再支持 Spring 远程处理,因为该功能已从 Spring 框架中删除。
使用(客户端)和操作。sendAndReceive
RabbitTemplate
@RabbitListener