文档 http://cn.rx.js.org/manual/overview.html#h24
Observables 是使用 Rx.Observable.create 或创建操作符创建的,并使用观察者来订阅(subscribe)它,然后执行它并发送 next / error / complete 通知给观察者,而且执行可能会被清理(unsubscribe)。
const { Observable } = require('rxjs');// 创建 Observablesvar observable = Observable.create(observer => { ???var id = setInterval(() => { ???????observer.next('hi') ???}, 1000); ???// 提供取消和清理 interval 资源的方法 ???return function unsubscribe() { ???????clearInterval(id); ???};});// Observer (观察者,執行)var observer = { ???next(res) { ???????format(res) ???}, ???error(err) { ???????format(err) ???}, ???complete() { ???????format('done.') ???},};// 订阅 Observablesvar subscription = observable.subscribe(observer);// .add() 合并多个订阅,以便同时取消订阅// subscription.add(childSubscription);// 撤销 add的合并// subscription.remove(childSubscription);// 3s后取消订阅setTimeout(() => { ???subscription.unsubscribe();}, 3000)