fix(FileRoute): fix EOF handling, max size handling, and onReaderError behavior (#20317)

Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com>
Co-authored-by: cirospaciari <6379399+cirospaciari@users.noreply.github.com>
Ciro Spaciari
2025-06-11 20:34:52 -07:00
committed by GitHub
parent 3d19c1156c
commit 631e674842
4 changed files with 84 additions and 107 deletions
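
The changes below touch Bun's static file route streaming (FileRoute / StreamTransfer). As a rough illustration of the behaviors named in the commit title, here is a minimal TypeScript sketch in the style of the test file changed below. It assumes the routes option of Bun.serve accepts a Response wrapping Bun.file (as the "Bun.file in serve routes" tests exercise); the file names are placeholders.

    import { serve, file } from "bun";

    // Assumed usage, mirroring the "Bun.file in serve routes" tests changed below.
    const server = serve({
      port: 0,
      routes: {
        // EOF handling: the streamed body must end exactly at end-of-file
        "/hello.txt": new Response(file("hello.txt")),
        // max size handling: a sliced file must stop after the requested byte count
        "/partial": new Response(file("large.txt").slice(0, 1024)),
      },
      fetch() {
        return new Response("not found", { status: 404 });
      },
    });

    const full = await fetch(new URL("/hello.txt", server.url)).then(r => r.text());
    const partial = await fetch(new URL("/partial", server.url)).then(r => r.arrayBuffer());
    console.log(full.length, partial.byteLength); // partial should be exactly 1024 bytes
    server.stop(true);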

View File

@@ -295,20 +295,23 @@ const FileType = bun.io.FileType;
const Output = bun.Output;
const StreamTransfer = struct {
const StreamTransferRefCount = bun.ptr.RefCount(@This(), "ref_count", StreamTransfer.deinit, .{});
pub const ref = StreamTransferRefCount.ref;
pub const deref = StreamTransferRefCount.deref;
reader: bun.io.BufferedReader = bun.io.BufferedReader.init(StreamTransfer),
ref_count: StreamTransferRefCount,
fd: bun.FileDescriptor,
resp: AnyResponse,
route: *FileRoute,
defer_deinit: ?*bool = null,
max_size: ?u64 = null,
eof_task: ?JSC.AnyTask = null,
state: packed struct(u8) {
waiting_for_readable: bool = false,
waiting_for_writable: bool = false,
has_ended_response: bool = false,
has_reader_closed: bool = false,
_: u4 = 0,
_: u7 = 0,
} = .{},
const log = Output.scoped(.StreamTransfer, false);
@@ -321,6 +324,7 @@ const StreamTransfer = struct {
file_type: FileType,
) *StreamTransfer {
var t = bun.new(StreamTransfer, .{
.ref_count = .init(),
.fd = fd,
.resp = resp,
.route = route,
@@ -340,12 +344,9 @@ const StreamTransfer = struct {
fn start(this: *StreamTransfer, start_offset: usize, size: ?usize) void {
log("start", .{});
var scope: DeinitScope = undefined;
scope.enter(this);
defer scope.exit();
this.ref();
defer this.deref();
this.state.waiting_for_readable = true;
this.state.waiting_for_writable = true;
this.max_size = size;
switch (if (start_offset > 0)
@@ -378,23 +379,21 @@ const StreamTransfer = struct {
if (this.route.server) |server| {
this.resp.timeout(server.config().idleTimeout);
}
this.reader.read();
// the connection may abort/close, so we need to be notified
this.resp.onAborted(*StreamTransfer, onAborted, this);
if (!scope.deinit_called) {
// This clones some data so we could avoid that if we're already done.
this.resp.onAborted(*StreamTransfer, onAborted, this);
}
// we are reading so increase the ref count until onReaderDone/onReaderError
this.ref();
this.reader.read();
}
pub fn onReadChunk(this: *StreamTransfer, chunk_: []const u8, state_: bun.io.ReadState) bool {
log("onReadChunk", .{});
var scope: DeinitScope = undefined;
scope.enter(this);
defer scope.exit();
this.ref();
defer this.deref();
if (this.state.has_ended_response) {
this.state.waiting_for_readable = false;
return false;
}
@@ -403,17 +402,29 @@ const StreamTransfer = struct {
const chunk = chunk_[0..@min(chunk_.len, max_size.*)];
max_size.* -|= chunk.len;
if (state_ != .eof and max_size.* == 0) {
// artificially end the stream (max_size reached)
log("max_size reached, ending stream", .{});
if (this.route.server) |server| {
// don't need to ref: we are already holding a ref that will be derefed in onReaderDone
this.reader.pause();
// we cannot free inside onReadChunk (that would be a use-after-free), so we schedule it for the next event loop tick
this.eof_task = JSC.AnyTask.New(StreamTransfer, StreamTransfer.onReaderDone).init(this);
server.vm().enqueueTask(JSC.Task.init(&this.eof_task.?));
}
break :brk .{ chunk, .eof };
}
break :brk .{ chunk_, state_ };
break :brk .{ chunk, state_ };
}
break :brk .{ chunk_, state_ };
};
if (state == .eof and !this.state.waiting_for_writable) {
this.state.waiting_for_readable = false;
if (this.route.server) |server| {
this.resp.timeout(server.config().idleTimeout);
}
if (state == .eof) {
this.state.has_ended_response = true;
const resp = this.resp;
const route = this.route;
@@ -423,31 +434,15 @@ const StreamTransfer = struct {
return false;
}
if (this.route.server) |server| {
this.resp.timeout(server.config().idleTimeout);
}
switch (this.resp.write(chunk)) {
.backpressure => {
// the reader is paused, so release its ref until onWritable resumes reading
defer this.deref();
this.resp.onWritable(*StreamTransfer, onWritable, this);
this.reader.pause();
this.resp.markNeedsMore();
this.state.waiting_for_writable = true;
this.state.waiting_for_readable = false;
return false;
},
.want_more => {
this.state.waiting_for_readable = true;
this.state.waiting_for_writable = false;
if (state == .eof) {
this.state.waiting_for_readable = false;
return false;
}
if (bun.Environment.isWindows)
this.reader.unpause();
return true;
},
}
@@ -455,24 +450,25 @@ const StreamTransfer = struct {
pub fn onReaderDone(this: *StreamTransfer) void {
log("onReaderDone", .{});
this.state.waiting_for_readable = false;
this.state.has_reader_closed = true;
var scope: DeinitScope = undefined;
scope.enter(this);
defer scope.exit();
// release the ref taken when the read started, since the reader is done
defer this.deref();
this.finish();
}
pub fn onReaderError(this: *StreamTransfer, err: bun.sys.Error) void {
log("onReaderError {any}", .{err});
this.state.waiting_for_readable = false;
var scope: DeinitScope = undefined;
scope.enter(this);
defer scope.exit();
defer this.deref(); // release the ref taken when the read started, since the reader is done
if (!this.state.has_ended_response) {
// we need to signal to the client that something went wrong, so close the connection
// sending the end chunk would be a lie and could cause issues
this.state.has_ended_response = true;
const resp = this.resp;
const route = this.route;
route.onResponseComplete(resp);
this.resp.forceClose();
}
this.finish();
}
@@ -487,9 +483,8 @@ const StreamTransfer = struct {
fn onWritable(this: *StreamTransfer, _: u64, _: AnyResponse) bool {
log("onWritable", .{});
var scope: DeinitScope = undefined;
scope.enter(this);
defer scope.exit();
this.ref();
defer this.deref();
if (this.reader.isDone()) {
@branchHint(.unlikely);
@@ -503,84 +498,45 @@ const StreamTransfer = struct {
this.resp.timeout(server.config().idleTimeout);
}
this.state.waiting_for_writable = false;
this.state.waiting_for_readable = true;
// we are reading so increase the ref count until onReaderDone/onReaderError
this.ref();
this.reader.read();
return true;
}
fn finish(this: *StreamTransfer) void {
log("finish", .{});
// let's make sure we detach the response
this.resp.clearOnWritable();
this.resp.clearAborted();
this.resp.clearTimeout();
if (!this.state.has_ended_response) {
this.state.has_ended_response = true;
this.state.waiting_for_writable = false;
const resp = this.resp;
const route = this.route;
route.onResponseComplete(resp);
log("endWithoutBody", .{});
resp.endWithoutBody(resp.shouldCloseConnection());
}
if (!this.state.has_reader_closed) {
this.reader.close();
return;
}
this.deinit();
// this deref signals that the main work is done; the reader may still hold a ref that is derefed in onReaderDone/onReaderError
this.deref();
}
fn onAborted(this: *StreamTransfer, _: AnyResponse) void {
log("onAborted", .{});
var scope: DeinitScope = undefined;
scope.enter(this);
defer scope.exit();
this.state.has_ended_response = true;
this.finish();
}
fn deinit(this: *StreamTransfer) void {
if (this.defer_deinit) |defer_deinit| {
defer_deinit.* = true;
log("deinit deferred", .{});
return;
}
pub fn deinit(this: *StreamTransfer) void {
log("deinit", .{});
// deinit will close the reader if it is not already closed (this will not trigger onReaderDone/onReaderError)
this.reader.deinit();
bun.destroy(this);
}
};
const DeinitScope = struct {
stream: *StreamTransfer,
prev_defer_deinit: ?*bool,
deinit_called: bool = false,
/// This has to be an instance method to avoid a use-after-stack.
pub fn enter(this: *DeinitScope, stream: *StreamTransfer) void {
this.stream = stream;
this.deinit_called = false;
this.prev_defer_deinit = this.stream.defer_deinit;
if (this.prev_defer_deinit == null) {
this.stream.defer_deinit = &this.deinit_called;
}
}
pub fn exit(this: *DeinitScope) void {
if (this.prev_defer_deinit == null and &this.deinit_called == this.stream.defer_deinit) {
this.stream.defer_deinit = this.prev_defer_deinit;
if (this.deinit_called) {
this.stream.deinit();
}
}
}
};
const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{});
pub const ref = RefCount.ref;
pub const deref = RefCount.deref;
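
The bulk of the FileRoute change above is structural: the DeinitScope / defer_deinit machinery is replaced by the RefCount helper, where the transfer holds one base reference until finish() and each in-flight read holds an extra reference released in onReaderDone/onReaderError. The TypeScript sketch below is purely illustrative (not Bun's code) and only shows that ownership invariant.

    // Illustrative only: each pending async operation holds a reference, and the
    // object is destroyed when the last reference is released.
    class Transfer {
      private refCount = 1; // base reference, released in finish()
      private finished = false;

      ref() {
        this.refCount++;
      }

      deref() {
        if (--this.refCount === 0) this.deinit();
      }

      startRead(read: () => Promise<void>) {
        this.ref(); // held for the lifetime of the read
        read().then(
          () => this.onReaderDone(),
          () => this.onReaderError(),
        );
      }

      private onReaderDone() {
        this.finish();
        this.deref(); // release the read's reference
      }

      private onReaderError() {
        this.finish();
        this.deref();
      }

      private finish() {
        if (this.finished) return;
        this.finished = true;
        this.deref(); // release the base reference
      }

      private deinit() {
        // close file descriptors, free buffers, etc.
      }
    }

onAborted in the real code follows the same path: it marks the response as ended and calls finish(), while any still-pending read keeps the object alive until its own deref.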

View File

@@ -470,6 +470,15 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
}
pub fn forceClose(this: *RequestContext) void {
if (this.resp) |resp| {
defer this.deref();
this.detachResponse();
this.endRequestStreamingAndDrain();
resp.forceClose();
}
}
pub fn onWritableResponseBuffer(this: *RequestContext, _: u64, resp: *App.Response) bool {
ctxLog("onWritableResponseBuffer", .{});

View File

@@ -24,6 +24,10 @@ pub fn NewResponse(ssl_flag: i32) type {
return @as(*c.uws_res, @ptrCast(@alignCast(res)));
}
pub inline fn downcastSocket(res: *Response) *bun.uws.us_socket_t {
return @as(*bun.uws.us_socket_t, @ptrCast(@alignCast(res)));
}
pub fn end(res: *Response, data: []const u8, close_connection: bool) void {
c.uws_res_end(ssl_flag, res.downcast(), data.ptr, data.len, close_connection);
}
@@ -461,6 +465,13 @@ pub const AnyResponse = union(enum) {
}
}
pub fn forceClose(this: AnyResponse) void {
switch (this) {
.SSL => |resp| resp.downcastSocket().close(true, .failure),
.TCP => |resp| resp.downcastSocket().close(false, .failure),
}
}
pub fn onWritable(this: AnyResponse, comptime UserDataType: type, comptime handler: fn (UserDataType, u64, AnyResponse) bool, optional_data: UserDataType) void {
const wrapper = struct {
pub fn ssl_handler(user_data: UserDataType, offset: u64, resp: *uws.NewApp(true).Response) bool {
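The forceClose additions above give StreamTransfer.onReaderError a way to tear down the connection at the socket level instead of ending the response normally, so the client observes a network error rather than a truncated body that looks complete. A hedged client-side sketch of the expected effect (the URL is a placeholder):

    // Placeholder URL; any file served through a FileRoute would do.
    const url = "http://localhost:3000/large.txt";
    try {
      const res = await fetch(url);
      // If the file read fails mid-transfer, the server force-closes the socket,
      // so consuming the body is expected to reject instead of resolving truncated.
      await res.arrayBuffer();
    } catch (err) {
      console.error("transfer aborted:", err);
    }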

View File

@@ -1,6 +1,6 @@
import type { Server } from "bun";
import { afterAll, beforeAll, describe, expect, it, mock, test } from "bun:test";
import { isWindows, rmScope, tempDirWithFiles } from "harness";
import { rmScope, tempDirWithFiles } from "harness";
import { unlinkSync } from "node:fs";
import { join } from "node:path";
@@ -373,8 +373,8 @@ describe("Bun.file in serve routes", () => {
test.each(["hello.txt", "large.txt"])(
"concurrent requests for %s",
async filename => {
const batchSize = isWindows ? 8 : 32;
const iterations = isWindows ? 2 : 5;
const batchSize = 16;
const iterations = 10;
async function iterate() {
const promises = Array.from({ length: batchSize }, () =>
@@ -388,9 +388,10 @@ describe("Bun.file in serve routes", () => {
// Verify all responses are identical
const expected = results[0];
results.forEach(result => {
for (const result of results) {
expect(result?.length).toBe(expected.length);
expect(result).toBe(expected);
});
}
}
for (let i = 0; i < iterations; i++) {
@@ -398,7 +399,7 @@ describe("Bun.file in serve routes", () => {
Bun.gc();
}
},
30000,
60000,
);
it("memory usage stays reasonable", async () => {