推荐我的Rxjs教程:Rxjs系列教程目录
前言
随着开发中项目的越来越大,代码的要求越来越高,于是开始四处搜找各种js库进行学习。为了学习代码行为模式,例如:竞争等等。在技术总监的指引下找到Rxjs进行学习,再次表以感谢。在看教程时,有很多地方不解,于是用博客做以记录,并将自己的经验以尽可能简单的方式分享给大家。
这里简单解释一下Rxjs,RxJS 是一个js库,它通过使用 observable 序列来编写异步和基于事件的程序。ReactiveX 结合了 观察者模式、迭代器模式 和 使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。看到这你肯定疑问它有什么用?你先放下这个疑问,先看看一个简单的案例。
Observable可观察对象
Observable可观察对象:表示一个可调用的未来值或者事件的集合。
一个例子
通常你这样注册事件监听:
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
使用RxJS创建一个可观察对象:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscribe(() => console.log('Clicked!'));
看到这段代码你可能迷茫,这是什么意思?难道只是换了一种写法?
观察者模式
要说Observable可观察对象首先得说说:观察者模式
。
观察者模式
又叫发布-订阅(Publish/Subscribe)模式
:
他定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象(也可叫做抽象的通知者)。这个主题对象在状态发生变化时,会通知所有观察者对象,使他们能够自动更新自己。而且各个观察者之间相互独立。
观察者模式的结构中包含四种角色:
(1)抽象主题(Subject):主题是一个接口,该接口规定了具体主题需要实现的方法,比如,添加、删除观察者以及通知观察者更新数据的方法。
(2)抽象观察者(Observer):观察者是一个接口,该接口规定了具体观察者用来更新数据的方法。
(3)具体主题(ConcreteSubject):具体主题是实现主题接口类的一个实例,该实例包含有可以经常发生变化的数据。具体主题需使用一个集合,比如ArrayList,存放观察者的引用,以便数据变化时通知具体观察者。
(4)具体观察者(ConcreteObserver):具体观察者是实现观察者接口类的一个实例。具体观察者包含有可以存放具体主题引用的主题接口变量,以便具体观察者让具体主题将自己的引用添加到具体主题的集合中,使自己成为它的观察者,或让这个具体主题将自己从具体主题的集合中删除,使自己不再是它的观察者。
观察者模式结构的类图如下所示:
在现实生活中,我们经常用它来“放风”,比如:上自习时,老师不在我们在玩,派出一个同学看老师,老师来了通知大家;如果该同学没有发现老师,老师“咳咳”两声通知大家自己来了让大家安静自习,然后批评一番。
这里的监听的抽象主题对象是“老师是否来了”,同学们是观察者,同学们依赖主题对象的状态并且是一种一对多的依赖关系,同学们同时监听主题对象的反馈结果,同学们订阅(观察)这个主题,在这个主题发生变化时,来更新自己:
老师来了->安静自习,写作业。
老师没来->该玩玩,该吃吃。
放风的同学发现老师会通知,放风的同学没发现,老师进入教室老师自己也会“咳咳”两声通知。因此“老师是否来了”这个抽象主题中,老师通知和放风的同学通知都是这个抽象主题对象的具体实现,这个抽象主题对象就是可观察对象(可以观察嘛~~~),这时再想想前面对于可观察对象的定义(可调用的未来值或者事件的集合)是不是明白了?
同样上面的RxJS的代码也是这种效应,Rx.Observable.fromEvent(button, 'click')
是一个创建一个点击事件的可观察对象,然后使用subscribe(() => console.log('Clicked!'));
订阅这个可观察对象,其中() => console.log('Clicked!')
是一个观察者,如果发生点击事件主题对象的状态会发生改变,而他则会被执行。
这样有什么好处呢?
- 我们只需要针对可观察对象这一抽象的主题对象接口编程,减少了与具体的耦合,即他只是一个抽象的通知者。
- 观察者只依赖主题对象的状态,这意味着维持各个观察者的一致性,但又保证了各个观察者是相互独立的。
发布-订阅
Observables(可观察对象)
以惰性的方式推送多值的集合。
示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流(即完成推送):
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
要调用 Observable 并看到这些值,我们需要订阅 Observable:
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
结果如下:
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
拉取 (Pull) vs. 推送 (Push)
拉取和推送是两种不同的协议,用来描述数据生产者 (Producer)
如何与数据消费者 (Consumer)
如何进行通信的。
什么是拉取? - 在拉取体系中,数据的消费者
决定何时从数据生产者
那里获取数据,而数据生产者
自身并不会意识到什么时候数据将会被发送给数据消费者
。
每个 JavaScript 函数都是拉取体系。函数是数据的生产者
,调用该函数的代码通过从函数调用中“取出”一个单个返回值来对该函数进行消费(return 语句)。
ES2015 引入了生成器generator 函数
和 迭代器iterators (function*)
,这是另外一种类型的拉取体系。调用iterator.next()
的代码是消费者,它会从 iterator(生产者) 那“取出”多个值。
什么是推送? -在推体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。
Promise(承诺)是当今JS中最常见的Push推体系,一个Promise(数据的生产者)发送一个resolved value
(成功状态的值)来注册一个回调(数据消费者),但是不同于函数的地方的是:Promise决定着何时数据才被推送至这个回调函数。
RxJS引入了Observables(可观察对象),一个新的 JavaScript 推送体系。一个可观察对象是一个产生多值的生产者,并将值“推送”给观察者(消费者)。
Function 是惰性的评估运算,调用时会同步地返回一个单一值。
Generator(生成器):是惰性的评估运算,在迭代时同步的返回零到无限多个值(如果有可能的话)
Promise 是最终可能(或可能不)返回单个值的运算。
Observable 是惰性的评估运算,它可以从它被调用的时刻起同步或异步地返回零到(有可能的)无限多个值。
producer(生产者) | consumer (消费者) | 单个值 | 多个值 | |
---|---|---|---|---|
pull拉 | Passive(被动的一方):被请求的时候产生数据 | Active(起主导的一方):决定何时请求数据 | Function | Iterator |
push推 | Active:按自己的节奏生产数据 | Passive:对接收的数据做出反应(处理接收到的数据) | Promise | Observable |
可观察对象(Observables):作为函数的泛化
与常见的主张相悖的是,可观察对象不像EventEmitters(事件驱动),也不象Promises因为它可以返回多个值。可观察对象可能会在某些情况下有点像EventEmitters(事件驱动),也即是当它们使用Subjects被多播时,但是大多数情况下,并不像EventEmitters。
可观察对象(Observables)像是没有参数, 但可以泛化为允许返回多个值的函数。
思考下面的程序
function foo() {
console.log('Hello');
return 42;
}
var x = foo.call(); // same as foo()
console.log(x); // "Hello" 42
var y = foo.call(); // same as foo()
console.log(y); // "Hello" 42
使用Observables得到同样的结果
var foo=Rx.Observable.create(function(observer){
console.log('Hello');
observer.next(42);
});
foo.subscribe(function(x){
console.log(x);
});
foo.subscribe(function (y){
console.log(y);
});
得到同样的输出
"Hello" 42 "Hello" 42
这是因为函数和可观察对象均是惰性计算。
如果你不调用call()
函数,console.log('Hello')
将不会发生。可观察对象同样如此,如果你不调用subscribe()
函数订阅,console.log('Hello')也将不会发生。
此外,call()
或者subscribe()
是一个独立的操作:两次call()函数调用触发两个独立副作用,两次subscribe()订阅触发两个独立的副作用。相反的,EventEmitters(事件驱动)共享副作用并且无论是否存在订阅者都会尽早执行,Observables 与之相反,不会共享副作用并且是延迟执行。
订阅一个可观察对象类似于调用一个函数。
一些人认为可观察对象是异步的。这并不确切,如果你用一些log语句包围在订阅程序的前后:
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
输出如下:
"before"
"Hello"
42
"after"
以上可以显示对foo的订阅是完全同步的,就像调用一个函数。
可观察对象以同步或者异步的方式发送多个值。
那它和普通函数有哪些不同之处呢?
可观察对象可以随时间"return"多个值。然而函数却做不到,你不能够使得如下的情况发生:
function foo() {
console.log('Hello');
return 42;
return 100; // dead code. will never happen
}
函数仅仅可以返回一个值,然而,不要惊讶,可观察对象却可以做到这些:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100); // "return" another value
observer.next(200); // "return" yet another
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
同步输出:
"before"
"Hello"
42
100
200
"after"
当然,你也可以以异步的方式返回值:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // happens asynchronously
}, 1000);
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
同步输出:
"before"
"Hello"
42
100
200
"after"
300
总结:
- fun.call()意味着"同步地给我一个值"
- observable.subscribe()意味着"给我任意多个值,同步也好异步也罢。"
Observable 剖析
Observables(可观察对象) 是使用 Rx.Observable.create 或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送 next / error / complete 通知给观察者,而且执行可能会被清理。这四个方面全部编码进 Observables 实例中,但某些方面是与其他类型相关的,像 Observer (观察者) 和 Subscription (订阅)。
Observable 的核心关注点:
-创建 Observables(可观察对象)
-订阅 Observables(可观察对象)
-执行 Observables(可观察对象)
-清理 Observables(可观察对象)
创建 Observables(可观察对象)
Rx.Observable.create
是 Observable
构造函数的别名,它接收一个参数:subscribe
函数。
下面的示例创建了一个 Observable(可观察对象),它每隔一秒会向观察者发送字符串 'hi' 。
var observable = Rx.Observable.create(function subscribe(observer) { // 通常我们会像之前的案例一样,省略subscribe这个名字
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
Observables 可以使用 create 来创建, 但通常我们使用所谓的创建操作符, 像 of、from、interval、等等。
在上面的示例中,subscribe 函数是用来描述 Observable 最重要的一块。我们来看下订阅是什么意思。
订阅 Observables
示例中的 Observable 对象创建的 observable 可以订阅,像这样:
observable.subscribe(x => console.log(x));
observable.subscribe
和 Observable.create(function subscribe(observer) {...})
中的 subscribe
有着同样的名字,这并不是一个巧合。在Rx
库中,它们是不同的。但从实际出发,你可以认为在概念上它们是等同的。
subscribe 调用
在同一 Observable(可观察对象)
的多个观察者之间是不共享的。当使用一个观察者调用 observable.subscribe 时,Observable.create(function subscribe(observer) {...}) 中的 subscribe 函数只服务于给定的观察者。对 observable.subscribe 的每次调用都会触发针对给定观察者的独立设置。
订阅 Observable 像是调用函数, 并提供接收数据的回调函数。
这与像 addEventListener / removeEventListener 这样的事件处理方法 API 是完全不同的。使用 observable.subscribe,在 Observable 中不会将给定的观察者注册为监听器。Observable 甚至不会去维护一个附加的观察者列表。
subscribe 调用是启动 “Observable 执行”的一种简单方式, 并将值或事件传递给本次执行的观察者。
整体性案例:
程序先同步执行,过1s之后执行异步,之后点击按钮执行事件。
var observable = Rx.Observable.create(function subscribe (observer) { // 创建Observable
console.log('start-----------')
observer.next(42) // 同步执行
observer.next(100)
observer.next(200)
setTimeout(() => { // 异步执行
observer.next(300)
}, 1000)
var button = document.getElementById('rx-eventListener')
button.addEventListener('click', () => { // 不知何时执行
console.log('Clicked!')
observer.next('Clicked-end')
})
console.log('end-------------')
})
observable.subscribe(x => { // 订阅Observable
console.log('观察者1')
console.log(x)
})
observable.subscribe(x => { // 订阅Observable
console.log('观察者2')
console.log(x)
})
结果如下:
这里我们可以看到观察者1
和观察者2
虽然从发布者那里拿到的值是一样的,但是每个观察者都是相互独立的。
执行 Observables
Observable.create(function subscribe(observer) {...})
中...
的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。
Observable 执行可以传递三种类型的值:
-"Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
-"Error" 通知: 发送一个 JavaScript 错误 或 异常。
-"Complete" 通知: 不再发送任何值。
"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
这些约束用所谓的 Observable 语法或合约表达最好,写为正则表达式是这样的:
next*(error|complete)?
在 Observable 执行中, 可能会发送零个到无穷多个 "Next" 通知。如果发送的是 "Error" 或 "Complete" 通知的话,那么之后不会再发送任何通知了。
下面是 Observable 执行的示例,它发送了三个 "Next" 通知,然后是 "Complete" 通知:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
Observable 严格遵守自身的规约,所以下面的代码不会发送 "Next" 通知 4:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // 因为违反规约,所以不会发送
});
在 subscribe 中用 try/catch 代码块来包裹任意代码是个不错的主意,如果捕获到异常的话,会发送 "Error" 通知:
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // 如果捕获到异常会发送一个错误
}
});
整体性案例:
var observable = Rx.Observable.create(function subscribe (observer) {
try {
observer.next(1)
observer.next(2)
observer.next(3)
observer.complete() // 不再发送任何值
observer.next(4) // 因为违反规约,所以不会发送
} catch (err) {
observer.error(err) // 如果捕获到异常会发送一个 JavaScript 错误 或 异常
}
})
observable.subscribe(x => { // 正常
console.log('观察者-正常')
console.log(x)
})
observable.subscribe(x => { // 模拟异常
throw new Error('抛出一个异常')
console.log('观察者-异常')
console.log(x)
})
清理 Observable 执行
因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。(每一个观察者都是互相独立的)
当调用了 observable.subscribe ,观察者会被附加到新创建的Observable 执行
中。这个调用还返回一个对象,即 Subscription (订阅):
var subscription = observable.subscribe(x => console.log(x));
Subscription 表示进行中的执行,它有最小化的 API 以允许你取消执行。使用 subscription.unsubscribe()
你可以取消进行中的执行:
var observable = Rx.Observable.from([10, 20, 30])
var subscription = observable.subscribe(x => console.log(x)) // 10 20 30
subscription.unsubscribe() // 同步执行完成立马清除
当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。
当我们使用 create() 方法创建 Observable 时,Observable 必须定义如何清理执行的资源。你可以通过在 function subscribe() 中返回一个自定义的 unsubscribe 函数。
举例来说,这是我们如何清理使用了 setInterval 的 interval 执行集合:
var cont = 0
var setInterObs = Rx.Observable.create(function subscribe(observer) {
// 追踪 interval 资源
var intervalID = setInterval(() => {
cont++
observer.next('hi')
}, 1000)
// 提供取消和清理 interval 资源的方法
return function unsubscribe() {
cont = 0
clearInterval(intervalID)
}
})
var unsubscribe = setInterObs.subscribe(function (x) { // 如果不使用箭头函数,回调中的 this 代表 subscription
if (cont < 10) {
console.log(x)
} else {
console.log(this)
console.log('清除')
this.unsubscribe()
console.log(cont) // 我们可以发现cont被重置为0,这表明从 subscribe 返回的 unsubscribe 在概念上也等同于 subscription.unsubscribe。
}
})
执行结果如图:
我们可以从执行结果里看到 subscription
的unsubscribe方法作为 subscription
的私有方法 _unsubscribe
。
并且从 console.log(cont)
等于0,我们可以知道正如 observable.subscribe
类似于 Observable.create(function subscribe() {...})
,从 subscribe
返回的 unsubscribe
在概念上也等同于 subscription.unsubscribe
。(即我们在执行subscription.unsubscribe
时,从 subscribe
返回的 unsubscribe
也是会被执行的)
事实上,如果我们抛开围绕这些概念的 ReactiveX 类型,也就只剩下更加直观的JavaScript。代码如下:
function subscribe (observer) {
var intervalID = setInterval(() => {
observer.next('hello')
}, 1000)
return function unsubscribe() {
clearInterval(intervalID)
}
}
var unsubscribe = subscribe({next: (x) => console.log(x)})
unsubscribe() // 清理资源
为什么我们要使用像 Observable、Observer 和 Subscription 这样的 Rx 类型?原因是保证代码的安全性(比如 Observable 规约)和操作符的可组合性。
结语
这里我们大致已经了解了Rxjs的可观察对象(Observables)。
再回顾一下前面的内容:
观察者模式
又叫发布-订阅(Publish/Subscribe)模式
:
他定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使他们能够自动更新自己。而且各个观察者之间相互独立。
这样的好处是:
- 我们只需要针对可观察对象这一抽象的主题对象接口编程,减少了与具体的耦合,即他只是一个抽象的通知者。
- 观察者只依赖主题对象的状态,这意味着维持各个观察者的一致性,但又保证了各个观察者是相互独立的。
producer(生产者) | consumer (消费者) | 单个值 | 多个值 | |
---|---|---|---|---|
pull拉 | Passive(被动的一方):被请求的时候产生数据 | Active(起主导的一方):决定何时请求数据 | Function | Iterator |
push推 | Active:按自己的节奏生产数据 | Passive:对接收的数据做出反应(处理接收到的数据) | Promise | Observable |
可观察对象(Observables)像是没有参数, 但可以泛化为允许返回多个值的函数。订阅一个可观察对象类似于调用一个函数。可观察对象以同步或者异步的方式发送多个值。
使用:
创建:
Rx.Observable.create(function subscribe(observer) {...})
。Observables 可以使用 create 来创建, 但通常我们使用所谓的创建操作符, 像 of、from、interval、等等。如:Rx.Observable.from([10, 20, 30])
订阅:observable.subscribe(x => {...})
。
执行:传递值的话,会发送"Next" 通知observer.next(value)
;结束的话,会发送 "Complete" 通知observer.complete()
;捕获到异常的话,会发送 "Error" 通知observer.error(err)
。在 Observable 执行中, 可能会发送零个到无穷多个 "Next" 通知。如果发送的是 "Error" 或 "Complete" 通知的话,那么之后不会再发送任何通知了。
清理:subscription.unsubscribe()
。当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用unsubscribe()
方法就可以取消执行。
提示:后面还有精彩敬请期待,请大家关注我的专题:web前端。如有意见可以进行评论,每一条评论我都会认真对待。