Rxjava2~take~timer~interval~buffer~filter等源码如何实现(你应该懂的)~学渣带你扣rxjava2

take()


<pre>
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver())

</pre>

输出没错是123

我们面来看看源码
直接来看ObservableTake的subscribeActual,[不懂的同学请看我前面的学渣系列]
<pre>
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new TakeObserver<T>(observer, limit));
}
</pre>
这个source是ObservableSource的对象。 那么我们去找实现他的Observable
好吧 又回到了。
<pre>
public final void subscribe(Observer<? super T> observer)
subscribeActual(observer);
其他的省略了

</pre>
关键点一步,这回调用了谁的方法呢? 下面来揭晓
是ObservableObserveOn的subscribeActual
<pre>
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

</pre>
看到了吗 又会调用
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
然后 又要调用的是ObservableSubscribeOn的subscribeActual
<pre>
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}

</pre>

大家会好奇这两个地方为什么会被调用呢?
下面我给大家看一个地方
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
O(∩_∩)O
你没有看错
<pre>
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
</pre>
大家可以看到不。 这两个方法返回的也是Observable对象。 所以 他们会分别调用这两个对象subscribeActual方法。好吧,让我们来像下进行。
【下面是一个小扩展 给大家一个小小的感觉】
<pre>

Observable.just(1, 2, 3, 4, 5)

            .observeOn(AndroidSchedulers.mainThread())
            .take(3)
            .subscribe(getObserver())

看到有什么不同了吗? 我注释掉了一个方法。我为什么要这么干?我注视掉了那么
source.subscribe 会调用谁呢? 我直接给出来答案。大家可以思考一个 当我直接注释之后会调用just的subscribeActual

public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

相信大家看过我之前的应该可以看懂。
</pre>

让我们回归正题当执行到ObservableSubscribeOn的subscribeActual的方法的时候

  public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));

source.subscribe(parent); 看到这个方法了吗、首先它是异步的。另外执行
.source.subscribe(parent);的时候 ,实际上就执行了ObservableFromArray的subscribeActual
<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();

</pre>
剩下的就好理解了,都是分别执行onnext等方法。

到这里task的大体思路介绍完毕

2下面开始timer 定时器

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                decoratedRun.run();
            } finally {
                w.dispose();
            }
        }
    }, delay, unit);

    return w;
}

重复的就不贴了。 都是差不多重复的。 只是给大家贴上关键代码
看到这里面了吗。delay 就是大家贴上的时间。 详细这个大家都是可以看明白的。,

3interval

做周期性操作,从翻译上大家就应该可以看明白
ComputationScheduler的schedulePeriodicallyDirect的方法
<pre>
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
</pre>

<pre>
NewThreadWorker的schedulePeriodicallyDirect的方法
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Future<?> f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
return Disposables.fromFuture(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}

</pre>

分别设置了 什么时候开始。多长时间执行一次

4buffer

Observable<List<String>> buffered = getObservable().buffer(3, 2);

    buffered.subscribe(getObserver());

ObservableBuffer的subscribeActual的方法
<pre>

protected void subscribeActual(Observer<? super U> t) {
if (skip == count) {
BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else {
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
}
}
</pre>
好吧到了关键的地方 source.subscribe是调用谁的地方
Observable.just("one", "two", "three", "four", "five");
所以是ObservableFromArray的subscribeActual方法

<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

void run() {
T[] a = array;
int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }

</pre>

看到这个for方法了吗 这个就是决定你跳过的数量的。

5filter

Paste_Image.png

这个相信大家很熟悉,对就是过滤
<pre>
fromArray(1, 0, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer.intValue() > 5;
}
})
</pre>

这里只是放出来关键代码
ObservableFilter的onNext
<pre>
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
actual.onNext(t);
}
} else {
actual.onNext(null);
}
}

</pre>
这个b就是你的过滤条件。 下面的就是判断。 不符合的就不执行 actual.onNext(t);其实很简单的方式

6skip

和上面同理关键部分ObservableSkip的onNext方法
<pre>

public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
actual.onNext(t);
}
}
</pre>

7 scan

Paste_Image.png

RxJava的scan()函数可以看做是一个累加器函数。scan()函数对原始Observable发射的每一项数据都应用一个函数,它将函数的结果填充回可观测序列,等待和下一次发射的数据一起使用。

关键代码
<pre>
@Override
public void onNext(T t) {
if (done) {
return;
}
final Observer<? super T> a = actual;
T v = value;
if (v == null) {
value = t;
a.onNext(t);
} else {
T u;

            try {
                u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            value = u;
            a.onNext(u);
        }
    }

</pre>
执行的时候value 会累加。 a.onNext(u);在发射出去

8 replay

<pre>
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(2); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();

   /*
     * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
     */
    connectableObservable.subscribe(getSecondObserver());

</pre>
replay 这个是缓存操作。
第二次订阅之后,就是缓存后面两个数据

9concat

Paste_Image.png

<pre>
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aObservable = Observable.fromArray(aStrings);
    final Observable<String> bObservable = Observable.fromArray(bStrings);

    Observable.concat(aObservable, bObservable)
            .subscribe(getObserver());

</pre>

他的过程是
<pre>
return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
</pre>

concat操作符肯定也是有序的,实际上fromArray(sources)这么一个过程。

10merge

Paste_Image.png

<pre>
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aObservable = Observable.fromArray(aStrings);
    final Observable<String> bObservable = Observable.fromArray(bStrings);

    Observable.merge(aObservable, bObservable)
            .subscribe(getObserver());

</pre>

无序的合并

11distinct 去除重复的

Paste_Image.png

<pre>
enum HashSetCallable implements Callable<Set<Object>> {
INSTANCE;
@Override
public Set<Object> call() throws Exception {
return new HashSet<Object>();
}
}
</pre>
HashSet中 是不允许重复元素的

12last

Paste_Image.png

<pre>
private void doSomeWork() {
getObservable().last("A1") // the default item ("A1") to emit if the source ObservableSource is empty
.subscribe(getObserver());
}

private Observable<String> getObservable() {
    return Observable.just("A1", "A2", "A3", "A4", "A5", "A6");
}

打印出来的是a6

</pre>

ObservableFromArray的run方法
<pre>
void run() {
T[] a = array;
int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }

</pre>

ObservableLastSingle的onComplete
<pre>
public void onComplete() {
s = DisposableHelper.DISPOSED;
T v = item;
if (v != null) {
item = null;
actual.onSuccess(v);
} else {
v = defaultItem;
if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}

</pre>

last方法会返回Single

13throttleFirst

Paste_Image.png

<pre>
private void doSomeWork() {
getObservable()
.throttleFirst(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}

</pre>

从这个可以理解到发送第一个之后。剩下的500之后才会接受第二个

14throttleLast

Paste_Image.png

从这个可以看出来,这是在一段时间内接受最后一个数据
<pre>
getObservable()
.throttleLast(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}

</pre>

15debounce

<pre>

getObservable()
            .debounce(500, TimeUnit.MILLISECONDS)
            // Run on a background thread
            .subscribeOn(Schedulers.io())
            // Be notified on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            emitter.onNext(1); // skip
            Thread.sleep(400);
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(100);
            emitter.onNext(4); // deliver
            Thread.sleep(605);
            emitter.onNext(5); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });

</pre>

Paste_Image.png

这个接受的一一个时间跨度之内的数据

16window

Paste_Image.png

可以看出来大概 的意思就是截取被观察者组成一个新的被观察者

17delay

Paste_Image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容

  • 我和rx的对话来继续了解冷观察者和热观察者 wow我们又多了一天来学习新的知识这真的很棒。 hello 伙伴们。希...
    品味与回味阅读 753评论 3 1
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,178评论 2 8
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,746评论 0 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,570评论 8 93