使用 Batching 进行@RabbitListener
当收到一批消息时,取消批处理通常由容器执行,并且一次使用一条消息调用侦听器。
从版本 2.2 开始,你可以配置监听器容器工厂和监听器在一次调用中接收整个批处理,只需将工厂的batchListener
属性,并将方法有效负载参数设为List
或Collection
:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
设置batchListener
属性设置为 true 会自动关闭deBatchingEnabled
container 属性(除非consumerBatchEnabled
是true
- 见下文)。实际上,取消批处理从容器移动到侦听器适配器,并且适配器会创建传递给侦听器的列表。
启用批处理的工厂不能与多方法侦听器一起使用。
同样从 2.2 版本开始。当一次接收一条批量消息时,最后一条消息包含一个布尔标头,该标头设置为true
.
可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)
boolean last' 参数添加到侦听器方法中。
标头从MessageProperties.isLastInBatch()
.
另外AmqpHeaders.BATCH_SIZE
填充每个消息片段中批次的大小。
此外,还有一个新属性consumerBatchEnabled
已添加到SimpleMessageListenerContainer
.
如果为 true,容器将创建一批消息,最多batchSize
;如果满足以下条件,则交付部分批次receiveTimeout
已过,没有新消息到达。
如果收到生产者创建的批处理,则会对其进行 Debatch 并将其添加到使用者端批处理中;因此,实际传送的消息数可能会超过batchSize
,表示从 Broker 接收的消息数。deBatchingEnabled
必须为 true,当consumerBatchEnabled
是真的;Container Factory 将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
使用consumerBatchEnabled
跟@RabbitListener
:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个是用原始的、未转换的
org.springframework.amqp.core.Message
s 收到。 -
第二个使用
org.springframework.messaging.Message<?>
替换为转换后的有效负载和映射的标头/属性。 -
第三个是使用转换后的有效负载调用的,无法访问标头/属性。
您还可以添加Channel
参数,通常在使用MANUAL
ACK 模式。
这对于第三个示例不是很有用,因为您无权访问delivery_tag
财产。
Spring Boot 为consumerBatchEnabled
和batchSize
,但不适用于batchListener
.
从版本 3.0 开始,将consumerBatchEnabled
自true
在 Container Factory 上还设置batchListener
自true
.
什么时候consumerBatchEnabled
是true
,则侦听器必须是 Batch 侦听器。
从版本 3.0 开始,侦听器方法可以消耗Collection<?>
或List<?>
.
批处理模式下的侦听器不支持回复,因为批处理中的消息与生成的单个回复之间可能没有关联。 批处理侦听器仍支持异步返回类型。 |