RxJava 过滤操作符

ReactiveX 系列文章目录


blockingFirst/blockingLast

阻塞直到返回第一个/最后一个才发射数据。

Log.e("RX", "first ${Observable.just(1,2,3,4).blockingFirst()}")
// Observable 没发射,返回默认值
Log.e("RX", "first default ${Observable.empty<Int>().blockingFirst(10)}")
Log.e("RX", "last ${Observable.just(1,2,3,4).blockingLast()}")
Log.e("RX", "last default ${Observable.empty<Int>().blockingLast(10)}")

结果

first 1
first default 10
last 4
last default 4

现在想到的应用场景也许是某些 Callable 返回的数据之类,先过滤拿到第一个,然后再用 Observable 发射这第一个数。

blockingSingle

val ob = Observable.just(1)
val i = ob.blockingSingle()
Log.e("RX", "$i")

如果发射数据只有一个,返回这个发射的值;如果多于一个,抛出异常;如果没有发射数据,返回参数传递的默认值。

distinct

public final Observable<T> distinct()
public final <K> Observable<T> distinct(Function<? super T, K> keySelector)
public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)

去掉发射的重复数据。

val ob = Observable.fromArray(1,2,3,2,2,3,3,4,5,6,6,6,9)

// 默认规则去重,收到 1,2,3,4,5,6,9
ob.distinct().subscribe(observerInt)

// 求余 3 后比较,4 认为和 1 重复,5 认为和 2 重复,最终只收到 1,2,3
ob.distinct { it % 3 }.subscribe(observerInt)

最后一个构造方法,第三个参数提供一个集合,看源码是先通过前面的 keySelector 方法获取转换后的数据,然后往集合里面 add 要转换后的数据,只有这个 add 返回 true,才会通过 onNext 发射数据。而没有这个参数的方法,内部都是用的 HashSet,不许添加重复元素,这样前面的规则判断出哪些是重复,然后这里添加。

现在我写一个没什么意义的 Collection

class MyCollection<Int>: ArrayList<Int>() {
    override fun add(element: Int): Boolean {
        super.add(element)
        return (element == 6 || element == 9 || element == 3)
    }
}

就是说只要这个值是 3,6,9 中的一个,add 方法就返回 true,其余值一律 false

// 收到 3,3,3,6,6,6,9
ob.distinct ({ it }, {MyCollection<Int>()}).subscribe(observerInt)

虽然根据默认规则,多个 6 是相同的,但是往集合 add 返回值是 true,所以仍然发射出来,所以有多个 6。

// 只收到 onComplete
ob.distinct ({ it % 3 }, {MyCollection<Int>()}).subscribe(observerInt)

因为规则是 it % 3,所以大于 3 的数经转换全部变成了 0,1,2,然后 add 返回值全部 false,所以一个都没有发射出来。

distinctUntilChanged

public final Observable<T> distinctUntilChanged()
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector)
public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)

去除相邻的重复数据。也是通过一个 keySelector 来对数据做相应转换,然后通过 BiPredicate 对象 compare 来判断两个数据是否相等。

val ob = Observable.fromArray(1,2,3,2,2,3,3,4,5,6,6,6,9)
// 收到 1,2,3,2,3,4,5,6,9
//            ob.distinctUntilChanged().subscribe(observerInt)
// 都先转成了 1,然后用默认的 equals 判断,所有都相等,所以只收到 1
//            ob.distinctUntilChanged( Function<Int, Int> { 1 } ).subscribe(observerInt)
// 虽然数据没转换,但判断是否相等时都认为是一样的,所以也只收到 1
ob.distinctUntilChanged({ _, _ -> true}).subscribe(observerInt)

elementAt/elementAtOrError/firstElement/first/firstOrError/lastElement/last/lastOrError

只发射指定位置的数据。返回 Maybe。

// 越界,则发射 onComplete,否则发射指定 index 的数据
public final Maybe<T> elementAt(long index)
// 越界,用默认值 defaultItem
public final Single<T> elementAt(long index, T defaultItem)
// 越界,抛出异常
public final Single<T> elementAtOrError(long index)

public final Maybe<T> firstElement() {
    return elementAt(0L);
}
public final Single<T> first(T defaultItem) {
    return elementAt(0L, defaultItem);
}
public final Single<T> firstOrError() {
    return elementAtOrError(0L);
}

// 空,发射 onComplete
public final Maybe<T> lastElement()
// 空,用默认值
public final Single<T> last(T defaultItem)
// 空,抛出异常
public final Single<T> lastOrError()
Observable.just(1, 2, 3, 4).elementAt(2)
            .subscribe({ textView.text = "${textView.text}\n $it" })

Observable.just(1, 2, 3, 4).elementAt(6, 0)
                  .subscribe(Consumer<Int> { textView.text = "${textView.text}\n $it" })

Observable.just(1, 2, 3, 4).elementAtOrError(6)
                   .subscribe(Consumer<Int> { textView.text = "${textView.text}\n $it" })

filter

过滤。

Observable.just(1,2,3,4,5,6)
     .filter({ it % 2 == 0 }).subscribe(observerInt)

filter 的参数是 Predicate,test 方法用于过滤,返回 false 的丢弃。

public interface Predicate<T> {
    /**
     * Test the given input value and return a boolean.
     * @param t the value
     * @return the boolean result
     * @throws Exception on error
     */
    boolean test(@NonNull T t) throws Exception;
}

ofType

内部调用 filter,过滤指定的 Class 类型的数据。

// 只收到 a 和 b
Observable.just(1,2,"a","b", 4L, true)
                                .ofType(String::class.java).subscribe { t-> textView.text = "${textView.text}\n $t"}

ignoreElements

忽略所有数据,只接受 complete 或 error 事件。

Observable.range(0, 5)
        .ignoreElements().subscribe(object : Action {
            override fun run() {
                textView.text = "${textView.text}\n complete"
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                textView.text = "${textView.text}\n error"
            }
        })

singleElement

val observer = object : MaybeObserver<Int> {
        override fun onSuccess(t: Int) { Log.e("RX", "onSuccess $t") }
        override fun onComplete() { Log.e("RX", "onComplete") }
        override fun onSubscribe(d: Disposable) {}
        override fun onError(e: Throwable) {}
}

// 因为发射了两个数,所以什么都收不到
// val observable = Observable.just(1,2)
// onSuccess 1
// val observable = Observable.just(1)
// onComplete
val observable = Observable.empty<Int>()

observable.singleElement().subscribe(observer)

single/singleOrError

val observer = object : SingleObserver<Int> {
    override fun onSuccess(t: Int) { Log.e("RX", "onSuccess $t") }
    override fun onSubscribe(d: Disposable) {}
    override fun onError(e: Throwable) {Log.e("RX", "onError ${e.message}")}
}
// 超过一个,抛异常,进 onError
// val observable = Observable.just(1,2)
// onSuccess 1
// val observable = Observable.just(1)
// 发射数据为空,使用默认值,onSuccess 100
val observable = Observable.empty<Int>()
observable.single(100).subscribe(observer)

// 如果只发射一个值,进入 onSuccess,否则都是 onError
observable.singleOrError().subscribe(observer)

skip/skipLast/skipUntil/skipWhile

// 跳过指定数目
public final Observable<T> skip(long count)
// 跳过指定时间
public final Observable<T> skip(long time, TimeUnit unit)
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)

// 反方向跳过
public final Observable<T> skipLast(int count)
public final Observable<T> skipLast(long time, TimeUnit unit)
public final Observable<T> skipLast(long time, TimeUnit unit, boolean delayError)
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler)
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

public final <U> Observable<T> skipUntil(ObservableSource<U> other)

public final Observable<T> skipWhile(Predicate<? super T> predicate)
val ob = Observable.intervalRange(0, 10, 0, 100, TimeUnit.MILLISECONDS)
val consumer  = Consumer<Long>{ Log.e("RX", "$it") }

// 收到 3-9
ob.skip(3).subscribe(consumer)
// 跳过 400ms,收到 4-9
ob.skip(400, TimeUnit.MILLISECONDS).subscribe(consumer)

// 收到 0-6
ob.skipLast(3).subscribe(consumer)

// 直到返回的 Observable 开始发射,这之前源 Observable 发射的数据跳过
// 收到 5-9
ob.skipUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
            .subscribe(consumer)

// 只要条件满足就跳过,收到 4-9
var i = 0
ob.skipWhile { i++ < 4 }.subscribe(consumer)

take/takeLast/takeUntil/takeWhile

// 最多发多少个数据
public final Observable<T> take(long count)
// 只发射某个时间间隔前的数据
public final Observable<T> take(long time, TimeUnit unit)
public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler)

// 反方向
public final Observable<T> takeLast(int count)
// 只发射在源 Observable 的 complete 之前某个间隔的数据
public final Observable<T> takeLast(long time, TimeUnit unit)
public final Observable<T> takeLast(long time, TimeUnit unit, boolean delayError)
public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler)
public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

// 同时用时间和数目的限制
public final Observable<T> takeLast(long count, long time, TimeUnit unit)
public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

// 当 other 发射时不再从源 Observable 中取
public final <U> Observable<T> takeUntil(ObservableSource<U> other)
// 返回 true 时停止 take,发射的数据作为参数
public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
// 条件满足时一直 take
public final Observable<T> takeWhile(Predicate<? super T> predicate)
val ob = Observable.intervalRange(0, 10, 0, 100, TimeUnit.MILLISECONDS)
val consumer  = Consumer<Long>{ Log.e("RX", "$it") }

ob.take(3).subscribe(consumer) // 0,1,2

// complete 前 300ms 的数据,即 7,8,9
ob.takeLast(300, TimeUnit.MILLISECONDS).subscribe(consumer)

// 参数的 Observable 开始发射时停止 take,收到 0,1,2
ob.takeUntil(Observable.timer(300, TimeUnit.MILLISECONDS)).subscribe(consumer)

// 发射的数据大于 3 时,停止 take,收到 0,1,2,3,4
ob.takeUntil { it > 3 }.subscribe(consumer)

// 发射的数据小于 3 时就一直 take,收到 0,1,2
ob.takeWhile { it < 3 }.subscribe(consumer)

debounce/throttleWithTimeout

public final Observable<T> debounce(long timeout, TimeUnit unit)

每产生一个数据后,如果在规定的间隔时间内没有别的数据产生,就会发射这个数据,否则忽略该数据。

Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    Thread.sleep(100)
    emitter.onNext(2)
    Thread.sleep(300)
    emitter.onNext(3)
    Thread.sleep(200)
    emitter.onNext(4)
    Thread.sleep(400)
}).debounce(250, TimeUnit.MILLISECONDS)
    .subscribe({
        Log.e("RX", "$it")
    })

日志显示 2 和 4,首先发射 1,100ms 后发射 2,也就是说发射 1 后的 250ms 以内又发射了其它数,那么忽略 1,2 发射后 250ms 内并没有新数据发出,因为 300ms 后才发出 3,所以接收了 2,3 发射后 200ms 发射 4,那么忽略 3,4 之后没数据再发射,所以接收 4.

public final <U> Observable<T> debounce(Function<? super T, ? extends ObservableSource<U>> debounceSelector)
Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    Thread.sleep(100)
    emitter.onNext(2)
    Thread.sleep(300)
    emitter.onNext(3)
    Thread.sleep(200)
    emitter.onNext(4)
}).debounce({
    if (it == 2)
        Observable.timer(200, TimeUnit.MILLISECONDS)
    else
        Observable.timer(100, TimeUnit.MILLISECONDS)
  })
  .subscribe({
      Log.e("RX", "$it")
  })

Function 里方法返回的 Observeable,在它发射结束之前,原始 Observable 发射了新的数据,旧的被忽略。比如发射 1 时,返回的这个 Observable 需要 200ms 后才结束,而 100ms 后就发射了 2,所以 1 没了,对于 2,返回的 Observable 也要 200ms 结束,它结束后再过 100ms 才发了 3,所以 2,3 都在。

throttleWithTimeout 内部就是直接调用了 debounce。

sample/throttleLast

定期扫描源 Observable 产生的结果,在指定的间隔周期内进行采样。

public final Observable<T> sample(long period, TimeUnit unit)
// emitLast 是否发射最后一个数据,若没有这个参数,默认是 false
public final Observable<T> sample(long period, TimeUnit unit, boolean emitLast)
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler)
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast)
// 收到 3,6,9,12,15,18
Observable.intervalRange(1, 20, 0,100, TimeUnit.MILLISECONDS)
                         .sample(300, TimeUnit.MILLISECONDS).subscribe { Log.e("RX", "$it") }

// 收到 3,6,9,12,15,18,20
Observable.intervalRange(1, 20, 0,100, TimeUnit.MILLISECONDS)
                         .sample(300, TimeUnit.MILLISECONDS, true).subscribe { Log.e("RX", "$it") }
public final <U> Observable<T> sample(ObservableSource<U> sampler)
public final <U> Observable<T> sample(ObservableSource<U> sampler, boolean emitLast)

参数里的 Observable 发射时,从源 Observable 中取出一个发射。

// 收到 1,2,3,...,8,9
Observable.intervalRange(1, 20, 0, 100, TimeUnit.MILLISECONDS)
                .sample(Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS))
                .subscribe { Log.e("RX", "$it") }

// 收到 2,4,6,...,16,18
Observable.intervalRange(1, 20, 0, 100, TimeUnit.MILLISECONDS)
                  .sample(Observable.intervalRange(1, 10, 200, 200, TimeUnit.MILLISECONDS))
                  .subscribe { Log.e("RX", "$it") }

// 什么都收不到
Observable.intervalRange(1, 20, 0, 100, TimeUnit.MILLISECONDS)
        .sample(Observable.just("a","b"))
        .subscribe { Log.e("RX", "$it") }

throttleLast 内部直接就是调用 sample 方法,两者没有任何区别,只是它的重载方法只有两个。

public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit)
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler)

throttleFirst

和 throttleLast 基本一样,throttleLast 是每次采取取最后那个,而 throttleFirst 是第一项。

// throttleLast 收到 3,6,9,12,15,18
// throttleFirst 收到 1,5,9,13,17
Observable.intervalRange(1, 20, 0,100, TimeUnit.MILLISECONDS)
                    .throttleFirst(300, TimeUnit.MILLISECONDS)
                    .subscribe { Log.e("RX", "$it") }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,738评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,377评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,774评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,032评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,015评论 5 361
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,239评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,724评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,374评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,410评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,457评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,132评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,733评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,804评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,022评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,515评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,116评论 2 341

推荐阅读更多精彩内容