import type { AMQPChannel } from "./amqp-channel.js"
import type { AMQPMessage } from "./amqp-message.js"
import type { AMQPProperties } from "./amqp-properties.js"
import type { AMQPSession } from "./amqp-session.js"
import type { ResolveBody } from "./amqp-publisher.js"
import { serializeAndEncode, decodeMessage } from "./amqp-codec-registry.js"
import type { ParserMap, CoderMap } from "./amqp-codec-registry.js"
const DIRECT_REPLY_TO = "amq.rabbitmq.reply-to"
/**
* Reusable RPC client using the direct reply-to feature.
*
* @example
* ```ts
* const session = await AMQPSession.connect("amqp://localhost")
* const rpc = await session.rpcClient() // tracked for reconnect recovery
* const reply = await rpc.call("my_queue", "request body")
* console.log(reply.bodyString())
* await rpc.close()
* ```
*/
export class AMQPRPCClient<
P extends ParserMap = {},
C extends CoderMap = {},
KP extends keyof P & string = never,
KC extends keyof C & string = never,
> {
private readonly session: AMQPSession
private ch: AMQPChannel | null = null
private correlationId = 0
private readonly pending = new Map<
string,
{
resolve: (msg: AMQPMessage
) => void
reject: (err: Error) => void
timer: ReturnType | undefined
}
>()
private closed = false
/** @internal Use {@link AMQPSession.rpcClient} instead. */
constructor(session: AMQPSession) {
this.session = session
}
/** @internal Called by {@link AMQPSession.rpcClient}. */
async start(): Promise {
if (this.closed) throw new Error("RPC client is closed")
if (this.ch && !this.ch.closed) return this
const ch = await this.session.openChannel()
try {
// Direct reply-to is scoped per-channel by RabbitMQ, so only replies
// for this client arrive here. Messages with an unknown correlationId
// (e.g. late replies after a timeout) are intentionally dropped — with
// noAck: true they are acknowledged on delivery and cannot be requeued.
const parsers = this.session.parsers
const coders = this.session.coders
await ch.basicConsume(DIRECT_REPLY_TO, { noAck: true }, async (msg) => {
const id = msg.properties.correlationId
if (id === undefined) return
const entry = this.pending.get(id)
if (!entry) return
this.pending.delete(id)
if (entry.timer) clearTimeout(entry.timer)
try {
if (parsers || coders) await decodeMessage(msg, parsers ?? {}, coders ?? {})
entry.resolve(msg as AMQPMessage)
} catch (err) {
entry.reject(err instanceof Error ? err : new Error(String(err)))
}
})
this.ch = ch
return this
} catch (err) {
ch.close().catch(() => {})
throw err
}
}
/**
* Perform an RPC call: publish a message and wait for the response.
*
* @param queue - The queue name (routing key) of the RPC server
* @param body - The request body
* @param options - Optional properties and timeout
* @param options.timeout - Timeout in milliseconds. Rejects with an error if
* no response is received within this time.
* @returns The reply {@link AMQPMessage}
*/
async call(
queue: string,
body: ResolveBody
,
{ timeout, ...properties }: AMQPProperties & { timeout?: number } = {},
): Promise> {
if (this.closed) throw new Error("RPC client is closed")
if (!this.ch || this.ch.closed) throw new Error("RPC client not started, call start() first")
const ch = this.ch
const correlationId = (++this.correlationId).toString(36)
const defaults: { contentType?: string; contentEncoding?: string } = {}
if (this.session.defaultContentType) defaults.contentType = this.session.defaultContentType
if (this.session.defaultContentEncoding) defaults.contentEncoding = this.session.defaultContentEncoding
const encoded = await serializeAndEncode(
this.session.parsers ?? {},
this.session.coders ?? {},
body,
properties,
defaults,
)
return new Promise>((resolve, reject) => {
let timer: ReturnType | undefined
if (timeout !== undefined && timeout > 0) {
timer = setTimeout(() => {
this.pending.delete(correlationId)
reject(new Error(`No response received in ${timeout}ms`))
}, timeout)
}
this.pending.set(correlationId, { resolve, reject, timer })
ch.basicPublish("", queue, encoded.body, {
...encoded.properties,
replyTo: DIRECT_REPLY_TO,
correlationId,
}).catch((err) => {
const entry = this.pending.get(correlationId)
if (!entry) return
this.pending.delete(correlationId)
if (entry.timer) clearTimeout(entry.timer)
entry.reject(err)
})
})
}
/**
* Re-establish the channel and consumer after a reconnection.
* All pending calls are rejected since the old channel is gone.
* @internal Called by the session's reconnect loop.
*/
async recover(): Promise {
if (this.closed) return
this.rejectAllPending(new Error("RPC client reconnecting"))
this.ch = null
await this.start()
}
/**
* Close the dedicated channel, reject any pending calls, and remove
* this client from the session's reconnect recovery.
*/
async close(): Promise {
if (this.closed) return
this.closed = true
this.session.untrackRPCClient(this)
this.rejectAllPending(new Error("RPC client closed"))
const ch = this.ch
this.ch = null
if (ch && !ch.closed) {
await ch.close()
}
}
private rejectAllPending(err: Error): void {
for (const [id, entry] of this.pending) {
if (entry.timer) clearTimeout(entry.timer)
entry.reject(err)
this.pending.delete(id)
}
}
}