之前看了下RxLifecycle的实现,短短几行代码,屌到不行。其中核心实现就是takeUntil操作符,然后我就想到这个操作符的一些应用场景,例如:从网络下载序列帧zip包 > 保存本地 >解压> 转换成序列帧drawable > ui界面播放。那么当页面关闭的时候,中断这个任务链,其实调用到>解压是我想要的。于是乎我想到takeUntil这个到底放在任务链的哪部分比较合适呢?
demo走起
//创建subject,也就是监听的observable
final BehaviorSubject<String> subject = BehaviorSubject.create();
//对数据进行过滤
final Observable<String> ob = subject.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
return s.equals("stop");
}
});
//创建ObservableTransformer,当监听subject成功发射数据,那么Disposable 源upstream。
ObservableTransformer<String,String> takeUntilTransformer = new ObservableTransformer<String,String>() {
@Override
public ObservableSource<String> apply(@NonNull Observable<String> upstream) {
return upstream.takeUntil(ob);
}
};
//开始发射数据进行测试,其中compose(takeUntilTransformer)会放在2个doNext的前中后3个位置,还有subscribe之前4种情况,然后看log
Observable.just("data").compose(takeUntilTransformer).doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"第一个doOnNext---睡眠");
Thread.sleep(5000);
Log.d(TAG,"第一个doOnNext---醒来");
}
}).doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG,"第二个doOnNext---");
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG,"subscribe---onNext---");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG,"subscribe---onComplete---");
}
});
//2秒后发送终止
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG,"发射-stop");
subject.onNext("stop");
}
});
第一种情况compose(takeUntilTransformer)在发射数据之后
第一个doOnNext---睡眠
2秒后
发射-stop
3秒后
第一个doOnNext---醒来
第二个doOnNext---
subscribe---onNext---
subscribe---onComplete---
第二种情况compose(takeUntilTransformer)放在2个doNext()中间
第一个doOnNext---睡眠
2秒后
发射-stop
subscribe---onComplete---
3秒后
第一个doOnNext---醒来
第三种情况compose(takeUntilTransformer)放在2个doNext()之后
第一个doOnNext---睡眠
2秒后
发射-stop
subscribe---onComplete---
3秒后
第一个doOnNext---醒来
第二个doOnNext---
第四种情况compose(takeUntilTransformer)放在subscribe之前
第一个doOnNext---睡眠
2秒后
发射-stop
subscribe---onComplete---
好吧,4种情况打印了4种不同的结果。先看下takeUntil的实现吧
//老套路,又是返回一个Observable的实现类
public final <U> Observable<T> takeUntil(ObservableSource<U> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new ObservableTakeUntil<T, U>(this, other));
}
ObservableTakeUntil
public final class ObservableTakeUntil<T, U> extends AbstractObservableWithUpstream<T, T> {
final ObservableSource<? extends U> other;
public ObservableTakeUntil(ObservableSource<T> source, ObservableSource<? extends U> other) {
super(source);
this.other = other;
}
@Override
public void subscribeActual(Observer<? super T> child) {
//当下游订阅开始后,到这里child变成了SerializedObserver ,它是用来处理并发的,最后说
final SerializedObserver<T> serial = new SerializedObserver<T>(child);
//存放上游的Disposable
final ArrayCompositeDisposable frc = new ArrayCompositeDisposable(2);
final TakeUntilObserver<T> tus = new TakeUntilObserver<T>(serial, frc);
//onSubscribe在订阅处回调,不受线程影响,使各个部分对parent Disposable做处理
child.onSubscribe(frc);
//other是监听的Observable即subject,订阅了TakeUntil
other.subscribe(new TakeUntil(frc, serial));
//source为源Observable,继续订阅TakeUntilObserver
source.subscribe(tus);
}
static final class TakeUntilObserver<T> extends AtomicBoolean implements Observer<T> {
private static final long serialVersionUID = 3451719290311127173L;
final Observer<? super T> actual;
final ArrayCompositeDisposable frc;
Disposable s;
TakeUntilObserver(Observer<? super T> actual, ArrayCompositeDisposable frc) {
this.actual = actual;
this.frc = frc;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
frc.setResource(0, s);
}
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
frc.dispose();
actual.onError(t);
}
@Override
public void onComplete() {
frc.dispose();
actual.onComplete();
}
}
final class TakeUntil implements Observer<U> {
private final ArrayCompositeDisposable frc;
private final SerializedObserver<T> serial;
TakeUntil(ArrayCompositeDisposable frc, SerializedObserver<T> serial) {
this.frc = frc;
this.serial = serial;
}
@Override
public void onSubscribe(Disposable s) {
frc.setResource(1, s);
}
@Override
public void onNext(U t) {
frc.dispose();
//当监听的observable成功发射数据后。立刻调用下游observer的onComplete
//同时调用上游的dispose()方法
serial.onComplete();
}
@Override
public void onError(Throwable t) {
frc.dispose();
serial.onError(t);
}
@Override
public void onComplete() {
frc.dispose();
serial.onComplete();
}
}
}
看到这里二三种情况就应该很好理解了
当主线程发射了stop,那么立刻调用的下游的observer的onComplete,所以立刻打印了subscribe---onComplete---,二三种情况的唯一区别的是。第二种情况没有打印第二个doOnNext,这是因为第二种情况在第一个doOnNext阻塞的时候,other发射了stop,立刻调用了下游的serial的onComplete,也就是第二个doOnNext的onComplete,在第二个doOnNext的onComplete调用后done=true是不能再接收到onNext的,所以第二个doOnNext不会被打印,那么第三种情况打印了第二个doOnNext,因为TakeUntil只作用于下游,所以第二个doOnNext不受影响。
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
actual.onNext(t);
}
@Override
public void onComplete() {
if (done) {
return;
}
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
done = true;
actual.onComplete();
try {
onAfterTerminate.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
然后是第四种情况,《第一个doOnNext---醒来》的log都没有打印。证明在中断源Observable的同时将子线程干掉了。之前看过subscribeOn得实现,在撸一遍吧
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//看这个就明白喽。将Disposable设置成scheduler.scheduleDirect()。
//SubscribeTask所在的线程就是scheduler指定的线程了
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
看下scheduleDirect方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
//OK ,NewThreadWorker里面有个运行子线程的线程池。调用的就线程池的shutdown()
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
当compose(takeUntilTransformer)放在subscribeOn之后时,在subscribe的时候由于subscribeActual逆向调用,所以child.onSubscribe(Dispose dispose)是调用TakeUntilObserver的
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
frc.setResource(0, s);
}
}
frc就间接有了SubscribeOnObserver的引用.
还差第一种情况,第一种情况看上去是和第一个doOnNext一起阻塞了,其实不是哦,主要是因为SerializedObserver这个类,它用来处理并发,这个类的描述是这样的
When multiple threads are emitting and/or notifying they will be serialized by:
Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting
一次只会允许一个线程进行发送事物
如果其他线程已经准备就绪,会通知给队列
在发送事物中,不会持有任何锁和阻塞任何线程
如何处理并发
@Override
public void onNext(@NonNull T t) {
if (done) {
return;
}
if (t == null) {
s.dispose();
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
synchronized (this) {
if (done) {
return;
}
if (emitting) {
AppendOnlyLinkedArrayList<Object> q = queue;
if (q == null) {
q = new AppendOnlyLinkedArrayList<Object>(QUEUE_LINK_SIZE);
queue = q;
}
q.add(NotificationLite.next(t));
return;
}
emitting = true;
}
actual.onNext(t);
emitLoop();
}
@Override
public void onComplete() {
if (done) {
return;
}
synchronized (this) {
if (done) {
return;
}
if (emitting) {
AppendOnlyLinkedArrayList<Object> q = queue;
if (q == null) {
q = new AppendOnlyLinkedArrayList<Object>(QUEUE_LINK_SIZE);
queue = q;
}
q.add(NotificationLite.complete());
return;
}
done = true;
emitting = true;
}
actual.onComplete();
// no need to loop because this onComplete is the last event
}
void emitLoop() {
for (;;) {
AppendOnlyLinkedArrayList<Object> q;
synchronized (this) {
q = queue;
if (q == null) {
emitting = false;
return;
}
queue = null;
}
if (q.accept(actual)) {
return;
}
}
}
onError和onComplete同理就不贴了,有一个emitting 变量,如果一个线程调用了3个方法的某一个那么emitting 为true,当其他线程再次调用的时候那么这个操作会被加入链表数组里,当 actual.onNext(t);执行完毕会调用emitLoop();检查链表数组里是否保存了其他线程的操作,如果操作是onNext会一直调用,如果调用了onError或者onComplete那么终止。
public <U> boolean accept(Observer<? super U> observer) {
Object[] a = head;
final int c = capacity;
while (a != null) {
for (int i = 0; i < c; i++) {
Object o = a[i];
if (o == null) {
break;
}
if (NotificationLite.acceptFull(o, observer)) {
return true;
}
}
a = (Object[])a[c];
}
return false;
}
NotificationLite
public static <T> boolean acceptFull(Object o, Observer<? super T> s) {
if (o == COMPLETE) {
s.onComplete();
return true;
} else
if (o instanceof ErrorNotification) {
s.onError(((ErrorNotification)o).e);
return true;
} else
if (o instanceof DisposableNotification) {
s.onSubscribe(((DisposableNotification)o).d);
return false;
}
s.onNext((T)o);
return false;
}
那么第一种情况就是主线程调用的onComplete被放到了链表数组里,待子线程的doOnNext执行完毕,然后调用了onComplete。
情况分析完毕,那么takeUntil放的位置根据不同需求进行选择了。
如有分析不对的地方请指出,Thanks~