消息传递端点
消息收发终端节点
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了相当多的关于驱动 Spring Integration 的各种消息传递组件的底层 API。 如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。 但是,如果您想启动并运行各种元素的简化基于命名空间的配置,请随时跳到 Endpoint Namespace Support (终端节点命名空间支持)。
如概述中所述,消息终端节点负责将各种消息收发组件连接到通道。 在接下来的几章中,我们将介绍许多使用消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的 Message Channels 中所示,您可以向消息通道发送消息。 但是,接收稍微复杂一些。 主要原因是有两种类型的使用者:轮询使用者和事件驱动使用者。
在这两者中,事件驱动型消费者要简单得多。
无需管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。
当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring 集成提供了两种不同的端点实现来容纳这两种类型的消费者。
因此,消费者自己只需要实现回调接口即可。
当需要轮询时,终端节点充当使用者实例的容器。
其好处类似于使用容器来托管消息驱动的 bean,但是,由于这些使用者是在ApplicationContext
,它更类似于 Spring 自己的MessageListener
器皿。
消息处理程序
Spring 集成的MessageHandler
interface 由框架中的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会实现MessageHandler
径直。
尽管如此,消息使用者使用它来实际处理使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。
接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但这个接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。 这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们托管这些基于回调的处理程序,并让它们连接到消息通道。
事件驱动型消费者
因为它是两者中更简单的,所以我们首先介绍事件驱动的使用者终端节点。
您可能还记得,SubscribableChannel
接口提供了一个subscribe()
方法,并且该方法接受MessageHandler
参数(如SubscribableChannel
).
下面的清单显示了subscribe
方法:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,并且 Spring 集成提供的实现接受SubscribableChannel
以及MessageHandler
,如下例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring 集成还提供了一个PollingConsumer
,并且可以以相同的方式实例化,只是通道必须实现PollableChannel
,如下例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 例如,trigger 是 required 属性。 以下示例显示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
这PeriodicTrigger
通常使用简单的区间 (Duration
),但也支持initialDelay
属性和布尔值fixedRate
属性(默认值为false
— 即没有固定延迟)。
以下示例设置这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中的三个设置的结果是一个等待 5 秒,然后每秒触发一次的触发器。
这CronTrigger
需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例将新的CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是星期一到星期五每 10 秒触发一次的触发器。
除了触发器之外,您还可以指定其他两个与轮询相关的配置属性:maxMessagesPerPoll
和receiveTimeout
.
以下示例说明如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
这maxMessagesPerPoll
属性指定在给定轮询作中要接收的最大消息数。
这意味着 Poller 继续调用receive()
而无需等待,直到null
或达到最大值。
例如,如果 Poller 有一个 10 秒的间隔触发器,并且maxMessagesPerPoll
设置25
,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。
它抓取 25 个,等待 10 秒,抓取下一个 25 个,依此类推。
如果maxMessagesPerPoll
配置了负值,则MessageSource.receive()
在单个轮询周期内调用,直到返回null
.
从版本 5.5 开始,一个0
值具有特殊含义 - 跳过MessageSource.receive()
调用,这可能被视为此轮询端点的暂停,直到maxMessagesPerPoll
稍后更改为 n 非零值,例如通过 Control Bus。
这receiveTimeout
property 指定 Poller 在调用 receive作时如果没有可用的消息时应等待的时间。
例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发为 50 毫秒,接收超时为 5 秒。
第一个 API 可能会比它到达通道晚 4950 毫秒收到一条消息(如果该消息在其 poll 调用之一返回后立即到达)。
另一方面,第二个配置永远不会错过超过 50 毫秒的消息。
区别在于第二个选项需要线程等待。
但是,因此,它可以更快地响应到达的消息。
这种技术称为 “长轮询”,可用于在轮询源上模拟事件驱动的行为。
轮询消费者也可以委托给 SpringTaskExecutor
,如下例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
具有一个名为adviceChain
.
此属性允许您指定List
的 AOP 建议,用于处理其他横切关注点,包括交易。
这些建议围绕doPoll()
方法。
有关更深入的信息,请参阅 Endpoint Namespace Support 下的有关 AOP 建议链和事务支持的部分。
前面的示例显示了依赖项查找。
但是,请记住,这些使用者通常配置为 Spring bean 定义。
事实上, Spring 集成还提供了一个FactoryBean
叫ConsumerEndpointFactoryBean
,这将根据 Channel 的类型创建适当的 Consumer 类型。
此外,Spring 集成具有完整的 XML 名称空间支持,以进一步隐藏这些细节。
本指南介绍了基于命名空间的配置,因为介绍了每种组件类型。
许多MessageHandler implementations 可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
但是,何时发送回复消息以及发送多少回复消息取决于处理程序类型。
例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,该拆分器可以为其处理的每条消息生成多个回复。
使用 namespace 配置时,您不需要严格了解所有详细信息。
但是,仍然值得知道的是,这些组件中的几个共享一个公共基类AbstractReplyProducingMessageHandler ,并且它提供了一个setOutputChannel(..) 方法。 |
终端节点命名空间支持
在本参考手册中,您可以找到终端节点元素的特定配置示例,例如 router、transformer、service-activator 等。
其中大多数都支持input-channel
属性,并且许多都支持output-channel
属性。
解析后,这些 endpoint 元素会生成PollingConsumer
或EventDrivenConsumer
,具体取决于input-channel
引用:PollableChannel
或SubscribableChannel
分别。
当通道是可轮询的时,轮询行为基于 endpoint 元素的poller
sub-element 及其属性。
以下清单列出了poller
:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
1 | 提供使用 Cron 表达式配置 Poller 的功能。
底层实现使用org.springframework.scheduling.support.CronTrigger .
如果设置了此属性,则不必指定以下任何属性:fixed-delay ,trigger ,fixed-rate 和ref . |
2 | 通过将此属性设置为true ,你可以只定义一个全局默认 Poller。
如果在应用程序上下文中定义了多个默认 Poller,则会引发异常。
连接到PollableChannel (PollingConsumer ) 或任何SourcePollingChannelAdapter 没有显式配置的 Poller 然后使用全局默认 Poller。
它默认为false .
自选。 |
3 | 标识如果此 Poller 的调用失败,则向其发送错误消息的通道。
要完全禁止异常,您可以提供对nullChannel .
自选。 |
4 | 固定延迟触发器使用PeriodicTrigger 在被窝里。
如果您不使用time-unit 属性,则指定的值以毫秒为单位表示。
如果设置了此属性,则不必指定以下任何属性:fixed-rate ,trigger ,cron 和ref . |
5 | 固定速率触发器使用PeriodicTrigger 在被窝里。
如果您不使用time-unit 属性,则指定的值以毫秒为单位表示。
如果设置了此属性,则不必指定以下任何属性:fixed-delay ,trigger ,cron 和ref . |
6 | 引用 poller 的基础 bean 定义的 ID,其类型为org.springframework.integration.scheduling.PollerMetadata .
这id 属性是顶级 Poller 元素所必需的,除非它是默认的 Poller (default="true" ). |
7 | 有关更多信息,请参阅Configuring An Inbound Channel Adapter。
如果未指定,则默认值取决于上下文。
如果您使用PollingConsumer ,则此属性默认为-1 .
但是,如果您使用SourcePollingChannelAdapter 这max-messages-per-poll 属性默认为1 .
自选。 |
8 | 在基础类上设置 ValuePollerMetadata .
如果未指定,则默认为 1000(毫秒)。
自选。 |
9 | Bean 引用另一个顶级 Poller 的 Pod 引用。
这ref 属性不得出现在顶级poller 元素。
但是,如果设置了此属性,则不必指定以下任何属性:fixed-rate ,trigger ,cron 和fixed-delay . |
10 | 提供引用自定义任务执行程序的功能。 有关更多信息,请参阅 TaskExecutor 支持。 自选。 |
11 | 此属性指定java.util.concurrent.TimeUnit enum 值org.springframework.scheduling.support.PeriodicTrigger .
因此,此属性只能与fixed-delay 或fixed-rate 属性。
如果与任一cron 或trigger reference 属性,则会导致失败。
支持的PeriodicTrigger 是毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则任何fixed-delay 或fixed-rate value 被解释为毫秒。
基本上,此枚举为基于秒的 interval 触发器值提供了便利。
对于每小时、每天和每月设置,我们建议使用cron trigger 来代替。 |
12 | 对任何 Spring 配置的 bean 的引用,该 bean 实现了org.springframework.scheduling.Trigger 接口。
但是,如果设置了此属性,则不必指定以下任何属性:fixed-delay ,fixed-rate ,cron 和ref .
自选。 |
13 | 允许指定额外的 AOP 建议来处理其他横切关注点。 有关更多信息,请参阅 Transaction Support 。 自选。 |
14 | Poller 可以成为事务性的。 有关更多信息,请参阅 AOP Advice 链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的简单基于间隔的 Poller :
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用fixed-rate
属性,您还可以使用fixed-delay
属性。
对于基于 Cron 表达式的轮询器,请使用cron
属性,如下例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是PollableChannel
,则需要 Poller 配置。
具体来说,如前所述,trigger
是PollingConsumer
类。
因此,如果省略poller
子元素,则可能会引发异常。
如果你试图在连接到不可轮询通道的元素上配置 Poller,也可能引发异常。
也可以创建顶级 poller,在这种情况下,只有一个ref
attribute 是必需的,如下例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
这ref 属性只允许在内部 Poller 定义上。
在顶级 Poller 上定义此属性会导致在应用程序上下文初始化期间引发配置异常。 |
全局默认轮询器
为了进一步简化配置,你可以定义一个全局默认 Poller。
XML DSL 中的单个顶级 Poller 组件可能具有default
属性设置为true
.
对于 Java 配置,一个PollerMetadata
bean 的PollerMetadata.DEFAULT_POLLER
在这种情况下,必须声明 name。
在这种情况下,任何具有PollableChannel
对于其 input channel,该通道在同一ApplicationContext
,并且没有明确配置poller
使用该默认值。
下面的示例展示了这样的 poller 和使用它的转换器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring 集成还为 pollers 提供了事务支持,以便每个接收和转发作都可以作为原子工作单元执行。
要为 Poller 配置事务,请添加<transactional/>
sub-元素。
以下示例显示了可用属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关更多信息,请参阅 轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于代理机制,因此TransactionInterceptor
(AOP Advice) 处理由 Poller 启动的消息流的事务性行为时,有时必须提供额外的建议来处理与 Poller 关联的其他横切行为。
为此,poller
定义了一个advice-chain
元素,该元素允许您在实现MethodInterceptor
接口。
以下示例演示如何定义advice-chain
对于poller
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实施MethodInterceptor
接口,请参阅 Spring Framework Reference Guide 的 AOP 部分。
建议链也可以应用于没有任何事务配置的 Poller,从而增强 Poller 启动的消息流的行为。
使用通知链时,<transactional/> 不能指定 child 元素。
相反,请声明<tx:advice/> bean 并将其添加到<advice-chain/> .
有关完整的配置详细信息,请参阅 Poller Transaction Support 。 |
TaskExecutor 支持
轮询线程可以由 Spring 的TaskExecutor
抽象化。
这将为一个终端节点或一组终端节点启用并发。
从 Spring 3.0 开始,核心 Spring Framework 有一个task
namespace 及其<executor/>
元素支持创建简单的线程池执行程序。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行程序可以对终端节点在负载下的性能产生重大影响。
这些设置适用于每个终端节点,因为终端节点的性能是要考虑的主要因素之一(另一个主要因素是终端节点订阅的通道上的预期卷)。
要为配置了 XML 命名空间支持的轮询终端节点启用并发,请提供task-executor
引用其<poller/>
元素,然后提供以下示例中所示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供 task-executor,则在调用者的线程中调用使用者的处理程序。
请注意,调用方通常是默认的TaskScheduler
(请参见配置 Task Scheduler)。
您还应该记住,task-executor
属性可以提供对 Spring 的TaskExecutor
接口。
这executor
元素是为了方便起见而提供的。
如前面在轮询使用者的 background 部分中提到的,您还可以以模拟事件驱动行为的方式配置轮询使用者。
在触发器中使用较长的接收超时和较短的间隔,您可以确保对到达的消息做出非常及时的反应,即使在轮询的消息源上也是如此。
请注意,这仅适用于具有超时的阻塞 wait 调用的源。
例如,文件 poller 不会阻塞。
每receive()
call 会立即返回,并且要么包含新文件,要么不包含新文件。
因此,即使 Poller 包含长receive-timeout
,该值永远不会在这种情况下使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示了轮询使用者如何几乎即时地接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)颠簸、无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询率
当使用fixed-delay
或fixed-rate
属性,则默认实现使用PeriodicTrigger
实例。
这PeriodicTrigger
是核心 Spring Framework 的一部分。
它仅接受 interval 作为构造函数参数。
因此,无法在运行时更改它。
但是,您可以定义自己的org.springframework.scheduling.Trigger
接口。
您甚至可以使用PeriodicTrigger
作为起点。
然后,您可以为间隔 (period) 添加 setter,甚至可以在触发器本身中嵌入自己的限制逻辑。
这period
属性与每次对nextExecutionTime
以安排下一次轮询。
要在 Poller 中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用trigger
属性,该属性引用自定义触发器 Bean 实例。
现在,您可以获取对触发器 Bean 的引用并更改轮询之间的轮询间隔。
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为dynamic-poller
,它使用自定义触发器并演示在运行时更改轮询间隔的功能。
该示例提供了一个自定义触发器,用于实现org.springframework.scheduling.Trigger
接口。
该示例的触发器基于 Spring 的PeriodicTrigger
实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式 getter 和 setter,允许您在运行时动态更改轮询周期。
不过,请务必注意,由于 Trigger 方法是nextExecutionTime() ,则根据现有配置,对动态触发器的任何更改在下次轮询之前不会生效。
无法强制触发器在当前配置的下一次执行时间之前触发。 |
负载类型转换
在本参考手册中,您还可以看到接受消息或任何任意Object
作为输入参数。
在Object
,这样的参数被映射到消息有效负载或有效负载或 Headers 的一部分(当使用 Spring 表达式语言时)。
但是,端点方法的 input 参数类型有时与有效负载或其部分的类型不匹配。
在此方案中,我们需要执行类型转换。
Spring 集成提供了一种注册类型转换器(通过使用 SpringConversionService
) 在其自己的名为integrationConversionService
.
一旦使用 Spring 集成基础结构定义了第一个转换器,就会自动创建该 bean。
要注册转换器,您可以实现org.springframework.core.convert.converter.Converter
,org.springframework.core.convert.converter.GenericConverter
或org.springframework.core.convert.converter.ConverterFactory
.
这Converter
implementation 是最简单的,并且从单一类型转换为另一种类型。
对于更复杂的作,例如转换为类层次结构,您可以实现GenericConverter
可能还有一个ConditionalConverter
.
这些选项允许您完全访问from
和to
类型描述符,支持复杂的转换。
例如,如果你有一个名为Something
即转化的目标(parameter type、channel 数据类型等),您有两个具体的实现,称为Thing1
和Thing
,并且您希望根据输入类型转换为其中一种,则GenericConverter
会很合适。
有关更多信息,请参阅以下接口的 Javadoc:
实现转换器后,可以使用方便的命名空间支持注册它,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,你可以使用内部 Bean,如下例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring 集成 4.0 开始,你可以使用 Comments 来创建前面的配置,如下例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您也可以使用@Configuration
annotation 中,如下例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring Framework 允许您添加 相比之下, 但是,如果您确实想使用 Spring
在这种情况下,由 |
内容类型转换
从版本 5.0 开始,默认情况下,方法调用机制基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基础设施。
其HandlerMethodArgumentResolver
实现(例如PayloadArgumentResolver
和MessageMethodArgumentResolver
) 可以使用MessageConverter
abstraction 来转换传入的payload
设置为目标方法参数类型。
转换可以基于contentType
消息标头。
为此, Spring 集成提供了ConfigurableCompositeMessageConverter
,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非 null 结果。
默认情况下,此转换器提供(按严格顺序):
-
MappingJackson2MessageConverter
如果 Jackson 处理器存在于 Classpath 中
请参阅 Javadoc(在前面的列表中链接)以了解有关其用途和适当性的更多信息contentType
值进行转化。
这ConfigurableCompositeMessageConverter
是因为它可以与任何其他MessageConverter
implementations,包括或排除前面提到的 default converters。
它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在 defaults 之前注册在 composite 中。
您也不能使用ConfigurableCompositeMessageConverter
但请提供您自己的MessageConverter
通过注册一个名称为integrationArgumentResolverMessageConverter
(通过设置IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
属性)。
这MessageConverter 基于 -based(包括contentType header) 转换不可用。
在这种情况下,只有上面 Payload Type Conversion 中提到的常规类到类转换可用。 |
异步轮询
如果希望轮询是异步的,Poller 可以选择指定task-executor
属性,该属性指向任何TaskExecutor
bean(Spring 3.0 通过task
命名空间)。
但是,在使用TaskExecutor
.
问题在于有两个配置,即 Poller 和TaskExecutor
.
他们必须彼此协调一致。
否则,您最终可能会造成人为的内存泄漏。
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了 Out-of-tune 配置。
默认情况下,任务执行程序具有无界任务队列。 即使所有线程都被阻塞,Poller 也会继续调度新任务,等待新消息到达或超时过期。 假设有 20 个线程执行任务,超时时间为 5 秒,因此它们以每秒 4 个的速率执行。 但是,新任务以每秒 20 个的速率调度,因此任务执行程序中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。
处理此问题的方法之一是将queue-capacity
任务执行程序的属性。
即使 0 也是一个合理的值。
您还可以通过设置设置rejection-policy
Task Executor 的属性(例如,更改为DISCARD
).
换句话说,在配置时必须了解某些细节TaskExecutor
.
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。
端点内部 Bean
许多端点是复合 bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动)委托给MessageHandler
.
轮询的适配器通过委托给MessageSource
.
通常,获取对委托 Bean 的引用很有用,这可能是为了在运行时更改配置或用于测试。
这些 bean 可以从ApplicationContext
具有众所周知的名字。MessageHandler
实例使用类似于someConsumer.handler
(其中 'consumer' 是端点的id
属性)。MessageSource
实例使用 Bean ID 注册,类似于somePolledAdapter.source
,其中 'somePolledAdapter' 是适配器的 ID。
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 bean 的处理方式与声明的任何内部 bean 相同,并且未在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用id
并使用ref
属性。
有关更多信息,请参阅 Spring 文档。
终端节点角色
从版本 4.2 开始,可以将终端节点分配给角色。
角色允许将终端节点作为一个组来启动和停止。
这在使用 leadership election 时特别有用,其中可以分别在授予或撤销 leadership 时启动或停止一组端点。
为此,框架注册了一个SmartLifecycleRoleController
bean 中的名称为IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER
.
每当需要控制生命周期时,都可以注入这个 bean 或@Autowired
:
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
您可以使用 XML、Java 配置或以编程方式将终端节点分配给角色。 以下示例演示如何使用 XML 配置终结点角色:
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
以下示例说明如何为在 Java 中创建的 bean 配置端点角色:
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
以下示例演示如何在 Java 中的方法上配置终结点角色:
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
以下示例显示如何使用SmartLifecycleRoleController
在 Java 中:
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
以下示例演示如何使用IntegrationFlow
在 Java 中:
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
每个作都会将终端节点添加到cluster
角色。
调用roleController.startLifecyclesInRole("cluster")
和相应的stop…
method 启动和停止端点。
实现SmartLifecycle 可以通过编程方式添加 — 而不仅仅是 endpoints。 |
这SmartLifecycleRoleController
实现ApplicationListener<AbstractLeaderEvent>
它会自动启动和停止其配置的SmartLifecycle
对象(当某些 bean 发布OnGrantedEvent
或OnRevokedEvent
)。
使用 leadership election 启动和停止组件时,请务必设置auto-startup XML 属性 (autoStartup Bean 属性)更改为false ,以便应用程序上下文不会在上下文初始化期间启动组件。 |
从版本 4.3.8 开始,SmartLifecycleRoleController
提供了几种状态方法:
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 | 返回正在管理的角色的列表。 |
2 | 返回true 如果角色中的所有终端节点都在运行。 |
3 | 返回true 如果角色中的任何终端节点都没有运行。 |
4 | 返回component name : running status .
组件名称通常是 bean 名称。 |
领导层活动处理
终端节点组可以根据授予或撤销的领导权分别启动和停止。 这在共享资源必须仅由单个实例使用的集群场景中非常有用。 这方面的一个例子是轮询共享目录的文件入站通道适配器。 (请参阅读取文件)。
为了参与领导者选举并在当选领导者、撤销领导者或未能获得成为领导者的资源时收到通知,应用程序会在应用程序上下文中创建一个称为“领导者发起方”的组件。
通常,leader 发起方是SmartLifecycle
,因此它会在上下文启动时启动(可选),然后在领导层发生变化时发布通知。
您还可以通过设置publishFailedEvents
自true
(从版本 5.0 开始),用于您希望在发生故障时采取特定作的情况。
按照惯例,您应该提供Candidate
接收回调。
您还可以通过Context
对象。
您的代码还可以监听o.s.i.leader.event.AbstractLeaderEvent
实例(OnGrantedEvent
和OnRevokedEvent
) 并相应地响应(例如,通过使用SmartLifecycleRoleController
).
这些事件包含对Context
对象。
下面的清单显示了Context
接口:
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
从版本 5.0.6 开始,上下文提供对应聘者角色的引用。
Spring 集成提供了 leader 发起方的基本实现,该实现基于LockRegistry
抽象化。
要使用它,你需要创建一个实例作为 Bean,如下例所示:
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
如果 lock registry 正确实现,则最多只有一个 leader。
如果 lock registry 还提供引发异常的锁(理想情况下,InterruptedException
),则无领导时段的持续时间可以尽可能短,具体取决于锁定实施中的固有延迟所允许的最短。
默认情况下,busyWaitMillis
属性添加了一些额外的延迟,以防止在锁不完美(更常见)的情况下出现 CPU 匮乏,并且只有在您尝试再次获取锁时才知道它们已过期。
有关领导层选举和使用 Zookeeper 的事件的更多信息,请参阅 Zookeeper 领导层事件处理。 有关领导层选举和使用 Hazelcast 的事件的更多信息,请参阅 Hazelcast 领导层事件处理。
消息网关
网关隐藏了 Spring 集成提供的消息传递 API。 它允许你的应用程序的业务逻辑不知道 Spring 集成 API。 通过使用通用 Gateway,您的代码仅与一个简单的接互。
输入GatewayProxyFactoryBean
如前所述,不依赖于 Spring 集成 API (包括 gateway 类)就太好了。
因此, Spring 集成提供了GatewayProxyFactoryBean
,它为任何接口生成代理,并在内部调用如下所示的网关方法。
通过使用依赖关系注入,您可以向业务方法公开接口。
下面的示例展示了一个可用于与 Spring 集成交互的接口:
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
Gateway XML 命名空间支持
还提供了命名空间支持。 它允许您将接口配置为服务,如下例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义此配置后,cafeService
现在可以注入到其他 bean 中,并且调用该代理实例的Cafe
接口不知道 Spring 集成 API。
请参阅“示例”附录,了解使用gateway
元素(在 Cafe 演示中)。
上述配置中的默认值将应用于网关接口上的所有方法。 如果未指定回复超时,则调用线程将无限期等待回复。 请参阅没有响应到达时的网关行为。
可以覆盖单个方法的默认值。 请参阅带有注释和 XML 的网关配置。
设置默认回复通道
通常,您无需指定default-reply-channel
,因为 Gateway 会自动创建一个临时的匿名回复通道,并在其中侦听回复。
但是,某些情况可能会提示您定义default-reply-channel
(或reply-channel
使用适配器网关,例如 HTTP、JMS 等)。
对于一些背景,我们简要讨论了 gateway 的一些内部工作原理。
网关创建一个临时的点对点回复通道。
它是匿名的,并添加到消息标头中,名称为replyChannel
.
当提供显式的default-reply-channel
(reply-channel
使用远程适配器网关),您可以指向 publish-subscribe 通道,之所以这样命名,是因为您可以向其添加多个订阅者。
在内部, Spring 集成在临时replyChannel
和显式定义的default-reply-channel
.
假设您希望您的回复不仅发送到网关,还发送到其他某个使用者。 在这种情况下,您需要两样东西:
-
您可以订阅的命名频道
-
该通道将成为 publish-subscribe-channel
网关使用的默认策略不能满足这些需求,因为添加到 Headers 的回复通道是匿名的和点对点的。
这意味着其他订户无法获得该消息的句柄,即使可以,该通道也具有点对点行为,因此只有一个订户可以获取该消息。
通过定义default-reply-channel
您可以指向您选择的频道。
在本例中,这是一个publish-subscribe-channel
.
网关会创建一个桥,从它到存储在 header 中的临时匿名回复通道。
您可能还希望显式提供一个回复通道,以便通过侦听器(例如,wiretap)进行监视或审计。 要配置通道拦截器,您需要一个命名通道。
从版本 5.4 开始,当网关方法返回类型为void ,框架会填充replyChannel header 作为nullChannel bean 引用(如果未明确提供此类标头)。
这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。 |
带有注释和 XML 的网关配置
请考虑以下示例,该示例在前面的Cafe
interface 示例,方法是添加@Gateway
注解:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
这@Header
annotation 允许您添加解释为消息标头的值,如下例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法来配置网关方法,则可以添加method
元素添加到网关配置中,如下例所示:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。
如果要设置的 Headers 本质上是静态的,并且您不想通过使用@Header
附注。
例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。
通过评估调用的网关方法来确定请求的类型,尽管可能,但会违反关注点分离范式(该方法是一个 Java 工件)。
但是,在消息标头中表达您的意图(元信息)在消息传递架构中是很自然的。
以下示例演示如何为两种方法中的每一种方法添加不同的消息标头:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法为 'RESPONSE_TYPE' 标头设置了不同的值。
例如,如果您指定requestChannel 在<int:method/> 以及在@Gateway annotation 时,annotation 值优先。 |
如果在 XML 中指定了无参数网关,并且接口方法同时具有@Payload 和@Gateway 注解(带有payloadExpression 或payload-expression 在<int:method/> 元素)、@Payload 值被忽略。 |
表达式和 “global” 标头
这<header/>
元件支架expression
作为value
.
计算 SPEL 表达式以确定 header 的值。
从版本 5.2 开始,#root
object 是MethodArgsHolder
跟getMethod()
和getArgs()
访问。
例如,如果你希望对简单的方法名称进行路由,你可以添加一个带有以下表达式的 Headers:method.name
.
这java.reflect.Method 不可序列化。
表达式为method 如果稍后序列化消息,则会丢失。
因此,您可能希望使用method.name 或method.toString() 在那些情况下。
这toString() method 提供了一个String 方法的表示形式,包括 parameter 和 return 类型。 |
从 3.0 版本开始,<default-header/>
元素以将 Headers 添加到网关生成的所有消息中,而不管调用的方法如何。
为方法定义的特定标头优先于默认标头。
在此处为方法定义的特定标头将覆盖任何@Header
注释。
但是,默认标头不会覆盖任何@Header
注释。
网关现在还支持default-payload-expression
,该函数适用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用上一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和 Headers)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应该映射到 headers。 请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是Map
),第二个参数的内容变为 headers。
在第二种情况下(或第一种情况下,当参数 for 参数thing1
是一个Map
),框架无法确定哪个参数应该是有效负载。
因此,映射失败。
这通常可以使用payload-expression
一个@Payload
注释或@Headers
注解。
或者(每当约定被打破时),您可以承担将方法调用映射到消息的全部责任。
为此,请实现MethodArgsMessageMapper
并将其提供给<gateway/>
通过使用mapper
属性。
映射器将MethodArgsHolder
,这是一个简单的类,它将java.reflect.Method
instance 和Object[]
包含参数。
提供自定义映射器时,default-payload-expression
attribute 和<default-header/>
元素。
同样,payload-expression
attribute 和<header/>
元素不允许在任何<method/>
元素。
映射方法参数
以下示例显示了如何将方法参数映射到消息,并显示了无效配置的一些示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在此示例中,SPEL 变量#this 引用参数 — 在本例中,s . |
XML 等效项看起来略有不同,因为没有#this
method 参数的 context 获取。
但是,表达式可以使用args
属性的MethodArgsHolder
root 对象(有关更多信息,请参阅表达式和“全局”标头),如下例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注解
从版本 4.0 开始,网关服务接口可以用@MessagingGateway
注解,而不是要求定义<gateway />
xml 元素进行配置。
以下一对示例比较了配置同一网关的两种方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring 集成在组件扫描期间发现这些注释时,它会创建proxy 实施。
要执行此扫描并注册BeanDefinition 在应用程序上下文中,添加@IntegrationComponentScan 注解添加到@Configuration 类。
标准@ComponentScan infrastructure 不处理接口。
因此,我们引入了@IntegrationComponentScan 逻辑来查找@MessagingGateway 接口和寄存器上的注释GatewayProxyFactoryBean 实例。
另请参阅 注释支持。 |
与@MessagingGateway
注解中,您可以使用@Profile
注解以避免创建 Bean(如果此类配置文件未处于活动状态)。
从版本 6.0 开始,带有@MessagingGateway
也可以标有@Primary
对于任何 Spring 都尽可能地进行相应配置逻辑的注释@Component
定义。
从版本 6.0 开始,@MessagingGateway
接口可用于标准 Spring@Import
配置。
这可以用作@IntegrationComponentScan
或手动AnnotationGatewayProxyFactoryBean
bean 定义。
这@MessagingGateway
使用@MessageEndpoint
自版本6.0
和name()
属性本质上是别名为@Compnent.value()
.
这样,网关代理的 bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新保持一致。
默认的AnnotationBeanNameGenerator
可以通过AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
或作为@IntegrationComponentScan.nameGenerator()
属性。
如果您没有 XML 配置,则@EnableIntegration 至少需要一个 Comments@Configuration 类。
看Configuration 和@EnableIntegration 了解更多信息。 |
调用无参数方法
当在没有任何参数的 Gateway 接口上调用方法时,默认行为是接收Message
从PollableChannel
.
但是,有时您可能希望触发无参数方法,以便可以与下游不需要用户提供的参数的其他组件进行交互,例如触发无参数 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。
要生成有效负载,接口上的方法参数不是必需的。
您可以使用@Payload
annotation 或payload-expression
XML 中method
元素。
以下列表包括有效负载的几个示例:
-
文本字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例演示如何使用@Payload
注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您还可以使用@Gateway
注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注解都存在(并且payloadExpression 提供)@Gateway 赢了。 |
另请参阅使用注释和 XML 的网关配置。
如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送作。
调用default
方法
网关代理的接口可能具有default
方法,从版本 5.3 开始,框架会注入一个DefaultMethodInvokingMethodInterceptor
导入到代理中调用default
方法使用java.lang.invoke.MethodHandle
方法而不是代理。
JDK 中的接口(例如java.util.function.Function
仍然可用于网关代理,但是它们的default
方法无法调用,因为MethodHandles.Lookup
针对 JDK 类的实例化。
这些方法也可以使用显式@Gateway
注解,或者proxyDefaultMethods
在@MessagingGateway
annotation 或<gateway>
XML 组件。
错误处理
网关调用可能会导致错误。 默认情况下,在网关的方法调用时,下游发生的任何错误都会“按原样”重新引发。 例如,请考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出MyException
(例如),框架会将其包装在MessagingException
并将传递给 Service Activator 的消息附加到failedMessage
财产。
因此,框架执行的任何日志记录都具有完整的失败上下文。
默认情况下,当网关捕获到异常时,MyException
被解包并抛给调用方。
您可以配置throws
子句来匹配原因链中的特定异常类型。
例如,如果您想捕获整个MessagingException
有了 Reason of downstream error 的所有消息收发信息,您应该有一个类似于以下内容的网关方法:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,因此您可能不希望将调用方暴露给消息传递基础设施。
如果您的网关方法没有throws
子句中,网关会遍历 Cause Tree,查找RuntimeException
那不是MessagingException
.
如果未找到,框架将抛出MessagingException
.
如果MyException
在前面的讨论中,有一个原因为SomeOtherException
和您的方法throws SomeOtherException
,网关会进一步解包该 API 并将其抛给调用方。
当网关声明时没有service-interface
、内部框架界面RequestReplyExchanger
被使用。
请考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在 5.0 版本之前,此exchange
方法没有throws
子句,因此,异常被解包。
如果你使用这个接口,并且想要恢复之前的 unwrap 行为,请使用自定义的service-interface
或访问cause
的MessagingException
你自己。
但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某个“错误消息”协定的消息)。
为了实现这一点,网关通过包含对error-channel
属性。
在以下示例中,“transformer”创建一个回复Message
从Exception
:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
这exceptionTransformer
可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。
这将成为发送回给调用方的有效负载。
如有必要,您可以在这样的 “错误流” 中做很多更复杂的事情。
它可能涉及路由器(包括 Spring Integration 的ErrorMessageExceptionTypeRouter
)、过滤器等。
然而,大多数时候,一个简单的 “transformer” 就足够了。
或者,您可能希望仅记录异常(或将其异步发送到某个位置)。
如果您提供单向流,则不会将任何内容发送回调用方。
如果要完全禁止显示异常,则可以提供对全局nullChannel
(本质上是一个/dev/null
方法)。
最后,如上所述,如果没有error-channel
,则异常会照常传播。
当您使用@MessagingGateway
注释(请参阅
),您可以使用@MessagingGateway
AnnotationerrorChannel
属性。
从版本 5.0 开始,当您使用带有void
return 类型(单向流)时,该error-channel
reference (如果提供) 填充到标准errorChannel
标头。
此功能允许下游异步流,基于标准ExecutorChannel
配置(或QueueChannel
) 覆盖默认的全局变量errorChannel
异常发送行为。
以前,您必须手动指定errorChannel
标头,其中@GatewayHeader
annotation 或<header>
元素。
这error-channel
的 属性被忽略void
方法。
相反,错误消息被发送到默认的errorChannel
.
通过简单的 POJI 网关公开消息传递系统是有好处的,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。
我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期挂起(无论是 void、返回值还是引发的 Exception)。
当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。
这意味着由网关启动的消息有可能被过滤器丢弃,并且永远不会到达负责生成回复的组件。
某些服务激活器方法可能会导致异常,因此不提供回复(因为我们不生成 null 消息)。
换句话说,多种情况都可能导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,请考虑对网关方法的含义。网关的方法输入参数被合并到消息中并发送到下游。
回复消息将转换为网关方法的返回值。
因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。
否则,您的网关方法可能永远不会返回并无限期挂起。
处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。
另一种处理方法是显式设置reply-timeout 属性。
这样,网关的挂起时间不会超过由reply-timeout 如果超时已过,则返回 'null'。
最后,你可能要考虑在服务激活器上设置下游标志,例如 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。本章的最后一节将更详细地讨论这些选项。 |
如果下游流返回ErrorMessage 其payload (一个Throwable ) 被视为常规下游错误。
如果存在error-channel 配置,则会将其发送到 Error 流。
否则,有效负载将抛给网关的调用方。
同样,如果error-channel 返回一个ErrorMessage ,则其有效负载将抛出给调用方。
这同样适用于带有Throwable 有效载荷。
这在异步情况下非常有用,当您需要将Exception 直接发送给调用方。
为此,您可以返回Exception (作为reply 来自某些服务)或扔掉它。
通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。
TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。
它通过使用aggregator 跟group-timeout (请参阅 Aggregator 和 Group Timeout )和MessagingTimeoutException reply on the discard 流。 |
网关超时
网关有两个超时属性:requestTimeout
和replyTimeout
.
仅当通道可以阻塞(例如,有界的QueueChannel
那是满的)。
这replyTimeout
value 是网关等待回复或返回的时间null
.
它默认为无穷大。
超时可以设置为网关上所有方法的默认值 (defaultRequestTimeout
和defaultReplyTimeout
) 或在MessagingGateway
interface 注解。
单个方法可以覆盖这些默认值(在<method/>
子元素)或@Gateway
注解。
从版本 5.0 开始,超时可以定义为表达式,如下例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文有一个BeanResolver
(使用@someBean
引用其他 bean),并将args
array 属性#root
对象可用。
有关此根对象的更多信息,请参阅表达式和“全局”标头。
使用 XML 进行配置时,超时属性可以是长值或 SPEL 表达式,如下例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。
如前所述,GatewayProxyFactoryBean
提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己的域中的对象、原语/字符串或其他对象)。
但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。
由于消息传递系统本质上是异步的,因此您可能无法始终保证 “对于每个请求,总会有一个回复” 的合同。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种方便的方式来启动流。
为了处理这些类型的场景, Spring 集成使用java.util.concurrent.Future
实例来支持异步网关。
在 XML 配置中,没有任何变化,您仍然以与定义常规网关相同的方式定义异步网关,如下例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(服务接口)略有不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的示例所示,网关方法的返回类型是Future
.
什么时候GatewayProxyFactoryBean
看到 gateway 方法的 return type 是一个Future
,它会立即使用AsyncTaskExecutor
.
这就是差异的程度。
对此类方法的调用总是立即返回Future
实例。
然后,您可以与Future
按照自己的节奏获取结果、取消等。
此外,与Future
实例, 调用get()
可能会显示 Timeout、Execution 异常等。
以下示例演示如何使用Future
从异步网关返回:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。
AsyncTaskExecutor
默认情况下,GatewayProxyFactoryBean
使用org.springframework.core.task.SimpleAsyncTaskExecutor
提交内部时AsyncInvocationTask
返回类型为Future
.
但是,async-executor
属性中的<gateway/>
元素的配置允许您提供对java.util.concurrent.Executor
在 Spring 应用程序上下文中可用。
的 (默认)SimpleAsyncTaskExecutor
同时支持Future
和CompletableFuture
return 类型。
看CompletableFuture
.
即使有默认的执行程序,提供外部执行程序通常也很有用,这样你就可以在日志中识别其线程(当使用 XML 时,线程名称基于执行程序的 Bean 名称),如下例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望退回不同的Future
实现中,您可以提供自定义 Executor 或完全禁用 Executor 并返回Future
在 reply message payload 中。
要禁用执行程序,请将其设置为null
在GatewayProxyFactoryBean
(通过使用setAsyncTaskExecutor(null)
).
使用 XML 配置网关时,请使用async-executor=""
.
使用@MessagingGateway
annotation 中,请使用类似于以下内容的代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的 concreteFuture implementation 或配置的执行程序不支持的其他一些子接口,则流将在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。 |
CompletableFuture
从版本 4.2 开始,网关方法现在可以返回CompletableFuture<?>
.
返回此类型时有两种作模式:
-
当提供了异步执行程序且返回类型恰好为
CompletableFuture
(不是子类)时,框架会在 Executor 上运行任务,并立即返回CompletableFuture
给调用方。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
用于创造未来。 -
当异步执行程序显式设置为
null
返回类型为CompletableFuture
或者返回类型是CompletableFuture
,则在调用方的线程上调用流。 在这种情况下,下游流应返回CompletableFuture
的适当类型。
这org.springframework.util.concurrent.ListenableFuture 从 Spring Framework 开始已弃用6.0 .
现在建议迁移到CompletableFuture ,它提供类似的处理功能。 |
使用场景
在下面的场景中,调用方线程立即返回CompletableFuture<Invoice>
,当下游流回复网关(使用Invoice
对象)。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在下面的场景中,调用方线程返回一个CompletableFuture<Invoice>
当下游流将其作为回复的负载提供给网关时。
当发票准备好时,必须完成其他一些流程。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在下面的场景中,调用方线程返回一个CompletableFuture<Invoice>
当下游流将其作为回复的负载提供给网关时。
当发票准备好时,必须完成其他一些流程。
如果DEBUG
logging 后,会发出一个日志条目,指示异步执行程序不能用于此方案。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行其他作,如下例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器Mono
从版本 5.0 开始,GatewayProxyFactoryBean
允许将 Project Reactor 与网关接口方法一起使用,使用Mono<T>
return 类型。
内部的AsyncInvocationTask
包装在Mono.fromCallable()
.
一个Mono
可用于稍后检索结果(类似于Future<?>
),或者您可以通过调用Consumer
当结果返回到网关时。
这Mono 不会立即被框架刷新。
因此,底层消息流不会在网关方法返回之前启动(就像使用Future<?> Executor 任务)。
流程在Mono 已订阅。
或者,Mono (作为“可组合项”)可能是 Reactor 流的一部分,当subscribe() 与整个Flux .
以下示例显示了如何使用 Project Reactor 创建网关: |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中,这样的网关可以用于处理Flux
数据:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,其中handleInvoice()
在流完成时被调用。
如需了解详情,另请参阅 Kotlin 协程。
返回异步类型的下游流
如AsyncTaskExecutor
部分,如果您希望某个下游组件返回带有异步有效负载 (Future
,Mono
等),则必须将异步执行程序显式设置为null
(或使用 XML 配置时)。
然后,在调用方线程上调用该流,稍后可以检索结果。""
异步void
返回类型
消息网关方法可以按如下方式声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会传播回调用方。
为了确保下游流调用和异常传播到调用方的异步行为,从版本 6.0 开始,框架提供了对Future<Void>
和Mono<Void>
return 类型。
该用例类似于前面描述的普通 send-and-forget 行为void
return 类型,但不同的是,流执行是异步发生的,并且返回Future
(或Mono
) 以null
或根据send
作结果。
如果Future<Void> 是精确的下游流回复,则asyncExecutor 选项必须设置为 null (AnnotationConstants.NULL 对于@MessagingGateway configuration) 和send part 在 producer 线程上执行。
回复 1 取决于下游流配置。
这样,目标应用程序就可以生成Future<Void> 正确回复。
这Mono use-case 已经超出了框架线程控制范围,因此将asyncExecutor 设置为 null 没有意义。
那里Mono<Void> 作为 request-reply 网关作的结果,必须配置为Mono<?> 返回 Gateway 方法的类型。 |
未到达响应时的网关行为
如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常预期始终返回的典型方法调用(即使有 Exception)可能并不总是一对一地映射到消息交换(例如,回复消息可能未到达 — 相当于方法未返回)。
本节的其余部分介绍各种场景以及如何使网关的行为更具可预测性。
可以配置某些属性以使同步网关行为更具可预测性,但其中一些属性可能并不总是像您预期的那样工作。
其中之一是reply-timeout
(在方法级别或default-reply-timeout
在网关级别)。
我们检查reply-timeout
属性来了解它在各种场景中如何能够和不能影响同步网关的行为。
我们研究了单线程场景(下游的所有组件都通过直接通道连接)和多线程场景(例如,在下游的某个地方,你可能有一个打破单线程边界的 pollable 或 executor 通道)。
长时间运行的流程下游
- Sync Gateway,单线程
-
如果下游组件仍在运行(可能是由于无限循环或服务缓慢),则设置
reply-timeout
不起作用,并且 Gateway 方法调用不会返回,直到下游服务退出(通过返回或引发异常)。 - Sync Gateway,多线程
-
如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务缓慢),则设置
reply-timeout
通过允许网关方法调用在达到超时后返回来发挥作用,因为GatewayProxyFactoryBean
轮询 Reply 通道,等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能导致网关方法返回 'null'。 您应该了解,回复消息(如果生成)是在网关方法调用可能返回后发送到回复通道的,因此您必须了解这一点并在设计流程时牢记这一点。
下游组件返回 'null'
- Sync Gateway — 单线程
-
如果下游组件返回 'null' 且 no
reply-timeout
已配置,则 Gateway 方法调用将无限期挂起,除非reply-timeout
已配置或requires-reply
属性,该属性已在可能返回 'null' 的下游组件(例如,服务激活器)上设置。 在这种情况下,将引发异常并将其传播到网关。 - Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件返回签名为“void”,而网关方法签名为非 void
- Sync Gateway — 单线程
-
如果下游组件返回 'void' 且 no
reply-timeout
已配置,则网关方法调用将无限期挂起,除非reply-timeout
已配置。 - Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件导致运行时异常
- Sync Gateway — 单线程
-
如果组件下游引发运行时异常,则异常将通过错误消息传播回网关并重新引发。
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
您应该了解,默认情况下,reply-timeout 是无界的。
因此,如果您没有显式设置reply-timeout ,则您的网关方法调用可能会无限期挂起。
因此,为了确保您分析了流程,并且如果这些场景之一发生的可能性很小,您应该设置reply-timeout 属性设置为 “'safe'” 值。
更好的是,您可以将requires-reply 属性设置为 'true' 以确保及时响应,因为一旦下游组件在内部返回 null,就会引发异常。
但是,您还应该意识到,在某些情况下(请参阅第一个),其中reply-timeout 没有帮助。
这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。
如前所述,后一种情况是定义返回Future 实例。
然后,您可以保证收到该返回值,并且您可以对调用结果进行更精细的控制。
此外,在处理路由器时,您应该记住设置resolution-required 属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。
同样,在处理 Filter 时,你可以设置throw-exception-on-rejection 属性。
在这两种情况下,生成的流的行为类似于它包含具有 'requires-reply' 属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。 |
reply-timeout 是无界的<gateway/> 元素(由GatewayProxyFactoryBean ).
用于外部集成的入站网关(WS、HTTP 等)与这些网关共享许多特征和属性。
但是,对于这些入站网关,默认的reply-timeout 是 1000 毫秒(1 秒)。
如果对另一个线程进行了下游异步切换,则可能需要增加此属性,以便在网关超时之前有足够的时间让流完成。 |
您应该了解,计时器在线程返回到网关时启动,即当流完成或将消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流是完全同步的,则回复将立即可用。 对于异步流,线程将等待 this time。 |
看IntegrationFlow
作为 网关在 Java DSL 一章中,了解通过IntegrationFlow
.
服务激活器
服务激活器是将任何 Spring 管理的对象连接到 Importing 通道的端点类型,以便它可以扮演服务的角色。
如果服务生成输出,则它还可能连接到输出通道。
或者,输出生成服务可能位于处理管道或消息流的末尾,在这种情况下,入站消息的replyChannel
header。
如果未定义输出通道,则这是默认行为。
与此处描述的大多数配置选项一样,相同的行为实际上适用于大多数其他组件。
配置 Service Activator
要创建服务激活器,请使用带有 'input-channel' 和 'ref' 属性的 'service-activator' 元素,如下例所示:
<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>
前面的配置从exampleHandler
满足消息收发要求之一,如下所示:
-
注解
@ServiceActivator
-
是
public
-
不返回
void
如果requiresReply == true
运行时调用的目标方法是通过每个请求消息的payload
type 或作为Message<?>
type (如果 Target 类上存在此类方法)。
从版本 5.0 开始,一个服务方法可以标有@org.springframework.integration.annotation.Default
作为所有不匹配 case 的回退。
当使用内容类型转换时,这在转换后调用目标方法时非常有用。
要委托给任何对象的显式定义方法,你可以添加method
属性,如下例所示:
<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
在任一情况下,当 service 方法返回非 null 值时,终端节点都会尝试将回复消息发送到相应的回复通道。
要确定回复通道,它首先检查output-channel
在终端节点配置中提供,如下例所示:
<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
ref="somePojo" method="someMethod"/>
如果该方法返回 result 且没有output-channel
定义后,框架会检查请求消息的replyChannel
header 值。
如果该值可用,则它会检查其类型。
如果它是一个MessageChannel
,则回复消息将发送到该通道。
如果它是一个String
时,终端节点会尝试将通道名称解析为通道实例。
如果无法解析通道,则DestinationResolutionException
被抛出。
如果可以解决,则消息将发送到那里。
如果请求消息中没有replyChannel
header 和reply
object 是一个Message
其replyChannel
标头
这是 Spring Integration 中用于请求-回复消息传递的技术,它也是返回地址模式的一个例子。
如果您的方法返回结果,并且您希望丢弃该结果并结束流,则应配置output-channel
发送到NullChannel
.
为方便起见,框架注册了一个名称为nullChannel
.
有关更多信息,请参阅 特殊通道 。
服务激活器是生成回复消息不需要的组件之一。
如果您的方法返回null
或具有void
return 类型,则 Service Activator 会在方法调用后退出,没有任何信号。
此行为可以通过AbstractReplyProducingMessageHandler.requiresReply
选项,它也公开为requires-reply
使用 XML 命名空间进行配置时。
如果标志设置为true
并且该方法返回 null,则返回ReplyRequiredException
被抛出。
service 方法中的参数可以是 message 或任意类型。
如果是后者,则假定它是消息有效负载,该有效负载从消息中提取并注入到服务方法中。
我们通常推荐这种方法,因为它在使用 Spring Integration 时遵循并促进了 POJO 模型。
参数也可能具有@Header
或@Headers
注释,如注释支持中所述。
service 方法不需要具有任何参数,这意味着您可以实现事件样式的服务激活器(其中您只关心 service 方法的调用),而不必担心消息的内容。 将其视为 null JMS 消息。 此类实现的一个示例用例是 input 通道上存储的消息的简单计数器或监视器。 |
从版本 4.1 开始,框架会正确地将消息属性 (payload
和headers
) 添加到 Java 8Optional
POJO 方法参数,如下例所示:
public class MyBean {
public String computeValue(Optional<String> payload,
@Header(value="foo", required=false) String foo1,
@Header(value="foo") Optional<String> foo2) {
if (payload.isPresent()) {
String value = payload.get();
...
}
else {
...
}
}
}
我们通常建议使用ref
属性(如果自定义服务激活器处理程序实现可以在其他<service-activator>
定义。
但是,如果自定义服务激活器处理程序实现仅在<service-activator>
,您可以提供内部 Bean 定义,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="someMethod">
<beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>
同时使用ref attribute 和内部处理程序定义位于同一<service-activator> 不允许配置,因为它会产生不明确的条件并导致引发异常。 |
如果ref 属性引用一个扩展AbstractMessageProducingHandler (例如框架本身提供的处理程序),通过将 output channel 直接注入到 handler 中来优化配置。
在这种情况下,每个ref 必须发送到单独的 bean 实例(或prototype -scoped bean)或使用内部的<bean/> 配置类型。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。 |
服务激活器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,服务激活器也可以从 SPEL 中受益。
例如,您可以调用任何 Bean 方法,而无需在ref
属性或将其作为内部 Bean 定义包含,如下所示:
<int:service-activator input-channel="in" output-channel="out"
expression="@accountService.processAccount(payload, headers.accountId)"/>
<bean id="accountService" class="thing1.thing2.Account"/>
在前面的配置中,不是使用ref
或者作为内部 Bean,我们使用 SpEL 的@beanId
表示法,并调用采用与 Message Payload 兼容的类型的方法。
我们还传递一个 header 值。
任何有效的 SPEL 表达式都可以根据消息中的任何内容进行评估。
对于简单的场景,如果所有 logic 都可以封装在这样的表达式中,则您的服务激活器不需要引用 bean,如下例所示:
<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>
在前面的配置中,我们的服务逻辑是将 payload 值乘以 2。 SpEL 让我们相对容易地处理它。
看Service Activator 和.handle()
方法在 Java DSL 一章中,了解有关配置 Service Activator 的更多信息。
异步服务激活器
服务激活器由调用线程调用。
如果输入通道是SubscribableChannel
或 Poller 线程的PollableChannel
.
如果服务返回CompletableFuture<?>
,默认作是将其作为发送到 output(或 reply)通道的消息的有效负载发送。
从版本 4.3 开始,您现在可以设置async
属性设置为true
(通过使用setAsync(true)
使用 Java 配置时)。
如果服务返回CompletableFuture<?>
当 this 时async
属性设置为true
,则调用线程将立即释放,回复消息将在完成 future 的线程(从您的服务中)发送。
这对于使用PollableChannel
,因为释放 Poller 线程以执行框架中的其他服务。
如果该服务使用Exception
,则发生正常的错误处理。
一ErrorMessage
发送到errorChannel
消息标头(如果存在)。
否则,将ErrorMessage
将发送到默认的errorChannel
(如果可用)。
Service Activator 和 Method 返回类型
service 方法可以返回任何类型,这些类型将成为回复消息有效负载。
在这种情况下,新的Message<?>
Object 并复制请求消息中的所有标头。
对于大多数 Spring 集成,其工作方式相同MessageHandler
实现,当交互基于 POJO 方法调用时。
一个完整的Message<?>
object 也可以从该方法返回。
但是,请记住,与 transformer 不同,对于 Service Activator,如果返回的消息中尚不存在 headers,则将通过从请求消息中复制 headers 来修改此消息。
因此,如果您的 method 参数是Message<?>
并且您复制了 Service 方法中的一些(但不是全部)现有 Headers,它们将重新出现在回复消息中。
从回复消息中删除 Headers 不是 Service Activator 的责任,遵循松散耦合的原则,最好添加一个HeaderFilter
在集成流中。
或者,可以使用 Transformer 代替 Service Activator,但在这种情况下,当返回完整的Message<?>
该方法完全负责消息,包括复制请求消息标头(如果需要)。
您必须确保重要的框架标头(例如replyChannel
,errorChannel
),如果存在,则必须保留。
延迟器
延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。
当消息延迟时,原始发件人不会阻止。
相反,延迟的消息是使用org.springframework.scheduling.TaskScheduler
在 delay 过后发送到 output 通道。
这种方法即使对于相当长的延迟也是可扩展的,因为它不会导致大量阻塞的发送方线程。
相反,在典型情况下,线程池用于实际执行释放消息。
本节包含配置延迟器的几个示例。
配置 Delayer
这<delayer>
元素用于延迟两个消息通道之间的消息流。
与其他终端节点一样,您可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每条消息应延迟的毫秒数。
以下示例将所有消息延迟 3 秒:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果需要确定每条消息的延迟,还可以使用'expression'属性提供 SPEL 表达式,如以下表达式所示:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,仅当表达式的计算结果为给定入站消息的 null 时,三秒延迟才适用。
如果只想将延迟应用于具有表达式评估有效结果的消息,则可以使用 'default-delay'0
(默认值)。
对于延迟为0
(或更少),则消息会立即在调用线程上发送。
XML 解析器使用消息组 ID<beanName>.messageGroupId . |
延迟处理程序支持表示以毫秒为单位的间隔(任何Object 谁的toString() method 生成一个可以解析为Long ) 以及java.util.Date 表示绝对时间的实例。
在第一种情况下,毫秒从当前时间开始计算(例如,值5000 将消息从延迟器收到消息的时间起至少延迟 5 秒)。
使用Date 实例,则消息在Date 对象。
等于非正延迟或过去的 Date 的值不会导致延迟。
相反,它被直接发送到原始发送方线程上的 output 通道。
如果表达式计算结果不是Date ,并且不能解析为Long ,则默认延迟(如果有 — 默认值为0 ) 应用。 |
表达式计算可能会因各种原因(包括无效的表达式或其他条件)而引发计算异常。
默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会回退到默认延迟(如果有)。
您可以通过设置ignore-expression-failures 属性。
默认情况下,此属性设置为true 延迟器行为如前所述。
但是,如果你不希望忽略表达式计算异常并将其抛给延迟器的调用者,请将ignore-expression-failures 属性设置为false . |
在前面的示例中,延迟表达式指定为
因此,如果可能会省略标头,并且你想要回退到默认延迟,则使用索引器语法而不是点属性访问器语法通常更有效(并且建议使用),因为检测 null 比捕获异常更快。 |
延迟器委托给 Spring 的TaskScheduler
抽象化。
延迟器使用的默认调度程序是ThreadPoolTaskScheduler
实例。
请参阅配置 Task Scheduler。
如果要委托给不同的 scheduler,可以通过 delayer 元素的 'scheduler' 属性提供引用,如下例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果您配置外部ThreadPoolTaskScheduler 中,您可以设置waitForTasksToCompleteOnShutdown = true 在此属性上。
它允许在应用程序关闭时成功完成已经处于执行状态(释放消息)的 'delay' 任务。
在 Spring Integration 2.2 之前,这个属性在<delayer> 元素,因为DelayHandler 可以在后台创建自己的调度器。
从 2.2 开始,延迟器需要一个外部调度器实例,并且waitForTasksToCompleteOnShutdown 已删除。
您应该使用调度程序自己的配置。 |
ThreadPoolTaskScheduler 具有属性errorHandler ,它可以注入一些org.springframework.util.ErrorHandler .
此处理程序允许处理Exception 从发送延迟消息的计划任务的线程中。
默认情况下,它使用org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler ,您可以在日志中看到堆栈跟踪。
您可能需要考虑使用org.springframework.integration.channel.MessagePublishingErrorHandler ,它会发送一个ErrorMessage 转换为error-channel ,从失败邮件的标头或默认的error-channel .
此错误处理在事务回滚(如果存在)后执行。
请参阅 发布失败。 |
Delayer 和 Message Store
这DelayHandler
将延迟的消息持久化到提供的MessageStore
.
(“groupId”基于<delayer>
元素。
延迟消息将从MessageStore
通过紧接DelayHandler
将消息发送到output-channel
.
如果提供的MessageStore
是持久性的(例如JdbcMessageStore
),它提供了在应用程序关闭时不丢失消息的能力。
应用程序启动后,DelayHandler
从MessageStore
并根据消息的原始到达时间(如果延迟是数字)重新安排它们并带有延迟。
对于延迟标头为Date
那Date
在重新安排时使用。
如果延迟的消息保留在MessageStore
超过其 'delay' ,它会在启动后立即发送。
这<delayer>
可以使用两个互斥元素之一进行扩充:<transactional>
和<advice-chain>
.
这List
的 AOP 建议应用于代理的内部DelayHandler.ReleaseMessageHandler
,它负责在延迟后在Thread
的计划任务。
例如,当下游消息流抛出异常并且ReleaseMessageHandler
将回滚。
在这种情况下,延迟消息将保留在持久MessageStore
.
您可以使用任何自定义org.aopalliance.aop.Advice
在<advice-chain>
.
这<transactional>
元素定义一个只有事务性建议的简单通知链。
以下示例显示了advice-chain
在<delayer>
:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
这DelayHandler
可以导出为 JMXMBean
使用托管式作 (getDelayedMessageCount
和reschedulePersistedMessages
),这允许在运行时重新调度延迟的持久化消息 — 例如,如果TaskScheduler
之前已停止。
这些作可以通过Control Bus
命令,如下例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅系统管理。 |
从版本 5.3.7 开始,如果在将消息存储到MessageStore
,则发布任务计划在TransactionSynchronization.afterCommit()
回调。
这对于防止争用情况是必要的,在这种情况下,计划的发布可能会在事务提交之前运行,并且找不到消息。
在这种情况下,消息将在延迟后或事务提交后(以较晚者为准)发布。
发布失败
从版本 5.0.8 开始,延迟器上有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
当消息被释放时,如果下游流失败,将在retryDelay
.
如果maxAttempts
,则消息将被丢弃(除非发布是事务性的,在这种情况下,消息将保留在存储中,但不会再计划发布,直到应用程序重新启动,或者reschedulePersistedMessages()
方法,如上所述)。
此外,您还可以配置delayedMessageErrorChannel
;当发布失败时,ErrorMessage
发送到该通道,但 exception 作为有效负载,并且具有originalMessage
财产。
这ErrorMessage
包含标头IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
包含当前计数。
如果错误流使用错误消息并正常退出,则不会执行进一步的作;如果发布是事务性的,则将提交事务并从存储中删除消息。
如果错误流引发异常,则将重试该版本,直到maxAttempts
如上所述。
脚本支持
Spring 集成 2.1 增加了对 Java 版本 6 中引入的 JSR223 Scripting for Java 规范的支持。 它允许您使用以任何受支持的语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本为各种集成组件提供逻辑,类似于 Spring 集成中使用 Spring 表达式语言 (SPEL) 的方式。 有关 JSR223 的更多信息,请参阅文档。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-scripting</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-scripting:6.0.9"
此外,您需要添加脚本引擎实现,例如 JRuby、Jython。
从版本 5.2 开始, Spring 集成提供了 Kotlin Jsr223 支持。 您需要将这些依赖项添加到您的项目中以使其正常工作:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-script-util</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-compiler-embeddable</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-scripting-compiler-embeddable</artifactId>
<scope>runtime</scope>
</dependency>
runtime 'org.jetbrains.kotlin:kotlin-script-util'
runtime 'org.jetbrains.kotlin:kotlin-compiler-embeddable'
runtime 'org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable'
这KotlinScriptExecutor
由提供的kotlin
语言指示符或脚本文件附带.kts
外延。
第三方已经开发了各种 JSR223 语言实现。 特定实现与 Spring Integration 的兼容性取决于它与规范的一致性以及实现者对规范的解释。 |
如果您计划使用 Groovy 作为脚本语言,我们建议您使用 Spring-Integration 的 Groovy 支持,因为它提供了特定于 Groovy 的其他功能。 但是,本节也相关。 |
脚本配置
根据集成要求的复杂程度,脚本可以在 XML 配置中作为 CDATA 内联提供,也可以作为对包含脚本的 Spring 资源的引用提供。
为了启用脚本支持,Spring 集成定义了一个ScriptExecutingMessageProcessor
,它将消息负载绑定到名为payload
将邮件标头设置为headers
变量,两者都可在脚本执行上下文中访问。
您需要做的就是编写一个使用这些变量的脚本。
以下一对示例显示了创建筛选条件的示例配置:
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
return new ByteArrayResource("headers.type == 'good'".getBytes());
}
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}
<int:filter input-channel="referencedScriptInput">
<int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-script:script lang="groovy">
<![CDATA[
return payload == 'good'
]]>
</int-script:script>
</int:filter>
如前面的示例所示,脚本可以内联包含,也可以通过引用资源位置(通过使用location
属性)。
此外,lang
属性对应于语言名称(或其 JSR223 别名)。
其他支持脚本的 Spring 集成端点元素包括router
,service-activator
,transformer
和splitter
.
每种情况下的脚本配置都与上述相同(除了 endpoint 元素)。
脚本支持的另一个有用功能是能够更新(重新加载)脚本,而不必重新启动应用程序上下文。
为此,请指定refresh-check-delay
属性script
元素,如下例所示:
Scripts.processor(...).refreshCheckDelay(5000)
}
<int-script:script location="..." refresh-check-delay="5000"/>
在前面的示例中,每 5 秒检查一次脚本位置的更新。 如果脚本已更新,则自更新后 5 秒后发生的任何调用都会导致运行新脚本。
请考虑以下示例:
Scripts.processor(...).refreshCheckDelay(0)
}
<int-script:script location="..." refresh-check-delay="0"/>
在前面的示例中,一旦发生任何脚本修改,就会使用任何脚本修改更新上下文,从而为“实时”配置提供一种简单的机制。 任何负值都表示在初始化应用程序上下文后不会重新加载脚本。 这是默认行为。 以下示例显示了一个从不更新的脚本:
Scripts.processor(...).refreshCheckDelay(-1)
}
<int-script:script location="..." refresh-check-delay="-1"/>
内联脚本无法重新加载。 |
脚本变量绑定
需要变量绑定才能使脚本能够引用外部提供给脚本执行上下文的变量。
默认情况下,payload
和headers
用作绑定变量。
您可以使用<variable>
元素(或ScriptSpec.variables()
选项),如下例所示:
Scripts.processor("foo/bar/MyScript.py")
.variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}
<script:script lang="py" location="foo/bar/MyScript.py">
<script:variable name="var1" value="thing1"/>
<script:variable name="var2" value="thing2"/>
<script:variable name="date" ref="date"/>
</script:script>
如前面的示例所示,您可以将脚本变量绑定到标量值或 Spring Bean 引用。
请注意,payload
和headers
仍作为绑定变量包含在内。
在 Spring Integration 3.0 中,除了variable
元素中,variables
属性。
此属性和variable
元素不是互斥的,您可以将它们组合在一个script
元件。
但是,无论变量在何处定义,变量都必须是唯一的。
此外,从 Spring Integration 3.0 开始,内联脚本也允许变量绑定,如下例所示:
<service-activator input-channel="input">
<script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
<script:variable name="thing2" ref="thing2Bean"/>
<script:variable name="thing3" value="thing2"/>
<![CDATA[
payload.foo = thing1
payload.date = date
payload.bar = thing2
payload.baz = thing3
payload
]]>
</script:script>
</service-activator>
前面的示例显示了内联脚本、variable
元素和variables
属性。
这variables
attribute 包含一个逗号分隔的值,其中每个段都包含变量及其值的 '=' 分隔对。
变量名称可以后缀为-ref
,如date-ref
变量。
这意味着绑定变量的名称为date
,但该值是对dateBean
bean 中。
这在使用属性占位符配置或命令行参数时可能很有用。
如果您需要对变量的生成方式进行更多控制,则可以实现自己的 Java 类,该类使用ScriptVariableGenerator
策略,它由以下接口定义:
public interface ScriptVariableGenerator {
Map<String, Object> generateScriptVariables(Message<?> message);
}
此接口要求您实现generateScriptVariables(Message)
方法。
message 参数允许您访问消息有效负载和标头中的任何可用数据,返回值是Map
的绑定变量。
每次为消息执行脚本时,都会调用此方法。
以下示例显示如何提供ScriptVariableGenerator
并使用script-variable-generator
属性:
Scripts.processor("foo/bar/MyScript.groovy")
.variableGenerator(new foo.bar.MyScriptVariableGenerator())
}
<int-script:script location="foo/bar/MyScript.groovy"
script-variable-generator="variableGenerator"/>
<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>
如果script-variable-generator
,则脚本组件使用DefaultScriptVariableGenerator
,它会合并任何提供的<variable>
元素替换为payload
和headers
变量Message
在其generateScriptVariables(Message)
方法。
您不能同时提供script-variable-generator attribute 和<variable> 元素。
它们是互斥的。 |
GraalVM Polyglot
从版本 6.0 开始,框架提供了一个PolyglotScriptExecutor
它基于 GraalVM Polyglot API。
JavaScript 的 JSR223 引擎实现(单独从 Java 中删除)已被使用这个新的脚本执行程序所取代。
了解有关在 GraalVM 中启用 JavaScript 支持以及可以通过脚本变量传播哪些配置选项的更多信息。
默认情况下,框架将allowAllAccess
自true
在共享的 Polyglot 上Context
,它支持与主机 JVM 进行这种交互:
-
新线程的创建和使用。
-
对公共主机类的访问。
-
通过向类路径添加条目来加载新的主机类。
-
将新成员导出到多语言绑定中。
-
主机系统上不受限制的 IO作。
-
传递实验性选项。
-
创建和使用 New sub-process。
-
对进程环境变量的访问。
这可以通过 overloaded 进行自定义PolyglotScriptExecutor
构造函数,它接受org.graalvm.polyglot.Context.Builder
.
要启用此 JavaScript 支持,GraalVM 的js
组件安装,或者,当使用常规 JVM 时,org.graalvm.sdk:graal-sdk
和org.graalvm.js:js
必须包含依赖项。
Groovy 支持
在 Spring Integration 2.0 中,我们添加了 Groovy 支持,允许您使用 Groovy 脚本语言为各种集成组件提供逻辑,类似于 Spring 表达式语言 (SPEL) 支持路由、转换和其他集成问题的方式。 有关 Groovy 的更多信息,请参阅 Groovy 文档,您可以在项目网站上找到该文档。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-groovy:6.0.9"
此外,从 V6.0 开始,提供了用于集成流配置的 Groovy DSL。
Groovy 配置
在 Spring Integration 2.1 中,Groovy 支持的配置名称空间是 Spring Integration 的脚本支持的扩展,并共享脚本支持部分中详细描述的核心配置和行为。
尽管通用脚本支持很好地支持 Groovy 脚本,但 Groovy 支持提供了Groovy
configuration 命名空间,它由 Spring Framework 的org.springframework.scripting.groovy.GroovyScriptFactory
和相关组件,为使用 Groovy 提供扩展功能。
下面的清单显示了两个示例配置:
<int:filter input-channel="referencedScriptInput">
<int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-groovy:script><![CDATA[
return payload == 'good'
]]></int-groovy:script>
</int:filter>
如前面的示例所示,该配置看起来与常规脚本支持配置相同。
唯一的区别是使用 Groovy 命名空间,如int-groovy
namespace 前缀。
另请注意,lang
属性<script>
标记在此命名空间中无效。
Groovy 对象自定义
如果需要自定义 Groovy 对象本身(除了设置变量之外),则可以引用实现GroovyObjectCustomizer
通过使用customizer
属性。
例如,如果您想通过修改MetaClass
以及注册要在脚本中可用的函数。
以下示例显示了如何执行此作:
<int:service-activator input-channel="groovyChannel">
<int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>
<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>
设置自定义GroovyObjectCustomizer
与 不互斥<variable>
元素或script-variable-generator
属性。
也可以在定义内联脚本时提供。
Spring Integration 3.0 引入了variables
属性,该属性与variable
元素。
此外,groovy 脚本能够将变量解析为 Bean 中的BeanFactory
,如果 name 中没有提供绑定变量。
以下示例演示如何使用变量 (entityManager
):
<int-groovy:script>
<![CDATA[
entityManager.persist(payload)
payload
]]>
</int-groovy:script>
entityManager
必须是应用程序上下文中的适当 Bean。
有关<variable>
元素中,variables
属性和script-variable-generator
属性,请参阅脚本变量绑定。
Groovy Script 编译器定制
这@CompileStatic
hint 是最流行的 Groovy 编译器自定义选项。
它可以在类或方法级别使用。
有关更多信息,请参阅 Groovy 参考手册,特别是 @CompileStatic。
为了将此功能用于短脚本(在集成场景中),我们被迫将简单脚本更改为更类似于 Java 的代码。
请考虑以下<filter>
脚本:
headers.type == 'good'
前面的脚本在 Spring Integration 中成为以下方法:
@groovy.transform.CompileStatic
String filter(Map headers) {
headers.type == 'good'
}
filter(headers)
这样,filter()
方法被转换并编译为静态 Java 代码,绕过 Groovy
调用的动态阶段,例如getProperty()
factories 和CallSite
代理。
从版本 4.3 开始,您可以使用compile-static
boolean
选项,指定ASTTransformationCustomizer
为@CompileStatic
应该添加到内部CompilerConfiguration
.
有了这个,你可以省略方法声明@CompileStatic
,并且仍然获得编译后的纯 Java 代码。
在这种情况下,前面的脚本可以很短,但仍需要比解释的脚本更详细一些,如下例所示:
binding.variables.headers.type == 'good'
您必须访问headers
和payload
(或任何其他)变量通过groovy.lang.Script
binding
property 的 b,因为@CompileStatic
,我们没有动态GroovyObject.getProperty()
能力。
此外,我们还引入了compiler-configuration
bean 引用。
使用此属性,您可以提供任何其他必需的 Groovy 编译器自定义,例如ImportCustomizer
.
有关此功能的更多信息,请参阅高级编译器配置的 Groovy 文档。
用compilerConfiguration 不会自动添加ASTTransformationCustomizer 对于@CompileStatic 注解,它会覆盖compileStatic 选择。
如果您仍然需要CompileStatic 中,您应该手动添加new ASTTransformationCustomizer(CompileStatic.class) 到CompilationCustomizers 那个习俗compilerConfiguration . |
Groovy 编译器自定义对refresh-check-delay 选项和可重新加载的脚本也可以静态编译。 |
控制总线
如 (企业集成模式) 中所述,控制总线背后的思想是,您可以使用与 “应用程序级” 消息传递相同的消息传递系统来监视和管理框架内的组件。 在 Spring 集成中,我们构建在前面描述的适配器之上,以便您可以发送消息作为调用公开作的一种方式。 这些作的一个选项是 Groovy 脚本。 下面的示例为 control bus 配置一个 Groovy 脚本:
<int-groovy:control-bus input-channel="operationChannel"/>
控制总线有一个 Importing 通道,可以访问该通道以调用应用程序上下文中 bean 上的作。
Groovy 控制总线将 Importing 通道上的消息作为 Groovy 脚本运行。
它接受一条消息,将正文编译为脚本,并使用GroovyObjectCustomizer
,然后运行它。
控制总线的MessageProcessor
公开应用程序上下文中所有带有 Comments 的 bean@ManagedResource
并实现 Spring 的Lifecycle
接口或扩展 Spring 的CustomizableThreadCreator
基类(例如,多个TaskExecutor
和TaskScheduler
implementations) 的 Implementations)。
在 Control Bus' 命令脚本中使用具有自定义作用域(例如 'request')的托管 bean 时要小心,尤其是在异步消息流中。
如果MessageProcessor 的控制总线无法从应用程序上下文中公开一个 bean,你最终可能会得到一些BeansException 在命令脚本运行期间。
例如,如果未建立自定义范围的上下文,则尝试获取该范围内的 bean 会触发BeanCreationException . |
如果需要进一步自定义 Groovy 对象,还可以提供对实现GroovyObjectCustomizer
通过customizer
属性,如下例所示:
<int-groovy:control-bus input-channel="input"
output-channel="output"
customizer="groovyCustomizer"/>
<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>
向终端节点添加行为
在 Spring Integration 2.2 之前,你可以通过将 AOP Advice 添加到 Poller 的<advice-chain/>
元素。
但是,假设您只想重试 REST Web 服务调用,而不重试任何下游终端节点。
例如,请考虑以程:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
如果你在 Poller 上将一些 retry-logic 配置到通知链中,并且调用http-gateway2
由于网络故障而失败,则重试会导致http-gateway1
和http-gateway2
以第二次调用。
同样,在 jdbc-outbound-adapter 中出现暂时性故障后,两个 HTTP 网关都会被第二次调用,然后再次调用jdbc-outbound-adapter
.
Spring Integration 2.2 增加了向单个端点添加行为的能力。
这是通过添加<request-handler-advice-chain/>
元素添加到多个端点。
以下示例显示了如何<request-handler-advice-chain/>
元素中outbound-gateway
:
<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
在这种情况下,myRetryAdvice
仅在本地应用于此网关,不适用于在将回复发送到nextChannel
.
建议的范围仅限于终端节点本身。
此时,您无法建议整个 但是, |
提供的建议课程
除了提供应用 AOP 建议类的通用机制外, Spring 集成还提供了这些开箱即用的建议实现:
-
RequestHandlerRetryAdvice
(如重试建议中所述) -
RequestHandlerCircuitBreakerAdvice
(在 Circuit Breaker Advice 中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在 Expression Evaluation Advice 中描述) -
RateLimiterRequestHandlerAdvice
(如 Rate Limiter Advice 中所述) -
CacheRequestHandlerAdvice
(在 缓存建议 中描述) -
ReactiveRequestHandlerAdvice
(在 Reactive Advice 中描述)
重试建议
重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice
)利用了 Spring Retry 项目提供的丰富重试机制。
核心组件spring-retry
是RetryTemplate
,它允许配置复杂的重试场景,包括RetryPolicy
和BackoffPolicy
策略(具有许多实现方式)以及RecoveryCallback
策略来确定重试用尽时要采取的作。
- 无状态重试
-
无状态重试是指重试活动完全在通知中处理的情况。 线程将暂停(如果配置为这样做)并重试该作。
- 状态重试
-
有状态重试是指在通知中管理重试状态,但引发异常并且调用方重新提交请求的情况。 有状态重试的一个示例是,当我们希望消息发起方(例如 JMS)负责重新提交,而不是在当前线程上执行时。 有状态重试需要某种机制来检测重试的提交。
有关更多信息spring-retry
,请参阅项目的 Javadoc 和 Spring Batch 的参考文档,其中spring-retry
起源。
默认的 back off 行为是 not reoff 。 将立即尝试重试。 使用导致线程在两次尝试之间暂停的回退策略可能会导致性能问题,包括内存使用过多和线程不足。 在大容量环境中,应谨慎使用回退策略。 |
配置 Retry Advice
本节中的示例使用了以下内容<service-activator>
这总是会引发一个异常:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单的无状态重试
-
默认的
RetryTemplate
具有SimpleRetryPolicy
它尝试了 3 次。 没有BackOffPolicy
,因此这三次尝试是背靠背进行的,两次尝试之间没有延迟。 没有RecoveryCallback
,因此结果是在最后一次重试失败后向调用方抛出异常。 在 Spring 集成环境中,这个最终异常可以通过使用error-channel
在入站终端节点上。 以下示例使用RetryTemplate
并显示其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 带恢复的简单无状态重试
-
以下示例添加了一个
RecoveryCallback
添加到前面的示例中,并使用ErrorMessageSendingRecoverer
要发送ErrorMessage
添加到频道中:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 使用自定义策略的无状态重试和恢复
-
对于更复杂的问题,我们可以提供定制的建议
RetryTemplate
. 此示例继续使用SimpleRetryPolicy
但将尝试次数增加到 4 次。 它还添加了一个ExponentialBackoffPolicy
其中,第一次重试等待 1 秒,第二次重试等待 5 秒,第三次重试等待 25 秒(总共尝试 4 次)。 下面的清单显示了该示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 命名空间对无状态重试的支持
-
从版本 4.0 开始,由于命名空间支持重试建议,可以大大简化前面的配置,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,通知被定义为顶级 bean,以便它可以在多个
request-handler-advice-chain
实例。 您还可以直接在链中定义建议,如下例所示:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
一个
<handler-retry-advice>
可以具有<fixed-back-off>
或<exponential-back-off>
child 元素或没有 child 元素。 一个<handler-retry-advice>
没有子元素时,不使用退避。 如果没有recovery-channel
,则在重试次数用尽时引发异常。 命名空间只能与无状态重试一起使用。对于更复杂的环境(自定义策略等),请使用 normal
<bean>
定义。 - 带恢复的简单状态重试
-
要使重试有状态,我们需要为 retry 提供
RetryStateGenerator
实现。 此类用于将邮件标识为重新提交,以便RetryTemplate
可以确定此消息的当前重试状态。 该框架提供了一个SpelExpressionRetryStateGenerator
,它通过使用 SPEL 表达式确定消息标识符。 此示例再次使用默认策略 (3 次尝试,无回退)。 与无状态重试一样,这些策略可以自定义。 下面的清单显示了该示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果将前面的示例与无状态示例进行比较,可以看到,使用有状态重试时,每次失败时都会向调用方引发异常。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置对所有异常重试,异常分类器查看顶级异常。 如果您将其配置为仅在
MyException
,您的应用程序会抛出一个SomeOtherException
其中,原因是MyException
,则不会发生重试。从 Spring Retry 1.0.3 开始,
BinaryExceptionClassifier
具有一个名为traverseCauses
(默认值为false
). 什么时候true
,它会遍历异常原因,直到找到匹配项或用完遍历的原因。要使用此分类器进行重试,请使用
SimpleRetryPolicy
使用采用最大尝试次数的构造函数创建,Map
之Exception
对象和traverseCauses
布尔。 然后,您可以将此策略注入到RetryTemplate
.
traverseCauses 是必需的,因为用户异常可能包装在MessagingException . |
熔断器建议
断路器模式的一般思路是,如果某个服务当前不可用,请不要浪费时间(和资源)来尝试使用它。
这o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
实现此模式。
当断路器处于 closed 状态时,终端节点会尝试调用该服务。
如果连续尝试一定次数失败,则断路器将进入 open 状态。
当它处于打开状态时,新请求“快速失败”,并且在一段时间到期之前不会尝试调用该服务。
当该时间到期时,断路器将设置为半开状态。 在此状态下,即使一次尝试失败,断路器也会立即进入打开状态。 如果尝试成功,断路器将进入 closed 状态,在这种情况下,它不会再次进入 open 状态,直到再次发生配置的连续失败次数。 任何成功的尝试都会将状态重置为零失败,以确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,在这些服务中,可能需要一些时间才能失败(例如尝试建立网络连接时超时)。
这RequestHandlerCircuitBreakerAdvice
有两个属性:threshold
和halfOpenAfter
.
这threshold
属性表示 breaker 打开之前需要发生的连续失败次数。
它默认为5
.
这halfOpenAfter
属性表示 Breaker 在最后一次失败后等待其他请求的时间。
默认值为 1000 毫秒。
以下示例配置断路器并显示其DEBUG
和ERROR
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为2
和halfOpenAfter
设置为12
秒。
每 5 秒收到一个新请求。
前两次尝试调用了该服务。
第三个和第四个失败,并出现异常,指示断路器已打开。
尝试了第五个请求,因为该请求是在上次失败后的 15 秒。
第六次尝试立即失败,因为断路器立即打开。
表达式计算建议
最后提供的通知类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
.
这个建议比其他两个建议更普遍。
它提供了一种机制,用于计算发送到终端节点的原始入站消息的表达式。
在成功或失败后,可以评估单独的表达式。
(可选)可以将包含评估结果的消息与输入消息一起发送到消息通道。
此建议的一个典型用例可能是使用<ftp:outbound-channel-adapter/>
,如果传输成功,则可能将文件移动到一个目录,如果传输失败,则移动到另一个目录:
该通知具有用于设置成功时表达式、失败时设置表达式以及每个选项的相应通道的属性。
对于成功案例,发送到successChannel
是一个AdviceMessage
,其中 payload 是表达式 evaluation 的结果。
一个名为inputMessage
,包含发送到处理程序的原始消息。
发送到failureChannel
(当处理程序引发异常时)是一个ErrorMessage
负载为MessageHandlingExpressionEvaluatingAdviceException
.
赞所有人MessagingException
实例,则此有效负载具有failedMessage
和cause
属性,以及一个名为evaluationResult
,其中包含表达式 evaluation 的结果。
从版本 5.1.3 开始,如果配置了 channels,但未提供表达式,则默认表达式用于计算为payload 的消息。 |
当在通知范围内引发异常时,默认情况下,该异常会在任何failureExpression
被评估。
如果您希望禁止抛出异常,请将trapException
property 设置为true
.
以下建议显示了如何配置advice
使用 Java DSL:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
Rate Limiter 建议
Rate Limiter 建议 (RateLimiterRequestHandlerAdvice
) 允许确保终端节点不会因请求而过载。
当超出速率限制时,请求将进入 blocked 状态。
此建议的一个典型用例可能是外部服务提供商不允许超过n
每分钟的请求数。
这RateLimiterRequestHandlerAdvice
实施完全基于 Resilience4j 项目,并且需要RateLimiter
或RateLimiterConfig
注射。
也可以使用默认值和/或自定义名称进行配置。
以下示例将速率限制器建议配置为每 1 秒 1 个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5.2 开始,CacheRequestHandlerAdvice
已引入。
它基于 Spring Framework 中的缓存抽象,并与@Caching
注释族。
内部的 logic 基于CacheAspectSupport
扩展,其中缓存作的代理是围绕AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
方法与请求Message<?>
作为参数。
可以使用 SPEL 表达式或Function
以评估缓存键。
请求Message<?>
可用作 SPEL 评估上下文的根对象,或作为Function
input 参数。
默认情况下,payload
的请求消息用于缓存键。
这CacheRequestHandlerAdvice
必须配置cacheNames
,当默认缓存作为CacheableOperation
,或者使用任意CacheOperation
s.
每CacheOperation
可以单独配置或具有共享选项,例如CacheManager
,CacheResolver
和CacheErrorHandler
,可以从CacheRequestHandlerAdvice
配置。
此配置功能类似于 Spring Framework 的@CacheConfig
和@Caching
注释组合。
如果CacheManager
,则默认情况下会从BeanFactory
在CacheAspectSupport
.
以下示例使用一组不同的缓存作配置两个建议:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}
Reactive Advice
从版本 5.3 开始,ReactiveRequestHandlerAdvice
可用于生成Mono
答复。
一个BiFunction<Message<?>, Mono<?>, Publisher<?>>
必须为此建议提供,并且它是从Mono.transform()
运算符对拦截的handleRequestMessage()
方法实现。
通常,这样的Mono
当我们想通过以下方式控制网络波动时,自定义是必要的timeout()
,retry()
以及类似的支持运营商。
例如,当我们可以通过 WebFlux 客户端发出 HTTP 请求时,我们可以使用以下配置来等待响应的时间不超过 5 秒:
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
这message
argument 是消息处理程序的请求消息,可用于确定请求范围的属性。
这mono
参数是此消息处理程序的handleRequestMessage()
方法实现。
嵌套的Mono.transform()
也可以从此函数中调用以应用,例如,Reactive Circuit Breaker。
自定义建议类
除了前面描述的提供的 advice 类之外,您还可以实现自己的 advice 类。
虽然您可以提供org.aopalliance.aop.Advice
(通常org.aopalliance.intercept.MethodInterceptor
),我们通常建议您将o.s.i.handler.advice.AbstractRequestHandlerAdvice
.
这样做的好处是避免编写低级面向方面的编程代码,并提供专门为此环境定制的起点。
子类需要实现doInvoke()
方法,其定义如下:
/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
callback 参数可以方便地避免直接处理 AOP 的子类。
调用callback.execute()
method 调用消息处理程序。
这target
参数是为那些需要维护特定处理程序的状态的子类提供的,可能是通过在Map
keyed by 目标。
此功能允许将相同的建议应用于多个处理程序。
这RequestHandlerCircuitBreakerAdvice
使用 advice this 来保持每个处理程序的 Circuit breaker 状态。
这message
parameter 是发送到处理程序的消息。
虽然 advice 无法在调用处理程序之前修改消息,但它可以修改有效负载(如果它具有可变属性)。
通常,通知会使用该消息进行日志记录,或者在调用处理程序之前或之后的某个位置发送消息的副本。
返回值通常是由callback.execute()
.
但是,该建议确实能够修改返回值。
请注意,只有AbstractReplyProducingMessageHandler
instances 返回值。
以下示例显示了一个自定义通知类,该类扩展了AbstractRequestHandlerAdvice
:
public class MyAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}
除了 有关更多信息,请参阅 ReflectiveMethodInvocation Javadoc。 |
处理消息通知
如本节简介中所述,请求处理程序 advice 链中的 advice 对象仅应用于当前端点,而不是下游流(如果有)。
为MessageHandler
生成 Reply 的对象(例如,扩展AbstractReplyProducingMessageHandler
),该建议将应用于内部方法:handleRequestMessage()
(调用 自MessageHandler.handleMessage()
).
对于其他消息处理程序,该建议将应用于MessageHandler.handleMessage()
.
在某些情况下,即使消息处理程序是AbstractReplyProducingMessageHandler
,则建议必须应用于handleMessage
方法。
例如,幂等接收器可能会返回null
,如果处理程序的replyRequired
属性设置为true
.
另一个示例是BoundRabbitChannelAdvice
— 请参阅 严格消息排序。
从版本 4.3.1 开始,新的HandleMessageAdvice
interface 及其基本实现 (AbstractHandleMessageAdvice
) 已引入。Advice
实现HandleMessageAdvice
始终应用于handleMessage()
方法,而不管处理程序类型如何。
重要的是要明白HandleMessageAdvice
当应用于返回响应的处理程序时,实现(例如幂等接收器)与adviceChain
并正确地应用于MessageHandler.handleMessage()
方法。
由于这种取消关联,因此不遵守建议链顺序。 |
请考虑以下配置:
<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>
在前面的示例中,<tx:advice>
应用于AbstractReplyProducingMessageHandler.handleRequestMessage()
.
然而myHandleMessageAdvice
应用于MessageHandler.handleMessage()
.
因此,它在<tx:advice>
.
要保留顺序,您应该遵循标准的 Spring AOP 配置方法并使用端点id
与.handler
suffix 获取目标MessageHandler
豆。
请注意,在这种情况下,整个下游流都在事务范围内。
对于MessageHandler
这不会返回响应,则保留 Advice Chain 顺序。
从版本 5.3 开始,HandleMessageAdviceAdapter
用于应用任何MethodInterceptor
对于MessageHandler.handleMessage()
方法,因此,整个子流。
例如,RetryOperationsInterceptor
可以应用于从某个端点开始的整个子流;默认情况下,这是不可能的,因为 Consumer 端点仅将建议应用于AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
.
事务支持
从版本 5.0 开始,新的TransactionHandleMessageAdvice
的引入使整个下游流具有事务性,这要归功于HandleMessageAdvice
实现。
当常规TransactionInterceptor
用于<request-handler-advice-chain>
元素(例如,通过配置<tx:advice>
),则启动的事务仅适用于内部AbstractReplyProducingMessageHandler.handleRequestMessage()
,并且不会传播到下游流。
为了简化 XML 配置,除了<request-handler-advice-chain>
一个<transactional>
元素已添加到所有<outbound-gateway>
和<service-activator>
和相关组件。
以下示例显示了<transactional>
使用中:
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
Java 配置可以通过使用TransactionInterceptorBuilder
,结果的 Bean 名称可以在消息传递注释中使用 adviceChain
属性,如下例所示:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意true
参数TransactionInterceptorBuilder
构造 函数。
它会导致创建一个TransactionHandleMessageAdvice
,不是常规的TransactionInterceptor
.
Java DSL 支持Advice
通过.transactional()
选项,如下例所示:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
Advising Filters
建议时还有一个额外的考虑因素Filter
建议。
默认情况下,任何放弃作(当筛选条件返回false
) 在通知链的范围内执行。
这可能包括 discard 通道下游的所有流。
因此,例如,如果 discard 通道下游的 element 抛出异常并且存在重试建议,则会重试该过程。
此外,如果throwExceptionOnRejection
设置为true
(异常在 NOTIFY 的范围内引发)。
设置discard-within-advice
自false
修改此行为,discard(或异常)发生在调用通知链之后。
使用注释为终端节点提供建议
使用注释 (@Filter
,@ServiceActivator
,@Splitter
和@Transformer
),您可以在adviceChain
属性。
此外,@Filter
annotation 还具有discardWithinAdvice
属性,该属性可用于配置丢弃行为,如 Advising Filters中所述。
以下示例导致在通知之后执行 discard:
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
Advice Chain 中的 Ordering Advice
Advice 类是 “around” advice,以嵌套方式应用。 第一个建议是最外层的,而最后一个建议是最内层的(即最接近被建议的处理程序)。 将 advice 类按正确的顺序排列以实现所需的功能非常重要。
例如,假设您要添加重试建议和事务建议。
您可能希望先放置重试建议,然后再放置事务建议。
因此,每次重试都在新事务中执行。
另一方面,如果您希望所有尝试和任何恢复作(在重试RecoveryCallback
) 来限定 transaction 的作用域,则可以将 transaction advice 放在首位。
建议的处理程序属性
有时,从 advice 中访问 handler 属性很有用。
例如,大多数处理程序都实现了NamedComponent
以访问组件名称。
目标对象可以通过target
参数(当子类化AbstractRequestHandlerAdvice
) 或invocation.getThis()
(实施org.aopalliance.intercept.MethodInterceptor
).
当整个处理程序都被通知时(例如,当处理程序不生成回复或建议实现HandleMessageAdvice
),则可以将目标对象强制转换为接口,例如NamedComponent
,如以下示例所示:
String componentName = ((NamedComponent) target).getComponentName();
当您实施MethodInterceptor
直接转换目标对象,如下所示:
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
当只有handleRequestMessage()
方法(在生成回复的处理程序中),您需要访问完整的处理程序,它是一个AbstractReplyProducingMessageHandler
.
以下示例显示了如何执行此作:
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
幂等接收器企业集成模式
从版本 4.1 开始, Spring 集成提供了幂等接收器企业集成模式的实现。
它是一个函数模式,整个幂等逻辑应该在应用程序中实现。
但是,为了简化决策,IdempotentReceiverInterceptor
组件。
这是一个 AOPAdvice
应用于MessageHandler.handleMessage()
方法,并且可以filter
请求消息或将其标记为duplicate
,根据其配置。
以前,您可以通过使用自定义MessageSelector
在<filter/>
(请参阅 Filter)。
但是,由于此模式实际上定义了终端节点的行为,而不是终端节点本身,因此幂等接收器实现不提供终端节点组件。
相反,它应用于应用程序中声明的端点。
的逻辑IdempotentReceiverInterceptor
基于提供的MessageSelector
并且,如果该选择器不接受邮件,则会使用duplicateMessage
header 设置为true
.
目标MessageHandler
(或下游流)可以查阅此标头以实现正确的幂等逻辑。
如果IdempotentReceiverInterceptor
配置了discardChannel
或throwExceptionOnRejection = true
,则不会将重复的消息发送到目标MessageHandler.handleMessage()
.
相反,它被丢弃了。
如果要丢弃(不处理)重复的消息,则discardChannel
应配置NullChannel
,例如默认的nullChannel
豆。
为了维护消息之间的状态并提供比较消息以实现幂等性的功能,我们提供了MetadataStoreSelector
.
它接受一个MessageProcessor
实现(它根据Message
) 和可选的ConcurrentMetadataStore
(元数据存储)。
请参阅MetadataStoreSelector
Javadoc了解更多信息。
您还可以自定义value
为ConcurrentMetadataStore
通过使用额外的MessageProcessor
.
默认情况下,MetadataStoreSelector
使用timestamp
消息标头。
通常,如果键没有现有值,选择器会选择一条消息进行接受。
在某些情况下,比较键的当前值和新值以确定是否应接受该消息非常有用。
从版本 5.3 开始,compareValues
属性,该属性引用BiPredicate<String, String>
;第一个参数是旧值;返回true
接受消息并将旧值替换为MetadataStore
.
这对于减少键的数量很有用;例如,在处理文件中的行时,可以将文件名存储在 Key 中,将当前行号存储在 Value 中。
然后,在重新启动后,您可以跳过已处理的行。
有关示例,请参阅Idempotent Downstream Processing a Split File。
为方便起见,MetadataStoreSelector
选项可以直接在<idempotent-receiver>
元件。
下面的清单显示了所有可能的属性:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | 的 IDIdempotentReceiverInterceptor 豆。
自选。 |
2 | 应用此侦听器的使用者终结点名称或模式。
用逗号 (, ),例如endpoint="aaa, bbb*, ccc, *ddd, eee*fff" .
然后,使用与这些模式匹配的端点 Bean 名称来检索目标端点的MessageHandler bean(使用其.handler suffix) 和IdempotentReceiverInterceptor 应用于这些 bean。
必填。 |
3 | 一个MessageSelector bean 引用。
互斥与metadata-store 和key-strategy (key-expression) .
什么时候selector 未提供,则为key-strategy 或key-strategy-expression 是必需的。 |
4 | 标识在使用IdempotentReceiverInterceptor 不接受它。
省略时,重复消息将转发给带有duplicateMessage 页眉。
自选。 |
5 | 一个ConcurrentMetadataStore 参考。
由底层MetadataStoreSelector .
互斥与selector .
自选。
默认的MetadataStoreSelector 使用内部SimpleMetadataStore 这不会在应用程序执行之间保持状态。 |
6 | 一个MessageProcessor 参考。
由底层MetadataStoreSelector .
计算idempotentKey 从请求消息中。
互斥与selector 和key-expression .
当selector 未提供,则为key-strategy 或key-strategy-expression 是必需的。 |
7 | 用于填充ExpressionEvaluatingMessageProcessor .
由底层MetadataStoreSelector .
计算idempotentKey 通过使用请求消息作为评估上下文根对象。
互斥与selector 和key-strategy .
当selector 未提供,则为key-strategy 或key-strategy-expression 是必需的。 |
8 | 一个MessageProcessor 参考。
由底层MetadataStoreSelector .
计算value 对于idempotentKey 从请求消息中。
互斥与selector 和value-expression .
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。 |
9 | 用于填充ExpressionEvaluatingMessageProcessor .
由底层MetadataStoreSelector .
计算value 对于idempotentKey 通过使用请求消息作为评估上下文根对象。
互斥与selector 和value-strategy .
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。 |
10 | 对BiPredicate<String, String> bean,它允许您通过比较键的旧值和新值来选择性地选择消息;null 默认情况下。 |
11 | 如果IdempotentReceiverInterceptor 拒绝该消息。
默认为false .
无论discard-channel 。 |
对于 Java 配置, Spring 集成提供了方法级的@IdempotentReceiver
注解。
它用于标记method
具有消息传递注释 (@ServiceActivator
,@Router, and others) to specify which `IdempotentReceiverInterceptor
对象将应用于此终端节点。
以下示例演示如何使用@IdempotentReceiver
注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当你使用 Java DSL 时,你可以将拦截器添加到端点的建议链中,如下例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
这IdempotentReceiverInterceptor 专为MessageHandler.handleMessage(Message<?>) 方法。
从版本 4.3.1 开始,它实现了HandleMessageAdvice ,使用AbstractHandleMessageAdvice 作为基类,以便更好地分离。
有关更多信息,请参阅 Handling Message Advice。 |
记录通道适配器
这<logging-channel-adapter>
通常与分线器一起使用,如 Wire Tap 中所述。
但是,它也可以用作任何流的最终使用者。
例如,考虑一个以<service-activator>
返回一个 result,但您希望丢弃该结果。
为此,您可以将结果发送到NullChannel
.
或者,您可以将其路由到INFO
水平<logging-channel-adapter>
.
这样,您可以在登录INFO
级别,但在 (例如)WARN
水平。
使用NullChannel
,则在登录到DEBUG
水平。
以下清单显示了logging-channel-adapter
元素:
<int:logging-channel-adapter
channel="" (1)
level="INFO" (2)
expression="" (3)
log-full-message="false" (4)
logger-name="" /> (5)
1 | 将日志记录适配器连接到上游组件的通道。 |
2 | 将记录发送到此适配器的消息的日志记录级别。
违约:INFO . |
3 | 一个 SPEL 表达式,准确表示记录了消息的哪些部分。
违约:payload — 仅记录有效负载。
如果log-full-message 时,不能指定此属性。 |
4 | 什么时候true ,则会记录整个消息(包括标头)。
违约:false — 仅记录有效负载。
如果满足以下条件,则无法指定此属性expression 。 |
5 | 指定name 记录器(称为category 在log4j ).
用于标识此适配器创建的日志消息。
这将允许为各个适配器设置日志名称(在 logging 子系统中)。
默认情况下,所有适配器都使用以下名称记录:org.springframework.integration.handler.LoggingHandler . |
使用 Java 配置
以下 Spring Boot 应用程序显示了配置LoggingHandler
通过使用 Java 配置:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
@ServiceActivator(inputChannel = "logChannel")
public LoggingHandler logging() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(defaultRequestChannel = "logChannel")
public interface MyGateway {
void sendToLogger(String data);
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了使用 Java DSL 配置日志记录通道适配器的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlow.from(MyGateway.class)
.log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
m -> m.getHeaders().getId() + ": " + m.getPayload());
}
@MessagingGateway
public interface MyGateway {
void sendToLogger(String data);
}
}
java.util.function
接口支持
从版本 5.1 开始, Spring 集成为java.util.function
包。
所有消息收发端点(Service Activator、Transformer、Filter 等)现在都可以引用Function
(或Consumer
) 豆子。
消息传递注释可以直接应用于这些 bean,类似于常规MessageHandler
定义。
例如,如果你有这个Function
bean 定义:
@Configuration
public class FunctionConfiguration {
@Bean
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
}
您可以将其用作 XML 配置文件中的简单引用:
<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>
当我们使用 Messaging Annotations 配置流时,代码很简单:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
当函数返回数组时,Collection
(基本上,任何Iterable
),Stream
或 ReactorFlux
,@Splitter
可以在这样的 bean 上用于对结果内容执行迭代。
这java.util.function.Consumer
接口可用于<int:outbound-channel-adapter>
或者,与@ServiceActivator
注解,以执行流程的最后一步:
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
// Has to be an anonymous class for proper type inference
return new Consumer<Message<?>>() {
@Override
public void accept(Message<?> e) {
collector().add(e);
}
};
}
另外,请注意上面代码片段中的注释:如果您想在Function
/Consumer
您不能使用 Lambda 定义。
由于 Java 类型擦除,我们无法确定apply()/accept()
方法调用。
这java.util.function.Supplier
interface 可以简单地与@InboundChannelAdapter
注释,或作为ref
在<int:inbound-channel-adapter>
:
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
使用 Java DSL,我们只需要在端点定义中使用对函数 bean 的引用。
同时,的Supplier
接口可以按常规使用MessageSource
定义:
@Bean
public Function<String, String> toUpperCaseFunction() {
return String::toUpperCase;
}
@Bean
public Supplier<String> stringSupplier() {
return () -> "foo";
}
@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlow.from(stringSupplier())
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
}
此函数支持在与 Spring Cloud Function 框架一起使用时非常有用,其中我们有一个函数目录,并且可以从集成流定义中引用其成员函数。
Kotlin 支持
该框架也得到了改进,以支持函数的 Kotlin lambda,因此现在您可以结合使用 Kotlin 语言和 Spring 集成流定义:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin 协程
从版本 6.0 开始, Spring 集成提供了对 Kotlin 协程的支持。
现在suspend
functions 和kotlinx.coroutines.Deferred
& kotlinx.coroutines.flow.Flow
返回类型可用于服务方法:
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
该框架将它们视为 Reactive Streams 交互,并使用ReactiveAdapterRegistry
转换为相应的Mono
和Flux
反应器类型。
这样的函数 reply 将在 reply channel 中处理,如果它是一个ReactiveStreamsSubscribableChannel
或作为CompletableFuture
在相应的回调中。
具有Flow result 不是async 默认情况下,在@ServiceActivator 所以Flow 实例作为回复消息有效负载生成。
目标应用程序负责将此对象作为协程处理或将其转换为Flux 分别。 |
这@MessagingGateway
interface 方法也可以用suspend
修饰符。
该框架利用Mono
internally 使用下游流执行请求-回复。
这样的Mono
result 由MonoKt.awaitSingleOrNull()
API 来实现kotlin.coroutines.Continuation
argument fo the calledsuspend
网关的功能:
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根据 Kotlin 语言要求,必须将此方法作为协程调用:
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}