创建一个Observable的代码
const hello = Rx.Observable.create(function(observer) {
observer.next('Hello');
observer.next('World');
});
- create传入的参数为一个subscriber函数,该函数有一个参数为observer,observer是一个观察者对象,该对象有三个属性,属性值为函数。如下:
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
- 当observable被订阅的时候,即observable.subscribe(function next(x), function error(x), function complete(x)), 创建该observable时传入的回调函数即被执行,observer调用next方法,并传入参数(即发射值),发射出去的值则作为subscribe中回调函数的参数,即传入function next(x)中的x,并执行该回调函数。 自此只要被观察者发生了变化,就会调用create的回调函数将变化的值通知观察者,并执行对应observer的next()方法。
Observable - 被观察者
Observable.subscribe(observer) - 对被观察者进行订阅
Observer - 观察者
创建被观察者时,传入一个订阅函数,当观察者对被观察者进行订阅时,就调用这个订阅函数,订阅函数的作用是向观察者发射数据,通知数据(流)的变化。观察者根据发射过来的不同数据,自行处理对应处理函数里的逻辑。
源码
看看源码做了什么:
创建Observable的过程:
(1) 将create()的参数(一个匿名函数function(observer){})作为参数传入构造函数,并保存该匿名函数到类变量this._subscribe。
Observable.create = function (subscribe) {
return new Observable(subscribe);
};
return Observable;
function Observable(subscribe) {
this._isScalar = false;
if (subscribe) {
this._subscribe = subscribe;
}
调用observable.subscribe的过程:
(1) 将subscribe()里的回调函数传入。
(2) 通过toSubscriber_1.toSubscriber包装成一个Subsciber对象即下文中的sink。该对象有一些属性,其中包括next, error, complete属性分别指向subscribe中对应的回调函数。并通过this._subscribe(sink),将sink传值给Rx.Observable.create(function(observer) {..})中的observer参数。
(3) 通过this._subscribe(sink)调用了create()里的回调函数即function(observer) { observer.next('Hello');},并执行对应的next()函数,并传入对应的参数。
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
var operator = this.operator;
var sink = toSubscriber_1.toSubscriber(observerOrNext, error, complete);
if (operator) {
operator.call(sink, this.source);
}
else {
sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
}
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
return sink;
};
Observable.prototype._trySubscribe = function (sink) {
try {
return this._subscribe(sink);
}
catch (err) {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
sink.error(err);
}
};