b3911fea2a5f701d7ddeafd864604cc4b40d642022fb31dd5160b61bd01a1084739fd1d6970890ab014892cc2f49b079fbb2e15ed5e25f9d38cdb8fd6bac81 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import { Observable } from '../Observable';
  2. import { ReplaySubject } from '../ReplaySubject';
  3. import { multicast } from './multicast';
  4. import { MonoTypeOperatorFunction, OperatorFunction, TimestampProvider, ObservableInput, ObservedValueOf } from '../types';
  5. import { isFunction } from '../util/isFunction';
  6. /**
  7. * Creates a {@link ConnectableObservable} that uses a {@link ReplaySubject}
  8. * internally.
  9. *
  10. * @param bufferSize The buffer size for the underlying {@link ReplaySubject}.
  11. * @param windowTime The window time for the underlying {@link ReplaySubject}.
  12. * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}.
  13. * @deprecated Will be removed in v8. To create a connectable observable that uses a
  14. * {@link ReplaySubject} under the hood, use {@link connectable}.
  15. * `source.pipe(publishReplay(size, time, scheduler))` is equivalent to
  16. * `connectable(source, { connector: () => new ReplaySubject(size, time, scheduler), resetOnDisconnect: false })`.
  17. * If you're using {@link refCount} after `publishReplay`, use the {@link share} operator instead.
  18. * `publishReplay(size, time, scheduler), refCount()` is equivalent to
  19. * `share({ connector: () => new ReplaySubject(size, time, scheduler), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })`.
  20. * Details: https://rxjs.dev/deprecations/multicasting
  21. */
  22. export function publishReplay<T>(
  23. bufferSize?: number,
  24. windowTime?: number,
  25. timestampProvider?: TimestampProvider
  26. ): MonoTypeOperatorFunction<T>;
  27. /**
  28. * Creates an observable, that when subscribed to, will create a {@link ReplaySubject},
  29. * and pass an observable from it (using [asObservable](api/index/class/Subject#asObservable)) to
  30. * the `selector` function, which then returns an observable that is subscribed to before
  31. * "connecting" the source to the internal `ReplaySubject`.
  32. *
  33. * Since this is deprecated, for additional details see the documentation for {@link connect}.
  34. *
  35. * @param bufferSize The buffer size for the underlying {@link ReplaySubject}.
  36. * @param windowTime The window time for the underlying {@link ReplaySubject}.
  37. * @param selector A function used to setup the multicast.
  38. * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}.
  39. * @deprecated Will be removed in v8. Use the {@link connect} operator instead.
  40. * `source.pipe(publishReplay(size, window, selector, scheduler))` is equivalent to
  41. * `source.pipe(connect(selector, { connector: () => new ReplaySubject(size, window, scheduler) }))`.
  42. * Details: https://rxjs.dev/deprecations/multicasting
  43. */
  44. export function publishReplay<T, O extends ObservableInput<any>>(
  45. bufferSize: number | undefined,
  46. windowTime: number | undefined,
  47. selector: (shared: Observable<T>) => O,
  48. timestampProvider?: TimestampProvider
  49. ): OperatorFunction<T, ObservedValueOf<O>>;
  50. /**
  51. * Creates a {@link ConnectableObservable} that uses a {@link ReplaySubject}
  52. * internally.
  53. *
  54. * @param bufferSize The buffer size for the underlying {@link ReplaySubject}.
  55. * @param windowTime The window time for the underlying {@link ReplaySubject}.
  56. * @param selector Passing `undefined` here determines that this operator will return a {@link ConnectableObservable}.
  57. * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}.
  58. * @deprecated Will be removed in v8. To create a connectable observable that uses a
  59. * {@link ReplaySubject} under the hood, use {@link connectable}.
  60. * `source.pipe(publishReplay(size, time, scheduler))` is equivalent to
  61. * `connectable(source, { connector: () => new ReplaySubject(size, time, scheduler), resetOnDisconnect: false })`.
  62. * If you're using {@link refCount} after `publishReplay`, use the {@link share} operator instead.
  63. * `publishReplay(size, time, scheduler), refCount()` is equivalent to
  64. * `share({ connector: () => new ReplaySubject(size, time, scheduler), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })`.
  65. * Details: https://rxjs.dev/deprecations/multicasting
  66. */
  67. export function publishReplay<T, O extends ObservableInput<any>>(
  68. bufferSize: number | undefined,
  69. windowTime: number | undefined,
  70. selector: undefined,
  71. timestampProvider: TimestampProvider
  72. ): OperatorFunction<T, ObservedValueOf<O>>;
  73. /**
  74. * @deprecated Will be removed in v8. Use the {@link connectable} observable, the {@link connect} operator or the
  75. * {@link share} operator instead. See the overloads below for equivalent replacement examples of this operator's
  76. * behaviors.
  77. * Details: https://rxjs.dev/deprecations/multicasting
  78. */
  79. export function publishReplay<T, R>(
  80. bufferSize?: number,
  81. windowTime?: number,
  82. selectorOrScheduler?: TimestampProvider | OperatorFunction<T, R>,
  83. timestampProvider?: TimestampProvider
  84. ) {
  85. if (selectorOrScheduler && !isFunction(selectorOrScheduler)) {
  86. timestampProvider = selectorOrScheduler;
  87. }
  88. const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined;
  89. // Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument
  90. // but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross).
  91. return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, timestampProvider), selector!)(source);
  92. }