mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 05:42:43 +00:00
Fixes "Stream is already ended" error when cancelling a request (#21481)
### What does this PR do?
if you spam the refresh button in `next dev`, we print this error:
```
⨯ Error: Stream is already ended
at writeHead (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at processTicksAndRejections (null) {
digest: '2259044225',
code: 'ERR_STREAM_ALREADY_FINISHED',
toString: [Function: toString]
}
⨯ Error: failed to pipe response
at processTicksAndRejections (unknown:7:39) {
[cause]: Error: Stream is already ended
at writeHead (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at processTicksAndRejections (null) {
digest: '2259044225',
code: 'ERR_STREAM_ALREADY_FINISHED',
toString: [Function: toString]
}
}
⨯ Error: failed to pipe response
at processTicksAndRejections (unknown:7:39) {
page: '/',
[cause]: Error: Stream is already ended
at writeHead (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at <anonymous> (null)
at processTicksAndRejections (null) {
digest: '2259044225',
code: 'ERR_STREAM_ALREADY_FINISHED',
toString: [Function: toString]
}
}
```
If the socket is already closed when writeHead is called, we're supposed
to silently ignore it instead of throwing an error . The close event is
supposed to be emitted on the next tick. Now, I think there are also
cases where we do not emit the close event which is similarly bad.
### How did you verify your code works?
Need to go through the node http server tests and see if any new ones
pass. Also maybe some will fail on this PR, let's see.
This commit is contained in:
@@ -43,6 +43,15 @@ pub const Flags = packed struct(u8) {
|
||||
is_data_buffered_during_pause: bool = false,
|
||||
/// Did we receive the last chunk of data during pause?
|
||||
is_data_buffered_during_pause_last: bool = false,
|
||||
|
||||
/// Did the user end the request?
|
||||
pub fn isRequestedCompletedOrEnded(this: *const Flags) bool {
|
||||
return this.request_has_completed or this.ended;
|
||||
}
|
||||
|
||||
pub fn isDone(this: *const Flags) bool {
|
||||
return this.isRequestedCompletedOrEnded() or this.socket_closed;
|
||||
}
|
||||
};
|
||||
|
||||
pub const UpgradeCTX = struct {
|
||||
@@ -334,6 +343,14 @@ pub fn create(
|
||||
return js_this;
|
||||
}
|
||||
|
||||
fn isDone(this: *const NodeHTTPResponse) bool {
|
||||
return this.flags.isDone();
|
||||
}
|
||||
|
||||
fn isRequestedCompletedOrEnded(this: *const NodeHTTPResponse) bool {
|
||||
return this.flags.isRequestedCompletedOrEnded();
|
||||
}
|
||||
|
||||
pub fn setOnAbortedHandler(this: *NodeHTTPResponse) void {
|
||||
if (this.flags.socket_closed) {
|
||||
return;
|
||||
@@ -346,10 +363,6 @@ pub fn setOnAbortedHandler(this: *NodeHTTPResponse) void {
|
||||
this.upgrade_context.preserveWebSocketHeadersIfNeeded();
|
||||
}
|
||||
|
||||
fn isDone(this: *const NodeHTTPResponse) bool {
|
||||
return this.flags.request_has_completed or this.flags.ended or this.flags.socket_closed;
|
||||
}
|
||||
|
||||
pub fn getEnded(this: *const NodeHTTPResponse, _: *jsc.JSGlobalObject) jsc.JSValue {
|
||||
return jsc.JSValue.jsBoolean(this.flags.ended);
|
||||
}
|
||||
@@ -430,10 +443,15 @@ extern "C" fn NodeHTTPServer__writeHead_https(
|
||||
pub fn writeHead(this: *NodeHTTPResponse, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
|
||||
const arguments = callframe.argumentsUndef(3).slice();
|
||||
|
||||
if (this.isDone()) {
|
||||
if (this.isRequestedCompletedOrEnded()) {
|
||||
return globalObject.ERR(.STREAM_ALREADY_FINISHED, "Stream is already ended", .{}).throw();
|
||||
}
|
||||
|
||||
if (this.flags.socket_closed) {
|
||||
// We haven't emitted the "close" event yet.
|
||||
return .js_undefined;
|
||||
}
|
||||
|
||||
const state = this.raw_response.state();
|
||||
try handleEndedIfNecessary(state, globalObject);
|
||||
|
||||
@@ -773,6 +791,7 @@ fn onDrain(this: *NodeHTTPResponse, offset: u64, response: uws.AnyResponse) bool
|
||||
// return false means we don't have anything to drain
|
||||
return false;
|
||||
}
|
||||
|
||||
const thisValue = this.getThisValue();
|
||||
const on_writable = js.onWritableGetCached(thisValue) orelse return false;
|
||||
const globalThis = jsc.VirtualMachine.get().global;
|
||||
@@ -791,10 +810,22 @@ fn writeOrEnd(
|
||||
this_value: jsc.JSValue,
|
||||
comptime is_end: bool,
|
||||
) bun.JSError!jsc.JSValue {
|
||||
if (this.isDone()) {
|
||||
if (this.isRequestedCompletedOrEnded()) {
|
||||
return globalObject.ERR(.STREAM_WRITE_AFTER_END, "Stream already ended", .{}).throw();
|
||||
}
|
||||
|
||||
// Loosely mimicking this code:
|
||||
// function _writeRaw(data, encoding, callback, size) {
|
||||
// const conn = this[kSocket];
|
||||
// if (conn?.destroyed) {
|
||||
// // The socket was destroyed. If we're still trying to write to it,
|
||||
// // then we haven't gotten the 'close' event yet.
|
||||
// return false;
|
||||
// }
|
||||
if (this.flags.socket_closed) {
|
||||
return if (is_end) .js_undefined else jsc.JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
const state = this.raw_response.state();
|
||||
if (!state.isResponsePending()) {
|
||||
return globalObject.ERR(.STREAM_WRITE_AFTER_END, "Stream already ended", .{}).throw();
|
||||
@@ -943,7 +974,7 @@ pub fn setOnAbort(this: *NodeHTTPResponse, thisValue: jsc.JSValue, globalObject:
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isDone() or value.isUndefined()) {
|
||||
if (this.isRequestedCompletedOrEnded() or value.isUndefined()) {
|
||||
js.onAbortedSetCached(thisValue, globalObject, .zero);
|
||||
} else {
|
||||
js.onAbortedSetCached(thisValue, globalObject, value.withAsyncContextIfNeeded(globalObject));
|
||||
@@ -1017,7 +1048,9 @@ pub fn write(this: *NodeHTTPResponse, globalObject: *jsc.JSGlobalObject, callfra
|
||||
}
|
||||
|
||||
pub fn flushHeaders(this: *NodeHTTPResponse, _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
|
||||
this.raw_response.flushHeaders();
|
||||
if (!this.flags.socket_closed)
|
||||
this.raw_response.flushHeaders();
|
||||
|
||||
return .js_undefined;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user