46569866d16433d5608a3a5f22cfd5dc0aa5c1cee10a248a6b6c5f62c9b820f502695413b20bee203ccfe66a2cf9b011989ca253106df48f6d1e59918d0311 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import { OperatorFunction } from '../types';
  2. import { operate } from '../util/lift';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  4. import { arrRemove } from '../util/arrRemove';
  5. /**
  6. * Buffers the source Observable values until the size hits the maximum
  7. * `bufferSize` given.
  8. *
  9. * <span class="informal">Collects values from the past as an array, and emits
  10. * that array only when its size reaches `bufferSize`.</span>
  11. *
  12. * ![](bufferCount.png)
  13. *
  14. * Buffers a number of values from the source Observable by `bufferSize` then
  15. * emits the buffer and clears it, and starts a new buffer each
  16. * `startBufferEvery` values. If `startBufferEvery` is not provided or is
  17. * `null`, then new buffers are started immediately at the start of the source
  18. * and when each buffer closes and is emitted.
  19. *
  20. * ## Examples
  21. *
  22. * Emit the last two click events as an array
  23. *
  24. * ```ts
  25. * import { fromEvent, bufferCount } from 'rxjs';
  26. *
  27. * const clicks = fromEvent(document, 'click');
  28. * const buffered = clicks.pipe(bufferCount(2));
  29. * buffered.subscribe(x => console.log(x));
  30. * ```
  31. *
  32. * On every click, emit the last two click events as an array
  33. *
  34. * ```ts
  35. * import { fromEvent, bufferCount } from 'rxjs';
  36. *
  37. * const clicks = fromEvent(document, 'click');
  38. * const buffered = clicks.pipe(bufferCount(2, 1));
  39. * buffered.subscribe(x => console.log(x));
  40. * ```
  41. *
  42. * @see {@link buffer}
  43. * @see {@link bufferTime}
  44. * @see {@link bufferToggle}
  45. * @see {@link bufferWhen}
  46. * @see {@link pairwise}
  47. * @see {@link windowCount}
  48. *
  49. * @param bufferSize The maximum size of the buffer emitted.
  50. * @param startBufferEvery Interval at which to start a new buffer.
  51. * For example if `startBufferEvery` is `2`, then a new buffer will be started
  52. * on every other value from the source. A new buffer is started at the
  53. * beginning of the source by default.
  54. * @return A function that returns an Observable of arrays of buffered values.
  55. */
  56. export function bufferCount<T>(bufferSize: number, startBufferEvery: number | null = null): OperatorFunction<T, T[]> {
  57. // If no `startBufferEvery` value was supplied, then we're
  58. // opening and closing on the bufferSize itself.
  59. startBufferEvery = startBufferEvery ?? bufferSize;
  60. return operate((source, subscriber) => {
  61. let buffers: T[][] = [];
  62. let count = 0;
  63. source.subscribe(
  64. createOperatorSubscriber(
  65. subscriber,
  66. (value) => {
  67. let toEmit: T[][] | null = null;
  68. // Check to see if we need to start a buffer.
  69. // This will start one at the first value, and then
  70. // a new one every N after that.
  71. if (count++ % startBufferEvery! === 0) {
  72. buffers.push([]);
  73. }
  74. // Push our value into our active buffers.
  75. for (const buffer of buffers) {
  76. buffer.push(value);
  77. // Check to see if we're over the bufferSize
  78. // if we are, record it so we can emit it later.
  79. // If we emitted it now and removed it, it would
  80. // mutate the `buffers` array while we're looping
  81. // over it.
  82. if (bufferSize <= buffer.length) {
  83. toEmit = toEmit ?? [];
  84. toEmit.push(buffer);
  85. }
  86. }
  87. if (toEmit) {
  88. // We have found some buffers that are over the
  89. // `bufferSize`. Emit them, and remove them from our
  90. // buffers list.
  91. for (const buffer of toEmit) {
  92. arrRemove(buffers, buffer);
  93. subscriber.next(buffer);
  94. }
  95. }
  96. },
  97. () => {
  98. // When the source completes, emit all of our
  99. // active buffers.
  100. for (const buffer of buffers) {
  101. subscriber.next(buffer);
  102. }
  103. subscriber.complete();
  104. },
  105. // Pass all errors through to consumer.
  106. undefined,
  107. () => {
  108. // Clean up our memory when we finalize
  109. buffers = null!;
  110. }
  111. )
  112. );
  113. });
  114. }