RxJava源码解析第二篇。
我们知道,在使用RxJava的时候,线程的调度是其内部帮我们实现的,这让我们可以便捷的实现函数式编程。
本文主要从源码的角度来分析RxJava的线程调度机制
= =最近被项目搞疯都没什么时间写笔记了。
引入
我们知道,线程调度主要通过observeOn
和subscribeOn
这两个方法,以及Schedular来指定使用的线程。
还是以上一次的代码为例:
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //调用登录接口
.map(new Function<LoginApiBean, UserInfoBean>() {
@Override
protected UserInfoBean decode(LoginApiBean loginApiBean) {
//处理登录结果,返回UserInfo
if (loginApiBean.isSuccess()) {
return loginApiBean.getUserInfoBean();
} else {
throw new RequestFailException("获取网络请求失败");
}
}
})
.doOnNext(new Consumer<UserInfoBean>() { //保存登录结果UserInfo
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {
saveUserInfo(bean);
}
})
.subscribeOn(Schedulers.io()) //调度线程
.observeOn(AndroidSchedulers.mainThread()) //调度线程
.subscribe(new Consumer<UserInfoBean>() {
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整个请求成功,根据获取的UserInfo更新对应的View
showSuccessView(bean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//请求失败,显示对应的View
showFailView();
}
});
我们知道,通过:
.subscribeOn(Schedulers.io()) //调度线程
.observeOn(AndroidSchedulers.mainThread()) //调度线程
这两句代码,就使我们上半部分的请求和保存数据都执行在io线程中,而下半部的ui更新则执行在主线程。
通过这段代码,我们引入几个问题:
- observeOn和subscribeOn是如何实现线程调度的?
- observeOn和subscribeOn之间是否存在冲突?
observeOn源码
首先解决第一个问题,我们先了解一下ObserveOn的实现原理:
首先看一下调用:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
我们可以看到,ObserveOn
最终是返回了一个ObservableObserveOn
对象,并将scheduler
传入。
根据上一篇文的思路:
ObservableObserveOn
会被我们最后subscribe
的时候传入的Observer
订阅。
让我们跟进看一下ObservableObserveOn
被订阅时会执行什么逻辑:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//TrampolineScheduler 表示当前线程
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//根据scheduler创建worker
Scheduler.Worker w = scheduler.createWorker();
//通过ObservableObserveOnObserver代理
source.subscribe(new ObservableObserveOn.ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里的逻辑并不难理解,(如果看了上一篇文章),
首先是判断了scheduler
是不是表示当前线程的TrampolineScheduler
,如果是就直接让observer
订阅上一级的Observable
,也就是跳过当前这一层,即图中的Observer
直接订阅ObservableSubscribeOn
。
然后根据schedular
生成对应的worker
,交由ObservableObserveOnObserver
代理,订阅上一级的Observable
。
根据我们引入的案例,我们以observeOn(AndroidSchedulers.mainThread())
为例,当完成逆向订阅,执行任务链到ObservableObserveOnObserver
时:
@Override
public void onNext(T t) {
// 上一级的模式如果不是异步的,加入队列
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//进行线程调度
schedule();
}
void schedule() {
// 判断当前正在执行的任务数目
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这里首先是判断了sourceMode
,这里先不跟踪这个变量,只需要知道大多数情况下,这个判断是成立,所以会把数据加入队列。
然后转而让worker
执行接下去的步骤。
我们跟踪看看,可以发现,这是个抽象方法,可以找到他在不同类中有不同实现,分别对应了几种不同的线程调度机制,我们挑选案例中的AndroidSchedulers.mainThread()
来跟踪。
首先我们跟踪mainThread
方法,可以发现内部转到了这里:
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
我们再跟进HandlerScheduler
,我们知道worker
是通过createWorker
方法产生的:
public Worker createWorker() {
return new HandlerWorker(handler);
}
可以看到直接生成了HandlerWorker
,并传入了一开始创建的绑定了MainLooper
的Handler
。看到这里也能大致猜出,后续会把任务传给这个handler
执行:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//省略部分代码
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
可以看到,这里将传进来的runnable
包装成ScheduledRunnable
,然后提交给绑定的handler
。
我们知道,后续Handler
会调用ScheduledRunnable
的run方法:
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
//……
}
}
可以看到,只是简单的调用了我们传入的runnable
的run
方法,也就是刚才我们在ObservableObserveOnObserver
中通过schedule
方法传入的runnable
,我们回去看看:
void schedule() {
// 判断当前正在执行的任务数目
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
可以看到其实本身就是个runnable
:
@Override
public void run() {
//输出结果是否融合
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
可以看到,根据outputFused
来跳转方法,这里先不跟踪这个变量,后面会再提到。
现在只需要知道当连续两个observable都需要线程调度时(比如从observeOn
到observeOn
),这个outputFused才会发生变化,默认为false。
那么这里,我们先进入drainNormal
方法:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
//第一层循环
for (;;) {
// 检查异常处理
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//第二层循环
for (;;) {
boolean d = done;
T v;
//从队列中获取数据
v = q.poll();
boolean empty = v == null;
// 检查异常
if (checkTerminated(d, empty, a)) {
return;
}
//如果没有数据了,跳出
if (empty) {
break;
}
//执行下一次操作。
a.onNext(v);
}
//减掉执行的次数,并获取剩于任务数量,然后再次循环
//直到获取剩余任务量为0,跳出循环
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这里的逻辑其实也不难,具体可以看注释。
到这里其实已经切换了线程,然后就是分发数据,逐个调用onNext
操作了。直到没有数据就跳出循环。(总觉得这里missed
的设计很奇怪- -为什么是初始化1而不是missed=get()
呢。望有大神解答~)
看到这里也就大致明白了ObserveOn
的流程呢。
总结一下:
ObserveOn
会用一个queue
保存上一级传下来的数据,然后通过scheduler创建一个worker
,提交数据,并将任务执行在worker
设置的线程中。
subscribeOn源码
看完ObserveOn
,我们看一下subscribeOn
,
首先看一下当他被订阅时会执行什么操作:
@Override
public void subscribeActual(final Observer<? super T> s) {
//创建对应的Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//执行线程调度,内部会订阅上一级的Observable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,这里直接进行了线程调度,创建了SubscribeTask
任务,然后交由Scheduler
执行。
我们先看看scheduleDirect
会执行什么操作:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Scheduler.Worker w = createWorker();
Scheduler.DisposeTask task = new Scheduler.DisposeTask(run, w);
w.schedule(task, delay, unit);
return task;
}
可以看到,这里和我们刚才追踪ObserveOn
时的逻辑一样。都是将任务交给了Worker
处理。我们刚才已经分析了,Worker
会将任务提交给对应的线程执行。
所以我们回过头看一下我们提交了什么任务:
@Override
public void run() {
source.subscribe(parent);
}
可以看出,这里将订阅的操作提交给了Worker
执行。
总结一下:
subscribeOn
会将订阅上一级的操作调交给worker
中对应的线程执行。
ObserveOn和subscribeOn
我们还是以上述引入的例子为例,可以看出,整个过程进行了两次线程调度,首先是subscribeOn
,然后是ObserveOn
,这个过程比较简单,先解析这个过程。
根据上一篇文章的分析,RxJava的整个流程分为三个步骤:
创建任务链,这里没有涉及线程调度。默认执行在当前线程,在这里也就是主线程。
逆向订阅,这里当遇到
ObserveOn
的时候,ObserveOn
直接进行了订阅操作,所以没有影响。
但是但我们订阅ObservableSubscribeOn
的时候,其便将订阅操作提交到了对应线程,所以后续的订阅操作都执行在对应线程,在这里便是IO线程。执行任务链,受到
ObservableSubscribeOn
的影响,这里也会继续执行在IO线程。
但是当我们执行到ObserveOnObserver
的时候,onNext
操作会执行在对应的线程中,在这里也就是切换到主线程。
图中,紫色的箭头表示执行在默认线程(主线程),红色的箭头表示执行在IO线程,绳蓝色的线表示执行在切换后的主线程。
observeOn和subscribeOn之间是否存在冲突
其实从上述的例子我们可以看出并不存在冲突的问题,一个影响的subscribe之后的操作,一个影响的是doNext之后的操作。
从图中可以看出,不管subscribe
和ObserveOn
怎么变化,都不会发生冲突的情况。