b8b7f8a96773bd9c98ea8571f390b366cd5cb3c4818886d74100c7a94df650f050f099ddccf0aa6d9f40713511530c4623822b6043036b20c83b312c12cbc9 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import { Observable } from '../Observable';
  2. import { innerFrom } from '../observable/innerFrom';
  3. import { Subscriber } from '../Subscriber';
  4. import { ObservableInput, SchedulerLike } from '../types';
  5. import { executeSchedule } from '../util/executeSchedule';
  6. import { createOperatorSubscriber } from './OperatorSubscriber';
  7. /**
  8. * A process embodying the general "merge" strategy. This is used in
  9. * `mergeMap` and `mergeScan` because the logic is otherwise nearly identical.
  10. * @param source The original source observable
  11. * @param subscriber The consumer subscriber
  12. * @param project The projection function to get our inner sources
  13. * @param concurrent The number of concurrent inner subscriptions
  14. * @param onBeforeNext Additional logic to apply before nexting to our consumer
  15. * @param expand If `true` this will perform an "expand" strategy, which differs only
  16. * in that it recurses, and the inner subscription must be schedule-able.
  17. * @param innerSubScheduler A scheduler to use to schedule inner subscriptions,
  18. * this is to support the expand strategy, mostly, and should be deprecated
  19. */
  20. export function mergeInternals<T, R>(
  21. source: Observable<T>,
  22. subscriber: Subscriber<R>,
  23. project: (value: T, index: number) => ObservableInput<R>,
  24. concurrent: number,
  25. onBeforeNext?: (innerValue: R) => void,
  26. expand?: boolean,
  27. innerSubScheduler?: SchedulerLike,
  28. additionalFinalizer?: () => void
  29. ) {
  30. // Buffered values, in the event of going over our concurrency limit
  31. const buffer: T[] = [];
  32. // The number of active inner subscriptions.
  33. let active = 0;
  34. // An index to pass to our accumulator function
  35. let index = 0;
  36. // Whether or not the outer source has completed.
  37. let isComplete = false;
  38. /**
  39. * Checks to see if we can complete our result or not.
  40. */
  41. const checkComplete = () => {
  42. // If the outer has completed, and nothing is left in the buffer,
  43. // and we don't have any active inner subscriptions, then we can
  44. // Emit the state and complete.
  45. if (isComplete && !buffer.length && !active) {
  46. subscriber.complete();
  47. }
  48. };
  49. // If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
  50. const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
  51. const doInnerSub = (value: T) => {
  52. // If we're expanding, we need to emit the outer values and the inner values
  53. // as the inners will "become outers" in a way as they are recursively fed
  54. // back to the projection mechanism.
  55. expand && subscriber.next(value as any);
  56. // Increment the number of active subscriptions so we can track it
  57. // against our concurrency limit later.
  58. active++;
  59. // A flag used to show that the inner observable completed.
  60. // This is checked during finalization to see if we should
  61. // move to the next item in the buffer, if there is on.
  62. let innerComplete = false;
  63. // Start our inner subscription.
  64. innerFrom(project(value, index++)).subscribe(
  65. createOperatorSubscriber(
  66. subscriber,
  67. (innerValue) => {
  68. // `mergeScan` has additional handling here. For example
  69. // taking the inner value and updating state.
  70. onBeforeNext?.(innerValue);
  71. if (expand) {
  72. // If we're expanding, then just recurse back to our outer
  73. // handler. It will emit the value first thing.
  74. outerNext(innerValue as any);
  75. } else {
  76. // Otherwise, emit the inner value.
  77. subscriber.next(innerValue);
  78. }
  79. },
  80. () => {
  81. // Flag that we have completed, so we know to check the buffer
  82. // during finalization.
  83. innerComplete = true;
  84. },
  85. // Errors are passed to the destination.
  86. undefined,
  87. () => {
  88. // During finalization, if the inner completed (it wasn't errored or
  89. // cancelled), then we want to try the next item in the buffer if
  90. // there is one.
  91. if (innerComplete) {
  92. // We have to wrap this in a try/catch because it happens during
  93. // finalization, possibly asynchronously, and we want to pass
  94. // any errors that happen (like in a projection function) to
  95. // the outer Subscriber.
  96. try {
  97. // INNER SOURCE COMPLETE
  98. // Decrement the active count to ensure that the next time
  99. // we try to call `doInnerSub`, the number is accurate.
  100. active--;
  101. // If we have more values in the buffer, try to process those
  102. // Note that this call will increment `active` ahead of the
  103. // next conditional, if there were any more inner subscriptions
  104. // to start.
  105. while (buffer.length && active < concurrent) {
  106. const bufferedValue = buffer.shift()!;
  107. // Particularly for `expand`, we need to check to see if a scheduler was provided
  108. // for when we want to start our inner subscription. Otherwise, we just start
  109. // are next inner subscription.
  110. if (innerSubScheduler) {
  111. executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
  112. } else {
  113. doInnerSub(bufferedValue);
  114. }
  115. }
  116. // Check to see if we can complete, and complete if so.
  117. checkComplete();
  118. } catch (err) {
  119. subscriber.error(err);
  120. }
  121. }
  122. }
  123. )
  124. );
  125. };
  126. // Subscribe to our source observable.
  127. source.subscribe(
  128. createOperatorSubscriber(subscriber, outerNext, () => {
  129. // Outer completed, make a note of it, and check to see if we can complete everything.
  130. isComplete = true;
  131. checkComplete();
  132. })
  133. );
  134. // Additional finalization (for when the destination is torn down).
  135. // Other finalization is added implicitly via subscription above.
  136. return () => {
  137. additionalFinalizer?.();
  138. };
  139. }