简介
Rxjava 是由微软架构师 Erik Meijer 领导的团队研发出来的开源库,目的是为了提供一个一致的编程接口,以便开发者对异步操作能够进行方便的处理。Rxjava 的设计是的开发者能通过 LINQ 的风格进行异步处理,所以它能将各种判断、循环、过滤、转换等的嵌套层级拉平,以流式的方式来编程,极大地简化了代码,增强了代码的阅读性。
如何使用 Rxjava
下面先举几个例子来说明一下 Rxjava 的简单使用,注意,每一个例子都是一个不同类型的使用方式,且包含了 Rxjava 不同的组成成分。
示例 01:简单示例
下面看一个 Rxjava 使用上的一个最简示例:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onNext("b");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"onError");
}
@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
};
observable.subscribe(observer);
// ------------------- 打印结果 -------------
onSubscribe
a
b
onComplete
我们分析上面的示例,它包含两个对象和一个动作:被观察者(Observable)、观察者(Observer)和订阅动作 subscribe()。Rxjava 是以观察者模式为基础扩展而来的,如上示例所示,当观察者和被观察者通过订阅发生关联后,被观察者就是作为信号的发射方发射消息,而观察者接受被观察者发送来的消息。观察者的onSubscribe()
方法是在subscribe()
的时候便执行了,onNext()
方法是被观察者执行emitter.onNext("a")
发射数据的时候执行,当调用emitter.onComplete()
时,观察者通过 onComplete()方法响应,当调用过程中出现某些异常时,onError(Throwable e)
方法响应。
onSubscribe()
的方法参数为一个 Disposable 对象,持有这个对象后,我们能够在任意位置通过调用d.dispose()
停止观察者对被观察者的响应。原理我们后续再说。
示例 02:异步示例
下面我们看一个异步的示例:
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.d(TAG,"apply--" + Thread.currentThread().toString());
return integer.toString();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"accept--" + s + "--" + Thread.currentThread().toString());
}
});
// -----------------打印结果如下 -------------------
apply--Thread[RxComputationThreadPool-1,5,main]
apply--Thread[RxComputationThreadPool-1,5,main]
apply--Thread[RxComputationThreadPool-1,5,main]
apply--Thread[RxComputationThreadPool-1,5,main]
accept--1--Thread[main,5,main]
accept--2--Thread[main,5,main]
accept--3--Thread[main,5,main]
accept--4--Thread[main,5,main]
这个示例中引入了异步的概念,我们发现,当 observeOn 被调用后便能够指定下一步的转换等操作的工作线程,每次指定都会生效。这个功能的实现便是 Rxjava 通过调度器 Schedulers 来实现的异步调用。AndroidSchedulers 是 rxandroid 提供的一个扩展,使得后续的执行会在 Android 主线程中执行。我们可以利用调度器对异步操作进行流式的编程,调度器也是 Rxjava 重要的组成成分。
示例 03:背压示例
下面是 Rxjava 背压的一个示例。背压是指如下的情况:被观察者产生信号而观察者接受信号,对于异步的调用流来说,如果被观察者产生的信号非常快,但是观察者消化的信号非常慢,那么就会造成信号的阻塞。而 rxjava 背压的设计解决了这个问题,下游可以控制上游信号的发射速度,从而解决消化不良的问题。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i+ "");
}
}
}, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
BackpressureStrategy.BUFFER
是背压设计的一个模式,在这个模式下如果下游的观察者消费不了数据,那么就会无限增加缓存,直到产生 OOM,当然还有其他的背压模式。Rxjava 1.x 的版本并不是所有的 Observable 都支持背压,而 Rxjava 2.x 中 Observable 不再支持背压,而是改用 Flowable 来专门支持背压。 背压,也是 rxjava 的一个重要组成部分。
示例 04:Subject 代理示例
subject 可以看成是一个桥梁或者说代理,它既能充当观察者、也能充当被观察者。作为观察者,它能订阅一个或者多个被观察者,作为被观察者它能接收转发给它的数据然后再发射新的数据。下面举一个比较简单的示例:
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("subject01");
subject.onNext("subject02");
subject.onComplete();
subject.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe_async");
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
});
// ------------------------------- 打印结果如下 --------------------------
onSubscribe_async
subject02
onComplete
AsyncSubject 只关注在 onComplete()调用前的最后一个数据,然后把数据发送给观察者。当然,还有一些其他类型的 Subject 来实现各式各样的功能,我们再后续的文章中会介绍到。
什么是观察者模式
观察者模式是当自身状态发生变化时能够自动通知所有依赖于它的对象的一种行为型模式。比如说电灯和开关的关系,开关属于被观察者,电灯属于观察者,电线则为连接两者的依赖关系。当开关的状态发生变化时,那么就会通过电线通知到绑定的电灯的开启状态。这就是一个比较简单的观察者模式。
下面我们就把电灯的这个观察者模式实现出来。
public class Light {
private void doOpen() {
// dosomething
}
private void doClose() {
// do Something
}
public void update(boolean isOpen) {
if (isOpen) {
doOpen();
}else {
doClose();
}
}
}
public class Switch {
// 一个开关可以控制许多个灯。
List<Light> observers = new ArrayList<>();
public void subscribe(Light light) {
observers.add(light);
}
public void switchOnOff(boolean isOn) {
for (Light light : observers) {
light.update(isOn);
}
}
}
void main() {
Light light = new Light();
Light light02 = new Light();
Switch switchOn = new Switch();
switchOn.subscribe(light);
switchOn.subscribe(light02);
switchOn.switchOnOff(true);
}
上述是一个较为简单的观察者模式的设计。当开关状态发生变化时,会通知自己持有的灯的对象。
Rxjava 的组成成分
从第一节中举的那些例子来看,我们可以归纳一下 Rxjava 的组成成分。被观察者(Observable)、观察者(Observer)、调度器(Scheduler)、代理桥(Subject)、背压(使用 Flowable 实现)、如何将这些元素创建并连接起来,那就涉及到另外一个成分,就是操作符。如上面例子中的 Observable.just()就是一个创建操作符。另外还有 变换操作符、过滤操作符、条件操作符、布尔操作符、合并操作符和连接操作符等供我们使用。那我们就开始我们详细了解 RXjava 的步伐吧。
使用说明也可以参考 RxJava 的官方文档一起来看。ReactiveX 文档中文翻译
Rxjava 的操作符
Rxjava 有许多操作符来协助开发者更好的去开发功能,大致可以如下分类:
- 创建操作符
- 变换操作符
- 过滤操作符
- 条件操作符
- 布尔操作符
- 合并操作符
- 连接操作符
下面我们对这些操作符进行一下详细的介绍,每个操作符我都会举一个简单的例子以便加深对它的理解。
创建操作符
创建操作符的作用是创建一个被观察者(Observable)。
just(T item1, T item2,...)
Observable.just(1,2,3,4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer.toString());
}
});
// ---------------- 输出结果 -------------------
1
2
3
4
just 创建出来的被观察者会将参数中的值依次发给观察者,最后的打印结果便如上面所示。just 操作符最多只能有十个参数,创建的 Observable 会依次发射参数中的数据向 Observer,示例如上所示。
fromXXX()
这其中包含如下的方法调用,使用方式和 just 是一样的:
// 传入参数数量不限,一个一个发射给观察者
Observable.fromArray();
// 传入一个迭代器对象,如 list,会将其中的元素一个一个发射出去
Observable.fromIterable();
// 传入 Future 对象,最终会发生 Future.get()方法返回的参数。
Observable.fromFuture();
// 传入一个 Callable 对象
Observable.fromCallable();
defer()
每次订阅都会创造一个新的 Observable,然后重新发射数据。下面是使用示例:
Observable observable1 = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just("a","b","c");
}
});
observable1.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s+"001");
}
});
observable1.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s+"002");
}
});
// ------------------- 打印结果如下 --------------------
a001
b001
c001
a002
b002
c002
我们可以发现,a b c 这三条讯息在第二次订阅的时候又被重新发了一遍,这些观察者实际上订阅的根本就不是同一个被观察者,每次发生订阅事件,都会 return 一个新的被观察者对象(Observable.just("a","b","c")
),而这个被观察者是在订阅的那一瞬间才创建的,这样能够保证订阅数据的实时性。
Empty/Never/Throw
这三个的功能都比较简单,在测试的时候会经常用到。
- Empty 创建一个不发射任何数据但是正常终止的 Observable,最终只调用 onSubscribe 和 onComplete 方法
- Never 创建一个不发射数据也不终止的 Observable,只会调用 onSubscribe 方法。
- Throw 创建一个不发射数据然后一个错误终止的 Observable,只会调用 onSubscribe 和 onError 方法。
下面的示例发射空的,打印结果如下,最终也不会去调用 onNext() 方法。
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG,"onNext_" + o);
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"onError" );
}
@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
});
// --------------- 打印结果 ------------------//
onSubscribe
onComplete
interval
这个操作符是创建了一个按照固定的间隔时间发送连续整数的被观察者,我们看如下示例:
// 3 秒后开始发射数据,后续数据每隔 2 秒发射一个。
Observable.interval(3,2,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG,aLong+"");
}
});
// ------------------- 打印结果 ------------
1
2
...
range(start,count)
该操作符会发射以 start 数字开始,count 为数量的连续的整数序列。如下示例:
Observable.range(100,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer+"");
}
});
// --------------------- 打印结果 ----------------//
100
101
....
119
repeat
- repeate(count) 重复发射指定次数的事件
Observable.just("hello")
.repeat(3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ------------------ 打印结果 ------------------
hello
hello
hello
- repeatWhen() 当参数中返回的 Observable 发射数据时,进行重复发射,看下面的示例。
Observable.just("hello")
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
// 这里返回了一个 Observable 对象。repeatWhen 的使用方法就是,
// 每当这个返回的被观察者发射数据的时候,just()中的信号就会被重放一次。所以这里的 hello 会每两秒发生重放
return Observable.interval(2,2,TimeUnit.SECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
- repeatUntil 直到条件满足一定的条件,才停止重复。
long currentTime = System.currentTimeMillis();
Observable.interval(1,1,TimeUnit.SECONDS)
.take(2)
.repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return System.currentTimeMillis() - currentTime > 5000;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
Log.d(TAG,s+"");
}
});
// --------------------- 打印结果 ----------------
0
1
0
1
0
1
如上所示,如果 getAsBoolean 返回了 false,那么就会继续重放发送的消息。
timer 操作符
给定的延迟时间延迟发送一条消息。
Observable.timer(1000,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG,aLong+"");
}
});
变换操作符
变换操作符可用于对 Observable 发射的数据进行各种各样的变换操作。
map()
对 Observable 发射的数据通过方法进行一次转换操作,生成新的序列。
- cast 转化为指定类型
- map 通过 apply 的转化生成新的序列
Observable.just(1,2,3,4)
.cast(Long.TYPE) // 将 int 类型转化为 long
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return aLong + "";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
flatMap()、concatMap()、flatMapIterable()
将 Observable 发射的集合数据变换为 Observable 的集合,然后这个 Observable 集合中的每个 Observable 单独对数据进行发射,形成一个平坦化的过程。可以认为是把类似二维数组结构的所有元素单拎出来进行发射。如下示例
// 生成数据结构
List<List<String>> list = new ArrayList<>();
List<String> inner01 = new ArrayList<>();
inner01.add("aaa");
inner01.add("bbb");
list.add(inner01);
List<String> inner02 = new ArrayList<>();
inner02.add("ccc");
inner02.add("ddd");
list.add(inner02);
// 使用 flatmap
Observable.fromIterable(list)
.flatMap(new Function<List<String>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ------------------------打印结果 --------------//
aaa
bbb
ccc
ddd
看到上面的例子实际上我们可以这么理解,fromIterable 的调用将 list 的第一层列表进行了剥离,剥离出来后,apply 方法返回了另一个 Observable 对象,而这个对象的作用是将第二层列表进行了剥离,然后最终的观察者和所有这些返回的 Observable 都进行了关联。这样,每个元素发射的消息观察者都能够收到了。
concatMap 和 flatMap 的用法是一样的,只不过 flatMap 打印出来的数据可能是存在乱序的,但是 concatMap 一定是正常的序列。
flatMapIterable 的用法和 flatMap 也是类似的,只是返回的结果是 Iterable。
Observable.fromIterable(list)
.flatMapIterable(new Function<List<String>, Iterable<String>>() {
@Override
public Iterable<String> apply(List<String> strings) throws Exception {
return strings;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
buffer
定期收集 Observable 的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
Observable.range(0,10)
.buffer(3)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.d(TAG,integers.toString());
}
});
// ---------------------- 打印结果 ------------------
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]
window
window 和 buffer 比较类似,都是定期收集 Observable 发射出来的数据,然后发射出去,与 buffer 不同的是,window 发射的也是一个被观察者,这个被观察者被接收后还需要注册一个观察者接收最终的数据。
Observable.range(0,7)
.window(3)
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
Log.d(TAG, "accept-outer");
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept-inner" + integer);
}
});
}
});
// ----------------------- 打印结果 -------------------//
accept-outer
accept-inner0
accept-inner1
accept-inner2
accept-outer
accept-inner3
accept-inner4
accept-inner5
accept-outer
accept-inner6
scan
这个操作符是将序列中的每一个元素都应用到一个方法中进行操作,然后返回操作的结果。实例如下所示:
Observable.range(0, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, integer + "----" + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept-" + integer);
}
});
});
// ------------------------打印结果 -----------------------//
accept-0
0----1
accept-1
1----2
accept-3
3----3
accept-6
6----4
accept-10
首先从打印结果中可以看到,第一个发射的数据直接发送给了观察者,然后从第二个开始,执行 BiFunction 的回调,里面 apply 的第一个参数是上一次计算的结果,那么第二个的上次计算结果便是第一个数据。然后进行一系列的操作,直到数据发射完毕。
groupBy
将原始的 Observable 拆分成多个 Observable,然后每个拆分后的 Observable 发射的都是原始序列的子序列。拆分规则就是分组拆分,如何分组就是按照传入的参数来分组。
Observable.just(0,1,2,3,4).groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer % 2 == 0? "偶数": "奇数";
}
}).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
Log.d(TAG,stringIntegerGroupedObservable.getKey());
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,stringIntegerGroupedObservable.getKey() + "--" + integer);
}
});
}
});
// -------------- 打印结果 ---------------//
偶数
偶数--0
奇数
奇数--1
偶数--2
奇数--3
偶数--4
过滤操作符
过滤操作符,顾名思义也就是用来过滤所发射的消息的操作符,下面具体看一下都包含哪些。
first、last
first 操作符是取所有发射元素中的第一个值,如果说没有元素发射,那么就取默认值,默认值是通过 first 方法的参数传进去的。调用 first 方法后返回的是一个 Single 对象,这个对象是一个特殊的被观察者,只能发送单个消息。
Observable.range(0,10)
.first(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer+"");
}
});
// ----------------- 打印结果 ---------------
0
last 的用法和 first 类似,只是取的是最后一个发射的元素。
elementAt、ignoreElements
elementAt 是取发射序列中的第 index 个值,如果取不到,那么要么是默认值,要么调用 onError。
Observable.range(0,10)
.elementAt(3,10)
//.elementAtOrError(3) // 如果发射序列数量小于 3 那么取不到便调用 onError
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer+"");
}
});
// ------------------ 打印结果 ----------------
3
ignoreElements 是忽略所有发射的消息,只关心最后的结果,即执行 onComplete 方法
take、takeLast、skip、skipLast
take 的使用方法为拿取前 n 个发射出来的元素,如果传入的 n 实际大于发射的元素个数,那么获取所有元素后就直接执行 onComplete 方法
它的另一个用法是获取前 n 个时间段的数据,丢弃超过这个时间段的其他剩余数据。
Observable.range(0,10)
.take(3) // 获取前三个发射出来的数据,剩余数据丢弃
// .take(3,TimeUnit.SECONDS) 获取前 3 秒发射的数据,剩余数据丢弃
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
- takeLast 的用法自然和 take 的用法类似,只是是最后的三个元素。
- skip 的用法和 take 类似,只是 skip 是跳过前 n 个元素。
- skipLast 的含义和 skip 类似,只是跳过后 n 个元素。
distinct
distinct 是过滤重复的信号,只允许没有发射的数据项通过到达观察者。
Observable.just(1,3,2,4,2,2,4)
.distinct()
// .distinctUntilChanged()
// .distinct(new Function<Integer, String>() {
// @Override
// public String apply(Integer integer) throws Exception {
// return integer+"";
// }
// })
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer+"");
}
});
// -------------------打印结果 -------------------
1
3
2
4
- distinct 的一个重载方法是传入一个 Function 对象,通过这个对象中方法的返回值进行 key 是否重复的判断。
- distinctUntilChanged 则是只过滤连续相邻的信号。
filter
只发射符合要求的数据信号。
Observable.just(1,3,2,2,2,4,2,2,4)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer+"");
}
});
// -------------------- 打印结果 ---------------
3
4
4
debounce
该操作符的初衷是为了过滤掉发射速率过快的消息,如下所示,间隔小于 500 的消息都没被接收
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i< 10;i++) {
emitter.onNext(i);
Thread.sleep(100*i);
}
emitter.onComplete();
}
})
.debounce(500,TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer.toString());
}
});
// -------------------- 打印结果 ----------------
6
7
8
9
条件操作符和布尔操作符
all、contains、amb
all 是判断发射的数据是否全部满足传入的条件。如下所示,传入的数字序列 1,2,3,4,5 全部都小于 6,那么最终观察者拿到的数据为 true。
Observable.just(1, 2, 3, 4, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 6;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, aBoolean.toString());
}
});
// -------------------- 打印结果 ----------------
true
contains 是判断是否发射了指定的数据。如下所示,打印结果为 true。
Observable.just(1, 2, 3, 4, 5)
.contains(3)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, aBoolean.toString());
}
});
// -------------------- 打印结果 ----------------
true
amb 是选择列表中发射数据最早的那个被观察者,一旦这个被观察者发射了数据,那么它就可以发射完,但是其他的被观察者就不能发射数据了。
Observable.ambArray(
Observable.just(1,2).delay(1,TimeUnit.SECONDS),
Observable.just(3,4)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integer.toString());
}
});
// -------------------- 打印结果 ----------------
3
4
defaultIfEmpty
defaultIfEmpty 用来判断如果被观察者发射空的数据,那么 defaultIfEmpty 就指定一个默认的数据来发射。
Observable.empty()
.defaultIfEmpty(4)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object integer) throws Exception {
Log.d(TAG, integer.toString());
}
});
// ------------ 打印结果 -----------
4
sequenceEqual
用来判断两个发射序列是否是相同的,如果是相同的则发送给观察者 true,否则为 false。发射的顺序不同也是不同。
Observable.sequenceEqual(
Observable.just(1,2,3,4),
Observable.just(2,1,3,4) // 如果此处为 1,2,3,4,则打印为 true
).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, aBoolean.toString());
}
});
// ----------------打印结果 --------------
false
另外有一个重载方法来进行复杂的判断,如下所示,在 BiPredicate 的 test 方法中进行每一项元素的对比。
Observable.sequenceEqual(
Observable.just(1, 2, 3, 4),
Observable.just(2, 3, 4, 5),
new BiPredicate<Integer, Integer>() {
@Override
public boolean test(Integer integer, Integer integer2) throws Exception {
return integer == integer2 - 1;
}
}
).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, aBoolean.toString());
}
});
// ---------------------- 打印结果 ----------------
true
skipWhile、skipUntil、takeUntil、takeWhile
skipWhile 一直跳过发送的序列,直到指定的条件不成立为止,它和 filter 的区别在于是 filter 是对每个元素都进行判断,而 skipWhile 则是如果某一个元素满足了条件,那么它后续的所有元素不再进行判断,直接发送给观察者。如下代码所示, 6 满足条件以后,就算 4 不满足条件,也会发射给观察者。
Observable.just(1,2,3,6,5,4)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integer.toString());
}
});
// -------------------- 打印结果 ---------------------
6
5
4
- takeWhile 和 skipWhile 刚好相反,从一开始发送序列,直到条件满足是不发送序列。
skipUntil 丢弃最开始的数据,直到 skipUntil 中传递的 Observable 也开始发送数据了。
Observable.interval(2,TimeUnit.SECONDS)
// 直到这个被观察者开始发射数据,上面的被观察者发射的数据才会传到观察者里
.skipUntil(Observable.timer(5,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, aLong.toString());
}
});
// ------------------------- 打印结果-----------------
2
3
4
...
- takeUntil 和 skipUtil 也是相反的操作,一直拿数据,直到不满足条件后丢弃剩余的数据。
合并操作符
merge 操作符
merge 操作是将多个 Observable 合并起来发射,从下面的示例中可以发现多个的 Observable 是并行发送的。一旦某个 Observable 出现了 onError,那么就会停止所有的发射工作。
Observable.merge(
Observable.interval(2,TimeUnit.SECONDS),
Observable.interval(3,TimeUnit.SECONDS)
).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, aLong.toString());
}
});
// ------------------- 打印结果 ---------------
0
0
1
2
1
3
...
zip
这个操作符可以传入多个 Observable,它的作用是将每个 Observable 第 n 个元素提取出来做一些处理发送给观察者,如果某个 Observable 发射的数据较多的话,就会直接舍弃。
Observable.zip(
Observable.just(1, 3, 5),
Observable.just(7, 8, 9, 10),
new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return integer + " -- " +integer2;
}
}
).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ------------------------- 打印结果 -------------------------
1 -- 7
3 -- 8
5 -- 9
combineLatest
我们先看一下示例,在分析这个操作符是做什么的。
Observable.combineLatest(
Observable.interval(2, TimeUnit.SECONDS),
Observable.interval(3, TimeUnit.SECONDS),
new BiFunction<Long, Long, String>() {
@Override
public String apply(Long aLong, Long aLong2) throws Exception {
return "first" + aLong + "----second" + aLong2;
}
}
).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ---------------------- 打印结果 -----------------
first0----second0
first1----second0
first2----second0
first2----second1
first3----second1
传入 combineLatest 有两个 Observable,这两个 Observable 和 zip 的方法类似,是并发执行的。而这两个 Observable 任意一个发射数据都会调用 apply 方法,而调用的参数则是两个 Observable 发送的最后一个数据。
startWith
在最开始插入一个指定的数据
Observable.just(1,2,3)
.startWith(100)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer.toString());
}
});
// ------------------------ 打印结果 --------------------------
100
1
2
3
switch
传入的参数为一个被观察者,将原 Observable 中的元素每一个都提取出来生成一个新的 Observable,返回出来。
Observable.switchOnNext(
Observable.interval(2,TimeUnit.SECONDS)
.map(new Function<Long, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Long aLong) throws Exception {
return Observable.just(aLong.toString() + "--");
}
})
).take(4).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ----------------------打印结果 --------------
0--
1--
2--
3--
连接操作符
connect
ConnectableObservable 是一个特殊的被观察者,它并不是在调用 subscribe()的时候发射的数据,而是调用 connect 方法的时候发射数据。一个 ConnectableObservable 可以订阅多个观察者,而这些订阅的观察者会共享一组发射的数据,示例如下所示:
Observable observable = Observable.interval(1,TimeUnit.SECONDS).take(5);
ConnectableObservable co = observable.publish();
co.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
co.delaySubscription(3,TimeUnit.SECONDS).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
Log.d(TAG,"start connect");
co.connect();
// -------------------------- 打印结果 ---------------------
start connect
0
1
2
2
3
3
4
4
你会发现,就算第二个观察者进行了延迟注册,那么被观察者的数据也不会重新发射,而是继续按照自己的步骤去发射。
- publish 操作符是将普通的 Observable 转化为 ConnectableObservable
- refcount 则是将 ConnectableObservable 转化为普通的 Observable。
- replay 操作符返回 ConnectableObservable 对象,它生成的这个对象能保证所有的观察者都能收到相同的序列,因为它有缓存。
Observable observable = Observable.interval(1,TimeUnit.SECONDS).take(5);
ConnectableObservable co = observable.replay();
co.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
co.delay(3,TimeUnit.SECONDS).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
co.connect();
// -------------------------- 打印结果 ----------------------
0
1
2
3
0
4
1
2
3
4
从上面的打印结果可以看出,0,1,2,3,4 这个序列是打印和两次的,这个和通过 publish 生成的 ConnectableObservable 正好有个鲜明的对比。
do 操作符
do 操作符是用来给生命周期加回调的,我们能在回调中做自己想做的事情。下面用一个例子来做说明。
Observable.just("aaaa")
.doOnNext(new Consumer<String>() { // 每发射一项数据都会调用一次,在观察者调用之前
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"doOnNext----" + s);
}
})
.doAfterNext(new Consumer<String>() { // 每发射一项数据都会调用一次,在观察者调用之后
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"doAfterNext----" + s);
}
})
.doOnComplete(new Action() { // 正常终止的时候调用
@Override
public void run() throws Exception {
Log.d(TAG,"doOnComplete----");
}
})
.doOnSubscribe(new Consumer<Disposable>() { // 订阅时调用
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG,"doOnSubscribe----");
}
})
.doAfterTerminate(new Action() { // 当 Observable 调用 onComplete 或 onError 的时候调用
@Override
public void run() throws Exception {
Log.d(TAG,"doAfterTerminate----");
}
})
.doFinally(new Action() { // 终止的时候调用,不管是正常终止还是异常终止,会在 doAfterTerminate 之前调用
@Override
public void run() throws Exception {
Log.d(TAG,"doFinally----");
}
})
.doOnEach(new Consumer<Notification<String>>() { // 每发射一项数据都会调用,在 doOnNext 之后,观察者之前
@Override
public void accept(Notification<String> stringNotification) throws Exception {
Log.d(TAG,"doOnEach----" + stringNotification.isOnNext());
}
})
.doOnLifecycle(new Consumer<Disposable>() { // 可以再观察者订阅之后取消订阅,在 doOnSubscribe 之后
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "doOnLifecycle----");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "doOnLifecycle----action");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"subscribevalue ----" + s);
}
});
// --------------------------- 打印结果 ------------------------
doOnSubscribe----
doOnLifecycle----
doOnNext----aaaa
doOnEach----aaaa
subscribevalue ----aaaa
doAfterNext----aaaa
doOnComplete----
doOnEach----true
doFinally----
doAfterTerminate----
小结
可以发现,上面的操作符还是挺多的,读者如果想要加深理解的话,可以把每个操作符的使用都实践一遍,这样印象就更深刻了。当然,就算在印象深刻也有忘的时刻,那么忘记的话就可以再来翻一下本篇的文章,或者去看官方文档。
Rxjava 的被观察者 Observable
说了那么多操作符,那么 Rxjava 的 Observable 是不是只有一种呢?答案当然不是咯,Rxjava 为了方便开发者在各种场景下使用,开发出了多种多样的 Observable,我们可以先预览一下这些 Observable。
Observable
这是最常用的被观察者,我们发现在介绍操作符的时候大部分都是用的这个,它能够发射 0-n 个数据,并以成功或错误事件而终止。而其他的操作符都是用在特殊场景下的操作符。因为上面已经有了太多的例子,这里就不再展开进行说明了。
Flowable
Flowable 能够发射 0 到 n 个数据,并以成功或错误事件而终止,相比 Observable 它多出来的是背压的概念。在整片文章的最开始举了一个背压的例子,现在我们做一下详细的解释。
在 Rxjava 中有观察者和被观察者的概念,被观察者发射数据,观察者接收数据。如果说观察者和被观察者在不同线程中,那么有可能存在以下状况:被观察者产生数据的速度高于观察者处理的速度,这就有可能导致观察者“消化不良”而导致数据的积攒。背压就是为了解决这个问题的。
如下所示:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i + "");
}
}
}, BackpressureStrategy.BUFFER) // 在此处指定背压的模式。
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
因为发射数据和处理数据在不同的线程,所以需要一个缓存在进行存储,而 Rxjava 的默认缓存的队列大小只有 128,所以背压的状况才有可能发生。如上面指定的背压策略 BackpressureStrategy.BUFFER,该模式下这个缓存池没有固定大小,可以无限添加数据,当然如果添加过多数据的话就会导致 OOM。还有其他的一些背压策略我们简单进行说明。
背压策略 | 说明 |
---|---|
MISSING | 没有指定背压策略 |
ERROR | 如果放入异步缓存中的数据超限,那么抛出异常 |
BUFFER | 无限增加缓存池大小 |
DROP | 如果缓存池已满,那么丢弃新的打算放入缓存池的数据 |
LATEST | 如果缓存池已满,则丢掉要放入缓存池的数据,但是保留最新的那个数据,也就是最新的数据会强行放入缓存池。 |
Single
只会发射单个数据或者错误事件
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
emitter.onSuccess("success");
// emitter.onError(new RuntimeException());
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ------------------------------- 打印结果 ----------------
success
Completable
从来不发射数据,只发射 oncomplete 事件和 onError 事件。
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
emitter.onComplete();
}
}).subscribe(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG,"onComplete");
}
});
// ---------------- 打印结果 -----------------
onComplete
Maybe
能够发射 0 或 1 个数据
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> emitter) throws Exception {
emitter.onSuccess("success");
emitter.onSuccess("success02");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
// ------------------- 打印结果 --------------------
success
Subject
Subject 在文章的最上面也举了例子,它相当于是一个代理或者桥,它既是观察者,也是被观察者。Subject 一共有四个类型,分别是 AsyncSubject、BehaviorSubject、ReplaySubject、PublishSubject。
AsyncSubject 的观察者只会接受调用 onComplete()之前的那一个数据,而且如果不调用 onComplete,数据就不会发射。
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("subject01");
subject.onNext("subject02");
subject.onComplete();
subject.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe_async");
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
});
// ------------------------------- 打印结果如下 --------------------------
onSubscribe_async
subject02
onComplete
BehaviorSubject 的观察者只接受订阅前的最后一个发射的数据和订阅之后的所有数据。
// 传入的这个 default 值是如果订阅之前没有发射任何数据的话,那么就会发射这个数据
BehaviorSubject subject = BehaviorSubject.createDefault(666);
subject.onNext(1);
subject.onNext(2);
subject.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
subject.onNext(3);
subject.onNext(4);
// -------------------------- 打印结果 ---------------------
2
3
4
ReplaySubject 的观察者会接受订阅前和订阅后的所有数据。
// 还有个 create 参数,会发射所有的数据,createWithSize 参数传入 2 是缓存订阅前的两个数据。
ReplaySubject replaySubject = ReplaySubject.createWithSize(2);
replaySubject.onNext(1);
replaySubject.onNext(2);
replaySubject.onNext(3);
replaySubject.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
replaySubject.onNext(4);
// ---------------------- 打印结果 -------------
2
3
4
PublishSubject 只接收订阅之后发射的数据。
PublishSubject subject = PublishSubject.create();
subject.onNext(1);
subject.onNext(2);
subject.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG,o.toString());
}
});
subject.onNext(3);
// ----------------------- 打印结果 ------------------
3
Rxjava 调度器
Rxjava 通过 Schedulers 来进行线程调度,如下面所展示的这个例子 subscribe 发生的新建的线程中,map 操作发生在 computation 线程中,而最后的订阅后的操作则是发生在 Android 的主线程中。异步操作能用这种流式的方式来写,也是 Rxjava 的魅力所在。
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.d(TAG,"apply--" + Thread.currentThread().toString());
return integer.toString();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"accept--" + s + "--" + Thread.currentThread().toString());
}
});
// -----------------打印结果如下 -------------------
apply--Thread[RxComputationThreadPool-1,5,main]
apply--Thread[RxComputationThreadPool-1,5,main]
apply--Thread[RxComputationThreadPool-1,5,main]
apply--Thread[RxComputationThreadPool-1,5,main]
accept--1--Thread[main,5,main]
accept--2--Thread[main,5,main]
accept--3--Thread[main,5,main]
accept--4--Thread[main,5,main]
调度器 | 作用 |
---|---|
Schedulers.single() | 使用长度为 1 的线程池,重复利用这个线程 |
Schedulers.computation() | 使用固定线程池,适合 CPU 密集计算 |
Schedulers.io() | 适合 io 操作 |
Schedulers.trampoline() | 直接在当前线程运行,如果有其他任务,则先暂停其他任务 |
AndroidSchedulers.mainThread() | Rxandroid 中的调度器,在主线程中运行 |
Rxjava 源码解析
相信经过上面对 Rxjava 的详细解读,大家都 Rxjava 的使用有了一定程度的了解吧。下面我们就开始一步一步的撸 Rxjava 的源码了。
那么我们从最简示例开始。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onNext("b");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"onError");
}
@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
};
observable.subscribe(observer);
首先,通过Observable.create
方法创建了一个被观察者,create 方法传入的参数是新创建的 ObservableOnSubscribe 实例。我们看到这个实例是只有一个方法供我们继承,而我们可以在这个方法中自定义我们的事件发射机制。。
// 在复写的这个方法里有一个发射器,通过这个发射器我们自定义发射事件的顺序
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
我们具体看一下 create 方法里面如何实现。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
传入的 source 参数外部又被封装了一层 ObservableCreate。我们先看RxJavaPlugins.onAssembly
做了什么处理。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这个方法是当添加了一些 hook 后,用来在订阅之前执行 hook 操作的,我们可以暂时不用管,直接看ObservableCreate
这个类。这个类是返回给 create 方法的实现类,也就是我们获取到的 Observable,它实现了 subscribeActual 方法。
protected void subscribeActual(Observer<? super T> observer) {
// 创建了一个事件发射器,并和观察者进行了绑定
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用观察者的 onSubscribe 回调
observer.onSubscribe(parent);
try {
// 调用被观察者的 subscribe 方法开始触发事件,并将发射器传递进去。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// 如果失败了那么发射器就发射错误消息
parent.onError(ex);
}
}
那这个方法到底是在哪里调用的呢?我们往后看一下。
创建了观察者和被观察者后,下一步的操作就是将观察者和被观察者进行绑定,通过我们上面示例中每次都会用到的 subscribe 方法。我们看一下这个方法的实现。(省略掉了一些非重点的操作)
public final void subscribe(Observer<? super T> observer) {
try {
// ...
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
throw e;
}
}
在这里,我们找到了刚才的那个疑问,subscribeActual
方法是在订阅的时候调用的。回去再看 subscribeActual 的代码,我们发现在调用 Observable 的 subscribe 方法之前,先调用了观察者的 onSubscribe。而在调用被观察者的 subscribe 方法的时候将发射器作为参数传了进行。我们在新建 Observable 的时候持有的发射器就是这个实例。
我们看一下这个事件发射器 CreateEmitter,发现它继承了 Disposable 接口,这个接口是做什么用的呢?这个是用来阻断事件的传播的。我们从最初 Rxjava 使用的那个代码块里可以看到 onSubscribe 方法里可以获取到这个参数。在任意位置调用 Disposable.dispose()方法后,即使发射器再发射消息,那么观察者也不会再去接受消息了。如下代码所示
public void onNext(T t) {
// ...
if (!isDisposed()) {
observer.onNext(t);
}
}
观察者和被观察者是通过发射器这个类关联起来的,被观察者通过发射器发送消息,发射器则持有观察者的实例。每次通过发射器发射一个数据,这个数据都会直接被传到观察者那里,调用观察者的 onNext()方法。
简单变种
下面是 Rxjava 使用时的一个简单的变种,也是操作符 just 最简单的一个使用了。
Observable.just(1,2,3,4)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这个变种不难理解,Observable.just()返回了ObservableFromArray
对象,这里面通过 for 循环不断去执行 onNext()方法。
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
下面这个更复杂了一点,添加了 map 变换操作符。
Observable observable = Observable.just(1,2,3,4)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
just 返回了 ObservableFromArray 的实例,调用 map 方法后,返回的是 ObservableMap 的实例,该实例也持有 ObservableFromArray 的实例。ObservableMap 调用 subscribe 与观察者关联的时候,实际上是新建了一个观察者,然后将新建的这个观察者关联到 ObservableFromArray 中。
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
新建的这个观察者是 MapObserver ,它相当于是一个桥梁,它持有我们自己新建的观察者对象、持有 map 的变换方法,这样在它的 onNext 方法中,便可以先进性数据转换,然后把数据转换的结果传递个我们定义的观察者里。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
变种 2.0
defer 你可以认为实际创建的不是真正的 Observable,而是一个被观察者创建的工厂类。
// 只有发生订阅 subscribe 动作的时候才创建出真正的被观察者,defer 创建的相当于是被观察者的创建工厂
// 真实的被观察者都是通过工厂类创建出来的
Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just("1","2");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
我们看一下源码,如下所示,subscribeActual 代码中通过 supplier.call()的调用创建了被观察者,然后创建后的被观察者再去订阅我们自己创建的观察者。也就是说,虽然外部是 defer 关联了观察者,而实际上却是内部创建的被观察者对观察者进行了关联。
这也就牵涉到了 Observable 分类的概念,Observable 分为 Hot 和 Cold 两类,Cold Observable 是只有在订阅的时候才会去创建被观察者,而 Hot Observable 则在一开始就被创建出来了。
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}
pub.subscribe(s);
}
变种 3.0 flatMap
我们先看一下前面 flatMap 举的事例:
List<List<String>> list = new ArrayList<>();
List<String> inner01 = new ArrayList<>();
inner01.add("aaa");
inner01.add("bbb");
list.add(inner01);
List<String> inner02 = new ArrayList<>();
inner02.add("ccc");
inner02.add("ddd");
list.add(inner02);
Observable.fromIterable(list)
.flatMapIterable(new Function<List<String>, Iterable<String>>() {
@Override
public Iterable<String> apply(List<String> strings) throws Exception {
return strings;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,s);
}
});
flatMap 是将二维数组的第二层数据进行了剥离,我们具体看是怎么做的。
真正的实现是在 FlattenIterableObserver 类中,它持有真正的观察者对象,持有 FlatMap 里面的 Function。当看到源码时,我们也不觉着很神奇了,FlattenIterableObserver 的 onNext 方法将 apply 返回的对象进行了一下遍历,然后将遍历的结果全部传给了真正的 Observer,然后就是我们现在看到的熨平二维数组的效果。
public void onNext(T value) {
Iterator<? extends R> it;
try {
// 将 fromIterable 拿到的结果进行一下转换
it = mapper.apply(value).iterator();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
return;
}
Observer<? super R> a = actual;
for (;;) { // 将转换后的 Iterable 类型的进行遍历
boolean b;
try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
return;
}
if (b) {
R v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
return;
}
// 将遍历的每一个元素应用到真正的观察者上。
a.onNext(v);
} else {
break;
}
}
}
异步源码
在使用 Rxjava 的异步的时候是不是对异步操作竟然能用这种流式的编程方式开发出来而感到神奇呢?下面我们就来揭露出它神秘的面纱。
Observable.just(0,1,2) // just 返回了 ObservableFromArray 对象
.subscribeOn(Schedulers.newThread()) // 返回了 ObservableSubscribeOn 对象
.observeOn(Schedulers.io()) // 返回了 ObservableObserveOn 对象
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
首先我们先看 subscribeOn 指定订阅事件发生的线程。
在 ObservableSubscribeOn 类的 subscribeActual 方法中,我们发现观察者的 onSubscribe 是无法控制在哪个线程调用的。而其他的任务都放到了线程调度器里去执行了。
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 观察者 onSubscribe 方法的回调。
s.onSubscribe(parent);
// 把需要执行的任务放进了调度器里
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
我们看下 scheduleDirect 里有什么。
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
在 scheduleDirect 里创建了 Worker,做真正的执行操作:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
我们可以看到调用到了 NewThreadWorker 的 schedule 方法,而其中则是真正调用到了 scheduleActual 方法。
如下为 schedule 方法,在这个方法里可以看到是调用了线程池新建了一个线程来处理具体任务的执行。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
看完了 subscribeOn 的代码,我们再看一下 ObservableOn 的代码。这个方法返回的是 ObservableObserveOn 对象,这个对象也是一个被观察者,当它注册到观察者时也会调用自己的 subscribeActual 方法。
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
可以看到,也是生成了一个 Worker 对象,并注册上了新建的观察者,ObserveOnObserver。
我们最终看到,也是调用了 Worker 的 schedule 方法来执行具体的操作。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
兜兜转转,这一个异步的原理和 subscribeOn 是一样的,主要都集中在了线程调度的那个类里,即 Scheduler 里。
背压的源码
背压的代码和正常的观察者的代码是类似的,较大的区别在于背压使用了特殊的数据发射器。我们举一个无限扩容的背压的例子来说吧,即 BackpressureStrategy.BUFFER 模式。
// BufferAsyncEmitter
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在这里,向缓存队列里添加数据,而如果缓存不够用的话那就添加缓存。
queue.offer(t);
drain();
}
在此处是将发射的数据放置到缓存队列里去。
public boolean offer(final T e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<Object> buffer = producerBuffer;
final long index = lpProducerIndex();
final int mask = producerMask;
final int offset = calcWrappedOffset(index, mask);
if (index < producerLookAhead) {
return writeToQueue(buffer, e, index, offset);
} else {
final int lookAheadStep = producerLookAheadStep;
// go around the buffer or resize if full (unless we hit max capacity)
int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
return writeToQueue(buffer, e, index, offset);
} else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
return writeToQueue(buffer, e, index, offset);
} else {
resize(buffer, index, offset, e, mask); // add a buffer and link old to new
return true;
}
}
}
subject 源码分析
Subject 即是观察者,也是被观察者,我们使用 ReplaySubject 来分析一下它是如何实现的。
首先我们看到 Subject 是既继承了观察者类、又继承了被观察者类。
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
}
作为被观察者,当它被观察者订阅的时候会调用 subscribeActual 。
protected void subscribeActual(Observer<? super T> observer) {
ReplayDisposable<T> rs = new ReplayDisposable<T>(observer, this);
observer.onSubscribe(rs);
if (!rs.cancelled) {
if (add(rs)) {
if (rs.cancelled) {
remove(rs);
return;
}
}
buffer.replay(rs);
}
}
它里面有一个缓存 buffer,用来存储通过 作为观察者的 onNext()方法传进去的数据。
public void onNext(T t) {
if (done) {
return;
}
ReplayBuffer<T> b = buffer;
b.add(t);
for (ReplayDisposable<T> rs : observers.get()) {
b.replay(rs);
}
}
而调用它的 subscribe 方法后就会从 buffer 中取数据。具体怎么取需要看这个缓存的类型。
public static <T> ReplaySubject<T> createWithSize(int maxSize) {
return new ReplaySubject<T>(new SizeBoundReplayBuffer<T>(maxSize));
}
createWithSize 的用法在上面已经介绍过了,它是如何实现的呢?我们只需要看一下 SizeBoundReplayBuffer 的源码就知道了。当队列中的缓存过多时,链表就会删掉尾部最早插入的数据,而实现 subscribe 时能发生订阅前的 n 个数据。
void trim() {
if (size > maxSize) {
size--;
Node<Object> h = head;
head = h.get();
}
}
@Override
public void add(T value) {
Node<Object> n = new Node<Object>(value);
Node<Object> t = tail;
tail = n;
size++;
t.set(n); // releases both the tail and size
trim();
}
总结
Rxjava 的 组成、用法和原理基本上总结的差不多了。当然这不是一个结束,而是一个开始,当真正的了解到了它的强大,并在开发过程中使用它而得到了便利,这才是它真正的价值。