29f79ec25327f7d98a3da287f59ed79e674f75bc09cc43ddf2398f7be1b4d1da4816ae6ea39c43478342b66edadd1fc3730ef5917c7edb533a115249201433 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import { Observable } from '../Observable';
  2. import { AsyncSubject } from '../AsyncSubject';
  3. import { ConnectableObservable } from '../observable/ConnectableObservable';
  4. import { UnaryFunction } from '../types';
  5. /**
  6. * Returns a connectable observable sequence that shares a single subscription to the
  7. * underlying sequence containing only the last notification.
  8. *
  9. * ![](publishLast.png)
  10. *
  11. * Similar to {@link publish}, but it waits until the source observable completes and stores
  12. * the last emitted value.
  13. * Similarly to {@link publishReplay} and {@link publishBehavior}, this keeps storing the last
  14. * value even if it has no more subscribers. If subsequent subscriptions happen, they will
  15. * immediately get that last stored value and complete.
  16. *
  17. * ## Example
  18. *
  19. * ```ts
  20. * import { ConnectableObservable, interval, publishLast, tap, take } from 'rxjs';
  21. *
  22. * const connectable = <ConnectableObservable<number>>interval(1000)
  23. * .pipe(
  24. * tap(x => console.log('side effect', x)),
  25. * take(3),
  26. * publishLast()
  27. * );
  28. *
  29. * connectable.subscribe({
  30. * next: x => console.log('Sub. A', x),
  31. * error: err => console.log('Sub. A Error', err),
  32. * complete: () => console.log('Sub. A Complete')
  33. * });
  34. *
  35. * connectable.subscribe({
  36. * next: x => console.log('Sub. B', x),
  37. * error: err => console.log('Sub. B Error', err),
  38. * complete: () => console.log('Sub. B Complete')
  39. * });
  40. *
  41. * connectable.connect();
  42. *
  43. * // Results:
  44. * // 'side effect 0' - after one second
  45. * // 'side effect 1' - after two seconds
  46. * // 'side effect 2' - after three seconds
  47. * // 'Sub. A 2' - immediately after 'side effect 2'
  48. * // 'Sub. B 2'
  49. * // 'Sub. A Complete'
  50. * // 'Sub. B Complete'
  51. * ```
  52. *
  53. * @see {@link ConnectableObservable}
  54. * @see {@link publish}
  55. * @see {@link publishReplay}
  56. * @see {@link publishBehavior}
  57. *
  58. * @return A function that returns an Observable that emits elements of a
  59. * sequence produced by multicasting the source sequence.
  60. * @deprecated Will be removed in v8. To create a connectable observable with an
  61. * {@link AsyncSubject} under the hood, use {@link connectable}.
  62. * `source.pipe(publishLast())` is equivalent to
  63. * `connectable(source, { connector: () => new AsyncSubject(), resetOnDisconnect: false })`.
  64. * If you're using {@link refCount} after `publishLast`, use the {@link share} operator instead.
  65. * `source.pipe(publishLast(), refCount())` is equivalent to
  66. * `source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))`.
  67. * Details: https://rxjs.dev/deprecations/multicasting
  68. */
  69. export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
  70. // Note that this has *never* supported a selector function like `publish` and `publishReplay`.
  71. return (source) => {
  72. const subject = new AsyncSubject<T>();
  73. return new ConnectableObservable(source, () => subject);
  74. };
  75. }