import { assert, expect, test, beforeEach, vi } from "vitest" import { AMQPClient } from "../src/amqp-socket-client.js" import { AMQPWebSocketClient } from "../src/amqp-websocket-client.js" import { AMQPMessage } from "../src/amqp-message.js" import type { AMQPError } from "../src/amqp-error.js" function getNewClient(init?: { frameMax?: number; heartbeat?: number; channelMax?: number }): AMQPClient { const url = new URL("amqp://127.0.0.1") if (init?.frameMax != null) url.searchParams.append("frameMax", init.frameMax.toString()) if (init?.heartbeat != null) url.searchParams.append("heartbeat", init.heartbeat.toString()) if (init?.channelMax != null) url.searchParams.append("channelMax", init.channelMax.toString()) return new AMQPClient(url.toString()) } beforeEach(() => { expect.hasAssertions() }) test("can parse the url correctly", () => { const username = "user_name" const password = "passwd" const hostname = "127.0.0.1" const port = 5672 const vhost = "my_host" const name = "test" const client = new AMQPClient(`amqp://${username}:${password}@${hostname}:${port}/${vhost}?name=${name}`) expect(client.username).toEqual(username) expect(client.password).toEqual(password) expect(client.host).toEqual(hostname) expect(client.port).toEqual(port) expect(client.vhost).toEqual(vhost) expect(client.name).toEqual(name) }) test("can open a connection and a channel", () => { const amqp = getNewClient() return amqp .connect() .then((conn) => conn.channel()) .then((ch) => expect(ch.connection.channels.length).toEqual(2)) // 2 because channel 0 is counted }) test("can publish and consume", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "hello world") const msg = await new Promise((resolve, reject) => { ch.basicConsume(q.name, { noAck: false }, (msg) => { msg.ack() resolve(msg) }).catch(reject) }) expect(msg.bodyString()).toEqual("hello world") }) test("can nack a message", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "hello world") const msg = await new Promise((resolve, reject) => { ch.basicConsume(q.name, { noAck: false }, (msg) => { msg.nack() resolve(msg) }).catch(reject) }) expect(msg.bodyString()).toEqual("hello world") }) test("can reject a message", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "hello world") const msg = await new Promise((resolve, reject) => { ch.basicConsume(q.name, { noAck: false }, (msg) => { msg.reject() resolve(msg) }).catch(reject) }) expect(msg.bodyString()).toEqual("hello world") }) test("can unbind a queue from exchange", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.queueBind(q.name, "amq.topic", "asd") await expect(ch.queueUnbind(q.name, "amq.topic", "asd")).resolves.toBeUndefined() }) test("can cancel a consumer via basicCancel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, {}, () => {}) await expect(ch.basicCancel(consumer.tag)).resolves.toBeDefined() }) test("can subscribe using AsyncGenerator", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "hello world") const consumer = await ch.basicConsume(q.name, { noAck: true }) const generator = consumer.messages const result = await generator.next() expect(result.done).toBe(false) const value = result.value if (!value) throw new Error("Expected a message") expect(value.bodyString()).toEqual("hello world") await generator.return() }) test("can consume multiple messages with AsyncGenerator", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") // Publish 3 messages await ch.basicPublish("", q.name, "message 1") await ch.basicPublish("", q.name, "message 2") await ch.basicPublish("", q.name, "message 3") const consumer = await ch.basicConsume(q.name, { noAck: true }) const messages: string[] = [] let count = 0 for await (const msg of consumer.messages) { messages.push(msg.bodyString()!) count++ if (count === 3) break } expect(messages).toEqual(["message 1", "message 2", "message 3"]) }) test("AsyncGenerator with manual acknowledgment", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "test message") const consumer = await ch.basicConsume(q.name, { noAck: false }) let ackCalled = false for await (const msg of consumer.messages) { expect(msg.bodyString()).toEqual("test message") await msg.ack() ackCalled = true break } expect(ackCalled).toBe(true) }) test("AsyncGenerator with nack", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "test message") const consumer = await ch.basicConsume(q.name, { noAck: false }) let nackCalled = false for await (const msg of consumer.messages) { expect(msg.bodyString()).toEqual("test message") await msg.nack(false) nackCalled = true break } expect(nackCalled).toBe(true) }) test("AsyncGenerator auto-cancels consumer on break", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") // Publish multiple messages for (let i = 0; i < 5; i++) { await ch.basicPublish("", q.name, `message ${i}`) } const consumer = await ch.basicConsume(q.name, { noAck: true }) let receivedCount = 0 for await (const msg of consumer.messages) { void msg // Intentionally unused in this test receivedCount++ if (receivedCount === 2) break } expect(receivedCount).toBe(2) }) test("AsyncGenerator works with prefetch", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await ch.prefetch(1) const q = await ch.queueDeclare("") // Publish multiple messages await ch.basicPublish("", q.name, "message 1") await ch.basicPublish("", q.name, "message 2") await ch.basicPublish("", q.name, "message 3") const consumer = await ch.basicConsume(q.name, { noAck: false }) const messages: string[] = [] for await (const msg of consumer.messages) { messages.push(msg.bodyString()!) await msg.ack() if (messages.length === 3) break } expect(messages).toEqual(["message 1", "message 2", "message 3"]) }) test("AsyncGenerator with exclusive consumer", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "exclusive message") const consumer = await ch.basicConsume(q.name, { noAck: true, exclusive: true }) for await (const msg of consumer.messages) { expect(msg.bodyString()).toEqual("exclusive message") break } }) test("can delete a queue", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await expect(ch.queueDelete(q.name)).resolves.toBeDefined() }) test("can get message from a queue", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "message") const msg = await ch.basicGet(q.name, { noAck: true }) expect((msg as AMQPMessage).bodyString()).toEqual("message") }) test("will throw an error", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.queueDeclare("amq.foobar")).rejects.toThrow(/ACCESS_REFUSED/) }) test("will throw an error after consumer timeout", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {}) await expect(consumer.wait(1)).rejects.toThrow() }) test("will throw an error if consumer is closed", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {}) consumer.setClosed(new Error("testing")) try { await consumer.wait(1) } catch (error) { expect((error as Error).message).toEqual("testing") } }) test("can cancel a consumer", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, { noAck: false }, console.log) const channel = await consumer.cancel() expect(channel.consumers.size).toEqual(0) }) test("will clear consumer wait timeout on cancel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {}) const wait = consumer.wait(5000) consumer.cancel() await expect(wait).resolves.toBeUndefined() }) test("can close a channel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await ch.close() await expect(ch.close()).rejects.toThrow("Channel is closed") }) test("connection error raises everywhere", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await conn.close() await expect(ch.close()).rejects.toThrow(/Channel is closed/) }) test("consumer stops wait on cancel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, {}, () => {}) await ch.basicPublish("", q.name, "foobar") await consumer.cancel() await expect(consumer.wait()).resolves.toBeUndefined() }) test("consumer stops wait on channel error", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, {}, () => {}) // acking invalid delivery tag should close channel setTimeout(() => ch.basicAck(99999), 1) await expect(consumer.wait()).rejects.toThrow() }) test("connection error raises on publish", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await conn.close() await expect(ch.basicPublish("", q.name, "foobar")).rejects.toThrow() }) test("closed socket closes client", async () => { const amqp = getNewClient() await amqp.connect() const socket = amqp["socket"] assert(socket, "Socket must be created") const closed = new Promise((resolve) => socket.on("close", resolve)) socket.destroy() await closed expect(amqp.closed).toBe(true) }) test("wait for publish confirms", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() let tag // publishes without confirm should return 0 tag = await ch.basicPublish("amq.fanout", "rk", "body") expect(tag).toEqual(0) tag = await ch.basicPublish("amq.fanout", "rk", "body") expect(tag).toEqual(0) // publishes with confirm should return the delivery tag id await ch.confirmSelect() tag = await ch.basicPublish("amq.fanout", "rk", "body") expect(tag).toEqual(1) tag = await ch.basicPublish("amq.fanout", "rk", "body") expect(tag).toEqual(2) // can wait for multiple tags const tags = await Promise.all([ ch.basicPublish("amq.fanout", "rk", "body"), ch.basicPublish("amq.fanout", "rk", "body"), ]) expect(tags).toEqual([3, 4]) }) test("can handle returned messages", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const returned = new Promise((resolve) => (ch.onReturn = resolve)) await ch.basicPublish("", "not-a-queue", "body", {}, true) const msg = (await returned) as AMQPMessage expect(msg.replyCode).toEqual(312) expect(msg.routingKey).toEqual("not-a-queue") }) test("can handle nacks on confirm channel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("", {}, { "x-overflow": "reject-publish", "x-max-length": 0 }) await ch.confirmSelect() await expect(ch.basicPublish("", q.name, "body")).rejects.toThrow("Message rejected") }) test("throws on unknown exchange type", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const name = "test" + Math.random() const unknownType = "unknowntype" + Math.random().toString(36).slice(2) // LavinMQ < 2.8.0 reports "invalid exchange type"; newer LavinMQ and RabbitMQ report "unknown exchange type" await expect(ch.exchangeDeclare(name, unknownType)).rejects.toThrow(/(unknown|invalid) exchange type/) }) test("can declare an exchange", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const name = "test" + Math.random() await ch.confirmSelect() await ch.exchangeDeclare(name, "fanout") await expect(ch.basicPublish(name, "rk", "body")).resolves.toBeDefined() await ch.exchangeDelete(name) await expect(ch.basicPublish(name, "rk", "body")).rejects.toThrow(/NOT_FOUND/) }) test("exchange to exchange bind/unbind", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const name1 = "test1" + Math.random() const name2 = "test2" + Math.random() await ch.exchangeDeclare(name1, "fanout", { autoDelete: false }) await ch.exchangeDeclare(name2, "fanout", { autoDelete: true }) await ch.exchangeBind(name2, name1) const q = await ch.queueDeclare("") await ch.queueBind(q.name, name2, "") await ch.confirmSelect() await ch.basicPublish(name1, "", "") const msg1 = await ch.basicGet(q.name) expect(msg1?.exchange).toEqual(name1) await ch.exchangeUnbind(name2, name1) await ch.basicPublish(name1, "", "") const msg2 = await ch.basicGet(q.name) expect(msg2).toBeNull() await ch.exchangeDelete(name1) }) // not implemented on servers test.skip("can change flow state of channel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() let flow = await ch.basicFlow(false) expect(flow).toEqual(false) flow = await ch.basicFlow(true) expect(flow).toEqual(true) }) test("basic get", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") let msg msg = await ch.basicGet(q.name) expect(msg).toBeNull() await ch.basicPublish("", q.name, "foobar") msg = await ch.basicGet(q.name) expect(msg?.bodyToString()).toEqual("foobar") }) test("transactions", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.txSelect() await ch.basicPublish("", q.name, "foobar") const msg1 = await ch.basicGet(q.name) expect(msg1).toBeNull() await ch.txCommit() const msg2 = await ch.basicGet(q.name) expect(msg2, "missing message").toBeTruthy() expect(msg2?.bodyToString()).toEqual("foobar") await ch.basicPublish("", q.name, "foobar") await ch.txRollback() const msg3 = await ch.basicGet(q.name) expect(msg3).toBeNull() }) test("can publish and consume msgs with large headers", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "a".repeat(4000), { headers: { long: Buffer.from("a".repeat(4000)) }, }) await ch.basicPublish("", q.name, "a".repeat(8000), { headers: { long: "a".repeat(4000) } }) await ch.basicPublish("", q.name, "a".repeat(8000), { headers: { long: Array(100).fill("a") } }) const consumer = await ch.basicConsume(q.name, { noAck: false }, async (msg) => { if (msg.deliveryTag === 3) await msg.cancelConsumer() }) await expect(consumer.wait()).resolves.toBeUndefined() }) test("will throw when headers are too long", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await expect(ch.basicPublish("", q.name, "a".repeat(8000), { headers: { long: "a".repeat(9000) } })).rejects.toThrow() }) test("can purge a queue", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "a") const purged = await ch.queuePurge(q.name) expect(purged.messageCount).toEqual(1) const msg = await ch.basicGet(q.name) expect(msg).toBeNull() }) test("can publish all type of properties", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const headers = { a: 2, b: true, c: "c", d: 1.5, e: null, f: new Date(1000), g: { a: 1 }, i: 2 ** 32 + 1, j: 2.5 ** 33, } const properties = { contentType: "application/json", contentEncoding: "gzip", headers: headers, deliveryMode: 2, priority: 1, correlationId: "corr", replyTo: "me", expiration: "10000", messageId: "msgid", appId: "appid", userId: "guest", type: "type", timestamp: new Date(Math.round(Date.now() / 1000) * 1000), // amqp timestamps does only have second resolution } await ch.basicPublish("", q.name, "", properties) const msg = await ch.basicGet(q.name) expect(msg?.properties).toMatchObject(properties) }) test("cannot publish too long strings", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() expect(() => ch.queueDeclare("a".repeat(256))).toThrow(/Short string too long/) }) test("can set prefetch", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.prefetch(1)).resolves.toBeUndefined() }) test("can open a specific channel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel(2) expect(ch.id).toEqual(2) }) test("can open a specific channel twice", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel(2) expect(ch.id).toEqual(2) const ch2 = await conn.channel(2) expect(ch2 === ch).toEqual(true) }) test("can publish messages spanning multiple frames", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await ch.confirmSelect() const q = await ch.queueDeclare("") const sizes = [4087, 4088, 4089, 4096, 5000, 10000] expect.assertions(sizes.length) for (const n of sizes) { await ch.basicPublish("", q.name, Buffer.alloc(n ?? 0)) const msg = await ch.basicGet(q.name) expect(msg?.bodySize).toEqual(n) } }) test("set basic flow on channel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.basicFlow(true)).resolves.toBeDefined() }) test("confirming unknown deliveryTag", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() expect(() => ch.publishConfirmed(1, false, false)).not.toThrow() }) // ch.deliver does enqueue a microtask, rendering the ch.deliver method untestable. test.skip("delivering a message when no consumer exists raises", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const msg = new AMQPMessage(ch) msg.consumerTag = "abc" expect(() => ch.deliver(msg)).toThrow() }) test("can publish null", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.basicPublish("amq.topic", "", null)).resolves.toBeDefined() }) test("can publish ArrayBuffer", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.basicPublish("amq.topic", "", new ArrayBuffer(2))).resolves.toBeDefined() }) test("can publish Uint8array", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.basicPublish("amq.topic", "", new Uint8Array(2))).resolves.toBeDefined() }) test("can do basicRecover", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await expect(ch.basicRecover(true)).resolves.toBeUndefined() }) test("can set frameMax", async () => { const amqp = getNewClient({ frameMax: 16 * 1024 }) const conn = await amqp.connect() const ch = await conn.channel() await ch.confirmSelect() const q = await ch.queueDeclare("") await ch.basicPublish("", q.name, "", { headers: { a: "a".repeat(15000) } }) const msg = await ch.basicGet(q.name) if (msg) { const props = msg.properties if (props) { const headers = props.headers if (headers) { const a = headers["a"] as string expect(a.length).toEqual(15000) } else expect(headers).toBeTruthy() } else expect(props).toBeTruthy() } else expect(msg).toBeTruthy() }) test("can't set too small frameMax", () => { expect(() => getNewClient({ frameMax: 16 })).toThrow() }) test("can handle frames split over socket reads", async () => { const amqp = getNewClient({ frameMax: 8 * 1024 }) const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const body = "a".repeat(5) const msgs = 100000 for (let i = 0; i < msgs; i++) { await ch.basicPublish("", q.name, body) } let i = 0 const consumer = await ch.basicConsume(q.name, { noAck: true }, () => { if (++i === msgs) consumer.cancel() }) await consumer.wait(5000) expect(i).toEqual(msgs) }, 10_000) test("have to connect socket before opening channels", async () => { const amqp = getNewClient() await expect(amqp.channel()).rejects.toThrow(/Connection closed/) }) test("will raise if socket is closed on send", async () => { const amqp = getNewClient() const conn = await amqp.connect() if (amqp.socket) amqp.socket.destroy() await expect(conn.channel()).rejects.toThrow() }) test("can handle cancel from server", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") const consumer = await ch.basicConsume(q.name, {}, () => {}) await ch.queueDelete(q.name) await expect(consumer.wait()).rejects.toThrow(/Consumer cancelled by the server/) }) test("can handle heartbeats", async () => { const amqp = getNewClient({ heartbeat: 1 }) const conn = await amqp.connect() const wait = new Promise((resolv) => setTimeout(resolv, 2000)) await wait expect(conn.closed).toEqual(false) }) test("has an onerror callback", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() let errMessage: string | null = null ch.onerror = vi.fn((reason) => (errMessage = reason)) const unknownType = "unknowntype" + Math.random().toString(36).slice(2) await expect(ch.exchangeDeclare("name" + Math.random(), unknownType)).rejects.toThrow() expect(ch.onerror).toBeCalled() // LavinMQ < 2.8.0 reports "invalid exchange type"; newer LavinMQ and RabbitMQ report "unknown exchange type" expect(errMessage).toMatch(/(unknown|invalid) exchange type/) }) test("onerror is not called when conn is closed by client", async () => { const amqp = getNewClient() const conn = await amqp.connect() const callbackPromise = new Promise((done, reject) => { conn.onerror = vi.fn((err: AMQPError) => reject(new Error(`onerror should not be called when gracefully closed. Error was: ${err.message}`)), ) setTimeout(done, 10) }) await conn.close() await expect(callbackPromise).resolves.toBeUndefined() expect(conn.onerror).not.toHaveBeenCalled() }) test("will throw on too large headers", async () => { const amqp = getNewClient({ frameMax: 8192 }) const conn = await amqp.connect() const ch = await conn.channel() await expect( ch.basicPublish("", "x".repeat(255), null, { headers: { a: Array(4000).fill(1) }, }), ).rejects.toThrow(RangeError) await expect(ch.basicPublish("", "", null, { headers: { a: "x".repeat(9000) } })).rejects.toThrow(RangeError) }) test("will split body over multiple frames", async () => { const amqp = getNewClient({ frameMax: 8192 }) const conn = await amqp.connect() const ch = await conn.channel() const q = await ch.queueDeclare("") await ch.confirmSelect() await ch.basicPublish("", q.name, "x".repeat(5000)) const msg = await ch.basicGet(q.name) if (msg) if (msg._rawBytes) expect(msg._rawBytes.length).toEqual(5000) else assert.fail("no body") else assert.fail("no msg") }) test("can republish in consume block without race condition", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await ch.prefetch(0) const q = await ch.queueDeclare("") await ch.confirmSelect() await ch.basicPublish("", q.name, "x".repeat(500)) const consumer = await ch.basicConsume(q.name, { noAck: false }, async (msg) => { if (msg.deliveryTag < 10000) { await Promise.all([ ch.basicPublish("", q.name, msg._rawBytes), ch.basicPublish("", q.name, msg._rawBytes), msg.ack(), ]) } else if (msg.deliveryTag === 10000) { await msg.ack() await consumer.cancel() } }) await expect(consumer.wait()).resolves.toBeUndefined() await expect(conn.close()).resolves.toBeUndefined() console.log(conn.bufferPool.length) }, 60_000) test("raises when channelMax is reached", async () => { const amqp = getNewClient() const conn = await amqp.connect() for (let i = 0; i < conn.channelMax; i++) { await conn.channel() } await expect(conn.channel()).rejects.toThrow("Max number of channels reached") // make sure other channels still work const ch1 = await conn.channel(1) await expect(ch1.basicQos(10)).resolves.toBeUndefined() }, 10_000) test("client can negotiate channelMax", async () => { const amqp = getNewClient({ channelMax: 1 }) const conn = await amqp.connect() await conn.channel() await expect(conn.channel()).rejects.toThrow("Max number of channels reached") }, 10_000) test("can update-secret", async () => { const amqp = getNewClient() const conn = await amqp.connect() await expect(conn.updateSecret("foobar", "no reason")).resolves.toBeUndefined() }) test("should fail to connect to HTTP", async () => { const amqp = new AMQPClient("amqp://127.0.0.1:15672?heartbeat=1") await expect(amqp.connect()).rejects.toThrow() }) test("should handle heartbeat timeout correctly", async () => { const amqp = getNewClient({ heartbeat: 1 }) const conn = await amqp.connect() // Mock the socket timeout to simulate missed heartbeats const socket = amqp["socket"] assert(socket, "Socket must be created") // Set up error callback to capture timeout error const errorPromise = new Promise((resolve) => { conn.onerror = (err: AMQPError) => { resolve(err) } }) // Set up close promise to detect when socket is closed const closePromise = new Promise((resolve) => { socket.on("close", resolve) }) // Trigger timeout event to simulate heartbeat timeout socket.emit("timeout") // Wait for error callback and socket close const error = await errorPromise await closePromise // Verify error message and that connection is closed expect(error.message).toEqual("Heartbeat timeout") expect(conn.closed).toBe(true) }) test("can bind queues in parallel", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await ch.queueDeclare("test-queue") await ch.exchangeDeclare("test-exchange", "fanout") // This should not hang - parallel binds should work await Promise.all([ ch.queueBind("test-queue", "test-exchange", "foo:*"), ch.queueBind("test-queue", "test-exchange", "bar:*"), ]) // Test with more binds to stress test the RPC queue await Promise.all([ ch.queueBind("test-queue", "test-exchange", "baz:*"), ch.queueBind("test-queue", "test-exchange", "qux:*"), ch.queueBind("test-queue", "test-exchange", "quux:*"), ]) // Verify bindings were created: passive declare succeeds and queue is reachable await expect(ch.queueDeclare("test-queue", { passive: true })).resolves.toBeDefined() }) test("should have no logger by default", () => { const amqp = getNewClient() expect(amqp.logger).toBeUndefined() }) test("should accept logger in AMQPClient constructor", () => { const mockLogger = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), } const amqp = new AMQPClient("amqp://127.0.0.1", undefined, mockLogger) expect(amqp.logger).toBe(mockLogger) }) test("should accept logger in AMQPWebSocketClient constructor", () => { const mockLogger = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), } const client = new AMQPWebSocketClient( "wss://example.com/ws", undefined, undefined, undefined, undefined, undefined, undefined, mockLogger, ) expect(client.logger).toBe(mockLogger) }) test("should accept logger via AMQPWebSocketClient init object", () => { const mockLogger = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), } const client = new AMQPWebSocketClient({ url: "wss://example.com/ws", logger: mockLogger, }) expect(client.logger).toBe(mockLogger) }) test("should not log when logger is null", () => { const amqp = getNewClient() // This should not throw even with null logger expect(() => { amqp.logger?.error("test message") amqp.logger?.warn("test message") amqp.logger?.info("test message") amqp.logger?.debug("test message") }).not.toThrow() }) test("should use provided logger when available", () => { const mockLogger = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), } const amqp = new AMQPClient("amqp://127.0.0.1", undefined, mockLogger) // Call logger methods directly (simulating how they're used in the code) amqp.logger?.error("error message") amqp.logger?.warn("warn message") amqp.logger?.info("info message") amqp.logger?.debug("debug message") expect(mockLogger.error).toHaveBeenCalledWith("error message") expect(mockLogger.warn).toHaveBeenCalledWith("warn message") expect(mockLogger.info).toHaveBeenCalledWith("info message") expect(mockLogger.debug).toHaveBeenCalledWith("debug message") }) test("connect can be called twice", async () => { const amqp = getNewClient() await amqp.connect() await amqp.connect() expect(amqp.closed).toBe(false) await amqp.close() }) test("consumer.cancel() on closed channel resolves without error", async () => { const amqp = getNewClient() const conn = await amqp.connect() const ch = await conn.channel() await ch.queueDeclare("") const consumer = await ch.basicConsume("", { noAck: true }, () => {}) await ch.close() await expect(consumer.cancel()).resolves.toBeDefined() await amqp.close() })