diff --git a/src/async/posix_event_loop.zig b/src/async/posix_event_loop.zig index f067970614..951042171d 100644 --- a/src/async/posix_event_loop.zig +++ b/src/async/posix_event_loop.zig @@ -144,16 +144,13 @@ pub const FilePoll = struct { allocator_type: AllocatorType = .js, - pub const AllocatorType = enum { - js, - mini, - }; + const ShellBufferedWriter = bun.shell.Interpreter.IOWriter.Poll; + // const ShellBufferedWriter = bun.shell.Interpreter.WriterImpl; const FileReader = JSC.WebCore.FileReader; // const FIFO = JSC.WebCore.FIFO; // const FIFOMini = JSC.WebCore.FIFOMini; - const ShellBufferedWriter = bun.shell.Interpreter.BufferedWriter.Poll; const ShellSubprocessCapturedPipeWriter = bun.shell.subproc.PipeReader.CapturedWriter.Poll; // const ShellBufferedWriterMini = bun.shell.InterpreterMini.BufferedWriter; // const ShellBufferedInput = bun.shell.ShellSubprocess.BufferedInput; @@ -171,7 +168,6 @@ pub const FilePoll = struct { const Deactivated = opaque { pub var owner: Owner = Owner.init(@as(*Deactivated, @ptrFromInt(@as(usize, 0xDEADBEEF)))); }; - const LifecycleScriptSubprocessOutputReader = bun.install.LifecycleScriptSubprocess.OutputReader; const BufferedReader = bun.io.BufferedReader; pub const Owner = bun.TaggedPointerUnion(.{ @@ -188,7 +184,7 @@ pub const FilePoll = struct { StaticPipeWriter, - ShellBufferedWriter, + // ShellBufferedWriter, ShellSubprocessCapturedPipeWriter, BufferedReader, @@ -198,8 +194,14 @@ pub const FilePoll = struct { GetAddrInfoRequest, // LifecycleScriptSubprocessOutputReader, Process, + ShellBufferedWriter, // i do not know why, but this has to be here otherwise compiler will complain about dependency loop }); + pub const AllocatorType = enum { + js, + mini, + }; + fn updateFlags(poll: *FilePoll, updated: Flags.Set) void { var flags = poll.flags; flags.remove(.readable); diff --git a/src/baby_list.zig b/src/baby_list.zig index 07a25959f4..17329a854d 100644 --- a/src/baby_list.zig +++ b/src/baby_list.zig @@ -210,6 +210,12 @@ pub fn BabyList(comptime Type: type) type { this.update(list_); } + pub fn appendFmt(this: *@This(), allocator: std.mem.Allocator, comptime fmt: []const u8, args: anytype) !void { + var list__ = this.listManaged(allocator); + const writer = list__.writer(); + try writer.print(fmt, args); + } + pub fn append(this: *@This(), allocator: std.mem.Allocator, value: []const Type) !void { var list__ = this.listManaged(allocator); try list__.appendSlice(value); diff --git a/src/bun.js/ConsoleObject.zig b/src/bun.js/ConsoleObject.zig index 9cd7a3fd12..d8820e3809 100644 --- a/src/bun.js/ConsoleObject.zig +++ b/src/bun.js/ConsoleObject.zig @@ -35,6 +35,8 @@ writer: BufferedWriter, counts: Counter = .{}, +pub fn format(_: @This(), comptime _: []const u8, _: anytype, _: anytype) !void {} + pub fn init(error_writer: Output.WriterType, writer: Output.WriterType) ConsoleObject { return ConsoleObject{ .error_writer = BufferedWriter{ .unbuffered_writer = error_writer }, @@ -195,7 +197,7 @@ pub fn messageWithTypeAndLevel( } if (print_length > 0) - format( + format2( level, global, vals, @@ -650,7 +652,7 @@ pub const FormatOptions = struct { max_depth: u16 = 2, }; -pub fn format( +pub fn format2( level: MessageLevel, global: *JSGlobalObject, vals: [*]const JSValue, diff --git a/src/bun.js/api/BunObject.zig b/src/bun.js/api/BunObject.zig index 2e687edb58..b082b23cca 100644 --- a/src/bun.js/api/BunObject.zig +++ b/src/bun.js/api/BunObject.zig @@ -854,7 +854,7 @@ pub fn inspect( const Writer = @TypeOf(writer); // we buffer this because it'll almost always be < 4096 // when it's under 4096, we want to avoid the dynamic allocation - ConsoleObject.format( + ConsoleObject.format2( .Debug, globalThis, @as([*]const JSValue, @ptrCast(&value)), diff --git a/src/bun.js/api/bun/spawn/stdio.zig b/src/bun.js/api/bun/spawn/stdio.zig index bc474d89cd..485001fb69 100644 --- a/src/bun.js/api/bun/spawn/stdio.zig +++ b/src/bun.js/api/bun/spawn/stdio.zig @@ -13,7 +13,7 @@ const os = std.os; const uv = bun.windows.libuv; pub const Stdio = union(enum) { inherit: void, - capture: *bun.ByteList, + capture: struct { fd: bun.FileDescriptor, buf: *bun.ByteList }, ignore: void, fd: bun.FileDescriptor, dup2: struct { out: bun.JSC.Subprocess.StdioKind, to: bun.JSC.Subprocess.StdioKind }, diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index d2aa5525b0..b6e7ab5039 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -1610,6 +1610,25 @@ pub const SystemError = extern struct { pub const name = "SystemError"; pub const namespace = ""; + pub fn getErrno(this: *const SystemError) bun.C.E { + // The inverse in bun.sys.Error.toSystemError() + return @enumFromInt(this.errno * -1); + } + + pub fn deref(this: *const SystemError) void { + this.path.deref(); + this.code.deref(); + this.message.deref(); + this.syscall.deref(); + } + + pub fn ref(this: *SystemError) void { + this.path.ref(); + this.code.ref(); + this.message.ref(); + this.syscall.ref(); + } + pub fn toErrorInstance(this: *const SystemError, global: *JSGlobalObject) JSValue { defer { this.path.deref(); @@ -3905,7 +3924,7 @@ pub const JSValue = enum(JSValueReprInt) { .quote_strings = true, }; - JSC.ConsoleObject.format( + JSC.ConsoleObject.format2( .Debug, globalObject, @as([*]const JSValue, @ptrCast(&this)), diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 237e63811b..2e56d25ac9 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -1238,7 +1238,7 @@ pub const PathOrFileDescriptor = union(Tag) { } switch (this) { .path => |p| try writer.writeAll(p.slice()), - .fd => |fd| try writer.print("{d}", .{fd}), + .fd => |fd| try writer.print("{}", .{fd}), } } diff --git a/src/bun.js/test/diff_format.zig b/src/bun.js/test/diff_format.zig index ca7d03f573..27e4bf78a0 100644 --- a/src/bun.js/test/diff_format.zig +++ b/src/bun.js/test/diff_format.zig @@ -102,7 +102,7 @@ pub const DiffFormatter = struct { .quote_strings = true, .max_depth = 100, }; - ConsoleObject.format( + ConsoleObject.format2( .Debug, this.globalObject, @as([*]const JSValue, @ptrCast(&received)), @@ -116,7 +116,7 @@ pub const DiffFormatter = struct { buffered_writer_.context = &expected_buf; - ConsoleObject.format( + ConsoleObject.format2( .Debug, this.globalObject, @as([*]const JSValue, @ptrCast(&this.expected)), diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index c50d152310..3f03db1fcb 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -221,7 +221,7 @@ pub const WebWorker = struct { const Writer = @TypeOf(writer); // we buffer this because it'll almost always be < 4096 // when it's under 4096, we want to avoid the dynamic allocation - bun.JSC.ConsoleObject.format( + bun.JSC.ConsoleObject.format2( .Debug, globalObject, &[_]JSC.JSValue{error_instance}, diff --git a/src/bun.zig b/src/bun.zig index f2428b42e9..6e6d96d038 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -2521,6 +2521,10 @@ pub fn NewRefCounted(comptime T: type, comptime deinit_fn: ?fn (self: *T) void) } } + const output_name: []const u8 = if (@hasDecl(T, "DEBUG_REFCOUNT_NAME")) T.DEBUG_REFCOUNT_NAME else meta.typeBaseName(@typeName(T)); + + const log = Output.scoped(output_name, true); + return struct { const allocation_logger = Output.scoped(.alloc, @hasDecl(T, "logAllocations")); @@ -2538,10 +2542,12 @@ pub fn NewRefCounted(comptime T: type, comptime deinit_fn: ?fn (self: *T) void) } pub fn ref(self: *T) void { + log("0x{x} ref {d} + 1 = {d}", .{ @intFromPtr(self), self.ref_count, self.ref_count + 1 }); self.ref_count += 1; } pub fn deref(self: *T) void { + log("0x{x} deref {d} - 1 = {d}", .{ @intFromPtr(self), self.ref_count, self.ref_count - 1 }); self.ref_count -= 1; if (self.ref_count == 0) { diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 5c6558f63e..4a4a0cbe37 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -624,6 +624,7 @@ const PosixBufferedReader = struct { _buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), vtable: BufferedReaderVTable, flags: Flags = .{}, + close_handle: bool = true, const Flags = packed struct { is_done: bool = false, @@ -756,6 +757,7 @@ const PosixBufferedReader = struct { } fn closeHandle(this: *PosixBufferedReader) void { + if (!this.close_handle) return; if (this.flags.closed_without_reporting) { this.flags.closed_without_reporting = false; this.done(); diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index 77d040ab9a..afdff29b77 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -189,6 +189,10 @@ pub fn PosixBufferedWriter( pub const auto_poll = if (@hasDecl(Parent, "auto_poll")) Parent.auto_poll else true; + pub fn createPoll(this: *@This(), fd: bun.FileDescriptor) *Async.FilePoll { + return Async.FilePoll.init(@as(*Parent, @ptrCast(this.parent)).eventLoop(), fd, .{}, PosixWriter, this); + } + pub fn getPoll(this: *const @This()) ?*Async.FilePoll { return this.handle.getPoll(); } @@ -322,7 +326,7 @@ pub fn PosixBufferedWriter( pub fn watch(this: *PosixWriter) void { if (this.pollable) { if (this.handle == .fd) { - this.handle = .{ .poll = Async.FilePoll.init(@as(*Parent, @ptrCast(this.parent)).eventLoop(), this.getFd(), .{}, PosixWriter, this) }; + this.handle = .{ .poll = this.createPoll(this.getFd()) }; } this.registerPoll(); @@ -337,7 +341,7 @@ pub fn PosixBufferedWriter( return JSC.Maybe(void){ .result = {} }; } var poll = this.getPoll() orelse brk: { - this.handle = .{ .poll = Async.FilePoll.init(@as(*Parent, @ptrCast(this.parent)).eventLoop(), fd, .{}, PosixWriter, this) }; + this.handle = .{ .poll = this.createPoll(fd) }; break :brk this.handle.poll; }; const loop = @as(*Parent, @ptrCast(this.parent)).eventLoop().loop(); diff --git a/src/output.zig b/src/output.zig index a204cd75f2..f09daace05 100644 --- a/src/output.zig +++ b/src/output.zig @@ -442,7 +442,12 @@ pub noinline fn print(comptime fmt: string, args: anytype) callconv(std.builtin. /// To enable all logs, set the environment variable /// BUN_DEBUG_ALL=1 const _log_fn = fn (comptime fmt: string, args: anytype) void; -pub fn scoped(comptime tag: @Type(.EnumLiteral), comptime disabled: bool) _log_fn { +pub fn scoped(comptime tag: anytype, comptime disabled: bool) _log_fn { + const tagname = switch (@TypeOf(tag)) { + @Type(.EnumLiteral) => @tagName(tag), + []const u8 => tag, + else => @compileError("Output.scoped expected @Type(.EnumLiteral) or []const u8, you gave: " ++ @typeName(@Type(tag))), + }; if (comptime !Environment.isDebug or !Environment.isNative) { return struct { pub fn log(comptime _: string, _: anytype) void {} @@ -473,7 +478,7 @@ pub fn scoped(comptime tag: @Type(.EnumLiteral), comptime disabled: bool) _log_f if (!evaluated_disable) { evaluated_disable = true; if (bun.getenvZ("BUN_DEBUG_ALL") != null or - bun.getenvZ("BUN_DEBUG_" ++ @tagName(tag)) != null) + bun.getenvZ("BUN_DEBUG_" ++ tagname) != null) { really_disable = false; } else if (bun.getenvZ("BUN_DEBUG_QUIET_LOGS")) |val| { @@ -496,7 +501,7 @@ pub fn scoped(comptime tag: @Type(.EnumLiteral), comptime disabled: bool) _log_f defer lock.unlock(); if (Output.enable_ansi_colors_stdout and buffered_writer.unbuffered_writer.context.handle == writer().context.handle) { - out.print(comptime prettyFmt("[" ++ @tagName(tag) ++ "] " ++ fmt, true), args) catch { + out.print(comptime prettyFmt("[" ++ tagname ++ "] " ++ fmt, true), args) catch { really_disable = true; return; }; @@ -505,7 +510,7 @@ pub fn scoped(comptime tag: @Type(.EnumLiteral), comptime disabled: bool) _log_f return; }; } else { - out.print(comptime prettyFmt("[" ++ @tagName(tag) ++ "] " ++ fmt, false), args) catch { + out.print(comptime prettyFmt("[" ++ tagname ++ "] " ++ fmt, false), args) catch { really_disable = true; return; }; diff --git a/src/shell/interpreter.zig b/src/shell/interpreter.zig index 8942a7aeb4..5625fc0453 100644 --- a/src/shell/interpreter.zig +++ b/src/shell/interpreter.zig @@ -71,7 +71,7 @@ pub fn assert(cond: bool, comptime msg: []const u8) void { } } -const ExitCode = if (bun.Environment.isWindows) u16 else u8; +const ExitCode = if (bun.Environment.isWindows) u16 else u16; pub const StateKind = enum(u8) { script, @@ -138,6 +138,69 @@ pub fn Cow(comptime T: type, comptime VTable: type) type { }; } +/// Copy-on-write file descriptor. This is to avoid having multiple non-blocking +/// writers to the same file descriptor, which breaks epoll/kqueue +/// +/// Two main fields: +/// 1. refcount - tracks number of references to the fd, closes file descriptor when reaches 0 +/// 2. being_written - if the fd is currently being used by a BufferedWriter for non-blocking writes +/// +/// If you want to write to the file descriptor, you call `.write()`, if `being_written` is true it will duplicate the file descriptor. +const CowFd = struct { + __fd: bun.FileDescriptor, + refcount: u32 = 1, + being_used: bool = false, + + pub fn init(fd: bun.FileDescriptor) *CowFd { + const this = bun.default_allocator.create(CowFd) catch bun.outOfMemory(); + this.* = .{ + .__fd = fd, + }; + return this; + } + + pub fn dup(this: *CowFd) Maybe(*CowFd) { + const new = bun.new(CowFd, .{ + .fd = bun.sys.dup(this.fd), + .writercount = 1, + }); + return new; + } + + pub fn use(this: *CowFd) Maybe(*CowFd) { + if (!this.being_used) { + this.being_used = true; + this.ref(); + return .{ .result = this }; + } + return this.dup(); + } + + pub fn doneUsing(this: *CowFd) void { + this.being_used = false; + } + + pub fn ref(this: *CowFd) void { + this.refcount += 1; + } + + pub fn refSelf(this: *CowFd) *CowFd { + this.ref(); + return this; + } + + pub fn deref(this: *CowFd) void { + this.refcount -= 1; + if (this.refcount == 0) {} + } + + pub fn deinit(this: *CowFd) void { + std.debug.assert(this.refcount == 0); + _ = bun.sys.close(this.fd); + bun.destroy(this); + } +}; + pub const CoroutineResult = enum { /// it's okay for the caller to continue its execution cont, @@ -145,18 +208,73 @@ pub const CoroutineResult = enum { }; pub const IO = struct { - stdin: Kind = .{ .std = .{} }, - stdout: Kind = .{ .std = .{} }, - stderr: Kind = .{ .std = .{} }, + stdin: InKind, + stdout: OutKind, + stderr: OutKind, - pub const Kind = union(enum) { - /// Use stdin/stdout/stderr of this process + pub fn deinit(this: *IO) void { + this.stdin.close(); + this.stdout.close(); + this.stderr.close(); + } + + pub fn ref(this: *IO) *IO { + _ = this.stdin.ref(); + _ = this.stdout.ref(); + _ = this.stderr.ref(); + return this; + } + + pub fn deref(this: *IO) void { + this.stdin.deref(); + this.stdout.deref(); + this.stderr.deref(); + } + + pub const InKind = union(enum) { + fd: *CowFd, + ignore, + + pub fn ref(this: InKind) InKind { + switch (this) { + .fd => this.fd.ref(), + .ignore => {}, + } + return this; + } + + pub fn deref(this: InKind) void { + switch (this) { + .fd => this.fd.deref(), + .ignore => {}, + } + } + + pub fn close(this: InKind) void { + switch (this) { + .fd => this.fd.deref(), + .ignore => {}, + } + } + + pub fn to_subproc_stdio(this: InKind, stdio: *bun.shell.subproc.Stdio) void { + switch (this) { + .fd => { + stdio.* = .{ .fd = this.fd.__fd }; + }, + .ignore => { + stdio.* = .ignore; + }, + } + } + }; + + pub const OutKind = union(enum) { + /// Write/Read to/from file descriptor /// If `captured` is non-null, it will write to std{out,err} and also buffer it. /// The pointer points to the `buffered_stdout`/`buffered_stdin` fields /// in the Interpreter struct - std: struct { captured: ?*bun.ByteList = null }, - /// Write/Read to/from file descriptor - fd: bun.FileDescriptor, + fd: struct { writer: *Interpreter.IOWriter, captured: ?*bun.ByteList = null }, /// Buffers the output (handled in Cmd.BufferedIoClosed.close()) pipe, /// Discards output @@ -164,19 +282,55 @@ pub const IO = struct { // fn dupeForSubshell(this: *ShellState, - fn close(this: Kind) void { + pub fn ref(this: @This()) @This() { switch (this) { .fd => { - closefd(this.fd); + this.fd.writer.ref(); + }, + else => {}, + } + return this; + } + + pub fn deref(this: @This()) void { + this.close(); + } + + pub fn enqueueFmtBltn( + this: *@This(), + ptr: anytype, + comptime kind: ?Interpreter.Builtin.Kind, + comptime fmt_: []const u8, + args: anytype, + ) void { + this.enqueueFmtBltnImpl(ptr, kind, fmt_, args, false); + } + + pub fn enqueueFmtBltnImpl( + this: *@This(), + ptr: anytype, + comptime kind: ?Interpreter.Builtin.Kind, + comptime fmt_: []const u8, + args: anytype, + comptime write: bool, + ) void { + if (bun.Environment.allow_assert) std.debug.assert(this.* == .fd); + this.fd.writer.enqueueFmtBltn(ptr, this.fd.captured, kind, fmt_, args); + if (comptime write) this.fd.writer.write(); + } + + fn close(this: OutKind) void { + switch (this) { + .fd => { + this.fd.writer.deref(); }, else => {}, } } - fn to_subproc_stdio(this: Kind) bun.shell.subproc.Stdio { + fn to_subproc_stdio(this: OutKind) bun.shell.subproc.Stdio { return switch (this) { - .std => if (this.std.captured) |cap| .{ .capture = cap } else .inherit, - .fd => |val| .{ .fd = val }, + .fd => |val| if (val.captured) |cap| .{ .capture = .{ .buf = cap, .fd = val.writer.fd } } else .{ .fd = val.writer.fd }, .pipe => .pipe, .ignore => .ignore, }; @@ -184,7 +338,8 @@ pub const IO = struct { }; fn to_subproc_stdio(this: IO, stdio: *[3]bun.shell.subproc.Stdio) void { - stdio[stdin_no] = this.stdin.to_subproc_stdio(); + // stdio[stdin_no] = this.stdin.to_subproc_stdio(); + this.stdin.to_subproc_stdio(&stdio[0]); stdio[stdout_no] = this.stdout.to_subproc_stdio(); stdio[stderr_no] = this.stderr.to_subproc_stdio(); } @@ -452,7 +607,7 @@ pub const Interpreter = struct { }); pub const ShellState = struct { - io: IO = .{}, + io: IO, kind: Kind = .normal, /// These MUST use the `bun.default_allocator` Allocator @@ -536,6 +691,7 @@ pub const Interpreter = struct { } } + this.io.deinit(); this.shell_env.deinit(); this.cmd_local_env.deinit(); this.export_env.deinit(); @@ -554,21 +710,15 @@ pub const Interpreter = struct { .result => |fd| fd, }; - const stdout: Bufio = if (io.stdout == .std) brk: { - if (io.stdout.std.captured != null) break :brk .{ .borrowed = io.stdout.std.captured.? }; + const stdout: Bufio = if (io.stdout == .fd) brk: { + if (io.stdout.fd.captured != null) break :brk .{ .borrowed = io.stdout.fd.captured.? }; break :brk .{ .owned = .{} }; - } else if (kind == .pipeline) - .{ .borrowed = this.buffered_stdout() } - else - .{ .owned = .{} }; + } else if (kind == .pipeline) .{ .borrowed = this.buffered_stdout() } else .{ .owned = .{} }; - const stderr: Bufio = if (io.stderr == .std) brk: { - if (io.stderr.std.captured != null) break :brk .{ .borrowed = io.stderr.std.captured.? }; + const stderr: Bufio = if (io.stderr == .fd) brk: { + if (io.stderr.fd.captured != null) break :brk .{ .borrowed = io.stderr.fd.captured.? }; break :brk .{ .owned = .{} }; - } else if (kind == .pipeline) - .{ .borrowed = this.buffered_stderr() } - else - .{ .owned = .{} }; + } else if (kind == .pipeline) .{ .borrowed = this.buffered_stderr() } else .{ .owned = .{} }; duped.* = .{ .io = io, @@ -683,77 +833,25 @@ pub const Interpreter = struct { return EnvStr.initSlice("unknown"); } - pub fn writeFailingError( + pub fn writeFailingErrorFmt( this: *ShellState, - buf: []const u8, ctx: anytype, - comptime handleIOWrite: fn ( - c: @TypeOf(ctx), - bufw: BufferedWriter, - ) void, - event_loop: JSC.EventLoopHandle, - ) CoroutineResult { - const IOWriteFn = struct { - pub fn run(c: @TypeOf(ctx), bufw: BufferedWriter) void { - handleIOWrite(c, bufw); - } - }; - - switch (this.writeIO(.stderr, buf, ctx, IOWriteFn.run, event_loop)) { - .cont => { - ctx.parent.childDone(ctx, 1); - return .yield; - }, - .yield => return .yield, - } - } - - pub fn writeIO( - this: *ShellState, - comptime iotype: @Type(.EnumLiteral), - buf: []const u8, - ctx: anytype, - comptime handleIOWrite: fn ( - c: @TypeOf(ctx), - bufw: BufferedWriter, - ) void, - event_loop: JSC.EventLoopHandle, - ) CoroutineResult { - const io: *IO.Kind = &@field(this.io, @tagName(iotype)); - + enqueueCb: fn (c: @TypeOf(ctx)) void, + comptime fmt: []const u8, + args: anytype, + ) void { + const io: *IO.OutKind = &@field(this.io, "stderr"); switch (io.*) { - .std => |val| { - const bw = BufferedWriter{ - .event_loop = event_loop, - .fd = if (iotype == .stdout) bun.STDOUT_FD else bun.STDERR_FD, - .buffer = buf, - .parent = BufferedWriter.ParentPtr.init(ctx), - .bytelist = val.captured, - }; - handleIOWrite(ctx, bw); - return .yield; - }, - .fd => { - const bw = BufferedWriter{ - .event_loop = event_loop, - .fd = if (iotype == .stdout) bun.STDOUT_FD else bun.STDERR_FD, - .buffer = buf, - .parent = BufferedWriter.ParentPtr.init(ctx), - }; - handleIOWrite(ctx, bw); - return .yield; + .fd => |x| { + enqueueCb(ctx); + x.writer.enqueueFmt(ctx, x.captured, fmt, args); + x.writer.write(); }, .pipe => { - const func = @field(ShellState, "buffered_" ++ @tagName(iotype)); - const bufio: *bun.ByteList = func(this); - bufio.append(bun.default_allocator, buf) catch bun.outOfMemory(); - // this.parent.childDone(this, 1); - return .cont; - }, - .ignore => { - // this.parent.childDone(this, 1); - return .cont; + const bufio: *bun.ByteList = this.buffered_stderr(); + bufio.appendFmt(bun.default_allocator, fmt, args) catch bun.outOfMemory(); }, + .ignore => {}, } } }; @@ -958,6 +1056,25 @@ pub const Interpreter = struct { std.debug.assert(cwd_arr.items[cwd_arr.items.len -| 1] == 0); } + const stdin_fd = switch (Syscall.dup(bun.STDIN_FD)) { + .result => |fd| fd, + .err => |err| return .{ .err = .{ .sys = err.toSystemError() } }, + }; + + const stdout_fd = switch (Syscall.dup(bun.STDOUT_FD)) { + .result => |fd| fd, + .err => |err| return .{ .err = .{ .sys = err.toSystemError() } }, + }; + + const stderr_fd = switch (Syscall.dup(bun.STDERR_FD)) { + .result => |fd| fd, + .err => |err| return .{ .err = .{ .sys = err.toSystemError() } }, + }; + + const stdin_reader = CowFd.init(stdin_fd); + const stdout_writer = IOWriter.init(stdout_fd, event_loop); + const stderr_writer = IOWriter.init(stderr_fd, event_loop); + interpreter.* = .{ .event_loop = event_loop, @@ -968,7 +1085,21 @@ pub const Interpreter = struct { .arena = arena.*, .root_shell = ShellState{ - .io = .{}, + .io = .{ + .stdin = .{ + .fd = stdin_reader, + }, + .stdout = .{ + .fd = .{ + .writer = stdout_writer, + }, + }, + .stderr = .{ + .fd = .{ + .writer = stderr_writer, + }, + }, + }, .shell_env = EnvMap.init(allocator), .cmd_local_env = EnvMap.init(allocator), @@ -981,8 +1112,8 @@ pub const Interpreter = struct { }; if (event_loop == .js) { - interpreter.root_shell.io.stdout = .{ .std = .{ .captured = &interpreter.root_shell._buffered_stdout.owned } }; - interpreter.root_shell.io.stderr = .{ .std = .{ .captured = &interpreter.root_shell._buffered_stderr.owned } }; + interpreter.root_shell.io.stdout.fd.captured = &interpreter.root_shell._buffered_stdout.owned; + interpreter.root_shell.io.stderr.fd.captured = &interpreter.root_shell._buffered_stderr.owned; } return .{ .result = interpreter }; @@ -1141,7 +1272,7 @@ pub const Interpreter = struct { // this.promise.resolve(this.global, JSValue.jsNumberFromInt32(@intCast(exit_code))); // this.buffered_stdout. this.reject.deinit(); - _ = this.resolve.call(&[_]JSValue{if (comptime bun.Environment.isWindows) JSValue.jsNumberFromU16(exit_code) else JSValue.jsNumberFromChar(exit_code)}); + _ = this.resolve.call(&.{JSValue.jsNumberFromU16(exit_code)}); } else { this.done.?.* = true; } @@ -1193,6 +1324,8 @@ pub const Interpreter = struct { pub fn setQuiet(this: *ThisInterpreter, globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { _ = globalThis; _ = callframe; + this.root_shell.io.stdout.deref(); + this.root_shell.io.stderr.deref(); this.root_shell.io.stdout = .pipe; this.root_shell.io.stderr = .pipe; return .undefined; @@ -1319,6 +1452,10 @@ pub const Interpreter = struct { _ = has_pending_activity.fetchSub(1, .SeqCst); } + pub fn rootIO(this: *const Interpreter) *const IO { + return &this.root_shell.io; + } + const AssignCtx = enum { cmd, shell, @@ -1604,9 +1741,11 @@ pub const Interpreter = struct { .simple => |*simp| { const is_cmd_subst = this.expandSimpleNoIO(simp, &this.current_out); if (is_cmd_subst) { - var io: IO = .{}; - io.stdout = .pipe; - io.stderr = this.base.shell.io.stderr; + const io: IO = .{ + .stdin = this.base.rootIO().stdin.ref(), + .stdout = .pipe, + .stderr = this.base.shell.io.stderr.ref(), + }; const shell_state = switch (this.base.shell.dupeForSubshell(this.base.interpreter.allocator, io, .cmd_subst)) { .result => |s| s, .err => |e| { @@ -1631,9 +1770,11 @@ pub const Interpreter = struct { for (cmp.atoms[start_word_idx..]) |*simple_atom| { const is_cmd_subst = this.expandSimpleNoIO(simple_atom, &this.current_out); if (is_cmd_subst) { - var io: IO = .{}; - io.stdout = .pipe; - io.stderr = this.base.shell.io.stderr; + const io: IO = .{ + .stdin = this.base.rootIO().stdin.ref(), + .stdout = .pipe, + .stderr = this.base.shell.io.stderr.ref(), + }; const shell_state = switch (this.base.shell.dupeForSubshell(this.base.interpreter.allocator, io, .cmd_subst)) { .result => |s| s, .err => |e| { @@ -2101,6 +2242,10 @@ pub const Interpreter = struct { pub fn throw(this: *const State, err: *const bun.shell.ShellErr) void { throwShellErr(err, this.eventLoop()); } + + pub fn rootIO(this: *const State) *const IO { + return this.interpreter.rootIO(); + } }; pub const Script = struct { @@ -2164,7 +2309,8 @@ pub const Interpreter = struct { if (this.state.normal.idx >= this.node.stmts.len) return; const stmt_node = &this.node.stmts[this.state.normal.idx]; this.state.normal.idx += 1; - var stmt = Stmt.init(this.base.interpreter, this.base.shell, stmt_node, this, this.getIO()) catch bun.outOfMemory(); + var io = this.getIO(); + var stmt = Stmt.init(this.base.interpreter, this.base.shell, stmt_node, this, io.ref().*) catch bun.outOfMemory(); stmt.start(); return; }, @@ -2608,7 +2754,7 @@ pub const Interpreter = struct { state: union(enum) { idle, executing, - waiting_write_err: BufferedWriter, + waiting_write_err, done, } = .idle, @@ -2657,6 +2803,15 @@ pub const Interpreter = struct { return this.io orelse this.base.shell.io; } + fn writeFailingError(this: *Pipeline, comptime fmt: []const u8, args: anytype) void { + const handler = struct { + fn enqueueCb(ctx: *Pipeline) void { + ctx.state = .waiting_write_err; + } + }; + this.base.shell.writeFailingErrorFmt(this, handler.enqueueCb, fmt, args); + } + fn setupCommands(this: *Pipeline) CoroutineResult { const cmd_count = brk: { var i: u32 = 0; @@ -2678,27 +2833,29 @@ pub const Interpreter = struct { closefd(pipe[1]); } const system_err = err.toSystemError(); - this.writeFailingError("bun: {s}\n", .{system_err.message}, 1); + this.writeFailingError("bun: {s}\n", .{system_err.message}); return .yield; } } var i: u32 = 0; + const evtloop = this.base.eventLoop(); for (this.node.items) |*item| { switch (item.*) { .cmd => { const kind = "subproc"; _ = kind; var cmd_io = this.getIO(); - const stdin = if (cmd_count > 1) Pipeline.readPipe(pipes, i, &cmd_io) else cmd_io.stdin; - const stdout = if (cmd_count > 1) Pipeline.writePipe(pipes, i, cmd_count, &cmd_io) else cmd_io.stdout; + const stdin = if (cmd_count > 1) Pipeline.readPipe(pipes, i, &cmd_io) else cmd_io.stdin.ref(); + const stdout = if (cmd_count > 1) Pipeline.writePipe(pipes, i, cmd_count, &cmd_io, evtloop) else cmd_io.stdout.ref(); cmd_io.stdin = stdin; cmd_io.stdout = stdout; + _ = cmd_io.stderr.ref(); const subshell_state = switch (this.base.shell.dupeForSubshell(this.base.interpreter.allocator, cmd_io, .pipeline)) { .result => |s| s, .err => |err| { const system_err = err.toSystemError(); - this.writeFailingError("bun: {s}\n", .{system_err.message}, 1); + this.writeFailingError("bun: {s}\n", .{system_err.message}); return .yield; }, }; @@ -2716,25 +2873,6 @@ pub const Interpreter = struct { return .cont; } - pub fn writeFailingError( - this: *Pipeline, - comptime fmt: []const u8, - args: anytype, - exit_code: ExitCode, - ) void { - _ = exit_code; // autofix - - const HandleIOWrite = struct { - fn run(pipeline: *Pipeline, bufw: BufferedWriter) void { - pipeline.state = .{ .waiting_write_err = bufw }; - pipeline.state.waiting_write_err.write(); - } - }; - - const buf = std.fmt.allocPrint(this.base.interpreter.arena.allocator(), fmt, args) catch bun.outOfMemory(); - _ = this.base.shell.writeFailingError(buf, this, HandleIOWrite.run, this.base.eventLoop()); - } - pub fn start(this: *Pipeline) void { if (this.setupCommands() == .yield) return; @@ -2755,24 +2893,29 @@ pub const Interpreter = struct { return; } - for (cmds, 0..) |*cmd_or_result, i| { - var stdin: IO.Kind = if (i == 0) this.getIO().stdin else .{ .fd = this.pipes.?[i - 1][0] }; - var stdout: IO.Kind = if (i == cmds.len - 1) this.getIO().stdout else .{ .fd = this.pipes.?[i][1] }; + for (cmds) |*cmd_or_result| { + // var stdin: IO.InKind = if (i == 0) this.getIO().stdin.ref() else .{ .fd = CowFd.init(this.pipes.?[i - 1][0]) }; + // var stdout: IO.OutKind = brk: { + // if (i == cmds.len - 1) break :brk this.getIO().stdout.ref(); + + // const fd = this.pipes.?[i][1]; + // const writer = IOWriter.init(fd, this.base.eventLoop()); + // break :brk .{ .fd = .{ .writer = writer } }; + // }; std.debug.assert(cmd_or_result.* == .cmd); var cmd = cmd_or_result.cmd; - log("Spawn: proc_idx={d} stdin={any} stdout={any} stderr={any}\n", .{ i, stdin, stdout, cmd.io.stderr }); cmd.start(); // If command is a subproc (and not a builtin) we need to close the fd - if (cmd.isSubproc()) { - stdin.close(); - stdout.close(); - } + // if (cmd.isSubproc()) { + // stdin.close(); + // stdout.close(); + // } } } - pub fn onBufferedWriterDone(this: *Pipeline, err: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Pipeline, err: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { std.debug.assert(this.state == .waiting_write_err); } @@ -2854,16 +2997,16 @@ pub const Interpreter = struct { return Maybe(void).success; } - fn writePipe(pipes: []Pipe, proc_idx: usize, cmd_count: usize, io: *IO) IO.Kind { + fn writePipe(pipes: []Pipe, proc_idx: usize, cmd_count: usize, io: *IO, evtloop: JSC.EventLoopHandle) IO.OutKind { // Last command in the pipeline should write to stdout - if (proc_idx == cmd_count - 1) return io.stdout; - return .{ .fd = pipes[proc_idx][1] }; + if (proc_idx == cmd_count - 1) return io.stdout.ref(); + return .{ .fd = .{ .writer = IOWriter.init(pipes[proc_idx][1], evtloop) } }; } - fn readPipe(pipes: []Pipe, proc_idx: usize, io: *IO) IO.Kind { + fn readPipe(pipes: []Pipe, proc_idx: usize, io: *IO) IO.InKind { // First command in the pipeline should read from stdin - if (proc_idx == 0) return io.stdin; - return .{ .fd = pipes[proc_idx - 1][0] }; + if (proc_idx == 0) return io.stdin.ref(); + return .{ .fd = CowFd.init(pipes[proc_idx - 1][0]) }; } }; @@ -2888,7 +3031,7 @@ pub const Interpreter = struct { /// If the cmd redirects to a file we have to expand that string. /// Allocated in `spawn_arena` redirection_file: std.ArrayList(u8), - redirection_fd: bun.FileDescriptor = bun.invalid_fd, + redirection_fd: ?*CowFd = null, exec: Exec = .none, exit_code: ?ExitCode = null, @@ -2908,8 +3051,7 @@ pub const Interpreter = struct { }, exec, done, - waiting_write_err: BufferedWriter, - err: ?Syscall.Error, + waiting_write_err, }, const Subprocess = bun.shell.subproc.ShellSubprocess; @@ -2966,9 +3108,11 @@ pub const Interpreter = struct { } fn allClosed(this: *BufferedIoClosed) bool { - return (if (this.stdin) |stdin| stdin else true) and + const ret = (if (this.stdin) |stdin| stdin else true) and (if (this.stdout) |*stdout| stdout.closed() else true) and (if (this.stderr) |*stderr| stderr.closed() else true); + log("BufferedIOClosed(0x{x}) all_closed={any}", .{ @intFromPtr(this), ret }); + return ret; } fn close(this: *BufferedIoClosed, cmd: *Cmd, io: union(enum) { stdout: *Subprocess.Readable, stderr: *Subprocess.Readable, stdin }) void { @@ -3042,40 +3186,13 @@ pub const Interpreter = struct { /// If starting a command results in an error (failed to find executable in path for example) /// then it should write to the stderr of the entire shell script process - pub fn writeFailingError(this: *Cmd, buf: []const u8, exit_code: ExitCode) void { - _ = exit_code; // autofix - - const HandleIOWrite = struct { - fn run(cmd: *Cmd, bufw: BufferedWriter) void { - cmd.state = .{ .waiting_write_err = bufw }; - cmd.state.waiting_write_err.write(); + pub fn writeFailingError(this: *Cmd, comptime fmt: []const u8, args: anytype) void { + const handler = struct { + fn enqueueCb(ctx: *Cmd) void { + ctx.state = .waiting_write_err; } }; - _ = this.base.shell.writeFailingError(buf, this, HandleIOWrite.run, this.base.eventLoop()); - - // switch (this.base.shell.io.stderr) { - // .std => |val| { - // this.state = .{ .waiting_write_err = BufferedWriter{ - // .fd = stderr_no, - // .buffer = buf, - // .parent = BufferedWriter.ParentPtr.init(this), - // .bytelist = val.captured, - // } }; - // this.state.waiting_write_err.write(); - // }, - // .fd => { - // this.state = .{ .waiting_write_err = BufferedWriter{ - // .fd = stderr_no, - // .buffer = buf, - // .parent = BufferedWriter.ParentPtr.init(this), - // } }; - // this.state.waiting_write_err.write(); - // }, - // .pipe, .ignore => { - // this.parent.childDone(this, 1); - // }, - // } - return; + this.base.shell.writeFailingErrorFmt(this, handler.enqueueCb, fmt, args); } pub fn init( @@ -3109,7 +3226,7 @@ pub const Interpreter = struct { } pub fn next(this: *Cmd) void { - while (!(this.state == .done or this.state == .err)) { + while (this.state != .done) { switch (this.state) { .idle => { this.state = .{ .expanding_assigns = undefined }; @@ -3191,7 +3308,7 @@ pub const Interpreter = struct { // yield execution to subproc/builtin return; }, - .done, .err => unreachable, + .done => unreachable, } } @@ -3214,14 +3331,13 @@ pub const Interpreter = struct { return this.next(); } - pub fn onBufferedWriterDone(this: *Cmd, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Cmd, e: ?JSC.SystemError) void { if (e) |err| { this.base.throw(&bun.shell.ShellErr.newSys(err)); return; } std.debug.assert(this.state == .waiting_write_err); - this.state = .{ .err = e }; - this.next(); + this.parent.childDone(this, 1); return; } @@ -3232,7 +3348,7 @@ pub const Interpreter = struct { defer err.deinit(bun.default_allocator); this.state.expanding_assigns.deinit(); const buf = err.fmt(); - this.writeFailingError(buf, exit_code); + this.writeFailingError("{s}", .{buf}); return; } @@ -3256,7 +3372,7 @@ pub const Interpreter = struct { }; defer err.deinit(bun.default_allocator); const buf = err.fmt(); - this.writeFailingError(buf, exit_code); + this.writeFailingError("{s}", .{buf}); return; } this.next(); @@ -3337,8 +3453,7 @@ pub const Interpreter = struct { switch (this.exec.bltn.start()) { .result => {}, .err => |e| { - const buf = std.fmt.allocPrint(this.spawn_arena.allocator(), "bun: {s}: {s}", .{ @tagName(this.exec.bltn.kind), e.toSystemError().message }) catch bun.outOfMemory(); - this.writeFailingError(buf, 1); + this.writeFailingError("bun: {s}: {s}", .{ @tagName(this.exec.bltn.kind), e.toSystemError().message }); return; }, } @@ -3347,8 +3462,7 @@ pub const Interpreter = struct { var path_buf: [bun.MAX_PATH_BYTES]u8 = undefined; const resolved = which(&path_buf, spawn_args.PATH, spawn_args.cwd, first_arg[0..first_arg_len]) orelse { - const buf = std.fmt.allocPrint(arena_allocator, "bun: command not found: {s}\n", .{first_arg}) catch bun.outOfMemory(); - this.writeFailingError(buf, 1); + this.writeFailingError("bun: command not found: {s}\n", .{first_arg}); return; }; @@ -3445,8 +3559,7 @@ pub const Interpreter = struct { }, .atom => { if (this.redirection_file.items.len == 0) { - const buf = std.fmt.allocPrint(spawn_args.arena.allocator(), "bun: ambiguous redirect: at `{s}`\n", .{spawn_args.argv.items[0] orelse ""}) catch bun.outOfMemory(); - this.writeFailingError(buf, 1); + this.writeFailingError("bun: ambiguous redirect: at `{s}`\n", .{spawn_args.argv.items[0] orelse ""}); return; } const path = this.redirection_file.items[0..this.redirection_file.items.len -| 1 :0]; @@ -3455,12 +3568,11 @@ pub const Interpreter = struct { const flags = this.node.redirect.toFlags(); const redirfd = switch (Syscall.openat(this.base.shell.cwd_fd, path, flags, perm)) { .err => |e| { - const buf = std.fmt.allocPrint(this.spawn_arena.allocator(), "bun: {s}: {s}", .{ e.toSystemError().message, path }) catch bun.outOfMemory(); - return this.writeFailingError(buf, 1); + return this.writeFailingError("bun: {s}: {s}", .{ e.toSystemError().message, path }); }, .result => |f| f, }; - this.redirection_fd = redirfd; + this.redirection_fd = CowFd.init(redirfd); setStdioFromRedirect(&spawn_args.stdio, this.node.redirect, .{ .fd = redirfd }); }, } @@ -3538,20 +3650,24 @@ pub const Interpreter = struct { } pub fn hasFinished(this: *Cmd) bool { + log("Cmd(0x{x}) exit_code={any}", .{ @intFromPtr(this), this.exit_code }); if (this.exit_code == null) return false; if (this.exec != .none) { - if (this.exec == .subproc) return this.exec.subproc.buffered_closed.allClosed(); - return this.exec.bltn.ioAllClosed(); + if (this.exec == .subproc) { + return this.exec.subproc.buffered_closed.allClosed(); + } + // return this.exec.bltn.ioAllClosed(); + return false; } return true; } /// Called by Subprocess pub fn onExit(this: *Cmd, exit_code: ExitCode) void { - log("cmd exit code={d} ({x})", .{ exit_code, @intFromPtr(this) }); this.exit_code = exit_code; const has_finished = this.hasFinished(); + log("cmd exit code={d} has_finished={any} ({x})", .{ exit_code, has_finished, @intFromPtr(this) }); if (has_finished) { this.state = .done; this.next(); @@ -3567,9 +3683,9 @@ pub const Interpreter = struct { pub fn deinit(this: *Cmd) void { log("cmd deinit {x}", .{@intFromPtr(this)}); // this.base.shell.cmd_local_env.clearRetainingCapacity(); - if (this.redirection_fd != bun.invalid_fd) { - _ = Syscall.close(this.redirection_fd); - this.redirection_fd = bun.invalid_fd; + if (this.redirection_fd) |redirfd| { + this.redirection_fd = null; + redirfd.deref(); } // if (this.exit_code != null) { // if (this.cmd) |cmd| { @@ -3638,8 +3754,8 @@ pub const Interpreter = struct { std.debug.assert(this.exec == .subproc); } log("cmd ({x}) close buffered stdout", .{@intFromPtr(this)}); - if (this.io.stdout == .std and this.io.stdout.std.captured != null and !this.node.redirect.redirectsElsewhere(.stdout)) { - var buf = this.io.stdout.std.captured.?; + if (this.io.stdout == .fd and this.io.stdout.fd.captured != null and !this.node.redirect.redirectsElsewhere(.stdout)) { + var buf = this.io.stdout.fd.captured.?; const the_slice = this.exec.subproc.child.stdout.pipe.slice(); buf.append(bun.default_allocator, the_slice) catch bun.outOfMemory(); } @@ -3652,8 +3768,8 @@ pub const Interpreter = struct { std.debug.assert(this.exec == .subproc); } log("cmd ({x}) close buffered stderr", .{@intFromPtr(this)}); - if (this.io.stderr == .std and this.io.stderr.std.captured != null and !this.node.redirect.redirectsElsewhere(.stderr)) { - var buf = this.io.stderr.std.captured.?; + if (this.io.stderr == .fd and this.io.stderr.fd.captured != null and !this.node.redirect.redirectsElsewhere(.stderr)) { + var buf = this.io.stderr.fd.captured.?; buf.append(bun.default_allocator, this.exec.subproc.child.stderr.pipe.slice()) catch bun.outOfMemory(); } this.exec.subproc.buffered_closed.close(this, .{ .stderr = &this.exec.subproc.child.stderr }); @@ -3663,9 +3779,9 @@ pub const Interpreter = struct { pub const Builtin = struct { kind: Kind, - stdin: BuiltinIO, - stdout: BuiltinIO, - stderr: BuiltinIO, + stdin: BuiltinIO.Input, + stdout: BuiltinIO.Output, + stderr: BuiltinIO.Output, exit_code: ?ExitCode = null, export_env: *EnvMap, @@ -3741,90 +3857,122 @@ pub const Interpreter = struct { } }; - /// in the case of array buffer we simply need to write to the pointer - /// in the case of blob, we write to the file descriptor - pub const BuiltinIO = union(enum) { - fd: bun.FileDescriptor, - buf: std.ArrayList(u8), - captured: struct { - out_kind: enum { stdout, stderr }, - bytelist: *bun.ByteList, - }, - arraybuf: ArrayBuf, - blob: *bun.JSC.WebCore.Blob, - ignore, + pub const BuiltinIO = struct { + /// in the case of array buffer we simply need to write to the pointer + /// in the case of blob, we write to the file descriptor + pub const Output = union(enum) { + fd: struct { writer: *IOWriter, captured: ?*bun.ByteList = null }, + /// array list not owned by this type + buf: std.ArrayList(u8), + arraybuf: ArrayBuf, + blob: *bun.JSC.WebCore.Blob, + ignore, + + const FdOutput = struct { + writer: *IOWriter, + captured: ?*bun.ByteList = null, + + // pub fn + }; + + pub fn deinit(this: *Output) void { + switch (this.*) { + .fd => { + this.fd.writer.deref(); + }, + else => {}, + } + } + + pub fn needsIO(this: *Output) bool { + return switch (this.*) { + .fd => true, + else => false, + }; + } + + pub fn start(this: *@This()) void { + if (bun.Environment.allow_assert) std.debug.assert(this.* == .fd); + this.fd.writer.write(); + } + + pub fn enqueueFmtBltn( + this: *@This(), + ptr: anytype, + comptime kind: ?Interpreter.Builtin.Kind, + comptime fmt_: []const u8, + args: anytype, + ) void { + this.enqueueFmtBltnImpl(ptr, kind, fmt_, args, false); + } + + pub fn enqueueFmtBltnAndWrite( + this: *@This(), + ptr: anytype, + comptime kind: ?Interpreter.Builtin.Kind, + comptime fmt_: []const u8, + args: anytype, + ) void { + this.enqueueFmtBltnImpl(ptr, kind, fmt_, args, true); + } + + pub fn enqueueFmtBltnImpl( + this: *@This(), + ptr: anytype, + comptime kind: ?Interpreter.Builtin.Kind, + comptime fmt_: []const u8, + args: anytype, + comptime call_write: bool, + ) void { + if (bun.Environment.allow_assert) std.debug.assert(this.* == .fd); + this.fd.writer.enqueueFmtBltn(ptr, this.fd.captured, kind, fmt_, args); + if (comptime call_write) this.fd.writer.write(); + } + + pub fn enqueue(this: *@This(), ptr: anytype, buf: []const u8) void { + this.enqueueImpl(ptr, buf, false); + } + + pub fn enqueueAndWrite(this: *@This(), ptr: anytype, buf: []const u8) void { + this.enqueueImpl(ptr, buf, true); + } + + pub fn enqueueImpl(this: *@This(), ptr: anytype, buf: []const u8, comptime call_write: bool) void { + if (bun.Environment.allow_assert) std.debug.assert(this.* == .fd); + this.fd.writer.enqueue(ptr, this.fd.captured, buf); + if (comptime call_write) this.fd.writer.write(); + } + }; + + pub const Input = union(enum) { + fd: *CowFd, + /// array list not ownedby this type + buf: std.ArrayList(u8), + arraybuf: ArrayBuf, + blob: *bun.JSC.WebCore.Blob, + ignore, + + pub fn deinit(this: *Input) void { + switch (this.*) { + .fd => { + this.fd.deref(); + }, + else => {}, + } + } + + pub fn needsIO(this: *Output) bool { + return switch (this.*) { + .fd => true, + else => false, + }; + } + }; const ArrayBuf = struct { buf: JSC.ArrayBuffer.Strong, i: u32 = 0, }; - - pub fn asFd(this: *BuiltinIO) ?bun.FileDescriptor { - return switch (this.*) { - .fd => this.fd, - .captured => if (this.captured.out_kind == .stdout) bun.STDOUT_FD else bun.STDERR_FD, - else => null, - }; - } - - pub fn expectFd(this: *BuiltinIO) bun.FileDescriptor { - return switch (this.*) { - .fd => this.fd, - .captured => if (this.captured.out_kind == .stdout) bun.STDOUT_FD else bun.STDERR_FD, - else => @panic("No fd"), - }; - } - - pub fn isClosed(this: *BuiltinIO) bool { - switch (this.*) { - .fd => { - return this.fd != bun.invalid_fd; - }, - .buf => { - return true; - // try this.buf.deinit(allocator); - }, - else => return true, - } - } - - pub fn deinit(this: *BuiltinIO) void { - switch (this.*) { - .buf => { - this.buf.deinit(); - }, - .fd => { - if (this.fd != bun.invalid_fd and this.fd != bun.STDIN_FD) { - _ = Syscall.close(this.fd); - this.fd = bun.invalid_fd; - } - }, - .blob => |blob| { - blob.deinit(); - }, - else => {}, - } - } - - pub fn close(this: *BuiltinIO) void { - switch (this.*) { - .fd => { - if (this.fd != bun.invalid_fd) { - closefd(this.fd); - this.fd = bun.invalid_fd; - } - }, - .buf => {}, - else => {}, - } - } - - pub fn needsIO(this: *BuiltinIO) bool { - return switch (this.*) { - .fd, .captured => true, - else => false, - }; - } }; pub fn argsSlice(this: *Builtin) []const [*:0]const u8 { @@ -3886,22 +4034,22 @@ pub const Interpreter = struct { ) CoroutineResult { const io = io_.*; - const stdin: Builtin.BuiltinIO = switch (io.stdin) { - .std => .{ .fd = bun.STDIN_FD }, - .fd => |fd| .{ .fd = fd }, - .pipe => .{ .buf = std.ArrayList(u8).init(interpreter.allocator) }, + const stdin: BuiltinIO.Input = switch (io.stdin) { + .fd => |fd| .{ .fd = fd.refSelf() }, .ignore => .ignore, }; - const stdout: Builtin.BuiltinIO = switch (io.stdout) { - .std => if (io.stdout.std.captured) |bytelist| .{ .captured = .{ .out_kind = .stdout, .bytelist = bytelist } } else .{ .fd = bun.STDOUT_FD }, - .fd => |fd| .{ .fd = fd }, - .pipe => .{ .buf = std.ArrayList(u8).init(interpreter.allocator) }, + const stdout: BuiltinIO.Output = switch (io.stdout) { + .fd => |val| .{ .fd = .{ .writer = val.writer.refSelf(), .captured = val.captured } }, + .pipe => .{ .buf = std.ArrayList(u8).init(bun.default_allocator) }, .ignore => .ignore, + // .std => if (io.stdout.std.captured) |bytelist| .{ .captured = .{ .out_kind = .stdout, .bytelist = bytelist } } else .{ .fd = bun.STDOUT_FD }, + // .fd => |fd| .{ .fd = fd }, + // .pipe => .{ .buf = std.ArrayList(u8).init(interpreter.allocator) }, + // .ignore => .ignore, }; - const stderr: Builtin.BuiltinIO = switch (io.stderr) { - .std => if (io.stderr.std.captured) |bytelist| .{ .captured = .{ .out_kind = .stderr, .bytelist = bytelist } } else .{ .fd = bun.STDERR_FD }, - .fd => |fd| .{ .fd = fd }, - .pipe => .{ .buf = std.ArrayList(u8).init(interpreter.allocator) }, + const stderr: BuiltinIO.Output = switch (io.stderr) { + .fd => |val| .{ .fd = .{ .writer = val.writer.refSelf(), .captured = val.captured } }, + .pipe => .{ .buf = std.ArrayList(u8).init(bun.default_allocator) }, .ignore => .ignore, }; @@ -3996,8 +4144,7 @@ pub const Interpreter = struct { switch (file) { .atom => { if (cmd.redirection_file.items.len == 0) { - const buf = std.fmt.allocPrint(arena.allocator(), "bun: ambiguous redirect: at `{s}`\n", .{@tagName(kind)}) catch bun.outOfMemory(); - cmd.writeFailingError(buf, 1); + cmd.writeFailingError("bun: ambiguous redirect: at `{s}`\n", .{@tagName(kind)}); return .yield; } const path = cmd.redirection_file.items[0..cmd.redirection_file.items.len -| 1 :0]; @@ -4006,55 +4153,54 @@ pub const Interpreter = struct { const flags = node.redirect.toFlags(); const redirfd = switch (Syscall.openat(cmd.base.shell.cwd_fd, path, flags, perm)) { .err => |e| { - const buf = std.fmt.allocPrint(arena.allocator(), "bun: {s}: {s}", .{ e.toSystemError().message, path }) catch bun.outOfMemory(); - cmd.writeFailingError(buf, 1); + cmd.writeFailingError("bun: {s}: {s}", .{ e.toSystemError().message, path }); return .yield; }, .result => |f| f, + // cmd.redirection_fd = redirfd; }; - // cmd.redirection_fd = redirfd; if (node.redirect.stdin) { - cmd.exec.bltn.stdin = .{ .fd = redirfd }; + cmd.exec.bltn.stdin = .{ .fd = CowFd.init(redirfd) }; } if (node.redirect.stdout) { - cmd.exec.bltn.stdout = .{ .fd = redirfd }; + cmd.exec.bltn.stdout = .{ .fd = .{ .writer = IOWriter.init(redirfd, cmd.base.eventLoop()) } }; } if (node.redirect.stderr) { - cmd.exec.bltn.stderr = .{ .fd = redirfd }; + cmd.exec.bltn.stderr = .{ .fd = .{ .writer = IOWriter.init(redirfd, cmd.base.eventLoop()) } }; } }, .jsbuf => |val| { const globalObject = interpreter.event_loop.js.global; if (interpreter.jsobjs[file.jsbuf.idx].asArrayBuffer(globalObject)) |buf| { - const builtinio: Builtin.BuiltinIO = .{ .arraybuf = .{ .buf = JSC.ArrayBuffer.Strong{ + const arraybuf: BuiltinIO.ArrayBuf = .{ .buf = JSC.ArrayBuffer.Strong{ .array_buffer = buf, .held = JSC.Strong.create(buf.value, globalObject), - }, .i = 0 } }; + }, .i = 0 }; if (node.redirect.stdin) { - cmd.exec.bltn.stdin = builtinio; + cmd.exec.bltn.stdin = .{ .arraybuf = arraybuf }; } if (node.redirect.stdout) { - cmd.exec.bltn.stdout = builtinio; + cmd.exec.bltn.stdout = .{ .arraybuf = arraybuf }; } if (node.redirect.stderr) { - cmd.exec.bltn.stderr = builtinio; + cmd.exec.bltn.stderr = .{ .arraybuf = arraybuf }; } } else if (interpreter.jsobjs[file.jsbuf.idx].as(JSC.WebCore.Blob)) |blob| { - const builtinio: Builtin.BuiltinIO = .{ .blob = bun.newWithAlloc(arena.allocator(), JSC.WebCore.Blob, blob.dupe()) }; + const theblob: *bun.JSC.WebCore.Blob = bun.newWithAlloc(arena.allocator(), JSC.WebCore.Blob, blob.dupe()); if (node.redirect.stdin) { - cmd.exec.bltn.stdin = builtinio; + cmd.exec.bltn.stdin = .{ .blob = theblob }; } if (node.redirect.stdout) { - cmd.exec.bltn.stdout = builtinio; + cmd.exec.bltn.stdout = .{ .blob = theblob }; } if (node.redirect.stderr) { - cmd.exec.bltn.stderr = builtinio; + cmd.exec.bltn.stderr = .{ .blob = theblob }; } } else { const jsval = cmd.base.interpreter.jsobjs[val.idx]; @@ -4094,11 +4240,17 @@ pub const Interpreter = struct { return @fieldParentPtr(Cmd, "exec", union_ptr); } - pub fn done(this: *Builtin, exit_code: ExitCode) void { + pub fn done(this: *Builtin, exit_code: anytype) void { // if (comptime bun.Environment.allow_assert) { // std.debug.assert(this.exit_code != null); // } - this.exit_code = exit_code; + const code: ExitCode = switch (@TypeOf(exit_code)) { + bun.C.E => @intFromEnum(exit_code), + u1, u8, u16 => exit_code, + comptime_int => exit_code, + else => @compileError("Invalid type: " ++ @typeName(@TypeOf(exit_code))), + }; + this.exit_code = code; var cmd = this.parentCmdMut(); log("builtin done ({s}: exit={d}) cmd to free: ({x})", .{ @tagName(this.kind), exit_code, @intFromPtr(cmd) }); @@ -4167,26 +4319,27 @@ pub const Interpreter = struct { }; } - pub fn writeNoIO(this: *Builtin, comptime io_kind: @Type(.EnumLiteral), buf: []const u8) Maybe(usize) { + pub fn writeNoIO(this: *Builtin, comptime io_kind: @Type(.EnumLiteral), buf: []const u8) usize { if (comptime io_kind != .stdout and io_kind != .stderr) { @compileError("Bad IO" ++ @tagName(io_kind)); } - if (buf.len == 0) return .{ .result = 0 }; + if (buf.len == 0) return 0; - var io: *BuiltinIO = &@field(this, @tagName(io_kind)); + var io: *BuiltinIO.Output = &@field(this, @tagName(io_kind)); switch (io.*) { - .captured, .fd => @panic("writeNoIO can't write to a file descriptor"), + .fd => @panic("writeNoIO can't write to a file descriptor"), .buf => { log("{s} write to buf len={d} str={s}{s}\n", .{ this.kind.asString(), buf.len, buf[0..@min(buf.len, 16)], if (buf.len > 16) "..." else "" }); io.buf.appendSlice(buf) catch bun.outOfMemory(); - return Maybe(usize).initResult(buf.len); + return buf.len; }, .arraybuf => { if (io.arraybuf.i >= io.arraybuf.buf.array_buffer.byte_len) { // TODO is it correct to return an error here? is this error the correct one to return? - return Maybe(usize).initErr(Syscall.Error.fromCode(bun.C.E.NOSPC, .write)); + // return Maybe(usize).initErr(Syscall.Error.fromCode(bun.C.E.NOSPC, .write)); + @panic("TODO shell: forgot this"); } const len = buf.len; @@ -4202,9 +4355,9 @@ pub const Interpreter = struct { @memcpy(slice, buf[0..write_len]); io.arraybuf.i +|= @truncate(write_len); log("{s} write to arraybuf {d}\n", .{ this.kind.asString(), write_len }); - return Maybe(usize).initResult(write_len); + return write_len; }, - .blob, .ignore => return Maybe(usize).initResult(buf.len), + .blob, .ignore => return buf.len, } } @@ -4219,9 +4372,9 @@ pub const Interpreter = struct { }; } - pub fn ioAllClosed(this: *Builtin) bool { - return this.stdin.isClosed() and this.stdout.isClosed() and this.stderr.isClosed(); - } + // pub fn ioAllClosed(this: *Builtin) bool { + // return this.stdin.isClosed() and this.stdout.isClosed() and this.stderr.isClosed(); + // } pub fn fmtErrorArena(this: *Builtin, comptime kind: ?Kind, comptime fmt_: []const u8, args: anytype) []u8 { const cmd_str = comptime if (kind) |k| k.asString() ++ ": " else ""; @@ -4231,14 +4384,7 @@ pub const Interpreter = struct { pub const Export = struct { bltn: *Builtin, - print_state: ?struct { - bufwriter: BufferedWriter, - err: ?Syscall.Error = null, - - pub fn isDone(this: *@This()) bool { - return this.err != null or this.bufwriter.written >= this.bufwriter.buffer.len; - } - } = null, + printing: bool = false, const Entry = struct { key: EnvStr, @@ -4249,41 +4395,30 @@ pub const Interpreter = struct { } }; - pub fn writeOutput(this: *Export, comptime io_kind: @Type(.EnumLiteral), buf: []const u8) Maybe(void) { + pub fn writeOutput(this: *Export, comptime io_kind: @Type(.EnumLiteral), comptime fmt: []const u8, args: anytype) Maybe(void) { if (!this.bltn.stdout.needsIO()) { - switch (this.bltn.writeNoIO(io_kind, buf)) { - .err => |e| { - this.bltn.exit_code = e.errno; - return Maybe(void).initErr(e); - }, - .result => |written| { - if (comptime bun.Environment.allow_assert) std.debug.assert(written == buf.len); - }, - } + const buf = this.bltn.fmtErrorArena(.@"export", fmt, args); + _ = this.bltn.writeNoIO(io_kind, buf); this.bltn.done(0); return Maybe(void).success; } - this.print_state = .{ - .bufwriter = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .buffer = buf, - .fd = if (comptime io_kind == .stdout) this.bltn.stdout.expectFd() else this.bltn.stderr.expectFd(), - .parent = BufferedWriter.ParentPtr{ .ptr = BufferedWriter.ParentPtr.Repr.init(this) }, - .bytelist = this.bltn.stdBufferedBytelist(io_kind), - }, - }; - this.print_state.?.bufwriter.write(); + var output: *BuiltinIO.Output = &@field(this.bltn, @tagName(io_kind)); + this.printing = true; + output.enqueueFmtBltnAndWrite(this, .@"export", fmt, args); return Maybe(void).success; } - pub fn onBufferedWriterDone(this: *Export, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Export, e: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { - std.debug.assert(this.print_state != null); + std.debug.assert(this.printing); } - this.print_state.?.err = e; - const exit_code: ExitCode = if (e != null) e.?.errno else 0; + const exit_code: ExitCode = if (e != null) brk: { + defer e.?.deref(); + break :brk @intFromEnum(e.?.getErrno()); + } else 0; + this.bltn.done(exit_code); } @@ -4322,41 +4457,13 @@ pub const Interpreter = struct { } if (!this.bltn.stdout.needsIO()) { - switch (this.bltn.writeNoIO(.stdout, buf)) { - .err => |e| { - this.bltn.exit_code = e.errno; - return Maybe(void).initErr(e); - }, - .result => |written| { - if (comptime bun.Environment.allow_assert) std.debug.assert(written == buf.len); - }, - } + _ = this.bltn.writeNoIO(.stdout, buf); this.bltn.done(0); return Maybe(void).success; } - if (comptime bun.Environment.allow_assert) {} - - this.print_state = .{ - .bufwriter = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .buffer = buf, - .fd = this.bltn.stdout.expectFd(), - .parent = BufferedWriter.ParentPtr{ .ptr = BufferedWriter.ParentPtr.Repr.init(this) }, - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }, - }; - - this.print_state.?.bufwriter.write(); - - // if (this.print_state.?.isDone()) { - // if (this.print_state.?.bufwriter.err) |e| { - // this.bltn.exit_code = e.errno; - // return Maybe(void).initErr(e); - // } - // this.bltn.exit_code = 0; - // return Maybe(void).success; - // } + this.printing = true; + this.bltn.stdout.enqueueAndWrite(this, buf); return Maybe(void).success; } @@ -4369,7 +4476,7 @@ pub const Interpreter = struct { const eqsign_idx = std.mem.indexOfScalar(u8, arg, '=') orelse { if (!shell.isValidVarName(arg)) { const buf = this.bltn.fmtErrorArena(.@"export", "`{s}`: not a valid identifier", .{arg}); - return this.writeOutput(.stderr, buf); + return this.writeOutput(.stderr, "{s}\n", .{buf}); } this.bltn.parentCmd().base.shell.assignVar(this.bltn.parentCmd().base.interpreter, EnvStr.initSlice(arg), EnvStr.initSlice(""), .exported); continue; @@ -4396,13 +4503,10 @@ pub const Interpreter = struct { /// Should be allocated with the arena from Builtin output: std.ArrayList(u8), - io_write_state: ?BufferedWriter = null, - state: union(enum) { idle, waiting, done, - err: Syscall.Error, } = .idle, pub fn start(this: *Echo) Maybe(void) { @@ -4420,39 +4524,25 @@ pub const Interpreter = struct { this.output.append('\n') catch bun.outOfMemory(); if (!this.bltn.stdout.needsIO()) { - switch (this.bltn.writeNoIO(.stdout, this.output.items[0..])) { - .err => |e| { - this.state.err = e; - return Maybe(void).initErr(e); - }, - .result => {}, - } - + _ = this.bltn.writeNoIO(.stdout, this.output.items[0..]); this.state = .done; this.bltn.done(0); return Maybe(void).success; } - this.io_write_state = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stdout.expectFd(), - .buffer = this.output.items[0..], - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }; this.state = .waiting; - this.io_write_state.?.write(); + this.bltn.stdout.enqueueAndWrite(this, this.output.items[0..]); return Maybe(void).success; } - pub fn onBufferedWriterDone(this: *Echo, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Echo, e: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { - std.debug.assert(this.io_write_state != null and this.state == .waiting); + std.debug.assert(this.state == .waiting); } if (e != null) { - this.state = .{ .err = e.? }; - this.bltn.done(e.?.errno); + defer e.?.deref(); + this.bltn.done(e.?.getErrno()); return; } @@ -4473,47 +4563,30 @@ pub const Interpreter = struct { state: union(enum) { idle, - one_arg: struct { - writer: BufferedWriter, - }, + one_arg, multi_args: struct { args_slice: []const [*:0]const u8, arg_idx: usize, had_not_found: bool = false, state: union(enum) { none, - waiting_write: BufferedWriter, + waiting_write, }, }, done, - err: Syscall.Error, + err: JSC.SystemError, } = .idle, pub fn start(this: *Which) Maybe(void) { const args = this.bltn.argsSlice(); if (args.len == 0) { if (!this.bltn.stdout.needsIO()) { - switch (this.bltn.writeNoIO(.stdout, "\n")) { - .err => |e| { - return Maybe(void).initErr(e); - }, - .result => {}, - } + _ = this.bltn.writeNoIO(.stdout, "\n"); this.bltn.done(1); return Maybe(void).success; } - this.state = .{ - .one_arg = .{ - .writer = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stdout.expectFd(), - .buffer = "\n", - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }, - }, - }; - this.state.one_arg.writer.write(); + this.state = .one_arg; + this.bltn.stdout.enqueueAndWrite(this, "\n"); return Maybe(void).success; } @@ -4526,17 +4599,11 @@ pub const Interpreter = struct { const resolved = which(&path_buf, PATH.slice(), this.bltn.parentCmd().base.shell.cwdZ(), arg) orelse { had_not_found = true; const buf = this.bltn.fmtErrorArena(.which, "{s} not found\n", .{arg}); - switch (this.bltn.writeNoIO(.stdout, buf)) { - .err => |e| return Maybe(void).initErr(e), - .result => {}, - } + _ = this.bltn.writeNoIO(.stdout, buf); continue; }; - switch (this.bltn.writeNoIO(.stdout, resolved)) { - .err => |e| return Maybe(void).initErr(e), - .result => {}, - } + _ = this.bltn.writeNoIO(.stdout, resolved); } this.bltn.done(@intFromBool(had_not_found)); return Maybe(void).success; @@ -4568,33 +4635,28 @@ pub const Interpreter = struct { const PATH = this.bltn.parentCmd().base.shell.export_env.get(EnvStr.initSlice("PATH")) orelse EnvStr.initSlice(""); const resolved = which(&path_buf, PATH.slice(), this.bltn.parentCmd().base.shell.cwdZ(), arg) orelse { - const buf = this.bltn.fmtErrorArena(null, "{s} not found\n", .{arg}); multiargs.had_not_found = true; - multiargs.state = .{ - .waiting_write = BufferedWriter{ - .fd = this.bltn.stdout.expectFd(), - .buffer = buf, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }, - }; - multiargs.state.waiting_write.write(); + if (!this.bltn.stdout.needsIO()) { + const buf = this.bltn.fmtErrorArena(null, "{s} not found\n", .{arg}); + _ = this.bltn.writeNoIO(.stdout, buf); + this.argComplete(); + return; + } + multiargs.state = .waiting_write; + this.bltn.stdout.enqueueFmtBltnAndWrite(this, null, "{s} not found\n", .{arg}); // yield execution return; }; - const buf = this.bltn.fmtErrorArena(null, "{s}\n", .{resolved}); - multiargs.state = .{ - .waiting_write = BufferedWriter{ - .fd = this.bltn.stdout.expectFd(), - .buffer = buf, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }, - }; - multiargs.state.waiting_write.write(); + if (!this.bltn.stdout.needsIO()) { + const buf = this.bltn.fmtErrorArena(null, "{s}\n", .{resolved}); + _ = this.bltn.writeNoIO(.stdout, buf); + this.argComplete(); + return; + } + + multiargs.state = .waiting_write; + this.bltn.stdout.enqueueFmtBltnAndWrite(this, null, "{s}\n", .{resolved}); return; } @@ -4608,7 +4670,7 @@ pub const Interpreter = struct { this.next(); } - pub fn onBufferedWriterDone(this: *Which, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Which, e: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { std.debug.assert(this.state == .one_arg or (this.state == .multi_args and this.state.multi_args.state == .waiting_write)); @@ -4616,7 +4678,7 @@ pub const Interpreter = struct { if (e != null) { this.state = .{ .err = e.? }; - this.bltn.done(e.?.errno); + this.bltn.done(e.?.getErrno()); return; } @@ -4643,33 +4705,20 @@ pub const Interpreter = struct { bltn: *Builtin, state: union(enum) { idle, - waiting_write_stderr: struct { - buffered_writer: BufferedWriter, - }, + waiting_write_stderr, done, err: Syscall.Error, } = .idle, - fn writeStderrNonBlocking(this: *Cd, buf: []u8) void { - this.state = .{ - .waiting_write_stderr = .{ - .buffered_writer = BufferedWriter{ - .fd = this.bltn.stderr.expectFd(), - .buffer = buf, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }, - }; - this.state.waiting_write_stderr.buffered_writer.write(); + fn writeStderrNonBlocking(this: *Cd, comptime fmt: []const u8, args: anytype) void { + this.state = .waiting_write_stderr; + this.bltn.stderr.enqueueFmtBltnAndWrite(this, .cd, fmt, args); } pub fn start(this: *Cd) Maybe(void) { const args = this.bltn.argsSlice(); if (args.len > 1) { - const buf = this.bltn.fmtErrorArena(.cd, "too many arguments", .{}); - this.writeStderrNonBlocking(buf); + this.writeStderrNonBlocking("too many arguments", .{}); // yield execution return Maybe(void).success; } @@ -4708,49 +4757,43 @@ pub const Interpreter = struct { switch (errno) { @as(usize, @intFromEnum(bun.C.E.NOTDIR)) => { - const buf = this.bltn.fmtErrorArena(.cd, "not a directory: {s}", .{new_cwd_}); if (!this.bltn.stderr.needsIO()) { - switch (this.bltn.writeNoIO(.stderr, buf)) { - .err => |e| return Maybe(void).initErr(e), - .result => {}, - } + const buf = this.bltn.fmtErrorArena(.cd, "not a directory: {s}", .{new_cwd_}); + _ = this.bltn.writeNoIO(.stderr, buf); this.state = .done; this.bltn.done(1); // yield execution return Maybe(void).success; } - this.writeStderrNonBlocking(buf); + this.writeStderrNonBlocking("not a directory: {s}", .{new_cwd_}); return Maybe(void).success; }, @as(usize, @intFromEnum(bun.C.E.NOENT)) => { - const buf = this.bltn.fmtErrorArena(.cd, "not a directory: {s}", .{new_cwd_}); if (!this.bltn.stderr.needsIO()) { - switch (this.bltn.writeNoIO(.stderr, buf)) { - .err => |e| return Maybe(void).initErr(e), - .result => {}, - } + const buf = this.bltn.fmtErrorArena(.cd, "not a directory: {s}", .{new_cwd_}); + _ = this.bltn.writeNoIO(.stderr, buf); this.state = .done; this.bltn.done(1); // yield execution return Maybe(void).success; } - this.writeStderrNonBlocking(buf); + this.writeStderrNonBlocking("not a directory: {s}", .{new_cwd_}); return Maybe(void).success; }, else => return Maybe(void).success, } } - pub fn onBufferedWriterDone(this: *Cd, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Cd, e: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { std.debug.assert(this.state == .waiting_write_stderr); } if (e != null) { - this.state = .{ .err = e.? }; - this.bltn.done(e.?.errno); + defer e.?.deref(); + this.bltn.done(e.?.getErrno()); return; } @@ -4770,9 +4813,8 @@ pub const Interpreter = struct { idle, waiting_io: struct { kind: enum { stdout, stderr }, - writer: BufferedWriter, }, - err: Syscall.Error, + err, done, } = .idle, @@ -4781,52 +4823,26 @@ pub const Interpreter = struct { if (args.len > 0) { const msg = "pwd: too many arguments"; if (this.bltn.stderr.needsIO()) { - this.state = .{ - .waiting_io = .{ - .kind = .stderr, - .writer = BufferedWriter{ - .fd = this.bltn.stderr.expectFd(), - .buffer = msg, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }, - }; - this.state.waiting_io.writer.write(); + this.state = .{ .waiting_io = .{ .kind = .stderr } }; + this.bltn.stderr.enqueueAndWrite(this, msg); return Maybe(void).success; } - if (this.bltn.writeNoIO(.stderr, msg).asErr()) |e| { - return .{ .err = e }; - } + _ = this.bltn.writeNoIO(.stderr, msg); this.bltn.done(1); return Maybe(void).success; } const cwd_str = this.bltn.parentCmd().base.shell.cwd(); - const buf = this.bltn.fmtErrorArena(null, "{s}\n", .{cwd_str}); if (this.bltn.stdout.needsIO()) { - this.state = .{ - .waiting_io = .{ - .kind = .stdout, - .writer = BufferedWriter{ - .fd = this.bltn.stdout.expectFd(), - .buffer = buf, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }, - }, - }; - this.state.waiting_io.writer.write(); + this.state = .{ .waiting_io = .{ .kind = .stdout } }; + this.bltn.stdout.enqueueFmtBltnAndWrite(this, null, "{s}\n", .{cwd_str}); return Maybe(void).success; } + const buf = this.bltn.fmtErrorArena(null, "{s}\n", .{cwd_str}); - if (this.bltn.writeNoIO(.stdout, buf).asErr()) |err| { - return .{ .err = err }; - } + _ = this.bltn.writeNoIO(.stdout, buf); this.state = .done; this.bltn.done(0); @@ -4847,18 +4863,19 @@ pub const Interpreter = struct { } if (this.state == .err) { - this.bltn.done(this.state.err.errno); + this.bltn.done(1); return; } } - pub fn onBufferedWriterDone(this: *Pwd, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Pwd, e: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { std.debug.assert(this.state == .waiting_io); } if (e != null) { - this.state = .{ .err = e.? }; + defer e.?.deref(); + this.state = .err; this.next(); return; } @@ -4883,22 +4900,13 @@ pub const Interpreter = struct { err: ?Syscall.Error = null, task_count: std.atomic.Value(usize), tasks_done: usize = 0, - output_queue: std.DoublyLinkedList(BlockingOutput) = .{}, - started_output_queue: bool = false, + output_waiting: usize = 0, + output_done: usize = 0, }, - waiting_write_err: BufferedWriter, + waiting_write_err, done, } = .idle, - const BlockingOutput = struct { - writer: BufferedWriter, - arr: std.ArrayList(u8), - - pub fn deinit(this: *BlockingOutput) void { - this.arr.deinit(); - } - }; - pub fn start(this: *Ls) Maybe(void) { this.next(); return Maybe(void).success; @@ -4906,22 +4914,11 @@ pub const Interpreter = struct { pub fn writeFailingError(this: *Ls, buf: []const u8, exit_code: ExitCode) Maybe(void) { if (this.bltn.stderr.needsIO()) { - this.state = .{ - .waiting_write_err = BufferedWriter{ - .fd = this.bltn.stderr.expectFd(), - .buffer = buf, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }; - this.state.waiting_write_err.write(); + this.bltn.stderr.enqueueAndWrite(this, buf); return Maybe(void).success; } - if (this.bltn.writeNoIO(.stderr, buf).asErr()) |e| { - return .{ .err = e }; - } + _ = this.bltn.writeNoIO(.stderr, buf); this.bltn.done(exit_code); return Maybe(void).success; @@ -4967,7 +4964,14 @@ pub const Interpreter = struct { }, .exec => { // It's done - if (this.state.exec.tasks_done >= this.state.exec.task_count.load(.Monotonic) and this.state.exec.output_queue.len == 0) { + log("Ls(0x{x}, state=exec) Check: tasks_done={d} task_count={d} output_done={d} output_waiting={d}", .{ + @intFromPtr(this), + this.state.exec.tasks_done, + this.state.exec.task_count.load(.Monotonic), + this.state.exec.output_done, + this.state.exec.output_waiting, + }); + if (this.state.exec.tasks_done >= this.state.exec.task_count.load(.Monotonic) and this.state.exec.output_done >= this.state.exec.output_waiting) { const exit_code: ExitCode = if (this.state.exec.err != null) 1 else 0; this.state = .done; this.bltn.done(exit_code); @@ -4990,125 +4994,96 @@ pub const Interpreter = struct { _ = this; // autofix } - pub fn queueBlockingOutput(this: *Ls, bo: BlockingOutput) void { - _ = this.queueBlockingOutputImpl(bo, true); - } - - pub fn queueBlockingOutputImpl(this: *Ls, bo: BlockingOutput, do_run: bool) CoroutineResult { - const node = bun.default_allocator.create(std.DoublyLinkedList(BlockingOutput).Node) catch bun.outOfMemory(); - node.* = .{ - .data = bo, - }; - this.state.exec.output_queue.append(node); - - // Start it - if (this.state.exec.output_queue.len == 1 and do_run) { - // if (do_run and !this.state.exec.started_output_queue) { - this.state.exec.started_output_queue = true; - this.state.exec.output_queue.first.?.data.writer.write(); - return .yield; - } - return .cont; - } - - fn scheduleBlockingOutput(this: *Ls) CoroutineResult { - if (this.state.exec.output_queue.len > 0) { - this.state.exec.output_queue.first.?.data.writer.write(); - return .yield; - } - return .cont; - } - - pub fn onBufferedWriterDone(this: *Ls, e: ?Syscall.Error) void { - _ = e; // autofix - + pub fn onIOWriterDone(this: *Ls, e: ?JSC.SystemError) void { + if (e) |err| err.deref(); if (this.state == .waiting_write_err) { // if (e) |err| return this.bltn.done(1); return this.bltn.done(1); } - - var queue = &this.state.exec.output_queue; - var first = queue.popFirst().?; - defer { - first.data.deinit(); - bun.default_allocator.destroy(first); - } - if (first.next) |next_writer| { - next_writer.data.writer.write(); - return; - } - + this.state.exec.output_done += 1; this.next(); } - pub fn onAsyncTaskDone(this: *Ls, task_: *ShellLsTask) void { + pub fn onShellLsTaskDone(this: *Ls, task: *ShellLsTask) void { this.state.exec.tasks_done += 1; - const output = task_.takeOutput(); - const err = task_.err; - task_.deinit(); + const output = task.takeOutput(); + const err_ = task.err; - // const need_to_write_to_stdout_with_io = output.items.len > 0 and this.bltn.stdout.needsIO(); - var queued: bool = false; - - // Check for error, print it, but still want to print task output - if (err) |e| { - const error_string = this.bltn.taskErrorToString(.ls, e); - this.state.exec.err = e; + const reused: *ShellLsOutputTask = bun.new(ShellLsOutputTask, .{ + .ls = this, + .output = output, + .state = .waiting_write_err, + }); + if (err_) |err| { + const error_string = this.bltn.taskErrorToString(.ls, err); + this.state.exec.err = err; if (this.bltn.stderr.needsIO()) { - queued = true; - const blocking_output: BlockingOutput = .{ - .writer = BufferedWriter{ - .fd = this.bltn.stderr.expectFd(), - .buffer = error_string, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - .arr = std.ArrayList(u8).init(bun.default_allocator), - }; - _ = this.queueBlockingOutputImpl(blocking_output, false); - // if (!need_to_write_to_stdout_with_io) return; // yield execution - } else { - if (this.bltn.writeNoIO(.stderr, error_string).asErr()) |theerr| { - this.bltn.throw(&bun.shell.ShellErr.newSys(theerr)); - } + this.state.exec.output_waiting += 1; + this.bltn.stderr.enqueueAndWrite(reused, error_string); + return; } + _ = this.bltn.writeNoIO(.stderr, error_string); } if (this.bltn.stdout.needsIO()) { - queued = true; - const blocking_output: BlockingOutput = .{ - .writer = BufferedWriter{ - .fd = this.bltn.stdout.expectFd(), - .buffer = output.items[0..], - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stdout), - }, - .arr = output, - }; - _ = this.queueBlockingOutputImpl(blocking_output, false); - // if (this.state == .done) return; - // return this.next(); - } - - if (queued) { - if (this.scheduleBlockingOutput() == .yield) return; - if (this.state == .done) return; - return this.next(); - } - - defer output.deinit(); - - if (this.bltn.writeNoIO(.stdout, output.items[0..]).asErr()) |e| { - this.bltn.throw(&bun.shell.ShellErr.newSys(e)); + this.state.exec.output_waiting += 1; + reused.state = .waiting_write_out; + this.bltn.stdout.enqueueAndWrite(reused, reused.output.items[0..]); return; } + _ = this.bltn.writeNoIO(.stdout, reused.output.items[0..]); - return this.next(); + reused.state = .done; + reused.deinit(); } + pub const ShellLsOutputTask = struct { + ls: *Ls, + output: std.ArrayList(u8), + state: union(enum) { + waiting_write_err, + waiting_write_out, + done, + }, + + pub fn deinit(this: *ShellLsOutputTask) void { + log("ReusedShellLsTask(0x{x}).deinit()", .{@intFromPtr(this)}); + if (comptime bun.Environment.allow_assert) std.debug.assert(this.state == .done); + this.ls.next(); + this.output.deinit(); + bun.destroy(this); + } + + pub fn onIOWriterDone(this: *ShellLsOutputTask, err: ?JSC.SystemError) void { + log("ShellLsOutputTask(0x{x}) onIOWriterDone", .{@intFromPtr(this)}); + if (err) |e| { + e.deref(); + } + + switch (this.state) { + .waiting_write_err => { + this.ls.state.exec.output_done += 1; + if (this.ls.bltn.stdout.needsIO()) { + this.ls.state.exec.output_waiting += 1; + this.state = .waiting_write_out; + this.ls.bltn.stdout.enqueueAndWrite(this, this.output.items[0..]); + return; + } + _ = this.ls.bltn.writeNoIO(.stdout, this.output.items[0..]); + this.state = .done; + this.deinit(); + }, + .waiting_write_out => { + this.ls.state.exec.output_done += 1; + this.state = .done; + this.deinit(); + }, + .done => @panic("Invalid state"), + } + } + }; + pub const ShellLsTask = struct { const print = bun.Output.scoped(.ShellLsTask, false); ls: *Ls, @@ -5289,18 +5264,18 @@ pub const Interpreter = struct { pub fn runFromMainThread(this: *@This()) void { print("runFromMainThread", .{}); - this.ls.onAsyncTaskDone(this); + this.ls.onShellLsTaskDone(this); } pub fn runFromMainThreadMini(this: *@This(), _: *void) void { this.runFromMainThread(); } - pub fn deinit(this: *@This()) void { - print("deinit", .{}); + pub fn deinit(this: *@This(), comptime free_this: bool) void { + print("deinit {s}", .{if (free_this) "free_this=true" else "free_this=false"}); bun.default_allocator.free(this.path); this.output.deinit(); - bun.default_allocator.destroy(this); + if (comptime free_this) bun.default_allocator.destroy(this); } }; @@ -5744,10 +5719,9 @@ pub const Interpreter = struct { }, done, waiting_write_err: struct { - writer: BufferedWriter, exit_code: ExitCode, }, - err: Syscall.Error, + err, } = .idle, pub const ShellMvCheckTargetTask = struct { @@ -5897,25 +5871,12 @@ pub const Interpreter = struct { pub fn writeFailingError(this: *Mv, buf: []const u8, exit_code: ExitCode) Maybe(void) { if (this.bltn.stderr.needsIO()) { - this.state = .{ - .waiting_write_err = .{ - .writer = BufferedWriter{ - .fd = this.bltn.stderr.expectFd(), - .buffer = buf, - .event_loop = this.bltn.eventLoop(), - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - .exit_code = exit_code, - }, - }; - this.state.waiting_write_err.writer.write(); + this.state = .{ .waiting_write_err = .{ .exit_code = exit_code } }; + this.bltn.stderr.enqueueAndWrite(this, buf); return Maybe(void).success; } - if (this.bltn.writeNoIO(.stderr, buf).asErr()) |e| { - return .{ .err = e }; - } + _ = this.bltn.writeNoIO(.stderr, buf); this.bltn.done(exit_code); return Maybe(void).success; @@ -6076,15 +6037,16 @@ pub const Interpreter = struct { return Maybe(void).success; } - this.bltn.done(this.state.err.errno); + this.bltn.done(1); return Maybe(void).success; } - pub fn onBufferedWriterDone(this: *Mv, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Mv, e: ?JSC.SystemError) void { + defer if (e) |err| err.deref(); switch (this.state) { .waiting_write_err => { if (e != null) { - this.state.err = e.?; + this.state = .err; _ = this.next(); return; } @@ -6261,7 +6223,7 @@ pub const Interpreter = struct { idx: u32 = 0, state: union(enum) { normal, - wait_write_err: BufferedWriter, + wait_write_err, } = .normal, }, exec: struct { @@ -6271,7 +6233,6 @@ pub const Interpreter = struct { err: ?Syscall.Error = null, lock: std.Thread.Mutex = std.Thread.Mutex{}, error_signal: std.atomic.Value(bool) = .{ .raw = false }, - output_queue: std.DoublyLinkedList(BlockingOutput) = .{}, output_done: std.atomic.Value(usize) = .{ .raw = 0 }, output_count: std.atomic.Value(usize) = .{ .raw = 0 }, state: union(enum) { @@ -6303,7 +6264,7 @@ pub const Interpreter = struct { } }, done: struct { exit_code: ExitCode }, - err: Syscall.Error, + err: ExitCode, } = .idle, pub const Opts = struct { @@ -6384,23 +6345,13 @@ pub const Interpreter = struct { if (parse_opts.idx >= parse_opts.args_slice.len) { const error_string = Builtin.Kind.usageString(.rm); if (this.bltn.stderr.needsIO()) { - parse_opts.state = .{ - .wait_write_err = BufferedWriter{ - .fd = this.bltn.stderr.expectFd(), - .event_loop = this.bltn.eventLoop(), - .buffer = error_string, - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }; - parse_opts.state.wait_write_err.write(); + parse_opts.state = .wait_write_err; + this.bltn.stderr.enqueueAndWrite(this, error_string); return Maybe(void).success; } - switch (this.bltn.writeNoIO(.stderr, error_string)) { - .result => {}, - .err => |e| return Maybe(void).initErr(e), - } + _ = this.bltn.writeNoIO(.stderr, error_string); + this.bltn.done(1); return Maybe(void).success; } @@ -6423,21 +6374,12 @@ pub const Interpreter = struct { if (this.opts.prompt_behaviour != .never) { const buf = "rm: \"-i\" is not supported yet"; if (this.bltn.stderr.needsIO()) { - parse_opts.state = .{ - .wait_write_err = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stderr.expectFd(), - .buffer = buf, - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }; - parse_opts.state.wait_write_err.write(); + parse_opts.state = .wait_write_err; + this.bltn.stderr.enqueueAndWrite(this, buf); continue; } - if (this.bltn.writeNoIO(.stderr, buf).asErr()) |e| - return Maybe(void).initErr(e); + _ = this.bltn.writeNoIO(.stderr, buf); this.bltn.done(1); return Maybe(void).success; @@ -6467,25 +6409,16 @@ pub const Interpreter = struct { }; if (is_root) { - const error_string = this.bltn.fmtErrorArena(.rm, "\"{s}\" may not be removed\n", .{resolved_path}); if (this.bltn.stderr.needsIO()) { - parse_opts.state = .{ - .wait_write_err = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stderr.expectFd(), - .buffer = error_string, - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }; - parse_opts.state.wait_write_err.write(); + parse_opts.state = .wait_write_err; + this.bltn.stderr.enqueueFmtBltnAndWrite(this, .rm, "\"{s}\" may not be removed\n", .{resolved_path}); return Maybe(void).success; } - switch (this.bltn.writeNoIO(.stderr, error_string)) { - .result => {}, - .err => |e| return Maybe(void).initErr(e), - } + const error_string = this.bltn.fmtErrorArena(.rm, "\"{s}\" may not be removed\n", .{resolved_path}); + + _ = this.bltn.writeNoIO(.stderr, error_string); + this.bltn.done(1); return Maybe(void).success; } @@ -6509,67 +6442,48 @@ pub const Interpreter = struct { .illegal_option => { const error_string = "rm: illegal option -- -\n"; if (this.bltn.stderr.needsIO()) { - parse_opts.state = .{ - .wait_write_err = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stderr.expectFd(), - .buffer = error_string, - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }; - parse_opts.state.wait_write_err.write(); + parse_opts.state = .wait_write_err; + this.bltn.stderr.enqueueAndWrite(this, error_string); return Maybe(void).success; } - switch (this.bltn.writeNoIO(.stderr, error_string)) { - .result => {}, - .err => |e| return Maybe(void).initErr(e), - } + _ = this.bltn.writeNoIO(.stderr, error_string); + this.bltn.done(1); return Maybe(void).success; }, .illegal_option_with_flag => { const flag = arg; - const error_string = this.bltn.fmtErrorArena(.rm, "illegal option -- {s}\n", .{flag[1..]}); if (this.bltn.stderr.needsIO()) { - parse_opts.state = .{ - .wait_write_err = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stderr.expectFd(), - .buffer = error_string, - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - }; - parse_opts.state.wait_write_err.write(); + parse_opts.state = .wait_write_err; + this.bltn.stderr.enqueueFmtBltnAndWrite(this, .rm, "illegal option -- {s}\n", .{flag[1..]}); return Maybe(void).success; } + const error_string = this.bltn.fmtErrorArena(.rm, "illegal option -- {s}\n", .{flag[1..]}); + + _ = this.bltn.writeNoIO(.stderr, error_string); - switch (this.bltn.writeNoIO(.stderr, error_string)) { - .result => {}, - .err => |e| return Maybe(void).initErr(e), - } this.bltn.done(1); return Maybe(void).success; }, } }, .wait_write_err => { - // Errored - if (parse_opts.state.wait_write_err.err) |e| { - this.state = .{ .err = e }; - continue; - } + @panic("Invalid"); + // // Errored + // if (parse_opts.state.wait_write_err.err) |e| { + // this.state = .{ .err = e }; + // continue; + // } - // Done writing - if (this.state.parse_opts.state.wait_write_err.remain() == 0) { - this.state = .{ .done = .{ .exit_code = 0 } }; - continue; - } + // // Done writing + // if (this.state.parse_opts.state.wait_write_err.remain() == 0) { + // this.state = .{ .done = .{ .exit_code = 0 } }; + // continue; + // } - // yield execution to continue writing - return Maybe(void).success; + // // yield execution to continue writing + // return Maybe(void).success; }, } }, @@ -6601,43 +6515,34 @@ pub const Interpreter = struct { } if (this.state == .err) { - this.bltn.done(this.state.err.errno); + this.bltn.done(this.state.err); return Maybe(void).success; } return Maybe(void).success; } - pub fn onBufferedWriterDone(this: *Rm, e: ?Syscall.Error) void { + pub fn onIOWriterDone(this: *Rm, e: ?JSC.SystemError) void { if (comptime bun.Environment.allow_assert) { std.debug.assert((this.state == .parse_opts and this.state.parse_opts.state == .wait_write_err) or - (this.state == .exec and this.state.exec.state == .waiting and this.state.exec.output_queue.len > 0)); + (this.state == .exec and this.state.exec.state == .waiting and this.state.exec.output_count.load(.SeqCst) > 0)); } if (this.state == .exec and this.state.exec.state == .waiting) { log("[rm] output done={d} output count={d}", .{ this.state.exec.getOutputCount(.output_done), this.state.exec.getOutputCount(.output_count) }); this.state.exec.incrementOutputCount(.output_done); - // _ = this.state.exec.output_done.fetchAdd(1, .Monotonic); - var queue = &this.state.exec.output_queue; - var first = queue.popFirst().?; - defer { - first.data.deinit(); - bun.default_allocator.destroy(first); - } - if (first.next) |next_writer| { - next_writer.data.writer.write(); - } else { - if (this.state.exec.state.tasksDone() >= this.state.exec.total_tasks and this.state.exec.getOutputCount(.output_done) >= this.state.exec.getOutputCount(.output_count)) { - this.bltn.done(if (this.state.exec.err != null) 1 else 0); - return; - } + if (this.state.exec.state.tasksDone() >= this.state.exec.total_tasks and this.state.exec.getOutputCount(.output_done) >= this.state.exec.getOutputCount(.output_count)) { + const code: ExitCode = if (this.state.exec.err != null) 1 else 0; + this.bltn.done(code); + return; } return; } if (e != null) { - this.state = .{ .err = e.? }; - this.bltn.done(e.?.errno); + defer e.?.deref(); + this.state = .{ .err = @intFromEnum(e.?.getErrno()) }; + this.bltn.done(e.?.getErrno()); return; } @@ -6645,25 +6550,23 @@ pub const Interpreter = struct { return; } - pub fn writeToStdoutFromAsyncTask(this: *Rm, comptime fmt: []const u8, args: anytype) Maybe(void) { - const buf = this.rm.bltn.fmtErrorArena(null, fmt, args); - if (!this.rm.bltn.stdout.needsIO()) { - this.state.exec.lock.lock(); - defer this.state.exec.lock.unlock(); - return switch (this.rm.bltn.writeNoIO(.stdout, buf)) { - .result => Maybe(void).success, - .err => |e| Maybe(void).initErr(e), - }; - } + // pub fn writeToStdoutFromAsyncTask(this: *Rm, comptime fmt: []const u8, args: anytype) Maybe(void) { + // const buf = this.rm.bltn.fmtErrorArena(null, fmt, args); + // if (!this.rm.bltn.stdout.needsIO()) { + // this.state.exec.lock.lock(); + // defer this.state.exec.lock.unlock(); + // _ = this.rm.bltn.writeNoIO(.stdout, buf); + // return Maybe(void).success; + // } - var written: usize = 0; - while (written < buf.len) : (written += switch (Syscall.write(this.rm.bltn.stdout.fd, buf)) { - .err => |e| return Maybe(void).initErr(e), - .result => |n| n, - }) {} + // var written: usize = 0; + // while (written < buf.len) : (written += switch (Syscall.write(this.rm.bltn.stdout.fd, buf)) { + // .err => |e| return Maybe(void).initErr(e), + // .result => |n| n, + // }) {} - return Maybe(void).success; - } + // return Maybe(void).success; + // } pub fn deinit(this: *Rm) void { _ = this; @@ -6743,7 +6646,7 @@ pub const Interpreter = struct { return .continue_parsing; } - pub fn onAsyncTaskDone(this: *Rm, task: *ShellRmTask) void { + pub fn onShellRmTaskDone(this: *Rm, task: *ShellRmTask) void { var exec = &this.state.exec; const tasks_done = switch (exec.state) { .idle => @panic("Invalid state"), @@ -6754,30 +6657,18 @@ pub const Interpreter = struct { exec.err = err; const error_string = this.bltn.taskErrorToString(.rm, err); if (!this.bltn.stderr.needsIO()) { - if (this.bltn.writeNoIO(.stderr, error_string).asErr()) |e| { - this.bltn.throw(&bun.shell.ShellErr.newSys(e)); - return; - } + _ = this.bltn.writeNoIO(.stderr, error_string); } else { - const bo = BlockingOutput{ - .writer = BufferedWriter{ - .event_loop = this.bltn.eventLoop(), - .fd = this.bltn.stderr.expectFd(), - .buffer = error_string, - .parent = BufferedWriter.ParentPtr.init(this), - .bytelist = this.bltn.stdBufferedBytelist(.stderr), - }, - .arr = std.ArrayList(u8).init(bun.default_allocator), - }; exec.incrementOutputCount(.output_count); - // _ = exec.output_count.fetchAdd(1, .Monotonic); - return this.queueBlockingOutput(bo); + this.bltn.stderr.enqueueAndWrite(this, error_string); + return; } } break :brk amt; }, }; + log("ShellRmTask(0x{x}, task.)", .{task.root_path}); // Wait until all tasks done and all output is written if (tasks_done >= this.state.exec.total_tasks and exec.getOutputCount(.output_done) >= exec.getOutputCount(.output_count)) @@ -6790,44 +6681,20 @@ pub const Interpreter = struct { fn writeVerbose(this: *Rm, verbose: *ShellRmTask.DirTask) void { if (!this.bltn.stdout.needsIO()) { - if (this.bltn.writeNoIO(.stdout, verbose.deleted_entries.items[0..]).asErr()) |err| { - this.bltn.parentCmd().base.throw(&bun.shell.ShellErr.newSys(err)); - return; - } + _ = this.bltn.writeNoIO(.stdout, verbose.deleted_entries.items[0..]); // _ = this.state.exec.output_done.fetchAdd(1, .SeqCst); _ = this.state.exec.incrementOutputCount(.output_done); if (this.state.exec.state.tasksDone() >= this.state.exec.total_tasks and this.state.exec.getOutputCount(.output_done) >= this.state.exec.getOutputCount(.output_count)) { - this.bltn.done(if (this.state.exec.err != null) 1 else 0); + this.bltn.done(if (this.state.exec.err != null) @as(ExitCode, 1) else @as(ExitCode, 0)); return; } return; } - this.queueBlockingOutput(verbose.toBlockingOutput()); + const buf = verbose.takeDeletedEntries(); + defer buf.deinit(); + this.bltn.stdout.enqueueAndWrite(this, buf.items[0..]); } - fn queueBlockingOutput(this: *Rm, bo: BlockingOutput) void { - const node = bun.default_allocator.create(std.DoublyLinkedList(BlockingOutput).Node) catch bun.outOfMemory(); - node.* = .{ - .data = bo, - }; - - this.state.exec.output_queue.append(node); - - // Need to start it - if (this.state.exec.output_queue.len == 1) { - this.state.exec.output_queue.first.?.data.writer.write(); - } - } - - const BlockingOutput = struct { - writer: BufferedWriter, - arr: std.ArrayList(u8), - - pub fn deinit(this: *BlockingOutput) void { - this.arr.deinit(); - } - }; - pub const ShellRmTask = struct { const print = bun.Output.scoped(.AsyncRmTask, false); @@ -6869,21 +6736,6 @@ pub const Interpreter = struct { const EntryKindHint = enum { idk, dir, file }; - pub fn toBlockingOutput(this: *DirTask) BlockingOutput { - const arr = this.takeDeletedEntries(); - const bo = BlockingOutput{ - .arr = arr, - .writer = BufferedWriter{ - .event_loop = this.task_manager.event_loop, - .fd = bun.STDOUT_FD, - .buffer = arr.items[0..], - .parent = BufferedWriter.ParentPtr.init(this.task_manager.rm), - .bytelist = this.task_manager.rm.bltn.stdBufferedBytelist(.stdout), - }, - }; - return bo; - } - pub fn takeDeletedEntries(this: *DirTask) std.ArrayList(u8) { const ret = this.deleted_entries; this.deleted_entries = std.ArrayList(u8).init(ret.allocator); @@ -6891,7 +6743,7 @@ pub const Interpreter = struct { } pub fn runFromMainThread(this: *DirTask) void { - print("runFromMainThread", .{}); + print("DirTask(0x{x}, path={s}) runFromMainThread", .{ @intFromPtr(this), this.path }); this.task_manager.rm.writeVerbose(this); } @@ -7069,7 +6921,6 @@ pub const Interpreter = struct { .concurrent_task = JSC.EventLoopTask.fromEventLoop(this.event_loop), }; std.debug.assert(parent_task.subtask_count.fetchAdd(1, .Monotonic) > 0); - print("enqueue: {s}", .{path}); JSC.WorkPool.schedule(&subtask.task); } @@ -7401,11 +7252,11 @@ pub const Interpreter = struct { } pub fn runFromMainThread(this: *ShellRmTask) void { - this.rm.onAsyncTaskDone(this); + this.rm.onShellRmTaskDone(this); } pub fn runFromMainThreadMini(this: *ShellRmTask, _: *void) void { - this.rm.onAsyncTaskDone(this); + this.rm.onShellRmTaskDone(this); } pub fn deinit(this: *ShellRmTask) void { @@ -7415,63 +7266,34 @@ pub const Interpreter = struct { }; }; - /// This is modified version of BufferedInput for file descriptors only. - /// - /// This struct cleans itself up when it is done, so no need to call `.deinit()` on - /// it. IT DOES NOT CLOSE FILE DESCRIPTORS - pub const BufferedWriter = - struct { - writer: Writer = if (bun.Environment.isWindows) .{} else .{ + pub const IOWriter = struct { + writer: WriterImpl = if (bun.Environment.isWindows) .{} else .{ .close_fd = false, }, - fd: bun.FileDescriptor = bun.invalid_fd, - buffer: []const u8 = "", - written: usize = 0, - parent: ParentPtr, - err: ?Syscall.Error = null, - /// optional bytelist for capturing the data - bytelist: ?*bun.ByteList = null, - event_loop: JSC.EventLoopHandle, + fd: bun.FileDescriptor, + writers: Writers = .{ .inlined = .{} }, + buf: std.ArrayListUnmanaged(u8) = .{}, + idx: usize = 0, + total_bytes_written: usize = 0, + ref_count: u32 = 1, + err: ?JSC.SystemError = null, + evtloop: JSC.EventLoopHandle, - const print = bun.Output.scoped(.BufferedWriter, false); + pub const DEBUG_REFCOUNT_NAME: []const u8 = "IOWriterRefCount"; + + const ChildPtr = IOWriterChildPtr; + // const ChildPtr = anyopaque{}; + + /// ~128kb + /// We shrunk the `buf` when we reach the last writer, + /// but if this never happens, we shrink `buf` when it exceeds this threshold + const SHRINK_THRESHOLD = 1024 * 128; pub const auto_poll = false; - pub fn write(this: *@This()) void { - if (comptime bun.Environment.isPosix) { - this.writer.parent = this; - // if (bun.Environment.allow_assert) std.debug.assert(@intFromPtr(this) == @intFromPtr(this.writer.parent)); - // if (this.writer.start(this.fd, true).asErr()) |_| { - // @panic("TODO handle file poll register faill"); - // } - switch (this.writer.start(this.fd, true)) { - .err => { - @panic("TODO handle file poll register faill"); - }, - .result => { - if (comptime bun.Environment.isPosix) { - // if (this.nonblocking) { - this.writer.getPoll().?.flags.insert(.nonblocking); - // } - - // TODO be able to configure this - // if (this.is_socket) { - // this.writer.getPoll().?.flags.insert(.socket); - // } else if (this.pollable) { - this.writer.getPoll().?.flags.insert(.fifo); - // } - } - }, - } - - return; - } - @panic("TODO SHELL WINDOWS!"); - } - + usingnamespace bun.NewRefCounted(@This(), This.deinit); const This = @This(); - pub const Poll = Writer; - pub const Writer = bun.io.BufferedWriter( + pub const WriterImpl = bun.io.BufferedWriter( This, onWrite, onError, @@ -7479,104 +7301,278 @@ pub const Interpreter = struct { getBuffer, null, ); + pub const Poll = WriterImpl; - pub const Status = union(enum) { - pending: void, - done: void, - err: bun.sys.Error, + pub fn __onClose(_: *This) void {} + pub fn __flush(_: *This) void {} + + pub fn refSelf(this: *This) *This { + this.ref(); + return this; + } + + pub fn init(fd: bun.FileDescriptor, evtloop: JSC.EventLoopHandle) *This { + const this = IOWriter.new(.{ + .fd = fd, + .evtloop = evtloop, + }); + + this.writer.parent = this; + this.writer.handle = .{ + .poll = this.writer.createPoll(fd), + }; + + return this; + } + + pub fn eventLoop(this: *This) JSC.EventLoopHandle { + return this.evtloop; + } + + /// Idempotent write call + pub fn write(this: *This) void { + if (bun.Environment.allow_assert) { + if (this.writer.handle != .poll) @panic("Should be poll."); + } + if (!this.writer.handle.poll.isRegistered()) { + this.writer.write(); + } + } + + const Writer = struct { + ptr: ChildPtr, + len: usize, + written: usize = 0, + bytelist: ?*bun.ByteList = null, }; - pub fn remain(this: *BufferedWriter) usize { - return this.buffer.len -| this.written; - } + pub const Writers = union(enum) { + inlined: Inlined, + heap: std.ArrayListUnmanaged(Writer), - pub fn eventLoop(this: *BufferedWriter) JSC.EventLoopHandle { - return this.event_loop; - } + const INLINED_MAX = 2; - pub fn getBuffer(this: *BufferedWriter) []const u8 { - if (this.written >= this.buffer.len) return ""; - return this.buffer[this.written..]; - } + pub const Inlined = struct { + writers: [INLINED_MAX]Writer = undefined, + len: u32 = 0, - pub fn onWrite(this: *BufferedWriter, amount: usize, done: bool) void { - if (this.bytelist) |bytelist| { - bytelist.append(bun.default_allocator, this.buffer[this.written .. this.written + amount]) catch bun.outOfMemory(); - } - this.written += amount; - log("BufferedWriter(0x{x}).onWrite({d}, {any}, total={d}, buffer={d})", .{ @intFromPtr(this), amount, done, this.written, this.buffer.len }); - if (done) return; - if (this.written >= this.buffer.len) return this.writer.end(); - if (comptime bun.Environment.isWindows) { - this.writer.write(); - } else this.writer.registerPoll(); - } - - pub fn onError(this: *BufferedWriter, err: bun.sys.Error) void { - this.err = err; - } - - pub fn onReady(this: *BufferedWriter) void { - _ = this; // autofix - } - - pub fn onClose(this: *BufferedWriter) void { - this.parent.onDone(this.err); - } - - pub const ParentPtr = struct { - const Types = .{ - Builtin.Export, - Builtin.Echo, - Builtin.Cd, - Builtin.Which, - Builtin.Rm, - Builtin.Pwd, - Builtin.Mv, - Builtin.Ls, - Cmd, - Pipeline, - }; - ptr: Repr, - pub const Repr = TaggedPointerUnion(Types); - - pub fn underlying(this: ParentPtr) type { - inline for (Types) |Ty| { - if (this.ptr.is(Ty)) return Ty; + pub fn promote(this: *Inlined, n: usize) std.ArrayListUnmanaged(Writer) { + var list = std.ArrayListUnmanaged(Writer).initCapacity(bun.default_allocator, n) catch bun.outOfMemory(); + list.appendSlice(bun.default_allocator, this.writers[0..this.len]) catch bun.outOfMemory(); + return list; } - @panic("Uh oh"); + }; + + pub inline fn len(this: *Writers) usize { + return this.inlined.len; } - pub fn init(p: anytype) ParentPtr { - return .{ - .ptr = Repr.init(p), + pub fn truncate(this: *Writers, starting_idx: usize) void { + switch (this.*) { + .inlined => { + if (starting_idx >= this.inlined.len) return; + const slice_to_move = this.inlined.writers[starting_idx..this.inlined.len]; + std.mem.copyForwards(Writer, this.inlined.writers[0..starting_idx], slice_to_move); + }, + .heap => { + const new_len = this.heap.items.len - starting_idx; + this.heap.replaceRange(bun.default_allocator, 0, starting_idx, this.heap.items[starting_idx..this.heap.items.len]) catch bun.outOfMemory(); + this.heap.items.len = new_len; + }, + } + } + + pub inline fn slice(this: *Writers) []const Writer { + return switch (this.*) { + .inlined => { + if (this.inlined.len == 0) return &[_]Writer{}; + return this.inlined.writers[0..this.inlined.len]; + }, + .heap => { + if (this.heap.items.len == 0) return &[_]Writer{}; + return this.heap.items[0..]; + }, }; } - pub fn onDone(this: ParentPtr, e: ?Syscall.Error) void { - if (this.ptr.is(Builtin.Export)) return this.ptr.as(Builtin.Export).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Echo)) return this.ptr.as(Builtin.Echo).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Cd)) return this.ptr.as(Builtin.Cd).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Which)) return this.ptr.as(Builtin.Which).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Rm)) return this.ptr.as(Builtin.Rm).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Pwd)) return this.ptr.as(Builtin.Pwd).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Mv)) return this.ptr.as(Builtin.Mv).onBufferedWriterDone(e); - if (this.ptr.is(Builtin.Ls)) return this.ptr.as(Builtin.Ls).onBufferedWriterDone(e); - if (this.ptr.is(Cmd)) return this.ptr.as(Cmd).onBufferedWriterDone(e); - @panic("Invalid ptr tag"); + pub inline fn get(this: *Writers, idx: usize) *Writer { + return switch (this.*) { + .inlined => { + if (bun.Environment.allow_assert) { + if (idx >= this.inlined.len) @panic("Index out of bounds"); + } + return &this.inlined.writers[idx]; + }, + .heap => &this.heap.items[idx], + }; + } + + pub fn append(this: *Writers, writer: Writer) void { + switch (this.*) { + .inlined => { + if (this.inlined.len == INLINED_MAX) { + this.* = .{ .heap = this.inlined.promote(INLINED_MAX) }; + return; + } + this.inlined.writers[this.inlined.len] = writer; + this.inlined.len += 1; + }, + .heap => { + this.heap.append(bun.default_allocator, writer) catch bun.outOfMemory(); + }, + } + } + + pub fn popFirst(this: *@This()) ?ChildPtr { + switch (this.*) { + .inlined => { + if (this.inlined.len == 0) return null; + const child = this.inlined.writers[0]; + if (this.inlined.len == 1) { + return child; + } + std.mem.copyForwards(ChildPtr, this.inlined[0 .. this.inlined.len - 1], this.inlined[1 .. this.inlined.len - 1]); + return child; + }, + .heap => { + if (this.heap.items.len == 0) return null; + const child = this.heap.orderedRemove(0) catch bun.outOfMemory(); + return child; + }, + } + } + + pub fn clearRetainingCapacity(this: *@This()) void { + switch (this.*) { + .inlined => { + this.inlined.len = 0; + }, + .heap => { + this.heap.clearRetainingCapacity(); + }, + } } }; - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedWriter, .writable, onReady); + pub fn onWrite(this: *This, amount: usize, done: bool) void { + log("IOWriter(0x{x}, fd={}) write(amount={d}, done={})", .{ @intFromPtr(this), this.fd, amount, done }); + const child = this.writers.get(this.idx); + if (child.bytelist) |bl| { + const written_slice = this.buf.items[this.total_bytes_written .. this.total_bytes_written + amount]; + bl.append(bun.default_allocator, written_slice) catch bun.outOfMemory(); + } + this.total_bytes_written += amount; + child.written += amount; + if (done) { + const not_fully_written = !this.isLastIdx(this.idx) or child.written < child.len; + if (bun.Environment.allow_assert and not_fully_written) { + bun.Output.debugWarn("IOWriter(0x{x}) received done without fully writing data, check that onError is thrown", .{@intFromPtr(this)}); + } + return; + } - pub fn deref(this: *BufferedWriter) void { - this.pseudoref_count -= 1; - if (this.pseudoref_count == 0) {} + const wrote_everything = this.total_bytes_written >= this.buf.items.len; + + if (child.written >= child.len) { + this.bump(child); + } + + if (!wrote_everything) { + log("IOWriter(0x{x}, fd={}) poll again", .{ @intFromPtr(this), this.fd }); + if (comptime bun.Environment.isWindows) this.writer.write() else this.writer.registerPoll(); + } } - pub fn deinit(this: *BufferedWriter) void { - this.writer.deinit(); - this.parent.onDone(this.err); + pub fn onClose(this: *This) void { + _ = this; + } + + pub fn onError(this: *This, err__: bun.sys.Error) void { + this.err = err__.toSystemError(); + var seen_alloc = std.heap.stackFallback(@sizeOf(usize) * 64, bun.default_allocator); + var seen = std.ArrayList(usize).initCapacity(seen_alloc.get(), 64) catch bun.outOfMemory(); + defer seen.deinit(); + writer_loop: for (this.writers.slice()) |w| { + const ptr = w.ptr.ptr.ptr(); + for (seen.items[0..]) |item| { + if (item == @intFromPtr(ptr)) { + continue :writer_loop; + } + } + + w.ptr.onDone(this.err); + seen.append(@intFromPtr(ptr)) catch bun.outOfMemory(); + } + } + + pub fn getBuffer(this: *This) []const u8 { + const writer = this.writers.get(this.idx); + return this.buf.items[this.total_bytes_written .. this.total_bytes_written + writer.len]; + } + + pub fn bump(this: *This, current_writer: *Writer) void { + const child_ptr = current_writer.ptr; + defer child_ptr.onDone(null); + if (this.isLastIdx(this.idx)) { + this.buf.clearRetainingCapacity(); + this.idx = 0; + this.writers.clearRetainingCapacity(); + this.total_bytes_written = 0; + return; + } + this.idx += 1; + if (this.total_bytes_written >= SHRINK_THRESHOLD) { + const replace_range_len = this.buf.items.len - this.total_bytes_written; + if (replace_range_len == 0) { + this.buf.clearRetainingCapacity(); + } else { + this.buf.replaceRange(bun.default_allocator, 0, replace_range_len, this.buf.items[this.total_bytes_written..replace_range_len]) catch bun.outOfMemory(); + this.buf.items.len = replace_range_len; + } + this.writers.truncate(this.idx); + this.idx = 0; + } + } + + pub fn enqueue(this: *This, ptr: anytype, bytelist: ?*bun.ByteList, buf: []const u8) void { + const writer: Writer = .{ + .ptr = if (@TypeOf(ptr) == ChildPtr) ptr else ChildPtr.init(ptr), + .len = buf.len, + .bytelist = bytelist, + }; + this.buf.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory(); + this.writers.append(writer); + } + + pub fn enqueueFmtBltn(this: *This, ptr: anytype, bytelist: ?*bun.ByteList, comptime kind: ?Interpreter.Builtin.Kind, comptime fmt_: []const u8, args: anytype) void { + const cmd_str = comptime if (kind) |k| k.asString() ++ ": " else ""; + const fmt__ = cmd_str ++ fmt_; + this.enqueueFmt(ptr, bytelist, fmt__, args); + } + + pub fn enqueueFmt(this: *This, ptr: anytype, bytelist: ?*bun.ByteList, comptime fmt: []const u8, args: anytype) void { + var buf_writer = this.buf.writer(bun.default_allocator); + const start = this.buf.items.len; + buf_writer.print(fmt, args) catch bun.outOfMemory(); + const end = this.buf.items.len; + const writer: Writer = .{ + .ptr = if (@TypeOf(ptr) == ChildPtr) ptr else ChildPtr.init(ptr), + .len = end - start, + .bytelist = bytelist, + }; + this.writers.append(writer); + } + + pub fn deinit(this: *This) void { + log("IOWriter(0x{x}) deinit", .{@intFromPtr(this)}); + if (bun.Environment.allow_assert) std.debug.assert(this.ref_count == 0); + this.buf.deinit(bun.default_allocator); + if (this.fd != bun.invalid_fd) _ = bun.sys.close(this.fd); + this.destroy(); + } + + pub fn isLastIdx(this: *This, idx: usize) bool { + return idx == this.writers.len() -| 1; } }; }; @@ -7820,3 +7816,59 @@ fn throwShellErr(e: *const bun.shell.ShellErr, event_loop: JSC.EventLoopHandle) .js => e.throwJS(event_loop.js.global), } } + +pub const IOReader = struct { + fd: bun.FileDescriptor, + pipe_reader: bun.io.PipeReader = .{ .close_handle = false }, + buf: std.ArrayListUnmanaged(u8) = .{}, + read: usize = 0, + ref_count: u32 = 1, + + pub usingnamespace bun.NewRefCounted(@This(), deinit); + + pub const Reader = struct {}; + + pub fn init(fd: bun.FileDescriptor) *IOReader { + const reader = IOReader.new(.{ + .fd = fd, + }); + return reader; + } + + pub fn deinit(this: *@This()) void { + if (this.fd != bun.invalid_fd) { + _ = bun.sys.close(this.fd); + } + this.buf.deinit(bun.default_allocator); + bun.destroy(this); + } +}; + +pub const IOWriterChildPtr = struct { + ptr: ChildPtrRaw, + + pub const ChildPtrRaw = TaggedPointerUnion(.{ + Interpreter.Cmd, + Interpreter.Pipeline, + Interpreter.Builtin.Cd, + Interpreter.Builtin.Echo, + Interpreter.Builtin.Export, + Interpreter.Builtin.Ls, + Interpreter.Builtin.Ls.ShellLsOutputTask, + Interpreter.Builtin.Mv, + Interpreter.Builtin.Pwd, + Interpreter.Builtin.Rm, + Interpreter.Builtin.Which, + }); + + pub fn init(p: anytype) IOWriterChildPtr { + return .{ + .ptr = ChildPtrRaw.init(p), + // .ptr = @ptrCast(p), + }; + } + + pub fn onDone(this: IOWriterChildPtr, err: ?JSC.SystemError) void { + return this.ptr.call("onIOWriterDone", .{err}, void); + } +}; diff --git a/src/shell/shell.zig b/src/shell/shell.zig index 4c9e69926a..f3dbcb8239 100644 --- a/src/shell/shell.zig +++ b/src/shell/shell.zig @@ -26,6 +26,7 @@ pub const EnvMap = interpret.EnvMap; pub const EnvStr = interpret.EnvStr; pub const Interpreter = eval.Interpreter; pub const Subprocess = subproc.ShellSubprocess; +// pub const IOWriter = interpret.IOWriter; // pub const SubprocessMini = subproc.ShellSubprocessMini; const GlobWalker = Glob.GlobWalker_(null, true); @@ -40,9 +41,13 @@ pub const ShellErr = union(enum) { invalid_arguments: struct { val: []const u8 = "" }, todo: []const u8, - pub fn newSys(e: Syscall.Error) @This() { + pub fn newSys(e: anytype) @This() { return .{ - .sys = e.toSystemError(), + .sys = switch (@TypeOf(e)) { + Syscall.Error => e.toSystemError(), + JSC.SystemError => e, + else => @compileError("Invalid `e`: " ++ @typeName(e)), + }, }; } @@ -68,6 +73,7 @@ pub const ShellErr = union(enum) { } pub fn throwJS(this: *const @This(), globalThis: *JSC.JSGlobalObject) void { + defer this.deinit(bun.default_allocator); switch (this.*) { .sys => { const err = this.sys.toErrorInstance(globalThis); @@ -90,6 +96,7 @@ pub const ShellErr = union(enum) { } pub fn throwMini(this: @This()) void { + defer this.deinit(bun.default_allocator); switch (this) { .sys => { const err = this.sys; diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig index e3a1691ada..6521e48919 100644 --- a/src/shell/subproc.zig +++ b/src/shell/subproc.zig @@ -866,7 +866,7 @@ pub const PipeReader = struct { pub fn onReadChunk(ptr: *anyopaque, chunk: []const u8, has_more: bun.io.ReadState) bool { var this: *PipeReader = @ptrCast(@alignCast(ptr)); this.buffered_output.append(chunk); - log("PipeReader(0x{x}, {s}) onReadChunk(...)", .{ @intFromPtr(this), @tagName(this.out_type) }); + log("PipeReader(0x{x}, {s}) onReadChunk(chunk_len={d}, has_more={s})", .{ @intFromPtr(this), @tagName(this.out_type), chunk.len, @tagName(has_more) }); if (!this.captured_writer.dead) { if (this.captured_writer.writer.getPoll() == null) { this.captured_writer.writer.handle = .{ .poll = Async.FilePoll.init(this.eventLoop(), if (this.out_type == .stdout) bun.STDOUT_FD else bun.STDERR_FD, .{}, @TypeOf(this.captured_writer.writer), &this.captured_writer.writer) }; @@ -880,7 +880,14 @@ pub const PipeReader = struct { else => {}, } } - return has_more != .eof; + + const should_continue = has_more != .eof; + + if (should_continue) { + this.reader.registerPoll(); + } + + return should_continue; } pub fn onReaderDone(this: *PipeReader) void { @@ -989,12 +996,17 @@ pub const PipeReader = struct { } pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void { + log("PipeReader(0x{x}) onReaderError {}", .{ @intFromPtr(this), err }); if (this.state == .done) { bun.default_allocator.free(this.state.done); } this.state = .{ .err = err }; - if (this.process) |process| + this.signalDoneToCmd(); + if (this.process) |process| { + this.process = null; process.onCloseIO(this.kind(process)); + this.deref(); + } } pub fn close(this: *PipeReader) void { @@ -1018,7 +1030,7 @@ pub const PipeReader = struct { pub fn deinit(this: *PipeReader) void { log("PipeReader(0x{x}, {s}) deinit()", .{ @intFromPtr(this), @tagName(this.out_type) }); if (comptime Environment.isPosix) { - std.debug.assert(this.reader.isDone()); + std.debug.assert(this.reader.isDone() or this.state == .err); } if (comptime Environment.isWindows) { diff --git a/src/tagged_pointer.zig b/src/tagged_pointer.zig index 3ec0456e57..2d91447f6a 100644 --- a/src/tagged_pointer.zig +++ b/src/tagged_pointer.zig @@ -180,11 +180,16 @@ pub fn TaggedPointerUnion(comptime Types: anytype) type { } pub inline fn init(_ptr: anytype) @This() { + const tyinfo = @typeInfo(@TypeOf(_ptr)); + if (tyinfo != .Pointer) @compileError("Only pass pointers to TaggedPointerUnion.init(), you gave us a: " ++ @typeName(@TypeOf(_ptr))); + const Type = std.meta.Child(@TypeOf(_ptr)); return initWithType(Type, _ptr); } pub inline fn initWithType(comptime Type: type, _ptr: anytype) @This() { + const tyinfo = @typeInfo(@TypeOf(_ptr)); + if (tyinfo != .Pointer) @compileError("Only pass pointers to TaggedPointerUnion.init(), you gave us a: " ++ @typeName(@TypeOf(_ptr))); const name = comptime typeBaseName(@typeName(Type)); // there will be a compiler error if the passed in type doesn't exist in the enum @@ -194,6 +199,27 @@ pub fn TaggedPointerUnion(comptime Types: anytype) type { pub inline fn isNull(this: This) bool { return this.repr._ptr == 0; } + + pub inline fn call(this: This, comptime fn_name: []const u8, args_without_this: anytype, comptime Ret: type) Ret { + inline for (type_map) |entry| { + if (this.repr.data == entry.value) { + const pointer = this.as(entry.ty); + const func = &@field(entry.ty, fn_name); + const args = brk: { + var args: std.meta.ArgsTuple(@TypeOf(@field(entry.ty, fn_name))) = undefined; + args[0] = pointer; + + inline for (args_without_this, 1..) |a, i| { + args[i] = a; + } + + break :brk args; + }; + return @call(.auto, func, args); + } + } + @panic("Invalid tag"); + } }; } diff --git a/test/js/bun/shell/bunshell.test.ts b/test/js/bun/shell/bunshell.test.ts index 50dfa83ad6..f079b90c5b 100644 --- a/test/js/bun/shell/bunshell.test.ts +++ b/test/js/bun/shell/bunshell.test.ts @@ -38,12 +38,12 @@ afterAll(async () => { const BUN = process.argv0; describe("bunshell", () => { - describe.todo("concurrency", () => { + describe("concurrency", () => { test("writing to stdout", async () => { await Promise.all([ TestBuilder.command`echo 1`.stdout("1\n").run(), TestBuilder.command`echo 2`.stdout("2\n").run(), - TestBuilder.command`echo 3`.stdout("2\n").run(), + TestBuilder.command`echo 3`.stdout("3\n").run(), ]); }); }); @@ -116,7 +116,7 @@ describe("bunshell", () => { }); describe("quiet", async () => { - test("basic", async () => { + test.todo("basic", async () => { // Check its buffered { const { stdout, stderr } = await $`BUN_DEBUG_QUIET_LOGS=1 ${BUN} -e "console.log('hi'); console.error('lol')"`;