RxJava,它是一个实现异步操作的库,异步操作很关键的一点是程序的简洁性,而RxJava正是实现了这一点。
一. 创建被观察者Observable
1. 首先我们在create中传入了一个创建的新的Onsubscribe对象,
其实这个Onsubscribe对象是一个Observable中的内部类,它是一个接口,继承自Action1,而Action1又继承自Action,Action又继承自Function,Function是一个接口。
2. 当我们调用了create方法之后,我们就把创建的OnSubscribe传进去,
进入create方法,我们可以看到,方法里面只是新建了一个Observable对象,我们看到这个方法里面有个RxJavaHooks.onCreate(f)的方法,其实这个方法,它返回的对象还是原来的对象f,之后new Observable,再点进去看到,他只是进行了一个赋值操作,把f赋值给了一个OnSubscribe。
3. 在传入的Onsubscribe对象中,我们写好了Onsubscribe对后续Subscriber或者Observer的call操作。
二. 被观察者Observable订阅观察者Observer
1. 在生成新的new Observable之后,开始订阅Observer。subscribe方法里面的参数是Observer,Subscriber,Action1,其实Subscriber是Observer的一个子类,Subscriber implements Observer, Subscription,它只是多了几个方法。
2. 以使用Observer为例,
首先生成new ObserverSubscriber(observer),observer作为ObserverSubscriber的一个参数。
其中ObserverSubscriber extends Subscriber,然后调用subscribe(subscriber),进而调用Observable.subscribe(subscriber, this),
即调用subscribe(subscriber, observable)。this即observable本身。
3. 在进行的一系列的判断后,执行subscriber.onStart(),onStart方法,而这个方法在rxJava只是一个空实现,所以我们在这里可以先不用管他。然后, 把subscribe包装成了一个SafeSubscriber对象。
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)方法是重点,这个方法才是一切回调的开始。
RxJavaHooks.onObservableStart(observable, observable.onSubscribe)里面其实是返回入参的onSubscribe,最后才是onSubscribe.call(subscriber),这个时候就是执行我们自己定义的onSubscribe.call,call中定义了对应的ObserverSubscriber的onNext、onError或onComplete方法,再调用最初入参observer的onNext、onError或onComplete方法。
4. 返回return RxJavaHooks.onObservableReturn(subscriber); 将observer 转化为ObserverSubscriber(即Subscriber),以Subscription的形式返回。
三. from发送集合数据
1. from中的入参是一个Iterable类型的,而很多的集合类都实现了这一个接口,继承自Collection,而Collection是继承自Interable的。
2. 当我们调用from的时候,我们new了一个OnSubscribeFromIterable(OnSubscribe的实现类)对象,我们知道实现OnSubscribe接口的类有好多个,OnSubscribeFromIterable便是其中的一个,调用new这个方法,我们只是把我们创建出来的list保存到了一个变量当中,这里暂且取名为is,
然后在from这个方法里面又调用了一个create的方法,其实这个方法,和我们1.2讲过的调用的是同一个方法,
3. 当我们调用from完毕时,我们构建了一个新的被观察者Observable,如下所例:
4. 接下来又是被观察者Observable订阅观察者Observer,又是这一流程。
5. 订阅流程的分析参考2.2,然后又到了关键的一步RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
这里的OnSubscribe对象,是我们第一次调用from的时候保存的OnSubscribe对象,即我们自己创建的OnSubscribeFromIterable对象,
而这里的入参subscriber,是我们创建的ObserverSubscriber
我们先看onSubscribe(OnSubscribeFromIterable)重写的call方法,
这里的集合为it,观察者为Subscriber o,在正常的流程下,先将观察者o和集合it作为入参,生成IterableProducer。然后o.setProducer(new IterableProducer(o, it));
在setProducer中,会调用Producer的request方法,
request方法里面会调用fastPath()或者slowPath(n),这两个方法里面都会循环调用集合Iterator
6. 这样就调用from完毕
四. just发送集合数据
1. 当调用just的方法的时候,传入的是一个泛型数据,而from,传入的是Iterable类型的对象。
2. create方法,也只是创建了一个ScalarSynchronousObservable,这个ScalarSynchronousObservable是继承自Observable的。从create点进去,我们可以看到它也是通过构造方法,创建了一个Obsevable对象。再后来是把我们 创建的泛型 T value 保存在了一个以 t 命名的泛型变量当中。
3. JustOnSubscribe类实现OnSubscribe接口,里面重写了call方法,在后续订阅观察者时,会调用该call方法。call方法中有setProducer方法,分析如3.5,这里不做赘述。3.5的setProducer方法的入参是 IterableProducer。这里不同于3.5的是,setProducer方法的入参是先生成一个createProducer(s, value)。
分析createProducer,里面会生成WeakSingleProducer或SingleProducer。
两个producer,不同的一点是SingleProducer实现了AtomicBoolean,顾名思义,这个单词的意思是原子性,原子性的解释是:
原子是世界上的最小单位,具有不可分割性。比如 a=0;(a非long和double类型) 这个操作是不可分割的,那么我们说这个操作时原子操作。再比如:a++; 这个操作实际是a = a + 1;是可分割的,所以他不是一个原子操作。非原子操作都会存在线程安全问题,需要我们使用同步技术(sychronized)来让它变成一个原子操作。一个操作是原子操作,那么我们称它具有原子性。
再看下它的注解,也正是这个意思,WeakSingleProducer,它在多线程中执行是不安全的。
setProducer方法,就是去调用request方法:
观察克制,just不同于from,onNext方法只会调用一次,并且是将集合一次性发出去。最后调用onCompleted方法。
最后调用super(RxJavaHooks.onCreate(new JustOnSubscribe(t))),因为ScalarSynchronousObservable是继承自Observable的,所以是执行
just方法执行后,就是生成了一个新的被观察者Observable。
接着又是熟悉的订阅流程,不做赘述。
4. from只会传入一个参数,just中会传入多个参数,
当just有多个参数传入时,RxJava会把多个参数合并成一个数组,
我们看到当n=0的时候,只是设置了empty,因为没有数据,最后调用一下onComplete方法,当n=1的时候,就是我们上面分析的这种情况,我们这边重点看下n>1的时候,它会new一个OnSubscribeFromArray,这个也是继承自OnSubscribe,所以和我们以前的分析一样,
我们可以看到当传过来的是个数组的时候,我们会循环获取数组里面的每个元素遍历发送,直到结束,所以我们可以得到这样的结论,当just的方法中传入多个参数的时候,RxJava会把他们组装成一个数组,然后在发送的时候,循环获取里面的内容进行发送,会调用多次的onNext的方法。至此,just的分析基本完成。
五. map转换符
map转换符的作用是实现把一个对象或其它通过map函数转换成一个对象或其它。
1. create函数,首先创建一个OnSubscribeMap,入参是Observable本身和转换函数func。
OnSubscribeMap类继承OnSubscribe,