此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.1.4spring-doc.cadn.net.cn

Spring 集成交互

Spring Integration Framework 扩展了 Spring 编程模型以支持众所周知的企业集成模式。 它支持在基于 Spring 的应用程序内进行轻量级消息传递,并支持通过声明式适配器与外部系统集成。 它还提供了一个高级 DSL,用于将各种作(端点)组合到一个逻辑集成流中。 使用这种 DSL 配置的 lambda 样式, Spring 集成已经具有良好的java.util.functioninterfaces 采用。 这@MessagingGateway代理接口也可以作为FunctionConsumer,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。 请参阅 Spring Integration ReferenceManual 中有关其对函数的支持的更多信息。spring-doc.cadn.net.cn

另一方面,从 version4.0.3Spring Cloud Function 引入了一个spring-cloud-function-integration模块,该模块提供更深入、更特定于云和基于自动配置的 API,用于与FunctionCatalog从 Spring 集成 DSL 的角度来看。 这FunctionFlowBuilder自动配置并自动装配FunctionCatalog,表示 target 的特定于函数的 DSL 的入口点IntegrationFlow实例。 除了标准IntegrationFlow.from()factories(为方便起见)、FunctionFlowBuilder暴露一个fromSupplier(String supplierDefinition)factory 查找目标Supplier在提供的FunctionCatalog. 然后这个FunctionFlowBuilder导致FunctionFlowDefinition. 这FunctionFlowDefinitionIntegrationFlowExtension和 exposesapply(String functionDefinition)accept(String consumerDefinition)要查找的运算符FunctionConsumerFunctionCatalog分别。 有关更多信息,请参阅他们的 Javadocs。spring-doc.cadn.net.cn

以下示例演示了FunctionFlowBuilder与其他IntegrationFlow应用程序接口:spring-doc.cadn.net.cn

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由于FunctionCatalog.lookup()功能不仅限于简单的函数名称,函数组合功能也可以用于上述apply()accept()运营商:spring-doc.cadn.net.cn

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

当我们在 Spring Cloud 应用程序中添加预定义函数的自动配置依赖项时,此 API 变得更加相关。 例如,Stream Applications 项目除了应用程序映像之外,还为各种集成用例提供了具有函数的工件,例如debezium-supplier,elasticsearch-consumer,aggregator-function等。spring-doc.cadn.net.cn

以下配置基于http-supplier,spel-functionfile-consumer分别:spring-doc.cadn.net.cn

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

否则,我们需要的只是将它们的配置添加到application.properties(如有必要):spring-doc.cadn.net.cn

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt