05. RxSwift源码解读:Connection

今天介绍可连接序列和连接相关操作符

可连接序列 Connection Observable, 不同于一般的序列,有订阅时不会立刻开始发送事件消息,只有当调用 connect()之后才会开始发送值。

connect & publish 操作符

我们看一个例子:

      let observable = Observable<Int>.create({ anyObserver in
            print("subscrition")
            anyObserver.onNext(1)
            anyObserver.onNext(2)
            anyObserver.onNext(3)
            return Disposables.create()
        })
        .publish()
        
        observable
            .subscribe {
                print("订阅1", $0)
            }
            .disposed(by: bag)
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            _ = observable.connect()
        }
        
        observable.subscribe {
            print("订阅2", $0)
        }
subscrition
订阅1 next(1)
订阅2 next(1)
订阅1 next(2)
订阅2 next(2)
订阅1 next(3)
订阅2 next(3)

姑且把创建序列传入的闭包称之为subscribe handler

3秒后才会打印信息,因为3秒后才连接。而且我们发现subscrition只打印了一次,说明subscribe handler只执行了一遍,如果去掉publish和connect,则会执行两遍,相当于每次订阅都会重新执行一遍;如果只去掉connect,则不会发送事件。

对于上面的结果我们分析一下源码:
对于publish操作符,不妨看看:

public func publish() -> ConnectableObservable<Element> {
      self.multicast { PublishSubject() }
}

publish本质上调用了multicast操作符:

    public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
    }

返回一个ConnectableObservableAdapter对象,它也是个Observable,保存了原始的Observable和 makeSubject闭包。

  • ConnectableObservableAdapter继承了ConnectableObservable,ConnectableObservable表示一个可连接的序列,ConnectableObservable遵循ConnectableObservableType协议声明一个connect()协议,表示连接操作。
  • SubjectType表示这是一个Subject,在上一篇文章已经讲述过,那么连接操作实际上将一个普通的Observable连接到一个Subject,而publish操作明显是接到PublishSubject,它是没有buffer的Subject。

然后再看看subscribe代码在哪:

    public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
        let observer = AnonymousObserver { e in
            on(e)
        }
        return self.asObservable().subscribe(observer)
    }

然后调用熟悉的subscribe(observer), 这里会进入ConnectableObservableAdapter的subscribe:

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        self.lazySubject.subscribe(observer)
    }

这里的lazySubject是通过makeSubject闭包创建的PublishSubject();通过它继续调用subscribe,这里相当于做了一次变形,而我们在上一篇已经介绍过PublishSubject执行subscribe是不会发送事件消息的(先不考虑error和complete事件),它只是将观察者插入到observers中保存起来,这块的代码上一篇已经详细讲解过,这里不再讲解。所以就解释了为什么connect之前调用subscribe不会发送事件消息;那么connect又干了什么呢?我们继续看connect的代码:

    // at `ConnectableObservableAdapter` class
    override func connect() -> Disposable {
        return self.lock.performLocked {
            if let connection = self.connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self.lock, subscription: singleAssignmentDisposable)
            self.connection = connection
            let subscription = self.source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

这里创建了一个Connection,并且执行了原始序列的subscribe(connection), 以connection作为参数,原始序列是最初的AnonymousObservable, AnonymousObservable的subscribe方法实现已经比较熟悉了,不用再看。它最终会执行最初创建序列的闭包。但是因为传入的observer是connection,而且Connection类重写了on方法,所以在调用onNext时,会调用到这个on方法:

      func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self.disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self.subjectObserver.on(event)
    }

然后执行PulishSubject的on方法:这里会向所有观察者发送消息,例子中的观察者有两个。意思是例子每次调用onNext会通知所有的观察者。这也解释了为什么“subscrition”只打印了一遍,onNext每个打印两遍。如果先connect再subscribe,则只会打印“subscrition”,因为connect时还没有观察者。

share

share操作符可接受两个参数,replay 和 scope, replay表示缓存的个数或者叫重放的次数,这个好理解,而scope可接受两个枚举值.whileConnected 和 .forever,这两个什么区别:

  • .whileConnected 只有在连接时才支持缓存和重放,意味着如果连接断开则不能重放,下次再订阅时只能重新执行subscribe handler,一般什么操作会断开连接?当发送error或complete事件或者回收资源时会断开连接。
  • .forever 永远支持缓存和重放,无论是否断开连接。
    我们看看官方给的例子:
func testScope(){
        let xs = Observable.deferred { () -> Observable<TimeInterval> in
                print("Performing work ...")
                return Observable.just(Date().timeIntervalSince1970)
            }
            .share(replay: 1, scope: .forever)

        _ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
        _ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
        _ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
    }

这里的just方法会发送一个单一的元素:当前时间戳,还会在发一个complete事件,scope设置的是forever
打印结果:

Performing work ...
next 1622777422.7355828
completed

next 1622777422.7355828
completed

next 1622777422.7355828
completed

第一次订阅时,执行了subscribe handler,打印了Performing work ... next和complete,之后两次订阅时是对第一次的重放,打印的时间戳也一样。我们设置重放的个数为1,所以会重放一次next,同时也会重放complete;第一次执行complete其实以及断开连接了,但是依然能重放,说明就算连接断开了依然支持重放,这就是forever的意思。
如果改成.willConnected
则无法支持重放,打印结果:

Performing work ...
next 1622778439.6327991
completed

Performing work ...
next 1622778439.6332831
completed

Performing work ...
next 1622778439.633359
completed

这里相当于执行了三遍subscre handler,所以Performing work ...打印的三次,而且3次next打印的时间戳不一样;为什么呢,因为第一订阅之后就执行了complete事件了(just操作符会执行一次onNext+一次onComplete),这时候连接断开,再次订阅时需要重新建立连接,然又会重新执行subscribe handler。
我们再看看share的源码:

      public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
        -> Observable<Element> {
        switch scope {
        case .forever:
            switch replay {
            case 0: return self.multicast(PublishSubject()).refCount()
            default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
            }
        case .whileConnected:
            switch replay {
            case 0: return ShareWhileConnected(source: self.asObservable())
            case 1: return ShareReplay1WhileConnected(source: self.asObservable())
            default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
            }
        }
    }

当scope == forever时,调用的multicast, replay == 0 连接的subject是PublishSubject,这个跟publish的操作符是一样的,不一样的是后面接了一个refCount操作符。
refCount返回一个RefCount对象,并将当前对象作为source

    public func refCount() -> Observable<Element> {
        RefCount(source: self)
    }

RefCount是一个与源保持连接的可观察序列,只要该可观察序列至少有一次订阅。RefCount有一个对应的RefCountSink类,外面执行订阅时会调用RefCountSink的run方法:

 func run() -> Disposable {
        let subscription = self.parent.source.subscribe(self)
        self.parent.lock.lock(); defer { self.parent.lock.unlock() }

        self.connectionIdSnapshot = self.parent.connectionId

        if self.isDisposed {
            return Disposables.create()
        }

        if self.parent.count == 0 {
            self.parent.count = 1
            self.parent.connectableSubscription = self.parent.source.connect()
        }
        else {
            self.parent.count += 1
        }
        /// 省略。。。

第一句执行self.parent.source.subscribe(self),这里的parent是RefCount对象,而它的source是ConnectableObservableAdapter对象,它的subscribe代码:

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        self.lazySubject.subscribe(observer)
    }

lazySubject是要连接的Subject,之后调用subject的subscribe,这个subscribe会执行重放和插入观察者操作,回到RefCountSink的run:
后面有一个关键代码:

        if self.isDisposed {
            return Disposables.create()
        }

因为第一订阅时,会执行complete,所以执行当前dispose,第二次订阅时self.isDisposed == true,直接返回了。所以第二次第三次订阅时不会执行subscribe handler,只做重放。
如果share操作改成share(replay:2, .whileConnected), 执行的代码是return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount(), 这个跟 forever分支内的 return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()代码很像,为什么一个是forever一个是whileConnected。关键在于return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()这里只会创建一个ReplaySubject对象,而 { ReplaySubject.create(bufferSize: replay) }是一个闭包,每次在执行闭包时都会创建一个新的subject。所以每次订阅时都会创建一个新的subject。新的subject不存在重放的buffer;上面的代码self.isDisposed 一直是false,这导致每次需要再次连接。而每次连接都会执行subscribe handler。

还剩下下面两个case的代码没有解读:

 case 0: return ShareWhileConnected(source: self.asObservable())
 case 1: return ShareReplay1WhileConnected(source: self.asObservable())

ShareWhileConnected 处理无buffer的情况,ShareReplay1WhileConnected处理一个buffer的情况。这两个类非常相似,这两个分支并没有连接Subject,而是用ShareReplay1WhileConnected和ShareWhileConnected代替了Subject的功能。在ShareReplay1WhileConnected和ShareWhileConnected中,保存了所有的观察者;在ShareReplay1WhileConnected中保存了最后一个元素,这是为了实现重放。
它们都持有一个Connection,分别是ShareWhileConnectedConnection和ShareReplay1WhileConnectedConnection,这两个类实现了连接:

    final func connect() {
        self.subscription.setDisposable(self.parent.source.subscribe(self))
    }

这里会去调用原始序列的subscribe,最终调用subscribe handler。
同时实现了订阅和通知观察者的功能,代码类似于Subject。

    fileprivate var observers = Observers()
    private var element: Element?

那这两个类是怎么实现whileConnected效果的? 看看ShareReplay1WhileConnected的subscribe的实现:

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self.lock.lock()
        let connection = self.synchronized_subscribe(observer)
        let count = connection.observers.count

        let disposable = connection.synchronized_subscribe(observer)
        self.lock.unlock()
        
        if count == 0 {
            connection.connect()
        }

        return disposable
    }

    @inline(__always)
    private func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
        let connection: Connection

        if let existingConnection = self.connection {
            connection = existingConnection
        }
        else {
            connection = ShareReplay1WhileConnectedConnection<Element>(
                parent: self,
                lock: self.lock)
            self.connection = connection
        }

        return connection
    }

当count == 0 执行连接,如果发送complete事件断开连接后,此时再次订阅,则count依然是==0 需要重新连接,这样就会再执行一遍subscribe handler,如果没有发送complete时间,则只会连接一次,后面的订阅只会重放buffer。

replay

replay操作符实际上是通过self.multicast { ReplaySubject.create(bufferSize: bufferSize) }实现,效果类似于publish,只不过publish不带buffer,replay可以指定buffer个数。

总结

普通可观察序列都可以通过以上操作符转变成可连接的序列,可连接序列只有在连接之后才能发送事件元素,而whileConnected和forever区别在于:

  • whileConnected:每个连接将有它自己Subject实例来存储重放事件。
  • forever:一个Suject将存储到source的所有连接的重放事件。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容