5627b7733f4fbaa5c39b89e2bf22ebe55fcbf3c2e848a774afafa18568a50d51fdf478a0c49de6fe6e9bec6e3d0e3798fac2ab015615c35f9e5accf4a4578e 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import { Observable } from '../Observable';
  2. import { ObservableInputTuple } from '../types';
  3. import { argsOrArgArray } from '../util/argsOrArgArray';
  4. import { OperatorSubscriber } from '../operators/OperatorSubscriber';
  5. import { noop } from '../util/noop';
  6. import { innerFrom } from './innerFrom';
  7. export function onErrorResumeNext<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A[number]>;
  8. export function onErrorResumeNext<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A[number]>;
  9. /**
  10. * When any of the provided Observable emits a complete or an error notification, it immediately subscribes to the next one
  11. * that was passed.
  12. *
  13. * <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
  14. *
  15. * ![](onErrorResumeNext.png)
  16. *
  17. * `onErrorResumeNext` will subscribe to each observable source it is provided, in order.
  18. * If the source it's subscribed to emits an error or completes, it will move to the next source
  19. * without error.
  20. *
  21. * If `onErrorResumeNext` is provided no arguments, or a single, empty array, it will return {@link EMPTY}.
  22. *
  23. * `onErrorResumeNext` is basically {@link concat}, only it will continue, even if one of its
  24. * sources emits an error.
  25. *
  26. * Note that there is no way to handle any errors thrown by sources via the result of
  27. * `onErrorResumeNext`. If you want to handle errors thrown in any given source, you can
  28. * always use the {@link catchError} operator on them before passing them into `onErrorResumeNext`.
  29. *
  30. * ## Example
  31. *
  32. * Subscribe to the next Observable after map fails
  33. *
  34. * ```ts
  35. * import { onErrorResumeNext, of, map } from 'rxjs';
  36. *
  37. * onErrorResumeNext(
  38. * of(1, 2, 3, 0).pipe(
  39. * map(x => {
  40. * if (x === 0) {
  41. * throw Error();
  42. * }
  43. * return 10 / x;
  44. * })
  45. * ),
  46. * of(1, 2, 3)
  47. * )
  48. * .subscribe({
  49. * next: value => console.log(value),
  50. * error: err => console.log(err), // Will never be called.
  51. * complete: () => console.log('done')
  52. * });
  53. *
  54. * // Logs:
  55. * // 10
  56. * // 5
  57. * // 3.3333333333333335
  58. * // 1
  59. * // 2
  60. * // 3
  61. * // 'done'
  62. * ```
  63. *
  64. * @see {@link concat}
  65. * @see {@link catchError}
  66. *
  67. * @param sources `ObservableInput`s passed either directly or as an array.
  68. * @return An Observable that concatenates all sources, one after the other,
  69. * ignoring all errors, such that any error causes it to move on to the next source.
  70. */
  71. export function onErrorResumeNext<A extends readonly unknown[]>(
  72. ...sources: [[...ObservableInputTuple<A>]] | [...ObservableInputTuple<A>]
  73. ): Observable<A[number]> {
  74. const nextSources: ObservableInputTuple<A> = argsOrArgArray(sources) as any;
  75. return new Observable((subscriber) => {
  76. let sourceIndex = 0;
  77. const subscribeNext = () => {
  78. if (sourceIndex < nextSources.length) {
  79. let nextSource: Observable<A[number]>;
  80. try {
  81. nextSource = innerFrom(nextSources[sourceIndex++]);
  82. } catch (err) {
  83. subscribeNext();
  84. return;
  85. }
  86. const innerSubscriber = new OperatorSubscriber(subscriber, undefined, noop, noop);
  87. nextSource.subscribe(innerSubscriber);
  88. innerSubscriber.add(subscribeNext);
  89. } else {
  90. subscriber.complete();
  91. }
  92. };
  93. subscribeNext();
  94. });
  95. }