...
Single.just(1).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.single())
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String value) {
}
@Override
public void onError(Throwable e) {
}
});
...
Single.just(1)主要是返回SingleJust然后继续点击查看map源码其主要放回的是new SingleMap<T, R>(this, mapper)其中this是Single.just(1)返回的,然后继续subscribeOn()其返回的是SingleSubscribeOn,同样实例化的时候都是把上游传进去,observeOn返回的是new SingleObserveOn<T> 同样也是把上游传进去:如图
然后我来说一下为什么subscribeOn为什么影响上游的线程,看源码知道他是在@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
可以看到她只是做了两件事情一个是通知下游订阅开始了onSubscribe
另一个是切换到我们指定的线程中去
public void run() {
source.subscribe(this);
}
可以看到其是在指定的线程中订阅的。
observeOn为什么只能影响下游了,看源码可知
他不是在订阅的时候切换线程而是在返回消息的时候切换线程
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}