158b72025bbaa897654f39b3801e3845091064405e5fe72a65322c87d2156c3c9c6cc790f18d7e91ba50d0a433b7c4fa06b0bbbdfa3bff4d19d5b85b6fca6f 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import { Observable } from '../Observable';
  2. import { ObservableInput, ObservedValuesFromArray, ObservedValueOf, SubscribableOrPromise } from '../types';
  3. import { isArray } from '../util/isArray';
  4. import { map } from '../operators/map';
  5. import { isObject } from '../util/isObject';
  6. import { isObservable } from '../util/isObservable';
  7. import { from } from './from';
  8. /* tslint:disable:max-line-length */
  9. // forkJoin(a$, b$, c$)
  10. /** @deprecated Use the version that takes an array of Observables instead */
  11. export function forkJoin<T>(v1: SubscribableOrPromise<T>): Observable<[T]>;
  12. /** @deprecated Use the version that takes an array of Observables instead */
  13. export function forkJoin<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>): Observable<[T, T2]>;
  14. /** @deprecated Use the version that takes an array of Observables instead */
  15. export function forkJoin<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T, T2, T3]>;
  16. /** @deprecated Use the version that takes an array of Observables instead */
  17. export function forkJoin<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T, T2, T3, T4]>;
  18. /** @deprecated Use the version that takes an array of Observables instead */
  19. export function forkJoin<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<[T, T2, T3, T4, T5]>;
  20. /** @deprecated Use the version that takes an array of Observables instead */
  21. export function forkJoin<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<[T, T2, T3, T4, T5, T6]>;
  22. // forkJoin([a$, b$, c$]);
  23. // TODO(benlesh): Uncomment for TS 3.0
  24. // export function forkJoin(sources: []): Observable<never>;
  25. export function forkJoin<A>(sources: [ObservableInput<A>]): Observable<[A]>;
  26. export function forkJoin<A, B>(sources: [ObservableInput<A>, ObservableInput<B>]): Observable<[A, B]>;
  27. export function forkJoin<A, B, C>(sources: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>]): Observable<[A, B, C]>;
  28. export function forkJoin<A, B, C, D>(sources: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>]): Observable<[A, B, C, D]>;
  29. export function forkJoin<A, B, C, D, E>(sources: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>]): Observable<[A, B, C, D, E]>;
  30. export function forkJoin<A, B, C, D, E, F>(sources: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>, ObservableInput<F>]): Observable<[A, B, C, D, E, F]>;
  31. export function forkJoin<A extends ObservableInput<any>[]>(sources: A): Observable<ObservedValuesFromArray<A>[]>;
  32. // forkJoin({})
  33. export function forkJoin(sourcesObject: {}): Observable<never>;
  34. export function forkJoin<T, K extends keyof T>(sourcesObject: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
  35. /** @deprecated resultSelector is deprecated, pipe to map instead */
  36. export function forkJoin(...args: Array<ObservableInput<any>|Function>): Observable<any>;
  37. /** @deprecated Use the version that takes an array of Observables instead */
  38. export function forkJoin<T>(...sources: ObservableInput<T>[]): Observable<T[]>;
  39. /* tslint:enable:max-line-length */
  40. /**
  41. * Accepts an `Array` of {@link ObservableInput} or a dictionary `Object` of {@link ObservableInput} and returns
  42. * an {@link Observable} that emits either an array of values in the exact same order as the passed array,
  43. * or a dictionary of values in the same shape as the passed dictionary.
  44. *
  45. * <span class="informal">Wait for Observables to complete and then combine last values they emitted.</span>
  46. *
  47. * ![](forkJoin.png)
  48. *
  49. * `forkJoin` is an operator that takes any number of input observables which can be passed either as an array
  50. * or a dictionary of input observables. If no input observables are provided, resulting stream will complete
  51. * immediately.
  52. *
  53. * `forkJoin` will wait for all passed observables to complete and then it will emit an array or an object with last
  54. * values from corresponding observables.
  55. *
  56. * If you pass an array of `n` observables to the operator, resulting
  57. * array will have `n` values, where first value is the last thing emitted by the first observable,
  58. * second value is the last thing emitted by the second observable and so on.
  59. *
  60. * If you pass a dictionary of observables to the operator, resulting
  61. * objects will have the same keys as the dictionary passed, with their last values they've emitted
  62. * located at the corresponding key.
  63. *
  64. * That means `forkJoin` will not emit more than once and it will complete after that. If you need to emit combined
  65. * values not only at the end of lifecycle of passed observables, but also throughout it, try out {@link combineLatest}
  66. * or {@link zip} instead.
  67. *
  68. * In order for resulting array to have the same length as the number of input observables, whenever any of
  69. * that observables completes without emitting any value, `forkJoin` will complete at that moment as well
  70. * and it will not emit anything either, even if it already has some last values from other observables.
  71. * Conversely, if there is an observable that never completes, `forkJoin` will never complete as well,
  72. * unless at any point some other observable completes without emitting value, which brings us back to
  73. * the previous case. Overall, in order for `forkJoin` to emit a value, all observables passed as arguments
  74. * have to emit something at least once and complete.
  75. *
  76. * If any input observable errors at some point, `forkJoin` will error as well and all other observables
  77. * will be immediately unsubscribed.
  78. *
  79. * Optionally `forkJoin` accepts project function, that will be called with values which normally
  80. * would land in emitted array. Whatever is returned by project function, will appear in output
  81. * observable instead. This means that default project can be thought of as a function that takes
  82. * all its arguments and puts them into an array. Note that project function will be called only
  83. * when output observable is supposed to emit a result.
  84. *
  85. * ## Examples
  86. *
  87. * ### Use forkJoin with a dictionary of observable inputs
  88. * ```ts
  89. * import { forkJoin, of, timer } from 'rxjs';
  90. *
  91. * const observable = forkJoin({
  92. * foo: of(1, 2, 3, 4),
  93. * bar: Promise.resolve(8),
  94. * baz: timer(4000),
  95. * });
  96. * observable.subscribe({
  97. * next: value => console.log(value),
  98. * complete: () => console.log('This is how it ends!'),
  99. * });
  100. *
  101. * // Logs:
  102. * // { foo: 4, bar: 8, baz: 0 } after 4 seconds
  103. * // "This is how it ends!" immediately after
  104. * ```
  105. *
  106. * ### Use forkJoin with an array of observable inputs
  107. * ```ts
  108. * import { forkJoin, of } from 'rxjs';
  109. *
  110. * const observable = forkJoin([
  111. * of(1, 2, 3, 4),
  112. * Promise.resolve(8),
  113. * timer(4000),
  114. * ]);
  115. * observable.subscribe({
  116. * next: value => console.log(value),
  117. * complete: () => console.log('This is how it ends!'),
  118. * });
  119. *
  120. * // Logs:
  121. * // [4, 8, 0] after 4 seconds
  122. * // "This is how it ends!" immediately after
  123. * ```
  124. *
  125. * @see {@link combineLatest}
  126. * @see {@link zip}
  127. *
  128. * @param {...ObservableInput} sources Any number of Observables provided either as an array or as an arguments
  129. * passed directly to the operator.
  130. * @param {function} [project] Function that takes values emitted by input Observables and returns value
  131. * that will appear in resulting Observable instead of default array.
  132. * @return {Observable} Observable emitting either an array of last values emitted by passed Observables
  133. * or value from project function.
  134. */
  135. export function forkJoin(
  136. ...sources: any[]
  137. ): Observable<any> {
  138. if (sources.length === 1) {
  139. const first = sources[0];
  140. if (isArray(first)) {
  141. return forkJoinInternal(first, null);
  142. }
  143. // TODO(benlesh): isObservable check will not be necessary when deprecated path is removed.
  144. if (isObject(first) && Object.getPrototypeOf(first) === Object.prototype) {
  145. const keys = Object.keys(first);
  146. return forkJoinInternal(keys.map(key => first[key]), keys);
  147. }
  148. }
  149. // DEPRECATED PATHS BELOW HERE
  150. if (typeof sources[sources.length - 1] === 'function') {
  151. const resultSelector = sources.pop() as Function;
  152. sources = (sources.length === 1 && isArray(sources[0])) ? sources[0] : sources;
  153. return forkJoinInternal(sources, null).pipe(
  154. map((args: any[]) => resultSelector(...args))
  155. );
  156. }
  157. return forkJoinInternal(sources, null);
  158. }
  159. function forkJoinInternal(sources: ObservableInput<any>[], keys: string[] | null): Observable<any> {
  160. return new Observable(subscriber => {
  161. const len = sources.length;
  162. if (len === 0) {
  163. subscriber.complete();
  164. return;
  165. }
  166. const values = new Array(len);
  167. let completed = 0;
  168. let emitted = 0;
  169. for (let i = 0; i < len; i++) {
  170. const source = from(sources[i]);
  171. let hasValue = false;
  172. subscriber.add(source.subscribe({
  173. next: value => {
  174. if (!hasValue) {
  175. hasValue = true;
  176. emitted++;
  177. }
  178. values[i] = value;
  179. },
  180. error: err => subscriber.error(err),
  181. complete: () => {
  182. completed++;
  183. if (completed === len || !hasValue) {
  184. if (emitted === len) {
  185. subscriber.next(keys ?
  186. keys.reduce((result, key, i) => (result[key] = values[i], result), {}) :
  187. values);
  188. }
  189. subscriber.complete();
  190. }
  191. }
  192. }));
  193. }
  194. });
  195. }