257544891ae17e5ee1d67ef945fab2804dd502b997402ef93fd9eafb140c5625ccb229773178c255db14caf6b4f5e1d5af7dff475d8d96afab05f30c1770ec 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
  2. import { map } from './map';
  3. import { innerFrom } from '../observable/innerFrom';
  4. import { operate } from '../util/lift';
  5. import { mergeInternals } from './mergeInternals';
  6. import { isFunction } from '../util/isFunction';
  7. /* tslint:disable:max-line-length */
  8. export function mergeMap<T, O extends ObservableInput<any>>(
  9. project: (value: T, index: number) => O,
  10. concurrent?: number
  11. ): OperatorFunction<T, ObservedValueOf<O>>;
  12. /** @deprecated The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector */
  13. export function mergeMap<T, O extends ObservableInput<any>>(
  14. project: (value: T, index: number) => O,
  15. resultSelector: undefined,
  16. concurrent?: number
  17. ): OperatorFunction<T, ObservedValueOf<O>>;
  18. /** @deprecated The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector */
  19. export function mergeMap<T, R, O extends ObservableInput<any>>(
  20. project: (value: T, index: number) => O,
  21. resultSelector: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R,
  22. concurrent?: number
  23. ): OperatorFunction<T, R>;
  24. /* tslint:enable:max-line-length */
  25. /**
  26. * Projects each source value to an Observable which is merged in the output
  27. * Observable.
  28. *
  29. * <span class="informal">Maps each value to an Observable, then flattens all of
  30. * these inner Observables using {@link mergeAll}.</span>
  31. *
  32. * ![](mergeMap.png)
  33. *
  34. * Returns an Observable that emits items based on applying a function that you
  35. * supply to each item emitted by the source Observable, where that function
  36. * returns an Observable, and then merging those resulting Observables and
  37. * emitting the results of this merger.
  38. *
  39. * ## Example
  40. *
  41. * Map and flatten each letter to an Observable ticking every 1 second
  42. *
  43. * ```ts
  44. * import { of, mergeMap, interval, map } from 'rxjs';
  45. *
  46. * const letters = of('a', 'b', 'c');
  47. * const result = letters.pipe(
  48. * mergeMap(x => interval(1000).pipe(map(i => x + i)))
  49. * );
  50. *
  51. * result.subscribe(x => console.log(x));
  52. *
  53. * // Results in the following:
  54. * // a0
  55. * // b0
  56. * // c0
  57. * // a1
  58. * // b1
  59. * // c1
  60. * // continues to list a, b, c every second with respective ascending integers
  61. * ```
  62. *
  63. * @see {@link concatMap}
  64. * @see {@link exhaustMap}
  65. * @see {@link merge}
  66. * @see {@link mergeAll}
  67. * @see {@link mergeMapTo}
  68. * @see {@link mergeScan}
  69. * @see {@link switchMap}
  70. *
  71. * @param project A function that, when applied to an item emitted by the source
  72. * Observable, returns an Observable.
  73. * @param concurrent Maximum number of `ObservableInput`s being subscribed to concurrently.
  74. * @return A function that returns an Observable that emits the result of
  75. * applying the projection function (and the optional deprecated
  76. * `resultSelector`) to each item emitted by the source Observable and merging
  77. * the results of the Observables obtained from this transformation.
  78. */
  79. export function mergeMap<T, R, O extends ObservableInput<any>>(
  80. project: (value: T, index: number) => O,
  81. resultSelector?: ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R) | number,
  82. concurrent: number = Infinity
  83. ): OperatorFunction<T, ObservedValueOf<O> | R> {
  84. if (isFunction(resultSelector)) {
  85. // DEPRECATED PATH
  86. return mergeMap((a, i) => map((b: any, ii: number) => resultSelector(a, b, i, ii))(innerFrom(project(a, i))), concurrent);
  87. } else if (typeof resultSelector === 'number') {
  88. concurrent = resultSelector;
  89. }
  90. return operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent));
  91. }