ce2229e205d3a9be0a75f53e6b07766e4a64157568b8d12f8f011232181d5ad8be01195dad99cebca1771cf1fd787b9ce98f05597ea9d1a5037378281a0911 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import { Observable } from '../Observable';
  2. import { innerFrom } from './innerFrom';
  3. import { Subscription } from '../Subscription';
  4. import { ObservableInput, ObservableInputTuple } from '../types';
  5. import { argsOrArgArray } from '../util/argsOrArgArray';
  6. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  7. import { Subscriber } from '../Subscriber';
  export function race<T extends readonly unknown[]>(inputs: [...ObservableInputTuple<T>]): Observable<T[number]>;
  export function race<T extends readonly unknown[]>(...inputs: [...ObservableInputTuple<T>]): Observable<T[number]>;
  /**
   * Creates an observable that follows whichever source observable is the first to emit a value.
   *
   * ![](race.png)
   *
   * Subscribing to the result of `race` subscribes to every source observable right away.
   * The moment any one of them produces a value, that source becomes the "winner" and the
   * result unsubscribes from all of the others. From then on, every notification of the
   * winner (values, error and completion) is forwarded to the consumer.
   *
   * If any source observable errors before a first value has been emitted, the resulting
   * observable errors as well, regardless of whether another source could still have won
   * the race.
   *
   * Typical uses are picking the response of the fastest network connection (HTTP or
   * WebSockets), or switching observable context depending on user input.
   *
   * ## Example
   *
   * Mirror the observable that starts emitting first.
   *
   * ```ts
   * import { interval, map, race } from 'rxjs';
   *
   * const obs1 = interval(7000).pipe(map(() => 'slow one'));
   * const obs2 = interval(3000).pipe(map(() => 'fast one'));
   * const obs3 = interval(5000).pipe(map(() => 'medium one'));
   *
   * race(obs1, obs2, obs3)
   *   .subscribe(winner => console.log(winner));
   *
   * // Outputs
   * // a series of 'fast one'
   * ```
   *
   * @param sources The `ObservableInput`s that compete to emit first; they may be passed
   * either as individual arguments or as a single array.
   * @return An Observable that mirrors the output of the first source to emit an item.
   */
  export function race<T>(...sources: (ObservableInput<T> | ObservableInput<T>[])[]): Observable<any> {
    // Accept both `race(a, b, c)` and `race([a, b, c])` by normalizing to one flat list.
    const contenders = argsOrArgArray(sources) as ObservableInput<T>[];

    // A single contender has nobody to race against, so simply adapt it to an Observable.
    if (contenders.length === 1) {
      return innerFrom(contenders[0]);
    }

    return new Observable<T>(raceInit(contenders));
  }
  /**
   * Builds the subscribe-time initializer shared by the static version and the
   * operator version of race.
   *
   * The returned function subscribes to the sources in order. The first source to
   * deliver a value becomes the winner: every other subscription made so far is
   * unsubscribed, and no further sources are subscribed to.
   *
   * @param sources The sources to race
   * @return A function that, given the destination subscriber, starts the race.
   */
  export function raceInit<T>(sources: ObservableInput<T>[]) {
    return (subscriber: Subscriber<T>) => {
      // All subscriptions that are still racing. It is set to `null` once the race has
      // been won, which is why the type explicitly includes `null`.
      let subscriptions: Subscription[] | null = [];

      // Subscribe to all of the sources. `subscriptions` is re-checked on every iteration:
      // if a racer "wins" synchronously while being subscribed to, the loop stops before
      // it subscribes to any more sources. It also stops if the destination is closed.
      for (let i = 0; subscriptions && !subscriber.closed && i < sources.length; i++) {
        // NOTE: keep the subscribe call inline as the argument of `push`. The `push`
        // method is looked up on the array before the argument is evaluated, so a
        // synchronous winner that nulls `subscriptions` during `subscribe` is harmless.
        // Hoisting the subscribe call into a local and pushing afterwards would
        // dereference `null` in that case.
        subscriptions.push(
          innerFrom(sources[i] as ObservableInput<T>).subscribe(
            createOperatorSubscriber(subscriber, (value) => {
              if (subscriptions) {
                // We're still racing, but we won! So unsubscribe
                // all other subscriptions that we have, except this one.
                for (let s = 0; s < subscriptions.length; s++) {
                  if (s !== i) {
                    subscriptions[s].unsubscribe();
                  }
                }
                // Mark the race as finished so later values skip the cleanup above.
                subscriptions = null;
              }
              subscriber.next(value);
            })
          )
        );
      }
    };
  }