Rxjava 过程分析三之 subscribeOn

Rxjava 过程分析三之 subscribeOn

说明

  • 只分析 Rxjava 线程切换的大致过程和思想。
  • 以弄明白流程为主, 线程切换就是切换到其他线程中去运行, 我们知道 Rxjava 提供了 newThread, io密集型的, cpu密集型的等方式. 我们就拿看名字最得劲的分析下。 那就是算 newThead。
  • 这篇只介绍 subscribeOn, 至于 observeOn 我们再下一篇再次介绍。

基本使用

Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
        // emitter.onNext("");
        // emitter.onError();
        // emitter.onComplete();
    }
}, BackpressureStrategy.LATEST)
        .subscribeOn(Schedulers.newThread())
        .subscribe(new FlowableSubscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
            }
            @Override
            public void onNext(String s) {
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });

我们可以看到需要切换线程只需要加上一行代码, 一般对应的就是异步操作耗时操作。 使用十分的简单, 那么它是怎么做到的呢, 我们往下分析。

引发的思考

  • 我们多写几遍 subscribeOn 去切换线程可以吗? 有必要吗?
  • 它是再什么时候去切换线程?
  • 它切换出来的线程是那些代码段再运行? eg: 上游再新线程还是下游在呢? 为啥了?
  • 我们再不使用 Rxjava 时, 如果其他线程需要运行到当前一般都时需要一个接口回调出来呢。 Rxjava 可以跟同步一样去拿到结果, 它是怎么解决这个问题的呢?

源码分析

前一堆和后一堆我们就不分析了, 如果不懂的可以看我之前的文章。 我们就直接拿 subscribeOn 开涮。

 public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
    return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}

调用了 subscribeOn 的两个参数的方法。

 public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
    return new FlowableSubscribeOn<T>(this, scheduler, requestOn);
}

好简单对吧, 跟以前一样的套路, 新建的一个处理 SubscribeOn 的 Flowable 而已, 并对成员变量赋值了。 我们从以前的几篇 Rxjava 的讲解中也得知要给套路, 一旦订阅了, 就会执行相应 Flowable 中的 subscribeActual, 所以我们就看看 FlowableSubscribeOn 中的做了啥事呢。

 public void subscribeActual(final Subscriber<? super T> s) {
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
    s.onSubscribe(sos);

    w.schedule(sos);
}

第一行

来吧, 我们一行一行的分析。 Worker 是啥? 我们跟进去看看 createWorker 代码。

public abstract Worker createWorker();

怎么破。 还记得这个接口是那个去实现的吗? 对, 是 Scheduler.newThead。 那么我定位到 NewThreadScheduler 里。 其实再 Android Studio 中行数旁边点击向下的箭头就可以看看实现了 Scheduler 的类了。 我们会很方便的切换到想看的具体实现的类中。 ok, 我们看看 NewThreadScheduler 做了啥。

public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}

不说了,快点点进去 NewThreadWorker 是啥子呢。

 public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

好嘛, 看到了吗! 是一个封装要给线程池嘛。 好了到这里我们知道了 createWorker 是做啥了把, 不同的 worker 工作者内部实现可能不一样, 也就是选择不同工作者, 其中处理肯定不一样呀, 就比如现在的 newThead 和 io 等肯定是不一样的呀。 不过把抽象后, 是不是写法很好。 是不是又学到了一个编程思想呢。 还不拿笔记下快点!

第二行

好了停下来不扯了, 看看第二行做了什么吧。 仅仅是创建了一个对象, 相应的成员变量赋值。

 static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {}

我们需要关注下它是实现了 Runnable 接口哈, 还有还有实现了 FlowableSubscriber 接口, 你知道我想表达什么吗? 它是有 onNext 等等那些方法呢! 难道它是中间的要给代理接口回调? 也就是我们之前说的, 从分线程吧结果回调出来? 先猜着吧。

第三行

我们继续往下看第三行, 一行不说了, 至少在现在我们简易的里面就是调用了下游的方法, 处理背压等问题, 我们不分析哈。

第四行

我们继续往下看最后一行吧。 嗯? 这个是啥? 距我们之上的分析 worker 在 newThead 里是建了一个线程池, 传入的类又是 Runnable, 我们猜测是不是直接把 Runnable 扔进线程池, 是不是直接去执行了呀? 我们去验证下。 点击去直接跟着点到 NewThreadWorker 里吧。 最后调用到了如下

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = run;

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
        }

        return sr;
    }

我们看到关键的一步是

if (delayTime <= 0) {
    f = executor.submit((Callable<Object>)sr);
} else {
    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
} 

好吧不管有没有延迟吧, 是直接塞到了线程池里执行了呢。 这里可能发现了又 new 出来要给 Runnable, 又包了一层, 干啥玩意呢? 包装了下, 实现了一些特定的处理。 什么? 啥处理, 我不分析了。 我们只关注主流程啦。

到这里你打通一道路没? 就是一旦发生订阅, 就会在指定线程中运行 第二行 SubscribeOnSubscriber 中的 run 方法。 我们看看里面实现了啥。

 public void run() {
    lazySet(Thread.currentThread());
    Publisher<T> src = source;
    source = null;
    src.subscribe(this);
}

你一定要有一个意识就是, 在 run 里的方法都是在指定线程中运行的哈! 比如分线程。 其中 source 就是上一层的 Flowable, 在 run 方法中发生了订阅, 也就是在传递订阅和上面的执行在这一级看来都是 run 里去执行的。

上层调用了上游 onNext 等方法, 就会调用到该类 SubscribeOnSubscriber 中相应的方法。 注意不管怎么调用都是在这个线程中执行的。 对吧, 你想想。 那么让我们看看 onNext 做了什么。

 public void onNext(T t) {
    downstream.onNext(t);
}

没想到吧, 就是这么简单, 把结果直接流给了下游。 那到这里我想问问你在这里的 onNext 及往下流是在哪个线程呢?

前面的疑惑问题

  • 我们多写几遍 subscribeOn 去切换线程可以吗? 有必要吗?

这个嘛, 从我们上面的分析可以看出,如果调用了多次, 在每次向上订阅的时候就会在新的指定的线程中。 至于有必要吗? 看你怎么看了, 我用眼看! 哈哈, 其实在实际开发中, 我们在写上切换线程那语句上面的及订阅后流下来的都是在当前线程中, 我们一般都在分线程去处理数据, 你切多次有啥用或者有用, 看自己了哈。

  • 它是再什么时候去切换线程?

在上面的分析可知, 在发生订阅时, 运行了放到线程池中的 run, 又在 run 里发生了订阅。

  • 它切换出来的线程是那些代码段再运行? eg: 上游再新线程还是下游在呢? 为啥了?

通过上面的分析及上面第一个问题中其实也又回答了, 在 run 里发生订阅, 订阅后的事情都是在线程中运行的

  • 我们再不使用 Rxjava 时, 如果其他线程需要运行到当前一般都时需要一个接口回调出来呢。 Rxjava 可以跟同步一样去拿到结果, 它是怎么解决这个问题的呢?

是的, 其实可以把 SubscribeOnSubscriber 当成一个中间接口回调。 当然看个人理解了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,312评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,578评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,337评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,134评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,161评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,303评论 1 280
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,761评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,421评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,609评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,450评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,504评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,194评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,760评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,836评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,066评论 1 257
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,612评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,178评论 2 341

推荐阅读更多精彩内容