什么是RxJava
在GitHub主页上的介绍:
a library for composing asynchronous and event-based programs using observable sequences for the Java VM
在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
本质上来说就是解决了原来繁琐的异步操作带来的代码不简洁,可读性非常低的问题,尤其在调度过程比较复杂的应用场景下。
RxJava 的原理简析
简单点来说 RxJava 的异步实现 就是观察者模式的通用实现,用过Android 事件处理的同学都知道 Android的事件处理模型就是基于观察者实现的。
<h5> RxJava Android 实现三步骤</h5>
//创建一个观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "Completed");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "Error");
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
};
//使用Observable.create()创建被观察者
Observable observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Wrold");
subscriber.onCompleted();
}
});
//订阅
observable1.subscribe(observer);
Observable
create() 最基本的创造事件序列:
just(T...):
将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[])
将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
just(T...) from(T[]) create(OnSubscribe) 创建出来的观察者都是等价的。
Subscribe
onStart() 在 subscribe 刚开始调用,可以用于做一些准备工作,例如数据的清零或重置。
onNext() 事件回调时调用 (相当于 onClick() / onEvent())
onCompleted() 事件队列完结触发onCompleted()方法。
onError() 事件队列异常。在事件处理过程中出异常时,onError()
在一个正确运行的事件序列中,onCompleted()和onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()
和onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
Action
Action 的出现 其实 是一种更简单观察者的实现方式 ,比如我们在处理业务逻辑时 只用了onNext,但是onError和onCompleted并没有用到,那其实另外两个是可以省略掉的。
Observable.just("One", "Two").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
Scheduler
RxJava在不指定线程的情况下,发起时间和消费时间默认使用当前线程,所有就油了Scheduler(线程控制器),可以指定每一段代码在什么样的线程中执行。
Observable.just("Hello", "Word")
.subscribeOn(Schedulers.newThread())//指定 subscribe() 发生在新的线程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
Schedulers.immediate()
直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread()
总是启用新线程,并在新线程执行操作。
Schedulers.io()
I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation()
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread()
它指定的操作将在 Android 主线程运行。
Observable(map、flatMap)
Map
有点类似于Spring 里面的拦截器,我们先假设一个需求,需要打印多个球员的比赛数据。
Action1<List<Match>> action = new Action1<List<Match>>() {
@Override
public void call(List<Match> matches) {
for (int i = 0; i < matches.size(); i++){
Log.i(TAG, matches.get(i).getName());
}
}
};
Observable.from(personList)
.map(new Func1<Person, List<Match>>() {
@Override
public List<Match> call(Person person) {
return person.getMatchesList();
}
}).subscribe(action1);
flatMap
一般用于输出一个Observable,而其随后的subscribe中的参数也跟Observable中的参数一样,注意不是Observable,一般用于对原始数据返回一个Observable,这个Observable中数据类型可以是原来的,也可以是其他的
List<Person> persons = new ArrayList<Person>();
Action1<List<Match>> action1 = new Action1<List<Match>>() {
@Override
public void call(List<Match> matches) {
for (int i = 0; i < matches.size(); i++){
Log.i(TAG, matches.get(i).getName());
}
}
};
Observable.from(persons)
.map(new Func1<Person, List<Match>>() {
@Override
public List<Course> call(Person person) {
//返回coursesList
return person.getMatchesList();
}
}).subscribe(action1);
递归转换
其它处理方法
filter 就是对集合进行过滤
each就是遍历集合
take取出集合中的前几个
skip跳过前几个元素
unique相当于按照数学上的集合处理,去重