This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!spring-doc.cn

Partitioning with the RabbitMQ Binder

RabbitMQ does not support partitioning natively.spring-doc.cn

Sometimes, it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.spring-doc.cn

The RabbitMessageChannelBinder provides partitioning by binding a queue for each partition to the destination exchange.spring-doc.cn

The following Java and YAML examples show how to configure the producer:spring-doc.cn

Producer
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

The configuration in the prececing example uses the default partitioning (key.hashCode() % partitionCount). This may or may not provide a suitably balanced algorithm, depending on the key values. You can override this default by using the partitionSelectorExpression or partitionSelectorClass properties.spring-doc.cn

The required-groups property is required only if you need the consumer queues to be provisioned when the producer is deployed. Otherwise, any messages sent to a partition are lost until the corresponding consumer is deployed.spring-doc.cn

The following configuration provisions a topic exchange:spring-doc.cn

part exchange

The following queues are bound to that exchange:spring-doc.cn

part queues

The following bindings associate the queues to the exchange:spring-doc.cn

part bindings

The following Java and YAML examples continue the previous examples and show how to configure the consumer:spring-doc.cn

Consumer
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
The RabbitMessageChannelBinder does not support dynamic scaling. There must be at least one consumer per partition. The consumer’s instanceIndex is used to indicate which partition is consumed. Platforms such as Cloud Foundry can have only one instance with an instanceIndex.