Files
bun.sh/src/watcher/INotifyWatcher.zig
taylor.fish 41b1efe12c Rename disabled parameter in Output.scoped (#21769)
It's very confusing.

(For internal tracking: fixes STAB-977)
2025-08-11 20:19:34 -07:00

385 lines
15 KiB
Zig

//! Bun's filesystem watcher implementation for linux using inotify
//! https://man7.org/linux/man-pages/man7/inotify.7.html
const INotifyWatcher = @This();
const log = Output.scoped(.watcher, .visible);
// inotify events are variable-sized, so a byte buffer is used (also needed
// since communication is done via the `read` syscall). what is notable about
// this is that while a max_count is defined, more events than max_count can be
// read if the paths are short. the buffer is sized not to the maximum possible,
// but an arbitrary but reasonable size. when reading, the strategy is to read
// as much as possible, then process the buffer in `max_count` chunks, since
// `bun.Watcher` has the same hardcoded `max_count`.
const eventlist_bytes_size = (Event.largest_size / 2) * max_count;
const EventListBytes = [eventlist_bytes_size]u8;
fd: bun.FileDescriptor = bun.invalid_fd,
loaded: bool = false,
// Avoid statically allocating because it increases the binary size.
eventlist_bytes: *EventListBytes align(@alignOf(Event)) = undefined,
/// pointers into the next chunk of events
eventlist_ptrs: [max_count]*align(1) Event = undefined,
/// if defined, it means `read` should continue from this offset before asking
/// for more bytes. this is only hit under high watching load.
/// see `test-fs-watch-recursive-linux-parallel-remove.js`
read_ptr: ?struct {
i: u32,
len: u32,
} = null,
watch_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
/// nanoseconds
coalesce_interval: isize = 100_000,
pub const EventListIndex = c_int;
pub const Event = extern struct {
watch_descriptor: EventListIndex,
mask: u32,
cookie: u32,
/// The name field is present only when an event is returned for a
/// file inside a watched directory; it identifies the filename
/// within the watched directory. This filename is null-terminated,
/// and may include further null bytes ('\0') to align subsequent
/// reads to a suitable address boundary.
///
/// The len field counts all of the bytes in name, including the null
/// bytes; the length of each inotify_event structure is thus
/// sizeof(struct inotify_event)+len.
name_len: u32,
const largest_size = std.mem.alignForward(usize, @sizeOf(Event) + bun.MAX_PATH_BYTES, @alignOf(Event));
pub fn name(event: *align(1) Event) [:0]u8 {
if (comptime Environment.allow_assert) bun.assertf(event.name_len > 0, "INotifyWatcher.Event.name() called with name_len == 0, you should check it before calling this function.", .{});
const name_first_char_ptr = std.mem.asBytes(&event.name_len).ptr + @sizeOf(u32);
return bun.sliceTo(@as([*:0]u8, @ptrCast(name_first_char_ptr)), 0);
}
pub fn size(event: *align(1) Event) u32 {
return @intCast(@sizeOf(Event) + event.name_len);
}
};
pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.sys.Maybe(EventListIndex) {
bun.assert(this.loaded);
const old_count = this.watch_count.fetchAdd(1, .release);
defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
const watch_file_mask = IN.EXCL_UNLINK | IN.MOVE_SELF | IN.DELETE_SELF | IN.MOVED_TO | IN.MODIFY;
const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_file_mask);
log("inotify_add_watch({}) = {}", .{ this.fd, rc });
return bun.sys.Maybe(EventListIndex).errnoSysP(rc, .watch, pathname) orelse
.{ .result = rc };
}
pub fn watchDir(this: *INotifyWatcher, pathname: [:0]const u8) bun.sys.Maybe(EventListIndex) {
bun.assert(this.loaded);
const old_count = this.watch_count.fetchAdd(1, .release);
defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
const watch_dir_mask = IN.EXCL_UNLINK | IN.DELETE | IN.DELETE_SELF | IN.CREATE | IN.MOVE_SELF | IN.ONLYDIR | IN.MOVED_TO;
const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_dir_mask);
log("inotify_add_watch({}) = {}", .{ this.fd, rc });
return bun.sys.Maybe(EventListIndex).errnoSysP(rc, .watch, pathname) orelse
.{ .result = rc };
}
pub fn unwatch(this: *INotifyWatcher, wd: EventListIndex) void {
bun.assert(this.loaded);
_ = this.watch_count.fetchSub(1, .release);
_ = system.inotify_rm_watch(this.fd, wd);
}
pub fn init(this: *INotifyWatcher, _: []const u8) !void {
bun.assert(!this.loaded);
this.loaded = true;
if (bun.getenvZ("BUN_INOTIFY_COALESCE_INTERVAL")) |env| {
this.coalesce_interval = std.fmt.parseInt(isize, env, 10) catch 100_000;
}
// TODO: convert to bun.sys.Error
this.fd = .fromNative(try std.posix.inotify_init1(IN.CLOEXEC));
this.eventlist_bytes = &(try bun.default_allocator.alignedAlloc(EventListBytes, @alignOf(Event), 1))[0];
log("{} init", .{this.fd});
}
pub fn read(this: *INotifyWatcher) bun.sys.Maybe([]const *align(1) Event) {
bun.assert(this.loaded);
// This is what replit does as of Jaunary 2023.
// 1) CREATE .http.ts.3491171321~
// 2) OPEN .http.ts.3491171321~
// 3) ATTRIB .http.ts.3491171321~
// 4) MODIFY .http.ts.3491171321~
// 5) CLOSE_WRITE,CLOSE .http.ts.3491171321~
// 6) MOVED_FROM .http.ts.3491171321~
// 7) MOVED_TO http.ts
// We still don't correctly handle MOVED_FROM && MOVED_TO it seems.
var i: u32 = 0;
const read_eventlist_bytes = if (this.read_ptr) |ptr| brk: {
Futex.waitForever(&this.watch_count, 0);
i = ptr.i;
break :brk this.eventlist_bytes[0..ptr.len];
} else outer: while (true) {
Futex.waitForever(&this.watch_count, 0);
const rc = std.posix.system.read(
this.fd.cast(),
this.eventlist_bytes,
this.eventlist_bytes.len,
);
const errno = std.posix.errno(rc);
switch (errno) {
.SUCCESS => {
var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
const double_read_threshold = Event.largest_size * (max_count / 2);
if (read_eventlist_bytes.len < double_read_threshold) {
var fds = [_]std.posix.pollfd{.{
.fd = this.fd.cast(),
.events = std.posix.POLL.IN | std.posix.POLL.ERR,
.revents = 0,
}};
var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
if ((std.posix.ppoll(&fds, &timespec, null) catch 0) > 0) {
inner: while (true) {
const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
bun.assert(rest.len > 0);
const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len);
// Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc });
const e = std.posix.errno(new_rc);
switch (e) {
.SUCCESS => {
read_eventlist_bytes.len += @intCast(new_rc);
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :inner,
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(e)),
.syscall = .read,
} },
}
}
}
}
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :outer,
.INVAL => {
if (Environment.isDebug) {
bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len });
}
return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} };
},
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} },
}
};
var count: u32 = 0;
while (i < read_eventlist_bytes.len) {
// It is NOT aligned naturally. It is align 1!!!
const event: *align(1) Event = @ptrCast(read_eventlist_bytes[i..][0..@sizeOf(Event)].ptr);
this.eventlist_ptrs[count] = event;
i += event.size();
count += 1;
if (Environment.enable_logs)
log("{} read event {} {} {} {}", .{
this.fd,
event.watch_descriptor,
event.cookie,
event.mask,
bun.fmt.quote(if (event.name_len > 0) event.name() else ""),
});
// when under high load with short file paths, it is very easy to
// overrun the watcher's event buffer.
if (count == max_count) {
this.read_ptr = .{
.i = i,
.len = @intCast(read_eventlist_bytes.len),
};
log("{} read buffer filled up", .{this.fd});
return .{ .result = &this.eventlist_ptrs };
}
}
return .{ .result = this.eventlist_ptrs[0..count] };
}
pub fn stop(this: *INotifyWatcher) void {
log("{} stop", .{this.fd});
if (this.fd != bun.invalid_fd) {
this.fd.close();
this.fd = bun.invalid_fd;
}
}
/// Repeatedly called by the main watcher until the watcher is terminated.
pub fn watchLoopCycle(this: *bun.Watcher) bun.sys.Maybe(void) {
defer Output.flush();
const events = switch (this.platform.read()) {
.result => |result| result,
.err => |err| return .{ .err = err },
};
if (events.len == 0) return .success;
const eventlist_index = this.watchlist.items(.eventlist_index);
var event_id: usize = 0;
var events_processed: usize = 0;
while (events_processed < events.len) {
var name_off: u8 = 0;
var temp_name_list: [128]?[:0]u8 = undefined;
var temp_name_off: u8 = 0;
// Process events one by one, batching when we hit limits
while (events_processed < events.len) {
const event = events[events_processed];
// Check if we're about to exceed the watch_events array capacity
if (event_id >= this.watch_events.len) {
// Process current batch of events
switch (processINotifyEventBatch(this, event_id, temp_name_list[0..temp_name_off])) {
.err => |err| return .{ .err = err },
.result => {},
}
// Reset event_id to start a new batch
event_id = 0;
name_off = 0;
temp_name_off = 0;
}
// Check if we can fit this event's name in temp_name_list
const will_have_name = event.name_len > 0;
if (will_have_name and temp_name_off >= temp_name_list.len) {
// Process current batch and start a new one
if (event_id > 0) {
switch (processINotifyEventBatch(this, event_id, temp_name_list[0..temp_name_off])) {
.err => |err| return .{ .err = err },
.result => {},
}
event_id = 0;
name_off = 0;
temp_name_off = 0;
}
}
this.watch_events[event_id] = watchEventFromInotifyEvent(
event,
@intCast(std.mem.indexOfScalar(
EventListIndex,
eventlist_index,
event.watch_descriptor,
) orelse {
events_processed += 1;
continue;
}),
);
// Safely handle event names with bounds checking
if (event.name_len > 0 and temp_name_off < temp_name_list.len) {
temp_name_list[temp_name_off] = event.name();
this.watch_events[event_id].name_off = temp_name_off;
this.watch_events[event_id].name_len = 1;
temp_name_off += 1;
} else {
this.watch_events[event_id].name_off = temp_name_off;
this.watch_events[event_id].name_len = 0;
}
event_id += 1;
events_processed += 1;
}
// Process any remaining events in the final batch
if (event_id > 0) {
switch (processINotifyEventBatch(this, event_id, temp_name_list[0..temp_name_off])) {
.err => |err| return .{ .err = err },
.result => {},
}
}
break;
}
return .success;
}
fn processINotifyEventBatch(this: *bun.Watcher, event_count: usize, temp_name_list: []?[:0]u8) bun.sys.Maybe(void) {
if (event_count == 0) {
return .success;
}
var name_off: u8 = 0;
var all_events = this.watch_events[0..event_count];
std.sort.pdq(WatchEvent, all_events, {}, WatchEvent.sortByIndex);
var last_event_index: usize = 0;
var last_event_id: EventListIndex = std.math.maxInt(EventListIndex);
for (all_events, 0..) |_, i| {
if (all_events[i].name_len > 0) {
// Check bounds before accessing arrays
if (name_off < this.changed_filepaths.len and all_events[i].name_off < temp_name_list.len) {
this.changed_filepaths[name_off] = temp_name_list[all_events[i].name_off];
all_events[i].name_off = name_off;
name_off += 1;
}
}
if (all_events[i].index == last_event_id) {
all_events[last_event_index].merge(all_events[i]);
continue;
}
last_event_index = i;
last_event_id = all_events[i].index;
}
if (all_events.len == 0) return .success;
this.mutex.lock();
defer this.mutex.unlock();
if (this.running) {
// all_events.len == 0 is checked above, so last_event_index + 1 is safe
this.onFileUpdate(this.ctx, all_events[0 .. last_event_index + 1], this.changed_filepaths[0..name_off], this.watchlist);
}
return .success;
}
pub fn watchEventFromInotifyEvent(event: *align(1) const INotifyWatcher.Event, index: WatchItemIndex) WatchEvent {
return .{
.op = .{
.delete = (event.mask & IN.DELETE_SELF) > 0 or (event.mask & IN.DELETE) > 0,
.rename = (event.mask & IN.MOVE_SELF) > 0,
.move_to = (event.mask & IN.MOVED_TO) > 0,
.write = (event.mask & IN.MODIFY) > 0,
},
.index = index,
};
}
const std = @import("std");
const system = std.posix.system;
const IN = std.os.linux.IN;
const bun = @import("bun");
const Environment = bun.Environment;
const Futex = bun.Futex;
const Output = bun.Output;
const WatchEvent = bun.Watcher.Event;
const WatchItemIndex = bun.Watcher.WatchItemIndex;
const max_count = bun.Watcher.max_count;