本章的第一部分介绍了一些背景理论,并揭示了驱动Spring Integration各种消息传递组件的底层API的很多信息。 如果您想真正了解幕后发生的事情,这些信息会很有帮助。 但是,如果要启动并运行各种元素的简化的基于命名空间的配置,请随时跳到终结点命名空间支持。
如概述中所述,消息端点负责将各种消息传递组件连接到通道。 在接下来的几章中,我们将介绍许多使用消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的消息通道所示,您可以向消息通道发送消息。 但是,接收有点复杂。 主要原因是有两种类型的消费者:轮询消费者和事件驱动的消费者。
在这两者中,事件驱动的消费者要简单得多。
无需管理和调度单独的轮询器线程,它们本质上是具有回调方法的侦听器。
当连接到Spring Integration的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring Integration 提供了两种不同的端点实现来适应这两种类型的使用者。
因此,消费者本身只需要实现回调接口即可。
当需要轮询时,终端节点将充当使用者实例的容器。
其好处类似于使用容器来托管消息驱动的 bean,但是,由于这些使用者是在 中运行的 Spring 管理对象,因此它更接近于 Spring 自己的容器。ApplicationContext
MessageListener
消息处理程序
Spring Integration的接口由框架中的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会直接实现。
然而,消息使用者使用它来实际处理使用的消息,因此了解此策略界面确实有助于理解使用者的整体角色。
接口定义如下:MessageHandler
MessageHandler
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管该接口简单,但它为以下章节中介绍的大多数组件(路由器、转换器、分离器、聚合器、服务激活器等)提供了基础。 这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring Integration 提供了两个端点实现,用于托管这些基于回调的处理程序,并让它们连接到消息通道。
事件驱动型消费者
因为它是两者中较简单的一个,所以我们首先介绍事件驱动的使用者终结点。
您可能还记得,该接口提供了一个方法,并且该方法接受一个参数(如 SubscribableChannel
中所示)。
下面的列表显示了该方法的定义:SubscribableChannel
subscribe()
MessageHandler
subscribe
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的使用者,并且 Spring Integration 提供的实现接受 a 和 a ,如以下示例所示:SubscribableChannel
MessageHandler
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 还提供了一个 ,并且可以用相同的方式实例化它,只是通道必须实现 ,如以下示例所示:PollingConsumer
PollableChannel
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 以下示例演示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
通常用简单的间隔 () 定义,但也支持属性和布尔属性(默认值为 — 即没有固定延迟)。
下面的示例设置这两个属性:PeriodicTrigger
Duration
initialDelay
fixedRate
false
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中三个设置的结果是一个触发器,该触发器等待 5 秒,然后每秒触发一次。
需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例设置了一个新的:CronTrigger
CronTrigger
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一示例中定义的触发器的结果是触发器,该触发器每十秒触发一次,从星期一到星期五。
轮询终端节点的默认触发器是具有 1 秒固定延迟期的实例。PeriodicTrigger |
除了触发器之外,还可以指定另外两个与轮询相关的配置属性:和 。
下面的示例演示如何设置这两个属性:maxMessagesPerPoll
receiveTimeout
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
该属性指定在给定轮询操作中要接收的最大消息数。
这意味着轮询器无需等待即可继续调用,直到返回任一值或达到最大值。
例如,如果轮询器具有 10 秒的间隔触发器和设置为 ,并且它正在轮询队列中包含 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。
它抓取 25 个,等待 10 秒,抓取下一个 25,依此类推。
如果配置了负值,则在单个轮询周期内调用,直到返回 。
从版本 5.5 开始,值具有特殊含义 - 完全跳过调用,这可能被视为暂停此轮询端点,直到稍后更改为 n 个非零值,例如通过控制总线。maxMessagesPerPoll
receive()
null
maxMessagesPerPoll
25
maxMessagesPerPoll
MessageSource.receive()
null
0
MessageSource.receive()
maxMessagesPerPoll
该属性指定轮询器在调用接收操作时没有可用消息时应等待的时间。
例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发时间为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发时间为 50 毫秒,接收超时为 5 秒。
第一个消息可能比它在频道上接受的消息晚 4950 毫秒(如果该消息在其一个轮询调用返回后立即到达)。
另一方面,第二种配置永远不会错过超过 50 毫秒的消息。
不同之处在于,第二个选项需要线程等待。
但是,因此,它可以更快地响应到达的消息。
这种技术称为“长轮询”,可用于模拟轮询源上的事件驱动行为。receiveTimeout
轮询使用者也可以委托给 Spring ,如以下示例所示:TaskExecutor
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,a 具有一个名为 的属性。
此属性允许您指定 AOP 建议,以处理其他横切关注点(包括事务)。
这些建议适用于该方法。
有关更深入的信息,请参阅终结点命名空间支持下有关 AOP 建议链和事务支持的部分。
另请参阅注释 Javadocs 和相应的消息传递注释支持部分。
Java DSL 还为其各自的工厂提供了一个 .poller()
端点配置选项。PollingConsumer
adviceChain
List
doPoll()
@Poller
Pollers
前面的示例显示了依赖项查找。
但是,请记住,这些使用者通常配置为 Spring Bean 定义。
事实上,Spring Integration 还提供了一个调用,该调用根据通道类型创建适当的使用者类型。
此外,Spring Integration 具有完整的 XML 命名空间支持,可以进一步隐藏这些细节。
本指南介绍了基于命名空间的配置,并介绍了每种组件类型。FactoryBean
ConsumerEndpointFactoryBean
许多实现可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
但是,发送回复消息的时间和数量取决于处理程序类型。
例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,拆分器可以为其处理的每条消息生成多个回复。
使用命名空间配置时,您不需要严格了解所有详细信息。
但是,仍然值得一提的是,这些组件中的几个共享一个公共基类 ,并且它提供了一个方法。MessageHandler AbstractReplyProducingMessageHandler setOutputChannel(..) |
有关轮询使用者的详细信息,请参阅通道适配器和通道适配器。 |
轮询终端节点的默认触发器是具有 1 秒固定延迟期的实例。PeriodicTrigger |
许多实现可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
但是,发送回复消息的时间和数量取决于处理程序类型。
例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,拆分器可以为其处理的每条消息生成多个回复。
使用命名空间配置时,您不需要严格了解所有详细信息。
但是,仍然值得一提的是,这些组件中的几个共享一个公共基类 ,并且它提供了一个方法。MessageHandler AbstractReplyProducingMessageHandler setOutputChannel(..) |
终结点命名空间支持
在本参考手册中,您可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。
其中大多数支持属性,许多支持属性。
解析后,这些端点元素将分别生成 或 的实例,具体取决于所引用的 的 或 的类型。
当通道可轮询时,轮询行为基于终结点元素的子元素及其属性。input-channel
output-channel
PollingConsumer
EventDrivenConsumer
input-channel
PollableChannel
SubscribableChannel
poller
下面列出了 a 的所有可用配置选项:poller
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
initial-delay="" (6)
id="" (7)
max-messages-per-poll="" (8)
receive-timeout="" (9)
ref="" (10)
task-executor="" (11)
time-unit="MILLISECONDS" (12)
trigger=""> (13)
<int:advice-chain /> (14)
<int:transactional /> (15)
</int:poller>
1 | 提供使用 Cron 表达式配置轮询器的功能。
基础实现使用 .
如果设置了此属性,则无需指定以下任何属性:、 、 和 。org.springframework.scheduling.support.CronTrigger fixed-delay trigger fixed-rate ref |
2 | 通过将此属性设置为 ,可以只定义一个全局默认轮询器。
如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。
连接到 () 的任何端点或任何没有显式配置轮询器的端点都使用全局默认轮询器。
它默认为 。
自选。true PollableChannel PollingConsumer SourcePollingChannelAdapter false |
3 | 标识在此轮询器调用中发生故障时向其发送错误消息的通道。
若要完全禁止异常,可以提供对 .
自选。nullChannel |
4 | 固定延迟触发器使用暗中。
数值采用或可以作为持续时间格式(从版本 6.2 开始),例如 、 。
如果设置了此属性,则无需指定以下任何属性:、 、 和 。PeriodicTrigger time-unit PT10S P1D fixed-rate trigger cron ref |
5 | 固定速率触发器使用暗中。
数值采用或可以作为持续时间格式(从版本 6.2 开始),例如 、 。
如果设置了此属性,则无需指定以下任何属性:、 、 和 。PeriodicTrigger time-unit PT10S P1D fixed-delay trigger cron ref |
6 | 幕后的初始延迟(从版本 6.2 开始)。
数值采用或可以作为持续时间格式,例如 , .PeriodicTrigger time-unit PT10S P1D |
7 | 引用轮询器的基础 Bean-definition 的 ID,其类型为 。
该属性对于顶级轮询器元素是必需的,除非它是默认的轮询器 ()。org.springframework.integration.scheduling.PollerMetadata id default="true" |
8 | 有关详细信息,请参阅配置入站通道适配器。
如果未指定,则默认值取决于上下文。
如果使用 ,则此属性默认为 。
但是,如果使用 ,则该属性默认为 。
自选。PollingConsumer -1 SourcePollingChannelAdapter max-messages-per-poll 1 |
9 | 在基础类上设置值。
如果未指定,则默认为 1000(毫秒)。
自选。PollerMetadata |
10 | Bean 引用另一个顶级轮询器。
顶级元素上不得存在该属性。
但是,如果设置了此属性,则无需指定以下任何属性:、、 和 。ref poller fixed-rate trigger cron fixed-delay |
11 | 提供引用自定义任务执行程序的功能。 有关详细信息,请参阅 TaskExecutor 支持。 自选。 |
12 | 此属性指定基础 上的枚举值。
因此,此属性只能与 or 属性结合使用。
如果与任一属性或引用属性结合使用,则会导致失败。
支持的最小粒度为毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则将 any or 值解释为毫秒。
基本上,此枚举为基于秒的间隔触发值提供了便利。
对于每小时、每天和每月设置,我们建议改用触发器。java.util.concurrent.TimeUnit org.springframework.scheduling.support.PeriodicTrigger fixed-delay fixed-rate cron trigger PeriodicTrigger fixed-delay fixed-rate cron |
13 | 引用实现接口的任何 Spring 配置的 bean。
但是,如果设置了此属性,则无需指定以下任何属性:、、 和 。
自选。org.springframework.scheduling.Trigger fixed-delay fixed-rate cron ref |
14 | 允许指定额外的 AOP 建议,以处理其他跨领域问题。 有关详细信息,请参阅交易。 自选。 |
15 | 轮询器可以成为事务性的。 有关更多信息,请参阅 AOP 建议链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的基于间隔的简单轮询器:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用该属性的替代方法,您还可以使用该属性。fixed-rate
fixed-delay
对于基于 Cron 表达式的轮询器,请改用该属性,如以下示例所示:cron
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 ,则需要轮询器配置。
具体来说,如前所述,是类的必需属性。
因此,如果省略轮询使用方终结点配置的子元素,则可能会引发异常。
如果尝试在连接到不可轮询通道的元素上配置轮询器,也可能会引发异常。PollableChannel
trigger
PollingConsumer
poller
还可以创建顶级轮询器,在这种情况下,只需要一个属性,如以下示例所示:ref
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
该属性仅在内部轮询器定义上是允许的。
在顶级轮询器上定义此属性会导致在初始化应用程序上下文期间引发配置异常。ref |
全局默认轮询器
为了进一步简化配置,您可以定义全局默认轮询器。
XML DSL 中的单个顶级轮询器组件的属性可能设置为 。
对于 Java 配置,在这种情况下,必须声明具有名称的 Bean。
在这种情况下,任何具有 a 作为其输入通道的端点,该端点在同一个 中定义,并且没有显式配置,都使用该默认值。
以下示例显示了这样的轮询器和使用它的转换器:default
true
PollerMetadata
PollerMetadata.DEFAULT_POLLER
PollableChannel
ApplicationContext
poller
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
交易支持
Spring Integration 还为轮询器提供事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。
要为轮询器配置事务,请添加子元素。
以下示例显示了可用的属性:<transactional/>
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参阅轮询器事务支持。
AOP咨询链
由于 Spring 事务支持依赖于代理机制,其中 (AOP Advice) 处理轮询器启动的消息流的事务行为,因此有时必须提供额外的建议来处理与轮询器关联的其他横切行为。
为此,定义了一个元素,允许您在实现接口的类中添加更多建议。
以下示例演示如何定义 for :TransactionInterceptor
poller
advice-chain
MethodInterceptor
advice-chain
poller
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现接口的更多信息,请参阅《Spring Framework 参考指南》的 AOP 部分。
还可以将建议链应用于没有任何事务配置的轮询器,从而增强轮询器启动的消息流的行为。MethodInterceptor
使用建议链时,无法指定子元素。
相反,声明一个 Bean 并将其添加到 .
有关完整的配置详细信息,请参阅轮询器事务支持。<transactional/> <tx:advice/> <advice-chain/> |
TaskExecutor 支持
轮询线程可以由 Spring 抽象的任何实例执行。
这将为一个终结点或一组终结点启用并发。
从 Spring 3.0 开始,核心 Spring Framework 有一个命名空间,其元素支持创建一个简单的线程池执行器。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行程序可以对端点在负载下的性能产生重大影响。
这些设置适用于每个终端节点,因为终端节点的性能是要考虑的主要因素之一(另一个主要因素是终端订阅的通道上的预期音量)。
若要为配置了 XML 命名空间支持的轮询终结点启用并发性,请提供对其元素的引用,然后提供以下示例中显示的一个或多个属性:TaskExecutor
task
<executor/>
task-executor
<poller/>
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果未提供任务执行程序,则在调用方的线程中调用使用者的处理程序。
请注意,调用方通常是默认的(请参阅配置任务计划程序)。
您还应该记住,该属性可以通过指定 Bean 名称来提供对 Spring 接口的任何实现的引用。
为方便起见,提供了前面显示的元素。TaskScheduler
task-executor
TaskExecutor
executor
如前面轮询使用者的后台部分所述,您还可以配置轮询使用者,以模拟事件驱动的行为。
由于接收超时较长,触发器间隔较短,因此即使在轮询的消息源上,也可以确保对到达的消息做出非常及时的反应。
请注意,这仅适用于具有超时阻塞等待调用的源。
例如,文件轮询器不会阻止。
每个调用都会立即返回,并且是否包含新文件。
因此,即使轮询器包含 long ,也永远不会在这种情况下使用该值。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例演示轮询使用者如何几乎即时接收消息:receive()
receive-timeout
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)抖动的无限 while 循环那样多的 CPU 资源使用。
1 | 提供使用 Cron 表达式配置轮询器的功能。
基础实现使用 .
如果设置了此属性,则无需指定以下任何属性:、 、 和 。org.springframework.scheduling.support.CronTrigger fixed-delay trigger fixed-rate ref |
2 | 通过将此属性设置为 ,可以只定义一个全局默认轮询器。
如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。
连接到 () 的任何端点或任何没有显式配置轮询器的端点都使用全局默认轮询器。
它默认为 。
自选。true PollableChannel PollingConsumer SourcePollingChannelAdapter false |
3 | 标识在此轮询器调用中发生故障时向其发送错误消息的通道。
若要完全禁止异常,可以提供对 .
自选。nullChannel |
4 | 固定延迟触发器使用暗中。
数值采用或可以作为持续时间格式(从版本 6.2 开始),例如 、 。
如果设置了此属性,则无需指定以下任何属性:、 、 和 。PeriodicTrigger time-unit PT10S P1D fixed-rate trigger cron ref |
5 | 固定速率触发器使用暗中。
数值采用或可以作为持续时间格式(从版本 6.2 开始),例如 、 。
如果设置了此属性,则无需指定以下任何属性:、 、 和 。PeriodicTrigger time-unit PT10S P1D fixed-delay trigger cron ref |
6 | 幕后的初始延迟(从版本 6.2 开始)。
数值采用或可以作为持续时间格式,例如 , .PeriodicTrigger time-unit PT10S P1D |
7 | 引用轮询器的基础 Bean-definition 的 ID,其类型为 。
该属性对于顶级轮询器元素是必需的,除非它是默认的轮询器 ()。org.springframework.integration.scheduling.PollerMetadata id default="true" |
8 | 有关详细信息,请参阅配置入站通道适配器。
如果未指定,则默认值取决于上下文。
如果使用 ,则此属性默认为 。
但是,如果使用 ,则该属性默认为 。
自选。PollingConsumer -1 SourcePollingChannelAdapter max-messages-per-poll 1 |
9 | 在基础类上设置值。
如果未指定,则默认为 1000(毫秒)。
自选。PollerMetadata |
10 | Bean 引用另一个顶级轮询器。
顶级元素上不得存在该属性。
但是,如果设置了此属性,则无需指定以下任何属性:、、 和 。ref poller fixed-rate trigger cron fixed-delay |
11 | 提供引用自定义任务执行程序的功能。 有关详细信息,请参阅 TaskExecutor 支持。 自选。 |
12 | 此属性指定基础 上的枚举值。
因此,此属性只能与 or 属性结合使用。
如果与任一属性或引用属性结合使用,则会导致失败。
支持的最小粒度为毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则将 any or 值解释为毫秒。
基本上,此枚举为基于秒的间隔触发值提供了便利。
对于每小时、每天和每月设置,我们建议改用触发器。java.util.concurrent.TimeUnit org.springframework.scheduling.support.PeriodicTrigger fixed-delay fixed-rate cron trigger PeriodicTrigger fixed-delay fixed-rate cron |
13 | 引用实现接口的任何 Spring 配置的 bean。
但是,如果设置了此属性,则无需指定以下任何属性:、、 和 。
自选。org.springframework.scheduling.Trigger fixed-delay fixed-rate cron ref |
14 | 允许指定额外的 AOP 建议,以处理其他跨领域问题。 有关详细信息,请参阅交易。 自选。 |
15 | 轮询器可以成为事务性的。 有关更多信息,请参阅 AOP 建议链。 自选。 |
该属性仅在内部轮询器定义上是允许的。
在顶级轮询器上定义此属性会导致在初始化应用程序上下文期间引发配置异常。ref |
使用建议链时,无法指定子元素。
相反,声明一个 Bean 并将其添加到 .
有关完整的配置详细信息,请参阅轮询器事务支持。<transactional/> <tx:advice/> <advice-chain/> |
在运行时更改轮询速率
使用 or 属性配置轮询器时,默认实现使用实例。
是核心 Spring 框架的一部分。
它只接受间隔作为构造函数参数。
因此,无法在运行时更改它。fixed-delay
fixed-rate
PeriodicTrigger
PeriodicTrigger
但是,您可以定义自己的接口实现。
您甚至可以将其用作起点。
然后,您可以为间隔(周期)添加一个 setter,甚至可以在触发器本身中嵌入自己的限制逻辑。
该属性与每次调用一起使用,以安排下一次轮询。
要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 Bean 定义,并使用引用自定义触发器 Bean 实例的属性将依赖项注入到轮询器配置中。
您现在可以获取对触发器 Bean 的引用,并更改轮询之间的轮询间隔。org.springframework.scheduling.Trigger
PeriodicTrigger
period
nextExecutionTime
trigger
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为 的示例,该示例使用自定义触发器,并演示了在运行时更改轮询间隔的功能。dynamic-poller
该示例提供了一个实现 org.springframework.scheduling.Trigger
接口的自定义触发器。
该示例的触发器基于 Spring 的 PeriodicTrigger
实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式的 getter 和 setter,允许您在运行时动态更改轮询周期。
但需要注意的是,由于 Trigger 方法是 ,因此对动态触发器的任何更改在基于现有配置的下一次轮询之前都不会生效。
无法强制触发器在当前配置的下一次执行时间之前触发。nextExecutionTime() |
但需要注意的是,由于 Trigger 方法是 ,因此对动态触发器的任何更改在基于现有配置的下一次轮询之前都不会生效。
无法强制触发器在当前配置的下一次执行时间之前触发。nextExecutionTime() |
有效负载类型转换
在本参考手册中,您还可以看到接受消息或任意消息作为输入参数的各种端点的特定配置和实现示例。
在 的情况下,此类参数映射到消息有效负载或有效负载或标头的一部分(使用 Spring Expression Language 时)。
但是,终结点方法的输入参数类型有时与有效负载或其部分的类型不匹配。
在这种情况下,我们需要执行类型转换。
Spring Integration 提供了一种方便的方法,用于在名为 的转换服务 Bean 的实例中注册类型转换器(通过使用 Spring )。
一旦使用 Spring Integration 基础结构定义了第一个转换器,就会自动创建该 Bean。
要注册转换器,您可以实现 、 或 。Object
Object
ConversionService
integrationConversionService
org.springframework.core.convert.converter.Converter
org.springframework.core.convert.converter.GenericConverter
org.springframework.core.convert.converter.ConverterFactory
实现是最简单的,可以从一种类型转换为另一种类型。
对于更复杂的操作,例如转换为类层次结构,您可以实现 a 和 .
通过这些描述符,您可以完全访问 和 类型描述符,从而实现复杂的转换。
例如,如果您有一个名为 (参数类型、通道数据类型等)的抽象类,则有两个名为 and 的具体实现,并且您希望根据输入类型转换为其中一个,这将是一个很好的选择。
有关更多信息,请参阅以下接口的 Javadoc:Converter
GenericConverter
ConditionalConverter
from
to
Something
Thing1
Thing
GenericConverter
实现转换器后,可以使用方便的命名空间支持对其进行注册,如以下示例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内 Bean,如以下示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注解来创建前面的配置,如以下示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用批注,如以下示例所示:@Configuration
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring Framework 允许您添加 bean(请参阅配置 ConversionService 一章)。
当需要时,此服务用于在 Bean 创建和配置期间执行适当的转换。 相反,用于运行时转换。
这些用途是完全不同的。
如果在运行时用于针对数据类型通道、有效负载类型转换器等中的消息进行 Spring Integration 表达式评估,则用于连接 Bean 构造函数参数和属性时使用的转换器可能会产生意外结果。 但是,如果您确实想使用 Spring 作为 Spring 集成,则可以在应用程序上下文中配置别名,如以下示例所示:
在这种情况下,提供的转换器可用于 Spring Integration 运行时转换。 |
在配置应用程序上下文时,Spring Framework 允许您添加 bean(请参阅配置 ConversionService 一章)。
当需要时,此服务用于在 Bean 创建和配置期间执行适当的转换。 相反,用于运行时转换。
这些用途是完全不同的。
如果在运行时用于针对数据类型通道、有效负载类型转换器等中的消息进行 Spring Integration 表达式评估,则用于连接 Bean 构造函数参数和属性时使用的转换器可能会产生意外结果。 但是,如果您确实想使用 Spring 作为 Spring 集成,则可以在应用程序上下文中配置别名,如以下示例所示:
在这种情况下,提供的转换器可用于 Spring Integration 运行时转换。 |
内容类型转换
从 V5.0 开始,默认情况下,方法调用机制基于基础结构。
它的实现(如 和 )可以使用抽象将传入转换为目标方法参数类型。
转换可以基于邮件头。
为此,Spring Integration 提供了 ,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非 null 结果。
默认情况下,此转换器提供(严格顺序):org.springframework.messaging.handler.invocation.InvocableHandlerMethod
HandlerMethodArgumentResolver
PayloadArgumentResolver
MessageMethodArgumentResolver
MessageConverter
payload
contentType
ConfigurableCompositeMessageConverter
请参阅 Javadoc(在前面的列表中链接),以获取有关其用途和适当转换值的更多信息。
之所以使用 ,是因为它可以与任何其他实现一起提供,包括或不包括前面提到的默认转换器。
它还可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如以下示例所示:contentType
ConfigurableCompositeMessageConverter
MessageConverter
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在默认值之前注册到复合中。
您也可以不使用 bean,而是通过注册具有名称的 bean(通过设置属性)来提供自己的 bean。ConfigurableCompositeMessageConverter
MessageConverter
integrationArgumentResolverMessageConverter
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
使用 SpEL 方法调用时,基于 -based (包括标头) 的转换不可用。
在这种情况下,只有上面在有效负载类型转换中提到的常规类到类转换可用。MessageConverter contentType |
使用 SpEL 方法调用时,基于 -based (包括标头) 的转换不可用。
在这种情况下,只有上面在有效负载类型转换中提到的常规类到类转换可用。MessageConverter contentType |
异步轮询
如果希望轮询是异步的,轮询器可以选择指定指向任何 Bean 的现有实例的属性(Spring 3.0 通过命名空间提供了方便的命名空间配置)。
但是,在使用 .task-executor
TaskExecutor
task
TaskExecutor
问题在于有两种配置,轮询器和 .
它们必须彼此协调一致。
否则,您最终可能会造成人为的内存泄漏。TaskExecutor
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了失调配置。
默认情况下,任务执行程序具有无界任务队列。 即使所有线程都被阻塞,轮询器也会继续计划新任务,等待新消息到达或超时到期。 假设有 20 个线程执行任务,超时时间为 5 秒,则它们以每秒 4 次的速率执行。 但是,新任务以每秒 20 个的速度调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(当进程处于空闲状态时),因此存在内存泄漏。
处理此问题的方法之一是设置任务执行器的属性。
即使 0 也是一个合理的值。
您还可以通过设置任务执行程序的属性(例如,到 )来指定如何处理无法排队的消息来管理它。
换句话说,在配置时必须了解某些细节。
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。queue-capacity
rejection-policy
DISCARD
TaskExecutor
端点内部 Bean
许多端点都是复合 Bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动)委托给 .
轮询适配器通过委托给 .
通常,获取对委托 Bean 的引用很有用,也许可以在运行时更改配置或进行测试。
这些豆子可以从众所周知的名称中获得。 实例使用类似于 (其中“consumer” 是终端节点属性的值)的 Bean ID 注册到应用程序上下文中。 实例使用类似于 的 Bean ID 进行注册,其中 'somePolledAdapter' 是适配器的 ID。MessageHandler
MessageSource
ApplicationContext
MessageHandler
someConsumer.handler
id
MessageSource
somePolledAdapter.source
上述内容仅适用于框架组件本身。 您可以改用内 Bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
Bean 被视为声明的任何内部 Bean,并且未在应用程序上下文中注册。
如果您希望以其他方式访问此 Bean,请在顶层使用 an 声明它,并改用该属性。
有关更多信息,请参阅 Spring 文档。id
ref