消息消费
1. Pulsar 监听器
对于 Pulsar 消费者,我们建议最终用户应用程序使用PulsarListener
注解。
要使用PulsarListener
,您需要使用@EnablePulsar
注解。
当您使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件PulsarListener
,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer
使用PulsarConsumerFactory
创建和管理 Pulsar 消费者,即用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。
让我们重新审视一下PulsarListener
我们在 Quick-Tour 部分看到的代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在这种最基本的形式中,当subscriptionName
未在@PulsarListener
annotation 将使用自动生成的订阅名称。
同样,当topics
未直接提供,则使用主题解析过程来确定目标主题。
在PulsarListener
方法,我们接收数据为String
,但我们没有指定任何 schema 类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。
框架检测到您希望String
type 的 Schema 类型,然后根据该信息推断 schema 类型,并将该 schema 提供给使用者。
框架对所有基元类型执行此推理。
对于所有非基元类型,默认架构假定为 JSON。
如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType
财产。
以下示例显示了另一个PulsarListener
方法,该方法采用Integer
:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下内容PulsarListener
method 展示了我们如何从 topic 中使用复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们看看更多方法。
你可以直接消费 Pulsar 消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
下面的示例使用 Spring 消息传递信封来使用记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量使用记录。
以下示例使用PulsarListener
要批量使用记录作为 POJO:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
请注意,在此示例中,我们将记录作为集合 (List
) 的对象。
此外,要在PulsarListener
level 中,您需要设置batch
属性设置为true
.
根据实际类型List
holds,则框架会尝试推断要使用的 schema。
如果List
包含除 JSON 之外的复杂类型,您仍然需要提供schemaType
上PulsarListener
.
下面使用Message
envelope 的 Pulsar Java 客户端提供的 Paypal 中:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用带有 Spring 消息传递信封的批处理记录Message
类型:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,您还可以使用Messages
holder 对象用于批处理监听器:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当您使用PulsarListener
,你可以直接在 Comments 本身上提供 Pulsar consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或具有多个PulsarListener
方法。
以下示例直接在PulsarListener
:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 消费者属性,而不是spring.pulsar.consumer 应用程序配置属性 |
1.1. 使用 AUTO_CONSUME 的通用记录
如果无法提前知道某个 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUME
schema 类型来使用通用记录。
在这种情况下,主题将消息反序列化为GenericRecord
对象使用与主题关联的架构信息。
要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME
在您的@PulsarListener
并使用GenericRecord
作为 message 参数,如下所示。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
这GenericRecord API 允许访问字段及其关联值 |
1.2. 自定义 ConsumerBuilder
您可以通过ConsumerBuilder
使用PulsarListenerConsumerBuilderCustomizer
通过提供@Bean
的类型PulsarListenerConsumerBuilderCustomizer
然后将其提供给PulsarListener
如下所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只有一个@PulsarListener 和一个PulsarListenerConsumerBuilderCustomizer bean 已注册,则将自动应用定制器。 |
2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在PulsarListener
.
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从 type.
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。 |
2.1. 自定义 Schema 映射
作为在PulsarListener
对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
要为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是 Message 类的完全限定名称。 |
2.1.2. Schema 解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认架构信息的另一个选项是使用@PulsarMessage
注解。
架构信息可以通过schemaType
属性。
以下示例将系统配置为在生成或使用 JSON 类型的消息时使用 JSON 作为默认架构Foo
:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. 访问 Pulsar Consumer Object
有时,你需要直接访问 Pulsar Consumer 对象。 以下示例显示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
访问Consumer 对象,请不要调用任何会通过调用任何 receive 方法来改变 Consumer 光标位置的作。
所有此类作都必须由容器完成。 |
4. Pulsar 消息监听器容器
现在我们已经看到了消费者端的基本交互PulsarListener
.现在让我们深入了解一下PulsarListener
与底层 Pulsar 消费者交互。
请记住,对于最终用户应用程序,在大多数情况下,我们建议使用PulsarListener
注解,以便在使用 Spring for Apache Pulsar 时直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,了解如何作非常重要PulsarListener
在内部工作。本节将介绍这些详细信息。
如前所述,当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。PulsarListener
在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。
Spring for Apache Pulsar 通过以下方式提供此消息侦听器容器的 ContractPulsarMessageListenerContainer
.
此消息侦听器容器的默认实现是通过DefaultPulsarMessageListenerContainer
.
顾名思义,PulsarMessageListenerContainer
包含消息侦听器。
容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。
数据由提供的消息侦听器实现处理。
消息侦听器容器使用使用者的batchReceive
方法。
收到数据后,数据将移交给选定的消息侦听器实现。
使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。
我们将在以下部分中看到有关这些不同消息侦听器的详细信息。
但是,在这样做之前,让我们仔细看看容器本身。
4.1. 默认 PulsarMessageListenerContainer
这是一个基于使用者的消息侦听器容器。 下面的清单显示了它的构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它接收一个PulsarConsumerFactory
(用于创建使用者)和PulsarContainerProperties
对象(包含有关容器属性的信息)。PulsarContainerProperties
具有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
您可以通过以下方式提供主题信息PulsarContainerProperties
或作为提供给 Consumer Factory 的 Consumer Property 的 Consumer 属性。
以下示例使用DefaultPulsarMessageListenerContainer
:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
DefaultPulsarMessageListenerContainer
仅创建一个使用者。
如果要通过多个线程管理多个使用者,则需要使用ConcurrentPulsarMessageListenerContainer
.
4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer
具有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer
允许您指定concurrency
property 的 setter 实现。
并发性超过1
仅允许在非独占订阅 (failover
,shared
和key-shared
).
您只能使用默认的1
用于并发。
以下示例启用concurrency
通过PulsarListener
注解failover
订阅。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前面的监听器中,假设 topicmy-topic
有三个分区。
如果是非分区主题,请将并发设置为3
什么都不做。除了主要的活跃使用者之外,您还会获得两个空闲使用者。
如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。
如果您运行此PulsarListener
,您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。
当您使用Failover subscription 这样,Pulsar 保证消息排序。 |
下面的清单显示了PulsarListener
,但使用Shared
subscription 和concurrency
启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前面的示例中,PulsarListener
创建五个不同的使用者(这一次,我们假设主题有 5 个分区)。
在这个版本中,没有消息排序,因为Shared 订阅不保证 Pulsar 中的任何消息排序。 |
如果您需要消息排序,但仍需要共享订阅类型,则需要使用Key_Shared
订阅类型。
4.3. 使用记录
让我们看看消息侦听器容器如何实现基于单记录和基于批处理的消息使用。
单记录消耗
让我们重新审视一下我们的基本PulsarListener
为了这次讨论:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
有了这个PulsarListener
方法,我们必须要求 Spring for Apache Pulsar 每次调用具有单个记录的监听器方法。
我们提到了消息侦听器容器使用batchReceive
方法。
框架检测到PulsarListener
,在本例中,接收单个记录。这意味着,在每次调用该方法时,它都需要一个 singe 记录。
尽管消息侦听器容器会批量使用记录,但它会遍历收到的批处理,并通过适配器调用PulsarRecordMessageListener
.
正如您在上一节中所看到的,PulsarRecordMessageListener
从MessageListener
由 Pulsar Java 客户端提供,并且支持基本的received
方法。
批量消耗
以下示例显示了PulsarListener
批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用这种类型的PulsarListener
,框架会检测到您处于批处理模式。
由于它已经使用 Consumer 的batchReceive
方法,它会通过适配器将整个 Batch 移交给 listener 方法PulsarBatchMessageListener
.
5. Pulsar 标头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
5.1. 在基于单记录的 Consumer 中访问
以下示例展示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar Headers:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的示例中,我们访问messageId
和rawData
message 元数据以及名为foo
.
Spring@Header
annotation 用于每个 Header 字段。
您还可以使用 Pulsar 的Message
作为 envelope 来承载负载。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用Header
注解。
请注意,您还可以使用 Spring 消息传递Message
envelope 来携带 payload,然后使用@Header
.
5.2. 基于批量记录的 Consumer 访问
在本节中,我们将了解如何在使用批处理 consumer 的应用程序中访问各种 Pulsar Headers:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前面的示例中,我们将数据作为List<String>
.
在提取各种 header 时,我们以List<>
也。
Spring for Apache Pulsar 确保 header 列表与 data 列表相对应。
当您使用批处理侦听器并接收有效负载时,您还可以以相同的方式提取标头List<org.apache.pulsar.client.api.Message<?>
,org.apache.pulsar.client.api.Messages<?>
或org.springframework.messaging.Messsge<?>
.
6. 消息鸣谢
当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍框架如何处理消息确认。
6.1. 消息 ACK 模式
Spring for Apache Pulsar 提供了以下确认消息的模式:
-
BATCH
-
RECORD
-
MANUAL
BATCH
确认模式是默认模式,但您可以在 Message Listener Container (消息侦听器) 容器上更改它。
在以下部分中,我们将了解在同时使用PulsarListener
以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 消费者)。
6.2. 单记录模式下的自动消息确认
让我们重新审视一下我们基本的基于PulsarListener
:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
很自然地想知道,当您使用PulsarListener
,特别是如果你熟悉直接使用 Pulsar consumer 的话。
答案归结为消息侦听器容器,因为这是 Spring for Apache Pulsar 中协调所有与消费者相关的活动的中心位置。
假设您没有覆盖默认行为,那么当您使用前面的PulsarListener
:
-
首先,侦听器容器从 Pulsar 消费者批量接收消息。
-
收到的消息将传递给
PulsarListener
一次一条消息。 -
当所有记录都传递给侦听器方法并成功处理时,容器将确认来自原始批处理的所有消息。
这是正常流程。如果原始批处理中的任何记录引发异常, Spring for Apache Pulsar 会单独跟踪这些记录。
当批处理中的所有记录都被处理完时, Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认 (nack) 所有失败的消息。
换句话说,当使用PulsarRecordMessageListener
和默认的 ACK 模式BATCH
时,框架会等待从batchReceive
调用以成功处理,然后调用acknowledge
方法。
如果任何特定记录在调用处理程序方法时引发异常,Spring for Apache Pulsar 会跟踪这些记录并单独调用negativeAcknowledge
在处理整个批次之后的这些记录上。
如果应用程序希望每条记录发生确认或否定确认,则RECORD
可以启用 ACK 模式。
在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果有错误,则否定确认消息。
以下示例启用RECORD
ack 模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. 单记录模式下的手动消息确认
您可能并不总是希望框架发送确认,而是直接从应用程序本身发送。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
有几件事值得在这里解释。首先,我们通过设置ackMode
上PulsarListener
.
启用手动 ack 模式时, Spring for Apache Pulsar 允许应用程序注入Acknowledgment
对象。
框架通过选择兼容的消息侦听器容器来实现此目的:PulsarAcknowledgingMessageListener
对于基于单个记录的消费,它允许您访问Acknowledgment
对象。
这Acknowledgment
object 提供了以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注入这个Acknowledgment
object 导入到PulsarListener
使用MANUAL
ack 模式,然后调用相应的方法之一。
在前面的PulsarListener
例如,我们将无参数acknowledge
方法。
这是因为框架知道哪个Message
它目前正在 Under.
调用acknowledge()
,则无需接收带有Message
enveloper' 的 URL,而是使用 target 类型 —String
,在此示例中。
您还可以调用acknowledge
通过提供消息 ID:acknowledge.acknowledge(message.getMessageId());
当您使用acknowledge(messageId)
,您必须使用Message<?>
信封。
与可能的确认类似,Acknowledgment
API 还提供了用于否定确认的选项。
请参阅前面显示的 nack 方法。
您还可以调用acknowledge
直接在 Pulsar 消费者上:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
调用acknowledge
直接在底层 Consumer 上,你需要自己做错误处理。
使用Acknowledgment
不需要这样做,因为框架可以为您执行此作。
因此,您应该使用Acknowledgment
Object 方法。
使用手动确认时,请务必了解框架完全不执行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。 |
6.4. 批量消费自动确认消息
当您批量使用记录时(请参阅“消息 ACK 模式”),并且使用默认的 ack 模式BATCH
,则当成功处理整个批处理时,将确认整个批处理。
如果任何记录引发异常,则对整个批处理进行否定确认。
请注意,这可能不是在生产者端批处理的同一批次。相反,这是从调用batchReceive
在消费者身上
请考虑以下批处理侦听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合中的所有邮件 (messages
在此示例中)进行处理,则框架会确认所有这些 ID。
在批处理模式下使用时,RECORD
不是允许的 ACK 模式。
这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。
在这种情况下,您需要使用MANUAL
确认模式。
6.5. 批量消费时手动消息确认
如上一节所示,当MANUAL
ack 模式,框架不做任何确认,无论是肯定的还是否定的。
处理此类问题完全取决于应用程序。
什么时候MANUAL
ack 模式,则 Spring for Apache Pulsar 会选择兼容的消息侦听器容器:PulsarBatchAcknowledgingMessageListener
用于批量使用,这使您可以访问Acknowledgment
对象。
以下是Acknowledgment
应用程序接口:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注入这个Acknowledgment
object 导入到PulsarListener
使用MANUAL
ACK 模式。
下面的清单显示了基于批处理的侦听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理侦听器时,消息侦听器容器无法知道它当前正在处理哪个记录。
因此,要手动确认,您需要使用一个重载的acknowledge
方法,该方法采用MessageId
或List<MessageId>
.
您也可以使用MessageId
对于批处理侦听器。
7. 消息重新投递和错误处理
现在我们已经看到了两者PulsarListener
以及消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重投和错误处理。
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。我们来看看它们,看看如何通过 Spring for Apache Pulsar 使用它们。
7.1. 指定消息重新投递的确认超时
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
使用 Spring for Apache Pulsar 时,可以通过消费者定制器或本机 Pulsar 设置此属性ackTimeout
属性在properties
属性@PulsarListener
:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeout=60s"})
public void listen(String s) {
...
}
当你指定 ack 超时时,如果 Consumer 在 60 秒内没有发送确认,则 Pulsar 会将消息重新投递给 Consumer。
如果要为具有不同延迟的 ack timeout 指定一些高级回退选项,可以执行以下作:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeout=60s" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们为 Pulsar 的RedeliveryBackoff
最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。
在发生初始 ack 超时后,通过此回退 Bean 控制消息重新传递。
我们将 backoff bean 提供给PulsarListener
注解,方法是将ackTimeoutRedeliveryBackoff
property 设置为实际的 bean 名称 —ackTimeoutRedeliveryBackoff
,在本例中。
7.2. 指定否定确认重新传递
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。
默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器或使用本机 Pulsar 进行更改negativeAckRedeliveryDelay
属性在properties
属性@PulsarListener
:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以通过提供RedeliveryBackoff
Bean 并提供 Bean 名称作为negativeAckRedeliveryBackoff
property 属性,如下所示:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. 使用 Apache Pulsar 的死信主题进行消息重投和错误处理
Apache Pulsar 允许应用程序在消费者上使用死信主题,并带有Shared
订阅类型。
对于Exclusive
和Failover
订阅类型,则此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个特殊的 beanDeadLetterPolicy
,它被命名为deadLetterPolicy
(您可以根据需要使用任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在本例中。
如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在 Pulsar 中。
接下来,我们将此 bean 名称提供给PulsarListener
通过设置deadLetterPolicy
财产。
请注意,PulsarListener
订阅类型的值为Shared
,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供了一个ackTimeout
值为 1 秒。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在DeadLetterPolicy
),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个PulsarListener
侦听 DLQ 主题以在发布到 DLQ 主题时接收数据。
7.4. Spring for Apache Pulsar 中的原生错误处理
正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。
如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办?
Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是有订单保证的。
允许重新投递、DLQ 等实际上会不按顺序接收消息。
但是,如果应用程序对此没有问题,但更重要的是,需要此 DLQ 功能用于非共享订阅,该怎么办?
为此,Spring for Apache Pulsar 提供了一个PulsarConsumerErrorHandler
,您可以在 Pulsar 中的任何订阅类型中使用它:Exclusive
,Failover
,Shared
或Key_Shared
.
当您使用PulsarConsumerErrorHandler
从 Spring for Apache Pulsar 开始,请确保不要在侦听器上设置 ack timeout 属性。
让我们通过检查一些代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑一下pulsarConsumerErrorHandler
豆。
这将创建一个PulsarConsumerErrorHandler
并使用 Spring for Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandler
.DefaultPulsarConsumerErrorHandler
有一个构造函数,该构造函数采用PulsarMessageRecovererFactory
以及org.springframework.util.backoff.Backoff
.PulsarMessageRecovererFactory
是具有以下 API 的功能性接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
这recovererForConsumer
method 接受一个 Pulsar Consumer 并返回一个PulsarMessageRecoverer
,这是另一个功能接口。
下面是 APIPulsarMessageRecoverer
:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar 为PulsarMessageRecovererFactory
叫PulsarDeadLetterPublishingRecoverer
,它提供了一个默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。
我们将此实现提供给前面的构造函数DefaultPulsarConsumerErrorHandler
.
作为第二个参数,我们提供了一个FixedBackOff
.
您还可以提供ExponentialBackoff
来自 Spring 的高级回退功能。
然后我们为PulsarConsumerErrorHandler
作为属性添加到PulsarListener
.
该属性称为pulsarConsumerErrorHandler
.
每次使用PulsarListener
方法失败,则会重试。
重试次数由Backoff
提供的实现值。在我们的示例中,我们执行 10 次重试(总共 11 次尝试 — 第一次,然后是 10 次重试)。
用尽所有重试后,消息将发送到 DLT 主题。
这PulsarDeadLetterPublishingRecoverer
implementation 我们使用PulsarTemplate
用于将消息发布到 DLT。
在大多数情况下,相同的自动配置PulsarTemplate
从 Spring Boot 就足够了,但需要注意分区主题。
使用分区主题并为主主题使用自定义消息路由时,您必须使用不同的PulsarTemplate
那不采用自动配置的PulsarProducerFactory
,该值填充为custompartition
为message-routing-mode
.
您可以使用PulsarConsumerErrorHandler
使用以下蓝图:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们为PulsarDeadLetterPublishingRecoverer
作为第二个 constructor 参数。
如果未提供,则PulsarDeadLetterPublishingRecoverer
使用<subscription-name>-<topic-name>-DLT>
作为 DLT 主题名称。
使用此功能时,您应该通过设置目标解析程序而不是使用默认值来使用正确的目标名称。
当使用单记录消息侦听器时,就像我们对PulsarConsumerErrorHnadler
,如果使用手动确认,请确保在引发异常时不要否定确认消息。
相反,将异常重新引发回容器。否则,容器会认为消息是单独处理的,不会触发错误处理。
最后,我们有第二个PulsarListener
接收来自 DLT 主题的消息。
到目前为止,在本节提供的示例中,我们只看到了如何使用PulsarConsumerErrorHandler
使用单个记录消息侦听器。
接下来,我们看看如何在批处理侦听器上使用它。
7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器
首先,让我们看一个批次PulsarListener
方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
我们再次提供pulsarConsumerErrorHandler
属性替换为PulsarConsumerErrorHandler
Bean 名称。
当您使用批处理侦听器(如前面的示例所示)并希望使用PulsarConsumerErrorHandler
从 Spring for Apache Pulsar 开始,您需要使用手动确认。
这样,您可以确认所有成功的单个消息。
对于失败的 API,您必须抛出一个PulsarBatchListenerFailedException
替换为失败的消息。
如果没有此异常,框架不知道如何处理失败。
重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。
如果再次失败,则重试,直到重试次数用完,此时消息将发送到 DLT。
此时,容器会确认消息,并将原始批处理中的后续消息移交给侦听器。
8. PulsarListener 上的消费者定制
Spring for Apache Pulsar 提供了一种便捷的方式来自定义由PulsarListener
.
应用程序可以为PulsarListenerConsumerBuilderCustomizer
.
下面是一个示例。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,可以将此定制器 Bean 名称作为PuslarListener
annotation 中,如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
框架通过PulsarListener
并在创建 Pulsar Consumer 之前将此定制器应用于 Consumer 构建器。
如果您有多个PulsarListener
方法,并且每个方法都有不同的自定义规则,则应创建多个定制器 bean 并在每个 bean 上附加适当的定制器PulsarListener
.
9. 消息侦听器容器生命周期
9.1. 暂停和恢复
在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的能力。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而进行的任何轮询都将被暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。
要暂停或恢复侦听器容器,请首先通过PulsarListenerEndpointRegistry
bean,然后在容器实例上调用 pause/resume API - 如下面的代码段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给getListenerContainer 是容器 ID - 这将是@PulsarListener id 属性@PulsarListener . |
9.2. 处理启动失败
消息侦听器容器在刷新应用程序上下文时启动。
默认情况下,启动过程中遇到的任何故障都会重新引发,应用程序将无法启动。
您可以使用StartupFailurePolicy
在相应的容器属性上。
可用选项包括:
-
Stop
(默认) - 记录并重新引发异常,从而有效地停止应用程序 -
Continue
- 记录异常,使容器处于 Non-Running 状态,但不停止应用程序 -
Retry
- 记录异常,重试异步启动容器,但不要停止应用程序。
默认重试行为是重试 3 次,两者之间有 10 秒的延迟 每次尝试。 但是,可以在相应的容器属性上指定自定义重试模板。 如果容器在重试次数用尽后无法重新启动,则容器将处于非运行状态。
10. Pulsar Reader 支持
该框架支持通过PulsarReaderFactory
.
Spring Boot 提供了这个读取器工厂,您可以通过指定任何spring.pulsar.reader.*
应用程序属性。
10.1. PulsarReader 注解
虽然可以使用PulsarReaderFactory
直接,Spring for Apache Pulsar 提供了PulsarReader
注解,你可以用它来快速读取主题,而无需自己设置任何 Reader 工厂。
这与背后的相同想法类似PulsarListener.
下面是一个简短的示例。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
这id
属性是可选的,但最佳实践是提供对应用程序有意义的值。
如果未指定,则将使用自动生成的 ID。
另一方面,topics
和startMessageId
属性是必需的。
这topics
attribute 可以是单个主题或逗号分隔的主题列表。
这startMessageId
属性指示读者从主题中的特定消息开始。
的有效值startMessageId
是earliest
或latest.
假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用ReaderBuilderCustomizer
要自定义ReaderBuilder
所以它知道正确的MessageId
开始。
10.2. 自定义 ReaderBuilder
您可以通过ReaderBuilder
使用PulsarReaderReaderBuilderCustomizer
在 Spring 中用于 Apache Pulsar。
您可以提供@Bean
的类型PulsarReaderReaderBuilderCustomizer
,然后将其提供给PulsarReader
如下。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序只有一个@PulsarReader 和一个PulsarReaderReaderBuilderCustomizer bean 已注册,则将自动应用定制器。 |
10.3. 处理启动失败
消息侦听器容器在刷新应用程序上下文时启动。
默认情况下,启动过程中遇到的任何故障都会重新引发,应用程序将无法启动。
您可以使用StartupFailurePolicy
在相应的容器属性上。
可用选项包括:
-
Stop
(默认) - 记录并重新引发异常,从而有效地停止应用程序 -
Continue
- 记录异常,使容器处于 Non-Running 状态,但不停止应用程序 -
Retry
- 记录异常,重试异步启动容器,但不要停止应用程序。
默认重试行为是重试 3 次,两者之间有 10 秒的延迟 每次尝试。 但是,可以在相应的容器属性上指定自定义重试模板。 如果容器在重试次数用尽后无法重新启动,则容器将处于非运行状态。