605ce007ff5844e13133e202eb56aef655f8eddab211117ac42f1b96fc2badb50a25187c69a88995e6b70c5ff124d99aa9a7950929a20be77478a19128e3a8 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Observable } from '../Observable';
  4. import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
  5. import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
  /* tslint:disable:max-line-length */
  export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
  /* tslint:enable:max-line-length */
  /**
   * Handles an error from the source observable by switching to the observable
   * input returned from `selector`, or by letting `selector` throw a new error.
   *
   * ![](catch.png)
   *
   * ## Examples
   * Continue with a different Observable when an error occurs
   *
   * ```ts
   * import { of } from 'rxjs';
   * import { map, catchError } from 'rxjs/operators';
   *
   * of(1, 2, 3, 4, 5).pipe(
   *     map(n => {
   *       if (n === 4) {
   *         throw 'four!';
   *       }
   *       return n;
   *     }),
   *     catchError(err => of('I', 'II', 'III', 'IV', 'V')),
   *   )
   *   .subscribe(x => console.log(x));
   *   // 1, 2, 3, I, II, III, IV, V
   * ```
   *
   * Re-subscribe to the caught source Observable on error, similar to the
   * `retry()` operator
   *
   * ```ts
   * import { of } from 'rxjs';
   * import { map, catchError, take } from 'rxjs/operators';
   *
   * of(1, 2, 3, 4, 5).pipe(
   *     map(n => {
   *       if (n === 4) {
   *         throw 'four!';
   *       }
   *       return n;
   *     }),
   *     catchError((err, caught) => caught),
   *     take(30),
   *   )
   *   .subscribe(x => console.log(x));
   *   // 1, 2, 3, 1, 2, 3, ...
   * ```
   *
   * Throw a new error when the source Observable errors
   *
   * ```ts
   * import { of } from 'rxjs';
   * import { map, catchError } from 'rxjs/operators';
   *
   * of(1, 2, 3, 4, 5).pipe(
   *     map(n => {
   *       if (n === 4) {
   *         throw 'four!';
   *       }
   *       return n;
   *     }),
   *     catchError(err => {
   *       throw 'error in source. Details: ' + err;
   *     }),
   *   )
   *   .subscribe(
   *     x => console.log(x),
   *     err => console.log(err)
   *   );
   *   // 1, 2, 3, error in source. Details: four!
   * ```
   *
   * @param {function} selector receives `err`, the error that was raised, and
   * `caught`, the source observable with this operator applied, which can be
   * returned to "retry". The observable input it returns is used to continue
   * the observable chain.
   * @return {Observable} An observable that emits from the source and, after an
   * error, from the observable returned by the catch `selector` function.
   * @name catchError
   */
  export function catchError<T, O extends ObservableInput<any>>(
    selector: (err: any, caught: Observable<T>) => O
  ): OperatorFunction<T, T | ObservedValueOf<O>> {
    return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
      const catchOperator = new CatchOperator(selector);
      // The lifted observable only exists once `lift` has returned, so it is
      // handed back to the operator afterwards; the operator reads it on
      // subscription to supply the selector's `caught` argument.
      const lifted = source.lift(catchOperator) as Observable<T>;
      catchOperator.caught = lifted;
      return lifted;
    };
  }
  /**
   * Operator passed to `lift` by `catchError`. On subscription it wraps the
   * downstream subscriber in a `CatchSubscriber` and subscribes that to the
   * source.
   */
  class CatchOperator<T, R> implements Operator<T, T | R> {
    // Set by `catchError` right after `lift` returns; forwarded to every
    // `CatchSubscriber` so the selector can receive it as `caught`.
    caught: Observable<T>;

    constructor(private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>) {}

    /**
     * Subscribes a new `CatchSubscriber` (wrapping `subscriber`) to `source`
     * and returns whatever `source.subscribe` returns.
     */
    call(subscriber: Subscriber<R>, source: any): any {
      const { selector, caught } = this;
      return source.subscribe(new CatchSubscriber(subscriber, selector, caught));
    }
  }
  /**
   * Subscriber that, on a source error, asks the selector for a replacement
   * observable input and subscribes to it in place of the failed source.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class CatchSubscriber<T, R> extends SimpleOuterSubscriber<T, T | R> {
    constructor(
      destination: Subscriber<any>,
      private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>,
      private caught: Observable<T>
    ) {
      super(destination);
    }

    // NOTE: `error` is overridden rather than `_error` so that this subscriber
    // is not flagged as `isStopped`. This mirrors the RetrySubscriber (from the
    // `retry` operator): unsubscribe from the source chain, reset the
    // Subscriber flags, then subscribe to the selector result.
    error(err: any) {
      if (this.isStopped) {
        return;
      }

      let replacement: any;
      try {
        replacement = this.selector(err, this.caught);
      } catch (selectorError) {
        // The selector itself threw: pass that error on through the base class.
        super.error(selectorError);
        return;
      }

      this._unsubscribeAndRecycle();

      const inner = new SimpleInnerSubscriber(this);
      this.add(inner);

      const subscription = innerSubscribe(replacement, inner);
      // Usually the returned subscription is the subscriber that was passed in.
      // Interop subscribers get wrapped, though, and the wrapper has to be
      // added as well for unsubscription to chain correctly.
      if (subscription !== inner) {
        this.add(subscription);
      }
    }
  }