上一篇文章 写了一个极其简化的 Rxjava Observable ,现在,我试图添加一个 map
操作符。
public <R> Observable<R> map(Func1<T, R> func1) {
return new Observable<R>(subscriber -> this.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
R r = null;
try {
r = func1.call(t);
} catch (Throwable e) {
unsubscribe();
return;
}
subscriber.onNext(r);
}
@Override
public void unsubscribe() {
subscriber.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return subscriber.isUnsubscribed();
}
}));
}
Java 本身语言限制,导致代码臃肿。代码的核心部分就是
return new Observable<R>(subscriber -> this.subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {
R r = null;
try {
r = func1.call(t);
} catch (Throwable e) {
unsubscribe();
return;
}
subscriber.onNext(r);
}
}
这里看到以下几点
-
map
接收两个参数 ,注,对于成员函数,第一个参数是this
。 - 第一个参数是
Observable<T> this
, - 第二个参数是
Func1<T,R> func1
;-
func1
接收一个参数T
-
func1.call(t)
返回一个R
-
-
map
要返回一个Observable<R>
,那么就要在OnSubscribe
的时候,需要从this
里面得到一个个T t
,然后用func1.call(t)
,然后转移给下一个subscriber
。
因为 Java8 lambda 关键字的引入,我们看到函数式编程中的 variable capture 的强大。
这是一个非常简化的 map
实现,还有很多问题。
- 还有非常多的操作符和
map
很类似,这里有很多重复代码。 - backpressure 没有处理。
-
unsubscribe
还没有处理好。subscriber 链的关系没有处理。 - 异常也没有处理好。
- 没有保证
onComplete
只被调用一次 。
这个简化的实现尽管有很多问题,但是可以帮助我们理解原有复杂完整的实现。Map
的核心结构是这样
- 本身含有一个
Subscriber
对象,订阅上层的Observable
- 返回一个
Observable
对象,提供给下层订阅。 - 这种方法组合了
Observable
,构成了一个链条。
OnSubscribe<?> onSubscribe = subscriber /*传递进来的 subscriber 参数,给下层产生数据*/
-> {
/* this 是上层的 Observable,订阅上层 */
this.subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {
R r = null;
try {
r = func1.call(t);
} catch (Throwable e) {
unsubscribe();
return;
}
/* 当上层产生数据的时候,经过转换,传递给下层*/
subscriber.onNext(r);
}
};
Observable<R> ret = new Observable<R>(onSubscribe);
return ret;
}