From ecbd860122cc41a42e32cd26dee4475605c284c5 Mon Sep 17 00:00:00 2001 From: Claude Bot Date: Fri, 20 Feb 2026 05:14:08 +0000 Subject: [PATCH] fix: abort HTTP connection when fetch response reader is cancelled When a client calls `reader.cancel()` on a fetch response body reader, the underlying HTTP connection was not being closed, causing the server to continue sending data and `request.signal.aborted` to remain false. Detect ByteStream cancellation in FetchTasklet.onBodyReceived() and call abortTask() to shut down the HTTP connection, which triggers the server-side abort signal. Closes #19211 Co-Authored-By: Claude --- src/bun.js/webcore/fetch/FetchTasklet.zig | 20 ++++ test/regression/issue/19211.test.ts | 111 ++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 test/regression/issue/19211.test.ts diff --git a/src/bun.js/webcore/fetch/FetchTasklet.zig b/src/bun.js/webcore/fetch/FetchTasklet.zig index 3ea467b2cc..4138af00aa 100644 --- a/src/bun.js/webcore/fetch/FetchTasklet.zig +++ b/src/bun.js/webcore/fetch/FetchTasklet.zig @@ -349,6 +349,17 @@ pub const FetchTasklet = struct { if (this.readable_stream_ref.get(globalThis)) |readable| { log("onBodyReceived readable_stream_ref", .{}); if (readable.ptr == .Bytes) { + // If the client cancelled the ReadableStream (e.g. reader.cancel()), + // the ByteStream will be marked as done. In that case, abort the HTTP + // connection so the server is notified and we stop receiving data. + if (readable.ptr.Bytes.done) { + log("onBodyReceived: stream cancelled, aborting", .{}); + this.readable_stream_ref.deinit(); + this.readable_stream_ref = .{}; + this.abortTask(); + return; + } + readable.ptr.Bytes.size_hint = this.getSizeHint(); // body can be marked as used but we still need to pipe the data const scheduled_response_buffer = &this.scheduled_response_buffer.list; @@ -386,6 +397,15 @@ pub const FetchTasklet = struct { if (response.getBodyReadableStream(globalThis)) |readable| { log("onBodyReceived CurrentResponse BodyReadableStream", .{}); if (readable.ptr == .Bytes) { + // If the client cancelled the ReadableStream (e.g. reader.cancel()), + // abort the HTTP connection so the server is notified. + if (readable.ptr.Bytes.done) { + log("onBodyReceived: stream cancelled (via response), aborting", .{}); + response.detachReadableStream(globalThis); + this.abortTask(); + return; + } + const scheduled_response_buffer = this.scheduled_response_buffer.list; const chunk = scheduled_response_buffer.items; diff --git a/test/regression/issue/19211.test.ts b/test/regression/issue/19211.test.ts new file mode 100644 index 0000000000..da17d12dff --- /dev/null +++ b/test/regression/issue/19211.test.ts @@ -0,0 +1,111 @@ +import { expect, test } from "bun:test"; + +test("reader.cancel() on fetch response should trigger server abort signal", async () => { + // Server that streams data and tracks abort signal + const server = Bun.serve({ + fetch(request) { + let count = 0; + return new Response( + new ReadableStream({ + async pull(controller) { + count++; + controller.enqueue(new TextEncoder().encode(`chunk ${count}\n`)); + // Small delay to allow cancellation to propagate + await new Promise(resolve => setTimeout(resolve, 100)); + if (count > 20) controller.close(); + }, + cancel(_reason) { + // Stream cancel callback - verifies server-side stream cancellation + }, + }), + ); + }, + port: 0, + }); + + try { + const res = await fetch(`http://localhost:${server.port}`, { + method: "POST", + }); + const reader = res.body!.getReader(); + + // Read a couple of chunks + const chunk1 = await reader.read(); + expect(chunk1.done).toBe(false); + expect(new TextDecoder().decode(chunk1.value)).toBe("chunk 1\n"); + + const chunk2 = await reader.read(); + expect(chunk2.done).toBe(false); + expect(new TextDecoder().decode(chunk2.value)).toBe("chunk 2\n"); + + // Cancel the reader - this should eventually trigger server-side abort + await reader.cancel(); + + // Give time for the abort to propagate through the HTTP connection + await new Promise(resolve => setTimeout(resolve, 500)); + + // Make another request to verify the server is still functional + // and the previous connection was properly closed + const res2 = await fetch(`http://localhost:${server.port}`, { + method: "POST", + }); + const reader2 = res2.body!.getReader(); + const chunk = await reader2.read(); + expect(chunk.done).toBe(false); + expect(new TextDecoder().decode(chunk.value)).toBe("chunk 1\n"); + await reader2.cancel(); + } finally { + server.stop(true); + } +}); + +test("reader.cancel() on fetch response should abort the HTTP connection", async () => { + // Track whether the server's stream cancel callback was called + let streamCancelCalled = false; + let serverChunkCount = 0; + const { promise: cancelPromise, resolve: cancelResolve } = Promise.withResolvers(); + + const server = Bun.serve({ + fetch(request) { + return new Response( + new ReadableStream({ + async pull(controller) { + serverChunkCount++; + controller.enqueue(new TextEncoder().encode(`data: ${serverChunkCount}\n\n`)); + await new Promise(resolve => setTimeout(resolve, 100)); + if (serverChunkCount > 50) controller.close(); + }, + cancel(_reason) { + streamCancelCalled = true; + cancelResolve(); + }, + }), + ); + }, + port: 0, + }); + + try { + const res = await fetch(`http://localhost:${server.port}`); + const reader = res.body!.getReader(); + + // Read two chunks + await reader.read(); + await reader.read(); + + // Record how many chunks the server has sent so far + const chunksBeforeCancel = serverChunkCount; + + // Cancel the reader + await reader.cancel(); + + // Wait for cancellation to propagate, but with a timeout + await Promise.race([cancelPromise, new Promise(resolve => setTimeout(resolve, 2000))]); + + // The server should have stopped sending chunks shortly after cancel + // Allow a small margin for in-flight data + expect(serverChunkCount).toBeLessThan(chunksBeforeCancel + 5); + } finally { + server.stop(true); + } +});