diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 2d79260cdd..d2c7b0774e 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -140,7 +140,7 @@ pub const Subprocess = struct { stdin: Writable, stdout: Readable, stderr: Readable, - stdio_pipes: std.ArrayListUnmanaged(bun.FileDescriptor) = .{}, + stdio_pipes: if (Environment.isWindows) std.ArrayListUnmanaged(StdioResult) else std.ArrayListUnmanaged(bun.FileDescriptor) = .{}, pid_rusage: ?Rusage = null, exit_promise: JSC.Strong = .{}, @@ -649,7 +649,14 @@ pub const Subprocess = struct { } for (pipes) |item| { - array.push(global, JSValue.jsNumber(item.cast())); + if (Environment.isWindows) { + if (item == .buffer) { + const fdno: usize = @intFromPtr(item.buffer.fd().cast()); + array.push(global, JSValue.jsNumber(fdno)); + } + } else { + array.push(global, JSValue.jsNumber(item.cast())); + } } return array; } @@ -1318,6 +1325,11 @@ pub const Subprocess = struct { } } + fn onPipeClose(this: *uv.Pipe) callconv(.C) void { + // safely free the pipes + bun.default_allocator.destroy(this); + } + // This must only be run once per Subprocess pub fn finalizeStreams(this: *Subprocess) void { log("finalizeStreams", .{}); @@ -1332,8 +1344,14 @@ pub const Subprocess = struct { break :close_stdio_pipes; } - for (this.stdio_pipes.items) |pipe| { - _ = bun.sys.close(pipe); + for (this.stdio_pipes.items) |item| { + if (Environment.isWindows) { + if (item == .buffer) { + item.buffer.close(onPipeClose); + } + } else { + _ = bun.sys.close(item); + } } this.stdio_pipes.clearAndFree(bun.default_allocator); } @@ -1452,7 +1470,6 @@ pub const Subprocess = struct { var ipc_mode = IPCMode.none; var ipc_callback: JSValue = .zero; var extra_fds = std.ArrayList(bun.spawn.SpawnOptions.Stdio).init(bun.default_allocator); - // TODO: FIX extra_fds memory leak var argv0: ?[*:0]const u8 = null; var windows_hide: bool = false; @@ -1792,17 +1809,25 @@ pub const Subprocess = struct { } else {}, }; + const process_allocator = globalThis.allocator(); + var subprocess = process_allocator.create(Subprocess) catch { + globalThis.throwOutOfMemory(); + return .zero; + }; + var spawned = switch (bun.spawn.spawnProcess( &spawn_options, @ptrCast(argv.items.ptr), @ptrCast(env_array.items.ptr), ) catch |err| { + process_allocator.destroy(subprocess); spawn_options.deinit(); globalThis.throwError(err, ": failed to spawn process"); return .zero; }) { .err => |err| { + process_allocator.destroy(subprocess); spawn_options.deinit(); globalThis.throwValue(err.toJSC(globalThis)); return .zero; @@ -1819,20 +1844,15 @@ pub const Subprocess = struct { @sizeOf(*Subprocess), spawned.extra_pipes.items[0].cast(), ) orelse { + process_allocator.destroy(subprocess); + spawn_options.deinit(); globalThis.throw("failed to create socket pair", .{}); - // TODO: return .zero; }, }; } } - var subprocess = globalThis.allocator().create(Subprocess) catch { - // TODO: fix pipe memory leak in spawn_options/spawned - globalThis.throwOutOfMemory(); - return .zero; - }; - const loop = jsc_vm.eventLoop(); // When run synchronously, subprocess isn't garbage collected @@ -1870,8 +1890,7 @@ pub const Subprocess = struct { default_max_buffer_size, is_sync, ), - // TODO: extra pipes on windows - .stdio_pipes = if (Environment.isWindows) .{} else spawned.extra_pipes.moveToUnmanaged(), + .stdio_pipes = spawned.extra_pipes.moveToUnmanaged(), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, // will be assigned in the block below @@ -1889,7 +1908,7 @@ pub const Subprocess = struct { ptr.?.* = subprocess; } else { if (subprocess.ipc.configureServer(Subprocess, subprocess, ipc_info[20..]).asErr()) |err| { - globalThis.allocator().destroy(subprocess); + process_allocator.destroy(subprocess); globalThis.throwValue(err.toJSC(globalThis)); return .zero; } diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 01127ed85e..75ae381c5f 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -468,7 +468,7 @@ fn ReqMixin(comptime Type: type) type { uv_req_set_data(@ptrCast(handle), ptr); } pub fn cancel(this: *Type) void { - uv_cancel(@ptrCast(this)); + _ = uv_cancel(@ptrCast(this)); } }; } @@ -1712,6 +1712,7 @@ pub const fs_t = extern struct { sys_errno_: DWORD, file: union_unnamed_450, fs: union_unnamed_451, + pub usingnamespace ReqMixin(@This()); pub inline fn deinit(this: *fs_t) void { this.assert(); @@ -2723,63 +2724,6 @@ pub const ReturnCodeI64 = enum(i64) { pub const addrinfo = std.os.windows.ws2_32.addrinfo; -pub fn StreamReaderMixin(comptime Type: type, comptime pipe_field_name: std.meta.FieldEnum(Type)) type { - return struct { - fn uv_alloc_cb(pipe: *uv_stream_t, suggested_size: usize, buf: *uv_buf_t) callconv(.C) void { - var this = bun.cast(*Type, pipe.data); - const result = this.getReadBufferWithStableMemoryAddress(suggested_size); - buf.* = uv_buf_t.init(result); - } - - fn uv_read_cb(pipe: *uv_stream_t, nread: ReturnCodeI64, buf: *const uv_buf_t) callconv(.C) void { - var this = bun.cast(*Type, pipe.data); - - const read = nread.int(); - - switch (read) { - 0 => { - // EAGAIN or EWOULDBLOCK - return this.onRead(.{ .result = 0 }, buf, .drained); - }, - UV_EOF => { - // EOF - return this.onRead(.{ .result = 0 }, buf, .eof); - }, - else => { - this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(read) }, buf, .progress); - }, - } - } - - fn __get_pipe(this: *Type) ?*uv_stream_t { - switch (@TypeOf(@field(this, @tagName(pipe_field_name)))) { - ?*Pipe, ?*uv_tcp_t, ?*uv_tty_t => return if (@field(this, @tagName(pipe_field_name))) |ptr| @ptrCast(ptr) else null, - *Pipe, *uv_tcp_t, *uv_tty_t => return @ptrCast(@field(this, @tagName(pipe_field_name))), - Pipe, uv_tcp_t, uv_tty_t => return @ptrCast(&@field(this, @tagName(pipe_field_name))), - else => @compileError("StreamWriterMixin only works with Pipe, uv_tcp_t, uv_tty_t"), - } - } - - pub fn startReading(this: *Type) Maybe(void) { - const pipe = __get_pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) }; - - //TODO: change to pipe.readStart - if (uv_read_start(pipe, @ptrCast(&@This().uv_alloc_cb), @ptrCast(&@This().uv_read_cb)).toError(.open)) |err| { - return .{ .err = err }; - } - - return .{ .result = {} }; - } - - pub fn stopReading(this: *Type) Maybe(void) { - const pipe = __get_pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) }; - pipe.readStop(); - - return .{ .result = {} }; - } - }; -} - // https://docs.libuv.org/en/v1.x/stream.html fn StreamMixin(comptime Type: type) type { return struct { diff --git a/src/fd.zig b/src/fd.zig index 20faf783dd..b8bac224ae 100644 --- a/src/fd.zig +++ b/src/fd.zig @@ -34,13 +34,15 @@ fn numberToHandle(handle: FDImpl.SystemAsInt) FDImpl.System { pub fn uv_get_osfhandle(in: c_int) libuv.uv_os_fd_t { const out = libuv.uv_get_osfhandle(in); - log("uv_get_osfhandle({d}) = {d}", .{ in, @intFromPtr(out) }); + // TODO: this is causing a dead lock because is also used on fd format + // log("uv_get_osfhandle({d}) = {d}", .{ in, @intFromPtr(out) }); return out; } pub fn uv_open_osfhandle(in: libuv.uv_os_fd_t) c_int { const out = libuv.uv_open_osfhandle(in); - log("uv_open_osfhandle({d}) = {d}", .{ @intFromPtr(in), out }); + // TODO: this is causing a dead lock because is also used on fd format + // log("uv_open_osfhandle({d}) = {d}", .{ @intFromPtr(in), out }); return out; } diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index bf65312170..261ea86ea6 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -329,66 +329,100 @@ pub fn WindowsPipeReader( comptime onError: fn (*This, bun.sys.Error) void, ) type { return struct { - fn uv_alloc_cb(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { + fn onStreamAlloc(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { var this = bun.cast(*This, handle.data); const result = this.getReadBufferWithStableMemoryAddress(suggested_size); buf.* = uv.uv_buf_t.init(result); } - fn uv_stream_read_cb(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { + fn onStreamRead(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { var this = bun.cast(*This, stream.data); const nread_int = nread.int(); - + //NOTE: pipes/tty need to call stopReading on errors (yeah) switch (nread_int) { 0 => { - // EAGAIN or EWOULDBLOCK or canceled - return this.onRead(.{ .result = 0 }, buf, .drained); + // EAGAIN or EWOULDBLOCK or canceled (buf is not safe to access here) + return this.onRead(.{ .result = 0 }, "", .drained); }, uv.UV_EOF => { - // EOF - return this.onRead(.{ .result = 0 }, buf, .eof); + _ = this.stopReading(); + // EOF (buf is not safe to access here) + return this.onRead(.{ .result = 0 }, "", .eof); }, else => { - this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress); + if (nread.toError(.recv)) |err| { + _ = this.stopReading(); + // ERROR (buf is not safe to access here) + this.onRead(.{ .err = err }, "", .progress); + return; + } + // we got some data we can slice the buffer! + const len: usize = @intCast(nread_int); + var slice = buf.slice(); + this.onRead(.{ .result = len }, slice[0..len], .progress); }, } } - fn uv_file_read_cb(fs: *uv.fs_t) callconv(.C) void { + fn onFileRead(fs: *uv.fs_t) callconv(.C) void { var this: *This = bun.cast(*This, fs.data); - const nread_int = fs.result.int(); - const buf = &this.*.source.?.file.iov; + const continue_reading = !this.is_paused; + this.is_paused = true; switch (nread_int) { - 0, uv.UV_ECANCELED => - // EAGAIN or EWOULDBLOCK or canceled - this.onRead(.{ .result = 0 }, buf, .drained), - uv.UV_EOF => - // EOF - this.onRead(.{ .result = 0 }, buf, .eof), - else => this.onRead(if (fs.result.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress), + // 0 actually means EOF too + 0, uv.UV_EOF => { + this.onRead(.{ .result = 0 }, "", .eof); + }, + uv.UV_ECANCELED => { + this.onRead(.{ .result = 0 }, "", .drained); + }, + else => { + if (fs.result.toError(.recv)) |err| { + this.onRead(.{ .err = err }, "", .progress); + return; + } + // continue reading + defer { + if (continue_reading) { + _ = this.startReading(); + } + } + + const len: usize = @intCast(nread_int); + // we got some data lets get the current iov + if (this.source) |source| { + if (source == .file) { + var buf = source.file.iov.slice(); + return this.onRead(.{ .result = len }, buf[0..len], .progress); + } + } + // ops we should not hit this lets fail with EPIPE + std.debug.assert(false); + return this.onRead(.{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .read) }, "", .progress); + }, } - uv.uv_fs_req_cleanup(fs); } pub fn startReading(this: *This) bun.JSC.Maybe(void) { + if (this.flags.is_done or !this.is_paused) return .{ .result = {} }; + this.is_paused = false; const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) }; switch (source) { .file => |file| { - if (file.iov.len == 0) { - const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); - file.iov = uv.uv_buf_t.init(buf); - std.debug.assert(file.iov.len > 0); - } - if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, uv_file_read_cb).toError(.write)) |err| { + file.fs.deinit(); + source.setData(this); + const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); + file.iov = uv.uv_buf_t.init(buf); + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| { return .{ .err = err }; } }, else => { - if (uv.uv_read_start(source.toStream(), &uv_alloc_cb, @ptrCast(&uv_stream_read_cb)).toError(.open)) |err| { + if (uv.uv_read_start(source.toStream(), &onStreamAlloc, @ptrCast(&onStreamRead)).toError(.open)) |err| { return .{ .err = err }; } }, @@ -398,14 +432,15 @@ pub fn WindowsPipeReader( } pub fn stopReading(this: *This) bun.JSC.Maybe(void) { + if (this.flags.is_done or this.is_paused) return .{ .result = {} }; + this.is_paused = true; const source = this.source orelse return .{ .result = {} }; switch (source) { .file => |file| { - _ = uv.uv_cancel(@ptrCast(&file.fs)); + file.fs.cancel(); }, else => { - // can be safely ignored as per libuv documentation - _ = uv.uv_read_stop(source.toStream()); + source.toStream().readStop(); }, } return .{ .result = {} }; @@ -414,7 +449,23 @@ pub fn WindowsPipeReader( pub fn close(this: *This) void { _ = this.stopReading(); if (this.source) |source| { - source.getHandle().close(onCloseSource); + switch (source) { + .file => |file| { + file.fs.deinit(); + file.fs.data = file; + _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onFileClose)); + }, + .pipe => |pipe| { + pipe.data = pipe; + pipe.close(onPipeClose); + }, + .tty => |tty| { + tty.data = tty; + tty.close(onTTYClose); + }, + } + this.source = null; + done(this); } } @@ -425,41 +476,55 @@ pub fn WindowsPipeReader( .onError = onError, }; - fn onCloseSource(handle: *uv.Handle) callconv(.C) void { - const this = bun.cast(*This, handle.data); - switch (this.source.?) { - .file => |file| uv.uv_fs_req_cleanup(&file.fs), - else => {}, - } - done(this); + fn onFileClose(handle: *uv.fs_t) callconv(.C) void { + const file = bun.cast(*Source.File, handle.data); + file.fs.deinit(); + bun.default_allocator.destroy(file); } - pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), buf: *const uv.uv_buf_t, hasMore: ReadState) void { + fn onPipeClose(handle: *uv.Pipe) callconv(.C) void { + const this = bun.cast(*uv.Pipe, handle.data); + bun.default_allocator.destroy(this); + } + + fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void { + const this = bun.cast(*uv.uv_tty_t, handle.data); + bun.default_allocator.destroy(this); + } + + pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), slice: []u8, hasMore: ReadState) void { if (amount == .err) { onError(this, amount.err); return; } - if (hasMore == .eof) { - _ = onReadChunk(this, "", hasMore); - close(this); - return; - } - - var buffer = getBuffer(this); - - if (comptime bun.Environment.allow_assert) { - if (!bun.isSliceInBuffer(buf.slice()[0..amount.result], buffer.allocatedSlice())) { - std.debug.print("buf len: {d}, buffer ln: {d}\n", .{ buf.slice().len, buffer.allocatedSlice().len }); - @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); - } - } - - buffer.items.len += amount.result; - - const keep_reading = onReadChunk(this, buf.slice()[0..amount.result], hasMore); - if (!keep_reading) { - close(this); + switch (hasMore) { + .eof => { + // we call report EOF and close + _ = onReadChunk(this, slice, hasMore); + close(this); + }, + .drained => { + // we call drained so we know if we should stop here + const keep_reading = onReadChunk(this, slice, hasMore); + if (!keep_reading) { + close(this); + } + }, + else => { + var buffer = getBuffer(this); + if (comptime bun.Environment.allow_assert) { + if (slice.len > 0 and !bun.isSliceInBuffer(slice, buffer.allocatedSlice())) { + @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); + } + } + // move cursor foward + buffer.items.len += amount.result; + const keep_reading = onReadChunk(this, slice, hasMore); + if (!keep_reading) { + close(this); + } + }, } } @@ -810,6 +875,7 @@ pub const WindowsBufferedReader = struct { flags: Flags = .{}, has_inflight_read: bool = false, + is_paused: bool = true, parent: *anyopaque = undefined, vtable: WindowsOutputReaderVTable = undefined, ref_count: u32 = 1, @@ -911,7 +977,7 @@ pub const WindowsBufferedReader = struct { pub fn hasPendingActivity(this: *const WindowsOutputReader) bool { const source = this.source orelse return false; - return source.isClosed(); + return source.isActive(); } pub fn hasPendingRead(this: *const WindowsOutputReader) bool { @@ -929,7 +995,6 @@ pub const WindowsBufferedReader = struct { } fn finish(this: *WindowsOutputReader) void { - std.debug.assert(!this.flags.is_done); this.has_inflight_read = false; this.flags.is_done = true; } @@ -951,14 +1016,11 @@ pub const WindowsBufferedReader = struct { this.has_inflight_read = true; this._buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory(); const res = this._buffer.allocatedSlice()[this._buffer.items.len..]; - std.debug.print("getReadBufferWithStableMemoryAddress({d}) = {d}\n", .{ suggested_size, res.len }); return res; } pub fn startWithCurrentPipe(this: *WindowsOutputReader) bun.JSC.Maybe(void) { std.debug.assert(this.source != null); - - std.debug.print("clearRetainingCapacity\n", .{}); this.buffer().clearRetainingCapacity(); this.flags.is_done = false; this.unpause(); @@ -983,7 +1045,6 @@ pub const WindowsBufferedReader = struct { } pub fn deinit(this: *WindowsOutputReader) void { - std.debug.print("deinit\n", .{}); this.buffer().deinit(); const source = this.source orelse return; std.debug.assert(source.isClosed()); diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index 7dbd73db35..3207cd9ceb 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -730,16 +730,42 @@ fn BaseWindowsPipeWriter( this.updateRef(event_loop, false); } + fn onFileClose(handle: *uv.fs_t) callconv(.C) void { + const file = bun.cast(*Source.File, handle.data); + file.fs.deinit(); + bun.default_allocator.destroy(file); + } + + fn onPipeClose(handle: *uv.Pipe) callconv(.C) void { + const this = bun.cast(*uv.Pipe, handle.data); + bun.default_allocator.destroy(this); + } + + fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void { + const this = bun.cast(*uv.uv_tty_t, handle.data); + bun.default_allocator.destroy(this); + } + pub fn close(this: *WindowsPipeWriter) void { this.is_done = true; if (this.source) |source| { - if (source == .file) { - uv.uv_fs_req_cleanup(&source.file.fs); - // TODO: handle this error instead of ignoring it - _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&WindowsPipeWriter.onCloseSource)); - return; + switch (source) { + .file => |file| { + file.fs.deinit(); + file.fs.data = file; + _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onFileClose)); + }, + .pipe => |pipe| { + pipe.data = pipe; + pipe.close(onPipeClose); + }, + .tty => |tty| { + tty.data = tty; + tty.close(onTTYClose); + }, } - source.getHandle().close(&WindowsPipeWriter.onCloseSource); + this.source = null; + this.onCloseSource(); } } @@ -819,8 +845,7 @@ pub fn WindowsBufferedWriter( pub usingnamespace BaseWindowsPipeWriter(WindowsWriter, Parent); - fn onCloseSource(pipe: *uv.Handle) callconv(.C) void { - const this = bun.cast(*WindowsWriter, pipe.data); + fn onCloseSource(this: *WindowsWriter) void { if (onClose) |onCloseFn| { onCloseFn(this.parent); } @@ -881,8 +906,10 @@ pub fn WindowsBufferedWriter( switch (pipe) { .file => |file| { this.pending_payload_size = buffer.len; - uv.uv_fs_req_cleanup(&file.fs); + file.fs.deinit(); + file.fs.setData(this); this.write_buffer = uv.uv_buf_t.init(buffer); + if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| { this.close(); onError(this.parent, err); @@ -1028,8 +1055,7 @@ pub fn WindowsStreamingWriter( pub usingnamespace BaseWindowsPipeWriter(WindowsWriter, Parent); - fn onCloseSource(pipe: *uv.Handle) callconv(.C) void { - const this = bun.cast(*WindowsWriter, pipe.data); + fn onCloseSource(this: *WindowsWriter) void { this.source = null; if (!this.closed_without_reporting) { onClose(this.parent); @@ -1127,8 +1153,10 @@ pub fn WindowsStreamingWriter( this.outgoing = temp; switch (pipe) { .file => |file| { - uv.uv_fs_req_cleanup(&file.fs); + file.fs.deinit(); + file.fs.setData(this); this.write_buffer = uv.uv_buf_t.init(bytes); + if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| { this.last_write_result = .{ .err = err }; onError(this.parent, err); diff --git a/src/io/source.zig b/src/io/source.zig index 3a765602d8..1ed036f095 100644 --- a/src/io/source.zig +++ b/src/io/source.zig @@ -11,7 +11,7 @@ pub const Source = union(enum) { const Pipe = uv.Pipe; const Tty = uv.uv_tty_t; - const File = struct { + pub const File = struct { fs: uv.fs_t, iov: uv.uv_buf_t, file: uv.uv_file, @@ -29,7 +29,7 @@ pub const Source = union(enum) { switch (this) { .pipe => |pipe| return pipe.isActive(), .tty => |tty| return tty.isActive(), - .file => return false, + .file => return true, } } diff --git a/src/js/node/child_process.js b/src/js/node/child_process.js index 2da8d2cab0..029f0ced50 100644 --- a/src/js/node/child_process.js +++ b/src/js/node/child_process.js @@ -343,9 +343,9 @@ function execFile(file, args, options, callback) { if (options.timeout > 0) { timeoutId = setTimeout(function delayedKill() { - kill(); + timeoutId && kill(); timeoutId = null; - }, options.timeout); + }, options.timeout).unref(); } if (child.stdout) { @@ -980,7 +980,6 @@ function checkExecSyncError(ret, args, cmd) { //------------------------------------------------------------------------------ class ChildProcess extends EventEmitter { #handle; - #exited = false; #closesNeeded = 1; #closesGot = 0; @@ -998,26 +997,54 @@ class ChildProcess extends EventEmitter { // constructor(options) { // super(options); - // this.#handle[owner_symbol] = this; // } #handleOnExit(exitCode, signalCode, err) { - if (this.#exited) return; if (signalCode) { this.signalCode = signalCode; } else { this.exitCode = exitCode; } - if (this.#stdin) { - this.#stdin.destroy(); + // Drain stdio streams + { + if (this.#stdin) { + this.#stdin.destroy(); + } else { + this.#stdioOptions[0] = "destroyed"; + } + + // If there was an error while spawning the subprocess, then we will never have any IO to drain. + if (err) { + this.#stdioOptions[1] = this.#stdioOptions[2] = "destroyed"; + } + + const stdout = this.#stdout, + stderr = this.#stderr; + + if (stdout === undefined) { + this.#stdout = this.#getBunSpawnIo(1, this.#encoding, true); + } else if (stdout && this.#stdioOptions[1] === "pipe" && !stdout?.destroyed) { + stdout.resume?.(); + } + + if (stderr === undefined) { + this.#stderr = this.#getBunSpawnIo(2, this.#encoding, true); + } else if (stderr && this.#stdioOptions[2] === "pipe" && !stderr?.destroyed) { + stderr.resume?.(); + } } if (this.#handle) { this.#handle = null; } - if (exitCode < 0) { + if (err) { + if (this.spawnfile) err.path = this.spawnfile; + err.spawnargs = ArrayPrototypeSlice.$call(this.spawnargs, 1); + err.pid = this.pid; + this.emit("error", err); + } else if (exitCode < 0) { const err = new SystemError( `Spawned process exited with error code: ${exitCode}`, undefined, @@ -1025,29 +1052,20 @@ class ChildProcess extends EventEmitter { "EUNKNOWN", "ERR_CHILD_PROCESS_UNKNOWN_ERROR", ); + err.pid = this.pid; if (this.spawnfile) err.path = this.spawnfile; err.spawnargs = ArrayPrototypeSlice.$call(this.spawnargs, 1); this.emit("error", err); - } else { - this.emit("exit", this.exitCode, this.signalCode); } - // If any of the stdio streams have not been touched, - // then pull all the data through so that it can get the - // eof and emit a 'close' event. - // Do it on nextTick so that the user has one last chance - // to consume the output, if for example they only want to - // start reading the data once the process exits. - process.nextTick(flushStdio, this); + this.emit("exit", this.exitCode, this.signalCode); this.#maybeClose(); - this.#exited = true; - this.#stdioOptions = ["destroyed", "destroyed", "destroyed"]; } - #getBunSpawnIo(i, encoding) { + #getBunSpawnIo(i, encoding, autoResume = false) { if ($debug && !this.#handle) { if (this.#handle === null) { $debug("ChildProcess: getBunSpawnIo: this.#handle is null. This means the subprocess already exited"); @@ -1058,7 +1076,6 @@ class ChildProcess extends EventEmitter { NativeWritable ||= StreamModule.NativeWritable; ReadableFromWeb ||= StreamModule.Readable.fromWeb; - if (!NetModule) NetModule = require("node:net"); const io = this.#stdioOptions[i]; switch (i) { @@ -1077,8 +1094,13 @@ class ChildProcess extends EventEmitter { case 2: case 1: { switch (io) { - case "pipe": - return ReadableFromWeb(this.#handle[fdToStdioName(i)], { encoding }); + case "pipe": { + const pipe = ReadableFromWeb(this.#handle[fdToStdioName(i)], { encoding }); + this.#closesNeeded++; + pipe.once("close", () => this.#maybeClose()); + if (autoResume) pipe.resume(); + return pipe; + } case "inherit": return process[fdToStdioName(i)] || null; case "destroyed": @@ -1090,6 +1112,7 @@ class ChildProcess extends EventEmitter { default: switch (io) { case "pipe": + if (!NetModule) NetModule = require("node:net"); const fd = this.#handle.stdio[i]; if (!fd) return null; return new NetModule.connect({ fd }); @@ -1127,7 +1150,7 @@ class ChildProcess extends EventEmitter { result[i] = this.stderr; continue; default: - result[i] = this.#getBunSpawnIo(i, this.#encoding); + result[i] = this.#getBunSpawnIo(i, this.#encoding, false); continue; } } @@ -1135,15 +1158,15 @@ class ChildProcess extends EventEmitter { } get stdin() { - return (this.#stdin ??= this.#getBunSpawnIo(0, this.#encoding)); + return (this.#stdin ??= this.#getBunSpawnIo(0, this.#encoding, false)); } get stdout() { - return (this.#stdout ??= this.#getBunSpawnIo(1, this.#encoding)); + return (this.#stdout ??= this.#getBunSpawnIo(1, this.#encoding, false)); } get stderr() { - return (this.#stderr ??= this.#getBunSpawnIo(2, this.#encoding)); + return (this.#stderr ??= this.#getBunSpawnIo(2, this.#encoding, false)); } get stdio() { @@ -1201,6 +1224,8 @@ class ChildProcess extends EventEmitter { this.#stdioOptions = bunStdio; const stdioCount = stdio.length; const hasSocketsToEagerlyLoad = stdioCount >= 3; + this.#closesNeeded = 1; + this.#handle = Bun.spawn({ cmd: spawnargs, stdio: bunStdio, @@ -1302,8 +1327,6 @@ class ChildProcess extends EventEmitter { this.#handle.kill(signal); } - this.#maybeClose(); - // TODO: Figure out how to make this conform to the Node spec... // The problem is that the handle does not report killed until the process exits // So we can't return whether or not the process was killed because Bun.spawn seems to handle this async instead of sync like Node does @@ -1426,22 +1449,6 @@ function normalizeStdio(stdio) { } } -function flushStdio(subprocess) { - const stdio = subprocess.stdio; - if (stdio == null) return; - - for (let i = 0; i < stdio.length; i++) { - const stream = stdio[i]; - // TODO(addaleax): This doesn't necessarily account for all the ways in - // which data can be read from a stream, e.g. being consumed on the - // native layer directly as a StreamBase. - if (!stream || !stream.readable) { - continue; - } - stream.resume(); - } -} - function onSpawnNT(self) { self.emit("spawn"); } @@ -1465,12 +1472,30 @@ class ShimmedStdin extends EventEmitter { return false; } destroy() {} - end() {} - pipe() {} + end() { + return this; + } + pipe() { + return this; + } + resume() { + return this; + } } class ShimmedStdioOutStream extends EventEmitter { pipe() {} + get destroyed() { + return true; + } + + resume() { + return this; + } + + destroy() { + return this; + } } //------------------------------------------------------------------------------ diff --git a/test/js/node/child_process/child-process-stdio.test.js b/test/js/node/child_process/child-process-stdio.test.js index 36bf278b06..afa36f1f41 100644 --- a/test/js/node/child_process/child-process-stdio.test.js +++ b/test/js/node/child_process/child-process-stdio.test.js @@ -75,8 +75,7 @@ describe("process.stdin", () => { done(err); } }); - child.stdin.write(input); - child.stdin.end(); + child.stdin.end(input); }); it("should allow us to read > 65kb from stdin", done => { @@ -106,8 +105,7 @@ describe("process.stdin", () => { done(err); } }); - child.stdin.write(input); - child.stdin.end(); + child.stdin.end(input); }); it("should allow us to read from a file", () => { diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts index fbb5cd720b..eb1328c50f 100644 --- a/test/js/web/fetch/fetch.test.ts +++ b/test/js/web/fetch/fetch.test.ts @@ -1234,10 +1234,10 @@ describe("Response", () => { }).toThrow("Body already used"); }); it("with Bun.file() streams", async () => { - var stream = Bun.file(import.meta.dir + "/fixtures/file.txt").stream(); + var stream = Bun.file(join(import.meta.dir, "fixtures/file.txt")).stream(); expect(stream instanceof ReadableStream).toBe(true); var input = new Response((await new Response(stream).blob()).stream()).arrayBuffer(); - var output = Bun.file(import.meta.dir + "/fixtures/file.txt").arrayBuffer(); + var output = Bun.file(join(import.meta.dir, "/fixtures/file.txt")).arrayBuffer(); expect(await input).toEqual(await output); }); it("with Bun.file() with request/response", async () => {