5fdcac5360b605b14a06b11b749b9a096ebb60d733631dab81b62df1edf67e8c0129a1c328915c55846c50d55a664cd85ed0f9a2f380b1d4d631785d65eab4 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import { Subject } from '../Subject';
  2. import { Operator } from '../Operator';
  3. import { Subscriber } from '../Subscriber';
  4. import { Observable } from '../Observable';
  5. import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
  6. import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
  7. /* tslint:disable:max-line-length */
  8. export function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
  9. export function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): UnaryFunction<Observable<T>, ConnectableObservable<ObservedValueOf<O>>>;
  10. export function multicast<T>(subjectFactory: (this: Observable<T>) => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
  11. export function multicast<T, O extends ObservableInput<any>>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
  12. /* tslint:enable:max-line-length */
  13. /**
  14. * Returns an Observable that emits the results of invoking a specified selector on items
  15. * emitted by a ConnectableObservable that shares a single subscription to the underlying stream.
  16. *
  17. * ![](multicast.png)
  18. *
  19. * @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through
  20. * which the source sequence's elements will be multicast to the selector function
  21. * or Subject to push source elements into.
  22. * @param {Function} [selector] - Optional selector function that can use the multicasted source stream
  23. * as many times as needed, without causing multiple subscriptions to the source stream.
  24. * Subscribers to the given source will receive all notifications of the source from the
  25. * time of the subscription forward.
  26. * @return {Observable} An Observable that emits the results of invoking the selector
  27. * on the items emitted by a `ConnectableObservable` that shares a single subscription to
  28. * the underlying stream.
  29. * @method multicast
  30. * @owner Observable
  31. */
  32. export function multicast<T, R>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
  33. selector?: (source: Observable<T>) => Observable<R>): OperatorFunction<T, R> {
  34. return function multicastOperatorFunction(source: Observable<T>): Observable<R> {
  35. let subjectFactory: () => Subject<T>;
  36. if (typeof subjectOrSubjectFactory === 'function') {
  37. subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
  38. } else {
  39. subjectFactory = function subjectFactory() {
  40. return <Subject<T>>subjectOrSubjectFactory;
  41. };
  42. }
  43. if (typeof selector === 'function') {
  44. return source.lift(new MulticastOperator(subjectFactory, selector));
  45. }
  46. const connectable: any = Object.create(source, connectableObservableDescriptor);
  47. connectable.source = source;
  48. connectable.subjectFactory = subjectFactory;
  49. return <ConnectableObservable<R>> connectable;
  50. };
  51. }
  52. export class MulticastOperator<T, R> implements Operator<T, R> {
  53. constructor(private subjectFactory: () => Subject<T>,
  54. private selector: (source: Observable<T>) => Observable<R>) {
  55. }
  56. call(subscriber: Subscriber<R>, source: any): any {
  57. const { selector } = this;
  58. const subject = this.subjectFactory();
  59. const subscription = selector(subject).subscribe(subscriber);
  60. subscription.add(source.subscribe(subject));
  61. return subscription;
  62. }
  63. }