fix(ws) fix handling of messages (#13210)

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
Ciro Spaciari
2024-08-09 23:27:29 +00:00
committed by GitHub
parent 35465d3a29
commit 28c40babd2
2 changed files with 53 additions and 3 deletions

View File

@@ -747,10 +747,14 @@ class BunWebSocketMocked extends EventEmitter {
if (this.#state === 1) {
const compress = opts?.compress;
data = normalizeData(data, opts);
// send returns:
// 1+ - The number of bytes sent is always the byte length of the data never less
// 0 - dropped due to backpressure (not sent)
// -1 - enqueue the data internaly
// we dont need to do anything with the return value here
const written = this.#ws.send(data, compress);
if (written == -1) {
// backpressure
if (written === 0) {
// dropped
this.#enquedMessages.push([data, compress, cb]);
this.#bufferedAmount += data.length;
return;

View File

@@ -454,3 +454,49 @@ async function listen(): Promise<URL> {
}
throw new Error("No URL found?");
}
it("WebSocketServer should handle backpressure", async () => {
const { promise, resolve, reject } = Promise.withResolvers();
const PAYLOAD_SIZE = 64 * 1024;
const ITERATIONS = 10;
const payload = Buffer.alloc(PAYLOAD_SIZE, "a");
let received = 0;
const wss = new WebSocketServer({ port: 0 });
wss.on("connection", function connection(ws) {
ws.onerror = reject;
let i = 0;
async function commit(err?: Error) {
if (err) {
reject(err);
return;
}
await Bun.sleep(10);
if (i < ITERATIONS) {
i++;
ws.send(payload, commit);
} else {
ws.close();
}
}
commit(undefined);
});
try {
const ws = new WebSocket("ws://localhost:" + wss.address().port);
ws.onmessage = event => {
received += event.data.byteLength;
};
ws.onclose = resolve;
ws.onerror = reject;
await promise;
expect(received).toBe(PAYLOAD_SIZE * ITERATIONS);
} finally {
wss.close();
}
});