Compare commits

...

1 Commits

Author SHA1 Message Date
Andrew Johnston
2467f60e78 fix(node:http): use a unique FakeSocket per socketId. only fire connection event once 2024-08-12 23:35:24 +01:00
3 changed files with 53 additions and 8 deletions

View File

@@ -32,6 +32,10 @@ function generate(name) {
fn: "doRequestIP",
length: 1,
},
requestSocketId: {
fn: "doRequestSocketId",
length: 1,
},
port: {
getter: "getPort",
},

View File

@@ -1324,6 +1324,13 @@ pub const AnyRequestContext = struct {
return .{ .tagged_pointer = Pointer.init(request_ctx) };
}
pub fn getRemoteSocketId(self: AnyRequestContext) i32 {
if (self.tagged_pointer.as(HTTPServer.RequestContext).getNativeHandle()) |handle| {
return handle.cast();
}
return 0;
}
pub fn getRemoteSocketInfo(self: AnyRequestContext) ?uws.SocketAddress {
if (self.tagged_pointer.isNull()) {
return null;
@@ -3584,6 +3591,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return onStartStreamingRequestBody(bun.cast(*RequestContext, this));
}
pub fn getNativeHandle(this: *RequestContext) ?bun.FileDescriptor {
return (this.resp orelse return null).getNativeHandle();
}
pub fn getRemoteSocketInfo(this: *RequestContext) ?uws.SocketAddress {
return (this.resp orelse return null).getRemoteSocketInfo();
}
@@ -5321,6 +5332,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
pub const doReload = onReload;
pub const doFetch = onFetch;
pub const doRequestIP = JSC.wrapInstanceMethod(ThisServer, "requestIP", false);
pub const doRequestSocketId = JSC.wrapInstanceMethod(ThisServer, "socketId", false);
pub usingnamespace NamespaceType;
pub usingnamespace bun.New(@This());
@@ -5332,6 +5344,10 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
extern fn JSSocketAddress__create(global: *JSC.JSGlobalObject, ip: JSValue, port: i32, is_ipv6: bool) JSValue;
pub fn socketId(_: *ThisServer, request: *JSC.WebCore.Request) JSC.JSValue {
return JSC.JSValue.jsNumber(request.request_context.getRemoteSocketId());
}
pub fn requestIP(this: *ThisServer, request: *JSC.WebCore.Request) JSC.JSValue {
if (this.config.address == .unix) {
return JSValue.jsNull();

View File

@@ -94,8 +94,8 @@ var _defaultHTTPSAgent;
var kInternalRequest = Symbol("kInternalRequest");
const kInternalSocketData = Symbol.for("::bunternal::");
const kfakeSocket = Symbol("kfakeSocket");
const kEmptyBuffer = Buffer.alloc(0);
const socketMap = new Map(); // keep track of active sockets so we can handle socket events correctly
function isValidTLSArray(obj) {
if (typeof obj === "string" || isTypedArray(obj) || obj instanceof ArrayBuffer || obj instanceof Blob) return true;
@@ -135,6 +135,8 @@ var FakeSocket = class Socket extends Duplex {
isServer = false;
#address;
#id = 0;
address() {
// Call server.requestIP() without doing any propety getter twice.
var internalData;
@@ -149,6 +151,13 @@ var FakeSocket = class Socket extends Duplex {
return this;
}
end() {
if (socketMap.has(this.#id)) {
socketMap.delete(this.#id);
this.emit("close");
}
}
_destroy(err, callback) {}
_final(callback) {}
@@ -211,6 +220,10 @@ var FakeSocket = class Socket extends Duplex {
this.address().family = val;
}
set id(v) {
this.#id = v;
}
resetAndDestroy() {}
setKeepAlive(enable = false, initialDelay = 0) {}
@@ -570,11 +583,28 @@ Server.prototype = {
const http_res = new ResponseClass(http_req, reply);
// tried this initially with _server.address() and it adds too much overhead = 156k rps v 133k rps
// this is 147k rps
/*
with these changes, bun is 66k rps and 140 MB RSS on the test provided for the issue
node is 22k rps and 220 MB RSS
*/
const socketId = _server.requestSocketId(req);
// http_req.socket.id = socketId;
if (!socketMap.has(socketId)) {
const socket = new FakeSocket();
socket.id = socketId;
socketMap.set(socketId, socket);
server.emit("connection", socket);
http_req.socket = socket;
} else {
http_req.socket = socketMap.get(socketId);
}
http_req.socket[kInternalSocketData] = [_server, http_res, req];
server.emit("connection", http_req.socket);
const rejectFn = err => reject(err);
http_req.once("error", rejectFn);
// question: why is this here twice?
http_res.once("error", rejectFn);
if (upgrade) {
@@ -705,6 +735,7 @@ function IncomingMessage(req, defaultIncomingOpts) {
this._dumped = false;
this[noBodySymbol] = false;
this[abortedSymbol] = false;
this.socket = null;
Readable.$call(this);
var { type = "request", [kInternalRequest]: nodeReq } = defaultIncomingOpts || {};
@@ -858,12 +889,6 @@ IncomingMessage.prototype = {
// noop
return this;
},
get socket() {
return (this[fakeSocketSymbol] ??= new FakeSocket());
},
set socket(value) {
this[fakeSocketSymbol] = value;
},
};
$setPrototypeDirect.$call(IncomingMessage.prototype, Readable.prototype);
$setPrototypeDirect.$call(IncomingMessage, Readable);