此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
RabbitMQ Stream 插件的初始使用者支持
现在提供了对 RabbitMQ Stream 插件的基本支持。
要启用此功能,您必须添加spring-rabbit-stream
jar 添加到类路径中 - 它必须与spring-amqp
和spring-rabbit
.
当您将containerType property 设置为stream ; concurrency 仅支持 Super Streams。
每个绑定只能使用一个流队列。 |
配置活页夹以使用containerType=stream
,Spring Boot 将自动配置Environment
@Bean
从 Application Properties 中。
您可以选择添加定制器来自定义侦听器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
这name
传递给定制器的参数是destination + '.' + group + '.container'
.
流name()
(为了偏移跟踪)设置为 bindingdestination + '.' + group
.
可以使用ConsumerCustomizer
如上所示。
如果您决定使用手动偏移跟踪,则Context
可用作消息标头:
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关配置环境和使用者生成器的信息,请参阅 RabbitMQ Stream Java 客户端文档。
RabbitMQ Super Streams 的使用者支持
有关超级流的信息,请参阅 Super Streams。
使用超级流允许在超级流的每个分区上使用单个活动使用者进行自动纵向扩展。
配置示例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架将创建一个名为super
,具有 9 个分区。
最多可以部署此应用程序的 3 个实例。