Compare commits

...

2 Commits

Author SHA1 Message Date
Jarred Sumner
c115087af5 Update watcher.zig 2024-07-05 01:20:56 -07:00
Jarred Sumner
768e1fe75c Clean up inotify read code 2024-07-05 01:20:15 -07:00
2 changed files with 68 additions and 78 deletions

View File

@@ -1877,6 +1877,21 @@ pub fn renameatConcurrentlyWithoutFallback(
return Maybe(void).success;
}
pub fn poll(fds: []posix.pollfd, timeout: ?*linux.timespec) Maybe(usize) {
if (comptime !Environment.isLinux) @compileError("ppoll() is not implemented on this platform");
while (true) {
const rc = linux.ppoll(fds.ptr, fds.len, timeout, null);
if (Maybe(usize).errnoSys(rc, .poll)) |err| {
if (err.getErrno() == .INTR) continue;
return err;
}
return Maybe(usize){ .result = rc };
}
unreachable;
}
pub fn renameat2(from_dir: bun.FileDescriptor, from: [:0]const u8, to_dir: bun.FileDescriptor, to: [:0]const u8, flags: RenameAt2Flags) Maybe(void) {
if (Environment.isWindows) {
return renameat(from_dir, from, to_dir, to);

View File

@@ -22,8 +22,16 @@ const INotify = struct {
loaded_inotify: bool = false,
inotify_fd: EventListIndex = 0,
eventlist: EventListBuffer = undefined,
eventlist_ptrs: [128]*const INotifyEvent = undefined,
// In the spirit of being flexible with inputs
//
// We make this pointer aligned, even though the actually set each event to
// have an alignment of 1 in the pointer array.
eventlist: EventListBuffer align(@alignOf(INotifyEvent)) = undefined,
eventlist_ptrs: [
// This buffer must always be >= the number of events we can get from the read buffer
@sizeOf(EventListBuffer) / @sizeOf(INotifyEvent)
]*align(1) const INotifyEvent = undefined,
watch_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
coalesce_interval: isize = 100_000,
@@ -113,89 +121,56 @@ const INotify = struct {
this.inotify_fd = try std.posix.inotify_init1(std.os.linux.IN.CLOEXEC);
}
pub fn read(this: *INotify) bun.JSC.Maybe([]*const INotifyEvent) {
pub fn read(this: *INotify) bun.JSC.Maybe([]*align(1) const INotifyEvent) {
bun.assert(this.loaded_inotify);
restart: while (true) {
Futex.wait(&this.watch_count, 0, null) catch |err| switch (err) {
error.TimedOut => unreachable, // timeout is infinite
};
// We *hope* that this inotify read() call will simply block until there
// are events to read.
switch (bun.sys.read(bun.toFD(this.inotify_fd), &this.eventlist)) {
.result => |len| {
if (len == 0) return .{ .result = &[_]*INotifyEvent{} };
const rc = std.posix.system.read(
this.inotify_fd,
@as([*]u8, @ptrCast(@alignCast(&this.eventlist))),
@sizeOf(EventListBuffer),
);
const errno = std.posix.errno(rc);
switch (errno) {
.SUCCESS => {
var len = @as(usize, @intCast(rc));
if (len == 0) return .{ .result = &[_]*INotifyEvent{} };
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
if (len < (@sizeOf(EventListBuffer) / 2)) {
var fds = [_]std.posix.pollfd{.{
.fd = this.inotify_fd,
.events = std.posix.POLL.IN | std.posix.POLL.ERR,
.revents = 0,
}};
var timespec = std.posix.timespec{ .tv_sec = 0, .tv_nsec = this.coalesce_interval };
if ((std.posix.ppoll(&fds, &timespec, null) catch 0) > 0) {
while (true) {
const new_rc = std.posix.system.read(
this.inotify_fd,
@as([*]u8, @ptrCast(@alignCast(&this.eventlist))) + len,
@sizeOf(EventListBuffer) - len,
);
const e = std.posix.errno(new_rc);
switch (e) {
.SUCCESS => {
len += @as(usize, @intCast(new_rc));
},
.AGAIN => continue,
.INTR => continue,
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(e)),
.syscall = .read,
} },
}
break;
}
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
if (len < (@sizeOf(EventListBuffer) / 2)) {
var fds = [_]std.os.linux.pollfd{.{
.fd = this.inotify_fd,
.events = std.os.linux.POLL.IN | std.os.linux.POLL.ERR,
.revents = 0,
}};
var timespec = std.os.linux.timespec{ .tv_sec = 0, .tv_nsec = this.coalesce_interval };
if ((bun.sys.ppoll(&fds, &timespec, null).unwrap() catch 0) > 0) {
switch (bun.sys.read(bun.toFD(this.inotify_fd), this.eventlist[len..])) {
.result => |new_rc| {
len += new_rc;
},
// Consume the pending events, ignore the error.
.err => {},
}
}
}
// This is what replit does as of Jaunary 2023.
// 1) CREATE .http.ts.3491171321~
// 2) OPEN .http.ts.3491171321~
// 3) ATTRIB .http.ts.3491171321~
// 4) MODIFY .http.ts.3491171321~
// 5) CLOSE_WRITE,CLOSE .http.ts.3491171321~
// 6) MOVED_FROM .http.ts.3491171321~
// 7) MOVED_TO http.ts
// We still don't correctly handle MOVED_FROM && MOVED_TO it seems.
// This is what replit does as of Jaunary 2023.
// 1) CREATE .http.ts.3491171321~
// 2) OPEN .http.ts.3491171321~
// 3) ATTRIB .http.ts.3491171321~
// 4) MODIFY .http.ts.3491171321~
// 5) CLOSE_WRITE,CLOSE .http.ts.3491171321~
// 6) MOVED_FROM .http.ts.3491171321~
// 7) MOVED_TO http.ts
// We still don't correctly handle MOVED_FROM && MOVED_TO it seems.
var count: usize = 0;
var i: usize = 0;
while (i < len) {
const event: *align(1) const INotifyEvent = @ptrCast(@alignCast(this.eventlist[i..][0..@sizeOf(INotifyEvent)]));
i += event.name_len + @sizeOf(INotifyEvent);
this.eventlist_ptrs[count] = event;
count += 1;
}
var count: u32 = 0;
var i: u32 = 0;
while (i < len) : (i += @sizeOf(INotifyEvent)) {
@setRuntimeSafety(false);
const event = @as(*INotifyEvent, @ptrCast(@alignCast(this.eventlist[i..][0..@sizeOf(INotifyEvent)])));
i += event.name_len;
this.eventlist_ptrs[count] = event;
count += 1;
}
return .{ .result = this.eventlist_ptrs[0..count] };
},
.AGAIN => continue :restart,
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} },
}
return .{ .result = this.eventlist_ptrs[0..count] };
},
else => |err| return .{ .err = err },
}
}