import type { AMQPMessage } from "./amqp-message.js"
import type { AMQPProperties } from "./amqp-properties.js"
import type { ResolveBody } from "./amqp-publisher.js"
import type { AMQPSession } from "./amqp-session.js"
import type { AMQPSubscription } from "./amqp-subscription.js"
import { serializeAndEncode } from "./amqp-codec-registry.js"
import type { ParserMap, CoderMap } from "./amqp-codec-registry.js"
/**
* Callback invoked for each incoming RPC request.
* Receives a decoded {@link AMQPMessage} and returns the response body.
*/
export type RPCHandler
= (
msg: AMQPMessage
,
) => ResolveBody
| Promise>
/**
* An RPC server that consumes messages from a queue and replies to each caller.
*
* Uses the session's queue and subscribe machinery, so the consumer is
* automatically recovered after a reconnection.
*
* @example
* ```ts
* const session = await AMQPSession.connect("amqp://localhost")
* const server = await session.rpcServer("my_rpc_queue", async (msg) => {
* return `processed:${msg.bodyString()}`
* })
* // later…
* await session.stop()
* ```
*/
export class AMQPRPCServer<
P extends ParserMap = {},
C extends CoderMap = {},
KP extends keyof P & string = never,
KC extends keyof C & string = never,
> {
private readonly session: AMQPSession
private subscription: AMQPSubscription | null = null
/** @internal Use {@link AMQPSession.rpcServer} instead. */
constructor(session: AMQPSession
) {
this.session = session
}
/** @internal Called by {@link AMQPSession.rpcServer}. */
async start(queue: string, handler: RPCHandler
, prefetch = 1): Promise {
if (this.subscription) throw new Error("RPC server already started")
const q = await this.session.queue(queue)
this.subscription = await q.subscribe({ prefetch, noAck: false, requeueOnNack: false }, async (msg) => {
const { replyTo, correlationId } = msg.properties
if (!replyTo) {
await msg.nack(false)
return
}
const result = await handler(msg)
const replyProps: AMQPProperties = {}
if (correlationId !== undefined) replyProps.correlationId = correlationId
const defaults: { contentType?: string; contentEncoding?: string } = {}
if (this.session.defaultContentType) defaults.contentType = this.session.defaultContentType
if (this.session.defaultContentEncoding) defaults.contentEncoding = this.session.defaultContentEncoding
const encoded = await serializeAndEncode(
this.session.parsers ?? {},
this.session.coders ?? {},
result,
replyProps,
defaults,
)
await msg.channel.basicPublish("", replyTo, encoded.body, encoded.properties)
})
return this
}
/**
* Cancel the consumer. The underlying queue remains declared.
*/
async close(): Promise {
const sub = this.subscription
if (!sub) return
this.subscription = null
await sub.cancel()
}
}