RxJava Flowable Processor

ReactiveX 系列文章目录


Flowable/Subscriber

Backpressure 背压现象指生产者的速度大于消费者的速度。

同一个线程生产一个就消费了,不会产生问题,在异步线程中,如果生产者的速度大于消费者的速度,就会产生 Backpressure 问题。比如子线程的被观察者 1 秒生产发送一次,而观察者 2 秒才消费处理一个,造成事件的堆积,最后造成 OOM。

在 1.x 中,Backpressure 问题由 Observable 处理,2.x 中由 Flowable 专门来处理。

val flowable = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onNext(3)
    emitter.onComplete()
}, BackpressureStrategy.ERROR) // 增加了一个参数

val subscriber = object : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
        s.request(java.lang.Long.MAX_VALUE)
    }

    override fun onNext(integer: Int?) {}
    override fun onError(t: Throwable) {}
    override fun onComplete() {}
}
flowable
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe(subscriber)

onSubscribe 的参数类型不再是 Disposable,而是 Subscription,可以调用它的 cancel() 切断观察者与被观察者之间的联系。Subscription 还有一个 request(long n) 方法,用来向生产者申请可以消费的事件数量。这样便可以根据本身的消费能力进行消费事件。

当调用了 request() 方法后,生产者便发送对应数量的事件供消费者消费。即生产者要求多少,消费者就发多少。

如果不显式调用 request 就表示消费能力为 0。request 这个方法若不调用,下游的 onNext 与 OnComplete 都不会调用。

处理策略

处理 Backpressure 的策略是处理 Subscriber 接收事件的方式,并不影响 Flowable 发送事件的方法。即使采用了处理 Backpressure 的策略,Flowable 原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是 Subscriber 接收事件的方式。

在异步调用时,RxJava 中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为 128,即只能缓存 128 个事件。无论 request() 中传入的数字比 128 大或小,缓存池中在刚开始都会存入 128 个事件。如果本身并没有这么多事件需要发送,则不会存 128 个事件。

策略就是创建 Flowable 的第二个参数。

  1. ERROR

在 ERROR 策略下,如果缓存池溢出,就会立刻抛出 MissingBackpressureException 异常。

Flowable.create(FlowableOnSubscribe<Int> { emitter ->
          for (i in 0..129) {
              debug("发$i")
              emitter.onNext(i)
          }
          emitter.onComplete()
      }, BackpressureStrategy.ERROR) //增加了一个参数
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(object : Subscriber<Int> {
                  override fun onSubscribe(s: Subscription) {
//                      s.request(2)
                  }

                  override fun onNext(integer: Int?) {
                      debug("收$integer")
                  }

                  override fun onError(t: Throwable) {
                      error { t.toString() }
                  }

                  override fun onComplete() {
                  }
              })

Flowable 发送 129 个事件,而 Subscriber 一个也不处理,在 onError 中就收到了错误回调。

backpress.PNG
  1. BUFFER

就是把 RxJava 中默认的只能存 128 个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。

这样,消费者即使通过 request() 传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。

但是这种方式仍然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生 OOM。

总之 BUFFER 要慎用。

  1. DROP

消费者处理不了的事件就丢弃。消费者通过 request() 传入其需求 n,然后生产者把 n 个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

  1. LATEST

与 DROP 功能基本一致。唯一的区别就是 LATEST 总能使消费者能够接收到生产者产生的最后一个事件。

  1. MISSING

直接消失了,下游不知道任何情况,不知道有没有溢出。


如果 Flowable 对象不是通过 create() 获取的或不是自己创建的,可以采用 onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest() 的方式指定背压策略。

Flowable.just(1).onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())

Processor

Processor 和 Subject 的作用相同的,既是观察者,也是被观察者。Subject 不支持背压,是 RxJava 1.x 继承过来的,Processor 继承 FlowableProcessor,支持背压。

不要使用 Flowable 或 Observable 里的方法,这样会将 Processor 转成一个 Flowable 或 Observable,用 Processor 内部重写的 create。

自己控制在合适的时机发射什么值,是 complete,还是 error。

  • AsyncProcessor

    不论何时订阅,都只发射最后一个数据,如果因为异常而终止,不会释放任何数据,但是会向 Observer 传递一个异常通知。

  • BehaviorProcessor

    发射订阅之前的一个数据和订阅之后的全部数据。如果订阅之前没有值,可以使用默认值。

  • PublishProcessor

    从哪里订阅就从哪里发射数据。

  • ReplayProcessor

    无论何时订阅,都发射所有的数据。

  • SerializedProcessor

    其它 Processor 不要在多线程上发射数据,如果确实要在多线程上使用,用这个 Processor 封装,可以保证在一个时刻只在一个线程上执行。

  • UnicastProcessor

    只能有一个观察者。

// 发射 3
// val processor = AsyncProcessor.create<Int>()
// 发射 2,3
// val processor = BehaviorProcessor.create<Int>()
// 发射 3
// val processor = PublishProcessor.create<Int>()
// 发射 1,2,3
val processor = ReplayProcessor.create<Int>()

processor.onNext(1)
processor.onNext(2)
processor.subscribe({Log.e("RX", "$it")})
processor.onNext(3)
processor.onComplete()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • RxJava RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式...
    Mr槑阅读 928评论 0 5
  • 本节介绍的是关于Flowabale的使用,以及RxJava 2.x中的backpressure的处理策略。这部分内...
    Ruheng阅读 18,696评论 10 68
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • RxJava 2.x与RxJava 1.x的差异 Nulls这是一个很大的变化,熟悉 RxJava 1.x 的童鞋...
    天空在微笑阅读 311评论 0 0
  • 前言 如果你对RxJava1.x还不是了解,可以参考下面文章。 1. RxJava使用介绍 【视频教程】2. Rx...
    jdsjlzx阅读 21,116评论 3 78