RxJava 使用总结

Rxjava

角色 作用 类比
Observable(被观察者) 事件的生产者 顾客
Observer(观察者) 事件消费者,接收事件后作出响应 厨师
Subscribe(订阅) 将Observable和Observer连接在一起 服务员
Event(事件) Observable通知Observer的载体 菜品

RxJava 3 主要特点

  • 单一依赖:Reactive-Streams
  • 继续支持Java 6+和Android 2.3+
  • 修复了API错误和RxJava 2的许多限制
  • 旨在替代RxJava 2,具有相对较少的二进制不兼容更改
  • 提供Java 8 lambda友好的API
  • 关于并发源的不同意见
  • 异步或同步执行
  • 参数化并发的虚拟时间和调度程序
  • 为测试schedulers,consumers和plugin hooks提供测试和诊断支持

RxJava 3 与RxJava 2的主要区别是:

  • 将eagerTruncate添加到replay运算符,以便head节点将在截断时丢失它保留的项引用
  • 新增 X.fromSupplier()
  • 使用 Scheduler 添加 concatMap,保证 mapper 函数的运行位置
  • 新增 startWithItem 和 startWithIterable
  • ConnectableFlowable/ConnetableFlowable 重新设计
  • 将 as() 并入 to()
  • 更改 Maybe.defaultIfEmpty() 以返回 Single
  • 用 Supplier 代替 Callable
  • 将一些实验操作符推广到标准
  • 从某些主题/处理器中删除 getValues()
  • 删除 replay(Scheduler) 及其重载
  • 删除 dematerialize()
  • 删除 startWith(T|Iterable)
  • 删除 as()
  • 删除 Maybe.toSingle(T)
  • 删除 Flowable.subscribe(4 args)
  • 删除 Observable.subscribe(4 args)
  • 删除 Single.toCompletable()
  • 删除 Completable.blockingGet()

下面是具体使用

依赖引用

implementation "io.reactivex.rxjava3:rxjava:3.1.5"

Observable(被观察者)的创建

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
        e.onComplete();
    }
});
  • Observable.create(): 创建一个Observable的最基本方法,也可以通过just()、from()等方法来简化操作。
    • just() 它只能发射整个列表,不会迭代发射整个列表的每个值
    • from() 操作符可以转换Future、Iterable和数组。将 Iterable 和数组转化为单个数据
  • ObservableOnSubscribe<>(): 一个接口,在复写的subscribe()里定义需要发送的事件。
  • ObservableEmitter: 这是RxJava2中新推出的类,可以理解为发射器,用于发射数据onNext()和通知onComplete()/onError()。

Observer(观察者) 的创建:

Observer<String> observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {

        Log.d(TAG, "onSubscribe: 达成订阅");
    }

    @Override
    public void onNext(String s) {

        Log.d(TAG, "onNext: 响应了"+s);
    }

    @Override
    public void onError(Throwable e) {

        Log.d(TAG, "onError: 执行错误");
    }

    @Override
    public void onComplete() {

        Log.d(TAG, "onComplete: 执行完成");
    }
};
  • onNext():普通事件,通过重写进行响应即可。
  • onError():错误事件,当队列中事件处理出现异常时,就会调用该方法,此后不再有事件发出。
  • onComplete():完成事件,当队列中的事件全部处理完成后触发。
    在一个正常的序列中,onError()和onComplete()有且仅有处于事件队列的末尾,并且为互斥关系,即调用了一个就不应该再调用另一个。

形成订阅

通过 subscribe 形成订阅

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
        e.onComplete();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

        Log.d(TAG, "onSubscribe: 达成订阅");
    }

    @Override
    public void onNext(String s) {

        Log.d(TAG, "onNext: 响应了"+s);
    }

    @Override
    public void onError(Throwable e) {

        Log.d(TAG, "onError: 执行错误");
    }

    @Override
    public void onComplete() {

        Log.d(TAG, "onComplete: 执行完成");
    }
});

更简洁的代码

如果我们不关心onCompleted()或者onError()的话,那么可以使用一个更简单的类来定义onNext()期间要完成什么功能

Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

当我们不关心onCompleted()或者onError()时,为了更简化代码,可以使用以下代码:

Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

但是这种简便的写法不利于处理异常,所以可能会报 crash,这时再添加一个 new Action1<Throwable>() 能够很好的 catch 住 crash,代码如下:

Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error encountered: " + throwable.getMessage());
            }
        });

如果还想实现 onCompleted() 还可以再加一个 new Action0() 用来完成 onCompleted(), 代码如下:

 Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error encountered: " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("Sequence complete");
            }
        });

更简便点还可以使用 Lambda:

 Observable.unsafeCreate((Observable.OnSubscribe<String>) subscriber -> {
        System.out.println(subscriber);
        }).subscribe(s -> System.out.println(s), 
                throwable -> System.out.println("Error encountered: " + throwable.getMessage()),
                () -> System.out.println("Sequence complete"));

分配线程

subscribeOn

为上面的内容 分配线程 Schedulers.io()

observeOn

为下面分配线程,一般是主线程 AndroidSchedulers.mainThread()

开发中常用到的操作符

  • Create — 通过调用观察者的方法从头创建一个Observable
  • From — 将其它的对象或数据结构转换为Observable
  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Interval — 创建一个定时发射整数序列的Observable
  • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。

例子:使用 map 将 Long 型转化为 String,再使用 FlatMap 将 String 转化为 Long

 Observable.interval(5, TimeUnit.SECONDS)
                .map(new Func1<Long, String>() {
                    @Override
                    public String call(Long aLong) {
                        return String.valueOf(aLong);
                    }
                })
                .flatMap(new Func1<String, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(String aLong) {
                        Observable<Long> observable = Observable.just(Long.parseLong(aLong));
                        return observable;
                    }
                }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long string) {
                            Log.d("test", String.valueOf(string));
                    }
                });

Func vs Action:Func 提供数据,而 Action 消耗数据,Func0 代表没有参数,Func1 有一个参数,以此类推,Action 同理,可以有 N 个参数

操作符列表

下面是常用的操作符列表:

  1. 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 错误处理 Catch和Retry
  6. 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 转换操作 To
  10. 连接操作 Connect, Publish, RefCount, Replay
  11. 反压操作,用于增加特殊的流程控制策略的操作符

创建操作

用于创建Observable的操作符

  • Create — 通过调用观察者的方法从头创建一个Observable
  • From — 将其它的对象或数据结构转换为Observable
  • Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Timer — 创建在一个指定的延迟之后发射单个数据的Observable
  • Interval — 创建一个定时发射整数序列的Observable
  • Range — 创建发射指定范围的整数序列的Observable
  • Repeat — 创建重复发射特定的数据或数据序列的Observable
  • Start — 创建发射一个函数的返回值的Observable
  • Empty/Never/Throw — 创建行为受限的特殊Observable

变换操作

这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档

  • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
  • Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
  • Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

过滤操作

这些操作符用于从Observable发射的数据中进行选择

  • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
  • Distinct — 去重,过滤掉重复数据项
  • ElementAt — 取值,取特定位置的数据项
  • Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
  • First — 首项,只发射满足条件的第一条数据
  • IgnoreElements — 忽略所有的数据,只保留终止通知(onError或onCompleted)
  • Last — 末项,只发射最后一条数据
  • Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
  • Skip — 跳过前面的若干项数据
  • SkipLast — 跳过后面的若干项数据
  • Take — 只保留前面的若干项数据
  • TakeLast — 只保留后面的若干项数据

组合操作

组合操作符用于将多个Observable组合成一个单一的Observable

  • And/Then/When — 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
  • CombineLatest — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
  • Join — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
  • Merge — 将两个Observable发射的数据组合并成一个
  • StartWith — 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
  • Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
  • Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

错误处理

这些操作符用于从错误通知中恢复

  • Catch — 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
  • Retry — 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

辅助操作

一组用于处理Observable的操作符

  • Delay — 延迟一段时间发射结果数据
  • Do — 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作
  • Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来
  • ObserveOn — 指定观察者观察Observable的调度程序(工作线程)
  • Serialize — 强制Observable按次序发射数据并且功能是有效的
  • Subscribe — 收到Observable发射的数据和通知后执行的操作
  • SubscribeOn — 指定Observable应该在哪个调度程序上执行
  • TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的Observable
  • Timeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
  • Timestamp — 给Observable发射的每个数据项添加一个时间戳
  • Using — 创建一个只在Observable的生命周期内存在的一次性资源

条件和布尔操作

这些操作符可用于单个或多个数据项,也可用于Observable

  • All — 判断Observable发射的所有的数据项是否都满足某个条件
  • Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
  • Contains — 判断Observable是否会发射一个指定的数据项
  • DefaultIfEmpty — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
  • SequenceEqual — 判断两个Observable是否按相同的数据序列
  • SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
  • SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
  • TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
  • TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

算术和聚合操作

这些操作符可用于整个数据序列

  • Average — 计算Observable发射的数据序列的平均值,然后发射这个结果
  • Concat — 不交错的连接多个Observable的数据
  • Count — 计算Observable发射的数据个数,然后发射这个结果
  • Max — 计算并发射数据序列的最大值
  • Min — 计算并发射数据序列的最小值
  • Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值
  • Sum — 计算并发射数据序列的和

连接操作

一些有精确可控的订阅行为的特殊Observable

  • Connect — 指示一个可连接的Observable开始发射数据给订阅者
  • Publish — 将一个普通的Observable转换为可连接的
  • RefCount — 使一个可连接的Observable表现得像一个普通的Observable
  • Replay — 确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅

转换操作

  • To — 将Observable转换为其它的对象或数据结构
  • Blocking 阻塞Observable的操作符
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,738评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,377评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,774评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,032评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,015评论 5 361
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,239评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,724评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,374评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,410评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,457评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,132评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,733评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,804评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,022评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,515评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,116评论 2 341

推荐阅读更多精彩内容