3.组合
(1) Merge
merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你可以简单的将它理解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。
String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
Observable<String> merge = Observable.merge(Observable.from(array1),Observable.from(array2));
merge.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","输出结果:"+s);
}
});
结果输出:
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:kpioneer
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Tiger
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Cook
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Zhang
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Haocai
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:WangWu
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Zhangsan
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Lisi
08-08 08:50:12.518 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Luo
在Observable中,一旦某一个事件抛异常,后面的序列将会终止
注意:mergeDelayError如果你希望在序列中出错的时候,不影响后面的序列,那么可以使用mergeDelayError方法
(2) Zip
zip(Observable, Observable, Func2)用来合并两个Observable发射的数据项,根据Func2函数生成一个新的值并发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停在发射数据。
简单来说zip操作符就是合并多个数据流,
然后发送(Emit)最终合并的数据。
流程图:
String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
//1.合并算法自己决定
//2.序列长度有原始的最小数组的长度决定
Observable<String> zip = Observable.zip(Observable.from(array1),Observable.from(array2), new Func2<String, String, String>() {
@Override
public String call(String s, String s2) {
//这个Func是我们合并数组算法
return s+"--"+s2;
}
});
zip.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","输出结果:"+s);
}
});
结果输出:
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:kpioneer--WangWu
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:Tiger--Zhangsan
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:Cook--Lisi
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:Zhang--Luo
同时发现Haocai 那项没有输出 以最小数组为单位
String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
Integer[] array3 = { 100, 1000, 10000 };
Observable<String> zip = Observable.zip(Observable.from(array1),Observable.from(array2),Observable.from(array3), new Func3<String, String,Integer, String>() {
@Override
public String call(String s, String s2, Integer integer) {
return s+"---"+s2+"---"+integer;
}
});
zip.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","输出结果:"+s);
}
});
结果输出:
08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: 输出结果:kpioneer---WangWu---100
08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: 输出结果:Tiger---Zhangsan---1000
08-08 10:02:06.520 4330-4330/com.haocai.architect.rxjava E/main: 输出结果:Cook---Lisi---10000
(3) Join
前面两个方法,zip()和merge()方法作用在发射数据的范畴内,在决定如何操作值之前有些场景我们需要考虑时间的。RxJava的join()函数基于时间窗口将两个Observables发射的数据结合在一起。
join(Observable, Func1, Func1, Func2)我们先介绍下join操作符的4个参数:
Observable:源Observable需要组合的Observable,这里我们姑且称之为目标Observable;
Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据组合后返回。
所以Join操作符的语法结构大致是这样的:onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)
String[] array1 = {"A","B","C"};
String[] array2 = {"1","2"};
Observable<String> observable1 = Observable.from(array1);
Observable<String> observable2 = Observable.from(array2);
Observable<String> join = observable1.join(observable2, new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(2,TimeUnit.SECONDS);
}
}, new Func2<String,String,String>() {
@Override
public String call(String o, String o2) {
return o +"---" +o2;
}
});
join.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","输出结果:"+s);
}
});
结果输出:
08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:A---1
08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:B---1
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:C---1
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:A---2
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:B---2
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:C---2
4.线程
(1)Scheduler
默认情况下,RxJava遵循线程不变原则。即:在哪个线程调用subscribe()方法,就在哪个线程生产事件,在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器)。
在RxJava中,Scheduler相当于线程控制器,RxJava通过它来指定每一段代码运行在什么线程中。RxJava内置了几个Scheduler,适合大多数使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
Android 专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn()和 observeOn()两个方法来对线程进行控制了。
subscribeOn(): 指定subscribe()(订阅/注册)所发生的线程,即 Observable.OnSubscribe被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定Subscriber(观察者)所运行在的线程。或者叫做事件消费的线程。
文字叙述总归难理解,上代码:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程(即 Action1对象)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面这段代码中,由于 subscribeOn(Schedulers.io())的指定,被创建的事件的内容 1、2、3、4将会在 IO 线程发出;而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber数字的打印将发生在主线程 。事实上,这种在subscribe()之前写上两句 subscribeOn(Scheduler.io())和 observeOn(AndroidSchedulers.mainThread())的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
已加载图片为例:
加载图片传统方式写法:
private ImageView iv_image;
private static String URL_STR ="http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_scheduler);
iv_image = (ImageView)findViewById(R.id.iv_image);
loadImage();
}
//传统方式:下载图片
//方案一: Thread+Handler
//方案二: AsyncTask
private Bitmap download(String urlString) {
try {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url
.openConnection();
InputStream inputStream = connection.getInputStream();
return BitmapFactory.decodeStream(inputStream);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private void loadImage(){
new DownloadTask().execute();
}
class DownloadTask extends AsyncTask<Void,Void,Bitmap>{
@Override
protected Bitmap doInBackground(Void... params) {
return download(URL_STR);
}
@Override
protected void onPostExecute(Bitmap bitmap) {
super.onPostExecute(bitmap);
iv_image.setImageBitmap(bitmap);
}
}
RxJava方式写法:
private static String URL_STR = "http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";
private ImageView iv_image;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_scheduler);
iv_image = (ImageView) findViewById(R.id.iv_image);
rxJavaLoadImage();
}
//RxJava实现下载
private void rxJavaLoadImage() {
//Url地址变换成Bitmap
//我们需要指定这些事件的执行所在的线程
Observable.just(URL_STR).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
return download(s);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())//需要引入RxAndroid库
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
//更新UI
iv_image.setImageBitmap(bitmap);
}
});
}
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。
注意:
1.在RxJava中整个流程分为事件的生产和消费
2.RxJava整体架构分为4个角色:Observable、Observer、Subscriber、Subjects
其中Observables和Subjects是两个“生产”实体,Observers和Subscribers是两个“消费”实体
subscribeOn和observeOn线程控制的区别?--线程切换
1.subscribeOn:指定生产事件所在的线程
observeOn :指定消费事件所在的线程
2.subscribeOn:按照顺序执行序列,作用于他前后的序列,直到遇到observeOn才切换新的线程
onserveOn :按照顺序执行序列,只能作用于他之后的序列
注意:subscribeOn可以在序列中调用(执行)多次,但是前提条件(在生产序列之前调用,在访问网络之前,我们需要初始化一些UI)
例子证明:
//RxJava实现下载
private void rxJavaLoadImage() {
//Url地址变换成Bitmap
//我们需要指定这些事件的执行所在的线程
Observable.just(URL_STR).doOnSubscribe(new Action0() {
@Override
public void call() {
// 在执行生产事件之前,回调方法,我们需要做一些初始化工作,而这个工作可以在子线程,也可以在主线程
// 更新UI必须在主线程
iv_image.setVisibility(View.VISIBLE);
// 第一步:初始化
Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
}
}).subscribeOn(AndroidSchedulers.mainThread()).observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
return download(s);
}
}).map(new Func1<Bitmap, Bitmap>() {
@Override
public Bitmap call(Bitmap t) {
Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
// 加水印、裁剪、灰度处理等等(图像处理相关)......
// 第三步:图像处理
return t;
}
}).observeOn(AndroidSchedulers.mainThread())//需要引入RxAndroid库
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
//更新UI
iv_image.setImageBitmap(bitmap);
}
});
}
结果输出:
08-09 06:53:29.852 17616-17616/com.haocai.architect.rxjava E/main: 当前线程状态 : main
08-09 06:53:29.854 17616-17647/com.haocai.architect.rxjava E/main: 当前线程状态 : RxIoScheduler-2
08-09 06:53:29.983 17616-17647/com.haocai.architect.rxjava E/main: 当前线程状态 : RxIoScheduler-2
08-09 06:53:30.072 17616-17616/com.haocai.architect.rxjava E/main: 当前线程状态 : main
#其中main表示主线程, RxIoScheduler表示子线程