OkHttp Dispatcher的调度过程分析

Dispatcher是负责对okhttp所有的请求进行调度管理的类。可以通过Dispatcher获取,或者取消所有请求。这里指的一个请求就是对应的Call并不是Request,下面出现的所有的请求都是指Call。首先看下调度的整个流程图。接着通过分析跟踪okhttp发送请求的过程来分析Dispatcher是如何维护和调度我们发出的所有请求的。

Dispatcher调度流程

Call其实就是对Request的封装。

OkHttp请求方式

通过okhttp发送请求主要有两种方式。

  1. 通过execute()调用,此时request会被马上发出, 直到返回response或者发生错误前会一直阻塞。可以理解为一个立即执行的同步请求。
  2. 通过enqueue()调用,此时request将会在未来的某个时间点被执行,具体由dispatcher进行调度,这种方式是异步返回结果的。可以理解为会被尽快执行的一个异步请求。

第一种方式

通过execute()调用,一般是这样的

okHttpClient.newCall(request).execute();

通过OkHttpClient的代码可以看出newCall()方法其实是new了一个RealCall,所以这里直接查看RealCallexecute()方法。

@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}

RealCallexecute()方法,这里只看下核心的代码:

@Override public Response execute() throws IOException {
  synchronized (this) {
  //此处除去一些其他代码
  //...
  try {
    //通知Dispatcher这个Call正在被执行,同时将此Call交给Dispatcher
    //Dispatcher可以对此Call进行管理
    client.dispatcher().executed(this);
    //请求的过程,注意这行代码是阻塞的,直到返回result!
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    //此时这个请求已经执行完毕了,通知Dispatcher,不要再维护这个Call了
    client.dispatcher().finished(this);
  }
}

首先注意这行代码

client.dispatcher().executed(this);

它是调用的Dispatcher的executed()方法,注意看方法名是executed并不是execute。接下来去Dispatcher里看下这个方法做了什么。

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

看注释就明白了,这里他只是一个通知的作用,通知Dispatcher我这个call立即要被或者正在被执行,然后Dispatcher会把加入一个名为runningSyncCalls的双端队列中,这个队列中存储着所有的正在运行的同步请求。这样Dispatcher就可以很方便的对所有的同步请求进行管理了。既然有添加,那么也应该有删除,在请求执行完毕时调用了这行代码:

client.dispatcher().finished(this);

通过字面意思理解他应该就是删除的操作,通知Dispatcher这个请求已经被执行完毕了。这里暂时理解为调用finished方法就是将此call从runningSyncCalls中移除,后面会再讨论finished方法的细节。

因为同步请求是被马上执行的,所以Dispatcher能对同步请求进行的调度也只有cancel了。具体可以通过调用Dispatcher.cancelAll()方法进行取消。

所以真正执行请求的只有这行代码了。

Response result = getResponseWithInterceptorChain();

这个方法先不管他,就可以理解为这行代码的执行就是请求从发出到完成的过程。在分析拦截器的实现原理的时候再来讨论。

第二种方式

client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

通过上面我们已经知道这里调用的也是RealCall的enqueue方法,我们直接来看代码:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
  //判断是否已经执行过了
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  //捕获调用栈的信息,用来分析连接泄露
  captureCallStackTrace();
  //封装一个AsyncCall交给Dispatcher调度
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

通过上面的代码可以看出调用enqueue()方法,其实是调用了Dispatcher的enqueue()方法,并且new了一个AsyncCall作为参数。AsyncCall为RealCall的一个内部类,下面继续看AsyncCall类里到底做了什么。

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    
    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    //...
    /**
    *真正执行发出请求的地方,为了看起来清晰,精简了部分代码
    */
    @Override protected void execute() {
      try {
        //请求的过程,注意这里也是阻塞的
        Response response = getResponseWithInterceptorChain();
        //先不管这个Interceptor是干嘛的,下面的代码可以理解为:
        //如果没有被取消,并且没有发生异常,回调onResponse方法。
        //如果发生了异常或者被取消,回调onFailure方法。
        if (retryAndFollowUpInterceptor.isCanceled()) {
          //此请求被取消了,回调onFailure
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          //此请求成功了,回调onResponse
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
          //发生了异常,回调onFailure
          responseCallback.onFailure(RealCall.this, e);
      } finally {
        //通知Dispatcher Call被执行完毕了
        client.dispatcher().finished(this);
      }
    }
  }

可以看到AsyncCall的execute()就是具体请求执行的地方,只不过和上面的RealCall的execute()方法相比,多了回调的处理。retryAndFollowUpInterceptor其实是负责请求超时的重试和重定向操作的,retryAndFollowUpInterceptor.isCanceled()就是用来判断这个请求是否被取消了,这里就不深入展开了。那么AsyncCall的execute()方法是怎么被执行的呢,继续来看AsyncCall的父类NamedRunnable。

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //注意这里调用了execute方法
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

可以看到NamedRunnable其实就是一个实现了Runnable接口的抽象类,并且在run方法中调用了execute()。也就是说AsyncCal其实就是一个Runnable,当这个Runnable被调用的时候execute()方法自然会被调用。看到这里就很清晰了,再回过头来看RealCall的enqueue()中调用的这段代码

//封装一个AsyncCall交给Dispatcher调度
client.dispatcher().enqueue(new AsyncCall(responseCallback));

其实这里的new AsyncCall(responseCallback)就是new了一个封装的Runnable对象,这个Runnable的执行,就是整个请求的发起与回调的过程。好啦,这里搞明白了其实调用Dispatcher().enqueue()方法传递过去的是一个Runnable对象,接下来就去Dispatcher中看下,他对这个Runnable做了什么。

synchronized void enqueue(AsyncCall call) {
  //判断正在执行的异步请求数没有达到阈值,并且每一个Host的请求数也没有达到阈值
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //加入到正在执行队列,并立即执行
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    //加入到等待队列
    readyAsyncCalls.add(call);
  }
}

上面的代码中又出现了两个双端队列,runningAsyncCalls和readyAsyncCalls,加上上面出现的runningSyncCalls可以看到Dispatcher一共维护了3个请求队列,分别是

  1. runningAsyncCalls,正在请求的异步队列
  2. readyAsyncCalls,准备请求的异步队列\等待请求的异步队列
  3. runningSyncCalls,正在请求的同步队列

还出现了一个方法executorService(),接下来看下这个方法是干嘛的。

private ExecutorService executorService;
public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

可以看出来这个方法就是以懒汉的方式创建最大容量为 Integer.MAX_VALUE, 存活等待时间为60S的线程池(其实这里的最大容量并没什么用,因为他的最大容量不会超过runningAsyncCalls的size,即设置的并发请求数的阈值)。executorService().execute(call)就是把这个请求丢进去执行。那么enqueue()方法执行的过程大概就是,首先判断当前正在执行的异步请求总数是否已经达到的阈值(默认为64),针对每个host的同时请求数量是否达到了阈值(默认为5)。如果都没有达到那么将这个请求加入到runningAsyncCalls队列中,马上执行。

否则,会将这个请求加入到readyAsyncCalls中,准备执行。那么readyAsyncCalls中的请求时何时被调用的呢?掐指一算,应该是在runningAsyncCalls中某些请求被执行完毕时,不满足上面的两个条件自然会被调用。是不是呢?接下来看上面一直忽略的Dispatcher的三个finished方法:

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  //异步请求结束时调用此方法
  finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  //同步请求结束时调用此方法
  finished(runningSyncCalls, call, false);
}
/**
*将执行完毕的call从相应的队列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    //从相应的队列中移除相应的call,如果不包含,抛异常
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    //是否需要提升Call的级别
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }
    //如果没有任何需要执行的请求,那么执行idleCallBack
  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

可以看出来不管是异步调用结束,还是同步调用结束,最终都是调用的这个被private修饰的finished方法,都会将完成的call从相应的队列中移除。唯一不同的是调用时传递的promoteCalls参数不同,异步请求结束时传入的是true,同步请求时结束传入的是false。并且会根据这个flag来判断是否执行promoteCalls()方法,接下来看promoteCalls()里做了什么。

/**
*提升call的优先级
*/
private void promoteCalls() {
  //runningAsyncCalls已经满了,不能再加了
  if (runningAsyncCalls.size() >= maxRequests) return; 
  //没有请求在readyAsyncCalls等着被执行
  if (readyAsyncCalls.isEmpty()) return; 
  //遍历准备队列里的请求
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();
    //判断该请求的host是否小于每个host最大请求阈值
    if (runningCallsForHost(call) < maxRequestsPerHost) {
      //将该请求从readyAsyncCalls移除,加入runningAsyncCalls并执行
      i.remove();
      runningAsyncCalls.add(call);
      executorService().execute(call);
    }
    //如果runningAsyncCalls数量已经达到阈值,终止遍历
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
}

可以看出promoteCalls()方法就是试图去readyAsyncCalls中取出Call来加入runningAsyncCalls中执行。所以上面的两个finished方法调用方式的区别也就明晰了。同步调用结束因为并没有涉及到runningAsyncCalls中的任何东西,对runningAsyncCalls没任何影响,所以不需要调用promoteCalls。而异步的调用结束意味着runningAsyncCalls中会出现一个空位值,所以它会调用promoteCalls去尝试从readyAsyncCalls中拉一个进来。

总结

好啦 到这里整个dispatcher的调度分析算是完成了。总结起来其实他就是维护了三个队列,三个队列中包含了正在执行或者将要执行的所有请求。总结起来就是:

  1. 当发送一个异步请求时:如果runningAsyncCalls没达到阈值,那么会将这个请求加入到runningAsyncCalls立即执行,否则会将这个请求加入到readyAsyncCalls中等待执行。当一个异步请求执行完毕时会试图去执行readyAsyncCalls中的请求。
  2. 当发送一个同步请求时:该请求会直接加入到runningSyncCalls中,并且马上开始执行,注意这个执行并不是由Dispatcher调度的。
  3. 所有异步执行的请求都会通过executorService线程池来执行,这是个懒汉方式创建的线程池。
    最后再看下上面的流程图整个过程是不是都清晰了呢!
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341

推荐阅读更多精彩内容