此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.2.1spring-doc.cadn.net.cn

编程模型

函数目录和灵活的函数签名

Spring Cloud Function 的主要功能之一是适应和支持用户定义函数的一系列类型签名,同时提供一致的执行模型。 这就是为什么所有用户定义的函数都被转换为 Canonical 表示形式的原因FunctionCatalog.spring-doc.cadn.net.cn

虽然用户通常不必关心FunctionCatalog总之,了解用户代码中支持哪些类型的函数是很有用的。spring-doc.cadn.net.cn

了解 Spring Cloud Function 为 Project Reactor 提供的反应式 API 提供一流的支持也很重要。 这允许响应式原语(如MonoFlux用作用户定义函数中的类型,从而在为函数实现选择编程模型时提供更大的灵活性。 反应式编程模型还支持对使用命令式编程风格难以或不可能实现的功能提供支持。 有关更多信息,请阅读 函数 Arity 部分。spring-doc.cadn.net.cn

Java 8 函数支持

Spring Cloud Function 包含并构建在 Java 自 Java 8 以来定义的 3 个核心功能接口之上。spring-doc.cadn.net.cn

为了不断避免提及Supplier,FunctionConsumer,在本手册的其余部分,我们将它们称为 Functional bean。spring-doc.cadn.net.cn

简而言之,你的ApplicationContext也就是说,一个 Functional bean 将被延迟注册到FunctionCatalog. 这意味着它可以受益于本参考手册中描述的所有其他功能。spring-doc.cadn.net.cn

在最简单的应用程序中,您需要做的就是声明一个@Bean的类型Supplier,FunctionConsumer在您的应用程序配置中。 然后,您可以使用FunctionCatalog根据其名称查找特定函数。spring-doc.cadn.net.cn

@Bean
public Function<String, String> uppercase() {
    return value -> value.toUpperCase();
}

// . . .

FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase =  catalog.lookup(“uppercase”);

重要的是要理解,鉴于uppercase是一个 bean,你当然可以从ApplicationContext直接,但您得到的只是您声明的 bean,而没有 SCF 提供的任何额外功能。 当您通过FunctionCatalog,则您收到的实例将包装(插桩)本手册中描述的其他功能(即类型转换、组合等)。spring-doc.cadn.net.cn

此外,重要的是要了解典型用户不会直接使用 Spring Cloud Function。 相反,典型用户实现 JavaFunction,SupplierConsumer其想法是在不同的执行上下文中使用它,而无需额外的工作。spring-doc.cadn.net.cn

例如,相同的 Java 函数可以表示为 REST 端点流式消息处理程序AWS Lambda,甚至通过 Spring Cloud Function 提供的适配器以及使用 Spring Cloud Function 作为核心编程模型的其他框架(例如 Spring Cloud Stream)表示。spring-doc.cadn.net.cn

总之, Spring Cloud Function 使用附加功能来检测 Java 函数,以便在各种执行上下文中使用。spring-doc.cadn.net.cn

功能定义

虽然前面的示例向您展示了如何在FunctionCatalog以编程方式,在 Spring Cloud Function 被另一个框架(例如 Spring Cloud Stream)用作编程模型的典型集成案例中,您可以通过spring.cloud.function.definition财产。 在发现FunctionCatalog.spring-doc.cadn.net.cn

例如,如果你的 Functional bean 中只有一个ApplicationContextspring.cloud.function.definition属性通常不需要,因为FunctionCatalog可以通过空名称或任何名称进行查找。 例如,假设uppercase是目录中的唯一函数,则可以将其查找为catalog.lookup(null),catalog.lookup(“”),catalog.lookup(“foo”).spring-doc.cadn.net.cn

也就是说,对于您使用 Spring Cloud Stream 等框架的情况,该框架使用spring.cloud.function.definition,建议始终使用spring.cloud.function.definition财产。spring-doc.cadn.net.cn

spring.cloud.function.definition=uppercase

筛选不合格的函数

典型的ApplicationContext可以包含作为有效 Java 函数的 bean,但不打算作为要注册的候选 beanFunctionCatalog. 这样的 bean 可以是来自其他项目的自动配置,也可以是符合Java函数条件的任何其他 bean。spring-doc.cadn.net.cn

该框架提供了对已知 bean 的默认过滤,这些 bean 不应该是注册的候选FunctionCatalog. 您还可以通过使用spring.cloud.function.ineligible-definitions财产。spring-doc.cadn.net.cn

spring.cloud.function.ineligible-definitions=foo,bar

供应商

供应商可以是被动的 - Supplier<Flux<T>>命令式 - Supplier<T>. 从调用的角度来看,这对此类Supplier.spring-doc.cadn.net.cn

但是,当在框架(例如 Spring Cloud Stream)中使用时,Suppliers,尤其是反应式的,通常用于表示流的源。 因此,它们被调用一次来获取流(例如Flux) 中,使用者可以订阅。 换句话说,这样的供应商相当于无限流spring-doc.cadn.net.cn

虽然,相同的响应式供应商也可以表示一个有限的流(例如,轮询的 JDBC 数据的结果集)。 在这些情况下,这种反应式供应商必须连接到底层框架的某种轮询机制。spring-doc.cadn.net.cn

为了帮助实现这一点, Spring Cloud Function 提供了一个 marker Commentsorg.springframework.cloud.function.context.PollableBean来表示此类供应商生成有限流,并且可能需要再次轮询。 但是,重要的是要了解 Spring Cloud Function 本身没有为此 Comments 提供任何行为。spring-doc.cadn.net.cn

此外,PollableBean注解公开了一个 splittable 属性,表示生成的流需要被分割(详见 Splitter EIP)spring-doc.cadn.net.cn

下面是一个示例:spring-doc.cadn.net.cn

@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
	return () -> {
		String v1 = String.valueOf(System.nanoTime());
		String v2 = String.valueOf(System.nanoTime());
		String v3 = String.valueOf(System.nanoTime());
		return Flux.just(v1, v2, v3);
	};
}

功能

函数也可以以命令式或反应式方式编写。 然而,与SupplierConsumer,实现者没有特别的考虑,只需了解在框架(例如 Spring Cloud Stream)中使用时,反应式函数仅调用一次以将引用传递给流(即FluxMono),而命令式函数则为每个事件调用一次。spring-doc.cadn.net.cn

public Function<String, String> uppercase() {
    . . . .
}

双函数

如果您需要通过有效负载接收一些额外的数据(元数据),您始终可以声明您的函数签名以接收Message包含带有附加信息的 Headers 映射。spring-doc.cadn.net.cn

public Function<Message<String>, String> uppercase() {
    . . . .
}

为了使您的函数签名更轻量级、更像 POJO,还有另一种方法。您可以使用BiFunction.spring-doc.cadn.net.cn

public BiFunction<String, Map, String> uppercase() {
    . . . .
}

假设Message仅包含两个属性(payload 和 headers)和一个BiFunction需要两个输入参数,框架将自动识别此签名并从Message将其作为第一个参数传递,并将Mapof 标头作为第二个。 因此,您的函数未与 Spring 的消息传递 API 耦合。 请记住,BiFunction需要一个严格的签名,其中第二个参数必须是Map. 相同的规则适用于BiConsumer.spring-doc.cadn.net.cn

消费者

Consumer 有点特殊,因为它有一个voidreturn 类型,这意味着阻塞,至少是潜在的。 很可能您不需要编写Consumer<Flux<?>>,但如果您确实需要这样做,请记得订阅 inputFlux.spring-doc.cadn.net.cn

功能组成

函数组合是一项功能,允许将多个函数组合成一个。 核心支持基于 Function.andThen(..) 提供的函数组合功能,该功能自 Java 8 起可用。 但是,Spring Cloud Function 在此基础上提供了一些额外的功能。spring-doc.cadn.net.cn

声明式函数组合

此功能允许您使用|(竖线)或,(逗号)分隔符。spring.cloud.function.definition财产。spring-doc.cadn.net.cn

--spring.cloud.function.definition=uppercase|reverse

在这里,我们有效地提供了一个函数的定义,它本身就是函数的组合uppercase和函数reverse. 事实上,这就是属性 name 是 definition 而不是 name 的原因之一,因为函数的定义可以是多个命名函数的组合。 如前所述,您可以使用,而不是|…​definition=uppercase,reverse.spring-doc.cadn.net.cn

编写非函数

Spring Cloud Function 还支持组合SupplierConsumerFunction以及FunctionConsumer. 重要的是要了解这些定义的最终结果。 构成SupplierFunction仍然导致Supplier撰写时SupplierConsumer将有效地呈现Runnable. 遵循相同的逻辑,编写FunctionConsumer将导致Consumer.spring-doc.cadn.net.cn

当然,您不能组合不可组合的对象,例如ConsumerFunction,ConsumerSupplier等。spring-doc.cadn.net.cn

函数路由和筛选

从版本 2.2 开始, Spring Cloud Function 提供了一个路由功能,允许您调用单个函数,该函数充当您希望调用的实际函数的路由器。 在某些 FAAS 环境中,此功能非常有用,因为在这些环境中,维护多个函数的配置可能很麻烦,或者无法公开多个函数。spring-doc.cadn.net.cn

RoutingFunctionFunctionCatalog 中以名称functionRouter. 为了简单和一致,您还可以参考RoutingFunction.FUNCTION_NAME不断。spring-doc.cadn.net.cn

此函数具有以下签名:spring-doc.cadn.net.cn

public class RoutingFunction implements Function<Object, Object> {
// . . .
}

路由指令可以通过多种方式进行通信。 我们支持通过 Message 标头、系统属性以及可插拔策略提供说明。 让我们看看一些细节。spring-doc.cadn.net.cn

MessageRoutingCallback 回调

MessageRoutingCallback是一种帮助确定 route-to 函数定义名称的策略。spring-doc.cadn.net.cn

public interface MessageRoutingCallback {
    default String routingResult(Message<?> message) {
	    return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
    }
}

您需要做的就是实现并注册一个MessageRoutingCallback作为 bean 中,由RoutingFunction. 例如:spring-doc.cadn.net.cn

@Bean
public MessageRoutingCallback customRouter() {
	return new MessageRoutingCallback() {
		@Override
		public String routingResult(Message<?> message) {
			return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
		}
	};
}

在前面的示例中,您可以看到MessageRoutingCallback,它从FunctionProperties.FUNCTION_DEFINITION Message标头Message,返回String表示要调用的函数的定义。spring-doc.cadn.net.cn

如果 input 参数的类型为Message<?>中,您可以通过设置以下spring.cloud.function.definitionspring.cloud.function.routing-expression Message头。 正如属性的名称所暗示的那样,spring.cloud.function.routing-expression依赖于 Spring 表达式语言 (SpEL)。 对于更多静态情况,您可以使用spring.cloud.function.definitionheader,它允许您提供单个函数的名称(例如…​definition=foo) 或组合指令(例如…​definition=foo|bar|baz). 对于更多动态情况,您可以使用spring.cloud.function.routing-expressionheader 并提供应解析为函数定义的 SPEL 表达式(如上所述)。spring-doc.cadn.net.cn

SPEL 求值上下文的根对象是实际的 input 参数,因此在Message<?>您可以构造一个表达式,该表达式可以访问两者payloadheaders(例如spring.cloud.function.routing-expression=headers.function_name).
SPEL 允许用户提供要执行的 Java 代码的 String 表示形式。 鉴于spring.cloud.function.routing-expression可以通过 Message headers 提供,这意味着设置此类表达式的能力可能会暴露给最终用户(即使用 Web 模块时的 HTTP Headers),这可能会导致一些问题(例如恶意代码)。 为了管理这一点,所有通过 Message headers 传入的表达式将仅根据SimpleEvaluationContext,它的功能有限,旨在仅评估上下文对象(在我们的例子中为 Message)。 另一方面,通过属性或系统环境变量设置的所有表达式都将根据StandardEvaluationContext允许 Java 语言的充分灵活性。 虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户公开,但在某些情况下,可见性以及更新系统、应用程序和环境变量的能力确实通过其他一些 Spring 项目提供的 Spring Boot Actuator 端点向最终用户公开。 第三方或最终用户创建的自定义实施。 必须使用行业标准的 Web 安全实践来保护此类终端节点。 Spring Cloud Function 不会公开任何此类端点。

在特定的执行环境/模型中,适配器负责转换和通信spring.cloud.function.definition和/或spring.cloud.function.routing-expression通过Message页眉。 例如,当使用 spring-cloud-function-web 时,你可以提供spring.cloud.function.definition作为 HTTP 标头,框架会将其与其他 HTTP 标头一起作为 Message 标头传播。spring-doc.cadn.net.cn

应用程序属性spring-doc.cadn.net.cn

路由指令也可以通过以下方式进行通信spring.cloud.function.definitionspring.cloud.function.routing-expression作为应用程序属性。 上一节中描述的规则也适用于此处。唯一的区别是,您将这些指令作为应用程序属性(例如--spring.cloud.function.definition=foo).spring-doc.cadn.net.cn

重要的是要了解,提供spring.cloud.function.definitionspring.cloud.function.routing-expression作为 Message 标头仅适用于命令式函数(例如Function<Foo, Bar>). 也就是说,我们只能使用命令式函数路由每条消息。 使用响应式函数,我们不能路由每条消息。 因此,您只能将路由指令作为应用程序属性提供。 这一切都与工作单元有关。 在命令式函数中,工作单元是 Message,因此我们可以基于这样的工作单元进行路由。 使用响应式函数,工作单元是整个流,因此我们将仅对通过应用程序属性提供的指令进行作并路由整个流。

路由指令的优先级顺序spring-doc.cadn.net.cn

鉴于我们有多种提供路由指令的机制,因此在同时使用多种机制的情况下了解冲突解决的优先级非常重要。 这是顺序:spring-doc.cadn.net.cn

  1. MessageRoutingCallback(当 function 是 imperative 时优先,无论是否定义了其他任何内容)spring-doc.cadn.net.cn

  2. 消息标头 (如果 function 是 imperative 且 noMessageRoutingCallback提供)spring-doc.cadn.net.cn

  3. 应用程序属性(任何功能)spring-doc.cadn.net.cn

无法路由的消息spring-doc.cadn.net.cn

如果 route-to 函数在目录中不可用,您将收到一个异常,说明这一点。spring-doc.cadn.net.cn

在某些情况下,这种行为是不可取的,你可能希望有一些 “catch-all” 类型的函数能够处理此类消息。 为了实现这一点,框架提供了org.springframework.cloud.function.context.DefaultMessageRoutingHandler策略。 您需要做的就是将其注册为 bean。 它的默认实现将仅记录消息不可路由的事实,但将允许消息流在没有异常的情况下继续进行,从而有效地删除不可路由的消息。 如果您需要更复杂的东西,您需要做的就是提供您自己的此策略实现并将其注册为 bean。spring-doc.cadn.net.cn

@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
	return new DefaultMessageRoutingHandler() {
		@Override
		public void accept(Message<?> message) {
			// do something really cool
		}
	};
}

函数过滤

过滤是一种只有两条路径的路由类型 - “go” 或 “discard”。就函数而言,这意味着你只想在某个条件返回 'true' 时调用某个函数,否则你想要丢弃输入。spring-doc.cadn.net.cn

但是,当涉及到丢弃 input 时,对于它在应用程序上下文中的含义,有多种解释。 例如,您可能希望记录它,或者您可能希望维护一个丢弃消息的计数器。 您可能还希望什么都不做。spring-doc.cadn.net.cn

由于这些路径不同,我们不提供如何处理丢弃消息的通用配置选项。 相反,我们只建议定义一个简单的Consumer这将表示 'discard' 路径:spring-doc.cadn.net.cn

@Bean
public Consumer<?> devNull() {
   // log, count, or whatever
}

现在,您可以拥有一个实际上只有两条路径的路由表达式,它实际上变成了一个过滤器。 例如:spring-doc.cadn.net.cn

--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'

每条不符合 'echo' 函数条件的消息都会进入 'devNull',在那里你不能对它做任何事情。 签名Consumer<?>还将确保不会尝试任何类型转换,从而几乎不会产生执行开销。spring-doc.cadn.net.cn

当处理响应式 Input(例如 Publisher)时,路由指令只能通过 Function 属性提供。 这是由于响应式函数的性质,它们只被调用一次以传递Publisher其余的由 reactor 处理,因此我们无法访问和/或依赖通过单个值(例如 Message)传递的路由指令。

多个路由器

默认情况下,框架将始终配置一个路由函数,如前面部分所述。 但是,有时您可能需要多个路由功能。 在这种情况下,您可以创建自己的RoutingFunctionbean 中,只要您给它指定一个名称而不是functionRouter.spring-doc.cadn.net.cn

您可以将spring.cloud.function.routing-expressionspring.cloud.function.definitionRoutingFunction作为 Map 中的键/值对。spring-doc.cadn.net.cn

下面是一个简单的示例:spring-doc.cadn.net.cn

@Configuration
protected static class MultipleRouterConfiguration {

	@Bean
	RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
		Map<String, String> propertiesMap = new HashMap<>();
		propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
		return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
	}

	@Bean
	public Function<String, String> reverse() {
		return v -> new StringBuilder(v).reverse().toString();
	}

	@Bean
	public Function<String, String> uppercase() {
		return String::toUpperCase;
	}
}

下面是一个测试来演示它是如何工作的:spring-doc.cadn.net.cn

@Test
public void testMultipleRouters() {
	System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
	FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
	Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
	assertThat(function).isNotNull();
	Message<String> message = MessageBuilder.withPayload("hello").build();
	assertThat(function.apply(message)).isEqualTo("HELLO");

	function = functionCatalog.lookup("mySpecialRouter");
	assertThat(function).isNotNull();
	message = MessageBuilder.withPayload("hello").build();
	assertThat(function.apply(message)).isEqualTo("olleh");
}

输入/输出扩充

很多时候,你需要修改或优化传入或传出的 Message,并保持你的代码没有非功能性问题。 您不想在业务逻辑中执行此作。spring-doc.cadn.net.cn

您始终可以通过 Function Composition 来完成它。 这种方法有几个好处:spring-doc.cadn.net.cn

  • 它允许您将这个非功能性关注点隔离到一个单独的函数中,您可以将该函数与业务函数组合为函数定义。spring-doc.cadn.net.cn

  • 它为你提供了完全的自由(和危险),让你知道在传入的消息到达实际的业务功能之前你可以修改什么。spring-doc.cadn.net.cn

@Bean
public Function<Message<?>, Message<?>> enrich() {
    return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}

@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
    // do whatever
}

然后,通过提供以下函数定义来编写函数:enrich|myBusinessFunction.spring-doc.cadn.net.cn

虽然所描述的方法是最灵活的,但它也是最复杂的。 它要求您编写一些代码,然后将其设为 bean,或者手动将其注册为函数,然后才能使用业务函数编写它,如前面的示例所示。spring-doc.cadn.net.cn

但是,如果您尝试进行的修改 (扩充) 像前面的示例中一样微不足道,该怎么办? 是否有一种更简单、更动态且可配置的机制来实现相同的目的?spring-doc.cadn.net.cn

从版本 3.1.3 开始,该框架允许您提供 SPEL 表达式,以丰富进入函数的 Importing 和从函数输出的单个消息 Headers。 让我们以其中一个测试为例。spring-doc.cadn.net.cn

@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
					"--logging.level.org.springframework.cloud.function=DEBUG",
					"--spring.main.lazy-initialization=true",
					"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
					"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {

		FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
		FunctionInvocationWrapper function = functionCatalog.lookup("split");
		Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("hello")
				.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
				.setHeader("path", "foo/bar/baz")
				.build());
		assertThat(result.getHeaders()).containsKey("keyOut1"));
		assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
		assertThat(result.getHeaders()).containsKey("keyOut2"));
		assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
	}
}

在这里,您可以看到名为input-header-mapping-expressionoutput-header-mapping-expression前面是函数的名称(即split),后跟要设置的消息标头键的名称和值 SPEL 表达式。 第一个表达式(对于 'keyOut1')是用单引号括起来的文字 SpEL 表达式,有效地将 'keyOut1' 设置为 valuehello1. 这keyOut2设置为现有 'contentType' 标头的值。spring-doc.cadn.net.cn

你还可以在 input header mapping 中观察到一些有趣的功能,我们实际上是在拆分现有 header 'path' 的值,将 key1 和 key2 的单个值设置为基于索引的 split 元素的值。spring-doc.cadn.net.cn

如果由于任何原因提供的表达式计算失败,则函数的执行将继续进行,就像什么都没发生一样。 但是,您将在日志中看到 WARN 消息,通知您。
o.s.c.f.context.catalog.InputEnricher    : Failed while evaluating expression "hello1"  on incoming message. . .

如果您正在处理具有多个输入的函数(下一部分),则可以在input-header-mapping-expression:spring-doc.cadn.net.cn

--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'

函数 Arity

有时需要对数据流进行分类和组织。 例如,考虑一个经典的大数据使用案例,即处理包含“订单”和“发票”的无组织数据,您希望每个数据都进入单独的数据存储。 这就是函数 arity (具有多个输入和输出的函数) 支持发挥作用的地方。spring-doc.cadn.net.cn

让我们看一下这样一个函数的示例。MessageRoutingCallback 回调spring-doc.cadn.net.cn

完整的实施详情可在此处查看。
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用它的 Tuple 库。 元组通过向我们传达基数类型信息,为我们提供了独特的优势。 在 SCSt 的上下文中,这两者都非常重要。Cardinality 让我们知道需要创建多少个输入和输出绑定,并将其绑定到函数的相应输入和输出。 了解类型信息可确保正确的类型转换。spring-doc.cadn.net.cn

此外,这也是绑定名称命名约定的 'index' 部分发挥作用的地方,因为在此函数中,两个输出绑定名称是organise-out-0organise-out-1.spring-doc.cadn.net.cn

目前,函数 arity 仅支持响应式函数 (Function<TupleN<Flux<?>…​>, TupleN<Flux<?>…​>>) 以复杂事件处理为中心,其中对事件汇合的评估和计算通常需要查看事件流,而不是单个事件。

Input Header 传播

在典型场景中,input Message 头不会传播到输出,这是理所当然的,因为函数的输出可能是其他事物的 input,需要它自己的 Message 头集。 但是,有时可能需要这种传播,因此 Spring Cloud Function 提供了几种机制来实现这一点。spring-doc.cadn.net.cn

首先,您始终可以手动复制 Headers。 例如,如果你有一个 Function 的签名采用Message并返回Message(即Function<Message, Message>),您可以简单而有选择地自己复制 headers。 请记住,如果你的函数返回 Message,框架不会对它执行任何作,只会正确转换其有效负载。 但是,这种方法可能有点乏味,尤其是在您只想复制所有标头的情况下。 为了帮助处理此类情况,我们提供了一个简单的属性,该属性允许您在希望传播 input 标头的函数上设置布尔标志。 该属性为copy-input-headers.spring-doc.cadn.net.cn

例如,假设您有以下配置:spring-doc.cadn.net.cn

@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return x -> x.toUpperCase();
	}
}

如你所知,你仍然可以通过向它发送 Message 来调用这个函数(框架将负责类型转换和有效负载提取)spring-doc.cadn.net.cn

只需设置spring.cloud.function.configuration.uppercase.copy-input-headerstrue,以下断言也将为 truespring-doc.cadn.net.cn

Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");

类型转换(Content-Type 协商)

Content-Type 协商是 Spring Cloud Function 的核心功能之一,因为它不仅允许将传入数据转换为声明的类型 通过函数签名,但在函数组合期间执行相同的转换,使原本不可组合(按类型)的函数可组合。spring-doc.cadn.net.cn

为了更好地理解内容类型协商背后的机制和必要性,我们看了一个非常简单的用例,如下所示: 以以下函数为例:spring-doc.cadn.net.cn

@Bean
public Function<Person, String> personFunction {..}

前面示例中显示的函数需要一个Personobject 作为参数,并生成 String 类型作为输出。如果此类函数是使用Person,则一切正常。 但是,通常,函数扮演传入数据的处理程序的角色,这些数据通常以原始格式出现,例如byte[],JSON String等。 为了使框架成功地将传入数据作为参数传递给此函数,它必须以某种方式将传入数据转换为Person类型。spring-doc.cadn.net.cn

Spring Cloud Function 依赖于 Spring 的两种原生机制来实现这一点。spring-doc.cadn.net.cn

  1. MessageConverter - 将传入的 Message 数据转换为函数声明的类型。spring-doc.cadn.net.cn

  2. ConversionService - 将传入的非 Message 数据转换为函数声明的类型。spring-doc.cadn.net.cn

这意味着根据原始数据类型(Message 或非 Message),Spring Cloud Function 将应用一种或另一种机制。spring-doc.cadn.net.cn

在大多数情况下,当处理作为其他请求(例如 HTTP、消息等)的一部分调用的函数时,框架依赖于MessageConverters,因为此类请求已转换为 SpringMessage. 换句话说,框架会查找并应用适当的MessageConverter. 为此,框架需要用户提供一些说明。 这些指令之一已经由函数本身的签名(Person 类型)提供。 因此,从理论上讲,这应该(而且在某些情况下是)足够了。 但是,对于大多数使用案例,为了选择合适的MessageConverter,框架需要一条额外的信息。 缺失的部分是contentType页眉。spring-doc.cadn.net.cn

这样的 Headers 通常作为 Message 的一部分,由首先创建此类 Message 的相应适配器注入。 例如,HTTP POST 请求会将其内容类型 HTTP 标头复制到contentType标头。spring-doc.cadn.net.cn

对于不存在此类标头的情况,框架依赖于默认内容类型,如application/json.spring-doc.cadn.net.cn

内容类型与参数类型

如前所述,对于框架选择适当的MessageConverter,它需要参数类型和内容类型信息(可选)。 选择适当MessageConverter驻留在参数解析器中,该解析器在调用用户定义的函数之前触发(即当框架知道实际参数类型时)。 如果参数类型与当前有效负载的类型不匹配,则框架将委托给预配置的MessageConverters查看它们中的任何一个是否可以转换有效负载。spring-doc.cadn.net.cn

的组合contentType参数类型是框架通过查找适当的MessageConverter. 如果没有合适的MessageConverter,则会引发异常,您可以通过添加自定义MessageConverter(参见User-defined Message Converters).spring-doc.cadn.net.cn

不要指望Message仅根据contentType. 请记住,contentType是对 target 类型的补充。 它是一个提示,它MessageConverter可能会也可能不会考虑。

消息转换器

MessageConverters定义两个方法:spring-doc.cadn.net.cn

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

了解这些方法的 Contract 及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。spring-doc.cadn.net.cn

fromMessage方法将传入的Message转换为参数类型。 的Message可以是任何类型,这取决于MessageConverter以支持多种类型。spring-doc.cadn.net.cn

提供的 MessageConverters

如前所述,该框架已经提供了MessageConverters来处理最常见的使用案例。 以下列表描述了提供的MessageConverters,按优先顺序(第一个MessageConverterthat works 被使用):spring-doc.cadn.net.cn

  1. JsonMessageConverter:支持转换Messageto/from POJO 的情况contentTypeapplication/json使用 Jackson (DEFAULT) 或 Gson 库。此消息转换器还知道type参数(例如,application/json;type=foo.bar.Person 的 Person)。这在开发函数时类型可能未知的情况下非常有用,因此函数签名可能如下所示Function<?, ?>FunctionFunction<Object, Object>.换句话说,对于类型转换,我们通常从函数签名中派生类型。具有 mime-type 参数允许您以更动态的方式传达类型。spring-doc.cadn.net.cn

  2. ByteArrayMessageConverter:支持转换Messagebyte[]byte[]对于以下情况contentTypeapplication/octet-stream.它本质上是一种传递,主要是为了向后兼容。spring-doc.cadn.net.cn

  3. StringMessageConverter:支持将任何类型的String什么时候contentTypetext/plain.spring-doc.cadn.net.cn

当找不到合适的转换器时,框架会引发异常。发生这种情况时,您应该检查您的代码和配置,并确保没有遗漏任何内容(即,确保您提供了contentType通过使用 Binding 或 Header)。 但是,您很可能发现了一些不常见的情况(例如自定义contentType也许)和当前提供的MessageConverters不知道如何转换。 如果是这种情况,您可以添加自定义MessageConverter.请参见用户定义的消息转换器spring-doc.cadn.net.cn

用户定义的 MessageConverters

Spring Cloud Function 公开了一种机制来定义和注册额外的MessageConverters. 要使用它,请实现org.springframework.messaging.converter.MessageConverter,将其配置为@Bean. 然后将其附加到现有的 'MessageConverter' 堆栈中。spring-doc.cadn.net.cn

了解这种习俗很重要MessageConverterimplementations 被添加到现有堆栈的 head 中。 因此,自定义MessageConverterimplementations 优先于现有 implementations,这使您可以覆盖和添加到现有 Converters。

以下示例说明如何创建消息转换器 Bean 以支持名为application/bar:spring-doc.cadn.net.cn

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

有关 JSON 选项的说明

在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。 为了您的利益,已将其抽象为org.springframework.cloud.function.json.JsonMapper它本身知道两种机制,并将使用您选择的机制或遵循默认规则的机制。 默认规则如下:spring-doc.cadn.net.cn

  • 无论 Classpath 上的哪个库都是要使用的机制。因此,如果你有com.fasterxml.jackson.*添加到 classpath 中,将使用 Jackson,如果你有com.google.code.gson,则将使用 Gson。spring-doc.cadn.net.cn

  • 如果您两者都有,那么 Gson 将是默认的,或者您可以设置spring.cloud.function.preferred-json-mapper属性具有以下两个值之一:gsonjackson.spring-doc.cadn.net.cn

也就是说,类型转换通常对开发人员是透明的。 然而,鉴于org.springframework.cloud.function.json.JsonMapper也注册为 bean,如果需要,可以轻松地将其注入到代码中。spring-doc.cadn.net.cn

Kotlin Lambda 支持

我们还提供对 Kotlin lambda 的支持(自 v2.0 起)。 请考虑以下事项:spring-doc.cadn.net.cn

@Bean
open fun kotlinSupplier(): () -> String {
    return  { "Hello from Kotlin" }
}

@Bean
open fun kotlinFunction(): (String) -> String {
    return  { it.toUpperCase() }
}

@Bean
open fun kotlinConsumer(): (String) -> Unit {
    return  { println(it) }
}

以上表示配置为 Spring bean 的 Kotlin lambda。每个 cookie 的签名都映射到 Java 等效的Supplier,FunctionConsumer,因此框架支持/识别签名。 虽然 Kotlin 到 Java 映射的机制不在本文档的讨论范围之内,但请务必了解“Java 8 函数支持”部分中概述的签名转换规则也适用于此处。spring-doc.cadn.net.cn

要启用 Kotlin 支持,您只需在 Classpath 上添加 Kotlin SDK 库,这将触发适当的自动配置和支持类。spring-doc.cadn.net.cn

功能组件扫描

Spring Cloud Function 将扫描Function,ConsumerSupplier在名为functions如果存在。 使用此功能,您可以编写不依赖于 Spring 的函数 - 甚至不依赖于 Spring@Component需要 annotation。 如果要使用不同的包,可以将spring.cloud.function.scan.packages. 您还可以使用spring.cloud.function.scan.enabled=false以完全关闭扫描。spring-doc.cadn.net.cn

数据掩码

典型的应用程序带有多个级别的日志记录。 某些云/无服务器平台可能会在记录的数据包中包含敏感数据,以供所有人查看。 虽然检查正在记录的数据是各个开发人员的责任,但由于日志记录来自框架本身,因此从 4.1 版开始,我们引入了JsonMasker最初帮助屏蔽 AWS Lambda 负载中的敏感数据。 但是,JsonMasker是通用的,可用于任何模块。 目前,它仅适用于结构化数据,例如 JSON。 您只需指定要屏蔽的键,其余的交给它。 应在文件中指定键META-INF/mask.keys. 文件格式非常简单,您可以用逗号、换行符或两者分隔多个键。spring-doc.cadn.net.cn

以下是此类文件内容的示例:spring-doc.cadn.net.cn

eventSourceARN
asdf1, SS

在这里,您会看到定义了三个键。 一旦存在这样的文件,JsonMasker将使用它来屏蔽指定键的值。spring-doc.cadn.net.cn

下面是显示用法的示例代码:spring-doc.cadn.net.cn

private final static JsonMasker masker = JsonMasker.INSTANCE();
// . . .
logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));