RxJava2.0源码初探
RxJava2.0的源码相对于1.0发生了很大的变化, 命名方式也发生了很大变化, 下面根据一个例子来初步探索这种变化
Disposable disposable = Observable.create(
new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> emitter){
emitter.onNext("Hello RxJava");
}
}
)
.subscribeOn(Scheduler.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Comsumer<String>(){
@Override
public void accept(String s){
Log.d(TAG, s);
}
});
这个例子中, 我们使用的是Observable/Observer
这对组合, 在Observable
调用subscribe
方法之后, 返回的是一个Disposable
对象.
进入subscribe
方法的源码中去查看, 发现这个Disposable
其实是一个LambdaObserver
对象, 还记得我们说过吗, 所有的观察者要不然会实现Disposable
接口,要不然会实现Subscription
接口, 表示这个观察者是可取消的或者可以用来被取消订阅的. 具体来说就是Observer
会实现Disposable
接口, Subscriber
两个接口都会实现.
如果我们采用的是Flowable/Subscriber
这对组合, 这个Disposable
对象就是LambdaSubscriber
.
我们在取消订阅的时候, 会调用Disposable
的dispose
方法, 这个方法分两种情况, 如果是LambdaObserver
, 那么它的dispose
方法会直接执行, 如果是LambdaSubscriber
对象, 就会调用这个对象的cancel
方法去执行.
接下来我们从上往下看, 可以看出RxJava2.0在命名中的一个很有意思的地方.
在这个例子中, 我们从上到下总共调用了create
,subscribeOn
,observeOn
,这三个操作符, RxJava2.0中, 每调用一个操作符, 都会生成一个新的Observable
对象,这个新的Observable
对象是对上游的Observable
对象的封装.
具体来说, 我们调用了create
操作符, 会生成一个ObservableCreate
对象, 它里面有一个source
属性,是我们传入的ObservableOnSubscribe
(这是我们定义的一个匿名内部类实例).
然后我们调用了subscribeOn
操作符, 会生成一个ObservableSubscribeOn
对象, 它里面有一个source
属性, 是上游的ObservableCreate
对象
然后我们调用了observeOn
操作符, 会生成一个ObservableObserveOn
对象,它里面有一个source
属性, 是上游的ObservableSubscribeOn
对象.
在调用了subscribe
方法之后, 我们从下往上看
首先我们确定, 是ObservableObserveOn
这个对象调用了subscrbe
方法
在subscribe
方法中, 我们传入了一个Consumer
对象, 这个对象会作为参数, 来生成一个LambdaObserver
对象, 然后接下来会调用source
的subscribe
方法, 传入的参数是一个ObserveOnObserver
对象,这个对象是对LambdaObserver
的封装, 封装到了其actual
属性中. 这里ObservableObserveOn
的source
属性是ObservableSubscribeOn
那么接下来就会执行ObservableSubscribeOn
的subscribe
方法, 然后会调用其source
的subscribe
方法, 把传入的ObserveOnObserver
封装到SubscribeOnObserver
的actual
属性中. 这里ObservableSubscribeOn
的source
属性是ObservableCreate
那么ObservableCreate
执行subscribe
方法, 会调用其source
的subscribe
方法, 也就是我们传入的ObservableOnSubscribe
的subscribe
方法, 同时会把SubscribeOnObserver
封装到ObservableEmitter
的actual
属性中. 这里ObservableCreate
的source
属性是我们传入的ObservableOnSubscribe
, 我们实现了它的subscribe
方法, 传入了一个ObserableEmitter
, 这里我们就可以调用ObserableEmitter
的onNext
等方法实现数据的发射了.
这就形成了一个闭环, 下面用一幅图来表示这个闭环.
注意这里有一个内存泄露的问题, 在上面的代码中, 即使我们调用了Disposable
的dispose
方法, 会把我们的ObservableOnSubscribe
这个匿名内部类给切断, 但是不会切断我们自定义的Consumer
这个匿名内部类和我们Activity的联系.
要切断Consumer
这个匿名内部类和我们Activity的联系, 我们需要在observeOn
这个操作符值之后, 在调用一个onTerminalDetach
操作符, 这个操作符会生成一个ObservableDetach
, 这个被观察者的subscribe
方法会生成一个DetachObserver
, 同样也是对我们传入的Observer的封装, 也就是对LamdaObserver
的封装, 同样是封装到了actual
属性中.
但是这个DetachObserver
的dispose
方法会把actual
属性置空, 从而切断我们对于Consumer
这个匿名内部类的引用.