版本号:3.13.1
一.基本使用
//1.创建OkHttpClient对象
val okHttpClient = OkHttpClient.Builder().readTimeout(5,TimeUnit.SECONDS).build()
//2.创建Request对象
val request = Request.Builder().url("www.baidu.com").build()
//3.通过OkHttpClient将Request封装成Call对象
val call = okHttpClient.newCall(request)
//通过Call执行请求
//同步请求
val response = call.execute()
Log.d("okhttp",response.body().toString())
//异步请求
call.enqueue(object :Callback{
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
Log.d("okhttp",response.body().toString())
}
})
Call可以理解为Request和Response之间的桥梁,Http请求过程中可能有重定向和重试等操作,你的一个简单请求可能会产生多个请求和响应。OkHttp使用Call这一概念对此来建模:不论为了满足你的请求任务,中间做了多少次请求和响应,都算作一个Call。
二.源码分析
1.创建对象源码分析
不管是同步请求还是异步请求,都必须先创建OkHttpClient和Request对象,上面使用Build模式创建的,下面分别看一下各自的源码:
OkHttpClient.Builder().
public Builder() {
//任务分发器
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
//连接池
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
//java7以后在数字中可以使用下划线,只是增加阅读性,没其他作用
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
Request.Builder()
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
build()方法,都是在该方法中创建各自的对象,在构造方法中将当前的build对象传入,然后把对应的属性值赋。下面以Request为例:
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
//Builder模式
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}
不管同步请求还是异步请求,都是调用Call的方法执行的,下面看一下Call对象的创建okHttpClient.newCall(request)
:
@Override public Call newCall(Request request) {
//Call是一个接口,RealCall是它的实现类
return RealCall.newRealCall(this, request, false /* for web socket */);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
//创建RealCall对象,将client和request传入
RealCall call = new RealCall(client, originalRequest, forWebSocket);
//设置监听器
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
//重定向拦截器
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client);
this.timeout = new AsyncTimeout() {
@Override protected void timedOut() {
cancel();
}
};
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
从上面代码中可以看到Call的实现类为RealCall,它持有client和request。
上面创建对象的源码已经分析完了,下面就看一下具体请求的方法。
2.同步请求:call.execute()
@Override public Response execute() throws IOException {
synchronized (this) {
//同一个请求执行执行一遍,否则跑出异常
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
......
//当执行的请求开始的时候,回调监听中的方法
eventListener.callStart(this);
try {
//真正的请求是dispatcher.executed
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
}......
finally {
//执行完成以后从对列中移除请求
client.dispatcher().finished(this);
}
}
public Dispatcher dispatcher() {
return dispatcher;
}
//dispatcher.executed
synchronized void executed(RealCall call) {
//将call加入到同步请求对列中
runningSyncCalls.add(call);
}
public final class Dispatcher {
......
//异步就绪对列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步执行对列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步执行对列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
......
}
同步请求调用
realCall.executed
方法,在该方法中调用dispatcher.executed
将realCall添加到同步运行对列中runningSyncCalls
然后调用getResponseWithInterceptorChain
获取响应报文。
3.异步请求: call.enqueue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//当前的call(创建的call)只能执行一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//封装成了AsyncCall,它就是一个Runable
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
void enqueue(AsyncCall call) {
synchronized (this) {
//添加到就绪对列
readyAsyncCalls.add(call);
......
}
}
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
//遍历就绪对列执行任务
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//如果请求大于最大的请求数 maxRequests = 64,不执行
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//请求的host不能大于maxRequestsPerHost = 5
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
//没有大于最大请求数,添加到执行对列中
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//循环执行对列,执行具体的请求
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//执行任务,传入线程池
asyncCall.executeOn(executorService());
}
return isRunning;
}
异步请求的时候,通过
dispatcher.enqueue
方法将call(封装成了Runable(AsyncCall,它是RealCall的的内部类))添加到就绪对列中,然后循环就绪对列,如果现在执行的任务数没有超过最大的请求数(64)就添加到执行对列中,然后执行asyncCall.executeOn(executorService());
。
public synchronized ExecutorService executorService() {
if (executorService == null) {
//最大的线程数为 Integer.MAX_VALUE,上面已经限制最大的请求数为64所以这里的数量不会超过64
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//执行任务
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
executorService.execute(this);
就是执行AsyncCall中的run()
方法
AsyncCall继承NamedRunnable,没有重写run()
方法,直接调用父类的,在父类的run()
方法中调用了一个execute();
方法,该方法是一个抽象方法,需要子类实现,所以实际执行的是AsyncCall.execute()
public abstract class NamedRunnable implements Runnable {
......
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
下面就看一个AsyncCall.execute(),真正执行任务的方法:
//该方法在子线程中执行
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
//获取响应报文
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
}......
finally {
//调用finished方法,将该请求从请求对列中移除
client.dispatcher().finished(this);
}
}
}
从上面的代码分析我们可以得出:异步请求流程:
call.enqueue
->realCall.enqueue
->dispatcher.enqueue(AsyncCall call)
(AsyncCall本质就是一个Runable)在dispatcher.enqueue
方法中将call添加到就绪对列中,然后外遍历就绪对列,如果现在运行的任务没有超过最大请求数(64)就会把它加入到运行对列中runningAsyncCalls,然后调用asyncCall.executeOn(executorService())
(executorService()方法就是创建一个线程池),在executeOn
方法中会执行asyncCall,就是调用它的execute
方法。在该方法中真正的去执行任务,该方法是在子线程中执行的。
通过上面的分析我们可以得出大致的请求流程图如下:
不管是同步请求还是异步请求,都调用了dispatcher对应的方法,它里面维护了三个任务对列和一个线程池(用来执行异步请求),dispatcher维护着请求任务的添加和移除。
三.Okhttp中的拦截器
拦截器是Okhttp提供的一种强大的机制,它可以实现网络监听、请求以及响应重写、请求失败重试等功能。
Okhttp的拦截器分为两种:一种是应用拦截器(就是我们自定义的拦截器),另一种就是网络拦截器(是Okhttp内部提供给我们的拦截器,真正的网络请求就是通过这些网络拦截器来实现的)。
从上面的代码分析得出:不管是同步请求还是异步请求,最终都是通过getResponseWithInterceptorChain()
方法来获取Response的,该方法就是构建一个拦截器链。下面看一下该方法的代码:
//RealCall中的方法
Response getResponseWithInterceptorChain() throws IOException {
List<Interceptor> interceptors = new ArrayList<>();
//添加自定义的拦截器
interceptors.addAll(client.interceptors());
//添加okhttp提供给我们的网络拦截器
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
//创建一个拦截器链对象,然后将拦截器集合传入
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
创建好拦截器链以后调用了该对象的chain.proceed(originalRequest)
方法。该方法源码如下:
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
......
//又创建了一个拦截器链对象,注意此时传入的index = index + 1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
//顺序获取拦截器,然后调用拦截器的intercept方法
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
......
return response;
}
在
chain.proceed
方法中调用了interceptor.intercept(next);
方法,并将新创建的拦截器链对象传入,此时index为index + 1,这样就构成了依次调用拦截器集合的所用拦截器的intercept
方法。在该方法中完成对应的功能以后,调用下一个拦截器的intercept
方法,并将处理后的Response返回给上一个拦截器。
1.RetryAndFollowUpInterceptor(重定向拦截器)
该拦截器的主要作用就是:负责请求的重定向操作以及请求失败后的重试机制。
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//1.创建StreamAllocation 对象,该对象用于分配请求过程中的流
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
//重连次数
int followUpCount = 0;
Response priorResponse = null;
while (true) {
......
Response response;
boolean releaseConnection = true;
try {
//2.调用RealInterceptorChain的proceed方法进行网络请求
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
}......
// 叠加先前的响应
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;
try {
//根据响应判断是否需要重新请求
followUp = followUpRequest(response, streamAllocation.route());
}......
if (followUp == null) {
//不需要重新请求,直接返回response,结束while循环
streamAllocation.release(true);
return response;
}
......
//需要重新请求,先判断重新请求的次数是否超过设置的最大值,MAX_FOLLOW_UPS = 20
if (++followUpCount > MAX_FOLLOW_UPS) {
//超过最大的重新请求次数,抛出异常
streamAllocation.release(true);
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
......
//重新请求
if (!sameConnection(response, followUp.url())) {
streamAllocation.release(false);
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
}......
request = followUp;
priorResponse = response;
}
}
从RetryAndFollowUpInterceptor.intercept
方法得出主要做了以下几件事:
- 1.创建StreamAllocation对象。
- 2.调用RealInterceptorChain的
proceed
方法进行网络请求,该方法就会调用下一个拦截器的intercept
方法,依次调用,获取对应的Response。
intercept
方法有些类似递归调用,这里是不同拦截器对象的intercept
方法,这样就从上到下形成了一个链。
- 3.根据异常结果或者响应结果判断是否要进行重新请求。
2.BridgeInterceptor(桥接拦截器)
该拦截器的作用主要就是处理请求和响应
在RetryAndFollowUpInterceptor拦截器中创建StreamAllocation对象以后,就会调用chain.proceed
方法进行网络请求,其实就是调用下一个拦截器的intercept
方法,RetryAndFollowUpInterceptor的下一个拦截就是BridgeInterceptor,下面看一下它的intercept
代码:
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
//1.为请求添加一些头信息
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//2.发送网络请求
Response networkResponse = chain.proceed(requestBuilder.build());
//3.解压响应数据,支持`gzip`,所以需要解压
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
从BridgeInterceptor.intercept
方法得出主要做了以下几件事:
- 1.将用户构建的Request转化为能够进行网络访问的请求(添加一些头信息,如:
Connection
、Accept-Encoding
、Host
等)。 - 2.将设置好的Request发送网络请求(调用
chan.proceed
)。 - 3.将请求返回的Response转化为用户可用的Response(可能使用gzip压缩,需要解压)。
3.CacheInterceptor(缓存拦截器)
该拦截器的作用主要就是处理数据的缓存
在BridgeInterceptor.intercept
方法中构建好Request后就发送请求,就会调用CacheInterceptor.intercept
方法,该方法的代码为:
@Override public Response intercept(Chain chain) throws IOException {
//如果设置了缓存就获取缓存
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//获取缓存策略,里面维护着一个networkRequest和cacheResponse
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
//如果有缓存,跟新一下缓存的各项指标,主要是缓存命中率
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
//有缓存,但是对应的Response 为null即缓存不符合要求,关闭该缓存
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// 如果此时网络不可用,同时缓存不可用,抛出一个504的错误
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
//网络不可用,但是有缓存,直接返回缓存。
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//没有缓存,但是网路可用,发起网络请求
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
if (cacheResponse != null) {
//如果网络请求返回的状态码为 HTTP_NOT_MODIFIED = 304,从缓存中获取数据
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
......
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
//如果请求可以缓存,就将网络请求后的数据添加到缓存
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
//如果请求方法缓存无效,从缓存中删除
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
从上面代码可以得出CacheInterceptor的处理逻辑为:
- 1.如果网络不可用,同时没有缓存,会返回一个504的响应码。
- 2.如果网络不可用,但是有缓存,直接返回缓存的数据。
- 3.如果网络可用,发送网络请求(调用
chain.proceed
)。- 如果有缓存并且网络请求返回的状态码为 HTTP_NOT_MODIFIED = 304,就从缓存中获取数据。
- 如果没有缓存,就返回网络请求的数据。此时如果设置了缓存,就将网络请求后的数据添加到缓存中(
cache.put(response)
)
缓存的管理使用的Cache类中响应的方法(get
、put
),我们先看一下怎么使用缓存:
val okHttpClient = OkHttpClient.Builder()
.readTimeout(5,TimeUnit.SECONDS)
.cache(Cache(File("cache"),24 * 1024 * 1024))//通过配置Cache对象
.build()
下面我们就看一下Cache这个类的源码:
@Nullable CacheRequest put(Response response) {
String requestMethod = response.request().method();
if (HttpMethod.invalidatesCache(response.request().method())) {
try {
remove(response.request());
} catch (IOException ignored) {
// The cache cannot be written.
}
return null;
}
//只缓存GET请求
if (!requestMethod.equals("GET")) {
return null;
}
//Vary的内容会作为当前缓存数据是否可以作为请求结果返回给客户端的判断
//Vary详解:https://blog.csdn.net/qq_29405933/article/details/84315254
if (HttpHeaders.hasVaryAll(response)) {
return null;
}
//缓存数据的实体对象
Entry entry = new Entry(response);
//使用磁盘缓存DiskLruCache来实现缓存功能
DiskLruCache.Editor editor = null;
try {
editor = cache.edit(key(response.request().url()));
if (editor == null) {
return null;
}
entry.writeTo(editor);
return new CacheRequestImpl(editor);
}......
}
从上述代码可以看出Okhttp中的缓存使用的是DiskLruCache。
4.ConnectInterceptor(连接拦截器)
该拦截器的作用主要就是建立与服务器的连接(Socket)
如果没有缓存,但是网络可用的情况下就会调用ConnectInterceptor.intercept
方法,下面看一下该方法的代码:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//获取StreamAllocation对象,该对象是在RetryAndFollowUpInterceptor中实例化的
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//创建HttpCodec对象,该对象用于请求和响应的处理
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//创建用于网络IO的RealConnection对象
RealConnection connection = streamAllocation.connection();
//调用后面拦截的intercept方法并传递对应的参数,发起网络请求
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
下面重点看一下streamAllocation.newStream
方法:
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
......
try {
//获取一个RealConnection对象
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//通过RealConnection对象创建HttpCodec对象
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
findHealthyConnection
方法会调用findConnection
方法来获取一个RealConnection对象,代码如下:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
......
synchronized (connectionPool) {
......
if (this.connection != null) {
//如果能复用就复用,this代表的是StreamAllocation对象
result = this.connection;
releasedConnection = null;
}
......
}
......
不能复用从连接池中找
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.acquire(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
//连接池中没有就new一个
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
......
开启Socket连接
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
//connection将加入连接池中
Internal.instance.put(connectionPool, result);
......
}
.......
return result;
}
findConnection
方法做的主要就是:
- 1.如果StreamAllocation中的
connection
能复用就复用,不同复用的话就从连接池connectionPool中获取,如果连接池中没有就new一个,然后加入连接池中。 - 最终调用RealConnection的
connect
方法打开一个socket链接。
获取resultConnection对象后然后调用resultConnection.newCodec
来获取HttpCodec 对象
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
创建好RealConnection和HttpCodec对象以后,调用下一个拦截器的intercept
方法。下面我们最后一个拦截器:CallServerInterceptor
5.CallServerInterceptor(发起请求拦截器)
该拦截器的主要作用就是真正的向服务器写入请求数据和读取响应数据
该拦截器为Okhttp拦截器链的最后一个拦截器,该拦截器是真正向服务器发送请求和处理响应的地方。下面看一下它的intercept
方法代码:
@Override public Response intercept(Chain chain) throws IOException {
final RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
final HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
......
//向socket中写入header数据
httpCodec.writeRequestHeaders(request);
......
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
//如果请求头中有配置“Expect: 100-continue” ,直接读取响应头信息
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(call);
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
if (request.body() instanceof DuplexRequestBody) {
httpCodec.flushRequest();
CountingSink requestBodyOut = new CountingSink(httpCodec.createRequestBody(request, -1L));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
//向socket写入请求体
request.body().writeTo(bufferedRequestBody);
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(call);
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener().requestBodyEnd(call, requestBodyOut.successfulCount);
}
}
......
}
if (!(request.body() instanceof DuplexRequestBody)) {
//结束请求
httpCodec.finishRequest();
}
//读取响应信息
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(call);
responseBuilder = httpCodec.readResponseHeaders(false);
}
responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis());
Internal.instance.initCodec(responseBuilder, httpCodec);
Response response = responseBuilder.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis());
Internal.instance.initCodec(responseBuilder, httpCodec);
response = responseBuilder.build();
code = response.code();
}
realChain.eventListener().responseHeadersEnd(call, response);
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
//回去响应体数据
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
......
return response;
}
从上面代码可以看出CallServerInterceptor主要就是向Socket中写入请求信息,然后读取响应信息,最后构建Response对象并返回给上一个拦截器。
总结
至此Okhttp的关键代码已经分析完毕,我们可以得出Okhttp的一次请求过程大概是:
- 1.将请求封装成Call对象。
- 2.通过Dispatcher对请求进行分发。
- 3.调用RealCall对象的
getResponseWithInterceptorChain
方法获取Response - 4.
getResponseWithInterceptorChain
方法中会依次调用拦截器:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、CallServerInterceptor的intercept
方法,完成对于请求信息的封装,发送和读取。
Kotlin项目实战
网络优化
HTTPDNS使用HTTP协议进行域名解析,代替现有基于UDP的DNS协议,域名解析请求直接发送到阿里云的HTTPDNS服务器,从而绕过运营商的Local DNS,能够避免Local DNS造成的域名劫持问题和调度不精准问题。