This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Framework 6.2.4.

RSocket

This section describes Spring Framework’s support for the RSocket protocol.

Overview

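RSocket is an application protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte stream transports, using one of the following interaction models:

  • Request-Response: send one message and receive one back.

  • Request-Stream: send one message and receive a stream of messages back.

  • Channel: send streams of messages in both directions.

  • Fire-and-Forget: send a one-way message.

Once the initial connection is made, the "client" vs "server" distinction is lost as both sides become symmetrical and each side can initiate one of the above interactions. This is why the protocol calls the participating sides "requester" and "responder", while the above interactions are called "request streams" or simply "requests".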

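These are the key features and benefits of the RSocket protocol:

  • Reactive Streams semantics across network boundaries: for streaming requests such as Request-Stream and Channel, back pressure signals travel between requester and responder, allowing a requester to slow down a responder at the source. This reduces reliance on network layer congestion control and the need for buffering at the network level or at any other level.

  • Request throttling: this feature is named "Leasing" after the LEASE frame that can be sent from each end to limit the total number of requests allowed by the other end for a given time. Leases are renewed periodically.

  • Session resumption: this is designed for loss of connectivity and requires some state to be maintained. The state management is transparent for applications, and works well in combination with back pressure, which can stop a producer when possible and reduce the amount of state required.

  • Fragmentation and re-assembly of large messages.

  • Keepalive (heartbeats).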
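The leasing idea above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not how RSocket Java implements leases: LeaseBudget is a hypothetical class that models only the two values a lease grants (a number of requests and a time-to-live), and time is passed in explicitly so the behavior is easy to follow.

```java
// Illustrative only: LeaseBudget is a hypothetical class, not part of RSocket Java or Spring.
final class LeaseBudget {
    private final int allowedRequests;   // number of requests granted by the lease
    private final long expiresAtMillis;  // grant time plus the lease time-to-live
    private int used;

    LeaseBudget(int allowedRequests, long grantedAtMillis, long timeToLiveMillis) {
        this.allowedRequests = allowedRequests;
        this.expiresAtMillis = grantedAtMillis + timeToLiveMillis;
    }

    /** Consumes one permit and returns true if the lease is still valid at the given time. */
    boolean tryAcquire(long nowMillis) {
        if (nowMillis >= expiresAtMillis || used >= allowedRequests) {
            return false;
        }
        used++;
        return true;
    }

    public static void main(String[] args) {
        LeaseBudget lease = new LeaseBudget(2, 0L, 1_000L);
        System.out.println(lease.tryAcquire(100L)); // first request fits the budget
        System.out.println(lease.tryAcquire(200L)); // second request fits the budget
        System.out.println(lease.tryAcquire(300L)); // budget of 2 is used up
    }
}
```

A requester holding such a budget would wait for a renewed lease once tryAcquire returns false, which matches the periodic renewal mentioned above.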

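RSocket has implementations in multiple languages. The Java library is built on Project Reactor, and Reactor Netty for the transport. That means signals from Reactive Streams Publishers in your application propagate transparently through RSocket across the network.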

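The Protocol

One of the benefits of RSocket is that it has well-defined behavior on the wire and an easy-to-read specification along with some protocol extensions. It is therefore a good idea to read the specification, independent of language implementations and higher-level framework APIs. This section provides a succinct overview to establish some context.

Initially a client connects to a server via some low-level streaming transport such as TCP or WebSocket and sends a SETUP frame to the server to set parameters for the connection.

The server may reject the SETUP frame, but generally after it is sent (for the client) and received (for the server), both sides can begin to make requests, unless SETUP indicates use of leasing semantics to limit the number of requests, in which case both sides must wait for a LEASE frame from the other end to permit making requests.

Once a connection is established, both sides may initiate a request through one of the frames REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL, or REQUEST_FNF. Each of those frames carries one message from the requester to the responder.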
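To make the request frames above more concrete, here is a minimal sketch of the frame header layout described in the RSocket protocol specification: a 31-bit stream ID, a 6-bit frame type, and 10 bits of flags. FrameHeaderSketch is a hypothetical helper, not part of RSocket Java, and the frame type and flag values are quoted from the specification as best understood here, so check them against the specification before relying on them.

```java
import java.nio.ByteBuffer;

// Illustrative only: FrameHeaderSketch is a hypothetical helper, not an RSocket Java class.
final class FrameHeaderSketch {
    static final int REQUEST_RESPONSE = 0x04;
    static final int REQUEST_FNF = 0x05;
    static final int REQUEST_STREAM = 0x06;
    static final int REQUEST_CHANNEL = 0x07;
    static final int FLAG_METADATA = 0x100; // "M" flag: the frame carries metadata

    /** Encodes the 6-byte header: 31-bit stream id, 6-bit frame type, 10-bit flags. */
    static byte[] encode(int streamId, int frameType, int flags) {
        ByteBuffer buffer = ByteBuffer.allocate(6); // ByteBuffer is big-endian by default
        buffer.putInt(streamId & 0x7FFFFFFF);
        buffer.putShort((short) ((frameType << 10) | (flags & 0x3FF)));
        return buffer.array();
    }

    static int streamId(byte[] header) {
        return ByteBuffer.wrap(header).getInt(0) & 0x7FFFFFFF;
    }

    static int frameType(byte[] header) {
        return (ByteBuffer.wrap(header).getShort(4) & 0xFFFF) >>> 10;
    }

    static boolean hasMetadata(byte[] header) {
        return (ByteBuffer.wrap(header).getShort(4) & FLAG_METADATA) != 0;
    }

    public static void main(String[] args) {
        byte[] header = encode(1, REQUEST_STREAM, FLAG_METADATA);
        // Prints the decoded stream id, frame type, and metadata flag.
        System.out.println(streamId(header) + " " + frameType(header) + " " + hasMetadata(header));
    }
}
```

Because every frame carries a stream ID, many requests can be multiplexed over a single connection.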

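The responder may then return PAYLOAD frames with response messages, and in the case of REQUEST_CHANNEL the requester may also send PAYLOAD frames with more request messages.

When a request involves a stream of messages, such as Request-Stream and Channel, the responder must respect demand signals from the requester. Demand is expressed as a number of messages. Initial demand is specified in REQUEST_STREAM and REQUEST_CHANNEL frames. Subsequent demand is signaled via REQUEST_N frames.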
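The demand signals described above follow Reactive Streams semantics, which the JDK also exposes through java.util.concurrent.Flow. The sketch below uses only that JDK API, with no RSocket involved. CountingResponder and run are hypothetical names; request(n) in onSubscribe stands in for the initial demand, and each later request(1) stands in for a REQUEST_N frame.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative only: hypothetical classes built on the JDK Flow API, not on RSocket Java.
final class DemandSketch {

    /** A responder-like publisher that emits 1..count, but never more than was demanded. */
    static final class CountingResponder implements Flow.Publisher<Integer> {
        private final int count;

        CountingResponder(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop keeps emitting
                    }
                    emitting = true;
                    while (demand > 0 && !done && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** A requester-like subscriber: initial demand first, then one more item per item received. */
    static List<String> run(int total, int initialDemand) {
        List<String> log = new ArrayList<>();
        new CountingResponder(total).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + initialDemand + ")"); // like the initial demand in REQUEST_STREAM
                s.request(initialDemand);
            }

            @Override
            public void onNext(Integer item) {
                log.add("item " + item);
                log.add("request(1)"); // like a REQUEST_N frame
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3, 2).forEach(System.out::println);
    }
}
```

Calling DemandSketch.run(3, 2) logs request(2), item 1, request(1), item 2, request(1), item 3, request(1), complete. The responder only emits while there is outstanding demand.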

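Each side may also send metadata notifications, via the METADATA_PUSH frame, that do not pertain to any individual request but rather to the connection as a whole.

RSocket messages contain data and metadata. Metadata can be used to send a route, a security token, and so on. Data and metadata can be formatted differently. Mime types for each are declared in the SETUP frame and apply to all requests on a given connection.

While all messages can have metadata, typically metadata such as a route is per-request and therefore only included in the first message on a request, i.e. with one of the frames REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL, or REQUEST_FNF.

Protocol extensions define common metadata formats for use in applications, such as composite metadata (multiple, independently formatted metadata entries) and routing (the route for a request).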

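Java Implementation

The Java implementation for RSocket is built on Project Reactor. The transports for TCP and WebSocket are built on Reactor Netty. As a Reactive Streams library, Reactor simplifies the job of implementing the protocol. For applications it is a natural fit to use Flux and Mono with declarative operators and transparent back pressure support.

The API in RSocket Java is intentionally minimal and basic. It focuses on protocol features and leaves the application programming model (e.g. RPC codegen vs other) as a higher-level, independent concern.

The main contract io.rsocket.RSocket models the four request interaction types with Mono representing a promise for a single message, Flux a stream of messages, and io.rsocket.Payload the actual message with access to data and metadata as byte buffers. The RSocket contract is used symmetrically. For requesting, the application is given an RSocket to perform requests with. For responding, the application implements RSocket to handle requests.

This is not meant to be a thorough introduction. For the most part, Spring applications will not have to use its API directly. However, it may be important to see or experiment with RSocket independent of Spring. The RSocket Java repository contains a number of sample apps that demonstrate its API and protocol features.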

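Spring Support

The spring-messaging module contains the following:

  • RSocketRequester: a fluent API to make requests through an io.rsocket.RSocket with data and metadata encoding and decoding.

  • Annotated Responders: @MessageMapping and @RSocketExchange annotated handler methods for responding.

  • RSocket Interface: an RSocket service declaration as a Java interface with @RSocketExchange methods, for use as requester or responder.

The spring-web module contains Encoder and Decoder implementations such as Jackson CBOR/JSON and Protobuf that RSocket applications will likely need. It also contains the PathPatternParser that can be plugged in for efficient route matching.

Spring Boot 2.2 supports standing up an RSocket server over TCP or WebSocket, including the option to expose RSocket over WebSocket in a WebFlux server. There is also client support and auto-configuration for an RSocketRequester.Builder and RSocketStrategies. See the RSocket section in the Spring Boot reference for more details.

Spring Security 5.2 provides RSocket support.

Spring Integration 5.2 provides inbound and outbound gateways to interact with RSocket clients and servers. See the Spring Integration Reference Manual for more details.

Spring Cloud Gateway supports RSocket connections.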

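RSocketRequester

RSocketRequester provides a fluent API to perform RSocket requests, accepting and returning objects for data and metadata instead of low-level data buffers. It can be used symmetrically, to make requests from clients and to make requests from servers.

Client Requester

Obtaining an RSocketRequester on the client side means connecting to a server, which involves sending an RSocket SETUP frame with connection settings. RSocketRequester provides a builder that helps to prepare an io.rsocket.core.RSocketConnector, including connection settings for the SETUP frame.

This is the most basic way to connect with default settings: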

// Connect over TCP
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

// Or connect over WebSocket
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);

In Kotlin:

// Connect over TCP
val requester = RSocketRequester.builder().tcp("localhost", 7000)

// Or connect over WebSocket
val url = URI.create("https://example.org:8080/rsocket")
val requester = RSocketRequester.builder().webSocket(url)

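The above does not connect immediately. When requests are made, a shared connection is established transparently and used.

Connection Setup

RSocketRequester.Builder provides options to customize the initial SETUP frame, including the mime types for data and metadata on the connection and the data and metadata to include in the SETUP frame itself.

For data, the default mime type is derived from the first configured Decoder. For metadata, the default mime type is composite metadata, which allows multiple metadata value and mime type pairs per request. Typically neither needs to be changed.

Data and metadata in the SETUP frame are optional. On the server side, @ConnectMapping methods can be used to handle the start of a connection and the content of the SETUP frame. Metadata may be used for connection-level security.

Strategies

RSocketRequester.Builder accepts RSocketStrategies to configure the requester. You’ll need to use this to provide encoders and decoders for (de)serialization of data and metadata values. By default only the basic codecs from spring-core for String, byte[], and ByteBuffer are registered. Adding spring-web provides access to more that can be registered as follows: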

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);

In Kotlin:

val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

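RSocketStrategies is designed for re-use. In some scenarios, e.g. client and server in the same application, it may be preferable to declare it in Spring configuration.

Client Responders

RSocketRequester.Builder can be used to configure responders to requests from the server.

You can use annotated handlers for client-side responding based on the same infrastructure that is used on a server, but registered programmatically as follows: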

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  // (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) // (3)
	.tcp("localhost", 7000);

In Kotlin:

val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  // (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, ClientHandler()) // (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } // (3)
		.tcp("localhost", 7000)

(1) Use PathPatternRouteMatcher, if spring-web is present, for efficient route matching.
(2) Create a responder from a class with @MessageMapping and/or @ConnectMapping methods.
(3) Register the responder.

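Note that the above is only a shortcut designed for programmatic registration of client responders. For alternative scenarios, where client responders are in Spring configuration, you can still declare RSocketMessageHandler as a Spring bean and then apply it as follows: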

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);

In Kotlin:

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

For the above you may also need to use setHandlerPredicate in RSocketMessageHandler to switch to a different strategy for detecting client responders, e.g. based on a custom annotation such as @RSocketClientResponder vs the default @Controller. This is necessary in scenarios with client and server, or multiple clients in the same application.

See also Annotated Responders, for more on the programming model.

Advanced

RSocketRequester.Builder provides a callback to expose the underlying io.rsocket.core.RSocketConnector for further configuration options for keepalive intervals, session resumption, interceptors, and more. You can configure options at that level as follows:

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);

In Kotlin:

val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

Server Requester

Making requests from a server to connected clients is a matter of obtaining the requester for the connected client from the server.

In Annotated Responders, @ConnectMapping and @MessageMapping methods support an RSocketRequester argument. Use it to access the requester for the connection. Keep in mind that @ConnectMapping methods are essentially handlers of the SETUP frame, which must be handled before requests can begin. Therefore, requests at the very start must be decoupled from handling. For example:

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(report -> { // (1)
			// ...
		});
	return ... // (2)
}

In Kotlin:

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // (1)
			// ...
		}
	}
	// ... (2)
}

(1) Start the request asynchronously, independent from handling.
(2) Perform handling and return the completion Mono<Void> (Java), or perform handling in the suspending function (Kotlin).

Requests

Once you have a client or server requester, you can make requests as follows:

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") // (1)
		.data(viewBox) // (2)
		.retrieveFlux(AirportLocation.class); // (3)

In Kotlin:

val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") // (1)
		.data(viewBox) // (2)
		.retrieveFlow<AirportLocation>() // (3)

(1) Specify a route to include in the metadata of the request message.
(2) Provide data for the request message.
(3) Declare the expected response.

The interaction type is determined implicitly from the cardinality of the input and output. The above example is a Request-Stream because one value is sent and a stream of values is received. For the most part you don’t need to think about this as long as the choice of input and output matches an RSocket interaction type and the types of input and output expected by the responder. The only example of an invalid combination is many-to-one.

The data(Object) method also accepts any Reactive Streams Publisher, including Flux and Mono, as well as any other producer of value(s) that is registered in the ReactiveAdapterRegistry. For a multi-value Publisher such as Flux which produces the same types of values, consider using one of the overloaded data methods to avoid having type checks and Encoder lookup on every element:

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

The data(Object) step is optional. Skip it for requests that don’t send data:

Mono<AirportLocation> location = requester.route("find.radar.EWR")
	.retrieveMono(AirportLocation.class);

In Kotlin:

import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

Extra metadata values can be added if using composite metadata (the default) and if the values are supported by a registered Encoder. For example:

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);

In Kotlin:

import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

For Fire-and-Forget use the send() method that returns Mono<Void>. Note that the Mono indicates only that the message was successfully sent, and not that it was handled.

For Metadata-Push use the sendMetadata() method with a Mono<Void> return value.

Annotated Responders

RSocket responders can be implemented as @MessageMapping and @ConnectMapping methods. @MessageMapping methods handle individual requests while @ConnectMapping methods handle connection-level events (setup and metadata push). Annotated responders are supported symmetrically, for responding from the server side and for responding from the client side.

Server Responders

To use annotated responders on the server side, add RSocketMessageHandler to your Spring configuration to detect @Controller beans with @MessageMapping and @ConnectMapping methods:

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRouteMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}

In Kotlin:

@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

Then start an RSocket server through the Java RSocket API and plug the RSocketMessageHandler for the responder as follows:

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();

In Kotlin:

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler supports composite and routing metadata by default. You can set its MetadataExtractor if you need to switch to a different mime type or register additional metadata mime types.

You’ll need to set the Encoder and Decoder instances required for the metadata and data formats you want to support. You’ll likely need the spring-web module for codec implementations.

By default SimpleRouteMatcher is used for matching routes via AntPathMatcher. We recommend plugging in the PathPatternRouteMatcher from spring-web for efficient route matching. RSocket routes can be hierarchical but are not URL paths. Both route matchers are configured to use "." as separator by default and there is no URL decoding as with HTTP URLs.
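As a rough illustration of hierarchical routes with "." as the separator, the sketch below matches a route against a pattern segment by segment and captures {name} variables. DotRouteSketch is hypothetical and much simpler than SimpleRouteMatcher or PathPatternRouteMatcher (no multi-segment wildcards, no caching). It only shows the idea, including that route text is used as-is with no URL decoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a hypothetical matcher, far simpler than Spring's route matchers.
final class DotRouteSketch {

    /**
     * Matches a route against a pattern, both split on ".". A "{name}" segment captures
     * one route segment and "*" matches any single segment. Returns null when there is no match.
     */
    static Map<String, String> match(String pattern, String route) {
        String[] patternParts = pattern.split("\\.");
        String[] routeParts = route.split("\\.");
        if (patternParts.length != routeParts.length) {
            return null;
        }
        Map<String, String> variables = new LinkedHashMap<>();
        for (int i = 0; i < patternParts.length; i++) {
            String part = patternParts[i];
            if (part.startsWith("{") && part.endsWith("}")) {
                // Capture the raw segment text; nothing is URL-decoded.
                variables.put(part.substring(1, part.length() - 1), routeParts[i]);
            } else if (!part.equals("*") && !part.equals(routeParts[i])) {
                return null;
            }
        }
        return variables;
    }

    public static void main(String[] args) {
        System.out.println(match("find.radar.{id}", "find.radar.EWR"));
        System.out.println(match("find.radar.{id}", "find.airport.EWR"));
    }
}
```

Here match("find.radar.{id}", "find.radar.EWR") yields a map with id mapped to EWR, while a route with a different literal segment does not match.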

RSocketMessageHandler can be configured via RSocketStrategies which may be useful if you need to share configuration between a client and a server in the same process:

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}

In Kotlin:

@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

Client Responders

Annotated responders on the client side need to be configured in the RSocketRequester.Builder. For details, see Client Responders.

@MessageMapping

Once server or client responder configuration is in place, @MessageMapping methods can be used as follows:

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}

In Kotlin:

@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

The above @MessageMapping method responds to a Request-Stream interaction having the route "locate.radars.within". It supports a flexible method signature with the option to use the following method arguments:

| Method Argument | Description |
|---|---|
| @Payload | The payload of the request. This can be a concrete value or an asynchronous type like Mono or Flux. Note: Use of the annotation is optional. A method argument that is not a simple type and is not any of the other supported arguments is assumed to be the expected payload. |
| RSocketRequester | Requester for making requests to the remote end. |
| @DestinationVariable | Value extracted from the route based on variables in the mapping pattern, e.g. @MessageMapping("find.radar.{id}"). |
| @Header | Metadata value registered for extraction as described in MetadataExtractor. |
| @Headers Map<String, Object> | All metadata values registered for extraction as described in MetadataExtractor. |

The return value is expected to be one or more Objects to be serialized as response payloads. That can be asynchronous types like Mono or Flux, a concrete value, or either void or a no-value asynchronous type such as Mono<Void>.

The RSocket interaction type that an @MessageMapping method supports is determined from the cardinality of the input (i.e. @Payload argument) and of the output, where cardinality means the following:

| Cardinality | Description |
|---|---|
| 1 | Either an explicit value, or a single-value asynchronous type such as Mono<T>. |
| Many | A multi-value asynchronous type such as Flux<T>. |
| 0 | For input this means the method does not have an @Payload argument. For output this is void or a no-value asynchronous type such as Mono<Void>. |

The table below shows all input and output cardinality combinations and the corresponding interaction type(s):

| Input Cardinality | Output Cardinality | Interaction Types |
|---|---|---|
| 0, 1 | 0 | Fire-and-Forget, Request-Response |
| 0, 1 | 1 | Request-Response |
| 0, 1 | Many | Request-Stream |
| Many | 0, 1, Many | Request-Channel |
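The mapping in the table above can also be written down as a small lookup. The sketch below is only a restatement of that table in plain Java: the Cardinality and Interaction enums and the supported method are hypothetical names, not Spring types, and Spring's own resolution logic may be structured differently.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative only: hypothetical enums mirroring the table, not Spring types.
final class InteractionTypeSketch {

    enum Cardinality { ZERO, ONE, MANY }

    enum Interaction { FIRE_AND_FORGET, REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL }

    /** Returns the interaction types supported for the given input and output cardinality. */
    static Set<Interaction> supported(Cardinality input, Cardinality output) {
        if (input == Cardinality.MANY) {
            // A stream of input always means Request-Channel, whatever the output is.
            return EnumSet.of(Interaction.REQUEST_CHANNEL);
        }
        switch (output) {
            case ZERO:
                return EnumSet.of(Interaction.FIRE_AND_FORGET, Interaction.REQUEST_RESPONSE);
            case ONE:
                return EnumSet.of(Interaction.REQUEST_RESPONSE);
            default:
                return EnumSet.of(Interaction.REQUEST_STREAM);
        }
    }

    public static void main(String[] args) {
        System.out.println(supported(Cardinality.ONE, Cardinality.MANY));
        System.out.println(supported(Cardinality.MANY, Cardinality.ZERO));
    }
}
```

For instance, a handler that takes a single MapRequest and returns a Flux falls in the row with input 1 and output Many, which is Request-Stream.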

@RSocketExchange

As an alternative to @MessageMapping, you can also handle requests with @RSocketExchange methods. Such methods are declared on an RSocket Interface and can be used as a requester via RSocketServiceProxyFactory or implemented by a responder.

For example, to handle requests as a responder:

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}

In Kotlin:

interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

There are some differences between @RSocketExchange and @MessageMapping since the former needs to remain suitable for requester and responder use. For example, while @MessageMapping can be declared to handle any number of routes and each route can be a pattern, @RSocketExchange must be declared with a single, concrete route. There are also small differences in the supported method parameters related to metadata; see @MessageMapping and RSocket Interface for a list of supported parameters.

@RSocketExchange can be used at the type level to specify a common prefix for all routes for a given RSocket service interface.

@ConnectMapping

@ConnectMapping handles the SETUP frame at the start of an RSocket connection, and any subsequent metadata push notifications through the METADATA_PUSH frame, i.e. metadataPush(Payload) in io.rsocket.RSocket.

@ConnectMapping methods support the same arguments as @MessageMapping but based on metadata and data from the SETUP and METADATA_PUSH frames. @ConnectMapping can have a pattern to narrow handling to specific connections that have a route in the metadata, or if no patterns are declared then all connections match.

@ConnectMapping methods cannot return data and must be declared with void or Mono<Void> as the return value. If handling returns an error for a new connection then the connection is rejected. Handling must not be held up to make requests to the RSocketRequester for the connection. See Server Requester for details.

MetadataExtractor

Responders must interpret metadata. Composite metadata allows independently formatted metadata values (e.g. for routing, security, tracing) each with its own mime type. Applications need a way to configure metadata mime types to support, and a way to access extracted values.
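To illustrate what independently formatted metadata values, each with its own mime type, can look like on the wire, here is a minimal sketch of the entry layout defined by the RSocket Composite Metadata extension, as best understood here: a one-byte MIME header, the MIME type string, a 24-bit payload length, and the payload. CompositeMetadataSketch is a hypothetical class. It handles only explicit MIME type strings (not the compact well-known MIME ids), assumes text payloads, and the MIME types used in main are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a hypothetical codec for explicit-MIME composite metadata entries.
final class CompositeMetadataSketch {

    /** Appends one entry: MIME length minus 1 (1 byte), MIME type, 24-bit payload length, payload. */
    static void writeEntry(ByteArrayOutputStream out, String mimeType, byte[] payload) {
        byte[] mime = mimeType.getBytes(StandardCharsets.US_ASCII);
        if (mime.length < 1 || mime.length > 128) {
            throw new IllegalArgumentException("MIME type must be 1 to 128 bytes long");
        }
        out.write(mime.length - 1); // high bit clear: an explicit MIME string follows
        out.write(mime, 0, mime.length);
        out.write((payload.length >> 16) & 0xFF);
        out.write((payload.length >> 8) & 0xFF);
        out.write(payload.length & 0xFF);
        out.write(payload, 0, payload.length);
    }

    /** Reads all entries back into a map of MIME type to payload decoded as UTF-8 text. */
    static Map<String, String> readEntries(byte[] composite) {
        Map<String, String> entries = new LinkedHashMap<>();
        int pos = 0;
        while (pos < composite.length) {
            int first = composite[pos++] & 0xFF;
            if ((first & 0x80) != 0) {
                throw new IllegalArgumentException("Well-known MIME ids are not handled in this sketch");
            }
            int mimeLength = first + 1;
            String mimeType = new String(composite, pos, mimeLength, StandardCharsets.US_ASCII);
            pos += mimeLength;
            int payloadLength = ((composite[pos] & 0xFF) << 16)
                    | ((composite[pos + 1] & 0xFF) << 8)
                    | (composite[pos + 2] & 0xFF);
            pos += 3;
            entries.put(mimeType, new String(composite, pos, payloadLength, StandardCharsets.UTF_8));
            pos += payloadLength;
        }
        return entries;
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeEntry(out, "text/plain", "token-123".getBytes(StandardCharsets.UTF_8));
        writeEntry(out, "application/vnd.myapp.trace+json", "{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(readEntries(out.toByteArray()));
    }
}
```

A responder reading such a buffer gets one value per mime type, and must then decide which mime types it can decode and under which names to expose the results.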

MetadataExtractor is a contract to take serialized metadata and return decoded name-value pairs that can then be accessed like headers by name, for example via @Header in annotated handler methods.

DefaultMetadataExtractor can be given Decoder instances to decode metadata. Out of the box it has built-in support for "message/x.rsocket.routing.v0" which it decodes to String and saves under the "route" key. For any other mime type you’ll need to provide a Decoder and register the mime type as follows:

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");

In Kotlin:

import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

Composite metadata works well to combine independent metadata values. However, the requester might not support composite metadata, or may choose not to use it. For this, DefaultMetadataExtractor may need custom logic to map the decoded value to the output map. Here is an example where JSON is used for metadata:

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});

In Kotlin:

import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

When configuring MetadataExtractor through RSocketStrategies, you can let RSocketStrategies.Builder create the extractor with the configured decoders, and simply use a callback to customize registrations as follows:

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();

In Kotlin:

import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket Interface

The Spring Framework lets you define an RSocket service as a Java interface with @RSocketExchange methods. You can pass such an interface to RSocketServiceProxyFactory to create a proxy which performs requests through an RSocketRequester. You can also implement the interface as a responder that handles requests.

Start by creating the interface with @RSocketExchange methods:

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

Now you can create a proxy that performs requests when methods are called:

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

You can also implement the interface to handle requests as a responder. See Annotated Responders.

Method Parameters

Annotated, RSocket exchange methods support flexible method signatures with the following method parameters:

| Method Argument | Description |
|---|---|
| @DestinationVariable | Add a route variable to pass to RSocketRequester along with the route from the @RSocketExchange annotation in order to expand template placeholders in the route. This variable can be a String or any Object, which is then formatted via toString(). |
| @Payload | Set the input payload(s) for the request. This can be a concrete value, or any producer of values that can be adapted to a Reactive Streams Publisher via ReactiveAdapterRegistry. |
| Object, if followed by MimeType | The value for a metadata entry in the input payload. This can be any Object as long as the next argument is the metadata entry MimeType. The value can be a concrete value or any producer of a single value that can be adapted to a Reactive Streams Publisher via ReactiveAdapterRegistry. |
| MimeType | The MimeType for a metadata entry. The preceding method argument is expected to be the metadata value. |

Return Values

Annotated, RSocket exchange methods support return values that are concrete value(s), or any producer of value(s) that can be adapted to a Reactive Streams Publisher via ReactiveAdapterRegistry.

By default, the behavior of RSocket service methods with synchronous (blocking) method signature depends on response timeout settings of the underlying RSocket ClientTransport as well as RSocket keep-alive settings. RSocketServiceProxyFactory.Builder does expose a blockTimeout option that also lets you configure the maximum time to block for a response, but we recommend configuring timeout values at the RSocket level for more control.