通道适配器是一个消息端点,它允许将单个发送方或接收方连接到消息通道。 Spring Integration 提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南的后续章节将讨论每个适配器。 但是,本章重点介绍简单但灵活的方法调用通道适配器支持。 有入站适配器和出站适配器,每个适配器都可以使用核心命名空间中提供的 XML 元素进行配置。 这些提供了一种扩展 Spring Integration 的简单方法,只要您有一个可以作为源或目标调用的方法。

配置入站通道适配器

元素(在 Java 配置中为 a)可以在 Spring 托管对象上调用任何方法,并在将方法的输出转换为 后将非 null 返回值发送给 。 激活适配器的订阅后,轮询器会尝试从源接收消息。 轮询器根据提供的配置进行调度。 要为单个通道适配器配置轮询间隔或 cron 表达式,可以为“poller”元素提供其中一个调度属性,例如“fixed-rate”或“cron”。 下面的示例定义了两个实例:inbound-channel-adapterSourcePollingChannelAdapterMessageChannelMessageTaskSchedulerinbound-channel-adapter

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果未提供轮询器,则必须在上下文中注册单个默认轮询器。 有关详细信息,请参阅终结点命名空间支持
轮询终端节点的默认触发器是具有 1 秒固定延迟期的实例。PeriodicTrigger
重要:轮询器配置

所有类型都由 支持,这意味着它们包含一个轮询器配置,该配置根据 Poller 中指定的配置轮询 (以调用生成成为有效负载的值的自定义方法)。 以下示例显示了两个轮询器的配置:inbound-channel-adapterSourcePollingChannelAdapterMessageSourceMessage

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一种配置中,轮询任务每次轮询调用一次,并且在每次任务(轮询)期间,根据属性值调用一次方法(导致消息生成)。 在第二种配置中,轮询任务每次轮询调用 10 次,或者直到它返回“null”,因此每次轮询可能生成 10 条消息,而每次轮询以 1 秒的间隔进行。 但是,如果配置如以下示例所示,会发生什么情况:max-messages-per-poll

<int:poller fixed-rate="1000"/>

请注意,没有指定。 正如我们稍后介绍的那样,相同的轮询器配置(例如,、、等)的默认值为 for ,这意味着“除非轮询方法返回 null(可能是因为轮询方法中没有更多消息)否则不间断地执行轮询任务”,然后休眠一秒钟。max-messages-per-pollPollingConsumerservice-activatorfilterrouter-1max-messages-per-pollQueueChannel

但是,在 中,它有点不同。 的默认值为 ,除非显式将其设置为负值(例如 )。 这确保了轮询器可以对生命周期事件(如启动和停止)做出反应,并防止轮询器在自定义方法的实现可能永远不会返回 null 并且恰好是不可中断的情况下在无限循环中旋转。SourcePollingChannelAdaptermax-messages-per-poll1-1MessageSource

但是,如果确定方法可以返回 null,并且需要轮询每个轮询的可用源数,则应显式设置为负值,如以下示例所示:max-messages-per-poll

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从版本 5.5 开始,for 值具有特殊含义 - 完全跳过呼叫,这可能被视为暂停此入站通道适配器,直到稍后更改为非零值,例如通过控制总线。0max-messages-per-pollMessageSource.receive()maxMessagesPerPoll

从版本 6.2 开始,可以以 ISO 8601 持续时间格式配置 and,例如 等。 此外,基础的 an 也以与 和 类似的值格式公开。fixed-delayfixed-ratePT10SP1Dinitial-intervalPeriodicTriggerfixed-delayfixed-rate

有关详细信息,请参阅全局默认轮询器

如果未提供轮询器,则必须在上下文中注册单个默认轮询器。 有关详细信息,请参阅终结点命名空间支持
轮询终端节点的默认触发器是具有 1 秒固定延迟期的实例。PeriodicTrigger
重要:轮询器配置

所有类型都由 支持,这意味着它们包含一个轮询器配置,该配置根据 Poller 中指定的配置轮询 (以调用生成成为有效负载的值的自定义方法)。 以下示例显示了两个轮询器的配置:inbound-channel-adapterSourcePollingChannelAdapterMessageSourceMessage

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一种配置中,轮询任务每次轮询调用一次,并且在每次任务(轮询)期间,根据属性值调用一次方法(导致消息生成)。 在第二种配置中,轮询任务每次轮询调用 10 次,或者直到它返回“null”,因此每次轮询可能生成 10 条消息,而每次轮询以 1 秒的间隔进行。 但是,如果配置如以下示例所示,会发生什么情况:max-messages-per-poll

<int:poller fixed-rate="1000"/>

请注意,没有指定。 正如我们稍后介绍的那样,相同的轮询器配置(例如,、、等)的默认值为 for ,这意味着“除非轮询方法返回 null(可能是因为轮询方法中没有更多消息)否则不间断地执行轮询任务”,然后休眠一秒钟。max-messages-per-pollPollingConsumerservice-activatorfilterrouter-1max-messages-per-pollQueueChannel

但是,在 中,它有点不同。 的默认值为 ,除非显式将其设置为负值(例如 )。 这确保了轮询器可以对生命周期事件(如启动和停止)做出反应,并防止轮询器在自定义方法的实现可能永远不会返回 null 并且恰好是不可中断的情况下在无限循环中旋转。SourcePollingChannelAdaptermax-messages-per-poll1-1MessageSource

但是,如果确定方法可以返回 null,并且需要轮询每个轮询的可用源数,则应显式设置为负值,如以下示例所示:max-messages-per-poll

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从版本 5.5 开始,for 值具有特殊含义 - 完全跳过呼叫,这可能被视为暂停此入站通道适配器,直到稍后更改为非零值,例如通过控制总线。0max-messages-per-pollMessageSource.receive()maxMessagesPerPoll

从版本 6.2 开始,可以以 ISO 8601 持续时间格式配置 and,例如 等。 此外,基础的 an 也以与 和 类似的值格式公开。fixed-delayfixed-ratePT10SP1Dinitial-intervalPeriodicTriggerfixed-delayfixed-rate

有关详细信息,请参阅全局默认轮询器

配置出站通道适配器

元素(a 表示 Java 配置)还可以将 a 连接到任何 POJO 使用者方法,该方法应使用发送到该通道的消息的有效负载进行调用。 以下示例演示如何定义出站通道适配器:outbound-channel-adapter@ServiceActivatorMessageChannel

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果要改编的通道是 ,则必须提供轮询器子元素(对 的子注释),如以下示例所示:PollableChannel@Poller@ServiceActivator

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果 POJO 使用者实现可以在其他定义中重用,则应使用属性。 但是,如果消费者实现仅由 的单个定义引用,则可以将其定义为内部 Bean,如以下示例所示:ref<outbound-channel-adapter><outbound-channel-adapter>

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一配置中同时使用属性和内部处理程序定义,因为这会创建不明确的条件。 此类配置会导致引发异常。ref<outbound-channel-adapter>

可以在没有引用的情况下创建任何通道适配器,在这种情况下,它会隐式创建 的实例。 创建的通道的名称与 or 元素的属性匹配。 因此,如果未提供,则为必需。channelDirectChannelid<inbound-channel-adapter><outbound-channel-adapter>channelid

不允许在同一配置中同时使用属性和内部处理程序定义,因为这会创建不明确的条件。 此类配置会导致引发异常。ref<outbound-channel-adapter>

通道适配器表达式和脚本

与许多其他 Spring Integration 组件一样,和 也为 SpEL 表达式评估提供支持。 要使用 SpEL,请在 'expression' 属性中提供表达式字符串,而不是提供用于在 Bean 上调用方法的 'ref' 和 'method' 属性。 当计算表达式时,它遵循与方法调用相同的约定,其中:每当计算结果为非 null 值时,an 的表达式就会生成一条消息,而 an 的表达式必须等同于 void 返回方法调用。<inbound-channel-adapter><outbound-channel-adapter><inbound-channel-adapter><outbound-channel-adapter>

从 Spring Integration 3.0 开始,还可以配置 SpEL(甚至 )子元素,以用于需要比简单的“expression”属性更复杂的情况。 如果使用该属性将脚本作为 提供,则还可以设置 ,这允许定期刷新资源。 如果要在每次轮询时检查脚本,则需要将此设置与轮询器的触发器协调,如以下示例所示:<int:inbound-channel-adapter/><expression/><script/>Resourcelocationrefresh-check-delay

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另请参阅 when using the sub-element 上的属性。 有关表达式的更多信息,请参见 Spring 表达式语言 (SpEL)。 有关脚本,请参阅 Groovy 支持脚本支持cacheSecondsReloadableResourceBundleExpressionSource<expression/>

() 是一个端点,它通过定期触发轮询某个底层来启动消息流。 由于在轮询时没有消息对象,因此表达式和脚本无法访问根,因此没有大多数其他消息传递 SpEL 表达式中可用的有效负载或标头属性。 该脚本可以生成并返回一个包含标头和有效负载的完整对象,也可以仅生成一个有效负载,该负载由框架添加到具有基本标头的消息中。<int:inbound-channel-adapter/>SourcePollingChannelAdapterMessageSourceMessageMessage
() 是一个端点,它通过定期触发轮询某个底层来启动消息流。 由于在轮询时没有消息对象,因此表达式和脚本无法访问根,因此没有大多数其他消息传递 SpEL 表达式中可用的有效负载或标头属性。 该脚本可以生成并返回一个包含标头和有效负载的完整对象,也可以仅生成一个有效负载,该负载由框架添加到具有基本标头的消息中。<int:inbound-channel-adapter/>SourcePollingChannelAdapterMessageSourceMessageMessage