此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

交互式查询

Kafka Streams Binder API 公开了一个名为InteractiveQueryService以交互方式查询 State Store。 您可以在应用程序中将其作为 Spring bean 进行访问。从应用程序访问此 bean 的一种简单方法是autowire豆子。spring-doc.cadn.net.cn

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦你获得了对这个 bean 的访问权限,那么你就可以查询你感兴趣的特定 state-store。见下文。spring-doc.cadn.net.cn

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动期间,用于检索存储的上述方法调用可能会失败。 例如,它可能仍处于初始化 state store 的过程中。 在这种情况下,重试此作将非常有用。 Kafka Streams Binder 提供了一种简单的重试机制来适应这种情况。spring-doc.cadn.net.cn

以下是可用于控制此重试的两个属性。spring-doc.cadn.net.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认为1.spring-doc.cadn.net.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为1000毫秒。spring-doc.cadn.net.cn

如果有多个 kafka streams 应用程序实例正在运行,则在以交互方式查询它们之前,您需要确定哪个应用程序实例托管您正在查询的特定密钥。InteractiveQueryServiceAPI 提供了用于识别主机信息的方法。spring-doc.cadn.net.cn

要使其正常工作,您必须配置属性application.server如下:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段:spring-doc.cadn.net.cn

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。 同样对于这些方法,在启动期间,如果底层 KafkaStreams 对象尚未准备好,它们可能会引发异常。 上述重试属性也适用于这些方法。spring-doc.cadn.net.cn

通过 InteractiveQueryService 提供的其他 API 方法

使用以下 API 方法检索KeyQueryMetadata与给定 store 和 key 的组合相关联的对象。spring-doc.cadn.net.cn

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法检索KakfaStreams与给定 store 和 key 的组合相关联的对象。spring-doc.cadn.net.cn

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

自定义 Store 查询参数

有时,您需要先微调 store 查询参数,然后再通过InteractiveQueryService. 为此,从4.0.1版本,您可以为StoreQueryParametersCustomizer这是一个功能接口,具有customize方法,该方法采用StoreQueryParameter作为参数。 这是它的方法签名。spring-doc.cadn.net.cn

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

使用此方法,应用程序可以进一步自定义StoreQueryParameters例如启用过时的 store。spring-doc.cadn.net.cn

当此应用程序中存在此 Bean 时,InteractiveQueryService将调用其customize方法。spring-doc.cadn.net.cn

请记住,必须有一个唯一的 beanStoreQueryParametersCustomizer在应用程序中可用。