RxJava 的 Subject

streams everywhere.png

Subject 是一种特殊的存在

在前面一篇文章Cold Observable 和 Hot Observable中,曾经介绍过 Subject 既是 Observable 又是 Observer(Subscriber)。官网称 Subject 可以看成是一个桥梁或者代理。

Subject的分类

Subject包含四种类型分别是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。

1. AsyncSubject

Observer会接收AsyncSubject的onComplete()之前的最后一个数据。

AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("asyncSubject1");
        subject.onNext("asyncSubject2");
        subject.onComplete();
        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("asyncSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("asyncSubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("asyncSubject:complete");  //输出 asyncSubject onComplete
            }
        });

        subject.onNext("asyncSubject3");
        subject.onNext("asyncSubject4");

执行结果:

asyncSubject:asyncSubject2
asyncSubject:complete

改一下代码,将subject.onComplete()放在最后。

        AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("asyncSubject1");
        subject.onNext("asyncSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("asyncSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("asyncSubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("asyncSubject:complete");  //输出 asyncSubject onComplete
            }
        });

        subject.onNext("asyncSubject3");
        subject.onNext("asyncSubject4");
        subject.onComplete();

执行结果:

asyncSubject:asyncSubject4
asyncSubject:complete

注意,subject.onComplete()必须要调用才会开始发送数据,否则Subscriber将不接收任何数据。

2. BehaviorSubject

Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收订阅之后发射过来的数据。如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。

        BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("behaviorSubject:"+s); 
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("behaviorSubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("behaviorSubject:complete");  //输出 behaviorSubject onComplete
            }
        });

        subject.onNext("behaviorSubject2");
        subject.onNext("behaviorSubject3");

执行结果:

behaviorSubject:behaviorSubject1
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3

在这里,behaviorSubject1是默认值。因为执行了

BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");

稍微改一下代码,在subscribe()之前,再发射一个事件。

        BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
        subject.onNext("behaviorSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("behaviorSubject:"+s);  //输出asyncSubject:asyncSubject3
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("behaviorSubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("behaviorSubject:complete");  //输出 behaviorSubject onComplete
            }
        });

        subject.onNext("behaviorSubject3");
        subject.onNext("behaviorSubject4");

执行结果:

behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
behaviorSubject:behaviorSubject4

这次丢弃了默认值,而发射behaviorSubject2。
因为BehaviorSubject 每次只会发射调用subscribe()方法之前的最后一个事件和调用subscribe()方法之后的事件。

BehaviorSubject还可以缓存最近一次发出信息的数据。

3. ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("replaySubject1");
        subject.onNext("replaySubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("replaySubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("replaySubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("replaySubject:complete");  //输出 replaySubject onComplete
            }
        });

        subject.onNext("replaySubject3");
        subject.onNext("replaySubject4");

执行结果:

replaySubject:replaySubject1
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4

稍微改一下代码,将create()改成createWithSize(1)只缓存订阅前最后发送的1条数据

        ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
        subject.onNext("replaySubject1");
        subject.onNext("replaySubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("replaySubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("replaySubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("replaySubject:complete");  //输出 replaySubject onComplete
            }
        });

        subject.onNext("replaySubject3");
        subject.onNext("replaySubject4");

执行结果:

replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4

这个执行结果跟BehaviorSubject是一样的。但是从并发的角度来看,ReplaySubject 在处理并发 subscribe() 和 onNext() 时会更加复杂。

ReplaySubject除了可以限制缓存数据的数量和还能限制缓存的时间。使用createWithTime()即可。

4. PublishSubject

Observer只接收PublishSubject被订阅之后发送的数据。

        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("publicSubject1");
        subject.onNext("publicSubject2");
        subject.onComplete();

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("publicSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("publicSubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("publicSubject:complete");  //输出 publicSubject onComplete
            }
        });

        subject.onNext("publicSubject3");
        subject.onNext("publicSubject4");

执行结果:

publicSubject:complete

因为subject在订阅之前,已经执行了onComplete()方法,所以无法发射数据。稍微改一下代码,将onComplete()方法放在最后。

        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("publicSubject1");
        subject.onNext("publicSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("publicSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("publicSubject onError");  //不输出(异常才会输出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("publicSubject:complete");  //输出 publicSubject onComplete
            }
        });

        subject.onNext("publicSubject3");
        subject.onNext("publicSubject4");
        subject.onComplete();

执行结果:

publicSubject:publicSubject3
publicSubject:publicSubject4
publicSubject:complete

最后,一句话总结一下四个Subject的特性。

Subject 发射行为
AsyncSubject 不论订阅发生在什么时候,只会发射最后一个数据
BehaviorSubject 发送订阅之前一个数据和订阅之后的全部数据
ReplaySubject 不论订阅发生在什么时候,都发射全部数据
PublishSubject 发送订阅之后全部数据

可能错过的事件

Subject 作为一个Observable时,可以不停地调用onNext()来发送事件,直到遇到onComplete()才会结束。

PublishSubject<String> subject = PublishSubject.create();
        subject.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                },new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("completed");
                    }
                });
        subject.onNext("Foo");
        subject.onNext("Bar");
        subject.onComplete();

执行的结果:

Foo
Bar
completed

如果,使用 subsribeOn 操作符将 subject 切换到IO线程,再使用 Thread.sleep(2000) 让主线程休眠2秒。

 PublishSubject<String> subject = PublishSubject.create();
        subject.subscribeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                },new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("completed");
                    }
                });
        subject.onNext("Foo");
        subject.onNext("Bar");
        subject.onComplete();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

这时,其执行的结果变为:

completed

为何会缺少打印Foo和Bar?

因为,subject 发射元素的线程被指派到了 IO 线程,此时 IO 线程正在初始化还没起来,subject 发射前这两个元素Foo、Bar还在主线程中,主线程的这两个元素往 IO 线程转发的过程中由于 IO 线程还没有起来,所以就被丢弃了。此时,无论Thread睡了多少秒,Foo、Bar都不会被打印出来。

其实,解决办法也很简单,将subject改成使用Observable.create()来替代,它允许为每个订阅者精确控制事件的发送,这样就不会缺少打印Foo和Bar。

使用PublishSubject来实现简化的RxBus

下面的代码是一个简化版本的Event Bus,在这里使用了PublishSubject。因为事件总线是基于发布/订阅模式实现的,如果某一事件在多个Activity/Fragment中被订阅的话,在App的任意地方一旦发布该事件,则多个订阅的地方都能够同时收到这一事件(在这里,订阅事件的Activity/Fragment不能被destory,一旦被destory就不能收到事件),这很符合Hot Observable的特性。所以,我们使用PublishSubject,考虑到多线程的情况,还需要使用 Subject 的 toSerialized() 方法。

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class RxBus {

    private final Subject<Object> mBus;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object obj) {
        mBus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}

在这里Subject的toSerialized(),使用SerializedSubject包装了原先的Subject。

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }

这个版本的Event Bus比较简单,并没有考虑到背压的情况,因为在 RxJava2.x 中 Subject 已经不再支持背压了。如果要增加背压的处理,可以使用Processor,我们需要将 PublishSubject 改成 PublishProcessor,对应的 Observable 也需要改成 Flowable。

使用BehaviorSubject来实现预加载

预加载可以很好的提高程序的用户体验。
每当用户处于弱网络时,打开一个App可能出现一片空白或者一直在loading,那用户一定会很烦躁。此时,如果能够预先加载一些数据,例如上一次打开App时保存的数据,这样不至于会损伤App的用户体验。

下面是借助 BehaviorSubject 的特性来实现一个简单的预加载类RxPreLoader。

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.BehaviorSubject;

/**
 * Created by Tony Shen on 2017/6/2.
 */

public class RxPreLoader<T> {

    //能够缓存订阅之前的最新数据
    private  BehaviorSubject<T> mData;
    private Disposable disposable;

    public RxPreLoader(T defaultValue) {

        mData = BehaviorSubject.createDefault(defaultValue);
    }

    /**
     * 发送事件
     * @param object
     */
    public void publish(T object) {
        mData.onNext(object);
    }

    /**
     * 订阅事件
     * @param onNext
     * @return
     */
    public  Disposable subscribe(Consumer onNext) {
        disposable = mData.subscribe(onNext);
        return disposable;
    }

    /**
     * 反订阅
     *
     */
    public void dispose() {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
            disposable = null;
        }
    }

    /**
     * 获取缓存数据的Subject
     *
     * @return
     */
    public BehaviorSubject<T> getCacheDataSubject() {
        return mData;
    }

    /**
     * 直接获取最近的一个数据
     *
     * @return
     */
    public T getLastCacheData() {
        return mData.getValue();
    }
}

可以考虑在基类的Activity/Fragment中也实现一个类似的RxPreLoader。

总结

RxJava 的 Subject 是一种特殊的存在,它的灵活性在使用时也会伴随着风险,没有用好它的话会错过事件,并且使用时还要小心 Subject 不是线程安全的。当然很多开源框架都在使用Subject,例如大名鼎鼎的RxLifecycle使用了BehaviorSubject。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,519评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,842评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,544评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,742评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,646评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,027评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,169评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,324评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,268评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,299评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,996评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,591评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,911评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,288评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,871评论 2 341

推荐阅读更多精彩内容