9f54450594111f1cd5e5c855326a627a6a8486662b08455ebda80b8bedad46e55364667e4d3b2031c87b2c144fe9bb33f449803acfe6c7d2b87f82b4292af6 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import { Operator } from '../Operator';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { Observer, OperatorFunction } from '../types';
  6. /**
  7. * Compares all values of two observables in sequence using an optional comparator function
  8. * and returns an observable of a single boolean value representing whether or not the two sequences
  9. * are equal.
  10. *
  11. * <span class="informal">Checks to see of all values emitted by both observables are equal, in order.</span>
  12. *
  13. * ![](sequenceEqual.png)
  14. *
  15. * `sequenceEqual` subscribes to two observables and buffers incoming values from each observable. Whenever either
  16. * observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom
  17. * up; If any value pair doesn't match, the returned observable will emit `false` and complete. If one of the
  18. * observables completes, the operator will wait for the other observable to complete; If the other
  19. * observable emits before completing, the returned observable will emit `false` and complete. If one observable never
  20. * completes or emits after the other complets, the returned observable will never complete.
  21. *
  22. * ## Example
  23. * figure out if the Konami code matches
  24. * ```ts
  25. * import { from, fromEvent } from 'rxjs';
  26. * import { sequenceEqual, bufferCount, mergeMap, map } from 'rxjs/operators';
  27. *
  28. * const codes = from([
  29. * 'ArrowUp',
  30. * 'ArrowUp',
  31. * 'ArrowDown',
  32. * 'ArrowDown',
  33. * 'ArrowLeft',
  34. * 'ArrowRight',
  35. * 'ArrowLeft',
  36. * 'ArrowRight',
  37. * 'KeyB',
  38. * 'KeyA',
  39. * 'Enter', // no start key, clearly.
  40. * ]);
  41. *
  42. * const keys = fromEvent(document, 'keyup').pipe(map(e => e.code));
  43. * const matches = keys.pipe(
  44. * bufferCount(11, 1),
  45. * mergeMap(
  46. * last11 => from(last11).pipe(sequenceEqual(codes)),
  47. * ),
  48. * );
  49. * matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
  50. * ```
  51. *
  52. * @see {@link combineLatest}
  53. * @see {@link zip}
  54. * @see {@link withLatestFrom}
  55. *
  56. * @param {Observable} compareTo The observable sequence to compare the source sequence to.
  57. * @param {function} [comparator] An optional function to compare each value pair
  58. * @return {Observable} An Observable of a single boolean value representing whether or not
  59. * the values emitted by both observables were equal in sequence.
  60. * @method sequenceEqual
  61. * @owner Observable
  62. */
  63. export function sequenceEqual<T>(compareTo: Observable<T>,
  64. comparator?: (a: T, b: T) => boolean): OperatorFunction<T, boolean> {
  65. return (source: Observable<T>) => source.lift(new SequenceEqualOperator(compareTo, comparator));
  66. }
  67. export class SequenceEqualOperator<T> implements Operator<T, boolean> {
  68. constructor(private compareTo: Observable<T>,
  69. private comparator: (a: T, b: T) => boolean) {
  70. }
  71. call(subscriber: Subscriber<boolean>, source: any): any {
  72. return source.subscribe(new SequenceEqualSubscriber(subscriber, this.compareTo, this.comparator));
  73. }
  74. }
  75. /**
  76. * We need this JSDoc comment for affecting ESDoc.
  77. * @ignore
  78. * @extends {Ignored}
  79. */
  80. export class SequenceEqualSubscriber<T, R> extends Subscriber<T> {
  81. private _a: T[] = [];
  82. private _b: T[] = [];
  83. private _oneComplete = false;
  84. constructor(destination: Observer<R>,
  85. private compareTo: Observable<T>,
  86. private comparator: (a: T, b: T) => boolean) {
  87. super(destination);
  88. (this.destination as Subscription).add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this)));
  89. }
  90. protected _next(value: T): void {
  91. if (this._oneComplete && this._b.length === 0) {
  92. this.emit(false);
  93. } else {
  94. this._a.push(value);
  95. this.checkValues();
  96. }
  97. }
  98. public _complete(): void {
  99. if (this._oneComplete) {
  100. this.emit(this._a.length === 0 && this._b.length === 0);
  101. } else {
  102. this._oneComplete = true;
  103. }
  104. this.unsubscribe();
  105. }
  106. checkValues() {
  107. const { _a, _b, comparator } = this;
  108. while (_a.length > 0 && _b.length > 0) {
  109. let a = _a.shift();
  110. let b = _b.shift();
  111. let areEqual = false;
  112. try {
  113. areEqual = comparator ? comparator(a, b) : a === b;
  114. } catch (e) {
  115. this.destination.error(e);
  116. }
  117. if (!areEqual) {
  118. this.emit(false);
  119. }
  120. }
  121. }
  122. emit(value: boolean) {
  123. const { destination } = this;
  124. destination.next(value);
  125. destination.complete();
  126. }
  127. nextB(value: T) {
  128. if (this._oneComplete && this._a.length === 0) {
  129. this.emit(false);
  130. } else {
  131. this._b.push(value);
  132. this.checkValues();
  133. }
  134. }
  135. completeB() {
  136. if (this._oneComplete) {
  137. this.emit(this._a.length === 0 && this._b.length === 0);
  138. } else {
  139. this._oneComplete = true;
  140. }
  141. }
  142. }
  143. class SequenceEqualCompareToSubscriber<T, R> extends Subscriber<T> {
  144. constructor(destination: Observer<R>, private parent: SequenceEqualSubscriber<T, R>) {
  145. super(destination);
  146. }
  147. protected _next(value: T): void {
  148. this.parent.nextB(value);
  149. }
  150. protected _error(err: any): void {
  151. this.parent.error(err);
  152. this.unsubscribe();
  153. }
  154. protected _complete(): void {
  155. this.parent.completeB();
  156. this.unsubscribe();
  157. }
  158. }