mirror of
https://github.com/oven-sh/bun
synced 2026-02-10 02:48:50 +00:00
Fix running Bun.spawn on Vercel and GCP (#6724)
* Fix running `Bun.spawn` on Vercel and GCP
* Update subprocess.zig
* Deflake test
* Update spawn-streaming-stdout.test.ts
* Fix tests + cleanup
* Fix hang
* Handle edgecase
* Update subprocess.zig
* Update subprocess.zig

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
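Background for the diff below: Vercel and Google Cloud sandbox user code with gVisor, which does not implement the pidfd_open(2) syscall Bun used to watch child processes, so spawning failed outright. The fix detects ENOSYS and routes process-exit tracking through a dedicated waiter thread instead. A minimal sketch of that capability probe, assuming the Zig 0.11-era std.os.linux bindings used throughout this diff (illustrative only, not code from the commit):

const std = @import("std");

pub fn main() void {
    // Probe pidfd_open(2) against our own pid. gVisor sandboxes and older
    // kernels report ENOSYS, meaning exit events cannot come from polling a
    // pidfd and must come from waitpid() on a dedicated thread instead.
    const rc = std.os.linux.pidfd_open(std.os.linux.getpid(), 0);
    switch (std.os.linux.getErrno(rc)) {
        .SUCCESS => {
            std.debug.print("pidfd_open works; register the fd with the poller\n", .{});
            std.os.close(@intCast(rc));
        },
        .NOSYS => std.debug.print("no pidfd_open; fall back to a waiter thread\n", .{}),
        else => |err| std.debug.print("pidfd_open failed: {s}\n", .{@tagName(err)}),
    }
}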
@@ -30,13 +30,13 @@ const errno = std.os.errno;
 const mode_t = std.os.mode_t;
 const unexpectedErrno = std.os.unexpectedErrno;
 
-pub const WaitPidResult = struct {
-    pid: pid_t,
-    status: u32,
-};
-
 // mostly taken from zig's posix_spawn.zig
 pub const PosixSpawn = struct {
+    pub const WaitPidResult = struct {
+        pid: pid_t,
+        status: u32,
+    };
+
     pub const Attr = struct {
         attr: system.posix_spawnattr_t,
@@ -16,6 +16,8 @@ const Which = @import("../../../which.zig");
 const uws = @import("../../../deps/uws.zig");
 const IPC = @import("../../ipc.zig");
 
+const PosixSpawn = @import("./spawn.zig").PosixSpawn;
+
 pub const Subprocess = struct {
     const log = Output.scoped(.Subprocess, false);
     pub usingnamespace JSC.Codegen.JSSubprocess;
@@ -29,8 +31,7 @@ pub const Subprocess = struct {
     stdin: Writable,
     stdout: Readable,
     stderr: Readable,
-    killed: bool = false,
-    poll_ref: ?*JSC.FilePoll = null,
+    poll: Poll = Poll{ .poll_ref = null },
 
     exit_promise: JSC.Strong = .{},
     on_exit_callback: JSC.Strong = .{},
@@ -39,14 +40,6 @@ pub const Subprocess = struct {
     signal_code: ?SignalCode = null,
     waitpid_err: ?bun.sys.Error = null,
-
-    has_waitpid_task: bool = false,
-    notification_task: JSC.AnyTask = undefined,
-    waitpid_task: JSC.AnyTask = undefined,
-
-    wait_task: JSC.ConcurrentTask = .{},
-
     finalized: bool = false,
-
     globalThis: *JSC.JSGlobalObject,
     observable_getters: std.enums.EnumSet(enum {
         stdin,
@@ -59,16 +52,31 @@ pub const Subprocess = struct {
     stderr,
 }) = .{},
 has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
-is_sync: bool = false,
 this_jsvalue: JSC.JSValue = .zero,
 
 ipc_mode: IPCMode,
 ipc_callback: JSC.Strong = .{},
 ipc: IPC.IPCData,
+flags: Flags = .{},
+
+pub const Flags = packed struct(u32) {
+    is_sync: bool = false,
+    killed: bool = false,
+    reference_count: u30 = 0,
+};
+
-has_pending_unref: bool = false,
 pub const SignalCode = bun.SignalCode;
 
+pub const Poll = union(enum) {
+    poll_ref: ?*JSC.FilePoll,
+    wait_thread: WaitThreadPoll,
+};
+
+pub const WaitThreadPoll = struct {
+    ref_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+    poll_ref: JSC.PollRef = .{},
+};
+
 pub const IPCMode = enum {
     none,
     bun,
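The hunk above folds the loose `killed` and `is_sync` booleans, plus the new keep-alive counter, into a single `packed struct(u32)`, so all Subprocess flags occupy one 32-bit word. A self-contained sketch of that layout (the test is illustrative, not part of the commit):

const std = @import("std");

pub const Flags = packed struct(u32) {
    is_sync: bool = false,
    killed: bool = false,
    reference_count: u30 = 0,
};

test "all flags fit in one 32-bit word" {
    try std.testing.expectEqual(@as(usize, 4), @sizeOf(Flags));
    var flags = Flags{ .is_sync = true };
    flags.reference_count += 1;
    try std.testing.expect(!flags.killed);
    try std.testing.expectEqual(@as(u30, 1), flags.reference_count);
}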
@@ -79,9 +87,37 @@ pub const Subprocess = struct {
     return this.exit_code != null or this.waitpid_err != null or this.signal_code != null;
 }
 
-pub fn updateHasPendingActivityFlag(this: *Subprocess) void {
-    @fence(.SeqCst);
-    this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc_mode == .none and this.has_pending_unref, .SeqCst);
-}
+pub fn hasPendingActivityNonThreadsafe(this: *const Subprocess) bool {
+    if (this.poll == .wait_thread and this.poll.wait_thread.ref_count.load(.Monotonic) > 0) {
+        return true;
+    }
+
+    if (this.flags.reference_count > 0) {
+        return true;
+    }
+
+    if (this.ipc_mode != .none) {
+        return true;
+    }
+
+    if (this.poll == .poll_ref) {
+        if (this.poll.poll_ref) |poll| {
+            if (poll.isRegistered()) {
+                return true;
+            }
+        }
+    }
+
+    return false;
+}
+
+pub fn updateHasPendingActivity(this: *Subprocess) void {
+    this.has_pending_activity.store(
+        this.hasPendingActivityNonThreadsafe(),
+        .Monotonic,
+    );
+}
 
 pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
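The split above replaces the old @fence-based update with a simpler protocol: only the JS thread recomputes the flag (hence the NonThreadsafe suffix) and publishes it with a relaxed store, while other threads such as the garbage collector read only the cached atomic. A stripped-down sketch of the pattern, assuming Zig 0.11's std.atomic.Atomic; the names here are illustrative, not from the commit:

const std = @import("std");

const Activity = struct {
    cached: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
    reference_count: u30 = 0,

    // Only the owning (JS) thread mutates state, so a relaxed (Monotonic)
    // store is enough to publish the recomputed value.
    fn update(self: *Activity) void {
        self.cached.store(self.reference_count > 0, .Monotonic);
    }

    // Any thread may poll the cached flag, e.g. the GC deciding whether the
    // wrapper object must stay alive.
    fn hasPendingActivity(self: *const Activity) bool {
        return self.cached.load(.Acquire);
    }
};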
@@ -89,14 +125,19 @@ pub const Subprocess = struct {
     return this.has_pending_activity.load(.Acquire);
 }
 
-pub fn updateHasPendingActivity(this: *Subprocess) void {
-    @fence(.Release);
-    this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc_mode == .none and this.has_pending_unref, .Release);
-}
-
 pub fn ref(this: *Subprocess) void {
     var vm = this.globalThis.bunVM();
-    if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm);
+    switch (this.poll) {
+        .poll_ref => if (this.poll.poll_ref) |poll| {
+            this.flags.reference_count += @as(u30, @intFromBool(!poll.isRegistered()));
+            poll.enableKeepingProcessAlive(vm);
+        },
+
+        .wait_thread => |*wait_thread| {
+            wait_thread.poll_ref.ref(vm);
+        },
+    }
 
     if (!this.hasCalledGetter(.stdin)) {
         this.stdin.ref();
     }
@@ -114,7 +155,15 @@ pub const Subprocess = struct {
 pub fn unref(this: *Subprocess) void {
     var vm = this.globalThis.bunVM();
 
-    if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm);
+    switch (this.poll) {
+        .poll_ref => if (this.poll.poll_ref) |poll| {
+            this.flags.reference_count -= @as(u30, @intFromBool(poll.isRegistered()));
+            poll.disableKeepingProcessAlive(vm);
+        },
+        .wait_thread => |*wait_thread| {
+            wait_thread.poll_ref.unref(vm);
+        },
+    }
     if (!this.hasCalledGetter(.stdin)) {
         this.stdin.unref();
     }
@@ -358,7 +407,7 @@ pub const Subprocess = struct {
 }
 
 pub fn hasKilled(this: *const Subprocess) bool {
-    return this.killed or this.hasExited();
+    return this.flags.killed or this.exit_code != null;
 }
 
 pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) {
@@ -366,23 +415,31 @@ pub const Subprocess = struct {
     return .{ .result = {} };
 }
 
-if (comptime Environment.isLinux) {
-    // should this be handled differently?
-    // this effectively shouldn't happen
-    if (this.pidfd == bun.invalid_fd) {
-        return .{ .result = {} };
-    }
+send_signal: {
+    if (comptime Environment.isLinux) {
+        // if these are the same, it means the pidfd is invalid.
+        if (!WaiterThread.shouldUseWaiterThread()) {
+            // should this be handled differently?
+            // this effectively shouldn't happen
+            if (this.pidfd == bun.invalid_fd) {
+                return .{ .result = {} };
+            }
+
+            // first appeared in Linux 5.1
+            const rc = std.os.linux.pidfd_send_signal(this.pidfd, @as(u8, @intCast(sig)), null, 0);
+
+            if (rc != 0) {
+                const errno = std.os.linux.getErrno(rc);
+
+                // if the process was already killed don't throw
+                if (errno != .SRCH and errno != .NOSYS)
+                    return .{ .err = bun.sys.Error.fromCode(errno, .kill) };
+            } else {
+                break :send_signal;
+            }
+        }
+    }
 
-    // first appeared in Linux 5.1
-    const rc = std.os.linux.pidfd_send_signal(this.pidfd, @as(u8, @intCast(sig)), null, 0);
-
-    if (rc != 0) {
-        const errno = std.os.linux.getErrno(rc);
-        // if the process was already killed don't throw
-        if (errno != .SRCH)
-            return .{ .err = bun.sys.Error.fromCode(errno, .kill) };
-    }
-} else {
     const err = std.c.kill(this.pid, sig);
     if (err != 0) {
         const errno = bun.C.getErrno(err);
@@ -393,7 +450,7 @@ pub const Subprocess = struct {
         }
     }
 
-    this.killed = true;
+    this.flags.killed = true;
     return .{ .result = {} };
 }
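The rewritten tryKill prefers pidfd_send_signal(2), which cannot race with pid reuse, and falls through to plain kill(2) when the pidfd path is unavailable (waiter-thread mode, non-Linux, or ENOSYS at runtime). A condensed sketch of those two paths, assuming Linux and the same Zig 0.11 std bindings the file uses; the helper and error names are hypothetical:

const std = @import("std");

fn sendSignal(pidfd: std.os.fd_t, pid: std.os.pid_t, sig: u8) !void {
    // Preferred path: signal through the pidfd.
    const rc = std.os.linux.pidfd_send_signal(pidfd, sig, null, 0);
    switch (std.os.linux.getErrno(rc)) {
        .SUCCESS => return,
        // ESRCH: the process already exited; treat as success, like the diff does.
        .SRCH => return,
        // ENOSYS: the kernel or sandbox lacks the syscall; fall back to kill(2).
        .NOSYS => if (std.c.kill(pid, sig) != 0) return error.KillFailed,
        else => return error.SignalFailed,
    }
}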
@@ -493,10 +550,10 @@ pub const Subprocess = struct {
 // because we don't want to block the thread waiting for the write
 switch (bun.isWritable(this.fd)) {
     .ready => {
-        if (this.poll_ref) |poll_ref| {
-            poll_ref.flags.insert(.writable);
-            poll_ref.flags.insert(.fifo);
-            std.debug.assert(poll_ref.flags.contains(.poll_writable));
+        if (this.poll_ref) |poll| {
+            poll.flags.insert(.writable);
+            poll.flags.insert(.fifo);
+            std.debug.assert(poll.flags.contains(.poll_writable));
         }
     },
     .hup => {
@@ -1452,7 +1509,7 @@ pub const Subprocess = struct {
 };
 
 const pidfd: std.os.fd_t = brk: {
-    if (!Environment.isLinux) {
+    if (!Environment.isLinux or WaiterThread.shouldUseWaiterThread()) {
        break :brk pid;
    }
@@ -1493,16 +1550,8 @@ pub const Subprocess = struct {
 
 const error_instance = brk2: {
     if (err == .NOSYS) {
-        break :brk2 globalThis.createErrorInstance(
-            \\"pidfd_open(2)" system call is not supported by your Linux kernel
-            \\To fix this error, either:
-            \\- Upgrade your Linux kernel to a newer version (current: {})
-            \\- Ensure the seccomp filter allows "pidfd_open"
-            ,
-            .{
-                kernel.fmt(""),
-            },
-        );
+        WaiterThread.setShouldUseWaiterThread();
+        break :brk pid;
     }
 
     break :brk2 bun.sys.Error.fromCode(err, .open).toJSC(globalThis);
@@ -1534,11 +1583,13 @@ pub const Subprocess = struct {
     .stdout = Readable.init(stdio[bun.STDOUT_FD], stdout_pipe[0], jsc_vm.allocator, default_max_buffer_size),
     .stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size),
     .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
-    .is_sync = is_sync,
     .ipc_mode = ipc_mode,
     // will be assigned in the block below
     .ipc = .{ .socket = socket },
     .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined,
+    .flags = .{
+        .is_sync = is_sync,
+    },
 };
 if (ipc_mode != .none) {
     var ptr = socket.ext(*Subprocess);
@@ -1560,22 +1611,27 @@ pub const Subprocess = struct {
 const watchfd = if (comptime Environment.isLinux) pidfd else pid;
 
 if (comptime !is_sync) {
-    var poll = JSC.FilePoll.init(jsc_vm, watchfd, .{}, Subprocess, subprocess);
-    subprocess.poll_ref = poll;
-    switch (subprocess.poll_ref.?.register(
-        jsc_vm.event_loop_handle.?,
-        .process,
-        true,
-    )) {
-        .result => {},
-        .err => |err| {
-            if (err.getErrno() != .SRCH) {
-                @panic("This shouldn't happen");
-            }
+    if (!WaiterThread.shouldUseWaiterThread()) {
+        var poll = JSC.FilePoll.init(jsc_vm, watchfd, .{}, Subprocess, subprocess);
+        subprocess.poll = .{ .poll_ref = poll };
+        subprocess.flags.reference_count += 1;
+        switch (subprocess.poll.poll_ref.?.register(
+            jsc_vm.event_loop_handle.?,
+            .process,
+            true,
+        )) {
+            .result => {},
+            .err => |err| {
+                if (err.getErrno() != .SRCH) {
+                    @panic("This shouldn't happen");
+                }
 
-            send_exit_notification = true;
-            lazy = false;
-        },
+                send_exit_notification = true;
+                lazy = false;
+            },
+        }
+    } else {
+        WaiterThread.append(subprocess);
     }
 }
@@ -1583,7 +1639,7 @@ pub const Subprocess = struct {
 if (send_exit_notification) {
     // process has already exited
     // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
-    subprocess.onExitNotification();
+    subprocess.wait(subprocess.flags.is_sync);
 }
 }
@@ -1622,10 +1678,11 @@ pub const Subprocess = struct {
 }
 subprocess.closeIO(.stdin);
 
-{
+if (!WaiterThread.shouldUseWaiterThread()) {
     var poll = JSC.FilePoll.init(jsc_vm, watchfd, .{}, Subprocess, subprocess);
-    subprocess.poll_ref = poll;
-    switch (subprocess.poll_ref.?.register(
+    subprocess.poll = .{ .poll_ref = poll };
+    subprocess.flags.reference_count += 1;
+    switch (subprocess.poll.poll_ref.?.register(
         jsc_vm.event_loop_handle.?,
         .process,
         true,
@@ -1641,6 +1698,8 @@ pub const Subprocess = struct {
         subprocess.onExitNotification();
     },
 }
+} else {
+    WaiterThread.append(subprocess);
 }
 
 while (!subprocess.hasExited()) {
@@ -1671,28 +1730,43 @@ pub const Subprocess = struct {
 
 pub fn onExitNotificationTask(this: *Subprocess) void {
     var vm = this.globalThis.bunVM();
-    defer vm.drainMicrotasks();
-    std.debug.assert(!this.is_sync);
+    const is_sync = this.flags.is_sync;
+
+    defer {
+        if (!is_sync)
+            vm.drainMicrotasks();
+    }
     this.wait(false);
 }
 
 pub fn onExitNotification(
     this: *Subprocess,
 ) void {
-    this.wait(this.is_sync);
+    std.debug.assert(this.flags.is_sync);
+
+    defer this.flags.reference_count -= 1;
+    this.wait(this.flags.is_sync);
 }
 
 pub fn wait(this: *Subprocess, sync: bool) void {
     return this.waitWithJSValue(sync, this.this_jsvalue);
 }
 
-pub fn watch(this: *Subprocess) void {
-    if (this.poll_ref) |poll| {
-        _ = poll.register(
+pub fn watch(this: *Subprocess) JSC.Maybe(void) {
+    if (WaiterThread.shouldUseWaiterThread()) {
+        WaiterThread.append(this);
+        return JSC.Maybe(void){ .result = {} };
+    }
+
+    if (this.poll.poll_ref) |poll| {
+        this.flags.reference_count += @as(u30, @intFromBool(!poll.isRegistered()));
+        const registration = poll.register(
            this.globalThis.bunVM().event_loop_handle.?,
            .process,
            true,
        );
+
+        return registration;
    } else {
        @panic("Internal Bun error: poll_ref in Subprocess is null unexpectedly. Please file a bug report.");
    }
@@ -1703,44 +1777,66 @@ pub const Subprocess = struct {
     sync: bool,
     this_jsvalue: JSC.JSValue,
 ) void {
-    if (this.has_waitpid_task) {
-        return;
-    }
-    defer if (sync) this.updateHasPendingActivityFlag();
-    this.has_waitpid_task = true;
+    this.onWaitPid(sync, this_jsvalue, PosixSpawn.waitpid(this.pid, if (sync) 0 else std.os.W.NOHANG));
+}
+
+pub fn onWaitPid(this: *Subprocess, sync: bool, this_jsvalue: JSC.JSValue, waitpid_result_: JSC.Maybe(PosixSpawn.WaitPidResult)) void {
+    defer if (sync) this.updateHasPendingActivity();
 
     const pid = this.pid;
 
-    switch (PosixSpawn.waitpid(pid, if (sync) 0 else std.os.W.NOHANG)) {
-        .err => |err| {
-            this.waitpid_err = err;
-        },
-        .result => |result| {
-            if (result.pid != 0) {
-                if (std.os.W.IFEXITED(result.status)) {
-                    this.exit_code = @as(u8, @truncate(std.os.W.EXITSTATUS(result.status)));
+    var waitpid_result = waitpid_result_;
+    while (true) {
+        switch (waitpid_result) {
+            .err => |err| {
+                this.waitpid_err = err;
+            },
+            .result => |result| {
+                if (result.pid == pid) {
+                    if (std.os.W.IFEXITED(result.status)) {
+                        this.exit_code = @as(u8, @truncate(std.os.W.EXITSTATUS(result.status)));
+                    }
 
+                    if (std.os.W.IFSIGNALED(result.status)) {
+                        this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.TERMSIG(result.status)))));
+                    } else if (std.os.W.IFSTOPPED(result.status)) {
+                        this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.STOPSIG(result.status)))));
+                    }
                 }
 
-                if (std.os.W.IFSIGNALED(result.status)) {
-                    this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.TERMSIG(result.status)))));
-                } else if (std.os.W.IFSTOPPED(result.status)) {
-                    this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.STOPSIG(result.status)))));
+                if (!this.hasExited()) {
+                    switch (this.watch()) {
+                        .result => {},
+                        .err => |err| {
+                            if (comptime Environment.isMac) {
+                                if (err.getErrno() == .SRCH) {
+                                    waitpid_result = PosixSpawn.waitpid(pid, if (sync) 0 else std.os.W.NOHANG);
+                                    continue;
+                                }
+                            }
+                        },
+                    }
                 }
-            }
-
-            if (!this.hasExited()) {
-                this.watch();
-            }
-        },
-    }
+            },
+        }
+        break;
+    }
-    this.has_waitpid_task = false;
 
     if (!sync and this.hasExited()) {
         var vm = this.globalThis.bunVM();
 
         // prevent duplicate notifications
-        if (this.poll_ref) |poll| {
-            this.poll_ref = null;
-            poll.deinitWithVM(vm);
+        switch (this.poll) {
+            .poll_ref => |poll_| {
+                if (poll_) |poll| {
+                    this.poll.poll_ref = null;
+                    this.flags.reference_count -= @as(u30, @intFromBool(poll.isRegistered()));
+                    poll.deinitWithVM(vm);
+                }
+            },
+            .wait_thread => {
+                this.poll.wait_thread.poll_ref.deactivate(vm.event_loop_handle.?);
+            },
+        }
 
         this.onExit(this.globalThis, this_jsvalue);
@@ -1755,16 +1851,43 @@ pub const Subprocess = struct {
     log("onExit {d}, code={d}", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1 });
     defer this.updateHasPendingActivity();
     this_jsvalue.ensureStillAlive();
-    this.has_waitpid_task = false;
 
     if (this.hasExited()) {
+        {
+            this.flags.reference_count += 1;
+
+            const Holder = struct {
+                process: *Subprocess,
+                task: JSC.AnyTask,
+
+                pub fn unref(self: *@This()) void {
+                    // this calls disableKeepingProcessAlive on pool_ref and stdin, stdout, stderr
+                    self.process.unref();
+                    self.process.flags.reference_count -= 1;
+                    self.process.updateHasPendingActivity();
+                    bun.default_allocator.destroy(self);
+                }
+            };
+
+            var holder = bun.default_allocator.create(Holder) catch @panic("OOM");
+
+            holder.* = .{
+                .process = this,
+                .task = JSC.AnyTask.New(Holder, Holder.unref).init(holder),
+            };
+
+            this.globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task));
+        }
 
         if (this.exit_promise.trySwap()) |promise| {
+            const waitpid_error = this.waitpid_err;
+            this.waitpid_err = null;
+
             if (this.exit_code) |code| {
                 promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(code));
             } else if (this.signal_code != null) {
                 promise.asAnyPromise().?.resolve(globalThis, this.getSignalCode(globalThis));
-            } else if (this.waitpid_err) |err| {
-                this.waitpid_err = null;
+            } else if (waitpid_error) |err| {
                 promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis));
             } else {
                 // crash in debug mode
@@ -1775,8 +1898,11 @@ pub const Subprocess = struct {
 }
 
 if (this.on_exit_callback.trySwap()) |callback| {
+    const waitpid_error = this.waitpid_err;
+    this.waitpid_err = null;
+
     const waitpid_value: JSValue =
-        if (this.waitpid_err) |err|
+        if (waitpid_error) |err|
             err.toJSC(globalThis)
         else
             JSC.JSValue.jsUndefined();
@@ -1801,30 +1927,6 @@ pub const Subprocess = struct {
         globalThis.bunVM().onUnhandledError(globalThis, result);
     }
 }
 
-if (this.hasExited()) {
-    const Holder = struct {
-        process: *Subprocess,
-        task: JSC.AnyTask,
-
-        pub fn unref(self: *@This()) void {
-            // this calls disableKeepingProcessAlive on pool_ref and stdin, stdout, stderr
-            self.process.unref();
-            self.process.has_pending_unref = false;
-            self.process.updateHasPendingActivity();
-            bun.default_allocator.destroy(self);
-        }
-    };
-
-    var holder = bun.default_allocator.create(Holder) catch @panic("OOM");
-    this.has_pending_unref = true;
-    holder.* = .{
-        .process = this,
-        .task = JSC.AnyTask.New(Holder, Holder.unref).init(holder),
-    };
-
-    this.globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task));
-}
 }
 
 const os = std.os;
@@ -1833,8 +1935,6 @@ pub const Subprocess = struct {
     if (pipe[0] != pipe[1]) os.close(pipe[1]);
 }
 
-const PosixSpawn = @import("./spawn.zig").PosixSpawn;
-
 const Stdio = union(enum) {
     inherit: void,
     ignore: void,
@@ -2072,4 +2172,135 @@ pub const Subprocess = struct {
     }
 
     pub const IPCHandler = IPC.NewIPCHandler(Subprocess);
+
+    // Machines which do not support pidfd_open (GVisor, Linux Kernel < 5.6)
+    // use a thread to wait for the child process to exit.
+    // We use a single thread to call waitpid() in a loop.
+    pub const WaiterThread = struct {
+        concurrent_queue: Queue = .{},
+        queue: std.ArrayList(*Subprocess) = std.ArrayList(*Subprocess).init(bun.default_allocator),
+        started: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+
+        pub fn setShouldUseWaiterThread() void {
+            @atomicStore(bool, &should_use_waiter_thread, true, .Monotonic);
+        }
+
+        pub fn shouldUseWaiterThread() bool {
+            return @atomicLoad(bool, &should_use_waiter_thread, .Monotonic);
+        }
+
+        pub const WaitTask = struct {
+            subprocess: *Subprocess,
+            next: ?*WaitTask = null,
+        };
+
+        var should_use_waiter_thread = false;
+
+        pub const Queue = bun.UnboundedQueue(WaitTask, .next);
+        pub var instance: WaiterThread = .{};
+        pub fn init() !void {
+            std.debug.assert(should_use_waiter_thread);
+
+            if (instance.started.fetchMax(1, .Monotonic) > 0) {
+                return;
+            }
+
+            var thread = try std.Thread.spawn(.{ .stack_size = 512 * 1024 }, loop, .{});
+            thread.detach();
+        }
+
+        pub const WaitPidResultTask = struct {
+            result: JSC.Maybe(PosixSpawn.WaitPidResult),
+            subprocess: *Subprocess,
+
+            pub fn runFromJSThread(self: *@This()) void {
+                var result = self.result;
+                var subprocess = self.subprocess;
+                _ = subprocess.poll.wait_thread.ref_count.fetchSub(1, .Monotonic);
+                bun.default_allocator.destroy(self);
+                subprocess.onWaitPid(false, subprocess.this_jsvalue, result);
+            }
+        };
+
+        pub fn append(process: *Subprocess) void {
+            if (process.poll == .wait_thread) {
+                process.poll.wait_thread.poll_ref.activate(process.globalThis.bunVM().event_loop_handle.?);
+                _ = process.poll.wait_thread.ref_count.fetchAdd(1, .Monotonic);
+            } else {
+                process.poll = .{
+                    .wait_thread = .{
+                        .poll_ref = .{},
+                        .ref_count = std.atomic.Atomic(u32).init(1),
+                    },
+                };
+                process.poll.wait_thread.poll_ref.activate(process.globalThis.bunVM().event_loop_handle.?);
+            }
+
+            var task = bun.default_allocator.create(WaitTask) catch unreachable;
+            task.* = WaitTask{
+                .subprocess = process,
+            };
+            instance.concurrent_queue.push(task);
+            process.updateHasPendingActivity();
+
+            init() catch @panic("Failed to start WaiterThread");
+        }
+
+        pub fn loop() void {
+            Output.Source.configureNamedThread("Waitpid");
+
+            var this = &instance;
+
+            while (true) {
+                {
+                    var batch = this.concurrent_queue.popBatch();
+                    var iter = batch.iterator();
+                    this.queue.ensureUnusedCapacity(batch.count) catch unreachable;
+                    while (iter.next()) |task| {
+                        this.queue.appendAssumeCapacity(task.subprocess);
+                        bun.default_allocator.destroy(task);
+                    }
+                }
+
+                var queue: []*Subprocess = this.queue.items;
+                var i: usize = 0;
+                while (queue.len > 0 and i < queue.len) {
+                    var process = queue[i];
+
+                    // this case shouldn't really happen
+                    if (process.pid == bun.invalid_fd) {
+                        _ = this.queue.orderedRemove(i);
+                        _ = process.poll.wait_thread.ref_count.fetchSub(1, .Monotonic);
+                        queue = this.queue.items;
+                        continue;
+                    }
+
+                    const result = PosixSpawn.waitpid(process.pid, std.os.W.NOHANG);
+                    if (result == .err or (result == .result and result.result.pid == process.pid)) {
+                        _ = this.queue.orderedRemove(i);
+                        queue = this.queue.items;
+
+                        var task = bun.default_allocator.create(WaitPidResultTask) catch unreachable;
+                        task.* = WaitPidResultTask{
+                            .result = result,
+                            .subprocess = process,
+                        };
+
+                        process.globalThis.bunVMConcurrently().enqueueTaskConcurrent(
+                            JSC.ConcurrentTask.create(
+                                JSC.Task.init(task),
+                            ),
+                        );
+                    }
+
+                    i += 1;
+                }
+
+                var mask = std.os.empty_sigset;
+                var signal: c_int = std.os.SIG.CHLD;
+                var rc = std.c.sigwait(&mask, &signal);
+                _ = rc;
+            }
+        }
+    };
 };
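The WaiterThread above is a classic reaper loop: drain a queue of subprocesses, poll each with waitpid(WNOHANG), post results back to the event loop, then park in sigwait() until the next SIGCHLD. A self-contained sketch of that core, simplified to a single child and assuming Linux with Zig 0.11 std APIs (not the commit's code):

const std = @import("std");

fn waiterLoop(pid: std.os.pid_t, exited: *std.atomic.Atomic(bool)) void {
    while (true) {
        // Non-blocking reap: .pid stays 0 while the child is still running.
        const result = std.os.waitpid(pid, std.os.W.NOHANG);
        if (result.pid == pid) {
            // Child reaped; publish the exit to the event-loop thread.
            exited.store(true, .SeqCst);
            return;
        }
        // Park until a child changes state. For sigwait to ever return,
        // SIGCHLD must be blocked in this thread (setup omitted for brevity).
        var mask = std.os.empty_sigset;
        std.os.linux.sigaddset(&mask, std.os.SIG.CHLD);
        var sig: c_int = undefined;
        _ = std.c.sigwait(&mask, &sig);
    }
}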
@@ -1834,7 +1834,7 @@ pub const FilePoll = struct {
     log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) Subprocess", .{poll.fd});
     var loader = ptr.as(JSC.Subprocess);
 
-    loader.onExitNotification();
+    loader.onExitNotificationTask();
 },
 @field(Owner.Tag, "FileSink") => {
     log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) FileSink", .{poll.fd});
@@ -344,7 +344,7 @@ const Futimes = JSC.Node.Async.futimes;
 const Lchmod = JSC.Node.Async.lchmod;
 const Lchown = JSC.Node.Async.lchown;
 const Unlink = JSC.Node.Async.unlink;
-
+const WaitPidResultTask = JSC.Subprocess.WaiterThread.WaitPidResultTask;
 // Task.get(ReadFileTask) -> ?ReadFileTask
 pub const Task = TaggedPointerUnion(.{
     FetchTasklet,
@@ -403,6 +403,7 @@ pub const Task = TaggedPointerUnion(.{
     Lchmod,
     Lchown,
     Unlink,
+    WaitPidResultTask,
 });
 const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
 pub const ConcurrentTask = struct {
@@ -923,6 +924,10 @@ pub const EventLoop = struct {
     var any: *Unlink = task.get(Unlink).?;
     any.runFromJSThread();
 },
+@field(Task.Tag, typeBaseName(@typeName(WaitPidResultTask))) => {
+    var any: *WaitPidResultTask = task.get(WaitPidResultTask).?;
+    any.runFromJSThread();
+},
 else => if (Environment.allow_assert) {
     bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())});
 } else {
@@ -1131,6 +1136,7 @@ pub const EventLoop = struct {
 
 var global = ctx.global;
 var global_vm = ctx.jsc;
+
 while (true) {
     while (this.tickWithCount() > 0) : (this.global.handleRejectedPromises()) {
         this.tickConcurrent();
@@ -739,6 +739,13 @@ pub const VirtualMachine = struct {
 }
 
 if (map.get("BUN_GARBAGE_COLLECTOR_LEVEL")) |gc_level| {
+    // Reuse this flag for other things to avoid unnecessary hashtable
+    // lookups on start for obscure flags which we do not want others to
+    // depend on.
+    if (map.get("BUN_FEATURE_FLAG_FORCE_WAITER_THREAD") != null) {
+        JSC.Subprocess.WaiterThread.setShouldUseWaiterThread();
+    }
+
     if (strings.eqlComptime(gc_level, "1")) {
         this.aggressive_garbage_collection = .mild;
     } else if (strings.eqlComptime(gc_level, "2")) {
@@ -476,11 +476,23 @@ pub fn scoped(comptime tag: @Type(.EnumLiteral), comptime disabled: bool) _log_f
     defer lock.unlock();
 
     if (Output.enable_ansi_colors_stderr) {
-        out.print(comptime prettyFmt("<r><d>[" ++ @tagName(tag) ++ "]<r> " ++ fmt, true), args) catch unreachable;
-        buffered_writer.flush() catch unreachable;
+        out.print(comptime prettyFmt("<r><d>[" ++ @tagName(tag) ++ "]<r> " ++ fmt, true), args) catch {
+            really_disable = true;
+            return;
+        };
+        buffered_writer.flush() catch {
+            really_disable = true;
+            return;
+        };
     } else {
-        out.print(comptime prettyFmt("<r><d>[" ++ @tagName(tag) ++ "]<r> " ++ fmt, false), args) catch unreachable;
-        buffered_writer.flush() catch unreachable;
+        out.print(comptime prettyFmt("<r><d>[" ++ @tagName(tag) ++ "]<r> " ++ fmt, false), args) catch {
+            really_disable = true;
+            return;
+        };
+        buffered_writer.flush() catch {
+            really_disable = true;
+            return;
+        };
     }
 }
 }.log;
@@ -568,12 +568,24 @@ pub const StandaloneModuleGraph = struct {
     if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bun")) {
         return null;
     }
+
+    if (comptime Environment.isDebug) {
+        if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bun-debug")) {
+            return null;
+        }
+    }
 }
 
 if (argv0_len == 4) {
     if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bunx")) {
         return null;
     }
+
+    if (comptime Environment.isDebug) {
+        if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bun-debugx")) {
+            return null;
+        }
+    }
 }
@@ -5,12 +5,9 @@ import { closeSync, openSync } from "fs";
 
 test("spawn can read from stdout multiple chunks", async () => {
   gcTick(true);
-  const maxFD = openSync("/dev/null", "w");
-  closeSync(maxFD);
-
-  for (let i = 0; i < 10; i++)
+  var maxFD: number = -1;
+  for (let i = 0; i < 100; i++) {
     await (async function () {
-      var exited;
       const proc = spawn({
         cmd: [bunExe(), import.meta.dir + "/spawn-streaming-stdout-repro.js"],
         stdin: "ignore",
@@ -34,9 +31,14 @@ test("spawn can read from stdout multiple chunks", async () => {
       // TODO: fix bug with returning SIGHUP instead of exit code 1
       proc.kill();
       expect(Buffer.concat(chunks).toString()).toBe("Wrote to stdout\n".repeat(4));
       await proc.exited;
     })();
+
+    if (maxFD === -1) {
+      maxFD = openSync("/dev/null", "w");
+      closeSync(maxFD);
+    }
+  }
   const newMaxFD = openSync("/dev/null", "w");
   closeSync(newMaxFD);
   expect(newMaxFD).toBe(maxFD);
-});
+}, 60_000);
@@ -1,6 +1,6 @@
 import { ArrayBufferSink, readableStreamToText, spawn, spawnSync, write } from "bun";
 import { describe, expect, it } from "bun:test";
-import { gcTick as _gcTick, bunExe } from "harness";
+import { gcTick as _gcTick, bunExe, bunEnv } from "harness";
 import { rmSync, writeFileSync } from "node:fs";
 import path from "path";
@@ -161,7 +161,7 @@ for (let [gcTick, label] of [
       expect(exitCode1).toBe(0);
       expect(exitCode2).toBe(1);
     }
-  }, 20_000);
+  }, 60_000_0);
 
   // FIXME: fix the assertion failure
   it.skip("Uint8Array works as stdout", () => {
@@ -476,3 +476,22 @@ for (let [gcTick, label] of [
     });
   });
 }
+
+if (!process.env.BUN_FEATURE_FLAG_FORCE_WAITER_THREAD) {
+  it("with BUN_FEATURE_FLAG_FORCE_WAITER_THREAD", async () => {
+    const result = spawnSync({
+      cmd: [bunExe(), "test", import.meta.path],
+      env: {
+        ...bunEnv,
+        // Both flags are necessary to force this condition
+        "BUN_FEATURE_FLAG_FORCE_WAITER_THREAD": "1",
+        "BUN_GARBAGE_COLLECTOR_LEVEL": "1",
+      },
+    });
+    if (result.exitCode !== 0) {
+      console.error(result.stderr.toString());
+      console.log(result.stdout.toString());
+    }
+    expect(result.exitCode).toBe(0);
+  }, 60_000);
+}