最近看源码的时候,经常会看到FusionMode
。 这玩意网上介绍比较少,粗看也比较复杂,但因为较多运算符中都用到了它,所以此篇决定选择几个代表运算符对它做一下分析。(图看不清楚点开看大图吧。。orz)
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, 5)
.flatMapIterable(mapper, 2)
.subscribe(ts);
我们以上面的test code
作为主要例子进行一步步肢解分析。最初看可能会比较晕,但这玩意多看几次就可以串起来了> < 自己的笔记记录,写的可能不是很流畅 见谅。
FlowableRange
这里的分析与 Rxjava图示入门篇的方法一致。这里先介绍三个重要的类:
BaseRangeSubscription
我们知道,subscription
其实可以看成上游和下游中间的连接点。下游拿到subscription
可以随时取消订阅,而在这节,我们会发现他还有另一个极其重要的作用---与FusionMode
的结合。BaseRangeSubscription
继承于BasicQueueSubscription
,它向外界提供三个接口:
requestFusion(mode)
request()
poll()
requestFusion
外界调用此接口时,会带着自己希望请求的Mode
,这个Mode
到底是什么后面再说。subsciption
收到这个请求的mode
后,根据自己的心情返回新的Mode
给请求方。这里BaseRangeSubscription
就返回了mode&SYNC
request
外界调用此接口时,代表向subscription
请求n个item。BaseRangeSubscription
收到该请求时,根据n是否=max
,调用fastPath
与slowPath
。这两个方法又是干什么的呢?先暂时忽略。
poll
外界调用此接口时,代表希望从subscription
中获取item。可以看到FlowableRange
就返回index
,假设range是[0,10],那就依次会返回0,1,...
其实一般与FusionMode
有关的subscription
都会提供这三个接口,后面我们会看到这三个接口分别起到什么重要的作用。
RangeSubscription & RangeConditionSubscription
这两货主要为FlowableRange
实现了前面接口中的fastPath
与slowPath
,根据图示可以看到本质都是调用了根据request(n)
中请求的数量依次调用subscriber.onNext(index)
FlowableFlattenIterable
这货与以前介绍过的flatMap
类似。传入一个mapper
,
FlattenIterableSubscriber
当其onSubscribe
被调用时,该subscriber
会观察与上游连接的subscription
是否为QueueSubscription
。是不是似曾相识,这个QueueSubscription
在前面介绍FlowableRange
时提过。如果与上游连接的subscription
为QueueSubscription
,就会调用前面的接口1requestFusion(ANY)
。看上游返回什么Mode
,这里会返回两类,分别为SYNC
与ASYNC
。
sync
假设上游返回SYNC
,done = true
。代表它们之间的交互为同步。
if (m == SYNC) {
fusionMode = m;
this.queue = qs;
done = true;
actual.onSubscribe(this);
return;
}
这里看到存在一个变量为queue
,下游可以通过queue
向上游获取数据
aysnc
假设上游返回ASYNC
,代表上下游之间的交互为异步。subscription.request(prefetch)
。调用接口2。根据前面的介绍会调用slowPath
,即调用FlattenIterableSubscriber.onNext
。
这里我们看onNext
:上代码吧
@Override
public void onNext(T t) {
if (done) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue is full?!"));
return;
}
drain();
}
这里因为是async
,所以不是done
。
ASYNC 进入
drain()
NONE 如果队列已经满了抛出错误
FlattenIterableSubscriber
也和queue
有关,它也提供之前介绍过的三个接口。
在讲两个运算符结合之前,我们最后讲一下drain()
到底干了什么吧!
drain
前面提到过,subscription
有一个this.queue
队列。drain会依次通过队列向上游请求item,通过iterable = mapper.apply(t);
得到iterable
,然后再遍历iterable
,调用下游subscriber.onNext/onComplte
,完成数据流动过程。
两个操作符熔合
终于到了激动人心的时刻。这两个操作符如果合在一起会产生什么神奇的效果呢?数据到底是怎么流动的呢?
再放一次要介绍的例子:
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, 5)
.flatMapIterable(mapper, 2)
.subscribe(ts);
当调用subscribe(ts)
时首先调用FlowableFlatMapIterable.subscribe
,此时自己看代码发现调用了source.subscibe(FlattenIterableSubscriber)
,即将上游FlowableRange
与中游的FlattenIterableSubscriber
结合。中游发现有人要给自己发数据了,就调用自己的onSubscribe
,即FlattenIterableSubscriber.onSubscribe(BaseRangeSubscription)
,此时BaseRangeSubscription
作为上游和中游之间的connection
onSubscribe
这时候发生的事情其实前面都介绍过了。但我自己都快忘了,重述一遍,此时中游的FlattenIterableSubscriber
会通过connection BaseRangeSubscription
请求Mode
,这里BaseRangeSubscription
返回SYNC
,代表上游只支持同步模式。前面只简单介绍了SYNC
,这里再重点介绍一下。
if (m == SYNC) {
fusionMode = m;
this.queue = qs;
done = true;
actual.onSubscribe(this);
return;
}
这里会继续调用realSubscriber.onSubscribe
,我们先忽略下游也会有mode
请求的情况,
long mr = missedRequested.getAndSet(0L);
if (mr != 0L) {
s.request(mr);
}
此时我们的testSubscriber
会通过中游与下游之间的connection
即FlattenIterableSubscriber
向中游请求数据。此时会调用前面介绍过的drain
,而调用drain
时,中游会主要通过上游与中游之间的connection
即BaseRangeSubscription.poll
即接口三要求上游同步返回数据给它,数据返回后再依次返回给下游。
耶其实到这里就完成了整个数据的流动。是不是其实也不怎么复杂(...别欺骗自己了)
到这里我们可以说,到底mode
是什么了。拿住官方解释吧。
/**
* Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In synchronous fusion, all upstream values are either already available or is generated
* when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
* that is the indication if a terminated stream.
* In this mode, the upstream won't call the onXXX methods and callers of
* {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
* to be called sequentially (from within a serializing drain-loop).
*/
先介绍SYNC
,说明当上游告诉下游它支持同步时,下游只会通过poll
求取数据,取一个给一个,如果上游返回null
,说明整个数据流已经被中断了。所以在这种模式下,上游是不会主动调用下游的onXXX
的。
那ASYNC
又是什么呢?
ASYNC
我们再走一遍async
的流程吧!
if (m == ASYNC) {
fusionMode = m;
this.queue = qs;
actual.onSubscribe(this);
s.request(prefetch);
return;
}
看到async
时,中游已经主动向上游去预先取数据了,所以有prefetch
的概念。因此此时,上游就会去调用中游的onNext
。看到这里可以发现async
和sync
的部分不同了,sync
只会通过poll
去主动拉数据,而async
是会去主动request
数据让上游主动调用自己的onNext
方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue is full?!"));
return;
}
drain();
}
在onNext
中看到依旧调用了drain
,说明中游收到上游的通知时会再通过poll
去拉取上游给它准备的数据。
好吧,那现在让我们看看ASYNC
的官方解释吧
/**
* Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
* Upstream signals onError() and onComplete() as usual but onNext may not actually contain
* the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
* that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
* (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
* prepared to catch exceptions.
*/
在异步模式中,上游会通过onNext
通知下游去取数据,此时下游就会通过poll
去取上游给自己的准备的数据。
可以看到,一个是下游主动同步去取,一个是上游发通知并携带数据给下游。下游其实是不知道数据什么时候会来的。
注释其实还有另一层意思,如果上游给下游发了onNext(null)
,说明下游此时要通过poll
主动去取。这个又是什么意思呢?这里的async
例子其实不怎么好,因为FlowableRange
只支持sync
,所以我们换个例子再看看
FlowabledoAfterNext
sync
TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueSubscription.SYNC);
Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(ts0);
这里sync运用在中游与下游之间。可以看到,当mode
为sync
时,
if (initialFusionMode != 0) {
if (s instanceof QueueSubscription) {
qs = (QueueSubscription<T>)s;
int m = qs.requestFusion(initialFusionMode);//sync
establishedFusionMode = m;
if (m == QueueSubscription.SYNC) {
checkSubscriptionOnce = true;
lastThread = Thread.currentThread();
try {
T t;
while ((t = qs.poll()) != null) {
values.add(t);
}
completions++;
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); TODO add fatal exceptions?
errors.add(ex);
}
return;
}
}
}
while ((t = qs.poll()) != null) {
values.add(t);
}
所以再次印证,同步都是靠下游主动去拉的。
重点关注async
:
async
TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC);
Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(ts0);
actual.onSubscribe(s);
long mr = missedRequested.getAndSet(0L);
if (mr != 0L) {
s.request(mr);
}
进入DoAfterSubscriber.request(mr)->UnicastProcessor.request(mr)->drain->a.onNext(null)
if (establishedFusionMode == QueueSubscription.ASYNC) {
try {
while ((t = qs.poll()) != null) {
values.add(t);
}
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); TODO add fatal exceptions?
errors.add(ex);
qs.cancel();
}
return;
}
可以看到这里就是官方解释的验证。这里中游和下游是异步的,所以当下游发出request告诉中游自己想要n个item时,最终中游通过上游返回onNext(null)
,下游收到这个通知时才会主动去拉数据。显而易见,中游什么时候给数据下游其实是不知道的,这里onNext
也没有像之前我们分析过的操作符一样把item传递给下游,只起了一个通知的作用。
其实这里我们还是忽略了很多东西,为什么上游会返回onNext(null)
,这里其实我们已经假定上中游也是异步模式了。这是因为FlowableOnNext
当收到下游要异步的请求时,会也向上游请求自己也要使用异步的模式。所以这里上游就直接返回了onNext(null)
,我们进一步研究代码会发现如果中游向上游请求sync
模式,那上游会返回onNext(t)
while (r != e) {
boolean d = done;
T t = q.poll();
boolean empty = t == null;
if (checkTerminated(failFast, d, empty, a, q)) {
return;
}
if (empty) {
break;
}
a.onNext(t);
e++;
}
总结
写到这里其实可以理解为什么要提供三个接口了,
- poll 用于同步请求返回数据,或是异步通知下游来拿数据
- request 用于当收到下游的请求时的操作,如果是同步直接会返回给下游数据,否则可能只是通知下游我有数据了你自己来拿吧。
操作符背后真的包含很多思想,这节只介绍了async
与sync
,boundary
还没看懂。。 后面还会继续研究操作符熔合
,看懂了再写吧
如果哪里理解的不对,欢迎及时指正。