c2bd13e8c72d8228f8bd90efc43cc78bc2504482cee367d1a4cb44616970834edf905121e1f22b7ea7f048f8bfd6f0aaed42afd93d4789278b7881c7a30796 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import { MonoTypeOperatorFunction } from '../types';
  2. import { identity } from '../util/identity';
  3. import { operate } from '../util/lift';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  /**
   * Skip a specified number of values before the completion of an observable.
   *
   * ![](skipLast.png)
   *
   * Returns an observable that will emit values as soon as it can, given a number of
   * skipped values. For example, if you `skipLast(3)` on a source, when the source
   * emits its fourth value, the first value the source emitted will finally be emitted
   * from the returned observable, as it is no longer part of what needs to be skipped.
   *
   * All values emitted by the result of `skipLast(N)` will be delayed by `N` emissions,
   * as each value is held in a buffer until enough values have been emitted that
   * the buffered value may finally be sent to the consumer.
   *
   * After subscribing, unsubscribing will not result in the emission of the buffered
   * skipped values.
   *
   * ## Example
   *
   * Skip the last 2 values of an observable with many values
   *
   * ```ts
   * import { of, skipLast } from 'rxjs';
   *
   * const numbers = of(1, 2, 3, 4, 5);
   * const skipLastTwo = numbers.pipe(skipLast(2));
   * skipLastTwo.subscribe(x => console.log(x));
   *
   * // Results in:
   * // 1 2 3
   * // (4 and 5 are skipped)
   * ```
   *
   * @see {@link skip}
   * @see {@link skipUntil}
   * @see {@link skipWhile}
   * @see {@link take}
   *
   * @param skipCount Number of elements to skip from the end of the source Observable.
   * Fractional values are rounded down. If the rounded value is not greater than zero
   * (this includes `NaN`), the source is mirrored unchanged.
   * @return A function that returns an Observable that skips the last `skipCount`
   * values emitted by the source Observable.
   */
  export function skipLast<T>(skipCount: number): MonoTypeOperatorFunction<T> {
    // Normalize the count once, up front. The ring-buffer arithmetic below
    // (`valueIndex % count`) only produces valid slots for a positive integer.
    const count = Math.floor(skipCount);

    // `!(count > 0)` rather than `count <= 0` so that `NaN` also takes the
    // pass-through path instead of reaching the buffering logic.
    return !(count > 0)
      ? // Nothing to skip, so we are just mirroring the source.
        identity
      : operate((source, subscriber) => {
          // A ring buffer to hold the values while we wait to see whether each
          // one can be emitted or is part of the "skipped" last values. It never
          // holds more than `count` entries. It is grown on demand instead of
          // being pre-sized with `new Array(count)`, because the Array
          // constructor throws a RangeError for lengths above 2^32 - 1 (for
          // example `Infinity`). Note that with a count of `Infinity` nothing is
          // ever emitted and every value stays buffered until teardown.
          let ring: T[] = [];
          // The number of values seen so far. This is used to get
          // the index of the current value when it arrives.
          let seen = 0;
          source.subscribe(
            createOperatorSubscriber(subscriber, (value) => {
              // Get the index of the value we have right now relative to all
              // other values we've seen, then increment `seen`. This ensures
              // we've moved to the next slot in our ring buffer.
              const valueIndex = seen++;
              if (valueIndex < count) {
                // We haven't seen enough values to fill the buffer yet, so
                // nothing can be emitted. Just keep filling the ring buffer.
                ring[valueIndex] = value;
              } else {
                // Traverse the ring so that when we get to the end,
                // we loop back and go to the start.
                const index = valueIndex % count;
                // Pull the oldest value out so we can emit it,
                // and stuff the new value in its place.
                const oldValue = ring[index];
                ring[index] = value;
                // Emit the old value. It is important that this happens after
                // we swap the value in the buffer: if it happened before, a
                // synchronous source could get the buffer out of whack.
                subscriber.next(oldValue);
              }
            })
          );
          return () => {
            // Release our values in memory
            ring = null!;
          };
        });
  }