cf6457b141d941f912c00de2e05505265f3b1f5b6db638ce0a114aca6db6f3ce19b07e0bde712a8dbbda9943296403c8e4f9c2d396ef9c48add2b70c73a645 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import { Observable } from './Observable';
  2. import { EmptyError } from './util/EmptyError';
  3. export interface LastValueFromConfig<T> {
  4. defaultValue: T;
  5. }
  6. export function lastValueFrom<T, D>(source: Observable<T>, config: LastValueFromConfig<D>): Promise<T | D>;
  7. export function lastValueFrom<T>(source: Observable<T>): Promise<T>;
  8. /**
  9. * Converts an observable to a promise by subscribing to the observable,
  10. * waiting for it to complete, and resolving the returned promise with the
  11. * last value from the observed stream.
  12. *
  13. * If the observable stream completes before any values were emitted, the
  14. * returned promise will reject with {@link EmptyError} or will resolve
  15. * with the default value if a default was specified.
  16. *
  17. * If the observable stream emits an error, the returned promise will reject
  18. * with that error.
  19. *
  20. * **WARNING**: Only use this with observables you *know* will complete. If the source
  21. * observable does not complete, you will end up with a promise that is hung up, and
  22. * potentially all of the state of an async function hanging out in memory. To avoid
  23. * this situation, look into adding something like {@link timeout}, {@link take},
  24. * {@link takeWhile}, or {@link takeUntil} amongst others.
  25. *
  26. * ## Example
  27. *
  28. * Wait for the last value from a stream and emit it from a promise in
  29. * an async function
  30. *
  31. * ```ts
  32. * import { interval, take, lastValueFrom } from 'rxjs';
  33. *
  34. * async function execute() {
  35. * const source$ = interval(2000).pipe(take(10));
  36. * const finalNumber = await lastValueFrom(source$);
  37. * console.log(`The final number is ${ finalNumber }`);
  38. * }
  39. *
  40. * execute();
  41. *
  42. * // Expected output:
  43. * // 'The final number is 9'
  44. * ```
  45. *
  46. * @see {@link firstValueFrom}
  47. *
  48. * @param source the observable to convert to a promise
  49. * @param config a configuration object to define the `defaultValue` to use if the source completes without emitting a value
  50. */
  51. export function lastValueFrom<T, D>(source: Observable<T>, config?: LastValueFromConfig<D>): Promise<T | D> {
  52. const hasConfig = typeof config === 'object';
  53. return new Promise<T | D>((resolve, reject) => {
  54. let _hasValue = false;
  55. let _value: T;
  56. source.subscribe({
  57. next: (value) => {
  58. _value = value;
  59. _hasValue = true;
  60. },
  61. error: reject,
  62. complete: () => {
  63. if (_hasValue) {
  64. resolve(_value);
  65. } else if (hasConfig) {
  66. resolve(config!.defaultValue);
  67. } else {
  68. reject(new EmptyError());
  69. }
  70. },
  71. });
  72. });
  73. }