924db0be1ec4aa1f256a875b2d8328a65853f23655645afd494b644200d93aa026081a535600ee17ff94c96491c06f0a6c08687a28c83c71acfb9d3a03d7a8 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import { Operator } from '../Operator';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
  6. import { map } from './map';
  7. import { from } from '../observable/from';
  8. import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
  9. /* tslint:disable:max-line-length */
  10. export function switchMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O): OperatorFunction<T, ObservedValueOf<O>>;
  11. /** @deprecated resultSelector is no longer supported, use inner map instead */
  12. export function switchMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction<T, ObservedValueOf<O>>;
  13. /** @deprecated resultSelector is no longer supported, use inner map instead */
  14. export function switchMap<T, R, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>;
  15. /* tslint:enable:max-line-length */
  16. /**
  17. * Projects each source value to an Observable which is merged in the output
  18. * Observable, emitting values only from the most recently projected Observable.
  19. *
  20. * <span class="informal">Maps each value to an Observable, then flattens all of
  21. * these inner Observables.</span>
  22. *
  23. * ![](switchMap.png)
  24. *
  25. * Returns an Observable that emits items based on applying a function that you
  26. * supply to each item emitted by the source Observable, where that function
  27. * returns an (so-called "inner") Observable. Each time it observes one of these
  28. * inner Observables, the output Observable begins emitting the items emitted by
  29. * that inner Observable. When a new inner Observable is emitted, `switchMap`
  30. * stops emitting items from the earlier-emitted inner Observable and begins
  31. * emitting items from the new one. It continues to behave like this for
  32. * subsequent inner Observables.
  33. *
  34. * ## Example
  35. * Generate new Observable according to source Observable values
  36. * ```typescript
  37. * import { of } from 'rxjs';
  38. * import { switchMap } from 'rxjs/operators';
  39. *
  40. * const switched = of(1, 2, 3).pipe(switchMap((x: number) => of(x, x ** 2, x ** 3)));
  41. * switched.subscribe(x => console.log(x));
  42. * // outputs
  43. * // 1
  44. * // 1
  45. * // 1
  46. * // 2
  47. * // 4
  48. * // 8
  49. * // ... and so on
  50. * ```
  51. *
  52. * Rerun an interval Observable on every click event
  53. * ```ts
  54. * import { fromEvent, interval } from 'rxjs';
  55. * import { switchMap } from 'rxjs/operators';
  56. *
  57. * const clicks = fromEvent(document, 'click');
  58. * const result = clicks.pipe(switchMap((ev) => interval(1000)));
  59. * result.subscribe(x => console.log(x));
  60. * ```
  61. *
  62. * @see {@link concatMap}
  63. * @see {@link exhaustMap}
  64. * @see {@link mergeMap}
  65. * @see {@link switchAll}
  66. * @see {@link switchMapTo}
  67. *
  68. * @param {function(value: T, ?index: number): ObservableInput} project A function
  69. * that, when applied to an item emitted by the source Observable, returns an
  70. * Observable.
  71. * @return {Observable} An Observable that emits the result of applying the
  72. * projection function (and the optional deprecated `resultSelector`) to each item
  73. * emitted by the source Observable and taking only the values from the most recently
  74. * projected inner Observable.
  75. * @method switchMap
  76. * @owner Observable
  77. */
  78. export function switchMap<T, R, O extends ObservableInput<any>>(
  79. project: (value: T, index: number) => O,
  80. resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R,
  81. ): OperatorFunction<T, ObservedValueOf<O>|R> {
  82. if (typeof resultSelector === 'function') {
  83. return (source: Observable<T>) => source.pipe(
  84. switchMap((a, i) => from(project(a, i)).pipe(
  85. map((b, ii) => resultSelector(a, b, i, ii))
  86. ))
  87. );
  88. }
  89. return (source: Observable<T>) => source.lift(new SwitchMapOperator(project));
  90. }
  91. class SwitchMapOperator<T, R> implements Operator<T, R> {
  92. constructor(private project: (value: T, index: number) => ObservableInput<R>) {
  93. }
  94. call(subscriber: Subscriber<R>, source: any): any {
  95. return source.subscribe(new SwitchMapSubscriber(subscriber, this.project));
  96. }
  97. }
  98. /**
  99. * We need this JSDoc comment for affecting ESDoc.
  100. * @ignore
  101. * @extends {Ignored}
  102. */
  103. class SwitchMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
  104. private index = 0;
  105. private innerSubscription?: Subscription;
  106. constructor(destination: Subscriber<R>,
  107. private project: (value: T, index: number) => ObservableInput<R>) {
  108. super(destination);
  109. }
  110. protected _next(value: T) {
  111. let result: ObservableInput<R>;
  112. const index = this.index++;
  113. try {
  114. result = this.project(value, index);
  115. } catch (error) {
  116. this.destination.error!(error);
  117. return;
  118. }
  119. this._innerSub(result);
  120. }
  121. private _innerSub(result: ObservableInput<R>) {
  122. const innerSubscription = this.innerSubscription;
  123. if (innerSubscription) {
  124. innerSubscription.unsubscribe();
  125. }
  126. const innerSubscriber = new SimpleInnerSubscriber(this);
  127. const destination = this.destination as Subscription;
  128. destination.add(innerSubscriber);
  129. this.innerSubscription = innerSubscribe(result, innerSubscriber);
  130. // The returned subscription will usually be the subscriber that was
  131. // passed. However, interop subscribers will be wrapped and for
  132. // unsubscriptions to chain correctly, the wrapper needs to be added, too.
  133. if (this.innerSubscription !== innerSubscriber) {
  134. destination.add(this.innerSubscription);
  135. }
  136. }
  137. protected _complete(): void {
  138. const {innerSubscription} = this;
  139. if (!innerSubscription || innerSubscription.closed) {
  140. super._complete();
  141. }
  142. this.unsubscribe();
  143. }
  144. protected _unsubscribe() {
  145. this.innerSubscription = undefined;
  146. }
  147. notifyComplete(): void {
  148. this.innerSubscription = undefined;
  149. if (this.isStopped) {
  150. super._complete();
  151. }
  152. }
  153. notifyNext(innerValue: R): void {
  154. this.destination.next!(innerValue);
  155. }
  156. }