聚合器基本上是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。
从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。
它必须保存要聚合的消息,并确定何时准备好聚合完整的消息组。
为此,它需要一个 .MessageStore
功能性
聚合器通过关联和存储一组相关消息来组合它们,直到该组被视为完整。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即,从多个消息创建单个消息)。 两个相关的概念是关联和发布。
相关性确定如何对消息进行分组以进行聚合。
在Spring Integration中,默认情况下,基于消息头进行关联。
具有相同内容的消息将组合在一起。
但是,您可以自定义关联策略,以允许以其他方式指定消息应如何组合在一起。
为此,您可以实现 (本章后面将介绍)。IntegrationMessageHeaderAccessor.CORRELATION_ID
IntegrationMessageHeaderAccessor.CORRELATION_ID
CorrelationStrategy
要确定准备处理一组消息的时间点,请咨询 a。
聚合器的默认发布策略根据标头,当序列中包含的所有消息都存在时,会释放一个组。
您可以通过提供对自定义实现的引用来覆盖此默认策略。ReleaseStrategy
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
ReleaseStrategy
编程模型
聚合 API 由许多类组成:
-
接口 及其子类: 和
MessageGroupProcessor
MethodInvokingAggregatingMessageGroupProcessor
ExpressionEvaluatingMessageGroupProcessor
-
接口及其默认实现:
ReleaseStrategy
SimpleSequenceSizeReleaseStrategy
-
接口及其默认实现:
CorrelationStrategy
HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
() 的子类是一个实现,封装了聚合器的通用功能(以及其他相关用例),如下所示:AggregatingMessageHandler
AbstractCorrelatingMessageHandler
MessageHandler
-
将消息关联到要聚合的组中
-
将这些消息保留在 a 中,直到可以释放组
MessageStore
-
确定何时可以释放组
-
将已发布的组聚合到单个消息中
-
识别和响应过期组
决定如何将消息组合在一起的责任委托给实例。
决定是否可以释放消息组的责任委托给实例。CorrelationStrategy
ReleaseStrategy
以下列表显示了基础的简要突出显示(实现该方法的责任留给开发人员):AbstractAggregatingMessageGroupProcessor
aggregatePayloads
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 和 作为 的开箱即用实现。DefaultAggregatingMessageGroupProcessor
ExpressionEvaluatingMessageGroupProcessor
MethodInvokingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
从版本 5.2 开始,可以使用一种策略来合并和计算(聚合)输出消息的标头。
该实现的逻辑可返回组之间没有冲突的所有标头;组内一条或多封邮件上缺少标头不被视为冲突。
冲突的标头将被省略。
与新引入的 一起,此函数用于任何任意(非)实现。
从本质上讲,框架将提供的函数注入到实例中,并将所有其他实现包装到一个 .
和 之间的逻辑差异在于,后者在调用委托策略之前不会提前计算标头,并且如果委托返回 或 ,则不会调用该函数。
在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的适当标头。
该策略可用作 XML 配置的引用属性、Java DSL 的选项和纯 Java 配置的引用属性。Function<MessageGroup, Map<String, Object>>
AbstractAggregatingMessageGroupProcessor
DefaultAggregateHeadersFunction
DelegatingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
MessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
DelegatingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
DelegatingMessageGroupProcessor
Message
AbstractIntegrationMessageBuilder
Function<MessageGroup, Map<String, Object>>
headers-function
AggregatorSpec.headersFunction()
AggregatorFactoryBean.setHeadersFunction()
由 和 拥有,具有基于消息标头的默认值,如以下示例所示:CorrelationStrategy
AbstractCorrelatingMessageHandler
IntegrationMessageHeaderAccessor.CORRELATION_ID
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 .
它创建一个单,其有效负载是为给定组接收的有效负载之一。
这适用于具有拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现。DefaultAggregatingMessageGroupProcessor
Message
List
在这种情况下使用发布-订阅通道或收件人列表路由器时,请务必启用该标志。
这样做会添加必要的标头:、 和 。
默认情况下,Spring Integration 中的拆分器启用此行为,但未为发布-订阅通道或收件人列表路由器启用该行为,因为这些组件可能在不需要这些标头的各种上下文中使用。apply-sequence CORRELATION_ID SEQUENCE_NUMBER SEQUENCE_SIZE |
为应用程序实现特定的聚合器策略时,可以扩展和实现该方法。
但是,对于实现聚合逻辑,有更好的解决方案,与 API 的耦合较少,可以通过 XML 或注释进行配置。AbstractAggregatingMessageGroupProcessor
aggregatePayloads
通常,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受单个作为参数的方法(也支持参数化列表)。
调用此方法用于聚合消息,如下所示:java.util.List
-
如果参数为 a,并且参数类型 T 可赋给 ,则为聚合而累积的消息的整个列表将发送到聚合器。
java.util.Collection<T>
Message
-
如果参数是非参数化的,或者参数类型不能赋给 ,则该方法将接收累积消息的有效负载。
java.util.Collection
Message
-
如果返回类型不能分配给 ,则将其视为框架自动创建的有效负载。
Message
Message
为了简化代码并推广最佳实践(如低耦合性、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。 |
从 V5.3 开始,在处理消息组后,an 会为具有多个嵌套级别的正确拆分器聚合器方案执行消息头修改。
仅当消息组发布结果不是消息集合时,才会执行此操作。
在这种情况下,目标在构建这些消息时负责调用。AbstractCorrelatingMessageHandler
MessageBuilder.popSequenceDetails()
MessageGroupProcessor
MessageBuilder.popSequenceDetails()
如果返回 ,则仅当输出消息与组中的第一条消息匹配时,才会对输出消息执行 a。
(以前,仅当从 返回普通有效负载或 时,才会执行此操作。MessageGroupProcessor
Message
MessageBuilder.popSequenceDetails()
sequenceDetails
AbstractIntegrationMessageBuilder
MessageGroupProcessor
此功能可以由新属性控制,因此在某些情况下,当标准拆分器尚未填充相关详细信息时,可以禁用此功能。
从本质上讲,此属性会撤消 中最近的上游执行的操作。
有关详细信息,请参阅拆分器。popSequence
boolean
MessageBuilder.popSequenceDetails()
applySequence = true
AbstractMessageSplitter
该方法返回一个 .
因此,如果聚合 POJO 方法具有参数,则传入的参数正是该实例,并且当您将 a 用于聚合器时,在释放组后会清除该原始参数。
因此,如果 POJO 中的变量从聚合器中传递出去,它也会被清除。
如果希望按原样发布该集合以进行进一步处理,则必须构建一个新的(例如,)。
从版本 4.3 开始,框架不再将消息复制到新集合中,以避免创建不需要的额外对象。SimpleMessageGroup.getMessages() unmodifiableCollection Collection<Message> Collection SimpleMessageStore Collection<Message> Collection<Message> Collection new ArrayList<Message>(messages) |
在版本 4.2 之前,无法使用 XML 配置提供 by。
只有 POJO 方法可用于聚合。
现在,如果框架检测到引用的(或内部的)Bean 实现 ,它将被用作聚合器的输出处理器。MessageGroupProcessor
MessageProcessor
如果希望将自定义对象集合作为消息的有效负载发布,则类应扩展并实现 。MessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
aggregatePayloads()
此外,从版本 4.2 开始,提供了 a。
它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。SimpleMessageGroupProcessor
这样一来,聚合器就可以充当消息屏障,在该屏障中,到达的消息将一直保留,直到发布策略触发,并且该组作为一系列单独的消息发布。
从版本 6.0 开始,上述拆分行为仅在组处理器为 .
否则,对于返回 的任何其他实现,只会发出一条回复消息,并将整个消息集合作为其有效负载。
这种逻辑是由聚合器的规范目的决定的——通过某个键收集请求消息并生成单个分组消息。SimpleMessageGroupProcessor
MessageGroupProcessor
Collection<Message>
ReleaseStrategy
接口定义如下:ReleaseStrategy
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以实现完成决策逻辑,前提是它提供了一个接受单个作为参数的方法(也支持参数化列表)并返回布尔值。
在每条新消息到达后调用此方法,以确定组是否完整,如下所示:java.util.List
-
如果参数为 a 且参数类型可赋给 ,则组中累积的消息的整个列表将发送到该方法。
java.util.List<T>
T
Message
-
如果参数是非参数化的,或者参数类型不能赋给 ,则该方法将接收累积消息的有效负载。
java.util.List
Message
-
如果消息组已准备好进行聚合,则该方法必须返回,否则为 false。
true
以下示例演示如何对 of 类型的 a 使用注释:@ReleaseStrategy
List
Message
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例演示如何对 of 类型的 a 使用注释:@ReleaseStrategy
List
String
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前面两个示例中的签名,基于 POJO 的发布策略将传递尚未发布的消息(如果需要访问整个消息)或有效负载对象(如果 type 参数不是 )。
这满足了大多数用例。
但是,如果由于某种原因,您需要访问完整的 ,则应提供接口的实现。Collection
Message
Collection
Message
MessageGroup
ReleaseStrategy
在处理潜在的大型组时,应了解如何调用这些方法,因为在释放组之前,可能会多次调用发布策略。
最有效的是 的实现,因为聚合器可以直接调用它。
第二种最有效的是具有参数类型的 POJO 方法。
效率最低的是具有类型的 POJO 方法。
每次调用发布策略时,框架都必须将有效负载从组中的消息复制到新的集合中(并可能尝试将有效负载转换为 )。
使用可以避免转换,但仍需要创建新的 . 出于这些原因,对于大型组,我们建议您实现 。 |
当释放组进行聚合时,将处理其所有尚未发布的消息并将其从组中删除。
如果组也已完成(即,如果序列中的所有消息都已到达,或者未定义序列),则该组将标记为已完成。
此组的任何新消息都将发送到丢弃通道(如果已定义)。
设置为(默认为 )将删除整个组,并且任何新邮件(与已删除的组具有相同的相关 ID)将组成一个新组。
您可以通过将 a 与设置为 一起释放部分序列。expire-groups-upon-completion
true
false
MessageGroupStoreReaper
send-partial-result-on-expiry
true
为了便于丢弃延迟到达的消息,聚合器必须在释放组后保持其状态。
这最终可能导致内存不足的情况。
为避免此类情况,应考虑配置 a 以删除组元数据。
到期参数应设置为在达到某个点后使组过期,在此之后预计不会到达延迟的消息。
有关配置收割器的信息,请参阅在聚合器中管理状态:MessageGroupStore 。MessageGroupStoreReaper |
Spring Integration 提供了以下项的实现: .
此实现会查询每条到达消息的 和 标头,以确定消息组何时完成并准备好进行聚合。
如前所述,这也是默认策略。ReleaseStrategy
SimpleSequenceSizeReleaseStrategy
SEQUENCE_NUMBER
SEQUENCE_SIZE
在版本 5.0 之前,默认发布策略是 ,这在大型组中表现不佳。
使用该策略,可以检测并剔除重复的序列号。
此操作可能很昂贵。SequenceSizeReleaseStrategy |
如果要聚合大型组,则不需要释放部分组,也不需要检测/拒绝重复序列,请考虑改用 - 对于这些用例,它的效率要高得多,并且是自 5.0 版以来的默认设置,当未指定部分组释放时。SimpleSequenceSizeReleaseStrategy
聚合大型组
4.3 版本将 a 中消息的默认值更改为 (以前是 )。
当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。
尽管哈希集的删除速度通常要快得多,但对于大型消息来说,它可能很昂贵,因为必须在插入和删除时计算哈希值。
如果消息的哈希处理成本很高,请考虑使用其他集合类型。
如使用 MessageGroupFactory
中所述,提供了 a,以便您可以选择最适合您需求的。
您还可以提供自己的工厂实现来创建其他一些 .Collection
SimpleMessageGroup
HashSet
BlockingQueue
SimpleMessageGroupFactory
Collection
Collection<Message<?>>
以下示例演示如何使用以前的实现和 :SimpleSequenceSizeReleaseStrategy
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果筛选器终结点涉及聚合器的上游流,则序列大小释放策略(固定或基于标头)将无法实现其目的,因为筛选器可能会丢弃序列中的某些消息。
在这种情况下,建议选择另一个 ,或者使用从丢弃子流发送的补偿消息,该子流在其内容中携带一些信息,以在自定义的完整组函数中跳过。
有关详细信息,请参阅筛选器。sequenceSize ReleaseStrategy |
关联策略
接口定义如下:CorrelationStrategy
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个,表示用于将消息与消息组关联的关联键。
该密钥必须满足 a 中用于 和 实现的密钥的条件。Object
Map
equals()
hashCode()
通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与 for a 的规则相同(包括对注解的支持)。
该方法必须返回一个值,并且该值不得为 。ServiceActivator
@Header
null
Spring Integration 提供了以下项的实现: .
此实现返回其中一个消息标头(其名称由构造函数参数指定)的值作为关联键。
默认情况下,关联策略是返回标头属性值的策略。
如果您有要用于关联的自定义标头名称,则可以在 的实例上配置它,并将其作为聚合器关联策略的参考。CorrelationStrategy
HeaderAttributeCorrelationStrategy
HeaderAttributeCorrelationStrategy
CORRELATION_ID
HeaderAttributeCorrelationStrategy
锁定注册表
对组的更改是线程安全的。
因此,当您同时发送同一相关 ID 的消息时,聚合器中只会处理其中一条消息,从而有效地将其作为每个消息组的单线程。
A 用于获取已解析的关联 ID 的锁。
默认情况下使用 A(内存中)。
若要在使用共享的服务器之间同步更新,必须配置共享锁注册表。LockRegistry
DefaultLockRegistry
MessageGroupStore
避免死锁
如上所述,当消息组发生突变(添加或释放消息)时,将保持锁定。
请考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。
这将导致线程挂起,并可能呈现如下结果:jstack <pid>
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题:
-
确保每个聚合器都有自己的锁定注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)
-
使用 OR 作为聚合器的输出通道,以便下游流在新线程上运行
ExecutorChannel
QueueChannel
-
从版本 5.1.1 开始,将 Aggregator 属性设置为
releaseLockBeforeSend
true
如果由于某种原因,单个聚合器的输出最终被路由回同一聚合器,也可能导致此问题。 当然,上述第一种解决方案不适用于这种情况。 |
在这种情况下使用发布-订阅通道或收件人列表路由器时,请务必启用该标志。
这样做会添加必要的标头:、 和 。
默认情况下,Spring Integration 中的拆分器启用此行为,但未为发布-订阅通道或收件人列表路由器启用该行为,因为这些组件可能在不需要这些标头的各种上下文中使用。apply-sequence CORRELATION_ID SEQUENCE_NUMBER SEQUENCE_SIZE |
为了简化代码并推广最佳实践(如低耦合性、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。 |
该方法返回一个 .
因此,如果聚合 POJO 方法具有参数,则传入的参数正是该实例,并且当您将 a 用于聚合器时,在释放组后会清除该原始参数。
因此,如果 POJO 中的变量从聚合器中传递出去,它也会被清除。
如果希望按原样发布该集合以进行进一步处理,则必须构建一个新的(例如,)。
从版本 4.3 开始,框架不再将消息复制到新集合中,以避免创建不需要的额外对象。SimpleMessageGroup.getMessages() unmodifiableCollection Collection<Message> Collection SimpleMessageStore Collection<Message> Collection<Message> Collection new ArrayList<Message>(messages) |
在处理潜在的大型组时,应了解如何调用这些方法,因为在释放组之前,可能会多次调用发布策略。
最有效的是 的实现,因为聚合器可以直接调用它。
第二种最有效的是具有参数类型的 POJO 方法。
效率最低的是具有类型的 POJO 方法。
每次调用发布策略时,框架都必须将有效负载从组中的消息复制到新的集合中(并可能尝试将有效负载转换为 )。
使用可以避免转换,但仍需要创建新的 . 出于这些原因,对于大型组,我们建议您实现 。 |
为了便于丢弃延迟到达的消息,聚合器必须在释放组后保持其状态。
这最终可能导致内存不足的情况。
为避免此类情况,应考虑配置 a 以删除组元数据。
到期参数应设置为在达到某个点后使组过期,在此之后预计不会到达延迟的消息。
有关配置收割器的信息,请参阅在聚合器中管理状态:MessageGroupStore 。MessageGroupStoreReaper |
在版本 5.0 之前,默认发布策略是 ,这在大型组中表现不佳。
使用该策略,可以检测并剔除重复的序列号。
此操作可能很昂贵。SequenceSizeReleaseStrategy |
如果筛选器终结点涉及聚合器的上游流,则序列大小释放策略(固定或基于标头)将无法实现其目的,因为筛选器可能会丢弃序列中的某些消息。
在这种情况下,建议选择另一个 ,或者使用从丢弃子流发送的补偿消息,该子流在其内容中携带一些信息,以在自定义的完整组函数中跳过。
有关详细信息,请参阅筛选器。sequenceSize ReleaseStrategy |
如果由于某种原因,单个聚合器的输出最终被路由回同一聚合器,也可能导致此问题。 当然,上述第一种解决方案不适用于这种情况。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器的信息,请参阅聚合器和重排序器。
使用 XML 配置聚合器
Spring Integration 支持通过元素配置带有 XML 的聚合器。
以下示例显示了聚合器的示例:<aggregator/>
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 ID 是可选的。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为“true”)。 |
3 | 聚合器从中接收消息的通道。 必填。 |
4 | 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在“replyChannel”消息标头中指定回复通道)。 |
5 | 聚合器将超时消息发送到的通道(如果为 )。
自选。send-partial-result-on-expiry false |
6 | 对用于将消息组存储在其相关键下直到它们完成为止的引用。
自选。
默认情况下,它是一个易失性内存存储。
有关详细信息,请参阅消息存储。MessageGroupStore |
7 | 当多个句柄订阅相同的句柄时,此聚合器的顺序(用于负载平衡目的)。
自选。DirectChannel |
8 | 指示过期消息在包含过期后应聚合并发送到“output-channel”或“replyChannel”(请参阅 MessageGroupStore.expireMessageGroups(long) )。
使 a 过期的一种方法是配置 .
但是,您也可以通过调用 .
您可以通过 Control Bus 操作来实现此目的,或者,如果您有对实例的引用,则通过调用 。
否则,此属性本身不执行任何操作。
它仅用作是否丢弃或发送到输出或应答通道的任何消息的指示器,这些消息仍在即将过期的通道中。
可选(默认值为 )。
注意:此属性可能更适合调用,因为如果设置为 ,则该组实际上可能不会过期。MessageGroup MessageGroup MessageGroupStoreReaper MessageGroup MessageGroupStore.expireMessageGroups(timeout) MessageGroupStore expireMessageGroups(timeout) MessageGroup false send-partial-result-on-timeout expire-groups-upon-timeout false |
9 | 向 或 发送回复时要等待的超时间隔。
默认为秒。
仅当输出通道具有某些“发送”限制(例如具有固定“容量”)时,才应用它。
在本例中,将抛出 a。
对于实现,将忽略 。
对于 ,来自计划到期任务将导致重新计划此任务。
自选。Message output-channel discard-channel 30 QueueChannel MessageDeliveryException AbstractSubscribableChannel send-timeout group-timeout(-expression) MessageDeliveryException |
10 | 对实现消息关联(分组)算法的 Bean 的引用。
Bean 可以是接口的实现,也可以是 POJO。
在后一种情况下,还必须定义属性。
可选(默认情况下,聚合器使用标头)。CorrelationStrategy correlation-strategy-method IntegrationMessageHeaderAccessor.CORRELATION_ID |
11 | 在 引用的 Bean 上定义的方法。
它实现了相关决策算法。
可选,但有限制(必须存在)。correlation-strategy correlation-strategy |
12 | 表示相关策略的 SpEL 表达式。
例:。
只允许使用其中之一。"headers['something']" correlation-strategy correlation-strategy-expression |
13 | 对应用程序上下文中定义的 Bean 的引用。 Bean 必须实现聚合逻辑,如前所述。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。 |
14 | 在属性引用的 Bean 上定义的方法。
它实现了消息聚合算法。
可选(取决于所定义的属性)。ref ref |
15 | 对实现发布策略的 Bean 的引用。
Bean 可以是接口的实现,也可以是 POJO。
在后一种情况下,还必须定义属性。
可选(默认情况下,聚合器使用 header 属性)。ReleaseStrategy release-strategy-method IntegrationMessageHeaderAccessor.SEQUENCE_SIZE |
16 | 在属性引用的 Bean 上定义的方法。
它实现了完成决策算法。
可选,但有限制(必须存在)。release-strategy release-strategy |
17 | 表示发布策略的 SpEL 表达式。
表达式的根对象是 .
例:。
只允许使用其中之一。MessageGroup "size() == 5" release-strategy release-strategy-expression |
18 | 当设置为 (默认值为 )时,已完成的组将从邮件存储中删除,从而让具有相同相关性的后续邮件形成一个新组。
默认行为是将与已完成的组具有相同相关性的消息发送到 .true false discard-channel |
19 | 仅当为 的 配置了 a 时才适用。
默认情况下,当 a 配置为使部分组过期时,也会删除空组。
在正常释放组后,空组存在。
空组可以检测和丢弃迟到的消息。
如果希望空组的过期时间比过期分部组的时间更长,请设置此属性。
然后,空组不会从中删除,直到它们至少在此毫秒数内未被修改。
请注意,空组过期的实际时间也受收割者的属性影响,它可能与此值加上超时一样多。MessageGroupStoreReaper MessageStore <aggregator> MessageGroupStoreReaper MessageStore timeout |
20 | 对 Bean 的引用。
它用于获取基于 for 并发操作的 .
默认情况下,使用内部。
使用分布式 ,例如 ,可确保聚合器只有一个实例可以同时对组进行操作。
有关详细信息,请参阅 Redis Lock Registry 或 Zookeeper Lock Registry。org.springframework.integration.util.LockRegistry Lock groupId MessageGroup DefaultLockRegistry LockRegistry ZookeeperLockRegistry |
21 | 超时(以毫秒为单位),用于在当前消息到达时未释放组时强制完成。
当需要发出部分结果(或丢弃组)时,如果新消息未在超时时间内到达,则此属性为聚合器提供内置的基于时间的发布策略,超时时间从最后一条消息到达的时间开始计算。
若要设置从创建时间开始计算的超时,请参阅信息。
当新消息到达聚合器时,其任何现有消息都将被取消。
如果返回(表示不释放)和 ,则计划一个新任务使组过期。
我们不建议将此属性设置为零(或负值)。
这样做会有效地禁用聚合器,因为每个消息组都会立即完成。
但是,您可以使用表达式有条件地将其设置为零(或负值)。
有关信息,请参阅。
在完成过程中执行的操作取决于 和 属性。
有关详细信息,请参阅聚合器和组超时。
它与“group-timeout-expression”属性是互斥的。MessageGroup ReleaseStrategy MessageGroup MessageGroup group-timeout-expression ScheduledFuture<?> MessageGroup ReleaseStrategy false groupTimeout > 0 group-timeout-expression ReleaseStrategy send-partial-group-on-expiry |
22 | 计算结果为 a 的 SpEL 表达式,该表达式将 作为评估上下文对象。
用于计划强制完成。
如果表达式的计算结果为 ,则不计划完成。
如果计算结果为零,则该组将立即在当前线程上完成。
实际上,这提供了一个动态属性。
例如,如果您希望在创建组后 10 秒后强制完成 a,则可以考虑使用以下 SpEL 表达式: where is provided by as here is the evaluation context object.
但请记住,组创建时间可能与首次到达的消息的时间不同,具体取决于其他组过期属性的配置。
有关详细信息,请参阅。
与“group-timeout”属性互斥。groupTimeout MessageGroup #root MessageGroup null group-timeout MessageGroup timestamp + 10000 - T(System).currentTimeMillis() timestamp MessageGroup.getTimestamp() MessageGroup #root group-timeout |
23 | 当组由于超时(或超时)而完成时,默认情况下,该组将过期(完全删除)。
迟到的消息将启动一个新组。
您可以将其设置为完成组,但保留其元数据,以便丢弃延迟到达的消息。
空组可以稍后使用与属性一起过期。
它默认为“true”。MessageGroupStoreReaper false MessageGroupStoreReaper empty-group-min-timeout |
24 | 一个 Bean 引用,用于调度 在 中没有新消息到达时强制完成 。
如果未提供,则使用在 () 中注册的默认调度程序 ()。
如果指定或未指定,则此属性不适用。TaskScheduler MessageGroup MessageGroup groupTimeout taskScheduler ApplicationContext ThreadPoolTaskScheduler group-timeout group-timeout-expression |
25 | 从版本 4.1 开始。
它允许为操作启动事务。
它由 a 或 a 启动,不应用于正常的 、 和 操作。
只允许使用此子元素 or。forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-advice-chain/> |
26 | 从版本 4.1 开始。
它允许为操作配置任何操作。
它由 a 或 a 启动,不应用于正常的 、 和 操作。
只允许使用此子元素 or。
也可以在此处使用 Spring 命名空间配置事务。Advice forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-transactional/> Advice tx |
即将到期的组
有两个属性与过期(完全删除)组相关。
当一个组过期时,没有它的记录,如果有新消息以相同的相关性到达,则会启动一个新组。
当组完成(没有过期)时,空组将保留,延迟到达的消息将被丢弃。
稍后可以通过将空组与属性结合使用来删除。
如果组未正常完成,但因超时而被释放或丢弃,则该组通常已过期。
从版本 4.1 开始,您可以使用 来控制此行为。
它默认为向后兼容。
从版本 5.0 开始,空组也计划在 之后删除。
如果 和 ,则在正常或部分序列释放发生时安排删除组的任务。 从版本 5.4 开始,可以将聚合器(和重排序器)配置为使孤立组(持久性消息存储中可能不会释放的组)过期。
(如果大于 )指示应清除存储中早于此值的组。
该方法在启动时调用,并与提供的 一起定期在计划任务中调用。
此方法也可以随时对外部调用。
根据上面提到的提供的过期选项,过期逻辑完全委托给功能。
当需要从那些不再使用常规消息到达逻辑发布的旧组中清理消息存储时,这种定期清除功能非常有用。
在大多数情况下,这发生在应用程序重新启动后,当使用持久性消息组存储时。
该功能类似于计划任务,但在使用组超时而不是收割器时,提供了一种方便的方式来处理特定组件中的旧组。
必须专门为当前关联终结点提供。
否则,一个聚合器可能会从另一个聚合器中清除组。
使用聚合器时,使用此技术过期的组将被丢弃或作为部分组发布,具体取决于属性。 |
如果自定义聚合器处理程序实现可能在其他定义中引用,我们通常建议使用属性。
但是,如果自定义聚合器实现仅由 的单个定义使用 ,则可以使用内 Bean 定义(从版本 1.0.3 开始)在元素中配置聚合 POJO,如以下示例所示:ref
<aggregator>
<aggregator>
<aggregator>
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
不允许在同一配置中同时使用属性和内 Bean 定义,因为这会造成不明确的条件。
在这种情况下,会引发异常。ref <aggregator> |
以下示例显示了聚合器 Bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
上述示例的完成策略 Bean 的实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
只要这样做有意义,发布策略方法和聚合器方法都可以组合到一个 Bean 中。 |
上面示例中关联策略 Bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某个条件对数字进行分组(在本例中,余数除以 10 后),并保留该组,直到有效负载提供的数字之和超过某个值。
只要这样做有意义,发布策略方法、关联策略方法和聚合器方法都可以组合在一个 Bean 中。 (实际上,它们全部或其中任何两个都可以组合在一起。 |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,您可以使用 SpEL 处理各种策略(关联、发布和聚合),如果此类发布策略背后的逻辑相对简单,我们建议使用。
假设您有一个旧组件,该组件旨在接收对象数组。
我们知道,默认发布策略将所有聚合的消息组合在 .
现在我们有两个问题。
首先,我们需要从列表中提取单个消息。
其次,我们需要提取每条消息的有效负载并组装对象数组。
下面的示例解决了这两个问题:List
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,在SpEL中,这样的需求实际上可以相对容易地用一行表达式来处理,从而避免了编写定制类并将其配置为bean的麻烦。 以下示例演示如何执行此操作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新集合,然后将其转换为数组,从而获得与早期 Java 代码相同的结果。
在处理自定义发布和关联策略时,可以应用相同的基于表达式的方法。
您可以将简单的关联逻辑实现为 SpEL 表达式并在属性中配置它,而不是在属性中为自定义定义 Bean,如以下示例所示:CorrelationStrategy
correlation-strategy
correlation-strategy-expression
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有一个带有 的属性,该属性将用于关联消息。person
id
同样,对于 ,您可以将发布逻辑实现为 SpEL 表达式,并在属性中对其进行配置。
评估上下文的根本对象是它本身。
可以使用表达式中组的属性来引用 的消息。ReleaseStrategy
release-strategy-expression
MessageGroup
List
message
在 5.0 版之前的版本中,根对象是 的集合,如前面的示例所示:Message<?> |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是 本身,并且您声明,只要此组中出现有效负载为 的消息,就应释放该组。MessageGroup
5
聚合器和组超时
从 4.0 版开始,引入了两个新的互斥属性:和 。
请参阅使用 XML 配置聚合器。
在某些情况下,如果当前消息到达时未释放,则可能需要在超时后发出聚合器结果(或丢弃组)。
为此,该选项允许强制完成计划,如以下示例所示:group-timeout
group-timeout-expression
ReleaseStrategy
groupTimeout
MessageGroup
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器按 .
如果该特定消息未到达,则强制该组在十秒后完成,只要该组包含至少两条消息即可。release-strategy-expression
groupTimeout
强制组完成的结果取决于 和 。
首先,再次咨询发布策略,以确定是否要进行正常发布。
虽然该组没有更改,但可以决定此时释放该组。
如果发布策略仍未释放该组,则该组已过期。
如果 是 ,则 (部分) 中的现有消息将作为普通聚合器回复消息发布给 。
否则,它将被丢弃。ReleaseStrategy
send-partial-result-on-expiry
ReleaseStrategy
send-partial-result-on-expiry
true
MessageGroup
output-channel
行为和之间存在差异(请参阅使用 XML 配置聚合器)。
收割者会定期启动所有 s 的强制完成。
如果在 .
此外,收割器可用于删除空组(如果为 false,则保留空组以丢弃延迟的消息)。groupTimeout
MessageGroupStoreReaper
MessageGroup
MessageGroupStore
groupTimeout
MessageGroup
groupTimeout
expire-groups-upon-completion
从版本 5.5 开始,可以评估为实例。
这在以下情况下很有用:根据组创建时间 () 而不是当前消息到达来确定计划的任务时刻,因为它是在计算时计算的:groupTimeoutExpression
java.util.Date
MessageGroup.getTimestamp()
groupTimeoutExpression
long
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了配置了批注的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 指示此方法应用作聚合器的注释。 如果将此类用作聚合器,则必须指定它。 |
2 | 指示此方法用作聚合器的发布策略的注释。
如果任何方法上都不存在,则聚合器使用 .SimpleSequenceSizeReleaseStrategy |
3 | 指示此方法应用作聚合器的关联策略的注释。
如果未指示关联策略,则聚合器使用基于 .HeaderAttributeCorrelationStrategy CORRELATION_ID |
XML 元素提供的所有配置选项也可用于批注。@Aggregator
聚合器可以从 XML 显式引用,或者,如果在类上定义了聚合器,则可以通过类路径扫描自动检测。@MessageEndpoint
Aggregator 组件的注释配置(和其他配置)仅涵盖简单的用例,其中大多数默认选项就足够了。
如果在使用批注配置时需要对这些选项进行更多控制,请考虑使用 和 标记其方法的定义,如以下示例所示:@Aggregator
@Bean
AggregatingMessageHandler
@Bean
@ServiceActivator
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
有关更多信息,请参见 Programming Model and Annotations on @Bean
Methods。
从版本 4.2 开始,可用于简化 的 Java 配置。AggregatorFactoryBean AggregatingMessageHandler |
1 | 聚合器的 ID 是可选的。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为“true”)。 |
3 | 聚合器从中接收消息的通道。 必填。 |
4 | 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在“replyChannel”消息标头中指定回复通道)。 |
5 | 聚合器将超时消息发送到的通道(如果为 )。
自选。send-partial-result-on-expiry false |
6 | 对用于将消息组存储在其相关键下直到它们完成为止的引用。
自选。
默认情况下,它是一个易失性内存存储。
有关详细信息,请参阅消息存储。MessageGroupStore |
7 | 当多个句柄订阅相同的句柄时,此聚合器的顺序(用于负载平衡目的)。
自选。DirectChannel |
8 | 指示过期消息在包含过期后应聚合并发送到“output-channel”或“replyChannel”(请参阅 MessageGroupStore.expireMessageGroups(long) )。
使 a 过期的一种方法是配置 .
但是,您也可以通过调用 .
您可以通过 Control Bus 操作来实现此目的,或者,如果您有对实例的引用,则通过调用 。
否则,此属性本身不执行任何操作。
它仅用作是否丢弃或发送到输出或应答通道的任何消息的指示器,这些消息仍在即将过期的通道中。
可选(默认值为 )。
注意:此属性可能更适合调用,因为如果设置为 ,则该组实际上可能不会过期。MessageGroup MessageGroup MessageGroupStoreReaper MessageGroup MessageGroupStore.expireMessageGroups(timeout) MessageGroupStore expireMessageGroups(timeout) MessageGroup false send-partial-result-on-timeout expire-groups-upon-timeout false |
9 | 向 或 发送回复时要等待的超时间隔。
默认为秒。
仅当输出通道具有某些“发送”限制(例如具有固定“容量”)时,才应用它。
在本例中,将抛出 a。
对于实现,将忽略 。
对于 ,来自计划到期任务将导致重新计划此任务。
自选。Message output-channel discard-channel 30 QueueChannel MessageDeliveryException AbstractSubscribableChannel send-timeout group-timeout(-expression) MessageDeliveryException |
10 | 对实现消息关联(分组)算法的 Bean 的引用。
Bean 可以是接口的实现,也可以是 POJO。
在后一种情况下,还必须定义属性。
可选(默认情况下,聚合器使用标头)。CorrelationStrategy correlation-strategy-method IntegrationMessageHeaderAccessor.CORRELATION_ID |
11 | 在 引用的 Bean 上定义的方法。
它实现了相关决策算法。
可选,但有限制(必须存在)。correlation-strategy correlation-strategy |
12 | 表示相关策略的 SpEL 表达式。
例:。
只允许使用其中之一。"headers['something']" correlation-strategy correlation-strategy-expression |
13 | 对应用程序上下文中定义的 Bean 的引用。 Bean 必须实现聚合逻辑,如前所述。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。 |
14 | 在属性引用的 Bean 上定义的方法。
它实现了消息聚合算法。
可选(取决于所定义的属性)。ref ref |
15 | 对实现发布策略的 Bean 的引用。
Bean 可以是接口的实现,也可以是 POJO。
在后一种情况下,还必须定义属性。
可选(默认情况下,聚合器使用 header 属性)。ReleaseStrategy release-strategy-method IntegrationMessageHeaderAccessor.SEQUENCE_SIZE |
16 | 在属性引用的 Bean 上定义的方法。
它实现了完成决策算法。
可选,但有限制(必须存在)。release-strategy release-strategy |
17 | 表示发布策略的 SpEL 表达式。
表达式的根对象是 .
例:。
只允许使用其中之一。MessageGroup "size() == 5" release-strategy release-strategy-expression |
18 | 当设置为 (默认值为 )时,已完成的组将从邮件存储中删除,从而让具有相同相关性的后续邮件形成一个新组。
默认行为是将与已完成的组具有相同相关性的消息发送到 .true false discard-channel |
19 | 仅当为 的 配置了 a 时才适用。
默认情况下,当 a 配置为使部分组过期时,也会删除空组。
在正常释放组后,空组存在。
空组可以检测和丢弃迟到的消息。
如果希望空组的过期时间比过期分部组的时间更长,请设置此属性。
然后,空组不会从中删除,直到它们至少在此毫秒数内未被修改。
请注意,空组过期的实际时间也受收割者的属性影响,它可能与此值加上超时一样多。MessageGroupStoreReaper MessageStore <aggregator> MessageGroupStoreReaper MessageStore timeout |
20 | 对 Bean 的引用。
它用于获取基于 for 并发操作的 .
默认情况下,使用内部。
使用分布式 ,例如 ,可确保聚合器只有一个实例可以同时对组进行操作。
有关详细信息,请参阅 Redis Lock Registry 或 Zookeeper Lock Registry。org.springframework.integration.util.LockRegistry Lock groupId MessageGroup DefaultLockRegistry LockRegistry ZookeeperLockRegistry |
21 | 超时(以毫秒为单位),用于在当前消息到达时未释放组时强制完成。
当需要发出部分结果(或丢弃组)时,如果新消息未在超时时间内到达,则此属性为聚合器提供内置的基于时间的发布策略,超时时间从最后一条消息到达的时间开始计算。
若要设置从创建时间开始计算的超时,请参阅信息。
当新消息到达聚合器时,其任何现有消息都将被取消。
如果返回(表示不释放)和 ,则计划一个新任务使组过期。
我们不建议将此属性设置为零(或负值)。
这样做会有效地禁用聚合器,因为每个消息组都会立即完成。
但是,您可以使用表达式有条件地将其设置为零(或负值)。
有关信息,请参阅。
在完成过程中执行的操作取决于 和 属性。
有关详细信息,请参阅聚合器和组超时。
它与“group-timeout-expression”属性是互斥的。MessageGroup ReleaseStrategy MessageGroup MessageGroup group-timeout-expression ScheduledFuture<?> MessageGroup ReleaseStrategy false groupTimeout > 0 group-timeout-expression ReleaseStrategy send-partial-group-on-expiry |
22 | 计算结果为 a 的 SpEL 表达式,该表达式将 作为评估上下文对象。
用于计划强制完成。
如果表达式的计算结果为 ,则不计划完成。
如果计算结果为零,则该组将立即在当前线程上完成。
实际上,这提供了一个动态属性。
例如,如果您希望在创建组后 10 秒后强制完成 a,则可以考虑使用以下 SpEL 表达式: where is provided by as here is the evaluation context object.
但请记住,组创建时间可能与首次到达的消息的时间不同,具体取决于其他组过期属性的配置。
有关详细信息,请参阅。
与“group-timeout”属性互斥。groupTimeout MessageGroup #root MessageGroup null group-timeout MessageGroup timestamp + 10000 - T(System).currentTimeMillis() timestamp MessageGroup.getTimestamp() MessageGroup #root group-timeout |
23 | 当组由于超时(或超时)而完成时,默认情况下,该组将过期(完全删除)。
迟到的消息将启动一个新组。
您可以将其设置为完成组,但保留其元数据,以便丢弃延迟到达的消息。
空组可以稍后使用与属性一起过期。
它默认为“true”。MessageGroupStoreReaper false MessageGroupStoreReaper empty-group-min-timeout |
24 | 一个 Bean 引用,用于调度 在 中没有新消息到达时强制完成 。
如果未提供,则使用在 () 中注册的默认调度程序 ()。
如果指定或未指定,则此属性不适用。TaskScheduler MessageGroup MessageGroup groupTimeout taskScheduler ApplicationContext ThreadPoolTaskScheduler group-timeout group-timeout-expression |
25 | 从版本 4.1 开始。
它允许为操作启动事务。
它由 a 或 a 启动,不应用于正常的 、 和 操作。
只允许使用此子元素 or。forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-advice-chain/> |
26 | 从版本 4.1 开始。
它允许为操作配置任何操作。
它由 a 或 a 启动,不应用于正常的 、 和 操作。
只允许使用此子元素 or。
也可以在此处使用 Spring 命名空间配置事务。Advice forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-transactional/> Advice tx |
即将到期的组
有两个属性与过期(完全删除)组相关。
当一个组过期时,没有它的记录,如果有新消息以相同的相关性到达,则会启动一个新组。
当组完成(没有过期)时,空组将保留,延迟到达的消息将被丢弃。
稍后可以通过将空组与属性结合使用来删除。
如果组未正常完成,但因超时而被释放或丢弃,则该组通常已过期。
从版本 4.1 开始,您可以使用 来控制此行为。
它默认为向后兼容。
从版本 5.0 开始,空组也计划在 之后删除。
如果 和 ,则在正常或部分序列释放发生时安排删除组的任务。 从版本 5.4 开始,可以将聚合器(和重排序器)配置为使孤立组(持久性消息存储中可能不会释放的组)过期。
(如果大于 )指示应清除存储中早于此值的组。
该方法在启动时调用,并与提供的 一起定期在计划任务中调用。
此方法也可以随时对外部调用。
根据上面提到的提供的过期选项,过期逻辑完全委托给功能。
当需要从那些不再使用常规消息到达逻辑发布的旧组中清理消息存储时,这种定期清除功能非常有用。
在大多数情况下,这发生在应用程序重新启动后,当使用持久性消息组存储时。
该功能类似于计划任务,但在使用组超时而不是收割器时,提供了一种方便的方式来处理特定组件中的旧组。
必须专门为当前关联终结点提供。
否则,一个聚合器可能会从另一个聚合器中清除组。
使用聚合器时,使用此技术过期的组将被丢弃或作为部分组发布,具体取决于属性。 |
当组超时时,将再获得一次释放组的机会。
如果这样做并且是 false,则过期时间由 控制。
如果在超时期间释放策略未释放组,则过期时间由 .
超时组要么被丢弃,要么发生部分释放(基于 )。ReleaseStrategy expire-groups-upon-timeout expire-groups-upon-completion expire-groups-upon-timeout send-partial-result-on-expiry |
不允许在同一配置中同时使用属性和内 Bean 定义,因为这会造成不明确的条件。
在这种情况下,会引发异常。ref <aggregator> |
只要这样做有意义,发布策略方法和聚合器方法都可以组合到一个 Bean 中。 |
只要这样做有意义,发布策略方法、关联策略方法和聚合器方法都可以组合在一个 Bean 中。 (实际上,它们全部或其中任何两个都可以组合在一起。 |
在 5.0 版之前的版本中,根对象是 的集合,如前面的示例所示:Message<?> |
1 | 指示此方法应用作聚合器的注释。 如果将此类用作聚合器,则必须指定它。 |
2 | 指示此方法用作聚合器的发布策略的注释。
如果任何方法上都不存在,则聚合器使用 .SimpleSequenceSizeReleaseStrategy |
3 | 指示此方法应用作聚合器的关联策略的注释。
如果未指示关联策略,则聚合器使用基于 .HeaderAttributeCorrelationStrategy CORRELATION_ID |
从版本 4.2 开始,可用于简化 的 Java 配置。AggregatorFactoryBean AggregatingMessageHandler |
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有消息都具有相同的相关键。
有状态模式(如 )中的接口设计由以下原则驱动:组件(无论是由框架还是由用户定义)都应该能够保持无状态。
所有状态都由 the 携带,其管理委托给 .
接口定义如下:ReleaseStrategy
MessageGroup
MessageGroupStore
MessageGroupStore
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
在等待触发发布策略时,会累积状态信息,并且该事件可能永远不会发生。
因此,为了防止过时的消息徘徊,并使易失性存储在应用程序关闭时提供用于清理的钩子,允许您注册回调以在它们过期时应用于它。
界面非常简单,如以下列表所示:MessageGroupStore
MessageGroups
MessageGroupStore
MessageGroups
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。
维护这些回调的列表,根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(请参阅前面描述的 and 方法)。MessageGroupStore
registerMessageGroupExpiryCallback(..)
expireMessageGroups(..)
当您打算依赖该功能时,请不要在不同的聚合器组件中使用相同的实例,这一点很重要。
每个都根据回调注册自己的。
这样,每个过期的组都可能被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,将 a 用于 的 注册回调。
反过来,该 检查此类的实例是否存在,如果回调集中已存在错误,则使用适当的消息记录错误。
这样,框架不允许在不同的聚合器/重排序器中使用实例,以避免上述未由特定关联处理程序创建的组过期的副作用。MessageGroupStore expireMessageGroups AbstractCorrelatingMessageHandler MessageGroupCallback forceComplete() UniqueExpiryCallback AbstractCorrelatingMessageHandler MessageGroupStore MessageGroupStore MessageGroupStore |
可以使用超时值调用该方法。
任何早于当前时间减去此值的消息都已过期,并应用了回调。
因此,是存储的用户定义消息组“过期”的含义。expireMessageGroups
为了方便用户,Spring Integration 以 的形式为消息过期提供了一个包装器,如以下示例所示:MessageGroupStoreReaper
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是.
在前面的示例中,每 10 秒调用一次消息组存储的 expire 方法。
超时本身为 30 秒。Runnable
请务必了解 的“timeout”属性是一个近似值,并且受任务计划程序的速率影响,因为此属性仅在任务的下一次计划执行时检查。
例如,如果超时设置为 10 分钟,但任务计划为每小时运行一次,并且任务的最后一次执行发生在超时前 1 分钟,则在接下来的 59 分钟内不会过期。
因此,我们建议将速率设置为至少等于超时值或更短。MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroup |
除了收割者之外,当应用程序关闭时,也会通过 .AbstractCorrelatingMessageHandler
注册自己的过期回调,这是聚合器的 XML 配置中带有布尔标志的链接。
如果该标志设置为 ,则在调用过期回调时,组中尚未释放的任何未标记消息都可以发送到输出通道。AbstractCorrelatingMessageHandler
send-partial-result-on-expiry
true
由于 是从计划任务调用的,并且可能导致将消息(取决于选项)生成到下游集成流,因此建议通过 提供具有 to 处理程序异常的自定义,这是常规聚合器发布功能所期望的。
同样的逻辑也适用于组超时功能,该功能也依赖于 .
有关详细信息,请参阅错误处理。MessageGroupStoreReaper sendPartialResultOnExpiry TaskScheduler MessagePublishingErrorHandler errorChannel TaskScheduler |
当共享用于不同的关联终结点时,必须配置适当的参数以确保组 ID 的唯一性。
否则,当一个关联终结点释放来自其他关联终结点的消息或使其他关联终结点的消息过期时,可能会发生意外行为。
具有相同关联键的消息存储在同一消息组中。 某些实现允许通过对数据进行分区来使用相同的物理资源。
例如,具有属性,而 具有属性。 有关接口及其实现的详细信息,请参阅消息存储库。 |
当您打算依赖该功能时,请不要在不同的聚合器组件中使用相同的实例,这一点很重要。
每个都根据回调注册自己的。
这样,每个过期的组都可能被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,将 a 用于 的 注册回调。
反过来,该 检查此类的实例是否存在,如果回调集中已存在错误,则使用适当的消息记录错误。
这样,框架不允许在不同的聚合器/重排序器中使用实例,以避免上述未由特定关联处理程序创建的组过期的副作用。MessageGroupStore expireMessageGroups AbstractCorrelatingMessageHandler MessageGroupCallback forceComplete() UniqueExpiryCallback AbstractCorrelatingMessageHandler MessageGroupStore MessageGroupStore MessageGroupStore |
请务必了解 的“timeout”属性是一个近似值,并且受任务计划程序的速率影响,因为此属性仅在任务的下一次计划执行时检查。
例如,如果超时设置为 10 分钟,但任务计划为每小时运行一次,并且任务的最后一次执行发生在超时前 1 分钟,则在接下来的 59 分钟内不会过期。
因此,我们建议将速率设置为至少等于超时值或更短。MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroup |
由于 是从计划任务调用的,并且可能导致将消息(取决于选项)生成到下游集成流,因此建议通过 提供具有 to 处理程序异常的自定义,这是常规聚合器发布功能所期望的。
同样的逻辑也适用于组超时功能,该功能也依赖于 .
有关详细信息,请参阅错误处理。MessageGroupStoreReaper sendPartialResultOnExpiry TaskScheduler MessagePublishingErrorHandler errorChannel TaskScheduler |
当共享用于不同的关联终结点时,必须配置适当的参数以确保组 ID 的唯一性。
否则,当一个关联终结点释放来自其他关联终结点的消息或使其他关联终结点的消息过期时,可能会发生意外行为。
具有相同关联键的消息存储在同一消息组中。 某些实现允许通过对数据进行分区来使用相同的物理资源。
例如,具有属性,而 具有属性。 有关接口及其实现的详细信息,请参阅消息存储库。 |
助焊剂聚合器
在版本 5.2 中,引入了该组件。
它基于项目反应堆和操作员。
传入的消息被发送到由此组件的构造函数发起的 in 中。
如果未提供或它不是 的实例,则从实现中完成对 main 的订阅。
否则,它将推迟到实现完成的订阅。
消息按 using a 作为组键进行分组。
默认情况下,会参考消息的标头。FluxAggregatorMessageHandler
Flux.groupBy()
Flux.window()
FluxSink
Flux.create()
outputChannel
ReactiveStreamsSubscribableChannel
Flux
Lifecycle.start()
ReactiveStreamsSubscribableChannel
Flux.groupBy()
CorrelationStrategy
IntegrationMessageHeaderAccessor.CORRELATION_ID
默认情况下,每个关闭的窗口都作为要生成的消息的有效负载发布。
此消息包含窗口中第一条消息的所有标头。
输出消息有效负载中的此内容必须订阅并在下游处理。
这样的逻辑可以通过 的配置选项进行自定义(或取代)。
例如,如果我们想在最后一条消息中有一个有效负载,我们可以配置一个这样的:Flux
Flux
setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
FluxAggregatorMessageHandler
List
Flux.collectList()
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
有几个选项可以选择适当的窗口策略:FluxAggregatorMessageHandler
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到运算符。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。Flux.windowUntil()
-
setWindowSize(int)
和 - 传播到 或 。 默认情况下,窗口大小是根据组中的第一条消息及其标头计算的。setWindowSizeFunction(Function<Message<?>, Integer>)
Flux.window(int)
windowTimeout(int, Duration)
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
-
setWindowTimespan(Duration)
- 传播到 OR,具体取决于窗口大小配置。Flux.window(Duration)
windowTimeout(int, Duration)
-
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将转换应用于公开选项未涵盖的任何自定义窗口操作的分组通量。
由于此组件是一个实现,因此可以简单地将其与消息传递注释一起用作定义。
对于 Java DSL,可以从 EIP 方法使用它。
下面的示例演示了如何在运行时注册 an 以及如何将 a 与上游的拆分器相关联:MessageHandler
@Bean
@ServiceActivator
.handle()
IntegrationFlow
FluxAggregatorMessageHandler
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);