5800c62c2949e4ce5ec61d529e1c4969decad68b71359b8452d667b1417aeafc92b1d72635d2424e9213a31303505727d5b9b0e1d4b19208b88b531e32f739 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import { Operator } from '../Operator';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
  6. import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
  7. /**
  8. * Emits a value from the source Observable only after a particular time span
  9. * determined by another Observable has passed without another source emission.
  10. *
  11. * <span class="informal">It's like {@link debounceTime}, but the time span of
  12. * emission silence is determined by a second Observable.</span>
  13. *
  14. * ![](debounce.png)
  15. *
  16. * `debounce` delays values emitted by the source Observable, but drops previous
  17. * pending delayed emissions if a new value arrives on the source Observable.
  18. * This operator keeps track of the most recent value from the source
  19. * Observable, and spawns a duration Observable by calling the
  20. * `durationSelector` function. The value is emitted only when the duration
  21. * Observable emits a value or completes, and if no other value was emitted on
  22. * the source Observable since the duration Observable was spawned. If a new
  23. * value appears before the duration Observable emits, the previous value will
  24. * be dropped and will not be emitted on the output Observable.
  25. *
  26. * Like {@link debounceTime}, this is a rate-limiting operator, and also a
  27. * delay-like operator since output emissions do not necessarily occur at the
  28. * same time as they did on the source Observable.
  29. *
  30. * ## Example
  31. * Emit the most recent click after a burst of clicks
  32. * ```ts
  33. * import { fromEvent, interval } from 'rxjs';
  34. * import { debounce } from 'rxjs/operators';
  35. *
  36. * const clicks = fromEvent(document, 'click');
  37. * const result = clicks.pipe(debounce(() => interval(1000)));
  38. * result.subscribe(x => console.log(x));
  39. * ```
  40. *
  41. * @see {@link audit}
  42. * @see {@link debounceTime}
  43. * @see {@link delayWhen}
  44. * @see {@link throttle}
  45. *
  46. * @param {function(value: T): SubscribableOrPromise} durationSelector A function
  47. * that receives a value from the source Observable, for computing the timeout
  48. * duration for each source value, returned as an Observable or a Promise.
  49. * @return {Observable} An Observable that delays the emissions of the source
  50. * Observable by the specified duration Observable returned by
  51. * `durationSelector`, and may drop some values if they occur too frequently.
  52. * @method debounce
  53. * @owner Observable
  54. */
  55. export function debounce<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
  56. return (source: Observable<T>) => source.lift(new DebounceOperator(durationSelector));
  57. }
  58. class DebounceOperator<T> implements Operator<T, T> {
  59. constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>) {
  60. }
  61. call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  62. return source.subscribe(new DebounceSubscriber(subscriber, this.durationSelector));
  63. }
  64. }
  65. /**
  66. * We need this JSDoc comment for affecting ESDoc.
  67. * @ignore
  68. * @extends {Ignored}
  69. */
  70. class DebounceSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
  71. private value?: T;
  72. private hasValue = false;
  73. private durationSubscription?: Subscription;
  74. constructor(destination: Subscriber<R>,
  75. private durationSelector: (value: T) => SubscribableOrPromise<any>) {
  76. super(destination);
  77. }
  78. protected _next(value: T): void {
  79. try {
  80. const result = this.durationSelector.call(this, value);
  81. if (result) {
  82. this._tryNext(value, result);
  83. }
  84. } catch (err) {
  85. this.destination.error!(err);
  86. }
  87. }
  88. protected _complete(): void {
  89. this.emitValue();
  90. this.destination.complete!();
  91. }
  92. private _tryNext(value: T, duration: SubscribableOrPromise<any>): void {
  93. let subscription = this.durationSubscription;
  94. this.value = value;
  95. this.hasValue = true;
  96. if (subscription) {
  97. subscription.unsubscribe();
  98. this.remove(subscription);
  99. }
  100. subscription = innerSubscribe(duration, new SimpleInnerSubscriber(this));
  101. if (subscription && !subscription.closed) {
  102. this.add(this.durationSubscription = subscription);
  103. }
  104. }
  105. notifyNext(): void {
  106. this.emitValue();
  107. }
  108. notifyComplete(): void {
  109. this.emitValue();
  110. }
  111. emitValue(): void {
  112. if (this.hasValue) {
  113. const value = this.value;
  114. const subscription = this.durationSubscription;
  115. if (subscription) {
  116. this.durationSubscription = undefined;
  117. subscription.unsubscribe();
  118. this.remove(subscription);
  119. }
  120. // This must be done *before* passing the value
  121. // along to the destination because it's possible for
  122. // the value to synchronously re-enter this operator
  123. // recursively if the duration selector Observable
  124. // emits synchronously
  125. this.value = undefined;
  126. this.hasValue = false;
  127. super._next(value!);
  128. }
  129. }
  130. }