RxJava中map的源码分析

本文的分析基于RxJava1.1.5版本,map的主要作用是用来将一个对象转换成另外一个对象,它的实现基于了RxJava中非常重要的lift()方法

1、下面先写一个简单的例子

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onCompleted();
            }
        }).map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.valueOf(s);
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e("TAG","---onCompleted()----");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG","---onError()----");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("TAG", "值为:" + integer);
            }
        });

以上是最简单的RxJava的一个试用,因为本文是分析map的过程,所以并没有编写线程调度器

2、下面是具体的分析过程:
首先:进入到map()方法的实现

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

这里传入func1对象作为参数,返回值为lift()方法的返回值,其中在调用liftf()方法的时候,会创建一个OperatorMap对象作为参数传入,而OperatorMap继承自Operator接口,我的理解是,这里主要是为了将func1对象作为参数,然后在OperatorMap对象中创建新的Subscriber对象(新的观察者对象),这里新的Subscriber对象称为subscriber_new

接着进入到liftf()方法中

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

在lift()方法中,会返回新的Observable对象(新的被观察者对象,这里先称为observable_new),同时会将原来的Observable对象中的onSubscribe和subscriber_new作为参数创建OnSubscribeLift对象,在我理解中,这个对象就是新的OnSubscribe对象,这里先称为onsubscribe_new

通过上面两步的分析可以知道,map最终的返回值是在lift()方法中创建的新的Observable对象,即observable_new对象。

接着,我们的程序会到了主线上,程序将会调用subscribe()方法,这个方法用于订阅被观察者,那么,这里观察者将不会是旧的Observable对象,而是新的Observable对象,即observable_new对象,那么我们进入到该方法内部看看,下面是subscribe()方法实现

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

在该方法内部会将旧的Subscriber对象和旧的Observable对象作为参数去调用Observable.subscribe(subscriber, this)

下面接着看Observable.subscribe(subscriber, this)

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        
        subscriber.onStart();
        
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            //异常的处理代码省略....
        }
    }

前面的判断跳过,直接看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);这个调用,它的返回值为OnSubscribe对象,这个OnSubscribe对象就是新的OnSubscribe对象,即为onsubscribe_new对象,而调用它的call()方法,即调用的是OnSubscribeLift类中的call()方法,那么进入到OnSubscribeLift类中的call()方法看看,注意,这里传入的参数的旧的subscriber对象

@Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

首先Subscriber<? super T> st =hook.onLift(operator).call(o);这个操作,这里的onLift()方法返回的是传入的operator对象,那么这个operator对象是什么呢,它就是在之前创建的OperatorMap对象,即使新的伪subscriber对象,那么也就是说,这个操作调用的call()方法,调用的是OperatorMap类中的call()方法,传入的参数为旧的subscriber对象,下面进入OperatorMap类中的call()方法中看看

@Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }

在这个方法中,首先会以旧的subscriber对象和transformer(这个transformer就是func1对象,在创建OperatorMap对象的时候传入)为参数创建新的subscriber对象,之后将该对象添加到subscriptions集合中,并且返回新的subscriber对象。那么在OnSubscribeLift类中的这个类中的call()方法中的Subscriber<? super T> st = hook.onLift(operator).call(o);这个操作所获得的就是新的subscriber对象,接着回到OnSubscribeLift类中的这个类中的call()方法中,在获取到新的subscriber对象之后,它会用新的subscriber对象去调用onStart()方法,之后会执行parent.call(st);这个方法,那么这个parent是什么呢?它就是在创建OnSubscribeLift对象是传入的旧的OnSubscribe对象,那么旧的OnSubscribe对象去调用call()方法,并且传入的参数为新的subscriber对象,那么自然调用的这个call()方法就是我们在创建初始Observable对象的时候的回调的call()方法,即使以下的方法

@Override
public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onCompleted();
            }

这里的subscriber对象已经不是原来的subscriber对象了,而是在OperatorMap类中的call()方法中创建的MapSubscriber对象(即新的subscriber对象),那么它调用的onNext()方法,自然调用的就应该是MapSubscriber类中的onNext()方法

@Override
public void onNext(T t) {
    R result;
            
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
            
    actual.onNext(result);
}

在这个方法中,主要的作用就是将func1对象中的类型转换逻辑,作为新的参数去调用旧的subscriber对象中的onNext()方法,也就是我们自己创建的subscriber对象中的onNext()回调方法,在这里,T类型就是代表旧的数据类型,而R转换后的数据类型,mapper就是func1对象,它调用的call()方法,就是我们自己在call中写的类型转换逻辑,而actual就是旧的subscriber对象,到这里,整个流程就已经贯通了

最后,这里需要注意的就是旧的对象就是代表初始的对象,而新的对象就代表是调用map()方法的时候创建的新的对象,还有要注意的就是旧的Observable对象中的回调call()方法中的subscriber对象在经过map方法之后已经不是旧的subscriber对象了,而是新的subscriber对象

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容