RxJava 2.x上篇(具体用法)

  • 前言
    RxJava出来已经很久了,2.x版本都已经有了,但是掌握的人还是很少。原因有难度太大,操作符太多,不知道它具体有什么用,原来掌握的库在开发中已经够用了,懒得去学。这些原因里面有些是客观的比如操作符确实是太多了,掌握起来难度还是挺大的,剩余的一些基本就是借口了,给自己逃避找个理由安慰下自己罢了。当然我自己也是找理由中的一份子,2.x版本的RxJava出来都已经很久了,自己连RxJava1.x版本都没接触过。不过这也是算是个好事吧,自己1.x版本可以不用学了,可以直接学2.x版本了,福兮祸所依就是这个道理吧,哈哈!所以本篇都是基于RxJava2.x版本的,同时开篇会直接引入具体用法,让你知道用处之后才有动力去学,不然一上来就是一堆的操作符,会让你完全没有继续学下去的兴趣。好了,闲话就扯到这么多了,向上吧,少年!
  • RxJava是什么?
    RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准,但是其实还是不明白,太正式了一点都不接地气。说白了RxJava就是个实现异步操作的库,和Android中现成的 AsyncTask / Handler达到的目的是一样的;那么既然Android已经有了现成的了,我们为什么还要去重新学一个难度这么大的新东西呢?答案就是RxJava能随着程序逻辑变得越来越复杂,依然能够保持简洁,这里的简洁并不是代码量的多少啊(一般代码量少的反而可读性差),而是代码结构简单明了,可读性很好。说了这么多还是贴段代码来的实在:
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });

怎么样,是不是感觉结构很清晰,很有层次感,看起来干净整洁很舒服。更加重要的是逻辑简洁,一条从上到下的链式调用,没有任何嵌套,能让开发者快速读懂,从而提升开发效率。

  • EventBus和RxJava对比
    网上有很多说RxJava取代了EventBus的说法,所以我就在这把这两个库做个对比。其实这两个库的设计初衷和使用场景是不是一样的,并不存在说谁取代谁、谁优谁劣的说法,大家会把它们放一起比较主要是因为它们都是采用观察者模。RxJava是为了更方便的处理异步事件流,而EventBus就是为了处理事件分发,功能类似java中的观察者模式和Android中的广播,但是实现起来比这两个简单方便。

二者区别如下:
1、RxJava有大量丰富强大的operator,可以满足用户的大部分数据处理需求。RxJava另一个强大的地方就是scheduler,用户可以为Observable和Subscriber指定不同的执行线程,在Android中可以方便的将Observable指定在IO线程中运行,Subscriber在UI线程中运行。
2、EventBus比较适合仅仅当做组件间的通讯工具使用,主要用来传递消息。使用EventBus可以避免搞出一大推的interface,仅仅是为了实现组件间的通讯,而不得不去实现那一推的接口。

具体用法

操作符和基础没讲一上来就上实例看不懂是很正常的,但是这样讲的目的前面也提过了带着目的性去学习这样效果会更好。看不懂知道大概用法就好了,基础和操作符会留到下篇中去讲,明确目标之后再去学习下篇这样会更加高效。

在app的build.gradle中添加RxJava依赖

dependencies {
    //添加RxJava依赖
    implementation "io.reactivex.rxjava2:rxjava:2.2.3"
    //添加RxAndroid依赖,专门用于Android的Rx库
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
}  

天气信息的网络请求

  1. 通过 Observable.create() 方法创建被观察者,调用 OkHttp 进行网络请求;
  2. 通过 map 操作符结合FastJson对请求结果解析,将 Response 转换为 bean 类;
  3. 通过 doOnNext() 方法,进行一些其它操作;
  4. 调度线程,在子线程中进行耗时操作任务,在主线程中更新 UI ;
  5. 观察者通过 subscribe()订阅事件,根据请求成功或者失败来更新 UI 。
1、准备工作

天气信息接口地址

  • 请求结果Json如下图:


    天气.png

2、添加OkHttp和FastJson依赖

    implementation 'com.squareup.okhttp3:okhttp:3.11.0'
    implementation 'com.alibaba:fastjson:1.1.68.android'

3、声明网络权限

<uses-permission android:name="android.permission.INTERNET"/>

4、创建天气返回结果WeatherBean对象

public class WeatherBean {
    private String success;

    private Result result;

    public void setSuccess(String success) {
        this.success = success;
    }

    public String getSuccess() {
        return this.success;
    }

    public void setResult(Result result) {
        this.result = result;
    }

    public Result getResult() {
        return this.result;
    }

    class Result {
        private String weaid;

        private String days;

        private String week;

        private String cityno;

        private String citynm;

        private String cityid;

        private String temperature;

        private String temperature_curr;

        private String humidity;

        private String aqi;

        private String weather;

        private String weather_curr;

        private String weather_icon;

        private String weather_icon1;

        private String wind;

        private String winp;

        private String temp_high;

        private String temp_low;

        private String temp_curr;

        private String humi_high;

        private String humi_low;

        private String weatid;

        private String weatid1;

        private String windid;

        private String winpid;

        private String weather_iconid;

        public void setWeaid(String weaid) {
            this.weaid = weaid;
        }

        public String getWeaid() {
            return this.weaid;
        }

        public void setDays(String days) {
            this.days = days;
        }

        public String getDays() {
            return this.days;
        }

        public void setWeek(String week) {
            this.week = week;
        }

        public String getWeek() {
            return this.week;
        }

        public void setCityno(String cityno) {
            this.cityno = cityno;
        }

        public String getCityno() {
            return this.cityno;
        }

        public void setCitynm(String citynm) {
            this.citynm = citynm;
        }

        public String getCitynm() {
            return this.citynm;
        }

        public void setCityid(String cityid) {
            this.cityid = cityid;
        }

        public String getCityid() {
            return this.cityid;
        }

        public void setTemperature(String temperature) {
            this.temperature = temperature;
        }

        public String getTemperature() {
            return this.temperature;
        }

        public void setTemperature_curr(String temperature_curr) {
            this.temperature_curr = temperature_curr;
        }

        public String getTemperature_curr() {
            return this.temperature_curr;
        }

        public void setHumidity(String humidity) {
            this.humidity = humidity;
        }

        public String getHumidity() {
            return this.humidity;
        }

        public void setAqi(String aqi) {
            this.aqi = aqi;
        }

        public String getAqi() {
            return this.aqi;
        }

        public void setWeather(String weather) {
            this.weather = weather;
        }

        public String getWeather() {
            return this.weather;
        }

        public void setWeather_curr(String weather_curr) {
            this.weather_curr = weather_curr;
        }

        public String getWeather_curr() {
            return this.weather_curr;
        }

        public void setWeather_icon(String weather_icon) {
            this.weather_icon = weather_icon;
        }

        public String getWeather_icon() {
            return this.weather_icon;
        }

        public void setWeather_icon1(String weather_icon1) {
            this.weather_icon1 = weather_icon1;
        }

        public String getWeather_icon1() {
            return this.weather_icon1;
        }

        public void setWind(String wind) {
            this.wind = wind;
        }

        public String getWind() {
            return this.wind;
        }

        public void setWinp(String winp) {
            this.winp = winp;
        }

        public String getWinp() {
            return this.winp;
        }

        public void setTemp_high(String temp_high) {
            this.temp_high = temp_high;
        }

        public String getTemp_high() {
            return this.temp_high;
        }

        public void setTemp_low(String temp_low) {
            this.temp_low = temp_low;
        }

        public String getTemp_low() {
            return this.temp_low;
        }

        public void setTemp_curr(String temp_curr) {
            this.temp_curr = temp_curr;
        }

        public String getTemp_curr() {
            return this.temp_curr;
        }

        public void setHumi_high(String humi_high) {
            this.humi_high = humi_high;
        }

        public String getHumi_high() {
            return this.humi_high;
        }

        public void setHumi_low(String humi_low) {
            this.humi_low = humi_low;
        }

        public String getHumi_low() {
            return this.humi_low;
        }

        public void setWeatid(String weatid) {
            this.weatid = weatid;
        }

        public String getWeatid() {
            return this.weatid;
        }

        public void setWeatid1(String weatid1) {
            this.weatid1 = weatid1;
        }

        public String getWeatid1() {
            return this.weatid1;
        }

        public void setWindid(String windid) {
            this.windid = windid;
        }

        public String getWindid() {
            return this.windid;
        }

        public void setWinpid(String winpid) {
            this.winpid = winpid;
        }

        public String getWinpid() {
            return this.winpid;
        }

        public void setWeather_iconid(String weather_iconid) {
            this.weather_iconid = weather_iconid;
        }

        public String getWeather_iconid() {
            return this.weather_iconid;
        }

        @Override
        public String toString() {
            return "Result{" +
                    "weaid='" + weaid + '\'' +
                    ", days='" + days + '\'' +
                    ", week='" + week + '\'' +
                    ", cityno='" + cityno + '\'' +
                    ", citynm='" + citynm + '\'' +
                    ", cityid='" + cityid + '\'' +
                    ", temperature='" + temperature + '\'' +
                    ", temperature_curr='" + temperature_curr + '\'' +
                    ", humidity='" + humidity + '\'' +
                    ", aqi='" + aqi + '\'' +
                    ", weather='" + weather + '\'' +
                    ", weather_curr='" + weather_curr + '\'' +
                    ", weather_icon='" + weather_icon + '\'' +
                    ", weather_icon1='" + weather_icon1 + '\'' +
                    ", wind='" + wind + '\'' +
                    ", winp='" + winp + '\'' +
                    ", temp_high='" + temp_high + '\'' +
                    ", temp_low='" + temp_low + '\'' +
                    ", temp_curr='" + temp_curr + '\'' +
                    ", humi_high='" + humi_high + '\'' +
                    ", humi_low='" + humi_low + '\'' +
                    ", weatid='" + weatid + '\'' +
                    ", weatid1='" + weatid1 + '\'' +
                    ", windid='" + windid + '\'' +
                    ", winpid='" + winpid + '\'' +
                    ", weather_iconid='" + weather_iconid + '\'' +
                    '}';
        }
    }

    @Override
    public String toString() {
        return "WeatherBean{" +
                "success='" + success + '\'' +
                ", result=" + result +
                '}';
    }
}

5、activity_main.xml

<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">

    <Button
        android:id="@+id/btn_weather"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_centerHorizontal="true"
        android:text="加载天气信息" />

    <TextView
        android:id="@+id/tv_msg1"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:layout_below="@+id/btn_weather" />

    <View
        android:id="@+id/view"
        android:layout_width="match_parent"
        android:layout_below="@+id/tv_msg1"
        android:layout_height="1dp"
        android:background="@android:color/holo_red_light"/>

    <TextView
        android:id="@+id/tv_msg2"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:layout_below="@+id/view" />

</RelativeLayout>

6、MainActivity.class

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import com.alibaba.fastjson.JSON;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

public class MainActivity extends AppCompatActivity implements View.OnClickListener {

    private Button btn_weather;
    private TextView tv_msg1;
    private TextView tv_msg2;
    private String url = "http://api.k780.com:88/?weaid=hangzhou&app=weather.today&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        btn_weather = findViewById(R.id.btn_weather);
        btn_weather.setOnClickListener(this);
        tv_msg1 = findViewById(R.id.tv_msg1);
        tv_msg2 = findViewById(R.id.tv_msg2);
    }

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btn_weather:
                Observable.create(new ObservableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                        //发送天气信息请求
                        Request.Builder builder = new Request.Builder()
                                .url(url)
                                .get();
                        Request request = builder.build();
                        Call call = new OkHttpClient().newCall(request);
                        Response response = call.execute();
                        //被观察者将返回的天气信息发射出去
                        emitter.onNext(response);
                    }
                })
                        .subscribeOn(Schedulers.io())//将被观察者线程切换到io操作线程进行网络请求操作
                        .observeOn(AndroidSchedulers.mainThread())//指定map操作符线程为Android主线程(Ui线程)
                        .map(new Function<Response, WeatherBean>() { //对被观察者发送的事件通过Function函数进行变化
                            @Override
                            public WeatherBean apply(Response response) throws Exception {
                                if (response.isSuccessful()) {
                                    ResponseBody body = response.body();
                                    if (body != null) {
                                        //将请求返回Json数据转换为WeatherBean对象
                                        WeatherBean weatherBean = JSON.parseObject(body.string(), WeatherBean.class);
                                        tv_msg1.setText(weatherBean.toString());
                                        return weatherBean;
                                    }
                                }
                                return null;
                            }
                        })
                        .observeOn(Schedulers.newThread())//指定doOnNext()方法的工作线程为常规新线程,执行耗时操作
                        .doOnNext(new Consumer<WeatherBean>() { //让订阅者在接收到数据前进行一些事情处理的操作符
                            @Override
                            public void accept(WeatherBean weatherBean) throws Exception {
                                //加个延时,模拟耗时操作,比如将网络请求数据进行数据库存储或文件存储
                                Thread.sleep(1000);
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread())//指定观察者接收线程为Android主线程(Ui线程)
                        .subscribe(new Consumer<WeatherBean>() { //观察者订阅被观察者事件
                            @Override
                            public void accept(WeatherBean weatherBean) throws Exception {
                                tv_msg2.setText("日期:" + weatherBean.getResult().getDays() + "\n"
                                        + "城市:" + weatherBean.getResult().getCitynm() + "\n"
                                        + "最高和最低温度:" + weatherBean.getResult().getTemperature() + "\n"
                                        + "当前温度:" + weatherBean.getResult().getTemperature_curr() + "\n"
                                        + "天气:" + weatherBean.getResult().getWeather());
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                tv_msg2.setText("失败:" + throwable.getMessage());
                            }
                        });
                break;
        }
    }
}

7、运行效果

RxJava实例1.gif

先读取缓存,如果缓存没数据再通过网络请求获取数据后更新UI

  • 在实际应用中,很多时候都需要我们先读取缓存的数据,如果缓存没有数据,再通过网络请求获取,随后在主线程更新我们的 UI。concat 操作符简直就是为我们这种需求量身定做。concat 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止( onComplete() ) 后才会定义下一个Observable。利用这个特性,我们就可以先读取缓存数据,倘若获取到的缓存数据不是我们想要的,再调用 onComplete() 以执行获取网络数据的 Observable,如果缓存数据能应我们所需,则直接调用 onNext() ,防止过度的网络请求,浪费用户的流量。
public class MainActivity extends AppCompatActivity implements View.OnClickListener {

    private Button btn_weather;
    private TextView tv_msg1;
    private TextView tv_msg2;
    private String url = "http://api.k780.com:88/?weaid=hangzhou&app=weather.today&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json";
    private WeatherBean cacheWeatherData;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        btn_weather = findViewById(R.id.btn_weather);
        btn_weather.setOnClickListener(this);
        tv_msg1 = findViewById(R.id.tv_msg1);
        tv_msg2 = findViewById(R.id.tv_msg2);
    }

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btn_weather:
                //读取缓存数据被观察者
                Observable<WeatherBean> cache = Observable.create(new ObservableOnSubscribe<WeatherBean>() {
                    @Override
                    public void subscribe(ObservableEmitter<WeatherBean> emitter) throws Exception {
                        if (cacheWeatherData != null) { //如果缓存不为空,直接通过缓存更新UI
                            tv_msg1.setText("直接通过缓存数据更新UI");
                            //缓存不为空直接调用onNext()发射数据给观察者更新UI
                            emitter.onNext(cacheWeatherData);
                        } else {
                            // 在操作符 concat 中,只有调用 onComplete 之后才会执行下一个 Observable,这里是下面的network被观察者
                            emitter.onComplete();
                        }
                    }
                });
                //通过网络请求获取数据被观察者
                Observable<WeatherBean> network = Observable.create(new ObservableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                        //发送天气信息请求
                        Request.Builder builder = new Request.Builder()
                                .url(url)
                                .get();
                        Request request = builder.build();
                        Call call = new OkHttpClient().newCall(request);
                        Response response = call.execute();
                        //被观察者将返回的天气信息发射出去
                        emitter.onNext(response);
                    }
                })
                        .subscribeOn(Schedulers.io())//将被观察者线程切换到io操作线程进行网络请求操作
                        .observeOn(AndroidSchedulers.mainThread())//指定map操作符线程为Android主线程(Ui线程)
                        .map(new Function<Response, WeatherBean>() { //对被观察者发送的事件通过Function函数进行变化
                            @Override
                            public WeatherBean apply(Response response) throws Exception {
                                if (response.isSuccessful()) {
                                    ResponseBody body = response.body();
                                    if (body != null) {
                                        //将请求返回Json数据转换为WeatherBean对象
                                        WeatherBean weatherBean = JSON.parseObject(body.string(), WeatherBean.class);
                                        tv_msg1.setText(weatherBean.toString());
                                        return weatherBean;
                                    }
                                }
                                return null;
                            }
                        })
                        .observeOn(Schedulers.newThread())//指定doOnNext()方法的工作线程为常规新线程,执行耗时操作
                        .doOnNext(new Consumer<WeatherBean>() { //让订阅者在接收到数据前进行一些事情处理的操作符
                            @Override
                            public void accept(WeatherBean weatherBean) throws Exception {
                                //加个延时,模拟耗时操作,比如将网络请求数据进行数据库存储或文件存储
                                Thread.sleep(1000);
                                cacheWeatherData = weatherBean;
                            }
                        });

                // 两个 Observable 的泛型应当保持一致
                Observable.concat(cache, network)
                        .observeOn(AndroidSchedulers.mainThread())//指定观察者接收线程为Android主线程(Ui线程)
                        .subscribe(new Consumer<WeatherBean>() { //观察者订阅被观察者事件
                            @Override
                            public void accept(WeatherBean weatherBean) throws Exception {
                                tv_msg2.setText("日期:" + weatherBean.getResult().getDays() + "\n"
                                        + "城市:" + weatherBean.getResult().getCitynm() + "\n"
                                        + "最高和最低温度:" + weatherBean.getResult().getTemperature() + "\n"
                                        + "当前温度:" + weatherBean.getResult().getTemperature_curr() + "\n"
                                        + "天气:" + weatherBean.getResult().getWeather());
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                tv_msg2.setText("失败:" + throwable.getMessage());
                            }
                        });
                break;
        }
    }
}
  • 有时候我们的缓存可能还会分为 memory 和 disk ,实际上都差不多,无非是多写点 Observable ,然后通过 concat 合并即可。
  • 效果图:
    RxJava实例2.gif

多个网络请求依次依赖

  • 这里就还是拿上面那个请求天气数据例子来做文章。天气数据里面有个weather_icon数据是天气状况图片的url,当天气数据请求下来之后得到图片url,我们还需要依赖这个url继续去请求得到图片显示出来。我们需要用操作符来实现呢,其实就用我们最开始用过的map操作符就可以实现。
  • 类似代码我就不贴出了,仅将点击事件的代码贴出如下,还有就是加了一个显示图片的ImageView控件iv_weather。
Observable.create(new ObservableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                        //发送天气信息请求
                        Request.Builder builder = new Request.Builder()
                                .url(url)
                                .get();
                        Request request = builder.build();
                        Call call = new OkHttpClient().newCall(request);
                        Response response = call.execute();
                        //被观察者将返回的天气信息发射出去
                        emitter.onNext(response);
                    }
                })
                        .subscribeOn(Schedulers.io())//将被观察者线程切换到io操作线程进行网络请求操作
                        .map(new Function<Response, Bitmap>() {
                            @Override
                            public Bitmap apply(Response response) throws Exception {
                                if (response.isSuccessful()) {
                                    ResponseBody body = response.body();
                                    if (body != null) {
                                        //将请求返回Json数据转换为WeatherBean对象
                                        WeatherBean weatherBean = JSON.parseObject(body.string(), WeatherBean.class);
                                        //根据天气图片url再进行一次网络请求得到图片
                                        Request.Builder builder = new Request.Builder()
                                                .url(weatherBean.getResult().getWeather_icon())
                                                .get();
                                        Request request = builder.build();
                                        Call call = new OkHttpClient().newCall(request);
                                        Response imgResponse = call.execute();
                                        InputStream imgInputStream = imgResponse.body().byteStream();
                                        Bitmap bitmap = BitmapFactory.decodeStream(imgInputStream);
                                        if (bitmap != null) {
                                            return bitmap;
                                        }
                                    }
                                }
                                return null;
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread())//指定观察者接收线程为Android主线程(Ui线程)
                        .subscribe(new Consumer<Bitmap>() {
                            @Override
                            public void accept(Bitmap bitmap) throws Exception {
                                iv_weather.setImageBitmap(bitmap);
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                tv_msg2.setText("失败:" + throwable.getMessage());
                            }
                        });
  • 效果图


    RxJava实例3.gif

结合多个接口的数据更新UI

  • 依旧是上面那个天气数据请求的例子,比如我要同时请求我家和我工作地方的天气显示在页面上,这时就要用到zip操作符了,zip 操作符可以将多个 Observable 的数据结合为一个数据源再发射出去。
    //请求工作处天气数据
                Observable<WeatherBean> work = Observable.create(new ObservableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                        //发送天气信息请求
                        Request.Builder builder = new Request.Builder()
                                .url(url)
                                .get();
                        Request request = builder.build();
                        Call call = new OkHttpClient().newCall(request);
                        Response response = call.execute();
                        //被观察者将返回的天气信息发射出去
                        emitter.onNext(response);
                    }
                })
                        .subscribeOn(Schedulers.io())//将被观察者线程切换到io操作线程进行网络请求操作
                        .observeOn(AndroidSchedulers.mainThread())//指定map操作符线程为Android主线程(Ui线程)
                        .map(new Function<Response, WeatherBean>() { //对被观察者发送的事件通过Function函数进行变化
                            @Override
                            public WeatherBean apply(Response response) throws Exception {
                                if (response.isSuccessful()) {
                                    ResponseBody body = response.body();
                                    if (body != null) {
                                        //将请求返回Json数据转换为WeatherBean对象
                                        WeatherBean weatherBean = JSON.parseObject(body.string(), WeatherBean.class);
                                        tv_msg1.setText(weatherBean.toString());
                                        return weatherBean;
                                    }
                                }
                                return null;
                            }
                        });
                //请求家里天气数据
                Observable<String> home = Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        //发送天气信息请求
                        Request.Builder builder = new Request.Builder()
                                .url("http://api.k780.com:88/?weaid=ganzhou&app=weather.today&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json")
                                .get();
                        Request request = builder.build();
                        Call call = new OkHttpClient().newCall(request);
                        Response response = call.execute();
                        //被观察者将返回的天气信息发射出去
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                emitter.onNext(body.string());
                            }
                        }
                    }
                }).subscribeOn(Schedulers.io());//将被观察者线程切换到io操作线程进行网络请求操作
                //结合多个接口的数据更新UI
                Observable.zip(work, home, new BiFunction<WeatherBean, String, String>() {
                    @Override
                    public String apply(WeatherBean weatherBean, String responseString) throws Exception {
                        //合并两地天气
                        String workWeather = weatherBean.getResult().getWeather();
                        int weatherDataStartIndex = responseString.indexOf("\"weather\":\"") + "\"weather\":\"".length();
                        int weatherDataEndIndex = responseString.indexOf("\",", weatherDataStartIndex);
                        String homeWeather = responseString.substring(weatherDataStartIndex, weatherDataEndIndex);
                        return "工作地方天气:" + workWeather + "\n家里天气:" + homeWeather;
                    }
                })
                        .observeOn(AndroidSchedulers.mainThread()) //切换到Ui线程
                        .subscribe(new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                tv_msg2.setText(s);
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                tv_msg2.setText("失败:" + throwable.getMessage());
                            }
                        });

间隔任务实现心跳

  • 想必即时通讯等需要轮训的任务在如今的 APP 中已是很常见,而 RxJava 的 interval 操作符可谓完美地解决了我们的疑惑。
private Disposable mDisposable;

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btn_weather:
                mDisposable = Flowable.interval(1, TimeUnit.SECONDS) //interval间隔操作符,自带Long类型返回值从0开始每执行间隔操作一次自加一
                        .observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
                            @Override
                            public void accept(@NonNull Long aLong) throws Exception { //Long aLong为interval自带的返回值
                                tv_msg2.append("心跳次数:" + aLong + "\n");
                            }
                        });
                break;
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) { //页面销毁时记得要停止心跳
            mDisposable.dispose();
        }
    }
  • 效果图
    RxJava实例4.gif

结束语

实例就暂时先告一段落了,虽然有些实例有些牵强,但是对RxJava有个大概的了解完全够了。还有一点提下就是现在RxJava现在基本都是结合Retrofit来一起进行网络请求,但是我们这里结合的是OkHttp,一是我认为这样简单点更便于理解,二是因为是Retrofit提供了和RxJava一起使用的支持,所有这部分内容放到讲Retrofit时会更合适。

感谢

这可能是最好的 RxJava 2.x 入门教程(五)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,056评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,842评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,938评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,296评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,292评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,413评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,824评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,493评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,686评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,502评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,553评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,281评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,820评论 3 305
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,873评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,109评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,699评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,257评论 2 341

推荐阅读更多精彩内容