使用 Batching 进行@RabbitListener

当收到一批消息时,取消批处理通常由容器执行,并且一次使用一条消息调用侦听器。 从版本 2.2 开始,你可以配置监听器容器工厂和监听器在一次调用中接收整个批处理,只需将工厂的batchListener属性,并将方法有效负载参数设为ListCollection:spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

设置batchListener属性设置为 true 会自动关闭deBatchingEnabledcontainer 属性(除非consumerBatchEnabledtrue- 见下文)。实际上,取消批处理从容器移动到侦听器适配器,并且适配器会创建传递给侦听器的列表。spring-doc.cadn.net.cn

启用批处理的工厂不能与多方法侦听器一起使用。spring-doc.cadn.net.cn

同样从 2.2 版本开始。当一次接收一条批量消息时,最后一条消息包含一个布尔标头,该标头设置为true. 可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)boolean last' 参数添加到侦听器方法中。 标头从MessageProperties.isLastInBatch(). 另外AmqpHeaders.BATCH_SIZE填充每个消息片段中批次的大小。spring-doc.cadn.net.cn

此外,还有一个新属性consumerBatchEnabled已添加到SimpleMessageListenerContainer. 如果为 true,容器将创建一批消息,最多batchSize;如果满足以下条件,则交付部分批次receiveTimeout已过,没有新消息到达。 如果收到生产者创建的批处理,则会对其进行 Debatch 并将其添加到使用者端批处理中;因此,实际传送的消息数可能会超过batchSize,表示从 Broker 接收的消息数。deBatchingEnabled必须为 true,当consumerBatchEnabled是真的;Container Factory 将强制执行此要求。spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

使用consumerBatchEnabled@RabbitListener:spring-doc.cadn.net.cn

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个是用原始的、未转换的org.springframework.amqp.core.Messages 收到。spring-doc.cadn.net.cn

  • 第二个使用org.springframework.messaging.Message<?>替换为转换后的有效负载和映射的标头/属性。spring-doc.cadn.net.cn

  • 第三个是使用转换后的有效负载调用的,无法访问标头/属性。spring-doc.cadn.net.cn

您还可以添加Channel参数,通常在使用MANUALACK 模式。 这对于第三个示例不是很有用,因为您无权访问delivery_tag财产。spring-doc.cadn.net.cn

Spring Boot 为consumerBatchEnabledbatchSize,但不适用于batchListener. 从版本 3.0 开始,将consumerBatchEnabledtrue在 Container Factory 上还设置batchListenertrue. 什么时候consumerBatchEnabledtrue,则侦听器必须是 Batch 侦听器。spring-doc.cadn.net.cn

从版本 3.0 开始,侦听器方法可以消耗Collection<?>List<?>.spring-doc.cadn.net.cn

批处理模式下的侦听器不支持回复,因为批处理中的消息与生成的单个回复之间可能没有关联。 批处理侦听器仍支持异步返回类型