diff --git a/src/js/thirdparty/ws.js b/src/js/thirdparty/ws.js index 53e6b9e88b..2b86283300 100644 --- a/src/js/thirdparty/ws.js +++ b/src/js/thirdparty/ws.js @@ -747,10 +747,14 @@ class BunWebSocketMocked extends EventEmitter { if (this.#state === 1) { const compress = opts?.compress; data = normalizeData(data, opts); + // send returns: + // 1+ - The number of bytes sent is always the byte length of the data never less + // 0 - dropped due to backpressure (not sent) + // -1 - enqueue the data internaly + // we dont need to do anything with the return value here const written = this.#ws.send(data, compress); - - if (written == -1) { - // backpressure + if (written === 0) { + // dropped this.#enquedMessages.push([data, compress, cb]); this.#bufferedAmount += data.length; return; diff --git a/test/js/first_party/ws/ws.test.ts b/test/js/first_party/ws/ws.test.ts index 7bb540c1e9..b79e90879b 100644 --- a/test/js/first_party/ws/ws.test.ts +++ b/test/js/first_party/ws/ws.test.ts @@ -454,3 +454,49 @@ async function listen(): Promise { } throw new Error("No URL found?"); } +it("WebSocketServer should handle backpressure", async () => { + const { promise, resolve, reject } = Promise.withResolvers(); + const PAYLOAD_SIZE = 64 * 1024; + const ITERATIONS = 10; + const payload = Buffer.alloc(PAYLOAD_SIZE, "a"); + let received = 0; + + const wss = new WebSocketServer({ port: 0 }); + + wss.on("connection", function connection(ws) { + ws.onerror = reject; + + let i = 0; + + async function commit(err?: Error) { + if (err) { + reject(err); + return; + } + await Bun.sleep(10); + + if (i < ITERATIONS) { + i++; + ws.send(payload, commit); + } else { + ws.close(); + } + } + + commit(undefined); + }); + + try { + const ws = new WebSocket("ws://localhost:" + wss.address().port); + ws.onmessage = event => { + received += event.data.byteLength; + }; + ws.onclose = resolve; + ws.onerror = reject; + await promise; + + expect(received).toBe(PAYLOAD_SIZE * ITERATIONS); + } finally { + wss.close(); + } +});