消息端点

本章的第一部分涵盖了一些背景理论,并揭示了相当多的关于驱动 Spring Integration 的各种消息传递组件的底层 API。 如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。 但是,如果您想启动并运行各种元素的简化基于命名空间的配置,请随时跳到 Endpoint Namespace Support (终端节点命名空间支持)。spring-doc.cn

如概述中所述,消息终端节点负责将各种消息收发组件连接到通道。 在接下来的几章中,我们将介绍许多使用消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的 Message Channels 中所示,您可以向消息通道发送消息。 但是,接收稍微复杂一些。 主要原因是有两种类型的使用者:轮询使用者事件驱动使用者spring-doc.cn

在这两者中,事件驱动型消费者要简单得多。 无需管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。 当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。 但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。 Spring 集成提供了两种不同的端点实现来容纳这两种类型的消费者。 因此,消费者自己只需要实现回调接口即可。 当需要轮询时,终端节点充当使用者实例的容器。 其好处类似于使用容器托管消息驱动的 bean,但是,由于这些使用者是在 中运行的 Spring 管理的对象,因此它更类似于 Spring 自己的容器。ApplicationContextMessageListenerspring-doc.cn

消息处理程序

Spring Integration 的接口由框架中的许多组件实现。 换句话说,这不是公共 API 的一部分,您通常不会直接实现。 尽管如此,消息使用者使用它来实际处理使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。 接口定义如下:MessageHandlerMessageHandlerspring-doc.cn

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

尽管它很简单,但这个接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。 这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们托管这些基于回调的处理程序,并让它们连接到消息通道。spring-doc.cn

事件驱动型消费者

因为它是两者中更简单的,所以我们首先介绍事件驱动的使用者终端节点。 你可能还记得,该接口提供了一个方法,并且该方法接受一个参数(如SubscribableChannel所示)。 下面的清单显示了该方法的定义:SubscribableChannelsubscribe()MessageHandlersubscribespring-doc.cn

subscribableChannel.subscribe(messageHandler);

由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的使用者,并且 Spring Integration 提供的实现接受 a 和 a ,如下例所示:SubscribableChannelMessageHandlerspring-doc.cn

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

轮询消费者

Spring 集成还提供了一个 ,它可以以相同的方式实例化,只是通道必须实现 ,如下例所示:PollingConsumerPollableChannelspring-doc.cn

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询使用者的更多信息,请参阅通道适配器和通道适配器

轮询使用者还有许多其他配置选项。 以下示例显示如何设置触发器:spring-doc.cn

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

通常使用简单的间隔 () 定义,但也支持属性和布尔属性(默认值为 — 即没有固定延迟)。 以下示例设置这两个属性:PeriodicTriggerDurationinitialDelayfixedRatefalsespring-doc.cn

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

前面示例中的三个设置的结果是一个等待 5 秒,然后每秒触发一次的触发器。spring-doc.cn

这需要有效的 cron 表达式。 有关详细信息,请参阅 Javadoc。 以下示例将新的 :CronTriggerCronTriggerspring-doc.cn

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

上一个示例中定义的触发器的结果是星期一到星期五每 10 秒触发一次的触发器。spring-doc.cn

轮询终端节点的默认触发器是具有 1 秒固定延迟期的实例。PeriodicTrigger

除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:和 。 以下示例说明如何设置这两个属性:maxMessagesPerPollreceiveTimeoutspring-doc.cn

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

该属性指定在给定轮询操作中要接收的最大消息数。 这意味着 poller 继续调用而不等待,直到返回或达到最大值。 例如,如果 Poller 具有 10 秒间隔触发器且设置为 ,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。 它抓取 25 个,等待 10 秒,抓取下一个 25 个,依此类推。 如果 配置为负值,则在单个轮询周期内调用 ,直到返回 。 从版本 5.5 开始,值具有特殊含义 - 完全跳过调用,这可能被视为暂停此轮询端点,直到稍后将 更改为非零值,例如通过 Control Bus。maxMessagesPerPollreceive()nullmaxMessagesPerPoll25maxMessagesPerPollMessageSource.receive()null0MessageSource.receive()maxMessagesPerPollspring-doc.cn

该属性指定 Poller 在调用 receive 操作时如果没有可用的消息时应等待的时间。 例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发为 50 毫秒,接收超时为 5 秒。 第一个接收消息的时间可能比它在通道上接受的晚 4950 毫秒(如果该消息在其 poll 调用之一返回后立即到达)。 另一方面,第二个配置永远不会错过超过 50 毫秒的消息。 区别在于第二个选项需要线程等待。 但是,因此,它可以更快地响应到达的消息。 这种技术称为 “长轮询”,可用于在轮询源上模拟事件驱动的行为。receiveTimeoutspring-doc.cn

轮询使用者还可以委托给 Spring ,如下例所示:TaskExecutorspring-doc.cn

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,a 具有一个名为 的属性。 此属性允许您指定 AOP 建议,以处理其他横切关注点,包括事务。 这些建议是围绕方法应用的。 有关更深入的信息,请参阅 Endpoint Namespace Support 下的有关 AOP 建议链和事务支持的部分。 另请参阅注释 Javadocs 和相应的消息传送注释支持部分。 Java DSL 还提供了 .poller() 端点配置选项及其各自的工厂。PollingConsumeradviceChainListdoPoll()@PollerPollersspring-doc.cn

前面的示例显示了依赖项查找。 但是,请记住,这些使用者通常配置为 Spring bean 定义。 事实上, Spring 集成还提供了一个调用,它根据通道的类型创建适当的消费者类型。 此外,Spring 集成具有完整的 XML 名称空间支持,以进一步隐藏这些细节。 本指南介绍了基于命名空间的配置,因为介绍了每种组件类型。FactoryBeanConsumerEndpointFactoryBeanspring-doc.cn

许多实现可以生成回复消息。 如前所述,与接收消息相比,发送消息是微不足道的。 但是,何时发送回复消息以及发送多少回复消息取决于处理程序类型。 例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,该拆分器可以为其处理的每条消息生成多个回复。 使用 namespace 配置时,您不需要严格了解所有详细信息。 但是,仍然值得知道,这些组件中的几个共享一个公共基类 ,并且它提供了一个方法。MessageHandlerAbstractReplyProducingMessageHandlersetOutputChannel(..)

终端节点命名空间支持

在本参考手册中,您可以找到终端节点元素的特定配置示例,例如 router、transformer、service-activator 等。 其中大多数都支持属性,许多都支持属性。 解析后,这些 endpoint 元素会生成 the 或 the 的实例,具体取决于引用的类型: 或 。 当通道是可轮询的时,轮询行为基于 endpoint 元素的 sub-element 及其属性。input-channeloutput-channelPollingConsumerEventDrivenConsumerinput-channelPollableChannelSubscribableChannelpollerspring-doc.cn

下面列出了 的所有可用配置选项 :pollerspring-doc.cn

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1 提供使用 Cron 表达式配置 Poller 的功能。 底层实现使用 . 如果设置了此属性,则不必指定以下任何属性: 、 、 和 。org.springframework.scheduling.support.CronTriggerfixed-delaytriggerfixed-rateref
2 通过将此属性设置为 ,你可以定义一个全局默认 Poller。 如果在应用程序上下文中定义了多个默认 Poller,则会引发异常。 任何连接到 () 的端点或任何没有显式配置的 Poller 的端点都使用全局默认 Poller。 它默认为 . 自选。truePollableChannelPollingConsumerSourcePollingChannelAdapterfalse
3 标识如果此 Poller 的调用失败,则向其发送错误消息的通道。 要完全禁止显示异常,您可以提供对 . 自选。nullChannel
4 fixed delay trigger 在幕后使用 a。 数值采用或可以是 duration 格式(从版本 6.2 开始),例如 , 。 如果设置了此属性,则不必指定以下任何属性: 、 、 和 。PeriodicTriggertime-unitPT10SP1Dfixed-ratetriggercronref
5 固定速率触发器在幕后使用 a。 数值采用或可以是 duration 格式(从版本 6.2 开始),例如 , 。 如果设置了此属性,则不必指定以下任何属性: 、 、 和 。PeriodicTriggertime-unitPT10SP1Dfixed-delaytriggercronref
6 a 的初始延迟 under the covers(从版本 6.2 开始)。 数值采用或可以是持续时间格式,例如 , .PeriodicTriggertime-unitPT10SP1D
7 引用 poller 的基础 bean 定义的 ID,其类型为 。 该属性对于顶级 poller 元素是必需的,除非它是默认的 poller () 。org.springframework.integration.scheduling.PollerMetadataiddefault="true"
8 有关更多信息,请参阅Configuring An Inbound Channel Adapter。 如果未指定,则默认值取决于上下文。 如果使用 ,则此属性默认为 。 但是,如果使用 ,则该属性默认为 。 自选。PollingConsumer-1SourcePollingChannelAdaptermax-messages-per-poll1
9 value 在底层 class 上设置。 如果未指定,则默认为 1000(毫秒)。 自选。PollerMetadata
10 Bean 引用另一个顶级 Poller 的 Pod 引用。 该属性不得出现在 top-level 元素上。 但是,如果设置了此属性,则不必指定以下任何属性:、、 、 和 。refpollerfixed-ratetriggercronfixed-delay
11 提供引用自定义任务执行程序的功能。 有关更多信息,请参阅 TaskExecutor 支持。 自选。
12 此属性指定基础 的 enum 值。 因此,此属性只能与 or 属性结合使用。 如果与 或 reference 属性结合使用,则会导致失败。 a 支持的最小粒度为毫秒。 因此,唯一可用的选项是毫秒和秒。 如果未提供此值,则 any 或 value 将解释为毫秒。 基本上,此枚举为基于秒的 interval 触发器值提供了便利。 对于每小时、每天和每月设置,我们建议改用触发器。java.util.concurrent.TimeUnitorg.springframework.scheduling.support.PeriodicTriggerfixed-delayfixed-ratecrontriggerPeriodicTriggerfixed-delayfixed-ratecron
13 对实现接口的任何 Spring 配置的 bean 的引用。 但是,如果设置了此属性,则不必指定以下任何属性:、、 、 和 。 自选。org.springframework.scheduling.Triggerfixed-delayfixed-ratecronref
14 允许指定额外的 AOP 建议来处理其他横切关注点。 有关更多信息,请参阅 交易 。 自选。
15 Poller 可以成为事务性的。 有关更多信息,请参阅 AOP Advice 链。 自选。

例子

可以按如下方式配置具有 1 秒间隔的简单基于间隔的 Poller :spring-doc.cn

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作为使用 attribute 的替代方法,您还可以使用 attribute。fixed-ratefixed-delayspring-doc.cn

对于基于 Cron 表达式的 Poller ,请改用该属性,如下例所示:cronspring-doc.cn

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果 input 通道是 a ,则需要 poller 配置。 具体来说,如前所述,this 是类的 required 属性。 因此,如果省略轮询使用者终结点配置的子元素,则可能会引发异常。 如果你试图在连接到不可轮询通道的元素上配置 Poller,也可能引发异常。PollableChanneltriggerPollingConsumerpollerspring-doc.cn

也可以创建顶级 Poller,在这种情况下,只需要一个属性,如下例所示:refspring-doc.cn

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
该属性只允许在内部 Poller 定义上。 在顶级 Poller 上定义此属性会导致在应用程序上下文初始化期间引发配置异常。ref

全局默认轮询器

为了进一步简化配置,你可以定义一个全局默认 Poller。 XML DSL 中的单个顶级 Poller 组件可以将属性设置为 。 对于 Java 配置,在这种情况下必须声明具有 name 的 bean。 在这种情况下,任何具有 a 的 input 通道的端点,如果在同一 个中定义,并且没有明确配置,则使用该默认值。 下面的示例展示了这样的 poller 和使用它的转换器:defaulttruePollerMetadataPollerMetadata.DEFAULT_POLLERPollableChannelApplicationContextpollerspring-doc.cn

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

事务支持

Spring 集成还为 pollers 提供了事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。 要为 Poller 配置事务,请添加 sub-element。 以下示例显示了可用属性:<transactional/>spring-doc.cn

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有关更多信息,请参阅 轮询器事务支持spring-doc.cn

AOP 建议链

由于 Spring 事务支持依赖于代理机制,其中(AOP 建议)处理由 Poller 发起的消息流的事务行为,因此有时必须提供额外的建议来处理与 Poller 关联的其他横切行为。 为此,它定义了一个元素,该元素允许您在实现该接口的类中添加更多建议。 以下示例说明如何定义 for a :TransactionInterceptorpolleradvice-chainMethodInterceptoradvice-chainpollerspring-doc.cn

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有关如何实现该接口的更多信息,请参见 Spring Framework Reference Guide 的 AOP 部分。 建议链也可以应用于没有任何事务配置的 Poller,从而增强 Poller 启动的消息流的行为。MethodInterceptorspring-doc.cn

使用通知链时,不能指定 child 元素。 相反,声明一个 bean 并将其添加到 . 有关完整的配置详细信息,请参阅 Poller Transaction Support<transactional/><tx:advice/><advice-chain/>

TaskExecutor 支持

轮询线程可以由 Spring 抽象的任何实例执行。 这将为一个终端节点或一组终端节点启用并发。 从 Spring 3.0 开始,核心 Spring Framework 有一个命名空间,它的元素支持创建一个简单的线程池执行器。 该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。 配置线程池执行程序可以对终端节点在负载下的性能产生重大影响。 这些设置适用于每个终端节点,因为终端节点的性能是要考虑的主要因素之一(另一个主要因素是终端节点订阅的通道上的预期卷)。 要为配置了 XML 命名空间支持的轮询终端节点启用并发,请提供对其元素的引用,然后提供以下示例中所示的一个或多个属性:TaskExecutortask<executor/>task-executor<poller/>spring-doc.cn

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果您不提供 task-executor,则在调用者的线程中调用使用者的处理程序。 请注意,调用方通常是默认调用方(请参阅配置 Task Scheduler)。 您还应该记住,该属性可以通过指定 bean 名称来提供对 Spring 接口的任何实现的引用。 为方便起见,提供了前面显示的元素。TaskSchedulertask-executorTaskExecutorexecutorspring-doc.cn

如前面在轮询使用者的 background 部分中提到的,您还可以以模拟事件驱动行为的方式配置轮询使用者。 在触发器中使用较长的接收超时和较短的间隔,您可以确保对到达的消息做出非常及时的反应,即使在轮询的消息源上也是如此。 请注意,这仅适用于具有超时的阻塞 wait 调用的源。 例如,文件 poller 不会阻塞。 每个调用都会立即返回,并且要么包含新文件,要么不包含新文件。 因此,即使 poller 包含 long ,该值也永远不会在这种情况下使用。 另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。 以下示例显示了轮询使用者如何几乎即时地接收消息:receive()receive-timeoutspring-doc.cn

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)颠簸、无限 while 循环那样多的 CPU 资源使用。spring-doc.cn

在运行时更改轮询率

当使用 a 或 a 属性配置 Poller 时,默认实现使用实例。 它是核心 Spring 框架的一部分。 它仅接受 interval 作为构造函数参数。 因此,无法在运行时更改它。fixed-delayfixed-ratePeriodicTriggerPeriodicTriggerspring-doc.cn

但是,您可以定义自己的接口实现。 您甚至可以将 用作起点。 然后,您可以为间隔 (period) 添加 setter,甚至可以在触发器本身中嵌入自己的限制逻辑。 该属性与每次调用 to 一起使用,以安排下一次轮询。 要在 Poller 中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用引用自定义触发器 bean 实例的属性将依赖项注入到 poller 配置中。 现在,您可以获取对触发器 Bean 的引用并更改轮询之间的轮询间隔。org.springframework.scheduling.TriggerPeriodicTriggerperiodnextExecutionTimetriggerspring-doc.cn

有关示例,请参阅 Spring Integration Samples 项目。 它包含一个名为 的示例,该示例使用自定义触发器,并演示了在运行时更改轮询间隔的能力。dynamic-pollerspring-doc.cn

该示例提供了一个实现org.springframework.scheduling.Trigger接口的自定义触发器。 该示例的触发器基于 Spring 的PeriodicTrigger实现。 但是,自定义触发器的字段不是最终的,并且属性具有显式 getter 和 setter,允许您在运行时动态更改轮询周期。spring-doc.cn

但是,请务必注意,由于 Trigger 方法是 ,因此根据现有配置,对动态触发器的任何更改在下一次轮询之前都不会生效。 无法强制触发器在当前配置的下一次执行时间之前触发。nextExecutionTime()

负载类型转换

在本参考手册中,您还可以看到接受 message 或 any arbitrary 作为输入参数的各种终端节点的特定配置和实现示例。 在 的情况下 ,这样的参数被映射到消息有效负载或有效负载或 Headers 的一部分(使用 Spring 表达式语言时)。 但是,端点方法的 input 参数类型有时与有效负载或其部分的类型不匹配。 在此方案中,我们需要执行类型转换。 Spring 集成提供了一种方便的方法,用于在其自己的名为 . 一旦使用 Spring 集成基础结构定义了第一个转换器,就会自动创建该 bean。 要注册转换器,您可以实现 、 、 或 。ObjectObjectConversionServiceintegrationConversionServiceorg.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactoryspring-doc.cn

实现是最简单的,可以从单个类型转换为另一种类型。 对于更复杂的操作,例如转换为类层次结构,您可以实现 a 和可能的 . 这些提供了对 和 类型描述符的完全访问权限,从而实现复杂的转换。 例如,如果您有一个名为 abstract class 的 abstract class,它是转换的目标(参数类型、通道数据类型等),则您有两个名为 和 的具体实现,并且您希望根据输入类型转换为其中一个或另一个,那么 this 将是一个不错的选择。 有关更多信息,请参阅以下接口的 Javadoc:ConverterGenericConverterConditionalConverterfromtoSomethingThing1ThingGenericConverterspring-doc.cn

实现转换器后,可以使用方便的命名空间支持注册它,如下例所示:spring-doc.cn

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,你可以使用内部 Bean,如下例所示:spring-doc.cn

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring 集成 4.0 开始,你可以使用 Comments 来创建前面的配置,如下例所示:spring-doc.cn

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,您可以使用注释,如下例所示:@Configurationspring-doc.cn

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

在配置应用程序上下文时, Spring 框架允许您添加 bean (请参见配置 ConversionService 章节)。 如果需要,此服务用于在 bean 创建和配置期间执行适当的转换。conversionServicespring-doc.cn

相反,the 用于运行时转换。 这些用途完全不同。 如果在运行时用于对数据类型通道、有效负载类型转换器等中的消息进行 Spring 集成表达式评估,则用于连接 bean 构造函数参数和属性时使用的转换器可能会产生意外的结果。integrationConversionServicespring-doc.cn

但是,如果您确实希望将 Spring 用作 Spring 集成 ,则可以在应用程序上下文中配置别名,如下例所示:conversionServiceintegrationConversionServicespring-doc.cn

<alias name="conversionService" alias="integrationConversionService"/>

在这种情况下,提供的转换器可用于 Spring 集成运行时转换。conversionServicespring-doc.cn

内容类型转换

从版本 5.0 开始,默认情况下,方法调用机制基于基础结构。 它的实现(例如 和 )可以使用抽象将 incoming 转换为目标方法参数类型。 转换可以基于消息标头。 为此, Spring 集成提供了 ,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非空结果。 默认情况下,此转换器提供(按严格顺序):org.springframework.messaging.handler.invocation.InvocableHandlerMethodHandlerMethodArgumentResolverPayloadArgumentResolverMessageMethodArgumentResolverMessageConverterpayloadcontentTypeConfigurableCompositeMessageConverterspring-doc.cn

请参阅 Javadoc(在前面的列表中链接),以了解有关其用途和适当的转换值的更多信息。 之所以使用 The ,是因为它可以与任何其他 implementations 一起提供,包括或不包括前面提到的 default converters。 它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示:contentTypeConfigurableCompositeMessageConverterMessageConverterspring-doc.cn

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

这两个新转换器在 defaults 之前注册在 composite 中。 您也可以不使用 a 但通过注册一个名称为 (通过设置属性) 的 bean 来提供自己的 bean。ConfigurableCompositeMessageConverterMessageConverterintegrationArgumentResolverMessageConverterIntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAMEspring-doc.cn

使用 SPEL 方法调用时,基于 -(包括 header)的转换不可用。 在这种情况下,只有上面 Payload Type Conversion 中提到的常规类到类转换可用。MessageConvertercontentType

异步轮询

如果你希望轮询是异步的,那么 Poller 可以选择指定一个指向任何 bean 的现有实例的属性(Spring 3.0 通过名称空间提供了一个方便的命名空间配置)。 但是,在使用 .task-executorTaskExecutortaskTaskExecutorspring-doc.cn

问题在于有两种配置,即 poller 和 . 他们必须彼此协调一致。 否则,您最终可能会造成人为的内存泄漏。TaskExecutorspring-doc.cn

请考虑以下配置:spring-doc.cn

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

前面的配置演示了 Out-of-tune 配置。spring-doc.cn

默认情况下,任务执行程序具有无界任务队列。 即使所有线程都被阻塞,Poller 也会继续调度新任务,等待新消息到达或超时过期。 假设有 20 个线程执行任务,超时时间为 5 秒,因此它们以每秒 4 个的速率执行。 但是,新任务以每秒 20 个的速率调度,因此任务执行程序中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。spring-doc.cn

处理此问题的方法之一是设置 task executor 的属性。 即使 0 也是一个合理的值。 您还可以通过设置 Task Executor 的属性(例如,to )来指定如何处理无法排队的消息,从而对其进行管理。 换句话说,在配置 时,您必须了解某些详细信息。 有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。queue-capacityrejection-policyDISCARDTaskExecutorspring-doc.cn

端点内部 Bean

许多端点是复合 bean。 这包括所有使用者和所有轮询的入站通道适配器。 使用者(轮询或事件驱动型)委托给 . 轮询适配器通过委托 来获取消息。 通常,获取对委托 Bean 的引用很有用,这可能是为了在运行时更改配置或用于测试。 这些豆子可以从众所周知的名字中获得。 实例使用类似于 bean ID 的 bean ID 在应用程序上下文中注册(其中 'consumer' 是端点属性的值)。 实例使用类似于 的 bean ID 注册,其中 'somePolledAdapter' 是适配器的 ID。MessageHandlerMessageSourceApplicationContextMessageHandlersomeConsumer.handleridMessageSourcesomePolledAdapter.sourcespring-doc.cn

上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下例所示:spring-doc.cn

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

该 bean 的处理方式与声明的任何内部 bean 相同,并且未在应用程序上下文中注册。 如果您希望以其他方式访问此 bean,请在顶层使用 an 声明它,并改用该属性。 有关更多信息,请参阅 Spring 文档idrefspring-doc.cn