今天介绍可连接序列和连接相关操作符
可连接序列 Connection Observable, 不同于一般的序列,有订阅时不会立刻开始发送事件消息,只有当调用 connect()之后才会开始发送值。
connect & publish 操作符
我们看一个例子:
let observable = Observable<Int>.create({ anyObserver in
print("subscrition")
anyObserver.onNext(1)
anyObserver.onNext(2)
anyObserver.onNext(3)
return Disposables.create()
})
.publish()
observable
.subscribe {
print("订阅1", $0)
}
.disposed(by: bag)
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
_ = observable.connect()
}
observable.subscribe {
print("订阅2", $0)
}
subscrition
订阅1 next(1)
订阅2 next(1)
订阅1 next(2)
订阅2 next(2)
订阅1 next(3)
订阅2 next(3)
姑且把创建序列传入的闭包称之为subscribe handler
3秒后才会打印信息,因为3秒后才连接。而且我们发现subscrition只打印了一次,说明subscribe handler只执行了一遍,如果去掉publish和connect,则会执行两遍,相当于每次订阅都会重新执行一遍;如果只去掉connect,则不会发送事件。
对于上面的结果我们分析一下源码:
对于publish操作符,不妨看看:
public func publish() -> ConnectableObservable<Element> {
self.multicast { PublishSubject() }
}
publish本质上调用了multicast操作符:
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
返回一个ConnectableObservableAdapter对象,它也是个Observable,保存了原始的Observable和 makeSubject闭包。
- ConnectableObservableAdapter继承了ConnectableObservable,ConnectableObservable表示一个可连接的序列,ConnectableObservable遵循ConnectableObservableType协议声明一个connect()协议,表示连接操作。
- SubjectType表示这是一个Subject,在上一篇文章已经讲述过,那么连接操作实际上将一个普通的Observable连接到一个Subject,而publish操作明显是接到PublishSubject,它是没有buffer的Subject。
然后再看看subscribe代码在哪:
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
然后调用熟悉的subscribe(observer), 这里会进入ConnectableObservableAdapter的subscribe:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
self.lazySubject.subscribe(observer)
}
这里的lazySubject是通过makeSubject闭包创建的PublishSubject();通过它继续调用subscribe,这里相当于做了一次变形,而我们在上一篇已经介绍过PublishSubject执行subscribe是不会发送事件消息的(先不考虑error和complete事件),它只是将观察者插入到observers中保存起来,这块的代码上一篇已经详细讲解过,这里不再讲解。所以就解释了为什么connect之前调用subscribe不会发送事件消息;那么connect又干了什么呢?我们继续看connect的代码:
// at `ConnectableObservableAdapter` class
override func connect() -> Disposable {
return self.lock.performLocked {
if let connection = self.connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self.lock, subscription: singleAssignmentDisposable)
self.connection = connection
let subscription = self.source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
这里创建了一个Connection,并且执行了原始序列的subscribe(connection), 以connection作为参数,原始序列是最初的AnonymousObservable, AnonymousObservable的subscribe方法实现已经比较熟悉了,不用再看。它最终会执行最初创建序列的闭包。但是因为传入的observer是connection,而且Connection类重写了on方法,所以在调用onNext时,会调用到这个on方法:
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self.disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self.subjectObserver.on(event)
}
然后执行PulishSubject的on方法:这里会向所有观察者发送消息,例子中的观察者有两个。意思是例子每次调用onNext会通知所有的观察者。这也解释了为什么“subscrition”只打印了一遍,onNext每个打印两遍。如果先connect再subscribe,则只会打印“subscrition”,因为connect时还没有观察者。
share
share操作符可接受两个参数,replay 和 scope, replay表示缓存的个数或者叫重放的次数,这个好理解,而scope可接受两个枚举值.whileConnected 和 .forever,这两个什么区别:
-
.whileConnected
只有在连接时才支持缓存和重放,意味着如果连接断开则不能重放,下次再订阅时只能重新执行subscribe handler,一般什么操作会断开连接?当发送error或complete事件或者回收资源时会断开连接。 -
.forever
永远支持缓存和重放,无论是否断开连接。
我们看看官方给的例子:
func testScope(){
let xs = Observable.deferred { () -> Observable<TimeInterval> in
print("Performing work ...")
return Observable.just(Date().timeIntervalSince1970)
}
.share(replay: 1, scope: .forever)
_ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
_ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
_ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
}
这里的just方法会发送一个单一的元素:当前时间戳,还会在发一个complete事件,scope设置的是forever
打印结果:
Performing work ...
next 1622777422.7355828
completed
next 1622777422.7355828
completed
next 1622777422.7355828
completed
第一次订阅时,执行了subscribe handler,打印了Performing work ... next和complete,之后两次订阅时是对第一次的重放,打印的时间戳也一样。我们设置重放的个数为1,所以会重放一次next,同时也会重放complete;第一次执行complete其实以及断开连接了,但是依然能重放,说明就算连接断开了依然支持重放,这就是forever的意思。
如果改成.willConnected
则无法支持重放,打印结果:
Performing work ...
next 1622778439.6327991
completed
Performing work ...
next 1622778439.6332831
completed
Performing work ...
next 1622778439.633359
completed
这里相当于执行了三遍subscre handler,所以Performing work ...打印的三次,而且3次next打印的时间戳不一样;为什么呢,因为第一订阅之后就执行了complete事件了(just操作符会执行一次onNext+一次onComplete),这时候连接断开,再次订阅时需要重新建立连接,然又会重新执行subscribe handler。
我们再看看share的源码:
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
-> Observable<Element> {
switch scope {
case .forever:
switch replay {
case 0: return self.multicast(PublishSubject()).refCount()
default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
}
case .whileConnected:
switch replay {
case 0: return ShareWhileConnected(source: self.asObservable())
case 1: return ShareReplay1WhileConnected(source: self.asObservable())
default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
}
}
}
当scope == forever时,调用的multicast, replay == 0 连接的subject是PublishSubject,这个跟publish的操作符是一样的,不一样的是后面接了一个refCount操作符。
refCount返回一个RefCount对象,并将当前对象作为source
public func refCount() -> Observable<Element> {
RefCount(source: self)
}
RefCount是一个与源保持连接的可观察序列,只要该可观察序列至少有一次订阅。RefCount有一个对应的RefCountSink类,外面执行订阅时会调用RefCountSink的run方法:
func run() -> Disposable {
let subscription = self.parent.source.subscribe(self)
self.parent.lock.lock(); defer { self.parent.lock.unlock() }
self.connectionIdSnapshot = self.parent.connectionId
if self.isDisposed {
return Disposables.create()
}
if self.parent.count == 0 {
self.parent.count = 1
self.parent.connectableSubscription = self.parent.source.connect()
}
else {
self.parent.count += 1
}
/// 省略。。。
第一句执行self.parent.source.subscribe(self),这里的parent是RefCount对象,而它的source是ConnectableObservableAdapter对象,它的subscribe代码:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
self.lazySubject.subscribe(observer)
}
lazySubject是要连接的Subject,之后调用subject的subscribe,这个subscribe会执行重放和插入观察者操作,回到RefCountSink的run:
后面有一个关键代码:
if self.isDisposed {
return Disposables.create()
}
因为第一订阅时,会执行complete,所以执行当前dispose,第二次订阅时self.isDisposed == true,直接返回了。所以第二次第三次订阅时不会执行subscribe handler,只做重放。
如果share操作改成share(replay:2, .whileConnected), 执行的代码是return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount(), 这个跟 forever分支内的 return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()代码很像,为什么一个是forever一个是whileConnected。关键在于return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()这里只会创建一个ReplaySubject对象,而 { ReplaySubject.create(bufferSize: replay) }是一个闭包,每次在执行闭包时都会创建一个新的subject。所以每次订阅时都会创建一个新的subject。新的subject不存在重放的buffer;上面的代码self.isDisposed 一直是false,这导致每次需要再次连接。而每次连接都会执行subscribe handler。
还剩下下面两个case的代码没有解读:
case 0: return ShareWhileConnected(source: self.asObservable())
case 1: return ShareReplay1WhileConnected(source: self.asObservable())
ShareWhileConnected 处理无buffer的情况,ShareReplay1WhileConnected处理一个buffer的情况。这两个类非常相似,这两个分支并没有连接Subject,而是用ShareReplay1WhileConnected和ShareWhileConnected代替了Subject的功能。在ShareReplay1WhileConnected和ShareWhileConnected中,保存了所有的观察者;在ShareReplay1WhileConnected中保存了最后一个元素,这是为了实现重放。
它们都持有一个Connection,分别是ShareWhileConnectedConnection和ShareReplay1WhileConnectedConnection,这两个类实现了连接:
final func connect() {
self.subscription.setDisposable(self.parent.source.subscribe(self))
}
这里会去调用原始序列的subscribe,最终调用subscribe handler。
同时实现了订阅和通知观察者的功能,代码类似于Subject。
fileprivate var observers = Observers()
private var element: Element?
那这两个类是怎么实现whileConnected效果的? 看看ShareReplay1WhileConnected的subscribe的实现:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock()
let connection = self.synchronized_subscribe(observer)
let count = connection.observers.count
let disposable = connection.synchronized_subscribe(observer)
self.lock.unlock()
if count == 0 {
connection.connect()
}
return disposable
}
@inline(__always)
private func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
let connection: Connection
if let existingConnection = self.connection {
connection = existingConnection
}
else {
connection = ShareReplay1WhileConnectedConnection<Element>(
parent: self,
lock: self.lock)
self.connection = connection
}
return connection
}
当count == 0 执行连接,如果发送complete事件断开连接后,此时再次订阅,则count依然是==0 需要重新连接,这样就会再执行一遍subscribe handler,如果没有发送complete时间,则只会连接一次,后面的订阅只会重放buffer。
replay
replay操作符实际上是通过self.multicast { ReplaySubject.create(bufferSize: bufferSize) }实现,效果类似于publish,只不过publish不带buffer,replay可以指定buffer个数。
总结
普通可观察序列都可以通过以上操作符转变成可连接的序列,可连接序列只有在连接之后才能发送事件元素,而whileConnected和forever区别在于:
- whileConnected:每个连接将有它自己Subject实例来存储重放事件。
- forever:一个Suject将存储到source的所有连接的重放事件。