334ff1aa5e28cbddc353bea92a5abf25655dcc826f73ad9ca31af1c21c9fe5ebf2b03ecbf5610176d31981e0f601020c6a958d422d4b030e30ed631d5b567a 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import { Subscription } from '../Subscription';
  2. import { OperatorFunction, ObservableInput } from '../types';
  3. import { operate } from '../util/lift';
  4. import { innerFrom } from '../observable/innerFrom';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { noop } from '../util/noop';
  7. import { arrRemove } from '../util/arrRemove';
  /**
   * Collects values from the source Observable into arrays ("buffers"). A new
   * buffer is started each time `openings` emits, and that buffer is emitted
   * when the notifier returned by `closingSelector` emits.
   *
   * <span class="informal">Gathers past values into an array. Collection only
   * begins when `openings` emits, and `closingSelector` is called to obtain the
   * Observable that says when that buffer should be closed.</span>
   *
   * ![](bufferToggle.png)
   *
   * More than one buffer may be open at the same time; every value from the
   * source is appended to each buffer that is currently open. When the closing
   * notifier for a buffer emits, that buffer is removed from the open set and
   * sent downstream. When the source completes, every buffer that is still open
   * is emitted and then the result completes.
   *
   * Completion of `openings`, or of a closing notifier, is ignored: a closing
   * notifier that completes without emitting leaves its buffer open until the
   * source completes.
   *
   * ## Example
   *
   * Every other second, emit the click events from the next 500ms
   *
   * ```ts
   * import { fromEvent, interval, bufferToggle, EMPTY } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const openings = interval(1000);
   * const buffered = clicks.pipe(bufferToggle(openings, i =>
   *   i % 2 ? interval(500) : EMPTY
   * ));
   * buffered.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link buffer}
   * @see {@link bufferCount}
   * @see {@link bufferTime}
   * @see {@link bufferWhen}
   * @see {@link windowToggle}
   *
   * @param openings An observable input whose emissions each start a new
   * buffer.
   * @param closingSelector A function that receives the value emitted by
   * `openings` and returns an observable input; when that input emits, the
   * buffer opened for that value is emitted and discarded.
   * @return A function that returns an Observable of arrays of buffered values.
   */
  export function bufferToggle<T, O>(
    openings: ObservableInput<O>,
    closingSelector: (value: O) => ObservableInput<any>
  ): OperatorFunction<T, T[]> {
    return operate((source, subscriber) => {
      // Every buffer that has been opened but not yet emitted.
      const openBuffers: T[][] = [];

      // Runs for each `openings` emission: opens a buffer and arms its closer.
      const startBuffer = (openingValue: O) => {
        const collected: T[] = [];
        openBuffers.push(collected);

        // Dedicated subscription for this buffer's closing notifier, so it can
        // be torn down as soon as that notifier has emitted once.
        const closerTeardown = new Subscription();

        // Order matters: drop the buffer from the open set, emit it, then stop
        // listening to the closing notifier.
        const flushAndClose = () => {
          arrRemove(openBuffers, collected);
          subscriber.next(collected);
          closerTeardown.unsubscribe();
        };

        // The selector is evaluated before the operator subscriber is created,
        // matching the evaluation order of a single nested call expression.
        const closingNotifier = innerFrom(closingSelector(openingValue));
        const closingSubscriber = createOperatorSubscriber(subscriber, flushAndClose, noop);
        // Registers the subscription with the parent subscriber *and* with the
        // per-buffer teardown above.
        closerTeardown.add(closingNotifier.subscribe(closingSubscriber));
      };

      // Runs for each source value: append it to every buffer currently open.
      const appendToOpenBuffers = (value: T) => {
        openBuffers.forEach((target) => {
          target.push(value);
        });
      };

      // Runs when the source completes: emit whatever is still pending, then
      // complete the result.
      const flushAllAndComplete = () => {
        while (openBuffers.length > 0) {
          const pending = openBuffers.shift()!;
          subscriber.next(pending);
        }
        subscriber.complete();
      };

      // The openings notifier must be subscribed before the source.
      innerFrom(openings).subscribe(createOperatorSubscriber(subscriber, startBuffer, noop));

      source.subscribe(createOperatorSubscriber(subscriber, appendToOpenBuffers, flushAllAndComplete));
    });
  }