在RxJava2源码浅析(一) 里我们分析RxJava2最简单的用法,实际上就是复杂一点的回调.今天一起来看看线程调度的神秘面纱,一步一步的分析,同样的我们拿一段代码来分析.
本文基于:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
public static void debug() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
① public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
② Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
1. observable.observeOn(AndroidSchedulers.mainThread())
2. .subscribeOn(Schedulers.newThread())
3. .subscribe(consumer);
}
开始两段代码就是预处理Observable和consumer, 是很普通的对象生成.
然后在标号为1和2的两句里则是传入scheduler调度器并进行包装, 这个就是类名看起来好复杂, 经常不知道传入的对象是哪个跟哪个,还好AS的调试器比较强大,这2句代码便于方便记忆,我画了一张图.
加上了自己的理解,能看懂就好,哈哈,最后一句是因为在初始化LambdaObserver的时候传了2个空实现.
它后面的我没画是因为跳转太深,直接贴代码:
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
parent.setDisposable() 是一个set方法, 里面是安全判断.那重点就是scheduler.schedulerDirect()方法了,跟进去
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
首先生成了一个Worker, 它是抽象调度器scheduler的一个抽象内部类,我们就把它想象成实际工作的工人吧,它的工作需要我们指定一个调度.由谁来执行它呢,看代码:
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
由一个调度池工厂方法生成一个执行者 executor.就是说当我们创建了工人后,它的领导就附带上了.
再回到scheduleDirect()方法中,下一步就是Hook检测,然后就是w.schedule正式工作了
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f); //收尾工作
} catch (RejectedExecutionException ex) {
parent.remove(sr);
RxJavaPlugins.onError(ex);
}
return sr;
}
这里又new了一个Runnable . 有什么不一样吗?
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
...
}
继承了AtomicReferenceArray,看名称就是原子性的操作, 其实在Rxjava中,大量的类都继承了原子特性.有一个很重要的方法compareAndSet.
public final boolean compareAndSet(int i, E expect, E update)
作用是以原子方式赋值.如果当前值expect==内存值,则将i位置的值以update更新,否则返回false.
再继续跟进,由于delayTime默认为0,则只需executor.submit()方法, 这个方法就是最终的执行者了.然后代码会进入ScheduledRunnable线程的run.
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
try {
try {
actual.run();
...
很明显actual就是我们的decoratedRun了. 这个run从一开始new 出来的Runnable->decoratedRun ->sr经历了3层封装,一切都为了数据安全,不难想象,它最终的回调也是一层一层往回走的.即又会回到
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
>>> source.subscribe(parent); <<<
}
}));
请注意,此时还在子线程中呢.而且source就是ObservableObservOn对象,可以对照那张png图片看看.
但是ObservableObservOn并没有实现subscribe(), 因此由Observable来执行.
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { //
...
}
当执行到subscribeActual()时,又回到了ObservableObservOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
有一点,那就是这个方法的参数是谁?它就是上上片代码的parent
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
就是在回调时带过来的,而source成员变量是谁呢? 就是png图片中的ObservableCreate对象, scheduler呢, 是AndroidSchedulers.mainThread()生成的handlerScheduler.
紧接着主线程调度器生成了worker. 同样的,ObservableCreate也没有实现subscribe(), 被他的父类Observable执行,最后也同样的来到了subscribeActual:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
5. source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
很熟悉的代码段,只是比上一篇的参数较复杂而已,它的类型ObservableObserveOn#ObserveOnObserver , 反正也是Oberver. 然后来到它的onSubscribe()
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
...
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
这个actual还记得吧.就是构造ObserveOnObserver的参数SubscribeOnObserver,它的onSubscribe():
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
数据检查的作用.回到标号5的代码.很显然实现了线程的回调.即①的位置
在执行emitter.onNext(1);时
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
此时的observer类型是ObservableObserveOn#ObserveOnObserver,进入他的onNext():
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
queue添加了一个t (Integer)
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这个worker在执行subscribeActual时就是handlerScheduler,this是observer,紧跟着进入HandlerScheduler
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
看到没,线程调度的面目原来是handler来处理的.消息发送后处理run了,run是上面的this传来的,别忘了
这个执行完后,它并不是直接去运行run参数的run方法.而是先处理sr.setFuture(f) // 收尾工作. 子线程处理完成后才到达run.run()
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
剩下的就往回调了,进入ObservableObserveOn drainNormal()方法:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
...
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
a = actual . actual 显然是ObservableSubscribeOn#SubscribeOnObserver对象,png上写的它真正的Observer, 真的是贯穿始末啊.a.onNext()后,进入ObservableSubscribeOn
@Override
public void onNext(T t) {
actual.onNext(t);
}
这个actual就是LambdaObserver,紧跟着进入它的onNext吧:
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
}
}
}
这个onNext地球人都知道了.
还记得吗,从进入ScheduledRunnable后子线程就死亡了,然后一直在主线程.可以说,线程的转折点就在这里
结束
当然本文只是基于一开始的代码示例,更复杂的也是同样的,加入了上游事件的循环.谢谢大家!