Swift Combine 入门导读

在具体介绍 Combine 之前,有两个重要的概念需要简要介绍一下:

  • 观察者模式
  • 响应式编程

观察者模式

观察者模式(Observer Pattern)是一种设计模式,用来描述一对多关系:一个对象发生改变时将自动通知其他对象,其他对象将相应做出反应。这两类对象分别被称为被 观察目标(Observable)和 观察者(Observer),也就是说一个观察目标可以对应多个观察者,观察者可以订阅它们感兴趣的内容,当观察目标内容改变时,它会向这些观察者广播通知(调用 Observer 的更新方法)。有一点需要说明的是,观察者之间彼此时互相独立的,也并不知道对方的存在。

在 Swift 中,一个简单的观察者模式可以被描述为:

protocol Observable {
    associatedtype T: Observer
    mutating func attach(observer: T)
}

protocol Observer {
    associatedtype State
    func notify(_ state: State)
}

struct AnyObservable<T: Observer>: Observable{

    var state: T.State {
        didSet {
            notifyStateChange()
        }
    }

    var observers: [T] = []

    init(_ state: T.State) {
        self.state = state
    }

    mutating func attach(observer: T) {
        observers.append(observer)
        observer.notify(state)
    }

    private func notifyStateChange() {
        for observer in observers {
            observer.notify(state)
        }
    }
}

struct AnyObserver<S>: Observer {

    private let name: String

    init(name: String) {
        self.name = name
    }

    func notify(_ state: S) {
        print("\(name)'s state updated to \(state)")
    }
}

var observable = AnyObservable<AnyObserver<String>>("hello")
let observer = AnyObserver<String>(name: "My Observer")
observable.attach(observer: observer)
observable.state = "world"

// "My Observer's state updated: hello"
// "My Observer's state updated: world"

响应式编程

响应式编程(Reactive Programming)是一种编程思想,相对应的也有面向过程编程、面向对象编程、函数式编程等等。不同的是,响应式编程的核心是面向异步数据流和变化的。

在现在的前端世界中,我们需要处理大量的事件,既有用户的交互,也有不断的网络请求,还有来自系统或者框架的各种通知,因此也无可避免产生纷繁复杂的状态。使用响应式编程后,所有事件将成为异步的数据流,更加方便的是可以对这些数据流可以进行组合变换,最终我们只需要监听所关心的数据流的变化并做出响应即可。

举个有趣的例子来解释一下:

当你早上起床之后,你需要一边洗漱一边烤个面包,最后吃早饭。

传统的编程方法:

func goodMorning() {
    wake {
        let group = DispatchGroup()
        group.enter()
        wash {
            group.leave()
        }
        group.enter()
        cook {
            group.leave()
        }
        group.notify(queue: .main) {
            eat {
                print("Finished")
            }
        }
    }
}

响应式编程:

func reactiveGoodMorning() {
    let routine = wake.rx.flapLatest {
        return Observable.zip(wash, cook)
    }.flapLatest {
        return eat.rx
    }
    routine.subsribe(onCompleted: {
        print("Finished")
    })
}

不同于传统可以看到 wake/wash/cook/eat 这几个事件通过一些组合转换被串联成一个流,我们也只需要订阅这个流就可以在应该响应的时候得到通知。

Marble Diagram

为了更方便理解数据流,我们通常用一种叫 Marble Diagram 的图象形式来表示数据流基于时间的变化。



我们用从左至右的箭头表示时间线,不同的形状代表 Publisher 发出的值(Input),竖线代表正常终止,叉代表发生错误而终止。上图中,上面一条时间线是一个数据流,中间的方块代表组合变换,下方的另外一条时间线代表经过变换后的数据流。

Combine

现在可以进入正题了,什么是 Combine?

官方文档的介绍是:

The Combine framework provides a declarative Swift API for processing values over time. These values can represent user interface events, network responses, scheduled events, and many other kinds of asynchronous data.
By adopting Combine, you’ll make your code easier to read and maintain, by centralizing your event-processing code and eliminating troublesome techniques like nested closures and convention-based callbacks.

简而言之,Combine 可以使代码更加简洁、易于维护,也免除了饱受诟病的嵌套闭包和回调地狱。Combine 是 Reactive Programming 在 Swift 中的一个实现,更确切的说是对 ReactiveX (Reactive Extensions, 简称 Rx) 的实现,而这个实现正是基于观察者模式的。

Combine 是基于泛型实现的,是类型安全的。它可以无缝地接入已有的工程,用来处理现有的 Target/ActionNotificationKVOcallback/closure 以及各种异步网络请求。

在 Combine 中,有几个重要的组成部分:

  • 发布者:Publiser
  • 订阅者:Subscriber
  • 操作符:Operator
Publisher

在 Combine 中,Publisher 是观察者模式中的 Observable,并且可以通过组合变换(利用 Operator)重新生成新的 Publisher。

public protocol Publisher {
    /// The kind of values published by this publisher.
    associatedtype Output

    /// The kind of errors this publisher might publish.
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    associatedtype Failure : Error

    /// This function is called to attach the specified `Subscriber` to 
    /// this `Publisher` by `subscribe(_:)`
    ///
    /// - SeeAlso: `subscribe(_:)`
    /// - Parameters:
    ///     - subscriber: The subscriber to attach to this `Publisher`.
    ///                   once attached it can begin to receive values.
    func receive<S>(subscriber: S) where S : Subscriber, 
        Self.Failure == S.Failure, Self.Output == S.Input
}

在 Publisher 的定义中,Output 代表数据流中输出的值,值的更新可能是同步,也可能是异步,Failure 代表可能产生的错误,也就是说 Pubslier 最核心的是定义了值与可能的错误。Publisher 通过 receive(subscriber:) 用来接受订阅,并且要求 Subscriber 的值和错误类型要一致来保证类型安全。

例如来自官方的一个例子:

extension NotificationCenter {
        struct Publisher: Combine.Publisher {
                typealias Output = Notification
                typealias Failure = Never
                init(center: NotificationCenter, name: Notification.Name, object: Any? = nil)
        }
}

这个 Publisher 提供的值就是 Notification 类型,而且永远不会产生错误(Never)。这个扩展非常有用,可以很方便地将任何 Notification 转换成 Publisher,便于我们将应用改造成 Reactive。


让我们再看一个的例子:

let justPubliser = Publishers.Just("Hello")

justPubliser 会给每个订阅者发送一个 "Hello" 消息,然后立即结束(这个数据流只包含一个值)。


类似的,Combine 为我们提供了一些便捷的 Publisher 的实现,除了上面的 Just,这里也列出一些很有用的 Publisher。

Empty

Empty 不提供任何值的更新,并且可以选择立即正常结束。


Once

Once 可以提供以下两种数据流之一:

发送一次值更新,然后立即正常结束(和 Just 一样)
立即因错误而终止
Fail

Fail 和 Once 很像,也是提供两种情况之一:

发送一次值更新,然后立即因错误而终止
立即因错误而终止


Sequence

Sequence 将给定的一个序列按序通知到订阅者。


Future

Future 初始化需要提供执行具体操作的 closure,这个操作可以是异步的,并且最终返回一个 Result,所以 Future 要么发送一个值,然后正常结束,要么因错误而终止。在将一些异步操作转换为 Publisher 时非常有用,尤其是网络请求。例如:

let apiRequest = Publishers.Future { promise in
    URLSession.shared.dataTask(with: url) { data, _, _ in
        promise(.success(data))
    }.resume()
}


Deferred

Deferred 初始化需要提供一个生成 Publisher 的 closure,只有在有 Subscriber 订阅的时候才会生成指定的 Publisher,并且每个 Subscriber 获得的 Publisher 都是全新的。

那么,在之前的 Just Publisher 例子中,假设我们要实现延迟两秒后再发送消息呢?也很简单。我们前面说到 Publisher 都是可以组合变换的,这些组合变换可以通过操作符来实现的。

上面的例子利用 delay 操作符就可以改写为:

let delayedJustPublisher = justPubliser.delay(for: 2, scheduler: DispatchQueue.main)

在文章的后半部分,我们将了解一些常用操作符以及它们的使用场景。

Subscriber

和 Publisher 相对应的,Subscriber 就是观察者模式中 Observer。

public protocol Subscriber : CustomCombineIdentifierConvertible {
    /// The kind of values this subscriber receives.
    associatedtype Input

    /// The kind of errors this subscriber might receive.
    ///
    /// Use `Never` if this `Subscriber` cannot receive errors.
    associatedtype Failure : Error

    /// Tells the subscriber that it has successfully subscribed to 
    /// the publisher and may request items.
    ///
    /// Use the received `Subscription` to request items from the publisher.
    /// - Parameter subscription: A subscription that represents the connection 
    /// between publisher and subscriber.
    func receive(subscription: Subscription)

    /// Tells the subscriber that the publisher has produced an element.
    ///
    /// - Parameter input: The published element.
    /// - Returns: A `Demand` instance indicating how many more elements 
    /// the subcriber expects to receive.
    func receive(_ input: Self.Input) -> Subscribers.Demand

    /// Tells the subscriber that the publisher has completed publishing, 
    /// either normally or with an error.
    ///
    /// - Parameter completion: A `Completion` case indicating 
    /// whether publishing completed normally or with an error.
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

可以看出,从概念上和我们之前定义的简单观察者模式相差无几。Publisher 在自身状态改变时,调用 Subscriber 的三个不同方法(receive(subscription), receive(_:Input), receive(completion:))来通知 Subscriber。


这里也可以看出,Publisher 发出的通知有三种类型:

  • Subscription:Subscriber 成功订阅的消息,只会发送一次,取消订阅会调用它的 Cancel 方法来释放资源
  • Value(Subscriber 的 Input,Publisher 中的 Output):真正的数据,可能发送 0 次或多次
  • Completion:数据流终止的消息,包含两种类型:.finished 和 .failure(Error),最多发送一次,一旦发送了终止消息,这个数据流就断开了,当然有的数据流可能永远没有终止

大部分场景下我们主要关心的是后两种消息,即数据流的更新和终止。

Combine 内置的 Subscriber 有三种:

  • Sink
  • Assign
  • Subject

Sink 是非常通用的 Subscriber,我们可以自由的处理数据流的状态。

例如:

let once: Publishers.Once<Int, Never> = Publishers.Once(100)
let observer: Subscribers.Sink<Publishers.Once<Int, Never>> = Subscribers.Sink(receiveCompletion: {
    print("completed: \($0)")
}, receiveValue: {
    print("received value: \($0)")
})
once.subscribe(observer)

// received value: 100
// completed: finished

Publisher 甚至提供了 sink(receiveCompletion:, receiveValue:) 方法来直接订阅。

Assign 可以很方便地将接收到的值通过 KeyPath 设置到指定的 Class 上(不支持 Struct),很适合将已有的程序改造成 Reactive。

例如:

class Student {
    let name: String
    var score: Int

    init(name: String, score: Int) {
        self.name = name
        self.score = score
    }
}

let student = Student(name: "Jack", score: 90)
print(student.score)
let observer = Subscribers.Assign(object: student, keyPath: \Student.score)
let publisher = PassthroughSubject<Int, Never>()
publisher.subscribe(observer)
publisher.send(91)
print(student.score)
publisher.send(100)
print(student.score)

// 90
// 91
// 100

在例子中,一旦 publisher 的值发生改变,相应的,student 的 score 也会被更新。

PassthroughSubject 这里是 Combine 内置的一个 Publisher,可能你会好奇:前面不是说 Subject 是一种 Observer 吗,这里怎么又是 Pulisher 呢?

Subject

有些时候我们想随时在 Publisher 插入值来通知订阅者,在 Rx 中也提供了一个 Subject 类型来实现。Subject 通常是一个中间代理,即可以作为 Publisher,也可以作为 Observer。Subject 的定义如下:

public protocol Subject : AnyObject, Publisher {

    /// Sends a value to the subscriber.
    ///
    /// - Parameter value: The value to send.
    func send(_ value: Self.Output)

    /// Sends a completion signal to the subscriber.
    ///
    /// - Parameter completion: A `Completion` instance which indicates 
    /// whether publishing has finished normally or failed with an error.
    func send(completion: Subscribers.Completion<Self.Failure>)
}

作为 Observer 的时候,可以通过 Publisher 的 subscribe(_:Subject) 方法订阅某个 Publisher。

作为 Publisher 的时候,可以主动通过 Subject 的两个 send 方法,我们可以在数据流中随时插入数据。目前在 Combine 中,有三个已经实现对 Subject: AnySubject,CurrentValueSubject 和 PassthroughSubject 。

利用 Subject 可以很轻松的将现在已有代码的一部分转为 Reactive,一个比较典型的使用场景是:

// Before
class ContentManager {

    var content: [String] {
        didSet {
            delegate?.contentDidChange(content)
        }
    }

    func getContent() {
        content = ["hello", "world"]
    }
}

// After
class RxContentController {

    var content = CurrentValueSubject<[String], NSError>([])

    func getContent() {
        content.value = ["hello", "world"]
    }
}

CurrentValueSubject 的功能很简单,就是包含一个初始值,并且会在每次值变化的时候发送一个消息,这个值会被保存,可以很方便的用来替代 Property Observer。在上例中,以前需要实现 delegate 来获取变化,现在只需要订阅 content 的变化即可,并且它作为一个 Publisher,可以很方便的利用操作符进行组合变换。

PassthroughSubject 和 CurrentValueSubject 几乎一样,只是没有初始值,也不会保存任何值。

AnyPublisher、AnySubscriber、AnySubject

前面说到 Publisher、Subscriber、Subject 是类型安全的,那么在使用中不同类型的 Publisher、Subscriber、Subject 势必会造成一些麻烦。

考虑下面这种情况:

class StudentManager {

    let namesPublisher: ??? // what's the type?

    func updateStudentsFromLocal() {
        let student1 = Student(name: "Jack", score: 75)
        let student2 = Student(name: "David", score: 80)
        let student3 = Student(name: "Alice", score: 96)
        let namesPublisher: Publishers.Sequence<[String], Never> = Publishers.Sequence<[Student], Never>(sequence: [student1, student2, student3]).map { $0.name }
        self.namesPublisher = namesPublisher
    }

    func updateStudentsFromNetwork() {
        let namesPublisher: Publishers.Future<[String], Never> = Publishers.Future { promise in
            getStudentsFromNetwork {
                let names: [String] = ....
                promise(.success([names]))
            }
        }
        self.namesPublisher = namesPublisher
    }
}

这里 namesPublisher 究竟该是什么类型呢?第一个方法返回的是 Publishers.Sequence<[String], Never>,第二个方法是 Publishers.Future<[String], Never>,而我们是无法定义成 Publisher<[String], Never> 的。

AnyPublisher、AnySubscriber、AnySubject 正是为这样的场景设计的,它们是通用类型,任意的 Publisher、Subscriber、Subject 都可以通过 eraseToAnyPublisher()、eraseToAnySubscriber()、eraceToAnySubject() 转化为对应的通用类型。

class StudentManager {

    let namePublisher: AnyPublisher<[String, Never]>

    func updateStudentsFromLocal() {
        let namePublisher: AnyPublisher<[String, Never]> = Publishers.Sequence<[Student], Never>(sequence: students).map { $0.name }.eraseToAnyPublisher()
        self.namePublisher = namePublisher
    }

    func updateStudentsFromNetwork() {
        let namePublisher: AnyPublisher<[String, Never]> = Publishers.Future { promise in
            promise(.success([names]))
        }.eraseToAnyPublisher()
        self.namePublisher = namePublisher
    }
}

AnyPublisher、AnySubscriber、AnySubject 的另外一个实用场景是创建自定义的 Publisher/Subscriber/Subject,因为在框架中它们已经是实现好相应的协议了。

let justPubliser = AnyPublisher<String, NSError> { subscribe in
    _ = subscribe.receive("hello")  // ignore demand
    subscribe.receive(completion: .finished)
}
let subscriber = AnySubscriber<String, NSError>(receiveValue: { input in
    print("Received input: \(input)")
    return .unlimited
}, receiveCompletion: { completion in
    print("Completed with \(completion)")
})
justPubliser.subscribe(subscriber)

// Received input: hello
// Completed with finished

Cancellable

让我们回到上文提到的 Future 的一个例子:

let apiRequest = Publishers.Future { promise in
    URLSession.shared.dataTask(with: url) { data, _, _ in
        promise(.success(data))
    }.resume()
}

它似乎能够很好的工作,但是一个比较经典的场景是:用户上传某个文件,上传到一半发现选错了就点击取消,我们也应该取消这个上传。类似的还有一些临时生成的资源需要在取消订阅时释放。这在 Combine 中该怎么实现呢?

Combine 中提供了 Cancellable 这个协议。定义很简单:

protocol Cancellable {
    /// Cancel the activity.
    func cancel()
}

前面我们提到 Publisher 在被订阅时会给 Subscriber 发送一个 Subscription 消息,这个 Subscription 恰好也实现了 Cancellable 协议,在取消订阅时,会调用它的 cancel 方法。

这样,我们就可以实现一个简单的例子:

struct AnySubscription: Subscription {
    private let cancellable: Cancellable

    init(_ cancel: @escaping () -> Void) {
        cancellable = AnyCancellable(cancel)
    }

    func cancel() {
        cancellable.cancel()
    }
}

let downloadPublisher = AnyPublisher<Data?, Never> { subscribe in
    let task = URLSession.shared.uploadTask(with: request, fromFile: file) { (data, _, _) in
        _ = subscribe.receive(data) // ignore demand
        subscribe.receive(completion: .finished)
    }
    let subscription = AnySubscription {
        task.cancel()
    }
    subscribe.receive(subscription: subscription)
    task.resume()
}
let cancellable = downloadPublisher.sink { data in
    print("Received data: \(data)")
}

// Cancel the task before it finishes
cancellable.cancel()
操作符

操作符是 Combine 中非常重要的一部分,通过各式各样的操作符,可以将原来各自不相关的逻辑变成一致的(unified)、声明式的(declarative)的数据流。

转换操作符:

  • map/mapError
  • flatMap
  • replaceNil
  • scan
  • setFailureType

过滤操作符:

  • filter
  • compactMap
  • removeDuplicates
  • replaceEmpty/replaceError

reduce 操作符:

  • collect
  • ignoreOutput
  • reduce

运算操作符:

  • count
  • min/max

匹配操作符:

  • contains
  • allSatisfy

序列操作符:

  • drop/dropFirst
  • append/prepend
  • prefix/first/last/output

组合操作符:

  • combineLatest
  • merge
  • zip

错误处理操作符:

  • assertNoFailure
  • catch
  • retry

时间控制操作符:

measureTimeInterval
debounce
delay
throttle
timeout

其他操作符:

  • encode/decode
  • switchToLatest
  • share
  • breakpoint/breakpointOnError
  • handleEvents

以下内容仅供参考,目前并未完成所有的操作符的详细介绍
map/mapError

map 将收到的值按照给定的 closure 转换为其他值,mapError 则将错误转换为另外一种错误类型。

func map<T>(_ transform: (Output) -> T) -> Publishers.Just<T>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • 简介 Combine是Apple在2019年WWDC上推出的一个新框架。该框架提供了一个声明性的Swift API...
    云天涯丶阅读 24,403评论 5 22
  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,612评论 13 85
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,804评论 0 5
  • RxJava详解(一) 年初的时候就想学习下RxJava然后写一些RxJava的教程,无奈发现已经年底了,然而我还...
    CharonChui阅读 2,402评论 0 0
  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善阅读 3,544评论 0 0