WIP: Fix direct ReadableStream cancel issue

The issue is that when a direct ReadableStream completes normally
(via controller.close()), the cancel callback is incorrectly being called
with undefined as the reason.

This is a work in progress - currently trying to distinguish between
normal stream completion and actual cancellation.

Related to issue #17175
This commit is contained in:
Claude Bot
2025-09-11 22:56:45 +00:00
parent 4b5551d230
commit 6169d8ac66
7 changed files with 322 additions and 10 deletions

View File

@@ -683,17 +683,25 @@ export function readDirectStream(stream, sink, underlyingSource) {
$putByIdDirectPrivate(stream, "underlyingSource", null); // doing this causes isReadableStreamDefaultController to return false
$putByIdDirectPrivate(stream, "start", undefined);
function close(stream, reason) {
const cancelFn = underlyingSource?.cancel;
if (cancelFn) {
try {
var prom = cancelFn.$call(underlyingSource, reason);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch {}
underlyingSource = undefined;
// Check if the stream is already closing/closed naturally
// If so, don't call cancel as this is a normal completion
const streamState = stream ? $getByIdDirectPrivate(stream, "state") : null;
const isNormalClose = streamState === $streamClosing || streamState === $streamClosed;
// Only call cancel if there's a cancellation (not a normal close)
if (!isNormalClose) {
const cancelFn = underlyingSource?.cancel;
if (cancelFn) {
try {
var prom = cancelFn.$call(underlyingSource, reason);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch {}
}
}
underlyingSource = undefined;
if (stream) {
$putByIdDirectPrivate(stream, "readableStreamController", undefined);

23
test-direct-stream-2.js Normal file
View File

@@ -0,0 +1,23 @@
// Test direct stream without server
console.log("Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("Pull called, writing 'Hello'");
controller.write('Hello');
console.log("Closing controller");
controller.close();
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Creating Response with stream");
const response = new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
console.log("Getting text from response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));

36
test-direct-stream-3.js Normal file
View File

@@ -0,0 +1,36 @@
// Test with async pull
const server = Bun.serve({
port: 0,
async fetch(request) {
console.log("Creating direct stream with async pull");
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
console.log("Pull called, sleeping...");
await Bun.sleep(10);
console.log("Writing 'Hello'");
controller.write('Hello');
console.log("Closing controller");
controller.close();
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("Got response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));
server.stop();

37
test-direct-stream-4.js Normal file
View File

@@ -0,0 +1,37 @@
// Test with synchronous pull but delayed close
const server = Bun.serve({
port: 0,
fetch(request) {
console.log("Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("Pull called, writing 'Hello'");
controller.write('Hello');
console.log("Setting timeout to close");
setTimeout(() => {
console.log("Closing controller");
controller.close();
}, 0);
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("Got response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));
server.stop();

View File

@@ -0,0 +1,45 @@
// Test cancellation
const cancelReasons = [];
const server = Bun.serve({
port: 0,
async fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
controller.write('Start');
await Bun.sleep(100); // Keep stream open
},
cancel(reason) {
cancelReasons.push(reason);
console.log('Cancel called with reason:', reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request and abort it
const controller = new AbortController();
const fetchPromise = fetch(`http://localhost:${server.port}/`, { signal: controller.signal });
await Bun.sleep(50);
console.log("Aborting request");
controller.abort();
try {
await fetchPromise;
} catch (e) {
console.log("Fetch aborted as expected");
}
await Bun.sleep(100);
console.log("Cancel reasons received:", cancelReasons.length);
console.log("Cancel reasons:", cancelReasons);
server.stop();

33
test-direct-stream.js Normal file
View File

@@ -0,0 +1,33 @@
const server = Bun.serve({
port: 0,
fetch(request) {
console.log("Creating direct stream");
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
console.log("Pull called, writing 'Hello'");
controller.write('Hello');
console.log("Closing controller");
controller.close();
},
cancel(reason) {
console.log('Cancel called with reason:', reason);
},
});
console.log("Returning Response with stream");
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
console.log(`Server running on port ${server.port}`);
// Make a request
const response = await fetch(`http://localhost:${server.port}/`);
console.log("Got response");
const text = await response.text();
console.log("Got text:", JSON.stringify(text));
server.stop();

View File

@@ -0,0 +1,130 @@
import { test, expect } from "bun:test";
import { bunEnv, bunExe } from "harness";
test("direct ReadableStream should not trigger cancel when successfully consumed", async () => {
const cancelReasons: any[] = [];
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
pull(controller) {
controller.write('Hello');
controller.close();
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const response = await fetch(`http://localhost:${server.port}/`);
const text = await response.text();
expect(text).toBe('Hello');
expect(cancelReasons).toHaveLength(0);
});
test("direct ReadableStream with async pull should not trigger cancel when successfully consumed", async () => {
const cancelReasons: any[] = [];
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
await Bun.sleep(10);
controller.write('Hello');
controller.close();
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const response = await fetch(`http://localhost:${server.port}/`);
const text = await response.text();
expect(text).toBe('Hello');
expect(cancelReasons).toHaveLength(0);
});
test("direct ReadableStream with await controller.close() should not trigger cancel", async () => {
const cancelReasons: any[] = [];
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
controller.write('Hello');
await controller.close();
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const response = await fetch(`http://localhost:${server.port}/`);
const text = await response.text();
expect(text).toBe('Hello');
expect(cancelReasons).toHaveLength(0);
});
test("direct ReadableStream should only cancel when client disconnects", async () => {
const cancelReasons: any[] = [];
let streamController: any;
using server = Bun.serve({
port: 0,
fetch(request) {
const stream = new ReadableStream({
type: 'direct',
async pull(controller) {
streamController = controller;
controller.write('Start');
await Bun.sleep(100);
},
cancel(reason) {
cancelReasons.push(reason);
},
});
return new Response(stream, {
headers: { 'Content-Type': 'text/plain' },
});
},
});
const controller = new AbortController();
const fetchPromise = fetch(`http://localhost:${server.port}/`, { signal: controller.signal });
await Bun.sleep(50);
controller.abort();
await fetchPromise.catch(() => {});
await Bun.sleep(100);
expect(cancelReasons.length).toBeGreaterThan(0);
});