ed0b318d58dcbb461c2feebcaac30a2e05a1fbe0bc479ddcad92f1a6d135efad49d2f6eea9ebf7b1ba75e43babfed92a7d3a579484cd60cf29828e796533a6 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import { MonoTypeOperatorFunction, ObservableInput } from '../types';
  2. import { operate } from '../util/lift';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  4. import { innerFrom } from '../observable/innerFrom';
  5. import { noop } from '../util/noop';
  6. /**
  7. * Emits the values emitted by the source Observable until a `notifier`
  8. * Observable emits a value.
  9. *
  10. * <span class="informal">Lets values pass until a second Observable,
  11. * `notifier`, emits a value. Then, it completes.</span>
  12. *
  13. * ![](takeUntil.png)
  14. *
  15. * `takeUntil` subscribes and begins mirroring the source Observable. It also
  16. * monitors a second Observable, `notifier` that you provide. If the `notifier`
  17. * emits a value, the output Observable stops mirroring the source Observable
  18. * and completes. If the `notifier` doesn't emit any value and completes
  19. * then `takeUntil` will pass all values.
  20. *
  21. * ## Example
  22. *
  23. * Tick every second until the first click happens
  24. *
  25. * ```ts
  26. * import { interval, fromEvent, takeUntil } from 'rxjs';
  27. *
  28. * const source = interval(1000);
  29. * const clicks = fromEvent(document, 'click');
  30. * const result = source.pipe(takeUntil(clicks));
  31. * result.subscribe(x => console.log(x));
  32. * ```
  33. *
  34. * @see {@link take}
  35. * @see {@link takeLast}
  36. * @see {@link takeWhile}
  37. * @see {@link skip}
  38. *
  39. * @param notifier The `ObservableInput` whose first emitted value will cause the output
  40. * Observable of `takeUntil` to stop emitting values from the source Observable.
  41. * @return A function that returns an Observable that emits the values from the
  42. * source Observable until `notifier` emits its first value.
  43. */
  44. export function takeUntil<T>(notifier: ObservableInput<any>): MonoTypeOperatorFunction<T> {
  45. return operate((source, subscriber) => {
  46. innerFrom(notifier).subscribe(createOperatorSubscriber(subscriber, () => subscriber.complete(), noop));
  47. !subscriber.closed && source.subscribe(subscriber);
  48. });
  49. }