9aba4dfc584da244781e5f87eba6816171cc920e75b0c8365ff5d6eb2f565bf3f7de836beb409d3c6388bd3c2a973e458e5d89393b59d178f3e01ee962f6de 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import { SchedulerLike } from '../types';
  2. import { isScheduler } from '../util/isScheduler';
  3. import { Observable } from '../Observable';
  4. import { subscribeOn } from '../operators/subscribeOn';
  5. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  6. import { observeOn } from '../operators/observeOn';
  7. import { AsyncSubject } from '../AsyncSubject';
  8. export function bindCallbackInternals(
  9. isNodeStyle: boolean,
  10. callbackFunc: any,
  11. resultSelector?: any,
  12. scheduler?: SchedulerLike
  13. ): (...args: any[]) => Observable<unknown> {
  14. if (resultSelector) {
  15. if (isScheduler(resultSelector)) {
  16. scheduler = resultSelector;
  17. } else {
  18. // The user provided a result selector.
  19. return function (this: any, ...args: any[]) {
  20. return (bindCallbackInternals(isNodeStyle, callbackFunc, scheduler) as any)
  21. .apply(this, args)
  22. .pipe(mapOneOrManyArgs(resultSelector as any));
  23. };
  24. }
  25. }
  26. // If a scheduler was passed, use our `subscribeOn` and `observeOn` operators
  27. // to compose that behavior for the user.
  28. if (scheduler) {
  29. return function (this: any, ...args: any[]) {
  30. return (bindCallbackInternals(isNodeStyle, callbackFunc) as any)
  31. .apply(this, args)
  32. .pipe(subscribeOn(scheduler!), observeOn(scheduler!));
  33. };
  34. }
  35. return function (this: any, ...args: any[]): Observable<any> {
  36. // We're using AsyncSubject, because it emits when it completes,
  37. // and it will play the value to all late-arriving subscribers.
  38. const subject = new AsyncSubject<any>();
  39. // If this is true, then we haven't called our function yet.
  40. let uninitialized = true;
  41. return new Observable((subscriber) => {
  42. // Add our subscriber to the subject.
  43. const subs = subject.subscribe(subscriber);
  44. if (uninitialized) {
  45. uninitialized = false;
  46. // We're going to execute the bound function
  47. // This bit is to signal that we are hitting the callback asynchronously.
  48. // Because we don't have any anti-"Zalgo" guarantees with whatever
  49. // function we are handed, we use this bit to figure out whether or not
  50. // we are getting hit in a callback synchronously during our call.
  51. let isAsync = false;
  52. // This is used to signal that the callback completed synchronously.
  53. let isComplete = false;
  54. // Call our function that has a callback. If at any time during this
  55. // call, an error is thrown, it will be caught by the Observable
  56. // subscription process and sent to the consumer.
  57. callbackFunc.apply(
  58. // Pass the appropriate `this` context.
  59. this,
  60. [
  61. // Pass the arguments.
  62. ...args,
  63. // And our callback handler.
  64. (...results: any[]) => {
  65. if (isNodeStyle) {
  66. // If this is a node callback, shift the first value off of the
  67. // results and check it, as it is the error argument. By shifting,
  68. // we leave only the argument(s) we want to pass to the consumer.
  69. const err = results.shift();
  70. if (err != null) {
  71. subject.error(err);
  72. // If we've errored, we can stop processing this function
  73. // as there's nothing else to do. Just return to escape.
  74. return;
  75. }
  76. }
  77. // If we have one argument, notify the consumer
  78. // of it as a single value, otherwise, if there's more than one, pass
  79. // them as an array. Note that if there are no arguments, `undefined`
  80. // will be emitted.
  81. subject.next(1 < results.length ? results : results[0]);
  82. // Flip this flag, so we know we can complete it in the synchronous
  83. // case below.
  84. isComplete = true;
  85. // If we're not asynchronous, we need to defer the `complete` call
  86. // until after the call to the function is over. This is because an
  87. // error could be thrown in the function after it calls our callback,
  88. // and if that is the case, if we complete here, we are unable to notify
  89. // the consumer than an error occurred.
  90. if (isAsync) {
  91. subject.complete();
  92. }
  93. },
  94. ]
  95. );
  96. // If we flipped `isComplete` during the call, we resolved synchronously,
  97. // notify complete, because we skipped it in the callback to wait
  98. // to make sure there were no errors during the call.
  99. if (isComplete) {
  100. subject.complete();
  101. }
  102. // We're no longer synchronous. If the callback is called at this point
  103. // we can notify complete on the spot.
  104. isAsync = true;
  105. }
  106. // Return the subscription from adding our subscriber to the subject.
  107. return subs;
  108. });
  109. };
  110. }