目录

1. 前言

对于基于 Apache Pulsar 的应用程序,我们建议对 Spring 使用 Spring-Boot-First 方法,因为这极大地简化了事情。 为此,您可以将模块添加为依赖项。spring-pulsar-spring-boot-starter
本参考的大部分内容都希望读者使用启动器,并给出了大多数配置说明,并牢记这一点。 但是,当指令特定于 Spring Boot 入门器用法时,会努力调用。
对于基于 Apache Pulsar 的应用程序,我们建议对 Spring 使用 Spring-Boot-First 方法,因为这极大地简化了事情。 为此,您可以将模块添加为依赖项。spring-pulsar-spring-boot-starter
本参考的大部分内容都希望读者使用启动器,并给出了大多数配置说明,并牢记这一点。 但是,当指令特定于 Spring Boot 入门器用法时,会努力调用。

2. 快速浏览

我们将通过展示一个生产和消费的示例Spring Boot应用程序来快速浏览Apache Pulsar的Spring。 这是一个完整的应用程序,不需要任何额外的配置,只要您在默认位置 - 上运行一个 Pulsar 集群。localhost:6650

2.1. 依赖关系

Spring Boot 应用程序只需要依赖项。以下列表分别显示了如何定义 Maven 和 Gradle 的依赖项:spring-pulsar-spring-boot-starter

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <version>3.2.6</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar:3.2.6'
}

使用上述坐标时,请按如下方式更改:Version 0.2.x

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0'
}

2.2. 应用程序代码

以下列表显示了示例的 Spring Boot 应用程序案例:

@SpringBootApplication
public class PulsarBootHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(PulsarBootHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    @PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    void listen(String message) {
        System.out.println("Message Received: " + message);
    }
}

让我们快速浏览此应用程序的更高层次的详细信息。 在文档的后面,我们将更详细地看到这些组件。

在前面的示例中,我们严重依赖 Spring Boot 自动配置。 Spring Boot 为我们的应用程序自动配置多个组件。 它自动为应用程序提供生产者和使用者都使用的 。PulsarClient

Spring Boot 还会自动配置,我们将其注入应用程序并开始向 Pulsar 主题发送记录。 应用程序将消息发送到名为 的主题。 请注意,应用程序不会指定任何架构信息,因为 Spring for Apache Pulsar 库会自动从您发送的数据类型推断架构类型。PulsarTemplatehello-pulsar

我们使用注释从发布数据的主题中消费。 是一个方便的注解,用于包装 Spring for Apache Pulsar 的消息监听器容器基础设施。 在后台,它创建了一个消息侦听器容器来创建和管理 Pulsar 消费者。 与常规的 Pulsar 使用者一样,使用时的默认订阅类型是模式。 当记录发布到主题中时,它们会使用它们并在控制台上打印它们。 在本例中,该框架还从该方法用作有效负载的数据类型 — 中推断出使用的架构类型。PulsarListenerhello-pulsarPulsarListenerPulsarListenerExclusivehello-pulsarPulsarlistenerPulsarListnerString

使用上述坐标时,请按如下方式更改:Version 0.2.x

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0'
}

3. Pulsar 客户端

当您使用 Pulsar Spring Boot Starter 时,您将获得自动配置。PulsarClient

默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。 这可以通过将属性设置为不同的值来调整。pulsar://localhost:6650spring.pulsar.client.service-url

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何 spring.pulsar.client.* 应用程序属性来进一步配置客户端。

如果您不使用启动器,则需要自行配置和注册。 有一个接受构建器定制器,可用于帮助解决此问题。PulsarClientDefaultPulsarClientFactory

3.1. TLS加密(SSL)

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务进行通信。 下面介绍如何配置 Pulsar 客户端使用 TLS 加密 (SSL)。 先决条件是代理也已配置为使用 TLS 加密。

Spring Boot 自动配置当前不支持任何 TLS/SSL 配置属性。 你可以改为提供一个在 Pulsar 客户端构建器上设置必要属性的属性。 Pulsar 支持隐私增强邮件 (PEM) 和 Java KeyStore (JKS) 证书格式。PulsarClientBuilderCustomizer

按照以下步骤配置 TLS:

  1. 调整 Pulsar 客户端服务 url 以使用方案和 TLS 端口(通常为 )。pulsar+ssl://6651

  2. 调整管理客户端服务 URL 以使用方案和 TLS Web 端口(通常为 )。https://8443

  3. 提供客户端构建器定制器,用于设置构建器上的相关属性。

您可以在官方 Pulsar TLS 加密文档中找到有关上述内容的更多信息。

3.2. 身份验证

要连接到需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及指定插件所需的任何参数。 使用 Spring Boot 自动配置时,您可以通过配置属性设置插件和插件参数(在大多数情况下)。

您需要确保 下定义的名称与您的身份验证插件(通常是驼峰大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。spring.pulsar.client.authentication.param.*

例如,如果要配置身份验证插件的颁发者 URL,则必须使用 . 如果您使用其他形式,例如 或 ,则该设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-url

将环境变量用于身份验证参数通常是有问题的,因为在转换过程中会丢失区分大小写。 例如,考虑通过环境变量设置的以下身份验证参数:issuerUrl

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com

当Spring Boot加载此属性时,它将使用(小写)而不是预期的(驼峰大小写)。 您可以通过将 env var 的值用作application.yml中相关 auth 属性的值来绕过此限制。 继续上面的例子:issuerurlissuerUrl

spring:
  pulsar:
    client:
      authentication:
        param:
          issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}

当不使用 Spring Boot 自动配置时,您可以使用 创建身份验证,然后直接在提供给客户端工厂的客户端定制器中的 Pulsar 客户端构建器中设置它。org.apache.pulsar.client.api.AuthenticationFactory

以下清单显示了如何配置每个受支持的身份验证机制。

点击这里查看雅典娜
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
        param:
          tenantDomain: ...
          tenantService: ...
          providerDomain: ...
          privateKey: ...
          keyId: ...
这也需要 TLS 加密
点击这里获取令牌
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: some-token-goes-here
单击此处查看基本
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
        param:
          userId: ...
          password: ...
单击此处获取 OAuth2
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: ...
          privateKey: ...
          audience: ...
          scope: ...
单击此处查看 Sasl
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
        param:
          saslJaasClientSectionName: ...
          serverType: ...
单击此处获取 mTLS (PEM)
由于此选项需要 TLS 加密,而 TLS 加密已要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");

请参阅 mTLS (PEM) 的官方 Pulsar 文档。

单击此处获取 mTLS (JKS)
由于此选项需要 TLS 加密,而 TLS 加密已要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
Authentication auth = AuthenticationFactory.create(
        "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
        Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));

请参阅 mTLS (JKS) 上的官方 Pulsar 文档。

您可以在 Pulsar 官方安全文档中找到有关每个支持插件及其所需属性的更多信息。

该值必须是有效的 Pulsar 协议 URL
如果您不使用启动器,则需要自行配置和注册。 有一个接受构建器定制器,可用于帮助解决此问题。PulsarClientDefaultPulsarClientFactory

您需要确保 下定义的名称与您的身份验证插件(通常是驼峰大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。spring.pulsar.client.authentication.param.*

例如,如果要配置身份验证插件的颁发者 URL,则必须使用 . 如果您使用其他形式,例如 或 ,则该设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-url

将环境变量用于身份验证参数通常是有问题的,因为在转换过程中会丢失区分大小写。 例如,考虑通过环境变量设置的以下身份验证参数:issuerUrl

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com

当Spring Boot加载此属性时,它将使用(小写)而不是预期的(驼峰大小写)。 您可以通过将 env var 的值用作application.yml中相关 auth 属性的值来绕过此限制。 继续上面的例子:issuerurlissuerUrl

spring:
  pulsar:
    client:
      authentication:
        param:
          issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}
这也需要 TLS 加密
由于此选项需要 TLS 加密,而 TLS 加密已要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
由于此选项需要 TLS 加密,而 TLS 加密已要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory

4. 消息制作

4.1. 脉冲星模板

在 Pulsar 生产者端,Spring Boot 自动配置提供了用于发布记录的功能。该模板实现一个名为的接口,并提供通过其协定发布记录的方法。PulsarTemplatePulsarOperations

这些发送 API 方法分为两类:和 。 这些方法使用 Pulsar 生产者上的同步发送功能来阻止调用。 它们返回在代理上保留消息后发布的消息。 方法调用是非阻塞的异步调用。 它们返回一个 ,您可以使用它来在消息发布后异步接收消息 ID。sendsendAsyncsendMessageIdsendAsyncCompletableFuture

对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。

4.1.1. 简单API

该模板为简单的发送请求提供了一些方法(“send”为前缀)。对于更复杂的发送请求,Fluent API 允许您配置更多选项。

4.1.2. Fluent API

该模板提供了一个流畅的构建器来处理更复杂的发送请求。

4.1.3. 消息自定义

您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:TypedMessageBuilderCustomizer

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

4.1.4. 生产者自定义

您可以指定一个来配置底层的 Pulsar 生产者构建器,该构建器最终构建用于发送传出消息的生产者。ProducerBuilderCustomizer

请谨慎使用,因为这样可以完全访问生产者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何禁用批处理和启用分块:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

另一个示例演示在将记录发布到分区主题时如何使用自定义路由。 在生成器上指定自定义实现,例如:MessageRouterProducer

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
请注意,使用 时,唯一有效的设置是 。MessageRouterspring.pulsar.producer.message-routing-modecustom

另一个示例显示了如何添加一个,该消息将在发布到代理之前拦截和更改生产者收到的消息:ProducerInterceptor

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

定制器将仅适用于用于发送操作的创建者。 如果要将定制器应用于所有生产者,则必须将其提供给生产者工厂,如全局生产者自定义中所述。

使用 Lambda 定制工具时,必须遵循“Lambda 定制工具注意事项”中描述的规则。

4.2. 指定模式信息

如果使用 Java 基元类型,则框架会自动检测模式,并且无需指定任何模式类型即可发布数据。 对于非基元类型,如果在 上调用发送操作时未显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarTemplateSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。

4.2.1. 自定义模式映射

作为在对复杂类型调用发送操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需指定架构,因为框架会使用传出消息类型咨询解析程序。PulsarTemplate

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

使用此配置后,无需在发送操作上设置指定架构。

4.3. Pulsar 生产者工厂

它依赖于 a 来实际创建基础生产者。 Spring Boot 自动配置还提供了此生产者工厂,您可以通过指定任何 spring.pulsar.producer.* 应用程序属性来进一步配置该工厂。PulsarTemplatePulsarProducerFactory

如果在直接使用生产者工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarTemplate

4.3.1. 全局生产者定制

该框架提供了合约,允许您配置用于构造每个生产者的底层构建器。 若要自定义所有生产者,可以将定制器列表传递到构造函数中。 使用多个定制器时,将按它们在列表中的显示顺序应用它们。ProducerBuilderCustomizerPulsarProducerFactory

如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其注解排序。PulsarProducerFactory@Order

如果只想将定制器应用于单个生产者,则可以使用 Fluent API 并在发送时指定定制器

4.4. Pulsar 生产者缓存

每个底层的 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建生产者,生产者工厂会缓存其创建的生产者。 它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。 缓存键由足够的信息组成,以确保调用方在后续创建请求中返回相同的生产者。

此外,您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性来配置缓存设置。

4.4.1. Lambda 定制程序的注意事项

任何用户提供的生产者定制器也包含在缓存键中。 由于缓存键依赖于 的有效实现,因此在使用 Lambda 定制程序时必须小心。equals/hashCode

统治:且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode

为了阐明上述规则,我们将看几个例子。 在以下示例中,定制器定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。此外,它不需要闭包之外的变量。因此,它将匹配为缓存键。sendUser

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在下一种情况下,自定义器被定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。但是,它需要一个闭包之外的变量。因此,它不会与缓存键匹配。sendUser

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在最后一个示例中,定制器被定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。虽然它确实使用变量名称,但它不是源自其闭包之外,因此作为缓存键匹配。 这说明变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。sendUser

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用同一实例),或者它需要在其闭包之外定义变量,则您必须为定制器实施提供有效的实现。equals/hashCode
如果不遵循这些规则,则创建者缓存将始终丢失,并且应用程序性能将受到负面影响。

4.5. 在生产者上拦截消息

添加 a 可以让您在将消息发布到代理之前拦截和更改生产者收到的消息。 为此,可以将侦听器列表传递到构造函数中。 使用多个侦听器时,它们的应用顺序是它们在列表中的出现顺序。ProducerInterceptorPulsarTemplate

如果使用 Spring Boot 自动配置,则可以将拦截器指定为 Beans。 它们会自动传递给 . 拦截器的排序是通过使用如下注释来实现的:PulsarTemplate@Order

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果您不使用启动器,则需要自行配置和注册上述组件。
对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。
请谨慎使用,因为这样可以完全访问生产者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create
请注意,使用 时,唯一有效的设置是 。MessageRouterspring.pulsar.producer.message-routing-modecustom
使用 Lambda 定制工具时,必须遵循“Lambda 定制工具注意事项”中描述的规则。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。
是邮件类的完全限定名称。message-type
如果在直接使用生产者工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarTemplate
如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其注解排序。PulsarProducerFactory@Order
统治:且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用同一实例),或者它需要在其闭包之外定义变量,则您必须为定制器实施提供有效的实现。equals/hashCode
如果不遵循这些规则,则创建者缓存将始终丢失,并且应用程序性能将受到负面影响。
如果您不使用启动器,则需要自行配置和注册上述组件。

5. 消息消费

5.1. Pulsar 监听器

对于 Pulsar 使用者,我们建议最终用户应用程序使用注解。 要使用 ,您需要使用注释。 当您使用 Spring Boot 支持时,它会自动启用此注解并配置 所需的所有组件,例如消息侦听器基础设施(负责创建 Pulsar 使用者)。 使用 a 创建和管理 Pulsar 消费者,即它用来消费消息的底层 Pulsar 消费者。PulsarListenerPulsarListener@EnablePulsarPulsarListenerPulsarMessageListenerContainerPulsarConsumerFactory

Spring Boot 提供了这个消费者工厂,您可以通过指定 spring.pulsar.consumer.* 应用程序属性来进一步配置它。出厂时的大多数已配置属性都将在侦听器中得到遵守,但以下情况除外

该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}

让我们重新审视一下我们在快速浏览部分看到的代码片段:PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

您可以进一步简化此方法:

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

在这种最基本的形式中,当注释中未提供时,将使用自动生成的订阅名称。 同样,当不直接提供时,使用主题解析过程来确定目标主题。subscriptionName@PulsarListenertopics

在前面显示的方法中,我们以 ,但未指定任何模式类型。 在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。 框架检测到您期望该类型,然后根据该信息推断架构类型。 然后,它将该架构提供给使用者。 对于 Java 中的所有基元类型,框架都会进行此推理。 对于任何复杂类型(如 JSON、AVRO 等),框架无法执行此推理,用户需要使用该属性在注释上提供架构类型。PulsarListenerStringStringschemaType

下面的示例显示了另一种方法,该方法采用:PulsarListenerInteger

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

以下方法显示了如何从主题中使用复杂类型:PulsarListener

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

请注意,在 上添加了一个属性。 这是因为库无法从提供的类型推断架构类型: 。我们必须告诉框架要使用什么模式。schemaTypePulsarListenerFoo

让我们再看几种方法。

您可以直接使用 Pulsar 消息:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

以下示例使用 Spring 消息信封使用记录:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

现在让我们看看如何批量使用记录。 以下示例用于将批量记录用作 POJO:PulsarListener

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

请注意,在此示例中,我们将记录作为对象的集合 () 接收。 此外,若要在级别上启用批处理使用,需要将批注上的属性设置为 。ListPulsarListenerbatchtrue

根据所持有的实际类型,框架会尝试推断要使用的架构。 如果 包含复杂类型,则仍需要提供 on .ListListschemaTypePulsarListener

下面使用 Pulsar Java 客户端提供的信封:Message

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

以下示例使用具有 Spring 消息类型信封的批处理记录:Message

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最后,您还可以将 Pulsar 中的 holder 对象用于批处理侦听器:Messages

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

使用 时,可以直接在注解本身上提供 Pulsar 使用者属性。 如果您不想使用前面提到的引导配置属性或具有多种方法,这将非常方便。PulsarListenerPulsarListener

以下示例直接在 Pulsar 消费者属性上使用 :PulsarListener

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 使用者属性,而不是应用程序配置属性spring.pulsar.consumer

5.1.1. 自定义 ConsumerBuilder

您可以通过提供 of 类型来自定义任何可用的字段,然后使其可用,如下所示。ConsumerBuilderPulsarListenerConsumerBuilderCustomizer@BeanPulsarListenerConsumerBuilderCustomizerPulsarListener

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarListenerPulsarListenerConsumerBuilderCustomizer

5.2. 指定模式信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 . 对于非基元类型,如果未在注解中明确指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarListenerSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。

5.2.1. 自定义模式映射

作为在复杂类型上指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在侦听器上设置架构,因为框架会使用传入消息类型咨询解析器。PulsarListener

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,就不需要在侦听器上设置架构,例如:

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}

5.3. 访问 Pulsar 消费者对象

有时,您需要直接访问 Pulsar Consumer 对象。 以下示例演示如何获取它:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}
以这种方式访问对象时,请勿调用任何会通过调用任何接收方法来更改使用者的光标位置的操作。 所有此类操作都必须由容器完成。Consumer

5.4. Pulsar 消息监听器容器

现在我们通过 .现在让我们深入了解如何与底层 Pulsar 消费者交互的内部工作原理。 请记住,对于最终用户应用程序,在大多数情况下,我们建议在将 Spring 用于 Apache Pulsar 时,直接使用注释来使用 Pulsar 主题,因为该模型涵盖了广泛的应用程序用例。 但是,了解内部的工作方式很重要。本节将介绍这些详细信息。PulsarListenerPulsarListenerPulsarListenerPulsarListener

如前所述,当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息使用的核心。 在后台使用消息监听器容器基础设施来创建和管理 Pulsar 消费者。 Spring for Apache Pulsar 通过 . 此消息侦听器容器的缺省实现是通过 提供的。 顾名思义,包含消息侦听器。 容器创建 Pulsar 消费者,然后运行单独的线程来接收和处理数据。 数据由提供的消息侦听器实现处理。PulsarListenerPulsarMessageListenerContainerDefaultPulsarMessageListenerContainerPulsarMessageListenerContainer

消息侦听器容器使用使用者的方法批量使用数据。 一旦收到数据,它就会被移交给选定的消息侦听器实现。batchReceive

当您将 Spring 用于 Apache Pulsar 时,可以使用以下消息侦听器类型。

我们将在以下各节中查看有关这些不同消息侦听器的详细信息。

然而,在这样做之前,让我们仔细看看容器本身。

5.4.1. DefaultPulsarMessageListenerContainer

这是一个基于使用者的消息侦听器容器。 下面的列表显示了其构造函数:

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

它接收一个(用于创建使用者)和一个对象(包含有关容器属性的信息)。 具有以下构造函数:PulsarConsumerFactoryPulsarContainerPropertiesPulsarContainerProperties

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

您可以通过提供给消费者工厂的消费者财产或作为消费者财产提供主题信息。 以下示例使用 :PulsarContainerPropertiesDefaultPulsarMessageListenerContainer

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarListener

DefaultPulsarMessageListenerContainer仅创建单个使用者。 如果要通过多个线程管理多个使用者,则需要使用 .ConcurrentPulsarMessageListenerContainer

5.4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer具有以下构造函数:

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer允许您通过 setter 指定属性。 只有在非独占订阅(、 和 )上才允许并发大于 。 只有当您具有独享订阅模式时,您才能拥有并发默认值。concurrency1failoversharedkey-shared1

以下示例通过订阅的批注启用。concurrencyPulsarListenerfailover

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

在前面的侦听器中,假定主题有三个分区。 如果是未分区的主题,则将并发设置为 no 执行任何操作。除了主要的活动使用者之外,您还获得两个空闲使用者。 如果主题有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。 如果运行此 ,您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。my-topic3PulsarListener

当你以这种方式在分区主题上使用订阅时,Pulsar 会保证消息排序。Failover

以下列表显示了 的另一个示例,但具有 subscription 和 enabled。PulsarListenerSharedconcurrency

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

在前面的示例中,创建了五个不同的使用者(这一次,我们假设主题有五个分区)。PulsarListener

在这个版本中,没有消息排序,因为订阅不保证 Pulsar 中的任何消息排序。Shared

如果需要消息排序,但仍需要共享订阅类型,则需要使用订阅类型。Key_Shared

5.4.3. 消息消费

让我们看一下消息侦听器容器如何实现单记录和基于批处理的消息使用。

单条记录消耗

为了这次讨论,让我们重新审视我们的基本知识:PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

使用这种方法,我们必须要求 Spring for Apache Pulsar 每次都使用单个记录调用 listener 方法。 我们提到,消息侦听器容器使用使用者上的方法批量使用数据。 框架检测到 ,在本例中,接收到单个记录。这意味着,在每次调用该方法时,它都需要一个单一记录。 尽管消息侦听器容器会批量使用记录,但它会循环访问收到的批处理,并通过 的适配器调用侦听器方法。 如上一节所示,它从 Pulsar Java 客户端提供的扩展而来,它支持基本方法。PulsarListenerbatchReceivePulsarListenerPulsarRecordMessageListenerPulsarRecordMessageListenerMessageListenerreceived

批量消耗

以下示例显示了批量使用记录:PulsarListener

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

使用此类型的 时,框架会检测到您处于批处理模式。 由于它已经使用 Consumer 的方法批量接收数据,因此它通过 的适配器将整个批处理传递给侦听器方法。PulsarListenerbatchReceivePulsarBatchMessageListener

5.5. Pulsar 接头

Pulsar 消息元数据可以作为 Spring 消息头使用。 可以在 PulsarHeaders.java 中找到可用标头的列表。

5.5.1. 在基于单条记录的消费者中访问

以下示例显示了如何在使用单记录使用模式的应用程序中访问各种 Pulsar 标头:

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

在前面的示例中,我们访问了 和 消息元数据的值以及名为 的自定义消息属性。 Spring 注解用于每个标题字段。messageIdrawDatafoo@Header

您也可以使用 Pulsar 作为包络来携带有效载荷。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用注释来检索它。 请注意,您还可以使用 Spring 消息信封来携带有效负载,然后使用 检索 Pulsar 标头。MessageHeaderMessage@Header

5.5.2. 基于批量记录的消费者访问

在本节中,我们将了解如何在使用批处理消费者的应用程序中访问各种 Pulsar 标头:

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

在前面的示例中,我们将数据作为 . 在提取各种标头时,我们也这样做。 Spring for Apache Pulsar 确保头列表与数据列表相对应。List<String>List<>

在使用批处理侦听器并接收有效负载时,也可以以相同的方式提取标头,例如 、 或 。List<org.apache.pulsar.client.api.Message<?>org.apache.pulsar.client.api.Messages<?>org.springframework.messaging.Messsge<?>

5.6. 消息确认

当您将 Spring 用于 Apache Pulsar 时,除非应用程序选择退出,否则消息确认由框架处理。 在本节中,我们将详细介绍框架如何处理消息确认。

5.6.1. 消息 ACK 模式

Spring for Apache Pulsar 提供了以下确认消息的模式:

  • BATCH

  • RECORD

  • MANUAL

BATCH确认模式是默认模式,但您可以在消息侦听器容器上更改它。 在以下各节中,我们将了解当您同时使用单个版本和批处理版本时,确认是如何工作的,以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 使用者)。PulsarListener

5.6.2. 单记录模式下的自动消息确认

让我们重新审视我们基于以下基本单消息:PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

很自然地想知道,当你使用 时,确认是如何工作的,特别是如果你熟悉直接使用 Pulsar 消费者。 答案归结为消息侦听器容器,因为它是 Spring 中 Apache Pulsar 的中心位置,负责协调所有与消费者相关的活动。PulsarListener

假设您没有覆盖默认行为,则当您使用上述命令时,幕后会发生以下情况:PulsarListener

  1. 首先,监听器容器从 Pulsar 消费者那里批量接收消息。

  2. 收到的消息一次传递到一条消息。PulsarListener

  3. 当所有记录都传递到侦听器方法并成功处理时,容器将确认原始批处理中的所有消息。

这是正常流程。如果原始批次中的任何记录引发异常,Spring for Apache Pulsar 将单独跟踪这些记录。 当处理完批处理中的所有记录时,Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认(nack)所有失败的消息。 换言之,当使用 using 并使用默认的 ack 模式时,框架会等待从调用中接收到的所有记录处理成功,然后在 Pulsar 消费者上调用该方法。 如果任何特定记录在调用处理程序方法时引发异常,Spring for Apache Pulsar 将跟踪这些记录,并在处理整个批处理后单独调用这些记录。PulsarRecordMessageListenerBATCHbatchReceiveacknowledgenegativeAcknowledge

如果应用程序希望每条记录发生确认或否定确认,则可以启用确认模式。 在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果存在错误,则否定确认消息。 以下示例在 Pulsar 侦听器上启用 ack 模式:RECORDRECORD

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

5.6.3. 单记录模式下的手动消息确认

您可能并不总是希望框架发送确认,而是直接从应用程序本身发送确认。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

这里有几件事值得解释。首先,我们通过设置 来增强手动 ack 模式。 启用手动 ack 模式时,Spring for Apache Pulsar 允许应用程序注入对象。 该框架通过选择兼容的消息侦听器容器来实现这一点:用于基于单条记录的使用,它允许您访问对象。ackModePulsarListenerAcknowledgmentPulsarAcknowledgingMessageListenerAcknowledgment

该对象提供以下 API 方法:Acknowledgment

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

您可以使用 ack 模式将此对象注入到 while 中,然后调用相应的方法之一。AcknowledgmentPulsarListenerMANUAL

在前面的示例中,我们调用一个无参数方法。 这是因为该框架知道它当前在哪个下运行。 调用时,您不需要接收带有信封器的有效负载,而是在本例中使用目标类型 — 。 您还可以通过提供邮件 ID 来调用 的其他变体: 使用 时,必须使用信封接收有效负载。PulsarListeneracknowledgeMessageacknowledge()MessageStringacknowledgeacknowledge.acknowledge(message.getMessageId());acknowledge(messageId)Message<?>

与可能的确认类似,API 还提供了否定确认的选项。 请参阅前面显示的 nack 方法。Acknowledgment

您也可以直接调用 Pulsar 消费者:acknowledge

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

直接调用底层消费者时,需要自行进行错误处理。 使用 不需要这样做,因为框架可以为您做到这一点。 因此,在使用手动确认时,应使用对象方法。acknowledgeAcknowledgmentAcknowledgment

使用手动确认时,重要的是要了解框架完全不受任何确认的影响。 因此,在设计应用程序时,考虑正确的确认策略非常重要。

5.6.4. 批量消费中的自动消息确认

当您批量使用记录(请参阅 “消息 ACK 模式”)并使用默认的 ack 模式时,当整个批处理成功时,将确认整个批处理。 如果任何记录引发异常,则整个批次将被否定确认。 请注意,这可能与在生产者端批处理的批次不同。相反,这是从调用消费者返回的批次BATCHbatchReceive

请考虑以下批处理侦听器:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

当传入集合(在此示例中)中的所有消息都得到处理时,框架将确认所有这些消息。messages

在批处理模式下使用时,不是允许的 ack 模式。 这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。 在这种情况下,您需要使用确认模式。RECORDMANUAL

5.6.5. 批量消费中的手动消息确认

如上一节所示,在消息侦听器容器上设置 ack 模式时,框架不会执行任何确认,无论是正面的还是负面的。 完全由应用程序来处理这些问题。 设置 ack 模式后,Spring for Apache Pulsar 会选择一个兼容的消息侦听器容器:用于批量使用,这样您就可以访问对象。 以下是 API 中可用的方法:MANUALMANUALPulsarBatchAcknowledgingMessageListenerAcknowledgmentAcknowledgment

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

您可以使用 ack 模式将此对象注入到您的 while 中。 以下列表显示了基于批处理的侦听器的基本示例:AcknowledgmentPulsarListenerMANUAL

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

使用批处理侦听器时,消息侦听器容器无法知道它当前正在对哪条记录进行操作。 因此,若要手动确认,需要使用接受 a 或 a 的重载方法之一。 您也可以使用 for 批处理侦听器进行否定确认。acknowledgeMessageIdList<MessageId>MessageId

5.7. 消息重新传递和错误处理

现在我们已经了解了消息侦听器容器基础结构及其各种功能,现在让我们尝试了解消息重新传递和错误处理。 Apache Pulsar 为消息重新传递和错误处理提供了各种原生策略。我们来看看它们,看看我们如何在 Spring 中将它们用于 Apache Pulsar。PulsarListener

5.7.1. 指定消息重新传递的确认超时

默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值高于零,并且 Pulsar 使用者在该超时期限内未确认消息,则重新传递消息。

当您将 Spring 用于 Apache Pulsar 时,您可以通过消费者定制器或使用本机 Pulsar 属性在以下属性中设置此属性:ackTimeoutproperties@PulsarListener

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

当您指定 ack 超时时,如果消费者在 60 秒内没有发送确认,则 Pulsar 会将消息重新传递给消费者。

如果要为具有不同延迟的 ack 超时指定一些高级退避选项,可以执行以下操作:

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

在前面的例子中,我们为 Pulsar 指定了一个 bean,最小延迟为 1 秒,最大延迟为 10 秒,退避乘数为 2。 在发生初始 ack 超时后,消息重新传递将通过此回退 Bean 进行控制。 在本例中,我们通过将属性设置为实际的 Bean 名称 — 来为注释提供退避 Bean。RedeliveryBackoffPulsarListenerackTimeoutRedeliveryBackoffackTimeoutRedeliveryBackoff

5.7.2. 指定否定确认重新传送

当确认为否定时,Pulsar 使用者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但您可以通过使用者定制器或使用属性中的原生 Pulsar 属性进行更改:negativeAckRedeliveryDelayproperties@PulsarListener

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

您还可以通过提供 Bean 并在 PulsarProducer 上提供 Bean 名称作为属性来指定具有乘数的不同延迟和回退机制,如下所示:RedeliveryBackoffnegativeAckRedeliveryBackoff

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

5.7.3. 使用 Apache Pulsar 的死信主题进行消息重新传递和错误处理

Apache Pulsar 允许应用程序在具有订阅类型的使用者上使用死信主题。 对于 和 订阅类型,此功能不可用。 其基本思想是,如果一条消息被重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解此功能的一些详细信息:SharedExclusiveFailover

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

首先,我们有一个特殊的 bean,它被命名为(它可以是任何你想要的名称)。 此 Bean 指定了许多内容,例如最大投放量(在本例中为 10)和死信主题的名称 — 在本例中。 如果未指定 DLQ 主题名称,则默认为 Pulsar 中的 DLQ 主题名称。 接下来,我们通过设置属性来提供此 Bean 名称。 请注意,订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供 1 秒的值。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,它会重试。 如果该循环持续十次(因为这是我们在 中的最大重新交付计数),则 Pulsar 使用者将消息发布到 DLQ 主题。 我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicydeadLetterPolicymy-dlq-topic<topicname>-<subscriptionname>-DLQPulsarListenerdeadLetterPolicyPulsarListenerSharedackTimeoutDeadLetterPolicyPulsarListener

使用分区主题时有关 DLQ 主题的特别说明

如果主主题被分区,则在后台,每个分区都会被 Pulsar 视为一个单独的主题。 Pulsar 将 ,其中 代表分区号附加到主主题名称中。 问题在于,如果你没有指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布一个包含此信息的默认主题名称,例如:. 解决此问题的简单方法是始终提供 DLQ 主题名称。partition-<n>n`partition-<n>topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ

5.7.4. Spring for Apache Pulsar 的原生错误处理

如前所述,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。 如果应用程序需要对非共享订阅使用某些类似的功能,该怎么办? Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是订单保证的。 允许重新传递、DLQ 等有效地接收无序消息。 但是,如果应用程序可以接受,但更重要的是,非共享订阅需要此 DLQ 功能怎么办? 为此,Spring for Apache Pulsar 提供了一个 ,您可以在 Pulsar 中的任何订阅类型中使用它:、、 或 。PulsarConsumerErrorHandlerExclusiveFailoverSharedKey_Shared

当您从 Spring 使用 Apache Pulsar 时,请确保不要在侦听器上设置 ack 超时属性。PulsarConsumerErrorHandler

让我们通过检查一些代码片段来了解一些细节:

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

以豆子为例。 这将创建一个类型的 bean,并使用 Spring 为 Apache Pulsar 提供的开箱即用的默认实现: . 具有一个接受 a 和 a 的构造函数。 是具有以下 API 的功能接口:pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerPulsarMessageRecovererFactoryorg.springframework.util.backoff.BackoffPulsarMessageRecovererFactory

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

该方法接受一个 Pulsar 消费者并返回一个 ,这是另一个函数接口。 这是 API :recovererForConsumerPulsarMessageRecovererPulsarMessageRecoverer

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar 提供了一个 called 实现,该实现提供了一个默认实现,该实现可以通过将消息发送到死信主题 (DLT) 来恢复消息。 我们将此实现提供给前面的构造函数。 作为第二个参数,我们提供 . 您还可以提供 from Spring 以获得高级退避功能。 然后,我们为 的 be 提供这个 bean 名称作为 的属性。 该属性称为 。 每次该方法对消息失败时,都会重试该方法。 重试次数由提供的实现值控制。在我们的示例中,我们进行了 10 次重试(总共 11 次尝试 — 第一次尝试,然后是 10 次重试)。 用尽所有重试后,消息将发送到 DLT 主题。PulsarMessageRecovererFactoryPulsarDeadLetterPublishingRecovererDefaultPulsarConsumerErrorHandlerFixedBackOffExponentialBackoffPulsarConsumerErrorHandlerPulsarListenerpulsarConsumerErrorHandlerPulsarListenerBackoff

我们提供的实现使用用于将消息发布到 DLT 的 a。 在大多数情况下,从 Spring Boot 自动配置的相同功能就足够了,但需要注意分区主题。 使用分区主题和对主主题使用自定义消息路由时,必须使用不采用填充值为 for 的自动配置的其他消息路由 。 您可以将 a 与以下蓝图一起使用:PulsarDeadLetterPublishingRecovererPulsarTemplatePulsarTemplatePulsarTemplatePulsarProducerFactorycustompartitionmessage-routing-modePulsarConsumerErrorHandler

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

请注意,我们为 作为第二个构造函数参数提供了一个目标解析器。 如果未提供,则用作 DLT 主题名称。 使用此功能时,应通过设置目标解析程序而不是使用默认值来使用正确的目标名称。PulsarDeadLetterPublishingRecovererPulsarDeadLetterPublishingRecoverer<subscription-name>-<topic-name>-DLT>

使用单条记录消息侦听器时,就像我们对 所做的那样,如果您使用手动确认,请确保在引发异常时不要否定地确认消息。 相反,将异常重新抛回容器。否则,容器会认为消息是单独处理的,并且不会触发错误处理。PulsarConsumerErrorHnadler

最后,我们还有第二个接收来自 DLT 主题的消息。PulsarListener

到目前为止,在本节提供的示例中,我们只看到了如何与单个记录消息侦听器一起使用。 接下来,我们将了解如何在批处理侦听器上使用它。PulsarConsumerErrorHandler

5.7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器

首先,让我们看一个批处理方法:PulsarListener

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

再一次,我们为 bean 名称提供属性。 当您使用批处理侦听器(如前面的示例所示)并希望将 from Spring 用于 Apache Pulsar 时,您需要使用手动确认。 这样,您可以确认所有成功的单个消息。 对于失败的,您必须抛出一个带有失败的消息。 如果没有此例外,框架不知道如何处理故障。 重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。 如果再次失败,则重试,直到重试次数用尽,此时消息将发送到 DLT。 此时,容器会确认消息,并将侦听器与原始批处理中的后续消息一起移交。pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarBatchListenerFailedException

5.8. PulsarListener 上的消费者自定义

Spring for Apache Pulsar 提供了一种方便的方式来自定义由 . 应用程序可以为 提供 bean 。 下面是一个示例。PulsarListenerPulsarListenerConsumerBuilderCustomizer

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

然后,可以将此定制器 Bean 名称作为注释上的属性提供,如下所示。PuslarListener

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

在创建 Pulsar Consumer 之前,该框架通过 在 Consumer 构建器上检测提供的 Bean,并将此定制器应用于 Consumer 构建器。PulsarListener

如果您有多个方法,并且每个方法都有不同的定制规则,那么您应该创建多个定制器 Bean 并在每个 .PulsarListenerPulsarListener

5.9. 暂停和恢复消息侦听器容器

在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后再恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的功能。 当 Pulsar 消息监听器容器暂停时,容器为接收 Pulsar 消费者数据而进行的任何轮询都将暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。

要暂停或恢复侦听器容器,请先通过 Bean 获取容器实例,然后在容器实例上调用暂停/恢复 API - 如下图所示:PulsarListenerEndpointRegistry

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}
传递给的 id 参数是容器 id - 在暂停/恢复 .getListenerContainer@PulsarListener@PulsarListener

5.10. Pulsar Reader 支持

该框架支持通过 .PulsarReaderFactory

Spring Boot 提供了此读取器工厂,您可以通过指定任何 spring.pulsar.reader.* 应用程序属性来进一步配置该工厂。

5.10.1. PulsarReader 注解

虽然可以直接使用,但 Spring for Apache Pulsar 提供了注释,您可以使用它来快速阅读主题,而无需自己设置任何阅读器工厂。 这与背后的相同想法类似 这里是一个简单的例子。PulsarReaderFactoryPulsarReaderPulsarListener.

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

该属性是可选的,但最佳做法是提供对应用程序有意义的值。 如果未指定,将使用自动生成的 ID。 另一方面,和 属性是必需的。 该属性可以是单个主题,也可以是以逗号分隔的主题列表。 该属性指示读者从主题中的特定消息开始。 的有效值是 或 假设您希望读者开始从最早或最新的可用消息以外的主题任意读取消息。在这种情况下,您需要使用 a 来自定义,以便它知道从中开始的权利。idtopicsstartMessageIdtopicsstartMessageIdstartMessageIdearliestlatest.ReaderBuilderCustomizerReaderBuilderMessageId

5.10.2. 自定义 ReaderBuilder

您可以通过在 Spring for Apache Pulsar 中使用 来自定义任何可用的字段。 您可以提供 of 类型,然后将其提供给以下内容。ReaderBuilderPulsarReaderReaderBuilderCustomizer@BeanPulsarReaderReaderBuilderCustomizerPulsarReader

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarReaderPulsarReaderReaderBuilderCustomizer
该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}
使用的属性是直接的 Pulsar 使用者属性,而不是应用程序配置属性spring.pulsar.consumer
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarListenerPulsarListenerConsumerBuilderCustomizer
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。
是邮件类的完全限定名称。message-type
以这种方式访问对象时,请勿调用任何会通过调用任何接收方法来更改使用者的光标位置的操作。 所有此类操作都必须由容器完成。Consumer
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarListener
当你以这种方式在分区主题上使用订阅时,Pulsar 会保证消息排序。Failover
在这个版本中,没有消息排序,因为订阅不保证 Pulsar 中的任何消息排序。Shared
使用手动确认时,重要的是要了解框架完全不受任何确认的影响。 因此,在设计应用程序时,考虑正确的确认策略非常重要。
传递给的 id 参数是容器 id - 在暂停/恢复 .getListenerContainer@PulsarListener@PulsarListener
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarReaderPulsarReaderReaderBuilderCustomizer

6. 主题解析

生成或使用消息时需要目标主题。 该框架在以下有序位置中查找以确定主题(在第一次找到时停止):

  • 用户指定

  • 消息类型默认

  • 全局默认值

通过默认机制之一找到主题时,无需在 produce or consume API 上指定该主题。

当找不到主题时,API 将相应地抛出异常。

6.1. 用户指定

传递到正在使用的 API 的主题具有最高优先级(例如。 或 )。PulsarTemplate.send("my-topic", myMessage)@PulsarListener(topics = "my-topic"

6.2. 消息类型默认

当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题的映射。

可以使用该属性配置映射。 以下示例用于配置在使用或生成消息时使用的默认主题:spring.pulsar.defaults.type-mappingsapplication.ymlFooBar

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
是邮件类的完全限定名称。message-type
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果应用程序可能发送消息,则应使用另一种方法指定主题。Publishernullnull

6.2.1. 自定义主题解析器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以通过证明自己的实现来替换默认解析程序,例如:

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

6.3. 生产者全局默认

(生产时)参考的最终位置是系统范围的生产者默认主题。 使用命令式 API 时通过属性配置,使用反应式 API 时通过属性配置。spring.pulsar.producer.topic-namespring.pulsar.reactive.sender.topic-name

6.4. 消费者全局默认

咨询的最终位置(使用时)是系统范围的使用者默认主题。 使用命令式 API 时通过 or 属性配置它,使用反应式 API 时通过 or 属性之一配置它。spring.pulsar.consumer.topicsspring.pulsar.consumer.topics-patternspring.pulsar.reactive.consumer.topicsspring.pulsar.reactive.consumer.topics-pattern

是邮件类的完全限定名称。message-type
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果应用程序可能发送消息,则应使用另一种方法指定主题。Publishernullnull

7. 发布和使用分区主题

在以下示例中,我们发布到一个名为 的主题。 这是一个分区的主题,对于此示例,我们假定该主题已使用三个分区创建。hello-pulsar-partitioned

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前面的示例中,我们发布到一个分区主题,并希望将某个数据段发布到特定分区。 如果你把它留给 Pulsar 的默认值,它会遵循分区分配的循环模式,我们想覆盖它。 为此,我们提供了一个带有该方法的消息路由器对象。 考虑实现的三个消息路由器。 总是将数据发送到分区,发送到分区,并发送到分区。 另请注意,我们现在使用 that 返回 . 在运行应用程序时,我们还需要将 on the producer 设置为 ()。sendFooRouter0BarRouter1BuzzRouter2sendAsyncPulsarTemplateCompletableFuturemessageRoutingModeCustomPartitionspring.pulsar.producer.message-routing-mode

在消费者方面,我们使用具有独家订阅类型的订阅。 这意味着来自所有分区的数据最终都属于同一个使用者,并且没有排序保证。PulsarListener

如果我们希望每个分区都由一个不同的使用者使用,我们该怎么办? 我们可以切换到订阅模式,并添加三个单独的消费者:failover

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

当您遵循此方法时,单个分区始终由专用使用者使用。

同样,如果你想使用 Pulsar 的共享消费者类型,你可以使用订阅类型。 但是,当您使用该模式时,您将失去任何排序保证,因为单个使用者可能会在另一个使用者获得机会之前收到来自所有分区的消息。sharedshared

请看以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}