90a847b5b5fe3e5479128bc15603cd944ac9020ed00ab8bc7fdd668e58c08d9b5f20d86cc30bac19faf8a3844cb90b1e1508ff471f6e19aeabbe26dc8491d2 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. import { isFunction } from './util/isFunction';
  2. import { Observer, ObservableNotification } from './types';
  3. import { isSubscription, Subscription } from './Subscription';
  4. import { config } from './config';
  5. import { reportUnhandledError } from './util/reportUnhandledError';
  6. import { noop } from './util/noop';
  7. import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
  8. import { timeoutProvider } from './scheduler/timeoutProvider';
  9. import { captureError } from './util/errorContext';
  /**
   * The internal consumer type of the library: it implements the
   * {@link Observer} interface and extends {@link Subscription}, so that every
   * consumer can also be torn down with `unsubscribe`. Observers handed to an
   * {@link Observable} are converted into a Subscriber. The type is essential
   * when implementing operators, but is rarely needed as a public API.
   */
  export class Subscriber<T> extends Subscription implements Observer<T> {
    /**
     * Static factory that wraps up to three optional callbacks in a
     * {@link SafeSubscriber}.
     * @param next The `next` callback of an Observer.
     * @param error The `error` callback of an Observer.
     * @param complete The `complete` callback of an Observer.
     * @return A Subscriber wrapping the (partially defined) Observer described
     * by the given arguments.
     * @deprecated Do not use. Will be removed in v8. There is no replacement for this
     * method, and there is no reason to be creating instances of `Subscriber` directly.
     * If you have a specific use case, please file an issue.
     */
    static create<T>(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber<T> {
      return new SafeSubscriber(next, error, complete);
    }

    /**
     * Set once `error`, `complete` or `unsubscribe` has run; later
     * notifications are then diverted to `handleStoppedNotification`.
     * @deprecated Internal implementation detail, do not use directly. Will be made internal in v8.
     */
    protected isStopped: boolean = false;

    /**
     * The consumer that notifications are forwarded to. The `any` erases an
     * extra type parameter (e.g. `R`) that the downstream consumer may have.
     * @deprecated Internal implementation detail, do not use directly. Will be made internal in v8.
     */
    protected destination: Subscriber<any> | Observer<any>;

    /**
     * @param destination The consumer to forward to. When omitted (or falsy),
     * `EMPTY_OBSERVER` is used instead.
     * @deprecated Internal implementation detail, do not use directly. Will be made internal in v8.
     * There is no reason to directly create an instance of Subscriber. This type is exported for typings reasons.
     */
    constructor(destination?: Subscriber<any> | Observer<any>) {
      super();
      this.destination = destination || EMPTY_OBSERVER;
      // Chain the subscriptions: a destination that is a Subscription is a
      // Subscriber, so register this instance with it.
      if (destination && isSubscription(destination)) {
        destination.add(this);
      }
    }

    /**
     * The {@link Observer} callback that receives a `next` notification
     * carrying a value. May be called zero or more times.
     * @param value The `next` value.
     */
    next(value: T): void {
      if (this.isStopped) {
        handleStoppedNotification(nextNotification(value), this);
        return;
      }
      this._next(value);
    }

    /**
     * The {@link Observer} callback that receives an `error` notification.
     * The first call marks this subscriber as stopped before forwarding.
     * @param err The `error` exception.
     */
    error(err?: any): void {
      if (this.isStopped) {
        handleStoppedNotification(errorNotification(err), this);
        return;
      }
      this.isStopped = true;
      this._error(err);
    }

    /**
     * The {@link Observer} callback that receives the valueless `complete`
     * notification. The first call marks this subscriber as stopped before
     * forwarding.
     */
    complete(): void {
      if (this.isStopped) {
        handleStoppedNotification(COMPLETE_NOTIFICATION, this);
        return;
      }
      this.isStopped = true;
      this._complete();
    }

    /**
     * Stops this subscriber, runs the inherited `unsubscribe`, and then drops
     * the reference to the destination. Does nothing when already closed.
     */
    unsubscribe(): void {
      if (this.closed) {
        return;
      }
      this.isStopped = true;
      super.unsubscribe();
      this.destination = null!;
    }

    /** Forwards a value to the destination. */
    protected _next(value: T): void {
      this.destination.next(value);
    }

    /** Forwards an error to the destination, then always unsubscribes. */
    protected _error(err: any): void {
      try {
        this.destination.error(err);
      } finally {
        // Runs even when the destination's handler throws.
        this.unsubscribe();
      }
    }

    /** Forwards completion to the destination, then always unsubscribes. */
    protected _complete(): void {
      try {
        this.destination.complete();
      } finally {
        // Runs even when the destination's handler throws.
        this.unsubscribe();
      }
    }
  }
  /**
   * `Function.prototype.bind` is captured once here, and invoked through
   * `call`, so that binding does not go through a `bind` property found on the
   * function being bound. This keeps compatibility with monoid libraries that
   * tend to define their own method named `bind`; in particular, a library
   * called Monio requires this.
   */
  const _bind = Function.prototype.bind;

  /**
   * Returns a copy of `fn` whose `this` is permanently set to `thisArg`.
   * @param fn The function to bind.
   * @param thisArg The value to use as `this` when the result is called.
   */
  function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn {
    const bound: Fn = _bind.call(fn, thisArg);
    return bound;
  }
  /**
   * Turns a partial observer into a full one: missing `next` and `complete`
   * handlers are skipped, a missing `error` handler sends the error to
   * `handleUnhandledError`, and anything thrown by a handler is routed to
   * `handleUnhandledError` as well.
   *
   * Internal optimization only, DO NOT EXPOSE.
   * @internal
   */
  class ConsumerObserver<T> implements Observer<T> {
    constructor(private partialObserver: Partial<Observer<T>>) {}

    /** Delivers `value` to the wrapped `next` handler, if there is one. */
    next(value: T): void {
      const target = this.partialObserver;
      if (!target.next) {
        return;
      }
      try {
        target.next(value);
      } catch (thrown) {
        handleUnhandledError(thrown);
      }
    }

    /**
     * Delivers `err` to the wrapped `error` handler; without a handler the
     * error itself is treated as unhandled.
     */
    error(err: any): void {
      const target = this.partialObserver;
      if (!target.error) {
        handleUnhandledError(err);
        return;
      }
      try {
        target.error(err);
      } catch (thrown) {
        handleUnhandledError(thrown);
      }
    }

    /** Invokes the wrapped `complete` handler, if there is one. */
    complete(): void {
      const target = this.partialObserver;
      if (!target.complete) {
        return;
      }
      try {
        target.complete();
      } catch (thrown) {
        handleUnhandledError(thrown);
      }
    }
  }
  /**
   * A Subscriber built from what a consumer supplies: either a partial
   * observer object, or up to three separate callbacks. Whatever is given is
   * wrapped in a `ConsumerObserver`, which becomes this subscriber's
   * destination.
   */
  export class SafeSubscriber<T> extends Subscriber<T> {
    /**
     * @param observerOrNext A partial observer, a `next` callback, or nothing.
     * @param error The `error` callback; only read in the callback form.
     * @param complete The `complete` callback; only read in the callback form.
     */
    constructor(
      observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
      error?: ((e?: any) => void) | null,
      complete?: (() => void) | null
    ) {
      super();

      let partialObserver: Partial<Observer<T>>;
      if (!observerOrNext || isFunction(observerOrNext)) {
        // Callback form: the first argument is a `next` function or is absent,
        // and `error` / `complete` are the remaining optional callbacks.
        // `null` is normalized to `undefined` for each of them.
        partialObserver = {
          next: (observerOrNext ?? undefined) as ((value: T) => void) | undefined,
          error: error ?? undefined,
          complete: complete ?? undefined,
        };
      } else if (this && config.useDeprecatedNextContext) {
        // Observer form, deprecated path: it made `this.unsubscribe()`
        // available inside the handlers passed to subscribe. It only exists
        // behind a flag now, as it is *very* slow.
        const context: any = Object.create(observerOrNext);
        context.unsubscribe = () => this.unsubscribe();
        partialObserver = {
          next: observerOrNext.next && bind(observerOrNext.next, context),
          error: observerOrNext.error && bind(observerOrNext.error, context),
          complete: observerOrNext.complete && bind(observerOrNext.complete, context),
        };
      } else {
        // Observer form, normal path: use the partial observer as given.
        partialObserver = observerOrNext;
      }

      // Wrapping fills in the missing handlers and gives every handler
      // consistent error handling.
      this.destination = new ConsumerObserver(partialObserver);
    }
  }
  /**
   * Routes an error that no consumer handler took care of. With
   * `config.useDeprecatedSynchronousErrorHandling` enabled it is handed to
   * `captureError`; otherwise it goes to `reportUnhandledError`.
   * @param error The unhandled error.
   */
  function handleUnhandledError(error: any) {
    if (!config.useDeprecatedSynchronousErrorHandling) {
      // Ideal path: report this as an unhandled error, which is thrown on a
      // new call stack.
      reportUnhandledError(error);
      return;
    }
    captureError(error);
  }
  /**
   * The fallback error handler, used when no error handler was supplied to
   * the SafeSubscriber -- meaning none was supplied to the `subscribe` call
   * on our observable. It rethrows whatever it is given.
   * @param err The error to handle
   */
  function defaultErrorHandler(err: any): void {
    throw err;
  }
  /**
   * Deals with a notification that arrived at an already stopped subscriber.
   * When `config.onStoppedNotification` is set, a call to it is scheduled
   * through `timeoutProvider.setTimeout`; otherwise the notification is
   * dropped.
   * @param notification The notification being sent.
   * @param subscriber The stopped subscriber.
   */
  function handleStoppedNotification(notification: ObservableNotification<any>, subscriber: Subscriber<any>) {
    const stoppedHandler = config.onStoppedNotification;
    if (stoppedHandler) {
      timeoutProvider.setTimeout(() => stoppedHandler(notification, subscriber));
    }
  }
  /**
   * Stand-in observer for subscriptions where the user passed no arguments
   * to `subscribe`. It is flagged as closed, ignores `next` and `complete`,
   * and applies the default error handling (the error is rethrown) on
   * `error`.
   */
  export const EMPTY_OBSERVER: Readonly<Observer<any>> & { closed: true } = {
    closed: true,
    next: noop,
    error: defaultErrorHandler,
    complete: noop,
  };