diff --git a/src/js/node/http.ts b/src/js/node/http.ts index ed03de7075..71d59f9337 100644 --- a/src/js/node/http.ts +++ b/src/js/node/http.ts @@ -533,6 +533,12 @@ Server.prototype.listen = function (port, host, backlog, onListen) { drain(ws) { ws.data.drain(ws); }, + ping(ws, data) { + ws.data.ping(ws, data); + }, + pong(ws, data) { + ws.data.pong(ws, data); + }, }, // Be very careful not to access (web) Request object // properties: diff --git a/src/js/thirdparty/ws.js b/src/js/thirdparty/ws.js index 27f470253b..25db515009 100644 --- a/src/js/thirdparty/ws.js +++ b/src/js/thirdparty/ws.js @@ -198,6 +198,10 @@ class BunWebSocket extends EventEmitter { } ping(data, mask, cb) { + if (this.#ws.readyState === 0) { + throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); + } + if (typeof data === "function") { cb = data; data = mask = undefined; @@ -211,7 +215,11 @@ class BunWebSocket extends EventEmitter { try { this.#ws.ping(data); } catch (error) { - typeof cb === "function" && cb(error); + if (typeof cb === "function") { + cb(error); + return; + } + this.emit("error", error); return; } @@ -219,6 +227,10 @@ class BunWebSocket extends EventEmitter { } pong(data, mask, cb) { + if (this.#ws.readyState === 0) { + throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); + } + if (typeof data === "function") { cb = data; data = mask = undefined; @@ -232,7 +244,11 @@ class BunWebSocket extends EventEmitter { try { this.#ws.pong(data); } catch (error) { - typeof cb === "function" && cb(error); + if (typeof cb === "function") { + cb(error); + return; + } + this.emit("error", error); return; } @@ -526,15 +542,29 @@ class BunWebSocketMocked extends EventEmitter { const open = this.#open.bind(this); const close = this.#close.bind(this); const drain = this.#drain.bind(this); + const ping = this.#ping.bind(this); + const pong = this.#pong.bind(this); this[kBunInternals] = { message, // a message is received open, // a socket is opened close, // a socket is closed drain, // the socket is ready to receive more data + ping, // a ping is received + pong, // a pong is received }; } + #ping(ws, data) { + this.#ws = ws; + this.emit("ping", data); + } + + #pong(ws, data) { + this.#ws = ws; + this.emit("pong", data); + } + #message(ws, message) { this.#ws = ws; @@ -579,22 +609,72 @@ class BunWebSocketMocked extends EventEmitter { } #drain(ws) { - const chunk = this.#enquedMessages[0]; - if (chunk) { + let chunk; + while ((chunk = this.#enquedMessages[0]) && this.#state === 1) { const [data, compress, cb] = chunk; const written = ws.send(data, compress); - if (written == -1) { + if (written < 1) { // backpressure wait until next drain event return; } - typeof cb === "function" && cb(); - this.#bufferedAmount -= chunk.length; this.#enquedMessages.shift(); + + typeof cb === "function" && queueMicrotask(cb); } } + ping(data, mask, cb) { + if (this.#state === 0) { + throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); + } + + if (typeof data === "function") { + cb = data; + data = mask = undefined; + } else if (typeof mask === "function") { + cb = mask; + mask = undefined; + } + + if (typeof data === "number") data = data.toString(); + + try { + this.#ws.ping(data); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + + typeof cb === "function" && cb(); + } + + pong(data, mask, cb) { + if (this.#state === 0) { + throw new Error("WebSocket is not open: readyState 0 (CONNECTING)"); + } + + if (typeof data === "function") { + cb = data; + data = mask = undefined; + } else if (typeof mask === "function") { + cb = mask; + mask = undefined; + } + + if (typeof data === "number") data = data.toString(); + + try { + this.#ws.pong(data); + } catch (error) { + typeof cb === "function" && cb(error); + return; + } + + typeof cb === "function" && cb(); + } + send(data, opts, cb) { if (this.#state === 1) { const compress = opts?.compress; @@ -606,7 +686,7 @@ class BunWebSocketMocked extends EventEmitter { return; } - typeof cb === "function" && cb(); + typeof cb === "function" && process.nextTick(cb); } else if (this.#state === 0) { // not connected yet this.#enquedMessages.push([data, opts?.compress, cb]); diff --git a/test/js/first_party/ws/ws.test.ts b/test/js/first_party/ws/ws.test.ts index 6c8334f115..58468b5a6f 100644 --- a/test/js/first_party/ws/ws.test.ts +++ b/test/js/first_party/ws/ws.test.ts @@ -1,9 +1,9 @@ -import { describe, it, expect, beforeEach, afterEach } from "bun:test"; import type { Subprocess } from "bun"; import { spawn } from "bun"; +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; import { bunEnv, bunExe, nodeExe } from "harness"; -import { Server, WebSocket, WebSocketServer } from "ws"; import path from "node:path"; +import { Server, WebSocket, WebSocketServer } from "ws"; const strings = [ { @@ -254,8 +254,9 @@ describe("WebSocket", () => { }); describe("WebSocketServer", () => { - it("sets websocket prototype properties correctly", done => { + it("sets websocket prototype properties correctly", async () => { const wss = new WebSocketServer({ port: 0 }); + const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { try { @@ -263,9 +264,9 @@ describe("WebSocketServer", () => { expect(ws.CLOSING).toBeDefined(); expect(ws.CONNECTING).toBeDefined(); expect(ws.OPEN).toBeDefined(); - return done(); + resolve(); } catch (err) { - done(err); + reject(err); } finally { wss.close(); ws.close(); @@ -273,12 +274,14 @@ describe("WebSocketServer", () => { }); new WebSocket("ws://localhost:" + wss.address().port); + await promise; }); }); describe("Server", () => { - it("sets websocket prototype properties correctly", done => { + it("sets websocket prototype properties correctly", async () => { const wss = new Server({ port: 0 }); + const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { try { @@ -286,9 +289,9 @@ describe("Server", () => { expect(ws.CLOSING).toBeDefined(); expect(ws.CONNECTING).toBeDefined(); expect(ws.OPEN).toBeDefined(); - return done(); + resolve(); } catch (err) { - done(err); + reject(err); } finally { wss.close(); ws.close(); @@ -296,24 +299,27 @@ describe("Server", () => { }); new WebSocket("ws://localhost:" + wss.address().port); + await promise; }); }); -it("isBinary", done => { +it("isBinary", async () => { const wss = new WebSocketServer({ port: 0 }); let isDone = false; + const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { ws.on("message", (data, isBinary) => { if (isDone) { expect(isBinary).toBeTrue(); wss.close(); ws.close(); - done(); + resolve(); return; } expect(isBinary).toBeFalse(); isDone = true; }); + ws.on("error", reject); }); const ws = new WebSocket("ws://localhost:" + wss.address().port); @@ -321,6 +327,8 @@ it("isBinary", done => { ws.send("hello"); ws.send(Buffer.from([1, 2, 3])); }); + + await promise; }); it("onmessage", done => { @@ -364,10 +372,10 @@ function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) async function listen(): Promise { const pathname = path.resolve(import.meta.dir, "../../web/websocket/websocket-server-echo.mjs"); const server = spawn({ - cmd: [nodeExe() ?? bunExe(), pathname], + cmd: [bunExe(), pathname], cwd: import.meta.dir, env: bunEnv, - stderr: "ignore", + stderr: "inherit", stdout: "pipe", }); servers.push(server);