diff --git a/src/bun.js/api/bun/spawn/stdio.zig b/src/bun.js/api/bun/spawn/stdio.zig index 03665ede80..ae74304f4c 100644 --- a/src/bun.js/api/bun/spawn/stdio.zig +++ b/src/bun.js/api/bun/spawn/stdio.zig @@ -24,6 +24,15 @@ pub const Stdio = union(enum) { const log = bun.sys.syslog; + pub fn byteSlice(this: *const Stdio) []const u8 { + return switch (this.*) { + .capture => this.capture.slice(), + .array_buffer => this.array_buffer.array_buffer.byteSlice(), + .blob => this.blob.slice(), + else => &[_]u8{}, + }; + } + pub fn deinit(this: *Stdio) void { switch (this.*) { .array_buffer => |*array_buffer| { @@ -294,7 +303,7 @@ pub const Stdio = union(enum) { if (blob.store()) |store| { if (store.data.file.pathlike == .fd) { if (store.data.file.pathlike.fd == fd) { - stdio.* = Stdio{ .inherit = .{} }; + stdio.* = Stdio{ .inherit = {} }; } else { switch (bun.FDTag.get(i)) { .stdin => { diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index f62be13d98..ec4db67318 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -371,6 +371,7 @@ pub const Subprocess = struct { .memfd => Readable{ .memfd = stdio.memfd }, .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, fd.?) }, .array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), + .capture => Output.panic("TODO: implement capture support in Stdio readable", .{}), }; } @@ -649,7 +650,14 @@ pub const Subprocess = struct { pub usingnamespace bun.NewRefCounted(@This(), deinit); const This = @This(); - pub const IOWriter = bun.io.BufferedWriter(This, onWrite, onError, onClose, getBuffer); + pub const IOWriter = bun.io.BufferedWriter( + This, + onWrite, + onError, + onClose, + getBuffer, + null, + ); pub const Poll = IOWriter; pub fn updateRef(this: *This, add: bool) void { @@ -665,7 +673,8 @@ pub const Subprocess = struct { } pub fn flush(this: *This) void { - this.writer.flush(); + _ = this; // autofix + // this.writer.flush(); } pub fn create(event_loop: anytype, subprocess: *ProcessType, fd: bun.FileDescriptor, source: Source) *This { @@ -678,7 +687,7 @@ pub const Subprocess = struct { } pub fn start(this: *This) JSC.Maybe(void) { - return this.writer.start(this.fd, this.source.slice(), this.event_loop, true); + return this.writer.start(this.fd, true); } pub fn onWrite(this: *This, amount: usize, is_done: bool) void { @@ -986,6 +995,9 @@ pub const Subprocess = struct { .path, .ignore => { return Writable{ .ignore = {} }; }, + .capture => { + return Writable{ .ignore = {} }; + }, } } @@ -993,7 +1005,7 @@ pub const Subprocess = struct { return switch (this.*) { .fd => |fd| JSValue.jsNumber(fd), .memfd, .ignore => JSValue.jsUndefined(), - .buffer, .inherit => JSValue.jsUndefined(), + .capture, .buffer, .inherit => JSValue.jsUndefined(), .pipe => |pipe| { this.* = .{ .ignore = {} }; return pipe.toJS(globalThis); @@ -1429,7 +1441,7 @@ pub const Subprocess = struct { while (stdio_iter.next()) |value| : (i += 1) { var new_item: Stdio = undefined; - if (&new_item.extract(globalThis, i, value)) + if (new_item.extract(globalThis, i, value)) return JSC.JSValue.jsUndefined(); switch (new_item) { .pipe => { diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index ea6622f14c..4efba1e2be 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -133,8 +133,9 @@ pub fn PosixBufferedWriter( comptime Parent: type, comptime onWrite: *const fn (*Parent, amount: usize, done: bool) void, comptime onError: *const fn (*Parent, bun.sys.Error) void, - comptime onClose: *const fn (*Parent) void, + comptime onClose: ?*const fn (*Parent) void, comptime getBuffer: *const fn (*Parent) []const u8, + comptime onWritable: ?*const fn (*Parent) void, ) type { return struct { handle: PollOrFd = .{ .closed = {} }, @@ -171,7 +172,6 @@ pub fn PosixBufferedWriter( const parent = this.parent; onWrite(parent, written, done); - if (done and !was_done) { this.close(); } @@ -181,6 +181,10 @@ pub fn PosixBufferedWriter( if (this.is_done) { return; } + + if (onWritable) |cb| { + cb(this.parent); + } } fn registerPoll(this: *PosixWriter) void { @@ -228,7 +232,8 @@ pub fn PosixBufferedWriter( } pub fn close(this: *PosixWriter) void { - this.handle.close(this.parent, onClose); + if (onClose) |closer| + this.handle.close(this.parent, closer); } pub fn updateRef(this: *const PosixWriter, event_loop: anytype, value: bool) void { diff --git a/src/shell/interpreter.zig b/src/shell/interpreter.zig index 17b94de118..aa53a674ab 100644 --- a/src/shell/interpreter.zig +++ b/src/shell/interpreter.zig @@ -2681,7 +2681,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { const HandleIOWrite = struct { fn run(pipeline: *Pipeline, bufw: BufferedWriter) void { pipeline.state = .{ .waiting_write_err = bufw }; - pipeline.state.waiting_write_err.writeIfPossible(false); + pipeline.state.waiting_write_err.write(); } }; @@ -3001,7 +3001,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { const HandleIOWrite = struct { fn run(cmd: *Cmd, bufw: BufferedWriter) void { cmd.state = .{ .waiting_write_err = bufw }; - cmd.state.waiting_write_err.writeIfPossible(false); + cmd.state.waiting_write_err.write(); } }; _ = this.base.shell.writeFailingError(buf, this, HandleIOWrite.run); @@ -3014,7 +3014,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { // .parent = BufferedWriter.ParentPtr.init(this), // .bytelist = val.captured, // } }; - // this.state.waiting_write_err.writeIfPossible(false); + // this.state.waiting_write_err.write(); // }, // .fd => { // this.state = .{ .waiting_write_err = BufferedWriter{ @@ -3022,7 +3022,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { // .remain = buf, // .parent = BufferedWriter.ParentPtr.init(this), // } }; - // this.state.waiting_write_err.writeIfPossible(false); + // this.state.waiting_write_err.write(); // }, // .pipe, .ignore => { // this.parent.childDone(this, 1); @@ -4167,7 +4167,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(io_kind), }, }; - this.print_state.?.bufwriter.writeIfPossible(false); + this.print_state.?.bufwriter.write(); return Maybe(void).success; } @@ -4240,7 +4240,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { }, }; - this.print_state.?.bufwriter.writeIfPossible(false); + this.print_state.?.bufwriter.write(); // if (this.print_state.?.isDone()) { // if (this.print_state.?.bufwriter.err) |e| { @@ -4333,7 +4333,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stdout), }; this.state = .waiting; - this.io_write_state.?.writeIfPossible(false); + this.io_write_state.?.write(); return Maybe(void).success; } @@ -4404,7 +4404,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { }, }, }; - this.state.one_arg.writer.writeIfPossible(false); + this.state.one_arg.writer.write(); return Maybe(void).success; } @@ -4469,7 +4469,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stdout), }, }; - multiargs.state.waiting_write.writeIfPossible(false); + multiargs.state.waiting_write.write(); // yield execution return; }; @@ -4483,7 +4483,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stdout), }, }; - multiargs.state.waiting_write.writeIfPossible(false); + multiargs.state.waiting_write.write(); return; } @@ -4550,7 +4550,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { }, }, }; - this.state.waiting_write_stderr.buffered_writer.writeIfPossible(false); + this.state.waiting_write_stderr.buffered_writer.write(); } pub fn start(this: *Cd) Maybe(void) { @@ -4680,7 +4680,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { }, }, }; - this.state.waiting_io.writer.writeIfPossible(false); + this.state.waiting_io.writer.write(); return Maybe(void).success; } @@ -4706,7 +4706,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { }, }, }; - this.state.waiting_io.writer.writeIfPossible(false); + this.state.waiting_io.writer.write(); return Maybe(void).success; } @@ -4800,7 +4800,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stderr), }, }; - this.state.waiting_write_err.writeIfPossible(false); + this.state.waiting_write_err.write(); return Maybe(void).success; } @@ -4890,7 +4890,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { if (this.state.exec.output_queue.len == 1 and do_run) { // if (do_run and !this.state.exec.started_output_queue) { this.state.exec.started_output_queue = true; - this.state.exec.output_queue.first.?.data.writer.writeIfPossible(false); + this.state.exec.output_queue.first.?.data.writer.write(); return .yield; } return .cont; @@ -4898,7 +4898,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { fn scheduleBlockingOutput(this: *Ls) CoroutineResult { if (this.state.exec.output_queue.len > 0) { - this.state.exec.output_queue.first.?.data.writer.writeIfPossible(false); + this.state.exec.output_queue.first.?.data.writer.write(); return .yield; } return .cont; @@ -4919,7 +4919,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { bun.default_allocator.destroy(first); } if (first.next) |next_writer| { - next_writer.data.writer.writeIfPossible(false); + next_writer.data.writer.write(); return; } @@ -5789,7 +5789,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .exit_code = exit_code, }, }; - this.state.waiting_write_err.writer.writeIfPossible(false); + this.state.waiting_write_err.writer.write(); return Maybe(void).success; } @@ -6267,7 +6267,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stderr), }, }; - parse_opts.state.wait_write_err.writeIfPossible(false); + parse_opts.state.wait_write_err.write(); return Maybe(void).success; } @@ -6305,7 +6305,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stderr), }, }; - parse_opts.state.wait_write_err.writeIfPossible(false); + parse_opts.state.wait_write_err.write(); continue; } @@ -6350,7 +6350,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stderr), }, }; - parse_opts.state.wait_write_err.writeIfPossible(false); + parse_opts.state.wait_write_err.write(); return Maybe(void).success; } @@ -6389,7 +6389,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stderr), }, }; - parse_opts.state.wait_write_err.writeIfPossible(false); + parse_opts.state.wait_write_err.write(); return Maybe(void).success; } @@ -6412,7 +6412,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { .bytelist = this.bltn.stdBufferedBytelist(.stderr), }, }; - parse_opts.state.wait_write_err.writeIfPossible(false); + parse_opts.state.wait_write_err.write(); return Maybe(void).success; } @@ -6495,7 +6495,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { bun.default_allocator.destroy(first); } if (first.next) |next_writer| { - next_writer.data.writer.writeIfPossible(false); + next_writer.data.writer.write(); } else { if (this.state.exec.state.tasksDone() >= this.state.exec.total_tasks and this.state.exec.getOutputCount(.output_done) >= this.state.exec.getOutputCount(.output_count)) { this.bltn.done(if (this.state.exec.err != null) 1 else 0); @@ -6684,7 +6684,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { // Need to start it if (this.state.exec.output_queue.len == 1) { - this.state.exec.output_queue.first.?.data.writer.writeIfPossible(false); + this.state.exec.output_queue.first.?.data.writer.write(); } } @@ -7287,9 +7287,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { /// it. IT DOES NOT CLOSE FILE DESCRIPTORS pub const BufferedWriter = struct { - remain: []const u8 = "", - fd: bun.FileDescriptor, - poll_ref: ?*bun.Async.FilePoll = null, + writer: Writer = .{}, written: usize = 0, parent: ParentPtr, err: ?Syscall.Error = null, @@ -7304,6 +7302,48 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { const BuiltinJs = bun.shell.Interpreter.Builtin; const BuiltinMini = bun.shell.InterpreterMini.Builtin; + pub const Writer = bun.io.BufferedWriter( + @This(), + onWrite, + onError, + onClose, + getBuffer, + onReady, + ); + + pub const Status = union(enum) { + pending: void, + done: void, + err: bun.sys.Error, + }; + + pub fn getBuffer(this: *BufferedWriter) []const u8 { + _ = this; // autofix + // TODO: + return ""; + } + + pub fn onWrite(this: *BufferedWriter, amount: usize, done: bool) void { + _ = done; // autofix + if (this.bytelist) |bytelist| { + bytelist.append(bun.default_allocator, this.getBuffer()[this.getBuffer().len - amount ..]) catch bun.outOfMemory(); + } + } + + pub fn onError(this: *BufferedWriter, err: bun.sys.Error) void { + _ = this; // autofix + _ = err; // autofix + + } + pub fn onReady(this: *BufferedWriter) void { + _ = this; // autofix + + } + pub fn onClose(this: *BufferedWriter) void { + _ = this; // autofix + + } + pub const ParentPtr = struct { const Types = .{ BuiltinJs.Export, @@ -7373,132 +7413,8 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type { pub const event_loop_kind = EventLoopKind; pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedWriter, .writable, onReady); - pub fn onReady(this: *BufferedWriter, _: i64) void { - if (this.fd == bun.invalid_fd) { - return; - } - - this.__write(); - } - - pub fn writeIfPossible(this: *BufferedWriter, comptime is_sync: bool) void { - if (this.remain.len == 0) return this.deinit(); - if (comptime !is_sync) { - // we ask, "Is it possible to write right now?" - // we do this rather than epoll or kqueue() - // because we don't want to block the thread waiting for the write - switch (bun.isWritable(this.fd)) { - .ready => { - if (this.poll_ref) |poll| { - poll.flags.insert(.writable); - poll.flags.insert(.fifo); - std.debug.assert(poll.flags.contains(.poll_writable)); - } - }, - .hup => { - this.deinit(); - return; - }, - .not_ready => { - if (!this.isWatching()) this.watch(this.fd); - return; - }, - } - } - - this.writeAllowBlocking(is_sync); - } - - /// Calling this directly will block if the fd is not opened with non - /// blocking option. If the fd is blocking, you should call - /// `writeIfPossible()` first, which will check if the fd is writable. If so - /// it will then call this function, if not, then it will poll for the fd to - /// be writable - pub fn __write(this: *BufferedWriter) void { - this.writeAllowBlocking(false); - } - - pub fn writeAllowBlocking(this: *BufferedWriter, allow_blocking: bool) void { - var to_write = this.remain; - - if (to_write.len == 0) { - // we are done! - this.deinit(); - return; - } - - if (comptime bun.Environment.allow_assert) { - // bun.assertNonBlocking(this.fd); - } - - while (to_write.len > 0) { - switch (bun.sys.write(this.fd, to_write)) { - .err => |e| { - if (e.isRetry()) { - log("write({d}) retry", .{ - to_write.len, - }); - - this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); - return; - } - - if (e.getErrno() == .PIPE) { - this.deinit(); - return; - } - - // fail - log("write({d}) fail: {d}", .{ to_write.len, e.errno }); - this.err = e; - this.deinit(); - return; - }, - - .result => |bytes_written| { - if (this.bytelist) |blist| { - blist.append(bun.default_allocator, to_write[0..bytes_written]) catch bun.outOfMemory(); - } - - this.written += bytes_written; - - log( - "write({d}) {d}", - .{ - to_write.len, - bytes_written, - }, - ); - - this.remain = this.remain[@min(bytes_written, this.remain.len)..]; - to_write = to_write[bytes_written..]; - - // we are done or it accepts no more input - if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) { - this.deinit(); - return; - } - }, - } - } - } - - fn close(this: *BufferedWriter) void { - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); - } - - if (this.fd != bun.invalid_fd) { - // _ = bun.sys.close(this.fd); - // this.fd = bun.invalid_fd; - } - } - pub fn deinit(this: *BufferedWriter) void { - this.close(); - this.parent.onDone(this.err); + this.writer.deinit(); } }; }; @@ -7770,11 +7686,20 @@ pub fn NewBufferedWriter(comptime Src: type, comptime Parent: type, comptime Eve return struct { src: Src, - fd: bun.FileDescriptor, - poll_ref: ?*bun.Async.FilePoll = null, written: usize = 0, parent: Parent, err: ?Syscall.Error = null, + writer: Writer = .{}, + + pub const Writer = bun.io.BufferedWriter( + @This(), + onWrite, + onError, + // we don't close it + null, + getBuffer, + onReady, + ); pub const ParentType = Parent; @@ -7787,141 +7712,48 @@ pub fn NewBufferedWriter(comptime Src: type, comptime Parent: type, comptime Eve pub const event_loop_kind = EventLoopKind; pub usingnamespace JSC.WebCore.NewReadyWatcher(@This(), .writable, onReady); - pub fn onReady(this: *@This(), _: i64) void { - if (this.fd == bun.invalid_fd) { + pub fn onReady(this: *@This()) void { + if (this.src.isDone(this.written)) { + this.parent.onDone(this.err); return; } - this.__write(); + const buf = this.getBuffer(); + this.writer.write(buf); } - pub fn writeIfPossible(this: *@This(), comptime is_sync: bool) void { - if (SrcHandler.bufToWrite(this.src, 0).len == 0) return this.deinit(); - if (comptime !is_sync) { - // we ask, "Is it possible to write right now?" - // we do this rather than epoll or kqueue() - // because we don't want to block the thread waiting for the write - switch (bun.isWritable(this.fd)) { - .ready => { - if (this.poll_ref) |poll| { - poll.flags.insert(.writable); - poll.flags.insert(.fifo); - std.debug.assert(poll.flags.contains(.poll_writable)); - } - }, - .hup => { - this.deinit(); - return; - }, - .not_ready => { - if (!this.isWatching()) this.watch(this.fd); - return; - }, - } - } - - this.writeAllowBlocking(is_sync); + pub fn getBuffer(this: *@This()) []const u8 { + return SrcHandler.bufToWrite(this.src, this.written); } - /// Calling this directly will block if the fd is not opened with non - /// blocking option. If the fd is blocking, you should call - /// `writeIfPossible()` first, which will check if the fd is writable. If so - /// it will then call this function, if not, then it will poll for the fd to - /// be writable - pub fn __write(this: *@This()) void { - this.writeAllowBlocking(false); - } - - pub fn writeAllowBlocking(this: *@This(), allow_blocking: bool) void { - _ = allow_blocking; // autofix - - var to_write = SrcHandler.bufToWrite(this.src, this.written); - - if (to_write.len == 0) { - // we are done! - // this.closeFDIfOpen(); - if (SrcHandler.isDone(this.src, this.written)) { - this.deinit(); - } + pub fn write(this: *@This()) void { + if (this.src.isDone(this.written)) { return; } - if (comptime bun.Environment.allow_assert) { - // bun.assertNonBlocking(this.fd); - } + const buf = this.getBuffer(); + this.writer.write(buf); + } - while (to_write.len > 0) { - switch (bun.sys.write(this.fd, to_write)) { - .err => |e| { - if (e.isRetry()) { - log("write({d}) retry", .{ - to_write.len, - }); + pub fn onWrite(this: *@This(), amount: usize, done: bool) void { + this.written += amount; - this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); - return; - } - - if (e.getErrno() == .PIPE) { - this.deinit(); - return; - } - - // fail - log("write({d}) fail: {d}", .{ to_write.len, e.errno }); - this.err = e; - this.deinit(); - return; - }, - - .result => |bytes_written| { - this.written += bytes_written; - - log( - "write({d}) {d}", - .{ - to_write.len, - bytes_written, - }, - ); - - // this.remain = this.remain[@min(bytes_written, this.remain.len)..]; - // to_write = to_write[bytes_written..]; - - // // we are done or it accepts no more input - // if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) { - // this.deinit(); - // return; - // } - - to_write = SrcHandler.bufToWrite(this.src, this.written); - if (to_write.len == 0) { - if (SrcHandler.isDone(this.src, this.written)) { - this.deinit(); - return; - } - } - }, - } + if (done or this.src.isDone(this.written)) { + this.parent.onDone(this.err); + } else { + const buf = this.getBuffer(); + this.writer.write(buf); } } - fn close(this: *@This()) void { - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); - } + pub fn onError(this: *@This(), err: bun.sys.Error) void { + this.err = err; - if (this.fd != bun.invalid_fd) { - // _ = bun.sys.close(this.fd); - // this.fd = bun.invalid_fd; - } + this.parent.onDone(this.err); } pub fn deinit(this: *@This()) void { - this.close(); - this.parent.onDone(this.err); + this.writer.deinit(); } }; } diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig index ad426994c4..1a85be54ce 100644 --- a/src/shell/subproc.zig +++ b/src/shell/subproc.zig @@ -50,6 +50,7 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh }; } }; + _ = get_vm; // autofix // const ShellCmd = switch (EventLoopKind) { // .js => bun.shell.interpret.Interpreter.Cmd, @@ -73,9 +74,9 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh process: *Process, - stdin: Writable, - stdout: Readable, - stderr: Readable, + stdin: *Writable = undefined, + stdout: *Readable = undefined, + stderr: *Readable = undefined, globalThis: GlobalRef, @@ -90,6 +91,9 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh pub const OutKind = util.OutKind; + const Readable = opaque {}; + const Writable = opaque {}; + pub const Flags = packed struct(u3) { is_sync: bool = false, killed: bool = false, @@ -97,335 +101,10 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh }; pub const SignalCode = bun.SignalCode; - pub const Writable = union(enum) { - pipe: *FileSink, - pipe_to_readable_stream: struct { - pipe: *FileSink, - readable_stream: JSC.WebCore.ReadableStream, - }, - fd: bun.FileDescriptor, - buffered_input: BufferedInput, - inherit: void, - ignore: void, - - pub fn ref(this: *Writable) void { - switch (this.*) { - .pipe => { - if (this.pipe.poll_ref) |poll| { - poll.enableKeepingProcessAlive(get_vm.get()); - } - }, - else => {}, - } - } - - pub fn unref(this: *Writable) void { - switch (this.*) { - .pipe => { - if (this.pipe.poll_ref) |poll| { - poll.enableKeepingProcessAlive(get_vm.get()); - } - }, - else => {}, - } - } - - // When the stream has closed we need to be notified to prevent a use-after-free - // We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice - pub fn onClose(this: *Writable, _: ?bun.sys.Error) void { - this.* = .{ - .ignore = {}, - }; - } - pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {} - pub fn onStart(_: *Writable) void {} - - pub fn init(subproc: *Subprocess, stdio: Stdio, fd: ?bun.FileDescriptor, globalThis: GlobalRef) !Writable { - switch (stdio) { - .pipe => { - // var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); - var sink = try GlobalHandle.init(globalThis).allocator().create(FileSink); - sink.* = .{ - .fd = fd.?, - .buffer = bun.ByteList{}, - .allocator = GlobalHandle.init(globalThis).allocator(), - .auto_close = true, - }; - sink.mode = std.os.S.IFIFO; - sink.watch(fd.?); - if (stdio == .pipe) { - if (stdio.pipe) |readable| { - if (comptime EventLoopKind == .mini) @panic("FIXME TODO error gracefully but wait can this even happen"); - return Writable{ - .pipe_to_readable_stream = .{ - .pipe = sink, - .readable_stream = readable, - }, - }; - } - } - - return Writable{ .pipe = sink }; - }, - .array_buffer, .blob => { - var buffered_input: BufferedInput = .{ .fd = fd.?, .source = undefined, .subproc = subproc }; - switch (stdio) { - .array_buffer => |array_buffer| { - buffered_input.source = .{ .array_buffer = array_buffer.buf }; - }, - .blob => |blob| { - buffered_input.source = .{ .blob = blob }; - }, - else => unreachable, - } - return Writable{ .buffered_input = buffered_input }; - }, - .fd => { - return Writable{ .fd = fd.? }; - }, - .inherit => { - return Writable{ .inherit = {} }; - }, - .path, .ignore => { - return Writable{ .ignore = {} }; - }, - } - } - - pub fn toJS(this: Writable, globalThis: *JSC.JSGlobalObject) JSValue { - return switch (this) { - .pipe => |pipe| pipe.toJS(globalThis), - .fd => |fd| JSValue.jsNumber(fd), - .ignore => JSValue.jsUndefined(), - .inherit => JSValue.jsUndefined(), - .buffered_input => JSValue.jsUndefined(), - .pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value, - }; - } - - pub fn finalize(this: *Writable) void { - return switch (this.*) { - .pipe => |pipe| { - pipe.deref(); - }, - .pipe_to_readable_stream => |*pipe_to_readable_stream| { - _ = pipe_to_readable_stream.pipe.end(null); - }, - .fd => |fd| { - _ = bun.sys.close(fd); - this.* = .{ .ignore = {} }; - }, - .buffered_input => { - this.buffered_input.deinit(); - }, - .ignore => {}, - .inherit => {}, - }; - } - - pub fn close(this: *Writable) void { - return switch (this.*) { - .pipe => {}, - .pipe_to_readable_stream => |*pipe_to_readable_stream| { - _ = pipe_to_readable_stream.pipe.end(null); - }, - .fd => |fd| { - _ = bun.sys.close(fd); - this.* = .{ .ignore = {} }; - }, - .buffered_input => { - this.buffered_input.deinit(); - }, - .ignore => {}, - .inherit => {}, - }; - } - }; - - pub const Readable = union(enum) { - fd: bun.FileDescriptor, - - pipe: Pipe, - inherit: void, - ignore: void, - closed: void, - - pub fn ref(this: *Readable) void { - switch (this.*) { - .pipe => { - if (this.pipe == .buffer) { - // if (this.pipe.buffer.fifo.poll_ref) |poll| { - // poll.enableKeepingProcessAlive(get_vm.get()); - // } - } - }, - else => {}, - } - } - - pub fn unref(this: *Readable) void { - switch (this.*) { - .pipe => { - if (this.pipe == .buffer) { - // if (this.pipe.buffer.fifo.poll_ref) |poll| { - // poll.enableKeepingProcessAlive(get_vm.get()); - // } - } - }, - else => {}, - } - } - - pub fn init(subproc: *Subprocess, comptime kind: OutKind, stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32) Readable { - return switch (stdio) { - .ignore => Readable{ .ignore = {} }, - .pipe => { - var subproc_readable_ptr = subproc.getIO(kind); - subproc_readable_ptr.* = Readable{ .pipe = .{ .buffer = undefined } }; - BufferedOutput.initWithAllocator(subproc, &subproc_readable_ptr.pipe.buffer, kind, allocator, fd.?, max_size); - return subproc_readable_ptr.*; - }, - .inherit => { - return Readable{ .inherit = {} }; - }, - .captured => |captured| { - var subproc_readable_ptr = subproc.getIO(kind); - subproc_readable_ptr.* = Readable{ .pipe = .{ .buffer = undefined } }; - BufferedOutput.initWithAllocator(subproc, &subproc_readable_ptr.pipe.buffer, kind, allocator, fd.?, max_size); - subproc_readable_ptr.pipe.buffer.out = captured; - subproc_readable_ptr.pipe.buffer.writer = BufferedOutput.CapturedBufferedWriter{ - .src = WriterSrc{ - .inner = &subproc_readable_ptr.pipe.buffer, - }, - .fd = if (kind == .stdout) bun.STDOUT_FD else bun.STDERR_FD, - .parent = .{ .parent = &subproc_readable_ptr.pipe.buffer }, - }; - return subproc_readable_ptr.*; - }, - .path => Readable{ .ignore = {} }, - .blob, .fd => Readable{ .fd = fd.? }, - .array_buffer => { - var subproc_readable_ptr = subproc.getIO(kind); - subproc_readable_ptr.* = Readable{ - .pipe = .{ - .buffer = undefined, - }, - }; - if (stdio.array_buffer.from_jsc) { - BufferedOutput.initWithArrayBuffer(subproc, &subproc_readable_ptr.pipe.buffer, kind, fd.?, stdio.array_buffer.buf); - } else { - subproc_readable_ptr.pipe.buffer = BufferedOutput.initWithSlice(subproc, kind, fd.?, stdio.array_buffer.buf.slice()); - } - return subproc_readable_ptr.*; - }, - }; - } - - pub fn onClose(this: *Readable, _: ?bun.sys.Error) void { - this.* = .closed; - } - - pub fn onReady(_: *Readable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {} - - pub fn onStart(_: *Readable) void {} - - pub fn close(this: *Readable) void { - log("READABLE close", .{}); - switch (this.*) { - .fd => |fd| { - _ = bun.sys.close(fd); - }, - .pipe => { - this.pipe.done(); - }, - else => {}, - } - } - - pub fn finalize(this: *Readable) void { - log("Readable::finalize", .{}); - switch (this.*) { - .fd => |fd| { - _ = bun.sys.close(fd); - }, - .pipe => { - if (this.pipe == .stream and this.pipe.stream.ptr == .File) { - this.close(); - return; - } - - // this.pipe.buffer.close(); - }, - else => {}, - } - } - - pub fn toJS(this: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue { - switch (this.*) { - .fd => |fd| { - return JSValue.jsNumber(fd); - }, - .pipe => { - return this.pipe.toJS(this, globalThis, exited); - }, - else => { - return JSValue.jsUndefined(); - }, - } - } - - pub fn toSlice(this: *Readable) ?[]const u8 { - switch (this.*) { - .fd => return null, - .pipe => { - this.pipe.buffer.fifo.close_on_empty_read = true; - this.pipe.buffer.readAll(); - - const bytes = this.pipe.buffer.internal_buffer.slice(); - // this.pipe.buffer.internal_buffer = .{}; - - if (bytes.len > 0) { - return bytes; - } - - return ""; - }, - else => { - return null; - }, - } - } - - pub fn toBufferedValue(this: *Readable, globalThis: *JSC.JSGlobalObject) JSValue { - switch (this.*) { - .fd => |fd| { - return JSValue.jsNumber(fd); - }, - .pipe => { - this.pipe.buffer.fifo.close_on_empty_read = true; - this.pipe.buffer.readAll(); - - const bytes = this.pipe.buffer.internal_buffer.slice(); - this.pipe.buffer.internal_buffer = .{}; - - if (bytes.len > 0) { - // Return a Buffer so that they can do .toString() on it - return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator); - } - - return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator); - }, - else => { - return JSValue.jsUndefined(); - }, - } - } - }; - pub const CapturedBufferedWriter = bun.shell.eval.NewBufferedWriter( WriterSrc, struct { - parent: *Out, + parent: *BufferedOutput, pub inline fn onDone(this: @This(), e: ?bun.sys.Error) void { this.parent.onBufferedWriterDone(e); } @@ -434,7 +113,7 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh ); const WriterSrc = struct { - inner: *Out, + inner: *BufferedOutput, pub inline fn bufToWrite(this: WriterSrc, written: usize) []const u8 { if (written >= this.inner.internal_buffer.len) return ""; @@ -448,210 +127,16 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh } }; - pub const Out = struct { - internal_buffer: bun.ByteList = .{}, - owns_internal_buffer: bool = true, - subproc: *Subprocess, - out_type: OutKind, + // pub const Pipe = struct { + // writer: Writer = Writer{}, + // parent: *Subprocess, + // src: WriterSrc, - writer: ?CapturedBufferedWriter = null, + // writer: ?CapturedBufferedWriter = null, - status: Status = .{ - .pending = {}, - }, - - pub const Status = union(enum) { - pending: void, - done: void, - err: bun.sys.Error, - }; - }; - - // pub const BufferedOutput = struct { - // fifo: FIFO = undefined, - // internal_buffer: bun.ByteList = .{}, - // auto_sizer: ?JSC.WebCore.AutoSizer = null, - // subproc: *Subprocess, - // out_type: OutKind, - // /// Sometimes the `internal_buffer` may be filled with memory from JSC, - // /// for example an array buffer. In that case we shouldn't dealloc - // /// memory and let the GC do it. - // from_jsc: bool = false, // status: Status = .{ // .pending = {}, // }, - // recall_readall: bool = true, - // /// Used to allow to write to fd and also capture the data - // writer: ?CapturedBufferedWriter = null, - // out: ?*bun.ByteList = null, - - // pub const Status = union(enum) { - // pending: void, - // done: void, - // err: bun.sys.Error, - // }; - - // pub fn init(subproc: *Subprocess, out_type: OutKind, fd: bun.FileDescriptor) BufferedOutput { - // return BufferedOutput{ - // .out_type = out_type, - // .subproc = subproc, - // .internal_buffer = .{}, - // .fifo = FIFO{ - // .fd = fd, - // }, - // }; - // } - - // pub fn initWithArrayBuffer(subproc: *Subprocess, out: *BufferedOutput, comptime out_type: OutKind, fd: bun.FileDescriptor, array_buf: JSC.ArrayBuffer.Strong) void { - // out.* = BufferedOutput.initWithSlice(subproc, out_type, fd, array_buf.slice()); - // out.from_jsc = true; - // out.fifo.view = array_buf.held; - // out.fifo.buf = out.internal_buffer.ptr[0..out.internal_buffer.cap]; - // } - - // pub fn initWithSlice(subproc: *Subprocess, comptime out_type: OutKind, fd: bun.FileDescriptor, slice: []u8) BufferedOutput { - // return BufferedOutput{ - // // fixed capacity - // .internal_buffer = bun.ByteList.initWithBuffer(slice), - // .auto_sizer = null, - // .subproc = subproc, - // .fifo = FIFO{ - // .fd = fd, - // }, - // .out_type = out_type, - // }; - // } - - // pub fn initWithAllocator(subproc: *Subprocess, out: *BufferedOutput, comptime out_type: OutKind, allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) void { - // out.* = init(subproc, out_type, fd); - // out.auto_sizer = .{ - // .max = max_size, - // .allocator = allocator, - // .buffer = &out.internal_buffer, - // }; - // out.fifo.auto_sizer = &out.auto_sizer.?; - // } - - // pub fn onBufferedWriterDone(this: *BufferedOutput, e: ?bun.sys.Error) void { - // _ = e; // autofix - - // defer this.signalDoneToCmd(); - // // if (e) |err| { - // // this.status = .{ .err = err }; - // // } - // } - - // pub fn isDone(this: *BufferedOutput) bool { - // if (this.status != .done and this.status != .err) return false; - // if (this.writer != null) { - // return this.writer.?.isDone(); - // } - // return true; - // } - - // pub fn signalDoneToCmd(this: *BufferedOutput) void { - // log("signalDoneToCmd ({x}: {s}) isDone={any}", .{ @intFromPtr(this), @tagName(this.out_type), this.isDone() }); - // // `this.fifo.close()` will be called from the parent - // // this.fifo.close(); - // if (!this.isDone()) return; - // if (this.subproc.cmd_parent) |cmd| { - // if (this.writer != null) { - // if (this.writer.?.err) |e| { - // if (this.status != .err) { - // this.status = .{ .err = e }; - // } - // } - // } - // cmd.bufferedOutputClose(this.out_type); - // } - // } - - // /// This is called after it is read (it's confusing because "on read" could - // /// be interpreted as present or past tense) - // pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void { - // log("ON READ {s} result={s}", .{ @tagName(this.out_type), @tagName(result) }); - // defer { - // if (this.status == .err or this.status == .done) { - // this.signalDoneToCmd(); - // } else if (this.recall_readall and this.recall_readall) { - // this.readAll(); - // } - // } - // switch (result) { - // .pending => { - // this.watch(); - // return; - // }, - // .err => |err| { - // if (err == .Error) { - // this.status = .{ .err = err.Error }; - // } else { - // this.status = .{ .err = bun.sys.Error.fromCode(.CANCELED, .read) }; - // } - // // this.fifo.close(); - // // this.closeFifoSignalCmd(); - // return; - // }, - // .done => { - // this.status = .{ .done = {} }; - // // this.fifo.close(); - // // this.closeFifoSignalCmd(); - // return; - // }, - // else => { - // const slice = switch (result) { - // .into_array => this.fifo.buf[0..result.into_array.len], - // else => result.slice(), - // }; - // log("buffered output ({s}) onRead: {s}", .{ @tagName(this.out_type), slice }); - // this.internal_buffer.len += @as(u32, @truncate(slice.len)); - // if (slice.len > 0) - // std.debug.assert(this.internal_buffer.contains(slice)); - - // if (this.writer != null) { - // this.writer.?.writeIfPossible(false); - // } - - // this.fifo.buf = this.internal_buffer.ptr[@min(this.internal_buffer.len, this.internal_buffer.cap)..this.internal_buffer.cap]; - - // if (result.isDone() or (slice.len == 0 and this.fifo.poll_ref != null and this.fifo.poll_ref.?.isHUP())) { - // this.status = .{ .done = {} }; - // // this.fifo.close(); - // // this.closeFifoSignalCmd(); - // } - // }, - // } - // } - - // pub fn readAll(this: *BufferedOutput) void { - // log("ShellBufferedOutput.readAll doing nothing", .{}); - // this.watch(); - // } - - // pub fn watch(this: *BufferedOutput) void { - // std.debug.assert(this.fifo.fd != bun.invalid_fd); - - // this.fifo.pending.set(BufferedOutput, this, onRead); - // if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd); - // return; - // } - - // pub fn close(this: *BufferedOutput) void { - // log("BufferedOutput close", .{}); - // switch (this.status) { - // .done => {}, - // .pending => { - // this.fifo.close(); - // this.status = .{ .done = {} }; - // }, - // .err => {}, - // } - - // if (this.internal_buffer.cap > 0 and !this.from_jsc) { - // this.internal_buffer.listManaged(bun.default_allocator).deinit(); - // this.internal_buffer = .{}; - // } - // } // }; pub const StaticPipeWriter = JSC.Subprocess.NewStaticPipeWriter(Subprocess); @@ -948,6 +433,9 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh spawn_args: *SpawnArgs, out_subproc: **@This(), ) bun.shell.Result(*@This()) { + if (comptime true) { + @panic("TODO"); + } const globalThis = GlobalHandle.init(globalThis_); const is_sync = config.is_sync;