* Fixes #3202

* Update ws.js

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2024-03-16 17:47:49 -07:00
committed by GitHub
parent 1879c499b1
commit 7c9f076385
3 changed files with 114 additions and 20 deletions

View File

@@ -533,6 +533,12 @@ Server.prototype.listen = function (port, host, backlog, onListen) {
drain(ws) {
ws.data.drain(ws);
},
ping(ws, data) {
ws.data.ping(ws, data);
},
pong(ws, data) {
ws.data.pong(ws, data);
},
},
// Be very careful not to access (web) Request object
// properties:

View File

@@ -198,6 +198,10 @@ class BunWebSocket extends EventEmitter {
}
ping(data, mask, cb) {
if (this.#ws.readyState === 0) {
throw new Error("WebSocket is not open: readyState 0 (CONNECTING)");
}
if (typeof data === "function") {
cb = data;
data = mask = undefined;
@@ -211,7 +215,11 @@ class BunWebSocket extends EventEmitter {
try {
this.#ws.ping(data);
} catch (error) {
typeof cb === "function" && cb(error);
if (typeof cb === "function") {
cb(error);
return;
}
this.emit("error", error);
return;
}
@@ -219,6 +227,10 @@ class BunWebSocket extends EventEmitter {
}
pong(data, mask, cb) {
if (this.#ws.readyState === 0) {
throw new Error("WebSocket is not open: readyState 0 (CONNECTING)");
}
if (typeof data === "function") {
cb = data;
data = mask = undefined;
@@ -232,7 +244,11 @@ class BunWebSocket extends EventEmitter {
try {
this.#ws.pong(data);
} catch (error) {
typeof cb === "function" && cb(error);
if (typeof cb === "function") {
cb(error);
return;
}
this.emit("error", error);
return;
}
@@ -526,15 +542,29 @@ class BunWebSocketMocked extends EventEmitter {
const open = this.#open.bind(this);
const close = this.#close.bind(this);
const drain = this.#drain.bind(this);
const ping = this.#ping.bind(this);
const pong = this.#pong.bind(this);
this[kBunInternals] = {
message, // a message is received
open, // a socket is opened
close, // a socket is closed
drain, // the socket is ready to receive more data
ping, // a ping is received
pong, // a pong is received
};
}
#ping(ws, data) {
this.#ws = ws;
this.emit("ping", data);
}
#pong(ws, data) {
this.#ws = ws;
this.emit("pong", data);
}
#message(ws, message) {
this.#ws = ws;
@@ -579,22 +609,72 @@ class BunWebSocketMocked extends EventEmitter {
}
#drain(ws) {
const chunk = this.#enquedMessages[0];
if (chunk) {
let chunk;
while ((chunk = this.#enquedMessages[0]) && this.#state === 1) {
const [data, compress, cb] = chunk;
const written = ws.send(data, compress);
if (written == -1) {
if (written < 1) {
// backpressure wait until next drain event
return;
}
typeof cb === "function" && cb();
this.#bufferedAmount -= chunk.length;
this.#enquedMessages.shift();
typeof cb === "function" && queueMicrotask(cb);
}
}
ping(data, mask, cb) {
if (this.#state === 0) {
throw new Error("WebSocket is not open: readyState 0 (CONNECTING)");
}
if (typeof data === "function") {
cb = data;
data = mask = undefined;
} else if (typeof mask === "function") {
cb = mask;
mask = undefined;
}
if (typeof data === "number") data = data.toString();
try {
this.#ws.ping(data);
} catch (error) {
typeof cb === "function" && cb(error);
return;
}
typeof cb === "function" && cb();
}
pong(data, mask, cb) {
if (this.#state === 0) {
throw new Error("WebSocket is not open: readyState 0 (CONNECTING)");
}
if (typeof data === "function") {
cb = data;
data = mask = undefined;
} else if (typeof mask === "function") {
cb = mask;
mask = undefined;
}
if (typeof data === "number") data = data.toString();
try {
this.#ws.pong(data);
} catch (error) {
typeof cb === "function" && cb(error);
return;
}
typeof cb === "function" && cb();
}
send(data, opts, cb) {
if (this.#state === 1) {
const compress = opts?.compress;
@@ -606,7 +686,7 @@ class BunWebSocketMocked extends EventEmitter {
return;
}
typeof cb === "function" && cb();
typeof cb === "function" && process.nextTick(cb);
} else if (this.#state === 0) {
// not connected yet
this.#enquedMessages.push([data, opts?.compress, cb]);

View File

@@ -1,9 +1,9 @@
import { describe, it, expect, beforeEach, afterEach } from "bun:test";
import type { Subprocess } from "bun";
import { spawn } from "bun";
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import { bunEnv, bunExe, nodeExe } from "harness";
import { Server, WebSocket, WebSocketServer } from "ws";
import path from "node:path";
import { Server, WebSocket, WebSocketServer } from "ws";
const strings = [
{
@@ -254,8 +254,9 @@ describe("WebSocket", () => {
});
describe("WebSocketServer", () => {
it("sets websocket prototype properties correctly", done => {
it("sets websocket prototype properties correctly", async () => {
const wss = new WebSocketServer({ port: 0 });
const { resolve, reject, promise } = Promise.withResolvers();
wss.on("connection", ws => {
try {
@@ -263,9 +264,9 @@ describe("WebSocketServer", () => {
expect(ws.CLOSING).toBeDefined();
expect(ws.CONNECTING).toBeDefined();
expect(ws.OPEN).toBeDefined();
return done();
resolve();
} catch (err) {
done(err);
reject(err);
} finally {
wss.close();
ws.close();
@@ -273,12 +274,14 @@ describe("WebSocketServer", () => {
});
new WebSocket("ws://localhost:" + wss.address().port);
await promise;
});
});
describe("Server", () => {
it("sets websocket prototype properties correctly", done => {
it("sets websocket prototype properties correctly", async () => {
const wss = new Server({ port: 0 });
const { resolve, reject, promise } = Promise.withResolvers();
wss.on("connection", ws => {
try {
@@ -286,9 +289,9 @@ describe("Server", () => {
expect(ws.CLOSING).toBeDefined();
expect(ws.CONNECTING).toBeDefined();
expect(ws.OPEN).toBeDefined();
return done();
resolve();
} catch (err) {
done(err);
reject(err);
} finally {
wss.close();
ws.close();
@@ -296,24 +299,27 @@ describe("Server", () => {
});
new WebSocket("ws://localhost:" + wss.address().port);
await promise;
});
});
it("isBinary", done => {
it("isBinary", async () => {
const wss = new WebSocketServer({ port: 0 });
let isDone = false;
const { resolve, reject, promise } = Promise.withResolvers();
wss.on("connection", ws => {
ws.on("message", (data, isBinary) => {
if (isDone) {
expect(isBinary).toBeTrue();
wss.close();
ws.close();
done();
resolve();
return;
}
expect(isBinary).toBeFalse();
isDone = true;
});
ws.on("error", reject);
});
const ws = new WebSocket("ws://localhost:" + wss.address().port);
@@ -321,6 +327,8 @@ it("isBinary", done => {
ws.send("hello");
ws.send(Buffer.from([1, 2, 3]));
});
await promise;
});
it("onmessage", done => {
@@ -364,10 +372,10 @@ function test(label: string, fn: (ws: WebSocket, done: (err?: unknown) => void)
async function listen(): Promise<URL> {
const pathname = path.resolve(import.meta.dir, "../../web/websocket/websocket-server-echo.mjs");
const server = spawn({
cmd: [nodeExe() ?? bunExe(), pathname],
cmd: [bunExe(), pathname],
cwd: import.meta.dir,
env: bunEnv,
stderr: "ignore",
stderr: "inherit",
stdout: "pipe",
});
servers.push(server);