Understanding sources and subscribers makes it much easier to understand what's going on with `mergeMap` under the hood. Where a typical operator invokes `destination.next` directly, `mergeMap` wraps `destination.next` inside of a new source/subscriber combo, so there's an "outer" next and an "inner" next.
    import { fromEvent, of, Subscriber } from "rxjs"
    import { scan, delay, mergeMap } from "rxjs/operators"

    class MyMergeMapSubscriber extends Subscriber {
      constructor(sub, fn) {
        super(sub)
        this.fn = fn
      }

      // "Outer" next: called once for each value from the source
      _next(value) {
        console.log(`outer`, value)
        // Map the outer value to a new inner Observable
        const o$ = this.fn(value)

        o$.subscribe({
          // "Inner" next: passes inner values through to the destination
          next: value => {
            console.log(`  inner`, value)
            this.destination.next(value)
          }
        })
      }
    }

    const myMergeMap = fn => source =>
      source.lift({
        call(sub, source) {
          // Return the subscription so that unsubscribing tears down the source
          return source.subscribe(
            new MyMergeMapSubscriber(sub, fn)
          )
        }
      })

    const observable$ = fromEvent(
      document,
      "click"
    ).pipe(
      scan(i => i + 1, 0),
      myMergeMap(value => of(value).pipe(delay(500)))
    )

    const subscriber = {
      next: value => {
        console.log(value)
      },
      complete: () => {
        console.log("done")
      },
      error: value => {
        console.log(value)
      }
    }

    observable$.subscribe(subscriber)
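To see the "outer" and "inner" next calls without a browser or the RxJS library, here is a minimal sketch of the same idea. It assumes a simplified model where a source is just a function that takes a subscriber. The names `fromArray`, `sketchMap`, and `sketchMergeMap` are hypothetical helpers for illustration, not RxJS APIs, and the sketch ignores `complete`, `error`, and unsubscription.

```javascript
// Hypothetical stand-in for an Observable: a function that pushes
// values into whatever subscriber it is given.
const fromArray = values => subscriber => {
  values.forEach(value => subscriber.next(value))
}

// A "typical" operator: its next invokes destination.next directly.
const sketchMap = fn => source => destination => {
  source({
    next: value => destination.next(fn(value))
  })
}

// mergeMap-style operator: the outer next builds an inner source,
// and only the inner next invokes destination.next.
const sketchMergeMap = (fn, log) => source => destination => {
  source({
    next: outerValue => {
      log.push(`outer ${outerValue}`)
      const inner = fn(outerValue)
      inner({
        next: innerValue => {
          log.push(`inner ${innerValue}`)
          destination.next(innerValue)
        }
      })
    }
  })
}

const log = []
const received = []

// Count up with map, then fan each value out into two inner values.
const counted = sketchMap(value => value + 1)(fromArray([0, 1]))
const merged = sketchMergeMap(
  value => fromArray([value * 10, value * 10 + 1]),
  log
)(counted)

merged({ next: value => received.push(value) })

console.log(log)
console.log(received)
```

Because everything here is synchronous, each outer value's inner values arrive before the next outer value. With the `delay(500)` in the example above, inner values from different clicks can interleave instead.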
[RxJS] Implement RxJS `mergeMap` through inner Observables to Subscribe and Pass Values Through
Original article: https://www.cnblogs.com/Answer1215/p/9715051.html