系统管理
指标和管理
本节介绍如何捕获 Spring Integration 的指标。 在最近的版本中,我们更加依赖 Micrometer(参见 https://micrometer.io),我们计划在未来的版本中更多地使用 Micrometer。
在高容量环境中禁用日志记录
您可以在主消息流中控制调试日志记录。
在非常高容量的应用程序中,对isDebugEnabled()
对于某些日志记录子系统来说,可能会非常昂贵。
您可以禁用所有此类日志记录以避免此开销。
异常日志记录 (debug 或其他) 不受此设置的影响。
以下清单显示了用于控制日志记录的可用选项:
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)
public static class ContextConfiguration {
...
}
<int:management default-logging-enabled="true"/> (1)
1 | 设置为false 以禁用主消息流中的所有日志记录,而不管 Log System Category (日志系统类别) 设置如何。
设置为 'true' 以启用调试日志记录(如果日志记录子系统也启用了)。
仅在未在 Bean 定义中显式配置设置时应用。
默认值为true . |
defaultLoggingEnabled 仅当尚未在 Bean 定义中显式配置相应的设置时,才适用。 |
千分尺集成
概述
从版本 5.0.3 开始,存在 Micrometer MeterRegistry
在应用程序上下文中触发对 Micrometer 度量的支持。
要使用 Micrometer,请添加MeterRegistry
bean 添加到应用程序上下文中。
对于每个MessageHandler
和MessageChannel
,则注册计时器。
对于每个MessageSource
时,将注册一个计数器。
这仅适用于扩展AbstractMessageHandler
,AbstractMessageChannel
和AbstractMessageSource
(大多数框架组件都是这种情况)。
这Timer
消息通道上的发送作计量具有以下名称或标签:
-
name
:spring.integration.send
-
tag
:type:channel
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Send processing time
(一个failure
result 替换为none
exception 表示通道的send()
作返回false
.)
这Counter
用于轮询消息通道上的接收作的计量器具有以下名称或标签:
-
name
:spring.integration.receive
-
tag
:type:channel
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Messages received
这Timer
消息处理程序上的作计量器具有以下名称或标签:
-
name
:spring.integration.send
-
tag
:type:handler
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Send processing time
这Counter
消息源的 meters 具有以下名称/标签:
-
name
:spring.integration.receive
-
tag
:type:source
-
tag
:name:<componentName>
-
tag
:result:success
-
tag
:exception:none
-
description
:Messages received
此外,还有三个Gauge
米:
-
spring.integration.channels
:的数量MessageChannels
在应用程序中。 -
spring.integration.handlers
:的数量MessageHandlers
在应用程序中。 -
spring.integration.sources
:的数量MessageSources
在应用程序中。
可以自定义Meters
由集成组件创建,方法是提供MicrometerMetricsCaptor
.
MicrometerCustomMetricsTests 测试用例显示了如何执行此作的简单示例。
您还可以通过重载build()
方法。
从版本 5.1.13 开始,QueueChannel
公开 Micrometer 仪表的队列大小和剩余容量:
-
name
:spring.integration.channel.queue.size
-
tag
:type:channel
-
tag
:name:<componentName>
-
description
:The size of the queue channel
和
-
name
:spring.integration.channel.queue.remaining.capacity
-
tag
:type:channel
-
tag
:name:<componentName>
-
description
:The remaining capacity of the queue channel
禁用仪表
默认情况下,所有计量在首次使用时都会注册。
现在,使用 Micrometer,您可以添加MeterFilter
s 到MeterRegistry
以防止部分或全部注册。
您可以按提供的任何属性过滤(拒绝)仪表。name
,tag
等。
有关更多信息,请参阅 Micrometer 文档中的 Meter Filters。
例如,给定:
@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}
您可以通过以下方式禁止此通道的计量注册:
registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));
千分尺观察
从版本 6.0 开始, Spring 集成利用了 Micrometer Observation 抽象,它可以处理度量以及通过适当的ObservationHandler
配置。
在IntegrationManagement
组件,只要ObservationRegistry
bean 存在于应用程序上下文中,并且@EnableIntegrationManagement
已配置。
要自定义应插桩的组件集,请使用observationPatterns()
属性在@EnableIntegrationManagement
注解。
有关模式匹配算法,请参阅其 javadocs。
默认情况下,没有任何IntegrationManagement 组件使用ObservationRegistry 豆。
可以配置为匹配所有组件。* |
在这种情况下,计量不是独立收集的,而是委托给适当的ObservationHandler
在提供的ObservationRegistry
.
以下 Spring 集成组件使用 observe logic 进行检测,每个组件都有各自的约定:
-
MessageProducerSupport
作为流的入站终端节点,被视为CONSUMER
span 类型并使用IntegrationObservation.HANDLER
应用程序接口; -
MessagingGatewaySupport' 是一个入站请求-回复终端节点,被视为
SERVER
span 类型。 它使用IntegrationObservation.GATEWAY
应用程序接口; -
一
AbstractMessageChannel.send()
operation是唯一生成消息的 Spring 集成 API。 因此,它被视为PRODUCER
span 类型并使用IntegrationObservation.PRODCUER
应用程序接口。 当通道是分布式实现时(例如PublishSubscribeKafkaChannel
或ZeroMqChannel
),并且必须将跟踪信息添加到消息中。 因此,IntegrationObservation.PRODUCER
观察基于MessageSenderContext
其中 Spring Integration 提供了一个MutableMessage
允许后续跟踪Propagator
添加标头,以便它们可供使用者使用; -
一
AbstractMessageHandler
是一个CONSUMER
span 类型并使用IntegrationObservation.HANDLER
应用程序接口。
对IntegrationManagement
组件可以通过ObservationConvention
配置。
例如,AbstractMessageHandler
期望MessageReceiverObservationConvention
通过其setObservationConvention()
应用程序接口。
以下是观察 API 支持的指标、跨度和约定:
可观测性 - 指标
您可以在下面找到此项目声明的所有指标的列表。
网关
对入站消息网关的观察。
指标名称 spring.integration.gateway
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention
).类型 timer
.
指标名称 spring.integration.gateway.active
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention
).类型 long task timer
.
*.active 指标中可能缺少在开始观察后添加的 KeyValue。 |
千分尺内部使用nanoseconds 对于 baseUnit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒) |
封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation
.
所有标签都必须以spring.integration. 前缀! |
名字 |
描述 |
|
消息网关组件的名称。 |
|
请求/回复执行的结果。 |
|
组件类型 - 'gateway'。 |
处理器
消息处理程序的观察。
指标名称 spring.integration.handler
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention
).类型 timer
.
指标名称 spring.integration.handler.active
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention
).类型 long task timer
.
*.active 指标中可能缺少在开始观察后添加的 KeyValue。 |
千分尺内部使用nanoseconds 对于 baseUnit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒) |
封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation
.
所有标签都必须以spring.integration. 前缀! |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'handler'。 |
制作人
对消息生产者(例如通道)的观察。
指标名称 spring.integration.producer
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention
).类型 timer
.
指标名称 spring.integration.producer.active
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention
).类型 long task timer
.
*.active 指标中可能缺少在开始观察后添加的 KeyValue。 |
千分尺内部使用nanoseconds 对于 baseUnit。但是,每个后端都决定了实际的 baseunit。(即 Prometheus 使用秒) |
封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation
.
所有标签都必须以spring.integration. 前缀! |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'producer'。 |
可观测性 - Span
您可以在下面找到此项目声明的所有 span 的列表。
网关跨度
对入站消息网关的观察。
Span 名称 spring.integration.gateway
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention
).
封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation
.
所有标签都必须以spring.integration. 前缀! |
名字 |
描述 |
|
消息网关组件的名称。 |
|
请求/回复执行的结果。 |
|
组件类型 - 'gateway'。 |
处理程序 Span
消息处理程序的观察。
Span 名称 spring.integration.handler
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention
).
封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation
.
所有标签都必须以spring.integration. 前缀! |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'handler'。 |
生产者跨度
对消息生产者(例如通道)的观察。
Span 名称 spring.integration.producer
(由 convention 类定义o.s.i.support.management.observation.DefaultMessageSenderObservationConvention
).
封闭类的完全限定名称o.s.i.support.management.observation.IntegrationObservation
.
所有标签都必须以spring.integration. 前缀! |
名字 |
描述 |
|
消息处理程序组件的名称。 |
|
组件的类型 - 'producer'。 |
可观测性 - 约定
您可以在下面找到所有GlobalObservationConvention
和ObservationConvention
由此项目声明。
ObservationConvention 类名 |
适用的 ObservationContext 类名 |
|
|
|
|
|
|
|
|
|
|
|
|
观测传播
要在一个跟踪中提供连接的 span 链,而与消息传递流的性质无关,即使MessageChannel
是 persistent 和 distributed 的,则必须在此通道和此通道的使用者(订阅者)上启用观察。
这样,跟踪信息在传播到使用者线程或持久化到数据库中之前,会先存储在消息标头中。
这是通过上述方式完成的MessageSenderContext
.
消费者 (aMessageHandler
) 端使用MessageReceiverContext
并开始一个新的孩子Observation
.
Spring 集成 JMX 支持
另请参阅 JMX 支持。
消息历史记录
消息传递体系结构的主要优点是松散耦合,因此参与的组件不会保持对彼此的任何感知。 仅这一事实就使应用程序非常灵活,允许您在不影响流的其余部分的情况下更改组件、更改消息传递路由、更改消息使用样式(轮询与事件驱动)等等。 然而,当出现问题时,这种不起眼的架构风格可能会很困难。 在调试时,您可能希望获得尽可能多的有关消息的信息 (其来源、它遍历的通道和其他详细信息)。
Message History 是一种有用的模式,它为您提供了一个选项来保持对消息路径的某种程度的感知,用于调试目的或维护审计跟踪。 Spring 集成提供了一种简单的方法来配置消息流以维护消息历史记录,方法是向消息添加 Headers 并在每次消息通过跟踪组件时更新该 Header。
消息历史记录配置
要启用消息历史记录,您只需定义message-history
元素(或@EnableMessageHistory
),如以下示例所示:
@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/>
现在,每个命名组件(定义了 'id')都会被跟踪。
框架会在您的消息中设置 'history' 标头。
它的价值 aList<Properties>
.
请考虑以下配置示例:
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}
@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}
<int:gateway id="sampleGateway"
service-interface="org.springframework.integration.history.sample.SampleGateway"
default-request-channel="bridgeInChannel"/>
<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
<int:header name="baz" value="baz"/>
</int:header-enricher>
前面的配置生成一个简单的消息历史记录结构,其输出类似于以下内容:
[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]
要访问消息历史记录,您只需访问MessageHistory
页眉。
以下示例显示了如何执行此作:
Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
您可能不想跟踪所有组件。
要根据名称将历史记录限制为某些组件,您可以提供tracked-components
属性,并指定与要跟踪的组件匹配的组件名称和模式的逗号分隔列表。
以下示例显示了如何执行此作:
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>
在前面的示例中,仅维护以“Gateway”结尾、以“sample”开头或与名称“aName”完全匹配的组件的消息历史记录。
此外,MessageHistoryConfigurer
bean 现在由IntegrationMBeanExporter
(请参阅 MBean Exporter),允许您在运行时更改模式。
但是请注意,必须停止 Bean (关闭消息历史记录) 才能更改模式。
此功能对于临时打开历史记录以分析系统可能很有用。
MBean 的对象名称为<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer
.
只有一个@EnableMessageHistory (或<message-history/> ) 必须在应用程序上下文中声明为 Components tracking Configuration 的单一来源。
不要对MessageHistoryConfigurer . |
根据定义,消息历史记录标头是不可变的(您不能重写历史记录)。 因此,在写入消息历史记录值时,组件要么创建新消息(当组件是源时),要么从请求消息中复制历史记录,对其进行修改并在回复消息上设置新列表。 在任一情况下,即使消息本身跨越线程边界,也可以追加这些值。 这意味着 history 值可以极大地简化异步消息流中的调试。 |
消息存储
Enterprise Integration Patterns (EIP) 一书确定了几种能够缓冲消息的模式。
例如,聚合器会缓冲消息,直到它们可以被释放,而QueueChannel
buffers 消息,直到使用者显式地从该通道接收这些消息。
由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的点。
为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久性存储(例如 RDBMS)中。
Spring 集成通过以下方式为消息存储模式提供支持:
-
定义
org.springframework.integration.store.MessageStore
策略界面 -
提供此接口的多种实现
-
公开
message-store
属性,以便您可以注入任何实现MessageStore
接口。
有关如何配置特定消息存储实现以及如何注入MessageStore
在整个手册中都介绍了特定缓冲组件的实现(请参阅特定组件,例如 QueueChannel、Aggregator、Delayer 等)。
以下一对示例显示了如何为QueueChannel
对于聚合商:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator … message-store="refToMessageStore"/>
默认情况下,消息使用o.s.i.store.SimpleMessageStore
,则MessageStore
.
这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失并不是一个问题。
但是,典型的生产应用程序需要一个更健壮的选项,这不仅是为了降低消息丢失的风险,也是为了避免潜在的内存不足错误。
因此,我们还提供MessageStore
各种数据存储的实现。
以下是支持的实施的完整列表:
-
Hazelcast 消息存储:使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储:使用 RDBMS 存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来存储消息
但是,在使用 Message 数据(payload 和 headers)使用不同的序列化策略进行序列化和反序列化,具体取决于 请特别注意表示某些类型数据的标头。
例如,如果其中一个 Headers 包含某个 Spring Bean 的实例,则在反序列化时,你最终可能会得到该 Bean 的不同实例,这会直接影响框架创建的一些隐式 Headers(例如 从 Spring 集成版本 3.0 开始,你可以通过配置一个 Headers Enricher 来解决这个问题,以便在向 此外,请考虑按如下方式配置 message-flow 时会发生什么:gateway → queue-channel(由持久 Message Store 支持)→ service-activator。
该网关会创建一个临时回复通道,当服务激活器的 Poller 从队列中读取时,该通道将丢失。
同样,您可以使用标头扩充器将标头替换为 有关更多信息,请参阅 Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
:要实现特定于QueueChannel
实例 -
PriorityCapableChannelMessageStore
:标记MessageStore
要用于的 implementations to use forPriorityChannel
实例,并为持久消息提供优先级顺序。
实际行为取决于实现。
该框架提供了以下实现,这些实现可以用作持久MessageStore
为QueueChannel
和PriorityChannel
:
注意事项
SimpleMessageStore 从版本 4.1 开始, 现在,在组件(如聚合器)之外访问组存储的用户可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部纵组可能会导致不可预知的结果。 因此,您不应该执行此类作,或者将 |
用MessageGroupFactory
从版本 4.3 开始,一些MessageGroupStore
实现可以注入自定义的MessageGroupFactory
策略来创建和自定义MessageGroup
实例MessageGroupStore
.
这默认为SimpleMessageGroupFactory
,它会产生SimpleMessageGroup
基于GroupType.HASH_SET
(LinkedHashSet
) internal 集合。
其他可能的选项包括SYNCHRONISED_SET
和BLOCKING_QUEUE
,其中最后一个可用于恢复之前的SimpleMessageGroup
行为。
此外,PERSISTENT
选项可用。
有关更多信息,请参阅下一节。
从版本 5.0.1 开始,LIST
当组中消息的顺序和唯一性无关紧要时,选项也可用。
持续MessageGroupStore
和 Lazy-load
从版本 4.3 开始,所有 persistentMessageGroupStore
实例检索MessageGroup
实例及其messages
以 lazy-load 方式从 store 中。
在大多数情况下,它对关联MessageHandler
实例(请参阅 Aggregator 和 Resequencer),这会增加加载整个MessageGroup
从存储中执行每个关联作。
您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
选项从配置中关闭 Lazy-load 行为。
我们对 MongoDB 上的延迟加载进行的性能测试MessageStore
(MongoDB 消息存储)和<aggregator>
(聚合器)使用自定义release-strategy
类似于以下内容:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
对于 1000 条简单消息,它会生成类似于以下内容的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
但是,从版本 5.5 开始,所有持久性MessageGroupStore
implementations 提供了一个streamMessagesForGroup(Object groupId)
Contract 的 Contract。
当 store 中的组非常大时,这可以提高资源利用率。
在框架内部,当 Delayer 在启动时重新安排持久消息时,将使用此新 API(例如)。
返回的Stream<Message<?>>
必须在处理结束时关闭,例如通过try-with-resources
.
每当PersistentMessageGroup
,则其streamMessages()
delegates 到MessageGroupStore.streamMessagesForGroup()
.
消息组条件
从版本 5.5 开始,MessageGroup
abstraction 提供了一个condition
string 选项。
此选项的值可以是以后可以出于任何原因解析以为组做出决策的任何内容。
例如,一个ReleaseStrategy
从关联消息处理程序可以从组中查询此属性,而不是迭代组中的所有消息。
这MessageGroupStore
暴露一个setGroupCondition(Object groupId, String condition)
应用程序接口。
为此,一个setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
选项已添加到AbstractCorrelatingMessageHandler
.
在将每条消息添加到组后,将根据每条消息以及组的现有条件评估此函数。
实现可以决定返回新值、现有值或将目标条件重置为null
.
的值condition
可以是 JSON、SPEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。
例如,FileMarkerReleaseStrategy
从 File Aggregator 组件中,将条件填充到一个组中FileHeaders.LINE_COUNT
标头的FileSplitter.FileMarker.Mark.END
消息并从其canRelease()
将组大小与此条件中的值进行比较。
这样,它不会迭代组中的所有消息来查找FileSplitter.FileMarker.Mark.END
message 替换为FileHeaders.LINE_COUNT
页眉。
它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。
此外,为了便于配置,一个GroupConditionProvider
合同。
这AbstractCorrelatingMessageHandler
检查提供的ReleaseStrategy
实现此接口并提取conditionSupplier
用于组条件评估逻辑。
元数据存储
许多外部系统、服务或资源不是事务性的(Twitter、RSS、文件系统等),并且无法将数据标记为已读。
此外,有时,您可能需要在某些集成解决方案中实现 Enterprise Integration Pattern 幂等接收器。
为了实现这个目标,并在下一次与外部系统交互之前存储端点的一些先前状态,或者为了处理下一条消息, Spring 集成提供了元数据存储组件作为org.springframework.integration.metadata.MetadataStore
具有通用键值协定的接口。
元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个源条目的发布日期),以帮助源适配器等组件处理重复项。
如果组件没有直接提供对MetadataStore
中,查找元数据存储的算法如下:首先,查找具有metadataStore
ID 中的应用程序上下文。
如果找到,请使用它。
否则,请创建一个新的SimpleMetadataStore
,这是一种内存中实现,仅在当前运行的应用程序上下文的生命周期内保留元数据。
这意味着,在重新启动时,您最终可能会得到重复的条目。
如果您需要在应用程序上下文重新启动之间保留元数据,框架会提供以下持久性MetadataStores
:
-
PropertiesPersistingMetadataStore
这PropertiesPersistingMetadataStore
由属性文件和PropertiesPersister
.
默认情况下,它仅在应用程序上下文正常关闭时保留状态。
它实现了Flushable
这样您就可以随意持久化状态,通过调用flush()
.
以下示例显示了如何使用 XML 配置 'PropertiesPersistingMetadataStore':
<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
或者,您也可以提供自己的MetadataStore
接口(例如JdbcMetadataStore
),并在应用程序上下文中将其配置为 Bean。
从版本 4.0 开始,SimpleMetadataStore
,PropertiesPersistingMetadataStore
和RedisMetadataStore
实现ConcurrentMetadataStore
.
这些实例提供原子更新,并且可以跨多个组件或应用程序实例使用。
幂等接收器和元数据存储
当需要筛选传入消息(如果已处理)时,元数据存储对于实施 EIP 幂等接收器模式非常有用,您可以丢弃该消息或在丢弃时执行一些其他逻辑。 以下配置显示了如何执行此作的示例:
<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>
<int:publish-subscribe-channel id="idempotentServiceChannel"/>
<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>
<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
这value
的 Timeout Entry可能是一个过期日期,在此之后,该条目应由某个预定的收割者从元数据存储中删除。
MetadataStoreListener
一些元数据存储(目前只有 zookeeper)支持注册侦听器以在项目更改时接收事件,如下例所示:
public interface MetadataStoreListener {
void onAdd(String key, String value);
void onRemove(String key, String oldValue);
void onUpdate(String key, String newValue);
}
有关更多信息,请参阅 Javadoc。
这MetadataStoreListenerAdapter
如果您只对事件的子集感兴趣,则可以将其子类化。
控制总线
正如 Enterprise Integration Patterns (EIP) 一书中描述的,控制总线背后的思想是,可以使用与用于 “应用程序级” 消息传递相同的消息传递系统来监视和管理框架内的组件。 在 Spring 集成中,我们构建在上述适配器的基础上,以便您可以发送消息作为调用公开作的一种方式。
以下示例说明如何使用 XML 配置控制总线:
<int:control-bus input-channel="operationChannel"/>
控制总线有一个 Importing 通道,可以访问该通道以调用应用程序上下文中的 bean 上的作。 它还具有服务激活终端节点的所有通用属性。 例如,如果作结果具有要发送到下游通道的返回值,则可以指定输出通道。
控制总线将 input 通道上的消息作为 Spring 表达式语言 (SpEL) 表达式运行。
它接受一条消息,将 body 编译为表达式,添加一些上下文,然后运行它。
默认上下文支持已使用@ManagedAttribute
或@ManagedOperation
.
它还支持 Spring 的Lifecycle
接口(及其Pausable
扩展),并且它支持用于配置 Spring 的多个TaskExecutor
和TaskScheduler
实现。
确保您自己的方法可用于控制总线的最简单方法是使用@ManagedAttribute
或@ManagedOperation
附注。
由于这些注解也用于向 JMX MBean 注册表公开方法,因此它们提供了一个方便的副产品:通常,您希望向控制总线公开的相同类型的作对于通过 JMX 公开是合理的。
应用程序上下文中任何特定实例的解析都是在典型的 SPEL 语法中实现的。
为此,请为 bean name 提供 bean 的 SPEL 前缀 () 。
例如,要在 Spring Bean 上执行方法,客户端可以向作通道发送消息,如下所示:@
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
表达式的上下文根是Message
本身,因此您还可以访问payload
和headers
作为变量。
这与 Spring 集成端点中的所有其他表达式支持一致。
使用 Java 注释,您可以按如下方式配置控制总线:
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
同样,您可以按如下方式配置 Java DSL 流定义:
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}
如果您更喜欢将 lambda 表达式与自动DirectChannel
创建,您可以按如下方式创建 Control Bus:
@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}
在这种情况下,通道被命名为controlBus.input
.
有序关机
如“MBean 导出器”中所述,MBean 导出器提供了一个名为stopActiveComponents
,用于有序停止应用程序。
该作具有单个Long
参数。
该参数指示作等待多长时间(以毫秒为单位)以允许正在进行的消息完成。
该作的工作原理如下:
-
叫
beforeShutdown()
在所有实现OrderlyShutdownCapable
.这样做可以让此类组件为关闭做好准备。 实现此接口的组件示例以及它们对此调用执行的作包括 JMS 和 AMQP 消息驱动的适配器(用于停止其侦听器容器)、停止接受新连接(同时保持现有连接打开)的 TCP 服务器连接工厂、丢弃(记录)收到的任何新消息的 TCP 入站端点以及返回
503 - Service Unavailable
对于任何新请求。 -
停止任何活动通道,例如 JMS 或 AMQP 支持的通道。
-
全部停止
MessageSource
实例。 -
停止所有入站
MessageProducer
s(不是OrderlyShutdownCapable
). -
等待剩余的任何剩余时间,如
Long
参数。这样做可以让任何正在进行的消息完成其旅程。 因此,在调用此作时,选择适当的超时非常重要。
-
叫
afterShutdown()
在所有OrderlyShutdownCapable
组件。这样做可以让这些组件执行最终的关闭任务(例如,关闭所有打开的套接字)。
如 Orderly Shutdown Managed Operation 中所述,可以使用 JMX 调用此作。
如果您希望以编程方式调用该方法,则需要注入或以其他方式获取对IntegrationMBeanExporter
.
如果没有id
属性在<int-jmx:mbean-export/>
definition,则 bean 具有生成的名称。
此名称包含要避免的随机组件ObjectName
如果同一 JVM 中存在多个 Spring 集成上下文(MBeanServer
).
因此,如果您希望以编程方式调用该方法,我们建议您为导出器提供id
属性,以便您可以在应用程序上下文中轻松访问它。
最后,可以使用<control-bus>
元素。
有关详细信息,请参阅监视 Spring 集成示例应用程序。
前面描述的算法在版本 4.1 中得到了改进。
以前,所有任务执行程序和计划程序都已停止。
这可能会导致QueueChannel 要保留的实例。
现在关闭使 poller 保持运行状态,以便排空和处理这些消息。 |
集成图
从版本 4.3 开始, Spring 集成提供了对应用程序的运行时对象模型的访问,该模型可以选择包括组件度量。
它以图表的形式公开,可用于可视化集成应用程序的当前状态。
这o.s.i.support.management.graph
package 包含收集、构建和渲染 Spring 集成组件的运行时状态所需的所有类Graph
对象。
这IntegrationGraphServer
应声明为 Bean 来构建、检索和刷新Graph
对象。
结果Graph
object 可以序列化为任何格式,尽管 JSON 在客户端解析和表示起来很灵活和方便。
仅具有默认组件的 Spring 集成应用程序将公开一个图形,如下所示:
{
"contentDescriptor" : {
"providerVersion" : "6.1.9",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}
版本 5.2 弃用了旧指标,转而使用千分尺,如 Metrics Management 中所述。 旧版指标已在版本 5.4 中删除,将不再显示在图表中。 |
在前面的示例中,图形由三个顶级元素组成。
这contentDescriptor
graph 元素包含有关提供数据的应用程序的一般信息。
这name
可以在IntegrationGraphServer
bean 或spring.application.name
application context environment 属性。
框架提供了其他属性,并允许您将类似模型与其他源区分开来。
这links
graph 元素表示节点之间的连接,来自nodes
graph 元素,因此,在源 Spring Integration 应用程序中的集成组件之间。
例如,从MessageChannel
更改为EventDrivenConsumer
与一些MessageHandler
或从AbstractReplyProducingMessageHandler
更改为MessageChannel
.
为了方便起见并让您确定链接的用途,该模型包括type
属性。
可能的类型包括:
-
input
:标识方向MessageChannel
到终端节点,inputChannel
或requestChannel
财产 -
output
:从MessageHandler
,MessageProducer
或SourcePollingChannelAdapter
到MessageChannel
通过outputChannel
或replyChannel
财产 -
error
:从MessageHandler
上PollingConsumer
或MessageProducer
或SourcePollingChannelAdapter
到MessageChannel
通过errorChannel
财产; -
discard
:从DiscardingMessageHandler
(例如MessageFilter
) 到MessageChannel
通过errorChannel
财产。 -
route
:从AbstractMappingMessageRouter
(例如HeaderValueRouter
) 到MessageChannel
. 似output
但在运行时确定。 可能是配置的 channel mapping 或动态解析的 channel。 为此,路由器通常最多只保留 100 个动态路由,但您可以通过设置dynamicChannelLimit
财产。
可视化工具可以使用此元素中的信息来呈现节点之间的连接,这些节点来自nodes
graph 元素,其中from
和to
数字表示nodeId
属性。
例如,link
元素可用于确定适当的port
在目标节点上。
下面的 “text image” 显示了类型之间的关系:
+---(discard) | +----o----+ | | | | | | (input)--o o---(output) | | | | | | +----o----+ | +---(error)
这nodes
graph 元素可能是最有趣的,因为它的元素不仅包含运行时组件及其componentType
instances 和name
值,但也可以选择包含组件公开的指标。
Node 元素包含各种属性,这些属性通常一目了然。
例如,基于表达式的组件包括expression
包含组件的主表达式字符串的属性。
要启用这些指标,请添加@EnableIntegrationManagement
更改为@Configuration
类或添加<int:management/>
元素添加到 XML 配置中。
有关完整信息,请参阅 Metrics and Management。
这nodeId
表示唯一的增量标识符,以便将一个组件与另一个组件区分开来。
它还用于links
元素来表示此组件与其他组件的关系(连接)(如果有)。
这input
和output
attributes 用于inputChannel
和outputChannel
的属性AbstractEndpoint
,MessageHandler
,SourcePollingChannelAdapter
或MessageProducerSupport
.
有关更多信息,请参阅下一节。
从版本 5.1 开始,IntegrationGraphServer
接受Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback
对于IntegrationNode
对于特定的NamedComponent
.
例如,您可以公开SmartLifecycle
autoStartup
和running
属性添加到目标图中:
server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});
Graph 运行时模型
Spring 集成组件具有不同的复杂程度。
例如,任何轮询的MessageSource
还有一个SourcePollingChannelAdapter
以及MessageChannel
定期从源数据向其发送消息。
其他组件可能是中间件请求-回复组件(例如JmsOutboundGateway
) 替换为AbstractEndpoint
要订阅(或轮询)该requestChannel
(input
) 用于消息,并使用replyChannel
(output
) 生成要向下游发送的回复消息。
同时,任何MessageProducerSupport
implementation (例如ApplicationEventListeningMessageProducer
) 包装一些源协议侦听逻辑,并将消息发送到outputChannel
.
在图中, Spring 集成组件通过使用IntegrationNode
类层次结构,您可以在o.s.i.support.management.graph
包。
例如,您可以使用ErrorCapableDiscardingMessageHandlerNode
对于AggregatingMessageHandler
(因为它有一个discardChannel
选项),并且在从PollableChannel
通过使用PollingConsumer
.
另一个例子是CompositeMessageHandlerNode
— 对于MessageHandlerChain
当订阅了SubscribableChannel
通过使用EventDrivenConsumer
.
这@MessagingGateway (请参阅 Messaging Gateway)为其每个方法提供节点,其中name 属性基于网关的 bean 名称和短方法签名。
请考虑以下网关示例: |
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {
void foo(String foo);
void foo(Integer foo);
void bar(String bar);
}
前面的网关生成类似于以下内容的节点:
{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
}
您可以使用这个IntegrationNode
层次结构,用于解析 Client 端的图形模型以及了解一般的 Spring 集成运行时行为。
有关更多信息,另请参阅编程提示和技巧。
版本 5.3 引入了IntegrationPattern
abstraction 和所有代表企业集成模式 (EIP) 的开箱即用组件实现此抽象并提供IntegrationPatternType
enum 值。
此信息对于目标应用程序中的某些分类逻辑非常有用,或者,如果公开到图形节点中,则 UI 可以使用此信息来确定如何绘制组件。
集成图控制器
如果您的应用程序是基于 Web 的(或构建在带有嵌入式 Web 容器的 Spring Boot 之上),并且 Spring 集成 HTTP 或 WebFlux 模块(分别参见 HTTP 支持和 WebFlux 支持)存在于类路径上,则可以使用IntegrationGraphController
要公开IntegrationGraphServer
功能作为 REST 服务。
为此,@EnableIntegrationGraphController
和@Configuration
类注释和<int-http:graph-controller/>
XML 元素在 HTTP 模块中可用。
与@EnableWebMvc
annotation(或<mvc:annotation-driven/>
对于 XML 定义),此配置会注册一个IntegrationGraphController
@RestController
其中@RequestMapping.path
可以在@EnableIntegrationGraphController
annotation 或<int-http:graph-controller/>
元素。
默认路径为/integration
.
这IntegrationGraphController
@RestController
提供以下服务:
-
@GetMapping(name = "getGraph")
:检索自上次IntegrationGraphServer
刷新。 这o.s.i.support.management.graph.Graph
作为@ResponseBody
的 REST 服务。 -
@GetMapping(path = "/refresh", name = "refreshGraph")
:刷新当前Graph
以获取实际的运行时状态,并将其作为 REST 响应返回。 无需刷新指标的图表。 检索图形时,会实时提供这些指标。 如果自上次检索图形以来修改了应用程序上下文,则可以调用 Refresh。 在这种情况下,图形将完全重新构建。
您可以为IntegrationGraphController
使用 Spring Security 和 Spring MVC 项目提供的标准配置选项和组件。
以下示例可实现这些目标:
<mvc:annotation-driven />
<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>
<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>
<int-http:graph-controller path="/myIntegration" />
以下示例显示了如何对 Java 配置执行相同的作:
@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}
//...
}
请注意,为方便起见,@EnableIntegrationGraphController
annotation 提供了一个allowedOrigins
属性。
这提供了GET
访问path
.
为了更复杂,你可以使用标准的 Spring MVC 机制来配置 CORS 映射。