Compare commits

...

16 Commits

Author SHA1 Message Date
Jarred-Sumner
8be29136e8 bun run zig-format 2025-06-07 01:01:54 +00:00
Jarred Sumner
c776932a92 Delete filesystem-watcher-fixes.md 2025-06-06 17:59:28 -07:00
Jarred Sumner
5432c5814e more 2025-06-06 17:59:12 -07:00
Jarred Sumner
58b8201fbe apply patches 2025-06-06 16:45:12 -07:00
Jarred Sumner
e8a0136501 Cursor/fix next auth test timeouts and memory issues 43d5 (#20228)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
2025-06-06 04:30:43 -07:00
Jarred Sumner
ed8ac14385 Merge branch 'main' into jarred/fix-node-fs-test 2025-06-06 03:57:26 -07:00
Jarred Sumner
b253214e00 Update posix_event_loop.zig 2025-05-02 23:14:48 -07:00
Jarred Sumner
45b36de60c try this 2025-05-02 23:14:46 -07:00
Jarred Sumner
3eff966276 Merge branch 'main' into jarred/fix-node-fs-test 2025-05-02 22:22:58 -07:00
Jarred Sumner
dabc14b7e9 Merge branch 'main' into jarred/fix-node-fs-test 2025-04-28 02:57:33 -07:00
Jarred Sumner
e99a7d634f Update path_watcher.zig 2025-04-28 02:26:41 -07:00
Jarred Sumner
5e71496065 Update pool.zig 2025-04-25 02:33:02 -07:00
Jarred Sumner
35608a9ced Update path_watcher.zig 2025-04-25 02:22:06 -07:00
Jarred Sumner
c0d9acae3c try this 2025-04-25 01:30:45 -07:00
Jarred Sumner
3f43dbcc80 There's more 2025-04-25 00:22:21 -07:00
Jarred Sumner
c8126a2b47 Fix next-auth test failure 2025-04-25 00:05:06 -07:00
11 changed files with 461 additions and 259 deletions

View File

@@ -103,10 +103,15 @@ pub fn start(this: *Watcher) !void {
pub fn deinit(this: *Watcher, close_descriptors: bool) void {
if (this.watchloop_handle != null) {
this.mutex.lock();
defer this.mutex.unlock();
this.close_descriptors = close_descriptors;
this.running = false;
{
this.mutex.lock();
defer this.mutex.unlock();
this.close_descriptors = close_descriptors;
this.running = false;
}
// Wait for the watcher thread to finish (mutex is unlocked here)
this.thread.join();
} else {
if (close_descriptors and this.running) {
const fds = this.watchlist.items(.fd);

View File

@@ -22,7 +22,8 @@ pub const KeepAlive = struct {
/// Make calling ref() on this poll into a no-op.
pub fn disable(this: *KeepAlive) void {
this.unref(JSC.VirtualMachine.get());
if (this.status == .active)
this.unref(JSC.VirtualMachine.get());
this.status = .done;
}

View File

@@ -269,7 +269,7 @@ pub const Async = struct {
this.result = @field(NodeFS, "uv_" ++ @tagName(FunctionEnum))(&node_fs, this.args, @intFromEnum(req.result));
if (this.result == .err) {
this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
std.mem.doNotOptimizeAway(&node_fs);
}
@@ -283,7 +283,7 @@ pub const Async = struct {
this.result = @field(NodeFS, "uv_" ++ @tagName(FunctionEnum))(&node_fs, this.args, req, @intFromEnum(req.result));
if (this.result == .err) {
this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
std.mem.doNotOptimizeAway(&node_fs);
}
@@ -320,7 +320,7 @@ pub const Async = struct {
pub fn deinit(this: *Task) void {
if (this.result == .err) {
bun.default_allocator.free(this.result.err.path);
this.result.err.deinit();
}
this.ref.unref(this.globalObject.bunVM());
@@ -382,7 +382,7 @@ pub const Async = struct {
this.result = function(&node_fs, this.args, .@"async");
if (this.result == .err) {
this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
std.mem.doNotOptimizeAway(&node_fs);
}
@@ -428,7 +428,7 @@ pub const Async = struct {
pub fn deinit(this: *Task) void {
if (this.result == .err) {
bun.default_allocator.free(this.result.err.path);
this.result.err.deinit();
}
this.ref.unref(this.globalObject.bunVM());
@@ -467,7 +467,6 @@ pub fn NewAsyncCpTask(comptime is_shell: bool) type {
/// When each task is finished, decrement.
/// The maintask thread starts this at 1 and decrements it at the end, to avoid the promise being resolved while new tasks may be added.
subtask_count: std.atomic.Value(usize),
deinitialized: bool = false,
shelltask: ShellTaskT,
@@ -643,7 +642,7 @@ pub fn NewAsyncCpTask(comptime is_shell: bool) type {
this.result = result;
if (this.result == .err) {
this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
}
if (this.evtloop == .js) {
@@ -693,8 +692,9 @@ pub fn NewAsyncCpTask(comptime is_shell: bool) type {
}
pub fn deinit(this: *ThisAsyncCpTask) void {
bun.assert(!this.deinitialized);
this.deinitialized = true;
if (this.result == .err) {
this.result.err.deinit();
}
if (comptime !is_shell) this.ref.unref(this.evtloop);
this.args.deinit();
this.promise.deinit();
@@ -1248,7 +1248,7 @@ pub const AsyncReaddirRecursiveTask = struct {
pub fn deinit(this: *AsyncReaddirRecursiveTask) void {
bun.assert(this.root_fd == bun.invalid_fd); // should already have closed it
if (this.pending_err) |*err| {
bun.default_allocator.free(err.path);
err.deinit();
}
this.ref.unref(this.globalObject.bunVM());
@@ -5951,11 +5951,7 @@ pub const NodeFS = struct {
pub fn watch(_: *NodeFS, args: Arguments.Watch, _: Flavor) Maybe(Return.Watch) {
return switch (args.createFSWatcher()) {
.result => |result| .{ .result = result.js_this },
.err => |err| .{ .err = .{
.errno = err.errno,
.syscall = err.syscall,
.path = if (err.path.len > 0) args.path.slice() else "",
} },
.err => |err| .{ .err = err },
};
}

View File

@@ -41,7 +41,13 @@ pub const FSWatcher = struct {
/// While it's not closed, the pending activity
pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
current_task: FSWatchTask = undefined,
current_task: FSWatchTask = .empty,
// NOTE: this may equal `.current_task`. do not free it!
// if we create multiple on the same tick we use this buffer.
last_task: ?*FSWatchTask = null,
js_thread: std.Thread.Id,
pub fn eventLoop(this: FSWatcher) *EventLoop {
return this.ctx.eventLoop();
@@ -60,20 +66,32 @@ pub const FSWatcher = struct {
pub const finalize = deinit;
pub const FSWatchTask = if (Environment.isWindows) FSWatchTaskWindows else FSWatchTaskPosix;
pub const FSWatchTaskPosix = struct {
pub const new = bun.TrivialNew(@This());
const FSWatchTaskPosix = struct {
concurrent_task: JSC.EventLoopTask = .{ .mini = .{ .callback = undefined } },
ctx: *FSWatcher,
count: u8 = 0,
count: u8,
entries: [BATCH_SIZE]Entry = undefined,
// Track if we need to perform ref operation on main thread
needs_ref: bool = false,
entries: [8]Entry = undefined,
concurrent_task: JSC.ConcurrentTask = undefined,
pub const empty = FSWatchTask{
.concurrent_task = .{ .mini = .{ .callback = undefined } },
.ctx = undefined,
.count = 0,
.entries = undefined,
.needs_ref = false,
};
pub const Entry = struct {
const Entry = struct {
event: Event,
needs_free: bool,
};
const BATCH_SIZE = 10;
pub const new = bun.TrivialNew(@This());
pub fn deinit(this: *FSWatchTask) void {
this.cleanEntries();
if (comptime Environment.allow_assert) {
@@ -82,13 +100,15 @@ pub const FSWatcher = struct {
bun.destroy(this);
}
pub fn append(this: *FSWatchTask, event: Event, needs_free: bool) void {
if (this.count == 8) {
pub fn append(this: *FSWatchTaskPosix, event: Event, needs_free: bool) void {
if (this.count >= this.entries.len) {
this.enqueue();
const ctx = this.ctx;
this.* = .{
.ctx = ctx,
.count = 0,
.concurrent_task = undefined,
.needs_ref = false,
};
}
@@ -101,25 +121,35 @@ pub const FSWatcher = struct {
pub fn run(this: *FSWatchTask) void {
// this runs on JS Context Thread
const ctx = this.ctx;
// Perform ref operation on main thread if needed
if (this.needs_ref and ctx.persistent) {
ctx.poll_ref.ref(ctx.ctx);
}
for (this.entries[0..this.count]) |entry| {
switch (entry.event) {
inline .rename, .change => |file_path, t| {
this.ctx.emit(file_path, t);
if (comptime Environment.isWindows) {
unreachable; // Windows uses FSWatchTaskWindows
} else {
ctx.emit(file_path, t);
}
},
.@"error" => |err| {
this.ctx.emitError(err);
ctx.emitError(err);
},
.abort => {
this.ctx.emitIfAborted();
ctx.emitIfAborted();
},
.close => {
this.ctx.emit("", .close);
ctx.emit("", .close);
},
}
}
this.ctx.unrefTask();
ctx.unrefTaskMainThread();
}
pub fn appendAbort(this: *FSWatchTask) void {
@@ -131,17 +161,33 @@ pub const FSWatcher = struct {
if (this.count == 0)
return;
// if false is closed or detached (can still contain valid refs but will not create a new one)
if (this.ctx.refTask()) {
var that = FSWatchTask.new(this.*);
this.count = 0;
that.concurrent_task.task = JSC.Task.init(that);
this.ctx.enqueueTaskConcurrent(&that.concurrent_task);
// Check if we can proceed from watcher thread
const ctx = this.ctx;
ctx.mutex.lock();
defer ctx.mutex.unlock();
if (ctx.closed) {
this.cleanEntries();
return;
}
// closed or detached so just cleanEntries
this.cleanEntries();
// Track pending activity
const current = ctx.pending_activity_count.fetchAdd(1, .monotonic);
var that = FSWatchTask.new(this.*);
// Clear the count before enqueueing to avoid double processing
const saved_count = this.count;
this.count = 0;
that.count = saved_count;
// If we went from 0 to 1, we need to ref on the main thread
that.needs_ref = (current == 0);
that.concurrent_task = JSC.EventLoopTask.init(.js);
that.concurrent_task.js.task = JSC.Task.init(that);
ctx.enqueueTaskConcurrent(&that.concurrent_task.js);
}
pub fn cleanEntries(this: *FSWatchTask) void {
for (this.entries[0..this.count]) |*entry| {
if (entry.needs_free) {
@@ -167,6 +213,7 @@ pub const FSWatcher = struct {
pub fn dupe(event: Event) !Event {
return switch (event) {
inline .rename, .change => |path, t| @unionInit(Event, @tagName(t), try bun.default_allocator.dupe(u8, path)),
.@"error" => |err| @unionInit(Event, @tagName(.@"error"), try err.clone(bun.default_allocator)),
inline else => |value, t| @unionInit(Event, @tagName(t), value),
};
}
@@ -177,6 +224,7 @@ pub const FSWatcher = struct {
else => bun.default_allocator.free(path.*),
.windows => path.deinit(),
},
.@"error" => |*err| err.deinit(),
else => {},
}
}
@@ -206,6 +254,10 @@ pub const FSWatcher = struct {
/// Unused: To match the API of the posix version
count: u0 = 0,
pub const empty = FSWatchTaskWindows{
.ctx = undefined,
};
pub const StringOrBytesToDecode = union(enum) {
string: bun.String,
bytes_to_free: []const u8,
@@ -432,9 +484,9 @@ pub const FSWatcher = struct {
};
pub fn initJS(this: *FSWatcher, listener: JSC.JSValue) void {
// We're on main thread during init
if (this.persistent) {
this.poll_ref.ref(this.ctx);
_ = this.pending_activity_count.fetchAdd(1, .monotonic);
}
const js_this = this.toJS(this.globalThis);
@@ -446,9 +498,8 @@ pub const FSWatcher = struct {
// already aborted?
if (s.aborted()) {
// safely abort next tick
this.current_task = .{
.ctx = this,
};
this.current_task = .empty;
this.current_task.ctx = this;
this.current_task.appendAbort();
} else {
// watch for abortion
@@ -554,6 +605,7 @@ pub const FSWatcher = struct {
pub fn doRef(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
if (!this.closed and !this.persistent) {
this.persistent = true;
// We're on main thread here
this.poll_ref.ref(this.ctx);
}
return .undefined;
@@ -562,6 +614,7 @@ pub const FSWatcher = struct {
pub fn doUnref(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
if (this.persistent) {
this.persistent = false;
// We're on main thread here
this.poll_ref.unref(this.ctx);
}
return .undefined;
@@ -571,13 +624,31 @@ pub const FSWatcher = struct {
return JSC.JSValue.jsBoolean(this.persistent);
}
// this can be called from Watcher Thread or JS Context Thread
// Only call from main thread
pub fn refTaskMainThread(this: *FSWatcher) void {
if (this.persistent) {
this.poll_ref.ref(this.ctx);
}
}
// Only call from main thread
pub fn unrefTaskMainThread(this: *FSWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
const current = this.pending_activity_count.fetchSub(1, .monotonic);
// If we went from 1 to 0, we need to unref the poll
if (current == 1 and this.persistent) {
this.poll_ref.unref(this.ctx);
}
}
// Can be called from any thread - delegates to main thread version when on JS thread
pub fn refTask(this: *FSWatcher) bool {
this.mutex.lock();
defer this.mutex.unlock();
if (this.closed) return false;
_ = this.pending_activity_count.fetchAdd(1, .monotonic);
return true;
}
@@ -585,11 +656,15 @@ pub const FSWatcher = struct {
return this.pending_activity_count.load(.acquire) > 0;
}
// Can be called from any thread - delegates to main thread version when on JS thread
pub fn unrefTask(this: *FSWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
// JSC eventually will free it
_ = this.pending_activity_count.fetchSub(1, .monotonic);
// If we're on the JS thread (where we have access to the VM), use the main thread version
// Otherwise just decrement the count
if (this.js_thread == std.Thread.getCurrentId()) {
this.unrefTaskMainThread();
} else {
_ = this.pending_activity_count.fetchSub(1, .monotonic);
}
}
pub fn close(this: *FSWatcher) void {
@@ -602,14 +677,18 @@ pub const FSWatcher = struct {
if (js_this != .zero) {
if (FSWatcher.js.listenerGetCached(js_this)) |listener| {
_ = this.refTask();
// We're on main thread during close
const current = this.pending_activity_count.fetchAdd(1, .monotonic);
if (current == 0 and this.persistent) {
this.poll_ref.ref(this.ctx);
}
log("emit('close')", .{});
emitJS(listener, this.globalThis, .undefined, .close);
this.unrefTask();
this.unrefTaskMainThread();
}
}
this.unrefTask();
this.unrefTaskMainThread();
} else {
this.mutex.unlock();
}
@@ -624,7 +703,7 @@ pub const FSWatcher = struct {
if (this.persistent) {
this.persistent = false;
this.poll_ref.unref(this.ctx);
this.poll_ref.disable();
}
if (this.signal) |signal| {
@@ -641,30 +720,34 @@ pub const FSWatcher = struct {
}
pub fn init(args: Arguments) bun.JSC.Maybe(*FSWatcher) {
var buf: bun.PathBuffer = undefined;
var slice = args.path.slice();
if (bun.strings.startsWith(slice, "file://")) {
slice = slice[6..];
}
var joined_buf = bun.PathBufferPool.get();
defer bun.PathBufferPool.put(joined_buf);
const file_path = brk: {
var buf = bun.PathBufferPool.get();
defer bun.PathBufferPool.put(buf);
var parts = [_]string{
slice,
var slice = args.path.slice();
if (bun.strings.startsWith(slice, "file://")) {
slice = slice[6..];
}
var parts = [_]string{
slice,
};
const cwd = switch (bun.sys.getcwd(buf)) {
.result => |r| r,
.err => |err| return .{ .err = err },
};
buf[cwd.len] = std.fs.path.sep;
break :brk Path.joinAbsStringBuf(
buf[0 .. cwd.len + 1],
joined_buf,
&parts,
.auto,
);
};
const cwd = switch (bun.sys.getcwd(&buf)) {
.result => |r| r,
.err => |err| return .{ .err = err },
};
buf[cwd.len] = std.fs.path.sep;
var joined_buf: bun.PathBuffer = undefined;
const file_path = Path.joinAbsStringBuf(
buf[0 .. cwd.len + 1],
&joined_buf,
&parts,
.auto,
);
joined_buf[file_path.len] = 0;
const file_path_z = joined_buf[0..file_path.len :0];
@@ -672,19 +755,16 @@ pub const FSWatcher = struct {
const ctx = bun.new(FSWatcher, .{
.ctx = vm,
.current_task = .{
.ctx = undefined,
.count = 0,
},
.mutex = .{},
.signal = if (args.signal) |s| s.ref() else null,
.persistent = args.persistent,
.closed = false,
.path_watcher = null,
.globalThis = args.global_this,
.js_this = .zero,
.encoding = args.encoding,
.closed = false,
.verbose = args.verbose,
.encoding = args.encoding,
.js_thread = std.Thread.getCurrentId(),
});
ctx.current_task.ctx = ctx;
@@ -692,6 +772,8 @@ pub const FSWatcher = struct {
switch (PathWatcher.watch(vm, file_path_z, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx))) {
.result => |r| r,
.err => |err| {
var err2 = err;
defer err2.deinit();
ctx.deinit();
return .{ .err = .{
.errno = err.errno,

View File

@@ -59,11 +59,22 @@ pub const PathWatcherManager = struct {
}
fn unrefPendingTask(this: *PathWatcherManager) void {
this.mutex.lock();
defer this.mutex.unlock();
this.pending_tasks -= 1;
if (this.deinit_on_last_task and this.pending_tasks == 0) {
this.has_pending_tasks.store(false, .release);
const should_deinit = brk: {
this.mutex.lock();
defer this.mutex.unlock();
const pending_task_count = this.pending_tasks;
bun.debugAssert(pending_task_count > 0);
this.pending_tasks = pending_task_count - 1;
if (pending_task_count == 1) {
this.has_pending_tasks.store(false, .release);
break :brk this.deinit_on_last_task;
}
break :brk false;
};
if (should_deinit) {
this.deinit();
}
}
@@ -323,7 +334,7 @@ pub const PathWatcherManager = struct {
const watchers = this.watchers.slice();
const timestamp = std.time.milliTimestamp();
// stop all watchers
// stop all JS watchers by emitting the error
for (watchers) |w| {
if (w) |watcher| {
log("[watch] error: {}", .{err});
@@ -432,75 +443,75 @@ pub const PathWatcherManager = struct {
const manager = this.manager;
const path = this.path;
const fd = path.fd;
var iter = fd.stdDir().iterate();
var iter = bun.DirIterator.iterate(fd.stdDir(), .u8);
// now we iterate over all files and directories
while (iter.next() catch |err| {
return .{
.err = .{
.errno = @truncate(@intFromEnum(switch (err) {
error.AccessDenied => bun.sys.E.ACCES,
error.SystemResources => bun.sys.E.NOMEM,
error.Unexpected,
error.InvalidUtf8,
=> bun.sys.E.INVAL,
})),
.syscall = .watch,
while (true) {
const iteration = iter.next();
switch (iteration) {
.err => |err| {
var err2 = err;
err2.syscall = .watch;
return .{ .err = err2 };
},
};
}) |entry| {
var parts = [2]string{ path.path, entry.name };
const entry_path = Path.joinAbsStringBuf(
Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
buf,
&parts,
.auto,
);
.result => |entry_or_eof| if (entry_or_eof) |entry| {
var parts = [2]string{ path.path, entry.name.slice() };
const entry_path = Path.joinAbsStringBuf(
Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
buf,
&parts,
.auto,
);
buf[entry_path.len] = 0;
const entry_path_z = buf[0..entry_path.len :0];
buf[entry_path.len] = 0;
const entry_path_z = buf[0..entry_path.len :0];
const child_path = switch (manager._fdFromAbsolutePathZ(entry_path_z)) {
.result => |result| result,
.err => |e| return .{ .err = e },
};
{
watcher.mutex.lock();
defer watcher.mutex.unlock();
watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
manager._decrementPathRef(entry_path_z);
return switch (err) {
error.OutOfMemory => .{ .err = .{
.errno = @truncate(@intFromEnum(bun.sys.E.NOMEM)),
.syscall = .watch,
} },
const child_path = switch (manager._fdFromAbsolutePathZ(entry_path_z)) {
.result => |result| result,
.err => |e| return .{ .err = e },
};
};
}
// we need to call this unlocked
if (child_path.is_file) {
switch (manager.main_watcher.addFile(
child_path.fd,
child_path.path,
child_path.hash,
options.Loader.file,
.invalid,
null,
false,
)) {
.err => |err| return .{ .err = err },
.result => {},
}
} else {
if (watcher.recursive and !watcher.isClosed()) {
// this may trigger another thread with is desired when available to watch long trees
switch (manager._addDirectory(watcher, child_path)) {
.err => |err| return .{ .err = err },
.result => {},
{
watcher.mutex.lock();
defer watcher.mutex.unlock();
watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
manager._decrementPathRef(entry_path_z);
return switch (err) {
error.OutOfMemory => .{ .err = .{
.errno = @truncate(@intFromEnum(bun.sys.E.NOMEM)),
.syscall = .watch,
} },
};
};
}
}
// we need to call this unlocked
if (child_path.is_file) {
switch (manager.main_watcher.addFile(
child_path.fd,
child_path.path,
child_path.hash,
options.Loader.file,
.invalid,
null,
false,
)) {
.err => |err| return .{ .err = err.clone(bun.default_allocator) catch bun.outOfMemory() },
.result => {},
}
} else {
if (watcher.recursive and !watcher.isClosed()) {
// this may trigger another thread with is desired when available to watch long trees
switch (manager._addDirectory(watcher, child_path)) {
.err => |err| return .{ .err = err },
.result => {},
}
}
}
} else {
// EOF reached
break;
},
}
}
return .{ .result = {} };
@@ -511,13 +522,15 @@ pub const PathWatcherManager = struct {
return bun.todo(@src(), {});
}
var buf: bun.PathBuffer = undefined;
while (this.getNext()) |watcher| {
defer watcher.unrefPendingDirectory();
switch (this.processWatcher(watcher, &buf)) {
const buf = bun.PathBufferPool.get();
defer bun.PathBufferPool.put(buf);
switch (this.processWatcher(watcher, buf)) {
.err => |err| {
log("[watch] error registering directory: {s}", .{err});
var err2 = err.clone(bun.default_allocator) catch bun.outOfMemory();
defer err2.deinit();
watcher.emit(.{ .@"error" = err }, 0, std.time.milliTimestamp(), false);
watcher.flush();
},
@@ -624,43 +637,46 @@ pub const PathWatcherManager = struct {
// unregister is always called form main thread
fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
const should_deinit = brk: {
this.mutex.lock();
defer this.mutex.unlock();
var watchers = this.watchers.slice();
defer {
if (this.deinit_on_last_watcher and this.watcher_count == 0) {
this.deinit();
}
}
var watchers = this.watchers.slice();
for (watchers, 0..) |w, i| {
if (w) |item| {
if (item == watcher) {
watchers[i] = null;
// if is the last one just pop
if (i == watchers.len - 1) {
this.watchers.len -= 1;
}
this.watcher_count -= 1;
this._decrementPathRefNoLock(watcher.path.path);
if (comptime Environment.isMac) {
if (watcher.fsevents_watcher != null) {
break;
for (watchers, 0..) |w, i| {
if (w) |item| {
if (item == watcher) {
watchers[i] = null;
// if is the last one just pop
if (i == watchers.len - 1) {
this.watchers.len -= 1;
}
}
this.watcher_count -= 1;
{
watcher.mutex.lock();
defer watcher.mutex.unlock();
while (watcher.file_paths.pop()) |file_path| {
this._decrementPathRefNoLock(file_path);
this._decrementPathRefNoLock(watcher.path.path);
if (comptime Environment.isMac) {
if (watcher.fsevents_watcher != null) {
break;
}
}
{
watcher.mutex.lock();
defer watcher.mutex.unlock();
while (watcher.file_paths.pop()) |file_path| {
this._decrementPathRefNoLock(file_path);
}
}
break;
}
break;
}
}
break :brk this.deinit_on_last_watcher and this.watcher_count == 0;
};
if (should_deinit) {
this.deinit();
}
}
@@ -687,7 +703,8 @@ pub const PathWatcherManager = struct {
return;
}
this.main_watcher.deinit(false);
// Stop the watcher thread and wait for it to finish
this.main_watcher.deinit(true);
if (this.watcher_count > 0) {
while (this.watchers.pop()) |watcher| {
@@ -760,8 +777,9 @@ pub const PathWatcher = struct {
if (comptime Environment.isMac) {
if (!path.is_file) {
var buffer: bun.PathBuffer = undefined;
const resolved_path_temp = std.os.getFdPath(path.fd.cast(), &buffer) catch |err| {
const buffer = bun.PathBufferPool.get();
defer bun.PathBufferPool.put(buffer);
const resolved_path_temp = bun.sys.getFdPath(path.fd, buffer).unwrap() catch |err| {
bun.default_allocator.destroy(this);
return err;
};
@@ -845,11 +863,17 @@ pub const PathWatcher = struct {
}
pub fn unrefPendingDirectory(this: *PathWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
this.pending_directories -= 1;
if (this.isClosed() and this.pending_directories == 0) {
this.has_pending_directories.store(false, .release);
const should_deinit = brk: {
this.mutex.lock();
defer this.mutex.unlock();
this.pending_directories -= 1;
if (this.pending_directories == 0) {
this.has_pending_directories.store(false, .release);
}
break :brk this.pending_directories == 0 and this.closed.load(.acquire);
};
if (should_deinit) {
this.deinit();
}
}
@@ -963,7 +987,7 @@ pub fn watch(
.err => |_err| {
var err = _err;
err.syscall = .watch;
return .{ .err = err };
return .{ .err = err.clone(bun.default_allocator) catch bun.outOfMemory() };
},
};

View File

@@ -998,6 +998,7 @@ pub const CopyFileWindows = struct {
const globalThis = this.event_loop.global;
const promise = this.promise.swap();
const err_instance = err.toJSC(globalThis);
var event_loop = this.event_loop;
event_loop.enter();
defer event_loop.exit();
@@ -1109,9 +1110,10 @@ pub const CopyFileWindows = struct {
fn onMkdirpComplete(this: *CopyFileWindows) void {
this.event_loop.unrefConcurrently();
if (this.err) |err| {
if (bun.take(&this.err)) |err| {
this.throw(err);
bun.default_allocator.free(err.path);
var err2 = err;
err2.deinit();
return;
}

View File

@@ -25,6 +25,8 @@ pub const nocancel = struct {
pub extern "c" fn @"write$NOCANCEL"(fd: c.fd_t, buf: [*]const u8, nbyte: usize) isize;
pub extern "c" fn @"writev$NOCANCEL"(fd: c.fd_t, buf: [*]const std.posix.iovec_const, count: i32) isize;
pub extern "c" fn @"pwritev$NOCANCEL"(fd: c.fd_t, buf: [*]const std.posix.iovec_const, count: i32, offset: c.off_t) isize;
pub extern "c" fn @"poll$NOCANCEL"(fds: [*]std.posix.pollfd, nfds: c_int, timeout: c_int) isize;
pub extern "c" fn @"ppoll$NOCANCEL"(fds: [*]std.posix.pollfd, nfds: c_int, timeout: ?*const std.posix.timespec, sigmask: ?*const std.posix.sigset_t) isize;
};
pub const OSLog = opaque {

View File

@@ -352,12 +352,12 @@ pub fn batchedMoveTaskDone(this: *Mv, task: *ShellMvBatchedTask) void {
var exec = &this.state.executing;
if (task.err) |err| {
if (task.err) |*err| {
exec.error_signal.store(true, .seq_cst);
if (exec.err == null) {
exec.err = err;
exec.err = err.*;
} else {
bun.default_allocator.free(err.path);
err.deinit();
}
}

View File

@@ -581,7 +581,8 @@ pub const ShellRmTask = struct {
this.task_manager.err = err;
this.task_manager.error_signal.store(true, .seq_cst);
} else {
bun.default_allocator.free(err.path);
var err2 = err;
err2.deinit();
}
},
.result => {},
@@ -596,7 +597,7 @@ pub const ShellRmTask = struct {
this.task_manager.err = err;
this.task_manager.error_signal.store(true, .seq_cst);
} else {
bun.default_allocator.free(err.path);
this.task_manager.err.?.deinit();
}
}

View File

@@ -271,6 +271,7 @@ pub const Tag = enum(u8) {
futime,
pidfd_open,
poll,
ppoll,
watch,
scandir,
@@ -423,6 +424,18 @@ pub const Error = struct {
};
}
/// Only call this after it's been .clone()'d
pub fn deinit(this: *Error) void {
if (this.path.len > 0) {
bun.default_allocator.free(this.path);
this.path = "";
}
if (this.dest.len > 0) {
bun.default_allocator.free(this.dest);
this.dest = "";
}
}
pub inline fn withPathDest(this: Error, path: anytype, dest: anytype) Error {
if (std.meta.Child(@TypeOf(path)) == u16) {
@compileError("Do not pass WString path to withPathDest, it needs the path encoded as utf8 (path)");
@@ -2202,6 +2215,36 @@ pub fn recvNonBlock(fd: bun.FileDescriptor, buf: []u8) Maybe(usize) {
return recv(fd, buf, socket_flags_nonblock);
}
pub fn poll(fds: []std.posix.pollfd, timeout: i32) Maybe(usize) {
while (true) {
const rc = switch (Environment.os) {
.mac => darwin_nocancel.@"poll$NOCANCEL"(fds.ptr, fds.len, timeout),
.linux => linux.poll(fds.ptr, fds.len, timeout),
else => @compileError("poll is not implemented on this platform"),
};
if (Maybe(usize).errnoSys(rc, .poll)) |err| {
if (err.getErrno() == .INTR) continue;
return err;
}
return .{ .result = @as(usize, @intCast(rc)) };
}
}
pub fn ppoll(fds: []std.posix.pollfd, timeout: ?*std.posix.timespec, sigmask: ?*const std.posix.sigset_t) Maybe(usize) {
while (true) {
const rc = switch (Environment.os) {
.mac => darwin_nocancel.@"ppoll$NOCANCEL"(fds.ptr, fds.len, timeout, sigmask),
.linux => linux.ppoll(fds.ptr, fds.len, timeout, sigmask),
else => @compileError("ppoll is not implemented on this platform"),
};
if (Maybe(usize).errnoSys(rc, .ppoll)) |err| {
if (err.getErrno() == .INTR) continue;
return err;
}
return .{ .result = @as(usize, @intCast(rc)) };
}
}
pub fn recv(fd: bun.FileDescriptor, buf: []u8, flag: u32) Maybe(usize) {
const adjusted_len = @min(buf.len, max_count);
const debug_timer = bun.Output.DebugTimer.start();

View File

@@ -32,6 +32,11 @@ watch_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
/// nanoseconds
coalesce_interval: isize = 100_000,
/// Waker to signal the watcher thread
waker: bun.Async.Waker = undefined,
/// Whether the watcher is still running
running: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
pub const EventListIndex = c_int;
pub const Event = extern struct {
watch_descriptor: EventListIndex,
@@ -64,7 +69,7 @@ pub const Event = extern struct {
pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) {
bun.assert(this.loaded);
const old_count = this.watch_count.fetchAdd(1, .release);
defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
defer if (old_count == 0) this.waker.wake();
const watch_file_mask = IN.EXCL_UNLINK | IN.MOVE_SELF | IN.DELETE_SELF | IN.MOVED_TO | IN.MODIFY;
const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_file_mask);
log("inotify_add_watch({}) = {}", .{ this.fd, rc });
@@ -75,7 +80,7 @@ pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(Ev
pub fn watchDir(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) {
bun.assert(this.loaded);
const old_count = this.watch_count.fetchAdd(1, .release);
defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
defer if (old_count == 0) this.waker.wake();
const watch_dir_mask = IN.EXCL_UNLINK | IN.DELETE | IN.DELETE_SELF | IN.CREATE | IN.MOVE_SELF | IN.ONLYDIR | IN.MOVED_TO;
const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_dir_mask);
log("inotify_add_watch({}) = {}", .{ this.fd, rc });
@@ -100,6 +105,7 @@ pub fn init(this: *INotifyWatcher, _: []const u8) !void {
// TODO: convert to bun.sys.Error
this.fd = .fromNative(try std.posix.inotify_init1(IN.CLOEXEC));
this.eventlist_bytes = &(try bun.default_allocator.alignedAlloc(EventListBytes, @alignOf(Event), 1))[0];
this.waker = try bun.Async.Waker.init();
log("{} init", .{this.fd});
}
@@ -116,72 +122,105 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) {
// We still don't correctly handle MOVED_FROM && MOVED_TO it seems.
var i: u32 = 0;
const read_eventlist_bytes = if (this.read_ptr) |ptr| brk: {
Futex.waitForever(&this.watch_count, 0);
i = ptr.i;
break :brk this.eventlist_bytes[0..ptr.len];
} else outer: while (true) {
Futex.waitForever(&this.watch_count, 0);
// Check if we should stop
if (!this.running.load(.acquire)) return .{ .result = &.{} };
const rc = std.posix.system.read(
this.fd.cast(),
this.eventlist_bytes,
this.eventlist_bytes.len,
);
const errno = std.posix.errno(rc);
switch (errno) {
.SUCCESS => {
var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
// Check if watch_count is 0, if so wait for waker
const count = this.watch_count.load(.acquire);
if (count == 0) {
// Wait on just the waker fd since there are no watches
var fds = [_]std.posix.pollfd{.{
.fd = this.waker.getFd().cast(),
.events = std.posix.POLL.IN,
.revents = 0,
}};
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
const double_read_threshold = Event.largest_size * (max_count / 2);
if (read_eventlist_bytes.len < double_read_threshold) {
var fds = [_]std.posix.pollfd{.{
.fd = this.fd.cast(),
.events = std.posix.POLL.IN | std.posix.POLL.ERR,
.revents = 0,
}};
var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
if ((std.posix.ppoll(&fds, &timespec, null) catch 0) > 0) {
inner: while (true) {
const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
bun.assert(rest.len > 0);
const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len);
// Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc });
const e = std.posix.errno(new_rc);
switch (e) {
.SUCCESS => {
read_eventlist_bytes.len += @intCast(new_rc);
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :inner,
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(e)),
.syscall = .read,
} },
const poll_rc = switch (bun.sys.poll(&fds, -1)) {
.result => |rc| rc,
.err => |err| return .{ .err = err },
};
if (poll_rc > 0 and (fds[0].revents & std.posix.POLL.IN) != 0) {
// Consume the waker
this.waker.wait();
}
// Check again if we should stop or if watches were added
if (!this.running.load(.acquire)) return .{ .result = &.{} };
continue :outer;
}
// Wait on both inotify fd and waker fd
var fds = [_]std.posix.pollfd{
.{
.fd = this.fd.cast(),
.events = std.posix.POLL.IN,
.revents = 0,
},
.{
.fd = this.waker.getFd().cast(),
.events = std.posix.POLL.IN,
.revents = 0,
},
};
const poll_rc = switch (bun.sys.poll(&fds, -1)) {
.result => |rc| rc,
.err => |err| return .{ .err = err },
};
if (poll_rc > 0) {
// Check if waker was signaled
if ((fds[1].revents & std.posix.POLL.IN) != 0) {
// Consume the waker
this.waker.wait();
// Check if we should stop
if (!this.running.load(.acquire)) return .{ .result = &.{} };
}
// Check if inotify has events
if ((fds[0].revents & std.posix.POLL.IN) != 0) {
switch (bun.sys.read(
this.fd,
this.eventlist_bytes,
)) {
.result => |rc| {
var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
// IN_MODIFY is very noisy
// we do a 0.1ms sleep to try to coalesce events better
const double_read_threshold = Event.largest_size * (max_count / 2);
if (read_eventlist_bytes.len < double_read_threshold) {
var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
if ((bun.sys.ppoll(fds[0..1], &timespec, null).unwrap() catch 0) > 0) {
const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
bun.assert(rest.len > 0);
switch (bun.sys.read(this.fd, rest)) {
.result => |rc2| {
read_eventlist_bytes.len += @intCast(rc2);
break :outer read_eventlist_bytes;
},
.err => |err| return .{ .err = err },
}
}
}
}
}
break :outer read_eventlist_bytes;
},
.AGAIN, .INTR => continue :outer,
.INVAL => {
if (Environment.isDebug) {
bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len });
break :outer read_eventlist_bytes;
},
.err => |err| {
if (err.getErrno() == .AGAIN) {
continue :outer;
}
return .{ .err = err };
},
}
return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} };
},
else => return .{ .err = .{
.errno = @truncate(@intFromEnum(errno)),
.syscall = .read,
} },
}
}
};
@@ -213,11 +252,17 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) {
}
}
// Clear read_ptr if we've processed all buffered events
this.read_ptr = null;
return .{ .result = this.eventlist_ptrs[0..count] };
}
pub fn stop(this: *INotifyWatcher) void {
log("{} stop", .{this.fd});
this.running.store(false, .release);
// Wake up any threads waiting in read()
this.waker.wake();
if (this.fd != bun.invalid_fd) {
this.fd.close();
this.fd = bun.invalid_fd;
@@ -236,6 +281,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
// TODO: is this thread safe?
var remaining_events = events.len;
var event_offset: usize = 0;
const eventlist_index = this.watchlist.items(.eventlist_index);
@@ -244,7 +290,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
var temp_name_list: [128]?[:0]u8 = undefined;
var temp_name_off: u8 = 0;
const slice = events[0..@min(128, remaining_events, this.watch_events.len)];
const slice = events[event_offset..][0..@min(128, remaining_events, this.watch_events.len)];
var watchevents = this.watch_events[0..slice.len];
var watch_event_id: u32 = 0;
for (slice) |event| {
@@ -297,6 +343,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
} else {
break;
}
event_offset += slice.len;
remaining_events -= slice.len;
}
@@ -319,7 +366,6 @@ const std = @import("std");
const bun = @import("bun");
const Environment = bun.Environment;
const Output = bun.Output;
const Futex = bun.Futex;
const system = std.posix.system;
const IN = std.os.linux.IN;