4d827f42d306639f0476aa780d0f51516f4bec438b5722e6ad241aed6411bf782a4ecbc70a6d526d24a04e9adb3b11e5aad9873bb2f41cd9b6a031d3ec8d1d 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import { scheduleObservable } from './scheduleObservable';
  2. import { schedulePromise } from './schedulePromise';
  3. import { scheduleArray } from './scheduleArray';
  4. import { scheduleIterable } from './scheduleIterable';
  5. import { scheduleAsyncIterable } from './scheduleAsyncIterable';
  6. import { isInteropObservable } from '../util/isInteropObservable';
  7. import { isPromise } from '../util/isPromise';
  8. import { isArrayLike } from '../util/isArrayLike';
  9. import { isIterable } from '../util/isIterable';
  10. import { ObservableInput, SchedulerLike } from '../types';
  11. import { Observable } from '../Observable';
  12. import { isAsyncIterable } from '../util/isAsyncIterable';
  13. import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
  14. import { isReadableStreamLike } from '../util/isReadableStreamLike';
  15. import { scheduleReadableStreamLike } from './scheduleReadableStreamLike';
  /**
   * Builds an observable from any supported {@link ObservableInput}, with both the
   * subscription and the emissions scheduled on the given scheduler.
   *
   * The input is tested against each supported kind in a fixed order, and the first
   * match decides which scheduling helper is used: interop observable, array-like,
   * promise, async iterable, iterable, then readable-stream-like.
   *
   * @see {@link from}
   * @see {@link of}
   *
   * @param input The observable, array, promise, iterable, etc. to schedule
   * @param scheduler The scheduler used for the subscription to, and the emissions
   * from, the returned observable.
   * @returns The observable produced by the scheduling helper that matches `input`.
   * @throws The error built by `createInvalidObservableTypeError` when `input` is
   * `null`/`undefined` or matches none of the supported kinds.
   */
  export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T> {
    // Nullish input can never match a supported kind, so reject it before probing.
    if (input == null) {
      throw createInvalidObservableTypeError(input);
    }

    // The order of the checks below is significant: the first predicate that
    // accepts the input wins.
    if (isInteropObservable(input)) {
      return scheduleObservable(input, scheduler);
    } else if (isArrayLike(input)) {
      return scheduleArray(input, scheduler);
    } else if (isPromise(input)) {
      return schedulePromise(input, scheduler);
    } else if (isAsyncIterable(input)) {
      return scheduleAsyncIterable(input, scheduler);
    } else if (isIterable(input)) {
      return scheduleIterable(input, scheduler);
    } else if (isReadableStreamLike(input)) {
      return scheduleReadableStreamLike(input, scheduler);
    }

    // Non-nullish, but not a kind this function knows how to schedule.
    throw createInvalidObservableTypeError(input);
  }