diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c index dd69e1fe68..997fdf7e01 100644 --- a/packages/bun-usockets/src/eventing/epoll_kqueue.c +++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c @@ -179,11 +179,11 @@ void bun_on_tick_after(void* ctx); void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs, void* tickCallbackContext) { - us_loop_integrate(loop); - if (loop->num_polls == 0) return; + us_loop_integrate(loop); + if (tickCallbackContext) { bun_on_tick_before(tickCallbackContext); } diff --git a/src/async/posix_event_loop.zig b/src/async/posix_event_loop.zig index 83f6bde435..8b2de740cb 100644 --- a/src/async/posix_event_loop.zig +++ b/src/async/posix_event_loop.zig @@ -32,8 +32,7 @@ pub const KeepAlive = struct { return; this.status = .inactive; - loop.num_polls -= 1; - loop.active -|= 1; + loop.subActive(1); } /// Only intended to be used from EventLoop.Pollable @@ -42,8 +41,7 @@ pub const KeepAlive = struct { return; this.status = .active; - loop.num_polls += 1; - loop.active += 1; + loop.addActive(1); } pub fn init() KeepAlive { @@ -55,7 +53,7 @@ pub const KeepAlive = struct { if (this.status != .active) return; this.status = .inactive; - vm.event_loop_handle.?.unref(); + vm.event_loop_handle.?.subActive(1); } /// From another thread, Prevent a poll from keeping the process alive. @@ -159,17 +157,17 @@ pub const FilePoll = struct { poll.flags = flags; } - pub fn onKQueueEvent(poll: *FilePoll, loop: *Loop, kqueue_event: *const std.os.system.kevent64_s) void { + pub fn onKQueueEvent(poll: *FilePoll, _: *Loop, kqueue_event: *const std.os.system.kevent64_s) void { if (KQueueGenerationNumber != u0) std.debug.assert(poll.generation_number == kqueue_event.ext[0]); poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*)); - poll.onUpdate(loop, kqueue_event.data); + poll.onUpdate(kqueue_event.data); } - pub fn onEpollEvent(poll: *FilePoll, loop: *Loop, epoll_event: *std.os.linux.epoll_event) void { + pub fn onEpollEvent(poll: *FilePoll, _: *Loop, epoll_event: *std.os.linux.epoll_event) void { poll.updateFlags(Flags.fromEpollEvent(epoll_event.*)); - poll.onUpdate(loop, 0); + poll.onUpdate(0); } pub fn clearEvent(poll: *FilePoll, flag: Flags) void { @@ -213,9 +211,7 @@ pub const FilePoll = struct { } fn deinitPossiblyDefer(this: *FilePoll, vm: *JSC.VirtualMachine, loop: *Loop, polls: *FilePoll.Store, force_unregister: bool) void { - if (this.isRegistered()) { - _ = this.unregister(loop, force_unregister); - } + _ = this.unregister(loop, force_unregister); this.owner = Deactivated.owner; const was_ever_registered = this.flags.contains(.was_ever_registered); @@ -235,14 +231,11 @@ pub const FilePoll = struct { const kqueue_or_epoll = if (Environment.isMac) "kevent" else "epoll"; - pub fn onUpdate(poll: *FilePoll, loop: *Loop, size_or_offset: i64) void { + pub fn onUpdate(poll: *FilePoll, size_or_offset: i64) void { if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) { - if (poll.flags.contains(.has_incremented_poll_count)) { - loop.active -|= @as(u32, @intFromBool(!poll.flags.contains(.disable))); - poll.flags.remove(.has_incremented_poll_count); - } poll.flags.insert(.needs_rearm); } + var ptr = poll.owner; switch (ptr.tag()) { @field(Owner.Tag, "FIFO") => { @@ -310,8 +303,10 @@ pub const FilePoll = struct { needs_rearm, has_incremented_poll_count, + has_incremented_active_count, + closed, - disable, + keeps_event_loop_alive, nonblocking, @@ -448,53 +443,48 @@ pub const FilePoll = struct { return !this.flags.contains(.needs_rearm) and (this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process)); } - pub inline fn isKeepingProcessAlive(this: *const FilePoll) bool { - return !this.flags.contains(.disable) and this.isActive(); - } - - pub inline fn canDisableKeepingProcessAlive(this: *const FilePoll) bool { - return !this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count); - } - - /// Make calling ref() on this poll into a no-op. + /// This decrements the active counter if it was previously incremented + /// "active" controls whether or not the event loop should potentially idle pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (this.flags.contains(.disable)) - return; - this.flags.insert(.disable); - - vm.event_loop_handle.?.active -= @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count))); + vm.event_loop_handle.?.subActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count)))); + this.flags.remove(.keeps_event_loop_alive); + this.flags.remove(.has_incremented_active_count); } pub inline fn canEnableKeepingProcessAlive(this: *const FilePoll) bool { - return this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count); + return this.flags.contains(.keeps_event_loop_alive) and this.flags.contains(.has_incremented_poll_count); } pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (!this.flags.contains(.disable)) + if (this.flags.contains(.closed)) return; - this.flags.remove(.disable); - vm.event_loop_handle.?.active += @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count))); - } - - pub fn canActivate(this: *const FilePoll) bool { - return !this.flags.contains(.has_incremented_poll_count); + vm.event_loop_handle.?.addActive(@as(u32, @intFromBool(!this.flags.contains(.has_incremented_active_count)))); + this.flags.insert(.keeps_event_loop_alive); + this.flags.insert(.has_incremented_active_count); } /// Only intended to be used from EventLoop.Pollable - pub fn deactivate(this: *FilePoll, loop: *Loop) void { - std.debug.assert(this.flags.contains(.has_incremented_poll_count)); + fn deactivate(this: *FilePoll, loop: *Loop) void { loop.num_polls -= @as(i32, @intFromBool(this.flags.contains(.has_incremented_poll_count))); - loop.active -|= @as(u32, @intFromBool(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count))); - this.flags.remove(.has_incremented_poll_count); + + loop.subActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count)))); + this.flags.remove(.keeps_event_loop_alive); + this.flags.remove(.has_incremented_active_count); } /// Only intended to be used from EventLoop.Pollable - pub fn activate(this: *FilePoll, loop: *Loop) void { + fn activate(this: *FilePoll, loop: *Loop) void { + this.flags.remove(.closed); + loop.num_polls += @as(i32, @intFromBool(!this.flags.contains(.has_incremented_poll_count))); - loop.active += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count))); this.flags.insert(.has_incremented_poll_count); + + if (this.flags.contains(.keeps_event_loop_alive)) { + loop.addActive(@as(u32, @intFromBool(!this.flags.contains(.has_incremented_active_count)))); + this.flags.insert(.has_incremented_active_count); + } } pub fn init(vm: *JSC.VirtualMachine, fd: bun.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { @@ -515,31 +505,26 @@ pub const FilePoll = struct { return poll; } - pub inline fn canRef(this: *const FilePoll) bool { - if (this.flags.contains(.disable)) - return false; - - return !this.flags.contains(.has_incremented_poll_count); - } - - pub inline fn canUnref(this: *const FilePoll) bool { - return this.flags.contains(.has_incremented_poll_count); - } - /// Prevent a poll from keeping the process alive. pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (!this.canUnref()) - return; log("unref", .{}); - this.deactivate(vm.event_loop_handle.?); + this.disableKeepingProcessAlive(vm); } /// Allow a poll to keep the process alive. pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (!this.canRef()) + if (this.flags.contains(.closed)) return; + log("ref", .{}); - this.activate(vm.event_loop_handle.?); + + this.enableKeepingProcessAlive(vm); + } + + pub fn onEnded(this: *FilePoll, vm: *JSC.VirtualMachine) void { + this.flags.remove(.keeps_event_loop_alive); + this.flags.insert(.closed); + this.deactivate(vm.event_loop_handle.?); } pub fn onTick(loop: *Loop, tagged_pointer: ?*anyopaque) callconv(.C) void { @@ -611,6 +596,7 @@ pub const FilePoll = struct { ); this.flags.insert(.was_ever_registered); if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| { + this.deactivate(loop); return errno; } } else if (comptime Environment.isMac) { @@ -697,6 +683,7 @@ pub const FilePoll = struct { const errno = std.c.getErrno(rc); if (errno != .SUCCESS) { + this.deactivate(loop); return JSC.Maybe(void){ .err = bun.sys.Error.fromCode(errno, .kqueue), }; @@ -704,8 +691,7 @@ pub const FilePoll = struct { } else { bun.todo(@src(), {}); } - if (this.canActivate()) - this.activate(loop); + this.activate(loop); this.flags.insert(switch (flag) { .readable => .poll_readable, .process => if (comptime Environment.isLinux) .poll_readable else .poll_process, @@ -725,7 +711,10 @@ pub const FilePoll = struct { } pub fn unregisterWithFd(this: *FilePoll, loop: *Loop, fd: bun.UFileDescriptor, force_unregister: bool) JSC.Maybe(void) { + defer this.deactivate(loop); + if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process) or this.flags.contains(.poll_machport))) { + // no-op return JSC.Maybe(void).success; } @@ -853,8 +842,6 @@ pub const FilePoll = struct { this.flags.remove(.poll_writable); this.flags.remove(.poll_process); this.flags.remove(.poll_machport); - if (this.isActive()) - this.deactivate(loop); return JSC.Maybe(void).success; } diff --git a/src/async/windows_event_loop.zig b/src/async/windows_event_loop.zig index c2c3a699ef..654b4d87cd 100644 --- a/src/async/windows_event_loop.zig +++ b/src/async/windows_event_loop.zig @@ -148,7 +148,7 @@ pub const FilePoll = struct { } pub inline fn isKeepingProcessAlive(this: *const FilePoll) bool { - return !this.flags.contains(.disable) and this.isActive(); + return !this.flags.contains(.closed) and this.isActive(); } pub fn isRegistered(this: *const FilePoll) bool { @@ -157,9 +157,9 @@ pub const FilePoll = struct { /// Make calling ref() on this poll into a no-op. pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (this.flags.contains(.disable)) + if (this.flags.contains(.closed)) return; - this.flags.insert(.disable); + this.flags.insert(.closed); vm.event_loop_handle.?.active_handles -= @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count))); } @@ -183,6 +183,8 @@ pub const FilePoll = struct { this.deinitWithVM(vm); } + pub const deinitForceUnregister = deinit; + pub fn unregister(this: *FilePoll, loop: *Loop) bool { _ = loop; uv.uv_unref(@ptrFromInt(this.fd)); @@ -234,9 +236,9 @@ pub const FilePoll = struct { } pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (!this.flags.contains(.disable)) + if (!this.flags.contains(.closed)) return; - this.flags.remove(.disable); + this.flags.remove(.closed); vm.event_loop_handle.?.active_handles += @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count))); } @@ -248,18 +250,18 @@ pub const FilePoll = struct { /// Only intended to be used from EventLoop.Pollable pub fn deactivate(this: *FilePoll, loop: *Loop) void { std.debug.assert(this.flags.contains(.has_incremented_poll_count)); - loop.active_handles -= @as(u32, @intFromBool(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count))); + loop.active_handles -= @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count))); this.flags.remove(.has_incremented_poll_count); } /// Only intended to be used from EventLoop.Pollable pub fn activate(this: *FilePoll, loop: *Loop) void { - loop.active_handles += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count))); + loop.active_handles += @as(u32, @intFromBool(!this.flags.contains(.closed) and !this.flags.contains(.has_incremented_poll_count))); this.flags.insert(.has_incremented_poll_count); } pub inline fn canRef(this: *const FilePoll) bool { - if (this.flags.contains(.disable)) + if (this.flags.contains(.closed)) return false; return !this.flags.contains(.has_incremented_poll_count); diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index 3d26bbbfbf..0f5453965c 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -289,7 +289,7 @@ pub const PosixSpawn = struct { /// `execve` method. pub fn waitpid(pid: pid_t, flags: u32) Maybe(WaitPidResult) { const Status = c_int; - var status: Status = undefined; + var status: Status = 0; while (true) { const rc = system.waitpid(pid, &status, @as(c_int, @intCast(flags))); switch (errno(rc)) { diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 6a1b2888a9..de427076dc 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -99,7 +99,7 @@ pub const Subprocess = struct { if (this.poll == .poll_ref) { if (this.poll.poll_ref) |poll| { - if (poll.isRegistered()) { + if (poll.isActive() or poll.isRegistered()) { return true; } } @@ -113,8 +113,9 @@ pub const Subprocess = struct { pub fn updateHasPendingActivity(this: *Subprocess) void { @fence(.SeqCst); - if (Environment.isDebug) { - log("updateHasPendingActivity: real: {}", .{ + if (comptime Environment.isDebug) { + log("updateHasPendingActivity() {any} -> {any}", .{ + this.has_pending_activity.value, this.hasPendingActivityNonThreadsafe(), }); } @@ -126,12 +127,6 @@ pub const Subprocess = struct { pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool { @fence(.Acquire); - if (Environment.isDebug) { - log("hasPendingActivity: {} (real: {})", .{ - this.has_pending_activity.load(.Acquire), - this.hasPendingActivityNonThreadsafe(), - }); - } return this.has_pending_activity.load(.Acquire); } @@ -167,7 +162,7 @@ pub const Subprocess = struct { switch (this.poll) { .poll_ref => if (this.poll.poll_ref) |poll| { if (deactivate_poll_ref) { - poll.disableKeepingProcessAlive(vm); + poll.onEnded(vm); } else { poll.unref(vm); } @@ -419,11 +414,11 @@ pub const Subprocess = struct { } pub fn hasKilled(this: *const Subprocess) bool { - return this.flags.killed or this.exit_code != null; + return this.exit_code != null or this.signal_code != null; } pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) { - if (this.hasKilled()) { + if (this.hasExited()) { return .{ .result = {} }; } @@ -462,7 +457,6 @@ pub const Subprocess = struct { } } - this.flags.killed = true; return .{ .result = {} }; } @@ -1094,8 +1088,17 @@ pub const Subprocess = struct { this: *Subprocess, globalThis: *JSGlobalObject, ) callconv(.C) JSValue { - if (this.exit_code) |code| { - return JSC.JSPromise.resolvedPromiseValue(globalThis, JSC.JSValue.jsNumber(code)); + if (this.hasExited()) { + const waitpid_error = this.waitpid_err; + if (this.exit_code) |code| { + return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(code)); + } else if (waitpid_error) |err| { + return JSC.JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis)); + } else if (this.signal_code != null) { + return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(128 +% @intFromEnum(this.signal_code.?))); + } else { + @panic("Subprocess.getExited() has exited but has no exit code or signal code. This is a bug."); + } } if (!this.exit_promise.has()) { @@ -1147,7 +1150,6 @@ pub const Subprocess = struct { globalThis.throwTODO("spawn() is not yet implemented on Windows"); return .zero; } - var arena = @import("root").bun.ArenaAllocator.init(bun.default_allocator); defer arena.deinit(); var allocator = arena.allocator(); @@ -1631,7 +1633,9 @@ pub const Subprocess = struct { .process, true, )) { - .result => {}, + .result => { + subprocess.poll.poll_ref.?.enableKeepingProcessAlive(jsc_vm); + }, .err => |err| { if (err.getErrno() != .SRCH) { @panic("This shouldn't happen"); @@ -1697,7 +1701,9 @@ pub const Subprocess = struct { .process, true, )) { - .result => {}, + .result => { + subprocess.poll.poll_ref.?.enableKeepingProcessAlive(jsc_vm); + }, .err => |err| { if (err.getErrno() != .SRCH) { @panic("This shouldn't happen"); @@ -1797,6 +1803,7 @@ pub const Subprocess = struct { const pid = this.pid; var waitpid_result = waitpid_result_; + while (true) { switch (waitpid_result) { .err => |err| { @@ -1808,9 +1815,16 @@ pub const Subprocess = struct { this.exit_code = @as(u8, @truncate(std.os.W.EXITSTATUS(result.status))); } + // True if the process terminated due to receipt of a signal. if (std.os.W.IFSIGNALED(result.status)) { this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.TERMSIG(result.status))))); - } else if (std.os.W.IFSTOPPED(result.status)) { + } else if ( + // https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/waitpid.2.html + // True if the process has not terminated, but has stopped and can + // be restarted. This macro can be true only if the wait call spec-ified specified + // ified the WUNTRACED option or if the child process is being + // traced (see ptrace(2)). + std.os.W.IFSTOPPED(result.status)) { this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.STOPSIG(result.status))))); } } @@ -1853,12 +1867,59 @@ pub const Subprocess = struct { } } + fn runOnExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject, this_jsvalue: JSC.JSValue) void { + const waitpid_error = this.waitpid_err; + this.waitpid_err = null; + + if (this.exit_promise.trySwap()) |promise| { + if (this.exit_code) |code| { + promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(code)); + } else if (waitpid_error) |err| { + promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis)); + } else if (this.signal_code != null) { + promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(128 +% @intFromEnum(this.signal_code.?))); + } else { + // crash in debug mode + if (comptime Environment.allow_assert) + unreachable; + } + } + + if (this.on_exit_callback.trySwap()) |callback| { + const waitpid_value: JSValue = + if (waitpid_error) |err| + err.toJSC(globalThis) + else + JSC.JSValue.jsUndefined(); + + const this_value = if (this_jsvalue.isEmptyOrUndefinedOrNull()) JSC.JSValue.jsUndefined() else this_jsvalue; + this_value.ensureStillAlive(); + + const args = [_]JSValue{ + this_value, + this.getExitCode(globalThis), + this.getSignalCode(globalThis), + waitpid_value, + }; + + const result = callback.callWithThis( + globalThis, + this_value, + &args, + ); + + if (result.isAnyError()) { + globalThis.bunVM().onUnhandledError(globalThis, result); + } + } + } + fn onExit( this: *Subprocess, globalThis: *JSC.JSGlobalObject, this_jsvalue: JSC.JSValue, ) void { - log("onExit {d}, code={d}", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1 }); + log("onExit({d}) = {d}, \"{s}\"", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1, if (this.signal_code) |code| @tagName(code) else "" }); defer this.updateHasPendingActivity(); this_jsvalue.ensureStillAlive(); @@ -1889,53 +1950,7 @@ pub const Subprocess = struct { this.globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task)); } - if (this.exit_promise.trySwap()) |promise| { - const waitpid_error = this.waitpid_err; - this.waitpid_err = null; - - if (this.exit_code) |code| { - promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(code)); - } else if (this.signal_code != null) { - promise.asAnyPromise().?.resolve(globalThis, this.getSignalCode(globalThis)); - } else if (waitpid_error) |err| { - promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis)); - } else { - // crash in debug mode - if (comptime Environment.allow_assert) - unreachable; - } - } - } - - if (this.on_exit_callback.trySwap()) |callback| { - const waitpid_error = this.waitpid_err; - this.waitpid_err = null; - - const waitpid_value: JSValue = - if (waitpid_error) |err| - err.toJSC(globalThis) - else - JSC.JSValue.jsUndefined(); - - const this_value = if (this_jsvalue.isEmptyOrUndefinedOrNull()) JSC.JSValue.jsUndefined() else this_jsvalue; - this_value.ensureStillAlive(); - - const args = [_]JSValue{ - this_value, - this.getExitCode(globalThis), - this.getSignalCode(globalThis), - waitpid_value, - }; - - const result = callback.callWithThis( - globalThis, - this_value, - &args, - ); - - if (result.isAnyError()) { - globalThis.bunVM().onUnhandledError(globalThis, result); - } + this.runOnExit(globalThis, this_jsvalue); } } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 24ae5757ab..04860ec1cf 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1257,9 +1257,9 @@ pub const FileSink = struct { pub fn updateRef(this: *FileSink, value: bool) void { if (this.poll_ref) |poll| { if (value) - poll.enableKeepingProcessAlive(JSC.VirtualMachine.get()) + poll.ref(JSC.VirtualMachine.get()) else - poll.disableKeepingProcessAlive(JSC.VirtualMachine.get()); + poll.unref(JSC.VirtualMachine.get()); } } @@ -1572,7 +1572,7 @@ pub const FileSink = struct { if (this.poll_ref) |poll| { this.poll_ref = null; - poll.deinit(); + poll.deinitForceUnregister(); } if (this.auto_close) { @@ -1743,7 +1743,7 @@ pub const FileSink = struct { if (signal_close) { if (this.poll_ref) |poll| { this.poll_ref = null; - poll.deinit(); + poll.deinitForceUnregister(); } this.fd = bun.invalid_fd; @@ -3811,6 +3811,7 @@ pub const FIFO = struct { this.close_on_empty_read = true; if (this.poll_ref) |poll| { poll.flags.insert(.hup); + poll.disableKeepingProcessAlive(JSC.VirtualMachine.get()); } this.pending.result = .{ .done = {} }; @@ -3820,12 +3821,7 @@ pub const FIFO = struct { pub fn close(this: *FIFO) void { if (this.poll_ref) |poll| { this.poll_ref = null; - if (comptime Environment.isLinux) { - // force target fd to be removed from epoll - poll.deinitForceUnregister(); - } else { - poll.deinit(); - } + poll.deinit(); } const fd = this.fd; @@ -4624,6 +4620,10 @@ pub const FileReader = struct { } }; + pub fn toBlob(this: *Readable) Blob { + if (this.isClosed()) return Blob.initEmpty(JSC.VirtualMachine.get().global); + } + pub fn deinit(this: *Readable) void { switch (this.*) { .FIFO => { @@ -4748,7 +4748,7 @@ pub const FileReader = struct { }, }; this.lazy_readable.readable.FIFO.watch(readable_file.fd); - this.lazy_readable.readable.FIFO.pollRef().ref(this.globalThis().bunVM()); + this.lazy_readable.readable.FIFO.pollRef().enableKeepingProcessAlive(this.globalThis().bunVM()); if (!(blob.data.file.is_atty orelse false)) { this.lazy_readable.readable.FIFO.poll_ref.?.flags.insert(.nonblocking); } @@ -4822,9 +4822,9 @@ pub const FileReader = struct { .FIFO => { if (this.lazy_readable.readable.FIFO.poll_ref) |poll| { if (value) { - poll.enableKeepingProcessAlive(this.globalThis().bunVM()); + poll.ref(this.globalThis().bunVM()); } else { - poll.disableKeepingProcessAlive(this.globalThis().bunVM()); + poll.unref(this.globalThis().bunVM()); } } }, @@ -4909,6 +4909,7 @@ pub fn NewReadyWatcher( std.debug.assert( this.poll_ref.?.unregister(JSC.VirtualMachine.get().event_loop_handle.?, false) == .result, ); + this.poll_ref.?.disableKeepingProcessAlive(JSC.VirtualMachine.get()); } pub fn pollRef(this: *Context) *Async.FilePoll { diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index 0e85ddca5d..998fd36e2a 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -969,7 +969,7 @@ pub const TestCommand = struct { const file_end = reporter.jest.files.len; for (file_start..file_end) |module_id| { - const module = reporter.jest.files.items(.module_scope)[module_id]; + const module: *jest.DescribeScope = reporter.jest.files.items(.module_scope)[module_id]; vm.onUnhandledRejectionCtx = null; vm.onUnhandledRejection = jest.TestRunnerTask.onUnhandledRejection; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index ac80e62b43..cca7bafaa9 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -910,6 +910,16 @@ pub const PosixLoop = extern struct { return this.active > 0; } + // This exists as a method so that we can stick a debugger in here + pub fn addActive(this: *PosixLoop, value: u32) void { + this.active +|= value; + } + + // This exists as a method so that we can stick a debugger in here + pub fn subActive(this: *PosixLoop, value: u32) void { + this.active -|= value; + } + pub fn unrefCount(this: *PosixLoop, count: i32) void { log("unref x {d}", .{count}); this.num_polls -|= count; diff --git a/test/js/bun/spawn/spawn-streaming-stdin.test.ts b/test/js/bun/spawn/spawn-streaming-stdin.test.ts index 3226f03cc3..c7065a681a 100644 --- a/test/js/bun/spawn/spawn-streaming-stdin.test.ts +++ b/test/js/bun/spawn/spawn-streaming-stdin.test.ts @@ -10,9 +10,10 @@ const N = 100; test("spawn can write to stdin multiple chunks", async () => { const maxFD = openSync("/dev/null", "w"); for (let i = 0; i < N; i++) { - const tmperr = join(tmpdir(), "stdin-repro-error.log." + i); var exited; await (async function () { + const tmperr = join(tmpdir(), "stdin-repro-error.log." + i); + const proc = spawn({ cmd: [bunExe(), import.meta.dir + "/stdin-repro.js"], stdout: "pipe", @@ -43,7 +44,7 @@ test("spawn can write to stdin multiple chunks", async () => { if (inCounter === 4) break; } - proc.stdin!.end(); + await proc.stdin!.end(); })(); await Promise.all([prom, prom2]); diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index bfc7b6aa8a..06b6af37f3 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -500,7 +500,7 @@ describe("spawn unref and kill should not hang", () => { it("kill and await exited", async () => { for (let i = 0; i < 10; i++) { const proc = spawn({ - cmd: ["sleep", "0"], + cmd: ["sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -514,7 +514,7 @@ describe("spawn unref and kill should not hang", () => { it("unref", async () => { for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", "0"], + cmd: ["sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -528,7 +528,7 @@ describe("spawn unref and kill should not hang", () => { it("kill and unref", async () => { for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", "0"], + cmd: ["sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -543,7 +543,7 @@ describe("spawn unref and kill should not hang", () => { it("unref and kill", async () => { for (let i = 0; i < 100; i++) { const proc = spawn({ - cmd: ["sleep", "0"], + cmd: ["sleep", "0.001"], stdout: "ignore", stderr: "ignore", stdin: "ignore", @@ -555,36 +555,7 @@ describe("spawn unref and kill should not hang", () => { expect().pass(); }); - it("unref and kill after sleep", async () => { - for (let i = 0; i < 100; i++) { - const proc = spawn({ - cmd: ["sleep", "0"], - stdout: "ignore", - stderr: "ignore", - stdin: "ignore", - }); - await Bun.sleep(1); - proc.unref(); - proc.kill(); - await proc.exited; - } - expect().pass(); - }); - it("kill and unref after sleep", async () => { - for (let i = 0; i < 100; i++) { - const proc = spawn({ - cmd: ["sleep", "0"], - stdout: "ignore", - stderr: "ignore", - stdin: "ignore", - }); - await Bun.sleep(1); - proc.kill(); - proc.unref(); - await proc.exited; - } - expect().pass(); - }); + it("should not hang after unref", async () => { const proc = spawn({ cmd: [bunExe(), path.join(import.meta.dir, "does-not-hang.js")], @@ -594,3 +565,90 @@ describe("spawn unref and kill should not hang", () => { expect().pass(); }); }); + +async function runTest(sleep: string, order = ["sleep", "kill", "unref", "exited"]) { + console.log("running", order.join(",")); + for (let i = 0; i < 100; i++) { + const proc = spawn({ + cmd: ["sleep", sleep], + stdout: "ignore", + stderr: "ignore", + stdin: "ignore", + }); + for (let action of order) { + switch (action) { + case "sleep": { + await Bun.sleep(1); + break; + } + + case "kill": { + proc.kill(); + break; + } + + case "unref": { + proc.unref(); + break; + } + + case "exited": { + expect(await proc.exited).toBeNumber(); + break; + } + + default: { + throw new Error("unknown action"); + } + } + } + } + expect().pass(); +} + +describe("should not hang", () => { + for (let sleep of ["0.001", "0"]) { + describe("sleep " + sleep, () => { + for (let order of [ + ["sleep", "kill", "unref", "exited"], + ["sleep", "unref", "kill", "exited"], + ["kill", "sleep", "unref", "exited"], + ["kill", "unref", "sleep", "exited"], + ["unref", "sleep", "kill", "exited"], + ["unref", "kill", "sleep", "exited"], + ["exited", "sleep", "kill", "unref"], + ["exited", "sleep", "unref", "kill"], + ["exited", "kill", "sleep", "unref"], + ["exited", "kill", "unref", "sleep"], + ["exited", "unref", "sleep", "kill"], + ["exited", "unref", "kill", "sleep"], + ["unref", "exited"], + ["exited", "unref"], + ["kill", "exited"], + ["exited"], + ]) { + const name = order.join(","); + const fn = runTest.bind(undefined, sleep, order); + it(name, fn); + } + }); + } +}); + +it("#3480", async () => { + try { + var server = Bun.serve({ + port: 0, + fetch: (req, res) => { + Bun.spawnSync(["echo", "1"], {}); + return new Response("Hello world!"); + }, + }); + + const response = await fetch("http://" + server.hostname + ":" + server.port); + expect(await response.text()).toBe("Hello world!"); + expect(response.ok); + } finally { + server!.stop(true); + } +}); diff --git a/test/js/bun/spawn/stdin-repro.js b/test/js/bun/spawn/stdin-repro.js index 02840d00b6..51b101764a 100644 --- a/test/js/bun/spawn/stdin-repro.js +++ b/test/js/bun/spawn/stdin-repro.js @@ -1,10 +1,12 @@ var stdout = Bun.stdout.writer(); console.error("Started"); var count = 0; -for await (let chunk of Bun.stdin.stream()) { +// const file = Bun.file("/tmp/testpipe"); +const file = Bun.stdin; +for await (let chunk of file.stream()) { const str = new Buffer(chunk).toString(); stdout.write(str); - stdout.flush(); + await stdout.flush(); count++; } console.error("Finished with", count); diff --git a/test/js/node/child_process/child_process-node.test.js b/test/js/node/child_process/child_process-node.test.js index 18fd88687b..70f637795a 100644 --- a/test/js/node/child_process/child_process-node.test.js +++ b/test/js/node/child_process/child_process-node.test.js @@ -200,7 +200,7 @@ describe("ChildProcess spawn bad stdio", () => { }); } - it.todo("should handle normal execution of child process", async () => { + it("should handle normal execution of child process", async () => { await createChild({}, (err, stdout, stderr) => { strictEqual(err, null); strictEqual(stdout, ""); @@ -386,7 +386,7 @@ describe("child_process default options", () => { }); describe("child_process double pipe", () => { - it.skip("should allow two pipes to be used at once", done => { + it.skipIf(process.platform === "linux")("should allow two pipes to be used at once", done => { // const { mustCallAtLeast, mustCall } = createCallCheckCtx(done); const mustCallAtLeast = fn => fn; const mustCall = fn => fn; diff --git a/test/js/node/child_process/child_process.test.ts b/test/js/node/child_process/child_process.test.ts index 8c24ef55e8..e94e163114 100644 --- a/test/js/node/child_process/child_process.test.ts +++ b/test/js/node/child_process/child_process.test.ts @@ -22,7 +22,7 @@ describe("ChildProcess.spawn()", () => { resolve(true); }); // @ts-ignore - proc.spawn({ file: "bun", args: ["bun", "-v"] }); + proc.spawn({ file: bunExe(), args: [bunExe(), "-v"] }); }); expect(result).toBe(true); }); @@ -34,7 +34,7 @@ describe("ChildProcess.spawn()", () => { resolve(true); }); // @ts-ignore - proc.spawn({ file: "bun", args: ["bun", "-v"] }); + proc.spawn({ file: bunExe(), args: [bunExe(), "-v"] }); proc.kill(); }); expect(result).toBe(true); @@ -60,8 +60,8 @@ describe("spawn()", () => { expect(!!child2).toBe(false); }); - it.todo("should allow stdout to be read via Node stream.Readable `data` events", async () => { - const child = spawn("bun", ["-v"]); + it("should allow stdout to be read via Node stream.Readable `data` events", async () => { + const child = spawn(bunExe(), ["-v"]); const result: string = await new Promise(resolve => { child.stdout.on("error", e => { console.error(e); @@ -77,8 +77,8 @@ describe("spawn()", () => { expect(SEMVER_REGEX.test(result.trim())).toBe(true); }); - it.todo("should allow stdout to be read via .read() API", async () => { - const child = spawn("bun", ["-v"]); + it("should allow stdout to be read via .read() API", async () => { + const child = spawn(bunExe(), ["-v"]); const result: string = await new Promise((resolve, reject) => { let finalData = ""; child.stdout.on("error", e => { @@ -97,10 +97,10 @@ describe("spawn()", () => { }); it("should accept stdio option with 'ignore' for no stdio fds", async () => { - const child1 = spawn("bun", ["-v"], { + const child1 = spawn(bunExe(), ["-v"], { stdio: "ignore", }); - const child2 = spawn("bun", ["-v"], { + const child2 = spawn(bunExe(), ["-v"], { stdio: ["ignore", "ignore", "ignore"], }); @@ -177,7 +177,7 @@ describe("spawn()", () => { resolve = resolve1; }); process.env.NO_COLOR = "1"; - const child = spawn("node", ["--help"], { argv0: "bun" }); + const child = spawn("node", ["--help"], { argv0: bunExe() }); delete process.env.NO_COLOR; let msg = ""; @@ -216,9 +216,9 @@ describe("spawn()", () => { }); describe("execFile()", () => { - it.todo("should execute a file", async () => { + it("should execute a file", async () => { const result: Buffer = await new Promise((resolve, reject) => { - execFile("bun", ["-v"], { encoding: "buffer" }, (error, stdout, stderr) => { + execFile(bunExe(), ["-v"], { encoding: "buffer" }, (error, stdout, stderr) => { if (error) { reject(error); } @@ -230,7 +230,7 @@ describe("execFile()", () => { }); describe("exec()", () => { - it.todo("should execute a command in a shell", async () => { + it("should execute a command in a shell", async () => { const result: Buffer = await new Promise((resolve, reject) => { exec("bun -v", { encoding: "buffer" }, (error, stdout, stderr) => { if (error) { @@ -242,7 +242,7 @@ describe("exec()", () => { expect(SEMVER_REGEX.test(result.toString().trim())).toBe(true); }); - it.todo("should return an object w/ stdout and stderr when promisified", async () => { + it("should return an object w/ stdout and stderr when promisified", async () => { const result = await promisify(exec)("bun -v"); expect(typeof result).toBe("object"); expect(typeof result.stdout).toBe("string"); @@ -262,8 +262,8 @@ describe("spawnSync()", () => { }); describe("execFileSync()", () => { - it.todo("should execute a file synchronously", () => { - const result = execFileSync("bun", ["-v"], { encoding: "utf8" }); + it("should execute a file synchronously", () => { + const result = execFileSync(bunExe(), ["-v"], { encoding: "utf8" }); expect(SEMVER_REGEX.test(result.trim())).toBe(true); }); @@ -277,7 +277,7 @@ describe("execFileSync()", () => { }); describe("execSync()", () => { - it.todo("should execute a command in the shell synchronously", () => { + it("should execute a command in the shell synchronously", () => { const result = execSync("bun -v", { encoding: "utf8" }); expect(SEMVER_REGEX.test(result.trim())).toBe(true); }); diff --git a/test/js/node/child_process/fixtures/child-process-exit-event.js b/test/js/node/child_process/fixtures/child-process-exit-event.js index 4400ace1b3..ac82ebb764 100644 --- a/test/js/node/child_process/fixtures/child-process-exit-event.js +++ b/test/js/node/child_process/fixtures/child-process-exit-event.js @@ -7,7 +7,14 @@ function closeHandler() { console.log("closeHandler called"); } -const p = spawn("bun", ["--version"]); +let bunExe = process.execPath; +if ((process.versions.bun || "").endsWith("_debug")) { + bunExe = "bun-debug"; +} else if (bunExe.endsWith("node")) { + bunExe = "bun"; +} + +const p = spawn(bunExe, ["--version"]); p.on("exit", exitHandler); p.on("close", closeHandler); diff --git a/test/js/web/websocket/websocket.test.js b/test/js/web/websocket/websocket.test.js index 7b88be9236..be86ec8468 100644 --- a/test/js/web/websocket/websocket.test.js +++ b/test/js/web/websocket/websocket.test.js @@ -443,7 +443,7 @@ describe("websocket in subprocess", () => { subprocess.kill(); - expect(await subprocess.exited).toBe("SIGHUP"); + expect(await subprocess.exited).toBe(129); }); it("should exit with invalid url", async () => { diff --git a/test/regression/issue/03830.test.ts b/test/regression/issue/03830.test.ts index 9723c59005..caf2d9490c 100644 --- a/test/regression/issue/03830.test.ts +++ b/test/regression/issue/03830.test.ts @@ -20,11 +20,13 @@ it("macros should not lead to seg faults under any given input", async () => { writeFileSync(join(testDir, "macro.ts"), "export function fn(str) { return str; }"); writeFileSync(join(testDir, "index.ts"), "import { fn } from './macro' assert { type: 'macro' };\nfn(`©${''}`);"); - const { stdout, exitCode } = Bun.spawnSync({ - cmd: [bunExe(), "build", join(testDir, "index.ts")], + const { stderr, exitCode } = Bun.spawnSync({ + cmd: [bunExe(), "build", "--minify", join(testDir, "index.ts")], env: bunEnv, - stderr: "inherit", + stderr: "pipe", }); - expect(exitCode).toBe(0); + expect(stderr.toString().trim()).toStartWith('error: "Cannot convert argument type to JS'); + expect(exitCode).not.toBe(0); + expect(exitCode).toBe(1); });