317fb4c368bcb25ddbe2ec64eceb616ec8c935fc0dfe6e141299a5bcf99bdb9a0c8bcfdc276479d7d8f5b54410a70cb052b0b95d31976d701b05e0adf9e10e 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import { innerFrom } from '../observable/innerFrom';
  2. import { Subject } from '../Subject';
  3. import { SafeSubscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { MonoTypeOperatorFunction, SubjectLike, ObservableInput } from '../types';
  6. import { operate } from '../util/lift';
  7. export interface ShareConfig<T> {
  8. /**
  9. * The factory used to create the subject that will connect the source observable to
  10. * multicast consumers.
  11. */
  12. connector?: () => SubjectLike<T>;
  13. /**
  14. * If `true`, the resulting observable will reset internal state on error from source and return to a "cold" state. This
  15. * allows the resulting observable to be "retried" in the event of an error.
  16. * If `false`, when an error comes from the source it will push the error into the connecting subject, and the subject
  17. * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries
  18. * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however
  19. * {@link ReplaySubject} will also push its buffered values before pushing the error.
  20. * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
  21. * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
  22. */
  23. resetOnError?: boolean | ((error: any) => ObservableInput<any>);
  24. /**
  25. * If `true`, the resulting observable will reset internal state on completion from source and return to a "cold" state. This
  26. * allows the resulting observable to be "repeated" after it is done.
  27. * If `false`, when the source completes, it will push the completion through the connecting subject, and the subject
  28. * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats
  29. * or resubscriptions will resubscribe to that same subject.
  30. * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
  31. * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
  32. */
  33. resetOnComplete?: boolean | (() => ObservableInput<any>);
  34. /**
  35. * If `true`, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the
  36. * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next
  37. * time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to
  38. * again.
  39. * If `false`, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
  40. * will remain connected to the source, and new subscriptions to the result will be connected through that same subject.
  41. * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
  42. * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
  43. */
  44. resetOnRefCountZero?: boolean | (() => ObservableInput<any>);
  45. }
  46. export function share<T>(): MonoTypeOperatorFunction<T>;
  47. export function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;
  48. /**
  49. * Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one
  50. * Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will
  51. * unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream `hot`.
  52. * This is an alias for `multicast(() => new Subject()), refCount()`.
  53. *
  54. * The subscription to the underlying source Observable can be reset (unsubscribe and resubscribe for new subscribers),
  55. * if the subscriber count to the shared observable drops to 0, or if the source Observable errors or completes. It is
  56. * possible to use notifier factories for the resets to allow for behaviors like conditional or delayed resets. Please
  57. * note that resetting on error or complete of the source Observable does not behave like a transparent retry or restart
  58. * of the source because the error or complete will be forwarded to all subscribers and their subscription will be
  59. * closed. Only new subscribers after a reset on error or complete happened will cause a fresh subscription to the
  60. * source. To achieve transparent retries or restarts pipe the source through appropriate operators before sharing.
  61. *
  62. * ![](share.png)
  63. *
  64. * ## Example
  65. *
  66. * Generate new multicast Observable from the `source` Observable value
  67. *
  68. * ```ts
  69. * import { interval, tap, map, take, share } from 'rxjs';
  70. *
  71. * const source = interval(1000).pipe(
  72. * tap(x => console.log('Processing: ', x)),
  73. * map(x => x * x),
  74. * take(6),
  75. * share()
  76. * );
  77. *
  78. * source.subscribe(x => console.log('subscription 1: ', x));
  79. * source.subscribe(x => console.log('subscription 2: ', x));
  80. *
  81. * // Logs:
  82. * // Processing: 0
  83. * // subscription 1: 0
  84. * // subscription 2: 0
  85. * // Processing: 1
  86. * // subscription 1: 1
  87. * // subscription 2: 1
  88. * // Processing: 2
  89. * // subscription 1: 4
  90. * // subscription 2: 4
  91. * // Processing: 3
  92. * // subscription 1: 9
  93. * // subscription 2: 9
  94. * // Processing: 4
  95. * // subscription 1: 16
  96. * // subscription 2: 16
  97. * // Processing: 5
  98. * // subscription 1: 25
  99. * // subscription 2: 25
  100. * ```
  101. *
  102. * ## Example with notifier factory: Delayed reset
  103. *
  104. * ```ts
  105. * import { interval, take, share, timer } from 'rxjs';
  106. *
  107. * const source = interval(1000).pipe(
  108. * take(3),
  109. * share({
  110. * resetOnRefCountZero: () => timer(1000)
  111. * })
  112. * );
  113. *
  114. * const subscriptionOne = source.subscribe(x => console.log('subscription 1: ', x));
  115. * setTimeout(() => subscriptionOne.unsubscribe(), 1300);
  116. *
  117. * setTimeout(() => source.subscribe(x => console.log('subscription 2: ', x)), 1700);
  118. *
  119. * setTimeout(() => source.subscribe(x => console.log('subscription 3: ', x)), 5000);
  120. *
  121. * // Logs:
  122. * // subscription 1: 0
  123. * // (subscription 1 unsubscribes here)
  124. * // (subscription 2 subscribes here ~400ms later, source was not reset)
  125. * // subscription 2: 1
  126. * // subscription 2: 2
  127. * // (subscription 2 unsubscribes here)
  128. * // (subscription 3 subscribes here ~2000ms later, source did reset before)
  129. * // subscription 3: 0
  130. * // subscription 3: 1
  131. * // subscription 3: 2
  132. * ```
  133. *
  134. * @see {@link shareReplay}
  135. *
  136. * @return A function that returns an Observable that mirrors the source.
  137. */
  138. export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction<T> {
  139. const { connector = () => new Subject<T>(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options;
  140. // It's necessary to use a wrapper here, as the _operator_ must be
  141. // referentially transparent. Otherwise, it cannot be used in calls to the
  142. // static `pipe` function - to create a partial pipeline.
  143. //
  144. // The _operator function_ - the function returned by the _operator_ - will
  145. // not be referentially transparent - as it shares its source - but the
  146. // _operator function_ is called when the complete pipeline is composed via a
  147. // call to a source observable's `pipe` method - not when the static `pipe`
  148. // function is called.
  149. return (wrapperSource) => {
  150. let connection: SafeSubscriber<T> | undefined;
  151. let resetConnection: Subscription | undefined;
  152. let subject: SubjectLike<T> | undefined;
  153. let refCount = 0;
  154. let hasCompleted = false;
  155. let hasErrored = false;
  156. const cancelReset = () => {
  157. resetConnection?.unsubscribe();
  158. resetConnection = undefined;
  159. };
  160. // Used to reset the internal state to a "cold"
  161. // state, as though it had never been subscribed to.
  162. const reset = () => {
  163. cancelReset();
  164. connection = subject = undefined;
  165. hasCompleted = hasErrored = false;
  166. };
  167. const resetAndUnsubscribe = () => {
  168. // We need to capture the connection before
  169. // we reset (if we need to reset).
  170. const conn = connection;
  171. reset();
  172. conn?.unsubscribe();
  173. };
  174. return operate<T, T>((source, subscriber) => {
  175. refCount++;
  176. if (!hasErrored && !hasCompleted) {
  177. cancelReset();
  178. }
  179. // Create the subject if we don't have one yet. Grab a local reference to
  180. // it as well, which avoids non-null assertions when using it and, if we
  181. // connect to it now, then error/complete need a reference after it was
  182. // reset.
  183. const dest = (subject = subject ?? connector());
  184. // Add the finalization directly to the subscriber - instead of returning it -
  185. // so that the handling of the subscriber's unsubscription will be wired
  186. // up _before_ the subscription to the source occurs. This is done so that
  187. // the assignment to the source connection's `closed` property will be seen
  188. // by synchronous firehose sources.
  189. subscriber.add(() => {
  190. refCount--;
  191. // If we're resetting on refCount === 0, and it's 0, we only want to do
  192. // that on "unsubscribe", really. Resetting on error or completion is a different
  193. // configuration.
  194. if (refCount === 0 && !hasErrored && !hasCompleted) {
  195. resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero);
  196. }
  197. });
  198. // The following line adds the subscription to the subscriber passed.
  199. // Basically, `subscriber === dest.subscribe(subscriber)` is `true`.
  200. dest.subscribe(subscriber);
  201. if (
  202. !connection &&
  203. // Check this shareReplay is still activate - it can be reset to 0
  204. // and be "unsubscribed" _before_ it actually subscribes.
  205. // If we were to subscribe then, it'd leak and get stuck.
  206. refCount > 0
  207. ) {
  208. // We need to create a subscriber here - rather than pass an observer and
  209. // assign the returned subscription to connection - because it's possible
  210. // for reentrant subscriptions to the shared observable to occur and in
  211. // those situations we want connection to be already-assigned so that we
  212. // don't create another connection to the source.
  213. connection = new SafeSubscriber({
  214. next: (value) => dest.next(value),
  215. error: (err) => {
  216. hasErrored = true;
  217. cancelReset();
  218. resetConnection = handleReset(reset, resetOnError, err);
  219. dest.error(err);
  220. },
  221. complete: () => {
  222. hasCompleted = true;
  223. cancelReset();
  224. resetConnection = handleReset(reset, resetOnComplete);
  225. dest.complete();
  226. },
  227. });
  228. innerFrom(source).subscribe(connection);
  229. }
  230. })(wrapperSource);
  231. };
  232. }
  233. function handleReset<T extends unknown[] = never[]>(
  234. reset: () => void,
  235. on: boolean | ((...args: T) => ObservableInput<any>),
  236. ...args: T
  237. ): Subscription | undefined {
  238. if (on === true) {
  239. reset();
  240. return;
  241. }
  242. if (on === false) {
  243. return;
  244. }
  245. const onSubscriber = new SafeSubscriber({
  246. next: () => {
  247. onSubscriber.unsubscribe();
  248. reset();
  249. },
  250. });
  251. return innerFrom(on(...args)).subscribe(onSubscriber);
  252. }