此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
从 3.0.0 版开始,当涉及到生产者拦截器时,您可以让 Spring 直接将其作为 Bean 进行管理,而不是向 Apache Kafka 生产者配置提供拦截器的类名。
如果采用此方法,则需要将此生产者拦截器设置为 。
下面是一个使用上述相同方法的示例,但已更改为不使用内部配置属性。KafkaTemplate
MyProducerInterceptor
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private final SomeBean bean;
public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}
在发送记录之前,将调用生产者拦截器的方法。
一旦服务器发送有关发布数据的确认,就会调用该方法。
在生产者调用任何用户回调之前调用。onSend
onAcknowledgement
onAcknowledgement
如果您通过 Spring 管理了多个这样的生产者拦截器,并且需要在 上应用,则需要改用。 允许按顺序添加单个生产者拦截器。
底层实现中的方法按添加到 .KafkaTemplate
CompositeProducerInterceptor
CompositeProducerInterceptor
ProducerInterceptor
CompositeProducerInterceptor