f3a26d180882fd80839eb82ac36bfd5748547780e8e8148f738fe2b00802cad312ae1e51007ecfff1a5c786ac771a9d4c78bd56ab5ee8a3264360617b36ece 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import { SchedulerLike, SchedulerAction } from '../types';
  2. import { Subscriber } from '../Subscriber';
  3. import { Subscription } from '../Subscription';
  4. import { Observable } from '../Observable';
  5. import { asap } from '../scheduler/asap';
  6. import { isNumeric } from '../util/isNumeric';
  7. export interface DispatchArg<T> {
  8. source: Observable<T>;
  9. subscriber: Subscriber<T>;
  10. }
  11. /**
  12. * We need this JSDoc comment for affecting ESDoc.
  13. * @extends {Ignored}
  14. * @hide true
  15. */
  16. export class SubscribeOnObservable<T> extends Observable<T> {
  17. /** @nocollapse */
  18. static create<T>(source: Observable<T>, delay: number = 0, scheduler: SchedulerLike = asap): Observable<T> {
  19. return new SubscribeOnObservable(source, delay, scheduler);
  20. }
  21. /** @nocollapse */
  22. static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
  23. const { source, subscriber } = arg;
  24. return this.add(source.subscribe(subscriber));
  25. }
  26. constructor(public source: Observable<T>,
  27. private delayTime: number = 0,
  28. private scheduler: SchedulerLike = asap) {
  29. super();
  30. if (!isNumeric(delayTime) || delayTime < 0) {
  31. this.delayTime = 0;
  32. }
  33. if (!scheduler || typeof scheduler.schedule !== 'function') {
  34. this.scheduler = asap;
  35. }
  36. }
  37. /** @deprecated This is an internal implementation detail, do not use. */
  38. _subscribe(subscriber: Subscriber<T>) {
  39. const delay = this.delayTime;
  40. const source = this.source;
  41. const scheduler = this.scheduler;
  42. return scheduler.schedule<DispatchArg<any>>(SubscribeOnObservable.dispatch, delay, {
  43. source, subscriber
  44. });
  45. }
  46. }