“Tombstone”记录的 Null 负载和日志压缩
使用日志压缩时,您可以使用null
payloads 来标识密钥的删除。
您还可以接收null
值,例如可能返回null
当它无法反序列化值时。
生成 Null 有效负载
您可以发送null
value 替换为ReactivePulsarTemplate
通过传递null
message 参数值设置为send
方法,例如:
reactiveTemplate
.send(null, Schema.STRING)
.subscribe();
发送 null 值时,必须指定架构类型,因为系统无法从null 有效载荷。 |
使用 Null 负载
为@ReactivePularListener
这null
payload 根据 Listener 的 message 参数的类型传递到 Listener 方法中,如下所示:
参数类型 | 传入值 |
---|---|
原始 |
|
用户定义 |
|
|
非 null Pulsar 消息,其 |
|
非 null Spring 消息,其 |
|
非 null flux,其条目是非 null 的 Pulsar 消息,其 |
|
非 null 通量,其条目是非 null 的 Spring 消息,其 |
当传入值为null (即具有原始类型或用户定义类型的单记录侦听器)您必须使用@Payload parameter 注解替换为required = false . |
使用 Spring 时org.springframework.messaging.Message 对于侦听器有效负载类型,其泛型类型信息必须足够宽才能接受Message<PulsarNull> (例如。Message ,Message<?> 或Message<Object> ).
这是因为 Spring Message 不允许其有效负载使用 null 值,而是使用PulsarNull 占 位 符。 |
如果它是压缩日志的逻辑删除消息,则通常还需要 key,以便您的应用程序可以确定哪个 key 是”deleted
".
以下示例显示了此类配置:
@ReactivePulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
Mono<Void> myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
当使用流式消息侦听器 (Flux ) 标头支持是有限的,因此它在日志压缩场景中的用处不大。 |