Android开源项目原理系列
[搞定开源01] okhttp 3.10原理
[搞定开源02] okio 1.14原理
[搞定开源03] retrofit 2.4.0与设计模式
干巴巴地看源码,理解得很模糊,亲自写一遍才知道真正掌握并不容易。折腾一天,模拟实现了RxJava最简单的入门例子,代码在这里,很水的。
发射一个数据,使用操作符map变换,产生数据运行在一个线程,消费数据运行在另一个线程。
Observable observable = Observable.just(10)
.map(integer -> {
System.out.println("[Operation] map at " + Thread.currentThread().getName());
return String.valueOf(integer * integer);
}).subscribeOn(new NewThreadScheduler("subscribeOnThread"))
.observeOn(new NewThreadScheduler("observeOnThread"));
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("[onSubscribe] at " + Thread.currentThread().getName() + " upstream[" + disposable.getClass().getSimpleName() + "]");
}
@Override
public void onNext(String value) {
System.out.println("[onNext] value [" + value + "] at " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
System.out.println("[onError] -> " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("[onComplete] at " + Thread.currentThread().getName());
}
};
observable.subscribe(observer);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
运行结果很明确,just和map操作运行在subscribeOnThread,onNext和onComplete运行在observeOnThread。
[Operation] just at subscribeOnThread-1
[Operation] map at subscribeOnThread-1
[onNext] value [100] at observeOnThread-1
[onComplete] at observeOnThread-1
在模仿RxJava的过程中,不过多考虑代码扩展和并发问题,不一定和RxJava源码对象一一对应,在保持思想一致的基础上,分析最核心的骨骼。
1、Observer
Observer是观察者,定义为一个接口,提供的函数是熟悉的onSubscribe、onNext、onError、onComplete。
public interface Observer<T> {
void onSubscribe(Disposable disposable);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
订阅后,触发onSubscribe,得到Disposable(我叫它可中止对象)。
public interface Disposable {
void dispose();
boolean isDisposed();
}
RxJava在链式执行和线程调度时,可以随时调用disposable.dispose()中止执行,因此内部很多对象需要实现Disposable接口。
2、Observable
public abstract class Observable<T> {
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
abstract void subscribeActual(Observer<? super T> observer);
public static <T> Observable<T> just(T value) {
return new ObservableJust(value);
}
}
Observable是被观察者,被Observer这个观察者订阅,最核心的订阅函数subscribe理论应该定义在Observer,但为了保持链式结构,现在定义在Observable。这里subscribe简单地调用subscribeActual,subscribeActual作为抽象函数,需要子类实现。
public class ObservableJust<T> extends Observable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
void subscribeActual(Observer<? super T> observer) {
JustTask task = new JustTask(observer, value);
observer.onSubscribe(task);
task.run();
}
}
例子第一个使用just操作符,发射一个整形。很自然地,调用just函数返回的ObservableJust也是一个Observable,需要实现subscribeActual。
just是个很简单操作,不用看源码就能猜到原理是调用observer.onNext(),发射结束调用observer.onComplete()。
为了满足任何操作都能够中止,需要实现Disposable,因此封装一个JustTask。
public static final class JustTask<T> implements Disposable {
final Observer<? super T> downstream;
final T value;
private boolean disposed = false;
public JustTask(Observer<? super T> downstream, T value) {
this.downstream = downstream;
this.value = value;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
public void just() {
downstream.onNext(value);
downstream.onComplete();
}
}
非常简单的逻辑,使用标志位disposed表示是否被中止,just函数分别调用传入Observer的onNext、onComplete。RxJava正是在这种简单的观察者模式上,一步步添加操作符,构建其强大的功能。
3、map操作符原理
常用的操作符map,实现对象一对一转换。创建ObservableMap继承Observable,在Observable添加map函数。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<>(this, mapper);
}
map需要提供一个转换函数Function,定义为接口,实现从类型T转换为类型R。
public interface Function<T, R> {
R apply(T t) throws Exception;
}
不同于just无脑发射传入的数据,map在接收到上游数据时,需要在onNext中增加调用Function.apply,于是添加一个MapObserver继承Observer,在接收数据时做出不同处理。
static final class MapObserver<T, R> implements Observer<T>, Disposable {
private Disposable upstream;
private final Observer<? super R> downstream;
private final Function<? super T, ? extends R> mapper;
private boolean done = false;
public MapObserver(Observer<? super R> downstream, Function<? super T, ? extends R> mapper) {
this.downstream = downstream;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable disposable) {
this.upstream = disposable;
downstream.onSubscribe(this);
}
@Override
public void onNext(T t) {
if (done) return;
R r = null;
try {
r = mapper.apply(t);
} catch (Exception e) {
fail(e);
return;
}
downstream.onNext(r);
}
//onComplete和onError直接调用下游Observer对应函数
@Override
public void dispose() {
upstream.dispose();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
在onNext调用apply很简单不用多说,从map操作符开始,需要注意上下游的概念。对于just操作符,它是发射数据的生产者,只能是上游。在这里,map的上游是just,下游是传入的Observer,新增操作符时要注意下面两点:
- 对于onSubscribe,当自己被订阅,需要调用下游的onSubscribe;
- 对于dispose,当自己被中止,需要调用上游的dispose。
@Override
void subscribeActual(Observer<? super R> observer) {
source.subscribe(new MapObserver<T, R>(observer, function));
}
MapObserver准备好后,在ObservableMap.subscribeActual使用MapObserver包装传入的Observer,直接执行订阅即可。
4、subscribeOn原理
核心来了,使用RxJava一大原因是它的线程调度,相关的核心类有Scheduler和Worker。我们模拟在一个新线程中执行数据的发射。
Worker
public abstract static class Worker implements Disposable {
public abstract Disposable schedule(Runnable run);
}
Worker是任务的具体执行者,它是一个抽象类,提供schedule函数让子类实现。任务可以随时被中止,因此Worker需要实现Disposable。
public class NewThreadUsePoolWorker extends Scheduler.Worker {
private final ExecutorService executor;
private boolean disposed;
public NewThreadUsePoolWorker() {
executor = Executors.newCachedThreadPool();
}
@Override
public Disposable schedule(Runnable run) {
CancelableRunnableTask cancelableRunnableTask = new CancelableRunnableTask(run);
try {
Future future = executor.submit(cancelableRunnableTask);
cancelableRunnableTask.setFuture(future);
} catch (RejectedExecutionException e) {
//uncaught error
}
return cancelableRunnableTask;
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
}
按着第六感,肯定直接开一个线程执行传入的Runnable,但是为了支持可中止,需要将Runnable包装进CancelableRunnableTask。回忆什么样的任务可以被中止,应该想到Future。
public class CancelableRunnableTask implements Disposable, Callable<Void> {
private Future<?> future;
private final Runnable runnable;
public CancelableRunnableTask(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void dispose() {
future.cancel(true);
}
@Override
public boolean isDisposed() {
return future.isCancelled();
}
public final void setFuture(Future<?> future) {
this.future = future;
}
@Override
public Void call() throws Exception {
runnable.run();
return null;
}
}
CancelableRunnableTask是一个Callable任务,call函数直接执行Runnable并返回null。那么如何得到Future呢?必须得使用线程池。Worker里维护了executor这个线程池,并提交Task,得到的Future保存回Task里,Task返回外部。
有点绕,应该可以理解外部调用Worker.schedule(),得到CancelableRunnableTask对象,中止时最终调用Future.cancel()。
Scheduler
Scheduler定义了线程调度,RxJava提供IO、NEW_THREAD或者传入自己线程池等几种方式。Scheduler是一个抽象类,提供createWorker和scheduleDirect函数,具体的执行过程交给Worker。
public abstract class Scheduler {
public abstract Worker createWorker();
public Disposable scheduleDirect(Runnable run) {
final Worker worker = createWorker();
return worker.schedule(run);
}
}
public final class NewThreadScheduler extends Scheduler {
private String threadName;
public NewThreadScheduler(String threadName) {
this.threadName = threadName;
}
@Override
public Worker createWorker() {
return new NewThreadUseRunnableWorker();
}
}
自定义NewThreadScheduler,在createWorker函数里创建上面准备好的Worker。如果是其他方式,需要提供Worker的管理,这里就不需要了。
ObservableSubscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<>(this, scheduler);
}
惯例在Observable添加subscribeOn函数,传入Scheduler,返回ObservableSubscribeOn。
//ObservableSubscribeOn
@Override
void subscribeActual(Observer<? super T> observer) {
SubscribeOnObserver parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setScheduleDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Observer对应使用SubscribeOnObserver包装,新增保存一个Disposable,来源于Scheduler.scheduleDirect()。前文描述了,返回的是CancelableRunnableTask对象,当SubscribeOnObserver被中止时,连锁中止Worker里的线程任务。
static final class SubscribeOnObserver<T> implements Observer<T>, Disposable {
private final Observer<? super T> downstream;
private Disposable upstream;
private Disposable scheduleDisposable;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
upstream.dispose();
scheduleDisposable.dispose();
}
public void setScheduleDisposable(Disposable scheduleDisposable) {
this.scheduleDisposable = scheduleDisposable;
}
//...
}
Scheduler接收一个Runnable,SubscribeOnObserver需要包装进SubscribeTask,在run函数里执行订阅。这样子,订阅过程将会执行在Scheduler指定地方。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
只有第一个subscribeOn有效?
当使用多个subscribeOn切换线程,你会发现只有第一个subscribeOn有效。
Observable observable = Observable.just(10)
.map(integer -> {
System.out.println("[Operation] map at " + Thread.currentThread().getName());
return String.valueOf(integer * integer);
}).subscribeOn(new NewThreadScheduler("subscribeOnThread1"))
.subscribeOn(new NewThreadScheduler("subscribeOnThread2"));
创建一个Observable,在订阅时进行两次线程变换,分别使用线程1和线程2,最后结果just和map会在线程1执行。
Observable链式本质是子Observable包含父Observable,从下向上订阅,在这个例子里:
- 第二个ObservableSubscribeOn包含第一个ObservableSubscribeOn;
- 第一个ObservableSubscribeOn包含ObservableMap;
- ObservableMap包含ObservableJust。
触发订阅时的顺序:
- 第二个ObservableSubscribeOn切换到线程2,调用第一个ObservableSubscribeOn的订阅;
- 第一个ObservableSubscribeOn切换到线程1,调用ObservableMap的订阅;
- ObservableMap调用ObservableJust的订阅。
回顾ObservableSubscribeOn.subscribeActual函数,先由Scheduler切换到指定线程,再在SubscribeTask执行父Observable的subscribe。这样不难理解为什么第二个ObservableSubscribeOn的切换无效,准确的说法是的确切换到线程2,但接下来又被切换到线程1,看起来像无效。
5、observeOn原理
理解subscribeOn后,再理解observeOn易如反掌。和subscribeOn在从下向上订阅中切换线程相反,observeOn则是从上到下的数据变换中切换线程。
public class ObservableObserverOn<T> extends Observable<T> {
final Scheduler scheduler;
final Observable source;
public ObservableObserverOn(Observable<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker worker = scheduler.createWorker();
source.subscribe(new ObserverOnObserver(observer, worker));
}
}
ObservableObserverOn仅仅将通过Scheduler创建的Worker传递给ObserverOnObserver。
@Override
public void onNext(T t) {
if (done) return;
queue.offer(t);
schedule();
}
@Override
public void onError(Throwable e) {
done = true;
error = e;
schedule();
}
@Override
public void onComplete() {
if (done) return;
done = true;
schedule();
}
private void schedule() {
worker.schedule(this);
}
接收数据的三个on函数,统一调用schedule函数,它直接调用worker.schedule。Worker执行的是个Runnable,因此ObserverOnObserver实现了Runnable,提供run函数。
@Override
public void run() {
if (error != null) {
disposed = true;
downstream.onError(error);
return;
}
while (true) {
try {
T t = queue.poll();
if (t != null) {
downstream.onNext(t);
//TODO 接收一次onNext就当结束
downstream.onComplete();
}
} catch (Exception e) {
dispose();
downstream.onError(e);
return;
}
}
}
注意到这里启用了一个queue处理线程同步,源码里更加复杂,为了简单起见,例子里取得一个数据就直接结束。
通过在指定线程执行上游的数据变换,成功实现线程切换。和subscribeOn只有第一次有效果不同,observeOn先将数据放进队列,再执行下游的回调,互相之间不会影响。
6、总结
模拟的代码有些粗糙,不过大概讲清楚RxJava的主干,其实源码还有很多细节不能一一道来,下一篇将挑一部分常用的操作符和Scheduler研究原理。