94df797dd2fe929a5e287f79d7815091583e6334710e6f3ed9c6194d1639b8f7daa727d8b9ad5f6d36ecc7cc0d2f8042621ff74cda6b0bcf7ccfad392540c1 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import { Observable } from '../Observable';
  2. import { innerFrom } from '../observable/innerFrom';
  3. import { Subject } from '../Subject';
  4. import { Subscription } from '../Subscription';
  5. import { MonoTypeOperatorFunction, ObservableInput } from '../types';
  6. import { operate } from '../util/lift';
  7. import { createOperatorSubscriber } from './OperatorSubscriber';
  8. /**
  9. * Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source
  10. * Observable calls `complete`, this method will emit to the Observable returned from `notifier`. If that Observable
  11. * calls `complete` or `error`, then this method will call `complete` or `error` on the child subscription. Otherwise
  12. * this method will resubscribe to the source Observable.
  13. *
  14. * ![](repeatWhen.png)
  15. *
  16. * ## Example
  17. *
  18. * Repeat a message stream on click
  19. *
  20. * ```ts
  21. * import { of, fromEvent, repeatWhen } from 'rxjs';
  22. *
  23. * const source = of('Repeat message');
  24. * const documentClick$ = fromEvent(document, 'click');
  25. *
  26. * const result = source.pipe(repeatWhen(() => documentClick$));
  27. *
  28. * result.subscribe(data => console.log(data))
  29. * ```
  30. *
  31. * @see {@link repeat}
  32. * @see {@link retry}
  33. * @see {@link retryWhen}
  34. *
  35. * @param notifier Function that receives an Observable of notifications with
  36. * which a user can `complete` or `error`, aborting the repetition.
  37. * @return A function that returns an Observable that mirrors the source
  38. * Observable with the exception of a `complete`.
  39. * @deprecated Will be removed in v9 or v10. Use {@link repeat}'s {@link RepeatConfig#delay delay} option instead.
  40. * Instead of `repeatWhen(() => notify$)`, use: `repeat({ delay: () => notify$ })`.
  41. */
  42. export function repeatWhen<T>(notifier: (notifications: Observable<void>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
  43. return operate((source, subscriber) => {
  44. let innerSub: Subscription | null;
  45. let syncResub = false;
  46. let completions$: Subject<void>;
  47. let isNotifierComplete = false;
  48. let isMainComplete = false;
  49. /**
  50. * Checks to see if we can complete the result, completes it, and returns `true` if it was completed.
  51. */
  52. const checkComplete = () => isMainComplete && isNotifierComplete && (subscriber.complete(), true);
  53. /**
  54. * Gets the subject to send errors through. If it doesn't exist,
  55. * we know we need to setup the notifier.
  56. */
  57. const getCompletionSubject = () => {
  58. if (!completions$) {
  59. completions$ = new Subject();
  60. // If the call to `notifier` throws, it will be caught by the OperatorSubscriber
  61. // In the main subscription -- in `subscribeForRepeatWhen`.
  62. innerFrom(notifier(completions$)).subscribe(
  63. createOperatorSubscriber(
  64. subscriber,
  65. () => {
  66. if (innerSub) {
  67. subscribeForRepeatWhen();
  68. } else {
  69. // If we don't have an innerSub yet, that's because the inner subscription
  70. // call hasn't even returned yet. We've arrived here synchronously.
  71. // So we flag that we want to resub, such that we can ensure finalization
  72. // happens before we resubscribe.
  73. syncResub = true;
  74. }
  75. },
  76. () => {
  77. isNotifierComplete = true;
  78. checkComplete();
  79. }
  80. )
  81. );
  82. }
  83. return completions$;
  84. };
  85. const subscribeForRepeatWhen = () => {
  86. isMainComplete = false;
  87. innerSub = source.subscribe(
  88. createOperatorSubscriber(subscriber, undefined, () => {
  89. isMainComplete = true;
  90. // Check to see if we are complete, and complete if so.
  91. // If we are not complete. Get the subject. This calls the `notifier` function.
  92. // If that function fails, it will throw and `.next()` will not be reached on this
  93. // line. The thrown error is caught by the _complete handler in this
  94. // `OperatorSubscriber` and handled appropriately.
  95. !checkComplete() && getCompletionSubject().next();
  96. })
  97. );
  98. if (syncResub) {
  99. // Ensure that the inner subscription is torn down before
  100. // moving on to the next subscription in the synchronous case.
  101. // If we don't do this here, all inner subscriptions will not be
  102. // torn down until the entire observable is done.
  103. innerSub.unsubscribe();
  104. // It is important to null this out. Not only to free up memory, but
  105. // to make sure code above knows we are in a subscribing state to
  106. // handle synchronous resubscription.
  107. innerSub = null;
  108. // We may need to do this multiple times, so reset the flags.
  109. syncResub = false;
  110. // Resubscribe
  111. subscribeForRepeatWhen();
  112. }
  113. };
  114. // Start the subscription
  115. subscribeForRepeatWhen();
  116. });
  117. }