| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- import { OperatorFunction, ObservableInputTuple } from '../types';
- import { operate } from '../util/lift';
- import { createOperatorSubscriber } from './OperatorSubscriber';
- import { innerFrom } from '../observable/innerFrom';
- import { identity } from '../util/identity';
- import { noop } from '../util/noop';
- import { popResultSelector } from '../util/args';
- export function withLatestFrom<T, O extends unknown[]>(...inputs: [...ObservableInputTuple<O>]): OperatorFunction<T, [T, ...O]>;
- export function withLatestFrom<T, O extends unknown[], R>(
- ...inputs: [...ObservableInputTuple<O>, (...value: [T, ...O]) => R]
- ): OperatorFunction<T, R>;
- /**
- * Combines the source Observable with other Observables to create an Observable
- * whose values are calculated from the latest values of each, only when the
- * source emits.
- *
- * <span class="informal">Whenever the source Observable emits a value, it
- * computes a formula using that value plus the latest values from other input
- * Observables, then emits the output of that formula.</span>
- *
- * 
- *
- * `withLatestFrom` combines each value from the source Observable (the
- * instance) with the latest values from the other input Observables only when
- * the source emits a value, optionally using a `project` function to determine
- * the value to be emitted on the output Observable. All input Observables must
- * emit at least one value before the output Observable will emit a value.
- *
- * ## Example
- *
- * On every click event, emit an array with the latest timer event plus the click event
- *
- * ```ts
- * import { fromEvent, interval, withLatestFrom } from 'rxjs';
- *
- * const clicks = fromEvent(document, 'click');
- * const timer = interval(1000);
- * const result = clicks.pipe(withLatestFrom(timer));
- * result.subscribe(x => console.log(x));
- * ```
- *
- * @see {@link combineLatest}
- *
- * @param inputs An input Observable to combine with the source Observable. More
- * than one input Observables may be given as argument. If the last parameter is
- * a function, it will be used as a projection function for combining values
- * together. When the function is called, it receives all values in order of the
- * Observables passed, where the first parameter is a value from the source
- * Observable. (e.g.
- * `a.pipe(withLatestFrom(b, c), map(([a1, b1, c1]) => a1 + b1 + c1))`). If this
- * is not passed, arrays will be emitted on the output Observable.
- * @return A function that returns an Observable of projected values from the
- * most recent values from each input Observable, or an array of the most
- * recent values from each input Observable.
- */
- export function withLatestFrom<T, R>(...inputs: any[]): OperatorFunction<T, R | any[]> {
- const project = popResultSelector(inputs) as ((...args: any[]) => R) | undefined;
- return operate((source, subscriber) => {
- const len = inputs.length;
- const otherValues = new Array(len);
- // An array of whether or not the other sources have emitted. Matched with them by index.
- // TODO: At somepoint, we should investigate the performance implications here, and look
- // into using a `Set()` and checking the `size` to see if we're ready.
- let hasValue = inputs.map(() => false);
- // Flipped true when we have at least one value from all other sources and
- // we are ready to start emitting values.
- let ready = false;
- // Other sources. Note that here we are not checking `subscriber.closed`,
- // this causes all inputs to be subscribed to, even if nothing can be emitted
- // from them. This is an important distinction because subscription constitutes
- // a side-effect.
- for (let i = 0; i < len; i++) {
- innerFrom(inputs[i]).subscribe(
- createOperatorSubscriber(
- subscriber,
- (value) => {
- otherValues[i] = value;
- if (!ready && !hasValue[i]) {
- // If we're not ready yet, flag to show this observable has emitted.
- hasValue[i] = true;
- // Intentionally terse code.
- // If all of our other observables have emitted, set `ready` to `true`,
- // so we know we can start emitting values, then clean up the `hasValue` array,
- // because we don't need it anymore.
- (ready = hasValue.every(identity)) && (hasValue = null!);
- }
- },
- // Completing one of the other sources has
- // no bearing on the completion of our result.
- noop
- )
- );
- }
- // Source subscription
- source.subscribe(
- createOperatorSubscriber(subscriber, (value) => {
- if (ready) {
- // We have at least one value from the other sources. Go ahead and emit.
- const values = [value, ...otherValues];
- subscriber.next(project ? project(...values) : values);
- }
- })
- );
- });
- }
|