Compare commits

...

4 Commits

Author SHA1 Message Date
Jarred-Sumner
504f9b6c85 bun run clang-format 2025-06-02 10:53:50 +00:00
Jarred-Sumner
428ce3c44f bun run zig-format 2025-06-02 10:52:20 +00:00
Jarred-Sumner
b918b1ffcd bun scripts/glob-sources.mjs 2025-06-02 10:51:38 +00:00
Jarred Sumner
5c0f8a40cf Fixes #18239 2025-06-02 03:43:13 -07:00
8 changed files with 408 additions and 1 deletions

View File

@@ -11,6 +11,7 @@ src/analytics/analytics_schema.zig
src/analytics/analytics_thread.zig
src/api/schema.zig
src/ast/base.zig
src/async/darwin_select_fallback_thread.zig
src/async/posix_event_loop.zig
src/async/stub_event_loop.zig
src/async/windows_event_loop.zig

View File

@@ -0,0 +1,156 @@
const DarwinSelectFallbackThread = @This();
select_thread: ?std.Thread = null,
should_stop: std.atomic.Value(bool) = .init(false),
lock: bun.Mutex = .{},
waker: bun.Async.Waker,
registered_event_loops: std.AutoArrayHashMapUnmanaged(i32, std.ArrayListUnmanaged(JSC.EventLoopHandle)) = .{},
registered_event_loops_lock: bun.Mutex = .{},
extern "c" fn darwin_select_thread_is_needed_for_fd(fd: i32) bool;
/// Simple map for fd -> FilePoll that works with any EventLoopHandle
pub const Map = struct {
fd_to_file_poll: std.AutoHashMapUnmanaged(i32, *FilePoll) = .{},
pub fn put(self: *Map, fd: i32, file_poll: *FilePoll) void {
self.fd_to_file_poll.put(bun.default_allocator, fd, file_poll) catch {};
}
pub fn remove(self: *Map, fd: i32) void {
_ = self.fd_to_file_poll.remove(fd);
}
pub fn get(self: *Map, fd: i32) ?*FilePoll {
return self.fd_to_file_poll.get(fd);
}
pub fn deinit(self: *Map) void {
self.fd_to_file_poll.deinit(bun.default_allocator);
}
};
pub fn get() *DarwinSelectFallbackThread {
const Holder = struct {
pub var thread: *DarwinSelectFallbackThread = undefined;
pub var select_fallback_thread_once = std.once(initFallbackThread);
pub fn initFallbackThread() void {
thread = bun.default_allocator.create(DarwinSelectFallbackThread) catch bun.outOfMemory();
thread.* = .{
.waker = bun.Async.Waker.init() catch @panic("Unexpected error: Failed to initialize waker for select fallback thread"),
};
}
};
Holder.select_fallback_thread_once.call();
return Holder.thread;
}
pub fn isNeededForStdin() bool {
return darwin_select_thread_is_needed_for_fd(0);
}
pub fn isNeededForFd(fd: i32) bool {
return darwin_select_thread_is_needed_for_fd(fd);
}
pub fn register(event_loop_handle: JSC.EventLoopHandle, fd: bun.FileDescriptor) void {
const this = get();
{
this.lock.lock();
defer this.lock.unlock();
// Add to the list
const entry = this.registered_event_loops.getOrPut(bun.default_allocator, fd.cast()) catch bun.outOfMemory();
if (!entry.found_existing) {
entry.value_ptr.* = .{};
}
entry.value_ptr.*.append(bun.default_allocator, event_loop_handle) catch bun.outOfMemory();
// Start select thread if not running
if (this.select_thread == null) {
this.select_thread = std.Thread.spawn(.{}, selectThreadMain, .{this}) catch bun.outOfMemory();
this.select_thread.?.setName("Bun stdin select() thread") catch {};
}
}
this.waker.wake();
}
pub fn unregister(event_loop_handle: JSC.EventLoopHandle, fd: bun.FileDescriptor) void {
const this = get();
{
this.lock.lock();
defer this.lock.unlock();
const entry = this.registered_event_loops.getEntry(@intCast(fd.cast())) orelse return;
const index = for (entry.value_ptr.*.items, 0..) |handle, i| {
if (handle.eq(event_loop_handle)) break i;
} else return;
_ = entry.value_ptr.*.swapRemove(index);
if (entry.value_ptr.items.len == 0) {
_ = this.registered_event_loops.swapRemove(entry.key_ptr.*);
}
}
this.waker.wake();
}
export fn darwin_select_thread_fd_is_readable(fd: i32) void {
const this = get();
this.registered_event_loops_lock.lock();
var event_loop_handles: std.ArrayListUnmanaged(JSC.EventLoopHandle) = brk: {
if (this.registered_event_loops.get(fd)) |*loops| {
break :brk loops.clone(bun.default_allocator) catch bun.outOfMemory();
}
break :brk .{};
};
this.registered_event_loops_lock.unlock();
defer event_loop_handles.deinit(bun.default_allocator);
for (event_loop_handles.items) |event_loop_handle| {
switch (event_loop_handle) {
.js => |event_loop| event_loop.onDarwinSelectThreadFdIsReadable(fd),
.mini => |mini_event_loop| mini_event_loop.onDarwinSelectThreadFdIsReadable(fd),
}
}
}
extern "c" fn darwin_select_thread_wait_for_events(kqueue_fd: i32, machport: *anyopaque, buffer_ptr: [*]u8, buffer_len: usize, fds_ptr: [*]i32, fds_len: usize) void;
fn selectThreadMain(this: *DarwinSelectFallbackThread) void {
while (!this.should_stop.load(.acquire)) {
var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator);
const fallback_allocator = stack_fallback.get();
this.lock.lock();
const fds = fallback_allocator.dupe(i32, this.registered_event_loops.keys()) catch bun.outOfMemory();
defer fallback_allocator.free(fds);
this.lock.unlock();
const kqueue_fd: i32 = @intCast(this.waker.getFd().cast());
const machport = this.waker.machport;
darwin_select_thread_wait_for_events(
kqueue_fd,
machport,
this.waker.machport_buf.ptr,
this.waker.machport_buf.len,
fds.ptr,
fds.len,
);
}
}
comptime {
_ = darwin_select_thread_fd_is_readable;
}
const std = @import("std");
const bun = @import("bun");
const JSC = bun.JSC;
const FilePoll = bun.Async.FilePoll;

View File

@@ -5,6 +5,9 @@ const uws = bun.uws;
const Environment = bun.Environment;
const std = @import("std");
pub const DarwinSelectFallbackThread = if (Environment.isMac) @import("darwin_select_fallback_thread.zig") else void;
const VirtualMachine = JSC.VirtualMachine;
pub const Loop = uws.Loop;
/// Track if an object whose file descriptor is being watched should keep the event loop alive.
@@ -835,6 +838,29 @@ pub const FilePoll = struct {
return errno;
}
} else if (comptime Environment.isMac) {
// Check if this needs the select fallback (for stdin or problematic pipes)
const needs_select_fallback = flag == .readable and ((fd.cast() == 0 and DarwinSelectFallbackThread.isNeededForStdin()) or
(this.flags.contains(.fifo) and DarwinSelectFallbackThread.isNeededForFd(fd.cast())));
if (needs_select_fallback) {
// Register with the Darwin select fallback thread instead of kqueue
const event_loop_handle = switch (this.allocator_type) {
.js => JSC.EventLoopHandle.init(VirtualMachine.get().eventLoop()),
.mini => JSC.EventLoopHandle.init(JSC.MiniEventLoop.global),
};
const map = switch (this.allocator_type) {
.js => VirtualMachine.get().eventLoop().getDarwinSelectFdMap(),
.mini => JSC.MiniEventLoop.global.getDarwinSelectFdMap(),
};
DarwinSelectFallbackThread.register(event_loop_handle, fd);
map.put(fd.cast(), this);
this.activate(loop);
this.flags.insert(.poll_readable);
this.flags.insert(.was_ever_registered);
return JSC.Maybe(void).success;
}
var changelist = std.mem.zeroes([2]std.posix.system.kevent64_s);
const one_shot_flag: u16 = if (!this.flags.contains(.one_shot))
0
@@ -1005,6 +1031,30 @@ pub const FilePoll = struct {
return errno;
}
} else if (comptime Environment.isMac) {
// Check if this is registered with the select fallback (stdin or problematic pipes)
const was_select_fallback = flag == .readable and ((fd.cast() == 0 and DarwinSelectFallbackThread.isNeededForStdin()) or
(this.flags.contains(.fifo) and DarwinSelectFallbackThread.isNeededForFd(fd.cast())));
if (was_select_fallback) {
// Unregister from the Darwin select fallback thread
const event_loop_handle = switch (this.allocator_type) {
.js => JSC.EventLoopHandle.init(VirtualMachine.get().eventLoop()),
.mini => JSC.EventLoopHandle.init(JSC.MiniEventLoop.global),
};
const map = switch (this.allocator_type) {
.js => VirtualMachine.get().eventLoop().getDarwinSelectFdMap(),
.mini => JSC.MiniEventLoop.global.getDarwinSelectFdMap(),
};
DarwinSelectFallbackThread.unregister(event_loop_handle, fd);
map.remove(fd.cast());
this.flags.remove(.poll_readable);
this.flags.remove(.poll_writable);
this.flags.remove(.poll_process);
this.flags.remove(.poll_machport);
return JSC.Maybe(void).success;
}
var changelist = std.mem.zeroes([2]std.posix.system.kevent64_s);
changelist[0] = switch (flag) {

View File

@@ -29,6 +29,9 @@ imminent_gc_timer: std.atomic.Value(?*Timer.WTFTimer) = .{ .raw = null },
signal_handler: if (Environment.isPosix) ?*PosixSignalHandle else void = if (Environment.isPosix) null,
/// Map file descriptors to FilePoll for Darwin select fallback
darwin_select_fd_map: if (Environment.isMac) ?*Async.DarwinSelectFallbackThread.Map else void = if (Environment.isMac) null,
pub const Debug = if (Environment.isDebug) struct {
is_inside_tick_queue: bool = false,
js_call_count_outside_tick_queue: usize = 0,
@@ -606,6 +609,48 @@ pub fn unrefConcurrently(this: *EventLoop) void {
this.wakeup();
}
// Dispatch just the fd number to the main thread for safe lookup
pub const DarwinSelectReadyTask = struct {
fd: i32,
pub fn decode(task_ptr_but_not_actually_a_task: *@This()) i32 {
return @bitCast(@as(u32, @truncate(@intFromPtr(task_ptr_but_not_actually_a_task))));
}
pub fn runOnJSThread(task_ptr_but_not_actually_a_task: *@This(), event_loop: *EventLoop) void {
if (comptime !Environment.isMac) unreachable;
const fd: i32 = decode(task_ptr_but_not_actually_a_task);
// Look up the FilePoll on the main thread where it's safe
if (event_loop.darwin_select_fd_map) |map| {
if (map.get(fd)) |file_poll| {
file_poll.onUpdate(0);
}
}
}
};
/// Called from the Darwin select fallback thread when a file descriptor becomes readable
pub fn onDarwinSelectThreadFdIsReadable(this: *EventLoop, fd: i32) void {
if (comptime !Environment.isMac) return;
// This works because its a 32 bit integer which is less than the 48 bits of address space
const store_fd_without_allocating_memory_by_stuffing_it_into_a_pointer: *DarwinSelectReadyTask = @ptrFromInt(@as(usize, @as(u32, @bitCast(fd))));
this.enqueueTaskConcurrent(ConcurrentTask.create(Task.init(store_fd_without_allocating_memory_by_stuffing_it_into_a_pointer)));
}
/// Get or create the Darwin select fallback map
pub fn getDarwinSelectFdMap(this: *EventLoop) *Async.DarwinSelectFallbackThread.Map {
if (comptime !Environment.isMac) unreachable;
return this.darwin_select_fd_map orelse brk: {
const map = bun.default_allocator.create(Async.DarwinSelectFallbackThread.Map) catch bun.outOfMemory();
map.* = .{};
this.darwin_select_fd_map = map;
break :brk map;
};
}
pub const AnyEventLoop = @import("./event_loop/AnyEventLoop.zig").AnyEventLoop;
pub const ConcurrentPromiseTask = @import("./event_loop/ConcurrentPromiseTask.zig").ConcurrentPromiseTask;
pub const WorkTask = @import("./event_loop/WorkTask.zig").WorkTask;

View File

@@ -114,6 +114,13 @@ pub const EventLoopHandle = union(EventLoopKind) {
this.loop().unref();
}
pub fn eq(this: EventLoopHandle, other: EventLoopHandle) bool {
return switch (this) {
.js => this.js == other.js,
.mini => this.mini == other.mini,
};
}
pub inline fn createNullDelimitedEnvMap(this: @This(), alloc: Allocator) ![:null]?[*:0]const u8 {
return switch (this) {
.js => this.js.virtual_machine.transpiler.env.map.createNullDelimitedEnvMap(alloc),

View File

@@ -32,6 +32,10 @@ after_event_loop_callback: ?JSC.OpaqueCallback = null,
pipe_read_buffer: ?*PipeReadBuffer = null,
stdout_store: ?*bun.webcore.Blob.Store = null,
stderr_store: ?*bun.webcore.Blob.Store = null,
/// Map file descriptors to FilePoll for Darwin select fallback
darwin_select_fd_map: if (Environment.isMac) ?*Async.DarwinSelectFallbackThread.Map else void = if (Environment.isMac) null,
const PipeReadBuffer = [256 * 1024]u8;
pub threadlocal var globalInitialized: bool = false;
@@ -282,6 +286,47 @@ pub fn stdout(this: *MiniEventLoop) *JSC.WebCore.Blob.Store {
};
}
/// Called from the Darwin select fallback thread when a file descriptor becomes readable
pub fn onDarwinSelectThreadFdIsReadable(this: *MiniEventLoop, fd: i32) void {
if (comptime !Environment.isMac) return;
// Dispatch just the fd number to the main thread for safe lookup
const DarwinSelectReadyTask = struct {
fd: i32,
pub fn runFromJSThreadMini(task: *@This(), mini_event_loop_ptr: *anyopaque) void {
defer bun.default_allocator.destroy(task);
const mini_event_loop: *MiniEventLoop = @ptrCast(@alignCast(mini_event_loop_ptr));
// Look up the FilePoll on the main thread where it's safe
if (mini_event_loop.darwin_select_fd_map) |map| {
if (map.get(task.fd)) |file_poll| {
file_poll.onUpdate(0);
}
}
}
};
const darwin_select_ready_task = bun.new(DarwinSelectReadyTask, .{
.fd = fd,
});
this.enqueueTaskConcurrent(JSC.AnyTaskWithExtraContext.fromCallbackAutoDeinit(darwin_select_ready_task, "runFromJSThreadMini"));
}
/// Get or create the Darwin select fallback map
pub fn getDarwinSelectFdMap(this: *MiniEventLoop) *Async.DarwinSelectFallbackThread.Map {
if (comptime !Environment.isMac) unreachable;
if (this.darwin_select_fd_map) |map| {
return map;
}
const map = this.allocator.create(Async.DarwinSelectFallbackThread.Map) catch bun.outOfMemory();
map.* = .{};
this.darwin_select_fd_map = map;
return map;
}
pub const JsVM = struct {
vm: *VirtualMachine,

View File

@@ -89,6 +89,7 @@ pub const Task = TaggedPointerUnion(.{
WriteFile,
WriteFileTask,
Writev,
DarwinSelectReadyTask,
});
pub fn tickQueueWithCount(this: *EventLoop, virtual_machine: *VirtualMachine) u32 {
@@ -479,7 +480,14 @@ pub fn tickQueueWithCount(this: *EventLoop, virtual_machine: *VirtualMachine) u3
var any: *FlushPendingFileSinkTask = task.get(FlushPendingFileSinkTask).?;
any.runFromJSThread();
},
@field(Task.Tag, @typeName(DarwinSelectReadyTask)) => {
if (comptime Environment.isMac) {
var any: *DarwinSelectReadyTask = task.get(DarwinSelectReadyTask).?;
any.runOnJSThread(this);
} else {
unreachable;
}
},
else => {
bun.Output.panic("Unexpected tag: {s}", .{@tagName(task.tag())});
},
@@ -581,6 +589,7 @@ const ShellAsyncSubprocessDone = shell.Interpreter.Cmd.ShellAsyncSubprocessDone;
const RuntimeTranspilerStore = JSC.ModuleLoader.RuntimeTranspilerStore;
const ServerAllConnectionsClosedTask = @import("../api/server.zig").ServerAllConnectionsClosedTask;
const FlushPendingFileSinkTask = bun.webcore.FileSink.FlushPendingTask;
const DarwinSelectReadyTask = JSC.EventLoop.DarwinSelectReadyTask;
const bun = @import("bun");
const JSC = bun.JSC;

View File

@@ -117,6 +117,100 @@ extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker)
}
}
extern "C" void darwin_select_thread_fd_is_readable(int fd);
extern "C" void darwin_select_thread_wait_for_events(int kqueue_fd, mach_port_t* _Nonnull machport, char* machport_buffer, size_t machport_buffer_size, int* fds, size_t fds_len)
{
fd_set read_set;
FD_ZERO(&read_set);
int max_fd = kqueue_fd;
for (size_t i = 0; i < fds_len; i++) {
FD_SET(fds[i], &read_set);
if (fds[i] > max_fd) {
max_fd = fds[i];
}
}
FD_SET(kqueue_fd, &read_set);
while (true) {
int rv = select(max_fd + 1, &read_set, NULL, NULL, NULL);
if (rv == -1) {
if (errno == EINTR) {
continue;
}
break;
}
for (size_t i = 0; i < fds_len; i++) {
int fd = fds[i];
if (FD_ISSET(fd, &read_set)) {
darwin_select_thread_fd_is_readable(fd);
}
}
if (FD_ISSET(kqueue_fd, &read_set)) {
struct kevent64_s event[5];
while (true) {
// a 0 timeout so it immediately returns
// Use the flag to effect a poll
int ret = kevent64(kqueue_fd, NULL, 0, event, 5, 0, NULL);
if (ret == -1) {
if (errno == EINTR) {
continue;
}
break;
}
if (ret == 0) {
break;
}
for (size_t i = 0; i < ret; i++) {
if (event[i].filter == EVFILT_MACHPORT) {
// Read the machport message to clear it and prevent continuous wakeups
mach_msg_header_t msg;
mach_msg_return_t msg_ret = mach_msg(&msg, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0, sizeof(msg), *machport, 0, MACH_PORT_NULL);
// Validate the message was received successfully
if (msg_ret != MACH_MSG_SUCCESS && msg_ret != MACH_RCV_TIMED_OUT) {
break;
}
}
}
// Halt here, we've received a message from the machport, so we need to restart the outer loop.
return;
}
}
// infinite loop
}
}
extern "C" bool darwin_select_thread_is_needed_for_fd(int fd)
{
// Test if the given fd is compatible with kqueue
// Some fd configurations on macOS don't work well with kqueue
int test_kqueue = kqueue();
if (test_kqueue == -1) {
return true; // If kqueue fails, definitely need select fallback
}
struct kevent64_s event = {};
event.ident = fd;
event.filter = EVFILT_READ;
event.flags = EV_ADD | EV_ENABLE;
// Try to register fd with kqueue
int result = kevent64(test_kqueue, &event, 1, NULL, 0, 0, NULL);
bool needs_fallback = (result == -1);
close(test_kqueue);
// If kevent fails for stdin, we need the select fallback
return needs_fallback;
}
#else
// stub out these symbols