此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.1.4! |
Spring 集成交互
Spring Integration Framework 扩展了 Spring 编程模型以支持众所周知的企业集成模式。
它支持在基于 Spring 的应用程序内进行轻量级消息传递,并支持通过声明式适配器与外部系统集成。
它还提供了一个高级 DSL,用于将各种作(端点)组合到一个逻辑集成流中。
使用这种 DSL 配置的 lambda 样式, Spring 集成已经具有良好的java.util.function
interfaces 采用。
这@MessagingGateway
代理接口也可以作为Function
或Consumer
,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。
请参阅 Spring Integration ReferenceManual 中有关其对函数的支持的更多信息。
另一方面,从 version4.0.3
Spring Cloud Function 引入了一个spring-cloud-function-integration
模块,该模块提供更深入、更特定于云和基于自动配置的 API,用于与FunctionCatalog
从 Spring 集成 DSL 的角度来看。
这FunctionFlowBuilder
自动配置并自动装配FunctionCatalog
,表示 target 的特定于函数的 DSL 的入口点IntegrationFlow
实例。
除了标准IntegrationFlow.from()
factories(为方便起见)、FunctionFlowBuilder
暴露一个fromSupplier(String supplierDefinition)
factory 查找目标Supplier
在提供的FunctionCatalog
.
然后这个FunctionFlowBuilder
导致FunctionFlowDefinition
.
这FunctionFlowDefinition
是IntegrationFlowExtension
和 exposesapply(String functionDefinition)
和accept(String consumerDefinition)
要查找的运算符Function
或Consumer
从FunctionCatalog
分别。
有关更多信息,请参阅他们的 Javadocs。
以下示例演示了FunctionFlowBuilder
与其他IntegrationFlow
应用程序接口:
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由于FunctionCatalog.lookup()
功能不仅限于简单的函数名称,函数组合功能也可以用于上述apply()
和accept()
运营商:
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
当我们在 Spring Cloud 应用程序中添加预定义函数的自动配置依赖项时,此 API 变得更加相关。
例如,Stream Applications 项目除了应用程序映像之外,还为各种集成用例提供了具有函数的工件,例如debezium-supplier
,elasticsearch-consumer
,aggregator-function
等。
以下配置基于http-supplier
,spel-function
和file-consumer
分别:
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
否则,我们需要的只是将它们的配置添加到application.properties
(如有必要):
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt