1. 脉冲星模板

在 Pulsar 生产者端,Spring Boot 自动配置提供了用于发布记录的功能。该模板实现一个名为的接口,并提供通过其协定发布记录的方法。PulsarTemplatePulsarOperations

这些发送 API 方法分为两类:和 。 这些方法使用 Pulsar 生产者上的同步发送功能来阻止调用。 它们返回在代理上保留消息后发布的消息。 方法调用是非阻塞的异步调用。 它们返回一个 ,您可以使用它来在消息发布后异步接收消息 ID。sendsendAsyncsendMessageIdsendAsyncCompletableFuture

对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。

1.1. 简单API

该模板为简单的发送请求提供了一些方法(“send”为前缀)。对于更复杂的发送请求,Fluent API 允许您配置更多选项。

1.2. Fluent API

该模板提供了一个流畅的构建器来处理更复杂的发送请求。

1.3. 消息定制

您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:TypedMessageBuilderCustomizer

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

1.4. 生产者定制

您可以指定一个来配置底层的 Pulsar 生产者构建器,该构建器最终构建用于发送传出消息的生产者。ProducerBuilderCustomizer

请谨慎使用,因为这样可以完全访问生产者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何禁用批处理和启用分块:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

另一个示例演示在将记录发布到分区主题时如何使用自定义路由。 在生成器上指定自定义实现,例如:MessageRouterProducer

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
请注意,使用 时,唯一有效的设置是 。MessageRouterspring.pulsar.producer.message-routing-modecustom

另一个示例显示了如何添加一个,该消息将在发布到代理之前拦截和更改生产者收到的消息:ProducerInterceptor

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

定制器将仅适用于用于发送操作的创建者。 如果要将定制器应用于所有生产者,则必须将其提供给生产者工厂,如全局生产者自定义中所述。

使用 Lambda 定制工具时,必须遵循“Lambda 定制工具注意事项”中描述的规则。
对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。
请谨慎使用,因为这样可以完全访问生产者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create
请注意,使用 时,唯一有效的设置是 。MessageRouterspring.pulsar.producer.message-routing-modecustom
使用 Lambda 定制工具时,必须遵循“Lambda 定制工具注意事项”中描述的规则。

2. 指定架构信息

如果使用 Java 基元类型,则框架会自动检测模式,并且无需指定任何模式类型即可发布数据。 对于非基元类型,如果在 上调用发送操作时未显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarTemplateSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和具有内联编码的KEY_VALUE。

2.1. 自定义模式映射

作为在对复杂类型调用发送操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需指定架构,因为框架会使用传出消息类型咨询解析程序。PulsarTemplate

2.1.1. 配置属性

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

2.1.2. 模式解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的缺省模式信息的另一个选项是使用注释标记消息类。 可以通过注释上的属性指定架构信息。@PulsarMessageschemaType

以下示例将系统配置为在生成或使用以下类型的消息时使用 JSON 作为默认架构:Foo

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

使用此配置后,无需在发送操作上设置指定架构。

2.2. 使用AUTO_SCHEMA生产

如果无法提前知道 Pulsar 主题的 schema 类型,您可以使用 AUTO_PRODUCE schema 将原始 JSON 或 Avro 负载安全地发布为。byte[]

在这种情况下,生产者将验证出站字节是否与目标主题的架构兼容。

只需在模板上指定一个架构,发送操作,如以下示例所示:Schema.AUTO_PRODUCE_BYTES()

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅支持 Avro 和 JSON 架构类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和具有内联编码的KEY_VALUE。
是邮件类的完全限定名称。message-type
这仅支持 Avro 和 JSON 架构类型。

3. 脉冲星生产者工厂

它依赖于 a 来实际创建基础生产者。 Spring Boot 自动配置还提供了此生产者工厂,您可以通过指定任何 spring.pulsar.producer.* 应用程序属性来进一步配置该工厂。PulsarTemplatePulsarProducerFactory

如果在直接使用生产者工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarTemplate

3.1. 全局生产者定制

该框架提供了合约,允许您配置用于构造每个生产者的底层构建器。 若要自定义所有生产者,可以将定制器列表传递到构造函数中。 使用多个定制器时,将按它们在列表中的显示顺序应用它们。ProducerBuilderCustomizerPulsarProducerFactory

如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其注解排序。PulsarProducerFactory@Order

如果只想将定制器应用于单个生产者,则可以使用 Fluent API 并在发送时指定定制器

如果在直接使用生产者工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarTemplate
如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其注解排序。PulsarProducerFactory@Order

4. Pulsar 生产者缓存

每个底层的 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建生产者,生产者工厂会缓存其创建的生产者。 它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。 缓存键由足够的信息组成,以确保调用方在后续创建请求中返回相同的生产者。

此外,您可以通过指定任何spring.pulsar.producer.cache.*应用程序属性来配置缓存设置。

4.1. Lambda 定制工具的注意事项

任何用户提供的生产者定制器也包含在缓存键中。 由于缓存键依赖于 的有效实现,因此在使用 Lambda 定制程序时必须小心。equals/hashCode

统治:且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode

为了阐明上述规则,我们将看几个例子。 在以下示例中,定制器定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。此外,它不需要闭包之外的变量。因此,它将匹配为缓存键。sendUser

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在下一种情况下,自定义器被定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。但是,它需要一个闭包之外的变量。因此,它不会与缓存键匹配。sendUser

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在最后一个示例中,定制器被定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。虽然它确实使用变量名称,但它不是源自其闭包之外,因此作为缓存键匹配。 这说明变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。sendUser

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用同一实例),或者它需要在其闭包之外定义变量,则您必须为定制器实施提供有效的实现。equals/hashCode
如果不遵循这些规则,则创建者缓存将始终丢失,并且应用程序性能将受到负面影响。
统治:且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用同一实例),或者它需要在其闭包之外定义变量,则您必须为定制器实施提供有效的实现。equals/hashCode
如果不遵循这些规则,则创建者缓存将始终丢失,并且应用程序性能将受到负面影响。

5. 拦截生产者上的消息

添加 a 可以让您在将消息发布到代理之前拦截和更改生产者收到的消息。 为此,可以将侦听器列表传递到构造函数中。 使用多个侦听器时,它们的应用顺序是它们在列表中的出现顺序。ProducerInterceptorPulsarTemplate

如果使用 Spring Boot 自动配置,则可以将拦截器指定为 Beans。 它们会自动传递给 . 拦截器的排序是通过使用如下注释来实现的:PulsarTemplate@Order

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果您不使用启动器,则需要自行配置和注册上述组件。
如果您不使用启动器,则需要自行配置和注册上述组件。