6238cfcd78e25735096d3182eaa7cd2e8cdb2af6dfbc68fa715521ef0150a250340a37f69d914987628f71e0b91257b0d8754a895eb061e133f6e50e8c7580 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import { Observable } from '../Observable';
  2. import { ObservableInputTuple } from '../types';
  3. import { innerFrom } from './innerFrom';
  4. import { argsOrArgArray } from '../util/argsOrArgArray';
  5. import { EMPTY } from './empty';
  6. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  7. import { popResultSelector } from '../util/args';
  8. export function zip<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A>;
  9. export function zip<A extends readonly unknown[], R>(
  10. sources: [...ObservableInputTuple<A>],
  11. resultSelector: (...values: A) => R
  12. ): Observable<R>;
  13. export function zip<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A>;
  14. export function zip<A extends readonly unknown[], R>(
  15. ...sourcesAndResultSelector: [...ObservableInputTuple<A>, (...values: A) => R]
  16. ): Observable<R>;
  17. /**
  18. * Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each
  19. * of its input Observables.
  20. *
  21. * If the last parameter is a function, this function is used to compute the created value from the input values.
  22. * Otherwise, an array of the input values is returned.
  23. *
  24. * ## Example
  25. *
  26. * Combine age and name from different sources
  27. *
  28. * ```ts
  29. * import { of, zip, map } from 'rxjs';
  30. *
  31. * const age$ = of(27, 25, 29);
  32. * const name$ = of('Foo', 'Bar', 'Beer');
  33. * const isDev$ = of(true, true, false);
  34. *
  35. * zip(age$, name$, isDev$).pipe(
  36. * map(([age, name, isDev]) => ({ age, name, isDev }))
  37. * )
  38. * .subscribe(x => console.log(x));
  39. *
  40. * // Outputs
  41. * // { age: 27, name: 'Foo', isDev: true }
  42. * // { age: 25, name: 'Bar', isDev: true }
  43. * // { age: 29, name: 'Beer', isDev: false }
  44. * ```
  45. *
  46. * @param args Any number of `ObservableInput`s provided either as an array or as an object
  47. * to combine with each other.
  48. * @return An Observable of array values of the values emitted at the same index from each
  49. * individual `ObservableInput`.
  50. */
  51. export function zip(...args: unknown[]): Observable<unknown> {
  52. const resultSelector = popResultSelector(args);
  53. const sources = argsOrArgArray(args) as Observable<unknown>[];
  54. return sources.length
  55. ? new Observable<unknown[]>((subscriber) => {
  56. // A collection of buffers of values from each source.
  57. // Keyed by the same index with which the sources were passed in.
  58. let buffers: unknown[][] = sources.map(() => []);
  59. // An array of flags of whether or not the sources have completed.
  60. // This is used to check to see if we should complete the result.
  61. // Keyed by the same index with which the sources were passed in.
  62. let completed = sources.map(() => false);
  63. // When everything is done, release the arrays above.
  64. subscriber.add(() => {
  65. buffers = completed = null!;
  66. });
  67. // Loop over our sources and subscribe to each one. The index `i` is
  68. // especially important here, because we use it in closures below to
  69. // access the related buffers and completion properties
  70. for (let sourceIndex = 0; !subscriber.closed && sourceIndex < sources.length; sourceIndex++) {
  71. innerFrom(sources[sourceIndex]).subscribe(
  72. createOperatorSubscriber(
  73. subscriber,
  74. (value) => {
  75. buffers[sourceIndex].push(value);
  76. // if every buffer has at least one value in it, then we
  77. // can shift out the oldest value from each buffer and emit
  78. // them as an array.
  79. if (buffers.every((buffer) => buffer.length)) {
  80. const result: any = buffers.map((buffer) => buffer.shift()!);
  81. // Emit the array. If theres' a result selector, use that.
  82. subscriber.next(resultSelector ? resultSelector(...result) : result);
  83. // If any one of the sources is both complete and has an empty buffer
  84. // then we complete the result. This is because we cannot possibly have
  85. // any more values to zip together.
  86. if (buffers.some((buffer, i) => !buffer.length && completed[i])) {
  87. subscriber.complete();
  88. }
  89. }
  90. },
  91. () => {
  92. // This source completed. Mark it as complete so we can check it later
  93. // if we have to.
  94. completed[sourceIndex] = true;
  95. // But, if this complete source has nothing in its buffer, then we
  96. // can complete the result, because we can't possibly have any more
  97. // values from this to zip together with the other values.
  98. !buffers[sourceIndex].length && subscriber.complete();
  99. }
  100. )
  101. );
  102. }
  103. // When everything is done, release the arrays above.
  104. return () => {
  105. buffers = completed = null!;
  106. };
  107. })
  108. : EMPTY;
  109. }