diff --git a/src/async/posix_event_loop.zig b/src/async/posix_event_loop.zig index f366fde661..2c18b049a9 100644 --- a/src/async/posix_event_loop.zig +++ b/src/async/posix_event_loop.zig @@ -585,10 +585,10 @@ pub const FilePoll = struct { return; if (comptime @TypeOf(event_loop_ctx_) == JSC.EventLoopHandle) { - event_loop_ctx_.loop().addActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count)))); + event_loop_ctx_.loop().addActive(@as(u32, @intFromBool(!this.flags.contains(.has_incremented_active_count)))); } else { const event_loop_ctx = JSC.AbstractVM(event_loop_ctx_); - event_loop_ctx.platformEventLoop().addActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count)))); + event_loop_ctx.platformEventLoop().addActive(@as(u32, @intFromBool(!this.flags.contains(.has_incremented_active_count)))); } this.flags.insert(.keeps_event_loop_alive); diff --git a/src/bun.js/api/bun/process.zig b/src/bun.js/api/bun/process.zig index 2562c32bfb..a371f0e53d 100644 --- a/src/bun.js/api/bun/process.zig +++ b/src/bun.js/api/bun/process.zig @@ -180,12 +180,12 @@ pub const Process = struct { pub fn onExit(this: *Process, status: Status, rusage: *const Rusage) void { const exit_handler = this.exit_handler; - if (status == .exited or status == .err) { + this.status = status; + + if (this.hasExited()) { this.detach(); } - this.status = status; - exit_handler.call(Process, this, status, rusage); } @@ -318,6 +318,7 @@ pub const Process = struct { const watchfd = if (comptime Environment.isLinux) this.pidfd else this.pid; const poll = bun.Async.FilePoll.init(this.event_loop, bun.toFD(watchfd), .{}, Process, this); this.poller = .{ .fd = poll }; + this.poller.fd.enableKeepingProcessAlive(this.event_loop); switch (this.poller.fd.register( this.event_loop.loop(), @@ -325,11 +326,12 @@ pub const Process = struct { true, )) { .result => { - this.poller.fd.enableKeepingProcessAlive(this.event_loop); this.ref(); return JSC.Maybe(void){ .result = {} }; }, .err => |err| { + this.poller.fd.disableKeepingProcessAlive(this.event_loop); + if (err.getErrno() != .SRCH) { @panic("This shouldn't happen"); } diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 04763465a4..fcd1298249 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -140,6 +140,7 @@ pub const Subprocess = struct { stdin, stdout, stderr, + stdio, }) = .{}, closed: std.enums.EnumSet(enum { stdin, @@ -154,10 +155,9 @@ pub const Subprocess = struct { ipc: IPC.IPCData, flags: Flags = .{}, - pub const Flags = packed struct(u3) { + pub const Flags = packed struct { is_sync: bool = false, killed: bool = false, - waiting_for_onexit: bool = false, }; pub const SignalCode = bun.SignalCode; @@ -210,10 +210,6 @@ pub const Subprocess = struct { } pub fn hasPendingActivityNonThreadsafe(this: *const Subprocess) bool { - if (this.flags.waiting_for_onexit) { - return true; - } - if (this.ipc_mode != .none) { return true; } @@ -424,6 +420,7 @@ pub const Subprocess = struct { pub fn close(this: *Readable) void { switch (this.*) { inline .memfd, .fd => |fd| { + this.* = .{ .closed = {} }; _ = bun.sys.close(fd); }, .pipe => { @@ -452,6 +449,7 @@ pub const Subprocess = struct { pub fn finalize(this: *Readable) void { switch (this.*) { inline .memfd, .fd => |fd| { + this.* = .{ .closed = {} }; _ = bun.sys.close(fd); }, .pipe => |*pipe| { @@ -598,7 +596,6 @@ pub const Subprocess = struct { if (comptime !Environment.isLinux) { return; } - this.process.close(); } @@ -660,9 +657,10 @@ pub const Subprocess = struct { global: *JSGlobalObject, ) callconv(.C) JSValue { const array = JSValue.createEmptyArray(global, 0); + array.push(global, .null); array.push(global, .null); // TODO: align this with options array.push(global, .null); // TODO: align this with options - array.push(global, .null); // TODO: align this with options + this.observable_getters.insert(.stdio); for (this.stdio_pipes.items) |item| { const uno: u32 = @intCast(item.fileno); @@ -964,6 +962,8 @@ pub const Subprocess = struct { if (Environment.isWindows) { @compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipe"); } + + std.debug.assert(fd != .zero and fd != bun.invalid_fd); return BufferedOutput{ .internal_buffer = .{}, .stream = JSC.WebCore.FIFO{ @@ -1553,6 +1553,7 @@ pub const Subprocess = struct { return Writable{ .memfd = stdio.memfd }; }, .fd => { + std.debug.assert(fd != bun.invalid_fd); return Writable{ .fd = fd }; }, .inherit => { @@ -1651,42 +1652,20 @@ pub const Subprocess = struct { log("onProcessExit()", .{}); const this_jsvalue = this.this_jsvalue; const globalThis = this.globalThis; - defer this.updateHasPendingActivity(); this_jsvalue.ensureStillAlive(); this.pid_rusage = rusage.*; const is_sync = this.flags.is_sync; + _ = is_sync; // autofix + var must_drain_tasks = false; defer { - if (!is_sync) + this.updateHasPendingActivity(); + + if (must_drain_tasks) globalThis.bunVM().drainMicrotasks(); } - if (!is_sync and this.hasExited()) { - this.flags.waiting_for_onexit = true; - - const Holder = struct { - process: *Subprocess, - task: JSC.AnyTask, - - pub fn unref(self: *@This()) void { - // this calls disableKeepingProcessAlive on pool_ref and stdin, stdout, stderr - self.process.flags.waiting_for_onexit = false; - self.process.unref(true); - self.process.updateHasPendingActivity(); - bun.default_allocator.destroy(self); - } - }; - - var holder = bun.default_allocator.create(Holder) catch bun.outOfMemory(); - - holder.* = .{ - .process = this, - .task = JSC.AnyTask.New(Holder, Holder.unref).init(holder), - }; - - globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task)); - } - if (this.exit_promise.trySwap()) |promise| { + must_drain_tasks = true; switch (status) { .exited => |exited| promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(exited.code)), .err => |err| promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis)), @@ -1700,6 +1679,7 @@ pub const Subprocess = struct { } if (this.on_exit_callback.trySwap()) |callback| { + must_drain_tasks = true; const waitpid_value: JSValue = if (status == .err) status.err.toJSC(globalThis) @@ -1728,21 +1708,6 @@ pub const Subprocess = struct { } } - fn closeIOCallback(this: *Subprocess) void { - log("closeIOCallback", .{}); - this.closed_streams += 1; - if (this.closed_streams == @TypeOf(this.closed).len) { - this.exit_promise.deinit(); - this.on_exit_callback.deinit(); - this.stdio_pipes.deinit(bun.default_allocator); - - if (this.deinit_onclose) { - log("destroy", .{}); - bun.default_allocator.destroy(this); - } - } - } - fn closeIO(this: *Subprocess, comptime io: @Type(.EnumLiteral)) void { if (this.closed.contains(io)) return; this.closed.insert(io); @@ -1754,19 +1719,11 @@ pub const Subprocess = struct { // 2. We need to free the memory // 3. We need to halt any pending reads (1) - const closeCallback = CloseCallbackHandler.init(this, @ptrCast(&Subprocess.closeIOCallback)); - const isAsync = @field(this, @tagName(io)).setCloseCallbackIfPossible(closeCallback); - if (!this.hasCalledGetter(io)) { @field(this, @tagName(io)).finalize(); } else { @field(this, @tagName(io)).close(); } - - if (!isAsync) { - // close is sync - closeCallback.run(); - } } // This must only be run once per Subprocess @@ -1777,11 +1734,25 @@ pub const Subprocess = struct { this.closeIO(.stdin); this.closeIO(.stdout); this.closeIO(.stderr); + + close_stdio_pipes: { + if (!this.observable_getters.contains(.stdio)) { + break :close_stdio_pipes; + } + + for (this.stdio_pipes.items) |pipe| { + _ = bun.sys.close(bun.toFD(pipe.fd)); + } + this.stdio_pipes.clearAndFree(bun.default_allocator); + } + + this.exit_promise.deinit(); + this.on_exit_callback.deinit(); } pub fn finalize(this: *Subprocess) callconv(.C) void { log("finalize", .{}); - std.debug.assert(!this.hasPendingActivity()); + std.debug.assert(!this.hasPendingActivity() or JSC.VirtualMachine.get().isShuttingDown()); this.finalizeStreams(); this.process.detach(); @@ -1793,28 +1764,24 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { - if (this.hasExited()) { - switch (this.process.status) { - .exited => |exit| { - return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(exit.code)); - }, - .signaled => |signal| { - return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(signal.toExitCode() orelse 254)); - }, - .err => |err| { - return JSC.JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); - }, - else => { - @panic("Subprocess.getExited() has exited but has no exit code or signal code. This is a bug."); - }, - } - } + switch (this.process.status) { + .exited => |exit| { + return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(exit.code)); + }, + .signaled => |signal| { + return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(signal.toExitCode() orelse 254)); + }, + .err => |err| { + return JSC.JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); + }, + else => { + if (!this.exit_promise.has()) { + this.exit_promise.set(globalThis, JSC.JSPromise.create(globalThis).asValue(globalThis)); + } - if (!this.exit_promise.has()) { - this.exit_promise.set(globalThis, JSC.JSPromise.create(globalThis).asValue(globalThis)); + return this.exit_promise.get().?; + }, } - - return this.exit_promise.get().?; } pub fn getExitCode( @@ -2190,21 +2157,20 @@ pub const Subprocess = struct { } } - // TODO: move pipe2 to bun.sys so it can return [2]bun.FileDesriptor const stdin_pipe = if (stdio[0].isPiped()) bun.sys.pipe().unwrap() catch |err| { globalThis.throw("failed to create stdin pipe: {s}", .{@errorName(err)}); return .zero; - } else undefined; + } else .{ bun.invalid_fd, bun.invalid_fd }; const stdout_pipe = if (stdio[1].isPiped()) bun.sys.pipe().unwrap() catch |err| { globalThis.throw("failed to create stdout pipe: {s}", .{@errorName(err)}); return .zero; - } else undefined; + } else .{ bun.invalid_fd, bun.invalid_fd }; const stderr_pipe = if (stdio[2].isPiped()) bun.sys.pipe().unwrap() catch |err| { globalThis.throw("failed to create stderr pipe: {s}", .{@errorName(err)}); return .zero; - } else undefined; + } else .{ bun.invalid_fd, bun.invalid_fd }; stdio[0].setUpChildIoPosixSpawn( &actions, @@ -2313,13 +2279,13 @@ pub const Subprocess = struct { const raw_pid = brk: { defer { if (stdio[0].isPiped()) { - _ = bun.sys.close(bun.toFD(stdin_pipe[0])); + _ = bun.sys.close(stdin_pipe[0]); } if (stdio[1].isPiped()) { - _ = bun.sys.close(bun.toFD(stdout_pipe[1])); + _ = bun.sys.close(stdout_pipe[1]); } if (stdio[2].isPiped()) { - _ = bun.sys.close(bun.toFD(stderr_pipe[1])); + _ = bun.sys.close(stderr_pipe[1]); } // we always close these, but we want to close these earlier @@ -2396,6 +2362,7 @@ pub const Subprocess = struct { globalThis.throwOutOfMemory(); return .zero; }; + // When run synchronously, subprocess isn't garbage collected subprocess.* = Subprocess{ .globalThis = globalThis, @@ -2406,13 +2373,13 @@ pub const Subprocess = struct { is_sync, ), .pid_rusage = if (has_rusage) rusage_result else null, - .stdin = Writable.init(stdio[0], bun.toFD(stdin_pipe[1]), globalThis) catch { + .stdin = Writable.init(stdio[0], stdin_pipe[1], globalThis) catch { globalThis.throwOutOfMemory(); return .zero; }, // stdout and stderr only uses allocator and default_max_buffer_size if they are pipes and not a array buffer - .stdout = Readable.init(stdio[1], bun.toFD(stdout_pipe[0]), jsc_vm.allocator, default_max_buffer_size), - .stderr = Readable.init(stdio[2], bun.toFD(stderr_pipe[0]), jsc_vm.allocator, default_max_buffer_size), + .stdout = Readable.init(stdio[1], stdout_pipe[0], jsc_vm.allocator, default_max_buffer_size), + .stderr = Readable.init(stdio[2], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size), .stdio_pipes = stdio_pipes, .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, @@ -2457,7 +2424,6 @@ pub const Subprocess = struct { if (send_exit_notification) { // process has already exited // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007 - subprocess.process.deref(); // from the watch subprocess.process.wait(is_sync); } } @@ -2518,6 +2484,8 @@ pub const Subprocess = struct { jsc_vm.eventLoop().autoTick(); } + subprocess.updateHasPendingActivity(); + const exitCode = subprocess.getExitCode(globalThis); const stdout = subprocess.stdout.toBufferedValue(globalThis); const stderr = subprocess.stderr.toBufferedValue(globalThis); @@ -2817,13 +2785,24 @@ pub const Subprocess = struct { globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{}); return false; } + + out_stdio.* = Stdio{ .inherit = {} }; + return true; }, - .stdout, .stderr => { + .stdout, .stderr => |tag| { if (i == 0) { globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{}); return false; } + + if (i == 1 and tag == .stdout) { + out_stdio.* = .{ .inherit = {} }; + return true; + } else if (i == 2 and tag == .stderr) { + out_stdio.* = .{ .inherit = {} }; + return true; + } }, else => {}, } diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 15912379e3..8d01175036 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -504,7 +504,7 @@ pub const VirtualMachine = struct { hide_bun_stackframes: bool = true, is_printing_plugin: bool = false, - + is_shutting_down: bool = false, plugin_runner: ?PluginRunner = null, is_main_thread: bool = false, last_reported_error_for_dedupe: JSValue = .zero, @@ -625,6 +625,10 @@ pub const VirtualMachine = struct { return this.debugger != null; } + pub inline fn isShuttingDown(this: *const VirtualMachine) bool { + return this.is_shutting_down; + } + pub fn setOnException(this: *VirtualMachine, callback: *const OnException) void { this.on_exception = callback; } diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index e2e3908d4c..de3bd1b2bf 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -357,6 +357,7 @@ pub const WebWorker = struct { var vm_to_deinit: ?*JSC.VirtualMachine = null; if (this.vm) |vm| { this.vm = null; + vm.is_shutting_down = true; vm.onExit(); exit_code = vm.exit_handler.exit_code; globalObject = vm.global; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index a1db89e25a..25a8867e46 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -4466,7 +4466,7 @@ pub fn NewFIFO(comptime EventLoop: JSC.EventLoopKind) type { } if (result == 0) { - return .{ .read = buf[0..0] }; + return .{ .done = {} }; } return .{ .read = buf[0..result] }; }, diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index 860bb25fe2..6c290cc171 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -585,7 +585,7 @@ describe("spawn unref and kill should not hang", () => { }); async function runTest(sleep: string, order = ["sleep", "kill", "unref", "exited"]) { - console.log("running", order.join(",")); + console.log("running", order.join(","), "x 100"); for (let i = 0; i < 100; i++) { const proc = spawn({ cmd: ["sleep", sleep], @@ -625,31 +625,41 @@ async function runTest(sleep: string, order = ["sleep", "kill", "unref", "exited } describe("should not hang", () => { - for (let sleep of ["0.001", "0"]) { - describe("sleep " + sleep, () => { - for (let order of [ - ["sleep", "kill", "unref", "exited"], - ["sleep", "unref", "kill", "exited"], - ["kill", "sleep", "unref", "exited"], - ["kill", "unref", "sleep", "exited"], - ["unref", "sleep", "kill", "exited"], - ["unref", "kill", "sleep", "exited"], - ["exited", "sleep", "kill", "unref"], - ["exited", "sleep", "unref", "kill"], - ["exited", "kill", "sleep", "unref"], - ["exited", "kill", "unref", "sleep"], - ["exited", "unref", "sleep", "kill"], - ["exited", "unref", "kill", "sleep"], - ["unref", "exited"], - ["exited", "unref"], - ["kill", "exited"], - ["exited"], - ]) { - const name = order.join(","); - const fn = runTest.bind(undefined, sleep, order); - it(name, fn); - } - }); + for (let sleep of ["0", "0.1"]) { + it( + "sleep " + sleep, + () => { + const runs = []; + for (let order of [ + ["sleep", "kill", "unref", "exited"], + ["sleep", "unref", "kill", "exited"], + ["kill", "sleep", "unref", "exited"], + ["kill", "unref", "sleep", "exited"], + ["unref", "sleep", "kill", "exited"], + ["unref", "kill", "sleep", "exited"], + ["exited", "sleep", "kill", "unref"], + ["exited", "sleep", "unref", "kill"], + ["exited", "kill", "sleep", "unref"], + ["exited", "kill", "unref", "sleep"], + ["exited", "unref", "sleep", "kill"], + ["exited", "unref", "kill", "sleep"], + ["unref", "exited"], + ["exited", "unref"], + ["kill", "exited"], + ["exited"], + ]) { + runs.push( + runTest(sleep, order).catch(err => { + console.error("For order", JSON.stringify(order, null, 2)); + throw err; + }), + ); + } + + return Promise.all(runs); + }, + 128_000, + ); } }); diff --git a/test/js/third_party/esbuild/esbuild-child_process.test.ts b/test/js/third_party/esbuild/esbuild-child_process.test.ts index 70226c43e7..11485d9f87 100644 --- a/test/js/third_party/esbuild/esbuild-child_process.test.ts +++ b/test/js/third_party/esbuild/esbuild-child_process.test.ts @@ -4,15 +4,14 @@ import { describe, it, expect, test } from "bun:test"; import { bunEnv, bunExe } from "harness"; test("esbuild", () => { - const { exitCode, stderr, stdout } = spawnSync([bunExe(), import.meta.dir + "/esbuild-test.js"], { + const { exitCode } = spawnSync([bunExe(), import.meta.dir + "/esbuild-test.js"], { env: { ...bunEnv, }, + detached: true, + stdout: "inherit", + stderr: "inherit", + stdin: "inherit", }); - const out = "" + stderr?.toString() + stdout?.toString(); - if (exitCode !== 0 && out?.length) { - throw new Error(out); - } - expect(exitCode).toBe(0); });