diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index fbe433c84a..8342b44a44 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -212,6 +212,10 @@ pub const Subprocess = struct { return true; } + if (this.hasPendingActivityStdio()) { + return true; + } + return this.process.hasRef(); } @@ -229,6 +233,48 @@ pub const Subprocess = struct { ); } + pub fn hasPendingActivityStdio(this: *const Subprocess) bool { + if (this.stdin.hasPendingActivity()) { + return true; + } + + inline for (.{ StdioKind.stdout, StdioKind.stderr }) |kind| { + if (@field(this, @tagName(kind)).hasPendingActivity()) { + return true; + } + } + } + + pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void { + switch (kind) { + .stdin => { + switch (this.stdin) { + .pipe => |pipe| { + pipe.signal.clear(); + pipe.deref(); + this.stdin.* = .{ .ignore = {} }; + }, + .buffer => { + this.stdin.buffer.source.detach(); + this.stdin.buffer.deref(); + this.stdin.* = .{ .ignore = {} }; + }, + else => {}, + } + }, + inline .stdout, .stderr => |tag| { + const out: *Readable = &@field(this, @tagName(tag)); + switch (out.*) { + .pipe => |pipe| { + out.* = .{ .ignore = {} }; + pipe.deref(); + }, + else => {}, + } + }, + } + } + pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool { @fence(.Acquire); return this.has_pending_activity.load(.Acquire); @@ -277,7 +323,7 @@ pub const Subprocess = struct { const Readable = union(enum) { fd: bun.FileDescriptor, memfd: bun.FileDescriptor, - pipe: Pipe, + pipe: *PipeReader, inherit: void, ignore: void, closed: void, @@ -285,15 +331,7 @@ pub const Subprocess = struct { pub fn ref(this: *Readable) void { switch (this.*) { .pipe => { - if (this.pipe == .buffer) { - if (Environment.isWindows) { - uv.uv_ref(@ptrCast(&this.pipe.buffer.stream)); - return; - } - if (this.pipe.buffer.stream.poll_ref) |poll| { - poll.enableKeepingProcessAlive(JSC.VirtualMachine.get()); - } - } + this.pipe.updateRef(true); }, else => {}, } @@ -302,89 +340,16 @@ pub const Subprocess = struct { pub fn unref(this: *Readable) void { switch (this.*) { .pipe => { - if (this.pipe == .buffer) { - if (Environment.isWindows) { - uv.uv_unref(@ptrCast(&this.pipe.buffer.stream)); - return; - } - if (this.pipe.buffer.stream.poll_ref) |poll| { - poll.disableKeepingProcessAlive(JSC.VirtualMachine.get()); - } - } + this.pipe.updateRef(false); }, else => {}, } } - pub const Pipe = union(enum) { - stream: JSC.WebCore.ReadableStream, - buffer: PipeReader, - detached: void, - - pub fn finish(this: *@This()) void { - if (this.* == .stream and this.stream.ptr == .File) { - this.stream.ptr.File.finish(); - } - } - - pub fn done(this: *@This()) void { - if (this.* == .detached) - return; - - if (this.* == .stream) { - if (this.stream.ptr == .File) this.stream.ptr.File.setSignal(JSC.WebCore.Signal{}); - this.stream.done(); - return; - } - - this.buffer.close(); - } - - pub fn toJS(this: *@This(), readable: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue { - if (comptime Environment.allow_assert) - std.debug.assert(this.* != .detached); // this should be cached by the getter - - if (this.* != .stream) { - const stream = this.buffer.toReadableStream(globalThis, exited); - // we do not detach on windows - if (Environment.isWindows) { - return stream.toJS(); - } - this.* = .{ .stream = stream }; - } - - if (this.stream.ptr == .File) { - this.stream.ptr.File.setSignal(JSC.WebCore.Signal.init(readable)); - } - - const result = this.stream.toJS(); - this.* = .detached; - return result; - } - }; - - pub fn initWithPipe(stdio: Stdio, pipe: *uv.Pipe, allocator: std.mem.Allocator, max_size: u32) Readable { - return switch (stdio) { - .inherit => Readable{ .inherit = {} }, - .ignore => Readable{ .ignore = {} }, - .pipe => brk: { - break :brk .{ - .pipe = .{ - .buffer = StreamingOutput.initWithPipeAndAllocator(allocator, pipe, max_size), - }, - }; - }, - .path => Readable{ .ignore = {} }, - .blob, .fd => @panic("use init() instead"), - .memfd => Readable{ .memfd = stdio.memfd }, - .array_buffer => Readable{ - .pipe = .{ - .buffer = StreamingOutput.initWithPipeAndSlice(pipe, stdio.array_buffer.slice()), - }, - }, - }; - } - pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32, is_sync: bool) Readable { + pub fn init(stdio: Stdio, event_loop: *JSC.EventLoop, process: *Subprocess, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32, is_sync: bool) Readable { + _ = allocator; // autofix + _ = max_size; // autofix + _ = is_sync; // autofix if (comptime Environment.allow_assert) { if (fd) |fd_| { std.debug.assert(fd_ != bun.invalid_fd); @@ -394,22 +359,10 @@ pub const Subprocess = struct { return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, - .pipe => brk: { - if (is_sync) {} - break :brk .{ - .pipe = .{ - .buffer = StreamingOutput.initWithAllocator(allocator, fd.?, max_size), - }, - }; - }, .path => Readable{ .ignore = {} }, - .blob, .fd => Readable{ .fd = fd.? }, + .fd => Readable{ .fd = fd.? }, .memfd => Readable{ .memfd = stdio.memfd }, - .array_buffer => Readable{ - .pipe = .{ - .buffer = StreamingOutput.initWithSlice(fd.?, stdio.array_buffer.slice()), - }, - }, + .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, fd.?) }, }; } @@ -428,28 +381,12 @@ pub const Subprocess = struct { _ = bun.sys.close(fd); }, .pipe => { - this.pipe.done(); + this.pipe.close(); }, else => {}, } } - pub fn setCloseCallbackIfPossible(this: *Readable, callback: CloseCallbackHandler) bool { - switch (this.*) { - .pipe => { - if (Environment.isWindows) { - if (uv.uv_is_closed(@ptrCast(this.pipe.buffer.stream))) { - return false; - } - this.pipe.buffer.closeCallback = callback; - return true; - } - return false; - }, - else => return false, - } - } - pub fn finalize(this: *Readable) void { switch (this.*) { inline .memfd, .fd => |fd| { @@ -457,16 +394,8 @@ pub const Subprocess = struct { _ = bun.sys.close(fd); }, .pipe => |*pipe| { - if (pipe.* == .detached) { - return; - } - - if (pipe.* == .stream and pipe.stream.ptr == .File) { - this.close(); - return; - } - - pipe.buffer.close(); + defer pipe.detach(); + this.* = .{ .closed = {} }; }, else => {}, } @@ -480,8 +409,10 @@ pub const Subprocess = struct { .fd => |fd| { return JSValue.jsNumber(fd); }, - .pipe => { - return this.pipe.toJS(this, globalThis, exited); + .pipe => |pipe| { + defer pipe.detach(); + this.* = .{ .closed = {} }; + return pipe.toJS(this, globalThis, exited); }, else => { return JSValue.jsUndefined(); @@ -501,28 +432,10 @@ pub const Subprocess = struct { this.* = .{ .closed = {} }; return JSC.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis); }, - .sync_buffered_output => |*sync_buffered_output| { - const slice = sync_buffered_output.toOwnedSlice(globalThis); + .pipe => |pipe| { + defer pipe.detach(); this.* = .{ .closed = {} }; - return JSC.MarkedArrayBuffer - .fromBytes(slice, bun.default_allocator, .Uint8Array) - .toNodeBuffer(globalThis); - }, - .pipe => { - if (!Environment.isWindows) { - this.pipe.buffer.stream.close_on_empty_read = true; - this.pipe.buffer.readAll(); - } - - const bytes = this.pipe.buffer.internal_buffer.slice(); - this.pipe.buffer.internal_buffer = .{}; - - if (bytes.len > 0) { - // Return a Buffer so that they can do .toString() on it - return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator); - } - - return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator); + return pipe.toBuffer(globalThis); }, else => { return JSValue.jsUndefined(); @@ -694,11 +607,44 @@ pub const Subprocess = struct { pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub fn updateRef(this: *StaticPipeWriter, add: bool) void { + if (add) { + this.writer.updateRef(this.event_loop, true); + } else { + this.writer.updateRef(this.event_loop, false); + } + } + + pub fn close(this: *StaticPipeWriter) void { + this.writer.close(); + } + + pub fn flush(this: *StaticPipeWriter) void { + this.writer.flush(); + } + + pub fn create(event_loop: *JSC.EventLoop, subprocess: *Subprocess, fd: bun.FileDescriptor, source: Source) *StaticPipeWriter { + return StaticPipeWriter.new(.{ + .event_loop = event_loop, + .process = subprocess, + .fd = fd, + .source = source, + }); + } + pub const Source = union(enum) { blob: JSC.WebCore.Blob, array_buffer: JSC.ArrayBuffer.Strong, detached: void, + pub fn slice(this: *const Source) []const u8 { + return switch (this.*) { + .blob => this.blob.sharedView(), + .array_buffer => this.array_buffer.slice(), + else => @panic("Invalid source"), + }; + } + pub fn detach(this: *@This()) void { switch (this.*) { .blob => { @@ -755,9 +701,23 @@ pub const Subprocess = struct { done: []u8, err: bun.sys.Error, } = .{ .pending = {} }, + fd: bun.FileDescriptor = bun.invalid_fd, pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub fn detach(this: *PipeReader) void { + this.process = undefined; + this.deref(); + } + + pub fn create(event_loop: *JSC.EventLoop, process: *Subprocess, fd: bun.FileDescriptor) *PipeReader { + return PipeReader.new(.{ + .process = process, + .event_loop = event_loop, + .fd = fd, + }); + } + pub fn readAll(this: *PipeReader) void { if (this.state == .pending) this.reader.read(); @@ -788,33 +748,46 @@ pub const Subprocess = struct { return out; } - pub fn toReadableStream(this: *PipeReader) JSC.JSValue { + pub fn setFd(this: *PipeReader, fd: bun.FileDescriptor) *PipeReader { + this.fd = fd; + return this; + } + + pub fn updateRef(this: *PipeReader, add: bool) void { + if (add) { + this.reader.updateRef(this.event_loop, true); + } else { + this.reader.updateRef(this.event_loop, false); + } + } + + pub fn toReadableStream(this: *PipeReader, globalObject: *JSC.JSGlobalObject) JSC.JSValue { switch (this.state) { .pending => { - const stream = JSC.WebCore.ReadableStream.fromPipe(this.event_loop.global, &this.reader); + const stream = JSC.WebCore.ReadableStream.fromPipe(globalObject, &this.reader); defer this.reader.deref(); this.state = .{ .done = .{} }; return stream; }, .done => |bytes| { - const blob = JSC.WebCore.Blob.init(bytes, bun.default_allocator, this.event_loop.global); + const blob = JSC.WebCore.Blob.init(bytes, bun.default_allocator, globalObject); this.state = .{ .done = .{} }; - return JSC.WebCore.ReadableStream.fromBlob(this.event_loop.global, &blob, 0); + return JSC.WebCore.ReadableStream.fromBlob(globalObject, &blob, 0); }, .err => |err| { _ = err; // autofix - const empty = JSC.WebCore.ReadableStream.empty(this.event_loop.global); - JSC.WebCore.ReadableStream.cancel(JSC.WebCore.ReadableStream.fromJS(empty, this.event_loop.global), this.event_loop.global); + const empty = JSC.WebCore.ReadableStream.empty(globalObject); + JSC.WebCore.ReadableStream.cancel(JSC.WebCore.ReadableStream.fromJS(empty, globalObject), globalObject); return empty; }, } } - pub fn toBuffer(this: *PipeReader) JSC.JSValue { + pub fn toBuffer(this: *PipeReader, globalThis: *JSC.JSGlobalObject) JSC.JSValue { switch (this.state) { .done => |bytes| { defer this.state = .{ .done = &.{} }; - return JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(this.event_loop.global); + return JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); }, else => { return JSC.JSValue.undefined; @@ -823,16 +796,19 @@ pub const Subprocess = struct { } pub fn onOutputError(this: *PipeReader, err: bun.sys.Error) void { + if (this.state == .done) { + bun.default_allocator.free(this.state.done); + } this.state = .{ .err = err }; this.process.onCloseIO(this.kind()); } fn kind(this: *const PipeReader) StdioKind { - if (this.process.stdout == .pipe and this.process.stdout.sync_buffered_output == this) { - // are we stdout? + if (this.process.stdout == .pipe and this.process.stdout.pipe == this) { return .stdout; - } else if (this.process.stderr == .sync_buffered_output and this.process.stderr.sync_buffered_output == this) { - // are we stderr? + } + + if (this.process.stderr == .pipe and this.process.stderr.pipe == this) { return .stderr; } @@ -870,32 +846,35 @@ pub const Subprocess = struct { bun.default_allocator.free(this.state.done); } + this.reader.deinit(); this.destroy(); } }; - const SinkType = if (Environment.isWindows) *JSC.WebCore.UVStreamSink else *JSC.WebCore.FileSink; - const BufferedInputType = BufferedInput; const Writable = union(enum) { - pipe: SinkType, - pipe: struct { - pipe: SinkType, - readable_stream: JSC.WebCore.ReadableStream, - }, + pipe: *JSC.WebCore.PipeSink, fd: bun.FileDescriptor, - buffered_input: BufferedInputType, + buffer: *StaticPipeWriter, memfd: bun.FileDescriptor, inherit: void, ignore: void, + pub fn hasPendingActivity(this: *Writable) bool { + return switch (this.*) { + // we mark them as .ignore when they are closed, so this must be true + .pipe => true, + .buffer => true, + else => false, + }; + } + pub fn ref(this: *Writable) void { switch (this.*) { .pipe => { - if (Environment.isWindows) { - _ = uv.uv_ref(@ptrCast(this.pipe.stream)); - } else if (this.pipe.poll_ref) |poll| { - poll.enableKeepingProcessAlive(JSC.VirtualMachine.get()); - } + this.pipe.updateRef(true); + }, + .buffer => { + this.buffer.updateRef(true); }, else => {}, } @@ -904,11 +883,10 @@ pub const Subprocess = struct { pub fn unref(this: *Writable) void { switch (this.*) { .pipe => { - if (Environment.isWindows) { - _ = uv.uv_unref(@ptrCast(this.pipe.stream)); - } else if (this.pipe.poll_ref) |poll| { - poll.disableKeepingProcessAlive(JSC.VirtualMachine.get()); - } + this.pipe.updateRef(false); + }, + .buffer => { + this.buffer.updateRef(false); }, else => {}, } @@ -917,6 +895,15 @@ pub const Subprocess = struct { // When the stream has closed we need to be notified to prevent a use-after-free // We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice pub fn onClose(this: *Writable, _: ?bun.sys.Error) void { + switch (this.*) { + .buffer => { + this.buffer.deref(); + }, + .pipe => { + this.pipe.deref(); + }, + else => {}, + } this.* = .{ .ignore = {}, }; @@ -924,7 +911,7 @@ pub const Subprocess = struct { pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {} pub fn onStart(_: *Writable) void {} - pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, globalThis: *JSC.JSGlobalObject) !Writable { + pub fn init(stdio: Stdio, event_loop: *JSC.EventLoop, subprocess: *Subprocess, fd: ?bun.FileDescriptor) !Writable { if (comptime Environment.allow_assert) { if (fd) |fd_| { std.debug.assert(fd_ != bun.invalid_fd); @@ -932,44 +919,21 @@ pub const Subprocess = struct { } switch (stdio) { - .pipe => |maybe_readable| { - if (Environment.isWindows) @panic("TODO"); - var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink); - sink.* = .{ - .fd = fd.?, - .buffer = bun.ByteList{}, - .allocator = globalThis.bunVM().allocator, - .auto_close = true, + .pipe => { + return Writable{ + .pipe = JSC.WebCore.PipeSink.create(event_loop, fd.?), }; - sink.mode = bun.S.IFIFO; - sink.watch(fd.?); - if (maybe_readable) |readable| { - return Writable{ - .pipe_to_readable_stream = .{ - .pipe = sink, - .readable_stream = readable, - }, - }; - } + }, - return Writable{ .pipe = sink }; + .blob => |blob| { + return Writable{ + .buffer = StaticPipeWriter.create(event_loop, subprocess, fd, .{ .blob = blob }), + }; }, - .sync_buffered_output => |buffer| { - _ = buffer; // autofix - @panic("This should never be called"); - }, - .array_buffer, .blob => { - var buffered_input: BufferedInput = .{ .fd = fd.?, .source = undefined }; - switch (stdio) { - .array_buffer => |array_buffer| { - buffered_input.source = .{ .array_buffer = array_buffer }; - }, - .blob => |blob| { - buffered_input.source = .{ .blob = blob }; - }, - else => unreachable, - } - return Writable{ .buffered_input = buffered_input }; + .array_buffer => |array_buffer| { + return Writable{ + .buffer = StaticPipeWriter.create(event_loop, subprocess, .{ .array_buffer = array_buffer }), + }; }, .memfd => |memfd| { std.debug.assert(memfd != bun.invalid_fd); @@ -988,82 +952,49 @@ pub const Subprocess = struct { } } - pub fn toJS(this: Writable, globalThis: *JSC.JSGlobalObject) JSValue { - return switch (this) { - .pipe => |pipe| pipe.toJS(globalThis), + pub fn toJS(this: *Writable, globalThis: *JSC.JSGlobalObject) JSValue { + return switch (this.*) { .fd => |fd| JSValue.jsNumber(fd), .memfd, .ignore => JSValue.jsUndefined(), - .inherit => JSValue.jsUndefined(), - .buffered_input => JSValue.jsUndefined(), - .pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value, + .buffer, .inherit => JSValue.jsUndefined(), + .pipe => |pipe| { + this.* = .{ .ignore = {} }; + return pipe.toJS(globalThis); + }, }; } pub fn finalize(this: *Writable) void { return switch (this.*) { .pipe => |pipe| { - pipe.close(); + pipe.deref(); + + this.* = .{ .ignore = {} }; }, - .pipe_to_readable_stream => |*pipe_to_readable_stream| { - _ = pipe_to_readable_stream.pipe.end(null); + .buffer => { + this.buffer.updateRef(false); + this.buffer.deref(); }, .memfd => |fd| { _ = bun.sys.close(fd); this.* = .{ .ignore = {} }; }, - .buffered_input => { - this.buffered_input.deinit(); - }, .ignore => {}, .fd, .inherit => {}, }; } - pub fn setCloseCallbackIfPossible(this: *Writable, callback: CloseCallbackHandler) bool { - switch (this.*) { - .pipe => |pipe| { - if (Environment.isWindows) { - if (pipe.isClosed()) { - return false; - } - pipe.closeCallback = callback; - return true; - } - return false; - }, - .pipe_to_readable_stream => |*pipe_to_readable_stream| { - if (Environment.isWindows) { - if (pipe_to_readable_stream.pipe.isClosed()) { - return false; - } - pipe_to_readable_stream.pipe.closeCallback = callback; - return true; - } - return false; - }, - .buffered_input => { - if (Environment.isWindows) { - this.buffered_input.closeCallback = callback; - return true; - } - return false; - }, - else => return false, - } - } - pub fn close(this: *Writable) void { switch (this.*) { - .pipe => {}, - .pipe_to_readable_stream => |*pipe_to_readable_stream| { - _ = pipe_to_readable_stream.pipe.end(null); + .pipe => |pipe| { + pipe.end(null); }, inline .memfd, .fd => |fd| { _ = bun.sys.close(fd); this.* = .{ .ignore = {} }; }, - .buffered_input => { - this.buffered_input.deinit(); + .buffer => { + this.buffer.close(); }, .ignore => {}, .inherit => {}, @@ -1574,34 +1505,34 @@ pub const Subprocess = struct { return .zero; }; - if (comptime is_sync) { - if (stdio[1] == .pipe and stdio[1].pipe == null) { - stdio[1] = .{ .sync_buffered_output = BufferedOutput.new(.{}) }; - } + // if (comptime is_sync) { + // if (stdio[1] == .pipe and stdio[1].pipe == null) { + // stdio[1] = .{ .sync_buffered_output = BufferedOutput.new(.{}) }; + // } - if (stdio[2] == .pipe and stdio[2].pipe == null) { - stdio[2] = .{ .sync_buffered_output = BufferedOutput.new(.{}) }; - } - } else { - if (stdio[1] == .pipe and stdio[1].pipe == null) { - stdio[1] = .{ .buffer = {} }; - } + // if (stdio[2] == .pipe and stdio[2].pipe == null) { + // stdio[2] = .{ .sync_buffered_output = BufferedOutput.new(.{}) }; + // } + // } else { + // if (stdio[1] == .pipe and stdio[1].pipe == null) { + // stdio[1] = .{ .buffer = {} }; + // } - if (stdio[2] == .pipe and stdio[2].pipe == null) { - stdio[2] = .{ .buffer = {} }; - } - } - defer { - if (comptime is_sync) { - if (stdio[1] == .sync_buffered_output) { - stdio[1].sync_buffered_output.deref(); - } + // if (stdio[2] == .pipe and stdio[2].pipe == null) { + // stdio[2] = .{ .buffer = {} }; + // } + // } + // defer { + // if (comptime is_sync) { + // if (stdio[1] == .sync_buffered_output) { + // stdio[1].sync_buffered_output.deref(); + // } - if (stdio[2] == .sync_buffered_output) { - stdio[2].sync_buffered_output.deref(); - } - } - } + // if (stdio[2] == .sync_buffered_output) { + // stdio[2].sync_buffered_output.deref(); + // } + // } + // } const spawn_options = bun.spawn.SpawnOptions{ .cwd = cwd, @@ -1661,12 +1592,33 @@ pub const Subprocess = struct { is_sync, ), .pid_rusage = null, - .stdin = Writable.init(stdio[0], spawned.stdin, globalThis) catch { + .stdin = Writable.init( + stdio[0], + jsc_vm.eventLoop(), + subprocess, + spawned.stdin, + ) catch { globalThis.throwOutOfMemory(); return .zero; }, - .stdout = Readable.init(stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size, is_sync), - .stderr = Readable.init(stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size, is_sync), + .stdout = Readable.init( + jsc_vm.eventLoop(), + subprocess, + stdio[1], + spawned.stdout, + jsc_vm.allocator, + default_max_buffer_size, + is_sync, + ), + .stderr = Readable.init( + jsc_vm.eventLoop(), + subprocess, + stdio[2], + spawned.stderr, + jsc_vm.allocator, + default_max_buffer_size, + is_sync, + ), .stdio_pipes = spawned.extra_pipes.moveToUnmanaged(), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, @@ -1715,23 +1667,19 @@ pub const Subprocess = struct { } } - if (subprocess.stdin == .buffered_input) { - subprocess.stdin.buffered_input.remain = switch (subprocess.stdin.buffered_input.source) { - .blob => subprocess.stdin.buffered_input.source.blob.slice(), - .array_buffer => |array_buffer| array_buffer.slice(), - }; - subprocess.stdin.buffered_input.writeIfPossible(is_sync); + if (subprocess.stdin == .buffer) { + subprocess.stdin.buffer.start(spawned.stdin.?, true); } - if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { + if (subprocess.stdout == .pipe) { if (is_sync or !lazy) { - subprocess.stdout.pipe.buffer.readAll(); + subprocess.stdout.pipe.readAll(); } } - if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { + if (subprocess.stderr == .pie) { if (is_sync or !lazy) { - subprocess.stderr.pipe.buffer.readAll(); + subprocess.stderr.pipe.readAll(); } } @@ -1741,14 +1689,6 @@ pub const Subprocess = struct { return out; } - if (subprocess.stdin == .buffered_input) { - while (subprocess.stdin.buffered_input.remain.len > 0) { - subprocess.stdin.buffered_input.writeIfPossible(true); - } - } - - subprocess.closeIO(.stdin); - if (comptime is_sync) { switch (subprocess.process.watch(jsc_vm)) { .result => {}, @@ -1759,12 +1699,16 @@ pub const Subprocess = struct { } while (!subprocess.hasExited()) { - if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) { - subprocess.stderr.pipe.buffer.readAll(); + if (subprocess.stdin == .buffer) { + subprocess.stdin.buffer.flush(); } - if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) { - subprocess.stdout.pipe.buffer.readAll(); + if (subprocess.stderr == .pipe) { + subprocess.stderr.pipe.readAll(); + } + + if (subprocess.stdout == .pipe) { + subprocess.stdout.pipe.readAll(); } jsc_vm.tick(); @@ -1797,10 +1741,9 @@ pub const Subprocess = struct { fd: bun.FileDescriptor, path: JSC.Node.PathLike, blob: JSC.WebCore.AnyBlob, - pipe: ?JSC.WebCore.ReadableStream, array_buffer: JSC.ArrayBuffer.Strong, memfd: bun.FileDescriptor, - sync_buffered_output: *BufferedOutput, + pipe: void, const PipeExtra = struct { fd: i32, @@ -1815,7 +1758,7 @@ pub const Subprocess = struct { return switch (this.*) { .blob => !this.blob.needsToReadFile(), .memfd, .array_buffer => true, - .pipe => |pipe| pipe == null and is_sync, + .pipe => is_sync, else => false, }; } @@ -1892,10 +1835,10 @@ pub const Subprocess = struct { } fn toPosix( - stdio: @This(), + stdio: *@This(), ) bun.spawn.SpawnOptions.Stdio { return switch (stdio) { - .array_buffer, .blob, .pipe => .{ .buffer = {} }, + .pipe, .array_buffer, .blob => .{ .buffer = {} }, .fd => |fd| .{ .pipe = fd }, .memfd => |fd| .{ .pipe = fd }, .path => |pathlike| .{ .path = pathlike.slice() }, @@ -1905,22 +1848,21 @@ pub const Subprocess = struct { } fn toWindows( - stdio: @This(), + stdio: *@This(), ) bun.spawn.SpawnOptions.Stdio { return switch (stdio) { - .array_buffer, .blob, .pipe => .{ .buffer = {} }, + .pipe, .array_buffer, .blob, .pipe => .{ .buffer = {} }, .fd => |fd| .{ .pipe = fd }, .path => |pathlike| .{ .path = pathlike.slice() }, .inherit => .{ .inherit = {} }, .ignore => .{ .ignore = {} }, - .sync_buffer => .{ .buffer = &stdio.sync_buffer.reader.pipe }, .memfd => @panic("This should never happen"), }; } pub fn asSpawnOption( - stdio: @This(), + stdio: *@This(), ) bun.spawn.SpawnOptions.Stdio { if (comptime Environment.isWindows) { return stdio.toWindows(); @@ -1928,54 +1870,6 @@ pub const Subprocess = struct { return stdio.toPosix(); } } - - fn setUpChildIoUvSpawn( - stdio: @This(), - std_fileno: i32, - pipe: *uv.Pipe, - isReadable: bool, - fd: bun.FileDescriptor, - ) !uv.uv_stdio_container_s { - return switch (stdio) { - .array_buffer, .blob, .pipe => { - if (uv.uv_pipe_init(uv.Loop.get(), pipe, 0) != 0) { - return error.FailedToCreatePipe; - } - if (fd != bun.invalid_fd) { - // we receive a FD so we open this into our pipe - if (uv.uv_pipe_open(pipe, bun.uvfdcast(fd)).errEnum()) |_| { - return error.FailedToCreatePipe; - } - return uv.uv_stdio_container_s{ - .flags = @intCast(uv.UV_INHERIT_STREAM), - .data = .{ .stream = @ptrCast(pipe) }, - }; - } - // we dont have any fd so we create a new pipe - return uv.uv_stdio_container_s{ - .flags = @intCast(uv.UV_CREATE_PIPE | if (isReadable) uv.UV_READABLE_PIPE else uv.UV_WRITABLE_PIPE), - .data = .{ .stream = @ptrCast(pipe) }, - }; - }, - .fd => |_fd| uv.uv_stdio_container_s{ - .flags = uv.UV_INHERIT_FD, - .data = .{ .fd = bun.uvfdcast(_fd) }, - }, - .path => |pathlike| { - _ = pathlike; - @panic("TODO"); - }, - .inherit => uv.uv_stdio_container_s{ - .flags = uv.UV_INHERIT_FD, - .data = .{ .fd = std_fileno }, - }, - .ignore => uv.uv_stdio_container_s{ - .flags = uv.UV_IGNORE, - .data = undefined, - }, - .memfd => unreachable, - }; - } }; fn extractStdioBlob( diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index e829fc396d..cb220276a1 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -65,6 +65,15 @@ pub fn Maybe(comptime ResultType: type) type { .result = std.mem.zeroes(ReturnType), }; + pub fn assert(this: @This()) ReturnType { + switch (this) { + .err => |err| { + bun.Output.panic("Unexpected error\n{}", .{err}); + }, + .result => |result| return result, + } + } + pub inline fn todo() @This() { if (Environment.allow_assert) { if (comptime ResultType == void) { diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 6e6ad5fd19..0fce3f78d6 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -189,6 +189,7 @@ pub const ReadableStream = struct { .Blob => |blob| blob.parent().decrementCount(), .File => |file| file.parent().decrementCount(), .Bytes => |bytes| bytes.parent().decrementCount(), + .Pipe => |bytes| bytes.parent().decrementCount(), else => 0, }; @@ -422,6 +423,7 @@ pub const StreamStart = union(Tag) { as_uint8array: bool, stream: bool, }, + PipeSink: void, FileSink: struct { chunk_size: Blob.SizeType = 16384, input_path: PathOrFileDescriptor, @@ -644,6 +646,11 @@ pub const StreamResult = union(Tag) { into_array_and_done, }; + pub fn slice16(this: *const StreamResult) []const u16 { + const bytes = this.slice(); + return @as([*]const u16, @ptrCast(@alignCast(bytes.ptr)))[0..std.mem.bytesAsSlice(u16, bytes).len]; + } + pub fn slice(this: *const StreamResult) []const u8 { return switch (this.*) { .owned => |owned| owned.slice(), @@ -673,6 +680,10 @@ pub const StreamResult = union(Tag) { consumed: Blob.SizeType = 0, state: StreamResult.Pending.State = .none, + pub fn deinit(_: *@This()) void { + // TODO: + } + pub const Future = union(enum) { promise: struct { promise: *JSPromise, @@ -1477,8 +1488,6 @@ pub fn NewFileSink(comptime EventLoop: JSC.EventLoopKind) type { while (remain.len > 0) { const write_buf = remain[0..@min(remain.len, max_to_write)]; const res = bun.sys.write(fd, write_buf); - // this does not fix the issue with writes not showing up - // const res = bun.sys.sys_uv.write(fd, write_buf); if (res == .err) { const retry = @@ -2632,7 +2641,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { defer { if ((comptime @hasField(SinkType, "done")) and this.sink.done) { - callframe.this().unprotect(); + this.unprotect(); } } @@ -3662,27 +3671,279 @@ pub fn ReadableStreamSource( }; } +pub const PipeSink = struct { + writer: bun.io.StreamingWriter(@This(), onWrite, onError, onReady, onClose) = .{}, + done: bool = false, + event_loop_handle: JSC.EventLoopHandle, + fd: bun.FileDescriptor = bun.invalid_fd, + written: usize = 0, + + pending: StreamResult.Writable.Pending = .{}, + signal: Signal = Signal{}, + + const log = Output.scoped(.Pipe); + + pub usingnamespace bun.NewRefCounted(PipeSink, deinit); + + pub fn onWrite(this: *PipeSink, amount: usize, done: bool) void { + log("onWrite({d}, {any})", .{ amount, done }); + this.written += amount; + if (this.pending.state == .pending) + this.pending.consumed += amount; + + if (done) { + if (this.pending.state == .pending) { + this.pending.result = .{ .owned = this.pending.consumed }; + this.pending.run(); + } + } + } + pub fn onError(this: *PipeSink, err: bun.sys.Error) void { + log("onError({any})", .{err}); + if (this.pending.state == .pending) { + this.pending.result = .{ .err = err }; + + this.pending.run(); + } + } + pub fn onReady(this: *PipeSink) void { + log("onReady()", .{}); + + this.signal.ready(null, null); + } + pub fn onClose(this: *PipeSink) void { + log("onClose()", .{}); + + this.signal.close(null); + } + + pub fn create( + event_loop: *JSC.EventLoop, + fd: bun.FileDescriptor, + ) *PipeSink { + return PipeSink.new(.{ + .event_loop_handle = JSC.EventLoopHandle.init(event_loop), + .fd = fd, + }); + } + + pub fn setup( + this: *PipeSink, + fd: bun.FileDescriptor, + ) void { + this.fd = fd; + this.writer.start(fd, true).assert(); + } + + pub fn loop(this: *PipeSink) *Async.Loop { + return this.event_loop_handle.loop(); + } + + pub fn eventLoop(this: *PipeSink) JSC.EventLoopHandle { + return this.event_loop_handle; + } + + pub fn connect(this: *PipeSink, signal: Signal) void { + this.signal = signal; + } + + pub fn start(this: *PipeSink, stream_start: StreamStart) JSC.Node.Maybe(void) { + switch (stream_start) { + .PipeSink => {}, + else => {}, + } + + this.done = false; + + this.signal.start(); + return .{ .result = {} }; + } + + pub fn flush(_: *PipeSink) JSC.Node.Maybe(void) { + return .{ .result = {} }; + } + + pub fn flushFromJS(this: *PipeSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { + _ = globalThis; // autofix + _ = wait; // autofix + if (this.done or this.pending.state == .pending) { + return .{ .result = JSC.JSValue.jsUndefined() }; + } + return this.toResult(this.writer.flush()); + } + + pub fn finalize(this: *PipeSink) void { + this.pending.deinit(); + this.deref(); + } + + pub fn init(fd: bun.FileDescriptor) *PipeSink { + return PipeSink.new(.{ + .writer = .{}, + .fd = fd, + }); + } + + pub fn construct( + this: *PipeSink, + allocator: std.mem.Allocator, + ) void { + _ = allocator; // autofix + this.* = PipeSink{ + .event_loop_handle = JSC.EventLoopHandle.init(JSC.VirtualMachine.get().eventLoop()), + }; + } + + pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.next) |*next| { + return next.writeBytes(data); + } + + return this.toResult(this.writer.write(data.slice())); + } + pub const writeBytes = write; + pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.next) |*next| { + return next.writeLatin1(data); + } + + return this.toResult(this.writer.writeLatin1(data.slice())); + } + pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.next) |*next| { + return next.writeUTF16(data); + } + + return this.toResult(this.writer.writeUTF16(data.slice16())); + } + + pub fn end(this: *PipeSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { + if (this.next) |*next| { + return next.end(err); + } + + switch (this.writer.flush()) { + .done => { + this.writer.end(); + return .{ .result = {} }; + }, + .err => |e| { + return .{ .err = e }; + }, + .pending => |pending_written| { + _ = pending_written; // autofix + this.ref(); + this.done = true; + this.writer.close(); + return .{ .result = {} }; + }, + .written => |written| { + _ = written; // autofix + this.writer.end(); + return .{ .result = {} }; + }, + } + } + pub fn deinit(this: *PipeSink) void { + this.writer.deinit(); + } + + pub fn toJS(this: *PipeSink, globalThis: *JSGlobalObject) JSValue { + return JSSink.createObject(globalThis, this); + } + + pub fn endFromJS(this: *PipeSink, globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { + if (this.done) { + if (this.pending.state == .pending) { + return .{ .result = this.pending.future.promise.promise.asValue(globalThis) }; + } + + return .{ .result = JSValue.jsNumber(this.written) }; + } + + switch (this.writer.flush()) { + .done => { + this.writer.end(); + return .{ .result = JSValue.jsNumber(this.written) }; + }, + .err => |err| { + this.writer.close(); + return .{ .err = err }; + }, + .pending => |pending_written| { + this.written += pending_written; + this.done = true; + this.pending.result = .{ .owned = pending_written }; + return .{ .result = this.pending.promise(globalThis).asValue(globalThis) }; + }, + .written => |written| { + this.writer.end(); + return .{ .result = JSValue.jsNumber(written) }; + }, + } + } + + pub fn sink(this: *PipeSink) Sink { + return Sink.init(this); + } + + pub fn updateRef(this: *PipeSink, value: bool) void { + if (value) { + this.writer.enableKeepingProcessAlive(this.event_loop_handle); + } else { + this.writer.disableKeepingProcessAlive(this.event_loop_handle); + } + } + + pub const JSSink = NewJSSink(@This(), "PipeSink"); + + fn toResult(this: *PipeSink, write_result: bun.io.WriteResult) StreamResult.Writable { + switch (write_result) { + .done => |amt| { + if (amt > 0) + return .{ .owned_and_done = @truncate(amt) }; + + return .{ .done = {} }; + }, + .wrote => |amt| { + if (amt > 0) + return .{ .owned = @truncate(amt) }; + + return .{ .temporary = @truncate(amt) }; + }, + .err => |err| { + return .{ .err = err }; + }, + .pending => |pending_written| { + this.pending.consumed += pending_written; + this.pending.result = .{ .owned = pending_written }; + return .{ .pending = &this.pending }; + }, + } + } +}; + pub const PipeReader = struct { reader: bun.io.BufferedOutputReader(@This(), onReadChunk) = .{}, done: bool = false, pending: StreamResult.Pending = .{}, pending_value: JSC.Strong = .{}, pending_view: []u8 = []u8{}, + fd: bun.io.FileDescriptor = bun.invalid_fd, pub fn setup( this: *PipeReader, - other_reader: anytype, + fd: bun.io.FileDescriptor, ) void { this.* = PipeReader{ .reader = .{}, .done = false, + .fd = fd, }; - - this.reader.fromOutputReader(other_reader, this); } pub fn onStart(this: *PipeReader) StreamStart { - switch (this.reader.start()) { + switch (this.reader.start(this.fd, true)) { .result => {}, .err => |e| { return .{ .err = e }; @@ -3752,8 +4013,12 @@ pub const PipeReader = struct { this.pending_value.clear(); this.pending_view = &.{}; - if (buffer.len >= drained.len) { + if (buffer.len >= @as(usize, drained.len)) { @memcpy(buffer[0..drained.len], drained); + + // give it back! + this.reader.buffer().* = drained; + if (this.done) { return .{ .into_array_and_done = .{ .value = array, .len = drained.len } }; } else { @@ -3801,7 +4066,7 @@ pub const PipeReader = struct { pub const Source = ReadableStreamSource( @This(), - "ReadableStreamPipe", + "PipeReader", onStart, onPull, onCancel, @@ -4355,7 +4620,7 @@ pub const File = struct { var fd = if (file.pathlike != .path) // We will always need to close the file descriptor. switch (Syscall.dup(file.pathlike.fd)) { - .result => |_fd| if (Environment.isWindows) bun.toLibUVOwnedFD(_fd) else _fd, + .result => |_fd| _fd, .err => |err| { return .{ .err = err.withFd(file.pathlike.fd) }; }, @@ -4829,11 +5094,6 @@ pub const FileReader = struct { } else if (this.lazy_readable == .empty) return .{ .empty = {} }; - if (this.readable().* == .File) { - const chunk_size = this.readable().File.calculateChunkSize(std.math.maxInt(usize)); - return .{ .chunk_size = @as(Blob.SizeType, @truncate(chunk_size)) }; - } - return .{ .chunk_size = if (this.user_chunk_size == 0) default_fifo_chunk_size else this.user_chunk_size }; } diff --git a/src/codegen/generate-jssink.ts b/src/codegen/generate-jssink.ts index 48a0a07c8b..f530d4da13 100644 --- a/src/codegen/generate-jssink.ts +++ b/src/codegen/generate-jssink.ts @@ -1,6 +1,6 @@ import { resolve, join } from "path"; -const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink", "UVStreamSink"]; +const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink", "UVStreamSink", "PipeSink"]; function names(name) { return { diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index b5ccb070f9..3a9fbc4dac 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -195,11 +195,6 @@ pub fn PosixPipeReader( } pub fn close(this: *This) void { - const fd = getFd(this); - if (fd != bun.invalid_fd) { - _ = bun.sys.close(); - this.handle.getPoll().deinit(); - } vtable.done(this); } }; @@ -377,6 +372,10 @@ pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?* } pub fn done(this: *PosixOutputReader) void { + if (this.handle != .closed) { + this.handle.close(this, done); + return; + } this.finish(); this.parent.onOutputDone(); } @@ -393,6 +392,7 @@ pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?* pub fn registerPoll(this: *PosixOutputReader) void { const poll = this.handle.getPoll() orelse return; + poll.owner.set(this); switch (poll.register(this.parent.loop(), .readable, true)) { .err => |err| { this.onError(err); diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index 2ff54cc960..e22a3f6309 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -236,6 +236,14 @@ pub fn PosixBufferedWriter( this.handle.close(this.parent, onClose); } + pub fn updateRef(this: *PosixWriter, value: bool, event_loop: JSC.EventLoopHandle) void { + if (value) { + this.enableKeepingProcessAlive(event_loop); + } else { + this.disableKeepingProcessAlive(event_loop); + } + } + pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8, pollable: bool) JSC.Maybe(void) { this.buffer = bytes; if (!pollable) { @@ -244,9 +252,9 @@ pub fn PosixBufferedWriter( return JSC.Maybe(void){ .result = {} }; } const loop = @as(*Parent, @ptrCast(this.parent)).loop(); - var poll = this.poll orelse brk: { + var poll = this.getPoll() orelse brk: { this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) }; - break :brk this.poll.?; + break :brk this.handle.poll; }; switch (poll.registerWithFd(loop, .writable, true, fd)) { @@ -479,7 +487,7 @@ pub fn PosixStreamingWriter( } pub fn hasRef(this: *PosixWriter) bool { - const poll = this.poll orelse return false; + const poll = this.getPoll() orelse return false; return !this.is_done and poll.canEnableKeepingProcessAlive(); } @@ -495,6 +503,14 @@ pub fn PosixStreamingWriter( poll.disableKeepingProcessAlive(event_loop); } + pub fn updateRef(this: *PosixWriter, event_loop: JSC.EventLoopHandle, value: bool) void { + if (value) { + this.enableKeepingProcessAlive(event_loop); + } else { + this.disableKeepingProcessAlive(event_loop); + } + } + pub fn end(this: *PosixWriter) void { if (this.is_done) { return; @@ -516,9 +532,9 @@ pub fn PosixStreamingWriter( } const loop = @as(*Parent, @ptrCast(this.parent)).loop(); - var poll = this.poll orelse brk: { + var poll = this.getPoll() orelse brk: { this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) }; - break :brk this.poll.?; + break :brk this.handle.poll; }; switch (poll.registerWithFd(loop, .writable, true, fd)) { diff --git a/src/io/io.zig b/src/io/io.zig index fc2b388ad9..2849d1a809 100644 --- a/src/io/io.zig +++ b/src/io/io.zig @@ -929,4 +929,5 @@ pub const retry = bun.C.E.AGAIN; pub const PipeReader = @import("./PipeReader.zig").PipeReader; pub const BufferedOutputReader = @import("./PipeReader.zig").BufferedOutputReader; pub const BufferedWriter = @import("./PipeWriter.zig").BufferedWriter; +pub const WriteResult = @import("./PipeWriter.zig").WriteResult; pub const StreamingWriter = @import("./PipeWriter.zig").StreamingWriter; diff --git a/src/io/pipes.zig b/src/io/pipes.zig index cfac1b0794..4069378a9d 100644 --- a/src/io/pipes.zig +++ b/src/io/pipes.zig @@ -33,8 +33,11 @@ pub const PollOrFd = union(enum) { if (fd != bun.invalid_fd) { this.handle = .{ .closed = {} }; + _ = bun.sys.close(fd); if (comptime onCloseFn != void) onCloseFn(@ptrCast(ctx.?)); + } else { + this.handle = .{ .closed = {} }; } } };