此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
适用于 Apache Pulsar 的 Spring Cloud Stream Binder
Spring for Apache Pulsar 为 Spring Cloud Stream 提供了一个 Binder,我们可以使用它来使用 pub-sub 范式构建事件驱动的微服务。 在本节中,我们将介绍此活页夹的基本细节。
用法
我们需要在你的应用程序上包含以下依赖项,以使用 Apache Pulsar Binder for Spring Cloud Stream。
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}
概述
适用于 Apache Pulsar 的 Spring Cloud Stream Binder 允许应用程序专注于业务逻辑,而不是处理管理和维护 Pulsar 的较低级别细节。 Binder 为应用程序开发人员处理所有这些细节。 Spring Cloud Stream 带来了基于 Spring Cloud Function 的强大编程模型,允许应用程序开发人员使用函数式样式编写复杂的事件驱动应用程序。 应用程序可以从中间件中立的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目的地。 Spring Cloud Stream 构建在 Spring Boot 之上,当使用 Spring Cloud Stream 编写事件驱动型微服务时,您实际上是在编写 Boot 应用程序。 这是一个简单的 Spring Cloud Stream 应用程序。
@SpringBootApplication
public class SpringPulsarBinderSampleApp {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}
@Bean
public Supplier<Time> timeSupplier() {
return () -> new Time(String.valueOf(System.currentTimeMillis()));
}
@Bean
public Function<Time, EnhancedTime> timeProcessor() {
return (time) -> {
EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
return enhancedTime;
};
}
@Bean
public Consumer<EnhancedTime> timeLogger() {
return (time) -> this.logger.info("SINK: {}", time);
}
record Time(String time) {
}
record EnhancedTime(Time time, String extra) {
}
}
上面的示例应用程序是一个成熟的 Spring Boot 应用程序,值得做一些解释。但是,在第一次传递时,您可以看到这只是普通的 Java 和一些 Spring 和 Spring Boot 注释。
我们这里有三种方法 - a 、 a ,最后是 a 。
供应商以毫秒为单位生成当前时间,该函数采用此时间,然后通过添加一些随机数据来增强它,然后使用者记录增强的时间。Bean
java.util.function.Supplier
java.util.function.Function
java.util.function.Consumer
为简洁起见,我们省略了所有导入,但整个应用程序中没有特定于 Spring Cloud Stream 的内容。 它如何成为与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序? 您必须在应用程序中包含 Binder 的上述依赖项。 添加该依赖项后,您必须提供以下配置属性。
spring:
cloud:
function:
definition: timeSupplier;timeProcessor;timeLogger;
stream:
bindings:
timeProcessor-in-0:
destination: timeSupplier-out-0
timeProcessor-out-0:
destination: timeProcessor-out-0
timeLogger-in-0:
destination: timeProcessor-out-0
至此,上述 Spring Boot 应用就变成了基于 Spring Cloud Stream 的端到端事件驱动应用。
因为我们在 classpath 上有 Pulsar binder,所以应用程序与 Apache Pulsar 交互。
如果应用程序中只有一个函数,那么我们不需要告诉 Spring Cloud Stream 激活该函数来执行,因为它默认这样做。
如果应用程序中有多个这样的函数,就像我们的例子一样,我们需要指示 Spring Cloud Stream 我们想要激活哪些函数。
在我们的例子中,我们需要激活所有这些 API,我们通过 property 来实现。
默认情况下,bean 名称成为 Spring Cloud Stream 绑定名称的一部分。
绑定是 Spring Cloud Stream 中的一个基本抽象概念,框架使用它与中间件目标进行通信。
Spring Cloud Stream 所做的几乎所有事情都发生在具体的绑定上。
供应商只有一个 output 绑定;函数具有 input 和 output bindings,而使用者只有 input binding。
让我们以我们的供应商 Bean 为例 - 此供应商的默认绑定名称将是 。
同样,函数的默认绑定名称将位于 inbound 和 outbound。
有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。
在大多数情况下,使用默认绑定名称就足够了。
我们在绑定名称上设置目标,如上所示。
如果未提供目标,则绑定名称将成为目标的值,就像 .spring.cloud.function.definition
timeSupplier.
timeSupplier-out-0
timeProcessor
timeProcessor-in-0
timeProcessor-out-0
timeSupplier-out-0
在运行上述应用程序时,您应该看到 supplier 每秒执行一次,然后被函数消耗,并增加了 Logger 消费者消耗的时间。
基于 Binder 的应用程序中的消息转换
在上面的示例应用程序中,我们没有提供用于消息转换的架构信息。
这是因为,默认情况下, Spring Cloud Stream 使用其消息转换机制,该机制通过 Spring Messaging 项目在 Spring Framework 中建立的消息支持。
除非指定,否则 Spring Cloud Stream 在入站和出站绑定上都用作消息转换。
在出站时,数据被序列化为 ,然后 Pulsar binder 通过网络将其发送到 Pulsar 主题。
同样,在入站时,数据从 Pulsar 主题中被消费,然后使用适当的消息转换器转换为目标类型。application/json
content-type
byte[],
Schema.BYTES
byte[]
使用 Pulsar Schema 在 Pulsar 中使用原生转换
尽管默认使用框架提供的消息转换,但 Spring Cloud Stream 允许每个 Binders 确定应如何转换消息。 假设应用程序选择走这条路。在这种情况下, Spring Cloud Stream 避免使用任何 Spring 提供的消息转换工具,而是传递它接收或生成的数据。 Spring Cloud Stream 中的此功能在生产者端称为本机编码,在使用者端称为本机解码。这意味着编码和解码在本地目标中间件上进行,在我们的例子中,在 Apache Pulsar 上。 对于上述应用,我们可以使用以下配置来绕过框架转换,使用原生的编码和解码。
spring:
cloud:
stream:
bindings:
timeSupplier-out-0:
producer:
use-native-encoding: true
timeProcessor-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
timeProcessor-out-0:
destination: timeProcessor-out-0
producer:
use-native-encoding: true
timeLogger-in-0:
destination: timeProcessor-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeSupplier-out-0:
producer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-in-0:
consumer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-out-0:
producer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
timeLogger-in-0:
consumer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
在生产者端启用本机编码的属性是核心 Spring Cloud Stream 的绑定级别属性。
你在 producer 绑定上设置 - 并将其设置为 同样,使用 - for consumer bindings,并将其设置为 如果我们决定使用原生编码和解码,在 Pulsar 的情况下,我们需要设置相应的 schema 和底层消息类型信息。
此信息作为扩展绑定属性提供。
正如您在上面的配置中看到的那样,属性为 - 用于架构信息和实际目标类型。
如果消息上同时具有键和值,则可以使用 和 来指定其目标类型。spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding
true.
spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding
true.
spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type
spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type
message-key-type
message-value-type
省略该属性时,将查询任何已配置的自定义架构映射。schema-type |
消息标头转换
每条消息通常都有标头信息,当消息通过 Spring Cloud Stream 输入和输出绑定在 Pulsar 和 Spring Messaging 之间遍历时,需要携带这些信息。 为了支持此遍历,框架会处理必要的消息标头转换。
自定义标头映射器
Pulsar Binder 配置了一个默认的 header mapper,可以通过提供你自己的 bean 来覆盖它。PulsarHeaderMapper
在以下示例中,配置了 JSON 标头映射器:
-
映射所有入站标头(键为 “top” 或 “secret” 的标头除外)
-
映射出站标头(键为 “id”、“timestamp” 或 “userId” 的标头除外)
-
仅信任 “com.acme” 包中的对象以进行出站反序列化
-
使用简单编码删除/序列化任何 “com.acme.Money” 标头值
toString()
@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
return JsonPulsarHeaderMapper.builder()
.inboundPatterns("!top", "!secret", "*")
.outboundPatterns("!id", "!timestamp", "!userId", "*")
.trustedPackages("com.acme")
.toStringClasses("com.acme.Money")
.build();
}
在 Binder 中使用 Pulsar 属性
Binder 使用 Spring for Apache Pulsar 框架的基本组件来构建其生产者和消费者绑定。
由于基于 Binder 的应用程序是 Spring Boot 应用程序,因此默认情况下,Binder 使用 Spring for Apache Pulsar 的 Spring Boot 自动配置。
因此,核心框架级别提供的所有 Pulsar Spring Boot 属性也可以通过 binder 获得。
例如,您可以使用带有前缀 的属性 等。
此外,您还可以在 binder 级别设置这些 Pulsar 属性。
例如,这也将起作用 - 或 .spring.pulsar.producer…
spring.pulsar.consumer…
spring.cloud.stream.pulsar.binder.producer…
spring.cloud.stream.pulsar.binder.consumer…
以上任何一种方法都可以,但是当使用此类属性时,它会应用于整个应用程序。
如果应用程序中有多个函数,则它们都具有相同的属性。
你也可以在扩展绑定属性级别设置这些 Pulsar 属性来解决这个问题。
扩展绑定属性应用于绑定本身。
例如,如果你有一个 input 和 output 绑定,并且都需要一组单独的 Pulsar 属性,则必须在扩展绑定上设置它们。
生产者绑定的模式是 .
同样,对于使用者绑定,模式为 .
这样,你可以将一组单独的 Pulsar 属性应用于同一应用程序中的不同绑定。spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…
spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…
最高优先级是扩展绑定属性。
在 Binder 中应用属性的优先顺序为 (从最高到最低)。extended binding properties → binder properties → Spring Boot properties.
Pulsar Binder 属性的资源
以下是一些资源,可用于查找有关 Pulsar Binder 中可用属性的更多信息。
Pulsar producer 绑定配置。
这些属性需要 prefix。
Spring Boot 提供的所有 Pulsar 生产者属性也可以通过这个配置类获得。spring.cloud.stream.bindings.<binding-name>.producer
Pulsar consumer binding 配置。
这些属性需要 prefix。
Spring Boot 提供的所有 Pulsar 消费者属性也可以通过这个配置类获得。spring.cloud.stream.bindings.<binding-name>.consumer
有关常见的 Pulsar binder 特定配置属性,请参阅此处。这些属性需要前缀 。
上面指定的 producer 和 consumer 属性(包括 Spring Boot 属性)可以在 binder 中使用 or 前缀。spring.cloud.stream.pulsar.binder
spring.cloud.stream.pulsar.binder.producer
spring.cloud.stream.pulsar.binder.consumer
Pulsar 主题配置器
适用于 Apache Pulsar 的 Spring Cloud Stream Binder 附带一个开箱即用的 Pulsar 主题配置程序。
在运行应用程序时,如果没有必要的 Topic,Pulsar 会为你创建 Topic。
但是,这是一个基本的非分区主题,如果您想要创建分区主题等高级功能,则可以依赖 Binder 中的主题预置程序。
Pulsar 主题配置者从框架中使用,框架使用因此,除非你在默认服务器和端口上运行 Pulsar,否则你需要设置该属性。PulsarAdministration
PulsarAdminBuilder.
spring.pulsar.administration.service-url
在创建主题时指定分区计数
创建主题时,您可以通过两种方式设置分区计数。
首先,您可以使用 property 在 Binder 级别设置它 。
正如我们在上面看到的,这样做会使应用程序创建的所有主题都继承这个属性。
假设您希望在绑定级别进行精细控制以设置分区。
在这种情况下,您可以使用 format .
这样,由同一应用程序中的不同函数创建的各种主题将根据应用程序要求具有不同的分区。spring.cloud.stream.pulsar.binder.partition-count
partition-count
spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count