Spring for Apache Pulsar 为 Pulsar IO(连接器)和 Pulsar 函数提供了基本支持,允许用户定义由 、 和 组成的流处理管道。 和 由 Pulsar IO(连接器)建模,由 Pulsar 函数表示。sourcesprocessorssinkssourcessinksprocessors

因为连接器只是特殊功能,为了简单起见,我们将源、接收和功能统称为“Pulsar 功能”。
先决条件

熟悉 - 预计观众对 Pulsar IOPulsar 功能有些熟悉。 如果不是这种情况,查看他们的入门指南可能会有所帮助。

功能已启用 - 要使用这些功能,必须启用并配置 Apache Pulsar 中的功能支持(默认情况下处于禁用状态)。 内置连接器可能还需要安装在 Pulsar 集群上。

有关详细信息,请参阅 Pulsar IOPulsar 函数文档。

因为连接器只是特殊功能,为了简单起见,我们将源、接收和功能统称为“Pulsar 功能”。

1. Pulsar 功能管理

该框架提供了管理 Pulsar 函数的组件。 当您使用 Pulsar Spring Boot 启动器时,您将获得自动配置。PulsarFunctionAdministrationPulsarFunctionAdministration

默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。 但是,由于它利用了已配置的 ,请参阅 Pulsar Admin Client 了解可用的客户端选项(包括身份验证)。 spring.pulsar.function.* 应用程序属性提供了其他配置选项。localhost:8080PulsarAdministration

2. 自动功能管理

在应用程序启动时,框架会在应用程序上下文中找到所有 、 和 bean。 对于每个 Bean,都会创建或更新相应的 Pulsar 函数。 根据函数类型、函数配置以及函数是否已存在来调用正确的 API。PulsarFunctionPulsarSinkPulsarSource

、 和 bean 分别是围绕 Apache Pulsar 配置对象 、 和 的简单包装器。 由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。 相反,用户需要提供完整的配置对象,然后框架使用提供的配置处理管理(创建/更新)。PulsarFunctionPulsarSinkPulsarSourceFunctionConfigSinkConfigSourceConfig

在应用程序关闭时,在应用程序启动期间处理的所有函数都会强制执行其停止策略,并且会单独保留、停止或从 Pulsar 服务器中删除。

、 和 bean 分别是围绕 Apache Pulsar 配置对象 、 和 的简单包装器。 由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。 相反,用户需要提供完整的配置对象,然后框架使用提供的配置处理管理(创建/更新)。PulsarFunctionPulsarSinkPulsarSourceFunctionConfigSinkConfigSourceConfig

3. 限制

3.1. 没有神奇的脉冲星功能

Pulsar 函数和自定义连接器由自定义应用程序代码(例如 a )表示。 没有魔术支持自动注册自定义代码。 虽然这将是惊人的,但它有一些技术挑战,尚未实施。 因此,用户需要确保函数(或自定义连接器)在函数配置中指定的位置可用。 例如,如果函数配置的值为 ,则函数 jar 文件必须存在于指定的路径中。java.util.Functionjar./some/path/MyFunction.jar

3.2. 名称标识符

函数配置中的属性用作标识符,以确定函数是否已存在,以便确定是否执行更新或创建操作。 因此,如果需要函数更新,则不应修改名称。name

4. 配置

4.1. Pulsar 函数存档

每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。 存档的路径是通过源和接收器的属性以及函数的属性指定的。archivejar

以下规则确定路径的“类型”:

  • 当它开始时,路径是一个 URL w/(file|http|https|function|sink|source)://

  • 该路径在启动时是内置的,w/(指向提供的现成连接器之一)builtin://

  • 否则路径是本地的。

在创建/更新操作期间发生的操作取决于路径“type”,如下所示:

  • 当路径是 URL 时,服务器会下载内容

  • 当路径是内置的时,内容在服务器上已经可用

  • 当路径为本地路径时,内容将上传到服务器

4.2. 内置源和接收器

Apache Pulsar 提供了许多开箱即用的源和接收连接器,也就是内置连接器。要使用内置连接器,只需将 设置为(例如)。archivebuiltin://<connector-type>builtin://rabbit

5. 自定义功能

有关如何开发和打包自定义函数的详细信息,请参见 Pulsar 文档。 但是,在高层次上,要求如下:

  • 代码使用 Java8

  • 代码实现 或java.util.Functionorg.apache.pulsar.functions.api.Function

  • 包装成 uber jar

构建和打包函数后,有几种方法可以使其可用于函数注册。

5.1. file://

jar 文件可以上传到服务器,然后通过函数配置的属性进行引用file://jar

5.2. 本地

jar 文件可以保留在本地,然后通过函数配置属性中的本地路径进行引用。jar

5.3. http://

jar 文件可以通过 HTTP 服务器提供,然后通过函数配置的属性进行引用http(s)://jar

5.4. function://

jar 文件可以上传到 Pulsar 包管理器,然后通过函数配置的属性进行引用function://jar

6. 示例

以下是一些示例,展示了如何配置 Bean,从而自动创建后备 Pulsar 源连接器。PulsarSourcePulsarFunctionAdministration

使用内置 Rabbit 连接器的 PulsarSource
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个示例与上一个示例相同,只是它使用自动配置的 Spring Boot 来减轻配置负担。当然,这需要应用程序使用启用了 Rabbit 自动配置的 Spring Boot。RabbitProperties

使用内置 Rabbit 连接器和 Spring Boot RabbitProperties 的 PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序