0e2392447ee8834965317df7a975d9f7e3610228194cb755bd1106f4a72a54c7399a0e4c6e6c56164ab03251c083ef6e7627969df69bcd45253dfc9f39da9b 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import { MonoTypeOperatorFunction } from '../types';
  2. import { EMPTY } from '../observable/empty';
  3. import { operate } from '../util/lift';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  5. /**
  6. * Emits only the first `count` values emitted by the source Observable.
  7. *
  8. * <span class="informal">Takes the first `count` values from the source, then
  9. * completes.</span>
  10. *
  11. * ![](take.png)
  12. *
  13. * `take` returns an Observable that emits only the first `count` values emitted
  14. * by the source Observable. If the source emits fewer than `count` values then
  15. * all of its values are emitted. After that, it completes, regardless if the
  16. * source completes.
  17. *
  18. * ## Example
  19. *
  20. * Take the first 5 seconds of an infinite 1-second interval Observable
  21. *
  22. * ```ts
  23. * import { interval, take } from 'rxjs';
  24. *
  25. * const intervalCount = interval(1000);
  26. * const takeFive = intervalCount.pipe(take(5));
  27. * takeFive.subscribe(x => console.log(x));
  28. *
  29. * // Logs:
  30. * // 0
  31. * // 1
  32. * // 2
  33. * // 3
  34. * // 4
  35. * ```
  36. *
  37. * @see {@link takeLast}
  38. * @see {@link takeUntil}
  39. * @see {@link takeWhile}
  40. * @see {@link skip}
  41. *
  42. * @param count The maximum number of `next` values to emit.
  43. * @return A function that returns an Observable that emits only the first
  44. * `count` values emitted by the source Observable, or all of the values from
  45. * the source if the source emits fewer than `count` values.
  46. */
  47. export function take<T>(count: number): MonoTypeOperatorFunction<T> {
  48. return count <= 0
  49. ? // If we are taking no values, that's empty.
  50. () => EMPTY
  51. : operate((source, subscriber) => {
  52. let seen = 0;
  53. source.subscribe(
  54. createOperatorSubscriber(subscriber, (value) => {
  55. // Increment the number of values we have seen,
  56. // then check it against the allowed count to see
  57. // if we are still letting values through.
  58. if (++seen <= count) {
  59. subscriber.next(value);
  60. // If we have met or passed our allowed count,
  61. // we need to complete. We have to do <= here,
  62. // because re-entrant code will increment `seen` twice.
  63. if (count <= seen) {
  64. subscriber.complete();
  65. }
  66. }
  67. })
  68. );
  69. });
  70. }