ReactiveX 探索(二) — Rx 五大部件

ReactiveX 官网

ReactiveX 文档中文翻译

RxJava GitHub 主页

RxJava Wiki 主页

一、Observable

在 ReactiveX 中,一个观察者 (Observer) 订阅一个可观察对象 (Observable)。观察者对 Observable 发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应 Observable 的通知,不需要阻塞等待 Observable 发射数据。

该部分会解释什么是响应式编程模式 (reactive pattern),以及什么是可观察对象 (Observables) 和观察者 (observers),其它几个部分会展示如何用操作符组合和改变 Observable 的行为。

1.1 背景知识

在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在 ReactiveX 中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象 (Observable),观察者 (Observer) 订阅 (Subscribe) 它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。

这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。

有很多术语可用于描述这种异步编程和设计模式,在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable 发射数据或通知给它的观察者。

在其它的文档和场景里,有时我们也将 Observer 叫做 SubscriberWatcherReactor。这个模型通常被称作 Reactor 模式。

1.2 创建观察者

本文使用类似于 Groovy 的伪代码举例,但是 ReactiveX 有多种语言的实现。

普通的方法调用(不是某种异步方法,也不是 Rx 中的并行调用),流程通常是这样的:

  1. 调用某一个方法

  2. 用一个变量保存方法返回的结果

  3. 使用这个变量和它的新值做些有用的事

用代码描述就是:

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在异步模型中流程更像这样的:

  1. 定义一个方法,这个方法拿着某个异步调用的返回值做一些有用的事情。这个方法是观察者的一部分。

  2. 将这个异步调用本身定义为一个 Observable

  3. 观察者通过订阅 (Subscribe) 操作关联到那个 Observable

  4. 继续你的业务逻辑,等方法返回时,Observable 会发射结果,观察者的方法会开始处理结果或结果集

用代码描述就是:

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

1.3 回调方法 (onNext, onCompleted, onError)

Subscribe() 方法用于将观察者连接到 Observable,你的观察者需要实现以下方法的一个子集:

  • onNext(T item)

Observable 调用这个方法发射数据,方法的参数就是 Observable 发射的数据,这个方法可能会被调用多次,取决于你的实现。

  • onError(Exception ex)

Observable 遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止 Observable,后续不会再调用 onNext()onCompleted()onError() 方法的参数是抛出的异常。

  • onCompleted

正常终止,如果没有遇到错误,Observable 在最后一次调用 onNext() 之后调用此方法。

根据 Observable 协议的定义,onNext() 可能会被调用零次或者很多次,最后会有一次 onCompleted()onError() 调用(不会同时),传递数据给 onNext() 通常被称作发射,onCompleted()onError() 被称作通知。

下面是一个更完整的例子:

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

1.4 取消订阅 (Unsubscribing)

在一些 ReactiveX 实现中,有一个特殊的观察者接口 Subscriber,它有一个 unsubscribe() 方法。调用这个方法表示你不关心当前订阅的 Observable 了,因此 Observable 可以选择停止发射新的数据项(如果没有其它观察者订阅)。

取消订阅的结果会传递给这个 Observable 的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个 Observable 来说,即使没有观察者了,它也可以在一个 while 循环中继续生成并尝试发射数据项。

1.5 关于命名约定

ReactiveX 的每种特定语言的实现都有自己的命名偏好,虽然不同的实现之间有很多共同点,但并不存在一个统一的命名标准。而且,在某些场景中,一些名字有不同的隐含意义,或者在某些语言看来比较怪异。例如,有一个 onEvent 命名模式 (onNext, onCompleted, onError),在一些场景中,这些名字可能意味着事件处理器已经注册。然而在 ReactiveX 里,他们是事件处理器的名字。

1.6 Observables 的"热"和"冷"

Observable 什么时候开始发射数据序列?这取决于 Observable 的实现,一个"热"的 Observable 可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的 Observable 会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。

在一些 ReactiveX 实现里,还存在一种被称作 ConnectableObservable,不管有没有观察者订阅它,这种 Observable 都不会开始发射数据,除非 Connect() 方法被调用。

1.7 用操作符组合 Observable

对于 ReactiveX 来说,ObservableObserver 仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。

ReactiveX 真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理 Observable 发射的数据。

Rx 的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。

下面是常用的操作符列表:

  1. 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer

  2. 变换操作 Buffer, FlatMap, GroupBy, Map, Scan 和 Window

  3. 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast

  4. 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip

  5. 错误处理 Catch 和 Retry

  6. 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using

  7. 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile

  8. 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum

  9. 转换操作 To

  10. 连接操作 Connect, Publish, RefCount, Replay

  11. 反压操作,用于增加特殊的流程控制策略的操作符

这些操作符并不全都是 ReactiveX 的核心组成部分,有一些是语言特定的实现或可选的模块。

二、Single

RxJava(以及它派生出来的 RxGroovy 和 RxScala)中有一个名为 SingleObservable 变种。

Single 类似于 Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。

因此,不同于 Observable 需要三个方法 onNext(), onError(), onCompleted(),订阅 Single 只需要两个方法:

  • onSuccess() - Single 发射单个的值到这个方法

  • onError() - 如果无法发射需要的值,Single 发射一个 Throwable 对象到这个方法

Single 只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。

2.1 Single 的操作符

Single 也可以组合使用多种操作,一些操作符让你可以混合使用 ObservableSingle

操作符 返回值 说明
compose Single 创建一个自定义的操作符
concat and concatWith Observable 连接多个 Single 和 Observable 发射的数据
create Single 调用观察者的 create 方法创建一个 Single
error Single 返回一个立即给订阅者发射错误通知的 Single
flatMap Single 返回一个 Single,它发射对原 Single 的数据执行 flatMap 操作后的结果
flatMapObservable Observable 返回一个 Observable,它发射对原 Single 的数据执行 flatMap 操作后的结果
from Single 将 Future 转换成 Single
just Single 返回一个发射一个指定值的 Single
map Single 返回一个 Single,它发射对原 Single 的数据执行 map 操作后的结果
merge Single 将一个 Single (它发射的数据是另一个 Single,假设为 B)转换成另一个 Single (它发射来自另一个 Single(B) 的数据)
merge and mergeWith Observable 合并发射来自多个 Single 的数据
observeOn Single 指示 Single 在指定的调度程序上调用订阅者的方法
onErrorReturn Single 将一个发射错误通知的 Single 转换成一个发射指定数据项的 Single
subscribeOn Single 指示 Single 在指定的调度程序上执行操作
timeout Single 它给原有的 Single 添加超时控制,如果超时了就发射一个错误通知
toSingle Single 将一个发射单个值的 Observable 转换为一个Single
zip and zipWith Single 将多个 Single 转换为一个,后者发射的数据是对前者应用一个函数后的结果

三、Subject

Subject 可以看成是一个桥梁或者代理,在某些 ReactiveX 实现中(如 RxJava),它同时充当了 ObserverObservable 的角色。因为它是一个 Observer,它可以订阅一个或多个 Observable;又因为它是一个 Observable,它可以转发它收到 (Observe) 的数据,也可以发射新的数据。

由于一个 Subject 订阅一个 Observable,它可以触发这个 Observable 开始发射数据(如果那个 Observable 是"冷"的 — 就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject 可以把原来那个"冷"的 Observable 变成"热"的。

3.1 Subject 的种类

针对不同的场景一共有四种类型的 Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在 RxScala 中 Subject 被称作 PublishSubject)。

3.1.1 AsyncSubject

一个 AsyncSubject 只在原始 Observable 完成后,发射来自原始 Observable 的最后一个值。(如果原始 Observable 没有发射任何值,AsyncObject 也不发射任何值)它会把这最后一个值发射给任何后续的观察者。

然而,如果原始的 Observable 因为发生了错误而终止,AsyncSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。

3.1.2 BehaviorSubject

当观察者订阅 BehaviorSubject 时,它开始发射原始 Observable 最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始 Observable 的数据。

然而,如果原始的 Observable 因为发生了一个错误而终止,BehaviorSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。

3.1.3 PublishSubject

PublishSubject 只会把在订阅发生的时间点之后来自原始 Observable 的数据发射给观察者。需要注意的是,PublishSubject 可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在 Subject 被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始 Observable 的所有数据都被分发,你需要这样做:使用 Create 创建那个 Observable 以便手动给它引入"冷" Observable 的行为(当所有观察者都已经订阅时才开始发射数据),或者改用 ReplaySubject。

如果原始的 Observable 因为发生了一个错误而终止,PublishSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。

3.1.4 ReplaySubject

ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。也有其它版本的 ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始 Observable 发射的)。

如果你把 ReplaySubject 当作一个观察者使用,注意不要从多个线程中调用它的 onNext() 方法(包括其它的 on 系列方法),这可能导致同时(非顺序)调用,这会违反 Observable 协议,给 Subject 的结果增加了不确定性。

3.1.5 RxJava 的对应类

假设你有一个 Subject,你想把它传递给其它的代理或者暴露它的 Subscriber 接口,你可以调用它的 asObservable() 方法,这个方法返回一个 Observable。

3.1.6 串行化

如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的 onNext() 方法(包括其它的 on 系列方法),这可能导致同时(非顺序)调用,这会违反 Observable 协议,给 Subject 的结果增加了不确定性。

要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

四、调度器 Scheduler

如果你想给 Observable 操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable)在特定的调度器 (Scheduler) 上执行。

某些 ReactiveX 的 Observable 操作符有一些变体,它们可以接受一个 Scheduler 参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。

使用 ObserveOnSubscribeOn 操作符,你可以让 Observable 在一个特定的调度器上执行,ObserveOn 指示一个 Observable 在一个特定的调度器上调用观察者的 onNext()onError()onCompleted() 方法,SubscribeOn 更进一步,它指示 Observable 将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

4.1 RxJava 示例

4.1.1 调度器的种类

下表展示了 RxJava 中可用的调度器种类:

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于 IO 操作 (IO 操作请使用 Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的 Executor 作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于 IO 密集型任务,如异步阻塞 IO 操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation()Schedulers.io( ) 默认是一个 CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
4.1.2 默认调度器

在 RxJava 中,某些 Observable 操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:

操作符 调度器
buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan, timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
repeat trampoline
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retry trampoline
sample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) computation
takeLastBuffer(time, unit) computation
takeLastBuffer(count, time, unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector, timeoutSelector) immediate
timeout(timeoutSelector, other) immediate
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) immediate
timeout(timeout, timeUnit, other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation
4.1.3 使用调度器

除了将这些调度器传递给 RxJava 的 Observable 操作符,你也可以用它们调度你自己的任务。下面的示例展示了 Scheduler.Worker 的用法:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();
4.1.4 递归调度器

要调度递归的方法调用,你可以使用 schedule,然后再用 schedule(this),示例:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();
4.1.5 检查或设置取消订阅状态

Worker 类的对象实现了 Subscription 接口,使用它的 isUnsubscribed()unsubscribe() 方法,你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker 同时是 Subscription,因此你可以(通常也应该)调用它的 unsubscribe() 方法通知可以挂起任务和释放资源了。

4.1.6 延时和周期调度器

你可以使用 schedule(action,delayTime,timeUnit) 在指定的调度器上延时执行你的任务,下面例子中的任务将在 500 毫秒之后开始执行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

使用另一个版本的 schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法让你可以安排一个定期执行的任务,下面例子的任务将在 500 毫秒之后执行,然后每 250 毫秒执行一次:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
4.1.7 测试调度器

TestScheduler 让你可以对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试很有用处。这个调度器有三个额外的方法:

  • advanceTimeTo(time,unit) 向前拨动调度器的时钟到一个指定的时间点

  • advanceTimeBy(time,unit) 将调度器的时钟向前拨动一个指定的时间段

  • triggerActions() 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间

五、操作符 Operators

ReactiveX 的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。

5.1 创建操作

用于创建 Observable 的操作符。

  • Create — 通过调用观察者的方法从头创建一个 Observable

  • Defer — 在观察者订阅之前不创建这个 Observable,为每一个观察者创建一个新的 Observable

  • Empty/Never/Throw — 创建行为受限的特殊 Observable

  • From — 将其它的对象或数据结构转换为 Observable

  • Interval — 创建一个定时发射整数序列的 Observable

  • Just — 将对象或者对象集合转换为一个会发射这些对象的 Observable

  • Range — 创建发射指定范围的整数序列的 Observable

  • Repeat — 创建重复发射特定的数据或数据序列的 Observable

  • Start — 创建发射一个函数的返回值的 Observable

  • Timer — 创建在一个指定的延迟之后发射单个数据的 Observable

5.2 变换操作

这些操作符可用于对 Observable 发射的数据进行变换。

  • Buffer — 缓存,可以简单的理解为缓存,它定期从 Observable 收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

  • FlatMap — 扁平映射,将 Observable 发射的数据变换为 Observables 集合,然后将这些 Observable 发射的数据平坦化的放进一个单独的 Observable,可以认为是一个将嵌套的数据结构展开的过程。

  • GroupBy — 分组,将原来的 Observable 分拆为 Observable 集合,将原始 Observable 发射的数据按 Key 分组,每一个 Observable 发射一组不同的数据

  • Map — 映射,通过对序列的每一项都应用一个函数变换 Observable 发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项

  • Scan — 扫描,对 Observable 发射的每一项数据应用一个函数,然后按顺序依次发射这些值

  • Window — 窗口,定期将来自 Observable 的数据分拆成一些 Observable 窗口,然后发射这些窗口,而不是每次发射一项。类似于 Buffer,但 Buffer 发射的是数据,Window 发射的是 Observable,每一个 Observable 发射原始 Observable 的数据的一个子集

这里简单介绍两个重要的操作符 mapflatmap / concattMap。这两者的使用场景也很好区分。当转换过程是同步过程时,使用 map,当转换过程是异步过程时使用 flatMap / contactMapconcatMap 可以保证数据的顺序性。

map 的操作很简单,就是传入一个函数,这个函数会将数据进行转换。

Flowable.range(1, 10)
      .map(v -> v * v)

flatMap 的传入的 function 与 map 传入的 function 不同,flatMap 传入的 function 返回值是一个 Flowable,就如它的名字一样,RxJava 会对返回的 Flowable 进行 “flat” 扁平化操作,将 Flowable 转换成数据好传给下一个操作函数。

service.apiCall()
    .flatMap(value -> service.anotherApiCall(value))
    .flatMap(next -> service.finalCall(next))

5.3 过滤操作

这些操作符用于从 Observable 发射的数据中进行选择。

  • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作

  • Distinct — 去重,过滤掉重复数据项

  • ElementAt — 取值,取特定位置的数据项

  • Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的

  • First — 首项,只发射满足条件的第一条数据

  • IgnoreElements — 忽略所有的数据,只保留终止通知 (onError()onCompleted())

  • Last — 末项,只发射最后一条数据

  • Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫 ThrottleFirst

  • Skip — 跳过前面的若干项数据

  • SkipLast — 跳过后面的若干项数据

  • Take — 只保留前面的若干项数据

  • TakeLast — 只保留后面的若干项数据

5.4 组合操作

组合操作符用于将多个 Observable 组合成一个单一的 Observable

  • And/Then/When — 通过模式 (And 条件)和计划 (Then 次序)组合两个或多个 Observable 发射的数据集

  • CombineLatest — 当两个 Observables 中的任何一个发射了一个数据时,通过一个指定的函数组合每个 Observable 发射的最新数据(一共两个数据),然后发射这个函数的结果

  • Join — 无论何时,如果一个 Observable 发射了一个数据项,只要在另一个 Observable 发射的数据项定义的时间窗口内,就将两个 Observable 发射的数据合并发射

  • Merge — 将两个 Observable 发射的数据组合并成一个

  • StartWith — 在发射原来的 Observable 的数据序列之前,先发射一个指定的数据序列或数据项

  • Switch — 将一个发射 Observable 序列的 Observable 转换为这样一个 Observable:它逐个发射那些 Observable 最近发射的数据

  • Zip — 打包,使用一个指定的函数将多个 Observable 发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

5.5 错误处理

这些操作符用于从错误通知中恢复。

  • Catch — 捕获,继续序列操作,将错误替换为正常的数据,从 onError() 通知中恢复

  • Retry — 重试,如果 Observable 发射了一个错误通知,重新订阅它,期待它正常终止

5.6 辅助操作

一组用于处理 Observable 的操作符。

  • Delay — 延迟一段时间发射结果数据

  • Do — 注册一个动作占用一些 Observable 的生命周期事件,相当于 Mock 某个操作

  • Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来

  • ObserveOn — 指定观察者观察 Observable 的调度程序(工作线程)

  • Serialize — 强制 Observable 按次序发射数据并且功能是有效的

  • Subscribe — 收到 Observable 发射的数据和通知后执行的操作

  • SubscribeOn — 指定 Observable 应该在哪个调度程序上执行

  • TimeInterval — 将一个 Observable 转换为发射两个数据之间所耗费时间的 Observable

  • Timeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知

  • Timestamp — 给 Observable 发射的每个数据项添加一个时间戳

  • Using — 创建一个只在 Observable 的生命周期内存在的一次性资源

5.7 条件和布尔操作

这些操作符可用于单个或多个数据项,也可用于 Observable

  • All — 判断 Observable 发射的所有的数据项是否都满足某个条件

  • Amb — 给定多个 Observable,只让第一个发射数据的 Observable 发射全部数据

  • Contains — 判断 Observable 是否会发射一个指定的数据项

  • DefaultIfEmpty — 发射来自原始 Observable 的数据,如果原始 Observable 没有发射数据,就发射一个默认数据

  • SequenceEqual — 判断两个 Observable 是否按相同的数据序列

  • SkipUntil — 丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据

  • SkipWhile — 丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据

  • TakeUntil — 发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知

  • TakeWhile — 发射原始 Observable 的数据,直到一个特定的条件为真,然后跳过剩余的数据

5.8 算术和聚合操作

这些操作符可用于整个数据序列。

  • Average — 计算 Observable 发射的数据序列的平均值,然后发射这个结果

  • Concat — 不交错的连接多个 Observable 的数据

  • Count — 计算 Observable 发射的数据个数,然后发射这个结果

  • Max — 计算并发射数据序列的最大值

  • Min — 计算并发射数据序列的最小值

  • Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值

  • Sum — 计算并发射数据序列的和

5.9 连接操作

一些有精确可控的订阅行为的特殊 Observable

  • Connect — 指示一个可连接的 Observable 开始发射数据给订阅者

  • Publish — 将一个普通的 Observable 转换为可连接的

  • RefCount — 使一个可连接的 Observable 表现得像一个普通的 Observable

  • Replay — 确保所有的观察者收到同样的数据序列,即使他们在 Observable 开始发射数据之后才订阅

5.10 转换操作

  • To — 将 Observable 转换为其它的对象或数据结构

  • Blocking — 阻塞 Observable 的操作符

5.11 操作符决策树

几种主要的需求:
  • 直接创建一个 Observable(创建操作)

  • 组合多个 Observable(组合操作)

  • Observable 发射的数据执行变换操作(变换操作)

  • Observable 发射的数据中取特定的值(过滤操作)

  • 转发 Observable 的部分值(条件/布尔/过滤操作)

  • Observable 发射的数据序列求值(算术/聚合操作)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342