FluxInterval实例及解析

本文主要研究下FluxInterval的机制

FluxInterval

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java

/**
 * Periodically emits an ever increasing long value either via a ScheduledExecutorService
 * or a custom async callback function
 * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
 */
final class FluxInterval extends Flux<Long> {

    final Scheduler timedScheduler;
    
    final long initialDelay;
    
    final long period;
    
    final TimeUnit unit;

    FluxInterval(
            long initialDelay, 
            long period, 
            TimeUnit unit, 
            Scheduler timedScheduler) {
        if (period < 0L) {
            throw new IllegalArgumentException("period >= 0 required but it was " + period);
        }
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = Objects.requireNonNull(unit, "unit");
        this.timedScheduler = Objects.requireNonNull(timedScheduler, "timedScheduler");
    }
    
    @Override
    public void subscribe(CoreSubscriber<? super Long> actual) {
        Worker w = timedScheduler.createWorker();

        IntervalRunnable r = new IntervalRunnable(actual, w);

        actual.onSubscribe(r);

        try {
            w.schedulePeriodically(r, initialDelay, period, unit);
        }
        catch (RejectedExecutionException ree) {
            if (!r.cancelled) {
                actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                        actual.currentContext()));
            }
        }
    }
}   

可以看到这里利用Scheduler来创建一个定时调度任务IntervalRunnable

IntervalRunnable

    static final class IntervalRunnable implements Runnable, Subscription,
                                                   InnerProducer<Long> {
        final CoreSubscriber<? super Long> actual;
        
        final Worker worker;
        
        volatile long requested;
        static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
                AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");
        
        long count;
        
        volatile boolean cancelled;

        IntervalRunnable(CoreSubscriber<? super Long> actual, Worker worker) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public CoreSubscriber<? super Long> actual() {
            return actual;
        }

        @Override
        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key == Attr.CANCELLED) return cancelled;

            return InnerProducer.super.scanUnsafe(key);
        }

        @Override
        public void run() {
            if (!cancelled) {
                if (requested != 0L) {
                    actual.onNext(count++);
                    if (requested != Long.MAX_VALUE) {
                        REQUESTED.decrementAndGet(this);
                    }
                } else {
                    cancel();
                    
                    actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" +
                            " (interval doesn't support small downstream requests that replenish slower than the ticks)"));
                }
            }
        }
        
        @Override
        public void request(long n) {
            if (Operators.validate(n)) {
                Operators.addCap(REQUESTED, this, n);
            }
        }
        
        @Override
        public void cancel() {
            if (!cancelled) {
                cancelled = true;
                worker.dispose();
            }
        }
    }

这里重点看requested变量,run方法每次判断requested,如果requested为0则销毁worker,否则则每次发射一个元素计数就减一
而subscriber如果有继续request的话,则会增加requested的值

实例1

    public static void main(String[] args) throws InterruptedException {
        Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                .doOnNext(e -> {
                    System.out.println(e);
                }).doOnError(e -> e.printStackTrace());

        System.out.println("begin to subscribe");
        flux.subscribe(e -> {
            System.out.println(e);
            try {
                TimeUnit.MINUTES.sleep(30);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
        TimeUnit.MINUTES.sleep(30);
    }

这个例子requested是Long.MAX_VALUE,但是由于subscribe的线程跟运行interval的线程一样,由于里头执行了sleep操作也导致interval的调度也跟着阻塞住了。

实例2

    public static void main(String[] args) throws InterruptedException {
        Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                .doOnNext(e -> {
                    System.out.println(e);
                })
                //NOTE 这里request prefetch=256个
                .publishOn(Schedulers.newElastic("publish-thread"))
                .doOnError(e -> e.printStackTrace());

        System.out.println("begin to subscribe");
        AtomicInteger count = new AtomicInteger(0);
        //NOTE 得有subscribe才能触发request
        flux.subscribe(e -> {
            LOGGER.info("receive:{}",e);
            try {
                //NOTE 使用publishOn将subscribe与interval的线程分开
                if(count.get() == 0){
                    TimeUnit.MINUTES.sleep(2);
                }
                count.incrementAndGet();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
        TimeUnit.MINUTES.sleep(30);
    }

使用publishOn将subscriber线程与interval线程隔离,使其sleep不阻塞interval
这里publishOn隐含了一个prefetch参数,默认是Queues.SMALL_BUFFER_SIZE即Math.max(16,Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

    public final Flux<T> publishOn(Scheduler scheduler) {
        return publishOn(scheduler, Queues.SMALL_BUFFER_SIZE);
    }

    final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
        if (this instanceof Callable) {
            if (this instanceof Fuseable.ScalarCallable) {
                @SuppressWarnings("unchecked")
                Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
                try {
                    return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
                }
                catch (Exception e) {
                    //leave FluxSubscribeOnCallable defer exception call
                }
            }
            @SuppressWarnings("unchecked")
            Callable<T> c = (Callable<T>)this;
            return onAssembly(new FluxSubscribeOnCallable<>(c, scheduler));
        }

        return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
    }

这里使用Queues.get(prefetch)创建一个间接的队列来盛放元素

这个实例最后输出

//......
21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:254
21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:255
reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

由于第一次request默认是256,之后在发射256个元素之后,subscriber没有跟上,导致interval的worker被cancel掉了,于是后续消费完256个元素之后,紧挨着就是OverflowException这个异常

小结

reactor本身并不依赖线程,只有interval,delayElements等方法才会创建线程。而reactor本身是观察者设计模式的扩展,采用push+backpressure模式,一开始调用subscribe方法就触发request N请求推送数据,之后publisher就onNext推送数据,直到complete或cancel。实例1是因为线程阻塞导致interval的onNext阻塞,实例2是interval被cancel掉导致flux关闭。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容