核心消息

消息传送通道

消息通道

虽然Message在封装数据方面起着至关重要的作用,它是MessageChannel将消息生成者与消息使用者分离。spring-doc.cadn.net.cn

MessageChannel 接口

Spring 集成的顶级MessageChannel接口定义如下:spring-doc.cadn.net.cn

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}

发送消息时,返回值为true如果消息发送成功。 如果 send 调用超时或中断,则返回false.spring-doc.cadn.net.cn

PollableChannel

由于消息通道可以也可能不缓冲消息(如 Spring 集成概述中所述),因此两个子接口定义了缓冲(可轮询)和非缓冲(可订阅)通道行为。 下面的清单显示了PollableChannel接口:spring-doc.cadn.net.cn

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

与 send 方法一样,当接收消息时,如果出现超时或中断,则返回值为 null。spring-doc.cadn.net.cn

SubscribableChannel

SubscribableChannelbase 接口由直接向其订阅的MessageHandler实例。 因此,它们不提供用于轮询的 receive 方法。 相反,它们定义了管理这些订阅者的方法。 下面的清单显示了SubscribableChannel接口:spring-doc.cadn.net.cn

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

消息通道实现

Spring 集成提供了不同的消息通道实现。 以下各节将简要介绍每个选项。spring-doc.cadn.net.cn

PublishSubscribeChannel

PublishSubscribeChannelimplementation 广播任何Message发送到它的所有订阅处理程序。 这最常用于发送事件消息,其主要角色是通知(与文档消息相反,文档消息通常由单个处理程序处理)。 请注意,PublishSubscribeChannel仅用于发送。 由于它会直接向订阅者广播,因此当send(Message)方法,使用者无法轮询消息(它不会实现PollableChannel,因此没有receive()方法)。 相反,任何订阅者本身都必须是MessageHandler和订阅者的handleMessage(Message)方法。spring-doc.cadn.net.cn

在 3.0 版本之前,调用send方法在PublishSubscribeChannel没有返回订阅者false. 当与MessagingTemplate一个MessageDeliveryException被抛出。 从版本 3.0 开始,行为发生了变化,使得send如果至少存在最小订阅者(并成功处理消息),则始终被视为成功。 可以通过设置minSubscribers属性,默认为0.spring-doc.cadn.net.cn

如果您使用TaskExecutor,则仅使用正确数量的订户进行此确定,因为消息的实际处理是异步执行的。
QueueChannel

QueueChannelimplementation 包装一个队列。 与PublishSubscribeChannelQueueChannel具有点对点语义。 换句话说,即使通道有多个消费者,也只有一个消费者应该接收任何Message发送到该通道。 它提供了一个默认的无参数构造函数(提供基本上不受限制的容量Integer.MAX_VALUE)以及接受队列容量的构造函数,如下面的清单所示:spring-doc.cadn.net.cn

public QueueChannel(int capacity)

未达到其容量限制的通道将消息存储在其内部队列中,而send(Message<?>)method 会立即返回,即使没有接收方准备好处理该消息。 如果队列已达到容量上限,则发送方将阻止,直到队列中有 room 可用。 或者,如果您使用具有附加 timeout 参数的 send 方法,则队列将阻止,直到任一房间可用或超时期限已过(以先发生者为准)。 同样,receive()如果队列中有消息可用,则 call 会立即返回,但是,如果队列为空,则 receive 调用可能会阻止,直到消息可用或超时(如果提供)过去。 在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。 但请注意,对send()receive()没有timeoutparameter 块。spring-doc.cadn.net.cn

PriorityChannel

QueueChannel强制执行先进先出 (FIFO) 排序,则PriorityChannel是一种替代实现,它允许根据优先级在通道内对消息进行排序。 默认情况下,优先级由priority标头。 但是,对于自定义优先级确定逻辑,类型为Comparator<Message<?>>可以提供给PriorityChannel构造 函数。spring-doc.cadn.net.cn

RendezvousChannel

RendezvousChannel启用“直接切换”场景,其中发送者阻塞,直到另一方调用通道的receive()方法。 另一方会阻止,直到发送方发送消息。 在内部,这个实现与QueueChannel,不同之处在于它使用SynchronousQueue(零容量实现BlockingQueue). 这在发送方和接收方在不同的线程中作的情况下效果很好,但异步将消息放入队列中是不合适的。 换句话说,使用RendezvousChannel,发送方知道某个接收方已经接受了该消息,而使用QueueChannel,则消息将存储到内部队列中,并且可能永远不会收到。spring-doc.cadn.net.cn

请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。 当需要持久性时,您可以在 'queue' 元素中提供 'message-store' 属性来引用持久性MessageStore实现,或者你可以将本地通道替换为由持久代理支持的通道,例如 JMS 支持的通道或通道适配器。 后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS 支持中所述。 但是,当不需要在队列中缓冲时,最简单的方法是依赖DirectChannel,将在下一节中讨论。

RendezvousChannel对于实现请求-答复作也很有用。 发送者可以创建一个临时的匿名实例RendezvousChannel,然后在构建Message. 发送后Message,发送者可以立即调用receive(可选提供超时值)以便在等待回复时阻止Message. 这与 Spring 集成的许多请求-回复组件内部使用的实现非常相似。spring-doc.cadn.net.cn

DirectChannel

DirectChannel具有点对点语义,但在其他方面更类似于PublishSubscribeChannel比前面描述的任何基于队列的 Channel 实现都要多。 它实现了SubscribableChannel接口而不是PollableChannel接口,因此它将消息直接分派给订阅者。 但是,作为点对点通道,它与PublishSubscribeChannel因为它将每个Message到单个订阅MessageHandler.spring-doc.cadn.net.cn

除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的 “两侧” 执行作。 例如,如果处理程序订阅了DirectChannel,然后发送Message触发对该处理程序的handleMessage(Message)方法,在send()method invocation 可以返回。spring-doc.cadn.net.cn

提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍然受益于通道提供的抽象和松散耦合。 如果send()调用时,处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面发挥作用。spring-doc.cadn.net.cn

由于DirectChannel是最简单的选项,并且不会增加调度和管理 Poller 线程所需的任何额外开销,它是 Spring Integration 中的默认通道类型。 一般的思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,然后将这些通道修改为基于队列的PollableChannels. 同样,如果频道需要广播消息,则它不应该是DirectChannel而是一个PublishSubscribeChannel. 稍后,我们将展示如何配置这些通道中的每一个。

DirectChannel在内部委托给消息调度程序来调用其订阅的消息处理程序,并且该调度程序可以具有由load-balancerload-balancer-refattributes (互斥)。 消息调度器使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时,如何在消息处理程序之间分发消息。 为方便起见,load-balancerattribute 公开一个值枚举,这些值指向预先存在的LoadBalancingStrategy. 一个round-robin(在轮换的处理程序之间进行负载均衡)和none(对于想要显式禁用负载均衡的情况)是唯一可用的值。 将来的版本中可能会添加其他策略实现。 但是,从版本 3.0 开始,您可以提供自己的LoadBalancingStrategy并使用load-balancer-ref属性,该属性应指向实现LoadBalancingStrategy,如下例所示:spring-doc.cadn.net.cn

一个FixedSubscriberChannel是一个SubscribableChannel仅支持单个MessageHandler无法取消订阅的订阅者。 这对于不涉及其他订阅者且不需要通道拦截器的高吞吐量性能使用案例非常有用。spring-doc.cadn.net.cn

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref属性是互斥的。spring-doc.cadn.net.cn

负载均衡还与布尔值failover财产。 如果failovervalue 为 true(默认值),则当前面的处理程序引发异常时,调度程序将回退到任何后续处理程序(根据需要)。 订单由处理程序本身定义的可选 order 值确定,如果不存在此类值,则由处理程序订阅的顺序确定。spring-doc.cadn.net.cn

如果某种情况要求 Dispatcher 始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。 换句话说,调度程序仍然支持failoverboolean 属性,即使未启用负载均衡也是如此。 但是,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。 例如,当有 primary、secondary、tritiary 等的明确定义时,此方法效果很好。 使用命名空间支持时,order属性确定顺序。spring-doc.cadn.net.cn

请记住,负载均衡和failover仅当通道具有多个 subscribed message 处理程序时应用。 当使用命名空间支持时,这意味着多个端点共享input-channel属性。

从版本 5.2 开始,当failover为 true,则当前处理程序的失败以及失败的消息将记录在debuginfo如果分别配置。spring-doc.cadn.net.cn

ExecutorChannel

ExecutorChannel是点对点渠道,支持与DirectChannel(负载均衡策略和failoverboolean 属性)。 这两种调度通道类型之间的主要区别在于ExecutorChanneldelegates 传递给TaskExecutor执行调度。 这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。 因此,它不支持跨 sender 和 receiving handler 的事务。spring-doc.cadn.net.cn

发件人有时可能会阻止。 例如,当使用TaskExecutor使用限制客户端的拒绝策略(例如ThreadPoolExecutor.CallerRunsPolicy),则发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。 由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。
FluxMessageChannel

FluxMessageChannel是一个org.reactivestreams.Publisherimplementation for"sinking"将消息发送到内部reactor.core.publisher.Flux供下游反应式订阅者按需使用。 此 channel 实现既不是SubscribableChannel,也不是PollableChannel,所以只有org.reactivestreams.Subscriber实例可用于从此通道使用,以遵守反应式流的背压性质。 另一方面,FluxMessageChannel实现一个ReactiveStreamsSubscribableChannel及其subscribeTo(Publisher<Message<?>>)Contract 允许从反应式源发布者接收事件,将反应式流桥接到集成流中。 为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。spring-doc.cadn.net.cn

有关与 Reactive Streams 交互的更多信息,请参阅 Reactive Streams Supportspring-doc.cadn.net.cn

作用域通道

Spring 集成 1.0 提供了一个ThreadLocalChannel实现,但从 2.0 开始已被删除。 现在,处理相同需求的更通用方法是添加scope属性分配给频道。 该属性的值可以是上下文中可用的范围的名称。 例如,在 Web 环境中,某些范围可用,并且任何自定义范围实现都可以注册到上下文中。 下面的示例展示了一个应用于通道的线程本地作用域,包括作用域本身的注册:spring-doc.cadn.net.cn

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

上一个示例中定义的通道也在内部委托给队列,但该通道绑定到当前线程,因此队列的内容也类似地绑定。 这样,发送到通道的线程稍后可以接收这些相同的消息,但其他线程将无法访问它们。 虽然很少需要线程范围的通道,但它们在以下情况下可能很有用DirectChannel实例被用来强制执行单个线程作,但任何回复消息都应该发送到 “terminal” 通道。 如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。spring-doc.cadn.net.cn

现在,由于任何通道都可以被限定范围,因此除了 thread-Local 之外,您还可以定义自己的范围。spring-doc.cadn.net.cn

通道拦截器

消息传递体系结构的一个优点是能够提供常见行为,并以非侵入性方式捕获有关通过系统传递的消息的有意义信息。 由于Message实例发送到和接收自MessageChannel实例中,这些通道提供了拦截 Send 和 Receive作的机会。 这ChannelInterceptorstrategy 接口,如下面的清单所示,为这些作中的每一个都提供了方法:spring-doc.cadn.net.cn

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

实现接口后,向 channel 注册拦截器只需进行以下调用即可:spring-doc.cadn.net.cn

channel.addInterceptor(someChannelInterceptor);

返回Message实例可用于转换Message或者可以返回 'null' 以防止进一步处理(当然,任何方法都可以抛出RuntimeException). 此外,preReceivemethod 可以返回false以防止接收作继续进行。spring-doc.cadn.net.cn

请记住,receive()呼叫仅与PollableChannels. 事实上,SubscribableChannelinterface 甚至没有定义receive()方法。 这样做的原因是,当Message发送到SubscribableChannel,它会直接发送给零个或多个订阅者,具体取决于通道类型(例如 一个PublishSubscribeChannel发送到其所有订阅者)。 因此,preReceive(…​),postReceive(…​)afterReceiveCompletion(…​)仅当拦截器应用于PollableChannel.

Spring 集成还提供了 Wire Tap 模式的实现。 它是一个简单的拦截器,它发送Message到另一个通道,而无需更改现有流。 它对于调试和监控非常有用。 Wire Tap 中显示了一个示例。spring-doc.cadn.net.cn

由于很少需要实现所有拦截器方法,因此该接口提供了无作方法(返回voidmethod 没有代码,则Message-returning 方法返回Messageas-is,并且booleanmethod 返回true).spring-doc.cadn.net.cn

拦截器方法的调用 Sequences 取决于通道的类型。 如前所述,基于队列的通道是唯一receive()方法首先被拦截。 此外,发送和接收拦截之间的关系取决于单独的发送方和接收方线程的计时。 例如,如果接收方在等待消息时已被阻止,则顺序可能如下所示:preSend,preReceive,postReceive,postSend. 但是,如果接收方在发送方在通道上放置消息并已返回后进行轮询,则顺序将如下所示:preSend,postSend(有时已经过去了)、preReceive,postReceive. 在这种情况下,经过的时间取决于许多因素,因此通常是不可预测的(事实上,接收可能永远不会发生)。 队列的类型也起着一定的作用(例如,rendezvous 与 priority)。 简而言之,除了preSend之前postSendpreReceive之前postReceive.

从 Spring Framework 4.1 和 Spring Integration 4.1 开始,ChannelInterceptor提供了新的方法:afterSendCompletion()afterReceiveCompletion(). 它们在send()' and 'receive()调用,而不管引发的任何异常如何,这允许进行资源清理。 请注意,通道在ChannelInterceptor列表与初始preSend()preReceive()调用。spring-doc.cadn.net.cn

从版本 5.1 开始,全局通道拦截器现在适用于动态注册的通道 - 例如通过使用beanFactory.initializeBean()IntegrationFlowContext使用 Java DSL 时。 以前,在刷新应用程序上下文后创建 bean 时,不会应用拦截器。spring-doc.cadn.net.cn

此外,从版本 5.1 开始,ChannelInterceptor.postReceive()未收到消息时不再调用;不再需要检查null Message<?>. 以前,该方法被调用。 如果你有一个依赖于先前行为的拦截器,请实现afterReceiveCompleted()相反,因为该方法被调用,无论是否收到消息。spring-doc.cadn.net.cn

从版本 5.2 开始,ChannelInterceptorAware已弃用,取而代之的是InterceptableChannel从 Spring Messaging 模块,它现在扩展了该模块以实现向后兼容性。

MessagingTemplate

当引入端点及其各种配置选项时, Spring 集成为消息传递组件提供了一个基础,该组件支持从消息传递系统非侵入性地调用应用程序代码。 但是,有时需要从应用程序代码中调用消息传送系统。 为了方便实现此类用例, Spring 集成提供了一个MessagingTemplate支持跨消息通道的各种作,包括请求和回复场景。 例如,可以发送请求并等待回复,如下所示:spring-doc.cadn.net.cn

MessagingTemplate template = new MessagingTemplate();

Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));

在前面的示例中,模板将在内部创建一个临时匿名通道。 还可以在模板上设置 'sendTimeout' 和 'receiveTimeout' 属性,并且还支持其他交换类型。 下面的清单显示了此类方法的签名:spring-doc.cadn.net.cn

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}
一种侵入性较小的方法,允许您使用 payload 或 header 值调用简单的接口,而不是Message实例在输入GatewayProxyFactoryBean.

配置消息通道

要创建消息通道实例,您可以使用<channel/>元素或DirectChannelinstance 进行 Java 配置,如下所示:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
XML 格式
<int:channel id="exampleChannel"/>

当您使用<channel/>元素中,它会创建一个DirectChannel实例(一个SubscribableChannel).spring-doc.cadn.net.cn

要创建 publish-subscribe 通道,请使用<publish-subscribe-channel/>元素(PublishSubscribeChannel在 Java 中),如下所示:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
XML 格式
<int:publish-subscribe-channel id="exampleChannel"/>

您也可以提供各种<queue/>子元素来创建任何可轮询的通道类型(如 消息通道实现中所述)。 以下部分显示了每种通道类型的示例。spring-doc.cadn.net.cn

DirectChannel配置

如前所述,DirectChannel是默认类型。 下面的清单显示了定义谁:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
XML 格式
<int:channel id="directChannel"/>

默认通道具有循环负载均衡器,并且还启用了故障转移(请参阅DirectChannel了解更多详情)。 要禁用其中一项或两项,请添加<dispatcher/>子元素 (一个LoadBalancingStrategy构造函数的DirectChannel) 并配置属性,如下所示:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
XML 格式
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>
数据类型 Channel 配置

有时,使用者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。 首先想到的可能是使用消息过滤器。 但是,消息筛选器所能做的只是筛选出不符合使用者要求的消息。 另一种方法是使用基于内容的路由器,并将具有不合规数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。 这将有效,但完成相同任务的更简单方法是应用 Datatype Channel 模式。 您可以为每个特定的负载数据类型使用单独的数据类型通道。spring-doc.cadn.net.cn

要创建仅接受包含特定有效负载类型的消息的数据类型通道,请在通道元素的datatype属性,如下例所示:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
XML 格式
<int:channel id="numberChannel" datatype="java.lang.Number"/>

请注意,对于可分配给通道数据类型的任何类型的类型,类型检查都会通过。 换句话说,numberChannel在前面的示例中,将接受 payload 为java.lang.Integerjava.lang.Double. 可以将多个类型作为逗号分隔的列表提供,如下例所示:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
XML 格式
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前面示例中的 'numberChannel' 只接受数据类型为java.lang.Number. 但是,如果消息的有效负载不是 required 类型,会发生什么情况呢? 这取决于您是否定义了一个名为integrationConversionService那是 Spring 的 Conversion Service 的一个实例。 如果不是,则Exception将立即被抛出。 但是,如果您定义了integrationConversionServicebean,则使用它来尝试将消息的有效负载转换为可接受的类型。spring-doc.cadn.net.cn

您甚至可以注册自定义转换器。 例如,假设您发送一条消息,其中包含Stringpayload 添加到我们上面配置的 'numberChannel' 中。 您可以按如下方式处理该消息:spring-doc.cadn.net.cn

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常,这将是一个完全合法的作。 但是,由于我们使用 Datatype Channel,因此此类作的结果将生成类似于以下内容的异常:spring-doc.cadn.net.cn

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

发生异常是因为我们要求 payload 类型为Number,但我们发送了String. 所以我们需要一些东西来转换String更改为Number. 为此,我们可以实现类似于以下示例的转换器:spring-doc.cadn.net.cn

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后,我们可以将其注册为 Integration Conversion Service 的转换器,如下例所示:spring-doc.cadn.net.cn

Java
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
XML 格式
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者在StringToIntegerConverter类(当它标有@Componentannotation 进行自动扫描。spring-doc.cadn.net.cn

当 'converter' 元素被解析时,它会创建integrationConversionServicebean(如果尚未定义)。 有了该转换器,send作现在将成功,因为 DataType 通道使用该转换器将Stringpayload 添加到Integer.spring-doc.cadn.net.cn

有关负载类型转换的更多信息,请参阅负载类型转换spring-doc.cadn.net.cn

从版本 4.0 开始,integrationConversionServiceDefaultDatatypeChannelMessageConverter,它在应用程序上下文中查找 conversion 服务。 要使用其他转换技术,您可以指定message-converter属性。 这必须是对MessageConverter实现。 只有fromMessage方法。 它为转换器提供了对消息 Headers 的访问(如果转换可能需要来自 Headers 的信息,例如content-type). 该方法只能返回转换后的有效负载或完整的Message对象。 如果是后者,则转换器必须小心地从入站消息中复制所有 Headers。spring-doc.cadn.net.cn

或者,您可以声明<bean/>的类型MessageConverter的 ID 为datatypeChannelMessageConverter,并且该转换器被所有通道使用datatype.spring-doc.cadn.net.cn

QueueChannel配置

要创建QueueChannel,请使用<queue/>sub-元素。 您可以按如下方式指定通道的容量:spring-doc.cadn.net.cn

Java
@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
XML 格式
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
如果您没有为此的 'capacity' 属性提供值<queue/>sub-元素,则生成的队列是无界的。 为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。
持续QueueChannel配置

由于QueueChannel提供缓冲消息的功能,但默认情况下仅在内存中缓冲,它还引入了在系统故障时消息可能会丢失的可能性。 为了降低这种风险,一个QueueChannel可能由MessageGroupStorestrategy 界面。 有关的更多详细信息MessageGroupStoreMessageStore,请参阅 Message Storespring-doc.cadn.net.cn

capacity属性时不允许使用message-store属性。

QueueChannel接收一个Message,它会将消息添加到邮件存储中。 当MessageQueueChannel,则会从邮件存储中删除该 ID。spring-doc.cadn.net.cn

默认情况下,QueueChannel将其消息存储在内存中队列中,这可能会导致前面提到的消息丢失情况。 但是, Spring 集成提供了持久存储,例如JdbcChannelMessageStore.spring-doc.cadn.net.cn

您可以为任何QueueChannel通过添加message-store属性,如下例所示:spring-doc.cadn.net.cn

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有关Java/Kotlin配置选项,请参阅以下示例。spring-doc.cadn.net.cn

Spring 集成 JDBC 模块还为许多流行的数据库提供了模式数据定义语言(DDL)。 这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包(spring-integration-jdbc).spring-doc.cadn.net.cn

一个重要的特性是,对于任何事务性持久存储(例如JdbcChannelMessageStore),只要 Poller 配置了事务,则只有在事务成功完成时,才能永久删除从 store 中删除的消息。 否则,事务将回滚,并且Message没有丢失。

随着越来越多的与 “NoSQL” 数据存储相关的 Spring 项目开始为这些存储提供底层支持,消息存储的许多其他实现都可用。 您还可以提供自己的MessageGroupStore界面。spring-doc.cadn.net.cn

从 4.0 版本开始,我们建议QueueChannel实例配置为使用ChannelMessageStore,如果可能。 与一般邮件存储相比,这些存储通常针对此用途进行了优化。 如果ChannelMessageStore是一个ChannelPriorityMessageStore,则消息将按优先级顺序在 FIFO 中接收。 优先级的概念由 message store 实现确定。 例如,以下示例显示了 MongoDB 通道消息存储的 Java 配置:spring-doc.cadn.net.cn

Java
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
Java DSL
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
Kotlin DSL
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
注意MessageGroupQueue类。 那是一个BlockingQueueimplementation 以使用MessageGroupStore操作。

自定义QueueChannelenvironment 由ref属性的<int:queue>sub-元素或其特定构造函数。 此属性提供对任何java.util.Queue实现。 例如,分布式 HazelcastIQueue可以按如下方式进行配置:spring-doc.cadn.net.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel配置

要创建PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。 使用此元素时,您还可以指定task-executor用于发布消息(如果未指定,则在 sender 的线程中发布),如下所示:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
XML 格式
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在PublishSubscribeChannel中,您可以将渠道上的“apply-sequence”属性设置为true. 这样做表明通道应将sequence-sizesequence-number消息标头以及相关 ID。 例如,如果有 5 个订阅者,则sequence-size将设置为5,并且消息将具有sequence-number标头值范围从15.spring-doc.cadn.net.cn

Executor,您还可以配置ErrorHandler. 默认情况下,PublishSubscribeChannel使用MessagePublishingErrorHandler实现将错误发送到MessageChannelerrorChannel标头或全局errorChannel实例。 如果Executor未配置,则ErrorHandler被忽略,异常将直接抛出到调用方的线程中。spring-doc.cadn.net.cn

如果您提供ResequencerAggregator下游PublishSubscribeChannel中,您可以将渠道上的“apply-sequence”属性设置为true. 这样做表示通道应在传递消息之前设置 sequence-size 和 sequence-number 消息头以及相关 ID。 例如,如果有 5 个订阅者,则 sequence-size 将设置为5,并且消息将具有序列号报头值,范围从15.spring-doc.cadn.net.cn

以下示例演示如何设置apply-sequenceheader 设置为true:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
XML 格式
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
apply-sequencevalue 为false默认情况下,以便 Publish-Subscribe 通道可以将完全相同的消息实例发送到多个出站通道。 由于 Spring Integration 强制执行有效负载和标头引用的不可变性,因此当该标志设置为true,通道会创建新的Message具有相同有效负载引用但标头值不同的实例。

从版本 5.4.3 开始,PublishSubscribeChannel也可以使用requireSubscribers选项的BroadcastingDispatcher表示该频道在没有订阅者时不会静默忽略消息。 一个MessageDispatchingException替换为Dispatcher has no subscribers消息 当没有订阅者时抛出,并且此选项设置为true.spring-doc.cadn.net.cn

ExecutorChannel

要创建ExecutorChannel,请添加<dispatcher>子元素替换为task-executor属性。 该属性的值可以引用任何TaskExecutor在上下文中。 例如,这样做可以启用线程池的配置,以便将消息分派给订阅的处理程序。 如前所述,这样做会破坏发送方和接收方之间的单线程执行上下文,以便任何活动的事务上下文都不会被处理程序的调用共享(即,处理程序可能会抛出一个Exception,但sendinvocation has already returned successfully) 的调用。 以下示例演示如何使用dispatcher元素并在task-executor属性:spring-doc.cadn.net.cn

Java
@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
XML 格式
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancerfailover选项在 <dispatcher/> 子元素上也可用,如前面的 中所述DirectChannel配置. 相同的默认值适用。 因此,通道具有启用故障转移的循环负载平衡策略,除非为其中一个或两个属性提供了显式配置,如下例所示:spring-doc.cadn.net.cn

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>
PriorityChannel配置

要创建PriorityChannel,请使用<priority-queue/>sub-元素,如下例所示:spring-doc.cadn.net.cn

Java
@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
XML 格式
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会查询priority标头。 但是,您可以改为提供自定义Comparator参考。 另请注意,PriorityChannel(与其他类型一样)支持datatype属性。 与QueueChannel,它还支持capacity属性。 以下示例演示了所有这些:spring-doc.cadn.net.cn

Java
@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
XML 格式
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

从 4.0 版本开始,priority-channel子元素支持message-store选项 (comparatorcapacity在这种情况下是不允许的)。 邮件存储必须是PriorityCapableChannelMessageStore. 的PriorityCapableChannelMessageStore目前为Redis,JDBCMongoDB. 看QueueChannel配置Message Store 了解更多信息。 您可以在 Backing Message Channels 中找到示例配置。spring-doc.cadn.net.cn

RendezvousChannel配置

一个RendezvousChannel在队列子元素为<rendezvous-queue>. 它不提供前面描述的配置选项的任何其他配置选项,并且其队列不接受任何容量值,因为它是零容量直接切换队列。 以下示例说明如何声明RendezvousChannel:spring-doc.cadn.net.cn

Java
@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
XML 格式
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>
作用域通道配置

任何通道都可以配置scope属性,如下例所示:spring-doc.cadn.net.cn

<int:channel id="threadLocalChannel" scope="thread"/>
通道拦截器配置

消息通道也可能具有拦截器,如 通道拦截器中所述。 这<interceptors/>sub-元素可以添加到<channel/>(或更具体的元素类型)。 您可以提供ref属性来引用任何 Spring 管理的对象,该对象实现了ChannelInterceptor接口,如下例所示:spring-doc.cadn.net.cn

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道之间重用的常见行为。spring-doc.cadn.net.cn

全局通道拦截器配置

Channel interceptors 提供了一种简洁明了的方式来为每个单独的 Channel 应用横切行为。 如果应该在多个 channel 上应用相同的行为,则为每个 channel 配置相同的拦截器集将不是最有效的方法。 为了避免重复配置,同时使拦截器能够应用于多个通道, Spring 集成提供了全局拦截器。 请考虑以下一对示例:spring-doc.cadn.net.cn

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

<channel-interceptor/>元素允许您定义一个全局拦截器,该拦截器将应用于与pattern属性。 在前面的情况下,全局拦截器应用于 'thing1' 通道和所有其他以 'thing2' 或 'input' 开头的通道,但不应用于以 'thing3' 开头的通道(从 5.0 版本开始)。spring-doc.cadn.net.cn

将此语法添加到模式中会导致一个可能的(尽管可能不太可能)问题。 如果你有一个名为!thing1,并且您包含一个!thing1在你的通道拦截器的patternpatterns,则它不再匹配。 该模式现在匹配所有未命名的 beanthing1. 在这种情况下,您可以转义!在模式中。 模式\\!thing1匹配名为!thing1.

order 属性允许您管理当给定通道上有多个拦截器时,此拦截器的注入位置。 例如,通道 'inputChannel' 可以在本地配置单独的拦截器(见下文),如下例所示:spring-doc.cadn.net.cn

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是“相对于本地配置的其他拦截器或通过其他全局拦截器定义,全局拦截器是如何注入的? 当前的实现提供了一种简单的机制来定义拦截器执行的顺序。 中order属性确保在任何现有拦截器之后注入拦截器,而负数确保拦截器在现有拦截器之前注入。 这意味着,在前面的示例中,全局拦截器是在order大于0) 本地配置的 'wire-tap' 侦听器。 如果存在另一个全局拦截器,并且pattern,则其顺序将通过比较两个拦截器的值来确定order属性。 要在现有拦截器之前注入全局拦截器,请对order属性。spring-doc.cadn.net.cn

请注意,orderpattern属性是可选的。 的默认值order将为 0,对于pattern,默认值为 '*' (以匹配所有通道)。
丝锥

如前所述, Spring 集成提供了一个简单的 wire tap 拦截器。 您可以在<interceptors/>元素。 这样做对于调试特别有用,并且可以与 Spring 集成的日志记录通道适配器结合使用,如下所示:spring-doc.cadn.net.cn

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter'还接受'expression'属性,以便您可以根据'payload'和'headers'变量评估 SPEL 表达式。 或者,要记录完整消息toString()result,请提供true对于 'log-full-message' 属性。 默认情况下,它是false,以便仅记录有效负载。 将其设置为true启用除有效负载之外的所有标头的日志记录。 'expression' 选项提供了最大的灵活性(例如expression="payload.user.name").

关于 wire tap 和其他类似组件(消息发布配置)的一个常见误解是,它们在本质上是自动异步的。 默认情况下,作为组件的 wire tap 不会异步调用。 相反, Spring 集成专注于配置异步行为的单一统一方法:消息通道。 使消息流的某些部分同步或异步的是在该流中配置的 Message Channel 的类型。 这是消息通道抽象的主要好处之一。 从框架成立之初,我们就一直强调消息通道作为框架的一等公民的需求和价值。 它不仅仅是 EIP 模式的内部隐式实现。 它作为可配置组件完全公开给最终用户。 因此,Wire Tap 组件仅负责执行以下任务:spring-doc.cadn.net.cn

它本质上是桥接模式的变体,但它封装在通道定义中(因此更容易在不中断流的情况下启用和禁用)。 此外,与桥接不同,它基本上是分叉另一个消息流。 该流是同步的还是异步的?答案取决于 'channelB' 的消息通道类型。 我们有以下选项:direct channel、pollable channel 和 executor channel。 最后两个打破了线程边界,使通过此类通道的通信异步,因为将消息从该通道分派到其订阅的处理程序发生在与用于将消息发送到该通道的线程不同的线程上。 这就是使您的 wire-tap 流同步或异步的原因。 它与框架中的其他组件(比如消息发布者)一致,并且通过让您无需提前担心(除了编写线程安全代码)特定代码段应该作为同步还是异步实现,从而增加了一定程度的一致性和简单性。 两个代码段(比如组件 A 和组件 B)在消息通道上的实际连接使它们的协作同步或异步。 你甚至可能希望将来从 synchronous 更改为 asynchronous ,而 message channel 让你无需接触代码即可快速完成。spring-doc.cadn.net.cn

关于窃听的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快传递消息。 因此,使用 asynchronous channel 选项作为 wire tap 的出站通道是很常见的。 但是,默认情况下不强制实施异步行为。 如果我们这样做,有许多用例会中断,包括您可能不想打破事务边界。 也许您使用 wire tap 模式进行审计,并且您确实希望在原始事务中发送审计消息。 例如,您可以将 wire tap 连接到 JMS 出站通道适配器。 这样,您可以获得两全其美的效果:1) JMS 消息的发送可以在事务中进行,而 2) 它仍然是一个 “即发即弃”作,从而防止主消息流中出现任何明显的延迟。spring-doc.cadn.net.cn

从版本 4.0 开始,当拦截器(例如WireTap) 引用频道。 您需要将此类 channel 从当前拦截器拦截的 channels 中排除。 这可以通过适当的模式或编程方式完成。 如果您有一个自定义的ChannelInterceptor引用channel,请考虑实施VetoCapableInterceptor. 这样,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。 您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。 这WireTap使用这两种技术。

从版本 4.3 开始,WireTap具有其他构造函数,这些构造函数采用channelName而不是MessageChannel实例。 这对于 Java 配置以及使用通道自动创建逻辑时非常方便。 目标MessageChannelbean 从提供的channelName稍后,在第一次与 拦截 器。spring-doc.cadn.net.cn

通道解析需要BeanFactory,因此 wire tap 实例必须是 Spring 管理的 bean。

这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如下例所示:spring-doc.cadn.net.cn

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
条件接线器

Wire Tap 可以通过使用selectorselector-expression属性。 这selector参考资料 aMessageSelectorbean,它可以在运行时确定消息是否应转到 tap 通道。 同样,selector-expression是执行相同目的的布尔 SpEL 表达式:如果表达式的计算结果为true,消息将发送到 Tap 渠道。spring-doc.cadn.net.cn

全局 Wire Tap 配置

可以将全局 wire tap 配置为 Global Channel Interceptor Configuration 的特殊情况。 为此,请配置顶级wire-tap元素。 现在,除了正常的wire-tap命名空间支持、patternorderattributes 受支持,其工作方式与它们对channel-interceptor. 以下示例说明如何配置全局 Wire Tap:spring-doc.cadn.net.cn

Java
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
XML 格式
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局 Wire Tap 提供了一种在外部配置单通道 Wire Tap 的便捷方法,而无需修改现有通道配置。 为此,请将pattern属性添加到目标频道名称中。 例如,您可以使用此技术来配置测试用例以验证通道上的消息。

特殊频道

默认情况下,在 application context 中定义了两个特殊通道:errorChannelnullChannel. 'nullChannel'(NullChannel) 的作用类似于/dev/null,记录发送到DEBUG级别并立即返回。 特殊处理适用于org.reactivestreams.Publisherpayload of a transmitted message:它立即在此通道中订阅,以启动反应式流处理,尽管数据被丢弃。 反应式流处理引发的错误(参见Subscriber.onError(Throwable)) 记录在warn级别以进行可能的调查。 如果需要对此类错误执行任何作,则ReactiveRequestHandlerAdvice替换为Mono.doOnError()自定义可以应用于生成Mono回复这个nullChannel. 任何时候,当你遇到你不关心的回复的通道解析错误时,你可以设置受影响组件的output-channel属性设置为 'nullChannel' (名称 'nullChannel' 在应用程序上下文中保留)。spring-doc.cadn.net.cn

'errorChannel' 在内部用于发送错误消息,并且可以被自定义配置覆盖。 错误处理中对此进行了更详细的讨论。spring-doc.cadn.net.cn

有关消息通道和拦截器的更多信息,另请参阅 Java DSL 一章中的消息通道spring-doc.cadn.net.cn

轮询器

本节描述了 Spring Integration 中 polling 的工作原理。spring-doc.cadn.net.cn

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:spring-doc.cadn.net.cn

实际实现取决于这些终端节点连接到的通道类型。 一个通道适配器,该适配器连接到实现org.springframework.messaging.SubscribableChannelinterface 会生成一个EventDrivenConsumer. 另一方面,连接到实现org.springframework.messaging.PollableChannel接口(例如QueueChannel) 会生成一个PollingConsumer.spring-doc.cadn.net.cn

轮询使用者允许 Spring 集成组件主动轮询消息,而不是以事件驱动的方式处理消息。spring-doc.cadn.net.cn

在许多消息传递方案中,它们代表一个关键的横切关注点。 在 Spring Integration 中,轮询使用者基于同名模式,Gregor Hohpe 和 Bobby Woolf 在 Enterprise Integration Patterns 一书中对此进行了描述。 您可以在该书的网站上找到该模式的描述。spring-doc.cadn.net.cn

Pollable 消息源

Spring 集成提供了轮询消费者模式的第二种变体。 使用入站通道适配器时,这些适配器通常由SourcePollingChannelAdapter. 例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了 Poller 以定期检索消息。 因此,当组件使用 Poller 进行配置时,生成的实例是以下类型之一:spring-doc.cadn.net.cn

这意味着轮询器用于入站和出站消息传递方案。 以下是使用 Poller 的一些用例:spring-doc.cadn.net.cn

AOP 通知类可以应用于轮询器,在advice-chain,例如用于启动事务的事务通知。 从版本 4.1 开始,PollSkipAdvice。 轮询器使用触发器来确定下一次轮询的时间。 这PollSkipAdvice可用于抑制(跳过)轮询,可能是因为存在一些下游条件会阻止处理消息。 要使用此建议,您必须为其提供PollSkipStrategy. 从版本 4.2.5 开始,SimplePollSkipStrategy。 要使用它,您可以将实例作为 bean 添加到应用程序上下文中,将其注入到PollSkipAdvice,并将其添加到 poller's advice 链中。 要跳过轮询,请调用skipPolls(). 要恢复轮询,请调用reset(). 版本 4.2 在此领域增加了更多灵活性。 参见 Message Sources 的条件轮询器

本章仅对轮询使用者以及它们如何适应消息通道(请参见消息通道)和通道适配器(请参见通道适配器)的概念进行简要概述。 有关消息收发终端节点的一般信息,特别是轮询使用者的更多信息,请参阅消息终端节点spring-doc.cadn.net.cn

延迟确认轮询消息源

从版本 5.0.1 开始,某些模块提供MessageSource支持将确认推迟到 downstream flow 完成(或将消息移交给另一个线程)的实现。 这目前仅限于AmqpMessageSourceKafkaMessageSource.spring-doc.cadn.net.cn

使用这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK标头(请参阅MessageHeaderAccessor应用程序接口) 将添加到消息中。 当与可轮询消息源一起使用时,标头的值是AcknowledgmentCallback,如下例所示:spring-doc.cadn.net.cn

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如,KafkaMessageSource) 支持REJECT地位。 它的处理方式与ACCEPT.spring-doc.cadn.net.cn

应用程序可以随时确认消息,如下例所示:spring-doc.cadn.net.cn

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果MessageSource连接到SourcePollingChannelAdapter,当 Poller 线程在下游流完成后返回到适配器时,适配器会检查确认是否已被确认,如果没有,则将其状态设置为ACCEPTit(或REJECT如果流引发异常)。 状态值在AcknowledgmentCallback.Status列举.spring-doc.cadn.net.cn

Spring 集成提供了MessageSourcePollingTemplate要对MessageSource. 这也负责设置ACCEPTREJECTAcknowledgmentCallbackMessageHandlercallback 返回(或引发异常)。 以下示例演示如何使用MessageSourcePollingTemplate:spring-doc.cadn.net.cn

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况下 (SourcePollingChannelAdapterMessageSourcePollingTemplate),您可以通过调用noAutoAck()在回调上。 如果您将消息交给另一个线程并希望稍后确认,则可以执行此作。 并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。spring-doc.cadn.net.cn

消息源的条件轮询器

本节介绍如何使用条件 Poller。spring-doc.cadn.net.cn

背景

Advice对象, 在advice-chain在 Poller 上,通知整个轮询任务(消息检索和处理)。 这些 “around advice” 方法无法访问 poll 的任何上下文 — 只能访问 poll 本身。 如前所述,这对于诸如使任务事务性或由于某些外部条件而跳过轮询等要求来说很好。 如果我们希望根据receive轮询的一部分,或者我们是否想根据条件调整轮询器?对于这些实例, Spring 集成提供了“智能”轮询。spring-doc.cadn.net.cn

“智能”轮询

版本 5.3 引入了ReceiveMessageAdvice接口。 任何Advice对象advice-chain的 API 版本仅应用于receive()操作-MessageSource.receive()PollableChannel.receive(timeout). 因此,它们只能应用于SourcePollingChannelAdapterPollingConsumer. 此类实现以下方法:spring-doc.cadn.net.cn

  • beforeReceive(Object source)此方法在Object.receive()方法。 它允许您检查和重新配置源。 返回false取消此轮询(类似于PollSkipAdvice前面提到过)。spring-doc.cadn.net.cn

  • Message<?> afterReceive(Message<?> result, Object source)该方法在receive()方法。 同样,您可以重新配置源或采取任何作(可能取决于结果,可以是null如果源没有创建消息)。 您甚至可以返回不同的消息spring-doc.cadn.net.cn

线程安全

如果Advice更改源,则不应使用TaskExecutor. 如果Advice更改源,则此类更改不是线程安全的,可能会导致意外结果,尤其是对于高频轮询器。 如果需要并发处理轮询结果,请考虑使用下游ExecutorChannel而不是向 Poller 添加 executor。spring-doc.cadn.net.cn

Advice Chain 订购

您应该了解在初始化期间如何处理通知链。Advice未实现ReceiveMessageAdvice应用于整个轮询过程,并且都按顺序首先调用,然后依次在任何ReceiveMessageAdvice. 然后ReceiveMessageAdvice对象在源周围按顺序调用receive()方法。 例如,如果您已收到Advice对象a, b, c, d哪里bdReceiveMessageAdvice,则按以下顺序应用对象:a, c, b, d. 此外,如果源已经是ProxyReceiveMessageAdvice在任何现有的Advice对象。 如果您想更改订单,您必须自己连接代理。spring-doc.cadn.net.cn

SimpleActiveIdleReceiveMessageAdvice

此建议是ReceiveMessageAdvice. 当与DynamicPeriodicTrigger,它会调整轮询频率,具体取决于上一次轮询是否导致消息。 轮询器还必须具有对相同DynamicPeriodicTrigger.spring-doc.cadn.net.cn

重要: 异步切换
SimpleActiveIdleReceiveMessageAdvice根据receive()结果。 这仅在 poller 线程上调用通知时有效。 如果 Poller 具有task-executor. 要在轮询结果后使用异步作时使用此建议,请稍后执行异步切换,可能通过使用ExecutorChannel.
CompoundTriggerAdvice

此建议允许根据 poll 是否返回消息来选择两个触发器之一。 考虑一个使用CronTrigger.CronTrigger实例是不可变的,因此一旦构建就无法更改。 考虑这样一个使用案例:我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到任何消息,则每分钟轮询一次,并在检索到消息时恢复为使用 cron 表达式。spring-doc.cadn.net.cn

建议(和 poller)使用CompoundTrigger为此目的。 触发器的primarytrigger 可以是CronTrigger. 当通知检测到未收到任何消息时,它会将辅助触发器添加到CompoundTrigger. 当CompoundTrigger实例的nextExecutionTime方法,它会委托给辅助触发器(如果存在)。 否则,它将委托给主触发器。spring-doc.cadn.net.cn

轮询器还必须具有对相同CompoundTrigger.spring-doc.cadn.net.cn

以下示例显示了回退到每分钟的每小时 cron 表达式的配置:spring-doc.cadn.net.cn

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要: 异步切换
CompoundTriggerAdvice根据receive()结果。 这仅在 poller 线程上调用通知时有效。 如果 Poller 具有task-executor. 要在轮询结果后使用异步作时使用此建议,请稍后执行异步切换,可能通过使用ExecutorChannel.
仅限 MessageSource 的建议

某些建议可能仅适用于MessageSource.receive()而且它们对PollableChannel. 为此,一个MessageSourceMutator接口(ReceiveMessageAdvice) 仍然存在。 有关更多信息,请参见入站通道适配器:轮询多个服务器和目录spring-doc.cadn.net.cn

Channel Adapter

通道适配器是一个消息端点,它允许将单个发送方或接收方连接到消息通道。 Spring 集成提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南的后续章节将讨论每个适配器。 但是,本章重点介绍简单但灵活的方法调用通道适配器支持。 有入站和出站适配器,每个适配器都可以使用 core 命名空间中提供的 XML 元素进行配置。 这些提供了一种扩展 Spring Integration 的简单方法,只要你有一个可以作为源或目标调用的方法。spring-doc.cadn.net.cn

配置入站通道适配器

inbound-channel-adapter元素 (一个SourcePollingChannelAdapter)可以在 Spring Management 的对象上调用任何方法,并将非 null 返回值发送到MessageChannel将方法的输出转换为Message. 激活适配器的订阅后,轮询器会尝试从源接收消息。 轮询器是使用TaskScheduler根据提供的配置。 要为单个通道适配器配置轮询间隔或 cron 表达式,你可以提供一个 'poller' 元素,其中包含一个调度属性,例如 'fixed-rate' 或 'cron'。 以下示例定义了两个inbound-channel-adapter实例:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
Java
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
Kotlin DSL
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
XML 格式
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果未提供 Poller,则必须在上下文中注册单个默认 Poller。 有关更多详细信息,请参阅 Endpoint Namespace Support
重要:轮询器配置

所有inbound-channel-adapter类型由SourcePollingChannelAdapter,这意味着它们包含一个轮询MessageSource(要调用一个自定义方法,该方法生成的值将变为Messagepayload) 的 Payload 中。 以下示例显示了两个 Poller 的配置:spring-doc.cadn.net.cn

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一个配置中,轮询任务每次轮询调用一次,并且在每个任务(轮询)期间,根据max-messages-per-poll属性值。 在第二种配置中,轮询任务每次轮询调用 10 次,或者直到它返回 'null',因此每次轮询可能会生成 10 条消息,而每次轮询的间隔为 1 秒。 但是,如果配置类似于以下示例,会发生什么情况:spring-doc.cadn.net.cn

<int:poller fixed-rate="1000"/>

请注意,没有max-messages-per-poll指定。 正如我们稍后介绍的,PollingConsumer(例如,service-activator,filter,router等)的默认值为-1max-messages-per-poll,这意味着“不停地执行轮询任务,除非轮询方法返回 null(可能是因为QueueChannel)“,然后睡一秒钟。spring-doc.cadn.net.cn

但是,在SourcePollingChannelAdapter,这有点不同。 的默认值max-messages-per-poll1,除非您将其显式设置为负值(例如-1). 这确保了 Poller 可以对生命周期事件(例如启动和停止)做出反应,并防止它可能在无限循环中旋转(如果MessageSource有可能永远不会返回 null,并且恰好是不可中断的。spring-doc.cadn.net.cn

但是,如果您确定您的方法可以返回 null,并且您需要在每次轮询中轮询尽可能多的可用源,则应显式设置max-messages-per-poll设置为负值,如下例所示:spring-doc.cadn.net.cn

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从版本 5.5 开始,一个0的值max-messages-per-poll具有特殊含义 - 跳过MessageSource.receive()调用,这可能被视为此入站通道适配器的暂停,直到maxMessagesPerPoll稍后更改为非零值,例如通过 Control Bus。spring-doc.cadn.net.cn

另请参阅 Global Default Poller 了解更多信息。spring-doc.cadn.net.cn

配置出站通道适配器

outbound-channel-adapter元素 (一个@ServiceActivator对于 Java 配置)也可以连接一个MessageChannel到任何 POJO 消费者方法,该方法应该使用发送到该通道的消息的有效负载来调用。 以下示例说明如何定义出站通道适配器:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
Java
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
Kotlin DSL
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
XML 格式
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果要适配的通道是PollableChannel,您必须提供一个 Poller 子元素(@Pollersub-annotation 的@ServiceActivator),如下例所示:spring-doc.cadn.net.cn

Java
public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
XML 格式
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

您应该使用ref属性,如果 POJO 消费者实现可以在其他<outbound-channel-adapter>定义。 但是,如果使用者实现仅由<outbound-channel-adapter>,您可以将其定义为内部 Bean,如下例所示:spring-doc.cadn.net.cn

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
同时使用refattribute 和内部处理程序定义位于同一<outbound-channel-adapter>不允许配置,因为它会产生不明确的条件。 此类配置会导致引发异常。

任何通道适配器都可以在没有channel引用,在这种情况下,它会隐式地创建一个DirectChannel. 创建的频道名称与id属性的<inbound-channel-adapter><outbound-channel-adapter>元素。 因此,如果channel未提供、id是必需的。spring-doc.cadn.net.cn

通道适配器表达式和脚本

与许多其他 Spring 集成组件一样,<inbound-channel-adapter><outbound-channel-adapter>还提供对 SPEL 表达式评估的支持。 要使用 SPEL,请在'expression'属性中提供表达式字符串,而不是提供用于在 Bean 上调用方法的'ref'和'method'属性。 当一个表达式被求值时,它遵循与 method-invocation 相同的契约,其中:一个<inbound-channel-adapter>每当评估结果为非 null 值时都会生成一条消息,而<outbound-channel-adapter>必须等效于返回 void 的方法调用。spring-doc.cadn.net.cn

从 Spring Integration 3.0 开始,<int:inbound-channel-adapter/>也可以使用 SPEL 进行配置<expression/>(甚至使用<script/>) 子元素,当需要比简单的 'expression' 属性更复杂时。 如果您将脚本作为Resource通过使用location属性,您还可以设置refresh-check-delay,这允许定期刷新资源。 如果希望在每次轮询时检查脚本,则需要将此设置与 Poller 的触发器协调,如下例所示:spring-doc.cadn.net.cn

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另请参阅cacheSeconds属性ReloadableResourceBundleExpressionSource使用<expression/>sub-元素。 有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。 有关脚本,请参阅 Groovy 支持脚本支持spring-doc.cadn.net.cn

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一个端点,它通过定期触发以轮询某些底层MessageSource. 由于在轮询时没有消息对象,因此表达式和脚本无权访问根Message,因此在大多数其他消息传递 SPEL 表达式中没有可用的 payload 或 headers 属性。 该脚本可以生成并返回一个完整的Message对象,或者仅是有效负载,框架将其添加到具有基本标头的消息中。

消息桥

消息桥是连接两个消息通道或通道适配器的相对简单的端点。 例如,您可能希望连接PollableChannel更改为SubscribableChannel这样订阅终端节点就不必担心任何轮询配置。 相反,消息桥提供轮询配置。spring-doc.cadn.net.cn

通过在两个通道之间提供中间轮询器,您可以使用消息传递桥来限制入站消息。 poller 的触发器确定消息到达第二个通道的速率,而 poller 的maxMessagesPerPoll属性对吞吐量实施限制。spring-doc.cadn.net.cn

消息桥的另一个有效用途是连接两个不同的系统。 在这种情况下, Spring 集成的作用仅限于在这些系统之间建立连接并在必要时管理 Poller。 在两个系统之间至少有一个转换器,以便在它们的格式之间进行转换,这可能更常见。 在这种情况下,通道可以作为转换器端点的 'input-channel' 和 'output-channel' 提供。 如果不需要数据格式转换,则消息桥可能确实足够了。spring-doc.cadn.net.cn

使用 XML 配置 Bridge

您可以使用<bridge>元素用于在两个消息通道或通道适配器之间创建消息桥。 为此,请提供input-channeloutput-channel属性,如下例所示:spring-doc.cadn.net.cn

<int:bridge input-channel="input" output-channel="output"/>

如上所述,消息传递桥的一个常见用例是将PollableChannel更改为SubscribableChannel. 在执行此角色时,消息传送桥还可以用作限制器:spring-doc.cadn.net.cn

<int:bridge input-channel="pollable" output-channel="subscribable">
     <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
 </int:bridge>

您可以使用类似的机制来连接通道适配器。 以下示例显示了stdinstdout来自 Spring Integration 的streamNamespace:spring-doc.cadn.net.cn

<int-stream:stdin-channel-adapter id="stdin"/>

 <int-stream:stdout-channel-adapter id="stdout"/>

 <int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>

类似的配置适用于其他(可能更有用的)Channel Adapter 桥接,例如文件到 JMS 或邮件到文件。 接下来的章节将介绍各种通道适配器。spring-doc.cadn.net.cn

如果未在桥接上定义 'output-channel',则使用入站消息提供的回复通道(如果可用)。 如果 output 和 reply channel 都不可用,则会引发异常。

使用 Java 配置配置 Bridge

以下示例演示如何使用@BridgeFrom注解:spring-doc.cadn.net.cn

@Bean
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
    return new DirectChannel();
}

以下示例演示如何使用@BridgeTo注解:spring-doc.cadn.net.cn

@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
public SubscribableChannel direct() {
    return new DirectChannel();
}

或者,您可以使用BridgeHandler,如下例所示:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "polled",
        poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
    BridgeHandler bridge = new BridgeHandler();
    bridge.setOutputChannelName("direct");
    return bridge;
}

使用 Java DSL 配置 Bridge

您可以使用 Java 域特定语言 (DSL) 来配置网桥,如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow bridgeFlow() {
    return IntegrationFlow.from("polled")
            .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
            .channel("direct")
            .get();
}