Rxjava--背压(Backpressure)

前言:Rxjava是通过观察者模式设计的异步任务框架,他的有点在于简洁性,不是代码的简洁性,而是逻辑的简洁性,随着项目的中异步任务逻辑越来越复杂,Rxjava可以一直保持代码逻辑的简洁,不会像handler,ThreadHandler这些在几次线程间切换以后可能都已经晕头转向了.Rxjava提供了多种类的操作符,比如ObserverOn可以为每次业务的处理的处理制定线程,flatmap这种操作符来帮助我们处理循环遍历的问题,总的来说rxjava就是时代码简洁的异步任务框架.

但是在rxjava1.x版本中存在一个比较深的坑就是,有些操作符支持,背压,有些操作符不支持背压,当我们使用不支持背压的操作符时会报出rx.exceptions.MissingBackpressureException,当我们不理解Backpressure概念时导致我们很难处理这种错误。下面我们就来学习rxjava中背压的概念。

Backpressure
Rx 中的数据流是从一个地方发射到另外一个地方。每个地方处理数据的速度是不一样的。如果生产者发射数据的速度比消费者处理的快会出现什么情况?在同步操作中,这不是个问题,例如:

Observable<Integer> producer = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onCompleted();
});
// Consume
producer.subscribe(i -> {
    try {
        Thread.sleep(1000);
        System.out.println(i);
    } catch (Exception e) { }
});

虽然上面的消费者处理数据的速度慢,但是由于是同步调用的,所以当 o.onNext(1) 执行后,一直阻塞到消费者处理完才执行 o.onNext(2)。 但是生产者和消费者异步处理的情况很常见。如果是在异步的情况下会出现什么情况呢?

在传统的 pull 模型中,当消费者请求数据的时候,如果生产者比较慢,则消费者会阻塞等待。如果生产者比较快,则生产者会等待消费者处理完后再生产新的数据。

而 Rx 为 push 模型。 在 Rx 中,只要生产者数据好了就发射出去了。如果生产者比较慢,则消费者就会等待新的数据到来。如果生产者快,则就会有很多数据发射给消费者,而不管消费者当前有没有能力处理数据。这样会导致一个问题,例如:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

结果;

0
1
rx.exceptions.MissingBackpressureException

上面的 MissingBackpressureException 告诉我们,生产者太快了,我们的操作函数无法处理这种情况。

解决:

RxJava 实现了一种通过 Subscriber 来通知 Observable 发射数据的方式。Subscriber 有个函数 request(n),调用该函数用来通知 Observable 现在 Subscriber 准备接受下面 n 个数据了。在 Subscriber 的 onStart 函数里面调用 request 函数则就开启了reactive pull backpressure。这并不是传统的 pull 模型,并不会阻塞调用。只是 Subscriber 通知 Observable 当前 Subscriber 的处理能力。 通过调用 request 可以发射更多的数据。

pull

观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

class MySubscriber extends Subscriber<T> {
    @Override
    public void onStart() {
      request(1); //要在onStart中通知被观察者先发送一个事件
    }

    @Override
    public void onCompleted() {
        ...
    }

    @Override
    public void onError(Throwable e) {
        ...
    }

    @Override
    public void onNext(T n) {
        ...
        request(1); //处理完毕之后,在通知被观察者发送下一个事件
    }
}


Observable observable=Observable.range(1,100000);
observable.observeOn(Schedulers.newThread())
            .subscribe(new MySubscriber());

在 onStart 函数中调用 request(1) 开启了 backpressure 模式,告诉 Observable 一次只发射一个数据。在 onNext 里面处理完该数据后,可以请求下一个数据。通过 quest(Long.MAX_VALUE) 可以取消 backpressure 模式。
实际上,在上面的代码中,你也可以不需要调用request(n)方法去拉取数据,程序依然能完美运行,这是因为range –> observeOn,这一段中间过程本身就是响应式拉取数据,observeOn这个操作符内部有一个缓冲区,Android环境下长度是16,它会告诉range最多发送16个事件,充满缓冲区即可。不过话说回来,在观察者中使用request(n)这个方法可以使背压的策略表现得更加直观,更便于理解。
如果你足够细心,会发现,在开头展示异常情况的代码中,使用的是interval这个操作符,但是在这里使用了range操作符,为什么呢?
这是因为interval操作符本身并不支持背压策略,它并不响应request(n),也就是说,它发送事件的速度是不受控制的,而range这类操作符是支持背压的,它发送事件的速度可以被控制。

Backpressure 策略

很多 Rx 操作函数内部都使用了 backpressure 从而避免过多的数据填满内部的队列。这样处理慢的消费者就会把这种情况传递给前面的消费者,前面的消费者开始缓冲数据直到他也缓存满为止再告诉他前面的消费者。Backpressure 并没有消除这种情况。只是让错误延迟发生,我们还是需要处理这种情况。
Rx 中有操作函数可以用来处理这种消费者处理不过来的情况。

onBackpressureBuffer

onBackpressureBuffer 会缓存所有当前无法消费的数据,直到 Observer 可以处理为止。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(1000)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println
    );

上面的示例,生产者比消费者快 100 倍。使用 1000个缓冲来处理这种消费者比较慢的情况。当消费者消费 11个数据的时候,缓冲区满了,生产者生产了 1100个数据。数据流就抛出异常了。

onBackpressureDrop

如果消费者无法处理数据,则 onBackpressureDrop 就把该数据丢弃了。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop()
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

结果:

0
1
2
...
126
127
12861
12862
...

这个示例中,前面 128 个数据正常的被处理的,这是应为 observeOn 在切换线程的时候, 使用了一个 128 个数据的小缓冲。

在RxJava1.X中,背压的设计并不十分完美。而是希望你对背压有一个全面清晰的认识,对于它在RxJava1.X中的设计缺陷有所了解即可。可喜的是,RxJava2.X中解决了背压的问题,推出了Flowable(Observable在RxJava2.0中新的实现),而且其中的操作符全部都实现了背压

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容