57ba8d03054ccf2df2091b635bda42090f8ac8a0a94d140bb17eac60683f2eef54b1018ab72fac748a8d36e199949ef79fec27700876767c10c110451ec08c 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import { Observable } from '../Observable';
  2. import { Subject } from '../Subject';
  3. import { OperatorFunction } from '../types';
  4. import { operate } from '../util/lift';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  /**
   * Branch out the source Observable values as a nested Observable with each
   * nested Observable emitting at most `windowSize` values.
   *
   * <span class="informal">It's like {@link bufferCount}, but emits a nested
   * Observable instead of an array.</span>
   *
   * ![](windowCount.png)
   *
   * Returns an Observable that emits windows of items it collects from the source
   * Observable. The output Observable emits windows every `startWindowEvery`
   * items, each containing no more than `windowSize` items. The first window is
   * emitted immediately upon subscription, before any source value arrives. When
   * the source Observable completes or encounters an error, every window that is
   * still open receives that same notification, and the notification is then
   * forwarded to the output Observable. If `startWindowEvery` is not provided
   * (or is not a positive number), a new window is started at the beginning of
   * the source and each time `windowSize` values have been seen.
   *
   * ## Examples
   *
   * Ignore every 3rd click event, starting from the first one
   *
   * ```ts
   * import { fromEvent, windowCount, map, skip, mergeAll } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const result = clicks.pipe(
   *   windowCount(3),
   *   map(win => win.pipe(skip(1))), // skip first of every 3 clicks
   *   mergeAll()                     // flatten the Observable-of-Observables
   * );
   * result.subscribe(x => console.log(x));
   * ```
   *
   * Ignore every 3rd click event, starting from the third one
   *
   * ```ts
   * import { fromEvent, windowCount, mergeAll } from 'rxjs';
   *
   * const clicks = fromEvent(document, 'click');
   * const result = clicks.pipe(
   *   windowCount(2, 3),
   *   mergeAll() // flatten the Observable-of-Observables
   * );
   * result.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link window}
   * @see {@link windowTime}
   * @see {@link windowToggle}
   * @see {@link windowWhen}
   * @see {@link bufferCount}
   *
   * @param windowSize The maximum number of values emitted by each window.
   * @param startWindowEvery Interval at which to start a new window. For example
   * if `startWindowEvery` is `2`, then a new window will be started on every
   * other value from the source. A new window is started at the beginning of the
   * source by default. Values that are not greater than `0` (including the
   * default of `0`) fall back to `windowSize`.
   * @return A function that returns an Observable of windows, which in turn are
   * Observable of values.
   */
  export function windowCount<T>(windowSize: number, startWindowEvery: number = 0): OperatorFunction<T, Observable<T>> {
    // A non-positive `startWindowEvery` means "back-to-back windows": open a new
    // window every `windowSize` values.
    const startEvery = startWindowEvery > 0 ? startWindowEvery : windowSize;

    return operate((source, subscriber) => {
      // Currently open windows, oldest first.
      let windows = [new Subject<T>()];
      // Number of source values fully processed so far.
      let count = 0;

      // Open the first window.
      subscriber.next(windows[0].asObservable());

      source.subscribe(
        createOperatorSubscriber(
          subscriber,
          (value: T) => {
            // Emit the value through all current windows.
            // We don't need to create a new window yet, we
            // do that as soon as we close one.
            for (const openWindow of windows) {
              openWindow.next(value);
            }

            // `count` has not yet been incremented for this value, so
            // `count - windowSize + 1` is zero exactly when the very first window
            // has received `windowSize` values, and a multiple of `startEvery`
            // each time a later window (opened `startEvery` values apart) fills
            // up. When that happens, complete and drop the oldest window.
            const filled = count - windowSize + 1;
            if (filled >= 0 && filled % startEvery === 0) {
              windows.shift()!.complete();
            }

            // Look to see if the next count tells us it's time to open a new window.
            // TODO: We need to figure out if this really makes sense. We're technically
            // emitting windows *before* we have a value to emit them for. It's probably
            // more expected that we should be emitting the window when the start
            // count is reached -- not before.
            if (++count % startEvery === 0) {
              const nextWindow = new Subject<T>();
              windows.push(nextWindow);
              subscriber.next(nextWindow.asObservable());
            }
          },
          () => {
            // Source completed: complete every open window (oldest first) before
            // completing the outer subscriber.
            while (windows.length > 0) {
              windows.shift()!.complete();
            }
            subscriber.complete();
          },
          (err) => {
            // Source errored: forward the error to every open window (oldest
            // first) before erroring the outer subscriber.
            while (windows.length > 0) {
              windows.shift()!.error(err);
            }
            subscriber.error(err);
          },
          () => {
            // Finalization: drop the reference to the window list.
            windows = null!;
          }
        )
      );
    });
  }