d47de132b5a9e92532878876c1449d8ef11d322c63dd8fa3acce9924d7ea1ad192960aacb1d58231c5b844d71146a9151310a1a7f352506f9d5acaace09fcd 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import { Subscriber } from '../Subscriber';
  2. import { ObservableInput, OperatorFunction } from '../types';
  3. import { operate } from '../util/lift';
  4. import { noop } from '../util/noop';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { innerFrom } from '../observable/innerFrom';
  /**
   * Collects values from the source Observable into arrays, where the end of
   * each array is decided by a closing Observable obtained from a factory
   * function.
   *
   * <span class="informal">Gathers past values into an array. Every time it
   * begins a new array, it calls a function to obtain an Observable whose
   * emission says when to emit that array and begin the next one.</span>
   *
   * ![](bufferWhen.svg)
   *
   * A buffer is opened right away, before the source is subscribed to. When the
   * Observable returned by `closingSelector` emits a value, the current buffer
   * is emitted (an empty array if nothing was collected), a fresh buffer takes
   * its place, and `closingSelector` is called again to obtain the next closing
   * notifier. When the source completes, the buffer collected so far is emitted
   * before the result completes. Completion of a closing notifier is ignored.
   *
   * ## Example
   *
   * Emit an array of the last clicks every [1-5] random seconds
   *
   * ```ts
   * import { fromEvent, bufferWhen, interval } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const buffered = clicks.pipe(
   *   bufferWhen(() => interval(1000 + Math.random() * 4000))
   * );
   * buffered.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link buffer}
   * @see {@link bufferCount}
   * @see {@link bufferTime}
   * @see {@link bufferToggle}
   * @see {@link windowWhen}
   *
   * @param closingSelector A function that takes no arguments and returns an
   * Observable that signals buffer closure.
   * @return A function that returns an Observable of arrays of buffered values.
   */
  export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
    return operate((source, subscriber) => {
      // Values collected since the last closing notification. `null` until the
      // first buffer has been opened, and again once finalization has run.
      let buffer: T[] | null = null;

      // The subscriber currently attached to the closing notifier. It is kept
      // so that it can be unsubscribed as soon as one notification has arrived.
      let closingSubscriber: Subscriber<T> | null = null;

      // Drops the current closing-notifier subscription, hands the current
      // buffer (if there is one) to the consumer, starts an empty buffer, and
      // subscribes to a newly created closing notifier.
      function openBuffer(): void {
        // Only a single notification per notifier is of interest.
        if (closingSubscriber) {
          closingSubscriber.unsubscribe();
        }

        // Install the new buffer before emitting the finished one.
        const finished = buffer;
        buffer = [];
        if (finished) {
          subscriber.next(finished);
        }

        // Ask for the next closing notifier. The subscriber is stored before
        // `subscribe` is called, so it is already in place should the notifier
        // emit while it is being subscribed to.
        const notifier = innerFrom(closingSelector());
        closingSubscriber = createOperatorSubscriber(subscriber, openBuffer, noop);
        notifier.subscribe(closingSubscriber);
      }

      // Appends each source value to the buffer that is currently open.
      const collect = (value: T) => {
        if (buffer) {
          buffer.push(value);
        }
      };

      // On source completion, emits the buffer if there is one and then
      // completes the result.
      const flushAndComplete = () => {
        if (buffer) {
          subscriber.next(buffer);
        }
        subscriber.complete();
      };

      // Releases the references held by this operator on finalization.
      const release = () => {
        closingSubscriber = null;
        buffer = null;
      };

      // Open the first buffer, then start listening to the source.
      openBuffer();

      source.subscribe(
        createOperatorSubscriber(
          subscriber,
          collect,
          flushAndComplete,
          // No error handler of our own: errors pass through to the consumer.
          undefined,
          release
        )
      );
    });
  }