From 8d1f69650cc954ec571a03b2d968fbd29c990235 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Wed, 30 Jul 2025 23:49:42 -0700 Subject: [PATCH] Fixes "Stream is already ended" error when cancelling a request (#21481) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What does this PR do? if you spam the refresh button in `next dev`, we print this error: ``` ⨯ Error: Stream is already ended at writeHead (null) at (null) at (null) at (null) at (null) at (null) at (null) at (null) at processTicksAndRejections (null) { digest: '2259044225', code: 'ERR_STREAM_ALREADY_FINISHED', toString: [Function: toString] } ⨯ Error: failed to pipe response at processTicksAndRejections (unknown:7:39) { [cause]: Error: Stream is already ended at writeHead (null) at (null) at (null) at (null) at (null) at (null) at (null) at (null) at processTicksAndRejections (null) { digest: '2259044225', code: 'ERR_STREAM_ALREADY_FINISHED', toString: [Function: toString] } } ⨯ Error: failed to pipe response at processTicksAndRejections (unknown:7:39) { page: '/', [cause]: Error: Stream is already ended at writeHead (null) at (null) at (null) at (null) at (null) at (null) at (null) at (null) at processTicksAndRejections (null) { digest: '2259044225', code: 'ERR_STREAM_ALREADY_FINISHED', toString: [Function: toString] } } ``` If the socket is already closed when writeHead is called, we're supposed to silently ignore it instead of throwing an error . The close event is supposed to be emitted on the next tick. Now, I think there are also cases where we do not emit the close event which is similarly bad. ### How did you verify your code works? Need to go through the node http server tests and see if any new ones pass. Also maybe some will fail on this PR, let's see. --- src/bun.js/api/server/NodeHTTPResponse.zig | 49 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/src/bun.js/api/server/NodeHTTPResponse.zig b/src/bun.js/api/server/NodeHTTPResponse.zig index 58c9769ce0..d24a7c1d02 100644 --- a/src/bun.js/api/server/NodeHTTPResponse.zig +++ b/src/bun.js/api/server/NodeHTTPResponse.zig @@ -43,6 +43,15 @@ pub const Flags = packed struct(u8) { is_data_buffered_during_pause: bool = false, /// Did we receive the last chunk of data during pause? is_data_buffered_during_pause_last: bool = false, + + /// Did the user end the request? + pub fn isRequestedCompletedOrEnded(this: *const Flags) bool { + return this.request_has_completed or this.ended; + } + + pub fn isDone(this: *const Flags) bool { + return this.isRequestedCompletedOrEnded() or this.socket_closed; + } }; pub const UpgradeCTX = struct { @@ -334,6 +343,14 @@ pub fn create( return js_this; } +fn isDone(this: *const NodeHTTPResponse) bool { + return this.flags.isDone(); +} + +fn isRequestedCompletedOrEnded(this: *const NodeHTTPResponse) bool { + return this.flags.isRequestedCompletedOrEnded(); +} + pub fn setOnAbortedHandler(this: *NodeHTTPResponse) void { if (this.flags.socket_closed) { return; @@ -346,10 +363,6 @@ pub fn setOnAbortedHandler(this: *NodeHTTPResponse) void { this.upgrade_context.preserveWebSocketHeadersIfNeeded(); } -fn isDone(this: *const NodeHTTPResponse) bool { - return this.flags.request_has_completed or this.flags.ended or this.flags.socket_closed; -} - pub fn getEnded(this: *const NodeHTTPResponse, _: *jsc.JSGlobalObject) jsc.JSValue { return jsc.JSValue.jsBoolean(this.flags.ended); } @@ -430,10 +443,15 @@ extern "C" fn NodeHTTPServer__writeHead_https( pub fn writeHead(this: *NodeHTTPResponse, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { const arguments = callframe.argumentsUndef(3).slice(); - if (this.isDone()) { + if (this.isRequestedCompletedOrEnded()) { return globalObject.ERR(.STREAM_ALREADY_FINISHED, "Stream is already ended", .{}).throw(); } + if (this.flags.socket_closed) { + // We haven't emitted the "close" event yet. + return .js_undefined; + } + const state = this.raw_response.state(); try handleEndedIfNecessary(state, globalObject); @@ -773,6 +791,7 @@ fn onDrain(this: *NodeHTTPResponse, offset: u64, response: uws.AnyResponse) bool // return false means we don't have anything to drain return false; } + const thisValue = this.getThisValue(); const on_writable = js.onWritableGetCached(thisValue) orelse return false; const globalThis = jsc.VirtualMachine.get().global; @@ -791,10 +810,22 @@ fn writeOrEnd( this_value: jsc.JSValue, comptime is_end: bool, ) bun.JSError!jsc.JSValue { - if (this.isDone()) { + if (this.isRequestedCompletedOrEnded()) { return globalObject.ERR(.STREAM_WRITE_AFTER_END, "Stream already ended", .{}).throw(); } + // Loosely mimicking this code: + // function _writeRaw(data, encoding, callback, size) { + // const conn = this[kSocket]; + // if (conn?.destroyed) { + // // The socket was destroyed. If we're still trying to write to it, + // // then we haven't gotten the 'close' event yet. + // return false; + // } + if (this.flags.socket_closed) { + return if (is_end) .js_undefined else jsc.JSValue.jsNumber(0); + } + const state = this.raw_response.state(); if (!state.isResponsePending()) { return globalObject.ERR(.STREAM_WRITE_AFTER_END, "Stream already ended", .{}).throw(); @@ -943,7 +974,7 @@ pub fn setOnAbort(this: *NodeHTTPResponse, thisValue: jsc.JSValue, globalObject: return; } - if (this.isDone() or value.isUndefined()) { + if (this.isRequestedCompletedOrEnded() or value.isUndefined()) { js.onAbortedSetCached(thisValue, globalObject, .zero); } else { js.onAbortedSetCached(thisValue, globalObject, value.withAsyncContextIfNeeded(globalObject)); @@ -1017,7 +1048,9 @@ pub fn write(this: *NodeHTTPResponse, globalObject: *jsc.JSGlobalObject, callfra } pub fn flushHeaders(this: *NodeHTTPResponse, _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { - this.raw_response.flushHeaders(); + if (!this.flags.socket_closed) + this.raw_response.flushHeaders(); + return .js_undefined; }