123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- import { OperatorFunction, ObservableInput } from '../types';
- import { operate } from '../util/lift';
- import { createOperatorSubscriber } from './OperatorSubscriber';
- import { innerFrom } from '../observable/innerFrom';
/**
 * Compares all values of two observables in sequence using an optional comparator function
 * and returns an observable of a single boolean value representing whether or not the two sequences
 * are equal.
 *
 * <span class="informal">Checks to see if all values emitted by both observables are equal, in order.</span>
 *
 * `sequenceEqual` subscribes to the source observable and to `compareTo` (an `ObservableInput` that is
 * converted to an observable internally). A value that arrives on one side before the other side has
 * produced its counterpart is buffered. When a value arrives and the other side already has buffered
 * values, the oldest buffered value is removed and compared with the new one; if the pair does not
 * match, the returned observable emits `false` and completes.
 *
 * If one of the observables completes, the operator waits for the other one. If the other observable
 * then emits a value that has no buffered counterpart, the returned observable emits `false` and
 * completes. Once both observables have completed, the returned observable emits `true` only if no
 * unmatched values are left buffered on either side, and `false` otherwise.
 *
 * If either observable never completes and no mismatch is ever detected, the returned observable
 * never emits and never completes.
 *
 * The comparator is called with the newly arrived value first and the buffered value from the other
 * sequence second, so the argument order depends on which sequence emitted later. A custom comparator
 * should therefore be symmetric.
 *
 * ## Example
 *
 * Figure out if the Konami code matches
 *
 * ```ts
 * import { from, fromEvent, map, bufferCount, mergeMap, sequenceEqual } from 'rxjs';
 *
 * const codes = from([
 *   'ArrowUp',
 *   'ArrowUp',
 *   'ArrowDown',
 *   'ArrowDown',
 *   'ArrowLeft',
 *   'ArrowRight',
 *   'ArrowLeft',
 *   'ArrowRight',
 *   'KeyB',
 *   'KeyA',
 *   'Enter', // no start key, clearly.
 * ]);
 *
 * const keys = fromEvent<KeyboardEvent>(document, 'keyup').pipe(map(e => e.code));
 * const matches = keys.pipe(
 *   bufferCount(11, 1),
 *   mergeMap(last11 => from(last11).pipe(sequenceEqual(codes)))
 * );
 * matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
 * ```
 *
 * @see {@link combineLatest}
 * @see {@link zip}
 * @see {@link withLatestFrom}
 *
 * @param compareTo The `ObservableInput` sequence to compare the source sequence to.
 * @param comparator An optional function to compare each value pair. Defaults to strict equality (`===`).
 *
 * @return A function that returns an Observable that emits a single boolean
 * value representing whether or not the values emitted by the source
 * Observable and provided `ObservableInput` were equal in sequence.
 */
export function sequenceEqual<T>(
  compareTo: ObservableInput<T>,
  comparator: (a: T, b: T) => boolean = (a, b) => a === b
): OperatorFunction<T, boolean> {
  return operate((source, subscriber) => {
    // The state for the source observable.
    const aState = createState<T>();
    // The state for the `compareTo` observable.
    const bState = createState<T>();

    /** Emits the final verdict and completes the result. */
    const emit = (isEqual: boolean) => {
      subscriber.next(isEqual);
      subscriber.complete();
    };

    /**
     * Creates a subscriber for one of the two inputs. It records what it collects in
     * `selfState` and checks it against what the other input collected in `otherState`.
     * The same factory is used for both streams, with the states swapped.
     */
    const createSubscriber = (selfState: SequenceState<T>, otherState: SequenceState<T>) => {
      const sequenceEqualSubscriber = createOperatorSubscriber(
        subscriber,
        (a: T) => {
          const { buffer, complete } = otherState;
          if (buffer.length > 0) {
            // The other stream is ahead of us: take its oldest pending value and
            // compare it with the one we just received.
            if (!comparator(a, buffer.shift()!)) {
              emit(false);
            }
          } else if (complete) {
            // The other stream is finished and has nothing pending, so this value
            // can never be matched.
            emit(false);
          } else {
            // We are ahead of the other stream: park the value so the other stream
            // can pull it off our buffer when its counterpart arrives.
            selfState.buffer.push(a);
          }
        },
        () => {
          // Our observable completed.
          selfState.complete = true;
          if (otherState.complete) {
            // No more values can arrive on either side. The sequences are equal only
            // if nothing is left unmatched in EITHER buffer: leftovers in the other
            // buffer are values we never matched, and leftovers in our own buffer are
            // values the other stream never matched before it completed.
            emit(selfState.buffer.length === 0 && otherState.buffer.length === 0);
          }
          // Be sure to clean up our stream as soon as possible if we can.
          sequenceEqualSubscriber?.unsubscribe();
        }
      );
      return sequenceEqualSubscriber;
    };

    // Subscribe to each source.
    source.subscribe(createSubscriber(aState, bState));
    innerFrom(compareTo).subscribe(createSubscriber(bState, aState));
  });
}
/**
 * Bookkeeping kept for one of the two sequences being compared.
 */
interface SequenceState<T> {
  /** Set to `true` once this sequence's source has completed. */
  complete: boolean;
  /** Values that have arrived from this sequence but have not been compared yet. */
  buffer: T[];
}
/**
 * Builds the initial bookkeeping record for one sequence: an empty
 * buffer of pending values and a `complete` flag that starts as `false`.
 * Every call returns a fresh record with its own buffer array.
 */
function createState<T>(): SequenceState<T> {
  const pending: T[] = [];
  return { buffer: pending, complete: false };
}
|