881ef3574a0c5f88b70fb0492c20c0cd908c5219186129e7326075a52807e534f81f276b5f4f4ac7579acaccdfb0a563d487763e2e437ecdf14d2c8ae3e567 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import { EMPTY } from '../observable/empty';
  2. import { MonoTypeOperatorFunction } from '../types';
  3. import { operate } from '../util/lift';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  5. /**
  6. * Waits for the source to complete, then emits the last N values from the source,
  7. * as specified by the `count` argument.
  8. *
  9. * ![](takeLast.png)
  10. *
  11. * `takeLast` results in an observable that will hold values up to `count` values in memory,
  12. * until the source completes. It then pushes all values in memory to the consumer, in the
  13. * order they were received from the source, then notifies the consumer that it is
  14. * complete.
  15. *
  16. * If for some reason the source completes before the `count` supplied to `takeLast` is reached,
  17. * all values received until that point are emitted, and then completion is notified.
  18. *
  19. * **Warning**: Using `takeLast` with an observable that never completes will result
  20. * in an observable that never emits a value.
  21. *
  22. * ## Example
  23. *
  24. * Take the last 3 values of an Observable with many values
  25. *
  26. * ```ts
  27. * import { range, takeLast } from 'rxjs';
  28. *
  29. * const many = range(1, 100);
  30. * const lastThree = many.pipe(takeLast(3));
  31. * lastThree.subscribe(x => console.log(x));
  32. * ```
  33. *
  34. * @see {@link take}
  35. * @see {@link takeUntil}
  36. * @see {@link takeWhile}
  37. * @see {@link skip}
  38. *
  39. * @param count The maximum number of values to emit from the end of
  40. * the sequence of values emitted by the source Observable.
  41. * @return A function that returns an Observable that emits at most the last
  42. * `count` values emitted by the source Observable.
  43. */
  44. export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
  45. return count <= 0
  46. ? () => EMPTY
  47. : operate((source, subscriber) => {
  48. // This buffer will hold the values we are going to emit
  49. // when the source completes. Since we only want to take the
  50. // last N values, we can't emit until we're sure we're not getting
  51. // any more values.
  52. let buffer: T[] = [];
  53. source.subscribe(
  54. createOperatorSubscriber(
  55. subscriber,
  56. (value) => {
  57. // Add the most recent value onto the end of our buffer.
  58. buffer.push(value);
  59. // If our buffer is now larger than the number of values we
  60. // want to take, we remove the oldest value from the buffer.
  61. count < buffer.length && buffer.shift();
  62. },
  63. () => {
  64. // The source completed, we now know what are last values
  65. // are, emit them in the order they were received.
  66. for (const value of buffer) {
  67. subscriber.next(value);
  68. }
  69. subscriber.complete();
  70. },
  71. // Errors are passed through to the consumer
  72. undefined,
  73. () => {
  74. // During finalization release the values in our buffer.
  75. buffer = null!;
  76. }
  77. )
  78. );
  79. });
  80. }