Dispatcher是负责对okhttp所有的请求进行调度管理的类。可以通过Dispatcher获取,或者取消所有请求。这里指的一个请求就是对应的Call,并不是指Request,下面出现的所有的请求都是指Call。首先看下调度的整个流程图。接着通过分析跟踪okhttp发送请求的过程来分析Dispatcher是如何维护和调度我们发出的所有请求的。
Call其实就是对Request的封装。
OkHttp请求方式
通过okhttp发送请求主要有两种方式。
- 通过
execute()
调用,此时request会被马上发出, 直到返回response或者发生错误前会一直阻塞。可以理解为一个立即执行的同步请求。 - 通过
enqueue()
调用,此时request将会在未来的某个时间点被执行,具体由dispatcher进行调度,这种方式是异步返回结果的。可以理解为会被尽快执行的一个异步请求。
第一种方式
通过execute()
调用,一般是这样的
okHttpClient.newCall(request).execute();
通过OkHttpClient的代码可以看出newCall()
方法其实是new了一个RealCall,所以这里直接查看RealCall的execute()
方法。
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall的execute()
方法,这里只看下核心的代码:
@Override public Response execute() throws IOException {
synchronized (this) {
//此处除去一些其他代码
//...
try {
//通知Dispatcher这个Call正在被执行,同时将此Call交给Dispatcher
//Dispatcher可以对此Call进行管理
client.dispatcher().executed(this);
//请求的过程,注意这行代码是阻塞的,直到返回result!
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
//此时这个请求已经执行完毕了,通知Dispatcher,不要再维护这个Call了
client.dispatcher().finished(this);
}
}
首先注意这行代码
client.dispatcher().executed(this);
它是调用的Dispatcher的executed()
方法,注意看方法名是executed并不是execute。接下来去Dispatcher里看下这个方法做了什么。
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
看注释就明白了,这里他只是一个通知的作用,通知Dispatcher我这个call立即要被或者正在被执行,然后Dispatcher会把加入一个名为runningSyncCalls的双端队列中,这个队列中存储着所有的正在运行的同步请求。这样Dispatcher就可以很方便的对所有的同步请求进行管理了。既然有添加,那么也应该有删除,在请求执行完毕时调用了这行代码:
client.dispatcher().finished(this);
通过字面意思理解他应该就是删除的操作,通知Dispatcher这个请求已经被执行完毕了。这里暂时理解为调用finished方法就是将此call从runningSyncCalls中移除,后面会再讨论finished方法的细节。
因为同步请求是被马上执行的,所以Dispatcher能对同步请求进行的调度也只有cancel了。具体可以通过调用Dispatcher.cancelAll()
方法进行取消。
所以真正执行请求的只有这行代码了。
Response result = getResponseWithInterceptorChain();
这个方法先不管他,就可以理解为这行代码的执行就是请求从发出到完成的过程。在分析拦截器的实现原理的时候再来讨论。
第二种方式
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
通过上面我们已经知道这里调用的也是RealCall的enqueue方法,我们直接来看代码:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//判断是否已经执行过了
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//捕获调用栈的信息,用来分析连接泄露
captureCallStackTrace();
//封装一个AsyncCall交给Dispatcher调度
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
通过上面的代码可以看出调用enqueue()
方法,其实是调用了Dispatcher的enqueue()
方法,并且new了一个AsyncCall作为参数。AsyncCall为RealCall的一个内部类,下面继续看AsyncCall类里到底做了什么。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
//...
/**
*真正执行发出请求的地方,为了看起来清晰,精简了部分代码
*/
@Override protected void execute() {
try {
//请求的过程,注意这里也是阻塞的
Response response = getResponseWithInterceptorChain();
//先不管这个Interceptor是干嘛的,下面的代码可以理解为:
//如果没有被取消,并且没有发生异常,回调onResponse方法。
//如果发生了异常或者被取消,回调onFailure方法。
if (retryAndFollowUpInterceptor.isCanceled()) {
//此请求被取消了,回调onFailure
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//此请求成功了,回调onResponse
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
//发生了异常,回调onFailure
responseCallback.onFailure(RealCall.this, e);
} finally {
//通知Dispatcher Call被执行完毕了
client.dispatcher().finished(this);
}
}
}
可以看到AsyncCall的execute()
就是具体请求执行的地方,只不过和上面的RealCall的execute()
方法相比,多了回调的处理。retryAndFollowUpInterceptor其实是负责请求超时的重试和重定向操作的,retryAndFollowUpInterceptor.isCanceled()
就是用来判断这个请求是否被取消了,这里就不深入展开了。那么AsyncCall的execute()
方法是怎么被执行的呢,继续来看AsyncCall的父类NamedRunnable。
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//注意这里调用了execute方法
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
可以看到NamedRunnable其实就是一个实现了Runnable接口的抽象类,并且在run方法中调用了execute()
。也就是说AsyncCal其实就是一个Runnable,当这个Runnable被调用的时候execute()
方法自然会被调用。看到这里就很清晰了,再回过头来看RealCall的enqueue()
中调用的这段代码
//封装一个AsyncCall交给Dispatcher调度
client.dispatcher().enqueue(new AsyncCall(responseCallback));
其实这里的new AsyncCall(responseCallback)
就是new了一个封装的Runnable对象,这个Runnable的执行,就是整个请求的发起与回调的过程。好啦,这里搞明白了其实调用Dispatcher().enqueue()
方法传递过去的是一个Runnable对象,接下来就去Dispatcher中看下,他对这个Runnable做了什么。
synchronized void enqueue(AsyncCall call) {
//判断正在执行的异步请求数没有达到阈值,并且每一个Host的请求数也没有达到阈值
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//加入到正在执行队列,并立即执行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//加入到等待队列
readyAsyncCalls.add(call);
}
}
上面的代码中又出现了两个双端队列,runningAsyncCalls和readyAsyncCalls,加上上面出现的runningSyncCalls可以看到Dispatcher一共维护了3个请求队列,分别是
- runningAsyncCalls,正在请求的异步队列
- readyAsyncCalls,准备请求的异步队列\等待请求的异步队列
- runningSyncCalls,正在请求的同步队列
还出现了一个方法executorService()
,接下来看下这个方法是干嘛的。
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出来这个方法就是以懒汉的方式创建最大容量为 Integer.MAX_VALUE
, 存活等待时间为60S的线程池(其实这里的最大容量并没什么用,因为他的最大容量不会超过runningAsyncCalls的size,即设置的并发请求数的阈值)。executorService().execute(call)
就是把这个请求丢进去执行。那么enqueue()
方法执行的过程大概就是,首先判断当前正在执行的异步请求总数是否已经达到的阈值(默认为64),针对每个host的同时请求数量是否达到了阈值(默认为5)。如果都没有达到那么将这个请求加入到runningAsyncCalls队列中,马上执行。
否则,会将这个请求加入到readyAsyncCalls中,准备执行。那么readyAsyncCalls中的请求时何时被调用的呢?掐指一算,应该是在runningAsyncCalls中某些请求被执行完毕时,不满足上面的两个条件自然会被调用。是不是呢?接下来看上面一直忽略的Dispatcher的三个finished方法:
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
//异步请求结束时调用此方法
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
//同步请求结束时调用此方法
finished(runningSyncCalls, call, false);
}
/**
*将执行完毕的call从相应的队列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//从相应的队列中移除相应的call,如果不包含,抛异常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//是否需要提升Call的级别
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果没有任何需要执行的请求,那么执行idleCallBack
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
可以看出来不管是异步调用结束,还是同步调用结束,最终都是调用的这个被private修饰的finished方法,都会将完成的call从相应的队列中移除。唯一不同的是调用时传递的promoteCalls参数不同,异步请求结束时传入的是true,同步请求时结束传入的是false。并且会根据这个flag来判断是否执行promoteCalls()
方法,接下来看promoteCalls()
里做了什么。
/**
*提升call的优先级
*/
private void promoteCalls() {
//runningAsyncCalls已经满了,不能再加了
if (runningAsyncCalls.size() >= maxRequests) return;
//没有请求在readyAsyncCalls等着被执行
if (readyAsyncCalls.isEmpty()) return;
//遍历准备队列里的请求
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//判断该请求的host是否小于每个host最大请求阈值
if (runningCallsForHost(call) < maxRequestsPerHost) {
//将该请求从readyAsyncCalls移除,加入runningAsyncCalls并执行
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//如果runningAsyncCalls数量已经达到阈值,终止遍历
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
可以看出promoteCalls()
方法就是试图去readyAsyncCalls中取出Call来加入runningAsyncCalls中执行。所以上面的两个finished方法调用方式的区别也就明晰了。同步调用结束因为并没有涉及到runningAsyncCalls中的任何东西,对runningAsyncCalls没任何影响,所以不需要调用promoteCalls。而异步的调用结束意味着runningAsyncCalls中会出现一个空位值,所以它会调用promoteCalls去尝试从readyAsyncCalls中拉一个进来。
总结
好啦 到这里整个dispatcher的调度分析算是完成了。总结起来其实他就是维护了三个队列,三个队列中包含了正在执行或者将要执行的所有请求。总结起来就是:
- 当发送一个异步请求时:如果runningAsyncCalls没达到阈值,那么会将这个请求加入到runningAsyncCalls立即执行,否则会将这个请求加入到readyAsyncCalls中等待执行。当一个异步请求执行完毕时会试图去执行readyAsyncCalls中的请求。
- 当发送一个同步请求时:该请求会直接加入到runningSyncCalls中,并且马上开始执行,注意这个执行并不是由Dispatcher调度的。
- 所有异步执行的请求都会通过executorService线程池来执行,这是个懒汉方式创建的线程池。
最后再看下上面的流程图整个过程是不是都清晰了呢!