RxSwift-combineLast源码解析

combineLast是RxSwift中常用的函数之一,它可以把多个序列发送的数据进行组装后再返回。它的特点是组合的多个序列都发送信号后,才会有返回值,它是如何做到,后面看下它的源码实现就一目了然了

在看它的源码前,先来看下它是如何使用的
代码

let ob1 = PublishSubject<Int>()
let ob2 = PublishSubject<String>()
Observable.combineLatest(ob1, ob2) {
    (val1, val2) in
    "1:\(val1)-2:\(val2)"
    }
    .subscribe(onNext: {
        print($0)
    }).disposed(by: disposeBag);

ob1.onNext(1)
ob1.onNext(2)
ob2.onNext("a")
ob2.onNext("b")

打印

1:2-2:a
1:2-2:b

从这个打印结果上,可以验证上面发送信号的前提是组合的两个序列必须都要发送消息的说法,并且每次都是会记录一个最新的消息

源码解析

//CombineLatest+arity.swift
extension ObservableType where Element == Any {
    /**
    Merges the specified observable sequences into one observable sequence of tuples whenever any of the observable sequences produces an element.

    - seealso: [combineLatest operator on reactivex.io](http://reactivex.io/documentation/operators/combinelatest.html)

    - returns: An observable sequence containing the result of combining elements of the sources.
    */
    public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2)
            -> Observable<(O1.Element, O2.Element)> {
        //创建内部类CombineLatest2
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: { ($0, $1) }
        )
    }
}

在这个方法里创建了一个内部类CombineLatest2

//CombineLatest+arity.swift
final class CombineLatest2<E1, E2, Result> : Producer<Result> {
    typealias ResultSelector = (E1, E2) throws -> Result
    //这两个属性保存源序列
    let _source1: Observable<E1>
    let _source2: Observable<E2>

    //保存CombineLatest的尾随闭包
    let _resultSelector: ResultSelector

    init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
        self._source1 = source1
        self._source2 = source2

        self._resultSelector = resultSelector
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
        let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

这个内部类CombineLatest2继承自Producer,通过Observable的源码解析后,我们很清楚的就知道,当这个组合消息被订阅后,会执行它的run方法,进而创建新的内部类CombineLatestSink2_

//CombineLatest+arity.swift
final class CombineLatestSink2_<E1, E2, Observer: ObserverType> : CombineLatestSink<Observer> {
    typealias Result = Observer.Element
    typealias Parent = CombineLatest2<E1, E2, Result>

    let _parent: Parent
    //这两个属性,就是记录最新消息的值
    var _latestElement1: E1! = nil
    var _latestElement2: E2! = nil

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        //这里我们组合的序列固定是两个,所以arity值为2
        super.init(arity: 2, observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()
        
        //setLatestValue为前面记录最新值的属性赋值
        let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
        let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)

        //在这里订阅源序列
         subscription1.setDisposable(self._parent._source1.subscribe(observer1))
         subscription2.setDisposable(self._parent._source2.subscribe(observer2))

        return Disposables.create([
                subscription1,
                subscription2
        ])
    }

    override func getResult() throws-> Result {
        return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
    }
}

CombineLatestSink2_这个类里,我们创建了两个CombineLatestObserver,并在创建的时候分别对_latestElement1,latestElement2进行了赋值操作,对源序列进行订阅并传入CombineLatestObserver对象
下面我们下看下CombineLatestSink2
的父类

//CombineLatest.swift
class CombineLatestSink<Observer: ObserverType>
    : Sink<Observer>
    , CombineLatestProtocol {
    typealias Element = Observer.Element 
   
    let _lock = RecursiveLock()

    private let _arity: Int
    private var _numberOfValues = 0
    private var _numberOfDone = 0
    private var _hasValue: [Bool]
    private var _isDone: [Bool]
   
    init(arity: Int, observer: Observer, cancel: Cancelable) {
        self._arity = arity
        self._hasValue = [Bool](repeating: false, count: arity)
        self._isDone = [Bool](repeating: false, count: arity)
        
        super.init(observer: observer, cancel: cancel)
    }
    
    func getResult() throws -> Element {
        rxAbstractMethod()
    }
    
    //这里是发送消息的处理,当两个序列都发送消息,即_numberOfValues等于_arity也就是2时,会先执行getResult取到组合的值,在执行forwardOn,这就是我们一开始就提出来特点的实现
    func next(_ index: Int) {
        if !self._hasValue[index] {
            self._hasValue[index] = true
            self._numberOfValues += 1
        }

        if self._numberOfValues == self._arity {
            do {
                let result = try self.getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
        else {
            var allOthersDone = true

            for i in 0 ..< self._arity {
                if i != index && !self._isDone[i] {
                    allOthersDone = false
                    break
                }
            }
            
            if allOthersDone {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
    
    func fail(_ error: Swift.Error) {
        self.forwardOn(.error(error))
        self.dispose()
    }
    
    func done(_ index: Int) {
        if self._isDone[index] {
            return
        }

        self._isDone[index] = true
        self._numberOfDone += 1

        if self._numberOfDone == self._arity {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

接下来,再来看下CombineLatestObserver做了什么

//CombineLatest.swift
final class CombineLatestObserver<Element>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    ...
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            self._setLatestValue(value)
            self._parent.next(self._index)
        case .error(let error):
            self._this.dispose()
            self._parent.fail(error)
        case .completed:
            self._this.dispose()
            self._parent.done(self._index)
        }
    }
}

这里实现了on方法,最终会调用子类的next、fail、done等方法
到这里,主要实现的部分都已经罗列出来了,接下来看下源序列通过onNext是如何调用到combineLatestd尾随闭包,进而调用到订阅的block的方法的
当我们序列1通过onNext发送消息时,因为我们在CombineLatestSink2_的run方法中已经对序列1进行过订阅了,所以会走普通序列Observable的一系列步骤,最终会执行传入对象CombineLatestObserver的on方法,并对_latestElement1进行值更新,on会调用synchronized_on方法,进而执行CombineLatestSink2的next方法,而next方法会做判断,这个序列是否来过,没有则_numberOfValues加1,然后判断_numberOfValues是否等于_arity,这次显然不等于,直接跳过;
下面序列1又发送一个消息,流程一致,再次更新了_latestElement1的值,到next方法时,判断这个序列来过_numberOfValues值不变,后面流程一致又会跳过;
接下来序列2来了,前面流程一致,更新_latestElement2的值,到next方法时,判断该序列未来过,_numberOfValues值加1变为2,接下来_numberOfValues等于_arity,会先执行getResult,这个方法里调用_parent._resultSelector(self._latestElement1, self._latestElement2),_resultSelector即为combineLatest的闭包,这样就将_latestElement1和_latestElement2值进行了组合并返回,接着调用forwardOn(.next(result)),调用Sink类的forwardOn,看到这个类,后面的逻辑大概就知道了,他会调用_observer.on方法,进而调用AnonymousObserver的onCore方法,去执行订阅的回调_eventHandler,最终完成事物的解析
吧哒吧哒分析了一大堆,自己都要晕了,下面来看下思维导图,看是否会清晰

combineLatest

生活如此美好,今天就点到为止。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容