123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import { Observable } from '../Observable';
- import { innerFrom } from '../observable/innerFrom';
- import { Subject } from '../Subject';
- import { Subscription } from '../Subscription';
- import { MonoTypeOperatorFunction, ObservableInput } from '../types';
- import { operate } from '../util/lift';
- import { createOperatorSubscriber } from './OperatorSubscriber';
/**
 * Creates an operator whose output mirrors the source Observable, except that an
 * `error` from the source is not forwarded downstream. The error value is instead
 * pushed into an `errors` Observable that is passed to `notifier`. Every value
 * emitted by the `ObservableInput` returned from `notifier` causes a new
 * subscription to the source. If that notifier input calls `complete` or `error`,
 * the resulting Observable calls `complete` or `error` as well.
 *
 * Retry an observable sequence on error based on custom criteria.
 *
 * `notifier` is invoked lazily: it is called the first time the source errors, and
 * at most once per subscription to the resulting Observable. Later errors are
 * delivered through the same `errors` Observable.
 *
 * ## Example
 *
 * ```ts
 * import { interval, map, retryWhen, tap, delayWhen, timer } from 'rxjs';
 *
 * const source = interval(1000);
 * const result = source.pipe(
 *   map(value => {
 *     if (value > 5) {
 *       // error will be picked up by retryWhen
 *       throw value;
 *     }
 *     return value;
 *   }),
 *   retryWhen(errors =>
 *     errors.pipe(
 *       // log error message
 *       tap(value => console.log(`Value ${ value } was too high!`)),
 *       // restart after `value` seconds (the thrown value is 6, so 6 seconds)
 *       delayWhen(value => timer(value * 1000))
 *     )
 *   )
 * );
 *
 * result.subscribe(value => console.log(value));
 *
 * // results:
 * // 0
 * // 1
 * // 2
 * // 3
 * // 4
 * // 5
 * // 'Value 6 was too high!'
 * // - Wait 6 seconds then repeat
 * ```
 *
 * @see {@link retry}
 *
 * @param notifier Function that receives an Observable of the errors raised by the
 * source and returns an `ObservableInput` of notifications. Each notification
 * triggers a retry; a user can `complete` or `error` it to abort the retry.
 * @return A function that returns an Observable that mirrors the source
 * Observable with the exception of an `error`.
 * @deprecated Will be removed in v9 or v10. Use {@link retry}'s {@link RetryConfig#delay delay} option instead.
 * Instead of `retryWhen(() => notify$)`, use: `retry({ delay: () => notify$ })`.
 */
export function retryWhen<T>(notifier: (errors: Observable<any>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
  return operate((source, subscriber) => {
    // Subscription to the source for the current attempt. It stays unset while
    // `source.subscribe` is still executing, which is how a synchronous
    // notification is told apart from an asynchronous one.
    let sourceSub: Subscription | null;
    // Set when the notifier asked for a retry before `source.subscribe` returned.
    let resubscribeSynchronously = false;
    // Subject carrying the source's errors to the notifier; created on first error.
    let errorStream: Subject<any>;

    // Runs for every value emitted by the notifier.
    const handleNotification = (): void => {
      if (sourceSub) {
        // The source subscription call has already returned, so this is an
        // asynchronous notification: start the retry right away.
        connect();
      } else {
        // We arrived here synchronously, from inside `source.subscribe`. Only flag
        // the retry so that finalization can happen before resubscribing.
        resubscribeSynchronously = true;
      }
    };

    // Runs when the source errors: lazily wires up the notifier, then reports the error to it.
    const handleSourceError = (err: any): void => {
      if (!errorStream) {
        errorStream = new Subject();
        const notifications = innerFrom(notifier(errorStream));
        notifications.subscribe(createOperatorSubscriber(subscriber, handleNotification));
      }
      if (errorStream) {
        // The notifier was set up without throwing.
        errorStream.next(err);
      }
    };

    // Subscribes to the source for one attempt and, if a retry was requested
    // synchronously during that subscription, tears it down and tries again.
    const connect = (): void => {
      sourceSub = source.subscribe(createOperatorSubscriber(subscriber, undefined, undefined, handleSourceError));

      if (!resubscribeSynchronously) {
        return;
      }

      // Tear down the inner subscription before moving on to the next one in the
      // synchronous case. Otherwise the inner subscriptions would not be torn
      // down until the entire observable is done.
      sourceSub.unsubscribe();
      sourceSub = null;
      // This may need to happen multiple times, so reset the flag.
      resubscribeSynchronously = false;
      // Resubscribe
      connect();
    };

    // Start the subscription
    connect();
  });
}
|