使用日志压缩时,您可以发送和接收带有有效负载的消息,以识别密钥的删除。 您还可以出于其他原因接收值,例如,当无法反序列化值时可能会返回的反序列化程序。nullnullnull

生成空有效负载

您可以通过将消息参数值传递给其中一个方法来发送带有 的值,例如:nullReactivePulsarTemplatenullsend

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
发送 null 值时,必须指定架构类型,因为系统无法确定有效负载中的消息类型。null
发送 null 值时,必须指定架构类型,因为系统无法确定有效负载中的消息类型。null

使用 Null 有效负载

对于 ,有效负载根据其消息参数的类型传递到 listener 方法中,如下所示:@ReactivePularListenernull

参数类型 传入值

原始

null

用户定义

null

org.apache.pulsar.client.api.Message<T>

非空 Pulsar 消息,其返回getValue()null

org.springframework.messaging.Message<T>

非空 Spring 消息,其返回getPayload()PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

非空通量,其条目为非空 Pulsar 消息,其返回getValue()null

Flux<org.apache.pulsar.client.api.Messages<T>>

非空通量,其条目为非空 Spring 消息,其返回getPayload()PulsarNull

当传入值为 (即具有基元或用户定义类型的单记录侦听器)时,必须将参数 annotation 与 .null@Payloadrequired = false
将 Spring 用于侦听器有效负载类型时,其泛型类型信息必须足够宽以接受(例如 、 或 )。 这是因为 Spring Message 不允许其有效负载为 null 值,而是使用占位符。org.springframework.messaging.MessageMessage<PulsarNull>MessageMessage<?>Message<Object>PulsarNull

如果是压缩日志的逻辑删除消息,则通常还需要密钥,以便应用程序可以确定哪个键是“”。 以下示例显示了这样的配置:deleted

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
使用流式消息侦听器 () 时,标头支持是有限的,因此在日志压缩方案中不太有用。Flux
参数类型 传入值

原始

null

用户定义

null

org.apache.pulsar.client.api.Message<T>

非空 Pulsar 消息,其返回getValue()null

org.springframework.messaging.Message<T>

非空 Spring 消息,其返回getPayload()PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

非空通量,其条目为非空 Pulsar 消息,其返回getValue()null

Flux<org.apache.pulsar.client.api.Messages<T>>

非空通量,其条目为非空 Spring 消息,其返回getPayload()PulsarNull

当传入值为 (即具有基元或用户定义类型的单记录侦听器)时,必须将参数 annotation 与 .null@Payloadrequired = false
将 Spring 用于侦听器有效负载类型时,其泛型类型信息必须足够宽以接受(例如 、 或 )。 这是因为 Spring Message 不允许其有效负载为 null 值,而是使用占位符。org.springframework.messaging.MessageMessage<PulsarNull>MessageMessage<?>Message<Object>PulsarNull
使用流式消息侦听器 () 时,标头支持是有限的,因此在日志压缩方案中不太有用。Flux