消息存储

Enterprise Integration Patterns (EIP) 一书确定了几种能够缓冲消息的模式。 例如,聚合器会缓冲消息,直到它们可以被释放,而 会缓冲消息,直到使用者明确地从该通道接收这些消息。 由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的点。QueueChannelspring-doc.cn

为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久性存储(例如 RDBMS)中。spring-doc.cn

Spring 集成通过以下方式为消息存储模式提供支持:spring-doc.cn

  • 定义策略接口org.springframework.integration.store.MessageStorespring-doc.cn

  • 提供此接口的多种实现spring-doc.cn

  • 在具有缓冲消息功能的所有组件上公开属性,以便您可以注入实现该接口的任何实例。message-storeMessageStorespring-doc.cn

手册中详细介绍了如何配置特定的消息存储实现以及如何将实现注入到特定的缓冲组件中(请参阅特定组件,例如 QueueChannelAggregatorDelayer 等)。 以下一对示例显示了如何为 和 聚合器添加对消息存储的引用:MessageStoreQueueChannelspring-doc.cn

QueueChannel 队列频道
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
聚合
<int:aggregator message-store="refToMessageStore"/>

默认情况下,消息使用 、 的实现存储在内存中。 这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失并不是一个问题。 但是,典型的生产应用程序需要一个更健壮的选项,这不仅是为了降低消息丢失的风险,也是为了避免潜在的内存不足错误。 因此,我们还为各种数据存储提供了实现。 以下是支持的实施的完整列表:o.s.i.store.SimpleMessageStoreMessageStoreMessageStorespring-doc.cn

但是,在使用 .MessageStorespring-doc.cn

Message 数据(payload 和 headers)使用不同的序列化策略进行序列化和反序列化,具体取决于 . 例如,使用 时,默认情况下仅保留数据。 在这种情况下,在序列化发生之前会删除 non-Serializable headers。 此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的特定于协议的标头。 例如,将 HTTP 标头映射到消息标头,其中一个是不可序列化的实例。 但是,您可以将自己的 和 strategy 接口的实现注入到某些实现(例如 )中,以更改序列化和反序列化的行为。MessageStoreJdbcMessageStoreSerializable<http:inbound-channel-adapter/>ArrayListorg.springframework.http.MediaTypeSerializerDeserializerMessageStoreJdbcMessageStorespring-doc.cn

请特别注意表示某些类型数据的标头。 例如,如果其中一个 Headers 包含某个 Spring bean 的实例,则在反序列化时,您最终可能会得到该 Bean 的不同实例,这直接影响框架创建的某些隐式 Headers(例如 or )。 目前,它们是不可序列化的,但是,即使它们是可序列化的,反序列化的通道也不会表示预期的实例。REPLY_CHANNELERROR_CHANNELspring-doc.cn

从 Spring 集成版本 3.0 开始,您可以通过配置标头丰富器来解决此问题,以便在向 .HeaderChannelRegistryspring-doc.cn

此外,请考虑按如下方式配置 message-flow 时会发生什么:gateway → queue-channel(由持久 Message Store 支持)→ service-activator。 该网关会创建一个临时回复通道,当服务激活器的 Poller 从队列中读取时,该通道将丢失。 同样,您可以使用 header enricher 将 Headers 替换为表示形式。Stringspring-doc.cn

有关更多信息,请参阅 Header Enricherspring-doc.cn

Spring Integration 4.0 引入了两个新接口:spring-doc.cn

  • ChannelMessageStore:实现特定于实例的操作QueueChannelspring-doc.cn

  • PriorityCapableChannelMessageStore:标记要用于实例的实现,并为持久化消息提供优先级顺序。MessageStorePriorityChannelspring-doc.cn

实际行为取决于实现。 该框架提供了以下实现,这些实现可用作 和 的持久性 :MessageStoreQueueChannelPriorityChannelspring-doc.cn

注意事项SimpleMessageStore

从版本 4.1 开始,在调用 . 对于大型消息组,这是一个严重的性能问题。 4.0.1 引入了一个 boolean 属性,可让您控制此行为。 当聚合器在内部使用时,此属性设置为以提高性能。 现在是默认的。SimpleMessageStoregetMessageGroup()copyOnGetfalsefalsespring-doc.cn

现在,在组件(如聚合器)之外访问组存储的用户可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部操纵组可能会导致不可预知的结果。spring-doc.cn

因此,您不应执行此类操作或将属性设置为 。copyOnGettruespring-doc.cn

MessageGroupFactory

从版本 4.3 开始,一些实现可以注入自定义策略来创建和自定义 . 这默认为 a ,它基于 () 内部集合生成实例。 其他可能的选项包括 和 ,其中最后一个选项可用于恢复之前的行为。 此外,该选项也可用。 有关更多信息,请参阅下一节。 从版本 5.0.1 开始,当组中消息的顺序和唯一性无关紧要时,该选项也可用。MessageGroupStoreMessageGroupFactoryMessageGroupMessageGroupStoreSimpleMessageGroupFactorySimpleMessageGroupGroupType.HASH_SETLinkedHashSetSYNCHRONISED_SETBLOCKING_QUEUESimpleMessageGroupPERSISTENTLISTspring-doc.cn

持久加载和延迟加载MessageGroupStore

从版本 4.3 开始,所有持久性实例都以延迟加载方式从存储中检索实例及其实例。 在大多数情况下,它对于关联实例(请参阅 AggregatorResequencer)很有用,此时它会增加在每个关联操作上从存储加载整个的开销。MessageGroupStoreMessageGroupmessagesMessageHandlerMessageGroupspring-doc.cn

您可以使用该选项从配置中关闭延迟加载行为。AbstractMessageGroupStore.setLazyLoadMessageGroups(false)spring-doc.cn

我们对 MongoDB (MongoDB Message Store) 和 (Aggregator) 上的延迟加载性能测试使用类似于以下内容的自定义:MessageStore<aggregator>release-strategyspring-doc.cn

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

对于 1000 条简单消息,它会生成类似于以下内容的结果:spring-doc.cn

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,从版本 5.5 开始,所有持久化实现都提供基于目标数据库流式处理 API 的协定。 当 store 中的组非常大时,这可以提高资源利用率。 在框架内部,当 Delayer 在启动时重新安排持久消息时,将使用此新 API(例如)。 返回的 A 必须在处理结束时关闭,例如通过 . 每当使用 a 时,它都会委托给 .MessageGroupStorestreamMessagesForGroup(Object groupId)Stream<Message<?>>try-with-resourcesPersistentMessageGroupstreamMessages()MessageGroupStore.streamMessagesForGroup()spring-doc.cn

消息组条件

从版本 5.5 开始,抽象提供了一个 string 选项。 此选项的值可以是以后可以出于任何原因解析以为组做出决策的任何内容。 例如,来自关联消息处理程序的 a 可以从组中查阅此属性,而不是迭代组中的所有消息。 这暴露了一个 API。 为此,已向 . 在将每条消息添加到组后,将根据每条消息以及组的现有条件评估此函数。 实现可以决定返回新值、现有值或将目标条件重置为 。 的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。 例如,从 File Aggregator 组件,将条件从消息的标头填充到组中,并通过将组大小与此条件中的值进行比较来咨询该条件。 这样,它不会迭代组中的所有消息来查找带有 header 的消息。 它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。MessageGroupconditionReleaseStrategyMessageGroupStoresetGroupCondition(Object groupId, String condition)setGroupConditionSupplier(BiFunction<Message<?>, String, String>)AbstractCorrelatingMessageHandlernullconditionFileMarkerReleaseStrategyFileHeaders.LINE_COUNTFileSplitter.FileMarker.Mark.ENDcanRelease()FileSplitter.FileMarker.Mark.ENDFileHeaders.LINE_COUNTspring-doc.cn

此外,为了便于配置,还引入了一个 Contract。 检查提供的是否实现了此接口,并提取了 for group 条件评估逻辑。GroupConditionProviderAbstractCorrelatingMessageHandlerReleaseStrategyconditionSupplierspring-doc.cn