From cb63d2bf69672400cb8a3dc04f2f478480032aec Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Tue, 30 Jan 2024 10:04:03 -0800 Subject: [PATCH] wip --- src/async/posix_event_loop.zig | 4 +- src/bun.js/api/bun/process.zig | 26 ++-- src/bun.js/api/bun/subprocess.zig | 192 ++++++++++++++++++----- src/bun.js/base.zig | 1 + src/bun.js/webcore/streams.zig | 3 +- src/install/lifecycle_script_runner.zig | 136 +---------------- src/io/PipeReader.zig | 193 ++++++++++++++++++++++-- src/io/io.zig | 1 + src/shell/subproc.zig | 2 +- 9 files changed, 365 insertions(+), 193 deletions(-) diff --git a/src/async/posix_event_loop.zig b/src/async/posix_event_loop.zig index bffdfc7252..33e9c19c4a 100644 --- a/src/async/posix_event_loop.zig +++ b/src/async/posix_event_loop.zig @@ -166,7 +166,7 @@ pub const FilePoll = struct { const Process = bun.spawn.Process; const Subprocess = JSC.Subprocess; const BufferedInput = Subprocess.BufferedInput; - const BufferedOutput = Subprocess.BufferedOutput; + const BufferedOutput = Subprocess.StreamingOutput; const DNSResolver = JSC.DNS.DNSResolver; const GetAddrInfoRequest = JSC.DNS.GetAddrInfoRequest; const Deactivated = opaque { @@ -376,7 +376,7 @@ pub const FilePoll = struct { loader.onMachportChange(); }, - @field(Owner.Tag, "PosixOutputReader") => { + @field(Owner.Tag, bun.meta.typeBaseName(@typeName(LifecycleScriptSubprocessOutputReader))) => { log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) OutputReader", .{poll.fd}); var output: *LifecycleScriptSubprocessOutputReader = ptr.as(LifecycleScriptSubprocessOutputReader); output.onPoll(size_or_offset); diff --git a/src/bun.js/api/bun/process.zig b/src/bun.js/api/bun/process.zig index 59b77e760d..54f327322d 100644 --- a/src/bun.js/api/bun/process.zig +++ b/src/bun.js/api/bun/process.zig @@ -185,12 +185,18 @@ pub const Process = struct { return this.status.signalCode(); } - pub fn wait(this: *Process, sync: bool) void { + pub fn waitPosix(this: *Process, sync: bool) void { var rusage = std.mem.zeroes(Rusage); const waitpid_result = PosixSpawn.wait4(this.pid, if (sync) 0 else std.os.W.NOHANG, &rusage); this.onWaitPid(&waitpid_result, &rusage); } + pub fn wait(this: *Process, sync: bool) void { + if (comptime Environment.isPosix) { + this.waitPosix(sync); + } else if (comptime Environment.isWindows) {} + } + pub fn onWaitPidFromWaiterThread(this: *Process, waitpid_result: *const JSC.Maybe(PosixSpawn.WaitPidResult)) void { if (comptime Environment.isWindows) { @compileError("not implemented on this platform"); @@ -526,19 +532,19 @@ pub const PollerPosix = union(enum) { detached: void, pub fn deinit(this: *PollerPosix) void { - if (this.poller == .fd) { - this.poller.fd.deinit(); - } else if (this.poller == .waiter_thread) { - this.poller.waiter_thread.disable(); + if (this.* == .fd) { + this.fd.deinit(); + } else if (this.* == .waiter_thread) { + this.waiter_thread.disable(); } } pub fn enableKeepingEventLoopAlive(this: *Poller, event_loop: JSC.EventLoopHandle) void { switch (this.*) { .fd => |poll| { - poll.enableKeepingEventLoopAlive(event_loop); + poll.enableKeepingProcessAlive(event_loop); }, - .waiter_thread => |waiter| { + .waiter_thread => |*waiter| { waiter.ref(event_loop); }, else => {}, @@ -548,9 +554,9 @@ pub const PollerPosix = union(enum) { pub fn disableKeepingEventLoopAlive(this: *PollerPosix, event_loop: JSC.EventLoopHandle) void { switch (this.*) { .fd => |poll| { - poll.disableKeepingEventLoopAlive(event_loop); + poll.disableKeepingProcessAlive(event_loop); }, - .waiter_thread => |waiter| { + .waiter_thread => |*waiter| { waiter.unref(event_loop); }, else => {}, @@ -559,7 +565,7 @@ pub const PollerPosix = union(enum) { pub fn hasRef(this: *const PollerPosix) bool { return switch (this.*) { - .fd => this.fd.hasRef(), + .fd => this.fd.canEnableKeepingProcessAlive(), .waiter_thread => this.waiter_thread.isActive(), else => false, }; diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 02120a362b..5cdbfa312f 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -122,6 +122,11 @@ pub const Subprocess = struct { const log = Output.scoped(.Subprocess, false); pub usingnamespace JSC.Codegen.JSSubprocess; const default_max_buffer_size = 1024 * 1024 * 4; + pub const StdioKind = enum { + stdin, + stdout, + stderr, + }; process: *Process = undefined, closed_streams: u8 = 0, deinit_onclose: bool = false, @@ -141,11 +146,7 @@ pub const Subprocess = struct { stderr, stdio, }) = .{}, - closed: std.enums.EnumSet(enum { - stdin, - stdout, - stderr, - }) = .{}, + closed: std.enums.EnumSet(StdioKind) = .{}, has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true), this_jsvalue: JSC.JSValue = .zero, @@ -276,6 +277,7 @@ pub const Subprocess = struct { const Readable = union(enum) { fd: bun.FileDescriptor, memfd: bun.FileDescriptor, + sync_buffered_output: *BufferedOutput, pipe: Pipe, inherit: void, @@ -318,7 +320,7 @@ pub const Subprocess = struct { pub const Pipe = union(enum) { stream: JSC.WebCore.ReadableStream, - buffer: BufferedOutput, + buffer: StreamingOutput, detached: void, pub fn finish(this: *@This()) void { @@ -370,7 +372,7 @@ pub const Subprocess = struct { .pipe => brk: { break :brk .{ .pipe = .{ - .buffer = BufferedOutput.initWithPipeAndAllocator(allocator, pipe, max_size), + .buffer = StreamingOutput.initWithPipeAndAllocator(allocator, pipe, max_size), }, }; }, @@ -379,24 +381,26 @@ pub const Subprocess = struct { .memfd => Readable{ .memfd = stdio.memfd }, .array_buffer => Readable{ .pipe = .{ - .buffer = BufferedOutput.initWithPipeAndSlice(pipe, stdio.array_buffer.slice()), + .buffer = StreamingOutput.initWithPipeAndSlice(pipe, stdio.array_buffer.slice()), }, }, }; } - pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32) Readable { + pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32, is_sync: bool) Readable { if (comptime Environment.allow_assert) { if (fd) |fd_| { std.debug.assert(fd_ != bun.invalid_fd); } } + return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, .pipe => brk: { + if (is_sync) {} break :brk .{ .pipe = .{ - .buffer = BufferedOutput.initWithAllocator(allocator, fd.?, max_size), + .buffer = StreamingOutput.initWithAllocator(allocator, fd.?, max_size), }, }; }, @@ -405,7 +409,7 @@ pub const Subprocess = struct { .memfd => Readable{ .memfd = stdio.memfd }, .array_buffer => Readable{ .pipe = .{ - .buffer = BufferedOutput.initWithSlice(fd.?, stdio.array_buffer.slice()), + .buffer = StreamingOutput.initWithSlice(fd.?, stdio.array_buffer.slice()), }, }, }; @@ -499,6 +503,13 @@ pub const Subprocess = struct { this.* = .{ .closed = {} }; return JSC.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis); }, + .sync_buffered_output => |*sync_buffered_output| { + const slice = sync_buffered_output.toOwnedSlice(globalThis); + this.* = .{ .closed = {} }; + return JSC.MarkedArrayBuffer + .fromBytes(slice, bun.default_allocator, .Uint8Array) + .toNodeBuffer(globalThis); + }, .pipe => { if (!Environment.isWindows) { this.pipe.buffer.stream.close_on_empty_read = true; @@ -819,6 +830,67 @@ pub const Subprocess = struct { }; pub const BufferedOutput = struct { + reader: bun.io.BufferedOutputReader(BufferedOutput, null) = .{}, + process: *Subprocess = undefined, + event_loop: *JSC.EventLoop = undefined, + ref_count: u32 = 1, + + pub usingnamespace bun.NewRefCounted(@This(), deinit); + + pub fn onOutputDone(this: *BufferedOutput) void { + this.process.onCloseIO(this.kind()); + } + + pub fn toOwnedSlice(this: *BufferedOutput) []u8 { + // we do not use .toOwnedSlice() because we don't want to reallocate memory. + const out = this.reader.buffer.items; + this.reader.buffer.items = &.{}; + this.reader.buffer.capacity = 0; + return out; + } + + pub fn onOutputError(this: *BufferedOutput, err: bun.sys.Error) void { + _ = this; // autofix + Output.panic("BufferedOutput should never error. If it does, it's a bug in the code.\n{}", .{err}); + } + + fn kind(this: *const BufferedOutput) StdioKind { + if (this.process.stdout == .sync_buffered_output and this.process.stdout.sync_buffered_output == this) { + // are we stdout? + return .stdout; + } else if (this.process.stderr == .sync_buffered_output and this.process.stderr.sync_buffered_output == this) { + // are we stderr? + return .stderr; + } + + @panic("We should be either stdout or stderr"); + } + + pub fn close(this: *BufferedOutput) void { + if (!this.reader.is_done) + this.reader.close(); + } + + pub fn eventLoop(this: *BufferedOutput) *JSC.EventLoop { + return this.event_loop; + } + + pub fn loop(this: *BufferedOutput) *uws.Loop { + return this.event_loop.virtual_machine.uwsLoop(); + } + + fn deinit(this: *BufferedOutput) void { + std.debug.assert(this.reader.is_done); + + if (comptime Environment.isWindows) { + std.debug.assert(this.reader.pipe.isClosed()); + } + + this.destroy(); + } + }; + + pub const StreamingOutput = struct { internal_buffer: bun.ByteList = .{}, stream: FIFOType = undefined, auto_sizer: ?JSC.WebCore.AutoSizer = null, @@ -836,13 +908,13 @@ pub const Subprocess = struct { err: bun.sys.Error, }; - pub fn init(fd: bun.FileDescriptor) BufferedOutput { + pub fn init(fd: bun.FileDescriptor) StreamingOutput { if (Environment.isWindows) { @compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipe"); } std.debug.assert(fd != .zero and fd != bun.invalid_fd); - return BufferedOutput{ + return StreamingOutput{ .internal_buffer = .{}, .stream = JSC.WebCore.FIFO{ .fd = fd, @@ -850,18 +922,18 @@ pub const Subprocess = struct { }; } - pub fn initWithPipe(pipe: *uv.Pipe) BufferedOutput { + pub fn initWithPipe(pipe: *uv.Pipe) StreamingOutput { if (!Environment.isWindows) { @compileError("uv.Pipe can only be used on Windows"); } - return BufferedOutput{ .internal_buffer = .{}, .stream = pipe }; + return StreamingOutput{ .internal_buffer = .{}, .stream = pipe }; } - pub fn initWithSlice(fd: bun.FileDescriptor, slice: []u8) BufferedOutput { + pub fn initWithSlice(fd: bun.FileDescriptor, slice: []u8) StreamingOutput { if (Environment.isWindows) { @compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipeAndSlice"); } - return BufferedOutput{ + return StreamingOutput{ // fixed capacity .internal_buffer = bun.ByteList.initWithBuffer(slice), .auto_sizer = null, @@ -871,11 +943,11 @@ pub const Subprocess = struct { }; } - pub fn initWithPipeAndSlice(pipe: *uv.Pipe, slice: []u8) BufferedOutput { + pub fn initWithPipeAndSlice(pipe: *uv.Pipe, slice: []u8) StreamingOutput { if (!Environment.isWindows) { @compileError("uv.Pipe can only be used on Window"); } - return BufferedOutput{ + return StreamingOutput{ // fixed capacity .internal_buffer = bun.ByteList.initWithBuffer(slice), .auto_sizer = null, @@ -883,7 +955,7 @@ pub const Subprocess = struct { }; } - pub fn initWithAllocator(allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) BufferedOutput { + pub fn initWithAllocator(allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) StreamingOutput { if (Environment.isWindows) { @compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipeAndAllocator"); } @@ -896,7 +968,7 @@ pub const Subprocess = struct { return this; } - pub fn initWithPipeAndAllocator(allocator: std.mem.Allocator, pipe: *uv.Pipe, max_size: u32) BufferedOutput { + pub fn initWithPipeAndAllocator(allocator: std.mem.Allocator, pipe: *uv.Pipe, max_size: u32) StreamingOutput { if (!Environment.isWindows) { @compileError("uv.Pipe can only be used on Window"); } @@ -909,7 +981,7 @@ pub const Subprocess = struct { return this; } - pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void { + pub fn onRead(this: *StreamingOutput, result: JSC.WebCore.StreamResult) void { if (Environment.isWindows) { @compileError("uv.Pipe can only be used on Window"); } @@ -948,7 +1020,7 @@ pub const Subprocess = struct { } fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: uv.ReturnCodeI64, _: *const uv.uv_buf_t) callconv(.C) void { - const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); + const this: *StreamingOutput = @ptrCast(@alignCast(handle.data)); if (nread.int() == uv.UV_EOF) { this.status = .{ .done = {} }; _ = uv.uv_read_stop(@ptrCast(handle)); @@ -968,7 +1040,7 @@ pub const Subprocess = struct { } fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { - const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); + const this: *StreamingOutput = @ptrCast(@alignCast(handle.data)); var size: usize = 0; var available = this.internal_buffer.available(); if (this.auto_sizer) |auto_sizer| { @@ -994,11 +1066,11 @@ pub const Subprocess = struct { } } - pub fn readAll(this: *BufferedOutput) void { + pub fn readAll(this: *StreamingOutput) void { if (Environment.isWindows) { if (this.status == .pending) { this.stream.data = this; - _ = uv.uv_read_start(@ptrCast(this.stream), BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback); + _ = uv.uv_read_start(@ptrCast(this.stream), StreamingOutput.uvStreamAllocCallback, StreamingOutput.uvStreamReadCallback); } return; } @@ -1079,25 +1151,25 @@ pub const Subprocess = struct { } } - fn watch(this: *BufferedOutput) void { + fn watch(this: *StreamingOutput) void { if (Environment.isWindows) { this.readAll(); } else { std.debug.assert(this.stream.fd != bun.invalid_fd); - this.stream.pending.set(BufferedOutput, this, onRead); + this.stream.pending.set(StreamingOutput, this, onRead); if (!this.stream.isWatching()) this.stream.watch(this.stream.fd); } return; } - pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob { + pub fn toBlob(this: *StreamingOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob { const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis); this.internal_buffer = bun.ByteList.init(""); return blob; } pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult { - const this = bun.cast(*BufferedOutput, ctx); + const this = bun.cast(*StreamingOutput, ctx); this.readAll(); const internal_buffer = this.internal_buffer; this.internal_buffer = bun.ByteList.init(""); @@ -1110,7 +1182,7 @@ pub const Subprocess = struct { }; } - fn signalStreamError(this: *BufferedOutput) void { + fn signalStreamError(this: *StreamingOutput) void { if (this.status == .err) { // if we are streaming update with error if (this.readable_stream_ref.get()) |readable| { @@ -1127,7 +1199,7 @@ pub const Subprocess = struct { this.readable_stream_ref.deinit(); } } - fn flushBufferedDataIntoReadableStream(this: *BufferedOutput) void { + fn flushBufferedDataIntoReadableStream(this: *StreamingOutput) void { if (this.readable_stream_ref.get()) |readable| { if (readable.ptr != .Bytes) return; @@ -1159,13 +1231,13 @@ pub const Subprocess = struct { } fn onReadableStreamAvailable(ctx: *anyopaque, readable: JSC.WebCore.ReadableStream) void { - const this = bun.cast(*BufferedOutput, ctx); + const this = bun.cast(*StreamingOutput, ctx); if (this.globalThis) |globalThis| { this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis) catch .{}; } } - fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream { + fn toReadableStream(this: *StreamingOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream { if (Environment.isWindows) { if (this.readable_stream_ref.get()) |readable| { return readable; @@ -1208,8 +1280,8 @@ pub const Subprocess = struct { .size_hint = 0, .task = this, .global = globalThis, - .onStartStreaming = BufferedOutput.onStartStreamingRequestBodyCallback, - .onReadableStreamAvailable = BufferedOutput.onReadableStreamAvailable, + .onStartStreaming = StreamingOutput.onStartStreamingRequestBodyCallback, + .onReadableStreamAvailable = StreamingOutput.onReadableStreamAvailable, }, }; return JSC.WebCore.ReadableStream.fromJS(body.toReadableStream(globalThis), globalThis).?; @@ -1238,12 +1310,12 @@ pub const Subprocess = struct { fn uvClosedCallback(handler: *anyopaque) callconv(.C) void { const event = bun.cast(*uv.Pipe, handler); - var this = bun.cast(*BufferedOutput, event.data); + var this = bun.cast(*StreamingOutput, event.data); this.readable_stream_ref.deinit(); this.closeCallback.run(); } - pub fn close(this: *BufferedOutput) void { + pub fn close(this: *StreamingOutput) void { switch (this.status) { .done => {}, .pending => { @@ -1341,6 +1413,10 @@ pub const Subprocess = struct { return Writable{ .pipe = sink }; }, + .sync_buffered_output => |buffer| { + _ = buffer; // autofix + @panic("This should never be called"); + }, .array_buffer, .blob => { var buffered_input: BufferedInput = .{ .fd = fd.?, .source = undefined }; switch (stdio) { @@ -1957,6 +2033,35 @@ pub const Subprocess = struct { return .zero; }; + if (comptime is_sync) { + if (stdio[1] == .pipe and stdio[1].pipe == null) { + stdio[1] = .{ .sync_buffered_output = BufferedOutput.new(.{}) }; + } + + if (stdio[2] == .pipe and stdio[2].pipe == null) { + stdio[2] = .{ .sync_buffered_output = BufferedOutput.new(.{}) }; + } + } else { + if (stdio[1] == .pipe and stdio[1].pipe == null) { + stdio[1] = .{ .buffer = {} }; + } + + if (stdio[2] == .pipe and stdio[2].pipe == null) { + stdio[2] = .{ .buffer = {} }; + } + } + defer { + if (comptime is_sync) { + if (stdio[1] == .sync_buffered_output) { + stdio[1].sync_buffered_output.deref(); + } + + if (stdio[2] == .sync_buffered_output) { + stdio[2].sync_buffered_output.deref(); + } + } + } + const spawn_options = bun.spawn.SpawnOptions{ .cwd = cwd, .detached = detached, @@ -1964,6 +2069,11 @@ pub const Subprocess = struct { .stdout = stdio[1].asSpawnOption(), .stderr = stdio[2].asSpawnOption(), .extra_fds = extra_fds.items, + + .windows = if (Environment.isWindows) bun.spawn.WindowsSpawnOptions.WindowsOptions{ + .hide_window = windows_hide, + .loop = jsc_vm.eventLoop().uws_loop, + } else {}, }; var spawned = switch (bun.spawn.spawnProcess( @@ -2014,8 +2124,8 @@ pub const Subprocess = struct { globalThis.throwOutOfMemory(); return .zero; }, - .stdout = Readable.init(stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size), - .stderr = Readable.init(stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size), + .stdout = Readable.init(stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size, is_sync), + .stderr = Readable.init(stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size, is_sync), .stdio_pipes = spawned.extra_pipes.moveToUnmanaged(), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, @@ -2149,6 +2259,7 @@ pub const Subprocess = struct { pipe: ?JSC.WebCore.ReadableStream, array_buffer: JSC.ArrayBuffer.Strong, memfd: bun.FileDescriptor, + sync_buffered_output: *BufferedOutput, const PipeExtra = struct { fd: i32, @@ -2261,6 +2372,7 @@ pub const Subprocess = struct { .path => |pathlike| .{ .path = pathlike.slice() }, .inherit => .{ .inherit = {} }, .ignore => .{ .ignore = {} }, + .sync_buffer => .{ .buffer = &stdio.sync_buffer.reader.pipe }, .memfd => @panic("This should never happen"), }; diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index ee8cff61ca..563af71698 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -409,6 +409,7 @@ pub const ArrayBuffer = extern struct { return Bun__createUint8ArrayForCopy(globalThis, bytes.ptr, bytes.len, true); } + extern "C" fn Bun__createUint8ArrayForCopy(*JSC.JSGlobalObject, ptr: ?*const anyopaque, len: usize, buffer: bool) JSValue; extern "C" fn Bun__createArrayBufferForCopy(*JSC.JSGlobalObject, ptr: ?*const anyopaque, len: usize) JSValue; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 36de8f341b..b4632d4401 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -4154,7 +4154,7 @@ pub fn NewFIFO(comptime EventLoop: JSC.EventLoopKind) type { return struct { buf: []u8 = &[_]u8{}, view: JSC.Strong = .{}, - poll_ref: ?*Async.FilePoll = null, + fd: bun.FileDescriptor = bun.invalid_fd, to_read: ?u32 = null, close_on_empty_read: bool = false, @@ -4165,7 +4165,6 @@ pub fn NewFIFO(comptime EventLoop: JSC.EventLoopKind) type { .result = .{ .done = {} }, }, signal: JSC.WebCore.Signal = .{}, - is_first_read: bool = true, has_adjusted_pipe_size_on_linux: bool = false, drained: bool = true, diff --git a/src/install/lifecycle_script_runner.zig b/src/install/lifecycle_script_runner.zig index 77f82031a9..c2e3918cec 100644 --- a/src/install/lifecycle_script_runner.zig +++ b/src/install/lifecycle_script_runner.zig @@ -35,137 +35,15 @@ pub const LifecycleScriptSubprocess = struct { const uv = bun.windows.libuv; - const PosixOutputReader = struct { - poll: *Async.FilePoll = undefined, - buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), - is_done: bool = false, + pub const OutputReader = bun.io.BufferedOutputReader(LifecycleScriptSubprocess, null); - // This is a workaround for "Dependency loop detected" - parent: *LifecycleScriptSubprocess = undefined, + pub fn loop(this: *const LifecycleScriptSubprocess) *bun.uws.Loop { + return this.manager.event_loop.loop(); + } - pub usingnamespace bun.io.PipeReader( - @This(), - getFd, - getBuffer, - null, - registerPoll, - done, - onError, - ); - - pub fn getFd(this: *PosixOutputReader) bun.FileDescriptor { - return this.poll.fd; - } - - pub fn getBuffer(this: *PosixOutputReader) *std.ArrayList(u8) { - return &this.buffer; - } - - fn finish(this: *PosixOutputReader) void { - this.poll.flags.insert(.ignore_updates); - this.subprocess().manager.event_loop.putFilePoll(this.poll); - std.debug.assert(!this.is_done); - this.is_done = true; - } - - pub fn done(this: *PosixOutputReader) void { - this.finish(); - this.subprocess().onOutputDone(); - } - - pub fn onError(this: *PosixOutputReader, err: bun.sys.Error) void { - this.finish(); - this.subprocess().onOutputError(err); - } - - pub fn registerPoll(this: *PosixOutputReader) void { - switch (this.poll.register(this.subprocess().manager.event_loop.loop(), .readable, true)) { - .err => |err| { - Output.prettyErrorln("error: Failed to register poll for {s} script output from \"{s}\" due to error {d} {s}", .{ - this.subprocess().scriptName(), - this.subprocess().package_name, - err.errno, - @tagName(err.getErrno()), - }); - }, - .result => {}, - } - } - - pub inline fn subprocess(this: *PosixOutputReader) *LifecycleScriptSubprocess { - return this.parent; - } - - pub fn start(this: *PosixOutputReader) JSC.Maybe(void) { - const maybe = this.poll.register(this.subprocess().manager.event_loop.loop(), .readable, true); - if (maybe != .result) { - return maybe; - } - - this.read(); - - return .{ - .result = {}, - }; - } - }; - - const WindowsOutputReader = struct { - pipe: uv.Pipe = std.mem.zeroes(uv.Pipe), - buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), - is_done: bool = false, - - // This is a workaround for "Dependency loop detected" - parent: *LifecycleScriptSubprocess = undefined, - - pub usingnamespace bun.io.PipeReader( - @This(), - {}, - getBuffer, - null, - null, - done, - onError, - ); - - pub fn getBuffer(this: *WindowsOutputReader) *std.ArrayList(u8) { - return &this.buffer; - } - - fn finish(this: *WindowsOutputReader) void { - std.debug.assert(!this.is_done); - this.is_done = true; - } - - pub fn done(this: *WindowsOutputReader) void { - std.debug.assert(this.pipe.isClosed()); - - this.finish(); - this.subprocess().onOutputDone(); - } - - pub fn onError(this: *WindowsOutputReader, err: bun.sys.Error) void { - this.finish(); - this.subprocess().onOutputError(err); - } - - pub inline fn subprocess(this: *WindowsOutputReader) *LifecycleScriptSubprocess { - return this.parent; - } - - pub fn getReadBufferWithStableMemoryAddress(this: *WindowsOutputReader, suggested_size: usize) []u8 { - this.buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory(); - return this.buffer.allocatedSlice()[this.buffer.items.len..]; - } - - pub fn start(this: *WindowsOutputReader) JSC.Maybe(void) { - this.buffer.clearRetainingCapacity(); - this.is_done = false; - return this.startReading(); - } - }; - - pub const OutputReader = if (Environment.isPosix) PosixOutputReader else WindowsOutputReader; + pub fn eventLoop(this: *const LifecycleScriptSubprocess) *JSC.AnyEventLoop { + return &this.manager.event_loop; + } pub fn scriptName(this: *const LifecycleScriptSubprocess) []const u8 { std.debug.assert(this.current_script_index < Lockfile.Scripts.names.len); diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 16abc1ea08..7cc6b9c208 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -45,7 +45,7 @@ pub fn PosixPipeReader( readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint); } - const stack_buffer_len = 16384; + const stack_buffer_len = 64 * 1024; fn readFromBlockingPipeWithoutBlocking(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize) void { if (size_hint > stack_buffer_len) { @@ -65,7 +65,7 @@ pub fn PosixPipeReader( switch (bun.sys.read(fd, buffer)) { .result => |bytes_read| { if (bytes_read == 0) { - vtable.done(parent); + parent.close(); return; } @@ -113,15 +113,19 @@ pub fn PosixPipeReader( } } } + + pub fn close(this: *This) void { + _ = bun.sys.close(getFd(this)); + this.poll.deinit(); + vtable.done(this); + } }; } const uv = bun.windows.libuv; pub fn WindowsPipeReader( comptime This: type, - // Originally this was the comptime vtable struct like the below - // But that caused a Zig compiler segfault as of 0.12.0-dev.1604+caae40c21 - comptime getFd: anytype, + comptime _: anytype, comptime getBuffer: fn (*This) *std.ArrayList(u8), comptime onReadChunk: ?fn (*This, chunk: []u8) void, comptime registerPoll: ?fn (*This) void, @@ -132,9 +136,7 @@ pub fn WindowsPipeReader( pub usingnamespace uv.StreamReaderMixin(This, .pipe); const vtable = .{ - .getFd = getFd, .getBuffer = getBuffer, - .onReadChunk = onReadChunk, .registerPoll = registerPoll, .done = done, .onError = onError, @@ -173,13 +175,13 @@ pub fn WindowsPipeReader( return; } - var buffer = getBuffer(this); - if (amount.result == 0) { close(this); return; } + var buffer = getBuffer(this); + if (comptime bun.Environment.allow_assert) { if (!bun.isSliceInBuffer(buf.slice()[0..amount.result], buffer.allocatedSlice())) { @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); @@ -201,3 +203,176 @@ pub fn WindowsPipeReader( } pub const PipeReader = if (bun.Environment.isWindows) WindowsPipeReader else PosixPipeReader; +const Async = bun.Async; +pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void) type { + return struct { + poll: *Async.FilePoll = undefined, + buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), + is_done: bool = false, + parent: *Parent = undefined, + + const PosixOutputReader = @This(); + + pub fn setParent(this: *@This(), parent: *Parent) void { + this.parent = parent; + if (!this.is_done) { + this.poll.owner = Async.FilePoll.Owner.init(this); + } + } + + pub usingnamespace PosixPipeReader( + @This(), + getFd, + getBuffer, + if (onReadChunk != null) _onReadChunk else null, + registerPoll, + done, + onError, + ); + + fn _onReadChunk(this: *PosixOutputReader, chunk: []u8) void { + onReadChunk.?(this.parent, chunk); + } + + pub fn getFd(this: *PosixOutputReader) bun.FileDescriptor { + return this.poll.fd; + } + + pub fn getBuffer(this: *PosixOutputReader) *std.ArrayList(u8) { + return &this.buffer; + } + + pub fn ref(this: *@This(), event_loop_ctx: anytype) void { + this.poll.ref(event_loop_ctx); + } + + pub fn unref(this: *@This(), event_loop_ctx: anytype) void { + this.poll.unref(event_loop_ctx); + } + + fn finish(this: *PosixOutputReader) void { + this.poll.flags.insert(.ignore_updates); + this.parent.eventLoop().putFilePoll(this.poll); + std.debug.assert(!this.is_done); + this.is_done = true; + } + + pub fn done(this: *PosixOutputReader) void { + this.finish(); + this.parent.onOutputDone(); + } + + pub fn deinit(this: *PosixOutputReader) void { + this.buffer.deinit(); + this.poll.deinit(); + } + + pub fn onError(this: *PosixOutputReader, err: bun.sys.Error) void { + this.finish(); + this.parent.onOutputError(err); + } + + pub fn registerPoll(this: *PosixOutputReader) void { + switch (this.poll.register(this.parent.loop(), .readable, true)) { + .err => |err| { + this.onError(err); + }, + .result => {}, + } + } + + pub fn start(this: *PosixOutputReader) bun.JSC.Maybe(void) { + const maybe = this.poll.register(this.parent.loop(), .readable, true); + if (maybe != .result) { + return maybe; + } + + this.read(); + + return .{ + .result = {}, + }; + } + }; +} +const JSC = bun.JSC; + +fn WindowsBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, buf: []u8) void) type { + return struct { + pipe: uv.Pipe = std.mem.zeroes(uv.Pipe), + buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), + is_done: bool = false, + + parent: *Parent = undefined, + + const WindowsOutputReader = @This(); + + pub fn setParent(this: *@This(), parent: *Parent) void { + this.parent = parent; + if (!this.is_done) { + this.pipe.data = this; + } + } + + pub fn ref(this: *@This()) void { + this.pipe.ref(); + } + + pub fn unref(this: *@This()) void { + this.pipe.unref(); + } + + pub usingnamespace WindowsPipeReader( + @This(), + {}, + getBuffer, + if (onReadChunk != null) _onReadChunk else null, + null, + done, + onError, + ); + + pub fn getBuffer(this: *WindowsOutputReader) *std.ArrayList(u8) { + return &this.buffer; + } + + fn _onReadChunk(this: *WindowsOutputReader, buf: []u8) void { + onReadChunk.?(this.parent, buf); + } + + fn finish(this: *WindowsOutputReader) void { + std.debug.assert(!this.is_done); + this.is_done = true; + } + + pub fn done(this: *WindowsOutputReader) void { + std.debug.assert(this.pipe.isClosed()); + + this.finish(); + this.parent.onOutputDone(); + } + + pub fn onError(this: *WindowsOutputReader, err: bun.sys.Error) void { + this.finish(); + this.parent.onOutputError(err); + } + + pub fn getReadBufferWithStableMemoryAddress(this: *WindowsOutputReader, suggested_size: usize) []u8 { + this.buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory(); + return this.buffer.allocatedSlice()[this.buffer.items.len..]; + } + + pub fn start(this: *WindowsOutputReader) JSC.Maybe(void) { + this.buffer.clearRetainingCapacity(); + this.is_done = false; + return this.startReading(); + } + + pub fn deinit(this: *WindowsOutputReader) void { + this.buffer.deinit(); + std.debug.assert(this.pipe.isClosed()); + } + }; +} + +pub const BufferedOutputReader = if (bun.Environment.isPosix) PosixBufferedOutputReader else WindowsBufferedOutputReader; diff --git a/src/io/io.zig b/src/io/io.zig index 1aef8995c0..c9afe91682 100644 --- a/src/io/io.zig +++ b/src/io/io.zig @@ -927,3 +927,4 @@ pub const Poll = struct { pub const retry = bun.C.E.AGAIN; pub const PipeReader = @import("./PipeReader.zig").PipeReader; +pub const BufferedOutputReader = @import("./PipeReader.zig").BufferedOutputReader; diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig index 1d2f769613..20f2db636d 100644 --- a/src/shell/subproc.zig +++ b/src/shell/subproc.zig @@ -1279,7 +1279,7 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh } pub fn wait(this: *@This(), sync: bool) void { - return this.process.wait(sync); + return this.process.waitPosix(sync); } pub fn onProcessExit(this: *@This(), _: *Process, status: bun.spawn.Status, _: *const bun.spawn.Rusage) void {