For the latest stable version, please use Spring Framework 6.2.4.

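WebSockets

This part of the reference documentation covers support for reactive-stack WebSocket messaging.

Introduction to WebSocket

The WebSocket protocol, RFC 6455, provides a standardized way to establish a full-duplex, two-way communication channel between client and server over a single TCP connection. It is a different TCP protocol from HTTP but is designed to work over HTTP, using ports 80 and 443 and allowing re-use of existing firewall rules.

A WebSocket interaction begins with an HTTP request that uses the HTTP Upgrade header to upgrade or, in this case, to switch to the WebSocket protocol. The following example shows such an interaction: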

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 The Upgrade header.
2 Using the Upgrade connection.

Instead of the usual 200 status code, a server with WebSocket support returns output similar to the following:

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 Protocol switch.
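
The Sec-WebSocket-Accept value in a response like the one above is not arbitrary: RFC 6455 defines it as the Base64-encoded SHA-1 hash of the request's Sec-WebSocket-Key concatenated with a fixed GUID. The following minimal sketch, which uses only the JDK, illustrates that computation. The class and method names are hypothetical and not part of Spring; in practice the server's WebSocket engine performs this step for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of Spring.
public class WebSocketAcceptSketch {

	// Fixed GUID defined by RFC 6455 for the opening handshake.
	private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

	// Derives the Sec-WebSocket-Accept value from a Sec-WebSocket-Key value.
	public static String acceptFor(String secWebSocketKey) {
		try {
			MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
			byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
			return Base64.getEncoder().encodeToString(digest);
		}
		catch (NoSuchAlgorithmException ex) {
			// SHA-1 is a required algorithm on every Java platform, so this is not expected.
			throw new IllegalStateException(ex);
		}
	}

	public static void main(String[] args) {
		// Sample key taken from RFC 6455.
		System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
	}
}
```

For the sample key used in main, RFC 6455 gives s3pPLMBiTxaQ9kYGzzhZRbK+xOo= as the expected accept value, which makes it a convenient check for any implementation of this step.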

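After a successful handshake, the TCP socket underlying the HTTP upgrade request remains open so that both the client and the server can continue to send and receive messages.

A complete introduction to how WebSockets work is beyond the scope of this document. See RFC 6455, the WebSocket chapter of HTML5, or any of the many introductions and tutorials on the Web.

Note that, if a WebSocket server is running behind a web server (for example, nginx), you likely need to configure it to pass WebSocket upgrade requests on to the WebSocket server. Likewise, if the application runs in a cloud environment, check the instructions of the cloud provider related to WebSocket support.

HTTP Versus WebSocket

Even though WebSocket is designed to be HTTP-compatible and starts with an HTTP request, it is important to understand that the two protocols lead to very different architectures and application programming models.

In HTTP and REST, an application is modeled as many URLs. To interact with the application, clients access those URLs, request-response style. Servers route requests to the appropriate handler based on the HTTP URL, method, and headers.

By contrast, in WebSockets, there is usually only one URL for the initial connect. Subsequently, all application messages flow on that same TCP connection. This points to an entirely different asynchronous, event-driven, messaging architecture.

WebSocket is also a low-level transport protocol which, unlike HTTP, does not prescribe any semantics for the content of messages. That means that there is no way to route or process a message unless the client and the server agree on message semantics.

WebSocket clients and servers can negotiate the use of a higher-level messaging protocol (for example, STOMP) through the Sec-WebSocket-Protocol header on the HTTP handshake request. In the absence of that, they need to come up with their own conventions.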
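
To make the negotiation concrete, the following sketch shows one plausible way a server could choose a sub-protocol from the comma-separated Sec-WebSocket-Protocol request header. It uses only the JDK; the class name, the method name, and the policy of honoring the client's order of preference are assumptions made for illustration, not a description of how any particular server decides.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Spring.
public class SubProtocolSketch {

	// Returns the first client-offered sub-protocol that the server supports, if any.
	public static Optional<String> select(String secWebSocketProtocolHeader, List<String> supported) {
		// The header carries a comma-separated list, for example "v10.stomp, v11.stomp".
		List<String> offered = Arrays.stream(secWebSocketProtocolHeader.split(","))
				.map(String::trim)
				.filter(token -> !token.isEmpty())
				.collect(Collectors.toList());
		// Assumed policy: walk the client's list in order and take the first match.
		return offered.stream().filter(supported::contains).findFirst();
	}

	public static void main(String[] args) {
		// Prints "Optional[v10.stomp]": the server supports only v10.stomp.
		System.out.println(select("v10.stomp, v11.stomp", List.of("v10.stomp")));
	}
}
```

An empty result corresponds to the case where no higher-level protocol is agreed upon, which is when the two sides must fall back to their own conventions.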

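When to Use WebSockets

WebSockets can make a web page dynamic and interactive. However, in many cases, a combination of AJAX and HTTP streaming or long polling can provide a simple and effective solution.

For example, news, mail, and social feeds need to update dynamically, but it may be perfectly okay to do so every few minutes. Collaboration, games, and financial apps, on the other hand, need to be much closer to real time.

Latency alone is not a deciding factor. If the volume of messages is relatively low (for example, monitoring network failures), HTTP streaming or polling can provide an effective solution. It is the combination of low latency, high frequency, and high volume that makes the best case for the use of WebSocket.

Keep in mind also that, over the Internet, restrictive proxies that are outside of your control may preclude WebSocket interactions, either because they are not configured to pass on the Upgrade header or because they close long-lived connections that appear to be idle. This means that the use of WebSocket for internal applications within the firewall is a more straightforward decision than it is for public-facing applications.

WebSocket API

The Spring Framework provides a WebSocket API that you can use to write client- and server-side applications that handle WebSocket messages.

Server

To create a WebSocket server, you can first create a WebSocketHandler. The following example shows how to do so: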

import reactor.core.publisher.Mono;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}

import reactor.core.publisher.Mono

import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

Then you can map it to a URL:

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}

@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

If you use the WebFlux Config, there is nothing further to do. Otherwise, you need to declare a WebSocketHandlerAdapter, as the following example shows:

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() = WebSocketHandlerAdapter()
}

WebSocketHandler

The handle method of WebSocketHandler takes WebSocketSession and returns Mono<Void> to indicate when application handling of the session is complete. The session is handled through two streams, one for inbound and one for outbound messages. The following table describes the two methods that handle the streams:

WebSocketSession method | Description
Flux<WebSocketMessage> receive() | Provides access to the inbound message stream and completes when the connection is closed.
Mono<Void> send(Publisher<WebSocketMessage>) | Takes a source for outgoing messages, writes the messages, and returns a Mono<Void> that completes when the source completes and writing is done.

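A WebSocketHandler must compose the inbound and outbound streams into a unified flow and return a Mono<Void> that reflects the completion of that flow. Depending on application requirements, the unified flow completes when either the inbound or the outbound message stream completes, when the inbound stream completes (that is, the connection closed) while the outbound stream is infinite, or at a chosen point, through the close method of WebSocketSession.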

When inbound and outbound message streams are composed together, there is no need to check whether the connection is open, because Reactive Streams signals end the activity: the inbound stream receives a completion or error signal, and the outbound stream receives a cancellation signal.

The most basic implementation of a handler is one that handles the inbound stream. The following example shows such an implementation:

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 Access the stream of inbound messages.
2 Do something with each message.
3 Perform nested asynchronous operations that use the message content.
4 Return a Mono<Void> that completes when receiving completes.
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 Access the stream of inbound messages.
2 Do something with each message.
3 Perform nested asynchronous operations that use the message content.
4 Return a Mono<Void> that completes when receiving completes.

For nested, asynchronous operations, you may need to call message.retain() on underlying servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be released before you have had a chance to read the data. For more background, see Data Buffers and Codecs.

The following implementation combines the inbound and outbound streams:

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 Handle the inbound message stream.
2 Create the outbound message, producing a combined flow.
3 Return a Mono<Void> that does not complete while we continue to receive.
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 Handle the inbound message stream.
2 Create the outbound message, producing a combined flow.
3 Return a Mono<Void> that does not complete while we continue to receive.

Inbound and outbound streams can be independent and be joined only for completion, as the following example shows:

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 Handle inbound message stream.
2 Send outgoing messages.
3 Join the streams and return a Mono<Void> that completes when either stream ends.
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 Handle inbound message stream.
2 Send outgoing messages.
3 Join the streams and return a Mono<Void> that completes when either stream ends.

DataBuffer

DataBuffer is the representation for a byte buffer in WebFlux. The Spring Core part of the reference has more on that in the section on Data Buffers and Codecs. The key point to understand is that on some servers like Netty, byte buffers are pooled and reference counted, and must be released when consumed to avoid memory leaks.

When running on Netty, applications must use DataBufferUtils.retain(dataBuffer) if they wish to hold on to input data buffers, to ensure the buffers are not released, and must subsequently use DataBufferUtils.release(dataBuffer) when the buffers are consumed.

Handshake

WebSocketHandlerAdapter delegates to a WebSocketService. By default, that is an instance of HandshakeWebSocketService, which performs basic checks on the WebSocket request and then uses RequestUpgradeStrategy for the server in use. Currently, there is built-in support for Reactor Netty, Tomcat, Jetty, and Undertow.

HandshakeWebSocketService exposes a sessionAttributePredicate property that allows setting a Predicate<String> to extract attributes from the WebSession and insert them into the attributes of the WebSocketSession.
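
As a rough illustration of the sessionAttributePredicate property, the following configuration sketch copies only those WebSession attributes whose names start with a given prefix. The class name WebSocketAttributeConfig and the "ws." prefix are assumptions chosen for the example, and the sketch assumes you declare the WebSocketHandlerAdapter yourself rather than relying on the WebFlux Config.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

@Configuration
class WebSocketAttributeConfig {

	@Bean
	public WebSocketService webSocketService() {
		// The no-arg constructor detects the RequestUpgradeStrategy for the server in use.
		HandshakeWebSocketService service = new HandshakeWebSocketService();
		// Assumption for illustration: only attributes named "ws.*" are copied
		// from the WebSession into the WebSocketSession attributes.
		service.setSessionAttributePredicate(name -> name.startsWith("ws."));
		return service;
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter(WebSocketService webSocketService) {
		return new WebSocketHandlerAdapter(webSocketService);
	}
}
```

A handler could then read the copied values through session.getAttributes().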

Server Configuration

The RequestUpgradeStrategy for each server exposes configuration specific to the underlying WebSocket server engine. When using the WebFlux Java config, you can customize such properties as shown in the corresponding section of the WebFlux Config. Otherwise, if you are not using the WebFlux config, use the following:

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}

@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

Check the upgrade strategy for your server to see what options are available. Currently, only Tomcat and Jetty expose such options.

CORS

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to have your WebSocketHandler implement CorsConfigurationSource and return a CorsConfiguration with allowed origins, headers, and other details. If you cannot do that, you can also set the corsConfigurations property on the SimpleUrlHandlerMapping to specify CORS settings by URL pattern. If both are specified, they are combined by using the combine method on CorsConfiguration.
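
The first approach might look like the following sketch, in which the handler itself supplies the CORS settings. The class name CorsAwareHandler and the allowed origin https://example.org are placeholders chosen for illustration, and the message handling is reduced to draining the inbound stream.

```java
import reactor.core.publisher.Mono;

import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsConfigurationSource;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.server.ServerWebExchange;

class CorsAwareHandler implements WebSocketHandler, CorsConfigurationSource {

	@Override
	public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
		CorsConfiguration config = new CorsConfiguration();
		// Placeholder origin; replace with the origins your application trusts.
		config.addAllowedOrigin("https://example.org");
		return config;
	}

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// Consume inbound messages and complete when the connection closes.
		return session.receive().then();
	}
}
```

Keeping the CORS settings on the handler places the access rules next to the endpoint they protect, whereas the URL-pattern approach centralizes them in the handler mapping.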

Client

Spring WebFlux provides a WebSocketClient abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

The Tomcat client is effectively an extension of the standard Java one with some extra functionality in the WebSocketSession handling to take advantage of the Tomcat-specific API to suspend receiving messages for back pressure.

To start a WebSocket session, you can create an instance of the client and use its execute methods:

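WebSocketClient client = new ReactorNettyWebSocketClient();

// URI.create avoids the checked URISyntaxException thrown by the URI constructor
URI url = URI.create("ws://localhost:8080/path");
// execute returns a Mono<Void>; the connection is made only once it is subscribed to
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());

val client = ReactorNettyWebSocketClient()

val url = URI("ws://localhost:8080/path")
// execute returns a Mono<Void>; the connection is made only once it is subscribed to
client.execute(url) { session ->
	session.receive()
			.doOnNext(::println)
			.then()
}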

Some clients, such as Jetty, implement Lifecycle and need to be started before you can use them and stopped when they are no longer needed. All clients have constructor options related to configuration of the underlying WebSocket client.