diff --git a/src/bun.js/webcore.zig b/src/bun.js/webcore.zig index 4d06003b50..ffc12ab992 100644 --- a/src/bun.js/webcore.zig +++ b/src/bun.js/webcore.zig @@ -359,10 +359,15 @@ pub const Prompt = struct { pub const Crypto = struct { const UUID = @import("./uuid.zig"); const BoringSSL = @import("boringssl"); - pub const Class = JSC.NewClass(void, .{ .name = "crypto" }, .{ - .getRandomValues = JSC.DOMCall("Crypto", @This(), "getRandomValues", JSC.JSValue, JSC.DOMEffect.top), - .randomUUID = JSC.DOMCall("Crypto", @This(), "randomUUID", *JSC.JSString, JSC.DOMEffect.top), - }, .{}); + pub const Class = JSC.NewClass( + void, + .{ .name = "crypto" }, + .{ + .getRandomValues = JSC.DOMCall("Crypto", @This(), "getRandomValues", JSC.JSValue, JSC.DOMEffect.top), + .randomUUID = JSC.DOMCall("Crypto", @This(), "randomUUID", *JSC.JSString, JSC.DOMEffect.top), + }, + .{}, + ); pub const Prototype = JSC.NewClass( void, .{ .name = "Crypto" }, diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index c72edd4372..88211614dc 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -759,13 +759,22 @@ pub const Signal = struct { ) VTable { const Functions = struct { fn onClose(this: *anyopaque, err: ?Syscall.Error) void { - Wrapped.close(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), err); + if (comptime !@hasDecl(Wrapped, "onClose")) + Wrapped.close(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), err) + else + Wrapped.onClose(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), err); } fn onReady(this: *anyopaque, amount: ?Blob.SizeType, offset: ?Blob.SizeType) void { - Wrapped.ready(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), amount, offset); + if (comptime !@hasDecl(Wrapped, "onReady")) + Wrapped.ready(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), amount, offset) + else + Wrapped.onReady(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), amount, offset); } fn onStart(this: *anyopaque) void { - Wrapped.start(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this))); + if (comptime !@hasDecl(Wrapped, "onStart")) + Wrapped.start(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this))) + else + Wrapped.onStart(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this))); } }; @@ -1019,6 +1028,7 @@ pub const FileSink = struct { pub usingnamespace NewReadyWatcher(@This(), .write, ready); + const max_fifo_size = 64 * 1024; pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) { var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; const auto_close = this.auto_close; @@ -1091,6 +1101,10 @@ pub const FileSink = struct { } pub fn flush(this: *FileSink) StreamResult.Writable { + return flushMaybePoll(this); + } + + pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable { std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); var total: usize = this.written; @@ -1111,7 +1125,9 @@ pub const FileSink = struct { } } while (remain.len > 0) { - const res = JSC.Node.Syscall.write(fd, remain); + const max_to_write = if (std.os.S.ISFIFO(this.mode)) max_fifo_size else remain.len; + const write_buf = remain[0..@minimum(remain.len, max_to_write)]; + const res = JSC.Node.Syscall.write(fd, write_buf); if (res == .err) { const retry = std.os.E.AGAIN; @@ -1134,6 +1150,20 @@ pub const FileSink = struct { remain = remain[res.result..]; total += res.result; if (res.result == 0) break; + + // we flushed an entire fifo + // but we still have more + // lets wait for the next poll + if (std.os.S.ISFIFO(this.mode) and + max_to_write == max_fifo_size and + res.result == max_fifo_size and + remain.len > 0) + { + this.watch(this.fd); + return .{ + .pending = &this.pending, + }; + } } this.pending.result = .{ @@ -1192,6 +1222,7 @@ pub const FileSink = struct { } pub fn finalize(this: *FileSink) void { + this.signal.close(null); this.cleanup(); this.reachable_from_js = false; @@ -1225,7 +1256,7 @@ pub const FileSink = struct { } pub fn ready(this: *FileSink, _: i64) void { - _ = this.flush(); + _ = this.flushMaybePoll(); } pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { @@ -1308,6 +1339,10 @@ pub const FileSink = struct { if (this.next) |*next| { return next.end(err); } + + if (this.requested_end or this.done) + return .{ .result = void{} }; + this.requested_end = true; const flushy = this.flush(); @@ -3006,6 +3041,7 @@ pub const FileBlobLoader = struct { total_read: Blob.SizeType = 0, finalized: bool = false, callback: anyframe = undefined, + buffered_data: bun.ByteList = .{}, pending: StreamResult.Pending = StreamResult.Pending{ .frame = undefined, .state = .none, @@ -3019,6 +3055,8 @@ pub const FileBlobLoader = struct { stored_global_this_: ?*JSC.JSGlobalObject = null, poll_ref: JSC.PollRef = .{}, has_adjusted_pipe_size_on_linux: bool = false, + finished: bool = false, + signal: JSC.WebCore.Signal = .{}, pub usingnamespace NewReadyWatcher(@This(), .read, ready); @@ -3042,6 +3080,25 @@ pub const FileBlobLoader = struct { }; } + pub fn finish(this: *FileBlobLoader) void { + if (this.finished) return; + this.finished = true; + + // we are done + // resolve any promises with done + // but there could still be data in the pipe + // so we shouldn't actually end it, just means that we no longer will retry on EGAGAIN + if (this.pending.state == .pending) { + if (this.buffered_data.len > 0) { + this.pending.result = .{ .owned = this.buffered_data }; + this.buffered_data = .{}; + } else { + this.pending.result = .{ .done = {} }; + } + this.pending.run(); + } + } + const Concurrent = struct { read: Blob.SizeType = 0, task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback }, @@ -3202,8 +3259,11 @@ pub const FileBlobLoader = struct { NetworkThread.global.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 }); } - const default_fifo_chunk_size = 1024; + // macOS default pipe size is page_size, 16k, or 64k. It changes based on how much was written + // Linux default pipe size is 16 pages of memory + const default_fifo_chunk_size = 64 * 1024; const default_file_chunk_size = 1024 * 1024 * 2; + pub fn onStart(this: *FileBlobLoader) StreamStart { var file = &this.store.data.file; std.debug.assert(!this.started); @@ -3293,6 +3353,7 @@ pub const FileBlobLoader = struct { this.auto_close = auto_close; const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); + this.signal.start(); return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) }; } @@ -3316,6 +3377,13 @@ pub const FileBlobLoader = struct { const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); std.debug.assert(this.started); + if (this.buffered_data.len > 0) { + const data = this.buffered_data; + this.buffered_data.len = 0; + this.buffered_data.cap = 0; + return .{ .owned = data }; + } + switch (chunk_size) { 0 => { std.debug.assert(this.store.data.file.seekable orelse false); @@ -3371,7 +3439,6 @@ pub const FileBlobLoader = struct { if (owned) { return .{ .owned_and_done = bun.ByteList.init(buf) }; } - return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result), .value = view } }; } @@ -3392,6 +3459,10 @@ pub const FileBlobLoader = struct { std.debug.assert(this.started); std.debug.assert(read_buf.len > 0); + if (this.fd == std.math.maxInt(JSC.Node.FileDescriptor)) { + return .{ .done = {} }; + } + var buf_to_use = read_buf; var free_buffer_on_error: bool = false; @@ -3467,6 +3538,10 @@ pub const FileBlobLoader = struct { switch (err.getErrno()) { retry => { + if (this.finished) { + return .{ .done = {} }; + } + if (free_buffer_on_error) { bun.default_allocator.free(buf_to_use); buf_to_use = read_buf; @@ -3562,7 +3637,16 @@ pub const FileBlobLoader = struct { if (this.finalized) return; - this.unwatch(this.fd); + this.signal.close(null); + + if (this.buffered_data.cap > 0) { + this.buffered_data.listManaged(bun.default_allocator).deinit(); + } + + this.finished = true; + + if (this.poll_ref.isActive()) + this.unwatch(this.fd); this.finalized = true; this.pending.result = .{ .done = {} };