1. 脉冲星监听器

对于 Pulsar 使用者,我们建议最终用户应用程序使用注解。 要使用 ,您需要使用注释。 当您使用 Spring Boot 支持时,它会自动启用此注解并配置 所需的所有组件,例如消息侦听器基础设施(负责创建 Pulsar 使用者)。 使用 a 创建和管理 Pulsar 消费者,即它用来消费消息的底层 Pulsar 消费者。PulsarListenerPulsarListener@EnablePulsarPulsarListenerPulsarMessageListenerContainerPulsarConsumerFactory

Spring Boot 提供了这个消费者工厂,您可以通过指定 spring.pulsar.consumer.* 应用程序属性来进一步配置它。出厂时的大多数已配置属性都将在侦听器中得到遵守,但以下情况除外

该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}

让我们重新审视一下我们在快速浏览部分看到的代码片段:PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

您可以进一步简化此方法:

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

在这种最基本的形式中,当注释中未提供时,将使用自动生成的订阅名称。 同样,当不直接提供时,使用主题解析过程来确定目标主题。subscriptionName@PulsarListenertopics

在前面显示的方法中,我们以 ,但未指定任何模式类型。 在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。 框架检测到您期望该类型,然后根据该信息推断架构类型,并将该架构提供给使用者。 该框架对所有基元类型执行此推理。 对于所有非基元类型,默认架构假定为 JSON。 如果复杂类型使用 JSON 以外的任何内容(如 AVRO 或 KEY_VALUE),则必须使用该属性在批注上提供架构类型。PulsarListenerStringStringschemaType

下面的示例显示了另一种方法,该方法采用:PulsarListenerInteger

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

以下方法显示了如何从主题中使用复杂类型:PulsarListener

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

让我们再看几种方法。

您可以直接使用 Pulsar 消息:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

以下示例使用 Spring 消息信封使用记录:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

现在让我们看看如何批量使用记录。 以下示例用于将批量记录用作 POJO:PulsarListener

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

请注意,在此示例中,我们将记录作为对象的集合 () 接收。 此外,若要在级别上启用批处理使用,需要将批注上的属性设置为 。ListPulsarListenerbatchtrue

根据所持有的实际类型,框架会尝试推断要使用的架构。 如果 除了 JSON 之外还包含复杂类型,则仍需要提供 on .ListListschemaTypePulsarListener

下面使用 Pulsar Java 客户端提供的信封:Message

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

以下示例使用具有 Spring 消息类型信封的批处理记录:Message

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最后,您还可以将 Pulsar 中的 holder 对象用于批处理侦听器:Messages

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

使用 时,可以直接在注解本身上提供 Pulsar 使用者属性。 如果您不想使用前面提到的引导配置属性或具有多种方法,这将非常方便。PulsarListenerPulsarListener

以下示例直接在 Pulsar 消费者属性上使用 :PulsarListener

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 使用者属性,而不是应用程序配置属性spring.pulsar.consumer

1.1. 带有AUTO_CONSUME的通用记录

如果无法提前知道 Pulsar 主题的 schema 类型,可以使用该 schema 类型来消费通用记录。 在这种情况下,主题使用与主题关联的架构信息将消息反序列化为对象。AUTO_CONSUMEGenericRecord

要使用通用记录,请将 on your 并使用 Pulsar 消息类型作为消息参数,如下所示。schemaType = SchemaType.AUTO_CONSUME@PulsarListenerGenericRecord

@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
API 允许访问字段及其关联值GenericRecord

1.2. 自定义 ConsumerBuilder

您可以通过提供 of 类型来自定义任何可用的字段,然后使其可用,如下所示。ConsumerBuilderPulsarListenerConsumerBuilderCustomizer@BeanPulsarListenerConsumerBuilderCustomizerPulsarListener

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarListenerPulsarListenerConsumerBuilderCustomizer
该属性将被忽略,而是在未在批注上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从批注上的值中获取。但是,您可以将 on the annotation 设置为改用属性值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}
使用的属性是直接的 Pulsar 使用者属性,而不是应用程序配置属性spring.pulsar.consumer
API 允许访问字段及其关联值GenericRecord
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarListenerPulsarListenerConsumerBuilderCustomizer

2. 指定架构信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 . 对于非基元类型,如果未在注解中明确指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarListenerSchema.JSON

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。

2.1. 自定义模式映射

作为在复杂类型上指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在侦听器上设置架构,因为框架会使用传入消息类型咨询解析器。PulsarListener

2.1.1. 配置属性

可以使用该属性配置架构映射。 以下示例用于分别使用 和 架构为 和 复杂对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSON

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
是邮件类的完全限定名称。message-type

2.1.2. 模式解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析程序定制器来添加映射。

下面的示例使用架构解析程序定制器分别使用 和 架构为复杂对象添加映射:UserAddressAVROJSON

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的缺省模式信息的另一个选项是使用注释标记消息类。 可以通过注释上的属性指定架构信息。@PulsarMessageschemaType

以下示例将系统配置为在生成或使用以下类型的消息时使用 JSON 作为默认架构:Foo

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在侦听器上设置架构,例如:

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。
是邮件类的完全限定名称。message-type

3. 访问 Pulsar 消费者对象

有时,您需要直接访问 Pulsar Consumer 对象。 以下示例演示如何获取它:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}
以这种方式访问对象时,请勿调用任何会通过调用任何接收方法来更改使用者的光标位置的操作。 所有此类操作都必须由容器完成。Consumer
以这种方式访问对象时,请勿调用任何会通过调用任何接收方法来更改使用者的光标位置的操作。 所有此类操作都必须由容器完成。Consumer

4. Pulsar 消息监听器容器

现在我们通过 .现在让我们深入了解如何与底层 Pulsar 消费者交互的内部工作原理。 请记住,对于最终用户应用程序,在大多数情况下,我们建议在将 Spring 用于 Apache Pulsar 时,直接使用注释来使用 Pulsar 主题,因为该模型涵盖了广泛的应用程序用例。 但是,了解内部的工作方式很重要。本节将介绍这些详细信息。PulsarListenerPulsarListenerPulsarListenerPulsarListener

如前所述,当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息使用的核心。 在后台使用消息监听器容器基础设施来创建和管理 Pulsar 消费者。 Spring for Apache Pulsar 通过 . 此消息侦听器容器的缺省实现是通过 提供的。 顾名思义,包含消息侦听器。 容器创建 Pulsar 消费者,然后运行单独的线程来接收和处理数据。 数据由提供的消息侦听器实现处理。PulsarListenerPulsarMessageListenerContainerDefaultPulsarMessageListenerContainerPulsarMessageListenerContainer

消息侦听器容器使用使用者的方法批量使用数据。 一旦收到数据,它就会被移交给选定的消息侦听器实现。batchReceive

当您将 Spring 用于 Apache Pulsar 时,可以使用以下消息侦听器类型。

我们将在以下各节中查看有关这些不同消息侦听器的详细信息。

然而,在这样做之前,让我们仔细看看容器本身。

4.1. DefaultPulsarMessageListenerContainer

这是一个基于使用者的消息侦听器容器。 下面的列表显示了其构造函数:

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

它接收一个(用于创建使用者)和一个对象(包含有关容器属性的信息)。 具有以下构造函数:PulsarConsumerFactoryPulsarContainerPropertiesPulsarContainerProperties

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

您可以通过提供给消费者工厂的消费者财产或作为消费者财产提供主题信息。 以下示例使用 :PulsarContainerPropertiesDefaultPulsarMessageListenerContainer

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;
如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarListener

DefaultPulsarMessageListenerContainer仅创建单个使用者。 如果要通过多个线程管理多个使用者,则需要使用 .ConcurrentPulsarMessageListenerContainer

4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer具有以下构造函数:

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer允许您通过 setter 指定属性。 只有在非独占订阅(、 和 )上才允许并发大于 。 只有当您具有独享订阅模式时,您才能拥有并发默认值。concurrency1failoversharedkey-shared1

以下示例通过订阅的批注启用。concurrencyPulsarListenerfailover

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

在前面的侦听器中,假定主题有三个分区。 如果是未分区的主题,则将并发设置为 no 执行任何操作。除了主要的活动使用者之外,您还获得两个空闲使用者。 如果主题有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。 如果运行此 ,您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。my-topic3PulsarListener

当你以这种方式在分区主题上使用订阅时,Pulsar 会保证消息排序。Failover

以下列表显示了 的另一个示例,但具有 subscription 和 enabled。PulsarListenerSharedconcurrency

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

在前面的示例中,创建了五个不同的使用者(这一次,我们假设主题有五个分区)。PulsarListener

在这个版本中,没有消息排序,因为订阅不保证 Pulsar 中的任何消息排序。Shared

如果需要消息排序,但仍需要共享订阅类型,则需要使用订阅类型。Key_Shared

4.3. 消费记录

让我们看一下消息侦听器容器如何实现单记录和基于批处理的消息使用。

单条记录消耗

为了这次讨论,让我们重新审视我们的基本知识:PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

使用这种方法,我们必须要求 Spring for Apache Pulsar 每次都使用单个记录调用 listener 方法。 我们提到,消息侦听器容器使用使用者上的方法批量使用数据。 框架检测到 ,在本例中,接收到单个记录。这意味着,在每次调用该方法时,它都需要一个单一记录。 尽管消息侦听器容器会批量使用记录,但它会循环访问收到的批处理,并通过 的适配器调用侦听器方法。 如上一节所示,它从 Pulsar Java 客户端提供的扩展而来,它支持基本方法。PulsarListenerbatchReceivePulsarListenerPulsarRecordMessageListenerPulsarRecordMessageListenerMessageListenerreceived

批量消耗

以下示例显示了批量使用记录:PulsarListener

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

使用此类型的 时,框架会检测到您处于批处理模式。 由于它已经使用 Consumer 的方法批量接收数据,因此它通过 的适配器将整个批处理传递给侦听器方法。PulsarListenerbatchReceivePulsarBatchMessageListener

如果在直接使用侦听器容器时未指定主题信息,则将使用相同的主题解析过程,但省略了“消息类型默认值”步骤。PulsarListener
当你以这种方式在分区主题上使用订阅时,Pulsar 会保证消息排序。Failover
在这个版本中,没有消息排序,因为订阅不保证 Pulsar 中的任何消息排序。Shared

5. 脉冲星接头

Pulsar 消息元数据可以作为 Spring 消息头使用。 可以在 PulsarHeaders.java 中找到可用标头的列表。

5.1. 在基于单条记录的消费者中访问

以下示例显示了如何在使用单记录使用模式的应用程序中访问各种 Pulsar 标头:

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

在前面的示例中,我们访问了 和 消息元数据的值以及名为 的自定义消息属性。 Spring 注解用于每个标题字段。messageIdrawDatafoo@Header

您也可以使用 Pulsar 作为包络来携带有效载荷。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用注释来检索它。 请注意,您还可以使用 Spring 消息信封来携带有效负载,然后使用 检索 Pulsar 标头。MessageHeaderMessage@Header

5.2. 基于批量记录的消费者访问

在本节中,我们将了解如何在使用批处理消费者的应用程序中访问各种 Pulsar 标头:

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

在前面的示例中,我们将数据作为 . 在提取各种标头时,我们也这样做。 Spring for Apache Pulsar 确保头列表与数据列表相对应。List<String>List<>

在使用批处理侦听器并接收有效负载时,也可以以相同的方式提取标头,例如 、 或 。List<org.apache.pulsar.client.api.Message<?>org.apache.pulsar.client.api.Messages<?>org.springframework.messaging.Messsge<?>

6. 消息确认

当您将 Spring 用于 Apache Pulsar 时,除非应用程序选择退出,否则消息确认由框架处理。 在本节中,我们将详细介绍框架如何处理消息确认。

6.1. 消息 ACK 模式

Spring for Apache Pulsar 提供了以下确认消息的模式:

  • BATCH

  • RECORD

  • MANUAL

BATCH确认模式是默认模式,但您可以在消息侦听器容器上更改它。 在以下各节中,我们将了解当您同时使用单个版本和批处理版本时,确认是如何工作的,以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 使用者)。PulsarListener

6.2. 单记录模式下的自动消息确认

让我们重新审视我们基于以下基本单消息:PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

很自然地想知道,当你使用 时,确认是如何工作的,特别是如果你熟悉直接使用 Pulsar 消费者。 答案归结为消息侦听器容器,因为它是 Spring 中 Apache Pulsar 的中心位置,负责协调所有与消费者相关的活动。PulsarListener

假设您没有覆盖默认行为,则当您使用上述命令时,幕后会发生以下情况:PulsarListener

  1. 首先,监听器容器从 Pulsar 消费者那里批量接收消息。

  2. 收到的消息一次传递到一条消息。PulsarListener

  3. 当所有记录都传递到侦听器方法并成功处理时,容器将确认原始批处理中的所有消息。

这是正常流程。如果原始批次中的任何记录引发异常,Spring for Apache Pulsar 将单独跟踪这些记录。 当处理完批处理中的所有记录时,Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认(nack)所有失败的消息。 换言之,当使用 using 并使用默认的 ack 模式时,框架会等待从调用中接收到的所有记录处理成功,然后在 Pulsar 消费者上调用该方法。 如果任何特定记录在调用处理程序方法时引发异常,Spring for Apache Pulsar 将跟踪这些记录,并在处理整个批处理后单独调用这些记录。PulsarRecordMessageListenerBATCHbatchReceiveacknowledgenegativeAcknowledge

如果应用程序希望每条记录发生确认或否定确认,则可以启用确认模式。 在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果存在错误,则否定确认消息。 以下示例在 Pulsar 侦听器上启用 ack 模式:RECORDRECORD

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

6.3. 单记录模式下的手动消息确认

您可能并不总是希望框架发送确认,而是直接从应用程序本身发送确认。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

这里有几件事值得解释。首先,我们通过设置 来增强手动 ack 模式。 启用手动 ack 模式时,Spring for Apache Pulsar 允许应用程序注入对象。 该框架通过选择兼容的消息侦听器容器来实现这一点:用于基于单条记录的使用,它允许您访问对象。ackModePulsarListenerAcknowledgmentPulsarAcknowledgingMessageListenerAcknowledgment

该对象提供以下 API 方法:Acknowledgment

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

您可以使用 ack 模式将此对象注入到 while 中,然后调用相应的方法之一。AcknowledgmentPulsarListenerMANUAL

在前面的示例中,我们调用一个无参数方法。 这是因为该框架知道它当前在哪个下运行。 调用时,您不需要接收带有信封器的有效负载,而是在本例中使用目标类型 — 。 您还可以通过提供邮件 ID 来调用 的其他变体: 使用 时,必须使用信封接收有效负载。PulsarListeneracknowledgeMessageacknowledge()MessageStringacknowledgeacknowledge.acknowledge(message.getMessageId());acknowledge(messageId)Message<?>

与可能的确认类似,API 还提供了否定确认的选项。 请参阅前面显示的 nack 方法。Acknowledgment

您也可以直接调用 Pulsar 消费者:acknowledge

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

直接调用底层消费者时,需要自行进行错误处理。 使用 不需要这样做,因为框架可以为您做到这一点。 因此,在使用手动确认时,应使用对象方法。acknowledgeAcknowledgmentAcknowledgment

使用手动确认时,重要的是要了解框架完全不受任何确认的影响。 因此,在设计应用程序时,考虑正确的确认策略非常重要。

6.4. 批量消费中的自动消息确认

当您批量使用记录(请参阅 “消息 ACK 模式”)并使用默认的 ack 模式时,当整个批处理成功时,将确认整个批处理。 如果任何记录引发异常,则整个批次将被否定确认。 请注意,这可能与在生产者端批处理的批次不同。相反,这是从调用消费者返回的批次BATCHbatchReceive

请考虑以下批处理侦听器:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

当传入集合(在此示例中)中的所有消息都得到处理时,框架将确认所有这些消息。messages

在批处理模式下使用时,不是允许的 ack 模式。 这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。 在这种情况下,您需要使用确认模式。RECORDMANUAL

6.5. 批量消费中的手动消息确认

如上一节所示,在消息侦听器容器上设置 ack 模式时,框架不会执行任何确认,无论是正面的还是负面的。 完全由应用程序来处理这些问题。 设置 ack 模式后,Spring for Apache Pulsar 会选择一个兼容的消息侦听器容器:用于批量使用,这样您就可以访问对象。 以下是 API 中可用的方法:MANUALMANUALPulsarBatchAcknowledgingMessageListenerAcknowledgmentAcknowledgment

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

您可以使用 ack 模式将此对象注入到您的 while 中。 以下列表显示了基于批处理的侦听器的基本示例:AcknowledgmentPulsarListenerMANUAL

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

使用批处理侦听器时,消息侦听器容器无法知道它当前正在对哪条记录进行操作。 因此,若要手动确认,需要使用接受 a 或 a 的重载方法之一。 您也可以使用 for 批处理侦听器进行否定确认。acknowledgeMessageIdList<MessageId>MessageId

使用手动确认时,重要的是要了解框架完全不受任何确认的影响。 因此,在设计应用程序时,考虑正确的确认策略非常重要。

7. 邮件重新传递和错误处理

现在我们已经了解了消息侦听器容器基础结构及其各种功能,现在让我们尝试了解消息重新传递和错误处理。 Apache Pulsar 为消息重新传递和错误处理提供了各种原生策略。我们来看看它们,看看我们如何在 Spring 中将它们用于 Apache Pulsar。PulsarListener

7.1. 指定消息重新传递的确认超时

默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值高于零,并且 Pulsar 使用者在该超时期限内未确认消息,则重新传递消息。

当您将 Spring 用于 Apache Pulsar 时,您可以通过消费者定制器或使用本机 Pulsar 属性在以下属性中设置此属性:ackTimeoutproperties@PulsarListener

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

当您指定 ack 超时时,如果消费者在 60 秒内没有发送确认,则 Pulsar 会将消息重新传递给消费者。

如果要为具有不同延迟的 ack 超时指定一些高级退避选项,可以执行以下操作:

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

在前面的例子中,我们为 Pulsar 指定了一个 bean,最小延迟为 1 秒,最大延迟为 10 秒,退避乘数为 2。 在发生初始 ack 超时后,消息重新传递将通过此回退 Bean 进行控制。 在本例中,我们通过将属性设置为实际的 Bean 名称 — 来为注释提供退避 Bean。RedeliveryBackoffPulsarListenerackTimeoutRedeliveryBackoffackTimeoutRedeliveryBackoff

7.2. 指定否定确认重新送达

当确认为否定时,Pulsar 使用者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但您可以通过使用者定制器或使用属性中的原生 Pulsar 属性进行更改:negativeAckRedeliveryDelayproperties@PulsarListener

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

您还可以通过提供 Bean 并在 PulsarProducer 上提供 Bean 名称作为属性来指定具有乘数的不同延迟和回退机制,如下所示:RedeliveryBackoffnegativeAckRedeliveryBackoff

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

7.3. 使用 Apache Pulsar 的死信主题进行消息重新传递和错误处理

Apache Pulsar 允许应用程序在具有订阅类型的使用者上使用死信主题。 对于 和 订阅类型,此功能不可用。 其基本思想是,如果一条消息被重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解此功能的一些详细信息:SharedExclusiveFailover

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

首先,我们有一个特殊的 bean,它被命名为(它可以是任何你想要的名称)。 此 Bean 指定了许多内容,例如最大投放量(在本例中为 10)和死信主题的名称 — 在本例中。 如果未指定 DLQ 主题名称,则默认为 Pulsar 中的 DLQ 主题名称。 接下来,我们通过设置属性来提供此 Bean 名称。 请注意,订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供 1 秒的值。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,它会重试。 如果该循环持续十次(因为这是我们在 中的最大重新交付计数),则 Pulsar 使用者将消息发布到 DLQ 主题。 我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicydeadLetterPolicymy-dlq-topic<topicname>-<subscriptionname>-DLQPulsarListenerdeadLetterPolicyPulsarListenerSharedackTimeoutDeadLetterPolicyPulsarListener

使用分区主题时有关 DLQ 主题的特别说明

如果主主题被分区,则在后台,每个分区都会被 Pulsar 视为一个单独的主题。 Pulsar 将 ,其中 代表分区号附加到主主题名称中。 问题在于,如果你没有指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布一个包含此信息的默认主题名称,例如:. 解决此问题的简单方法是始终提供 DLQ 主题名称。partition-<n>n`partition-<n>topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ

7.4. Spring for Apache Pulsar 的原生错误处理

如前所述,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。 如果应用程序需要对非共享订阅使用某些类似的功能,该怎么办? Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是订单保证的。 允许重新传递、DLQ 等有效地接收无序消息。 但是,如果应用程序可以接受,但更重要的是,非共享订阅需要此 DLQ 功能怎么办? 为此,Spring for Apache Pulsar 提供了一个 ,您可以在 Pulsar 中的任何订阅类型中使用它:、、 或 。PulsarConsumerErrorHandlerExclusiveFailoverSharedKey_Shared

当您从 Spring 使用 Apache Pulsar 时,请确保不要在侦听器上设置 ack 超时属性。PulsarConsumerErrorHandler

让我们通过检查一些代码片段来了解一些细节:

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

以豆子为例。 这将创建一个类型的 bean,并使用 Spring 为 Apache Pulsar 提供的开箱即用的默认实现: . 具有一个接受 a 和 a 的构造函数。 是具有以下 API 的功能接口:pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerPulsarMessageRecovererFactoryorg.springframework.util.backoff.BackoffPulsarMessageRecovererFactory

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

该方法接受一个 Pulsar 消费者并返回一个 ,这是另一个函数接口。 这是 API :recovererForConsumerPulsarMessageRecovererPulsarMessageRecoverer

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar 提供了一个 called 实现,该实现提供了一个默认实现,该实现可以通过将消息发送到死信主题 (DLT) 来恢复消息。 我们将此实现提供给前面的构造函数。 作为第二个参数,我们提供 . 您还可以提供 from Spring 以获得高级退避功能。 然后,我们为 的 be 提供这个 bean 名称作为 的属性。 该属性称为 。 每次该方法对消息失败时,都会重试该方法。 重试次数由提供的实现值控制。在我们的示例中,我们进行了 10 次重试(总共 11 次尝试 — 第一次尝试,然后是 10 次重试)。 用尽所有重试后,消息将发送到 DLT 主题。PulsarMessageRecovererFactoryPulsarDeadLetterPublishingRecovererDefaultPulsarConsumerErrorHandlerFixedBackOffExponentialBackoffPulsarConsumerErrorHandlerPulsarListenerpulsarConsumerErrorHandlerPulsarListenerBackoff

我们提供的实现使用用于将消息发布到 DLT 的 a。 在大多数情况下,从 Spring Boot 自动配置的相同功能就足够了,但需要注意分区主题。 使用分区主题和对主主题使用自定义消息路由时,必须使用不采用填充值为 for 的自动配置的其他消息路由 。 您可以将 a 与以下蓝图一起使用:PulsarDeadLetterPublishingRecovererPulsarTemplatePulsarTemplatePulsarTemplatePulsarProducerFactorycustompartitionmessage-routing-modePulsarConsumerErrorHandler

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

请注意,我们为 作为第二个构造函数参数提供了一个目标解析器。 如果未提供,则用作 DLT 主题名称。 使用此功能时,应通过设置目标解析程序而不是使用默认值来使用正确的目标名称。PulsarDeadLetterPublishingRecovererPulsarDeadLetterPublishingRecoverer<subscription-name>-<topic-name>-DLT>

使用单条记录消息侦听器时,就像我们对 所做的那样,如果您使用手动确认,请确保在引发异常时不要否定地确认消息。 相反,将异常重新抛回容器。否则,容器会认为消息是单独处理的,并且不会触发错误处理。PulsarConsumerErrorHnadler

最后,我们还有第二个接收来自 DLT 主题的消息。PulsarListener

到目前为止,在本节提供的示例中,我们只看到了如何与单个记录消息侦听器一起使用。 接下来,我们将了解如何在批处理侦听器上使用它。PulsarConsumerErrorHandler

7.5. 使用 PulsarConsumerErrorHandler 的批处理侦听器

首先,让我们看一个批处理方法:PulsarListener

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

再一次,我们为 bean 名称提供属性。 当您使用批处理侦听器(如前面的示例所示)并希望将 from Spring 用于 Apache Pulsar 时,您需要使用手动确认。 这样,您可以确认所有成功的单个消息。 对于失败的,您必须抛出一个带有失败的消息。 如果没有此例外,框架不知道如何处理故障。 重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。 如果再次失败,则重试,直到重试次数用尽,此时消息将发送到 DLT。 此时,容器会确认消息,并将侦听器与原始批处理中的后续消息一起移交。pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarBatchListenerFailedException

8. PulsarListener 上的消费者定制

Spring for Apache Pulsar 提供了一种方便的方式来自定义由 . 应用程序可以为 提供 bean 。 下面是一个示例。PulsarListenerPulsarListenerConsumerBuilderCustomizer

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

然后,可以将此定制器 Bean 名称作为注释上的属性提供,如下所示。PuslarListener

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

在创建 Pulsar Consumer 之前,该框架通过 在 Consumer 构建器上检测提供的 Bean,并将此定制器应用于 Consumer 构建器。PulsarListener

如果您有多个方法,并且每个方法都有不同的定制规则,那么您应该创建多个定制器 Bean 并在每个 .PulsarListenerPulsarListener

9. 暂停和恢复消息侦听器容器

在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后再恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的功能。 当 Pulsar 消息监听器容器暂停时,容器为接收 Pulsar 消费者数据而进行的任何轮询都将暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。

要暂停或恢复侦听器容器,请先通过 Bean 获取容器实例,然后在容器实例上调用暂停/恢复 API - 如下图所示:PulsarListenerEndpointRegistry

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}
传递给的 id 参数是容器 id - 在暂停/恢复 .getListenerContainer@PulsarListener@PulsarListener
传递给的 id 参数是容器 id - 在暂停/恢复 .getListenerContainer@PulsarListener@PulsarListener

10. Pulsar 阅读器支持

该框架支持通过 .PulsarReaderFactory

Spring Boot 提供了此读取器工厂,您可以通过指定任何 spring.pulsar.reader.* 应用程序属性来进一步配置该工厂。

10.1. PulsarReader 注解

虽然可以直接使用,但 Spring for Apache Pulsar 提供了注释,您可以使用它来快速阅读主题,而无需自己设置任何阅读器工厂。 这与背后的相同想法类似 这里是一个简单的例子。PulsarReaderFactoryPulsarReaderPulsarListener.

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

该属性是可选的,但最佳做法是提供对应用程序有意义的值。 如果未指定,将使用自动生成的 ID。 另一方面,和 属性是必需的。 该属性可以是单个主题,也可以是以逗号分隔的主题列表。 该属性指示读者从主题中的特定消息开始。 的有效值是 或 假设您希望读者开始从最早或最新的可用消息以外的主题任意读取消息。在这种情况下,您需要使用 a 来自定义,以便它知道从中开始的权利。idtopicsstartMessageIdtopicsstartMessageIdstartMessageIdearliestlatest.ReaderBuilderCustomizerReaderBuilderMessageId

10.2. 自定义 ReaderBuilder

您可以通过在 Spring for Apache Pulsar 中使用 来自定义任何可用的字段。 您可以提供 of 类型,然后将其提供给以下内容。ReaderBuilderPulsarReaderReaderBuilderCustomizer@BeanPulsarReaderReaderBuilderCustomizerPulsarReader

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarReaderPulsarReaderReaderBuilderCustomizer
如果您的应用程序只注册了一个 Bean 和一个 Bean,那么将自动应用定制器。@PulsarReaderPulsarReaderReaderBuilderCustomizer