上一篇文章,笔者与大家一起分析了RxSwift中最基础的一条调用链:create->subscribe的源码流程:
create创建AnonymousObservable,AnonymousObservable通过AnonymousObservableSink来实现事件的转发,最终传递到create传入的监听闭包中。
在阅读本文之前,建议读者先阅读一下上一篇。
正如我们之前所说,这是最简单的一个链条,通常来说,不能够满足我们的需求,也就是事件源所产生的事件,很可能并不是我们真正所期望去监听的事件。比如,我们通过发起网络请求,创建了一个Response事件序列,但是我们最终期望得到的,实际是特定的一个模型。RxSwift中有一类序列类型,正可以解决这样的需求。这类序列通常有这样的特点:
同AnonymousObservable一样,它们也都是Producer的子类
它们不能作为原始序列,也就是说,它们不能产生原始事件
它们可以对原始序列进行一定的变换
也就是说,它们是一种特殊的Observable,去对原始序列进行一定的变换操作,并将事件转发给最终的Observer。我们不妨称之为中间序列。比如,今天即将分析的三种序列:Map、CompactMap与Filter的原理。
或许有读者会问,为什么要把这三类序列放在一起分析?那是因为他们的原理极其相似,并且他们都对应着Swift中Sequence协议的一个函数,且功能一致。
Map
首先,我们分析一下Map。我们知道Sequence协议的map函数,作用就是遍历序列并对元素进行一定的转换,输出经过转换后的序列。
比如:
[1,2,3,4].map { i in
String(i*2)
}
输出结果是["2", "4", "6", "8"]
RxSwift中的map操作符,作用也是一样。
Observable<Int>.create { observer in
//订阅闭包
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onNext(4)
return Disposables.create()
}
.map { i in
String(i*2)
}
.subscribe(onNext: { i in
print(i)
})
.disposed(by: disposeBag)
首先第一行,通过create产生了一个四个Int元素的原始序列:1,2,3,4,然后通过map进行转换后,转成了四个字符串的序列:“2”,“4”,“6”, “8”。但是不像Swift中的Sequence的map函数是同步执行的,RxSwift里面的map是异步执行的。那么RxSwift里这个map函数是如何生效的呢?
中间序列的原理其实和AnonymousObservable的原理十分相似,或者说,RxSwift世界里,大部分的可观察序列,都有着相似的实现原理,即:通过一个特定的Producer子类去持有一定的资源,并创建一个特定的Sink子类 ,来负责具体的事件处理与转发。
所以,我们接下来就看一下这个Map的源码。
extension ObservableType {
public func map<Result>(_ transform: @escaping (Element) throws -> Result)
-> Observable<Result> {
Map(source: self.asObservable(), transform: transform)
}
}
final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = Observer.Element
typealias Element = SourceType
private let transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self.transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try self.transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}
final private class Map<SourceType, ResultType>: Producer<ResultType> {
typealias Transform = (SourceType) throws -> ResultType
private let source: Observable<SourceType>
private let transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
self.source = source
self.transform = transform
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
let sink = MapSink(transform: self.transform, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
首先,通过协议扩展的方式,为ObservableType添加了一个map方法,接受(Element) throws -> Result这样一个转换闭包。Result就是这个map方法返回的Observable的元素类型,定义和Swift.Sequence.map定义非常像。
在这个方法中,返回了一个Map实例,这个Map持有了self.asObservable()和转换闭包。通过上一篇,我们知道了,create创建的是一个AnonymousObservable,这个self.asObservable()其实返回的就是self。
接下来,我们看run方法。run方法里面生成了一个MapSink,MapSink持有了转换闭包和run方法传入的参数observer的(上一篇分析过,这个observer是在ObservableType的subscribe(noNext:onError:onCompleted:onDisposed:)方法中创建的一个AnyObserver)。
接下来,调用了self.source.subscribe(sink),因此,这个source,也就是之前提到的原始序列AnonymousObservable的所有事件,都会被转发到这个MapSink的on方法中。
接下来,只要看MapSink的on方法就清楚了。
final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = Observer.Element
typealias Element = SourceType
private let transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self.transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try self.transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}
当接收到error事件和complete事件时,正常传递,当接收到next事件时,会对next事件的element进行transform操作,并将转换结果传递下去。通过forwardOn方法,将转换后的结果传递到自身持有观察者observer中,从而实现了元素的转换。
CompactMap与Filter
由于今天所要分析的三个序列原理几乎一样,所以我们只是主要分析一下第一个Map,后面这两个简单聊一下就好了。
CompactMap接受一个(SourceType) throws -> ResultType?转换闭包,将原始序列转换并忽略掉结果为nil的元素
Filter则接受一个(Element) throws -> Bool的过滤闭包,将原始序列中不满足条件的元素过滤掉
final private class CompactMap<SourceType, ResultType>: Producer<ResultType> {
typealias Transform = (SourceType) throws -> ResultType?
private let source: Observable<SourceType>
private let transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
self.source = source
self.transform = transform
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
let sink = CompactMapSink(transform: self.transform, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
final private class Filter<Element>: Producer<Element> {
typealias Predicate = (Element) throws -> Bool
private let source: Observable<Element>
private let predicate: Predicate
init(source: Observable<Element>, predicate: @escaping Predicate) {
self.source = source
self.predicate = predicate
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = FilterSink(predicate: self.predicate, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
依然是在run方法中,生成了对应的CompactMapSink和FilterSink,并让原始序列订阅了创建的Sink。
final private class CompactMapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType?
typealias ResultType = Observer.Element
typealias Element = SourceType
private let transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self.transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
if let mappedElement = try self.transform(element) {
self.forwardOn(.next(mappedElement))
}
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}
CompactMapSink的run方法中,同样对next元素进行变换,但是只传递了转换后非nil的元素。
final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Predicate = (Element) throws -> Bool
typealias Element = Observer.Element
private let predicate: Predicate
init(predicate: @escaping Predicate, observer: Observer, cancel: Cancelable) {
self.predicate = predicate
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
do {
let satisfies = try self.predicate(value)
if satisfies {
self.forwardOn(.next(value))
}
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .completed, .error:
self.forwardOn(event)
self.dispose()
}
}
}
FilterSink的run方法中,对next元素调用过滤闭包,值传递闭包结果为true的元素
总结
经过分析,我们知道,Map、CompactMap与Filter原理几乎一样:
1.通过对原始序列调用相应的map/compactMap/filter方法,创建了对应的Map/CompactMap/Filter序列
2.在各自的run方法中,让创建的Sink去订阅原始序列
3.在Sink的on方法中,调用持有的对应闭包,对元素进行转换或过滤,传递符合要求的元素
本文所有内容到此告一段落,如果各位坚持看到了这里,并且觉得笔者写的对您有一点帮助,还麻烦您动动手,给点个赞。不胜感激。