diff --git a/src/async/posix_event_loop.zig b/src/async/posix_event_loop.zig index a659025e07..2942a2f593 100644 --- a/src/async/posix_event_loop.zig +++ b/src/async/posix_event_loop.zig @@ -175,7 +175,6 @@ pub const FilePoll = struct { const LifecycleScriptSubprocessOutputReader = bun.install.LifecycleScriptSubprocess.OutputReader; pub const Owner = bun.TaggedPointerUnion(.{ - FileReader, FileSink, // ShellBufferedWriter, @@ -187,9 +186,7 @@ pub const FilePoll = struct { // ShellBufferedOutput, // ShellBufferedOutputMini, - ProcessPipeReader, StaticPipeWriter, - FileSink, Deactivated, DNSResolver, @@ -347,10 +344,6 @@ pub const FilePoll = struct { // var loader = ptr.as(ShellSubprocessCapturedBufferedWriterMini); // loader.onPoll(size_or_offset, 0); // }, - @field(Owner.Tag, bun.meta.typeBase(@typeName(ProcessPipeReader))) => { - var handler: *ProcessPipeReader = ptr.as(ProcessPipeReader); - handler.onPoll(size_or_offset); - }, @field(Owner.Tag, bun.meta.typeBase(@typeName(StaticPipeWriter))) => { var handler: *StaticPipeWriter = ptr.as(StaticPipeWriter); handler.onPoll(size_or_offset); @@ -577,6 +570,11 @@ pub const FilePoll = struct { /// This decrements the active counter if it was previously incremented /// "active" controls whether or not the event loop should potentially idle pub fn disableKeepingProcessAlive(this: *FilePoll, event_loop_ctx_: anytype) void { + if (comptime @TypeOf(event_loop_ctx_) == *JSC.EventLoop) { + disableKeepingProcessAlive(this, JSC.EventLoopHandle.init(event_loop_ctx_)); + return; + } + if (comptime @TypeOf(event_loop_ctx_) == JSC.EventLoopHandle) { event_loop_ctx_.loop().subActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count)))); } else { @@ -594,7 +592,19 @@ pub const FilePoll = struct { return this.flags.contains(.keeps_event_loop_alive) and this.flags.contains(.has_incremented_poll_count); } + pub fn setKeepingProcessAlive(this: *FilePoll, event_loop_ctx_: anytype, value: bool) void { + if (value) { + this.enableKeepingProcessAlive(event_loop_ctx_); + } else { + this.disableKeepingProcessAlive(event_loop_ctx_); + } + } pub fn enableKeepingProcessAlive(this: *FilePoll, event_loop_ctx_: anytype) void { + if (comptime @TypeOf(event_loop_ctx_) == *JSC.EventLoop) { + enableKeepingProcessAlive(this, JSC.EventLoopHandle.init(event_loop_ctx_)); + return; + } + if (this.flags.contains(.closed)) return; diff --git a/src/bun.js/api/bun/process.zig b/src/bun.js/api/bun/process.zig index b822f98244..0fed6d7165 100644 --- a/src/bun.js/api/bun/process.zig +++ b/src/bun.js/api/bun/process.zig @@ -111,8 +111,8 @@ pub const ProcessExitHandler = struct { subprocess.onProcessExit(process, status, rusage); }, @field(TaggedPointer.Tag, bun.meta.typeBaseName(@typeName(ShellSubprocess))) => { - const subprocess = this.ptr.as(ShellSubprocess); - subprocess.onProcessExit(process, status, rusage); + // const subprocess = this.ptr.as(ShellSubprocess); + // subprocess.onProcessExit(process, status, rusage); }, else => { @panic("Internal Bun error: ProcessExitHandler has an invalid tag. Please file a bug report."); diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 02dc9b65f2..26de59468f 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -127,8 +127,6 @@ pub const Subprocess = struct { stderr, }; process: *Process = undefined, - closed_streams: u8 = 0, - deinit_onclose: bool = false, stdin: Writable, stdout: Readable, stderr: Readable, @@ -242,6 +240,8 @@ pub const Subprocess = struct { return true; } } + + return false; } pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void { @@ -251,12 +251,12 @@ pub const Subprocess = struct { .pipe => |pipe| { pipe.signal.clear(); pipe.deref(); - this.stdin.* = .{ .ignore = {} }; + this.stdin = .{ .ignore = {} }; }, .buffer => { this.stdin.buffer.source.detach(); this.stdin.buffer.deref(); - this.stdin.* = .{ .ignore = {} }; + this.stdin = .{ .ignore = {} }; }, else => {}, } @@ -327,6 +327,13 @@ pub const Subprocess = struct { ignore: void, closed: void, + pub fn hasPendingActivity(this: *const Readable) bool { + return switch (this.*) { + .pipe => this.pipe.hasPendingActivity(), + else => false, + }; + } + pub fn ref(this: *Readable) void { switch (this.*) { .pipe => { @@ -362,6 +369,7 @@ pub const Subprocess = struct { .fd => Readable{ .fd = fd.? }, .memfd => Readable{ .memfd = stdio.memfd }, .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, fd.?) }, + .array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), }; } @@ -392,7 +400,7 @@ pub const Subprocess = struct { this.* = .{ .closed = {} }; _ = bun.sys.close(fd); }, - .pipe => |*pipe| { + .pipe => |pipe| { defer pipe.detach(); this.* = .{ .closed = {} }; }, @@ -598,106 +606,116 @@ pub const Subprocess = struct { return array; } - pub const StaticPipeWriter = struct { - writer: IOWriter = .{}, - fd: bun.FileDescriptor = bun.invalid_fd, - source: Source = .{ .detached = {} }, - process: *Subprocess = undefined, - event_loop: *JSC.EventLoop, - ref_count: u32 = 1, + pub const Source = union(enum) { + blob: JSC.WebCore.AnyBlob, + array_buffer: JSC.ArrayBuffer.Strong, + detached: void, - pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub fn slice(this: *const Source) []const u8 { + return switch (this.*) { + .blob => this.blob.sharedView(), + .array_buffer => this.array_buffer.slice(), + else => @panic("Invalid source"), + }; + } - pub const IOWriter = bun.io.BufferedWriter(StaticPipeWriter, onWrite, onError, onClose); - pub const Poll = IOWriter; - - pub fn updateRef(this: *StaticPipeWriter, add: bool) void { - if (add) { - this.writer.updateRef(this.event_loop, true); - } else { - this.writer.updateRef(this.event_loop, false); + pub fn detach(this: *@This()) void { + switch (this.*) { + .blob => { + this.blob.detach(); + }, + .array_buffer => { + this.array_buffer.deinit(); + }, + else => {}, } - } - - pub fn close(this: *StaticPipeWriter) void { - this.writer.close(); - } - - pub fn flush(this: *StaticPipeWriter) void { - this.writer.flush(); - } - - pub fn create(event_loop: *JSC.EventLoop, subprocess: *Subprocess, fd: bun.FileDescriptor, source: Source) *StaticPipeWriter { - return StaticPipeWriter.new(.{ - .event_loop = event_loop, - .process = subprocess, - .fd = fd, - .source = source, - }); - } - - pub const Source = union(enum) { - blob: JSC.WebCore.Blob, - array_buffer: JSC.ArrayBuffer.Strong, - detached: void, - - pub fn slice(this: *const Source) []const u8 { - return switch (this.*) { - .blob => this.blob.sharedView(), - .array_buffer => this.array_buffer.slice(), - else => @panic("Invalid source"), - }; - } - - pub fn detach(this: *@This()) void { - switch (this.*) { - .blob => { - this.blob.detach(); - }, - .array_buffer => { - this.array_buffer.deinit(); - }, - else => {}, - } - this.* = .detached; - } - }; - - pub fn onWrite(this: *StaticPipeWriter, amount: usize, is_done: bool) void { - _ = amount; // autofix - if (is_done) { - this.writer.close(); - } - } - - pub fn onError(this: *StaticPipeWriter, err: bun.sys.Error) void { - _ = err; // autofix - this.source.detach(); - } - - pub fn onClose(this: *StaticPipeWriter) void { - this.source.detach(); - this.process.onCloseIO(.stdin); - } - - pub fn deinit(this: *StaticPipeWriter) void { - this.writer.end(); - this.source.detach(); - this.destroy(); - } - - pub fn loop(this: *StaticPipeWriter) *uws.Loop { - return this.event_loop.virtual_machine.uwsLoop(); - } - - pub fn eventLoop(this: *StaticPipeWriter) *JSC.EventLoop { - return this.event_loop; + this.* = .detached; } }; + pub const StaticPipeWriter = NewStaticPipeWriter(Subprocess); + + pub fn NewStaticPipeWriter(comptime ProcessType: type) type { + return struct { + writer: IOWriter = .{}, + fd: bun.FileDescriptor = bun.invalid_fd, + source: Source = .{ .detached = {} }, + process: *ProcessType = undefined, + event_loop: JSC.EventLoopHandle, + ref_count: u32 = 1, + buffer: []const u8 = "", + + pub usingnamespace bun.NewRefCounted(@This(), deinit); + const This = @This(); + + pub const IOWriter = bun.io.BufferedWriter(This, onWrite, onError, onClose, getBuffer); + pub const Poll = IOWriter; + + pub fn updateRef(this: *This, add: bool) void { + this.writer.updateRef(this.event_loop, add); + } + + pub fn getBuffer(this: *This) []const u8 { + return this.buffer; + } + + pub fn close(this: *This) void { + this.writer.close(); + } + + pub fn flush(this: *This) void { + this.writer.flush(); + } + + pub fn create(event_loop: anytype, subprocess: *ProcessType, fd: bun.FileDescriptor, source: Source) *This { + return This.new(.{ + .event_loop = JSC.EventLoopHandle.init(event_loop), + .process = subprocess, + .fd = fd, + .source = source, + }); + } + + pub fn start(this: *This) JSC.Maybe(void) { + return this.writer.start(this.fd, this.source.slice(), this.event_loop, true); + } + + pub fn onWrite(this: *This, amount: usize, is_done: bool) void { + this.buffer = this.buffer[@min(amount, this.buffer.len)..]; + if (is_done) { + this.writer.close(); + } + } + + pub fn onError(this: *This, err: bun.sys.Error) void { + _ = err; // autofix + this.source.detach(); + } + + pub fn onClose(this: *This) void { + this.source.detach(); + this.process.onCloseIO(.stdin); + } + + pub fn deinit(this: *This) void { + this.writer.end(); + this.source.detach(); + this.destroy(); + } + + pub fn loop(this: *This) *uws.Loop { + return this.event_loop.virtual_machine.uwsLoop(); + } + + pub fn eventLoop(this: *This) JSC.EventLoopHandle { + return this.event_loop; + } + }; + } + pub const PipeReader = struct { - reader: IOReader = .{}, - process: *Subprocess = undefined, + reader: IOReader = undefined, + process: ?*Subprocess = null, event_loop: *JSC.EventLoop = undefined, ref_count: u32 = 1, state: union(enum) { @@ -707,23 +725,29 @@ pub const Subprocess = struct { } = .{ .pending = {} }, fd: bun.FileDescriptor = bun.invalid_fd, - pub const IOReader = bun.io.BufferedReader(PipeReader); + pub const IOReader = bun.io.BufferedReader; pub const Poll = IOReader; - // pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub usingnamespace bun.NewRefCounted(PipeReader, deinit); + + pub fn hasPendingActivity(this: *const PipeReader) bool { + return this.reader.hasPendingRead(); + } pub fn detach(this: *PipeReader) void { - this.process = undefined; - this.reader.is_done = true; - this.deinit(); + this.process = null; + this.deref(); } pub fn create(event_loop: *JSC.EventLoop, process: *Subprocess, fd: bun.FileDescriptor) *PipeReader { - return PipeReader.new(.{ + var this = PipeReader.new(.{ .process = process, .event_loop = event_loop, .fd = fd, + .reader = IOReader.init(@This()), }); + this.reader.setParent(this); + return this; } pub fn readAll(this: *PipeReader) void { @@ -743,8 +767,24 @@ pub const Subprocess = struct { const owned = this.toOwnedSlice(); this.state = .{ .done = owned }; this.reader.close(); - this.reader.deref(); - this.process.onCloseIO(this.kind()); + if (this.process) |process| { + this.process = null; + process.onCloseIO(this.kind(process)); + } + + this.deref(); + } + + pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind { + if (process.stdout == .pipe and process.stdout.pipe == reader) { + return .stdout; + } + + if (process.stderr == .pipe and process.stderr.pipe == reader) { + return .stderr; + } + + @panic("We should be either stdout or stderr"); } pub fn toOwnedSlice(this: *PipeReader) []u8 { @@ -764,18 +804,15 @@ pub const Subprocess = struct { } pub fn updateRef(this: *PipeReader, add: bool) void { - if (add) { - this.reader.updateRef(this.event_loop, true); - } else { - this.reader.updateRef(this.event_loop, false); - } + this.reader.updateRef(add); } pub fn toReadableStream(this: *PipeReader, globalObject: *JSC.JSGlobalObject) JSC.JSValue { + defer this.detach(); + switch (this.state) { .pending => { const stream = JSC.WebCore.ReadableStream.fromPipe(globalObject, &this.reader); - defer this.deref(); this.state = .{ .done = &.{} }; return stream; }, @@ -787,7 +824,7 @@ pub const Subprocess = struct { .err => |err| { _ = err; // autofix const empty = JSC.WebCore.ReadableStream.empty(globalObject); - JSC.WebCore.ReadableStream.cancel(JSC.WebCore.ReadableStream.fromJS(empty, globalObject), globalObject); + JSC.WebCore.ReadableStream.cancel(&JSC.WebCore.ReadableStream.fromJS(empty, globalObject).?, globalObject); return empty; }, } @@ -810,19 +847,8 @@ pub const Subprocess = struct { bun.default_allocator.free(this.state.done); } this.state = .{ .err = err }; - this.process.onCloseIO(this.kind()); - } - - fn kind(this: *const PipeReader) StdioKind { - if (this.process.stdout == .pipe and this.process.stdout.pipe == this) { - return .stdout; - } - - if (this.process.stderr == .pipe and this.process.stderr.pipe == this) { - return .stderr; - } - - @panic("We should be either stdout or stderr"); + if (this.process) |process| + process.onCloseIO(this.kind()); } pub fn close(this: *PipeReader) void { @@ -869,7 +895,7 @@ pub const Subprocess = struct { inherit: void, ignore: void, - pub fn hasPendingActivity(this: *Writable) bool { + pub fn hasPendingActivity(this: *const Writable) bool { return switch (this.*) { // we mark them as .ignore when they are closed, so this must be true .pipe => true, @@ -937,12 +963,12 @@ pub const Subprocess = struct { .blob => |blob| { return Writable{ - .buffer = StaticPipeWriter.create(event_loop, subprocess, fd, .{ .blob = blob }), + .buffer = StaticPipeWriter.create(event_loop, subprocess, fd.?, .{ .blob = blob }), }; }, .array_buffer => |array_buffer| { return Writable{ - .buffer = StaticPipeWriter.create(event_loop, subprocess, .{ .array_buffer = array_buffer }), + .buffer = StaticPipeWriter.create(event_loop, subprocess, fd.?, .{ .array_buffer = array_buffer }), }; }, .memfd => |memfd| { @@ -997,7 +1023,7 @@ pub const Subprocess = struct { pub fn close(this: *Writable) void { switch (this.*) { .pipe => |pipe| { - pipe.end(null); + _ = pipe.end(null); }, inline .memfd, .fd => |fd| { _ = bun.sys.close(fd); @@ -1203,13 +1229,13 @@ pub const Subprocess = struct { var stdio = [3]Stdio{ .{ .ignore = {} }, - .{ .pipe = null }, + .{ .pipe = {} }, .{ .inherit = {} }, }; if (comptime is_sync) { - stdio[1] = .{ .pipe = null }; - stdio[2] = .{ .pipe = null }; + stdio[1] = .{ .pipe = {} }; + stdio[2] = .{ .pipe = {} }; } var lazy = false; var on_exit_callback = JSValue.zero; @@ -1612,18 +1638,18 @@ pub const Subprocess = struct { return .zero; }, .stdout = Readable.init( + stdio[1], jsc_vm.eventLoop(), subprocess, - stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size, is_sync, ), .stderr = Readable.init( + stdio[2], jsc_vm.eventLoop(), subprocess, - stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size, @@ -1678,7 +1704,7 @@ pub const Subprocess = struct { } if (subprocess.stdin == .buffer) { - subprocess.stdin.buffer.start(spawned.stdin.?, true); + subprocess.stdin.buffer.start().assert(); } if (subprocess.stdout == .pipe) { @@ -1687,7 +1713,7 @@ pub const Subprocess = struct { } } - if (subprocess.stderr == .pie) { + if (subprocess.stderr == .pipe) { if (is_sync or !lazy) { subprocess.stderr.pipe.readAll(); } @@ -1745,315 +1771,6 @@ pub const Subprocess = struct { const os = std.os; - const Stdio = union(enum) { - inherit: void, - ignore: void, - fd: bun.FileDescriptor, - path: JSC.Node.PathLike, - blob: JSC.WebCore.AnyBlob, - array_buffer: JSC.ArrayBuffer.Strong, - memfd: bun.FileDescriptor, - pipe: void, - - const PipeExtra = struct { - fd: i32, - fileno: i32, - }; - - pub fn canUseMemfd(this: *const @This(), is_sync: bool) bool { - if (comptime !Environment.isLinux) { - return false; - } - - return switch (this.*) { - .blob => !this.blob.needsToReadFile(), - .memfd, .array_buffer => true, - .pipe => is_sync, - else => false, - }; - } - - pub fn byteSlice(this: *const @This()) []const u8 { - return switch (this.*) { - .blob => this.blob.slice(), - .array_buffer => |array_buffer| array_buffer.slice(), - else => "", - }; - } - - pub fn useMemfd(this: *@This(), index: u32) void { - const label = switch (index) { - 0 => "spawn_stdio_stdin", - 1 => "spawn_stdio_stdout", - 2 => "spawn_stdio_stderr", - else => "spawn_stdio_memory_file", - }; - - // We use the linux syscall api because the glibc requirement is 2.27, which is a little close for comfort. - const rc = std.os.linux.memfd_create(label, 0); - - log("memfd_create({s}) = {d}", .{ label, rc }); - - switch (std.os.linux.getErrno(rc)) { - .SUCCESS => {}, - else => |errno| { - log("Failed to create memfd: {s}", .{@tagName(errno)}); - return; - }, - } - - const fd = bun.toFD(rc); - - var remain = this.byteSlice(); - - if (remain.len > 0) - // Hint at the size of the file - _ = bun.sys.ftruncate(fd, @intCast(remain.len)); - - // Dump all the bytes in there - var written: isize = 0; - while (remain.len > 0) { - switch (bun.sys.pwrite(fd, remain, written)) { - .err => |err| { - if (err.getErrno() == .AGAIN) { - continue; - } - - Output.debugWarn("Failed to write to memfd: {s}", .{@tagName(err.getErrno())}); - _ = bun.sys.close(fd); - return; - }, - .result => |result| { - if (result == 0) { - Output.debugWarn("Failed to write to memfd: EOF", .{}); - _ = bun.sys.close(fd); - return; - } - written += @intCast(result); - remain = remain[result..]; - }, - } - } - - switch (this.*) { - .array_buffer => this.array_buffer.deinit(), - .blob => this.blob.detach(), - else => {}, - } - - this.* = .{ .memfd = fd }; - } - - fn toPosix( - stdio: *@This(), - ) bun.spawn.SpawnOptions.Stdio { - return switch (stdio) { - .pipe, .array_buffer, .blob => .{ .buffer = {} }, - .fd => |fd| .{ .pipe = fd }, - .memfd => |fd| .{ .pipe = fd }, - .path => |pathlike| .{ .path = pathlike.slice() }, - .inherit => .{ .inherit = {} }, - .ignore => .{ .ignore = {} }, - }; - } - - fn toWindows( - stdio: *@This(), - ) bun.spawn.SpawnOptions.Stdio { - return switch (stdio) { - .pipe, .array_buffer, .blob, .pipe => .{ .buffer = {} }, - .fd => |fd| .{ .pipe = fd }, - .path => |pathlike| .{ .path = pathlike.slice() }, - .inherit => .{ .inherit = {} }, - .ignore => .{ .ignore = {} }, - - .memfd => @panic("This should never happen"), - }; - } - - pub fn asSpawnOption( - stdio: *@This(), - ) bun.spawn.SpawnOptions.Stdio { - if (comptime Environment.isWindows) { - return stdio.toWindows(); - } else { - return stdio.toPosix(); - } - } - }; - - fn extractStdioBlob( - globalThis: *JSC.JSGlobalObject, - blob: JSC.WebCore.AnyBlob, - i: u32, - out_stdio: *Stdio, - ) bool { - const fd = bun.stdio(i); - - if (blob.needsToReadFile()) { - if (blob.store()) |store| { - if (store.data.file.pathlike == .fd) { - if (store.data.file.pathlike.fd == fd) { - out_stdio.* = Stdio{ .inherit = {} }; - } else { - switch (bun.FDTag.get(i)) { - .stdin => { - if (i == 1 or i == 2) { - globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); - return false; - } - }, - .stdout, .stderr => { - if (i == 0) { - globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); - return false; - } - }, - else => {}, - } - - out_stdio.* = Stdio{ .fd = store.data.file.pathlike.fd }; - } - - return true; - } - - out_stdio.* = .{ .path = store.data.file.pathlike.path }; - return true; - } - } - - out_stdio.* = .{ .blob = blob }; - return true; - } - - fn extractStdio( - globalThis: *JSC.JSGlobalObject, - i: u32, - value: JSValue, - out_stdio: *Stdio, - ) bool { - if (value.isEmptyOrUndefinedOrNull()) { - return true; - } - - if (value.isString()) { - const str = value.getZigString(globalThis); - if (str.eqlComptime("inherit")) { - out_stdio.* = Stdio{ .inherit = {} }; - } else if (str.eqlComptime("ignore")) { - out_stdio.* = Stdio{ .ignore = {} }; - } else if (str.eqlComptime("pipe") or str.eqlComptime("overlapped")) { - out_stdio.* = Stdio{ .pipe = null }; - } else if (str.eqlComptime("ipc")) { - out_stdio.* = Stdio{ .pipe = null }; // TODO: - } else { - globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'pipe', 'ignore', Bun.file(pathOrFd), number, or null", .{}); - return false; - } - - return true; - } else if (value.isNumber()) { - const fd = value.asFileDescriptor(); - if (fd.int() < 0) { - globalThis.throwInvalidArguments("file descriptor must be a positive integer", .{}); - return false; - } - - if (fd.int() >= std.math.maxInt(i32)) { - var formatter = JSC.ConsoleObject.Formatter{ .globalThis = globalThis }; - globalThis.throwInvalidArguments("file descriptor must be a valid integer, received: {}", .{ - value.toFmt(globalThis, &formatter), - }); - return false; - } - - switch (bun.FDTag.get(fd)) { - .stdin => { - if (i == 1 or i == 2) { - globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); - return false; - } - - out_stdio.* = Stdio{ .inherit = {} }; - return true; - }, - - .stdout, .stderr => |tag| { - if (i == 0) { - globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); - return false; - } - - if (i == 1 and tag == .stdout) { - out_stdio.* = .{ .inherit = {} }; - return true; - } else if (i == 2 and tag == .stderr) { - out_stdio.* = .{ .inherit = {} }; - return true; - } - }, - else => {}, - } - - out_stdio.* = Stdio{ .fd = fd }; - - return true; - } else if (value.as(JSC.WebCore.Blob)) |blob| { - return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, out_stdio); - } else if (value.as(JSC.WebCore.Request)) |req| { - req.getBodyValue().toBlobIfPossible(); - return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio); - } else if (value.as(JSC.WebCore.Response)) |req| { - req.getBodyValue().toBlobIfPossible(); - return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio); - } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| { - var req = req_const; - if (i == 0) { - if (req.toAnyBlob(globalThis)) |blob| { - return extractStdioBlob(globalThis, blob, i, out_stdio); - } - - switch (req.ptr) { - .File, .Blob => { - globalThis.throwTODO("Support fd/blob backed ReadableStream in spawn stdin. See https://github.com/oven-sh/bun/issues/8049"); - return false; - }, - .Direct, .JavaScript, .Bytes => { - if (req.isLocked(globalThis)) { - globalThis.throwInvalidArguments("ReadableStream cannot be locked", .{}); - return false; - } - - out_stdio.* = .{ .pipe = req }; - return true; - }, - .Invalid => { - globalThis.throwInvalidArguments("ReadableStream is in invalid state.", .{}); - return false; - }, - } - } - } else if (value.asArrayBuffer(globalThis)) |array_buffer| { - if (array_buffer.slice().len == 0) { - globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{}); - return false; - } - - out_stdio.* = .{ - .array_buffer = JSC.ArrayBuffer.Strong{ - .array_buffer = array_buffer, - .held = JSC.Strong.create(array_buffer.value, globalThis), - }, - }; - - return true; - } - - globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{}); - return false; - } - pub fn handleIPCMessage( this: *Subprocess, message: IPC.DecodedIPCMessage, diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 9206eab5cc..b136bffa4f 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -662,7 +662,7 @@ pub const DeferredTaskQueue = struct { return existing.found_existing; } - pub fn unregisterTask(this: *EventLoop, ctx: ?*anyopaque) bool { + pub fn unregisterTask(this: *DeferredTaskQueue, ctx: ?*anyopaque) bool { return this.map.swapRemove(ctx); } @@ -757,34 +757,41 @@ pub const EventLoop = struct { defer counter += 1; switch (task.tag()) { @field(Task.Tag, typeBaseName(@typeName(ShellLsTask))) => { + if (comptime true) @panic("TODO"); var shell_ls_task: *ShellLsTask = task.get(ShellLsTask).?; shell_ls_task.runFromMainThread(); // shell_ls_task.deinit(); }, @field(Task.Tag, typeBaseName(@typeName(ShellMvBatchedTask))) => { + if (comptime true) @panic("TODO"); var shell_mv_batched_task: *ShellMvBatchedTask = task.get(ShellMvBatchedTask).?; shell_mv_batched_task.task.runFromMainThread(); }, @field(Task.Tag, typeBaseName(@typeName(ShellMvCheckTargetTask))) => { + if (comptime true) @panic("TODO"); var shell_mv_check_target_task: *ShellMvCheckTargetTask = task.get(ShellMvCheckTargetTask).?; shell_mv_check_target_task.task.runFromMainThread(); }, @field(Task.Tag, typeBaseName(@typeName(ShellRmTask))) => { + if (comptime true) @panic("TODO"); var shell_rm_task: *ShellRmTask = task.get(ShellRmTask).?; shell_rm_task.runFromMainThread(); // shell_rm_task.deinit(); }, @field(Task.Tag, typeBaseName(@typeName(ShellRmDirTask))) => { + if (comptime true) @panic("TODO"); var shell_rm_task: *ShellRmDirTask = task.get(ShellRmDirTask).?; shell_rm_task.runFromMainThread(); // shell_rm_task.deinit(); }, @field(Task.Tag, typeBaseName(@typeName(ShellRmDirTaskMini))) => { + if (comptime true) @panic("TODO"); var shell_rm_task: *ShellRmDirTaskMini = task.get(ShellRmDirTaskMini).?; shell_rm_task.runFromMainThread(); // shell_rm_task.deinit(); }, @field(Task.Tag, typeBaseName(@typeName(ShellGlobTask))) => { + if (comptime true) @panic("TODO"); var shell_glob_task: *ShellGlobTask = task.get(ShellGlobTask).?; shell_glob_task.runFromMainThread(); shell_glob_task.deinit(); diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index a3858528d9..4299b61239 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -545,12 +545,15 @@ pub const Body = struct { switch (readable.ptr) { .Blob => |blob| { + var store = blob.store orelse { + return Body.Value{ .Blob = Blob.initEmpty(globalThis) }; + }; + store.ref(); readable.forceDetach(globalThis); const result: Value = .{ - .Blob = Blob.initWithStore(blob.store, globalThis), + .Blob = Blob.initWithStore(store, globalThis), }; - blob.store.ref(); if (!blob.done) { blob.done = true; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index c46f9f0490..28d8e86bdb 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -119,7 +119,7 @@ pub const ReadableStream = struct { switch (stream.ptr) { .Blob => |blobby| { - var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis); + var blob = JSC.WebCore.Blob.initWithStore(blobby.store orelse return null, globalThis); blob.offset = blobby.offset; blob.size = blobby.remain; blob.store.?.ref(); @@ -339,6 +339,7 @@ pub const ReadableStream = struct { var reader = FileReader.Source.new(.{ .globalThis = globalThis, .context = .{ + .event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()), .lazy = .{ .blob = store, }, @@ -357,7 +358,9 @@ pub const ReadableStream = struct { JSC.markBinding(@src()); var source = FileReader.Source.new(.{ .globalThis = globalThis, - .context = .{}, + .context = .{ + .event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()), + }, }); source.context.reader.from(buffered_reader, &source.context); @@ -2861,7 +2864,7 @@ pub const FileSink = struct { .fd = fd, .event_loop_handle = JSC.EventLoopHandle.init(event_loop_handle), }); - this.writer.setParent(this); + this.writer.parent = this; return this; } @@ -2992,16 +2995,17 @@ pub const FileSink = struct { }; pub const FileReader = struct { - reader: IOReader = .{}, + reader: IOReader = IOReader.init(FileReader), done: bool = false, pending: StreamResult.Pending = .{}, pending_value: JSC.Strong = .{}, pending_view: []u8 = &.{}, fd: bun.FileDescriptor = bun.invalid_fd, - + started: bool = false, + event_loop: JSC.EventLoopHandle, lazy: Lazy = .{ .none = {} }, - pub const IOReader = bun.io.BufferedReader(@This()); + pub const IOReader = bun.io.BufferedReader; pub const Poll = IOReader; pub const tag = ReadableStream.Tag.File; @@ -3010,13 +3014,12 @@ pub const FileReader = struct { blob: *Blob.Store, }; - pub fn eventLoop(this: *FileReader) JSC.EventLoopHandle { - return this.parent().globalThis.bunVM().eventLoop(); + pub fn eventLoop(this: *const FileReader) JSC.EventLoopHandle { + return this.event_loop; } - pub fn loop(this: *FileReader) *uws.Loop { - _ = this; // autofix - return uws.Loop.get(); + pub fn loop(this: *const FileReader) *Async.Loop { + return this.eventLoop().loop(); } pub fn setup( @@ -3028,6 +3031,8 @@ pub const FileReader = struct { .done = false, .fd = fd, }; + + this.event_loop = this.parent().globalThis.bunVM().eventLoop(); } pub fn onStart(this: *FileReader) StreamStart { @@ -3038,6 +3043,9 @@ pub const FileReader = struct { }, } + this.started = true; + this.event_loop = JSC.EventLoopHandle.init(this.parent().globalThis.bunVM().eventLoop()); + return .{ .ready = {} }; } @@ -3125,7 +3133,7 @@ pub const FileReader = struct { return .{ .done = {} }; } - this.pending_value.set(this.parent().globalThis(), array); + this.pending_value.set(this.parent().globalThis, array); this.pending_view = buffer; return .{ .pending = &this.pending }; @@ -3175,10 +3183,11 @@ pub const FileReader = struct { pub const ByteBlobLoader = struct { offset: Blob.SizeType = 0, - store: *Blob.Store, + store: ?*Blob.Store = null, chunk_size: Blob.SizeType = 1024 * 1024 * 2, remain: Blob.SizeType = 1024 * 1024 * 2, done: bool = false, + pulled: bool = false, pub const tag = ReadableStream.Tag.Blob; @@ -3197,7 +3206,10 @@ pub const ByteBlobLoader = struct { this.* = ByteBlobLoader{ .offset = blobe.offset, .store = blobe.store.?, - .chunk_size = if (user_chunk_size > 0) @min(user_chunk_size, blobe.size) else @min(1024 * 1024 * 2, blobe.size), + .chunk_size = @min( + if (user_chunk_size > 0) @min(user_chunk_size, blobe.size) else blobe.size, + 1024 * 1024 * 2, + ), .remain = blobe.size, .done = false, }; @@ -3210,16 +3222,18 @@ pub const ByteBlobLoader = struct { pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult { array.ensureStillAlive(); defer array.ensureStillAlive(); + this.pulled = true; + const store = this.store orelse return .{ .done = {} }; if (this.done) { return .{ .done = {} }; } - var temporary = this.store.sharedView(); - temporary = temporary[this.offset..]; + var temporary = store.sharedView(); + temporary = temporary[@min(this.offset, temporary.len)..]; temporary = temporary[0..@min(buffer.len, @min(temporary.len, this.remain))]; if (temporary.len == 0) { - this.store.deref(); + this.clearStore(); this.done = true; return .{ .done = {} }; } @@ -3237,19 +3251,26 @@ pub const ByteBlobLoader = struct { return .{ .into_array = .{ .value = array, .len = copied } }; } - pub fn onCancel(_: *ByteBlobLoader) void {} + pub fn onCancel(this: *ByteBlobLoader) void { + this.clearStore(); + } pub fn deinit(this: *ByteBlobLoader) void { - if (!this.done) { - this.done = true; - this.store.deref(); - } + this.clearStore(); this.parent().destroy(); } + fn clearStore(this: *ByteBlobLoader) void { + if (this.store) |store| { + this.store = null; + store.deref(); + } + } + pub fn drain(this: *ByteBlobLoader) bun.ByteList { - var temporary = this.store.sharedView(); + const store = this.store orelse return .{}; + var temporary = store.sharedView(); temporary = temporary[this.offset..]; temporary = temporary[0..@min(16384, @min(temporary.len, this.remain))]; @@ -3358,7 +3379,7 @@ pub const ByteStream = struct { pub fn unpipe(this: *@This()) void { this.pipe.ctx = null; this.pipe.onPipe = null; - this.parent().decrementCount(); + _ = this.parent().decrementCount(); } pub fn onData( @@ -3379,8 +3400,8 @@ pub const ByteStream = struct { std.debug.assert(!this.has_received_last_chunk); this.has_received_last_chunk = stream.isDone(); - if (this.pipe.ctx != null) { - this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator); + if (this.pipe.ctx) |ctx| { + this.pipe.onPipe.?(ctx, stream, allocator); return; } diff --git a/src/install/lifecycle_script_runner.zig b/src/install/lifecycle_script_runner.zig index 0a38a8d905..c74492c880 100644 --- a/src/install/lifecycle_script_runner.zig +++ b/src/install/lifecycle_script_runner.zig @@ -20,8 +20,8 @@ pub const LifecycleScriptSubprocess = struct { finished_fds: u8 = 0, process: ?*Process = null, - stdout: OutputReader = .{}, - stderr: OutputReader = .{}, + stdout: OutputReader = OutputReader.init(@This()), + stderr: OutputReader = OutputReader.init(@This()), manager: *PackageManager, envp: [:null]?[*:0]u8, @@ -35,7 +35,7 @@ pub const LifecycleScriptSubprocess = struct { const uv = bun.windows.libuv; - pub const OutputReader = bun.io.BufferedReader(LifecycleScriptSubprocess); + pub const OutputReader = bun.io.BufferedReader; pub fn loop(this: *const LifecycleScriptSubprocess) *bun.uws.Loop { return this.manager.event_loop.loop(); @@ -92,6 +92,8 @@ pub const LifecycleScriptSubprocess = struct { const original_script = this.scripts[next_script_index].?; const cwd = bun.path.z(original_script.cwd, &cwd_z_buf); const env = manager.env; + this.stdout.setParent(this); + this.stderr.setParent(this); if (manager.scripts_node) |scripts_node| { manager.setNodeName( @@ -162,20 +164,11 @@ pub const LifecycleScriptSubprocess = struct { if (comptime Environment.isPosix) { if (spawned.stdout) |stdout| { - this.stdout = .{ - .parent = this, - .poll = Async.FilePoll.init(manager, stdout, .{}, OutputReader, &this.stdout), - }; - try this.stdout.start().unwrap(); + try this.stdout.start(stdout, true).unwrap(); } if (spawned.stderr) |stderr| { - this.stderr = .{ - .parent = this, - .poll = Async.FilePoll.init(manager, stderr, .{}, OutputReader, &this.stderr), - }; - - try this.stderr.start().unwrap(); + try this.stdout.start(stderr, true).unwrap(); } } else if (comptime Environment.isWindows) { if (spawned.stdout == .buffer) { @@ -205,21 +198,21 @@ pub const LifecycleScriptSubprocess = struct { pub fn printOutput(this: *LifecycleScriptSubprocess) void { if (!this.manager.options.log_level.isVerbose()) { - if (this.stdout.buffer.items.len +| this.stderr.buffer.items.len == 0) { + if (this.stdout.buffer().items.len +| this.stderr.buffer().items.len == 0) { return; } Output.disableBuffering(); Output.flush(); - if (this.stdout.buffer.items.len > 0) { - Output.errorWriter().print("{s}\n", .{this.stdout.buffer.items}) catch {}; - this.stdout.buffer.clearAndFree(); + if (this.stdout.buffer().items.len > 0) { + Output.errorWriter().print("{s}\n", .{this.stdout.buffer().items}) catch {}; + this.stdout.buffer().clearAndFree(); } - if (this.stderr.buffer.items.len > 0) { - Output.errorWriter().print("{s}\n", .{this.stderr.buffer.items}) catch {}; - this.stderr.buffer.clearAndFree(); + if (this.stderr.buffer().items.len > 0) { + Output.errorWriter().print("{s}\n", .{this.stderr.buffer().items}) catch {}; + this.stderr.buffer().clearAndFree(); } Output.enableBuffering(); @@ -350,8 +343,8 @@ pub const LifecycleScriptSubprocess = struct { this.resetPolls(); if (!this.manager.options.log_level.isVerbose()) { - this.stdout.buffer.clearAndFree(); - this.stderr.buffer.clearAndFree(); + this.stdout.deinit(); + this.stderr.deinit(); } this.destroy(); diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 98234e0d8f..720a03d601 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -1,24 +1,22 @@ const bun = @import("root").bun; const std = @import("std"); -const PipeReaderVTable = struct { - getFd: *const fn (*anyopaque) bun.FileDescriptor, - getBuffer: *const fn (*anyopaque) *std.ArrayList(u8), - onReadChunk: ?*const fn (*anyopaque, chunk: []u8) void = null, - registerPoll: ?*const fn (*anyopaque) void = null, - done: *const fn (*anyopaque) void, - onError: *const fn (*anyopaque, bun.sys.Error) void, -}; - /// Read a blocking pipe without blocking the current thread. pub fn PosixPipeReader( comptime This: type, - comptime vtable: PipeReaderVTable, + comptime vtable: struct { + getFd: *const fn (*This) bun.FileDescriptor, + getBuffer: *const fn (*This) *std.ArrayList(u8), + onReadChunk: ?*const fn (*This, chunk: []u8) void = null, + registerPoll: ?*const fn (*This) void = null, + done: *const fn (*This) void, + onError: *const fn (*This, bun.sys.Error) void, + }, ) type { return struct { pub fn read(this: *This) void { - const buffer = @call(.always_inline, vtable.getBuffer, .{this}); - const fd = @call(.always_inline, vtable.getFd, .{this}); + const buffer = vtable.getBuffer(this); + const fd = vtable.getFd(this); if (comptime bun.Environment.isLinux) { if (bun.C.linux.RWFFlagSupport.isMaybeSupported()) { readFromBlockingPipeWithoutBlockingLinux(this, buffer, fd, 0); @@ -40,7 +38,7 @@ pub fn PosixPipeReader( pub fn onPoll(parent: *This, size_hint: isize) void { const resizable_buffer = vtable.getBuffer(parent); - const fd = @call(.always_inline, vtable.getFd, .{parent}); + const fd = vtable.getFd(parent); readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint); } @@ -48,7 +46,7 @@ pub fn PosixPipeReader( const stack_buffer_len = 64 * 1024; inline fn drainChunk(parent: *This, resizable_buffer: *std.ArrayList(u8), start_length: usize) void { - if (comptime vtable.onReadChunk) |onRead| { + if (vtable.onReadChunk) |onRead| { if (resizable_buffer.items[start_length..].len > 0) { const chunk = resizable_buffer.items[start_length..]; onRead(parent, chunk); @@ -63,6 +61,7 @@ pub fn PosixPipeReader( } const start_length: usize = resizable_buffer.items.len; + const streaming = parent.vtable.isStreamingEnabled(); while (true) { var buffer: []u8 = resizable_buffer.unusedCapacitySlice(); @@ -88,8 +87,8 @@ pub fn PosixPipeReader( buffer = resizable_buffer.items; } - if (comptime vtable.onReadChunk) |onRead| { - onRead(parent, buffer); + if (streaming) { + parent.vtable.onReadChunk(buffer); } else if (buffer.ptr != &stack_buffer) { resizable_buffer.items.len += bytes_read; } @@ -127,6 +126,7 @@ pub fn PosixPipeReader( } const start_length: usize = resizable_buffer.items.len; + const streaming = parent.vtable.isStreamingEnabled(); while (true) { var buffer: []u8 = resizable_buffer.unusedCapacitySlice(); @@ -153,8 +153,8 @@ pub fn PosixPipeReader( buffer = resizable_buffer.items; } - if (comptime vtable.onReadChunk) |onRead| { - onRead(parent, buffer); + if (streaming) { + parent.vtable.onReadChunk(buffer); } else if (buffer.ptr != &stack_buffer) { resizable_buffer.items.len += bytes_read; } @@ -290,46 +290,59 @@ const Async = bun.Async; // This is a runtime type instead of comptime due to bugs in Zig. // https://github.com/ziglang/zig/issues/18664 -// const BufferedReaderVTable = struct { parent: *anyopaque = undefined, fns: *const Fn = undefined, + pub fn init(comptime Type: type) BufferedReaderVTable { + return .{ + .fns = Fn.init(Type), + }; + } + pub const Fn = struct { onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void = null, onReaderDone: *const fn (*anyopaque) void, onReaderError: *const fn (*anyopaque, bun.sys.Error) void, - loop: *const fn (*anyopaque) JSC.EventLoopHandle, + loop: *const fn (*anyopaque) *Async.Loop, eventLoop: *const fn (*anyopaque) JSC.EventLoopHandle, + + pub fn init(comptime Type: type) *const BufferedReaderVTable.Fn { + const loop_fn = &struct { + pub fn loop_fn(this: *anyopaque) *Async.Loop { + return Type.loop(@alignCast(@ptrCast(this))); + } + }.loop_fn; + + const eventLoop_fn = &struct { + pub fn eventLoop_fn(this: *anyopaque) JSC.EventLoopHandle { + return JSC.EventLoopHandle.init(Type.eventLoop(@alignCast(@ptrCast(this)))); + } + }.eventLoop_fn; + return comptime &BufferedReaderVTable.Fn{ + .onReadChunk = if (@hasDecl(Type, "onReadChunk")) @ptrCast(&Type.onReadChunk) else null, + .onReaderDone = @ptrCast(&Type.onReaderDone), + .onReaderError = @ptrCast(&Type.onReaderError), + .eventLoop = eventLoop_fn, + .loop = loop_fn, + }; + } }; - pub fn init(comptime Type: type) *const BufferedReaderVTable.Fn { - const loop_fn = &struct { - pub fn doLoop(this: *anyopaque) *Async.Loop { - return Type.loop(@alignCast(@ptrCast(this))); - } - }.loop; - - const eventLoop = &struct { - pub fn doLoop(this: *anyopaque) JSC.EventLoopHandle { - return JSC.EventLoopHandle.init(Type.eventLoop(@alignCast(@ptrCast(this)))); - } - }.eventLoop; - return comptime &BufferedReaderVTable.Fn{ - .onReadChunk = if (@hasDecl(Type, "onReadChunk")) @ptrCast(&Type.onReadChunk) else null, - .onReaderDone = @ptrCast(&Type.onReaderDone), - .onReaderError = @ptrCast(&Type.onReaderError), - .eventLoop = eventLoop, - .loop = loop_fn, - }; - } - - pub fn loop(this: @This()) JSC.EventLoopHandle { + pub fn eventLoop(this: @This()) JSC.EventLoopHandle { return this.fns.eventLoop(this.parent); } + pub fn loop(this: @This()) *Async.Loop { + return this.fns.loop(this.parent); + } + + pub fn isStreamingEnabled(this: @This()) bool { + return this.fns.onReadChunk != null; + } + pub fn onReadChunk(this: @This(), chunk: []const u8) void { - this.fns.onReadChunk(this.parent, chunk); + this.fns.onReadChunk.?(this.parent, chunk); } pub fn onReaderDone(this: @This()) void { @@ -345,39 +358,43 @@ const PosixBufferedReader = struct { handle: PollOrFd = .{ .closed = {} }, _buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), is_done: bool = false, - vtable: BufferedReaderVTable = .{}, + vtable: BufferedReaderVTable, - pub fn @"for"(comptime Type: type) PosixBufferedReader { + pub fn init(comptime Type: type) PosixBufferedReader { return .{ .vtable = BufferedReaderVTable.init(Type), }; } - pub fn from(to: *@This(), other: anytype, parent_: *anyopaque) void { + pub fn updateRef(this: *const PosixBufferedReader, value: bool) void { + const poll = this.handle.getPoll() orelse return; + poll.setKeepingProcessAlive(this.vtable.eventLoop(), value); + } + + pub fn from(to: *@This(), other: *PosixBufferedReader, parent_: *anyopaque) void { to.* = .{ .handle = other.handle, ._buffer = other.buffer().*, .is_done = other.is_done, - ._parent = parent_, + .vtable = .{ + .fns = to.vtable.fns, + .parent = parent_, + }, }; other.buffer().* = std.ArrayList(u8).init(bun.default_allocator); - to.setParent(parent_); - other.is_done = true; other.handle = .{ .closed = {} }; } - pub fn setParent(this: *@This(), parent_: *anyopaque) void { - this._parent = parent_; - if (!this.is_done) { - this.handle.setOwner(this); - } + pub fn setParent(this: *PosixBufferedReader, parent_: *anyopaque) void { + this.vtable.parent = parent_; + this.handle.setOwner(this); } pub usingnamespace PosixPipeReader(@This(), .{ .getFd = @ptrCast(&getFd), .getBuffer = @ptrCast(&buffer), - .onReadChunk = if (vtable.onReadChunk != null) @ptrCast(&_onReadChunk) else null, + .onReadChunk = @ptrCast(&_onReadChunk), .registerPoll = @ptrCast(®isterPoll), .done = @ptrCast(&done), .onError = @ptrCast(&onError), @@ -398,7 +415,7 @@ const PosixBufferedReader = struct { } pub fn buffer(this: *PosixBufferedReader) *std.ArrayList(u8) { - return &this._buffer; + return &@as(*PosixBufferedReader, @alignCast(@ptrCast(this)))._buffer; } pub fn disableKeepingProcessAlive(this: *@This(), event_loop_ctx: anytype) void { @@ -456,7 +473,7 @@ const PosixBufferedReader = struct { return .{ .result = {} }; } - const poll = Async.FilePoll.init(this.loop(), fd, .readable, @This(), this); + const poll = Async.FilePoll.init(this.eventLoop(), fd, .{}, @This(), this); const maybe = poll.register(this.loop(), .readable, true); if (maybe != .result) { poll.deinit(); @@ -476,32 +493,24 @@ const PosixBufferedReader = struct { return false; } - pub fn loop(this: *const PosixBufferedReader) JSC.EventLoopHandle { - return vtable.loop(this._parent); + pub fn loop(this: *const PosixBufferedReader) *Async.Loop { + return this.vtable.loop(); + } + + pub fn eventLoop(this: *const PosixBufferedReader) JSC.EventLoopHandle { + return this.vtable.eventLoop(); } }; -pub fn PosixBufferedReader(comptime vtable: anytype) type { - return PosixBufferedReaderWithVTable(.{ - .onReaderDone = @ptrCast(&vtable.onReaderDone), - .onReaderError = @ptrCast(&vtable.onReaderError), - .onReadChunk = if (@hasDecl(vtable, "onReadChunk")) @ptrCast(&vtable.onReadChunk) else null, - .loop = &struct { - pub fn doLoop(this: *anyopaque) JSC.EventLoopHandle { - _ = this; // autofix - // return JSC.EventLoopHandle.init(Parent.eventLoop(@alignCast(@ptrCast(this)))); - return undefined; - } - }.doLoop, - }); -} - const JSC = bun.JSC; const WindowsOutputReaderVTable = struct { onReaderDone: *const fn (*anyopaque) void, onReaderError: *const fn (*anyopaque, bun.sys.Error) void, - onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void = null, + onReadChunk: ?*const fn ( + *anyopaque, + chunk: []const u8, + ) void = null, }; pub const GenericWindowsBufferedReader = struct { diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index 1710fc1c66..ea6622f14c 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -131,30 +131,26 @@ const PollOrFd = @import("./pipes.zig").PollOrFd; pub fn PosixBufferedWriter( comptime Parent: type, - comptime onWrite: fn (*Parent, amount: usize, done: bool) void, - comptime onError: fn (*Parent, bun.sys.Error) void, - comptime onClose: fn (*Parent) void, + comptime onWrite: *const fn (*Parent, amount: usize, done: bool) void, + comptime onError: *const fn (*Parent, bun.sys.Error) void, + comptime onClose: *const fn (*Parent) void, + comptime getBuffer: *const fn (*Parent) []const u8, ) type { return struct { - buffer: []const u8 = "", handle: PollOrFd = .{ .closed = {} }, parent: *Parent = undefined, is_done: bool = false, const PosixWriter = @This(); - pub fn getPoll(this: *@This()) ?*Async.FilePoll { + pub fn getPoll(this: *const @This()) ?*Async.FilePoll { return this.handle.getPoll(); } - pub fn getFd(this: *PosixWriter) bun.FileDescriptor { + pub fn getFd(this: *const PosixWriter) bun.FileDescriptor { return this.handle.getFd(); } - pub fn getBuffer(this: *PosixWriter) []const u8 { - return this.buffer; - } - fn _onError( this: *PosixWriter, err: bun.sys.Error, @@ -172,7 +168,6 @@ pub fn PosixBufferedWriter( done: bool, ) void { const was_done = this.is_done == true; - this.buffer = this.buffer[written..]; const parent = this.parent; onWrite(parent, written, done); @@ -209,19 +204,19 @@ pub fn PosixBufferedWriter( return poll.canEnableKeepingProcessAlive(); } - pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void { - if (this.is_done) return; - - const poll = this.getPoll() orelse return; - poll.enableKeepingProcessAlive(event_loop); + pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: anytype) void { + this.updateRef(event_loop, true); } - pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void { - const poll = this.getPoll() orelse return; - poll.disableKeepingProcessAlive(event_loop); + pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: anytype) void { + this.updateRef(event_loop, false); } - pub usingnamespace PosixPipeWriter(@This(), getFd, getBuffer, _onWrite, registerPoll, _onError, _onWritable); + fn getBufferInternal(this: *PosixWriter) []const u8 { + return getBuffer(this.parent); + } + + pub usingnamespace PosixPipeWriter(@This(), getFd, getBufferInternal, _onWrite, registerPoll, _onError, _onWritable); pub fn end(this: *PosixWriter) void { if (this.is_done) { @@ -236,16 +231,12 @@ pub fn PosixBufferedWriter( this.handle.close(this.parent, onClose); } - pub fn updateRef(this: *PosixWriter, value: bool, event_loop: JSC.EventLoopHandle) void { - if (value) { - this.enableKeepingProcessAlive(event_loop); - } else { - this.disableKeepingProcessAlive(event_loop); - } + pub fn updateRef(this: *const PosixWriter, event_loop: anytype, value: bool) void { + const poll = this.getPoll() orelse return; + poll.setKeepingProcessAlive(event_loop, value); } - pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8, pollable: bool) JSC.Maybe(void) { - this.buffer = bytes; + pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, pollable: bool) JSC.Maybe(void) { if (!pollable) { std.debug.assert(this.handle != .poll); this.handle = .{ .fd = fd }; @@ -253,7 +244,7 @@ pub fn PosixBufferedWriter( } const loop = @as(*Parent, @ptrCast(this.parent)).loop(); var poll = this.getPoll() orelse brk: { - this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) }; + this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .{}, PosixWriter, this) }; break :brk this.handle.poll; }; @@ -344,7 +335,7 @@ pub fn PosixStreamingWriter( const poll = this.getPoll() orelse return; switch (poll.registerWithFd(@as(*Parent, @ptrCast(this.parent)).loop(), .writable, true, poll.fd)) { .err => |err| { - onError(this, err); + onError(this.parent, err); this.close(); }, .result => {}, @@ -432,6 +423,7 @@ pub fn PosixStreamingWriter( return .{ .done = amt }; }, + else => |r| return r, } } @@ -536,7 +528,7 @@ pub fn PosixStreamingWriter( const loop = @as(*Parent, @ptrCast(this.parent)).loop(); var poll = this.getPoll() orelse brk: { - this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) }; + this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .{}, PosixWriter, this) }; break :brk this.handle.poll; }; diff --git a/src/io/pipes.zig b/src/io/pipes.zig index 93df36d038..87d0b3f781 100644 --- a/src/io/pipes.zig +++ b/src/io/pipes.zig @@ -38,12 +38,12 @@ pub const PollOrFd = union(enum) { } if (fd != bun.invalid_fd) { - this.handle = .{ .closed = {} }; + this.* = .{ .closed = {} }; _ = bun.sys.close(fd); - if (comptime onCloseFn != void) - onCloseFn(@ptrCast(ctx.?)); + if (comptime @TypeOf(onCloseFn) != void) + onCloseFn(@alignCast(@ptrCast(ctx.?))); } else { - this.handle = .{ .closed = {} }; + this.* = .{ .closed = {} }; } } }; diff --git a/src/shell/util.zig b/src/shell/util.zig index 87963b1119..ce75b38222 100644 --- a/src/shell/util.zig +++ b/src/shell/util.zig @@ -17,128 +17,326 @@ const os = std.os; pub const OutKind = enum { stdout, stderr }; pub const Stdio = union(enum) { - /// When set to true, it means to capture the output - inherit: struct { captured: ?*bun.ByteList = null }, + inherit: void, + capture: *bun.ByteList, ignore: void, fd: bun.FileDescriptor, path: JSC.Node.PathLike, blob: JSC.WebCore.AnyBlob, - pipe: ?JSC.WebCore.ReadableStream, - array_buffer: struct { buf: JSC.ArrayBuffer.Strong, from_jsc: bool = false }, + array_buffer: JSC.ArrayBuffer.Strong, + memfd: bun.FileDescriptor, + pipe: void, - pub fn toPosix(self: Stdio) bun.spawn.SpawnOptions.Stdio { - return switch (self) { - .pipe, .blob, .array_buffer => .{ .buffer = {} }, - .inherit => |inherit| if (inherit.captured == null) .{ .inherit = {} } else .{ .buffer = {} }, - .fd => .{ .pipe = self.fd }, - .path => .{ .path = self.path.slice() }, - .ignore => .{ .ignore = {} }, - }; + const log = bun.sys.syslog; + + pub fn deinit(this: *Stdio) void { + switch (this.*) { + .array_buffer => |*array_buffer| { + array_buffer.deinit(); + }, + .blob => |*blob| { + blob.detach(); + }, + .memfd => |fd| { + _ = bun.sys.close(fd); + }, + else => {}, + } } - pub fn isPiped(self: Stdio) bool { - return switch (self) { - .array_buffer, .blob, .pipe => true, - .inherit => self.inherit.captured != null, + pub fn canUseMemfd(this: *const @This(), is_sync: bool) bool { + if (comptime !Environment.isLinux) { + return false; + } + + return switch (this.*) { + .blob => !this.blob.needsToReadFile(), + .memfd, .array_buffer => true, + .pipe => is_sync, else => false, }; } - pub fn setUpChildIoPosixSpawn( - stdio: @This(), - actions: *PosixSpawn.Actions, - pipe_fd: [2]bun.FileDescriptor, - comptime std_fileno: bun.FileDescriptor, - ) !void { - switch (stdio) { - .array_buffer, .blob, .pipe => { - std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile())); - const idx: usize = if (std_fileno == bun.STDIN_FD) 0 else 1; + pub fn useMemfd(this: *@This(), index: u32) void { + const label = switch (index) { + 0 => "spawn_stdio_stdin", + 1 => "spawn_stdio_stdout", + 2 => "spawn_stdio_stderr", + else => "spawn_stdio_memory_file", + }; - try actions.dup2(pipe_fd[idx], std_fileno); - try actions.close(pipe_fd[1 - idx]); - }, - .inherit => { - if (stdio.inherit.captured != null) { - // Same as above - std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile())); - const idx: usize = if (std_fileno == bun.STDIN_FD) 0 else 1; + // We use the linux syscall api because the glibc requirement is 2.27, which is a little close for comfort. + const rc = std.os.linux.memfd_create(label, 0); - try actions.dup2(pipe_fd[idx], std_fileno); - try actions.close(pipe_fd[1 - idx]); - return; - } + log("memfd_create({s}) = {d}", .{ label, rc }); - if (comptime Environment.isMac) { - try actions.inherit(std_fileno); - } else { - try actions.dup2(std_fileno, std_fileno); - } - }, - .fd => |fd| { - try actions.dup2(fd, std_fileno); - }, - .path => |pathlike| { - const flag = if (std_fileno == bun.STDIN_FD) @as(u32, os.O.RDONLY) else @as(u32, std.os.O.WRONLY); - try actions.open(std_fileno, pathlike.slice(), flag | std.os.O.CREAT, 0o664); - }, - .ignore => { - const flag = if (std_fileno == bun.STDIN_FD) @as(u32, os.O.RDONLY) else @as(u32, std.os.O.WRONLY); - try actions.openZ(std_fileno, "/dev/null", flag, 0o664); + switch (std.os.linux.getErrno(rc)) { + .SUCCESS => {}, + else => |errno| { + log("Failed to create memfd: {s}", .{@tagName(errno)}); + return; }, } - } -}; -pub fn extractStdioBlob( - globalThis: *JSC.JSGlobalObject, - blob: JSC.WebCore.AnyBlob, - i: u32, - stdio_array: []Stdio, -) bool { - const fd = bun.stdio(i); + const fd = bun.toFD(rc); - if (blob.needsToReadFile()) { - if (blob.store()) |store| { - if (store.data.file.pathlike == .fd) { - if (store.data.file.pathlike.fd == fd) { - stdio_array[i] = Stdio{ .inherit = .{} }; - } else { - switch (bun.FDTag.get(i)) { - .stdin => { - if (i == 1 or i == 2) { - globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); - return false; - } - }, + var remain = this.byteSlice(); - .stdout, .stderr => { - if (i == 0) { - globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); - return false; - } - }, - else => {}, + if (remain.len > 0) + // Hint at the size of the file + _ = bun.sys.ftruncate(fd, @intCast(remain.len)); + + // Dump all the bytes in there + var written: isize = 0; + while (remain.len > 0) { + switch (bun.sys.pwrite(fd, remain, written)) { + .err => |err| { + if (err.getErrno() == .AGAIN) { + continue; } - stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; - } - - return true; + Output.debugWarn("Failed to write to memfd: {s}", .{@tagName(err.getErrno())}); + _ = bun.sys.close(fd); + return; + }, + .result => |result| { + if (result == 0) { + Output.debugWarn("Failed to write to memfd: EOF", .{}); + _ = bun.sys.close(fd); + return; + } + written += @intCast(result); + remain = remain[result..]; + }, } + } - stdio_array[i] = .{ .path = store.data.file.pathlike.path }; - return true; + switch (this.*) { + .array_buffer => this.array_buffer.deinit(), + .blob => this.blob.detach(), + else => {}, + } + + this.* = .{ .memfd = fd }; + } + + fn toPosix( + stdio: *@This(), + ) bun.spawn.SpawnOptions.Stdio { + return switch (stdio.*) { + .capture, .pipe, .array_buffer, .blob => .{ .buffer = {} }, + .fd => |fd| .{ .pipe = fd }, + .memfd => |fd| .{ .pipe = fd }, + .path => |pathlike| .{ .path = pathlike.slice() }, + .inherit => .{ .inherit = {} }, + .ignore => .{ .ignore = {} }, + }; + } + + fn toWindows( + stdio: *@This(), + ) bun.spawn.SpawnOptions.Stdio { + return switch (stdio.*) { + .capture, .pipe, .array_buffer, .blob => .{ .buffer = {} }, + .fd => |fd| .{ .pipe = fd }, + .path => |pathlike| .{ .path = pathlike.slice() }, + .inherit => .{ .inherit = {} }, + .ignore => .{ .ignore = {} }, + + .memfd => @panic("This should never happen"), + }; + } + + pub fn asSpawnOption( + stdio: *@This(), + ) bun.spawn.SpawnOptions.Stdio { + if (comptime Environment.isWindows) { + return stdio.toWindows(); + } else { + return stdio.toPosix(); } } - if (i == 1 or i == 2) { - globalThis.throwInvalidArguments("Blobs are immutable, and cannot be used for stdout/stderr", .{}); + pub fn isPiped(self: Stdio) bool { + return switch (self) { + .capture, .array_buffer, .blob, .pipe => true, + else => false, + }; + } + + fn extractStdio( + out_stdio: *Stdio, + globalThis: *JSC.JSGlobalObject, + i: u32, + value: JSValue, + ) bool { + if (value.isEmptyOrUndefinedOrNull()) { + return true; + } + + if (value.isString()) { + const str = value.getZigString(globalThis); + if (str.eqlComptime("inherit")) { + out_stdio.* = Stdio{ .inherit = {} }; + } else if (str.eqlComptime("ignore")) { + out_stdio.* = Stdio{ .ignore = {} }; + } else if (str.eqlComptime("pipe") or str.eqlComptime("overlapped")) { + out_stdio.* = Stdio{ .pipe = {} }; + } else if (str.eqlComptime("ipc")) { + out_stdio.* = Stdio{ .pipe = {} }; // TODO: + } else { + globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'pipe', 'ignore', Bun.file(pathOrFd), number, or null", .{}); + return false; + } + + return true; + } else if (value.isNumber()) { + const fd = value.asFileDescriptor(); + if (fd.int() < 0) { + globalThis.throwInvalidArguments("file descriptor must be a positive integer", .{}); + return false; + } + + if (fd.int() >= std.math.maxInt(i32)) { + var formatter = JSC.ConsoleObject.Formatter{ .globalThis = globalThis }; + globalThis.throwInvalidArguments("file descriptor must be a valid integer, received: {}", .{ + value.toFmt(globalThis, &formatter), + }); + return false; + } + + switch (bun.FDTag.get(fd)) { + .stdin => { + if (i == 1 or i == 2) { + globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); + return false; + } + + out_stdio.* = Stdio{ .inherit = {} }; + return true; + }, + + .stdout, .stderr => |tag| { + if (i == 0) { + globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); + return false; + } + + if (i == 1 and tag == .stdout) { + out_stdio.* = .{ .inherit = {} }; + return true; + } else if (i == 2 and tag == .stderr) { + out_stdio.* = .{ .inherit = {} }; + return true; + } + }, + else => {}, + } + + out_stdio.* = Stdio{ .fd = fd }; + + return true; + } else if (value.as(JSC.WebCore.Blob)) |blob| { + return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, out_stdio); + } else if (value.as(JSC.WebCore.Request)) |req| { + req.getBodyValue().toBlobIfPossible(); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio); + } else if (value.as(JSC.WebCore.Response)) |req| { + req.getBodyValue().toBlobIfPossible(); + return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio); + } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| { + var req = req_const; + if (i == 0) { + if (req.toAnyBlob(globalThis)) |blob| { + return extractStdioBlob(globalThis, blob, i, out_stdio); + } + + switch (req.ptr) { + .File, .Blob => { + globalThis.throwTODO("Support fd/blob backed ReadableStream in spawn stdin. See https://github.com/oven-sh/bun/issues/8049"); + return false; + }, + .Direct, .JavaScript, .Bytes => { + // out_stdio.* = .{ .connect = req }; + globalThis.throwTODO("Re-enable ReadableStream support in spawn stdin. "); + return false; + }, + .Invalid => { + globalThis.throwInvalidArguments("ReadableStream is in invalid state.", .{}); + return false; + }, + } + } + } else if (value.asArrayBuffer(globalThis)) |array_buffer| { + if (array_buffer.slice().len == 0) { + globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{}); + return false; + } + + out_stdio.* = .{ + .array_buffer = JSC.ArrayBuffer.Strong{ + .array_buffer = array_buffer, + .held = JSC.Strong.create(array_buffer.value, globalThis), + }, + }; + + return true; + } + + globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{}); return false; } - stdio_array[i] = .{ .blob = blob }; - return true; -} + pub fn extractStdioBlob( + globalThis: *JSC.JSGlobalObject, + blob: JSC.WebCore.AnyBlob, + i: u32, + stdio_array: []Stdio, + ) bool { + const fd = bun.stdio(i); + + if (blob.needsToReadFile()) { + if (blob.store()) |store| { + if (store.data.file.pathlike == .fd) { + if (store.data.file.pathlike.fd == fd) { + stdio_array[i] = Stdio{ .inherit = .{} }; + } else { + switch (bun.FDTag.get(i)) { + .stdin => { + if (i == 1 or i == 2) { + globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); + return false; + } + }, + + .stdout, .stderr => { + if (i == 0) { + globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); + return false; + } + }, + else => {}, + } + + stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd }; + } + + return true; + } + + stdio_array[i] = .{ .path = store.data.file.pathlike.path }; + return true; + } + } + + if (i == 1 or i == 2) { + globalThis.throwInvalidArguments("Blobs are immutable, and cannot be used for stdout/stderr", .{}); + return false; + } + + stdio_array[i] = .{ .blob = blob }; + return true; + } +}; pub const WatchFd = if (Environment.isLinux) std.os.fd_t else i32;