Rxjava操作符五

nest

nest

nest操作符有一个特殊的用途:将一个Observable转换为一个发射这个Observable的Observable。

条件和布尔操作

这个页面的操作符可用于根据条件发射或变换Observables,或者对它们做布尔运算:

条件操作符

  • amb( ) — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
  • defaultIfEmpty( ) — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
  • (rxjava-computation-expressions) doWhile( ) — 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止
  • (rxjava-computation-expressions) ifThen( ) — 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列
  • skipUntil( ) — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
  • skipWhile( ) — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
  • (rxjava-computation-expressions) switchCase( ) — 基于一个计算结果,发射一个指定Observable的数据序列
  • takeUntil( ) — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
  • takeWhile( ) and takeWhileWithIndex( ) — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
  • (rxjava-computation-expressions) whileDo( ) — 如果条件为true,则发射源Observable数据序列,并且只要条件保持为true就重复发射此数据序列

(rxjava-computation-expressions) — 表示这个操作符当前是可选包 rxjava-computation-expressions 的一部分,还没有包含在标准RxJava的操作符集合里

布尔操作符

条件和布尔操作

All

判定是否Observable发射的所有数据都满足某个条件

all

传递一个谓词函数给All操作符,这个函数接受原始Observable发射的数据,根据计算返回一个布尔值。All返回一个只发射一个单个布尔值的Observable,如果原始Observable正常终止并且每一项数据都满足条件,就返回true;如果原始Observable的任何一项数据不满足条件就返回False。

all

RxJava将这个操作符实现为all,它默认不在任何特定的调度器上执行。

Amb

给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据

amb

当你传递多个Observable给Amb时,它只发射其中一个Observable的数据和通知:首先发送通知给Amb的那个,不管发射的是一项数据还是一个onErroronCompleted通知。Amb将忽略和丢弃其它所有Observables的发射物。

amb

RxJava的实现是amb,有一个类似的对象方法ambWith。例如,Observable.amb(o1,o2)o1.ambWith(o2)是等价的。

这个操作符默认不在任何特定的调度器上执行。

Contains

判定一个Observable是否发射一个特定的值

contains

Contains传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false。

相关的一个操作符IsEmpty用于判定原始Observable是否没有发射任何数据。

contains

contains默认不在任何特定的调度器上执行。

exists

RxJava中还有一个exists操作符,它通过一个谓词函数测试原始Observable发射的数据,只要任何一项满足条件就返回一个发射true的Observable,否则返回一个发射false的Observable。

exists默认不在任何特定的调度器上执行。

isEmpty

isEmpty默认不在任何特定的调度器上执行。

DefaultIfEmpty

发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值

defaultIfEmtpy

DefaultIfEmpty简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就发射一个你提供的默认值。

RxJava将这个操作符实现为defaultIfEmpty。它默认不在任何特定的调度器上执行。

还有一个新的操作符switchIfEmpty,不在RxJava 1.0.0版中,它和defaultIfEmtpy类似,不同的是,如果原始Observable没有发射数据,它发射一个备用Observable的发射物。

SequenceEqual

判定两个Observables是否发射相同的数据序列。

sequenceEqual

传递两个Observable给SequenceEqual操作符,它会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false。

sequenceEqual

它还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。

这个操作符默认不在任何特定的调度器上执行。

SkipUntil

丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据

skipUntil

SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。

RxJava中对应的是skipUntil,它默认不在任何特定的调度器上执行。

SkipWhile

丢弃Observable发射的数据,直到一个指定的条件不成立

skipWhile

SkipWhile订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。

skipWhile默认不在任何特定的调度器上执行。

TakeUntil

当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据

takeUntil

TakeUntil订阅并开始发射原始Observable,它还监视你提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,TakeUntil返回的Observable会停止发射原始Observable并终止。

takeUntil

RxJava中的实现是takeUntil。注意:第二个Observable发射一项数据或一个onError通知或一个onCompleted通知都会导致takeUntil停止发射数据。

takeUntil默认不在任何特定的调度器上执行。

takeUntil

还有一个版本的takeUntil,不在RxJava 1.0.0版中,它使用一个谓词函数而不是第二个Observable来判定是否需要终止发射数据,它的行为类似于takeWhile

TakeWhile

发射Observable发射的数据,直到一个指定的条件不成立

takeWhile

TakeWhile发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

RxJava中的takeWhile操作符返回一个镜像原始Observable行为的Observable,直到某一项数据你指定的函数返回false那一刻,这个新的Observable发射onCompleted终止通知。

takeWhile默认不在任何特定的调度器上执行。

算术和聚合操作

本页展示的操作符用于对整个序列执行算法操作或其它操作,由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。

rxjava-math 模块的操作符

其它聚合操作符

  • concat( ) — 顺序连接多个Observables
  • count( ) and countLong( ) — 计算数据项的个数并发射结果
  • reduce( ) — 对序列使用reduce()函数并发射最终的结果
  • collect( ) — 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable
  • toList( ) — 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
  • toSortedList( ) — 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
  • toMap( ) — 将序列数据转换为一个Map,Map的key是根据一个函数计算的
  • toMultiMap( ) — 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的

算术和聚合操作

Average

计算原始Observable发射数字的平均值并发射它

average

Average操作符操作符一个发射数字的Observable,并发射单个值:原始Observable发射的数字序列的平均值。

这个操作符不包含在RxJava核心模块中,它属于不同的rxjava-math模块。它被实现为四个操作符:averageDouble, averageFloat, averageInteger, averageLong

average

如果原始Observable不发射任何数据,这个操作符会抛异常:IllegalArgumentException

Min

发射原始Observable的最小值

min

Min操作符操作一个发射数值的Observable并发射单个值:最小的那个值。

RxJava中,min属于rxjava-math模块。

min

min接受一个可选参数,用于比较两项数据的大小,如果最小值的数据超过一项,min会发射原始Observable最近发射的那一项。

minBy

minBy类似于min,但是它发射的不是最小值,而是发射Key最小的项,Key由你指定的一个函数生成。

Max

发射原始Observable的最大值

max

Max操作符操作一个发射数值的Observable并发射单个值:最大的那个值。

RxJava中,max属于rxjava-math模块。

max

max接受一个可选参数,用于比较两项数据的大小,如果最大值的数据超过一项,max会发射原始Observable最近发射的那一项。

maxBy

maxBy类似于max,但是它发射的不是最大值,而是发射Key最大的项,Key由你指定的一个函数生成。

Count

计算原始Observable发射物的数量,然后只发射这个值

count

Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量。

如果原始Observable发生错误终止,Count不发射数据而是直接传递错误通知。如果原始Observable永远不终止,Count既不会发射数据也不会终止。

RxJava的实现是countcountLong

示例代码

String[] items = new String[] { "one", "two", "three" };
assertEquals( new Integer(3), Observable.from(items).count().toBlocking().single() );

Sum

计算Observable发射的数值的和并发射这个和

sum

Sum操作符操作一个发射数值的Observable,仅发射单个值:原始Observable所有数值的和。

RxJava的实现是sumDouble, sumFloat, sumInteger, sumLong,它们不是RxJava核心模块的一部分,属于rxjava-math模块。

sum.f

你可以使用一个函数,计算Observable每一项数据的函数返回值的和。

StringObservable类(这个类不是RxJava核心模块的一部分)中有一个stringConcat操作符,它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,后者这个字符串表示的是前者所有字符串的连接。

St.join

StringObservable类还有一个join操作符,它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,后者这个字符串表示的是前者所有字符串以你指定的分界符连接的结果。

Concat

不交错的发射两个或多个Observable的发射物

concat

Concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。

直到前面一个Observable终止,Concat才会订阅额外的一个Observable。注意:因此,如果你尝试连接一个"热"Observable(这种Observable在创建后立即开始发射数据,即使没有订阅者),Concat将不会看到也不会发射它之前发射的任何数据。

在ReactiveX的某些实现中有一种ConcatMap操作符(名字可能叫concat_all, concat_map, concatMapObserver, for, forIn/for_in, mapcat, selectConcatselectConcatObserver),他会变换原始Observable发射的数据到一个对应的Observable,然后再按观察和变换的顺序进行连接操作。

StartWith操作符类似于Concat,但是它是插入到前面,而不是追加那些Observable的数据到原始Observable发射的数据序列。

Merge操作符也差不多,它结合两个或多个Observable的发射物,但是数据可能交错,而Concat不会让多个Observable的发射物交错。

concat

RxJava中的实现叫concat

还有一个实例方法叫concatWith,这两者是等价的:Observable.concat(a,b)a.concatWith(b)

Reduce

按顺序对Observable发射的每项数据应用一个函数并发射最终的值

reduce

Reduce操作符对原始Observable发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程知道原始Observable发射它的最后一项数据并终止,此时Reduce返回的Observable发射这个函数返回的最终值。

在其它场景中,这种操作有时被称为累积聚集压缩折叠注射等。

reduce

注意如果原始Observable没有发射任何数据,reduce抛出异常IllegalArgumentException

reduce默认不在任何特定的调度器上执行。

reduce

还有一个版本的reduce额外接受一个种子参数。注意传递一个值为null的种子是合法的,但是与不传种子参数的行为是不同的。如果你传递了种子参数,并且原始Observable没有发射任何数据,reduce操作符将发射这个种子值然后正常终止,而不是抛异常。

提示:不建议使用reduce收集发射的数据到一个可变的数据结构,那种场景你应该使用collect

collect

collectreduce类似,但它的目的是收集原始Observable发射的所有数据到一个可变的数据结构,collect生成的这个Observable会发射这项数据。它需要两个参数:

  1. 一个函数返回可变数据结构
  2. 另一个函数,当传递给它这个数据结构和原始Observable发射的数据项时,适当地修改数据结构。

collect默认不在任何特定的调度器上执行。

异步操作

下面的这些操作符属于单独的rxjava-async模块,它们用于将同步对象转换为Observable。

  • start( ) — 创建一个Observable,它发射一个函数的返回值
  • toAsync( ) or asyncAction( ) or asyncFunc( ) — 将一个函数或者Action转换为已Observable,它执行这个函数并发射函数的返回值
  • startFuture( ) — 将一个返回Future的函数转换为一个Observable,它发射Future的返回值
  • deferFuture( ) — 将一个返回Observable的Future转换为一个Observable,但是并不尝试获取这个Future返回的Observable,直到有订阅者订阅它
  • forEachFuture( ) — 传递Subscriber方法给一个Subscriber,但是同时表现得像一个Future一样阻塞直到它完成
  • fromAction( ) — 将一个Action转换为Observable,当一个订阅者订阅时,它执行这个action并发射它的返回值
  • fromCallable( ) — 将一个Callable转换为Observable,当一个订阅者订阅时,它执行这个Callable并发射Callable的返回值,或者发射异常
  • fromRunnable( ) — convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes将一个Runnable转换为Observable,当一个订阅者订阅时,它执行这个Runnable并发射Runnable的返回值
  • runAsync( ) — 返回一个StoppableObservable,它发射某个Scheduler上指定的Action生成的多个actions

连接操作

这一节解释ConnectableObservable 和它的子类以及它们的操作符:

一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

publishConnect

The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 下面的示例代码展示了两个订阅者订阅同一个Observable的情况。第一种情形,它们订阅一个普通的Observable;第二种情形,它们订阅一个可连接的Observable,并且在两个都订阅后再连接。注意输出的不同:

示例 #1:

def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
    { println("Subscriber #2:" + it); },       // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence #2 complete"); }       // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete

示例 #2:

def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
   { println("Subscriber #2:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #2 complete"); }       // onCompleted
);

firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete

Connect

让一个可连接的Observable开始发射数据给订阅者

connect

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

connect

RxJava中connectConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable

调用ConnectableObservableconnect方法会让它后面的Observable开始给发射数据给订阅者。

connect方法返回一个Subscription对象,可以调用它的unsubscribe方法让Observable停止发射数据给观察者。

即使没有任何订阅者订阅它,你也可以使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,你可以将一个"冷"的Observable变为"热"的。

Publish

将普通的Observable转换为可连接的Observable

publish

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

publish

RxJava的实现为publish

publish

有一个变体接受一个函数作为参数。这个函数用原始Observable发射的数据作为参数,产生一个新的数据作为ConnectableObservable给发射,替换原位置的数据项。实质是在签名的基础上添加一个Map操作。

RefCount

让一个可连接的Observable行为像普通的Observable

refCount

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

RefCount操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

refCount

RxJava中的实现为refCount,还有一个操作符叫share,它的作用等价于对一个Observable同时应用publishrefCount操作。

Replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅

replay

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

如果在将一个Observable转换为可连接的Observable之前对它使用Replay操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。

replay

RxJava的实现为replay,它有多个接受不同参数的变体,有的可以指定replay的最大缓存数量,有的还可以指定调度器。

replay

有一种 replay返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是replay变换之后的数据项。

实现自己的操作符

你可以实现你自己的Observable操作符,本文展示怎么做。

如果你的操作符是被用于创造一个Observable,而不是变换或者响应一个Observable,使用 create( ) 方法,不要试图手动实现 Observable。另外,你可以按照下面的用法说明创建一个自定义的操作符。

如果你的操作符是用于Observable发射的单独的数据项,按照下面的说明做:Sequence Operators 。如果你的操作符是用于变换Observable发射的整个数据序列,按照这个说明做:Transformational Operators

提示: 在一个类似于Groovy的语言Xtend中,你可以以 extension methods 的方式实现你自己的操作符 ,不使用本文的方法,它们也可以链式调用。详情参见 RxJava and Xtend

序列操作符

下面的例子向你展示了怎样使用lift( )操作符将你的自定义操作符(在这个例子中是 myOperator)与标准的RxJava操作符(如ofTypemap)一起使用:

fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});

下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与lift()搭配使用。

实现你的操作符

将你的自定义操作符定义为实现了 Operator 接口的一个公开类, 就像这样:

public class MyOperator<T> implements Operator<T> {
  public MyOperator( /* any necessary params here */ ) {
    /* 这里添加必要的初始化代码 */
  }

  @Override
  public Subscriber<? super T> call(final Subscriber<? super T> s) {
    return new Subscriber<t>(s) {
      @Override
      public void onCompleted() {
        /* 这里添加你自己的onCompleted行为,或者仅仅传递完成通知: */
        if(!s.isUnsubscribed()) {
          s.onCompleted();
        }
      }

      @Override
      public void onError(Throwable t) {
        /* 这里添加你自己的onError行为, 或者仅仅传递错误通知:*/
        if(!s.isUnsubscribed()) {
          s.onError(t);
        }
      }

      @Override
      public void onNext(T item) {
        /* 这个例子对结果的每一项执行排序操作,然后返回这个结果 */
        if(!s.isUnsubscribed()) {
          transformedItem = myOperatorTransformOperation(item);
          s.onNext(transformedItem);
        }
      }
    };
  }
}

变换操作符

下面的例子向你展示了怎样使用 compose( ) 操作符将你得自定义操作符(在这个例子中,是一个名叫myTransformer的操作符,它将一个发射整数的Observable转换为发射字符串的)与标准的RxJava操作符(如ofTypemap)一起使用:

fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});

下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与compose()搭配使用。

实现你的变换器

将你的自定义操作符定义为实现了 Transformer 接口的一个公开类,就像这样:

public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
  public MyTransformer( /* any necessary params here */ ) {
    /* 这里添加必要的初始化代码 */
  }

  @Override
  public Observable<String> call(Observable<Integer> source) {
    /* 
     * 这个简单的例子Transformer应用一个map操作,
     * 这个map操作将发射整数变换为发射整数的字符串表示。
     */
    return source.map( new Func1<Integer,String>() {
      @Override
      public String call(Integer t1) {
        return String.valueOf(t1);
      }
    } );
  }
}

参见

其它需要考虑的

  • 在发射任何数据(或者通知)给订阅者之前,你的序列操作符可能需要检查它的 Subscriber.isUnsubscribed( ) 状态,如果没有订阅者了,没必要浪费时间生成数据项。

  • 请注意:你的序列操作符必须复合Observable协议的核心原则:

    • 它可能调用订阅者的 onNext( ) 方法任意次,但是这些调用必须是不重叠的。
    • 它只能调用订阅者的 onCompleted( )onError( ) 正好一次,但不能都调用,而且不能在这之后调用订阅者的 onNext( ) 方法。
    • 如果你不能保证你得操作符遵从这两个原则,你可以给它添加 serialize( ) 操作符,它会强制保持正确的行为。
  • 请关注这里 Issue #1962 &mdash;需要有一个计划创建一个测试脚手架,你可以用它来写测试验证你的新操作符遵从了Observable协议。

  • 不要让你的操作符阻塞别的操作。

  • When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:

  • 如果可能,你应该组合现有的操作符创建你的新操作符,而不是从零开始实现它。RxJava自身的标准操作符也是这样做的,例如:

  • 如果你的操作符使用了函数或者lambda表达式作为参数,请注意它们可能是异常的来源,而且要准备好捕获这些异常,并且使用

onError()

通知订阅者。

  • 某些异常被认为是致命的,对它们来说,调用 onError()毫无意义,那样或者是无用的,或者只是对问题的妥协。你可以使用 Exceptions.throwIfFatal(throwable) 方法过滤掉这些致命的异常,并重新抛出它们,而不是试图发射关于它们的通知。

  • 一般说来,一旦发生错误应立即通知订阅者,而不是首先尝试发射更多的数据。

  • 请注意 null 可能是Observable发射的一个合法数据。频繁发生错误的一个来源是:测试一些变量并且将持有一个非 null 值作为是否发射了数据的替代。一个值为 null 的数据仍然是一个发射数据项,它与没有发射任何东西是不能等同的。

  • 想让你的操作符在反压(backpressure)场景中变得得好可能会非常棘手。可以参考Dávid Karnok的博客 Advanced RxJava,这里有一个涉及到的各种因素和怎样处理它们的很值得看的讨论。

插件让你可以用多种方式修改RxJava的默认行为:

  • 修改默认的计算、IO和新线程调度器集合
  • 为RxJava可能遇到的特殊错误注册一个错误处理器
  • 注册一个函数记录一些常规RxJava活动的发生

RxJavaSchedulersHook

这个插件让你可以使用你选择的调度器覆盖默认的计算、IO和新线程调度 (Scheduler),要做到这些,需要继承 RxJavaSchedulersHook 类并覆写这些方法:

  • Scheduler getComputationScheduler( )
  • Scheduler getIOScheduler( )
  • Scheduler getNewThreadScheduler( )
  • Action0 onSchedule(action)

然后是下面这些步骤:

  1. 创建一个你实现的 RxJavaSchedulersHook 子类的对象。
  2. 使用 RxJavaPlugins.getInstance( ) 获取全局的RxJavaPlugins对象。
  3. 将你的默认调度器对象传递给 RxJavaPluginsregisterSchedulersHook( ) 方法。

完成这些后,RxJava会开始使用你的方法返回的调度器,而不是内置的默认调度器。

RxJavaErrorHandler

这个插件让你可以注册一个函数处理传递给 Subscriber.onError(Throwable) 的错误。要做到这一点,需要继承 RxJavaErrorHandler 类并覆写这个方法:

  • void handleError(Throwable e)

然后是下面这些步骤:

  1. 创建一个你实现的 RxJavaErrorHandler 子类的对象。
  2. 使用 RxJavaPlugins.getInstance( ) 获取全局的RxJavaPlugins对象。
  3. 将你的错误处理器对象传递给 RxJavaPluginsregisterErrorHandler( ) 方法。

完成这些后,RxJava会开始使用你的错误处理器处理传递给 Subscriber.onError(Throwable) 的错误。

RxJavaObservableExecutionHook

这个插件让你可以注册一个函数用于记录日志或者性能数据收集,RxJava在某些常规活动时会调用它。要做到这一点,需要继承 RxJavaObservableExecutionHook 类并覆写这些方法:

方法 何时调用
onCreate( ) Observable.create( )方法中
onSubscribeStart( ) Observable.subscribe( )之前立刻
onSubscribeReturn( ) Observable.subscribe( )之后立刻
onSubscribeError( ) Observable.subscribe( )执行失败时
onLift( ) Observable.lift( )方法中

然后是下面这些步骤:

  1. 创建一个你实现的 RxJavaObservableExecutionHook 子类的对象。
  2. 使用 RxJavaPlugins.getInstance( ) 获取全局的RxJavaPlugins对象。
  3. 将你的Hook对象传递给 RxJavaPluginsregisterObservableExecutionHook( ) 方法。

When you do this, RxJava will begin to call your functions when it encounters the specific conditions they were designed to take note of. 完成这些后,在满足某些特殊的条件时,RxJava会开始调用你的方法。

背压问题

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略

简而言之,背压是流速控制的一种策略

需要强调两点:

  • 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  • 背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。

响应式拉取(reactive pull)

首先我们回忆之前那篇《关于Rxjava最友好的文章》,里面其实提到,在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据

结构示意图如下:

[图片上传失败...(image-cccb92-1572187975155)]

观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

源码

public class FlowableOnBackpressureBufferStategy{
...
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            boolean callOnOverflow = false;
            boolean callError = false;
            Deque<T> dq = deque;
            synchronized (dq) {
               if (dq.size() == bufferSize) {
                   switch (strategy) {
                   case DROP_LATEST:
                       dq.pollLast();
                       dq.offer(t);
                       callOnOverflow = true;
                       break;
                   case DROP_OLDEST:
                       dq.poll();
                       dq.offer(t);
                       callOnOverflow = true;
                       break;
                   default:
                       // signal error
                       callError = true;
                       break;
                   }
               } else {
                   dq.offer(t);
               }
            }

            if (callOnOverflow) {
                if (onOverflow != null) {
                    try {
                        onOverflow.run();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.cancel();
                        onError(ex);
                    }
                }
            } else if (callError) {
                s.cancel();
                onError(new MissingBackpressureException());
            } else {
                drain();
            }
        }
...
}

在这段源码中,根据不同的背压策略进行了不同的处理措施,当然这只是列举了一段关于buffer背压策略的例子。

根源

产生背压问题的根源就是上游发送速度与下游的处理速度不均导致的,所以如果想要解决这个问题就需要通过匹配两个速率达到解决这个背压根源的措施。

通常有两个策略可供使用:

  1. 从数量上解决,对数据进行采样
  2. 从速度上解决,降低发送事件的速率
  3. 利用flowable和subscriber

使用Flowable

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意这句代码
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        upstream.subscribe(downstream);

我们注意到这次和Observable有些不同. 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException. 其余的策略后面再来讲解.

另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

 s.request(Long.MAX_VALUE);

这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...

所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

同步情况

Observable.create(new ObservableOnSubscribe<Integer>() {                         
    @Override                                                                    
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 
        for (int i = 0; ; i++) {   //无限循环发事件                                              
            emitter.onNext(i);                                                   
        }                                                                        
    }                                                                            
}).subscribe(new Consumer<Integer>() {                                           
    @Override                                                                    
    public void accept(Integer integer) throws Exception {                       
        Thread.sleep(2000);                                                      
        Log.d(TAG, "" + integer);                                                
    }                                                                            
});

当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件.

同步与异步的区别就在于有没有缓存发送事件的缓冲区。

异步情况

通过subscribeOn和observeOn来确定对应的线程,达到异步的效果,异步时会有一个对应的缓存区来换从从上游发送的事件。

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

背压策略:

  1. error, 缓冲区大概在128
  2. buffer, 缓冲区在1000左右
  3. drop, 把存不下的事件丢弃
  4. latest, 只保留最新的
  5. missing, 缺省设置,不做任何操作

上游从哪里得知下游的处理能力呢?我们来看看上游最重要的部分,肯定就是FlowableEmitter了啊,我们就是通过它来发送事件的啊,来看看它的源码吧(别紧张,它的代码灰常简单):

public interface FlowableEmitter<T> extends Emitter<T> {
    void setDisposable(Disposable s);
    void setCancellable(Cancellable c);

    /**
     * The current outstanding request amount.
     * <p>This method is thread-safe.
     * @return the current outstanding request amount
     */
    long requested();

    boolean isCancelled();
    FlowableEmitter<T> serialize();
}

FlowableEmitter是个接口,继承Emitter,Emitter里面就是我们的onNext(),onComplete()和onError()三个方法。我们看到FlowableEmitter中有这么一个方法:

long requested();

img

同步request.png

这张图的意思就是当上下游在同一个线程中的时候,在下游调用request(n)就会直接改变上游中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,继续发送事件便会抛异常了。

img

异步request.png

可以看到,当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。

Rxjava实例开发应用

  1. 网络请求处理(轮询,嵌套,出错重连)
  2. 功能防抖
  3. 从多级缓存获取数据
  4. 合并数据源
  5. 联合判断
  6. 与 Retrofit,RxBinding,EventBus结合使用

Rxjava原理

  1. Scheduler线程切换工作原理
  2. 数据的发送与接收(观察者模式)
  3. lift的工作原理
  4. map的工作原理
  5. flatMap的工作原理
  6. merge的工作原理
  7. concat的工作原理

结完。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • RxJava正在Android开发者中变的越来越流行。唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编...
    刘启敏阅读 1,838评论 1 7
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 2,973评论 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 625评论 0 1
  • 记录RxJava操作符,方便查询(2.2.2版本) 英文文档地址:http://reactivex.io/docu...
    凌云飞鱼阅读 810评论 0 0
  • 年薪百万,实现的路径, 思考路径,贡献千万, 千万以上流水的行业及市场容量, 企业软件,供应链金融,千万以上的流水...
    leoliwinner阅读 200评论 0 0