1e90751ba04c69dc1bee25660583125b923fea5b9b599ff3a80bea6339371f22c122f3010182d85108d4e77551040c415e2f87f38e9b72f2d5769f32c74c9a 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. import { Observable } from '../Observable';
  2. import { Operator } from '../Operator';
  3. import { Observer, OperatorFunction } from '../types';
  4. import { Subscriber } from '../Subscriber';
  5. /**
  6. * Counts the number of emissions on the source and emits that number when the
  7. * source completes.
  8. *
  9. * <span class="informal">Tells how many values were emitted, when the source
  10. * completes.</span>
  11. *
  12. * ![](count.png)
  13. *
  14. * `count` transforms an Observable that emits values into an Observable that
  15. * emits a single value that represents the number of values emitted by the
  16. * source Observable. If the source Observable terminates with an error, `count`
  17. * will pass this error notification along without emitting a value first. If
  18. * the source Observable does not terminate at all, `count` will neither emit
  19. * a value nor terminate. This operator takes an optional `predicate` function
  20. * as argument, in which case the output emission will represent the number of
  21. * source values that matched `true` with the `predicate`.
  22. *
  23. * ## Examples
  24. *
  25. * Counts how many seconds have passed before the first click happened
  26. * ```ts
  27. * import { fromEvent, interval } from 'rxjs';
  28. * import { count, takeUntil } from 'rxjs/operators';
  29. *
  30. * const seconds = interval(1000);
  31. * const clicks = fromEvent(document, 'click');
  32. * const secondsBeforeClick = seconds.pipe(takeUntil(clicks));
  33. * const result = secondsBeforeClick.pipe(count());
  34. * result.subscribe(x => console.log(x));
  35. * ```
  36. *
  37. * Counts how many odd numbers are there between 1 and 7
  38. * ```ts
  39. * import { range } from 'rxjs';
  40. * import { count } from 'rxjs/operators';
  41. *
  42. * const numbers = range(1, 7);
  43. * const result = numbers.pipe(count(i => i % 2 === 1));
  44. * result.subscribe(x => console.log(x));
  45. * // Results in:
  46. * // 4
  47. * ```
  48. *
  49. * @see {@link max}
  50. * @see {@link min}
  51. * @see {@link reduce}
  52. *
  53. * @param {function(value: T, i: number, source: Observable<T>): boolean} [predicate] A
  54. * boolean function to select what values are to be counted. It is provided with
  55. * arguments of:
  56. * - `value`: the value from the source Observable.
  57. * - `index`: the (zero-based) "index" of the value from the source Observable.
  58. * - `source`: the source Observable instance itself.
  59. * @return {Observable} An Observable of one number that represents the count as
  60. * described above.
  61. * @method count
  62. * @owner Observable
  63. */
  64. export function count<T>(predicate?: (value: T, index: number, source: Observable<T>) => boolean): OperatorFunction<T, number> {
  65. return (source: Observable<T>) => source.lift(new CountOperator(predicate, source));
  66. }
  67. class CountOperator<T> implements Operator<T, number> {
  68. constructor(private predicate?: (value: T, index: number, source: Observable<T>) => boolean,
  69. private source?: Observable<T>) {
  70. }
  71. call(subscriber: Subscriber<number>, source: any): any {
  72. return source.subscribe(new CountSubscriber(subscriber, this.predicate, this.source));
  73. }
  74. }
  75. /**
  76. * We need this JSDoc comment for affecting ESDoc.
  77. * @ignore
  78. * @extends {Ignored}
  79. */
  80. class CountSubscriber<T> extends Subscriber<T> {
  81. private count: number = 0;
  82. private index: number = 0;
  83. constructor(destination: Observer<number>,
  84. private predicate?: (value: T, index: number, source: Observable<T>) => boolean,
  85. private source?: Observable<T>) {
  86. super(destination);
  87. }
  88. protected _next(value: T): void {
  89. if (this.predicate) {
  90. this._tryPredicate(value);
  91. } else {
  92. this.count++;
  93. }
  94. }
  95. private _tryPredicate(value: T) {
  96. let result: any;
  97. try {
  98. result = this.predicate(value, this.index++, this.source);
  99. } catch (err) {
  100. this.destination.error(err);
  101. return;
  102. }
  103. if (result) {
  104. this.count++;
  105. }
  106. }
  107. protected _complete(): void {
  108. this.destination.next(this.count);
  109. this.destination.complete();
  110. }
  111. }