123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import { EMPTY } from '../observable/empty';
- import { MonoTypeOperatorFunction } from '../types';
- import { operate } from '../util/lift';
- import { createOperatorSubscriber } from './OperatorSubscriber';
/**
 * Buffers values from the source and, once the source completes, emits the
 * final `count` of them followed by a completion notification.
 *
 * While the source is active, at most `count` values are retained in memory.
 * When the source completes, the retained values are delivered to the consumer
 * in the same order in which the source produced them, and then the consumer
 * is notified of completion.
 *
 * When the source completes after producing fewer than `count` values, every
 * value it produced is emitted before the completion notification.
 *
 * When `count` is less than or equal to zero, the source is never subscribed
 * to; the returned operator yields `EMPTY` instead.
 *
 * **Warning**: Nothing is emitted until the source completes, so applying
 * `takeLast` to an observable that never completes results in an observable
 * that never emits a value.
 *
 * ## Example
 *
 * Take the last 3 values of an Observable with many values
 *
 * ```ts
 * import { range, takeLast } from 'rxjs';
 *
 * const many = range(1, 100);
 * const lastThree = many.pipe(takeLast(3));
 * lastThree.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link take}
 * @see {@link takeUntil}
 * @see {@link takeWhile}
 * @see {@link skip}
 *
 * @param count The maximum number of values to emit from the end of
 * the sequence of values emitted by the source Observable.
 * @return A function that returns an Observable that emits at most the last
 * `count` values emitted by the source Observable.
 */
export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
  // Nothing can be taken for a non-positive count, so skip the source entirely.
  if (count <= 0) {
    return () => EMPTY;
  }

  return operate((source, subscriber) => {
    // Sliding window over the most recent values. Emission has to wait for
    // completion because only then is it known which values are the last ones.
    let pending: T[] = [];

    const onNext = (value: T) => {
      // Append the newest value, then drop the oldest one if the window
      // has grown beyond the requested size.
      pending.push(value);
      if (pending.length > count) {
        pending.shift();
      }
    };

    const onComplete = () => {
      // The source is done: flush the window in arrival order, then complete.
      for (const retained of pending) {
        subscriber.next(retained);
      }
      subscriber.complete();
    };

    const onFinalize = () => {
      // Drop the reference to the window so its values can be released.
      pending = null!;
    };

    // No error handler is supplied (`undefined`), so errors are passed
    // through to the consumer.
    source.subscribe(createOperatorSubscriber(subscriber, onNext, onComplete, undefined, onFinalize));
  });
}
|