WS send with callback (#2986)

* WS send with callback

* add opts.compress support

* fmt

* compress is the only option we care

* add ws client options

* only change ws client when using blob

* fmt

* fix errors

* fixup

* fixup

* fmt
This commit is contained in:
Ciro Spaciari
2023-05-21 23:36:42 -03:00
committed by GitHub
parent d90f7c7bf6
commit fd21243efd
3 changed files with 39 additions and 44 deletions

View File

@@ -21,36 +21,28 @@ class BunWebSocket extends globalThis.WebSocket {
return this.#binaryType;
}
set binaryType(type) {
if (type !== "nodebuffer" && type !== "buffer" && type !== "blob" && type !== "arraybuffer") {
throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'");
if (type !== "nodebuffer" && type !== "blob" && type !== "arraybuffer") {
throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'");
}
if (type !== "blob") {
super.binaryType = type;
}
this.#binaryType = type;
}
send(data, opts, cb) {
super.send(data, opts?.compress);
typeof cb === "function" && cb();
}
on(event, callback) {
if (event === "message") {
var handler = ({ message }) => {
var handler = ({ data }) => {
try {
if (typeof message === "string") {
if (this.#binaryType === "arraybuffer") {
message = encoder.encode(message).buffer;
} else if (this.#binaryType === "blob") {
message = new Blob([message], { type: "text/plain" });
} else {
// nodebuffer or buffer
message = Buffer.from(message);
}
} else {
//Uint8Array
if (this.#binaryType === "arraybuffer") {
message = message.buffer;
} else if (this.#binaryType === "blob") {
message = new Blob([message]);
} else {
// nodebuffer or buffer
message = Buffer.from(message);
}
if (this.#binaryType == "blob") {
data = new Blob([data]);
}
callback(message);
callback(data);
} catch (e) {
globalThis.reportError(e);
}
@@ -349,13 +341,8 @@ class BunWebSocketMocked extends EventEmitter {
this.#url = url;
this.#bufferedAmount = 0;
binaryType = binaryType || "arraybuffer";
if (
binaryType !== "nodebuffer" &&
binaryType !== "buffer" &&
binaryType !== "blob" &&
binaryType !== "arraybuffer"
) {
throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'");
if (binaryType !== "nodebuffer" && binaryType !== "blob" && binaryType !== "arraybuffer") {
throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'");
}
this.#binaryType = binaryType;
this.#protocol = protocol;
@@ -383,7 +370,7 @@ class BunWebSocketMocked extends EventEmitter {
} else if (this.#binaryType === "blob") {
message = new Blob([message], { type: "text/plain" });
} else {
// nodebuffer or buffer
// nodebuffer
message = Buffer.from(message);
}
} else {
@@ -393,10 +380,11 @@ class BunWebSocketMocked extends EventEmitter {
} else if (this.#binaryType === "blob") {
message = new Blob([message]);
} else {
// nodebuffer or buffer
// nodebuffer
message = Buffer.from(message);
}
}
this.emit("message", message);
}
@@ -418,28 +406,35 @@ class BunWebSocketMocked extends EventEmitter {
#drain(ws) {
const chunk = this.#enquedMessages[0];
if (chunk) {
const written = ws.send(chunk);
const [data, compress, cb] = chunk;
const written = ws.send(data, compress);
if (written == -1) {
// backpressure wait until next drain event
return;
}
typeof cb === "function" && cb();
this.#bufferedAmount -= chunk.length;
this.#enquedMessages.shift();
}
}
send(data) {
send(data, opts, cb) {
if (this.#state === 1) {
const written = this.#ws.send(data);
const compress = opts?.compress;
const written = this.#ws.send(data, compress);
if (written == -1) {
// backpressure
this.#enquedMessages.push(data);
this.#enquedMessages.push([data, compress, cb]);
this.#bufferedAmount += data.length;
return;
}
typeof cb === "function" && cb();
} else if (this.#state === 0) {
// not connected yet
this.#enquedMessages.push(data);
this.#enquedMessages.push([data, opts?.compress, cb]);
this.#bufferedAmount += data.length;
}
}
@@ -455,8 +450,8 @@ class BunWebSocketMocked extends EventEmitter {
}
set binaryType(type) {
if (type !== "nodebuffer" && type !== "buffer" && type !== "blob" && type !== "arraybuffer") {
throw new TypeError("binaryType must be either 'blob', 'arraybuffer', 'nodebuffer' or 'buffer'");
if (type !== "nodebuffer" && type !== "blob" && type !== "arraybuffer") {
throw new TypeError("binaryType must be either 'blob', 'arraybuffer' or 'nodebuffer'");
}
this.#binaryType = type;
}
@@ -775,7 +770,7 @@ class Server extends EventEmitter {
? this.options.handleProtocols(protocols, request)
: protocols.values().next().value;
}
const ws = new BunWebSocketMocked(request.url, protocol, extensions, "arraybuffer");
const ws = new BunWebSocketMocked(request.url, protocol, extensions, "nodebuffer");
const headers = ["HTTP/1.1 101 Switching Protocols", "Upgrade: websocket", "Connection: Upgrade"];
this.emit("headers", headers, request);

View File

@@ -1,6 +1,6 @@
import { test } from "bun:test";
test.todo("todo 1")
test.todo("todo 1");
test.todo("todo 2", () => {
throw new Error("this error is shown");
})
});

View File

@@ -1,9 +1,9 @@
import { test } from "bun:test";
test.todo("todo 1")
test.todo("todo 1");
test.todo("todo 2", () => {
throw new Error("this error is shown");
})
});
test.todo("todo 3", () => {
// passes
});