此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
自定义 Kafka Binder 运行状况指示器
覆盖默认 Kafka Binder 运行状况指示器
Kafka Binders 在 Spring Boot 执行器位于 Classpath 上时激活默认运行状况指示器。
此运行状况指示器检查 Binder 的运行状况以及与 Kafka 代理的任何通信问题。
如果应用程序想要禁用此默认运行状况检查实现并包含自定义实现,则它可以为KafkaBinderHealth
接口。KafkaBinderHealth
是一个标记接口,它从HealthIndicator
.
在自定义实现中,它必须为health()
方法。
自定义实现必须作为 Bean 存在于应用程序配置中。
当 Binder 发现自定义实现时,它将使用该实现而不是默认实现。
以下是应用程序中此类自定义实现 Bean 的示例。
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
自定义 kafka Binder Health Indicator 示例
以下是编写自定义 Kafka Binder HealthIndicator 的伪代码。 在此示例中,我们尝试通过首先专门检查集群连接,然后检查与主题相关的问题来覆盖 Binder 提供的 Kafka HealthIndicator。
首先,我们需要创建一个KafkaBinderHealth
接口。
public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
@Value("${spring.cloud.bus.destination}")
private String topic;
private final AdminClient client;
public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
// More about configuring Kafka
// https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
this.client = AdminClient.create(admin.getConfigurationProperties());
}
@Override
public Health health() {
if (!checkBrokersConnection()) {
logger.error("Error when connect brokers");
return Health.down().withDetail("BrokersConnectionError", "Error message").build();
}
if (!checkTopicConnection()) {
logger.error("Error when trying to connect with specific topic");
return Health.down().withDetail("TopicError", "Error message with topic name").build();
}
return Health.up().build();
}
public boolean checkBrokersConnection() {
// Your implementation
}
public boolean checkTopicConnection() {
// Your implementation
}
}
然后我们需要为自定义实现创建一个 Bean。
@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
return new KafkaBinderHealthImplementation(admin);
}
}