1202e29ae3b5e36f336d0cfa9306232a5ddc4074ae9826ae1eaa61cab86645a637a4b38dee605994a29a0f04f845e247f09f4a23016fefae330f70c50e9746 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import { Subscription } from '../Subscription';
  2. import { OperatorFunction, SchedulerLike } from '../types';
  3. import { operate } from '../util/lift';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  5. import { arrRemove } from '../util/arrRemove';
  6. import { asyncScheduler } from '../scheduler/async';
  7. import { popScheduler } from '../util/args';
  8. import { executeSchedule } from '../util/executeSchedule';
  9. export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
  10. export function bufferTime<T>(
  11. bufferTimeSpan: number,
  12. bufferCreationInterval: number | null | undefined,
  13. scheduler?: SchedulerLike
  14. ): OperatorFunction<T, T[]>;
  15. export function bufferTime<T>(
  16. bufferTimeSpan: number,
  17. bufferCreationInterval: number | null | undefined,
  18. maxBufferSize: number,
  19. scheduler?: SchedulerLike
  20. ): OperatorFunction<T, T[]>;
  21. /**
  22. * Buffers the source Observable values for a specific time period.
  23. *
  24. * <span class="informal">Collects values from the past as an array, and emits
  25. * those arrays periodically in time.</span>
  26. *
  27. * ![](bufferTime.png)
  28. *
  29. * Buffers values from the source for a specific time duration `bufferTimeSpan`.
  30. * Unless the optional argument `bufferCreationInterval` is given, it emits and
  31. * resets the buffer every `bufferTimeSpan` milliseconds. If
  32. * `bufferCreationInterval` is given, this operator opens the buffer every
  33. * `bufferCreationInterval` milliseconds and closes (emits and resets) the
  34. * buffer every `bufferTimeSpan` milliseconds. When the optional argument
  35. * `maxBufferSize` is specified, the buffer will be closed either after
  36. * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
  37. *
  38. * ## Examples
  39. *
  40. * Every second, emit an array of the recent click events
  41. *
  42. * ```ts
  43. * import { fromEvent, bufferTime } from 'rxjs';
  44. *
  45. * const clicks = fromEvent(document, 'click');
  46. * const buffered = clicks.pipe(bufferTime(1000));
  47. * buffered.subscribe(x => console.log(x));
  48. * ```
  49. *
  50. * Every 5 seconds, emit the click events from the next 2 seconds
  51. *
  52. * ```ts
  53. * import { fromEvent, bufferTime } from 'rxjs';
  54. *
  55. * const clicks = fromEvent(document, 'click');
  56. * const buffered = clicks.pipe(bufferTime(2000, 5000));
  57. * buffered.subscribe(x => console.log(x));
  58. * ```
  59. *
  60. * @see {@link buffer}
  61. * @see {@link bufferCount}
  62. * @see {@link bufferToggle}
  63. * @see {@link bufferWhen}
  64. * @see {@link windowTime}
  65. *
  66. * @param bufferTimeSpan The amount of time to fill each buffer array.
  67. * @param otherArgs Other configuration arguments such as:
  68. * - `bufferCreationInterval` - the interval at which to start new buffers;
  69. * - `maxBufferSize` - the maximum buffer size;
  70. * - `scheduler` - the scheduler on which to schedule the intervals that determine buffer boundaries.
  71. * @return A function that returns an Observable of arrays of buffered values.
  72. */
  73. export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): OperatorFunction<T, T[]> {
  74. const scheduler = popScheduler(otherArgs) ?? asyncScheduler;
  75. const bufferCreationInterval = (otherArgs[0] as number) ?? null;
  76. const maxBufferSize = (otherArgs[1] as number) || Infinity;
  77. return operate((source, subscriber) => {
  78. // The active buffers, their related subscriptions, and removal functions.
  79. let bufferRecords: { buffer: T[]; subs: Subscription }[] | null = [];
  80. // If true, it means that every time we emit a buffer, we want to start a new buffer
  81. // this is only really used for when *just* the buffer time span is passed.
  82. let restartOnEmit = false;
  83. /**
  84. * Does the work of emitting the buffer from the record, ensuring that the
  85. * record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
  86. * does not alter the buffer. Also checks to see if a new buffer needs to be started
  87. * after the emit.
  88. */
  89. const emit = (record: { buffer: T[]; subs: Subscription }) => {
  90. const { buffer, subs } = record;
  91. subs.unsubscribe();
  92. arrRemove(bufferRecords, record);
  93. subscriber.next(buffer);
  94. restartOnEmit && startBuffer();
  95. };
  96. /**
  97. * Called every time we start a new buffer. This does
  98. * the work of scheduling a job at the requested bufferTimeSpan
  99. * that will emit the buffer (if it's not unsubscribed before then).
  100. */
  101. const startBuffer = () => {
  102. if (bufferRecords) {
  103. const subs = new Subscription();
  104. subscriber.add(subs);
  105. const buffer: T[] = [];
  106. const record = {
  107. buffer,
  108. subs,
  109. };
  110. bufferRecords.push(record);
  111. executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan);
  112. }
  113. };
  114. if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
  115. // The user passed both a bufferTimeSpan (required), and a creation interval
  116. // That means we need to start new buffers on the interval, and those buffers need
  117. // to wait the required time span before emitting.
  118. executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true);
  119. } else {
  120. restartOnEmit = true;
  121. }
  122. startBuffer();
  123. const bufferTimeSubscriber = createOperatorSubscriber(
  124. subscriber,
  125. (value: T) => {
  126. // Copy the records, so if we need to remove one we
  127. // don't mutate the array. It's hard, but not impossible to
  128. // set up a buffer time that could mutate the array and
  129. // cause issues here.
  130. const recordsCopy = bufferRecords!.slice();
  131. for (const record of recordsCopy) {
  132. // Loop over all buffers and
  133. const { buffer } = record;
  134. buffer.push(value);
  135. // If the buffer is over the max size, we need to emit it.
  136. maxBufferSize <= buffer.length && emit(record);
  137. }
  138. },
  139. () => {
  140. // The source completed, emit all of the active
  141. // buffers we have before we complete.
  142. while (bufferRecords?.length) {
  143. subscriber.next(bufferRecords.shift()!.buffer);
  144. }
  145. bufferTimeSubscriber?.unsubscribe();
  146. subscriber.complete();
  147. subscriber.unsubscribe();
  148. },
  149. // Pass all errors through to consumer.
  150. undefined,
  151. // Clean up
  152. () => (bufferRecords = null)
  153. );
  154. source.subscribe(bufferTimeSubscriber);
  155. });
  156. }