系统管理

系统管理

指标和管理

本节介绍如何捕获 Spring Integration 的指标。 在最近的版本中,我们更加依赖 Micrometer(参见 https://micrometer.io),我们计划在未来的版本中更多地使用 Micrometer。spring-doc.cadn.net.cn

在高容量环境中禁用日志记录

您可以在主消息流中控制调试日志记录。 在非常高容量的应用程序中,对isDebugEnabled()对于某些日志记录子系统来说,可能会非常昂贵。 您可以禁用所有此类日志记录以避免此开销。 异常日志记录 (debug 或其他) 不受此设置的影响。spring-doc.cadn.net.cn

以下清单显示了用于控制日志记录的可用选项:spring-doc.cadn.net.cn

Java
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
    defaultLoggingEnabled = "true" <1>)

public static class ContextConfiguration {
...
}
XML 格式
<int:management default-logging-enabled="true"/> (1)
1 设置为false以禁用主消息流中的所有日志记录,而不管 Log System Category (日志系统类别) 设置如何。 设置为 'true' 以启用调试日志记录(如果日志记录子系统也启用了)。 仅在未在 Bean 定义中显式配置设置时应用。 默认值为true.
defaultLoggingEnabled仅当尚未在 Bean 定义中显式配置相应的设置时,才适用。

千分尺集成

概述

从版本 5.0.3 开始,存在 Micrometer MeterRegistry在应用程序上下文中触发对 Micrometer 度量的支持。spring-doc.cadn.net.cn

要使用 Micrometer,请添加MeterRegistrybean 添加到应用程序上下文中。spring-doc.cadn.net.cn

对于每个MessageHandlerMessageChannel,则注册计时器。 对于每个MessageSource时,将注册一个计数器。spring-doc.cadn.net.cn

这仅适用于扩展AbstractMessageHandler,AbstractMessageChannelAbstractMessageSource(大多数框架组件都是这种情况)。spring-doc.cadn.net.cn

Timer消息通道上的发送作计量具有以下名称或标签:spring-doc.cadn.net.cn

(一个failureresult 替换为noneexception 表示通道的send()作返回false.)spring-doc.cadn.net.cn

Counter用于轮询消息通道上的接收作的计量器具有以下名称或标签:spring-doc.cadn.net.cn

Timer消息处理程序上的作计量器具有以下名称或标签:spring-doc.cadn.net.cn

Counter消息源的 meters 具有以下名称/标签:spring-doc.cadn.net.cn

此外,还有三个Gauge米:spring-doc.cadn.net.cn

可以自定义Meters由集成组件创建,方法是提供MicrometerMetricsCaptor. MicrometerCustomMetricsTests 测试用例显示了如何执行此作的简单示例。 您还可以通过重载build()方法。spring-doc.cadn.net.cn

从版本 5.1.13 开始,QueueChannel公开 Micrometer 仪表的队列大小和剩余容量:spring-doc.cadn.net.cn

禁用仪表

默认情况下,所有计量在首次使用时都会注册。 现在,使用 Micrometer,您可以添加MeterFilters 到MeterRegistry以防止部分或全部注册。 您可以按提供的任何属性过滤(拒绝)仪表。name,tag等。 有关更多信息,请参阅 Micrometer 文档中的 Meter Filtersspring-doc.cadn.net.cn

例如,给定:spring-doc.cadn.net.cn

@Bean
public QueueChannel noMeters() {
    return new QueueChannel(10);
}

您可以通过以下方式禁止此通道的计量注册:spring-doc.cadn.net.cn

registry.config().meterFilter(MeterFilter.deny(id ->
        "channel".equals(id.getTag("type")) &&
        "noMeters".equals(id.getTag("name"))));

千分尺观察

从版本 6.0 开始, Spring 集成利用了 Micrometer Observation 抽象,它可以处理度量以及通过适当的ObservationHandler配置。spring-doc.cadn.net.cn

IntegrationManagement组件,只要ObservationRegistrybean 存在于应用程序上下文中,并且@EnableIntegrationManagement已配置。 要自定义应插桩的组件集,请使用observationPatterns()属性在@EnableIntegrationManagement注解。 有关模式匹配算法,请参阅其 javadocs。spring-doc.cadn.net.cn

默认情况下,没有任何IntegrationManagement组件使用ObservationRegistry豆。 可以配置为匹配所有组件。*

在这种情况下,计量不是独立收集的,而是委托给适当的ObservationHandler在提供的ObservationRegistry.spring-doc.cadn.net.cn

以下 Spring 集成组件使用 observe logic 进行检测,每个组件都有各自的约定:spring-doc.cadn.net.cn

  • MessageProducerSupport作为流的入站终端节点,被视为CONSUMERspan 类型并使用IntegrationObservation.HANDLER应用程序接口;spring-doc.cadn.net.cn

  • MessagingGatewaySupport' 是一个入站请求-回复终端节点,被视为SERVERspan 类型。 它使用IntegrationObservation.GATEWAY应用程序接口;spring-doc.cadn.net.cn

  • AbstractMessageChannel.send()operation是唯一生成消息的 Spring 集成 API。 因此,它被视为PRODUCERspan 类型并使用IntegrationObservation.PRODCUER应用程序接口。 当通道是分布式实现时(例如PublishSubscribeKafkaChannelZeroMqChannel),并且必须将跟踪信息添加到消息中。 因此,IntegrationObservation.PRODUCER观察基于MessageSenderContext其中 Spring Integration 提供了一个MutableMessage允许后续跟踪Propagator添加标头,以便使用者可以使用它们;spring-doc.cadn.net.cn

  • AbstractMessageHandler是一个CONSUMERspan 类型并使用IntegrationObservation.HANDLER应用程序接口。spring-doc.cadn.net.cn

IntegrationManagement组件可以通过ObservationConvention配置。 例如,AbstractMessageHandler期望MessageReceiverObservationConvention通过其setObservationConvention()应用程序接口。spring-doc.cadn.net.cn

以下是观察 API 支持的指标、跨度和约定:spring-doc.cadn.net.cn

可观测性 - 指标

您可以在下面找到此项目声明的所有指标的列表。spring-doc.cadn.net.cn

网关

对入站消息网关的观察。spring-doc.cadn.net.cn

指标名称 spring.integration.gateway(由 convention 类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention).类型 timer.spring-doc.cadn.net.cn

指标名称 spring.integration.gateway.active(由 convention 类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention).类型 long task timer.spring-doc.cadn.net.cn

*.active 指标中可能缺少在开始观察后添加的 KeyValue。
千分尺内部使用nanoseconds对于 baseUnit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒)

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签都必须以spring.integration.前缀!
表 1.低基数键

spring.integration.name (必填)spring-doc.cadn.net.cn

消息网关组件的名称。spring-doc.cadn.net.cn

spring.integration.outcome (必填)spring-doc.cadn.net.cn

请求/回复执行的结果。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - 'gateway'。spring-doc.cadn.net.cn

处理器

消息处理程序的观察。spring-doc.cadn.net.cn

指标名称 spring.integration.handler(由 convention 类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).类型 timer.spring-doc.cadn.net.cn

指标名称 spring.integration.handler.active(由 convention 类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).类型 long task timer.spring-doc.cadn.net.cn

*.active 指标中可能缺少在开始观察后添加的 KeyValue。
千分尺内部使用nanoseconds对于 baseUnit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒)

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签都必须以spring.integration.前缀!
表 2.低基数键

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件的类型 - 'handler'。spring-doc.cadn.net.cn

制作人

对消息生产者(例如通道)的观察。spring-doc.cadn.net.cn

指标名称 spring.integration.producer(由 convention 类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).类型 timer.spring-doc.cadn.net.cn

指标名称 spring.integration.producer.active(由 convention 类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).类型 long task timer.spring-doc.cadn.net.cn

*.active 指标中可能缺少在开始观察后添加的 KeyValue。
千分尺内部使用nanoseconds对于 baseUnit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒)

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签都必须以spring.integration.前缀!
表 3.低基数键

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件的类型 - 'producer'。spring-doc.cadn.net.cn

可观测性 - Span

您可以在下面找到此项目声明的所有 span 的列表。spring-doc.cadn.net.cn

网关跨度

对入站消息网关的观察。spring-doc.cadn.net.cn

Span 名称 spring.integration.gateway(由 convention 类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention).spring-doc.cadn.net.cn

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签都必须以spring.integration.前缀!
表 4.标签键

名字spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

spring.integration.name (必填)spring-doc.cadn.net.cn

消息网关组件的名称。spring-doc.cadn.net.cn

spring.integration.outcome (必填)spring-doc.cadn.net.cn

请求/回复执行的结果。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件类型 - 'gateway'。spring-doc.cadn.net.cn

处理程序 Span

消息处理程序的观察。spring-doc.cadn.net.cn

Span 名称 spring.integration.handler(由 convention 类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).spring-doc.cadn.net.cn

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签都必须以spring.integration.前缀!
表 5.标签键

名字spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件的类型 - 'handler'。spring-doc.cadn.net.cn

生产者跨度

对消息生产者(例如通道)的观察。spring-doc.cadn.net.cn

Span 名称 spring.integration.producer(由 convention 类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).spring-doc.cadn.net.cn

封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation.spring-doc.cadn.net.cn

所有标签都必须以spring.integration.前缀!
表 6.标签键

名字spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

spring.integration.name (必填)spring-doc.cadn.net.cn

消息处理程序组件的名称。spring-doc.cadn.net.cn

spring.integration.type (必填)spring-doc.cadn.net.cn

组件的类型 - 'producer'。spring-doc.cadn.net.cn

可观测性 - 约定

您可以在下面找到所有GlobalObservationConventionObservationConvention由此项目声明。spring-doc.cadn.net.cn

表 7.ObservationConvention 实现

ObservationConvention 类名spring-doc.cadn.net.cn

适用的 ObservationContext 类名spring-doc.cadn.net.cn

o.s.i.support.management.observation.DefaultMessageReceiverObservationConventionspring-doc.cadn.net.cn

MessageReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.MessageReceiverObservationConventionspring-doc.cadn.net.cn

MessageReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConventionspring-doc.cadn.net.cn

MessageRequestReplyReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.MessageRequestReplyReceiverObservationConventionspring-doc.cadn.net.cn

MessageRequestReplyReceiverContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.DefaultMessageSenderObservationConventionspring-doc.cadn.net.cn

MessageSenderContextspring-doc.cadn.net.cn

o.s.i.support.management.observation.MessageSenderObservationConventionspring-doc.cadn.net.cn

MessageSenderContextspring-doc.cadn.net.cn

观测传播

为了在一个跟踪中提供连接的 span 链,独立于消息传递流的性质, Spring 集成提供了一个ObservationPropagationChannelInterceptor实现。 这可以在MessageChannnelbean 或作为@GlobalChannelInterceptor与各自的MessageChannnelBean 名称模式匹配。 此拦截器的目标是将Observation从生产者线程到使用者线程,独立于MessageChannnel实施和性质。 一个DirectChannel,但是,由于其使用者直接在生产者线程上执行。spring-doc.cadn.net.cn

Spring 集成 JMX 支持

消息历史记录

消息传递体系结构的主要优点是松散耦合,因此参与的组件不会保持对彼此的任何感知。 仅这一事实就使应用程序非常灵活,允许您在不影响流的其余部分的情况下更改组件、更改消息传递路由、更改消息使用样式(轮询与事件驱动)等等。 然而,当出现问题时,这种不起眼的架构风格可能会很困难。 在调试时,您可能希望获得尽可能多的有关消息的信息 (其来源、它遍历的通道和其他详细信息)。spring-doc.cadn.net.cn

Message History 是一种有用的模式,它为您提供了一个选项来保持对消息路径的某种程度的感知,用于调试目的或维护审计跟踪。 Spring 集成提供了一种简单的方法来配置消息流以维护消息历史记录,方法是向消息添加 Headers 并在每次消息通过跟踪组件时更新该 Header。spring-doc.cadn.net.cn

消息历史记录配置

要启用消息历史记录,您只需定义message-history元素(或@EnableMessageHistory),如以下示例所示:spring-doc.cadn.net.cn

Java
@Configuration
@EnableIntegration
@EnableMessageHistory
XML 格式
<int:message-history/>

现在,每个命名组件(定义了 'id')都会被跟踪。 框架会在您的消息中设置 'history' 标头。 它的价值 aList<Properties>.spring-doc.cadn.net.cn

请考虑以下配置示例:spring-doc.cadn.net.cn

Java
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
   ...
}

@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
    HeaderEnricher enricher =
           new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
    return enricher;
}
XML 格式
<int:gateway id="sampleGateway"
    service-interface="org.springframework.integration.history.sample.SampleGateway"
    default-request-channel="bridgeInChannel"/>

<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
    <int:header name="baz" value="baz"/>
</int:header-enricher>

前面的配置生成一个简单的消息历史记录结构,其输出类似于以下内容:spring-doc.cadn.net.cn

[{name=sampleGateway, type=gateway, timestamp=1283281668091},
 {name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]

要访问消息历史记录,您只需访问MessageHistory页眉。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

Iterator<Properties> historyIterator =
    message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));

您可能不想跟踪所有组件。 要根据名称将历史记录限制为某些组件,您可以提供tracked-components属性,并指定与要跟踪的组件匹配的组件名称和模式的逗号分隔列表。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

Java
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
XML 格式
<int:message-history tracked-components="*Gateway, sample*, aName"/>

在前面的示例中,仅维护以“Gateway”结尾、以“sample”开头或与名称“aName”完全匹配的组件的消息历史记录。spring-doc.cadn.net.cn

此外,MessageHistoryConfigurerbean 现在由IntegrationMBeanExporter(请参阅 MBean Exporter),允许您在运行时更改模式。 但是请注意,必须停止 Bean (关闭消息历史记录) 才能更改模式。 此功能对于临时打开历史记录以分析系统可能很有用。 MBean 的对象名称为<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer.spring-doc.cadn.net.cn

只有一个@EnableMessageHistory(或<message-history/>) 必须在应用程序上下文中声明为 Components tracking Configuration 的单一来源。 不要对MessageHistoryConfigurer.
根据定义,消息历史记录标头是不可变的(您不能重写历史记录)。 因此,在写入消息历史记录值时,组件要么创建新消息(当组件是源时),要么从请求消息中复制历史记录,对其进行修改并在回复消息上设置新列表。 在任一情况下,即使消息本身跨越线程边界,也可以追加这些值。 这意味着 history 值可以极大地简化异步消息流中的调试。

消息存储

Enterprise Integration Patterns (EIP) 一书确定了几种能够缓冲消息的模式。 例如,聚合器会缓冲消息,直到它们可以被释放,而QueueChannelbuffers 消息,直到使用者显式地从该通道接收这些消息。 由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的点。spring-doc.cadn.net.cn

为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久性存储(例如 RDBMS)中。spring-doc.cadn.net.cn

Spring 集成通过以下方式为消息存储模式提供支持:spring-doc.cadn.net.cn

有关如何配置特定消息存储实现以及如何注入MessageStore实现到特定的缓冲组件中都有介绍(请参阅特定组件,例如 QueueChannelAggregatorDelayer 等)。 以下一对示例显示了如何为QueueChannel对于聚合商:spring-doc.cadn.net.cn

示例 1.QueueChannel 队列频道
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
示例 2.聚合
<int:aggregator … message-store="refToMessageStore"/>

默认情况下,消息使用o.s.i.store.SimpleMessageStore,则MessageStore. 这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失并不是一个问题。 但是,典型的生产应用程序需要一个更健壮的选项,这不仅是为了降低消息丢失的风险,也是为了避免潜在的内存不足错误。 因此,我们还提供MessageStore各种数据存储的实现。 以下是支持的实施的完整列表:spring-doc.cadn.net.cn

但是,在使用MessageStore.spring-doc.cadn.net.cn

Message 数据(payload 和 headers)使用不同的序列化策略进行序列化和反序列化,具体取决于MessageStore. 例如,使用JdbcMessageStoreSerializable默认情况下,数据是持久的。 在这种情况下,在序列化发生之前会删除 non-Serializable headers。 此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的特定于协议的标头。 例如<http:inbound-channel-adapter/>将 HTTP 标头映射到消息标头,其中一个是ArrayListof 不可序列化org.springframework.http.MediaType实例。 但是,您可以注入自己的SerializerDeserializerstrategy 接口转换为一些MessageStore实现(例如JdbcMessageStore) 更改序列化和反序列化的行为。spring-doc.cadn.net.cn

请特别注意表示某些类型数据的标头。 例如,如果其中一个 Headers 包含某个 Spring Bean 的实例,则在反序列化时,你最终可能会得到该 Bean 的不同实例,这会直接影响框架创建的一些隐式 Headers(例如REPLY_CHANNELERROR_CHANNEL). 目前,它们是不可序列化的,但是,即使它们是可序列化的,反序列化的通道也不会表示预期的实例。spring-doc.cadn.net.cn

从 Spring 集成版本 3.0 开始,你可以通过配置一个 Headers Enricher 来解决这个问题,以便在向HeaderChannelRegistry.spring-doc.cadn.net.cn

此外,请考虑按如下方式配置 message-flow 时会发生什么:gateway → queue-channel(由持久 Message Store 支持)→ service-activator。 该网关会创建一个临时回复通道,当服务激活器的 Poller 从队列中读取时,该通道将丢失。 同样,您可以使用标头扩充器将标头替换为String表示法。spring-doc.cadn.net.cn

有关更多信息,请参阅 Header Enricherspring-doc.cadn.net.cn

Spring Integration 4.0 引入了两个新接口:spring-doc.cadn.net.cn

  • ChannelMessageStore:要实现特定于QueueChannel实例spring-doc.cadn.net.cn

  • PriorityCapableChannelMessageStore:标记MessageStore要用于的 implementations to use forPriorityChannel实例,并为持久消息提供优先级顺序。spring-doc.cadn.net.cn

实际行为取决于实现。 该框架提供了以下实现,这些实现可以用作持久MessageStoreQueueChannelPriorityChannel:spring-doc.cadn.net.cn

注意事项SimpleMessageStore

从版本 4.1 开始,SimpleMessageStore调用getMessageGroup(). 对于大型消息组,这是一个严重的性能问题。 4.0.1 引入了布尔值copyOnGet允许您控制此行为的属性。 当聚合器在内部使用时,此属性设置为false以提高性能。 现在是false默认情况下。spring-doc.cadn.net.cn

现在,在组件(如聚合器)之外访问组存储的用户可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部纵组可能会导致不可预知的结果。spring-doc.cadn.net.cn

因此,您不应该执行此类作,或者将copyOnGetproperty 设置为true.spring-doc.cadn.net.cn

MessageGroupFactory

从版本 4.3 开始,一些MessageGroupStore实现可以注入自定义的MessageGroupFactory策略来创建和自定义MessageGroup实例MessageGroupStore. 这默认为SimpleMessageGroupFactory,它会产生SimpleMessageGroup基于GroupType.HASH_SET (LinkedHashSet) internal 集合。 其他可能的选项包括SYNCHRONISED_SETBLOCKING_QUEUE,其中最后一个可用于恢复之前的SimpleMessageGroup行为。 此外,PERSISTENT选项可用。 有关更多信息,请参阅下一节。 从版本 5.0.1 开始,LIST当组中消息的顺序和唯一性无关紧要时,选项也可用。spring-doc.cadn.net.cn

持续MessageGroupStore和 Lazy-load

从版本 4.3 开始,所有 persistentMessageGroupStore实例检索MessageGroup实例及其messages以 lazy-load 方式从 store 中。 在大多数情况下,它对关联MessageHandler实例(请参阅 AggregatorResequencer),这会增加加载整个MessageGroup从存储中执行每个关联作。spring-doc.cadn.net.cn

您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)选项从配置中关闭 Lazy-load 行为。spring-doc.cadn.net.cn

我们对 MongoDB 上的延迟加载进行的性能测试MessageStore (MongoDB 消息存储)和<aggregator> (聚合器)使用自定义release-strategy类似于以下内容:spring-doc.cadn.net.cn

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

对于 1000 条简单消息,它会生成类似于以下内容的结果:spring-doc.cadn.net.cn

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,从版本 5.5 开始,所有持久性MessageGroupStoreimplementations 提供了一个streamMessagesForGroup(Object groupId)Contract 的 Contract。 当 store 中的组非常大时,这可以提高资源利用率。 在框架内部,当 Delayer 在启动时重新安排持久消息时,将使用此新 API(例如)。 返回的Stream<Message<?>>必须在处理结束时关闭,例如通过try-with-resources. 每当PersistentMessageGroup,则其streamMessages()delegates 到MessageGroupStore.streamMessagesForGroup().spring-doc.cadn.net.cn

消息组条件

从版本 5.5 开始,MessageGroupabstraction 提供了一个conditionstring 选项。 此选项的值可以是以后可以出于任何原因解析以为组做出决策的任何内容。 例如,一个ReleaseStrategy关联消息处理程序可以从组中查询此属性,而不是迭代组中的所有消息。 这MessageGroupStore暴露一个setGroupCondition(Object groupId, String condition)应用程序接口。 为此,一个setGroupConditionSupplier(BiFunction<Message<?>, String, String>)选项已添加到AbstractCorrelatingMessageHandler. 在将每条消息添加到组后,将根据每条消息以及组的现有条件评估此函数。 实现可以决定返回新值、现有值或将目标条件重置为null. 的值condition可以是 JSON、SPEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。 例如,FileMarkerReleaseStrategyFile Aggregator 组件中,将条件填充到一个组中FileHeaders.LINE_COUNT标头的FileSplitter.FileMarker.Mark.END消息并从其canRelease()将组大小与此条件中的值进行比较。 这样,它不会迭代组中的所有消息来查找FileSplitter.FileMarker.Mark.ENDmessage 替换为FileHeaders.LINE_COUNT页眉。 它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。spring-doc.cadn.net.cn

此外,为了便于配置,一个GroupConditionProvider合同。 这AbstractCorrelatingMessageHandler检查提供的ReleaseStrategy实现此接口并提取conditionSupplier用于组条件评估逻辑。spring-doc.cadn.net.cn

元数据存储

许多外部系统、服务或资源不是事务性的(Twitter、RSS、文件系统等),并且无法将数据标记为已读。 此外,有时,您可能需要在某些集成解决方案中实现 Enterprise Integration Pattern 幂等接收器。 为了实现这个目标,并在下一次与外部系统交互之前存储端点的一些先前状态,或者为了处理下一条消息, Spring 集成提供了元数据存储组件作为org.springframework.integration.metadata.MetadataStore具有通用键值协定的接口。spring-doc.cadn.net.cn

元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个源条目的发布日期),以帮助源适配器等组件处理重复项。 如果组件没有直接提供对MetadataStore中,查找元数据存储的算法如下:首先,查找具有metadataStoreID 中的应用程序上下文。 如果找到,请使用它。 否则,请创建一个新的SimpleMetadataStore,这是一种内存中实现,仅在当前运行的应用程序上下文的生命周期内保留元数据。 这意味着,在重新启动时,您最终可能会得到重复的条目。spring-doc.cadn.net.cn

如果您需要在应用程序上下文重新启动之间保留元数据,框架会提供以下持久性MetadataStores:spring-doc.cadn.net.cn

PropertiesPersistingMetadataStore由属性文件和PropertiesPersister.spring-doc.cadn.net.cn

默认情况下,它仅在应用程序上下文正常关闭时保留状态。 它实现了Flushable这样您就可以随意持久化状态,通过调用flush(). 以下示例显示了如何使用 XML 配置 'PropertiesPersistingMetadataStore':spring-doc.cadn.net.cn

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

或者,您也可以提供自己的MetadataStore接口(例如JdbcMetadataStore),并在应用程序上下文中将其配置为 Bean。spring-doc.cadn.net.cn

从版本 4.0 开始,SimpleMetadataStore,PropertiesPersistingMetadataStoreRedisMetadataStore实现ConcurrentMetadataStore. 这些实例提供原子更新,并且可以跨多个组件或应用程序实例使用。spring-doc.cadn.net.cn

幂等接收器和元数据存储

当需要筛选传入消息(如果已处理)时,元数据存储对于实施 EIP 幂等接收器模式非常有用,您可以丢弃该消息或在丢弃时执行一些其他逻辑。 以下配置显示了如何执行此作的示例:spring-doc.cadn.net.cn

<int:filter input-channel="serviceChannel"
			output-channel="idempotentServiceChannel"
			discard-channel="discardChannel"
			expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

value的 Timeout Entry可能是一个过期日期,在此之后,该条目应由某个预定的收割者从元数据存储中删除。spring-doc.cadn.net.cn

MetadataStoreListener

一些元数据存储(目前只有 zookeeper)支持注册侦听器以在项目更改时接收事件,如下例所示:spring-doc.cadn.net.cn

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

有关更多信息,请参阅 Javadoc。 这MetadataStoreListenerAdapter如果您只对事件的子集感兴趣,则可以将其子类化。spring-doc.cadn.net.cn

控制总线

正如 Enterprise Integration Patterns (EIP) 一书中描述的,控制总线背后的思想是,可以使用与用于 “应用程序级” 消息传递相同的消息传递系统来监控和管理框架内的组件。 在 Spring 集成中,我们构建在上述适配器的基础上,以便您可以发送消息作为调用公开作的一种方式。spring-doc.cadn.net.cn

以下示例说明如何使用 XML 配置控制总线:spring-doc.cadn.net.cn

<int:control-bus input-channel="operationChannel"/>

控制总线有一个 Importing 通道,可以访问该通道以调用应用程序上下文中的 bean 上的作。 它还具有服务激活终端节点的所有通用属性。 例如,如果作结果具有要发送到下游通道的返回值,则可以指定输出通道。spring-doc.cadn.net.cn

控制总线将 input 通道上的消息作为 Spring 表达式语言 (SpEL) 表达式运行。 它接受一条消息,将 body 编译为表达式,添加一些上下文,然后运行它。 默认上下文支持已使用@ManagedAttribute@ManagedOperation. 它还支持 Spring 的Lifecycle接口(及其Pausable扩展),并且它支持用于配置 Spring 的多个TaskExecutorTaskScheduler实现。 确保您自己的方法可用于控制总线的最简单方法是使用@ManagedAttribute@ManagedOperation附注。 由于这些注解也用于向 JMX MBean 注册表公开方法,因此它们提供了一个方便的副产品:通常,您希望向控制总线公开的相同类型的作对于通过 JMX 公开是合理的。 应用程序上下文中任何特定实例的解析都是在典型的 SPEL 语法中实现的。 为此,请为 bean name 提供 bean 的 SPEL 前缀 () 。 例如,要在 Spring Bean 上执行方法,客户端可以向作通道发送消息,如下所示:@spring-doc.cadn.net.cn

Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)

表达式的上下文根是Message本身,因此您还可以访问payloadheaders作为变量。 这与 Spring 集成端点中的所有其他表达式支持一致。spring-doc.cadn.net.cn

使用 Java 注释,您可以按如下方式配置控制总线:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

同样,您可以按如下方式配置 Java DSL 流定义:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from("controlBus")
              .controlBus()
              .get();
}

如果您更喜欢将 lambda 表达式与自动DirectChannel创建,您可以按如下方式创建 Control Bus:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

在这种情况下,通道被命名为controlBus.input.spring-doc.cadn.net.cn

有序关机

如“MBean 导出器”中所述,MBean 导出器提供了一个名为stopActiveComponents,用于有序停止应用程序。 该作具有单个Long参数。 该参数指示作等待多长时间(以毫秒为单位)以允许正在进行的消息完成。 该作的工作原理如下:spring-doc.cadn.net.cn

  1. beforeShutdown()在所有实现OrderlyShutdownCapable.spring-doc.cadn.net.cn

    这样做可以让此类组件为关闭做好准备。 实现此接口的组件示例以及它们对此调用执行的作包括 JMS 和 AMQP 消息驱动的适配器(用于停止其侦听器容器)、停止接受新连接(同时保持现有连接打开)的 TCP 服务器连接工厂、丢弃(记录)收到的任何新消息的 TCP 入站端点以及返回503 - Service Unavailable对于任何新请求。spring-doc.cadn.net.cn

  2. 停止任何活动通道,例如 JMS 或 AMQP 支持的通道。spring-doc.cadn.net.cn

  3. 全部停止MessageSource实例。spring-doc.cadn.net.cn

  4. 停止所有入站MessageProducers(不是OrderlyShutdownCapable).spring-doc.cadn.net.cn

  5. 等待剩余的任何剩余时间,如Long参数。spring-doc.cadn.net.cn

    这样做可以让任何正在进行的消息完成其旅程。 因此,在调用此作时,选择适当的超时非常重要。spring-doc.cadn.net.cn

  6. afterShutdown()在所有OrderlyShutdownCapable组件。spring-doc.cadn.net.cn

    这样做可以让这些组件执行最终的关闭任务(例如,关闭所有打开的套接字)。spring-doc.cadn.net.cn

Orderly Shutdown Managed Operation 中所述,可以使用 JMX 调用此作。 如果您希望以编程方式调用该方法,则需要注入或以其他方式获取对IntegrationMBeanExporter. 如果没有id属性在<int-jmx:mbean-export/>definition,则 bean 具有生成的名称。 此名称包含要避免的随机组件ObjectName如果同一 JVM 中存在多个 Spring 集成上下文(MBeanServer).spring-doc.cadn.net.cn

因此,如果您希望以编程方式调用该方法,我们建议您为导出器提供id属性,以便您可以在应用程序上下文中轻松访问它。spring-doc.cadn.net.cn

最后,可以使用<control-bus>元素。 有关详细信息,请参阅监视 Spring 集成示例应用程序spring-doc.cadn.net.cn

前面描述的算法在版本 4.1 中得到了改进。 以前,所有任务执行程序和计划程序都已停止。 这可能会导致QueueChannel要保留的实例。 现在关闭使 poller 保持运行状态,以便排空和处理这些消息。

集成图

从版本 4.3 开始, Spring 集成提供了对应用程序的运行时对象模型的访问,该模型可以选择包括组件度量。 它以图表的形式公开,可用于可视化集成应用程序的当前状态。 这o.s.i.support.management.graphpackage 包含收集、构建和渲染 Spring 集成组件的运行时状态所需的所有类Graph对象。 这IntegrationGraphServer应声明为 Bean 来构建、检索和刷新Graph对象。 结果Graphobject 可以序列化为任何格式,尽管 JSON 在客户端解析和表示起来很灵活和方便。 仅具有默认组件的 Spring 集成应用程序将公开一个图形,如下所示:spring-doc.cadn.net.cn

{
  "contentDescriptor" : {
    "providerVersion" : "6.0.9",
    "providerFormatVersion" : 1.2,
    "provider" : "spring-integration",
    "name" : "myAppName:1.0"
  },
  "nodes" : [ {
    "nodeId" : 1,
    "componentType" : "null-channel",
    "integrationPatternType" : "null_channel",
    "integrationPatternCategory" : "messaging_channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 0.0,
        "max" : 0.0
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "receiveCounters" : {
      "successes" : 0,
      "failures" : 0
    },
    "name" : "nullChannel"
  }, {
    "nodeId" : 2,
    "componentType" : "publish-subscribe-channel",
    "integrationPatternType" : "publish_subscribe_channel",
    "integrationPatternCategory" : "messaging_channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 7.807002,
        "max" : 7.807002
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorChannel"
  }, {
    "nodeId" : 3,
    "componentType" : "logging-channel-adapter",
    "integrationPatternType" : "outbound_channel_adapter",
    "integrationPatternCategory" : "messaging_endpoint",
    "properties" : { },
    "output" : null,
    "input" : "errorChannel",
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 6.742722,
        "max" : 6.742722
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorLogger"
  } ],
  "links" : [ {
    "from" : 2,
    "to" : 3,
    "type" : "input"
  } ]
}
版本 5.2 弃用了旧指标,转而使用千分尺,如 Metrics Management 中所述。 旧版指标已在版本 5.4 中删除,将不再显示在图表中。

在前面的示例中,图形由三个顶级元素组成。spring-doc.cadn.net.cn

contentDescriptorgraph 元素包含有关提供数据的应用程序的一般信息。 这name可以在IntegrationGraphServerbean 或spring.application.nameapplication context environment 属性。 框架提供了其他属性,允许您将类似模型与其他源区分开来。spring-doc.cadn.net.cn

linksgraph 元素表示节点之间的连接,来自nodesgraph 元素,因此,在源 Spring Integration 应用程序中的集成组件之间。 例如,从MessageChannel更改为EventDrivenConsumer与一些MessageHandler或从AbstractReplyProducingMessageHandler更改为MessageChannel. 为了方便起见并让您确定链接的用途,该模型包括type属性。 可能的类型包括:spring-doc.cadn.net.cn

  • input:标识方向MessageChannel到终端节点,inputChannelrequestChannel财产spring-doc.cadn.net.cn

  • output:从MessageHandler,MessageProducerSourcePollingChannelAdapterMessageChannel通过outputChannelreplyChannel财产spring-doc.cadn.net.cn

  • error:从MessageHandlerPollingConsumerMessageProducerSourcePollingChannelAdapterMessageChannel通过errorChannel财产;spring-doc.cadn.net.cn

  • discard:从DiscardingMessageHandler(例如MessageFilter) 到MessageChannel通过errorChannel财产。spring-doc.cadn.net.cn

  • route:从AbstractMappingMessageRouter(例如HeaderValueRouter) 到MessageChannel. 似output但在运行时确定。 可能是配置的 channel mapping 或动态解析的 channel。 为此,路由器通常最多只保留 100 个动态路由,但您可以通过设置dynamicChannelLimit财产。spring-doc.cadn.net.cn

可视化工具可以使用此元素中的信息来呈现节点之间的连接,这些节点来自nodesgraph 元素,其中fromto数字表示nodeId属性。 例如,link元素可用于确定适当的port在目标节点上。spring-doc.cadn.net.cn

下面的 “text image” 显示了类型之间的关系:spring-doc.cadn.net.cn

              +---(discard)
              |
         +----o----+
         |         |
         |         |
         |         |
(input)--o         o---(output)
         |         |
         |         |
         |         |
         +----o----+
              |
              +---(error)

nodesgraph 元素可能是最有趣的,因为它的元素不仅包含运行时组件及其componentTypeinstances 和name值,但也可以选择包含组件公开的指标。 Node 元素包含各种属性,这些属性通常一目了然。 例如,基于表达式的组件包括expression包含组件的主表达式字符串的属性。 要启用这些指标,请添加@EnableIntegrationManagement更改为@Configuration类或添加<int:management/>元素添加到 XML 配置中。 有关完整信息,请参阅 Metrics and Managementspring-doc.cadn.net.cn

nodeId表示唯一的增量标识符,以便将一个组件与另一个组件区分开来。 它还用于links元素来表示此组件与其他组件的关系(连接)(如果有)。 这inputoutputattributes 用于inputChanneloutputChannel的属性AbstractEndpoint,MessageHandler,SourcePollingChannelAdapterMessageProducerSupport. 有关更多信息,请参阅下一节。spring-doc.cadn.net.cn

从版本 5.1 开始,IntegrationGraphServer接受Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback对于IntegrationNode对于特定的NamedComponent. 例如,您可以公开SmartLifecycle autoStartuprunning属性添加到目标图中:spring-doc.cadn.net.cn

server.setAdditionalPropertiesCallback(namedComponent -> {
            Map<String, Object> properties = null;
            if (namedComponent instanceof SmartLifecycle) {
                SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
                properties = new HashMap<>();
                properties.put("auto-startup", smartLifecycle.isAutoStartup());
                properties.put("running", smartLifecycle.isRunning());
            }
            return properties;
        });

Graph 运行时模型

Spring 集成组件具有不同的复杂程度。 例如,任何轮询的MessageSource还有一个SourcePollingChannelAdapter以及MessageChannel定期从源数据向其发送消息。 其他组件可能是中间件请求-回复组件(例如JmsOutboundGateway) 替换为AbstractEndpoint要订阅(或轮询)该requestChannel (input) 用于消息,并使用replyChannel (output) 生成要向下游发送的回复消息。 同时,任何MessageProducerSupportimplementation (例如ApplicationEventListeningMessageProducer) 包装一些源协议侦听逻辑,并将消息发送到outputChannel.spring-doc.cadn.net.cn

在图中, Spring 集成组件通过使用IntegrationNode类层次结构,您可以在o.s.i.support.management.graph包。 例如,您可以使用ErrorCapableDiscardingMessageHandlerNode对于AggregatingMessageHandler(因为它有一个discardChannel选项),并且在从PollableChannel通过使用PollingConsumer. 另一个例子是CompositeMessageHandlerNode— 对于MessageHandlerChain当订阅了SubscribableChannel通过使用EventDrivenConsumer.spring-doc.cadn.net.cn

@MessagingGateway(请参阅 Messaging Gateway)为其每个方法提供节点,其中name属性基于网关的 bean 名称和短方法签名。 请考虑以下网关示例:
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {

	void foo(String foo);

	void foo(Integer foo);

	void bar(String bar);

}

前面的网关生成类似于以下内容的节点:spring-doc.cadn.net.cn

{
  "nodeId" : 10,
  "name" : "gate.bar(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 11,
  "name" : "gate.foo(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 12,
  "name" : "gate.foo(class java.lang.Integer)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
}

您可以使用这个IntegrationNode层次结构,用于解析 Client 端的图形模型以及了解一般的 Spring 集成运行时行为。 有关更多信息,另请参阅编程提示和技巧spring-doc.cadn.net.cn

版本 5.3 引入了IntegrationPatternabstraction 和所有代表企业集成模式 (EIP) 的开箱即用组件实现此抽象并提供IntegrationPatternTypeenum 值。 此信息对于目标应用程序中的某些分类逻辑非常有用,或者,如果公开到图形节点中,则 UI 可以使用此信息来确定如何绘制组件。spring-doc.cadn.net.cn

集成图控制器

如果您的应用程序是基于 Web 的(或构建在带有嵌入式 Web 容器的 Spring Boot 之上),并且 Spring 集成 HTTP 或 WebFlux 模块(分别参见 HTTP 支持WebFlux 支持)存在于类路径上,则可以使用IntegrationGraphController要公开IntegrationGraphServer功能作为 REST 服务。 为此,@EnableIntegrationGraphController@Configuration类注释和<int-http:graph-controller/>XML 元素在 HTTP 模块中可用。 与@EnableWebMvcannotation(或<mvc:annotation-driven/>对于 XML 定义),此配置会注册一个IntegrationGraphController @RestController其中@RequestMapping.path可以在@EnableIntegrationGraphControllerannotation 或<int-http:graph-controller/>元素。 默认路径为/integration.spring-doc.cadn.net.cn

IntegrationGraphController @RestController提供以下服务:spring-doc.cadn.net.cn

  • @GetMapping(name = "getGraph"):检索自上次IntegrationGraphServer刷新。 这o.s.i.support.management.graph.Graph作为@ResponseBody的 REST 服务。spring-doc.cadn.net.cn

  • @GetMapping(path = "/refresh", name = "refreshGraph"):刷新当前Graph以获取实际的运行时状态,并将其作为 REST 响应返回。 无需刷新指标的图表。 检索图形时,将实时提供这些指标。 如果自上次检索图形以来修改了应用程序上下文,则可以调用 Refresh。 在这种情况下,图形将完全重新构建。spring-doc.cadn.net.cn

您可以为IntegrationGraphController使用 Spring Security 和 Spring MVC 项目提供的标准配置选项和组件。 以下示例可实现这些目标:spring-doc.cadn.net.cn

<mvc:annotation-driven />

<mvc:cors>
	<mvc:mapping path="/myIntegration/**"
				 allowed-origins="http://localhost:9090"
				 allowed-methods="GET" />
</mvc:cors>

<security:http>
    <security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>


<int-http:graph-controller path="/myIntegration" />

以下示例显示了如何对 Java 配置执行相同的作:spring-doc.cadn.net.cn

@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
	    http
            .authorizeRequests()
               .antMatchers("/testIntegration/**").hasRole("ADMIN")
            // ...
            .formLogin();
    }

    //...

}

请注意,为方便起见,@EnableIntegrationGraphControllerannotation 提供了一个allowedOrigins属性。 这提供了GET访问path. 为了更复杂,你可以使用标准的 Spring MVC 机制来配置 CORS 映射。spring-doc.cadn.net.cn