/** * @license Use of this source code is governed by an MIT-style license that * can be found in the LICENSE file at https://github.com/cartant/rxjs-etc */ import { ConnectableObservable, Observable, OperatorFunction, Subject, Subscription, } from "rxjs"; import { publish } from "rxjs/operators"; export function prioritize( selector: ( prioritized: Observable, deprioritized: Observable, ...rest: Observable[] ) => Observable ): OperatorFunction { return (source: Observable) => new Observable((observer) => { const published = publish()(source) as ConnectableObservable; const subjects: Subject[] = []; const subscription = new Subscription(); const length = Math.max(selector.length, 2); for (let i = 0; i < length; ++i) { const subject = new Subject(); subjects.push(subject); subscription.add(published.subscribe(subject)); } const [first, second, ...rest] = subjects; subscription.add(selector(first, second, ...rest).subscribe(observer)); subscription.add(published.connect()); return subscription; }); }