From 3395774c8c88925c0c8f3e69f57c526ad2e8c8f5 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Thu, 9 Oct 2025 22:56:49 -0700 Subject: [PATCH] improve(node:http): uncork after flushing headers to ensure data is sent immediately (#23413) ### What does this PR do? Calls `uncork()` after flushing response headers to ensure data is sent as soon as possible, improving responsiveness. This behavior still works correctly even without the explicit `uncork()` call, due to the deferred uncork logic implemented here: https://github.com/oven-sh/bun/blob/6e3359dd16aced2f6fca2a8e2de71f09e0bcb3cb/packages/bun-uws/src/Loop.h#L57-L64 A test already covers this scenario in `test/js/node/test/parallel/test-http-flush-response-headers.js`. ### How did you verify your code works? CI --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- packages/bun-uws/src/HttpResponse.h | 6 +++- src/bun.js/api/server/NodeHTTPResponse.zig | 38 ++++++++++++++++++++-- src/deps/libuwsockets.cpp | 16 +++++++-- src/deps/uws/Response.zig | 35 +++++++++++++------- src/js/node/_http_client.ts | 2 +- src/js/node/_http_incoming.ts | 6 +++- test/js/node/http/node-http.test.ts | 23 +++++++++++++ 7 files changed, 106 insertions(+), 20 deletions(-) diff --git a/packages/bun-uws/src/HttpResponse.h b/packages/bun-uws/src/HttpResponse.h index 974a4a95f6..6d40523508 100644 --- a/packages/bun-uws/src/HttpResponse.h +++ b/packages/bun-uws/src/HttpResponse.h @@ -478,7 +478,7 @@ public: return internalEnd({nullptr, 0}, 0, false, false, closeConnection); } - void flushHeaders() { + void flushHeaders(bool flushImmediately = false) { writeStatus(HTTP_200_OK); @@ -499,6 +499,10 @@ public: Super::write("\r\n", 2); httpResponseData->state |= HttpResponseData::HTTP_WRITE_CALLED; } + if (flushImmediately) { + /* Uncork the socket to send data to the client immediately */ + this->uncork(); + } } /* Write parts of the response in chunking fashion. Starts timeout if failed. */ bool write(std::string_view data, size_t *writtenPtr = nullptr) { diff --git a/src/bun.js/api/server/NodeHTTPResponse.zig b/src/bun.js/api/server/NodeHTTPResponse.zig index 4baba2f863..67bc27c3a0 100644 --- a/src/bun.js/api/server/NodeHTTPResponse.zig +++ b/src/bun.js/api/server/NodeHTTPResponse.zig @@ -33,6 +33,8 @@ bytes_written: usize = 0, upgrade_context: UpgradeCTX = .{}, +auto_flusher: AutoFlusher = .{}, + pub const Flags = packed struct(u8) { socket_closed: bool = false, request_has_completed: bool = false, @@ -244,6 +246,7 @@ pub fn dumpRequestBody(this: *NodeHTTPResponse, globalObject: *jsc.JSGlobalObjec fn markRequestAsDone(this: *NodeHTTPResponse) void { log("markRequestAsDone()", .{}); + defer this.deref(); this.flags.is_request_pending = false; this.clearOnDataCallback(this.getThisValue(), jsc.VirtualMachine.get().global); @@ -252,7 +255,8 @@ fn markRequestAsDone(this: *NodeHTTPResponse) void { this.buffered_request_body_data_during_pause.clearAndFree(bun.default_allocator); const server = this.server; this.poll_ref.unref(jsc.VirtualMachine.get()); - this.deref(); + this.unregisterAutoFlush(); + server.onRequestComplete(); } @@ -1068,9 +1072,36 @@ pub fn write(this: *NodeHTTPResponse, globalObject: *jsc.JSGlobalObject, callfra return writeOrEnd(this, globalObject, arguments, .zero, false); } +pub fn onAutoFlush(this: *NodeHTTPResponse) bool { + defer this.deref(); + if (!this.flags.socket_closed and !this.flags.upgraded and this.raw_response != null) { + this.raw_response.?.uncork(); + } + this.auto_flusher.registered = false; + return false; +} + +fn registerAutoFlush(this: *NodeHTTPResponse) void { + if (this.auto_flusher.registered) return; + this.ref(); + AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(NodeHTTPResponse, this, jsc.VirtualMachine.get()); +} + +fn unregisterAutoFlush(this: *NodeHTTPResponse) void { + if (!this.auto_flusher.registered) return; + AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(NodeHTTPResponse, this, jsc.VirtualMachine.get()); + this.deref(); +} + pub fn flushHeaders(this: *NodeHTTPResponse, _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { - if (!this.flags.socket_closed and !this.flags.upgraded and this.raw_response != null) - this.raw_response.?.flushHeaders(); + if (!this.flags.socket_closed and !this.flags.upgraded and this.raw_response != null) { + const raw_response = this.raw_response.?; + // Don’t flush immediately; queue a microtask to uncork the socket. + raw_response.flushHeaders(false); + if (raw_response.isCorked()) { + this.registerAutoFlush(); + } + } return .js_undefined; } @@ -1199,6 +1230,7 @@ const jsc = bun.jsc; const JSGlobalObject = jsc.JSGlobalObject; const JSValue = jsc.JSValue; const ZigString = jsc.ZigString; +const AutoFlusher = jsc.WebCore.AutoFlusher; const AnyServer = jsc.API.AnyServer; const ServerWebSocket = jsc.API.ServerWebSocket; diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index 64372ffc58..991a66af0d 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1801,13 +1801,23 @@ __attribute__((callback (corker, ctx))) } } - void uws_res_flush_headers(int ssl, uws_res_r res) { + void uws_res_flush_headers(int ssl, uws_res_r res, bool flushImmediately) { if (ssl) { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->flushHeaders(); + uwsRes->flushHeaders(flushImmediately); } else { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->flushHeaders(); + uwsRes->flushHeaders(flushImmediately); + } + } + + bool uws_res_is_corked(int ssl, uws_res_r res) { + if (ssl) { + uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; + return uwsRes->isCorked(); + } else { + uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; + return uwsRes->isCorked(); } } diff --git a/src/deps/uws/Response.zig b/src/deps/uws/Response.zig index 757392d78a..291aa43627 100644 --- a/src/deps/uws/Response.zig +++ b/src/deps/uws/Response.zig @@ -42,10 +42,12 @@ pub fn NewResponse(ssl_flag: i32) type { return c.uws_res_is_connect_request(ssl_flag, res.downcast()); } - pub fn flushHeaders(res: *Response) void { - c.uws_res_flush_headers(ssl_flag, res.downcast()); + pub fn flushHeaders(res: *Response, flushImmediately: bool) void { + c.uws_res_flush_headers(ssl_flag, res.downcast(), flushImmediately); + } + pub fn isCorked(res: *Response) bool { + return c.uws_res_is_corked(ssl_flag, res.downcast()); } - pub fn state(res: *const Response) State { return c.uws_res_state(ssl_flag, @as(*const c.uws_res, @ptrCast(@alignCast(res)))); } @@ -58,11 +60,11 @@ pub fn NewResponse(ssl_flag: i32) type { c.uws_res_prepare_for_sendfile(ssl_flag, res.downcast()); } - pub fn uncork(_: *Response) void { - // c.uws_res_uncork( - // ssl_flag, - // res.downcast(), - // ); + pub fn uncork(res: *Response) void { + c.uws_res_uncork( + ssl_flag, + res.downcast(), + ); } pub fn pause(res: *Response) void { c.uws_res_pause(ssl_flag, res.downcast()); @@ -377,9 +379,19 @@ pub const AnyResponse = union(enum) { inline else => |resp| resp.getRemoteSocketInfo(), }; } - pub fn flushHeaders(this: AnyResponse) void { + pub fn flushHeaders(this: AnyResponse, flushImmediately: bool) void { switch (this) { - inline else => |resp| resp.flushHeaders(), + inline else => |resp| resp.flushHeaders(flushImmediately), + } + } + pub fn isCorked(this: AnyResponse) bool { + return switch (this) { + inline else => |resp| resp.isCorked(), + }; + } + pub fn uncork(this: AnyResponse) void { + switch (this) { + inline else => |resp| resp.uncork(), } } pub fn getWriteOffset(this: AnyResponse) u64 { @@ -658,7 +670,8 @@ const c = struct { pub extern fn uws_res_get_remote_address_info(res: *c.uws_res, dest: *[*]const u8, port: *i32, is_ipv6: *bool) usize; pub extern fn uws_res_uncork(ssl: i32, res: *c.uws_res) void; pub extern fn uws_res_end(ssl: i32, res: *c.uws_res, data: [*c]const u8, length: usize, close_connection: bool) void; - pub extern fn uws_res_flush_headers(ssl: i32, res: *c.uws_res) void; + pub extern fn uws_res_flush_headers(ssl: i32, res: *c.uws_res, flushImmediately: bool) void; + pub extern fn uws_res_is_corked(ssl: i32, res: *c.uws_res) bool; pub extern fn uws_res_get_socket_data(ssl: i32, res: *c.uws_res) ?*uws.SocketData; pub extern fn uws_res_pause(ssl: i32, res: *c.uws_res) void; pub extern fn uws_res_resume(ssl: i32, res: *c.uws_res) void; diff --git a/src/js/node/_http_client.ts b/src/js/node/_http_client.ts index d6299c2b80..bd2c7c687d 100644 --- a/src/js/node/_http_client.ts +++ b/src/js/node/_http_client.ts @@ -250,7 +250,7 @@ function ClientRequest(input, options, cb) { const onAbort = (_err?: Error) => { this[kClearTimeout]?.(); socketCloseListener(); - if (!this[abortedSymbol]) { + if (!this[abortedSymbol] && !this?.res?.complete) { process.nextTick(emitAbortNextTick, this); this[abortedSymbol] = true; } diff --git a/src/js/node/_http_incoming.ts b/src/js/node/_http_incoming.ts index aa8c1b993c..80e0dfa719 100644 --- a/src/js/node/_http_incoming.ts +++ b/src/js/node/_http_incoming.ts @@ -26,6 +26,7 @@ const { headersTuple, webRequestOrResponseHasBodyValue, getCompleteWebRequestOrResponseBodyValueAsArrayBuffer, + kAbortController, } = require("internal/http"); const { FakeSocket } = require("internal/http/FakeSocket"); @@ -307,7 +308,6 @@ const IncomingMessagePrototype = { if (isAbortError(err)) { err = undefined; } - var nodeHTTPResponse = this[kHandle]; if (nodeHTTPResponse) { this[kHandle] = undefined; @@ -333,6 +333,10 @@ const IncomingMessagePrototype = { socket.destroy(err); } } + const req = this.req; + if (req && !this.complete) { + req[kAbortController]?.abort?.(); + } if ($isCallable(cb)) { emitErrorNextTickIfErrorListenerNT(this, err, cb); diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index dafa58ffde..aaf0d24a93 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -1677,4 +1677,27 @@ describe("HTTP Server Security Tests - Advanced", () => { await doInvalidRequests(address); await Promise.all(clientErrors); }); + + test("flushHeaders should send the headers immediately", async () => { + let headers_sent_at: number = 0; + + let server_res: http.ServerResponse | undefined; + await using server = http.createServer(async (req, res) => { + res.writeHead(200, { "Content-Type": "text/plain" }); + headers_sent_at = Date.now(); + server_res = res; + res.flushHeaders(); + }); + + await once(server.listen(0, "127.0.0.1"), "listening"); + const address = server.address() as AddressInfo; + const response = await fetch(`http://127.0.0.1:${address.port}`); + expect(Date.now() - headers_sent_at).toBeLessThan(100); + expect(server_res).toBeDefined(); + server_res!.write("Hello", () => { + server_res!.end(" World"); + }); + const text = await response.text(); + expect(text).toBe("Hello World"); + }); });