Rxjava2.0笔记-004-合并,过滤操作符实际应用

关于合并数据源:之前使用了flatMap()以及concatMap()进行嵌套调用,,注册之后登陆

合并数据源2:合并数据(获取图书详情以及评论)统一展示到客户端:采用merge()或者zip()操作符

merge()例子:实现较为简单的从(网络+本地)获取数据,,统一展示

zip()例子:结合Retrofit以及Rxjava,实现多个网络请求合并获得数据,,统一展示

二者区别为:merge()只添加被观察者合并数据源的操作在observable观察者的onnext()里面处理,进行合并,合并的结果在onComplete()处理,zip()可以直接添加发射者,再添加合并数据源的bean,在转主线程,订阅,可以使用new Consumer<Bean>() )里面处理合并结果

/**
     * 合并发射者,按时间线执行
     * 合并事件,还是merge()比较方便好用
     */

    String resultss = "数据源来自:";

    private void merge() {

//        Observable.merge(
//                //延迟发送操作符
//                //从0开始发送,发送3个数据,第一次发件延迟时间1秒。间隔时间1s
//                //
//                Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
//                Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
//        ).subscribe(aLong -> {
//
//        });

        Observable.merge(
                Observable.just("网络"),
                Observable.just("本地文件")
        ).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                resultss += s;
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {

                KLog.d(TTAG, "接收完成统一处理事件:" + resultss);
            }
        });
    }

下面使用zip操作:

  /**
     * 合并数据源
     */
    private void zip() {

        Observable.zip(
                retrofitApi.getCall().subscribeOn(Schedulers.io()),
                retrofitApi.getCall().subscribeOn(Schedulers.io()),
                (translation, translation2) ->
                        translation.toString() + translation2.toString())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(s -> {

                    KLog.d(TTAG, "合并的数据源是:" + s.toString());
                }, throwable -> {

                });
    }

concat()实例

 /**
     * 该类型的操作符的作用 = 组合多个被观察者
     * 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
     * concat()
     * concatArray()
     * 
     * 实例:从内存以及磁盘和网络获取缓存
     */

    String memoryCache = null;
    String diskCache = "磁盘缓存数据";

    private void concat() {

    
        Observable.concat(
                Observable.create(emitter -> {

                    //判断内存是否含有缓存
                    if (null == memoryCache) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(memoryCache);
                    }
                }),
                Observable.create(emitter -> {

                    //判断磁盘
                    if (null == diskCache) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(diskCache);
                    }
                }),
                Observable.create((ObservableOnSubscribe<String>) emitter -> {

                    emitter.onNext("从网络获取缓存数据");
                })
                //通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
        ).firstElement()
                // 即本例的逻辑为:
                // a. firstElement()取出第1个事件 = memory,即先判断内存缓存中有无数据缓存;由于memoryCache = null,即内存缓存中无数据,所以发送结束事件(视为无效事件)
                // b. firstElement()继续取出第2个事件 = disk,即判断磁盘缓存中有无数据缓存:由于diskCache ≠ null,即磁盘缓存中有数据,所以发送Next事件(有效事件)
                // c. 即firstElement()已发出第1个有效事件(disk事件),所以停止判断。

                .subscribe(s -> {

                    KLog.d(TTAG, "缓存获得路径是:" + s.toString());
                });
    }

combineLatest()实例

进行多个输入框判断,有一个为空时按钮不可点击,都不为空时才可以点击(并且改变输入框颜色)

 /**
     * 通过combineLatest()合并事件 & 联合判断
     * <p>
     * 当两个Observables中的任何一个发送了数据后,
     * 将先发送了数据的Observables 的最新(最后)一个数据 与
     * 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
     */
    private void init() {

        nameObser = RxTextView.textChanges(name).skip(1);
        ageObser = RxTextView.textChanges(age).skip(1);
        jobObser = RxTextView.textChanges(job).skip(1);

        Observable.combineLatest(nameObser, ageObser, jobObser,
                (charSequence, charSequence2, charSequence3) -> {
                    boolean nameIsNOtEmpty = !TextUtils.isEmpty(name.getText());

                   // boolean nameIs = !TextUtils.isEmpty(name.getText()) && name.getText().toString().length() <= 10;
                    boolean ageIsNotEmpty = !TextUtils.isEmpty(age.getText());
                    boolean jobIsNotEmpty = !TextUtils.isEmpty(job.getText());

                    return nameIsNOtEmpty && ageIsNotEmpty && jobIsNotEmpty;
                }
        ).subscribe(aBoolean -> {

            KLog.d(TTAG, "点击结果是:" + aBoolean);
            push.setEnabled(aBoolean);
        });
    }

有条件的轮询操作:

使用关键字:repeatWhen

// 设置变量 = 模拟轮询服务器次数
    private int i = 0 ;
    /**
     * 有条件的轮询
     * 使用操作符:repeatWhen
     */
    private void init3() {


        RetrofitApi retrofitApi = OkHttpUtils.newInstance().create(RetrofitApi.class);

        retrofitApi.getCall()
                .repeatWhen(objectObservable -> {

                    // 将原始 Observable 停止发送事件的标识(Complete() /  Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
                    // 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
                    // 此处有2种情况:
                    // 1. 若返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
                    // 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询

                    return objectObservable.flatMap((Function<Object, ObservableSource<?>>) o -> {

                        // 加入判断条件:当轮询次数 = 5次后,就停止轮询
                        if (i>3){
                            return Observable.error(new Throwable("轮询结束"));
                        }
                        // 若轮询次数<4次,则发送1Next事件以继续轮询
                        // 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置

                        return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
                    });
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Translation translation) {

                        translation.show();
                        i++;
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }

有条件的网络请求出错,重试,可以设置条件

 /**
     * 请求出错去重复查询,可以设置条件
     * 使用操作符:retryWhen
     * 发送网络请求 & 通过retryWhen()进行重试
     * 主要异常才会回调retryWhen()进行重试
     *   参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
     */

    // 设置变量
    // 可重试次数
    private int maxConnectCount = 10;
    // 当前已重试次数
    private int currentRetryCount = 0;
    // 重试等待时间
    private int waitRetryTime = 0;

    private void init4() {

        retrofitApi.getCall()
                .retryWhen(throwableObservable ->

                        throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {

                            if (throwable instanceof IOException) {

                                if (currentRetryCount < maxConnectCount) {
                                    currentRetryCount++;
                                    waitRetryTime = 1000 + currentRetryCount * 1000;

                                    return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                                } else {

                                    return Observable.error(new Throwable("超过重试次数:" + currentRetryCount));
                                }
                            } else {
                                return Observable.error(new Throwable("发生异常,非网络"));
                            }
                        }))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Translation translation) {
                        translation.show();
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

有关过滤操作符

ofType

  /**
     * 过滤操作符
     */
    private void useOfType() {

        Observable.just(1, "asd", 2, 3, 4, "qwe")
                .ofType(Integer.class)
                .subscribe(integer -> {

                    KLog.d(TTAG, "获得的整型消息事件是:" + integer);

                });
    }

Skip,,,SkipLast

  /**
     * 跳转开头和跳过结尾消息
     */
    private void userSkipAndSkipLast() {

        // 使用1:根据顺序跳过数据项
        Observable.just(1, 2, 3, 4, 5)
                .skip(1) // 跳过正序的前1项
                .skipLast(2) // 跳过正序的后2项
                .subscribe(integer -> KLog.d(TTAG, "获取到的整型事件元素是: " + integer));

// 使用2:根据时间跳过数据项
        // 发送事件特点:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
        Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
                .skip(1, TimeUnit.SECONDS) // 跳过第1s发送的数据
                .skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
                .subscribe(along -> KLog.d(TTAG, "获取到的整型事件元素是: " + along));
    }

throttleFirst(),,throttleLast()

在某段时间内,只发送该段时间内第1次事件 / 最后1次事件

<<- 在某段时间内,只发送该段时间内第1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                
                e.onNext(2);
                Thread.sleep(400);
                
                e.onNext(3);
                Thread.sleep(300);
               
                Thread.sleep(300);
                e.onComplete();
            }
        }).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用数据
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });


<<- 在某段时间内,只发送该段时间内最后1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);

                e.onNext(2);
                Thread.sleep(400);

                Thread.sleep(300);
                e.onComplete();
            }
        }).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用数据
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        KLog.d(TTAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        KLog.d(TTAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        KLog.d(TTAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                       K Log.d(TTAG, "对Complete事件作出响应");
                    }
                });

实际应用:规定时间内,多次点击按钮禁止多次操作使用throttleFirst,操作符

        RxView.clicks(button)
                .throttleFirst(2, TimeUnit.SECONDS)  // 才发送 2s内第1次点击按钮的事件
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {  
                    }
                    @Override
                    public void onNext(Object value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });

Sample()实例应用实时搜索

在某段时间内,只发送该段时间内最新(最后)1次事件,与 throttleLast() 操作符类似

throttleWithTimeout () / debounce()

发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据

        RxTextView.textChanges(ed)
                .debounce(1, TimeUnit.SECONDS)
                .skip(1) //跳过 第1次请求 = 初始输入框的空字符状态
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<CharSequence>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }
                    @Override
                    public void onNext(CharSequence charSequence) {
                        tv.setText("发送给服务器的字符 = " + charSequence.toString());
                    }
                    @Override
                    public void onError(Throwable e) {
                     
                    }
                    @Override
                    public void onComplete() {
                      }
                });

firstElement() ,, lastElement()

仅选取第1个元素 ,,最后一个元素

// 获取第1个元素
        Observable.just(1, 2, 3, 4, 5)
                  .firstElement()
                  .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept( Integer integer) throws Exception {
                          KLog.d(TTAG,"获取到的第一个事件是: "+ integer);
                      }
        });

// 获取最后1个元素
        Observable.just(1, 2, 3, 4, 5)
                .lastElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        KLog.d(TTAG,"获取到的最后1个事件是: "+ integer);
                    }
                });

elementAt()

指定接收某个消息,根据索引,可以设置默认消息

  private void userEleMentAt() {
        // 使用1:获取位置索引 = 2的 元素
        // 位置索引从0开始
        Observable.just(1, 2, 3, 4, 5)
                .elementAt(2)
                .subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));

// 使用2:获取的位置索引 > 发送事件序列长度时,设置默认参数
        Observable.just(1, 2, 3, 4, 5)
                .elementAt(6,10)
                .subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
    }

elementAtOrError()

在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常

 private void userElementAtOrError() {
        Observable.just(1, 2, 3, 4, 5)
                .elementAtOrError(6)
                .subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));

    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342