| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import { Subscriber } from '../Subscriber';
- import { ObservableInput, OperatorFunction } from '../types';
- import { operate } from '../util/lift';
- import { noop } from '../util/noop';
- import { createOperatorSubscriber } from './OperatorSubscriber';
- import { innerFrom } from '../observable/innerFrom';
- /**
- * Buffers the source Observable values, using a factory function of closing
- * Observables to determine when to close, emit, and reset the buffer.
- *
- * <span class="informal">Collects values from the past as an array. When it
- * starts collecting values, it calls a function that returns an Observable that
- * tells when to close the buffer and restart collecting.</span>
- *
- * 
- *
- * Opens a buffer immediately, then closes the buffer when the observable
- * returned by calling `closingSelector` function emits a value. When it closes
- * the buffer, it immediately opens a new buffer and repeats the process.
- *
- * ## Example
- *
- * Emit an array of the last clicks every [1-5] random seconds
- *
- * ```ts
- * import { fromEvent, bufferWhen, interval } from 'rxjs';
- *
- * const clicks = fromEvent(document, 'click');
- * const buffered = clicks.pipe(
- * bufferWhen(() => interval(1000 + Math.random() * 4000))
- * );
- * buffered.subscribe(x => console.log(x));
- * ```
- *
- * @see {@link buffer}
- * @see {@link bufferCount}
- * @see {@link bufferTime}
- * @see {@link bufferToggle}
- * @see {@link windowWhen}
- *
- * @param closingSelector A function that takes no arguments and returns an
- * Observable that signals buffer closure.
- * @return A function that returns an Observable of arrays of buffered values.
- */
- export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
- return operate((source, subscriber) => {
- // The buffer we keep and emit.
- let buffer: T[] | null = null;
- // A reference to the subscriber used to subscribe to
- // the closing notifier. We need to hold this so we can
- // end the subscription after the first notification.
- let closingSubscriber: Subscriber<T> | null = null;
- // Ends the previous closing notifier subscription, so it
- // terminates after the first emission, then emits
- // the current buffer if there is one, starts a new buffer, and starts a
- // new closing notifier.
- const openBuffer = () => {
- // Make sure to finalize the closing subscription, we only cared
- // about one notification.
- closingSubscriber?.unsubscribe();
- // emit the buffer if we have one, and start a new buffer.
- const b = buffer;
- buffer = [];
- b && subscriber.next(b);
- // Get a new closing notifier and subscribe to it.
- innerFrom(closingSelector()).subscribe((closingSubscriber = createOperatorSubscriber(subscriber, openBuffer, noop)));
- };
- // Start the first buffer.
- openBuffer();
- // Subscribe to our source.
- source.subscribe(
- createOperatorSubscriber(
- subscriber,
- // Add every new value to the current buffer.
- (value) => buffer?.push(value),
- // When we complete, emit the buffer if we have one,
- // then complete the result.
- () => {
- buffer && subscriber.next(buffer);
- subscriber.complete();
- },
- // Pass all errors through to consumer.
- undefined,
- // Release memory on finalization
- () => (buffer = closingSubscriber = null!)
- )
- );
- });
- }
|