本片文章适用于有一定Android开发经验并且对于响应式编程有一定了解的程序猿阅读。
简介
RxJava按照官方的定义为:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。在Android上使用的比较广泛,因为在移动开发中由于UI线程不能阻塞,否则会出现卡顿,所以异步操作对于移动端编程尤其重要。而RxJava就是这样一个基于事件流并且便于异步操作的程序库。下面我们从源码角度分析一下事件的创建流程。
Demo
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
emitter.onNext("world");
emitter.onComplete();
}
});
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext data is :" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
输出:
onSubscribe
onNext data is :hello
onNext data is :world
onComplete
从日志可以看出onSubscribe方法先被调用然后依次输出hello world 最后输出 onComplete 符合Observable 中subscribe方法的调用顺序。
Observable#create
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Observable.create()的入参为ObservableOnSubscribe,那ObservableOnSubscribe是什么呢,它是一个接口
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter<T> e) throws Exception;
}
ObjectHelper.requireNonNull()对入参进行了空指针判断。
RxJavaPlugins根据注释来看其实就是一个对Rxjava标准操作进行处理的插件类。
RxJavaPlugins.onAssembly返回的其实就是new 出来的ObservableCreate对象,同时持有ObservableOnSubscribe对象引用。
那ObservableCreate又是什么呢?其实ObservableCreate就是一个Observable被观察者对象并重写了subscribeActual()方法。而subscribeActual()真正调用的地方发生在订阅发生的地方,下文会分析到。
所以Observable#create最终创建的是ObservableCreate对象。
Observable#subscribe
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
可以看到RxJavaPlugins.onSubscribe(this,observer)其实只是简单的返回了observer而已,关键的是subscribeActual(observer)。通过上文可知,subscribeActual真正发生的地方其实是在ObservableCreate中。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual这段代码并不长但很重要,咱们一句一句来分析下。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
采用的是组合的方式,parent 内部持有了observer对象仅此而已,可以理解为parent 为observer的代理。
然后observer调用了自己的onSubscribe方法,这也是为什么我们一开始看到onSubscribe日志最先输出的原因。同时它也脱离了被订阅者的管理,因为订阅者自己调用了自己。
接下来可以看到订阅真正发生的地方source.subscribe(parent);
source就是一开始我们new出来的ObservableOnSubscribe对象,而parent是订阅者的代理对象,所以当订阅发生的时候,就会输出上面的日子。
单个操作符订阅总结
1、传入的ObservableOnSubscribe最终被用来创建成ObservableCreate
2、ObservableCreate持有我们的被观察者对象以及重新了订阅触发时的回调函数subscribeActual
3、在subscribeActual实现了我们的主要逻辑,包括observer.onSubscribe(parent);
source.subscribe(parent);
parent.onError(ex)的调用
4、在Observable的subscribe被调用时开始执行事件分发流程
5、最后放一张对象间的关系图(此图来自于网络)
组合操作符总结
1、首先我们得明确一点每个操作符最后都会通过RxJavaPlugins.onAssembly(Observable<T> source)返回一个新的Observable。如下:
2、onAssembly的入参针对每一个操作符都会实现一个继承自AbstractObservableWithUpstream:Observable的对象,它的作用就是用于包装上级的AbstractObservableWithUpstream:Observable。比如:
而每一个新的Observable对象中都会有一个继承自Observer的对象,它的作用就是用于包装下级Observer和当前的Function
3、当我们调用最底层subscribe方法的时候其实真正调用的是上一级Observable的subscribeActual方法,然后subscribeActual方法中会构造一个内部Observer子类的对象,然后通过调用上级Observable的subscribe方法将新生成的包装Observer对象传入到上一级中。
4、通过一层层的包装上传,当调用链来到最顶层的ObservableCreate时,由于不能再往上一层进行封装了,就会执行ObservableOnSubscribe的subscribe方法,如下:
而subscribe方法的参数就是下级经过层层包装传递上来的Observer,所以当我们调用emitter的相关方法时,内部都会执行如下操作:
其中actual就是下级的Observer。
5、当往下的调用链来到最底层时自然调用的就是最底层Observer的相关方法了。
至此,组合操作符的调用链就分析完了。