0c79e35055375c2c9dd95fb04c41e1f938e7e93a0c265c375ee160b52683abd620f4768c1c42c8d9bbf255867c9891c2fd489c4a5e4fb3eb1201f5eab82ac6 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. import { Observable } from '../Observable';
  2. import { innerFrom } from '../observable/innerFrom';
  3. import { Subject } from '../Subject';
  4. import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
  5. import { operate } from '../util/lift';
  6. import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';
  7. export interface BasicGroupByOptions<K, T> {
  8. element?: undefined;
  9. duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
  10. connector?: () => SubjectLike<T>;
  11. }
  12. export interface GroupByOptionsWithElement<K, E, T> {
  13. element: (value: T) => E;
  14. duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
  15. connector?: () => SubjectLike<E>;
  16. }
  17. export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
  18. export function groupBy<T, K, E>(
  19. key: (value: T) => K,
  20. options: GroupByOptionsWithElement<K, E, T>
  21. ): OperatorFunction<T, GroupedObservable<K, E>>;
  22. export function groupBy<T, K extends T>(
  23. key: (value: T) => value is K
  24. ): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
  25. export function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
  26. /**
  27. * @deprecated use the options parameter instead.
  28. */
  29. export function groupBy<T, K>(
  30. key: (value: T) => K,
  31. element: void,
  32. duration: (grouped: GroupedObservable<K, T>) => Observable<any>
  33. ): OperatorFunction<T, GroupedObservable<K, T>>;
  34. /**
  35. * @deprecated use the options parameter instead.
  36. */
  37. export function groupBy<T, K, R>(
  38. key: (value: T) => K,
  39. element?: (value: T) => R,
  40. duration?: (grouped: GroupedObservable<K, R>) => Observable<any>
  41. ): OperatorFunction<T, GroupedObservable<K, R>>;
  42. /**
  43. * Groups the items emitted by an Observable according to a specified criterion,
  44. * and emits these grouped items as `GroupedObservables`, one
  45. * {@link GroupedObservable} per group.
  46. *
  47. * ![](groupBy.png)
  48. *
  49. * When the Observable emits an item, a key is computed for this item with the key function.
  50. *
  51. * If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new
  52. * {@link GroupedObservable} for this key is created and emits.
  53. *
  54. * A {@link GroupedObservable} represents values belonging to the same group represented by a common key. The common
  55. * key is available as the `key` field of a {@link GroupedObservable} instance.
  56. *
  57. * The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
  58. * returned by the element function.
  59. *
  60. * ## Examples
  61. *
  62. * Group objects by `id` and return as array
  63. *
  64. * ```ts
  65. * import { of, groupBy, mergeMap, reduce } from 'rxjs';
  66. *
  67. * of(
  68. * { id: 1, name: 'JavaScript' },
  69. * { id: 2, name: 'Parcel' },
  70. * { id: 2, name: 'webpack' },
  71. * { id: 1, name: 'TypeScript' },
  72. * { id: 3, name: 'TSLint' }
  73. * ).pipe(
  74. * groupBy(p => p.id),
  75. * mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))
  76. * )
  77. * .subscribe(p => console.log(p));
  78. *
  79. * // displays:
  80. * // [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript'}]
  81. * // [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack'}]
  82. * // [{ id: 3, name: 'TSLint' }]
  83. * ```
  84. *
  85. * Pivot data on the `id` field
  86. *
  87. * ```ts
  88. * import { of, groupBy, mergeMap, reduce, map } from 'rxjs';
  89. *
  90. * of(
  91. * { id: 1, name: 'JavaScript' },
  92. * { id: 2, name: 'Parcel' },
  93. * { id: 2, name: 'webpack' },
  94. * { id: 1, name: 'TypeScript' },
  95. * { id: 3, name: 'TSLint' }
  96. * ).pipe(
  97. * groupBy(p => p.id, { element: p => p.name }),
  98. * mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [`${ group$.key }`]))),
  99. * map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
  100. * )
  101. * .subscribe(p => console.log(p));
  102. *
  103. * // displays:
  104. * // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
  105. * // { id: 2, values: [ 'Parcel', 'webpack' ] }
  106. * // { id: 3, values: [ 'TSLint' ] }
  107. * ```
  108. *
  109. * @param key A function that extracts the key
  110. * for each item.
  111. * @param element A function that extracts the
  112. * return element for each item.
  113. * @param duration
  114. * A function that returns an Observable to determine how long each group should
  115. * exist.
  116. * @param connector Factory function to create an
  117. * intermediate Subject through which grouped elements are emitted.
  118. * @return A function that returns an Observable that emits GroupedObservables,
  119. * each of which corresponds to a unique key value and each of which emits
  120. * those items from the source Observable that share that key value.
  121. *
  122. * @deprecated Use the options parameter instead.
  123. */
  124. export function groupBy<T, K, R>(
  125. key: (value: T) => K,
  126. element?: (value: T) => R,
  127. duration?: (grouped: GroupedObservable<K, R>) => Observable<any>,
  128. connector?: () => Subject<R>
  129. ): OperatorFunction<T, GroupedObservable<K, R>>;
  130. // Impl
  131. export function groupBy<T, K, R>(
  132. keySelector: (value: T) => K,
  133. elementOrOptions?: ((value: any) => any) | void | BasicGroupByOptions<K, T> | GroupByOptionsWithElement<K, R, T>,
  134. duration?: (grouped: GroupedObservable<any, any>) => ObservableInput<any>,
  135. connector?: () => SubjectLike<any>
  136. ): OperatorFunction<T, GroupedObservable<K, R>> {
  137. return operate((source, subscriber) => {
  138. let element: ((value: any) => any) | void;
  139. if (!elementOrOptions || typeof elementOrOptions === 'function') {
  140. element = elementOrOptions as ((value: any) => any);
  141. } else {
  142. ({ duration, element, connector } = elementOrOptions);
  143. }
  144. // A lookup for the groups that we have so far.
  145. const groups = new Map<K, SubjectLike<any>>();
  146. // Used for notifying all groups and the subscriber in the same way.
  147. const notify = (cb: (group: Observer<any>) => void) => {
  148. groups.forEach(cb);
  149. cb(subscriber);
  150. };
  151. // Used to handle errors from the source, AND errors that occur during the
  152. // next call from the source.
  153. const handleError = (err: any) => notify((consumer) => consumer.error(err));
  154. // The number of actively subscribed groups
  155. let activeGroups = 0;
  156. // Whether or not teardown was attempted on this subscription.
  157. let teardownAttempted = false;
  158. // Capturing a reference to this, because we need a handle to it
  159. // in `createGroupedObservable` below. This is what we use to
  160. // subscribe to our source observable. This sometimes needs to be unsubscribed
  161. // out-of-band with our `subscriber` which is the downstream subscriber, or destination,
  162. // in cases where a user unsubscribes from the main resulting subscription, but
  163. // still has groups from this subscription subscribed and would expect values from it
  164. // Consider: `source.pipe(groupBy(fn), take(2))`.
  165. const groupBySourceSubscriber = new OperatorSubscriber(
  166. subscriber,
  167. (value: T) => {
  168. // Because we have to notify all groups of any errors that occur in here,
  169. // we have to add our own try/catch to ensure that those errors are propagated.
  170. // OperatorSubscriber will only send the error to the main subscriber.
  171. try {
  172. const key = keySelector(value);
  173. let group = groups.get(key);
  174. if (!group) {
  175. // Create our group subject
  176. groups.set(key, (group = connector ? connector() : new Subject<any>()));
  177. // Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
  178. // because the grouped observable has special semantics around reference counting
  179. // to ensure we don't sever our connection to the source prematurely.
  180. const grouped = createGroupedObservable(key, group);
  181. subscriber.next(grouped);
  182. if (duration) {
  183. const durationSubscriber = createOperatorSubscriber(
  184. // Providing the group here ensures that it is disposed of -- via `unsubscribe` --
  185. // when the duration subscription is torn down. That is important, because then
  186. // if someone holds a handle to the grouped observable and tries to subscribe to it
  187. // after the connection to the source has been severed, they will get an
  188. // `ObjectUnsubscribedError` and know they can't possibly get any notifications.
  189. group as any,
  190. () => {
  191. // Our duration notified! We can complete the group.
  192. // The group will be removed from the map in the finalization phase.
  193. group!.complete();
  194. durationSubscriber?.unsubscribe();
  195. },
  196. // Completions are also sent to the group, but just the group.
  197. undefined,
  198. // Errors on the duration subscriber are sent to the group
  199. // but only the group. They are not sent to the main subscription.
  200. undefined,
  201. // Finalization: Remove this group from our map.
  202. () => groups.delete(key)
  203. );
  204. // Start our duration notifier.
  205. groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
  206. }
  207. }
  208. // Send the value to our group.
  209. group.next(element ? element(value) : value);
  210. } catch (err) {
  211. handleError(err);
  212. }
  213. },
  214. // Source completes.
  215. () => notify((consumer) => consumer.complete()),
  216. // Error from the source.
  217. handleError,
  218. // Free up memory.
  219. // When the source subscription is _finally_ torn down, release the subjects and keys
  220. // in our groups Map, they may be quite large and we don't want to keep them around if we
  221. // don't have to.
  222. () => groups.clear(),
  223. () => {
  224. teardownAttempted = true;
  225. // We only kill our subscription to the source if we have
  226. // no active groups. As stated above, consider this scenario:
  227. // source$.pipe(groupBy(fn), take(2)).
  228. return activeGroups === 0;
  229. }
  230. );
  231. // Subscribe to the source
  232. source.subscribe(groupBySourceSubscriber);
  233. /**
  234. * Creates the actual grouped observable returned.
  235. * @param key The key of the group
  236. * @param groupSubject The subject that fuels the group
  237. */
  238. function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
  239. const result: any = new Observable<T>((groupSubscriber) => {
  240. activeGroups++;
  241. const innerSub = groupSubject.subscribe(groupSubscriber);
  242. return () => {
  243. innerSub.unsubscribe();
  244. // We can kill the subscription to our source if we now have no more
  245. // active groups subscribed, and a finalization was already attempted on
  246. // the source.
  247. --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
  248. };
  249. });
  250. result.key = key;
  251. return result;
  252. }
  253. });
  254. }
  255. /**
  256. * An observable of values that is the emitted by the result of a {@link groupBy} operator,
  257. * contains a `key` property for the grouping.
  258. */
  259. export interface GroupedObservable<K, T> extends Observable<T> {
  260. /**
  261. * The key value for the grouped notifications.
  262. */
  263. readonly key: K;
  264. }