-
Notifications
You must be signed in to change notification settings - Fork 1
白话RxJS
(个人约定,非任何best practice)
-
Observable变量以$结尾,如state$; -
Subject变量以$$结尾, 如state$$; -
Subscription变量以_结尾,如ultimate_。
RxJS包含许多概念,RxJS Manual在介绍RxJS的时候,引入了更多概念,对于初学者而言,不够直白。
比如:
ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.
再如:
Observable: represents the idea of an invokable collection of future values or events.
本文将尝试用白话来解读RxJS。
比如:
Observable:好似一个Function,有很多return,定义了在哪个时点return什么数据。
部分章节使用了伪代码,用来说明某个类的特性;其余部分,受限于水平,实在写不出伪代码。
难免错漏,敬请指正。
Observer最平易近人,就是一个Object,里面有3种callback,形如:
const observer = {
next: (value) => console.log(value),
error: (error) => console.log(error),
complete: () => console.log('completed')
}可以对照Function来看Observable。
Function是这样:
const funcX = () => {
在处理处理处理以后:
return 'gogogo, but only once';
}Observable就是这样:
const x$ = () => {
(无条件的):
return 'I\'m the 1st go';
在'500 ms'以后:
return 'I\'m the 2nd go';
...
在http请求出错时:
return 'something went wrong';
// 然后就没有然后了
}Observable可以return很多次,任意多次,每次return可能需要满足一定条件,比如“鼠标点击的时候”。
可是,怎么可能return多于1次呢?
这是伪代码,其实Observable应该是这样:
const x$ = (observer) => {
(无条件的):
observer.next('I\'m the 1st go');
在'500 ms'以后:
observer.next('I\'m the 2nd go');
...
在http请求出错时:
observer.error('something went wrong');
// 然后就没有然后了
}我们需要一个Observer,并用observer.callback来代替return。
Function在定义好以后,不会自动运行,Observable也是一样。
我们可以通过funX()(即在函数名后加上括号)来调用funX。
怎样调用Observable呢?看这里:
const FakeObservableClass = function() {
this.execution = (observer) => {
observer.next('I\'m the 1st go');
observer.next('I\'m the 2nd go');
}
}
FakeObservableClass.prototype.subscribe = function(observer) {
const that = this;
that.execution(observer);
}
const x$ = new FakeObservableClass();
const observer = {next: (value) => console.log(value)};
x$.subscribe(observer); // 启动x$的运行我们是通过subscribe方法,即x$.subscribe(observer)来启动x$的运行的。
Function在启动以后是停不下来的,直到return。
Observable在subscribe以后,如果execution的内容是async的(比如setInterval、dom events、http response),它是可以停下来的。
以IntervalObservable为例:
const FakeIntervalObservableClass = function(interval) {
this.intervalId = null;
this.execution = (observer) => {
let state = 0;
let execute = (state) => {
this.intervalId = setInterval(() => {
observer.next(++state);
}, interval);
}
execute(state);
}
}
FakeIntervalObservableClass.prototype.subscribe = function(observer) {
const that = this;
that.execution(observer); // subscribe的时候会执行setInterval
// 接下来return一个Obeject,这个Object就是Subscription
return {
unsubscribe: () => {
clearInterval(that.intervalId); // unsubscribe时执行clearInterval
}
}
}
const x$ = new FakeIntervalObservableClass(500);
const observer = {next: (value) => console.log(value)};
const x_ = x$.subscribe(observer);
x_.unsubscribe();subscribe方法返回一个Object,这个Object就是Subscription(即x_),它唯一的用途就是unsubsribe。
unsubscribe会停下Observable的运行,并释放其占用的资源。
上面的例子中,当我们调用x_.unsubscribe()的时候,触发了clearInterval,后面的execution就不再进行下去了。
如果是dom事件触发的Observable,subscribe时会addEventListener,unsubscribe时会removeEventListener。
如果是XMLHttpRequest触发的Observable,就是send和abort,等等。
Subject是一个Observable,因为它有subscribe方法;Subject又是一个Observer,因为它有next方法。
它维护一个observers列表,当运行subject.subscribe(observerX)的时候,这个observerX就被加到列表里,unsubscribe时从列表中删掉。
Subject像是一个proxy,外部可以调用subject.next(value)时,这个value会forEach给Subject的observers。
const FakeSubjectClass = function() {
this.observers = [];
}
FakeSubjectClass.prototype.subscribe = function(observer) {
let that = this;
// 添加新observer,获取其所在的index;添加之前会检查是否observers已经有了这个observer,这里偷懒了没写
let indexA = (that.observers.push(observer) - 1);
return {
unsubscribe: () => {
// 从observers列表里将这个位置在indexA的observer删除
that.observers = that.observers.filter((observer, index) => index !== indexA)
}
}
}
FakeSubjectClass.prototype.next = function(value) {
let that = this;
that.observers.forEach(observer => observer.next(value)); // 外部调用subject.next,subject.next马上forEach转出去;这个就是multicast
}
let x$$ = new FakeSubjectClass(); // Subject以$$结尾,Observable以$结尾
let observerB = {next: (value) => {console.log(`另一个logger说:${value}`)}}
x$$.subscribe(observerB); // 这个observerB被加到了x$$的observers列表里。
x$.subscribe(x$$); // 当x$向外推送时,调用的是x$$.next;x$$.next转身马上就forEach转给它的observers。顺带提一下unicast和multicast的资源消耗。
说Observable是unicast,而Subject是multicast,就是因为这个observers列表 -- 在Observable里是没有的这个列表的。
每次运行Observable.subscribe()都相当于一个Function.call(),是一个独立的运行,需要单独消耗资源。
而Subject.subscribe()消耗资源很少。
比如:
let y$ = Observable.whatever() // 创建新的Observable
let y1_ = y$.subscribe(observerA); // 运行一次y$里的execution,消耗资源
let y2_ = y$.subscribe(observerB); // 又运行一次...
let y3_ ... // 又...
// ===========
let z$ = Observable.whatever();
let z$$ = new Subject();
let z1_ = z$$.subscribe(observerX) // observers列表里加一项,基本不消耗什么资源
let z2_ = z$$.subscribe(observerY) // observers列表里加一项
let z0_ = z$.subscribe(z$$) // 运行一次z$里的execution,仅此一次,然后z$推给z$$,z$$用forEach推给后面在Observable类上调用的Operator是Static Operator, 比如:
Observable.interval(500); // 每隔500ms,推送一个递增整数,从0开始
Observable.from([1, 2]); // 连续推送1, 2
Observable.fromEvent(document, 'click');
Observable.fromPromise(fetch('/users'));
Observable.merge(x$, y$); // 将x$与y$的推送混合在一起
Observable.concat(x$, y$); // 先运行x$,等x$推送observer.complete(),再运行y$
...Static Operator可以从无到有创建一个Observable(像是变形金刚里的AllSpark),也可以把互不干预的Observable组合起来。
前面的伪代码中用到const x$ = new FakeObservableClass();。
实际生活中new Observable()都是由Static Operator来处理的,所以在代码中不会看到new Observable()。
===== 分割线 =====
在Observable的实例上调用的Operator是Instance Operator, 比如:
const x$ = Observable.interval(500); // 创建一个Observable实例
const y$ = x$
.map(v => v*3) // 每个推送的数值乘以3
.filter(w => w%5 === 1) // 只推送除以5余数为1的数值
.delay(1000) // 等待1秒再推送
.take(10) // 推送10个数值以后,调用observer.complete()结束
x$ !== y$ // x$还是那个x$
...如果Observable是一幅画,Instance Operator就是滤镜。
经过滤镜处理,我们拿到的一幅新的画,原来的画还在。
Scheduler的职能是控制并发事件。本人开发经验接近0,实在想不出实际生活中何时会用到Scheduler,也确实在实践中没用过。
如果要观察每种Scheduler对数据推送的影响,可以打开RxJS Manual,开启console,贴入下面的代码,回车。
const x$ = Rx.Observable.create((observer) => {
observer.next(0);
observer.next(1);
}) // 这个用的是null Scheduler
const xOnQueue$ = x$.map(v=>'onQueue'+v).observeOn(Rx.Scheduler.queue);
const xOnAsync$ = x$.map(v=>'onAsync'+v).observeOn(Rx.Scheduler.async);
const xOnAsap$ = x$.map(v=>'onAsap'+v).observeOn(Rx.Scheduler.asap);
const xOnAnimationFrame$ = x$.map(v=>'onAnimationFrame'+v).observeOn(Rx.Scheduler.animationFrame);
const merged$ = Rx.Observable.merge(xOnAnimationFrame$, xOnAsync$, xOnAsap$, xOnQueue$, x$);
// 注意Observable.merge时的顺序,再对照console.log中的输出顺序
const merged_ = merged$.subscribe(console.log);TestScheduler是测试RxJS代码时用到的一个虚拟时间机器,Observable可以挂靠其上。
我们定义的Observable在真实环境下可能要跑上一段时间才结束。而在TestScheduler里,就是一个同步的执行。
比如Observable.interval(20).take(20),这个observable每隔20 ms推送一个递增数字,一共推20个,需要用时400 ms。
通过Observable.interval(20, testScheduler).take(20),来设定Observable运行在虚拟时间机器上,在测试环境下,瞬间结束。
其中的testScheduler是TestScheduler的一个实例。
关于如何使用TestScheduler,后续文章中会提到。
提到Observable的时候,就想想Function,一个有随意数量return的Function。
Happy coding!