RxJava2.+创建流程源码分析

本片文章适用于有一定Android开发经验并且对于响应式编程有一定了解的程序猿阅读。

简介

RxJava按照官方的定义为:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。在Android上使用的比较广泛,因为在移动开发中由于UI线程不能阻塞,否则会出现卡顿,所以异步操作对于移动端编程尤其重要。而RxJava就是这样一个基于事件流并且便于异步操作的程序库。下面我们从源码角度分析一下事件的创建流程。

Demo

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
               emitter.onNext("hello");
                emitter.onNext("world");
                emitter.onComplete();
            }
        });

Observer observer = new Observer<String>() {
           @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext data is :" + value);
            }


            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError data is :" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

observable.subscribe(observer);

输出:

onSubscribe
onNext data is :hello
onNext data is :world
onComplete

从日志可以看出onSubscribe方法先被调用然后依次输出hello world 最后输出 onComplete 符合Observable 中subscribe方法的调用顺序。

Observable#create

 @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

Observable.create()的入参为ObservableOnSubscribe,那ObservableOnSubscribe是什么呢,它是一个接口

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

ObjectHelper.requireNonNull()对入参进行了空指针判断。
RxJavaPlugins根据注释来看其实就是一个对Rxjava标准操作进行处理的插件类。
RxJavaPlugins.onAssembly返回的其实就是new 出来的ObservableCreate对象,同时持有ObservableOnSubscribe对象引用。
那ObservableCreate又是什么呢?其实ObservableCreate就是一个Observable被观察者对象并重写了subscribeActual()方法。而subscribeActual()真正调用的地方发生在订阅发生的地方,下文会分析到。
所以Observable#create最终创建的是ObservableCreate对象。

Observable#subscribe

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

可以看到RxJavaPlugins.onSubscribe(this,observer)其实只是简单的返回了observer而已,关键的是subscribeActual(observer)。通过上文可知,subscribeActual真正发生的地方其实是在ObservableCreate中。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActual这段代码并不长但很重要,咱们一句一句来分析下。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
采用的是组合的方式,parent 内部持有了observer对象仅此而已,可以理解为parent 为observer的代理。
然后observer调用了自己的onSubscribe方法,这也是为什么我们一开始看到onSubscribe日志最先输出的原因。同时它也脱离了被订阅者的管理,因为订阅者自己调用了自己。
接下来可以看到订阅真正发生的地方source.subscribe(parent);
source就是一开始我们new出来的ObservableOnSubscribe对象,而parent是订阅者的代理对象,所以当订阅发生的时候,就会输出上面的日子。

单个操作符订阅总结

1、传入的ObservableOnSubscribe最终被用来创建成ObservableCreate
2、ObservableCreate持有我们的被观察者对象以及重新了订阅触发时的回调函数subscribeActual
3、在subscribeActual实现了我们的主要逻辑,包括observer.onSubscribe(parent);
source.subscribe(parent);
parent.onError(ex)的调用
4、在Observable的subscribe被调用时开始执行事件分发流程
5、最后放一张对象间的关系图(此图来自于网络)

52eb2279jw1f2rx489robj20lk0a8my2.jpg

组合操作符总结

1、首先我们得明确一点每个操作符最后都会通过RxJavaPlugins.onAssembly(Observable<T> source)返回一个新的Observable。如下:


image.png

2、onAssembly的入参针对每一个操作符都会实现一个继承自AbstractObservableWithUpstream:Observable的对象,它的作用就是用于包装上级的AbstractObservableWithUpstream:Observable。比如:


image.png

而每一个新的Observable对象中都会有一个继承自Observer的对象,它的作用就是用于包装下级Observer和当前的Function
image.png

3、当我们调用最底层subscribe方法的时候其实真正调用的是上一级Observable的subscribeActual方法,然后subscribeActual方法中会构造一个内部Observer子类的对象,然后通过调用上级Observable的subscribe方法将新生成的包装Observer对象传入到上一级中。
4、通过一层层的包装上传,当调用链来到最顶层的ObservableCreate时,由于不能再往上一层进行封装了,就会执行ObservableOnSubscribe的subscribe方法,如下:


image.png

而subscribe方法的参数就是下级经过层层包装传递上来的Observer,所以当我们调用emitter的相关方法时,内部都会执行如下操作:


image.png

其中actual就是下级的Observer。
5、当往下的调用链来到最底层时自然调用的就是最底层Observer的相关方法了。

至此,组合操作符的调用链就分析完了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容