消息消费
1. @ReactivePulsarListener
对于 Pulsar 消费者,我们建议最终用户应用程序使用 Annotation。
要使用 ,您需要使用注释。
当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。ReactivePulsarListener
ReactivePulsarListener
@EnableReactivePulsar
让我们重新访问一下我们在快速浏览部分看到的代码片段:ReactivePulsarListener
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
listener 方法返回 a 以指示消息是否已成功处理。 表示成功 (确认) 并指示失败 (否定确认)。Mono<Void> Mono.empty() Mono.error() |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当未直接提供时,将使用主题解析过程来确定目标主题。
同样,当 annotation 上未提供 the 时,将使用自动生成的订阅名称。topics
subscriptionName
@ReactivePulsarListener
在前面显示的方法中,我们以 形式接收数据,但我们没有指定任何架构类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。ReactivePulsarListener
String
框架检测到您期望的类型,然后根据该信息推断架构类型,并将该架构提供给使用者。
框架对所有基元类型执行此推理。
对于所有非基元类型,默认架构假定为 JSON。
如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用属性在注释上提供架构类型。String
schemaType
此示例显示了我们如何使用 topic 中的复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
让我们看看我们可以消费的更多方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息传递信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. 流式处理
以上都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的理由是具有背压支持的流功能。
以下示例用于使用 POJO 流:ReactivePulsarListener
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
在这里,我们接收 Pulsar 消息的记录。
此外,要在级别启用流使用,您需要将注释上的属性设置为 .Flux
ReactivePulsarListener
stream
true
listener 方法返回一个 where每个元素表示一个已处理的消息,并保存消息 ID、值以及它是否被确认。
它有一组静态工厂方法,可用于创建适当的实例。Flux<MessageResult<Void>> MessageResult MessageResult |
根据 中消息的实际类型,框架会尝试推断要使用的架构。
如果它包含复杂类型,您仍然需要提供 on .Flux
schemaType
ReactivePulsarListener
以下侦听器使用具有 complex type 的 Spring 消息传递信封:Message
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
listener 方法返回一个 where每个元素表示一个已处理的消息,并保存消息 ID、值以及它是否被确认。
Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。
它为 Spring 消息提供的功能与 Pulsar 消息的工厂方法集相同。Flux<MessageResult<Void>> MessageUtils MessageResult MessageUtils MessagResult |
不支持在org.apache.pulsar.client.api.Messages<T> @ReactivePulsarListener
|
1.2. 配置 - 应用程序属性
侦听器依赖于 来创建和管理它用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.*
应用程序属性来进一步配置它。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外:ReactivePulsarConsumerFactory
该属性将被忽略,而是在未在 annotation 上指定时生成。spring.pulsar.consumer.subscription.name |
该属性将被忽略,而是从 Comments 上的值中获取。但是,您可以将 on the annotation 设置为改用 property 值作为默认值。spring.pulsar.consumer.subscription-type subscriptionType = {} |
1.3. 使用 AUTO_CONSUME 的通用记录
如果无法提前知道某个 Pulsar topic 的 schema 类型,可以使用 schema type 来消费泛型记录。
在这种情况下,主题使用与主题关联的架构信息将消息反序列化为对象。AUTO_CONSUME
GenericRecord
要使用泛型记录,请在 your 上设置 ,并使用 type 为 Pulsar 的消息作为 message 参数,如下所示。schemaType = SchemaType.AUTO_CONSUME
@ReactivePulsarListener
GenericRecord
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
API 允许访问字段及其关联值GenericRecord |
1.4. 消费者定制
你可以指定 a 来配置底层 Pulsar consumer builder,该 builder 最终构造侦听器用来接收消息的 consumer。ReactivePulsarListenerMessageConsumerBuilderCustomizer
请谨慎使用,因为这提供了对 Consumer Builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
例如,下面的代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@ReactivePulsarListener ReactivePulsarListenerMessageConsumerBuilderCustomizer |
您还可以使用定制器向 Consumer Builder 提供直接的 Pulsar Consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或有多个配置不同的方法,这将非常方便。ReactivePulsarListener
以下定制器示例使用直接 Pulsar 消费者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer |
2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 .
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarListener
Schema.JSON
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。 |
2.1. 自定义 Schema 映射
作为在 for complex types 上指定架构的替代方法,可以使用类型的映射配置架构解析程序。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。ReactivePulsarListener
2.1.1. 配置属性
可以使用属性配置架构映射。
以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappings
application.yml
User
Address
AVRO
JSON
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这是 message 类的完全限定名称。message-type |
2.1.2. Schema 解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:User
Address
AVRO
JSON
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认架构信息的另一个选项是使用注释标记消息类。
可以通过 annotation 上的属性指定 schema info。@PulsarMessage
schemaType
以下示例将系统配置为在生成或使用 type 为 messages 的 messages 时使用 JSON 作为默认模式:Foo
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
3. 消息侦听器容器基础设施
在大多数情况下,我们建议直接使用 Annotation 从 Pulsar 主题消费,因为该模型涵盖了广泛的应用程序用例。
但是,了解内部的工作原理很重要。ReactivePulsarListener
ReactivePulsarListener
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息使用的核心。
它在幕后使用消息侦听器容器基础设施来创建和管理底层 Pulsar 消费者。ReactivePulsarListener
3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的 Contract 通过其默认实现创建反应式 Pulsar Consumer,并连接使用创建的 Consumer 的反应式消息管道。ReactivePulsarMessageListenerContainer
3.2. ReactiveMessagePipeline
管道是底层 Apache Pulsar Reactive 客户端的一项功能,它以反应式方式完成接收数据,然后将其移交给提供的消息处理程序的繁重工作。反应式消息侦听器容器实现要简单得多,因为管道处理大部分工作。
3.3. ReactivePulsarMessageHandler
“listener” 方面由 提供,其中有两个提供的实现:ReactivePulsarMessageHandler
-
ReactivePulsarOneByOneMessageHandler
- 逐个处理单个消息 -
ReactivePulsarStreamingHandler
- 通过Flux
如果在直接使用侦听器容器时未指定主题信息,则使用与 相同的主题解析过程,但省略了 “Message type default” 步骤。ReactivePulsarListener |
4. 并发
当以流模式 () 使用记录时,并发性自然是通过客户端实现中的底层 Reactive 支持实现的。stream = true
但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。
只需将属性设置为 .
此外,当您可以确保消息按键排序,从而通过对 annotation 进行设置来发送到同一处理程序时。concurrency
@ReactivePulsarListener
concurrency > 1
useKeyOrderedProcessing = "true"
同样,它做了繁重的工作,我们只需设置它的属性。ReactiveMessagePipeline
5. Pulsar 标头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
5.1. 在 OneByOne Listener 中访问
以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问消息元数据的值以及名为 .
Spring 注解用于每个 header 字段。messageId
foo
@Header
你也可以使用 Pulsar 的 作为 envelope 来携带有效载荷。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用 Comments 来检索它。
请注意,您还可以使用 Spring 消息传递信封来携带有效负载,然后使用 检索 Pulsar 标头。Message
Header
Message
@Header
6. 消息鸣谢
7. 消息重新投递和错误处理
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。
7.1. 确认超时
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
你可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
7.2. 否定确认重新传递延迟
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认情况下,在一分钟内重新传送消息,但您可以通过使用者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. 死信主题
Apache Pulsar 允许应用程序在具有订阅类型的消费者上使用死信主题。
对于 和 订阅类型,此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:Shared
Exclusive
Failover
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
首先,我们有一个特殊的 bean for ,它被命名为 (它可以是你想要的任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 — ,在本例中。
如果不指定 DLQ 主题名称,则默认为 in Pulsar。
接下来,我们通过设置属性来提供此 bean 名称。
请注意,它的订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供 1 秒的值。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在 中的最大重新传递计数),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicy
deadLetterPolicy
my-dlq-topic
<topicname>-<subscriptionname>-DLQ
ReactivePulsarListener
deadLetterPolicy
ReactivePulsarListener
Shared
ackTimeout
DeadLetterPolicy
ReactivePulsarListener
8. Pulsar Reader 支持
Spring Boot 提供了这个 reader 工厂,可以使用任何 spring.pulsar.reader.*
应用程序属性进行配置。