概要
RxJava
最神秘的部分莫过于此,我为了编写这篇文章也是一遍一遍的查看源码寻找它的运行原理,同样的也查阅了很多的相关资料,但是惭愧的是并未找到实质性有用的资料,很多都是避开这话题避而不谈,在此我们详细的对它进行剖析。
不得不说这个框架编写的真的很棒,相关类也是错综复杂,不过作为程序员我们必须拿出积极的态度来学习它的实现,以此提高自身的价值;好了废话不多说我们马上开始。
源码剖析
我们先来看一下外部的实现调用
public void onMineTask() {
//声明一个ObservableCreate类型的 被观察者对象
Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("ONE");
e.onNext("TWO");
e.onNext("THREE");
e.onNext("FOUR");
}
});
//声明一个Observer类型的观察者对象
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
Log.e("回调执行了onSubscribe函数->", "观察者已成功订阅");
}
@Override
public void onNext(String value) {
if (disposable.isDisposed())
onComplete();
value = mine_result.getText().toString() + "\n" + value;
mine_result.setText(value);
Log.e("回调执行了onNext函数->", value);
}
@Override
public void onError(Throwable e) {
Log.e("回调执行了onError函数->", e.getMessage());
}
@Override
public void onComplete() {
Log.e("回调执行了onComplete函数->", "本次结束!");
}
};
//指定被观察者和观察者运行的线程
//指定 被观察者线程 ObservableSubscribeOn类型
mObservable.subscribeOn(Schedulers.io())
//指定 观察者线程 ObservableObserveOn类型
.observeOn(AndroidSchedulers.mainThread())
//事件发布
.subscribe(observer);
}
我们通过上述的代码可以看出mObservable
它是ObservableCreate
类型,通过他调取了[1]subscribeOn(Schedulers.io())
来指定被观察者的执行线程,并返回一个Observable
类型的返回值,当然这个Observable
类型是个抽象类型;
而获取到Observable
类型的返回值之后,我们有调用了[2]observeOn(AndroidSchedulers.mainThread())
函数方法,实现了对观察者执行线程的指定,此时我们得到返回值依旧是Observable
类型;
最终我们依旧使用Observable
类型的返回值,通过调取[3]subscribe(observer)
函数实现主题发布,并携带着observer
观察者实例。
那么我们接下来,就按序讲解他们的执行的过程:
[ 1 ]
subscribeOn(Schedulers.io())
函数方法,源码解析
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
//此处查Null操作
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//这里是一个钩子函数,在无扩展性特殊操作情况下,
//原封不动的返回一个ObservableSubscribeOn类型的值,
//而由于它的基类是 Observable类型,所以我们直接将其基类类型
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
我们来看RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))
函数方法,它其实是一个钩子函数,在无扩展性特殊操作情况下,会原封不动的返回一个ObservableSubscribeOn
类型的值,而由于它的基类是 Observable
类型,所以我们直接将其基类类型返回。
而我们重点来看这方法中的参数new ObservableSubscribeOn<T>(this, scheduler)
,它是声明了一个ObservableSubscribeOn
类型对象,并传入了一个Observable
类型的被观察者,但由于Observable
实现了ObservableSource
的接口,而我们也只需要ObservableSource
这部分内容,所以最终声明ObservableSubscribeOn
类型实例,我们传入的是一个ObservableSource
类型参数和一个指定运行线程的Scheduler
类型参数.
在ObservableSubscribeOn
类中我们对subscribeActual(final Observer<? super T> observer)
抽象方法进行了实现.但目前还未进行调用
[ 2 ]
observeOn(AndroidSchedulers.mainThread())
函数方法,源码解析
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//调用其自身的多态方法
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略...
//钩子函数,若无扩展性特殊逻辑实现依旧返回一个`ObservableObserveOn`类型的值,
//由于`Observable`是其基类,所以这里直接将其基类类型返回值
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
此方法与[ 1 ]中的方法一样这里不再赘述,我们的重点是其参数new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
,它返回是一个ObservableObserveOn
类型的值,但由于它的基类是Observable
类型,所以此处直接按基类类型返回,它携带参数包括
ObservableSource
类型的this
;Scheduler
类型的执行线程;boolean
类型的延迟错误,指示onError通知是否不能在另一侧的onNext通知之前切换;int
类型的缓冲区大小.
ObservableObserveOn
类同样对`subscribeActual(final Observer<? super T> observer)抽象方法进行了实现.但目前还未进行调用.
[ 3 ]
subscribe(observer)
函数实现主题发布,携带着观察者参数。源码解析:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
//省略...
//调取 subscribeActual抽象方法
subscribeActual(observer);
}
//省略...
}
我们来看subscribeActual(observer)
这个函数,我们的被观察者的ObservableSubscribeOn
和观察者的ObservableObserveOn
分别都重写了此函数,那么此处的subscribeActual(observer)
将进入哪个类呢?
我们将外部的调用链式表达分解一下来看:
Observable observableSubscribeOn1 = mObservable.subscribeOn(Schedulers.io()); //第一次
//指定 观察者线程 ObservableObserveOn类型
Observable observeObserveOn2 = observableSubscribeOn1.observeOn(AndroidSchedulers.mainThread());
//发布执行
observeObserveOn2.subscribe(observer);
很清晰的我们能看到,最终对subscribe(observer)
函数方法发起调用的是观察者的ObservableObserveOn
类,所以我们将会执行ObservableObserveOn
类中的subscribeActual(observer)
实现
public ObservableObserveOn(ObservableSource<T> source, Scheduler
@Override
protected void subscribeActual(Observer<? super T> observer) {
//省略...
//创建一个线程,当前创建出来的是一个Handler线程
Scheduler.Worker w = scheduler.createWorker();
//此处我们调用上一个Observable类型对象的发布事件
//即 被观察者对象的发布事件
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
首先明确一点此处的source
对象是被观察者的Observable
类型实例,由于在指定运行线程时我们都传入一个Observable
类型对象,而传递的Observable
都是上一个对象的实例。
此处的参数new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
是声明的ObservableObserveOn
类的静态内部类的实例。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
可以看出它实现了Observer
接口,而我们在source.subscribe(...)
方法中也仅需要这部分内容,至此我们又回到了Observable
类中的subscribe(Observer<? super T> observer)
函数方法。
public final void subscribe(Observer<? super T> observer) {
//省略...
subscribeActual(observer);
//省略...
而再次调用subscribeActual(observer)
函数方法,不会再进入到ObservableObserveOn
类中,而是进入ObservableSubscribeOn
类,执行其subscribeActual(final Observer<? super T> observer)
实现方法
@Override
public void subscribeActual(final Observer<? super T> observer) {
//将观察者包装成ObservableObserveOn类型
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//[ 3.1 ]调用此方法会进入观察者`ObservableObserveOn`类中的实现方法
//可以看出此处还没进行被观察者的线程指定
observer.onSubscribe(parent);
//[ 3.2 ]在此处设置被观察者的运行线程
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
[ 3.1 ] 我们来看看
observer.onSubscribe(parent)
,它在ObservableObserveOn
类的内部类ObserveOnObserver
的onSubscribe(Disposable d)
做了什么
@Override
public void onSubscribe(Disposable d) {
//省略...
downstream.onSubscribe(this);
}
}
downstream
对象使我们在创建ObserveOnObserver
类型实例时传入的Observer
类型实例,即观察者实例,而它调取onSubscribe(this)
方法后直接回调至我们的外部实现中所创建的观察者对象的onSubscribe(Disposable d)
中,通知观察者订阅成功。
[ 3.2 ] 接下来分析一下
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))
方法是如何指定被观察者运行线程的,
先来分析他参数实现scheduler.scheduleDirect(new SubscribeTask(parent))
,
[ 3.2.1 ] 来看
new SubscribeTask(parent)
做了什么
//实现Runnable 接口
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//调取事件发布事件,而此处的source是`ObservableCreate`类型
source.subscribe(parent);
}
}
此处实现了Runnable
接口,并在重写的run()
方法中调取了source.subscribe(parent)
,首先明确此时的source
对象它的类型是谁?答案是ObservableCreate
,因为在创建ObservableSubscribeOn
类型实例时传入的ObservableSource
类型参数是上一个对象,所以在执行subscribe(parent)
方法后我们会通过subscribeActual(observer)
方法函数,进入到ObservableCreate
类中对subscribeActual(observer)
方法实现;而在此处我们还未进行有效的调用.
[ 3.2.2 ] 分析完
SubscribeTask
后我们接着分析他外层scheduler.scheduleDirect(new SubscribeTask(parent))
方法做了什么
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//创建一个Worker线程,
final Worker w = createWorker();
//钩子函数,若无扩展性特殊处理则返回参数本身
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//声明一个处理任务,将Runnable和Work封装成DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
//调取woker对象的schedule方法
w.schedule(task, delay, unit);
return task;
}
<3.2.2.1> 我们先调用了createWorker()
函数创建了一个EventLoopWorker
类型对象,利用它实现线程调度;而EventLoopWorker
继承于Scheduler.Worker
抽象类
static final class EventLoopWorker extends Scheduler.Worker {
//省略...
//结束操作
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// 释放线程池操作
pool.release(threadWorker);
}
}
//是否结束
@Override
public boolean isDisposed() {
return once.get();
}
//重写进程调度
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//执行线程调度执行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
<3.2.2.2> 然后声明了一个DisposeTask
类型对象,将Runnable
对象和线程调度器包装在一起,而DisposeTask
类实现了Runnable
接口,重写了run()
函数方法,
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
//省略...
@Override
public void run() {
runner = Thread.currentThread();
try {
//执行Runnable
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
//省略...
}
<3.2.2.3> 完成上述预备工作后,我们开始调用w.schedule(task, delay, unit)
方法进行任务执行
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//判定任务是否成功被订阅
if (tasks.isDisposed()) {
// 若没有被订阅,则不作任何处理
return EmptyDisposable.INSTANCE;
}
//进行线程调度
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
<3.2.2.4> 调用 scheduleActual()
进行线程任务调度
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
// 省略...
try {
// 线程提交
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
线程被提交后,会交给Executor
来执行,调用Runnable
接口的run
方法,那么这就简单了.
小结
在上述操作中我们成功的将[被观察者]提交给了指定的线程去执行,那么接下来就是线程向上执行调取被观察者的
run()
的流程了,别头大,这个框架的复杂度就是那么绕~~~,想要成长我们就得迈过这道坎,毕竟成长的就是这么痛苦!!
这里我建议一下,阅读这样复杂的框架的时候我们最好在Debug
模式下,跟随着框架流程进行学习····好了休息一下吧!我们马上开始下半场的学习。
[ 3.3 ]我们在<3.2.2.4>()中执行的
scheduleActual()
中将Runnable
封装成了ScheduledRunnable
类型,那么首先执行进入它的里面,看看它的run()
方法做了什么诡谲的操作.
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
//获取的上层的Runnable对象,执行其run方法
actual.run();
} catch (Throwable e) {
//若发生异常,则直接调用onError通知观察者
RxJavaPlugins.onError(e);
}
//省略....
}
可以看到ScheduledRunnable
类中的run()
并未做什么特殊操作,而是执行了上层的Runnable
类型对象的run()
方法,那么它的上层是谁呢?
[ 3.4 ] 目光转向[ 3.2.2 ]的源码,我们将其中把
Runnable
和Work
类型对象关联了起来,并封装成了一个DisposeTask
类型对象,那么废话不多说我们直接去看看其中的源码
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
[ 3.5 ]依旧是继续向上调取,我最终回到了ObservableSubscribeOn
类中内部类SubscribeOnObserver
中的内部类的SubscribeTask
的run()
方法,饶了这么大一圈最终还是回到了最初始调取的地方才有实质的处理操作_!! 来看看吧
final class SubscribeTask implements Runnable {
// 省略...
@Override
public void run() {
//调取被观察者的 subscribe方法
source.subscribe(parent);
}
}
[ 3.6 ] 此时的
source
就是外部第一次声明的Observable
类型对象的实例了,即ObservableCreate
类,那么我们将再次回到Observable
类中执行其subscribe(Observer<? super T> observer)
函数,老套路来看吧!
public final void subscribe(Observer<? super T> observer) {
//省略...
subscribeActual(observer);
//省略...
}
[ 3.7 ]我们又一次要执行
subscribeActual(observer)
方法了,那么这次我们要进入哪个类呢?猜对了就是ObservableCreate
类
@Override
protected void subscribeActual(Observer<? super T> observer) {
//将观察者封装成CreateEmitter类型
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回调观察者的接口,告诉观察者订阅成功,
//并将新封装的观察者传递过去
observer.onSubscribe(parent);
try {
//回调外部被观察者的接口,告诉被观察者主题已被订阅,
//可进行接下来相关操作了,并将最新封装的观察者对象传入
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
[ 3.8 ] 终于出去了=_=!! 我们首先通知观察者成功订阅主题,然后再告诉被观察者被成功订阅,可以新进行接下来的主题发布了!!
Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//[ 4 ]发布下一个
e.onNext("ONE");
e.onNext("TWO");
e.onNext("THREE");
e.onNext("FOUR");
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("回调执行了onSubscribe函数->", "观察者已成功订阅");
}
@Override
public void onNext(String value) {
Log.e("回调执行了onNext函数->", value);
}
@Override
public void onError(Throwable e) {
Log.e("回调执行了onError函数->", e.getMessage());
}
@Override
public void onComplete() {
Log.e("回调执行了onComplete函数->", "本次结束!");
}
};
[ 3.8.1 ]调取
ObservableEmitter
类型的观察者的onNext(Object value)
方法
@Override
public void onNext(T t) {
//省略...
if (!isDisposed()) {
//调取未进行包装的观察者的onNext方法函数
observer.onNext(t);
}
}
[ 3.8.2 ]这里的并没有进行任何处理,而是调取了上层的
SubscribeOnObserver
类型的观察者的onNext(Object value)
@Override
public void onNext(T t) {
downstream.onNext(t);
}
[ 3.8.3 ] 这里依旧是继续向上调取,通过调取上层的
ObserveOnObserver
类型的观察者的onNext(Object value)
方法
@Override
public void onNext(T t) {
//省略
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//进行线程调度,并将观察者传入
worker.schedule(this);
}
}
[ 3.8.4 ]此处进行观察者执行线程的调度,由于我们在外部指定观察者运行于
UI
线程,所以此处我们HandlerScheduler
类的schedule(Runnable run, long delay, TimeUnit unit)
方法中
public abstract class Scheduler {
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
}
}
final class HandlerScheduler extends Scheduler {
private static final class HandlerWorker extends Worker {
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//省略...
//将handle和run关联起来包装成新对象
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
if (async) {
message.setAsynchronous(true);
}
//handler发送消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
//省略...
return scheduled;
}
}
}
[ 3.8.5 ] 我们在这里将
Runnable
包装成Message
同过Handler
进行发送,这将进入到Hanlder
类中的runWithScissors(final Runnable r, long timeout)
方法
public final boolean runWithScissors(final Runnable r, long timeout) {
//省略
if (Looper.myLooper() == mLooper) {
//执行Runnable接口的run()方法
r.run();
return true;
}
//省略...
}
[ 3.8.6 ] 通过判定我们当前运行的线程是否跟
Handler
所在的线程是否一致 ,如果一致直接运行Runnable
的run()
方法,即我们再次回到了HandlerScheduler
类中的内部类HandlerWorker
下,执行其run()
方法
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
[ 3.8.7 ]没有意外,这没有进行任何处理,继续向上传递进入
ObservableObserveOn
类的run()
中
@Override
public void run() {
//省略...
drainNormal();
}
void drainNormal() {
//省略...
a.onNext(v);
//省略...
}
[ 3.8.8 ] 可以看到在此处我们回调了外部声明观察者接口的
onNext(Object value)
方法,进而执行我们自定义的业务!!!
@Override
public void onNext(String value) {
if (disposable.isDisposed())
onComplete();
value = mine_result.getText().toString() + "\n" + value;
mine_result.setText(value);
Log.e("回调执行了onNext函数->", value);
}
小结
至此完成了线程切换的全部过程,不得不说这个框架的复杂程度尤其之高,为了写这篇文章也是累的自己一头的汗=_=!! 一开始分析源码的时候真是一个头两个大,主要是其中的类粘连性太强了,特别容易把自己搞得晕头转向,不过分析完之后脑中也就豁然开朗了。
由于篇幅太长,我们将整体流程大致的再进行一下梳理,之后我会将流程图奉上!!
流程梳理
- 4.使用最新的[被观察者]调取
subscribe(observer)
方法进行主题发布,并将观察者作为参数传入。- 5.调用新的被观察者2 对象的
subscribeActual(Observer<? super T> observer)
方法,创建一个执行线程,调取新的被观察者1的subscribe(Observer<? super T> observer)
方法,比创建一个ObserveOnObserver
类型对象传入- 6.调用新的被观察者1 对象的
subscribeActual(Observer<? super T> observer)
方法,创建一个执行线程,在线程的run()
方法中调取被观察者 0的subscribe(Observer<? super T> observer)
方法,声明一个SubscribeOnObserver
类型的对象传入- 7.调用被观察者0 对象的
subscribeActual(Observer<? super T> observer)
方法,通过调取被观察者的subscribe(ObservableEmitter e)
方法,回调到了我们的外部实现.- 8.在外部的
subscribe(...)
方法,调取观察者的onNext(Object value)
方法,而此时的观察者经过一套流程下来也是被进行的层层包装,最终回调到了我们的外部实现.
这篇文章篇幅我有点长,希望有兴趣的同学慢慢读,之后我把流程图梳理完毕后会如数奉上~~~
此文章只代表个人理解观点,如有差错希望积极之处,我们共同交流!!!