d459dc2b304d0b3fae711971351fef2889d8f6a36c80e69ea07b09aba01f41baf0f89dbdd7f5172da2cc8a85d9bcbc2d83205824b2f0697b39f45280a47f5d 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import { ConnectableObservable } from '../observable/ConnectableObservable';
  2. import { Subscription } from '../Subscription';
  3. import { MonoTypeOperatorFunction } from '../types';
  4. import { operate } from '../util/lift';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. /**
  7. * Make a {@link ConnectableObservable} behave like an ordinary observable and automate the way
  8. * you can connect to it.
  9. *
  10. * Internally it counts the subscriptions to the observable and subscribes (only once) to the source if
  11. * the number of subscriptions is larger than 0. If the number of subscriptions is smaller than 1, it
  12. * unsubscribes from the source. This way you can make sure that everything before the *published*
  13. * refCount has only a single subscription independently of the number of subscribers to the target
  14. * observable.
  15. *
  16. * Note that using the {@link share} operator is exactly the same as using the `multicast(() => new Subject())` operator
  17. * (making the observable hot) and the *refCount* operator in a sequence.
  18. *
  19. * ![](refCount.png)
  20. *
  21. * ## Example
  22. *
  23. * In the following example there are two intervals turned into connectable observables
  24. * by using the *publish* operator. The first one uses the *refCount* operator, the
  25. * second one does not use it. You will notice that a connectable observable does nothing
  26. * until you call its connect function.
  27. *
  28. * ```ts
  29. * import { interval, tap, publish, refCount } from 'rxjs';
  30. *
  31. * // Turn the interval observable into a ConnectableObservable (hot)
  32. * const refCountInterval = interval(400).pipe(
  33. * tap(num => console.log(`refCount ${ num }`)),
  34. * publish(),
  35. * refCount()
  36. * );
  37. *
  38. * const publishedInterval = interval(400).pipe(
  39. * tap(num => console.log(`publish ${ num }`)),
  40. * publish()
  41. * );
  42. *
  43. * refCountInterval.subscribe();
  44. * refCountInterval.subscribe();
  45. * // 'refCount 0' -----> 'refCount 1' -----> etc
  46. * // All subscriptions will receive the same value and the tap (and
  47. * // every other operator) before the `publish` operator will be executed
  48. * // only once per event independently of the number of subscriptions.
  49. *
  50. * publishedInterval.subscribe();
  51. * // Nothing happens until you call .connect() on the observable.
  52. * ```
  53. *
  54. * @return A function that returns an Observable that automates the connection
  55. * to ConnectableObservable.
  56. * @see {@link ConnectableObservable}
  57. * @see {@link share}
  58. * @see {@link publish}
  59. * @deprecated Replaced with the {@link share} operator. How `share` is used
  60. * will depend on the connectable observable you created just prior to the
  61. * `refCount` operator.
  62. * Details: https://rxjs.dev/deprecations/multicasting
  63. */
  export function refCount<T>(): MonoTypeOperatorFunction<T> {
    return operate((source, subscriber) => {
      // The bookkeeping fields (`_refCount`, `_connection`) are read off the
      // source through an untyped view, exactly as the casts require.
      const counted = source as any;

      // The connection Subscription obtained by *this* subscription, once
      // `connect()` below has returned. It stays `null` until then.
      let ownConnection: Subscription | null = null;

      // Register this subscriber with the shared counter before subscribing.
      counted._refCount++;

      // Finalizer: runs when the operator subscriber is torn down.
      const releaseReference = (): void => {
        // Nothing to release if the source is gone or the counter is already
        // at (or below) zero.
        if (!source || counted._refCount <= 0) {
          ownConnection = null;
          return;
        }

        // Drop our reference; while others remain, leave the shared
        // connection alone.
        const remaining = --counted._refCount;
        if (remaining > 0) {
          ownConnection = null;
          return;
        }

        // We were the last subscriber. Before disposing the shared connection,
        // compare it with the one held locally.
        //
        // When the connectable source emits synchronously and the downstream
        // observers unsubscribe synchronously, this finalizer runs *before*
        // `connect()` has handed us the shared connection, e.g.:
        //
        //   range(0, 10).pipe(
        //     publish(),
        //     refCount(),
        //     take(5),
        //   )
        //   .subscribe();
        //
        // So the shared connection is only disposed when it exists *and*
        // either:
        //   a. we do not hold a connection reference yet, or
        //   b. the reference we hold is that very same shared connection.
        const sharedConnection = counted._connection;
        const heldConnection = ownConnection;
        ownConnection = null;

        const mayDisposeShared = !heldConnection || sharedConnection === heldConnection;
        if (sharedConnection && mayDisposeShared) {
          sharedConnection.unsubscribe();
        }

        subscriber.unsubscribe();
      };

      const tracker = createOperatorSubscriber(subscriber, undefined, undefined, undefined, releaseReference);

      source.subscribe(tracker);

      // Only connect if subscribing did not already close the tracker
      // synchronously.
      if (!tracker.closed) {
        ownConnection = (source as ConnectableObservable<T>).connect();
      }
    });
  }