e269fb0d093210b2d208fc875d9b37fa40748ec2808964fa90667340b27aa4600049754adfef6088ea855fd5e26994ebcab5a7a28204512e33f6d9362245f9 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import { Subscription } from '../Subscription';
  2. import { EMPTY } from '../observable/empty';
  3. import { operate } from '../util/lift';
  4. import { MonoTypeOperatorFunction, ObservableInput } from '../types';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { innerFrom } from '../observable/innerFrom';
  7. import { timer } from '../observable/timer';
  8. export interface RepeatConfig {
  9. /**
  10. * The number of times to repeat the source. Defaults to `Infinity`.
  11. */
  12. count?: number;
  13. /**
  14. * If a `number`, will delay the repeat of the source by that number of milliseconds.
  15. * If a function, it will provide the number of times the source has been subscribed to,
  16. * and the return value should be a valid observable input that will notify when the source
  17. * should be repeated. If the notifier observable is empty, the result will complete.
  18. */
  19. delay?: number | ((count: number) => ObservableInput<any>);
  20. }
  21. /**
  22. * Returns an Observable that will resubscribe to the source stream when the source stream completes.
  23. *
  24. * <span class="informal">Repeats all values emitted on the source. It's like {@link retry}, but for non error cases.</span>
  25. *
  26. * ![](repeat.png)
  27. *
  28. * Repeat will output values from a source until the source completes, then it will resubscribe to the
  29. * source a specified number of times, with a specified delay. Repeat can be particularly useful in
  30. * combination with closing operators like {@link take}, {@link takeUntil}, {@link first}, or {@link takeWhile},
  31. * as it can be used to restart a source again from scratch.
  32. *
  33. * Repeat is very similar to {@link retry}, where {@link retry} will resubscribe to the source in the error case, but
  34. * `repeat` will resubscribe if the source completes.
  35. *
  36. * Note that `repeat` will _not_ catch errors. Use {@link retry} for that.
  37. *
  38. * - `repeat(0)` returns an empty observable
  39. * - `repeat()` will repeat forever
  40. * - `repeat({ delay: 200 })` will repeat forever, with a delay of 200ms between repetitions.
  41. * - `repeat({ count: 2, delay: 400 })` will repeat twice, with a delay of 400ms between repetitions.
  42. * - `repeat({ delay: (count) => timer(count * 1000) })` will repeat forever, but will have a delay that grows by one second for each repetition.
  43. *
  44. * ## Example
  45. *
  46. * Repeat a message stream
  47. *
  48. * ```ts
  49. * import { of, repeat } from 'rxjs';
  50. *
  51. * const source = of('Repeat message');
  52. * const result = source.pipe(repeat(3));
  53. *
  54. * result.subscribe(x => console.log(x));
  55. *
  56. * // Results
  57. * // 'Repeat message'
  58. * // 'Repeat message'
  59. * // 'Repeat message'
  60. * ```
  61. *
  62. * Repeat 3 values, 2 times
  63. *
  64. * ```ts
  65. * import { interval, take, repeat } from 'rxjs';
  66. *
  67. * const source = interval(1000);
  68. * const result = source.pipe(take(3), repeat(2));
  69. *
  70. * result.subscribe(x => console.log(x));
  71. *
  72. * // Results every second
  73. * // 0
  74. * // 1
  75. * // 2
  76. * // 0
  77. * // 1
  78. * // 2
  79. * ```
  80. *
  81. * Defining two complex repeats with delays on the same source.
  82. * Note that the second repeat cannot be called until the first
  83. * repeat as exhausted it's count.
  84. *
  85. * ```ts
  86. * import { defer, of, repeat } from 'rxjs';
  87. *
  88. * const source = defer(() => {
  89. * return of(`Hello, it is ${new Date()}`)
  90. * });
  91. *
  92. * source.pipe(
  93. * // Repeat 3 times with a delay of 1 second between repetitions
  94. * repeat({
  95. * count: 3,
  96. * delay: 1000,
  97. * }),
  98. *
  99. * // *Then* repeat forever, but with an exponential step-back
  100. * // maxing out at 1 minute.
  101. * repeat({
  102. * delay: (count) => timer(Math.min(60000, 2 ^ count * 1000))
  103. * })
  104. * )
  105. * ```
  106. *
  107. * @see {@link repeatWhen}
  108. * @see {@link retry}
  109. *
  110. * @param countOrConfig Either the number of times the source Observable items are repeated
  111. * (a count of 0 will yield an empty Observable) or a {@link RepeatConfig} object.
  112. */
  113. export function repeat<T>(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction<T> {
  114. let count = Infinity;
  115. let delay: RepeatConfig['delay'];
  116. if (countOrConfig != null) {
  117. if (typeof countOrConfig === 'object') {
  118. ({ count = Infinity, delay } = countOrConfig);
  119. } else {
  120. count = countOrConfig;
  121. }
  122. }
  123. return count <= 0
  124. ? () => EMPTY
  125. : operate((source, subscriber) => {
  126. let soFar = 0;
  127. let sourceSub: Subscription | null;
  128. const resubscribe = () => {
  129. sourceSub?.unsubscribe();
  130. sourceSub = null;
  131. if (delay != null) {
  132. const notifier = typeof delay === 'number' ? timer(delay) : innerFrom(delay(soFar));
  133. const notifierSubscriber = createOperatorSubscriber(subscriber, () => {
  134. notifierSubscriber.unsubscribe();
  135. subscribeToSource();
  136. });
  137. notifier.subscribe(notifierSubscriber);
  138. } else {
  139. subscribeToSource();
  140. }
  141. };
  142. const subscribeToSource = () => {
  143. let syncUnsub = false;
  144. sourceSub = source.subscribe(
  145. createOperatorSubscriber(subscriber, undefined, () => {
  146. if (++soFar < count) {
  147. if (sourceSub) {
  148. resubscribe();
  149. } else {
  150. syncUnsub = true;
  151. }
  152. } else {
  153. subscriber.complete();
  154. }
  155. })
  156. );
  157. if (syncUnsub) {
  158. resubscribe();
  159. }
  160. };
  161. subscribeToSource();
  162. });
  163. }