此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.1.10! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.1.10! |
参考文档的这一部分介绍了对 reactive-stack WebSocket 的支持 消息。
WebSocket简介
WebSocket 协议 RFC 6455 提供了一个标准化的 在客户端和服务器之间建立全双工、双向通信通道的方法 通过单个 TCP 连接。它是一种与 HTTP 不同的 TCP 协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重用现有防火墙规则。
WebSocket 交互从使用 HTTP 标头的 HTTP 请求开始
升级,或者在本例中切换到 WebSocket 协议。以下示例
显示这样的交互:Upgrade
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 | 标头。Upgrade |
2 | 使用连接。Upgrade |
支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 | 协议交换机 |
握手成功后,HTTP 升级请求的基础 TCP 套接字将保留 为客户端和服务器打开以继续发送和接收消息。
WebSocket 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节或许多介绍和 Web 上的教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请选中 云提供商与 WebSocket 支持相关的说明。
HTTP 与 WebSocket
即使 WebSocket 被设计为与 HTTP 兼容并且以 HTTP 请求开头, 重要的是要了解这两种协议导致非常不同 体系结构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应样式。服务器将请求路由 基于 HTTP URL、方法和标头的适当处理程序。
相比之下,在 WebSocket 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一个完全不同的异步、事件驱动的消息传递体系结构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它没有规定 消息内容的任何语义。这意味着没有办法路由或处理 除非客户端和服务器在消息语义上达成一致,否则消息。
WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议
(例如,STOMP),通过 HTTP 握手请求上的标头。
如果没有这一点,他们需要提出自己的惯例。Sec-WebSocket-Protocol
何时使用 WebSocket
WebSocket 可以使网页具有动态性和交互性。但是,在许多情况下, AJAX 和 HTTP 流式处理或长轮询的组合可以提供简单且 有效的解决方案。
例如,新闻、邮件和社交源需要动态更新,但可能需要动态更新 每隔几分钟就这样做是完全可以的。协作、游戏和金融应用,在 另一方面,需要更接近实时。
延迟本身并不是决定性因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流式处理或轮询可以提供有效的解决方案。 低延迟、高频率和高音量的结合才能发挥最佳效果 使用 WebSocket 的案例。
还要记住,在互联网上,您无法控制的限制性代理
可能会阻止 WebSocket 交互,因为它们未配置为传递标头,或者因为它们关闭了看似空闲的长期连接。这
意味着将 WebSocket 用于防火墙内的内部应用程序是一个
与面向公众的应用程序相比,它更直接。Upgrade
1 | 标头。Upgrade |
2 | 使用连接。Upgrade |
1 | 协议交换机 |
WebSocket 接口
Spring Framework 提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序。
服务器
若要创建 WebSocket 服务器,可以先创建一个 .
以下示例演示如何执行此操作:WebSocketHandler
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后,您可以将其映射到 URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux Config,则什么都没有
如果不使用 WebFlux 配置,则需要声明如下所示:WebSocketHandlerAdapter
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
用于指示会话的应用程序处理何时完成的 take 和 return 方法。会话已处理
通过两个流,一个用于入站消息,一个用于出站消息。下表
介绍处理流的两种方法:handle
WebSocketHandler
WebSocketSession
Mono<Void>
WebSocketSession 方法 |
描述 |
---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
获取传出消息的源,写入消息,并返回
在源代码完成并完成写入时完成。 |
A 必须将入站和出站流组合成一个统一的流,并且
返回反映该流完成的 A。取决于应用
要求,统一流在以下情况下完成:WebSocketHandler
Mono<Void>
-
入站或出站消息流完成。
-
入站流完成(即连接关闭),而出站流是无限的。
-
在选定的点上,通过 .
close
WebSocketSession
当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为 Reactive Streams 会发出结束活动的信号。 入站流接收完成或错误信号,出站流接收到完成或错误信号 接收取消信号。
处理程序的最基本实现是处理入站流的实现。这 以下示例演示了这样的实现:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a 在接收完成时完成。Mono<Void> |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a 在接收完成时完成。Mono<Void> |
对于嵌套的异步操作,可能需要调用基础
使用池数据缓冲区的服务器(例如,Netty)。否则,数据缓冲区可能是
在您有机会阅读数据之前发布。有关更多背景信息,请参阅数据缓冲区和编解码器。message.retain() |
以下实现将入站流和出站流组合在一起:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
入站和出站流可以是独立的,并且只有在完成时才能加入, 如以下示例所示:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 联接流并返回一个在任一流结束时完成的 a。Mono<Void> |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return Mono.zip(input, output).then() (3)
}
}
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 联接流并返回一个在任一流结束时完成的 a。Mono<Void> |
DataBuffer
DataBuffer
是 WebFlux 中字节缓冲区的表示形式。Spring Core 部分
参考资料在数据缓冲区和编解码器部分有更多内容。要了解的关键点是,在某些
像 Netty 这样的服务器,字节缓冲区是池化的,引用计数,必须释放
使用时以避免内存泄漏。
在 Netty 上运行时,应用程序必须使用
希望保留输入数据缓冲区以确保它们不会被释放,并且
随后在缓冲区用完时使用。DataBufferUtils.retain(dataBuffer)
DataBufferUtils.release(dataBuffer)
握手
WebSocketHandlerAdapter
委托给 .默认情况下,这是一个实例
的,它对 WebSocket 请求和
然后用于正在使用的服务器。目前,有内置的
支持 Reactor Netty、Tomcat、Jetty 和 Undertow。WebSocketService
HandshakeWebSocketService
RequestUpgradeStrategy
HandshakeWebSocketService
公开一个属性,该属性允许
设置 A 从中提取属性并插入它们
转换为 .sessionAttributePredicate
Predicate<String>
WebSession
WebSocketSession
服务器配置
对于每个服务器,都公开了特定于
底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义
WebFlux Config 的相应部分中显示的此类属性,或者如果
不使用 WebFlux 配置,请使用以下命令:RequestUpgradeStrategy
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查服务器的升级策略,了解有哪些选项可用。现在 只有 Tomcat 和 Jetty 公开了此类选项。
CORS(核心斯酒店)
配置 CORS 并限制对 WebSocket 终结点的访问的最简单方法是
实现并返回包含允许的源、标头和其他详细信息的 A。如果你做不到
那,您还可以将属性设置为
按 URL 模式指定 CORS 设置。如果同时指定了两者,则使用 on 上的方法将它们组合在一起。WebSocketHandler
CorsConfigurationSource
CorsConfiguration
corsConfigurations
SimpleUrlHandler
combine
CorsConfiguration
客户
Spring WebFlux 提供了一个抽象,其中包含以下实现
Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。WebSocketClient
Tomcat 客户端实际上是标准 Java 客户端的扩展,但有一些额外的扩展
处理中的功能,以利用 Tomcat 特定的功能
用于暂停接收背压消息的 API。WebSocketSession |
若要启动 WebSocket 会话,可以创建客户端实例并使用其方法:execute
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
某些客户端(如 Jetty)实施并需要停止和启动
在您可以使用它们之前。所有客户端都有与配置相关的构造函数选项
的基础 WebSocket 客户端。Lifecycle
WebSocketSession 方法 |
描述 |
---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
获取传出消息的源,写入消息,并返回
在源代码完成并完成写入时完成。 |
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a 在接收完成时完成。Mono<Void> |
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a 在接收完成时完成。Mono<Void> |
对于嵌套的异步操作,可能需要调用基础
使用池数据缓冲区的服务器(例如,Netty)。否则,数据缓冲区可能是
在您有机会阅读数据之前发布。有关更多背景信息,请参阅数据缓冲区和编解码器。message.retain() |
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 联接流并返回一个在任一流结束时完成的 a。Mono<Void> |
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 联接流并返回一个在任一流结束时完成的 a。Mono<Void> |
Tomcat 客户端实际上是标准 Java 客户端的扩展,但有一些额外的扩展
处理中的功能,以利用 Tomcat 特定的功能
用于暂停接收背压消息的 API。WebSocketSession |