/* * Copyright 2020-2021 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import { isRequestError } from "./msg.ts"; import { createInbox, ErrorCode, Msg, NatsError, Request } from "./core.ts"; export class MuxSubscription { baseInbox!: string; reqs: Map; constructor() { this.reqs = new Map(); } size(): number { return this.reqs.size; } init(prefix?: string): string { this.baseInbox = `${createInbox(prefix)}.`; return this.baseInbox; } add(r: Request) { if (!isNaN(r.received)) { r.received = 0; } this.reqs.set(r.token, r); } get(token: string): Request | undefined { return this.reqs.get(token); } cancel(r: Request): void { this.reqs.delete(r.token); } getToken(m: Msg): string | null { const s = m.subject || ""; if (s.indexOf(this.baseInbox) === 0) { return s.substring(this.baseInbox.length); } return null; } all(): Request[] { return Array.from(this.reqs.values()); } handleError(isMuxPermissionError: boolean, err?: NatsError): boolean { if (err && err.permissionContext) { if (isMuxPermissionError) { // one or more requests queued but mux cannot process them this.all().forEach((r) => { r.resolver(err, {} as Msg); }); return true; } const ctx = err.permissionContext; if (ctx.operation === "publish") { const req = this.all().find((s) => { return s.requestSubject === ctx.subject; }); if (req) { req.resolver(err, {} as Msg); return true; } } } return false; } dispatcher() { return (err: NatsError | null, m: Msg) => { const token = this.getToken(m); if (token) { const r = this.get(token); if (r) { if (err === null && m.headers) { err = isRequestError(m); } r.resolver(err, m); } } }; } close() { const err = NatsError.errorForCode(ErrorCode.Timeout); this.reqs.forEach((req) => { req.resolver(err, {} as Msg); }); } }