1. RxJava2 : 什么是观察者模式
2. RxJava2 : 创建操作符(无关时间)
3. Rxjava2 : 创建操作符(有关时间)
4. Rxjava2 : 变换操作符
5. Rxjava2 : 判断操作符
6. Rxjava2 : 筛选操作符
7. Rxjava2 : 合并操作符
8. Rxjava2 : do操作符
9. Rxjava2 : error处理
10. Rxjava2 : 重试
11. Rxjava2 : 线程切换
api | use |
---|---|
concat / concatArray | {{concat}} |
concatDelayError | {{concatDelayError}} |
merge / mergeArray | {{merge}} |
mergeDelayError | {{mergeDelayError}} |
zip | {{zip}} |
reduce | {{reduce}} |
collect | {collect} |
startWith / startWithArray | {{startWith}} |
concat / concatArray
- concat
1.concat的参数不得超过4个(不能超过4个Observable)
2.concat是按照顺序合并的 - concatArray
concatArray如果参数超过4个(就将Observable组成集合传入),剩下的均与concat相同
流程
3个Observable一共有9个元素
Observable0(元素1,元素2,元素3)
Observable1(元素4,元素5,元素6)
Observable2(元素7,元素8,元素9)
->
按照顺序合并元素
(元素1,元素2,元素3,元素4,元素5,元素6,元素7,元素8,元素9)
->
subscribe
->
observer 观察到9个元素
Observable<Integer> observable0 = Observable.just(1, 2, 3);
Observable<Integer> observable1 = Observable.just(4, 5, 6);
Observable<String> observable2 = Observable.just("7", "8", "9");
Observable.concat(observable0, observable1, observable2)
.subscribe(new Observer<Serializable>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Serializable serializable) {
if (serializable instanceof Integer) {
Log.d(TAG, "Integer:" + serializable);
} else if (serializable instanceof String) {
Log.d(TAG, "String:" + serializable);
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:1
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:2
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:3
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:4
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:5
02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:6
02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:7
02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:8
02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:9
02-12 17:42:57.603 6230-6230/... D/SplashActivity: onComplete
concatDelayError
- concatDelayError
1.将onError放在了最后执行,那么onError就无法作为结束标识了,只能依靠onComplete作为结束标识
2.能否执行onError取决于能否找到onComplete标识
3.采用concat的方式,即按照元素顺序合并,Observable0的complete失去作用,能否执行onError就取决于Observable1是否有结束标识
4.以onError结束,则不会执行onComplete
流程
Observable
->
Observable0(元素1,元素2,元素3,throwable,complete)
Observable1(元素4,元素5,complete)
->
元素1,元素2,元素3,元素4,元素5,complete
->
元素1,元素2,元素3,元素4,元素5,throwable
->
subscribe
->
observer
->
log throwable
//Observable1具有e.onComplete()结束标识
Observable<Integer> observable = Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onComplete();
});
Observable<Integer> observable2 = Observable.create(e -> {
e.onNext(4);
e.onNext(5);
e.onComplete();
});
List<Observable<Integer>> objects = new ArrayList<>();
objects.add(observable);
objects.add(observable2);
Observable.concatDelayError(objects)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:1
02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:2
02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:3
02-12 17:57:01.093 7281-7281/... D/SplashActivity: integer:4
02-12 17:57:01.093 7281-7281/... D/SplashActivity: integer:5
02-12 17:57:01.093 7281-7281/... D/SplashActivity: onError
//Observable1不具有e.onComplete()结束标识
Observable<Integer> observable = Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onComplete();
});
Observable<Integer> observable2 = Observable.create(e -> {
e.onNext(4);
e.onNext(5);
});
List<Observable<Integer>> objects = new ArrayList<>();
objects.add(observable);
objects.add(observable2);
Observable.concatDelayError(objects)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:1
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:2
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:3
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:4
02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:5
merge
- merge
1.merge的参数不得超过4个(不能超过4个Observable)
2.merge是按照时间合并的 - mergeArray
mergeArray如果参数超过4个(就将Observable组成集合传入),剩下的均与concat相同
流程
时间(秒) | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
---|---|---|---|---|---|---|---|---|---|
Observable0 | 没发送 | 没发送 | 1 | 没发送 | 没发送 | 2 | 没发送 | 没发送 | 3 |
Observable1 | 没发送 | 4 | 没发送 | 没发送 | 5 | 没发送 | 没发送 | 6 | 没发送 |
Observable2 | 没发送 | 7 | 没发送 | 没发送 | 8 | 没发送 | 没发送 | 9 | 没发送 |
依照时间来合并:
则是 4,7,1,5,8,2,6,9,3
Log.d(TAG, "in");
Observable<Long> longObservable = Observable
.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS);
Observable<Long> longObservable1 = Observable
.intervalRange(4, 3, 1, 2, TimeUnit.SECONDS);
Observable<Long> longObservable2 = Observable
.intervalRange(7, 3, 1, 2, TimeUnit.SECONDS);
Observable
.merge(longObservable, longObservable1,longObservable2)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "aLong:" + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-13 09:33:04.121 13228-13228/... D/SplashActivity: in
02-13 09:33:05.121 13228-13271/... D/SplashActivity: aLong:4
02-13 09:33:05.121 13228-13272/... D/SplashActivity: aLong:7
02-13 09:33:06.131 13228-13270/... D/SplashActivity: aLong:1
02-13 09:33:07.131 13228-13271/... D/SplashActivity: aLong:5
02-13 09:33:07.131 13228-13272/... D/SplashActivity: aLong:8
02-13 09:33:08.131 13228-13270/... D/SplashActivity: aLong:2
02-13 09:33:09.131 13228-13271/... D/SplashActivity: aLong:6
02-13 09:33:09.131 13228-13272/... D/SplashActivity: aLong:9
02-13 09:33:10.121 13228-13270/... D/SplashActivity: aLong:3
02-13 09:33:10.121 13228-13270/... D/SplashActivity: onComplete
mergeDelayError
- mergeDelayError
与concatDelayError类似,onError会在最后接收到
区别在于:
concatDelayError:
会以最后一个onComplete作为结束的标识
mergeDelayError:
在时间过程当中,哪个Observable出现了error,哪个Observable就不再继续发送了,直至所有Observable的元素都结束之后,处理onError
流程
时间 | 0 | 1 | 2 |
---|---|---|---|
Observable0 | 1 | throwable | 2 |
Observable1 | 3 | 4 | 5,complete |
合并 | 1,3 | throwable,4 | error结束,5,complete |
则最后的结果为: 1,3,4,5,onError;
注意:在示例中,我采取了指定不同的线程做延时,如果在相同线程中,会出现卡线程的情况,会影响到结果
Log.d(TAG, "in");
Observable<Long> longObservable = Observable.create((ObservableOnSubscribe<Long>) e -> {
e.onNext(1L);
Thread.sleep(1000);
e.onError(new NullPointerException());
Thread.sleep(1000);
e.onNext(2L);
e.onComplete();
}).subscribeOn(Schedulers.newThread());
Observable<Long> longObservable1 = Observable.create((ObservableOnSubscribe<Long>) e -> {
e.onNext(3L);
Thread.sleep(1000);
e.onNext(4L);
Thread.sleep(1000);
e.onNext(5L);
e.onComplete();
}).subscribeOn(Schedulers.newThread());
Observable
.mergeDelayError(longObservable, longObservable1)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "aLong:" + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-13 10:03:26.701 16418-16418/... D/SplashActivity: in
02-13 10:03:26.711 16418-16463/... D/SplashActivity: aLong:1
02-13 10:03:26.711 16418-16464/... D/SplashActivity: aLong:3
02-13 10:03:27.721 16418-16464/... D/SplashActivity: aLong:4
02-13 10:03:28.711 16418-16464/... D/SplashActivity: aLong:5
02-13 10:03:28.711 16418-16464/... D/SplashActivity: onError
zip
- zip
必须严格遵守一一对应的关系
流程
observable | 元素 | 元素 | 元素 | 元素 |
---|---|---|---|---|
observable0 | 1 | 2 | 3 | 无 |
observable1 | 4 | 5 | onComplete | 6 |
zip合并(相加) | 5 | 7 | onComplete | 碰到onComplete标识,已经结束 |
最终的结果 5,7
Observable<Integer> observable = Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
});
Observable<Integer> observable1 = Observable.create(e -> {
e.onNext(4);
e.onNext(5);
e.onComplete();
e.onNext(6);
});
Observable
.zip(observable, observable1,
(integer, integer2) -> integer + integer2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-01 11:25:53.581 11233-11233/... D/ShellAppApplication: integer:5
02-01 11:25:53.581 11233-11233/... D/ShellAppApplication: integer:7
Observable<Integer> observable = Observable.create(e -> {
e.onError(new NullPointerException());
e.onNext(2);
e.onNext(3);
});
Observable<Integer> observable1 = Observable.create(e -> {
e.onNext(4);
e.onNext(5);
e.onComplete();
e.onNext(6);
});
Observable
.zip(observable, observable1,
(integer, integer2) -> integer + integer2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-13 10:34:54.131 17980-17980/... D/SplashActivity: onError
reduce
- reduce
对于同一个Observable当中的所有元素逐一处理,合并为同一个元素
流程
observable(元素1,元素2,元素3)
->
reduce
->
apply (元素1,元素2) -> 元素X
apply(元素X,元素3) -> (元素X +元素3) 作为新的元素X
递归,直至最后一个
->
one observable
->
subscribe
Disposable subscribe = Observable.just(1, 2, 3).reduce((integer, integer2) -> {
Log.d(TAG, "first:" + integer);
Log.d(TAG, "second:" + integer2);
return integer + integer2;
}).subscribe(integer -> Log.d(TAG, "integer:" + integer));
log
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: first:1
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: second:2
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: first:3
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: second:3
02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: integer:6
collect
- collect
收集
1.指定一个容器
2.将所有的元素收集到这个容器当中
Disposable subscribe = Observable.just(1, 2, 3)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer integer) throws Exception {
list.add(integer);
}
})
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.d(TAG, "integers:" + integers);
}
});
log
02-13 10:49:49.061 18664-18664/... D/SplashActivity: integers:[1, 2, 3]
startWith
- startWith
在最开始的时候插入
1.支持插入单一元素
2.支持插入Observable
Observable.just(1, 2, 3)
.startWith(0)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:0
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:1
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:2
02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:3
02-13 10:53:59.151 18992-18992/... D/SplashActivity: onComplete
Observable.just(1, 2, 3)
.startWith(Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:4
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:5
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:6
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:1
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:2
02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:3
02-13 10:55:14.241 19238-19238/... D/SplashActivity: onComplete