f697c8cd90ec3f13402498d37268ef2a27d3eebe94e1a7190acda1832064a7a4d103d44c5b70d87aacad74f7e66c479742e48c9ac9d1b12ab4c7d1d68d8812 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import { OperatorFunction, ObservableInput } from '../types';
  2. import { operate } from '../util/lift';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  4. import { innerFrom } from '../observable/innerFrom';
  5. /**
  6. * Compares all values of two observables in sequence using an optional comparator function
  7. * and returns an observable of a single boolean value representing whether or not the two sequences
  8. * are equal.
  9. *
  10. * <span class="informal">Checks to see of all values emitted by both observables are equal, in order.</span>
  11. *
  12. * ![](sequenceEqual.png)
  13. *
  14. * `sequenceEqual` subscribes to source observable and `compareTo` `ObservableInput` (that internally
  15. * gets converted to an observable) and buffers incoming values from each observable. Whenever either
  16. * observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom
  17. * up; If any value pair doesn't match, the returned observable will emit `false` and complete. If one of the
  18. * observables completes, the operator will wait for the other observable to complete; If the other
  19. * observable emits before completing, the returned observable will emit `false` and complete. If one observable never
  20. * completes or emits after the other completes, the returned observable will never complete.
  21. *
  22. * ## Example
  23. *
  24. * Figure out if the Konami code matches
  25. *
  26. * ```ts
  27. * import { from, fromEvent, map, bufferCount, mergeMap, sequenceEqual } from 'rxjs';
  28. *
  29. * const codes = from([
  30. * 'ArrowUp',
  31. * 'ArrowUp',
  32. * 'ArrowDown',
  33. * 'ArrowDown',
  34. * 'ArrowLeft',
  35. * 'ArrowRight',
  36. * 'ArrowLeft',
  37. * 'ArrowRight',
  38. * 'KeyB',
  39. * 'KeyA',
  40. * 'Enter', // no start key, clearly.
  41. * ]);
  42. *
  43. * const keys = fromEvent<KeyboardEvent>(document, 'keyup').pipe(map(e => e.code));
  44. * const matches = keys.pipe(
  45. * bufferCount(11, 1),
  46. * mergeMap(last11 => from(last11).pipe(sequenceEqual(codes)))
  47. * );
  48. * matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
  49. * ```
  50. *
  51. * @see {@link combineLatest}
  52. * @see {@link zip}
  53. * @see {@link withLatestFrom}
  54. *
  55. * @param compareTo The `ObservableInput` sequence to compare the source sequence to.
  56. * @param comparator An optional function to compare each value pair.
  57. *
  58. * @return A function that returns an Observable that emits a single boolean
  59. * value representing whether or not the values emitted by the source
  60. * Observable and provided `ObservableInput` were equal in sequence.
  61. */
  62. export function sequenceEqual<T>(
  63. compareTo: ObservableInput<T>,
  64. comparator: (a: T, b: T) => boolean = (a, b) => a === b
  65. ): OperatorFunction<T, boolean> {
  66. return operate((source, subscriber) => {
  67. // The state for the source observable
  68. const aState = createState<T>();
  69. // The state for the compareTo observable;
  70. const bState = createState<T>();
  71. /** A utility to emit and complete */
  72. const emit = (isEqual: boolean) => {
  73. subscriber.next(isEqual);
  74. subscriber.complete();
  75. };
  76. /**
  77. * Creates a subscriber that subscribes to one of the sources, and compares its collected
  78. * state -- `selfState` -- to the other source's collected state -- `otherState`. This
  79. * is used for both streams.
  80. */
  81. const createSubscriber = (selfState: SequenceState<T>, otherState: SequenceState<T>) => {
  82. const sequenceEqualSubscriber = createOperatorSubscriber(
  83. subscriber,
  84. (a: T) => {
  85. const { buffer, complete } = otherState;
  86. if (buffer.length === 0) {
  87. // If there's no values in the other buffer
  88. // and the other stream is complete, we know
  89. // this isn't a match, because we got one more value.
  90. // Otherwise, we push onto our buffer, so when the other
  91. // stream emits, it can pull this value off our buffer and check it
  92. // at the appropriate time.
  93. complete ? emit(false) : selfState.buffer.push(a);
  94. } else {
  95. // If the other stream *does* have values in its buffer,
  96. // pull the oldest one off so we can compare it to what we
  97. // just got. If it wasn't a match, emit `false` and complete.
  98. !comparator(a, buffer.shift()!) && emit(false);
  99. }
  100. },
  101. () => {
  102. // Or observable completed
  103. selfState.complete = true;
  104. const { complete, buffer } = otherState;
  105. // If the other observable is also complete, and there's
  106. // still stuff left in their buffer, it doesn't match, if their
  107. // buffer is empty, then it does match. This is because we can't
  108. // possibly get more values here anymore.
  109. complete && emit(buffer.length === 0);
  110. // Be sure to clean up our stream as soon as possible if we can.
  111. sequenceEqualSubscriber?.unsubscribe();
  112. }
  113. );
  114. return sequenceEqualSubscriber;
  115. };
  116. // Subscribe to each source.
  117. source.subscribe(createSubscriber(aState, bState));
  118. innerFrom(compareTo).subscribe(createSubscriber(bState, aState));
  119. });
  120. }
  121. /**
  122. * A simple structure for the data used to test each sequence
  123. */
  124. interface SequenceState<T> {
  125. /** A temporary store for arrived values before they are checked */
  126. buffer: T[];
  127. /** Whether or not the sequence source has completed. */
  128. complete: boolean;
  129. }
  130. /**
  131. * Creates a simple structure that is used to represent
  132. * data used to test each sequence.
  133. */
  134. function createState<T>(): SequenceState<T> {
  135. return {
  136. buffer: [],
  137. complete: false,
  138. };
  139. }