此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
有时,我们需要挂起消息流线程,直到发生其他异步事件。 例如,假设一个 HTTP 请求将消息发布到 RabbitMQ。 我们可能希望在 RabbitMQ 代理发出消息已收到的确认之前不回复用户。
在版本 4.2 中,Spring Integration 为此引入了该组件。
基础是 .
此类还实现 ,其中传递给方法的消息在方法中释放相应的线程(如果存在)。<barrier/>
MessageHandler
BarrierMessageHandler
MessageTriggerAction
trigger()
handleRequestMessage()
挂起的线程和触发线程通过对消息调用 a 进行关联。
当消息发送到 时,线程将暂停最多几毫秒,等待相应的触发消息。
默认关联策略使用标头。
当触发消息以相同的相关性到达时,线程将被释放。
发送到发布后的消息是使用 .
默认情况下,消息是两个有效负载中的一个,并且标头通过使用 .CorrelationStrategy
input-channel
requestTimeout
IntegrationMessageHeaderAccessor.CORRELATION_ID
output-channel
MessageGroupProcessor
Collection<?>
DefaultAggregatingMessageGroupProcessor
如果首先调用该方法(或在主线程超时之后),则该方法将被挂起,直到等待挂起消息到达。
如果不想挂起触发线程,请考虑改为切换,以便挂起其线程。trigger() triggerTimeout TaskExecutor |
如果首先调用该方法(或在主线程超时之后),则该方法将被挂起,直到等待挂起消息到达。
如果不想挂起触发线程,请考虑改为切换,以便挂起其线程。trigger() triggerTimeout TaskExecutor |
在之前的 5.4 版中,请求和触发消息只有一个选项,但在某些用例中,最好为这些操作设置不同的超时时间。
因此,引入了选项。timeout requestTimeout triggerTimeout |
在之前的 5.4 版中,请求和触发消息只有一个选项,但在某些用例中,最好为这些操作设置不同的超时时间。
因此,引入了选项。timeout requestTimeout triggerTimeout |
该属性确定如果挂起的线程在触发消息到达之前超时时要执行的操作。
默认情况下,它是 ,这意味着终结点返回 ,流结束,线程返回给调用方。
当 时,a 被抛出。requires-reply
false
null
true
ReplyRequiredException
您可以通过编程方式调用该方法(使用名称 — 获取 Bean 引用,其中 be 是屏障端点的 Bean 名称)。
或者,您可以配置一个以触发发布。trigger()
barrier.handler
barrier
<outbound-channel-adapter/>
只能以相同的相关性挂起一个线程。 相同的相关性可以多次使用,但只能同时使用一次。 如果第二个线程以相同的相关性到达,则会引发异常。 |
只能以相同的相关性挂起一个线程。 相同的相关性可以多次使用,但只能同时使用一次。 如果第二个线程以相同的相关性到达,则会引发异常。 |
下面的示例演示如何使用自定义标头进行关联:
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个线程先到达消息,向其发送消息的线程或向其发送消息的线程最多等待 10 秒,直到另一条消息到达。
释放消息时,将向通道发送一条消息,该消息结合了调用自定义 Bean 的结果,名为 。
如果主线程超时并且触发器稍后到达,则可以配置将延迟触发器发送到的丢弃通道。in
release
out
MessageGroupProcessor
myOutputProcessor
有关此组件的示例,请参阅屏障示例应用程序。