f9ba82c0a75ee7de1a9b1950c6ee3d32f20a7aacc38de8776468c5dd1e3190e79c5f5db43a363ce577d5772dd816fddbea707b5822d8c4d18f421e7a6b77d9 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import { Subscription } from '../Subscription';
  2. import { MonoTypeOperatorFunction, ObservableInput } from '../types';
  3. import { operate } from '../util/lift';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  5. import { innerFrom } from '../observable/innerFrom';
  /**
   * Configuration object accepted by {@link throttle} and {@link throttleTime}.
   * It selects which edge(s) of a throttling window produce an emission.
   *
   * Both properties are optional; an omitted (or `undefined`) property falls
   * back to its documented default.
   *
   * @see {@link throttle}
   * @see {@link throttleTime}
   */
  export interface ThrottleConfig {
    /**
     * Controls emission at the **end** of a throttling window, i.e. when the
     * internal timer that blocks source values stops.
     *
     * - `true`: the most recent source value received is emitted when the
     *   window ends.
     * - `false`: nothing is emitted when the window ends.
     *
     * Defaults to `false` when not provided.
     */
    trailing?: boolean;

    /**
     * Controls emission at the **start** of a throttling window, i.e. when the
     * internal timer that blocks source values is started.
     *
     * - `true`: the source value that opens the window is emitted right away.
     * - `false`: the source value that opens the window is not emitted at the
     *   start of the window.
     *
     * Defaults to `true` when not provided.
     */
    leading?: boolean;
  }
  /**
   * Lets a source value through, then drops the source values that follow for
   * as long as a second, "duration" Observable stays silent, and then starts
   * the cycle again.
   *
   * <span class="informal">It's like {@link throttleTime}, but the silencing
   * duration is determined by a second Observable.</span>
   *
   * ![](throttle.svg)
   *
   * The operator starts out with no throttling window open. With the default
   * configuration, the first source value is forwarded to the output and a
   * window is opened by calling `durationSelector` with that value and
   * subscribing to the `ObservableInput` it returns. While that subscription is
   * open, incoming source values are only remembered (the latest one wins), not
   * emitted. As soon as the duration Observable emits, the window is closed and
   * the next source value begins a new cycle.
   *
   * Edge behavior is controlled by `config`:
   *
   * - `leading: false` opens the window on the first value without emitting it.
   * - `trailing: true` emits the latest remembered value when the duration
   *   Observable emits and, unless the source has already completed, opens a
   *   new window for that emitted value.
   *
   * If the duration Observable completes without emitting, the window is simply
   * closed and no trailing value is emitted. When the source completes while a
   * trailing value is still waiting inside an open window, completion of the
   * output is postponed until that window ends; otherwise the output completes
   * immediately.
   *
   * ## Example
   *
   * Emit clicks at a rate of at most one click per second
   *
   * ```ts
   * import { fromEvent, throttle, interval } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const result = clicks.pipe(throttle(() => interval(1000)));
   *
   * result.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link audit}
   * @see {@link debounce}
   * @see {@link delayWhen}
   * @see {@link sample}
   * @see {@link throttleTime}
   *
   * @param durationSelector A function that receives a value from the source
   * Observable and returns an `ObservableInput` whose first emission ends the
   * silencing period started for that value.
   * @param config A configuration object to define `leading` and `trailing`
   * behavior. Defaults to `{ leading: true, trailing: false }`.
   * @return A function that returns an Observable that performs the throttle
   * operation to limit the rate of emissions from the source.
   */
  export function throttle<T>(durationSelector: (value: T) => ObservableInput<any>, config?: ThrottleConfig): MonoTypeOperatorFunction<T> {
    return operate((source, subscriber) => {
      // Read the options once per subscription; `undefined` entries fall back to
      // the defaults.
      const { leading: emitOnStart = true, trailing: emitOnEnd = false } = config ?? {};

      // Latest source value that has not been emitted yet, plus a flag telling
      // whether such a value exists (the value itself may legitimately be falsy).
      let pendingValue: T | null = null;
      let hasPending = false;
      // Subscription to the duration Observable of the current window, if any.
      let activeWindow: Subscription | null = null;
      // Set once the source has completed.
      let sourceDone = false;

      // A window counts as open only while its subscription is still live. A
      // synchronous duration Observable - unusual, but legal - leaves an
      // already-closed subscription in `activeWindow`, hence the `closed` check.
      const windowIsOpen = (): boolean => activeWindow !== null && !activeWindow.closed;

      // Opens a new window for `value` by subscribing to its duration Observable.
      function openWindow(value: T): void {
        const duration = innerFrom(durationSelector(value));
        activeWindow = duration.subscribe(createOperatorSubscriber(subscriber, onDurationEmit, onDurationComplete));
      }

      // Emits the pending value, if there is one, and starts the next window.
      function flushPending(): void {
        if (!hasPending) {
          return;
        }
        // Reset the pending state before emitting so that reentrant calls
        // triggered by the emission observe a consistent state.
        const value = pendingValue!;
        hasPending = false;
        pendingValue = null;

        subscriber.next(value);
        if (!sourceDone) {
          openWindow(value);
        }
      }

      // The duration Observable emitted: the current window is over.
      function onDurationEmit(): void {
        activeWindow?.unsubscribe();
        activeWindow = null;
        if (emitOnEnd) {
          flushPending();
          if (sourceDone) {
            subscriber.complete();
          }
        }
      }

      // The duration Observable completed: drop the window without emitting.
      function onDurationComplete(): void {
        activeWindow = null;
        if (sourceDone) {
          subscriber.complete();
        }
      }

      source.subscribe(
        createOperatorSubscriber(
          subscriber,
          (value) => {
            // Always remember the newest value; whether it is emitted depends on
            // the window state and the configured edges.
            pendingValue = value;
            hasPending = true;
            if (windowIsOpen()) {
              return;
            }
            if (emitOnStart) {
              flushPending();
            } else {
              openWindow(value);
            }
          },
          () => {
            sourceDone = true;
            // Hold back completion only when a trailing value is still waiting
            // for an open window to end.
            const awaitingTrailing = emitOnEnd && hasPending && windowIsOpen();
            if (!awaitingTrailing) {
              subscriber.complete();
            }
          }
        )
      );
    });
  }
  135. }