在 PromiseKit 的实现当中,有两个核心的类 Promise 和 Guarantee,这两个类的差别为:
- Promise 表示任务的执行结果有两种:一种是成功,一种是失败(结果值为 error),
- Guarantee 表示任务的执行结果只能为成功。
Guarantee 实现了 Thenable 协议,Promise 实现了 Thenable, CatchMixin 协议
下面我们先来分析 Guarantee 的实现。
Guarantee
简单的说,Guarantee 负责管理一组待处理的事件,当收到结果时,用这个结果去调用这一组待处理的任务。
Guarantee 中待处理的事件用 Handlers 类管理,Handlers 的定义如下:
final class Handlers<R> {
var bodies: [(R) -> Void] = []
func append(_ item: @escaping(R) -> Void) { bodies.append(item) }
}
可以看出 Handlers 内部就是一个包含待处理事件的数组。
Guarantee 有两种状态:
- 待处理状态(pending):此时其中包含着一组待处理的任务
- 已处理状态(resolved):此时其中包含着一个用于处理任务的结果值
这两种状态用 Sealant 来表示,定义如下:
enum Sealant<R> {
case pending(Handlers<R>)
case resolved(R)
}
PromiseKit 是线程安全的。在修改 Guarantee 的状态或者添加待处理的事件时,都需要考虑到线程安全的问题。所以在获取或修改 Guarantee 的 Sealant 状态时,需要保证线程安全,为了解决这个问题 Sealant 的获取和设置过程使用了 Box 进行了封装:
class Box<T> {
func inspect() -> Sealant<T> { fatalError() } // 获取当前的状态
func inspect(_: (Sealant<T>) -> Void) { fatalError() } // 获取并修改 Sealant
func seal(_: T) {} // 设置为指定的值,并进行处理
}
在 PromiseKit 中 Box 有两个子类:
- SealedBox:初始化时已经有了结果值,即只会返回 resolved 状态
- EmptyBox :初始化时还没有结果值
final class SealedBox<T>: Box<T> {
let value: T
init(value: T) {
self.value = value
}
// SealedBox 没有重写 inspect(_: (Sealant<T>) -> Void) 的方法
// 因为 SealedBox 不应该调用这个方法,因为只有在 Sealant 为 padding 时,才需要调用
override func inspect() -> Sealant<T> {
return .resolved(value)
}
}
class EmptyBox<T>: Box<T> {
private var sealant = Sealant<T>.pending(.init())
private let barrier = DispatchQueue(label: "org.promisekit.barrier", attributes: .concurrent)
// 设置结果值
// 1. 修改状态 .pending -> .resolved
// 2. 处理待处理的任务
override func seal(_ value: T) {
var handlers: Handlers<T>!
barrier.sync(flags: .barrier) {
guard case .pending(let _handlers) = self.sealant else {
return // already fulfilled!
}
handlers = _handlers
self.sealant = .resolved(value)
}
//FIXME we are resolved so should `pipe(to:)` be called at this instant, “thens are called in order” would be invalid
//NOTE we don’t do this in the above `sync` because that could potentially deadlock
//THOUGH since `then` etc. typically invoke after a run-loop cycle, this issue is somewhat less severe
if let handlers = handlers {
handlers.bodies.forEach{ $0(value) }
}
//TODO solution is an unfortunate third state “sealed” where then's get added
// to a separate handler pool for that state
// any other solution has potential races
}
override func inspect() -> Sealant<T> {
var rv: Sealant<T>!
barrier.sync {
rv = self.sealant
}
return rv
}
override func inspect(_ body: (Sealant<T>) -> Void) {
var sealed = false
barrier.sync(flags: .barrier) {
switch sealant {
case .pending:
// body will append to handlers, so we must stay barrier’d
body(sealant)
case .resolved:
sealed = true
}
}
if sealed {
// we do this outside the barrier to prevent potential deadlocks
// it's safe because we never transition away from this state
// 我们在 barrier 之外执行这个操作,来避免潜在的死锁
// 这是安全的,因为我们不会将这个状态进行转换
body(sealant)
}
}
}
Guarantee 中就管理着一个 Box 实例:box,通过 pipe(to: @escaping(Result<T>) -> Void)
向其中添加任务。通过调用 box.seal(:T)
的方法修改 Guarantee 的状态,并处理待执行的任务。
简单来描述 Guarantee 的实现原理就是:在一个数组中存放待执行的任务,等到获取到结果后执行待处理的任务,并清除待执行的任务列表,保存获取到的结果值。
调用链
为了让 Guarantee 的结果值进行传递,对 Guarantee 扩展了很多方法来方便链式调用,比如:then,map 等。
map
下面是 map 扩展方法的实现:
func map<U>(on: DispatchQueue? = conf.Q.map,
flags: DispatchWorkItemFlags? = nil,
_ body: @escaping(T) -> U) -> Guarantee<U> {
let rg = Guarantee<U>(.pending)
pipe { value in
on.async(flags: flags) {
rg.box.seal(body(value))
}
}
return rg
}
在 map 中,给当前的 Guarantee 添加了一个新的任务,即当获取值时,将值通过 body 处理之后设置为新创建 Guarantee 实例 rg 的结果值。
then
下面是 then 方法的实现方法,定义如下:
@discardableResult
func then<U>(on: DispatchQueue? = conf.Q.map,
flags: DispatchWorkItemFlags? = nil,
_ body: @escaping(T) -> Guarantee<U>) -> Guarantee<U> {
let rg = Guarantee<U>(.pending)
pipe { value in
on.async(flags: flags) {
body(value).pipe(to: rg.box.seal)
}
}
return rg
}
在这个扩展的方法中,一共牵涉到了三个 Guarantee
- 调用 then 方法的当前 Guarantee
- 在 then 方法中创建并返回的 Guarantee
- 由参数 body 闭包返回的 Guarantee
这个三个 Guarantee 的关系为:通过调用 then 方法,给当前的 Guarantee 添加了一个待执行的任务,即调用 body 获取一个 Guarantee 并向其添加了一个完成 then 方法返回的 Guarantee 的任务。
所以,then 方法中返回的 Guarantee 需要等到从 body 中返回的 Guarantee 处理完成时才处理,而从 body 中返回的 Guarantee 需要等到当前的 Guarantee 处理完之后才处理。
为了应对不同的处理方式,PromiseKit 创建了多种方法。通过不同的方法名,可以让程序的可读性在代码层面得到提高。
Promise
对比 Guarantee,Promise 的执行结果可能是成功(获取到一个值),可能是发生异常(获取到一个 Error),所以在定义上, Promise 和 Guarantee 主要有两点不同:
- Promise 的结果有两种,成功或失败。
- Promise 不仅实现了 Thenable 协议,还实现了 CatchMixin 协议,用于处理异常的情况,
Promise 中的 Box 中的泛型就是 Result。 Result 定义如下:
public enum Result<T> {
case fulfilled(T)
case rejected(Error)
}
CatchMixin 异常处理
通过对 CatchMixin 进行扩展,提供了 catch 方法,定义入下:
public extension CatchMixin {
@discardableResult
func `catch`(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, policy: CatchPolicy = conf.catchPolicy, _ body: @escaping(Error) -> Void) -> PMKFinalizer {
let finalizer = PMKFinalizer()
pipe {
switch $0 {
case .rejected(let error):
guard policy == .allErrors || !error.isCancelled else {
fallthrough
}
on.async(flags: flags) {
body(error)
finalizer.pending.resolve(())
}
case .fulfilled:
finalizer.pending.resolve(())
}
}
return finalizer
}
}
public class PMKFinalizer {
let pending = Guarantee<Void>.pending()
/// `finally` is the same as `ensure`, but it is not chainable
public func finally(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, _ body: @escaping () -> Void) {
pending.guarantee.done(on: on, flags: flags) {
body()
}
}
}
在 catch 方法中,提供 body 参数传入一个处理 error 的处理程序,并返回一个 PMKFinalizer 实例。PMKFinalizer 类的作用有两个:
- 终结调用链
- 传入一个闭包,这个闭包不管 Promise 执行的结果是成功还是失败都会调用。
当发生异常的时候,还可以通过 error 尝试进行恢复:
func recover<U: Thenable>(on: DispatchQueue? = conf.Q.map, flags: DispatchWorkItemFlags? = nil, policy: CatchPolicy = conf.catchPolicy, _ body: @escaping(Error) throws -> U) -> Promise<T> where U.T == T {
let rp = Promise<U.T>(.pending)
pipe {
switch $0 {
case .fulfilled(let value):
rp.box.seal(.fulfilled(value))
case .rejected(let error):
if policy == .allErrors || !error.isCancelled {
on.async(flags: flags) {
do {
let rv = try body(error)
guard rv !== rp else { throw PMKError.returnedSelf }
rv.pipe(to: rp.box.seal)
} catch {
rp.box.seal(.rejected(error))
}
}
} else {
rp.box.seal(.rejected(error))
}
}
}
return rp
}
特性方法
wait、hang —— 异步转同步
PromiseKit 是用来应对异步程序的,但是有时候为了方便测试,希望将异步转化为同步,方便的获取到结果值,所以给 Promise 扩展了 wait 方法。下面是 wait 方法的实现:
/**
Blocks this thread, so—you know—don’t call this on a serial thread that
any part of your chain may use. Like the main thread for example.
*/
func wait() throws -> T {
if Thread.isMainThread {
conf.logHandler(LogEvent.waitOnMainThread)
}
var result = self.result
if result == nil {
let group = DispatchGroup()
group.enter()
pipe { result = $0; group.leave() }
group.wait()
}
switch result! {
case .rejected(let error):
throw error
case .fulfilled(let value):
return value
}
}
wait 方法通过 DispatchGroup 实现了将异步转化为同步,实现思路为:
- 创建一个 grounp
- 调用 grounp 的 enter 方法
- 通过 pipe 方法给当前的 promise 添加一个待处理的任务:调用 grounp 的 leave 方法
- 调用 grounp 的 wait 方法
在 PromiseKit 中,同时给 Promise 和 Guarantee 都提供了 wait 方法。区别是 Promise 的 wait 方法会抛出异常,Guarantee 的 wait 不会抛出异常。
除了 wait 方法外,还提供了一个全局方法 hang,定义如下:
public func hang<T>(_ promise: Promise<T>) throws -> T {
#if os(Linux) || os(Android)
#if swift(>=4.2)
let runLoopMode: CFRunLoopMode = kCFRunLoopDefaultMode
#else
// isMainThread is not yet implemented on Linux.
let runLoopModeRaw = RunLoopMode.defaultRunLoopMode.rawValue._bridgeToObjectiveC()
let runLoopMode: CFString = unsafeBitCast(runLoopModeRaw, to: CFString.self)
#endif
#else
guard Thread.isMainThread else {
// hang doesn't make sense on threads that aren't the main thread.
// use `.wait()` on those threads.
fatalError("Only call hang() on the main thread.")
}
let runLoopMode: CFRunLoopMode = CFRunLoopMode.defaultMode
#endif
if promise.isPending {
var context = CFRunLoopSourceContext()
let runLoop = CFRunLoopGetCurrent()
let runLoopSource = CFRunLoopSourceCreate(nil, 0, &context)
CFRunLoopAddSource(runLoop, runLoopSource, runLoopMode)
_ = promise.ensure {
CFRunLoopStop(runLoop)
}
while promise.isPending {
CFRunLoopRun()
}
CFRunLoopRemoveSource(runLoop, runLoopSource, runLoopMode)
}
switch promise.result! {
case .rejected(let error):
throw error
case .fulfilled(let value):
return value
}
}
hang 通过 RunLoop 原理实现了异步转同步。
实际上 RunLoop 就是这样一个函数,其内部是一个 do-while 循环。当你调用 CFRunLoopRun() 时,线程就会一直停留在这个循环里;直到超时或被手动停止,该函数才会返回。
实现过程为:
- 在当前的 RunLoop 的 defaultMode 中添加一个 Source,防止 RunLoop 开始之后退出
- 在当前的 promise 中添加一个待处理事件,即调用 CFRunLoopStop 来退出 RunLoop
- 在当前的 promise 为 pending 状态时,调用 CFRunLoopRun() 进入 RunLoop 循环
- 退出 RunLoop 循环之后,移除之前添加的 Source
关于 RunLoop 更多的内容可以查看下面的文章:
https://blog.ibireme.com/2015/05/18/runloop/
https://blog.devtang.com/2012/06/24/enhance-uiactionsheet/
after 等待指定的时间
有时需要等待一段指定的时间再去执行,after 就可以快速的返回这样一个 Guarantee。
/**
after(seconds: 1.5).then {
//…
}
- Returns: A guarantee that resolves after the specified duration.
*/
public func after(seconds: TimeInterval) -> Guarantee<Void> {
let (rg, seal) = Guarantee<Void>.pending()
let when = DispatchTime.now() + seconds
#if swift(>=4.0)
q.asyncAfter(deadline: when) { seal(()) }
#else
q.asyncAfter(deadline: when, execute: seal)
#endif
return rg
}
/**
after(.seconds(2)).then {
//…
}
- Returns: A guarantee that resolves after the specified duration.
*/
public func after(_ interval: DispatchTimeInterval) -> Guarantee<Void> {
let (rg, seal) = Guarantee<Void>.pending()
let when = DispatchTime.now() + interval
#if swift(>=4.0)
q.asyncAfter(deadline: when) { seal(()) }
#else
q.asyncAfter(deadline: when, execute: seal)
#endif
return rg
}
在 after 方法内部就是通过 DispatchQueue 的 asyncAfter 方法来实现的。
firstly
为了增强代码的可读性,PromiseKit 提供了一个 firstly 方法,先来看一下不使用 firstly 和使用 firstly 的代码对比:
// 不使用的情况
URLSession.shared.dataTask(url: url1).then {
URLSession.shared.dataTask(url: url2)
}.then {
URLSession.shared.dataTask(url: url3)
}
// 使用 firstly 的情况
firstly {
URLSession.shared.dataTask(url: url1)
}.then {
URLSession.shared.dataTask(url: url2)
}.then {
URLSession.shared.dataTask(url: url3)
}
使用 first 之后,确实可以提高代码的可读性。firstly 的实现方式如下:
public func firstly<U: Thenable>(execute body: () throws -> U) -> Promise<U.T> {
do {
let rp = Promise<U.T>(.pending)
try body().pipe(to: rp.box.seal)
return rp
} catch {
return Promise(error: error)
}
}
/// - See: firstly()
public func firstly<T>(execute body: () -> Guarantee<T>) -> Guarantee<T> {
return body()
}
race
在有时候我们需要在多个异步程序中,获取到最先执行结束的结果值,这是就可以使用 race 方法。race 方法有几个变体,我们来看其中的一个:
public func race<T>(_ guarantees: Guarantee<T>...) -> Guarantee<T> {
let rg = Guarantee<T>(.pending)
for guarantee in guarantees {
guarantee.pipe(to: rg.box.seal)
}
return rg
}
在 race 中创建了一个新的 Guarantee,在所有参与竞争的 Guarantee 中添加一个待处理程序,即设置这个新创建 Guarantee 的结果。由于 Guarantee 只会保存第一次设置的结果值,而忽略掉以后再设置的结果值,所以新创建的 Guarantee 中就保存了最先执行完的结果值。
when
有时需要同时获取多个异步处理程序的执行结果,所有需要等到所有的异步程序都完成,这种场景就可以通过 when 来实现。同样 when 的方法也有多个变型,下面是其中一个:
private func _when<U: Thenable>(_ thenables: [U]) -> Promise<Void> {
var countdown = thenables.count
guard countdown > 0 else {
return .value(Void())
}
let rp = Promise<Void>(.pending)
#if PMKDisableProgress || os(Linux)
var progress: (completedUnitCount: Int, totalUnitCount: Int) = (0, 0)
#else
let progress = Progress(totalUnitCount: Int64(thenables.count))
progress.isCancellable = false
progress.isPausable = false
#endif
let barrier = DispatchQueue(label: "org.promisekit.barrier.when", attributes: .concurrent)
for promise in thenables {
promise.pipe { result in
barrier.sync(flags: .barrier) {
switch result {
case .rejected(let error):
if rp.isPending {
progress.completedUnitCount = progress.totalUnitCount
rp.box.seal(.rejected(error))
}
case .fulfilled:
guard rp.isPending else { return }
progress.completedUnitCount += 1
countdown -= 1
if countdown == 0 {
rp.box.seal(.fulfilled(()))
}
}
}
}
}
return rp
}
public func when<U: Thenable>(fulfilled thenables: [U]) -> Promise<[U.T]> {
return _when(thenables).map(on: nil) { thenables.map{ $0.value! } }
}
其中核心方法是 _when,设计思路为:新创建一个 Promise,每当一个需要等待的 Promise 执行完之后,查看是否还有未执行结束的 Promise,如果全部执行完,则设置新创建的 Promise 的结果值。
技巧
Resolver 在必要的时候输出警告
Promise 有个初始化方法:
/// - Returns: a tuple of a new pending promise and its `Resolver`.
public class func pending() -> (promise: Promise<T>, resolver: Resolver<T>) {
return { ($0, Resolver($0.box)) }(Promise<T>(.pending))
}
Resolver 的定义如下:
/// An object for resolving promises
public final class Resolver<T> {
let box: Box<Result<T>>
init(_ box: Box<Result<T>>) {
self.box = box
}
deinit {
if case .pending = box.inspect() {
conf.logHandler(.pendingPromiseDeallocated)
}
}
}
看起来 Resolver 仅仅是将 Box 包装了一层,为什么要这样做呢?
在 PromiseKit 中,在 Promise 释放时,如果 Promise 的状态为未处理(pending),则会在控制台输出一条警告信息,来提示用户可能存在问题。
我想这样设计的主要目的是来满足这个需求的。
为什么不用子类呢?为什么不把这个判断放到 Box 类中呢?
扩展系统 API 的注意事项
为了方便的在指定的线程上创建一个 Guarantee,所以作者对 DispatchQueue 进行了扩展,但是在扩展的时候,可能会和其他库中的扩展重名,也可能在将来会和苹果的 API 重名,这样就会导致难以排查的 bug。PromiseKit 的做法是 在方法中添加一个独有类型的参数。代码如下:
public extension DispatchQueue {
/**
Asynchronously executes the provided closure on a dispatch queue.
DispatchQueue.global().async(.promise) {
md5(input)
}.done { md5 in
//…
}
- Parameter body: The closure that resolves this promise.
- Returns: A new `Guarantee` resolved by the result of the provided closure.
- Note: There is no Promise/Thenable version of this due to Swift compiler ambiguity issues.
*/
@available(macOS 10.10, iOS 2.0, tvOS 10.0, watchOS 2.0, *)
final func async<T>(_: PMKNamespacer, group: DispatchGroup? = nil, qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: @escaping () -> T) -> Guarantee<T> {
let rg = Guarantee<T>(.pending)
async(group: group, qos: qos, flags: flags) {
rg.box.seal(body())
}
return rg
}
}
其中 PMKNamespacer 的定义如下:
/// used by our extensions to provide unambiguous functions with the same name as the original function
public enum PMKNamespacer {
case promise
}