This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.6!spring-doc.cn

@KafkaListener on a Class

When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. If no @KafkaHandler on any methods of this class or its sub-classes, the framework will reject such a configuration. The @KafkaHandler annotation is required for explicit and concise purpose of the method. Otherwise it is hard to make a decision about this or other method without extra restrictions.spring-doc.cn

When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so:spring-doc.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods. At most, one method can be so designated. When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.spring-doc.cn

Due to some limitations in the way Spring resolves method arguments, a default @KafkaHandler cannot receive discrete headers; it must use the ConsumerRecordMetadata as discussed in Consumer Record Metadata.

For example:spring-doc.cn

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

This won’t work if the object is a String; the topic parameter will also get a reference to object.spring-doc.cn

If you need metadata about the record in a default method, use this:spring-doc.cn

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}

Also, this won’t work as well. The topic is resolved to the payload.spring-doc.cn

@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // payload.equals(topic) is True.
    ...
}

If there are use cases in which discrete custom headers are required in a default method, use this:spring-doc.cn

@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
    Object myValue = headers.get("MyCustomHeader");
    ...
}