diff --git a/src/bun.js/node/node_fs_watcher.zig b/src/bun.js/node/node_fs_watcher.zig index 40df4708cd..61ca43454f 100644 --- a/src/bun.js/node/node_fs_watcher.zig +++ b/src/bun.js/node/node_fs_watcher.zig @@ -43,6 +43,12 @@ pub const FSWatcher = struct { pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1), current_task: FSWatchTask = undefined, + // NOTE: this may equal `.current_task`. do not free it! + // if we create multiple on the same tick we use this buffer. + last_task: ?*FSWatchTask = null, + + js_thread: std.Thread.Id, + pub fn eventLoop(this: FSWatcher) *EventLoop { return this.ctx.eventLoop(); } @@ -59,21 +65,25 @@ pub const FSWatcher = struct { pub const finalize = deinit; - pub const FSWatchTask = if (Environment.isWindows) FSWatchTaskWindows else FSWatchTaskPosix; - pub const FSWatchTaskPosix = struct { - pub const new = bun.TrivialNew(@This()); + pub const FSWatchTask = FSWatchTaskPosix; + const FSWatchTaskPosix = struct { + concurrent_task: JSC.EventLoopTask, ctx: *FSWatcher, - count: u8 = 0, + count: u8, + entries: [BATCH_SIZE]Entry = undefined, + // Track if we need to perform ref operation on main thread + needs_ref: bool = false, - entries: [8]Entry = undefined, - concurrent_task: JSC.ConcurrentTask = undefined, - - pub const Entry = struct { + const Entry = struct { event: Event, needs_free: bool, }; + const BATCH_SIZE = 10; + + pub const new = bun.TrivialNew(@This()); + pub fn deinit(this: *FSWatchTask) void { this.cleanEntries(); if (comptime Environment.allow_assert) { @@ -82,13 +92,15 @@ pub const FSWatcher = struct { bun.destroy(this); } - pub fn append(this: *FSWatchTask, event: Event, needs_free: bool) void { - if (this.count == 8) { + pub fn append(this: *FSWatchTaskPosix, event: Event, needs_free: bool) void { + if (this.count >= this.entries.len) { this.enqueue(); const ctx = this.ctx; this.* = .{ .ctx = ctx, .count = 0, + .concurrent_task = undefined, + .needs_ref = false, }; } @@ -101,25 +113,31 @@ pub const FSWatcher = struct { pub fn run(this: *FSWatchTask) void { // this runs on JS Context Thread + const ctx = this.ctx; + + // Perform ref operation on main thread if needed + if (this.needs_ref and ctx.persistent) { + ctx.poll_ref.ref(ctx.ctx); + } for (this.entries[0..this.count]) |entry| { switch (entry.event) { inline .rename, .change => |file_path, t| { - this.ctx.emit(file_path, t); + ctx.emit(file_path, t); }, .@"error" => |err| { - this.ctx.emitError(err); + ctx.emitError(err); }, .abort => { - this.ctx.emitIfAborted(); + ctx.emitIfAborted(); }, .close => { - this.ctx.emit("", .close); + ctx.emit("", .close); }, } } - this.ctx.unrefTask(); + ctx.unrefTaskMainThread(); } pub fn appendAbort(this: *FSWatchTask) void { @@ -131,20 +149,32 @@ pub const FSWatcher = struct { if (this.count == 0) return; - // if false is closed or detached (can still contain valid refs but will not create a new one) - if (this.ctx.refTask()) { - var that = FSWatchTask.new(this.*); - // Clear the count before enqueueing to avoid double processing - const saved_count = this.count; - this.count = 0; - that.count = saved_count; - that.concurrent_task.task = JSC.Task.init(that); - this.ctx.enqueueTaskConcurrent(&that.concurrent_task); + // Check if we can proceed from watcher thread + const ctx = this.ctx; + ctx.mutex.lock(); + defer ctx.mutex.unlock(); + + if (ctx.closed) { + this.cleanEntries(); return; } - // closed or detached so just cleanEntries - this.cleanEntries(); + + // Track pending activity + const current = ctx.pending_activity_count.fetchAdd(1, .monotonic); + + var that = FSWatchTask.new(this.*); + // Clear the count before enqueueing to avoid double processing + const saved_count = this.count; + this.count = 0; + that.count = saved_count; + + // If we went from 0 to 1, we need to ref on the main thread + that.needs_ref = (current == 0); + + that.concurrent_task.task = JSC.Task.init(that); + ctx.enqueueTaskConcurrent(&that.concurrent_task); } + pub fn cleanEntries(this: *FSWatchTask) void { for (this.entries[0..this.count]) |*entry| { if (entry.needs_free) { @@ -437,7 +467,7 @@ pub const FSWatcher = struct { }; pub fn initJS(this: *FSWatcher, listener: JSC.JSValue) void { - // The initial activity count is already 1 from init() + // We're on main thread during init if (this.persistent) { this.poll_ref.ref(this.ctx); } @@ -559,6 +589,7 @@ pub const FSWatcher = struct { pub fn doRef(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue { if (!this.closed and !this.persistent) { this.persistent = true; + // We're on main thread here this.poll_ref.ref(this.ctx); } return .undefined; @@ -567,6 +598,7 @@ pub const FSWatcher = struct { pub fn doUnref(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue { if (this.persistent) { this.persistent = false; + // We're on main thread here this.poll_ref.unref(this.ctx); } return .undefined; @@ -576,16 +608,31 @@ pub const FSWatcher = struct { return JSC.JSValue.jsBoolean(this.persistent); } - // this can be called from Watcher Thread or JS Context Thread + // Only call from main thread + pub fn refTaskMainThread(this: *FSWatcher) void { + if (this.persistent) { + this.poll_ref.ref(this.ctx); + } + } + + // Only call from main thread + pub fn unrefTaskMainThread(this: *FSWatcher) void { + this.mutex.lock(); + defer this.mutex.unlock(); + + const current = this.pending_activity_count.fetchSub(1, .monotonic); + // If we went from 1 to 0, we need to unref the poll + if (current == 1 and this.persistent) { + this.poll_ref.unref(this.ctx); + } + } + + // Can be called from any thread - delegates to main thread version when on JS thread pub fn refTask(this: *FSWatcher) bool { this.mutex.lock(); defer this.mutex.unlock(); if (this.closed) return false; - const current = this.pending_activity_count.fetchAdd(1, .monotonic); - // If we went from 0 to 1, we need to ref the poll - if (current == 0 and this.persistent) { - this.poll_ref.ref(this.ctx); - } + _ = this.pending_activity_count.fetchAdd(1, .monotonic); return true; } @@ -593,14 +640,14 @@ pub const FSWatcher = struct { return this.pending_activity_count.load(.acquire) > 0; } + // Can be called from any thread - delegates to main thread version when on JS thread pub fn unrefTask(this: *FSWatcher) void { - this.mutex.lock(); - defer this.mutex.unlock(); - // JSC eventually will free it - const current = this.pending_activity_count.fetchSub(1, .monotonic); - // If we went from 1 to 0, we need to unref the poll - if (current == 1 and this.persistent) { - this.poll_ref.unref(this.ctx); + // If we're on the JS thread (where we have access to the VM), use the main thread version + // Otherwise just decrement the count + if (this.js_thread == std.Thread.getCurrentId()) { + this.unrefTaskMainThread(); + } else { + _ = this.pending_activity_count.fetchSub(1, .monotonic); } } @@ -614,14 +661,18 @@ pub const FSWatcher = struct { if (js_this != .zero) { if (FSWatcher.js.listenerGetCached(js_this)) |listener| { - _ = this.refTask(); + // We're on main thread during close + const current = this.pending_activity_count.fetchAdd(1, .monotonic); + if (current == 0 and this.persistent) { + this.poll_ref.ref(this.ctx); + } log("emit('close')", .{}); emitJS(listener, this.globalThis, .undefined, .close); - this.unrefTask(); + this.unrefTaskMainThread(); } } - this.unrefTask(); + this.unrefTaskMainThread(); } else { this.mutex.unlock(); } @@ -698,9 +749,9 @@ pub const FSWatcher = struct { .path_watcher = null, .globalThis = args.global_this, .js_this = .zero, - .encoding = args.encoding, - .closed = false, .verbose = args.verbose, + .encoding = args.encoding, + .js_thread = std.Thread.getCurrentId(), }); ctx.current_task.ctx = ctx; diff --git a/src/watcher/INotifyWatcher.zig b/src/watcher/INotifyWatcher.zig index cba39c4e6c..cd665e0fb1 100644 --- a/src/watcher/INotifyWatcher.zig +++ b/src/watcher/INotifyWatcher.zig @@ -32,6 +32,11 @@ watch_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), /// nanoseconds coalesce_interval: isize = 100_000, +/// Waker to signal the watcher thread +waker: bun.Async.Waker = undefined, +/// Whether the watcher is still running +running: std.atomic.Value(bool) = std.atomic.Value(bool).init(true), + pub const EventListIndex = c_int; pub const Event = extern struct { watch_descriptor: EventListIndex, @@ -64,7 +69,7 @@ pub const Event = extern struct { pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) { bun.assert(this.loaded); const old_count = this.watch_count.fetchAdd(1, .release); - defer if (old_count == 0) Futex.wake(&this.watch_count, 10); + defer if (old_count == 0) this.waker.wake(); const watch_file_mask = IN.EXCL_UNLINK | IN.MOVE_SELF | IN.DELETE_SELF | IN.MOVED_TO | IN.MODIFY; const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_file_mask); log("inotify_add_watch({}) = {}", .{ this.fd, rc }); @@ -75,7 +80,7 @@ pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(Ev pub fn watchDir(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) { bun.assert(this.loaded); const old_count = this.watch_count.fetchAdd(1, .release); - defer if (old_count == 0) Futex.wake(&this.watch_count, 10); + defer if (old_count == 0) this.waker.wake(); const watch_dir_mask = IN.EXCL_UNLINK | IN.DELETE | IN.DELETE_SELF | IN.CREATE | IN.MOVE_SELF | IN.ONLYDIR | IN.MOVED_TO; const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_dir_mask); log("inotify_add_watch({}) = {}", .{ this.fd, rc }); @@ -100,6 +105,7 @@ pub fn init(this: *INotifyWatcher, _: []const u8) !void { // TODO: convert to bun.sys.Error this.fd = .fromNative(try std.posix.inotify_init1(IN.CLOEXEC)); this.eventlist_bytes = &(try bun.default_allocator.alignedAlloc(EventListBytes, @alignOf(Event), 1))[0]; + this.waker = try bun.Async.Waker.init(); log("{} init", .{this.fd}); } @@ -116,72 +122,128 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) { // We still don't correctly handle MOVED_FROM && MOVED_TO it seems. var i: u32 = 0; const read_eventlist_bytes = if (this.read_ptr) |ptr| brk: { - Futex.waitForever(&this.watch_count, 0); i = ptr.i; break :brk this.eventlist_bytes[0..ptr.len]; } else outer: while (true) { - Futex.waitForever(&this.watch_count, 0); + // Check if we should stop + if (!this.running.load(.acquire)) return .{ .result = &.{} }; - const rc = std.posix.system.read( - this.fd.cast(), - this.eventlist_bytes, - this.eventlist_bytes.len, - ); - const errno = std.posix.errno(rc); - switch (errno) { - .SUCCESS => { - var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)]; - log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len }); - if (read_eventlist_bytes.len == 0) return .{ .result = &.{} }; + // Check if watch_count is 0, if so wait for waker + const count = this.watch_count.load(.acquire); + if (count == 0) { + // Wait on just the waker fd since there are no watches + var fds = [_]std.posix.pollfd{.{ + .fd = this.waker.getFd().cast(), + .events = std.posix.POLL.IN, + .revents = 0, + }}; - // IN_MODIFY is very noisy - // we do a 0.1ms sleep to try to coalesce events better - const double_read_threshold = Event.largest_size * (max_count / 2); - if (read_eventlist_bytes.len < double_read_threshold) { - var fds = [_]std.posix.pollfd{.{ - .fd = this.fd.cast(), - .events = std.posix.POLL.IN | std.posix.POLL.ERR, - .revents = 0, - }}; - var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval }; - if ((std.posix.ppoll(&fds, ×pec, null) catch 0) > 0) { - inner: while (true) { - const rest = this.eventlist_bytes[read_eventlist_bytes.len..]; - bun.assert(rest.len > 0); - const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len); - // Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc }); - const e = std.posix.errno(new_rc); - switch (e) { - .SUCCESS => { - read_eventlist_bytes.len += @intCast(new_rc); - break :outer read_eventlist_bytes; - }, - .AGAIN, .INTR => continue :inner, - else => return .{ .err = .{ - .errno = @truncate(@intFromEnum(e)), - .syscall = .read, - } }, + const poll_rc = std.posix.poll(&fds, -1) catch |err| { + return .{ .err = .{ + .errno = @truncate(@intFromEnum(err)), + .syscall = .poll, + } }; + }; + + if (poll_rc > 0 and (fds[0].revents & std.posix.POLL.IN) != 0) { + // Consume the waker + this.waker.wait(); + } + + // Check again if we should stop or if watches were added + if (!this.running.load(.acquire)) return .{ .result = &.{} }; + continue :outer; + } + + // Wait on both inotify fd and waker fd + var fds = [_]std.posix.pollfd{ + .{ + .fd = this.fd.cast(), + .events = std.posix.POLL.IN, + .revents = 0, + }, + .{ + .fd = this.waker.getFd().cast(), + .events = std.posix.POLL.IN, + .revents = 0, + }, + }; + + const poll_rc = std.posix.poll(&fds, -1) catch |err| { + return .{ .err = .{ + .errno = @truncate(@intFromEnum(err)), + .syscall = .poll, + } }; + }; + + if (poll_rc > 0) { + // Check if waker was signaled + if ((fds[1].revents & std.posix.POLL.IN) != 0) { + // Consume the waker + this.waker.wait(); + // Check if we should stop + if (!this.running.load(.acquire)) return .{ .result = &.{} }; + } + + // Check if inotify has events + if ((fds[0].revents & std.posix.POLL.IN) != 0) { + const rc = std.posix.system.read( + this.fd.cast(), + this.eventlist_bytes, + this.eventlist_bytes.len, + ); + const errno = std.posix.errno(rc); + switch (errno) { + .SUCCESS => { + var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)]; + log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len }); + if (read_eventlist_bytes.len == 0) return .{ .result = &.{} }; + + // IN_MODIFY is very noisy + // we do a 0.1ms sleep to try to coalesce events better + const double_read_threshold = Event.largest_size * (max_count / 2); + if (read_eventlist_bytes.len < double_read_threshold) { + var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval }; + if ((std.posix.ppoll(&fds[0..1], ×pec, null) catch 0) > 0) { + inner: while (true) { + const rest = this.eventlist_bytes[read_eventlist_bytes.len..]; + bun.assert(rest.len > 0); + const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len); + // Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc }); + const e = std.posix.errno(new_rc); + switch (e) { + .SUCCESS => { + read_eventlist_bytes.len += @intCast(new_rc); + break :outer read_eventlist_bytes; + }, + .AGAIN, .INTR => continue :inner, + else => return .{ .err = .{ + .errno = @truncate(@intFromEnum(e)), + .syscall = .read, + } }, + } + } } } - } - } - break :outer read_eventlist_bytes; - }, - .AGAIN, .INTR => continue :outer, - .INVAL => { - if (Environment.isDebug) { - bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len }); + break :outer read_eventlist_bytes; + }, + .AGAIN, .INTR => continue :outer, + .INVAL => { + if (Environment.isDebug) { + bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len }); + } + return .{ .err = .{ + .errno = @truncate(@intFromEnum(errno)), + .syscall = .read, + } }; + }, + else => return .{ .err = .{ + .errno = @truncate(@intFromEnum(errno)), + .syscall = .read, + } }, } - return .{ .err = .{ - .errno = @truncate(@intFromEnum(errno)), - .syscall = .read, - } }; - }, - else => return .{ .err = .{ - .errno = @truncate(@intFromEnum(errno)), - .syscall = .read, - } }, + } } }; @@ -213,11 +275,17 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) { } } + // Clear read_ptr if we've processed all buffered events + this.read_ptr = null; + return .{ .result = this.eventlist_ptrs[0..count] }; } pub fn stop(this: *INotifyWatcher) void { log("{} stop", .{this.fd}); + this.running.store(false, .release); + // Wake up any threads waiting in read() + this.waker.wake(); if (this.fd != bun.invalid_fd) { this.fd.close(); this.fd = bun.invalid_fd; @@ -236,6 +304,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) { // TODO: is this thread safe? var remaining_events = events.len; + var event_offset: usize = 0; const eventlist_index = this.watchlist.items(.eventlist_index); @@ -244,7 +313,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) { var temp_name_list: [128]?[:0]u8 = undefined; var temp_name_off: u8 = 0; - const slice = events[0..@min(128, remaining_events, this.watch_events.len)]; + const slice = events[event_offset..][0..@min(128, remaining_events, this.watch_events.len)]; var watchevents = this.watch_events[0..slice.len]; var watch_event_id: u32 = 0; for (slice) |event| { @@ -297,6 +366,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) { } else { break; } + event_offset += slice.len; remaining_events -= slice.len; }