一、Observable
在 ReactiveX 中,一个观察者 (Observer)
订阅一个可观察对象 (Observable)
。观察者对 Observable
发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应 Observable
的通知,不需要阻塞等待 Observable
发射数据。
该部分会解释什么是响应式编程模式 (reactive pattern),以及什么是可观察对象 (Observables)
和观察者 (observers)
,其它几个部分会展示如何用操作符组合和改变 Observable
的行为。
1.1 背景知识
在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在 ReactiveX 中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象 (Observable),观察者 (Observer) 订阅 (Subscribe) 它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。
这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。
有很多术语可用于描述这种异步编程和设计模式,在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable
发射数据或通知给它的观察者。
在其它的文档和场景里,有时我们也将 Observer
叫做 Subscriber
、Watcher
、Reactor
。这个模型通常被称作 Reactor
模式。
1.2 创建观察者
本文使用类似于 Groovy 的伪代码举例,但是 ReactiveX 有多种语言的实现。
普通的方法调用(不是某种异步方法,也不是 Rx 中的并行调用),流程通常是这样的:
调用某一个方法
用一个变量保存方法返回的结果
使用这个变量和它的新值做些有用的事
用代码描述就是:
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal
在异步模型中流程更像这样的:
定义一个方法,这个方法拿着某个异步调用的返回值做一些有用的事情。这个方法是观察者的一部分。
将这个异步调用本身定义为一个
Observable
观察者通过订阅
(Subscribe)
操作关联到那个Observable
继续你的业务逻辑,等方法返回时,
Observable
会发射结果,观察者的方法会开始处理结果或结果集
用代码描述就是:
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
1.3 回调方法 (onNext, onCompleted, onError)
Subscribe()
方法用于将观察者连接到 Observable
,你的观察者需要实现以下方法的一个子集:
- onNext(T item)
Observable
调用这个方法发射数据,方法的参数就是 Observable
发射的数据,这个方法可能会被调用多次,取决于你的实现。
- onError(Exception ex)
当 Observable
遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止 Observable
,后续不会再调用 onNext()
和 onCompleted()
,onError()
方法的参数是抛出的异常。
- onCompleted
正常终止,如果没有遇到错误,Observable
在最后一次调用 onNext()
之后调用此方法。
根据 Observable
协议的定义,onNext()
可能会被调用零次或者很多次,最后会有一次 onCompleted()
或 onError()
调用(不会同时),传递数据给 onNext()
通常被称作发射,onCompleted()
和 onError()
被称作通知。
下面是一个更完整的例子:
def myOnNext = { item -> /* do something useful with item */ };
def myError = { throwable -> /* react sensibly to a failed call */ };
def myComplete = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
1.4 取消订阅 (Unsubscribing)
在一些 ReactiveX 实现中,有一个特殊的观察者接口 Subscriber,它有一个 unsubscribe()
方法。调用这个方法表示你不关心当前订阅的 Observable 了,因此 Observable 可以选择停止发射新的数据项(如果没有其它观察者订阅)。
取消订阅的结果会传递给这个 Observable 的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个 Observable 来说,即使没有观察者了,它也可以在一个 while 循环中继续生成并尝试发射数据项。
1.5 关于命名约定
ReactiveX 的每种特定语言的实现都有自己的命名偏好,虽然不同的实现之间有很多共同点,但并不存在一个统一的命名标准。而且,在某些场景中,一些名字有不同的隐含意义,或者在某些语言看来比较怪异。例如,有一个 onEvent 命名模式 (onNext, onCompleted, onError)
,在一些场景中,这些名字可能意味着事件处理器已经注册。然而在 ReactiveX 里,他们是事件处理器的名字。
1.6 Observables 的"热"和"冷"
Observable
什么时候开始发射数据序列?这取决于 Observable
的实现,一个"热"的 Observable
可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的 Observable
会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
在一些 ReactiveX 实现里,还存在一种被称作 Connectable
的 Observable
,不管有没有观察者订阅它,这种 Observable
都不会开始发射数据,除非 Connect()
方法被调用。
1.7 用操作符组合 Observable
对于 ReactiveX 来说,Observable
和 Observer
仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。
ReactiveX 真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理 Observable
发射的数据。
Rx 的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。
下面是常用的操作符列表:
创建操作
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
变换操作
Buffer, FlatMap, GroupBy, Map, Scan 和 Window
过滤操作
Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
组合操作
And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
错误处理
Catch 和 Retry
辅助操作
Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
条件和布尔操作
All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
算术和集合操作
Average, Concat, Count, Max, Min, Reduce, Sum
转换操作
To
连接操作
Connect, Publish, RefCount, Replay
反压操作,用于增加特殊的流程控制策略的操作符
这些操作符并不全都是 ReactiveX 的核心组成部分,有一些是语言特定的实现或可选的模块。
二、Single
RxJava(以及它派生出来的 RxGroovy 和 RxScala)中有一个名为 Single
的 Observable
变种。
Single
类似于 Observable
,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。
因此,不同于 Observable
需要三个方法 onNext(), onError(), onCompleted()
,订阅 Single
只需要两个方法:
onSuccess()
-Single
发射单个的值到这个方法onError()
- 如果无法发射需要的值,Single
发射一个Throwable
对象到这个方法
Single
只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
2.1 Single 的操作符
Single
也可以组合使用多种操作,一些操作符让你可以混合使用 Observable
和 Single
:
操作符 | 返回值 | 说明 |
---|---|---|
compose | Single | 创建一个自定义的操作符 |
concat and concatWith | Observable | 连接多个 Single 和 Observable 发射的数据 |
create | Single | 调用观察者的 create 方法创建一个 Single |
error | Single | 返回一个立即给订阅者发射错误通知的 Single |
flatMap | Single | 返回一个 Single,它发射对原 Single 的数据执行 flatMap 操作后的结果 |
flatMapObservable | Observable | 返回一个 Observable,它发射对原 Single 的数据执行 flatMap 操作后的结果 |
from | Single | 将 Future 转换成 Single |
just | Single | 返回一个发射一个指定值的 Single |
map | Single | 返回一个 Single,它发射对原 Single 的数据执行 map 操作后的结果 |
merge | Single | 将一个 Single (它发射的数据是另一个 Single,假设为 B)转换成另一个 Single (它发射来自另一个 Single(B) 的数据) |
merge and mergeWith | Observable | 合并发射来自多个 Single 的数据 |
observeOn | Single | 指示 Single 在指定的调度程序上调用订阅者的方法 |
onErrorReturn | Single | 将一个发射错误通知的 Single 转换成一个发射指定数据项的 Single |
subscribeOn | Single | 指示 Single 在指定的调度程序上执行操作 |
timeout | Single | 它给原有的 Single 添加超时控制,如果超时了就发射一个错误通知 |
toSingle | Single | 将一个发射单个值的 Observable 转换为一个Single |
zip and zipWith | Single | 将多个 Single 转换为一个,后者发射的数据是对前者应用一个函数后的结果 |
三、Subject
Subject
可以看成是一个桥梁或者代理,在某些 ReactiveX 实现中(如 RxJava),它同时充当了 Observer
和 Observable
的角色。因为它是一个 Observer
,它可以订阅一个或多个 Observable
;又因为它是一个 Observable
,它可以转发它收到 (Observe)
的数据,也可以发射新的数据。
由于一个 Subject
订阅一个 Observable
,它可以触发这个 Observable
开始发射数据(如果那个 Observable
是"冷"的 — 就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject
可以把原来那个"冷"的 Observable
变成"热"的。
3.1 Subject 的种类
针对不同的场景一共有四种类型的 Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在 RxScala 中 Subject
被称作 PublishSubject)。
3.1.1 AsyncSubject
一个 AsyncSubject 只在原始 Observable 完成后,发射来自原始 Observable 的最后一个值。(如果原始 Observable 没有发射任何值,AsyncObject 也不发射任何值)它会把这最后一个值发射给任何后续的观察者。
然而,如果原始的 Observable 因为发生了错误而终止,AsyncSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。
3.1.2 BehaviorSubject
当观察者订阅 BehaviorSubject 时,它开始发射原始 Observable 最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始 Observable 的数据。
然而,如果原始的 Observable 因为发生了一个错误而终止,BehaviorSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。
3.1.3 PublishSubject
PublishSubject 只会把在订阅发生的时间点之后来自原始 Observable 的数据发射给观察者。需要注意的是,PublishSubject 可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在 Subject 被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始 Observable 的所有数据都被分发,你需要这样做:使用 Create 创建那个 Observable 以便手动给它引入"冷" Observable 的行为(当所有观察者都已经订阅时才开始发射数据),或者改用 ReplaySubject。
如果原始的 Observable 因为发生了一个错误而终止,PublishSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。
3.1.4 ReplaySubject
ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。也有其它版本的 ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始 Observable 发射的)。
如果你把 ReplaySubject 当作一个观察者使用,注意不要从多个线程中调用它的 onNext()
方法(包括其它的 on 系列方法),这可能导致同时(非顺序)调用,这会违反 Observable 协议,给 Subject 的结果增加了不确定性。
3.1.5 RxJava 的对应类
假设你有一个 Subject,你想把它传递给其它的代理或者暴露它的 Subscriber 接口,你可以调用它的 asObservable()
方法,这个方法返回一个 Observable。
3.1.6 串行化
如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的 onNext()
方法(包括其它的 on 系列方法),这可能导致同时(非顺序)调用,这会违反 Observable 协议,给 Subject 的结果增加了不确定性。
要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
四、调度器 Scheduler
如果你想给 Observable 操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable)在特定的调度器 (Scheduler) 上执行。
某些 ReactiveX 的 Observable 操作符有一些变体,它们可以接受一个 Scheduler 参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。
使用 ObserveOn
和 SubscribeOn
操作符,你可以让 Observable 在一个特定的调度器上执行,ObserveOn
指示一个 Observable 在一个特定的调度器上调用观察者的 onNext()
,onError()
和 onCompleted()
方法,SubscribeOn
更进一步,它指示 Observable 将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。
4.1 RxJava 示例
4.1.1 调度器的种类
下表展示了 RxJava 中可用的调度器种类:
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于 IO 操作 (IO 操作请使用 Schedulers.io() );默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的 Executor 作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于 IO 密集型任务,如异步阻塞 IO 操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation() ;Schedulers.io( ) 默认是一个 CachedThreadScheduler ,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
4.1.2 默认调度器
在 RxJava 中,某些 Observable
操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:
操作符 | 调度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retry | trampoline |
sample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
4.1.3 使用调度器
除了将这些调度器传递给 RxJava 的 Observable
操作符,你也可以用它们调度你自己的任务。下面的示例展示了 Scheduler.Worker
的用法:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
4.1.4 递归调度器
要调度递归的方法调用,你可以使用 schedule
,然后再用 schedule(this)
,示例:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// some time later...
worker.unsubscribe();
4.1.5 检查或设置取消订阅状态
Worker
类的对象实现了 Subscription
接口,使用它的 isUnsubscribed()
和 unsubscribe()
方法,你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
Worker
同时是 Subscription
,因此你可以(通常也应该)调用它的 unsubscribe()
方法通知可以挂起任务和释放资源了。
4.1.6 延时和周期调度器
你可以使用 schedule(action,delayTime,timeUnit)
在指定的调度器上延时执行你的任务,下面例子中的任务将在 500 毫秒之后开始执行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另一个版本的 schedule,schedulePeriodically(action,initialDelay,period,timeUnit)
方法让你可以安排一个定期执行的任务,下面例子的任务将在 500 毫秒之后执行,然后每 250 毫秒执行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
4.1.7 测试调度器
TestScheduler
让你可以对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试很有用处。这个调度器有三个额外的方法:
advanceTimeTo(time,unit)
向前拨动调度器的时钟到一个指定的时间点advanceTimeBy(time,unit)
将调度器的时钟向前拨动一个指定的时间段triggerActions()
开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间
五、操作符 Operators
ReactiveX 的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。
5.1 创建操作
用于创建 Observable
的操作符。
Create
— 通过调用观察者的方法从头创建一个Observable
Defer
— 在观察者订阅之前不创建这个Observable
,为每一个观察者创建一个新的Observable
Empty/Never/Throw
— 创建行为受限的特殊Observable
From
— 将其它的对象或数据结构转换为Observable
Interval
— 创建一个定时发射整数序列的Observable
Just
— 将对象或者对象集合转换为一个会发射这些对象的Observable
Range
— 创建发射指定范围的整数序列的Observable
Repeat
— 创建重复发射特定的数据或数据序列的Observable
Start
— 创建发射一个函数的返回值的Observable
Timer
— 创建在一个指定的延迟之后发射单个数据的Observable
5.2 变换操作
这些操作符可用于对 Observable
发射的数据进行变换。
Buffer
— 缓存,可以简单的理解为缓存,它定期从Observable
收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个FlatMap
— 扁平映射,将Observable
发射的数据变换为Observables
集合,然后将这些Observable
发射的数据平坦化的放进一个单独的Observable
,可以认为是一个将嵌套的数据结构展开的过程。GroupBy
— 分组,将原来的Observable
分拆为Observable
集合,将原始Observable
发射的数据按Key
分组,每一个Observable
发射一组不同的数据Map
— 映射,通过对序列的每一项都应用一个函数变换Observable
发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项Scan
— 扫描,对Observable
发射的每一项数据应用一个函数,然后按顺序依次发射这些值Window
— 窗口,定期将来自Observable
的数据分拆成一些Observable
窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer
,但Buffer
发射的是数据,Window
发射的是Observable
,每一个Observable
发射原始Observable
的数据的一个子集
这里简单介绍两个重要的操作符 map
和 flatmap / concattMap
。这两者的使用场景也很好区分。当转换过程是同步过程时,使用 map,当转换过程是异步过程时使用 flatMap / contactMap
。concatMap
可以保证数据的顺序性。
map
的操作很简单,就是传入一个函数,这个函数会将数据进行转换。
Flowable.range(1, 10)
.map(v -> v * v)
flatMap
的传入的 function 与 map
传入的 function 不同,flatMap
传入的 function 返回值是一个 Flowable
,就如它的名字一样,RxJava 会对返回的 Flowable
进行 “flat” 扁平化操作,将 Flowable
转换成数据好传给下一个操作函数。
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
5.3 过滤操作
这些操作符用于从 Observable
发射的数据中进行选择。
Debounce
— 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作Distinct
— 去重,过滤掉重复数据项ElementAt
— 取值,取特定位置的数据项Filter
— 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的First
— 首项,只发射满足条件的第一条数据IgnoreElements
— 忽略所有的数据,只保留终止通知 (onError()
或onCompleted()
)Last
— 末项,只发射最后一条数据Sample
— 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
Skip
— 跳过前面的若干项数据SkipLast
— 跳过后面的若干项数据Take
— 只保留前面的若干项数据TakeLast
— 只保留后面的若干项数据
5.4 组合操作
组合操作符用于将多个 Observable
组合成一个单一的 Observable
。
And/Then/When
— 通过模式 (And
条件)和计划 (Then
次序)组合两个或多个Observable
发射的数据集CombineLatest
— 当两个Observables
中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable
发射的最新数据(一共两个数据),然后发射这个函数的结果Join
— 无论何时,如果一个Observable
发射了一个数据项,只要在另一个Observable
发射的数据项定义的时间窗口内,就将两个Observable
发射的数据合并发射Merge
— 将两个Observable
发射的数据组合并成一个StartWith
— 在发射原来的Observable
的数据序列之前,先发射一个指定的数据序列或数据项Switch
— 将一个发射Observable
序列的Observable
转换为这样一个Observable
:它逐个发射那些Observable
最近发射的数据Zip
— 打包,使用一个指定的函数将多个Observable
发射的数据组合在一起,然后将这个函数的结果作为单项数据发射
5.5 错误处理
这些操作符用于从错误通知中恢复。
Catch
— 捕获,继续序列操作,将错误替换为正常的数据,从onError()
通知中恢复Retry
— 重试,如果Observable
发射了一个错误通知,重新订阅它,期待它正常终止
5.6 辅助操作
一组用于处理 Observable
的操作符。
Delay
— 延迟一段时间发射结果数据Do
— 注册一个动作占用一些Observable
的生命周期事件,相当于Mock
某个操作Materialize/Dematerialize
— 将发射的数据和通知都当做数据发射,或者反过来ObserveOn
— 指定观察者观察Observable
的调度程序(工作线程)Serialize
— 强制Observable
按次序发射数据并且功能是有效的Subscribe
— 收到Observable
发射的数据和通知后执行的操作SubscribeOn
— 指定Observable
应该在哪个调度程序上执行TimeInterval
— 将一个Observable
转换为发射两个数据之间所耗费时间的 ObservableTimeout
— 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知Timestamp
— 给Observable
发射的每个数据项添加一个时间戳Using
— 创建一个只在Observable
的生命周期内存在的一次性资源
5.7 条件和布尔操作
这些操作符可用于单个或多个数据项,也可用于 Observable
。
All
— 判断Observable
发射的所有的数据项是否都满足某个条件Amb
— 给定多个Observable
,只让第一个发射数据的Observable
发射全部数据Contains
— 判断Observable
是否会发射一个指定的数据项DefaultIfEmpty
— 发射来自原始Observable
的数据,如果原始 Observable 没有发射数据,就发射一个默认数据SequenceEqual
— 判断两个Observable
是否按相同的数据序列SkipUntil
— 丢弃原始Observable
发射的数据,直到第二个Observable
发射了一个数据,然后发射原始Observable
的剩余数据SkipWhile
— 丢弃原始Observable
发射的数据,直到一个特定的条件为假,然后发射原始Observable
剩余的数据TakeUntil
— 发射来自原始Observable
的数据,直到第二个 Observable 发射了一个数据或一个通知TakeWhile
— 发射原始Observable
的数据,直到一个特定的条件为真,然后跳过剩余的数据
5.8 算术和聚合操作
这些操作符可用于整个数据序列。
Average
— 计算Observable
发射的数据序列的平均值,然后发射这个结果Concat
— 不交错的连接多个Observable
的数据Count
— 计算Observable
发射的数据个数,然后发射这个结果Max
— 计算并发射数据序列的最大值Min
— 计算并发射数据序列的最小值Reduce
— 按顺序对数据序列的每一个应用某个函数,然后返回这个值Sum
— 计算并发射数据序列的和
5.9 连接操作
一些有精确可控的订阅行为的特殊 Observable
。
Connect
— 指示一个可连接的Observable
开始发射数据给订阅者Publish
— 将一个普通的Observable
转换为可连接的RefCount
— 使一个可连接的Observable
表现得像一个普通的 ObservableReplay
— 确保所有的观察者收到同样的数据序列,即使他们在Observable
开始发射数据之后才订阅
5.10 转换操作
To
— 将Observable
转换为其它的对象或数据结构Blocking
— 阻塞Observable
的操作符
5.11 操作符决策树
几种主要的需求:
直接创建一个
Observable
(创建操作)组合多个
Observable
(组合操作)对
Observable
发射的数据执行变换操作(变换操作)从
Observable
发射的数据中取特定的值(过滤操作)转发
Observable
的部分值(条件/布尔/过滤操作)对
Observable
发射的数据序列求值(算术/聚合操作)