最近再系统的整理了一下RxJava,感觉挺好用的,因为它,GitHub 上衍生了一堆比如 RxAndroid、RxBus、RxPermission 等之类的开源库。下面写写整理出来的文档。
另外使用场景文章Android RxJava之葵花宝典(下)(看我就够了)---使用场景
一、RxJava的介绍
1、RxJava是什么
在讲RxJava之前,先了解一下相关术语
响应式编程:一种面向数据流和变化传播的编程范式
不懂?那举个简单的例子,界面上的按钮,点击的时候会触发按钮的写好的点击事件。 我们不知道按钮什么时候会被点到,但是点到了就会通知app去触发你写好的事件,这个 “通知” 的过程就是RxJava的工作。
观察者模式:
观察者模式的基本需求:观察者和被观察者之间是完全分离的,当被观察者的状态发生变化之后,
通过Register(注册)
或者 Subscribe(订阅)
的方式,通知观察者。
RxJava 是一种函数式、响应式的异步操作库,它让你的代码更加简洁。
二、RxJava的重点基础
1、Observable
Observable [əb'zɜːvəbl]
可观察量,可观察量,可观测的。在观察者模式中称为“被观察者”
或“可观察对象”
。
//OnSubcriber是实现了一个Acton1接口的接口。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//每个Observable有一个final OnSubscribe<T> onSubscribe 成员,
//在该成员方法中调用call()方法,这个call方法的参数
//就是 Observable.subscribe() 方法传入的 Subsriber实例。
}
});
注意:在Rxjava中ActionX系列,其实就是无返回值的的接口
2、Observer
接收源,是观察者模式中的“观察者”
,可接收Observable、Subject
发射的数据。
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
//告知Observable没有更多的数据了,即没有新的onNext()发出时,就执行onCompleted()。
}
@Override
public void onError(Throwable e) {
//在事件处理过程中,出现了异常或者错误,就会被触发,同时整个队列将被终止,不再有事件发出。
}
@Override
public void onNext(String s) {
//实现方法跟Subscriber一模一样
}
};
在一个队列中,onCompleted()和onError() 都是最后触发的,而且两者中只有一个会被触发。
3、Subscriber
“订阅者”,也是接收源,那它跟Observer
有什么区别呢?Subscriber
实现了Observer
接口,比Observer
多了一个最重要的方法unsubscribe( )
,用来取消订阅
,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//实现方法跟Observer一模一样
}
@Override
public void onStart() {
// 它会在 Subscribe 刚开始,而事件还未发送之前被调用,
// 可以用于做一些准备工作,例如数据的清零或重置
super.onStart();
}
};
--------------------------------
//这是 Subscriber 所实现的另一个接口 Subscription 的方法.
//用于取消订阅。
subscriber.unsubscribe();
//这是 Subscriber 所实现的另一个接口 Subscription 的方法.
//用于判断当前是否订阅。
subscriber.isUnsubscribed();
4、Subscription
订阅。Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;
//被观察者
Observable<Integer> observableInteger = Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//观察者
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
//进行订阅
Subscription subscription = observableInteger.subscribe(subscriber);
Log.d(TAG, "subscription: " + subscription.isUnsubscribed() + ",Observable:" + subscriber
.isUnsubscribed());
Observable.from()
List<Integer> integers = new ArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
integers.add(4);
integers.add(5);
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.from(integers).subscribe(subscriber);
Observable.just()
将传入的参数依次发射出去。
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just(1,2,3,4,5).subscribe(subscriber);
just()中可以传入1-10个参数,并且将传入参数的顺序来发射出去。
5、Subject
是一个比较特殊的对象,既可充当发射源,也可充当接收源。
源码如下
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
//这个Subject即实现了Observer,又继承了Observable
...
}
所以 Subject = Observable + Observer
RxJava针对不同的场景提供四种不同的Subject
- PublishSubject
订阅之后的数据全部被发射。 - BehaviorSubject
订阅之前的一个和订阅之后的全部数据被发射 - ReplaySubject
不论订阅所处任何位置,都将发射全部数据 - AsyncSubject
不论订阅所处任何位置,只会发射最后一个数据
PublishSubject
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
PublishSubject<String> publishSubject = PublishSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: PublishSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: PublishSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
publishSubject.onNext("one");
publishSubject.onNext("two");
publishSubject.subscribe(subscriber);
publishSubject.onNext("three");
---------------------------------------
//打印结果
onNext: three
BehaviorSubject
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: BehaviorSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: BehaviorSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.onNext("three");
--------------------------------------------
onNext: default
onNext: one
onNext: two
onNext: three
如果把 behaviorSubject.subscribe(subscriber);放在倒数第二行
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("three");
---------------------------------
//打印结果
onNext: two
onNext: three
看到结果,我们不难看出,其实上面所说的发射最近所发射的数据,其实就是以
behaviorSubject.subscribe(subscriber);
为界,这句代码之前的一个和之后的所以发射。
当然,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
ReplaySubject
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。
ReplaySubject<String> replaySubject = ReplaySubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ReplaySubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ReplaySubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
replaySubject.subscribe(subscriber);
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
--------------------------
//打印
onNext: one
onNext: two
onNext: three
AsyncSubject
当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。
AsyncSubject<String> asyncSubject = AsyncSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: AsyncSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: AsyncSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
};
Subscription subscription = asyncSubject.subscribe(subscriber);
asyncSubject.onNext("one!");
asyncSubject.onNext("two!");
asyncSubject.onNext("three!");
asyncSubject.onCompleted();
-----------------------------------
//打印结果如下
onNext: three!
onCompleted: AsyncSubject Completed!
当然如果原始Observable没有发射任何值,AsyncObject也不发射任何值
AsyncSubject会把最后一个值发射给后续的观察者。
请注意:如果在AsyncSubject异常时,那么不会向观察者发射任何值,只会传递一个错误的通知。
6、Action0
RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推。
7、Func0
与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;
三、RxJava操作符
Scheduler(调度器)
subscribeOn()指定:Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。
ObserveOn()指定:一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法,
Subscriber subcriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
上面这段代码中,由于指定了1,2,3,4发射代码为Schedulers.io(),那么发射数据就将在io线程中执行。而onNext, onError和onCompleted则将在主线中执行。
Operators(操作符)
RxJava提供了几个mapping函数:map(),flatMap(),concatMap(),flatMapIterable()以及switchMap().所有这些函数都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。
map
map 是用于变换的一个操作符,这在RxJava中占据了一定的地位,就是因为它的变换操作。
Subscriber subcriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
})
.subscribe(subcriber);
在上面的代码中,我通过map将字符串转化成了整形的1,2,3,4,返回一个Observable的对象。
请注意:这个操作符默认不在任何特定的调度器上执行。
flatmap
Subscriber subcriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.parseInt(s)+1);
}
})
.subscribe(subcriber);
-------------------------------------------
//打印
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onCompleted: Completed!
从上面我们可以看出,map与flatmap很相似,都是用的Func1,而且模式都是<I,O>模式,即是:I转换成O并返回。但是最大的不同点在于:我们flatmap的输出类型是Observable的类型。
在这里请注意一个问题:在执行flatmap中返回之后(O输出返回的Observable),并不是立马把返回的Observable通过Subscribe进行订阅,而是将返回的若干Observables都交给同一个Observable,然后再进行subscribe。
所以,在上面我们先将字符串"1","2", "3", "4" 分别转换成一个整形的Observable类型,即是:Observable(2),Observable(3),Observable(4),Observable(5)
。然后将这些个Observables统一转换成一个Observable,再进行subscribe。
那么,这个flatmap到底有何用呢?可以用在什么地方呢?
假设这样一种情景:一个学校的老师我们定义为一个集合A,每个老师包括了个人信息和所教课程,一个老师不可能只教授一门课程,所以我们将老师所教授课程定义为集合B。如果让你打印每个老师所教课程,该怎么做?
Teacher[] teachers = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(teachers)
.flatMap(new Func1<Teacher, Observable<Course>>() {
@Override
public Observable<Course> call(Teacher teacher) {
return Observable.from(teacher.getCourses());
}
})
.subscribe(subscriber);
最后再补充一点:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。这意味着flatMap()函数在最后的Observable中不能够保证源Observables确切的发射顺序。
ConcatMap
RxJava的concatMap()函数解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。
repeat
让你发射的数据重复发射
Subscriber subcriber = new Subscriber<Integer>() {
...
}
};
Observable.just("1", "2","3")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.parseInt(s)+1);
}
})
.repeat(3)
.subscribe(subcriber);
--------------------------------------------
//打印输出
onNext: 2
onNext: 3
onNext: 4
onNext: 2
onNext: 3
onNext: 4
onNext: 2
onNext: 3
onNext: 4
onCompleted: Completed!
range
从起始点开始发射数据
Subscriber subcriber = new Subscriber<Integer>() {
...
};
Observable.range(10,3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
结果为:10,11,12。range(10,3),其中10 是起始,3是数量。
interval
在需要轮询的时候是最好的选择
Observable.interval(3,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
...
});
interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位。
take
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(4)
.subscribe(new Subscriber<Integer>() {
...
});
------------------------------
//打印输出
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
TakeLast
如果我们想要最后N个元素,我们只需使用takeLast()函数:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takelast(2)
.subscribe(new Subscriber<Integer>() {
...
});
--------------------------------------
//打印输出
Next: 7
Next: 8
Sequence complete.
五、RxJava的使用配置
1.build.gradle 添加依赖包
dependencies {
compile fileTree(include: ['*.jar'], dir: 'libs')
androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
exclude group: 'com.android.support', module: 'support-annotations'
})
compile 'com.android.support:appcompat-v7:25.1.1'
testCompile 'junit:junit:4.12'
/* 响应式*/
compile 'io.reactivex:rxjava:1.1.6'
// RxJava
compile 'io.reactivex:rxandroid:1.2.1'
/*retrofit*/
compile 'com.squareup.retrofit2:retrofit:2.2.0'//这个
// 如果要是用Gson转换的话,需要添加这个依赖
compile 'com.squareup.retrofit2:converter-gson:2.2.0'//这个
compile 'com.squareup.retrofit2:adapter-rxjava:2.2.0'//这个
compile 'com.squareup.okhttp3:okhttp:3.7.0'
}
2.build.gradle 支持JDK1.8(是为了用到Lambda表达式语言,这样可以更便捷的编程)
android {
...
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
如果出现以下提示,则在在build.gradle文件的defaultConfig中添加以下代码
//如果报错
Error:Jack is required to support java 8 language features. Either enable Jack or remove sourceCompatibility JavaVersion.VERSION_1_8.
android {
defaultConfig {
//添加以下代码
jackOptions {
enabled true
}
}
}
3.build.gradle 添加表达式语言的插件。
apply plugin: 'me.tatarka.retrolambda'
如果这里报错Error:(2, 0) Plugin with id 'me.tatarka.retrolambda' not found.还需要在项目的build.gradle中的dependencies节点中添加
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.2.2'
classpath 'me.tatarka:gradle-retrolambda:2.5.0'//添加这里
// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
}