严格的消息排序

本节介绍入站和出站消息的消息排序。spring-doc.cadn.net.cn

入境

如果需要对入站消息进行严格排序,则必须配置入站侦听器容器的prefetchCountproperty 设置为1. 这是因为,如果消息失败并重新传递,它将在现有的预取消息之后到达。 从 Spring AMQP 版本 2.0 开始,prefetchCount默认为250以提高性能。 严格的订购要求以性能下降为代价。spring-doc.cadn.net.cn

出境

请考虑以下集成流程:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们发送消息A,BC到网关。 虽然消息A,B,C均按顺序发送,则不保证。 这是因为模板为每个 send作从缓存中“借用”一个通道,并且不能保证每条消息都使用相同的通道。 一种解决方案是在拆分器之前启动事务,但事务在 RabbitMQ 中成本高昂,并且性能会降低数百倍。spring-doc.cadn.net.cn

为了以更有效的方式解决这个问题,从版本 5.1 开始, Spring 集成提供了BoundRabbitChannelAdvice这是一个HandleMessageAdvice. 请参阅处理消息建议。 在拆分器之前应用时,它确保所有下游作都在同一通道上执行,并且可以选择等待收到所有已发送消息的发布者确认(如果连接工厂配置为确认)。 以下示例演示如何使用BoundRabbitChannelAdvice:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,相同的RabbitTemplate(它实现RabbitOperations) 用于通知和出站适配器。 该通知在模板的invoke方法,以便所有作都在同一通道上运行。 如果提供了可选的超时,则当流程完成时,建议会调用waitForConfirmsOrDie方法,如果在指定时间内未收到确认,则会引发异常。spring-doc.cadn.net.cn

下游流中不得有线程放手 (QueueChannel,ExecutorChannel等)。