b1b263b5a78fba2643d2bf6df838212bdee10900c16fc70812cdad9d98da27e6d83a3550e1f6a64ff6348af0dd4e39713b820df1b16aff2ef1019d72d6caf8 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import { ObservableInput, OperatorFunction } from '../types';
  2. import { operate } from '../util/lift';
  3. import { mergeInternals } from './mergeInternals';
  /**
   * Runs an accumulator function over the source Observable where each
   * accumulation step itself yields an Observable; every one of those
   * intermediate Observables is merged into the output Observable.
   *
   * <span class="informal">Think of it as {@link scan}, except that the
   * Observables returned by the accumulator are merged into the outer
   * Observable.</span>
   *
   * `mergeScan` takes an `accumulator` function as its first parameter and
   * calls it every time the source Observable emits a value. The operator
   * subscribes to whatever the `accumulator` returns and forwards the values
   * emitted by that inner Observable to the subscriber.
   *
   * The `accumulator` is invoked with three arguments: `acc`, `value` and
   * `index`. `acc` is the state argument; it starts out as the `seed` (the
   * second parameter passed to `mergeScan`).
   *
   * The operator keeps the current `acc` internally. For as long as the source
   * emits without any inner Observable having emitted, `acc` stays equal to
   * `seed`. Once an inner Observable emits a value, that value is remembered and
   * handed to the `accumulator` as `acc` on the next source emission.
   *
   * `value` is the value emitted by the source Observable, and `index` is a
   * number that represents the position of the current source emission,
   * starting at 0.
   *
   * The final parameter, `concurrent`, defaults to Infinity and caps how many
   * inner Observables may be subscribed to at the same time.
   *
   * ## Example
   *
   * Count the number of click events
   *
   * ```ts
   * import { fromEvent, map, mergeScan, of } from 'rxjs';
   *
   * const click$ = fromEvent(document, 'click');
   * const one$ = click$.pipe(map(() => 1));
   * const seed = 0;
   * const count$ = one$.pipe(
   *   mergeScan((acc, one) => of(acc + one), seed)
   * );
   *
   * count$.subscribe(x => console.log(x));
   *
   * // Results:
   * // 1
   * // 2
   * // 3
   * // 4
   * // ...and so on for each click
   * ```
   *
   * @see {@link scan}
   * @see {@link switchScan}
   *
   * @param accumulator The accumulator function called on each source value.
   * @param seed The initial accumulation value.
   * @param concurrent Maximum number of input Observables being subscribed to
   * concurrently.
   * @return A function that returns an Observable of the accumulated values.
   */
  export function mergeScan<T, R>(
    accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
    seed: R,
    concurrent = Infinity
  ): OperatorFunction<T, R> {
    return operate((source, subscriber) => {
      // Most recent accumulated value; equals `seed` until something overwrites it.
      let latest = seed;

      // Feeds each source value, together with the current accumulation, to the
      // user-supplied accumulator.
      const project = (value: T, index: number) => accumulator(latest, value, index);

      // Stores the value it is given as the new accumulation. Per the operator's
      // contract this is each value emitted by an inner Observable.
      const remember = (innerValue: R) => {
        latest = innerValue;
      };

      // Overwrites the accumulation with null.
      // NOTE(review): presumably invoked by `mergeInternals` on teardown so the
      // last value is not retained — confirm against `mergeInternals`.
      const release = () => (latest = null!);

      // NOTE(review): the `false` / `undefined` arguments are positional options
      // of `mergeInternals` whose meaning is not visible here; they are passed
      // through unchanged.
      return mergeInternals(source, subscriber, project, concurrent, remember, false, undefined, release);
    });
  }