ae954b3fea185344952f56755cb711f55e858860df4db74dc56c389f51c30f9ddb88422a5895a3da36d5c7b120b72f0c2d0ee0dee1507ba5ee7f6b15a83ef5 852 B

12345678910111213141516171819
  1. import { Observable } from '../Observable';
  2. import { Subscription } from '../Subscription';
  3. import { observable as Symbol_observable } from '../symbol/observable';
  4. import { InteropObservable, SchedulerLike, Subscribable } from '../types';
  5. export function scheduleObservable<T>(input: InteropObservable<T>, scheduler: SchedulerLike) {
  6. return new Observable<T>(subscriber => {
  7. const sub = new Subscription();
  8. sub.add(scheduler.schedule(() => {
  9. const observable: Subscribable<T> = input[Symbol_observable]();
  10. sub.add(observable.subscribe({
  11. next(value) { sub.add(scheduler.schedule(() => subscriber.next(value))); },
  12. error(err) { sub.add(scheduler.schedule(() => subscriber.error(err))); },
  13. complete() { sub.add(scheduler.schedule(() => subscriber.complete())); },
  14. }));
  15. }));
  16. return sub;
  17. });
  18. }