1. @ReactivePulsarListener

对于 Pulsar 使用者,我们建议最终用户应用程序使用注解。 要使用 ,您需要使用注释。 当您使用 Spring Boot 支持时,它会自动启用此注解并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。ReactivePulsarListenerReactivePulsarListener@EnableReactivePulsar

让我们重新审视一下我们在快速浏览部分看到的代码片段:ReactivePulsarListener

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}
侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void>Mono.empty()Mono.error()

您还可以进一步简化此方法:

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在这种最基本的形式中,当不直接提供时,使用主题解析过程来确定目标主题。 同样,如果注释中未提供 ,则将使用自动生成的订阅名称。topicssubscriptionName@ReactivePulsarListener

在前面显示的方法中,我们以 ,但未指定任何模式类型。 在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。ReactivePulsarListenerString

框架检测到您期望该类型,然后根据该信息推断架构类型,并将该架构提供给使用者。 该框架对所有基元类型执行此推理。 对于所有非基元类型,默认架构假定为 JSON。 如果复杂类型使用 JSON 以外的任何内容(如 AVRO 或 KEY_VALUE),则必须使用该属性在批注上提供架构类型。StringschemaType

此示例显示了如何从主题中使用复杂类型:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

让我们再看看几种消费方式。

此示例直接使用 Pulsar 消息:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

此示例使用包装在 Spring 消息信封中的记录:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

1.1. 流式处理

以上所有内容都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的原因是因为支持背压的流式处理功能。

以下示例用于使用POJO流:ReactivePulsarListener

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);

在这里,我们接收记录作为 Pulsar 消息。 此外,要在级别上启用流使用,您需要将注释上的属性设置为 。FluxReactivePulsarListenerstreamtrue

侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 它有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>>MessageResultMessageResult

根据 中消息的实际类型,框架会尝试推断要使用的架构。 如果它包含复杂类型,则仍需要提供 on .FluxschemaTypeReactivePulsarListener

以下侦听器使用复杂类型的 Spring 消息信封:Message

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。 它为 Spring 消息提供的功能与 Pulsar 消息的工厂方法集相同。Flux<MessageResult<Void>>MessageUtilsMessageResultMessageUtilsMessagResult
不支持在org.apache.pulsar.client.api.Messages<T>@ReactivePulsarListener

1.2. 配置 - 应用程序属性

侦听器依赖于 来创建和管理它用来使用消息的底层 Pulsar 消费者。 Spring Boot 提供了这个消费者工厂,您可以通过指定 spring.pulsar.consumer.* 应用程序属性来进一步配置它。出厂时的大多数已配置属性都将在侦听器中得到遵守,但以下情况除外ReactivePulsarConsumerFactory

该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}

1.3. 带有AUTO_CONSUME的通用记录

如果无法提前知道 Pulsar 主题的 schema 类型,可以使用该 schema 类型来消费通用记录。 在这种情况下,主题使用与主题关联的架构信息将消息反序列化为对象。AUTO_CONSUMEGenericRecord

要使用通用记录,请将 on your 并使用 Pulsar 消息类型作为消息参数,如下所示。schemaType = SchemaType.AUTO_CONSUME@ReactivePulsarListenerGenericRecord

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
	return Mono.empty();
}
API 允许访问字段及其关联值GenericRecord

1.4. 消费者定制

您可以指定 a 来配置底层 Pulsar 消费者构建器,该构建器最终构建侦听器用于接收消息的消费者。ReactivePulsarListenerMessageConsumerBuilderCustomizer

请谨慎使用,因为这可以完全访问使用者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何将订阅的初始位置设置为主题上最早的消息。

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@ReactivePulsarListenerReactivePulsarListenerMessageConsumerBuilderCustomizer

您还可以使用定制器将 Pulsar 使用者属性直接提供给使用者构建器。 如果您不想使用前面提到的引导配置属性,或者具有多种配置不同的方法,则这很方便。ReactivePulsarListener

以下定制器示例使用直接的 Pulsar 使用者属性:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 使用者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer
侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void>Mono.empty()Mono.error()
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 它有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>>MessageResultMessageResult
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。 它为 Spring 消息提供的功能与 Pulsar 消息的工厂方法集相同。Flux<MessageResult<Void>>MessageUtilsMessageResultMessageUtilsMessagResult
不支持在org.apache.pulsar.client.api.Messages<T>@ReactivePulsarListener
该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}
API 允许访问字段及其关联值GenericRecord
请谨慎使用,因为这可以完全访问使用者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@ReactivePulsarListenerReactivePulsarListenerMessageConsumerBuilderCustomizer
使用的属性是直接的 Pulsar 使用者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer

2. 指定架构信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 . 对于非基元类型,如果未在注解中明确指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarListenerSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。

2.1. 自定义模式映射

作为在复杂类型上指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在侦听器上设置架构,因为框架会使用传入消息类型咨询解析器。ReactivePulsarListener

2.1.1. 配置属性

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

2.1.2. 模式解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的缺省模式信息的另一个选项是使用注释标记消息类。 可以通过注释上的属性指定架构信息。@PulsarMessageschemaType

以下示例将系统配置为在生成或使用以下类型的消息时使用 JSON 作为默认架构:Foo

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在侦听器上设置架构,例如:

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。
是邮件类的完全限定名称。message-type

3. 消息侦听器容器基础设施

在大多数情况下,我们建议直接使用注释来使用 Pulsar 主题,因为该模型涵盖了广泛的应用程序用例。 但是,了解内部的工作方式很重要。ReactivePulsarListenerReactivePulsarListener

当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息使用的核心。 在后台使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。ReactivePulsarListener

3.1. ReactivePulsarMessageListenerContainer

此消息侦听器容器的协定是通过其默认实现提供的,该协定创建一个响应式 Pulsar 使用者,并连接使用创建的使用者的反应式消息管道。ReactivePulsarMessageListenerContainer

3.2. ReactiveMessagePipeline

管道是底层 Apache Pulsar 反应式客户端的一项功能,它执行以反应式方式接收数据,然后将其移交给提供的消息处理程序的繁重工作。反应式消息侦听器容器实现要简单得多,因为管道处理大部分工作。

3.3. ReactivePulsarMessage处理程序

“listener”方面由提供,其中提供了两个实现:ReactivePulsarMessageHandler

  • ReactivePulsarOneByOneMessageHandler- 逐条处理单个消息

  • ReactivePulsarStreamingHandler- 通过Flux

如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarListener
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarListener

4. 并发

在流式处理模式下使用记录时,并发性自然会通过客户端实现中的底层反应式支持来实现。stream = true

但是,在逐个处理消息时,可以指定并发性以提高处理吞吐量。 只需将属性设置为 。 此外,当您可以确保消息按键排序,因此通过设置注释将消息发送到同一处理程序时。concurrency@ReactivePulsarListenerconcurrency > 1useKeyOrderedProcessing = "true"

同样,它做了繁重的工作,我们只是在它上面设置属性。ReactiveMessagePipeline

反应式与命令式

反应式容器中的并发性与其命令式容器中的并发性不同。 后者创建多个线程(每个线程都有一个 Pulsar 使用者),而前者将消息同时分派给反应式并行调度器上的多个处理程序实例。

反应式并发模型的一个优点是它可以与订阅一起使用,而命令式并发模型则不能。Exclusive

5. 脉冲星接头

Pulsar 消息元数据可以作为 Spring 消息头使用。 可以在 PulsarHeaders.java 中找到可用标头的列表。

5.1. 在 OneByOne 监听器中访问

以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的示例中,我们访问消息元数据的值以及名为 的自定义消息属性。 Spring 注解用于每个标题字段。messageIdfoo@Header

您也可以使用 Pulsar 作为包络来携带有效载荷。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用注释来检索它。 请注意,您还可以使用 Spring 消息信封来携带有效负载,然后使用 检索 Pulsar 标头。MessageHeaderMessage@Header

5.2. 在流式处理侦听器中访问

使用流式消息侦听器时,标头支持是有限的。 只有当包含 Spring 元素时,才会填充标头。 此外,Spring 注解不能用于检索数据。 您必须直接调用 Spring 消息上的相应方法来检索数据。Fluxorg.springframework.messaging.Message@Header

6. 消息确认

该框架自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号来执行 ack 或 nack 操作。 这与命令式对应物略有不同,除非方法抛出异常,否则信号被暗示为正信号。

6.1. OneByOne监听器

单条消息(又名 OneByOne)消息侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void>Mono.empty()Mono.error()

6.2. 流式处理侦听器

流式处理侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。它具有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>>MessageResultMessageResultacknowledgenegativeAcknowledgeMessageResult

7. 邮件重新传递和错误处理

Apache Pulsar 为消息重新传递和错误处理提供了各种原生策略。 我们将看看它们,看看如何在 Spring 中将它们用于 Apache Pulsar。

7.1. 确认超时

默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值高于零,并且 Pulsar 使用者在该超时期限内未确认消息,则重新传递消息。

您可以通过消费者定制器将此属性直接指定为 Pulsar 使用者属性,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

7.2. 否定确认重新交付延迟

当确认为否定时,Pulsar 使用者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但可以通过使用者定制器进行更改,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

7.3. 死信主题

Apache Pulsar 允许应用程序在具有订阅类型的使用者上使用死信主题。 对于 和 订阅类型,此功能不可用。 其基本思想是,如果一条消息被重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解此功能的一些详细信息:SharedExclusiveFailover

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

首先,我们有一个特殊的 bean,它被命名为(它可以是任何你想要的名称)。 此 Bean 指定了许多内容,例如最大投放量(在本例中为 10)和死信主题的名称 — 在本例中。 如果未指定 DLQ 主题名称,则默认为 Pulsar 中的 DLQ 主题名称。 接下来,我们通过设置属性来提供此 Bean 名称。 请注意,订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供 1 秒的值。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,它会重试。 如果该循环持续十次(因为这是我们在 中的最大重新交付计数),则 Pulsar 使用者将消息发布到 DLQ 主题。 我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicydeadLetterPolicymy-dlq-topic<topicname>-<subscriptionname>-DLQReactivePulsarListenerdeadLetterPolicyReactivePulsarListenerSharedackTimeoutDeadLetterPolicyReactivePulsarListener

使用分区主题时有关 DLQ 主题的特别说明

如果主主题被分区,则在后台,每个分区都会被 Pulsar 视为一个单独的主题。 Pulsar 将 ,其中 代表分区号附加到主主题名称中。 问题在于,如果你没有指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布一个包含此信息的默认主题名称,例如:. 解决此问题的简单方法是始终提供 DLQ 主题名称。partition-<n>n`partition-<n>topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ

8. Pulsar 阅读器支持

该框架支持通过 .ReactivePulsarReaderFactory

Spring Boot 提供了这个读取器工厂,可以使用任何 spring.pulsar.reader.* 应用程序属性进行配置。