新手读的懂的RxSwift源码解析(二)-- Map、CompactMap与Filter

上一篇文章,笔者与大家一起分析了RxSwift中最基础的一条调用链:create->subscribe的源码流程:
create创建AnonymousObservable,AnonymousObservable通过AnonymousObservableSink来实现事件的转发,最终传递到create传入的监听闭包中。

在阅读本文之前,建议读者先阅读一下上一篇。

正如我们之前所说,这是最简单的一个链条,通常来说,不能够满足我们的需求,也就是事件源所产生的事件,很可能并不是我们真正所期望去监听的事件。比如,我们通过发起网络请求,创建了一个Response事件序列,但是我们最终期望得到的,实际是特定的一个模型。RxSwift中有一类序列类型,正可以解决这样的需求。这类序列通常有这样的特点:

  • 同AnonymousObservable一样,它们也都是Producer的子类

  • 它们不能作为原始序列,也就是说,它们不能产生原始事件

  • 它们可以对原始序列进行一定的变换

也就是说,它们是一种特殊的Observable,去对原始序列进行一定的变换操作,并将事件转发给最终的Observer。我们不妨称之为中间序列。比如,今天即将分析的三种序列:Map、CompactMap与Filter的原理。

或许有读者会问,为什么要把这三类序列放在一起分析?那是因为他们的原理极其相似,并且他们都对应着Swift中Sequence协议的一个函数,且功能一致。

Map

首先,我们分析一下Map。我们知道Sequence协议的map函数,作用就是遍历序列并对元素进行一定的转换,输出经过转换后的序列。
比如:

[1,2,3,4].map { i in
    String(i*2)
}

输出结果是["2", "4", "6", "8"]
RxSwift中的map操作符,作用也是一样。

Observable<Int>.create { observer in
    //订阅闭包
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onNext(4)
    return Disposables.create()
}
.map { i in
    String(i*2)
}
.subscribe(onNext: { i in
    print(i)
})
.disposed(by: disposeBag)

首先第一行,通过create产生了一个四个Int元素的原始序列:1,2,3,4,然后通过map进行转换后,转成了四个字符串的序列:“2”,“4”,“6”, “8”。但是不像Swift中的Sequence的map函数是同步执行的,RxSwift里面的map是异步执行的。那么RxSwift里这个map函数是如何生效的呢?

中间序列的原理其实和AnonymousObservable的原理十分相似,或者说,RxSwift世界里,大部分的可观察序列,都有着相似的实现原理,即:通过一个特定的Producer子类去持有一定的资源,并创建一个特定的Sink子类 ,来负责具体的事件处理与转发。

所以,我们接下来就看一下这个Map的源码。

extension ObservableType {
    public func map<Result>(_ transform: @escaping (Element) throws -> Result)
        -> Observable<Result> {
        Map(source: self.asObservable(), transform: transform)
    }
}

final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = Observer.Element 
    typealias Element = SourceType

    private let transform: Transform

    init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
        self.transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self.transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

final private class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType

    private let source: Observable<SourceType>

    private let transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self.source = source
        self.transform = transform
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
        let sink = MapSink(transform: self.transform, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

首先,通过协议扩展的方式,为ObservableType添加了一个map方法,接受(Element) throws -> Result这样一个转换闭包。Result就是这个map方法返回的Observable的元素类型,定义和Swift.Sequence.map定义非常像。

在这个方法中,返回了一个Map实例,这个Map持有了self.asObservable()和转换闭包。通过上一篇,我们知道了,create创建的是一个AnonymousObservable,这个self.asObservable()其实返回的就是self。

接下来,我们看run方法。run方法里面生成了一个MapSink,MapSink持有了转换闭包和run方法传入的参数observer的(上一篇分析过,这个observer是在ObservableType的subscribe(noNext:onError:onCompleted:onDisposed:)方法中创建的一个AnyObserver)。
接下来,调用了self.source.subscribe(sink),因此,这个source,也就是之前提到的原始序列AnonymousObservable的所有事件,都会被转发到这个MapSink的on方法中。
接下来,只要看MapSink的on方法就清楚了。

final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = Observer.Element 
    typealias Element = SourceType

    private let transform: Transform

    init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
        self.transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self.transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

当接收到error事件和complete事件时,正常传递,当接收到next事件时,会对next事件的element进行transform操作,并将转换结果传递下去。通过forwardOn方法,将转换后的结果传递到自身持有观察者observer中,从而实现了元素的转换。

CompactMap与Filter

由于今天所要分析的三个序列原理几乎一样,所以我们只是主要分析一下第一个Map,后面这两个简单聊一下就好了。
CompactMap接受一个(SourceType) throws -> ResultType?转换闭包,将原始序列转换并忽略掉结果为nil的元素
Filter则接受一个(Element) throws -> Bool的过滤闭包,将原始序列中不满足条件的元素过滤掉

final private class CompactMap<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType?

    private let source: Observable<SourceType>

    private let transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self.source = source
        self.transform = transform
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
        let sink = CompactMapSink(transform: self.transform, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

final private class Filter<Element>: Producer<Element> {
    typealias Predicate = (Element) throws -> Bool
    
    private let source: Observable<Element>
    private let predicate: Predicate
    
    init(source: Observable<Element>, predicate: @escaping Predicate) {
        self.source = source
        self.predicate = predicate
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = FilterSink(predicate: self.predicate, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

依然是在run方法中,生成了对应的CompactMapSink和FilterSink,并让原始序列订阅了创建的Sink。

final private class CompactMapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType?

    typealias ResultType = Observer.Element 
    typealias Element = SourceType

    private let transform: Transform

    init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
        self.transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                if let mappedElement = try self.transform(element) {
                    self.forwardOn(.next(mappedElement))
                }
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

CompactMapSink的run方法中,同样对next元素进行变换,但是只传递了转换后非nil的元素。

final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Predicate = (Element) throws -> Bool
    typealias Element = Observer.Element
    
    private let predicate: Predicate
    
    init(predicate: @escaping Predicate, observer: Observer, cancel: Cancelable) {
        self.predicate = predicate
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            do {
                let satisfies = try self.predicate(value)
                if satisfies {
                    self.forwardOn(.next(value))
                }
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .completed, .error:
            self.forwardOn(event)
            self.dispose()
        }
    }
}

FilterSink的run方法中,对next元素调用过滤闭包,值传递闭包结果为true的元素

总结

经过分析,我们知道,Map、CompactMap与Filter原理几乎一样:
1.通过对原始序列调用相应的map/compactMap/filter方法,创建了对应的Map/CompactMap/Filter序列
2.在各自的run方法中,让创建的Sink去订阅原始序列
3.在Sink的on方法中,调用持有的对应闭包,对元素进行转换或过滤,传递符合要求的元素

本文所有内容到此告一段落,如果各位坚持看到了这里,并且觉得笔者写的对您有一点帮助,还麻烦您动动手,给点个赞。不胜感激。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容