使用 RabbitMQ Stream 插件

版本 2.4 引入了对 RabbitMQ Stream Plugin 的 Java 客户端的初始支持。spring-doc.cadn.net.cn

添加spring-rabbit-stream依赖项添加到项目中:spring-doc.cadn.net.cn

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.2.0</version>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.0'

您可以像往常一样预置队列,使用RabbitAdminbean,使用QueueBuilder.stream()方法来指定队列类型。 例如:spring-doc.cadn.net.cn

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

但是,这仅在您还使用非流组件(例如SimpleMessageListenerContainerDirectMessageListenerContainer),因为在打开 AMQP 连接时,会触发管理员来声明定义的 bean。 如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应配置StreamAdmin相反:spring-doc.cadn.net.cn

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

请参阅 RabbitMQ 文档,了解有关StreamCreator.spring-doc.cadn.net.cn

发送消息

RabbitStreamTemplate提供了RabbitTemplate(AMQP) 功能。spring-doc.cadn.net.cn

RabbitStream作
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplateimplementation 具有以下构造函数和属性:spring-doc.cadn.net.cn

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter用于convertAndSend将对象转换为 Spring AMQP 的方法Message.spring-doc.cadn.net.cn

StreamMessageConverter用于从 Spring AMQP 转换Message到本机流Message.spring-doc.cadn.net.cn

您还可以发送本机流Messages 直接;使用messageBuilder()方法提供对Producer的消息构建器。spring-doc.cadn.net.cn

ProducerCustomizer提供了一种在构建 Producer 之前对其进行自定义的机制。spring-doc.cadn.net.cn

请参阅 Java 客户端文档,了解如何自定义EnvironmentProducer.spring-doc.cadn.net.cn

接收消息

异步消息接收由StreamListenerContainer(以及StreamRabbitListenerContainerFactory使用@RabbitListener).spring-doc.cadn.net.cn

侦听器容器需要一个Environment以及单个流名称。spring-doc.cadn.net.cn

你可以接收 Spring AMQPMessages 使用经典MessageListener,或者您可以接收原生流Message使用新接口:spring-doc.cadn.net.cn

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

有关支持的属性的信息,请参阅 Message Listener Container Configurationspring-doc.cadn.net.cn

与模板类似,容器有一个ConsumerCustomizer财产。spring-doc.cadn.net.cn

请参阅 Java 客户端文档,了解如何自定义EnvironmentConsumer.spring-doc.cadn.net.cn

使用@RabbitListener,配置StreamRabbitListenerContainerFactory;此时,大多数@RabbitListener属性 (concurrency等)将被忽略。只id,queues,autoStartupcontainerFactory受支持。 另外queues只能包含一个流名称。spring-doc.cadn.net.cn

例子

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

版本 2.4.5 添加了adviceChain属性设置为StreamListenerContainer(及其工厂)。 还提供了一个新的工厂 Bean 来创建无状态重试拦截器,其中包含可选的StreamMessageRecoverer用于使用 Raw Stream 消息。spring-doc.cadn.net.cn

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
此容器不支持有状态重试。

超级流

Super Stream 是分区流的抽象概念,通过将多个 stream 队列绑定到具有参数的 exchange 来实现x-super-stream: true.spring-doc.cadn.net.cn

供应

为方便起见,可以通过定义SuperStream.spring-doc.cadn.net.cn

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin检测到此 Bean 并将声明 Exchange (my.super.stream) 和 3 个队列(分区)-my.super-stream-n哪里n0,1,2绑定的路由密钥等于n.spring-doc.cadn.net.cn

如果您还希望通过 AMQP 向 Exchange 发布信息,则可以提供自定义路由密钥:spring-doc.cadn.net.cn

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

键的数量必须等于分区的数量。spring-doc.cadn.net.cn

使用 SuperStream 进行制作

您必须添加superStreamRoutingFunctionRabbitStreamTemplate:spring-doc.cadn.net.cn

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

您还可以使用RabbitTemplate.spring-doc.cadn.net.cn

使用具有单个活跃使用者的 Super Streams

调用superStream方法在 Super Stream 上启用单个活跃的 Consumer 的用户。spring-doc.cadn.net.cn

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
此时,当并发数大于 1 时,实际的并发数由Environment;要实现完全并发,请将环境的maxConsumersByConnection设置为 1。 请参阅配置环境

千分尺观察

现在支持使用 Micrometer 进行观察,从 3.0.5 版本开始,对于RabbitStreamTemplate以及流侦听器容器。 该容器现在还支持 Micrometer 计时器(未启用观察时)。spring-doc.cadn.net.cn

设置observationEnabled在每个组件上以便于观察;这将禁用 Micrometer Timers,因为计时器现在将随每个观测一起管理。 使用带注解的侦听器时,将observationEnabled在集装箱工厂。spring-doc.cadn.net.cn

有关更多信息,请参阅 Micrometer Tracingspring-doc.cadn.net.cn

要向计时器/跟踪添加标签,请配置自定义RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConvention分别添加到模板或侦听器容器中。spring-doc.cadn.net.cn

默认实现会添加name标签,以及listener.id标记。spring-doc.cadn.net.cn

你可以子类化DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention或提供全新的实现。spring-doc.cadn.net.cn

有关更多详细信息,请参阅千分尺观测文档spring-doc.cadn.net.cn