8d2066e34f59ed1469099607ff2268a853fe0a81880ccb9ff867a4d3857fdc070808ed9f702a0119c8366b13a3808fd1d45ab3b4839ac129ae518473db3354 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import { OperatorFunction, ObservableInput } from '../types';
  2. import { operate } from '../util/lift';
  3. import { noop } from '../util/noop';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  5. import { innerFrom } from '../observable/innerFrom';
  6. /**
  7. * Buffers the source Observable values until `closingNotifier` emits.
  8. *
  9. * <span class="informal">Collects values from the past as an array, and emits
  10. * that array only when another Observable emits.</span>
  11. *
  12. * ![](buffer.png)
  13. *
  14. * Buffers the incoming Observable values until the given `closingNotifier`
  15. * `ObservableInput` (that internally gets converted to an Observable)
  16. * emits a value, at which point it emits the buffer on the output
  17. * Observable and starts a new buffer internally, awaiting the next time
  18. * `closingNotifier` emits.
  19. *
  20. * ## Example
  21. *
  22. * On every click, emit array of most recent interval events
  23. *
  24. * ```ts
  25. * import { fromEvent, interval, buffer } from 'rxjs';
  26. *
  27. * const clicks = fromEvent(document, 'click');
  28. * const intervalEvents = interval(1000);
  29. * const buffered = intervalEvents.pipe(buffer(clicks));
  30. * buffered.subscribe(x => console.log(x));
  31. * ```
  32. *
  33. * @see {@link bufferCount}
  34. * @see {@link bufferTime}
  35. * @see {@link bufferToggle}
  36. * @see {@link bufferWhen}
  37. * @see {@link window}
  38. *
  39. * @param closingNotifier An `ObservableInput` that signals the
  40. * buffer to be emitted on the output Observable.
  41. * @return A function that returns an Observable of buffers, which are arrays
  42. * of values.
  43. */
  44. export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]> {
  45. return operate((source, subscriber) => {
  46. // The current buffered values.
  47. let currentBuffer: T[] = [];
  48. // Subscribe to our source.
  49. source.subscribe(
  50. createOperatorSubscriber(
  51. subscriber,
  52. (value) => currentBuffer.push(value),
  53. () => {
  54. subscriber.next(currentBuffer);
  55. subscriber.complete();
  56. }
  57. )
  58. );
  59. // Subscribe to the closing notifier.
  60. innerFrom(closingNotifier).subscribe(
  61. createOperatorSubscriber(
  62. subscriber,
  63. () => {
  64. // Start a new buffer and emit the previous one.
  65. const b = currentBuffer;
  66. currentBuffer = [];
  67. subscriber.next(b);
  68. },
  69. noop
  70. )
  71. );
  72. return () => {
  73. // Ensure buffered values are released on finalization.
  74. currentBuffer = null!;
  75. };
  76. });
  77. }