RxJava源码

使用

首先从代码层面来分析RxJava的每一步到底干了什么。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        emitter.onNext("A");
    }
}).map(new Function<String, Bitmap>() {
    @Override
    public Bitmap apply(String s) throws Throwable {
        return null;
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<Bitmap>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
              disposable = d; 
        }

        @Override
        public void onNext(@NonNull Bitmap bitmap) {
                
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
  });

源码分析

new ObservableOnSubscribe

在这里ObservableOnSubscribe就是我们的被观察者

public interface ObservableOnSubscribe<@NonNull T> {

    /**
     * Called for each {@link Observer} that subscribes.
     * @param emitter the safe emitter instance, never {@code null}
     * @throws Throwable on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
Observable.create

调用Observable.create的时候将被观察者传了进来并且创建了\color{#FF0000}{ObservableCreate}对象

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
Hook

hook的含义就是在函数执行的过程中,我们有一个钩子函数,可以优先执行我们的代码,然后再接着执行。
上面的方法中调用了RxJavaPlugins.onAssembly函数
RxJavaPlugins.onAssembly(new ObservableCreate<>(source))

/*在这里什么都没有做,直接将source返回了,所以我们在这里可以给
onObservableAssembly进行赋值,通过setOnObservableAssembly方法。
*/
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
//如果onObservableAssembly有值的话会执行apply方法
 static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            //这里会执行我们的hook方法,将被观察者传递进来,在我们实现的hook方法中返回观察者
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
//自定义hook方法实现
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Throwable {
                //为了不影响代码流程,我们需要将observable返回
                return observable;
            }
});
map(new Function<String, Bitmap>()

上面我们获取到了\color{#FF0000}{ObservableCreate}对象,再调用map肯定是调用了\color{#FF0000}{ObservableCreate}对象的map方法,由于他继承自Observable,所以调用的还是Observable的map方法。
这里又生成了\color{#FF0000}{ObservableMap}对象

 public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
subscribeOn

\color{#FF0000}{ObservableMap}对象. subscribeOn

public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}

这里传入了一个Scheduler对象Schedulers.io(),并且创建了ObservableSubscribeOn对象

schedulers.io调用过程.png

最终拿到了一个线程池。

observeOn

AndroidSchedulers.mainThread()

生成了一个主线程Handler.png

observeOn创建了ObservableObserveOn对象

new Observer<Bitmap>()
创建了一个观察者
public interface Observer<@NonNull T> {

    /**
     * Provides the {@link Observer} with the means of cancelling (disposing) the
     * connection (channel) with the {@link Observable} in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the {@link Observer} with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@code Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
层次图

分析到这里就有了代码嵌套层次了


嵌套层次.png
subscribe

subscribe方法内部调用了subscribeActual(observer);那我们可以看到调用的就是ObservableObserveOn的subscribeActual方法

@Override
protected void subscribeActual(Observer<? super T> observer) {
     //主线程的HandlerScheduler明显不是TrampolineScheduler的子类
     if (scheduler instanceof TrampolineScheduler) {
         source.subscribe(observer);
     } else {
         Scheduler.Worker w = scheduler.createWorker();

         source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
     }
}
//调用了createWorker
@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
/*source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
这里的source就是$\color{#FF0000}{ObservableSubscribeOn}$对象,因为调用observeOn方法的是ObservableSubscribeOn,并且他将自己作为source传了进去
*/
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}

new ObserveOnObserver

在上面的subscribeActual方法我们可以看到他创建了一个新的ObserveOnObserver对象

//这里接收了一个observer对象,而这个observer正是我们的观察者,也就是在这里对观察者又进行了一次封装
new ObserveOnObserver<>(observer, w, delayError, bufferSize)

source.subscribe,source就是ObservableSubscribeOn

//这里SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);又对observer封装了一层,并且调用了onSubscribe,最终会调到我们自己的观察者的onSubscribe方法
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);
    //将观察者放入线程池中执行,下面的SubscribeTask可以看到在子线程里面执行了source.subscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        //执行了上一步的subscribe方法
        source.subscribe(parent);
    }
}

对观察者进行封装图

观察者封装图.png

一层一层向上调用subscribe(@NonNull Observer<? super T> observer),--->subscribeActual(observer);

当调用到了最上层的时候source就是我们的被观察者了

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        //注意他这里传的可不是this
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
create的source.png

在我们自定义的被观察者里面的subscribe调用了emitter.onNext("A");

从外层向内层调用onNext.png
当执行到以后一层也就是我们自己调用subscribe方法的时候

他会执行ObserveOnObserver的onNext

 @Override
 public void onNext(T t) {
     if (done) {
         return;
     }

     if (sourceMode != QueueDisposable.ASYNC) {
         queue.offer(t);
     }
         schedule();
}
void schedule() {
   if (getAndIncrement() == 0) {
        //这里的work就是我们创建出来的HandlerWork
        worker.schedule(this);
   }
}
//HandlerWork类的schedule方法
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposable.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }
    //使用了主线程的Handler发送消息
    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposable.disposed();
    }

    return scheduled;
}
handler的message中添加了该类的对象.png

当执行上面的run方法的时候

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                 v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }
            //*************************************************  
            //在这里最终调用了onNext的方法
            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
             break;
        }
    }
}

总结

1.执行过程

  • 从上到下对被观察者进行封装
  • 从下往上对观察者进行封装
  • 然后再从上往下执行onNext
    思考一个问题?对观察者和被观察者的封装层数是一样的,那么是不是可以看成是1个被观察者对应一个观察者
    对应关系.png

2.观察者的onSubscribe是在ObservableSubscribeOn的subscribeActual方法中执行的,此时还没有进行线程的切换,那么就是说在那个线程使用的RXJava那么观察者的onSubscribe方法就执行在哪个线程

  1. subscribeOn只会负责上层的线程调度,observeOn只有在执行onNext的时候才起作用,也就是下层的线程调度

4.使用handler(getMainLooper)来保证主线程操作


调用流程.png

自上而下(左边的流程)->自下而上(右边的流程+subscribe的调用)->自上而下(onNext的调用)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339