From 39ecac0a99554c7612ed21f124f6389fbca68e3f Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Thu, 8 Feb 2024 22:54:29 -0800 Subject: [PATCH] more --- src/bun.js/api/bun/subprocess.zig | 10 ++ src/bun.js/webcore/streams.zig | 69 +++++++++---- src/io/PipeReader.zig | 161 +++++++++++++++++------------- 3 files changed, 150 insertions(+), 90 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 1b123f8a98..85e8568a5f 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -430,6 +430,16 @@ pub const Subprocess = struct { this.* = .{ .closed = {} }; return pipe.toJS(globalThis); }, + .buffer => |buffer| { + defer this.* = .{ .closed = {} }; + + if (buffer.len == 0) { + return JSC.WebCore.ReadableStream.empty(globalThis); + } + + const blob = JSC.WebCore.Blob.init(buffer, bun.default_allocator, globalThis); + return JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, 0); + }, else => { return JSValue.jsUndefined(); }, diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 22e0b71d8f..6bfd5a6cd7 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -319,6 +319,12 @@ pub const ReadableStream = struct { return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@intFromEnum(id))); } + pub fn fromOwnedSlice(globalThis: *JSGlobalObject, bytes: []u8) JSC.JSValue { + JSC.markBinding(@src()); + var stream = ByteStream.new(globalThis, bytes); + return stream.toJS(globalThis); + } + pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { JSC.markBinding(@src()); var store = blob.store orelse { @@ -3016,6 +3022,7 @@ pub const FileReader = struct { lazy: Lazy = .{ .none = {} }, buffered: std.ArrayListUnmanaged(u8) = .{}, read_inside_on_pull: ReadDuringJSOnPullResult = .{ .none = {} }, + highwater_mark: usize = 16384, pub const IOReader = bun.io.BufferedReader; pub const Poll = IOReader; @@ -3026,6 +3033,7 @@ pub const FileReader = struct { js: []u8, amount_read: usize, temporary: []const u8, + use_buffered: usize, }; pub const Lazy = union(enum) { @@ -3099,24 +3107,31 @@ pub const FileReader = struct { this.pending_value.deinit(); } - pub fn onReadChunk(this: *@This(), buf: []const u8) void { + pub fn onReadChunk(this: *@This(), buf: []const u8, hasMore: bool) bool { log("onReadChunk() = {d}", .{buf.len}); if (this.done) { this.reader.close(); - return; + return false; } if (this.read_inside_on_pull != .none) { switch (this.read_inside_on_pull) { .js => |in_progress| { - if (in_progress.len >= buf.len) { + if (in_progress.len >= buf.len and !hasMore) { @memcpy(in_progress[0..buf.len], buf); - this.read_inside_on_pull = .{ .amount_read = buf.len }; - } else { + this.read_inside_on_pull = .{ .js = in_progress[buf.len..] }; + } else if (in_progress.len > 0 and !hasMore) { this.read_inside_on_pull = .{ .temporary = buf }; + } else if (hasMore and !bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) { + this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory(); + this.read_inside_on_pull = .{ .use_buffered = buf.len }; } }, + .use_buffered => |original| { + this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory(); + this.read_inside_on_pull = .{ .use_buffered = buf.len + original }; + }, .none => unreachable, else => @panic("Invalid state"), } @@ -3129,7 +3144,7 @@ pub const FileReader = struct { this.reader.close(); this.done = true; this.pending.run(); - return; + return false; } if (this.pending_view.len >= buf.len) { @@ -3145,15 +3160,13 @@ pub const FileReader = struct { this.pending_value.clear(); this.pending_view = &.{}; this.pending.run(); - return; - } - } else if (!bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) { - if (this.reader.isDone() and this.reader.buffer().capacity == 0) { - this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory(); - } else { - this.reader.buffer().appendSlice(buf) catch bun.outOfMemory(); + return false; } + } else if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) { + this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory(); } + + return this.read_inside_on_pull != .temporary and this.buffered.items.len + this.reader.buffer().items.len < this.highwater_mark; } pub fn onPull(this: *FileReader, buffer: []u8, array: JSC.JSValue) StreamResult { @@ -3161,17 +3174,15 @@ pub const FileReader = struct { defer array.ensureStillAlive(); const drained = this.drain(); - log("onPull({d}) = {d}", .{ buffer.len, drained.len }); - if (drained.len > 0) { + log("onPull({d}) = {d}", .{ buffer.len, drained.len }); + this.pending_value.clear(); this.pending_view = &.{}; if (buffer.len >= @as(usize, drained.len)) { @memcpy(buffer[0..drained.len], drained.slice()); - - // give it back! - this.reader.buffer().* = drained.listManaged(bun.default_allocator); + this.buffered.clearAndFree(bun.default_allocator); if (this.reader.isDone()) { return .{ .into_array_and_done = .{ .value = array, .len = drained.len } }; @@ -3194,9 +3205,14 @@ pub const FileReader = struct { if (!this.reader.hasPendingRead()) { this.read_inside_on_pull = .{ .js = buffer }; this.reader.read(); + defer this.read_inside_on_pull = .{ .none = {} }; switch (this.read_inside_on_pull) { - .amount_read => |amount_read| { + .js => |remaining_buf| { + const amount_read = buffer.len - remaining_buf.len; + + log("onPull({d}) = {d}", .{ buffer.len, amount_read }); + if (amount_read > 0) { if (this.reader.isDone()) { return .{ .into_array_and_done = .{ .value = array, .len = @truncate(amount_read) } }; @@ -3210,16 +3226,29 @@ pub const FileReader = struct { } }, .temporary => |buf| { + log("onPull({d}) = {d}", .{ buffer.len, buf.len }); if (this.reader.isDone()) { return .{ .temporary_and_done = bun.ByteList.init(buf) }; } return .{ .temporary = bun.ByteList.init(buf) }; }, + .use_buffered => { + const buffered = this.buffered; + this.buffered = .{}; + log("onPull({d}) = {d}", .{ buffer.len, buffered.items.len }); + if (this.reader.isDone()) { + return .{ .owned_and_done = bun.ByteList.init(buffered.items) }; + } + + return .{ .owned = bun.ByteList.init(buffered.items) }; + }, else => {}, } if (this.reader.isDone()) { + log("onPull({d}) = done", .{buffer.len}); + return .{ .done = {} }; } } @@ -3227,6 +3256,8 @@ pub const FileReader = struct { this.pending_value.set(this.parent().globalThis, array); this.pending_view = buffer; + log("onPull({d}) = pending", .{buffer.len}); + return .{ .pending = &this.pending }; } diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 6d7629c1c1..42759a6b97 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -7,7 +7,7 @@ pub fn PosixPipeReader( comptime vtable: struct { getFd: *const fn (*This) bun.FileDescriptor, getBuffer: *const fn (*This) *std.ArrayList(u8), - onReadChunk: ?*const fn (*This, chunk: []u8) void = null, + onReadChunk: ?*const fn (*This, chunk: []u8, hasMore: bool) void = null, registerPoll: ?*const fn (*This) void = null, done: *const fn (*This) void, onError: *const fn (*This, bun.sys.Error) void, @@ -40,18 +40,20 @@ pub fn PosixPipeReader( pub fn onPoll(parent: *This, size_hint: isize) void { const resizable_buffer = vtable.getBuffer(parent); const fd = vtable.getFd(parent); - + bun.sys.syslog("onPoll({d}) = {d}", .{ fd, size_hint }); readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint); } const stack_buffer_len = 64 * 1024; - inline fn drainChunk(parent: *This, chunk: []const u8) void { + inline fn drainChunk(parent: *This, chunk: []const u8, hasMore: bool) bool { if (parent.vtable.isStreamingEnabled()) { if (chunk.len > 0) { - parent.vtable.onReadChunk(chunk); + return parent.vtable.onReadChunk(chunk, hasMore); } } + + return false; } // On Linux, we use preadv2 to read without blocking. @@ -79,19 +81,15 @@ pub fn PosixPipeReader( stack_buffer_head = stack_buffer_head[bytes_read..]; if (bytes_read == 0) { - drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]); + drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false); close(parent); return; } - - if (streaming) { - parent.vtable.onReadChunk(buffer); - } }, .err => |err| { if (err.isRetry()) { resizable_buffer.appendSlice(buffer) catch bun.outOfMemory(); - drainChunk(parent, resizable_buffer.items[0..resizable_buffer.items.len]); + drainChunk(parent, resizable_buffer.items[0..resizable_buffer.items.len], false); if (comptime vtable.registerPoll) |register| { register(parent); @@ -152,61 +150,71 @@ pub fn PosixPipeReader( } fn readFromBlockingPipeWithoutBlockingPOSIX(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize) void { - if (size_hint > stack_buffer_len) { - resizable_buffer.ensureUnusedCapacity(@intCast(size_hint)) catch bun.outOfMemory(); - } + _ = size_hint; // autofix const start_length: usize = resizable_buffer.items.len; const streaming = parent.vtable.isStreamingEnabled(); - if (streaming and resizable_buffer.capacity == 0) { + if (streaming) { const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer(); - var stack_buffer_head = stack_buffer; + while (resizable_buffer.capacity == 0) { + var stack_buffer_head = stack_buffer; + while (stack_buffer_head.len > 16 * 1024) { + var buffer = stack_buffer_head; - while (stack_buffer_head.len > 16 * 1024) { - var buffer = stack_buffer_head; + switch (bun.sys.readNonblocking( + fd, + buffer, + )) { + .result => |bytes_read| { + buffer = stack_buffer_head[0..bytes_read]; + stack_buffer_head = stack_buffer_head[bytes_read..]; - switch (bun.sys.readNonblocking( - fd, - buffer, - )) { - .result => |bytes_read| { - buffer = stack_buffer_head[0..bytes_read]; - stack_buffer_head = stack_buffer_head[bytes_read..]; - - if (bytes_read == 0) { - drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]); - close(parent); - return; - } - - switch (bun.isReadable(fd)) { - .ready, .hup => continue, - .not_ready => { - drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]); - if (comptime vtable.registerPoll) |register| { - register(parent); - } - return; - }, - } - }, - .err => |err| { - drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]); - - if (err.isRetry()) { - if (comptime vtable.registerPoll) |register| { - register(parent); + if (bytes_read == 0) { + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) + _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false); + close(parent); return; } - } - vtable.onError(parent, err); - return; - }, - } - } + }, + .err => |err| { + if (err.isRetry()) { + if (comptime vtable.registerPoll) |register| { + register(parent); + _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false); + return; + } + } - resizable_buffer.appendSlice(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]) catch bun.outOfMemory(); + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) + _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false); + vtable.onError(parent, err); + return; + }, + } + + switch (bun.isReadable(fd)) { + .ready, .hup => {}, + .not_ready => { + if (comptime vtable.registerPoll) |register| { + register(parent); + } + + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) + _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false); + return; + }, + } + } + + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) { + if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false)) { + return; + } + } + + if (!parent.vtable.isStreamingEnabled()) break; + } } while (true) { @@ -219,19 +227,27 @@ pub fn PosixPipeReader( resizable_buffer.items.len += bytes_read; if (bytes_read == 0) { - drainChunk(parent, resizable_buffer.items[start_length..]); + _ = drainChunk(parent, resizable_buffer.items[start_length..], false); close(parent); return; } - if (streaming) { - parent.vtable.onReadChunk(buffer); + switch (bun.isReadable(fd)) { + .ready, .hup => continue, + .not_ready => { + _ = drainChunk(parent, resizable_buffer.items[start_length..], false); + + if (comptime vtable.registerPoll) |register| { + register(parent); + } + return; + }, } }, .err => |err| { - if (err.isRetry()) { - drainChunk(parent, resizable_buffer.items[start_length..]); + _ = drainChunk(parent, resizable_buffer.items[start_length..], false); + if (err.isRetry()) { if (comptime vtable.registerPoll) |register| { register(parent); return; @@ -257,7 +273,7 @@ pub fn WindowsPipeReader( comptime This: type, comptime _: anytype, comptime getBuffer: fn (*This) *std.ArrayList(u8), - comptime onReadChunk: fn (*This, chunk: []u8) void, + comptime onReadChunk: fn (*This, chunk: []u8, bool) bool, comptime registerPoll: ?fn (*This) void, comptime done: fn (*This) void, comptime onError: fn (*This, bun.sys.Error) void, @@ -358,7 +374,7 @@ const BufferedReaderVTable = struct { } pub const Fn = struct { - onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void = null, + onReadChunk: ?*const fn (*anyopaque, chunk: []const u8, hasMore: bool) bool = null, onReaderDone: *const fn (*anyopaque) void, onReaderError: *const fn (*anyopaque, bun.sys.Error) void, loop: *const fn (*anyopaque) *Async.Loop, @@ -398,8 +414,12 @@ const BufferedReaderVTable = struct { return this.fns.onReadChunk != null; } - pub fn onReadChunk(this: @This(), chunk: []const u8) void { - this.fns.onReadChunk.?(this.parent, chunk); + /// When the reader has read a chunk of data + /// and hasMore is true, it means that there might be more data to read. + /// + /// Returning false prevents the reader from reading more data. + pub fn onReadChunk(this: @This(), chunk: []const u8, hasMore: bool) bool { + return this.fns.onReadChunk.?(this.parent, chunk, hasMore); } pub fn onReaderDone(this: @This()) void { @@ -464,8 +484,8 @@ const PosixBufferedReader = struct { .onError = @ptrCast(&onError), }); - fn _onReadChunk(this: *PosixBufferedReader, chunk: []u8) void { - this.vtable.onReadChunk(chunk); + fn _onReadChunk(this: *PosixBufferedReader, chunk: []u8, hasMore: bool) bool { + return this.vtable.onReadChunk(chunk, hasMore); } pub fn getFd(this: *PosixBufferedReader) bun.FileDescriptor { @@ -547,9 +567,8 @@ const PosixBufferedReader = struct { return .{ .result = {} }; } this.pollable = true; - this.handle = .{ .fd = fd }; - this.read(); + this.registerPoll(); return .{ .result = {}, @@ -642,11 +661,11 @@ pub const GenericWindowsBufferedReader = struct { return this.has_inflight_read; } - fn _onReadChunk(this: *WindowsOutputReader, buf: []u8) void { + fn _onReadChunk(this: *WindowsOutputReader, buf: []u8, hasMore: bool) bool { this.has_inflight_read = false; const onReadChunkFn = this.vtable.onReadChunk orelse return; - onReadChunkFn(this.parent() orelse return, buf); + return onReadChunkFn(this.parent() orelse return, buf, hasMore); } fn finish(this: *WindowsOutputReader) void { @@ -689,7 +708,7 @@ pub const GenericWindowsBufferedReader = struct { } }; -pub fn WindowsBufferedReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void) type { +pub fn WindowsBufferedReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, chunk: []const u8, more: bool) bool) type { return struct { reader: ?*GenericWindowsBufferedReader = null,