Pulsar 客户端

当您使用 Pulsar Spring Boot Starter 时,您将获得自动配置。PulsarClientspring-doc.cn

默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。 这可以通过将属性设置为其他值来调整。pulsar://localhost:6650spring.pulsar.client.service-urlspring-doc.cn

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何 spring.pulsar.client.* 应用程序属性来进一步配置客户端。spring-doc.cn

如果您不使用 Starter,则需要自行配置和注册。 有一个接受构建器定制器的 that,可用于帮助实现此目的。PulsarClientDefaultPulsarClientFactory

1. TLS 加密 (SSL)

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。 以下部分介绍如何配置 Pulsar 客户端以使用 TLS 加密 (SSL)。 先决条件是 Broker 也已配置为使用 TLS 加密。spring-doc.cn

Spring Boot 自动配置当前不支持任何 TLS/SSL 配置属性。 相反,你可以提供一个在 Pulsar 客户端构建器上设置必要属性的 a。 Pulsar 支持隐私增强邮件 (PEM) 和 Java 密钥库 (JKS) 证书格式。PulsarClientBuilderCustomizerspring-doc.cn

请按照以下步骤配置 TLS:spring-doc.cn

  1. 调整 Pulsar 客户端服务 url 以使用 scheme 和 TLS 端口(通常为 )。pulsar+ssl://6651spring-doc.cn

  2. 调整 admin 客户端服务 url 以使用方案和 TLS Web 端口(通常为 )。https://8443spring-doc.cn

  3. 提供客户端生成器定制器,用于在生成器上设置相关属性。spring-doc.cn

您可以在 Pulsar TLS 加密官方文档中找到有关上述内容的更多信息。spring-doc.cn

2. 身份验证

要连接到需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及指定插件所需的任何参数。 使用 Spring Boot 自动配置时,您可以通过配置属性设置插件和插件参数(在大多数情况下)。spring-doc.cn

您需要确保 下定义的名称与 auth 插件(通常是驼峰式大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何类型的松散绑定。spring.pulsar.client.authentication.param.*spring-doc.cn

例如,如果要为 auth 插件配置颁发者 URL,则必须使用 。 如果使用其他形式(如 或 ),则设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-urlspring-doc.cn

对 auth 参数使用环境变量通常是有问题的,因为在转换过程中会丢失区分大小写。 例如,考虑通过环境变量设置的以下 auth 参数:issuerUrlspring-doc.cn

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com

当 Spring Boot 加载此属性时,它将使用(小写)而不是预期的(驼峰式)。 您可以通过使用 env var 的值作为 application.yml 中相关 auth 属性的值来绕过此限制。 继续上面的示例:issuerurlissuerUrlspring-doc.cn

spring:
  pulsar:
    client:
      authentication:
        param:
          issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}

不使用 Spring Boot 自动配置时,你可以使用 来创建身份验证,然后直接在你提供给客户端工厂的客户端定制器中的 Pulsar 客户端构建器上进行设置。org.apache.pulsar.client.api.AuthenticationFactoryspring-doc.cn

以下列表显示了如何配置每个受支持的身份验证机制。spring-doc.cn

单击此处了解 Athenz
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
        param:
          tenantDomain: ...
          tenantService: ...
          providerDomain: ...
          privateKey: ...
          keyId: ...
这也需要 TLS 加密
单击此处获取令牌
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: some-token-goes-here
单击此处查看 Basic
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
        param:
          userId: ...
          password: ...
单击此处获取 OAuth2
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: ...
          privateKey: ...
          audience: ...
          scope: ...
点击这里查看 Sasl
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
        param:
          saslJaasClientSectionName: ...
          serverType: ...
单击此处获取 mTLS (PEM)
由于此选项需要 TLS 加密,而这已经要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");

请参阅有关 mTLS (PEM) 的 Pulsar 官方文档。spring-doc.cn

单击此处获取 mTLS (JKS)
由于此选项需要 TLS 加密,而这已经要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
Authentication auth = AuthenticationFactory.create(
        "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
        Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));

请参阅有关 mTLS (JKS) 的 Pulsar 官方文档。spring-doc.cn

您可以在官方 Pulsar 安全文档中找到有关每个支持插件及其所需属性的更多信息。spring-doc.cn

3. 自动集群级故障转移

Pulsar Spring Boot Starter 还可以自动配置 自动集群级故障转移PulsarClientspring-doc.cn

你可以使用 spring.pulsar.client.failover.* 应用程序属性来配置集群级故障转移。spring-doc.cn

以下示例为客户端配置一个主集群和两个备份集群。spring-doc.cn

application.yml
spring:
  pulsar:
    client:
      service-url: "pulsar://my.primary.server:6650"
      failover:
        delay: 30s
        switch-back-delay: 15s
        check-interval: 1s
        backup-clusters:
          - service-url: "pulsar://my.second.server:6650"
            authentication:
              plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
              param:
                token: "my-token"
          - service-url: "pulsar://my.third.server:6650"
除了客户端配置之外,代理上还必须满足一些先决条件才能使用此功能。

当不使用 Spring Boot 自动配置时,你可以提供一个 Client 端定制器,该定制器为 Client 端配置集群级故障转移。spring-doc.cn