4216f80d5ff848b71f2c7e055aded90123e3bf7fceeb818bd8028cad8ed6814b5e38e7844159930e548a29312ea84bf71870a33b157757e27b0195362f3b8a 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import { Observable } from '../Observable';
  2. import { innerFrom } from '../observable/innerFrom';
  3. import { Subject } from '../Subject';
  4. import { Subscription } from '../Subscription';
  5. import { MonoTypeOperatorFunction, ObservableInput } from '../types';
  6. import { operate } from '../util/lift';
  7. import { createOperatorSubscriber } from './OperatorSubscriber';
  /**
   * Returns an Observable that mirrors the source Observable, except for `error` notifications.
   * When the source Observable errors, the error value is pushed into the `errors` Observable
   * that was handed to `notifier`. If the `ObservableInput` returned from `notifier` calls
   * `complete` or `error`, then `complete` or `error` is called on the child subscription.
   * Otherwise (when it emits a value) this operator resubscribes to the source Observable.
   *
   * ![](retryWhen.png)
   *
   * Retry an observable sequence on error based on custom criteria.
   *
   * ## Example
   *
   * ```ts
   * import { interval, map, retryWhen, tap, delayWhen, timer } from 'rxjs';
   *
   * const source = interval(1000);
   * const result = source.pipe(
   *   map(value => {
   *     if (value > 5) {
   *       // error will be picked up by retryWhen
   *       throw value;
   *     }
   *     return value;
   *   }),
   *   retryWhen(errors =>
   *     errors.pipe(
   *       // log error message
   *       tap(value => console.log(`Value ${ value } was too high!`)),
   *       // restart in 5 seconds
   *       delayWhen(value => timer(value * 1000))
   *     )
   *   )
   * );
   *
   * result.subscribe(value => console.log(value));
   *
   * // results:
   * // 0
   * // 1
   * // 2
   * // 3
   * // 4
   * // 5
   * // 'Value 6 was too high!'
   * // - Wait 5 seconds then repeat
   * ```
   *
   * @see {@link retry}
   *
   * @param notifier Function that receives an Observable of the source's errors and returns
   * an `ObservableInput` with which a user can `complete` or `error`, aborting the retry.
   * It is invoked lazily, the first time the source errors, and only once per subscription.
   * @return A function that returns an Observable that mirrors the source
   * Observable with the exception of an `error`.
   * @deprecated Will be removed in v9 or v10. Use {@link retry}'s {@link RetryConfig#delay delay} option instead.
   * Instead of `retryWhen(() => notify$)`, use: `retry({ delay: () => notify$ })`.
   */
  export function retryWhen<T>(notifier: (errors: Observable<any>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
    return operate((source, subscriber) => {
      // Subscription for the current attempt at the source. It is still unset while
      // `source.subscribe` has not returned, which is how a synchronous error is detected.
      let activeSub: Subscription | null;
      // Set when the notifier asks for a retry before `source.subscribe` has returned.
      let resubscribePending = false;
      // Subject that carries source errors to the user's notifier; created on first error.
      let errorFeed: Subject<any>;

      // Called for every value emitted by the notifier.
      const onNotifierNext = () => {
        if (activeSub) {
          // The source subscription already exists, so this is an asynchronous
          // notification: kick off the retry right away.
          attemptSubscription();
        } else {
          // The `source.subscribe` call has not returned yet, so we got here
          // synchronously. Only flag the retry; `attemptSubscription` will finalize
          // the current attempt first and then resubscribe.
          resubscribePending = true;
        }
      };

      // Subscribes to the source once, routing its errors into `errorFeed`.
      function attemptSubscription(): void {
        activeSub = source.subscribe(
          createOperatorSubscriber(subscriber, undefined, undefined, (err) => {
            if (!errorFeed) {
              errorFeed = new Subject();
              const notifications = innerFrom(notifier(errorFeed));
              notifications.subscribe(createOperatorSubscriber(subscriber, onNotifierNext));
            }
            if (errorFeed) {
              // The notifier was set up without throwing; hand it the error.
              errorFeed.next(err);
            }
          })
        );

        if (resubscribePending) {
          // Synchronous case: tear the finished attempt down before moving on to the
          // next subscription. Without this, the inner subscriptions would pile up and
          // would not be torn down until the entire observable is done.
          activeSub.unsubscribe();
          activeSub = null;
          // This may happen repeatedly, so clear the flag before trying again.
          resubscribePending = false;
          attemptSubscription();
        }
      }

      // Start the first subscription.
      attemptSubscription();
    });
  }