一、实现的思路
RxJava 提供了很多的变换操作符,将上游的数据转换成另一种数据,可以在传送数据流的过程中构造一个新的类,这个类即持有上游的 Observable 也持有下游的 Observer 和 变换的能力来实现
二、具体代码
1.创建 map 操作符
map 操作符本身持有 ObservableOnSubscribe 对象,这里将 ObservableOnSubscribe 交给一个 ObservableMap 让它持有了上游的能力。同时也持有了变换的能力
/**
* 自定义 map 操作符
* map 操作符会拿到 SelfObservableMap 所持有的 Observable 对象
* 交给一个 ObservableMap 对象 然后替换一个新的(持有 ObservableMap 这个 ObservableOnSubscribe 的引用,已经一个转换的函数)
* 即拥有控制上一层的能力 也拥有控制下一层的能力
* <p>
* 变换操作符只考虑上一层的类型 变换成新的类型后 给到下一层
*/
public <R> SelfObserverable<R> map(CFunction<? super T, R> function) {
ObservableMap observableMap = new ObservableMap(observable, function);
// 这里会把 source 替换成 ObservableMap
// 在 subscribe 时实际调用的 observable map 所持有的 source 的 subscribe 方案
return new SelfObserverable<>(observableMap);
}
2.ObservableMap 同时提供了 subscribe 函数,让它持有了下游的能力
/**
* 为 Map 专门定义的 ObservableOnSubscribe
* T 接收的类型 R 返回的类型
*
*/
public class ObservableMap<T,R> implements ObservableOnSubscribe<R> {
// 这里持有的是第一次通过 create 函数创建持有的 ObservableOnSubscribe 对象
private ObservableOnSubscribe observable;
private CFunction<? super T, ? extends R> function;
public ObservableMap(ObservableOnSubscribe source, CFunction<? super T, ? extends R> function) {
this.observable = source;
this.function = function;
}
/**
* observableEmitter 是外层通过 subscribe 函数传递进来的 Observer 类
* @param observer
*/
@Override
public void subscribe(Observer<? super R> observer) {
// SelfObserverable 调用 subscribe 函数时 实际上调用的是
MapObserver mapObserver = new MapObserver(observer, function);
// observable 是通过 create 或者 just 传递进来的 observable,现在交给了它一个 MapObserver
/**
* observable ----> create 的 new ObservableOnSubscribe
* // 使用 Map 操作符
* SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
* @Override
* public void subscribe(Observer<? super Integer> observableEmitter) {
* observableEmitter.onNext(1);
* observableEmitter.onComplete();
* }
* })
*
* 这个 observable.subscribe 调用的就是 ObservableOnSubscribe 的 subscribe 函数的方法。
* 会触发 MapObserver 类的 onNext 和 onComplete
*/
observable.subscribe(mapObserver);
}
3.在 subscribe 时又对 observer 做了进一步的包装,让被观察者 ObservableOnSubscribe 持有了 MapObserver 这个新的观察者类。在调用 onNext 时首先调用 CFunction 函数进行一次数据转换
/**
* 对 Observer 的包装
*
* @param <T>
*/
class MapObserver<T> implements Observer<T> {
Observer<? super R> observableEmitter;
ObservableOnSubscribe source;
CFunction<? super T, ? extends R> function;
/**
*
* @param observableEmitter 通过 subscribe 传递进来的观察者
* @param function function 是负责转换的函数
*/
public MapObserver(Observer<? super R> observableEmitter,
CFunction<? super T, ? extends R> function) {
this.observableEmitter = observableEmitter;
this.function = function;
}
@Override
public void onSubscirbe() {
observableEmitter.onSubscirbe();
}
@Override
public void onNext(T value) {
// 将转换后的值交给 onNext
R next = function.apply(value);
observableEmitter.onNext(next);
}
@Override
public void onComplete() {
observableEmitter.onComplete();
}
@Override
public void onError(Throwable throwable) {
observableEmitter.onError(throwable);
}
}
四、流程示例
五、使用示例
// 使用 Map 操作符
SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observableEmitter) {
observableEmitter.onNext(1);
observableEmitter.onComplete();
}
}).map(new CFunction<Integer, String>() {
@Override
public String apply(Integer integer) {
return "我被转换啦";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscirbe() {
Log.d("TAG", "on subscribe with map");
}
@Override
public void onNext(String value) {
Log.d("TAG", "我被转换啦");
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable throwable) {
}
});