c20054c238b055ef5f46fe66497703efc21f89d99fd9dfd9f3eda53bf127a3d37abc0e4869886a34d9fd915f13558f4c0ae654a87c10b3dc2f4689714dc8b3 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import { OperatorFunction, ObservableInputTuple } from '../types';
  2. import { operate } from '../util/lift';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  4. import { innerFrom } from '../observable/innerFrom';
  5. import { identity } from '../util/identity';
  6. import { noop } from '../util/noop';
  7. import { popResultSelector } from '../util/args';
  export function withLatestFrom<T, O extends unknown[]>(...inputs: [...ObservableInputTuple<O>]): OperatorFunction<T, [T, ...O]>;
  export function withLatestFrom<T, O extends unknown[], R>(
    ...inputs: [...ObservableInputTuple<O>, (...value: [T, ...O]) => R]
  ): OperatorFunction<T, R>;

  /**
   * Pairs every value emitted by the source Observable with the most recent
   * value seen from each of the other input Observables. Output is produced
   * only when the source emits; emissions from the inputs merely update the
   * remembered values.
   *
   * <span class="informal">Each time the source emits, take that value together
   * with the latest value of every input, optionally run them through a
   * formula, and emit the result.</span>
   *
   * ![](withLatestFrom.png)
   *
   * Source values that arrive before every input Observable has emitted at
   * least once are dropped. Once each input has produced a value, every source
   * emission results in one output: either the array `[sourceValue, ...latest]`
   * or, when a trailing `project` function was supplied, the result of calling
   * it with those values spread as arguments.
   *
   * Note that when no input Observables are given at all, the "all inputs have
   * emitted" condition is never evaluated, so the resulting Observable never
   * emits a value.
   *
   * ## Example
   *
   * On every click event, emit an array with the latest timer event plus the click event
   *
   * ```ts
   * import { fromEvent, interval, withLatestFrom } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const timer = interval(1000);
   * const result = clicks.pipe(withLatestFrom(timer));
   * result.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link combineLatest}
   *
   * @param inputs One or more input Observables whose latest values are
   * combined with each source value. If the final argument is a function, it
   * is treated as a projection: it is called with the source value first,
   * followed by the latest value of each input in the order the inputs were
   * passed, and its return value is what gets emitted. (e.g.
   * `a.pipe(withLatestFrom(b, c), map(([a1, b1, c1]) => a1 + b1 + c1))`). When
   * no projection is passed, arrays are emitted on the output Observable.
   * @return A function that returns an Observable of projected values from the
   * most recent values from each input Observable, or an array of the most
   * recent values from each input Observable.
   */
  export function withLatestFrom<T, R>(...inputs: any[]): OperatorFunction<T, R | any[]> {
    // NOTE: `popResultSelector` is handed the live `inputs` array; the code below
    // then treats everything left in `inputs` as an Observable input.
    const project = popResultSelector(inputs) as ((...args: any[]) => R) | undefined;

    return operate((source, subscriber) => {
      const inputCount = inputs.length;
      // Most recent value received from each input, stored at the input's index.
      const latest = new Array(inputCount);
      // Per-input "has emitted at least once" flags, index-aligned with `inputs`.
      // TODO: At some point, measure whether a `Set` plus a `size` check would be
      // cheaper than rescanning this array.
      let seen = inputs.map(() => false);
      // Becomes true once every input has emitted; from then on source values
      // are forwarded.
      let allSeen = false;

      // Subscribes to a single input and keeps `latest` / `seen` up to date.
      const trackInput = (index: number) => {
        innerFrom(inputs[index]).subscribe(
          createOperatorSubscriber(
            subscriber,
            (value) => {
              latest[index] = value;
              // Nothing more to track once we are ready (at which point `seen`
              // has been released) or once this input has already been counted.
              if (allSeen || seen[index]) {
                return;
              }
              seen[index] = true;
              if (seen.every(identity)) {
                allSeen = true;
                // The flags are no longer needed after this point.
                seen = null!;
              }
            },
            // An input completing must not complete the result.
            noop
          )
        );
      };

      // Every input is subscribed unconditionally -- `subscriber.closed` is
      // deliberately not consulted -- because subscribing is itself a side
      // effect that must happen for each input.
      for (let index = 0; index < inputCount; index++) {
        trackInput(index);
      }

      // The source is subscribed last, after all inputs.
      source.subscribe(
        createOperatorSubscriber(subscriber, (value) => {
          if (!allSeen) {
            // At least one input has not emitted yet; drop this source value.
            return;
          }
          const combined = [value, ...latest];
          if (project) {
            subscriber.next(project(...combined));
          } else {
            subscriber.next(combined);
          }
        })
      );
    });
  }