2f6051f955d8b2b5bd777a3311df8faba1a3d91ec5bbb5f115df7c9f491aa2c45cf30f987e7302dab4bf1530e8273e3d5706dbf3d4a3b11cf4aeeff79a7cef 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import { Observable } from '../Observable';
  2. import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
  3. import { Subscription } from '../Subscription';
  4. import { innerFrom } from '../observable/innerFrom';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { operate } from '../util/lift';
  7. export function catchError<T, O extends ObservableInput<any>>(
  8. selector: (err: any, caught: Observable<T>) => O
  9. ): OperatorFunction<T, T | ObservedValueOf<O>>;
  10. /**
  11. * Catches errors on the observable to be handled by returning a new observable or throwing an error.
  12. *
  13. * <span class="informal">
  14. * It only listens to the error channel and ignores notifications.
  15. * Handles errors from the source observable, and maps them to a new observable.
  16. * The error may also be rethrown, or a new error can be thrown to emit an error from the result.
  17. * </span>
  18. *
  19. * ![](catch.png)
  20. *
  21. * This operator handles errors, but forwards along all other events to the resulting observable.
  22. * If the source observable terminates with an error, it will map that error to a new observable,
  23. * subscribe to it, and forward all of its events to the resulting observable.
  24. *
  25. * ## Examples
  26. *
  27. * Continue with a different Observable when there's an error
  28. *
  29. * ```ts
  30. * import { of, map, catchError } from 'rxjs';
  31. *
  32. * of(1, 2, 3, 4, 5)
  33. * .pipe(
  34. * map(n => {
  35. * if (n === 4) {
  36. * throw 'four!';
  37. * }
  38. * return n;
  39. * }),
  40. * catchError(err => of('I', 'II', 'III', 'IV', 'V'))
  41. * )
  42. * .subscribe(x => console.log(x));
  43. * // 1, 2, 3, I, II, III, IV, V
  44. * ```
  45. *
  46. * Retry the caught source Observable again in case of error, similar to `retry()` operator
  47. *
  48. * ```ts
  49. * import { of, map, catchError, take } from 'rxjs';
  50. *
  51. * of(1, 2, 3, 4, 5)
  52. * .pipe(
  53. * map(n => {
  54. * if (n === 4) {
  55. * throw 'four!';
  56. * }
  57. * return n;
  58. * }),
  59. * catchError((err, caught) => caught),
  60. * take(30)
  61. * )
  62. * .subscribe(x => console.log(x));
  63. * // 1, 2, 3, 1, 2, 3, ...
  64. * ```
  65. *
  66. * Throw a new error when the source Observable throws an error
  67. *
  68. * ```ts
  69. * import { of, map, catchError } from 'rxjs';
  70. *
  71. * of(1, 2, 3, 4, 5)
  72. * .pipe(
  73. * map(n => {
  74. * if (n === 4) {
  75. * throw 'four!';
  76. * }
  77. * return n;
  78. * }),
  79. * catchError(err => {
  80. * throw 'error in source. Details: ' + err;
  81. * })
  82. * )
  83. * .subscribe({
  84. * next: x => console.log(x),
  85. * error: err => console.log(err)
  86. * });
  87. * // 1, 2, 3, error in source. Details: four!
  88. * ```
  89. *
  90. * @see {@link onErrorResumeNext}
  91. * @see {@link repeat}
  92. * @see {@link repeatWhen}
  93. * @see {@link retry }
  94. * @see {@link retryWhen}
  95. *
  96. * @param selector A function that takes as arguments `err`, which is the error, and `caught`, which
  97. * is the source observable, in case you'd like to "retry" that observable by returning it again.
  98. * Whatever observable is returned by the `selector` will be used to continue the observable chain.
  99. * @return A function that returns an Observable that originates from either
  100. * the source or the Observable returned by the `selector` function.
  101. */
  102. export function catchError<T, O extends ObservableInput<any>>(
  103. selector: (err: any, caught: Observable<T>) => O
  104. ): OperatorFunction<T, T | ObservedValueOf<O>> {
  105. return operate((source, subscriber) => {
  106. let innerSub: Subscription | null = null;
  107. let syncUnsub = false;
  108. let handledResult: Observable<ObservedValueOf<O>>;
  109. innerSub = source.subscribe(
  110. createOperatorSubscriber(subscriber, undefined, undefined, (err) => {
  111. handledResult = innerFrom(selector(err, catchError(selector)(source)));
  112. if (innerSub) {
  113. innerSub.unsubscribe();
  114. innerSub = null;
  115. handledResult.subscribe(subscriber);
  116. } else {
  117. // We don't have an innerSub yet, that means the error was synchronous
  118. // because the subscribe call hasn't returned yet.
  119. syncUnsub = true;
  120. }
  121. })
  122. );
  123. if (syncUnsub) {
  124. // We have a synchronous error, we need to make sure to
  125. // finalize right away. This ensures that callbacks in the `finalize` operator are called
  126. // at the right time, and that finalization occurs at the expected
  127. // time between the source error and the subscription to the
  128. // next observable.
  129. innerSub.unsubscribe();
  130. innerSub = null;
  131. handledResult!.subscribe(subscriber);
  132. }
  133. });
  134. }