RxJava2.X源码解析(四)

更多分享:http://www.cherylgood.cn

一、前言

  • 基于RxJava2.1.1
  • 我们在前面的 RxJava2.0使用详解(一)初步分析了RxJava从创建到执行的流程。RxJava2.0使用详解(二) 中分析了RxJava的随意终止Reactive流的能力的来源;也明白了RxJavaonComplete();onError(t);只有一个会被执行的秘密。RxJava2.X 源码分析(三)中探索了RxJava2调用subscribeOn切换被观察者线程的原理。
  • 本次我们将继续探索RxJava2.x切换观察者的原理,分析observeOnsubscribeOn的不同之处。继续实现我们在第一篇中定下的小目标

二、从Demo到原理

  • OK,我们的Demo还是上次的demo,忘记了的小伙伴可以点击RxJava2.X 源码分析(三),这里就不再重复了哦,我们直接进入正题。
  • Ok,按照套路,我们从observeOn方法入手。
  • Ok,我点~_
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler) {
      //false为默认无延迟发送错误,bufferSize为缓冲区大小
      return observeOn(scheduler, false, bufferSize());
  }
  • 我们继续往下看,我猜套路跟subscribeOn的逃不多,也是采用装饰者模式,wrapper我们的ObservableObserver产生一个中间被观察者和观察中,通过中间被观察者订阅上游被观察者,通过中间观察者接收上游被观察者下发的数据,然后通过线程切换将数据传递给下游观察者。
  • Ok,我们来验证下才想。我觉得就是没完全猜对,也能猜对其中的大部分。
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  }
  • Ok,熟悉的RxJavaPlugins.onAssemblyhook处理,略过,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)这句

      public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
          final Scheduler scheduler;
          final boolean delayError;
          final int bufferSize;
          public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
              super(source);
              this.scheduler = scheduler;
              this.delayError = delayError;
              this.bufferSize = bufferSize;
          }
    
          @Override
        protected void subscribeActual(Observersuper T> observer) {
             //1、在当前线程调度,但不是立即执行,放入队列中
              if (scheduler instanceof TrampolineScheduler) {
                  source.subscribe(observer);
              } else {
               //2、本次走的是这里
                  Scheduler.Worker w = scheduler.createWorker();
                //3
                  source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
              }
          }
    
  • Ok,果然,熟悉的模式,对我们上游的Observable,下游的Observerwrapper一次。
    1、ObservableObserveOn继承了AbstractObservableWithUpstream
    2、source保存上游的Observable
    3、scheduler为本次的调度器
    4、在下游调用subscribe订阅时触发->subscribeActual->Wrapper了下游的Observer观察者
  • 3处:source为游Observable,下游Observer被wrapper到ObserveOnObserver,发生订阅数件,上游Observable开始执行subscribeActual,调用ObserveOnObserver的onSubscribe以及onNext、onError、onComplete等

  • OK,我们接着看Observer被包装进 ObserveOnObserver的样子,代码有点多,我们分段讲解
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        //下游的Observer
        final Observersuper T> actual;
        //调度工作者
        final Scheduler.Worker worker;
        //是否延迟错误,默认false
        final boolean delayError;
        //队列大小
        final int bufferSize;
        //存储上游Observable下发的数据队列
        SimpleQueue<T> queue;
        //存储下游Observer的Disposable
        Disposable s;
        //存储错误信息
        Throwable error;
        //校验是否完毕
        volatile boolean done;
        //是否被取消
        volatile boolean cancelled;
        //存储执行模式,同步或者异步 同步
        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
      public void onSubscribe(Disposable s) {

            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                  //1、判断执行模式并调用onSubscribe传递给下游Observer
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        //true 后面的onXX方法都不会被调用
                        done = true;
                        actual.onSubscribe(this);
                        //2、同步模式下,直接调用schedule
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        //2、异步模式下,等待schedule
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //判断执行模式并调用onSubscribe传递给下游Observer
                actual.onSubscribe(this);
            }
        }
  • OK,执行玩这里之后,就到我们的onXX方法了
  • 首先可无限调用的onNext
 @Override
  public void onNext(T t) {
       //3、数据源是同步模式或者执行过error / complete 会是true
      if (done) {
          return;
      }
      //如果数据源不是异步类型,
      if (sourceMode != QueueDisposable.ASYNC) {
          //4、上游Observable下发的数据压入queue
          queue.offer(t);
      }
      //5、开始调度
      schedule();
  }
  • 其次只能触发一次的onError,基本差不多
 @Override
    public void onError(Throwable t) {
        if (done) {
            //6、已完成再执行会抛一场
            RxJavaPlugins.onError(t);
            return;
        }
        //7、记录错误信息
        error = t;
        //8、标识已完成
        done = true;
        //9、开始调度
        schedule();
    }
  • 同样是只能触发一次的onComplete,同样的套路,就不说了
 @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        schedule();
    }
  • 然后就是我们的关键点schedule();
 //关键点就是直接、简单、里面线程调度工作者调用schedule(this),传入了this
    void schedule() {
           //getAndIncrement很关键,他原子性的保证了worker.schedule(this);在调度完之前不会被再次调度
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
  • 什么?传入了this?那么说明什么呢?( ̄∇ ̄)

  • 嗯?this是个runnable,没错,我们的ObserveOnObserver实现了Runnable接口

  • 那么,接下来自然是调用run方法

    @Override
    public void run() {
          //outputFused一般是false
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
  • 好吧,在看drainNormal前,我们先看一个函数
 //从名字看是检测是否已终止
    boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {
        //1、订阅已取消
        if (cancelled) {
            //清空队列
            queue.clear();
            return true;
        }
        //2、d其实是done,
        if (d) {
            //done==ture可能的情况onNext刚被调度完,onError或者onCompele被调用,
            Throwable e = error;
            if (delayError) {
                //delayError==true时等到队列为空才调用
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                //否则直接调用
                if (e != null) {
                    queue.clear();
                    a.onError(e);
                    worker.dispose();
                    return true;
                } else
     if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        //否则未终结
        return false;
    }
  • true:1、订阅被取消cancelled==true,2、done==true onNext刚被调度完,onError或者onCompele被调用

  • 继续看drainNormal

void drainNormal() {
      int missed = 1;
      final SimpleQueue<T> q = queue;
      final Observersuper T> a = actual;
      //Ok,死循环,我们来看下有哪些出口
      for (;;) {
      //Ok,出口,该方法前面分析的
      if (checkTerminated(done, q.isEmpty(), a)) {
              return;
          }

          //在此死循环
          for (;;) {
              boolean d = done;
              T v;
              try {
                  //分发数据出队列
                  v = q.poll();
              } catch (Throwable ex) {
                  //有异常时终止退出
                  Exceptions.throwIfFatal(ex);
                  s.dispose();
                  q.clear();
                  a.onError(ex);
                  //停止worker(线程)
                  worker.dispose();
                  return;
              }
              boolean empty = v == null;
              //判断队列是否为空
              if (checkTerminated(d, empty, a)) {
                  return;
              }
               //没数据退出
              if (empty) {
                  break;
              }
              //数据下发给下游Obsever,这里支付者onNext,onComplete和onError主要放在了checkTerminated里面回调
              a.onNext(v);
          }
       //保证此时确实有一个 worker.schedule(this);正在被执行,
          missed = addAndGet(-missed);
       //为何要这样做呢?我的理解是保证drainNormal方法被原子性调用,如果执行了addAndGet之后getAndIncrement() == 0就成立了,此时又一个worker.schedule(this);被调用了,那么就不能执行break了
          if (missed == 0) {
              break;
          }
      }
  }

总结

  • Ok,看到这里我们基本了解了observeOn的实现流程,同样是老套路,使用装饰者模式,中间Wrapper了我们的Observable和Observer,通过中间增加一个Observable和Observer来实现线程的切换。
  • 喜欢就给我留言哦

相关文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,007评论 1 9
  • 先来个RxAndroid的github地址 https://github.com/ReactiveX/RxAndr...
    大批阅读 578评论 0 0
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,451评论 7 62
  • java.util.concurrent.locks包提供了锁和等待条件的接口和类, 可用于替代JDK1.5之前的...
    待汝豪杰只是凡夫阅读 357评论 0 0
  • 石家庄从今天早上就开始刮很大的风! 我骑车上班的时候,被风刮得前行很费劲儿。在过一个小路口儿时...
    關輝阅读 911评论 1 0