Split subprocess into more files (#21842)

### What does this PR do?

Split subprocess into more files

### How did you verify your code works?

check

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2025-08-13 20:47:50 -07:00
committed by GitHub
parent 53b870af74
commit fac5e71a0c
7 changed files with 995 additions and 922 deletions

View File

@@ -98,6 +98,11 @@ src/bun.js/api/bun/spawn.zig
src/bun.js/api/bun/spawn/stdio.zig
src/bun.js/api/bun/ssl_wrapper.zig
src/bun.js/api/bun/subprocess.zig
src/bun.js/api/bun/subprocess/Readable.zig
src/bun.js/api/bun/subprocess/ResourceUsage.zig
src/bun.js/api/bun/subprocess/StaticPipeWriter.zig
src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig
src/bun.js/api/bun/subprocess/Writable.zig
src/bun.js/api/bun/udp_socket.zig
src/bun.js/api/bun/x509.zig
src/bun.js/api/BunObject.zig

View File

@@ -84,69 +84,7 @@ pub inline fn assertStdioResult(result: StdioResult) void {
}
}
pub const ResourceUsage = struct {
pub const js = jsc.Codegen.JSResourceUsage;
pub const toJS = ResourceUsage.js.toJS;
pub const fromJS = ResourceUsage.js.fromJS;
pub const fromJSDirect = ResourceUsage.js.fromJSDirect;
rusage: Rusage,
pub fn getCPUTime(this: *ResourceUsage, globalObject: *JSGlobalObject) bun.JSError!JSValue {
var cpu = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
const rusage = this.rusage;
const usrTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.utime.usec, rusage.utime.sec);
const sysTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.stime.usec, rusage.stime.sec);
cpu.put(globalObject, jsc.ZigString.static("user"), usrTime);
cpu.put(globalObject, jsc.ZigString.static("system"), sysTime);
cpu.put(globalObject, jsc.ZigString.static("total"), JSValue.bigIntSum(globalObject, usrTime, sysTime));
return cpu;
}
pub fn getMaxRSS(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.maxrss);
}
pub fn getSharedMemorySize(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.ixrss);
}
pub fn getSwapCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.nswap);
}
pub fn getOps(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
var ops = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
ops.put(globalObject, jsc.ZigString.static("in"), jsc.JSValue.jsNumber(this.rusage.inblock));
ops.put(globalObject, jsc.ZigString.static("out"), jsc.JSValue.jsNumber(this.rusage.oublock));
return ops;
}
pub fn getMessages(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
var msgs = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
msgs.put(globalObject, jsc.ZigString.static("sent"), jsc.JSValue.jsNumber(this.rusage.msgsnd));
msgs.put(globalObject, jsc.ZigString.static("received"), jsc.JSValue.jsNumber(this.rusage.msgrcv));
return msgs;
}
pub fn getSignalCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.nsignals);
}
pub fn getContextSwitches(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
var ctx = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
ctx.put(globalObject, jsc.ZigString.static("voluntary"), jsc.JSValue.jsNumber(this.rusage.nvcsw));
ctx.put(globalObject, jsc.ZigString.static("involuntary"), jsc.JSValue.jsNumber(this.rusage.nivcsw));
return ctx;
}
pub fn finalize(this: *ResourceUsage) callconv(.C) void {
bun.default_allocator.destroy(this);
}
};
pub const ResourceUsage = @import("./subprocess/ResourceUsage.zig");
pub fn appendEnvpFromJS(globalThis: *jsc.JSGlobalObject, object: *jsc.JSObject, envp: *std.ArrayList(?[*:0]const u8), PATH: *[]const u8) bun.JSError!void {
var object_iter = try jsc.JSPropertyIterator(.{ .skip_empty_name = false, .include_value = true }).init(globalThis, object);
@@ -207,27 +145,24 @@ pub fn resourceUsage(
return this.createResourceUsageObject(globalObject);
}
pub fn createResourceUsageObject(this: *Subprocess, globalObject: *JSGlobalObject) JSValue {
const pid_rusage = this.pid_rusage orelse brk: {
if (Environment.isWindows) {
if (this.process.poller == .uv) {
this.pid_rusage = PosixSpawn.process.uv_getrusage(&this.process.poller.uv);
break :brk this.pid_rusage.?;
pub fn createResourceUsageObject(this: *Subprocess, globalObject: *JSGlobalObject) bun.JSError!JSValue {
return ResourceUsage.create(
brk: {
if (this.pid_rusage != null) {
break :brk &this.pid_rusage.?;
}
}
return .js_undefined;
};
if (Environment.isWindows) {
if (this.process.poller == .uv) {
this.pid_rusage = PosixSpawn.process.uv_getrusage(&this.process.poller.uv);
break :brk &this.pid_rusage.?;
}
}
const resource_usage = ResourceUsage{
.rusage = pid_rusage,
};
var result = bun.default_allocator.create(ResourceUsage) catch {
return globalObject.throwOutOfMemoryValue();
};
result.* = resource_usage;
return result.toJS(globalObject);
return .js_undefined;
},
globalObject,
);
}
pub fn hasExited(this: *const Subprocess) bool {
@@ -357,183 +292,8 @@ pub fn constructor(globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSE
return globalObject.throw("Cannot construct Subprocess", .{});
}
const Readable = union(enum) {
fd: bun.FileDescriptor,
memfd: bun.FileDescriptor,
pipe: *PipeReader,
inherit: void,
ignore: void,
closed: void,
/// Eventually we will implement Readables created from blobs and array buffers.
/// When we do that, `buffer` will be borrowed from those objects.
///
/// When a buffered `pipe` finishes reading from its file descriptor,
/// the owning `Readable` will be convered into this variant and the pipe's
/// buffer will be taken as an owned `CowString`.
buffer: CowString,
pub fn memoryCost(this: *const Readable) usize {
return switch (this.*) {
.pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(),
.buffer => this.buffer.length(),
else => 0,
};
}
pub fn hasPendingActivity(this: *const Readable) bool {
return switch (this.*) {
.pipe => this.pipe.hasPendingActivity(),
else => false,
};
}
pub fn ref(this: *Readable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(true);
},
else => {},
}
}
pub fn unref(this: *Readable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(false);
},
else => {},
}
}
pub fn init(stdio: Stdio, event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, allocator: std.mem.Allocator, max_size: ?*MaxBuf, is_sync: bool) Readable {
_ = allocator; // autofix
_ = is_sync; // autofix
assertStdioResult(result);
if (comptime Environment.isPosix) {
if (stdio == .pipe) {
_ = bun.sys.setNonblocking(result.?);
}
}
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore, .ipc, .path => Readable{ .ignore = {} },
.fd => |fd| if (Environment.isPosix) Readable{ .fd = result.? } else Readable{ .fd = fd },
.memfd => if (Environment.isPosix) Readable{ .memfd = stdio.memfd } else Readable{ .ignore = {} },
.dup2 => |dup2| if (Environment.isPosix) Output.panic("TODO: implement dup2 support in Stdio readable", .{}) else Readable{ .fd = dup2.out.toFd() },
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) },
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
.capture => Output.panic("TODO: implement capture support in Stdio readable", .{}),
.readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately
};
}
pub fn onClose(this: *Readable, _: ?bun.sys.Error) void {
this.* = .closed;
}
pub fn onReady(_: *Readable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Readable) void {}
pub fn close(this: *Readable) void {
switch (this.*) {
.memfd => |fd| {
this.* = .{ .closed = {} };
fd.close();
},
.fd => |_| {
this.* = .{ .closed = {} };
},
.pipe => {
this.pipe.close();
},
else => {},
}
}
pub fn finalize(this: *Readable) void {
switch (this.*) {
.memfd => |fd| {
this.* = .{ .closed = {} };
fd.close();
},
.fd => {
this.* = .{ .closed = {} };
},
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
},
.buffer => |*buf| {
buf.deinit(bun.default_allocator);
},
else => {},
}
}
pub fn toJS(this: *Readable, globalThis: *jsc.JSGlobalObject, exited: bool) bun.JSError!JSValue {
_ = exited; // autofix
switch (this.*) {
// should only be reachable when the entire output is buffered.
.memfd => return this.toBufferedValue(globalThis),
.fd => |fd| {
return fd.toJS(globalThis);
},
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
return pipe.toJS(globalThis);
},
.buffer => |*buffer| {
defer this.* = .{ .closed = {} };
if (buffer.length() == 0) {
return jsc.WebCore.ReadableStream.empty(globalThis);
}
const own = try buffer.takeSlice(bun.default_allocator);
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0);
},
else => {
return .js_undefined;
},
}
}
pub fn toBufferedValue(this: *Readable, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue {
switch (this.*) {
.fd => |fd| {
return fd.toJS(globalThis);
},
.memfd => |fd| {
if (comptime !Environment.isPosix) {
Output.panic("memfd is only supported on Linux", .{});
}
this.* = .{ .closed = {} };
return jsc.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis);
},
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
return pipe.toBuffer(globalThis);
},
.buffer => |*buf| {
defer this.* = .{ .closed = {} };
const own = buf.takeSlice(bun.default_allocator) catch {
return globalThis.throwOutOfMemory();
};
return jsc.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
else => {
return .js_undefined;
},
}
}
};
pub const PipeReader = @import("./subprocess/SubprocessPipeReader.zig");
pub const Readable = @import("./subprocess/Readable.zig").Readable;
pub fn getStderr(this: *Subprocess, globalThis: *JSGlobalObject) bun.JSError!JSValue {
this.observable_getters.insert(.stderr);
@@ -810,670 +570,9 @@ pub const Source = union(enum) {
}
};
pub const NewStaticPipeWriter = @import("./subprocess/StaticPipeWriter.zig").NewStaticPipeWriter;
pub const StaticPipeWriter = NewStaticPipeWriter(Subprocess);
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
return struct {
const This = @This();
ref_count: WriterRefCount,
writer: IOWriter = .{},
stdio_result: StdioResult,
source: Source = .{ .detached = {} },
process: *ProcessType = undefined,
event_loop: jsc.EventLoopHandle,
buffer: []const u8 = "",
// It seems there is a bug in the Zig compiler. We'll get back to this one later
const WriterRefCount = bun.ptr.RefCount(@This(), "ref_count", _deinit, .{});
pub const ref = WriterRefCount.ref;
pub const deref = WriterRefCount.deref;
const print = bun.Output.scoped(.StaticPipeWriter, .visible);
pub const IOWriter = bun.io.BufferedWriter(@This(), struct {
pub const onWritable = null;
pub const getBuffer = This.getBuffer;
pub const onClose = This.onClose;
pub const onError = This.onError;
pub const onWrite = This.onWrite;
});
pub const Poll = IOWriter;
pub fn updateRef(this: *This, add: bool) void {
this.writer.updateRef(this.event_loop, add);
}
pub fn getBuffer(this: *This) []const u8 {
return this.buffer;
}
pub fn close(this: *This) void {
log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)});
this.writer.close();
}
pub fn flush(this: *This) void {
if (this.buffer.len > 0)
this.writer.write();
}
pub fn create(event_loop: anytype, subprocess: *ProcessType, result: StdioResult, source: Source) *This {
const this = bun.new(This, .{
.ref_count = .init(),
.event_loop = jsc.EventLoopHandle.init(event_loop),
.process = subprocess,
.stdio_result = result,
.source = source,
});
if (Environment.isWindows) {
this.writer.setPipe(this.stdio_result.buffer);
}
this.writer.setParent(this);
return this;
}
pub fn start(this: *This) bun.sys.Maybe(void) {
log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)});
this.ref();
this.buffer = this.source.slice();
if (Environment.isWindows) {
return this.writer.startWithCurrentPipe();
}
switch (this.writer.start(this.stdio_result.?, true)) {
.err => |err| {
return .{ .err = err };
},
.result => {
if (comptime Environment.isPosix) {
const poll = this.writer.handle.poll;
poll.flags.insert(.socket);
}
return .success;
},
}
}
pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void {
log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status });
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
if (status == .end_of_file or this.buffer.len == 0) {
this.writer.close();
}
}
pub fn onError(this: *This, err: bun.sys.Error) void {
log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err });
this.source.detach();
}
pub fn onClose(this: *This) void {
log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)});
this.source.detach();
this.process.onCloseIO(.stdin);
}
fn _deinit(this: *This) void {
this.writer.end();
this.source.detach();
bun.destroy(this);
}
pub fn memoryCost(this: *const This) usize {
return @sizeOf(@This()) + this.source.memoryCost() + this.writer.memoryCost();
}
pub fn loop(this: *This) *uws.Loop {
return this.event_loop.loop();
}
pub fn watch(this: *This) void {
if (this.buffer.len > 0) {
this.writer.watch();
}
}
pub fn eventLoop(this: *This) jsc.EventLoopHandle {
return this.event_loop;
}
};
}
pub const PipeReader = struct {
const RefCount = bun.ptr.RefCount(@This(), "ref_count", PipeReader.deinit, .{});
pub const ref = PipeReader.RefCount.ref;
pub const deref = PipeReader.RefCount.deref;
reader: IOReader = undefined,
process: ?*Subprocess = null,
event_loop: *jsc.EventLoop = undefined,
ref_count: PipeReader.RefCount,
state: union(enum) {
pending: void,
done: []u8,
err: bun.sys.Error,
} = .{ .pending = {} },
stdio_result: StdioResult,
pub const IOReader = bun.io.BufferedReader;
pub const Poll = IOReader;
pub fn memoryCost(this: *const PipeReader) usize {
return this.reader.memoryCost();
}
pub fn hasPendingActivity(this: *const PipeReader) bool {
if (this.state == .pending)
return true;
return this.reader.hasPendingActivity();
}
pub fn detach(this: *PipeReader) void {
this.process = null;
this.deref();
}
pub fn create(event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, limit: ?*MaxBuf) *PipeReader {
var this = bun.new(PipeReader, .{
.ref_count = .init(),
.process = process,
.reader = IOReader.init(@This()),
.event_loop = event_loop,
.stdio_result = result,
});
MaxBuf.addToPipereader(limit, &this.reader.maxbuf);
if (Environment.isWindows) {
this.reader.source = .{ .pipe = this.stdio_result.buffer };
}
this.reader.setParent(this);
return this;
}
pub fn readAll(this: *PipeReader) void {
if (this.state == .pending)
this.reader.read();
}
pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *jsc.EventLoop) bun.sys.Maybe(void) {
this.ref();
this.process = process;
this.event_loop = event_loop;
if (Environment.isWindows) {
return this.reader.startWithCurrentPipe();
}
switch (this.reader.start(this.stdio_result.?, true)) {
.err => |err| {
return .{ .err = err };
},
.result => {
if (comptime Environment.isPosix) {
const poll = this.reader.handle.poll;
poll.flags.insert(.socket);
this.reader.flags.socket = true;
this.reader.flags.nonblocking = true;
this.reader.flags.pollable = true;
poll.flags.insert(.nonblocking);
}
return .success;
},
}
}
pub const toJS = toReadableStream;
pub fn onReaderDone(this: *PipeReader) void {
const owned = this.toOwnedSlice();
this.state = .{ .done = owned };
if (this.process) |process| {
this.process = null;
process.onCloseIO(this.kind(process));
this.deref();
}
}
pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind {
if (process.stdout == .pipe and process.stdout.pipe == reader) {
return .stdout;
}
if (process.stderr == .pipe and process.stderr.pipe == reader) {
return .stderr;
}
@panic("We should be either stdout or stderr");
}
pub fn toOwnedSlice(this: *PipeReader) []u8 {
if (this.state == .done) {
return this.state.done;
}
// we do not use .toOwnedSlice() because we don't want to reallocate memory.
const out = this.reader._buffer;
this.reader._buffer.items = &.{};
this.reader._buffer.capacity = 0;
if (out.capacity > 0 and out.items.len == 0) {
out.deinit();
return &.{};
}
return out.items;
}
pub fn updateRef(this: *PipeReader, add: bool) void {
this.reader.updateRef(add);
}
pub fn watch(this: *PipeReader) void {
if (!this.reader.isDone())
this.reader.watch();
}
pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue {
defer this.detach();
switch (this.state) {
.pending => {
const stream = jsc.WebCore.ReadableStream.fromPipe(globalObject, this, &this.reader);
this.state = .{ .done = &.{} };
return stream;
},
.done => |bytes| {
this.state = .{ .done = &.{} };
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0);
},
.err => |err| {
_ = err;
const empty = try jsc.WebCore.ReadableStream.empty(globalObject);
jsc.WebCore.ReadableStream.cancel(&(try jsc.WebCore.ReadableStream.fromJS(empty, globalObject)).?, globalObject);
return empty;
},
}
}
pub fn toBuffer(this: *PipeReader, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
switch (this.state) {
.done => |bytes| {
defer this.state = .{ .done = &.{} };
return jsc.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
else => {
return .js_undefined;
},
}
}
pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void {
if (this.state == .done) {
bun.default_allocator.free(this.state.done);
}
this.state = .{ .err = err };
if (this.process) |process|
process.onCloseIO(this.kind(process));
}
pub fn close(this: *PipeReader) void {
switch (this.state) {
.pending => {
this.reader.close();
},
.done => {},
.err => {},
}
}
pub fn eventLoop(this: *PipeReader) *jsc.EventLoop {
return this.event_loop;
}
pub fn loop(this: *PipeReader) *uws.Loop {
return this.event_loop.virtual_machine.uwsLoop();
}
fn deinit(this: *PipeReader) void {
if (comptime Environment.isPosix) {
bun.assert(this.reader.isDone());
}
if (comptime Environment.isWindows) {
bun.assert(this.reader.source == null or this.reader.source.?.isClosed());
}
if (this.state == .done) {
bun.default_allocator.free(this.state.done);
}
this.reader.deinit();
bun.destroy(this);
}
};
const Writable = union(enum) {
pipe: *jsc.WebCore.FileSink,
fd: bun.FileDescriptor,
buffer: *StaticPipeWriter,
memfd: bun.FileDescriptor,
inherit: void,
ignore: void,
pub fn memoryCost(this: *const Writable) usize {
return switch (this.*) {
.pipe => |pipe| pipe.memoryCost(),
.buffer => |buffer| buffer.memoryCost(),
// TODO: memfd
else => 0,
};
}
pub fn hasPendingActivity(this: *const Writable) bool {
return switch (this.*) {
.pipe => false,
// we mark them as .ignore when they are closed, so this must be true
.buffer => true,
else => false,
};
}
pub fn ref(this: *Writable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(true);
},
.buffer => {
this.buffer.updateRef(true);
},
else => {},
}
}
pub fn unref(this: *Writable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(false);
},
.buffer => {
this.buffer.updateRef(false);
},
else => {},
}
}
// When the stream has closed we need to be notified to prevent a use-after-free
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
pub fn onClose(this: *Writable, _: ?bun.sys.Error) void {
const process: *Subprocess = @fieldParentPtr("stdin", this);
if (process.this_jsvalue != .zero) {
if (js.stdinGetCached(process.this_jsvalue)) |existing_value| {
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
}
}
switch (this.*) {
.buffer => {
this.buffer.deref();
},
.pipe => {
this.pipe.deref();
},
else => {},
}
process.onStdinDestroyed();
this.* = .{
.ignore = {},
};
}
pub fn onReady(_: *Writable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
pub fn init(
stdio: *Stdio,
event_loop: *jsc.EventLoop,
subprocess: *Subprocess,
result: StdioResult,
promise_for_stream: *jsc.JSValue,
) !Writable {
assertStdioResult(result);
if (Environment.isWindows) {
switch (stdio.*) {
.pipe, .readable_stream => {
if (result == .buffer) {
const pipe = jsc.WebCore.FileSink.createWithPipe(event_loop, result.buffer);
switch (pipe.writer.startWithCurrentPipe()) {
.result => {},
.err => |err| {
_ = err; // autofix
pipe.deref();
if (stdio.* == .readable_stream) {
stdio.readable_stream.cancel(event_loop.global);
}
return error.UnexpectedCreatingStdin;
},
}
pipe.writer.setParent(pipe);
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.deref_on_stdin_destroyed = true;
subprocess.flags.has_stdin_destructor_called = false;
if (stdio.* == .readable_stream) {
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
if (assign_result.toError()) |err| {
pipe.deref();
subprocess.deref();
return event_loop.global.throwValue(err);
}
promise_for_stream.* = assign_result;
}
return Writable{
.pipe = pipe,
};
}
return Writable{ .inherit = {} };
},
.blob => |blob| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
};
},
.array_buffer => |array_buffer| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
};
},
.fd => |fd| {
return Writable{ .fd = fd };
},
.dup2 => |dup2| {
return Writable{ .fd = dup2.to.toFd() };
},
.inherit => {
return Writable{ .inherit = {} };
},
.memfd, .path, .ignore => {
return Writable{ .ignore = {} };
},
.ipc, .capture => {
return Writable{ .ignore = {} };
},
}
}
if (comptime Environment.isPosix) {
if (stdio.* == .pipe) {
_ = bun.sys.setNonblocking(result.?);
}
}
switch (stdio.*) {
.dup2 => @panic("TODO dup2 stdio"),
.pipe, .readable_stream => {
const pipe = jsc.WebCore.FileSink.create(event_loop, result.?);
switch (pipe.writer.start(pipe.fd, true)) {
.result => {},
.err => |err| {
_ = err; // autofix
pipe.deref();
if (stdio.* == .readable_stream) {
stdio.readable_stream.cancel(event_loop.global);
}
return error.UnexpectedCreatingStdin;
},
}
pipe.writer.handle.poll.flags.insert(.socket);
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.has_stdin_destructor_called = false;
subprocess.flags.deref_on_stdin_destroyed = true;
if (stdio.* == .readable_stream) {
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
if (assign_result.toError()) |err| {
pipe.deref();
subprocess.deref();
return event_loop.global.throwValue(err);
}
promise_for_stream.* = assign_result;
}
return Writable{
.pipe = pipe,
};
},
.blob => |blob| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
};
},
.array_buffer => |array_buffer| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
};
},
.memfd => |memfd| {
bun.assert(memfd != bun.invalid_fd);
return Writable{ .memfd = memfd };
},
.fd => {
return Writable{ .fd = result.? };
},
.inherit => {
return Writable{ .inherit = {} };
},
.path, .ignore => {
return Writable{ .ignore = {} };
},
.ipc, .capture => {
return Writable{ .ignore = {} };
},
}
}
pub fn toJS(this: *Writable, globalThis: *jsc.JSGlobalObject, subprocess: *Subprocess) JSValue {
return switch (this.*) {
.fd => |fd| fd.toJS(globalThis),
.memfd, .ignore => .js_undefined,
.buffer, .inherit => .js_undefined,
.pipe => |pipe| {
this.* = .{ .ignore = {} };
if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) {
// onAttachedProcessExit() can call deref on the
// subprocess. Since we never called ref(), it would be
// unbalanced to do so, leading to a use-after-free.
// So, let's not do that.
// https://github.com/oven-sh/bun/pull/14092
bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed);
const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0;
pipe.onAttachedProcessExit(&subprocess.process.status);
if (Environment.isDebug) {
bun.debugAssert(subprocess.ref_count.get() == debug_ref_count.get());
}
return pipe.toJS(globalThis);
} else {
subprocess.flags.has_stdin_destructor_called = false;
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.deref_on_stdin_destroyed = true;
if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) {
pipe.signal.clear();
}
return pipe.toJSWithDestructor(
globalThis,
jsc.WebCore.Sink.DestructorPtr.init(subprocess),
);
}
},
};
}
pub fn finalize(this: *Writable) void {
const subprocess: *Subprocess = @fieldParentPtr("stdin", this);
if (subprocess.this_jsvalue != .zero) {
if (jsc.Codegen.JSSubprocess.stdinGetCached(subprocess.this_jsvalue)) |existing_value| {
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
}
}
return switch (this.*) {
.pipe => |pipe| {
if (pipe.signal.ptr == @as(*anyopaque, @ptrCast(this))) {
pipe.signal.clear();
}
pipe.deref();
this.* = .{ .ignore = {} };
},
.buffer => {
this.buffer.updateRef(false);
this.buffer.deref();
},
.memfd => |fd| {
fd.close();
this.* = .{ .ignore = {} };
},
.ignore => {},
.fd, .inherit => {},
};
}
pub fn close(this: *Writable) void {
switch (this.*) {
.pipe => |pipe| {
_ = pipe.end(null);
},
.memfd => |fd| {
fd.close();
this.* = .{ .ignore = {} };
},
.fd => {
this.* = .{ .ignore = {} };
},
.buffer => {
this.buffer.close();
},
.ignore => {},
.inherit => {},
}
}
};
pub fn memoryCost(this: *const Subprocess) usize {
return @sizeOf(@This()) +
this.process.memoryCost() +
@@ -2618,7 +1717,7 @@ pub fn spawnMaybeSync(
const exitCode = subprocess.getExitCode(globalThis);
const stdout = try subprocess.stdout.toBufferedValue(globalThis);
const stderr = try subprocess.stderr.toBufferedValue(globalThis);
const resource_usage: JSValue = if (!globalThis.hasException()) subprocess.createResourceUsageObject(globalThis) else .zero;
const resource_usage: JSValue = if (!globalThis.hasException()) try subprocess.createResourceUsageObject(globalThis) else .zero;
const exitedDueToTimeout = subprocess.event_loop_timer.state == .FIRED;
const exitedDueToMaxBuffer = subprocess.exited_due_to_maxbuf;
const resultPid = jsc.JSValue.jsNumberFromInt32(subprocess.pid());
@@ -2717,7 +1816,8 @@ pub fn getGlobalThis(this: *Subprocess) ?*jsc.JSGlobalObject {
const IPClog = Output.scoped(.IPC, .visible);
const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor;
pub const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor;
pub const Writable = @import("./subprocess/Writable.zig").Writable;
pub const MaxBuf = bun.io.MaxBuf;

View File

@@ -0,0 +1,195 @@
pub const Readable = union(enum) {
fd: bun.FileDescriptor,
memfd: bun.FileDescriptor,
pipe: *PipeReader,
inherit: void,
ignore: void,
closed: void,
/// Eventually we will implement Readables created from blobs and array buffers.
/// When we do that, `buffer` will be borrowed from those objects.
///
/// When a buffered `pipe` finishes reading from its file descriptor,
/// the owning `Readable` will be convered into this variant and the pipe's
/// buffer will be taken as an owned `CowString`.
buffer: CowString,
pub fn memoryCost(this: *const Readable) usize {
return switch (this.*) {
.pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(),
.buffer => this.buffer.length(),
else => 0,
};
}
pub fn hasPendingActivity(this: *const Readable) bool {
return switch (this.*) {
.pipe => this.pipe.hasPendingActivity(),
else => false,
};
}
pub fn ref(this: *Readable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(true);
},
else => {},
}
}
pub fn unref(this: *Readable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(false);
},
else => {},
}
}
pub fn init(stdio: Stdio, event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, allocator: std.mem.Allocator, max_size: ?*MaxBuf, is_sync: bool) Readable {
_ = allocator; // autofix
_ = is_sync; // autofix
Subprocess.assertStdioResult(result);
if (comptime Environment.isPosix) {
if (stdio == .pipe) {
_ = bun.sys.setNonblocking(result.?);
}
}
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore, .ipc, .path => Readable{ .ignore = {} },
.fd => |fd| if (Environment.isPosix) Readable{ .fd = result.? } else Readable{ .fd = fd },
.memfd => if (Environment.isPosix) Readable{ .memfd = stdio.memfd } else Readable{ .ignore = {} },
.dup2 => |dup2| if (Environment.isPosix) Output.panic("TODO: implement dup2 support in Stdio readable", .{}) else Readable{ .fd = dup2.out.toFd() },
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) },
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
.capture => Output.panic("TODO: implement capture support in Stdio readable", .{}),
.readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately
};
}
pub fn onClose(this: *Readable, _: ?bun.sys.Error) void {
this.* = .closed;
}
pub fn onReady(_: *Readable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Readable) void {}
pub fn close(this: *Readable) void {
switch (this.*) {
.memfd => |fd| {
this.* = .{ .closed = {} };
fd.close();
},
.fd => |_| {
this.* = .{ .closed = {} };
},
.pipe => {
this.pipe.close();
},
else => {},
}
}
pub fn finalize(this: *Readable) void {
switch (this.*) {
.memfd => |fd| {
this.* = .{ .closed = {} };
fd.close();
},
.fd => {
this.* = .{ .closed = {} };
},
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
},
.buffer => |*buf| {
buf.deinit(bun.default_allocator);
},
else => {},
}
}
pub fn toJS(this: *Readable, globalThis: *jsc.JSGlobalObject, exited: bool) bun.JSError!JSValue {
_ = exited; // autofix
switch (this.*) {
// should only be reachable when the entire output is buffered.
.memfd => return this.toBufferedValue(globalThis),
.fd => |fd| {
return fd.toJS(globalThis);
},
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
return pipe.toJS(globalThis);
},
.buffer => |*buffer| {
defer this.* = .{ .closed = {} };
if (buffer.length() == 0) {
return jsc.WebCore.ReadableStream.empty(globalThis);
}
const own = try buffer.takeSlice(bun.default_allocator);
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0);
},
else => {
return .js_undefined;
},
}
}
pub fn toBufferedValue(this: *Readable, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue {
switch (this.*) {
.fd => |fd| {
return fd.toJS(globalThis);
},
.memfd => |fd| {
if (comptime !Environment.isPosix) {
Output.panic("memfd is only supported on Linux", .{});
}
this.* = .{ .closed = {} };
return jsc.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis);
},
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
return pipe.toBuffer(globalThis);
},
.buffer => |*buf| {
defer this.* = .{ .closed = {} };
const own = buf.takeSlice(bun.default_allocator) catch {
return globalThis.throwOutOfMemory();
};
return jsc.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
else => {
return .js_undefined;
},
}
}
};
const std = @import("std");
const bun = @import("bun");
const Environment = bun.Environment;
const Output = bun.Output;
const default_allocator = bun.default_allocator;
const CowString = bun.ptr.CowString;
const Stdio = bun.spawn.Stdio;
const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSValue = jsc.JSValue;
const Subprocess = jsc.API.Subprocess;
const MaxBuf = Subprocess.MaxBuf;
const PipeReader = Subprocess.PipeReader;
const StdioResult = Subprocess.StdioResult;

View File

@@ -0,0 +1,75 @@
const ResourceUsage = @This();
pub const js = jsc.Codegen.JSResourceUsage;
pub const toJS = ResourceUsage.js.toJS;
pub const fromJS = ResourceUsage.js.fromJS;
pub const fromJSDirect = ResourceUsage.js.fromJSDirect;
rusage: Rusage,
pub fn create(rusage: *const Rusage, globalObject: *JSGlobalObject) bun.JSError!JSValue {
return bun.new(ResourceUsage, .{ .rusage = rusage.* }).toJS(globalObject);
}
pub fn getCPUTime(this: *ResourceUsage, globalObject: *JSGlobalObject) bun.JSError!JSValue {
var cpu = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
const rusage = this.rusage;
const usrTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.utime.usec, rusage.utime.sec);
const sysTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.stime.usec, rusage.stime.sec);
cpu.put(globalObject, jsc.ZigString.static("user"), usrTime);
cpu.put(globalObject, jsc.ZigString.static("system"), sysTime);
cpu.put(globalObject, jsc.ZigString.static("total"), JSValue.bigIntSum(globalObject, usrTime, sysTime));
return cpu;
}
pub fn getMaxRSS(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.maxrss);
}
pub fn getSharedMemorySize(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.ixrss);
}
pub fn getSwapCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.nswap);
}
pub fn getOps(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
var ops = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
ops.put(globalObject, jsc.ZigString.static("in"), jsc.JSValue.jsNumber(this.rusage.inblock));
ops.put(globalObject, jsc.ZigString.static("out"), jsc.JSValue.jsNumber(this.rusage.oublock));
return ops;
}
pub fn getMessages(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
var msgs = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
msgs.put(globalObject, jsc.ZigString.static("sent"), jsc.JSValue.jsNumber(this.rusage.msgsnd));
msgs.put(globalObject, jsc.ZigString.static("received"), jsc.JSValue.jsNumber(this.rusage.msgrcv));
return msgs;
}
pub fn getSignalCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
return jsc.JSValue.jsNumber(this.rusage.nsignals);
}
pub fn getContextSwitches(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
var ctx = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
ctx.put(globalObject, jsc.ZigString.static("voluntary"), jsc.JSValue.jsNumber(this.rusage.nvcsw));
ctx.put(globalObject, jsc.ZigString.static("involuntary"), jsc.JSValue.jsNumber(this.rusage.nivcsw));
return ctx;
}
pub fn finalize(this: *ResourceUsage) callconv(.C) void {
bun.default_allocator.destroy(this);
}
const bun = @import("bun");
const default_allocator = bun.default_allocator;
const Rusage = bun.spawn.Rusage;
const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSValue = jsc.JSValue;

View File

@@ -0,0 +1,139 @@
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
return struct {
const This = @This();
ref_count: WriterRefCount,
writer: IOWriter = .{},
stdio_result: StdioResult,
source: Source = .{ .detached = {} },
process: *ProcessType = undefined,
event_loop: jsc.EventLoopHandle,
buffer: []const u8 = "",
// It seems there is a bug in the Zig compiler. We'll get back to this one later
const WriterRefCount = bun.ptr.RefCount(@This(), "ref_count", _deinit, .{});
pub const ref = WriterRefCount.ref;
pub const deref = WriterRefCount.deref;
const print = bun.Output.scoped(.StaticPipeWriter, .visible);
pub const IOWriter = bun.io.BufferedWriter(@This(), struct {
pub const onWritable = null;
pub const getBuffer = This.getBuffer;
pub const onClose = This.onClose;
pub const onError = This.onError;
pub const onWrite = This.onWrite;
});
pub const Poll = IOWriter;
pub fn updateRef(this: *This, add: bool) void {
this.writer.updateRef(this.event_loop, add);
}
pub fn getBuffer(this: *This) []const u8 {
return this.buffer;
}
pub fn close(this: *This) void {
log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)});
this.writer.close();
}
pub fn flush(this: *This) void {
if (this.buffer.len > 0)
this.writer.write();
}
pub fn create(event_loop: anytype, subprocess: *ProcessType, result: StdioResult, source: Source) *This {
const this = bun.new(This, .{
.ref_count = .init(),
.event_loop = jsc.EventLoopHandle.init(event_loop),
.process = subprocess,
.stdio_result = result,
.source = source,
});
if (Environment.isWindows) {
this.writer.setPipe(this.stdio_result.buffer);
}
this.writer.setParent(this);
return this;
}
pub fn start(this: *This) bun.sys.Maybe(void) {
log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)});
this.ref();
this.buffer = this.source.slice();
if (Environment.isWindows) {
return this.writer.startWithCurrentPipe();
}
switch (this.writer.start(this.stdio_result.?, true)) {
.err => |err| {
return .{ .err = err };
},
.result => {
if (comptime Environment.isPosix) {
const poll = this.writer.handle.poll;
poll.flags.insert(.socket);
}
return .success;
},
}
}
pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void {
log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status });
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
if (status == .end_of_file or this.buffer.len == 0) {
this.writer.close();
}
}
pub fn onError(this: *This, err: bun.sys.Error) void {
log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err });
this.source.detach();
}
pub fn onClose(this: *This) void {
log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)});
this.source.detach();
this.process.onCloseIO(.stdin);
}
fn _deinit(this: *This) void {
this.writer.end();
this.source.detach();
bun.destroy(this);
}
pub fn memoryCost(this: *const This) usize {
return @sizeOf(@This()) + this.source.memoryCost() + this.writer.memoryCost();
}
pub fn loop(this: *This) *uws.Loop {
return this.event_loop.loop();
}
pub fn watch(this: *This) void {
if (this.buffer.len > 0) {
this.writer.watch();
}
}
pub fn eventLoop(this: *This) jsc.EventLoopHandle {
return this.event_loop;
}
};
}
const log = Output.scoped(.StaticPipeWriter, .hidden);
const bun = @import("bun");
const Environment = bun.Environment;
const Output = bun.Output;
const jsc = bun.jsc;
const uws = bun.uws;
const Subprocess = jsc.API.Subprocess;
const Source = Subprocess.Source;
const StdioResult = Subprocess.StdioResult;

View File

@@ -0,0 +1,225 @@
const PipeReader = @This();
const RefCount = bun.ptr.RefCount(@This(), "ref_count", PipeReader.deinit, .{});
pub const ref = PipeReader.RefCount.ref;
pub const deref = PipeReader.RefCount.deref;
reader: IOReader = undefined,
process: ?*Subprocess = null,
event_loop: *jsc.EventLoop = undefined,
ref_count: PipeReader.RefCount,
state: union(enum) {
pending: void,
done: []u8,
err: bun.sys.Error,
} = .{ .pending = {} },
stdio_result: StdioResult,
pub const IOReader = bun.io.BufferedReader;
pub const Poll = IOReader;
pub fn memoryCost(this: *const PipeReader) usize {
return this.reader.memoryCost();
}
pub fn hasPendingActivity(this: *const PipeReader) bool {
if (this.state == .pending)
return true;
return this.reader.hasPendingActivity();
}
pub fn detach(this: *PipeReader) void {
this.process = null;
this.deref();
}
pub fn create(event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, limit: ?*MaxBuf) *PipeReader {
var this = bun.new(PipeReader, .{
.ref_count = .init(),
.process = process,
.reader = IOReader.init(@This()),
.event_loop = event_loop,
.stdio_result = result,
});
MaxBuf.addToPipereader(limit, &this.reader.maxbuf);
if (Environment.isWindows) {
this.reader.source = .{ .pipe = this.stdio_result.buffer };
}
this.reader.setParent(this);
return this;
}
pub fn readAll(this: *PipeReader) void {
if (this.state == .pending)
this.reader.read();
}
pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *jsc.EventLoop) bun.sys.Maybe(void) {
this.ref();
this.process = process;
this.event_loop = event_loop;
if (Environment.isWindows) {
return this.reader.startWithCurrentPipe();
}
switch (this.reader.start(this.stdio_result.?, true)) {
.err => |err| {
return .{ .err = err };
},
.result => {
if (comptime Environment.isPosix) {
const poll = this.reader.handle.poll;
poll.flags.insert(.socket);
this.reader.flags.socket = true;
this.reader.flags.nonblocking = true;
this.reader.flags.pollable = true;
poll.flags.insert(.nonblocking);
}
return .success;
},
}
}
pub const toJS = toReadableStream;
pub fn onReaderDone(this: *PipeReader) void {
const owned = this.toOwnedSlice();
this.state = .{ .done = owned };
if (this.process) |process| {
this.process = null;
process.onCloseIO(this.kind(process));
this.deref();
}
}
pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind {
if (process.stdout == .pipe and process.stdout.pipe == reader) {
return .stdout;
}
if (process.stderr == .pipe and process.stderr.pipe == reader) {
return .stderr;
}
@panic("We should be either stdout or stderr");
}
pub fn toOwnedSlice(this: *PipeReader) []u8 {
if (this.state == .done) {
return this.state.done;
}
// we do not use .toOwnedSlice() because we don't want to reallocate memory.
const out = this.reader._buffer;
this.reader._buffer.items = &.{};
this.reader._buffer.capacity = 0;
if (out.capacity > 0 and out.items.len == 0) {
out.deinit();
return &.{};
}
return out.items;
}
pub fn updateRef(this: *PipeReader, add: bool) void {
this.reader.updateRef(add);
}
pub fn watch(this: *PipeReader) void {
if (!this.reader.isDone())
this.reader.watch();
}
pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue {
defer this.detach();
switch (this.state) {
.pending => {
const stream = jsc.WebCore.ReadableStream.fromPipe(globalObject, this, &this.reader);
this.state = .{ .done = &.{} };
return stream;
},
.done => |bytes| {
this.state = .{ .done = &.{} };
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0);
},
.err => |err| {
_ = err;
const empty = try jsc.WebCore.ReadableStream.empty(globalObject);
jsc.WebCore.ReadableStream.cancel(&(try jsc.WebCore.ReadableStream.fromJS(empty, globalObject)).?, globalObject);
return empty;
},
}
}
pub fn toBuffer(this: *PipeReader, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
switch (this.state) {
.done => |bytes| {
defer this.state = .{ .done = &.{} };
return jsc.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
else => {
return .js_undefined;
},
}
}
pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void {
if (this.state == .done) {
bun.default_allocator.free(this.state.done);
}
this.state = .{ .err = err };
if (this.process) |process|
process.onCloseIO(this.kind(process));
}
pub fn close(this: *PipeReader) void {
switch (this.state) {
.pending => {
this.reader.close();
},
.done => {},
.err => {},
}
}
pub fn eventLoop(this: *PipeReader) *jsc.EventLoop {
return this.event_loop;
}
pub fn loop(this: *PipeReader) *uws.Loop {
return this.event_loop.virtual_machine.uwsLoop();
}
fn deinit(this: *PipeReader) void {
if (comptime Environment.isPosix) {
bun.assert(this.reader.isDone());
}
if (comptime Environment.isWindows) {
bun.assert(this.reader.source == null or this.reader.source.?.isClosed());
}
if (this.state == .done) {
bun.default_allocator.free(this.state.done);
}
this.reader.deinit();
bun.destroy(this);
}
const bun = @import("bun");
const Environment = bun.Environment;
const default_allocator = bun.default_allocator;
const uws = bun.uws;
const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSValue = jsc.JSValue;
const Subprocess = jsc.API.Subprocess;
const MaxBuf = Subprocess.MaxBuf;
const StdioKind = Subprocess.StdioKind;
const StdioResult = Subprocess.StdioResult;

View File

@@ -0,0 +1,334 @@
pub const Writable = union(enum) {
pipe: *jsc.WebCore.FileSink,
fd: bun.FileDescriptor,
buffer: *StaticPipeWriter,
memfd: bun.FileDescriptor,
inherit: void,
ignore: void,
pub fn memoryCost(this: *const Writable) usize {
return switch (this.*) {
.pipe => |pipe| pipe.memoryCost(),
.buffer => |buffer| buffer.memoryCost(),
// TODO: memfd
else => 0,
};
}
pub fn hasPendingActivity(this: *const Writable) bool {
return switch (this.*) {
.pipe => false,
// we mark them as .ignore when they are closed, so this must be true
.buffer => true,
else => false,
};
}
pub fn ref(this: *Writable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(true);
},
.buffer => {
this.buffer.updateRef(true);
},
else => {},
}
}
pub fn unref(this: *Writable) void {
switch (this.*) {
.pipe => {
this.pipe.updateRef(false);
},
.buffer => {
this.buffer.updateRef(false);
},
else => {},
}
}
// When the stream has closed we need to be notified to prevent a use-after-free
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
pub fn onClose(this: *Writable, _: ?bun.sys.Error) void {
const process: *Subprocess = @fieldParentPtr("stdin", this);
if (process.this_jsvalue != .zero) {
if (js.stdinGetCached(process.this_jsvalue)) |existing_value| {
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
}
}
switch (this.*) {
.buffer => {
this.buffer.deref();
},
.pipe => {
this.pipe.deref();
},
else => {},
}
process.onStdinDestroyed();
this.* = .{
.ignore = {},
};
}
pub fn onReady(_: *Writable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
pub fn init(
stdio: *Stdio,
event_loop: *jsc.EventLoop,
subprocess: *Subprocess,
result: StdioResult,
promise_for_stream: *jsc.JSValue,
) !Writable {
Subprocess.assertStdioResult(result);
if (Environment.isWindows) {
switch (stdio.*) {
.pipe, .readable_stream => {
if (result == .buffer) {
const pipe = jsc.WebCore.FileSink.createWithPipe(event_loop, result.buffer);
switch (pipe.writer.startWithCurrentPipe()) {
.result => {},
.err => |err| {
_ = err; // autofix
pipe.deref();
if (stdio.* == .readable_stream) {
stdio.readable_stream.cancel(event_loop.global);
}
return error.UnexpectedCreatingStdin;
},
}
pipe.writer.setParent(pipe);
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.deref_on_stdin_destroyed = true;
subprocess.flags.has_stdin_destructor_called = false;
if (stdio.* == .readable_stream) {
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
if (assign_result.toError()) |err| {
pipe.deref();
subprocess.deref();
return event_loop.global.throwValue(err);
}
promise_for_stream.* = assign_result;
}
return Writable{
.pipe = pipe,
};
}
return Writable{ .inherit = {} };
},
.blob => |blob| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
};
},
.array_buffer => |array_buffer| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
};
},
.fd => |fd| {
return Writable{ .fd = fd };
},
.dup2 => |dup2| {
return Writable{ .fd = dup2.to.toFd() };
},
.inherit => {
return Writable{ .inherit = {} };
},
.memfd, .path, .ignore => {
return Writable{ .ignore = {} };
},
.ipc, .capture => {
return Writable{ .ignore = {} };
},
}
}
if (comptime Environment.isPosix) {
if (stdio.* == .pipe) {
_ = bun.sys.setNonblocking(result.?);
}
}
switch (stdio.*) {
.dup2 => @panic("TODO dup2 stdio"),
.pipe, .readable_stream => {
const pipe = jsc.WebCore.FileSink.create(event_loop, result.?);
switch (pipe.writer.start(pipe.fd, true)) {
.result => {},
.err => |err| {
_ = err; // autofix
pipe.deref();
if (stdio.* == .readable_stream) {
stdio.readable_stream.cancel(event_loop.global);
}
return error.UnexpectedCreatingStdin;
},
}
pipe.writer.handle.poll.flags.insert(.socket);
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.has_stdin_destructor_called = false;
subprocess.flags.deref_on_stdin_destroyed = true;
if (stdio.* == .readable_stream) {
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
if (assign_result.toError()) |err| {
pipe.deref();
subprocess.deref();
return event_loop.global.throwValue(err);
}
promise_for_stream.* = assign_result;
}
return Writable{
.pipe = pipe,
};
},
.blob => |blob| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
};
},
.array_buffer => |array_buffer| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
};
},
.memfd => |memfd| {
bun.assert(memfd != bun.invalid_fd);
return Writable{ .memfd = memfd };
},
.fd => {
return Writable{ .fd = result.? };
},
.inherit => {
return Writable{ .inherit = {} };
},
.path, .ignore => {
return Writable{ .ignore = {} };
},
.ipc, .capture => {
return Writable{ .ignore = {} };
},
}
}
pub fn toJS(this: *Writable, globalThis: *jsc.JSGlobalObject, subprocess: *Subprocess) JSValue {
return switch (this.*) {
.fd => |fd| fd.toJS(globalThis),
.memfd, .ignore => .js_undefined,
.buffer, .inherit => .js_undefined,
.pipe => |pipe| {
this.* = .{ .ignore = {} };
if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) {
// onAttachedProcessExit() can call deref on the
// subprocess. Since we never called ref(), it would be
// unbalanced to do so, leading to a use-after-free.
// So, let's not do that.
// https://github.com/oven-sh/bun/pull/14092
bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed);
const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0;
pipe.onAttachedProcessExit(&subprocess.process.status);
if (Environment.isDebug) {
bun.debugAssert(subprocess.ref_count.get() == debug_ref_count.get());
}
return pipe.toJS(globalThis);
} else {
subprocess.flags.has_stdin_destructor_called = false;
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.deref_on_stdin_destroyed = true;
if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) {
pipe.signal.clear();
}
return pipe.toJSWithDestructor(
globalThis,
jsc.WebCore.Sink.DestructorPtr.init(subprocess),
);
}
},
};
}
pub fn finalize(this: *Writable) void {
const subprocess: *Subprocess = @fieldParentPtr("stdin", this);
if (subprocess.this_jsvalue != .zero) {
if (jsc.Codegen.JSSubprocess.stdinGetCached(subprocess.this_jsvalue)) |existing_value| {
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
}
}
return switch (this.*) {
.pipe => |pipe| {
if (pipe.signal.ptr == @as(*anyopaque, @ptrCast(this))) {
pipe.signal.clear();
}
pipe.deref();
this.* = .{ .ignore = {} };
},
.buffer => {
this.buffer.updateRef(false);
this.buffer.deref();
},
.memfd => |fd| {
fd.close();
this.* = .{ .ignore = {} };
},
.ignore => {},
.fd, .inherit => {},
};
}
pub fn close(this: *Writable) void {
switch (this.*) {
.pipe => |pipe| {
_ = pipe.end(null);
},
.memfd => |fd| {
fd.close();
this.* = .{ .ignore = {} };
},
.fd => {
this.* = .{ .ignore = {} };
},
.buffer => {
this.buffer.close();
},
.ignore => {},
.inherit => {},
}
}
};
const bun = @import("bun");
const Environment = bun.Environment;
const Stdio = bun.spawn.Stdio;
const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSValue = jsc.JSValue;
const Subprocess = jsc.API.Subprocess;
const StaticPipeWriter = Subprocess.StaticPipeWriter;
const StdioResult = Subprocess.StdioResult;
const js = Subprocess.js;