bbf126b71a7341507267714429391fa1cd1599bb5dc0ec0eb52522d75252f40268ef0731e4ce7edaf52ab31dc248bcc7c69bc027b86bb8e00ea3d20d30fdd4 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import { Observable } from '../Observable';
  2. import { Subscriber } from '../Subscriber';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  4. /**
  5. * A basic scan operation. This is used for `scan` and `reduce`.
  6. * @param accumulator The accumulator to use
  7. * @param seed The seed value for the state to accumulate
  8. * @param hasSeed Whether or not a seed was provided
  9. * @param emitOnNext Whether or not to emit the state on next
  10. * @param emitBeforeComplete Whether or not to emit the before completion
  11. */
  12. export function scanInternals<V, A, S>(
  13. accumulator: (acc: V | A | S, value: V, index: number) => A,
  14. seed: S,
  15. hasSeed: boolean,
  16. emitOnNext: boolean,
  17. emitBeforeComplete?: undefined | true
  18. ) {
  19. return (source: Observable<V>, subscriber: Subscriber<any>) => {
  20. // Whether or not we have state yet. This will only be
  21. // false before the first value arrives if we didn't get
  22. // a seed value.
  23. let hasState = hasSeed;
  24. // The state that we're tracking, starting with the seed,
  25. // if there is one, and then updated by the return value
  26. // from the accumulator on each emission.
  27. let state: any = seed;
  28. // An index to pass to the accumulator function.
  29. let index = 0;
  30. // Subscribe to our source. All errors and completions are passed through.
  31. source.subscribe(
  32. createOperatorSubscriber(
  33. subscriber,
  34. (value) => {
  35. // Always increment the index.
  36. const i = index++;
  37. // Set the state
  38. state = hasState
  39. ? // We already have state, so we can get the new state from the accumulator
  40. accumulator(state, value, i)
  41. : // We didn't have state yet, a seed value was not provided, so
  42. // we set the state to the first value, and mark that we have state now
  43. ((hasState = true), value);
  44. // Maybe send it to the consumer.
  45. emitOnNext && subscriber.next(state);
  46. },
  47. // If an onComplete was given, call it, otherwise
  48. // just pass through the complete notification to the consumer.
  49. emitBeforeComplete &&
  50. (() => {
  51. hasState && subscriber.next(state);
  52. subscriber.complete();
  53. })
  54. )
  55. );
  56. };
  57. }