此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

时间戳提取器

Kafka Streams 允许您根据各种时间戳概念控制使用者记录的处理。 默认情况下,Kafka Streams 会提取嵌入在使用者记录中的时间戳元数据。 您可以通过提供不同的TimestampExtractorimplementation per input binding。 以下是有关如何执行此作的一些详细信息。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后你设置上面的TimestampExtractor每个使用者绑定的 bean 名称。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果您跳过用于设置自定义时间戳提取器的输入使用者绑定,则该使用者将使用默认设置。spring-doc.cadn.net.cn