4ff608173981286283f11d37151c65bc03ddcefefd67be663e4332f8d80251c36a948ac11a6be799fc2e7896a7548b71f70ff30d5eab0c2dfb6753bd5351cc 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import { Subject } from './Subject';
  2. import { TimestampProvider } from './types';
  3. import { Subscriber } from './Subscriber';
  4. import { Subscription } from './Subscription';
  5. import { dateTimestampProvider } from './scheduler/dateTimestampProvider';
  6. /**
  7. * A variant of {@link Subject} that "replays" old values to new subscribers by emitting them when they first subscribe.
  8. *
  9. * `ReplaySubject` has an internal buffer that will store a specified number of values that it has observed. Like `Subject`,
  10. * `ReplaySubject` "observes" values by having them passed to its `next` method. When it observes a value, it will store that
  11. * value for a time determined by the configuration of the `ReplaySubject`, as passed to its constructor.
  12. *
  13. * When a new subscriber subscribes to the `ReplaySubject` instance, it will synchronously emit all values in its buffer in
  14. * a First-In-First-Out (FIFO) manner. The `ReplaySubject` will also complete, if it has observed completion; and it will
  15. * error if it has observed an error.
  16. *
  17. * There are two main configuration items to be concerned with:
  18. *
  19. * 1. `bufferSize` - This will determine how many items are stored in the buffer, defaults to infinite.
  20. * 2. `windowTime` - The amount of time to hold a value in the buffer before removing it from the buffer.
  21. *
  22. * Both configurations may exist simultaneously. So if you would like to buffer a maximum of 3 values, as long as the values
  23. * are less than 2 seconds old, you could do so with a `new ReplaySubject(3, 2000)`.
  24. *
  25. * ### Differences with BehaviorSubject
  26. *
  27. * `BehaviorSubject` is similar to `new ReplaySubject(1)`, with a couple of exceptions:
  28. *
  29. * 1. `BehaviorSubject` comes "primed" with a single value upon construction.
  30. * 2. `ReplaySubject` will replay values, even after observing an error, where `BehaviorSubject` will not.
  31. *
  32. * @see {@link Subject}
  33. * @see {@link BehaviorSubject}
  34. * @see {@link shareReplay}
  35. */
  36. export class ReplaySubject<T> extends Subject<T> {
  37. private _buffer: (T | number)[] = [];
  38. private _infiniteTimeWindow = true;
  39. /**
  40. * @param _bufferSize The size of the buffer to replay on subscription
  41. * @param _windowTime The amount of time the buffered items will stay buffered
  42. * @param _timestampProvider An object with a `now()` method that provides the current timestamp. This is used to
  43. * calculate the amount of time something has been buffered.
  44. */
  45. constructor(
  46. private _bufferSize = Infinity,
  47. private _windowTime = Infinity,
  48. private _timestampProvider: TimestampProvider = dateTimestampProvider
  49. ) {
  50. super();
  51. this._infiniteTimeWindow = _windowTime === Infinity;
  52. this._bufferSize = Math.max(1, _bufferSize);
  53. this._windowTime = Math.max(1, _windowTime);
  54. }
  55. next(value: T): void {
  56. const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
  57. if (!isStopped) {
  58. _buffer.push(value);
  59. !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
  60. }
  61. this._trimBuffer();
  62. super.next(value);
  63. }
  64. /** @internal */
  65. protected _subscribe(subscriber: Subscriber<T>): Subscription {
  66. this._throwIfClosed();
  67. this._trimBuffer();
  68. const subscription = this._innerSubscribe(subscriber);
  69. const { _infiniteTimeWindow, _buffer } = this;
  70. // We use a copy here, so reentrant code does not mutate our array while we're
  71. // emitting it to a new subscriber.
  72. const copy = _buffer.slice();
  73. for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
  74. subscriber.next(copy[i] as T);
  75. }
  76. this._checkFinalizedStatuses(subscriber);
  77. return subscription;
  78. }
  79. private _trimBuffer() {
  80. const { _bufferSize, _timestampProvider, _buffer, _infiniteTimeWindow } = this;
  81. // If we don't have an infinite buffer size, and we're over the length,
  82. // use splice to truncate the old buffer values off. Note that we have to
  83. // double the size for instances where we're not using an infinite time window
  84. // because we're storing the values and the timestamps in the same array.
  85. const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
  86. _bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);
  87. // Now, if we're not in an infinite time window, remove all values where the time is
  88. // older than what is allowed.
  89. if (!_infiniteTimeWindow) {
  90. const now = _timestampProvider.now();
  91. let last = 0;
  92. // Search the array for the first timestamp that isn't expired and
  93. // truncate the buffer up to that point.
  94. for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
  95. last = i;
  96. }
  97. last && _buffer.splice(0, last + 1);
  98. }
  99. }
  100. }