This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!spring-doc.cn

Initial Producer Support for the RabbitMQ Stream Plugin

Basic support for the RabbitMQ Stream Plugin is now provided. To enable this feature, you must add the spring-rabbit-stream jar to the class path - it must be the same version as spring-amqp and spring-rabbit.spring-doc.cn

The producer properties described above are not supported when you set the producerType property to STREAM_SYNC or STREAM_ASYNC.

To configure the binder to use a stream ProducerType, Spring Boot will configure an Environment @Bean from the applicaation properties. You can, optionally, add a customizer to customize the message handler.spring-doc.cn

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and producer builder.spring-doc.cn

Producer Support for the RabbitMQ Super Streams

See Super Streams for information about super streams.spring-doc.cn

Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream. Using Spring Cloud Stream, you can publish to a super stream either over AMQP, or using the stream client.spring-doc.cn

The super stream must already exist; creating a super stream is not supported by producer bindings.

Publishing to a super stream over AMQP:spring-doc.cn

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

Publishing to a super stream using the stream client:spring-doc.cn

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

When using the stream client, if you set a confirmAckChannel, a copy of a successfully sent message will be sent to that channel.spring-doc.cn