AMQP 支持

AMQP (RabbitMQ) 支持

Spring 集成通过使用高级消息队列协议 (AMQP) 提供用于接收和发送消息的通道适配器。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-amqp:6.0.9"

以下适配器可用:spring-doc.cadn.net.cn

Spring 集成还提供了一个点对点的消息通道和一个由 AMQP 交换和队列支持的发布-订阅消息通道。spring-doc.cadn.net.cn

为了提供 AMQP 支持, Spring 集成依赖于 (Spring AMQP),它将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring AMQP 提供与 (Spring JMS) 类似的语义。spring-doc.cadn.net.cn

虽然提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收),但 Spring 集成还为请求-回复作提供了入站和出站 AMQP 网关。spring-doc.cadn.net.cn

提示: 您应该熟悉 Spring AMQP 项目的参考文档。 它提供了有关 Spring 与 AMQP 的集成(特别是 RabbitMQ)的更深入的信息。spring-doc.cadn.net.cn

入站通道适配器

下面的清单显示了 AMQP 入站通道适配器的可能配置选项:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
Java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
XML 格式
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  tx-size=""                      (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)

<1> The unique ID for this adapter.
Optional.
<2> Message channel to which converted messages should be sent.
Required.
<3> Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required.
<4> Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.
`NONE` means no acknowledgements (`autoAck`).
`AUTO` means the adapter's container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See <<amqp-inbound-ack>>.
<5> Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional.
<6> Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false).
<7> Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional.
<8> Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`).
<9> Message channel to which error messages should be sent.
Optional.
<10> Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true).
<11> A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
<12> Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1*, thing2" or "*something").
<13> Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional.
<14> The `MessageConverter` to use when receiving AMQP messages.
Optional.
<15> The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional.
<16> Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
<19> Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`).
<20> If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`).
<21> The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`).
<22> By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`).
<23> By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`).
<24> Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
<26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information.
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
容器

请注意,使用 XML 配置外部容器时,不能使用 Spring AMQP 名称空间来定义容器。 这是因为命名空间至少需要一个<listener/>元素。 在此环境中,侦听器是适配器的内部。 因此,您必须使用普通的 Spring 来定义容器<bean/>定义,如下例所示:spring-doc.cadn.net.cn

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring 集成 JMS 和 AMQP 支持相似,但存在重要差异。 JMS 入站通道适配器正在使用JmsDestinationPollingSource并期望一个配置的 Poller 。 AMQP 入站通道适配器使用AbstractMessageListenerContainer并且是信息驱动的。 在这方面,它更类似于 JMS 消息驱动的通道适配器。

从版本 5.5 开始,AmqpInboundChannelAdapter可以配置org.springframework.amqp.rabbit.retry.MessageRecoverer策略,该策略用于RecoveryCallback当内部调用 retry作时。 看setMessageRecoverer()JavaDocs 了解更多信息。spring-doc.cadn.net.cn

在侦听器容器中使用独占或单活动使用者时,建议您设置 container 属性forceStoptrue. 这将防止出现争用情况,即在停止容器后,另一个使用者可以在此实例完全停止之前开始使用消息。

批量消息

有关批处理消息的更多信息,请参阅 Spring AMQP 文档spring-doc.cadn.net.cn

要使用 Spring 集成生成批处理消息,只需使用BatchingRabbitTemplate.spring-doc.cadn.net.cn

在接收批处理消息时,默认情况下,侦听器容器会提取每条片段消息,适配器将生成一个Message<?>对于每个片段。 从版本 5.2 开始,如果容器的deBatchingEnabled属性设置为false,则取消批处理由适配器执行,并且单个Message<List<?>>生成时,有效负载是片段有效负载的列表(如果合适,则在转换后)。spring-doc.cadn.net.cn

默认的BatchingStrategySimpleBatchingStrategy,但这可以在适配器上覆盖。spring-doc.cadn.net.cn

org.springframework.amqp.rabbit.retry.MessageBatchRecoverer当重试作需要恢复时,必须与 Batch 一起使用。

轮询的入站通道适配器

概述

版本 5.0.1 引入了轮询通道适配器,允许您按需获取单个消息 — 例如,使用MessageSourcePollingTemplate或轮询器。 有关更多信息,请参阅 Deferred Acknowledgment Pollable Message Source (延迟确认轮询消息源) 以了解更多信息。spring-doc.cadn.net.cn

它当前不支持 XML 配置。spring-doc.cadn.net.cn

以下示例说明如何配置AmqpMessageSource:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
            .handle(p -> {
                ...
            })
            .get();
}
Java
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
    return new AmqpMessageSource(connectionFactory, "someQueue");
}

有关配置属性,请参阅 Javadocspring-doc.cadn.net.cn

XML 格式
This adapter currently does not have XML configuration support.

批量消息

对于轮询的适配器,没有侦听器容器,批处理的消息始终是取消批处理的(如果BatchingStrategy支持这样做)。spring-doc.cadn.net.cn

入站网关

入站网关支持入站通道适配器上的所有属性(除了 'channel' 被 'request-channel' 替换),以及一些其他属性。 以下清单显示了可用的属性:spring-doc.cadn.net.cn

Java DSL
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
            .transform(String.class, String::toUpperCase)
            .get();
}
Java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
    gateway.setRequestChannel(channel);
    gateway.setDefaultReplyTo("bar");
    return gateway;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                    new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("foo");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new AbstractReplyProducingMessageHandler() {

        @Override
        protected Object handleRequestMessage(Message<?> requestMessage) {
            return "reply to " + requestMessage.getPayload();
        }

    };
}
XML 格式
<int-amqp:inbound-gateway
                          id="inboundGateway"                (1)
                          request-channel="myRequestChannel" (2)
                          header-mapper=""                   (3)
                          mapped-request-headers=""          (4)
                          mapped-reply-headers=""            (5)
                          reply-channel="myReplyChannel"     (6)
                          reply-timeout="1000"               (7)
                          amqp-template=""                   (8)
                          default-reply-to="" />             (9)
1 此适配器的唯一 ID。 自选。
2 将转换后的消息发送到的消息通道。 必填。
3 AmqpHeaderMapper在接收 AMQP 消息时使用。 自选。 默认情况下,只有标准的 AMQP 属性(例如contentType) 复制到 Spring 集成或从 Spring 集成复制MessageHeaders. AMQP 中的任何用户定义的标头MessageProperties默认情况下,不会复制到 AMQP 消息或从 AMQP 消息复制DefaultAmqpHeaderMapper. 如果提供了 'request-header-names' 或 'reply-header-names',则不允许。
4 要从 AMQP 请求映射到MessageHeaders. 仅当未提供 'header-mapper' 引用时,才能提供此属性。 此列表中的值也可以是与 Headers 名称匹配的简单模式(例如 或"*""thing1*, thing2""*thing1").
5 以逗号分隔的名称列表MessageHeaders映射到 AMQP 回复消息的 AMQP 消息属性中。 所有标准标头(例如contentType) 映射到 AMQP 消息属性,而用户定义的标头映射到 'headers' 属性。 只有在未提供 'header-mapper' 引用时才能提供此属性。 此列表中的值也可以是要与标头名称匹配的简单模式(例如,或"*""foo*, bar""*foo").
6 需要回复 Messages 的消息通道。 自选。
7 设置receiveTimeout在底层o.s.i.core.MessagingTemplate用于接收来自 Reply Channel 的消息。 如果未指定,则此属性默认为1000(1 秒)。 仅当容器线程在发送回复之前移交给另一个线程时适用。
8 定制的AmqpTemplateBean 引用(以便更好地控制要发送的回复消息)。 您可以提供RabbitTemplate.
9 replyTo o.s.amqp.core.AddressrequestMessage没有replyTo财产。 如果未指定此选项,则为 noamqp-template,则没有replyTo属性存在于请求消息中,并且 一IllegalStateException被抛出,因为无法路由回复。 如果未指定此选项,并且外部amqp-template时,不会引发异常。 您必须指定此选项或配置默认值exchangeroutingKey在该模板上, 如果您预见到没有replyTo属性。

请参阅 Inbound Channel Adapter 中有关配置listener-container属性。spring-doc.cadn.net.cn

从版本 5.5 开始,AmqpInboundChannelAdapter可以配置org.springframework.amqp.rabbit.retry.MessageRecoverer策略,该策略用于RecoveryCallback当内部调用 retry作时。 看setMessageRecoverer()JavaDocs 了解更多信息。spring-doc.cadn.net.cn

入站终端节点确认模式

默认情况下,入站终端节点使用AUTOacknowledge 模式,这意味着容器会在下游集成流完成时自动确认消息(或者使用QueueChannelExecutorChannel). 将 mode 设置为NONE配置使用者,以便根本不使用确认(代理会在消息发送后立即自动确认消息)。 将 mode 设置为MANUAL允许用户代码在处理过程中的其他时间点确认消息。 为了支持这一点,在此模式下,终端节点提供了ChanneldeliveryTagamqp_channelamqp_deliveryTag标头。spring-doc.cadn.net.cn

您可以在Channel但是,一般来说,只有basicAckbasicNack(或basicReject) 使用。 为了不干扰容器的作,您不应保留对通道的引用,而只能在当前消息的上下文中使用它。spring-doc.cadn.net.cn

由于Channel是对 “live” 对象的引用,则无法序列化,如果持久保存消息,则 IT 将丢失。

以下示例显示了如何使用MANUAL确认:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing

    }
    else {
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}

出站终端节点

以下出站终端节点具有许多类似的配置选项。 从版本 5.2 开始,confirm-timeout已添加。 通常,当 publisher 确认启用时,broker 将快速返回一个 ack(或 nack),该 ack 将被发送到相应的通道。 如果在收到确认之前关闭了通道,则 Spring AMQP 框架将合成一个 nack。 “缺失”ack 不应发生,但是,如果您设置了此属性,则端点将定期检查它们,并在时间过后未收到确认时合成 nack。spring-doc.cadn.net.cn

出站通道适配器

以下示例显示了 AMQP 出站通道适配器的可用属性:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlow.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
Java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
XML 格式
<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)
                               channel="outboundChannel"         (2)
                               amqp-template="myAmqpTemplate"    (3)
                               exchange-name=""                  (4)
                               exchange-name-expression=""       (5)
                               order="1"                         (6)
                               routing-key=""                    (7)
                               routing-key-expression=""         (8)
                               default-delivery-mode""           (9)
                               confirm-correlation-expression="" (10)
                               confirm-ack-channel=""            (11)
                               confirm-nack-channel=""           (12)
                               confirm-timeout=""                (13)
                               wait-for-confirm=""               (14)
                               return-channel=""                 (15)
                               error-message-strategy=""         (16)
                               header-mapper=""                  (17)
                               mapped-request-headers=""         (18)
                               lazy-connect="true"               (19)
                               multi-send="false"/>              (20)
1 此适配器的唯一 ID。 自选。
2 消息通道,应将消息发送到该通道,以便将消息转换并发布到 AMQP 交换。 必填。
3 对已配置的 AMQP 模板的 Bean 引用。 可选(默认为amqpTemplate).
4 消息发送到的 AMQP 交换的名称。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name-expression' 互斥。 自选。
5 一个 SPEL 表达式,用于确定消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name' 互斥。 自选。
6 注册多个使用者时此使用者的顺序,从而启用负载均衡和故障转移。 可选(默认为Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 发送消息时使用的固定 routing-key。 默认情况下,这是一个空的String. 与 'routing-key-expression' 互斥。 自选。
8 一个 SPEL 表达式,用于确定发送消息时要使用的路由键,并将消息作为根对象(例如,'payload.key')。 默认情况下,这是一个空的String. 与 'routing-key' 互斥。 自选。
9 消息的默认传递模式:PERSISTENTNON_PERSISTENT. 如果header-mapper设置传递模式。 如果 Spring Integration 消息标头amqp_deliveryMode存在时,DefaultHeaderMapper设置值。 如果未提供此属性并且 Headers 映射器未设置它,则默认值取决于底层的 Spring AMQPMessagePropertiesConverterRabbitTemplate. 如果根本没有自定义,则默认值为PERSISTENT. 自选。
10 定义相关数据的表达式。 如果提供,这会将底层 AMQP 模板配置为接收发布者确认。 需要专用的RabbitTemplate以及CachingConnectionFactory使用publisherConfirms属性设置为true. 收到发布者确认并提供关联数据后,该确认将写入confirm-ack-channelconfirm-nack-channel,具体取决于确认类型。 确认的有效负载是相关数据,如此表达式所定义。 邮件的 'amqp_publishConfirm' 报头设置为true (ack) 或false (nack). 例子:headers['myCorrelationData']payload. 版本 4.1 引入了amqp_publishConfirmNackCause消息标头。 它包含cause的 'nack' 用于出版商确认。 从版本 4.2 开始,如果表达式解析为Message<?>实例(例如#this),则在ack/nackchannel 基于该消息,并添加了其他标头。 以前,无论类型如何,都会使用关联数据作为其负载创建新消息。 另请参阅发布者确认和返回的替代机制。 自选。
11 正 (ack) 发布者确认。 有效负载是由confirm-correlation-expression. 如果表达式为#root#this,则消息是从原始消息构建的,其中amqp_publishConfirmheader 设置为true. 另请参阅发布者确认和返回的替代机制。 可选(默认值为nullChannel).
12 负数 (nack) 发布者确认。 有效负载是由confirm-correlation-expression(如果没有ErrorMessageStrategyconfigured)。 如果表达式为#root#this,则消息是从原始消息构建的,其中amqp_publishConfirmheader 设置为false. 当存在ErrorMessageStrategy,则消息是一个ErrorMessage替换为NackedAmqpMessageException有效载荷。 另请参阅发布者确认和返回的替代机制。 可选(默认值为nullChannel).
13 设置后,如果在此时间(以毫秒为单位)内未收到发布者确认,则适配器将合成否定确认 (nack)。 待处理确认每检查一次此值的 50%,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 另请参阅发布者确认和返回的替代机制。 默认无(不会生成 nack)。
14 当设置为 true 时,调用线程将阻塞,等待发布者确认。 这需要RabbitTemplate为 Confirms 以及confirm-correlation-expression. 线程将阻塞长达confirm-timeout(或默认为 5 秒)。 如果发生超时,则MessageTimeoutException将被抛出。 如果启用了返回并返回了一条消息,或者在等待确认时发生任何其他异常,则MessageHandlingException将引发,并显示相应的消息。
15 返回的消息发送到的通道。 如果提供,则底层 AMQP 模板配置为将无法传递的消息返回给适配器。 当没有ErrorMessageStrategy配置后,该消息是根据从 AMQP 接收的数据构建的,并具有以下附加标头:amqp_returnReplyCode,amqp_returnReplyText,amqp_returnExchange,amqp_returnRoutingKey. 当存在ErrorMessageStrategy,则消息是一个ErrorMessage替换为ReturnedAmqpMessageException有效载荷。 另请参阅发布者确认和返回的替代机制。 自选。
16 ErrorMessageStrategy用于构建的 implementation used to buildErrorMessage实例。
17 AmqpHeaderMapper在发送 AMQP 消息时使用。 默认情况下,只有标准的 AMQP 属性(例如contentType) 复制到 Spring 集成MessageHeaders. 默认的'DefaultAmqpHeaderMapper'不会将任何用户定义的 Headers 复制到消息中。 如果提供了 'request-header-names',则不允许。 自选。
18 要从MessageHeaders添加到 AMQP 消息中。 如果提供了 'header-mapper' 引用,则不允许。 此列表中的值也可以是与 Headers 名称匹配的简单模式(例如 或"*""thing1*, thing2""*thing1").
19 当设置为false时,终端节点将尝试在应用程序上下文初始化期间连接到代理。 这允许对错误配置进行 “fail fast” 检测,但如果 broker 宕机,也会导致初始化失败。 什么时候true(默认值),则在发送第一条消息时建立连接(如果连接尚不存在,因为其他组件建立了连接)。
20 当设置为true、类型为Iterable<Message<?>>将作为离散消息在同一个通道上的单个RabbitTemplate调用。 需要RabbitTemplate. 什么时候wait-for-confirms为 true,RabbitTemplate.waitForConfirmsOrDie()在发送消息后调用。 使用事务模板,将在新事务或已启动的事务(如果存在)中执行发送。
return-channel (返回通道)

使用return-channel需要RabbitTemplate使用mandatory属性设置为true以及CachingConnectionFactory使用publisherReturns属性设置为true. 当使用多个带有返回的出站终端节点时,单独的RabbitTemplate每个终端节点都需要。spring-doc.cadn.net.cn

出站网关

以下清单显示了 AMQP 出站网关的可能属性:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
                    .routingKey("foo")) // default exchange - route to queue 'foo'
            .get();
}

@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {

    String sendToRabbit(String data);

}
Java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setExpectReply(true);
    outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {

    String sendToRabbit(String data);

}
XML 格式
<int-amqp:outbound-gateway id="outboundGateway"               (1)
                           request-channel="myRequestChannel" (2)
                           amqp-template=""                   (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           error-message-strategy=""          (18)
                           lazy-connect="true" />             (19)
1 此适配器的唯一 ID。 自选。
2 消息通道,将消息发送到该通道,以便将消息转换并发布到 AMQP 交换。 必填。
3 对已配置的 AMQP 模板的 Bean 引用。 可选(默认为amqpTemplate).
4 应将消息发送到的 AMQP 交换的名称。 如果未提供,则消息将发送到默认的 no-name cxchange。 与 'exchange-name-expression' 互斥。 自选。
5 一个 SPEL 表达式,用于确定应将消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name' 互斥。 自选。
6 注册多个使用者时此使用者的顺序,从而启用负载均衡和故障转移。 可选(默认为Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 从 AMQP 队列接收回复并进行转换后,应将回复发送到的消息通道。 自选。
8 网关在将回复消息发送到reply-channel. 这仅适用于reply-channelcan 阻止 — 例如QueueChannel容量限制当前已满。 默认为 infinity。
9 什么时候true,如果AmqpTemplate’s `replyTimeout财产。 默认为true.
10 routing-key以在发送消息时使用。 默认情况下,这是一个空的String. 与 'routing-key-expression' 互斥。 自选。
11 一个 SpEL 表达式,用于确定routing-key在发送消息时使用,将消息作为根对象(例如,'payload.key')。 默认情况下,这是一个空的String. 与 'routing-key' 互斥。 自选。
12 消息的默认传递模式:PERSISTENTNON_PERSISTENT. 如果header-mapper设置传递模式。 如果 Spring Integration 消息标头amqp_deliveryMode存在时,DefaultHeaderMapper设置值。 如果未提供此属性并且 Headers 映射器未设置它,则默认值取决于底层的 Spring AMQPMessagePropertiesConverterRabbitTemplate. 如果根本没有自定义,则默认值为PERSISTENT. 自选。
13 从 4.2 版本开始。 定义关联数据的表达式。 如果提供,这会将底层 AMQP 模板配置为接收发布者确认。 需要专用的RabbitTemplate以及CachingConnectionFactory使用publisherConfirms属性设置为true. 收到发布者确认并提供关联数据后,该数据将写入confirm-ack-channelconfirm-nack-channel,具体取决于确认类型。 确认的有效负载是此表达式定义的关联数据。 邮件的标头 'amqp_publishConfirm' 设置为true (ack) 或false (nack). 为nackconfirmations,则 Spring 集成提供了一个额外的 Headersamqp_publishConfirmNackCause. 例子:headers['myCorrelationData']payload. 如果表达式解析为Message<?>实例(例如#this)、消息 在ack/nackchannel 基于该消息,并添加了其他标头。 以前,无论类型如何,都会使用关联数据作为其负载创建新消息。 另请参阅发布者确认和返回的替代机制。 自选。
14 正 (ack) 发布者确认。 有效负载是由confirm-correlation-expression. 如果表达式为#root#this,则消息是从原始消息构建的,其中amqp_publishConfirmheader 设置为true. 另请参阅发布者确认和返回的替代机制。 可选(默认值为nullChannel).
15 负数 (nack) 发布者确认。 有效负载是由confirm-correlation-expression(如果没有ErrorMessageStrategyconfigured)。 如果表达式为#root#this,则消息是从原始消息构建的,其中amqp_publishConfirmheader 设置为false. 当存在ErrorMessageStrategy,则消息是一个ErrorMessage替换为NackedAmqpMessageException有效载荷。 另请参阅发布者确认和返回的替代机制。 可选(默认值为nullChannel).
16 设置后,如果在此时间(以毫秒为单位)内未收到发布者确认,网关将合成否定确认 (nack)。 待处理确认每检查一次此值的 50%,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 默认无(不会生成 nack)。
17 返回的消息发送到的通道。 如果提供,则底层 AMQP 模板配置为将无法传递的消息返回给适配器。 当没有ErrorMessageStrategy配置后,该消息是根据从 AMQP 接收的数据构建的,并具有以下附加标头:amqp_returnReplyCode,amqp_returnReplyText,amqp_returnExchangeamqp_returnRoutingKey. 当存在ErrorMessageStrategy,则消息是一个ErrorMessage替换为ReturnedAmqpMessageException有效载荷。 另请参阅发布者确认和返回的替代机制。 自选。
18 ErrorMessageStrategy用于构建的 implementation used to buildErrorMessage实例。
19 当设置为false时,终端节点将尝试在应用程序上下文初始化期间连接到代理。 这允许在代理关闭时通过记录错误消息来“快速失败”检测错误配置。 什么时候true(默认值),则在发送第一条消息时建立连接(如果连接尚不存在,因为其他组件建立了连接)。
return-channel (返回通道)

使用return-channel需要RabbitTemplate使用mandatory属性设置为true以及CachingConnectionFactory使用publisherReturns属性设置为true. 当使用多个带有返回的出站终端节点时,单独的RabbitTemplate每个终端节点都需要。spring-doc.cadn.net.cn

底层AmqpTemplate具有默认的replyTimeout的 5 秒。 如果需要更长的超时时间,则必须在template.

请注意,出站适配器和出站网关配置之间的唯一区别是expectReply财产。spring-doc.cadn.net.cn

异步出站网关

上一节中讨论的网关是同步的,因为发送线程将暂停,直到 收到回复(或发生超时)。 Spring 集成版本 4.3 添加了一个异步网关,它使用AsyncRabbitTemplate来自 Spring AMQP。 发送消息时,线程在发送作完成后立即返回,收到消息时,将在模板的侦听器容器线程上发送回复。 当在 poller 线程上调用网关时,这可能很有用。 线程已释放,可用于框架中的其他任务。spring-doc.cadn.net.cn

以下清单显示了 AMQP 异步出站网关的可能配置选项:spring-doc.cadn.net.cn

Java DSL
@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
Java
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
XML 格式
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 此适配器的唯一 ID。 自选。
2 消息通道,消息应发送到该通道,以便将它们转换并发布到 AMQP 交换。 必填。
3 Bean 引用配置的AsyncRabbitTemplate. 可选(默认为asyncRabbitTemplate).
4 应将消息发送到的 AMQP 交换的名称。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name-expression' 互斥。 自选。
5 一个 SPEL 表达式,用于确定消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name' 互斥。 自选。
6 注册多个使用者时此使用者的顺序,从而启用负载均衡和故障转移。 可选(默认为Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 从 AMQP 队列接收回复并进行转换后,应将回复发送到的消息通道。 自选。
8 网关在将回复消息发送到reply-channel. 这仅适用于reply-channelcan 阻止 — 例如QueueChannel容量限制当前已满。 默认值为 infinity。
9 AsyncRabbitTemplate’s `receiveTimeout属性,并且该设置为true,网关会向入站消息的errorChannel页眉。 当AsyncRabbitTemplate’s `receiveTimeout属性,并且该设置为false,网关会向默认的errorChannel(如果可用)。 它默认为true.
10 发送 Messages 时使用的 routing-key。 默认情况下,这是一个空的String. 与 'routing-key-expression' 互斥。 自选。
11 一个 SPEL 表达式,用于确定发送消息时要使用的路由密钥。 将消息作为根对象(例如,'payload.key')。 默认情况下,这是一个空的String. 与 'routing-key' 互斥。 自选。
12 消息的默认传递模式:PERSISTENTNON_PERSISTENT. 如果header-mapper设置传递模式。 如果 Spring 集成消息头 (amqp_deliveryMode) 存在,则DefaultHeaderMapper设置值。 如果未提供此属性并且 Headers 映射器未设置它,则默认值取决于底层的 Spring AMQPMessagePropertiesConverterRabbitTemplate. 如果未自定义,则默认值为PERSISTENT. 自选。
13 定义相关数据的表达式。 如果提供,这会将底层 AMQP 模板配置为接收发布者确认。 需要专用的RabbitTemplate以及CachingConnectionFactory及其publisherConfirms属性设置为true. 收到发布者确认并提供关联数据后,确认将写入confirm-ack-channelconfirm-nack-channel,具体取决于确认类型。 确认的有效负载是此表达式定义的关联数据,并且消息的 'amqp_publishConfirm' 标头设置为true (ack) 或false (nack). 为nack实例、附加标头 (amqp_publishConfirmNackCause) 提供。 例子:headers['myCorrelationData'],payload. 如果表达式解析为Message<?>实例(例如 “#this”)上,则在ack/nackchannel 基于该消息,并添加了其他标头。 另请参阅发布者确认和返回的替代机制。 自选。
14 正 (ack) 发布者确认。 有效负载是由confirm-correlation-expression. 需要底层AsyncRabbitTemplate使其enableConfirms属性设置为true. 另请参阅发布者确认和返回的替代机制。 可选(默认值为nullChannel).
15 从 4.2 版本开始。 负数 (nack) 发布者确认。 有效负载是由confirm-correlation-expression. 需要底层AsyncRabbitTemplate使其enableConfirms属性设置为true. 另请参阅发布者确认和返回的替代机制。 可选(默认值为nullChannel).
16 设置后,如果在此时间(以毫秒为单位)内未收到发布者确认,网关将合成否定确认 (nack)。 待处理确认每检查一次此值的 50%,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 另请参阅发布者确认和返回的替代机制。 默认无(不会生成 nack)。
17 返回的消息发送到的通道。 如果提供,则底层 AMQP 模板配置为将无法送达的消息返回到网关。 该消息是根据从 AMQP 接收的数据构建的,并带有以下附加标头:amqp_returnReplyCode,amqp_returnReplyText,amqp_returnExchangeamqp_returnRoutingKey. 需要底层AsyncRabbitTemplate使其mandatory属性设置为true. 另请参阅发布者确认和返回的替代机制。 自选。
18 当设置为false时,终端节点会尝试在应用程序上下文初始化期间连接到代理。 这样做可以在代理宕机时记录错误消息,从而“快速失败”检测错误配置。 什么时候true(默认值),则连接已建立(如果由于已建立其他组件而尚不存在 it) 的 intent 调用。

有关更多信息,另请参阅 Asynchronous Service Activatorspring-doc.cadn.net.cn

RabbitTemplate (兔模板)

当您使用 Confirmation 和 Returns 时,我们建议RabbitTemplate连接到AsyncRabbitTemplate要敬业。 否则,可能会遇到意想不到的副作用。spring-doc.cadn.net.cn

发布者确认和返回的替代机制

当连接工厂配置为发布者确认和返回时,上面的部分讨论了消息通道的配置,以异步接收确认和返回。 从版本 5.4 开始,有一个通常更易于使用的附加机制。spring-doc.cadn.net.cn

在这种情况下,请勿配置confirm-correlation-expression或 Confirm 和 Return 渠道。 相反,请添加CorrelationData实例中的AmqpHeaders.PUBLISH_CONFIRM_CORRELATION页眉;然后,您可以稍后等待结果,方法是在CorrelationData您已为其发送消息的实例。 这returnedMessage字段将始终在 Future 完成之前填充(如果返回消息)。spring-doc.cadn.net.cn

CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
        .setHeader("rk", "someKeyThatWontRoute")
        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
        .build());
...
try {
    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
    Message returned = corr.getReturnedMessage();
    if (returned !- null) {
        // message could not be routed
    }
}
catch { ... }

为了提高性能,您可能希望发送多条消息并稍后等待确认,而不是一次发送一条消息。 返回的消息是转换后的原始消息;您可以子类 ACorrelationData使用您需要的任何额外数据。spring-doc.cadn.net.cn

入站消息转换

到达通道适配器或网关的入站消息将转换为spring-messaging Message<?>payload 使用消息转换器。 默认情况下,SimpleMessageConverter,它处理 Java 序列化和文本。 标头使用DefaultHeaderMapper.inboundMapper()默认情况下。 如果发生转换错误,并且没有定义错误通道,则异常将抛出到容器中,并由侦听器容器的错误处理程序处理。 默认错误处理程序将 conversion 错误视为致命错误,并且消息将被拒绝(并路由到死信交换,如果队列是这样配置的)。 如果定义了错误通道,则ErrorMessagepayload 是一个ListenerExecutionFailedExceptionwith 属性failedMessage(无法转换的 Spring AMQP 消息)和cause. 如果容器AcknowledgeModeAUTO(默认值)并且 error 流使用错误而不引发异常,则将确认原始消息。 如果错误流引发异常,则异常类型将与容器的错误处理程序一起确定是否将消息重新排队。 如果容器配置了AcknowledgeMode.MANUAL,则有效负载为ManualAckListenerExecutionFailedException具有其他属性channeldeliveryTag. 这使错误流能够调用basicAckbasicNack(或basicReject) 来控制其处置。spring-doc.cadn.net.cn

出站消息转换

Spring AMQP 1.4 引入了ContentTypeDelegatingMessageConverter,其中实际转换器的选择基于 在 Incoming Content Type Message 属性上。 入站终端节点可以使用此 API。spring-doc.cadn.net.cn

从 Spring 集成版本 4.3 开始,您可以使用ContentTypeDelegatingMessageConverter在出站终端节点上,使用contentType标头指定使用哪个转换器。spring-doc.cadn.net.cn

以下示例将ContentTypeDelegatingMessageConverter,默认转换器为SimpleMessageConverter(处理 Java 序列化和纯文本)以及 JSON 转换器:spring-doc.cadn.net.cn

<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                               exchange-name="someExchange"
                               routing-key="someKey"
                               amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>

发送消息到ctRequestChannel使用contentTypeheader 设置为application/json导致选择 JSON 转换器。spring-doc.cadn.net.cn

这适用于出站通道适配器和网关。spring-doc.cadn.net.cn

从版本 5.0 开始,添加到MessageProperties的出站消息永远不会被映射的标头覆盖(默认情况下)。 以前,仅当消息转换器是ContentTypeDelegatingMessageConverter(在这种情况下,首先映射 Header,以便选择合适的转换器)。 对于其他转换器,例如SimpleMessageConverter,映射的标头会覆盖转换器添加的任何标头。 当出站邮件有一些剩余内容时,这会导致问题contentTypeheaders(可能来自入站通道适配器)和正确的出站contentType被错误地覆盖。 解决方法是在将消息发送到出站终端节点之前使用标头筛选条件删除标头。spring-doc.cadn.net.cn

但是,在某些情况下,需要前一个行为 — 例如,当Stringpayload 中,则SimpleMessageConverter不知道内容,并将contentTypemessage 属性设置为text/plain但您的应用程序希望将其覆盖为application/json通过设置contentType标头。 这ObjectToJsonTransformer确实如此(默认情况下)。spring-doc.cadn.net.cn

现在有一个名为headersMappedLast在出站通道适配器和网关上(以及在 AMQP 支持的通道上)。 将此项设置为true恢复覆盖转换器添加的属性的行为。spring-doc.cadn.net.cn

从版本 5.1.9 开始,类似的replyHeadersMappedLastAmqpInboundGateway当我们生成回复并希望覆盖 Converter 填充的 Headers 时。 有关更多信息,请参阅其 JavaDocs。spring-doc.cadn.net.cn

出站用户 ID

Spring AMQP 版本 1.6 引入了一种机制,允许为出站消息指定默认用户 ID。 我们始终可以将AmqpHeaders.USER_ID标头,现在优先于默认值。 这可能对消息收件人有用。 对于入站消息,如果消息发布者设置了该属性,则该属性将在AmqpHeaders.RECEIVED_USER_ID页眉。 请注意,RabbitMQ 会验证用户 ID 是否是连接的实际用户 ID,或者连接是否允许模拟spring-doc.cadn.net.cn

要为出站消息配置默认用户 ID,请在RabbitTemplate并配置出站适配器或网关以使用该模板。 同样,要在回复上设置用户 ID 属性,请将适当配置的模板注入入站网关。 有关更多信息,请参见 Spring AMQP 文档spring-doc.cadn.net.cn

延迟消息交换

Spring AMQP 支持 RabbitMQ 延迟消息交换插件。 对于入站邮件,x-delayheader 映射到AmqpHeaders.RECEIVED_DELAY页眉。 设置AMQPHeaders.DELAY标头会导致相应的x-delay标头。 您还可以指定delaydelayExpression出站终端节点上的属性 (delay-expression使用 XML 配置时)。 这些属性优先于AmqpHeaders.DELAY页眉。spring-doc.cadn.net.cn

AMQP 支持的消息通道

有两种消息通道实现可用。 一个是点对点的,另一个是发布-订阅。 这两个通道都为底层AmqpTemplateSimpleMessageListenerContainer(如本章前面所示的通道适配器和网关)。 但是,我们在此处显示的示例具有最少的配置。 浏览 XML 架构以查看可用属性。spring-doc.cadn.net.cn

点对点通道可能类似于以下示例:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pChannel"/>

在后台,前面的示例会导致Queuesi.p2pChannel要声明的 URL,并且此通道会向该Queue(从技术上讲,通过发送到 no-name 直接交换,并使用与此 name 匹配的 nameQueue). 此通道还会在该Queue. 如果您希望通道是 “pollable” 而不是消息驱动的,请提供message-driven值为false,如下例所示:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅通道可能如下所示:spring-doc.cadn.net.cn

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在后台,前面的示例会导致一个名为si.fanout.pubSubChannel要声明,并且此通道将发送到该 fanout 交换。 此通道还声明一个名为 exclusive、auto-delete、non-durable 的服务器Queue并将其绑定到 fanout 交换,同时在该 Consumer上注册 ConsumerQueue以接收消息。 publish-subscribe-channel 没有 “pollable” 选项。 它必须是消息驱动的。spring-doc.cadn.net.cn

从版本 4.1 开始,AMQP 支持的消息通道(与channel-transacted) 支持template-channel-transacted分开transactionalConfiguration 的AbstractMessageListenerContainer和 对于RabbitTemplate. 请注意,在之前的channel-transactedtrue默认情况下。 现在,默认情况下,它是false对于AbstractMessageListenerContainer.spring-doc.cadn.net.cn

在版本 4.3 之前,AMQP 支持的通道仅支持Serializablepayloads 和 headers 的 Headers 中。 整个消息被转换(序列化)并发送到 RabbitMQ。 现在,您可以设置extract-payload属性(或setExtractPayload()when using Java configuration) 设置为true. 当此标志为true,则消息有效负荷将被转换,并且 Headers 将被映射,其方式类似于使用通道适配器时。 这种安排允许 AMQP 支持的通道与不可序列化的有效负载(可能与其他消息转换器一起使用,例如Jackson2JsonMessageConverter). 有关默认映射 Headers 的更多信息,请参阅 AMQP Message Headers。 您可以通过提供使用outbound-header-mapperinbound-header-mapper属性。 现在,您还可以指定default-delivery-mode,用于设置没有amqp_deliveryMode页眉。 默认情况下, Spring AMQPMessageProperties使用PERSISTENT交付模式。spring-doc.cadn.net.cn

与其他支持持久性的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不用于将工作分发给其他对等应用程序。 为此,请改用通道适配器。
从版本 5.0 开始,pollable 通道现在会阻塞指定receiveTimeout(默认值为 1 秒)。 以前,与其他PollableChannel实现中,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。 阻止比使用basicGet()检索消息(无超时),因为必须创建一个使用者来接收每条消息。 要恢复之前的行为,请将 Poller 的receiveTimeout设置为 0。

使用 Java 配置进行配置

以下示例显示如何使用 Java 配置配置通道:spring-doc.cadn.net.cn

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 进行配置

以下示例显示如何使用 Java DSL 配置通道:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}

AMQP 消息标头

概述

Spring 集成 AMQP 适配器会自动映射所有 AMQP 属性和头文件。 (这是对 4.3 的更改 - 以前,仅映射标准标头)。 默认情况下,这些属性将复制到 Spring 集成中或从中复制MessageHeaders通过使用DefaultAmqpHeaderMapper.spring-doc.cadn.net.cn

你可以传入你自己的特定于 AMQP 的 Headers 映射器的实现,因为适配器具有支持这样做的属性。spring-doc.cadn.net.cn

AMQP 中的任何用户定义的标头MessageProperties复制到 AMQP 消息或从 AMQP 消息中复制,除非requestHeaderNamesreplyHeaderNames的属性DefaultAmqpHeaderMapper. 默认情况下,对于出站映射器,没有x-*标头被映射。 请参阅本节后面出现的注意事项,了解原因。spring-doc.cadn.net.cn

要覆盖默认值并恢复到 4.3 之前的行为,请使用STANDARD_REQUEST_HEADERSSTANDARD_REPLY_HEADERS在 Properties 中。spring-doc.cadn.net.cn

映射用户定义的标头时,值还可以包含简单的通配符模式(例如thing**thing) 进行匹配。 匹配所有标头。*

从版本 4.1 开始,AbstractHeaderMapper(一个DefaultAmqpHeaderMappersuperclass) 允许NON_STANDARD_HEADERStoken 的requestHeaderNamesreplyHeaderNames属性(除了现有的STANDARD_REQUEST_HEADERSSTANDARD_REPLY_HEADERS) 映射所有用户定义的标头。spring-doc.cadn.net.cn

org.springframework.amqp.support.AmqpHeadersclass 标识DefaultAmqpHeaderMapper:spring-doc.cadn.net.cn

如本节前面所述,使用 Headers 映射模式是复制所有 Headers 的常见方法。 但是,这可能会产生一些意想不到的副作用,因为某些 RabbitMQ 专有属性/标头也会被复制。 例如,当您使用联合身份验证时,收到的消息可能具有名为*x-received-from,其中包含发送消息的节点。 如果您在入站网关上对请求和回复标头映射使用通配符,则会复制此标头,这可能会导致联合身份验证出现一些问题。 此回复消息可能会联合回发送代理,发送代理可能会认为消息正在循环,因此会以静默方式删除该消息。 如果您希望使用通配符标头映射的便利性,则可能需要筛选掉下游流中的一些标头。 例如,为避免将*x-received-from标头返回到你可以使用的回复<int:header-filter …​ header-names="x-received-from">在将回复发送到 AMQP 入站网关之前。 或者,您可以显式列出实际要映射的那些属性,而不是使用通配符。 由于这些原因,对于入站邮件,映射器(默认情况下)不会映射任何x-*头。 它也不会映射deliveryModeamqp_deliveryMode标头,以避免该标头从入站消息传播到出站消息。 相反,此标头映射到amqp_receivedDeliveryMode,该 URL 不会映射到输出上。

从版本 4.3 开始,可以通过在模式前面加上!. 否定模式获得优先级,因此STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1不映射thing1(或thing2也不thing3). 标准接头加上badqux被映射。 否定技术可能很有用,例如,当 JSON 反序列化逻辑在接收方下游以不同的方式完成时,不为传入消息映射 JSON 类型的标头。 为此,一个!json_*pattern 应该为 inbound channel adapter/gateway 的 header mapper 配置。spring-doc.cadn.net.cn

如果您有一个以!,则需要使用 进行转义,如下所示:\STANDARD_REQUEST_HEADERS,\!myBangHeader. 名为!myBangHeader现在已映射。
从版本 5.1 开始,DefaultAmqpHeaderMapper将回退到映射MessageHeaders.IDMessageHeaders.TIMESTAMPMessageProperties.messageIdMessageProperties.timestamp分别,如果相应的amqp_messageIdamqp_timestamp出站邮件上不存在报头。 入站属性将映射到amqp_*标头。 填充messageId属性。

contentType页眉

与其他标头不同,AmqpHeaders.CONTENT_TYPE未以amqp_;这允许跨不同技术透明地传递 contentType 标头。 例如,发送到 RabbitMQ 队列的入站 HTTP 消息。spring-doc.cadn.net.cn

contentType标头映射到 Spring AMQP 的MessageProperties.contentType属性,该属性随后映射到 RabbitMQ 的content_type财产。spring-doc.cadn.net.cn

在 5.1 版本之前,此标头也映射为MessageProperties.headers地图;这是不正确的,此外,该值可能是错误的,因为底层 Spring AMQP 消息转换器可能已经更改了内容类型。 这样的变化将反映在一等舱content_type属性,但不在 RabbitMQ 标头映射中。 入站映射忽略了 headers 映射值。contentType不再映射到 Headers 映射中的条目。spring-doc.cadn.net.cn

严格的消息排序

本节介绍入站和出站消息的消息排序。spring-doc.cadn.net.cn

入境

如果需要对入站消息进行严格排序,则必须配置入站侦听器容器的prefetchCountproperty 设置为1. 这是因为,如果消息失败并重新传递,它将在现有的预取消息之后到达。 从 Spring AMQP 版本 2.0 开始,prefetchCount默认为250以提高性能。 严格的订购要求以性能下降为代价。spring-doc.cadn.net.cn

出境

请考虑以下集成流程:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .split(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们发送消息A,BC到网关。 虽然消息A,B,C均按顺序发送,则不保证。 这是因为模板为每个 send作从缓存中“借用”一个通道,并且不能保证每条消息都使用相同的通道。 一种解决方案是在拆分器之前启动事务,但事务在 RabbitMQ 中成本高昂,并且性能会降低数百倍。spring-doc.cadn.net.cn

为了以更有效的方式解决这个问题,从版本 5.1 开始, Spring 集成提供了BoundRabbitChannelAdvice这是一个HandleMessageAdvice. 请参阅处理消息建议。 在拆分器之前应用时,它确保所有下游作都在同一通道上执行,并且可以选择等待收到所有已发送消息的发布者确认(如果连接工厂配置为确认)。 以下示例演示如何使用BoundRabbitChannelAdvice:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .split(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,相同的RabbitTemplate(它实现RabbitOperations) 用于通知和出站适配器。 该通知在模板的invoke方法,以便所有作都在同一通道上运行。 如果提供了可选的超时,则当流程完成时,建议会调用waitForConfirmsOrDie方法,如果在指定时间内未收到确认,则会引发异常。spring-doc.cadn.net.cn

下游流中不得有线程切换 (QueueChannel,ExecutorChannel等)。

AMQP 示例

要试验 AMQP 适配器,请查看 Spring 集成示例 git 存储库中提供的示例,网址为 https://github.com/SpringSource/spring-integration-samplesspring-doc.cadn.net.cn

目前,一个示例通过使用出站通道适配器和入站通道适配器演示了 Spring 集成 AMQP 适配器的基本功能。 由于示例中的 AMQP 代理实现使用 RabbitMQ。spring-doc.cadn.net.cn

为了运行该示例,您需要一个正在运行的 RabbitMQ 实例。 仅具有基本默认值的本地安装就足够了。 有关 RabbitMQ 的详细安装过程,请参阅 https://www.rabbitmq.com/install.html

启动示例应用程序后,在命令提示符上输入一些文本,包含该输入文本的消息将分派到 AMQP 队列。 作为回报,该消息由 Spring Integration 检索并打印到控制台。spring-doc.cadn.net.cn

下图说明了此示例中使用的 Spring 集成组件的基本集。spring-doc.cadn.net.cn

RabbitMQ 流队列支持

AMQP样本image::images/spring-integration-amqp-sample-graph.png的Spring Integration图[]

版本 6.0 引入了对 RabbitMQ 流队列的支持。spring-doc.cadn.net.cn

这些端点的 DSL 工厂类是Rabbit.spring-doc.cadn.net.cn

RabbitMQ 流入站通道适配器

@Bean
IntegrationFlow flow(Environment env) {
    @Bean
	IntegrationFlow simpleStream(Environment env) {
		return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
						.configureContainer(container -> container.queueName("my.stream")))
				// ...
				.get();
	}

	@Bean
	IntegrationFlow superStream(Environment env) {
		return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
						.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
				// ...
				.get();
	}
}

RabbitMQ 流出站通道适配器

@Bean
IntegrationFlow outbound(RabbitStreamTemplate template) {
    return f -> f
            // ...
            .handle(RabbitStream.outboundStreamAdapter(template));

}