关于合并数据源:之前使用了flatMap()以及concatMap()进行嵌套调用,,注册之后登陆
合并数据源2:合并数据(获取图书详情以及评论)统一展示到客户端:采用merge()或者zip()操作符
merge()例子:实现较为简单的从(网络+本地)获取数据,,统一展示
zip()例子:结合
Retrofit
以及Rxjava,实现多个网络请求合并获得数据,,统一展示
二者区别为:merge()只添加被观察者合并数据源的操作在observable观察者的onnext()里面处理,进行合并,合并的结果在
onComplete()
处理,zip()可以直接添加发射者,再添加合并数据源的bean,在转主线程,订阅,可以使用new Consumer<Bean>() )
里面处理合并结果
/**
* 合并发射者,按时间线执行
* 合并事件,还是merge()比较方便好用
*/
String resultss = "数据源来自:";
private void merge() {
// Observable.merge(
// //延迟发送操作符
// //从0开始发送,发送3个数据,第一次发件延迟时间1秒。间隔时间1s
// //
// Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
// Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
// ).subscribe(aLong -> {
//
// });
Observable.merge(
Observable.just("网络"),
Observable.just("本地文件")
).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
resultss += s;
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
KLog.d(TTAG, "接收完成统一处理事件:" + resultss);
}
});
}
下面使用zip操作:
/**
* 合并数据源
*/
private void zip() {
Observable.zip(
retrofitApi.getCall().subscribeOn(Schedulers.io()),
retrofitApi.getCall().subscribeOn(Schedulers.io()),
(translation, translation2) ->
translation.toString() + translation2.toString())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
KLog.d(TTAG, "合并的数据源是:" + s.toString());
}, throwable -> {
});
}
concat()实例
/**
* 该类型的操作符的作用 = 组合多个被观察者
* 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
* concat()
* concatArray()
*
* 实例:从内存以及磁盘和网络获取缓存
*/
String memoryCache = null;
String diskCache = "磁盘缓存数据";
private void concat() {
Observable.concat(
Observable.create(emitter -> {
//判断内存是否含有缓存
if (null == memoryCache) {
emitter.onComplete();
} else {
emitter.onNext(memoryCache);
}
}),
Observable.create(emitter -> {
//判断磁盘
if (null == diskCache) {
emitter.onComplete();
} else {
emitter.onNext(diskCache);
}
}),
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("从网络获取缓存数据");
})
//通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
).firstElement()
// 即本例的逻辑为:
// a. firstElement()取出第1个事件 = memory,即先判断内存缓存中有无数据缓存;由于memoryCache = null,即内存缓存中无数据,所以发送结束事件(视为无效事件)
// b. firstElement()继续取出第2个事件 = disk,即判断磁盘缓存中有无数据缓存:由于diskCache ≠ null,即磁盘缓存中有数据,所以发送Next事件(有效事件)
// c. 即firstElement()已发出第1个有效事件(disk事件),所以停止判断。
.subscribe(s -> {
KLog.d(TTAG, "缓存获得路径是:" + s.toString());
});
}
combineLatest()实例
进行多个输入框判断,有一个为空时按钮不可点击,都不为空时才可以点击(并且改变输入框颜色)
/**
* 通过combineLatest()合并事件 & 联合判断
* <p>
* 当两个Observables中的任何一个发送了数据后,
* 将先发送了数据的Observables 的最新(最后)一个数据 与
* 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
*/
private void init() {
nameObser = RxTextView.textChanges(name).skip(1);
ageObser = RxTextView.textChanges(age).skip(1);
jobObser = RxTextView.textChanges(job).skip(1);
Observable.combineLatest(nameObser, ageObser, jobObser,
(charSequence, charSequence2, charSequence3) -> {
boolean nameIsNOtEmpty = !TextUtils.isEmpty(name.getText());
// boolean nameIs = !TextUtils.isEmpty(name.getText()) && name.getText().toString().length() <= 10;
boolean ageIsNotEmpty = !TextUtils.isEmpty(age.getText());
boolean jobIsNotEmpty = !TextUtils.isEmpty(job.getText());
return nameIsNOtEmpty && ageIsNotEmpty && jobIsNotEmpty;
}
).subscribe(aBoolean -> {
KLog.d(TTAG, "点击结果是:" + aBoolean);
push.setEnabled(aBoolean);
});
}
有条件的轮询操作:
使用关键字:repeatWhen
// 设置变量 = 模拟轮询服务器次数
private int i = 0 ;
/**
* 有条件的轮询
* 使用操作符:repeatWhen
*/
private void init3() {
RetrofitApi retrofitApi = OkHttpUtils.newInstance().create(RetrofitApi.class);
retrofitApi.getCall()
.repeatWhen(objectObservable -> {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
// 此处有2种情况:
// 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
// 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
return objectObservable.flatMap((Function<Object, ObservableSource<?>>) o -> {
// 加入判断条件:当轮询次数 = 5次后,就停止轮询
if (i>3){
return Observable.error(new Throwable("轮询结束"));
}
// 若轮询次数<4次,则发送1Next事件以继续轮询
// 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
});
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
translation.show();
i++;
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
有条件的网络请求出错,重试,可以设置条件
/**
* 请求出错去重复查询,可以设置条件
* 使用操作符:retryWhen
* 发送网络请求 & 通过retryWhen()进行重试
* 主要异常才会回调retryWhen()进行重试
* 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
*/
// 设置变量
// 可重试次数
private int maxConnectCount = 10;
// 当前已重试次数
private int currentRetryCount = 0;
// 重试等待时间
private int waitRetryTime = 0;
private void init4() {
retrofitApi.getCall()
.retryWhen(throwableObservable ->
throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {
if (throwable instanceof IOException) {
if (currentRetryCount < maxConnectCount) {
currentRetryCount++;
waitRetryTime = 1000 + currentRetryCount * 1000;
return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
} else {
return Observable.error(new Throwable("超过重试次数:" + currentRetryCount));
}
} else {
return Observable.error(new Throwable("发生异常,非网络"));
}
}))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
translation.show();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
有关过滤操作符
ofType
/**
* 过滤操作符
*/
private void useOfType() {
Observable.just(1, "asd", 2, 3, 4, "qwe")
.ofType(Integer.class)
.subscribe(integer -> {
KLog.d(TTAG, "获得的整型消息事件是:" + integer);
});
}
Skip,,,SkipLast
/**
* 跳转开头和跳过结尾消息
*/
private void userSkipAndSkipLast() {
// 使用1:根据顺序跳过数据项
Observable.just(1, 2, 3, 4, 5)
.skip(1) // 跳过正序的前1项
.skipLast(2) // 跳过正序的后2项
.subscribe(integer -> KLog.d(TTAG, "获取到的整型事件元素是: " + integer));
// 使用2:根据时间跳过数据项
// 发送事件特点:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
.skip(1, TimeUnit.SECONDS) // 跳过第1s发送的数据
.skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
.subscribe(along -> KLog.d(TTAG, "获取到的整型事件元素是: " + along));
}
throttleFirst(),,throttleLast()
在某段时间内,只发送该段时间内第1次事件 / 最后1次事件
<<- 在某段时间内,只发送该段时间内第1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// 隔段事件发送时间
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(400);
e.onNext(3);
Thread.sleep(300);
Thread.sleep(300);
e.onComplete();
}
}).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用数据
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<<- 在某段时间内,只发送该段时间内最后1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// 隔段事件发送时间
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(400);
Thread.sleep(300);
e.onComplete();
}
}).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用数据
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
KLog.d(TTAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
KLog.d(TTAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
K Log.d(TTAG, "对Complete事件作出响应");
}
});
实际应用:规定时间内,多次点击按钮禁止多次操作使用throttleFirst,操作符
RxView.clicks(button)
.throttleFirst(2, TimeUnit.SECONDS) // 才发送 2s内第1次点击按钮的事件
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Sample()实例应用实时搜索
在某段时间内,只发送该段时间内最新(最后)1次事件,与 throttleLast() 操作符类似
throttleWithTimeout () / debounce()
发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
RxTextView.textChanges(ed)
.debounce(1, TimeUnit.SECONDS)
.skip(1) //跳过 第1次请求 = 初始输入框的空字符状态
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CharSequence>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(CharSequence charSequence) {
tv.setText("发送给服务器的字符 = " + charSequence.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
firstElement() ,, lastElement()
仅选取第1个元素 ,,最后一个元素
// 获取第1个元素
Observable.just(1, 2, 3, 4, 5)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
KLog.d(TTAG,"获取到的第一个事件是: "+ integer);
}
});
// 获取最后1个元素
Observable.just(1, 2, 3, 4, 5)
.lastElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
KLog.d(TTAG,"获取到的最后1个事件是: "+ integer);
}
});
elementAt()
指定接收某个消息,根据索引,可以设置默认消息
private void userEleMentAt() {
// 使用1:获取位置索引 = 2的 元素
// 位置索引从0开始
Observable.just(1, 2, 3, 4, 5)
.elementAt(2)
.subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
// 使用2:获取的位置索引 > 发送事件序列长度时,设置默认参数
Observable.just(1, 2, 3, 4, 5)
.elementAt(6,10)
.subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
}
elementAtOrError()
在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常
private void userElementAtOrError() {
Observable.just(1, 2, 3, 4, 5)
.elementAtOrError(6)
.subscribe(integer -> KLog.d(TTAG,"获取到的事件元素是: "+ integer));
}