936b8b14572b76d24f5a83ef7b52eeab2a1b0a44d5437dd4cdf8c5acbe1bdcb9a4a84940903d53845826c732f88f6fc6b90d4252883a6f798f6ef64ed4bd12 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import { Observable } from '../Observable';
  2. import { Subject } from '../Subject';
  3. import { Subscription } from '../Subscription';
  4. import { ObservableInput, OperatorFunction } from '../types';
  5. import { operate } from '../util/lift';
  6. import { innerFrom } from '../observable/innerFrom';
  7. import { createOperatorSubscriber } from './OperatorSubscriber';
  8. import { noop } from '../util/noop';
  9. import { arrRemove } from '../util/arrRemove';
  /**
   * Splits the values of the source Observable into nested "window" Observables.
   * A new window is opened every time `openings` emits, and that window is closed
   * as soon as the Observable produced by `closingSelector` for that opening
   * emits a value.
   *
   * <span class="informal">It's like {@link bufferToggle}, but emits a nested
   * Observable instead of an array.</span>
   *
   * ![](windowToggle.png)
   *
   * Several windows may be open at once; each source value is delivered to every
   * window that is open at that moment. When the source completes, all open
   * windows are completed before the result completes. When the source, the
   * `closingSelector` call, or a closing notifier fails, the error is sent to
   * every open window and then to the result. Completion of `openings` and
   * completion of a closing notifier are both ignored.
   *
   * ## Example
   *
   * Every other second, emit the click events from the next 500ms
   *
   * ```ts
   * import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const openings = interval(1000);
   * const result = clicks.pipe(
   *   windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
   *   mergeAll()
   * );
   * result.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link window}
   * @see {@link windowCount}
   * @see {@link windowTime}
   * @see {@link windowWhen}
   * @see {@link bufferToggle}
   *
   * @param openings An observable whose notifications each start a new window.
   * @param closingSelector A function that receives the value emitted by
   * `openings` and returns an Observable; the first next notification from that
   * Observable completes the window opened for that value.
   * @return A function that returns an Observable of windows, which in turn are
   * Observables.
   */
  export function windowToggle<T, O>(
    openings: ObservableInput<O>,
    closingSelector: (openValue: O) => ObservableInput<any>
  ): OperatorFunction<T, Observable<T>> {
    return operate((source, subscriber) => {
      // Every window that has been opened and not yet closed.
      const activeWindows: Subject<T>[] = [];

      // Removes windows from the front of the list one at a time, handing each
      // to `settle`. The window is detached *before* `settle` runs, so reentrant
      // code never sees a window that is in the middle of being torn down.
      const drainWindows = (settle: (win: Subject<T>) => void): void => {
        while (activeWindows.length > 0) {
          settle(activeWindows.shift()!);
        }
      };

      // Sends the error to every open window first, then to the consumer.
      const failAll = (err: any): void => {
        drainWindows((win) => win.error(err));
        subscriber.error(err);
      };

      // Runs for each value from `openings`: opens a window and arranges for
      // its closing notifier to complete it.
      const openWindow = (openValue: O): void => {
        const win = new Subject<T>();
        activeWindows.push(win);
        const closerSubscription = new Subscription();

        let closer: Observable<any>;
        try {
          closer = innerFrom(closingSelector(openValue));
        } catch (err) {
          failAll(err);
          return;
        }

        subscriber.next(win.asObservable());

        // The first value from the closing notifier ends this window and
        // releases the notifier's subscription.
        const finishWindow = (): void => {
          arrRemove(activeWindows, win);
          win.complete();
          closerSubscription.unsubscribe();
        };

        // Completion of the closing notifier is ignored; its errors fail everything.
        const closerSubscriber = createOperatorSubscriber(subscriber, finishWindow, noop, failAll);
        closerSubscription.add(closer.subscribe(closerSubscriber));
      };

      // Completion of `openings` is deliberately ignored.
      innerFrom(openings).subscribe(createOperatorSubscriber(subscriber, openWindow, noop));

      // Subscribe to the source to get things started.
      source.subscribe(
        createOperatorSubscriber(
          subscriber,
          (value: T) => {
            // Iterate over a snapshot so that windows opened or closed
            // reentrantly during emission do not disturb this pass.
            activeWindows.slice().forEach((win) => win.next(value));
          },
          () => {
            // Complete all of our windows before we complete.
            drainWindows((win) => win.complete());
            subscriber.complete();
          },
          failAll,
          () => {
            // On finalization, dispose of every remaining window subject. This
            // way, if a user tries to subscribe to a window *after* the outer
            // subscription has been unsubscribed, they will get an error,
            // instead of waiting forever to see if a value arrives.
            drainWindows((win) => win.unsubscribe());
          }
        )
      );
    });
  }