此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.2.0spring-doc.cadn.net.cn

RSocket 系列

本节描述了 Spring 框架对 RSocket 协议的支持。spring-doc.cadn.net.cn

概述

RSocket 是一种用于通过 TCP 进行多路复用、双工通信的应用程序协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:spring-doc.cadn.net.cn

建立初始连接后,“client” 与 “server” 的区别将丢失,因为 双方都变得对称,并且每一方都可以发起上述交互之一。 这就是为什么在协议中将参与方称为 “requester” 和 “responder” 的原因 而上述交互称为 “请求流” 或简称为 “请求”。spring-doc.cadn.net.cn

以下是 RSocket 协议的主要功能和优势:spring-doc.cadn.net.cn

  • 跨网络边界的 Reactive Streams 语义 — 对于流式请求,例如Request-StreamChannel、背压信号 在请求者和响应者之间移动,允许请求者在 源,从而减少对网络层拥塞控制的依赖,以及需求 用于网络级别或任何级别的缓冲。spring-doc.cadn.net.cn

  • 请求限制 — 此功能被命名为 “租赁” ,以LEASEframe 那个 可以从每一端发送,以限制另一端允许的请求总数 在给定的时间内。租约会定期续订。spring-doc.cadn.net.cn

  • 会话恢复 — 这是为连接丢失而设计的,需要一些状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。spring-doc.cadn.net.cn

  • 大型消息的分片和重新汇编。spring-doc.cadn.net.cn

  • Keepalive (检测信号)。spring-doc.cadn.net.cn

RSocket 具有多种语言的实现Java 库构建在 Project Reactor 之上, 和 Reactor Netty 用于运输。这意味着 应用程序中来自 Reactive Streams Publishers 的信号以透明方式传播 通过 RSocket 跨网络。spring-doc.cadn.net.cn

协议

RSocket 的好处之一是它在 wire 上具有明确定义的行为,并且 易于阅读的规范以及一些协议扩展。因此它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API 的 API 中。本节提供了简洁的概述,以建立一些上下文。spring-doc.cadn.net.cn

最初,客户端通过一些低级流传输(如 作为 TCP 或 WebSocket 发送,并发送一个SETUPframe 添加到服务器中,为 连接。spring-doc.cadn.net.cn

服务器可能会拒绝SETUPframe,但通常在发送之后(对于客户端) 和 received(对于服务器),双方都可以开始发出请求,除非SETUP指示使用租赁语义来限制请求数,在这种情况下 双方都必须等待LEASEframe 以允许发出请求。spring-doc.cadn.net.cn

建立连接后,双方都可以通过 框架REQUEST_RESPONSE,REQUEST_STREAM,REQUEST_CHANNELREQUEST_FNF.每个 这些帧将一条消息从请求者传送到响应者。spring-doc.cadn.net.cn

然后,响应方可以返回PAYLOAD帧,并且在这种情况下 之REQUEST_CHANNEL请求者还可以发送PAYLOAD具有更多请求的帧 消息。spring-doc.cadn.net.cn

当请求涉及消息流(如Request-StreamChannel, 响应方必须遵循来自请求方的需求信号。Demand 表示为 消息数。初始需求在REQUEST_STREAMREQUEST_CHANNEL框架。后续需求通过REQUEST_N框架。spring-doc.cadn.net.cn

每一方还可以通过METADATA_PUSHframe 的 与任何单个请求有关,但与整个连接有关。spring-doc.cadn.net.cn

RSocket 消息包含数据和元数据。元数据可用于发送路由、 证券令牌等数据和元数据的格式可以不同。每个 Mime 类型 在SETUPframe 并应用于给定连接上的所有请求。spring-doc.cadn.net.cn

虽然所有消息都可以包含元数据,但通常元数据(如路由)是按请求进行的 因此仅包含在请求的第一条消息中,即与其中一个帧一起REQUEST_RESPONSE,REQUEST_STREAM,REQUEST_CHANNELREQUEST_FNF.spring-doc.cadn.net.cn

协议扩展定义用于应用程序的常见元数据格式:spring-doc.cadn.net.cn

Java 实现

RSocket 的 Java 实现构建在 Project Reactor 之上。TCP 和 WebSocket 的传输方式是 基于 Reactor Netty 构建。作为反应式流 库,Reactor 简化了实现协议的工作。对于应用程序,它是 天生的使用FluxMono使用声明式运算符和透明 back 压力支持。spring-doc.cadn.net.cn

RSocket Java 中的 API 有意做到最小和基本。它侧重于协议 功能,并将应用程序编程模型(例如,RPC codegen 与其他模型)保留为 更高层次,独立关注。spring-doc.cadn.net.cn

主合约 io.rsocket.RSocket 使用Mono表示 单条消息 /Flux消息流,以及io.rsocket.Payload实际的 message 中访问数据和元数据作为字节缓冲区。这RSocket使用 Contract 对称。对于请求,应用程序将获得一个RSocket执行 请求与。为了响应,应用程序实现了RSocket处理请求。spring-doc.cadn.net.cn

这并不是一个详尽的介绍。在大多数情况下,Spring 应用程序 不必直接使用其 API。但是,观察或试验可能很重要 使用 RSocket 独立于 Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。spring-doc.cadn.net.cn

弹簧支撑

spring-messagingmodule 包含以下内容:spring-doc.cadn.net.cn

spring-webmodule 包含EncoderDecoderJackson 等实现 CBOR/JSON 和 Protobuf 的 RSocket 应用程序可能需要。它还包含PathPatternParser可以插入以进行高效的路由匹配。spring-doc.cadn.net.cn

Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括 在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有 Client 支持和自动配置RSocketRequester.BuilderRSocketStrategies. 有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分spring-doc.cadn.net.cn

Spring Security 5.2 提供了 RSocket 支持。spring-doc.cadn.net.cn

Spring 集成 5.2 提供了入站和出站网关来与 RSocket 交互 客户端和服务器。有关更多详细信息,请参见 Spring 集成参考手册。spring-doc.cadn.net.cn

Spring Cloud 网关支持 RSocket 连接。spring-doc.cadn.net.cn

RSocketRequester 请求者

RSocketRequester提供 Fluent API 来执行 RSocket 请求、接受和 返回 data 和 metadata 的对象,而不是低级数据缓冲区。可以使用 对称地,从 Client 端发出请求,以及从 Server 发出请求。spring-doc.cadn.net.cn

客户端请求者

要获取RSocketRequester在客户端是连接到一个服务器,它涉及 发送 RSocketSETUPFrame 替换为 Connection Settings。RSocketRequester提供 构建器,它有助于准备io.rsocket.core.RSocketConnector包括连接 的设置SETUP框架。spring-doc.cadn.net.cn

这是使用默认设置进行连接的最基本方法:spring-doc.cadn.net.cn

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

以上不会立即连接。发出请求时,共享连接为 透明地建立并使用。spring-doc.cadn.net.cn

连接设置

RSocketRequester.Builder提供了以下内容来自定义初始SETUP框架:spring-doc.cadn.net.cn

对于 data,默认 MIME 类型派生自第一个配置的Decoder.为 metadata,则默认的 MIME 类型是 composite metadata,它允许多个 每个请求的元数据值和 MIME 类型对。通常,两者都不需要更改。spring-doc.cadn.net.cn

Data and metadata 中的SETUPframe 是可选的。在服务器端,可以使用@ConnectMapping方法处理 connection 和SETUP框架。元数据可用于连接 级别安全性。spring-doc.cadn.net.cn

策略

RSocketRequester.Builder接受RSocketStrategies以配置请求者。 您需要使用它来提供编码器和解码器,用于数据的 (de) 序列化和 metadata 值。默认情况下,只有spring-coreString,byte[]ByteBuffer已注册。添加spring-web提供对更多 可以按如下方式注册:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies专为重复使用而设计。在某些情况下,例如,客户端和服务器位于 相同的应用程序,最好在 Spring 配置中声明它。spring-doc.cadn.net.cn

客户端响应方

RSocketRequester.Builder可用于配置对来自 服务器。spring-doc.cadn.net.cn

您可以使用带注释的处理程序进行基于相同的客户端响应 在服务器上使用但以编程方式注册的基础结构,如下所示:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 PathPatternRouteMatcher如果spring-web存在,以实现高效 路由匹配。
2 从类创建响应者@MessageMapping和/或@ConnectMapping方法。
3 注册响应方。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 PathPatternRouteMatcher如果spring-web存在,以实现高效 路由匹配。
2 从类创建响应者@MessageMapping和/或@ConnectMapping方法。
3 注册响应方。

请注意,以上只是专为 client 的编程注册而设计的快捷方式 反应。对于客户端响应者处于 Spring 配置中的替代场景, 您仍然可以声明RSocketMessageHandler作为 Spring Bean,然后按如下方式应用:spring-doc.cadn.net.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述内容,您可能还需要使用setHandlerPredicateRSocketMessageHandler自 切换到不同的策略来检测客户端响应者,例如,基于自定义 注解(例如@RSocketClientResponder与默认值@Controller.这 在客户端和服务器或多个客户端位于同一 应用。spring-doc.cadn.net.cn

另请参阅 Annotated Responders ,以了解有关编程模型的更多信息。spring-doc.cadn.net.cn

高深

RSocketRequesterBuilder提供一个回调来暴露底层io.rsocket.core.RSocketConnector有关 Keepalive 的更多配置选项 intervals、session resumption、interceptor 等。您可以配置选项 在该级别,如下所示:spring-doc.cadn.net.cn

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求者

要从服务器向连接的客户端发出请求,只需获取 来自服务器的已连接客户端的 requester。spring-doc.cadn.net.cn

Annotated Responders 中@ConnectMapping@MessageMapping方法支持RSocketRequester论点。使用它来访问连接的请求者。保留 请注意@ConnectMapping方法本质上是SETUPframe which 必须在请求开始之前处理。因此,请求在一开始就必须是 与处理分离。例如:spring-doc.cadn.net.cn

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,独立于处理。
2 执行处理和退货完成Mono<Void>.
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,独立于处理。
2 在 suspending 函数中执行处理。

请求

拥有客户端服务器请求者后,您可以按如下方式发出请求:spring-doc.cadn.net.cn

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。

交互类型由输入的基数隐式确定,而 输出。上面的示例是一个Request-Stream因为发送了一个值和一个流 of values 的值。在大多数情况下,您不需要考虑这个问题,只要 输入和输出的选择与 RSocket 交互类型以及 input 和 响应方所需的输出。无效组合的唯一示例是多对一。spring-doc.cadn.net.cn

data(Object)method 也接受任何 Reactive StreamsPublisher包括FluxMono,以及在ReactiveAdapterRegistry.对于多值PublisherFlux,这将产生 相同类型的值,请考虑使用重载的data避免 type checks 和Encoderlookup on each element:spring-doc.cadn.net.cn

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object)step 是可选的。对于不发送数据的请求,请跳过它:spring-doc.cadn.net.cn

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用复合元数据(默认),并且可以添加额外的元数据值 值由已注册的Encoder.例如:spring-doc.cadn.net.cn

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

Fire-and-Forget使用send()方法,该方法返回Mono<Void>.请注意,Mono仅指示消息已成功发送,而不指示消息已处理。spring-doc.cadn.net.cn

Metadata-Push使用sendMetadata()方法替换为Mono<Void>返回值。spring-doc.cadn.net.cn

带注释的响应者

RSocket 响应程序可以实现为@MessageMapping@ConnectMapping方法。@MessageMapping方法处理单个请求,而@ConnectMapping方法 handle 连接级事件 (Setup 和 Metadata Push)。支持带注释的响应者 对称地,用于从服务器端响应和从客户端响应。spring-doc.cadn.net.cn

服务器响应程序

要在服务器端使用带注释的响应程序,请添加RSocketMessageHandler到你的 Spring 要检测的配置@Controllerbean 替换为@MessageMapping@ConnectMapping方法:spring-doc.cadn.net.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动一个 RSocket 服务器,并将RSocketMessageHandler对于响应方,如下所示:spring-doc.cadn.net.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler默认情况下,支持复合元数据和路由元数据。如果需要切换到 不同的 MIME 类型或注册其他元数据 MIME 类型。spring-doc.cadn.net.cn

您需要设置EncoderDecoder元数据和数据所需的实例 格式。您可能需要spring-webmodule 进行编码实现。spring-doc.cadn.net.cn

默认情况下SimpleRouteMatcher用于通过AntPathMatcher. 我们建议插入PathPatternRouteMatcherspring-web为 高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。 默认情况下,两个路由匹配器都配置为使用 “.” 作为分隔符,并且没有 URL 解码方式与 HTTP URL 一样。spring-doc.cadn.net.cn

RSocketMessageHandler可通过以下方式进行配置RSocketStrategies这可能很有用,如果 您需要在同一进程中在 Client 端和 Server 之间共享配置:spring-doc.cadn.net.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应方

客户端上带注释的响应程序需要在RSocketRequester.Builder.有关详细信息,请参阅 客户端响应程序spring-doc.cadn.net.cn

@MessageMapping

服务器客户端响应方配置到位后,@MessageMapping方法可按如下方式使用:spring-doc.cadn.net.cn

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

以上内容@MessageMapping方法响应具有 路由 “locate.radars.within”。它支持灵活的方法签名,并可选择 使用以下方法参数:spring-doc.cadn.net.cn

方法参数 描述

@Payloadspring-doc.cadn.net.cn

请求的有效负载。这可以是异步类型的具体值,例如MonoFlux.spring-doc.cadn.net.cn

注意:使用注释是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,则假定为预期的有效负载。spring-doc.cadn.net.cn

RSocketRequesterspring-doc.cadn.net.cn

请求者,用于向远程端发出请求。spring-doc.cadn.net.cn

@DestinationVariablespring-doc.cadn.net.cn

根据映射模式中的变量从路由中提取的值,例如@MessageMapping("find.radar.{id}").spring-doc.cadn.net.cn

@Headerspring-doc.cadn.net.cn

注册用于提取的元数据值,如 MetadataExtractor 中所述。spring-doc.cadn.net.cn

@Headers Map<String, Object>spring-doc.cadn.net.cn

注册用于提取的所有元数据值,如 MetadataExtractor 中所述。spring-doc.cadn.net.cn

返回值应为一个或多个要序列化为响应的 Object 负载。这可以是异步类型,例如MonoFlux、具体值或 也void或无值异步类型,例如Mono<Void>.spring-doc.cadn.net.cn

RSocket 交互类型中,一个@MessageMapping方法 Supports 的确定依据 input 的基数(即@Payload参数)和输出,其中 cardinality 的含义如下:spring-doc.cadn.net.cn

基数 描述

1spring-doc.cadn.net.cn

显式值或单值异步类型(如Mono<T>.spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

多值异步类型,例如Flux<T>.spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

对于 input,这意味着该方法没有@Payload论点。spring-doc.cadn.net.cn

对于输出,此值为void或无值异步类型,例如Mono<Void>.spring-doc.cadn.net.cn

下表显示了所有输入和输出基数组合以及相应的 交互类型:spring-doc.cadn.net.cn

输入基数 输出基数 交互类型

0, 1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

Fire-and-Forget, Request-Responsespring-doc.cadn.net.cn

0, 1spring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

请求-响应spring-doc.cadn.net.cn

0, 1spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

请求流spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

0、1、多spring-doc.cadn.net.cn

请求通道spring-doc.cadn.net.cn

@RSocketExchange

作为@MessageMapping,您还可以使用@RSocketExchange方法。此类方法在 RSocket 接口上声明,并且可以通过以下方式用作请求者RSocketServiceProxyFactory或由响应方实现。spring-doc.cadn.net.cn

例如,要以响应者身份处理请求:spring-doc.cadn.net.cn

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

之间存在一些差异@RSocketExhange@MessageMapping由于 前者需要保持适合请求者和响应者使用。例如,虽然@MessageMapping可以声明为处理任意数量的路由,并且每个路由都可以 be a pattern,@RSocketExchange必须使用单个具体路由进行声明。有 与元数据相关的支持方法参数也存在细微差异,有关支持的参数列表,请参阅 @MessageMappingRSocket 接口spring-doc.cadn.net.cn

@RSocketExchange可以在类型级别使用,为所有路由指定通用前缀 对于给定的 RSocket 服务接口。spring-doc.cadn.net.cn

@ConnectMapping

@ConnectMapping处理SETUPframe 在 RSocket 连接开始时,以及 通过METADATA_PUSHframe 的metadataPush(Payload)io.rsocket.RSocket.spring-doc.cadn.net.cn

@ConnectMapping方法支持与 @MessageMapping 相同的参数,但基于元数据和数据SETUPMETADATA_PUSH框架。@ConnectMapping可以有一个模式来缩小处理范围 在元数据中具有路由的特定连接,或者如果未声明任何模式 则所有连接都匹配。spring-doc.cadn.net.cn

@ConnectMapping方法不能返回数据,必须使用voidMono<Void>作为返回值。如果处理返回新的 connection,则连接将被拒绝。处理不得搁置以使 请求发送到RSocketRequester对于连接。有关详细信息,请参阅 Server Requesterspring-doc.cadn.net.cn

元数据提取器

响应方必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全性、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置元数据 MIME 类型以支持,以及一种方法 以访问提取的值。spring-doc.cadn.net.cn

MetadataExtractor是一个合约,用于获取序列化元数据并返回解码 名称-值对,然后可以像 Headers 一样按名称访问,例如通过@Header在带注释的处理程序方法中。spring-doc.cadn.net.cn

DefaultMetadataExtractor可以给出Decoder实例来解码元数据。出 它内置了对 “message/x.rsocket.routing.v0” 的支持,它将其解码为该框String并保存在 “route” 键下。对于任何其他 mime 类型,您需要提供 一个Decoder并注册 MIME 类型,如下所示:spring-doc.cadn.net.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

复合元数据可以很好地组合独立的元数据值。但是, 请求者可能不支持复合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor可能需要自定义逻辑来将解码的值映射到输出 地图。以下是将 JSON 用于元数据的示例:spring-doc.cadn.net.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

配置MetadataExtractor通过RSocketStrategies,您可以让RSocketStrategies.Builder使用配置的解码器创建提取器,以及 只需使用回调即可自定义注册,如下所示:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring Framework 允许您将 RSocket 服务定义为 Java 接口,其中@RSocketExchange方法。您可以将此类接口传递给RSocketServiceProxyFactory创建通过 RSocketRequester 执行请求的代理。您还可以实现 interface 作为处理请求的响应方。spring-doc.cadn.net.cn

首先使用@RSocketExchange方法:spring-doc.cadn.net.cn

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

现在,您可以创建一个代理,在调用方法时执行请求:spring-doc.cadn.net.cn

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您还可以实现该接口以作为响应方处理请求。 请参阅 带注释的响应程序spring-doc.cadn.net.cn

方法参数

带注释的 RSocket 交换方法支持灵活的方法签名,如下所示 方法参数:spring-doc.cadn.net.cn

Method 参数 描述

@DestinationVariablespring-doc.cadn.net.cn

添加要传递给的路由变量RSocketRequester以及从@RSocketExchangeannotation 来扩展路由中的模板占位符。 此变量可以是 String 或任何 Object,然后通过toString().spring-doc.cadn.net.cn

@Payloadspring-doc.cadn.net.cn

设置请求的输入负载。这可以是具体值,也可以是任何 producer 可以适应 Reactive Streams 的值Publisher通过ReactiveAdapterRegistry.必须提供有效负载,除非required属性 设置为false,或者参数标记为 optional (可选),由MethodParameter#isOptional.spring-doc.cadn.net.cn

Object,如果后跟MimeTypespring-doc.cadn.net.cn

输入负载中元数据条目的值。这可以是任何Object只要 因为下一个参数是元数据条目MimeType.该值可以是具体的 value 或可以适应 Reactive Streams 的单个值的任何 producerPublisher通过ReactiveAdapterRegistry.spring-doc.cadn.net.cn

MimeTypespring-doc.cadn.net.cn

MimeType对于元数据条目。前面的 method 参数应为 metadata 值。spring-doc.cadn.net.cn

返回值

带注释的 RSocket 交换方法支持作为具体值的返回值,或者 任何可以适应 Reactive Streams 的值的 producerPublisher通过ReactiveAdapterRegistry.spring-doc.cadn.net.cn

默认情况下,具有同步(阻塞)方法的 RSocket 服务方法的行为 签名取决于底层 RSocket 的响应超时设置ClientTransport以及 RSocket keep-alive 设置。RSocketServiceProxyFactory.Builder会暴露blockTimeout选项,该选项还允许您配置阻止响应的最长时间, 但我们建议在 RSocket 级别配置超时值以获得更多控制。spring-doc.cadn.net.cn