JMS 支持
JMS 支持
Spring 集成提供了用于接收和发送 JMS 消息的通道适配器。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.0.9"
这jakarta.jms:jakarta.jms-api
必须通过某些特定于 JMS 供应商的实现(例如 Apache ActiveMQ)显式添加。
实际上有两个基于 JMS 的入站 Channel Adapters。
第一个使用 Spring 的JmsTemplate
以根据轮询周期接收。
第二个是 “消息驱动” 的,依赖于 SpringMessageListener
容器。
出站通道适配器使用JmsTemplate
以按需转换和发送 JMS 消息。
通过使用JmsTemplate
和MessageListener
容器中,Spring Integration 依赖于 Spring 的 JMS 支持。
理解这一点很重要,因为这些适配器上公开的大多数属性都配置了底层JmsTemplate
和MessageListener
容器。
有关更多详细信息JmsTemplate
和MessageListener
容器,请参阅 Spring JMS 文档。
虽然 JMS 通道适配器旨在用于单向消息传递(仅发送或仅接收),但 Spring 集成还为请求和回复作提供了入站和出站 JMS 网关。
入站网关依赖于 Spring 的MessageListener
用于消息驱动接收的容器实现。
它还能够将返回值发送到reply-to
destination 的 Target,由收到的消息提供。
出站网关将 JMS 消息发送到request-destination
(或request-destination-name
或request-destination-expression
),然后收到回复消息。
您可以显式配置reply-destination
引用(或reply-destination-name
或reply-destination-expression
).
否则,出站网关将使用 JMS TemporaryQueue。
在 Spring Integration 2.2 之前,如有必要,可以使用TemporaryQueue
为每个请求或回复创建(和删除)。
从 Spring Integration 2.2 开始,您可以将出站网关配置为使用MessageListener
容器来接收回复,而不是直接使用 new(或缓存的)Consumer
以接收每个请求的回复。
如果配置了此类目标,并且未提供明确的回复目标,则单个TemporaryQueue
用于每个网关,而不是每个请求都使用一个网关。
从版本 6.0 开始,出站网关会创建一个TemporaryTopic
而不是TemporaryQueue
如果replyPubSubDomain
option 设置为true
.
某些 JMS 供应商处理这些目标的方式不同。
入站通道适配器
入站通道适配器需要对单个JmsTemplate
实例或两者兼而有之ConnectionFactory
以及Destination
(您可以提供 'destinationName' 来代替 'destination' 引用)。
以下示例定义了一个入站通道适配器,其中包含Destination
参考:
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
请注意,从前面的配置中,inbound-channel-adapter 是轮询使用者。
这意味着它会调用receive() 触发时。
您应该仅在轮询相对不频繁且及时性不重要的情况下使用它。
对于所有其他情况(绝大多数基于 JMS 的使用案例),使用message-driven-channel-adapter (稍后描述)是更好的选择。 |
默认情况下,所有需要引用ConnectionFactory 自动查找名为jmsConnectionFactory .
这就是为什么您看不到connection-factory 属性。
但是,如果您的 JMSConnectionFactory 具有不同的 bean 名称,则需要提供该属性。 |
如果extract-payload
设置为true
(默认值)时,收到的 JMS 消息将通过MessageConverter
.
当依赖默认的SimpleMessageConverter
,这意味着生成的 Spring 集成消息将 JMS 消息的主体作为其有效负载。
一个 JMSTextMessage
生成基于字符串的有效负载,即 JMSBytesMessage
生成字节数组有效负载和 JMS 的可序列化实例ObjectMessage
成为 Spring Integration 消息的有效负载。
如果你希望将原始 JMS 消息作为 Spring 集成消息的有效负载,请将extractPayload
选项设置为false
.
从版本 5.0.8 开始,receive-timeout
是-1
(无需等待)等待org.springframework.jms.connection.CachingConnectionFactory
和cacheConsumers
,否则为 1 秒。
JMS 入站通道适配器 (JMS Inbound Channel Adapter) 将DynamicJmsTemplate
基于提供的ConnectionFactory
和选项。
如果外部JmsTemplate
是必需的(例如在 Spring Boot 环境中),或者ConnectionFactory
不是缓存,或者不是cacheConsumers
,建议将jmsTemplate.receiveTimeout(-1)
如果预期为非阻塞消耗:
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
交易
从版本 4.0 开始,入站通道适配器支持session-transacted
属性。
在早期版本中,您必须注入JmsTemplate
跟sessionTransacted
设置为true
.
(适配器确实允许您将acknowledge
属性设置为transacted
,但这是不正确的,不起作用)。
但请注意,该设置session-transacted
自true
值不大,因为事务已提交
紧接在receive()
作,在将消息发送到channel
.
如果您希望整个流是事务性的(例如,如果存在下游出站通道适配器),则必须使用transactional
poller 具有JmsTransactionManager
.
或者,考虑使用jms-message-driven-channel-adapter
跟acknowledge
设置为transacted
(默认值)。
消息驱动的通道适配器
这message-driven-channel-adapter
需要引用 Spring 的实例MessageListener
container(任何AbstractMessageListenerContainer
) 或两者兼而有之ConnectionFactory
和Destination
(可以提供 'destinationName' 来代替 'destination' 引用)。
以下示例定义了一个消息驱动的通道适配器,其中Destination
参考:
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
消息驱动的适配器还接受与 如果你有一个自定义的监听器容器实现(通常是 |
你不能使用 Spring JMS 命名空间元素 建议配置常规的 |
从版本 4.2 开始,默认的acknowledge mode 为transacted ,除非您提供外部容器。
在这种情况下,您应该根据需要配置容器。
我们建议使用transacted 使用DefaultMessageListenerContainer 以避免消息丢失。 |
'extract-payload' 属性具有相同的效果,其默认值为 'true'。
这poller
元素不适用于消息驱动的通道适配器,因为它是主动调用的。
在大多数情况下,消息驱动的方法更好,因为消息会传递给MessageChannel
一旦从底层 JMS 使用者收到它们。
最后,<message-driven-channel-adapter>
元素也接受 'error-channel' 属性。
这提供了相同的基本功能,如输入GatewayProxyFactoryBean
.
下面的示例说明如何在消息驱动的通道适配器上设置错误通道:
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
将前面的示例与通用网关配置或我们稍后讨论的 JMS “inbound-gateway” 进行比较时,关键的区别在于我们处于单向流中,因为这是一个 “channel-adapter”,而不是网关。
因此,从 'error-channel' 下游的流也应该是单向的。
例如,它可以发送到日志记录处理程序,也可以连接到不同的 JMS<outbound-channel-adapter>
元素。
从 Topic 使用时,将pub-sub-domain
属性设置为 true。
设置subscription-durable
自true
对于长期订阅,或者subscription-shared
共享订阅(需要 JMS 2.0 代理,并且从版本 4.2 开始可用)。
用subscription-name
为订阅命名。
从版本 5.1 开始,当终端节点停止而应用程序保持运行时,底层侦听器容器将关闭,从而关闭其共享连接和使用者。
以前,连接和使用者保持打开状态。
要恢复到之前的行为,请将shutdownContainerOnStop
在JmsMessageDrivenEndpoint
自false
.
入站转换错误
从版本 4.2 开始,'error-channel' 也用于转换错误。
以前,如果 JMS<message-driven-channel-adapter/>
或<inbound-gateway/>
Could not deliver a message due to a conversion error,则会将异常抛回到容器中。
如果容器配置为使用事务,则消息将回滚并重复重新传递。
转换过程发生在消息构造之前和期间,因此此类错误不会发送到 'error-channel'。
现在,此类转换异常会导致ErrorMessage
被发送到 'error-channel' ,但payload
.
如果您希望事务回滚,并且您定义了 'error-channel',则 'error-channel' 上的集成流必须重新抛出异常(或其他异常)。
如果错误流没有引发异常,则提交事务并删除消息。
如果未定义 'error-channel',则像以前一样将异常抛回到容器中。
出站通道适配器
这JmsSendingMessageHandler
实现MessageHandler
接口,并且能够将 Spring IntegrationMessages
发送到 JMS 消息,然后发送到 JMS 目标。
它需要一个jmsTemplate
引用或两者jmsConnectionFactory
和destination
参考资料 (destinationName
可以代替destination
).
与入站通道适配器一样,配置此适配器的最简单方法是使用名称空间支持。
以下配置会生成一个适配器,该适配器从exampleChannel
,将这些消息转换为 JMS 消息,并将它们发送到 Bean 名称为outQueue
:
@Bean
public IntegrationFlow jmsOutboundFlow() {
return f -> f
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
integrationFlow {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
handler.setDestinationName("outQueue");
return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
与入站通道适配器一样,有一个 'extract-payload' 属性。
但是,出站适配器的含义相反。
boolean 属性不是应用于 JMS 消息,而是应用于 Spring 集成消息有效负载。
换句话说,决定是将 Spring 集成消息本身作为 JMS 消息正文传递,还是将 Spring 集成消息有效负载作为 JMS 消息正文传递。
默认值为 'true'。
因此,如果你传递一个 Spring Integration 消息,其有效负载是String
、JMSTextMessage
已创建。
另一方面,如果你想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为'false'。
无论有效负载提取的布尔值如何, Spring 集成MessageHeaders 映射到 JMS 属性,只要您依赖默认转换器或提供对MessageConverter .
(“入站”适配器也是如此,只是在这些情况下,JMS 属性映射到 Spring 集成MessageHeaders ). |
从版本 5.1 开始,<int-jms:outbound-channel-adapter>
(JmsSendingMessageHandler
) 中可以使用deliveryModeExpression
和timeToLiveExpression
属性来评估 JMS 消息在运行时根据请求 Spring 发送的适当 QoS 值Message
.
新的setMapInboundDeliveryMode(true)
和setMapInboundExpiration(true)
选项DefaultJmsHeaderMapper
可能促进作为动态信息来源deliveryMode
和timeToLive
从消息标头:
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
入站网关
Spring 集成的消息驱动的 JMS 入站网关委托给MessageListener
容器,支持动态调整并发消费者,还可以处理回复。
入站网关需要引用ConnectionFactory
和一个请求Destination
(或 'requestDestinationName')。
以下示例定义了一个 JMSinbound-gateway
从 Bean ID 引用的 JMS 队列接收,inQueue
,并发送到名为exampleChannel
:
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
由于网关提供请求-回复行为而不是单向发送或接收行为,因此它们还具有两个不同的 “payload extraction” 属性(如前所述,通道适配器的 'extract-payload' 设置)。 对于入站网关,'extract-request-payload' 属性确定是否提取收到的 JMS 消息正文。 如果为'false',则 JMS 消息本身将成为 Spring 集成消息有效负载。 默认值为 'true'。
同样,对于入站网关,'extract-reply-payload'属性适用于要转换为回复 JMS 消息的 Spring 集成消息。
如果要传递整个 Spring 集成消息(作为 JMS ObjectMessage的主体),请将value this设置为'false'。
默认情况下,将 Spring 集成消息有效负载转换为 JMS 消息也是“true”的(例如,一个String
payload 变为 JMS TextMessage)。
与其他任何事情一样,网关调用可能会导致错误。 默认情况下,创建者不会收到使用者端可能发生的错误的通知,并且会超时等待回复。 但是,有时您可能希望将错误条件传回给使用者(换句话说,您可能希望通过将异常映射到消息来将异常视为有效回复)。 为了实现这一点,JMS 入站网关提供了对消息通道的支持,错误可以发送到该通道进行处理,这可能会导致回复消息有效负载符合某些约定,该约定定义了调用方可能期望的“错误”回复。 您可以使用 error-channel 属性来配置此类通道,如下例所示:
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
您可能会注意到,此示例看起来与输入GatewayProxyFactoryBean
.
同样的想法也适用于这里:exceptionTransformer
可以是创建错误响应对象的 POJO,则可以引用nullChannel
来抑制错误,或者你可以把 'error-channel' 省略出去,让异常传播。
请参阅入站转换错误。
从 Topic 使用时,将pub-sub-domain
属性设置为 true。
设置subscription-durable
自true
对于长期订阅,或者subscription-shared
共享订阅(需要 JMS 2.0 代理,并且自版本 4.2 起可用)。
用subscription-name
为订阅命名。
从版本 4.2 开始,默认的acknowledge mode 为transacted ,除非提供了外部容器。
在这种情况下,您应该根据需要配置容器。
我们建议您使用transacted 使用DefaultMessageListenerContainer 以避免消息丢失。 |
从版本 5.1 开始,当终端节点停止而应用程序保持运行时,底层侦听器容器将关闭,从而关闭其共享连接和使用者。
以前,连接和使用者保持打开状态。
要恢复到之前的行为,请将shutdownContainerOnStop
在JmsInboundGateway
自false
.
出站网关
出站网关从 Spring 集成消息创建 JMS 消息,并将它们发送到“请求目标”。
然后,它通过使用选择器从您配置的 'reply-destination' 接收消息来处理 JMS 回复消息,或者如果未提供 'reply-destination',则通过创建 JMS 来处理 JMSTemporaryQueue
(或TemporaryTopic
如果replyPubSubDomain= true
) 实例。
使用 如果指定了回复目标,则建议不要使用缓存的消费者。
或者,考虑使用 |
以下示例显示如何配置出站网关:
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
'outbound-gateway' 有效负载提取属性与 'inbound-gateway' 的有效负载提取属性成反比(请参阅前面的讨论)。 这意味着 'extract-request-payload' 属性值适用于被转换为 JMS 消息以作为请求发送的 Spring 集成消息。 'extract-reply-payload'属性值适用于作为回复接收的 JMS 消息,然后转换为 Spring 集成消息,随后发送到'reply-channel',如前面的配置示例所示。
使用<reply-listener/>
Spring Integration 2.2 引入了一种处理回复的替代技术。
如果您添加了<reply-listener/>
child 元素添加到网关中,而不是为每个回复创建一个使用者,而是MessageListener
container 用于接收回复并将其交给请求线程。
这提供了许多性能优势,并缓解了前面警告中描述的缓存使用者内存利用率问题。
当使用<reply-listener/>
替换为没有reply-destination
,而不是创建TemporaryQueue
对于每个请求,单个TemporaryQueue
被使用。
(网关会创建一个额外的TemporaryQueue
,如果与 broker 的连接丢失并已恢复)。
如果replyPubSubDomain
设置为true
,从版本 6.0 开始,一个TemporaryTopic
。
当使用correlation-key
,则多个网关可以共享相同的回复目标,因为侦听器容器使用每个网关唯一的选择器。
如果您指定回复侦听器并指定回复目标(或回复目标名称),但未提供关联键,则网关将记录警告并回退到版本 2.2 之前的行为。 这是因为在这种情况下无法配置选择器。 因此,无法避免应答发送到可能配置了相同应答目标的不同网关。 请注意,在这种情况下,每个请求都使用一个新的使用者,并且使用者可以按照上面的警告中所述在内存中构建;因此,在这种情况下,不应使用缓存的使用者。 |
以下示例显示了具有 default 属性的回复侦听器:
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
侦听器非常轻量级,我们预计在大多数情况下,您只需要一个使用者。
但是,您可以添加诸如concurrent-consumers
,max-concurrent-consumers
等。
有关受支持属性的完整列表,请参见模式,以及 Spring JMS 文档以了解其含义。
空闲回复侦听器
从版本 4.2 开始,您可以根据需要启动回复侦听器(并在空闲时间后停止它),而不是在网关的生命周期内运行。 如果应用程序上下文中有许多网关,而这些网关大多处于空闲状态,则这可能很有用。 一种这样的情况是具有许多(非活动)分区 Spring Batch 作业的上下文,这些作业使用 Spring Integration 和 JMS 进行分区分发。 如果所有回复侦听器都处于活动状态,则 JMS 代理的每个网关都有一个活动的使用者。 通过启用空闲超时,每个使用者仅在相应的批处理作业运行时存在(并且在批处理作业完成后的短时间内存在)。
看idle-reply-listener-timeout
在 Attribute Reference 中。
网关回复关联
本节介绍用于回复关联的机制(确保原始网关仅接收对其请求的回复),具体取决于网关的配置方式。 有关此处讨论的属性的完整描述,请参阅 Attribute Reference。
以下列表描述了各种情况(数字用于识别 — 顺序无关紧要):
-
不
reply-destination*
properties 和 no<reply-listener>
一个
TemporaryQueue
为每个请求创建,并在请求完成(成功或失败)时删除。correlation-key
无关紧要。 -
一个
reply-destination*
属性,并且<reply-listener/>
也不是correlation-key
提供这
JMSCorrelationID
等于传出消息用作使用者的消息选择器:messageSelector = "JMSCorrelationID = '" + messageId + "'"
响应系统应返回入站
JMSMessageID
在回复中JMSCorrelationID
. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
对于消息驱动的 POJO。使用此配置时,不应将主题用于回复。 回复可能会丢失。 -
一个
reply-destination*
property 时,没有<reply-listener/>
,并且correlation-key="JMSCorrelationID"
网关会生成一个唯一的关联 IS 并将其插入到
JMSCorrelationID
页眉。 消息选择器为:messageSelector = "JMSCorrelationID = '" + uniqueId + "'"
响应系统应返回入站
JMSCorrelationID
在回复中JMSCorrelationID
. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
对于消息驱动的 POJO。 -
一个
reply-destination*
property 时,没有<reply-listener/>
,并且correlation-key="myCorrelationHeader"
网关会生成一个唯一的关联 ID 并将其插入到
myCorrelationHeader
message 属性。 这correlation-key
可以是任何用户定义的值。 消息选择器为:messageSelector = "myCorrelationHeader = '" + uniqueId + "'"
响应系统应返回入站
myCorrelationHeader
在回复中myCorrelationHeader
. -
一个
reply-destination*
property 时,没有<reply-listener/>
,并且correlation-key="JMSCorrelationID*"
(请注意 correlation 键中的 。*
网关使用
jms_correlationId
标头(如果存在)并将其插入到JMSCorrelationID
页眉。 消息选择器为:messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"
用户必须确保此值是唯一的。
如果标头不存在,则网关的行为与
3
.响应系统应返回入站
JMSCorrelationID
在回复中JMSCorrelationID
. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
对于消息驱动的 POJO。 -
不
reply-destination*
properties 的<reply-listener>
提供将创建一个临时队列,并将其用于此网关实例中的所有回复。 消息中不需要关联数据,但需要传出的
JMSMessageID
在网关内部用于将回复定向到正确的请求线程。 -
一个
reply-destination*
属性,则<reply-listener>
,并且没有correlation-key
提供不允许。
这
<reply-listener/>
配置被忽略,网关的行为与2
. 将写入一条警告日志消息以指示这种情况。 -
一个
reply-destination*
属性,则<reply-listener>
,并且correlation-key="JMSCorrelationID"
网关具有唯一的关联 ID,并将其与
JMSCorrelationID
标头 (gatewayId + "_" + ++seq
). 消息选择器为:messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"
响应系统应返回入站
JMSCorrelationID
在回复中JMSCorrelationID
. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
对于消息驱动的 POJO。 由于每个网关都有一个唯一的 ID,因此每个实例都只能获得自己的回复。 完整的关联数据用于将回复路由到正确的请求线程。 -
一个
reply-destination*
属性中提供了一个<reply-listener/>
,并且correlation-key="myCorrelationHeader"
网关具有唯一的关联 ID,并将其与
myCorrelationHeader
属性 (gatewayId + "_" + ++seq
). 这correlation-key
可以是任何用户定义的值。 消息选择器为:messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"
响应系统应返回入站
myCorrelationHeader
在回复中myCorrelationHeader
. 由于每个网关都有一个唯一的 ID,因此每个实例只获得自己的回复。 完整的关联数据用于将回复路由到正确的请求线程。 -
一个
reply-destination*
属性,则<reply-listener/>
,并且correlation-key="JMSCorrelationID*"
(请注意 correlation key 中的 )
*
不允许。
回复侦听器不允许使用用户提供的相关 ID。 网关不使用此配置进行初始化。
异步网关
从版本 4.3 开始,您现在可以指定async="true"
(或setAsync(true)
在 Java 中)。
默认情况下,当请求发送到网关时,请求线程将暂停,直到收到回复。
然后,流在该线程上继续。
如果async
是true
,请求线程将在send()
完成,并在侦听器容器线程上返回回复(并且流继续)。
当在 poller 线程上调用网关时,这可能很有用。
线程已释放,可用于框架中的其他任务。
你async
需要<reply-listener/>
(或setUseReplyContainer(true)
使用 Java 配置时)。
它还需要一个correlationKey
(通常JMSCorrelationID
) 进行指定。
如果不满足上述任一条件,则async
被忽略。
属性参考
下面的清单显示了outbound-gateway
:
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 | 对jakarta.jms.ConnectionFactory .
默认的jmsConnectionFactory . |
||
2 | 包含关联数据以将响应与回复关联的属性的名称。
如果省略,则网关希望响应系统返回出站JMSMessageID 标头中的JMSCorrelationID 页眉。
如果指定,网关将生成相关 ID 并使用该 ID 填充指定的属性。
响应系统必须在同一属性中回显该值。
它可以设置为JMSCorrelationID ,在这种情况下,将使用标准标头而不是String 属性来保存关联数据。
当您使用<reply-container/> ,则必须指定correlation-key 如果您提供显式的reply-destination .
从版本 4.0.1 开始,此属性还支持值JMSCorrelationID* ,这意味着如果出站邮件已经具有JMSCorrelationID (映射自jms_correlationId ) 标头,则使用它而不是生成新的标头。
请注意,JMSCorrelationID* key 不允许使用<reply-container/> ,因为容器需要在初始化期间设置消息选择器。
|
||
3 | 一个布尔值,指示传递模式是否应为DeliveryMode.PERSISTENT (true ) 或DeliveryMode.NON_PERSISTENT (false ).
此设置仅在explicit-qos-enabled 是true . |
||
4 | 一个DestinationResolver .
默认值为DynamicDestinationResolver ,它将目标名称映射到该名称的队列或主题。 |
||
5 | 当设置为true ,它允许使用服务质量属性:priority ,delivery-mode 和time-to-live . |
||
6 | 当设置为true (默认值),则 Spring 集成回复消息的有效负载是从 JMS 回复消息的主体创建的(通过使用MessageConverter ).
当设置为false ,则整个 JMS 消息将成为 Spring Integration 消息的有效负载。 |
||
7 | 当设置为true (默认值),则 Spring Integration 消息的有效负载将转换为JMSMessage (通过使用MessageConverter ).
当设置为false ,则整个 Spring Integration Message 将转换为JMSMessage .
在这两种情况下,Spring 集成消息头都使用HeaderMapper . |
||
8 | 一个HeaderMapper 用于将 Spring Integration 消息头映射到 JMS 消息头和属性或从 JMS 消息头和属性映射。 |
||
9 | 对MessageConverter 用于在 JMS 消息和 Spring 集成消息有效负载(或如果为extract-request-payload 是false ).
默认值为SimpleMessageConverter . |
||
10 | 请求消息的默认优先级。
被消息优先级标头(如果存在)覆盖。
其范围是0 自9 .
此设置仅在explicit-qos-enabled 是true . |
||
11 | 等待回复的时间 (以毫秒为单位)。
默认值为5000 (5 秒)。 |
||
12 | 将回复消息发送到的通道。 | ||
13 | 对Destination ,它被设置为JMSReplyTo 页眉。
最多只有一个reply-destination ,reply-destination-expression 或reply-destination-name 是允许的。
如果未提供,则TemporaryQueue 用于回复此网关。 |
||
14 | 一个 SpEL 表达式的计算结果为Destination ,它将被设置为JMSReplyTo 页眉。
表达式可以产生Destination object 或String .
它由DestinationResolver 解析实际的Destination .
最多只有一个reply-destination ,reply-destination-expression 或reply-destination-name 是允许的。
如果未提供,则TemporaryQueue 用于回复此网关。 |
||
15 | 设置为 JMSReplyTo 标头的目标的名称。
它由DestinationResolver 解析实际的Destination .
最多只有一个reply-destination ,reply-destination-expression 或reply-destination-name 是允许的。
如果未提供,则TemporaryQueue 用于回复此网关。 |
||
16 | 当设置为true ,则表示任何回复Destination 由DestinationResolver 应为Topic 而不是Queue . |
||
17 | 网关在将回复消息发送到reply-channel .
这仅在reply-channel can 阻止 — 例如QueueChannel 容量限制当前已满。
默认值为 infinity。 |
||
18 | 此网关接收请求消息的通道。 | ||
19 | 对Destination 请求消息将发送到哪个位置。
其中之一reply-destination ,reply-destination-expression 或reply-destination-name 是必需的。
您只能使用这三个属性中的一个。 |
||
20 | 一个 SpEL 表达式的计算结果为Destination 请求消息将发送到哪个位置。
表达式可以产生Destination object 或String .
它由DestinationResolver 解析实际的Destination .
其中一个reply-destination ,reply-destination-expression 或reply-destination-name 是必需的。
您只能使用这三个属性中的一个。 |
||
21 | 请求消息发送到的目标的名称。
它由DestinationResolver 解析实际的Destination .
其中之一reply-destination ,reply-destination-expression 或reply-destination-name 是必需的。
您只能使用这三个属性中的一个。 |
||
22 | 当设置为true ,则表示任何请求Destination 由DestinationResolver 应为Topic 而不是Queue . |
||
23 | 指定消息生存时间。
此设置仅在explicit-qos-enabled 是true . |
||
24 | 指定此出站网关是否必须返回非 null 值。
默认情况下,此值为true 和MessageTimeoutException 当底层服务在receive-timeout .
请注意,如果服务从未期望返回回复,则最好使用<int-jms:outbound-channel-adapter/> 而不是<int-jms:outbound-gateway/> 跟requires-reply="false" .
对于后者,发送线程将被阻塞,等待receive-timeout 时期。 |
||
25 | 当您使用<reply-listener /> ,则默认情况下,其生命周期(start 和 stop)与网关的生命周期匹配。
当该值大于0 ,则容器将按需启动(发送请求时)。
容器将继续运行,至少直到此时间过去,没有收到任何请求(并且直到没有未完成的回复)。
容器将在下一个请求中再次启动。
停止时间是最小值,实际上可能高达此值的 1.5 倍。 |
||
26 | 请参阅异步网关。 | ||
27 | 当包含此元素时,异步MessageListenerContainer 而不是为每个回复创建一个使用者。
在许多情况下,这可能更有效。 |
将消息头映射到 JMS 消息或从 JMS 消息映射消息
JMS 消息可以包含元信息,例如 JMS API 标头和简单属性。
你可以使用 Python 将它们映射到 Spring 集成消息头或从中映射这些消息头JmsHeaderMapper
.
JMS API 标头将传递给相应的 setter 方法(例如setJMSReplyTo
),而其他标头将复制到 JMS Message 的常规属性中。
JMS 出站网关使用JmsHeaderMapper
,它将映射标准 JMS API 标头以及原始或String
消息标头。
您还可以使用header-mapper
入站和出站网关的属性。
许多特定于 JMS 供应商的客户端不允许设置deliveryMode ,priority 和timeToLive 属性。
它们被视为 QoS 属性,因此必须传播到目标MessageProducer.send(message, deliveryMode, priority, timeToLive) 应用程序接口。
因此,DefaultJmsHeaderMapper 不会将适当的 Spring 集成头(或表达式结果)映射到提到的 JMS 消息属性中。
相反,一个DynamicJmsTemplate 由JmsSendingMessageHandler 要将标头值从请求消息传播到MessageProducer.send() 应用程序接口。
要启用此功能,您必须使用DynamicJmsTemplate 及其explicitQosEnabled 属性设置为 true。
Spring 集成 Java DSL 配置了一个DynamicJmsTemplate ,但您仍必须将explicitQosEnabled 财产。 |
从 4.0 版本开始,JMSPriority header 映射到标准的priority 入站消息的标头。
以前,priority header 仅用于出站邮件。
要恢复到之前的行为(即不映射入站优先级),请将mapInboundPriority 的属性DefaultJmsHeaderMapper 自false . |
从 4.3 版本开始,DefaultJmsHeaderMapper 映射标准correlationId header 作为 Message 属性,方法是调用其toString() method (correlationId 通常是UUID ,JMS 不支持此功能)。
在入站端,它被映射为String .
这与jms_correlationId 标头,该标头映射到JMSCorrelationID 页眉。
这JMSCorrelationID 通常用于关联请求和回复,而correlationId 通常用于将相关消息合并到一个组中(例如与 aggregator 或 resequencer 一起)。 |
从版本 5.1 开始,DefaultJmsHeaderMapper
可以配置为映射入站JMSDeliveryMode
和JMSExpiration
性能:
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
这些 JMS 属性映射到JmsHeaders.DELIVERY_MODE
和JmsHeaders.EXPIRATION
Spring Message 标头。
消息转换、封送处理和解组
如果需要转换消息,所有 JMS 适配器和网关都允许您提供MessageConverter
通过设置message-converter
属性。
为此,请提供MessageConverter
在同一个 ApplicationContext 中可用。
此外,为了提供与 marshaller 和 unmarshaller 接口的一些一致性, Spring 提供了MarshallingMessageConverter
,您可以使用自己的自定义封送处理程序和解组处理程序对其进行配置。
以下示例说明如何执行此作
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>
<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
当您提供自己的MessageConverter 实例中,它仍然包装在HeaderMappingMessageConverter .
这意味着 'extract-request-payload' 和 'extract-reply-payload' 属性可以影响传递给转换器的实际对象。
这HeaderMappingMessageConverter 本身委托给 TargetMessageConverter 同时还映射 Spring 集成MessageHeaders 添加到 JMS 消息属性中,然后再返回。 |
JMS 支持的消息通道
前面介绍的通道适配器和网关都适用于与其他外部系统集成的应用程序。
入站选项假定其他系统正在向 JMS 目标发送 JMS 消息,出站选项假定其他系统正在从目标接收消息。
另一个系统可能是也可能不是 Spring 集成应用程序。
当然,当将 Spring 集成消息实例作为 JMS 消息本身的主体发送时(将 'extract-payload' 值设置为false
),则假定另一个系统基于 Spring Integration。
然而,这绝不是必需的。
这种灵活性是使用基于消息的集成选项和抽象的 “channels”(或 JMS 中的目标)的好处之一。
有时,给定 JMS Destination 的创建者和使用者都希望成为同一应用程序的一部分,运行在同一进程中。 您可以通过使用一对入站和出站通道适配器来实现此目的。 这种方法的问题在于您需要两个适配器,即使从概念上讲,目标是拥有单个消息通道。 从 Spring Integration 版本 2.0 开始,支持更好的选择。 现在,在使用 JMS 名称空间时可以定义单个 “channel”,如下例所示:
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
前面示例中的通道的行为与普通的<channel/>
元素。
它可以被input-channel
和output-channel
属性。
区别在于,此通道由名为exampleQueue
.
这意味着可以在生产终端节点和使用终端节点之间进行异步消息收发。
但是,与通过添加<queue/>
元素<channel/>
元素,则消息不会存储在内存中队列中。
相反,这些消息在 JMS 消息正文中传递,然后底层 JMS 提供程序的全部功能可用于该通道。
使用这种替代方法的最常见理由可能是利用 JMS 消息传递的存储转发方法提供的持久性。
如果配置正确,JMS 支持的消息通道还支持事务。
换句话说,如果生产者的发送作是回滚事务的一部分,则生产者实际上不会写入事务性 JMS 支持的通道。
同样,如果 JMS 消息的接收是回滚事务的一部分,则使用者不会从通道中物理删除该消息。
请注意,在这种情况下,生产者和使用者事务是分开的。
这与事务上下文在简单的同步<channel/>
元素中没有<queue/>
child 元素。
由于前面的示例引用了一个 JMS Queue 实例,因此它充当点对点通道。 另一方面,如果您需要发布-订阅行为,则可以使用单独的元素并引用 JMS 主题。 以下示例显示了如何执行此作:
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
对于任一类型的 JMS 支持的通道,都可以提供目标的名称而不是引用,如下例所示:
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
在前面的示例中,目标名称由 Spring 的默认值解析DynamicDestinationResolver
实现,但您可以提供DestinationResolver
接口。
此外,JMSConnectionFactory
是 channel 的 required 属性,但是默认情况下,预期的 bean 名称将是jmsConnectionFactory
.
以下示例提供了用于解析 JMS 目标名称的自定义实例和ConnectionFactory
:
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
对于<publish-subscribe-channel />
中,将durable
属性设置为true
对于长期订阅,或者subscription-shared
共享订阅(需要 JMS 2.0 代理,并且自版本 4.2 起可用)。
用subscription
为订阅命名。
使用 JMS 消息选择器
使用 JMS 消息选择器,您可以根据 JMS 标头和 JMS 属性筛选 JMS 消息。
例如,如果要侦听其自定义 JMS 标头属性myHeaderProperty
等于something
中,您可以指定以下表达式:
myHeaderProperty = 'something'
-
JMS 通道
-
JMS 发布订阅频道
-
JMS 入站通道适配器
-
JMS 入站网关
-
JMS 消息驱动的通道适配器
不能使用 JMS 消息选择器引用消息正文值。 |
JMS 示例
要试验这些 JMS 适配器,请查看 Spring 集成示例 Git 存储库中提供的 JMS 示例,网址为 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms。
该存储库包括两个示例。 一个提供入站和出站通道适配器,另一个提供入站和出站网关。 它们被配置为使用嵌入式 ActiveMQ 进程运行,但您可以修改每个样本的 common.xml Spring 应用程序上下文文件,以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。
换句话说,您可以拆分配置,以便入站和出站适配器在单独的 JVM 中运行。
如果您已安装 ActiveMQ,请修改brokerURL
属性中的common.xml
要使用的文件tcp://localhost:61616
(而不是vm://localhost
).
这两个样本都接受来自 stdin 的 input 并回显回 stdout。
查看配置以了解这些消息是如何通过 JMS 路由的。