Compare commits

...

2 Commits

Author SHA1 Message Date
Claude Bot
fbb31d784e Fix direct ReadableStream incorrectly calling cancel on normal completion
When a direct ReadableStream completes normally (via controller.close()),
the cancel callback should not be called. This fix checks if the stream
is being closed normally (reason is undefined and stream state is closing)
and skips calling the cancel callback in that case.

This resolves issue #17175 where direct streams would incorrectly report
'cancel reason: undefined' when they completed successfully.

The fix works by:
1. Checking if the close reason is undefined (normal completion)
2. Verifying the stream is in the closing state
3. Only calling cancel for actual cancellations (errors or aborts)

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-11 22:59:58 +00:00
Claude Bot
6169d8ac66 WIP: Fix direct ReadableStream cancel issue
The issue is that when a direct ReadableStream completes normally
(via controller.close()), the cancel callback is incorrectly being called
with undefined as the reason.

This is a work in progress - currently trying to distinguish between
normal stream completion and actual cancellation.

Related to issue #17175
2025-09-11 22:56:45 +00:00
8 changed files with 358 additions and 10 deletions

View File

@@ -683,17 +683,25 @@ export function readDirectStream(stream, sink, underlyingSource) {
$putByIdDirectPrivate(stream, "underlyingSource", null); // doing this causes isReadableStreamDefaultController to return false
$putByIdDirectPrivate(stream, "start", undefined);
function close(stream, reason) {
const cancelFn = underlyingSource?.cancel;
if (cancelFn) {
try {
var prom = cancelFn.$call(underlyingSource, reason);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch {}
underlyingSource = undefined;
// Check if this is a normal close (controller.close() was called)
// Normal close: reason is undefined AND stream is in closing state
const streamState = stream ? $getByIdDirectPrivate(stream, "state") : null;
const isNormalClose = reason === undefined && streamState === $streamClosing;
// Only call cancel if there's a cancellation (not a normal close)
if (!isNormalClose) {
const cancelFn = underlyingSource?.cancel;
if (cancelFn) {
try {
var prom = cancelFn.$call(underlyingSource, reason);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch {}
}
}
underlyingSource = undefined;
if (stream) {
$putByIdDirectPrivate(stream, "readableStreamController", undefined);

23
test-direct-stream-2.js Normal file
View File

@@ -0,0 +1,23 @@
// Test direct stream without server
console.log("Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("Pull called, writing 'Hello'");
controller.write('Hello');
console.log("Closing controller");
controller.close();
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Creating Response with stream");
const response = new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
console.log("Getting text from response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));

36
test-direct-stream-3.js Normal file
View File

@@ -0,0 +1,36 @@
// Test with async pull
const server = Bun.serve({
port: 0,
async fetch(request) {
console.log("Creating direct stream with async pull");
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
console.log("Pull called, sleeping...");
await Bun.sleep(10);
console.log("Writing 'Hello'");
controller.write('Hello');
console.log("Closing controller");
controller.close();
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("Got response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));
server.stop();

37
test-direct-stream-4.js Normal file
View File

@@ -0,0 +1,37 @@
// Test with synchronous pull but delayed close
const server = Bun.serve({
port: 0,
fetch(request) {
console.log("Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("Pull called, writing 'Hello'");
controller.write('Hello');
console.log("Setting timeout to close");
setTimeout(() => {
console.log("Closing controller");
controller.close();
}, 0);
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("Got response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));
server.stop();

View File

@@ -0,0 +1,45 @@
// Test cancellation
const cancelReasons = [];
const server = Bun.serve({
port: 0,
async fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
controller.write('Start');
await Bun.sleep(100); // Keep stream open
},
cancel(reason) {
cancelReasons.push(reason);
console.log('Cancel called with reason:', reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request and abort it
const controller = new AbortController();
const fetchPromise = fetch(`http://localhost:${server.port}/`, { signal: controller.signal });
await Bun.sleep(50);
console.log("Aborting request");
controller.abort();
try {
await fetchPromise;
} catch (e) {
console.log("Fetch aborted as expected");
}
await Bun.sleep(100);
console.log("Cancel reasons received:", cancelReasons.length);
console.log("Cancel reasons:", cancelReasons);
server.stop();

View File

@@ -0,0 +1,36 @@
// Debug test to understand the order of operations
const server = Bun.serve({
port: 0,
fetch(request) {
console.log("1. Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("2. Pull called");
controller.write('Hello');
console.log("3. About to call controller.close()");
controller.close();
console.log("4. After controller.close()");
},
cancel(reason) {
console.log('5. Cancel called with reason:', reason);
console.trace();
},
});
console.log("6. Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("7. Got response");
const text = await response.text();
console.log("8. Got text:", JSON.stringify(text));
server.stop();

33
test-direct-stream.js Normal file
View File

@@ -0,0 +1,33 @@
const server = Bun.serve({
port: 0,
fetch(request) {
console.log("Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("Pull called, writing 'Hello'");
controller.write('Hello');
console.log("Closing controller");
controller.close();
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("Got response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));
server.stop();

View File

@@ -0,0 +1,130 @@
import { test, expect } from "bun:test";
import { bunEnv, bunExe } from "harness";
test("direct ReadableStream should not trigger cancel when successfully consumed", async () => {
const cancelReasons: any[] = [];
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
controller.write('Hello');
controller.close();
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const response = await fetch(`http://localhost:${server.port}/`);
const text = await response.text();
expect(text).toBe('Hello');
expect(cancelReasons).toHaveLength(0);
});
test("direct ReadableStream with async pull should not trigger cancel when successfully consumed", async () => {
const cancelReasons: any[] = [];
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
await Bun.sleep(10);
controller.write('Hello');
controller.close();
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const response = await fetch(`http://localhost:${server.port}/`);
const text = await response.text();
expect(text).toBe('Hello');
expect(cancelReasons).toHaveLength(0);
});
test("direct ReadableStream with await controller.close() should not trigger cancel", async () => {
const cancelReasons: any[] = [];
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
controller.write('Hello');
await controller.close();
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const response = await fetch(`http://localhost:${server.port}/`);
const text = await response.text();
expect(text).toBe('Hello');
expect(cancelReasons).toHaveLength(0);
});
test("direct ReadableStream should only cancel when client disconnects", async () => {
const cancelReasons: any[] = [];
let streamController: any;
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
streamController = controller;
controller.write('Start');
await Bun.sleep(100);
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const controller = new AbortController();
const fetchPromise = fetch(`http://localhost:${server.port}/`, { signal: controller.signal });
await Bun.sleep(50);
controller.abort();
await fetchPromise.catch(() => {});
await Bun.sleep(100);
expect(cancelReasons.length).toBeGreaterThan(0);
});