/**
 * @license Use of this source code is governed by an MIT-style license that
 * can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
 */

import {
  concat,
  from,
  identity,
  MonoTypeOperatorFunction,
  noop,
  Observable,
  ObservableInput,
  of,
  OperatorFunction,
  Subject,
} from "rxjs";
import { expand, ignoreElements, mergeMap, tap } from "rxjs/operators";
import { OperatorSubscriber } from "../OperatorSubscriber";
import { NotificationQueue } from "./NotificationQueue";

/**
 * One step of a traversal, as produced by a {@link TraverseFactory}.
 *
 * - `values` are run through the optional `operator` and emitted to the
 *   subscriber of the observable returned by {@link traverse}.
 * - `markers` are fed back into the traversal: each marker is passed to the
 *   factory again to produce a further step.
 */
export type TraverseElement<T, M> = {
  markers: ObservableInput<M>;
  values: ObservableInput<T>;
};

/**
 * Produces the traversal step(s) for a marker.
 *
 * @param marker The marker to expand; `undefined` for the initial step.
 * @param index The value emitted by the internal notification queue for this
 * step. NOTE(review): presumably a running index of notifications - confirm
 * against `./NotificationQueue`.
 */
export type TraverseFactory<T, M> = (
  marker: M | undefined,
  index: number
) => Observable<TraverseElement<T, M>>;

export function traverse<T, M>(options: {
  concurrency?: number;
  factory: TraverseFactory<T, M>;
  notifier: Observable<unknown>;
}): Observable<T>;

export function traverse<T, M, R>(options: {
  concurrency?: number;
  factory: TraverseFactory<T, M>;
  operator: OperatorFunction<T, R>;
}): Observable<R>;

export function traverse<T, M>(options: {
  concurrency?: number;
  factory: TraverseFactory<T, M>;
}): Observable<T>;

// https://github.com/palantir/tslint/issues/3906

/**
 * Traverses a marker-driven structure (for example, a paginated or
 * tree-shaped resource), emitting the values found along the way.
 *
 * The traversal starts with an `undefined` marker. For each marker, the
 * `factory` is called and each element it emits contributes values - which are
 * emitted by the returned observable - and further markers - which are
 * expanded recursively.
 *
 * @param options.concurrency Maximum number of markers expanded concurrently;
 * it is passed to `expand` and defaults to `1`.
 * @param options.factory Creates the traversal element(s) for a marker.
 * @param options.operator Applied to each element's values before they are
 * emitted; defaults to `identity`.
 * @param options.notifier When given, it is handed to the notification queue
 * that gates each call to the factory. When omitted, an internal subject is
 * used instead and is signalled every time a marker is expanded.
 * @returns An observable of the (optionally transformed) values. It completes
 * when the expansion completes and errors when the expansion errors.
 */
export function traverse<T, M, R>({
  concurrency = 1,
  factory,
  operator = identity,
  notifier,
}: {
  concurrency?: number;
  factory: TraverseFactory<T, M>;
  operator?: OperatorFunction<T, T | R>;
  notifier?: Observable<unknown>;
}): Observable<T | R> {
  return new Observable<T | R>((subscriber) => {
    let queue: NotificationQueue;
    let queueOperator: MonoTypeOperatorFunction<M | undefined>;

    if (notifier) {
      // Caller-controlled pacing: the queue is driven solely by the notifier.
      queue = new NotificationQueue(notifier);
      queueOperator = identity;
    } else {
      // Self-driven pacing: signal the queue's source each time the pipeline
      // for a marker is composed, i.e. once per expanded marker.
      const subject = new Subject<void>();
      queue = new NotificationQueue(subject);
      queueOperator = (markers) => {
        subject.next();
        return markers;
      };
    }

    // Values bypass `expand` entirely: they are pushed into this subject via
    // `tap`, so that only markers flow through the recursive expansion.
    const destination = new Subject<T | R>();
    destination.subscribe(subscriber);

    // Tie the queue's connection to the lifetime of the subscription.
    subscriber.add(queue.connect());

    of(undefined)
      .pipe(
        expand(
          (marker: M | undefined) =>
            queue.pipe(
              mergeMap((index) =>
                factory(marker, index).pipe(
                  mergeMap(({ markers, values }) =>
                    // `concat` defers the element's markers until all of its
                    // values have been forwarded to the destination.
                    concat(
                      from(values).pipe(
                        operator,
                        tap((value) => destination.next(value)),
                        // Values must not be re-expanded as markers.
                        ignoreElements()
                      ),
                      from(markers)
                    )
                  )
                )
              ),
              queueOperator
            ),
          concurrency
        )
      )
      .subscribe(
        // Markers emitted by the expansion are discarded (`next: noop`); only
        // completion and errors are relayed to the destination.
        new OperatorSubscriber(subscriber, {
          complete: () => destination.complete(),
          error: (error) => destination.error(error),
          next: noop,
        })
      );
  });
}