RxSwift---高阶函数(五)

Rx的世界里,充斥着函数响应式,如果想在项目中游刃有余的玩转RxSwift那么RxSwift的高阶函数是不我们绕不开的内容

下面就一起在RxSwift高阶函数的世界中畅游吧

RxSwift高阶函数分为七类

  • 1.组合操作符
  • 2.映射操作符
  • 3.过滤条件操作符
  • 4.集合控制操作符
  • 5.从可观察对象的错误通知中恢复的操作符
  • 6.debug Rx流程操作符
  • 7.链接操作符

一.组合操作符

1.1 startWith

在开始从可观察源发出元素之前,发出指定的元素序列

// *** startWith : 在开始从可观察源发出元素之前,发出指定的元素序列
        print("***** startWith *****")
        Observable.of(1,2,3,4)
            .startWith(5)
            .startWith(6)
            .startWith(7,8,9)
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
打印结果:7,8,9,5,6,1,2,3,4

1.2 merge

将源可观察序列中的元素组合成一个新的可观察序列,并将像每个源可观察序列发出元素一样发出每个元素

        // **** merge : 将源可观察序列中的元素组合成一个新的可观察序列,并将像每个源可观察序列发出元素一样发出每个元素
        print("***** merge *****")
        let subject1 = PublishSubject<String>()
        let subject2 = PublishSubject<String>()
        Observable.of(subject1, subject2)
            .merge()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        subject1.onNext("M")
        subject1.onNext("G")
        subject2.onNext("L")
        subject2.onNext("O")
        subject1.onNext("V")
        subject2.onNext("E")
        //任何一个响应都会勾起新序列响应
    打印结果: MGLOVE

1.3 zip

将多达8个源可观察序列组合成一个新的可观察序列,并将从组合的可观察序列中发射出对应索引处每个源可观察序列的元素

        //  *** zip: 将多达8个源可观测序列组合成一个新的可观测序列,并将从组合的可观测序列中发射出对应索引处每个源可观测序列的元素
        print("***** zip *****")
        
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()
        
        Observable.zip(stringSubject, intSubject) { stringElement, intElement in
            "\(stringElement) \(intElement)"
        }.subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
        
        stringSubject.onNext("A")
        stringSubject.onNext("B") // 到这里存储了 AB 但是不会响应,除非另一个响应
        
        intSubject.onNext(1) //响应第一个
        intSubject.onNext(2) //响应第二个
        stringSubject.onNext("C") //存C
        intSubject.onNext(3) //响应第三个
        //stringSubject,intSubject 必须一一对应,成对出现
        
        打印结果: (A,1) (B,2) (C,3)

1.4 combineLatest

将8个源可观察序列组合成一个新的观察序列,并将开始发出联合观察序列的每个源的最新元素,可观察序列一旦所有排放源序列至少有一个元素,并且当源可观察序列发出的任何一个新元素

        //  *****  combineLatest:将8源可观测序列组合成一个新的观测序列,并将开始发出联合观测序列的每个源的最新元素可观测序列一旦所有排放源序列至少有一个元素,并且当源可观测序列发出的任何一个新元素
        print("***** combineLatest *****")
        let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        
        Observable.combineLatest(stringSub, intSub) {strElement, intElement in
            "\(strElement) \(intElement)"
        }.subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
        
        stringSub.onNext("M") //和zip类似 存一个
        stringSub.onNext("O") //这里和zip不一样了 O覆盖了M 只会存储一个
        intSub.onNext(1) //发现stringSub有值 响应(O,1)
        intSub.onNext(2) //由于M被覆盖了 响应(O,2)
        stringSub.onNext("T") //响应 (T,2)
        // combineLatest 比较zip 会覆盖
        // 应用非常频繁: 比如账户和密码同时满足->才能登录,不关心账户密码怎么变化,只要查看最后有值就可以 loginEnable
        打印结果:(O,1) (O,2)(T,2)

1.5 switchLatest

将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素

        // switchLatest : 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
        print("***** switchLatest *****")
        
        let switchLatestSub1 = BehaviorSubject(value: "M")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        let switchLatestSub  = BehaviorSubject(value: switchLatestSub1)// 选择了 switchLatestSub1 就不会监听 switchLatestSub2
        
        switchLatestSub.asObserver()
            .switchLatest()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("G")
        switchLatestSub1.onNext("L")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3") //2-3都不会监听,但是默认保存由 2覆盖1 3覆盖2
        switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
        switchLatestSub1.onNext("E")
        switchLatestSub1.onNext("MO") // 原理同上面 下面如果再次切换到 switchLatestSub1 会打印出 TO
        switchLatestSub2.onNext("TO")

二.映射操作符

2.1 map

转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列

        // ***** map: 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
        print("***** map *****")
        
        let ob = Observable.of(1,2,3,4)
        ob.map { number in
            return number + 1 //将上面序列每个元素都加1 变成一个新的序列
        }
        .subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
打印结果:2,3,4,5

2.2 flatMap and flatMapLatest

将可观察序列发射的元素转换为可观察序列,并将两个可观察序列的发射合并为一个可观察序列
这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射作出反应(序列中的序列:比如网络序列中还有模型序列)
flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观察序列发射元素

        // *** flatMap and flatMapLatest
        // 将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
        // 这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)
        // flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素
        print("***** flatMap *****")
        let boy = BehaviorSubject(value: 100)
        let girl = BehaviorSubject(value: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObserver()
            .flatMap { $0.asObserver() }
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        boy.onNext(60)
        player.onNext(girl)
        boy.onNext(50)
        boy.onNext(40) //  如果切换到 flatMapLatest 就不会打印
        girl.onNext(10)
        girl.onNext(0)
        // flatMapLatest实际上是map和switchLatest操作符的组合。

2.3 scan

从初识就带有一个默认值,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果

        // ** scan: 从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果
        print("***** scan *****")
        Observable.of(10, 100, 1000)
            .scan(2) { aggregateValue, newValue in
                aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
            }
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        // 这里主要强调序列值之间的关系
        //打印结果 12,112,1112

三.过滤条件操作符

3.1 filter

仅从满足指定条件的可观察序列中发出那些元素

        // **** filter : 仅从满足指定条件的可观察序列中发出那些元素
        print("***** filter *****")
        Observable.of(1,2,3,4,5,6,7,8,9,0)
            .filter {$0 % 2 == 0} //挑选出其中的偶数
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)

3.2 distinctUntilChanged

抑制可观察序列发出的顺序重复元素

        // ***** distinctUntilChanged: 抑制可观察序列发出的顺序重复元素
        print("***** distinctUntilChanged *****")
        Observable.of("1", "2", "2", "2", "3", "3", "4")
            .distinctUntilChanged()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        //打印结果 1,2,3,4

3.3 elementAt

仅在可观察序列发出的所有元素的指定索引处发出元素

        // **** elementAt: 仅在可观察序列发出的所有元素的指定索引处发出元素
        print("***** elementAt *****")
        Observable.of("1", "2", "3", "4", "5")
            .element(at: 0) //下标
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        //打印结果 1

3.4 single

只发出可观察序列发出的第一个元素(或满足条件的第一个元素),如果可观察序列发出多个元素,将抛出一个错误

        // *** single: 只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
        print("***** single *****")
        Observable.of("1", "2")
            .single()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        Observable.of("A", "B")
            .single {
                $0 == "B"
            }
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        //打印结果 1,错误,B

3.5 take

只从一个可观察序列的开始发出指定数量的元素,上面single只有一个序列,在实际开发会受到局限,这里引出take想几个就几个

// **** take: 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
        print("***** take *****")
        Observable.of("A","B","C","D")
            .take(2) //打印的数量
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)

3.6 takeLast

仅从可观察序列的末尾发出指定数量的元素

        // *** takeLast: 仅从可观察序列的末尾发出指定数量的元素
        print("***** takeLast *****")
        Observable.of("A", "B", "C", "D")
            .takeLast(1) //打印的数量
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        //打印结果 D

3.7 takeWhile

只要指定条件的值为true,就从可观察序列的开始发出元素

        // **** takeWhile: 只要指定条件的值为true,就从可观察序列的开始发出元素
        print("***** takeWhile *****")
        Observable.of(1, 2, 3, 4, 5, 6)
            .take(while: { $0 < 3 })
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        //打印结果 1,2

3.8 takeUntil

从源可观察序列发出元素,直到参考可观察序列发出元素,非常常用,比如我的页面销毁了,就不能获取值了(cell重用运用)

        // ***** takeUntil: 从源可观察序列发出元素,直到参考可观察序列发出元素
        // 这个要重点,应用非常频繁 比如我页面销毁了,就不能获取值了(cell重用运用)
        print("***** takeUntil *****")
        let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        
        sourceSequence
            .take(until: referenceSequence)
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        sourceSequence.onNext("A")
        sourceSequence.onNext("B")
        sourceSequence.onNext("C")
        
        referenceSequence.onNext("1") //条件出来。下面就不走了
        
        sourceSequence.onNext("D")
        sourceSequence.onNext("E")
        sourceSequence.onNext("F")
//打印结果 A,B,C

3.8 skip

从源可观察序列发出元素,直到参考可观察序列发出元素,非常常用,比如textField会有默认序列的产生

        // ***** skip: 从源可观察序列发出元素,直到参考可观察序列发出元素
        // 这个要重点,应用非常频繁 不用解释 textfiled 都会有默认序列产生
        print("***** skip *****")
        Observable.of(1, 2, 3, 4, 5, 6)
            .skip(2) //跳过2次,1,2不执行
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        //打印结果 3,4,5,6

3.9 skipUntil

抑制从源可观察序列发出元素,直到参考可观察序列发出元素

        // *** skipUntil: 抑制从源可观察序列发出元素,直到参考可观察序列发出元素
        print("***** skipUntil *****")
        let sourceSeq = PublishSubject<String>()
        let referenceSeq = PublishSubject<String>()
        
        sourceSeq
            .skip(until: referenceSeq)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // 没有条件命令 下面走不了
        sourceSeq.onNext("A")
        sourceSeq.onNext("B")
        sourceSeq.onNext("C")
        
        referenceSeq.onNext("1") // 条件一出来,下面就可以走了
        
        sourceSeq.onNext("D")
        sourceSeq.onNext("E")
        sourceSeq.onNext("F")
        //打印结果 D,E,F

四.集合控制操作符

4.1 toArray

将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止

        // *** toArray: 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
        print("***** toArray *****")
        Observable.range(start: 1, count: 10)
            .toArray()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        //打印结果 1-10

4.2 reduce

从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan

        // *** reduce: 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
        print("***** reduce *****")
        Observable.of(10, 100, 1000)
            .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        //打印结果 1111

4.3 concat

以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止,用来控制顺序

        // *** concat: 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止
        // 用来控制顺序
        print("***** concat *****")
        let subject1 = BehaviorSubject(value: "A")
        let subject2 = BehaviorSubject(value: "1")
        
        let subjectsSubject = BehaviorSubject(value: subject1)
        
        subjectsSubject.asObservable()
            .concat()
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        subject1.onNext("B")
        subject1.onNext("C")
        
        subjectsSubject.onNext(subject2)
        
        subject2.onNext("打印不出来")
        subject2.onNext("2")
        subject2.onNext("3") //会保存最后一个
        
        subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
        subject2.onNext("4")
        
        //打印结果:A,B,C,3,4

五. 从可观察对象的错误通知中恢复的操作符。

5.1 catchErrorJustReturn

从错误事件中恢复,方式是返回一个可观察的序列,该序列发出单个元素,然后终止

        // **** catchErrorJustReturn
        // 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
        print("***** catchErrorJustReturn *****")
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchAndReturn("A")
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("B")
        sequenceThatFails.onNext("C") // 正常序列发送成功的
        sequenceThatFails.onError(self.mgError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
//打印结果 A,B,C,完成

5.2 catchError

通过切换到提供的恢复可观察序列,从错误事件中恢复

        // **** catchError
        // 通过切换到提供的恢复可观察序列,从错误事件中恢复
        print("***** catchError *****")
        let sequenceThatFails = PublishSubject<String>()
        let recoverySequence = PublishSubject<String>()
        
        sequenceThatFails.catch {
            print("Error:", $0)
            return recoverySequence  // 获取到了错误序列-我们在中间的闭包操作处理完毕,返回给用户需要的序列(showAlert)
        }
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("A")
        sequenceThatFails.onNext("B")  // 正常序列发送成功的
        sequenceThatFails.onError(mgError) // 发送失败的序列
        recoverySequence.onNext("C")
        //打印结果: 错误, C

5.3 retry

通过无限的重新订阅可观察序列来恢复重复的错误事件

        // *** retry: 通过无限地重新订阅可观察序列来恢复重复的错误事件
        print("***** retry *****")
        var count = 1 //外界变量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("A")
            
            if count == 1 {  // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
                observer.onError(self.mgError)// 接收到了错误序列,重试序列发生
                print("错误序列来了")
                count += 1
            }
            observer.onNext("1")
            
            
            return Disposables.create()
        }
        
        sequenceRetryErrors.retry()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        //打印结果: A,错误序列来了,A,1

5.4 retry(_:)

通过重新订阅可观察序列,重复的从错误事件中恢复,直到重试次数达到Max未遂计数

        // **** retry(_:): 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数
        print("***** retry(_:) *****")
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("A")
                        
            if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
                observer.onError(self.mgError)
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("1")
            
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3) //重复次数
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
        
        //打印结果 A,错误序列来了,A,错误序列来了,A,错误序列来了,error

六. Rx流程操作符

6.1 debug

打印所有订阅、事件和处理

  // 打印所有订阅、事件和处理。
        print("***** debug *****")
        var count = 1
        
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("A")
            
            if count < 5 {
                observer.onError(self.mgError)
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("1")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(1) //重复次数
            .debug()
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)

6.2 RxSwift.Resources.total:

提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用

        // ** RxSwift.Resources.total: 提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用。
        print("***** RxSwift.Resources.total *****")

        print(RxSwift.Resources.total)

        let subject = BehaviorSubject(value: "Cooci")

        let subscription1 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        let subscription2 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        subscription1.dispose()

        print(RxSwift.Resources.total)

        subscription2.dispose()

        print(RxSwift.Resources.total)

七.链接操作符

7.1 multicast

将源可观察序列转换为可连接序列,并通过指定的主题广播其发射

       // *** multicast : 将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
        print("***** multicast *****")
        let subject = PublishSubject<Any>()
        subject.subscribe{ print("00:\($0)") }
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { observer -> Disposable in
            sleep(2)// 模拟网络延迟
            print("我开始请求网络了")
            observer.onNext("请求到的网络数据")
            observer.onNext("请求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("销毁回调了")
            }
        }.publish()
        
        netOB.subscribe(onNext: { anything in
            print("订阅1:", anything)
        })
        .disposed(by: disposeBag)

        // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的地方
        // 所以再订阅一次 会出现什么问题?
        netOB.subscribe(onNext: { anything in
            print("订阅2:", anything)
        })
        .disposed(by: disposeBag)
        
        _ = netOB.connect()

7.2 replay

将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
拥有和publish一样的能力,共享Observable sequence,其次使用replay还需要我们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者

       // **** replay: 将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
        // 首先拥有和publish一样的能力,共享 Observable sequence, 其次使用replay还需要我们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者
        print("***** replay *****")

        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
        
        interval.subscribe(onNext: { print(Date.time, "订阅: 1, 事件: \($0)") })
            .disposed(by: self.disposeBag)
        
        delay(2) { _ = interval.connect() }
        
        delay(4) {
            interval.subscribe(onNext: { print(Date.time, "订阅: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        
        delay(8) {
            interval.subscribe(onNext: { print(Date.time, "订阅: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(20, closure: {
            self.disposeBag = DisposeBag()
        })
        
        /**
         订阅: 1, 事件: 4
         订阅: 1, 事件: 0
         2019-05-28 21-32-42 订阅: 2, 事件: 0
         2019-05-28 21-32-42 订阅: 1, 事件: 1
         2019-05-28 21-32-42 订阅: 2, 事件: 1
         2019-05-28 21-32-45 订阅: 2, 事件: 4
         2019-05-28 21-32-46 订阅: 3, 事件: 0
         2019-05-28 21-32-46 订阅: 3, 事件: 1
         2019-05-28 21-32-46 订阅: 3, 事件: 2
         2019-05-28 21-32-46 订阅: 3, 事件: 3
         2019-05-28 21-32-46 订阅: 3, 事件: 4
         
         // 序列从 0开始
         // 定时器也没有断层  sub2 sub3 和 sub1 是同步的
         */

7.3 push

将源可观察序列转换为可连接序列
共享一个Observable的事件序列,避免创建多个Observable sequence
注意:需要调用connect之后才会开始发送事件

       // **** push:将源可观察序列转换为可连接序列
        // 共享一个Observable的事件序列,避免创建多个Observable sequence。
        // 注意:需要调用connect之后才会开始发送事件
        print("***** testPushConnect *****")

        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
        
        interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        delay(2) {
            _ = interval.connect()
        }
        delay(4) {
            interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(6) {
            interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(10, closure: {
            self.disposeBag = DisposeBag()
        })
        /**
            订阅: 1, 事件: 1
            订阅: 2, 事件: 1
            订阅: 1, 事件: 2
            订阅: 2, 事件: 2
            订阅: 1, 事件: 3
            订阅: 2, 事件: 3
            订阅: 3, 事件: 3
         
            订阅: 2 从1开始
            订阅: 3 从3开始
        */
        // 但是后面来的订阅者,却无法得到之前已发生的事件
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容