OkHttp Design and Source Code Analysis
This article walks through OkHttp's design and source code, focusing on three questions:
**How does OkHttp control and dispatch requests?
The Interceptor, OkHttp's core design
How does OkHttp support HTTPS?**
1. Core classes, basic usage, and the main flow
(UML class diagram of OkHttp omitted; image from the web, original author unknown)
As the UML class diagram above shows, OkHttp's design uses the facade pattern: the complexity of the whole system is hidden behind a single entry point, OkHttpClient, which exposes the subsystem interfaces in one place and decouples callers from the internals. The same approach shows up in many open-source frameworks, such as Glide, Picasso, ImageLoader, and EventBus.
Using OkHttp feels much like using HttpClient, so there is not much to say about the API itself. Here is a basic asynchronous GET request:
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://www.baidu.com").build();
client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.i("ggg", e.getMessage());
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        // NOT UI Thread
        if (response.isSuccessful()) {
            Log.i("ggg", response.code() + "");
            Log.i("ggg", response.headers() + "");
            Log.i("ggg", response.body().string());
        }
        response.close();
    }
});
Two points are worth noting: first, OkHttp recommends sharing a single OkHttpClient instance; second, a Response must be closed once you are done with it.
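To make both points concrete, here is a minimal sketch (not taken from the OkHttp source) of a synchronous request that reuses one client and closes the Response automatically; Response implements Closeable, so try-with-resources works:
OkHttpClient client = new OkHttpClient(); // reuse this single instance across the app
Request request = new Request.Builder().url("http://www.baidu.com").build();
// execute() blocks the calling thread, so don't call it on the UI thread
try (Response response = client.newCall(request).execute()) {
    if (response.isSuccessful()) {
        Log.i("ggg", response.body().string());
    }
} catch (IOException e) {
    Log.i("ggg", e.getMessage());
}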
OkHttp's basic request flow is as follows (flow diagram omitted).
The interceptors in that diagram are Interceptors, which are covered in detail later.
2. Request dispatching: how does OkHttp stay highly concurrent and non-blocking while still capping the number of requests?
Concurrency brings threads to mind, and threads bring thread pools to mind. A thread pool is normally created through the ThreadPoolExecutor constructor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)
To achieve high concurrency with low blocking, maximumPoolSize needs to be large. OkHttp builds its pool with the following parameters:
public synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}
As you can see, maximumPoolSize is effectively unbounded, and the work queue is a SynchronousQueue, a queue with no capacity: a submitted task is either handed straight to an idle thread or runs on a newly created one. When enqueue is called, OkHttp does not hand the task to the thread pool directly; instead, scheduling is delegated to the Dispatcher, which acts as a coordinator. That is how OkHttp achieves high concurrency and low blocking while still capping the number of requests to a single host. Let's look at the enqueue implementation:
@Override
public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
When RealCall.enqueue is called, a new AsyncCall is created. AsyncCall is a (non-static) inner class of RealCall, so it holds a reference to the enclosing RealCall and can use all of its members. The call is then handed to the Dispatcher for scheduling:
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
        runningAsyncCalls.add(call);
        executorService().execute(call);
    } else {
        readyAsyncCalls.add(call);
    }
}
The logic here is clear: if neither the overall running-request limit nor the per-host limit has been reached, the call is added to the running set and handed to the thread pool for immediate execution; otherwise it is added to the ready (waiting) queue. Next, let's look at AsyncCall's execute code:
@Override
protected void execute() {
// This flag ensures the callback is invoked only once
boolean signalledCallback = false;
try {
// This line obtains the response by running the interceptor chain
Response response = getResponseWithInterceptorChain();
// The request was canceled
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// Whatever happened during execution, tell the Dispatcher that this call has finished
client.dispatcher().finished(this);
}
}
Now let's look at the finished method:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call))
throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls)
// Asynchronous requests take this branch
promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
// If no calls are running and an idle callback was set, invoke the dispatcher's idle callback
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
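As an aside, the idleCallback seen above can be supplied by the caller; a minimal sketch, assuming you simply want to know when the dispatcher has gone idle:
OkHttpClient client = new OkHttpClient();
// Runs whenever the number of running calls drops to zero
client.dispatcher().setIdleCallback(new Runnable() {
    @Override
    public void run() {
        Log.i("ggg", "dispatcher idle: no calls are running");
    }
});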
For an asynchronous request, when execution reaches the finished call in the finally block above, promoteCalls is invoked:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests)
return; // Already running max capacity.
if (readyAsyncCalls.isEmpty())
return; // No ready calls to promote.
// Note that the ready calls form a FIFO queue
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();) {
AsyncCall call = i.next();
// If the per-host limit hasn't been reached, take the call out and execute it
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
// Keep iterating until the overall limit is reached
if (runningAsyncCalls.size() >= maxRequests)
return; // Reached max capacity.
}
}
In other words, as soon as one request finishes, a waiting request is promoted and submitted to the pool right away, so threads are reused instead of sitting idle. The use of finally here is a nice touch, and the same idiom appears in AsyncCall's parent class NamedRunnable:
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
This class ensures that while the Runnable is executing, the thread carries the Runnable's name, and that the original thread name is restored afterwards.
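The limits the Dispatcher enforces above, maxRequests and maxRequestsPerHost, default to 64 and 5 and can be tuned. A minimal sketch, with values chosen purely for illustration:
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(128);       // cap on concurrently running async calls overall
dispatcher.setMaxRequestsPerHost(10); // cap on concurrently running calls to a single host

OkHttpClient client = new OkHttpClient.Builder()
        .dispatcher(dispatcher)
        .build();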
3. The Interceptor, OkHttp's core design
Whether a request is synchronous or asynchronous, it eventually reaches getResponseWithInterceptorChain to obtain its Response. As the name suggests, this method produces the Response by running an interceptor chain; we will cover custom interceptors later:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// If no custom interceptors were configured, the collection added here is empty
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// If no custom interceptors were configured, the collection added here is empty
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
// Besides the interceptor list, the current index (0), and the original request, the remaining constructor arguments (StreamAllocation, HttpCodec, RealConnection) are null for now
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest);
return chain.proceed(originalRequest);
}
This code ultimately creates a RealInterceptorChain, the interceptor chain, and calls its proceed method to obtain the Response. Let's follow it in:
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size())
throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException(
"network interceptor " + interceptor + " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
The method is fairly long, but the core is really just these three lines:
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
A new chain is created with index + 1, the interceptor at the current index is fetched, and its intercept method is called with that new chain. From getResponseWithInterceptorChain above we know that the first interceptor is retryAndFollowUpInterceptor (assuming no custom interceptors), the interceptor responsible for retries and redirects:
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
// If the line above didn't throw, the connection is fine, so don't release it
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
// The connection can't be recovered: rethrow and end the loop
throw e.getLastConnectException();
}
// Recoverable: the connection is still usable, no need to release it
releaseConnection = false;
// Continue the loop, i.e. retry
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request))
throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
// An exception is propagating out of the catch blocks, so release the resources here in finally
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
// Attach the response from before the redirect, if any (stripped of its body)
response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null) {
// No follow-up request (no retry or redirect needed)
if (!forWebSocket) {
streamAllocation.release();
}
// In most cases the loop ends here by returning the response
return response;
}
closeQuietly(response.body());
// Handle the follow-up (redirect)
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
This method is also long, but the flow is fairly simple. The StreamAllocation created here is important, but we will come back to it later. Inside the try block we re-enter RealInterceptorChain.proceed, and once it returns, the result is handled differently depending on the outcome: failure, returning the Response, retrying, or following a redirect. Note that this RealInterceptorChain is the one created earlier with index + 1, so the interceptor it dispatches to next is BridgeInterceptor:
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);
if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder().removeAll("Content-Encoding")
.removeAll("Content-Length").build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
The code is long but straightforward: it adjusts the request headers and body and the response headers and body, then calls proceed on the chain with the rewritten request. This time we arrive at CacheInterceptor:
@Override
public Response intercept(Chain chain) throws IOException {
// Look up a cached Response for this request
Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;
long now = System.currentTimeMillis();
// CacheStrategy decides whether to hit the network, use the cache, or both
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
// networkRequest is non-null if there is no usable cache entry and the request does not carry "Cache-Control: only-if-cached"
Request networkRequest = strategy.networkRequest;
// cacheResponse is non-null if a usable cached response exists
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
// Track the counts of requests, network requests, and cache hits
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
// A cached response exists but is not applicable; close its resources
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
// Network use is forbidden and the cache can't satisfy the request: return 504 directly
return new Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(504)
.message("Unsatisfiable Request (only-if-cached)").body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build();
}
// If we don't need the network, we're done.
if (networkRequest == null) {
// The cached response is valid; the chain ends here and we unwind back into RetryAndFollowUpInterceptor's loop
return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
}
Response networkResponse = null;
try {
// The cache can't satisfy the request on its own, so continue down the chain
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// If the network came back 304 Not Modified, merge headers, update the cache, and return the cached body
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// Fresh data from the network: cache it if possible, then return it
Response response = networkResponse.newBuilder().cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse)).build();
if (HttpHeaders.hasBody(response)) {
CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
response = cacheWritingResponse(cacheRequest, response);
}
return response;
}
The comments above are fairly detailed and explain the whole caching flow. Continuing down the chain: when there is no cached response, or the cached response cannot be used on its own, this line runs:
networkResponse = chain.proceed(networkRequest);
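Before following that call into the next interceptor, note that CacheInterceptor only has work to do if a Cache was installed on the client; there is none by default. A minimal configuration sketch (the directory name and size are illustrative, and context is assumed to be an Android Context):
Cache cache = new Cache(new File(context.getCacheDir(), "http_cache"), 10 * 1024 * 1024); // 10 MiB
OkHttpClient client = new OkHttpClient.Builder()
        .cache(cache)
        .build();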
Continuing along the chain, we now arrive at ConnectInterceptor:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
At last, a short intercept method, although the code behind it is actually the most complex. Note that a StreamAllocation is obtained from the RealInterceptorChain here. We haven't mentioned it before: think of it as a coordinator that manages the relationships between connections, streams, and calls. Under HTTP/2 one connection may carry multiple streams, and a single call may also end up using more than one stream. The StreamAllocation was created back in the first interceptor, retryAndFollowUpInterceptor; let's first look at how it is initialized:
public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
Constructing it requires a connection pool, an Address, and a route selector. Quite a few new classes appear here; we will explain each one as it is used. For now, let's just see how the Address argument is built:
private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {
// For HTTPS we also need the SSLSocketFactory, the hostname verifier, and the certificate pinner
sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();
}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(), sslSocketFactory,
hostnameVerifier, certificatePinner, client.proxyAuthenticator(), client.proxy(),
client.protocols(), client.connectionSpecs(), client.proxySelector());
}
Back in ConnectInterceptor's intercept method:
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
This line obtains an HttpCodec by calling streamAllocation.newStream. HttpCodec is essentially a helper class responsible for reading and writing data over the socket according to the HTTP protocol. Internally it does its stream I/O through the Okio library, which we won't cover in detail here; just remember that anything implementing Sink is an output stream and anything implementing Source is an input stream. HttpCodec has two subclasses, Http1Codec and Http2Codec, for HTTP/1.x and HTTP/2 respectively. Let's look at the implementation of newStream:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
The key part is how a RealConnection is obtained. RealConnection is essentially a wrapper around a socket connection, roughly comparable in role to the SDK's HttpURLConnection. The code that actually obtains the RealConnection ends up in the following method:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released)
throw new IllegalStateException("released");
if (codec != null)
throw new IllegalStateException("codec != null");
if (canceled)
throw new IOException("Canceled");
// Try to reuse the connection already allocated to this StreamAllocation
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
// If it exists and still allows new streams, use it directly
return allocatedConnection;
}
// Try to get a connection from the connection pool
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// This may block while finding a usable route; if every route has been tried without success, it throws and the request fails
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
// We are about to read/write the connection pool, so synchronize
synchronized (connectionPool) {
if (canceled)
throw new IOException("Canceled");
// Now that a route has been selected, try the pool once more for a reusable connection
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null)
return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
// acquire() assigns the newly created connection to the connection field
acquire(result);
}
// The lock has been released at this point
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
// Locking again because we touch the connection pool
synchronized (connectionPool) {
// Put the new connection into the pool
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// If a multiplexed (HTTP/2) connection to the same address was created concurrently, discard the one
// we just built and use that one instead. This can happen because the handshake above runs outside the
// lock (it blocks): while this thread was handshaking, another thread may have finished its own handshake
// and put its connection into the pool first, leaving two connections to the same address.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
// Closing the socket does not need the lock, so it is done outside to release the lock sooner
closeQuietly(socket);
return result;
}
The method is fairly involved, hence the detailed comments. Now let's look at RealConnection's connect process, including the handshake:
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) {
// A protocol has already been negotiated, i.e. we are already connected
if (protocol != null)
throw new IllegalStateException("already connected");
RouteException routeException = null;
// The TLS versions and cipher suites to try, i.e. the OkHttpClient's configured ConnectionSpecs
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
// Create a ConnectionSpec selector
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
// No SSLSocketFactory, so this is not HTTPS
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(
new UnknownServiceException("CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
// On the plain JVM Platform this always returns true, but Android's Platform subclass consults the
// network security policy (API 23+), so cleartext traffic can actually be blocked here
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
while (true) {
try {
// requiresTunnel() is true for HTTPS going through an HTTP proxy; either branch ends up calling connectSocket
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
Eventually the code reaches connectSocket, where the connection is actually established:
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket() : new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// This is where the socket connection is actually established; Platform handles per-platform details
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
Three things happen here: a Socket is created, the connection is established, and the socket's input and output streams are obtained. With these in place, communication with the server is set up, and establishProtocol(connectionSpecSelector) then establishes the protocol used to talk to the server:
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
if (route.address().sslSocketFactory() == null) {
// Not HTTPS, so in this version it can only be HTTP/1.1
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return;
}
connectTls(connectionSpecSelector);
// If HTTP/2 was negotiated, create and start an Http2Connection; start() spins up a thread that keeps reading from the socket
if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink).listener(this).build();
http2Connection.start();
}
}
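Which branch runs here depends on what ALPN negotiated, which in turn is bounded by the protocols the client was built with. A minimal sketch of configuring that list (this is already the default; dropping HTTP_2 forces HTTP/1.1):
OkHttpClient client = new OkHttpClient.Builder()
        .protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
        .build();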
Now let's look at connectTls:
private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
Address address = route.address();
SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
boolean success = false;
SSLSocket sslSocket = null;
try {
// Create the wrapper over the connected socket.
sslSocket = (SSLSocket) sslSocketFactory.createSocket(rawSocket, address.url().host(),
address.url().port(), true /* autoClose */);
// Configure the socket's ciphers, TLS versions, and extensions.
// Pick a ConnectionSpec and configure the socket's TLS versions and cipher suites accordingly
ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
if (connectionSpec.supportsTlsExtensions()) {
Platform.get().configureTlsExtensions(sslSocket, address.url().host(), address.protocols());
}
// Force handshake. This can throw!
sslSocket.startHandshake();
// From the handshake session, extract the TLS version, cipher suite, and the peer certificates
Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());
// Verify that the socket's certificates are acceptable for the target host.
if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
throw new SSLPeerUnverifiedException(
"Hostname " + address.url().host() + " not verified:" + "\n certificate: "
+ CertificatePinner.pin(cert) + "\n DN: " + cert.getSubjectDN().getName()
+ "\n subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
}
// Check that the certificate pinner is satisfied by the certificates presented.
// Check that the presented certificates satisfy the certificate pinner; if the pins don't match,
// an SSLPeerUnverifiedException is thrown. (An untrusted self-signed certificate fails earlier, at
// startHandshake(), with an SSLHandshakeException; either way the error surfaces as an IOException rather than crashing the app.)
address.certificatePinner().check(address.url().host(), unverifiedHandshake.peerCertificates());
// At this point certificate verification has passed
// Success! Save the handshake and the ALPN protocol.
String maybeProtocol = connectionSpec.supportsTlsExtensions()
? Platform.get().getSelectedProtocol(sslSocket) : null;
socket = sslSocket;
source = Okio.buffer(Okio.source(socket));
sink = Okio.buffer(Okio.sink(socket));
handshake = unverifiedHandshake;
// The protocol selected via ALPN, falling back to HTTP/1.1 if none was negotiated
protocol = maybeProtocol != null ? Protocol.get(maybeProtocol) : Protocol.HTTP_1_1;
success = true;
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e))
throw new IOException(e);
throw e;
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket);
}
if (!success) {
closeQuietly(sslSocket);
}
}
}
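The address.certificatePinner().check(...) call above is where certificate pinning is enforced; by default no pins are configured, so it passes. A minimal sketch of pinning a host on the client; the hostname and the hash are placeholders, not real values:
CertificatePinner pinner = new CertificatePinner.Builder()
        .add("example.com", "sha256/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=")
        .build();
OkHttpClient client = new OkHttpClient.Builder()
        .certificatePinner(pinner)
        .build();
// If example.com then presents a certificate chain whose pins don't match,
// connectTls() fails with an SSLPeerUnverifiedException.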
At this point the entire handshake phase is complete and the RealConnection has been successfully created. Back in streamAllocation.newStream, we now have a connection with which to initialize an HttpCodec. RealConnection does this in newCodec:
public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
// Create a different HttpCodec depending on the negotiated protocol
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
Remember the first RealInterceptorChain created in getResponseWithInterceptorChain, where most of the constructor arguments were still null? By the time the chain has run this far, they have all been filled in. Let's return to ConnectInterceptor's intercept method; since it was a while back, here is the code again:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
We have finally reached the last link of the chain: this call to proceed dispatches to CallServerInterceptor's intercept method:
@Override
public Response intercept(Chain chain) throws IOException {
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
// Write the request headers
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
// "Expect: 100-continue" lets the client ask the server whether it will accept the request body before
// sending it; if the server declines, the body is never uploaded. In practice this is mainly used when
// POSTing large bodies.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
// Write the request body, unless an "Expect: 100-continue" expectation failed.
// Write the request body if there was no "Expect: 100-continue", or if the server told us to continue
if (responseBuilder == null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
}
// The whole request has been written at this point; finishRequest() flushes it
httpCodec.finishRequest();
if (responseBuilder == null) {
// Read this response's headers into the builder
responseBuilder = httpCodec.readResponseHeaders(false);
}
// Build the response: attach the request, the handshake data, and the sent/received timestamps
Response response = responseBuilder.request(request)
.handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis()).build();
int code = response.code();
if (forWebSocket && code == 101) {
// The connection is being upgraded (WebSocket); give interceptors a non-null but empty body
response = response.newBuilder().body(Util.EMPTY_RESPONSE).build();
} else {
// Note that the response body wraps the connection's input stream, so it may be arbitrarily large and is read lazily
response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();
}
// If either side asked for "Connection: close", this connection must not be reused for new streams
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
// 204/205 promise no content, yet the response claims a non-zero Content-Length: protocol error
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
The code is not complicated, mostly reads and writes on streams. If none of the steps above fails, the Response is returned all the way back up to the start of the chain, then out of getResponseWithInterceptorChain(), and our callback in AsyncCall finally runs. That completes the whole Interceptor flow, sketched below (diagram omitted).
The design is very elegant: a textbook application of the chain-of-responsibility pattern, with a flavor of AOP as well. The whole process of turning a Request into a Response is not only pulled out of RealCall, it is also layered: each interceptor may complete the job entirely by itself, or do part of the work and hand the rest to the next interceptor. The interceptors are independent of each other, each with a single responsibility and very low coupling, so replacing one interceptor does not affect the others at all, much like stations on an assembly line.
In Android, the chain-of-responsibility pattern also shows up in touch-event dispatching, which we won't expand on here. Next, a few words on AOP, aspect-oriented programming.
Aspect-oriented programming (AOP): object orientation is built on inheritance, polymorphism, and encapsulation. Encapsulation spreads functionality across different objects, which software design usually calls assignment of responsibilities: different classes get different methods, and code is distributed among them. The benefit is lower complexity and reusable classes. But spreading the code around also increases duplication. For example, two classes may both need logging in every method. With an object-oriented design we have to add the logging code to the methods of both classes, and even if it is identical, the design gives the classes no way to share it. One might say: easy, put that code in a separate class and method and call it from both. But then both classes become coupled to that helper, and any change to it affects them both. So is there a way to inject code wherever we need it, on demand? The idea of dynamically weaving code into specified methods and positions of a class at runtime is aspect-oriented programming. (Note: this passage is adapted from Zhihu, https://www.zhihu.com/question/24863332/answer/48376158; copyright belongs to the original author.)
Real AOP is inseparable from dynamic proxies, so why say this design resembles AOP? Looking at the interceptor chain above, OkHttp gives developers exactly two places to plug in custom interceptors: one at the very head of the chain, and one just before data is exchanged with the server. Why nowhere else? In effect, OkHttp splits the chain into two aspects, a connection layer and a communication layer, and the two places where developers can add interceptors are precisely the join points between them (diagram omitted); a minimal sketch of the two hook points follows.
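A minimal sketch of the two hook points; the interceptor bodies are placeholders that simply pass the request through:
OkHttpClient client = new OkHttpClient.Builder()
        // Application interceptor: runs at the head of the chain, before
        // RetryAndFollowUpInterceptor, exactly once per call
        .addInterceptor(new Interceptor() {
            @Override public Response intercept(Interceptor.Chain chain) throws IOException {
                return chain.proceed(chain.request());
            }
        })
        // Network interceptor: runs after ConnectInterceptor, just before
        // CallServerInterceptor, once per network round trip (not at all on pure cache hits)
        .addNetworkInterceptor(new Interceptor() {
            @Override public Response intercept(Interceptor.Chain chain) throws IOException {
                return chain.proceed(chain.request());
            }
        })
        .build();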
Although we rarely customize interceptors in everyday OkHttp use, the framework reserved this extension point from the start. Java's thread pools and Android's AsyncTask reflect a similar AOP-style idea: keep a stable core and let variation be plugged in, which greatly improves extensibility. The OkHttp wiki puts it this way:
Since making this change we’ve been able to simplify OkHttp’s internals substantially. The code is faster and easier to understand: the whole thing is just a stack of built-in interceptors.
Below are two custom Interceptors from the OkHttp wiki. The first one logs requests and responses:
class LoggingInterceptor implements Interceptor {
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
Request request = chain.request();
long t1 = System.nanoTime();
logger.info(String.format("Sending request %s on %s%n%s",
request.url(), chain.connection(), request.headers()));
Response response = chain.proceed(request);
long t2 = System.nanoTime();
logger.info(String.format("Received response for %s in %.1fms%n%s",
response.request().url(), (t2 - t1) / 1e6d, response.headers()));
return response;
}
}
The second one gzips request bodies:
final class GzipRequestInterceptor implements Interceptor {
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
Request originalRequest = chain.request();
if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
return chain.proceed(originalRequest);
}
Request compressedRequest = originalRequest.newBuilder()
.header("Content-Encoding", "gzip")
.method(originalRequest.method(), gzip(originalRequest.body()))
.build();
return chain.proceed(compressedRequest);
}
private RequestBody gzip(final RequestBody body) {
return new RequestBody() {
@Override public MediaType contentType() {
return body.contentType();
}
@Override public long contentLength() {
return -1; // We don't know the compressed length in advance!
}
@Override public void writeTo(BufferedSink sink) throws IOException {
BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
body.writeTo(gzipSink);
gzipSink.close();
}
};
}
}
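Wiring them up is then just a matter of choosing the hook point; a minimal sketch (one reasonable placement, not the only one):
OkHttpClient client = new OkHttpClient.Builder()
        .addInterceptor(new GzipRequestInterceptor())     // compress the body before the built-in interceptors see it
        .addNetworkInterceptor(new LoggingInterceptor())  // log at the network layer, where chain.connection() is non-null
        .build();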