ae9ec378f700cdedf4939d945beeedcd3f7eecf1f023f40c41f37cfaabe8e47aa9d7def9bab7c4e000dc3c2d42fbc353737974d0c24f99ab53afbcf63fb546 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import { Subscriber } from '../Subscriber';
  2. import { MonoTypeOperatorFunction, ObservableInput } from '../types';
  3. import { operate } from '../util/lift';
  4. import { noop } from '../util/noop';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { innerFrom } from '../observable/innerFrom';
  7. /**
  8. * Emits a notification from the source Observable only after a particular time span
  9. * determined by another Observable has passed without another source emission.
  10. *
  11. * <span class="informal">It's like {@link debounceTime}, but the time span of
  12. * emission silence is determined by a second Observable.</span>
  13. *
  14. * ![](debounce.svg)
  15. *
  16. * `debounce` delays notifications emitted by the source Observable, but drops previous
  17. * pending delayed emissions if a new notification arrives on the source Observable.
  18. * This operator keeps track of the most recent notification from the source
  19. * Observable, and spawns a duration Observable by calling the
  20. * `durationSelector` function. The notification is emitted only when the duration
  21. * Observable emits a next notification, and if no other notification was emitted on
  22. * the source Observable since the duration Observable was spawned. If a new
  23. * notification appears before the duration Observable emits, the previous notification will
  24. * not be emitted and a new duration is scheduled from `durationSelector`.
  25. * If the completing event happens during the scheduled duration the last cached notification
  26. * is emitted before the completion event is forwarded to the output observable.
  27. * If an error event happens during the scheduled duration or after it, only the error event is
  28. * forwarded to the output observable. The cached notification is not emitted in this case.
  29. *
  30. * Like {@link debounceTime}, this is a rate-limiting operator, and also a
  31. * delay-like operator since output emissions do not necessarily occur at the
  32. * same time as they did on the source Observable.
  33. *
  34. * ## Example
  35. *
  36. * Emit the most recent click after a burst of clicks
  37. *
  38. * ```ts
  39. * import { fromEvent, scan, debounce, interval } from 'rxjs';
  40. *
  41. * const clicks = fromEvent(document, 'click');
  42. * const result = clicks.pipe(
  43. * scan(i => ++i, 1),
  44. * debounce(i => interval(200 * i))
  45. * );
  46. * result.subscribe(x => console.log(x));
  47. * ```
  48. *
  49. * @see {@link audit}
  50. * @see {@link auditTime}
  51. * @see {@link debounceTime}
  52. * @see {@link delay}
  53. * @see {@link sample}
  54. * @see {@link sampleTime}
  55. * @see {@link throttle}
  56. * @see {@link throttleTime}
  57. *
  58. * @param durationSelector A function
  59. * that receives a value from the source Observable, for computing the timeout
  60. * duration for each source value, returned as an Observable or a Promise.
  61. * @return A function that returns an Observable that delays the emissions of
  62. * the source Observable by the specified duration Observable returned by
  63. * `durationSelector`, and may drop some values if they occur too frequently.
  64. */
  65. export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
  66. return operate((source, subscriber) => {
  67. let hasValue = false;
  68. let lastValue: T | null = null;
  69. // The subscriber/subscription for the current debounce, if there is one.
  70. let durationSubscriber: Subscriber<any> | null = null;
  71. const emit = () => {
  72. // Unsubscribe any current debounce subscription we have,
  73. // we only cared about the first notification from it, and we
  74. // want to clean that subscription up as soon as possible.
  75. durationSubscriber?.unsubscribe();
  76. durationSubscriber = null;
  77. if (hasValue) {
  78. // We have a value! Free up memory first, then emit the value.
  79. hasValue = false;
  80. const value = lastValue!;
  81. lastValue = null;
  82. subscriber.next(value);
  83. }
  84. };
  85. source.subscribe(
  86. createOperatorSubscriber(
  87. subscriber,
  88. (value: T) => {
  89. // Cancel any pending debounce duration. We don't
  90. // need to null it out here yet tho, because we're just going
  91. // to create another one in a few lines.
  92. durationSubscriber?.unsubscribe();
  93. hasValue = true;
  94. lastValue = value;
  95. // Capture our duration subscriber, so we can unsubscribe it when we're notified
  96. // and we're going to emit the value.
  97. durationSubscriber = createOperatorSubscriber(subscriber, emit, noop);
  98. // Subscribe to the duration.
  99. innerFrom(durationSelector(value)).subscribe(durationSubscriber);
  100. },
  101. () => {
  102. // Source completed.
  103. // Emit any pending debounced values then complete
  104. emit();
  105. subscriber.complete();
  106. },
  107. // Pass all errors through to consumer
  108. undefined,
  109. () => {
  110. // Finalization.
  111. lastValue = durationSubscriber = null;
  112. }
  113. )
  114. );
  115. });
  116. }