///
// NOTE(review): the bare triple-slash line above is kept verbatim from the original. It looks like a
// truncated triple-slash directive (possibly the one supplying typings for `import.meta.env`) —
// confirm against the repository and restore it if so.
import { assert, expect, test, beforeEach, vi } from "vitest"
import { AMQPWebSocketClient } from "../src/amqp-websocket-client.js"
import { AMQPMessage } from "../src/amqp-message.js"
import type { AMQPError } from "../src/amqp-error.js"

// WebSocket endpoint used by the tests; overridable through the VITE_WS_URL environment variable.
const WS_URL = import.meta.env.VITE_WS_URL || "ws://127.0.0.1:15670/ws/amqp"

/**
 * Creates a client pointed at {@link WS_URL}.
 *
 * @param init - Optional tuning parameters. When given, the client is built from an options object
 *   (`{ url, ...init }`); when omitted, it is built from the bare URL string.
 * @returns A new, not yet connected client.
 */
function getNewClient(init?: { frameMax?: number; heartbeat?: number }): AMQPWebSocketClient {
  return init ? new AMQPWebSocketClient({ url: WS_URL, ...init }) : new AMQPWebSocketClient(WS_URL)
}

// Fail any test that finishes without having run at least one assertion.
beforeEach(() => {
  expect.hasAssertions()
})

test("can parse the url correctly", () => {
  const username = "user_name"
  const password = "passwd"
  const hostname = "127.0.0.1"
  const port = 15670
  const vhost = "my_host"
  const name = "test"
  const client = new AMQPWebSocketClient({
    url: `ws://${hostname}:${port}/ws/amqp`,
    username,
    password,
    vhost,
    name,
  })
  expect(client.username).toEqual(username)
  expect(client.password).toEqual(password)
  expect(client.vhost).toEqual(vhost)
  expect(client.name).toEqual(name)
})

test("can open a connection and a channel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  expect(ch.connection.channels.length).toEqual(2) // 2 because channel 0 is counted
})

test("can publish and consume", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "hello world")
  // Resolve with the first delivered message; a failing basicConsume rejects the promise.
  const received = await new Promise<AMQPMessage>((resolve, reject) => {
    ch.basicConsume(q.name, { noAck: false }, (msg) => {
      msg.ack()
      resolve(msg)
    }).catch(reject)
  })
  expect(received.bodyString()).toEqual("hello world")
})

test("can nack a message", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "hello world")
  // Resolve with the first delivered message; a failing basicConsume rejects the promise.
  const received = await new Promise<AMQPMessage>((resolve, reject) => {
    ch.basicConsume(q.name, { noAck: false }, (msg) => {
      msg.nack()
      resolve(msg)
    }).catch(reject)
  })
  expect(received.bodyString()).toEqual("hello world")
})

test("can reject a message", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "hello world")
  // Resolve with the first delivered message; a failing basicConsume rejects the promise.
  const received = await new Promise<AMQPMessage>((resolve, reject) => {
    ch.basicConsume(q.name, { noAck: false }, (msg) => {
      msg.reject()
      resolve(msg)
    }).catch(reject)
  })
  expect(received.bodyString()).toEqual("hello world")
})

test("can unbind a queue from exchange", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.queueBind(q.name, "amq.topic", "asd")
  await expect(ch.queueUnbind(q.name, "amq.topic", "asd")).resolves.toBeUndefined()
})

test("can unsubscribe from a queue", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, {}, () => {})
  await expect(ch.basicCancel(consumer.tag)).resolves.toBeDefined()
})

test("can delete a queue", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await expect(ch.queueDelete(q.name)).resolves.toBeDefined()
})

test("can get message from a queue", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "message")
  const msg = await ch.basicGet(q.name, { noAck: true })
  // Optional chaining instead of a cast: a missing message fails the comparison below.
  expect(msg?.bodyString()).toEqual("message")
})

test("will throw an error", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.queueDeclare("amq.foobar")).rejects.toThrow(/ACCESS_REFUSED/)
})

test("will throw an error after consumer timeout", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})
  await expect(consumer.wait(1)).rejects.toThrow()
})

test("will throw an error if consumer is closed", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})
  consumer.setClosed(new Error("testing"))
  // Asserting on the rejection directly makes a non-rejecting wait() fail with a clear message.
  await expect(consumer.wait(1)).rejects.toThrow(/^testing$/)
})

test("can cancel a consumer", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})
  const channel = await consumer.cancel()
  expect(channel.consumers.size).toEqual(0)
})

test("will clear consumer wait timeout on cancel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})
  const wait = consumer.wait(5000)
  // The cancel is not awaited here; the assertion below awaits the pending wait() instead.
  consumer.cancel()
  await expect(wait).resolves.toBeUndefined()
})

test("can close a channel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await ch.close()
  // A second close on the same channel must be rejected.
  await expect(ch.close()).rejects.toThrow("Channel is closed")
})

test("connection error raises everywhere", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await conn.close()
  await expect(ch.close()).rejects.toThrow(/Channel is closed/)
})
test("consumer stops wait on cancel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, {}, () => {})
  await ch.basicPublish("", q.name, "foobar")
  await consumer.cancel()
  await expect(consumer.wait()).resolves.toBeUndefined()
})

test("consumer stops wait on channel error", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, {}, () => {})
  // acking invalid delivery tag should close channel
  setTimeout(() => ch.basicAck(99999), 1)
  await expect(consumer.wait()).rejects.toThrow()
})

test("connection error raises on publish", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await conn.close()
  await expect(ch.basicPublish("", q.name, "foobar")).rejects.toThrow()
})

test("closed socket closes client", async () => {
  const amqp = getNewClient()
  await amqp.connect()
  // Bracket access reaches the client's `socket` member, which is not part of its public typing.
  const socket = amqp["socket"]
  assert(socket, "Socket must be created")
  const closed = new Promise((resolve) => socket.addEventListener("close", resolve))
  socket.close()
  await closed
  expect(amqp.closed).toBe(true)
})

test("connection loss closes channels and consumers", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})

  // Start waiting on the consumer before the socket goes away so its rejection can be observed
  const originalConsumerWait = consumer.wait()

  const socket = amqp["socket"]
  assert(socket, "Socket must be created")

  // Simulate unclean connection loss by closing the socket
  const closed = new Promise((resolve) => socket.addEventListener("close", resolve))
  socket.close()
  await closed

  // Check that connection and channel are marked as closed
  expect(amqp.closed).toBe(true)
  expect(ch.closed).toBe(true)

  // Consumer wait should reject with an error
  await expect(originalConsumerWait).rejects.toThrow()

  // Verify that operations on closed objects throw errors
  await expect(ch.queueDeclare("")).rejects.toThrow(/closed/)
  await expect(ch.basicPublish("", q.name, "test")).rejects.toThrow()
})

test("connection loss triggers onerror callback", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()

  // Collect errors in an array so the assertions below need neither a cast nor a conditional.
  const errors: AMQPError[] = []
  conn.onerror = vi.fn((err: AMQPError) => {
    errors.push(err)
  })

  const socket = amqp["socket"]
  assert(socket, "Socket must be created")

  // Simulate unclean connection loss
  const closed = new Promise((resolve) => socket.addEventListener("close", resolve))
  socket.close()
  await closed

  // Check that error callback was called
  expect(conn.onerror).toHaveBeenCalled()
  const lastError = errors[errors.length - 1]
  expect(lastError).toBeTruthy()
  expect(lastError?.message).toMatch(/connection not cleanly closed/)
  expect(amqp.closed).toBe(true)
})

test("wait for publish confirms", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()

  // publishes without confirm should return 0
  expect(await ch.basicPublish("amq.fanout", "rk", "body")).toEqual(0)
  expect(await ch.basicPublish("amq.fanout", "rk", "body")).toEqual(0)

  // publishes with confirm should return the delivery tag id
  await ch.confirmSelect()
  expect(await ch.basicPublish("amq.fanout", "rk", "body")).toEqual(1)
  expect(await ch.basicPublish("amq.fanout", "rk", "body")).toEqual(2)

  // can wait for multiple tags
  const tags = await Promise.all([
    ch.basicPublish("amq.fanout", "rk", "body"),
    ch.basicPublish("amq.fanout", "rk", "body"),
  ])
  expect(tags).toEqual([3, 4])
})

test("can handle returned messages", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  // Typed promise so the returned message needs no cast.
  const returned = new Promise<AMQPMessage>((resolve) => (ch.onReturn = resolve))
  await ch.basicPublish("", "not-a-queue", "body", {}, true)
  const msg = await returned
  expect(msg.replyCode).toEqual(312)
  expect(msg.routingKey).toEqual("not-a-queue")
})

test("can handle nacks on confirm channel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("", {}, { "x-overflow": "reject-publish", "x-max-length": 0 })
  await ch.confirmSelect()
  await expect(ch.basicPublish("", q.name, "body")).rejects.toThrow("Message rejected")
})

test("throws on unknown exchange type", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const name = "test" + Math.random()
  const unknownType = "unknowntype" + Math.random().toString(36).slice(2)
  // LavinMQ < 2.8.0 reports "invalid exchange type"; newer LavinMQ and RabbitMQ report "unknown exchange type"
  await expect(ch.exchangeDeclare(name, unknownType)).rejects.toThrow(/(unknown|invalid) exchange type/)
})

test("can declare an exchange", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const name = "test" + Math.random()
  await ch.confirmSelect()
  await ch.exchangeDeclare(name, "fanout")
  await expect(ch.basicPublish(name, "rk", "body")).resolves.toBeDefined()
  await ch.exchangeDelete(name)
  await expect(ch.basicPublish(name, "rk", "body")).rejects.toThrow(/NOT_FOUND/)
})

test("exchange to exchange bind/unbind", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const name1 = "test1" + Math.random()
  const name2 = "test2" + Math.random()
  await ch.exchangeDeclare(name1, "fanout", { autoDelete: false })
  await ch.exchangeDeclare(name2, "fanout", { autoDelete: true })
  await ch.exchangeBind(name2, name1)
  const q = await ch.queueDeclare("")
  await ch.queueBind(q.name, name2, "")
  await ch.confirmSelect()

  // While the exchanges are bound, a publish to name1 is retrievable from the queue.
  await ch.basicPublish(name1, "", "")
  const msg1 = await ch.basicGet(q.name)
  expect(msg1?.exchange).toEqual(name1)

  // After unbinding, a publish to name1 leaves the queue empty.
  await ch.exchangeUnbind(name2, name1)
  await ch.basicPublish(name1, "", "")
  const msg2 = await ch.basicGet(q.name)
  expect(msg2).toBeNull()
  await ch.exchangeDelete(name1)
})

// not implemented on servers
test.skip("can change flow state of channel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  let flow = await ch.basicFlow(false)
  expect(flow).toEqual(false)
  flow = await ch.basicFlow(true)
  expect(flow).toEqual(true)
})

test("basic get", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const empty = await ch.basicGet(q.name)
  expect(empty).toBeNull()
  await ch.basicPublish("", q.name, "foobar")
  const msg = await ch.basicGet(q.name)
  expect(msg?.bodyToString()).toEqual("foobar")
})

test("transactions", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.txSelect()

  // Nothing is retrievable before the commit.
  await ch.basicPublish("", q.name, "foobar")
  const msg1 = await ch.basicGet(q.name)
  expect(msg1).toBeNull()

  await ch.txCommit()
  const msg2 = await ch.basicGet(q.name)
  expect(msg2, "missing message").toBeTruthy()
  expect(msg2?.bodyToString()).toEqual("foobar")

  // A publish followed by a rollback leaves the queue empty.
  await ch.basicPublish("", q.name, "foobar")
  await ch.txRollback()
  const msg3 = await ch.basicGet(q.name)
  expect(msg3).toBeNull()
})

test("can publish and consume msgs with large headers", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "a".repeat(4000), {
    headers: {
      long: new Uint8Array(new TextEncoder().encode("a".repeat(4000))),
    },
  })
  await ch.basicPublish("", q.name, "a".repeat(8000), { headers: { long: "a".repeat(4000) } })
  await ch.basicPublish("", q.name, "a".repeat(8000), { headers: { long: Array(100).fill("a") } })
  // Cancel once the delivery with tag 3 arrives, which lets wait() below resolve.
  const consumer = await ch.basicConsume(q.name, { noAck: false }, async (msg) => {
    if (msg.deliveryTag === 3) await msg.cancelConsumer()
  })
  await expect(consumer.wait()).resolves.toBeUndefined()
})

test("will throw when headers are too long", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await expect(ch.basicPublish("", q.name, "a".repeat(8000), { headers: { long: "a".repeat(9000) } })).rejects.toThrow()
})

test("can purge a queue", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "a")
  const purged = await ch.queuePurge(q.name)
  expect(purged.messageCount).toEqual(1)
  const msg = await ch.basicGet(q.name)
  expect(msg).toBeNull()
})

test("can publish all type of properties", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const headers = {
    a: 2,
    b: true,
    c: "c",
    d: 1.5,
    e: null,
    f: new Date(1000),
    g: { a: 1 },
    i: 2 ** 32 + 1,
    j: 2.5 ** 33,
  }
  const properties = {
    contentType: "application/json",
    contentEncoding: "gzip",
    headers,
    deliveryMode: 2,
    priority: 1,
    correlationId: "corr",
    replyTo: "me",
    expiration: "10000",
    messageId: "msgid",
    appId: "appid",
    userId: "guest",
    type: "type",
    timestamp: new Date(Math.round(Date.now() / 1000) * 1000), // AMQP timestamps only have second resolution
  }
  await ch.basicPublish("", q.name, "", properties)
  const msg = await ch.basicGet(q.name)
  expect(msg?.properties).toMatchObject(properties)
})

test("cannot publish too long strings", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  // The call is wrapped in a function because it is expected to throw synchronously, not reject.
  expect(() => ch.queueDeclare("a".repeat(256))).toThrow(/Short string too long/)
})

test("can set prefetch", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.prefetch(1)).resolves.toBeUndefined()
})

test("can open a specific channel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel(2)
  expect(ch.id).toEqual(2)
})

test("can open a specific channel twice", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel(2)
  expect(ch.id).toEqual(2)
  // Requesting the same id again must hand back the very same channel object.
  const ch2 = await conn.channel(2)
  expect(ch2 === ch).toEqual(true)
})

test("can publish messages spanning multiple frames", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await ch.confirmSelect()
  const q = await ch.queueDeclare("")
  const sizes = [4087, 4088, 4089, 4096, 5000, 10000]
  // One assertion per body size.
  expect.assertions(sizes.length)
  for (const n of sizes) {
    await ch.basicPublish("", q.name, new Uint8Array(n))
    const msg = await ch.basicGet(q.name)
    expect(msg?.bodySize).toEqual(n)
  }
})

test("set basic flow on channel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.basicFlow(true)).resolves.toBeDefined()
})

test("confirming unknown deliveryTag", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  expect(() => ch.publishConfirmed(1, false, false)).not.toThrow()
})

// ch.deliver does enqueue a microtask, rendering the ch.deliver method untestable.
test.skip("delivering a message when no consumer exists raises", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const msg = new AMQPMessage(ch)
  msg.consumerTag = "abc"
  expect(() => ch.deliver(msg)).toThrow()
})

test("can publish null", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.basicPublish("amq.topic", "", null)).resolves.toBeDefined()
})

test("can publish ArrayBuffer", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.basicPublish("amq.topic", "", new ArrayBuffer(2))).resolves.toBeDefined()
})

test("can publish Uint8array", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.basicPublish("amq.topic", "", new Uint8Array(2))).resolves.toBeDefined()
})

test("can do basicRecover", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(ch.basicRecover(true)).resolves.toBeUndefined()
})

test("can set frameMax", async () => {
  const amqp = getNewClient({ frameMax: 16 * 1024 })
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await ch.confirmSelect()
  const q = await ch.queueDeclare("")
  const headerValue = "a".repeat(conn.frameMax - 100) // leave some space for other parts of the frame
  await ch.basicPublish("", q.name, "", { headers: { a: headerValue } })
  const msg = await ch.basicGet(q.name)
  // Flat assertions: each missing level (message, properties, headers) fails on its own line.
  expect(msg).toBeTruthy()
  expect(msg?.properties).toBeTruthy()
  const headers = msg?.properties?.headers
  expect(headers).toBeTruthy()
  expect(headers?.["a"]).toHaveLength(headerValue.length)
})

test("can't set too small frameMax", () => {
  expect(() => getNewClient({ frameMax: 16 })).toThrow()
})

test(
  "can handle frames split over socket reads",
  async () => {
    const amqp = getNewClient({ frameMax: 8 * 1024 })
    const conn = await amqp.connect()
    const ch = await conn.channel()
    const q = await ch.queueDeclare("")
    const body = "a".repeat(5)
    const msgs = 100000
    for (let n = 0; n < msgs; n++) {
      await ch.basicPublish("", q.name, body)
    }
    // Count deliveries and cancel the consumer once every published message has been seen.
    let i = 0
    const consumer = await ch.basicConsume(q.name, { noAck: true }, () => {
      if (++i === msgs) consumer.cancel()
    })
    await consumer.wait(20_000)
    expect(i).toEqual(msgs)
  },
  60_000,
)

test("have to connect socket before opening channels", async () => {
  const amqp = getNewClient()
  await expect(amqp.channel()).rejects.toThrow(/Connection closed/)
})

test(
  "will raise if socket is closed on send",
  async () => {
    const amqp = getNewClient()
    const conn = await amqp.connect()
    await amqp.close()
    await expect(conn.channel()).rejects.toThrow()
  },
  10_000,
)

test(
  "can handle cancel from server",
  async () => {
    const amqp = getNewClient()
    const conn = await amqp.connect()
    const ch = await conn.channel()
    const q = await ch.queueDeclare("")
    const consumer = await ch.basicConsume(q.name, {}, () => {})
    // Deleting the consumed queue is what the test uses to provoke the cancel asserted below.
    await ch.queueDelete(q.name)
    await expect(consumer.wait()).rejects.toThrow(/Consumer cancelled by the server/)
  },
  10_000,
)

test(
  "can handle heartbeats",
  async () => {
    const amqp = getNewClient({ heartbeat: 1 })
    const conn = await amqp.connect()
    // Stay idle for 2000 ms (heartbeat is configured to 1), then check the connection is still open.
    await new Promise((resolve) => setTimeout(resolve, 2000))
    expect(conn.closed).toEqual(false)
  },
  10_000,
)

test("has an onerror callback", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  let errMessage: string | null = null
  ch.onerror = vi.fn((reason) => (errMessage = reason))
  const unknownType = "unknowntype" + Math.random().toString(36).slice(2)
  await expect(ch.exchangeDeclare("name" + Math.random(), unknownType)).rejects.toThrow()
  expect(ch.onerror).toBeCalled()
  // LavinMQ < 2.8.0 reports "invalid exchange type"; newer LavinMQ and RabbitMQ report "unknown exchange type"
  expect(errMessage).toMatch(/(unknown|invalid) exchange type/)
})

test("onerror is not called when conn is closed by client", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  // Rejects if onerror fires; otherwise resolves (with undefined) after 10 ms.
  const callbackPromise = new Promise((done, reject) => {
    conn.onerror = vi.fn((err: AMQPError) =>
      reject(new Error(`onerror should not be called when gracefully closed. Error was: ${err.message}`)),
    )
    setTimeout(done, 10)
  })
  await conn.close()
  await expect(callbackPromise).resolves.toBeUndefined()
  expect(conn.onerror).not.toHaveBeenCalled()
})

test("will throw on too large headers", async () => {
  const amqp = getNewClient({ frameMax: 8192 })
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await expect(
    ch.basicPublish("", "x".repeat(255), null, {
      headers: { a: Array(4000).fill(1) },
    }),
  ).rejects.toThrow(RangeError)
  await expect(ch.basicPublish("", "", null, { headers: { a: "x".repeat(9000) } })).rejects.toThrow(RangeError)
})

test("will split body over multiple frames", async () => {
  const amqp = getNewClient({ frameMax: 8192 })
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.confirmSelect()
  await ch.basicPublish("", q.name, "x".repeat(5000))
  const msg = await ch.basicGet(q.name)
  // Labelled assertions keep the "no msg" / "no body" diagnostics without nested if/else chains.
  expect(msg, "no msg").toBeTruthy()
  expect(msg?._rawBytes, "no body").toBeTruthy()
  expect(msg?._rawBytes?.length).toEqual(5000)
})

test(
  "can republish in consume block without race condition",
  async () => {
    const amqp = getNewClient()
    const conn = await amqp.connect()
    const ch = await conn.channel()
    await ch.prefetch(0)
    const q = await ch.queueDeclare("")
    const queueName = q.name
    await ch.confirmSelect()
    await ch.basicPublish("", queueName, "x".repeat(500))
    // Each delivery below tag 10000 is republished twice and acked concurrently;
    // the delivery with tag 10000 cancels the consumer, which ends the wait() below.
    const consumer = await ch.basicConsume(queueName, { noAck: false }, async (msg) => {
      if (msg.deliveryTag < 10000) {
        await Promise.all([
          ch.basicPublish("", queueName, msg._rawBytes),
          ch.basicPublish("", queueName, msg._rawBytes),
          msg.ack(),
        ])
      } else if (msg.deliveryTag === 10000) {
        await consumer.cancel()
      }
    })
    await expect(consumer.wait()).resolves.toBeUndefined()
    await expect(conn.close()).resolves.toBeUndefined()
  },
  20_000,
)

test(
  "raises when channelMax is reached",
  async () => {
    const amqp = getNewClient()
    const conn = await amqp.connect()
    for (let i = 0; i < conn.channelMax; i++) {
      await conn.channel()
    }
    await expect(conn.channel()).rejects.toThrow("Max number of channels reached")
    // make sure other channels still work
    const ch1 = await conn.channel(1)
    await expect(ch1.basicQos(10)).resolves.toBeUndefined()
  },
  20_000,
)

test("should fail to connect to an AMQP port", async () => {
  // Port 5672 instead of the WebSocket port in the default URL; connect() is expected to reject.
  const amqp = new AMQPWebSocketClient("ws://127.0.0.1:5672/ws/amqp")
  await expect(amqp.connect()).rejects.toThrow()
})