Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
7654fb72c6 fix(streams): allow cancellation of unlocked undisturbed streams
The `ReadableStream__cancel` function previously had an incorrect early return
when the stream was NOT locked. This prevented cancellation of unlocked streams,
which breaks SSE (Server-Sent Events) when using TransformStream - the readable
side is passed directly to Response without acquiring a reader lock.

The fix changes the logic to:
1. Allow cancellation of unlocked streams only if they are not disturbed yet.
   Disturbed streams may be in an error state, and cancelling them could
   inappropriately surface that error.
2. Use a direct JS call in ReadableStream::cancel instead of
   invokeReadableStreamFunction, and clear any exception after the call.
   The JS readableStreamCancel function can reject (return a rejected Promise)
   if the stream is already errored, which is expected behavior per the
   WHATWG Streams spec.

This fixes the issue where:
- `writer.closed` promise never resolved/rejected when clients disconnected
- Resources (like intervals) continued running after client disconnect
- Potential crashes from writes to destroyed connections

Fixes #26300

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 20:38:41 +00:00
2 changed files with 102 additions and 3 deletions

View File

@@ -206,7 +206,17 @@ void ReadableStream::cancel(WebCore::JSDOMGlobalObject& globalObject, JSReadable
arguments.append(readableStream);
arguments.append(value);
ASSERT(!arguments.hasOverflowed());
invokeReadableStreamFunction(globalObject, privateName, JSC::jsUndefined(), arguments);
auto function = globalObject.get(&globalObject, privateName);
if (!function.isCallable())
return;
auto callData = JSC::getCallData(function);
call(&globalObject, function, callData, JSC::jsUndefined(), arguments);
// Clear any exception - readableStreamCancel can reject if stream is already errored,
// which is expected behavior per the WHATWG Streams spec. The rejection will be
// handled by whoever is waiting on the cancel operation's return value (if anyone).
scope.clearException();
}
static inline bool checkReadableStream(JSDOMGlobalObject& globalObject, JSReadableStream* readableStream, JSC::JSValue function)
@@ -326,8 +336,17 @@ extern "C" void ReadableStream__cancel(JSC::EncodedJSValue possibleReadableStrea
if (!readableStream) [[unlikely]]
return;
if (!WebCore::ReadableStream::isLocked(globalObject, readableStream)) {
return;
// Only cancel unlocked streams if they haven't been disturbed yet.
// A disturbed stream that is unlocked may already be in an error state,
// and calling cancel on it could surface that error inappropriately.
// For unlocked, undisturbed streams (like TransformStream readable side
// passed to Response), we need to cancel them to propagate the abort.
bool isLocked = WebCore::ReadableStream::isLocked(globalObject, readableStream);
if (!isLocked) {
bool isDisturbed = WebCore::ReadableStream::isDisturbed(globalObject, readableStream);
if (isDisturbed) {
return;
}
}
WebCore::Exception exception { Bun::AbortError };

View File

@@ -0,0 +1,80 @@
import { expect, test } from "bun:test";
// Regression test for https://github.com/oven-sh/bun/issues/26300
// SSE connections using TransformStream should trigger onClosed/writer.closed when client disconnects
test("TransformStream readable side should be cancelled when client disconnects", async () => {
const { promise: cleanupPromise, resolve: resolveCleanup } = Promise.withResolvers<string>();
const { promise: serverReady, resolve: serverReadyResolve } = Promise.withResolvers<void>();
const { promise: dataSent, resolve: resolveDataSent } = Promise.withResolvers<void>();
using server = Bun.serve({
port: 0,
fetch(req) {
const stream = new TransformStream();
const writer = stream.writable.getWriter();
// This should be called when the client disconnects
writer.closed
.then(() => {
resolveCleanup("closed-normally");
})
.catch(e => {
// AbortError is expected when client disconnects
resolveCleanup(`closed-with-error:${e?.name ?? "unknown"}`);
});
// Send initial data to ensure connection is fully established
writer.write(new TextEncoder().encode("data: connected\n\n")).then(() => {
resolveDataSent();
});
serverReadyResolve();
return new Response(stream.readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
},
});
// Use a subprocess to make a request that we can kill to simulate socket close
await using proc = Bun.spawn({
cmd: ["curl", "-s", "-N", `http://localhost:${server.port}`],
stdout: "pipe",
stderr: "pipe",
});
// Wait for server to receive the request
await serverReady;
// Wait for data to be sent, ensuring connection is fully established
await dataSent;
// Kill the curl process to simulate client disconnect
proc.kill();
// Wait for the cleanup to happen - test runner timeout will fail the test if this never resolves
// Without the fix, this would hang forever as writer.closed never resolves/rejects
const result = await cleanupPromise;
// The writer.closed promise should resolve or reject with an error
expect(result).toMatch(/^closed-(normally|with-error:)/);
});
test("ReadableStream from TransformStream should propagate cancellation to writable side", async () => {
const { promise: closedPromise, resolve: resolveClosedPromise } = Promise.withResolvers<void>();
const transformStream = new TransformStream();
const writer = transformStream.writable.getWriter();
writer.closed.then(() => resolveClosedPromise()).catch(() => resolveClosedPromise());
// Cancel the readable side directly
await transformStream.readable.cancel(new Error("test cancellation"));
// Wait for writer.closed to resolve/reject - test runner timeout will fail if this never happens
await closedPromise;
});