本文要求:大概了解使用方法,知道如何通过okhttp建立websocket,四个回调的名字最好大概记住,因为后面会直接使用。
websocket协议有两种消息类型, 被称为frame, 帧,一种是消息类, 就是我们普通通信使用的, 一种是控制通信用的,比如关闭用的close帧,心跳相关的ping帧和pong帧。
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
而发射的概念就是write a frame,到服务端, 接受呢就是响应的read a frame
两者最为关键的源码就是runWriter()
和 loopReader()
这两个函数.
本文先从程序的入口开始查看设计和逻辑, 最后再从最贴近业务的,暴露给应用层使用的 发射,接受,以及回调 来总结归纳一下。
可以说市面上大部分的博客都是写的人都是一知半解,写出来的更是千篇一律,大部分仅仅讲解了demo级别的使用方法,缺失了很多情况的处理, 以及重要的注意点。
1. 入口
val client = OkHttpClient.Builder()
client.newWebSocket(request, object : WebSocketListener() {}
还是从OkHttpClient开始的
/**
* Uses {@code request} to connect a new web socket.
*/
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
webSocket.connect(this);
return webSocket;
}
newWebSocket()
函数里完成的事情:
- 初始化RealWebSocket
- 调用RealWebSocket$connect()
1.1初始化
public RealWebSocket(Request request, WebSocketListener listener, Random random,
long pingIntervalMillis) {
if (!"GET".equals(request.method())) {
throw new IllegalArgumentException("Request must be GET: " + request.method());
}
this.originalRequest = request;
this.listener = listener;
this.random = random;
this.pingIntervalMillis = pingIntervalMillis;
byte[] nonce = new byte[16];
random.nextBytes(nonce);
this.key = ByteString.of(nonce).base64();
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
这里request和random用来建立连接,把http1升级成websocket。
this.pingIntervalMillis
是指定心跳的间隔, OkHttpClient.Builder()构建的时候默认是0,默认的话,就不会发射ping帧了,后面会提到。
还有重要的一步是初始化了this.writerRunnable
,这个runnable,是后续所有write操作(write就是客户端发送的操作,read就是接受服务端发来的操作)最终调用的任务(runWriter)
这个任务是无限循环writeOneFrame()
函数
注释基本写的非常的清楚了:
尝试从queue里取出一帧,然后send他,pong帧优先级高于message帧和close帧,会优先发射,如果一个调用者入队了一个被pong帧跟随的message帧,那么会发射pong帧,让message帧跟随, pong帧当他们入队的时候, 永远会被下一个执行。
如果queue已经空了, 或者websokcet断开连接了, 就不能send帧了。
这样会什么都不做,只是返回false,否则会返回true且立刻再次调用本方法直到返回false。
这个方法只能呗writter thread调用, 可能只有一个线程在同一时间调用这个方法(应该是一定的吧, 因为会检查是否有锁)。
看下源码其实pongQueue和messageAndCloseQueue是两个queue,先处理pongQueue的逻辑
如果poll出了pong帧,就直接
writer.writePong(pong);
发射如果没有poll出pong帧,那么处理messageAndCloseQueue
1.1.1处理messageAndCloseQueue
如果没有pong的话, messageAndCloseQueue.poll()取出消息messageOrClose
if(messageOrClose is null)
return false队列空了, 这个while死循环就断掉了
if(messageOrClose is Close帧)
if(receivedCloseCode!= -1,也就这是从服务器接收到的close帧)
关闭excutor
else 证明是自己发射的close帧
优雅的关闭,60秒后调用call.cancel
不管优不优雅是不是自己enqueue的Close帧,都会执行下面的代码
发射close帧;
如果是服务端接受到的close帧那么会回调onClosed()方法,通知调用者;
最后return true
/** The close code from the peer, or -1 if this web socket has not yet read a close frame. */
private int receivedCloseCode = -1;
初始化的代码基本就这些了,但却非常的重要,因为他定义了所有与发射相关的逻辑。
1.2connect()函数
public void connect(OkHttpClient client) {
client = client.newBuilder()
.eventListener(EventListener.NONE)
.protocols(ONLY_HTTP1)
.build();
final Request request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
call = Internal.instance.newWebSocketCall(client, request);
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
try {
checkResponse(response);
} catch (ProtocolException e) {
failWebSocket(e, response);
closeQuietly(response);
return;
}
// Promote the HTTP streams into web socket streams.
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
streamAllocation.noNewStreams(); // Prevent connection pooling!
Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
// Process all web socket messages.
try {
listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
initReaderAndWriter(name, streams);
streamAllocation.connection().socket().setSoTimeout(0);
loopReader();
} catch (Exception e) {
failWebSocket(e, null);
}
}
@Override public void onFailure(Call call, IOException e) {
failWebSocket(e, null);
}
});
}
connect函数完成了websocket协议要求的升级过程(如果不了解,请简单了解下websocket协议),可以看到他添加了一些请求头, 并且调用了一次http1的握手协议, 如果失败的话,回调failWebSocket给用户通知。
主要看下升级成功的逻辑, 此时已经是websocket协议了, 可以看到上来就检查response,确认无误后,回调onOpen通知用户。下面是几件重要的步骤
- 初始化reader和writer
- loopReader()死循环读服务端发来的消息
- 设置超时时间为0,证明是阻塞IO