| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import { Observable } from '../Observable';
- import { Subject } from '../Subject';
- import { Subscription } from '../Subscription';
- import { ObservableInput, OperatorFunction } from '../types';
- import { operate } from '../util/lift';
- import { innerFrom } from '../observable/innerFrom';
- import { createOperatorSubscriber } from './OperatorSubscriber';
- import { noop } from '../util/noop';
- import { arrRemove } from '../util/arrRemove';
- /**
- * Branch out the source Observable values as a nested Observable starting from
- * an emission from `openings` and ending when the output of `closingSelector`
- * emits.
- *
- * <span class="informal">It's like {@link bufferToggle}, but emits a nested
- * Observable instead of an array.</span>
- *
- * 
- *
- * Returns an Observable that emits windows of items it collects from the source
- * Observable. The output Observable emits windows that contain those items
- * emitted by the source Observable between the time when the `openings`
- * Observable emits an item and when the Observable returned by
- * `closingSelector` emits an item.
- *
- * ## Example
- *
- * Every other second, emit the click events from the next 500ms
- *
- * ```ts
- * import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';
- *
- * const clicks = fromEvent(document, 'click');
- * const openings = interval(1000);
- * const result = clicks.pipe(
- * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
- * mergeAll()
- * );
- * result.subscribe(x => console.log(x));
- * ```
- *
- * @see {@link window}
- * @see {@link windowCount}
- * @see {@link windowTime}
- * @see {@link windowWhen}
- * @see {@link bufferToggle}
- *
- * @param openings An observable of notifications to start new windows.
- * @param closingSelector A function that takes the value emitted by the
- * `openings` observable and returns an Observable, which, when it emits a next
- * notification, signals that the associated window should complete.
- * @return A function that returns an Observable of windows, which in turn are
- * Observables.
- */
- export function windowToggle<T, O>(
- openings: ObservableInput<O>,
- closingSelector: (openValue: O) => ObservableInput<any>
- ): OperatorFunction<T, Observable<T>> {
- return operate((source, subscriber) => {
- const windows: Subject<T>[] = [];
- const handleError = (err: any) => {
- while (0 < windows.length) {
- windows.shift()!.error(err);
- }
- subscriber.error(err);
- };
- innerFrom(openings).subscribe(
- createOperatorSubscriber(
- subscriber,
- (openValue) => {
- const window = new Subject<T>();
- windows.push(window);
- const closingSubscription = new Subscription();
- const closeWindow = () => {
- arrRemove(windows, window);
- window.complete();
- closingSubscription.unsubscribe();
- };
- let closingNotifier: Observable<any>;
- try {
- closingNotifier = innerFrom(closingSelector(openValue));
- } catch (err) {
- handleError(err);
- return;
- }
- subscriber.next(window.asObservable());
- closingSubscription.add(closingNotifier.subscribe(createOperatorSubscriber(subscriber, closeWindow, noop, handleError)));
- },
- noop
- )
- );
- // Subscribe to the source to get things started.
- source.subscribe(
- createOperatorSubscriber(
- subscriber,
- (value: T) => {
- // Copy the windows array before we emit to
- // make sure we don't have issues with reentrant code.
- const windowsCopy = windows.slice();
- for (const window of windowsCopy) {
- window.next(value);
- }
- },
- () => {
- // Complete all of our windows before we complete.
- while (0 < windows.length) {
- windows.shift()!.complete();
- }
- subscriber.complete();
- },
- handleError,
- () => {
- // Add this finalization so that all window subjects are
- // disposed of. This way, if a user tries to subscribe
- // to a window *after* the outer subscription has been unsubscribed,
- // they will get an error, instead of waiting forever to
- // see if a value arrives.
- while (0 < windows.length) {
- windows.shift()!.unsubscribe();
- }
- }
- )
- );
- });
- }
|