From 57cda4a4452e20965fe71a629c4285e1be7844ce Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Wed, 26 Mar 2025 04:46:35 -0700 Subject: [PATCH] Clean-up after #18485 (#18489) --- packages/bun-uws/src/HttpContext.h | 2 +- src/bun.js/api/server.zig | 24 +- src/bun.js/api/server/NodeHTTPResponse.zig | 243 +++++++++------- src/bun.js/bindings/NodeHTTP.cpp | 29 +- src/js/node/http.ts | 3 +- test/js/node/http/node-http-uaf-fixture.ts | 16 +- test/js/node/http/node-http-uaf.test.ts | 5 +- .../test-http-request-invalid-method-error.js | 13 + ...-http-response-remove-header-after-sent.js | 24 ++ .../express-memory-leak-fixture.mjs | 67 +++++ .../body-parser/express-memory-leak.test.ts | 262 ++++++++++++++++++ 11 files changed, 558 insertions(+), 130 deletions(-) create mode 100644 test/js/node/test/parallel/test-http-request-invalid-method-error.js create mode 100644 test/js/node/test/parallel/test-http-response-remove-header-after-sent.js create mode 100644 test/js/third_party/body-parser/express-memory-leak-fixture.mjs create mode 100644 test/js/third_party/body-parser/express-memory-leak.test.ts diff --git a/packages/bun-uws/src/HttpContext.h b/packages/bun-uws/src/HttpContext.h index 7ecddfc7fa..dcc16b641a 100644 --- a/packages/bun-uws/src/HttpContext.h +++ b/packages/bun-uws/src/HttpContext.h @@ -235,7 +235,7 @@ private: } /* Returning from a request handler without responding or attaching an onAborted handler is ill-use */ - if (!((HttpResponse *) s)->hasResponded() && !httpResponseData->onAborted) { + if (!((HttpResponse *) s)->hasResponded() && !httpResponseData->onAborted && !httpResponseData->socketData) { /* Throw exception here? */ std::cerr << "Error: Returning from a request handler without responding or attaching an abort handler is forbidden!" << std::endl; std::terminate(); diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 13c70ebb13..f8047f3bbf 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -5278,9 +5278,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp if (arguments[0].as(Request)) |request| { _ = request.request_context.setTimeout(value); } else if (arguments[0].as(NodeHTTPResponse)) |response| { - if (!response.finished) { - _ = response.response.timeout(@intCast(@min(value, 255))); - } + response.setTimeout(@truncate(value % 255)); } else { return this.globalThis.throwInvalidArguments("timeout() requires a Request object", .{}); } @@ -5358,7 +5356,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp } if (object.as(NodeHTTPResponse)) |nodeHttpResponse| { - if (nodeHttpResponse.aborted or nodeHttpResponse.ended) { + if (nodeHttpResponse.ended or nodeHttpResponse.socket_closed) { return JSC.jsBoolean(false); } @@ -5427,8 +5425,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp } // we must write the status first so that 200 OK isn't written - nodeHttpResponse.response.writeStatus("101 Switching Protocols"); - fetch_headers_to_use.toUWSResponse(comptime ssl_enabled, nodeHttpResponse.response.socket()); + nodeHttpResponse.raw_response.writeStatus("101 Switching Protocols"); + fetch_headers_to_use.toUWSResponse(comptime ssl_enabled, nodeHttpResponse.raw_response.socket()); } if (globalThis.hasException()) { @@ -6486,7 +6484,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp .pending => { globalThis.handleRejectedPromises(); if (node_http_response) |node_response| { - if (node_response.finished or node_response.aborted or node_response.upgraded) { + if (node_response.request_has_completed or node_response.socket_closed or node_response.upgraded) { strong_promise.deinit(); break :brk .{ .success = {} }; } @@ -6517,12 +6515,12 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp _ = vm.uncaughtException(globalThis, err, http_result == .rejection); if (node_http_response) |node_response| { - if (!node_response.finished and node_response.response.state().isResponsePending()) { - if (node_response.response.state().isHttpStatusCalled()) { - node_response.response.writeStatus("500 Internal Server Error"); - node_response.response.endWithoutBody(true); + if (!node_response.request_has_completed and node_response.raw_response.state().isResponsePending()) { + if (node_response.raw_response.state().isHttpStatusCalled()) { + node_response.raw_response.writeStatus("500 Internal Server Error"); + node_response.raw_response.endWithoutBody(true); } else { - node_response.response.endStream(true); + node_response.raw_response.endStream(true); } } node_response.onRequestComplete(); @@ -6533,7 +6531,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp } if (node_http_response) |node_response| { - if (!node_response.finished and node_response.response.state().isResponsePending()) { + if (!node_response.request_has_completed and node_response.raw_response.state().isResponsePending()) { node_response.setOnAbortedHandler(); } // If we ended the response without attaching an ondata handler, we discard the body read stream diff --git a/src/bun.js/api/server/NodeHTTPResponse.zig b/src/bun.js/api/server/NodeHTTPResponse.zig index 96cdf42a22..7d61a6d960 100644 --- a/src/bun.js/api/server/NodeHTTPResponse.zig +++ b/src/bun.js/api/server/NodeHTTPResponse.zig @@ -1,15 +1,13 @@ -response: uws.AnyResponse, +raw_response: uws.AnyResponse, onDataCallback: JSC.Strong = .empty, onWritableCallback: JSC.Strong = .empty, -onAbortedCallback: JSC.Strong = .empty, ref_count: u32 = 1, js_ref: JSC.Ref = .{}, -aborted: bool = false, -finished: bool = false, +socket_closed: bool = false, +request_has_completed: bool = false, ended: bool = false, upgraded: bool = false, -closed: bool = false, hasCustomOnData: bool = false, is_request_pending: bool = true, body_read_state: BodyReadState = .none, @@ -82,18 +80,29 @@ pub const BodyReadState = enum(u8) { extern "C" fn Bun__getNodeHTTPResponseThisValue(bool, *anyopaque) JSC.JSValue; pub fn getThisValue(this: *NodeHTTPResponse) JSC.JSValue { - return Bun__getNodeHTTPResponseThisValue(this.response == .SSL, this.response.socket()); + if (this.socket_closed) { + return .zero; + } + + return Bun__getNodeHTTPResponseThisValue(this.raw_response == .SSL, this.raw_response.socket()); } extern "C" fn Bun__getNodeHTTPServerSocketThisValue(bool, *anyopaque) JSC.JSValue; pub fn getServerSocketValue(this: *NodeHTTPResponse) JSC.JSValue { - return Bun__getNodeHTTPServerSocketThisValue(this.response == .SSL, this.response.socket()); + if (this.socket_closed) { + return .zero; + } + + return Bun__getNodeHTTPServerSocketThisValue(this.raw_response == .SSL, this.raw_response.socket()); } pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_protocol: ZigString, sec_websocket_extensions: ZigString) bool { const upgrade_ctx = this.upgrade_context.context orelse return false; const ws_handler = this.server.webSocketHandler() orelse return false; const socketValue = this.getServerSocketValue(); + if (socketValue == .zero) { + return false; + } defer { this.setOnAbortedHandler(); @@ -112,9 +121,9 @@ pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_proto Bun__setNodeHTTPServerSocketUsSocketValue(socketValue, socket); ServerWebSocket.socketSetCached(ws.getThisValue(), ws_handler.globalObject, socketValue); defer this.js_ref.unref(JSC.VirtualMachine.get()); - switch (this.response) { - .SSL => this.response = uws.AnyResponse.init(uws.NewApp(true).Response.castRes(@alignCast(@ptrCast(socket)))), - .TCP => this.response = uws.AnyResponse.init(uws.NewApp(false).Response.castRes(@alignCast(@ptrCast(socket)))), + switch (this.raw_response) { + .SSL => this.raw_response = uws.AnyResponse.init(uws.NewApp(true).Response.castRes(@alignCast(@ptrCast(socket)))), + .TCP => this.raw_response = uws.AnyResponse.init(uws.NewApp(false).Response.castRes(@alignCast(@ptrCast(socket)))), } }; @@ -144,7 +153,7 @@ pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_proto if (sec_websocket_extensions_str) |str| str.deinit(); } - new_socket = this.response.upgrade( + new_socket = this.raw_response.upgrade( *ServerWebSocket, ws, request.header("sec-websocket-key") orelse "", @@ -178,7 +187,7 @@ pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_proto if (sec_websocket_extensions_str) |str| str.deinit(); } - new_socket = this.response.upgrade( + new_socket = this.raw_response.upgrade( *ServerWebSocket, ws, this.upgrade_context.sec_websocket_key, @@ -191,9 +200,9 @@ pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_proto pub fn maybeStopReadingBody(this: *NodeHTTPResponse, vm: *JSC.VirtualMachine) void { this.upgrade_context.deinit(); // we can discard the upgrade context now - if ((this.aborted or this.ended) and (this.body_read_ref.has or this.body_read_state == .pending) and (!this.hasCustomOnData or !this.onDataCallback.has())) { + if ((this.socket_closed or this.ended) and (this.body_read_ref.has or this.body_read_state == .pending) and (!this.hasCustomOnData or !this.onDataCallback.has())) { const had_ref = this.body_read_ref.has; - this.response.clearOnData(); + this.raw_response.clearOnData(); this.body_read_ref.unref(vm); this.body_read_state = .done; @@ -206,7 +215,7 @@ pub fn maybeStopReadingBody(this: *NodeHTTPResponse, vm: *JSC.VirtualMachine) vo } pub fn shouldRequestBePending(this: *const NodeHTTPResponse) bool { - if (this.aborted) { + if (this.socket_closed) { return false; } @@ -223,7 +232,7 @@ pub fn dumpRequestBody(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObjec if (this.buffered_request_body_data_during_pause.len > 0) { this.buffered_request_body_data_during_pause.deinitWithAllocator(bun.default_allocator); } - if (!this.finished) { + if (!this.request_has_completed) { this.clearOnDataCallback(); } @@ -280,7 +289,7 @@ pub fn create( .request = request, }, .server = AnyServer{ .ptr = AnyServer.Ptr.from(@ptrFromInt(any_server_tag)) }, - .response = switch (is_ssl != 0) { + .raw_response = switch (is_ssl != 0) { true => uws.AnyResponse{ .SSL = @ptrCast(response_ptr) }, false => uws.AnyResponse{ .TCP = @ptrCast(response_ptr) }, }, @@ -301,17 +310,19 @@ pub fn create( } pub fn setOnAbortedHandler(this: *NodeHTTPResponse) void { + if (this.socket_closed) { + return; + } // Don't overwrite WebSocket user data if (!this.upgraded) { - this.response.onAborted(*NodeHTTPResponse, onAbort, this); - this.response.onTimeout(*NodeHTTPResponse, onTimeout, this); + this.raw_response.onTimeout(*NodeHTTPResponse, onTimeout, this); } // detach and this.upgrade_context.preserveWebSocketHeadersIfNeeded(); } fn isDone(this: *const NodeHTTPResponse) bool { - return this.finished or this.ended or this.aborted; + return this.request_has_completed or this.ended or this.socket_closed; } pub fn getEnded(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValue { @@ -319,11 +330,11 @@ pub fn getEnded(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSVal } pub fn getFinished(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValue { - return JSC.JSValue.jsBoolean(this.finished); + return JSC.JSValue.jsBoolean(this.request_has_completed); } pub fn getAborted(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValue { - return JSC.JSValue.jsBoolean(this.aborted); + return JSC.JSValue.jsBoolean(this.socket_closed); } pub fn getHasBody(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValue { @@ -344,11 +355,11 @@ pub fn getHasBody(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSV } pub fn getBufferedAmount(this: *const NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValue { - if (this.finished) { + if (this.request_has_completed or this.socket_closed) { return JSC.JSValue.jsNull(); } - return JSC.JSValue.jsNumber(this.response.getBufferedAmount()); + return JSC.JSValue.jsNumber(this.raw_response.getBufferedAmount()); } pub fn jsRef(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue { @@ -394,7 +405,7 @@ pub fn writeHead(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, cal return globalObject.ERR_STREAM_ALREADY_FINISHED("Stream is already ended", .{}).throw(); } - const state = this.response.state(); + const state = this.raw_response.state(); try handleEndedIfNecessary(state, globalObject); const status_code_value = if (arguments.len > 0) arguments[0] else .undefined; @@ -431,7 +442,7 @@ pub fn writeHead(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, cal do_it: { if (status_message_slice.len == 0) { if (HTTPStatusText.get(@intCast(status_code))) |status_message| { - writeHeadInternal(this.response, globalObject, status_message, headers_object_value); + writeHeadInternal(this.raw_response, globalObject, status_message, headers_object_value); break :do_it; } } @@ -439,7 +450,7 @@ pub fn writeHead(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, cal const message = if (status_message_slice.len > 0) status_message_slice.slice() else "HM"; const status_message = std.fmt.allocPrint(allocator, "{d} {s}", .{ status_code, message }) catch bun.outOfMemory(); defer allocator.free(status_message); - writeHeadInternal(this.response, globalObject, status_message, headers_object_value); + writeHeadInternal(this.raw_response, globalObject, status_message, headers_object_value); break :do_it; } @@ -461,10 +472,10 @@ pub fn writeContinue(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, return .undefined; } - const state = this.response.state(); + const state = this.raw_response.state(); try handleEndedIfNecessary(state, globalObject); - this.response.writeContinue(); + this.raw_response.writeContinue(); return .undefined; } @@ -474,27 +485,28 @@ pub const AbortEvent = enum(u8) { timeout = 2, }; -fn handleAbortOrTimeout(this: *NodeHTTPResponse, comptime event: AbortEvent) void { - if (this.finished) { +fn handleAbortOrTimeout(this: *NodeHTTPResponse, comptime event: AbortEvent, js_value: JSC.JSValue) void { + if (this.request_has_completed) { return; } if (event == .abort) { - this.aborted = true; + this.socket_closed = true; } this.ref(); defer this.deref(); defer if (event == .abort) this.markRequestAsDoneIfNecessary(); - const js_this: JSValue = this.getThisValue(); - if (this.onAbortedCallback.get()) |on_aborted| { + const js_this: JSValue = if (js_value == .zero) this.getThisValue() else js_value; + if (NodeHTTPResponse.onAbortedGetCached(js_this)) |on_aborted| { + const globalThis = JSC.VirtualMachine.get().global; defer { if (event == .abort) { - this.onAbortedCallback.deinit(); + NodeHTTPResponse.onAbortedSetCached(js_this, globalThis, .zero); } } - const globalThis = JSC.VirtualMachine.get().global; + const vm = globalThis.bunVM(); const event_loop = vm.eventLoop(); @@ -508,30 +520,28 @@ fn handleAbortOrTimeout(this: *NodeHTTPResponse, comptime event: AbortEvent) voi } } -pub fn onAbort(this: *NodeHTTPResponse, response: uws.AnyResponse) void { - _ = response; // autofix +pub fn onAbort(this: *NodeHTTPResponse, js_value: JSC.JSValue) void { log("onAbort", .{}); - this.handleAbortOrTimeout(.abort); + this.handleAbortOrTimeout(.abort, js_value); } -pub fn onTimeout(this: *NodeHTTPResponse, response: uws.AnyResponse) void { - _ = response; // autofix +pub fn onTimeout(this: *NodeHTTPResponse, _: uws.AnyResponse) void { log("onTimeout", .{}); - this.handleAbortOrTimeout(.timeout); + this.handleAbortOrTimeout(.timeout, .zero); } pub fn doPause(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { _ = globalObject; // autofix _ = callframe; // autofix - if (this.finished or this.aborted) { + if (this.request_has_completed or this.socket_closed) { return .false; } if (this.body_read_ref.has and !this.onDataCallback.has()) { this.is_data_buffered_during_pause = true; - this.response.onData(*NodeHTTPResponse, onBufferRequestBodyWhilePaused, this); + this.raw_response.onData(*NodeHTTPResponse, onBufferRequestBodyWhilePaused, this); } - this.response.pause(); + this.raw_response.pause(); return .true; } @@ -551,13 +561,13 @@ fn drainBufferedRequestBodyFromPause(this: *NodeHTTPResponse, globalObject: *JSC pub fn doResume(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { _ = callframe; // autofix - if (this.finished or this.aborted) { + if (this.request_has_completed or this.socket_closed) { return .false; } var result = JSC.JSValue.true; if (this.is_data_buffered_during_pause) { - this.response.clearOnData(); + this.raw_response.clearOnData(); this.is_data_buffered_during_pause = false; } @@ -565,16 +575,16 @@ pub fn doResume(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, call result = buffered_data; } - this.response.@"resume"(); + this.raw_response.@"resume"(); return result; } pub fn onRequestComplete(this: *NodeHTTPResponse) void { - if (this.finished) { + if (this.request_has_completed) { return; } log("onRequestComplete", .{}); - this.finished = true; + this.request_has_completed = true; this.js_ref.unref(JSC.VirtualMachine.get()); this.clearJSValues(); @@ -589,14 +599,17 @@ pub export fn Bun__NodeHTTPRequest__onResolve(globalObject: *JSC.JSGlobalObject, defer this.deref(); this.maybeStopReadingBody(globalObject.bunVM()); - if (!this.finished and !this.aborted) { + if (!this.request_has_completed and !this.socket_closed) { + const this_value = this.getThisValue(); + if (this_value != .zero) { + NodeHTTPResponse.onAbortedSetCached(this_value, globalObject, .zero); + } this.clearJSValues(); - this.response.clearAborted(); - this.response.clearOnData(); - this.response.clearOnWritable(); - this.response.clearTimeout(); - if (this.response.state().isResponsePending()) { - this.response.endWithoutBody(this.response.state().isHttpConnectionClose()); + this.raw_response.clearOnData(); + this.raw_response.clearOnWritable(); + this.raw_response.clearTimeout(); + if (this.raw_response.state().isResponsePending()) { + this.raw_response.endWithoutBody(this.raw_response.state().isHttpConnectionClose()); } this.onRequestComplete(); } @@ -613,16 +626,18 @@ pub export fn Bun__NodeHTTPRequest__onReject(globalObject: *JSC.JSGlobalObject, defer this.deref(); - if (!this.finished and !this.aborted) { - this.clearJSValues(); - this.response.clearAborted(); - this.response.clearOnData(); - this.response.clearOnWritable(); - this.response.clearTimeout(); - if (!this.response.state().isHttpStatusCalled()) { - this.response.writeStatus("500 Internal Server Error"); + if (!this.request_has_completed and !this.socket_closed) { + const this_value = this.getThisValue(); + if (this_value != .zero) { + NodeHTTPResponse.onAbortedSetCached(this_value, globalObject, .zero); } - this.response.endStream(this.response.state().isHttpConnectionClose()); + this.raw_response.clearOnData(); + this.raw_response.clearOnWritable(); + this.raw_response.clearTimeout(); + if (!this.raw_response.state().isHttpStatusCalled()) { + this.raw_response.writeStatus("500 Internal Server Error"); + } + this.raw_response.endStream(this.raw_response.state().isHttpConnectionClose()); this.onRequestComplete(); } @@ -633,7 +648,6 @@ pub export fn Bun__NodeHTTPRequest__onReject(globalObject: *JSC.JSGlobalObject, pub fn clearJSValues(this: *NodeHTTPResponse) void { // Promise is handled separately. this.onWritableCallback.deinit(); - this.onAbortedCallback.deinit(); } pub fn abort(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { @@ -643,17 +657,16 @@ pub fn abort(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callfra return .undefined; } - this.aborted = true; - const state = this.response.state(); + this.socket_closed = true; + const state = this.raw_response.state(); if (state.isHttpEndCalled()) { return .undefined; } - this.response.clearAborted(); - this.response.clearOnData(); - this.response.clearOnWritable(); - this.response.clearTimeout(); - this.response.endWithoutBody(true); + this.raw_response.clearOnData(); + this.raw_response.clearOnWritable(); + this.raw_response.clearTimeout(); + this.raw_response.endWithoutBody(true); this.onRequestComplete(); return .undefined; } @@ -733,7 +746,7 @@ fn onDrain(this: *NodeHTTPResponse, offset: u64, response: uws.AnyResponse) bool this.ref(); defer this.deref(); response.clearOnWritable(); - if (this.aborted or this.finished) { + if (this.socket_closed or this.request_has_completed) { return false; } const on_writable = this.onWritableCallback.trySwap() orelse return false; @@ -741,7 +754,7 @@ fn onDrain(this: *NodeHTTPResponse, offset: u64, response: uws.AnyResponse) bool const vm = globalThis.bunVM(); response.corked(JSC.EventLoop.runCallback, .{ vm.eventLoop(), on_writable, globalThis, .undefined, &.{JSC.JSValue.jsNumberFromUint64(offset)} }); - if (this.aborted or this.finished) { + if (this.socket_closed or this.request_has_completed) { return false; } @@ -752,13 +765,14 @@ fn writeOrEnd( this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, arguments: []const JSC.JSValue, + this_value: JSC.JSValue, comptime is_end: bool, ) bun.JSError!JSC.JSValue { if (this.isDone()) { return globalObject.ERR_STREAM_WRITE_AFTER_END("Stream already ended", .{}).throw(); } - const state = this.response.state(); + const state = this.raw_response.state(); if (!state.isResponsePending()) { return globalObject.ERR_STREAM_WRITE_AFTER_END("Stream already ended", .{}).throw(); } @@ -826,29 +840,33 @@ fn writeOrEnd( this.body_read_state = .none; } - this.response.clearAborted(); - this.response.clearOnWritable(); - this.response.clearTimeout(); + if (this_value != .zero) { + NodeHTTPResponse.onAbortedSetCached(this_value, globalObject, .zero); + } + + this.raw_response.clearAborted(); + this.raw_response.clearOnWritable(); + this.raw_response.clearTimeout(); this.ended = true; if (!state.isHttpWriteCalled() or bytes.len > 0) { - this.response.end(bytes, state.isHttpConnectionClose()); + this.raw_response.end(bytes, state.isHttpConnectionClose()); } else { - this.response.endStream(state.isHttpConnectionClose()); + this.raw_response.endStream(state.isHttpConnectionClose()); } this.onRequestComplete(); return JSC.JSValue.jsNumberFromUint64(bytes.len); } else { - switch (this.response.write(bytes)) { + switch (this.raw_response.write(bytes)) { .want_more => |written| { - this.response.clearOnWritable(); + this.raw_response.clearOnWritable(); this.onWritableCallback.clearWithoutDeallocation(); return JSC.JSValue.jsNumberFromUint64(written); }, .backpressure => |written| { if (callback_value != .undefined) { this.onWritableCallback.set(globalObject, callback_value.withAsyncContextIfNeeded(globalObject)); - this.response.onWritable(*NodeHTTPResponse, onDrain, this); + this.raw_response.onWritable(*NodeHTTPResponse, onDrain, this); } return JSC.JSValue.jsNumberFromInt64(-@as(i64, @intCast(written))); }, @@ -871,14 +889,21 @@ pub fn getOnWritable(this: *NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValu } pub fn getOnAbort(this: *NodeHTTPResponse, _: *JSC.JSGlobalObject) JSC.JSValue { - return this.onAbortedCallback.get() orelse .undefined; + if (this.socket_closed) { + return .undefined; + } + return NodeHTTPResponse.onAbortedGetCached(this.getThisValue()) orelse .undefined; } pub fn setOnAbort(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, value: JSValue) bool { + if (this.socket_closed) { + return true; + } + if (this.isDone() or value == .undefined) { - this.onAbortedCallback.clearWithoutDeallocation(); + NodeHTTPResponse.onAbortedSetCached(this.getThisValue(), globalObject, .zero); } else { - this.onAbortedCallback.set(globalObject, value.withAsyncContextIfNeeded(globalObject)); + NodeHTTPResponse.onAbortedSetCached(this.getThisValue(), globalObject, value.withAsyncContextIfNeeded(globalObject)); } return true; @@ -904,8 +929,8 @@ pub fn setHasCustomOnData(this: *NodeHTTPResponse, _: *JSC.JSGlobalObject, value fn clearOnDataCallback(this: *NodeHTTPResponse) void { if (this.body_read_state != .none) { this.onDataCallback.deinit(); - if (!this.aborted) - this.response.clearOnData(); + if (!this.socket_closed) + this.raw_response.clearOnData(); if (this.body_read_state != .done) { this.body_read_state = .done; if (this.body_read_ref.has) { @@ -916,7 +941,7 @@ fn clearOnDataCallback(this: *NodeHTTPResponse) void { } pub fn setOnData(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, value: JSValue) bool { - if (value == .undefined or this.ended or this.aborted or this.body_read_state == .none or this.is_data_buffered_during_pause_last) { + if (value == .undefined or this.ended or this.socket_closed or this.body_read_state == .none or this.is_data_buffered_during_pause_last) { this.onDataCallback.deinit(); defer { if (this.body_read_ref.has) { @@ -926,8 +951,8 @@ pub fn setOnData(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, val } switch (this.body_read_state) { .pending, .done => { - if (!this.finished and !this.aborted) { - this.response.clearOnData(); + if (!this.request_has_completed and !this.socket_closed) { + this.raw_response.clearOnData(); } this.body_read_state = .done; }, @@ -938,7 +963,7 @@ pub fn setOnData(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, val this.onDataCallback.set(globalObject, value.withAsyncContextIfNeeded(globalObject)); this.hasCustomOnData = true; - this.response.onData(*NodeHTTPResponse, onData, this); + this.raw_response.onData(*NodeHTTPResponse, onData, this); this.is_data_buffered_during_pause = false; if (!this.body_read_ref.has) { @@ -952,12 +977,12 @@ pub fn setOnData(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, val pub fn write(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { const arguments = callframe.arguments_old(3).slice(); - return writeOrEnd(this, globalObject, arguments, false); + return writeOrEnd(this, globalObject, arguments, .zero, false); } pub fn end(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { const arguments = callframe.arguments_old(3).slice(); - return writeOrEnd(this, globalObject, arguments, true); + return writeOrEnd(this, globalObject, arguments, callframe.this(), true); } fn handleCorked(globalObject: *JSC.JSGlobalObject, function: JSC.JSValue, result: *JSValue, is_exception: *bool) void { @@ -968,17 +993,25 @@ fn handleCorked(globalObject: *JSC.JSGlobalObject, function: JSC.JSValue, result }; } +pub fn setTimeout(this: *NodeHTTPResponse, seconds: u8) void { + if (this.request_has_completed or this.socket_closed) { + return; + } + + this.raw_response.timeout(seconds); +} + export fn NodeHTTPResponse__setTimeout(this: *NodeHTTPResponse, seconds: JSC.JSValue, globalThis: *JSC.JSGlobalObject) bool { if (!seconds.isNumber()) { globalThis.throwInvalidArgumentTypeValue("timeout", "number", seconds) catch {}; return false; } - if (this.finished or this.aborted) { + if (this.request_has_completed or this.socket_closed) { return false; } - this.response.timeout(@intCast(@min(seconds.to(c_uint), 255))); + this.raw_response.timeout(@intCast(@min(seconds.to(c_uint), 255))); return true; } @@ -992,7 +1025,7 @@ pub fn cork(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callfram return globalObject.throwInvalidArgumentTypeValue("cork", "function", arguments[0]); } - if (this.finished or this.aborted or this.closed) { + if (this.request_has_completed or this.socket_closed) { return globalObject.ERR_STREAM_ALREADY_FINISHED("Stream is already ended", .{}).throw(); } @@ -1001,7 +1034,7 @@ pub fn cork(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callfram this.ref(); defer this.deref(); - this.response.corked(handleCorked, .{ globalObject, arguments[0], &result, &is_exception }); + this.raw_response.corked(handleCorked, .{ globalObject, arguments[0], &result, &is_exception }); if (is_exception) { if (result != .zero) { @@ -1026,12 +1059,12 @@ pub fn deinit(this: *NodeHTTPResponse) void { bun.debugAssert(!this.body_read_ref.has); bun.debugAssert(!this.js_ref.has); bun.debugAssert(!this.is_request_pending); - bun.debugAssert(this.aborted or this.finished); + bun.debugAssert(this.socket_closed or this.request_has_completed); this.buffered_request_body_data_during_pause.deinitWithAllocator(bun.default_allocator); this.js_ref.unref(JSC.VirtualMachine.get()); this.body_read_ref.unref(JSC.VirtualMachine.get()); - this.onAbortedCallback.deinit(); + this.onDataCallback.deinit(); this.onWritableCallback.deinit(); this.promise.deinit(); @@ -1043,8 +1076,12 @@ comptime { } extern "c" fn Bun__setNodeHTTPServerSocketUsSocketValue(JSC.JSValue, ?*anyopaque) void; +pub export fn Bun__NodeHTTPResponse_onClose(response: *NodeHTTPResponse, js_value: JSC.JSValue) void { + response.onAbort(js_value); +} + pub export fn Bun__NodeHTTPResponse_setClosed(response: *NodeHTTPResponse) void { - response.closed = true; + response.socket_closed = true; } const NodeHTTPResponse = @This(); diff --git a/src/bun.js/bindings/NodeHTTP.cpp b/src/bun.js/bindings/NodeHTTP.cpp index b8605c7b09..614c6e3a3c 100644 --- a/src/bun.js/bindings/NodeHTTP.cpp +++ b/src/bun.js/bindings/NodeHTTP.cpp @@ -26,7 +26,7 @@ extern "C" uint64_t uws_res_get_remote_address_info(void* res, const char** dest extern "C" uint64_t uws_res_get_local_address_info(void* res, const char** dest, int* port, bool* is_ipv6); extern "C" void Bun__NodeHTTPResponse_setClosed(void* zigResponse); - +extern "C" void Bun__NodeHTTPResponse_onClose(void* zigResponse, JSC::EncodedJSValue jsValue); namespace Bun { using namespace JSC; @@ -183,20 +183,27 @@ public: [](auto& spaces, auto&& space) { spaces.m_subspaceForJSNodeHTTPServerSocket = std::forward(space); }); } + void detach() + { + this->m_duplex.clear(); + this->currentResponseObject.clear(); + this->strongThis.clear(); + } + void onClose() { this->socket = nullptr; - this->m_duplex.clear(); if (auto* res = this->currentResponseObject.get(); res != nullptr && res->m_ctx != nullptr) { Bun__NodeHTTPResponse_setClosed(res->m_ctx); } - this->currentResponseObject.clear(); // This function can be called during GC! Zig::GlobalObject* globalObject = static_cast(this->globalObject()); if (!functionToCallOnClose) { - this->strongThis.clear(); - + if (auto* res = this->currentResponseObject.get(); res != nullptr && res->m_ctx != nullptr) { + Bun__NodeHTTPResponse_onClose(res->m_ctx, JSValue::encode(res)); + } + this->detach(); return; } @@ -209,7 +216,10 @@ public: auto* thisObject = self; auto* callbackObject = thisObject->functionToCallOnClose.get(); if (!callbackObject) { - self->strongThis.clear(); + if (auto* res = thisObject->currentResponseObject.get(); res != nullptr && res->m_ctx != nullptr) { + Bun__NodeHTTPResponse_onClose(res->m_ctx, JSValue::encode(res)); + } + thisObject->detach(); return; } auto callData = JSC::getCallData(callbackObject); @@ -217,6 +227,10 @@ public: EnsureStillAliveScope ensureStillAlive(self); if (globalObject->scriptExecutionStatus(globalObject, thisObject) == ScriptExecutionStatus::Running) { + if (auto* res = thisObject->currentResponseObject.get(); res != nullptr && res->m_ctx != nullptr) { + Bun__NodeHTTPResponse_onClose(res->m_ctx, JSValue::encode(res)); + } + profiledCall(globalObject, JSC::ProfilingReason::API, callbackObject, callData, thisObject, args, exception); if (auto* ptr = exception.get()) { @@ -224,7 +238,7 @@ public: globalObject->reportUncaughtExceptionAtEventLoop(globalObject, ptr); } } - self->strongThis.clear(); + thisObject->detach(); }); } } @@ -252,6 +266,7 @@ JSC_DEFINE_HOST_FUNCTION(jsFunctionNodeHTTPServerSocketClose, (JSC::JSGlobalObje return JSValue::encode(JSC::jsUndefined()); } thisObject->close(); + return JSValue::encode(JSC::jsUndefined()); } diff --git a/src/js/node/http.ts b/src/js/node/http.ts index a5633bc087..d6c287dd20 100644 --- a/src/js/node/http.ts +++ b/src/js/node/http.ts @@ -702,6 +702,7 @@ function onRequestEvent(event) { function onServerRequestEvent(this: NodeHTTPServerSocket, event: NodeHTTPResponseAbortEvent) { const server: Server = this?.server; + const socket: NodeHTTPServerSocket = this; switch (event) { case NodeHTTPResponseAbortEvent.abort: { @@ -936,6 +937,7 @@ const ServerPrototype = { [kHandle]: handle, }); isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS; + handle.onabort = onServerRequestEvent.bind(socket); drainMicrotasks(); let capturedError; @@ -951,7 +953,6 @@ const ServerPrototype = { let resolveFunction; let didFinish = false; - handle.onabort = onServerRequestEvent.bind(socket); const isRequestsLimitSet = typeof server.maxRequestsPerSocket === "number" && server.maxRequestsPerSocket > 0; let reachedRequestsLimit = false; if (isRequestsLimitSet) { diff --git a/test/js/node/http/node-http-uaf-fixture.ts b/test/js/node/http/node-http-uaf-fixture.ts index 68202ffe6b..61c8b2fc4b 100644 --- a/test/js/node/http/node-http-uaf-fixture.ts +++ b/test/js/node/http/node-http-uaf-fixture.ts @@ -1,6 +1,6 @@ import express from "express"; import bodyParser from "body-parser"; -import { fetch } from "undici"; + import { setTimeout as sleep } from "node:timers/promises"; const CONCURRENCY = 100; @@ -18,7 +18,7 @@ app.post("/error", (req, res) => { } }); -const server = app.listen(0, async () => { +var server = app.listen(0, async () => { const port = server.address().port; console.log(`Server running on http://localhost:${port}`); @@ -27,7 +27,7 @@ const server = app.listen(0, async () => { async function makeRequest(id) { const controller = new AbortController(); - setTimeout(() => controller.abort(), Math.random() * 5 + 1); + const secondPromise = setTimeout(() => controller.abort(), Math.random() * 5 + 1); try { await fetch(`http://localhost:${port}/error`, { @@ -38,6 +38,10 @@ const server = app.listen(0, async () => { }).catch(() => {}); } catch (e) {} + try { + await secondPromise; + } catch (e) {} + active.delete(id); } @@ -49,5 +53,11 @@ const server = app.listen(0, async () => { active.add(i); makeRequest(i); + if (i > 0 && i % 1000 === 0) { + console.count("Completed request x 1000"); + } } + + console.log("Done"); + server.close(); }); diff --git a/test/js/node/http/node-http-uaf.test.ts b/test/js/node/http/node-http-uaf.test.ts index 34955e5c46..5435921db5 100644 --- a/test/js/node/http/node-http-uaf.test.ts +++ b/test/js/node/http/node-http-uaf.test.ts @@ -4,14 +4,15 @@ import { bunExe, bunEnv } from "harness"; test("should not crash on abort", async () => { for (let i = 0; i < 2; i++) { - const { exitCode, signalCode } = await Bun.spawnSync({ + const { exited } = Bun.spawn({ cmd: [bunExe(), join(import.meta.dir, "node-http-uaf-fixture.ts")], env: bunEnv, stdout: "ignore", stderr: "ignore", stdin: "ignore", }); + const exitCode = await exited; expect(exitCode).not.toBeNull(); - expect(signalCode).toBeUndefined(); + expect(exitCode).toBe(0); } }); diff --git a/test/js/node/test/parallel/test-http-request-invalid-method-error.js b/test/js/node/test/parallel/test-http-request-invalid-method-error.js new file mode 100644 index 0000000000..20897f0891 --- /dev/null +++ b/test/js/node/test/parallel/test-http-request-invalid-method-error.js @@ -0,0 +1,13 @@ +'use strict'; +require('../common'); +const assert = require('assert'); +const http = require('http'); + +assert.throws( + () => http.request({ method: '\0' }), + { + code: 'ERR_INVALID_HTTP_TOKEN', + name: 'TypeError', + message: 'Method must be a valid HTTP token ["\u0000"]' + } +); diff --git a/test/js/node/test/parallel/test-http-response-remove-header-after-sent.js b/test/js/node/test/parallel/test-http-response-remove-header-after-sent.js new file mode 100644 index 0000000000..b5c0defac9 --- /dev/null +++ b/test/js/node/test/parallel/test-http-response-remove-header-after-sent.js @@ -0,0 +1,24 @@ +'use strict'; +require('../common'); +const assert = require('assert'); +const http = require('http'); + +const server = http.createServer((req, res) => { + res.removeHeader('header1', 1); + res.write('abc'); + assert.throws( + () => res.removeHeader('header2', 2), + { + code: 'ERR_HTTP_HEADERS_SENT', + name: 'Error', + message: 'Cannot remove headers after they are sent to the client' + } + ); + res.end(); +}); + +server.listen(0, () => { + http.get({ port: server.address().port }, () => { + server.close(); + }); +}); diff --git a/test/js/third_party/body-parser/express-memory-leak-fixture.mjs b/test/js/third_party/body-parser/express-memory-leak-fixture.mjs new file mode 100644 index 0000000000..607f522497 --- /dev/null +++ b/test/js/third_party/body-parser/express-memory-leak-fixture.mjs @@ -0,0 +1,67 @@ +import express from "express"; + +const app = express(); +const port = 0; +const body = Buffer.alloc(10 * 1024, "X"); + +// Empty endpoint with no body +app.get("/empty", (req, res) => { + res.end(); +}); + +// Endpoint that consumes request body +app.post("/request-body", express.json({ limit: "10mb" }), (req, res) => { + // Just consume the body and do nothing with it + + res.end(); +}); + +// Endpoint that sends response body +app.get("/response-body", (req, res) => { + res.send(body); +}); + +// Special RSS endpoint to check memory usage from inside the process +app.get("/rss", (req, res) => { + typeof Bun !== "undefined" && Bun.gc(true); + res.json({ + rss: process.memoryUsage.rss(), + objects: smallAssign(typeof Bun !== "undefined" ? require("bun:jsc").heapStats().objectTypeCounts : {}), + }); +}); + +function smallAssign(obj) { + for (let k in obj) { + if (obj[k] < 100) delete obj[k]; + } + + return obj; +} + +// The /aborted endpoint +app.post("/aborted", express.raw({ limit: "100mb" }), (req, res) => { + // This endpoint should receive an aborted request + // The test will abort before finishing the request body + res.status(200).end(); +}); + +// Start the server +const server = app.listen(port, () => { + const address = server.address(); + + // Send the port back to the parent process via IPC + process.send({ + type: "listening", + host: address.address === "::" ? "localhost" : address.address, + port: address.port, + }); +}); + +// Handle shutdown request from parent +process.on("message", message => { + if (message.type === "shutdown") { + server.close(() => { + process.exit(0); + }); + } +}); diff --git a/test/js/third_party/body-parser/express-memory-leak.test.ts b/test/js/third_party/body-parser/express-memory-leak.test.ts new file mode 100644 index 0000000000..99e16c1a9c --- /dev/null +++ b/test/js/third_party/body-parser/express-memory-leak.test.ts @@ -0,0 +1,262 @@ +import { test, expect } from "bun:test"; +import { spawn, ChildProcess } from "child_process"; +import { bunEnv, bunExe, isCI } from "harness"; +import { join } from "path"; + +const REQUESTS_COUNT = 50000; +const BATCH_SIZE = 50; + +interface ServerInfo { + host: string; + port: number; +} + +async function spawnServer(): Promise<{ child: ChildProcess; serverInfo: ServerInfo }> { + return new Promise((resolve, reject) => { + const child = spawn(bunExe(), [join(import.meta.dir, "express-memory-leak-fixture.mjs")], { + stdio: ["inherit", "inherit", "inherit", "ipc"], + env: bunEnv, + + serialization: "json", + }); + + console.log("Spawned", child.pid); + + child.on("message", (message: any) => { + if (message.type === "listening") { + resolve({ + child, + serverInfo: { + host: message.host, + port: message.port, + }, + }); + } + }); + + child.on("error", err => { + reject(err); + }); + + child.on("exit", code => { + if (code !== 0 && code !== null) { + reject(new Error(`Server process exited with code ${code}`)); + } + }); + }); +} + +async function runMemoryTest(endpoint: string, options: RequestInit = {}) { + // Start the fixture server in a separate process + const { child, serverInfo } = await spawnServer(); + + try { + // Run first batch of requests + await runRequestBatch(serverInfo, endpoint, REQUESTS_COUNT, BATCH_SIZE, options); + + // Check memory after first batch + const rss1 = await getMemoryUsage(serverInfo); + console.log(rss1.objects); + console.log(`After ${REQUESTS_COUNT} requests: RSS = ${formatBytes(rss1.rss)}`); + + // Run second batch of requests + await runRequestBatch(serverInfo, endpoint, REQUESTS_COUNT, BATCH_SIZE, options); + + // Check memory after second batch + const rss2 = await getMemoryUsage(serverInfo); + console.log(rss2.objects); + console.log(`After ${REQUESTS_COUNT * 2} requests: RSS = ${formatBytes(rss2.rss)}`); + + // Analyze memory growth + const ratio = rss2.rss / rss1.rss; + console.log(`Memory growth ratio: ${ratio.toFixed(2)}x`); + + // A memory leak would show a significant increase + // We use 1.5x as a threshold - in practice you might need to tune this + expect(ratio).toBeLessThan(1.5); + } finally { + // Shutdown the server + if (child.connected) { + child.send({ type: "shutdown" }); + } else { + child.kill(); + } + + // Wait for the process to exit + await new Promise(resolve => { + child.on("exit", () => resolve()); + setTimeout(() => { + child.kill("SIGKILL"); + resolve(); + }, 1000).unref(); + }); + } +} + +async function runRequestBatch( + serverInfo: ServerInfo, + endpoint: string, + total: number, + batchSize: number, + options: RequestInit = {}, +) { + const url = `http://${serverInfo.host}:${serverInfo.port}${endpoint}`; + + for (let i = 0; i < total; i += batchSize) { + const batch = []; + for (let j = 0; j < batchSize && i + j < total; j++) { + batch.push( + fetch(url, options) + .then(r => r.blob()) + .catch(e => { + if (url.endsWith("/aborted")) { + return; + } + + throw e; + }), + ); + } + await Promise.all(batch); + + // Log progress every 10% complete + if (i % (total / 10) < batchSize) { + console.log(`Completed ${i + batchSize} / ${total} requests`); + } + } +} + +async function getMemoryUsage(serverInfo: ServerInfo): Promise<{ rss: number; objects: Record }> { + const url = `http://${serverInfo.host}:${serverInfo.port}/rss`; + const response = await fetch(url); + const data = await response.json(); + return data; +} + +function formatBytes(bytes: number): string { + if (bytes === 0) return "0 Bytes"; + const k = 1024; + const sizes = ["Bytes", "KB", "MB", "GB"]; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + " " + sizes[i]; +} + +const body = new Blob([Buffer.alloc(1024 * 512, "X")]); + +test( + "memory leak check - empty response", + async () => { + await runMemoryTest("/empty"); + }, + 1000 * 20, +); + +test( + "memory leak check - request body", + async () => { + const body = JSON.stringify({ data: "X".repeat(10 * 1024) }); // 10KB JSON + await runMemoryTest("/request-body", { + method: "POST", + body, + headers: { "Content-Type": "application/json" }, + }); + }, + 1000 * 20, +); + +test( + "memory leak check - response body", + async () => { + await runMemoryTest("/response-body"); + }, + 1000 * 20, +); + +async function createAbortedRequestBatch(serverInfo: ServerInfo): Promise { + const url = `http://${serverInfo.host}:${serverInfo.port}/aborted`; + let signal = new AbortController(); + + let batch = new Array(BATCH_SIZE); + for (let i = 0; i < BATCH_SIZE; i++) { + batch[i] = fetch(url, { + method: "POST", + body, + signal: signal.signal, + }) + .then(r => r.blob()) + .catch(e => {}); + } + + await Bun.sleep(1); + signal.abort(); + await Promise.allSettled(batch); +} + +test.todoIf(isCI)( + "memory leak check - aborted requests", + async () => { + // Start the fixture server in a separate process + const { child, serverInfo } = await spawnServer(); + + try { + // Run first batch of aborted requests + for (let i = 0; i < REQUESTS_COUNT; i += BATCH_SIZE) { + await createAbortedRequestBatch(serverInfo); + + // Log progress every 10% complete + if (i % (REQUESTS_COUNT / 10) < BATCH_SIZE) { + console.log(`Completed ${i + BATCH_SIZE} / ${REQUESTS_COUNT} aborted requests`); + } + } + + // Check memory after first batch + const rss1 = await getMemoryUsage(serverInfo); + console.log(rss1.objects); + console.log(`After ${REQUESTS_COUNT} aborted requests: RSS = ${formatBytes(rss1.rss)}`); + + // Run garbage collection if available + if (typeof Bun !== "undefined") { + Bun.gc(true); + } + + // Run second batch of aborted requests + for (let i = 0; i < REQUESTS_COUNT; i += BATCH_SIZE) { + await createAbortedRequestBatch(serverInfo); + + // Log progress every 10% complete + if (i % (REQUESTS_COUNT / 10) < BATCH_SIZE) { + console.log(`Completed ${REQUESTS_COUNT + i + BATCH_SIZE} / ${REQUESTS_COUNT * 2} aborted requests`); + } + } + + // Check memory after second batch + const rss2 = await getMemoryUsage(serverInfo); + console.log(rss1.objects); + console.log(`After ${REQUESTS_COUNT * 2} aborted requests: RSS = ${formatBytes(rss2.rss)}`); + + // Analyze memory growth + const ratio = rss2.rss / rss1.rss; + console.log(`Memory growth ratio: ${ratio.toFixed(2)}x`); + + // A memory leak would show a significant increase + expect(ratio).toBeLessThan(1.5); + } finally { + // Shutdown the server + if (child.connected) { + child.send({ type: "shutdown" }); + } else { + child.kill(); + } + + // Wait for the process to exit + await new Promise(resolve => { + child.on("exit", () => resolve()); + setTimeout(() => { + child.kill("SIGKILL"); + resolve(); + }, 1000).unref(); + }); + } + }, + 20000, +);