严格来说,create应不算操作符,但是第一篇文章我还是希望能先以最简单的入门。这样再去学习接下来的操作符的话会更加简单,由浅入深。
来段小代码
class ObservaleActivity : AppCompatActivity() {
var sub: Subscription? = null
var observable: Observable<String>? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_common)
btSub.setOnClickListener({
/**
* 暂时都不考虑 onComplete onError 只做onNext的处理,由浅入深
*/
observable?.subscribe({ msg ->
tvContent.text = tvContent.text.toString() + "\n" + msg
})
})
/**
* 最简单的使用方式
*/
observable = Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test1")
}
})
}
//这里需要去解除注册,防止内存泄漏
override fun onDestroy() {
super.onDestroy()
sub?.unsubscribe()
}
}
这里直接先贴出整个Activity
的代码,给大家一个印象,在以后的话就不再直接贴全代码,而是贴关键代码。
注意:一般来说,是需要在onDestroy的时候去unsubscribe,防止内存泄漏
先了解重要的接口和类
在阅读源码的时候,我get到了几个重要的方法
1. 不需要每行都去搞懂
2. 抓住最重要的类和接口去搞懂
这里比较重要的接口和类分别是:
接口或类名 | |
---|---|
OnSubscribe |
实现了Action接口,只有一个call方法,在subscribe时候才会去调用 |
Subscriber |
调用subscribe所传的参数,里面是onNext,onComplete,onError等方法 |
Observable |
所有的操作符都在这个类里面,我认为最重要的其实是OnSubscribe和Subscriber,而Observable其实是作为一个桥梁来链接这2者 |
Subscription |
这个是调用subscribe方法的返回值,用来取消这次订阅 |
我阅读RxJavau 源码的时候,最让我头疼的一个地方其实就是它的命名,很多名字起的都起的差不多,所以搞得很乱。
这里我专门提出这4个接口或类,只要能把这几个理清楚,其实就差不多。下面我们深入源码去理解。
看看源代码
Observable.create方法
Observable
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
RxJavaHooks
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
这2段代码涉及到了刚才提到的Observable
,OnSubscribe
。
Observable.create(OnSubscribe)
其实只是new了一个Observable然后把OnSubscribe作为参数传入,这里的RxJavaHooks大家可以先忽略,这个主要是做全局处理使用的,如果没有全局设置的话,onObservableCreate = null
,会直接在第二个方法直接返回你传入的OnSubscribe参数。
Observable.subscribe方法
Observable
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
subscribe重载方法比较多,我把我们demo中涉及到的3个重载列出。
在demo中我们使用的是直接传入了一个Action
对象处理了onNext的事件,在源码中,Observable会把我们的Action
包装成一个Subscriber
对象,添加上onError
,onComplete
等方法的默认实现。
最终调用的其实是subscribe
的第三个重载方法。
首先判断下参数Subscriber
对象是否是SafeSubscriber
对象的子类,如果不是,那么就重新new一个SafeSubscriber
替换掉当前Observable
对象的subscriber
变量。
注意:这里的
SafeSubscriber
大家可以先忽略,我们先看完完整的流程,在最后再为大家来看看SafeSubscriber
最重要的代码
...
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
...
RxJavaHooks
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
这里又出现了RxJavaHooks
,大家可以直接忽略看最后返回的参数是什么,直接返回的是传入的参数OnSubscribe
对象。这里的RxJavaHooks
也是用来做全局处理的作用,所以我们可以先忽略,简单的来看整个流程。
所以上面所提到的最终的代码,其实可以简化为
observable.onSubscribe.call(subscriber)
大家可以看到,其实这就是最关键的代码,也可以看出 Observable
这个对象的作用,其实就是用来链接OnSubscribe
和Subscriber
对象。
那么我们重新来看看Activity
中的关键的代码
...
observable?.subscribe({ msg ->
tvContent.text = tvContent.text.toString() + "\n" + msg
})
...
observable = Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test1")
}
})
...
应该对整个流程有一个简单的认识了。
附加知识点
SafeSubscriber
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
...
}
}
}
@Override
public void onError(Throwable e) {
if (!done) {
done = true;
_onError(e);
}
}
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
...
}
主要作用: 对原有的
Subscriber
对象包装一下。前面刚学RxJava的时候,看到有些文章说,onError
和onComplete
只有一个会运行,其实从这里可以看出。
总结
RxJava
中核心的设计模式我认为是装饰者模式
,所以在阅读源码的时候也非常的累,因为RxJava
的链式结构,大家可以想象下如果4,5个操作符链式书写,然后一步步跟入源码会很难受的,完全不知道当前是哪个OnSubscribe
和Subscriber
。
当然只要我们能搞清楚1层结构的流程,那么后面的3,4层结构也能理解,只是稍微麻烦点而已。