c1735267a537836022e5b0562be889b1182c71446f3b26fa14bacab39638866c231c6b8adab1c6448479ad9454bc5aa27323c1c9ee3209e58e9ddcf3bde49b 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import { Observable } from '../Observable';
  2. import { Subscription } from '../Subscription';
  3. import { Scheduler } from '../Scheduler';
  4. import { TestMessage } from './TestMessage';
  5. import { SubscriptionLog } from './SubscriptionLog';
  6. import { SubscriptionLoggable } from './SubscriptionLoggable';
  7. import { applyMixins } from '../util/applyMixins';
  8. import { Subscriber } from '../Subscriber';
  9. /**
  10. * We need this JSDoc comment for affecting ESDoc.
  11. * @ignore
  12. * @extends {Ignored}
  13. */
  14. export class ColdObservable<T> extends Observable<T> implements SubscriptionLoggable {
  15. public subscriptions: SubscriptionLog[] = [];
  16. scheduler: Scheduler;
  17. logSubscribedFrame: () => number;
  18. logUnsubscribedFrame: (index: number) => void;
  19. constructor(public messages: TestMessage[],
  20. scheduler: Scheduler) {
  21. super(function (this: Observable<T>, subscriber: Subscriber<any>) {
  22. const observable: ColdObservable<T> = this as any;
  23. const index = observable.logSubscribedFrame();
  24. const subscription = new Subscription();
  25. subscription.add(new Subscription(() => {
  26. observable.logUnsubscribedFrame(index);
  27. }));
  28. observable.scheduleMessages(subscriber);
  29. return subscription;
  30. });
  31. this.scheduler = scheduler;
  32. }
  33. scheduleMessages(subscriber: Subscriber<any>) {
  34. const messagesLength = this.messages.length;
  35. for (let i = 0; i < messagesLength; i++) {
  36. const message = this.messages[i];
  37. subscriber.add(
  38. this.scheduler.schedule(({ message, subscriber }) => { message.notification.observe(subscriber); },
  39. message.frame,
  40. { message, subscriber })
  41. );
  42. }
  43. }
  44. }
  45. applyMixins(ColdObservable, [SubscriptionLoggable]);