Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
3ca19e8249 fix(fetch): error ReadableStream body on connection failure
When a fetch() request with a ReadableStream body fails before the
streaming starts (e.g., ConnectionRefused), the stream was not being
properly errored. This caused controller.desiredSize to remain 0
instead of becoming null, making it impossible for the stream to
detect the error condition.

The fix adds a new ReadableStream__error function that transitions
the stream to the errored state, and calls it from FetchTasklet when
the request fails before the sink is created.

Fixes #26515

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 01:36:27 +00:00
4 changed files with 168 additions and 0 deletions

View File

@@ -334,6 +334,25 @@ extern "C" void ReadableStream__cancel(JSC::EncodedJSValue possibleReadableStrea
WebCore::ReadableStream::cancel(*globalObject, readableStream, exception);
}
extern "C" void ReadableStream__error(JSC::EncodedJSValue possibleReadableStream, Zig::GlobalObject* globalObject, JSC::EncodedJSValue reason)
{
auto* readableStream = jsDynamicCast<WebCore::JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
if (!readableStream) [[unlikely]]
return;
auto* clientData = static_cast<JSVMClientData*>(globalObject->vm().clientData);
auto& privateName = clientData->builtinFunctions().readableStreamInternalsBuiltins().readableStreamErrorPrivateName();
auto& vm = globalObject->vm();
JSC::JSLockHolder lock(vm);
MarkedArgumentBuffer arguments;
arguments.append(readableStream);
arguments.append(JSC::JSValue::decode(reason));
ASSERT(!arguments.hasOverflowed());
invokeReadableStreamFunction(*globalObject, privateName, JSC::jsUndefined(), arguments);
}
extern "C" void ReadableStream__detach(JSC::EncodedJSValue possibleReadableStream, Zig::GlobalObject* globalObject)
{
auto value = JSC::JSValue::decode(possibleReadableStream);

View File

@@ -149,6 +149,13 @@ pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
this.cancel(globalThis);
}
/// Error the stream with a reason, transitioning it to the errored state.
/// This makes desiredSize return null and invokes the cancel callback on the underlying source.
pub fn errorWithReason(this: *const ReadableStream, globalThis: *JSGlobalObject, reason: JSValue) void {
jsc.markBinding(@src());
ReadableStream__error(this.value, globalThis, reason);
}
pub fn forceDetach(this: *const ReadableStream, globalObject: *JSGlobalObject) void {
ReadableStream__detach(this.value, globalObject);
}
@@ -212,6 +219,7 @@ extern fn ReadableStream__used(*JSGlobalObject) jsc.JSValue;
extern fn ReadableStream__cancel(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__abort(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__detach(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__error(stream: JSValue, *JSGlobalObject, reason: JSValue) void;
extern fn ReadableStream__fromBlob(
*JSGlobalObject,
store: *anyopaque,

View File

@@ -539,6 +539,14 @@ pub const FetchTasklet = struct {
const err = value.toJS(globalThis);
if (this.sink) |sink| {
sink.cancel(err);
} else if (this.request_body == .ReadableStream) {
// If there's no sink yet (connection failed before streaming started),
// we need to error the stream directly so that:
// 1. controller.desiredSize returns null
// 2. The underlying source's cancel callback is invoked
if (this.request_body.ReadableStream.get(globalThis)) |stream| {
stream.errorWithReason(globalThis, err);
}
}
break :brk value.JSValue;
},

View File

@@ -0,0 +1,133 @@
import { expect, test } from "bun:test";
// https://github.com/oven-sh/bun/issues/26515
// ReadableStream's controller.desiredSize should be null on fetch() ConnectionRefused
// Helper to get an ephemeral port that's guaranteed to be closed
async function getClosedPort(): Promise<number> {
await using server = Bun.serve({
port: 0,
fetch() {
return new Response("ok");
},
});
return server.port;
}
test("ReadableStream body desiredSize is null on fetch connection error", async () => {
const port = await getClosedPort();
// Use a deferred promise pattern to avoid async executor
let resolveResult: (value: { desiredSizes: (number | null)[]; fetchError: string | null }) => void;
const resultPromise = new Promise<{
desiredSizes: (number | null)[];
fetchError: string | null;
}>(resolve => {
resolveResult = resolve;
});
// Promise to signal when fetch has failed
let resolveFetchFailed: () => void;
const fetchFailedPromise = new Promise<void>(resolve => {
resolveFetchFailed = resolve;
});
const desiredSizes: (number | null)[] = [];
let fetchError: string | null = null;
const inputStream = new ReadableStream({
start(controller) {
// Record initial desiredSize
desiredSizes.push(controller.desiredSize);
},
async pull(controller) {
// Enqueue a chunk
controller.enqueue(new Uint8Array(1024));
desiredSizes.push(controller.desiredSize);
// Wait for the fetch to actually fail
await fetchFailedPromise;
// Record desiredSize after connection error
desiredSizes.push(controller.desiredSize);
resolveResult({ desiredSizes, fetchError });
},
});
// Start the fetch (don't await - let it run concurrently)
fetch(`http://localhost:${port}`, {
method: "POST",
body: inputStream,
duplex: "half",
}).catch((err: Error) => {
fetchError = err.message;
resolveFetchFailed();
});
const result = await resultPromise;
// Verify fetch failed with connection error
expect(result.fetchError).toContain("Unable to connect");
// Verify desiredSizes:
// - First value (start): should be highWaterMark (default 1)
// - Second value (after enqueue): should be 0 (queue is full)
// - Third value (after error): should be null (stream errored)
expect(result.desiredSizes[0]).toBe(1);
expect(result.desiredSizes[1]).toBe(0);
expect(result.desiredSizes[2]).toBe(null);
});
test("ReadableStream body enqueue throws after fetch connection error", async () => {
const port = await getClosedPort();
// Use a deferred promise pattern to avoid async executor
let resolveResult: (value: { enqueueError: string | null; fetchError: string | null }) => void;
const resultPromise = new Promise<{
enqueueError: string | null;
fetchError: string | null;
}>(resolve => {
resolveResult = resolve;
});
// Promise to signal when fetch has failed
let resolveFetchFailed: () => void;
const fetchFailedPromise = new Promise<void>(resolve => {
resolveFetchFailed = resolve;
});
let enqueueError: string | null = null;
let fetchError: string | null = null;
const inputStream = new ReadableStream({
async pull(controller) {
controller.enqueue(new Uint8Array(1024));
// Wait for the fetch to actually fail
await fetchFailedPromise;
// Try to enqueue after error - should throw
try {
controller.enqueue(new Uint8Array(1024));
} catch (err: any) {
enqueueError = err.message;
}
resolveResult({ enqueueError, fetchError });
},
});
// Start the fetch (don't await - let it run concurrently)
fetch(`http://localhost:${port}`, {
method: "POST",
body: inputStream,
duplex: "half",
}).catch((err: Error) => {
fetchError = err.message;
resolveFetchFailed();
});
const result = await resultPromise;
expect(result.fetchError).toContain("Unable to connect");
expect(result.enqueueError).toContain("Controller is already closed");
});