★42.RxJava2

简介

  • 相关网站:GitHub地址官方文档中文文档
  • RxJava2 通过一种扩展的 观察者模式 来实现 异步 执行任务。
  • RxJava2 能方便地进行 线程 切换。
  • RxJava2 能方便地把 异步 执行的代码写在一处。

基础知识

相关概念

  • Observable被观察者
  • Observer观察者
  • Subscriber订阅者 ,实现了Observer接口,多了unsubscribe(),用来取消订阅。
  • Subscription:类似SubscriberObservable调用subscribe()方法返回的对象。
  • Subject:可以当作ObservableObserver来用。
  • subscribe()订阅 方法。
  • Event:事件。
  • Scheduler调度器 ,相当于 线程控制器
  • Action0:接口,里面只有一个无返回值0参数的call(),同理有Action1Action2等,代表着1个、2个参数等。
  • Func0:接口,类似Action0,区别是Func0有返回值。

相关方法

  • onNext():在 事件队列 中,进入下一个事件时调用,同时也是 事件处理方法
  • onCompleted()事件队列 完成时调用。
  • onError()事件队列 错误时调用。

简单示例

1. 普通示例

1. 定义Observable

Observable<Integer> observable = Observable.create(emitter -> {
    // Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
    // 发送消息
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    // 消息发送完毕
    emitter.onComplete();
});

2. 定义Observer

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
        // 订阅时
        // Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "" + value);
        // 接收到事件时
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
        // 接收到错误时
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
        // 事件接收完毕时
    }
};

3. 订阅

注意 :此处代码的书写顺序看起来不符合直觉,这样设计是为了便于 链式调用

observable.subscribe(observer);

2. 链式示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    // Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
    // 发送消息
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    // 消息发送完毕
    emitter.onComplete();
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
        // 订阅时
        // Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "" + value);
        // 接收到事件时
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
        // 接收到错误时
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
        // 事件接收完毕时
    }
});

3. 无Observer示例

subscribe()的所有重载形式

public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

代码

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
    emitter.onNext(4);
}).subscribe(
        integer -> Log.d(TAG, "onNext: " + integer),
        throwable -> Log.d(TAG, "onError"),
        () -> Log.d(TAG, "onComplete"));

示例解说

  • emitterObservableEmitter类型,用于发射onNext()onComplete()onError()消息。
  • Disposable对象可以保存,日后通过调用Disposable.dispose()中断订阅中断订阅 以后发送的消息无法被Observer接收到。
  • Observable可以不发送onComplete()onError()
  • Observable发送了一个onComplete()后,ObservableonComplete()之后的onNext()将会继续发送,而Observer收到onComplete()之后将不再继续接收onNext()
  • Observable发送了一个onError()后,ObservableonError()之后的onNext()将继续发送,而Observer收到onError()之后将不再继续接收onNext()
  • onComplete()onError()必须 唯一 并且 互斥 ,即只能发送一个onComplete()onError()

事件流向图

事件流向 示意图
只发送onNext()事件
发送onComplete()事件
发送onError()事件

线程相关

切换线程示例

  • subscribeOn():控制observable在什么 线程 中发送事件。
  • observeOn():控制observer在什么 线程 中处理事件。
// observable的链式操作中
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

线程种类

线程种类 描述
Schedulers.newThread() 线程
Schedulers.computation() 用于CPU计算密集型的操作的 线程
Schedulers.io() 用于IO操作的 线程 ,如网络IO,文件IO,数据库IO。
AndroidSchedulers.mainThread() AndroidUI主线程

注意事项

  • ObservableObserver默认是在 UI主线程 中运行的。
  • Observable多次切换线程的话,只有第一次有效。
  • Observer多次切换线程的话,只有最后一次生效。

过滤操作符

Sample

简介

  • Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。
  • 会导致某些事件丢失。

示意图

变换操作符

Map

示意图

简介

用于将一种类型的 事件 转换为另一种类型的 事件

啰嗦示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
}).map(new Function<Integer, String>() {
    @Override
    public String apply(@NonNull Integer integer) throws Exception {
        return "This is result " + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.d(TAG, s);
    }
});

简洁示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
        .map(integer -> "This is result " + integer)
        .subscribe(s -> Log.d(TAG, s));

FlatMap

示意图

简介

  • FlatMap将单个 事件事件队列 变换为一个发送单个 事件事件队列Observable
  • 可以通过FlatMap事件 变换为Observable来实现链式中连接多个ObservableObservable->事件->Observable->事件...->Observer
  • FlatMap并不保证 事件 的顺序,如果需要保证顺序则使用ConcatMap,使用方法同FlatMap

啰嗦示例

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
    }
}).flatMap(new Function<Integer, Observable<String>>() {
    @Override
    public Observable<String> apply(Integer integer) throws Exception {
        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("I am value " + integer);
        }
        return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

简洁示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
        .flatMap(new Function<Integer, Observable<String>>() {
            @Override
            public Observable<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
            }
        })
        .subscribe(s -> Log.d(TAG, s));

ConcatMap

  • ConcatMap保证 事件 的顺序,使用方法同FlatMap
  • 示例省略,参考FlatMap

Zip

示意图

简介

  • 组合多个Observable发送的 事件 ,然后发送这个 事件组合
  • 事件组合 的顺序是严格按照 事件 发送的顺序来进行的。
  • 发送 事件 的数量,与所有Observable事件 数量最少的那个一样。
  • 注意:这多个Observable在同一个 线程 时无法将 事件 组合发送,即发送一个Observable的所有 事件 以后,再发送另一个Observable
  • 应用场景:一个界面需要展示用户的一些信息,而这些信息分别要从两个服务器接口中获取,而只有当两个都获取到了之后才能进行展示。

啰嗦示例

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "emit 1");
        emitter.onNext(1);

        Log.d(TAG, "emit complete1");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "emit A");
        emitter.onNext("A");

        Log.d(TAG, "emit complete2");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
        return integer + s;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.d(TAG, "onNext: " + value);
    }
});

简洁示例

Observable<Integer> observable1 = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    Log.d(TAG, "emit 1");
    emitter.onNext(1);

    Log.d(TAG, "emit complete1");
    emitter.onComplete();
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.create((ObservableOnSubscribe<String>) emitter -> {
    Log.d(TAG, "emit A");
    emitter.onNext("A");

    Log.d(TAG, "emit complete2");
    emitter.onComplete();
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, (integer, s) -> integer + s).subscribe(s -> Log.d(TAG, "onNext: " + s));

特殊情况处理

对于ObserverUI主线程 进行 事件 处理,但是接收到 事件 时,所在的ActivityFragment已经退出的情况,需要通过调用Disposable.dispose()Subscription.cancel()中断订阅

1. 创建CompositeDisposable

  • CompositeDisposable对象是储存Disposable对象的容器。
  • 作为ActivityFragment的成员:
    CompositeDisposable mCompositeDisposable = new CompositeDisposable();
    

2. 保存Disposable

Observer.onSubscribe(...)中:

@Override
public void onSubscribe(Disposable d) {
    mCompositeDisposable.add(d);
}

3. 中断订阅

ActivityFragmentonDestroy()中:

mCompositeDisposable.clear();

Flowable

简介

  • Flowable类似Observable,但是性能不如Observable
  • Flowable为解决 Backpressure问题 而生。

Backpressure问题简介

  • Backpressure问题 :当 事件 的发送远远快于 事件 的消耗时,未消耗的 事件 会堆积起来,最终可能发生 OOM
  • 出现 Backpressure问题 可能是因为两个原因:
    • 某个Observable发送 事件 的速度太快,或数量太多。
    • 某个Observer处理 事件 的速度太慢。
  • 事件 的发送和接收在同一个 线程 的时候不会出现这种问题,因为必定会处理完一个 事件 以后才能继续发送下一个 事件

简洁示例

Flowable<Integer> upstream = Flowable.create(emitter -> {
    // 直到下游开始请求事件
    while (emitter.requested() == 0) {
        if (emitter.isCancelled())
            break;
    }
    Log.d(TAG, "emit 1");
    emitter.onNext(1);
    Log.d(TAG, "emit complete");
    emitter.onComplete();
}, BackpressureStrategy.ERROR);

Subscriber<Integer> downstream = new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        Log.d(TAG, "onSubscribe");
        s.request(10);
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
        Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

示例解说

  • BackpressureStrategy背压策略 ),表示 事件 发送速度快于消耗速度时说使用的策略,有以下几种:
    • MISSING:多出来的 事件 直接丢弃。
    • ERROR:抛出MissingBackpressureException异常。
    • BUFFER:全部 事件 缓存直到被消耗,注意 OOM 问题。也可以通过Flowable.onBackpressureBuffer()设置。
    • DROP:丢弃最近发送的 事件 。也可以通过Flowable.onBackpressureDrop()设置。
    • LATEST:只保留一个最近的 事件 覆盖前一个 事件 。也可以通过Flowable.onBackpressureLatest()设置。
  • Subscription.cancel():可以用于 取消订阅关系
  • Subscription.request(..):告知FlowableObserver事件请求量 ,可以多次调用,每次调用累加 事件请求量 计数器。
  • FlowableEmitter.requested():得知Observer事件请求量
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容