1. @ReactivePulsarListener
对于 Pulsar 使用者,我们建议最终用户应用程序使用注解。
要使用 ,您需要使用注释。
当您使用 Spring Boot 支持时,它会自动启用此注解并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。ReactivePulsarListener
ReactivePulsarListener
@EnableReactivePulsar
让我们重新审视一下我们在快速浏览部分看到的代码片段:ReactivePulsarListener
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void> Mono.empty() Mono.error() |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当不直接提供时,使用主题解析过程来确定目标主题。
同样,如果注释中未提供 ,则将使用自动生成的订阅名称。topics
subscriptionName
@ReactivePulsarListener
在前面显示的方法中,我们以 ,但未指定任何模式类型。
在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。ReactivePulsarListener
String
框架检测到您期望该类型,然后根据该信息推断架构类型,并将该架构提供给使用者。
该框架对所有基元类型执行此推理。
对于所有非基元类型,默认架构假定为 JSON。
如果复杂类型使用 JSON 以外的任何内容(如 AVRO 或 KEY_VALUE),则必须使用该属性在批注上提供架构类型。String
schemaType
此示例显示了如何从主题中使用复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
让我们再看看几种消费方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. 流式处理
以上所有内容都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的原因是因为支持背压的流式处理功能。
以下示例用于使用POJO流:ReactivePulsarListener
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
在这里,我们接收记录作为 Pulsar 消息。
此外,要在级别上启用流使用,您需要将注释上的属性设置为 。Flux
ReactivePulsarListener
stream
true
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。
它有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>> MessageResult MessageResult |
根据 中消息的实际类型,框架会尝试推断要使用的架构。
如果它包含复杂类型,则仍需要提供 on .Flux
schemaType
ReactivePulsarListener
以下侦听器使用复杂类型的 Spring 消息信封:Message
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。
Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。
它为 Spring 消息提供的功能与 Pulsar 消息的工厂方法集相同。Flux<MessageResult<Void>> MessageUtils MessageResult MessageUtils MessagResult |
不支持在org.apache.pulsar.client.api.Messages<T> @ReactivePulsarListener
|
1.2. 配置 - 应用程序属性
侦听器依赖于 来创建和管理它用来使用消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定 spring.pulsar.consumer.*
应用程序属性来进一步配置它。出厂时的大多数已配置属性都将在侦听器中得到遵守,但以下情况除外:ReactivePulsarConsumerFactory
该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name |
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-type subscriptionType = {} |
1.3. 带有AUTO_CONSUME的通用记录
如果无法提前知道 Pulsar 主题的 schema 类型,可以使用该 schema 类型来消费通用记录。
在这种情况下,主题使用与主题关联的架构信息将消息反序列化为对象。AUTO_CONSUME
GenericRecord
要使用通用记录,请将 on your 并使用 Pulsar 消息类型作为消息参数,如下所示。schemaType = SchemaType.AUTO_CONSUME
@ReactivePulsarListener
GenericRecord
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
API 允许访问字段及其关联值GenericRecord |
1.4. 消费者定制
您可以指定 a 来配置底层 Pulsar 消费者构建器,该构建器最终构建侦听器用于接收消息的消费者。ReactivePulsarListenerMessageConsumerBuilderCustomizer
请谨慎使用,因为这可以完全访问使用者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
例如,以下代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@ReactivePulsarListener ReactivePulsarListenerMessageConsumerBuilderCustomizer |
您还可以使用定制器将 Pulsar 使用者属性直接提供给使用者构建器。
如果您不想使用前面提到的引导配置属性,或者具有多种配置不同的方法,则这很方便。ReactivePulsarListener
以下定制器示例使用直接的 Pulsar 使用者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 使用者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer |
侦听器方法返回 a 以指示消息是否已成功处理。 表示成功(确认),表示失败(否定确认)。Mono<Void> Mono.empty() Mono.error() |
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。
它有一组静态工厂方法,可用于创建相应的实例。Flux<MessageResult<Void>> MessageResult MessageResult |
侦听器方法返回一个 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。
Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。
它为 Spring 消息提供的功能与 Pulsar 消息的工厂方法集相同。Flux<MessageResult<Void>> MessageUtils MessageResult MessageUtils MessagResult |
不支持在org.apache.pulsar.client.api.Messages<T> @ReactivePulsarListener
|
该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name |
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-type subscriptionType = {} |
API 允许访问字段及其关联值GenericRecord |
请谨慎使用,因为这可以完全访问使用者构建器,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@ReactivePulsarListener ReactivePulsarListenerMessageConsumerBuilderCustomizer |
使用的属性是直接的 Pulsar 使用者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer |
2. 指定架构信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 .
对于非基元类型,如果未在注解中明确指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarListener
Schema.JSON
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。 |
2.1. 自定义模式映射
作为在复杂类型上指定架构的替代方法,可以使用类型的映射配置架构解析程序。
这样就无需在侦听器上设置架构,因为框架会使用传入消息类型咨询解析器。ReactivePulsarListener
2.1.1. 配置属性
可以使用该属性配置架构映射。
以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappings
application.yml
User
Address
AVRO
JSON
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
是邮件类的完全限定名称。message-type |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。
下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:User
Address
AVRO
JSON
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的缺省模式信息的另一个选项是使用注释标记消息类。
可以通过注释上的属性指定架构信息。@PulsarMessage
schemaType
以下示例将系统配置为在生成或使用以下类型的消息时使用 JSON 作为默认架构:Foo
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置,就不需要在侦听器上设置架构,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。 |
是邮件类的完全限定名称。message-type |
3. 消息侦听器容器基础设施
在大多数情况下,我们建议直接使用注释来使用 Pulsar 主题,因为该模型涵盖了广泛的应用程序用例。
但是,了解内部的工作方式很重要。ReactivePulsarListener
ReactivePulsarListener
当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息使用的核心。
在后台使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。ReactivePulsarListener
3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的协定是通过其默认实现提供的,该协定创建一个响应式 Pulsar 使用者,并连接使用创建的使用者的反应式消息管道。ReactivePulsarMessageListenerContainer
3.2. ReactiveMessagePipeline
管道是底层 Apache Pulsar 反应式客户端的一项功能,它执行以反应式方式接收数据,然后将其移交给提供的消息处理程序的繁重工作。反应式消息侦听器容器实现要简单得多,因为管道处理大部分工作。
3.3. ReactivePulsarMessage处理程序
“listener”方面由提供,其中提供了两个实现:ReactivePulsarMessageHandler
-
ReactivePulsarOneByOneMessageHandler
- 逐条处理单个消息 -
ReactivePulsarStreamingHandler
- 通过Flux
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarListener |
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。ReactivePulsarListener |
4. 并发
在流式处理模式下使用记录时,并发性自然会通过客户端实现中的底层反应式支持来实现。stream = true
但是,在逐个处理消息时,可以指定并发性以提高处理吞吐量。
只需将属性设置为 。
此外,当您可以确保消息按键排序,因此通过设置注释将消息发送到同一处理程序时。concurrency
@ReactivePulsarListener
concurrency > 1
useKeyOrderedProcessing = "true"
同样,它做了繁重的工作,我们只是在它上面设置属性。ReactiveMessagePipeline
5. 脉冲星接头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可以在 PulsarHeaders.java 中找到可用标头的列表。
5.1. 在 OneByOne 监听器中访问
以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问消息元数据的值以及名为 的自定义消息属性。
Spring 注解用于每个标题字段。messageId
foo
@Header
您也可以使用 Pulsar 作为包络来携带有效载荷。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用注释来检索它。
请注意,您还可以使用 Spring 消息信封来携带有效负载,然后使用 检索 Pulsar 标头。Message
Header
Message
@Header
6. 消息确认
7. 邮件重新传递和错误处理
Apache Pulsar 为消息重新传递和错误处理提供了各种原生策略。 我们将看看它们,看看如何在 Spring 中将它们用于 Apache Pulsar。
7.1. 确认超时
默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值高于零,并且 Pulsar 使用者在该超时期限内未确认消息,则重新传递消息。
您可以通过消费者定制器将此属性直接指定为 Pulsar 使用者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
7.2. 否定确认重新交付延迟
当确认为否定时,Pulsar 使用者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但可以通过使用者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. 死信主题
Apache Pulsar 允许应用程序在具有订阅类型的使用者上使用死信主题。
对于 和 订阅类型,此功能不可用。
其基本思想是,如果一条消息被重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解此功能的一些详细信息:Shared
Exclusive
Failover
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
首先,我们有一个特殊的 bean,它被命名为(它可以是任何你想要的名称)。
此 Bean 指定了许多内容,例如最大投放量(在本例中为 10)和死信主题的名称 — 在本例中。
如果未指定 DLQ 主题名称,则默认为 Pulsar 中的 DLQ 主题名称。
接下来,我们通过设置属性来提供此 Bean 名称。
请注意,订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供 1 秒的值。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,它会重试。
如果该循环持续十次(因为这是我们在 中的最大重新交付计数),则 Pulsar 使用者将消息发布到 DLQ 主题。
我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicy
deadLetterPolicy
my-dlq-topic
<topicname>-<subscriptionname>-DLQ
ReactivePulsarListener
deadLetterPolicy
ReactivePulsarListener
Shared
ackTimeout
DeadLetterPolicy
ReactivePulsarListener
8. Pulsar 阅读器支持
Spring Boot 提供了这个读取器工厂,可以使用任何 spring.pulsar.reader.*
应用程序属性进行配置。