线程屏障
有时,我们需要暂停消息流线程,直到发生其他异步事件。 例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。 在 RabbitMQ 代理发出收到消息的确认之前,我们可能希望不回复用户。
在版本 4.2 中, Spring 集成引入了<barrier/>
组件。
底层MessageHandler
是BarrierMessageHandler
.
此类还实现MessageTriggerAction
,其中将消息传递给trigger()
方法在handleRequestMessage()
方法(如果存在)。
挂起的线程和触发线程通过调用CorrelationStrategy
在消息上。
当消息发送到input-channel
,则线程将暂停长达requestTimeout
毫秒,等待相应的触发器消息。
默认关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID
页眉。
当触发器消息以相同的关联到达时,线程将被释放。
发送到output-channel
使用MessageGroupProcessor
.
默认情况下,消息是Collection<?>
,标头使用DefaultAggregatingMessageGroupProcessor
.
如果trigger() method 被首先调用(或在主线程超时后),它将被暂停,直到triggerTimeout 等待 suspending 消息到达。
如果您不想暂停触发器线程,请考虑将TaskExecutor 而是使其线程被挂起。 |
在 5.4 版本之前,只有一个timeout 选项,但在某些用例中,最好为这些作设置不同的超时。
因此requestTimeout 和triggerTimeout 引入了选项。 |
这requires-reply
属性确定如果挂起的线程在触发器消息到达之前超时时要采取的作。
默认情况下,它是false
,这意味着终端节点返回null
,流结束,线程返回给调用方。
什么时候true
一个ReplyRequiredException
被抛出。
您可以调用trigger()
方法(使用名称barrier.handler
— 其中barrier
是 barrier 端点的 bean 名称)。
或者,您也可以配置<outbound-channel-adapter/>
以触发发布。
只能暂停一个具有相同关联的线程。 相同的关联可以多次使用,但只能同时使用一次。 如果第二个线程以相同的关联到达,则会引发异常。 |
以下示例演示如何使用自定义标头进行关联:
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个线程的消息先到达,将消息发送到in
或向release
最多等待 10 秒,直到另一条消息到达。
当消息被释放时,out
channel 会发送一条消息,该消息将调用自定义MessageGroupProcessor
Bean 中名为myOutputProcessor
.
如果主线程超时并且触发器稍后到达,则可以配置将延迟触发器发送到的 discard 通道。
如果请求消息未及时到达,触发器消息也会被丢弃。
有关此组件的示例,请参阅 barrier 示例应用程序。