该框架为几乎所有支持的功能提供了响应式对应物。

如果你把这个词放在一个提供的命令式组件的前面,你可能会发现它的响应式对应物。Reactive

  • PulsarTemplate → ReactivePulsarTemplate

  • PulsarListener → ReactivePulsarListener

  • PulsarConsumerFactory → ReactivePulsarConsumerFactory

  • 等。。

但是,尚不支持以下内容:

  • 非共享订阅中的错误处理

  • 在流模式下访问 Pulsar 标头@Header

  • 观察

如果你把这个词放在一个提供的命令式组件的前面,你可能会发现它的响应式对应物。Reactive

  • PulsarTemplate → ReactivePulsarTemplate

  • PulsarListener → ReactivePulsarListener

  • PulsarConsumerFactory → ReactivePulsarConsumerFactory

  • 等。。

1. 前言

对于基于 Apache Pulsar 的应用程序,我们建议对 Spring 使用 Spring-Boot-First 方法,因为这极大地简化了事情。 为此,您可以将模块添加为依赖项。spring-pulsar-reactive-spring-boot-starter
本参考的大部分内容都希望读者使用启动器,并给出了大多数配置说明,并牢记这一点。 但是,当指令特定于 Spring Boot 入门器用法时,会努力调用。
对于基于 Apache Pulsar 的应用程序,我们建议对 Spring 使用 Spring-Boot-First 方法,因为这极大地简化了事情。 为此,您可以将模块添加为依赖项。spring-pulsar-reactive-spring-boot-starter
本参考的大部分内容都希望读者使用启动器,并给出了大多数配置说明,并牢记这一点。 但是,当指令特定于 Spring Boot 入门器用法时,会努力调用。

2. 快速浏览

我们将通过展示一个以反应式方式生产和使用的示例 Spring Boot 应用程序,快速浏览 Spring 对 Apache Pulsar 的响应式支持。 这是一个完整的应用程序,不需要任何额外的配置,只要您在默认位置 - 上运行一个 Pulsar 集群。localhost:6650

2.1. 依赖关系

Spring Boot 应用程序只需要依赖项。以下列表分别显示了如何定义 Maven 和 Gradle 的依赖项:spring-pulsar-reactive-spring-boot-starter

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar-reactive</artifactId>
        <version>3.2.6</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.6'
}

使用上述坐标时,请按如下方式更改:Version 0.2.x

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-reactive-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-reactive-spring-boot-starter:0.2.0'
}

2.2. 应用程序代码

以下是应用程序源代码:

@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
    }

    @ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    Mono<Void> listen(String message) {
        System.out.println("Reactive listener received: " + message);
        return Mono.empty();
    }
}

就是这样,只需几行代码,我们就有一个可以工作的 Spring Boot 应用程序,它以响应式方式生成和使用来自 Pulsar 主题的消息。

启动后,应用程序使用 a 向 . 然后,它从使用 .ReactivePulsarTemplatehello-pulsar-topichello-pulsar-topic@ReactivePulsarListener

简单性的关键要素之一是 Spring Boot 启动器,它可以自动配置并为应用程序提供所需的组件

使用上述坐标时,请按如下方式更改:Version 0.2.x

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-reactive-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-reactive-spring-boot-starter:0.2.0'
}
简单性的关键要素之一是 Spring Boot 启动器,它可以自动配置并为应用程序提供所需的组件

3. 设计

以下是需要牢记的几个关键设计点。

3.1. Apache Pulsar 反应式

响应式支持最终由 Apache Pulsar 响应式客户端提供,其当前实现是围绕常规 Pulsar 客户端的异步 API 的完全非阻塞适配器。 这意味着反应式客户端需要常规客户端。

3.2. 附加组件自动配置

由于依赖于常规(命令式)客户端,框架提供的响应式自动配置是对命令式自动配置的补充。 换句话说,命令式启动器仅包括命令式组件,而反应式启动器同时包括命令式和反应式组件。

4. 反应式 Pulsar 客户端

当您使用 Reactive Pulsar Spring Boot Starter 时,您将获得自动配置。ReactivePulsarClient

默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。 这可以通过将属性设置为不同的值来调整。pulsar://localhost:6650spring.pulsar.client.service-url

该值必须是有效的 Pulsar 协议 URL

还有许多其他应用程序属性(继承自适应的命令式客户端)可供配置。 请参阅 spring.pulsar.client.* 应用程序属性。

4.1. 身份验证

要连接到需要身份验证的 Pulsar 集群,请按照与命令式客户端相同的步骤操作。 同样,这是因为反应式客户端会适应处理所有安全配置的命令式客户端。

该值必须是有效的 Pulsar 协议 URL

5. 消息制作

5.1. ReactivePulsar模板

在 Pulsar 生产者端,Spring Boot 自动配置提供了用于发布记录的功能。该模板实现一个名为的接口,并提供通过其协定发布记录的方法。ReactivePulsarTemplateReactivePulsarOperations

该模板提供了接受单个消息并返回 . 它还提供了接受多条消息(以 ReactiveStreams 类型的形式)并返回 .Mono<MessageId>PublisherFlux<MessageId>

对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。

5.1.1. Fluent API

该模板提供了一个流畅的构建器来处理更复杂的发送请求。

5.1.2. 消息自定义

您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:MessageSpecBuilderCustomizer

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

5.1.3. 发送方自定义

您可以指定一个来配置底层 Pulsar 发送方构建器,该构建器最终构建用于发送传出消息的发送方。ReactiveMessageSenderBuilderCustomizer

请谨慎使用,因为这样可以完全访问发送方构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何禁用批处理和启用分块:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

另一个示例演示在将记录发布到分区主题时如何使用自定义路由。 在发件人生成器上指定自定义实现,例如:MessageRouter

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
请注意,使用 时,唯一有效的设置是 。MessageRouterspring.pulsar.producer.message-routing-modecustom

5.2. 指定模式信息

如果使用 Java 基元类型,则框架会自动检测模式,并且无需指定任何模式类型即可发布数据。 对于非基元类型,如果在 上调用发送操作时未显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarTemplateSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。

5.2.1. 自定义模式映射

作为在对复杂类型调用发送操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需指定架构,因为框架会使用传出消息类型咨询解析程序。ReactivePulsarTemplate

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

使用此配置后,无需在发送操作上设置指定架构。

5.3. 反应性脉冲星SenderFactory

它依赖于 a 来实际创建基础发送方。ReactivePulsarTemplateReactivePulsarSenderFactory

Spring Boot 提供了这个发送方工厂,可以使用任何 spring.pulsar.producer.* 应用程序属性进行配置。

如果在直接使用发送方工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarTemplate

5.3.1. 生产者缓存

每个底层的 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建生产者,底层 Apache Pulsar 反应式客户端会缓存它创建的生产者。 它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。ReactiveMessageSenderCache

您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性来配置缓存设置。

对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。
请谨慎使用,因为这样可以完全访问发送方构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create
请注意,使用 时,唯一有效的设置是 。MessageRouterspring.pulsar.producer.message-routing-modecustom
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。
是邮件类的完全限定名称。message-type
如果在直接使用发送方工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarTemplate

6. 消息消费

6.1. @ReactivePulsarListener

对于 Pulsar 使用者,我们建议最终用户应用程序使用注解。 要使用 ,您需要使用注释。 当您使用 Spring Boot 支持时,它会自动启用此注解并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。ReactivePulsarListenerReactivePulsarListener@EnableReactivePulsar

让我们重新审视一下我们在快速浏览部分看到的代码片段:ReactivePulsarListener

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}
侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void>Mono.empty()Mono.error()

您还可以进一步简化此方法:

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在这种最基本的形式中,当不直接提供时,使用主题解析过程来确定目标主题。 同样,如果注释中未提供 ,则将使用自动生成的订阅名称。topicssubscriptionName@ReactivePulsarListener

在前面显示的方法中,我们以 ,但未指定任何模式类型。 在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。 框架检测到您期望该类型,然后根据该信息推断架构类型。 然后,它将该架构提供给使用者。 对于 Java 中的所有基元类型,框架都会进行此推理。 对于任何复杂类型(如 JSON、AVRO 等),框架无法执行此推理,用户需要使用该属性在注释上提供架构类型。ReactivePulsarListenerStringStringschemaType

此示例显示了如何从主题中使用复杂类型:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

请注意,在 上添加了一个属性。 这是因为库无法从提供的类型推断架构类型: 。我们必须告诉框架要使用什么模式。schemaTypeReactivePulsarListenerFoo

让我们再看看几种消费方式。

此示例直接使用 Pulsar 消息:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

此示例使用包装在 Spring 消息信封中的记录:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

6.1.1. 流式处理

以上所有内容都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的原因是因为支持背压的流式处理功能。

以下示例用于使用POJO流:ReactivePulsarListener

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);

在这里,我们接收记录作为 Pulsar 消息。 此外,要在级别上启用流使用,您需要将注释上的属性设置为 。FluxReactivePulsarListenerstreamtrue

侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 它有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>>MessageResultMessageResult

根据 中消息的实际类型,框架会尝试推断要使用的架构。 如果它包含复杂类型,则仍需要提供 on .FluxschemaTypeReactivePulsarListener

以下侦听器使用复杂类型的 Spring 消息信封:Message

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。Flux<MessageResult<Void>>MessageUtilsMessageResult

6.1.2. 配置 - 应用程序属性

侦听器依赖于 来创建和管理它用来使用消息的底层 Pulsar 消费者。 Spring Boot 提供了这个消费者工厂,您可以通过指定 spring.pulsar.consumer.* 应用程序属性来进一步配置它。出厂时的大多数已配置属性都将在侦听器中得到遵守,但以下情况除外ReactivePulsarConsumerFactory

该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}

6.1.3. 消费者定制

您可以指定 a 来配置底层 Pulsar 消费者构建器,该构建器最终构建侦听器用于接收消息的消费者。ReactivePulsarListenerMessageConsumerBuilderCustomizer

请谨慎使用,因为这可以完全访问使用者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何将订阅的初始位置设置为主题上最早的消息。

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@ReactivePulsarListenerReactivePulsarListenerMessageConsumerBuilderCustomizer

您还可以使用定制器将 Pulsar 使用者属性直接提供给使用者构建器。 如果您不想使用前面提到的引导配置属性,或者具有多种配置不同的方法,则这很方便。ReactivePulsarListener

以下定制器示例使用直接的 Pulsar 使用者属性:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 使用者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer

6.2. 指定模式信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 . 对于非基元类型,如果未在注解中明确指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarListenerSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。

6.2.1. 自定义模式映射

作为在复杂类型上指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在侦听器上设置架构,因为框架会使用传入消息类型咨询解析器。ReactivePulsarListener

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,就不需要在侦听器上设置架构,例如:

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}

6.3. 消息侦听器容器基础架构

在大多数情况下,我们建议直接使用注释来使用 Pulsar 主题,因为该模型涵盖了广泛的应用程序用例。 但是,了解内部的工作方式很重要。ReactivePulsarListenerReactivePulsarListener

当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息使用的核心。 在后台使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。ReactivePulsarListener

6.3.1. ReactivePulsarMessageListenerContainer

此消息侦听器容器的协定是通过其默认实现提供的,该协定创建一个响应式 Pulsar 使用者,并连接使用创建的使用者的反应式消息管道。ReactivePulsarMessageListenerContainer

6.3.2. ReactiveMessagePipeline

管道是底层 Apache Pulsar 反应式客户端的一项功能,它执行以反应式方式接收数据,然后将其移交给提供的消息处理程序的繁重工作。反应式消息侦听器容器实现要简单得多,因为管道处理大部分工作。

6.3.3. ReactivePulsarMessageHandler

“listener”方面由提供,其中提供了两个实现:ReactivePulsarMessageHandler

  • ReactivePulsarOneByOneMessageHandler- 逐条处理单个消息

  • ReactivePulsarStreamingHandler- 通过Flux

如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarListener

6.4. 并发

在流式处理模式下使用记录时,并发性自然会通过客户端实现中的底层反应式支持来实现。stream = true

但是,在逐个处理消息时,可以指定并发性以提高处理吞吐量。 只需将属性设置为 。 此外,当您可以确保消息按键排序,因此通过设置注释将消息发送到同一处理程序时。concurrency@ReactivePulsarListenerconcurrency > 1useKeyOrderedProcessing = "true"

同样,它做了繁重的工作,我们只是在它上面设置属性。ReactiveMessagePipeline

反应式与命令式

反应式容器中的并发性与其命令式容器中的并发性不同。 后者创建多个线程(每个线程都有一个 Pulsar 使用者),而前者将消息同时分派给反应式并行调度器上的多个处理程序实例。

反应式并发模型的一个优点是它可以与订阅一起使用,而命令式并发模型则不能。Exclusive

6.5. 脉冲星接头

Pulsar 消息元数据可以作为 Spring 消息头使用。 可以在 PulsarHeaders.java 中找到可用标头的列表。

6.5.1. 在 OneByOne 监听器中访问

以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的示例中,我们访问消息元数据的值以及名为 的自定义消息属性。 Spring 注解用于每个标题字段。messageIdfoo@Header

您也可以使用 Pulsar 作为包络来携带有效载荷。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用注释来检索它。 请注意,您还可以使用 Spring 消息信封来携带有效负载,然后使用 检索 Pulsar 标头。MessageHeaderMessage@Header

6.5.2. 在流式处理侦听器中访问

使用流式消息侦听器时,标头支持是有限的。 只有当包含 Spring 元素时,才会填充标头。 此外,Spring 注解不能用于检索数据。 您必须直接调用 Spring 消息上的相应方法来检索数据。Fluxorg.springframework.messaging.Message@Header

6.6. 消息确认

该框架自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号来执行 ack 或 nack 操作。 这与命令式对应物略有不同,除非方法抛出异常,否则信号被暗示为正信号。

6.6.1. OneByOne监听器

单条消息(又名 OneByOne)消息侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void>Mono.empty()Mono.error()

6.6.2. 流式处理侦听器

流式处理侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。它具有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>>MessageResultMessageResultacknowledgenegativeAcknowledgeMessageResult

6.7. 消息重新传递和错误处理

Apache Pulsar 为消息重新传递和错误处理提供了各种原生策略。 我们将看看它们,看看如何在 Spring 中将它们用于 Apache Pulsar。

6.7.1. 确认超时

默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值高于零,并且 Pulsar 使用者在该超时期限内未确认消息,则重新传递消息。

您可以通过消费者定制器将此属性直接指定为 Pulsar 使用者属性,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

6.7.2. 否定确认重新交付延迟

当确认为否定时,Pulsar 使用者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但可以通过使用者定制器进行更改,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

6.7.3. 死信主题

Apache Pulsar 允许应用程序在具有订阅类型的使用者上使用死信主题。 对于 和 订阅类型,此功能不可用。 其基本思想是,如果一条消息被重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解此功能的一些详细信息:SharedExclusiveFailover

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

首先,我们有一个特殊的 bean,它被命名为(它可以是任何你想要的名称)。 此 Bean 指定了许多内容,例如最大投放量(在本例中为 10)和死信主题的名称 — 在本例中。 如果未指定 DLQ 主题名称,则默认为 Pulsar 中的 DLQ 主题名称。 接下来,我们通过设置属性来提供此 Bean 名称。 请注意,订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供 1 秒的值。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,它会重试。 如果该循环持续十次(因为这是我们在 中的最大重新交付计数),则 Pulsar 使用者将消息发布到 DLQ 主题。 我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicydeadLetterPolicymy-dlq-topic<topicname>-<subscriptionname>-DLQReactivePulsarListenerdeadLetterPolicyReactivePulsarListenerSharedackTimeoutDeadLetterPolicyReactivePulsarListener

使用分区主题时有关 DLQ 主题的特别说明

如果主主题被分区,则在后台,每个分区都会被 Pulsar 视为一个单独的主题。 Pulsar 将 ,其中 代表分区号附加到主主题名称中。 问题在于,如果你没有指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布一个包含此信息的默认主题名称,例如:. 解决此问题的简单方法是始终提供 DLQ 主题名称。partition-<n>n`partition-<n>topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ

6.8. Pulsar Reader 支持

该框架支持通过 .ReactivePulsarReaderFactory

Spring Boot 提供了这个读取器工厂,可以使用任何 spring.pulsar.reader.* 应用程序属性进行配置。

侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void>Mono.empty()Mono.error()
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 它有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>>MessageResultMessageResult
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。Flux<MessageResult<Void>>MessageUtilsMessageResult
该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}
请谨慎使用,因为这可以完全访问使用者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@ReactivePulsarListenerReactivePulsarListenerMessageConsumerBuilderCustomizer
使用的属性是直接的 Pulsar 使用者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。
是邮件类的完全限定名称。message-type
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarListener

7. 主题解决

生成或使用消息时需要目标主题。 该框架在以下有序位置中查找以确定主题(在第一次找到时停止):

  • 用户指定

  • 消息类型默认

  • 全局默认值

通过默认机制之一找到主题时,无需在 produce or consume API 上指定该主题。

当找不到主题时,API 将相应地抛出异常。

7.1. 用户指定

传递到正在使用的 API 的主题具有最高优先级(例如。 或 )。PulsarTemplate.send("my-topic", myMessage)@PulsarListener(topics = "my-topic"

7.2. 消息类型默认

当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题的映射。

可以使用该属性配置映射。 以下示例用于配置在使用或生成消息时使用的默认主题:spring.pulsar.defaults.type-mappingsapplication.ymlFooBar

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
是邮件类的完全限定名称。message-type
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果应用程序可能发送消息,则应使用另一种方法指定主题。Publishernullnull

7.2.1. 自定义主题解析器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以通过证明自己的实现来替换默认解析程序,例如:

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

7.3. 生产者全局默认

(生产时)参考的最终位置是系统范围的生产者默认主题。 使用命令式 API 时通过属性配置,使用反应式 API 时通过属性配置。spring.pulsar.producer.topic-namespring.pulsar.reactive.sender.topic-name

7.4. 消费者全局默认

咨询的最终位置(使用时)是系统范围的使用者默认主题。 使用命令式 API 时通过 or 属性配置它,使用反应式 API 时通过 or 属性之一配置它。spring.pulsar.consumer.topicsspring.pulsar.consumer.topics-patternspring.pulsar.reactive.consumer.topicsspring.pulsar.reactive.consumer.topics-pattern

是邮件类的完全限定名称。message-type
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果应用程序可能发送消息,则应使用另一种方法指定主题。Publishernullnull