Message Consumption

1. Pulsar Listener

When it comes to Pulsar consumers, we recommend that end-user applications use the PulsarListener annotation. To use PulsarListener, you need to use the @EnablePulsar annotation. When you use Spring Boot support, it automatically enables this annotation and configures all the components necessary for PulsarListener, such as the message listener infrastructure (which is responsible for creating the Pulsar consumer). PulsarMessageListenerContainer uses a PulsarConsumerFactory to create and manage the underlying Pulsar consumer that it uses to consume messages.

Spring Boot provides this consumer factory, which you can further configure by specifying the spring.pulsar.consumer.* application properties.

Let us revisit the PulsarListener code snippet we saw in the quick-tour section:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

You can further simplify this method:

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

In this most basic form, when the subscriptionName is not provided on the @PulsarListener annotation, an auto-generated subscription name is used. Likewise, when the topics are not directly provided, a topic resolution process is used to determine the destination topic.

In the PulsarListener method shown earlier, we receive the data as String, but we do not specify any schema types. Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type. The framework detects that you expect the String type, infers the schema type from that information, and provides that schema to the consumer. The framework does this inference for all primitive types. For all non-primitive types, the default schema is assumed to be JSON. If a complex type uses anything besides JSON (such as AVRO or KEY_VALUE), you must provide the schema type on the annotation by using the schemaType property.
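To make the inference rule concrete, the following is a minimal sketch in plain Java. It is a simplified model, not the framework's actual schema resolver: the class and method names are hypothetical, schema types are returned as plain strings, and only a subset of Pulsar's built-in primitive schemas is covered (for example, Integer maps to INT32). An explicit schemaType wins, known primitives are inferred, and everything else falls back to JSON.

```java
import java.util.Map;

// Hypothetical, simplified model of schema inference for a listener parameter type.
// Not the Spring for Apache Pulsar resolver; schema types are plain strings here.
final class SchemaInferenceSketch {

    // Subset of Java types that have a built-in Pulsar primitive schema.
    private static final Map<Class<?>, String> PRIMITIVE_SCHEMAS = Map.of(
            String.class, "STRING",
            Byte.class, "INT8",
            Short.class, "INT16",
            Integer.class, "INT32",
            Long.class, "INT64",
            Float.class, "FLOAT",
            Double.class, "DOUBLE",
            Boolean.class, "BOOLEAN",
            byte[].class, "BYTES");

    // Explicit schemaType on the annotation first, then primitive inference, then JSON.
    static String resolveSchemaType(Class<?> payloadType, String explicitSchemaType) {
        if (explicitSchemaType != null) {
            return explicitSchemaType;
        }
        return PRIMITIVE_SCHEMAS.getOrDefault(payloadType, "JSON");
    }

    // Stand-in for a user-defined complex type such as Foo.
    static final class Foo { }

    public static void main(String[] args) {
        System.out.println(resolveSchemaType(String.class, null));  // STRING
        System.out.println(resolveSchemaType(Integer.class, null)); // INT32
        System.out.println(resolveSchemaType(Foo.class, null));     // JSON
        System.out.println(resolveSchemaType(Foo.class, "AVRO"));   // AVRO
    }
}
```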

The following example shows another PulsarListener method, which takes an Integer:

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

The following PulsarListener method shows how we can consume complex types from a topic:

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

Let us look at a few more ways.

You can consume the Pulsar message directly:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

The following example consumes the record by using the Spring messaging envelope:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

Now let us see how we can consume records in batches. The following example uses PulsarListener to consume records in batches as POJOs:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

Note that, in this example, we receive the records as a collection (List) of objects. In addition, to enable batch consumption at the PulsarListener level, you need to set the batch property on the annotation to true.

Based on the actual type that the List holds, the framework tries to infer the schema to use. If the List contains a complex type that uses a schema other than JSON, you still need to provide the schemaType on PulsarListener.

The following example uses the Message envelope provided by the Pulsar Java client:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

The following example consumes batch records with an envelope of the Spring messaging Message type:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

Finally, you can also use the Messages holder object from Pulsar for the batch listener:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

When you use PulsarListener, you can provide Pulsar consumer properties directly on the annotation itself. This is convenient if you do not want to use the Boot configuration properties mentioned earlier or if you have multiple PulsarListener methods.

The following example uses Pulsar consumer properties directly on PulsarListener:

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}

The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer application configuration properties.

1.1. Generic records with AUTO_CONSUME

If you cannot know the schema type of a Pulsar topic in advance, you can use the AUTO_CONSUME schema type to consume generic records. In this case, the topic deserializes messages into GenericRecord objects by using the schema info associated with the topic.

To consume generic records, set schemaType = SchemaType.AUTO_CONSUME on your @PulsarListener and use a Pulsar message of type GenericRecord as the message parameter, as shown below.

@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}

The GenericRecord API allows access to the fields and their associated values.

1.2. Customizing the ConsumerBuilder

You can customize any fields available through ConsumerBuilder by using a PulsarListenerConsumerBuilderCustomizer. Provide a @Bean of type PulsarListenerConsumerBuilderCustomizer and then make it available to the PulsarListener through the consumerCustomizer attribute, as shown below.

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}

If your application has only a single @PulsarListener and a single registered PulsarListenerConsumerBuilderCustomizer bean, the customizer is applied automatically.

2. Specifying Schema Information

As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the PulsarListener. For non-primitive types, if the Schema is not explicitly specified on the annotation, the framework tries to build a Schema.JSON from the type.

The complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, and KEY_VALUE with INLINE encoding.

2.1. Custom Schema Mapping

As an alternative to specifying the schema on the PulsarListener for complex types, you can configure the schema resolver with mappings for the types. This removes the need to set the schema on the listener, as the framework consults the resolver by using the incoming message type.

2.1.1. Configuration properties

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON

The message-type is the fully-qualified name of the message class.
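The following plain-Java sketch models what such a mapping amounts to: a lookup keyed by the fully-qualified name of the message class. It is an illustration under our own assumptions (hypothetical class and method names, schema types as strings, nested stand-in classes in place of com.acme.User and com.acme.Address), not the framework's DefaultSchemaResolver.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of type mappings, keyed by fully-qualified class name
// (like message-type in application.yml). Not the framework's resolver.
final class TypeMappingSketch {

    private final Map<String, String> mappings = new HashMap<>();

    // Mirrors one "message-type / schema-info.schema-type" entry.
    void addMapping(String messageType, String schemaType) {
        mappings.put(messageType, schemaType);
    }

    // Looks up the schema type for the incoming message class, if one was mapped.
    Optional<String> schemaTypeFor(Class<?> messageClass) {
        return Optional.ofNullable(mappings.get(messageClass.getName()));
    }

    // Stand-ins for com.acme.User and com.acme.Address.
    static final class User { }
    static final class Address { }

    public static void main(String[] args) {
        TypeMappingSketch resolver = new TypeMappingSketch();
        resolver.addMapping(User.class.getName(), "AVRO");
        resolver.addMapping(Address.class.getName(), "JSON");
        System.out.println(resolver.schemaTypeFor(User.class));   // Optional[AVRO]
        System.out.println(resolver.schemaTypeFor(String.class)); // Optional.empty
    }
}
```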

2.1.2. Schema resolver customizer

The preferred method of adding mappings is through the property mentioned above. However, if you need more control, you can provide a schema resolver customizer to add the mapping(s).

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	};
}

2.1.3. Type mapping annotation

Another option for specifying default schema information for a particular message type is to mark the message class with the @PulsarMessage annotation. The schema info can be specified through the schemaType attribute on the annotation.

The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

With this configuration in place, there is no need to set the schema on the listener, for example:

@PulsarListener(subscriptionName = "foo-sub", topics = "foo-topic")
public void listen(Foo foo) {
    System.out.println(foo);
}

3. Accessing the Pulsar Consumer Object

Sometimes, you need direct access to the Pulsar Consumer object. The following example shows how to get it:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}

When accessing the Consumer object this way, do NOT invoke any operations that would change the Consumer's cursor position, such as any of the receive methods. All such operations must be done by the container.

4. Pulsar Message Listener Container

Now that we have seen the basic interactions on the consumer side through PulsarListener, let us dive into the inner workings of how PulsarListener interacts with the underlying Pulsar consumer. Keep in mind that, for end-user applications, we recommend in most scenarios using the PulsarListener annotation directly for consuming from a Pulsar topic when using Spring for Apache Pulsar, as that model covers a broad set of application use cases. However, it is important to understand how PulsarListener works internally. This section goes through those details.

As briefly mentioned earlier, the message listener container is at the heart of message consumption when you use Spring for Apache Pulsar. PulsarListener uses the message listener container infrastructure behind the scenes to create and manage the Pulsar consumer. Spring for Apache Pulsar provides the contract for this message listener container through PulsarMessageListenerContainer. The default implementation for this message listener container is provided through DefaultPulsarMessageListenerContainer. As its name indicates, PulsarMessageListenerContainer contains the message listener. The container creates the Pulsar consumer and then runs a separate thread to receive and handle the data. The data is handled by the provided message listener implementation.

The message listener container consumes the data in batches by using the consumer's batchReceive method. Once data is received, it is handed over to the selected message listener implementation.

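The following message listener types are available when you use Spring for Apache Pulsar:

  1. PulsarRecordMessageListener

  2. PulsarAcknowledgingMessageListener

  3. PulsarBatchMessageListener

  4. PulsarBatchAcknowledgingMessageListener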

We cover the details of these message listeners in the following sections.

Before doing so, however, let us take a closer look at the container itself.

4.1. DefaultPulsarMessageListenerContainer

This is a single consumer-based message listener container. The following listing shows its constructor:

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)

It receives a PulsarConsumerFactory (which it uses to create the consumer) and a PulsarContainerProperties object (which contains information about the container properties). PulsarContainerProperties has the following constructors:

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

You can provide the topic information through PulsarContainerProperties or as a consumer property that is provided to the consumer factory. The following example uses the DefaultPulsarMessageListenerContainer:

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
    // process each received message here
});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer<>(pulsarConsumerFactory,
        pulsarContainerProperties);

return pulsarListenerContainer;

If topic information is not specified when using the listener containers directly, the same topic resolution process used by the PulsarListener is used, with one exception: the "Message type default" step is omitted.

DefaultPulsarMessageListenerContainer creates only a single consumer. If you want to have multiple consumers managed through multiple threads, you need to use ConcurrentPulsarMessageListenerContainer.

4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer has the following constructor:

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer lets you specify a concurrency property through a setter. Concurrency greater than 1 is allowed only on non-exclusive subscriptions (failover, shared, and key-shared). With an exclusive subscription, concurrency must remain at the default of 1.
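The rule can be expressed as a small validation check. This is a hypothetical sketch: the enum only mirrors the names of Pulsar's SubscriptionType values so that the example compiles on its own, and throwing an exception is an assumption about how a violation might be reported, not a description of the container's actual behavior.

```java
// Hypothetical sketch of the concurrency rule. The enum mirrors the names of
// Pulsar's SubscriptionType values but is defined locally to stay self-contained.
final class ConcurrencyRuleSketch {

    enum SubscriptionType { Exclusive, Failover, Shared, Key_Shared }

    // Throws if the requested concurrency is not allowed for the subscription type.
    static void validateConcurrency(SubscriptionType type, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        if (type == SubscriptionType.Exclusive && concurrency > 1) {
            throw new IllegalStateException(
                    "concurrency > 1 requires a non-exclusive subscription (Failover, Shared, Key_Shared)");
        }
    }

    public static void main(String[] args) {
        validateConcurrency(SubscriptionType.Failover, 3); // allowed
        try {
            validateConcurrency(SubscriptionType.Exclusive, 3);
        } catch (IllegalStateException ex) {
            System.out.println("Rejected: " + ex.getMessage());
        }
    }
}
```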

The following example enables concurrency through the PulsarListener annotation for a failover subscription.

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

In the preceding listener, it is assumed that the topic my-topic has three partitions. If it is a non-partitioned topic, setting concurrency to 3 gains nothing: you get two idle consumers in addition to the main active one. If the topic has more than three partitions, messages are load-balanced across the consumers that the container creates. If you run this PulsarListener, you see that messages from different partitions are consumed through different consumers, as implied by the thread name and consumer name printouts in the preceding example.

When you use the Failover subscription this way on partitioned topics, Pulsar guarantees message ordering within each partition.
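The arithmetic behind the preceding paragraphs can be sketched as follows. This is a simplified model under stated assumptions: the sketch passes 0 partitions for a non-partitioned topic and treats it as a single partition, and the round-robin assignment is purely illustrative, since the Pulsar broker decides the actual partition-to-consumer assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a Failover subscription: each partition is served by one
// active consumer, so consumers beyond the partition count sit idle.
// Round-robin assignment is an illustrative assumption, not Pulsar's algorithm.
final class PartitionAssignmentSketch {

    // A non-partitioned topic (0 partitions here) behaves like one partition.
    static int activeConsumers(int partitions, int concurrency) {
        return Math.min(Math.max(partitions, 1), concurrency);
    }

    static int idleConsumers(int partitions, int concurrency) {
        return concurrency - activeConsumers(partitions, concurrency);
    }

    // Returns, for each consumer index, the partitions it serves.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < Math.max(partitions, 1); p++) {
            assignment.get(p % concurrency).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(idleConsumers(0, 3)); // 2 (non-partitioned topic)
        System.out.println(idleConsumers(3, 3)); // 0
        System.out.println(assign(6, 3));        // [[0, 3], [1, 4], [2, 5]]
    }
}
```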

The following listing shows another example of PulsarListener, but with a Shared subscription and concurrency enabled.

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

In the preceding example, the PulsarListener creates five different consumers (this time, we assume that the topic has five partitions).

In this version, there is no message ordering, as Shared subscriptions do not guarantee any message ordering in Pulsar.

If you need message ordering and still want a shared subscription type, use the Key_Shared subscription type, which preserves ordering for messages that share the same key.
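A minimal sketch of why Key_Shared keeps per-key order: every message with a given key is routed to the same consumer, so that consumer sees the key's messages in arrival order. As a simplification, the sketch picks the consumer with String.hashCode modulo the consumer count; Pulsar itself assigns key hash ranges to consumers, so treat the routing function as an assumption. Messages are represented as hypothetical {key, payload} string pairs.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of Key_Shared routing. hashCode modulo the consumer count is a
// stand-in for Pulsar's hash-range assignment, chosen only to keep the sketch short.
final class KeySharedSketch {

    // The same key always yields the same consumer index.
    static int consumerFor(String key, int consumerCount) {
        return Math.floorMod(key.hashCode(), consumerCount);
    }

    // Routes {key, payload} pairs into per-consumer queues, preserving arrival order.
    static List<List<String>> route(List<String[]> messages, int consumerCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String[] message : messages) {
            String key = message[0];
            queues.get(consumerFor(key, consumerCount)).add(key + ":" + message[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String[]> messages = List.of(
                new String[] {"order-1", "created"},
                new String[] {"order-2", "created"},
                new String[] {"order-1", "paid"},
                new String[] {"order-1", "shipped"});
        // All order-1 events land in one queue, in the order they arrived.
        System.out.println(route(messages, 3));
    }
}
```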

4.3. Consuming Records

Let us take a look at how the message listener container enables both single-record and batch-based message consumption.

Single Record Consumption

Let us revisit our basic PulsarListener for the sake of this discussion:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

With this PulsarListener method, we essentially ask Spring for Apache Pulsar to invoke the listener method with a single record each time. We mentioned that the message listener container consumes the data in batches by using the batchReceive method on the consumer. The framework detects that the PulsarListener, in this case, receives a single record, which means that each invocation of the method needs a single record. Although the message listener container consumes the records in batches, it iterates through the received batch and invokes the listener method through an adapter for PulsarRecordMessageListener. As you can see in the previous section, PulsarRecordMessageListener extends from the MessageListener provided by the Pulsar Java client, and it supports the basic received method.

Batch Consumption

The following example shows the PulsarListener consuming records in batches:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

When you use this type of PulsarListener, the framework detects that you are in batch mode. Since the container already receives the data in batches by using the Consumer's batchReceive method, it hands off the entire batch to the listener method through an adapter for PulsarBatchMessageListener.
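The difference between the two adapters can be modeled in a few lines of plain Java. The method names below are hypothetical and the sketch ignores acknowledgment entirely; it only shows that one batch from batchReceive leads to one listener invocation per record for a record listener, but to a single invocation for a batch listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the container's dispatch step (not framework types).
final class DispatchSketch {

    // Record listener: one invocation per record in the received batch.
    static <T> void dispatchToRecordListener(List<T> batch, Consumer<T> recordListener) {
        for (T item : batch) {
            recordListener.accept(item);
        }
    }

    // Batch listener: a single invocation with the entire received batch.
    static <T> void dispatchToBatchListener(List<T> batch, Consumer<List<T>> batchListener) {
        batchListener.accept(batch);
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "c"); // stands in for one batchReceive result
        List<String> invocations = new ArrayList<>();
        dispatchToRecordListener(batch, item -> invocations.add("record " + item));
        dispatchToBatchListener(batch, all -> invocations.add("batch of " + all.size()));
        System.out.println(invocations); // [record a, record b, record c, batch of 3]
    }
}
```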

5. Pulsar Headers

The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java.

5.1. Accessing in Single Record based Consumer

The following example shows how you can access the various Pulsar Headers in an application that uses the single record mode of consuming:

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

In the preceding example, we access the values for the messageId and rawData message metadata as well as a custom message property named foo. The Spring @Header annotation is used for each header field.

You can also use Pulsar's Message as the envelope to carry the payload. When doing so, you can directly call the corresponding methods on the Pulsar message to retrieve the metadata. However, as a convenience, you can also retrieve it by using the Header annotation. Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.

5.2. Accessing in Batch Record based Consumer

In this section, we see how to access the various Pulsar Headers in an application that uses a batch consumer:

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

In the preceding example, we consume the data as a List<String>. When extracting the various headers, we do so as a List<> as well. Spring for Apache Pulsar ensures that the headers list corresponds to the data list.

You can also extract headers in the same manner when you use the batch listener and receive payloads as List<org.apache.pulsar.client.api.Message<?>>, org.apache.pulsar.client.api.Messages<?>, or List<org.springframework.messaging.Message<?>>.
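The alignment guarantee can be pictured with the following sketch, in which each message in a batch is represented by a hypothetical Map of its properties. For a given header name, the sketch builds one list whose i-th element comes from the i-th message. Putting null in the slot of a message that lacks the header is an assumption made for illustration, not documented framework behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of batch header extraction: one list per header name,
// index-aligned with the payload list. Maps stand in for Pulsar message properties.
final class BatchHeadersSketch {

    static List<String> headerValues(List<Map<String, String>> batchProperties, String headerName) {
        List<String> values = new ArrayList<>(batchProperties.size());
        for (Map<String, String> properties : batchProperties) {
            // Position i in the result always corresponds to message i in the batch.
            // Assumption: null marks a message that does not carry this header.
            values.add(properties.get(headerName));
        }
        return values;
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = List.of(
                Map.of("foo", "x"),
                Map.of(),
                Map.of("foo", "z"));
        System.out.println(headerValues(batch, "foo")); // [x, null, z]
    }
}
```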

6. Message Acknowledgment

When you use Spring for Apache Pulsar, message acknowledgment is handled by the framework, unless the application opts out. In this section, we go through the details of how the framework takes care of message acknowledgment.

6.1. Message ACK modes

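Spring for Apache Pulsar provides the following modes for acknowledging messages:

  1. BATCH: the container acknowledges the messages after the whole batch returned by batchReceive has been processed.

  2. RECORD: the container acknowledges each record right after it has been handled.

  3. MANUAL: the application is responsible for acknowledging the messages.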

BATCH acknowledgment mode is the default, but you can change it on the message listener container. In the following sections, we see how acknowledgment works when you use both single and batch versions of PulsarListener and how they translate to the backing message listener container (and, ultimately, to the Pulsar consumer).

6.2. Automatic Message Ack in Single Record Mode

Let us revisit our basic single message based PulsarListener:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

It is natural to wonder how acknowledgment works when you use PulsarListener, especially if you are familiar with using the Pulsar consumer directly. The answer comes down to the message listener container, as that is the central place in Spring for Apache Pulsar that coordinates all the consumer-related activities.

Assuming you are not overriding the default behavior, this is what happens behind the scenes when you use the preceding PulsarListener:

  1. First, the listener container receives messages as batches from the Pulsar consumer.

  2. The received messages are handed down to PulsarListener one message at a time.

  3. When all the records are handed down to the listener method and successfully processed, the container acknowledges all the messages from the original batch.

This is the normal flow. If any records from the original batch throw an exception, Spring for Apache Pulsar tracks those records separately. When all the records from the batch are processed, Spring for Apache Pulsar acknowledges all the successful messages and negatively acknowledges (nacks) all the failed messages. In other words, when consuming single records by using PulsarRecordMessageListener with the default ack mode of BATCH, the framework waits for all the records received from the batchReceive call to be processed and then calls the acknowledge method on the Pulsar consumer. If any particular record throws an exception when the handler method is invoked, Spring for Apache Pulsar tracks that record and separately calls negativeAcknowledge on the failed records after the entire batch is processed.

If the application wants the acknowledgment or negative acknowledgment to occur per record, the RECORD ack mode can be enabled. In that case, after handling each record, the message is acknowledged if there was no error and negatively acknowledged if there was an error. The following example enables RECORD ack mode on the Pulsar listener:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}
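The difference between BATCH and RECORD ack modes for a record listener comes down to when acknowledgments are sent. The following plain-Java model records the calls in the order they would happen, using "ack:" and "nack:" strings as stand-ins for acknowledge and negativeAcknowledge on the Pulsar consumer. It is a sketch of the behavior described above with hypothetical names, not the container's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of acknowledgment for a record listener.
// "ack:"/"nack:" entries stand in for acknowledge/negativeAcknowledge calls.
final class RecordAckSketch {

    enum AckMode { BATCH, RECORD }

    static List<String> process(List<String> batch, Consumer<String> listener, AckMode mode) {
        List<String> calls = new ArrayList<>();
        List<String> succeeded = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        for (String msg : batch) {
            boolean ok;
            try {
                listener.accept(msg);
                ok = true;
            } catch (RuntimeException ex) {
                ok = false;
            }
            if (mode == AckMode.RECORD) {
                // RECORD: ack or nack immediately after each record is handled.
                calls.add((ok ? "ack:" : "nack:") + msg);
            } else if (ok) {
                succeeded.add(msg);
            } else {
                failed.add(msg);
            }
        }
        if (mode == AckMode.BATCH) {
            // BATCH: only after the whole batch is processed, ack successes, nack failures.
            succeeded.forEach(m -> calls.add("ack:" + m));
            failed.forEach(m -> calls.add("nack:" + m));
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("m1", "bad", "m3");
        Consumer<String> listener = msg -> {
            if (msg.equals("bad")) {
                throw new IllegalArgumentException("cannot process " + msg);
            }
        };
        System.out.println(process(batch, listener, AckMode.BATCH));  // [ack:m1, ack:m3, nack:bad]
        System.out.println(process(batch, listener, AckMode.RECORD)); // [ack:m1, nack:bad, ack:m3]
    }
}
```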

6.3. Manual Message Ack in Single Record Mode

You might not always want the framework to send acknowledgments but, rather, do that directly from the application itself. Spring for Apache Pulsar provides a couple of ways to enable manual message acknowledgments. The following example shows one of them:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

A few things merit explanation here. First, we enable manual ack mode by setting ackMode on PulsarListener. When manual ack mode is enabled, Spring for Apache Pulsar lets the application inject an Acknowledgment object. The framework achieves this by selecting a compatible message listener: PulsarAcknowledgingMessageListener for single record based consumption, which gives you access to an Acknowledgment object.

The Acknowledgment object provides the following API methods:

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

You can inject this Acknowledgment object into your PulsarListener while using MANUAL ack mode and then call one of the corresponding methods.

In the preceding PulsarListener example, we call a parameterless acknowledge method. This works because the framework knows which Message it is currently operating on. When calling acknowledge(), you need not receive the payload with the Message envelope but can, rather, use the target type (String, in this example). You can also call a different variant of acknowledge by providing the message ID: acknowledgment.acknowledge(message.getMessageId()); When you use acknowledge(messageId), you must receive the payload by using the Message<?> envelope.

Similar to what is possible for acknowledging, the Acknowledgment API also provides options for negatively acknowledging. See the nack methods shown earlier.

You can also call acknowledge directly on the Pulsar consumer:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		// handle the acknowledgment failure here
	}
}

When calling acknowledge directly on the underlying consumer, you need to do error handling by yourself. Using the Acknowledgment does not require that, as the framework can do that for you. Therefore, you should use the Acknowledgment object approach when using manual acknowledgment.

When using manual acknowledgment, it is important to understand that the framework stays away from any acknowledgment at all. Hence, it is extremely important to think through the right acknowledgment strategies when designing applications.

6.4. Automatic Message Ack in Batch Consumption

When you consume records in batches (see “Message ACK modes”) and use the default ack mode of BATCH, the entire batch is acknowledged once it is processed successfully. If any record throws an exception, the entire batch is negatively acknowledged. Note that this may not be the same batch that the producer created. Rather, it is the batch returned by calling batchReceive on the consumer.

Consider the following batch listener:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

When all the messages in the incoming collection (messages in this example) are processed, the framework acknowledges all of them.

When consuming in batch mode, RECORD is not an allowed ack mode. This might be a problem, as an application may not want the entire batch to be redelivered. In such situations, you need to use the MANUAL acknowledgment mode.
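The batch listener rules above can be modeled in the same way. In this hypothetical sketch, a batch listener either processes the whole batch or throws; with BATCH, every message then shares the outcome and is acked or nacked together, MANUAL leaves acknowledgment entirely to the application, and RECORD is rejected. Rejecting RECORD with an IllegalArgumentException is an assumption about how the restriction might surface, not the framework's documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of acknowledgment for a batch listener.
// "ack:"/"nack:" entries stand in for acknowledge/negativeAcknowledge calls.
final class BatchListenerAckSketch {

    enum AckMode { BATCH, RECORD, MANUAL }

    static List<String> process(List<String> batch, Consumer<List<String>> batchListener, AckMode mode) {
        if (mode == AckMode.RECORD) {
            // Assumption: the restriction is surfaced as an exception.
            throw new IllegalArgumentException("RECORD ack mode is not allowed for batch listeners");
        }
        boolean succeeded;
        try {
            batchListener.accept(batch);
            succeeded = true;
        } catch (RuntimeException ex) {
            succeeded = false;
        }
        List<String> calls = new ArrayList<>();
        if (mode == AckMode.MANUAL) {
            return calls; // MANUAL: the framework sends no acknowledgments at all
        }
        // BATCH: all-or-nothing, every message shares the outcome of the whole batch.
        for (String message : batch) {
            calls.add((succeeded ? "ack:" : "nack:") + message);
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("m1", "m2");
        System.out.println(process(batch, messages -> { }, AckMode.BATCH)); // [ack:m1, ack:m2]
        System.out.println(process(batch, messages -> {
            throw new IllegalStateException("one record failed");
        }, AckMode.BATCH)); // [nack:m1, nack:m2]
        System.out.println(process(batch, messages -> { }, AckMode.MANUAL)); // []
    }
}
```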

6.5. Manual Message Ack in Batch Consumption

As seen in the previous section, when MANUAL ack mode is set on the message listener container, the framework does not do any acknowledgment, positive or negative. It is entirely up to the application to take care of such concerns. When MANUAL ack mode is set, Spring for Apache Pulsar selects a compatible message listener: PulsarBatchAcknowledgingMessageListener for batch consumption, which gives you access to an Acknowledgment object. The following are the methods available in the Acknowledgment API:

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

You can inject this Acknowledgment object into your PulsarListener while using MANUAL ack mode. The following listing shows a basic example for a batch based listener:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true, ackMode = AckMode.MANUAL)
public void listen(List<Message<String>> messages, Acknowledgment acknowledgment) {
    for (Message<String> message : messages) {
        try {
            ...
            acknowledgment.acknowledge(message.getMessageId());
        }
        catch (Exception e) {
            acknowledgment.nack(message.getMessageId());
        }
    }
}

When you use a batch listener, the message listener container cannot know which record it is currently operating on. Therefore, to acknowledge manually, you need to use one of the overloaded acknowledge methods that take a MessageId or a List<MessageId>. You can also negatively acknowledge with the MessageId for the batch listener.

7. Message Redelivery and Error Handling

Now that we have seen both PulsarListener and the message listener container infrastructure and its various functions, let us now try to understand message redelivery and error handling. Apache Pulsar provides various native strategies for message redelivery and error handling. We take a look at them and see how we can use them through Spring for Apache Pulsar.

7.1. Specifying Acknowledgment Timeout for Message Redelivery

By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.

When you use Spring for Apache Pulsar, you can set this property via a consumer customizer or with the native Pulsar ackTimeout property in the properties attribute of @PulsarListener:

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1",
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

When you specify the ack timeout, if the consumer does not send an acknowledgment within 60 seconds, Pulsar redelivers the message to the consumer.

If you want to specify advanced backoff options for ack timeout redelivery with different delays, you can do the following:

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

In the preceding example, we specify a bean for Pulsar's RedeliveryBackoff with a minimum delay of 1 second, a maximum delay of 10 seconds, and a backoff multiplier of 2. After the initial ack timeout occurs, message redeliveries are controlled through this backoff bean. We provide the backoff bean to the PulsarListener annotation by setting the ackTimeoutRedeliveryBackoff property to the bean name (ackTimeoutRedeliveryBackoff, in this case).
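The following sketch computes the delay schedule implied by those three settings. It assumes the formula delay(n) = min(minDelayMs * multiplier^n, maxDelayMs) for redelivery count n, which is how we understand Pulsar's MultiplierRedeliveryBackoff to behave; MultiplierBackoffModel is a plain-Java model written for illustration, not the Pulsar class, so check it against the Pulsar client version you use.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (an assumption, not Pulsar's class) of a multiplier backoff:
// delay(n) = min(minDelayMs * multiplier^n, maxDelayMs).
final class MultiplierBackoffModel {

    static long next(int redeliveryCount, long minDelayMs, long maxDelayMs, double multiplier) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(delay, (double) maxDelayMs); // cap at the maximum delay
    }

    // Delays for redelivery counts 0 .. count-1.
    static List<Long> schedule(int count, long minDelayMs, long maxDelayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        for (int n = 0; n < count; n++) {
            delays.add(next(n, minDelayMs, maxDelayMs, multiplier));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same settings as the ackTimeoutRedeliveryBackoff bean: 1 s min, 10 s max, multiplier 2.
        System.out.println(schedule(6, 1000, 10_000, 2));
    }
}
```

Under this model the delay doubles from 1 second (1, 2, 4, 8 seconds) and, once doubling would exceed the cap, every later redelivery waits the 10-second maximum.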

7.2. Specifying Negative Acknowledgment Redelivery

When a message is acknowledged negatively, the Pulsar consumer lets you specify how the application wants the message to be redelivered. The default is to redeliver the message after one minute, but you can change this via a consumer customizer or with the native Pulsar negativeAckRedeliveryDelay property in the properties attribute of @PulsarListener:

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1",
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

You can also specify different delays and backoff mechanisms with a multiplier by providing a RedeliveryBackoff bean and setting the bean name as the negativeAckRedeliveryBackoff property on the PulsarListener, as follows:

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

7.3. Using Dead Letter Topic from Apache Pulsar for Message Redelivery and Error Handling

Apache Pulsar lets applications use a dead letter topic on consumers with a Shared subscription type. This feature is not available for the Exclusive and Failover subscription types. The basic idea is that, if a message is retried a certain number of times (for example, due to an ack timeout or nack redelivery), once the retries are exhausted, the message can be sent to a special topic called the dead letter queue (DLQ). Let us see this feature in action by inspecting some code snippets:

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

First, we have a special bean for DeadLetterPolicy, named deadLetterPolicy (the name can be anything you like). This bean specifies a number of things, such as the maximum redelivery count (10, in this case) and the name of the dead letter topic (my-dlq-topic, in this case). If you do not specify a DLQ topic name, Pulsar defaults it to <topicname>-<subscriptionname>-DLQ. Next, we provide this bean name to PulsarListener by setting the deadLetterPolicy property. Note that the PulsarListener has a subscription type of Shared, as the DLQ feature only works with shared subscriptions. This code is primarily for demonstration purposes, so we provide an ackTimeout value of 1 second. The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it redelivers the message. If that cycle continues ten times (our max redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the message to the DLQ topic. A second PulsarListener listens on the DLQ topic to receive data as it is published there.

Special note on DLQ topics when using partitioned topics

If the main topic is partitioned, Pulsar treats each partition as a separate topic behind the scenes, appending -partition-<n> (where n is the partition number) to the main topic name. The problem is that, if you do not specify a DLQ topic (unlike what we did above), Pulsar publishes to a default topic name that contains this partition-<n> information, for example: topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. The easy way to avoid this is to always provide a DLQ topic name.
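The naming problem can be reproduced with a small plain-Java model. It assumes only the two rules stated in this section (partitions are named <topic>-partition-<n>, and the default DLQ is <topicname>-<subscriptionname>-DLQ) and uses short topic names as in the examples, whereas real Pulsar topic names are usually fully qualified with a persistent:// scheme, tenant, and namespace. DlqNaming is a hypothetical helper, not a Pulsar API.

```java
// Plain-Java model of the default naming described above (not Pulsar code).
final class DlqNaming {

    // Each partition of a partitioned topic is named "<topic>-partition-<n>".
    static String partitionTopic(String topic, int partition) {
        return topic + "-partition-" + partition;
    }

    // Default DLQ name when none is configured: <topicname>-<subscriptionname>-DLQ.
    static String defaultDlqTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    public static void main(String[] args) {
        for (int p = 0; p < 3; p++) {
            // Each partition yields its own default DLQ topic name.
            System.out.println(defaultDlqTopic(partitionTopic("topic-with-dlp", p), "deadLetterPolicySubscription"));
        }
    }
}
```

Running main prints three different DLQ topic names, one per partition, which is why configuring an explicit dead letter topic is the simpler option.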

7.4. Native Error Handling in Spring for Apache Pulsar

As we noted earlier, the DLQ feature in Apache Pulsar works only for shared subscriptions. What does an application do if it needs a similar feature for non-shared subscriptions? The main reason Pulsar does not support DLQ on exclusive and failover subscriptions is that those subscription types guarantee ordering. Allowing redeliveries, DLQ, and so on effectively means receiving messages out of order. However, what if an application is okay with that but, more importantly, needs this DLQ feature for non-shared subscriptions? For that, Spring for Apache Pulsar provides a PulsarConsumerErrorHandler, which you can use across any subscription type in Pulsar: Exclusive, Failover, Shared, or Key_Shared.

When you use PulsarConsumerErrorHandler from Spring for Apache Pulsar, make sure not to set the ack timeout properties on the listener.

Let us see some details by examining a few code snippets:

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsarConsumerErrorHandler-subscription",
            topics = "pulsarConsumerErrorHandler-topic",
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

Consider the pulsarConsumerErrorHandler bean. It creates a bean of type PulsarConsumerErrorHandler using the default implementation that Spring for Apache Pulsar provides out of the box: DefaultPulsarConsumerErrorHandler. DefaultPulsarConsumerErrorHandler has a constructor that takes a PulsarMessageRecovererFactory and an org.springframework.util.backoff.BackOff. PulsarMessageRecovererFactory is a functional interface with the following API:

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

The recovererForConsumer method takes a Pulsar consumer and returns a PulsarMessageRecoverer, which is another functional interface. Here is the API of PulsarMessageRecoverer:

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar provides an implementation of PulsarMessageRecovererFactory called PulsarDeadLetterPublishingRecoverer, which recovers the message by sending it to a Dead Letter Topic (DLT). We pass this implementation to the constructor of the preceding DefaultPulsarConsumerErrorHandler. As the second argument, we provide a FixedBackOff. You can also provide Spring's ExponentialBackOff for more advanced backoff behavior. Then we provide the name of this PulsarConsumerErrorHandler bean to the PulsarListener through its pulsarConsumerErrorHandler property. Each time the PulsarListener method fails for a message, the message is retried. The number of retries is controlled by the values of the provided BackOff implementation. In our example, we do 10 retries (11 total tries: the first one and then the 10 retries). Once all the retries are exhausted, the message is sent to the DLT.
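The retry arithmetic (one initial attempt plus ten retries) can be modeled in plain Java. RetryThenRecover is a hypothetical sketch, not DefaultPulsarConsumerErrorHandler: it assumes FixedBackOff-style semantics (a fixed interval and a maximum number of retries) and uses a java.util.function.BiConsumer as a stand-in for PulsarMessageRecoverer.recoverMessage(message, exception).

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java model of "retry with a fixed back-off, then recover"; not DefaultPulsarConsumerErrorHandler.
// java.util.function.Consumer is the listener callback here, not the Pulsar Consumer.
final class RetryThenRecover {

    // Invokes the listener, retries up to maxRetries times with a fixed interval,
    // then hands the message and last exception to the recoverer.
    // Returns the total number of listener invocations.
    static <T> int handle(T message, Consumer<T> listener, long intervalMs, long maxRetries,
            BiConsumer<T, Exception> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(message);
                return attempts; // success: nothing to recover
            }
            catch (RuntimeException ex) {
                if (attempts > maxRetries) {
                    recoverer.accept(message, ex); // e.g. publish to a DLT
                    return attempts;
                }
                pause(intervalMs); // fixed delay, like FixedBackOff's interval
            }
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    public static void main(String[] args) {
        int attempts = handle("msg", m -> { throw new RuntimeException("fail " + m); }, 100, 10,
                (m, ex) -> System.out.println("recovered " + m + " after: " + ex.getMessage()));
        System.out.println("attempts=" + attempts);
    }
}
```

With maxRetries set to 10, as in FixedBackOff(100, 10), a message that always fails is attempted 11 times before the recoverer runs, matching the count given above.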

The PulsarDeadLetterPublishingRecoverer implementation we provide uses a PulsarTemplate to publish the message to the DLT. In most cases, the same auto-configured PulsarTemplate from Spring Boot is sufficient, with one caveat for partitioned topics. When using partitioned topics with custom message routing for the main topic, you must use a separate PulsarTemplate that is not built from the auto-configured PulsarProducerFactory, because in that setup the factory is populated with a value of custompartition for message-routing-mode. You can use a PulsarConsumerErrorHandler with the following blueprint:

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
    PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

    BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
            (c, m) -> "my-foo-dlt";

    PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

    return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
            new FixedBackOff(100, 5));
}

Note that we provide a destination resolver to the PulsarDeadLetterPublishingRecoverer as the second constructor argument. If it is not provided, PulsarDeadLetterPublishingRecoverer uses <subscription-name>-<topic-name>-DLT as the DLT topic name. When using this feature, you should set a proper destination name through the destination resolver rather than relying on the default.
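As an illustration of supplying your own resolver, the following plain-Java sketch derives one DLT name per topic, regardless of partition, by stripping a trailing -partition-<n> suffix. It is hypothetical: plain Strings stand in for the subscription and topic name that, with the real BiFunction<Consumer<?>, Message<?>, String>, you would read from Consumer#getSubscription() and Message#getTopicName(). The naming scheme itself is only an example; a fixed name such as my-foo-dlt, as used above, is often simpler.

```java
import java.util.function.BiFunction;
import java.util.regex.Pattern;

// Hypothetical resolver in the spirit of the (consumer, message) -> topic lambda above.
// Plain Strings stand in for the consumer's subscription and the message's topic name.
final class DltResolvers {

    private static final Pattern PARTITION_SUFFIX = Pattern.compile("-partition-\\d+$");

    // Strips a trailing "-partition-<n>" so every partition of a topic shares one DLT.
    static final BiFunction<String, String, String> PARTITION_AWARE =
            (subscription, topic) -> subscription + "-" + PARTITION_SUFFIX.matcher(topic).replaceFirst("") + "-DLT";

    public static void main(String[] args) {
        System.out.println(PARTITION_AWARE.apply("my-sub", "orders-partition-3"));
        System.out.println(PARTITION_AWARE.apply("my-sub", "orders"));
    }
}
```

Both calls resolve to the same DLT name, so messages from every partition (and from a non-partitioned topic of the same name) end up in one place.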

When using a single-record message listener, as we did with PulsarConsumerErrorHandler, and you use manual acknowledgment, make sure not to negatively acknowledge the message when an exception is thrown. Instead, rethrow the exception back to the container. Otherwise, the container assumes the message was handled separately, and the error handling is not triggered.

Finally, we have a second PulsarListener that receives messages from the DLT topic.

In the examples provided in this section so far, we only saw how to use PulsarConsumerErrorHandler with a single-record message listener. Next, we look at how to use it with batch listeners.

7.5. Batch listener with PulsarConsumerErrorHandler

First, let us look at a batch PulsarListener method:

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
        subscriptionType = SubscriptionType.Failover,
        pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgment.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(
        PulsarTemplate<Integer> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

Once again, we set the pulsarConsumerErrorHandler property to the PulsarConsumerErrorHandler bean name. When you use a batch listener (as shown in the preceding example) together with the PulsarConsumerErrorHandler from Spring for Apache Pulsar, you need to use manual acknowledgment. This way, you can acknowledge each individual message that succeeds. For the one that fails, you must throw a PulsarBatchListenerFailedException with the message on which processing failed. Without this exception, the framework does not know what to do with the failure. On retry, the container sends the listener a new batch of messages, starting with the failed message. If it fails again, it is retried until the retries are exhausted, at which point the message is sent to the DLT. The container then acknowledges that message, and the listener is handed the subsequent messages from the original batch.
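The retry flow described above can be traced with a plain-Java model. BatchErrorFlowModel is hypothetical and simplifies the container: each redelivered batch is exactly the failed message plus the messages after it, an IntPredicate decides which values fail, and adding a value to acked stands in for acknowledging its MessageId.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical, simplified model of the batch retry flow; not the Spring container.
final class BatchErrorFlowModel {

    final List<Integer> acked = new ArrayList<>();        // stands in for acknowledged MessageIds
    final List<Integer> deadLettered = new ArrayList<>(); // messages published to the DLT
    int failedDeliveries;                                  // deliveries that ended in a failure

    // One listener invocation: acknowledges messages until one fails.
    // Returns the index of the failed message, or -1 if the whole batch succeeded.
    private int deliver(List<Integer> batch, IntPredicate fails) {
        for (int i = 0; i < batch.size(); i++) {
            int value = batch.get(i);
            if (fails.test(value)) {
                failedDeliveries++;
                return i; // the listener would throw PulsarBatchListenerFailedException here
            }
            acked.add(value);
        }
        return -1;
    }

    void run(List<Integer> batch, IntPredicate fails, int maxRetries) {
        List<Integer> remaining = batch;
        int retries = 0;
        while (!remaining.isEmpty()) {
            int failedAt = deliver(remaining, fails);
            if (failedAt < 0) {
                return; // every message in this batch was acknowledged
            }
            if (failedAt > 0) {
                retries = 0; // a different message failed, so its retry count starts fresh
            }
            if (retries < maxRetries) {
                retries++;
                // Retry: the next batch starts with the failed message.
                remaining = remaining.subList(failedAt, remaining.size());
            }
            else {
                // Retries exhausted: publish to the DLT, acknowledge, continue with the rest.
                Integer failed = remaining.get(failedAt);
                deadLettered.add(failed);
                acked.add(failed);
                remaining = remaining.subList(failedAt + 1, remaining.size());
                retries = 0;
            }
        }
    }

    public static void main(String[] args) {
        BatchErrorFlowModel model = new BatchErrorFlowModel();
        model.run(List.of(1, 2, 3, 4, 5, 6, 7, 8), v -> v == 5, 10);
        System.out.println("acked=" + model.acked + " dlt=" + model.deadLettered
                + " failed deliveries=" + model.failedDeliveries);
    }
}
```

For the values 1 through 8 with 5 always failing and 10 retries, messages 1 to 4 are acknowledged by the listener, message 5 is delivered 11 times and then dead-lettered and acknowledged, and messages 6 to 8 follow in the next batch.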

8. Consumer Customization on PulsarListener

Spring for Apache Pulsar provides a convenient way to customize the consumer created by the container used by the PulsarListener. Applications can provide a bean for PulsarListenerConsumerBuilderCustomizer. Here is an example.

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

Then you can provide this customizer bean name as an attribute on the PulsarListener annotation, as shown below.

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

The framework detects the provided bean through the PulsarListener and applies this customizer on the Consumer builder before creating the Pulsar Consumer.

If you have multiple PulsarListener methods, each with different customization rules, create multiple customizer beans and attach the proper customizer to each PulsarListener.

9. Message Listener Container Lifecycle

9.1. Pausing and Resuming

There are situations in which an application might want to pause message consumption temporarily and resume it later. Spring for Apache Pulsar provides the ability to pause and resume the underlying message listener containers. When the Pulsar message listener container is paused, any polling the container does to receive data from the Pulsar consumer is paused. Similarly, when the container is resumed, the next poll starts returning data if any new records were added to the topic while it was paused.

To pause or resume a listener container, first obtain the container instance through the PulsarListenerEndpointRegistry bean and then invoke the pause/resume API on the container instance, as shown in the following snippet:

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}

The id parameter passed to getListenerContainer is the container id, which is the value of the @PulsarListener id attribute when pausing or resuming a @PulsarListener.

9.2. Handling Startup Failures

Message listener containers are started when the application context is refreshed. By default, any failures encountered during startup are re-thrown and the application will fail to start. You can adjust this behavior with the StartupFailurePolicy on the corresponding container properties.

The available options are:

  • Stop (default) - log and re-throw the exception, effectively stopping the application

  • Continue - log the exception, leave the container in a non-running state, but do not stop the application

  • Retry - log the exception, retry to start the container asynchronously, but do not stop the application.

The default retry behavior is to retry 3 times with a 10-second delay between attempts. However, you can specify a custom retry template on the corresponding container properties. If the container fails to start after the retries are exhausted, it is left in a non-running state.
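The difference between the three options can be modeled in a few lines of plain Java. This is not Spring's StartupFailurePolicy: the Policy enum and StartupPolicyModel class are hypothetical, logging is omitted, and the retry loop runs synchronously, whereas the Retry option described above restarts the container asynchronously.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of the three startup-failure options; not Spring's StartupFailurePolicy.
// Unlike the real Retry option, retries here run synchronously on the calling thread.
final class StartupPolicyModel {

    enum Policy { STOP, CONTINUE, RETRY }

    // tryStart returns true when the container starts; the result says whether it ends up running.
    static boolean start(Policy policy, BooleanSupplier tryStart, int retries, long delayMs) {
        if (tryStart.getAsBoolean()) {
            return true;
        }
        switch (policy) {
            case STOP:
                // Stop: re-throw, which would fail application startup.
                throw new IllegalStateException("container failed to start");
            case CONTINUE:
                // Continue: leave the container in a non-running state.
                return false;
            default:
                // Retry: try again up to 'retries' times, waiting delayMs between attempts.
                for (int i = 0; i < retries; i++) {
                    pause(delayMs);
                    if (tryStart.getAsBoolean()) {
                        return true;
                    }
                }
                return false; // retries exhausted: left in a non-running state
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails once, then starts on the first retry (short delay instead of the 10-second default).
        boolean running = start(Policy.RETRY, () -> ++calls[0] >= 2, 3, 100);
        System.out.println("running=" + running + " after " + calls[0] + " start attempts");
    }
}
```

Calling start with Policy.RETRY, 3 retries, and a 10_000 ms delay mirrors the default behavior described above, apart from running synchronously.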

9.2.1. Configuration

With Spring Boot

When using Spring Boot, you can register a PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean that sets the container startup properties.

Without Spring Boot

However, if you are instead configuring the components manually, you have to update the container startup properties accordingly when constructing the message listener container factory.

10. Pulsar Reader Support

The framework provides support for using a Pulsar Reader through the PulsarReaderFactory.

Spring Boot provides this reader factory, which you can further configure by specifying any of the spring.pulsar.reader.* application properties.

10.1. PulsarReader Annotation

While it is possible to use PulsarReaderFactory directly, Spring for Apache Pulsar provides the PulsarReader annotation, which you can use to quickly read from a topic without setting up any reader factories yourself. This follows the same ideas as PulsarListener. Here is a quick example:

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

The id attribute is optional, but it is a best practice to provide a value that is meaningful to your application. When it is not specified, an auto-generated id is used. The topics and startMessageId attributes, on the other hand, are mandatory. The topics attribute can be a single topic or a comma-separated list of topics. The startMessageId attribute instructs the reader to start from a particular message in the topic; its valid values are earliest and latest. If you want the reader to start from an arbitrary message rather than the earliest or latest available one, you need to use a PulsarReaderReaderBuilderCustomizer to customize the ReaderBuilder so that it knows the right MessageId to start from.

10.2. Customizing the ReaderBuilder

You can customize any fields available through ReaderBuilder by using a PulsarReaderReaderBuilderCustomizer in Spring for Apache Pulsar. Provide a @Bean of type PulsarReaderReaderBuilderCustomizer and then make it available to the PulsarReader as follows:

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        // messageId is assumed to be a MessageId that the application obtained elsewhere.
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}

If your application has only a single @PulsarReader and a single PulsarReaderReaderBuilderCustomizer bean registered, the customizer is applied automatically.

10.3. Handling Startup Failures

Reader containers are started when the application context is refreshed. By default, any failures encountered during startup are re-thrown and the application will fail to start. You can adjust this behavior with the StartupFailurePolicy on the corresponding container properties.

The available options are:

  • Stop (default) - log and re-throw the exception, effectively stopping the application

  • Continue - log the exception, leave the container in a non-running state, but do not stop the application

  • Retry - log the exception, retry to start the container asynchronously, but do not stop the application.

The default retry behavior is to retry 3 times with a 10-second delay between attempts. However, you can specify a custom retry template on the corresponding container properties. If the container fails to start after the retries are exhausted, it is left in a non-running state.

10.3.1. Configuration

With Spring Boot

When using Spring Boot, you can register a PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> bean that sets the container startup properties.

Without Spring Boot

However, if you are instead configuring the components manually, you have to update the container startup properties accordingly when constructing the reader container factory.