123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import { Subscription } from '../Subscription';
- import { MonoTypeOperatorFunction, ObservableInput } from '../types';
- import { operate } from '../util/lift';
- import { createOperatorSubscriber } from './OperatorSubscriber';
- import { innerFrom } from '../observable/innerFrom';
/**
 * Configuration options accepted by {@link throttle} and {@link throttleTime}
 * to control which values are emitted at the edges of a throttling window.
 *
 * @see {@link throttle}
 * @see {@link throttleTime}
 */
export interface ThrottleConfig {
  /**
   * Whether to emit on the **leading** edge of a throttling window.
   *
   * When `true`, the source value that opens a throttling window (i.e. the
   * value that starts the internal timer which blocks later source values)
   * is emitted by the resulting Observable. When `false`, that value is not
   * emitted at the start of the window.
   *
   * Defaults to `true` when omitted.
   */
  leading?: boolean;

  /**
   * Whether to emit on the **trailing** edge of a throttling window.
   *
   * When `true`, the most recent source value received during a throttling
   * window is emitted by the resulting Observable when that window ends
   * (i.e. when the internal timer that blocks source values finishes). When
   * `false`, no value is emitted at the end of the window.
   *
   * Defaults to `false` when omitted.
   */
  trailing?: boolean;
}
/**
 * Lets a value from the source Observable through, then drops the source
 * values that follow for a duration determined by another Observable, and
 * then repeats this process.
 *
 * <span class="informal">It's like {@link throttleTime}, but the silencing
 * duration is determined by a second Observable.</span>
 *
 * `throttle` forwards source values to the output Observable while its
 * internal timer is disabled and ignores them while the timer is enabled.
 * The timer starts out disabled. With the default configuration, the first
 * source value to arrive is forwarded to the output Observable, and the timer
 * is then enabled by calling `durationSelector` with that value and
 * subscribing to the "duration" Observable it returns. As soon as the duration
 * Observable emits a value, the timer is disabled again and the process
 * repeats for the next source value.
 *
 * With `leading: false`, the value that opens a throttling window is stored
 * instead of being emitted. With `trailing: true`, the most recently stored
 * value (if any) is emitted when the duration Observable emits, and — unless
 * the source has already completed — a new throttling window is started from
 * that value.
 *
 * ## Example
 *
 * Emit clicks at a rate of at most one click per second
 *
 * ```ts
 * import { fromEvent, throttle, interval } from 'rxjs';
 *
 * const clicks = fromEvent(document, 'click');
 * const result = clicks.pipe(throttle(() => interval(1000)));
 *
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link audit}
 * @see {@link debounce}
 * @see {@link delayWhen}
 * @see {@link sample}
 * @see {@link throttleTime}
 *
 * @param durationSelector A function that receives a value from the source
 * Observable and returns an `ObservableInput` used to compute the silencing
 * duration for that value.
 * @param config A configuration object to define `leading` and `trailing`
 * behavior. Defaults to `{ leading: true, trailing: false }`.
 * @return A function that returns an Observable that performs the throttle
 * operation to limit the rate of emissions from the source.
 */
export function throttle<T>(durationSelector: (value: T) => ObservableInput<any>, config?: ThrottleConfig): MonoTypeOperatorFunction<T> {
  return operate((source, subscriber) => {
    const { leading: emitLeading = true, trailing: emitTrailing = false } = config ?? {};

    // Most recent source value that has not been emitted yet, if any.
    let hasPending = false;
    let pendingValue: T | null = null;
    // Subscription to the current duration Observable; `null` when idle.
    let durationSub: Subscription | null = null;
    // Set once the source Observable has completed.
    let sourceDone = false;

    // A throttling window is open only while there is a duration subscription
    // that is still open. A synchronous duration selector - weird, but legal -
    // leaves an already-closed subscription assigned, which is why `closed`
    // has to be checked in addition to the null check.
    const isThrottling = (): boolean => durationSub !== null && !durationSub.closed;

    // The duration Observable emitted: close the window and, when trailing
    // emission is enabled, flush the pending value.
    const onDurationNext = () => {
      durationSub?.unsubscribe();
      durationSub = null;
      if (!emitTrailing) {
        return;
      }
      emitPending();
      if (sourceDone) {
        subscriber.complete();
      }
    };

    // The duration Observable completed without emitting: close the window
    // without flushing anything.
    const onDurationComplete = () => {
      durationSub = null;
      if (sourceDone) {
        subscriber.complete();
      }
    };

    // Opens a throttling window whose length is derived from `value`.
    const beginThrottle = (value: T): void => {
      const duration = innerFrom(durationSelector(value));
      durationSub = duration.subscribe(createOperatorSubscriber(subscriber, onDurationNext, onDurationComplete));
    };

    // Emits the pending value (if there is one) and opens a new window.
    const emitPending = () => {
      if (!hasPending) {
        return;
      }
      // The pending state must be reset *before* emitting; otherwise
      // reentrant code triggered by the emission can cause issues here.
      hasPending = false;
      const value = pendingValue!;
      pendingValue = null;
      subscriber.next(value);
      if (!sourceDone) {
        beginThrottle(value);
      }
    };

    source.subscribe(
      createOperatorSubscriber(
        subscriber,
        (value) => {
          // Always remember the latest value; it may become the trailing one.
          hasPending = true;
          pendingValue = value;
          if (isThrottling()) {
            return;
          }
          if (emitLeading) {
            emitPending();
          } else {
            beginThrottle(value);
          }
        },
        () => {
          sourceDone = true;
          // Hold back completion only while a trailing value is still waiting
          // for an open throttling window to end.
          const awaitingTrailing = emitTrailing && hasPending && isThrottling();
          if (!awaitingTrailing) {
            subscriber.complete();
          }
        }
      )
    );
  });
}
|