Resolve watch directories outside main thread + async iterator and symlink fixes (#3846)

* linux working pending tests with FSEvents

* add more tests, fix async iterator

* remove unnecessary check

* fix macos symlink on directories

* remove indirection layer

* todos

* fixes and some permission test

* fix oopsie and make Prisma test more reliable

* rebase with main

* add comptime check for macOS

* oops

* oops2

* fix symlinks cascade on FSEvents

* use JSC.WorkPool

* use withResolver, createFIFO and fix close event on async iterator

* remove unused events

---------

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
Ciro Spaciari
2023-07-27 23:23:54 -03:00
committed by GitHub
parent 52f39d728f
commit 7fa71dd032
8 changed files with 525 additions and 199 deletions

View File

@@ -354,25 +354,27 @@ pub const FSEventsLoop = struct {
for (loop.watchers.slice()) |watcher| {
if (watcher) |handle| {
const handle_path = handle.path;
for (paths, 0..) |path_ptr, i| {
var flags = event_flags[i];
var path = path_ptr[0..bun.len(path_ptr)];
// Filter out paths that are outside handle's request
if (path.len < handle.path.len or !bun.strings.startsWith(path, handle.path)) {
if (path.len < handle_path.len or !bun.strings.startsWith(path, handle_path)) {
continue;
}
const is_file = (flags & kFSEventStreamEventFlagItemIsDir) == 0;
// Remove common prefix, unless the watched folder is "/"
if (!(handle.path.len == 1 and handle.path[0] == '/')) {
path = path[handle.path.len..];
if (!(handle_path.len == 1 and handle_path[0] == '/')) {
path = path[handle_path.len..];
// Ignore events with path equal to directory itself
if (path.len <= 1 and is_file) {
continue;
}
if (path.len == 0) {
// Since we're using fsevents to watch the file itself, path == handle.path, and we now need to get the basename of the file back
// Since we're using fsevents to watch the file itself, path == handle_path, and we now need to get the basename of the file back
while (path.len > 0) {
if (bun.strings.startsWithChar(path, '/')) {
path = path[1..];
@@ -403,7 +405,7 @@ pub const FSEventsLoop = struct {
}
}
handle.emit(path, is_file, is_rename);
handle.emit(path, is_file, if (is_rename) .rename else .change);
}
handle.flush();
}
@@ -554,7 +556,7 @@ pub const FSEventsLoop = struct {
this.signal_source = null;
this.sem.deinit();
this.mutex.deinit();
if (this.watcher_count > 0) {
while (this.watchers.popOrNull()) |watcher| {
if (watcher) |w| {
@@ -578,11 +580,18 @@ pub const FSEventsWatcher = struct {
recursive: bool,
ctx: ?*anyopaque,
const Callback = *const fn (ctx: ?*anyopaque, path: string, is_file: bool, is_rename: bool) void;
const UpdateEndCallback = *const fn (ctx: ?*anyopaque) void;
pub const EventType = enum {
rename,
change,
@"error",
};
pub const Callback = *const fn (ctx: ?*anyopaque, path: string, is_file: bool, event_type: EventType) void;
pub const UpdateEndCallback = *const fn (ctx: ?*anyopaque) void;
pub fn init(loop: *FSEventsLoop, path: string, recursive: bool, callback: Callback, updateEnd: UpdateEndCallback, ctx: ?*anyopaque) *FSEventsWatcher {
var this = bun.default_allocator.create(FSEventsWatcher) catch unreachable;
this.* = FSEventsWatcher{
.path = path,
.callback = callback,
@@ -596,8 +605,8 @@ pub const FSEventsWatcher = struct {
return this;
}
pub fn emit(this: *FSEventsWatcher, path: string, is_file: bool, is_rename: bool) void {
this.callback(this.ctx, path, is_file, is_rename);
pub fn emit(this: *FSEventsWatcher, path: string, is_file: bool, event_type: EventType) void {
this.callback(this.ctx, path, is_file, event_type);
}
pub fn flush(this: *FSEventsWatcher) void {

View File

@@ -6,7 +6,6 @@ const Path = @import("../../resolver/resolve_path.zig");
const Encoder = JSC.WebCore.Encoder;
const Mutex = @import("../../lock.zig").Lock;
const FSEvents = @import("./fs_events.zig");
const PathWatcher = @import("./path_watcher.zig");
const VirtualMachine = JSC.VirtualMachine;
@@ -21,15 +20,12 @@ const Environment = bun.Environment;
pub const FSWatcher = struct {
ctx: *VirtualMachine,
verbose: bool = false,
entry_path: ?string = null,
entry_dir: string = "",
// JSObject
mutex: Mutex,
signal: ?*JSC.AbortSignal,
persistent: bool,
default_watcher: ?*PathWatcher.PathWatcher,
fsevents_watcher: ?*FSEvents.FSEventsWatcher,
path_watcher: ?*PathWatcher.PathWatcher,
poll_ref: JSC.PollRef = .{},
globalThis: *JSC.JSGlobalObject,
js_this: JSC.JSValue,
@@ -53,10 +49,6 @@ pub const FSWatcher = struct {
pub fn deinit(this: *FSWatcher) void {
// stop all managers and signals
this.detach();
if (this.entry_path) |path| {
this.entry_path = null;
bun.default_allocator.free(path);
}
bun.default_allocator.destroy(this);
}
@@ -154,33 +146,7 @@ pub const FSWatcher = struct {
}
};
pub fn onFSEventUpdate(
ctx: ?*anyopaque,
path: string,
is_file: bool,
is_rename: bool,
) void {
// only called by FSEventUpdate
const this = bun.cast(*FSWatcher, ctx.?);
const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable;
const event_type: FSWatchTask.EventType = if (is_rename) .rename else .change;
if (this.verbose) {
if (is_file) {
Output.prettyErrorln("<r> <d>File changed: {s}<r>", .{relative_path});
} else {
Output.prettyErrorln("<r> <d>Dir changed: {s}<r>", .{relative_path});
}
}
this.current_task.append(relative_path, event_type, true);
}
pub fn onPathUpdate(ctx: ?*anyopaque, path: string, is_file: bool, event_type: PathWatcher.PathWatcher.EventType) void {
// only called by PathWatcher
const this = bun.cast(*FSWatcher, ctx.?);
const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable;
@@ -212,7 +178,6 @@ pub const FSWatcher = struct {
Output.flush();
}
// we only enqueue after all events are processed
// this is called by FSEventsWatcher or PathWatcher
this.current_task.enqueue();
}
@@ -558,14 +523,9 @@ pub const FSWatcher = struct {
signal.detach(this);
}
if (this.default_watcher) |default_watcher| {
this.default_watcher = null;
default_watcher.deinit();
}
if (this.fsevents_watcher) |fsevents_watcher| {
this.fsevents_watcher = null;
fsevents_watcher.deinit();
if (this.path_watcher) |path_watcher| {
this.path_watcher = null;
path_watcher.deinit();
}
if (this.persistent) {
@@ -584,37 +544,6 @@ pub const FSWatcher = struct {
this.deinit();
}
const PathResult = struct {
fd: StoredFileDescriptorType = 0,
is_file: bool = true,
};
// TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages
fn fdFromAbsolutePathZ(
absolute_path_z: [:0]const u8,
) !PathResult {
if (std.fs.openIterableDirAbsoluteZ(absolute_path_z, .{
.access_sub_paths = true,
})) |iterable_dir| {
return PathResult{
.fd = iterable_dir.dir.fd,
.is_file = false,
};
} else |err| {
if (err == error.NotDir) {
var file = try std.fs.openFileAbsoluteZ(absolute_path_z, .{ .mode = .read_only });
return PathResult{
.fd = file.handle,
.is_file = true,
};
} else {
return err;
}
}
unreachable;
}
pub fn init(args: Arguments) !*FSWatcher {
var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
var slice = args.path.slice();
@@ -635,8 +564,6 @@ pub const FSWatcher = struct {
buf[file_path.len] = 0;
var file_path_z = buf[0..file_path.len :0];
var fs_type = try fdFromAbsolutePathZ(file_path_z);
var ctx = try bun.default_allocator.create(FSWatcher);
const vm = args.global_this.bunVM();
ctx.* = .{
@@ -648,8 +575,7 @@ pub const FSWatcher = struct {
.mutex = Mutex.init(),
.signal = if (args.signal) |s| s.ref() else null,
.persistent = args.persistent,
.default_watcher = null,
.fsevents_watcher = null,
.path_watcher = null,
.globalThis = args.global_this,
.js_this = .zero,
.encoding = args.encoding,
@@ -661,25 +587,7 @@ pub const FSWatcher = struct {
errdefer ctx.deinit();
if (comptime Environment.isMac) {
if (!fs_type.is_file) {
var dir_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable;
ctx.entry_path = dir_path_clone;
ctx.entry_dir = dir_path_clone;
ctx.fsevents_watcher = try FSEvents.watch(dir_path_clone, args.recursive, onFSEventUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
ctx.initJS(args.listener);
return ctx;
}
}
var file_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable;
ctx.entry_path = file_path_clone;
ctx.entry_dir = std.fs.path.dirname(file_path_clone) orelse file_path_clone;
ctx.default_watcher = try PathWatcher.watch(vm, file_path_clone, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
ctx.path_watcher = try PathWatcher.watch(vm, file_path_z, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
ctx.initJS(args.listener);
return ctx;
}

View File

@@ -4,6 +4,7 @@ const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue;
const Path = @import("../../resolver/resolve_path.zig");
const Fs = @import("../../fs.zig");
const Mutex = @import("../../lock.zig").Lock;
const FSEvents = @import("./fs_events.zig");
const bun = @import("root").bun;
const Output = bun.Output;
@@ -30,9 +31,11 @@ pub const PathWatcherManager = struct {
watcher_count: u32 = 0,
vm: *JSC.VirtualMachine,
file_paths: bun.StringHashMap(PathInfo),
current_fd_task: bun.FDHashMap(*DirectoryRegisterTask),
deinit_on_last_watcher: bool = false,
pending_tasks: u32 = 0,
deinit_on_last_task: bool = false,
mutex: Mutex,
const PathInfo = struct {
fd: StoredFileDescriptorType = 0,
is_file: bool = true,
@@ -42,10 +45,14 @@ pub const PathWatcherManager = struct {
hash: Watcher.HashType,
};
// TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages
fn _fdFromAbsolutePathZ(
this: *PathWatcherManager,
path: [:0]const u8,
) !PathInfo {
this.mutex.lock();
defer this.mutex.unlock();
if (this.file_paths.getEntry(path)) |entry| {
var info = entry.value_ptr;
info.refs += 1;
@@ -54,42 +61,39 @@ pub const PathWatcherManager = struct {
const cloned_path = try bun.default_allocator.dupeZ(u8, path);
errdefer bun.default_allocator.destroy(cloned_path);
var stat = try bun.C.lstat_absolute(cloned_path);
var result = PathInfo{
.path = cloned_path,
.dirname = cloned_path,
.hash = Watcher.getHash(cloned_path),
.refs = 1,
};
switch (stat.kind) {
.sym_link => {
if (std.fs.openIterableDirAbsoluteZ(cloned_path, .{
.access_sub_paths = true,
})) |iterable_dir| {
const result = PathInfo{
.fd = iterable_dir.dir.fd,
.is_file = false,
.path = cloned_path,
.dirname = cloned_path,
.hash = Watcher.getHash(cloned_path),
.refs = 1,
};
_ = try this.file_paths.put(cloned_path, result);
return result;
} else |err| {
if (err == error.NotDir) {
var file = try std.fs.openFileAbsoluteZ(cloned_path, .{ .mode = .read_only });
result.fd = file.handle;
const _stat = try file.stat();
result.is_file = _stat.kind != .directory;
if (result.is_file) {
result.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path;
}
},
.directory => {
const dir = (try std.fs.openIterableDirAbsoluteZ(cloned_path, .{
.access_sub_paths = true,
})).dir;
result.fd = dir.fd;
result.is_file = false;
},
else => {
const file = try std.fs.openFileAbsoluteZ(cloned_path, .{ .mode = .read_only });
result.fd = file.handle;
result.is_file = true;
result.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path;
},
const result = PathInfo{
.fd = file.handle,
.is_file = true,
.path = cloned_path,
// if is really a file we need to get the dirname
.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path,
.hash = Watcher.getHash(cloned_path),
.refs = 1,
};
_ = try this.file_paths.put(cloned_path, result);
return result;
} else {
return err;
}
}
_ = try this.file_paths.put(cloned_path, result);
return result;
unreachable;
}
pub fn init(vm: *JSC.VirtualMachine) !*PathWatcherManager {
@@ -102,6 +106,7 @@ pub const PathWatcherManager = struct {
errdefer watchers.deinitWithAllocator(bun.default_allocator);
var manager = PathWatcherManager{
.file_paths = bun.StringHashMap(PathInfo).init(bun.default_allocator),
.current_fd_task = bun.FDHashMap(*DirectoryRegisterTask).init(bun.default_allocator),
.watchers = watchers,
.main_watcher = try Watcher.init(
this,
@@ -142,6 +147,7 @@ pub const PathWatcherManager = struct {
const watchers = this.watchers.slice();
for (events) |event| {
if (event.index >= file_paths.len) continue;
const file_path = file_paths[event.index];
const update_count = counts[event.index] + 1;
counts[event.index] = update_count;
@@ -168,6 +174,9 @@ pub const PathWatcherManager = struct {
for (watchers) |w| {
if (w) |watcher| {
if (comptime Environment.isMac) {
if (watcher.fsevents_watcher != null) continue;
}
const entry_point = watcher.path.dirname;
var path = file_path;
@@ -238,6 +247,9 @@ pub const PathWatcherManager = struct {
const event_type: PathWatcher.EventType = .rename; // renaming folders, creating folder or files will be always be rename
for (watchers) |w| {
if (w) |watcher| {
if (comptime Environment.isMac) {
if (watcher.fsevents_watcher != null) continue;
}
const entry_point = watcher.path.dirname;
var path = path_slice;
@@ -297,6 +309,7 @@ pub const PathWatcherManager = struct {
// stop all watchers
for (watchers) |w| {
if (w) |watcher| {
log("[watch] error: {s}", .{@errorName(err)});
watcher.emit(@errorName(err), 0, timestamp, false, .@"error");
watcher.flush();
}
@@ -312,47 +325,176 @@ pub const PathWatcherManager = struct {
this.deinit();
}
fn addDirectory(this: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo, buf: *[bun.MAX_PATH_BYTES + 1]u8) !void {
const fd = path.fd;
try this.main_watcher.addDirectory(fd, path.path, path.hash, false);
pub const DirectoryRegisterTask = struct {
manager: *PathWatcherManager,
path: PathInfo,
task: JSC.WorkPoolTask = .{ .callback = callback },
watcher_list: bun.BabyList(*PathWatcher) = .{},
var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{
.fd = fd,
} }).iterate();
pub fn callback(task: *JSC.WorkPoolTask) void {
var routine = @fieldParentPtr(@This(), "task", task);
defer routine.deinit();
routine.run();
}
while (try iter.next()) |entry| {
var parts = [2]string{ path.path, entry.name };
var entry_path = Path.joinAbsStringBuf(
Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
buf,
&parts,
.auto,
);
fn schedule(manager: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void {
manager.mutex.lock();
defer manager.mutex.unlock();
// keep the path alive
manager._incrementPathRefNoLock(path.path);
errdefer manager._decrementPathRef(path.path);
buf[entry_path.len] = 0;
var entry_path_z = buf[0..entry_path.len :0];
// use the same thread for the same fd to avoid race conditions
if (manager.current_fd_task.getEntry(path.fd)) |entry| {
var routine = entry.value_ptr.*;
watcher.mutex.lock();
defer watcher.mutex.unlock();
watcher.pending_directories += 1;
routine.watcher_list.push(bun.default_allocator, watcher) catch |err| {
watcher.pending_directories -= 1;
return err;
};
return;
}
var routine = try bun.default_allocator.create(DirectoryRegisterTask);
routine.* = DirectoryRegisterTask{
.manager = manager,
.path = path,
.watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| {
bun.default_allocator.destroy(routine);
return err;
},
};
errdefer routine.deinit();
try routine.watcher_list.push(bun.default_allocator, watcher);
watcher.mutex.lock();
defer watcher.mutex.unlock();
watcher.pending_directories += 1;
var child_path = try this._fdFromAbsolutePathZ(entry_path_z);
errdefer this._decrementPathRef(entry_path_z);
try watcher.file_paths.push(bun.default_allocator, child_path.path);
manager.current_fd_task.put(path.fd, routine) catch |err| {
watcher.pending_directories -= 1;
return err;
};
manager.pending_tasks += 1;
JSC.WorkPool.schedule(&routine.task);
return;
}
if (child_path.is_file) {
try this.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false);
} else {
if (watcher.recursive) {
try this.addDirectory(watcher, child_path, buf);
fn getNext(this: *DirectoryRegisterTask) ?*PathWatcher {
this.manager.mutex.lock();
defer this.manager.mutex.unlock();
const watcher = this.watcher_list.popOrNull();
if (watcher == null) {
// no more work todo, release the fd and path
_ = this.manager.current_fd_task.remove(this.path.fd);
this.manager._decrementPathRefNoLock(this.path.path);
return null;
}
return watcher;
}
fn processWatcher(
this: *DirectoryRegisterTask,
watcher: *PathWatcher,
buf: *[bun.MAX_PATH_BYTES + 1]u8,
) !void {
const manager = this.manager;
const path = this.path;
const fd = path.fd;
var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{
.fd = fd,
} }).iterate();
defer {
watcher.mutex.lock();
watcher.pending_directories -= 1;
if (watcher.pending_directories == 0 and watcher.finalized) {
watcher.mutex.unlock();
watcher.deinit();
} else {
watcher.mutex.unlock();
}
}
// now we iterate over all files and directories
while (try iter.next()) |entry| {
var parts = [2]string{ path.path, entry.name };
var entry_path = Path.joinAbsStringBuf(
Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
buf,
&parts,
.auto,
);
buf[entry_path.len] = 0;
var entry_path_z = buf[0..entry_path.len :0];
var child_path = try manager._fdFromAbsolutePathZ(entry_path_z);
watcher.mutex.lock();
watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
watcher.mutex.unlock();
manager._decrementPathRef(entry_path_z);
return err;
};
watcher.mutex.unlock();
// we need to call this unlocked
if (child_path.is_file) {
try manager.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false);
} else {
if (watcher.recursive and !watcher.finalized) {
// this may trigger another thread with is desired when available to watch long trees
try manager._addDirectory(watcher, child_path);
}
}
}
}
fn run(this: *DirectoryRegisterTask) void {
var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
while (this.getNext()) |watcher| {
this.processWatcher(watcher, &buf) catch |err| {
log("[watch] error registering directory: {s}", .{@errorName(err)});
watcher.emit(@errorName(err), 0, std.time.milliTimestamp(), false, .@"error");
watcher.flush();
};
}
this.manager.mutex.lock();
this.manager.pending_tasks -= 1;
if (this.manager.deinit_on_last_task and this.manager.pending_tasks == 0) {
this.manager.mutex.unlock();
this.manager.deinit();
} else {
this.manager.mutex.unlock();
}
}
fn deinit(this: *DirectoryRegisterTask) void {
bun.default_allocator.destroy(this);
}
};
// this should only be called if thread pool is not null
fn _addDirectory(this: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void {
const fd = path.fd;
try this.main_watcher.addDirectory(fd, path.path, path.hash, false);
return try DirectoryRegisterTask.schedule(this, watcher, path);
}
fn registerWatcher(this: *PathWatcherManager, watcher: *PathWatcher) !void {
this.mutex.lock();
defer this.mutex.unlock();
if (this.watcher_count == this.watchers.len) {
this.watcher_count += 1;
this.watchers.push(bun.default_allocator, watcher) catch unreachable;
this.watchers.push(bun.default_allocator, watcher) catch |err| {
this.watcher_count -= 1;
this.mutex.unlock();
return err;
};
} else {
var watchers = this.watchers.slice();
for (watchers, 0..) |w, i| {
@@ -363,16 +505,32 @@ pub const PathWatcherManager = struct {
}
}
}
this.mutex.unlock();
const path = watcher.path;
if (path.is_file) {
try this.main_watcher.addFile(path.fd, path.path, path.hash, options.Loader.file, 0, null, false);
} else {
var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
try this.addDirectory(watcher, path, &buf);
if (comptime Environment.isMac) {
if (watcher.fsevents_watcher != null) {
return;
}
}
try this._addDirectory(watcher, path);
}
}
fn _decrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void {
fn _incrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void {
if (this.file_paths.getEntry(file_path)) |entry| {
var path = entry.value_ptr;
if (path.refs > 0) {
path.refs += 1;
}
}
}
fn _decrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void {
if (this.file_paths.getEntry(file_path)) |entry| {
var path = entry.value_ptr;
if (path.refs > 0) {
@@ -387,6 +545,12 @@ pub const PathWatcherManager = struct {
}
}
fn _decrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void {
this.mutex.lock();
defer this.mutex.unlock();
this._decrementPathRefNoLock(file_path);
}
fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
@@ -408,9 +572,18 @@ pub const PathWatcherManager = struct {
}
this.watcher_count -= 1;
while (watcher.file_paths.popOrNull()) |file_path| {
this._decrementPathRef(file_path);
this._decrementPathRefNoLock(watcher.path.path);
if (comptime Environment.isMac) {
if (watcher.fsevents_watcher != null) {
break;
}
}
watcher.mutex.lock();
while (watcher.file_paths.popOrNull()) |file_path| {
this._decrementPathRefNoLock(file_path);
}
watcher.mutex.unlock();
break;
}
}
@@ -432,6 +605,12 @@ pub const PathWatcherManager = struct {
return;
}
if (this.pending_tasks > 0) {
// deinit when all tasks are done
this.deinit_on_last_task = true;
return;
}
this.main_watcher.deinit(false);
if (this.watcher_count > 0) {
@@ -454,7 +633,8 @@ pub const PathWatcherManager = struct {
this.file_paths.deinit();
this.watchers.deinitWithAllocator(bun.default_allocator);
// this.mutex.deinit();
this.current_fd_task.deinit();
bun.default_allocator.destroy(this);
}
@@ -471,7 +651,13 @@ pub const PathWatcher = struct {
// all watched file paths (including subpaths) except by path it self
file_paths: bun.BabyList([:0]const u8) = .{},
last_change_event: ChangeEvent = .{},
// on MacOS we use this to watch for changes on directories and subdirectories
fsevents_watcher: ?*FSEvents.FSEventsWatcher,
mutex: Mutex,
pending_directories: u32 = 0,
finalized: bool = false,
// only used on macOS
resolved_path: ?string = null,
pub const ChangeEvent = struct {
hash: PathWatcherManager.Watcher.HashType = 0,
event_type: EventType = .change,
@@ -488,13 +674,58 @@ pub const PathWatcher = struct {
pub fn init(manager: *PathWatcherManager, path: PathWatcherManager.PathInfo, recursive: bool, callback: Callback, updateEndCallback: UpdateEndCallback, ctx: ?*anyopaque) !*PathWatcher {
var this = try bun.default_allocator.create(PathWatcher);
if (comptime Environment.isMac) {
if (!path.is_file) {
var buffer: [bun.MAX_PATH_BYTES]u8 = undefined;
const resolved_path_temp = std.os.getFdPath(path.fd, &buffer) catch |err| {
bun.default_allocator.destroy(this);
return err;
};
const resolved_path = bun.default_allocator.dupeZ(u8, resolved_path_temp) catch |err| {
bun.default_allocator.destroy(this);
return err;
};
this.resolved_path = resolved_path;
this.* = PathWatcher{
.path = path,
.callback = callback,
.fsevents_watcher = FSEvents.watch(
resolved_path,
recursive,
bun.cast(FSEvents.FSEventsWatcher.Callback, callback),
bun.cast(FSEvents.FSEventsWatcher.UpdateEndCallback, updateEndCallback),
bun.cast(*anyopaque, ctx),
) catch |err| {
bun.default_allocator.destroy(this);
return err;
},
.manager = manager,
.recursive = recursive,
.flushCallback = updateEndCallback,
.file_paths = .{},
.ctx = ctx,
.mutex = Mutex.init(),
};
errdefer this.deinit();
// TODO: unify better FSEvents with PathWatcherManager
try manager.registerWatcher(this);
return this;
}
}
this.* = PathWatcher{
.fsevents_watcher = null,
.path = path,
.callback = callback,
.manager = manager,
.recursive = recursive,
.flushCallback = updateEndCallback,
.ctx = ctx,
.mutex = Mutex.init(),
.file_paths = bun.BabyList([:0]const u8).initCapacity(bun.default_allocator, 1) catch |err| {
bun.default_allocator.destroy(this);
return err;
@@ -525,10 +756,36 @@ pub const PathWatcher = struct {
}
pub fn deinit(this: *PathWatcher) void {
if (this.manager) |manager| {
manager.unregisterWatcher(this);
this.mutex.lock();
this.finalized = true;
if (this.pending_directories > 0) {
// will be freed on last directory
this.mutex.unlock();
return;
}
this.mutex.unlock();
if (this.manager) |manager| {
if (comptime Environment.isMac) {
if (this.fsevents_watcher) |watcher| {
// first unregister on FSEvents
watcher.deinit();
manager.unregisterWatcher(this);
} else {
manager.unregisterWatcher(this);
this.file_paths.deinitWithAllocator(bun.default_allocator);
}
} else {
manager.unregisterWatcher(this);
this.file_paths.deinitWithAllocator(bun.default_allocator);
}
}
if (comptime Environment.isMac) {
if (this.resolved_path) |path| {
bun.default_allocator.free(path);
}
}
this.file_paths.deinitWithAllocator(bun.default_allocator);
bun.default_allocator.destroy(this);
}

View File

@@ -729,6 +729,34 @@ pub fn getenvZ(path_: [:0]const u8) ?[]const u8 {
return sliceTo(ptr, 0);
}
//TODO: add windows support
pub const FDHashMapContext = struct {
pub fn hash(_: @This(), fd: FileDescriptor) u64 {
return @as(u64, @intCast(fd));
}
pub fn eql(_: @This(), a: FileDescriptor, b: FileDescriptor) bool {
return a == b;
}
pub fn pre(input: FileDescriptor) Prehashed {
return Prehashed{
.value = @This().hash(.{}, input),
.input = input,
};
}
pub const Prehashed = struct {
value: u64,
input: FileDescriptor,
pub fn hash(this: @This(), fd: FileDescriptor) u64 {
if (fd == this.input) return this.value;
return @as(u64, @intCast(fd));
}
pub fn eql(_: @This(), a: FileDescriptor, b: FileDescriptor) bool {
return a == b;
}
};
};
// These wrappers exist to use our strings.eqlLong function
pub const StringArrayHashMapContext = struct {
pub fn hash(_: @This(), s: []const u8) u32 {
@@ -831,6 +859,10 @@ pub fn StringHashMapUnmanaged(comptime Type: type) type {
return std.HashMapUnmanaged([]const u8, Type, StringHashMapContext, std.hash_map.default_max_load_percentage);
}
pub fn FDHashMap(comptime Type: type) type {
return std.HashMap(StoredFileDescriptorType, Type, FDHashMapContext, std.hash_map.default_max_load_percentage);
}
const CopyFile = @import("./copy_file.zig");
pub const copyFileRange = CopyFile.copyFileRange;
pub const copyFile = CopyFile.copyFile;

View File

@@ -2,6 +2,7 @@
// Note: `constants` is injected into the top of this file
declare var constants: typeof import("node:fs/promises").constants;
const { createFIFO } = $lazy("primordials");
var fs = Bun.fs();
@@ -26,7 +27,7 @@ export function watch(
eventType: string;
filename: string | Buffer | undefined;
};
const events: Array<Event> = [];
if (filename instanceof URL) {
throw new TypeError("Watch URLs are not supported yet");
} else if (Buffer.isBuffer(filename)) {
@@ -38,32 +39,55 @@ export function watch(
if (typeof options === "string") {
options = { encoding: options };
}
fs.watch(filename, options || {}, (eventType: string, filename: string | Buffer | undefined) => {
events.push({ eventType, filename });
const queue = createFIFO();
const watcher = fs.watch(filename, options || {}, (eventType: string, filename: string | Buffer | undefined) => {
queue.push({ eventType, filename });
if (nextEventResolve) {
const resolve = nextEventResolve;
nextEventResolve = null;
resolve();
}
});
return {
async *[Symbol.asyncIterator]() {
[Symbol.asyncIterator]() {
let closed = false;
while (!closed) {
while (events.length) {
let event = events.shift() as Event;
if (event.eventType === "close") {
closed = true;
break;
return {
async next() {
while (!closed) {
let event: Event;
while ((event = queue.shift() as Event)) {
if (event.eventType === "close") {
closed = true;
return { value: undefined, done: true };
}
if (event.eventType === "error") {
closed = true;
throw event.filename;
}
return { value: event, done: false };
}
const { promise, resolve } = Promise.withResolvers();
nextEventResolve = resolve;
await promise;
}
if (event.eventType === "error") {
return { value: undefined, done: true };
},
return() {
if (!closed) {
watcher.close();
closed = true;
throw event.filename;
if (nextEventResolve) {
const resolve = nextEventResolve;
nextEventResolve = null;
resolve();
}
}
yield event;
}
await new Promise((resolve: Function) => (nextEventResolve = resolve));
}
return { value: undefined, done: true };
},
};
},
};
}

View File

@@ -1 +1 @@
var o=(S)=>{return import.meta.require(S)};function G(S,U={}){const A=[];if(S instanceof URL)throw new TypeError("Watch URLs are not supported yet");else if(Buffer.isBuffer(S))S=S.toString();else if(typeof S!=="string")throw new TypeError("Expected path to be a string or Buffer");let z=null;if(typeof U==="string")U={encoding:U};return C.watch(S,U||{},(q,g)=>{if(A.push({eventType:q,filename:g}),z){const B=z;z=null,B()}}),{async*[Symbol.asyncIterator](){let q=!1;while(!q){while(A.length){let g=A.shift();if(g.eventType==="close"){q=!0;break}if(g.eventType==="error")throw q=!0,g.filename;yield g}await new Promise((g)=>z=g)}}}}var C=Bun.fs(),D="::bunternal::",J={[D]:(S)=>{return async function(...U){return await 1,S.apply(C,U)}}}[D],H=J(C.accessSync),I=J(C.appendFileSync),K=J(C.closeSync),L=J(C.copyFileSync),M=J(C.existsSync),N=J(C.chownSync),O=J(C.chmodSync),P=J(C.fchmodSync),Q=J(C.fchownSync),V=J(C.fstatSync),X=J(C.fsyncSync),Y=J(C.ftruncateSync),Z=J(C.futimesSync),_=J(C.lchmodSync),$=J(C.lchownSync),T=J(C.linkSync),W=C.lstat.bind(C),j=J(C.mkdirSync),x=J(C.mkdtempSync),E=J(C.openSync),F=J(C.readSync),k=J(C.writeSync),R=C.readdir.bind(C),w=C.readFile.bind(C),h=J(C.writeFileSync),b=J(C.readlinkSync),u=J(C.realpathSync),d=J(C.renameSync),c=C.stat.bind(C),v=J(C.symlinkSync),a=J(C.truncateSync),l=J(C.unlinkSync),y=J(C.utimesSync),p=J(C.lutimesSync),m=J(C.rmSync),n=J(C.rmdirSync),t=(S,U,A)=>{return new Promise((z,q)=>{try{var g=C.writevSync(S,U,A)}catch(B){q(B);return}z({bytesWritten:g,buffers:U})})},r=(S,U,A)=>{return new Promise((z,q)=>{try{var 
g=C.readvSync(S,U,A)}catch(B){q(B);return}z({bytesRead:g,buffers:U})})},i={access:H,appendFile:I,close:K,copyFile:L,exists:M,chown:N,chmod:O,fchmod:P,fchown:Q,fstat:V,fsync:X,ftruncate:Y,futimes:Z,lchmod:_,lchown:$,link:T,lstat:W,mkdir:j,mkdtemp:x,open:E,read:F,write:k,readdir:R,readFile:w,writeFile:h,readlink:b,realpath:u,rename:d,stat:c,symlink:v,truncate:a,unlink:l,utimes:y,lutimes:p,rm:m,rmdir:n,watch:G,writev:t,readv:r,constants,[Symbol.for("CommonJS")]:0};export{t as writev,h as writeFile,k as write,G as watch,y as utimes,l as unlink,a as truncate,v as symlink,c as stat,n as rmdir,m as rm,d as rename,u as realpath,r as readv,b as readlink,R as readdir,w as readFile,F as read,E as open,x as mkdtemp,j as mkdir,p as lutimes,W as lstat,T as link,$ as lchown,_ as lchmod,Z as futimes,Y as ftruncate,X as fsync,V as fstat,Q as fchown,P as fchmod,M as exists,i as default,L as copyFile,K as close,N as chown,O as chmod,I as appendFile,H as access};
// Bundled `node:fs/promises` shim backed by Bun's native fs object.
// Bundler require shim (unused in this module but kept for parity with the bundle).
var requireNative = (specifier) => {
  return import.meta.require(specifier);
};

/**
 * fs.promises.watch(): returns an async iterable of { eventType, filename }
 * change events for `path`. Events queue in a native FIFO; `wakeup` resolves a
 * parked next() when a new event arrives. The iterator's return() closes the
 * underlying watcher, so breaking out of a for-await loop releases the handle.
 * Note: `filename` carries the error object when eventType === "error".
 */
function watch(path, options = {}) {
  if (path instanceof URL) {
    throw new TypeError("Watch URLs are not supported yet");
  } else if (Buffer.isBuffer(path)) {
    path = path.toString();
  } else if (typeof path !== "string") {
    throw new TypeError("Expected path to be a string or Buffer");
  }
  let wakeup = null;
  if (typeof options === "string") {
    options = { encoding: options };
  }
  const queue = createFIFO();
  const watcher = fs.watch(path, options || {}, (eventType, filename) => {
    queue.push({ eventType, filename });
    if (wakeup) {
      const resolve = wakeup;
      wakeup = null;
      resolve();
    }
  });
  return {
    [Symbol.asyncIterator]() {
      let closed = false;
      return {
        async next() {
          while (!closed) {
            let event;
            while ((event = queue.shift())) {
              if (event.eventType === "close") {
                closed = true;
                return { value: undefined, done: true };
              }
              if (event.eventType === "error") {
                closed = true;
                throw event.filename; // the error object travels in `filename`
              }
              return { value: event, done: false };
            }
            // Queue drained: park until the watch callback wakes us up.
            const { promise, resolve } = Promise.withResolvers();
            wakeup = resolve;
            await promise;
          }
          return { value: undefined, done: true };
        },
        return() {
          // Early exit from for-await: stop watching and release a parked next().
          if (!closed) {
            watcher.close();
            closed = true;
            if (wakeup) {
              const resolve = wakeup;
              wakeup = null;
              resolve();
            }
          }
          return { value: undefined, done: true };
        },
      };
    },
  };
}

// Native FIFO from Bun's primordials, the native fs binding, and a helper that
// turns a sync method into an async one. The computed "::bunternal::" key
// gives the wrapper function that name.
var { createFIFO } = globalThis[Symbol.for("Bun.lazy")]("primordials"),
  fs = Bun.fs(),
  kInternal = "::bunternal::",
  promisify = {
    [kInternal]: (fn) => {
      return async function (...args) {
        // `await 1` defers one microtask before running the sync operation.
        return await 1, fn.apply(fs, args);
      };
    },
  }[kInternal],
  access = promisify(fs.accessSync),
  appendFile = promisify(fs.appendFileSync),
  close = promisify(fs.closeSync),
  copyFile = promisify(fs.copyFileSync),
  exists = promisify(fs.existsSync),
  chown = promisify(fs.chownSync),
  chmod = promisify(fs.chmodSync),
  fchmod = promisify(fs.fchmodSync),
  fchown = promisify(fs.fchownSync),
  fstat = promisify(fs.fstatSync),
  fsync = promisify(fs.fsyncSync),
  ftruncate = promisify(fs.ftruncateSync),
  futimes = promisify(fs.futimesSync),
  lchmod = promisify(fs.lchmodSync),
  lchown = promisify(fs.lchownSync),
  link = promisify(fs.linkSync),
  lstat = fs.lstat.bind(fs), // already async natively; just bind the receiver
  mkdir = promisify(fs.mkdirSync),
  mkdtemp = promisify(fs.mkdtempSync),
  open = promisify(fs.openSync),
  read = promisify(fs.readSync),
  write = promisify(fs.writeSync),
  readdir = fs.readdir.bind(fs),
  readFile = fs.readFile.bind(fs),
  writeFile = promisify(fs.writeFileSync),
  readlink = promisify(fs.readlinkSync),
  realpath = promisify(fs.realpathSync),
  rename = promisify(fs.renameSync),
  stat = fs.stat.bind(fs),
  symlink = promisify(fs.symlinkSync),
  truncate = promisify(fs.truncateSync),
  unlink = promisify(fs.unlinkSync),
  utimes = promisify(fs.utimesSync),
  lutimes = promisify(fs.lutimesSync),
  rm = promisify(fs.rmSync),
  rmdir = promisify(fs.rmdirSync),
  // writev/readv resolve with the { bytesWritten/bytesRead, buffers } shape
  // Node callers expect, so they are hand-written rather than promisified.
  writev = (fd, buffers, position) => {
    return new Promise((resolve, reject) => {
      try {
        var bytesWritten = fs.writevSync(fd, buffers, position);
      } catch (err) {
        reject(err);
        return;
      }
      resolve({ bytesWritten, buffers });
    });
  },
  readv = (fd, buffers, position) => {
    return new Promise((resolve, reject) => {
      try {
        var bytesRead = fs.readvSync(fd, buffers, position);
      } catch (err) {
        reject(err);
        return;
      }
      resolve({ bytesRead, buffers });
    });
  },
  // Default export: the fs.promises namespace object.
  defaultObject = {
    access,
    appendFile,
    close,
    copyFile,
    exists,
    chown,
    chmod,
    fchmod,
    fchown,
    fstat,
    fsync,
    ftruncate,
    futimes,
    lchmod,
    lchown,
    link,
    lstat,
    mkdir,
    mkdtemp,
    open,
    read,
    write,
    readdir,
    readFile,
    writeFile,
    readlink,
    realpath,
    rename,
    stat,
    symlink,
    truncate,
    unlink,
    utimes,
    lutimes,
    rm,
    rmdir,
    watch,
    writev,
    readv,
    constants,
    [Symbol.for("CommonJS")]: 0,
  };
export {
  writev,
  writeFile,
  write,
  watch,
  utimes,
  unlink,
  truncate,
  symlink,
  stat,
  rmdir,
  rm,
  rename,
  realpath,
  readv,
  readlink,
  readdir,
  readFile,
  read,
  open,
  mkdtemp,
  mkdir,
  lutimes,
  lstat,
  link,
  lchown,
  lchmod,
  futimes,
  ftruncate,
  fsync,
  fstat,
  fchown,
  fchmod,
  exists,
  defaultObject as default,
  copyFile,
  close,
  chown,
  chmod,
  appendFile,
  access,
};

View File

@@ -374,9 +374,31 @@ describe("fs.watch", () => {
expect(promise).resolves.toBe("change");
});
test("immediately closing works correctly", async () => {
for (let i = 0; i < 100; i++) fs.watch(testDir, { persistent: true }).close();
for (let i = 0; i < 100; i++) fs.watch(testDir, { persistent: false }).close();
test("should throw if no permission to watch the directory", async () => {
  const filepath = path.join(testDir, "permission-dir");
  fs.mkdirSync(filepath, { recursive: true });
  // Write-only mode (0o200): fs.watch needs read access, so it must fail.
  await fs.promises.chmod(filepath, 0o200);
  let error: any = null;
  try {
    // Capture the error instead of asserting inside the try block — the
    // original asserted in the try, so a missing throw produced a confusing
    // secondary failure inside the catch.
    const watcher = fs.watch(filepath);
    watcher.close();
  } catch (err: any) {
    error = err;
  } finally {
    // Restore permissions so the temp directory can be cleaned up afterwards.
    await fs.promises.chmod(filepath, 0o700);
  }
  expect(error).not.toBeNull();
  expect(error.message.includes("AccessDenied")).toBeTrue();
});
test("should throw if no permission to watch the file", async () => {
  const filepath = path.join(testDir, "permission-file");
  fs.writeFileSync(filepath, "hello.txt");
  // Write-only mode (0o200): fs.watch needs read access, so it must fail.
  await fs.promises.chmod(filepath, 0o200);
  let error: any = null;
  try {
    // Capture the error instead of asserting inside the try block — the
    // original asserted in the try, so a missing throw produced a confusing
    // secondary failure inside the catch.
    const watcher = fs.watch(filepath);
    watcher.close();
  } catch (err: any) {
    error = err;
  } finally {
    // Restore permissions so the temp directory can be cleaned up afterwards.
    await fs.promises.chmod(filepath, 0o600);
  }
  expect(error).not.toBeNull();
  expect(error.message.includes("AccessDenied")).toBeTrue();
});
});
@@ -502,6 +524,64 @@ describe("fs.promises.watch", () => {
})();
});
test("should work with symlink -> symlink -> dir", async () => {
  const filepath = path.join(testDir, "sym-symlink-indirect");
  const dest = path.join(testDir, "sym-symlink-dest");
  const indirect_sym = path.join(testDir, "sym-symlink-to-symlink-dir");
  // Clean up leftovers from previous runs; a stale `indirect_sym` link would
  // make the second symlink() call below throw EEXIST.
  fs.rmSync(filepath, { recursive: true, force: true });
  fs.rmSync(dest, { recursive: true, force: true });
  fs.rmSync(indirect_sym, { recursive: true, force: true });
  fs.mkdirSync(dest, { recursive: true });
  await fs.promises.symlink(dest, filepath);
  await fs.promises.symlink(filepath, indirect_sym);
  const watcher = fs.promises.watch(indirect_sym);
  // Keep generating filesystem activity until the watcher reports an event.
  const interval = setInterval(() => {
    fs.writeFileSync(path.join(indirect_sym, "hello.txt"), "hello");
  }, 10);
  const promise = (async () => {
    try {
      for await (const event of watcher) {
        return event.eventType;
      }
    } catch {
      expect("unreachable").toBe(false); // watching through two symlinks must not error
    } finally {
      clearInterval(interval);
    }
  })();
  // `await` the matcher — without it the test could finish before the
  // assertion settles, silently passing on a wrong/never-resolving promise.
  await expect(promise).resolves.toBe("rename");
});
test("should work with symlink dir", async () => {
  const filepath = path.join(testDir, "sym-symlink-dir");
  const dest = path.join(testDir, "sym-symlink-dest");
  // Clean up leftovers from previous runs before creating the link.
  fs.rmSync(filepath, { recursive: true, force: true });
  fs.rmSync(dest, { recursive: true, force: true });
  fs.mkdirSync(dest, { recursive: true });
  await fs.promises.symlink(dest, filepath);
  const watcher = fs.promises.watch(filepath);
  // Keep generating filesystem activity until the watcher reports an event.
  const interval = setInterval(() => {
    fs.writeFileSync(path.join(filepath, "hello.txt"), "hello");
  }, 10);
  const promise = (async () => {
    try {
      for await (const event of watcher) {
        return event.eventType;
      }
    } catch {
      expect("unreachable").toBe(false); // watching through a symlink must not error
    } finally {
      clearInterval(interval);
    }
  })();
  // `await` the matcher — without it the test could finish before the
  // assertion settles, silently passing on a wrong/never-resolving promise.
  await expect(promise).resolves.toBe("rename");
});
test("should work with symlink", async () => {
const filepath = path.join(testDir, "sym-symlink.txt");
await fs.promises.symlink(path.join(testDir, "sym.txt"), filepath);
@@ -525,3 +605,19 @@ describe("fs.promises.watch", () => {
expect(promise).resolves.toBe("change");
});
});
describe("immediately closing", () => {
  // Regression tests: rapidly opening and closing watchers must not crash,
  // hang, or leak native handles.
  test("works correctly with files", async () => {
    const filepath = path.join(testDir, "close.txt");
    // Ensure the target exists — watching a missing file throws ENOENT and
    // would make this test fail for the wrong reason.
    fs.writeFileSync(filepath, "hello");
    for (let i = 0; i < 100; i++) fs.watch(filepath, { persistent: true }).close();
    for (let i = 0; i < 100; i++) fs.watch(filepath, { persistent: false }).close();
  });
  test("works correctly with directories", async () => {
    for (let i = 0; i < 100; i++) fs.watch(testDir, { persistent: true }).close();
    for (let i = 0; i < 100; i++) fs.watch(testDir, { persistent: false }).close();
  });
  test("works correctly with recursive directories", async () => {
    // Both loops watch recursively — the original's second loop passed
    // `recursive: false`, which contradicted the test name and left the
    // non-persistent + recursive combination untested.
    for (let i = 0; i < 100; i++) fs.watch(testDir, { persistent: true, recursive: true }).close();
    for (let i = 0; i < 100; i++) fs.watch(testDir, { persistent: false, recursive: true }).close();
  });
});

View File

@@ -3,7 +3,7 @@ import { generateClient } from "./helper.ts";
import type { PrismaClient } from "./prisma/types.d.ts";
function* TestIDGenerator(): Generator<number> {
  // Start at a random offset so parallel/repeated test runs don't collide on
  // the same IDs. (The diff-garbled original declared `let i` twice — a
  // duplicate `let` binding is a SyntaxError; the randomized initializer is
  // the one the change intended to keep.)
  let i = Math.floor(Math.random() * 10000);
  while (true) {
    yield i++;
  }