apply patches

This commit is contained in:
Jarred Sumner
2025-06-06 16:45:12 -07:00
parent e8a0136501
commit 58b8201fbe
2 changed files with 226 additions and 105 deletions

View File

@@ -43,6 +43,12 @@ pub const FSWatcher = struct {
pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
current_task: FSWatchTask = undefined,
// NOTE: this may equal `.current_task`. do not free it!
// if we create multiple on the same tick we use this buffer.
last_task: ?*FSWatchTask = null,
js_thread: std.Thread.Id,
pub fn eventLoop(this: FSWatcher) *EventLoop {
return this.ctx.eventLoop();
}
@@ -59,21 +65,25 @@ pub const FSWatcher = struct {
pub const finalize = deinit;
pub const FSWatchTask = if (Environment.isWindows) FSWatchTaskWindows else FSWatchTaskPosix;
pub const FSWatchTaskPosix = struct {
pub const new = bun.TrivialNew(@This());
pub const FSWatchTask = FSWatchTaskPosix;
const FSWatchTaskPosix = struct {
concurrent_task: JSC.EventLoopTask,
ctx: *FSWatcher,
count: u8 = 0,
count: u8,
entries: [BATCH_SIZE]Entry = undefined,
// Track if we need to perform ref operation on main thread
needs_ref: bool = false,
entries: [8]Entry = undefined,
concurrent_task: JSC.ConcurrentTask = undefined,
pub const Entry = struct {
const Entry = struct {
event: Event,
needs_free: bool,
};
const BATCH_SIZE = 10;
pub const new = bun.TrivialNew(@This());
pub fn deinit(this: *FSWatchTask) void {
this.cleanEntries();
if (comptime Environment.allow_assert) {
@@ -82,13 +92,15 @@ pub const FSWatcher = struct {
bun.destroy(this);
}
pub fn append(this: *FSWatchTask, event: Event, needs_free: bool) void {
if (this.count == 8) {
pub fn append(this: *FSWatchTaskPosix, event: Event, needs_free: bool) void {
if (this.count >= this.entries.len) {
this.enqueue();
const ctx = this.ctx;
this.* = .{
.ctx = ctx,
.count = 0,
.concurrent_task = undefined,
.needs_ref = false,
};
}
@@ -101,25 +113,31 @@ pub const FSWatcher = struct {
pub fn run(this: *FSWatchTask) void {
// this runs on JS Context Thread
const ctx = this.ctx;
// Perform ref operation on main thread if needed
if (this.needs_ref and ctx.persistent) {
ctx.poll_ref.ref(ctx.ctx);
}
for (this.entries[0..this.count]) |entry| {
switch (entry.event) {
inline .rename, .change => |file_path, t| {
this.ctx.emit(file_path, t);
ctx.emit(file_path, t);
},
.@"error" => |err| {
this.ctx.emitError(err);
ctx.emitError(err);
},
.abort => {
this.ctx.emitIfAborted();
ctx.emitIfAborted();
},
.close => {
this.ctx.emit("", .close);
ctx.emit("", .close);
},
}
}
this.ctx.unrefTask();
ctx.unrefTaskMainThread();
}
pub fn appendAbort(this: *FSWatchTask) void {
@@ -131,20 +149,32 @@ pub const FSWatcher = struct {
if (this.count == 0)
return;
// if false is closed or detached (can still contain valid refs but will not create a new one)
if (this.ctx.refTask()) {
var that = FSWatchTask.new(this.*);
// Clear the count before enqueueing to avoid double processing
const saved_count = this.count;
this.count = 0;
that.count = saved_count;
that.concurrent_task.task = JSC.Task.init(that);
this.ctx.enqueueTaskConcurrent(&that.concurrent_task);
// Check if we can proceed from watcher thread
const ctx = this.ctx;
ctx.mutex.lock();
defer ctx.mutex.unlock();
if (ctx.closed) {
this.cleanEntries();
return;
}
// closed or detached so just cleanEntries
this.cleanEntries();
// Track pending activity
const current = ctx.pending_activity_count.fetchAdd(1, .monotonic);
var that = FSWatchTask.new(this.*);
// Clear the count before enqueueing to avoid double processing
const saved_count = this.count;
this.count = 0;
that.count = saved_count;
// If we went from 0 to 1, we need to ref on the main thread
that.needs_ref = (current == 0);
that.concurrent_task.task = JSC.Task.init(that);
ctx.enqueueTaskConcurrent(&that.concurrent_task);
}
pub fn cleanEntries(this: *FSWatchTask) void {
for (this.entries[0..this.count]) |*entry| {
if (entry.needs_free) {
@@ -437,7 +467,7 @@ pub const FSWatcher = struct {
};
pub fn initJS(this: *FSWatcher, listener: JSC.JSValue) void {
// The initial activity count is already 1 from init()
// We're on main thread during init
if (this.persistent) {
this.poll_ref.ref(this.ctx);
}
@@ -559,6 +589,7 @@ pub const FSWatcher = struct {
pub fn doRef(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
if (!this.closed and !this.persistent) {
this.persistent = true;
// We're on main thread here
this.poll_ref.ref(this.ctx);
}
return .undefined;
@@ -567,6 +598,7 @@ pub const FSWatcher = struct {
pub fn doUnref(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
if (this.persistent) {
this.persistent = false;
// We're on main thread here
this.poll_ref.unref(this.ctx);
}
return .undefined;
@@ -576,16 +608,31 @@ pub const FSWatcher = struct {
return JSC.JSValue.jsBoolean(this.persistent);
}
// this can be called from Watcher Thread or JS Context Thread
// Only call from main thread
pub fn refTaskMainThread(this: *FSWatcher) void {
if (this.persistent) {
this.poll_ref.ref(this.ctx);
}
}
// Only call from main thread
pub fn unrefTaskMainThread(this: *FSWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
const current = this.pending_activity_count.fetchSub(1, .monotonic);
// If we went from 1 to 0, we need to unref the poll
if (current == 1 and this.persistent) {
this.poll_ref.unref(this.ctx);
}
}
// Can be called from any thread - delegates to main thread version when on JS thread
pub fn refTask(this: *FSWatcher) bool {
this.mutex.lock();
defer this.mutex.unlock();
if (this.closed) return false;
const current = this.pending_activity_count.fetchAdd(1, .monotonic);
// If we went from 0 to 1, we need to ref the poll
if (current == 0 and this.persistent) {
this.poll_ref.ref(this.ctx);
}
_ = this.pending_activity_count.fetchAdd(1, .monotonic);
return true;
}
@@ -593,14 +640,14 @@ pub const FSWatcher = struct {
return this.pending_activity_count.load(.acquire) > 0;
}
// Can be called from any thread - delegates to main thread version when on JS thread
pub fn unrefTask(this: *FSWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
// JSC eventually will free it
const current = this.pending_activity_count.fetchSub(1, .monotonic);
// If we went from 1 to 0, we need to unref the poll
if (current == 1 and this.persistent) {
this.poll_ref.unref(this.ctx);
// If we're on the JS thread (where we have access to the VM), use the main thread version
// Otherwise just decrement the count
if (this.js_thread == std.Thread.getCurrentId()) {
this.unrefTaskMainThread();
} else {
_ = this.pending_activity_count.fetchSub(1, .monotonic);
}
}
@@ -614,14 +661,18 @@ pub const FSWatcher = struct {
if (js_this != .zero) {
if (FSWatcher.js.listenerGetCached(js_this)) |listener| {
_ = this.refTask();
// We're on main thread during close
const current = this.pending_activity_count.fetchAdd(1, .monotonic);
if (current == 0 and this.persistent) {
this.poll_ref.ref(this.ctx);
}
log("emit('close')", .{});
emitJS(listener, this.globalThis, .undefined, .close);
this.unrefTask();
this.unrefTaskMainThread();
}
}
this.unrefTask();
this.unrefTaskMainThread();
} else {
this.mutex.unlock();
}
@@ -698,9 +749,9 @@ pub const FSWatcher = struct {
.path_watcher = null,
.globalThis = args.global_this,
.js_this = .zero,
.encoding = args.encoding,
.closed = false,
.verbose = args.verbose,
.encoding = args.encoding,
.js_thread = std.Thread.getCurrentId(),
});
ctx.current_task.ctx = ctx;

View File

@@ -32,6 +32,11 @@ watch_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
/// nanoseconds
coalesce_interval: isize = 100_000,
/// Waker to signal the watcher thread
waker: bun.Async.Waker = undefined,
/// Whether the watcher is still running
running: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
pub const EventListIndex = c_int;
pub const Event = extern struct {
watch_descriptor: EventListIndex,
@@ -64,7 +69,7 @@ pub const Event = extern struct {
pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) {
bun.assert(this.loaded);
const old_count = this.watch_count.fetchAdd(1, .release);
defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
defer if (old_count == 0) this.waker.wake();
const watch_file_mask = IN.EXCL_UNLINK | IN.MOVE_SELF | IN.DELETE_SELF | IN.MOVED_TO | IN.MODIFY;
const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_file_mask);
log("inotify_add_watch({}) = {}", .{ this.fd, rc });
@@ -75,7 +80,7 @@ pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(Ev
pub fn watchDir(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) {
bun.assert(this.loaded);
const old_count = this.watch_count.fetchAdd(1, .release);
defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
defer if (old_count == 0) this.waker.wake();
const watch_dir_mask = IN.EXCL_UNLINK | IN.DELETE | IN.DELETE_SELF | IN.CREATE | IN.MOVE_SELF | IN.ONLYDIR | IN.MOVED_TO;
const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_dir_mask);
log("inotify_add_watch({}) = {}", .{ this.fd, rc });
@@ -100,6 +105,7 @@ pub fn init(this: *INotifyWatcher, _: []const u8) !void {
// TODO: convert to bun.sys.Error
this.fd = .fromNative(try std.posix.inotify_init1(IN.CLOEXEC));
this.eventlist_bytes = &(try bun.default_allocator.alignedAlloc(EventListBytes, @alignOf(Event), 1))[0];
this.waker = try bun.Async.Waker.init();
log("{} init", .{this.fd});
}
@@ -116,72 +122,128 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) {
// We still don't correctly handle MOVED_FROM && MOVED_TO it seems.
var i: u32 = 0;
const read_eventlist_bytes = if (this.read_ptr) |ptr| brk: {
Futex.waitForever(&this.watch_count, 0);
i = ptr.i;
break :brk this.eventlist_bytes[0..ptr.len];
} else outer: while (true) {
Futex.waitForever(&this.watch_count, 0);
// Check if we should stop
if (!this.running.load(.acquire)) return .{ .result = &.{} };
const rc = std.posix.system.read(
this.fd.cast(),
this.eventlist_bytes,
this.eventlist_bytes.len,
);
const errno = std.posix.errno(rc);
switch (errno) {
.SUCCESS => {
var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
// Check if watch_count is 0, if so wait for waker
const count = this.watch_count.load(.acquire);
if (count == 0) {
// Wait on just the waker fd since there are no watches
var fds = [_]std.posix.pollfd{.{
.fd = this.waker.getFd().cast(),
.events = std.posix.POLL.IN,
.revents = 0,
}};
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
const double_read_threshold = Event.largest_size * (max_count / 2);
if (read_eventlist_bytes.len < double_read_threshold) {
var fds = [_]std.posix.pollfd{.{
.fd = this.fd.cast(),
.events = std.posix.POLL.IN | std.posix.POLL.ERR,
.revents = 0,
}};
var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
if ((std.posix.ppoll(&fds, &timespec, null) catch 0) > 0) {
inner: while (true) {
const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
bun.assert(rest.len > 0);
const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len);
// Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc });
const e = std.posix.errno(new_rc);
switch (e) {
.SUCCESS => {
read_eventlist_bytes.len += @intCast(new_rc);
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :inner,
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(e)),
.syscall = .read,
} },
const poll_rc = std.posix.poll(&fds, -1) catch |err| {
return .{ .err = .{
.errno = @truncate(@intFromEnum(err)),
.syscall = .poll,
} };
};
if (poll_rc > 0 and (fds[0].revents & std.posix.POLL.IN) != 0) {
// Consume the waker
this.waker.wait();
}
// Check again if we should stop or if watches were added
if (!this.running.load(.acquire)) return .{ .result = &.{} };
continue :outer;
}
// Wait on both inotify fd and waker fd
var fds = [_]std.posix.pollfd{
.{
.fd = this.fd.cast(),
.events = std.posix.POLL.IN,
.revents = 0,
},
.{
.fd = this.waker.getFd().cast(),
.events = std.posix.POLL.IN,
.revents = 0,
},
};
const poll_rc = std.posix.poll(&fds, -1) catch |err| {
return .{ .err = .{
.errno = @truncate(@intFromEnum(err)),
.syscall = .poll,
} };
};
if (poll_rc > 0) {
// Check if waker was signaled
if ((fds[1].revents & std.posix.POLL.IN) != 0) {
// Consume the waker
this.waker.wait();
// Check if we should stop
if (!this.running.load(.acquire)) return .{ .result = &.{} };
}
// Check if inotify has events
if ((fds[0].revents & std.posix.POLL.IN) != 0) {
const rc = std.posix.system.read(
this.fd.cast(),
this.eventlist_bytes,
this.eventlist_bytes.len,
);
const errno = std.posix.errno(rc);
switch (errno) {
.SUCCESS => {
var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
const double_read_threshold = Event.largest_size * (max_count / 2);
if (read_eventlist_bytes.len < double_read_threshold) {
var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
if ((std.posix.ppoll(&fds[0..1], &timespec, null) catch 0) > 0) {
inner: while (true) {
const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
bun.assert(rest.len > 0);
const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len);
// Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc });
const e = std.posix.errno(new_rc);
switch (e) {
.SUCCESS => {
read_eventlist_bytes.len += @intCast(new_rc);
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :inner,
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(e)),
.syscall = .read,
} },
}
}
}
}
}
}
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :outer,
.INVAL => {
if (Environment.isDebug) {
bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len });
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :outer,
.INVAL => {
if (Environment.isDebug) {
bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len });
}
return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} };
},
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} },
}
return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} };
},
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} },
}
}
};
@@ -213,11 +275,17 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) {
}
}
// Clear read_ptr if we've processed all buffered events
this.read_ptr = null;
return .{ .result = this.eventlist_ptrs[0..count] };
}
pub fn stop(this: *INotifyWatcher) void {
log("{} stop", .{this.fd});
this.running.store(false, .release);
// Wake up any threads waiting in read()
this.waker.wake();
if (this.fd != bun.invalid_fd) {
this.fd.close();
this.fd = bun.invalid_fd;
@@ -236,6 +304,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
// TODO: is this thread safe?
var remaining_events = events.len;
var event_offset: usize = 0;
const eventlist_index = this.watchlist.items(.eventlist_index);
@@ -244,7 +313,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
var temp_name_list: [128]?[:0]u8 = undefined;
var temp_name_off: u8 = 0;
const slice = events[0..@min(128, remaining_events, this.watch_events.len)];
const slice = events[event_offset..][0..@min(128, remaining_events, this.watch_events.len)];
var watchevents = this.watch_events[0..slice.len];
var watch_event_id: u32 = 0;
for (slice) |event| {
@@ -297,6 +366,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
} else {
break;
}
event_offset += slice.len;
remaining_events -= slice.len;
}