此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
本节介绍入站和出站消息的消息排序。
入境
如果需要对入站消息进行严格排序,则必须将入站侦听器容器的属性配置为 。
这是因为,如果消息失败并重新传递,则它会在现有的预取消息之后到达。
从 Spring AMQP 版本 2.0 开始,默认为提高性能。
严格的订购要求是以性能下降为代价的。prefetchCount
1
prefetchCount
250
出境
请考虑以下集成流程:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们向网关发送消息。
虽然消息 , , 很可能是按顺序发送的,但不能保证。
这是因为模板从缓存中“借用”每个发送操作的通道,并且无法保证每条消息都使用相同的通道。
一种解决方案是在拆分器之前启动事务,但 RabbitMQ 中的事务成本很高,并且会使性能降低数百倍。A
B
C
A
B
C
为了以更有效的方式解决这个问题,从 5.1 版开始,Spring Integration 提供了 .
请参阅处理消息建议。
在拆分器之前应用时,它确保所有下游操作都在同一通道上执行,并且可以选择等到发布者确认所有已发送消息(如果连接工厂配置为确认)。
以下示例演示如何使用:BoundRabbitChannelAdvice
HandleMessageAdvice
BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,建议和出站适配器中使用了相同的(实现 )。
该建议在模板的方法中运行下游流,以便所有操作在同一通道上运行。
如果提供了可选超时,则当流完成时,建议将调用该方法,如果在指定时间内未收到确认,则该方法将引发异常。RabbitTemplate
RabbitOperations
invoke
waitForConfirmsOrDie
下游流(、 等)中不得有线程放手。QueueChannel ExecutorChannel |
下游流(、 等)中不得有线程放手。QueueChannel ExecutorChannel |