消息路由
消息路由
本章介绍了使用 Spring Integration 路由消息的详细信息。
路由器
本节介绍路由器的工作原理。 它包括以下主题:
概述
路由器是许多消息传递体系结构中的关键元素。 它们使用来自消息通道的消息,并根据一组条件将每条使用的消息转发到一个或多个不同的消息通道。
Spring 集成提供了以下路由器:
Router 实现共享许多 configuration 参数。 但是,路由器之间存在某些差异。 此外,配置参数的可用性取决于 router 是在链内部还是外部使用。 为了提供快速概述,以下两个表中列出了所有可用属性 。
下表显示了可用于链外路由器的配置参数:
属性 | 路由器 | 标头值路由器 | XPath 路由器 | 有效载荷类型 Router | 收件人列表路由 | 异常类型 router |
---|---|---|---|---|---|---|
应用序列 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
默认输出通道 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
需要分辨率 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
忽略发送失败 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
超时 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
身份证 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
自动启动 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
input-channel (输入通道) |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
次序 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
方法 |
![]() |
|||||
裁判 |
![]() |
|||||
表达 |
![]() |
|||||
标头名称 |
![]() |
|||||
计算为字符串 |
![]() |
|||||
xpath-expression-ref 表达式 |
![]() |
|||||
转炉 |
![]() |
下表显示了可用于链内路由器的配置参数:
属性 | 路由器 | 标头值路由器 | XPath 路由器 | 有效载荷类型 Router | 收件人列表路由器 | 异常类型 router |
---|---|---|---|---|---|---|
应用序列 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
默认输出通道 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
需要分辨率 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
忽略发送失败 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
超时 |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
身份证 |
||||||
自动启动 |
||||||
input-channel (输入通道) |
||||||
次序 |
||||||
方法 |
![]() |
|||||
裁判 |
![]() |
|||||
表达 |
![]() |
|||||
标头名称 |
![]() |
|||||
计算为字符串 |
![]() |
|||||
xpath-expression-ref 表达式 |
![]() |
|||||
转炉 |
![]() |
从 Spring Integration 2.1 开始,路由器参数在所有路由器实现中都更加标准化。 因此,一些小的更改可能会破坏基于 Spring Integration 的旧应用程序。 从 Spring Integration 2.1 开始, 在这些更改之前, 如果您确实希望以静默方式发送消息,则可以设置 |
常用路由器参数
本节介绍所有路由器参数的通用参数(在本章前面显示的两个表中勾选了所有框的参数)。
链内部和外部
以下参数对链内外的所有路由器都有效。
apply-sequence
-
此属性指定是否应将 sequence number 和 size 标头添加到每条消息中。 此可选属性默认为
false
. default-output-channel
-
如果设置,则此属性将提供对通道的引用,如果通道解析无法返回任何通道,则应将消息发送到该通道。 如果未提供默认 output channel,则 router 将引发异常。 如果要改为以静默方式删除这些消息,请将默认 output channel 属性值设置为
nullChannel
.从版本 6.0 开始,设置默认输出通道也会重置 channelKeyFallback
选项设置为false
. 因此,不会尝试从通道的名称解析通道,而是回退到这个默认输出通道 - 类似于 Javaswitch
陈述。 如果channelKeyFallback
设置为true
显式地,进一步的 logic 取决于resolutionRequired
选项:消息 to non-resolved channel from key 可以到达defaultOutputChannel
仅当resolutionRequired
是false
. 因此,其中defaultOutputChannel
提供,并且channelKeyFallback
&resolutionRequired
设置为true
被AbstractMappingMessageRouter
初始化阶段。 resolution-required
-
此属性指定是否必须始终将通道名称成功解析为存在的通道实例。 如果设置为
true
一个MessagingException
在无法解析通道时引发。 将此属性设置为false
导致忽略任何无法解析的通道。 此可选属性默认为true
.Message 仅发送到 default-output-channel
,如果指定,则resolution-required
是false
并且通道未解析。 ignore-send-failures
-
如果设置为
true
,则忽略发送到消息通道的失败。 如果设置为false
一个MessageDeliveryException
被抛出,并且如果路由器解析多个 channel,则任何后续 channel 都不会收到该消息。此属性的确切行为取决于
Channel
消息将发送到 。 例如,当使用直接通道(单线程)时,发送失败可能是由更远的下游组件抛出的异常引起的。 但是,当将消息发送到简单的队列通道 (异步) 时,引发异常的可能性相当小。虽然大多数路由器路由到单个通道,但它们可以返回多个通道名称。 这 recipient-list-router
,就是这样做的。 如果将此属性设置为true
在仅路由到单个 channel 的路由器上,任何引起的 exception 都会被吞噬,这通常没有什么意义。 在这种情况下,最好在流入口点捕获错误流中的异常。 因此,将ignore-send-failures
属性设置为true
当 Router 实现返回多个 channel name 时,通常更有意义,因为失败的 channel 之后的其他 channel(s) 仍将收到该消息。此属性默认为
false
. timeout
-
这
timeout
属性指定在向目标 Message Channel 发送消息时等待的最长时间(以毫秒为单位)。 默认情况下,send作无限期阻止。
顶层(链外)
以下参数仅对链之外的所有顶级路由器有效。
id
-
标识底层 Spring bean 定义,在路由器的情况下,它是
EventDrivenConsumer
或PollingConsumer
,具体取决于路由器的input-channel
是一个SubscribableChannel
或PollableChannel
分别。 这是一个可选属性。 auto-startup
-
此 “lifecycle” 属性指示是否应在应用程序上下文启动期间启动此组件。 此可选属性默认为
true
. input-channel
-
此终端节点的接收消息通道。
order
-
此属性定义当此终端节点作为订阅者连接到通道时的调用顺序。 当该通道使用 failover dispatching 策略时,这一点尤其重要。 当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
路由器实现
由于基于内容的路由通常需要一些特定于域的逻辑,因此大多数用例都需要 Spring 集成的选项,通过使用 XML 名称空间支持或注释来委托给 POJO。 这两者都将在后面讨论。 但是,我们首先介绍几个满足常见要求的实现。
PayloadTypeRouter
一个PayloadTypeRouter
将消息发送到由 payload-type 映射定义的通道,如下例所示:
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
的配置PayloadTypeRouter
也受 Spring 集成提供的名称空间的支持(参见Namespace Support
),它通过结合<router/>
configuration 及其相应的实现(通过使用<bean/>
元素)转换为单个更简洁的配置元素。
以下示例显示了PayloadTypeRouter
等同于上述配置,但使用命名空间支持:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
使用 Java DSL 时,有两个选项。
首先,您可以定义 router 对象,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
请注意,路由器可以是(但并非必须)是@Bean
.
如果它不是@Bean
.
其次,您可以在 DSL 流本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
一个HeaderValueRouter
根据各个标头值映射将 Messages 发送到通道。
当HeaderValueRouter
创建,则使用要评估的标头的名称对其进行初始化。
标头的值可以是以下两项之一:
-
任意值
-
频道名称
如果它是任意值,则需要将这些 Headers 值附加到通道名称的其他映射。 否则,无需其他配置。
Spring 集成提供了一个简单的基于名称空间的 XML 配置,用于配置一个HeaderValueRouter
.
以下示例演示了HeaderValueRouter
当需要将 Headers 值映射到 channels 时:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析过程中,上述示例中定义的路由器可能会遇到通道解析失败,从而导致异常。
如果要禁止此类异常并将未解析的消息发送到默认输出通道(以default-output-channel
属性)设置resolution-required
自false
.
通常,标头值未显式映射到通道的消息将发送到default-output-channel
.
但是,当 header 值映射到通道名称但无法解析通道时,将resolution-required
属性设置为false
导致将此类消息路由到default-output-channel
.
从 Spring Integration 2.1 开始,该属性已从ignore-channel-name-resolution-failures 自resolution-required .
属性resolution-required 默认为true . |
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
使用 Java DSL 时,有两个选项。 首先,您可以定义 router 对象,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
请注意,路由器可以是(但并非必须)是@Bean
.
如果它不是@Bean
.
其次,您可以在 DSL 流本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
不需要将 Headers 值映射到通道名称的配置,因为 Headers 值本身表示通道名称。 以下示例显示了一个不需要将 Headers 值映射到通道名称的路由器:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
从 Spring Integration 2.1 开始,解析通道的行为更加明确。
例如,如果省略 基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。
如果你真的想丢弃消息,你还必须有 |
RecipientListRouter
一个RecipientListRouter
将收到的每条消息发送到静态定义的消息通道列表。
以下示例创建一个RecipientListRouter
:
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring 集成还为RecipientListRouter
配置(请参阅命名空间支持),如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例显示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
这里的 'apply-sequence' 标志与它对 publish-subscribe-channel 的作用相同,并且与 publish-subscribe-channel 一样,它在recipient-list-router .
看PublishSubscribeChannel 配置了解更多信息。 |
配置RecipientListRouter
是使用 Spring 表达式语言 (SpEL) 支持作为单个收件人通道的选择器。
这样做类似于在 'chain' 的开头使用 filter 来充当 “selective consumer”。
但是,在这种情况下,它全部相当简洁地组合到路由器的配置中,如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在前面的配置中,由selector-expression
属性以确定此收件人是否应包含在给定输入消息的收件人列表中。
表达式的求值结果必须为boolean
.
如果未定义此属性,则渠道始终位于收件人列表中。
RecipientListRouterManagement
从版本 4.1 开始,RecipientListRouter
提供了多个作,用于在运行时动态作收件人。
这些管理作由RecipientListRouterManagement
通过@ManagedResource
注解。
它们可以通过使用 Control Bus 和 JMX 来使用,如下例所示:
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");
从应用程序启动simpleRouter
,只有一个channel1
收件人。
但是在addRecipient
命令channel2
收件人。
这是一个 “注册对消息中的一部分的兴趣” 用例,当我们可能在某个时间段内对来自路由器的消息感兴趣时,因此我们订阅了recipient-list-router
并在某个时候决定取消订阅。
由于<recipient-list-router>
,它可以在没有任何<recipient>
从一开始。
在这种情况下,RecipientListRouter
当邮件没有一个匹配的收件人时,该规则相同。
如果defaultOutputChannel
配置后,消息将发送到该处。
否则,MessageDeliveryException
被抛出。
XPath 路由器
XPath Router 是 XML Module 的一部分。 请参见使用 XPath 路由 XML 消息。
路由和错误处理
Spring 集成还提供了一个特殊的基于类型的路由器,称为ErrorMessageExceptionTypeRouter
用于路由错误消息(定义为payload
是一个Throwable
实例)。ErrorMessageExceptionTypeRouter
类似于PayloadTypeRouter
.
事实上,它们几乎相同。
唯一的区别是,虽然PayloadTypeRouter
导航有效负载实例的实例层次结构(例如payload.getClass().getSuperclass()
) 查找最具体的类型和通道映射,则ErrorMessageExceptionTypeRouter
导航“异常原因”的层次结构(例如,payload.getCause()
) 查找最具体的Throwable
类型或通道映射和用途mappingClass.isInstance(cause)
以匹配cause
添加到类或任何超类。
在这种情况下,通道映射顺序很重要。
因此,如果需要获取IllegalArgumentException ,但不是RuntimeException ,则必须先在 router 上配置最后一个选项。 |
从 4.3 版本开始,ErrorMessageExceptionTypeRouter 在初始化阶段加载所有映射类,以快速失败ClassNotFoundException . |
以下示例显示了ErrorMessageExceptionTypeRouter
:
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
配置通用路由器
Spring 集成提供了一个通用路由器。 你可以将它用于通用路由(与 Spring Integration 提供的其他路由器相反,每个路由器都有某种形式的专用化)。
使用 XML 配置基于内容的路由器
这router
元素提供了一种将路由器连接到输入通道的方法,并且还接受可选的default-output-channel
属性。
这ref
属性引用自定义路由器实现的 Bean 名称(必须扩展AbstractMessageRouter
).
以下示例显示了三个通用路由器:
<int:router ref="payloadTypeRouter" input-channel="input1"
default-output-channel="defaultOutput1"/>
<int:router ref="recipientListRouter" input-channel="input2"
default-output-channel="defaultOutput2"/>
<int:router ref="customRouter" input-channel="input3"
default-output-channel="defaultOutput3"/>
<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>
或者ref
可能指向包含@Router
注解(稍后显示),或者您可以将ref
替换为显式方法名称。
指定方法将应用@Router
annotation 部分。
以下示例定义了一个路由器,该路由器在其ref
属性:
<int:router input-channel="input" ref="somePojo" method="someMethod"/>
我们通常建议使用ref
属性(如果自定义路由器实现在其他<router>
定义。
但是,如果自定义路由器实现的范围应限定为<router>
,您可以提供内部 Bean 定义,如下例所示:
<int:router method="someMethod" input-channel="input3"
default-output-channel="defaultOutput3">
<beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
同时使用ref attribute 和内部处理程序定义位于同一<router> 不允许配置。
这样做会产生不明确的条件并引发异常。 |
如果ref 属性引用一个扩展AbstractMessageProducingHandler (例如框架本身提供的路由器),则配置会优化为直接引用路由器。
在这种情况下,每个ref 属性必须引用单独的 bean 实例(或prototype -scoped bean)或使用内部的<bean/> 配置类型。
但是,仅当未在路由器 XML 定义中提供任何特定于路由器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。 |
以下示例显示了在 Java 中配置的等效路由器:
@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
以下示例显示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(myCustomRouter())
.get();
}
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
或者,您可以路由来自消息负载的数据,如下例所示:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
.get();
}
路由器和 Spring 表达式语言 (SpEL)
有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 bean 似乎有点矫枉过正。 从 Spring Integration 2.0 开始,我们提供了一种替代方案,允许您使用 SPEL 来实现以前需要自定义 POJO 路由器的简单计算。
有关 Spring 表达式语言的更多信息,请参见 Spring Framework Reference Guide 中的相关章节。 |
通常,会评估 SPEL 表达式,并将其结果映射到通道,如下例所示:
<int:router input-channel="inChannel" expression="payload.paymentType">
<int:mapping value="CASH" channel="cashPaymentChannel"/>
<int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
<int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>
以下示例显示了在 Java 中配置的等效路由器:
@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
router.setChannelMapping("CASH", "cashPaymentChannel");
router.setChannelMapping("CREDIT", "authorizePaymentChannel");
router.setChannelMapping("DEBIT", "authorizePaymentChannel");
return router;
}
以下示例显示了在 Java DSL 中配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route("payload.paymentType", r -> r
.channelMapping("CASH", "cashPaymentChannel")
.channelMapping("CREDIT", "authorizePaymentChannel")
.channelMapping("DEBIT", "authorizePaymentChannel"))
.get();
}
为了进一步简化,SpEL 表达式可以计算为通道名称,如下面的表达式所示:
<int:router input-channel="inChannel" expression="payload + 'Channel'"/>
在前面的配置中,结果通道由 SPEL 表达式计算,该表达式将payload
替换为文本String
、'频道'。
SPEL 配置路由器的另一个优点是表达式可以返回Collection
,有效地使每个<router>
收件人列表路由器。
每当表达式返回多个通道值时,消息就会转发到每个通道。
以下示例显示了这样的表达式:
<int:router input-channel="inChannel" expression="headers.channels"/>
在上述配置中,如果消息包含名称为“channels”的报头,并且该报头的值是List
中,消息将发送到列表中的每个通道。
当您需要选择多个渠道时,您可能还会发现集合投影和集合选择表达式很有用。
有关详细信息,请参阅:
使用注释配置路由器
使用@Router
要对方法进行注释,该方法可能会返回MessageChannel
或String
类型。
在后一种情况下,终端节点解析通道名称的方式与解析默认输出通道的方式相同。
此外,该方法可能返回单个值或集合。
如果返回集合,则回复消息将发送到多个渠道。
总而言之,以下方法签名都是有效的:
@Router
public MessageChannel route(Message message) {...}
@Router
public List<MessageChannel> route(Message message) {...}
@Router
public String route(Foo payload) {...}
@Router
public List<String> route(Foo payload) {...}
除了基于有效负载的路由之外,还可以根据消息标头中作为属性或属性提供的元数据来路由消息。
在本例中,使用@Router
可以包括一个用@Header
,该值映射到 Headers 值,如下例所示,并在 Annotation Support 中进行了记录:
@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
有关基于 XML 的消息的路由 (包括 XPath 支持),请参阅 XML 支持 - 处理 XML 有效负载。 |
有关路由器配置的更多信息,另请参阅 Java DSL 一章中的消息路由器。
动态路由器
Spring 集成为常见的基于内容的路由用例提供了相当多的不同路由器配置,以及将自定义路由器实现为 POJO 的选项。
例如PayloadTypeRouter
提供了一种简单的方法来配置路由器,该路由器根据传入消息的有效负载类型计算通道,而HeaderValueRouter
在配置通过评估特定消息 Headers 的值来计算通道的路由器时提供了相同的便利。
还有基于表达式 (SpEL) 的路由器,其中通道是根据计算表达式来确定的。
所有这些类型的路由器都表现出一些动态特性。
但是,这些路由器都需要静态配置。 即使在基于表达式的路由器的情况下,表达式本身也被定义为路由器配置的一部分,这意味着对相同值进行作的相同表达式始终会导致相同通道的计算。 这在大多数情况下是可以接受的,因为此类路由定义明确,因此是可预测的。 但有时我们需要动态地更改路由器配置,以便消息流可以路由到不同的通道。
例如,您可能希望关闭系统的某些部分以进行维护,并临时将消息重新路由到其他消息流。
再举一个例子,您可能希望通过添加另一个路由来处理更具体的java.lang.Number
(在PayloadTypeRouter
).
不幸的是,使用静态路由器配置来实现这两个目标中的任何一个,您将不得不关闭整个应用程序,更改路由器的配置(更改路由),然后重新启动应用程序。 这显然不是任何人都想要的解决方案。
动态路由器模式描述了一种机制,通过这些机制,您可以动态地更改或配置路由器,而不会关闭系统或单个路由器。
在我们深入了解 Spring Integration 如何支持动态路由的细节之前,我们需要考虑路由器的典型流程:
-
计算通道标识符,这是路由器在收到消息后计算的值。 通常,它是一个 String 或实际
MessageChannel
. -
将通道标识符解析为通道名称。 我们将在本节后面介绍此过程的细节。
-
将频道名称解析为实际的
MessageChannel
如果步骤 1 导致MessageChannel
,因为MessageChannel
是任何路由器工作的最终产品。
但是,如果第一步生成的通道标识符不是MessageChannel
,您有很多可能的方法可以影响MessageChannel
.
请考虑以下有效负载类型 router 的示例:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="channel1" />
<int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>
在有效载荷类型路由器的上下文中,前面提到的三个步骤将按以下方式实现:
-
计算一个通道标识符,该标识符是有效负载类型的完全限定名称(例如
java.lang.String
). -
将通道标识符解析为通道名称,其中上一步的结果用于从
mapping
元素。 -
将通道名称解析为
MessageChannel
作为对应用程序上下文中 Bean 的引用(希望是MessageChannel
) 中,由上一步的结果标识。
换句话说,每个步骤都会为下一步提供数据,直到该过程完成。
现在考虑一个 header value router 的示例:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
<int:mapping value="foo" channel="fooChannel" />
<int:mapping value="bar" channel="barChannel" />
</int:header-value-router>
现在我们可以考虑 header value router 的三个步骤是如何工作的:
-
计算一个通道标识符,该标识符是由
header-name
属性。 -
将通道标识符解析为通道名称,其中上一步的结果用于从
mapping
元素。 -
将通道名称解析为
MessageChannel
作为对应用程序上下文中 Bean 的引用(希望是MessageChannel
) 中,由上一步的结果标识。
两种不同路由器类型的前两种配置看起来几乎相同。
但是,如果您查看HeaderValueRouter
我们清楚地看到,没有mapping
sub 元素,如下面的清单所示:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
但是,该配置仍然完全有效。 那么自然而然的问题是,第二步中的映射呢?
第二步现在是可选的。
如果mapping
未定义,则第一步中计算的通道标识符值将自动被视为channel name
,现在解析为实际的MessageChannel
,就像第三步一样。
它还意味着第二步是向路由器提供动态特性的关键步骤之一,因为它引入了一个过程,允许您更改通道标识符解析为通道名称的方式,从而影响确定MessageChannel
从初始通道标识符。
例如,在前面的配置中,假设testHeader
value 是 'kermit',它现在是通道标识符(第一步)。
由于此路由器中没有映射,因此无法将此通道标识符解析为通道名称(第二步),并且此通道标识符现在被视为通道名称。
但是,如果存在映射但值不同,该怎么办?
最终结果仍然是相同的,因为如果无法通过将通道标识符解析为通道名称来确定新值,则通道标识符将成为通道名称。
剩下的就是第三步,将通道名称 ('kermit') 解析为MessageChannel
由此名称标识。
这基本上涉及对提供的名称的 bean 查找。
现在,所有包含 header-value 对 as 的消息testHeader=kermit
将被路由到MessageChannel
其 bean 名称(其id
) 是 'kermit'。
但是,如果您想将这些消息路由到 'simpson' 通道怎么办?显然,更改静态配置是可行的,但这样做也需要关闭您的系统。
但是,如果您有权访问通道标识符映射,则可以在 header-value 对现在所在的位置引入一个新的映射kermit=simpson
,因此让第二步将 'kermit' 视为通道标识符,同时将其解析为 'simpson' 作为通道名称。
这显然也适用于PayloadTypeRouter
,您现在可以在其中重新映射或删除特定的负载类型映射。
事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步解析为实际channel name
.
任何作为AbstractMappingMessageRouter
(包括大多数框架定义的路由器)是动态路由器,因为channelMapping
定义在AbstractMappingMessageRouter
水平。
该地图的 setter 方法与 'setChannelMapping' 和 'removeChannelMapping' 方法一起作为公共方法公开。
这些允许您在运行时更改、添加和删除路由器映射,只要您有对路由器本身的引用。
这也意味着你可以通过 JMX(参见 JMX 支持)或 Spring 集成控制总线(参见控制总线)功能公开这些相同的配置选项。
回退到 channel key 作为频道名称,灵活方便。
但是,如果您不信任消息创建者,恶意行为者(了解系统的人)可能会创建路由到意外通道的消息。
例如,如果将 key 设置为路由器 input 通道的通道名称,则此类消息将被路由回路由器,最终导致堆栈溢出错误。
因此,您可能希望禁用此功能(将channelKeyFallback property 设置为false ),并根据需要更改映射。 |
使用 Control Bus 管理 Router Mapping
管理路由器映射的一种方法是通过控制总线模式,它公开了一个控制通道,你可以向该通道发送控制消息以管理和监视 Spring 集成组件,包括路由器。
有关控制总线的更多信息,请参阅Control Bus。 |
通常,您将发送一条控制消息,要求对特定托管组件(例如路由器)调用特定作。 以下托管作(方法)特定于更改路由器解析过程:
-
public void setChannelMapping(String key, String channelName)
:用于在channel identifier
和channel name
-
public void removeChannelMapping(String key)
:用于删除特定通道映射,从而断开channel identifier
和channel name
请注意,这些方法可用于简单的更改(例如更新单个路由或添加或删除路由)。 但是,如果要删除一个路由并添加另一个路由,则更新不是原子的。 这意味着路由表在更新之间可能处于不确定状态。 从版本 4.0 开始,您现在可以使用 control bus 以原子方式更新整个 routing table。 以下方法允许您执行此作:
-
public Map<String, String>getChannelMappings()
:返回当前映射。 -
public void replaceChannelMappings(Properties channelMappings)
:更新映射。 请注意,channelMappings
parameter 是Properties
对象。 这种安排允许控制总线命令使用内置的StringToPropertiesConverter
,如下例所示:
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"
请注意,每个映射都由换行符 (\n
).
对于映射的编程更改,我们建议您使用setChannelMappings
方法。replaceChannelMappings
忽略不是String
对象。
使用 JMX 管理路由器映射
您还可以使用 Spring 的 JMX 支持来公开路由器实例,然后使用您最喜欢的 JMX 客户端(例如,JConsole)来管理用于更改路由器配置的那些作(方法)。
有关 Spring 集成的 JMX 支持的更多信息,请参阅 JMX 支持。 |
工艺路线单
从版本 4.1 开始, Spring 集成提供了路由单企业集成模式的实现。
它被实现为routingSlip
message 标头,用于确定AbstractMessageProducingHandler
实例,当outputChannel
未为终端节点指定。
此模式在复杂的动态情况下非常有用,此时很难配置多个路由器来确定消息流。
当消息到达没有output-channel
这routingSlip
以确定将消息发送到的下一个通道。
当路由单用尽时,正常replyChannel
处理简历。
路由单的配置显示为HeaderEnricher
option — 以分号分隔的路由单,其中包含path
条目,如下例所示:
<util:properties id="properties">
<beans:prop key="myRoutePath1">channel1</beans:prop>
<beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>
<context:property-placeholder properties-ref="properties"/>
<header-enricher input-channel="input" output-channel="process">
<routing-slip
value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>
前面的示例具有:
-
一个
<context:property-placeholder>
配置,以演示路由单中的条目path
可以指定为可解析键。 -
这
<header-enricher>
<routing-slip>
sub-元素用于填充RoutingSlipHeaderValueMessageProcessor
到HeaderEnricher
处理器。 -
这
RoutingSlipHeaderValueMessageProcessor
接受String
已解析的路由单数组path
条目和返回值(来自processMessage()
) 一个singletonMap
使用path
如key
和0
作为初始routingSlipIndex
.
工艺路线单path
条目可以包含MessageChannel
bean 名称,RoutingSlipRouteStrategy
bean 名称和 Spring 表达式 (SpEL)。
这RoutingSlipHeaderValueMessageProcessor
检查每个路由单path
条目对BeanFactory
在第一个processMessage
调用。
它将条目(不是应用程序上下文中的 bean 名称)转换为ExpressionEvaluatingRoutingSlipRouteStrategy
实例。RoutingSlipRouteStrategy
条目被多次调用,直到它们返回 null 或空String
.
由于路由单涉及getOutputChannel
process 中,我们有一个 request-reply 上下文。
这RoutingSlipRouteStrategy
已引入以确定下一个outputChannel
,它使用requestMessage
和reply
对象。
此策略的实现应在应用程序上下文中注册为 Bean,并在路由单中使用其 Bean 名称path
.
这ExpressionEvaluatingRoutingSlipRouteStrategy
implementation 的 implementation 的 S
它接受一个 SPEL 表达式和一个内部ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply
object 用作 evaluation 上下文的根对象。
这是为了避免EvaluationContext
为每个ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()
调用。
它是一个简单的 Java Bean,具有两个属性:Message<?> request
和Object reply
.
通过这个表达式实现,我们可以指定路由滑移path
条目(例如,@routingSlipRoutingPojo.get(request, reply)
和request.headers[myRoutingSlipChannel]
),并避免为RoutingSlipRouteStrategy
.
这requestMessage argument 始终为Message<?> .
根据上下文,回复对象可能是Message<?> 一AbstractIntegrationMessageBuilder 或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时)。
在前两种情况下,通常的Message 属性 (payload 和headers ) 在使用 SPEL(或 Java 实现)时可用。
对于任意域对象,这些属性不可用。
因此,如果结果用于确定下一个路径,则在将路由单与 POJO 方法结合使用时要小心。 |
在分布式环境中涉及路由单时,建议不要对路由单使用内联表达式path .
此建议适用于分布式环境,例如跨 JVM 应用程序,使用request-reply 通过消息代理(例如AMQP 支持或 JMS 支持),或使用持久性MessageStore (消息存储)在集成流中。
该框架使用RoutingSlipHeaderValueMessageProcessor 以将它们转换为ExpressionEvaluatingRoutingSlipRouteStrategy 对象,它们用于routingSlip 消息标头。
由于这个类不是Serializable (它不能,因为它依赖于BeanFactory )、整个Message 变得不可序列化,并且在任何分布式作中,我们最终都会得到一个NotSerializableException .
要克服此限制,请注册一个ExpressionEvaluatingRoutingSlipRouteStrategy bean 替换为所需的 SPEL,并在路由单中使用其 bean 名称path 配置。 |
对于 Java 配置,您可以添加RoutingSlipHeaderValueMessageProcessor
实例添加到HeaderEnricher
bean 定义,如下例所示:
@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
"@routingSlipRoutingPojo.get(request, reply)",
"routingSlipRoutingStrategy",
"request.headers[myRoutingSlipChannel]",
"finishChannel")));
}
当终端节点生成回复且没有outputChannel
已定义:
-
这
routingSlipIndex
用于从工艺路线单中获取值path
列表。 -
如果
routingSlipIndex
是String
,它用于从BeanFactory
. -
如果返回的 Bean 是
MessageChannel
,则它用作下一个outputChannel
和routingSlipIndex
在回复消息标头中递增(路由单path
条目保持不变)。 -
如果返回的 Bean 是
RoutingSlipRouteStrategy
及其getNextPath
不会返回空的String
,该结果将用作下一个outputChannel
. 这routingSlipIndex
保持不变。 -
如果
RoutingSlipRouteStrategy.getNextPath
返回一个空的String
或null
这routingSlipIndex
递增,并且getOutputChannelFromRoutingSlip
为下一个 Routing Slip 递归调用path
项目。 -
如果下一个工艺路线单
path
entry 不是String
,它必须是RoutingSlipRouteStrategy
. -
当
routingSlipIndex
超过工艺路线单的大小path
list 时,算法将移动到标准replyChannel
页眉。
Process Manager 企业集成模式
企业集成模式包括 Process Manager 模式。
现在,您可以通过使用封装在RoutingSlipRouteStrategy
在工艺路线单中。
除了 bean 名称之外,RoutingSlipRouteStrategy
可以返回任何MessageChannel
object,并且不需要 thisMessageChannel
instance 是应用程序上下文中的 bean。
这样,当无法预测应该使用哪个 channel 时,我们可以提供强大的 dynamic routing logic。
一个MessageChannel
可以在RoutingSlipRouteStrategy
然后返回。
一个FixedSubscriberChannel
替换为关联的MessageHandler
对于此类情况,implementation 是一个很好的组合。
例如,你可以路由到 Reactive Streams,如下例所示:
@Bean
public PollableChannel resultsChannel() {
return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
? new FixedSubscriberChannel(m ->
Mono.just((String) m.getPayload())
.map(String::toUpperCase)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
: new FixedSubscriberChannel(m ->
Mono.just((Integer) m.getPayload())
.map(v -> v * 2)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}
Filter
邮件过滤器用于决定Message
应根据某些条件(如消息头值或消息内容本身)传递或删除。
因此,消息过滤器类似于路由器,不同之处在于,对于从过滤器的 input 通道接收的每条消息,同一条消息可能会也可能不会发送到过滤器的输出通道。
与路由器不同,它不决定将消息发送到哪个消息通道,而只决定是否发送消息。
正如我们在本节后面描述的那样,过滤器还支持 discard 通道。 在某些情况下,它可以根据布尔条件扮演非常简单的路由器(或“交换机”)的角色。 |
在 Spring 集成中,您可以将消息过滤器配置为消息端点,该端点委托给MessageSelector
接口。
该接口本身非常简单,如下面的清单所示:
public interface MessageSelector {
boolean accept(Message<?> message);
}
这MessageFilter
constructor 接受一个 selector 实例,如下例所示:
MessageFilter filter = new MessageFilter(someSelector);
结合命名空间和 SPEL,您可以使用非常少的 Java 代码配置强大的过滤器。
使用 XML 配置过滤器
您可以使用<filter>
元素用于创建消息选择端点。
除了input-channel
和output-channel
属性,它需要一个ref
属性。
这ref
可以指向MessageSelector
实现,如下例所示:
<int:filter input-channel="input" ref="selector" output-channel="output"/>
<bean id="selector" class="example.MessageSelectorImpl"/>
或者,您也可以添加method
属性。
在这种情况下,ref
attribute 可以引用任何对象。
引用的方法可能需要Message
type 或入站消息的有效负载类型。
该方法必须返回布尔值。
如果该方法返回 'true',则消息将发送到输出通道。
以下示例显示如何配置使用method
属性:
<int:filter input-channel="input" output-channel="output"
ref="exampleObject" method="someBooleanReturningMethod"/>
<bean id="exampleObject" class="example.SomeObject"/>
如果选择器或改编的 POJO 方法返回false
中,有一些设置控制对被拒绝消息的处理。
默认情况下,(如果配置如上例所示)被拒绝的消息将被静默丢弃。
如果 rejection 应改为导致错误条件,请将throw-exception-on-rejection
属性设置为true
,如下例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" throw-exception-on-rejection="true"/>
如果您希望将被拒绝的消息路由到特定通道,请将该引用作为discard-channel
,如下例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" discard-channel="rejectedMessages"/>
另请参阅 Advising Filters。
消息过滤器通常与发布-订阅通道结合使用。 许多筛选条件终端节点可能订阅了同一个通道,它们决定是否将消息传递给下一个终端节点,该终端节点可以是任何支持的类型(例如服务激活器)。 这提供了一种被动的替代方法,而不是使用具有单个点对点输入通道和多个输出通道的消息路由器的更主动的方法。 |
我们建议使用ref
属性(如果自定义过滤器实现在其他<filter>
定义。
但是,如果自定义过滤器实现的范围限定为单个<filter>
元素,你应该提供一个内部 Bean 定义,如下例所示:
<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>
同时使用ref attribute 和内部处理程序定义位于同一<filter> 不允许配置,因为它会创建不明确的条件并引发异常。 |
如果ref 属性引用一个扩展MessageFilter (例如框架本身提供的过滤器),通过将输出通道直接注入到 Filter bean 中来优化配置。
在这种情况下,每个ref 必须发送到单独的 bean 实例(或prototype -scoped bean)或使用内部的<bean/> 配置类型。
但是,仅当未在筛选器 XML 定义中提供任何特定于筛选器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。 |
随着 SPEL 支持的引入, Spring 集成添加了expression
属性添加到 filter 元素中。
它可以用于完全避免 Java 进行简单的过滤器,如下例所示:
<int:filter input-channel="input" expression="payload.equals('nonsense')"/>
作为 expression 属性的值传递的字符串将作为 SpEL 表达式进行评估,并在评估上下文中提供消息。
如果必须在应用程序上下文的范围内包含表达式的结果,则可以使用 SPEL 参考文档中定义的表示法,如下例所示:#{}
<int:filter input-channel="input"
expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
如果表达式本身需要是动态的,则可以使用 'expression' 子元素。
这为通过 key 从ExpressionSource
.
这是一个可以直接实现的策略接口,或者你可以依赖 Spring Integration 中提供的版本,该版本从“资源包”加载表达式,并可以在给定的秒数后检查修改。
以下配置示例演示了所有这些,其中,如果底层文件已被修改,则可以在一分钟内重新加载表达式:
<int:filter input-channel="input" output-channel="output">
<int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>
<beans:bean id="myExpressions"
class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
<beans:property name="basename" value="config/integration/expressions"/>
<beans:property name="cacheSeconds" value="60"/>
</beans:bean>
如果ExpressionSource
bean 被命名为expressionSource
,则无需在<expression>
元素。
但是,在前面的示例中,我们展示了它以保证完整性。
'config/integration/expressions.properties' 文件(或任何具有区域设置扩展名的更具体的版本,要以加载资源包的典型方式解析)可以包含键/值对,如下例所示:
filterPatterns.example=payload > 100
所有这些使用expression 作为属性或子元素,也可以在 transformer、router、splitter、service-activator 和 header-enricher 元素中应用。
给定组件类型的语义和角色将影响评估结果的解释,就像解释方法调用的返回值一样。
例如,表达式可以返回 router 组件将视为消息通道名称的字符串。
但是,根据作为根对象的消息评估表达式并解析前缀为“@”的 bean 名称的底层功能在 Spring 集成中的所有核心 EIP 组件中是一致的。 |
配置带有注释的过滤器
以下示例显示如何使用注释配置筛选条件:
public class PetFilter {
...
@Filter (1)
public boolean dogsOnly(String input) {
...
}
}
1 | 指示此方法将用作过滤器的注释。 如果要将此类用作过滤器,则必须指定它。 |
XML 元素提供的所有配置选项也可用于@Filter
注解。
过滤器可以从 XML 中显式引用,或者如果@MessageEndpoint
注解 (annotation) 在类上定义,通过 Classpath 扫描自动检测。
另请参阅使用注释通知终端节点。
分配器
拆分器是一个组件,其作用是将消息划分为多个部分,并发送生成的消息以进行独立处理。 很多时候,他们是包含聚合器的管道中的上游生产者。
编程模型
用于执行拆分的 API 由一个基类AbstractMessageSplitter
.
它是一个MessageHandler
封装 splitter 通用功能的实现,例如填写适当的消息标头 (CORRELATION_ID
,SEQUENCE_SIZE
和SEQUENCE_NUMBER
) 对生成的消息进行分配。
通过此填充,可以跟踪消息及其处理结果(在典型情况下,这些标头将复制到由各种转换终端节点生成的消息中)。
然后,这些值可以由组合消息处理器等使用。
以下示例显示了AbstractMessageSplitter
:
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的 splitter,您可以扩展AbstractMessageSplitter
并实施splitMessage
方法,其中包含用于拆分消息的逻辑。
返回值可以是以下值之一:
-
一个
Collection
或消息数组或Iterable
(或Iterator
) 迭代消息。 在这种情况下,消息将作为消息发送(在CORRELATION_ID
,SEQUENCE_SIZE
和SEQUENCE_NUMBER
填充)。 使用此方法可以为您提供更多控制 — 例如,在拆分过程中填充自定义消息标头。 -
一个
Collection
或非消息对象数组或Iterable
(或Iterator
) 迭代非 message 对象。 它的工作方式与前一种情况类似,不同之处在于每个 collection 元素都用作消息有效负载。 使用此方法,您可以专注于域对象,而不必考虑消息传送系统,并生成更易于测试的代码。 -
一个
Message
或非消息对象(但不是集合或数组)。 它的工作方式与前面的情况类似,只是只发送了一条消息。
在 Spring 集成中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。
在这种情况下,该方法的返回值将如前所述进行解释。
input 参数可以是Message
或简单的 POJO。
在后一种情况下,拆分器接收传入消息的有效负载。
我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。
迭代器
从版本 4.1 开始,AbstractMessageSplitter
支持Iterator
type 的value
进行拆分。
请注意,在Iterator
(或Iterable
),我们无权访问底层项目的数量,并且SEQUENCE_SIZE
header 设置为0
.
这意味着默认的SequenceSizeReleaseStrategy
的<aggregator>
不起作用,并且CORRELATION_ID
从splitter
不会被发布;它将保持为incomplete
.
在这种情况下,您应该使用适当的自定义ReleaseStrategy
或依赖send-partial-result-on-expiry
䋰group-timeout
或MessageGroupStoreReaper
.
从版本 5.0 开始,AbstractMessageSplitter
提供protected obtainSizeIfPossible()
方法来确定Iterable
和Iterator
对象。
例如XPathMessageSplitter
可以确定底层证券的大小NodeList
对象。
从版本 5.0.9 开始,此方法还会正确返回com.fasterxml.jackson.core.TreeNode
.
一Iterator
object 有助于避免在拆分之前在内存中构建整个集合。
例如,当基础项目是从某个外部系统(例如 DataBase 或 FTP)填充的MGET
) 使用迭代或流。
Stream 和 Flux
从版本 5.0 开始,AbstractMessageSplitter
支持 JavaStream
和 Reactive StreamsPublisher
types 的value
进行拆分。
在这种情况下,目标Iterator
构建在其迭代功能之上。
此外,如果分离器的输出通道是ReactiveStreamsSubscribableChannel
这AbstractMessageSplitter
生成一个Flux
result 而不是Iterator
,并且输出通道订阅了此Flux
用于对下游流量需求进行基于背压的分流。
从版本 5.2 开始,splitter 支持discardChannel
用于发送 Split 函数为其返回空容器(collection、array、stream、Flux
等)。
在这种情况下,没有要迭代的 item 来发送到outputChannel
.
这null
拆分结果仍作为流结束指示器。
使用 XML 配置 Splitter
可以通过 XML 配置拆分器,如下所示:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可选的。 |
2 | 对在应用程序上下文中定义的 Bean 的引用。
Bean 必须实现拆分逻辑,如前面的部分所述。
自选。
如果未提供对 Bean 的引用,则假定到达input-channel 是java.util.Collection 默认的拆分逻辑应用于集合,将每个单独的元素合并到一条消息中,并将其发送到output-channel . |
3 | 实现拆分逻辑的方法(在 Bean 上定义)。 自选。 |
4 | 分路器的 input 通道。 必填。 |
5 | 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自己指定回复通道)。 |
6 | 在切分结果为空的情况下,请求消息发送到的频道。
可选(它们将停止,就像null result) 的 Result) |
我们建议使用ref
属性(如果自定义 Splitter 实现可以在其他<splitter>
定义。
但是,如果自定义拆分器处理程序实现的范围应限定为<splitter>
中,您可以配置内部 Bean 定义,如下例所示:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
同时使用 aref attribute 和内部处理程序定义位于同一<int:splitter> 不允许配置,因为它会产生不明确的条件并导致引发异常。 |
如果ref 属性引用一个扩展AbstractMessageProducingHandler (例如框架本身提供的 splitters),通过将 output channel 直接注入到 handler 中来优化配置。
在这种情况下,每个ref 必须是一个单独的 bean 实例(或者prototype -scoped bean)或使用内部的<bean/> 配置类型。
但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。 |
使用注释配置 Splitter
这@Splitter
注解适用于需要Message
type 或消息负载类型,并且该方法的返回值应为Collection
任何类型的
如果返回的值不是实际值Message
对象中,每个项目都包装在Message
作为Message
.
每个结果Message
发送到终端节点的指定输出通道,在该终端节点上,@Splitter
已定义。
以下示例演示如何使用@Splitter
注解:
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
另请参阅使用注释、拆分器和文件拆分器为终端节点提供建议。
聚合
基本上,聚合器是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。
从技术上讲,聚合器比 splitter 更复杂,因为它是有状态的。
它必须保存要聚合的消息,并确定何时准备好聚合整个消息组。
为此,它需要一个MessageStore
.
功能性
Aggregator 通过关联和存储一组相关消息来组合这些消息,直到该组被视为完整为止。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。
实现聚合器需要提供执行聚合(即,从多个中创建单个消息)的逻辑。 两个相关概念是 correlation 和 release。
Correlation 确定如何对消息进行分组以进行聚合。
在 Spring Integration 中,默认情况下,关联是基于IntegrationMessageHeaderAccessor.CORRELATION_ID
消息标头。
具有相同IntegrationMessageHeaderAccessor.CORRELATION_ID
组合在一起。
但是,您可以自定义关联策略,以允许以其他方式指定应如何将消息分组在一起。
为此,您可以实现CorrelationStrategy
(本章稍后将介绍)。
要确定一组消息的准备处理点,请使用ReleaseStrategy
被咨询。
当序列中包含的所有消息都存在时,聚合器的默认发布策略会根据IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
页眉。
您可以通过提供对自定义ReleaseStrategy
实现。
编程模型
聚合 API 由许多类组成:
-
界面
MessageGroupProcessor
及其子类:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
这
ReleaseStrategy
interface 及其默认实现:SimpleSequenceSizeReleaseStrategy
-
这
CorrelationStrategy
interface 及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
这AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
) 是MessageHandler
实现,封装了聚合器的常见功能(以及其他相关的用例),如下所示:
-
将消息关联到要聚合的组中
-
将这些消息保存在
MessageStore
直到可以释放组 -
决定何时可以发布组
-
将已发布的组聚合到一条消息中
-
识别和响应过期的组
决定如何将消息分组在一起的责任被委派给CorrelationStrategy
实例。
决定是否可以释放消息组的责任委托给ReleaseStrategy
实例。
以下清单显示了该基地的简要亮点AbstractAggregatingMessageGroupProcessor
(实施aggregatePayloads
方法留给开发人员):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
看DefaultAggregatingMessageGroupProcessor
,ExpressionEvaluatingMessageGroupProcessor
和MethodInvokingMessageGroupProcessor
作为AbstractAggregatingMessageGroupProcessor
.
从版本 5.2 开始,Function<MessageGroup, Map<String, Object>>
策略可用于AbstractAggregatingMessageGroupProcessor
合并和计算(聚合)输出消息的标头。
这DefaultAggregateHeadersFunction
implementation 可用于返回组内没有冲突的所有 Headers 的 logic;组中的一封或多封邮件的报头缺失不被视为冲突。
冲突的标头将被省略。
随着新引入的DelegatingMessageGroupProcessor
,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor
) MessageGroupProcessor
实现。
从本质上讲,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor
实例,并将所有其他实现包装到DelegatingMessageGroupProcessor
.
The Logic: 差异AbstractAggregatingMessageGroupProcessor
和DelegatingMessageGroupProcessor
后者在调用 Delegate 策略之前不会提前计算标头,并且如果 Delegate 返回Message
或AbstractIntegrationMessageBuilder
.
在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的适当标头。
这Function<MessageGroup, Map<String, Object>>
策略的headers-function
reference 属性,作为 XML 配置的AggregatorSpec.headersFunction()
选项,以及AggregatorFactoryBean.setHeadersFunction()
用于普通 Java 配置。
这CorrelationStrategy
归AbstractCorrelatingMessageHandler
,并且具有基于IntegrationMessageHeaderAccessor.CORRELATION_ID
message 标头,如下例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor
.
它会创建一个Message
其 payload 为List
为给定组接收的负载。
这适用于使用拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现。
在此类场景中使用发布-订阅通道或收件人列表路由器时,请务必启用apply-sequence 旗。
这样做会添加必要的标头:CORRELATION_ID ,SEQUENCE_NUMBER 和SEQUENCE_SIZE .
默认情况下,对于 Spring 集成中的拆分器,该行为是启用的,但不会为发布-订阅通道或收件人列表路由器启用该行为,因为这些组件可能用于不需要这些 Headers 的各种上下文中。 |
在为应用程序实施特定的聚合器策略时,您可以扩展AbstractAggregatingMessageGroupProcessor
并实施aggregatePayloads
方法。
但是,有更好的解决方案,与 API 的耦合较少,用于实现聚合逻辑,可以通过 XML 或 Comments 进行配置。
一般来说,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受单个java.util.List
作为参数(也支持参数化列表)。
调用该方法用于聚合消息,如下所示:
-
如果参数是
java.util.Collection<T>
并且参数类型 T 可分配给Message
,则为聚合而积累的消息的整个列表将发送到聚合器。 -
如果参数是非参数化的
java.util.Collection
或参数类型不可分配给Message
中,该方法接收累积消息的有效负载。 -
如果返回类型不能分配给
Message
,它被视为Message
该 API 的 API 由 Framework 自动创建。
为了实现代码简单性并促进最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。 |
从版本 5.3 开始,在处理消息组后,AbstractCorrelatingMessageHandler
执行MessageBuilder.popSequenceDetails()
消息标头修改,以便具有多个嵌套级别的适当 splitter-aggregator 方案。
仅当消息组发布结果不是消息集合时,才会执行该作。
在这种情况下,目标MessageGroupProcessor
负责MessageBuilder.popSequenceDetails()
call 来构建这些消息。
如果MessageGroupProcessor
返回Message
一个MessageBuilder.popSequenceDetails()
将仅在sequenceDetails
与组中的第一条消息匹配。
(以前,仅当普通有效负载或AbstractIntegrationMessageBuilder
已从MessageGroupProcessor
.)
此功能可以通过新的popSequence
boolean
属性,因此MessageBuilder.popSequenceDetails()
在某些情况下,当标准拆分器尚未填充关联详细信息时,可以禁用。
这个属性本质上会撤消最近的上游所做的工作applySequence = true
在AbstractMessageSplitter
.
有关更多信息,请参阅 Splitter 。
这SimpleMessageGroup.getMessages() method 返回一个unmodifiableCollection .
因此,如果聚合 POJO 方法具有Collection<Message> 参数,则传入的参数正是Collection 实例,当您使用SimpleMessageStore 对于聚合商,该原始Collection<Message> 在释放组后被清除。
因此,Collection<Message> 如果 POJO 中的 variable 被传出 聚合器,它也会被清除。
如果您只想按原样发布该集合以进行进一步处理,则必须构建一个新的Collection (例如,new ArrayList<Message>(messages) ).
从版本 4.3 开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。 |
在版本 4.2 之前,无法提供MessageGroupProcessor
通过使用 XML 配置。
只有 POJO 方法可用于聚合。
现在,如果框架检测到引用的(或内部的)bean 实现了MessageProcessor
,它用作聚合器的输出处理器。
如果您希望从自定义MessageGroupProcessor
作为 Message 的有效负载,您的类应扩展AbstractAggregatingMessageGroupProcessor
并实施aggregatePayloads()
.
此外,从 4.2 版本开始,一个SimpleMessageGroupProcessor
。
它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。
这使得聚合器可以充当消息屏障,其中到达的消息将被保留,直到发布策略触发并且该组作为单个消息序列发布。
从版本 6.0 开始,上述拆分行为仅在组处理器为SimpleMessageGroupProcessor
.
否则,使用任何其他MessageGroupProcessor
实现,该 API 将返回Collection<Message>
,则仅发出一条回复消息,并将整个消息集合作为其有效负载。
这种逻辑由聚合器的规范目的决定 - 通过某个键收集请求消息并生成单个分组消息。
ReleaseStrategy
这ReleaseStrategy
接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,任何 POJO 都可以实现完成决策逻辑,如果它提供了一个接受单个java.util.List
作为参数(也支持参数化列表)并返回布尔值。
在每条新消息到达后调用此方法,以确定组是否完成,如下所示:
-
如果参数是
java.util.List<T>
和参数类型T
可分配给Message
,组中累积的整个消息列表将发送到该方法。 -
如果参数是非参数化的
java.util.List
或参数类型不可分配给Message
中,该方法接收累积消息的有效负载。 -
该方法必须返回
true
如果消息组已准备好进行聚合,否则为 false。
以下示例演示如何使用@ReleaseStrategy
注解List
的类型Message
:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例演示如何使用@ReleaseStrategy
注解List
的类型String
:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于 POJO 的发布策略将Collection
的尚未发布的消息(如果您需要访问整个Message
) 或Collection
有效负载对象(如果 type 参数不是Message
).
这满足了大多数使用案例。
但是,如果出于某种原因,您需要访问完整的MessageGroup
中,您应该提供ReleaseStrategy
接口。
在处理可能较大的组时,您应该了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。
最有效的是 出于这些原因,对于大型组,我们建议您实施 |
当释放组进行聚合时,将处理其所有尚未发布的消息并将其从组中删除。
如果组也已完成(即,如果序列中的所有消息都已到达或未定义序列),则该组将标记为完成。
此组的任何新消息都将发送到 discard 通道(如果已定义)。
设置expire-groups-upon-completion
自true
(默认值为false
) 删除整个组,并且任何新消息(与已删除的组具有相同的相关 ID)将形成一个新组。
您可以使用MessageGroupStoreReaper
䋰send-partial-result-on-expiry
设置为true
.
为了便于丢弃延迟到达的消息,聚合器必须在组发布后维护有关该组的状态。
这最终可能导致内存不足情况。
为避免此类情况,您应该考虑配置MessageGroupStoreReaper 以删除组元数据。
应将过期参数设置为在到达某个点后使组过期,之后预计延迟消息不会到达。
有关配置收割者的信息,请参阅在 Aggregator 中管理状态:MessageGroupStore . |
Spring 集成为ReleaseStrategy
:SimpleSequenceSizeReleaseStrategy
.
此实现会参考SEQUENCE_NUMBER
和SEQUENCE_SIZE
标头,以决定消息组何时完成并准备好进行聚合。
如前所述,它也是默认策略。
在 5.0 版本之前,默认的发布策略是SequenceSizeReleaseStrategy ,这在大型组中表现不佳。
使用该策略,可以检测并拒绝重复的序列号。
此作可能很昂贵。 |
如果要聚合大型组,则不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用SimpleSequenceSizeReleaseStrategy
相反 - 对于这些用例,它的效率要高得多,并且自 5.0 版以来,当未指定 Partial Group Release 时,它是默认值。
聚合大型组
4.3 版本更改了默认值Collection
对于SimpleMessageGroup
自HashSet
(以前是BlockingQueue
).
当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。
尽管删除哈希集的速度通常要快得多,但对于大型消息来说,它的成本可能很高,因为必须在 insert 和 remove 上计算哈希值。
如果您的消息哈希成本很高,请考虑使用其他一些集合类型。
如用MessageGroupFactory
一个SimpleMessageGroupFactory
,以便您可以选择Collection
最适合你的需求。
您还可以提供自己的工厂实现来创建其他Collection<Message<?>>
.
以下示例显示了如何使用前面的实现和SimpleSequenceSizeReleaseStrategy
:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果筛选条件端点涉及聚合器上游的流,则序列大小发布策略(固定或基于sequenceSize header) 无法达到其目的,因为 filter 可能会丢弃序列中的某些消息。
在这种情况下,建议选择另一个ReleaseStrategy ,或者使用从丢弃子流发送的补偿消息,该子流的内容中包含一些信息,以便在自定义完整组函数中跳过。
有关更多信息,请参阅过滤器。 |
关联策略
这CorrelationStrategy
接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个Object
该键表示用于将消息与消息组关联的关联键。
该键必须满足Map
关于实施equals()
和hashCode()
.
通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与ServiceActivator
(包括对@Header
annotations) 的 Annotations)。
该方法必须返回一个值,并且该值不能是null
.
Spring 集成为CorrelationStrategy
:HeaderAttributeCorrelationStrategy
.
此实现返回其中一个消息标头(其名称由 constructor 参数指定)的值作为相关键。
默认情况下,关联策略是HeaderAttributeCorrelationStrategy
,返回CORRELATION_ID
header 属性。
如果您有要用于关联的自定义标头名称,则可以在HeaderAttributeCorrelationStrategy
并将其作为聚合商关联策略的参考。
锁定注册表
对组的更改是线程安全的。
因此,当您同时发送同一相关 ID 的消息时,聚合器中将只处理其中一个消息,从而使其实际上每个消息组都是单线程的。
一个LockRegistry
用于获取已解析相关 ID 的锁。
一个DefaultLockRegistry
默认使用 (in-memory)。
用于在共享MessageGroupStore
正在使用,则必须配置共享锁注册表。
避免死锁
如上所述,当消息组发生更改(添加或释放消息)时,将持有一个锁。
请考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。
这将导致线程挂起,并且jstack <pid>
可能会出现如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题:
-
确保每个聚合器都有自己的 Lock Registry(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)
-
使用
ExecutorChannel
或QueueChannel
作为聚合器的输出通道,以便下游流在新线程上运行 -
从版本 5.1.1 开始,将
releaseLockBeforeSend
aggregator 属性设置为true
如果由于某种原因,单个聚合器的输出最终路由回同一聚合器,也可能导致此问题。 当然,上述第一种解决方案不适用于这种情况。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器的信息,请参见 Aggregators 和 Resequencers。
使用 XML 配置聚合器
Spring 集成支持通过 XML 配置聚合器<aggregator/>
元素。
以下示例显示了聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合商的 ID 是可选的。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为 'true')。 |
3 | 聚合器从中接收消息的通道。 必填。 |
4 | 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。 |
5 | 聚合器将超时消息发送到的通道(如果send-partial-result-on-expiry 是false ).
自选。 |
6 | 对MessageGroupStore 用于将消息组存储在其关联键下,直到它们完成。
自选。
默认情况下,它是一个易失性内存存储。
有关更多信息,请参阅 Message Store 。 |
7 | 当多个 handle 订阅到同一DirectChannel (用于负载平衡目的)。
自选。 |
8 | 表示过期的消息应被聚合,并在其包含MessageGroup 已过期(请参阅MessageGroupStore.expireMessageGroups(long) ).
使MessageGroup 是通过配置MessageGroupStoreReaper .
但是,您也可以选择过期MessageGroup 通过调用MessageGroupStore.expireMessageGroups(timeout) .
您可以通过 Control Bus作来实现,或者,如果您引用了MessageGroupStore 实例中,通过调用expireMessageGroups(timeout) .
否则,此属性本身不会执行任何作。
它仅用作指示是否丢弃或将仍在MessageGroup 即将过期。
可选(默认值为false ).
注意:此属性可能更恰当地称为send-partial-result-on-timeout ,因为在以下情况下,该组实际上可能不会过期expire-groups-upon-timeout 设置为false . |
9 | 发送回复时要等待的超时间隔Message 到output-channel 或discard-channel .
默认为-1 ,这会导致无限期阻塞。
仅当输出通道具有一些 “发送” 限制(例如QueueChannel 具有固定的 'capacity' 。
在这种情况下,MessageDeliveryException 被抛出。
为AbstractSubscribableChannel implementations、send-timeout 被忽略。
为group-timeout(-expression) 这MessageDeliveryException 从 Scheduled expiring task (计划过期任务) 将导致此任务被重新计划。
自选。 |
10 | 对实现消息关联(分组)算法的 Bean 的引用。
该 bean 可以是CorrelationStrategy interface 或 POJO 的 API API 中。
在后一种情况下,correlation-strategy-method attribute 的 intent 值。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 标头)。 |
11 | 在 Bean 上定义的方法,由correlation-strategy .
它实现了关联决策算法。
可选,但有限制 (correlation-strategy 必须存在)。 |
12 | 表示关联策略的 SPEL 表达式。
例:"headers['something']" .
只有其中之一correlation-strategy 或correlation-strategy-expression 是允许的。 |
13 | 对在应用程序上下文中定义的 Bean 的引用。 如前所述,Bean 必须实现聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。 |
14 | 在 bean 上定义的方法,该方法由ref 属性。
它实现了消息聚合算法。
可选(取决于ref 属性)。 |
15 | 对实现发布策略的 bean 的引用。
该 bean 可以是ReleaseStrategy interface 或 POJO 的 API API 中。
在后一种情况下,release-strategy-method attribute 的 intent 值。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header 属性)。 |
16 | 在 bean 上定义的方法,该方法由release-strategy 属性。
它实现了完成决策算法。
可选,但有限制 (release-strategy 必须存在)。 |
17 | 表示发布策略的 SPEL 表达式。
表达式的根对象是一个MessageGroup .
例:"size() == 5" .
只有其中之一release-strategy 或release-strategy-expression 是允许的。 |
18 | 当设置为true (默认值为false ),已完成的组将从邮件存储中删除,从而让具有相同关联的后续邮件形成一个新组。
默认行为是将与已完成组具有相同关联的消息发送到discard-channel . |
19 | 仅当MessageGroupStoreReaper 配置为MessageStore 的<aggregator> .
默认情况下,当MessageGroupStoreReaper 配置为使部分组过期,则还会删除空组。
在正常释放组后,存在空组。
空组允许检测和丢弃延迟到达的消息。
如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。
然后,空组不会从MessageStore 直到它们至少在此毫秒数内未被修改。
请注意,空组过期的实际时间也受 reaper 的timeout property,它可以是此值加上 timeout 的值。 |
20 | 对org.springframework.integration.util.LockRegistry 豆。
它曾经获得一个Lock 基于groupId 对于MessageGroup .
默认情况下,内部的DefaultLockRegistry 被使用。
使用分布式LockRegistry ,例如ZookeeperLockRegistry ,确保只有一个聚合器实例可以同时对组进行作。
有关更多信息,请参阅 Redis Lock Registry 或 Zookeeper Lock Registry。 |
21 | 超时(以毫秒为单位),用于强制MessageGroup complete 时ReleaseStrategy 在当前消息到达时不释放组。
当需要发出部分结果(或丢弃组)时,如果新消息未到达MessageGroup 在超时内,从最后一条消息到达的时间开始计算。
要设置一个超时,该超时从MessageGroup 被创建,请参阅group-timeout-expression 信息。
当新消息到达聚合器时,任何现有的ScheduledFuture<?> 对于其MessageGroup 已取消。
如果ReleaseStrategy 返回false (表示不发布)和groupTimeout > 0 ,则计划新任务使组过期。
我们不建议将此属性设置为零(或负值)。
这样做可以有效地禁用聚合器,因为每个消息组都会立即完成。
但是,您可以使用表达式有条件地将其设置为零(或负值)。
看group-timeout-expression 以获取信息。
完成期间执行的作取决于ReleaseStrategy 和send-partial-group-on-expiry 属性。
有关更多信息,请参阅 Aggregator 和 Group Timeout 。
它与 'group-timeout-expression' 属性互斥。 |
22 | 计算结果为groupTimeout 使用MessageGroup 作为#root evaluation context 对象。
用于调度MessageGroup 强制完成。
如果表达式的计算结果为null ,则不会计划完成。
如果计算结果为零,则立即在当前线程上完成该组。
实际上,这提供了一个动态的group-timeout 财产。
例如,如果您希望强制完成MessageGroup 自创建组以来经过 10 秒后,您可以考虑使用以下 SPEL 表达式:timestamp + 10000 - T(System).currentTimeMillis() 哪里timestamp 由MessageGroup.getTimestamp() 作为MessageGroup 这是#root evaluation context 对象。
但请记住,组创建时间可能与第一条消息的时间不同,具体取决于其他组过期属性的配置。
看group-timeout 了解更多信息。
与 'group-timeout' 属性互斥。 |
23 | 当组由于超时(或MessageGroupStoreReaper ),则默认情况下,该组已过期(完全删除)。
迟到的消息将启动一个新组。
您可以将其设置为false 以完成组,但保留其元数据,以便丢弃延迟到达的消息。
空组可以稍后使用MessageGroupStoreReaper 与empty-group-min-timeout 属性。
它默认为 'true'。 |
24 | 一个TaskScheduler bean 引用来调度MessageGroup 如果没有新消息到达,则强制完成MessageGroup 在groupTimeout .
如果未提供,则默认调度程序 (taskScheduler )ApplicationContext (ThreadPoolTaskScheduler ) 被使用。
如果满足以下条件,则此属性不适用group-timeout 或group-timeout-expression 未指定。 |
25 | 从 4.1 版本开始。
它允许为forceComplete 操作。
它是从group-timeout(-expression) 或按MessageGroupStoreReaper ,并且不会应用于 normaladd ,release 和discard 操作。
只有此子元素或<expire-advice-chain/> 是允许的。 |
26 | 从 4.1 版本开始。
它允许配置任何Advice 对于forceComplete 操作。
它是从group-timeout(-expression) 或按MessageGroupStoreReaper ,并且不会应用于 normaladd ,release 和discard 操作。
只有此子元素或<expire-transactional/> 是允许的。
交易Advice 也可以使用 Spring 在此处进行配置tx Namespace。 |
过期组
有两个属性与过期(完全删除)组相关。
当组过期时,没有该组的记录,如果到达具有相同关联的新消息,则会启动一个新组。
当组完成(无过期)时,空组将保留,并丢弃延迟到达的消息。
稍后可以使用
如果组未正常完成,但因超时而被释放或丢弃,则组通常已过期。
从版本 4.1 开始,您可以通过使用
从 5.0 版本开始,空组也会被安排在 从版本 5.4 开始,可以将聚合器(和重新排序器)配置为使孤立组(持久性消息存储中的组)过期,否则可能不会发布)。
这 |
我们通常建议使用ref
属性(如果自定义聚合器处理程序实现可以在其他<aggregator>
定义。
但是,如果自定义聚合器实现仅由<aggregator>
,您可以使用内部 Bean 定义(从版本 1.0.3 开始)在<aggregator>
元素,如下例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同时使用 aref 属性和内部 Bean 定义位于同一<aggregator> 不允许配置,因为它会产生不明确的条件。
在这种情况下,将引发 Exception。 |
以下示例显示了聚合器 Bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 Bean 的实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
只要有必要这样做,就可以将发布策略方法和 aggregator 方法组合成一个 bean。 |
上面示例的相关策略 bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某个标准对数字进行分组(在本例中为除以 10 后的余数)并保留该组,直到有效负载提供的数字之和超过特定值。
只要有必要,就可以将发布策略方法、相关策略方法和 聚合器方法组合到单个 bean 中。 (实际上,它们全部或其中任何两个都可以组合。 |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,你可以使用 SPEL 处理各种策略(关联、发布和聚合),如果这种发布策略背后的逻辑相对简单,我们建议这样做。
假设您有一个 legacy 组件,该组件旨在接收对象数组。
我们知道,默认发布策略将所有聚合消息组合到List
.
现在我们有两个问题。
首先,我们需要从列表中提取单个消息。
其次,我们需要提取每条消息的有效负载并组装对象数组。
以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SPEL,实际上可以通过单行表达式相对容易地处理此类需求,从而避免编写自定义类并将其配置为 bean。 以下示例显示了如何执行此作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新的集合,然后将其转换为数组,从而获得与早期 Java 代码相同的结果。
在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方法。
而不是为自定义CorrelationStrategy
在correlation-strategy
属性中,您可以将简单的关联逻辑实现为 SpEL 表达式,并在correlation-strategy-expression
属性,如下例所示:
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有person
属性替换为id
,它将用于关联消息。
同样,对于ReleaseStrategy
,您可以将发布逻辑实现为 SpEL 表达式,并在release-strategy-expression
属性。
评估上下文的根对象是MessageGroup
本身。
这List
的消息可以使用message
表达式中组的属性。
在 5.0 版之前的版本中,根对象是Message<?> ,如前面的示例所示: |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SPEL 评估上下文的根对象是MessageGroup
本身,并且您声明,只要有一条 payload 为5
在此组中,应释放该组。
聚合器和组超时
从版本 4.0 开始,引入了两个新的互斥属性:group-timeout
和group-timeout-expression
.
请参阅使用 XML 配置聚合器。
在某些情况下,如果ReleaseStrategy
在当前消息到达时不释放。
为此,groupTimeout
选项允许调度MessageGroup
强制完成,如下例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器按顺序接收最后一条消息,则可以正常发布release-strategy-expression
.
如果该特定消息未到达,则groupTimeout
强制组在 10 秒后完成,只要该组至少包含两个 Message。
强制组完成的结果取决于ReleaseStrategy
和send-partial-result-on-expiry
.
首先,再次咨询发布策略,看看是否要进行正常的发布。
虽然组没有更改,ReleaseStrategy
此时可以决定释放该组。
如果发布策略仍未释放该组,则表示该组已过期。
如果send-partial-result-on-expiry
是true
、(部分)MessageGroup
作为普通聚合器回复消息发布到output-channel
.
否则,它将被丢弃。
两者之间有区别groupTimeout
behavior 和MessageGroupStoreReaper
(请参阅 使用 XML 配置聚合器)。
收割者为所有MessageGroup
s 在MessageGroupStore
周期性地。
这groupTimeout
为每个MessageGroup
如果新消息在groupTimeout
.
此外,reaper 可用于删除空组(如果expire-groups-upon-completion
为 false)。
从版本 5.5 开始,groupTimeoutExpression
可以评估为java.util.Date
实例。
这在根据组创建时间确定计划任务时刻 (MessageGroup.getTimestamp()
) 而不是当前消息到达,因为它是在groupTimeoutExpression
的计算结果为long
:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了配置了注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 一个注释,指示此方法应用作聚合器。 如果将此类用作聚合器,则必须指定它。 |
2 | 一个注释,指示此方法用作聚合器的发布策略。
如果任何方法中都不存在,则聚合器将使用SimpleSequenceSizeReleaseStrategy . |
3 | 一个注释,指示此方法应用作聚合器的关联策略。
如果未指示关联策略,则聚合器使用HeaderAttributeCorrelationStrategy 基于CORRELATION_ID . |
XML 元素提供的所有配置选项也可用于@Aggregator
注解。
可以从 XML 中显式引用聚合器,或者如果@MessageEndpoint
在类上定义,通过 Classpath 扫描自动检测。
注解配置 (@Aggregator
和其他)仅涵盖简单的使用案例,其中大多数默认选项就足够了。
如果您在使用 annotation 配置时需要对这些选项进行更多控制,请考虑使用@Bean
定义AggregatingMessageHandler
并标记其@Bean
method 替换为@ServiceActivator
,如下例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
请参见 Programming Model 和注释@Bean
方法了解更多信息。
从版本 4.2 开始,AggregatorFactoryBean 可用于简化 Java 的 Java 配置AggregatingMessageHandler . |
在 Aggregator 中管理状态:MessageGroupStore
Aggregator(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的关联键。
有状态模式(例如ReleaseStrategy
) 由以下原则驱动:组件(无论是由框架定义还是由用户定义)应该能够保持无状态。
所有状态都由MessageGroup
,其管理权委托给MessageGroupStore
.
这MessageGroupStore
接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
这MessageGroupStore
在 中积累状态信息MessageGroups
等待发布策略触发时,该事件可能永远不会发生。
因此,为了防止过时的消息延迟,并让易失性存储提供一个钩子,以便在应用程序关闭时进行清理,使用MessageGroupStore
允许您注册回调以应用于其MessageGroups
当它们到期时。
该界面非常简单,如下面的清单所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。
这MessageGroupStore
维护这些回调的列表,该列表按需应用于时间戳早于作为参数提供的时间的所有消息(请参阅registerMessageGroupExpiryCallback(..)
和expireMessageGroups(..)
方法)。
重要的是不要使用相同的MessageGroupStore 实例中,当您打算依赖expireMessageGroups 功能性。
每AbstractCorrelatingMessageHandler 注册自己的MessageGroupCallback 基于forceComplete() 回调。
这样,每个过期的组都可能被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,UniqueExpiryCallback 从AbstractCorrelatingMessageHandler 对于 Registration(注册)回调中的MessageGroupStore .
这MessageGroupStore ,反过来,检查是否存在此类的实例,并记录错误并显示相应的消息(如果回调集中已存在该实例)。
这样,框架就不允许使用MessageGroupStore 实例,以避免上述的副作用,即不是由特定关联处理程序创建的组过期。 |
您可以调用expireMessageGroups
方法。
任何早于当前时间减去此值的消息都已过期,并应用了回调。
因此,是 store 的用户定义了消息组 “expiry” 的含义。
为了方便用户, Spring 集成以MessageGroupStoreReaper
,如下例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个Runnable
.
在前面的示例中,消息组存储的 expire 方法每 10 秒调用一次。
超时本身为 30 秒。
重要的是要了解MessageGroupStoreReaper 是一个近似值,并且受 Task Scheduler 的速率影响,因为此属性仅在下一次计划执行MessageGroupStoreReaper 任务。
例如,如果超时设置为 10 分钟,但MessageGroupStoreReaper 任务计划为每小时运行一次,最后一次执行MessageGroupStoreReaper 任务发生在超时前一分钟,则MessageGroup 在接下来的 59 分钟内不会过期。
因此,我们建议将速率设置为至少等于或更短的超时值。 |
除了 reaper 之外,当应用程序关闭时,还会通过AbstractCorrelatingMessageHandler
.
这AbstractCorrelatingMessageHandler
注册自己的 expiry 回调,这是带有 boolean 标志的 Linksend-partial-result-on-expiry
在聚合器的 XML 配置中。
如果标志设置为true
,然后,当调用过期回调时,尚未发布的组中的任何未标记消息都可以发送到输出通道。
由于MessageGroupStoreReaper 从计划任务中调用,并可能导致生成消息(取决于sendPartialResultOnExpiry 选项)添加到下游集成流程中,建议提供自定义的TaskScheduler 替换为MessagePublishingErrorHandler 处理异常errorChannel ,正如常规 Aggregator 版本功能所期望的那样。
相同的逻辑也适用于组超时功能,该功能也依赖于TaskScheduler .
有关更多信息,请参阅错误处理。 |
当共享的 一些 有关 |
Flux 聚合器
在版本 5.2 中,FluxAggregatorMessageHandler
组件。
它基于 Project ReactorFlux.groupBy()
和Flux.window()
运营商。
传入消息将发送到FluxSink
由Flux.create()
在此组件的构造函数中。
如果outputChannel
未提供,或者它不是ReactiveStreamsSubscribableChannel
,则订阅到主Flux
从Lifecycle.start()
实现。
否则,它将推迟到ReactiveStreamsSubscribableChannel
实现。
消息按Flux.groupBy()
使用CorrelationStrategy
对于组键。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID
标头。
默认情况下,每个关闭的窗口都作为Flux
in payload 中。
此消息包含窗口中第一条消息的所有标头。
这Flux
在输出消息中,必须订阅 payload 并在下游进行处理。
这样的逻辑可以被setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
configuration 选项的FluxAggregatorMessageHandler
.
例如,如果我们想让List
的有效负载,我们可以配置Flux.collectList()
喜欢这个:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
“中有几个选项FluxAggregatorMessageHandler
要选择合适的窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到Flux.windowUntil()
算子。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
或windowTimeout(int, Duration)
. 默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
页眉。 -
setWindowTimespan(Duration)
- 传播到Flux.window(Duration)
或windowTimeout(int, Duration)
取决于 Window Size 配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将变换应用于 Grouped Fluxes 中,用于 Exposed 选项未涵盖的任何自定义窗口作。
由于此组件是一个MessageHandler
实现它可以简单地用作@Bean
定义与@ServiceActivator
messaging 注释。
使用 Java DSL,可以从.handle()
EIP 方法。
下面的示例演示了如何注册IntegrationFlow
在运行时,以及FluxAggregatorMessageHandler
可以与上游的 splitter 相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息组的条件
从版本 5.5 开始,AbstractCorrelatingMessageHandler
(包括其 Java 和 XML DSLs)公开了一个groupConditionSupplier
选项BiFunction<Message<?>, String, String>
实现。
此函数用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。
这ReleaseStrategy
可以查阅此条件,而不是迭代组中的所有消息。
看GroupConditionProvider
JavaDocs 和 Message Group Condition 了解更多信息。
另请参阅 File Aggregator。
重定序器
resequencer 与 聚合器相关,但用途不同。 当聚合器合并消息时,resequencer 传递消息而不更改它们。
功能性
resequencer 的工作方式与聚合器类似,因为它使用CORRELATION_ID
将消息存储在组中。
区别在于 Resequencer 不以任何方式处理消息。
相反,它会按照SEQUENCE_NUMBER
标头值。
关于这一点,您可以选择一次释放所有消息(在整个序列之后,根据SEQUENCE_SIZE
和其他可能性)或有效序列可用时立即进行。
(我们将在本章后面介绍我们所说的 “有效序列” 的含义。
resequencer 旨在对具有较小间隙的相对较短的消息序列进行重新排序。 如果您有大量具有许多间隙的不相交序列,则可能会遇到性能问题。 |
配置 Resequencer
有关在 Java DSL 中配置重新排序器的信息,请参阅聚合器和重新排序器。
配置 resequencer 只需要在 XML 中包含适当的元素。
以下示例显示了 resequencer 配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | resequencer 的 id 是可选的。 |
2 | resequencer 的 input channel。 必填。 |
3 | resequencer 将重新排序的消息发送到的通道。 自选。 |
4 | resequencer 将超时消息发送到的通道(如果send-partial-result-on-timeout 设置为false ).
自选。 |
5 | 是立即发送有序序列,还是在整个消息组到达后才发送。
自选。
(默认值为false .) |
6 | 对MessageGroupStore 这可用于将消息组存储在其关联键下,直到它们完成。
自选。
(默认值为 volatile 内存存储。 |
7 | 在组过期时,是否应发送有序的组(即使缺少某些消息)。
自选。
(默认值为 false。
看在 Aggregator 中管理状态:MessageGroupStore . |
8 | 发送回复时要等待的超时间隔Message 到output-channel 或discard-channel .
默认为-1 ,这将无限期地阻止。
仅当输出通道具有一些 “发送” 限制(例如QueueChannel 具有固定的 'capacity' 。
在这种情况下,MessageDeliveryException 被抛出。
这send-timeout 被忽略AbstractSubscribableChannel 实现。
为group-timeout(-expression) 这MessageDeliveryException 从 Scheduled Expire 任务中,将重新计划此任务。
自选。 |
9 | 对实现消息关联(分组)算法的 Bean 的引用。
该 bean 可以是CorrelationStrategy interface 或 POJO 的 API API 中。
在后一种情况下,correlation-strategy-method 属性。
自选。
(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。 |
10 | 在 Bean 上定义的方法,该方法由correlation-strategy 并且实现了 Correlation Decision 算法。
可选,但有限制(需要correlation-strategy 在场)。 |
11 | 表示关联策略的 SPEL 表达式。
例:"headers['something']" .
只有其中之一correlation-strategy 或correlation-strategy-expression 是允许的。 |
12 | 对实现发布策略的 bean 的引用。
该 bean 可以是ReleaseStrategy interface 或 POJO 的 API API 中。
在后一种情况下,release-strategy-method 属性。
可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header 属性)。 |
13 | 在 Bean 上定义的方法,该方法由release-strategy 并且实现了 completion decision 算法。
可选,但有限制(需要release-strategy 在场)。 |
14 | 表示发布策略的 SPEL 表达式。
表达式的根对象是一个MessageGroup .
例:"size() == 5" .
只有其中之一release-strategy 或release-strategy-expression 是允许的。 |
15 | 仅当MessageGroupStoreReaper 配置为<resequencer> MessageStore .
默认情况下,当MessageGroupStoreReaper 配置为使部分组过期,则还会删除空组。
正常释放组后,存在空组。
这是为了能够检测和丢弃延迟到达的消息。
如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。
然后,空组不会从MessageStore 直到它们至少在此毫秒数内未被修改。
请注意,使空 group 过期的实际时间也受收割者的 timeout 属性的影响,它可能等于此值加上 timeout。 |
16 | 请参阅使用 XML 配置聚合器。 |
17 | 请参阅使用 XML 配置聚合器。 |
18 | 请参阅使用 XML 配置聚合器。 |
19 | 请参阅使用 XML 配置聚合器。 |
20 | 默认情况下,当组由于超时(或由MessageGroupStoreReaper ),则会保留空组的元数据。
延迟到达的消息将被立即丢弃。
将此项设置为true 以完全删除该组。
然后,延迟到达的消息将启动一个新组,并且在该组再次超时之前不会被丢弃。
由于导致超时的序列范围中存在 “hole” ,因此新组永远不会正常释放。
空组可以在以后使用MessageGroupStoreReaper 与empty-group-min-timeout 属性。
从版本 5.0 开始,空组也会计划在empty-group-min-timeout 流逝。
默认值为 'false'。 |
另请参阅 Aggregator Expiring Groups 了解更多信息。
由于在 Java 类中没有要为 resequencers 实现的自定义行为,因此没有对它的 Comments 支持。 |
消息处理程序链
这MessageHandlerChain
是MessageHandler
可以配置为单个消息端点,同时实际委托给其他处理程序链,例如过滤器、转换器、拆分器等。
当需要以固定的线性进度连接多个处理程序时,这可能会导致更简单的配置。
例如,在其他组件之前提供 transformer 是相当常见的。
同样,当您在链中的其他组件之前提供过滤器时,您实际上是创建了一个选择性使用者。
无论哪种情况,链都只需要一个input-channel
和一个output-channel
,无需为每个单独的组件定义通道。
这MessageHandlerChain 主要是为 XML 配置设计的。
对于 Java DSL,一个IntegrationFlow definition 可以被视为一个链组件,但它与下面本章中描述的概念和原则无关。
有关更多信息,请参阅 Java DSL。 |
Spring 集成的Filter 提供 boolean 属性:throwExceptionOnRejection .
当你在同一个点对点通道上为多个选择性使用者提供具有不同接受条件时,你应该将这个值设置为'true'(默认值是false ),以便调度程序知道消息已被拒绝,并因此尝试将消息传递给其他订阅者。
如果未引发异常,则调度程序会认为消息已成功传递,即使筛选器已删除消息以防止进一步处理。
如果您确实想要 “drop” 消息,过滤器的 'discard-channel' 可能很有用,因为它确实让您有机会对丢弃的消息执行某些作(例如将其发送到 JMS 队列或将其写入日志)。 |
处理程序链简化了配置,同时在内部保持组件之间相同程度的松散耦合,如果在某些时候需要非线性排列,修改配置是很简单的。
在内部,该链被扩展为列出的端点的线性设置,由匿名通道分隔。
在链中不考虑 reply channel headers。
只有在调用最后一个处理程序后,生成的消息才会转发到回复通道或链的输出通道。
由于这种设置,除最后一个处理程序之外的所有处理程序都必须实现MessageProducer
接口(提供 'setOutputChannel()' 方法)。
如果outputChannel
在MessageHandlerChain
设置后,最后一个处理程序只需要一个 output channel。
与其他终端节点一样,output-channel 是可选的。
如果链的末尾有回复消息,则 output-channel 优先。
但是,如果它不可用,则链处理程序将检查入站消息上的回复通道头作为回退。 |
在大多数情况下,您无需实施MessageHandler
你自己。
下一节重点介绍 chain 元素的命名空间支持。
大多数 Spring 集成端点,例如服务激活器和转换器,都适合在MessageHandlerChain
.
配置链
这<chain>
元素提供了一个input-channel
属性。
如果链中的最后一个元素能够生成回复消息(可选),它还支持output-channel
属性。
然后,子元素是 filters、transformers、splitter 和 service-activator。
最后一个元素也可以是 router 或 outbound channel adapter。
以下示例显示了链定义:
<int:chain input-channel="input" output-channel="output">
<int:filter ref="someSelector" throw-exception-on-rejection="true"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:service-activator ref="someService" method="someMethod"/>
</int:chain>
这<header-enricher>
元素设置一个名为thing1
的值为thing2
在消息上。
标头扩充器是Transformer
仅触及 Headers 值。
您可以通过实现MessageHandler
这完成了 Headers 修改并将其作为 bean 进行连接,但是 Header-Enricher 是一个更简单的选项。
这<chain>
可以配置为消息流的最后一个 “closed-box” 使用者。
对于此解决方案,您可以将其放在 <链的末尾>一些 <outbound-channel-adapter>,如下例所示:
<int:chain input-channel="input">
<int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
<int:service-activator ref="someService" method="someMethod"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
不允许的属性和元素
某些属性(例如 对于 Spring 集成核心组件,XML 模式本身会强制执行其中一些约束。 但是,对于非核心组件或您自己的自定义组件,这些约束由 XML 名称空间解析器(而不是 XML 架构)强制执行。 这些 XML 名称空间解析器约束是在 Spring Integration 2.2 中添加的。
如果您尝试使用不允许的属性和元素,XML 名称空间解析器会抛出一个 |
使用 'id' 属性
从 Spring Integration 3.0 开始,如果为 chain 元素赋予id
属性,则元素的 Bean 名称是链的id
和id
元素本身。
没有 Elementid
属性未注册为 bean,但每个属性都被赋予了一个componentName
这包括链条id
.
请考虑以下示例:
<int:chain id="somethingChain" input-channel="input">
<int:service-activator id="somethingService" ref="someService" method="someMethod"/>
<int:object-to-json-transformer/>
</int:chain>
在前面的示例中:
-
这
<chain>
根元素具有id
的 'somethingChain' 中。 因此,AbstractEndpoint
implementation (PollingConsumer
或EventDrivenConsumer
,具体取决于input-channel
type) bean 将此值作为其 bean 名称。 -
这
MessageHandlerChain
bean 获取一个 bean 别名('somethingChain.handler'),它允许从BeanFactory
. -
这
<service-activator>
不是一个成熟的消息收发终端节点(它不是PollingConsumer
或EventDrivenConsumer
). 它是一个MessageHandler
在<chain>
. 在这种情况下,使用BeanFactory
是 'somethingChain$child.somethingService.handler'。 -
这
componentName
的ServiceActivatingHandler
采用相同的值,但没有 '.handler' 后缀。 它变为 'somethingChain$child.somethingService'。 -
最后
<chain>
子组件 /<object-to-json-transformer>
中没有id
属性。 其componentName
基于它在<chain>
. 在本例中,它是 'somethingChain$child#1'。 (名称的最后一个元素是链中的顺序,以 '#0' 开头)。 请注意,此转换器未在应用程序上下文中注册为 bean,因此它不会获得beanName
. 但是,其componentName
具有一个可用于日志记录和其他目的的值。
提供显式的id 属性<chain> 元素来简化日志中子组件的标识,并提供从BeanFactory 等。 |
从 Chain 中调用 Chain
有时,您需要从链中对另一个链进行嵌套调用,然后返回并继续在原始链中执行。 为此,您可以通过包含 <gateway> 元素来使用消息传送网关,如下例所示:
<int:chain id="main-chain" input-channel="in" output-channel="out">
<int:header-enricher>
<int:header name="name" value="Many" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
<int:gateway request-channel="inputA"/>
</int:chain>
<int:chain id="nested-chain-a" input-channel="inputA">
<int:header-enricher>
<int:header name="name" value="Moe" />
</int:header-enricher>
<int:gateway request-channel="inputB"/>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
<int:chain id="nested-chain-b" input-channel="inputB">
<int:header-enricher>
<int:header name="name" value="Jack" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
在前面的示例中,nested-chain-a
在 的末尾调用main-chain
通过此处配置的 'gateway' 元素进行处理。
在nested-chain-a
,则调用nested-chain-b
是在标头扩充之后进行的。
然后,流返回以完成nested-chain-b
.
最后,流返回到main-chain
.
当<gateway>
元素定义在链中,则不需要service-interface
属性。
相反,它会获取当前状态的消息,并将其放置在request-channel
属性。
当该网关启动的下游流完成时,一个Message
返回到网关并继续在当前链中的旅程。
分散-聚集
从版本 4.1 开始, Spring 集成提供了分散-聚集企业集成模式的实现。 它是一个复合终端节点,其目标是向收件人发送消息并聚合结果。 正如 Enterprise Integration Patterns 中所指出的,它是 “best quote” 等场景的一个组件,在这种情况下,我们需要从多个供应商那里请求信息,并决定哪一个为我们提供所请求项目的最佳术语。
以前,可以使用离散元件来配置模式。 此次优化带来了更便捷的配置。
这ScatterGatherHandler
是一个请求-回复终端节点,它结合了PublishSubscribeChannel
(或RecipientListRouter
) 和AggregatingMessageHandler
.
请求消息将发送到scatter
channel 和ScatterGatherHandler
等待聚合器发送到outputChannel
.
功能性
这Scatter-Gather
pattern 建议两种情况:“auction” 和 “distribution”。
在这两种情况下,aggregation
函数相同,并提供可用于AggregatingMessageHandler
.
(实际上,ScatterGatherHandler
只需要AggregatingMessageHandler
作为构造函数参数。
有关更多信息,请参阅 Aggregator。
拍卖
拍卖会Scatter-Gather
变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是一个PublishSubscribeChannel
跟apply-sequence="true"
.
但是,此通道可以是任何MessageChannel
实现(就像request-channel
在ContentEnricher
— 请参阅 Content Enricher)。
但是,在这种情况下,您应该创建自己的自定义correlationStrategy
对于aggregation
功能。
分配
分布Scatter-Gather
variant 基于RecipientListRouter
(参见RecipientListRouter
) 替换为RecipientListRouter
.
这是第二个ScatterGatherHandler
constructor 参数。
如果只想依赖默认的correlationStrategy
对于recipient-list-router
和aggregator
,您应该指定apply-sequence="true"
.
否则,您应该提供自定义correlationStrategy
对于aggregator
.
与PublishSubscribeChannel
variant(竞价变体)中,具有recipient-list-router
selector
选项 允许根据消息筛选目标供应商。
跟apply-sequence="true"
,则默认的sequenceSize
,并且aggregator
可以正确释放组。
distribution 选项与 auction 选项互斥。
这applySequence=true 仅对于基于ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) constructor 配置,因为框架无法更改外部提供的组件。
为方便起见,使用 XML 和 Java DSLScatter-Gather 集applySequence 从版本 6.0 开始更改为 true。 |
对于拍卖和分发变体,请求 (scatter) 消息都使用gatherResultChannel
标头等待来自aggregator
.
默认情况下,所有供应商都应将其结果发送到replyChannel
标头(通常通过省略output-channel
从最终终点开始)。
但是,gatherChannel
选项,让供应商将他们的回复发送到该通道进行聚合。
配置 Scatter-Gather 端点
以下示例显示了 的 bean 定义的 Java 配置Scatter-Gather
:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们将RecipientListRouter
distributor
bean 替换为applySequence="true"
和收件人通道列表。
下一个 bean 用于AggregatingMessageHandler
.
最后,我们将这两个 bean 注入到ScatterGatherHandler
bean 定义并将其标记为@ServiceActivator
将 Scatter-gather 组件连接到集成流中。
以下示例显示如何配置<scatter-gather>
endpoint 的 XML 命名空间:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 终端节点的 ID。
这ScatterGatherHandler bean 使用别名id + '.handler' .
这RecipientListRouter bean 使用别名id + '.scatterer' .
这AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer' .
自选。
(该BeanFactory 生成默认的id 值。 |
2 | 生命周期属性指示是否应在应用程序上下文初始化期间启动终端节点。
此外,ScatterGatherHandler 还实现了Lifecycle 以及开始和停止gatherEndpoint ,该 API 是在内部创建的,如果gather-channel 。
自选。
(默认值为true .) |
3 | 接收请求消息的通道,用于在ScatterGatherHandler .
必填。 |
4 | 将ScatterGatherHandler 发送聚合结果。
自选。
(传入邮件可以在replyChannel 消息标头)。 |
5 | 将拍卖场景的分散消息发送到的通道。
自选。
与<scatterer> sub-元素。 |
6 | 接收来自每个供应商的聚合回复的通道。
它用作replyChannel 标头。
自选。
默认情况下,FixedSubscriberChannel 已创建。 |
7 | 当多个处理程序订阅相同的组件时,此组件的顺序DirectChannel (用于负载平衡目的)。
自选。 |
8 | 指定终端节点的启动和停止阶段。
启动顺序从最低到最高,关闭顺序从最高到最低。
默认情况下,此值为Integer.MAX_VALUE ,这意味着此容器会尽可能晚地启动并尽快停止。
自选。 |
9 | 发送回复时要等待的超时间隔Message 到output-channel .
默认情况下,send() 块 1 秒。
仅当输出通道具有一些 'sending' 限制时,它才适用 — 例如,QueueChannel 具有已满的固定“容量”。
在这种情况下,MessageDeliveryException 被抛出。
这send-timeout 被忽略AbstractSubscribableChannel 实现。
为group-timeout(-expression) 这MessageDeliveryException 从 Scheduled Expire 任务中,将重新计划此任务。
自选。 |
10 | 用于指定 scatter-gather 在返回之前等待回复消息的时间。
默认情况下,它会无限期等待。
如果回复超时,则返回 'null'。
自选。
它默认为-1 ,意思是无限期等待。 |
11 | 指定 scatter-gather 是否必须返回非 null 值。
该值为true 默认情况下。
因此,一个ReplyRequiredException 当底层聚合器在gather-timeout .
注意,如果null 是可能的,gather-timeout 以避免无限期等待。 |
12 | 这<recipient-list-router> 选项。
自选。
互斥与scatter-channel 属性。 |
13 | 这<aggregator> 选项。
必填。 |
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。
在某些情况下,如果ReleaseStrategy
允许进程以少于请求的回复完成。
在其他情况下,当发生错误时,应考虑从子流返回类似 “补偿消息” 的内容。
每个异步子流都应该配置一个errorChannel
标头,以便从MessagePublishingErrorHandler
.
否则,将向全局errorChannel
替换为常见的错误处理逻辑。
有关异步错误处理的更多信息,请参阅错误处理。
同步流可以使用ExpressionEvaluatingRequestHandlerAdvice
忽略异常或返回补偿消息。
当异常从其中一个子流抛出到ScatterGatherHandler
,它只是被重新抛向上游。
这样,所有其他子流都将无用,并且它们的回复将在ScatterGatherHandler
.
这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。
从版本 5.1.3 开始,ScatterGatherHandler
随errorChannelName
选择。
它被填充到errorChannel
标头,用于 When async error occurred 或可用于常规同步子流,以直接发送错误消息。
以下示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成正确的回复,我们必须复制 headers(包括replyChannel
和errorChannel
) 从failedMessage
的MessagingException
已发送到scatterGatherErrorChannel
由MessagePublishingErrorHandler
.
这样,目标异常将返回给ScatterGatherHandler
用于回复消息组完成。
这样的异常payload
可以在MessageGroupProcessor
的 Gatherer 或其他方式处理的下游,在 scatter-gather 端点之后。
在将分散结果发送给 Gatherer 之前,ScatterGatherHandler 恢复请求消息标头,包括 reply 和 error 通道(如果有)。
这样,来自AggregatingMessageHandler 将传播给调用方,即使在 Scatter 接收者子流中应用了异步切换也是如此。
为了成功作,需要gatherResultChannel ,originalReplyChannel 和originalErrorChannel 标头必须传输回 Scatter 收件人子流的回复。
在这种情况下,一个合理的、有限的gatherTimeout 必须为ScatterGatherHandler .
否则,默认情况下,它将永远被阻止,等待 Gatherer 的回复。 |
线程屏障
有时,我们需要暂停消息流线程,直到发生其他异步事件。 例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。 在 RabbitMQ 代理发出收到消息的确认之前,我们可能希望不回复用户。
在版本 4.2 中, Spring 集成引入了<barrier/>
组件。
底层MessageHandler
是BarrierMessageHandler
.
此类还实现MessageTriggerAction
,其中将消息传递给trigger()
方法在handleRequestMessage()
方法(如果存在)。
挂起的线程和触发线程通过调用CorrelationStrategy
在消息上。
当消息发送到input-channel
,则线程将暂停长达requestTimeout
毫秒,等待相应的触发器消息。
默认关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID
页眉。
当触发器消息以相同的关联到达时,线程将被释放。
发送到output-channel
使用MessageGroupProcessor
.
默认情况下,消息是Collection<?>
,标头使用DefaultAggregatingMessageGroupProcessor
.
如果trigger() method 被首先调用(或在主线程超时后),它将被暂停,直到triggerTimeout 等待 suspending 消息到达。
如果您不想暂停触发器线程,请考虑将TaskExecutor 而是使其线程被挂起。 |
在 5.4 版本之前,只有一个timeout 选项,但在某些用例中,最好为这些作设置不同的超时。
因此requestTimeout 和triggerTimeout 引入了选项。 |
这requires-reply
属性确定如果挂起的线程在触发器消息到达之前超时时要采取的作。
默认情况下,它是false
,这意味着终端节点返回null
,流结束,线程返回给调用方。
什么时候true
一个ReplyRequiredException
被抛出。
您可以调用trigger()
方法(使用名称barrier.handler
— 其中barrier
是 barrier 端点的 bean 名称)。
或者,您也可以配置<outbound-channel-adapter/>
以触发发布。
只能暂停一个具有相同关联的线程。 相同的关联可以多次使用,但只能同时使用一次。 如果第二个线程以相同的关联到达,则会引发异常。 |
以下示例演示如何使用自定义标头进行关联:
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个线程的消息先到达,将消息发送到in
或向release
最多等待 10 秒,直到另一条消息到达。
当消息被释放时,out
channel 会发送一条消息,该消息将调用自定义MessageGroupProcessor
Bean 中名为myOutputProcessor
.
如果主线程超时并且触发器稍后到达,则可以配置将延迟触发器发送到的 discard 通道。
有关此组件的示例,请参阅 barrier 示例应用程序。