此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

使用 Kafka Binder 进行分区

Apache Kafka 原生支持主题分区。spring-doc.cadn.net.cn

有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应发送到同一分区)。spring-doc.cadn.net.cn

以下示例显示了如何配置生产者和使用者端:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
请务必记住,由于 Apache Kafka 原生支持分区,因此无需依赖上述 Binder 分区,除非您使用示例中的自定义分区键或涉及有效负载本身的表达式。 Binder 提供的分区选择适用于不支持本机分区的中间件技术。 请注意,我们使用的是名为partitionKey在上面的示例中,这将是分区的决定因素,因此在这种情况下,使用 Binder 分区是合适的。 当使用原生 Kafka 分区时,即当您没有提供partition-key-expression,则 Apache Kafka 将选择一个分区,默认情况下,该分区将是记录键与可用分区数的哈希值。 要向出站记录添加键,请将KafkaHeaders.KEYheader 添加到 spring-messaging 中所需的键值Message<?>. 默认情况下,当没有提供记录键时,Apache Kafka 会根据 Apache Kafka 文档中描述的逻辑选择分区。
必须预置主题以具有足够的分区,以实现所有使用者组所需的并发性。 上述配置最多支持 12 个 Consumer 实例(如果concurrency是 2,如果并发数为 3,则为 4,依此类推)。 通常,最好 “过度预置” 分区,以允许将来使用者或并发性的增加。
上述配置使用默认分区 (key.hashCode() % partitionCount). 这可能会也可能不会提供适当平衡的算法,具体取决于键值。特别要注意的是,此分区策略与独立 Kafka 生产者使用的默认设置不同,例如 Kafka Streams 使用的策略,这意味着当这些客户端生成时,相同的键值在分区之间可能会以不同的方式平衡。 您可以使用partitionSelectorExpressionpartitionSelectorClass性能。

由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 跨实例分配分区。spring-doc.cadn.net.cn

kafka 主题的 partitionCount 在运行时可能会更改(例如,由于管理任务)。 在此之后,计算出的分区将有所不同(例如,届时将使用新的分区)。 从 Spring Cloud Stream 运行时的 4.0.3 开始,将支持分区计数的更改。 另请参阅参数 'spring.kafka.producer.properties.metadata.max.age.ms' 以配置更新间隔。 由于某些限制,无法使用引用消息的 'payload' 的 'partition-key-expression',在这种情况下,该机制将被禁用。 默认情况下,整体行为处于禁用状态,可以使用配置参数 'producer.dynamicPartitionUpdatesEnabled=true' 来启用。

Spring Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息发送到的分区 ID:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。 如果实例计数 (或instance count * concurrency) 超过分区数,则部分消费者处于空闲状态。spring-doc.cadn.net.cn