官方文档链接:http://reactivex.io/documentation/operators.html
1.前言
每种实现了ReactiveX的语言都有一套操作符,并且命名上趋向于该语言下那些已经熟悉的、功能类似的方法。虽然这些实现之间有许多相同的部分,但也有些操作符只在特定语言中存在。
2.操作链
大多数操作符作用于被观察者,并返回被观察者。这就允许操作符如链条一样,一个接着一个,导致每个操作符可以修改上一个操作符处理后的被观察者(操作链不直接作用于链最开始的那个被观察者,而是依次执行,每个操作符作用的被观察者由紧挨着它的上一个操作符生成)。
还有其它模式允许以相似的方式链接方法,比如建造者模式,指定类有一系列方法可以对自己操作,通过调用这些方法修改类的实例。但对它而言,方法在链中出现的顺序通常不重要,而被观察者的操作符则关注顺序。
3.创建被观察者
产生新的被观察者的操作符。
-
Empty / Never / Throw — 创建有非常明确和限制性行为的被观察者。
empty()
、never()
和throw()
操作符对测试有帮助,有时也结合其它被观察者或作为操作符的参数。 -
Create — 通过程序地调用观察者的方法,从头创建一个被观察者。
create()
操作符从头创建一个被观察者。给这个操作符传入支持观察者的函数(这是函数式编程的概念,可以理解为匿名内部类)作为参数,并将函数的行为当作被观察的目标,比如适当地调用观察者的onNext()
、onError()
和onCompleted()
方法。一个良好定义的被观察者必须尝试调用观察者的onCompleted()
方法一次或它的onError()
方法一次,此后不再尝试调用观察者的任何其它方法。参考的文章:
-
Defer — 直到观察者订阅后才创建被观察者,并且为每个观察者创建新的被观察者。
defer()
操作符直到观察者订阅了它,才根据被观察者工厂函数生成被观察者。为每个订阅者重复以上步骤,尽管它们可能会认为自己订阅的是同一个被观察者(实际上每个订阅者获得了专属自己的事件队列)。某些情况下,等到最后时刻(指订阅的时刻)生成被观察者能保证它包含最新的数据。参考这篇文章 Deferring Observable code until subscription in RxJava by Dan Lew
-
Start — 创建一个被观察者,将一个函数的返回值作为事件产生。
编程语言有许多方式获取计算结果的值,它们被称为functions、futures、actions、callables、runnables,等。集聚在
start()
操作符类别下的这些操作行为与被观察者类似,可以与其它被观察者结合使用。参考的文章:
-
From — 将各种其它对象或数据结构(含有多个元素)转变为一个被观察者(每个元素都是一个事件)。
from()
操作符可以很方便地将需要处理的所有数据表现为被观察者,而不是混杂着被观察者和其它类型。这允许使用一套操作符来管理数据流的整个有效期。Iterables,可以被认为是一种同步执行的被观察者;Futures(Java中指异步执行),则认为是一种只产生单一事件的被观察者。通过将这些对象显式转换成被观察者,允许它们像普通被观察者一样调用。出于这个原因,大多数ReactiveX的实现都有方法将某一特定语言对象和数据结构转换成被观察者。参考的文章:
-
Just — 将一个或一系列对象转换成一个产生一个或多个事件的被观察者。
just()
操作符和from()
操作符类似。但得注意,前者将数组、迭代器等简单地当作一个事件产生,不做改变;而后者将这些类型拆成单独的事件产生。注意,如果给just()
操作符传null,将返回产生事件为null的被观察者。不要犯这种错误,因为会返回一个空的被观察者(不产生任何事件)。若需要这种情况,建议使用empty()
操作符。参考的文章:
-
Repeat — 创建一个被观察者,重复地产生某特定的事件或事件队列。
repeat()
操作符重复地产生一个事件,别的实现允许重复产生事件队列或设置重复的次数。参考的文章:
-
Timer — 创建一个被观察者,在给定的延迟后产生一个特定的事件。
参考的文章:
-
Interval — 创建一个被观察者,每隔指定时间产生一个整数。
interval()
操作符返回一个被观察者,它产生一个递增的整数的无穷序列,并且元素间保持着固定的时间间隔。参考的文章:
-
Range — 创建一个产生特定范围内若干连续整数的被观察者。
range()
操作符根据确定的开始位置和范围长度,按顺序产生一系列整数。参考的文章:
4.改变被观察者
改变被观察者产生的事件的操作符。由于以后讲的操作符不处于操作链首部,所以默认为传递事件,实际上是内部创建的被观察者产生事件。同时,约定操作符之前的被观察者称为原始被观察者,它产生的事件为原始事件。
-
Buffer — 定期收集被观察者的事件,每次传递事件的集合而不是单个事件。
buffer()
操作符将每次产生单个事件的被观察者改变为每次传递缓冲的事件集合的被观察者。它在不同的特定语言实现中是不一样的,区别在于如何选择哪个事件进哪个缓存区。需注意的是,如果原始被观察者发布一个onError()
通知,buffer()
操作符将立即传递这个通知。不管准备传递的集合是处于收集的过程中,还是在通知发布之前已经收集好了。参考的文章:
-
Window — 定期划分被观察者的事件到不同的视窗,每次传递这些视窗而不是单个事件。
window()
操作符和buffer()
操作符类似。但相较于后者通过数据结构容纳事件并传递;前者则是新建被观察者,每一个都传递原始事件的子集,并由一个onCompleted()
通知终止。与
buffer()
操作符一样,window()
操作符有许多种实现,每种都用自己的方式将原始被观察者划分为多个被观察者,它们将分别包含的原始事件统称视窗。在window()
操作符的术语中,当一个视窗打开,意味着一个新的被观察者将开始传递原始事件;当那个视窗关闭,意味着那个被观察者停止传递原始事件,并给自己的观察者发送一个onCompleted()
通知来终止。参考的文章:
-
GroupBy — 将一个被观察者划分成一系列被观察者,其中每个被观察者传递按键值区分的不同的原始事件的子集。
groupBy()
操作符将一个产生事件的被观察者划分成多个,每个仅传递原始事件的子集。哪些事件最终属于哪个被观察者,通常由一个判别函数(评估每个事件并指派它键值)决定。所有拥有相同键值的事件需同一个被观察者传递。参考的文章:
- Window
- Introduction to Rx: GroupBy
- Animations of Rx operators: GroupBy by Tamir Dresher
-
Map — 通过使用一个函数来改变被观察者产生的事件。
map()
操作符对每个原始事件使用指定的函数,并且返回一个用这些函数结果产生事件的被观察者。参考的文章:
- FlatMap
- Introduction to Rx: Select
- RxMarbles:
map
- 101 Rx Samples: Select — Indexed
- Using the map method with Observable by Jafar Husain (JavaScript视频教程)
-
FlatMap — 将被观察者产生的每个事件转换成被观察者,然后将它们产生的所有事件由统一的被观察者传递。
flatmap()
操作符使用一个指定的函数来处理每个原始事件,从而改变被观察者。与map()
操作符不同的是,它的函数返回一个自己能够产生事件的被观察者。然后,该操作符混合所有后生成的被观察者产生的事件,作为自己的事件队列传递出去。这个方法是有用的,例如,当一个被观察者产生一系列事件,这些事件本身拥有被观察者成员或通过其它方式转变成被观察者,所以会创建新的被观察者来传递它们产生的所有事件的集合。需注意的是,
flatmap()
操作符混合了那些被观察者的事件,所以事件在顺序上可能会交错。在几个特定语言的实现中,拥有不会导致交错问题的操作符,而是按照严格的顺序传递这些事件,通常称为
concatMap()
之类的。参考的文章:
- Map
- Grokking RxJava: Operator, Operator by Dan Lew
- Introduction to Rx: SelectMany
- Recursive Observables with RxJava by Jon Schneider
- RxJava Observable transformation: concatMap() vs. flatMap() by Fernando Cejas
-
Scan — 给被观察者产生的每个事件应用函数,处理时可使用前一个值作为参数。
scan()
操作符使用函数处理第一个原始事件,然后将函数的结果作为自己的第一个事件传递出去。同时也将函数的结果反馈给函数处理中的第二个原始事件,从而生成自己的第二个事件,并依此规则创建自己剩余的事件队列。在其它上下文中,这种操作符有时被称为累加器。参考的文章:
5.过滤被观察者
选择性地传递原始事件的操作符。
-
Sample — 传递被观察者在周期时间间隔内最近产生的事件。
sample()
操作符周期性地查看被观察者,且传递任何一个从上次采样后最近产生的事件。在某些实现中,还有相似的throttleFirst()
操作符,但不是传递取样事件内最近产生的事件,而是那个时间内最初创建的事件。参考的文章:
-
Debounce — 只传递被观察者在特定时间内仅产生了一个的事件(没产生另一个)。
debounce()
操作符从原始事件中过滤出那些被另一个事件紧密追随的事件。参考的文章:
-
Distinct — 不传递被观察者产生的重复事件。
distinct()
操作符只允许被观察者之前没有产生过的事件通过。在某些实现中会有不同,允许调整区别两个事件的标准;或仅比较相邻两项的区别,从而只过滤事件队列中连续重复的。参考的文章:
-
Filter — 只传递被观察者那些通过判断检验的事件。
filter()
操作符只允许被观察者符合指定的判断函数检验的事件通过。参考的文章:
-
Take — 只传递被观察者产生的最初的n个事件。
通过
take()
操作符修改被观察者,使只传递被观察者产生的最初的n个事件,然后完成时忽略剩下的。参考的文章:
-
TakeLast — 只传递被观察者产生的最终的n个事件。
通过
takeLast()
操作符修改被观察者,使只传递被观察者产生的最终的n个事件,然后忽略之前的那些事件。参考的文章:
-
Skip — 不传递被观察者产生的最初的n个事件。
通过
skip()
操作符修改被观察者,使忽略被观察者产生的最初的n个事件,然后只关注后面的那些事件。参考的文章:
-
SkipLast — 不传递被观察者产生的最终的n个事件。
通过
skipLast()
操作符修改被观察者,使忽略被观察者产生的最终的n个事件,然后只关注之前的那些事件。参考的文章:
-
ElementAt — 只传递被观察者产生的第n个事件。
elementAt()
操作符获取原始事件队列中一个特定位置的事件,作为自己单一的事件产生。参考的文章:
-
First — 只传递被观察者产生的第一个事件,或符合某条件的第一个事件。
如果仅关注被观察者产生的第一个事件,或者满足一定条件的第一个事件,可以通过
first()
操作符过滤。在某些实现中,
first()
操作符不是作为过滤操作符实现的,而是当原始被观察者产生某事件时,作为阻塞函数返回该事件。若反而想要过滤操作,可能需使用Take(1)
或ElementAt(0)
。还有single()
操作符,它的行为与first()
操作符类似,除了会一直等到原始被观察者终止来保证只产生单一事件(否则,直接发onError()
通知终止,而不是传递事件)。它不仅能获取第一个原始事件,而且保证只有一个事件。参考的文章:
- ElementAt
- Last
- Take
- Introduction to Rx: First
- Introduction to Rx: Single
- Loading data from multiple sources with RxJava by Dan Lew (example using Concat and First)
- RxMarbles:
find
- RxMarbles:
findIndex
- RxMarbles:
first
-
Last — 只传递被观察者产生的最后的事件,或符合某条件的最后的事件。
如果仅关注被观察者产生的最后的事件,或者满足一定条件的最后的事件,可以通过
last()
操作符过滤。在某些实现中,last()
操作符不是作为过滤操作符实现的,而是当原始被观察者终止时,作为阻塞函数返回特定的事件。若反而想要过滤操作,可能需使用TakeLast(1)
。参考的文章:
-
IgnoreElements — 不传递被观察者的任何事件,除了它的终止通知。
ignoreElements()
操作符禁止所有原始事件,除了原始被观察者的终止通知(onError()
或onCompleted()
)能直接通过。如果不关心被观察者产生的事件,只想关注它什么时候结束或被错误终止,可以对被观察者使用这个操作符,将确保不会调用它的观察者的onNext()
处理。参考的文章:
6.合并被观察者
通过多个原始被观察者创建单一的被观察者的操作符。
-
And / Then / When — 通过Pattern和Plan的转换,合并两个或多个被观察者产生的事件集合。
and()
、then()
和when()
操作符合并的行为非常类似于zip()
操作符,但它们是通过中间层的数据结构实现的。首先接收两个或多个被观察者,每次分别取出一个事件组成集合放入Pattern对象。接着将这些Pattern对象分别转换成Plan对象,再依次将各种Plan对象转换成被观察者的事件。参考的文章:
-
Zip — 通过指定的函数将多个被观察者的事件合并在一起,并把结果作为每种组合的单一事件产生。
zip()
操作符返回一个被观察者,它是由指定的函数依次合并两个或多个其它被观察者产生的事件,并将结果作为自己产生的事件。函数的使用有严格的顺序,新的被观察者产生的第一个事件是函数将作用的被观察者产生的第一个事件合并的结果,其它以此类推,并且产生事件的数量由原始事件中最少的那列决定。参考的文章:
-
CombineLatest — 当两个被观察者中任一个产生一个事件,通过指定函数合并每个被观察者产生的最新事件,并将结果作为事件产生。
combineLatest()
操作符的行为与zip()
操作符相似。除了后者要求每个需要压缩的原始被观察者产生的事件没被压缩过;前者则不管这些,只要每个原始被观察者产生至少一个事件。当任一个原始被观察者产生一个事件,前者使用提供的函数合并其它原始被观察者最近产生的事件,并将返回值作为事件产生。参考的文章:
-
Join — 在被观察者产生的一个事件的时间窗内,其它被观察者也产生了事件,则合并两个被观察者相应的事件。
join()
操作符合并两个被观察者产生的事件,并根据已经定义的事件的时间窗决定哪些需要合并。当这样一个定义了时间窗的被观察者产生新事件或onCompleted()
通知时,之前的时间窗将关闭。只要一个事件的时间窗是开启的,它将合并其它被观察者产生的任一事件。还需定义事件合并的函数。大多数ReactiveX的实现中,不仅有
join()
操作符还有相似的groupJoin()
操作符。除了定义用来合并两个被观察者产生的事件的函数有所改变,即后一个参数由第二个被观察者的符合条件的事件替换为产生相应符合事件的被观察者(可以将前一个参数理解为键,后一个参数理解为对应的所有的值)。参考的文章:
-
Merge — 通过直接合并事件来合并多个被观察者。
merge()
操作符能够合并多个被观察者的事件,使它们看起来像单一被观察者。它可能会交错合并的被观察者产生的事件(一个类似的操作符concat()
,则没有这个问题,在下个原始被观察者开始产生事件之前会依次传递当前的被观察者的事件)。如上图所示,任一个原始被观察者发出
onError()
通知,将会立即传递给观察者,并且会终止合并的被观察者。在许多ReactiveX的实现中,还有一个操作符
mergeDelayError()
,改变了这种行为——推迟onError()
通知直到所有合并的被观察者完成才传给观察者。参考的文章:
-
StartWith — 在开始传递原始事件之前,传递指定的事件队列。
如果想要被观察者在开始传递期望的事件之前,传递指定的事件队列,可以使用
startWith()
操作符。(相反,如果想要在被观察者传递期望的事件之后添加事件队列,可以使用concat()
操作符。)参考的文章:
-
Switch — 将一个以被观察者为事件的被观察者转换为单一被观察者,若那些事件级被观察者同时间段内产生了事件,取最近的被观察者的事件作为自己事件产生。
switch()
操作符关注的是以被观察者为事件的被观察者。每次它只面对这些事件级被观察者中的一个,并在最新的被观察者开始产生事件时,转而传递这些新事件。注意,前一个被观察者在后一个被观察者开始产生事件之后(包括仅是被订阅还未发送事件)产生的事件将丢弃(如上图中的黄色圆圈)。参考的文章:
7.错误处理操作符
帮助从被观察者的onError()
通知中恢复的操作符。
-
Catch — 通过拦截队列中的一个
onError()
通知,来保证继续执行。catch()
操作符拦截原始被观察者产生的一个onError()
通知,而不是将它传递给任一观察者,同时可以用一些其它事件或事件队列替换,还可以允许被观察者正常终止或根本不终止。还有几种catch()
操作符,以及在不同的ReactiveX实现中用不同的名字描述这个操作,正如下面部分看到的。在一些ReactiveX实现中,有一个操作符叫什么
onErrorResumeNext()
,行为类似catch()
操作符:对原始被观察者产生的一个onError()
通知有特定反应。还有一个同样名字的操作符,行为更像concat()
操作符:不管原始被观察者是正常终止还是错误终止,执行连接操作。参考的文章:
-
Retry — 如果原始被观察者发送一个
onError()
通知,将重新订阅以期望下次没有错误地完成。retry()
操作符通过不传递onError()
通知给观察者,来响应原始被观察者发生的错误,并重新订阅它,获得另一次无错误地完成自己的事件队列的机会。这个操作符先传递onNext()
通知给自己的观察者,即使队列是因为错误而终止的,所以会导致重复的事件(如上图所示)。参考的文章:
8.总结
到目前为止,已经介绍了ReactiveX一半的操作符,而这些操作符恰恰也是最常用的。ReactiveX可以理解为设计模式,讲的是一种思路,具体每种语言的实现是不同的。对于Android开发的同学,肯定希望使用RxJava,这里推荐几篇我认为比较全的文章,以供参考: