Message Production
1. ReactivePulsarTemplate
On the Pulsar producer side, Spring Boot auto-configuration provides a ReactivePulsarTemplate
for publishing records. The template implements an interface called ReactivePulsarOperations
and provides methods to publish records through its contract.
The template provides send methods that accept a single message and return a Mono<MessageId>.
It also provides send methods that accept multiple messages (in the form of the ReactiveStreams Publisher type) and return a Flux<MessageId>.
For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.
1.1. Fluent API
The template provides a fluent builder to handle more complicated send requests.
1.2. Message customization
You can specify a MessageSpecBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:
template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();
1.3. Sender customization
You can specify a ReactiveMessageSenderBuilderCustomizer to configure the underlying Pulsar sender builder that ultimately constructs the sender used to send the outgoing message.
Use with caution, as this gives full access to the sender builder, and invoking some of its methods (such as create) may have unintended side effects.
For example, the following code shows how to disable batching and enable chunking:
template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();
The following example shows how to use custom routing when publishing records to partitioned topics.
Specify your custom MessageRouter implementation on the sender builder, as follows:
template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.
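As an illustration of the kind of logic a custom router might contain, the following sketch picks a partition from a message key. It is plain Java with no Pulsar dependency: KeyHashPartitioner and its choosePartition(String, int) helper are hypothetical names, and sending unkeyed messages to partition 0 is an assumption made for the example. In a real application this logic would sit inside your MessageRouter implementation, which receives the message and the topic metadata rather than a bare key and partition count.

```java
// Hypothetical helper for illustration; not part of Pulsar or Spring for Apache Pulsar.
class KeyHashPartitioner {

    /**
     * Maps a message key to a partition index in the range [0, numPartitions).
     * Messages with the same key always map to the same partition.
     */
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            // Assumption for this sketch: unkeyed messages go to the first partition.
            return 0;
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Using Math.floorMod instead of the % operator matters here because String.hashCode() can be negative, and a negative partition index would be invalid.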
2. Specifying Schema Information
If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data.
For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the ReactivePulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE with INLINE encoding.
2.1. Custom Schema Mapping
As an alternative to specifying the schema when invoking send operations on the ReactivePulsarTemplate for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.
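The lookup order described in this section can be pictured with a small, self-contained model. This is only a sketch of the behavior as the text describes it, not the framework's DefaultSchemaResolver: SchemaLookupSketch, its string-valued schema names, the BUILT_IN set, and the User and Address stand-in classes are all hypothetical, and the relative order of the built-in check and the custom-mapping check is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Stand-in message types for the example (hypothetical).
class User {}
class Address {}

// Hypothetical model of the lookup order described above; not DefaultSchemaResolver itself.
class SchemaLookupSketch {

    // Types treated as auto-detected in this sketch (an assumption, not the framework's full list).
    private static final Set<Class<?>> BUILT_IN = Set.of(
            String.class, Integer.class, Long.class, Double.class, Boolean.class, byte[].class);

    private final Map<Class<?>, String> customMappings = new HashMap<>();

    void addCustomSchemaMapping(Class<?> messageType, String schemaType) {
        customMappings.put(messageType, schemaType);
    }

    /** Returns the name of the schema that would be used for the given message type. */
    String resolve(Class<?> messageType, String explicitSchema) {
        if (explicitSchema != null) {
            return explicitSchema; // a schema passed on the send operation wins
        }
        if (BUILT_IN.contains(messageType)) {
            return "BUILT_IN:" + messageType.getSimpleName(); // auto-detected primitive schema
        }
        String mapped = customMappings.get(messageType);
        if (mapped != null) {
            return mapped; // custom mapping registered for this message type
        }
        return "JSON"; // fallback for unmapped complex types, as described above
    }
}
```

With a mapping registered for User only, resolving User yields the mapped schema, while Address falls through to the JSON fallback unless a schema is passed explicitly.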
2.1.1. Configuration properties
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
The message-type is the fully-qualified name of the message class.
2.1.2. Schema resolver customizer
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed, you can provide a schema resolver customizer to add the mappings.
The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}
2.1.3. Type mapping annotation
Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation.
The schema info can be specified via the schemaType attribute on the annotation.
The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
With this configuration in place, there is no need to specify the schema on send operations.
2.2. Producing with AUTO_SCHEMA
If the schema type of a Pulsar topic cannot be known in advance, you can use an AUTO_PRODUCE schema to safely publish a raw JSON or Avro payload as a byte[].
In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.
Simply specify a schema of Schema.AUTO_PRODUCE_BYTES() on your template send operations, as shown in the example below:
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
    template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
This is only supported with Avro and JSON schema types.
3. ReactivePulsarSenderFactory
The ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to actually create the underlying sender.
Spring Boot provides this sender factory, which can be configured with any of the spring.pulsar.producer.* application properties.
If topic information is not specified when using the sender factory APIs directly, the same topic resolution process used by the ReactivePulsarTemplate is used, with the one exception that the "Message type default" step is omitted.
3.1. Producer Caching
Each underlying Pulsar producer consumes resources.
To improve performance and avoid continual creation of producers, the ReactiveMessageSenderCache in the underlying Apache Pulsar Reactive client caches the producers that it creates.
They are cached in an LRU fashion and evicted when they have not been used within a configured time period.
You can configure the cache settings by specifying any of the spring.pulsar.producer.cache.* application properties.
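To make the caching policy concrete, the following sketch models a cache that is bounded in an LRU fashion and also drops entries that have been idle for longer than a configured period. It is a simplified, plain-Java illustration and not the cache used by the Apache Pulsar Reactive client: IdleExpiringLruCache, Slot, getOrCreate, and the injectable clock are hypothetical names, and a real producer cache would also need to close producers when they are evicted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical illustration only; not the ReactiveMessageSenderCache implementation.
class IdleExpiringLruCache<K, V> {

    // A cached value together with the time it was last used.
    private static final class Slot<T> {
        final T value;
        final long lastAccessMillis;

        Slot(T value, long lastAccessMillis) {
            this.value = value;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final int maxSize;
    private final long maxIdleMillis;
    private final LongSupplier clock;
    private final Map<K, Slot<V>> entries;

    IdleExpiringLruCache(int maxSize, long maxIdleMillis, LongSupplier clock) {
        this.maxSize = maxSize;
        this.maxIdleMillis = maxIdleMillis;
        this.clock = clock;
        // accessOrder = true keeps the least recently used entry first in iteration order.
        this.entries = new LinkedHashMap<K, Slot<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Slot<V>> eldest) {
                // Drop the least recently used entry once the size bound is exceeded.
                return size() > IdleExpiringLruCache.this.maxSize;
            }
        };
    }

    /** Returns the cached value for the key, creating it with the factory if absent. */
    synchronized V getOrCreate(K key, Function<K, V> factory) {
        long now = clock.getAsLong();
        // Remove every entry that has been idle for longer than the configured period.
        entries.values().removeIf(slot -> now - slot.lastAccessMillis > maxIdleMillis);
        Slot<V> existing = entries.get(key);
        V value = (existing != null) ? existing.value : factory.apply(key);
        // Re-inserting refreshes both the access order and the last-access timestamp.
        entries.put(key, new Slot<>(value, now));
        return value;
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Passing the clock in as a LongSupplier (for example System::currentTimeMillis) keeps the sketch easy to exercise in a test without waiting for real time to elapse.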