From e2231f15e8a11474e4b97b094e144a1983d7bfb0 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Wed, 4 Jan 2023 04:06:24 -0800 Subject: [PATCH] Support non-classes in node:net (#1712) * Support non-classes * Update net.exports.js * Make it less observable * Update net.exports.js Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> --- src/bun.js/net.exports.js | 442 ++++++++++++++++++++------------------ 1 file changed, 230 insertions(+), 212 deletions(-) diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js index 183fa86e1e..967d01a7f9 100644 --- a/src/bun.js/net.exports.js +++ b/src/bun.js/net.exports.js @@ -57,246 +57,264 @@ const { Bun, createFIFO } = import.meta.primordials; const { connect: bunConnect } = Bun; const { Duplex } = import.meta.require("node:stream"); -export class Socket extends Duplex { - static #Handlers = { - close: Socket.#Close, - connectError(socket, error) { - const self = socket.data; - self.emit("error", error); +export const Socket = (function (InternalSocket) { + return Object.defineProperty( + function Socket(options) { + return new InternalSocket(options); }, - data(socket, buffer) { + Symbol.hasInstance, + { + value(instance) { + return instance instanceof InternalSocket; + }, + }, + ); +})( + class Socket extends Duplex { + static #Handlers = { + close: Socket.#Close, + connectError(socket, error) { + const self = socket.data; + self.emit("error", error); + }, + data(socket, buffer) { + const self = socket.data; + self.bytesRead += buffer.length; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(buffer)) return; + } + queue.push(buffer); + }, + drain: Socket.#Drain, + end: Socket.#Close, + error(socket, error) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + self.#writeCallback = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + socket.timeout(self.timeout); + self.#socket = socket; + self.connecting = false; + self.emit("connect"); + Socket.#Drain(socket); + }, + timeout() { + const self = socket.data; + self.emit("timeout"); + }, + }; + + static #Close(socket) { const self = socket.data; - self.bytesRead += buffer.length; + if (self.#closed) return; + self.#closed = true; const queue = self.#readQueue; if (queue.isEmpty()) { - if (self.push(buffer)) return; + if (self.push(null)) return; } - queue.push(buffer); - }, - drain: Socket.#Drain, - end: Socket.#Close, - error(socket, error) { + queue.push(null); + } + + static #Drain(socket) { const self = socket.data; const callback = self.#writeCallback; if (callback) { - self.#writeCallback = null; - callback(error); - } - self.emit("error", error); - }, - open(socket) { - const self = socket.data; - socket.timeout(self.timeout); - self.#socket = socket; - self.connecting = false; - self.emit("connect"); - Socket.#Drain(socket); - }, - timeout() { - const self = socket.data; - self.emit("timeout"); - }, - }; - - static #Close(socket) { - const self = socket.data; - if (self.#closed) return; - self.#closed = true; - const queue = self.#readQueue; - if (queue.isEmpty()) { - if (self.push(null)) return; - } - queue.push(null); - } - - static #Drain(socket) { - const self = socket.data; - const callback = self.#writeCallback; - if (callback) { - const chunk = self.#writeChunk; - const written = socket.write(chunk); - self.bytesWritten += written; - if (written < chunk.length) { - self.#writeChunk = chunk.slice(written); - } else { - self.#writeCallback = null; - self.#writeChunk = null; - callback(null); + const chunk = self.#writeChunk; + const written = socket.write(chunk); + self.bytesWritten += written; + if (written < chunk.length) { + self.#writeChunk = chunk.slice(written); + } else { + self.#writeCallback = null; + self.#writeChunk = null; + callback(null); + } } } - } - bytesRead = 0; - bytesWritten = 0; - #closed = false; - connecting = false; - localAddress = "127.0.0.1"; - #readQueue = createFIFO(); - remotePort; - #socket; - timeout = 0; - #writeCallback; - #writeChunk; + bytesRead = 0; + bytesWritten = 0; + #closed = false; + connecting = false; + localAddress = "127.0.0.1"; + #readQueue = createFIFO(); + remotePort; + #socket; + timeout = 0; + #writeCallback; + #writeChunk; - constructor(options) { - super({ - allowHalfOpen: options?.allowHalfOpen || false, - readable: true, - writable: true, - }); - options?.signal?.once("abort", () => this.destroy()); - this.once("connect", () => this.emit("ready")); - // TODO support `options.fd` - } - - address() { - return { - address: this.localAddress, - family: this.localFamily, - port: this.localPort, - }; - } - - get bufferSize() { - return this.writableLength; - } - - connect(port, host, connectListener) { - // TODO support IPC sockets - if (typeof host == "function") { - connectListener = host; - host = undefined; + constructor(options) { + super({ + allowHalfOpen: options?.allowHalfOpen || false, + readable: true, + writable: true, + }); + options?.signal?.once("abort", () => this.destroy()); + this.once("connect", () => this.emit("ready")); + // TODO support `options.fd` } - if (typeof port == "object") { - var { - port, - host, - // TODOs - localAddress, - localPort, - family, - hints, - lookup, - noDelay, - keepAlive, - keepAliveInitialDelay, - } = port; + + address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; } - this.connecting = true; - this.remotePort = port; - if (connectListener) this.on("connect", connectListener); - bunConnect({ - data: this, - hostname: host || "localhost", - port: port, - socket: Socket.#Handlers, - }); - return this; - } - _destroy(err, callback) { - this.#socket?.end(); - callback(err); - } - - _final(callback) { - this.#socket.end(); - callback(); - } - - get localAddress() { - return "127.0.0.1"; - } - - get localFamily() { - return "IPv4"; - } - - get localPort() { - return this.#socket?.localPort; - } - - get pending() { - return this.connecting; - } - - _read(size) { - const queue = this.#readQueue; - let chunk; - while (chunk = queue.peek()) { - if (!this.push(chunk)) break; - queue.shift(); + get bufferSize() { + return this.writableLength; } - } - get readyState() { - if (this.connecting) return "opening"; - if (this.readable) { - return this.writable ? "open" : "readOnly"; - } else { - return this.writable ? "writeOnly" : "closed"; + connect(port, host, connectListener) { + // TODO support IPC sockets + if (typeof host == "function") { + connectListener = host; + host = undefined; + } + if (typeof port == "object") { + var { + port, + host, + // TODOs + localAddress, + localPort, + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + } = port; + } + this.connecting = true; + this.remotePort = port; + if (connectListener) this.on("connect", connectListener); + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + }); + return this; } - } - ref() { - this.#socket?.ref(); - } + _destroy(err, callback) { + this.#socket?.end(); + callback(err); + } - get remoteAddress() { - return this.#socket.remoteAddress; - } - - get remoteFamily() { - return "IPv4"; - } - - resetAndDestroy() { - this.#socket?.end(); - } - - setKeepAlive(enable = false, initialDelay = 0) { - // TODO - } - - setNoDelay(noDelay = true) { - // TODO - } - - setTimeout(timeout, callback) { - this.#socket?.timeout(timeout); - this.timeout = timeout; - if (callback) this.once("timeout", callback); - return this; - } - - unref() { - this.#socket?.unref(); - } - - _write(chunk, encoding, callback) { - if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding); - var written = this.#socket?.write(chunk); - if (written == chunk.length) { + _final(callback) { + this.#socket.end(); callback(); - } else if (this.#writeCallback) { - callback(new Error("overlapping _write()")); - } else { - if (written > 0) chunk = chunk.slice(written); - this.#writeCallback = callback; - this.#writeChunk = chunk; } - } -} + + get localAddress() { + return "127.0.0.1"; + } + + get localFamily() { + return "IPv4"; + } + + get localPort() { + return this.#socket?.localPort; + } + + get pending() { + return this.connecting; + } + + _read(size) { + const queue = this.#readQueue; + let chunk; + while ((chunk = queue.peek())) { + if (!this.push(chunk)) break; + queue.shift(); + } + } + + get readyState() { + if (this.connecting) return "opening"; + if (this.readable) { + return this.writable ? "open" : "readOnly"; + } else { + return this.writable ? "writeOnly" : "closed"; + } + } + + ref() { + this.#socket?.ref(); + } + + get remoteAddress() { + return this.#socket.remoteAddress; + } + + get remoteFamily() { + return "IPv4"; + } + + resetAndDestroy() { + this.#socket?.end(); + } + + setKeepAlive(enable = false, initialDelay = 0) { + // TODO + } + + setNoDelay(noDelay = true) { + // TODO + } + + setTimeout(timeout, callback) { + this.#socket?.timeout(timeout); + this.timeout = timeout; + if (callback) this.once("timeout", callback); + return this; + } + + unref() { + this.#socket?.unref(); + } + + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "utf8") + chunk = Buffer.from(chunk, encoding); + var written = this.#socket?.write(chunk); + if (written == chunk.length) { + callback(); + } else if (this.#writeCallback) { + callback(new Error("overlapping _write()")); + } else { + if (written > 0) chunk = chunk.slice(written); + this.#writeCallback = callback; + this.#writeChunk = chunk; + } + } + }, +); export function createConnection(port, host, connectListener) { if (typeof host == "function") { connectListener = host; host = undefined; } - var options = typeof port == "object" ? port : { - host: host, - port: port, - }; + var options = + typeof port == "object" + ? port + : { + host: host, + port: port, + }; return new Socket(options).connect(options, connectListener); }