From 0a42ac0deda782847fdec70047b3d69f42740fe2 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Thu, 22 Feb 2024 08:57:36 -0300 Subject: [PATCH] more stable stream and now Content-Range pass --- src/bun.js/api/server.zig | 25 ++++++++++++++----------- src/bun.js/webcore/blob/ReadFile.zig | 21 +++++++++------------ src/bun.js/webcore/body.zig | 7 ++++--- src/bun.js/webcore/streams.zig | 5 +++-- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index e9ae8c8d93..32ae10be2c 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -2140,11 +2140,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.blob == .Blob) { const original_size = this.blob.Blob.size; - + // if we dont know the size we use the stat size this.blob.Blob.size = if (original_size == 0 or original_size == Blob.max_size) stat_size - else - @min(original_size, stat_size); + else // the blob can be a slice of a file + @max(original_size, stat_size); } if (!this.flags.has_written_status) @@ -2158,6 +2158,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp .auto_close = false, .socket_fd = bun.invalid_fd, }; + this.response_buf_owned = .{ .items = result.result.buf, .capacity = result.result.buf.len }; this.resp.?.runCorkedWithType(*RequestContext, renderResponseBufferAndMetadata, this); } @@ -2196,7 +2197,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp var this = pair.this; var stream = pair.stream; if (this.resp == null or this.flags.aborted) { - stream.value.unprotect(); + // stream.value.unprotect(); this.finalizeForAbort(); return; } @@ -2264,7 +2265,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp response_stream.sink.destroy(); this.endStream(this.shouldCloseConnection()); this.finalize(); - stream.value.unprotect(); + // stream.value.unprotect(); return; } @@ -2293,6 +2294,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp .global = globalThis, }, }; + stream.incrementCount(); assignment_result.then( globalThis, this, @@ -2304,13 +2306,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .Fulfilled => { streamLog("promise Fulfilled", .{}); - defer stream.value.unprotect(); + // defer stream.value.unprotect(); this.handleResolveStream(); }, .Rejected => { streamLog("promise Rejected", .{}); - defer stream.value.unprotect(); + // defer stream.value.unprotect(); this.handleRejectStream(globalThis, promise.result(globalThis.vm())); }, @@ -2330,7 +2332,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.flags.aborted) { response_stream.detach(); stream.cancel(globalThis); - defer stream.value.unprotect(); + // defer stream.value.unprotect(); response_stream.sink.markDone(); this.finalizeForAbort(); response_stream.sink.onFirstWrite = null; @@ -2340,7 +2342,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } stream.value.ensureStillAlive(); - defer stream.value.unprotect(); + // defer stream.value.unprotect(); const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and response_stream.sink.buffer.len == 0); @@ -2691,7 +2693,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE))), .message = bun.String.static("Stream already used, please create a new one"), }; - stream.value.unprotect(); + // stream.value.unprotect(); this.runErrorHandler(err.toErrorInstance(this.server.globalThis)); return; } @@ -3046,7 +3048,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp var response: *JSC.WebCore.Response = this.response_ptr.?; var status = response.statusCode(); - var needs_content_range = this.flags.needs_content_range and this.sendfile.remain <= this.blob.size(); + var needs_content_range = this.flags.needs_content_range and this.sendfile.remain < this.blob.size(); + const size = if (needs_content_range) this.sendfile.remain else diff --git a/src/bun.js/webcore/blob/ReadFile.zig b/src/bun.js/webcore/blob/ReadFile.zig index bd1d7907ba..4739979ca7 100644 --- a/src/bun.js/webcore/blob/ReadFile.zig +++ b/src/bun.js/webcore/blob/ReadFile.zig @@ -63,6 +63,7 @@ pub const ReadFile = struct { store: ?*Store = null, offset: SizeType = 0, max_length: SizeType = Blob.max_size, + total_size: SizeType = Blob.max_size, opened_fd: bun.FileDescriptor = invalid_fd, read_off: SizeType = 0, read_eof: bool = false, @@ -287,7 +288,6 @@ pub const ReadFile = struct { const buf = this.buffer.items; defer store.deref(); - const total_size = this.size; const system_error = this.system_error; bun.destroy(this); @@ -296,7 +296,7 @@ pub const ReadFile = struct { return; } - cb(cb_ctx, .{ .result = .{ .buf = buf, .total_size = total_size, .is_temporary = true } }); + cb(cb_ctx, .{ .result = .{ .buf = buf, .total_size = this.total_size, .is_temporary = true } }); } pub fn run(this: *ReadFile, task: *ReadFileTask) void { @@ -368,12 +368,10 @@ pub const ReadFile = struct { } this.could_block = !bun.isRegularFile(stat.mode); + this.total_size = @truncate(@as(SizeType, @intCast(@max(@as(i64, @intCast(stat.size)), 0)))); if (stat.size > 0 and !this.could_block) { - this.size = @min( - @as(SizeType, @truncate(@as(SizeType, @intCast(@max(@as(i64, @intCast(stat.size)), 0))))), - this.max_length, - ); + this.size = @min(this.total_size, this.max_length); // read up to 4k at a time if // they didn't explicitly set a size and we're reading from something that's not a regular file } else if (stat.size == 0 and this.could_block) { @@ -556,6 +554,7 @@ pub const ReadFileUV = struct { store: *Store, offset: SizeType = 0, max_length: SizeType = Blob.max_size, + total_size: SizeType = Blob.max_size, opened_fd: bun.FileDescriptor = invalid_fd, read_len: SizeType = 0, read_off: SizeType = 0, @@ -602,9 +601,8 @@ pub const ReadFileUV = struct { cb(cb_ctx, ReadFile.ResultType{ .err = err }); return; } - const size = this.size; - cb(cb_ctx, .{ .result = .{ .buf = buf, .total_size = size, .is_temporary = true } }); + cb(cb_ctx, .{ .result = .{ .buf = buf, .total_size = this.total_size, .is_temporary = true } }); } pub fn isAllowedToClose(this: *const ReadFileUV) bool { @@ -617,6 +615,7 @@ pub const ReadFileUV = struct { const needs_close = fd != bun.invalid_fd; this.size = @max(this.read_len, this.size); + this.total_size = @max(this.total_size, this.size); if (needs_close) { if (this.doClose(this.isAllowedToClose())) { @@ -678,13 +677,11 @@ pub const ReadFileUV = struct { this.onFinish(); return; } + this.total_size = @truncate(@as(SizeType, @intCast(@max(@as(i64, @intCast(stat.size)), 0)))); this.could_block = !bun.isRegularFile(stat.mode); if (stat.size > 0 and !this.could_block) { - this.size = @min( - @as(SizeType, @truncate(@as(SizeType, @intCast(@max(@as(i64, @intCast(stat.size)), 0))))), - this.max_length, - ); + this.size = @min(this.total_size, this.max_length); // read up to 4k at a time if // they didn't explicitly set a size and we're reading from something that's not a regular file } else if (stat.size == 0 and this.could_block) { diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index 4babc8745b..4b76f1e0ee 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -393,6 +393,7 @@ pub const Body = struct { .global = globalThis, }, }; + this.Locked.readable.?.incrementCount(); return value; @@ -442,7 +443,7 @@ pub const Body = struct { .ptr = .{ .Bytes = &reader.context }, .value = reader.toReadableStream(globalThis), }; - locked.readable.?.value.protect(); + locked.readable.?.incrementCount(); if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| { onReadableStreamAvailable(locked.task.?, locked.readable.?); @@ -1360,12 +1361,12 @@ pub const BodyValueBufferer = struct { ); }, .Fulfilled => { - defer stream.value.unprotect(); + // defer stream.value.unprotect(); sink.handleResolveStream(false); }, .Rejected => { - defer stream.value.unprotect(); + // defer stream.value.unprotect(); sink.handleRejectStream(promise.result(globalThis.vm()), false); }, diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 0bb0b681ca..a11f69e306 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -52,6 +52,7 @@ pub const ReadableStream = struct { ptr: Source, pub fn incrementCount(this: *const ReadableStream) void { + this.value.protect(); switch (this.ptr) { .Blob => |blob| blob.parent().incrementCount(), .File => |file| file.parent().incrementCount(), @@ -161,13 +162,13 @@ pub const ReadableStream = struct { pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void { JSC.markBinding(@src()); ReadableStream__cancel(this.value, globalThis); - this.value.unprotect(); + this.detachIfPossible(globalThis); } pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void { JSC.markBinding(@src()); ReadableStream__cancel(this.value, globalThis); - this.value.unprotect(); + this.detachIfPossible(globalThis); } pub fn forceDetach(this: *const ReadableStream, globalObject: *JSGlobalObject) void {