因为连接器只是特殊功能,为了简单起见,我们将源、接收和功能统称为“Pulsar 功能”。 |
1. Pulsar 功能管理
该框架提供了管理 Pulsar 函数的组件。
当您使用 Pulsar Spring Boot 启动器时,您将获得自动配置。PulsarFunctionAdministration
PulsarFunctionAdministration
默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。
但是,由于它利用了已配置的 ,请参阅 Pulsar Admin Client 了解可用的客户端选项(包括身份验证)。
spring.pulsar.function.*
应用程序属性提供了其他配置选项。localhost:8080
PulsarAdministration
2. 自动功能管理
在应用程序启动时,框架会在应用程序上下文中找到所有 、 和 bean。
对于每个 Bean,都会创建或更新相应的 Pulsar 函数。
根据函数类型、函数配置以及函数是否已存在来调用正确的 API。PulsarFunction
PulsarSink
PulsarSource
、 和 bean 分别是围绕 Apache Pulsar 配置对象 、 和 的简单包装器。
由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。
相反,用户需要提供完整的配置对象,然后框架使用提供的配置处理管理(创建/更新)。PulsarFunction PulsarSink PulsarSource FunctionConfig SinkConfig SourceConfig |
在应用程序关闭时,在应用程序启动期间处理的所有函数都会强制执行其停止策略,并且会单独保留、停止或从 Pulsar 服务器中删除。
、 和 bean 分别是围绕 Apache Pulsar 配置对象 、 和 的简单包装器。
由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。
相反,用户需要提供完整的配置对象,然后框架使用提供的配置处理管理(创建/更新)。PulsarFunction PulsarSink PulsarSource FunctionConfig SinkConfig SourceConfig |
4. 配置
4.1. Pulsar 函数存档
每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。
存档的路径是通过源和接收器的属性以及函数的属性指定的。archive
jar
以下规则确定路径的“类型”:
-
当它开始时,路径是一个 URL w/
(file|http|https|function|sink|source)://
-
该路径在启动时是内置的,w/(指向提供的现成连接器之一)
builtin://
-
否则路径是本地的。
在创建/更新操作期间发生的操作取决于路径“type”,如下所示:
-
当路径是 URL 时,服务器会下载内容
-
当路径是内置的时,内容在服务器上已经可用
-
当路径为本地路径时,内容将上传到服务器
5. 自定义功能
有关如何开发和打包自定义函数的详细信息,请参见 Pulsar 文档。 但是,在高层次上,要求如下:
-
代码使用 Java8
-
代码实现 或
java.util.Function
org.apache.pulsar.functions.api.Function
-
包装成 uber jar
构建和打包函数后,有几种方法可以使其可用于函数注册。
6. 示例
以下是一些示例,展示了如何配置 Bean,从而自动创建后备 Pulsar 源连接器。PulsarSource
PulsarFunctionAdministration
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个示例与上一个示例相同,只是它使用自动配置的 Spring Boot 来减轻配置负担。当然,这需要应用程序使用启用了 Rabbit 自动配置的 Spring Boot。RabbitProperties
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序 |
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序 |