此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.1.4! |
编程模型
函数目录和灵活的函数签名
Spring Cloud Function 的主要功能之一是适应和支持用户定义的函数的一系列类型签名,
同时提供一致的执行模型。
这就是为什么所有用户定义的函数都被转换为 Canonical 表示的原因FunctionCatalog
.
虽然用户通常不必关心FunctionCatalog
总之,了解什么是
类型的函数。
了解 Spring Cloud Function 为反应式 API 提供一流的支持也很重要
由 Project Reactor 提供,允许响应式原语(如Mono
和Flux
用作用户定义函数中的类型,在为
您的函数实现。
反应式编程模型还支持对原本难以或不可能实现的功能提供支持
使用命令式编程风格。有关更多信息,请阅读 函数 Arity 部分。
Java 8 函数支持
Spring Cloud Function 包含并构建在 Java 定义的 3 个核心功能接口之上 从 Java 8 开始可供我们使用。
-
供应商<O>
-
函数<I, O>
-
消费者<I>
为避免不断提及Supplier
,Function
和Consumer
在适当的时候,我们将在本手册的其余部分引用它们 Functional bean。
简而言之,Application Context 中任何属于 Functional bean 的 bean 都将延迟注册到FunctionCatalog
.
这意味着它可以受益于本参考手册中描述的所有其他功能。
在最简单的应用程序中,您需要做的就是声明@Bean
的类型Supplier
,Function
或Consumer
在您的应用程序配置中。
然后,您可以访问FunctionCatalog
并根据其名称查找特定函数。
例如:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
. . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup(“uppercase”);
重要的是要了解这一点uppercase
是一个 bean,你当然可以从ApplicationContext
直接,但您得到的只是您声明的 bean,而没有 SCF 提供的任何额外功能。当您通过FunctionCatalog
,您将收到的实例使用本手册中描述的其他功能(即类型转换、组合等)进行包装(插桩)。此外,重要的是要了解典型用户不会直接使用 Spring Cloud Function。相反,典型用户实现 JavaFunction/Supplier/Consumer
其想法是在不同的执行上下文中使用它,而无需额外的工作。例如,相同的 java 函数可以通过提供的 Spring Cloud 函数表示为 REST 端点或流式消息处理程序或 AWS Lambda 等
适配器以及使用 Spring Cloud Function 作为核心编程模型的其他框架(例如 Spring Cloud Stream)。
因此,总而言之, Spring Cloud Function 使用可在各种执行上下文中使用的附加功能来检测 java 函数。
功能定义
虽然前面的示例向您展示了如何以编程方式在 FunctionCatalog 中查找函数,但在 Spring Cloud Function 被另一个框架(例如 Spring Cloud Stream)用作编程模型的典型集成案例中,您可以通过spring.cloud.function.definition
财产。知道在FunctionCatalog
.例如,如果您的ApplicationContext
这spring.cloud.function.definition
属性通常不需要,因为FunctionCatalog
可以通过空名称或任何名称进行查找。例如,假设uppercase
是目录中的唯一函数,则可以将其查找为catalog.lookup(null)
,catalog.lookup(“”)
,catalog.lookup(“foo”)
.
也就是说,对于您使用框架(例如 Spring Cloud Stream)的情况,它使用spring.cloud.function.definition
这是最佳实践,建议始终使用spring.cloud.function.definition
财产。
例如
spring.cloud.function.definition=uppercase
筛选不合格的函数
典型的 Application Context 可能包括 bean,这些 bean 是有效的 java 函数,但不是要注册的候选 beanFunctionCatalog
.
这样的 bean 可以是来自其他项目的自动配置,也可以是符合成为 Java 函数条件的任何其他 bean。
该框架提供了已知 bean 的默认过滤,这些 bean 不应该成为向函数 catalog 注册的候选 bean。
您还可以通过使用spring.cloud.function.ineligible-definitions
财产。
例如
spring.cloud.function.ineligible-definitions=foo,bar
供应商
供应商可以是被动的 - Supplier<Flux<T>>
或命令式 - Supplier<T>
.从调用的角度来看,这应该没有区别
向此类供应商的实施者披露。但是,在框架内使用
(例如,Spring Cloud Stream)、供应商,尤其是反应式的、
通常用于表示流的源,因此它们被调用一次以获取流(例如,Flux)
使用者可以订阅的。换句话说,这些供应商相当于无限流。
但是,相同的反应式 suppliers 也可以表示有限流(例如,轮询的 JDBC 数据上的结果集)。
在这些情况下,这种反应式供应商必须连接到底层框架的某种轮询机制。
为了帮助实现这一点, Spring Cloud Function 提供了一个 marker Commentsorg.springframework.cloud.function.context.PollableBean
来表示此类供应商生成
finite 流,可能需要再次轮询。也就是说,了解 Spring Cloud Function 本身是很重要的
不为此批注提供任何行为。
另外PollableBean
注解公开了一个 splittable 属性,以表示生成的流
需要拆分(参见 Splitter EIP)
示例如下:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
return () -> {
String v1 = String.valueOf(System.nanoTime());
String v2 = String.valueOf(System.nanoTime());
String v3 = String.valueOf(System.nanoTime());
return Flux.just(v1, v2, v3);
};
}
功能
Function 也可以以命令式或响应式方式编写,但与 Supplier 和 Consumer 不同的是,有 实现者无需特别考虑,只需了解在框架中使用时 比如 Spring Cloud Stream 等,响应式函数是 仅调用一次以将引用传递给流(即 Flux 或 Mono),并且每个事件调用一次 imperative。
public Function<String, String> uppercase() {
. . . .
}
双函数
如果您需要通过有效负载接收一些额外的数据(元数据),您始终可以将函数 signature 接收一个 Message,其中包含包含此类附加信息的 Headers 映射。
public Function<Message<String>, String> uppercase() {
. . . .
}
为了让你的函数签名更轻、更 POJO,还有另一种方法。您可以使用BiFunction
.
public BiFunction<String, Map, String> uppercase() {
. . . .
}
假设Message
仅包含两个属性(payload 和 headers)和BiFunction
需要两个输入参数,框架将自动识别此范例,并从Message
将其作为第一个参数传递,将 Headers 的 Map 作为第二个参数传递。
在这种情况下,你的函数也没有与 Spring 的消息传递 API 耦合。
请记住,BiFunction 需要严格的签名,其中第二个参数必须是 Map。
相同的规则适用于BiConsumer
.
功能组成
函数组合是一项功能,允许将多个函数组合成一个。 核心支持基于自 Java 8 以来提供的 Function.andThen(..) 支持中提供的函数组合功能。但是,最重要的是,我们提供了一些附加功能。
函数路由和筛选
从 2.2 版本开始,Spring Cloud Function 提供了路由功能,允许 you 调用单个函数,该函数充当您希望调用的实际函数的路由器。 此功能在某些维护配置的 FAAS 环境中非常有用 for several 函数可能很麻烦,或者无法公开多个函数。
这RoutingFunction
在 FunctionCatalog 中以名称functionRouter
.为了简单
和一致性,您也可以参考RoutingFunction.FUNCTION_NAME
不断。
此函数具有以下签名:
public class RoutingFunction implements Function<Object, Object> {
. . .
}
路由指令可以通过多种方式进行通信。我们支持通过 Message headers、System 属性以及可插拔策略。那么让我们看看一些细节
MessageRoutingCallback 回调
这MessageRoutingCallback
是一种帮助确定 route-to 函数定义名称的策略。
public interface MessageRoutingCallback {
default String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
}
您需要做的就是实现并将其注册为一个 bean,以便RoutingFunction
.
例如:
@Bean
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
};
}
在前面的示例中,您可以看到MessageRoutingCallback
它确定函数定义FunctionProperties.FUNCTION_DEFINITION
Message 标头,并返回String
表示要调用的函数的定义。
消息报头
如果 input 参数的类型为Message<?>
中,您可以通过设置以下spring.cloud.function.definition
或spring.cloud.function.routing-expression
消息标头。
正如物业名称所暗示的那样spring.cloud.function.routing-expression
依赖于 Spring 表达式语言 (SpEL)。
对于更多静态情况,您可以使用spring.cloud.function.definition
标头,它允许您提供
单个函数的名称(例如…definition=foo
) 或组合指令(例如…definition=foo|bar|baz
).
对于更动态的情况,您可以使用spring.cloud.function.routing-expression
标头并提供应解析的 SpEL 表达式
转换为函数的定义(如上所述)。
SPEL 评估上下文的根对象是
actual input 参数,因此在Message<?> 您可以构造具有访问权限的表达式
到两者payload 和headers (例如,spring.cloud.function.routing-expression=headers.function_name ). |
SPEL 允许用户提供要执行的 Java 代码的字符串表示。鉴于spring.cloud.function.routing-expression 可以通过 Message Headers 提供,这意味着设置此类表达式的能力可能会暴露给最终用户(例如,使用 Web 模块时的 HTTP Headers),这可能会导致一些问题(例如,恶意代码)。为了管理这一点,所有通过 Message headers 传入的表达式将仅根据SimpleEvaluationContext 它的功能有限,并且旨在仅评估上下文对象(在我们的例子中为 Message)。另一方面,通过属性或系统变量设置的所有表达式都根据StandardEvaluationContext ,这允许 Java 语言的完全灵活性。
虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户公开,但在某些情况下,可见性以及更新系统、应用程序和环境变量的能力确实通过某些 Spring 项目或第三方提供的 Spring Boot Actuator 端点或最终用户的自定义实现向最终用户公开。必须使用行业标准的 Web 安全实践来保护此类终端节点。
Spring Cloud Function 不会公开任何此类端点。 |
在特定的执行环境/模型中,适配器负责转换和通信spring.cloud.function.definition
和/或spring.cloud.function.routing-expression
通过 Message 标头。
例如,当使用 spring-cloud-function-web 时,你可以提供spring.cloud.function.definition
作为 HTTP
header 的 Header,框架会将其以及其他 HTTP 头作为 Message 头传播。
应用程序属性
路由指令也可以通过以下方式进行通信spring.cloud.function.definition
或spring.cloud.function.routing-expression
作为应用程序属性。在
上一节也适用于此处。唯一的区别是你以
应用程序属性(例如--spring.cloud.function.definition=foo
).
重要的是要了解,提供spring.cloud.function.definition 或spring.cloud.function.routing-expression 作为 Message 标头仅适用于命令式函数(例如Function<Foo, Bar> ).
也就是说,我们只能使用命令式函数路由每条消息。使用响应式函数,我们不能路由每条消息。因此,您只能将路由指令作为 Application Properties 提供。
这一切都与工作单元有关。在命令式功能中,工作单元是 Message,因此我们可以基于这样的工作单元进行路由。
使用 reactive function,work unit-of-work 是整个流,因此我们将只对 application 提供的指令进行作
属性并路由整个流。 |
路由指令的优先级顺序
鉴于我们有多种提供路由指令的机制,因此了解 在同时使用多个机制的情况下解决冲突,因此顺序如下:
-
MessageRoutingCallback
(如果 function 是 imperative 的,则无论是否定义了其他任何内容,都将接管) -
消息标头 (如果 function 是 imperative 且 no
MessageRoutingCallback
提供) -
应用程序属性(任何功能)
无法路由的消息
如果 route-to 函数在 catalog 中不可用,您将收到一个异常,指出该
在某些情况下,这种行为是不可取的,你可能希望有一些 “catch-all” 类型的函数来处理这样的消息。
为了实现这一点,框架提供了org.springframework.cloud.function.context.DefaultMessageRoutingHandler
策略。您需要做的就是将其注册为 bean。
它的默认实现将仅记录消息不可路由的事实,但将允许消息流在没有异常的情况下继续进行,从而有效地删除不可路由的消息。
如果你想要更复杂的东西,你需要做的就是提供你自己的这个策略的实现,并将其注册为 bean。
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
return new DefaultMessageRoutingHandler() {
@Override
public void accept(Message<?> message) {
// do something really cool
}
};
}
函数过滤
过滤是一种只有两条路径的路由类型 - “go” 或 “discard”。就功能而言,它的意思是 你只想在某个条件返回 'true' 时调用某个函数,否则你想要丢弃输入。 但是,当涉及到丢弃 input 时,对于它在应用程序上下文中的含义,有许多解释。 例如,您可能希望记录它,或者您可能希望维护丢弃消息的计数器。您可能也不想什么都不做。 由于这些路径不同,我们不提供如何处理丢弃消息的通用配置选项。 相反,我们只建议定义一个简单的 Consumer,它表示 'discard' 路径:
@Bean
public Consumer<?> devNull() {
// log, count or whatever
}
现在,您可以让实际上只有两条路径的路由表达式有效地成为过滤器。例如:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
每条不符合 'echo' 函数条件的消息都会进入 'devNull',在那里你什么都不能做。
签名Consumer<?>
还将确保不会尝试任何类型转换,从而几乎不会产生执行开销。
在处理响应式 Input(例如 Publisher)时,路由指令只能通过 Function 属性提供。这是 由于响应式函数的性质,这些函数只调用一次来传递 Publisher 和其余的 由 Reactor 处理,因此我们无法访问和/或依赖通过单个 值(例如 Message )。 |
多个路由器
默认情况下,框架将始终配置一个路由函数,如前面部分所述。但是,有时您可能需要多个路由功能。
在这种情况下,您可以创建自己的RoutingFunction
bean 中,只要您给它指定一个名称而不是functionRouter
.
您可以将spring.cloud.function.routing-expression
或spring.cloud.function.definition
到 RoutinFunction 作为 map 中的键/值对。
这是一个简单的例子
@Configuration protected static class MultipleRouterConfiguration { @Bean RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { Map<String, String> propertiesMap = new HashMap<>(); propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'"); return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback); } @Bean public Function<String, String> reverse() { return v -> new StringBuilder(v).reverse().toString(); } @Bean public Function<String, String> uppercase() { return String::toUpperCase; } }
以及演示其工作原理的测试
`
@Test public void testMultipleRouters() { System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'"); FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class); Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); assertThat(function).isNotNull(); Message<String> message = MessageBuilder.withPayload("hello").build(); assertThat(function.apply(message)).isEqualTo("HELLO"); function = functionCatalog.lookup("mySpecialRouter"); assertThat(function).isNotNull(); message = MessageBuilder.withPayload("hello").build(); assertThat(function.apply(message)).isEqualTo("olleh"); }
[[输入/输出丰富]] == 输入/输出丰富
很多时候,你需要修改或优化传入或传出的 Message,并保持你的代码没有非功能性问题。您不想在业务逻辑中执行此作。
您始终可以通过 Function Composition 来完成它。这种方法有几个好处:
-
它允许您将这个非功能性关注点隔离到一个单独的函数中,您可以将该函数与业务函数作为函数定义进行组合。
-
它为你提供了完全的自由(和危险),让你知道在传入消息到达实际业务功能之前你可以修改什么。
@Bean
public Function<Message<?>, Message<?>> enrich() {
return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
// do whatever
}
然后通过提供以下函数定义来编写您的函数enrich|myBusinessFunction
.
虽然所描述的方法是最灵活的,但它也是最复杂的,因为它需要您编写一些代码,使其成为 bean 或 手动将其注册为函数,然后才能将其与业务函数组合,如前面的示例所示。
但是,如果您尝试进行的修改 (扩充) 像前面的示例中一样微不足道,该怎么办?有没有一个更简单、更动态和可配置的 机制来实现相同的目标?
从版本 3.1.3 开始,该框架允许您提供 SPEL 表达式来丰富进入函数的 Importing 和 以及从中输出。让我们以其中一个测试为例。
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz").build());
assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
在这里,您可以看到一个名为input-header-mapping-expression
和output-header-mapping-expression
前面是函数的名称(即split
),后跟要设置的消息标头键的名称和值 SPEL 表达式。第一个表达式(对于 'keyOut1')是用单引号括起来的文字 SpEL 表达式,有效地将 'keyOut1' 设置为 valuehello1
.这keyOut2
设置为现有 'contentType' 标头的值。
你还可以在 input Headers 映射中观察到一些有趣的功能,我们实际上拆分了现有 Headers 'path' 的值,将 key1 和 key2 的单个值设置为基于索引的拆分元素的值。
如果由于某种原因提供的表达式计算失败,则函数的执行将继续进行,就像什么都没发生一样。 但是,您会在日志中看到 WARN 消息,通知您 |
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
如果您正在处理具有多个输入的函数(下一节),则可以在input-header-mapping-expression
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
函数 Arity
有时需要对数据流进行分类和组织。例如 考虑一个经典的大数据用例,即处理无组织的数据,比如说, 'orders' 和 'invoices',并且您希望每个都进入单独的数据存储。 这就是函数 arity(具有多个输入和输出的函数)支持的地方 来玩。
让我们看一个这样函数的示例(完整的实现细节在这里提供),
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用它的 Tuple 库。 元组通过向我们传达基数和类型信息,为我们提供了独特的优势。 在 SCSt 的背景下,两者都非常重要。Cardinality 让我们知道 需要创建多少个输入和输出绑定,并将其绑定到对应的 函数的输入和输出。了解类型信息可确保正确的类型 转换。
此外,这是绑定命名约定的 'index' 部分
names 开始发挥作用,因为在这个函数中,两个输出绑定
names 是organise-out-0
和organise-out-1
.
重要提示:目前,函数 arity 仅支持响应式函数
(Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>> ) 以复杂事件处理为中心
其中,对事件汇合的评估和计算通常需要查看
事件流,而不是单个事件。 |
Input Header 传播
在典型场景中,input Message 头不会传播到输出,这是理所当然的,因为函数的输出可能是其他事物的 input,需要它自己的 Message 头集。 但是,有时可能需要这种传播,因此 Spring Cloud Function 提供了几种机制来实现这一点。
首先,您始终可以手动复制 Headers。例如,如果你有一个 Function 的签名采用Message
并返回Message
(即Function<Message, Message>
),您可以简单而有选择地自己复制 headers。请记住,如果你的函数返回 Message,框架不会对它执行任何作,只会正确转换其有效负载。
但是,这种方法可能有点乏味,尤其是在您只想复制所有标头的情况下。
为了帮助处理此类情况,我们提供了一个简单的属性,该属性允许您在希望传播 input 标头的函数上设置布尔标志。
该属性为copy-input-headers
.
例如,假设您有以下配置:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
如你所知,你仍然可以通过向它发送 Message 来调用这个函数(框架将负责类型转换和有效负载提取)
只需设置spring.cloud.function.configuration.uppercase.copy-input-headers
自true
,以下断言也将为 true
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json"); Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build()); assertThat(result.getHeaders()).containsKey("foo");
类型转换(Content-Type 协商)
Content-Type 协商是 Spring Cloud Function 的核心功能之一,因为它不仅允许将传入数据转换为声明的类型 通过函数签名,但在函数组合期间执行相同的转换,使原本不可组合(按类型)的函数可组合。
为了更好地理解内容类型协商背后的机制和必要性,我们看了一个非常简单的用例,如下所示: 以以下函数为例:
@Bean
public Function<Person, String> personFunction {..}
前面示例中显示的函数需要一个Person
object 作为参数,并生成 String 类型作为输出。如果此类函数是
调用,并且Person
,然后一切正常。但通常函数扮演着传入数据的处理程序的角色,这些数据通常来自
在 Raw 格式(如byte[]
,JSON String
等。为了使框架成功地将传入数据作为参数传递给
这个函数,它必须以某种方式将传入的数据转换为Person
类型。
Spring Cloud Function 依赖于 Spring 的两种原生机制来实现这一点。
-
MessageConverter - 将传入的 Message 数据转换为函数声明的类型。
-
ConversionService - 将传入的非 Message 数据转换为函数声明的类型。
这意味着根据原始数据类型(Message 或非 Message),Spring Cloud Function 将应用一种或另一种机制。
在大多数情况下,当处理作为其他请求(例如 HTTP、消息等)的一部分调用的函数时,框架依赖于MessageConverters
,
因为这样的请求已经转换为 SpringMessage
.换句话说,框架会查找并应用适当的MessageConverter
.
为此,框架需要用户提供一些说明。这些指令之一已经由函数的签名提供
本身(Person 类型)。因此,从理论上讲,这应该(而且在某些情况下是)足够了。但是,对于大多数使用案例,为了
选择适当的MessageConverter
,框架需要一条额外的信息。缺失的部分是contentType
页眉。
这样的 Headers 通常作为 Message 的一部分,由首先创建此类 Message 的相应适配器注入。
例如,HTTP POST 请求会将其内容类型 HTTP 标头复制到contentType
标头。
对于不存在此类标头的情况,框架依赖于默认内容类型,如application/json
.
内容类型与参数类型
如前所述,对于框架选择适当的MessageConverter
,它需要参数类型和内容类型信息(可选)。
选择适当MessageConverter
驻留在参数 resolvers 中,该参数解析器在调用用户定义的
function(即框架知道实际参数类型时)。
如果参数类型与当前有效负载的类型不匹配,则框架会将
预配置MessageConverters
查看它们中的任何一个是否可以转换有效负载。
的组合contentType
参数类型是框架通过定位来确定消息是否可以转换为目标类型的机制
适当的MessageConverter
.
如果没有合适的MessageConverter
,则会引发异常,您可以通过添加自定义MessageConverter
(参见User-defined Message Converters
).
不要指望Message 仅根据contentType .
请记住,contentType 是对 target 类型的补充。
它是一个提示,它MessageConverter 可能会也可能不会考虑。 |
消息转换器
MessageConverters
定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
了解这些方法的 Contract 及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。
这fromMessage
方法将传入的Message
转换为参数类型。
的Message
可以是任何类型,并且是
直到实际实现MessageConverter
以支持多种类型。
提供的 MessageConverters
如前所述,该框架已经提供了MessageConverters
来处理最常见的使用案例。
以下列表描述了提供的MessageConverters
,按优先顺序(第一个MessageConverter
that works 被使用):
-
JsonMessageConverter
:支持转换Message
to/from POJO 的情况contentType
是application/json
使用 Jackson (DEFAULT) 或 Gson 库。此消息转换器还知道type
参数(例如,application/json;type=foo.bar.Person 的 Person)。这在开发函数时类型可能未知的情况下非常有用,因此函数签名可能如下所示Function<?, ?>
或Function
或Function<Object, Object>
.换句话说,对于类型转换,我们通常从函数签名中派生类型。具有 mime-type 参数允许您以更动态的方式传达类型。 -
ByteArrayMessageConverter
:支持转换Message
从byte[]
自byte[]
对于以下情况contentType
是application/octet-stream
.它本质上是一种传递,主要是为了向后兼容。 -
StringMessageConverter
:支持将任何类型的String
什么时候contentType
是text/plain
.
当找不到合适的转换器时,框架会引发异常。发生这种情况时,您应该检查您的代码和配置,并确保这样做了
不要错过任何内容(即,确保您提供了contentType
通过使用 Binding 或 Header)。
但是,您很可能发现了一些不常见的情况(例如自定义contentType
也许)和当前提供的MessageConverters
不知道如何转换。如果是这种情况,您可以添加自定义MessageConverter
.请参见用户定义的消息转换器。
用户定义的消息转换器
Spring Cloud Function 公开了一种机制来定义和注册额外的MessageConverters
.
要使用它,请实现org.springframework.messaging.converter.MessageConverter
,将其配置为@Bean
.
然后将其附加到现有的 'MessageConverter' 堆栈中。
了解这种习俗很重要MessageConverter implementations 被添加到现有堆栈的 head 中。
因此,自定义MessageConverter implementations 优先于现有 implementations,这使您可以覆盖和添加到现有 Converters。 |
以下示例说明如何创建消息转换器 Bean 以支持名为application/bar
:
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
有关 JSON 选项的说明
在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。
为了您的利益,已将其抽象为org.springframework.cloud.function.json.JsonMapper
它本身知道两种机制,并将使用选定的
由您或遵循默认规则。
默认规则如下:
-
无论 Classpath 上的哪个库都是要使用的机制。因此,如果你有
com.fasterxml.jackson.*
添加到 classpath 中,将使用 Jackson,如果你有com.google.code.gson
,则将使用 Gson。 -
如果您两者都有,那么 Gson 将是默认的,或者您可以设置
spring.cloud.function.preferred-json-mapper
属性具有以下两个值之一:gson
或jackson
.
也就是说,类型转换通常对开发人员是透明的,但考虑到org.springframework.cloud.function.json.JsonMapper
也注册为 Bean
如果需要,您可以轻松地将其注入到您的代码中。
Kotlin Lambda 支持
我们还提供对 Kotlin lambda 的支持(自 v2.0 起)。 请考虑以下事项:
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
以上表示配置为 Spring bean 的 Kotlin lambda。每个 cookie 的签名都映射到 Java 等效的Supplier
,Function
和Consumer
,因此框架支持/识别签名。
虽然 Kotlin 到 Java 映射的机制不在本文档的讨论范围之内,但请务必了解
此处也适用“Java 8 函数支持”部分中概述的签名转换规则。
要启用 Kotlin 支持,您只需在类路径上添加 Kotlin SDK 库,这将触发适当的 autoconfiguration 和支持类。
功能组件扫描
Spring Cloud Function 将扫描Function
,Consumer
和Supplier
在名为functions
如果存在。使用这个
功能,您可以编写不依赖于 Spring 的函数 - 甚至不依赖于 Spring@Component
需要 annotation。如果您想使用不同的
package 中,您可以设置spring.cloud.function.scan.packages
.您还可以使用spring.cloud.function.scan.enabled=false
以完全关闭扫描。
数据掩码
典型的应用程序带有多个级别的日志记录。某些云/无服务器平台可能会在记录的数据包中包含敏感数据,以供所有人查看。
虽然检查正在记录的数据是单个开发人员的责任,但日志记录来自框架本身,因此从 4.1 版本开始,我们引入了JsonMasker
最初帮助屏蔽 AWS Lambda 负载中的敏感数据。但是,JsonMasker
是通用的,可用于任何模块。目前,它仅适用于结构化数据,例如 JSON。您只需指定要屏蔽的键,其余的交给它。
应在文件中指定键META-INF/mask.keys
.文件格式非常简单,您可以用逗号或换行符或两者分隔多个键。
以下是此类文件内容的示例:
eventSourceARN asdf1, SS
在这里,您会看到定义了三个键 一旦存在这样的文件,JsonMasker 将使用它来屏蔽指定键的值。
下面是显示用法的示例代码
private final static JsonMasker masker = JsonMasker.INSTANCE(); . . . logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));