This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-function 4.2.1!
Programming model
Function Catalog and Flexible Function Signatures
One of the main features of Spring Cloud Function is to adapt and support a range of type signatures for user-defined functions, while providing a consistent execution model.
That’s why all user-defined functions are transformed into a canonical representation by FunctionCatalog.
While users don’t normally have to care about the FunctionCatalog at all, it is useful to know what kind of functions are supported in user code.
It is also important to understand that Spring Cloud Function provides first-class support for reactive APIs, provided by Project Reactor.
This allows reactive primitives such as Mono and Flux to be used as types in user-defined functions, thereby providing greater flexibility when choosing a programming model for your function implementation.
A reactive programming model also enables functional support for features that would be otherwise difficult or impossible to implement using an imperative programming style.
For more on this, please read the section on Function Arity.
Java 8 function support
Spring Cloud Function embraces and builds on top of the 3 core functional interfaces defined by Java since Java 8.
- Supplier<O>
- Function<I, O>
- Consumer<I>
To avoid constantly mentioning Supplier, Function and Consumer, we’ll refer to them as Functional beans where appropriate for the rest of this manual.
In a nutshell, any bean in your ApplicationContext that is a Functional bean will be lazily registered with FunctionCatalog.
This means that it could benefit from all of the additional features described in this reference manual.
In the simplest application, all you need to do is declare a @Bean of type Supplier, Function or Consumer in your application configuration.
Then, you can use FunctionCatalog to look up a particular function based on its name.
For example:
@Bean
public Function<String, String> uppercase() {
    return value -> value.toUpperCase();
}
// . . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup("uppercase");
It is important to understand that, given that uppercase is a bean, you can certainly get it from the ApplicationContext directly, but all you will get is your bean as you declared it, without any extra features provided by Spring Cloud Function.
When you look up a function via FunctionCatalog, the instance you receive is wrapped (instrumented) with additional features (such as type conversion and composition) described in this manual.
Also, it is important to understand that a typical user does not use Spring Cloud Function directly.
Instead, a typical user implements a Java Function, Supplier, or Consumer with the idea of using it in different execution contexts without additional work.
For example, the same Java function could be represented as a REST endpoint, a Streaming message handler, or an AWS Lambda, and even more, via Spring Cloud Function provided adapters as well as other frameworks using Spring Cloud Function as the core programming model (e.g. Spring Cloud Stream).
In summary, Spring Cloud Function instruments Java functions with additional features to be utilized in a variety of execution contexts.
Function definition
While the previous example shows you how to look up a function in FunctionCatalog programmatically, in a typical integration case where Spring Cloud Function is used as the programming model by another framework (e.g. Spring Cloud Stream), you can declare which functions to use via the spring.cloud.function.definition property.
It is important to know and understand the default behaviour when it comes to discovering functions in FunctionCatalog.
For instance, if you only have one Functional bean in your ApplicationContext, the spring.cloud.function.definition property typically will not be required, since a single function in FunctionCatalog can be looked up by an empty name, or any name.
For example, assuming that uppercase is the only function in your catalog, it can be looked up as catalog.lookup(null), catalog.lookup(""), or catalog.lookup("foo").
That said, for cases where you are using a framework such as Spring Cloud Stream, which uses spring.cloud.function.definition, it is recommended to always use the spring.cloud.function.definition property.
For example,
spring.cloud.function.definition=uppercase
Filtering ineligible functions
A typical ApplicationContext may include beans that are valid Java functions, but not intended as candidates to be registered with FunctionCatalog.
Such beans could be auto-configurations from other projects or any other bean that qualifies as a Java function.
The framework provides default filtering of known beans that should not be candidates for registration with FunctionCatalog.
You can also add additional beans to this list by providing a comma-delimited list of bean definition names using the spring.cloud.function.ineligible-definitions property.
For example,
spring.cloud.function.ineligible-definitions=foo,bar
Supplier
Supplier can be reactive (Supplier<Flux<T>>) or imperative (Supplier<T>).
From an invocation standpoint, this should make no difference to the implementor of such a Supplier.
However, when used within frameworks (e.g. Spring Cloud Stream), Suppliers, especially reactive ones, are often used to represent the source of a stream.
Therefore, they are invoked once to get the stream (e.g. Flux) to which consumers can subscribe.
In other words, such suppliers represent an equivalent of an infinite stream.
That said, the same reactive suppliers can also represent a finite stream (e.g. a result set from polled JDBC data). In those cases, such reactive suppliers must be hooked up to some polling mechanism of the underlying framework.
To assist with that, Spring Cloud Function provides a marker annotation, org.springframework.cloud.function.context.PollableBean, to signal that such a supplier produces a finite stream and may need to be polled again.
However, it is important to understand that Spring Cloud Function itself provides no behavior for this annotation.
In addition, the PollableBean annotation exposes a splittable attribute to signal that the produced stream needs to be split (see Splitter EIP).
Here is an example:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
    return () -> {
        String v1 = String.valueOf(System.nanoTime());
        String v2 = String.valueOf(System.nanoTime());
        String v3 = String.valueOf(System.nanoTime());
        return Flux.just(v1, v2, v3);
    };
}
Function
Functions can also be written in an imperative or reactive way.
Yet, unlike Supplier and Consumer, there are no special considerations for the implementor other than understanding that when used within frameworks, such as Spring Cloud Stream, a reactive function is invoked only once to pass a reference to the stream (i.e. Flux or Mono), whereas an imperative function is invoked once per event.
public Function<String, String> uppercase() {
. . . .
}
BiFunction
In the event you need to receive some additional data (metadata) with your payload, you can always declare your function signature to receive a Message containing a map of headers with additional information.
public Function<Message<String>, String> uppercase() {
. . . .
}
To make your function signature a bit lighter and more POJO-like, there is another approach. You can use BiFunction.
public BiFunction<String, Map, String> uppercase() {
. . . .
}
Given that a Message only contains two attributes (payload and headers), and a BiFunction requires two input parameters, the framework will automatically recognise this signature and extract the payload from the Message, passing it as the first argument and a Map of headers as the second.
As a result, your function is not coupled to Spring’s messaging API.
Keep in mind that BiFunction requires a strict signature where the second argument must be a Map.
The same rule applies to BiConsumer.
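The payload/headers split described above can be pictured with a small plain-Java sketch. It uses only the JDK: SimpleMessage is a hypothetical stand-in for a message and adapt is an illustrative helper, neither of which is part of Spring Cloud Function, and the framework's real adaptation logic may differ.

```java
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative sketch only: how a (payload, headers) BiFunction can serve a whole message.
final class BiFunctionSketch {

    // Hypothetical stand-in for a message: a payload plus a map of headers.
    static final class SimpleMessage {
        final String payload;
        final Map<String, Object> headers;

        SimpleMessage(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Passes the payload as the first argument and the headers as the second.
    static Function<SimpleMessage, String> adapt(
            BiFunction<String, Map<String, Object>, String> function) {
        return message -> function.apply(message.payload, message.headers);
    }

    public static void main(String[] args) {
        BiFunction<String, Map<String, Object>, String> uppercase =
                (payload, headers) -> payload.toUpperCase() + ":" + headers.get("source");

        SimpleMessage message = new SimpleMessage("hello", Map.of("source", "test"));
        System.out.println(adapt(uppercase).apply(message)); // prints HELLO:test
    }
}
```

The user-written BiFunction never references the message type, which is the decoupling the text refers to.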
Function Composition
Function Composition is a feature that allows one to compose several functions into one. The core support is based on the function composition feature provided by Function.andThen(..), available since Java 8. However, Spring Cloud Function provides a few additional features on top of this.
Declarative Function Composition
This feature allows you to provide composition instructions in a declarative way using | (pipe) or , (comma) delimiters when setting the spring.cloud.function.definition property.
For example:
--spring.cloud.function.definition=uppercase|reverse
Here, we effectively provided a definition of a single function which itself is a composition of function uppercase and function reverse.
In fact, that is one of the reasons why the property name is definition and not name, since the definition of a function can be a composition of several named functions.
As mentioned, you can use , instead of |, such as …definition=uppercase,reverse.
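As a rough mental model, the definition uppercase|reverse behaves like chaining the two functions with Function.andThen. The following plain-Java sketch uses only the JDK; the class name CompositionSketch and its members are illustrative assumptions, and it does not show the type conversion that the framework can apply between composed functions.

```java
import java.util.function.Function;

// Illustrative sketch only: what "uppercase|reverse" means conceptually.
final class CompositionSketch {

    static final Function<String, String> UPPERCASE = String::toUpperCase;

    static final Function<String, String> REVERSE =
            value -> new StringBuilder(value).reverse().toString();

    // Comparable in spirit to --spring.cloud.function.definition=uppercase|reverse
    static Function<String, String> uppercaseThenReverse() {
        return UPPERCASE.andThen(REVERSE);
    }

    public static void main(String[] args) {
        System.out.println(uppercaseThenReverse().apply("hello")); // prints OLLEH
    }
}
```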
Composing non-Functions
Spring Cloud Function also supports composing Supplier with Consumer or Function as well as Function with Consumer.
What’s important to understand is the end product of such definitions.
Composing Supplier with Function still results in Supplier, while composing Supplier with Consumer will effectively render Runnable.
Following the same logic, composing Function with Consumer will result in Consumer.
And, of course, you can’t compose uncomposable objects such as Consumer and Function, Consumer and Supplier, etc.
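The resulting types can be illustrated with a minimal plain-Java sketch. The helper names below (supplierThenFunction, supplierThenConsumer, functionThenConsumer) are hypothetical and exist only for this illustration; they are not framework API, and the framework performs the composition for you based on the function definition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative sketch only: the end product of each supported composition.
final class NonFunctionCompositionSketch {

    // Supplier + Function -> Supplier
    static <T, R> Supplier<R> supplierThenFunction(Supplier<T> supplier, Function<T, R> function) {
        return () -> function.apply(supplier.get());
    }

    // Supplier + Consumer -> Runnable (no input, no output)
    static <T> Runnable supplierThenConsumer(Supplier<T> supplier, Consumer<T> consumer) {
        return () -> consumer.accept(supplier.get());
    }

    // Function + Consumer -> Consumer
    static <T, R> Consumer<T> functionThenConsumer(Function<T, R> function, Consumer<R> consumer) {
        return input -> consumer.accept(function.apply(input));
    }

    public static void main(String[] args) {
        Supplier<String> greeting = () -> "hello";
        Function<String, String> uppercase = String::toUpperCase;
        List<String> sink = new ArrayList<>();
        Consumer<String> collect = sink::add;

        System.out.println(supplierThenFunction(greeting, uppercase).get()); // prints HELLO
        supplierThenConsumer(greeting, collect).run();                       // adds "hello" to sink
        functionThenConsumer(uppercase, collect).accept("world");            // adds "WORLD" to sink
        System.out.println(sink);                                            // prints [hello, WORLD]
    }
}
```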
Function Routing and Filtering
Since version 2.2, Spring Cloud Function provides a routing feature allowing you to invoke a single function, which acts as a router to an actual function you wish to invoke. This feature is very useful in certain FaaS environments where maintaining configurations for several functions could be cumbersome or exposing more than one function is not possible.
The RoutingFunction is registered in FunctionCatalog under the name functionRouter.
For simplicity and consistency, you can also refer to the RoutingFunction.FUNCTION_NAME constant.
This function has the following signature:
public class RoutingFunction implements Function<Object, Object> {
// . . .
}
The routing instructions could be communicated in several ways. We support providing instructions via Message headers, System properties as well as a pluggable strategy. Let’s look at some of the details.
MessageRoutingCallback
The MessageRoutingCallback is a strategy to assist with determining the name of the route-to function definition.
public interface MessageRoutingCallback {
    default String routingResult(Message<?> message) {
        return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
    }
}
All you need to do is implement and register a MessageRoutingCallback as a bean to be picked up by the RoutingFunction.
For example:
@Bean
public MessageRoutingCallback customRouter() {
    return new MessageRoutingCallback() {
        @Override
        public String routingResult(Message<?> message) {
            return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
        }
    };
}
In the preceding example you can see a very simple implementation of MessageRoutingCallback, which determines the function definition from the FunctionProperties.FUNCTION_DEFINITION Message header of the incoming Message, returning an instance of String representing the definition of the function to invoke.
Message Headers
If the input argument is of type Message<?>, you can communicate routing instructions by setting one of the spring.cloud.function.definition or spring.cloud.function.routing-expression Message headers.
As the name of the property suggests, spring.cloud.function.routing-expression relies on the Spring Expression Language (SpEL).
For more static cases you can use the spring.cloud.function.definition header, which allows you to provide the name of a single function (e.g., …definition=foo) or a composition instruction (e.g., …definition=foo|bar|baz).
For more dynamic cases you can use the spring.cloud.function.routing-expression header and provide a SpEL expression that resolves to the definition of a function (as described above).
The root object of the SpEL evaluation context is the actual input argument, so in the case of Message<?> you can construct an expression that has access to both payload and headers (e.g., spring.cloud.function.routing-expression=headers.function_name).
SpEL allows users to provide a String representation of the Java code to be executed.
Because spring.cloud.function.routing-expression can be provided via Message headers, the ability to set such expressions could be exposed to the end user (e.g., through HTTP headers when using the web module), which could result in problems such as the execution of malicious code.
To manage that, all expressions coming via Message headers are only evaluated against SimpleEvaluationContext, which has limited functionality and is designed to only evaluate the context object (Message in our case).
On the other hand, all expressions that are set via a property or system environment variable are evaluated against StandardEvaluationContext, allowing for the full flexibility of the Java language.
Setting expressions via a system/application property or environment variable is generally considered secure, as it is not exposed to the end user in normal cases. However, there are cases where visibility of, as well as the capability to update, system, application and environment variables is indeed exposed to the end user via Spring Boot Actuator endpoints provided either by some other Spring project, a third party, or a custom implementation created by the end user.
Such endpoints must be secured using industry standard web security practices.
Spring Cloud Function does not expose any such endpoints.
In specific execution environments/models, the adapters are responsible for translating and communicating spring.cloud.function.definition and/or spring.cloud.function.routing-expression via Message headers.
For example, when using spring-cloud-function-web you can provide spring.cloud.function.definition as an HTTP header and the framework will propagate it, along with other HTTP headers, as Message headers.
Application Properties
Routing instructions can also be communicated via spring.cloud.function.definition or spring.cloud.function.routing-expression as application properties.
The rules described in the previous section apply here as well. The only difference is you provide these instructions as application properties (e.g., --spring.cloud.function.definition=foo).
It is important to understand that providing spring.cloud.function.definition or spring.cloud.function.routing-expression as Message headers will only work for imperative functions (e.g., Function<Foo, Bar>).
That is to say that we can only route per-message with imperative functions.
With reactive functions we cannot route per-message.
Therefore, for reactive functions you can only provide your routing instructions as application properties.
It’s all about unit-of-work.
In an imperative function, the unit of work is a Message, so we can route based on such a unit of work.
With a reactive function, the unit of work is the entire stream, so we’ll act only on the instruction provided via application properties and route the entire stream.
Order of priority for routing instructions
Given that we have several mechanisms of providing routing instructions, it is important to understand the priorities for conflict resolution in the event multiple mechanisms are used at the same time. Here is the order:
- MessageRoutingCallback (takes precedence when the function is imperative, regardless of whether anything else is defined)
- Message Headers (if the function is imperative and no MessageRoutingCallback is provided)
- Application Properties (any function)
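The order above can be sketched as a simple resolution chain in plain Java. This is only an illustration under stated assumptions, not the RoutingFunction implementation: the callback parameter is a hypothetical stand-in for a MessageRoutingCallback, only the spring.cloud.function.definition key is considered (routing expressions are ignored), and falling back to application properties when the header is absent is an assumption of this sketch.

```java
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch only: resolving a route by priority.
final class RoutingPrioritySketch {

    static final String DEFINITION_KEY = "spring.cloud.function.definition";

    static String resolve(boolean imperative,
            Function<Map<String, Object>, String> callback, // hypothetical callback; may be null
            Map<String, Object> headers,
            Map<String, String> applicationProperties) {
        // 1. A callback wins for imperative functions.
        if (imperative && callback != null) {
            return callback.apply(headers);
        }
        // 2. Message headers are consulted for imperative functions without a callback.
        if (imperative && headers.get(DEFINITION_KEY) != null) {
            return headers.get(DEFINITION_KEY).toString();
        }
        // 3. Application properties apply to any function (assumed fallback).
        return applicationProperties.get(DEFINITION_KEY);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(DEFINITION_KEY, "fromHeader");
        Map<String, String> properties = Map.of(DEFINITION_KEY, "fromProperty");

        System.out.println(resolve(true, h -> "fromCallback", headers, properties));  // prints fromCallback
        System.out.println(resolve(true, null, headers, properties));                 // prints fromHeader
        System.out.println(resolve(false, h -> "fromCallback", headers, properties)); // prints fromProperty
    }
}
```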
Unroutable Messages
In the event a route-to function is not available in the catalog, you will get an exception stating that.
There are cases when such behavior is not desired and you may want to have some "catch-all" type function capable of handling such messages.
To accomplish that, the framework provides the org.springframework.cloud.function.context.DefaultMessageRoutingHandler strategy.
All you need to do is register it as a bean.
Its default implementation will simply log the fact that the message is un-routable, but will allow message flow to proceed without the exception, effectively dropping the un-routable message.
If you need something more sophisticated, all you need to do is provide your own implementation of this strategy and register it as a bean.
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
    return new DefaultMessageRoutingHandler() {
        @Override
        public void accept(Message<?> message) {
            // do something really cool
        }
    };
}
Function Filtering
Filtering is the type of routing where there are only two paths - 'go' or 'discard'. In terms of functions, it means you only want to invoke a certain function if some condition returns 'true'; otherwise you want to discard the input.
However, when it comes to discarding input there are many interpretations of what it could mean in the context of your application. For example, you may want to log it, or you may want to maintain a counter of discarded messages. You may also want to do nothing at all.
Because of these different paths, we do not provide a general configuration option for how to deal with discarded messages.
Instead, we simply recommend defining a simple Consumer which would signify the 'discard' path:
@Bean
public Consumer<?> devNull() {
    // log, count, or whatever
    return value -> { };
}
Now you can have a routing expression that really only has two paths effectively becoming a filter. For example:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
Every message that does not fit the criteria to go to 'echo' function will go to 'devNull' where you can simply do nothing with it.
The signature Consumer<?> will also ensure that no type conversion will be attempted, resulting in almost no execution overhead.
When dealing with reactive inputs (e.g. Publisher), routing instructions must only be provided via Function properties.
This is due to the nature of the reactive functions which are invoked only once to pass a Publisher and the rest is handled by the reactor, hence we cannot access and/or rely on the routing instructions communicated via individual values (e.g., Message).
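The two-path idea can be sketched in plain Java without SpEL. In this illustration the route method mirrors the routing expression shown earlier in this section (roughly, since it treats a missing header as 'discard' instead of failing), and the echo and devNull entries are hypothetical stand-ins for the functions of the same names; none of this is framework API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch only: a two-path router acting as a filter.
final class FilterRoutingSketch {

    final List<String> echoed = new ArrayList<>();
    int discarded;

    private final Map<String, Consumer<String>> functions = new HashMap<>();

    FilterRoutingSketch() {
        functions.put("echo", echoed::add);               // the 'go' path
        functions.put("devNull", payload -> discarded++); // the 'discard' path, here just counting
    }

    // Plain-Java counterpart of the routing expression: text/plain goes to echo, the rest to devNull.
    static String route(Map<String, Object> headers) {
        return "text/plain".equals(String.valueOf(headers.get("contentType"))) ? "echo" : "devNull";
    }

    void handle(String payload, Map<String, Object> headers) {
        functions.get(route(headers)).accept(payload);
    }

    public static void main(String[] args) {
        FilterRoutingSketch sketch = new FilterRoutingSketch();
        sketch.handle("kept", Map.of("contentType", "text/plain"));
        sketch.handle("dropped", Map.of("contentType", "application/json"));
        System.out.println(sketch.echoed + " discarded=" + sketch.discarded); // prints [kept] discarded=1
    }
}
```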
Multiple Routers
By default, the framework will always have a single routing function configured as described in previous sections.
However, there are times when you may need more than one routing function.
In that case you can create your own instance of the RoutingFunction bean in addition to the existing one, as long as you give it a name other than functionRouter.
You can pass spring.cloud.function.routing-expression or spring.cloud.function.definition to RoutingFunction as key/value pairs in the map.
Here is a simple example:
@Configuration
protected static class MultipleRouterConfiguration {

    @Bean
    RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
        Map<String, String> propertiesMap = new HashMap<>();
        propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
        return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
    }

    @Bean
    public Function<String, String> reverse() {
        return v -> new StringBuilder(v).reverse().toString();
    }

    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }
}
Here is a test that demonstrates how it works:
@Test
public void testMultipleRouters() {
    System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
    FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
    Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
    assertThat(function).isNotNull();
    Message<String> message = MessageBuilder.withPayload("hello").build();
    assertThat(function.apply(message)).isEqualTo("HELLO");
    function = functionCatalog.lookup("mySpecialRouter");
    assertThat(function).isNotNull();
    message = MessageBuilder.withPayload("hello").build();
    assertThat(function.apply(message)).isEqualTo("olleh");
}
Input/Output Enrichment
There are often times when you need to modify or refine an incoming or outgoing Message while keeping your code clean of non-functional concerns. You don’t want to do it inside of your business logic.
You can always accomplish it via Function Composition. Such an approach provides several benefits:
- It allows you to isolate this non-functional concern into a separate function which you can compose with the business function as a function definition.
- It provides you with complete freedom (and danger) as to what you can modify before the incoming message reaches the actual business function.
@Bean
public Function<Message<?>, Message<?>> enrich() {
    return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
    // do whatever; this placeholder returns the message unchanged
    return message -> message;
}
Then, compose your function by providing the following function definition: enrich|myBusinessFunction.
While the described approach is the most flexible, it is also the most involved. It requires you to write some code, then make it a bean, or manually register it as a function before you can compose it with the business function as you can see from the preceding example.
But what if modifications (enrichments) you are trying to make are trivial as they are in the preceding example? Is there a simpler and more dynamic and configurable mechanism to accomplish the same?
Since version 3.1.3, the framework allows you to provide SpEL expressions to enrich individual message headers for both the input going into a function and the output coming out of it. Let’s look at one of the tests as an example.
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
                    "--logging.level.org.springframework.cloud.function=DEBUG",
                    "--spring.main.lazy-initialization=true",
                    "--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
                    "--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
                    "--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
                    "--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
                    "--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
        FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
        FunctionInvocationWrapper function = functionCatalog.lookup("split");
        Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("hello")
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .setHeader("path", "foo/bar/baz")
                .build());
        assertThat(result.getHeaders()).containsKey("keyOut1");
        assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
        assertThat(result.getHeaders()).containsKey("keyOut2");
        assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
    }
}
Here you see properties called input-header-mapping-expression and output-header-mapping-expression preceded by the name of the function (i.e., split) and followed by the name of the message header key you want to set and the value as a SpEL expression.
The first expression (for 'keyOut1') is a literal SpEL expression enclosed in single quotes, effectively setting 'keyOut1' to the value hello1.
The keyOut2 header is set to the value of the existing 'contentType' header.
You can also observe some interesting features in the input header mapping, where we are actually splitting the value of the existing header 'path' and setting the individual values of key1 and key2 to the values of the split elements based on the index.
If for whatever reason the provided expression evaluation fails, the execution of the function will proceed as if nothing ever happened. However, you will see a WARN message in your logs informing you about it.
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
In the event you are dealing with functions that have multiple inputs (next section), you can use an index immediately after input-header-mapping-expression:
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1='hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
Function Arity
There are times when a stream of data needs to be categorized and organized. For example, consider a classic big-data use case of dealing with unorganized data containing, let’s say, ‘orders’ and ‘invoices’, and you want each to go into a separate data store. This is where function arity (functions with multiple inputs and outputs) support comes into play.
Let’s look at an example of such a function.
Full implementation details are available here.
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
Given that Project Reactor is a core dependency of Spring Cloud Function, we are using its Tuple library. Tuples give us a unique advantage by communicating to us both cardinality and type information. Both are extremely important in the context of Spring Cloud Stream. Cardinality lets us know how many input and output bindings need to be created and bound to the corresponding inputs and outputs of a function. Awareness of the type information ensures proper type conversion.
Also, this is where the ‘index’ part of the naming convention for binding names comes into play, since, in this function, the two output binding names are organise-out-0 and organise-out-1.
At the moment, function arity is only supported for reactive functions (Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>>) centered on complex event processing, where evaluation and computation on a confluence of events typically requires a view into a stream of events rather than a single event.
Input Header propagation
In a typical scenario, input Message headers are not propagated to the output, and rightfully so, since the output of a function may be an input to something else requiring its own set of Message headers. However, there are times when such propagation may be necessary, so Spring Cloud Function provides several mechanisms to accomplish this.
First, you can always copy headers manually.
For example, if you have a Function with a signature that takes a Message and returns a Message (i.e., Function<Message, Message>), you can simply and selectively copy headers yourself.
Remember, if your function returns a Message, the framework will not do anything to it other than properly converting its payload.
However, such an approach may prove to be a bit tedious, especially in cases when you simply want to copy all headers.
To assist with cases like this, we provide a simple property that allows you to set a boolean flag on a function where you want input headers to be propagated.
The property is copy-input-headers.
For example, let’s assume you have the following configuration:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
    @Bean
    public Function<String, String> uppercase() {
        return x -> x.toUpperCase();
    }
}
As you know, you can still invoke this function by sending a Message to it (the framework will take care of type conversion and payload extraction).
By simply setting spring.cloud.function.configuration.uppercase.copy-input-headers to true, the following assertion will be true as well:
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");
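The effect of the flag can be pictured with a minimal plain-Java sketch. SimpleMessage and invoke are hypothetical names used only for this illustration and are not framework API; the sketch assumes that, with the flag off, the output starts with no headers from the input.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch only: carrying input headers over to the output on request.
final class HeaderPropagationSketch {

    // Hypothetical stand-in for a message: a payload plus a map of headers.
    static final class SimpleMessage {
        final String payload;
        final Map<String, Object> headers;

        SimpleMessage(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Applies the function to the payload and copies the input headers only when asked to.
    static SimpleMessage invoke(Function<String, String> function, SimpleMessage input,
            boolean copyInputHeaders) {
        Map<String, Object> outputHeaders = new HashMap<>();
        if (copyInputHeaders) {
            outputHeaders.putAll(input.headers);
        }
        return new SimpleMessage(function.apply(input.payload), outputHeaders);
    }

    public static void main(String[] args) {
        SimpleMessage input = new SimpleMessage("bob", Map.of("foo", "bar"));
        SimpleMessage copied = invoke(String::toUpperCase, input, true);
        SimpleMessage plain = invoke(String::toUpperCase, input, false);
        System.out.println(copied.payload + " " + copied.headers.containsKey("foo")); // prints BOB true
        System.out.println(plain.payload + " " + plain.headers.containsKey("foo"));   // prints BOB false
    }
}
```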
Type conversion (Content-Type negotiation)
Content-Type negotiation is one of the core features of Spring Cloud Function, as it allows the framework not only to transform the incoming data to the types declared by the function signature, but also to do the same transformation during function composition, making otherwise un-composable (by type) functions composable.
To better understand the mechanics and the necessity behind content-type negotiation, we take a look at a very simple use case by using the following function as an example:
@Bean
public Function<Person, String> personFunction() {..}
The function shown in the preceding example expects a Person object as an argument and produces a String as an output. If such a function is invoked with the type Person, then all works fine.
But typically, a function plays the role of a handler for incoming data, which most often comes in a raw format such as byte[], JSON String, etc.
In order for the framework to succeed in passing the incoming data as an argument to this function, it has to somehow transform the incoming data to a Person type.
Spring Cloud Function relies on two mechanisms native to Spring to accomplish that.
- MessageConverter - to convert from incoming Message data to a type declared by the function.
- ConversionService - to convert from incoming non-Message data to a type declared by the function.
This means that, depending on the type of the raw data (Message or non-Message), Spring Cloud Function will apply one or the other mechanism.
For most cases when dealing with functions that are invoked as part of some other request (e.g., HTTP, Messaging, etc.), the framework relies on MessageConverters, since such requests are already converted to a Spring Message.
In other words, the framework locates and applies the appropriate MessageConverter.
To accomplish that, the framework needs some instructions from the user.
One of these instructions is already provided by the signature of the function itself (Person type).
Consequently, in theory, that should be (and, in some cases, is) enough.
However, for the majority of use cases, in order to select the appropriate MessageConverter, the framework needs an additional piece of information.
That missing piece is the contentType header.
Such a header usually comes as part of the Message, where it is injected by the corresponding adapter that created the Message in the first place.
For example, an HTTP POST request will have its content-type HTTP header copied to the contentType header of the Message.
For cases when such a header does not exist, the framework falls back to application/json as the default content type.
Content Type versus Argument Type
As mentioned earlier, for the framework to select the appropriate MessageConverter, it requires argument type and, optionally, content type information.
The logic for selecting the appropriate MessageConverter resides with the argument resolvers, which trigger right before the invocation of the user-defined function (which is when the actual argument type is known to the framework).
If the argument type does not match the type of the current payload, the framework delegates to the stack of pre-configured MessageConverters to see if any one of them can convert the payload.
The combination of contentType and argument type is the mechanism by which the framework determines whether a message can be converted to a target type by locating the appropriate MessageConverter.
If no appropriate MessageConverter is found, an exception is thrown, which you can handle by adding a custom MessageConverter (see User-defined Message Converters).
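The selection logic described above can be sketched in plain Java. Everything here is an assumption made for illustration: the Converter interface is a hypothetical, simplified contract (not Spring's MessageConverter), BYTES_TO_STRING is a toy converter, and the exception type is arbitrary. The sketch only shows that both the target type and the content type feed into the decision and that the first converter able to produce a value wins.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Illustrative sketch only: choosing a converter from argument type plus content type.
final class ConverterSelectionSketch {

    // Hypothetical, simplified converter contract.
    interface Converter {
        // Returns the converted value, or null if this converter does not apply.
        Object convert(Object payload, String contentType, Class<?> targetType);
    }

    // Toy converter: byte[] to String, but only for text/plain.
    static final Converter BYTES_TO_STRING = (payload, contentType, targetType) ->
            payload instanceof byte[] && "text/plain".equals(contentType) && targetType == String.class
                    ? new String((byte[]) payload, StandardCharsets.UTF_8)
                    : null;

    static Object resolveArgument(List<Converter> stack, Object payload, String contentType,
            Class<?> targetType) {
        // No conversion is needed when the payload already matches the argument type.
        if (targetType.isInstance(payload)) {
            return payload;
        }
        // Otherwise walk the stack in order; the first converter that yields a value is used.
        for (Converter converter : stack) {
            Object result = converter.convert(payload, contentType, targetType);
            if (result != null) {
                return result;
            }
        }
        throw new IllegalStateException("No converter for " + contentType + " -> " + targetType.getName());
    }

    public static void main(String[] args) {
        List<Converter> stack = List.of(BYTES_TO_STRING);
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(resolveArgument(stack, raw, "text/plain", String.class));  // prints hello
        System.out.println(resolveArgument(stack, "hi", "text/plain", String.class)); // prints hi
    }
}
```

Note that the content type alone never triggers a conversion in this sketch: a String payload with a String target type is passed through regardless of the content type.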
Do not expect a Message to be converted into some other type based only on the contentType.
Remember that the contentType is complementary to the target type.
It is a hint, which a MessageConverter may or may not take into consideration.
|
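The selection logic described in this section can be sketched with a small toy model. This is only an illustration under stated assumptions: ToyConverter, ToyConverterStack, and ToyConverterDemo are hypothetical names and are not Spring's MessageConverter API. The sketch shows the three outcomes discussed above: the payload already matches the argument type, a converter in the stack accepts the combination of content type and argument type, or no converter is found and an exception is thrown.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

// Toy model only: these types are hypothetical and are NOT Spring's MessageConverter API.
interface ToyConverter {
    // Returns the converted value, or empty if this converter cannot handle the request.
    Optional<Object> convert(Object payload, String contentType, Class<?> targetType);
}

final class ToyConverterStack {
    private final List<ToyConverter> converters;

    ToyConverterStack(List<ToyConverter> converters) {
        this.converters = List.copyOf(converters);
    }

    Object convert(Object payload, String contentType, Class<?> targetType) {
        // No conversion is needed when the payload already has the argument type.
        if (targetType.isInstance(payload)) {
            return payload;
        }
        // Otherwise the first converter that accepts the combination wins.
        for (ToyConverter converter : converters) {
            Optional<Object> result = converter.convert(payload, contentType, targetType);
            if (result.isPresent()) {
                return result.get();
            }
        }
        throw new IllegalStateException(
                "No converter for contentType=" + contentType + ", targetType=" + targetType.getName());
    }
}

final class ToyConverterDemo {
    // Converts byte[] to String, but only when the content type is text/plain.
    static final ToyConverter TEXT = (payload, contentType, targetType) -> {
        if ("text/plain".equals(contentType) && targetType == String.class && payload instanceof byte[]) {
            return Optional.of(new String((byte[]) payload, StandardCharsets.UTF_8));
        }
        return Optional.empty();
    };

    public static void main(String[] args) {
        ToyConverterStack stack = new ToyConverterStack(List.of(TEXT));
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(stack.convert(payload, "text/plain", String.class));
    }
}
```

Note that the toy converter checks both the content type and the target type, which mirrors the point that the content type alone does not decide the conversion.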
Message Converters
MessageConverters
define two methods:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
It is important to understand the contract of these methods and their usage, specifically in the context of Spring Cloud Stream.
The fromMessage
method converts an incoming Message
to an argument type.
The payload of the Message
could be any type, and it is up to the actual implementation of the MessageConverter
to support multiple types.
Provided MessageConverters
As mentioned earlier, the framework already provides a stack of MessageConverters
to handle most common use cases.
The following list describes the provided MessageConverters
, in order of precedence (the first MessageConverter
that works is used):
- JsonMessageConverter: Supports conversion of the payload of the Message to/from a POJO for cases when contentType is application/json, using the Jackson (DEFAULT) or Gson libraries. This message converter is also aware of the type parameter (e.g., application/json;type=foo.bar.Person). This is useful for cases where types may not be known at the time the function is developed, so the function signature may look like Function<?, ?>, Function, or Function<Object, Object>. In other words, for type conversion we typically derive the type from the function signature. Having a MIME-type parameter allows you to communicate the type in a more dynamic way.
- ByteArrayMessageConverter: Supports conversion of the payload of the Message from byte[] to byte[] for cases when contentType is application/octet-stream. It is essentially a pass-through and exists primarily for backward compatibility.
- StringMessageConverter: Supports conversion of any type to a String when contentType is text/plain.
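To make the type parameter mentioned for JsonMessageConverter more concrete, here is a minimal sketch of reading that parameter from a content type value. ContentTypeHint and typeParameter are hypothetical names introduced for illustration. This is not how the framework itself parses the header, and the sketch assumes unquoted parameter values.

```java
import java.util.Optional;

// Hypothetical helper, not part of Spring Cloud Function: extracts the `type`
// parameter from a content type such as "application/json;type=foo.bar.Person".
final class ContentTypeHint {

    static Optional<String> typeParameter(String contentType) {
        String[] parts = contentType.split(";");
        // parts[0] is the MIME type itself; the remaining parts are name=value parameters.
        for (int i = 1; i < parts.length; i++) {
            String[] nameValue = parts[i].trim().split("=", 2);
            if (nameValue.length == 2 && nameValue[0].trim().equalsIgnoreCase("type")) {
                return Optional.of(nameValue[1].trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // prints "foo.bar.Person"
        System.out.println(typeParameter("application/json;type=foo.bar.Person").orElse("<none>"));
    }
}
```

A caller could then resolve the returned class name to a target type when the function signature itself carries no useful type information.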
When no appropriate converter is found, the framework throws an exception. When that happens, you should check your code and configuration and ensure you did not miss anything (that is, ensure that you provided a contentType
by using a binding or a header).
However, most likely, you found some uncommon case (such as a custom contentType perhaps) and the current stack of provided MessageConverters does not know how to convert it.
If that is the case, you can add a custom MessageConverter. See User-defined Message Converters.
User-defined MessageConverters
Spring Cloud Function exposes a mechanism to define and register additional MessageConverters
.
To use it, implement org.springframework.messaging.converter.MessageConverter and configure it as a @Bean.
It is then added to the existing stack of MessageConverters.
It is important to understand that custom MessageConverter implementations are added to the head of the existing stack.
Consequently, custom MessageConverter implementations take precedence over the existing ones, which lets you override as well as add to the existing converters.
|
The following example shows how to create a message converter bean to support a new content type called application/bar
:
@SpringBootApplication
public static class SinkApplication {

    ...

    // Registering the converter as a bean adds it to the framework's converter stack.
    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        // This converter handles the application/bar content type.
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        // Only the user-defined Bar type is supported.
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        // Pass a Bar payload through unchanged; otherwise build one from the raw bytes.
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}
Note on JSON options
Spring Cloud Function supports both Jackson and Gson for handling JSON.
For your convenience, both are abstracted behind org.springframework.cloud.function.json.JsonMapper, which is aware of the two mechanisms and uses the one you select or, failing that, the one chosen by the default rules.
The default rules are as follows:
- Whichever library is on the classpath is the mechanism that is used. So if you have com.fasterxml.jackson.* on the classpath, Jackson is used, and if you have com.google.code.gson, Gson is used.
- If you have both, then Gson is the default, or you can set the spring.cloud.function.preferred-json-mapper property to one of two values: gson or jackson.
That said, the type conversion is usually transparent to the developer.
However, given that org.springframework.cloud.function.json.JsonMapper
is also registered as a bean you can easily inject it into your code if needed.
Kotlin Lambda support
We also provide support for Kotlin lambdas (since v2.0). Consider the following:
@Bean
open fun kotlinSupplier(): () -> String {
    return { "Hello from Kotlin" }
}

@Bean
open fun kotlinFunction(): (String) -> String {
    return { it.uppercase() }
}

@Bean
open fun kotlinConsumer(): (String) -> Unit {
    return { println(it) }
}
The above represents Kotlin lambdas configured as Spring beans. The signature of each maps to the Java equivalent of Supplier, Function, or Consumer, so the framework recognizes and supports them.
While the mechanics of Kotlin-to-Java mapping are outside the scope of this documentation, it is important to understand that the same rules for signature transformation outlined in the "Java 8 function support" section apply here as well.
To enable Kotlin support, all you need to do is add the Kotlin SDK libraries to the classpath, which triggers the appropriate autoconfiguration and supporting classes.
Function Component Scan
Spring Cloud Function will scan for implementations of Function
, Consumer
and Supplier
in a package called functions
if it exists.
Using this feature you can write functions that have no dependencies on Spring - not even the @Component
annotation is needed.
If you want to use a different package, you can set spring.cloud.function.scan.packages
.
You can also use spring.cloud.function.scan.enabled=false
to switch off the scan completely.
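As a rough illustration of a function with no Spring dependencies, consider the following sketch. The class name Greeter is hypothetical. In a real project the file would also begin with a package functions; declaration (or whichever package you configured) so that the scan can find it.

```java
import java.util.function.Function;

// In a real project this file would start with `package functions;` so that the
// scan described above can find it. The class name is hypothetical.
class Greeter implements Function<String, String> {

    @Override
    public String apply(String name) {
        // Plain Java: no Spring imports or annotations are needed here.
        return "Hello " + name;
    }
}
```

Because the class depends only on java.util.function, it can be unit tested without starting an application context.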
Data Masking
A typical application comes with several levels of logging.
Certain cloud/serverless platforms may include sensitive data in the packets that are being logged for everyone to see.
While it is the responsibility of individual developers to inspect the data that is being logged, some logging comes from the framework itself. For that reason, as of version 4.1, we have introduced JsonMasker, initially to help with masking sensitive data in AWS Lambda payloads.
However, the JsonMasker
is generic and is available to any module.
At the moment it will only work with structured data such as JSON.
All you need is to specify the keys you want to mask and it will take care of the rest.
Keys should be specified in the file META-INF/mask.keys
.
The format of the file is very simple: you can delimit several keys by commas, new lines, or both.
Here is an example of the contents of such a file:
eventSourceARN
asdf1, SS
Here you see three keys defined.
Once such a file exists, the JsonMasker
will use it to mask values of the keys specified.
The following sample code shows the usage:
private static final JsonMasker masker = JsonMasker.INSTANCE();
// . . .
logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));
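The key-file format described above (keys delimited by commas, new lines, or both) can be sketched as follows. MaskKeysFormat and parseKeys are hypothetical names. This is not the framework's JsonMasker implementation, and trimming whitespace around keys is an assumption of this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not the framework's JsonMasker: it only illustrates the
// key-file format (keys delimited by commas, new lines, or both).
final class MaskKeysFormat {

    static Set<String> parseKeys(String fileContents) {
        Set<String> keys = new LinkedHashSet<>();
        // Split on any run of commas and line breaks.
        for (String token : fileContents.split("[,\\r\\n]+")) {
            String key = token.trim(); // assumption: surrounding whitespace is not part of a key
            if (!key.isEmpty()) {
                keys.add(key);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Same contents as the sample file shown earlier; prints [eventSourceARN, asdf1, SS]
        System.out.println(parseKeys("eventSourceARN\nasdf1, SS"));
    }
}
```

Applied to the sample file contents, the sketch yields the three keys mentioned in the text.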