@TOC
1. 函数响应式编程思想必备基本概念简介
- 函数式编程
在计算机科学里,函数式编程是一种编程范式,它将计算描述为表达式求值并避免了状态和数据改变。函数式编程里面的“函数”指的是数学函数。
函数式编程思想:是将操作尽可能写在一起!嵌套的函数!!
本质:就是往方法里面传入Block,方法中嵌套Block调用.
了解函数式编程首先要理解命令式编程,命令式编程往往是我们大多数人固有的编程思维,这中模式依赖于我们希望自己的程序如何完成某项任务,而与之对应的是声明式编程(Declarative Programming),它关心是任务的目的是什么,而不是具体如何实现。这把程序员从那些细枝末节中解放了出来。而函数响应式编程正是声明式编程的一个子范式。
-
编程函数和数学函数对比
数学函数的特点是对于每一个自变量,存在唯一的因变量与之对应。而编程函数的特点是参数和返回值都不是必须的,函数可能依赖外界或者影响外界。
命令式和函数式的区别
所有的命令式语言都被设计来高效地使用冯诺依曼体系结构的计算机。实际上,最初的命令式语言的目的就是取代汇编语言,对机器指令进行进一步抽象。因此,命令式语言带有强烈的硬件结构特征。命令式语言的核心特性有:模拟存储单元的变量、基于传输操作的赋值语句,以及迭代形式的循环运算。命令式语言的基础是语句(特别是赋值),它们通过修改存储器的值而产生副作用(side effect)的方式去影响后续的计算。
函数式语言设计的基础是Lambda表达式,函数式程序设计把程序的输出定义为其输入的一个数学函数,在这里没有内部状态,也没有副作用。函数式语言进行计算的主要是将函数作用与给定参数之上。函数式语言没有命令式语言所必需的那种变量,可以没有赋值语句,也可以没有循环。一个程序就是函数定义和函数应用的说明;一个程序的执行就是对函数应用的求值。
详细教程参考:理解函数式编程
- block
block可以作为对象的属性,也可以作为方法的参数,也可以作为返回值。而作为返回值是链式编程的核心。
常用的block用法如下:
1. block表达式语法:
^返回值类型(参数列表){表达式}
实例 1:
^int (int count) {
return count + 1;
};
2. 声明类型变量的语法
//返回值类型(^变量名)(参数列表) = block表达式
实例 2:
int (^sum)(int count) = (^int count) {
return count + 1;
};
3. 作为函数参数的语法
- (void)func(int (^)(int))block {
}
//定义block简写
typedef int (^Sumblock)(int);
- (void)func(Sumblock)block {
}
- 高阶函数
高阶函数实际上就是函数的函数,它是所有函数式语言的性质。函数式语言中,函数作为第一等公民,这也意味着你像定义或调用变量一样去定义或调用函数。可以在任何地方定义,在函数内或函数外,可以将函数作为参数或者返回值。在数学和计算机科学中,高阶函数是至少满足下列一个条件的函数:
1 接受一个或多个函数作为输入
2 输出一个函数
在数学中它们也叫做算子(运算符)或泛函。微积分中的导数就是常见的例子,因为它映射一个函数到另一个函数。
关于Swift的相关高阶函数可以参考我的一篇博客:Swift的高阶函数
- 闭包
相关闭包知识总结请参考我的一篇博客:IOS基础-闭包
2. iOS中三种编程思想:链式、函数式和响应式编程
2.1 链式编程
- 特点:方法的返回值必须是方法的调用者
- 链式写法对比普通写法
- 普通写法:
实例 3
@interface Person : NSObject
- (void)eat;
- (void)sleep;
@end
@implementation Person
- (void)eat
{
NSLog(@"%s", __FUNCTION__);
}
- (void)sleep
{
NSLog(@"%s", __FUNCTION__);
}
@end
ViewController.m
Person *person = [[Person alloc] init];
//调用时必须单个调用,而且不能任意组合顺序
/** 普通的调用方式 */
[person eat];
[person sleep];
- 链式写法:
将上面的普通写法改为链式,方法增加一个返回值,且返回值为调用者本身。代码如下:
实例 4
// 链式写法
Person.h
- (Person *)eat;
- (Person *)sleep;
Person.m
- (Person *)eat
{
NSLog(@"%s", __FUNCTION__);
return self;
}
- (Person *)sleep
{
NSLog(@"%s", __FUNCTION__);
return self;
}
ViewController.m
Person *person = [[Person alloc] init];
/** 链式写法,这样不仅可以无限调用,而且可以控制顺序 */
[[person eat] sleep];
[[person sleep] eat];
[[person eat] eat];
/** 通过”点”语法,将需要执行的代码块连续的书写下去,就是链式编程.它能使代码简单易读,书写方便 */
person.eat.sleep.eat.sleep.sleep;
- 此外我们还可以将blockly作为返回值---链式编程带参数
代码如下:
实例 5
Person.h
- (Person *(^)(NSString *food))eat3;
- (Person *(^)(NSString *where))sleep3;
Person.m
- (Person *(^)(NSString *food))eat3
{
return ^(NSString *food) {
NSLog(@"吃:%@ ",food);
return self;
};
}
- (Person *(^)(NSString *where))sleep3
{
return ^(NSString *where) {
NSLog(@"睡在:%@上",where);
return self;
};
}
ViewController.m
Person *person = [[Person alloc] init];
/** 链式 + 函数式写法 */
person.sleep3(@"床").eat3(@"苹果").eat3(@"香蕉").sleep3(@"沙发");
返回值block不带参数,()不传参即可
person.sleep3().eat3().eat3().sleep3();
2.2 函数式编程
函数式编程思想:是将操作尽可能写在一起!嵌套的函数!!
本质:就是往方法里面传入Block,方法中嵌套Block调用.
如下代码:
实例 6
/** 返回调用者本身,获取其它属性和方法 */
- (Person *)calculator:(NSInteger(^)(NSInteger result))block
{
_result = block(_result);
return self;
}
Person *person = [[Person alloc] init];
/** 计算器 */
Person *calculatPerson = [person calculator:^NSInteger(NSInteger result) {
result = result + 10;
result = result*10;
return result;
}];
NSLog(@"%ld", calculatPerson.result);
简单理解就是将block块作为函数的参数,可以异步在参数block回调,将函数运行所需要的一些信息或者产出的一些结果通过block传递。代码逻辑清晰。
2.3 响应式编程
- 响应式编程解决的痛点:
在程序开发中:a = b + c赋值之后 b 或者 c 的值变化后,a 的值不会跟着变化;如果我们有这么一个需求:如果 b 或者 c 的数值发生变化,a 的数值会同时发生变化。怎么实现呢?
响应式编程就是为这个而生的。如我们经典的KVO(详情可以参考我的一篇博客 KVO 底层实现原理),经典的响应式编程框架RAC。当然少不了我们的主角Rxswift.
列如:
实例 7
a = 2
b = 2
c = a + b // c is 4
b = 3
// now what is the value of c?
在响应式编程中,c的值将会随着b的值改变而改变。
FRP提供了一种信号机制来实现这样的效果,通过信号来记录值的变化。信号可以被叠加、分割或合并。通过对信号的组合,就不需要去监听某个值或事件.
如下图:
可以把信号想象成水龙头,只不过里面不是水,而是玻璃球(value),直径跟水管的内径一样,这样就能保证玻璃球是依次排列,不会出现并排的情况(数据都是线性处理的,不会出现并发情况)。水龙头的开关默认是关的,除非有了接收方(subscriber),才会打开。这样只要有新的玻璃球进来,就会自动传送给接收方。可以在水龙头上加一个过滤嘴(filter),不符合的不让通过,也可以加一个改动装置,把球改变成符合自己的需求(map)。也可以把多个水龙头合并成一个新的水龙头(combineLatest:reduce:),这样只要其中的一个水龙头有玻璃球出来,这个新合并的水龙头就会得到这个球。
3. 什么是函数响应式编程
什么是函数响应式编程呢? 可以一句话概括:
-
响应式思想为体,函数式编程思想为用
- 函数式
特点:代码简洁(复用)、易于理解(接近自然语言)、便于代码管理
例如:doSomething1().doSomething2().doSomething3()
一系列的动作简洁明了,相互不干扰,可以合并使用也可以分开单独使用.
实例 8:下面代码清楚的讲解了函数式编程与普通处理方式的区别。
//例1:
//遍历数组(要求:1.首先获取 > 3的数字;2.获取的数字之后 + 1;3.所有数字中的偶数;4.可读性 清晰度)
let array = [1,2,3,4,5,6,7]
//普通处理方式:
for num in array{
if num > 3{
let number = num + 1
if (number % 2 == 0) {
print(number)
}
}
}
//函数式:
array.filter{ $0 > 3}
.filter{ ($0+1) % 2 == 0 }
.forEach { print($0) }
从代码中我们可以明显的对比出代码的优劣,普通代码实现for循环需要层层嵌套,非常累赘,可读性不高;而利用高阶函数实现的函数式编码代码清晰简洁。你可以注释掉中间的一个.filter{ ($0+1) % 2 == 0 }
代码一样可以正常运行,非常灵活。
- 响应式
对象对某一数据流变化做出响应的这种编码方式称为响应式。如对象A和对象B,A和B有一种“说不清”的关系,A要时刻监控B的行为,对B的变化也做出相应的变化。那么怎么实现呢?对于B来说,B做任何事都需要向A汇报,这样A就能实时监控B的行为,并响应。在iOS开发中我们经常会响应一些事件,button、tap、textField、textView、notifaction、KVO、NSTimer等等这些,都需要做响应监听,响应后都需要在对应的响应事件中去做处理,而原生开发中,触发对象与响应方法是分离的,如button的初始化和点击响应方法是分离的。
在程序开发中:a = b + c赋值之后 b 或者 c 的值变化后,a 的值不会跟着变化;
响应式编程目标就是:如果 b 或者 c 的数值发生变化,a 的数值会同时发生变化;
响应编程的经典案例:KVO
响应式编程框架:ReactiveCocoa(RAC) 详细学习可以参考:ReactiveCocoa博客
响应式编程的优点:
1) 开发过程中,状态以及状态之间依赖过多,RAC更加有效率地处理事件流,而无需显式去管理状态。在OO或者过程式编程中,状态变化是最难跟踪,最头痛的事。这个也是最重要的一点。
2) 减少变量的使用,由于它跟踪状态和值的变化,因此不需要再申明变量不断地观察状态和更新值。
3) 提供统一的消息传递机制,将oc中的通知,action,KVO以及其它所有UIControl事件的变化都进行监控,当变化发生时,就会传递事件和值。
4) 当值随着事件变换时,可以使用map,filter,reduce等函数便利地对值进行变换操作。
下面统一对比Rxswift的UIButton事件实现和原生的实现方式,了解一下响应式编程的优点。
实例 9 :对比传统UIButton响应事件写法和Rxswift中函数响应式写法的区别。
1. 传统写法
//传统写法,UI代码和逻辑是分开的,为了监听一个按钮的响应事件,我们需要在另外一处地方实现。这样可读性不好,代码繁琐。
button = UIButton()
button.addTarget(self, action: #selector(text), for: .touchUpInside)
@objc func text() {
print("Button clicked!")
}
2. Rxswift写法
//Rxswift的实现就简单多了,而且目标非常明确,就是三部曲:1创建序列,2,订阅响应消息,3.析构销毁
//当你订阅了响应消息后,只要序列发生了变化,订阅的消息总会触发,如下面的代码,当你订阅了按钮的点击事件后,每次点击按钮,订阅的消息subscibe就会收到一次。
self.button.rx.tap //序列,这里默认的序列是默认是.onTouchUpInside事件
.subscribe(onNext: { () in //订阅
print("Button clicked!")
}, onError: { (error) in //当Rxswift的事件链走不通,会回调这个onError,通知错误
print("错误信息")
}, onCompleted: {//当Rxswift订阅的所有事件链走完了,会回调这个onCompleted,告知执行完毕,这个和onError是对立互斥的,两者只会发生一个。
print("订阅完成")
})
.disposed(by: DisposeBag()) //销毁
这里 taps 就是按钮点击事件的序列。然后我们通过打印“Button clicked!”,来对每一次点击事件做出响应。这种编程方式叫做响应式编程。我们结合函数式编程以及响应式编程就得到了函数响应式编程。通过不同的构建函数,来创建所需要的数据序列。最后通过适当的方式来响应这个序列。这就是函数响应式编程。
通过上面的代码对比分析,我们可以看出Rxswift写出来的代码是多么简介,可读性逻辑清晰,看每一行就知道在做什么,不需要想原生UI响应和逻辑分开的,看代码需要跳来跳去的。通过一个这么小小的实例代码,我们初次见识到了Rxswift的强大,然而这个只是Rxswift框架功能的冰山一角。我强烈推荐大家都来学习下这么优秀的开源框架。
下面我们来窥探一下Rxswift到底是个什么东东,为啥这么牛逼。
4. Rxswift简介
4.1 什么是 ReactiveX(Reactive Extensions)
An API for asynchronous programming with observable streams
通过可观察的流实现异步编程的一种API(不明白?嗯,看完所有的例子再读一篇)
ReactiveX is more than an API, it's an idea and a breakthrough in programming. It has inspired several other APIs, frameworks, and even programming languages.
ReactiveX 不仅仅是一种 API 那么简单,它更是一种编程思想的突破。它已经影响了其他 API,frameworks,以及编程语言。
总的一句话概括:
ReactiveX(Reactive Extensions)是通过可观察的流实现异步编程的一种API,它结合了观察者模式、迭代器模式和函数式编程的精华。RxSwift 是 ReactiveX 编程思想的一种实现,几乎每一种语言都会有那么一个 Rx[xxxx] 框架,比如Rxswift, RxJava,RxJS 等等。
4.2 Rx的基本概念
Rx是一个家族,他们把函数响应式编程思想使用到了极致,要学习Rxswift必须先了解一下一些基本概念。
- 观察者模式 Observable:
对某些数据流(很广,可以是一些事件等)进行处理,使其变成可观察对象(Observable)序列,这样观察者(observer)就可以订阅这些序列;
Observable 直译为可观察的,它在 RxSwift 起到了举足轻重的作用,在整个 RxSwift 的使用过程中你会经常与它打交道。如果你使用过 RAC ,它如同 Signal 一样。RxSwift 中关键点就是在于如何把普通的数据或者事件变成可观察的,这样当某些数据或事件有变化的时候就会通知它的订阅者。RxSwift 中提供很多种创建 Observable 创建方法。比如:From、never、empty 和 create 等,更多创建方法。订阅者可以收到 3 个事件,onNext、onError 和 onCompleted,每个 Observable 都应该至少有一个 onError 或 onCompleted 事件,onNext 表示它传给下一个接收者时的数据流。
在Rxswift中我们把它理解为:观察者序列,(就是一系列可以被监听,观察的事件等,当你订阅了这些序列,你就可以在闭包中监听到对应的事件)通过下面一个图可以更好的理解Observable:
序列监听有三个步骤:1.创建序列,2订阅序列,3.销毁序列。当创建序列,并订阅了序列后,只要某个事件发送了序列消息,就可以在订阅的闭包里面监听到。下面我们一个小的实例来加深理解:
实例 10
//1:创建序列
//利用函数式编程思想,在create()构造函数中传入一个闭包,这个闭包会被类对象保存起来,后续每个时间,事件触发的时候会回调这个传入的闭包,这样就像连接了一个链条一样,顺着链条就可找到需要调用的闭包。
let ob = Observable<Any>.create { (observer) -> Disposable in
// 3:发送信号
obserber.onNext([1,2,3,4])
obserber.onCompleted()
// obserber.onError(NSError.init(domain: "error!", code: 10087, userInfo: nil))
return Disposables.create()
//2:订阅信息
//当我们订阅了Observable的消息后,只要Observable的事件触发,都会通过OnNext这个闭包告诉我们。
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)") //这里会监听到订阅的Observable消息
}, onError: { (error) in
print("error: \(error)") //当发生错误时,会回调这里
}, onCompleted: { // 当所有序列执行完毕时,会回调这里。
print("完成")
}) {
print("销毁")
}
如果你仔细观察这里的代码,会有一个疑问:从订阅中心observer,一直在用的序列,序列内部的代码是不曾看到的。为什么从序列闭包里面的发出信号,订阅信号的闭包里面能够订阅到? 这个问题我们将会在Rxswift 序列核心逻辑浅析
中详细分析
- 操作符 Operators:
然而对于订阅者来说(observer)某些选项(items)并不是自己需要的(需要过滤),某些选项(items)需要转换才能达到自己的目的;
Observable 创建后,可能为了满足某些需求需要修改它,这时就需要用到操作符。RxSwift 提供了非常多的操作符,当然不必要一一掌握这些操作符,使用的时候查一下即可,当然常见的操作符必须要掌握,比如 map、flatMap 、create 、filter 等。Operators详细介绍
实例 11:
这个例子主要把查找数组中的字符串 kongyulu,并显示到 Label 上。
override func viewDidLoad() {
super.viewDidLoad()
DispatchQueue.global().async {
self.from()
}
}
func from() {
Observable.from(["haha", "kongyulu", "cc", "wswy", "Rx"])
.subscribeOn(MainScheduler.instance)
.filter({ (text) -> Bool in
return text == "kongyulu"
})
.map({ (text) -> String in
return "my name is: " + text
})
.subscribe(onNext: { [weak self] (text) in
self?.nickNameLabel.text = text
})
.disposed(by: disposeBag)
}
迭代模式 Iterator:
这样集合或者序列中的值就可以进行遍历了。调度器 Scheduler:
为了提升用户体验,或其它目的,有些操作需要放到特定的线程去执行,比如 UI 操作需要放到主线程,这就涉及到了调度器。
如果你想给 Observable 操作符链添加多线程功能,你可以指定操作符(或者特定的Observable)在特定的调度器(Scheduler)上执行。对于 ReactiveX 中可观察对象操作符来说,它有时会携带一个调度器作为参数,这样可以指定可观察对象在哪一个线程中执行。而默认的情况下,某些可观察对象是在订阅者订阅时的那个线程中执行。SubscribeOn 可以改变可观察对象该在那个调度器中执行。ObserveOn 用来改变给订阅者发送通知时所在的调度器。这样就可以使可观察对象想在那个调度器中执行就在那个调度器中执行,不受约束,而这些细节是不被调用者所关心的。犹如 GCD 一样,你只管使用,底层线程是咋么创建的,你不必关心。
4.3 Rxswift框架的优点
4.4 Rxswift框架安装
Rxswift就是一个框架而已,通过pod安装跟其他框架没有什么差异
在podfile中写入
pod 'RxSwift'
pod 'RxCocoa'
命令行执行pod install
就可以了。
4.5 Rxswift简单使用
5. Rxswift 序列核心逻辑浅析
在实例 10
中我们留下一个疑问:从订阅中心observer,一直在用的序列,序列内部的代码是不曾看到的。为什么从序列闭包里面的发出信号,订阅信号的闭包里面能够订阅到?
下面我们将一步步通过Rxswift的源码分析来揭开这个疑团。
在分析代码前我们先回顾一下序列的三部曲:1.创建序列,2,订阅序列,3,销毁序列,其中在2中订阅序列之后,为什么我们就能监听到序列呢,理解了这个逻辑后,我们的疑问就会自然而解。
先看一个简单的类图:
接着我们来研究一下这段代码的执行逻辑
1:创建序列
// AnonymousObservable -> producer.subscriber -> run
// 保存闭包 - 函数式 保存 _subscribeHandler
//
let ob = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("框架班级")
obserber.onCompleted()
// obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
return Disposables.create()
}
2:订阅信号
// AnonymousObserver - event .next -> onNext()
// _eventHandler
// AnonymousObservable._subscribeHandler(observer)
// 销毁
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
用一个图来表达上面这段代码执行时,Rxswift框架做的事情如下:
现在我们来研究一下源码的具体实现:
- 创建序列
当我们调用let ob = Observable<Any>.create { (obserber) -> Disposable in }
这行代码时,进入Rxswift源码可以看到实际调用了:
extension ObservableType {
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>
}
-
保存序列到对象中
根据这个函数的官方注释,我们得知create()这个方法是在Create.swift中实现的
- 接着我们订阅消息
当我执行let _ = ob.subscribe(onNext: { (text) }, onError: { (error) in }, onCompleted: { })
这行代码时,我们就订阅了消息,我们分析源码得知,Rxswift实际上是调用了
我们进入源码查看该方法的实现:
/**
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.
- parameter onNext: Action to invoke for each element in the observable sequence.
- parameter onError: Action to invoke upon errored termination of the observable sequence.
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
gracefully completed, errored, or if the generation is canceled by disposing subscription).
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
分析源码,可以看到,我们在订阅消息注册的回调:onNext, onError,onComplete, onDisposed这个四个闭包都是作为函数的参数,在调用ob.subscribe()这个方法是作为参数传入进来的,在上面代码中定义了一个逃逸闭包 let observer = AnonymousObserver<E> {} 在这个闭包内部,当调用这个逃逸闭包的调用者传递不同的event就会调用我们传入的相应闭包,这样我们就监听到了订阅的消息。如下图:
这里我们可以得知,只有我们的观察者有了事件变化,只需要通知上面代码定义的observer这个参数闭包就可以了。
现在我们前面提到的疑问的答案来了:
在源码中我们只看到了return Disposables.create( self.asObservable().subscribe(observer), disposable
就结束了。 然后玄机就在这句代码,self.asObservable()其实就是我们的创建的序列,而subscribe()就回调了我们传入的observer闭包,而在这个observer就会调用我们船人都监听序列消息闭包onNext(), onError(),onCompleted().
接下来我们可以分析下 subscribe(observer)是如何调用observer的
- subscribe(observer)是如何调用observer的?
看看下面这张图就明白了:
5.1 Rxswift 序列核心逻辑流程总结
通过上面的分析,到这里我们总结一下大致流程:
1.调用create()创建序列时,首先创建了一个AnonymousObserver对象,在初始化时传递了一个闭包作为参数并且保存下来self._eventHandler = eventHandler。
AnonymousObserver是匿名观察者,用于存储和处理事件的闭包。
- 然后在最后有self.asObservable().subscribe(observer)这样一行代码,asObservable返回的是对象本身。
- 然后调用subscribe这个函数并且把创建的AnonymousObserver对象传递过去,会来到AnonymousObservable这个类里面,但是发现这个类里面没有subscribe方法,我们往父类Producer里面找到这个方法。
- 在Producer源码里面我们发现调用了run方法,也就是AnonymousObservable这个类里面的run方法,把observer作为参数传过来。
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
在run方法中,先创建一个AnonymousObservableSink对象并持有observer,然后调用这个对象的run方法把self传递过去,也就是把observable作为参数。
AnonymousObservableSink这个类将可观察者Observable和观察者Observer链接起来,实现事件的传递,起到一个桥梁的作用。
- 最后Producer父类会调用子类的run()方法,子类parent就是我们创建的序列。
//parent就是我们创建的序列。
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
在这里触发了_subscribeHandler的调用,这里的_subscribeHandler就是之前create函数参数传入的闭包。也就是这段闭包代码:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("框架班级") obserber.onCompleted() // obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil)) return Disposables.create() }
注意:parent._subscribeHandler(AnyObserver(self)) 这行代码把self转换成AnyObserver对象,也就是把AnonymousObservableSink对象转换成AnyObserver对象。
- 接着分析下AnyObserver源码,可以看到在构造函数中有一行代码self.observer = observer.on,就是把AnonymousObservableSink类的on函数赋值给AnyObserver类的observer变量。从这里就可以明白为什么这行代码observer.onNext("发送信号") 最终会触发AnonymousObservableSink.on事件。
public struct AnyObserver<Element> : ObserverType {
...
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
通过上面这段源码我们也看见public func asObserver() -> AnyObserver<E> { return self }
这个函数返回self 也就明白了之前说的 asObserver()为什么就是我们创建的序列。也就是这个非常牛逼的代码return Disposables.create( self.asObservable().subscribe(observer), disposable )
这里的self.asObservable() 实际就是我们创建的序列。
- 分析AnonymousObservableSink.on事件源码,可以得知在收到.error, .completed事件时,会调用forwardOn()方法,而在fowardOn()方法里面会调用self._observer.on(event)。
这里的_observer就是第二步调用subscribe函数里面创建的observer对象。
会先进入到父类的ObserverBase的on方法。
具体源码如下:
1. AnonymousObservableSink类on方法
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
2. Sink类的forwardOn方法
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
3. ObserverBase的on方法
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
4. onCore()方法最终会调用之前保存的闭包_eventHandler(event)
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
- 通过一张图分析上面5的源码逻辑
- 到此我们可以弄明白的最初的create()函数里面的observer是怎么调用到ob.subscribe()的。
最后有两张图总结序列事件传递的核心流程