7. Rxjava2 : 合并操作符

1. RxJava2 : 什么是观察者模式
2. RxJava2 : 创建操作符(无关时间)
3. Rxjava2 : 创建操作符(有关时间)
4. Rxjava2 : 变换操作符
5. Rxjava2 : 判断操作符
6. Rxjava2 : 筛选操作符
7. Rxjava2 : 合并操作符
8. Rxjava2 : do操作符
9. Rxjava2 : error处理
10. Rxjava2 : 重试
11. Rxjava2 : 线程切换

api use
concat / concatArray {{concat}}
concatDelayError {{concatDelayError}}
merge / mergeArray {{merge}}
mergeDelayError {{mergeDelayError}}
zip {{zip}}
reduce {{reduce}}
collect {collect}
startWith / startWithArray {{startWith}}

concat / concatArray

  • concat
    1.concat的参数不得超过4个(不能超过4个Observable)
    2.concat是按照顺序合并的
  • concatArray
    concatArray如果参数超过4个(就将Observable组成集合传入),剩下的均与concat相同

流程

3个Observable一共有9个元素
Observable0(元素1,元素2,元素3)
Observable1(元素4,元素5,元素6)
Observable2(元素7,元素8,元素9)
->
按照顺序合并元素
(元素1,元素2,元素3,元素4,元素5,元素6,元素7,元素8,元素9)
->
subscribe
->
observer 观察到9个元素
        Observable<Integer> observable0 = Observable.just(1, 2, 3);
        Observable<Integer> observable1 = Observable.just(4, 5, 6);
        Observable<String> observable2 = Observable.just("7", "8", "9");
        Observable.concat(observable0, observable1, observable2)
                .subscribe(new Observer<Serializable>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Serializable serializable) {
                        if (serializable instanceof Integer) {
                            Log.d(TAG, "Integer:" + serializable);
                        } else if (serializable instanceof String) {
                            Log.d(TAG, "String:" + serializable);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:1
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:2
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:3
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:4
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:5
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:6
02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:7
02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:8
02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:9
02-12 17:42:57.603 6230-6230/... D/SplashActivity: onComplete

concatDelayError

  • concatDelayError
    1.将onError放在了最后执行,那么onError就无法作为结束标识了,只能依靠onComplete作为结束标识
    2.能否执行onError取决于能否找到onComplete标识
    3.采用concat的方式,即按照元素顺序合并,Observable0的complete失去作用,能否执行onError就取决于Observable1是否有结束标识
    4.以onError结束,则不会执行onComplete

流程

Observable
->
Observable0(元素1,元素2,元素3,throwable,complete)
Observable1(元素4,元素5,complete)
->
元素1,元素2,元素3,元素4,元素5,complete
->
元素1,元素2,元素3,元素4,元素5,throwable
->
subscribe
-> 
observer
->
log throwable
//Observable1具有e.onComplete()结束标识
 Observable<Integer> observable = Observable.create(e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
            e.onComplete();
        });
        Observable<Integer> observable2 = Observable.create(e -> {
            e.onNext(4);
            e.onNext(5);
            e.onComplete();
        });
        List<Observable<Integer>> objects = new ArrayList<>();
        objects.add(observable);
        objects.add(observable2);
        Observable.concatDelayError(objects)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:1
02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:2
02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:3
02-12 17:57:01.093 7281-7281/... D/SplashActivity: integer:4
02-12 17:57:01.093 7281-7281/... D/SplashActivity: integer:5
02-12 17:57:01.093 7281-7281/... D/SplashActivity: onError
//Observable1不具有e.onComplete()结束标识
Observable<Integer> observable = Observable.create(e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
            e.onComplete();
        });
        Observable<Integer> observable2 = Observable.create(e -> {
            e.onNext(4);
            e.onNext(5);
        });
        List<Observable<Integer>> objects = new ArrayList<>();
        objects.add(observable);
        objects.add(observable2);
        Observable.concatDelayError(objects)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:1
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:2
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:3
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:4
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:5

merge

  • merge
    1.merge的参数不得超过4个(不能超过4个Observable)
    2.merge是按照时间合并的
  • mergeArray
    mergeArray如果参数超过4个(就将Observable组成集合传入),剩下的均与concat相同

流程

时间(秒) 0 1 2 3 4 5 6 7 8
Observable0 没发送 没发送 1 没发送 没发送 2 没发送 没发送 3
Observable1 没发送 4 没发送 没发送 5 没发送 没发送 6 没发送
Observable2 没发送 7 没发送 没发送 8 没发送 没发送 9 没发送

依照时间来合并:
则是 4,7,1,5,8,2,6,9,3

Log.d(TAG, "in");
        Observable<Long> longObservable = Observable
                .intervalRange(1, 3, 2, 2, TimeUnit.SECONDS);
        Observable<Long> longObservable1 = Observable
                .intervalRange(4, 3, 1, 2, TimeUnit.SECONDS);
        Observable<Long> longObservable2 = Observable
                .intervalRange(7, 3, 1, 2, TimeUnit.SECONDS);
        Observable
                .merge(longObservable, longObservable1,longObservable2)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "aLong:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-13 09:33:04.121 13228-13228/... D/SplashActivity: in
02-13 09:33:05.121 13228-13271/... D/SplashActivity: aLong:4
02-13 09:33:05.121 13228-13272/... D/SplashActivity: aLong:7
02-13 09:33:06.131 13228-13270/... D/SplashActivity: aLong:1
02-13 09:33:07.131 13228-13271/... D/SplashActivity: aLong:5
02-13 09:33:07.131 13228-13272/... D/SplashActivity: aLong:8
02-13 09:33:08.131 13228-13270/... D/SplashActivity: aLong:2
02-13 09:33:09.131 13228-13271/... D/SplashActivity: aLong:6
02-13 09:33:09.131 13228-13272/... D/SplashActivity: aLong:9
02-13 09:33:10.121 13228-13270/... D/SplashActivity: aLong:3
02-13 09:33:10.121 13228-13270/... D/SplashActivity: onComplete

mergeDelayError

  • mergeDelayError
    与concatDelayError类似,onError会在最后接收到
    区别在于:
    concatDelayError:
    会以最后一个onComplete作为结束的标识
    mergeDelayError:
    在时间过程当中,哪个Observable出现了error,哪个Observable就不再继续发送了,直至所有Observable的元素都结束之后,处理onError

流程

时间 0 1 2
Observable0 1 throwable 2
Observable1 3 4 5,complete
合并 1,3 throwable,4 error结束,5,complete

则最后的结果为: 1,3,4,5,onError;

注意:在示例中,我采取了指定不同的线程做延时,如果在相同线程中,会出现卡线程的情况,会影响到结果
Log.d(TAG, "in");
        Observable<Long> longObservable = Observable.create((ObservableOnSubscribe<Long>) e -> {
            e.onNext(1L);
            Thread.sleep(1000);
            e.onError(new NullPointerException());
            Thread.sleep(1000);
            e.onNext(2L);
            e.onComplete();
        }).subscribeOn(Schedulers.newThread());
        Observable<Long> longObservable1 = Observable.create((ObservableOnSubscribe<Long>) e -> {
            e.onNext(3L);
            Thread.sleep(1000);
            e.onNext(4L);
            Thread.sleep(1000);
            e.onNext(5L);
            e.onComplete();
        }).subscribeOn(Schedulers.newThread());
        Observable
                .mergeDelayError(longObservable, longObservable1)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "aLong:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-13 10:03:26.701 16418-16418/... D/SplashActivity: in
02-13 10:03:26.711 16418-16463/... D/SplashActivity: aLong:1
02-13 10:03:26.711 16418-16464/... D/SplashActivity: aLong:3
02-13 10:03:27.721 16418-16464/... D/SplashActivity: aLong:4
02-13 10:03:28.711 16418-16464/... D/SplashActivity: aLong:5
02-13 10:03:28.711 16418-16464/... D/SplashActivity: onError

zip

  • zip
    必须严格遵守一一对应的关系

流程

observable 元素 元素 元素 元素
observable0 1 2 3
observable1 4 5 onComplete 6
zip合并(相加) 5 7 onComplete 碰到onComplete标识,已经结束

最终的结果 5,7

Observable<Integer> observable = Observable.create(e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        });
        Observable<Integer> observable1 = Observable.create(e -> {
            e.onNext(4);
            e.onNext(5);
            e.onComplete();
            e.onNext(6);
        });
        Observable
                .zip(observable, observable1,
                        (integer, integer2) -> integer + integer2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-01 11:25:53.581 11233-11233/... D/ShellAppApplication: integer:5
02-01 11:25:53.581 11233-11233/... D/ShellAppApplication: integer:7
Observable<Integer> observable = Observable.create(e -> {
            e.onError(new NullPointerException());
            e.onNext(2);
            e.onNext(3);
        });
        Observable<Integer> observable1 = Observable.create(e -> {
            e.onNext(4);
            e.onNext(5);
            e.onComplete();
            e.onNext(6);
        });
        Observable
                .zip(observable, observable1,
                        (integer, integer2) ->  integer + integer2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-13 10:34:54.131 17980-17980/... D/SplashActivity: onError

reduce

  • reduce
    对于同一个Observable当中的所有元素逐一处理,合并为同一个元素

流程

observable(元素1,元素2,元素3)
->
reduce
->
apply (元素1,元素2) -> 元素X
apply(元素X,元素3) -> (元素X +元素3) 作为新的元素X 
递归,直至最后一个

-> 
one observable
->
subscribe
Disposable subscribe = Observable.just(1, 2, 3).reduce((integer, integer2) -> {
            Log.d(TAG, "first:" + integer);
            Log.d(TAG, "second:" + integer2);
            return integer + integer2;
        }).subscribe(integer -> Log.d(TAG, "integer:" + integer));

log

02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: first:1
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: second:2
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: first:3
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: second:3
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: integer:6

collect

  • collect
    收集
    1.指定一个容器
    2.将所有的元素收集到这个容器当中
Disposable subscribe = Observable.just(1, 2, 3)
                .collect(new Callable<List<Integer>>() {
                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<List<Integer>, Integer>() {
                    @Override
                    public void accept(List<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                })
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.d(TAG, "integers:" + integers);
                    }
                });

log

02-13 10:49:49.061 18664-18664/... D/SplashActivity: integers:[1, 2, 3]

startWith

  • startWith
    在最开始的时候插入
    1.支持插入单一元素
    2.支持插入Observable
Observable.just(1, 2, 3)
                .startWith(0)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:0
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:1
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:2
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:3
02-13 10:53:59.151 18992-18992/... D/SplashActivity: onComplete
Observable.just(1, 2, 3)
                .startWith(Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:4
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:5
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:6
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:1
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:2
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:3
02-13 10:55:14.241 19238-19238/... D/SplashActivity: onComplete
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342