此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
Spring Integration 提供了不同的消息通道实现。 以下各节简要介绍。
PublishSubscribeChannel
该实现将发送给它的任何内容广播到其所有订阅的处理程序。
这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常由单个处理程序处理)。
请注意,这仅用于发送。
由于它在调用其方法时直接向订阅者广播,因此使用者无法轮询消息(它不实现,因此没有方法)。
相反,任何订阅服务器本身都必须是 ,并且依次调用订阅服务器的方法。PublishSubscribeChannel
Message
PublishSubscribeChannel
send(Message)
PollableChannel
receive()
MessageHandler
handleMessage(Message)
在版本 3.0 之前,在没有订阅者返回的 上调用该方法。
当与 a 一起使用时,a 被抛出。
从版本 3.0 开始,行为已更改,因此,如果至少存在最少的订阅者(并成功处理消息),则始终将 a 视为成功。
可以通过设置默认为 的属性来修改此行为。send
PublishSubscribeChannel
false
MessagingTemplate
MessageDeliveryException
send
minSubscribers
0
如果使用 ,则仅使用正确数量的订阅者进行此确定,因为消息的实际处理是异步执行的。TaskExecutor |
如果使用 ,则仅使用正确数量的订阅者进行此确定,因为消息的实际处理是异步执行的。TaskExecutor |
QueueChannel
该实现包装一个队列。
与 不同,具有点对点语义。
换言之,即使通道有多个使用者,也只有其中一个使用者应接收发送到该通道的任何信息。
它提供了一个默认的无参数构造函数(提供本质上无限制的容量 )以及一个接受队列容量的构造函数,如以下列表所示:QueueChannel
PublishSubscribeChannel
QueueChannel
Message
Integer.MAX_VALUE
public QueueChannel(int capacity)
未达到其容量限制的通道将消息存储在其内部队列中,即使没有接收方准备好处理消息,该方法也会立即返回。
如果队列已达到容量,则发件人将阻止,直到队列中有可用空间。
或者,如果使用具有附加超时参数的 send 方法,则队列将阻塞,直到任一房间可用或超时期限结束(以先发生者为准)。
同样,如果队列中有可用消息,则呼叫会立即返回,但是,如果队列为空,则接收呼叫可能会阻塞,直到消息可用或超时(如果提供)过去。
无论哪种情况,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。
但请注意,对 和 的 和 没有参数的版本的调用会无限期地阻塞。send(Message<?>)
receive()
send()
receive()
timeout
PriorityChannel
虽然强制执行先进先出 (FIFO) 排序,但 是一种替代实现,允许根据优先级在通道内对消息进行排序。
默认情况下,优先级由每条消息中的标头决定。
但是,对于自定义优先级确定逻辑,可以向构造函数提供类型的比较器。QueueChannel
PriorityChannel
priority
Comparator<Message<?>>
PriorityChannel
RendezvousChannel
这支持“直接切换”方案,其中发送方会阻止,直到另一方调用通道的方法。
另一方会阻止,直到发件人发送消息。
在内部,此实现与 非常相似,只不过它使用了 ( 的零容量实现)。
这在发送方和接收方在不同线程中操作的情况非常有效,但异步删除队列中的消息是不合适的。
换言之,使用 ,发送方知道某个接收方已接受消息,而使用 ,消息将存储到内部队列中,并且可能永远不会收到。RendezvousChannel
receive()
QueueChannel
SynchronousQueue
BlockingQueue
RendezvousChannel
QueueChannel
请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。
当需要持久性时,可以在 'queue' 元素中提供 'message-store' 属性来引用持久性实现,也可以将本地通道替换为持久性代理(例如 JMS 支持的通道或通道适配器)支持的通道。
后一个选项允许您利用任何 JMS 提供程序的消息持久性实现,如 JMS 支持中所述。
但是,如果不需要在队列中进行缓冲,则最简单的方法是依赖下一节中讨论的 。MessageStore DirectChannel |
对于实现请求-答复操作也很有用。
发送方可以创建一个临时的匿名实例,然后在构建 .
发送后,发送方可以立即调用(可选地提供超时值)以便在等待回复时进行阻塞。
这与Spring Integration的许多请求-回复组件内部使用的实现非常相似。RendezvousChannel
RendezvousChannel
Message
Message
receive
Message
请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。
当需要持久性时,可以在 'queue' 元素中提供 'message-store' 属性来引用持久性实现,也可以将本地通道替换为持久性代理(例如 JMS 支持的通道或通道适配器)支持的通道。
后一个选项允许您利用任何 JMS 提供程序的消息持久性实现,如 JMS 支持中所述。
但是,如果不需要在队列中进行缓冲,则最简单的方法是依赖下一节中讨论的 。MessageStore DirectChannel |
DirectChannel
具有点对点语义,但在其他方面比前面描述的任何基于队列的通道实现都更相似。
它实现接口而不是接口,因此它直接将消息调度给订阅者。
但是,作为点对点通道,它与将每个通道发送到单个订阅者的不同之处在于。DirectChannel
PublishSubscribeChannel
SubscribableChannel
PollableChannel
PublishSubscribeChannel
Message
MessageHandler
除了是最简单的点对点通道选项外,其最重要的功能之一是它使单个线程能够在通道的“两侧”执行操作。
例如,如果处理程序订阅了 ,则向该通道发送 将直接在发送方的线程中触发对该处理程序方法的调用,然后方法调用才能返回。DirectChannel
Message
handleMessage(Message)
send()
提供具有此行为的通道实现的关键动机是支持必须跨越通道的事务,同时仍受益于通道提供的抽象和松散耦合。
如果调用是在事务范围内调用的,则处理程序的调用结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面起作用。send()
由于 是最简单的选项,并且不会增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。
一般的想法是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,并修改这些通道以基于队列。
同样,如果频道需要广播消息,则不应是 .
稍后,我们将展示如何配置这些通道中的每一个。DirectChannel PollableChannels DirectChannel PublishSubscribeChannel |
内部委托给消息调度器以调用其订阅的消息处理程序,并且该调度程序可以具有由 或 属性公开的负载平衡策略(互斥)。
消息调度程序使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时消息在消息处理程序之间如何分发消息。
为方便起见,该属性公开指向预先存在的 的实现的值枚举。
A(轮流处理程序之间的负载均衡)和(对于想要显式禁用负载均衡的情况)是唯一可用的值。
将来的版本中可能会添加其他策略实现。
但是,从 3.0 版开始,您可以提供自己的实现并使用属性注入它,该属性应指向实现 的 Bean,如以下示例所示:DirectChannel
load-balancer
load-balancer-ref
load-balancer
LoadBalancingStrategy
round-robin
none
LoadBalancingStrategy
load-balancer-ref
LoadBalancingStrategy
A 是仅支持无法取消订阅的单个订阅者。
这对于不涉及其他用户且不需要信道拦截器的高吞吐量性能用例非常有用。FixedSubscriberChannel
SubscribableChannel
MessageHandler
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,和 属性是互斥的。load-balancer
load-balancer-ref
负载平衡还与布尔属性结合使用。
如果该值为 true(默认值),则当前面的处理程序引发异常时,调度程序会回退到任何后续处理程序(根据需要)。
顺序由处理程序本身定义的可选顺序值确定,如果不存在此类值,则由处理程序订阅的顺序确定。failover
failover
如果某种情况要求调度程序始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序顺序回退,则不应提供负载均衡策略。
换言之,即使未启用负载平衡,调度程序仍支持布尔属性。
但是,如果没有负载均衡,处理程序的调用总是从第一个处理程序开始,根据它们的顺序。
例如,当有明确的一级、二级、三级等定义时,这种方法非常有效。
使用命名空间支持时,任何终结点上的属性都会确定顺序。failover
order
请记住,仅当通道具有多个订阅的消息处理程序时,才进行负载平衡和应用。
使用命名空间支持时,这意味着多个终结点共享属性中定义的相同通道引用。failover input-channel |
从版本 5.2 开始,如果为 true,则将记录当前处理程序的故障以及失败的消息,如果分别在 或 Configured 下。failover
debug
info
由于 是最简单的选项,并且不会增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。
一般的想法是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,并修改这些通道以基于队列。
同样,如果频道需要广播消息,则不应是 .
稍后,我们将展示如何配置这些通道中的每一个。DirectChannel PollableChannels DirectChannel PublishSubscribeChannel |
请记住,仅当通道具有多个订阅的消息处理程序时,才进行负载平衡和应用。
使用命名空间支持时,这意味着多个终结点共享属性中定义的相同通道引用。failover input-channel |
ExecutorChannel
这是一个点对点通道,支持与(负载平衡策略和布尔属性)相同的调度程序配置。
这两种调度通道类型之间的主要区别在于,委托给 的实例来执行调度。
这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。
因此,它不支持跨越发送方和接收方处理程序的事务。ExecutorChannel
DirectChannel
failover
ExecutorChannel
TaskExecutor
发件人有时可以阻止。
例如,当使用带有限制客户端的拒绝策略(如 )时,发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。
由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。TaskExecutor ThreadPoolExecutor.CallerRunsPolicy |
发件人有时可以阻止。
例如,当使用带有限制客户端的拒绝策略(如 )时,发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。
由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。TaskExecutor ThreadPoolExecutor.CallerRunsPolicy |
PartitionedChannel
从版本 6.1 开始,将提供实现。
这是点对点调度逻辑的扩展,表示点对点调度逻辑,其中实际消耗在特定线程上处理,由从发送到此通道的消息评估的分区键确定。
此通道与上面提到的类似,但不同之处在于具有相同分区键的消息始终在同一线程中处理,从而保持顺序。
它不需要外部 ,但可以使用自定义(例如)进行配置。
此工厂用于将单线程执行程序填充到每个分区的委托中。
缺省情况下,消息头用作分区键。
此通道可以配置为简单的 Bean:PartitionedChannel
AbstractExecutorChannel
ExecutorChannel
TaskExecutor
ThreadFactory
Thread.ofVirtual().name("partition-", 0).factory()
MessageDispatcher
IntegrationMessageHeaderAccessor.CORRELATION_ID
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
通道将有分区 - 专用线程;将使用标头来确定将在哪个分区中处理消息。
有关更多信息,请参阅 Javadocs 类。3
partitionKey
PartitionedChannel
FluxMessageChannel
这是一种实现,用于将消息发送到内部,供下游的反应式订阅者按需使用。
此通道实现既不是 ,也不是 ,因此只能使用实例从此通道使用,以尊重反应流的背压性质。
另一方面,它通过其合约实现 a,允许从响应式源发布者接收事件,将响应式流桥接到集成流中。
为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。FluxMessageChannel
org.reactivestreams.Publisher
"sinking"
reactor.core.publisher.Flux
SubscribableChannel
PollableChannel
org.reactivestreams.Subscriber
FluxMessageChannel
ReactiveStreamsSubscribableChannel
subscribeTo(Publisher<Message<?>>)
有关与反应式流交互的更多信息,请参阅反应式流支持。
作用域通道
Spring Integration 1.0 提供了一个实现,但从 2.0 开始已被删除。
现在,处理相同要求的更通用方法是向通道添加属性。
属性的值可以是上下文中可用的范围的名称。
例如,在 Web 环境中,某些作用域可用,并且任何自定义作用域实现都可以注册到上下文中。
以下示例演示了应用于通道的线程本地作用域,包括作用域本身的注册:ThreadLocalChannel
scope
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上一示例中定义的通道也委托给内部队列,但通道绑定到当前线程,因此队列的内容也类似地绑定。
这样,发送到通道的线程以后可以接收相同的消息,但其他线程无法访问它们。
虽然很少需要线程范围的通道,但在使用实例强制执行单个操作线程但任何应答消息都应发送到“终端”通道的情况下,它们可能很有用。
如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。DirectChannel
现在,由于任何通道都可以作用域,因此除了 thread-Local 之外,您还可以定义自己的作用域。