对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
AmqpTemplate
与 Spring 框架和相关项目提供的许多其他高级抽象一样, Spring AMQP 提供了一个起着核心作用的“模板”。
定义主要作的接口称为AmqpTemplate
.
这些作涵盖了发送和接收消息的一般行为。
换句话说,它们对于任何实现都不是唯一的 — 因此名称中的“AMQP”。
另一方面,该接口的实现与 AMQP 协议的实现相关联。
与 JMS 不同,JMS 本身是一个接口级 API,而 AMQP 是一个线级协议。
该协议的实现提供自己的 Client 端库,因此 template 接口的每个实现都依赖于特定的 Client 端库。
目前,只有一个 implementation:RabbitTemplate
.
在下面的例子中,我们经常使用AmqpTemplate
.
但是,当您查看实例化模板或调用 setter 的配置示例或任何代码摘录时,您可以看到实现类型(例如RabbitTemplate
).
另请参见 Async Rabbit Template。
添加重试功能
从版本 1.3 开始,您现在可以配置RabbitTemplate
要使用RetryTemplate
以帮助处理代理连接问题。
有关完整信息,请参见spring-retry项目。
下面只是一个使用指数回退策略和默认SimpleRetryPolicy
,这会在将异常引发给调用方之前进行 3 次尝试。
以下示例使用 XML 命名空间:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例使用@Configuration
Java 中的注释:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
从版本 1.4 开始,除了retryTemplate
属性、recoveryCallback
选项在RabbitTemplate
.
它用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
.
这RecoveryCallback 在某种程度上受到限制,因为重试上下文仅包含lastThrowable 田。
对于更复杂的使用案例,您应该使用外部RetryTemplate 以便您可以将其他信息传达给RecoveryCallback 通过上下文的属性。
以下示例显示了如何执行此作: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,您不会注入RetryTemplate
到RabbitTemplate
.
发布是异步的 — 如何检测成功和失败
发布消息是一种异步机制,默认情况下,RabbitMQ 会丢弃无法路由的消息。 要成功发布,您可以接收异步确认,如 Correlated Publisher Confirms and Returns 中所述。 请考虑两种故障情况:
-
发布到 Exchange,但没有匹配的目标队列。
-
发布到不存在的 Exchange。
第一种情况由发布者返回涵盖,如相关发布者确认和返回中所述。
对于第二种情况,消息被丢弃,并且不会生成任何返回。
底层通道因异常而关闭。
默认情况下,会记录此异常,但您可以注册ChannelListener
使用CachingConnectionFactory
获取此类事件的通知。
以下示例演示如何添加ConnectionListener
:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以检查信号的reason
属性来确定发生的问题。
要检测发送线程上的异常,您可以setChannelTransacted(true)
在RabbitTemplate
,并在txCommit()
.
但是,事务会严重影响性能,因此在仅为这一个用例启用事务之前,请仔细考虑这一点。
相关发布者确认并返回
这RabbitTemplate
实现AmqpTemplate
支持 Publisher Confirms 和 Returns。
对于返回的消息,模板的mandatory
property 必须设置为true
或mandatory-expression
必须评估为true
对于特定消息。
此功能需要一个CachingConnectionFactory
它有它的publisherReturns
属性设置为true
(请参阅 发布者确认并返回)。
返回值通过注册一个RabbitTemplate.ReturnsCallback
通过调用setReturnsCallback(ReturnsCallback callback)
.
回调必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
这ReturnedMessage
具有以下属性:
-
message
- 返回的消息本身 -
replyCode
- 指示退货原因的代码 -
replyText
- 退货的文本原因 - 例如NO_ROUTE
-
exchange
- 消息发送到的 Exchange -
routingKey
- 使用的路由密钥
只有一个ReturnsCallback
由每个RabbitTemplate
.
另请参阅 回复超时。
对于发布者确认(也称为发布者确认),模板需要一个CachingConnectionFactory
它有它的publisherConfirm
属性设置为ConfirmType.CORRELATED
.
确认通过注册RabbitTemplate.ConfirmCallback
通过调用setConfirmCallback(ConfirmCallback callback)
.
回调必须实现此方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
这CorrelationData
是客户端在发送原始消息时提供的对象。
这ack
对于ack
和 false 表示nack
.
为nack
实例中,原因可能包含nack
,如果它在nack
生成。
例如,当向不存在的 exchange 发送消息时。
在这种情况下,代理将关闭通道。
关闭的原因包含在cause
.
这cause
已在版本 1.4 中添加。
只有一个ConfirmCallback
受RabbitTemplate
.
当 rabbit template send作完成时,通道将关闭。
当连接工厂缓存已满时,这排除了接收 confirms 或 returns (当缓存中有空间时,通道未物理关闭,返回和 confirm 正常进行)。
当缓存已满时,框架会将关闭时间最多推迟 5 秒,以便有时间接收确认和返回。
使用 confirm 时,当收到最后一次确认时,通道将关闭。
当仅使用返回时,通道将保持打开状态整整 5 秒。
我们通常建议将连接出厂的channelCacheSize 设置为足够大的值,以便将发布消息的通道返回到高速缓存,而不是关闭。
您可以使用 RabbitMQ 管理插件监控通道使用情况。
如果您看到通道正在快速打开和关闭,则应考虑增加高速缓存大小以减少服务器上的开销。 |
在版本 2.1 之前,在收到确认之前,为发布者确认启用的渠道会返回到缓存中。
其他一些进程可以签出通道并执行一些导致通道关闭的作 — 例如将消息发布到不存在的 exchange。
这可能会导致确认丢失。
版本 2.1 及更高版本在确认未完成时不再将通道返回到缓存。
这RabbitTemplate 执行逻辑close() 在每次作后在通道上。
通常,这意味着一次只有一个确认在通道上未完成。 |
从版本 2.2 开始,回调在连接工厂的executor 线程。
这是为了避免在从回调中执行 Rabbit作时出现潜在的死锁。
在以前的版本中,回调是直接在amqp-client 连接 I/O 线程;如果您执行某些 RPC作(例如打开新通道),这将死锁,因为 I/O 线程阻塞等待结果,但结果需要由 I/O 线程本身处理。
对于这些版本,有必要将工作 (例如发送消息) 移交给回调中的另一个线程。
这不再是必需的,因为框架现在将回调调用移交给执行程序。 |
只要返回回调在 60 秒或更短的时间内执行,就仍然保证在 ack 之前收到返回的消息。 确认计划在返回回传退出后或 60 秒后传递,以先到者为准。 |
这CorrelationData
object 具有CompletableFuture
,而不是使用ConfirmCallback
在模板上。
以下示例显示如何配置CorrelationData
实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是一个CompletableFuture<Confirm>
,您可以执行以下任一作get()
准备好或使用时的结果whenComplete()
进行异步回调。
这Confirm
object 是具有 2 个属性的简单 bean:ack
和reason
(对于nack
实例)。
未为 broker-generated 填充原因nack
实例。
它为nack
实例(例如,在ack
实例非常出色)。
此外,当同时启用 confirms 和 returns 时,CorrelationData
return
属性中填充返回的消息(如果无法将其路由到任何队列)。
可以保证在使用ack
.CorrelationData.getReturn()
返回ReturnMessage
具有属性:
-
message (返回的消息)
-
回复代码
-
回复文本
-
交换
-
routingKey
另请参阅 Scoped Operations 以获取等待发布者确认的更简单机制。
作用域作
通常,在使用模板时,一个Channel
从缓存中签出(或创建),用于作,并返回到缓存以供重用。
在多线程环境中,不能保证下一个作使用相同的通道。
但是,有时您可能希望对通道的使用进行更多控制,并确保在同一通道上执行许多作。
从版本 2.0 开始,一个名为invoke
提供OperationsCallback
.
在回调范围内和提供的RabbitOperations
参数使用相同的专用Channel
,该 URL 将在最后关闭(不会返回到缓存中)。
如果通道是PublisherCallbackChannel
,则在收到所有确认后,它会返回到缓存中(请参阅 相关发布者确认和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
您可能需要此 API 的一个例子是,如果您希望使用waitForConfirms()
method 在底层Channel
.
Spring API 以前没有公开此方法,因为如前所述,通道通常是缓存和共享的。
这RabbitTemplate
现在提供waitForConfirms(long timeout)
和waitForConfirmsOrDie(long timeout)
,该通道委托给OperationsCallback
.
出于显而易见的原因,这些方法不能在该范围之外使用。
请注意,其他位置提供了用于将 Confirm 与 requests 相关联的更高级别抽象(请参阅 Correlated Publisher Confirms and Returns)。 如果您只想等待 Broker 确认送达,则可以使用以下示例中所示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您愿意RabbitAdmin
在OperationsCallback
,则必须使用相同的RabbitTemplate
用于invoke
操作。
如果模板作已经在现有事务的范围内执行,例如,在事务处理侦听器容器线程上运行并在事务处理模板上执行作时,则前面的讨论没有意义。
在这种情况下,将在该通道上执行作,并在线程返回到容器时提交。
无需使用invoke 在那种情况下。 |
以这种方式使用 confirms 时,实际上并不需要为将 confirm 与请求相关联而设置的大部分基础设施(除非还启用了 return)。
从版本 2.2 开始,连接工厂支持一个名为publisherConfirmType
.
当此项设置为ConfirmType.SIMPLE
,则避免了基础设施,并且确认处理可以更高效。
此外,RabbitTemplate
将publisherSequenceNumber
属性MessageProperties
.
如果您想检查(或记录或以其他方式使用)特定的确认,您可以使用重载的invoke
方法,如下例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些ConfirmCallback 对象(对于ack 和nack 实例)是 Rabbit 客户端回调,而不是模板回调。 |
以下示例日志ack
和nack
实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
作用域作绑定到线程。 有关多线程环境中严格排序的讨论,请参阅多线程环境中的严格消息排序。 |
多线程环境中的严格消息排序
作用域内作 中的讨论仅在对同一线程执行作时适用。
请考虑以下情况:
-
thread-1
将消息发送到队列,并将工作移交给thread-2
-
thread-2
将消息发送到同一队列
由于 RabbitMQ 的异步性质和缓存通道的使用;不确定是否会使用相同的通道,因此无法保证消息到达队列的顺序。
(在大多数情况下,它们会按顺序到达,但无序送达的概率不为零)。
要解决此用例,您可以使用大小为1
(连同channelCheckoutTimeout
) 确保消息始终在同一频道上发布,并且将保证顺序。
为此,如果连接工厂有其他用途(例如使用者),则应为模板使用专用连接工厂,或将模板配置为使用嵌入在主连接工厂中的 publisher 连接工厂(请参阅使用单独的连接)。
这最好用一个简单的 Spring Boot 应用程序来说明:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布是在两个不同的线程上执行的,它们也将使用相同的通道,因为缓存的上限是单个通道。
从版本 2.3.7 开始,ThreadChannelConnectionFactory
支持将线程的通道转移到另一个线程,使用prepareContextSwitch
和switchContext
方法。
第一个方法返回一个上下文,该上下文将传递给调用第二个方法的第二个线程。
线程可以具有绑定到非事务通道或事务通道(或每个通道中的一个)的 URL;除非使用两个连接工厂,否则无法单独传输它们。
示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦prepareSwitchContext 调用时,如果当前线程再执行任何作,则会在新 Channel 上执行。
当不再需要线程绑定通道时,关闭它很重要。 |
消息传递集成
从版本 1.4 开始,RabbitMessagingTemplate
(建立在RabbitTemplate
)提供了与 Spring Framework 消息传递抽象的集成——即org.springframework.messaging.Message
.
这样,您就可以使用spring-messaging
Message<?>
抽象化。
其他 Spring 项目(例如 Spring 集成和 Spring 的 STOMP 支持)也使用这种抽象。
涉及两个消息转换器:一个用于在 spring-messaging 之间进行转换Message<?>
和 Spring AMQP 的Message
abstraction 和一个用于在 Spring AMQP 的Message
abstraction 和底层 RabbitMQ 客户端库所需的格式。
默认情况下,消息有效负载由提供的RabbitTemplate
实例的消息转换器。
或者,您可以注入自定义MessagingMessageConverter
使用其他一些 payload converter,如下例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
验证的用户 ID
从版本 1.6 开始,模板现在支持user-id-expression
(userIdExpression
使用 Java 配置时)。
如果发送消息,则在评估此表达式后设置 user id 属性(如果尚未设置)。
评估的根对象是要发送的消息。
以下示例演示如何使用user-id-expression
属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是 Literal 表达式。
第二个获取username
属性。
使用单独的连接
从版本 2.0.2 开始,您可以将usePublisherConnection
property 设置为true
以使用与 Listener 容器使用的连接不同的连接(如果可能)。
这是为了避免当生产者因任何原因被阻止时,使用者被阻止。
为此,连接工厂维护第二个内部连接工厂;默认情况下,它与主工厂的类型相同,但如果您希望使用不同的工厂类型进行发布,则可以显式设置此类型。
如果 rabbit 模板在侦听器容器启动的事务中运行,则无论此设置如何,都会使用容器的通道。
通常,您不应使用RabbitAdmin 的模板将此设置为true .
使用RabbitAdmin 采用连接工厂的构造函数。
如果您使用另一个采用模板的构造函数,请确保模板的值为false .
这是因为,通常使用 admin 来声明侦听器容器的队列。
使用将属性设置为true 意味着独占队列(例如AnonymousQueue ) 将在与侦听器容器使用的连接不同的连接上声明。
在这种情况下,容器不能使用队列。 |