OkHttp设计及源码解析

Okhttp设计与源码解析

本文针对OkHttp的设计及源码作一下解析:

**Okhttp如何做到请求的控制?
Okhttp核心设计Interceptor
Okhttp如何支持https? **

1.核心类库、简单使用、主要流程


(此图片来自网络,作者不详)

从上述UML类图可以看出,Okhttp在设计时采用的门面模式,将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端OkHttpClient统一暴露出来,使用户与内部系统之间达到解构的目的,这种方式在很多开源框架上都有所体现,像Glide、Picaso、ImageLoader、EventBus之类的,都有所体现。

Okhttp使用上和HttpClient基本一致,没啥好说的。下面是基本的get异步请求:

OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder().url("http://www.baidu.com").build();
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.i("ggg",e.getMessage());
        }

        @Override
        public void onResponse(Call call,Response response) throws IOException {
            // NOT UI Thread
            if (response.isSuccessful()) {
                Log.i("ggg",response.code()+"");
                Log.i("ggg",response.headers()+"");
                Log.i("ggg",response.body().string());
            }
            response.close();
        }
    });

有两点是需要注意的:一是Okhttp建议使用一个OkHttpClient,二是Reponse使用完需要关闭。
Okhttp的基本加载流程如下:

上图中的拦截器即Interceptor,后面会着重讲。

2.请求的分发处理,如何做到对请求数量控制前提下做到高并发、低阻塞?

想到并发必然想到多线程,继而就自然想到线程池,通常我们在使用线程池的时候,通过ThreadPoolExecutor构造方法:

 public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

来创建线程池,那么要想做到高并发低阻塞,就意味着maximumPoolSize的值需要比较大,Okhttp在构造线程池时,使用了如下参数:

    public synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

可以看到,maximumPoolSize基本没有限制大小,工作队列上,采用了SynchronousQueue,这是一个没有容量的队列。当执行enqueue的时候,事实上,Okhttp并没有直接将任务交给线程池来执行,而是通过讲任务调度交给了Dispatcher这个类似代理服务器,来进行调度,这样才做到了在控制同一个host请求数量的前提下,达到高并发低阻塞,我们来看看enqueue的实现:

@Override
public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed)
            throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

当调用Realcall的enqueue时,会重新创建一个AsyncCall,AsyncCall是RealCall的一个内部类,因此可以看作是RealCall的一个超集。然后交给了Dispatcher来进行调度:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
        runningAsyncCalls.add(call);
        executorService().execute(call);
    } else {
        readyAsyncCalls.add(call);
    }
}

这段代码逻辑很清晰,如果正在执行请求与同一个Host请求没有达到上限,那么将该次请求加入到正在执行集合中,并交由线程池立刻执行,否则讲该次请求加入到等待队列中,然后我们继续看AsyncCall的执行代码:

        @Override
    protected void execute() {
        //这个标记用于确保回调只得到一次执行
        boolean signalledCallback = false;
        try {
            //这句代码会通过拦截链来获取响应
            Response response = getResponseWithInterceptorChain();
            //请求被取消
            if (retryAndFollowUpInterceptor.isCanceled()) {
                signalledCallback = true;
                responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
                signalledCallback = true;
                responseCallback.onResponse(RealCall.this, response);
            }
        } catch (IOException e) {
            if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
                responseCallback.onFailure(RealCall.this, e);
            }
        } finally {
            //无论执行过程发生了什么,通知Dispatcher该次请求已执行完毕
            client.dispatcher().finished(this);
        }
    }

再看看finished方法:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
        if (!calls.remove(call))
            throw new AssertionError("Call wasn't in-flight!");
        if (promoteCalls)
            //异步请求会执行这里
            promoteCalls();
        runningCallsCount = runningCallsCount();
        idleCallback = this.idleCallback;
    }
    //如果没有正在执行的请求并且设置了空闲回调,那么会执行此分发器的空闲回掉
    if (runningCallsCount == 0 && idleCallback != null) {
        idleCallback.run();
    }
}

如果是异步的请求,那么当这个请求走到finally中的finished方法时,会调用promoteCalls:

    private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests)
        return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty())
        return; // No ready calls to promote.
    //注意这里的等待请求是一个先进先出的队列结构
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();) {
        AsyncCall call = i.next();
        //如果主机请求限制没到上限就会取出执行
        if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
        }
        //一直遍历到达到总数上限为止
        if (runningAsyncCalls.size() >= maxRequests)
            return; // Reached max capacity.
    }
}

也就是说,当一个请求结束时,会马上使用该线程继续执行等待中的请求,避免了线程的闲置与浪费,这里,finally的使用是一个亮点,这在AsyncCall的父类NamedRunable中,同样有所体现:

public abstract class NamedRunnable implements Runnable {
    protected final String name;

    public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
    }

    @Override
    public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
            execute();
        } finally {
            Thread.currentThread().setName(oldName);
        }
    }

    protected abstract void execute();

}

这个类实现了,在该线程执行该Runable时,线程名字与该Runable一致。

3.Okhttp的核心设计,Interceptor

不管是同步还是异步的请求,最终都会走到方法getResponseWithInterceptorChain来获得Reponse,从字面意思,这个方法可以理解为根据拦截链获得Reponse,后面我们会说到怎么自定义拦截器:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //如果使用时我们没有自定义过拦截器,那么这里addAll的集合其实是空的
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        //如果使用时我们没有自定义过拦截器,那么这里addAll的集合其实是空的
        interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //这里构造方法接收五个参数,目前除了拦截器,当前拦截器位置,初始的请求,其它为null。
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest);
    return chain.proceed(originalRequest);
}

这里代码最终创建了一个RealInterceptorChain,即拦截链,并调用了其proceed方法,来获取Reponse,继续跟进去:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
        RealConnection connection) throws IOException {
    if (index >= interceptors.size())
        throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
        throw new IllegalStateException(
                "network interceptor " + interceptor + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
        throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
}

这段代码比较长,但是核心代码其实就三行:

RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

创建一个拦截链,将其index+1,并拿到当前的拦截器,然后调用拦截器的intercept方法,根据我们前面的getResponseWithInterceptorChain方法可以知道,这第一个应该是retryAndFollowUpInterceptor(前提是没有自定义拦截器),即用来作重试与重定向处理的拦截器:

@Override
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
            callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
        if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
        }

        Response response = null;
        boolean releaseConnection = true;
        try {
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            //如果上面这句代码执行时没有发生异常,那么标记置为false,即连接是正常可用的
            releaseConnection = false;
        } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), false, request)) {
                //如果无法恢复连接 直接抛异常结束循环
                throw e.getLastConnectException();
            }
            //可恢复,标记置为false,即连接是正常可用的,不需要释放
            releaseConnection = false;
            //循环继续,即重试
            continue;
        } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, requestSendStarted, request))
                throw e;
            releaseConnection = false;
            continue;
        } finally {
            // We're throwing an unchecked exception. Release any resources.
            if (releaseConnection) {
                //最终如果连接不可用,那么需要释放掉,因为catch块抛出了异常,这里需要在finally处理
                streamAllocation.streamFailed(null);
                streamAllocation.release();
            }
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
            //如果有重定向之前的reponse
            response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build())
                    .build();
        }

        Request followUp = followUpRequest(response);

        if (followUp == null) {
            //如果没有重定向请求
            if (!forWebSocket) {
                streamAllocation.release();
            }
            //大部分情况下,会以返回response结束
            return response;
        }

        closeQuietly(response.body());
        //重定向
        if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
        }

        if (followUp.body() instanceof UnrepeatableRequestBody) {
            streamAllocation.release();
            throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
        }

        if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();
            streamAllocation = new StreamAllocation(client.connectionPool(),
                    createAddress(followUp.url()), callStackTrace);
        } else if (streamAllocation.codec() != null) {
            throw new IllegalStateException("Closing the body of " + response
                    + " didn't close its backing stream. Bad interceptor?");
        }
        //
        request = followUp;
        priorResponse = response;
    }
}

方法也很长,但是流程比较简单,其中有一个StreamAllocation比较重要,但在这里我们先不讲。在try语块中,重新又回到了RealInterceptorChain的proceed方法之中,这个方法执行完毕后,会根据结果做不同处理,如失败、返回Reponse、重试、重定向。值得注意的是,这里的RealInterceptorChain是我们在之前new出来的,index为加1之后的,对应的Interceptor为BridgeInterceptor:

@Override
public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
        MediaType contentType = body.contentType();
        if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
        }

        long contentLength = body.contentLength();
        if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
        } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
        }
    }

    if (userRequest.header("Host") == null) {
        requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
        requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
        transparentGzip = true;
        requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
        requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
        requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);

    if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
        GzipSource responseBody = new GzipSource(networkResponse.body().source());
        Headers strippedHeaders = networkResponse.headers().newBuilder().removeAll("Content-Encoding")
                .removeAll("Content-Length").build();
        responseBuilder.headers(strippedHeaders);
        responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
}

代码虽长,但是很简明,主要是对请求的header、body与响应的header、body做了一些处理,然后继续调用拦截链的proceed方法来处理对request处理后的request,这一次,我们来到了CacheInterceptor:

@Override
public Response intercept(Chain chain) throws IOException {
    //拿到该request的缓存Response
    Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;

    long now = System.currentTimeMillis();
    //CacheStrategy即决定了发网络请求还是使用缓存
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    //如果没有缓存,或者缓存无效,并且请求头没有设置“Cache-Control: only-if-cached”,networkRequest则不为null
    Request networkRequest = strategy.networkRequest;
    //如果缓存存在或者有效,那么cacheResponse不为null
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
        //统计被请求、发出网络请求、使用缓存的数目
        cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
        //缓存虽存在,但是已经失效了,关闭资源
        closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
        //直接504超时
        return new Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(504)
                .message("Unsatisfiable Request (only-if-cached)").body(Util.EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
        //缓存有效,自此,整个拦截链调用就结束了,这里就会回到我们之前的RetryAndFollowUpInterceptor的循环中去了
        return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
    }

    Response networkResponse = null;
    try {
        //缓存无效,那么继续走我们的拦截链
        networkResponse = chain.proceed(networkRequest);
    } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
        }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
        //如果拦截链继续走执行完返回的header有not modified,那么取缓存更新并返回
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                    .cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse))
                    .build();
            networkResponse.body().close();

            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
        } else {
            closeQuietly(cacheResponse.body());
        }
    }
    //如果是新数据,做缓存处理后返回
    Response response = networkResponse.newBuilder().cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse)).build();

    if (HttpHeaders.hasBody(response)) {
        CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
        response = cacheWritingResponse(cacheRequest, response);
    }

    return response;
}

这里注释比较详细,根据注释就能明白整个缓存处理的流程,我们继续走拦截链,当没有缓存或者缓存已经失效时,上述代码会执行:

 networkResponse = chain.proceed(networkRequest);

继续我们的拦截链,这一次,来到了ConnectInterceptor:

@Override
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

终于有个比较短的intercept了,但其实这里代码确是最复杂的。注意上述代码中,从RealInterceptorChain中拿到了一个StreamAllocation,这个我们之前没有提及,事实上,可以将它理解为一个协调器,用来协调connection、stream、call三者之间的关系,在http/2中每个connection可能对应多个stream,而同一个call可能也对应着多个stream。它是在第一个拦截器,即retryAndFollowUpInterceptor被初始化的,我们先看看它时如何初始化的:

   public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;
}

其初始化需要一个连接池、物理地址、路由选择器,新的类比较多,我们用到的时候再解释每个是干嘛用的,这里我们只看看Address这个参数是怎么获取的:

private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
        //如果是https,那么还需要初始化sslsocket、主机认证、证书编解码相关类
        sslSocketFactory = client.sslSocketFactory();
        hostnameVerifier = client.hostnameVerifier();
        certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(), sslSocketFactory,
            hostnameVerifier, certificatePinner, client.proxyAuthenticator(), client.proxy(),
            client.protocols(), client.connectionSpecs(), client.proxySelector());
}

回到ConnectInterceptor的intercept方法:

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

这句代码通过调用streamAllocation的newStream拿到了一个HttpCodec,HttpCodec实质上就是一个辅助类,主要负责在Socket连接中,遵循http的规范来进行流的输入输出,其内部是通过OK IO这个jar包中的相关API实现流的读写操作,OK IO我们就不去详细介绍了,内部通过java nio相关api实现的。只要记住,凡是继承sink的就是输出流,继承source的就是输入流即可,HttpCodec有两个子类,Http1Codec 与Http2Codec,分别对应着http/1.x和http/2.x,我们来看看newStream的具体实现:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
        RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout,
                connectionRetryEnabled, doExtensiveHealthChecks);
        HttpCodec resultCodec = resultConnection.newCodec(client, this);

        synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
        }
    } catch (IOException e) {
        throw new RouteException(e);
    }
}

这里的关键代码在于如何拿到一个RealConnection,ReConnection实质上是一个对Socket连接的封装类,与Sdk的HttpUrlConnection功能上是差不多的,最终获取RealConnection的代码走到了方法:

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
        boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
        if (released)
            throw new IllegalStateException("released");
        if (codec != null)
            throw new IllegalStateException("codec != null");
        if (canceled)
            throw new IOException("Canceled");

        // 尝试去获取之前已经获取的connection
        RealConnection allocatedConnection = this.connection;
        if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
            //如果这个连接存在并且没有新的请求直接拿来用
            return allocatedConnection;
        }

        // 尝试从连接池中获取连接
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
            return connection;
        }

        selectedRoute = route;
    }
    //这里阻塞去获取能用的服务器,如果遍历完没有可用的服务器,会抛出异常结束程序
    if (selectedRoute == null) {
        selectedRoute = routeSelector.next();
    }

    RealConnection result;
    //涉及到连接池的读写,加上同步
    synchronized (connectionPool) {
        if (canceled)
            throw new IOException("Canceled");

        //获取新的服务器地址之后,再次尝试去连接池中找到连接来复用
        Internal.instance.get(connectionPool, address, this, selectedRoute);
        if (connection != null)
            return connection;

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        //这个方法将创建的connection赋值给成员变量connection
        acquire(result);
    }
    //这里释放了锁
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    //涉及到连接池的读写这里又锁上了
    synchronized (connectionPool) {
        // 将连接存入连接池
        Internal.instance.put(connectionPool, result);

        // If another multiplexed connection to the same address was created concurrently, then
        // release this connection and acquire that one.
        //这里意思是如果同时有一个http/2.0的connection存在,那么我们废弃刚刚创建的那个改用那个连接
        //之所以会存在同时创建的connection,注意到这段代码中间握手的代码并没有加上同步,因为它是阻塞代码
        //那么很可能我们这条线程在执行握手的时候,别的线程也在握手中,并且先于我们走到到将连接放入连接池
        // 的代码,之后我们再进来,就可能存在指向同一个地址的连接。
        if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
        }
    }
    //关闭socket的代码放在同步外面,因为这句代码没有必要同步,加快释放锁的时间
    closeQuietly(socket);

    return result;
}

这个方法代码比较复杂,所以注释的比较详细,我们再来看看RealConnection的握手过程

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
        boolean connectionRetryEnabled) {
    //协议已经确定
    if (protocol != null)
        throw new IllegalStateException("already connected");

    RouteException routeException = null;
    //传输层协议(tls)与加密套件,即OkHttpClient的默认数组中的
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    //创建一个传输协议选择器
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    if (route.address().sslSocketFactory() == null) {
        //说明不是https
        if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
            throw new RouteException(
                    new UnknownServiceException("CLEARTEXT communication not enabled for client"));
        }
        String host = route.address().url().host();
        //没看懂,因为isCleartextTrafficPermitted永远返回的true
        if (!Platform.get().isCleartextTrafficPermitted(host)) {
            throw new RouteException(new UnknownServiceException(
                    "CLEARTEXT communication to " + host + " not permitted by network security policy"));
        }
    }

    while (true) {
        try {
            //如果是https而又是通过http代理访问的,最终都会走到connnectSocket
            if (route.requiresTunnel()) {
                connectTunnel(connectTimeout, readTimeout, writeTimeout);
            } else {
                connectSocket(connectTimeout, readTimeout);
            }
            establishProtocol(connectionSpecSelector);
            break;
        } catch (IOException e) {
            closeQuietly(socket);
            closeQuietly(rawSocket);
            socket = null;
            rawSocket = null;
            source = null;
            sink = null;
            handshake = null;
            protocol = null;
            http2Connection = null;

            if (routeException == null) {
                routeException = new RouteException(e);
            } else {
                routeException.addConnectException(e);
            }

            if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
                throw routeException;
            }
        }
    }

    if (http2Connection != null) {
        synchronized (connectionPool) {
            allocationLimit = http2Connection.maxConcurrentStreams();
        }
    }
}

最终,代码走到了connectSock中真正建立了连接:

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket() : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
        //这里,便真正地建立了连接,Platform用于平台适配
        Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
        ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
        ce.initCause(e);
        throw ce;
    }
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
}

这里主要做了三件事,创建了Socket、拿到了Socket建立连接后的输入流、输出流、有了这三者,和服务器的通讯就建立起来了,然后便通过establishProtocol(connectionSpecSelector),建立起与服务器进行通讯的协议:

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    if (route.address().sslSocketFactory() == null) {
        //都不是https,那只能1.x了
        protocol = Protocol.HTTP_1_1;
        socket = rawSocket;
        return;
    }
    
    connectTls(connectionSpecSelector);
    //如果支持http2.0协议,那么创建一个http2Connection,并开启,这里实际是开启一个线程去不断地进行读操作
    if (protocol == Protocol.HTTP_2) {
        socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
        http2Connection = new Http2Connection.Builder(true)
                .socket(socket, route.address().url().host(), source, sink).listener(this).build();
        http2Connection.start();
    }
}

我们再看看connectTls:

private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    Address address = route.address();
    SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
    boolean success = false;
    SSLSocket sslSocket = null;
    try {
        // Create the wrapper over the connected socket.
        sslSocket = (SSLSocket) sslSocketFactory.createSocket(rawSocket, address.url().host(),
                address.url().port(), true /* autoClose */);

        // Configure the socket's ciphers, TLS versions, and extensions.
        //这里是去验证服务器握手传递过来的数据,包括公钥,传输协议版本等信息
        ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
        if (connectionSpec.supportsTlsExtensions()) {
            Platform.get().configureTlsExtensions(sslSocket, address.url().host(), address.protocols());
        }

        // Force handshake. This can throw!
        sslSocket.startHandshake();
        //通过握手,拿到Session,并将传输协议版本,支持的解密套件,及SSL证书解析出来
        Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());

        // Verify that the socket's certificates are acceptable for the target host.
        if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
            X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
            throw new SSLPeerUnverifiedException(
                    "Hostname " + address.url().host() + " not verified:" + "\n    certificate: "
                            + CertificatePinner.pin(cert) + "\n    DN: " + cert.getSubjectDN().getName()
                            + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
        }

        // Check that the certificate pinner is satisfied by the certificates presented.
        //检查该证书是否有效,注意这里如果没有检查没通过将会抛出AssertException
        //结束程序,因此,访问自签名证书且没有将证书配置到OkHttpClient的https网站,将会导致程序crash
        address.certificatePinner().check(address.url().host(), unverifiedHandshake.peerCertificates());
        //到这里证书验证通过
        // Success! Save the handshake and the ALPN protocol.
        String maybeProtocol = connectionSpec.supportsTlsExtensions()
                ? Platform.get().getSelectedProtocol(sslSocket) : null;
        socket = sslSocket;
        source = Okio.buffer(Okio.source(socket));
        sink = Okio.buffer(Okio.sink(socket));
        handshake = unverifiedHandshake;
        //拿到服务器支持的协议
        protocol = maybeProtocol != null ? Protocol.get(maybeProtocol) : Protocol.HTTP_1_1;
        success = true;
    } catch (AssertionError e) {
        if (Util.isAndroidGetsocknameError(e))
            throw new IOException(e);
        throw e;
    } finally {
        if (sslSocket != null) {
            Platform.get().afterHandshake(sslSocket);
        }
        if (!success) {
            closeQuietly(sslSocket);
        }
    }
}

到这里,整个握手的阶段就已经完成了,也就是RealConnection创建成功,我们回到方法streamAllocation.newStream中,现在我们有了Connnection来初始化一个HttpCodec,来看看它的初始化过程,RealConnection通过newCodec来进行初始化HttpCodec:

 public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    //根据协议版本的不同,创建不同的HttpCodec
    if (http2Connection != null) {
        return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
        socket.setSoTimeout(client.readTimeoutMillis());
        source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
        sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
        return new Http1Codec(client, streamAllocation, source, sink);
    }
}

还记得我们getResponseWithInterceptorChain方法中创建的第一个RealInterceptorChain,那时大多数参数还是null,拦截链执行到这儿,总算是把参数都凑齐了,我们继续回到ConnectInterceptor的intercept方法中,由于隔得远,这里干脆再贴下代码:

@Override
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

终于是来到了拦截链的最后一环,这一次的proceed将对应着会调用到CallServerInterceptor的intercept方法:

@Override
public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    //写入请求头
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
        // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
        // Continue" response before transmitting the request body. If we don't get that, return what
        // we did get (such as a 4xx response) without ever transmitting the request body.
        //100-continue用于客户端在发送POST数据给服务器前,征询服务器情况,看服务器是否处
        // 理POST的数据,如果不处理,客户端则不上传POST数据,如果处理,则POST上传数据。在现实应
        // 用中,通过在POST大数据时,才会使用100-continue协议
        if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            responseBuilder = httpCodec.readResponseHeaders(true);
        }

        // Write the request body, unless an "Expect: 100-continue" expectation failed.
        //如果没有100-continue头,那么会执行这里
        if (responseBuilder == null) {
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
        }
    }
    //到这里 整个request数据就写进去了,下面这句代码flush了一下
    httpCodec.finishRequest();

    if (responseBuilder == null) {
        //这里builder会记录下这次响应的头信息
        responseBuilder = httpCodec.readResponseHeaders(false);
    }
    //创建一个响应,依次为绑定request,获取握手的session数据,请求发出去时间,接收到响应时间
    Response response = responseBuilder.request(request)
            .handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis()).build();

    int code = response.code();
    if (forWebSocket && code == 101) {
        //切换协议,这里需要确保拿到的不是空的body
        response = response.newBuilder().body(Util.EMPTY_RESPONSE).build();
    } else {
        //注意这里responsebody是一个输入流,意味着可能是大文件
        response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();
    }
    //响应头包含close,标识关闭,此时可以复用Connection
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
        streamAllocation.noNewStreams();
    }
    //响应成功,标识无内容,但是又明明有内容
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
        throw new ProtocolException(
                "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
}

代码不是很复杂,主要是一些读流写流的操作罢了,如果以上环节都没有出现问题,那么这个Response会一路return到拦截链的开始,继而通过方法getResponseWithInterceptorChain()返回,从而我们在AsyncCall中的回调得到了执行,自此,整个Interceptor的流程就结束了。我们将整个流程作出草图:

整个设计是非常优雅的,可以说是责任链模式的完美体现,又有点类似AOP设计思想。不但将从Request获取Reponse整个过程从RealCall中剥离出来,而且分层设计,每个拦截器均有可能完成这一工作,每个拦截器自行决定是自己完成整个工作,还是完成部分,然后交由下一个拦截器来完成,各个拦截器之间代码独立,各司其职,耦合度极低,即使需要替换某一个拦截器,也完全不会影响到其他拦截器的运行,有点类似于工厂的流水线。
责任链模式在Android中的体现即触摸事件的传递机制,这里我们不再展开,下面说说AOP,面向切面编程的思想。

面向切面编程(AOP是Aspect Oriented Program的首字母缩写) ,我们知道,面向对象的特点是继承、多态和封装。而封装就要求将功能分散到不同的对象中去,这在软件设计中往往称为职责分配。实际上也就是说,让不同的类设计不同的方法。这样代码就分散到一个个的类中去了。这样做的好处是降低了代码的复杂程度,使类可重用。 但是人们也发现,在分散代码的同时,也增加了代码的重复性。什么意思呢?比如说,我们在两个类中,可能都需要在每个方法中做日志。按面向对象的设计方法,我们就必须在两个类的方法中都加入日志的内容。也许他们是完全相同的,但就是因为面向对象的设计让类与类之间无法联系,而不能将这些重复的代码统一起来。 也许有人会说,那好办啊,我们可以将这段代码写在一个独立的类独立的方法里,然后再在这两个类中调用。但是,这样一来,这两个类跟我们上面提到的独立的类就有耦合了,它的改变会影响这两个类。那么,有没有什么办法,能让我们在需要的时候,随意地加入代码呢?这种在运行时,动态地将代码切入到类的指定方法、指定位置上的编程思想就是面向切面的编程。 (注:本段文字来自知乎,链接https://www.zhihu.com/question/24863332/answer/48376158,著作权归作者所有)

虽然真正的AOP是与动态代理密不可分的,但为什么说这里类似AOP的设计思想呢?从上述的拦截链,我们可以发现,Okhttp提供了两个位置供开发者去自定义拦截器,一个是拦截链的开头,一个是与服务器数据交互之前,为什么不是其他位置呢?事实上,Okhttp将整个拦截链流程,分为两个切面,连接层与通讯层,从而,开发者自定义拦截器的地方恰好就是连接点:


虽然在使用Okhttp时,我们一般比较少去对拦截器进行自定义,但是可以看到,Okhttp早就为这种扩展预留好了接口,事实上,包括java的线程池、android的asynctask对aop的设计思路均有一定体现,从而做到了以不变应万变,极大地提高了程序的扩展性,在Okhttp的wiki上可以找到这样一段话:
Since making this change we’ve been able to simplify OkHttp’s internals substantially. The code is faster and easier to understand: the whole thing is just a stack of built-in interceptors.

下面贴两个Okhttp wiki上的两个自定义的Interceptor,打印Log:

class LoggingInterceptor implements Interceptor {

    @Override public Response intercept(Interceptor.Chain chain) throws IOException {
        Request request = chain.request();

        long t1 = System.nanoTime();
        logger.info(String.format("Sending request %s on %s%n%s",
        request.url(), chain.connection(), request.headers()));

        Response response = chain.proceed(request);

        long t2 = System.nanoTime();
        logger.info(String.format("Received response for %s in %.1fms%n%s",
        response.request().url(), (t2 - t1) / 1e6d, response.headers()));

        return response;
    }
}

Gzip压缩:

final class GzipRequestInterceptor implements Interceptor {

    @Override public Response intercept(Interceptor.Chain chain) throws IOException {
        Request originalRequest = chain.request();
        if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
            return chain.proceed(originalRequest);
        }

        Request compressedRequest = originalRequest.newBuilder()
        .header("Content-Encoding", "gzip")
        .method(originalRequest.method(), gzip(originalRequest.body()))
        .build();
        return chain.proceed(compressedRequest);
    }

    private RequestBody gzip(final RequestBody body) {
        return new RequestBody() {
            
            @Override public MediaType contentType() {
                 return body.contentType();
            }

            @Override public long contentLength() {
                return -1; // We don't know the compressed length in advance!
            }

            @Override public void writeTo(BufferedSink sink) throws IOException {
                BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
                body.writeTo(gzipSink);
                gzipSink.close();
            }
        };
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容