9deed2a08b1f7b41551a9564c1c2e178cebec44427aa42d67b9b4795283b3c7bd82177d0b378535380816e563b06e0faaee20b45e7f2643a4afdbc0342aace 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import { isArrayLike } from '../util/isArrayLike';
  2. import { isPromise } from '../util/isPromise';
  3. import { Observable } from '../Observable';
  4. import { ObservableInput, ObservedValueOf, ReadableStreamLike } from '../types';
  5. import { isInteropObservable } from '../util/isInteropObservable';
  6. import { isAsyncIterable } from '../util/isAsyncIterable';
  7. import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
  8. import { isIterable } from '../util/isIterable';
  9. import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';
  10. import { Subscriber } from '../Subscriber';
  11. import { isFunction } from '../util/isFunction';
  12. import { reportUnhandledError } from '../util/reportUnhandledError';
  13. import { observable as Symbol_observable } from '../symbol/observable';
  14. export function innerFrom<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
  15. export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
  16. if (input instanceof Observable) {
  17. return input;
  18. }
  19. if (input != null) {
  20. if (isInteropObservable(input)) {
  21. return fromInteropObservable(input);
  22. }
  23. if (isArrayLike(input)) {
  24. return fromArrayLike(input);
  25. }
  26. if (isPromise(input)) {
  27. return fromPromise(input);
  28. }
  29. if (isAsyncIterable(input)) {
  30. return fromAsyncIterable(input);
  31. }
  32. if (isIterable(input)) {
  33. return fromIterable(input);
  34. }
  35. if (isReadableStreamLike(input)) {
  36. return fromReadableStreamLike(input);
  37. }
  38. }
  39. throw createInvalidObservableTypeError(input);
  40. }
  41. /**
  42. * Creates an RxJS Observable from an object that implements `Symbol.observable`.
  43. * @param obj An object that properly implements `Symbol.observable`.
  44. */
  45. export function fromInteropObservable<T>(obj: any) {
  46. return new Observable((subscriber: Subscriber<T>) => {
  47. const obs = obj[Symbol_observable]();
  48. if (isFunction(obs.subscribe)) {
  49. return obs.subscribe(subscriber);
  50. }
  51. // Should be caught by observable subscribe function error handling.
  52. throw new TypeError('Provided object does not correctly implement Symbol.observable');
  53. });
  54. }
  55. /**
  56. * Synchronously emits the values of an array like and completes.
  57. * This is exported because there are creation functions and operators that need to
  58. * make direct use of the same logic, and there's no reason to make them run through
  59. * `from` conditionals because we *know* they're dealing with an array.
  60. * @param array The array to emit values from
  61. */
  62. export function fromArrayLike<T>(array: ArrayLike<T>) {
  63. return new Observable((subscriber: Subscriber<T>) => {
  64. // Loop over the array and emit each value. Note two things here:
  65. // 1. We're making sure that the subscriber is not closed on each loop.
  66. // This is so we don't continue looping over a very large array after
  67. // something like a `take`, `takeWhile`, or other synchronous unsubscription
  68. // has already unsubscribed.
  69. // 2. In this form, reentrant code can alter that array we're looping over.
  70. // This is a known issue, but considered an edge case. The alternative would
  71. // be to copy the array before executing the loop, but this has
  72. // performance implications.
  73. for (let i = 0; i < array.length && !subscriber.closed; i++) {
  74. subscriber.next(array[i]);
  75. }
  76. subscriber.complete();
  77. });
  78. }
  79. export function fromPromise<T>(promise: PromiseLike<T>) {
  80. return new Observable((subscriber: Subscriber<T>) => {
  81. promise
  82. .then(
  83. (value) => {
  84. if (!subscriber.closed) {
  85. subscriber.next(value);
  86. subscriber.complete();
  87. }
  88. },
  89. (err: any) => subscriber.error(err)
  90. )
  91. .then(null, reportUnhandledError);
  92. });
  93. }
  94. export function fromIterable<T>(iterable: Iterable<T>) {
  95. return new Observable((subscriber: Subscriber<T>) => {
  96. for (const value of iterable) {
  97. subscriber.next(value);
  98. if (subscriber.closed) {
  99. return;
  100. }
  101. }
  102. subscriber.complete();
  103. });
  104. }
  105. export function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
  106. return new Observable((subscriber: Subscriber<T>) => {
  107. process(asyncIterable, subscriber).catch((err) => subscriber.error(err));
  108. });
  109. }
  110. export function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
  111. return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
  112. }
  113. async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
  114. for await (const value of asyncIterable) {
  115. subscriber.next(value);
  116. // A side-effect may have closed our subscriber,
  117. // check before the next iteration.
  118. if (subscriber.closed) {
  119. return;
  120. }
  121. }
  122. subscriber.complete();
  123. }