diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index abb4873b22..386750d621 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -683,17 +683,25 @@ export function readDirectStream(stream, sink, underlyingSource) { $putByIdDirectPrivate(stream, "underlyingSource", null); // doing this causes isReadableStreamDefaultController to return false $putByIdDirectPrivate(stream, "start", undefined); function close(stream, reason) { - const cancelFn = underlyingSource?.cancel; - if (cancelFn) { - try { - var prom = cancelFn.$call(underlyingSource, reason); - if ($isPromise(prom)) { - $markPromiseAsHandled(prom); - } - } catch {} - - underlyingSource = undefined; + // Check if the stream is already closing/closed naturally + // If so, don't call cancel as this is a normal completion + const streamState = stream ? $getByIdDirectPrivate(stream, "state") : null; + const isNormalClose = streamState === $streamClosing || streamState === $streamClosed; + + // Only call cancel if there's a cancellation (not a normal close) + if (!isNormalClose) { + const cancelFn = underlyingSource?.cancel; + if (cancelFn) { + try { + var prom = cancelFn.$call(underlyingSource, reason); + if ($isPromise(prom)) { + $markPromiseAsHandled(prom); + } + } catch {} + } } + + underlyingSource = undefined; if (stream) { $putByIdDirectPrivate(stream, "readableStreamController", undefined); diff --git a/test-direct-stream-2.js b/test-direct-stream-2.js new file mode 100644 index 0000000000..62047c68ff --- /dev/null +++ b/test-direct-stream-2.js @@ -0,0 +1,23 @@ +// Test direct stream without server +console.log("Creating direct stream"); +const stream = new ReadableStream({ + type: 'direct', + pull(controller) { + console.log("Pull called, writing 'Hello'"); + controller.write('Hello'); + console.log("Closing controller"); + controller.close(); + }, + cancel(reason) { + console.log('Cancel called with reason:', reason); + }, +}); + +console.log("Creating Response with stream"); +const response = new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, +}); + +console.log("Getting text from response"); +const text = await response.text(); +console.log("Got text:", JSON.stringify(text)); \ No newline at end of file diff --git a/test-direct-stream-3.js b/test-direct-stream-3.js new file mode 100644 index 0000000000..30d2e49d8e --- /dev/null +++ b/test-direct-stream-3.js @@ -0,0 +1,36 @@ +// Test with async pull +const server = Bun.serve({ + port: 0, + async fetch(request) { + console.log("Creating direct stream with async pull"); + const stream = new ReadableStream({ + type: 'direct', + async pull(controller) { + console.log("Pull called, sleeping..."); + await Bun.sleep(10); + console.log("Writing 'Hello'"); + controller.write('Hello'); + console.log("Closing controller"); + controller.close(); + }, + cancel(reason) { + console.log('Cancel called with reason:', reason); + }, + }); + + console.log("Returning Response with stream"); + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, +}); + +console.log(`Server running on port ${server.port}`); + +// Make a request +const response = await fetch(`http://localhost:${server.port}/`); +console.log("Got response"); +const text = await response.text(); +console.log("Got text:", JSON.stringify(text)); + +server.stop(); \ No newline at end of file diff --git a/test-direct-stream-4.js b/test-direct-stream-4.js new file mode 100644 index 0000000000..46cf95e4fd --- /dev/null +++ b/test-direct-stream-4.js @@ -0,0 +1,37 @@ +// Test with synchronous pull but delayed close +const server = Bun.serve({ + port: 0, + fetch(request) { + console.log("Creating direct stream"); + const stream = new ReadableStream({ + type: 'direct', + pull(controller) { + console.log("Pull called, writing 'Hello'"); + controller.write('Hello'); + console.log("Setting timeout to close"); + setTimeout(() => { + console.log("Closing controller"); + controller.close(); + }, 0); + }, + cancel(reason) { + console.log('Cancel called with reason:', reason); + }, + }); + + console.log("Returning Response with stream"); + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, +}); + +console.log(`Server running on port ${server.port}`); + +// Make a request +const response = await fetch(`http://localhost:${server.port}/`); +console.log("Got response"); +const text = await response.text(); +console.log("Got text:", JSON.stringify(text)); + +server.stop(); \ No newline at end of file diff --git a/test-direct-stream-cancel.js b/test-direct-stream-cancel.js new file mode 100644 index 0000000000..00b2b728dc --- /dev/null +++ b/test-direct-stream-cancel.js @@ -0,0 +1,45 @@ +// Test cancellation +const cancelReasons = []; + +const server = Bun.serve({ + port: 0, + async fetch(request) { + const stream = new ReadableStream({ + type: 'direct', + async pull(controller) { + controller.write('Start'); + await Bun.sleep(100); // Keep stream open + }, + cancel(reason) { + cancelReasons.push(reason); + console.log('Cancel called with reason:', reason); + }, + }); + + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, +}); + +console.log(`Server running on port ${server.port}`); + +// Make a request and abort it +const controller = new AbortController(); +const fetchPromise = fetch(`http://localhost:${server.port}/`, { signal: controller.signal }); + +await Bun.sleep(50); +console.log("Aborting request"); +controller.abort(); + +try { + await fetchPromise; +} catch (e) { + console.log("Fetch aborted as expected"); +} + +await Bun.sleep(100); +console.log("Cancel reasons received:", cancelReasons.length); +console.log("Cancel reasons:", cancelReasons); + +server.stop(); \ No newline at end of file diff --git a/test-direct-stream.js b/test-direct-stream.js new file mode 100644 index 0000000000..0a51c541ee --- /dev/null +++ b/test-direct-stream.js @@ -0,0 +1,33 @@ +const server = Bun.serve({ + port: 0, + fetch(request) { + console.log("Creating direct stream"); + const stream = new ReadableStream({ + type: 'direct', + pull(controller) { + console.log("Pull called, writing 'Hello'"); + controller.write('Hello'); + console.log("Closing controller"); + controller.close(); + }, + cancel(reason) { + console.log('Cancel called with reason:', reason); + }, + }); + + console.log("Returning Response with stream"); + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, +}); + +console.log(`Server running on port ${server.port}`); + +// Make a request +const response = await fetch(`http://localhost:${server.port}/`); +console.log("Got response"); +const text = await response.text(); +console.log("Got text:", JSON.stringify(text)); + +server.stop(); \ No newline at end of file diff --git a/test/regression/issue/17175.test.ts b/test/regression/issue/17175.test.ts new file mode 100644 index 0000000000..a2087a6201 --- /dev/null +++ b/test/regression/issue/17175.test.ts @@ -0,0 +1,130 @@ +import { test, expect } from "bun:test"; +import { bunEnv, bunExe } from "harness"; + +test("direct ReadableStream should not trigger cancel when successfully consumed", async () => { + const cancelReasons: any[] = []; + + using server = Bun.serve({ + port: 0, + fetch(request) { + const stream = new ReadableStream({ + type: 'direct', + pull(controller) { + controller.write('Hello'); + controller.close(); + }, + cancel(reason) { + cancelReasons.push(reason); + }, + }); + + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, + }); + + const response = await fetch(`http://localhost:${server.port}/`); + const text = await response.text(); + + expect(text).toBe('Hello'); + expect(cancelReasons).toHaveLength(0); +}); + +test("direct ReadableStream with async pull should not trigger cancel when successfully consumed", async () => { + const cancelReasons: any[] = []; + + using server = Bun.serve({ + port: 0, + fetch(request) { + const stream = new ReadableStream({ + type: 'direct', + async pull(controller) { + await Bun.sleep(10); + controller.write('Hello'); + controller.close(); + }, + cancel(reason) { + cancelReasons.push(reason); + }, + }); + + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, + }); + + const response = await fetch(`http://localhost:${server.port}/`); + const text = await response.text(); + + expect(text).toBe('Hello'); + expect(cancelReasons).toHaveLength(0); +}); + +test("direct ReadableStream with await controller.close() should not trigger cancel", async () => { + const cancelReasons: any[] = []; + + using server = Bun.serve({ + port: 0, + fetch(request) { + const stream = new ReadableStream({ + type: 'direct', + async pull(controller) { + controller.write('Hello'); + await controller.close(); + }, + cancel(reason) { + cancelReasons.push(reason); + }, + }); + + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, + }); + + const response = await fetch(`http://localhost:${server.port}/`); + const text = await response.text(); + + expect(text).toBe('Hello'); + expect(cancelReasons).toHaveLength(0); +}); + +test("direct ReadableStream should only cancel when client disconnects", async () => { + const cancelReasons: any[] = []; + let streamController: any; + + using server = Bun.serve({ + port: 0, + fetch(request) { + const stream = new ReadableStream({ + type: 'direct', + async pull(controller) { + streamController = controller; + controller.write('Start'); + await Bun.sleep(100); + }, + cancel(reason) { + cancelReasons.push(reason); + }, + }); + + return new Response(stream, { + headers: { 'Content-Type': 'text/plain' }, + }); + }, + }); + + const controller = new AbortController(); + const fetchPromise = fetch(`http://localhost:${server.port}/`, { signal: controller.signal }); + + await Bun.sleep(50); + controller.abort(); + + await fetchPromise.catch(() => {}); + await Bun.sleep(100); + + expect(cancelReasons.length).toBeGreaterThan(0); +}); \ No newline at end of file