此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0spring-doc.cadn.net.cn

轮询消费者

AmqpTemplate本身可用于轮询Message接待。 默认情况下,如果没有可用的消息,null将立即返回。 没有阻塞。 从版本 1.5 开始,您可以设置receiveTimeout,以毫秒为单位,并且 receive 方法会阻塞长达该时间,等待消息。 小于零的值表示无限期阻止(或至少在与 broker 的连接丢失之前)。 版本 1.6 引入了receive允许在每次调用时传入超时的方法。spring-doc.cadn.net.cn

由于接收作会创建一个新的QueueingConsumer对于每条消息,此技术并不真正适用于高容量环境。 考虑使用异步使用者或receiveTimeout为零。

从版本 2.4.8 开始,当使用非零超时时,你可以指定传递给basicConsume将 Consumer 与 Channel 关联的方法。 例如:template.addConsumerArg("x-priority", 10).spring-doc.cadn.net.cn

有四个简单的receive方法可用。 与Exchange在发送端,有一个方法要求已设置默认队列属性 直接放在模板本身上,并且有一个在运行时接受 queue 参数的方法。 版本 1.6 引入了接受timeoutMillis覆盖receiveTimeout基于每个请求。 下面的清单显示了这四种方法的定义:spring-doc.cadn.net.cn

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

与发送消息一样,AmqpTemplate有一些方便的方法来接收 POJO,而不是Message实例,而实现提供了一种自定义MessageConverter用于创建Object返回: 下面的清单显示了这些方法:spring-doc.cadn.net.cn

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

从版本 2.0 开始,这些方法有一些变体,它们需要额外的ParameterizedTypeReference参数来转换复杂类型。 模板必须配置有SmartMessageConverter. 看MessageRabbitTemplate了解更多信息。spring-doc.cadn.net.cn

sendAndReceive方法,从版本 1.3 开始,AmqpTemplate具有多种便利性receiveAndReply同步接收、处理和回复消息的方法。 下面的清单显示了这些方法定义:spring-doc.cadn.net.cn

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplateimplementation 负责receivereply阶段。 在大多数情况下,您应该只提供ReceiveAndReplyCallback为收到的消息执行一些业务逻辑,并根据需要构建 Reply 对象或 Message。 注意,一个ReceiveAndReplyCallback可能会返回null. 在这种情况下,不会发送任何回复,并且receiveAndReply其工作原理类似于receive方法。 这允许将同一队列用于混合消息,其中一些消息可能不需要回复。spring-doc.cadn.net.cn

仅当提供的回调不是ReceiveAndReplyMessageCallback,它提供原始消息交换协定。spring-doc.cadn.net.cn

ReplyToAddressCallback对于需要自定义逻辑来确定replyToaddress 并回复ReceiveAndReplyCallback. 默认情况下,replyTo请求消息中的信息用于路由回复。spring-doc.cadn.net.cn

下面的清单显示了基于 POJO 的接收和回复的示例:spring-doc.cadn.net.cn

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}