123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
import { innerFrom } from '../observable/innerFrom';
import { Subject } from '../Subject';
import { SafeSubscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SubjectLike, ObservableInput } from '../types';
import { operate } from '../util/lift';
/**
 * Options accepted by the {@link share} operator. Every member is optional; `share` falls back to a
 * plain `Subject` connector and to `true` for each of the three reset settings when a member is omitted.
 */
export interface ShareConfig<T> {
  /**
   * The factory used to create the subject that will connect the source observable to
   * multicast consumers.
   */
  connector?: () => SubjectLike<T>;
  /**
   * If `true`, the resulting observable will reset internal state on error from source and return to a "cold" state. This
   * allows the resulting observable to be "retried" in the event of an error.
   * If `false`, when an error comes from the source it will push the error into the connecting subject, and the subject
   * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries
   * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however
   * {@link ReplaySubject} will also push its buffered values before pushing the error.
   * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
   * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
   * The factory receives the error that the source emitted.
   */
  resetOnError?: boolean | ((error: any) => ObservableInput<any>);
  /**
   * If `true`, the resulting observable will reset internal state on completion from source and return to a "cold" state. This
   * allows the resulting observable to be "repeated" after it is done.
   * If `false`, when the source completes, it will push the completion through the connecting subject, and the subject
   * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats
   * or resubscriptions will resubscribe to that same subject.
   * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
   * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
   */
  resetOnComplete?: boolean | (() => ObservableInput<any>);
  /**
   * If `true`, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the
   * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next
   * time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to
   * again.
   * If `false`, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
   * will remain connected to the source, and new subscriptions to the result will be connected through that same subject.
   * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
   * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
   */
  resetOnRefCountZero?: boolean | (() => ObservableInput<any>);
}
export function share<T>(): MonoTypeOperatorFunction<T>;
export function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;
/**
 * Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one
 * Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will
 * unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream `hot`.
 * This is an alias for `multicast(() => new Subject()), refCount()`.
 *
 * The subscription to the underlying source Observable can be reset (unsubscribe and resubscribe for new subscribers),
 * if the subscriber count to the shared observable drops to 0, or if the source Observable errors or completes. It is
 * possible to use notifier factories for the resets to allow for behaviors like conditional or delayed resets. Please
 * note that resetting on error or complete of the source Observable does not behave like a transparent retry or restart
 * of the source because the error or complete will be forwarded to all subscribers and their subscription will be
 * closed. Only new subscribers after a reset on error or complete happened will cause a fresh subscription to the
 * source. To achieve transparent retries or restarts pipe the source through appropriate operators before sharing.
 *
 * ## Example
 *
 * Generate new multicast Observable from the `source` Observable value
 *
 * ```ts
 * import { interval, tap, map, take, share } from 'rxjs';
 *
 * const source = interval(1000).pipe(
 *   tap(x => console.log('Processing: ', x)),
 *   map(x => x * x),
 *   take(6),
 *   share()
 * );
 *
 * source.subscribe(x => console.log('subscription 1: ', x));
 * source.subscribe(x => console.log('subscription 2: ', x));
 *
 * // Logs:
 * // Processing: 0
 * // subscription 1: 0
 * // subscription 2: 0
 * // Processing: 1
 * // subscription 1: 1
 * // subscription 2: 1
 * // Processing: 2
 * // subscription 1: 4
 * // subscription 2: 4
 * // Processing: 3
 * // subscription 1: 9
 * // subscription 2: 9
 * // Processing: 4
 * // subscription 1: 16
 * // subscription 2: 16
 * // Processing: 5
 * // subscription 1: 25
 * // subscription 2: 25
 * ```
 *
 * ## Example with notifier factory: Delayed reset
 *
 * ```ts
 * import { interval, take, share, timer } from 'rxjs';
 *
 * const source = interval(1000).pipe(
 *   take(3),
 *   share({
 *     resetOnRefCountZero: () => timer(1000)
 *   })
 * );
 *
 * const subscriptionOne = source.subscribe(x => console.log('subscription 1: ', x));
 * setTimeout(() => subscriptionOne.unsubscribe(), 1300);
 *
 * setTimeout(() => source.subscribe(x => console.log('subscription 2: ', x)), 1700);
 *
 * setTimeout(() => source.subscribe(x => console.log('subscription 3: ', x)), 5000);
 *
 * // Logs:
 * // subscription 1: 0
 * // (subscription 1 unsubscribes here)
 * // (subscription 2 subscribes here ~400ms later, source was not reset)
 * // subscription 2: 1
 * // subscription 2: 2
 * // (subscription 2 unsubscribes here)
 * // (subscription 3 subscribes here ~2000ms later, source did reset before)
 * // subscription 3: 0
 * // subscription 3: 1
 * // subscription 3: 2
 * ```
 *
 * @see {@link shareReplay}
 *
 * @param options Optional configuration. When a member is omitted, `connector` defaults to a factory
 * creating a new `Subject`, and `resetOnError`, `resetOnComplete` and `resetOnRefCountZero` default to `true`.
 * @return A function that returns an Observable that mirrors the source.
 */
export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction<T> {
  const { connector = () => new Subject<T>(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options;
  // It's necessary to use a wrapper here, as the _operator_ must be
  // referentially transparent. Otherwise, it cannot be used in calls to the
  // static `pipe` function - to create a partial pipeline.
  //
  // The _operator function_ - the function returned by the _operator_ - will
  // not be referentially transparent - as it shares its source - but the
  // _operator function_ is called when the complete pipeline is composed via a
  // call to a source observable's `pipe` method - not when the static `pipe`
  // function is called.
  return (wrapperSource) => {
    // State shared by every subscriber of one composed pipeline.
    let connection: SafeSubscriber<T> | undefined;
    let resetConnection: Subscription | undefined;
    let subject: SubjectLike<T> | undefined;
    let refCount = 0;
    let hasCompleted = false;
    let hasErrored = false;

    // Drops a pending (notifier-driven) reset, if there is one.
    const cancelReset = () => {
      resetConnection?.unsubscribe();
      resetConnection = undefined;
    };

    // Used to reset the internal state to a "cold"
    // state, as though it had never been subscribed to.
    const reset = () => {
      cancelReset();
      connection = subject = undefined;
      hasCompleted = hasErrored = false;
    };

    const resetAndUnsubscribe = () => {
      // We need to capture the connection before
      // we reset (if we need to reset).
      const conn = connection;
      reset();
      conn?.unsubscribe();
    };

    return operate<T, T>((source, subscriber) => {
      refCount++;
      // A new subscriber arrived while the source is still live: any reset that was
      // scheduled because the refCount had dropped to zero must not happen anymore.
      if (!hasErrored && !hasCompleted) {
        cancelReset();
      }

      // Create the subject if we don't have one yet. Grab a local reference to
      // it as well, which avoids non-null assertions when using it and, if we
      // connect to it now, then error/complete need a reference after it was
      // reset.
      const dest = (subject = subject ?? connector());

      // Add the finalization directly to the subscriber - instead of returning it -
      // so that the handling of the subscriber's unsubscription will be wired
      // up _before_ the subscription to the source occurs. This is done so that
      // the assignment to the source connection's `closed` property will be seen
      // by synchronous firehose sources.
      subscriber.add(() => {
        refCount--;

        // If we're resetting on refCount === 0, and it's 0, we only want to do
        // that on "unsubscribe", really. Resetting on error or completion is a different
        // configuration.
        if (refCount === 0 && !hasErrored && !hasCompleted) {
          resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero);
        }
      });

      // The following line adds the subscription to the subscriber passed.
      // Basically, `subscriber === dest.subscribe(subscriber)` is `true`.
      dest.subscribe(subscriber);

      if (
        !connection &&
        // Check that this share is still active - its refCount can drop back to 0,
        // "unsubscribing" it, _before_ it actually subscribes to the source.
        // If we were to subscribe then, it'd leak and get stuck.
        refCount > 0
      ) {
        // We need to create a subscriber here - rather than pass an observer and
        // assign the returned subscription to connection - because it's possible
        // for reentrant subscriptions to the shared observable to occur and in
        // those situations we want connection to be already-assigned so that we
        // don't create another connection to the source.
        connection = new SafeSubscriber({
          next: (value) => dest.next(value),
          error: (err) => {
            hasErrored = true;
            cancelReset();
            // The reset is arranged before the error is forwarded to `dest`.
            resetConnection = handleReset(reset, resetOnError, err);
            dest.error(err);
          },
          complete: () => {
            hasCompleted = true;
            cancelReset();
            // The reset is arranged before the completion is forwarded to `dest`.
            resetConnection = handleReset(reset, resetOnComplete);
            dest.complete();
          },
        });
        innerFrom(source).subscribe(connection);
      }
    })(wrapperSource);
  };
}
/**
 * Applies one reset setting (`true`, `false`, or a notifier factory).
 *
 * - `true`: `reset` is invoked synchronously and `undefined` is returned.
 * - `false`: nothing happens and `undefined` is returned.
 * - factory: the factory is called with `args`, its result is subscribed to, and `reset` is invoked
 *   on the first `next` notification from that notifier. Only `next` triggers the reset.
 *
 * @param reset Callback that performs the actual reset.
 * @param on The reset setting: a boolean, or a factory producing the notifier.
 * @param args Arguments forwarded to the notifier factory.
 * @returns The subscription to the notifier when a factory was supplied (so that the caller can
 * cancel the pending reset); otherwise `undefined`.
 */
function handleReset<T extends unknown[] = never[]>(
  reset: () => void,
  on: boolean | ((...args: T) => ObservableInput<any>),
  ...args: T
): Subscription | undefined {
  if (on === true) {
    reset();
    return undefined;
  }

  if (on === false) {
    return undefined;
  }

  const onSubscriber = new SafeSubscriber({
    next: () => {
      // Stop listening to the notifier before running the reset.
      onSubscriber.unsubscribe();
      reset();
    },
  });

  return innerFrom(on(...args)).subscribe(onSubscriber);
}
|