725779ae40d4ed5a44805a7f8fea8564d2361d88ec382dd054be9e08e9956c6ec35005c6aa5044dcbc3fc919b7c20ac7107615362ed47b2061996a0e28e064 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import { Subject } from './Subject';
  2. import { Subscriber } from './Subscriber';
  3. /**
  4. * A variant of Subject that only emits a value when it completes. It will emit
  5. * its latest value to all its observers on completion.
  6. */
  7. export class AsyncSubject<T> extends Subject<T> {
  8. private _value: T | null = null;
  9. private _hasValue = false;
  10. private _isComplete = false;
  11. /** @internal */
  12. protected _checkFinalizedStatuses(subscriber: Subscriber<T>) {
  13. const { hasError, _hasValue, _value, thrownError, isStopped, _isComplete } = this;
  14. if (hasError) {
  15. subscriber.error(thrownError);
  16. } else if (isStopped || _isComplete) {
  17. _hasValue && subscriber.next(_value!);
  18. subscriber.complete();
  19. }
  20. }
  21. next(value: T): void {
  22. if (!this.isStopped) {
  23. this._value = value;
  24. this._hasValue = true;
  25. }
  26. }
  27. complete(): void {
  28. const { _hasValue, _value, _isComplete } = this;
  29. if (!_isComplete) {
  30. this._isComplete = true;
  31. _hasValue && super.next(_value!);
  32. super.complete();
  33. }
  34. }
  35. }