- 网上已经有了相等多的分析博客,但终归是别人的知识点,倒不如自己走一遍流程,如果你看到了这篇博客,最好自己跟着思路对照源码过一遍哦!
Retrofit源码分析
Retrofit的构建
- 在我们开发工作中使用Retrofit一般建立一个单例模式,避免每次请求都创建,Retrofit只是对OkHttp的封装,主要用了动态代理,并不涉及网络请求哦!
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(apiUrl) //网络头
.client(getHttpClient()) //自己构建的okHttp对象
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) //增加对Rxjava的支持,这里也是可以同时添加多个的支持
.addConverterFactory(FastJsonConverterFactory.create()) //增加对阿里java对象转化支持,这里可以同时添加多个比如Gson等
.build(); //开始构建Retrofit对象
return retrofit;
- 注意: 这里可以通过client设置Okhttp对象,如果你没有设置,系统也会给你提供的,稍后分析,一般都需要自定义实现自定义拦截器功能,用于设置headers添加信息,日志打印,网络监听等操作,这些都是OkHttp做的事情,跟Retrofit无关
- 查看Retrofit的Build中各个参数的含义: 构建者模式创建
private final Platform platform; //那个平台java/Android
private @Nullable okhttp3.Call.Factory callFactory; //okhttp网络请求,通过client传入
private HttpUrl baseUrl; //请求头
private final List<Converter.Factory> converterFactories = new ArrayList<>(); //转换集合,用于requestBody转java对象
private final List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(); //适配器,一般是Rxjava支持链式调用
private @Nullable Executor callbackExecutor; //callBack的返回线程,对于Android来说通过MainHandler发送到UI线程的,默认的call返回在UI线程
private boolean validateEagerly; ////是否需要立即生效
- 记住Build中各个参数的含义,通过build()方法构建一个Retrofit对象
- 将通过Build添加的各种工厂实现类作为Retrofit对象的参数,构造一个Retrofit对象
public Retrofit build() {
if (baseUrl == null) {
throw new IllegalStateException("Base URL required.");
}
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) { //如果没有创建OkHttpClient就新建,一般项目中都会自己创建用于拦截添加header等操作
callFactory = new OkHttpClient();
}
Executor callbackExecutor = this.callbackExecutor;
if (callbackExecutor == null) { //没有指定callBack线程池Android默认MainThreadExecutor
callbackExecutor = platform.defaultCallbackExecutor();
}
// 将RxJava2转化同原来Android默认转化添加到集合中,如果当前使用的是RxJava2的返回Observe对象,则使用RxJava2的线程池返回,
//如果返回的是Call<T>对象,则使用默认的UI线程运行返回数据
List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
callAdapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));
// 返回值的适配器,使用谷歌的Gson,阿里的FastJson等,但是会加上默认的RequestBody直接返回
List<Converter.Factory> converterFactories =
new ArrayList<>(1 + this.converterFactories.size());
// Add the built-in converter factory first. This prevents overriding its behavior but also
// ensures correct behavior when using converters that consume all types.
converterFactories.add(new BuiltInConverters()); //返回的是RequestBody
converterFactories.addAll(this.converterFactories);
//通过构建器中的各个参数传递给Retrofit创建对象
return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
}
- 以上new Retrofit对象,其参数为:
Retrofit(okhttp3.Call.Factory callFactory, HttpUrl baseUrl,
List<Converter.Factory> converterFactories, List<CallAdapter.Factory> callAdapterFactories,
@Nullable Executor callbackExecutor, boolean validateEagerly) {
this.callFactory = callFactory;
this.baseUrl = baseUrl;
this.converterFactories = converterFactories; // Copy+unmodifiable at call site.
this.callAdapterFactories = callAdapterFactories; // Copy+unmodifiable at call site.
this.callbackExecutor = callbackExecutor;
this.validateEagerly = validateEagerly;
}
- 这里只是创建了一个Retrofit对象,而create()方法通过动态代理创建接口,仅当调用接口中的方法才会运行代理类中的invoke方法
mediaApi = RetrofitHelper.getInstance().getRetrofit(type).create(Class<T> a);
//create方法分析
public <T> T create(final Class<T> service) {
//当前service类是否是接口
Utils.validateServiceInterface(service);
if (validateEagerly) { //是否提前加载service中声明的方法,如果需要,就调用下面方法
//通过反射获取在service中声明的方法,并将所有方法缓存在serviceMethodCache集合中
如果不需要缓存,则会在调用方法的时候在加载
eagerlyValidateMethods(service);
}
//通过代理的方式实现http请求,返回一个代理对象.当通过代理对象调用我们声明方法时,会执行invocationHandler的invoke方法
//对于retrofit.create(ApiInfo.class);只是返回newProxyInstance返回的类,并没有调用下面的invoke方法,只有在每次调用接口中的get/post方法时才会执行invoke方法,这个时候的method就是调用的接口方法
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
//得到一个Android平台类型
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {
// 方法是否来自Object,而不是接口的方法
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
//Android中返回false,永远不会执行
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
//这三句才是最重要的
//1. 开始解析目前正在执行的方法,就是我们在接口中调用的get/post方法,loadServiceMethod解析get/post中的annocation
ServiceMethod<Object, Object> serviceMethod =
(ServiceMethod<Object, Object>) loadServiceMethod(method);
//2. serviceMethod对象生成一个okHttpCall对象,serviceMethod中有网络的各种配置
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
//3. 返回一个对象,如果没使用rxJava,适配就是默认返回call<ResponseBody>对象,
//如果使用了Rxjava则拿到的是Observable对象,这里返回的对象就是调用接口方法的返回值,通过上方配置不同的适配器返回不同的数据类型call/Observable
return serviceMethod.adapt(okHttpCall);
}
});
}
Retrofit的方法注解解析
- 通过以上分析可知,当create接口类后真正调用接口方法时才会实现动态代理的invoke方法,其中最最重要的就是以上三步操作,首先配置annocation解析的loadServiceMethod(method): method是调取的方法名
//实现方式很简单,判断HashMap<method, serviceMethod> 以当前方法为缓存的serviceMethod是否存在,如果不存在调用Build新建
ServiceMethod<?, ?> loadServiceMethod(Method method) {
ServiceMethod<?, ?> result = serviceMethodCache.get(method);
if (result != null) return result;
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = new ServiceMethod.Builder<>(this, method).build();
serviceMethodCache.put(method, result);
}
}
return result;
}
//构建serviceMethod对象
Builder(Retrofit retrofit, Method method) {
this.retrofit = retrofit; //上面创建的retrofit跟调用方法method
this.method = method;
this.methodAnnotations = method.getAnnotations(); //获得注解信息
this.parameterTypes = method.getGenericParameterTypes(); //获取方法中参数列表数组,如果是泛型会使用 List<Integer>完整的泛型
this.parameterAnnotationsArray = method.getParameterAnnotations();
//获取注解的二维数组,第一个维度对应参数列表中的参数数目,第二个维度对应参数列表中对应的注解,这是因为一个参数可以添加多个注解,第二个维度就是表示多个注解的
}
//build方法
public ServiceMethod build() {
callAdapter = createCallAdapter(); //通过上方传递过来的适配器即Rxjava,获得方法返回的类型适用于那个适配器,这里返回类型是Observable<> ,适配器选择Rxjava,如果是Call,适配器选择默认的
responseType = callAdapter.responseType(); //方法返回值的类型
if (responseType == Response.class || responseType == okhttp3.Response.class) {
throw methodError("'"
+ Utils.getRawType(responseType).getName()
+ "' is not a valid response body type. Did you mean ResponseBody?");
}
//获取数据转换,即将网络数据responseBody转换成javaBean类型,这里用到了阿里的fastJson,最终调用的是FastJsonConverterFactory.responseBodyConverter()方法,这个也是我们使用fastJson时需要重新创建工厂类,实现的方法,否则报错的
responseConverter = createResponseConverter();
//通过注解,获取接口上方的请求方式get/post/传递文件还是其他即parseHttpMethodAndPath()
for (Annotation annotation : methodAnnotations) {
parseMethodAnnotation(annotation);
}
//还记得上面说的parameterAnnotationsArray是注解返回的二维数组,一个值可能有多个注解,下面就是遍历二维数组找到所有的注解
int parameterCount = parameterAnnotationsArray.length;
parameterHandlers = new ParameterHandler<?>[parameterCount];
for (int p = 0; p < parameterCount; p++) {
Type parameterType = parameterTypes[p];
Annotation[] parameterAnnotations = parameterAnnotationsArray[p];
if (parameterAnnotations == null) {
throw parameterError(p, "No Retrofit annotation found.");
}
//对于[1][2]来说数组1下有两个值,这里就是遍历这两个值,将注解全部存储进来
parameterHandlers[p] = parseParameter(p, parameterType, parameterAnnotations);
}
//返回新的serviceMethod对象,里面有请求接口中的请求形式,注解类型,适配器选择,网络接口数据转化等信息
return new ServiceMethod<>(this);
}
- 上方的createCallAdapter()返回的是根据返回类型找到合适的适配器
- Call类型返回是默认的适配器BuiltInConverters就是构建Retrofit时converterFactories 集合大小为传递进来的适配器和 + 1的那个1,如果是Observable对象则遍历找到的是Rxjava2
private CallAdapter<T, R> createCallAdapter() {
Type returnType = method.getGenericReturnType //找到返回类型,Observable<>对应的是Rxjava
Annotation[] annotations = method.getAnnotations();
try {
//通过返回类型,返回适配器,这里到
return (CallAdapter<T, R>) retrofit.callAdapter(returnType, annotations);
} catch (RuntimeException e) { // Wide exception range because factories are user code.
throw methodError(e, "Unable to create call adapter for %s", returnType);
}
}
//retrofit.callAdapter调用
public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType,
Annotation[] annotations) {
int start = callAdapterFactories.indexOf(skipPast) + 1;
//遍历添加的所有适配器,找到每个的get方法,判断当前type是否属于他的返回类型
for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this); //由于我们最上方构建Retrofit是使用的RxJava2CallAdapterFactory.create().get方法
if (adapter != null) {
return adapter;
}
}
//这里调用的是Rxjava的,我们进去看看get方法 => 调用的是RxJava2CallAdapterFactory.get()
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
isSingle, isMaybe, false); //返回Rxjava 的 adapter,记住这个就行了,稍后会用到
- 我们构建了一个serviceMethod对象包含了该接口方法需要的所有信息,此时看第二步
//通过上方的serviceMethod和参数类型args表示接口方法中的参数数组构成一个OkHttpCall 继承自Call<T>
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
- 第三步serviceMethod.adapter(okHttpCall)
- 由于使用Rxjava2即调用的是RxJava2CallAdapter中的adapter方法
- 没有添加Rxjava2的支持,则调用是默认的ExecutorCallAdapterFactory.adapter
//Rxjava2中的
serviceMethod.adapt(okHttpCall) //实际调用的是callAdapter.adapter(call)
//callAdapter上方在构建serviceMethod应该还有印象,就是 RxJava2CallAdapter
@Override public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call); //当前请求是异步还是同步操作分别执行不同的方法
Observable<?> observable;
if (isResult) { //返回订阅者是什么类型的,就封装成不同的类型调用onNext,onComPlete,onError方法
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
//默认不添加RxJava2的支持调用上方第三步 adapter
@Override public Call<Object> adapt(Call<Object> call) {
return new ExecutorCallbackCall<>(callbackExecutor, call);
}
//callBackExecutor表示Android默认的那个UI线程池,call表示回调
Call调用两种方式: enqueue异步, execute同步
//异步调用
@Override public void enqueue(final Callback<T> callback) {
checkNotNull(callback, "callback == null");
delegate.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, final Response<T> response) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
if (delegate.isCanceled()) {
// Emulate OkHttp's behavior of throwing/delivering an IOException on cancellation.
callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
} else {
callback.onResponse(ExecutorCallbackCall.this, response);
}
}
});
}
@Override public void onFailure(Call<T> call, final Throwable t) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(ExecutorCallbackCall.this, t);
}
});
}
});
}
//同步
@Override public Response<T> execute() throws IOException {
return delegate.execute();
}
- 以上是Retrofit调用接口方法时候的主要流程,下面进入真正的网络请求OkHttp中查看具体步骤
Okhttp请求部分
- 真正的网络请求是在订阅以后即Observable.subscribe()方法调用时,这个跟Rxjava订阅一致的
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
}
}
//subscribeActual 是一个抽象方法,还记得上面我们在Rxjava2CallAdapter.adapter时候创建了一个responseObservable对象,一个是同步,一个是异步
//对于同步的CallEnqueueObservable实现了subscribeActual方法
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallCallback<T> callback = new CallCallback<>(call, observer); //这个是返回数据
observer.onSubscribe(callback);
call.enqueue(callback);
//这里的call是rxJava2CallAdapter传递进来的,也就是
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.adapt(okHttpCall);
//即为okHttpCall.enqueue
@Override public void enqueue(final Callback<T> callback) {
okhttp3.Call call;
Throwable failure;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
call = rawCall;
failure = creationFailure;
if (call == null && failure == null) {
try {
call = rawCall = createRawCall(); //注意这句话rawCall默认为null,使用createRawcall创建
} catch (Throwable t) {
throwIfFatal(t);
failure = creationFailure = t;
}
}
}
if (failure != null) {
callback.onFailure(this, failure);
return;
}
if (canceled) {
call.cancel();
}
//执行异步方法,回调等操作,回调在CallEnqueueObservable类中有实现
call.enqueue(new okhttp3.Callback() {
@Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse) {
Response<T> response;
try {
response = parseResponse(rawResponse); //解析回调
} catch (Throwable e) {
callFailure(e);
return;
}
try { //接口返回,就是subscribe中的方法回调啦
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
t.printStackTrace();
}
}
@Override public void onFailure(okhttp3.Call call, IOException e) {
callFailure(e);
}
private void callFailure(Throwable e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
t.printStackTrace();
}
}
});
}
//跳转到createRawCall() 方法中
okhttp3.Call call = serviceMethod.toCall(args);
//方法很简单,只是调用了第一步生成retrofit的ServiceMethod中封装可很多东西,它的tocall方法去看看
okhttp3.Call toCall(@Nullable Object... args) throws IOException {
//构建Http请求头,headers,以及是表单 @FormBody , 文件上传@MultipartBody 等类型数据
RequestBuilder requestBuilder = new RequestBuilder(httpMethod, baseUrl, relativeUrl, headers,
contentType, hasBody, isFormEncoded, isMultipart);
@SuppressWarnings("unchecked") // It is an error to invoke a method with the wrong arg types.
ParameterHandler<Object>[] handlers = (ParameterHandler<Object>[]) parameterHandlers;
int argumentCount = args != null ? args.length : 0;
if (argumentCount != handlers.length) {
throw new IllegalArgumentException("Argument count (" + argumentCount
+ ") doesn't match expected count (" + handlers.length + ")");
}
for (int p = 0; p < argumentCount; p++) {
handlers[p].apply(requestBuilder, args[p]);
}
//上面都是构造一个请求头部分,这个才是真正的请求
return callFactory.newCall(requestBuilder.build());
//callFactory和他的newCall() 方法,参数时请求类型和数据
}
//callFactory是在ServiceMethod(build)方法中调用
//callFactoryd创建是 this.callFactory = builder.retrofit.callFactory();
//还记得我们最初创建Retrofit的build()方法中
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
} //如果当前传过来一个Okhttp,一般都会自己生成一个,你需要设置拦截器修改请求参数或者接受数据,拦截器是OkHttp中的,跟Retrofit无关哦!
//因此上面的callFactory对象实际为OkHttpClient.newCall方法,这里就正式进入OkHttpClient的联网操作了,记住联网操作调用的是.equeue()方法哦
- OkhttpClient也是通过build构建者模式创建,一般都需要自定义Application拦截器和Network网络拦截器的
builder.addInterceptor(new NetworkInterceptor()); //自定义请求头添加
builder.addInterceptor(getInterceptor()); // 日志打印等
builder.addInterceptor(new UpLoadProgressInterceptor((bytesWritten, contentLength) -> Log.d("RetrofitHelper", "bytesWritten=" + bytesWritten + ",contentLength=" + contentLength)));
- 这里我们不必纠结build中到底传递了那些数据,我们看上方主要的流程callFactory.newCall()方法
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
- Call是HTTP请求任务的封装,是OkHttp的核心类,这里的newRealCall调用
//这里返回了call对象,我们需要返回RxJava2CallAdapter类中的adapter方法
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call) //异步调用
: new CallExecuteObservable<>(call); //同步调用
//上面newRealCall方法分析了最终调取的是里面的 subscribeActual,对于同步CallExecuteObservable.subscribeActual
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
Call<T> call = originalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
Response<T> response = call.execute(); //执行execute即为上面的RealCall.execute方法
if (!call.isCanceled()) {
observer.onNext(response);
}
...省略...
}
}
//分析异步请求的CallEnqueueObservable.subscribeActual
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallCallback<T> callback = new CallCallback<>(call, observer);
observer.onSubscribe(callback);
call.enqueue(callback); //异步操作调用RealCall.enqueue方法
}
- 对于同步调用RealCall.execute方法具体实现
@Override public Response execute() throws IOException {
synchronized (this) { //判断call是否已经执行过,每个call对象只能使用一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace(); //捕获请求的StackTrace
eventListener.callStart(this);
try {
client.dispatcher().executed(this); //使用Dispatcher里的队列缓存请求
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
- 里面有Client.dispatcher()线程池方法返回一个DisPatcher对象,他是在Build()中创建的
- maxRequestsPerHost: 当前执行的异步请求队列中相同主机的请求数小于maxRequestsPerHost(每个主机最大请求数,默认为5)
public Builder() {
dispatcher = new Dispatcher();
}
//分析Dispatcher类,包括三个缓存队列:使用数组表示的双向队列
public final class Dispatcher {
private int maxRequests = 64; //最大请求数 64个
private int maxRequestsPerHost = 5; //相同host请求的阈值,host下正在执行的任务数小于该阈值时,直接执行任务
private @Nullable Runnable idleCallback;
/** 线程池,是一个无核心线程,最大线程Max且没有缓存队列的线程池 */
private @Nullable ExecutorService executorService;
/** 等待执行任务Call的双向队列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在异步执行请求任务Call的双向队列 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/**正在同步执行请求任务Call的双向队列 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) { //创建一个没有最大线程数,且没有缓存队列的线程池
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- 回到RealCall.execute中调用了Dispatcher.execute(this): 同步方法直接加入同步队列中去,而异步调用需要判断正在执行的异步队列是否满,如果已满加入等待执行中,每次运行完毕以后都会重新从队列中取数据,
- 同步比较简单,这里我们只看equeue
//这个是RealCall中的添加同步方法
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
//添加异步方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace(); //获取当前堆栈信息
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback)); //调用enqueue,且回调在AsyncCall
}
//enqueue处理异步请求
synchronized void enqueue(AsyncCall call) {
//如果当前正在运行的异步任务小于64个且相同host下正在执行的任务书小于5,直接执行任务
//host是指baseUrl : 比如www.baidu.com 就是一个Host
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call); //加入正在异步执行队列
executorService().execute(call); //添加到线程池执行操作
} else {
readyAsyncCalls.add(call); //否则添加到等待执行异步任务队列中
}
}
- 注意AsyncCall是RealCall的内部类,表示异步回调,继承自Runnable,查看他的run方法,可以发现执行了自定义的execute()方法, 而同步回调是RealCall.execute()方法
- 如果是我们写框架,这个execute异步请求网络,完成以后回调给RealCall接口,还记得上方我们Dispatcher中的缓存队列嘛?每当一个异步请求完成,我们需要从正在执行队列中移除,同时添加一个等待执行任务
- 在执行完毕后将在finally中执行Dispatch.finished,其中对于同步、异步均从运行队列中移除,只有异步才会从等待队列ArrayDeque中获取首个可执行任务添加到线程池。同步任务将会直接运行,待全部执行完毕后调用idleCallback 空闲回调
- 注意: 正在的网络请求是何时开始的,我们可以停下来想一下,上方的分析都是对请求头部分做处理,加入任务到队列等,注意这里无论同步还是异步调取了getResponseWithInterceptorChain方法后直接返回了Response数据了,那么它肯定是我们重点分析方法啦!其实它就是真正执行OkHttp网络请求的
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(); //执行实际的网络请求
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this); //表示该请求已经完毕,需要对Dispatcher线程池就行修改,从正在异步请求队列移除this,请求,并从等待请求队列中添加一个
}
}
//finished(this)调用规则
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//从正在异步请求队列中移除该任务,在上方已经请求过了
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls(); //如果当前是异步的就需要从等待队列添加到异步执行队列中
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
//从等待队列添加到异步执行队列中
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove(); //从等待队列移除添加到异步执行队列,并在线程池中执行操作
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) { //从当前同步/异步执行队列中移除
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls(); //异步时遍历添加等待队列中任务,同步传入false
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) { //如果正在运行的同步及异步队列空闲啦
idleCallback.run(); //调用空闲回调方法
}
}
- 上方是执行网络请求后,对缓存队列的处理操作!我们将目光从新回到最最重要的网络请求方法看一下他是怎么做的
- 小知识点: addInterceptor()添加的是应用拦截器作用在Okhttp core <--> Application,只能执行一次
- addNetworkInterceptor() 添加的是网络拦截器作用于Okhttp core <--> network,如果url发生重定向了,可能被执行多次,但如果是应用拦截器添加只能执行一次
- 看完实际代码你就明白为何会如此了,这跟拦截器的添加顺序相关,application是在第一步添加,而重定向是在第二步,对于一个网址,可能被重定向多次,但只会在第二步-> 其后的操作多次执行,对于第一步的只能执行一次,而NetWork拦截器在第六步执行,所以能够被多次执行,可以打印具体的网络执行过程(每一次重定向都能够执行)
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); //自定义拦截器
interceptors.add(retryAndFollowUpInterceptor); //重定向拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar())); //桥接拦截器处理header,cookie等
interceptors.add(new CacheInterceptor(client.internalCache())); //缓存拦截器,处理cache
interceptors.add(new ConnectInterceptor(client)); //连接拦截器,负责建立连接
if (!forWebSocket) {
//自定义网络拦截器,此时已经建立连接,通过okHttp.addNetworkInterceptor()添加
interceptors.addAll(client.networkInterceptors());
}
//服务器请求拦截器(发起请求,接受响应)
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
拦截器分析
用户自定义拦截器: Application 应用拦截器 和 Network网络拦截器
- 结合上方的源码和图片分析拦截器过程是Request由上到下执行,Response回调是由下往上经过拦截器,分析这两种拦截器作用
- Application 应用拦截器,是第一个Interceptor,被第一个执行,这里的request是最原始的,而由于ArrayList是有序的,因此响应response则是递归调用的最终响应,虽然中间有重定向等,但这里的Response只关心最终的结果
- 不需要关心中间发生的重定向和重试操作,他处于第一位,首先可以更改原始的request对headers请求头添加字段等,也可以得到最终的响应response
//更改request请求数据
Request request = chain.request();
Request.Builder builder = request.newBuilder(); //在builder中添加修改等
return chain.proceed(builder.build()); //正在的网络执行即可build构建返回的还是一个Request对象
//更改Response,网络请求返回的数据,这里就可以更改response了
Response response = chain.proceed(request);
* 只会被调用一次,即使响应是从缓存中获取的
* 只关注原始请求,不关心请求资源是否发生改变,是否重定向,且是第一个被执行的拦截器,有权决定是否调用其后的拦截器,即chain.proceed()方法是否执行,如果不执行此方法其后拦截器都没用了!其后我们会分析拦截器的调用是个链式反应
- NetWork Interceptor网络拦截器,是第六个拦截器中,经过了重定向,并且通过桥接看截取进行request请求头和响应response的处理,这里可以得到更多的信息,且可以被多次调用,发现重定向等操作,比应用拦截器获取更多的信息
- 可以进行失败重试或重定向后得到的response
- 为响应直接从其CacheInterceptor返回,做到网络缓存
- 观察数据在网络中的传输
- 对于一个网络拦截器,注意: Post方式无法缓存,对于Get请求有网加载网络数据,无网加载缓存数据即可
- 拦截器的调取过程,根据在ArrayList集合中的位置依次链式调取,实现Request由上到下,而chain.proceed(request)由下网上回调Response
/**
* @interceptors: interceptors拦截器,是个有序的集合,从上到下request,从下到上的返回response
* @originalRequest 构建的请求头
* @ this : 网络请求以后的回调 异步:AsynCall,同步 RealCall
* @ eventListener :监听 , 后面参数就是设置的网络连接时间,是否超时等等
*/
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest); //已经构建了一个0初始拦截器,后面调用chain实际为 RealInterceptorChain.proceed
//
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
...判空操作..
//上方已经构建了初始位置0的拦截器,这里就从1开始轮询下一次调用拦截器的intercept方法 index+1 表示我们下一次拦截器的访问
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index); //获得本次拦截器
Response response = interceptor.intercept(next); //调用拦截器方法
//实际的网络请求在上方添加拦截器的最后一个CallServerInterceptor.intercept
return response;
}
- 查看数据传输的CallServerInterceptor.intercept方法
- 这里Socket连接已经建立,仅仅只是使用Okio的sink 写,source读 Socket通信而已
- HttpCodec是一个接口类,有两种实现Http1Codec Http 1.0请求, Http2Codec http 2.0请求
//真正的Socket 网络的IO操作
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream(); //这个为null,下面会写入header
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request(); //请求头数据
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request); //写入请求的headers,发送给服务器
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
//判断是否有请求实体的请求(不是get请求)
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// 如果头部信息添加了"Expect: 100-continue",这个请求头字段的作用是在发送RequestBody前向服务器确认是否接受RequestBody,如果服务器不接受也就没有发送的必要了。
// 有这个字段,相当于一次简单的握手操作,会等待拿到服务器返回的ResponseHeaders之后再继续,如果服务器接收RequestBody,会返回null。
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
//服务器同意接受,开始向流中写入RequestBody
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
//写入数据
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
//调用sink.flush
httpCodec.finishRequest();
//构建头部信息和状态码等
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
//构建Response,写入原来请求,握手情况,请求时间,得到相应时间
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
返回的状态码
int code = response.code();
if (code == 100) { //再次请求一次
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) { //如果是webScocket网络连接或者返回值101返回空body
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else { //读取body信息
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
//如果设置了连接关闭则断开连接
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
//对于204和205抛出异常
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
- 这只是网络IO部分,这里我们关注发送状态行和头部数据给服务器使用的是HttpCodec.writeRequestHeaders(request);实际调用的是Http1Codec中的方法
@Override public void writeRequestHeaders(Request request) throws IOException {
//返回状态行数据,类型get/http 1.1的字符串
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
//sink可以看成封装了Socket连接的输入流,将状态栏和头部数据输出到服务端
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
//sink是由Okio创建
sink = Okio.buffer(Okio.sink(socket));
- 而读取服务器返回的头部信息,状态码,并返回ResponseBuilder对象
总结各个拦截器
-
首先还是我们上方的那个图
- 自定义添加的已经分析过了,只需要知道他是第一个Request请求修改以及最后一个Response响应的结果即可,且是根据应用拦截器add()的顺序相关联的
- RetryAndFolloeInterceptor: 失败重试,主要根据url(主机名,端口号等),OkhttpClient提供的信息(dns,socketFactory , sslSocketFactory是否是http/https),创建一个Address信息(指定一个webserver和所有连接到该服务器的必需的静态配置: 端口,HTTPS设置和网络协议Http1.1或2.0), 对于URLs属于同一个Address可以共享同一个底层的Socket连接,优势:低延迟,高吞吐量,省电
- 主要是根据传递进来的BaseUrl解析主机IP,端口号信息等操作
- 其中还会创建一个StreamAllcation类用于管理连接,流,和请求三者的关系,有一个连接池 connectionPool,这样就可以实现connect的复用了
- BridgeInterceptor: 请求响应转换,主要是对请求头中如果不存在某些参数就添加默认的参数,如果响应response是gzip形式,对其解码等操作
- CacheInterceptor: 缓存拦截器,用于对header中的设置的一些字段来判断是否缓存当前请求的操作,其主要在Cache类中通过DiskLruCache类实现,注意put操作只对Get方法才能被缓存,对于post,patch,put,delete,move等请求,都是不应该被缓存的,就是将请求信息按顺序写入到DiskLruCache中,最终由DiskLruCache写入到磁盘中,缓存的键是request.url
- ConnectInterceptor: 通过Socket同服务器建立连接
- 通过2中的连接池获取一个健康的连接,这里可以从连接池connectionPool中获取,如果没有了,就根据路由信息创建一个新的连接并加入到连接池中,根据Http是1.1还是Http2.0协议不同,一般常用的依然是1.1协议,设置连接底层的Socket连接
- 当创建了一个新的Connection后,会调用他的connect方法,将客户端的Socket连接到服务端Socket中并使用Okio封装了Socket的输入输出流source(读) , sink(写) 这里连接Socket已经建立,用到的输入输出流也创建了,下面就是监听网络数据传输了
- NetworkInterceptor: 网络监听器,根据以上分析,可知是在Socket连接建立以后到发送网络数据之间的监听,可以自定义,还记得2中的失败重试,如果重定向了网址1中自定义的只会执行一次,而这里会被重复执行的,根据位置不同,如果需要详细的网络信息可以重写监听得到每次失败重试Log信息
- CallServerInterceptor: 数据传输,上方建立的Socket连接,通过sink将http头发送给服务器,写完通过sink.flush()刷新输出流即可发送给服务端
- 发送完成以后调用readResponseHeaders() 获取响应的头部信息,然后构成Response对象,在通过source读取响应的body即可
- 总结: 这样经过层层的系统拦截器就实现了okHttp的网络通信,我们只需要明白: StreamAllocation负责根据请求创建连接,可能新建一个连接connection,或者重用自己这个连接,或者从ConnectPool获取连接,连接建立就涉及到Socket创建和连接了,当连接创建好了,就创建HttpStream对象,负责底层操作Socket的输入输出流,sink,source
- 在RetryAndFollowupInterceptor中创建StreamAllocation
- 在ConnectInterceptor中创建Socket连接以及HttpStream对象
- 在CallServerInterceptor中操作HttpStream进行发送请求和读取响应