本文主要探究 RxSwift 中 timer 的源码
首先创建一个timer,timer的创建方式有两种
- 自定义timer,即通过 timer函数创建
Observable<Int>.timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)
- 使用 RxSwift 中封装好的timer,即通过 interval 创建
var timer: Observable<Int> = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
timer.subscribe { num in
print(num)
}
.disposed(by: disposeBag)
下面我们来进行一一探索
timer() 源码解析
- 进入函数
timer()
源码,创建一个Timer
类对象(记为A
),有三个属性scheduler
(调度环境)、dueTime
(延迟时间)、period
(时间间隔)。
extension ObservableType where Element: RxAbstractInteger {
public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
-> Observable<Element> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler
)
}
}
- 查看 Timer 类的继承链:
Timer --> Producer --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议
。所有 Timer 类其本质也是一个可观察序列
。 - 进入函数 subscribe() 源码,创建
AnonymousObserver
对象(记为B
),并保存订阅信号的闭包 eventHandler
extension ObservableType {
......
public func subscribe(
onNext: ((Element) -> Void)? = nil, // element是Observablecreate创建时外部传进来的
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
...
//创建匿名观察者
let observer = AnonymousObserver<Element> { event in
...
}
//创建销毁者
return Disposables.create(
// self.asObservable() 是一个序列(即统一成序列),直接返回self,即ob
self.asObservable().subscribe(observer),
disposable
)
}
}
- 进入
AnonymousObservable
类的 subscribe 函数,最终在父类 Producer 中找到,即A.subscribe(B)
class Producer<Element>: Observable<Element> {
......
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
// 调度者
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer() //销毁者
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
......
}
- 进入函数
run
的实现,这个在 Producer 子类Timer
类中,源码如下,即A.run(B)
- 这里的
TimerSink
是timer的通道类,用于链接 timer 序列 和 observer 观察者 - sink在初始化时,就将 timer 序列作为了 TimerSInk的 parent属性传入了,同时持有 observer(观察者)、cancel(销毁者)
- 这里的
final private class Timer<Element: RxAbstractInteger>: Producer<Element> {
fileprivate let scheduler: SchedulerType //调度环境
fileprivate let dueTime: RxTimeInterval //延迟时间
fileprivate let period: RxTimeInterval? //时间间隔
init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
self.scheduler = scheduler
self.dueTime = dueTime
self.period = period
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
if self.period != nil {
//传入的 self 是 timer
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
else {
let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
}
- 进入
TimerSink
类的函数run()
实现,这里与AnonymousObservable
序列的区别就是少了一个self传入
,但其实已经在初始化时将self 传入了。即TimerSink.run()
,
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger {
typealias Parent = Timer<Observer.Element>
private let parent: Parent //timer序列对象
private let lock = RecursiveLock() //递归锁
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent //保存parent,即Timer
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
//action 就是 👇🏻 的闭包
return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
self.lock.performLocked {
self.forwardOn(.next(state))
return state &+ 1 // +1 操作
}
}
}
}
- 进入函数
scheduleRelative
源码,其中scheduler
是定时器初始化时传入的定时器,遵循SchedulerType
协议(注:需要根据具体传入的线程查找 scheduleRelative 具体实现,这里我们传入的是MainScheduler
,所以查找 MainScheduler 的函数 scheduleRelative ),即SerialDispatchQueueScheduler.scheduleRelative()
- MainScheduler的继承链:
MainScheduler --> SerialDispatchQueueScheduler --> 遵循 SchedulerType 协议
- 所以根据继承链,最终在
SerialDispatchQueueScheduler
类中找到 scheduleRelative 的具体实现
- MainScheduler的继承链:
//SerialDispatchQueueScheduler 类
public class SerialDispatchQueueScheduler : SchedulerType {
...
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
...
}
- 进入函数 scheduleRelative() 的实现,在这里我们可以看到,其本质是通过GCD定义一个timer,即
DispatchQueueConfiguration.scheduleRelative()
- 这里的
action
就是 TimerSink 类中run() 函数中schedulePeriodic 函数的闭包
,在这里做了 +1 操作 - 在每次执行完定时器后,走到回调方法,
不断地发送 onNext 信号,并做 +1 操作
,以此来达到循环的操作
- 这里的
extension DispatchQueueConfiguration {
...
func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
let deadline = DispatchTime.now() + dueTime
let compositeDisposable = CompositeDisposable()
//初始化一个 gcd timer,并指定线程
let timer = DispatchSource.makeTimerSource(queue: self.queue)
//设置执行次数
/**
wallDeadline: 什么时候开始
leeway: 调用频率,即多久调用一次
*/
timer.schedule(deadline: deadline, leeway: self.leeway)
// TODO:
// This looks horrible, and yes, it is.
// It looks like Apple has made a conceptual change here, and I'm unsure why.
// Need more info on this.
// It looks like just setting timer to fire and not holding a reference to it
// until deadline causes timer cancellation.
var timerReference: DispatchSourceTimer? = timer
let cancelTimer = Disposables.create {
timerReference?.cancel()
timerReference = nil
}
//设置执行回调
timer.setEventHandler(handler: {
if compositeDisposable.isDisposed {
return
}
_ = compositeDisposable.insert(action(state))
cancelTimer.dispose()
})
//执行timer
timer.resume()
_ = compositeDisposable.insert(cancelTimer)
return compositeDisposable
}
...
}
- 进入 action 闭包内的函数 forwardOn 实现,这里在父类 Sink 中找到,即
Sink.forwardOn()
- 这里的self.observer 就是前面传入 Sink observer 观察者,即 AnonymousObserver 类的对象
- 内部实现执行了 观察者的函数
on()
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger {
...
func run() -> Disposable {
//action 就是 👇🏻 的闭包
return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
self.lock.performLocked {
self.forwardOn(.next(state))
return state &+ 1 // +1 操作
}
}
}
...
}
👇
class Sink<Observer: ObserverType>: Disposable {
...
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
if isFlagSet(self.disposed, 1) {
return
}
//self 为 Sink
self.observer.on(event)
}
...
}
- 进入
AnonymousObserver
的函数 on() 实现,在其父类 ObserverBase 中找到,即ObserverBase.on()
- 其内部执行了函数
onCore()
- 其内部执行了函数
class ObserverBase<Element> : Disposable, ObserverType {
...
//不断发送响应的功能
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self.isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self.isStopped, 1) == 0 {
self.onCore(event)
}
}
}
...
}
- 进入 AnonymousObserver 类的函数 onCore() 的实现,因为父类没有具体实现,所以在子类查找,即
B.eventHandler(event)
,会走到订阅信号的事件处理回调中
final class AnonymousObserver<Element>: ObserverBase<Element> {
...
override func onCore(_ event: Event<Element>) {
self.eventHandler(event)
}
....
}
interval() 源码解析
- 进入函数 interval 源码,也是创建了一个 Timer 类对象,同样的有三个属性
extension ObservableType where Element: RxAbstractInteger {
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Timer(
dueTime: period,
period: period,
scheduler: scheduler
)
}
}
- 后续分析同 timer一致,这里就不赘述了。
总结
继承链
- Timer 类的继承链:
Timer --> Producer --> Observable --> 遵循 ObservableType 协议 --> 遵循 ObservableConvertibleType 协议
。所有 Timer 类其本质也是一个可观察序列
。 - MainScheduler的继承链:
MainScheduler --> SerialDispatchQueueScheduler --> 遵循 SchedulerType 协议
timer()/interval() 核心逻辑
- 创建序列 timer()/interval():创建 Timer 对象(A)
- 订阅信号 subscribe():创建 AnonymousObserver 对象(B),保存eventHandler
- A.subscribe(B)
- A.run(B)
- TimerSink.run() :执行通过方法
- SerialDispatchQueueScheduler.scheduleRelative():闭包中作+1操作
- DispatchQueueConfiguration.scheduleRelative():初始化gcd定时器
- Sink.forwardOn():发送信号
- ObserverBase.on() :转发信号
- B.eventHandler(event) :处理信号