fd3d5673de7452bb1a44eb13b5ee0c63e5a03e773848e398e187dc3d231511e28029001fb1ac68bda70d4d9b80c9e83e61b0b746c358c9e7494a4deb9c7b51 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import { Observable } from '../Observable';
  2. import { ReplaySubject } from '../ReplaySubject';
  3. import { Subscription } from '../Subscription';
  4. import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
  5. import { Subscriber } from '../Subscriber';
  /**
   * Options accepted by the configuration-object form of `shareReplay`.
   */
  export interface ShareReplayConfig {
    /**
     * Maximum number of values kept in the replay buffer.
     * Treated as `Number.POSITIVE_INFINITY` when omitted.
     */
    bufferSize?: number;
    /**
     * Maximum time length of the replay buffer, in milliseconds.
     * Treated as `Number.POSITIVE_INFINITY` when omitted.
     */
    windowTime?: number;
    /**
     * When `true`, the subscription to the source is released (and the replay
     * buffer dropped) once the number of subscribers falls back to zero, unless
     * the source has already completed. When `false`, the source subscription is
     * kept regardless of the subscriber count.
     */
    refCount: boolean;
    /** Scheduler handed to the underlying `ReplaySubject`. */
    scheduler?: SchedulerLike;
  }
  /**
   * Share source and replay specified number of emissions on subscription.
   *
   * This operator is a specialization of `replay` that connects to a source observable
   * and multicasts through a `ReplaySubject` constructed with the specified arguments.
   * A successfully completed source will stay cached in the `shareReplay`ed observable forever,
   * but an errored source can be retried.
   *
   * ## Why use shareReplay?
   * You generally want to use `shareReplay` when you have side-effects or taxing computations
   * that you do not wish to be executed amongst multiple subscribers.
   * It may also be valuable in situations where you know you will have late subscribers to
   * a stream that need access to previously emitted values.
   * This ability to replay values on subscription is what differentiates {@link share} and `shareReplay`.
   *
   * ![](shareReplay.png)
   *
   * ## Example
   * ```ts
   * import { interval } from 'rxjs';
   * import { shareReplay, take } from 'rxjs/operators';
   *
   * const obs$ = interval(1000);
   * const shared$ = obs$.pipe(
   *   take(4),
   *   shareReplay(3)
   * );
   * shared$.subscribe(x => console.log('source A: ', x));
   * shared$.subscribe(y => console.log('source B: ', y));
   *
   * ```
   *
   * @see {@link publish}
   * @see {@link share}
   * @see {@link publishReplay}
   *
   * @param {ShareReplayConfig} config Replay options. `bufferSize` and `windowTime` default to
   * `Number.POSITIVE_INFINITY`; `refCount` decides whether the source subscription is released
   * when the last subscriber leaves; `scheduler` is passed to the underlying `ReplaySubject`.
   * @return {MonoTypeOperatorFunction<T>} A function that takes a source Observable and returns an
   * Observable that shares one subscription to that source and replays buffered values to
   * late subscribers.
   * @method shareReplay
   * @owner Observable
   */
  export function shareReplay<T>(
    config: ShareReplayConfig
  ): MonoTypeOperatorFunction<T>;
  /**
   * Positional form of `shareReplay`. Equivalent to calling the configuration form with
   * `{ bufferSize, windowTime, scheduler, refCount: false }`.
   *
   * @param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
   * @param {Number} [windowTime=Number.POSITIVE_INFINITY] Maximum time length of the replay buffer in milliseconds.
   * @param {SchedulerLike} [scheduler] Scheduler passed to the underlying `ReplaySubject`.
   * @return {MonoTypeOperatorFunction<T>} A function that takes a source Observable and returns an
   * Observable that shares one subscription to that source and replays buffered values to
   * late subscribers.
   */
  export function shareReplay<T>(
    bufferSize?: number,
    windowTime?: number,
    scheduler?: SchedulerLike
  ): MonoTypeOperatorFunction<T>;
  export function shareReplay<T>(
    configOrBufferSize?: ShareReplayConfig | number,
    windowTime?: number,
    scheduler?: SchedulerLike
  ): MonoTypeOperatorFunction<T> {
    // Normalise both call styles into one config object. A truthy object is taken
    // as the configuration form; anything else is treated as the positional form.
    const config: ShareReplayConfig =
      configOrBufferSize && typeof configOrBufferSize === 'object'
        ? configOrBufferSize
        : {
            // The object form is already excluded on this branch; the assertion only
            // keeps the expression compiling for builds without `strictNullChecks`.
            bufferSize: configOrBufferSize as number | undefined,
            windowTime,
            // Positional calls never ref-count the source subscription.
            refCount: false,
            scheduler,
          };
    return (source: Observable<T>) => source.lift(shareReplayOperator(config));
  }
  /**
   * Creates the per-operator state for `shareReplay` and returns the function that is
   * handed to `Observable.lift`.
   *
   * The state lives in the closure and is shared by every subscriber that goes through
   * the returned function:
   * - `subject`: the `ReplaySubject` that values are multicast through; created on the
   *   first subscription and re-created after the source has errored.
   * - `refCount`: number of subscribers currently attached.
   * - `subscription`: the single subscription to the source, cleared once the source
   *   completes or is released by ref-counting.
   * - `hasError`: set when the source errors so that the next subscriber reconnects.
   * - `isComplete`: set when the source completes and never reset here, so a completed
   *   subject keeps being reused and is never torn down by ref-counting.
   *
   * @param config Normalised options; `bufferSize` and `windowTime` fall back to
   * `Number.POSITIVE_INFINITY` when left undefined.
   * @return A function, called with the downstream subscriber as `this` and the source
   * observable as its argument, that attaches the subscriber to the shared subject.
   */
  function shareReplayOperator<T>({
    bufferSize = Number.POSITIVE_INFINITY,
    windowTime = Number.POSITIVE_INFINITY,
    refCount: useRefCount,
    scheduler,
  }: ShareReplayConfig) {
    let subject: ReplaySubject<T> | undefined;
    let refCount = 0;
    let subscription: Subscription | undefined;
    let hasError = false;
    let isComplete = false;

    return function shareReplayOperation(
      this: Subscriber<T>,
      source: Observable<T>
    ) {
      refCount++;
      let innerSub: Subscription | undefined;
      if (!subject || hasError) {
        hasError = false;
        // Keep a local, non-nullable handle on the subject created for this connection.
        // The observer below always talks to this instance, even if the shared `subject`
        // variable is cleared or replaced later on.
        const connectedSubject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
        subject = connectedSubject;
        // Attach the subscriber before connecting so that synchronous emissions reach it.
        innerSub = connectedSubject.subscribe(this);
        subscription = source.subscribe({
          next(value) {
            connectedSubject.next(value);
          },
          error(err) {
            hasError = true;
            connectedSubject.error(err);
          },
          complete() {
            isComplete = true;
            subscription = undefined;
            connectedSubject.complete();
          },
        });
        // Here we need to check to see if the source synchronously completed. Although
        // we're setting `subscription = undefined` in the completion handler, if the source
        // is synchronous, that will happen *before* subscription is set by the return of
        // the `subscribe` call.
        if (isComplete) {
          subscription = undefined;
        }
      } else {
        // Already connected: just replay the buffer to, and attach, the new subscriber.
        innerSub = subject.subscribe(this);
      }

      // Teardown executed when this subscriber unsubscribes.
      this.add(() => {
        refCount--;
        if (innerSub) {
          innerSub.unsubscribe();
          innerSub = undefined;
        }
        // With ref-counting enabled, release the source and drop the buffer once the
        // last subscriber has left, unless the source already completed.
        if (subscription && !isComplete && useRefCount && refCount === 0) {
          subscription.unsubscribe();
          subscription = undefined;
          subject = undefined;
        }
      });
    };
  }