c02ba5b7f030d746cf223c236cd5e328f30555e98b039994e2f43d4fb57090417d829a22bc9a22481e93309042c4cdb7f35cc741924a38cf21e68026c79027 1.3 KB

1234567891011121314151617181920212223242526272829
  1. import { Observable } from '../Observable';
  2. import { ObservableInput, OperatorFunction } from '../types';
  3. import { identity } from '../util/identity';
  4. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  5. import { pipe } from '../util/pipe';
  6. import { mergeMap } from './mergeMap';
  7. import { toArray } from './toArray';
  /**
   * Shared implementation for operators that must see every inner source before
   * they can start joining them.
   *
   * The returned operator gathers all inner sources emitted by the outer source
   * into an array, hands that array to `joinFn` once the outer source completes,
   * and then optionally maps the joined values through `project`.
   *
   * Both {@link combineLatestAll} and {@link zipAll} are built on top of this,
   * since they share the "collect all inner observables, then operate on them"
   * behavior and differ only in the join they apply.
   *
   * @param joinFn The static join to run on the collected array of inner sources
   * @param project Optional projection function applied to the joined values
   */
  export function joinAllInternals<T, R>(joinFn: (sources: ObservableInput<T>[]) => Observable<T>, project?: (...args: any[]) => R) {
    // Stage 1: gather every inner source into one array, emitted when the
    // outer source completes.
    const gatherInners = toArray() as OperatorFunction<ObservableInput<T>, ObservableInput<T>[]>;

    // Stage 2: pass the gathered array to the join function and flatten the
    // observable it returns.
    const joinInners = mergeMap((collected: ObservableInput<T>[]) => joinFn(collected));

    // Stage 3: apply the projection when one was given; otherwise let the
    // joined values pass through untouched.
    const projectResult = project ? mapOneOrManyArgs(project) : (identity as any);

    return pipe(gatherInners, joinInners, projectResult);
  }