This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!spring-doc.cn

Initial Consumer Support for the RabbitMQ Stream Plugin

Basic support for the RabbitMQ Stream Plugin is now provided. To enable this feature, you must add the spring-rabbit-stream jar to the class path - it must be the same version as spring-amqp and spring-rabbit.spring-doc.cn

The consumer properties described above are not supported when you set the containerType property to stream; concurrency is supported for super streams only. Only a single stream queue can be consumed by each binding.

To configure the binder to use containerType=stream, Spring Boot will automatically configure an Environment @Bean from the application properties. You can, optionally, add a customizer to customize the listener container.spring-doc.cn

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

The name argument passed to the customizer is destination + '.' + group + '.container'.spring-doc.cn

The stream name() (for the purpose of offset tracking) is set to the binding destination + '.' + group. It can be changed using a ConsumerCustomizer shown above. If you decide to use manual offset tracking, the Context is available as a message header:spring-doc.cn

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and consumer builder.spring-doc.cn

Consumer Support for the RabbitMQ Super Streams

See Super Streams for information about super streams.spring-doc.cn

Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream.spring-doc.cn

Configuration example:spring-doc.cn

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

The framework will create a super stream named super, with 9 partitions. Up to 3 instances of this application can be deployed.spring-doc.cn