WebFlux之Websocket
2021-03-01 02:26
标签:return inf load data require remote bin sas pool har 参考 https://www.cnblogs.com/nuccch/p/10947256.html WebSocket是基于TCP的应用层协议,用于在C/S架构的应用中实现双向通信,关于WebSocket协议的详细规范和定义参见rfc6455。 WebSocket与Http的区别 websocket应用层的连接协议有两种 标准的websocket连接请求头需要包含 WebSocket协议格式: 继承WebSocketHandler EchoWebSocketHandler继承WebSocketHandler 注册HandlerMapping 注册了 /echo 到 EchoWebSocketHandler 的映射 注入WebSocketHandlerAdapter 注入了SimpleUrlHandlerMapping和WebSocketHandlerAdapter,看下如何处理来自浏览器的请求 supports方法判断EchoWebSocketHandler的返回结果为true handle方法调用WebSocketService的handleRequest方法 实现类为HandshakeWebSocketService 验证HTTP请求头 RequestUpgradeStrategy处理websocket请求升级,即将HTTP协议转为Websocket协议,后面的数据都通过WebSocket数据帧进行通信 Reactor Netty的实现类ReactorNettyRequestUpgradeStrategy 调用HttpServerOperations.sendWebsocket进一步处理 握手信息HandshakeInfo 握手协议的建立 创建WebSocketServerHandshakerFactory 创建WebSocketServerHandshaker = WebSocketServerHandshakerFactory.newHandshaker 这里会根据sec-websocket-protocol加载相应的协议 移除reactor.left.httpTrafficHandler WebSocketServerHandshaker.handshake实现握手 实现类有WebSocketServerHandshaker13、WebSocketServerHandshaker08、WebSocketServerHandshaker07、WebSocketServerHandshaker00 新建HandshakeResponse 主要设置了三个Header 移除HttpObjectAggregator pipeline 移除HttpContentCompressor pipeline 在HttpServerCodec pipeline之前添加WebsocketDecoder和WebsocketEncoder 写HandshakeResponse到输出流 握手前后的Netty Pipeline对比如下 注:WebSocketFrameAggregator是在org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#receive方法中添加的 reactor.netty.http.server.HttpServerOperations#withWebsocketSupport websocketHandler.apply实际调用的是org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy#upgrade handler的实际类是EchoWebSocketHandler 所以最终调用了 核心解码在WebSocket08FrameDecoder类 再来看下WebSocket协议格式: 接上面 wsdecoder处理后数据封装为TextWebSocketFrame WebSocketFrameAggregator遍历TextWebSocketFrame数据,逐个发送到下一个pipeline处理 reactiveBridge的handler为ChannelOperationsHandler,调用ReactorNetty.CONNECTION绑定的Connection进行处理 reactor.netty.channel.ChannelOperationsHandler#channelRead 实际调用为FluxReceive.onInboundNext 现在分析下FluxReceive对象是如何生成处理链的 com.example.demo.EchoWebSocketHandler#handle org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#receive com.example.demo.EchoWebSocketHandler#handle org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#send reactor.netty.http.server.HttpServerOperations#withWebsocketSupport WebFlux之Websocket 标签:return inf load data require remote bin sas pool har 原文地址:https://www.cnblogs.com/huiyao/p/14454510.html
协议
需要特别注意的是:虽然WebSocket协议在建立连接时会使用HTTP协议,但这并意味着WebSocket协议是基于HTTP协议实现的
连接(Handshake)
GET /xxx HTTP/1.1
# 主机。
Host: server.example.com
# 协议升级。
Upgrade: websocket
# 连接状态。
Connection: Upgrade
# websocket客户端生成的随机字符。
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
# websocket协议的版本是13。
Sec-WebSocket-Version: 13
数据帧(Message)
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
webflux集成
public class EchoWebSocketHandler implements WebSocketHandler {
public EchoWebSocketHandler() {
}
@Override
public Mono
@Bean
public HandlerMapping handlerMapping() {
Map
@Bean
WebSocketHandlerAdapter webSocketHandlerAdapter(){
return new WebSocketHandlerAdapter();
}
原理解析
1.DispatcherHandler
public Mono
2.WebSocketHandlerAdapter
@Override
public boolean supports(Object handler) {
return WebSocketHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono
3.WebSocketService
public Mono
4.RequestUpgradeStrategy
public Mono
private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request,
@Nullable String protocol, Map
5.HttpServerOperations
public Mono
public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
if (version != null) {
if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
return new WebSocketServerHandshaker13(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch);
} else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
return new WebSocketServerHandshaker08(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch);
} else {
return version.equals(WebSocketVersion.V07.toHttpHeaderValue()) ? new WebSocketServerHandshaker07(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch) : null;
}
} else {
return new WebSocketServerHandshaker00(this.webSocketURL, this.subprotocols, this.maxFramePayloadLength);
}
}
WebsocketServerOperations(String wsUrl, @Nullable String protocols, int maxFramePayloadLength, HttpServerOperations replaced) {
super(replaced);
Channel channel = replaced.channel();
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(wsUrl, protocols, true, maxFramePayloadLength);
this.handshaker = wsFactory.newHandshaker(replaced.nettyRequest);
if (this.handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
this.handshakerResult = null;
} else {
this.removeHandler("reactor.left.httpTrafficHandler");
this.handshakerResult = channel.newPromise();
HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri());
request.headers().set(replaced.nettyRequest.headers());
if (this.channel().pipeline().get("reactor.left.compressionHandler") != null) {
this.removeHandler("reactor.left.compressionHandler");
WebSocketServerCompressionHandler wsServerCompressionHandler = new WebSocketServerCompressionHandler();
try {
wsServerCompressionHandler.channelRead(channel.pipeline().context("reactor.right.reactiveBridge"), request);
this.addHandlerFirst("reactor.left.wsCompressionHandler", wsServerCompressionHandler);
} catch (Throwable var10) {
log.error(ReactorNetty.format(this.channel(), ""), var10);
}
}
this.handshaker.handshake(channel, request, replaced.responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING), this.handshakerResult).addListener((f) -> {
this.markPersistent(false);
});
}
}
6.WebSocketServerHandshaker
public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug("{} WebSocket version {} server handshake", channel, this.version());
}
FullHttpResponse response = this.newHandshakeResponse(req, responseHeaders);
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
if (p.get(HttpContentCompressor.class) != null) {
p.remove(HttpContentCompressor.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
final String encoderName;
if (ctx == null) {
ctx = p.context(HttpServerCodec.class);
if (ctx == null) {
promise.setFailure(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
}
p.addBefore(ctx.name(), "wsdecoder", this.newWebsocketDecoder());
p.addBefore(ctx.name(), "wsencoder", this.newWebSocketEncoder());
encoderName = ctx.name();
} else {
p.replace(ctx.name(), "wsdecoder", this.newWebsocketDecoder());
encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", this.newWebSocketEncoder());
}
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
@Override
public Flux
7.WebsocketHandler绑定
FutureMono.from(ops.handshakerResult)
.doOnEach(signal -> {
if(!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) {
websocketHandler.apply(ops, ops)
.subscribe(new WebsocketSubscriber(ops, signal.getContext()));
}
});
(in, out) -> {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
return handler.handle(session);
})
public Mono
8.websocket解码
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
9.数据帧处理
ChannelOperations, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
ops.onInboundNext(ctx, msg);
}
@Override
public Mono
public Flux
session.receive().doOnNext(WebSocketMessage::retain) // doOnNext 5 FluxPeek
public Mono
websocketHandler.apply(ops, ops).subscribe(new WebsocketSubscriber(ops, signal.getContext()) //9 WebsocketSubscriber