上一篇讲了retrofit2的原理,这一篇咱们重点讲讲rxjava2的实现原理。不过呢,由于rxjava2博大精深,这里篇幅有限,而且精力有限,所以这里只讲以下几个点:
1.上游被观察者Observable的创建
2.subscribeOn如何初始化调度线程者
3.observeOn如何初始化调度线程者
4.下游观察者的创建,上游被观察者的订阅事件,subscribeOn如何调度上游被观察者线程以及observeOn如何调度下游观察者线程
//第一步
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//log1
LogUtil.d("subscribe : " + Thread.currentThread().getName());
e.onNext("a");
e.onComplete();
}
}).subscribeOn(Schedulers.io())//第二步
.observeOn(AndroidSchedulers.mainThread())//第三步
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//log2
LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
}
@Override
public void onNext(String str) {
//log3
LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
//log4
LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
//log5
LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
}
});//第四步
一、上游被观察者Observable的创建:
首先进入Observable的create方法里面去:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final class RxJavaPlugins {
...代码省略...
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
...代码省略...
}
RxJavaPlugins.onAssembly 是个hook方法,在这里不做详细介绍,我们可以
RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
直接看成
new ObservableCreate<T>(source)
就可以了。
下面很多地方会有类似的调用,都可以以这种形式看待,就不在说明了。
这里就返回了ObservableCreate的对象,然后把source传了进来,这里的source便是一开始new出来的ObservableOnSubscribe对象。
二、subscribeOn如何初始化调度线程者
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
1.我们可以看出此时返回的是一个ObservableSubscribeOn对象,参数一个是this说明是第一步new出来的ObservableCreate对象,这个就是当前对象里面的source变量,第二个参数是scheduler即Schedulers.io()。
2.那么Schedulers.io()到底是什么呢,一起去看看吧:
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
public final class RxJavaPlugins {
...代码省略...
/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
*/
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
...代码省略...
}
其实这也是个hook方法,所以返回的就是IO。再来看下IO是个什么东西
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
上图所知,这是个静态块,在程序加载的时候就会初始化出来
/**
* Calls the associated hook function.
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
然后根据hook function可以得知返回的Scheduler就是
callRequireNonNull(defaultScheduler)返回的对象。这个defaultScheduler是个IOTask对象,看下面代码
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
IOTask有个重写方法call,返回的是IoHolder.DEFAULT即IoScheduler。那么这个call方法是什么时候调用的呢,其实就是上面的callRequireNonNull方法里面调用。
所以可以得出Schedulers.io()获取到的就是IoSchedulers对象。
三、observeOn如何初始化调度线程者
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
1.可以看出来这里返回的是一个ObservableObserveOn对象,这里的this参数就是第二步的ObservableSubscribeOn对象,这个就是当前对象里面的source变量。而这个scheduler参数就是传入的AndroidSchedulers.mainThread(),看名字大概能猜出来这是个主线程调度者
2.那么就随我进入AndroidSchedulers的代码里一探究竟吧:
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
...代码省略...
}
mainThread方法里面返回的是MAIN_THREAD,而这个MAIN_THREAD是通过RxAndroidPlugins.initMainThreadScheduler返回的MainHolder.DEFAULT对象,也就是HandlerScheduler对象。这个对象传入了主线程的handler,用来调度线程。
四、下游观察者的创建,上游被观察者的订阅事件,subscribeOn如何调度上游被观察者线程以及observeOn如何调度下游观察者线程
最关键的一步来了,先上代码:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
1.ObjectHelper.requireNonNull已经出现了很多次,其实就是为了判断是否为空用的
2.这里的observer就是传入的观察者,即刚刚在最外层new出来的observer。
3.这个时候进入subscribeActual方法,发现这是个抽象方法,那么是谁实现了这个方法呢。还记得我们现在的subscribe是谁调用的嘛,没错就是第三步返回的ObservableObserveOn对象,所以ObservableObserveOn实现了subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里的source就是之前步骤3.1保存的ObservableSubscribeOn对象,而observer就是最外层传入的观察者对象,由此可以看出观察者被内部类ObserveOnObserver进行了装饰。
w就是handler线程调度者创建的工作者主要用来将ObserveOnObserver(实现了runnable接口)通过handler发送到主线程上面去,从而实现线程调度
4.这个时候要调用source的subscribe了,跟ObservableObserveOn类似,之后进入到ObservableSubscribeOn的subscribeActual方法中:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这里的s就是步骤4.3new出来的ObserveOnObserver对象,这里将s装饰到SubscribeOnObserver对象中,然后调用ObserveOnObserver的onSubscribe方法 ↓↓↓
@Override
public void onSubscribe(Disposable s) {
...代码省略...
actual.onSubscribe(this);
}
}
这里的actual其实就是最外层的观察者,然后就会调用最外层观察者的onSubscribe方法,此时由于并没有做任何的线程调度,所以当前的操作处于主线程中。因此log5出打出来的线程应该是主线程
好了,我们再回到subscribeActual方法中,此时调用scheduler.scheduleDirect方法,将SubscribeTask对象传入:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
run就是传入SubscribeTask的对象,将SubscribeTask的对象装饰到DisposeTask中,
w是IO线程调度者创建的切换线程的实际工作者,此时调用createWorker就会执行IoScheduler的createWork方法new 出EventLoopWorker对象:
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
而w.schedule也就是执行了EventLoopWorker的schedule方法,将DisposeTask传入:
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
接着调用threadWorker里面的scheduleActual方法:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
因为之前传入的delayTime是0,所以此时会走 f = executor.submit((Callable<Object>)sr);这条线。然后我们看下executor是什么时候初始化的:
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...代码省略...
/**
* Creates a ScheduledExecutorService with the given factory.
* @param factory the thread factory
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
由此可以得出这个executor是ScheduledExecutorService对象,然后调用ScheduledExecutorService的submit方法:
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
...代码省略...
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<Void> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
这里先是ScheduledFutureTask的初始化,将command传入,然后调用其父类的构造方法->调用Executors.callable方法返回一个RunnableAdapter对象(将command传入构造方法内,赋值给task),然后赋值给callable对象。
然后调用delayedExecute以后就会执行ScheduledFutureTask的run方法,调用父类的run方法:
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
接着就调用callable的call方法,这个callable就是上面提到的RunnableAdapter对象,调用RunnableAdapter的call方法->task.run,此处的task便是传入的command。
那么往前追溯到NewThreadWorker的schedule中,发现传入到ScheduledThreadPoolExecutor中的command是ScheduledRunnable对象,也就是说调用了ScheduledRunnable.run->DisposeTask.run->SubscribeTask.run->SubscribeOnObserver.run->source.subscribe(parent);
所以上面的一系列操作全部都在子线程中执行
SubscribeOnObserver里面的source是之前保存的ObservableSubscribeOn的ObservableCreate对象。
parent是装饰了下游ObservableObserveOn的内部类对象(装饰了外部观察者)的ObservableSubscribeOn的内部类,这句话比较拗口,多读几次理解一下。
5.这时候就执行到ObservableCreate的subscribeActual方法了:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
observer是装饰了下游ObservableSubscribeOn的内部类SubscribeOnObserver对象,该对象装饰了他下游ObservableObserveOn的内部类ObserveOnObserver对象(该对象装饰了外部观察者)如下图所示:
source是一开始在创建ObservableCreate时就传入的外部被观察者对象
因此,最终source.subscribe(parent)会调用最外层被观察者的subscribe方法,因此log1打印出来的线程应该处于子线程中,
//log1
LogUtil.d("subscribe : " + Thread.currentThread().getName());
e.onNext("a");
e.onComplete();
e就是传入的CreateEmitter对象,调用createEmitter的onNext方法:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
会依次调用observer.png所展示的对象的onNext方法,重点看下ObservableObserveOn.ObserveOnObserver的onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker就是上面步骤4.3里面调用HandlerScheduler.createWorker方法new出来的HandlerWorker对象,调用HandlerWorker的schedule方法:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
将runnable封装到消息对象里面发送给主线程,主线程执行run方法,从而完成了线程调度,这时操作处理已经进入到了主线程当中执行ScheduledRunnable的run方法:
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
这里的delegate就是ObservableObserveOn.ObserveOnObserver对象,所以重新进入ObservableObserveOn.ObserveOnObserver代码的run方法中:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
run-> drainNormal->actual.onNext(v);
这里的actual便是最外层的观察者,所以观察者里面的log3,log4,log5都会是在主线程中打印出来。
接下来我们打印出来看看:
08-08 14:09:03.138 4928-4928/com.zkyl.zkyl D/(MainActivity.java:57): onSubscribe onSubscribe : main
08-08 14:09:03.142 4928-4974/com.zkyl.zkyl D/(MainActivity.java:47): subscribe subscribe : RxCachedThreadScheduler-1
08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:62): onNext onSubscribe : main
08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:72): onComplete onSubscribe : main
确实正如代码分析一般只有log1是处于子线程中,其他都处于主线程当中。
感想
rxjava2的设计实在是过于庞大,精髓远远不止这些。研究到这里已经非常累了,所以rxjava2的研究道路还是非常遥远漫长的