combineLast是RxSwift中常用的函数之一,它可以把多个序列发送的数据进行组装后再返回。它的特点是组合的多个序列都发送信号后,才会有返回值,它是如何做到,后面看下它的源码实现就一目了然了
在看它的源码前,先来看下它是如何使用的
代码
let ob1 = PublishSubject<Int>()
let ob2 = PublishSubject<String>()
Observable.combineLatest(ob1, ob2) {
(val1, val2) in
"1:\(val1)-2:\(val2)"
}
.subscribe(onNext: {
print($0)
}).disposed(by: disposeBag);
ob1.onNext(1)
ob1.onNext(2)
ob2.onNext("a")
ob2.onNext("b")
打印
1:2-2:a
1:2-2:b
从这个打印结果上,可以验证上面发送信号的前提是组合的两个序列必须都要发送消息的说法,并且每次都是会记录一个最新的消息
源码解析
//CombineLatest+arity.swift
extension ObservableType where Element == Any {
/**
Merges the specified observable sequences into one observable sequence of tuples whenever any of the observable sequences produces an element.
- seealso: [combineLatest operator on reactivex.io](http://reactivex.io/documentation/operators/combinelatest.html)
- returns: An observable sequence containing the result of combining elements of the sources.
*/
public static func combineLatest<O1: ObservableType, O2: ObservableType>
(_ source1: O1, _ source2: O2)
-> Observable<(O1.Element, O2.Element)> {
//创建内部类CombineLatest2
return CombineLatest2(
source1: source1.asObservable(), source2: source2.asObservable(),
resultSelector: { ($0, $1) }
)
}
}
在这个方法里创建了一个内部类CombineLatest2
//CombineLatest+arity.swift
final class CombineLatest2<E1, E2, Result> : Producer<Result> {
typealias ResultSelector = (E1, E2) throws -> Result
//这两个属性保存源序列
let _source1: Observable<E1>
let _source2: Observable<E2>
//保存CombineLatest的尾随闭包
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
self._source1 = source1
self._source2 = source2
self._resultSelector = resultSelector
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
这个内部类CombineLatest2继承自Producer,通过Observable的源码解析后,我们很清楚的就知道,当这个组合消息被订阅后,会执行它的run方法,进而创建新的内部类CombineLatestSink2_
//CombineLatest+arity.swift
final class CombineLatestSink2_<E1, E2, Observer: ObserverType> : CombineLatestSink<Observer> {
typealias Result = Observer.Element
typealias Parent = CombineLatest2<E1, E2, Result>
let _parent: Parent
//这两个属性,就是记录最新消息的值
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
//这里我们组合的序列固定是两个,所以arity值为2
super.init(arity: 2, observer: observer, cancel: cancel)
}
func run() -> Disposable {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
//setLatestValue为前面记录最新值的属性赋值
let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
//在这里订阅源序列
subscription1.setDisposable(self._parent._source1.subscribe(observer1))
subscription2.setDisposable(self._parent._source2.subscribe(observer2))
return Disposables.create([
subscription1,
subscription2
])
}
override func getResult() throws-> Result {
return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
}
}
CombineLatestSink2_这个类里,我们创建了两个CombineLatestObserver,并在创建的时候分别对_latestElement1,latestElement2进行了赋值操作,对源序列进行订阅并传入CombineLatestObserver对象
下面我们下看下CombineLatestSink2的父类
//CombineLatest.swift
class CombineLatestSink<Observer: ObserverType>
: Sink<Observer>
, CombineLatestProtocol {
typealias Element = Observer.Element
let _lock = RecursiveLock()
private let _arity: Int
private var _numberOfValues = 0
private var _numberOfDone = 0
private var _hasValue: [Bool]
private var _isDone: [Bool]
init(arity: Int, observer: Observer, cancel: Cancelable) {
self._arity = arity
self._hasValue = [Bool](repeating: false, count: arity)
self._isDone = [Bool](repeating: false, count: arity)
super.init(observer: observer, cancel: cancel)
}
func getResult() throws -> Element {
rxAbstractMethod()
}
//这里是发送消息的处理,当两个序列都发送消息,即_numberOfValues等于_arity也就是2时,会先执行getResult取到组合的值,在执行forwardOn,这就是我们一开始就提出来特点的实现
func next(_ index: Int) {
if !self._hasValue[index] {
self._hasValue[index] = true
self._numberOfValues += 1
}
if self._numberOfValues == self._arity {
do {
let result = try self.getResult()
self.forwardOn(.next(result))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
}
else {
var allOthersDone = true
for i in 0 ..< self._arity {
if i != index && !self._isDone[i] {
allOthersDone = false
break
}
}
if allOthersDone {
self.forwardOn(.completed)
self.dispose()
}
}
}
func fail(_ error: Swift.Error) {
self.forwardOn(.error(error))
self.dispose()
}
func done(_ index: Int) {
if self._isDone[index] {
return
}
self._isDone[index] = true
self._numberOfDone += 1
if self._numberOfDone == self._arity {
self.forwardOn(.completed)
self.dispose()
}
}
}
接下来,再来看下CombineLatestObserver做了什么
//CombineLatest.swift
final class CombineLatestObserver<Element>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
...
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next(let value):
self._setLatestValue(value)
self._parent.next(self._index)
case .error(let error):
self._this.dispose()
self._parent.fail(error)
case .completed:
self._this.dispose()
self._parent.done(self._index)
}
}
}
这里实现了on方法,最终会调用子类的next、fail、done等方法
到这里,主要实现的部分都已经罗列出来了,接下来看下源序列通过onNext是如何调用到combineLatestd尾随闭包,进而调用到订阅的block的方法的
当我们序列1通过onNext发送消息时,因为我们在CombineLatestSink2_的run方法中已经对序列1进行过订阅了,所以会走普通序列Observable的一系列步骤,最终会执行传入对象CombineLatestObserver的on方法,并对_latestElement1进行值更新,on会调用synchronized_on方法,进而执行CombineLatestSink2的next方法,而next方法会做判断,这个序列是否来过,没有则_numberOfValues加1,然后判断_numberOfValues是否等于_arity,这次显然不等于,直接跳过;
下面序列1又发送一个消息,流程一致,再次更新了_latestElement1的值,到next方法时,判断这个序列来过_numberOfValues值不变,后面流程一致又会跳过;
接下来序列2来了,前面流程一致,更新_latestElement2的值,到next方法时,判断该序列未来过,_numberOfValues值加1变为2,接下来_numberOfValues等于_arity,会先执行getResult,这个方法里调用_parent._resultSelector(self._latestElement1, self._latestElement2),_resultSelector即为combineLatest的闭包,这样就将_latestElement1和_latestElement2值进行了组合并返回,接着调用forwardOn(.next(result)),调用Sink类的forwardOn,看到这个类,后面的逻辑大概就知道了,他会调用_observer.on方法,进而调用AnonymousObserver的onCore方法,去执行订阅的回调_eventHandler,最终完成事物的解析
吧哒吧哒分析了一大堆,自己都要晕了,下面来看下思维导图,看是否会清晰
生活如此美好,今天就点到为止。。。