123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- import { Observable } from '../Observable';
- import { innerFrom } from '../observable/innerFrom';
- import { Subject } from '../Subject';
- import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
- import { operate } from '../util/lift';
- import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';
- export interface BasicGroupByOptions<K, T> {
- element?: undefined;
- duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
- connector?: () => SubjectLike<T>;
- }
- export interface GroupByOptionsWithElement<K, E, T> {
- element: (value: T) => E;
- duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
- connector?: () => SubjectLike<E>;
- }
- export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
- export function groupBy<T, K, E>(
- key: (value: T) => K,
- options: GroupByOptionsWithElement<K, E, T>
- ): OperatorFunction<T, GroupedObservable<K, E>>;
- export function groupBy<T, K extends T>(
- key: (value: T) => value is K
- ): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
- export function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
- /**
- * @deprecated use the options parameter instead.
- */
- export function groupBy<T, K>(
- key: (value: T) => K,
- element: void,
- duration: (grouped: GroupedObservable<K, T>) => Observable<any>
- ): OperatorFunction<T, GroupedObservable<K, T>>;
- /**
- * @deprecated use the options parameter instead.
- */
- export function groupBy<T, K, R>(
- key: (value: T) => K,
- element?: (value: T) => R,
- duration?: (grouped: GroupedObservable<K, R>) => Observable<any>
- ): OperatorFunction<T, GroupedObservable<K, R>>;
- /**
- * Groups the items emitted by an Observable according to a specified criterion,
- * and emits these grouped items as `GroupedObservables`, one
- * {@link GroupedObservable} per group.
- *
- * 
- *
- * When the Observable emits an item, a key is computed for this item with the key function.
- *
- * If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new
- * {@link GroupedObservable} for this key is created and emits.
- *
- * A {@link GroupedObservable} represents values belonging to the same group represented by a common key. The common
- * key is available as the `key` field of a {@link GroupedObservable} instance.
- *
- * The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
- * returned by the element function.
- *
- * ## Examples
- *
- * Group objects by `id` and return as array
- *
- * ```ts
- * import { of, groupBy, mergeMap, reduce } from 'rxjs';
- *
- * of(
- * { id: 1, name: 'JavaScript' },
- * { id: 2, name: 'Parcel' },
- * { id: 2, name: 'webpack' },
- * { id: 1, name: 'TypeScript' },
- * { id: 3, name: 'TSLint' }
- * ).pipe(
- * groupBy(p => p.id),
- * mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))
- * )
- * .subscribe(p => console.log(p));
- *
- * // displays:
- * // [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript'}]
- * // [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack'}]
- * // [{ id: 3, name: 'TSLint' }]
- * ```
- *
- * Pivot data on the `id` field
- *
- * ```ts
- * import { of, groupBy, mergeMap, reduce, map } from 'rxjs';
- *
- * of(
- * { id: 1, name: 'JavaScript' },
- * { id: 2, name: 'Parcel' },
- * { id: 2, name: 'webpack' },
- * { id: 1, name: 'TypeScript' },
- * { id: 3, name: 'TSLint' }
- * ).pipe(
- * groupBy(p => p.id, { element: p => p.name }),
- * mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [`${ group$.key }`]))),
- * map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
- * )
- * .subscribe(p => console.log(p));
- *
- * // displays:
- * // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
- * // { id: 2, values: [ 'Parcel', 'webpack' ] }
- * // { id: 3, values: [ 'TSLint' ] }
- * ```
- *
- * @param key A function that extracts the key
- * for each item.
- * @param element A function that extracts the
- * return element for each item.
- * @param duration
- * A function that returns an Observable to determine how long each group should
- * exist.
- * @param connector Factory function to create an
- * intermediate Subject through which grouped elements are emitted.
- * @return A function that returns an Observable that emits GroupedObservables,
- * each of which corresponds to a unique key value and each of which emits
- * those items from the source Observable that share that key value.
- *
- * @deprecated Use the options parameter instead.
- */
- export function groupBy<T, K, R>(
- key: (value: T) => K,
- element?: (value: T) => R,
- duration?: (grouped: GroupedObservable<K, R>) => Observable<any>,
- connector?: () => Subject<R>
- ): OperatorFunction<T, GroupedObservable<K, R>>;
- // Impl
- export function groupBy<T, K, R>(
- keySelector: (value: T) => K,
- elementOrOptions?: ((value: any) => any) | void | BasicGroupByOptions<K, T> | GroupByOptionsWithElement<K, R, T>,
- duration?: (grouped: GroupedObservable<any, any>) => ObservableInput<any>,
- connector?: () => SubjectLike<any>
- ): OperatorFunction<T, GroupedObservable<K, R>> {
- return operate((source, subscriber) => {
- let element: ((value: any) => any) | void;
- if (!elementOrOptions || typeof elementOrOptions === 'function') {
- element = elementOrOptions as ((value: any) => any);
- } else {
- ({ duration, element, connector } = elementOrOptions);
- }
- // A lookup for the groups that we have so far.
- const groups = new Map<K, SubjectLike<any>>();
- // Used for notifying all groups and the subscriber in the same way.
- const notify = (cb: (group: Observer<any>) => void) => {
- groups.forEach(cb);
- cb(subscriber);
- };
- // Used to handle errors from the source, AND errors that occur during the
- // next call from the source.
- const handleError = (err: any) => notify((consumer) => consumer.error(err));
- // The number of actively subscribed groups
- let activeGroups = 0;
- // Whether or not teardown was attempted on this subscription.
- let teardownAttempted = false;
- // Capturing a reference to this, because we need a handle to it
- // in `createGroupedObservable` below. This is what we use to
- // subscribe to our source observable. This sometimes needs to be unsubscribed
- // out-of-band with our `subscriber` which is the downstream subscriber, or destination,
- // in cases where a user unsubscribes from the main resulting subscription, but
- // still has groups from this subscription subscribed and would expect values from it
- // Consider: `source.pipe(groupBy(fn), take(2))`.
- const groupBySourceSubscriber = new OperatorSubscriber(
- subscriber,
- (value: T) => {
- // Because we have to notify all groups of any errors that occur in here,
- // we have to add our own try/catch to ensure that those errors are propagated.
- // OperatorSubscriber will only send the error to the main subscriber.
- try {
- const key = keySelector(value);
- let group = groups.get(key);
- if (!group) {
- // Create our group subject
- groups.set(key, (group = connector ? connector() : new Subject<any>()));
- // Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
- // because the grouped observable has special semantics around reference counting
- // to ensure we don't sever our connection to the source prematurely.
- const grouped = createGroupedObservable(key, group);
- subscriber.next(grouped);
- if (duration) {
- const durationSubscriber = createOperatorSubscriber(
- // Providing the group here ensures that it is disposed of -- via `unsubscribe` --
- // when the duration subscription is torn down. That is important, because then
- // if someone holds a handle to the grouped observable and tries to subscribe to it
- // after the connection to the source has been severed, they will get an
- // `ObjectUnsubscribedError` and know they can't possibly get any notifications.
- group as any,
- () => {
- // Our duration notified! We can complete the group.
- // The group will be removed from the map in the finalization phase.
- group!.complete();
- durationSubscriber?.unsubscribe();
- },
- // Completions are also sent to the group, but just the group.
- undefined,
- // Errors on the duration subscriber are sent to the group
- // but only the group. They are not sent to the main subscription.
- undefined,
- // Finalization: Remove this group from our map.
- () => groups.delete(key)
- );
- // Start our duration notifier.
- groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
- }
- }
- // Send the value to our group.
- group.next(element ? element(value) : value);
- } catch (err) {
- handleError(err);
- }
- },
- // Source completes.
- () => notify((consumer) => consumer.complete()),
- // Error from the source.
- handleError,
- // Free up memory.
- // When the source subscription is _finally_ torn down, release the subjects and keys
- // in our groups Map, they may be quite large and we don't want to keep them around if we
- // don't have to.
- () => groups.clear(),
- () => {
- teardownAttempted = true;
- // We only kill our subscription to the source if we have
- // no active groups. As stated above, consider this scenario:
- // source$.pipe(groupBy(fn), take(2)).
- return activeGroups === 0;
- }
- );
- // Subscribe to the source
- source.subscribe(groupBySourceSubscriber);
- /**
- * Creates the actual grouped observable returned.
- * @param key The key of the group
- * @param groupSubject The subject that fuels the group
- */
- function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
- const result: any = new Observable<T>((groupSubscriber) => {
- activeGroups++;
- const innerSub = groupSubject.subscribe(groupSubscriber);
- return () => {
- innerSub.unsubscribe();
- // We can kill the subscription to our source if we now have no more
- // active groups subscribed, and a finalization was already attempted on
- // the source.
- --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
- };
- });
- result.key = key;
- return result;
- }
- });
- }
- /**
- * An observable of values that is the emitted by the result of a {@link groupBy} operator,
- * contains a `key` property for the grouping.
- */
- export interface GroupedObservable<K, T> extends Observable<T> {
- /**
- * The key value for the grouped notifications.
- */
- readonly key: K;
- }
|