4415 es5 class http.Server (#7705)

This commit is contained in:
Vlad Sirenko
2023-12-20 15:55:30 -08:00
committed by GitHub
parent c271c6c38e
commit 7e511f55de
2 changed files with 403 additions and 367 deletions

View File

@@ -345,241 +345,240 @@ function emitListeningNextTick(self, onListen, err, hostname, port) {
}
}
class Server extends EventEmitter {
#server;
#options;
#tls;
#is_tls = false;
listening = false;
serverName;
var tlsSymbol = Symbol("tls");
var isTlsSymbol = Symbol("is_tls");
var optionsSymbol = Symbol("options");
var serverSymbol = Symbol("server");
function Server(options, callback) {
if (!(this instanceof Server)) return new Server(options, callback);
constructor(options, callback) {
super();
this.listening = false;
EventEmitter.$call(this);
if (typeof options === "function") {
callback = options;
options = {};
} else if (options == null || typeof options === "object") {
options = { ...options };
this[tlsSymbol] = null;
let key = options.key;
if (key) {
if (!isValidTLSArray(key)) {
throw new TypeError(
"key argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
}
this[isTlsSymbol] = true;
}
let cert = options.cert;
if (cert) {
if (!isValidTLSArray(cert)) {
throw new TypeError(
"cert argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
}
this[isTlsSymbol] = true;
}
if (typeof options === "function") {
callback = options;
options = {};
} else if (options == null || typeof options === "object") {
options = { ...options };
this.#tls = null;
let key = options.key;
if (key) {
if (!isValidTLSArray(key)) {
throw new TypeError(
"key argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
}
this.#is_tls = true;
}
let cert = options.cert;
if (cert) {
if (!isValidTLSArray(cert)) {
throw new TypeError(
"cert argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
}
this.#is_tls = true;
let ca = options.ca;
if (ca) {
if (!isValidTLSArray(ca)) {
throw new TypeError(
"ca argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
}
this[isTlsSymbol] = true;
}
let passphrase = options.passphrase;
if (passphrase && typeof passphrase !== "string") {
throw new TypeError("passphrase argument must be an string");
}
let ca = options.ca;
if (ca) {
if (!isValidTLSArray(ca)) {
throw new TypeError(
"ca argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
}
this.#is_tls = true;
}
let passphrase = options.passphrase;
if (passphrase && typeof passphrase !== "string") {
throw new TypeError("passphrase argument must be an string");
}
let serverName = options.servername;
if (serverName && typeof serverName !== "string") {
throw new TypeError("servername argument must be an string");
}
let serverName = options.servername;
if (serverName && typeof serverName !== "string") {
throw new TypeError("servername argument must be an string");
}
let secureOptions = options.secureOptions || 0;
if (secureOptions && typeof secureOptions !== "number") {
throw new TypeError("secureOptions argument must be an number");
}
let secureOptions = options.secureOptions || 0;
if (secureOptions && typeof secureOptions !== "number") {
throw new TypeError("secureOptions argument must be an number");
}
if (this.#is_tls) {
this.#tls = {
serverName,
key: key,
cert: cert,
ca: ca,
passphrase: passphrase,
secureOptions: secureOptions,
};
} else {
this.#tls = null;
}
if (this[isTlsSymbol]) {
this[tlsSymbol] = {
serverName,
key,
cert,
ca,
passphrase,
secureOptions,
};
} else {
throw new Error("bun-http-polyfill: invalid arguments");
this[tlsSymbol] = null;
}
this.#options = options;
if (callback) this.on("request", callback);
} else {
throw new Error("bun-http-polyfill: invalid arguments");
}
closeAllConnections() {
const server = this.#server;
if (!server) {
return;
}
this.#server = undefined;
server.stop(true);
this.emit("close");
}
this[optionsSymbol] = options;
closeIdleConnections() {
// not actually implemented
}
close(optionalCallback?) {
const server = this.#server;
if (!server) {
if (typeof optionalCallback === "function")
process.nextTick(optionalCallback, new Error("Server is not running"));
return;
}
this.#server = undefined;
if (typeof optionalCallback === "function") this.once("close", optionalCallback);
server.stop();
this.emit("close");
}
address() {
if (!this.#server) return null;
return this.#server.address;
}
listen(port, host, backlog, onListen) {
const server = this;
let socketPath;
if (typeof port == "string" && !Number.isSafeInteger(Number(port))) {
socketPath = port;
}
if (typeof host === "function") {
onListen = host;
host = undefined;
}
if (typeof port === "function") {
onListen = port;
} else if (typeof port === "object") {
port?.signal?.addEventListener("abort", () => {
this.close();
});
host = port?.host;
port = port?.port;
if (typeof port?.callback === "function") onListen = port?.callback;
}
if (typeof backlog === "function") {
onListen = backlog;
}
const ResponseClass = this.#options.ServerResponse || ServerResponse;
const RequestClass = this.#options.IncomingMessage || IncomingMessage;
let isHTTPS = false;
try {
const tls = this.#tls;
if (tls) {
this.serverName = tls.serverName || host || "localhost";
}
this.#server = Bun.serve<any>({
tls,
port,
hostname: host,
unix: socketPath,
// Bindings to be used for WS Server
websocket: {
open(ws) {
ws.data.open(ws);
},
message(ws, message) {
ws.data.message(ws, message);
},
close(ws, code, reason) {
ws.data.close(ws, code, reason);
},
drain(ws) {
ws.data.drain(ws);
},
},
// Be very careful not to access (web) Request object
// properties:
// - request.url
// - request.headers
//
// We want to avoid triggering the getter for these properties because
// that will cause the data to be cloned twice, which costs memory & performance.
fetch(req, _server) {
var pendingResponse;
var pendingError;
var reject = err => {
if (pendingError) return;
pendingError = err;
if (rejectFunction) rejectFunction(err);
};
var reply = function (resp) {
if (pendingResponse) return;
pendingResponse = resp;
if (resolveFunction) resolveFunction(resp);
};
const prevIsNextIncomingMessageHTTPS = isNextIncomingMessageHTTPS;
isNextIncomingMessageHTTPS = isHTTPS;
const http_req = new RequestClass(req);
isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS;
const upgrade = http_req.headers.upgrade;
const http_res = new ResponseClass(http_req, reply);
http_req.socket[kInternalSocketData] = [_server, http_res, req];
const rejectFn = err => reject(err);
http_req.once("error", rejectFn);
http_res.once("error", rejectFn);
if (upgrade) {
server.emit("upgrade", http_req, http_req.socket, kEmptyBuffer);
} else {
server.emit("request", http_req, http_res);
}
if (pendingError) {
throw pendingError;
}
if (pendingResponse) {
return pendingResponse;
}
var { promise, resolve: resolveFunction, reject: rejectFunction } = $newPromiseCapability(GlobalPromise);
return promise;
},
});
isHTTPS = this.#server.protocol === "https";
setTimeout(emitListeningNextTick, 1, this, onListen, null, this.#server.hostname, this.#server.port);
} catch (err) {
server.emit("error", err);
}
return this;
}
setTimeout(msecs, callback) {}
if (callback) this.on("request", callback);
return this;
}
Object.setPrototypeOf((Server.prototype = {}), EventEmitter.prototype);
Object.setPrototypeOf(Server, EventEmitter);
Server.prototype.closeAllConnections = function () {
const server = this[serverSymbol];
if (!server) {
return;
}
this[serverSymbol] = undefined;
server.stop(true);
this.emit("close");
};
Server.prototype.closeIdleConnections = function () {
// not actually implemented
};
Server.prototype.close = function (optionalCallback?) {
const server = this[serverSymbol];
if (!server) {
if (typeof optionalCallback === "function") process.nextTick(optionalCallback, new Error("Server is not running"));
return;
}
this[serverSymbol] = undefined;
if (typeof optionalCallback === "function") this.once("close", optionalCallback);
server.stop();
this.emit("close");
};
Server.prototype.address = function () {
if (!this[serverSymbol]) return null;
return this[serverSymbol].address;
};
Server.prototype.listen = function (port, host, backlog, onListen) {
const server = this;
let socketPath;
if (typeof port == "string" && !Number.isSafeInteger(Number(port))) {
socketPath = port;
}
if (typeof host === "function") {
onListen = host;
host = undefined;
}
if (typeof port === "function") {
onListen = port;
} else if (typeof port === "object") {
port?.signal?.addEventListener("abort", () => {
this.close();
});
host = port?.host;
port = port?.port;
if (typeof port?.callback === "function") onListen = port?.callback;
}
if (typeof backlog === "function") {
onListen = backlog;
}
const ResponseClass = this[optionsSymbol].ServerResponse || ServerResponse;
const RequestClass = this[optionsSymbol].IncomingMessage || IncomingMessage;
let isHTTPS = false;
try {
const tls = this[tlsSymbol];
if (tls) {
this.serverName = tls.serverName || host || "localhost";
}
this[serverSymbol] = Bun.serve<any>({
tls,
port,
hostname: host,
unix: socketPath,
// Bindings to be used for WS Server
websocket: {
open(ws) {
ws.data.open(ws);
},
message(ws, message) {
ws.data.message(ws, message);
},
close(ws, code, reason) {
ws.data.close(ws, code, reason);
},
drain(ws) {
ws.data.drain(ws);
},
},
// Be very careful not to access (web) Request object
// properties:
// - request.url
// - request.headers
//
// We want to avoid triggering the getter for these properties because
// that will cause the data to be cloned twice, which costs memory & performance.
fetch(req, _server) {
var pendingResponse;
var pendingError;
var reject = err => {
if (pendingError) return;
pendingError = err;
if (rejectFunction) rejectFunction(err);
};
var reply = function (resp) {
if (pendingResponse) return;
pendingResponse = resp;
if (resolveFunction) resolveFunction(resp);
};
const prevIsNextIncomingMessageHTTPS = isNextIncomingMessageHTTPS;
isNextIncomingMessageHTTPS = isHTTPS;
const http_req = new RequestClass(req);
isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS;
const upgrade = http_req.headers.upgrade;
const http_res = new ResponseClass(http_req, reply);
http_req.socket[kInternalSocketData] = [_server, http_res, req];
const rejectFn = err => reject(err);
http_req.once("error", rejectFn);
http_res.once("error", rejectFn);
if (upgrade) {
server.emit("upgrade", http_req, http_req.socket, kEmptyBuffer);
} else {
server.emit("request", http_req, http_res);
}
if (pendingError) {
throw pendingError;
}
if (pendingResponse) {
return pendingResponse;
}
var { promise, resolve: resolveFunction, reject: rejectFunction } = $newPromiseCapability(GlobalPromise);
return promise;
},
});
isHTTPS = this[serverSymbol].protocol === "https";
setTimeout(emitListeningNextTick, 1, this, onListen, null, this[serverSymbol].hostname, this[serverSymbol].port);
} catch (err) {
server.emit("error", err);
}
return this;
};
Server.prototype.setTimeout = function (msecs, callback) {};
function assignHeadersSlow(object, req) {
const headers = req.headers;
@@ -652,165 +651,179 @@ function requestHasNoBody(method, req) {
// This lets us skip some URL parsing
var isNextIncomingMessageHTTPS = false;
class IncomingMessage extends Readable {
method: string | null = null;
complete: boolean;
var typeSymbol = Symbol("type");
var reqSymbol = Symbol("req");
var bodyStreamSymbol = Symbol("bodyStream");
var noBodySymbol = Symbol("noBody");
var abortedSymbol = Symbol("aborted");
function IncomingMessage(req, defaultIncomingOpts) {
this.method = null;
this._consuming = false;
this._dumped = false;
this[noBodySymbol] = false;
this[abortedSymbol] = false;
Readable.$call(this);
var { type = "request", [kInternalRequest]: nodeReq } = defaultIncomingOpts || {};
constructor(req, defaultIncomingOpts) {
super();
this[reqSymbol] = req;
this[typeSymbol] = type;
var { type = "request", [kInternalRequest]: nodeReq } = defaultIncomingOpts || {};
this[bodyStreamSymbol] = undefined;
this.#req = req;
this.#type = type;
this.req = nodeReq;
this.#bodyStream = undefined;
this.req = nodeReq;
if (!assignHeaders(this, req)) {
this.#fakeSocket = req;
const reqUrl = String(req?.url || "");
this.url = reqUrl;
}
if (isNextIncomingMessageHTTPS) {
// Creating a new Duplex is expensive.
// We can skip it if the request is not HTTPS.
const socket = new FakeSocket();
this.#fakeSocket = socket;
socket.encrypted = true;
isNextIncomingMessageHTTPS = false;
}
this.#noBody =
type === "request" // TODO: Add logic for checking for body on response
? requestHasNoBody(this.method, this)
: false;
this.complete = !!this.#noBody;
if (!assignHeaders(this, req)) {
this[fakeSocketSymbol] = req;
const reqUrl = String(req?.url || "");
this.url = reqUrl;
}
headers;
rawHeaders;
_consuming = false;
_dumped = false;
#bodyStream: ReadableStreamDefaultReader | undefined;
#fakeSocket: FakeSocket | undefined = undefined;
#noBody = false;
#aborted = false;
#req;
url;
#type;
if (isNextIncomingMessageHTTPS) {
// Creating a new Duplex is expensive.
// We can skip it if the request is not HTTPS.
const socket = new FakeSocket();
this[fakeSocketSymbol] = socket;
socket.encrypted = true;
isNextIncomingMessageHTTPS = false;
}
_construct(callback) {
// TODO: streaming
if (this.#type === "response" || this.#noBody) {
callback();
return;
}
this[noBodySymbol] =
type === "request" // TODO: Add logic for checking for body on response
? requestHasNoBody(this.method, this)
: false;
const contentLength = this.headers["content-length"];
const length = contentLength ? parseInt(contentLength, 10) : 0;
if (length === 0) {
this.#noBody = true;
callback();
return;
}
this.complete = !!this[noBodySymbol];
}
Object.setPrototypeOf((IncomingMessage.prototype = {}), Readable.prototype);
Object.setPrototypeOf(IncomingMessage, Readable);
IncomingMessage.prototype._construct = function (callback) {
// TODO: streaming
if (this[typeSymbol] === "response" || this[noBodySymbol]) {
callback();
return;
}
async #consumeStream(reader: ReadableStreamDefaultReader) {
while (true) {
var { done, value } = await reader.readMany();
if (this.#aborted) return;
if (done) {
this.push(null);
process.nextTick(destroyBodyStreamNT, this);
break;
}
for (var v of value) {
this.push(v);
}
const contentLength = this.headers["content-length"];
const length = contentLength ? parseInt(contentLength, 10) : 0;
if (length === 0) {
this[noBodySymbol] = true;
callback();
return;
}
callback();
};
async function consumeStream(self, reader: ReadableStreamDefaultReader) {
while (true) {
var { done, value } = await reader.readMany();
if (self[abortedSymbol]) return;
if (done) {
self.push(null);
process.nextTick(destroyBodyStreamNT, self);
break;
}
}
_read(size) {
if (this.#noBody) {
this.push(null);
this.complete = true;
} else if (this.#bodyStream == null) {
const reader = this.#req.body?.getReader() as ReadableStreamDefaultReader;
if (!reader) {
this.push(null);
return;
}
this.#bodyStream = reader;
this.#consumeStream(reader);
for (var v of value) {
self.push(v);
}
}
get aborted() {
return this.#aborted;
}
#abort() {
if (this.#aborted) return;
this.#aborted = true;
var bodyStream = this.#bodyStream;
if (!bodyStream) return;
bodyStream.cancel();
this.complete = true;
this.#bodyStream = undefined;
this.push(null);
}
get connection() {
return (this.#fakeSocket ??= new FakeSocket());
}
get statusCode() {
return this.#req.status;
}
get statusMessage() {
return STATUS_CODES[this.#req.status];
}
get httpVersion() {
return "1.1";
}
get rawTrailers() {
return [];
}
get httpVersionMajor() {
return 1;
}
get httpVersionMinor() {
return 1;
}
get trailers() {
return kEmptyObject;
}
get socket() {
return (this.#fakeSocket ??= new FakeSocket());
}
set socket(val) {
this.#fakeSocket = val;
}
setTimeout(msecs, callback) {
throw new Error("not implemented");
}
}
IncomingMessage.prototype._read = function (size) {
if (this[noBodySymbol]) {
this.push(null);
this.complete = true;
} else if (this[bodyStreamSymbol] == null) {
const reader = this[reqSymbol].body?.getReader() as ReadableStreamDefaultReader;
if (!reader) {
this.push(null);
return;
}
this[bodyStreamSymbol] = reader;
consumeStream(this, reader);
}
};
Object.defineProperty(IncomingMessage.prototype, "aborted", {
get() {
return this[abortedSymbol];
},
});
function abort(self) {
if (self[abortedSymbol]) return;
self[abortedSymbol] = true;
var bodyStream = self[bodyStreamSymbol];
if (!bodyStream) return;
bodyStream.cancel();
self.complete = true;
self[bodyStreamSymbol] = undefined;
self.push(null);
}
Object.defineProperty(IncomingMessage.prototype, "connection", {
get() {
return (this[fakeSocketSymbol] ??= new FakeSocket());
},
});
Object.defineProperty(IncomingMessage.prototype, "statusCode", {
get() {
return this[reqSymbol].status;
},
});
Object.defineProperty(IncomingMessage.prototype, "statusMessage", {
get() {
return STATUS_CODES[this[reqSymbol].status];
},
});
Object.defineProperty(IncomingMessage.prototype, "httpVersion", {
get() {
return "1.1";
},
});
Object.defineProperty(IncomingMessage.prototype, "rawTrailers", {
get() {
return [];
},
});
Object.defineProperty(IncomingMessage.prototype, "httpVersionMajor", {
get() {
return 1;
},
});
Object.defineProperty(IncomingMessage.prototype, "httpVersionMinor", {
get() {
return 1;
},
});
Object.defineProperty(IncomingMessage.prototype, "trailers", {
get() {
return kEmptyObject;
},
});
Object.defineProperty(IncomingMessage.prototype, "socket", {
get() {
return (this[fakeSocketSymbol] ??= new FakeSocket());
},
set(val) {
this[fakeSocketSymbol] = val;
},
});
IncomingMessage.prototype.setTimeout = function (msecs, callback) {
throw new Error("not implemented");
};
function emitErrorNt(msg, err, callback) {
callback(err);
if (typeof msg.emit === "function" && !msg._closed) {

View File

@@ -1615,7 +1615,7 @@ it("#6892", () => {
}
});
it("#4415.1", () => {
it("#4415.1 ServerResponse es6", () => {
class Response extends ServerResponse {
constructor(req) {
super(req);
@@ -1626,7 +1626,7 @@ it("#4415.1", () => {
expect(res.req).toBe(req);
});
it("#4415.2", () => {
it("#4415.2 ServerResponse es5", () => {
function Response(req) {
ServerResponse.call(this, req);
}
@@ -1635,3 +1635,26 @@ it("#4415.2", () => {
const res = new Response(req);
expect(res.req).toBe(req);
});
it("#4415.3 Server es5", done => {
const server = Server((req, res) => {
res.end();
});
server.listen(0, async (_err, host, port) => {
try {
const res = await fetch(`http://localhost:${port}`);
expect(res.status).toBe(200);
done();
} catch (err) {
done(err);
} finally {
server.close();
}
});
});
it("#4415.4 IncomingMessage es5", () => {
const im = Object.create(IncomingMessage.prototype);
IncomingMessage.call(im, { url: "/foo" });
expect(im.url).toBe("/foo");
});