af5dce26bc484c93fc4fc2c524cb64bbfe98553f4a89f067b59851e1ffdb7e3b8af375fb82591514afe44d15d91eeae1fbc26399af664c5505a6ea1a0a4e2e 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import { MonoTypeOperatorFunction } from '../types';
  2. import { identity } from '../util/identity';
  3. import { operate } from '../util/lift';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  export function distinctUntilChanged<T>(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction<T>;
  export function distinctUntilChanged<T, K>(
    comparator: (previous: K, current: K) => boolean,
    keySelector: (value: T) => K
  ): MonoTypeOperatorFunction<T>;
  /**
   * Returns a result {@link Observable} that emits all values pushed by the source observable if they
   * are distinct in comparison to the last value the result observable emitted.
   *
   * When provided without parameters or with the first parameter (`{@link distinctUntilChanged#comparator comparator}`),
   * it behaves like this:
   *
   * 1. It will always emit the first value from the source.
   * 2. For all subsequent values pushed by the source, they will be compared to the previously emitted value
   *    using the provided `comparator` or an `===` equality check.
   * 3. If the value pushed by the source is determined to be unequal by this check, that value is emitted and
   *    becomes the new "previously emitted value" internally.
   *
   * When the second parameter (`{@link distinctUntilChanged#keySelector keySelector}`) is provided, the behavior
   * changes:
   *
   * 1. It will always emit the first value from the source.
   * 2. The `keySelector` will be run against all values, including the first value.
   * 3. For all values after the first, the selected key will be compared against the key selected from
   *    the previously emitted value using the `comparator`.
   * 4. If the keys are determined to be unequal by this check, the value (not the key), is emitted
   *    and the selected key from that value is saved for future comparisons against other keys.
   *
   * ## Examples
   *
   * A very basic example with no `{@link distinctUntilChanged#comparator comparator}`. Note that `1` is emitted more than once,
   * because it's distinct in comparison to the _previously emitted_ value,
   * not in comparison to _all other emitted values_.
   *
   * ```ts
   * import { of, distinctUntilChanged } from 'rxjs';
   *
   * of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3)
   *   .pipe(distinctUntilChanged())
   *   .subscribe(console.log);
   * // Logs: 1, 2, 1, 3
   * ```
   *
   * With a `{@link distinctUntilChanged#comparator comparator}`, you can do custom comparisons. Let's say
   * you only want to emit a value when all of its components have
   * changed:
   *
   * ```ts
   * import { of, distinctUntilChanged } from 'rxjs';
   *
   * const totallyDifferentBuilds$ = of(
   *   { engineVersion: '1.1.0', transmissionVersion: '1.2.0' },
   *   { engineVersion: '1.1.0', transmissionVersion: '1.4.0' },
   *   { engineVersion: '1.3.0', transmissionVersion: '1.4.0' },
   *   { engineVersion: '1.3.0', transmissionVersion: '1.5.0' },
   *   { engineVersion: '2.0.0', transmissionVersion: '1.5.0' }
   * ).pipe(
   *   distinctUntilChanged((prev, curr) => {
   *     return (
   *       prev.engineVersion === curr.engineVersion ||
   *       prev.transmissionVersion === curr.transmissionVersion
   *     );
   *   })
   * );
   *
   * totallyDifferentBuilds$.subscribe(console.log);
   *
   * // Logs:
   * // { engineVersion: '1.1.0', transmissionVersion: '1.2.0' }
   * // { engineVersion: '1.3.0', transmissionVersion: '1.4.0' }
   * // { engineVersion: '2.0.0', transmissionVersion: '1.5.0' }
   * ```
   *
   * You can also provide a custom `{@link distinctUntilChanged#comparator comparator}` to check that emitted
   * changes are only in one direction. Let's say you only want to get
   * the next record temperature:
   *
   * ```ts
   * import { of, distinctUntilChanged } from 'rxjs';
   *
   * const temps$ = of(30, 31, 20, 34, 33, 29, 35, 20);
   *
   * const recordHighs$ = temps$.pipe(
   *   distinctUntilChanged((prevHigh, temp) => {
   *     // If the current temp is less than
   *     // or the same as the previous record,
   *     // the record hasn't changed.
   *     return temp <= prevHigh;
   *   })
   * );
   *
   * recordHighs$.subscribe(console.log);
   * // Logs: 30, 31, 34, 35
   * ```
   *
   * Selecting update events only when the `updatedBy` field shows
   * the account changed hands.
   *
   * ```ts
   * import { of, distinctUntilChanged } from 'rxjs';
   *
   * // A stream of updates to a given account
   * const accountUpdates$ = of(
   *   { updatedBy: 'blesh', data: [] },
   *   { updatedBy: 'blesh', data: [] },
   *   { updatedBy: 'ncjamieson', data: [] },
   *   { updatedBy: 'ncjamieson', data: [] },
   *   { updatedBy: 'blesh', data: [] }
   * );
   *
   * // We only want the events where it changed hands
   * const changedHands$ = accountUpdates$.pipe(
   *   distinctUntilChanged(undefined, update => update.updatedBy)
   * );
   *
   * changedHands$.subscribe(console.log);
   * // Logs:
   * // { updatedBy: 'blesh', data: Array[0] }
   * // { updatedBy: 'ncjamieson', data: Array[0] }
   * // { updatedBy: 'blesh', data: Array[0] }
   * ```
   *
   * @see {@link distinct}
   * @see {@link distinctUntilKeyChanged}
   *
   * @param comparator A function used to compare the previous and current keys for
   * equality; it should return `true` when the two keys are to be treated as equal
   * (in which case the value is not emitted). Defaults to a `===` check.
   * @param keySelector Used to select a key value to be passed to the `comparator`.
   * Defaults to the identity function, so values themselves are compared.
   *
   * @return A function that returns an Observable that emits items from the
   * source Observable with distinct values.
   */
  export function distinctUntilChanged<T, K>(
    comparator?: (previous: K, current: K) => boolean,
    keySelector: (value: T) => K = identity as (value: T) => K
  ): MonoTypeOperatorFunction<T> {
    // We've been allowing `null` to be passed as the `comparator`, so we can't
    // use a default value for the parameter, because that would only apply
    // for `undefined`. Resolving it once into a `const` (rather than
    // reassigning the parameter) lets the compiler see that it is always
    // defined inside the callback below, with no non-null assertion needed.
    const compare: (previous: K, current: K) => boolean = comparator ?? defaultCompare;

    return operate((source, subscriber) => {
      // The previous key, used to compare against keys selected
      // from new arrivals to determine "distinctiveness".
      // It is only read once `first` is false, i.e. after it has been assigned.
      let previousKey: K;
      // Whether or not this is the first value we've gotten.
      let first = true;

      source.subscribe(
        createOperatorSubscriber(subscriber, (value) => {
          // We always call the key selector, even for the first value.
          const currentKey = keySelector(value);

          // If it's the first value, we always emit it (the comparator is
          // not called at all thanks to short-circuiting).
          // Otherwise, we compare this key to the previous key, and
          // if the comparator returns false, we emit.
          if (first || !compare(previousKey, currentKey)) {
            // Update our state *before* we emit the value
            // as emission can be the source of re-entrant code
            // in functional libraries like this. We only really
            // need to do this if it's the first value, or if the
            // key we're tracking in previous needs to change.
            first = false;
            previousKey = currentKey;

            // Emit the value!
            subscriber.next(value);
          }
        })
      );
    });
  }
  /**
   * Default equality check used when no comparator is supplied: two keys are
   * treated as equal only when they are strictly equal (`===`).
   *
   * Parameters are typed `unknown` rather than `any`, since the body only
   * performs an identity comparison and needs no knowledge of the key type.
   *
   * @param a The previously retained key.
   * @param b The key selected from the newly arrived value.
   * @returns `true` when `a === b`, otherwise `false`.
   */
  function defaultCompare(a: unknown, b: unknown): boolean {
    return a === b;
  }