Support non-classes in node:net (#1712)

* Support non-classes

* Update net.exports.js

* Make it less observable

* Update net.exports.js

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2023-01-04 04:06:24 -08:00
committed by GitHub
parent a19c7b4304
commit e2231f15e8

View File

@@ -57,246 +57,264 @@ const { Bun, createFIFO } = import.meta.primordials;
const { connect: bunConnect } = Bun;
const { Duplex } = import.meta.require("node:stream");
export class Socket extends Duplex {
static #Handlers = {
close: Socket.#Close,
connectError(socket, error) {
const self = socket.data;
self.emit("error", error);
export const Socket = (function (InternalSocket) {
return Object.defineProperty(
function Socket(options) {
return new InternalSocket(options);
},
data(socket, buffer) {
Symbol.hasInstance,
{
value(instance) {
return instance instanceof InternalSocket;
},
},
);
})(
class Socket extends Duplex {
static #Handlers = {
close: Socket.#Close,
connectError(socket, error) {
const self = socket.data;
self.emit("error", error);
},
data(socket, buffer) {
const self = socket.data;
self.bytesRead += buffer.length;
const queue = self.#readQueue;
if (queue.isEmpty()) {
if (self.push(buffer)) return;
}
queue.push(buffer);
},
drain: Socket.#Drain,
end: Socket.#Close,
error(socket, error) {
const self = socket.data;
const callback = self.#writeCallback;
if (callback) {
self.#writeCallback = null;
callback(error);
}
self.emit("error", error);
},
open(socket) {
const self = socket.data;
socket.timeout(self.timeout);
self.#socket = socket;
self.connecting = false;
self.emit("connect");
Socket.#Drain(socket);
},
timeout() {
const self = socket.data;
self.emit("timeout");
},
};
static #Close(socket) {
const self = socket.data;
self.bytesRead += buffer.length;
if (self.#closed) return;
self.#closed = true;
const queue = self.#readQueue;
if (queue.isEmpty()) {
if (self.push(buffer)) return;
if (self.push(null)) return;
}
queue.push(buffer);
},
drain: Socket.#Drain,
end: Socket.#Close,
error(socket, error) {
queue.push(null);
}
static #Drain(socket) {
const self = socket.data;
const callback = self.#writeCallback;
if (callback) {
self.#writeCallback = null;
callback(error);
}
self.emit("error", error);
},
open(socket) {
const self = socket.data;
socket.timeout(self.timeout);
self.#socket = socket;
self.connecting = false;
self.emit("connect");
Socket.#Drain(socket);
},
timeout() {
const self = socket.data;
self.emit("timeout");
},
};
static #Close(socket) {
const self = socket.data;
if (self.#closed) return;
self.#closed = true;
const queue = self.#readQueue;
if (queue.isEmpty()) {
if (self.push(null)) return;
}
queue.push(null);
}
static #Drain(socket) {
const self = socket.data;
const callback = self.#writeCallback;
if (callback) {
const chunk = self.#writeChunk;
const written = socket.write(chunk);
self.bytesWritten += written;
if (written < chunk.length) {
self.#writeChunk = chunk.slice(written);
} else {
self.#writeCallback = null;
self.#writeChunk = null;
callback(null);
const chunk = self.#writeChunk;
const written = socket.write(chunk);
self.bytesWritten += written;
if (written < chunk.length) {
self.#writeChunk = chunk.slice(written);
} else {
self.#writeCallback = null;
self.#writeChunk = null;
callback(null);
}
}
}
}
bytesRead = 0;
bytesWritten = 0;
#closed = false;
connecting = false;
localAddress = "127.0.0.1";
#readQueue = createFIFO();
remotePort;
#socket;
timeout = 0;
#writeCallback;
#writeChunk;
bytesRead = 0;
bytesWritten = 0;
#closed = false;
connecting = false;
localAddress = "127.0.0.1";
#readQueue = createFIFO();
remotePort;
#socket;
timeout = 0;
#writeCallback;
#writeChunk;
constructor(options) {
super({
allowHalfOpen: options?.allowHalfOpen || false,
readable: true,
writable: true,
});
options?.signal?.once("abort", () => this.destroy());
this.once("connect", () => this.emit("ready"));
// TODO support `options.fd`
}
address() {
return {
address: this.localAddress,
family: this.localFamily,
port: this.localPort,
};
}
get bufferSize() {
return this.writableLength;
}
connect(port, host, connectListener) {
// TODO support IPC sockets
if (typeof host == "function") {
connectListener = host;
host = undefined;
constructor(options) {
super({
allowHalfOpen: options?.allowHalfOpen || false,
readable: true,
writable: true,
});
options?.signal?.once("abort", () => this.destroy());
this.once("connect", () => this.emit("ready"));
// TODO support `options.fd`
}
if (typeof port == "object") {
var {
port,
host,
// TODOs
localAddress,
localPort,
family,
hints,
lookup,
noDelay,
keepAlive,
keepAliveInitialDelay,
} = port;
address() {
return {
address: this.localAddress,
family: this.localFamily,
port: this.localPort,
};
}
this.connecting = true;
this.remotePort = port;
if (connectListener) this.on("connect", connectListener);
bunConnect({
data: this,
hostname: host || "localhost",
port: port,
socket: Socket.#Handlers,
});
return this;
}
_destroy(err, callback) {
this.#socket?.end();
callback(err);
}
_final(callback) {
this.#socket.end();
callback();
}
get localAddress() {
return "127.0.0.1";
}
get localFamily() {
return "IPv4";
}
get localPort() {
return this.#socket?.localPort;
}
get pending() {
return this.connecting;
}
_read(size) {
const queue = this.#readQueue;
let chunk;
while (chunk = queue.peek()) {
if (!this.push(chunk)) break;
queue.shift();
get bufferSize() {
return this.writableLength;
}
}
get readyState() {
if (this.connecting) return "opening";
if (this.readable) {
return this.writable ? "open" : "readOnly";
} else {
return this.writable ? "writeOnly" : "closed";
connect(port, host, connectListener) {
// TODO support IPC sockets
if (typeof host == "function") {
connectListener = host;
host = undefined;
}
if (typeof port == "object") {
var {
port,
host,
// TODOs
localAddress,
localPort,
family,
hints,
lookup,
noDelay,
keepAlive,
keepAliveInitialDelay,
} = port;
}
this.connecting = true;
this.remotePort = port;
if (connectListener) this.on("connect", connectListener);
bunConnect({
data: this,
hostname: host || "localhost",
port: port,
socket: Socket.#Handlers,
});
return this;
}
}
ref() {
this.#socket?.ref();
}
_destroy(err, callback) {
this.#socket?.end();
callback(err);
}
get remoteAddress() {
return this.#socket.remoteAddress;
}
get remoteFamily() {
return "IPv4";
}
resetAndDestroy() {
this.#socket?.end();
}
setKeepAlive(enable = false, initialDelay = 0) {
// TODO
}
setNoDelay(noDelay = true) {
// TODO
}
setTimeout(timeout, callback) {
this.#socket?.timeout(timeout);
this.timeout = timeout;
if (callback) this.once("timeout", callback);
return this;
}
unref() {
this.#socket?.unref();
}
_write(chunk, encoding, callback) {
if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding);
var written = this.#socket?.write(chunk);
if (written == chunk.length) {
_final(callback) {
this.#socket.end();
callback();
} else if (this.#writeCallback) {
callback(new Error("overlapping _write()"));
} else {
if (written > 0) chunk = chunk.slice(written);
this.#writeCallback = callback;
this.#writeChunk = chunk;
}
}
}
get localAddress() {
return "127.0.0.1";
}
get localFamily() {
return "IPv4";
}
get localPort() {
return this.#socket?.localPort;
}
get pending() {
return this.connecting;
}
_read(size) {
const queue = this.#readQueue;
let chunk;
while ((chunk = queue.peek())) {
if (!this.push(chunk)) break;
queue.shift();
}
}
get readyState() {
if (this.connecting) return "opening";
if (this.readable) {
return this.writable ? "open" : "readOnly";
} else {
return this.writable ? "writeOnly" : "closed";
}
}
ref() {
this.#socket?.ref();
}
get remoteAddress() {
return this.#socket.remoteAddress;
}
get remoteFamily() {
return "IPv4";
}
resetAndDestroy() {
this.#socket?.end();
}
setKeepAlive(enable = false, initialDelay = 0) {
// TODO
}
setNoDelay(noDelay = true) {
// TODO
}
setTimeout(timeout, callback) {
this.#socket?.timeout(timeout);
this.timeout = timeout;
if (callback) this.once("timeout", callback);
return this;
}
unref() {
this.#socket?.unref();
}
_write(chunk, encoding, callback) {
if (typeof chunk == "string" && encoding !== "utf8")
chunk = Buffer.from(chunk, encoding);
var written = this.#socket?.write(chunk);
if (written == chunk.length) {
callback();
} else if (this.#writeCallback) {
callback(new Error("overlapping _write()"));
} else {
if (written > 0) chunk = chunk.slice(written);
this.#writeCallback = callback;
this.#writeChunk = chunk;
}
}
},
);
export function createConnection(port, host, connectListener) {
if (typeof host == "function") {
connectListener = host;
host = undefined;
}
var options = typeof port == "object" ? port : {
host: host,
port: port,
};
var options =
typeof port == "object"
? port
: {
host: host,
port: port,
};
return new Socket(options).connect(options, connectListener);
}