05ff5b9f6493f2addf199d99d630da35df4684dd94242765e8113aeec6aa2226c97b6318241581c15053c4f42309519a2ba3d46609f4861966d02aab8c57d8 902 B

12345678910111213141516171819202122232425262728293031
  1. import { SchedulerLike } from '../types';
  2. import { Observable } from '../Observable';
  3. import { executeSchedule } from '../util/executeSchedule';
  /**
   * Creates an Observable that reads values from an `AsyncIterable`, routing the
   * work through `executeSchedule` with the given scheduler.
   *
   * On subscription, one scheduled action obtains the async iterator from
   * `input`; a nested scheduled action then calls `iterator.next()` and relays
   * the settled result to the subscriber.
   *
   * @param input The async iterable to pull values from.
   * @param scheduler The scheduler handed to `executeSchedule` for both actions.
   * @returns An Observable that emits each yielded value, completes when the
   * iterator reports `done`, and errors if `iterator.next()` rejects.
   * @throws {Error} If `input` is falsy.
   */
  export function scheduleAsyncIterable<T>(input: AsyncIterable<T>, scheduler: SchedulerLike): Observable<T> {
    if (!input) {
      throw new Error('Iterable cannot be null');
    }
    return new Observable<T>((subscriber) => {
      executeSchedule(subscriber, scheduler, () => {
        const iterator = input[Symbol.asyncIterator]();
        executeSchedule(
          subscriber,
          scheduler,
          () => {
            iterator.next().then(
              (result) => {
                if (result.done) {
                  // This will remove the subscriptions from
                  // the parent subscription.
                  subscriber.complete();
                } else {
                  subscriber.next(result.value);
                }
              },
              // Forward a rejected `next()` to the subscriber; without this handler
              // the failure would only surface as an unhandled promise rejection.
              (err: unknown) => {
                subscriber.error(err);
              }
            );
          },
          // NOTE(review): presumably the delay (0) and repeat flag (true) of
          // `executeSchedule` — confirm against its definition.
          0,
          true
        );
      });
    });
  }