企业集成模式 (EIP) 一书标识了几种能够缓冲消息的模式。
例如,聚合器缓冲消息,直到可以释放消息,而缓冲消息,直到使用者从该通道显式接收这些消息。
由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的点。QueueChannel
为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久存储(例如 RDBMS)中。
Spring Integration 通过以下方式提供对消息存储模式的支持:
-
定义策略接口
org.springframework.integration.store.MessageStore
-
提供此接口的多个实现
-
在能够缓冲消息的所有组件上公开属性,以便可以注入实现接口的任何实例。
message-store
MessageStore
有关如何配置特定消息存储库实现以及如何将实现注入特定缓冲组件的详细信息,本手册中都有介绍(请参阅特定组件,例如 QueueChannel、Aggregator、Delayer 等)。
以下一对示例演示如何为 a 和 aggregator 添加对消息存储库的引用:MessageStore
QueueChannel
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息使用 存储在内存中,这是 的实现。
这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失不是问题。
但是,典型的生产应用程序需要更可靠的选项,不仅要降低消息丢失的风险,还要避免潜在的内存不足错误。
因此,我们还为各种数据存储提供了实现。
以下是受支持的实现的完整列表:o.s.i.store.SimpleMessageStore
MessageStore
MessageStore
-
Hazelcast 消息存储库:使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储库:使用 RDBMS 存储消息
-
Redis 消息存储库:使用 Redis 键/值数据存储存储来存储消息
-
MongoDB 消息存储库:使用 MongoDB 文档存储存储消息
但是,在使用 . 的持久实现时,请注意一些限制。 消息数据(有效负载和标头)使用不同的序列化策略进行序列化和反序列化,具体取决于 .
例如,使用 时,默认情况下仅保留数据。
在这种情况下,在序列化发生之前,将删除不可序列化的标头。
此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的特定于协议的标头。
例如,将 HTTP 标头映射到消息标头中,其中一个是不可序列化的实例。
但是,您可以将自己的 和 策略接口的实现注入到某些实现(例如 )中,以更改序列化和反序列化的行为。 请特别注意表示某些类型数据的标头。
例如,如果其中一个标头包含某个 Spring Bean 的实例,则在反序列化时,您最终可能会得到该 Bean 的另一个实例,这直接影响框架创建的一些隐式标头(例如 或 )。
目前,它们不可序列化,但即使可序列化,反序列化通道也不表示预期的实例。 从 Spring Integration 版本 3.0 开始,您可以使用配置为在向 . 此外,请考虑配置消息流时发生的情况,如下所示:网关→队列通道(由持久性消息存储库支持)→服务激活器。
该网关创建一个临时应答通道,该通道在服务激活器的轮询器从队列中读取时丢失。
同样,您可以使用标头 enricher 将标头替换为表示形式。 有关详细信息,请参阅标头扩充器。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
:实现特定于实例的操作QueueChannel
-
PriorityCapableChannelMessageStore
:标记要用于实例的实现,并为持久化消息提供优先级顺序。MessageStore
PriorityChannel
实际行为取决于实现。
该框架提供了以下实现,可以用作 和 的持久性:MessageStore
QueueChannel
PriorityChannel
注意事项
SimpleMessageStore 从版本 4.1 开始,调用 .
对于大型消息组来说,这是一个严重的性能问题。
4.0.1 引入了一个布尔属性,可用于控制此行为。
聚合器在内部使用时,此属性设置为提高性能。
现在是默认的。 现在,在组件(如聚合器)之外访问组存储的用户将获得对聚合器正在使用的组的直接引用,而不是副本。 在聚合器之外操纵组可能会导致不可预知的结果。 因此,不应执行此类操作或将属性设置为 。 |
但是,在使用 . 的持久实现时,请注意一些限制。 消息数据(有效负载和标头)使用不同的序列化策略进行序列化和反序列化,具体取决于 .
例如,使用 时,默认情况下仅保留数据。
在这种情况下,在序列化发生之前,将删除不可序列化的标头。
此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的特定于协议的标头。
例如,将 HTTP 标头映射到消息标头中,其中一个是不可序列化的实例。
但是,您可以将自己的 和 策略接口的实现注入到某些实现(例如 )中,以更改序列化和反序列化的行为。 请特别注意表示某些类型数据的标头。
例如,如果其中一个标头包含某个 Spring Bean 的实例,则在反序列化时,您最终可能会得到该 Bean 的另一个实例,这直接影响框架创建的一些隐式标头(例如 或 )。
目前,它们不可序列化,但即使可序列化,反序列化通道也不表示预期的实例。 从 Spring Integration 版本 3.0 开始,您可以使用配置为在向 . 此外,请考虑配置消息流时发生的情况,如下所示:网关→队列通道(由持久性消息存储库支持)→服务激活器。
该网关创建一个临时应答通道,该通道在服务激活器的轮询器从队列中读取时丢失。
同样,您可以使用标头 enricher 将标头替换为表示形式。 有关详细信息,请参阅标头扩充器。 |
注意事项
SimpleMessageStore 从版本 4.1 开始,调用 .
对于大型消息组来说,这是一个严重的性能问题。
4.0.1 引入了一个布尔属性,可用于控制此行为。
聚合器在内部使用时,此属性设置为提高性能。
现在是默认的。 现在,在组件(如聚合器)之外访问组存储的用户将获得对聚合器正在使用的组的直接引用,而不是副本。 在聚合器之外操纵组可能会导致不可预知的结果。 因此,不应执行此类操作或将属性设置为 。 |
用MessageGroupFactory
从版本 4.3 开始,可以注入一些自定义策略来创建和自定义 .
这默认为 ,它基于 () 内部集合生成实例。
其他可能的选项是 和 ,其中最后一个选项可用于恢复以前的行为。
此外,该选项可用。
有关详细信息,请参阅下一节。
从版本 5.0.1 开始,当组中消息的顺序和唯一性无关紧要时,该选项也可用。MessageGroupStore
MessageGroupFactory
MessageGroup
MessageGroupStore
SimpleMessageGroupFactory
SimpleMessageGroup
GroupType.HASH_SET
LinkedHashSet
SYNCHRONISED_SET
BLOCKING_QUEUE
SimpleMessageGroup
PERSISTENT
LIST
持久性和延迟负载MessageGroupStore
从版本 4.3 开始,所有持久性实例都以延迟加载方式从存储中检索实例及其实例。
在大多数情况下,它对于关联实例(请参阅 Aggregator 和 Resequencer)很有用,因为它会增加开销以在每个关联操作上从存储中加载整个 ex。MessageGroupStore
MessageGroup
messages
MessageHandler
MessageGroup
您可以使用该选项从配置中关闭延迟加载行为。AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
我们在 MongoDB(MongoDB 消息存储库)和(聚合器)上对延迟加载的性能测试使用类似于以下内容的自定义:MessageStore
<aggregator>
release-strategy
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它为 1000 条简单消息生成类似于以下内容的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
但是,从版本 5.5 开始,所有持久性实现都提供基于目标数据库流式处理 API 的协定。
当存储中的组非常大时,这可以提高资源利用率。
在框架内部,这个新的 API 在 Delayer 中使用(例如),当它在启动时重新调度持久化消息时。
返回的必须在处理结束时关闭,例如通过自动关闭。
每当使用 a 时,其委托给 .MessageGroupStore
streamMessagesForGroup(Object groupId)
Stream<Message<?>>
try-with-resources
PersistentMessageGroup
streamMessages()
MessageGroupStore.streamMessagesForGroup()
消息组条件
从 5.5 版开始,抽象提供了一个字符串选项。
此选项的值可以是以后出于任何原因可以分析的任何内容,以便为组做出决策。
例如,来自关联消息处理程序的 a 可以从组中查询此属性,而不是循环访问组中的所有消息。
公开一个 API。
为此,已向 .
将每条消息添加到组后,将根据该消息以及该组的现有条件评估此函数。
实现可以决定返回一个新值(现有值)或将目标条件重置为 。
a 的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。
例如,from the File Aggregator 组件将条件从消息标头填充到组中,并通过将组大小与此条件中的值进行比较来咨询它。
这样,它就不会迭代组中的所有消息来查找带有标题的消息。
它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。MessageGroup
condition
ReleaseStrategy
MessageGroupStore
setGroupCondition(Object groupId, String condition)
setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
AbstractCorrelatingMessageHandler
null
condition
FileMarkerReleaseStrategy
FileHeaders.LINE_COUNT
FileSplitter.FileMarker.Mark.END
canRelease()
FileSplitter.FileMarker.Mark.END
FileHeaders.LINE_COUNT
此外,为了方便配置,还引入了合约。
检查提供的接口是否实现此接口,并提取组条件评估逻辑。GroupConditionProvider
AbstractCorrelatingMessageHandler
ReleaseStrategy
conditionSupplier