55d8749dbfe6f879c6fcbb336229bd3d5faf1e3a3d4e39508457d171b6c6ab7f11c924f30fd4291787db21bd2b8d24596865c0491ce35b18c701322c807a2d 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import { Subject } from '../Subject';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { refCount as higherOrderRefCount } from '../operators/refCount';
  6. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  7. import { hasLift } from '../util/lift';
  8. /**
  9. * @class ConnectableObservable<T>
  10. * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable.
  11. * If you are using the `refCount` method of `ConnectableObservable`, use the {@link share} operator
  12. * instead.
  13. * Details: https://rxjs.dev/deprecations/multicasting
  14. */
  15. export class ConnectableObservable<T> extends Observable<T> {
  16. protected _subject: Subject<T> | null = null;
  17. protected _refCount: number = 0;
  18. protected _connection: Subscription | null = null;
  19. /**
  20. * @param source The source observable
  21. * @param subjectFactory The factory that creates the subject used internally.
  22. * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable.
  23. * `new ConnectableObservable(source, factory)` is equivalent to
  24. * `connectable(source, { connector: factory })`.
  25. * When the `refCount()` method is needed, the {@link share} operator should be used instead:
  26. * `new ConnectableObservable(source, factory).refCount()` is equivalent to
  27. * `source.pipe(share({ connector: factory }))`.
  28. * Details: https://rxjs.dev/deprecations/multicasting
  29. */
  30. constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
  31. super();
  32. // If we have lift, monkey patch that here. This is done so custom observable
  33. // types will compose through multicast. Otherwise the resulting observable would
  34. // simply be an instance of `ConnectableObservable`.
  35. if (hasLift(source)) {
  36. this.lift = source.lift;
  37. }
  38. }
  39. /** @internal */
  40. protected _subscribe(subscriber: Subscriber<T>) {
  41. return this.getSubject().subscribe(subscriber);
  42. }
  43. protected getSubject(): Subject<T> {
  44. const subject = this._subject;
  45. if (!subject || subject.isStopped) {
  46. this._subject = this.subjectFactory();
  47. }
  48. return this._subject!;
  49. }
  50. protected _teardown() {
  51. this._refCount = 0;
  52. const { _connection } = this;
  53. this._subject = this._connection = null;
  54. _connection?.unsubscribe();
  55. }
  56. /**
  57. * @deprecated {@link ConnectableObservable} will be removed in v8. Use {@link connectable} instead.
  58. * Details: https://rxjs.dev/deprecations/multicasting
  59. */
  60. connect(): Subscription {
  61. let connection = this._connection;
  62. if (!connection) {
  63. connection = this._connection = new Subscription();
  64. const subject = this.getSubject();
  65. connection.add(
  66. this.source.subscribe(
  67. createOperatorSubscriber(
  68. subject as any,
  69. undefined,
  70. () => {
  71. this._teardown();
  72. subject.complete();
  73. },
  74. (err) => {
  75. this._teardown();
  76. subject.error(err);
  77. },
  78. () => this._teardown()
  79. )
  80. )
  81. );
  82. if (connection.closed) {
  83. this._connection = null;
  84. connection = Subscription.EMPTY;
  85. }
  86. }
  87. return connection;
  88. }
  89. /**
  90. * @deprecated {@link ConnectableObservable} will be removed in v8. Use the {@link share} operator instead.
  91. * Details: https://rxjs.dev/deprecations/multicasting
  92. */
  93. refCount(): Observable<T> {
  94. return higherOrderRefCount()(this) as Observable<T>;
  95. }
  96. }