对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

在 Spring 中管理的 Producer Interceptor

从版本 3.0.0 开始,当涉及到 producer 拦截器时,你可以让 Spring 直接将其作为 bean 进行管理,而不是将拦截器的类名提供给 Apache Kafka 生产者配置。 如果你使用这种方法,那么你需要将这个 producer 拦截器 设置为 onKafkaTemplate. 下面是一个使用相同MyProducerInterceptor,但更改为不使用 internal config 属性。spring-doc.cadn.net.cn

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

在发送记录之前,onSend调用生产者拦截器的方法。 服务器发送发布数据的确认后,onAcknowledgement方法。 这onAcknowledgement在创建者调用任何用户回调之前调用。spring-doc.cadn.net.cn

如果您有多个通过 Spring 管理的此类生产者拦截器,这些拦截器需要在KafkaTemplate,您需要使用CompositeProducerInterceptor相反。CompositeProducerInterceptor允许按顺序添加单个 producer 拦截器。 底层ProducerInterceptor实现按照添加到CompositeProducerInterceptor.spring-doc.cadn.net.cn