///
import { expect, test, beforeEach } from "vitest"
import { AMQPSession } from "../src/amqp-session.js"
import { AMQPQueue } from "../src/amqp-queue.js"
import { AMQPExchange } from "../src/amqp-exchange.js"
import { AMQPMessage } from "../src/amqp-message.js"
const WS_URL = import.meta.env.VITE_WS_URL || "ws://127.0.0.1:15670/ws/amqp"
beforeEach(() => {
expect.hasAssertions()
})
test("AMQPSession.connect() returns a session over WebSocket", async () => {
const session = await AMQPSession.connect(WS_URL)
expect(session).toBeInstanceOf(AMQPSession)
await session.stop()
})
test("session.queue() declares a queue and returns AMQPQueue", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-sq-" + Math.random(), { durable: false, autoDelete: true })
expect(q).toBeInstanceOf(AMQPQueue)
expect(q.name).toMatch(/^test-ws-sq-/)
await session.stop()
})
test("AMQPQueue.publish() and get() round-trip", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-rtt-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("round-trip")
const msg = await q.get({ noAck: true })
expect(msg?.bodyString()).toBe("round-trip")
await session.stop()
})
test("AMQPQueue.subscribe() delivers messages via callback", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-sub-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("hello ws queue")
const received = await new Promise((resolve) => {
q.subscribe({ noAck: true }, (msg) => resolve(msg.bodyString()!))
})
expect(received).toBe("hello ws queue")
await session.stop()
})
test("AMQPQueue.subscribe() nack", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-nack-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("nack me")
const msg = await new Promise((resolve) => {
q.subscribe({ noAck: false }, (m) => {
m.nack()
resolve(m)
})
})
expect(msg.bodyString()).toBe("nack me")
await session.stop()
})
test("AMQPQueue.subscribe() async generator", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-gen-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("msg1")
await q.publish("msg2")
const received: string[] = []
const sub = await q.subscribe({ noAck: true })
for await (const msg of sub) {
received.push(msg.bodyString()!)
if (received.length >= 2) break
}
expect(received).toEqual(["msg1", "msg2"])
await session.stop()
})
test("AMQPQueue.publish({ confirm: false }) sends without waiting for confirm", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-paf-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("fire and forget", { confirm: false })
await new Promise((resolve) => setTimeout(resolve, 50))
const msg = await q.get({ noAck: true })
expect(msg?.bodyString()).toBe("fire and forget")
await session.stop()
})
test("AMQPQueue.bind() and unbind()", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-bind-" + Math.random(), { durable: false, autoDelete: true })
await expect(q.bind("amq.topic", "test.key")).resolves.toBeInstanceOf(AMQPQueue)
await expect(q.unbind("amq.topic", "test.key")).resolves.toBeInstanceOf(AMQPQueue)
await session.stop()
})
test("AMQPQueue.purge() empties the queue", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-purge-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("msg1", { confirm: false })
await q.publish("msg2", { confirm: false })
await new Promise((resolve) => setTimeout(resolve, 50))
const result = await q.purge()
expect(result.messageCount).toBe(2)
await session.stop()
})
test("AMQPQueue.delete() removes the queue", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-del-" + Math.random(), { durable: false, autoDelete: false })
await expect(q.delete()).resolves.toBeDefined()
await session.stop()
})
test("AMQPQueue.subscribe() with unconfirmed publish round-trip", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-sessub-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("via subscribe", { confirm: false })
const received = await new Promise((resolve) => {
void q.subscribe({ noAck: true }, (msg) => resolve(msg.bodyString()!))
})
expect(received).toBe("via subscribe")
await session.stop()
})
test("AMQPQueue.publish() and get() confirmed round-trip", async () => {
const session = await AMQPSession.connect(WS_URL)
const q = await session.queue("test-ws-spub-" + Math.random(), { durable: false, autoDelete: true })
await q.publish("confirmed")
const msg = await q.get({ noAck: true })
expect(msg?.bodyString()).toBe("confirmed")
await session.stop()
})
test("session.exchange() and AMQPExchange.publish() route messages", async () => {
const session = await AMQPSession.connect(WS_URL)
const xName = "test-ws-x-" + Math.random()
const q = await session.queue("test-ws-xq-" + Math.random(), { durable: false, autoDelete: true })
const x = await session.fanoutExchange(xName, { durable: false, autoDelete: true })
expect(x).toBeInstanceOf(AMQPExchange)
await q.bind(xName)
await x.publish("via exchange")
await new Promise((resolve) => setTimeout(resolve, 100))
const msg = await q.get({ noAck: true })
expect(msg?.bodyString()).toBe("via exchange")
await session.stop()
})
test("session.rpcClient() and session.rpcServer() round-trip", async () => {
const session = await AMQPSession.connect(WS_URL)
const qName = "test-ws-rpc-" + Math.random()
await session.rpcServer(qName, (msg) => {
return `reply:${msg.bodyString()}`
})
const rpc = await session.rpcClient()
const reply = await rpc.call(qName, "hello")
expect(reply.bodyString()).toEqual("reply:hello")
await session.stop()
})
test("session.rpcCall() one-shot round-trip", async () => {
const session = await AMQPSession.connect(WS_URL)
const qName = "test-ws-rpc-oneshot-" + Math.random()
await session.rpcServer(qName, (msg) => {
return `got:${msg.bodyString()}`
})
const reply = await session.rpcCall(qName, "ping")
expect(reply.bodyString()).toEqual("got:ping")
await session.stop()
})