11497fe6e37508fbfffcbf44e1bf1f3ef10be06daa95d8fba3619b484ccefc105ba91cb872aac4c4edaac7b4424adf6236a1522544ccb9332a0a48347a8265 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import { Subject, SubjectSubscriber } from '../Subject';
  2. import { Operator } from '../Operator';
  3. import { Observable } from '../Observable';
  4. import { Subscriber } from '../Subscriber';
  5. import { Subscription } from '../Subscription';
  6. import { TeardownLogic } from '../types';
  7. import { refCount as higherOrderRefCount } from '../operators/refCount';
  /**
   * An Observable whose subscribers attach to a Subject produced by
   * `subjectFactory`. The underlying `source` is only subscribed to when
   * `connect()` is called, and that single source subscription feeds the
   * Subject.
   *
   * @class ConnectableObservable<T>
   */
  export class ConnectableObservable<T> extends Observable<T> {
    /** Subject that subscribers attach to; created lazily by `getSubject()`. */
    protected _subject: Subject<T>;
    /** Number of ref-counted subscribers; maintained by the ref-counting helpers in this file. */
    protected _refCount: number = 0;
    /** Subscription returned by the last `connect()`; falsy while disconnected. */
    protected _connection: Subscription;
    /**
     * Reset to `false` by `connect()`; set to `true` by ConnectableSubscriber
     * when the source completes.
     * @internal
     */
    _isComplete = false;

    /**
     * @param source Observable subscribed to when `connect()` is called.
     * @param subjectFactory Produces the Subject that notifications are routed through.
     */
    constructor(public source: Observable<T>,
                protected subjectFactory: () => Subject<T>) {
      super();
    }

    /** @deprecated This is an internal implementation detail, do not use. */
    _subscribe(subscriber: Subscriber<T>) {
      return this.getSubject().subscribe(subscriber);
    }

    /**
     * Returns the current Subject, asking `subjectFactory` for a fresh one when
     * none exists yet or the current one is already stopped.
     */
    protected getSubject(): Subject<T> {
      const subject = this._subject;
      if (!subject || subject.isStopped) {
        this._subject = this.subjectFactory();
      }
      return this._subject;
    }

    /**
     * Subscribes to `source` through a ConnectableSubscriber unless a
     * connection already exists, in which case the existing one is returned.
     *
     * @returns The connection Subscription, or `Subscription.EMPTY` when the
     * new connection was already closed by the time `source.subscribe` returned.
     */
    connect(): Subscription {
      let connection = this._connection;
      if (!connection) {
        this._isComplete = false;
        // Publish the connection before subscribing so that teardown triggered
        // during `source.subscribe` can find it on `this._connection`.
        connection = this._connection = new Subscription();
        connection.add(this.source
          .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
        if (connection.closed) {
          this._connection = null;
          connection = Subscription.EMPTY;
        }
      }
      return connection;
    }

    /** Returns this observable piped through the `refCount` operator. */
    refCount(): Observable<T> {
      return higherOrderRefCount()(this) as Observable<T>;
    }
  }
  /**
   * Property descriptors mirroring ConnectableObservable's state fields and
   * methods. State fields are writable and start at their initial values;
   * methods are copied from `ConnectableObservable.prototype`.
   *
   * NOTE(review): presumably consumed via `Object.create(source, descriptor)`
   * to make an existing Observable connectable — confirm against callers.
   */
  export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
    // Cast to `any` so the protected members can be read off the prototype.
    const connectableProto = <any>ConnectableObservable.prototype;
    return {
      operator: { value: null as null },
      _refCount: { value: 0, writable: true },
      _subject: { value: null as null, writable: true },
      _connection: { value: null as null, writable: true },
      _subscribe: { value: connectableProto._subscribe },
      _isComplete: { value: connectableProto._isComplete, writable: true },
      getSubject: { value: connectableProto.getSubject },
      connect: { value: connectableProto.connect },
      refCount: { value: connectableProto.refCount }
    };
  })();
  /**
   * Subscriber placed between the source and the ConnectableObservable's
   * Subject. On error, completion or unsubscription it resets the owning
   * ConnectableObservable's connection state.
   */
  class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
    /**
     * @param destination Subject that receives the source's notifications.
     * @param connectable Owner whose connection state is reset on teardown.
     */
    constructor(destination: Subject<T>,
                private connectable: ConnectableObservable<T>) {
      super(destination);
    }

    /** Resets the connectable's state first, then forwards the error. */
    protected _error(err: any): void {
      this._unsubscribe();
      super._error(err);
    }

    /** Flags the connectable as complete, resets its state, then forwards completion. */
    protected _complete(): void {
      this.connectable._isComplete = true;
      this._unsubscribe();
      super._complete();
    }

    /**
     * Clears the connectable's ref count, Subject and connection, then
     * unsubscribes the connection that was active. Runs at most once: the
     * reference to the connectable is dropped on the first call.
     */
    protected _unsubscribe() {
      const connectable = <any>this.connectable;
      if (connectable) {
        this.connectable = null;
        // Capture the connection before clearing the field so it can still be
        // unsubscribed after the connectable's state has been reset.
        const connection = connectable._connection;
        connectable._refCount = 0;
        connectable._subject = null;
        connectable._connection = null;
        if (connection) {
          connection.unsubscribe();
        }
      }
    }
  }
  /**
   * Operator that counts subscribers on a ConnectableObservable and connects
   * it when a subscriber is attached.
   */
  class RefCountOperator<T> implements Operator<T, T> {
    /** @param connectable The ConnectableObservable whose subscribers are counted. */
    constructor(private connectable: ConnectableObservable<T>) {
    }

    /**
     * Increments the connectable's ref count, subscribes `source` with a
     * RefCountSubscriber wrapping `subscriber`, and connects unless that
     * subscriber is already closed after subscribing.
     *
     * @returns The subscription produced by `source.subscribe`.
     */
    call(subscriber: Subscriber<T>, source: any): TeardownLogic {
      const { connectable } = this;
      (<any> connectable)._refCount++;
      const refCounter = new RefCountSubscriber(subscriber, connectable);
      const subscription = source.subscribe(refCounter);
      // Skip connecting when the subscriber was closed during subscribe.
      if (!refCounter.closed) {
        (<any> refCounter).connection = connectable.connect();
      }
      return subscription;
    }
  }
  /**
   * Subscriber that decrements the owning ConnectableObservable's ref count
   * when it is unsubscribed, and disposes the shared connection once the
   * count drops from 1 to 0.
   */
  class RefCountSubscriber<T> extends Subscriber<T> {
    /** Shared connection handed over by RefCountOperator after connecting. */
    private connection: Subscription;

    /**
     * @param destination Downstream subscriber receiving the notifications.
     * @param connectable The ConnectableObservable whose ref count is tracked.
     */
    constructor(destination: Subscriber<T>,
                private connectable: ConnectableObservable<T>) {
      super(destination);
    }

    /**
     * Releases this subscriber's share of the connection. Every path clears
     * the local `connection` reference; only the last remaining subscriber
     * unsubscribes the shared connection.
     */
    protected _unsubscribe() {
      const { connectable } = this;
      // Already released once: nothing left to count down.
      if (!connectable) {
        this.connection = null;
        return;
      }

      this.connectable = null;
      const refCount = (<any> connectable)._refCount;
      // Count was already reset (for example by ConnectableSubscriber teardown).
      if (refCount <= 0) {
        this.connection = null;
        return;
      }

      (<any> connectable)._refCount = refCount - 1;
      // Other subscribers remain, so the shared connection stays open.
      if (refCount > 1) {
        this.connection = null;
        return;
      }

      ///
      // Compare the local RefCountSubscriber's connection Subscription to the
      // connection Subscription on the shared ConnectableObservable. In cases
      // where the ConnectableObservable source synchronously emits values, and
      // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
      // execution continues to here before the RefCountOperator has a chance to
      // supply the RefCountSubscriber with the shared connection Subscription.
      // For example:
      // ```
      // range(0, 10).pipe(
      //   publish(),
      //   refCount(),
      //   take(5),
      // ).subscribe();
      // ```
      // In order to account for this case, RefCountSubscriber should only dispose
      // the ConnectableObservable's shared connection Subscription if the
      // connection Subscription exists, *and* either:
      //   a. RefCountSubscriber doesn't have a reference to the shared connection
      //      Subscription yet, or,
      //   b. RefCountSubscriber's connection Subscription reference is identical
      //      to the shared connection Subscription
      ///
      const { connection } = this;
      const sharedConnection = (<any> connectable)._connection;
      this.connection = null;

      if (sharedConnection && (!connection || sharedConnection === connection)) {
        sharedConnection.unsubscribe();
      }
    }
  }