此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

提示、技巧和Recipes

使用 Kafka 的简单 DLQ

问题陈述

作为开发人员,我想编写一个使用者应用程序来处理来自 Kafka 主题的记录。 但是,如果在处理过程中出现一些错误,我不希望应用程序完全停止。 相反,我想将错误的记录发送到 DLT (Dead-Letter-Topic),然后继续处理新记录。spring-doc.cadn.net.cn

溶液

此问题的解决方案是使用 Spring Cloud Stream 中的 DLQ 功能。 为了进行本次讨论,我们假设以下是我们的处理器函数。spring-doc.cadn.net.cn

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

这是一个非常简单的函数,它会为它处理的所有记录引发异常,但您可以使用此函数并将其扩展到任何其他类似情况。spring-doc.cadn.net.cn

为了将错误的记录发送到 DLT,我们需要提供以下配置。spring-doc.cadn.net.cn

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

要激活 DLQ,应用程序必须提供组名称。 匿名使用者无法使用 DLQ 功能。 我们还需要通过设置enableDLQ属性绑定到 Kafka 使用者true. 最后,我们可以通过提供dlqName在 Kafka 使用者绑定上,否则默认为error.input-topic.my-group在这种情况下。spring-doc.cadn.net.cn

请注意,在上面提供的示例 consumer 中,payload 的类型是byte[]. 默认情况下,Kafka Binder 中的 DLQ 生产者需要byte[]. 如果不是这种情况,那么我们需要为适当的 serializer 提供配置。 例如,让我们重写 consumer 函数,如下所示:spring-doc.cadn.net.cn

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,在写入 DLT 时,我们希望如何序列化数据。 以下是此方案的修改后的配置:spring-doc.cadn.net.cn

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

具有高级重试选项的 DLQ

问题陈述

这与上面的配方类似,但作为开发人员,我想配置处理重试的方式。spring-doc.cadn.net.cn

溶液

如果您遵循上述配方,则当处理遇到错误时,您将获得内置于 Kafka Binder 中的默认重试选项。spring-doc.cadn.net.cn

默认情况下,Binder 最多停用 3 次尝试,初始延迟为 1 秒,每次回退为 2.0 倍,最大延迟为 10 秒。 您可以更改所有这些配置,如下所示:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果需要,您还可以通过提供布尔值映射来提供可重试异常的列表。 例如spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,将重试上面映射中未列出的任何异常。 如果不需要,则可以通过提供spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您也可以提供自己的RetryTemplate并将其标记为@StreamRetryTemplate将被 Binder 扫描并使用。 当您需要更复杂的重试策略和策略时,这非常有用。spring-doc.cadn.net.cn

如果您有多个@StreamRetryTemplatebeans 的 bean 中,则您可以使用属性spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

使用 DLQ 处理反序列化错误

问题陈述

我有一个处理器在 Kafka Consumer 中遇到反序列化异常。 我希望 Spring Cloud Stream DLQ 机制能够捕捉到这种情况,但事实并非如此。 我该如何处理呢?spring-doc.cadn.net.cn

溶液

当 Kafka 使用者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的正常 DLQ 机制将无济于事。 这是因为,这个异常甚至发生在消费者的poll()method 返回。 Spring for Apache Kafka 项目提供了一些很好的方法来帮助 Binder 处理这种情况。 让我们来探讨一下。spring-doc.cadn.net.cn

假设这是我们的函数:spring-doc.cadn.net.cn

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个简单的函数,它采用String参数。spring-doc.cadn.net.cn

我们希望绕过 Spring Cloud Stream 提供的消息转换器,并希望改用本机反序列化器。 在String类型,这没有多大意义,但对于更复杂的类型,如 AVRO 等,您必须依赖外部反序列化器,因此希望将转换委托给 Kafka。spring-doc.cadn.net.cn

现在当 Consumer 收到数据时,我们假设有一条坏记录导致反序列化错误,可能是有人传递了一个Integer而不是String例如。 在这种情况下,如果您不在应用程序中执行某些作,则异常将通过链传播,并且您的应用程序最终将退出。spring-doc.cadn.net.cn

为了处理此问题,您可以添加ListenerContainerCustomizer @Bean该配置一个DefaultErrorHandler. 这DefaultErrorHandler配置了DeadLetterPublishingRecoverer. 我们还需要配置一个ErrorHandlingDeserializer对于消费者。 这听起来像很多复杂的事情,但实际上,在这种情况下,归结为这 3 颗豆子。spring-doc.cadn.net.cn

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们逐一分析一下。 第一个是ListenerContainerCustomizerbean 的 bean 接受DefaultErrorHandler. 现在,容器已使用该特定错误处理程序进行自定义。 您可以在此处了解有关容器自定义的更多信息。spring-doc.cadn.net.cn

第二个 bean 是DefaultErrorHandler配置了一个发布到DLT. 有关的更多详细信息,请参阅此处DefaultErrorHandler.spring-doc.cadn.net.cn

第三个 bean 是DeadLetterPublishingRecoverer,最终负责发送到DLT. 默认情况下,DLTtopic 被命名为 ORIGINAL_TOPIC_NAME。DLT 的。 不过,你可以改变这一点。 有关更多详细信息,请参阅文档spring-doc.cadn.net.cn

我们还需要通过 application config 配置 ErrorHandlingDeserializerspring-doc.cadn.net.cn

ErrorHandlingDeserializer委托给实际的反序列化器。 如果出现错误,它会将记录的键/值设置为 null,并包含消息的原始字节。 然后,它在标头中设置异常并将此记录传递给侦听器,然后侦听器调用已注册的错误处理程序。spring-doc.cadn.net.cn

以下是所需的配置:spring-doc.cadn.net.cn

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们提供ErrorHandlingDeserializer通过configuration属性。 我们还指出,要委托的实际反序列化器是StringDeserializer.spring-doc.cadn.net.cn

请记住,上述 dlq 属性都与此配方中的讨论无关。 它们纯粹用于解决任何应用程序级别的错误。spring-doc.cadn.net.cn

Kafka Binder 中的基本偏移管理

问题陈述

我想编写一个 Spring Cloud Stream Kafka 使用者应用程序,但不确定它如何管理 Kafka 使用者偏移量。 您能解释一下吗?spring-doc.cadn.net.cn

溶液

我们鼓励您阅读有关此内容的文档部分以全面了解它。spring-doc.cadn.net.cn

要点如下:spring-doc.cadn.net.cn

默认情况下,Kafka 支持两种类型的偏移量 -earliestlatest. 他们的语义从他们的名字中就不言自明。spring-doc.cadn.net.cn

假设您是第一次运行使用者。 如果您错过了 Spring Cloud Stream 应用程序中的 group.id,则它将成为匿名使用者。 只要您有匿名使用者,在这种情况下, Spring Cloud Stream 应用程序默认将从latestTopic 分区中的 available offset 中。 另一方面,如果你显式指定了 group.id,那么默认情况下,Spring Cloud Stream 应用程序将从earliestTopic 分区中的 available offset 中。spring-doc.cadn.net.cn

在上述两种情况下(具有显式组和匿名组的使用者),可以使用属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset并将其设置为earliestlatest.spring-doc.cadn.net.cn

现在,假设您之前已经运行了使用者,现在再次启动它。 在这种情况下,上述情况下的起始偏移量语义不适用,因为使用者会为使用者组找到已提交的偏移量(在匿名使用者的情况下,尽管应用程序不提供 group.id,但 Binder 会自动为您生成一个偏移量)。 它只是从最后一个提交的 offset 开始拾取。 这是真的,即使您有startOffsetvalue 提供。spring-doc.cadn.net.cn

但是,您可以使用resetOffsets财产。 为此,请将属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsetstrue(即false默认情况下)。 然后,请确保提供startOffset值(或者earliestlatest). 当您执行此作,然后启动使用者应用程序时,每次启动时,它都会像第一次启动一样启动,并忽略分区的任何已提交偏移量。spring-doc.cadn.net.cn

在 Kafka 中寻找任意偏移量

问题陈述

使用 Kafka 活页夹,我知道它可以将偏移量设置为earliestlatest,但我需要寻找中间某个东西的偏移量,即任意偏移量。 有没有办法使用 Spring Cloud Stream Kafka Binder 来实现这一点?spring-doc.cadn.net.cn

溶液

之前,我们了解了 Kafka Binder 如何让您处理基本的偏移管理。 默认情况下,Binder 不允许您倒回到任意偏移量,至少通过我们在该配方中看到的机制。 但是,Binder 提供了一些低级策略来实现此用例。 让我们来探索它们。spring-doc.cadn.net.cn

首先,当您想重置为earliestlatest,请确保将resetOffsetsconfiguration 设置为其默认值,即false. 然后,您必须提供KafkaBindingRebalanceListener,它将被注入到所有使用者绑定中。 这是一个带有一些默认方法的接口,但这是我们感兴趣的方法:spring-doc.cadn.net.cn

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们看看细节。spring-doc.cadn.net.cn

本质上,每次在 topic partition 的初始分配期间或重新平衡之后,都会调用此方法。 为了更好地说明,让我们假设我们的主题是foo它有 4 个分区。 最初,我们只启动组中的单个使用者,该使用者将从所有分区中消费。 当使用者首次启动时,所有 4 个分区都会被初始分配。 但是,我们不想启动分区以默认 (earliest因为我们定义了一个组),而不是对于每个分区,我们希望它们在寻找任意偏移量后消费。 假设您有一个业务案例要从某些偏移量中使用,如下所示。spring-doc.cadn.net.cn

Partition   start offset

0           1000
1           2000
2           2000
3           1000

这可以通过实现上述方法来实现,如下所示。spring-doc.cadn.net.cn

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

这只是一个基本的实现。 现实世界的用例比这复杂得多,您需要进行相应的调整,但这肯定会为您提供一个基本的草图。 当消费者seek失败时,它可能会引发一些运行时异常,您需要决定在这些情况下该怎么做。spring-doc.cadn.net.cn

[[如果我们用相同的组 ID 开始第二个消费者怎么办? === 如果我们用相同的 group id 启动第二个 Consumer 怎么办?spring-doc.cadn.net.cn

当我们添加第二个消费者时,将发生再平衡,并且一些分区将被移动。 假设新的使用者获取了分区23. 当这个新的 Spring Cloud Stream 使用者调用这个onPartitionsAssigned方法,它会看到这是 partition 的初始赋值23在这个消费者上。 因此,它将执行 seek作,因为对initial论点。 对于第一个使用者,它现在只有分区01然而,对于这个消费者来说,这只是一个再平衡事件,不被视为初始分配。 因此,它不会重新查找给定的偏移量,因为对initial论点。spring-doc.cadn.net.cn

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka 活页夹手动确认?spring-doc.cadn.net.cn

问题陈述

使用 Kafka Binder,我想手动确认使用者中的消息。 我该怎么做?spring-doc.cadn.net.cn

溶液

默认情况下,Kafka Binder 委托给 Spring for Apache Kafka 项目中的默认提交设置。 默认的ackMode在 Spring Kafka 中是batch. 有关此的更多详细信息,请参阅此处spring-doc.cadn.net.cn

在某些情况下,您希望禁用此默认提交行为并依赖手动提交。 以下步骤允许您执行此作。spring-doc.cadn.net.cn

设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode更改为MANUALMANUAL_IMMEDIATE. 当它像这样设置时,将有一个名为kafka_acknowledgment(来自KafkaHeaders.ACKNOWLEDGMENT) 出现在 consumer 方法接收的消息中。spring-doc.cadn.net.cn

例如,将此视为您的使用者方法。spring-doc.cadn.net.cn

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后,您将属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackModeMANUALMANUAL_IMMEDIATE.spring-doc.cadn.net.cn

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何覆盖 Spring Cloud Stream 中的默认绑定名称?spring-doc.cadn.net.cn

问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但是如何将它们覆盖为更多域名友好名称呢?spring-doc.cadn.net.cn

溶液

假设 following 是您的函数签名。spring-doc.cadn.net.cn

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream 将创建如下绑定。spring-doc.cadn.net.cn

  1. 大写输入 0spring-doc.cadn.net.cn

  2. 大写输出 0spring-doc.cadn.net.cn

您可以使用以下属性将这些绑定覆盖到某个内容。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

在此之后,必须对新名称my-transformer-inmy-transformer-out.spring-doc.cadn.net.cn

这是另一个使用 Kafka Streams 和多个输入的示例。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 将为此功能创建三个不同的绑定名称。spring-doc.cadn.net.cn

  1. processOrder-in-0spring-doc.cadn.net.cn

  2. processOrder-in-1 (流程顺序合 1)spring-doc.cadn.net.cn

  3. processOrder-out-0spring-doc.cadn.net.cn

每次要为这些绑定设置一些配置时,都必须使用这些绑定名称。 您不喜欢这样,并且希望使用对域更友好且可读性更强的绑定名称,例如,类似于 .spring-doc.cadn.net.cn

您只需设置这三个属性即可轻松实现spring-doc.cadn.net.cn

  1. spring.cloud.stream.function.bindings.processOrder-in-0=订单spring-doc.cadn.net.cn

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accountsspring-doc.cadn.net.cn

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrdersspring-doc.cadn.net.cn

执行此作后,它将覆盖默认绑定名称,并且要对其设置的任何属性都必须位于这些新绑定名称上。spring-doc.cadn.net.cn

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何将消息密钥作为记录的一部分发送?spring-doc.cadn.net.cn

问题陈述

我需要发送一个密钥以及记录的有效负载,有没有办法在 Spring Cloud Stream 中做到这一点?spring-doc.cadn.net.cn

溶液

通常需要将关联数据结构(如 map)作为具有键和值的记录发送。 Spring Cloud Stream 允许您以简单的方式执行此作。 以下是执行此作的基本蓝图,但您可能希望将其调整为适用于您的特定用例。spring-doc.cadn.net.cn

这是示例生产者方法(又名Supplier).spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个简单的函数,它发送一条消息,其中包含Stringpayload 的 intent 和密钥。 请注意,我们使用 key 将 key 设置为消息标头KafkaHeaders.MESSAGE_KEY.spring-doc.cadn.net.cn

如果要将密钥从默认值更改为kafka_messageKey,那么在配置中,我们需要指定这个属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称supplier-out-0由于这是我们的函数名称,请相应地更新。spring-doc.cadn.net.cn

然后,我们在生成消息时使用此新密钥。spring-doc.cadn.net.cn

[[我如何使用-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?] == 如何使用本机序列化器和反序列化器而不是由 Spring Cloud Stream 完成的消息转换?spring-doc.cadn.net.cn

问题陈述

我想在 Kafka 中使用本机 Serializer 和 Deserializer,而不是在 Spring Cloud Stream 中使用消息转换器。 默认情况下,Spring Cloud Stream 使用其内部内置的消息转换器来处理此转换。 我怎样才能绕过这种情况并将责任委托给 Kafka?spring-doc.cadn.net.cn

溶液

这真的很容易做到。spring-doc.cadn.net.cn

您所要做的就是提供以下属性以启用本机序列化。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,您还需要设置序列化程序。 有几种方法可以做到这一点。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或使用 Binder 配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

使用 Binder 方式时,它应用于所有绑定,而在绑定处设置它们则是针对每个绑定的。spring-doc.cadn.net.cn

在反序列化方面,你只需要提供反序列化器作为配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您还可以在 Binder 级别设置它们。spring-doc.cadn.net.cn

有一个可选属性,您可以将其设置为强制本机解码。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,对于 Kafka Binders,这是不必要的,因为当它到达 Binder 时, Kafka 已经使用配置的反序列化器对它们进行了反序列化。spring-doc.cadn.net.cn

说明 Kafka Streams Binder 中的偏移重置工作原理

问题陈述

默认情况下,Kafka Streams Binder 始终从新使用者的最早偏移量开始。 有时,应用程序从最新的偏移量开始是有益的或需要的。 Kafka Streams Binder 允许您执行此作。spring-doc.cadn.net.cn

溶液

在查看解决方案之前,让我们看看以下场景。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个BiConsumer需要两个 input binding的 bean。 在这种情况下,第一个绑定是KStream第二个是KTable. 首次运行此应用程序时,默认情况下,两个绑定都从earliest抵消。 我想从latest由于某些要求而抵消? 您可以通过启用以下属性来执行此作。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果只希望一个绑定从latestoffset 和另一个 consumer 从默认值earliest,然后从配置中保留后者绑定。spring-doc.cadn.net.cn

请记住,一旦有提交的偏移量,这些设置就不会被遵循,而提交的偏移量优先。spring-doc.cadn.net.cn

跟踪 Kafka 中的记录成功发送(生产)

问题陈述

我有一个 Kafka 生产者应用程序,我想跟踪我所有成功的发送。spring-doc.cadn.net.cn

溶液

我们假设应用程序中有以下供应商。spring-doc.cadn.net.cn

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的MessageChannelbean 捕获所有成功的发送信息。spring-doc.cadn.net.cn

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用程序配置中定义此属性,以提供recordMetadataChannel.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,发送成功的信息将发送到fooRecordChannel.spring-doc.cadn.net.cn

您可以编写IntegrationFlow如下查看信息。spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle方法,有效负载是发送到 Kafka 的内容,消息标头包含一个名为kafka_recordMetadata. 它的值是RecordMetadata其中包含有关 topic partition、current offset 等的信息。spring-doc.cadn.net.cn

在 Kafka 中添加自定义头映射器

问题陈述

我有一个 Kafka 生产者应用程序,它设置了一些标头,但使用者应用程序中缺少它们。为什么?spring-doc.cadn.net.cn

溶液

在正常情况下,这应该没问题。spring-doc.cadn.net.cn

想象一下,您有以下生产者。spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者端,您应该仍然会看到标头 “foo”,以下内容应该不会给您带来任何问题。spring-doc.cadn.net.cn

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在应用程序中提供自定义标头映射器,则此作将不起作用。 假设您有一个空的KafkaHeaderMapper在应用程序中。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果这是您的实现,那么您将错过foo标头。 很有可能,您可能有一些逻辑KafkaHeaderMapper方法。 您需要以下内容来填充foo页眉。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这将正确填充foo标头。spring-doc.cadn.net.cn

关于 id 标头的特别说明

在 Spring Cloud Stream 中,idheader 是一个特殊的 header,但某些应用程序可能希望具有特殊的自定义 ID headers - 类似于custom-idIDId. 第一个 (custom-id) 将在没有任何自定义标头映射器的情况下从 producer 传播到 consumer。 但是,如果您在使用保留的框架的变体进行生产时idheader - 例如ID,Id,iD等等,那么您会遇到框架内部的问题。 有关此用例的更多上下文,请参阅此 StackOverflow 线程。 在这种情况下,您必须使用自定义KafkaHeaderMapper以映射区分大小写的 ID 标头。 例如,假设您有以下生产者。spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

标头Id上面将从消费端消失,因为它与框架冲突id页眉。 您可以提供自定义KafkaHeaderMapper来解决这个问题。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

通过这样做,两者idId标头将从 Producer 端提供给 Consumer 端。spring-doc.cadn.net.cn

在事务中向多个主题生产

问题陈述

如何向多个 Kafka 主题生成事务性消息?spring-doc.cadn.net.cn

有关更多上下文,请参阅此 StackOverflow 问题spring-doc.cadn.net.cn

溶液

在 Kafka Binder 中为事务使用事务支持,然后提供AfterRollbackProcessor. 为了生成到多个主题,请使用StreamBridge应用程序接口。spring-doc.cadn.net.cn

以下是此代码片段:spring-doc.cadn.net.cn

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了进行测试,您可以使用以下内容:spring-doc.cadn.net.cn

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要的说明:spring-doc.cadn.net.cn

请确保您在应用程序配置上没有任何 DLQ 设置,因为我们手动配置 DLT(默认情况下,它将发布到名为input.DLT基于初始 consumer 函数)。 此外,将maxAttempts在使用者绑定到1以避免 Binder 重试。 在上面的示例中,最多尝试总共 3 次(初始尝试 + 在FixedBackoff).spring-doc.cadn.net.cn

有关如何测试此代码的更多详细信息,请参阅 StackOverflow 线程。 如果你使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将isolation-level在 consumer 绑定到read-committed.spring-doc.cadn.net.cn

StackOverflow 线程也与此讨论相关。spring-doc.cadn.net.cn

运行多个可轮询使用者时要避免的陷阱

问题陈述

如何运行可轮询使用者的多个实例并生成唯一的client.id对于每个实例?spring-doc.cadn.net.cn

溶液

假设我有以下定义:spring-doc.cadn.net.cn

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

在运行应用程序时,Kafka 使用者会生成一个 client.id(类似于consumer-my-group-1). 对于正在运行的应用程序的每个实例,此client.id将相同,从而导致意外问题。spring-doc.cadn.net.cn

为了解决这个问题,您可以在应用程序的每个实例上添加以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有关更多详细信息,请参阅此 GitHub 问题spring-doc.cadn.net.cn