反应式支持
该框架为几乎所有支持的功能提供了 Reactive 对应项。
如果你把
|
但是,尚不支持以下各项:
-
非共享订阅中的错误处理
-
通过以下方式访问 Pulsar 头文件
@Header
在流模式下 -
观察
1. 前言
我们建议对基于 Apache Pulsar 的 Spring 应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。
为此,您可以添加spring-pulsar-reactive-spring-boot-starter module 作为依赖项。 |
此参考的大部分内容都希望读者使用 starter,并在考虑这一点的情况下提供大多数配置说明。 但是,当说明特定于 Spring Boot Starters用法时,会努力调用。 |
2. 快速导览
我们将通过展示一个以 Reactive 方式生成和使用的示例 Spring Boot 应用程序,快速浏览一下 Spring for Apache Pulsar 中的 Reactive 支持。
这是一个完整的应用程序,不需要任何额外的配置,只要你在默认位置运行一个 Pulsar 集群 -localhost:6650
.
2.1. 依赖项
Spring Boot 应用程序只需要spring-pulsar-reactive-spring-boot-starter
Dependency。下面的清单分别显示了如何定义 Maven 和 Gradle 的依赖关系:
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar-reactive</artifactId>
<version>3.2.12</version>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.12'
}
使用
|
2.2. 应用程序代码
以下是应用程序源代码:
@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {
public static void main(String[] args) {
SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
}
@Bean
ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
}
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println("Reactive listener received: " + message);
return Mono.empty();
}
}
就是这样,只需几行代码,我们就有了一个可以工作的 Spring Boot 应用程序,它以响应式方式生成和消费来自 Pulsar 主题的消息。
启动后,应用程序会使用ReactivePulsarTemplate
要将消息发送到hello-pulsar-topic
.
然后,它从hello-pulsar-topic
使用@ReactivePulsarListener
.
简单性的关键要素之一是 Spring Boot Starters,它可以自动配置并为应用程序提供所需的组件 |
3. 设计
以下是需要牢记的几个关键设计点。
3.1. Apache Pulsar 反应式
反应式支持最终由 Apache Pulsar Reactive 客户端提供,其当前实现是围绕常规 Pulsar 客户端异步 API 的完全非阻塞适配器。 这意味着 Reactive 客户端需要常规客户端。
4. 反应式 Pulsar 客户端
当您使用 Reactive Pulsar Spring Boot Starter 时,您将获得ReactivePulsarClient
auto-configured的。
默认情况下,应用程序会尝试连接到位于pulsar://localhost:6650
.
这可以通过设置spring.pulsar.client.service-url
属性设置为不同的值。
该值必须是有效的 Pulsar 协议 URL |
还有许多其他应用程序属性(继承自改编的命令式客户端)可供配置。
请参阅spring.pulsar.client.*
应用程序属性。
4.1. 身份验证
要连接到需要身份验证的 Pulsar 集群,请按照与命令式客户端相同的步骤进行作。 同样,这是因为反应式 Client 端采用了处理所有安全配置的命令式 Client 端。
5. 消息制作
5.1. ReactivePulsarTemplate
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个ReactivePulsarTemplate
用于发布记录。该模板实现了一个名为ReactivePulsarOperations
并提供通过其 Contract 发布记录的方法。
该模板提供了 send 方法,这些方法接受单个消息并返回Mono<MessageId>
.
它还提供了接受多条消息的 send 方法(以 ReactiveStreams 的形式Publisher
type) 并返回一个Flux<MessageId>
.
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。 |
5.1.1. Fluent API
该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。
5.1.2. 消息自定义
您可以指定MessageSpecBuilderCustomizer
以配置传出消息。例如,以下代码演示如何发送键控消息:
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
5.1.3. 发件人自定义
您可以指定ReactiveMessageSenderBuilderCustomizer
配置底层 Pulsar sender 构建器,最终构建用于发送传出消息的 sender。
请谨慎使用,因为这会提供对 sender 构建器的完全访问权限,并调用其某些方法(例如create ) 可能会产生意想不到的副作用。 |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。
指定您的自定义MessageRouter
在 Sender Builder 上实施,例如:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
请注意,当使用MessageRouter ,则spring.pulsar.producer.message-routing-mode 是custom . |
5.2. 指定 Schema 信息
如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。
对于非原始类型,如果在对ReactivePulsarTemplate
,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从 type.
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
5.2.1. 自定义 Schema 映射
作为在ReactivePulsarTemplate
对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在框架使用传出消息类型咨询解析程序时指定架构。
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
要为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是 Message 类的完全限定名称。 |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
有了这个配置,就不需要设置 specify the schema on send operations 了。
5.3. ReactivePulsarSenderFactory
这ReactivePulsarTemplate
依赖于ReactivePulsarSenderFactory
以实际创建基础发件人。
Spring Boot 提供了这个发送者工厂,可以使用任何spring.pulsar.producer.*
应用程序属性。
5.3.1. 生产者缓存
每个底层 Pulsar 生产者都会消耗资源。
为了提高性能并避免持续创建 producer,ReactiveMessageSenderCache
在底层 Apache Pulsar Reactive 客户端缓存它创建的生产者。
它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。
您可以通过指定任何spring.pulsar.producer.cache.*
应用程序属性。
6. 消息消费
6.1. @ReactivePulsarListener
对于 Pulsar 消费者,我们建议最终用户应用程序使用ReactivePulsarListener
注解。
要使用ReactivePulsarListener
,您需要使用@EnableReactivePulsar
注解。
当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。
让我们重新审视一下ReactivePulsarListener
我们在 Quick-Tour 部分看到的代码片段:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
listener 方法返回一个Mono<Void> 以指示消息是否已成功处理。Mono.empty() 表示成功 (确认) 和Mono.error() 表示失败(否定确认)。 |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当topics
未直接提供,则使用主题解析过程来确定目标主题。
同样,当subscriptionName
未在@ReactivePulsarListener
annotation 将使用自动生成的订阅名称。
在ReactivePulsarListener
方法,我们接收数据为String
,但我们没有指定任何 schema 类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。
框架检测到您希望String
type,然后根据该信息推断 schema 类型。
然后,它将该架构提供给使用者。
对于 Java 中的所有原始类型,框架都会执行此推理。
对于任何复杂类型(例如 JSON、AVRO 等),框架无法进行此推理,用户需要使用schemaType
财产。
此示例显示了我们如何使用 topic 中的复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
请注意,添加了schemaType
属性ReactivePulsarListener
.
这是因为该库无法从提供的类型推断架构类型:Foo
.我们必须告诉框架要使用什么 schema。
让我们看看我们可以消费的更多方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息传递信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
6.1.1. 流
以上都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的理由是具有背压支持的流功能。
以下示例使用ReactivePulsarListener
要使用 POJO 流:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
在这里,我们以Flux
的 Pulsar 消息。
此外,要在ReactivePulsarListener
level 中,您需要设置stream
属性设置为true
.
listener 方法返回一个Flux<MessageResult<Void>> 其中,每个元素表示一条已处理的消息,并保存消息 ID、值以及是否被确认。
这MessageResult 有一组静态工厂方法,可用于创建适当的MessageResult 实例。 |
根据Flux
,框架会尝试推断要使用的 schema。
如果它包含复杂类型,您仍然需要提供schemaType
上ReactivePulsarListener
.
以下侦听器使用 Spring 消息传递Message
具有复杂类型的 envelope :
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
listener 方法返回一个Flux<MessageResult<Void>> 其中,每个元素表示一条已处理的消息,并保存消息 ID、值以及是否被确认。
SpringMessageUtils 有一组静态工厂方法,可用于创建适当的MessageResult 实例。 |
6.1.2. 配置 - 应用程序属性
侦听器依赖于ReactivePulsarConsumerFactory
创建和管理用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外:
这spring.pulsar.consumer.subscription.name 属性将被忽略,而是在未在 Comments 上指定时生成。 |
这spring.pulsar.consumer.subscription-type property 被忽略,而是取自 annotation 上的值。但是,您可以将subscriptionType = {} 以改用 Property 值作为默认值。 |
6.1.3. 消费者定制
您可以指定ReactivePulsarListenerMessageConsumerBuilderCustomizer
配置底层 Pulsar consumer builder,最终构造侦听器用来接收消息的 consumer。
请谨慎使用,因为这会提供对使用者构建器的完全访问权限,并调用其某些方法(例如create ) 可能会产生意想不到的副作用。 |
例如,下面的代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个@ReactivePulsarListener 和一个ReactivePulsarListenerMessageConsumerBuilderCustomizer bean 已注册,则将自动应用定制器。 |
您还可以使用定制器向 Consumer Builder 提供直接的 Pulsar Consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或具有多个ReactivePulsarListener
方法。
以下定制器示例使用直接 Pulsar 消费者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是spring.pulsar.consumer Spring Boot 配置属性 |
6.2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在ReactivePulsarListener
.
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从 type.
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
6.2.1. 自定义 Schema 映射
作为在ReactivePulsarListener
对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
要为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是 Message 类的完全限定名称。 |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
6.3. 消息侦听器容器基础设施
在大多数情况下,我们建议使用ReactivePulsarListener
注解直接用于从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,了解如何作非常重要ReactivePulsarListener
在内部工作。
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息使用的核心。
这ReactivePulsarListener
在幕后使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。
6.3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的协定是通过ReactivePulsarMessageListenerContainer
其默认实现会创建一个反应式 Pulsar 消费者,并连接一个使用创建的消费者的反应式消息管道。
6.4. 并发
在流式处理模式下使用记录 (stream = true
) 并发性自然而然地通过客户端实现中的底层 Reactive 支持来实现。
但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。
只需将concurrency
属性@ReactivePulsarListener
.
此外,当concurrency > 1
你可以确保消息按键排序,从而通过设置useKeyOrderedProcessing = "true"
在注释上。
同样,ReactiveMessagePipeline
执行繁重的工作,我们只需为其设置 Properties。
6.5. Pulsar 头文件
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
6.5.1. 在 OneByOne Listener 中访问
以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问messageId
message 元数据以及名为foo
.
Spring@Header
annotation 用于每个 Header 字段。
您还可以使用 Pulsar 的Message
作为 envelope 来承载负载。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用Header
注解。
请注意,您还可以使用 Spring 消息传递Message
envelope 来携带 payload,然后使用@Header
.
6.6. 消息确认
框架会自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号执行 ack 或 nack作。 这与它的命令式对应项略有不同,在命令式中,除非方法引发异常,否则信号被暗示为正。
6.7. 消息重新投递和错误处理
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。
6.7.1. 鸣谢超时
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
你可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
6.7.2. 否定确认重新传递延迟
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认情况下,在一分钟内重新传送消息,但您可以通过使用者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
6.7.3. Dead Letter 主题
Apache Pulsar 允许应用程序在消费者上使用死信主题,并带有Shared
订阅类型。
对于Exclusive
和Failover
订阅类型,则此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),则一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
首先,我们有一个特殊的 beanDeadLetterPolicy
,它被命名为deadLetterPolicy
(您可以根据需要使用任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在本例中。
如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在 Pulsar 中。
接下来,我们将此 bean 名称提供给ReactivePulsarListener
通过设置deadLetterPolicy
财产。
请注意,ReactivePulsarListener
订阅类型的值为Shared
,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供了一个ackTimeout
值为 1 秒。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在DeadLetterPolicy
),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个ReactivePulsarListener
侦听 DLQ 主题以在发布到 DLQ 主题时接收数据。
6.8. Pulsar Reader 支持
该框架支持以响应式方式使用 Pulsar Reader,通过ReactivePulsarReaderFactory
.
Spring Boot 提供了这个 reader 工厂,它可以配置任何spring.pulsar.reader.*
应用程序属性。
7. 主题解析
在生成或使用消息时,需要目标主题。 框架会查找以下有序位置以确定主题(在第一次查找时停止):
-
用户指定
-
消息类型默认
-
全局默认值
当通过默认机制之一找到主题时,无需在 produce 或 consume API 上指定主题。
当找不到主题时,API 将相应地引发异常。
7.1. 用户指定
传递到正在使用的 API 中的主题具有最高优先级(例如。PulsarTemplate.send("my-topic", myMessage)
或@PulsarListener(topics = "my-topic"
).
7.2. 消息类型默认
当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题映射。
映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
配置在消费或生成时使用的默认主题Foo
或Bar
消息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
这message-type 是 Message 类的完全限定名称。 |
如果消息(或Publisher input) 为null ,框架将无法从中确定主题。如果您的应用程序可能会发送null 消息。 |