okhttp
1.基本使用
初始化可以添加自定义的拦截器
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.addInterceptor(interceptorImpl).builder();//创建OKHttpClient的Builder
使用方法
String url = "http://wwww.baidu.com";
final Request request = new Request.Builder()
.url(url)
.get()//默认就是GET请求,可以不写
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: ");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "onResponse: " + response.body().string());
}
});
一般的使用大致就是这样的
2.从OkHttpClient创建开始入手分析
OkHttpClient.Builder()使用builder模式,用户可以自定义相应的参数
开发一般会用到的是
.connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.addInterceptor(interceptorImpl)
连接时间,写时间,读时间以及对应的Interceptor相关的拦截器
3.构建Request
Request用的也是Builder模式,好处主要是可以动态配置相应的参数
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}
tag主要是做标识的,请求返回为null时候的标识操作
4.构建Call
构建Call,主要是调用RealCall.newRealCall方法,并在其内部添加了一个事件回调监听
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
//RealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
//添加一个事件回调,后续会有用处
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
而在newRealCall方法中同时也调用了RealCall的构造方法
构造方法中加入了RetryAndFollowUpInterceptor重试拦截器,okhttp中加入了很多拦截器,这也是一大特色
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
5. 执行异步请求enqueue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
executed以及synchronized主要是用来防止重复操作和多线程同步用的
接下来的方法
private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}
重试监听器做一些栈StackTrace记录,以及eventListener.callStart(this);事件监听做回调处理,不影响流程
接着就到了Dispatcher的enqueue方法
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
Dispatcher中定义三个队列分别是readyAsyncCalls异步等待,同步运行runningAsyncCalls以及runningSyncCalls异步运行队列,enqueue方法中,当运行异步队列个数小于最大请求数(64)并且同一Host请求个数小于maxRequestsPerHost(5)则加入异步运行队列,并且用线程执行,否则加入异步等待队列中,这是okhttp的线程队列优化
6.查看AsyncCall的run方法
AsyncCall 继承了NamedRunnable,其内部会run方法会调用execute(),代码如下
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
signalledCallback这个标识用来处理是否打印对应的日志,这里可以看到Response类,说明网络请求是在getResponseWithInterceptorChain中完成的,之后会回调当前的Call状态值
7.真正的网络请求的getResponseWithInterceptorChain
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//失败重试拦截器
interceptors.add(retryAndFollowUpInterceptor);
//request和response拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//网络请求连接拦截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//网络拦截器
interceptors.addAll(client.networkInterceptors());
}
//实际网络请求的拦截器
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
加入各式各样的拦截器,各个拦截器之间不耦合,易于用户的自己配置,最后调用RealInterceptorChain的proceed方法
8.RealInterceptorChain的proceed方法
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
this.call = call;
this.eventListener = eventListener;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
//...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// .....
return response;
}
构造方法中加入了eventListener事件监听,看来okhttp中eventListener的监听一直延伸到这里,还加入了
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
连接时间的配置
要重点关注的是index这个字段,前面传进来的时候,默认是0,而在proceed方法中,又重新执行了RealInterceptorChain的构造方法,并通过 interceptors.get(index)获取下一个拦截器,并且执行interceptor.intercept(next)方法,随便找一个拦截器看看
public final class BridgeInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//省略部分代码
Response networkResponse = chain.proceed(requestBuilder.build());
//省略部分代码
return responseBuilder.build();
}
}
拦截器内部又重新调用了chain.proceed的方法,这和递归操作类似,也是okHttp最经典的责任链模式。
9.同步操作
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
同步请求也是通过getResponseWithInterceptorChain来完成的,流程更简单
10.大致的流程图
总结
OkHttp 主要是通过 5 个拦截器和 3 个队列(同步队列,异步队列,等待队列)工作,内部实现通过一个责任链模式完成,将网络请求的各个阶段封装到各个链条中,实现了各层的解耦。具体流程是>1.构建OkHttpClient对象,通过RealCall发起同步或异步请求,而决定是异步还是同步请求的是由线程分发器dispatcher来决定。
2.当发起同步请求时会将请求加入到同步队列中开启子线程并执行,当发起异步请求时会创建一个线程池,并且判断请求队列是否大于最大请求队列64,请求主机数是否大于5,如果大于请求添加到异步等待队列中,否则添加到异步执行队列并执行任务
3.通过Connection实例执行请求,经过拦截器链后拿到data,然后回调callback的onresponse返回数据。
Dispatcher 功能?
他负责将每一次Requst进行分发,压栈到自己的线程池,并通过调用者自己不同的方式进行异步和同步处理。很好的维护了任务队列。
拦截器的作用
**设置任意数量的 Intercepter 来对网络请求及其响应做任何中间处理,比如设置缓存,Https证书认证,请求加密,过滤请求,打印log等等。
BridgeInterceptor(桥接):为请求添加请求头,为响应添加响应头,完成应用层和网络层的桥接
RetryAndFollowUpInterceptor(重定向阻):请求失败重试
CacheInterceptor(缓存):缓存get请求
ConnectInterceptor(连接):内部维护一个连接池,负责连接复用、创建连接、释放连接。
CallServerInterceptor(网络):真正发起了网络请求。里面用的okio库,主要是segment的机制运用内存共享和复用,数据不需要进行二次copy,尽可能少的去申请内存,同时也就降低了GC的频率。强了流与流交互,优化缓存策略减小内存压力和性能消耗。
okhttp底层用的socket通信,而socket底层是tcp/ip传输协议,每次都需要进行三次握手四次挥手过程,而请求过程也经常是频繁的,碎片化的,为了提高网络连接的效率,Okhttp3还实现了connectionPool网络连接池进行复用。
okhttp优势
1.支持 http2,对一台机器的所有请求共享同一个 Socket
2.支持透明的 gzip 压缩响应体
3.请求失败时自动重试主机的其他 ip,自动重定向
4.响应缓存可以完全避免网络重复请求
5.内置连接池,支持连接复用,减少延迟
6.丰富的 API,可扩展性好
7.框架使用了很多设计模式
Okhttp 运用了设计模式?
1.构造者模式(OkhttpClient,Request 等各种对象的创建)
2.工厂模式(在 Call 接口中,有一个内部工厂 realCallFactory 接口。)
3.单例模式(每个 OkHttpClient 对象都管理自己独有的线程池和连接池。 这一点很多同学,甚至在我经历的团队中就有人踩过坑, 每一个请求都创建一个 OkHttpClient 导致内存爆掉)
5.责任链模式(拦截器的链式调用)降低逻辑的耦合,相互独立的逻辑写到自己的拦截器中,也无需关注其它拦截器所做的事情。还扩展性强,可以添加新的拦截器。*
6.享元模式(Dispatcher 的线程池中实现了对象复用)
retrofit
1.使用
- 1、创建HTTP接口首先创建HTTP的API服务接口,接口下的一个方法对应HTTP的一个请求,方法上面的注解表示请求的接口地址部分,返回类型是请求的返回值类型,方法的注解参数即是请求的参数。
public interface ApiService {
/**
* 登录:
* @param body
* @return
*/
@POST("/ny/consumer/login")
Observable<BaseResponse<User>> login(@Body RequestBody body);
}
- 2、构建Retrofit实例配置OkHttpClient实例;设置HTTP接口的域名地址;添加RxJava2网络请求适配器工厂;添加Gson数据转换器工厂;
mRetrofit = new Retrofit.Builder()
.client(sOkHttpClient)
.baseUrl(HOST)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
- 3、生成ApiService的动态代理通过Retrofit生成动态代理,用于发起HTTP请求。
mApiService = sRetrofit.create(ApiService.class);
- 4、发起请求:使用动态代理发起HTTP请求。
getApiService().login(requestBody);
2、源码解析
2.1、Retrofit实例的构建
Retrofit实例的构建使用建造者模式,包括
1、okhttp3.Call.Factory也就是OkHttpClient,因为OkHttpClient实现了okhttp3.Call.Factory,用于发起请求。
2、Executor 用于提交回调任务,默认使用Platform.Android的MainThreadExecutor,其实现了Executor接口,并在执行回调中实现了主线程Handler的handler.post(runnable)操作,用于将异步请求的回调结果从子线程切换到主线程。
static class MainThreadExecutor implements Executor {
private final Handler handler = new Handler(Looper.getMainLooper());
@Override public void execute(Runnable r) {
handler.post(r);
}
}
3、List<CallAdapter.Factory>
网络请求适配器工厂,默认使用Platform.Android的ExecutorCallAdapterFactory,该网络请求适配器工厂适配的网络请求是ExecutorCallbackCall。
@Override public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
if (getRawType(returnType) != Call.class) {
return null;
}
final Type responseType = Utils.getCallResponseType(returnType);
return new CallAdapter<Object, Call<?>>() {
@Override public Type responseType() {
return responseType;
}
@Override public Call<Object> adapt(Call<Object> call) {
return new ExecutorCallbackCall<>(callbackExecutor, call);
}
};
}
4、List<Converter.Factory> 数据转换器工厂,默认使用的是Platform.Android的OptionalConverterFactory,该数据转换器工厂使用的是默认的OptionalConverter。
2.2、生成ApiService的动态代理并发起HTTP请求
采用动态代理可以非常灵活地实现解耦,传入ApiService的Class对象,Proxy提供了用于创建动态代理对象的静态方法,执行动态代理实例的每个方法时都会被替换为执行InvocationHandler对象的invoke方法。
public <T> T create(final Class<T> service){
}
InvocationHandler对象的invoke方法中最后调用的是ServiceMethod的invoke方法:
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
也就是HttpServiceMethod实现的invoke方法:
方法中进行了网络请求适配器对OkHttpCall进行的适配,也就是说网络请求是使用OkHttpCall进行的,但返回类型由网络请求适配器进行适配。
@Override ReturnT invoke(Object[] args) {
return callAdapter.adapt(
new OkHttpCall<>(requestFactory, args, callFactory, responseConverter));
}
从上文配置请求网络适配器工厂我们知道,默认的网络请求适配器适配的是ExecutorCallbackCall,故默认使用ExecutorCallbackCall的enqueue做异步网络请求:
@Override public void enqueue(final Callback<T> callback) {
delegate.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, final Response<T> response) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
if (delegate.isCanceled()) {
callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
} else {
callback.onResponse(ExecutorCallbackCall.this, response);
}
}
});
}
@Override public void onFailure(Call<T> call, final Throwable t) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(ExecutorCallbackCall.this, t);
}
});
}
});
}
这里的delegate就是OkHttpCall实例,callbackExecutor就是MainThreadExecutor实例,OkHttpCall异步请求回调后,使用MainThreadExecutor提交回调任务,该任务执行的就是在异步请求回调的子线程中将异步请求的回调结果从子线程切换到主线程。
其中OkHttpCall的异步请求方法中,调用的就是okhttp3.Call的异步请求,回调结果中会使用Converter<ResponseBody, T>对数据进行转换并返回。
T body = responseConverter.convert(catchingBody);
2.3、RxJava网络请求适配器工厂
使用RxJava2CallAdapter做网络请求适配器,将Call转换为Observable<Response>。
@Override public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
2.4、Gson数据转换器工厂
使用GsonResponseBodyConverter做数据转换器,将ResponseBody转换为T。
@Override public T convert(ResponseBody value) throws IOException {
JsonReader jsonReader = gson.newJsonReader(value.charStream());
try {
T result = adapter.read(jsonReader);
if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
throw new JsonIOException("JSON document was not fully consumed.");
}
return result;
} finally {
value.close();
}
}
2.5 流程图
总结
定义:Retrofit就是一个网络请求框架的封装,通过java接口以及注解来描述网络请求,并用动态代理的方式在调用接口方法时注入自己的方法,本身只是简化了用户网络请求的参数配置等,底层的网络请求还是Okhttp,请求完成后将返回的response通过converterFactorty转换成相应的数据model,最后通过calladapter转换成其他数据类型,比如Rxjava的 Observable)。所以可以很好的与Rxjava相结合,使用起来简洁方便
代理模式:通过访问代理对象的方式来间接访问目标对象
分为静态代理 & 动态代理:
静态代理:代理类在程序运行前已经存在的代理方式
动态代理:代理类在程序运行前不存在、运行时由程序动态生成的代理方式
retrofit通过Proxy.newProxyInstance产生的代理类,当调用接口方法时都会调用InvocationHandler#invoke方法得到Http请求链接、请求方法、请求路径、请求参数等请求信息,构建一个OkHttp的请求并执行。
Retrofit 优点
1.可以配置不同 HTTP client 来实现网络请求,如 okhttp、httpclient 等;
2.请求的方法参数注解都可以定制;
3.支持同步、异步和 RxJava;
4.超级解耦;
5.可以配置不同的反序列化工具来解析数据,如 json、xml;
6.框架使用了很多设计模式:代理模式,构造者模式,工厂模式,适配器模式,观察者模式,外观模式。
RXjava:
RxJava采取的是观察者模式,使用时要先分别创建一个观察者Observer或Subscriber处理接收到数据后要做的处理(onNext,onError,onCompleted),一个被观察者Observable用来发送要处理的数据,最后由被观察者订阅观察者(subscribe),这时要发送的数据就会由被观察发出,然后观察者做出相应处理。用代码来简单描述为:
//创建被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
}
});
//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
//completed
}
@Override
public void onError(Throwable e) {
//error
}
@Override
public void onNext(String s) {
//do it
}
};
//订阅事件
observable.subscribe(subscriber);
这里有个问题,为什么是被观察者订阅观察者而不观察者订阅被观察者呢?我认为应该是这样:被观察者,即发送数据方,他的数据可以发送给多个观察者,即可以有多个观察者观察他,因此他是占据主导权的,他想让哪个观察者看就订阅哪个观察者把数据发给他。
当然,观察者,被观察者及订阅的代码还有很多简单的书写方式,如直接使用just()等方法发送数据,不创建观察者而是在subscribe()方法中传递几个Action()方法等等,在这里我只是展示了最基本的一套用法用来比较清晰地梳理一下工作流程。除此之外,RxJava还可以切换线程,可以对数据进行变换,这些都是在订阅过程中完成的,代码如下:
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String,Integer>() {
@Override
public Integer call(String s) {
//do String --> Integer
return 0;
}
})
.filter(new Func1<Integer,Boolean>() {
@Override
public Boolean call(Integer integer) {
//return your boolean
return integer>10;
}
})
.subscribe(subscriber);
其中,在切换线程时,subscribeOn指定subscribe发生的线程,observeOn指定Subscriber的回调发生的线程,其他操作符如过滤、变换操作符可以自己琢磨一下。
以上,就是RxJava大体上的使用流程,接下来我将从源码角度看一下他的实际工作过程。
工作原理
1.创建Observable
首先看看Observable里面有哪些变量:
final OnSubscribe<T> onSubscribe;
然后看看create的源码:
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
通过传入一个OnSubscribe对象,将其作为参数传入hook.onCreate()方法,将返回值作为参数构造一个Observable对象。这里的 hook 是一个static的RxJavaObservableExecutionHook对象,他的create()方法是这样的:
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
他接受一个OnSubscribe对象,然后就将他返回。hook.create(f) 可以等价的看作是 f 本身,Observable 的构造器接受的就是一个OnSubscribe对象了,看看他的构造器:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
这里将传入的OnSubscribe对象赋给了自己的onSubscribe。
分析完了,创建Observable就是给他一个OnSubscribe对象,把他传入构造器创建一个Observable对象。那么OnSubscribe是什么?
2.OnSubscribe是什么?
看看OnSubscribe源码:
/**
* Invoked when Observable.subscribe is called.
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>>{
// cover for generics insanity
}
原来是继承自 Action1 的一个接口,注释说他在subscribe被调用的时候唤醒,OnSubscribe应该就是所谓的“事务”,他的call方法负责发起事务,即notifyObservers()。结合前面分析的使用过程,在创建Observable时传入的OnSubscribe中实现了call方法并且执行了subscriber的一些方法。
3.创建观察者Observer/Subscriber
首先看Observer的定义:
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
只是一个简单的接口,再看看Subscriber:
public abstract class Subscriber<T> implements Observer<T>,Subscription{
Subscriber是一个实现了Observer接口的抽象类,并且还扩充了许多方法。既然如此,那我们在使用RxJava时就应当尽量用Subscriber代替Observer了。
看看他有哪些属性:
// represents requested not set yet
private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
/* protected by `this` */
private Producer producer;
/* protected by `this` */
private long requested = NOT_SET; // default to not set
他持有一个自己的引用,一个SubscriptionList引用。分析一下,创建一个观察者Subscriber,就必须要实现来自Observer接口的三个方法:onNext(), onError(), onCompleted()。
4.订阅subscribe
创建好观察者和被观察者之后,就可以进行订阅了:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
在subscribe方法中又调用了Observer的一个私有方法:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See Guideline 6.4: Protect calls to user code from within an operator · Issue #216 · ReactiveX/RxJava for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
精简一下这个方法,逐步分析。首先,对传入的subscriber对象和observable.onSubscribe方法判空,然后执行了sunscriber的start()方法,之后对subscriber做了安全性封装:
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
所有检测完毕,开始执行下列方法:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
又看到那个熟悉的hook了,看看他的onSubscribeStart方法是怎样实现的:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass-thru by default
return onSubscribe;
}
传给了observer对象和他的onSubscribe对象,结果直接把后者返回了……有趣的设计,返回之后又继续调用了onSubscribe对象的call()方法,并传入了subscriber对象。
5.Subscription
在分析订阅部分代码时,我发现了subscribe()方法完成后,执行了Subscriptions的unsubscribed()方法并返回。这个Subscription是什么呢?
/**
* Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.
* <p>
* See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.
* <p>
* This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.
*/
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
一直说观察者模式中要订阅,要订阅,怎么取消订阅呢?原来就在这里。我的理解,Subscription可以理解为一件订阅事务,他有一个取消订阅和检测是否取消的方法。每一个订阅事件,最后是可以返回这样一个subscription对象的。我们完全可以把这个对象收集起来,在需要的时候将他取消订阅。例如像下面这样:
private CompositeSubscription subscriptions = new CompositeSubscription();
//创建一个异步任务
subscriptions.add(subscription);//将subscription加入合集中
subscriptions.unsubscribe();//取消订阅
6.变换:map()
RxJava的操作符很多,我这里只选一个最基础的map来看看,首先看map代码如下:
//这段代码是我在1.1版本中分析的,在1.2.2中已经更改了实现方式,多谢评论区提醒~
public final <R> Observable<R> map(Func1<? super T, ? extends R> func){
return lift(new OperatorMap<T, R>(func));
}
在内部调用了lift()方法,并将结果返回了。可以看到,变换的过程中,将包含T在内的T的基类变换为了包含R在内的R的子类,所以这里重点要看两个地方,一是lift()如何实现,二是OperatorMap是什么。先看看lift()方法的实现:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
可以看到,在lift内部用类型R又重新创建了一个Observable。注意观察,这里的代码和调用subscribe()时很像,但又不同。对比一下发现,在subscribe()时,是由Observable自己的OnSubscribe调用了call()方法,并将自己的subscriber作为参数传入call()。但是在这里,通过一个新的OnSubscribe创建了一个新的Observable,在内部先创建了一个新的Subscriber,然后由旧的onSubscribe调用自己的call()方法,这里传入的又是新的Subscriber。新旧之间的关联就在于新的SUbscriber创建的过程:
Subscriber<? super T> st = hook.onLift(operator).call(o);
可以看到,创建新的Subscriber时用到了我们传入的operator,看看hook的lift()实现:
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
return lift;
}
把传入的operator又原样返回了。那么前面的代码就可以简化为operator直接调用了call()方法。我们自己写的变换代码就是实现了这个operator的call()方法。
.map(new Func1<String,Integer>() {
@Override
public Integer call(String s) {
//do String --> Integer
return 0;
}
})
看看前面的代码,我们传入的这个Func1,在内部会由他创建一个OperatorMap,然后将OperatorMap传入了lift(),这个OperatorMap就是我们刚才讲的operator的来源。
再来看看OperatorMap是什么:
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
可以看到,当构造一个OperatorMap时,传入了一个func,在OperatorMap构造器中,是将其赋给了自己记得一个叫做transformer的属性,这个transformer是一个Func1对象,因此我们的实现变换的主要细节其实就在于这个Func1。
7.变换:compose()
除了最基础的mao进行变换外,我们常用的还有compose变换,看看他是怎么实现的:
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
return ((Transformer<T, R>) transformer).call(this);
}
通过compose变换时,传入的是一个transfomer,最后调用的是他的call()方法。transfomer就是前面Map变换中用到的那个,综合来看,在RxJava中进行变换时,是通过创建新的Observable进行代理来实现的,而具体实现细节使用了transformer。
8.线程切换:subscrieOn()
subscribeOn()指定了subscribe()所发生的线程,看看他是怎样实现的:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}
先调用了nest(),返回一个Observable对象,然后调用lift()进行变换,进行变换时传入的是一个由线程调度器scheduler构造的OperatorSubscribeOn对象。先看看nest中发生了什么:
public final Observable<Observable<T>> nest() {
return just(this);
}
把自己传给了just()方法:
public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
这个ScalarSynchronousObservable是继承自Observabel的,到头来还是调用create()创建了一个Observable对象。lift()方法前面已经分析过了,lift()中创建了一个新的Observable,这里不同的地方在于传入的是一个线程调度器scheduler而非OperatorMap,所以线程调度的具体实现应该就是由scheduler和OperatorSubscribeOn来决定的了。那么接下来就看看OperatorSubscribeOn是如何实现线程控制的。首先根据上面的分析,这里传入了一个scheduler对象给构造器,点进来看看他的实现,:
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
private final Scheduler scheduler;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {
@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
producer.request(n);
}
});
}
}
});
}
});
}
});
}
};
}
}
可以看到,在构造器中,传入的scheduler赋给了自己的scheduler,然后在call方法中,通过scheduler创建了一个worker对象,名叫inner,之后的所有操作都是由inner完成的。总结一下,就是传入的scheduler创建了一个worker对象,由这个对象进行了实际上的线程控制。所以线程控制的关键就在于这个scheduler。而scheduler就是我们在使用过程中传入的http://Schedulers.io()等,这里就拿http://Schedulers.io()看看。
public static Scheduler io() {
return INSTANCE.ioScheduler;
}
再看看Schedulers类的构造器,可以知道INSTANCE.ioSchduler是在构造器中进行初始化的:
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
再结合:
/**
* Scheduler to return from {@link rx.schedulers.Schedulers#io()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public Scheduler getIOScheduler() {
return null;
}
可知,ioScheduler是由CacheThreadSchduler这个类创建的,这个类继承自Scheduler,那么也就是说抽象类Scheduler的createWorker()方法由子类CacheThreadSchduler实现了。那就来看看这个方法具体的实现:
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
可以看到,createWorker()方法返回了一个EventLoopWorker对象。而这个类是CacheThreadSchduler类的内部类。回忆一下,之前我们创建好worker之后,他是如何工作的?:
inner.schedule(new Action0() {...};
是有这个worker对象调用了schedule()方法,并且传入了一个Action0。那么就来看看worker的源头,EventLoopWorker,在schedule()时做了什么:
private static final class EventLoopWorker extends Scheduler.Worker {
/* 省略部分代码*/
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
可以看到,最终实际上是调用了thread.scheduleActual()方法,并将action传给了他,返回一个 ScheduledAction 对象。那么看看这个方法内部是如何实现的:
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
创建了一个ScheduledAction的对象,并将其返回,而ScheduledAction类是实现了Runnable接口的:
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
因此,具体対线程的操作就是在这里了。总结一下,SubscribeOn() 是通过 life() 变换来完成的,而在变换中实际上是通过 CachedThreadScheduler 类提供的 schedule() 方法,用Runnable来完成的线程控制。
9.线程切换:observeOn()
和 subscribeOn() 方法一样,observeOn() 方法实现原理也是通过 lift() 变换:
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}
类似的,这里也是传入了一个Operator,不同的是这里传入的是通过scheduler创建的OperatorObserveOn对象。先来看看OperatorObserveOn的构造器:
public OperatorObserveOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
类似的,将传入的 scheduler 赋给了自己的 scheduler 属性。这个scheduler 在哪里用到了呢?首先是在回调方法 call() 中:
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
parent.init();
return parent;
}
}
通过scheduler创建了一个 ObserveOnSubscriber 对象 parent ,并调用了 init() 方法。这个 ObserveOnSubscriber 类是一个内部类,他的构造器:
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}
在这里就调用了 scheduler 的 createWorker() 方法,并将返回结果赋给了自己的 recursiveScheduler ,然后由他创建了 ScheduledUnsubscribe 对象,将这个对象赋给了 scheduledUnsubscribe。好像有点乱,大概理一下,这里创建了一个 Subscriber 对象,在内部做了一些初始化的操作,而这个Subscriber 对象实际上就是 ObserveOnSubscriber 的对象。观察 ObserveOnSubscriber 类:
@Override
public void onNext(final T t) {
if (isUnsubscribed()) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
return;
}
error = e;
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
unsubscribe();
finished = true;
// polling thread should skip any onNext still in the queue
schedule();
}
会发现,在这三个被调用的方法中都会调用 schedule() 方法,而 schedule() 方法的实现是这样的:
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(action);
}
}
注意,这个 recursiveScheduler 就是前面创建的worker。所以控制线程切换的关键还是在于传入的 scheduler及他所创建的 worker 和worker的 schedule() 方法。传入的 scheduler 有很多种,就拿 AndroidSchedulers.mainThread() 来说:
public final class AndroidSchedulers {
private AndroidSchedulers() {
throw new AssertionError("No instances");
}
// See Unit testing support for AndroidSchedulers · Issue #238 · ReactiveX/RxAndroid
// Initialization-on-demand holder idiom
private static class MainThreadSchedulerHolder {
static final Scheduler MAIN_THREAD_SCHEDULER =
new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
Scheduler scheduler =
RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
}
}
可以看到,scheduler就是一个 HandlerScheduler 对象,看HandlerScheduler类的实现:
public final class HandlerScheduler extends Scheduler {
/*省略部分代码*/
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
/*省略部分代码*/
}
}
HandlerScheduler 类也继承了 Scheduler ,他的createWorker() 创建了一个HandlerWorker 对象。所以前面创建的worker其实就是 HandlerWorker。HandlerWorker 类是HandlerScheduler 的内部类,他的schedule 方法:
static class HandlerWorker extends Worker {
private final Handler handler;
private final CompositeSubscription compositeSubscription = new CompositeSubscription();
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (compositeSubscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);
final ScheduledAction scheduledAction = new ScheduledAction(action);
scheduledAction.addParent(compositeSubscription);
compositeSubscription.add(scheduledAction);
handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
scheduledAction.add(Subscriptions.create(new Action0() {
@Override
public void call() {
handler.removeCallbacks(scheduledAction);
}
}));
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
可以看到,在 schedule 内部还是创建了一个 ScheduledAction 对象,之后所有的操作都有他来完成。由前面分析可知,ScheduledAction 类实现了 Runnable。所以归根结底,两个线程控制都是由 Runnable 来实现的。
总结
RxJava 是一个 基于事件流、实现异步操作的库
原理:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer)
观察者(Observer) 按顺序接收事件 & 作出对应的响应动作
Scheduler的原理(线程切换)
首先是observeOn切换线程,他根据传入的参数(newThread(),io(),singleThread()、AndroidmainThread可以生成不同的线程,它在调用onNext方法中执行schedule方法内部有一个work对象实现了runnable接口,完成了线程切换。但如果是AndroidmainThread,则由handle发送postdeply完成到主线程的切换。subscribeOn切换线程与observeOn类似,但是observeOn是改变它所在线程所以每次切换都有效,而subscribeOn是改变数据源的运行线程,只在第一次有效,后续切换都无效,因为subscribeOn自下而上每次在指定线程中向上级订阅,下次再执行subscribeOn只会在改变的线程里进行,用户感受不到线程切换。
优点
1)采用链式调用,代码简洁优雅有美感,并且可读性增强。
2)rxjava中采用观察者模式。模块之间划定了清晰的界限,降低了模块间的耦合性,提高了代码的可维护性和重用性。
3)rxjava中提供了强大的操作符。
:just:将同种数据源组合放到被观察者上面
from:将类似数组、集合的数据源放到被观察者上面
map:将一种数据源转化成另外一种,可以是任意类型变换是1对1
flatmap:将一种数据源转化成另外一种数据,返回ObservableSource对象。可以对数据进行一对多,多对多的变换。flatMap并不保证数据有序。
zip:处理多种不同结果集的数据发射,一般用得多的地方是多个网络请求组合然后统一处理业务逻辑。
除此之外还有经常用到compose操作符,因为rxjava发布订阅如果没及时取消会内存泄漏,通过compose与rxlivercycle配合使用绑定容器生命周期。