From fac5e71a0caefd3e33ce80f3b805b249af45b8b8 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Wed, 13 Aug 2025 20:47:50 -0700 Subject: [PATCH] Split subprocess into more files (#21842) ### What does this PR do? Split subprocess into more files ### How did you verify your code works? check --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- cmake/sources/ZigSources.txt | 5 + src/bun.js/api/bun/subprocess.zig | 944 +----------------- src/bun.js/api/bun/subprocess/Readable.zig | 195 ++++ .../api/bun/subprocess/ResourceUsage.zig | 75 ++ .../api/bun/subprocess/StaticPipeWriter.zig | 139 +++ .../bun/subprocess/SubprocessPipeReader.zig | 225 +++++ src/bun.js/api/bun/subprocess/Writable.zig | 334 +++++++ 7 files changed, 995 insertions(+), 922 deletions(-) create mode 100644 src/bun.js/api/bun/subprocess/Readable.zig create mode 100644 src/bun.js/api/bun/subprocess/ResourceUsage.zig create mode 100644 src/bun.js/api/bun/subprocess/StaticPipeWriter.zig create mode 100644 src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig create mode 100644 src/bun.js/api/bun/subprocess/Writable.zig diff --git a/cmake/sources/ZigSources.txt b/cmake/sources/ZigSources.txt index bdd16357fb..d32a670427 100644 --- a/cmake/sources/ZigSources.txt +++ b/cmake/sources/ZigSources.txt @@ -98,6 +98,11 @@ src/bun.js/api/bun/spawn.zig src/bun.js/api/bun/spawn/stdio.zig src/bun.js/api/bun/ssl_wrapper.zig src/bun.js/api/bun/subprocess.zig +src/bun.js/api/bun/subprocess/Readable.zig +src/bun.js/api/bun/subprocess/ResourceUsage.zig +src/bun.js/api/bun/subprocess/StaticPipeWriter.zig +src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig +src/bun.js/api/bun/subprocess/Writable.zig src/bun.js/api/bun/udp_socket.zig src/bun.js/api/bun/x509.zig src/bun.js/api/BunObject.zig diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index a1af7d9f42..8a494603bb 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -84,69 +84,7 @@ pub inline fn assertStdioResult(result: StdioResult) void { } } -pub const ResourceUsage = struct { - pub const js = jsc.Codegen.JSResourceUsage; - pub const toJS = ResourceUsage.js.toJS; - pub const fromJS = ResourceUsage.js.fromJS; - pub const fromJSDirect = ResourceUsage.js.fromJSDirect; - - rusage: Rusage, - - pub fn getCPUTime(this: *ResourceUsage, globalObject: *JSGlobalObject) bun.JSError!JSValue { - var cpu = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); - const rusage = this.rusage; - - const usrTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.utime.usec, rusage.utime.sec); - const sysTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.stime.usec, rusage.stime.sec); - - cpu.put(globalObject, jsc.ZigString.static("user"), usrTime); - cpu.put(globalObject, jsc.ZigString.static("system"), sysTime); - cpu.put(globalObject, jsc.ZigString.static("total"), JSValue.bigIntSum(globalObject, usrTime, sysTime)); - - return cpu; - } - - pub fn getMaxRSS(this: *ResourceUsage, _: *JSGlobalObject) JSValue { - return jsc.JSValue.jsNumber(this.rusage.maxrss); - } - - pub fn getSharedMemorySize(this: *ResourceUsage, _: *JSGlobalObject) JSValue { - return jsc.JSValue.jsNumber(this.rusage.ixrss); - } - - pub fn getSwapCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue { - return jsc.JSValue.jsNumber(this.rusage.nswap); - } - - pub fn getOps(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue { - var ops = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); - ops.put(globalObject, jsc.ZigString.static("in"), jsc.JSValue.jsNumber(this.rusage.inblock)); - ops.put(globalObject, jsc.ZigString.static("out"), jsc.JSValue.jsNumber(this.rusage.oublock)); - return ops; - } - - pub fn getMessages(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue { - var msgs = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); - msgs.put(globalObject, jsc.ZigString.static("sent"), jsc.JSValue.jsNumber(this.rusage.msgsnd)); - msgs.put(globalObject, jsc.ZigString.static("received"), jsc.JSValue.jsNumber(this.rusage.msgrcv)); - return msgs; - } - - pub fn getSignalCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue { - return jsc.JSValue.jsNumber(this.rusage.nsignals); - } - - pub fn getContextSwitches(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue { - var ctx = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); - ctx.put(globalObject, jsc.ZigString.static("voluntary"), jsc.JSValue.jsNumber(this.rusage.nvcsw)); - ctx.put(globalObject, jsc.ZigString.static("involuntary"), jsc.JSValue.jsNumber(this.rusage.nivcsw)); - return ctx; - } - - pub fn finalize(this: *ResourceUsage) callconv(.C) void { - bun.default_allocator.destroy(this); - } -}; +pub const ResourceUsage = @import("./subprocess/ResourceUsage.zig"); pub fn appendEnvpFromJS(globalThis: *jsc.JSGlobalObject, object: *jsc.JSObject, envp: *std.ArrayList(?[*:0]const u8), PATH: *[]const u8) bun.JSError!void { var object_iter = try jsc.JSPropertyIterator(.{ .skip_empty_name = false, .include_value = true }).init(globalThis, object); @@ -207,27 +145,24 @@ pub fn resourceUsage( return this.createResourceUsageObject(globalObject); } -pub fn createResourceUsageObject(this: *Subprocess, globalObject: *JSGlobalObject) JSValue { - const pid_rusage = this.pid_rusage orelse brk: { - if (Environment.isWindows) { - if (this.process.poller == .uv) { - this.pid_rusage = PosixSpawn.process.uv_getrusage(&this.process.poller.uv); - break :brk this.pid_rusage.?; +pub fn createResourceUsageObject(this: *Subprocess, globalObject: *JSGlobalObject) bun.JSError!JSValue { + return ResourceUsage.create( + brk: { + if (this.pid_rusage != null) { + break :brk &this.pid_rusage.?; } - } - return .js_undefined; - }; + if (Environment.isWindows) { + if (this.process.poller == .uv) { + this.pid_rusage = PosixSpawn.process.uv_getrusage(&this.process.poller.uv); + break :brk &this.pid_rusage.?; + } + } - const resource_usage = ResourceUsage{ - .rusage = pid_rusage, - }; - - var result = bun.default_allocator.create(ResourceUsage) catch { - return globalObject.throwOutOfMemoryValue(); - }; - result.* = resource_usage; - return result.toJS(globalObject); + return .js_undefined; + }, + globalObject, + ); } pub fn hasExited(this: *const Subprocess) bool { @@ -357,183 +292,8 @@ pub fn constructor(globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSE return globalObject.throw("Cannot construct Subprocess", .{}); } -const Readable = union(enum) { - fd: bun.FileDescriptor, - memfd: bun.FileDescriptor, - pipe: *PipeReader, - inherit: void, - ignore: void, - closed: void, - /// Eventually we will implement Readables created from blobs and array buffers. - /// When we do that, `buffer` will be borrowed from those objects. - /// - /// When a buffered `pipe` finishes reading from its file descriptor, - /// the owning `Readable` will be convered into this variant and the pipe's - /// buffer will be taken as an owned `CowString`. - buffer: CowString, - - pub fn memoryCost(this: *const Readable) usize { - return switch (this.*) { - .pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(), - .buffer => this.buffer.length(), - else => 0, - }; - } - - pub fn hasPendingActivity(this: *const Readable) bool { - return switch (this.*) { - .pipe => this.pipe.hasPendingActivity(), - else => false, - }; - } - - pub fn ref(this: *Readable) void { - switch (this.*) { - .pipe => { - this.pipe.updateRef(true); - }, - else => {}, - } - } - - pub fn unref(this: *Readable) void { - switch (this.*) { - .pipe => { - this.pipe.updateRef(false); - }, - else => {}, - } - } - - pub fn init(stdio: Stdio, event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, allocator: std.mem.Allocator, max_size: ?*MaxBuf, is_sync: bool) Readable { - _ = allocator; // autofix - _ = is_sync; // autofix - assertStdioResult(result); - - if (comptime Environment.isPosix) { - if (stdio == .pipe) { - _ = bun.sys.setNonblocking(result.?); - } - } - - return switch (stdio) { - .inherit => Readable{ .inherit = {} }, - .ignore, .ipc, .path => Readable{ .ignore = {} }, - .fd => |fd| if (Environment.isPosix) Readable{ .fd = result.? } else Readable{ .fd = fd }, - .memfd => if (Environment.isPosix) Readable{ .memfd = stdio.memfd } else Readable{ .ignore = {} }, - .dup2 => |dup2| if (Environment.isPosix) Output.panic("TODO: implement dup2 support in Stdio readable", .{}) else Readable{ .fd = dup2.out.toFd() }, - .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) }, - .array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), - .capture => Output.panic("TODO: implement capture support in Stdio readable", .{}), - .readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately - }; - } - - pub fn onClose(this: *Readable, _: ?bun.sys.Error) void { - this.* = .closed; - } - - pub fn onReady(_: *Readable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {} - - pub fn onStart(_: *Readable) void {} - - pub fn close(this: *Readable) void { - switch (this.*) { - .memfd => |fd| { - this.* = .{ .closed = {} }; - fd.close(); - }, - .fd => |_| { - this.* = .{ .closed = {} }; - }, - .pipe => { - this.pipe.close(); - }, - else => {}, - } - } - - pub fn finalize(this: *Readable) void { - switch (this.*) { - .memfd => |fd| { - this.* = .{ .closed = {} }; - fd.close(); - }, - .fd => { - this.* = .{ .closed = {} }; - }, - .pipe => |pipe| { - defer pipe.detach(); - this.* = .{ .closed = {} }; - }, - .buffer => |*buf| { - buf.deinit(bun.default_allocator); - }, - else => {}, - } - } - - pub fn toJS(this: *Readable, globalThis: *jsc.JSGlobalObject, exited: bool) bun.JSError!JSValue { - _ = exited; // autofix - switch (this.*) { - // should only be reachable when the entire output is buffered. - .memfd => return this.toBufferedValue(globalThis), - - .fd => |fd| { - return fd.toJS(globalThis); - }, - .pipe => |pipe| { - defer pipe.detach(); - this.* = .{ .closed = {} }; - return pipe.toJS(globalThis); - }, - .buffer => |*buffer| { - defer this.* = .{ .closed = {} }; - - if (buffer.length() == 0) { - return jsc.WebCore.ReadableStream.empty(globalThis); - } - - const own = try buffer.takeSlice(bun.default_allocator); - return jsc.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0); - }, - else => { - return .js_undefined; - }, - } - } - - pub fn toBufferedValue(this: *Readable, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue { - switch (this.*) { - .fd => |fd| { - return fd.toJS(globalThis); - }, - .memfd => |fd| { - if (comptime !Environment.isPosix) { - Output.panic("memfd is only supported on Linux", .{}); - } - this.* = .{ .closed = {} }; - return jsc.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis); - }, - .pipe => |pipe| { - defer pipe.detach(); - this.* = .{ .closed = {} }; - return pipe.toBuffer(globalThis); - }, - .buffer => |*buf| { - defer this.* = .{ .closed = {} }; - const own = buf.takeSlice(bun.default_allocator) catch { - return globalThis.throwOutOfMemory(); - }; - - return jsc.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); - }, - else => { - return .js_undefined; - }, - } - } -}; +pub const PipeReader = @import("./subprocess/SubprocessPipeReader.zig"); +pub const Readable = @import("./subprocess/Readable.zig").Readable; pub fn getStderr(this: *Subprocess, globalThis: *JSGlobalObject) bun.JSError!JSValue { this.observable_getters.insert(.stderr); @@ -810,670 +570,9 @@ pub const Source = union(enum) { } }; +pub const NewStaticPipeWriter = @import("./subprocess/StaticPipeWriter.zig").NewStaticPipeWriter; pub const StaticPipeWriter = NewStaticPipeWriter(Subprocess); -pub fn NewStaticPipeWriter(comptime ProcessType: type) type { - return struct { - const This = @This(); - - ref_count: WriterRefCount, - writer: IOWriter = .{}, - stdio_result: StdioResult, - source: Source = .{ .detached = {} }, - process: *ProcessType = undefined, - event_loop: jsc.EventLoopHandle, - buffer: []const u8 = "", - - // It seems there is a bug in the Zig compiler. We'll get back to this one later - const WriterRefCount = bun.ptr.RefCount(@This(), "ref_count", _deinit, .{}); - pub const ref = WriterRefCount.ref; - pub const deref = WriterRefCount.deref; - - const print = bun.Output.scoped(.StaticPipeWriter, .visible); - - pub const IOWriter = bun.io.BufferedWriter(@This(), struct { - pub const onWritable = null; - pub const getBuffer = This.getBuffer; - pub const onClose = This.onClose; - pub const onError = This.onError; - pub const onWrite = This.onWrite; - }); - pub const Poll = IOWriter; - - pub fn updateRef(this: *This, add: bool) void { - this.writer.updateRef(this.event_loop, add); - } - - pub fn getBuffer(this: *This) []const u8 { - return this.buffer; - } - - pub fn close(this: *This) void { - log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)}); - this.writer.close(); - } - - pub fn flush(this: *This) void { - if (this.buffer.len > 0) - this.writer.write(); - } - - pub fn create(event_loop: anytype, subprocess: *ProcessType, result: StdioResult, source: Source) *This { - const this = bun.new(This, .{ - .ref_count = .init(), - .event_loop = jsc.EventLoopHandle.init(event_loop), - .process = subprocess, - .stdio_result = result, - .source = source, - }); - if (Environment.isWindows) { - this.writer.setPipe(this.stdio_result.buffer); - } - this.writer.setParent(this); - return this; - } - - pub fn start(this: *This) bun.sys.Maybe(void) { - log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)}); - this.ref(); - this.buffer = this.source.slice(); - if (Environment.isWindows) { - return this.writer.startWithCurrentPipe(); - } - switch (this.writer.start(this.stdio_result.?, true)) { - .err => |err| { - return .{ .err = err }; - }, - .result => { - if (comptime Environment.isPosix) { - const poll = this.writer.handle.poll; - poll.flags.insert(.socket); - } - - return .success; - }, - } - } - - pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void { - log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status }); - this.buffer = this.buffer[@min(amount, this.buffer.len)..]; - if (status == .end_of_file or this.buffer.len == 0) { - this.writer.close(); - } - } - - pub fn onError(this: *This, err: bun.sys.Error) void { - log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err }); - this.source.detach(); - } - - pub fn onClose(this: *This) void { - log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)}); - this.source.detach(); - this.process.onCloseIO(.stdin); - } - - fn _deinit(this: *This) void { - this.writer.end(); - this.source.detach(); - bun.destroy(this); - } - - pub fn memoryCost(this: *const This) usize { - return @sizeOf(@This()) + this.source.memoryCost() + this.writer.memoryCost(); - } - - pub fn loop(this: *This) *uws.Loop { - return this.event_loop.loop(); - } - - pub fn watch(this: *This) void { - if (this.buffer.len > 0) { - this.writer.watch(); - } - } - - pub fn eventLoop(this: *This) jsc.EventLoopHandle { - return this.event_loop; - } - }; -} - -pub const PipeReader = struct { - const RefCount = bun.ptr.RefCount(@This(), "ref_count", PipeReader.deinit, .{}); - pub const ref = PipeReader.RefCount.ref; - pub const deref = PipeReader.RefCount.deref; - - reader: IOReader = undefined, - process: ?*Subprocess = null, - event_loop: *jsc.EventLoop = undefined, - ref_count: PipeReader.RefCount, - state: union(enum) { - pending: void, - done: []u8, - err: bun.sys.Error, - } = .{ .pending = {} }, - stdio_result: StdioResult, - pub const IOReader = bun.io.BufferedReader; - pub const Poll = IOReader; - - pub fn memoryCost(this: *const PipeReader) usize { - return this.reader.memoryCost(); - } - - pub fn hasPendingActivity(this: *const PipeReader) bool { - if (this.state == .pending) - return true; - - return this.reader.hasPendingActivity(); - } - - pub fn detach(this: *PipeReader) void { - this.process = null; - this.deref(); - } - - pub fn create(event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, limit: ?*MaxBuf) *PipeReader { - var this = bun.new(PipeReader, .{ - .ref_count = .init(), - .process = process, - .reader = IOReader.init(@This()), - .event_loop = event_loop, - .stdio_result = result, - }); - MaxBuf.addToPipereader(limit, &this.reader.maxbuf); - if (Environment.isWindows) { - this.reader.source = .{ .pipe = this.stdio_result.buffer }; - } - - this.reader.setParent(this); - return this; - } - - pub fn readAll(this: *PipeReader) void { - if (this.state == .pending) - this.reader.read(); - } - - pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *jsc.EventLoop) bun.sys.Maybe(void) { - this.ref(); - this.process = process; - this.event_loop = event_loop; - if (Environment.isWindows) { - return this.reader.startWithCurrentPipe(); - } - - switch (this.reader.start(this.stdio_result.?, true)) { - .err => |err| { - return .{ .err = err }; - }, - .result => { - if (comptime Environment.isPosix) { - const poll = this.reader.handle.poll; - poll.flags.insert(.socket); - this.reader.flags.socket = true; - this.reader.flags.nonblocking = true; - this.reader.flags.pollable = true; - poll.flags.insert(.nonblocking); - } - - return .success; - }, - } - } - - pub const toJS = toReadableStream; - - pub fn onReaderDone(this: *PipeReader) void { - const owned = this.toOwnedSlice(); - this.state = .{ .done = owned }; - if (this.process) |process| { - this.process = null; - process.onCloseIO(this.kind(process)); - this.deref(); - } - } - - pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind { - if (process.stdout == .pipe and process.stdout.pipe == reader) { - return .stdout; - } - - if (process.stderr == .pipe and process.stderr.pipe == reader) { - return .stderr; - } - - @panic("We should be either stdout or stderr"); - } - - pub fn toOwnedSlice(this: *PipeReader) []u8 { - if (this.state == .done) { - return this.state.done; - } - // we do not use .toOwnedSlice() because we don't want to reallocate memory. - const out = this.reader._buffer; - this.reader._buffer.items = &.{}; - this.reader._buffer.capacity = 0; - - if (out.capacity > 0 and out.items.len == 0) { - out.deinit(); - return &.{}; - } - - return out.items; - } - - pub fn updateRef(this: *PipeReader, add: bool) void { - this.reader.updateRef(add); - } - - pub fn watch(this: *PipeReader) void { - if (!this.reader.isDone()) - this.reader.watch(); - } - - pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue { - defer this.detach(); - - switch (this.state) { - .pending => { - const stream = jsc.WebCore.ReadableStream.fromPipe(globalObject, this, &this.reader); - this.state = .{ .done = &.{} }; - return stream; - }, - .done => |bytes| { - this.state = .{ .done = &.{} }; - return jsc.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0); - }, - .err => |err| { - _ = err; - const empty = try jsc.WebCore.ReadableStream.empty(globalObject); - jsc.WebCore.ReadableStream.cancel(&(try jsc.WebCore.ReadableStream.fromJS(empty, globalObject)).?, globalObject); - return empty; - }, - } - } - - pub fn toBuffer(this: *PipeReader, globalThis: *jsc.JSGlobalObject) jsc.JSValue { - switch (this.state) { - .done => |bytes| { - defer this.state = .{ .done = &.{} }; - return jsc.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); - }, - else => { - return .js_undefined; - }, - } - } - - pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void { - if (this.state == .done) { - bun.default_allocator.free(this.state.done); - } - this.state = .{ .err = err }; - if (this.process) |process| - process.onCloseIO(this.kind(process)); - } - - pub fn close(this: *PipeReader) void { - switch (this.state) { - .pending => { - this.reader.close(); - }, - .done => {}, - .err => {}, - } - } - - pub fn eventLoop(this: *PipeReader) *jsc.EventLoop { - return this.event_loop; - } - - pub fn loop(this: *PipeReader) *uws.Loop { - return this.event_loop.virtual_machine.uwsLoop(); - } - - fn deinit(this: *PipeReader) void { - if (comptime Environment.isPosix) { - bun.assert(this.reader.isDone()); - } - - if (comptime Environment.isWindows) { - bun.assert(this.reader.source == null or this.reader.source.?.isClosed()); - } - - if (this.state == .done) { - bun.default_allocator.free(this.state.done); - } - - this.reader.deinit(); - bun.destroy(this); - } -}; - -const Writable = union(enum) { - pipe: *jsc.WebCore.FileSink, - fd: bun.FileDescriptor, - buffer: *StaticPipeWriter, - memfd: bun.FileDescriptor, - inherit: void, - ignore: void, - - pub fn memoryCost(this: *const Writable) usize { - return switch (this.*) { - .pipe => |pipe| pipe.memoryCost(), - .buffer => |buffer| buffer.memoryCost(), - // TODO: memfd - else => 0, - }; - } - - pub fn hasPendingActivity(this: *const Writable) bool { - return switch (this.*) { - .pipe => false, - - // we mark them as .ignore when they are closed, so this must be true - .buffer => true, - else => false, - }; - } - - pub fn ref(this: *Writable) void { - switch (this.*) { - .pipe => { - this.pipe.updateRef(true); - }, - .buffer => { - this.buffer.updateRef(true); - }, - else => {}, - } - } - - pub fn unref(this: *Writable) void { - switch (this.*) { - .pipe => { - this.pipe.updateRef(false); - }, - .buffer => { - this.buffer.updateRef(false); - }, - else => {}, - } - } - - // When the stream has closed we need to be notified to prevent a use-after-free - // We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice - pub fn onClose(this: *Writable, _: ?bun.sys.Error) void { - const process: *Subprocess = @fieldParentPtr("stdin", this); - - if (process.this_jsvalue != .zero) { - if (js.stdinGetCached(process.this_jsvalue)) |existing_value| { - jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0); - } - } - - switch (this.*) { - .buffer => { - this.buffer.deref(); - }, - .pipe => { - this.pipe.deref(); - }, - else => {}, - } - - process.onStdinDestroyed(); - - this.* = .{ - .ignore = {}, - }; - } - pub fn onReady(_: *Writable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {} - pub fn onStart(_: *Writable) void {} - - pub fn init( - stdio: *Stdio, - event_loop: *jsc.EventLoop, - subprocess: *Subprocess, - result: StdioResult, - promise_for_stream: *jsc.JSValue, - ) !Writable { - assertStdioResult(result); - - if (Environment.isWindows) { - switch (stdio.*) { - .pipe, .readable_stream => { - if (result == .buffer) { - const pipe = jsc.WebCore.FileSink.createWithPipe(event_loop, result.buffer); - - switch (pipe.writer.startWithCurrentPipe()) { - .result => {}, - .err => |err| { - _ = err; // autofix - pipe.deref(); - if (stdio.* == .readable_stream) { - stdio.readable_stream.cancel(event_loop.global); - } - return error.UnexpectedCreatingStdin; - }, - } - pipe.writer.setParent(pipe); - subprocess.weak_file_sink_stdin_ptr = pipe; - subprocess.ref(); - subprocess.flags.deref_on_stdin_destroyed = true; - subprocess.flags.has_stdin_destructor_called = false; - - if (stdio.* == .readable_stream) { - const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global); - if (assign_result.toError()) |err| { - pipe.deref(); - subprocess.deref(); - return event_loop.global.throwValue(err); - } - promise_for_stream.* = assign_result; - } - - return Writable{ - .pipe = pipe, - }; - } - return Writable{ .inherit = {} }; - }, - - .blob => |blob| { - return Writable{ - .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }), - }; - }, - .array_buffer => |array_buffer| { - return Writable{ - .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }), - }; - }, - .fd => |fd| { - return Writable{ .fd = fd }; - }, - .dup2 => |dup2| { - return Writable{ .fd = dup2.to.toFd() }; - }, - .inherit => { - return Writable{ .inherit = {} }; - }, - .memfd, .path, .ignore => { - return Writable{ .ignore = {} }; - }, - .ipc, .capture => { - return Writable{ .ignore = {} }; - }, - } - } - - if (comptime Environment.isPosix) { - if (stdio.* == .pipe) { - _ = bun.sys.setNonblocking(result.?); - } - } - - switch (stdio.*) { - .dup2 => @panic("TODO dup2 stdio"), - .pipe, .readable_stream => { - const pipe = jsc.WebCore.FileSink.create(event_loop, result.?); - - switch (pipe.writer.start(pipe.fd, true)) { - .result => {}, - .err => |err| { - _ = err; // autofix - pipe.deref(); - if (stdio.* == .readable_stream) { - stdio.readable_stream.cancel(event_loop.global); - } - - return error.UnexpectedCreatingStdin; - }, - } - - pipe.writer.handle.poll.flags.insert(.socket); - - subprocess.weak_file_sink_stdin_ptr = pipe; - subprocess.ref(); - subprocess.flags.has_stdin_destructor_called = false; - subprocess.flags.deref_on_stdin_destroyed = true; - - if (stdio.* == .readable_stream) { - const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global); - if (assign_result.toError()) |err| { - pipe.deref(); - subprocess.deref(); - return event_loop.global.throwValue(err); - } - promise_for_stream.* = assign_result; - } - - return Writable{ - .pipe = pipe, - }; - }, - - .blob => |blob| { - return Writable{ - .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }), - }; - }, - .array_buffer => |array_buffer| { - return Writable{ - .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }), - }; - }, - .memfd => |memfd| { - bun.assert(memfd != bun.invalid_fd); - return Writable{ .memfd = memfd }; - }, - .fd => { - return Writable{ .fd = result.? }; - }, - .inherit => { - return Writable{ .inherit = {} }; - }, - .path, .ignore => { - return Writable{ .ignore = {} }; - }, - .ipc, .capture => { - return Writable{ .ignore = {} }; - }, - } - } - - pub fn toJS(this: *Writable, globalThis: *jsc.JSGlobalObject, subprocess: *Subprocess) JSValue { - return switch (this.*) { - .fd => |fd| fd.toJS(globalThis), - .memfd, .ignore => .js_undefined, - .buffer, .inherit => .js_undefined, - .pipe => |pipe| { - this.* = .{ .ignore = {} }; - if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) { - // onAttachedProcessExit() can call deref on the - // subprocess. Since we never called ref(), it would be - // unbalanced to do so, leading to a use-after-free. - // So, let's not do that. - // https://github.com/oven-sh/bun/pull/14092 - bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed); - const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0; - pipe.onAttachedProcessExit(&subprocess.process.status); - if (Environment.isDebug) { - bun.debugAssert(subprocess.ref_count.get() == debug_ref_count.get()); - } - return pipe.toJS(globalThis); - } else { - subprocess.flags.has_stdin_destructor_called = false; - subprocess.weak_file_sink_stdin_ptr = pipe; - subprocess.ref(); - subprocess.flags.deref_on_stdin_destroyed = true; - if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) { - pipe.signal.clear(); - } - return pipe.toJSWithDestructor( - globalThis, - jsc.WebCore.Sink.DestructorPtr.init(subprocess), - ); - } - }, - }; - } - - pub fn finalize(this: *Writable) void { - const subprocess: *Subprocess = @fieldParentPtr("stdin", this); - if (subprocess.this_jsvalue != .zero) { - if (jsc.Codegen.JSSubprocess.stdinGetCached(subprocess.this_jsvalue)) |existing_value| { - jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0); - } - } - - return switch (this.*) { - .pipe => |pipe| { - if (pipe.signal.ptr == @as(*anyopaque, @ptrCast(this))) { - pipe.signal.clear(); - } - - pipe.deref(); - - this.* = .{ .ignore = {} }; - }, - .buffer => { - this.buffer.updateRef(false); - this.buffer.deref(); - }, - .memfd => |fd| { - fd.close(); - this.* = .{ .ignore = {} }; - }, - .ignore => {}, - .fd, .inherit => {}, - }; - } - - pub fn close(this: *Writable) void { - switch (this.*) { - .pipe => |pipe| { - _ = pipe.end(null); - }, - .memfd => |fd| { - fd.close(); - this.* = .{ .ignore = {} }; - }, - .fd => { - this.* = .{ .ignore = {} }; - }, - .buffer => { - this.buffer.close(); - }, - .ignore => {}, - .inherit => {}, - } - } -}; - pub fn memoryCost(this: *const Subprocess) usize { return @sizeOf(@This()) + this.process.memoryCost() + @@ -2618,7 +1717,7 @@ pub fn spawnMaybeSync( const exitCode = subprocess.getExitCode(globalThis); const stdout = try subprocess.stdout.toBufferedValue(globalThis); const stderr = try subprocess.stderr.toBufferedValue(globalThis); - const resource_usage: JSValue = if (!globalThis.hasException()) subprocess.createResourceUsageObject(globalThis) else .zero; + const resource_usage: JSValue = if (!globalThis.hasException()) try subprocess.createResourceUsageObject(globalThis) else .zero; const exitedDueToTimeout = subprocess.event_loop_timer.state == .FIRED; const exitedDueToMaxBuffer = subprocess.exited_due_to_maxbuf; const resultPid = jsc.JSValue.jsNumberFromInt32(subprocess.pid()); @@ -2717,7 +1816,8 @@ pub fn getGlobalThis(this: *Subprocess) ?*jsc.JSGlobalObject { const IPClog = Output.scoped(.IPC, .visible); -const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor; +pub const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor; +pub const Writable = @import("./subprocess/Writable.zig").Writable; pub const MaxBuf = bun.io.MaxBuf; diff --git a/src/bun.js/api/bun/subprocess/Readable.zig b/src/bun.js/api/bun/subprocess/Readable.zig new file mode 100644 index 0000000000..50108c84f6 --- /dev/null +++ b/src/bun.js/api/bun/subprocess/Readable.zig @@ -0,0 +1,195 @@ +pub const Readable = union(enum) { + fd: bun.FileDescriptor, + memfd: bun.FileDescriptor, + pipe: *PipeReader, + inherit: void, + ignore: void, + closed: void, + /// Eventually we will implement Readables created from blobs and array buffers. + /// When we do that, `buffer` will be borrowed from those objects. + /// + /// When a buffered `pipe` finishes reading from its file descriptor, + /// the owning `Readable` will be convered into this variant and the pipe's + /// buffer will be taken as an owned `CowString`. + buffer: CowString, + + pub fn memoryCost(this: *const Readable) usize { + return switch (this.*) { + .pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(), + .buffer => this.buffer.length(), + else => 0, + }; + } + + pub fn hasPendingActivity(this: *const Readable) bool { + return switch (this.*) { + .pipe => this.pipe.hasPendingActivity(), + else => false, + }; + } + + pub fn ref(this: *Readable) void { + switch (this.*) { + .pipe => { + this.pipe.updateRef(true); + }, + else => {}, + } + } + + pub fn unref(this: *Readable) void { + switch (this.*) { + .pipe => { + this.pipe.updateRef(false); + }, + else => {}, + } + } + + pub fn init(stdio: Stdio, event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, allocator: std.mem.Allocator, max_size: ?*MaxBuf, is_sync: bool) Readable { + _ = allocator; // autofix + _ = is_sync; // autofix + Subprocess.assertStdioResult(result); + + if (comptime Environment.isPosix) { + if (stdio == .pipe) { + _ = bun.sys.setNonblocking(result.?); + } + } + + return switch (stdio) { + .inherit => Readable{ .inherit = {} }, + .ignore, .ipc, .path => Readable{ .ignore = {} }, + .fd => |fd| if (Environment.isPosix) Readable{ .fd = result.? } else Readable{ .fd = fd }, + .memfd => if (Environment.isPosix) Readable{ .memfd = stdio.memfd } else Readable{ .ignore = {} }, + .dup2 => |dup2| if (Environment.isPosix) Output.panic("TODO: implement dup2 support in Stdio readable", .{}) else Readable{ .fd = dup2.out.toFd() }, + .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) }, + .array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), + .capture => Output.panic("TODO: implement capture support in Stdio readable", .{}), + .readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately + }; + } + + pub fn onClose(this: *Readable, _: ?bun.sys.Error) void { + this.* = .closed; + } + + pub fn onReady(_: *Readable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {} + + pub fn onStart(_: *Readable) void {} + + pub fn close(this: *Readable) void { + switch (this.*) { + .memfd => |fd| { + this.* = .{ .closed = {} }; + fd.close(); + }, + .fd => |_| { + this.* = .{ .closed = {} }; + }, + .pipe => { + this.pipe.close(); + }, + else => {}, + } + } + + pub fn finalize(this: *Readable) void { + switch (this.*) { + .memfd => |fd| { + this.* = .{ .closed = {} }; + fd.close(); + }, + .fd => { + this.* = .{ .closed = {} }; + }, + .pipe => |pipe| { + defer pipe.detach(); + this.* = .{ .closed = {} }; + }, + .buffer => |*buf| { + buf.deinit(bun.default_allocator); + }, + else => {}, + } + } + + pub fn toJS(this: *Readable, globalThis: *jsc.JSGlobalObject, exited: bool) bun.JSError!JSValue { + _ = exited; // autofix + switch (this.*) { + // should only be reachable when the entire output is buffered. + .memfd => return this.toBufferedValue(globalThis), + + .fd => |fd| { + return fd.toJS(globalThis); + }, + .pipe => |pipe| { + defer pipe.detach(); + this.* = .{ .closed = {} }; + return pipe.toJS(globalThis); + }, + .buffer => |*buffer| { + defer this.* = .{ .closed = {} }; + + if (buffer.length() == 0) { + return jsc.WebCore.ReadableStream.empty(globalThis); + } + + const own = try buffer.takeSlice(bun.default_allocator); + return jsc.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0); + }, + else => { + return .js_undefined; + }, + } + } + + pub fn toBufferedValue(this: *Readable, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue { + switch (this.*) { + .fd => |fd| { + return fd.toJS(globalThis); + }, + .memfd => |fd| { + if (comptime !Environment.isPosix) { + Output.panic("memfd is only supported on Linux", .{}); + } + this.* = .{ .closed = {} }; + return jsc.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis); + }, + .pipe => |pipe| { + defer pipe.detach(); + this.* = .{ .closed = {} }; + return pipe.toBuffer(globalThis); + }, + .buffer => |*buf| { + defer this.* = .{ .closed = {} }; + const own = buf.takeSlice(bun.default_allocator) catch { + return globalThis.throwOutOfMemory(); + }; + + return jsc.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); + }, + else => { + return .js_undefined; + }, + } + } +}; + +const std = @import("std"); + +const bun = @import("bun"); +const Environment = bun.Environment; +const Output = bun.Output; +const default_allocator = bun.default_allocator; +const CowString = bun.ptr.CowString; +const Stdio = bun.spawn.Stdio; + +const jsc = bun.jsc; +const JSGlobalObject = jsc.JSGlobalObject; +const JSValue = jsc.JSValue; + +const Subprocess = jsc.API.Subprocess; +const MaxBuf = Subprocess.MaxBuf; +const PipeReader = Subprocess.PipeReader; +const StdioResult = Subprocess.StdioResult; diff --git a/src/bun.js/api/bun/subprocess/ResourceUsage.zig b/src/bun.js/api/bun/subprocess/ResourceUsage.zig new file mode 100644 index 0000000000..cc3762ca09 --- /dev/null +++ b/src/bun.js/api/bun/subprocess/ResourceUsage.zig @@ -0,0 +1,75 @@ +const ResourceUsage = @This(); + +pub const js = jsc.Codegen.JSResourceUsage; +pub const toJS = ResourceUsage.js.toJS; +pub const fromJS = ResourceUsage.js.fromJS; +pub const fromJSDirect = ResourceUsage.js.fromJSDirect; + +rusage: Rusage, + +pub fn create(rusage: *const Rusage, globalObject: *JSGlobalObject) bun.JSError!JSValue { + return bun.new(ResourceUsage, .{ .rusage = rusage.* }).toJS(globalObject); +} + +pub fn getCPUTime(this: *ResourceUsage, globalObject: *JSGlobalObject) bun.JSError!JSValue { + var cpu = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); + const rusage = this.rusage; + + const usrTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.utime.usec, rusage.utime.sec); + const sysTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.stime.usec, rusage.stime.sec); + + cpu.put(globalObject, jsc.ZigString.static("user"), usrTime); + cpu.put(globalObject, jsc.ZigString.static("system"), sysTime); + cpu.put(globalObject, jsc.ZigString.static("total"), JSValue.bigIntSum(globalObject, usrTime, sysTime)); + + return cpu; +} + +pub fn getMaxRSS(this: *ResourceUsage, _: *JSGlobalObject) JSValue { + return jsc.JSValue.jsNumber(this.rusage.maxrss); +} + +pub fn getSharedMemorySize(this: *ResourceUsage, _: *JSGlobalObject) JSValue { + return jsc.JSValue.jsNumber(this.rusage.ixrss); +} + +pub fn getSwapCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue { + return jsc.JSValue.jsNumber(this.rusage.nswap); +} + +pub fn getOps(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue { + var ops = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); + ops.put(globalObject, jsc.ZigString.static("in"), jsc.JSValue.jsNumber(this.rusage.inblock)); + ops.put(globalObject, jsc.ZigString.static("out"), jsc.JSValue.jsNumber(this.rusage.oublock)); + return ops; +} + +pub fn getMessages(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue { + var msgs = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); + msgs.put(globalObject, jsc.ZigString.static("sent"), jsc.JSValue.jsNumber(this.rusage.msgsnd)); + msgs.put(globalObject, jsc.ZigString.static("received"), jsc.JSValue.jsNumber(this.rusage.msgrcv)); + return msgs; +} + +pub fn getSignalCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue { + return jsc.JSValue.jsNumber(this.rusage.nsignals); +} + +pub fn getContextSwitches(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue { + var ctx = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject); + ctx.put(globalObject, jsc.ZigString.static("voluntary"), jsc.JSValue.jsNumber(this.rusage.nvcsw)); + ctx.put(globalObject, jsc.ZigString.static("involuntary"), jsc.JSValue.jsNumber(this.rusage.nivcsw)); + return ctx; +} + +pub fn finalize(this: *ResourceUsage) callconv(.C) void { + bun.default_allocator.destroy(this); +} + +const bun = @import("bun"); +const default_allocator = bun.default_allocator; +const Rusage = bun.spawn.Rusage; + +const jsc = bun.jsc; +const JSGlobalObject = jsc.JSGlobalObject; +const JSValue = jsc.JSValue; diff --git a/src/bun.js/api/bun/subprocess/StaticPipeWriter.zig b/src/bun.js/api/bun/subprocess/StaticPipeWriter.zig new file mode 100644 index 0000000000..4679de7699 --- /dev/null +++ b/src/bun.js/api/bun/subprocess/StaticPipeWriter.zig @@ -0,0 +1,139 @@ +pub fn NewStaticPipeWriter(comptime ProcessType: type) type { + return struct { + const This = @This(); + + ref_count: WriterRefCount, + writer: IOWriter = .{}, + stdio_result: StdioResult, + source: Source = .{ .detached = {} }, + process: *ProcessType = undefined, + event_loop: jsc.EventLoopHandle, + buffer: []const u8 = "", + + // It seems there is a bug in the Zig compiler. We'll get back to this one later + const WriterRefCount = bun.ptr.RefCount(@This(), "ref_count", _deinit, .{}); + pub const ref = WriterRefCount.ref; + pub const deref = WriterRefCount.deref; + + const print = bun.Output.scoped(.StaticPipeWriter, .visible); + + pub const IOWriter = bun.io.BufferedWriter(@This(), struct { + pub const onWritable = null; + pub const getBuffer = This.getBuffer; + pub const onClose = This.onClose; + pub const onError = This.onError; + pub const onWrite = This.onWrite; + }); + pub const Poll = IOWriter; + + pub fn updateRef(this: *This, add: bool) void { + this.writer.updateRef(this.event_loop, add); + } + + pub fn getBuffer(this: *This) []const u8 { + return this.buffer; + } + + pub fn close(this: *This) void { + log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)}); + this.writer.close(); + } + + pub fn flush(this: *This) void { + if (this.buffer.len > 0) + this.writer.write(); + } + + pub fn create(event_loop: anytype, subprocess: *ProcessType, result: StdioResult, source: Source) *This { + const this = bun.new(This, .{ + .ref_count = .init(), + .event_loop = jsc.EventLoopHandle.init(event_loop), + .process = subprocess, + .stdio_result = result, + .source = source, + }); + if (Environment.isWindows) { + this.writer.setPipe(this.stdio_result.buffer); + } + this.writer.setParent(this); + return this; + } + + pub fn start(this: *This) bun.sys.Maybe(void) { + log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)}); + this.ref(); + this.buffer = this.source.slice(); + if (Environment.isWindows) { + return this.writer.startWithCurrentPipe(); + } + switch (this.writer.start(this.stdio_result.?, true)) { + .err => |err| { + return .{ .err = err }; + }, + .result => { + if (comptime Environment.isPosix) { + const poll = this.writer.handle.poll; + poll.flags.insert(.socket); + } + + return .success; + }, + } + } + + pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void { + log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status }); + this.buffer = this.buffer[@min(amount, this.buffer.len)..]; + if (status == .end_of_file or this.buffer.len == 0) { + this.writer.close(); + } + } + + pub fn onError(this: *This, err: bun.sys.Error) void { + log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err }); + this.source.detach(); + } + + pub fn onClose(this: *This) void { + log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)}); + this.source.detach(); + this.process.onCloseIO(.stdin); + } + + fn _deinit(this: *This) void { + this.writer.end(); + this.source.detach(); + bun.destroy(this); + } + + pub fn memoryCost(this: *const This) usize { + return @sizeOf(@This()) + this.source.memoryCost() + this.writer.memoryCost(); + } + + pub fn loop(this: *This) *uws.Loop { + return this.event_loop.loop(); + } + + pub fn watch(this: *This) void { + if (this.buffer.len > 0) { + this.writer.watch(); + } + } + + pub fn eventLoop(this: *This) jsc.EventLoopHandle { + return this.event_loop; + } + }; +} + +const log = Output.scoped(.StaticPipeWriter, .hidden); + +const bun = @import("bun"); +const Environment = bun.Environment; +const Output = bun.Output; +const jsc = bun.jsc; +const uws = bun.uws; + +const Subprocess = jsc.API.Subprocess; +const Source = Subprocess.Source; +const StdioResult = Subprocess.StdioResult; diff --git a/src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig b/src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig new file mode 100644 index 0000000000..2dde5e875b --- /dev/null +++ b/src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig @@ -0,0 +1,225 @@ +const PipeReader = @This(); + +const RefCount = bun.ptr.RefCount(@This(), "ref_count", PipeReader.deinit, .{}); +pub const ref = PipeReader.RefCount.ref; +pub const deref = PipeReader.RefCount.deref; + +reader: IOReader = undefined, +process: ?*Subprocess = null, +event_loop: *jsc.EventLoop = undefined, +ref_count: PipeReader.RefCount, +state: union(enum) { + pending: void, + done: []u8, + err: bun.sys.Error, +} = .{ .pending = {} }, +stdio_result: StdioResult, +pub const IOReader = bun.io.BufferedReader; +pub const Poll = IOReader; + +pub fn memoryCost(this: *const PipeReader) usize { + return this.reader.memoryCost(); +} + +pub fn hasPendingActivity(this: *const PipeReader) bool { + if (this.state == .pending) + return true; + + return this.reader.hasPendingActivity(); +} + +pub fn detach(this: *PipeReader) void { + this.process = null; + this.deref(); +} + +pub fn create(event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, limit: ?*MaxBuf) *PipeReader { + var this = bun.new(PipeReader, .{ + .ref_count = .init(), + .process = process, + .reader = IOReader.init(@This()), + .event_loop = event_loop, + .stdio_result = result, + }); + MaxBuf.addToPipereader(limit, &this.reader.maxbuf); + if (Environment.isWindows) { + this.reader.source = .{ .pipe = this.stdio_result.buffer }; + } + + this.reader.setParent(this); + return this; +} + +pub fn readAll(this: *PipeReader) void { + if (this.state == .pending) + this.reader.read(); +} + +pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *jsc.EventLoop) bun.sys.Maybe(void) { + this.ref(); + this.process = process; + this.event_loop = event_loop; + if (Environment.isWindows) { + return this.reader.startWithCurrentPipe(); + } + + switch (this.reader.start(this.stdio_result.?, true)) { + .err => |err| { + return .{ .err = err }; + }, + .result => { + if (comptime Environment.isPosix) { + const poll = this.reader.handle.poll; + poll.flags.insert(.socket); + this.reader.flags.socket = true; + this.reader.flags.nonblocking = true; + this.reader.flags.pollable = true; + poll.flags.insert(.nonblocking); + } + + return .success; + }, + } +} + +pub const toJS = toReadableStream; + +pub fn onReaderDone(this: *PipeReader) void { + const owned = this.toOwnedSlice(); + this.state = .{ .done = owned }; + if (this.process) |process| { + this.process = null; + process.onCloseIO(this.kind(process)); + this.deref(); + } +} + +pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind { + if (process.stdout == .pipe and process.stdout.pipe == reader) { + return .stdout; + } + + if (process.stderr == .pipe and process.stderr.pipe == reader) { + return .stderr; + } + + @panic("We should be either stdout or stderr"); +} + +pub fn toOwnedSlice(this: *PipeReader) []u8 { + if (this.state == .done) { + return this.state.done; + } + // we do not use .toOwnedSlice() because we don't want to reallocate memory. + const out = this.reader._buffer; + this.reader._buffer.items = &.{}; + this.reader._buffer.capacity = 0; + + if (out.capacity > 0 and out.items.len == 0) { + out.deinit(); + return &.{}; + } + + return out.items; +} + +pub fn updateRef(this: *PipeReader, add: bool) void { + this.reader.updateRef(add); +} + +pub fn watch(this: *PipeReader) void { + if (!this.reader.isDone()) + this.reader.watch(); +} + +pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue { + defer this.detach(); + + switch (this.state) { + .pending => { + const stream = jsc.WebCore.ReadableStream.fromPipe(globalObject, this, &this.reader); + this.state = .{ .done = &.{} }; + return stream; + }, + .done => |bytes| { + this.state = .{ .done = &.{} }; + return jsc.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0); + }, + .err => |err| { + _ = err; + const empty = try jsc.WebCore.ReadableStream.empty(globalObject); + jsc.WebCore.ReadableStream.cancel(&(try jsc.WebCore.ReadableStream.fromJS(empty, globalObject)).?, globalObject); + return empty; + }, + } +} + +pub fn toBuffer(this: *PipeReader, globalThis: *jsc.JSGlobalObject) jsc.JSValue { + switch (this.state) { + .done => |bytes| { + defer this.state = .{ .done = &.{} }; + return jsc.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); + }, + else => { + return .js_undefined; + }, + } +} + +pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void { + if (this.state == .done) { + bun.default_allocator.free(this.state.done); + } + this.state = .{ .err = err }; + if (this.process) |process| + process.onCloseIO(this.kind(process)); +} + +pub fn close(this: *PipeReader) void { + switch (this.state) { + .pending => { + this.reader.close(); + }, + .done => {}, + .err => {}, + } +} + +pub fn eventLoop(this: *PipeReader) *jsc.EventLoop { + return this.event_loop; +} + +pub fn loop(this: *PipeReader) *uws.Loop { + return this.event_loop.virtual_machine.uwsLoop(); +} + +fn deinit(this: *PipeReader) void { + if (comptime Environment.isPosix) { + bun.assert(this.reader.isDone()); + } + + if (comptime Environment.isWindows) { + bun.assert(this.reader.source == null or this.reader.source.?.isClosed()); + } + + if (this.state == .done) { + bun.default_allocator.free(this.state.done); + } + + this.reader.deinit(); + bun.destroy(this); +} + +const bun = @import("bun"); +const Environment = bun.Environment; +const default_allocator = bun.default_allocator; +const uws = bun.uws; + +const jsc = bun.jsc; +const JSGlobalObject = jsc.JSGlobalObject; +const JSValue = jsc.JSValue; + +const Subprocess = jsc.API.Subprocess; +const MaxBuf = Subprocess.MaxBuf; +const StdioKind = Subprocess.StdioKind; +const StdioResult = Subprocess.StdioResult; diff --git a/src/bun.js/api/bun/subprocess/Writable.zig b/src/bun.js/api/bun/subprocess/Writable.zig new file mode 100644 index 0000000000..47e61ec1b4 --- /dev/null +++ b/src/bun.js/api/bun/subprocess/Writable.zig @@ -0,0 +1,334 @@ +pub const Writable = union(enum) { + pipe: *jsc.WebCore.FileSink, + fd: bun.FileDescriptor, + buffer: *StaticPipeWriter, + memfd: bun.FileDescriptor, + inherit: void, + ignore: void, + + pub fn memoryCost(this: *const Writable) usize { + return switch (this.*) { + .pipe => |pipe| pipe.memoryCost(), + .buffer => |buffer| buffer.memoryCost(), + // TODO: memfd + else => 0, + }; + } + + pub fn hasPendingActivity(this: *const Writable) bool { + return switch (this.*) { + .pipe => false, + + // we mark them as .ignore when they are closed, so this must be true + .buffer => true, + else => false, + }; + } + + pub fn ref(this: *Writable) void { + switch (this.*) { + .pipe => { + this.pipe.updateRef(true); + }, + .buffer => { + this.buffer.updateRef(true); + }, + else => {}, + } + } + + pub fn unref(this: *Writable) void { + switch (this.*) { + .pipe => { + this.pipe.updateRef(false); + }, + .buffer => { + this.buffer.updateRef(false); + }, + else => {}, + } + } + + // When the stream has closed we need to be notified to prevent a use-after-free + // We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice + pub fn onClose(this: *Writable, _: ?bun.sys.Error) void { + const process: *Subprocess = @fieldParentPtr("stdin", this); + + if (process.this_jsvalue != .zero) { + if (js.stdinGetCached(process.this_jsvalue)) |existing_value| { + jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0); + } + } + + switch (this.*) { + .buffer => { + this.buffer.deref(); + }, + .pipe => { + this.pipe.deref(); + }, + else => {}, + } + + process.onStdinDestroyed(); + + this.* = .{ + .ignore = {}, + }; + } + pub fn onReady(_: *Writable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {} + pub fn onStart(_: *Writable) void {} + + pub fn init( + stdio: *Stdio, + event_loop: *jsc.EventLoop, + subprocess: *Subprocess, + result: StdioResult, + promise_for_stream: *jsc.JSValue, + ) !Writable { + Subprocess.assertStdioResult(result); + + if (Environment.isWindows) { + switch (stdio.*) { + .pipe, .readable_stream => { + if (result == .buffer) { + const pipe = jsc.WebCore.FileSink.createWithPipe(event_loop, result.buffer); + + switch (pipe.writer.startWithCurrentPipe()) { + .result => {}, + .err => |err| { + _ = err; // autofix + pipe.deref(); + if (stdio.* == .readable_stream) { + stdio.readable_stream.cancel(event_loop.global); + } + return error.UnexpectedCreatingStdin; + }, + } + pipe.writer.setParent(pipe); + subprocess.weak_file_sink_stdin_ptr = pipe; + subprocess.ref(); + subprocess.flags.deref_on_stdin_destroyed = true; + subprocess.flags.has_stdin_destructor_called = false; + + if (stdio.* == .readable_stream) { + const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global); + if (assign_result.toError()) |err| { + pipe.deref(); + subprocess.deref(); + return event_loop.global.throwValue(err); + } + promise_for_stream.* = assign_result; + } + + return Writable{ + .pipe = pipe, + }; + } + return Writable{ .inherit = {} }; + }, + + .blob => |blob| { + return Writable{ + .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }), + }; + }, + .array_buffer => |array_buffer| { + return Writable{ + .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }), + }; + }, + .fd => |fd| { + return Writable{ .fd = fd }; + }, + .dup2 => |dup2| { + return Writable{ .fd = dup2.to.toFd() }; + }, + .inherit => { + return Writable{ .inherit = {} }; + }, + .memfd, .path, .ignore => { + return Writable{ .ignore = {} }; + }, + .ipc, .capture => { + return Writable{ .ignore = {} }; + }, + } + } + + if (comptime Environment.isPosix) { + if (stdio.* == .pipe) { + _ = bun.sys.setNonblocking(result.?); + } + } + + switch (stdio.*) { + .dup2 => @panic("TODO dup2 stdio"), + .pipe, .readable_stream => { + const pipe = jsc.WebCore.FileSink.create(event_loop, result.?); + + switch (pipe.writer.start(pipe.fd, true)) { + .result => {}, + .err => |err| { + _ = err; // autofix + pipe.deref(); + if (stdio.* == .readable_stream) { + stdio.readable_stream.cancel(event_loop.global); + } + + return error.UnexpectedCreatingStdin; + }, + } + + pipe.writer.handle.poll.flags.insert(.socket); + + subprocess.weak_file_sink_stdin_ptr = pipe; + subprocess.ref(); + subprocess.flags.has_stdin_destructor_called = false; + subprocess.flags.deref_on_stdin_destroyed = true; + + if (stdio.* == .readable_stream) { + const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global); + if (assign_result.toError()) |err| { + pipe.deref(); + subprocess.deref(); + return event_loop.global.throwValue(err); + } + promise_for_stream.* = assign_result; + } + + return Writable{ + .pipe = pipe, + }; + }, + + .blob => |blob| { + return Writable{ + .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }), + }; + }, + .array_buffer => |array_buffer| { + return Writable{ + .buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }), + }; + }, + .memfd => |memfd| { + bun.assert(memfd != bun.invalid_fd); + return Writable{ .memfd = memfd }; + }, + .fd => { + return Writable{ .fd = result.? }; + }, + .inherit => { + return Writable{ .inherit = {} }; + }, + .path, .ignore => { + return Writable{ .ignore = {} }; + }, + .ipc, .capture => { + return Writable{ .ignore = {} }; + }, + } + } + + pub fn toJS(this: *Writable, globalThis: *jsc.JSGlobalObject, subprocess: *Subprocess) JSValue { + return switch (this.*) { + .fd => |fd| fd.toJS(globalThis), + .memfd, .ignore => .js_undefined, + .buffer, .inherit => .js_undefined, + .pipe => |pipe| { + this.* = .{ .ignore = {} }; + if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) { + // onAttachedProcessExit() can call deref on the + // subprocess. Since we never called ref(), it would be + // unbalanced to do so, leading to a use-after-free. + // So, let's not do that. + // https://github.com/oven-sh/bun/pull/14092 + bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed); + const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0; + pipe.onAttachedProcessExit(&subprocess.process.status); + if (Environment.isDebug) { + bun.debugAssert(subprocess.ref_count.get() == debug_ref_count.get()); + } + return pipe.toJS(globalThis); + } else { + subprocess.flags.has_stdin_destructor_called = false; + subprocess.weak_file_sink_stdin_ptr = pipe; + subprocess.ref(); + subprocess.flags.deref_on_stdin_destroyed = true; + if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) { + pipe.signal.clear(); + } + return pipe.toJSWithDestructor( + globalThis, + jsc.WebCore.Sink.DestructorPtr.init(subprocess), + ); + } + }, + }; + } + + pub fn finalize(this: *Writable) void { + const subprocess: *Subprocess = @fieldParentPtr("stdin", this); + if (subprocess.this_jsvalue != .zero) { + if (jsc.Codegen.JSSubprocess.stdinGetCached(subprocess.this_jsvalue)) |existing_value| { + jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0); + } + } + + return switch (this.*) { + .pipe => |pipe| { + if (pipe.signal.ptr == @as(*anyopaque, @ptrCast(this))) { + pipe.signal.clear(); + } + + pipe.deref(); + + this.* = .{ .ignore = {} }; + }, + .buffer => { + this.buffer.updateRef(false); + this.buffer.deref(); + }, + .memfd => |fd| { + fd.close(); + this.* = .{ .ignore = {} }; + }, + .ignore => {}, + .fd, .inherit => {}, + }; + } + + pub fn close(this: *Writable) void { + switch (this.*) { + .pipe => |pipe| { + _ = pipe.end(null); + }, + .memfd => |fd| { + fd.close(); + this.* = .{ .ignore = {} }; + }, + .fd => { + this.* = .{ .ignore = {} }; + }, + .buffer => { + this.buffer.close(); + }, + .ignore => {}, + .inherit => {}, + } + } +}; + +const bun = @import("bun"); +const Environment = bun.Environment; +const Stdio = bun.spawn.Stdio; + +const jsc = bun.jsc; +const JSGlobalObject = jsc.JSGlobalObject; +const JSValue = jsc.JSValue; + +const Subprocess = jsc.API.Subprocess; +const StaticPipeWriter = Subprocess.StaticPipeWriter; +const StdioResult = Subprocess.StdioResult; +const js = Subprocess.js;