/** * @license Use of this source code is governed by an MIT-style license that * can be found in the LICENSE file at https://github.com/cartant/rxjs-etc */ /*tslint:disable:rxjs-no-nested-subscribe*/ import { Observable, OperatorFunction, Subscription } from "rxjs"; interface Source { completed: boolean; observable: Observable; subscription?: Subscription; } export function mergeHigherOrderArray(): OperatorFunction< Observable[], T > { return (higherOrder) => new Observable((observer) => { let lasts: Source[] = []; let nexts: Source[] = []; let higherOrderCompleted = false; const higherOrderSubscription = new Subscription(); higherOrderSubscription.add( higherOrder.subscribe( (observables) => { const subscribes: (() => void)[] = []; nexts = observables.map((observable) => { const index = lasts.findIndex( (last) => last.observable === observable ); if (index !== -1) { const next = lasts[index]; lasts.splice(index, 1); return next; } const next: Source = { completed: false, observable }; subscribes.push(() => { if (higherOrderSubscription.closed) { return; } next.subscription = next.observable.subscribe( (value) => observer.next(value), (error) => observer.error(error), () => { next.completed = true; if ( higherOrderCompleted && nexts.every(({ completed }) => completed) ) { observer.complete(); } } ); higherOrderSubscription.add(next.subscription); }); return next; }); lasts.forEach(({ subscription }) => { if (subscription) { subscription.unsubscribe(); } }); lasts = nexts; subscribes.forEach((subscribe) => subscribe()); }, (error) => observer.error(error), () => { if (lasts.every(({ completed }) => completed)) { observer.complete(); } higherOrderCompleted = true; } ) ); return higherOrderSubscription; }); }