1d8461b72c6296f8e73de8e566e9d3164212d3e5a0925b1b5f0a96e2673494c055be29f9f4359578c4dcffc911d39477c06ba99cec486d9d3927379fa5a364 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import { Observable } from '../Observable';
  2. import { OperatorFunction, ObservableInput } from '../types';
  3. import { Subject } from '../Subject';
  4. import { operate } from '../util/lift';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { noop } from '../util/noop';
  7. import { innerFrom } from '../observable/innerFrom';
  /**
   * Branch out the source Observable values as a nested Observable whenever
   * `windowBoundaries` emits.
   *
   * <span class="informal">It's like {@link buffer}, but emits a nested Observable
   * instead of an array.</span>
   *
   * ![](window.png)
   *
   * Returns an Observable that emits windows of items it collects from the source
   * Observable. The output Observable emits connected, non-overlapping
   * windows. It emits the current window and opens a new one whenever the
   * `windowBoundaries` emits an item. `windowBoundaries` can be any type that
   * `ObservableInput` accepts. It internally gets converted to an Observable.
   * Because each window is an Observable, the output is a higher-order Observable.
   *
   * The first window is emitted immediately on subscription, before the source
   * is subscribed to. When the source completes, the current window and the
   * output are both completed. An error from either the source or
   * `windowBoundaries` is forwarded to the current window and then to the output.
   * Completion of `windowBoundaries` is ignored: the window that is open at that
   * moment simply stays open.
   *
   * Every window is handed out through `asObservable()`, so consumers only ever
   * receive a plain Observable view and cannot push values into, complete, or
   * error the operator's internal subject.
   *
   * ## Example
   *
   * In every window of 1 second each, emit at most 2 click events
   *
   * ```ts
   * import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const sec = interval(1000);
   * const result = clicks.pipe(
   *   window(sec),
   *   map(win => win.pipe(take(2))), // take at most 2 emissions from each window
   *   mergeAll()                     // flatten the Observable-of-Observables
   * );
   * result.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link windowCount}
   * @see {@link windowTime}
   * @see {@link windowToggle}
   * @see {@link windowWhen}
   * @see {@link buffer}
   *
   * @param windowBoundaries An `ObservableInput` that completes the
   * previous window and starts a new window.
   * @return A function that returns an Observable of windows, which are
   * Observables emitting values of the source Observable.
   */
  export function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>> {
    return operate((source, subscriber) => {
      // The subject backing the currently open window. It is replaced on every
      // boundary emission and cleared during teardown.
      let windowSubject: Subject<T> = new Subject<T>();

      // Emit the first window right away, before subscribing to the source.
      subscriber.next(windowSubject.asObservable());

      // Shared by both inner subscriptions: fail the open window first, then
      // the output.
      const errorHandler = (err: any) => {
        windowSubject.error(err);
        subscriber.error(err);
      };

      // Subscribe to our source
      source.subscribe(
        createOperatorSubscriber(
          subscriber,
          // Optional chaining guards against `windowSubject` having been
          // cleared by the teardown below.
          (value) => windowSubject?.next(value),
          () => {
            windowSubject.complete();
            subscriber.complete();
          },
          errorHandler
        )
      );

      // Subscribe to the window boundaries.
      innerFrom(windowBoundaries).subscribe(
        createOperatorSubscriber(
          subscriber,
          () => {
            // Close the current window, then open the next one. The new subject
            // is assigned before it is emitted so that anything triggered by
            // the emission already sees the new window as current.
            windowSubject.complete();
            windowSubject = new Subject<T>();
            // Emit a read-only view, consistent with the first window, rather
            // than leaking the raw subject to consumers.
            subscriber.next(windowSubject.asObservable());
          },
          // A no-op is registered for boundary completion: the open window is
          // left as it is.
          noop,
          errorHandler
        )
      );

      return () => {
        // Unsubscribing the subject ensures that anyone who has captured
        // a reference to this window that tries to use it after it can
        // no longer get values from the source will get an ObjectUnsubscribedError.
        windowSubject?.unsubscribe();
        windowSubject = null!;
      };
    });
  }