/**
 * @license Use of this source code is governed by an MIT-style license that
 * can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
 */

import {
  concat,
  MonoTypeOperatorFunction,
  Observable,
  of,
  SchedulerLike,
} from "rxjs";
import {
  concatMap,
  delay,
  distinctUntilChanged,
  filter,
  publish,
  startWith,
  switchMap,
  take,
  takeUntil,
} from "rxjs/operators";

/**
 * Throttles the source for a period of time after each `notifier` emission.
 *
 * A boolean "signal" stream is derived from `notifier`: it starts as `false`,
 * becomes `true` when the notifier emits, and returns to `false` once
 * `duration` has elapsed without a further notifier emission (a new emission
 * restarts the timer because of `switchMap`).
 *
 * - While the signal is `false`, source values are forwarded as they arrive.
 * - While the signal is `true`, at most one source value is forwarded; the
 *   source is not listened to again until the signal goes back to `false`.
 *
 * @param notifier Observable whose emissions start (or extend) the throttled
 * period. Only the fact that it emitted is used; the values are ignored.
 * @param duration Length of the throttled period, passed straight to `delay`.
 * @param scheduler Optional scheduler forwarded to `delay`.
 * @returns An operator that emits values of the same type as its source.
 */
export function throttleAfter<T>(
  notifier: Observable<unknown>,
  duration: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    source.pipe(
      // Share the source so that the successive inner subscriptions below all
      // observe a single underlying subscription.
      publish((sharedSource) =>
        notifier.pipe(
          // Each notification emits `true` immediately and `false` after
          // `duration`; a newer notification cancels the pending `false`.
          switchMap(() =>
            concat(of(true), of(false).pipe(delay(duration, scheduler)))
          ),
          startWith(false),
          // Repeated notifications inside one period must not produce a
          // second `true`.
          distinctUntilChanged(),
          // Share the signal too: it both drives `concatMap` and terminates
          // the inner observables.
          publish((sharedSignal: Observable<boolean>) =>
            sharedSignal.pipe(
              concatMap((signalled: boolean) =>
                signalled
                  ? // Throttled: let through a single value, or nothing if
                    // the period ends first.
                    sharedSource.pipe(
                      take(1),
                      takeUntil(
                        sharedSignal.pipe(filter((next) => !next))
                      )
                    )
                  : // Not throttled: pass values until the next period starts.
                    sharedSource.pipe(
                      takeUntil(sharedSignal.pipe(filter((next) => next)))
                    )
              )
            )
          )
        )
      )
    );
}