此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0spring-doc.cadn.net.cn

异步使用者

Spring AMQP 还通过使用@RabbitListener注解,并提供一个开放的基础设施来以编程方式注册端点。 这是迄今为止设置 asynchronous consumer 的最便捷方法。 有关更多详细信息,请参阅 Annotation-driven Listener Endpoints

prefetch 默认值以前为 1,这可能导致高效消费者的利用率不足。 从版本 2.0 开始,默认的 prefetch 值现在是 250,在大多数常见情况下,这应该让消费者忙碌起来,并且 从而提高吞吐量。spring-doc.cadn.net.cn

但是,在某些情况下,预取值应该较低:spring-doc.cadn.net.cn

此外,对于低容量消息收发和多个使用者(包括单个侦听器容器实例中的并发),您可能希望减少预取,以便在使用者之间更均匀地分配消息。spring-doc.cadn.net.cn

有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中的使用者利用率的博文和这篇关于排队理论的博文。spring-doc.cadn.net.cn

消息侦听器

对于异步Message接收、专用组件(而不是AmqpTemplate) 参与其中。 该组件是Message-consuming 回调。 我们将在本节后面讨论容器及其属性。 不过,首先,我们应该查看回调,因为这是您的应用程序代码与消息传递系统集成的地方。 回调有几个选项,从MessageListener接口,下面的清单显示了:spring-doc.cadn.net.cn

public interface MessageListener {
    void onMessage(Message message);
}

如果你的回调逻辑出于任何原因依赖于 AMQP Channel 实例,你可以改用ChannelAwareMessageListener. 它看起来很相似,但有一个额外的参数。 下面的清单显示了ChannelAwareMessageListener接口定义:spring-doc.cadn.net.cn

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
在版本 2.1 中,此接口从 packageo.s.amqp.rabbit.coreo.s.amqp.rabbit.listener.api.

MessageListenerAdapter

如果您希望在应用程序逻辑和消息传递 API 之间保持更严格的分离,则可以依赖框架提供的适配器实现。 这通常被称为 “消息驱动的 POJO” 支持。spring-doc.cadn.net.cn

版本 1.5 引入了一种更灵活的 POJO 消息传递机制,@RabbitListener注解。 有关更多信息,请参阅 Annotation-driven Listener Endpoints

使用适配器时,只需提供对适配器本身应调用的实例的引用。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

您可以将适配器子类化并提供getListenerMethodName()以根据消息动态选择不同的方法。 此方法有两个参数,originalMessageextractedMessage,后者是任何转换的结果。 默认情况下,SimpleMessageConverter已配置。 看SimpleMessageConverter有关其他可用转换器的更多信息。spring-doc.cadn.net.cn

从版本 1.4.2 开始,原始消息具有consumerQueueconsumerTag属性,该属性可用于确定从中接收消息的队列。spring-doc.cadn.net.cn

从版本 1.5 开始,你可以配置消费者队列或标签到方法名称的映射,以动态选择要调用的方法。 如果 map 中没有条目,我们将回退到默认的 listener 方法。 默认侦听器方法(如果未设置)为handleMessage.spring-doc.cadn.net.cn

从 2.0 版本开始,一个方便的FunctionalInterface已提供。 下面的清单显示了FunctionalInterface:spring-doc.cadn.net.cn

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

此接口有助于使用 Java 8 lambda 方便地配置适配器,如下例所示:spring-doc.cadn.net.cn

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

从版本 2.2 开始,buildListenerArguments(Object)已弃用,并且是新的buildListenerArguments(Object, Channel, Message)一个已经被引入。 新方法帮助 listener 获取ChannelMessage参数执行更多作,例如调用channel.basicReject(long, boolean)在 Manual acknowledge 模式下。 下面的清单显示了最基本的示例:spring-doc.cadn.net.cn

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

现在,您可以配置ExtendedListenerAdapterMessageListenerAdapter如果您需要接收 “频道” 和 “消息”。 listener 的参数应设置为buildListenerArguments(Object, Channel, Message)返回,如下面的 listener 示例所示:spring-doc.cadn.net.cn

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

容器

现在,您已经看到了Message-listening 回调,我们可以将注意力转向容器。 基本上,容器处理 “主动” 职责,以便侦听器回调可以保持被动状态。 容器是 “lifecycle” 组件的一个示例。 它提供了启动和停止的方法。 在配置容器时,您实际上是弥合了 AMQP 队列和MessageListener实例。 您必须提供对ConnectionFactory以及该侦听器应从中使用消息的队列名称或 Queue 实例。spring-doc.cadn.net.cn

在 2.0 版本之前,有一个侦听器容器SimpleMessageListenerContainer. 现在有第二个容器DirectMessageListenerContainer. 选择容器中介绍了选择要使用的容器和可能应用的容器之间的差异。spring-doc.cadn.net.cn

下面的清单显示了最基本的示例,该示例通过使用SimpleMessageListenerContainer:spring-doc.cadn.net.cn

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

作为一个 “活动” 组件,最常见的是使用 bean 定义创建侦听器容器,以便它可以在后台运行。 下面的示例演示了使用 XML 实现此目的的一种方法:spring-doc.cadn.net.cn

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

下面的清单显示了使用 XML 实现此目的的另一种方法:spring-doc.cadn.net.cn

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

前面的两个示例都创建了一个DirectMessageListenerContainer(请注意typeattribute — 它默认为simple).spring-doc.cadn.net.cn

或者,您可能更喜欢使用 Java 配置,它看起来类似于前面的代码片段:spring-doc.cadn.net.cn

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

消费者优先

从 RabbitMQ 版本 3.2 开始,代理现在支持使用者优先级(请参阅在 RabbitMQ 中使用使用者优先级)。 这是通过设置x-priority争论。 这SimpleMessageListenerContainer现在支持设置使用者参数,如下例所示:spring-doc.cadn.net.cn

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

为方便起见,命名空间提供了priority属性listener元素,如下例所示:spring-doc.cadn.net.cn

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

从版本 1.3 开始,您可以修改容器在运行时侦听的队列。 请参阅 侦听器容器队列spring-doc.cadn.net.cn

auto-delete队列

当容器配置为侦听auto-deletequeues,则队列具有x-expires选项,或者在 Broker 上配置了 Time-To-Live 策略,则当容器停止时(即,当最后一个使用者被取消时),代理将删除队列。 在版本 1.3 之前,由于缺少队列,无法重新启动容器。 这RabbitAdmin仅在连接关闭或打开时自动重新声明队列等,这在容器停止和启动时不会发生。spring-doc.cadn.net.cn

从版本 1.3 开始,容器使用RabbitAdmin在启动期间重新声明任何缺失的队列。spring-doc.cadn.net.cn

您还可以将条件声明(请参阅条件声明)与auto-startup="false"admin 将队列声明推迟到容器启动。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

在这种情况下,queue 和 exchange 由containerAdmin,它有auto-startup="false",以便在上下文初始化期间不声明元素。 此外,容器由于同样的原因未启动。 当容器稍后启动时,它会使用其对containerAdmin以声明元素。spring-doc.cadn.net.cn