此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
启用 Listener Endpoint Annotations
要启用对@RabbitListener
annotations 中,您可以添加@EnableRabbit
到你的@Configuration
类。
以下示例显示了如何执行此作:
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
从 2.0 版本开始,一个DirectMessageListenerContainerFactory
也可用。
它创建DirectMessageListenerContainer
实例。
有关帮助您进行选择的信息SimpleRabbitListenerContainerFactory 和DirectRabbitListenerContainerFactory ,请参阅 选择容器。 |
从版本 2.2.2 开始,您可以提供ContainerCustomizer
implementation (如上所示)。
这可用于在创建和配置容器后进一步配置容器;例如,您可以使用它来设置容器工厂未公开的属性。
版本 2.4.8 提供了CompositeContainerCustomizer
适用于您希望应用多个定制器的情况。
默认情况下,基础架构会查找名为rabbitListenerContainerFactory
作为工厂用于创建消息侦听器容器的源。
在这种情况下,忽略 RabbitMQ 基础设施设置,processOrder
方法的调用方式,核心轮询大小为 3 个线程,最大池大小为 10 个线程。
你可以自定义侦听器容器工厂以用于每个注解,或者你可以通过实现RabbitListenerConfigurer
接口。
仅当注册了至少一个端点而没有特定的容器工厂时,才需要默认值。
请参阅 Javadoc 以获取完整的详细信息和示例。
容器工厂提供了用于添加MessagePostProcessor
在接收消息之后(调用侦听器之前)和发送回复之前应用的实例。
有关回复的信息,请参阅回复管理。
从版本 2.0.6 开始,您可以添加RetryTemplate
和RecoveryCallback
添加到侦听器容器工厂。
它在发送回复时使用。
这RecoveryCallback
在重试次数用尽时调用。
您可以使用SendRetryContextAccessor
从上下文中获取信息。
以下示例显示了如何执行此作:
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});
如果您更喜欢 XML 配置,可以使用<rabbit:annotation-driven>
元素。
任何带有 Comments 的 bean@RabbitListener
被检测到。
为SimpleRabbitListenerContainer
实例中,您可以使用类似于以下内容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
为DirectMessageListenerContainer
实例中,您可以使用类似于以下内容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
从版本 2.0 开始,@RabbitListener
annotation 具有concurrency
财产。
它支持 SpEL 表达式 (#{…}
) 和属性占位符 (${…}
).
其含义和允许的值取决于容器类型,如下所示:
-
对于
DirectMessageListenerContainer
,该值必须是单个整数值,该值将consumersPerQueue
属性。 -
对于
SimpleRabbitListenerContainer
,该值可以是单个整数值,该值将concurrentConsumers
属性,或者它可以具有m-n
哪里m
是concurrentConsumers
property 和n
是maxConcurrentConsumers
财产。
无论哪种情况,此设置都会覆盖出厂设置。 以前,如果您的侦听器需要不同的并发性,则必须定义不同的容器工厂。
注解还允许覆盖工厂autoStartup
和taskExecutor
属性通过autoStartup
和executor
(自 2.2 起)注解属性。
对每个执行程序使用不同的执行程序可能有助于识别与日志和线程转储中的每个侦听器关联的线程。
版本 2.2 还添加了ackMode
属性,它允许您覆盖容器工厂的acknowledgeMode
财产。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}