mirror of
https://github.com/oven-sh/bun
synced 2026-02-25 19:17:20 +01:00
fix: abort HTTP connection when fetch response reader is cancelled
When a client calls `reader.cancel()` on a fetch response body reader, the underlying HTTP connection was not being closed, causing the server to continue sending data and `request.signal.aborted` to remain false. Detect ByteStream cancellation in FetchTasklet.onBodyReceived() and call abortTask() to shut down the HTTP connection, which triggers the server-side abort signal. Closes #19211 Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -349,6 +349,17 @@ pub const FetchTasklet = struct {
|
||||
if (this.readable_stream_ref.get(globalThis)) |readable| {
|
||||
log("onBodyReceived readable_stream_ref", .{});
|
||||
if (readable.ptr == .Bytes) {
|
||||
// If the client cancelled the ReadableStream (e.g. reader.cancel()),
|
||||
// the ByteStream will be marked as done. In that case, abort the HTTP
|
||||
// connection so the server is notified and we stop receiving data.
|
||||
if (readable.ptr.Bytes.done) {
|
||||
log("onBodyReceived: stream cancelled, aborting", .{});
|
||||
this.readable_stream_ref.deinit();
|
||||
this.readable_stream_ref = .{};
|
||||
this.abortTask();
|
||||
return;
|
||||
}
|
||||
|
||||
readable.ptr.Bytes.size_hint = this.getSizeHint();
|
||||
// body can be marked as used but we still need to pipe the data
|
||||
const scheduled_response_buffer = &this.scheduled_response_buffer.list;
|
||||
@@ -386,6 +397,15 @@ pub const FetchTasklet = struct {
|
||||
if (response.getBodyReadableStream(globalThis)) |readable| {
|
||||
log("onBodyReceived CurrentResponse BodyReadableStream", .{});
|
||||
if (readable.ptr == .Bytes) {
|
||||
// If the client cancelled the ReadableStream (e.g. reader.cancel()),
|
||||
// abort the HTTP connection so the server is notified.
|
||||
if (readable.ptr.Bytes.done) {
|
||||
log("onBodyReceived: stream cancelled (via response), aborting", .{});
|
||||
response.detachReadableStream(globalThis);
|
||||
this.abortTask();
|
||||
return;
|
||||
}
|
||||
|
||||
const scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
|
||||
const chunk = scheduled_response_buffer.items;
|
||||
|
||||
111
test/regression/issue/19211.test.ts
Normal file
111
test/regression/issue/19211.test.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import { expect, test } from "bun:test";
|
||||
|
||||
test("reader.cancel() on fetch response should trigger server abort signal", async () => {
|
||||
// Server that streams data and tracks abort signal
|
||||
const server = Bun.serve({
|
||||
fetch(request) {
|
||||
let count = 0;
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async pull(controller) {
|
||||
count++;
|
||||
controller.enqueue(new TextEncoder().encode(`chunk ${count}\n`));
|
||||
// Small delay to allow cancellation to propagate
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
if (count > 20) controller.close();
|
||||
},
|
||||
cancel(_reason) {
|
||||
// Stream cancel callback - verifies server-side stream cancellation
|
||||
},
|
||||
}),
|
||||
);
|
||||
},
|
||||
port: 0,
|
||||
});
|
||||
|
||||
try {
|
||||
const res = await fetch(`http://localhost:${server.port}`, {
|
||||
method: "POST",
|
||||
});
|
||||
const reader = res.body!.getReader();
|
||||
|
||||
// Read a couple of chunks
|
||||
const chunk1 = await reader.read();
|
||||
expect(chunk1.done).toBe(false);
|
||||
expect(new TextDecoder().decode(chunk1.value)).toBe("chunk 1\n");
|
||||
|
||||
const chunk2 = await reader.read();
|
||||
expect(chunk2.done).toBe(false);
|
||||
expect(new TextDecoder().decode(chunk2.value)).toBe("chunk 2\n");
|
||||
|
||||
// Cancel the reader - this should eventually trigger server-side abort
|
||||
await reader.cancel();
|
||||
|
||||
// Give time for the abort to propagate through the HTTP connection
|
||||
await new Promise(resolve => setTimeout(resolve, 500));
|
||||
|
||||
// Make another request to verify the server is still functional
|
||||
// and the previous connection was properly closed
|
||||
const res2 = await fetch(`http://localhost:${server.port}`, {
|
||||
method: "POST",
|
||||
});
|
||||
const reader2 = res2.body!.getReader();
|
||||
const chunk = await reader2.read();
|
||||
expect(chunk.done).toBe(false);
|
||||
expect(new TextDecoder().decode(chunk.value)).toBe("chunk 1\n");
|
||||
await reader2.cancel();
|
||||
} finally {
|
||||
server.stop(true);
|
||||
}
|
||||
});
|
||||
|
||||
test("reader.cancel() on fetch response should abort the HTTP connection", async () => {
|
||||
// Track whether the server's stream cancel callback was called
|
||||
let streamCancelCalled = false;
|
||||
let serverChunkCount = 0;
|
||||
const { promise: cancelPromise, resolve: cancelResolve } = Promise.withResolvers<void>();
|
||||
|
||||
const server = Bun.serve({
|
||||
fetch(request) {
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async pull(controller) {
|
||||
serverChunkCount++;
|
||||
controller.enqueue(new TextEncoder().encode(`data: ${serverChunkCount}\n\n`));
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
if (serverChunkCount > 50) controller.close();
|
||||
},
|
||||
cancel(_reason) {
|
||||
streamCancelCalled = true;
|
||||
cancelResolve();
|
||||
},
|
||||
}),
|
||||
);
|
||||
},
|
||||
port: 0,
|
||||
});
|
||||
|
||||
try {
|
||||
const res = await fetch(`http://localhost:${server.port}`);
|
||||
const reader = res.body!.getReader();
|
||||
|
||||
// Read two chunks
|
||||
await reader.read();
|
||||
await reader.read();
|
||||
|
||||
// Record how many chunks the server has sent so far
|
||||
const chunksBeforeCancel = serverChunkCount;
|
||||
|
||||
// Cancel the reader
|
||||
await reader.cancel();
|
||||
|
||||
// Wait for cancellation to propagate, but with a timeout
|
||||
await Promise.race([cancelPromise, new Promise(resolve => setTimeout(resolve, 2000))]);
|
||||
|
||||
// The server should have stopped sending chunks shortly after cancel
|
||||
// Allow a small margin for in-flight data
|
||||
expect(serverChunkCount).toBeLessThan(chunksBeforeCancel + 5);
|
||||
} finally {
|
||||
server.stop(true);
|
||||
}
|
||||
});
|
||||
Reference in New Issue
Block a user