反应式支持

该框架为几乎所有支持的功能提供了 Reactive 对应项。spring-doc.cadn.net.cn

如果你把Reactive在提供的命令式组件前面,你可能会找到它的 Reactive 对应项。spring-doc.cadn.net.cn

但是,尚不支持以下各项:spring-doc.cadn.net.cn

1. 前言

我们建议对基于 Apache Pulsar 的 Spring 应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。 为此,您可以添加spring-pulsar-reactive-spring-boot-startermodule 作为依赖项。
此参考的大部分内容都希望读者使用 starter,并在考虑这一点的情况下提供大多数配置说明。 但是,当说明特定于 Spring Boot Starters用法时,会努力调用。

2. 快速导览

我们将通过展示一个以 Reactive 方式生成和使用的示例 Spring Boot 应用程序,快速浏览一下 Spring for Apache Pulsar 中的 Reactive 支持。 这是一个完整的应用程序,不需要任何额外的配置,只要你在默认位置运行一个 Pulsar 集群 -localhost:6650.spring-doc.cadn.net.cn

2.1. 依赖项

Spring Boot 应用程序只需要spring-pulsar-reactive-spring-boot-starterDependency。下面的清单分别显示了如何定义 Maven 和 Gradle 的依赖关系:spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar-reactive</artifactId>
        <version>3.2.12</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.12'
}

使用Version 0.2.x上述坐标变化如下:spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-reactive-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-reactive-spring-boot-starter:0.2.0'
}

2.2. 应用程序代码

以下是应用程序源代码:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
    }

    @ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    Mono<Void> listen(String message) {
        System.out.println("Reactive listener received: " + message);
        return Mono.empty();
    }
}

就是这样,只需几行代码,我们就有了一个可以工作的 Spring Boot 应用程序,它以响应式方式生成和消费来自 Pulsar 主题的消息。spring-doc.cadn.net.cn

启动后,应用程序会使用ReactivePulsarTemplate要将消息发送到hello-pulsar-topic. 然后,它从hello-pulsar-topic使用@ReactivePulsarListener.spring-doc.cadn.net.cn

简单性的关键要素之一是 Spring Boot Starters,它可以自动配置并为应用程序提供所需的组件

3. 设计

以下是需要牢记的几个关键设计点。spring-doc.cadn.net.cn

3.1. Apache Pulsar 反应式

反应式支持最终由 Apache Pulsar Reactive 客户端提供,其当前实现是围绕常规 Pulsar 客户端异步 API 的完全非阻塞适配器。 这意味着 Reactive 客户端需要常规客户端。spring-doc.cadn.net.cn

3.2. 加法自动配置

由于对常规(命令式)客户端的依赖,框架提供的 Reactive 自动配置是对命令式自动配置的补充。 换句话说,命令式Starters只包括命令式组件,而反应式Starters包括命令式和反应式组件。spring-doc.cadn.net.cn

4. 反应式 Pulsar 客户端

当您使用 Reactive Pulsar Spring Boot Starter 时,您将获得ReactivePulsarClientauto-configured的。spring-doc.cadn.net.cn

默认情况下,应用程序会尝试连接到位于pulsar://localhost:6650. 这可以通过设置spring.pulsar.client.service-url属性设置为不同的值。spring-doc.cadn.net.cn

该值必须是有效的 Pulsar 协议 URL

还有许多其他应用程序属性(继承自改编的命令式客户端)可供配置。 请参阅spring.pulsar.client.*应用程序属性。spring-doc.cadn.net.cn

4.1. 身份验证

要连接到需要身份验证的 Pulsar 集群,请按照与命令式客户端相同的步骤进行作。 同样,这是因为反应式 Client 端采用了处理所有安全配置的命令式 Client 端。spring-doc.cadn.net.cn

5. 消息制作

5.1. ReactivePulsarTemplate

在 Pulsar 生产者端,Spring Boot 自动配置提供了一个ReactivePulsarTemplate用于发布记录。该模板实现了一个名为ReactivePulsarOperations并提供通过其 Contract 发布记录的方法。spring-doc.cadn.net.cn

该模板提供了 send 方法,这些方法接受单个消息并返回Mono<MessageId>. 它还提供了接受多条消息的 send 方法(以 ReactiveStreams 的形式Publishertype) 并返回一个Flux<MessageId>.spring-doc.cadn.net.cn

对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。

5.1.1. Fluent API

该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。spring-doc.cadn.net.cn

5.1.2. 消息自定义

您可以指定MessageSpecBuilderCustomizer以配置传出消息。例如,以下代码演示如何发送键控消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

5.1.3. 发件人自定义

您可以指定ReactiveMessageSenderBuilderCustomizer配置底层 Pulsar sender 构建器,最终构建用于发送传出消息的 sender。spring-doc.cadn.net.cn

请谨慎使用,因为这会提供对 sender 构建器的完全访问权限,并调用其某些方法(例如create) 可能会产生意想不到的副作用。

例如,以下代码演示如何禁用批处理和启用分块:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。 指定您的自定义MessageRouter在 Sender Builder 上实施,例如:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
请注意,当使用MessageRouter,则spring.pulsar.producer.message-routing-modecustom.

5.2. 指定 Schema 信息

如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。 对于非原始类型,如果在对ReactivePulsarTemplate,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON从 type.spring-doc.cadn.net.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。

5.2.1. 自定义 Schema 映射

作为在ReactivePulsarTemplate对于复杂类型,可以使用类型的映射配置 Schema Resolver。 这样就无需在框架使用传出消息类型咨询解析程序时指定架构。spring-doc.cadn.net.cn

架构映射可以使用spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml要为UserAddress复杂对象 usingAVROJSONschemas 的 Schemas:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type是 Message 类的完全限定名称。

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cadn.net.cn

以下示例使用架构解析程序定制器为UserAddress复杂对象 usingAVROJSONschemas 的 Schemas:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,就不需要设置 specify the schema on send operations 了。spring-doc.cadn.net.cn

5.3. ReactivePulsarSenderFactory

ReactivePulsarTemplate依赖于ReactivePulsarSenderFactory以实际创建基础发件人。spring-doc.cadn.net.cn

Spring Boot 提供了这个发送者工厂,可以使用任何spring.pulsar.producer.*应用程序属性。spring-doc.cadn.net.cn

如果直接使用发送方工厂 API 时未指定 topic 信息,则ReactivePulsarTemplate与一个例外一起使用,即省略了 “Message type default” 步骤。

5.3.1. 生产者缓存

每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建 producer,ReactiveMessageSenderCache在底层 Apache Pulsar Reactive 客户端缓存它创建的生产者。 它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。spring-doc.cadn.net.cn

您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性。spring-doc.cadn.net.cn

6. 消息消费

6.1. @ReactivePulsarListener

对于 Pulsar 消费者,我们建议最终用户应用程序使用ReactivePulsarListener注解。 要使用ReactivePulsarListener,您需要使用@EnableReactivePulsar注解。 当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。spring-doc.cadn.net.cn

让我们重新审视一下ReactivePulsarListener我们在 Quick-Tour 部分看到的代码片段:spring-doc.cadn.net.cn

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}
listener 方法返回一个Mono<Void>以指示消息是否已成功处理。Mono.empty()表示成功 (确认) 和Mono.error()表示失败(否定确认)。

您还可以进一步简化此方法:spring-doc.cadn.net.cn

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在这种最基本的形式中,当topics未直接提供,则使用主题解析过程来确定目标主题。 同样,当subscriptionName未在@ReactivePulsarListenerannotation 将使用自动生成的订阅名称。spring-doc.cadn.net.cn

ReactivePulsarListener方法,我们接收数据为String,但我们没有指定任何 schema 类型。 在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。 框架检测到您希望Stringtype,然后根据该信息推断 schema 类型。 然后,它将该架构提供给使用者。 对于 Java 中的所有原始类型,框架都会执行此推理。 对于任何复杂类型(例如 JSON、AVRO 等),框架无法进行此推理,用户需要使用schemaType财产。spring-doc.cadn.net.cn

此示例显示了我们如何使用 topic 中的复杂类型:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

请注意,添加了schemaType属性ReactivePulsarListener. 这是因为该库无法从提供的类型推断架构类型:Foo.我们必须告诉框架要使用什么 schema。spring-doc.cadn.net.cn

让我们看看我们可以消费的更多方式。spring-doc.cadn.net.cn

此示例直接使用 Pulsar 消息:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

此示例使用包装在 Spring 消息传递信封中的记录:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

6.1.1. 流

以上都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的理由是具有背压支持的流功能。spring-doc.cadn.net.cn

以下示例使用ReactivePulsarListener要使用 POJO 流:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);

在这里,我们以Flux的 Pulsar 消息。 此外,要在ReactivePulsarListenerlevel 中,您需要设置stream属性设置为true.spring-doc.cadn.net.cn

listener 方法返回一个Flux<MessageResult<Void>>其中,每个元素表示一条已处理的消息,并保存消息 ID、值以及是否被确认。 这MessageResult有一组静态工厂方法,可用于创建适当的MessageResult实例。

根据Flux,框架会尝试推断要使用的 schema。 如果它包含复杂类型,您仍然需要提供schemaTypeReactivePulsarListener.spring-doc.cadn.net.cn

以下侦听器使用 Spring 消息传递Message具有复杂类型的 envelope :spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}
listener 方法返回一个Flux<MessageResult<Void>>其中,每个元素表示一条已处理的消息,并保存消息 ID、值以及是否被确认。 SpringMessageUtils有一组静态工厂方法,可用于创建适当的MessageResult实例。

6.1.2. 配置 - 应用程序属性

侦听器依赖于ReactivePulsarConsumerFactory创建和管理用于消费消息的底层 Pulsar 消费者。 Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*应用程序属性。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外spring-doc.cadn.net.cn

spring.pulsar.consumer.subscription.name属性将被忽略,而是在未在 Comments 上指定时生成。
spring.pulsar.consumer.subscription-typeproperty 被忽略,而是取自 annotation 上的值。但是,您可以将subscriptionType = {}以改用 Property 值作为默认值。

6.1.3. 消费者定制

您可以指定ReactivePulsarListenerMessageConsumerBuilderCustomizer配置底层 Pulsar consumer builder,最终构造侦听器用来接收消息的 consumer。spring-doc.cadn.net.cn

请谨慎使用,因为这会提供对使用者构建器的完全访问权限,并调用其某些方法(例如create) 可能会产生意想不到的副作用。

例如,下面的代码演示如何将订阅的初始位置设置为主题上最早的消息。spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个@ReactivePulsarListener和一个ReactivePulsarListenerMessageConsumerBuilderCustomizerbean 已注册,则将自动应用定制器。

您还可以使用定制器向 Consumer Builder 提供直接的 Pulsar Consumer 属性。 如果您不想使用前面提到的 Boot 配置属性或具有多个ReactivePulsarListener方法。spring-doc.cadn.net.cn

以下定制器示例使用直接 Pulsar 消费者属性:spring-doc.cadn.net.cn

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是spring.pulsar.consumerSpring Boot 配置属性

6.2. 指定 Schema 信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在ReactivePulsarListener. 对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON从 type.spring-doc.cadn.net.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。

6.2.1. 自定义 Schema 映射

作为在ReactivePulsarListener对于复杂类型,可以使用类型的映射配置 Schema Resolver。 这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。spring-doc.cadn.net.cn

架构映射可以使用spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml要为UserAddress复杂对象 usingAVROJSONschemas 的 Schemas:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type是 Message 类的完全限定名称。

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cadn.net.cn

以下示例使用架构解析程序定制器为UserAddress复杂对象 usingAVROJSONschemas 的 Schemas:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,就不需要在侦听器上设置 schema,例如:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}

6.3. 消息侦听器容器基础设施

在大多数情况下,我们建议使用ReactivePulsarListener注解直接用于从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。 但是,了解如何作非常重要ReactivePulsarListener在内部工作。spring-doc.cadn.net.cn

当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息使用的核心。 这ReactivePulsarListener在幕后使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。spring-doc.cadn.net.cn

6.3.1. ReactivePulsarMessageListenerContainer

此消息侦听器容器的协定是通过ReactivePulsarMessageListenerContainer其默认实现会创建一个反应式 Pulsar 消费者,并连接一个使用创建的消费者的反应式消息管道。spring-doc.cadn.net.cn

6.3.2. ReactiveMessagePipeline

管道是底层 Apache Pulsar Reactive 客户端的一项功能,它以反应式方式完成接收数据,然后将其移交给提供的消息处理程序的繁重工作。反应式消息侦听器容器实现要简单得多,因为管道处理大部分工作。spring-doc.cadn.net.cn

6.3.3. ReactivePulsarMessageHandler

“listener” 方面由ReactivePulsarMessageHandler其中提供了两种实现:spring-doc.cadn.net.cn

如果在直接使用侦听器容器时未指定主题信息,则ReactivePulsarListener与一个例外一起使用,即省略了 “Message type default” 步骤。

6.4. 并发

在流式处理模式下使用记录 (stream = true) 并发性自然而然地通过客户端实现中的底层 Reactive 支持来实现。spring-doc.cadn.net.cn

但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。 只需将concurrency属性@ReactivePulsarListener. 此外,当concurrency > 1你可以确保消息按键排序,从而通过设置useKeyOrderedProcessing = "true"在注释上。spring-doc.cadn.net.cn

同样,ReactiveMessagePipeline执行繁重的工作,我们只需为其设置 Properties。spring-doc.cadn.net.cn

反应式与命令式

反应式容器中的并发性与命令式容器不同。 后者创建多个线程(每个线程都有一个 Pulsar 消费者),而前者将消息同时分派给 Reactive 并行调度器上的多个处理程序实例。spring-doc.cadn.net.cn

反应式并发模型的一个优点是它可以与Exclusivesubscriptions 的 intent 请求,而命令式并发模型则不能。spring-doc.cadn.net.cn

6.5. Pulsar 头文件

Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。spring-doc.cadn.net.cn

6.5.1. 在 OneByOne Listener 中访问

以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的示例中,我们访问messageIdmessage 元数据以及名为foo. Spring@Headerannotation 用于每个 Header 字段。spring-doc.cadn.net.cn

您还可以使用 Pulsar 的Message作为 envelope 来承载负载。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用Header注解。 请注意,您还可以使用 Spring 消息传递Messageenvelope 来携带 payload,然后使用@Header.spring-doc.cadn.net.cn

6.5.2. 访问 In Streaming Listener

使用流式消息侦听器时,标头支持是有限的。 只有当Flux包含 Springorg.springframework.messaging.Message元素中,将填充标题。 此外,弹簧@Headerannotation 不能用于检索数据。 您必须直接调用 Spring 消息上的相应方法来检索数据。spring-doc.cadn.net.cn

6.6. 消息确认

框架会自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号执行 ack 或 nack作。 这与它的命令式对应项略有不同,在命令式中,除非方法引发异常,否则信号被暗示为正。spring-doc.cadn.net.cn

6.6.1. OneByOne 监听器

单个消息(又名 OneByOne)消息侦听器方法返回一个Mono<Void>以指示消息是否已成功处理。Mono.empty()表示成功 (确认) 和Mono.error()表示失败(否定确认)。spring-doc.cadn.net.cn

6.6.2. 流式监听器

流式处理侦听器方法返回一个Flux<MessageResult<Void>>其中每个MessageResult元素表示已处理的消息,并保存消息 ID、值以及是否被确认。这MessageResult具有一组acknowledgenegativeAcknowledgestatic Factory 方法,可用于创建适当的MessageResult实例。spring-doc.cadn.net.cn

6.7. 消息重新投递和错误处理

Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。spring-doc.cadn.net.cn

6.7.1. 鸣谢超时

默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。spring-doc.cadn.net.cn

你可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:spring-doc.cadn.net.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

6.7.2. 否定确认重新传递延迟

当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认情况下,在一分钟内重新传送消息,但您可以通过使用者定制器进行更改,例如:spring-doc.cadn.net.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

6.7.3. Dead Letter 主题

Apache Pulsar 允许应用程序在消费者上使用死信主题,并带有Shared订阅类型。 对于ExclusiveFailover订阅类型,则此功能不可用。 基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),则一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解有关此功能的一些细节:spring-doc.cadn.net.cn

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

首先,我们有一个特殊的 beanDeadLetterPolicy,它被命名为deadLetterPolicy(您可以根据需要使用任何名称)。 这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic,在本例中。 如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ在 Pulsar 中。 接下来,我们将此 bean 名称提供给ReactivePulsarListener通过设置deadLetterPolicy财产。 请注意,ReactivePulsarListener订阅类型的值为Shared,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供了一个ackTimeout值为 1 秒。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。 如果该周期持续 10 次(因为这是我们在DeadLetterPolicy),则 Pulsar consumer 会将消息发布到 DLQ 主题。 我们还有另一个ReactivePulsarListener侦听 DLQ 主题以在发布到 DLQ 主题时接收数据。spring-doc.cadn.net.cn

使用分区主题时有关 DLQ 主题的特别说明

如果 main topic 是分区的,那么在幕后,Pulsar 会将每个分区视为一个单独的 topic。 Pulsar 附加partition-<n>哪里n代表主主题名称的分区号。 问题是,如果你不指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布到一个默认的主题名称,该名称具有`partition-<n>info 中 — 例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. 解决此问题的简单方法是始终提供 DLQ 主题名称。spring-doc.cadn.net.cn

6.8. Pulsar Reader 支持

该框架支持以响应式方式使用 Pulsar Reader,通过ReactivePulsarReaderFactory.spring-doc.cadn.net.cn

Spring Boot 提供了这个 reader 工厂,它可以配置任何spring.pulsar.reader.*应用程序属性。spring-doc.cadn.net.cn

7. 主题解析

在生成或使用消息时,需要目标主题。 框架会查找以下有序位置以确定主题(在第一次查找时停止):spring-doc.cadn.net.cn

当通过默认机制之一找到主题时,无需在 produce 或 consume API 上指定主题。spring-doc.cadn.net.cn

当找不到主题时,API 将相应地引发异常。spring-doc.cadn.net.cn

7.1. 用户指定

传递到正在使用的 API 中的主题具有最高优先级(例如。PulsarTemplate.send("my-topic", myMessage)@PulsarListener(topics = "my-topic").spring-doc.cadn.net.cn

7.2. 消息类型默认

当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题映射。spring-doc.cadn.net.cn

映射可以使用spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml配置在消费或生成时使用的默认主题FooBar消息:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
message-type是 Message 类的完全限定名称。
如果消息(或Publisherinput) 为null,框架将无法从中确定主题。如果您的应用程序可能会发送null消息。

7.2.1. 自定义主题解析程序

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,您可以通过证明自己的实现来替换默认解析器,例如:spring-doc.cadn.net.cn

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

7.3. 生产者全局默认值

最终查询的位置(生产时)是系统范围的生产者默认主题。 它是通过spring.pulsar.producer.topic-name属性,并且spring.pulsar.reactive.sender.topic-name属性。spring-doc.cadn.net.cn

7.4. Consumer 全局默认值

查询的最后一个位置(使用时)是系统范围的使用者默认主题。 它是通过spring.pulsar.consumer.topicsspring.pulsar.consumer.topics-pattern属性,以及spring.pulsar.reactive.consumer.topicsspring.pulsar.reactive.consumer.topics-pattern属性。spring-doc.cadn.net.cn