此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

@KafkaListener注解

@KafkaListenerannotation 用于将 Bean 方法指定为侦听器容器的侦听器。 该 bean 被包装在一个MessagingMessageListenerAdapter配置了各种功能,例如转换器,用于在必要时转换数据以匹配方法参数。spring-doc.cadn.net.cn

您可以使用 SPEL 配置 Comments 上的大多数属性#{…​}或属性占位符 (${…​}). 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

录制侦听器

@KafkaListenerannotation 为简单的 POJO 侦听器提供了一种机制。 以下示例演示如何使用它:spring-doc.cadn.net.cn

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

此机制需要一个@EnableKafka注释添加到您的@Configuration类和侦听器容器工厂,用于配置底层ConcurrentMessageListenerContainer. 默认情况下,名称为kafkaListenerContainerFactory是意料之中的。 以下示例演示如何使用ConcurrentMessageListenerContainer:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }
}

请注意,要设置容器属性,必须使用getContainerProperties()方法。 它用作注入到容器中的实际属性的模板。spring-doc.cadn.net.cn

从版本 2.1.1 开始,您现在可以将client.id属性。 这clientIdPrefix后缀为-n哪里n是一个整数,表示使用并发时的容器编号。spring-doc.cadn.net.cn

从版本 2.2 开始,您现在可以覆盖容器工厂的concurrencyautoStartupproperties 的 PROPERTIES 来使用注解本身的 properties。 属性可以是简单值、属性占位符或 SPEL 表达式。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

显式分区分配

您还可以使用显式主题和分区(以及可选的其初始偏移量)配置 POJO 侦听器。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

您可以在partitionspartitionOffsets属性,但不能同时共享两者。spring-doc.cadn.net.cn

与大多数 Comments 属性一样,您可以使用 SPEL 表达式;有关如何生成大型分区列表的示例,请参阅手动分配所有分区spring-doc.cadn.net.cn

从版本 2.5.5 开始,您可以将初始偏移量应用于所有分配的分区:spring-doc.cadn.net.cn

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

通配符表示*partitions属性。 只能有一个@PartitionOffset每个@TopicPartition.spring-doc.cadn.net.cn

此外,当监听器实现ConsumerSeekAware,onPartitionsAssigned现在被调用,即使在使用手动分配时也是如此。 例如,这允许在当时执行任何任意 seek作。spring-doc.cadn.net.cn

从版本 2.6.4 开始,您可以指定以逗号分隔的分区列表或分区范围:spring-doc.cadn.net.cn

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

范围是非独占的;上面的示例将分配 partition0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15.spring-doc.cadn.net.cn

指定初始偏移量时,可以使用相同的技术:spring-doc.cadn.net.cn

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

初始偏移量将应用于所有 6 个分区。spring-doc.cadn.net.cn

从 3.2 开始,@PartitionOffset支持SeekPosition.END,SeekPosition.BEGINNING,SeekPosition.TIMESTAMP,seekPosition火柴SeekPosition枚举名称:spring-doc.cadn.net.cn

@KafkaListener(id = "seekPositionTime", topicPartitions = {
        @TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
                @PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
                @PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
        })
})
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

如果 seekPosition 设置了ENDBEGINNING将忽略initialOffsetrelativeToCurrent. 如果 seekPosition 设置了TIMESTAMP,initialOffset表示时间戳。spring-doc.cadn.net.cn

手动确认

使用手动时AckMode,您还可以为侦听器提供Acknowledgment. 以下示例还显示了如何使用其他容器工厂。spring-doc.cadn.net.cn

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

消费者记录元数据

最后,有关记录的元数据可从消息标头中获得。 您可以使用以下报头名称来检索邮件的报头:spring-doc.cadn.net.cn

从版本 2.5 开始,RECEIVED_KEY不存在,如果传入记录具有null钥匙;以前,标头中填充了null价值。 此更改是为了使框架与spring-messaging约定,其中null值标头不存在。spring-doc.cadn.net.cn

以下示例演示如何使用标头:spring-doc.cadn.net.cn

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
参数注释 (@Payload,@Header) 必须在 listener 方法的具体实现上指定;如果在接口上定义了它们,则不会检测到它们。

从版本 2.5 开始,您可以在ConsumerRecordMetadata参数。spring-doc.cadn.net.cn

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}

这包含来自ConsumerRecord但 Key 和 Value 除外。spring-doc.cadn.net.cn

Batch 侦听器

从版本 1.1 开始,您可以配置@KafkaListener方法来接收从 Consumer Poll 接收的整批 Consumer 记录。spring-doc.cadn.net.cn

批处理侦听器不支持非阻塞重试

要配置侦听器容器工厂创建批处理侦听器,您可以设置batchListener财产。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
   return factory;
}
从版本 2.8 开始,您可以覆盖工厂的batchListenerpropery 使用batch属性@KafkaListener注解。 这与对 Container Error Handlers 的更改一起,允许将同一工厂用于记录侦听器和批处理侦听器。
从版本 2.9.6 开始,容器工厂为recordMessageConverterbatchMessageConverter性能。 以前,只有一个属性messageConverter该 API 适用于记录侦听器和批处理侦听器。

以下示例显示如何接收有效负载列表:spring-doc.cadn.net.cn

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

topic 、 partition 、 offset 等在与有效负载并行的标头中可用。 以下示例演示如何使用标头:spring-doc.cadn.net.cn

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您也可以收到ListMessage<?>对象,但它必须是唯一的参数(除了可选的Acknowledgment、使用手动提交时,和/或Consumer<?, ?>parameters) 的 API 中。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在这种情况下,不会对有效负载执行任何转换。spring-doc.cadn.net.cn

如果BatchMessagingMessageConverter配置了RecordMessageConverter中,您还可以将泛型类型添加到Message参数,并且有效负载会进行转换。 有关更多信息,请参阅使用 Batch Listeners 进行有效负载转换spring-doc.cadn.net.cn

您还可以接收ConsumerRecord<?, ?>对象,但它必须是唯一的参数(除了可选的Acknowledgment、使用手动提交和Consumer<?, ?>parameters) 的 API 中。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从版本 2.2 开始,侦听器可以接收完整的ConsumerRecords<?, ?>对象返回的poll()方法,让侦听器访问其他方法,例如partitions()(返回TopicPartition实例)和records(TopicPartition)(获取选择性记录)。 同样,这必须是唯一的参数(除了可选的Acknowledgment、使用手动提交或Consumer<?, ?>parameters) 的 API 中。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
如果容器工厂具有RecordFilterStrategyconfigured,则ConsumerRecords<?, ?>listeners,带有WARN日志消息。 如果<List<?>>使用侦听器的形式。 默认情况下,一次筛选一个记录;从版本 2.8 开始,您可以覆盖filterBatch以在一次调用中筛选整个批处理。

注释属性

从版本 2.0 开始,idproperty(如果存在)用作 Kafka 使用者group.id属性,覆盖 Consumer Factory 中的 configured 属性(如果存在)。 您还可以设置groupId显式或设置idIsGroup设置为 false 以恢复之前使用 Consumer Factory 的行为group.id.spring-doc.cadn.net.cn

您可以在大多数 Comments 属性中使用属性占位符或 SPEL 表达式,如下例所示:spring-doc.cadn.net.cn

@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")

从版本 2.1.2 开始,SPEL 表达式支持特殊令牌:__listener. 它是一个伪 Bean 名称,表示存在此 Comments 的当前 Bean 实例。spring-doc.cadn.net.cn

请考虑以下示例:spring-doc.cadn.net.cn

@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}

给定上一个示例中的 bean,然后我们可以使用以下内容:spring-doc.cadn.net.cn

public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}

如果,在不太可能的情况下,您有一个名为__listener中,您可以使用beanRef属性。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")

从版本 2.2.4 开始,您可以直接在 Comments 上指定 Kafka consumer 属性,这些属性将覆盖在 consumer 工厂中配置的具有相同名称的任何属性。您不能指定group.idclient.idproperties 以这种方式;他们将被忽视;使用groupIdclientIdPrefixannotation 属性。spring-doc.cadn.net.cn

这些属性被指定为具有普通 Java 的单个字符串Properties文件格式:foo:bar,foo=barfoo bar,如下例所示:spring-doc.cadn.net.cn

@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

以下是 中示例的相应侦听器的示例RoutingKafkaTemplate.spring-doc.cadn.net.cn

@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}