fb77a3da78b7b5b51b3e9dd85bba962aa6b57346fbbe19be4c04bae8384ebd6340670920af68387ee7630bfb01e36aadb974d3a02a4144b97c7227de911d50 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import { createOperatorSubscriber } from '../../operators/OperatorSubscriber';
  2. import { Observable } from '../../Observable';
  3. import { innerFrom } from '../../observable/innerFrom';
  4. import { ObservableInput } from '../../types';
  5. export function fromFetch<T>(
  6. input: string | Request,
  7. init: RequestInit & {
  8. selector: (response: Response) => ObservableInput<T>;
  9. }
  10. ): Observable<T>;
  11. export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response>;
  12. /**
  13. * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
  14. * make an HTTP request.
  15. *
  16. * **WARNING** Parts of the fetch API are still experimental. `AbortController` is
  17. * required for this implementation to work and use cancellation appropriately.
  18. *
  19. * Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController)
  20. * in order to finalize the internal `fetch` when the subscription tears down.
  21. *
  22. * If a `signal` is provided via the `init` argument, it will behave like it usually does with
  23. * `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with
  24. * in that scenario will be emitted as an error from the observable.
  25. *
  26. * ## Examples
  27. *
  28. * Basic use
  29. *
  30. * ```ts
  31. * import { fromFetch } from 'rxjs/fetch';
  32. * import { switchMap, of, catchError } from 'rxjs';
  33. *
  34. * const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
  35. * switchMap(response => {
  36. * if (response.ok) {
  37. * // OK return data
  38. * return response.json();
  39. * } else {
  40. * // Server is returning a status requiring the client to try something else.
  41. * return of({ error: true, message: `Error ${ response.status }` });
  42. * }
  43. * }),
  44. * catchError(err => {
  45. * // Network or other error, handle appropriately
  46. * console.error(err);
  47. * return of({ error: true, message: err.message })
  48. * })
  49. * );
  50. *
  51. * data$.subscribe({
  52. * next: result => console.log(result),
  53. * complete: () => console.log('done')
  54. * });
  55. * ```
  56. *
  57. * ### Use with Chunked Transfer Encoding
  58. *
  59. * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
  60. * the promise returned by `fetch` will resolve as soon as the response's headers are
  61. * received.
  62. *
  63. * That means the `fromFetch` observable will emit a `Response` - and will
  64. * then complete - before the body is received. When one of the methods on the
  65. * `Response` - like `text()` or `json()` - is called, the returned promise will not
  66. * resolve until the entire body has been received. Unsubscribing from any observable
  67. * that uses the promise as an observable input will not abort the request.
  68. *
  69. * To facilitate aborting the retrieval of responses that use chunked transfer encoding,
  70. * a `selector` can be specified via the `init` parameter:
  71. *
  72. * ```ts
  73. * import { of } from 'rxjs';
  74. * import { fromFetch } from 'rxjs/fetch';
  75. *
  76. * const data$ = fromFetch('https://api.github.com/users?per_page=5', {
  77. * selector: response => response.json()
  78. * });
  79. *
  80. * data$.subscribe({
  81. * next: result => console.log(result),
  82. * complete: () => console.log('done')
  83. * });
  84. * ```
  85. *
  86. * @param input The resource you would like to fetch. Can be a url or a request object.
  87. * @param initWithSelector A configuration object for the fetch.
  88. * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
  89. * @returns An Observable, that when subscribed to, performs an HTTP request using the native `fetch`
  90. * function. The {@link Subscription} is tied to an `AbortController` for the fetch.
  91. */
  92. export function fromFetch<T>(
  93. input: string | Request,
  94. initWithSelector: RequestInit & {
  95. selector?: (response: Response) => ObservableInput<T>;
  96. } = {}
  97. ): Observable<Response | T> {
  98. const { selector, ...init } = initWithSelector;
  99. return new Observable<Response | T>((subscriber) => {
  100. // Our controller for aborting this fetch.
  101. // Any externally provided AbortSignal will have to call
  102. // abort on this controller when signaled, because the
  103. // signal from this controller is what is being passed to `fetch`.
  104. const controller = new AbortController();
  105. const { signal } = controller;
  106. // This flag exists to make sure we don't `abort()` the fetch upon tearing down
  107. // this observable after emitting a Response. Aborting in such circumstances
  108. // would also abort subsequent methods - like `json()` - that could be called
  109. // on the Response. Consider: `fromFetch().pipe(take(1), mergeMap(res => res.json()))`
  110. let abortable = true;
  111. // If the user provided an init configuration object,
  112. // let's process it and chain our abort signals, if necessary.
  113. // If a signal is provided, just have it finalized. It's a cancellation token, basically.
  114. const { signal: outerSignal } = init;
  115. if (outerSignal) {
  116. if (outerSignal.aborted) {
  117. controller.abort();
  118. } else {
  119. // We got an AbortSignal from the arguments passed into `fromFetch`.
  120. // We need to wire up our AbortController to abort when this signal aborts.
  121. const outerSignalHandler = () => {
  122. if (!signal.aborted) {
  123. controller.abort();
  124. }
  125. };
  126. outerSignal.addEventListener('abort', outerSignalHandler);
  127. subscriber.add(() => outerSignal.removeEventListener('abort', outerSignalHandler));
  128. }
  129. }
  130. // The initialization object passed to `fetch` as the second
  131. // argument. This ferries in important information, including our
  132. // AbortSignal. Create a new init, so we don't accidentally mutate the
  133. // passed init, or reassign it. This is because the init passed in
  134. // is shared between each subscription to the result.
  135. const perSubscriberInit: RequestInit = { ...init, signal };
  136. const handleError = (err: any) => {
  137. abortable = false;
  138. subscriber.error(err);
  139. };
  140. fetch(input, perSubscriberInit)
  141. .then((response) => {
  142. if (selector) {
  143. // If we have a selector function, use it to project our response.
  144. // Note that any error that comes from our selector will be
  145. // sent to the promise `catch` below and handled.
  146. innerFrom(selector(response)).subscribe(
  147. createOperatorSubscriber(
  148. subscriber,
  149. // Values are passed through to the subscriber
  150. undefined,
  151. // The projected response is complete.
  152. () => {
  153. abortable = false;
  154. subscriber.complete();
  155. },
  156. handleError
  157. )
  158. );
  159. } else {
  160. abortable = false;
  161. subscriber.next(response);
  162. subscriber.complete();
  163. }
  164. })
  165. .catch(handleError);
  166. return () => {
  167. if (abortable) {
  168. controller.abort();
  169. }
  170. };
  171. });
  172. }