此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.2.0spring-doc.cadn.net.cn

RabbitMQ Stream 插件的初始创建者支持

现在提供了对 RabbitMQ Stream 插件的基本支持。 要启用此功能,您必须添加spring-rabbit-streamjar 添加到类路径中 - 它必须与spring-amqpspring-rabbit.spring-doc.cadn.net.cn

当您将producerTypeproperty 设置为STREAM_SYNCSTREAM_ASYNC.

将 Binder 配置为使用流ProducerType,Spring Boot 将配置Environment @Bean从 Applicaation 属性。 您可以选择添加定制器来自定义消息处理程序。spring-doc.cadn.net.cn

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

有关配置环境和创建者生成器的信息,请参阅 RabbitMQ Stream Java 客户端文档spring-doc.cadn.net.cn

RabbitMQ Super Streams 的创建者支持

有关超级流的信息,请参阅 Super Streamsspring-doc.cadn.net.cn

使用超级流允许在超级流的每个分区上使用单个活动使用者进行自动纵向扩展。 使用 Spring Cloud Stream,您可以通过 AMQP 或使用流客户端发布到超级流。spring-doc.cadn.net.cn

super 流必须已经存在;创建者绑定不支持创建超级流。

通过 AMQP 发布到超级流:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端发布到 super 流:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端时,如果将confirmAckChannel,将向该通道发送成功发送消息的副本。spring-doc.cadn.net.cn