1. 反应性脉冲星模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了用于发布记录的功能。该模板实现一个名为的接口,并提供通过其协定发布记录的方法。ReactivePulsarTemplate
ReactivePulsarOperations
该模板提供了接受单个消息并返回 .
它还提供了接受多条消息(以 ReactiveStreams 类型的形式)并返回 .Mono<MessageId>
Publisher
Flux<MessageId>
对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。 |
1.1. Fluent API
该模板提供了一个流畅的构建器来处理更复杂的发送请求。
1.2. 消息定制
您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:MessageSpecBuilderCustomizer
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
1.3. 发送器自定义
您可以指定一个来配置底层 Pulsar 发送方构建器,该构建器最终构建用于发送传出消息的发送方。ReactiveMessageSenderBuilderCustomizer
请谨慎使用,因为这样可以完全访问发送方构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
另一个示例演示在将记录发布到分区主题时如何使用自定义路由。
在发件人生成器上指定自定义实现,例如:MessageRouter
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
请注意,使用 时,唯一有效的设置是 。MessageRouter spring.pulsar.producer.message-routing-mode custom |
对于不包含主题参数的 API 变体,使用主题解析过程来确定目标主题。 |
请谨慎使用,因为这样可以完全访问发送方构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
请注意,使用 时,唯一有效的设置是 。MessageRouter spring.pulsar.producer.message-routing-mode custom |
2. 指定架构信息
如果使用 Java 基元类型,则框架会自动检测模式,并且无需指定任何模式类型即可发布数据。
对于非基元类型,如果在 上调用发送操作时未显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarTemplate
Schema.JSON
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和具有内联编码的KEY_VALUE。 |
2.1. 自定义模式映射
作为在对复杂类型调用发送操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。
这样就无需指定架构,因为框架会使用传出消息类型咨询解析程序。ReactivePulsarTemplate
2.1.1. 配置属性
可以使用该属性配置架构映射。
以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappings
application.yml
User
Address
AVRO
JSON
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
是邮件类的完全限定名称。message-type |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。
下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:User
Address
AVRO
JSON
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.2. 使用AUTO_SCHEMA生产
如果无法提前知道 Pulsar 主题的 schema 类型,您可以使用 AUTO_PRODUCE schema 将原始 JSON 或 Avro 负载安全地发布为。byte[]
在这种情况下,生产者将验证出站字节是否与目标主题的架构兼容。
只需在模板上指定一个架构,发送操作,如以下示例所示:Schema.AUTO_PRODUCE_BYTES()
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
这仅支持 Avro 和 JSON 架构类型。 |
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和具有内联编码的KEY_VALUE。 |
是邮件类的完全限定名称。message-type |
这仅支持 Avro 和 JSON 架构类型。 |
3. 反应式脉冲星发送器工厂
它依赖于 a 来实际创建基础发送方。ReactivePulsarTemplate
ReactivePulsarSenderFactory
Spring Boot 提供了这个发送方工厂,可以使用任何 spring.pulsar.producer.*
应用程序属性进行配置。
如果在直接使用发送方工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarTemplate |
3.1. 生产者缓存
每个底层的 Pulsar 生产者都会消耗资源。
为了提高性能并避免持续创建生产者,底层 Apache Pulsar 反应式客户端会缓存它创建的生产者。
它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。ReactiveMessageSenderCache
您可以通过指定任何spring.pulsar.producer.cache.*
应用程序属性来配置缓存设置。
如果在直接使用发送方工厂 API 时未指定主题信息,则使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarTemplate |