ce0874ebc7db7f95e0907962f8af042a740b0c1f59c271996dcc3c9a7f6f0ea4f7a78eb3464116855047a789f7930659a43a9693554c5c73bc014059493d66 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import { Observable } from '../Observable';
  2. import { ObservedValueOf, ObservableInputTuple, ObservableInput } from '../types';
  3. import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
  4. import { innerFrom } from './innerFrom';
  5. import { popResultSelector } from '../util/args';
  6. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  7. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  8. import { createObject } from '../util/createObject';
  9. import { AnyCatcher } from '../AnyCatcher';
  10. // forkJoin(any)
  11. // We put this first because we need to catch cases where the user has supplied
  12. // _exactly `any`_ as the argument. Since `any` literally matches _anything_,
  13. // we don't want it to randomly hit one of the other type signatures below,
  14. // as we have no idea at build-time what type we should be returning when given an any.
  15. /**
  16. * You have passed `any` here, we can't figure out if it is
  17. * an array or an object, so you're getting `unknown`. Use better types.
  18. * @param arg Something typed as `any`
  19. */
  20. export function forkJoin<T extends AnyCatcher>(arg: T): Observable<unknown>;
  21. // forkJoin(null | undefined)
  22. export function forkJoin(scheduler: null | undefined): Observable<never>;
  23. // forkJoin([a, b, c])
  24. export function forkJoin(sources: readonly []): Observable<never>;
  25. export function forkJoin<A extends readonly unknown[]>(sources: readonly [...ObservableInputTuple<A>]): Observable<A>;
  26. export function forkJoin<A extends readonly unknown[], R>(
  27. sources: readonly [...ObservableInputTuple<A>],
  28. resultSelector: (...values: A) => R
  29. ): Observable<R>;
  30. // forkJoin(a, b, c)
  31. /** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
  32. export function forkJoin<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A>;
  33. /** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
  34. export function forkJoin<A extends readonly unknown[], R>(
  35. ...sourcesAndResultSelector: [...ObservableInputTuple<A>, (...values: A) => R]
  36. ): Observable<R>;
  37. // forkJoin({a, b, c})
  38. export function forkJoin(sourcesObject: { [K in any]: never }): Observable<never>;
  39. export function forkJoin<T extends Record<string, ObservableInput<any>>>(
  40. sourcesObject: T
  41. ): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
  42. /**
  43. * Accepts an `Array` of {@link ObservableInput} or a dictionary `Object` of {@link ObservableInput} and returns
  44. * an {@link Observable} that emits either an array of values in the exact same order as the passed array,
  45. * or a dictionary of values in the same shape as the passed dictionary.
  46. *
  47. * <span class="informal">Wait for Observables to complete and then combine last values they emitted;
  48. * complete immediately if an empty array is passed.</span>
  49. *
  50. * ![](forkJoin.png)
  51. *
  52. * `forkJoin` is an operator that takes any number of input observables which can be passed either as an array
  53. * or a dictionary of input observables. If no input observables are provided (e.g. an empty array is passed),
  54. * then the resulting stream will complete immediately.
  55. *
  56. * `forkJoin` will wait for all passed observables to emit and complete and then it will emit an array or an object with last
  57. * values from corresponding observables.
  58. *
  59. * If you pass an array of `n` observables to the operator, then the resulting
  60. * array will have `n` values, where the first value is the last one emitted by the first observable,
  61. * second value is the last one emitted by the second observable and so on.
  62. *
  63. * If you pass a dictionary of observables to the operator, then the resulting
  64. * object will have the same keys as the dictionary passed, with the last value each observable emitted
  65. * located at the corresponding key.
  66. *
  67. * That means `forkJoin` will not emit more than once and it will complete after that. If you need to emit combined
  68. * values not only at the end of the lifecycle of passed observables, but also throughout it, try out {@link combineLatest}
  69. * or {@link zip} instead.
  70. *
  71. * In order for the resulting array to have the same length as the number of input observables, whenever any of
  72. * the given observables completes without emitting any value, `forkJoin` will complete at that moment as well
  73. * and it will not emit anything either, even if it already has some last values from other observables.
  74. * Conversely, if there is an observable that never completes, `forkJoin` will never complete either,
  75. * unless at any point some other observable completes without emitting a value, which brings us back to
  76. * the previous case. Overall, in order for `forkJoin` to emit a value, all given observables
  77. * have to emit something at least once and complete.
  78. *
  79. * If any given observable errors at some point, `forkJoin` will error as well and immediately unsubscribe
  80. * from the other observables.
  81. *
  82. * Optionally `forkJoin` accepts a `resultSelector` function, that will be called with values which normally
  83. * would land in the emitted array. Whatever is returned by the `resultSelector`, will appear in the output
  84. * observable instead. This means that the default `resultSelector` can be thought of as a function that takes
  85. * all its arguments and puts them into an array. Note that the `resultSelector` will be called only
  86. * when `forkJoin` is supposed to emit a result.
  87. *
  88. * ## Examples
  89. *
  90. * Use `forkJoin` with a dictionary of observable inputs
  91. *
  92. * ```ts
  93. * import { forkJoin, of, timer } from 'rxjs';
  94. *
  95. * const observable = forkJoin({
  96. * foo: of(1, 2, 3, 4),
  97. * bar: Promise.resolve(8),
  98. * baz: timer(4000)
  99. * });
  100. * observable.subscribe({
  101. * next: value => console.log(value),
  102. * complete: () => console.log('This is how it ends!'),
  103. * });
  104. *
  105. * // Logs:
  106. * // { foo: 4, bar: 8, baz: 0 } after 4 seconds
  107. * // 'This is how it ends!' immediately after
  108. * ```
  109. *
  110. * Use `forkJoin` with an array of observable inputs
  111. *
  112. * ```ts
  113. * import { forkJoin, of, timer } from 'rxjs';
  114. *
  115. * const observable = forkJoin([
  116. * of(1, 2, 3, 4),
  117. * Promise.resolve(8),
  118. * timer(4000)
  119. * ]);
  120. * observable.subscribe({
  121. * next: value => console.log(value),
  122. * complete: () => console.log('This is how it ends!'),
  123. * });
  124. *
  125. * // Logs:
  126. * // [4, 8, 0] after 4 seconds
  127. * // 'This is how it ends!' immediately after
  128. * ```
  129. *
  130. * @see {@link combineLatest}
  131. * @see {@link zip}
  132. *
  133. * @param args Any number of `ObservableInput`s provided either as an array, as an object
  134. * or as arguments passed directly to the operator, optionally followed by a `resultSelector` function.
  135. * @return Observable emitting either an array or an object of the last values emitted by the passed
  136. * Observables, or the value returned by the `resultSelector` if one was provided.
  137. */
  138. export function forkJoin(...args: any[]): Observable<any> {
  139. const resultSelector = popResultSelector(args);
  140. const { args: sources, keys } = argsArgArrayOrObject(args);
  141. const result = new Observable((subscriber) => {
  142. const { length } = sources;
  143. if (!length) {
  144. subscriber.complete();
  145. return;
  146. }
  147. const values = new Array(length);
  148. let remainingCompletions = length;
  149. let remainingEmissions = length;
  150. for (let sourceIndex = 0; sourceIndex < length; sourceIndex++) {
  151. let hasValue = false;
  152. innerFrom(sources[sourceIndex]).subscribe(
  153. createOperatorSubscriber(
  154. subscriber,
  155. (value) => {
  156. if (!hasValue) {
  157. hasValue = true;
  158. remainingEmissions--;
  159. }
  160. values[sourceIndex] = value;
  161. },
  162. () => remainingCompletions--,
  163. undefined,
  164. () => {
  165. if (!remainingCompletions || !hasValue) {
  166. if (!remainingEmissions) {
  167. subscriber.next(keys ? createObject(keys, values) : values);
  168. }
  169. subscriber.complete();
  170. }
  171. }
  172. )
  173. );
  174. }
  175. });
  176. return resultSelector ? result.pipe(mapOneOrManyArgs(resultSelector)) : result;
  177. }