///
import { assert, expect, test, beforeEach, vi } from "vitest"
import { AMQPWebSocketClient } from "../src/amqp-websocket-client.js"
import { AMQPMessage } from "../src/amqp-message.js"
import type { AMQPError } from "../src/amqp-error.js"
// WebSocket endpoint used by the tests: the VITE_WS_URL env value when it is set (and non-empty), otherwise a local default.
const WS_URL = import.meta.env.VITE_WS_URL || "ws://127.0.0.1:15670/ws/amqp"
/**
 * Creates a client pointed at WS_URL.
 *
 * @param init optional connection tuning; when given, it is merged into an
 *   init object next to the URL, otherwise the plain URL-string constructor
 *   form is used.
 * @returns a new, not yet connected client
 */
function getNewClient(init?: { frameMax?: number; heartbeat?: number }): AMQPWebSocketClient {
  if (!init) {
    return new AMQPWebSocketClient(WS_URL)
  }
  return new AMQPWebSocketClient({ url: WS_URL, ...init })
}
// Require every test to run at least one assertion, so a test that silently skips its expectations fails.
beforeEach(() => {
expect.hasAssertions()
})
// Settings handed to the constructor must be readable back from the client unchanged.
test("can parse the url correctly", () => {
  const hostname = "127.0.0.1"
  const port = 15670
  const settings = {
    username: "user_name",
    password: "passwd",
    vhost: "my_host",
    name: "test",
  }
  const client = new AMQPWebSocketClient({ url: `ws://${hostname}:${port}/ws/amqp`, ...settings })
  expect(client.username).toEqual(settings.username)
  expect(client.password).toEqual(settings.password)
  expect(client.vhost).toEqual(settings.vhost)
  expect(client.name).toEqual(settings.name)
})
// Opening one channel on a fresh connection leaves two entries in the channel list.
test("can open a connection and a channel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  // 2 because channel 0 is counted
  expect(channel.connection.channels.length).toEqual(2)
})
// Round-trips one message through a server-named queue and acks it on delivery.
test("can publish and consume", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "hello world")
  // Typed bridge from the consumer callback to a promise. It settles only after
  // the ack has completed, so a failing ack (or a failing basicConsume) fails the
  // test instead of surfacing as an unhandled rejection.
  const delivered = new Promise<AMQPMessage>((resolve, reject) => {
    ch.basicConsume(q.name, { noAck: false }, async (msg) => {
      try {
        await msg.ack()
        resolve(msg)
      } catch (err) {
        reject(err)
      }
    }).catch(reject)
  })
  const msg = await delivered
  expect(msg.bodyString()).toEqual("hello world")
})
// Delivers one message through a server-named queue and nacks it on delivery.
test("can nack a message", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "hello world")
  // Typed bridge from the consumer callback to a promise. It settles only after
  // the nack has completed, so a failing nack (or a failing basicConsume) fails
  // the test instead of surfacing as an unhandled rejection.
  const delivered = new Promise<AMQPMessage>((resolve, reject) => {
    ch.basicConsume(q.name, { noAck: false }, async (msg) => {
      try {
        await msg.nack()
        resolve(msg)
      } catch (err) {
        reject(err)
      }
    }).catch(reject)
  })
  const msg = await delivered
  expect(msg.bodyString()).toEqual("hello world")
})
// Delivers one message through a server-named queue and rejects it on delivery.
test("can reject a message", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "hello world")
  // Typed bridge from the consumer callback to a promise. It settles only after
  // the reject has completed, so a failing reject (or a failing basicConsume)
  // fails the test instead of surfacing as an unhandled rejection.
  const delivered = new Promise<AMQPMessage>((resolve, reject) => {
    ch.basicConsume(q.name, { noAck: false }, async (msg) => {
      try {
        await msg.reject()
        resolve(msg)
      } catch (err) {
        reject(err)
      }
    }).catch(reject)
  })
  const msg = await delivered
  expect(msg.bodyString()).toEqual("hello world")
})
// Binds a temporary queue to amq.topic and checks that unbinding resolves with no value.
test("can unbind a queue from exchange", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  await channel.queueBind(queue.name, "amq.topic", "asd")
  const unbound = channel.queueUnbind(queue.name, "amq.topic", "asd")
  await expect(unbound).resolves.toBeUndefined()
})
// Starts a consumer and cancels it by tag through the channel.
test("can unsubscribe from a queue", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const subscription = await channel.basicConsume(queue.name, {}, () => {})
  const cancelled = channel.basicCancel(subscription.tag)
  await expect(cancelled).resolves.toBeDefined()
})
// Deleting a freshly declared queue resolves with a defined result.
test("can delete a queue", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const deleted = channel.queueDelete(queue.name)
  await expect(deleted).resolves.toBeDefined()
})
// Publishes one message and fetches it back with basicGet in no-ack mode.
test("can get message from a queue", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  await ch.basicPublish("", q.name, "message")
  const msg = await ch.basicGet(q.name, { noAck: true })
  // basicGet can yield null for an empty queue; optional chaining turns a missing
  // message into a readable assertion failure instead of a TypeError hidden by a cast.
  expect(msg?.bodyString()).toEqual("message")
})
// Declaring a queue named "amq.foobar" must be refused by the server.
test("will throw an error", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const declared = channel.queueDeclare("amq.foobar")
  await expect(declared).rejects.toThrow(/ACCESS_REFUSED/)
})
// Waiting on an idle consumer with a timeout of 1 must reject.
test("will throw an error after consumer timeout", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const idleConsumer = await channel.basicConsume(queue.name, { noAck: false }, () => {})
  const waited = idleConsumer.wait(1)
  await expect(waited).rejects.toThrow()
})
// A consumer that was closed with an error must reject wait() with that same error.
test("will throw an error if consumer is closed", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})
  consumer.setClosed(new Error("testing"))
  // Assert on the rejection directly: if wait() ever resolved, this fails with a
  // clear message instead of the test body silently running no assertion.
  // The anchored pattern keeps the exact-message check.
  await expect(consumer.wait(1)).rejects.toThrow(/^testing$/)
})
// Cancelling the only consumer leaves the channel's consumer map empty.
test("can cancel a consumer", async () => {
  const connection = await getNewClient().connect()
  const ch = await connection.channel()
  const queue = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(queue.name, { noAck: false }, console.log)
  // cancel() resolves with the channel the consumer belonged to
  const owningChannel = await consumer.cancel()
  expect(owningChannel.consumers.size).toEqual(0)
})
// A pending wait(timeout) must resolve, not time out, once the consumer is cancelled.
test("will clear consumer wait timeout on cancel", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  const q = await ch.queueDeclare("")
  const consumer = await ch.basicConsume(q.name, { noAck: false }, () => {})
  // Start waiting before cancelling so the cancel has a pending wait to release.
  const wait = consumer.wait(5000)
  // Keep a handle on the cancel promise so a failed cancel fails this test
  // rather than leaking as a floating rejection.
  const cancelled = consumer.cancel()
  await expect(wait).resolves.toBeUndefined()
  await cancelled
})
// Closing a channel twice: the second close must reject.
test("can close a channel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  await channel.close()
  const secondClose = channel.close()
  await expect(secondClose).rejects.toThrow("Channel is closed")
})
// After the connection is closed, closing one of its channels must reject.
test("connection error raises everywhere", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  await connection.close()
  const channelClose = channel.close()
  await expect(channelClose).rejects.toThrow(/Channel is closed/)
})
// wait() on an already cancelled consumer resolves with no value.
test("consumer stops wait on cancel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const subscription = await channel.basicConsume(queue.name, {}, () => {})
  await channel.basicPublish("", queue.name, "foobar")
  await subscription.cancel()
  const waited = subscription.wait()
  await expect(waited).resolves.toBeUndefined()
})
// A channel-level error must make a pending consumer wait() reject.
test("consumer stops wait on channel error", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const subscription = await channel.basicConsume(queue.name, {}, () => {})
  // acking invalid delivery tag should close channel
  setTimeout(() => channel.basicAck(99999), 1)
  const waited = subscription.wait()
  await expect(waited).rejects.toThrow()
})
// Publishing on a channel whose connection has been closed must reject.
test("connection error raises on publish", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  await connection.close()
  const published = channel.basicPublish("", queue.name, "foobar")
  await expect(published).rejects.toThrow()
})
// Closing the underlying socket must flip the client into the closed state.
test("closed socket closes client", async () => {
  const client = getNewClient()
  await client.connect()
  const ws = client["socket"]
  assert(ws, "Socket must be created")
  // Register the listener before closing so the close event cannot be missed
  const socketClosed = new Promise((resolve) => ws.addEventListener("close", resolve))
  ws.close()
  await socketClosed
  expect(client.closed).toBe(true)
})
// Losing the socket must close the client and the channel, and fail pending consumer waits.
test("connection loss closes channels and consumers", async () => {
  const client = getNewClient()
  const connection = await client.connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const subscription = await channel.basicConsume(queue.name, { noAck: false }, () => {})
  // Start waiting while the connection is still healthy
  const pendingWait = subscription.wait()
  const ws = client["socket"]
  assert(ws, "Socket must be created")
  // Simulate unclean connection loss by closing the socket
  const socketClosed = new Promise((resolve) => ws.addEventListener("close", resolve))
  ws.close()
  await socketClosed
  // Both the client and the channel must report closed
  expect(client.closed).toBe(true)
  expect(channel.closed).toBe(true)
  // The wait started earlier must reject
  await expect(pendingWait).rejects.toThrow()
  // Further operations on the closed channel must reject as well
  await expect(channel.queueDeclare("")).rejects.toThrow(/closed/)
  await expect(channel.basicPublish("", queue.name, "test")).rejects.toThrow()
})
// An unclean socket close must be reported through the connection's onerror callback.
test("connection loss triggers onerror callback", async () => {
  const client = getNewClient()
  const connection = await client.connect()
  let lastError: AMQPError | null = null
  connection.onerror = vi.fn((err: AMQPError) => {
    lastError = err
  })
  const ws = client["socket"]
  assert(ws, "Socket must be created")
  // Simulate unclean connection loss
  const socketClosed = new Promise((resolve) => ws.addEventListener("close", resolve))
  ws.close()
  await socketClosed
  // The callback must have fired and handed over an error
  expect(connection.onerror).toHaveBeenCalled()
  expect(lastError).toBeTruthy()
  if (lastError) {
    // Cast needed: the assignment happens inside the callback, which the compiler does not track
    expect((lastError as AMQPError).message).toMatch(/connection not cleanly closed/)
  }
  expect(client.closed).toBe(true)
})
// basicPublish resolves to 0 without confirms and to increasing delivery tags with confirms.
test("wait for publish confirms", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const publish = () => channel.basicPublish("amq.fanout", "rk", "body")
  // publishes without confirm should return 0
  expect(await publish()).toEqual(0)
  expect(await publish()).toEqual(0)
  // publishes with confirm should return the delivery tag id
  await channel.confirmSelect()
  expect(await publish()).toEqual(1)
  expect(await publish()).toEqual(2)
  // can wait for multiple tags
  const concurrentTags = await Promise.all([publish(), publish()])
  expect(concurrentTags).toEqual([3, 4])
})
// A mandatory publish to a non-existent queue must come back through onReturn.
test("can handle returned messages", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  // Typed promise so the returned message needs no cast further down
  const returned = new Promise<AMQPMessage>((resolve) => (ch.onReturn = resolve))
  // Last argument is the mandatory flag, which asks the server to return unroutable messages
  await ch.basicPublish("", "not-a-queue", "body", {}, true)
  const msg = await returned
  expect(msg.replyCode).toEqual(312)
  expect(msg.routingKey).toEqual("not-a-queue")
})
// On a confirm channel, publishing to a queue that rejects every publish must reject.
test("can handle nacks on confirm channel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  // Zero-length queue with reject-publish overflow
  const queueArgs = { "x-overflow": "reject-publish", "x-max-length": 0 }
  const queue = await channel.queueDeclare("", {}, queueArgs)
  await channel.confirmSelect()
  const published = channel.basicPublish("", queue.name, "body")
  await expect(published).rejects.toThrow("Message rejected")
})
// Declaring an exchange with a made-up type must be rejected by the server.
test("throws on unknown exchange type", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const exchangeName = "test" + Math.random()
  const bogusType = "unknowntype" + Math.random().toString(36).slice(2)
  const declared = channel.exchangeDeclare(exchangeName, bogusType)
  // LavinMQ < 2.8.0 reports "invalid exchange type"; newer LavinMQ and RabbitMQ report "unknown exchange type"
  await expect(declared).rejects.toThrow(/(unknown|invalid) exchange type/)
})
// A declared exchange accepts publishes; once deleted, publishing to it is rejected.
test("can declare an exchange", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const exchange = "test" + Math.random()
  await channel.confirmSelect()
  await channel.exchangeDeclare(exchange, "fanout")
  await expect(channel.basicPublish(exchange, "rk", "body")).resolves.toBeDefined()
  await channel.exchangeDelete(exchange)
  await expect(channel.basicPublish(exchange, "rk", "body")).rejects.toThrow(/NOT_FOUND/)
})
// Messages flow from one exchange to another only while the two are bound.
test("exchange to exchange bind/unbind", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const source = "test1" + Math.random()
  const destination = "test2" + Math.random()
  await channel.exchangeDeclare(source, "fanout", { autoDelete: false })
  await channel.exchangeDeclare(destination, "fanout", { autoDelete: true })
  await channel.exchangeBind(destination, source)
  const queue = await channel.queueDeclare("")
  await channel.queueBind(queue.name, destination, "")
  await channel.confirmSelect()
  // While bound: a publish to the source reaches the queue behind the destination
  await channel.basicPublish(source, "", "")
  const routed = await channel.basicGet(queue.name)
  expect(routed?.exchange).toEqual(source)
  // After unbinding: nothing arrives any more
  await channel.exchangeUnbind(destination, source)
  await channel.basicPublish(source, "", "")
  const afterUnbind = await channel.basicGet(queue.name)
  expect(afterUnbind).toBeNull()
  await channel.exchangeDelete(source)
})
// Skipped: not implemented on servers.
test.skip("can change flow state of channel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const stopped = await channel.basicFlow(false)
  expect(stopped).toEqual(false)
  const resumed = await channel.basicFlow(true)
  expect(resumed).toEqual(true)
})
// basicGet yields null on an empty queue and the message once one has been published.
test("basic get", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const beforePublish = await channel.basicGet(queue.name)
  expect(beforePublish).toBeNull()
  await channel.basicPublish("", queue.name, "foobar")
  const afterPublish = await channel.basicGet(queue.name)
  expect(afterPublish?.bodyToString()).toEqual("foobar")
})
// Publishes inside a transaction become visible on commit and vanish on rollback.
test("transactions", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  await channel.txSelect()
  // Uncommitted publish: not visible yet
  await channel.basicPublish("", queue.name, "foobar")
  const uncommitted = await channel.basicGet(queue.name)
  expect(uncommitted).toBeNull()
  // Commit: the message shows up
  await channel.txCommit()
  const committed = await channel.basicGet(queue.name)
  expect(committed, "missing message").toBeTruthy()
  expect(committed?.bodyToString()).toEqual("foobar")
  // Rollback: the second publish is discarded
  await channel.basicPublish("", queue.name, "foobar")
  await channel.txRollback()
  const rolledBack = await channel.basicGet(queue.name)
  expect(rolledBack).toBeNull()
})
// Publishes three messages with large header values (bytes, string, array) and consumes all of them.
test("can publish and consume msgs with large headers", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const text4k = "a".repeat(4000)
  const text8k = "a".repeat(8000)
  const bytes4k = new Uint8Array(new TextEncoder().encode("a".repeat(4000)))
  await channel.basicPublish("", queue.name, text4k, { headers: { long: bytes4k } })
  await channel.basicPublish("", queue.name, text8k, { headers: { long: text4k } })
  await channel.basicPublish("", queue.name, text8k, { headers: { long: Array(100).fill("a") } })
  // Stop consuming once the third delivery has arrived
  const consumer = await channel.basicConsume(queue.name, { noAck: false }, async (delivery) => {
    if (delivery.deliveryTag === 3) await delivery.cancelConsumer()
  })
  await expect(consumer.wait()).resolves.toBeUndefined()
})
// A 9000-character header value on a default client must make the publish reject.
test("will throw when headers are too long", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const oversized = { headers: { long: "a".repeat(9000) } }
  const published = channel.basicPublish("", queue.name, "a".repeat(8000), oversized)
  await expect(published).rejects.toThrow()
})
// Purging reports how many messages were dropped and leaves the queue empty.
test("can purge a queue", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  await channel.basicPublish("", queue.name, "a")
  const { messageCount } = await channel.queuePurge(queue.name)
  expect(messageCount).toEqual(1)
  const leftover = await channel.basicGet(queue.name)
  expect(leftover).toBeNull()
})
// Every supported message property and header value type must survive a publish/get round trip.
test("can publish all type of properties", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  // AMQP timestamps only have second resolution, so round to a whole second
  const wholeSecond = new Date(Math.round(Date.now() / 1000) * 1000)
  const sent = {
    contentType: "application/json",
    contentEncoding: "gzip",
    // One header per value type: int, boolean, string, float, null, date, nested table, large numbers
    headers: {
      a: 2,
      b: true,
      c: "c",
      d: 1.5,
      e: null,
      f: new Date(1000),
      g: { a: 1 },
      i: 2 ** 32 + 1,
      j: 2.5 ** 33,
    },
    deliveryMode: 2,
    priority: 1,
    correlationId: "corr",
    replyTo: "me",
    expiration: "10000",
    messageId: "msgid",
    appId: "appid",
    userId: "guest",
    type: "type",
    timestamp: wholeSecond,
  }
  await channel.basicPublish("", queue.name, "", sent)
  const received = await channel.basicGet(queue.name)
  expect(received?.properties).toMatchObject(sent)
})
// A 256-character queue name must be refused with a synchronous throw.
test("cannot publish too long strings", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const declareTooLong = () => channel.queueDeclare("a".repeat(256))
  expect(declareTooLong).toThrow(/Short string too long/)
})
// Setting a prefetch of 1 resolves with no value.
test("can set prefetch", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const prefetched = channel.prefetch(1)
  await expect(prefetched).resolves.toBeUndefined()
})
// Requesting channel 2 explicitly yields a channel with that id.
test("can open a specific channel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel(2)
  expect(channel.id).toEqual(2)
})
// Requesting the same channel id twice hands back the very same channel object.
test("can open a specific channel twice", async () => {
  const connection = await getNewClient().connect()
  const first = await connection.channel(2)
  expect(first.id).toEqual(2)
  const second = await connection.channel(2)
  expect(second === first).toEqual(true)
})
// Bodies of several sizes must round-trip with their size intact.
test("can publish messages spanning multiple frames", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await ch.confirmSelect()
  const q = await ch.queueDeclare("")
  // NOTE(review): the sizes look chosen to sit around a frame-size boundary; confirm against the client's default frameMax
  const sizes = [4087, 4088, 4089, 4096, 5000, 10000]
  // Exactly one assertion per size, so a loop that exits early is detected
  expect.assertions(sizes.length)
  for (const n of sizes) {
    // n always comes from the literal array above, so no fallback value is needed
    await ch.basicPublish("", q.name, new Uint8Array(n))
    const msg = await ch.basicGet(q.name)
    expect(msg?.bodySize).toEqual(n)
  }
})
// basicFlow(true) resolves with a defined value.
test("set basic flow on channel", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const flow = channel.basicFlow(true)
  await expect(flow).resolves.toBeDefined()
})
// Confirming a delivery tag that was never published must not throw.
test("confirming unknown deliveryTag", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const confirmUnknown = () => channel.publishConfirmed(1, false, false)
  expect(confirmUnknown).not.toThrow()
})
// Skipped: ch.deliver enqueues a microtask, which makes the ch.deliver method untestable.
test.skip("delivering a message when no consumer exists raises", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const orphan = new AMQPMessage(channel)
  orphan.consumerTag = "abc"
  const deliverOrphan = () => channel.deliver(orphan)
  expect(deliverOrphan).toThrow()
})
// A null body is accepted by basicPublish.
test("can publish null", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const published = channel.basicPublish("amq.topic", "", null)
  await expect(published).resolves.toBeDefined()
})
// An ArrayBuffer body is accepted by basicPublish.
test("can publish ArrayBuffer", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const body = new ArrayBuffer(2)
  const published = channel.basicPublish("amq.topic", "", body)
  await expect(published).resolves.toBeDefined()
})
// A Uint8Array body is accepted by basicPublish.
test("can publish Uint8array", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const body = new Uint8Array(2)
  const published = channel.basicPublish("amq.topic", "", body)
  await expect(published).resolves.toBeDefined()
})
// basicRecover(true) resolves with no value.
test("can do basicRecover", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const recovered = channel.basicRecover(true)
  await expect(recovered).resolves.toBeUndefined()
})
// With a 16 KiB frameMax, a header value close to that size must round-trip intact.
test("can set frameMax", async () => {
  const amqp = getNewClient({ frameMax: 16 * 1024 })
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await ch.confirmSelect()
  const q = await ch.queueDeclare("")
  const headerValue = "a".repeat(conn.frameMax - 100) // leave some space for other parts of the frame
  await ch.basicPublish("", q.name, "", { headers: { a: headerValue } })
  const msg = await ch.basicGet(q.name)
  // A single assertion covers every failure mode: a missing message, missing
  // properties, missing headers or a missing "a" header all make the chain
  // yield undefined, which fails the length check.
  expect(msg?.properties?.headers?.["a"]).toHaveLength(headerValue.length)
})
// A frameMax of 16 must be refused when the client is constructed.
test("can't set too small frameMax", () => {
  const buildWithTinyFrameMax = () => getNewClient({ frameMax: 16 })
  expect(buildWithTinyFrameMax).toThrow()
})
// Publishes a large number of small messages and checks that every one is delivered.
test("can handle frames split over socket reads", async () => {
  const connection = await getNewClient({ frameMax: 8 * 1024 }).connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const payload = "a".repeat(5)
  const total = 100000
  for (let sent = 0; sent < total; sent++) {
    await channel.basicPublish("", queue.name, payload)
  }
  let received = 0
  const consumer = await channel.basicConsume(queue.name, { noAck: true }, () => {
    received += 1
    // Cancel once the last message has arrived, which lets wait() below finish
    if (received === total) consumer.cancel()
  })
  await consumer.wait(20_000)
  expect(received).toEqual(total)
}, 60_000)
// Opening a channel on a client that never connected must reject.
test("have to connect socket before opening channels", async () => {
  const unconnected = getNewClient()
  const opened = unconnected.channel()
  await expect(opened).rejects.toThrow(/Connection closed/)
})
// Opening a channel after the client was closed must reject.
test("will raise if socket is closed on send", async () => {
  const client = getNewClient()
  const connection = await client.connect()
  await client.close()
  const opened = connection.channel()
  await expect(opened).rejects.toThrow()
}, 10_000)
// Deleting a queue that still has a consumer must make that consumer's wait() reject.
test("can handle cancel from server", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  const subscription = await channel.basicConsume(queue.name, {}, () => {})
  await channel.queueDelete(queue.name)
  const waited = subscription.wait()
  await expect(waited).rejects.toThrow(/Consumer cancelled by the server/)
}, 10_000)
// With heartbeat set to 1, the connection must still be open after idling for two seconds.
test("can handle heartbeats", async () => {
  const connection = await getNewClient({ heartbeat: 1 }).connect()
  // Stay idle for 2000 ms (presumably longer than one heartbeat interval; confirm the unit of `heartbeat`)
  await new Promise((resolve) => setTimeout(resolve, 2000))
  expect(connection.closed).toEqual(false)
}, 10_000)
// A channel error must be reported through the channel's onerror callback.
test("has an onerror callback", async () => {
  const connection = await getNewClient().connect()
  const channel = await connection.channel()
  let reported: string | null = null
  channel.onerror = vi.fn((reason) => (reported = reason))
  const bogusType = "unknowntype" + Math.random().toString(36).slice(2)
  const declared = channel.exchangeDeclare("name" + Math.random(), bogusType)
  await expect(declared).rejects.toThrow()
  expect(channel.onerror).toBeCalled()
  // LavinMQ < 2.8.0 reports "invalid exchange type"; newer LavinMQ and RabbitMQ report "unknown exchange type"
  expect(reported).toMatch(/(unknown|invalid) exchange type/)
})
// A graceful, client-initiated close must not trigger onerror.
test("onerror is not called when conn is closed by client", async () => {
  const connection = await getNewClient().connect()
  // Rejects if onerror fires; otherwise resolves after a 10 ms grace period
  const noErrorWithinGrace = new Promise((settle, fail) => {
    connection.onerror = vi.fn((err: AMQPError) =>
      fail(new Error(`onerror should not be called when gracefully closed. Error was: ${err.message}`)),
    )
    setTimeout(settle, 10)
  })
  await connection.close()
  await expect(noErrorWithinGrace).resolves.toBeUndefined()
  expect(connection.onerror).not.toHaveBeenCalled()
})
// With frameMax 8192, oversized headers must reject with a RangeError.
test("will throw on too large headers", async () => {
  const connection = await getNewClient({ frameMax: 8192 }).connect()
  const channel = await connection.channel()
  // Large array header combined with a maximum-length routing key
  const withArrayHeader = channel.basicPublish("", "x".repeat(255), null, {
    headers: { a: Array(4000).fill(1) },
  })
  await expect(withArrayHeader).rejects.toThrow(RangeError)
  // Single string header of 9000 characters
  const withStringHeader = channel.basicPublish("", "", null, { headers: { a: "x".repeat(9000) } })
  await expect(withStringHeader).rejects.toThrow(RangeError)
})
// With frameMax 8192, a 5000-byte body must come back complete.
test("will split body over multiple frames", async () => {
  const connection = await getNewClient({ frameMax: 8192 }).connect()
  const channel = await connection.channel()
  const queue = await channel.queueDeclare("")
  await channel.confirmSelect()
  await channel.basicPublish("", queue.name, "x".repeat(5000))
  const received = await channel.basicGet(queue.name)
  // Guard clauses instead of nested branches; each fails with the same message as before
  assert(received, "no msg")
  assert(received._rawBytes, "no body")
  expect(received._rawBytes.length).toEqual(5000)
})
// Republishes from inside the consumer callback while acking, then cancels at
// delivery tag 10000 and closes the connection.
test("can republish in consume block without race condition", async () => {
  const amqp = getNewClient()
  const conn = await amqp.connect()
  const ch = await conn.channel()
  await ch.prefetch(0)
  const q = await ch.queueDeclare("")
  const queueName = q.name
  await ch.confirmSelect()
  // Seed message that starts the republish cascade
  await ch.basicPublish("", queueName, "x".repeat(500))
  const consumer = await ch.basicConsume(queueName, { noAck: false }, async (msg) => {
    if (msg.deliveryTag < 10000) {
      // Two publishes and the ack are issued concurrently on the same channel
      await Promise.all([
        ch.basicPublish("", queueName, msg._rawBytes),
        ch.basicPublish("", queueName, msg._rawBytes),
        msg.ack(),
      ])
    } else if (msg.deliveryTag === 10000) {
      await consumer.cancel()
    }
  })
  await expect(consumer.wait()).resolves.toBeUndefined()
  await expect(conn.close()).resolves.toBeUndefined()
  // The former debug print of conn.bufferPool.length was removed: it asserted nothing and only added noise.
}, 20_000)
// Opening one channel more than the limit must reject without breaking existing channels.
test("raises when channelMax is reached", async () => {
  const connection = await getNewClient().connect()
  // Open channelMax channels one after another
  for (let opened = 0; opened < connection.channelMax; opened++) {
    await connection.channel()
  }
  const oneTooMany = connection.channel()
  await expect(oneTooMany).rejects.toThrow("Max number of channels reached")
  // make sure other channels still work
  const firstChannel = await connection.channel(1)
  await expect(firstChannel.basicQos(10)).resolves.toBeUndefined()
}, 20_000)
// Connecting over WebSocket to port 5672 (the URL targets a plain AMQP port, per the test name) must reject.
test("should fail to connect to an AMQP port", async () => {
  const client = new AMQPWebSocketClient("ws://127.0.0.1:5672/ws/amqp")
  const connected = client.connect()
  await expect(connected).rejects.toThrow()
})