RxJava源码浅析———订阅流程、map与Filter操作符实现原理

RxJava源码浅析———订阅流程、map与Filter操作符实现原理

RxJava是一个非常流行的基于观察者模式的响应式编程框架,在Android开发等领域有很广泛的应用。作为Java开发者,我们有必要了解一下RxJava的实现原理,掌握RxJava的设计思想,这样我们才能更加熟练的使用RxJava来编写优秀的代码。

订阅流程

要使用RxJava,首先要创建一个被观察者Observable对象。我们通常是使用Observable的create方法来创建一个Observable对象的,如下:

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(new Integer(1));
            e.onNext(new Integer(2));
            e.onComplete();
        }
    });    

我们来看一下create方法的源码:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

在create方法中,首先对被传入的参数ObservableOnSubscribe对象进行了空检查,然后new了一个ObservableCreate对象,并将这个对象作为参数传入了RxJavaPlugins.onAssembly方法中,我们首先来看onAssembly方法的源码:

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

在onAssembly方法中首先判断onObservableAssembly对象是否为null,如果不为null则调用apply方法并返回结果,如果为null则直接返回传入的参数。在默认情况下onObservableAssembly为null,也就是说默认情况下onAssembly方法其实什么都没有做,直接将传入的Observable对象返回了。

回到Observable的create方法,我们再来看new ObservableCreate到底做了什么,ObservableCreate类的构造方法如下:

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        
        ...
    }    

ObservableCreate类的构造方法很简单,直接将传入的ObservableOnSubscribe对象保存在了自己的一个全局变量source中,并且这个变量被final所修饰。而我们通过分析ObservableCreate类可以发现,ObservableCreate继承自Observable,也就是说ObservableCreate本身也是一个被观察者对象。

通过上面的分析我们可以看出,Observable.create方法其实就是创建了一个ObservableCreate对象,并把一个ObservableOnSubscribe保存在了这个ObservableCreate中。

创建完Observable,我们便可以调用它的subscribe方法来关联相应的观察者:

    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("RxJava", "onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.e("RxJava", "onSubscribe" + integer.toString());
        }
    
        @Override
        public void onError(Throwable e) {
            Log.e("RxJava", "onError");
        }
    
        @Override
        public void onComplete() {
            Log.e("RxJava", "onComplete");
        }
    });

我们来看一下subscribe方法的源码:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, 
                observer);//同上面的RxJavaPlugins.onAssembly方法,默认情况下直接将observer返回
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
         ...
        }
    }

subscribe方法中先检查了observer对象是否为null,然后调用了RxJavaPlugins.onSubscribe方法,该方法默认情况下什么都不做直接将observer对象返回,然后又调用了subscribeActual方法。上面我们分析过,create方法生成的Observable其实是一个ObservableCreate对象,因此我们看一下ObservableCreate类中的subscribeActual方法:

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到在subscribeActual方法中先生成了一个Emitter事件发射器,然后直接调用了observer的onSubscribe方法,因此onSubscribe方法才可以在事件发射之前被执行。然后又调用了source.subscribe方法,前面我们说过,这个source对象就是在create方法调用时传入的ObservableOnSubscribe对象,此时ObservableOnSubscribe的subscribe方法就被调用了,各种事件也被成功的发射出去了。并且通过源码可以看出,如果在调用subscribe方法时出现异常,则会直接让Emitter发射一个onError事件。

至此一个最简单的订阅流程就完成了。

事件的发射

上面我们简单的分析了Rxjava的订阅流程,可见事件最终是在subscribeActual方法中调用source.subscribe(parent)来发射的。我们在subscribeActual方法中生成了一个CreateEmitter对象,并将这个Emmitter传入了onSubscribe方法中。通过本文最开始的那个例子可以看出,各种事件其实是由这个Emmitter来发射的,我们首先看一下CreateEmmitter对象的构造方法:

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

可见在CreateEmmitter中持有了Observer对象。我们可以通过CreateEmmitter的onNext、onComplete和onError方法来发射各种事件,在这里以onNext的源码为例看一下Emmitter究竟是如何将事件发送给Observer的:

    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

CreateEmmitter的onNext方法非常简单,就是直接调用了observer.onNext方法,onComplete和onError的原理也和onNext方法大同小异,只不过在onComplete和onError方法中还会调用dispose()方法,这样在调用onComplete和onError方法之后observer就无法在接收任何事件了。

map操作

我们可以通过map操作来进行数据类型的转换,map的基本使用方法如下:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(new Integer(0));
            e.onComplete();
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return integer.toString();
        }
    }).subscribe(new Observer<String>() {
        public void onSubscribe(Disposable d) { Log.e("RxJava", "onSubscribe");}
        public void onNext(String s) { Log.e("RxJava", "onNext" + s);}
        public void onError(Throwable e) {Log.e("RxJava", "onSubscribe");}
        public void onComplete() {Log.e("RxJava", "onSubscribe");}
    });

map方法的源码如下:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

可以看出map方法其实和create方法很像,只不过这里返回的是一个ObservableMap对象,并将原来的Observable对象和参数mapper传入了ObservableMap的构造方法。ObservableMap的构造方法如下:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);//父类构造方法将source保存在了一个成员变量中
            this.function = function;
        }
        
        ...
    }    

ObsevableMap的构造方法也只是将原Observable对象和Function对象分别保存在了ObservableMap对象的成员变量source和function中。

由此可见,map方法实际上就是生成了一个ObservableMap对象,并将原来的Observable和参数Function保存在了这个ObservableMap对象中。这个时候我们通过subscribe方法进行订阅时,实际上是调用的ObservableMap对象的subscribe方法。之前我们分析过,subscribe方法内部又调用了subscribeActual方法,ObservableMap的subscribeActual方法如下:

    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

可见subscribeActual方法中调用的还是是原Observable的subscribe方法,只不过subscribe方法中的参数不再是我们最初生成的那个Observer对象,而是对Observer进行了转换,变成了一个MapObserver对象。当各种事件发出后,接收事件的就变成了这个MapObserver对象,MapObserver对象对onNext方法进行了重写,MapObserver的源码如下:

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
    
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
    
        @Override
        public void onNext(T t) {
            ...
            U v;
    
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), 
                    "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }
    
        ...
    }

在onNext方法中,传入的参数T是原数据类型,U是转换后的数据类型,之后又调用了mapper.apply(t)来实现了数据类型的转换,这个mapper即我们在调用map方法时传入的Function对象。最后又直接调用了原Observer的onNext方法,并传入了转换后的数据类型,最终完成了map过程。

filter操作

我们也可以通过filter方法来对发射的事件进行过滤,filter方法的基本使用如下:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(new Integer(0));
            e.onComplete();
        }
    }).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer s) throws Exception {
            return integer.intValue() > 0 ? true : false;
        }
    }).subscribe(observer);

看一下filter方法的源码:

    public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }

可以看到filter方法的源码和create方法、map方法非常相似,只不过这次的返回数据有变成了ObservableFilter。可见filter方法和create、map方法的套路基本一致,那我们直接来看ObservableFilter的subscribeActual方法:

    public void subscribeActual(Observer<? super T> s) {
        source.subscribe(new FilterObserver<T>(s, predicate));
    }

与ObservableMap类的subscribeActual方法类似,ObservableFilter类的subscribeActual方法也是直接调用了原Observer的subscribe方法,并传入了一个FilterObserver对象,参照之前分析map操作时的套路,我们直接看FilterObserver对象的onNext方法:

    public void onNext(T t) {
        if (sourceMode == NONE) {
            boolean b;
            try {
                b = filter.test(t);
            } catch (Throwable e) {
                fail(e);
                return;
            }
            if (b) {
                actual.onNext(t);
            }
        } else {
            actual.onNext(null);
        }
    }

FilterObserver的onNext方法很简单,直接调用filter.test方法,如果返回true,就调用actual.onNext方法,否则什么都不做。这样就实现了事件的过滤。

通过分析RxJava的订阅流程与map、filter操作符的源码,我们可以对RxJava的核心思想有一个大致的了解,RxJava中的很多其他操作都与map、filter类似,只要掌握了核心思想便不难理解。本文只是对RxJava的原理进行一个简单的讲解,因本人能力有限,如果读者发现文中有什么错误或不足之处,希望能帮忙指出,大家一起共同进步。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,440评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,814评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,427评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,710评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,625评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,014评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,511评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,162评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,311评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,262评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,278评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,989评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,583评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,664评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,904评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,274评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,856评论 2 339

推荐阅读更多精彩内容