聚合

基本上,聚合器是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。spring-doc.cadn.net.cn

从技术上讲,聚合器比 splitter 更复杂,因为它是有状态的。 它必须保存要聚合的消息,并确定何时准备好聚合整个消息组。 为此,它需要一个MessageStore.spring-doc.cadn.net.cn

功能性

Aggregator 通过关联和存储一组相关消息来组合这些消息,直到该组被视为完整为止。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。spring-doc.cadn.net.cn

实现聚合器需要提供执行聚合(即,从多个中创建单个消息)的逻辑。 两个相关概念是 correlation 和 release。spring-doc.cadn.net.cn

Correlation 确定如何对消息进行分组以进行聚合。 在 Spring Integration 中,默认情况下,关联是基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息标头。 具有相同IntegrationMessageHeaderAccessor.CORRELATION_ID组合在一起。 但是,您可以自定义关联策略,以允许以其他方式指定应如何将消息分组在一起。 为此,您可以实现CorrelationStrategy(本章稍后将介绍)。spring-doc.cadn.net.cn

要确定一组消息的准备处理点,请使用ReleaseStrategy被咨询。 当序列中包含的所有消息都存在时,聚合器的默认发布策略会根据IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。 您可以通过提供对自定义ReleaseStrategy实现。spring-doc.cadn.net.cn

编程模型

聚合 API 由许多类组成:spring-doc.cadn.net.cn

  • 界面MessageGroupProcessor及其子类:MethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessorspring-doc.cadn.net.cn

  • ReleaseStrategyinterface 及其默认实现:SimpleSequenceSizeReleaseStrategyspring-doc.cadn.net.cn

  • CorrelationStrategyinterface 及其默认实现:HeaderAttributeCorrelationStrategyspring-doc.cadn.net.cn

AggregatingMessageHandler

AggregatingMessageHandlerAbstractCorrelatingMessageHandler) 是MessageHandler实现,封装了聚合器的常见功能(以及其他相关的用例),如下所示:spring-doc.cadn.net.cn

决定如何将消息分组在一起的责任被委派给CorrelationStrategy实例。 决定是否可以释放消息组的责任委托给ReleaseStrategy实例。spring-doc.cadn.net.cn

以下清单显示了该基地的简要亮点AbstractAggregatingMessageGroupProcessor(实施aggregatePayloads方法留给开发人员):spring-doc.cadn.net.cn

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

DefaultAggregatingMessageGroupProcessor,ExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor作为AbstractAggregatingMessageGroupProcessor.spring-doc.cadn.net.cn

从版本 5.2 开始,Function<MessageGroup, Map<String, Object>>策略可用于AbstractAggregatingMessageGroupProcessor合并和计算(聚合)输出消息的标头。 这DefaultAggregateHeadersFunctionimplementation 可用于返回组内没有冲突的所有 Headers 的 logic;组中的一封或多封邮件的报头缺失不被视为冲突。 冲突的标头将被省略。 随着新引入的DelegatingMessageGroupProcessor,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor) MessageGroupProcessor实现。 从本质上讲,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor实例,并将所有其他实现包装到DelegatingMessageGroupProcessor. The Logic: 差异AbstractAggregatingMessageGroupProcessorDelegatingMessageGroupProcessor后者在调用 Delegate 策略之前不会提前计算标头,并且如果 Delegate 返回MessageAbstractIntegrationMessageBuilder. 在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的适当标头。 这Function<MessageGroup, Map<String, Object>>策略的headers-functionreference 属性,作为 XML 配置的AggregatorSpec.headersFunction()选项,以及AggregatorFactoryBean.setHeadersFunction()用于普通 Java 配置。spring-doc.cadn.net.cn

CorrelationStrategyAbstractCorrelatingMessageHandler,并且具有基于IntegrationMessageHeaderAccessor.CORRELATION_IDmessage 标头,如下例所示:spring-doc.cadn.net.cn

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor. 它会创建一个Message其 payload 为List为给定组接收的负载。 这适用于使用拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现。spring-doc.cadn.net.cn

在此类场景中使用发布-订阅通道或收件人列表路由器时,请务必启用apply-sequence旗。 这样做会添加必要的标头:CORRELATION_ID,SEQUENCE_NUMBERSEQUENCE_SIZE. 默认情况下,对于 Spring 集成中的拆分器,该行为是启用的,但不会为发布-订阅通道或收件人列表路由器启用该行为,因为这些组件可能用于不需要这些 Headers 的各种上下文中。

在为应用程序实施特定的聚合器策略时,您可以扩展AbstractAggregatingMessageGroupProcessor并实施aggregatePayloads方法。 但是,有更好的解决方案,与 API 的耦合较少,用于实现聚合逻辑,可以通过 XML 或 Comments 进行配置。spring-doc.cadn.net.cn

一般来说,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受单个java.util.List作为参数(也支持参数化列表)。 调用该方法用于聚合消息,如下所示:spring-doc.cadn.net.cn

  • 如果参数是java.util.Collection<T>并且参数类型 T 可分配给Message,则为聚合而积累的消息的整个列表将发送到聚合器。spring-doc.cadn.net.cn

  • 如果参数是非参数化的java.util.Collection或参数类型不可分配给Message中,该方法接收累积消息的有效负载。spring-doc.cadn.net.cn

  • 如果返回类型不能分配给Message,它被视为Message该 API 的 API 由 Framework 自动创建。spring-doc.cadn.net.cn

为了实现代码简单性并促进最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。

从版本 5.3 开始,在处理消息组后,AbstractCorrelatingMessageHandler执行MessageBuilder.popSequenceDetails()消息标头修改,以便具有多个嵌套级别的适当 splitter-aggregator 方案。 仅当消息组发布结果不是消息集合时,才会执行该作。 在这种情况下,目标MessageGroupProcessor负责MessageBuilder.popSequenceDetails()call 来构建这些消息。spring-doc.cadn.net.cn

如果MessageGroupProcessor返回Message一个MessageBuilder.popSequenceDetails()将仅在sequenceDetails与组中的第一条消息匹配。 (以前,仅当普通有效负载或AbstractIntegrationMessageBuilder已从MessageGroupProcessor.)spring-doc.cadn.net.cn

此功能可以通过新的popSequence boolean属性,因此MessageBuilder.popSequenceDetails()在某些情况下,当标准拆分器尚未填充关联详细信息时,可以禁用。 这个属性本质上会撤消最近的上游所做的工作applySequence = trueAbstractMessageSplitter. 有关更多信息,请参阅 Splitterspring-doc.cadn.net.cn

SimpleMessageGroup.getMessages()method 返回一个unmodifiableCollection. 因此,如果聚合 POJO 方法具有Collection<Message>参数,则传入的参数正是Collection实例,当您使用SimpleMessageStore对于聚合商,该原始Collection<Message>在释放组后被清除。 因此,Collection<Message>如果 POJO 中的 variable 被传出 聚合器,它也会被清除。 如果您只想按原样发布该集合以进行进一步处理,则必须构建一个新的Collection(例如,new ArrayList<Message>(messages)). 从版本 4.3 开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。

在版本 4.2 之前,无法提供MessageGroupProcessor通过使用 XML 配置。 只有 POJO 方法可用于聚合。 现在,如果框架检测到引用的(或内部的)bean 实现了MessageProcessor,它用作聚合器的输出处理器。spring-doc.cadn.net.cn

如果您希望从自定义MessageGroupProcessor作为 Message 的有效负载,您的类应扩展AbstractAggregatingMessageGroupProcessor并实施aggregatePayloads().spring-doc.cadn.net.cn

此外,从 4.2 版本开始,一个SimpleMessageGroupProcessor。 它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。spring-doc.cadn.net.cn

这使得聚合器可以充当消息屏障,其中到达的消息将被保留,直到发布策略触发并且该组作为单个消息序列发布。spring-doc.cadn.net.cn

从版本 6.0 开始,上述拆分行为仅在组处理器为SimpleMessageGroupProcessor. 否则,使用任何其他MessageGroupProcessor实现,该 API 将返回Collection<Message>,则仅发出一条回复消息,并将整个消息集合作为其有效负载。 这种逻辑由聚合器的规范目的决定 - 通过某个键收集请求消息并生成单个分组消息。spring-doc.cadn.net.cn

在 6.5 版本之前,如果MessageGroupProcessor(通常是来自 DSL 的 lambda)返回一组有效负载,AbstractCorrelatingMessageHandler失败了,并且IllegalArgumentException声明只能收集消息。 从现在开始,这种限制被消除,返回的有效负载集合作为来自聚合器的单个回复消息发出,仅包含来自最后一个请求消息的标头。 如果需要 headers 聚合以及有效负载集合,则AbstractAggregatingMessageGroupProcessor建议使用 implementations 而不是 plainMessageGroupProcessorfunctional 接口。spring-doc.cadn.net.cn

ReleaseStrategy

ReleaseStrategy接口定义如下:spring-doc.cadn.net.cn

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

一般来说,任何 POJO 都可以实现完成决策逻辑,如果它提供了一个接受单个java.util.List作为参数(也支持参数化列表)并返回布尔值。 在每条新消息到达后调用此方法,以确定组是否完成,如下所示:spring-doc.cadn.net.cn

  • 如果参数是java.util.List<T>和参数类型T可分配给Message,组中累积的整个消息列表将发送到该方法。spring-doc.cadn.net.cn

  • 如果参数是非参数化的java.util.List或参数类型不可分配给Message中,该方法接收累积消息的有效负载。spring-doc.cadn.net.cn

  • 该方法必须返回true如果消息组已准备好进行聚合,否则为 false。spring-doc.cadn.net.cn

以下示例演示如何使用@ReleaseStrategy注解List的类型Message:spring-doc.cadn.net.cn

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

以下示例演示如何使用@ReleaseStrategy注解List的类型String:spring-doc.cadn.net.cn

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

基于前两个示例中的签名,基于 POJO 的发布策略将Collection的尚未发布的消息(如果您需要访问整个Message) 或Collection有效负载对象(如果 type 参数不是Message). 这满足了大多数使用案例。 但是,如果出于某种原因,您需要访问完整的MessageGroup中,您应该提供ReleaseStrategy接口。spring-doc.cadn.net.cn

在处理可能较大的组时,您应该了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。 最有效的是ReleaseStrategy,因为聚合器可以直接调用它。 第二高效的是 POJO 方法,该方法的Collection<Message<?>>parameter 类型。 效率最低的是 POJO 方法,该方法的Collection<Something>类型。 框架必须将有效负载从组中的消息复制到新的集合中(并可能尝试将有效负载转换为Something) 中。 用Collection<?>避免了转换,但仍需要创建新的Collection.spring-doc.cadn.net.cn

出于这些原因,对于大型组,我们建议您实施ReleaseStrategy.spring-doc.cadn.net.cn

当释放组进行聚合时,将处理其所有尚未发布的消息并将其从组中删除。 如果组也已完成(即,如果序列中的所有消息都已到达或未定义序列),则该组将标记为完成。 此组的任何新消息都将发送到 discard 通道(如果已定义)。 设置expire-groups-upon-completiontrue(默认值为false) 删除整个组,并且任何新消息(与已删除的组具有相同的相关 ID)将形成一个新组。 您可以使用MessageGroupStoreReapersend-partial-result-on-expiry设置为true.spring-doc.cadn.net.cn

从版本 6.5 开始,关联处理程序还可以配置为discardIndividuallyOnExpiry选项将整个组作为单个消息丢弃。 实质上,此消息的有效负载是来自过期组的消息列表。 仅在以下情况下工作sendPartialResultOnExpiry设置为false(默认)和dicardChannelspring-doc.cadn.net.cn

为了便于丢弃延迟到达的消息,聚合器必须在组发布后维护有关该组的状态。 这最终可能导致内存不足情况。 为避免此类情况,您应该考虑配置MessageGroupStoreReaper以删除组元数据。 应将过期参数设置为在到达某个点后使组过期,之后预计延迟消息不会到达。 有关配置收割者的信息,请参阅在 Aggregator 中管理状态:MessageGroupStore.

Spring 集成为ReleaseStrategy:SimpleSequenceSizeReleaseStrategy. 此实现会参考SEQUENCE_NUMBERSEQUENCE_SIZE标头,以决定消息组何时完成并准备好进行聚合。 如前所述,它也是默认策略。spring-doc.cadn.net.cn

在 5.0 版本之前,默认的发布策略是SequenceSizeReleaseStrategy,这在大型组中表现不佳。 使用该策略,可以检测并拒绝重复的序列号。 此作可能很昂贵。

如果要聚合大型组,则不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用SimpleSequenceSizeReleaseStrategy相反 - 对于这些用例,它的效率要高得多,并且自 5.0 版以来,当未指定 Partial Group Release 时,它是默认值。spring-doc.cadn.net.cn

聚合大型组

4.3 版本更改了默认值Collection对于SimpleMessageGroupHashSet(以前是BlockingQueue). 当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。 尽管删除哈希集的速度通常要快得多,但对于大型消息来说,它的成本可能很高,因为必须在 insert 和 remove 上计算哈希值。 如果您的消息哈希成本很高,请考虑使用其他一些集合类型。 如MessageGroupFactory一个SimpleMessageGroupFactory,以便您可以选择Collection最适合你的需求。 您还可以提供自己的工厂实现来创建其他Collection<Message<?>>.spring-doc.cadn.net.cn

以下示例显示了如何使用前面的实现和SimpleSequenceSizeReleaseStrategy:spring-doc.cadn.net.cn

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果筛选条件端点涉及聚合器上游的流,则序列大小发布策略(固定或基于sequenceSizeheader) 无法达到其目的,因为 filter 可能会丢弃序列中的某些消息。 在这种情况下,建议选择另一个ReleaseStrategy,或者使用从丢弃子流发送的补偿消息,该子流的内容中包含一些信息,以便在自定义完整组函数中跳过。 有关更多信息,请参阅过滤器

关联策略

CorrelationStrategy接口定义如下:spring-doc.cadn.net.cn

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

该方法返回一个Object该键表示用于将消息与消息组关联的关联键。 该键必须满足Map关于实施equals()hashCode().spring-doc.cadn.net.cn

通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与ServiceActivator(包括对@Headerannotations) 的 Annotations)。 该方法必须返回一个值,并且该值不能是null.spring-doc.cadn.net.cn

Spring 集成为CorrelationStrategy:HeaderAttributeCorrelationStrategy. 此实现返回其中一个消息标头(其名称由 constructor 参数指定)的值作为相关键。 默认情况下,关联策略是HeaderAttributeCorrelationStrategy,返回CORRELATION_IDheader 属性。 如果您有要用于关联的自定义标头名称,则可以在HeaderAttributeCorrelationStrategy并将其作为聚合商关联策略的参考。spring-doc.cadn.net.cn

锁定注册表

对组的更改是线程安全的。 因此,当您同时发送同一相关 ID 的消息时,聚合器中将只处理其中一个消息,从而使其实际上每个消息组都是单线程的。 一个LockRegistry用于获取已解析相关 ID 的锁。 一个DefaultLockRegistry默认使用 (in-memory)。 用于在共享MessageGroupStore正在使用,则必须配置共享锁注册表。spring-doc.cadn.net.cn

避免死锁

如上所述,当消息组发生更改(添加或释放消息)时,将持有一个锁。spring-doc.cadn.net.cn

请考虑以程:spring-doc.cadn.net.cn

...->aggregator1-> ... ->aggregator2-> ...

如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。 这将导致线程挂起,并且jstack <pid>可能会出现如下结果:spring-doc.cadn.net.cn

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

有几种方法可以避免此问题:spring-doc.cadn.net.cn

  • 确保每个聚合器都有自己的 Lock Registry(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)spring-doc.cadn.net.cn

  • 使用ExecutorChannelQueueChannel作为聚合器的输出通道,以便下游流在新线程上运行spring-doc.cadn.net.cn

  • 从版本 5.1.1 开始,将releaseLockBeforeSendaggregator 属性设置为truespring-doc.cadn.net.cn

如果由于某种原因,单个聚合器的输出最终路由回同一聚合器,也可能导致此问题。 当然,上述第一种解决方案不适用于这种情况。

在 Java DSL 中配置聚合器

有关如何在 Java DSL 中配置聚合器的信息,请参见 Aggregators 和 Resequencersspring-doc.cadn.net.cn

使用 XML 配置聚合器

Spring 集成支持通过 XML 配置聚合器<aggregator/>元素。 以下示例显示了聚合器的示例:spring-doc.cadn.net.cn

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)

        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)

        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)

        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)

        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)

        lock-registry="lockRegistry"                       (20)

        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)

        scheduler="taskScheduler" >                        (24)
            <expire-transactional/>                        (25)
            <expire-advice-chain/>                         (26)
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 聚合商的 ID 是可选的。
2 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为 'true')。
3 聚合器从中接收消息的通道。 必填。
4 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。
5 聚合器将超时消息发送到的通道(如果send-partial-result-on-expiryfalse). 自选。
6 MessageGroupStore用于将消息组存储在其关联键下,直到它们完成。 自选。 默认情况下,它是一个易失性内存存储。 有关更多信息,请参阅 Message Store
7 当多个 handle 订阅到同一DirectChannel(用于负载平衡目的)。 自选。
8 表示过期的消息应被聚合,并在其包含MessageGroup已过期(请参阅MessageGroupStore.expireMessageGroups(long)). 使MessageGroup是通过配置MessageGroupStoreReaper. 但是,您也可以选择过期MessageGroup通过调用MessageGroupStore.expireMessageGroups(timeout). 您可以通过 Control Bus作来实现,或者,如果您引用了MessageGroupStore实例中,通过调用expireMessageGroups(timeout). 否则,此属性本身不会执行任何作。 它仅用作指示是否丢弃或将仍在MessageGroup即将过期。 可选(默认值为false). 注意:此属性可能更恰当地称为send-partial-result-on-timeout,因为在以下情况下,该组实际上可能不会过期expire-groups-upon-timeout设置为false.
9 发送回复时要等待的超时间隔Messageoutput-channeldiscard-channel. 默认为30秒。 仅当输出通道具有一些 “发送” 限制(例如QueueChannel具有固定的 'capacity' 。 在这种情况下,MessageDeliveryException被抛出。 为AbstractSubscribableChannelimplementations、send-timeout被忽略。 为group-timeout(-expression)MessageDeliveryException从 Scheduled expiring task (计划过期任务) 将导致此任务被重新计划。 自选。
10 对实现消息关联(分组)算法的 Bean 的引用。 该 bean 可以是CorrelationStrategyinterface 或 POJO 的 API API 中。 在后一种情况下,correlation-strategy-methodattribute 的 intent 值。 可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头)。
11 在 Bean 上定义的方法,由correlation-strategy. 它实现了关联决策算法。 可选,但有限制 (correlation-strategy必须存在)。
12 表示关联策略的 SPEL 表达式。 例:"headers['something']". 只有其中之一correlation-strategycorrelation-strategy-expression是允许的。
13 对在应用程序上下文中定义的 Bean 的引用。 如前所述,Bean 必须实现聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。
14 在 bean 上定义的方法,该方法由ref属性。 它实现了消息聚合算法。 可选(取决于ref属性)。
15 对实现发布策略的 bean 的引用。 该 bean 可以是ReleaseStrategyinterface 或 POJO 的 API API 中。 在后一种情况下,release-strategy-methodattribute 的 intent 值。 可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader 属性)。
16 在 bean 上定义的方法,该方法由release-strategy属性。 它实现了完成决策算法。 可选,但有限制 (release-strategy必须存在)。
17 表示发布策略的 SPEL 表达式。 表达式的根对象是一个MessageGroup. 例:"size() == 5". 只有其中之一release-strategyrelease-strategy-expression是允许的。
18 当设置为true(默认值为false),已完成的组将从邮件存储中删除,从而让具有相同关联的后续邮件形成一个新组。 默认行为是将与已完成组具有相同关联的消息发送到discard-channel.
19 仅当MessageGroupStoreReaper配置为MessageStore<aggregator>. 默认情况下,当MessageGroupStoreReaper配置为使部分组过期,则还会删除空组。 在正常释放组后,存在空组。 空组允许检测和丢弃延迟到达的消息。 如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。 然后,空组不会从MessageStore直到它们至少在此毫秒数内未被修改。 请注意,空组过期的实际时间也受 reaper 的timeoutproperty,它可以是此值加上 timeout 的值。
20 org.springframework.integration.util.LockRegistry豆。 它曾经获得一个Lock基于groupId对于MessageGroup. 默认情况下,内部的DefaultLockRegistry被使用。 使用分布式LockRegistry,例如ZookeeperLockRegistry,确保只有一个聚合器实例可以同时对组进行作。 有关更多信息,请参阅 Redis Lock RegistryZookeeper Lock Registry
21 超时(以毫秒为单位),用于强制MessageGroupcomplete 时ReleaseStrategy在当前消息到达时不释放组。 当需要发出部分结果(或丢弃组)时,如果新消息未到达MessageGroup在超时内,从最后一条消息到达的时间开始计算。 要设置一个超时,该超时从MessageGroup被创建,请参阅group-timeout-expression信息。 当新消息到达聚合器时,任何现有的ScheduledFuture<?>对于其MessageGroup已取消。 如果ReleaseStrategy返回false(表示不发布)和groupTimeout > 0,则计划新任务使组过期。 我们不建议将此属性设置为零(或负值)。 这样做可以有效地禁用聚合器,因为每个消息组都会立即完成。 但是,您可以使用表达式有条件地将其设置为零(或负值)。 看group-timeout-expression以获取信息。 完成期间执行的作取决于ReleaseStrategysend-partial-group-on-expiry属性。 有关更多信息,请参阅 Aggregator 和 Group Timeout 。 它与 'group-timeout-expression' 属性互斥。
22 计算结果为groupTimeout使用MessageGroup作为#rootevaluation context 对象。 用于调度MessageGroup强制完成。 如果表达式的计算结果为null,则不会计划完成。 如果计算结果为零,则立即在当前线程上完成该组。 实际上,这提供了一个动态的group-timeout财产。 例如,如果您希望强制完成MessageGroup自创建组以来经过 10 秒后,您可以考虑使用以下 SPEL 表达式:timestamp + 10000 - T(System).currentTimeMillis()哪里timestampMessageGroup.getTimestamp()作为MessageGroup这是#rootevaluation context 对象。 但请记住,组创建时间可能与第一条消息的时间不同,具体取决于其他组过期属性的配置。 看group-timeout了解更多信息。 与 'group-timeout' 属性互斥。
23 当组由于超时(或MessageGroupStoreReaper),则默认情况下,该组已过期(完全删除)。 迟到的消息将启动一个新组。 您可以将其设置为false以完成组,但保留其元数据,以便丢弃延迟到达的消息。 空组可以稍后使用MessageGroupStoreReaperempty-group-min-timeout属性。 它默认为 'true'。
24 一个TaskSchedulerbean 引用来调度MessageGroup如果没有新消息到达,则强制完成MessageGroupgroupTimeout. 如果未提供,则默认调度程序 (taskSchedulerApplicationContext (ThreadPoolTaskScheduler) 被使用。 如果满足以下条件,则此属性不适用group-timeoutgroup-timeout-expression未指定。
25 从 4.1 版本开始。 它允许为forceComplete操作。 它是从group-timeout(-expression)或按MessageGroupStoreReaper,并且不会应用于 normaladd,releasediscard操作。 只有此子元素或<expire-advice-chain/>是允许的。
26 4.1 版本开始。 它允许配置任何Advice对于forceComplete操作。 它是从group-timeout(-expression)或按MessageGroupStoreReaper,并且不会应用于 normaladd,releasediscard操作。 只有此子元素或<expire-transactional/>是允许的。 交易Advice也可以使用 Spring 在此处进行配置txNamespace。
过期组

有两个属性与过期(完全删除)组相关。 当组过期时,没有该组的记录,如果到达具有相同关联的新消息,则会启动一个新组。 当组完成(无过期)时,空组将保留,并丢弃延迟到达的消息。 稍后可以使用MessageGroupStoreReaperempty-group-min-timeout属性。spring-doc.cadn.net.cn

expire-groups-upon-completion与 “正常” 补全相关,当ReleaseStrategy释放组。 默认为false.spring-doc.cadn.net.cn

如果组未正常完成,但因超时而被释放或丢弃,则该组通常已过期。 从版本 4.1 开始,您可以通过使用expire-groups-upon-timeout. 它默认为true以实现向后兼容性。spring-doc.cadn.net.cn

当组超时时,ReleaseStrategy再获得一次释放该组的机会。 如果这样做,并且expire-groups-upon-timeout为 false,则过期时间由expire-groups-upon-completion. 如果在超时期间,释放策略未释放组,则过期时间由expire-groups-upon-timeout. 超时组被丢弃或发生部分释放(基于send-partial-result-on-expiry).

从 5.0 版本开始,空组也会被安排在empty-group-min-timeout. 如果expireGroupsUponCompletion == falseminimumTimeoutForEmptyGroups > 0,则删除组的任务将计划在正常或部分序列发布时进行。spring-doc.cadn.net.cn

从版本 5.4 开始,可以将聚合器(和重排序器)配置为使孤立组(位于持久性消息存储中的那些可能不会被释放的组)过期。 这expireTimeout(如果大于0) 表示应清除存储中早于此值的组。 这purgeOrphanedGroups()方法,并与提供的expireDuration,定期在计划任务中。 此方法也可以随时从外部调用。 过期逻辑完全委托给forceComplete(MessageGroup)功能。 当需要从那些不再使用常规消息到达逻辑释放的旧组中清理消息存储时,这种定期清除功能非常有用。 在大多数情况下,这发生在应用程序重新启动后,当使用持久性消息组存储时。 该功能类似于MessageGroupStoreReaper替换为 Scheduled Task,但在使用 Group Timeout 而不是 Reaper 时,提供了一种处理特定组件中的旧 group 的便捷方法。 这MessageGroupStore必须专门为当前关联终端节点提供。 否则,一个聚合商可能会从另一个聚合商中清除组。 使用聚合器时,使用此技术过期的组将被丢弃或作为部分组发布,具体取决于expireGroupsUponCompletion财产。spring-doc.cadn.net.cn

我们通常建议使用ref属性(如果自定义聚合器处理程序实现可以在其他<aggregator>定义。 但是,如果自定义聚合器实现仅由<aggregator>,您可以使用内部 Bean 定义(从版本 1.0.3 开始)在<aggregator>元素,如下例所示:spring-doc.cadn.net.cn

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同时使用 aref属性和内部 Bean 定义位于同一<aggregator>不允许配置,因为它会产生不明确的条件。 在这种情况下,将引发 Exception。

以下示例显示了聚合器 Bean 的实现:spring-doc.cadn.net.cn

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前面示例的完成策略 Bean 的实现可能如下所示:spring-doc.cadn.net.cn

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
只要有必要这样做,就可以将发布策略方法和 aggregator 方法组合成一个 bean。

上面示例的相关策略 bean 的实现可能如下所示:spring-doc.cadn.net.cn

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前面示例中的聚合器将按某个标准对数字进行分组(在本例中为除以 10 后的余数)并保留该组,直到有效负载提供的数字之和超过特定值。spring-doc.cadn.net.cn

只要有必要这样做,就可以将发布策略方法、相关策略方法和 聚合器方法组合到单个 bean 中。 (实际上,它们全部或其中任何两个都可以组合。

聚合器和 Spring 表达式语言 (SpEL)

从 Spring Integration 2.0 开始,你可以使用 SPEL 处理各种策略(关联、发布和聚合),如果这种发布策略背后的逻辑相对简单,我们建议这样做。 假设您有一个 legacy 组件,该组件旨在接收对象数组。 我们知道,默认发布策略将所有聚合消息组合到List. 现在我们有两个问题。 首先,我们需要从列表中提取单个消息。 其次,我们需要提取每条消息的有效负载并组装对象数组。 以下示例解决了这两个问题:spring-doc.cadn.net.cn

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

但是,使用 SPEL,实际上可以通过单行表达式相对容易地处理此类需求,从而避免编写自定义类并将其配置为 bean。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新的集合,然后将其转换为数组,从而获得与早期 Java 代码相同的结果。spring-doc.cadn.net.cn

在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方法。spring-doc.cadn.net.cn

而不是为自定义CorrelationStrategycorrelation-strategy属性中,您可以将简单的关联逻辑实现为 SpEL 表达式,并在correlation-strategy-expression属性,如下例所示:spring-doc.cadn.net.cn

correlation-strategy-expression="payload.person.id"

在前面的示例中,我们假设有效负载具有person属性替换为id,它将用于关联消息。spring-doc.cadn.net.cn

同样,对于ReleaseStrategy,您可以将发布逻辑实现为 SpEL 表达式,并在release-strategy-expression属性。 评估上下文的根对象是MessageGroup本身。 这List的消息可以使用message表达式中组的属性。spring-doc.cadn.net.cn

在 5.0 版之前的版本中,根对象是Message<?>,如前面的示例所示:
release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SPEL 评估上下文的根对象是MessageGroup本身,并且您声明,只要有一条 payload 为5在此组中,应释放该组。spring-doc.cadn.net.cn

聚合器和组超时

从版本 4.0 开始,引入了两个新的互斥属性:group-timeoutgroup-timeout-expression. 请参阅使用 XML 配置聚合器。 在某些情况下,如果ReleaseStrategy在当前消息到达时不释放。 为此,groupTimeout选项允许调度MessageGroup强制完成,如下例所示:spring-doc.cadn.net.cn

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

在此示例中,如果聚合器按顺序接收最后一条消息,则可以正常发布release-strategy-expression. 如果该特定消息未到达,则groupTimeout强制组在 10 秒后完成,只要该组至少包含两个 Message。spring-doc.cadn.net.cn

强制组完成的结果取决于ReleaseStrategysend-partial-result-on-expiry. 首先,再次咨询发布策略,看看是否要进行正常的发布。 虽然组没有更改,ReleaseStrategy此时可以决定释放该组。 如果发布策略仍未释放该组,则表示该组已过期。 如果send-partial-result-on-expirytrue、(部分)MessageGroup作为普通聚合器回复消息发布到output-channel. 否则,它将被丢弃。spring-doc.cadn.net.cn

两者之间有区别groupTimeoutbehavior 和MessageGroupStoreReaper(请参阅 使用 XML 配置聚合器)。 收割者为所有MessageGroups 在MessageGroupStore周期性地。 这groupTimeout为每个MessageGroup如果新消息在groupTimeout. 此外,收割者可用于删除空组(如果expire-groups-upon-completion为 false)。spring-doc.cadn.net.cn

从版本 5.5 开始,groupTimeoutExpression可以评估为java.util.Date实例。 这在根据组创建时间确定计划任务时刻 (MessageGroup.getTimestamp()) 而不是当前消息到达,因为它是在groupTimeoutExpression的计算结果为long:spring-doc.cadn.net.cn

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"

使用注释配置聚合器

以下示例显示了配置了注释的聚合器:spring-doc.cadn.net.cn

public class Waiter {
  ...

  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 一个注释,指示此方法应用作聚合器。 如果将此类用作聚合器,则必须指定它。
2 一个注释,指示此方法用作聚合器的发布策略。 如果任何方法中都不存在,则聚合器将使用SimpleSequenceSizeReleaseStrategy.
3 一个注释,指示此方法应用作聚合器的关联策略。 如果未指示关联策略,则聚合器使用HeaderAttributeCorrelationStrategy基于CORRELATION_ID.

XML 元素提供的所有配置选项也可用于@Aggregator注解。spring-doc.cadn.net.cn

可以从 XML 中显式引用聚合器,或者如果@MessageEndpoint在类上定义,通过 Classpath 扫描自动检测。spring-doc.cadn.net.cn

注解配置 (@Aggregator和其他)仅涵盖简单的使用案例,其中大多数默认选项就足够了。 如果您在使用 annotation 配置时需要对这些选项进行更多控制,请考虑使用@Bean定义AggregatingMessageHandler并标记其@Beanmethod 替换为@ServiceActivator,如下例所示:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}
从版本 4.2 开始,AggregatorFactoryBean可用于简化 Java 的 Java 配置AggregatingMessageHandler.

在 Aggregator 中管理状态:MessageGroupStore

Aggregator(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的关联键。 有状态模式(例如ReleaseStrategy) 由以下原则驱动:组件(无论是由框架定义还是由用户定义)应该能够保持无状态。 所有状态都由MessageGroup,其管理权委托给MessageGroupStore. 这MessageGroupStore接口定义如下:spring-doc.cadn.net.cn

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

MessageGroupStore在 中积累状态信息MessageGroups等待发布策略触发时,该事件可能永远不会发生。 因此,为了防止过时的消息延迟,并让易失性存储提供一个钩子,以便在应用程序关闭时进行清理,使用MessageGroupStore允许您注册回调以应用于其MessageGroups当它们到期时。 该界面非常简单,如下面的清单所示:spring-doc.cadn.net.cn

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。spring-doc.cadn.net.cn

MessageGroupStore维护这些回调的列表,该列表按需应用于时间戳早于作为参数提供的时间的所有消息(请参阅registerMessageGroupExpiryCallback(..)expireMessageGroups(..)方法)。spring-doc.cadn.net.cn

重要的是不要使用相同的MessageGroupStore实例中,当您打算依赖expireMessageGroups功能性。 每AbstractCorrelatingMessageHandler注册自己的MessageGroupCallback基于forceComplete()回调。 这样,每个过期的组都可能被错误的聚合器完成或丢弃。 从版本 5.0.10 开始,UniqueExpiryCallbackAbstractCorrelatingMessageHandler对于 Registration(注册)回调中的MessageGroupStore. 这MessageGroupStore,反过来,检查是否存在此类的实例,并记录错误并显示相应的消息(如果回调集中已存在该实例)。 这样,框架就不允许使用MessageGroupStore实例,以避免上述的副作用,即不是由特定关联处理程序创建的组过期。

您可以调用expireMessageGroups方法。 任何早于当前时间减去此值的消息都已过期,并应用了回调。 因此,是 store 的用户定义了消息组 “expiry” 的含义。spring-doc.cadn.net.cn

为了方便用户, Spring 集成以MessageGroupStoreReaper,如下例所示:spring-doc.cadn.net.cn

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

收割者是一个Runnable. 在前面的示例中,消息组存储的 expire 方法每 10 秒调用一次。 超时本身为 30 秒。spring-doc.cadn.net.cn

重要的是要了解MessageGroupStoreReaper是一个近似值,并且受 Task Scheduler 的速率影响,因为此属性仅在下一次计划执行MessageGroupStoreReaper任务。 例如,如果超时设置为 10 分钟,但MessageGroupStoreReaper任务计划为每小时运行一次,最后一次执行MessageGroupStoreReaper任务发生在超时前一分钟,则MessageGroup在接下来的 59 分钟内不会过期。 因此,我们建议将速率设置为至少等于或更短的超时值。

除了 reaper 之外,当应用程序关闭时,还会通过AbstractCorrelatingMessageHandler.spring-doc.cadn.net.cn

AbstractCorrelatingMessageHandler注册自己的 expiry 回调,这是带有 boolean 标志的 Linksend-partial-result-on-expiry在聚合器的 XML 配置中。 如果标志设置为true,然后,当调用过期回调时,尚未发布的组中的任何未标记消息都可以发送到输出通道。spring-doc.cadn.net.cn

由于MessageGroupStoreReaper从计划任务中调用,并可能导致生成消息(取决于sendPartialResultOnExpiry选项)添加到下游集成流程中,建议提供自定义的TaskScheduler替换为MessagePublishingErrorHandler处理异常errorChannel,正如常规 Aggregator 版本功能所期望的那样。 相同的逻辑也适用于组超时功能,该功能也依赖于TaskScheduler. 有关更多信息,请参阅错误处理

当共享的MessageStore用于不同的关联终端节点,则必须配置适当的CorrelationStrategy以确保组 ID 的唯一性。 否则,当一个关联终端节点释放来自其他终端节点的消息或使其他关联终端节点的消息过期时,可能会发生意外行为。 具有相同关联键的消息存储在同一个消息组中。spring-doc.cadn.net.cn

一些MessageStore实现允许通过对数据进行分区来使用相同的物理资源。 例如,JdbcMessageStore具有region属性和MongoDbMessageStore具有collectionName财产。spring-doc.cadn.net.cn

有关MessageStore接口及其实现,请参阅 Message Storespring-doc.cadn.net.cn

Flux 聚合器

在版本 5.2 中,FluxAggregatorMessageHandler组件。 它基于 Project ReactorFlux.groupBy()Flux.window()运营商。 传入消息将发送到FluxSinkFlux.create()在此组件的构造函数中。 如果outputChannel未提供,或者它不是ReactiveStreamsSubscribableChannel,则订阅到主FluxLifecycle.start()实现。 否则,它将推迟到ReactiveStreamsSubscribableChannel实现。 消息按Flux.groupBy()使用CorrelationStrategy对于组键。 默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID标头。spring-doc.cadn.net.cn

默认情况下,每个关闭的窗口都作为Fluxin payload 中。 此消息包含窗口中第一条消息的所有标头。 这Flux在输出消息中,必须订阅 payload 并在下游进行处理。 这样的逻辑可以被setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)configuration 选项的FluxAggregatorMessageHandler. 例如,如果我们想让List的有效负载,我们可以配置Flux.collectList()喜欢这个:spring-doc.cadn.net.cn

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

“中有几个选项FluxAggregatorMessageHandler要选择合适的窗口策略:spring-doc.cadn.net.cn

  • setBoundaryTrigger(Predicate<Message<?>>)- 传播到Flux.windowUntil()算子。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。spring-doc.cadn.net.cn

  • setWindowSize(int)setWindowSizeFunction(Function<Message<?>, Integer>)- 传播到Flux.window(int)windowTimeout(int, Duration). 默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。spring-doc.cadn.net.cn

  • setWindowTimespan(Duration)- 传播到Flux.window(Duration)windowTimeout(int, Duration)取决于 Window Size 配置。spring-doc.cadn.net.cn

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于将变换应用于 Grouped Fluxes 中,用于 Exposed 选项未涵盖的任何自定义窗口作。spring-doc.cadn.net.cn

由于此组件是一个MessageHandler实现它可以简单地用作@Bean定义与@ServiceActivatormessaging 注释。 使用 Java DSL,可以从.handle()EIP 方法。 下面的示例演示了如何注册IntegrationFlow在运行时,以及FluxAggregatorMessageHandler可以与上游的 splitter 相关联:spring-doc.cadn.net.cn

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

消息组的条件

从版本 5.5 开始,AbstractCorrelatingMessageHandler(包括其 Java 和 XML DSLs)公开了一个groupConditionSupplier选项BiFunction<Message<?>, String, String>实现。 此函数用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。 这ReleaseStrategy可以查阅此条件,而不是迭代组中的所有消息。 看GroupConditionProviderJavaDocs 和 Message Group Condition 了解更多信息。spring-doc.cadn.net.cn