Null Payloads and Log Compaction of 'Tombstone' Records
When using log compaction, you can send and receive messages with null
payloads to identify the deletion of a key.
You can also receive null
values for other reasons, such as a deserializer that might return null
when it cannot deserialize a value.
Producing Null Payloads
To send a null
payload by using the PulsarTemplate
, you can use the fluent API and pass null into the value argument of the newMessage()
method, for example:
pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
When sending null values you must specify the schema type as the system can not determine the type of the message from a null payload.
|
Consuming Null Payloads
For @PulsarListener
and @PulsarReader
, the null
payload is passed into the listener method based on the type of its message parameter as follows:
Parameter type | Passed-in value |
---|---|
primitive |
|
user-defined |
|
|
non-null Pulsar message whose |
|
non-null Spring message whose |
|
non-null list whose entries ( |
|
non-null container of non-null Pulsar messages whose |
When the passed-in value is null (ie. single record listeners with primitive or user-defined types) you must use the @Payload parameter annotation with required = false .
|
When using the Spring org.springframework.messaging.Message for your listener payload type, its generic type information must be wide enough to accept Message<PulsarNull> (eg. Message , Message<?> , or Message<Object> ).
This is due to the fact that the Spring Message does not allow null values for its payload and instead uses the PulsarNull placeholder.
|
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted
".
The following example shows such a configuration:
@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
The @PulsarReader does not yet support @Header arguments, so it is less useful in the log compaction scenario.
|