此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
发送消息
发送消息时,您可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面清单中的最后一个方法开始讨论,因为它实际上是最明确的。
它允许在运行时提供 AMQP 交换名称(以及路由密钥)。
最后一个参数是负责实际创建 message 实例的回调。
使用此方法发送消息的示例可能如下所示:
以下示例演示如何使用send
发送消息的方法:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
您可以设置exchange
属性(如果您计划在大部分或所有时间使用该 Template 实例发送到同一 Exchange)。
在这种情况下,您可以使用前面清单中的第二种方法。
以下示例在功能上等同于前面的示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果exchange
和routingKey
properties 的 API API 中,您可以使用仅接受Message
.
以下示例显示了如何执行此作:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
考虑 exchange 和 routing key 属性的更好方法是显式方法参数始终覆盖模板的默认值。
事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。
在这两种情况下,默认值都是空的String
,但这实际上是一个明智的默认值。
就路由密钥而言,它并不总是必要的(例如,对于
一个Fanout
交换)。
此外,队列可以绑定到具有空String
.
这些都是依赖默认 empty 的合法情况String
模板的 routing key 属性的值。
就交易所名称而言,空的String
之所以常用,是因为 AMQP 规范将“默认交换”定义为没有名称。
由于所有队列都自动绑定到该默认交换(这是直接交换),因此使用它们的名称作为绑定值,因此前面列表中的第二种方法可用于通过默认交换向任何队列进行简单的点对点消息传递。
您可以将队列名称作为routingKey
,通过在运行时提供 method 参数。
以下示例显示了如何执行此作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个模板,该模板可用于主要或专门发布到单个 Queue。 以下示例显示了如何执行此作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息生成器 API
从版本 1.3 开始,消息构建器 API 由MessageBuilder
和MessagePropertiesBuilder
.
这些方法提供了一种方便的“Fluent”方法来创建消息或消息属性。
以下示例显示了 Fluent API 的运行情况:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
在MessageProperties
可以设置。
其他方法包括setHeader(String key, String value)
,removeHeader(String key)
,removeHeaders()
和copyProperties(MessageProperties properties)
.
每个属性设置方法都有一个set*IfAbsent()
变体。
在存在默认初始值的情况下,该方法命名为set*IfAbsentOrDefault()
.
提供了五种静态方法来创建初始消息生成器:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | 生成器创建的消息具有一个正文,该正文是对参数的直接引用。 |
2 | 生成器创建的消息具有一个 body,该 body 是一个新数组,其中包含参数中的 bytes 副本。 |
3 | 生成器创建的消息具有一个 body,该 body 是一个新数组,其中包含参数中的字节范围。
看Arrays.copyOfRange() 了解更多详情。 |
4 | 生成器创建的消息具有一个 body,该 body 是对参数 body 的直接引用。
参数的属性将复制到新的MessageProperties 对象。 |
5 | 生成器创建的消息具有一个 body,该 body 是一个包含参数 body 副本的新数组。
参数的属性将复制到新的MessageProperties 对象。 |
提供了三种静态方法来创建一个MessagePropertiesBuilder
实例:
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 使用默认值初始化新的 message properties 对象。 |
2 | 构建器初始化为 和build() 将返回提供的 properties 对象。 |
3 | 参数的属性将复制到新的MessageProperties 对象。 |
使用RabbitTemplate
实现AmqpTemplate
,每个send()
methods 具有一个重载版本,该版本需要额外的CorrelationData
对象。
启用 publisher 确认后,此对象将在 中描述的回调中返回AmqpTemplate
.
这允许发件人关联确认 (ack
或nack
) 替换为已发送的消息。
从版本 1.6.7 开始,CorrelationAwareMessagePostProcessor
接口,允许在转换消息后修改关联数据。
以下示例演示如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在版本 2.0 中,此接口已弃用。
该方法已移至MessagePostProcessor
替换为默认实现,该实现委托给postProcessMessage(Message message)
.
同样从版本 1.6.7 开始,一个名为CorrelationDataPostProcessor
。
毕竟 this 被调用MessagePostProcessor
实例(在send()
方法以及setBeforePublishPostProcessors()
).
实现可以更新或替换send()
方法(如果有)。
这Message
和原始CorrelationData
(如果有)作为参数提供。
以下示例演示如何使用postProcess
方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者返回
当模板的mandatory
property 为true
,则返回的消息由 中描述的回调提供AmqpTemplate
.
从版本 1.4 开始,RabbitTemplate
支持 SPELmandatoryExpression
属性,该属性作为根评估对象针对每个请求消息进行评估,解析为boolean
价值。
Bean 引用,例如@myBean.isMandatory(#root)
,可以在表达式中使用。
发布者返回也可以由RabbitTemplate
在 Send 和 Receive作中。
有关更多信息,请参阅 Reply Timeout (回复超时)。
配料
版本 1.4.2 引入了BatchingRabbitTemplate
.
这是RabbitTemplate
替换为send
方法,该方法根据BatchingStrategy
.
只有当批处理完成时,才会将消息发送到 RabbitMQ。
下面的清单显示了BatchingStrategy
接口定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批处理数据保存在内存中。 如果系统发生故障,未发送的消息可能会丢失。 |
一个SimpleBatchingStrategy
。
它支持将消息发送到单个 exchange 或 routing key。
它具有以下属性:
-
batchSize
:在发送批次之前,批次中的消息数。 -
bufferLimit
:批处理消息的最大大小。 这会抢占batchSize
,并导致发送部分批处理。 -
timeout
:当没有新活动向批次添加消息时,将发送部分批次的时间。
这SimpleBatchingStrategy
通过在每条嵌入消息前面使用四字节二进制长度来格式化批处理。
这是通过设置springBatchFormat
message 属性设置为lengthHeader4
.
默认情况下,批处理消息由侦听器容器自动取消批处理(通过使用springBatchFormat 消息标头)。
拒绝来自批处理的任何消息将导致整个批处理被拒绝。 |
但是,有关更多信息,请参阅使用 Batching @RabbitListener。