此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.2.4! |
RSocket 系列
本节描述了 Spring 框架对 RSocket 协议的支持。
概述
RSocket 是一种用于通过 TCP 进行多路复用、双工通信的应用程序协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:
-
Request-Response
— 发送一条消息,然后接收一条消息。 -
Request-Stream
— 发送一条消息并接收回消息流。 -
Channel
— 双向发送消息流。 -
Fire-and-Forget
— 发送单向消息。
建立初始连接后,“client” 与 “server” 的区别将丢失,因为 双方都变得对称,并且每一方都可以发起上述交互之一。 这就是为什么在协议中将参与方称为 “requester” 和 “responder” 的原因 而上述交互称为 “请求流” 或简称为 “请求”。
以下是 RSocket 协议的主要功能和优势:
-
跨网络边界的 Reactive Streams 语义 — 对于流式请求,例如
Request-Stream
和Channel
、背压信号 在请求者和响应者之间移动,允许请求者在 源,从而减少对网络层拥塞控制的依赖,以及需求 用于网络级别或任何级别的缓冲。 -
请求限制 — 此功能被命名为 “租赁” ,以
LEASE
frame 那个 可以从每一端发送,以限制另一端允许的请求总数 在给定的时间内。租约会定期续订。 -
会话恢复 — 这是为连接丢失而设计的,需要一些状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。
-
大型消息的分片和重新汇编。
-
Keepalive (检测信号)。
RSocket 具有多种语言的实现。Java 库构建在 Project Reactor 之上, 和 Reactor Netty 用于运输。这意味着 应用程序中来自 Reactive Streams Publishers 的信号以透明方式传播 通过 RSocket 跨网络。
协议
RSocket 的好处之一是它在 wire 上具有明确定义的行为,并且 易于阅读的规范以及一些协议扩展。因此它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API 的 API 中。本节提供了简洁的概述,以建立一些上下文。
连接
最初,客户端通过一些低级流传输(如
作为 TCP 或 WebSocket 发送,并发送一个SETUP
frame 添加到服务器中,为
连接。
服务器可能会拒绝SETUP
frame,但通常在发送之后(对于客户端)
和 received(对于服务器),双方都可以开始发出请求,除非SETUP
指示使用租赁语义来限制请求数,在这种情况下
双方都必须等待LEASE
frame 以允许发出请求。
发出请求
建立连接后,双方都可以通过
框架REQUEST_RESPONSE
,REQUEST_STREAM
,REQUEST_CHANNEL
或REQUEST_FNF
.每个
这些帧将一条消息从请求者传送到响应者。
然后,响应方可以返回PAYLOAD
帧,并且在这种情况下
之REQUEST_CHANNEL
请求者还可以发送PAYLOAD
具有更多请求的帧
消息。
当请求涉及消息流(如Request-Stream
和Channel
,
响应方必须遵循来自请求方的需求信号。Demand 表示为
消息数。初始需求在REQUEST_STREAM
和REQUEST_CHANNEL
框架。后续需求通过REQUEST_N
框架。
每一方还可以通过METADATA_PUSH
frame 的
与任何单个请求有关,但与整个连接有关。
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由、
证券令牌等数据和元数据的格式可以不同。每个 Mime 类型
在SETUP
frame 并应用于给定连接上的所有请求。
虽然所有消息都可以包含元数据,但通常元数据(如路由)是按请求进行的
因此仅包含在请求的第一条消息中,即与其中一个帧一起REQUEST_RESPONSE
,REQUEST_STREAM
,REQUEST_CHANNEL
或REQUEST_FNF
.
协议扩展定义用于应用程序的常见元数据格式:
Java 实现
RSocket 的 Java 实现构建在 Project Reactor 之上。TCP 和 WebSocket 的传输方式是
基于 Reactor Netty 构建。作为反应式流
库,Reactor 简化了实现协议的工作。对于应用程序,它是
天生的使用Flux
和Mono
使用声明式运算符和透明 back
压力支持。
RSocket Java 中的 API 有意做到最小和基本。它侧重于协议 功能,并将应用程序编程模型(例如 RPC codegen 与其他模型)保留为 更高层次,独立关注。
主合约 io.rsocket.RSocket 使用Mono
表示
单条消息 /Flux
消息流,以及io.rsocket.Payload
实际的
message 中访问数据和元数据作为字节缓冲区。这RSocket
使用 Contract
对称。对于请求,应用程序将获得一个RSocket
执行
请求与。为了响应,应用程序实现了RSocket
处理请求。
这并不是一个详尽的介绍。在大多数情况下,Spring 应用程序 不必直接使用其 API。但是,观察或试验可能很重要 使用 RSocket 独立于 Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。
弹簧支撑
这spring-messaging
module 包含以下内容:
-
RSocketRequester — 用于发出请求的 Fluent API 通过
io.rsocket.RSocket
使用数据和元数据编码/解码。 -
带注释的响应者 —
@MessageMapping
和@RSocketExchange
用于响应的带注释的处理程序方法。 -
RSocket 接口 — RSocket 服务声明 作为 Java 接口,使用
@RSocketExchange
方法,用作请求者或响应者。
这spring-web
module 包含Encoder
和Decoder
Jackson 等实现
CBOR/JSON 和 Protobuf 的 RSocket 应用程序可能需要。它还包含PathPatternParser
可以插入以进行高效的路由匹配。
Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括
在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有 Client
支持和自动配置RSocketRequester.Builder
和RSocketStrategies
.
有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分。
Spring Security 5.2 提供了 RSocket 支持。
Spring 集成 5.2 提供了入站和出站网关来与 RSocket 交互 客户端和服务器。有关更多详细信息,请参见 Spring 集成参考手册。
Spring Cloud 网关支持 RSocket 连接。
RSocketRequester 请求者
RSocketRequester
提供 Fluent API 来执行 RSocket 请求、接受和
返回 data 和 metadata 的对象,而不是低级数据缓冲区。可以使用
对称地,从 Client 端发出请求,以及从 Server 发出请求。
客户端请求者
要获取RSocketRequester
在客户端是连接到一个服务器,它涉及
发送 RSocketSETUP
Frame 替换为 Connection Settings。RSocketRequester
提供
构建器,它有助于准备io.rsocket.core.RSocketConnector
包括连接
的设置SETUP
框架。
这是使用默认设置进行连接的最基本方法:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上不会立即连接。发出请求时,共享连接为 透明地建立并使用。
连接设置
RSocketRequester.Builder
提供了以下内容来自定义初始SETUP
框架:
-
dataMimeType(MimeType)
— 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的 MIME 类型。 -
setupData(Object)
- 要包含在SETUP
. -
setupRoute(String, Object…)
— 路由到元数据中以包含在SETUP
. -
setupMetadata(Object, MimeType)
— 要包含在SETUP
.
对于 data,默认 MIME 类型派生自第一个配置的Decoder
.为
metadata,默认的 MIME 类型是 Composite metadata,它允许多个
每个请求的元数据值和 MIME 类型对。通常,两者都不需要更改。
Data and metadata 中的SETUP
frame 是可选的。在服务器端,可以使用@ConnectMapping方法处理
connection 和SETUP
框架。元数据可用于连接
级别安全性。
策略
RSocketRequester.Builder
接受RSocketStrategies
以配置请求者。
您需要使用它来提供编码器和解码器,用于数据的 (de) 序列化和
metadata 值。默认情况下,只有spring-core
为String
,byte[]
和ByteBuffer
已注册。添加spring-web
提供对更多
可以按如下方式注册:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
专为重复使用而设计。在某些情况下,例如客户端和服务器在
相同的应用程序,最好在 Spring 配置中声明它。
客户端响应方
RSocketRequester.Builder
可用于配置对来自
服务器。
您可以使用带注释的处理程序进行基于相同的客户端响应 在服务器上使用但以编程方式注册的基础结构,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | 用PathPatternRouteMatcher 如果spring-web 存在,以实现高效
路由匹配。 |
2 | 从类创建响应者@MessageMapping 和/或@ConnectMapping 方法。 |
3 | 注册响应方。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | 用PathPatternRouteMatcher 如果spring-web 存在,以实现高效
路由匹配。 |
2 | 从类创建响应者@MessageMapping 和/或@ConnectMapping 方法。 |
3 | 注册响应方。 |
请注意,以上只是专为 client 的编程注册而设计的快捷方式
反应。对于客户端响应者处于 Spring 配置中的替代场景,
您仍然可以声明RSocketMessageHandler
作为 Spring Bean,然后按如下方式应用:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
For the above you may also need to use setHandlerPredicate
in RSocketMessageHandler
to
switch to a different strategy for detecting client responders, e.g. based on a custom
annotation such as @RSocketClientResponder
vs the default @Controller
. This
is necessary in scenarios with client and server, or multiple clients in the same
application.
See also Annotated Responders, for more on the programming model.
Advanced
RSocketRequesterBuilder
provides a callback to expose the underlying
io.rsocket.core.RSocketConnector
for further configuration options for keepalive
intervals, session resumption, interceptors, and more. You can configure options
at that level as follows:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
Server Requester
To make requests from a server to connected clients is a matter of obtaining the
requester for the connected client from the server.
In Annotated Responders, @ConnectMapping
and @MessageMapping
methods support an
RSocketRequester
argument. Use it to access the requester for the connection. Keep in
mind that @ConnectMapping
methods are essentially handlers of the SETUP
frame which
must be handled before requests can begin. Therefore, requests at the very start must be
decoupled from handling. For example:
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1
Start the request asynchronously, independent from handling.
2
Perform handling and return completion Mono<Void>
.
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1
Start the request asynchronously, independent from handling.
2
Perform handling in the suspending function.
Requests
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1
Specify a route to include in the metadata of the request message.
2
Provide data for the request message.
3
Declare the expected response.
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1
Specify a route to include in the metadata of the request message.
2
Provide data for the request message.
3
Declare the expected response.
The interaction type is determined implicitly from the cardinality of the input and
output. The above example is a Request-Stream
because one value is sent and a stream
of values is received. For the most part you don’t need to think about this as long as the
choice of input and output matches an RSocket interaction type and the types of input and
output expected by the responder. The only example of an invalid combination is many-to-one.
The data(Object)
method also accepts any Reactive Streams Publisher
, including
Flux
and Mono
, as well as any other producer of value(s) that is registered in the
ReactiveAdapterRegistry
. For a multi-value Publisher
such as Flux
which produces the
same types of values, consider using one of the overloaded data
methods to avoid having
type checks and Encoder
lookup on every element:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
The data(Object)
step is optional. Skip it for requests that don’t send data:
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
Extra metadata values can be added if using
composite metadata (the default) and if the
values are supported by a registered Encoder
. For example:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
For Fire-and-Forget
use the send()
method that returns Mono<Void>
. Note that the Mono
indicates only that the message was successfully sent, and not that it was handled.
For Metadata-Push
use the sendMetadata()
method with a Mono<Void>
return value.
Annotated Responders
RSocket responders can be implemented as @MessageMapping
and @ConnectMapping
methods.
@MessageMapping
methods handle individual requests while @ConnectMapping
methods handle
connection-level events (setup and metadata push). Annotated responders are supported
symmetrically, for responding from the server side and for responding from the client side.
Server Responders
To use annotated responders on the server side, add RSocketMessageHandler
to your Spring
configuration to detect @Controller
beans with @MessageMapping
and @ConnectMapping
methods:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
Then start an RSocket server through the Java RSocket API and plug the
RSocketMessageHandler
for the responder as follows:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
supports
composite and
routing metadata by default. You can set its
MetadataExtractor if you need to switch to a
different mime type or register additional metadata mime types.
You’ll need to set the Encoder
and Decoder
instances required for metadata and data
formats to support. You’ll likely need the spring-web
module for codec implementations.
By default SimpleRouteMatcher
is used for matching routes via AntPathMatcher
.
We recommend plugging in the PathPatternRouteMatcher
from spring-web
for
efficient route matching. RSocket routes can be hierarchical but are not URL paths.
Both route matchers are configured to use "." as separator by default and there is no URL
decoding as with HTTP URLs.
RSocketMessageHandler
can be configured via RSocketStrategies
which may be useful if
you need to share configuration between a client and a server in the same process:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
Client Responders
Annotated responders on the client side need to be configured in the
RSocketRequester.Builder
. For details, see
Client Responders.
@MessageMapping
Once server or
client responder configuration is in place,
@MessageMapping
methods can be used as follows:
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
The above @MessageMapping
method responds to a Request-Stream interaction having the
route "locate.radars.within". It supports a flexible method signature with the option to
use the following method arguments:
Method Argument
Description
@Payload
The payload of the request. This can be a concrete value of asynchronous types like
Mono
or Flux
.
Note: Use of the annotation is optional. A method argument that is not a simple type
and is not any of the other supported arguments, is assumed to be the expected payload.
RSocketRequester
Requester for making requests to the remote end.
@DestinationVariable
Value extracted from the route based on variables in the mapping pattern, e.g.
@MessageMapping("find.radar.{id}")
.
@Header
Metadata value registered for extraction as described in MetadataExtractor.
@Headers Map<String, Object>
All metadata values registered for extraction as described in MetadataExtractor.
The return value is expected to be one or more Objects to be serialized as response
payloads. That can be asynchronous types like Mono
or Flux
, a concrete value, or
either void
or a no-value asynchronous type such as Mono<Void>
.
The RSocket interaction type that an @MessageMapping
method supports is determined from
the cardinality of the input (i.e. @Payload
argument) and of the output, where
cardinality means the following:
Cardinality
Description
1
Either an explicit value, or a single-value asynchronous type such as Mono<T>
.
Many
A multi-value asynchronous type such as Flux<T>
.
0
For input this means the method does not have an @Payload
argument.
For output this is void
or a no-value asynchronous type such as Mono<Void>
.
The table below shows all input and output cardinality combinations and the corresponding
interaction type(s):
Input Cardinality
Output Cardinality
Interaction Types
0, 1
0
Fire-and-Forget, Request-Response
0, 1
1
Request-Response
0, 1
Many
Request-Stream
Many
0, 1, Many
Request-Channel
@RSocketExchange
As an alternative to @MessageMapping
, you can also handle requests with
@RSocketExchange
methods. Such methods are declared on an
RSocket Interface and can be used as a requester via
RSocketServiceProxyFactory
or implemented by a responder.
For example, to handle requests as a responder:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
There some differences between @RSocketExhange
and @MessageMapping
since the
former needs to remain suitable for requester and responder use. For example, while
@MessageMapping
can be declared to handle any number of routes and each route can
be a pattern, @RSocketExchange
must be declared with a single, concrete route. There are
also small differences in the supported method parameters related to metadata, see
@MessageMapping and
RSocket Interface for a list of supported parameters.
@RSocketExchange
can be used at the type level to specify a common prefix for all routes
for a given RSocket service interface.
@ConnectMapping
@ConnectMapping
handles the SETUP
frame at the start of an RSocket connection, and
any subsequent metadata push notifications through the METADATA_PUSH
frame, i.e.
metadataPush(Payload)
in io.rsocket.RSocket
.
@ConnectMapping
methods support the same arguments as
@MessageMapping but based on metadata and data from the SETUP
and
METADATA_PUSH
frames. @ConnectMapping
can have a pattern to narrow handling to
specific connections that have a route in the metadata, or if no patterns are declared
then all connections match.
@ConnectMapping
methods cannot return data and must be declared with void
or
Mono<Void>
as the return value. If handling returns an error for a new
connection then the connection is rejected. Handling must not be held up to make
requests to the RSocketRequester
for the connection. See
Server Requester for details.
MetadataExtractor
Responders must interpret metadata.
Composite metadata allows independently
formatted metadata values (e.g. for routing, security, tracing) each with its own mime
type. Applications need a way to configure metadata mime types to support, and a way
to access extracted values.
MetadataExtractor
is a contract to take serialized metadata and return decoded
name-value pairs that can then be accessed like headers by name, for example via @Header
in annotated handler methods.
DefaultMetadataExtractor
can be given Decoder
instances to decode metadata. Out of
the box it has built-in support for
"message/x.rsocket.routing.v0" which it decodes to
String
and saves under the "route" key. For any other mime type you’ll need to provide
a Decoder
and register the mime type as follows:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
Composite metadata works well to combine independent metadata values. However the
requester might not support composite metadata, or may choose not to use it. For this,
DefaultMetadataExtractor
may needs custom logic to map the decoded value to the output
map. Here is an example where JSON is used for metadata:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
When configuring MetadataExtractor
through RSocketStrategies
, you can let
RSocketStrategies.Builder
create the extractor with the configured decoders, and
simply use a callback to customize registrations as follows:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket Interface
The Spring Framework lets you define an RSocket service as a Java interface with
@RSocketExchange
methods. You can pass such an interface to RSocketServiceProxyFactory
to create a proxy which performs requests through an
RSocketRequester. You can also implement the
interface as a responder that handles requests.
Start by creating the interface with @RSocketExchange
methods:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
Now you can create a proxy that performs requests when methods are called:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
You can also implement the interface to handle requests as a responder.
See Annotated Responders.
Method Parameters
Annotated, RSocket exchange methods support flexible method signatures with the following
method parameters:
Method argument
Description
@DestinationVariable
Add a route variable to pass to RSocketRequester
along with the route from the
@RSocketExchange
annotation in order to expand template placeholders in the route.
This variable can be a String or any Object, which is then formatted via toString()
.
@Payload
Set the input payload(s) for the request. This can be a concrete value, or any producer
of values that can be adapted to a Reactive Streams Publisher
via
ReactiveAdapterRegistry
Object
, if followed by MimeType
The value for a metadata entry in the input payload. This can be any Object
as long
as the next argument is the metadata entry MimeType
. The value can be a concrete
value or any producer of a single value that can be adapted to a Reactive Streams
Publisher
via ReactiveAdapterRegistry
.
MimeType
The MimeType
for a metadata entry. The preceding method argument is expected to be
the metadata value.
Return Values
Annotated, RSocket exchange methods support return values that are concrete value(s), or
any producer of value(s) that can be adapted to a Reactive Streams Publisher
via
ReactiveAdapterRegistry
.
By default, the behavior of RSocket service methods with synchronous (blocking) method
signature depends on response timeout settings of the underlying RSocket ClientTransport
as well as RSocket keep-alive settings. RSocketServiceProxyFactory.Builder
does expose a
blockTimeout
option that also lets you configure the maximum time to block for a response,
but we recommend configuring timeout values at the RSocket level for more control.