import type { Subprocess } from "bun"; import { spawn } from "bun"; import { afterEach, beforeEach, describe, expect, it } from "bun:test"; import crypto from "crypto"; import { once } from "events"; import { bunEnv, bunExe } from "harness"; import { createServer } from "http"; import { AddressInfo, connect } from "net"; import path from "node:path"; import { Server, WebSocket, WebSocketServer } from "ws"; const strings = [ { label: "string (ascii)", message: "ascii", bytes: [0x61, 0x73, 0x63, 0x69, 0x69], }, { label: "string (latin1)", message: "latin1-©", bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], }, { label: "string (utf-8)", message: "utf8-😶", bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], }, ] as const; const buffers = [ { label: "Uint8Array (utf-8)", message: new TextEncoder().encode("utf8-🙂"), bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], }, { label: "ArrayBuffer (utf-8)", message: new TextEncoder().encode("utf8-🙃").buffer, bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], }, { label: "Buffer (utf-8)", message: Buffer.from("utf8-🤩"), bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], }, ] as const; const messages = [...strings, ...buffers] as const; const binaryTypes = [ { label: "nodebuffer", type: Buffer, }, { label: "arraybuffer", type: ArrayBuffer, }, ] as const; let servers: Subprocess[] = []; let clients: WebSocket[] = []; function cleanUp() { for (const client of clients) { client.terminate(); } for (const server of servers) { server.kill(); } } beforeEach(cleanUp); afterEach(cleanUp); describe("WebSocket", () => { test("url", (ws, done) => { expect(ws.url).toStartWith("ws://"); done(); }); test("readyState", (ws, done) => { expect(ws.readyState).toBe(WebSocket.CONNECTING); ws.on("open", () => { expect(ws.readyState).toBe(WebSocket.OPEN); ws.close(); }); ws.on("close", () => { expect(ws.readyState).toBe(WebSocket.CLOSED); done(); }); }); describe("binaryType", () => { test("(default)", (ws, done) => { expect(ws.binaryType).toBe("nodebuffer"); done(); }); test("(invalid)", (ws, done) => { try { // @ts-expect-error ws.binaryType = "invalid"; done(new Error("Expected an error")); } catch { done(); } }); for (const { label, type } of binaryTypes) { test(label, (ws, done) => { ws.binaryType = label; ws.on("open", () => { expect(ws.binaryType).toBe(label); ws.send(new Uint8Array(1)); }); ws.on("message", (data, isBinary) => { expect(data).toBeInstanceOf(type); expect(isBinary).toBeTrue(); ws.ping(); }); ws.on("ping", data => { expect(data).toBeInstanceOf(type); ws.pong(); }); ws.on("pong", data => { expect(data).toBeInstanceOf(type); done(); }); }); } }); describe("send()", () => { for (const { label, message, bytes } of messages) { test(label, (ws, done) => { ws.on("open", () => { ws.send(message); }); ws.on("message", (data, isBinary) => { if (typeof data === "string") { expect(data).toBe(message); expect(isBinary).toBeFalse(); } else { expect(data).toEqual(Buffer.from(bytes)); expect(isBinary).toBeTrue(); } done(); }); }); } }); describe("ping()", () => { test("(no argument)", (ws, done) => { ws.on("open", () => { ws.ping(); }); ws.on("ping", data => { expect(data).toBeInstanceOf(Buffer); done(); }); }); for (const { label, message, bytes } of messages) { test(label, (ws, done) => { ws.on("open", () => { ws.ping(message); }); ws.on("ping", data => { expect(data).toEqual(Buffer.from(bytes)); done(); }); }); } }); describe("pong()", () => { test("(no argument)", (ws, done) => { ws.on("open", () => { ws.pong(); }); ws.on("pong", data => { expect(data).toBeInstanceOf(Buffer); done(); }); }); for (const { label, message, bytes } of messages) { test(label, (ws, done) => { ws.on("open", () => { ws.pong(message); }); ws.on("pong", data => { expect(data).toEqual(Buffer.from(bytes)); done(); }); }); } }); describe("close()", () => { test("(no arguments)", (ws, done) => { ws.on("open", () => { ws.close(); }); ws.on("close", (code: number, reason: string, wasClean: boolean) => { expect(code).toBe(1000); expect(reason).toBeString(); expect(wasClean).toBeTrue(); done(); }); }); test("(no reason)", (ws, done) => { ws.on("open", () => { ws.close(1001); }); ws.on("close", (code: number, reason: string, wasClean: boolean) => { expect(code).toBe(1001); expect(reason).toBeString(); expect(wasClean).toBeTrue(); done(); }); }); // FIXME: Encoding issue // Expected: "latin1-©" // Received: "latin1-©" /* for (const { label, message } of strings) { test(label, (ws, done) => { ws.on("open", () => { ws.close(1002, message); }); ws.on("close", (code, reason, wasClean) => { expect(code).toBe(1002); expect(reason).toBe(message); expect(wasClean).toBeTrue(); done(); }); }); } */ }); test("terminate()", (ws, done) => { ws.on("open", () => { ws.terminate(); }); ws.on("close", (code: number, reason: string, wasClean: boolean) => { expect(code).toBe(1006); expect(reason).toBeString(); expect(wasClean).toBeFalse(); done(); }); }); test("prototype properties are set correctly", (ws, done) => { expect(ws.CLOSED).toBeDefined(); expect(ws.CLOSING).toBeDefined(); expect(ws.CONNECTING).toBeDefined(); expect(ws.OPEN).toBeDefined(); done(); }); it("sets static properties correctly", () => { expect(WebSocket.CLOSED).toBeDefined(); expect(WebSocket.CLOSING).toBeDefined(); expect(WebSocket.CONNECTING).toBeDefined(); expect(WebSocket.OPEN).toBeDefined(); }); }); describe("WebSocketServer", () => { it("sets websocket prototype properties correctly", async () => { const wss = new WebSocketServer({ port: 0 }); const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { try { expect(ws.CLOSED).toBeDefined(); expect(ws.CLOSING).toBeDefined(); expect(ws.CONNECTING).toBeDefined(); expect(ws.OPEN).toBeDefined(); resolve(); } catch (err) { reject(err); } finally { wss.close(); ws.close(); } }); new WebSocket("ws://localhost:" + wss.address().port); await promise; }); it("sockets can be terminated", async () => { const wss = new WebSocketServer({ port: 0 }); const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { ws.on("close", () => { resolve(); }); try { ws.terminate(); } catch (err) { reject(err); } }); new WebSocket("ws://localhost:" + wss.address().port); await promise; }); }); describe("Server", () => { it("sets websocket prototype properties correctly", async () => { const wss = new Server({ port: 0 }); const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { try { expect(ws.CLOSED).toBeDefined(); expect(ws.CLOSING).toBeDefined(); expect(ws.CONNECTING).toBeDefined(); expect(ws.OPEN).toBeDefined(); resolve(); } catch (err) { reject(err); } finally { wss.close(); ws.close(); } }); new WebSocket("ws://localhost:" + wss.address().port); await promise; }); }); it("isBinary", async () => { const wss = new WebSocketServer({ port: 0 }); let isDone = false; const { resolve, reject, promise } = Promise.withResolvers(); wss.on("connection", ws => { ws.on("message", (data, isBinary) => { if (isDone) { expect(isBinary).toBeTrue(); wss.close(); ws.close(); resolve(); return; } expect(isBinary).toBeFalse(); isDone = true; }); ws.on("error", reject); }); const ws = new WebSocket("ws://localhost:" + wss.address().port); ws.on("open", function open() { ws.send("hello"); ws.send(Buffer.from([1, 2, 3])); }); await promise; }); it("onmessage", done => { const wss = new WebSocketServer({ port: 0 }); wss.on("connection", ws => { ws.onmessage = e => { expect(e.data).toEqual(Buffer.from("hello")); done(); wss.close(); }; }); const ws = new WebSocket("ws://localhost:" + wss.address().port); ws.onopen = () => { ws.send("hello"); }; }); // https://github.com/oven-sh/bun/issues/7896 it("close event", async () => { const via = [ function once(ws) { const { promise, resolve, reject } = Promise.withResolvers(); ws.once("close", () => resolve()); return promise; }, function on(ws) { const { promise, resolve, reject } = Promise.withResolvers(); ws.on("close", () => resolve()); return promise; }, function addEventListener(ws) { const { promise, resolve, reject } = Promise.withResolvers(); ws.addEventListener("close", () => resolve()); return promise; }, function onclose(ws) { const { promise, resolve, reject } = Promise.withResolvers(); // @ts-expect-error ws.onclose = () => resolve(); return promise; }, ]; const wss = new WebSocketServer({ port: 0 }); wss.on("connection", ws => { ws.onmessage = e => { expect(e.data).toEqual(Buffer.from("hello")); setTimeout(() => ws.close(), 10); }; }); await Promise.all( via.map(async version => { const ws = new WebSocket("ws://localhost:" + wss.address().port); ws.onopen = () => { ws.send("hello"); }; return version(ws); }), ); wss.close(); }); // https://github.com/oven-sh/bun/issues/14345 it("WebSocket finishRequest mocked", async () => { const { promise, resolve, reject } = Promise.withResolvers(); using server = Bun.serve({ port: 0, websocket: { open() {}, close() {}, message() {}, }, fetch(req, server) { expect(req.headers.get("X-Custom-Header")).toBe("CustomValue"); expect(req.headers.get("Another-Header")).toBe("AnotherValue"); return server.upgrade(req); }, }); const customHeaders = { "X-Custom-Header": "CustomValue", "Another-Header": "AnotherValue", }; const ws = new WebSocket(server.url, [], { finishRequest: req => { Object.entries(customHeaders).forEach(([key, value]) => { req.setHeader(key, value); }); req.end(); }, }); ws.once("open", () => { ws.send("Hello"); ws.close(); resolve(); }); await promise; }); function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void) => void, timeout?: number) { it( label, testDone => { let isDone = false; const done = (err?: unknown) => { if (!isDone) { isDone = true; testDone(err); } }; listen() .then(url => { const ws = new WebSocket(url); clients.push(ws); fn(ws, done); }) .catch(done); }, { timeout: timeout ?? 1000 }, ); } async function listen(): Promise { const pathname = path.resolve(import.meta.dir, "../../web/websocket/websocket-server-echo.mjs"); const { promise, resolve, reject } = Promise.withResolvers(); const server = spawn({ cmd: [bunExe(), pathname], cwd: import.meta.dir, env: bunEnv, stdout: "inherit", stderr: "inherit", serialization: "json", ipc(message) { const url = message?.href; if (url) { try { resolve(new URL(url)); } catch (error) { reject(error); } } }, }); servers.push(server); return await promise; } it("WebSocketServer should handle backpressure", async () => { const { promise, resolve, reject } = Promise.withResolvers(); const PAYLOAD_SIZE = 64 * 1024; const ITERATIONS = 10; const payload = Buffer.alloc(PAYLOAD_SIZE, "a"); let received = 0; const wss = new WebSocketServer({ port: 0 }); wss.on("connection", function connection(ws) { ws.onerror = reject; let i = 0; async function commit(err?: Error) { if (err) { reject(err); return; } await Bun.sleep(10); if (i < ITERATIONS) { i++; ws.send(payload, commit); } else { ws.close(); } } commit(undefined); }); try { const ws = new WebSocket("ws://localhost:" + wss.address().port); ws.onmessage = event => { received += event.data.byteLength; }; ws.onclose = resolve; ws.onerror = reject; await promise; expect(received).toBe(PAYLOAD_SIZE * ITERATIONS); } finally { wss.close(); } }); it("should abort incorrect WebSocket handshake", async () => { const { promise, resolve, reject } = Promise.withResolvers(); const wss = new WebSocketServer({ port: 0 }); let connectionAttempted = false; let testResolved = false; wss.on("connection", () => { connectionAttempted = true; if (!testResolved) { testResolved = true; reject(new Error("Connection should not have been established")); } }); wss.on("error", error => { // Server errors are expected for invalid handshakes console.log("Server error (expected):", error.message); }); try { const net = require("node:net"); const port = (wss.address() as any).port; const socket = net.createConnection(port, "localhost"); socket.on("connect", () => { // Send an invalid WebSocket handshake request (invalid Sec-WebSocket-Key) const invalidRequest = [ "GET / HTTP/1.1", "Host: localhost", "Connection: Upgrade", "Upgrade: websocket", "Sec-WebSocket-Key: invalid-key", // Invalid key format "Sec-WebSocket-Version: 13", "", "", ].join("\r\n"); socket.write(invalidRequest); }); let responseReceived = false; socket.on("data", data => { const response = data.toString(); responseReceived = true; // Should receive a 400 Bad Request response for invalid handshake if (response.includes("400") && !testResolved) { testResolved = true; resolve(); } else if (!testResolved) { testResolved = true; reject(new Error(`Expected 400 response, got: ${response}`)); } socket.end(); }); socket.on("error", error => { // Connection errors are also acceptable as the server may close the connection if (!testResolved) { testResolved = true; resolve(); } }); socket.on("close", () => { // If we reach here without getting a proper response and connection wasn't attempted, // the server properly rejected the invalid handshake if (!responseReceived && !connectionAttempted && !testResolved) { testResolved = true; resolve(); } }); await promise; } finally { wss.close(); } expect(connectionAttempted).toBeFalse(); expect(testResolved).toBeTrue(); }); it("Server should be able to send empty pings", async () => { // WebSocket frame creation function with masking function createWebSocketFrame(message: string) { const messageBuffer = Buffer.from(message); const frame = []; // Add FIN bit and opcode for text frame frame.push(0x81); // Payload length if (messageBuffer.length < 126) { frame.push(messageBuffer.length | 0x80); // Mask bit set } else if (messageBuffer.length < 65536) { frame.push(126 | 0x80); // Mask bit set frame.push((messageBuffer.length >> 8) & 0xff); frame.push(messageBuffer.length & 0xff); } else { frame.push(127 | 0x80); // Mask bit set for (let i = 7; i >= 0; i--) { frame.push((messageBuffer.length >> (i * 8)) & 0xff); } } // Generate masking key const maskingKey = crypto.randomBytes(4); frame.push(...maskingKey); // Mask the payload const maskedPayload = Buffer.alloc(messageBuffer.length); for (let i = 0; i < messageBuffer.length; i++) { maskedPayload[i] = messageBuffer[i] ^ maskingKey[i % 4]; } // Combine frame header and masked payload return Buffer.concat([Buffer.from(frame), maskedPayload]); } async function checkPing(helloMessage: string, pingMessage?: string) { const { promise, resolve, reject } = Promise.withResolvers(); const server = new WebSocketServer({ noServer: true }); const httpServer = createServer(); try { server.on("connection", async incoming => { incoming.on("message", value => { try { expect(value.toString()).toBe(helloMessage); if (arguments.length > 1) { incoming.ping(pingMessage); } else { incoming.ping(); } } catch (e) { reject(e); } }); }); httpServer.on("upgrade", async (request, socket, head) => { server.handleUpgrade(request, socket, head, ws => { server.emit("connection", ws, request); }); }); httpServer.listen(0); await once(httpServer, "listening"); const socket = connect({ port: (httpServer.address() as AddressInfo).port, host: "127.0.0.1", }); let upgradeResponse = ""; let state = 0; //connecting socket.on("data", (data: Buffer) => { switch (state) { case 0: { upgradeResponse += data.toString("utf8"); if (upgradeResponse.indexOf("\r\n\r\n") !== -1) { if (upgradeResponse.indexOf("HTTP/1.1 101 Switching Protocols") !== -1) { state = 1; socket.write(createWebSocketFrame(helloMessage)); } else { reject(new Error("Failed to Upgrade WebSockets")); state = 2; socket.end(); } } break; } case 1: { if (data.at(0) === 137) { try { const len = data.at(1) as number; if (len > 0) { const str = data.slice(2, len + 2).toString("utf8"); resolve(str); } else { resolve(""); } } catch (e) { reject(e); } state = 2; socket.end(); break; } reject(new Error("Unexpected data received")); } case 2: { reject(new Error("Connection Closed")); } } }); // Generate a Sec-WebSocket-Key const key = crypto.randomBytes(16).toString("base64"); // Create the WebSocket upgrade request socket.write( [ `GET / HTTP/1.1`, `Host: 127.0.0.1`, `Upgrade: websocket`, `Connection: Upgrade`, `Sec-WebSocket-Key: ${key}`, `Sec-WebSocket-Version: 13`, `\r\n`, ].join("\r\n"), ); return await promise; } finally { httpServer.closeAllConnections(); } } { // test without any payload const pingMessage = await checkPing(""); expect(pingMessage).toBe(""); } { // test with null payload //@ts-ignore const pingMessage = await checkPing("", null); expect(pingMessage).toBe(""); } { // test with undefined payload const pingMessage = await checkPing("", undefined); expect(pingMessage).toBe(""); } { // test with some payload const pingMessage = await checkPing("Hello", "bun"); expect(pingMessage).toBe("bun"); } { // test limits const pingPayload = Buffer.alloc(125, "b").toString(); const pingMessage = await checkPing("Hello, World", pingPayload); expect(pingMessage).toBe(pingPayload); } { // should not be equal because is bigger than 125 bytes const pingPayload = Buffer.alloc(126, "b").toString(); const pingMessage = await checkPing("Hello, World", pingPayload); expect(pingMessage).not.toBe(pingPayload); } });