c264dcdbe0de7af6620f08bd1c33fc2926a924413166a12bcac4c82c2d1f91dd4851217adecb7effc2760312f59bc00f90a9cd4d09a83ca42b436d7d29df10 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import { ReplaySubject } from '../ReplaySubject';
  2. import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
  3. import { share } from './share';
  /**
   * Options accepted by the configuration-object overload of `shareReplay`.
   */
  export interface ShareReplayConfig {
    /**
     * Maximum element count of the replay buffer.
     * `shareReplay` uses `Infinity` when this is left undefined.
     */
    bufferSize?: number;

    /**
     * Maximum time length of the replay buffer in milliseconds.
     * `shareReplay` uses `Infinity` when this is left undefined.
     */
    windowTime?: number;

    /**
     * Whether the source is unsubscribed from once the number of subscribers
     * drops to zero. `shareReplay` forwards this value as the
     * `resetOnRefCountZero` option of `share`.
     */
    refCount: boolean;

    /**
     * Scheduler handed to the inner `ReplaySubject` together with
     * `bufferSize` and `windowTime`.
     */
    scheduler?: SchedulerLike;
  }
  10. export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
  11. export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
  12. /**
  13. * Share source and replay specified number of emissions on subscription.
  14. *
  15. * This operator is a specialization of `replay` that connects to a source observable
  16. * and multicasts through a `ReplaySubject` constructed with the specified arguments.
  17. * A successfully completed source will stay cached in the `shareReplay`ed observable forever,
  18. * but an errored source can be retried.
  19. *
  20. * ## Why use `shareReplay`?
  21. *
  22. * You generally want to use `shareReplay` when you have side-effects or taxing computations
  23. * that you do not wish to be executed amongst multiple subscribers.
  24. * It may also be valuable in situations where you know you will have late subscribers to
  25. * a stream that need access to previously emitted values.
  26. * This ability to replay values on subscription is what differentiates {@link share} and `shareReplay`.
  27. *
  28. * ## Reference counting
  29. *
  30. * By default `shareReplay` will use `refCount` of false, meaning that it will _not_ unsubscribe the
  31. * source when the reference counter drops to zero, i.e. the inner `ReplaySubject` will _not_ be unsubscribed
  32. * (and may potentially run forever).
  33. * This is the default as it is expected that `shareReplay` is often used to keep around expensive to setup
  34. * observables which we want to keep running instead of having to do the expensive setup again.
  35. *
  36. * As of RxJS version 6.4.0, a new overload signature was added to allow for manual control over what
  37. * happens when the operator's internal reference counter drops to zero.
  38. * If `refCount` is true, the source will be unsubscribed from once the reference count drops to zero, i.e.
  39. * the inner `ReplaySubject` will be unsubscribed. All new subscribers will receive value emissions from a
  40. * new `ReplaySubject` which in turn will cause a new subscription to the source observable.
  41. *
  42. * ## Examples
  43. *
  44. * Example with a third subscriber coming late to the party
  45. *
  46. * ```ts
  47. * import { interval, take, shareReplay } from 'rxjs';
  48. *
  49. * const shared$ = interval(2000).pipe(
  50. * take(6),
  51. * shareReplay(3)
  52. * );
  53. *
  54. * shared$.subscribe(x => console.log('sub A: ', x));
  55. * shared$.subscribe(y => console.log('sub B: ', y));
  56. *
  57. * setTimeout(() => {
  58. * shared$.subscribe(y => console.log('sub C: ', y));
  59. * }, 11000);
  60. *
  61. * // Logs:
  62. * // (after ~2000 ms)
  63. * // sub A: 0
  64. * // sub B: 0
  65. * // (after ~4000 ms)
  66. * // sub A: 1
  67. * // sub B: 1
  68. * // (after ~6000 ms)
  69. * // sub A: 2
  70. * // sub B: 2
  71. * // (after ~8000 ms)
  72. * // sub A: 3
  73. * // sub B: 3
  74. * // (after ~10000 ms)
  75. * // sub A: 4
  76. * // sub B: 4
  77. * // (after ~11000 ms, sub C gets the last 3 values)
  78. * // sub C: 2
  79. * // sub C: 3
  80. * // sub C: 4
  81. * // (after ~12000 ms)
  82. * // sub A: 5
  83. * // sub B: 5
  84. * // sub C: 5
  85. * ```
  86. *
  87. * Example for `refCount` usage
  88. *
  89. * ```ts
  90. * import { Observable, tap, interval, shareReplay, take } from 'rxjs';
  91. *
  92. * const log = <T>(name: string, source: Observable<T>) => source.pipe(
  93. * tap({
  94. * subscribe: () => console.log(`${ name }: subscribed`),
  95. * next: value => console.log(`${ name }: ${ value }`),
  96. * complete: () => console.log(`${ name }: completed`),
  97. * finalize: () => console.log(`${ name }: unsubscribed`)
  98. * })
  99. * );
  100. *
  101. * const obs$ = log('source', interval(1000));
  102. *
  103. * const shared$ = log('shared', obs$.pipe(
  104. * shareReplay({ bufferSize: 1, refCount: true }),
  105. * take(2)
  106. * ));
  107. *
  108. * shared$.subscribe(x => console.log('sub A: ', x));
  109. * shared$.subscribe(y => console.log('sub B: ', y));
  110. *
  111. * // PRINTS:
  112. * // shared: subscribed <-- reference count = 1
  113. * // source: subscribed
  114. * // shared: subscribed <-- reference count = 2
  115. * // source: 0
  116. * // shared: 0
  117. * // sub A: 0
  118. * // shared: 0
  119. * // sub B: 0
  120. * // source: 1
  121. * // shared: 1
  122. * // sub A: 1
  123. * // shared: completed <-- take(2) completes the subscription for sub A
  124. * // shared: unsubscribed <-- reference count = 1
  125. * // shared: 1
  126. * // sub B: 1
  127. * // shared: completed <-- take(2) completes the subscription for sub B
  128. * // shared: unsubscribed <-- reference count = 0
  129. * // source: unsubscribed <-- replaySubject unsubscribes from source observable because the reference count dropped to 0 and refCount is true
  130. *
  131. * // In case of refCount being false, the unsubscribe is never called on the source and the source would keep on emitting, even if no subscribers
  132. * // are listening.
  133. * // source: 2
  134. * // source: 3
  135. * // source: 4
  136. * // ...
  137. * ```
  138. *
  139. * @see {@link publish}
  140. * @see {@link share}
  141. * @see {@link publishReplay}
  142. *
  143. * @param configOrBufferSize Maximum element count of the replay buffer or {@link ShareReplayConfig configuration}
  144. * object.
  145. * @param windowTime Maximum time length of the replay buffer in milliseconds.
  146. * @param scheduler Scheduler that is passed, together with the buffer size and window
  147. * time, to the inner `ReplaySubject`.
  148. * @return A function that returns an Observable that shares a single subscription to
  149. * the source through a `ReplaySubject` and replays the buffered emissions to each new
  150. * subscriber.
  151. */
  export function shareReplay<T>(
    configOrBufferSize?: ShareReplayConfig | number,
    windowTime?: number,
    scheduler?: SchedulerLike
  ): MonoTypeOperatorFunction<T> {
    // Effective settings. The positional arguments are the starting point; a
    // configuration object, when given, replaces all of them.
    let replayBufferSize: number;
    let replayWindowTime = windowTime;
    let replayScheduler = scheduler;
    let resetWhenUnused = false;

    if (typeof configOrBufferSize === 'object' && configOrBufferSize !== null) {
      // Configuration-object overload: every setting is read from the object.
      // The defaults only kick in for `undefined` properties, and the
      // positional `windowTime` / `scheduler` arguments are ignored.
      ({
        bufferSize: replayBufferSize = Infinity,
        windowTime: replayWindowTime = Infinity,
        refCount: resetWhenUnused = false,
        scheduler: replayScheduler,
      } = configOrBufferSize);
    } else {
      // Positional overload: a missing (`null` / `undefined`) buffer size means
      // "unbounded"; an explicit `0` is kept as is.
      replayBufferSize = (configOrBufferSize ?? Infinity) as number;
    }

    // Each (re)connection gets its own ReplaySubject built from the settings above.
    const createReplaySubject = () => new ReplaySubject(replayBufferSize, replayWindowTime, replayScheduler);

    return share<T>({
      connector: createReplaySubject,
      // Reset on error, but not on completion.
      resetOnError: true,
      resetOnComplete: false,
      // Only reset when the subscriber count hits zero if the caller opted in via `refCount`.
      resetOnRefCountZero: resetWhenUnused,
    });
  }