分配器

拆分器是一个组件,其作用是将消息划分为多个部分,并发送生成的消息以进行独立处理。 很多时候,他们是包含聚合商的管道中的上游生产者。spring-doc.cadn.net.cn

编程模型

用于执行拆分的 API 由一个基类AbstractMessageSplitter. 它是一个MessageHandler封装 splitter 通用功能的实现,例如填写适当的消息标头 (CORRELATION_ID,SEQUENCE_SIZESEQUENCE_NUMBER) 对生成的消息进行分配。 通过此填充,可以跟踪消息及其处理结果(在典型情况下,这些标头将复制到由各种转换终端节点生成的消息中)。 然后,这些值可以由组合消息处理器等使用。spring-doc.cadn.net.cn

以下示例显示了AbstractMessageSplitter:spring-doc.cadn.net.cn

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实现特定的 splitter,您可以扩展AbstractMessageSplitter并实施splitMessage方法,其中包含用于拆分消息的逻辑。 返回值可以是以下值之一:spring-doc.cadn.net.cn

  • 一个Collection或消息数组或Iterable(或Iterator) 迭代消息。 在这种情况下,消息将作为消息发送(在CORRELATION_ID,SEQUENCE_SIZESEQUENCE_NUMBER填充)。 使用此方法可以为您提供更多控制 — 例如,在拆分过程中填充自定义消息标头。spring-doc.cadn.net.cn

  • 一个Collection或非消息对象数组或Iterable(或Iterator) 迭代非 message 对象。 它的工作方式与前一种情况类似,不同之处在于每个 collection 元素都用作消息有效负载。 使用此方法,您可以专注于域对象,而不必考虑消息传送系统,并生成更易于测试的代码。spring-doc.cadn.net.cn

  • 一个Message或非消息对象(但不是集合或数组)。 它的工作方式与前面的情况类似,只是只发送了一条消息。spring-doc.cadn.net.cn

在 Spring 集成中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。 在这种情况下,该方法的返回值将如前所述进行解释。 input 参数可以是Message或简单的 POJO。 在后一种情况下,拆分器接收传入消息的有效负载。 我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。spring-doc.cadn.net.cn

迭代器

从版本 4.1 开始,AbstractMessageSplitter支持Iteratortype 的value进行拆分。 请注意,在Iterator(或Iterable),我们无权访问底层项目的数量,并且SEQUENCE_SIZEheader 设置为0. 这意味着默认的SequenceSizeReleaseStrategy<aggregator>不起作用,并且CORRELATION_IDsplitter不会被发布;它将保持为incomplete. 在这种情况下,您应该使用适当的自定义ReleaseStrategy或依赖send-partial-result-on-expirygroup-timeoutMessageGroupStoreReaper.spring-doc.cadn.net.cn

从版本 5.0 开始,AbstractMessageSplitter提供protected obtainSizeIfPossible()方法来确定IterableIterator对象。 例如XPathMessageSplitter可以确定底层证券的大小NodeList对象。 从版本 5.0.9 开始,此方法还会正确返回com.fasterxml.jackson.core.TreeNode.spring-doc.cadn.net.cn

Iteratorobject 有助于避免在拆分之前在内存中构建整个集合。 例如,当基础项目是从某个外部系统(例如 DataBase 或 FTP)填充的MGET) 使用迭代或流。spring-doc.cadn.net.cn

Stream 和 Flux

从版本 5.0 开始,AbstractMessageSplitter支持 JavaStream和 Reactive StreamsPublishertypes 的value进行拆分。 在这种情况下,目标Iterator构建在其迭代功能之上。spring-doc.cadn.net.cn

此外,如果分离器的输出通道是ReactiveStreamsSubscribableChannelAbstractMessageSplitter生成一个Fluxresult 而不是Iterator,并且输出通道订阅了此Flux用于对下游流量需求进行基于背压的分流。spring-doc.cadn.net.cn

从版本 5.2 开始,splitter 支持discardChannel用于发送 Split 函数为其返回空容器(collection、array、stream、Flux等)。 在这种情况下,没有要迭代的 item 来发送到outputChannel. 这null拆分结果仍作为流结束指示器。spring-doc.cadn.net.cn

使用 Java、Groovy 和 Kotlin DSL 配置 Splitter

基于Message及其具有 DSL 配置的可迭代有效负载:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

有关这些 DSL 的更多信息,请参阅相应的章节:spring-doc.cadn.net.cn

使用 XML 配置 Splitter

可以通过 XML 配置拆分器,如下所示:spring-doc.cadn.net.cn

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 拆分器的 ID 是可选的。
2 对在应用程序上下文中定义的 Bean 的引用。 Bean 必须实现拆分逻辑,如前面的部分所述。 自选。 如果未提供对 Bean 的引用,则假定到达input-channeljava.util.Collection默认的拆分逻辑应用于集合,将每个单独的元素合并到一条消息中,并将其发送到output-channel.
3 实现拆分逻辑的方法(在 Bean 上定义)。 自选。
4 分路器的 input 通道。 必填。
5 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自己指定回复通道)。
6 在切分结果为空的情况下,请求消息发送到的频道。 可选(它们将停止,就像nullresult) 的 Result)

我们建议使用ref属性(如果自定义 Splitter 实现可以在其他<splitter>定义。 但是,如果自定义拆分器处理程序实现的范围应限定为<splitter>中,您可以配置内部 Bean 定义,如下例所示:spring-doc.cadn.net.cn

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
同时使用 arefattribute 和内部处理程序定义位于同一<int:splitter>不允许配置,因为它会产生不明确的条件并导致引发异常。
如果ref属性引用一个扩展AbstractMessageProducingHandler(例如框架本身提供的 splitters),通过将 output channel 直接注入到 handler 中来优化配置。 在这种情况下,每个ref必须是一个单独的 bean 实例(或者prototype-scoped bean)或使用内部的<bean/>配置类型。 但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。 如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。

使用注释配置 Splitter

@Splitter注解适用于需要Messagetype 或消息负载类型,并且该方法的返回值应为Collection任何类型的 如果返回的值不是实际值Message对象中,每个项目都包装在Message作为Message. 每个结果Message发送到终端节点的指定输出通道,在该终端节点上,@Splitter已定义。spring-doc.cadn.net.cn

以下示例演示如何使用@Splitter注解:spring-doc.cadn.net.cn

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}