RxJava2 使用解析——常见的使用场景

RxJava——目前最热门的响应式函数编程框架。
本文主要总结了笔者在项目中使用到的RxJava的场景,部分例子参考自网络

[笔者仍为Android初学者。如有解释错误的地方,欢迎评论区指正探讨]

本文主要介绍一些RxJava的使用场景,为了完整介绍,本文不使用Retrofit或者将RxJava进行简单的封装,原汁原味。
当然,如果结合Retrofit,下述的代码会简洁很多。


单个网络请求数据并更新UI

这个比较简单,整个流程大致是:

  1. 通过Obsrvable.create方法,调用OkHttp网络请求
  2. 通过map方法结合gson,将response转换为bean
  3. 通过onNext,解析bean中数据,并进行数据库存储
  4. 调度线程
  5. 通过subscribe,根据请求成功或异常来更新UI
Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
        Request.Builder builder = new Request.Builder()
                .url("url")
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
    }
}).map(new Function<Response, Bean>() {
    @Override
    public Bean apply(@NonNull Response response) throws Exception {
        //Gson
    }
}).doOnNext(new Consumer<Bean>() {
    @Override
    public void accept(@NonNull Bean bean) throws Exception {
        //saveData
    }
})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Bean>() {
        @Override
        public void accept(@NonNull Bean bean) throws Exception {
            //refresh UI
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            //get ERROR
        }
    });

多个网络请求依次依赖

这里主要是依赖于flatMap关键字,FlatMap可以将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
利用这个特性,我们可以将Observable转成另一个Observable

Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
        Request.Builder builder = new Request.Builder()
                .url("url")
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
    }
}).map(new Function<Response, FirstBean>() {
    @Override
    public FirstBean apply(@NonNull Response response) throws Exception {
        //Gson
    }
}).flatMap(new Function<FirstBean, ObservableSource<Response>>() {
    @Override
    public ObservableSource<Response> apply(@NonNull FirstBean bean) throws Exception {
        final String s = bean.getData();
        return Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Request.Builder builder = new Request.Builder()
                        .url("url/" + s)
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        });
    }
}).map(new Function<Response, SecondBean>() {
    @Override
    public SecondBean apply(@NonNull Response response) throws Exception {
        //Gson
    }
})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<SecondBean>() {
        @Override
        public void accept(@NonNull SecondBean secondBean) throws Exception {
            //refresh UI
        }
    });

先读取缓存数据并展示UI再获取网络数据刷新UI

这里需要依赖另一个操作符:Concat
concat可以做到不交错的发射两个或多个Observable的发射物,并且只有前一个Observable终止(onComleted)才会订阅下一个Obervable

利用这个特性,我们就可以依次的读取缓存数据展示UI,然后再获取网络数据刷新UI

  1. 首先创建一个从cache获取数据的observable
  2. 再创建一个从网络获取数据的Observable(可以通过map等方法转换数据类型)
  3. 通过concat方法将多个observable结合起来
  4. 通过subscribe订阅每一个observable
Observable<List<String>> cache = Observable.create(new ObservableOnSubscribe<List<String>>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        CacheManager manager = CacheManager.getInstance();
        List<String> data = manager.query();
        e.onNext(data);
        //一定要有onComplete,不然不会执行第二个Observale
        e.onComplete();
    }
});

Observable<List<String>> network = Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        Request.Builder builder = new Request.Builder()
                .url("url")
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
        e.onComplete();
    }
}).map(new Function<Response, List<String>>() {
    @Override
    public List<String> apply(@NonNull Response response) throws Exception {
        //解析数据
    }
});

//两个observable的泛型应该保持一致
Observable.concat(cache, network)
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(@NonNull List<String> strings) throws Exception {
                //refresh ui
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                //get error
            }
        });

获取网络数据前先读取缓存

其实和上面的那种类似,只需要稍微修改一下逻辑即可:
当缓存的Observable获取到数据时,只执行onNext,获取不到则只执行onComplete

Observable<String> cache = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        CacheManager manager = CacheManager.getInstance();
        String data = manager.queryForPosition(0);
        if (data != null) {
            e.onNext(data);
        } else {
            //调用onComplete之后会执行下一个Observable
            //如果缓存为空,那么直接结束,进行网络请求
            e.onComplete();
        }
    }
});

Observable<String> network = Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        Request.Builder builder = new Request.Builder()
                .url("url")
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
        e.onComplete();
    }
}).map(new Function<Response, String>() {
    @Override
    public String apply(@NonNull Response response) throws Exception {
        //解析数据
    }
});

//两个observable的泛型应该保持一致
Observable.concat(cache, network)
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String strings) throws Exception {
                //refresh ui
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                //get error
            }
        });

当然,有的时候我们的缓存可能还会分为memorydisk,无差,只需要多写一个Observable然后一样通过concat合并即可。

结合多个接口的数据再更新UI

这个时候就需要靠zip方法啦,zip方法可以将多个Observable的数据结合为一个数据源再发射出去。

Observable<FirstBean> firstRequest = Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        Request.Builder builder = new Request.Builder()
                .url("firstUrl")
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
        e.onComplete();
    }
}).map(new Function<Response, FirstBean>() {
    @Override
    public FirstBean apply(@NonNull Response response) throws Exception {
        //解析数据
    }
});

Observable<SecondBean> secondRequest = Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        Request.Builder builder = new Request.Builder()
                .url("secondUrl")
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
        e.onComplete();
    }
}).map(new Function<Response, SecondBean>() {
    @Override
    public SecondBean apply(@NonNull Response response) throws Exception {
        //解析数据
    }
});

Observable.zip(firstRequest, secondRequest, new BiFunction<FirstBean, SecondBean, WholeBean>() {
    @Override
    public WholeBean apply(@NonNull FirstBean firstBean, @NonNull SecondBean secondBean) throws Exception {
        //结合数据为一体
    }
})
    .subscribeOn(Schedulers.io())
    .subscribe(new Consumer<WholeBean>() {
        @Override
        public void accept(@NonNull WholeBean strings) throws Exception {
            //refresh ui
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            //get error
        }
    });

当然,如果你的两个api返回的是相同类型的数据,那么可以直接使用merge将数据合并,而不需要实现回调。

减少频繁的网络请求

设想一种场景:点击一次button就进行一次网络请求,或者当输入框数据变化时进行网络请求,那么这样就会在一下子产生大量的网络请求,但实际上又没有必要,这个时候就可以通过debounce方法来处理,debounce操作符会过滤掉发射速率过快的数据项:

为了方便处理点击事件Observable的关系,我们引入RxBinding处理:

RxView.clicks(mButton)
        .debounce(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(@NonNull Object o) throws Exception {
                // refresh ui
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                // get error
            }
        });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,812评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,626评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,144评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,052评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,925评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,035评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,461评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,150评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,413评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,501评论 2 307
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,277评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,159评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,528评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,868评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,143评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,407评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,615评论 2 335

推荐阅读更多精彩内容