此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
从版本 4.1 开始,Spring Integration 提供了分散-聚集企业集成模式的实现。 它是一个复合终结点,其目标是向收件人发送消息并聚合结果。 如企业集成模式中所述,它是“最佳报价”等场景的一个组件,我们需要从多个供应商处请求信息,并决定哪一个供应商为我们提供所请求项目的最佳术语。
以前,可以使用分立元件来配置模式。 此增强功能带来了更方便的配置。
是组合了 (或 ) 和 .
请求消息被发送到通道,并等待聚合器发送到 .ScatterGatherHandler
PublishSubscribeChannel
RecipientListRouter
AggregatingMessageHandler
scatter
ScatterGatherHandler
outputChannel
功能性
该模式建议两种情况:“拍卖”和“分配”。
在这两种情况下,函数是相同的,并提供了 .
(实际上,只需要一个作为构造函数参数。
有关详细信息,请参阅聚合器。Scatter-Gather
aggregation
AggregatingMessageHandler
ScatterGatherHandler
AggregatingMessageHandler
拍卖
拍卖变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是带有 .
但是,此通道可以是任何实现(如 in 的情况 — 请参阅内容丰富器)。
但是,在这种情况下,您应该为函数创建自己的自定义。Scatter-Gather
PublishSubscribeChannel
apply-sequence="true"
MessageChannel
request-channel
ContentEnricher
correlationStrategy
aggregation
分配
分发变体基于 (请参阅 RecipientListRouter
) 以及 .
这是第二个构造函数参数。
如果只想依赖 和 的默认值,则应指定 。
否则,应提供 .
与变体(拍卖变体)不同,有一个选项可以根据消息过滤目标供应商。
使用 ,提供默认值,并且可以正确释放组。
分配选项与拍卖选项互斥。Scatter-Gather
RecipientListRouter
RecipientListRouter
ScatterGatherHandler
correlationStrategy
recipient-list-router
aggregator
apply-sequence="true"
correlationStrategy
aggregator
PublishSubscribeChannel
recipient-list-router
selector
apply-sequence="true"
sequenceSize
aggregator
只有基于构造函数配置的纯 Java 配置才需要它,因为框架无法改变外部提供的组件。
为方便起见,XML 和 Java DSL for 从 V6.0 开始设置为 true。applySequence=true ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) Scatter-Gather applySequence |
对于拍卖和分发变体,请求(分散)消息都使用标头进行扩充,以等待来自 的回复消息。gatherResultChannel
aggregator
默认情况下,所有供应商都应将其结果发送到标头(通常通过省略最终端点)。
但是,还提供了该选项,允许供应商将他们的回复发送到该渠道进行聚合。replyChannel
output-channel
gatherChannel
只有基于构造函数配置的纯 Java 配置才需要它,因为框架无法改变外部提供的组件。
为方便起见,XML 和 Java DSL for 从 V6.0 开始设置为 true。applySequence=true ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) Scatter-Gather applySequence |
配置 Scatter-Gather 端点
以下示例显示了 Bean 定义的 Java 配置:Scatter-Gather
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们使用 Bean 和收件人通道列表进行配置。
下一个 Bean 是 .
最后,我们将这两个 Bean 都注入到 Bean 定义中,并将其标记为将分散-聚集组件连接到集成流中。RecipientListRouter
distributor
applySequence="true"
AggregatingMessageHandler
ScatterGatherHandler
@ServiceActivator
下面的示例演示如何使用 XML 命名空间配置终结点:<scatter-gather>
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 终结点的 ID。
Bean 的注册别名为 。
Bean 的注册别名为 。
Bean 的注册别名为 。
自选。
(将生成默认值。ScatterGatherHandler id + '.handler' RecipientListRouter id + '.scatterer' AggregatingMessageHandler id + '.gatherer' BeanFactory id |
2 | 生命周期属性指示是否应在应用程序上下文初始化期间启动终结点。
此外,还实现并启动和停止 ,如果提供了 ,则在内部创建。
自选。
(默认值为 .)ScatterGatherHandler Lifecycle gatherEndpoint gather-channel true |
3 | 用于接收请求消息以在 .
必填。ScatterGatherHandler |
4 | 将聚合结果发送到的通道。
自选。
(传入消息可以在消息标头中指定回复通道)。ScatterGatherHandler replyChannel |
5 | 要向拍卖方案发送分散消息的通道。
自选。
与子元素互斥。<scatterer> |
6 | 从聚合的每个供应商处接收回复的渠道。
它用作分散消息中的标头。
自选。
默认情况下,将创建 。replyChannel FixedSubscriberChannel |
7 | 当多个处理程序订阅同一个组件时,此组件的顺序(用于负载平衡目的)。
自选。DirectChannel |
8 | 指定应启动和停止终结点的阶段。
启动顺序从最低到最高,关闭顺序从最高到最低。
默认情况下,此值为 ,表示此容器尽可能晚启动并尽快停止。
自选。Integer.MAX_VALUE |
9 | 向 .
默认情况下,块为一秒钟。
仅当输出通道具有某些“发送”限制时,它才适用 - 例如,具有固定“容量”的已满。
在本例中,将抛出 a。
对于实现,将忽略 。
对于 ,来自已计划的过期任务会导致重新计划此任务。
自选。Message output-channel send() QueueChannel MessageDeliveryException send-timeout AbstractSubscribableChannel group-timeout(-expression) MessageDeliveryException |
10 | 允许您指定 scatter-gather 在返回之前等待回复消息的时间。
默认情况下,它会等待几秒钟。
如果回复超时,则返回“null”。
自选。30 |
11 | 指定 scatter-gather 是否必须返回非 null 值。
默认情况下,此值为值。
因此,当基础聚合器在 之后返回 null 值时,将抛出 a。
请注意,如果可能,应指定 以避免无限期等待。true ReplyRequiredException gather-timeout null gather-timeout |
12 | 选项。
自选。
与属性互斥。<recipient-list-router> scatter-channel |
13 | 选项。
必填。<aggregator> |
1 | 终结点的 ID。
Bean 的注册别名为 。
Bean 的注册别名为 。
Bean 的注册别名为 。
自选。
(将生成默认值。ScatterGatherHandler id + '.handler' RecipientListRouter id + '.scatterer' AggregatingMessageHandler id + '.gatherer' BeanFactory id |
2 | 生命周期属性指示是否应在应用程序上下文初始化期间启动终结点。
此外,还实现并启动和停止 ,如果提供了 ,则在内部创建。
自选。
(默认值为 .)ScatterGatherHandler Lifecycle gatherEndpoint gather-channel true |
3 | 用于接收请求消息以在 .
必填。ScatterGatherHandler |
4 | 将聚合结果发送到的通道。
自选。
(传入消息可以在消息标头中指定回复通道)。ScatterGatherHandler replyChannel |
5 | 要向拍卖方案发送分散消息的通道。
自选。
与子元素互斥。<scatterer> |
6 | 从聚合的每个供应商处接收回复的渠道。
它用作分散消息中的标头。
自选。
默认情况下,将创建 。replyChannel FixedSubscriberChannel |
7 | 当多个处理程序订阅同一个组件时,此组件的顺序(用于负载平衡目的)。
自选。DirectChannel |
8 | 指定应启动和停止终结点的阶段。
启动顺序从最低到最高,关闭顺序从最高到最低。
默认情况下,此值为 ,表示此容器尽可能晚启动并尽快停止。
自选。Integer.MAX_VALUE |
9 | 向 .
默认情况下,块为一秒钟。
仅当输出通道具有某些“发送”限制时,它才适用 - 例如,具有固定“容量”的已满。
在本例中,将抛出 a。
对于实现,将忽略 。
对于 ,来自已计划的过期任务会导致重新计划此任务。
自选。Message output-channel send() QueueChannel MessageDeliveryException send-timeout AbstractSubscribableChannel group-timeout(-expression) MessageDeliveryException |
10 | 允许您指定 scatter-gather 在返回之前等待回复消息的时间。
默认情况下,它会等待几秒钟。
如果回复超时,则返回“null”。
自选。30 |
11 | 指定 scatter-gather 是否必须返回非 null 值。
默认情况下,此值为值。
因此,当基础聚合器在 之后返回 null 值时,将抛出 a。
请注意,如果可能,应指定 以避免无限期等待。true ReplyRequiredException gather-timeout null gather-timeout |
12 | 选项。
自选。
与属性互斥。<recipient-list-router> scatter-channel |
13 | 选项。
必填。<aggregator> |
错误处理
由于 Scatter-Gather 是一个多请求-应答组件,因此错误处理具有一些额外的复杂性。
在某些情况下,如果允许进程以比请求更少的回复完成,则最好只捕获并忽略下游异常。
在其他情况下,当发生错误时,应考虑从子流返回“补偿消息”之类的内容。ReleaseStrategy
每个异步子流都应配置一个标头,以便从 .
否则,将使用通用错误处理逻辑将错误发送到全局。
有关异步错误处理的详细信息,请参阅错误处理。errorChannel
MessagePublishingErrorHandler
errorChannel
同步流可以使用 an 来忽略异常或返回补偿消息。
当异常从其中一个子流抛出到 时,它只是重新抛出到上游。
这样一来,所有其他子流都将毫无用处,并且它们的回复将在 .
这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。ExpressionEvaluatingRequestHandlerAdvice
ScatterGatherHandler
ScatterGatherHandler
从版本 5.1.3 开始,将提供该选项。
它被填充到分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流中用于直接发送错误消息。ScatterGatherHandler
errorChannelName
errorChannel
下面的示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成正确的回复,我们必须从已发送到 的 中复制标头(包括 和 )。
这样,目标异常将返回给 for reply messages 组完成的收集器。
可以在收集器中筛选出此类异常,也可以在分散收集端点之后以其他方式向下游进行处理。replyChannel
errorChannel
failedMessage
MessagingException
scatterGatherErrorChannel
MessagePublishingErrorHandler
ScatterGatherHandler
payload
MessageGroupProcessor
在将分散结果发送到收集器之前,请恢复请求消息标头,包括应答和错误通道(如果有)。
这样,来自 的错误将传播到调用方,即使在分散接收者子流中应用了异步切换也是如此。
为了成功操作,必须将 和 标头传输回分散接收方子流的回复。
在这种情况下,必须为 .
否则,默认情况下,它将被阻止,等待收集器的回复。ScatterGatherHandler AggregatingMessageHandler gatherResultChannel originalReplyChannel originalErrorChannel gatherTimeout ScatterGatherHandler |
在将分散结果发送到收集器之前,请恢复请求消息标头,包括应答和错误通道(如果有)。
这样,来自 的错误将传播到调用方,即使在分散接收者子流中应用了异步切换也是如此。
为了成功操作,必须将 和 标头传输回分散接收方子流的回复。
在这种情况下,必须为 .
否则,默认情况下,它将被阻止,等待收集器的回复。ScatterGatherHandler AggregatingMessageHandler gatherResultChannel originalReplyChannel originalErrorChannel gatherTimeout ScatterGatherHandler |