7192a5209cff8508fce1003293626adf24a2c48cd197fb110874e60413a12a23d323b1916397f652f2313f102c67c572b56f9d80d31df323cd6f8cec3e99ac 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Observable } from '../Observable';
  4. import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
  5. /**
  6. * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
  7. * calls `error`, this method will resubscribe to the source Observable for a maximum of `count` resubscriptions (given
  8. * as a number parameter) rather than propagating the `error` call.
  9. *
  10. * ![](retry.png)
  11. *
  12. * Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted
  13. * during failed subscriptions. For example, if an Observable fails at first but emits [1, 2] then succeeds the second
  14. * time and emits: [1, 2, 3, 4, 5] then the complete stream of emissions and notifications
  15. * would be: [1, 2, 1, 2, 3, 4, 5, `complete`].
  16. *
  17. * ## Example
  18. * ```ts
  19. * import { interval, of, throwError } from 'rxjs';
  20. * import { mergeMap, retry } from 'rxjs/operators';
  21. *
  22. * const source = interval(1000);
  23. * const example = source.pipe(
  24. * mergeMap(val => {
  25. * if(val > 5){
  26. * return throwError('Error!');
  27. * }
  28. * return of(val);
  29. * }),
  30. * //retry 2 times on error
  31. * retry(2)
  32. * );
  33. *
  34. * const subscribe = example.subscribe({
  35. * next: val => console.log(val),
  36. * error: val => console.log(`${val}: Retried 2 times then quit!`)
  37. * });
  38. *
  39. * // Output:
  40. * // 0..1..2..3..4..5..
  41. * // 0..1..2..3..4..5..
  42. * // 0..1..2..3..4..5..
  43. * // "Error!: Retried 2 times then quit!"
  44. * ```
  45. *
  46. * @param {number} count - Number of retry attempts before failing.
  47. * @return {Observable} The source Observable modified with the retry logic.
  48. * @method retry
  49. * @owner Observable
  50. */
  51. export function retry<T>(count: number = -1): MonoTypeOperatorFunction<T> {
  52. return (source: Observable<T>) => source.lift(new RetryOperator(count, source));
  53. }
  54. class RetryOperator<T> implements Operator<T, T> {
  55. constructor(private count: number,
  56. private source: Observable<T>) {
  57. }
  58. call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  59. return source.subscribe(new RetrySubscriber(subscriber, this.count, this.source));
  60. }
  61. }
  62. /**
  63. * We need this JSDoc comment for affecting ESDoc.
  64. * @ignore
  65. * @extends {Ignored}
  66. */
  67. class RetrySubscriber<T> extends Subscriber<T> {
  68. constructor(destination: Subscriber<any>,
  69. private count: number,
  70. private source: Observable<T>) {
  71. super(destination);
  72. }
  73. error(err: any) {
  74. if (!this.isStopped) {
  75. const { source, count } = this;
  76. if (count === 0) {
  77. return super.error(err);
  78. } else if (count > -1) {
  79. this.count = count - 1;
  80. }
  81. source.subscribe(this._unsubscribeAndRecycle());
  82. }
  83. }
  84. }