a5bee40472e22a6e912f08a5b314622b5b30adb8d588ce637ece0f4c4358565947b8a5ec64865fd5dc947b71b717956977b4277c5ad5ae02635ed8161641cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import { Observable } from '../Observable';
  2. import { ObservableInput, SchedulerLike, ObservedValueOf, ObservableInputTuple } from '../types';
  3. import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
  4. import { Subscriber } from '../Subscriber';
  5. import { from } from './from';
  6. import { identity } from '../util/identity';
  7. import { Subscription } from '../Subscription';
  8. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  9. import { popResultSelector, popScheduler } from '../util/args';
  10. import { createObject } from '../util/createObject';
  11. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  12. import { AnyCatcher } from '../AnyCatcher';
  13. import { executeSchedule } from '../util/executeSchedule';
  // combineLatest(any)
  // This overload is deliberately listed first. An argument typed as
  // _exactly `any`_ matches _every_ signature, so without this catch-all it
  // could land on an arbitrary overload further down, and there is no way to
  // know at build time which return type would be right for an `any`.
  /**
   * You have passed `any` here, we can't figure out if it is
   * an array or an object, so you're getting `unknown`. Use better types.
   * @param arg Something typed as `any`
   */
  export function combineLatest<T extends AnyCatcher>(arg: T): Observable<unknown>;

  // combineLatest([a, b, c])
  // Array form: an empty tuple yields `Observable<never>`; otherwise the
  // emitted tuple type `A` mirrors the tuple of inputs.
  export function combineLatest(sources: []): Observable<never>;
  export function combineLatest<A extends readonly unknown[]>(sources: readonly [...ObservableInputTuple<A>]): Observable<A>;
  /** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
  export function combineLatest<A extends readonly unknown[], R>(
    sources: readonly [...ObservableInputTuple<A>],
    resultSelector: (...values: A) => R,
    scheduler: SchedulerLike
  ): Observable<R>;
  export function combineLatest<A extends readonly unknown[], R>(
    sources: readonly [...ObservableInputTuple<A>],
    resultSelector: (...values: A) => R
  ): Observable<R>;
  /** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
  export function combineLatest<A extends readonly unknown[]>(
    sources: readonly [...ObservableInputTuple<A>],
    scheduler: SchedulerLike
  ): Observable<A>;

  // combineLatest(a, b, c)
  // Rest-parameter form: the sources are passed as individual arguments,
  // optionally followed by a result selector and/or a scheduler.
  /** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
  export function combineLatest<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A>;
  /** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
  export function combineLatest<A extends readonly unknown[], R>(
    ...sourcesAndResultSelectorAndScheduler: [...ObservableInputTuple<A>, (...values: A) => R, SchedulerLike]
  ): Observable<R>;
  /** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
  export function combineLatest<A extends readonly unknown[], R>(
    ...sourcesAndResultSelector: [...ObservableInputTuple<A>, (...values: A) => R]
  ): Observable<R>;
  /** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
  export function combineLatest<A extends readonly unknown[]>(
    ...sourcesAndScheduler: [...ObservableInputTuple<A>, SchedulerLike]
  ): Observable<A>;

  // combineLatest({a, b, c})
  // Dictionary form: an object whose values can only be `never` yields
  // `Observable<never>`; otherwise each key of the emitted object carries the
  // observed value type of the matching input.
  export function combineLatest(sourcesObject: { [K in any]: never }): Observable<never>;
  export function combineLatest<T extends Record<string, ObservableInput<any>>>(
    sourcesObject: T
  ): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
  63. /**
  64. * Combines multiple Observables to create an Observable whose values are
  65. * calculated from the latest values of each of its input Observables.
  66. *
  67. * <span class="informal">Whenever any input Observable emits a value, it
  68. * computes a formula using the latest values from all the inputs, then emits
  69. * the output of that formula.</span>
  70. *
  71. * ![](combineLatest.png)
  72. *
  73. * `combineLatest` combines the values from all the Observables passed in the
  74. * observables array. This is done by subscribing to each Observable in order and,
  75. * whenever any Observable emits, collecting an array of the most recent
  76. * values from each Observable. So if you pass `n` Observables to this operator,
  77. * the returned Observable will always emit an array of `n` values, in an order
  78. * corresponding to the order of the passed Observables (the value from the first Observable
  79. * will be at index 0 of the array and so on).
  80. *
  81. * Static version of `combineLatest` accepts an array of Observables. Note that an array of
  82. * Observables is a good choice, if you don't know beforehand how many Observables
  83. * you will combine. Passing an empty array will result in an Observable that
  84. * completes immediately.
  85. *
  86. * To ensure the output array always has the same length, `combineLatest` will
  87. * actually wait for all input Observables to emit at least once,
  88. * before it starts emitting results. This means if some Observable emits
  89. * values before other Observables started emitting, all these values but the last
  90. * will be lost. On the other hand, if some Observable completes without ever
  91. * emitting a value, the resulting Observable can never emit anything, since it is
  92. * now impossible to include a value from the completed Observable in the resulting
  93. * array; it completes once all other inputs have completed too. Also, if some input Observable does
  94. * not emit any value and never completes, `combineLatest` will also never emit
  95. * and never complete, since, again, it will wait for all streams to emit some
  96. * value.
  97. *
  98. * If at least one Observable was passed to `combineLatest` and all passed Observables
  99. * emitted something, the resulting Observable will complete when all combined
  100. * streams complete. So even if some Observable completes, the result of
  101. * `combineLatest` will still emit values when other Observables do. In case
  102. * of a completed Observable, its value from now on will always be the last
  103. * emitted value. On the other hand, if any Observable errors, `combineLatest`
  104. * will error immediately as well, and all other Observables will be unsubscribed.
  105. *
  106. * ## Examples
  107. *
  108. * Combine two timer Observables
  109. *
  110. * ```ts
  111. * import { timer, combineLatest } from 'rxjs';
  112. *
  113. * const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
  114. * const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0.5s from now
  115. * const combinedTimers = combineLatest([firstTimer, secondTimer]);
  116. * combinedTimers.subscribe(value => console.log(value));
  117. * // Logs
  118. * // [0, 0] after 0.5s
  119. * // [1, 0] after 1s
  120. * // [1, 1] after 1.5s
  121. * // [2, 1] after 2s
  122. * ```
  123. *
  124. * Combine a dictionary of Observables
  125. *
  126. * ```ts
  127. * import { of, delay, startWith, combineLatest } from 'rxjs';
  128. *
  129. * const observables = {
  130. * a: of(1).pipe(delay(1000), startWith(0)),
  131. * b: of(5).pipe(delay(5000), startWith(0)),
  132. * c: of(10).pipe(delay(10000), startWith(0))
  133. * };
  134. * const combined = combineLatest(observables);
  135. * combined.subscribe(value => console.log(value));
  136. * // Logs
  137. * // { a: 0, b: 0, c: 0 } immediately
  138. * // { a: 1, b: 0, c: 0 } after 1s
  139. * // { a: 1, b: 5, c: 0 } after 5s
  140. * // { a: 1, b: 5, c: 10 } after 10s
  141. * ```
  142. *
  143. * Combine an array of Observables
  144. *
  145. * ```ts
  146. * import { of, delay, startWith, combineLatest } from 'rxjs';
  147. *
  148. * const observables = [1, 5, 10].map(
  149. * n => of(n).pipe(
  150. * delay(n * 1000), // emit 0 and then emit n after n seconds
  151. * startWith(0)
  152. * )
  153. * );
  154. * const combined = combineLatest(observables);
  155. * combined.subscribe(value => console.log(value));
  156. * // Logs
  157. * // [0, 0, 0] immediately
  158. * // [1, 0, 0] after 1s
  159. * // [1, 5, 0] after 5s
  160. * // [1, 5, 10] after 10s
  161. * ```
  162. *
  163. * Use map operator to dynamically calculate the Body-Mass Index
  164. *
  165. * ```ts
  166. * import { of, combineLatest, map } from 'rxjs';
  167. *
  168. * const weight = of(70, 72, 76, 79, 75);
  169. * const height = of(1.76, 1.77, 1.78);
  170. * const bmi = combineLatest([weight, height]).pipe(
  171. * map(([w, h]) => w / (h * h)),
  172. * );
  173. * bmi.subscribe(x => console.log('BMI is ' + x));
  174. *
  175. * // With output to console:
  176. * // BMI is 24.212293388429753
  177. * // BMI is 23.93948099205209
  178. * // BMI is 23.671253629592222
  179. * ```
  180. *
  181. * @see {@link combineLatestAll}
  182. * @see {@link merge}
  183. * @see {@link withLatestFrom}
  184. *
  185. * @param args Any number of `ObservableInput`s provided either as an array or as an object
  186. * to combine with each other. If the last parameter is a function, it will be used to project the
  187. * values from the combined latest values into a new value on the output Observable.
  188. * @return An Observable of projected values from the most recent values from each `ObservableInput`,
  189. * or an array of the most recent values from each `ObservableInput`.
  190. */
  export function combineLatest<O extends ObservableInput<any>, R>(...args: any[]): Observable<R> | Observable<ObservedValueOf<O>[]> {
    // Pull the optional scheduler and result selector out of `args` (scheduler
    // first, then the selector); what remains is handed to
    // `argsArgArrayOrObject`, which yields the list of sources plus, when
    // present, the `keys` used to rebuild a dictionary result.
    const scheduler = popScheduler(args);
    const resultSelector = popResultSelector(args);
    const { args: observables, keys } = argsArgArrayOrObject(args);

    if (observables.length === 0) {
      // Nothing to combine (no arguments, an empty array, or an empty object
      // POJO): the result should simply complete (EMPTY), while still
      // honoring the scheduler if one was provided.
      return from([], scheduler as any);
    }

    // With `keys`, every emitted array of values is scrubbed into a dictionary;
    // without them the array is passed through untouched.
    const valueTransform = keys ? (values: any[]) => createObject(keys, values) : identity;

    const result = new Observable<ObservedValueOf<O>[]>(
      combineLatestInit(observables as ObservableInput<ObservedValueOf<O>>[], scheduler, valueTransform)
    );

    if (resultSelector) {
      // Project each combined emission through the user-supplied selector.
      return result.pipe(mapOneOrManyArgs(resultSelector)) as Observable<R>;
    }
    return result;
  }
  /**
   * Creates the subscribe function that drives a combined-latest Observable.
   *
   * The returned function subscribes to every entry of `observables`, remembers
   * the most recent value of each by index, and, once every source has produced
   * at least one value, pushes a transformed copy of those values to the
   * subscriber on each new source emission. The subscriber is completed when
   * the last source completes.
   *
   * @param observables The inputs to combine; each is converted with `from`.
   * @param scheduler Optional scheduler. When given, both the outer start-up
   * and each individual inner subscription go through `maybeSchedule`, and the
   * scheduler is also forwarded to `from`.
   * @param valueTransform Maps the copied array of latest values to the value
   * that is actually emitted. Defaults to `identity`.
   * @return A function taking the destination `Subscriber`.
   */
  export function combineLatestInit(
    observables: ObservableInput<any>[],
    scheduler?: SchedulerLike,
    valueTransform: (values: any[]) => any = identity
  ) {
    return (subscriber: Subscriber<any>) => {
      // The whole start-up is wrapped in a function because it may need to be
      // scheduled rather than run synchronously.
      const start = () => {
        const sourceCount = observables.length;
        // Latest value seen from each source; slot `index` belongs to `observables[index]`.
        const latest = new Array(sourceCount);
        // How many inner subscriptions have not completed yet. Reaching zero
        // means there is nothing left to combine and the result can complete.
        let stillActive = sourceCount;
        // How many sources have yet to deliver their first value. Nothing is
        // emitted downstream until this reaches zero.
        let awaitingFirst = sourceCount;

        // Subscribes to the source at `index`. The index ties the source to its
        // slot in `latest` (and thereby to its position or key in the output).
        const subscribeAt = (index: number) => {
          let seen = false;

          const handleValue = (value: any) => {
            // Always record the newest value for this source.
            latest[index] = value;
            if (!seen) {
              // First value from this source: one fewer to wait for.
              seen = true;
              awaitingFirst -= 1;
            }
            if (awaitingFirst === 0) {
              // Every source has a value; emit a copy so later writes to
              // `latest` cannot alter what was already sent.
              subscriber.next(valueTransform(latest.slice()));
            }
          };

          const handleComplete = () => {
            stillActive -= 1;
            if (stillActive === 0) {
              // The result only completes after all inner sources are done.
              subscriber.complete();
            }
          };

          // The source is looked up here, at subscription time, not when the loop runs.
          from(observables[index], scheduler as any).subscribe(
            createOperatorSubscriber(subscriber, handleValue, handleComplete)
          );
        };

        // Kick off one (possibly scheduled) subscription per source, in order.
        for (let index = 0; index < sourceCount; index++) {
          maybeSchedule(scheduler, () => subscribeAt(index), subscriber);
        }
      };

      maybeSchedule(scheduler, start, subscriber);
    };
  }
  /**
   * Runs `execute` either directly or via a scheduler.
   *
   * Without a scheduler the callback is invoked synchronously, right here.
   * With one, the work is delegated to `executeSchedule` together with the
   * given subscription.
   *
   * @param scheduler The scheduler to use, or `undefined` to run immediately.
   * @param execute The work to perform.
   * @param subscription Passed to `executeSchedule` when scheduling.
   */
  function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
    if (!scheduler) {
      execute();
      return;
    }
    executeSchedule(subscription, scheduler, execute);
  }