对于Rxjava想必大家都很熟悉了,这里不再赘述什么是Rxjava。
今天的主题是:从源码角度(2.0)分析,Rxjava是如何做到事件分发的??
以下是今天学习笔记的目录:
- 关键类及方法简要说明
- 分析源码,查看事件是如何传递的
关键类及方法说明
首先我们知道Rxjava是一个扩展的“观察者”模式,既然是“观察者”模式,那么不可避免的会涉及到:被观察者,观察者,订阅操作,事件。那么Rxjava中哪些类是这些身份的扮演者呢?先上一下基本类图。
类图:
基本使用code:
结合图-1和图2,我们可以知道"被观察者"以及"观察者"是谁了。 那"ObservableOnSubscribe","ObservableEmitter","Disposable"又是什么? 接下来我们就从"Observer"开始逐一梳理下他们是什么,以及如何工作的。
Observer:观察者
从上图中我们可以知道 “Observer” 是Rxjava 中 观察者的身份。我们简单介绍下Observer中各个方法的作用。
onSubscribe(Disposable d):
1:当 Observable向 Observer发送事件前,调用此方法(具体在哪调用的,后续会分
析到,这里先给出结论)。
2:可以调用 Disposable(为了便于理解,我们可以把它想象成 “消息控制开关”,因为它就像我们平常接触的各种开关一样,来控制消息是否分发给 Observer)的dispose()来告知 Observable,当前的 Observer是否需要接受事件。
onNext(T value):
1:该方法就是 Observable传递订阅事件给Observer的回调方法,其中value就是 Observer向Observable订阅的要监听的事件。
2:如方法注释所说的那样,该方法可以调用多次(想想也是,观察者订阅的事件当然可能发生多次了)。
3:当调用onComplete()/onError()之后就算再调用onNext()事件也不会发送至 Observer。
onComplete():
1:该方法标记之后的任何事件将不会发送给 Observer
2:当调用了onComplete()之后再调用onError()事件是不会发送给 Observer。
onError(Throwable e):
1:该方法通知观察者出现了错误情况。
2:当调用了onError()之后,再调用onNext()/onComplete()事件也不会发送给 Observer。
问题:
1:在实际应用时发现,“当调用onComplete()之后再调用onError() app会抛出异常并crash”。
2:当设置 observeOn(Scheduler s)之后,会导致 “onError()之前的事件接受不到或者丢失部分事件”。
以上两个问题会在稍后给出原因。
Observable:被观察者
首先,我们知道它是Rxjava中 “被观察者” 的具体实现。 其次,我们要创建一个被观察者可以通过Observable提供的N多静态方法"去new一个出来"。我们拿"create()"举个例子。
create():
create()方法其实做的事情并不多。
- 判断传入的ObservableOnSubscribe是否为null。
- 创建ObservableCreate(该类继承Observable)并把ObservableOnSubscribe添加到 ObservableCreate,并返回ObservableCreate。
既然创建好了 被观察者,那么接下来就需要 "观察者 订阅 被观察者,让被观察者时刻保持警惕,当有我要的事件发生时,记得通知我",那"订阅"这个动作 Rxjava是如何实现的呢?
subscribe():订阅操作
我们都知道 标准观察者模式中,实现"订阅"操作的话应该是 "观察者.订阅(被观察者)"这种写法才对,就拿给View设置点击事件来说应该是"View.setOnClickListener()"这种。那为啥Rxjava中要反其道而行要采用"被观察者.订阅(观察者)"这种方式呢? 这是因为Rxjava采用了“流式api”调用策略,这样写可以使代码更简洁,有种一气呵成的感觉,所以就把 "订阅"动作放到了Observable中。废话不多说,我们来看下subscribe()都做了些什么事情吧。
首先,通过ObjectHelper.requireNonNull()判断传入的Observer是否为null。
其次,调用RxJavaPlugins.onSubscribe()返回当前Observer(具体该方法会在后续章节分析)。
再次,调用ObjectHelper.requireNonNull()判断传入的Observer是否为null。
最后,调用subscribeActual()进行 订阅操作。
subscribeActual():
通过该方法的解释,我们可以知道此方法是 "实际订阅动作的发生地"。
到此“观察者模式”中的三大主角,“观察者”和“被观察者”以及“订阅操作”就简单介绍完了。那么问题来了,接下来“事件”需要怎么发出呢?好,我们继续往下看。
事件:
ObservableOnSubscribe:
首先从该接口的定义中我们知道,此接口是 “事件产生和推送的地方”。具体的事件的产生和发送是在subscribe()中实现的。我们来举个例子,声情并茂的说明下吧。 我们大都玩过cs,cf之类的射击游戏,假如我是警,对面人物是匪,此时我们相遇了(我没看到他),一根无形的线把我们“栓”在了一起。此时对面哥们一看,有敌人,二话不说举枪就开始射击,不过还好我穿了防弹衣,再加上反映比较快马上就找好了掩体进行了反击。
对于这样一个场景来说,假如我是Observer,那么对面那哥们就是Observable,那根“无形的线”就可以理解为subscribe动作,枪对于我们来说就是“事件的产生者或者容器”那么“子弹”可以理解为具体要发送的“事件”了。
这里的“枪”指的就是“ObservableOnSubscribe”。
ObservableEmitter:
继续沿用上边的例子,我们说到了可以用“枪”来比喻成“ObservableOnSubscribe”来理解它的定义。那么,枪要想发射子弹的话,人得扣动扳机才行,对吧,那我们就可以把“ObservableEmitter”理解成“枪的扳机”,用于控制事件的“发射”。
看一下类图吧。
通过类图我们不难发现,ObservableEmitter实现了Emitter接口,Emitter接口中定义了onNext(),onError(),onComplete()等三个方法,是不是感觉在哪见过?没错,这三个方法正好对应的是Observer中的onNext(),onError(),onComplete()。通过Emitter调用这三个方法,则会分别回调Observer对应的方法(具体实现我们在稍后会给出)。那么调用Emitter的三个方法后Observer会收到消息,事件不就是从 被观察者向观察者传递了吗?
Disposable
最后再来说一说,Disposable,老规矩,上图。
我们可以把"Disposable"理解为“消息控制开关”,就像电灯的开关一样,它控制了是否消息可以送达至Observer处。
而且细心的同学观察 图-2以及Observer接口的定义代码可以发现,Observer的onSubscribe(Disposable d)把该开关返回给了 Observer,为什么要这么设计呢?
原因应该有2个:
1:当消息发送前,首先回调Observer的onSubscribe()告知观察者我要开始给你发消息了,你先做做准备,这样的 话我们可以在第一条消息未发送之前在该方法中做一下准备工作之类的。
2:再调该方法时,如果观察者不想被观察者发送事件,可能我还没做好准备,或者我改变注意我不想接收 被观察的发送的事件了,可以调用Disposable 的dispose(),这样当 被观察发送事件的时候,就会判断,观察者是不是需要我的事件,如果不需要我就不发了(实际代码内部也是这样处理的,看过源码的东西大概都清楚这些事,这里就稍微介绍一下)。
简单介绍就到这里了,接下来我们就从源码角度来看看,事件是怎样从 Disposable 传递到 Observer的。
源码分析
看过Rxjava源码的同学都知道,Rxjava的代码量还是挺多的,我们不可能事无巨细。那么如何阅读其中的代码比较好呢?我觉得应该遵从这样一个原则:掌握大方向,梳理脉络。细化小方面,深入理解。
首先,我们需要了解这个功能,是怎样是实现的,用到了哪些类,接口等,最好列出来,画一画类图,流程图,要做到心中有数,不要过分追究细节,只要知道这个功能的每一步对应的是哪些类就行了。
最后,当上步我们完成之后,就可以对存在疑虑的地方做进一步研究,比如某个知识点可能不大清楚,这时就需要花事件和心思搞明白了。如此这般流程走下来,我相信阅读源码并不只是枯燥乏味的事情,这其中定会充斥着很多欢乐。废话不多说了,接下来我们就开始阅读源码吧。
大方向:
Observable创建:
在“关键类及方法说明”我们从create()方法入手简要分析了下Observable是如何创建的。通过代码我们知道,create()方法最终创建了一个名为“ObservableCreate”的Observable,并把“ObservableOnSubscribe”存储到ObservableCreate中。
Observer创建:
Observer的创建也可以参考“关键类及方法说明”中关于Observer的介绍,这里不再说了。
接下来我们主要看一下,执行“订阅”操作之后事件是如何传递的。
事件的传递:
通过“关键类及方法说明”中关于subscribe()发生后的介绍,我们知道“实际订阅”操作都是发生在subscribeActual()该方法中的,又因为该方法是抽象方法,所以我们直接进入“ObservableCreate”类查看其对subscribeActual()方法的实现吧。
这是ObservableCreate中subscribeActual()方法的具体实现。接下来我们具体分析下。
1:创建CreateEmitter
在“关键类及方法说明”中我们介绍了什么是“ObservableEmitter”,这里不再赘述。阅读源码我们发现,通过调用CreateEmitter的构造方法,把“Observer”对象保存至到了CreateEmitter中。 问题:为什么此Emitter中要保留一份外部Observer的引用呢? 稍后我们给出原因。
2:回调Observer的onSubscribe(Disposable d)
因为CreateEmitter也实现了Disposable,所以就可以把CreateEmitter回调给 Observer了。此时观察者就可以在onSubscribe()中做一些事件发送前的准备工作什么的。
3:发送事件
首先,回调ObservableOnSubscribe的subscribe()并把新创建的CreateEmitter返回去。此时我们就可以用该Emitter发送事件了(这就是Observable产生以及发送事件的地方)。例如:
3.1:OnNext()
首先,判断传递的“事件”是否为null。
其次,再调用“isDisposed()”判断“消息开关”是否已经关闭了(这个稍后分析)。
最后,如果没关闭,则回调Observer的onNext()方法回调事件。还记在分析ObservableEmitter时引入的那个问题吗?在此,就知道原因了吧。如果不给我一个观察者的引用,我把事件回调给谁呢,是吧。
3.2:onComplete()
首先,调用“isDisposed()”判断“消息开关”是否已经关闭了,如果没关闭和继续。
其次,调用Observer的onComplete()回调Complete事件。
最后,调用dispose()关闭“消息控制开关”。
3.3:onError()
首先,判断传入的Throwable是否为null。
其次,调用“isDisposed()”判断“消息开关”是否已经关闭了,如果没关闭和继续。如果已经关闭了,则调用RxJavaPlugins.onError(t),该方法稍后再解释。
最后,调用Observer的onError()回调error事件。
小方面
自此我们结合源码,大体捋清楚了,Rxjava中事件传递的一个过程,这就是前边提到的“掌握大方向,梳理脉络”。接下来我们对前面遗留的诸多问题进行一一深入理解,这部分也就是“细化小方面,深入理解”。
首先我们在创建CreateEmitter的时候,发现,它既实现了Emitter接口,又实现了Disposable接口,所以说它既是“消息发射器”又是“消息控制开关”。而且我们在分析onNext(),onComplete(),onError()方法时都发现,这些方法内部都先调用了"isDisposed()"判断“消息开关”是否关闭了,接下来我们就从“消息发射器”角度来捋一捋这个方法是怎么实现的。
isDisposed()
我们知道CreateEmitter是继承自AtomicReference(这是专门采用原子操作,进行更新操作对象的一个原子类)。 首先,通过get()获取AtomicReference中的value的值,默认值为null。 其次,调用DisposableHelper的isDisposed()把get()获取的值传入,并与DisposableHelper.DISPOSED进行比较,判断不是DisposableHelper.DISPOSED则返回false,如果是的话则返回true。
setDisposable()
做为“消息发射器”CreateEmitter还必须实现此方法,我们看看一下这个方法都干了些什么吧。
1:把CreateEmitter的当前实例和Disposable传入RxJavaPlugins的set()方法中。
2:CreateEmitter做为AtomicReference,获取当前CreateEmitter的value并赋值给“current”。
2.1:如果 “current ==DISPOSED”,且传入的Disposed不为null,则调用传入的Disposable的dispose()并跳出当前方法。
2.2:如果 “current !=DISPOSED”,执行AtomicReference的compareAndSet() 给 CreateEmitter的value 设置为传入的Disposable。如果current不为null的话,执行dispose()方法。
至此我们发现setDisposable()就是把传入的“Disposable”保存起来,等到调用“isDisposed等方法”来判断“消息开关是否关闭了”。 接下来我们做个“猜想”: **如果在消息发送前设置Disposable为DisposableHelper.DISPOSED的话,消息是会继续传递的,如果设置的是自定义的Disposable的话,消息则不会被传递。 ** 下面我们就实际测试下,这个“猜想”是否成立吧。
case1:在ObservableOnSubscribe的subscribe()中设置Disposable为DisposableHelper的唯一实例。
log信息为下图。
case2:在ObservableOnSubscribe的subscribe()中设置Disposable为自定义的Dispaseable。
根据以上测试结果确实和我们的“猜想”一致为什么呢?此时我们回过头来看看上边关于** isDisposed() 以及 onComplete()的分析,发现 当继续执行CreateEmitter的onComplete时,此时的Dispaseable如果为DisposableHelper的DISPOSED实例,isDisposed() 就会返回true,所以 后续的Complate会回传给Observer了。如果为false,当然就不会走了呗。**
CreateEmitter做为“消息发射器”的角色的责任分析完毕后,接下来我们分析下其做为“消息开关”又能干些什么吧。
当CreateEmitter做为“消息开关”时,它自身有两个方法需要实现,它们分别是:isDisposed()和dispose()。isDisposed()以及分析过了,接下来只需要分析下dispose()就行了。
dispose():
通过阅读源码我们知道,当CreateEmitter中的value对应的不是DisposableHelper的DISPOSED实例的话,就会把DisposableHelper的DISPOSED保存至CreateEmitter中。当通过Emitter发送事件时,就会先调用isDisposed()来判断“消息开关”是否关闭了,如果关闭了,则中断事件的传递。
到此RxJava的关于事件分发这块到底是如何做的已经分析完了,如果大家看完感觉有帮助的话欢迎点击收藏和喜欢。如果有错误之处,还望各位指出,我会尽快改正的。后续我会继续分享关于,线程调度,常用操作符以及2.0关于背压这3个方面的介绍,谢谢大家。