45f80179c9d45fcb2f91ad350f9e3e5e066d000c6f5c988cd623d82d73fe0785403f1a777363149d434b192a3241d25870307101756d6385e57338cb109a08 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import { Subscriber } from '../Subscriber';
  /**
   * Creates an instance of an `OperatorSubscriber`.
   *
   * This is a thin factory: it forwards its arguments, unchanged and in the same
   * positional order, to the `OperatorSubscriber` constructor. Note that
   * `onComplete` comes BEFORE `onError` in the parameter list.
   *
   * @param destination The downstream subscriber.
   * @param onNext Handles next values. Any error thrown by this function is caught
   * and sent to `destination.error`.
   * @param onComplete Handles the completion notification from the subscription. Any
   * error thrown by this handler is sent to `destination.error`; the subscriber is
   * unsubscribed afterwards either way.
   * @param onError Handles errors from the subscription. Any error thrown by this
   * handler is caught and sent to `destination.error`; the subscriber is
   * unsubscribed afterwards either way.
   * @param onFinalize Additional finalization logic. It is only called on
   * unsubscription if the subscriber was not already closed, and it runs after all
   * other finalization logic has executed.
   * @returns The newly created `OperatorSubscriber`, typed as a `Subscriber<T>`.
   */
  export function createOperatorSubscriber<T>(
    destination: Subscriber<any>,
    onNext?: (value: T) => void,
    onComplete?: () => void,
    onError?: (err: any) => void,
    onFinalize?: () => void
  ): Subscriber<T> {
    return new OperatorSubscriber(destination, onNext, onComplete, onError, onFinalize);
  }
  /**
   * A generic helper for allowing operators to be created with a Subscriber and
   * use closures to capture necessary state from the operator function itself.
   */
  export class OperatorSubscriber<T> extends Subscriber<T> {
    /**
     * Creates an instance of an `OperatorSubscriber`.
     *
     * Parameters are listed in positional order; note that `onComplete` comes
     * BEFORE `onError`.
     *
     * @param destination The downstream subscriber.
     * @param onNext Handles next values. Any error thrown by this function is caught
     * and sent to `destination.error`. When omitted, the base class `_next` is used.
     * @param onComplete Handles the completion notification from the subscription. Any
     * error thrown by this handler is sent to `destination.error`, and this
     * subscriber is unsubscribed afterwards either way. When omitted, the base class
     * `_complete` is used.
     * @param onError Handles errors from the subscription. Any error thrown by this
     * handler is caught and sent to `destination.error`, and this subscriber is
     * unsubscribed afterwards either way. When omitted, the base class `_error` is
     * used.
     * @param onFinalize Additional finalization logic. It is only called on
     * unsubscription if the subscriber was not already closed, and it runs after all
     * other finalization logic has executed.
     * @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe.
     * When provided and it returns a falsy value, `unsubscribe()` does nothing.
     * NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription
     * to the resulting observable does not actually disconnect from the source if there are active subscriptions
     * to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!)
     */
    constructor(
      destination: Subscriber<any>,
      onNext?: (value: T) => void,
      onComplete?: () => void,
      onError?: (err: any) => void,
      private onFinalize?: () => void,
      private shouldUnsubscribe?: () => boolean
    ) {
      // It's important - for performance reasons - that all of this class's
      // members are initialized and that they are always initialized in the same
      // order. This will ensure that all OperatorSubscriber instances have the
      // same hidden class in V8. This, in turn, will help keep the number of
      // hidden classes involved in property accesses within the base class as
      // low as possible. If the number of hidden classes involved exceeds four,
      // the property accesses will become megamorphic and performance penalties
      // will be incurred - i.e. inline caches won't be used.
      //
      // The reasons for ensuring all instances have the same hidden class are
      // further discussed in this blog post from Benedikt Meurer:
      // https://benediktmeurer.de/2018/03/23/impact-of-polymorphism-on-component-based-frameworks-like-react/
      super(destination);

      // NOTE: `_next`, `_error` and `_complete` are ALWAYS assigned, and always in
      // this order, even when no handler was supplied (see the comment above).
      this._next = onNext
        ? function (this: OperatorSubscriber<T>, value: T) {
            try {
              onNext(value);
            } catch (handlerError) {
              // Send any errors that occur down stream.
              destination.error(handlerError);
            }
          }
        : super._next;

      this._error = onError
        ? function (this: OperatorSubscriber<T>, err: any) {
            try {
              onError(err);
            } catch (handlerError) {
              // Send any errors that occur down stream.
              destination.error(handlerError);
            } finally {
              // Ensure finalization.
              this.unsubscribe();
            }
          }
        : super._error;

      this._complete = onComplete
        ? function (this: OperatorSubscriber<T>) {
            try {
              onComplete();
            } catch (handlerError) {
              // Send any errors that occur down stream.
              destination.error(handlerError);
            } finally {
              // Ensure finalization.
              this.unsubscribe();
            }
          }
        : super._complete;
    }

    /**
     * Unsubscribes this subscriber, unless a `shouldUnsubscribe` check was supplied
     * and it reports that unsubscription should not happen, in which case this call
     * is a no-op.
     *
     * After the base class `unsubscribe()` has run, `onFinalize` (if any) is invoked,
     * but only when this subscriber was not already closed before the call.
     */
    unsubscribe() {
      if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
        // Capture `closed` BEFORE delegating: it is read here so that a subscriber
        // which was already closed does not run `onFinalize` a second time.
        const { closed } = this;
        super.unsubscribe();
        // Execute additional teardown if we have any and we didn't already do so.
        if (!closed) {
          this.onFinalize?.();
        }
      }
    }
  }