Compare commits: dylan/byte...jarred/fix (16 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 8be29136e8 | |
| | c776932a92 | |
| | 5432c5814e | |
| | 58b8201fbe | |
| | e8a0136501 | |
| | ed8ac14385 | |
| | b253214e00 | |
| | 45b36de60c | |
| | 3eff966276 | |
| | dabc14b7e9 | |
| | e99a7d634f | |
| | 5e71496065 | |
| | 35608a9ced | |
| | c0d9acae3c | |
| | 3f43dbcc80 | |
| | c8126a2b47 | |
```diff
@@ -103,10 +103,15 @@ pub fn start(this: *Watcher) !void {
 pub fn deinit(this: *Watcher, close_descriptors: bool) void {
     if (this.watchloop_handle != null) {
-        this.mutex.lock();
-        defer this.mutex.unlock();
-        this.close_descriptors = close_descriptors;
-        this.running = false;
+        {
+            this.mutex.lock();
+            defer this.mutex.unlock();
+            this.close_descriptors = close_descriptors;
+            this.running = false;
+        }
+
+        // Wait for the watcher thread to finish (mutex is unlocked here)
+        this.thread.join();
     } else {
         if (close_descriptors and this.running) {
            const fds = this.watchlist.items(.fd);
```
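The rewritten `deinit` above scopes the mutex so it is released before `thread.join()`, preventing a deadlock with a watch loop that also takes the lock. A minimal self-contained sketch of that shutdown pattern (the `Worker` type and its fields are illustrative, not Bun's API):

```zig
const std = @import("std");

const Worker = struct {
    mutex: std.Thread.Mutex = .{},
    running: bool = true,
    thread: std.Thread = undefined,

    fn loop(this: *Worker) void {
        while (true) {
            // The loop takes the same mutex the shutdown path uses.
            this.mutex.lock();
            const keep_going = this.running;
            this.mutex.unlock();
            if (!keep_going) return;
            std.Thread.sleep(1 * std.time.ns_per_ms);
        }
    }

    fn deinit(this: *Worker) void {
        {
            // Publish the stop flag in a scoped block...
            this.mutex.lock();
            defer this.mutex.unlock();
            this.running = false;
        }
        // ...so the mutex is already unlocked when we block on join().
        this.thread.join();
    }
};

pub fn main() !void {
    var w = Worker{};
    w.thread = try std.Thread.spawn(.{}, Worker.loop, .{&w});
    w.deinit();
}
```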
```diff
@@ -22,7 +22,8 @@ pub const KeepAlive = struct {

     /// Make calling ref() on this poll into a no-op.
     pub fn disable(this: *KeepAlive) void {
-        this.unref(JSC.VirtualMachine.get());
+        if (this.status == .active)
+            this.unref(JSC.VirtualMachine.get());
         this.status = .done;
     }
```
```diff
@@ -269,7 +269,7 @@ pub const Async = struct {
             this.result = @field(NodeFS, "uv_" ++ @tagName(FunctionEnum))(&node_fs, this.args, @intFromEnum(req.result));

             if (this.result == .err) {
-                this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
+                this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
                 std.mem.doNotOptimizeAway(&node_fs);
             }
```
```diff
@@ -283,7 +283,7 @@ pub const Async = struct {
             this.result = @field(NodeFS, "uv_" ++ @tagName(FunctionEnum))(&node_fs, this.args, req, @intFromEnum(req.result));

             if (this.result == .err) {
-                this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
+                this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
                 std.mem.doNotOptimizeAway(&node_fs);
             }
```
```diff
@@ -320,7 +320,7 @@ pub const Async = struct {
         pub fn deinit(this: *Task) void {
             if (this.result == .err) {
-                bun.default_allocator.free(this.result.err.path);
+                this.result.err.deinit();
             }

             this.ref.unref(this.globalObject.bunVM());
```
```diff
@@ -382,7 +382,7 @@ pub const Async = struct {
             this.result = function(&node_fs, this.args, .@"async");

             if (this.result == .err) {
-                this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
+                this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
                 std.mem.doNotOptimizeAway(&node_fs);
             }
```
```diff
@@ -428,7 +428,7 @@ pub const Async = struct {
         pub fn deinit(this: *Task) void {
             if (this.result == .err) {
-                bun.default_allocator.free(this.result.err.path);
+                this.result.err.deinit();
             }

             this.ref.unref(this.globalObject.bunVM());
```
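The hunks above and below repeatedly replace ad-hoc `dupe`/`free` of `err.path` with `clone()`/`deinit()` on the whole error, so each async task owns a deep copy. A hedged sketch of that ownership pattern with a stand-in error type (`Err` here is hypothetical; Bun's real `Error.deinit` appears in the `src/sys.zig` hunk further down):

```zig
const std = @import("std");

// Stand-in for bun.sys.Error: an error value that carries
// allocator-owned strings. Illustrative only.
const Err = struct {
    path: []const u8 = "",

    fn clone(this: Err, allocator: std.mem.Allocator) !Err {
        // Deep-copy every owned field so the original may be transient.
        return .{ .path = try allocator.dupe(u8, this.path) };
    }

    fn deinit(this: *Err, allocator: std.mem.Allocator) void {
        // Only call this on a clone; frees what clone() allocated.
        if (this.path.len > 0) {
            allocator.free(this.path);
            this.path = "";
        }
    }
};

pub fn main() !void {
    const allocator = std.heap.page_allocator;
    const transient = Err{ .path = "/tmp/some/path" };
    var owned = try transient.clone(allocator); // the task keeps this copy
    defer owned.deinit(allocator); // one symmetric cleanup point
    std.debug.print("owned: {s}\n", .{owned.path});
}
```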
```diff
@@ -467,7 +467,6 @@ pub fn NewAsyncCpTask(comptime is_shell: bool) type {
         /// When each task is finished, decrement.
         /// The maintask thread starts this at 1 and decrements it at the end, to avoid the promise being resolved while new tasks may be added.
         subtask_count: std.atomic.Value(usize),
-        deinitialized: bool = false,

         shelltask: ShellTaskT,
```
```diff
@@ -643,7 +642,7 @@ pub fn NewAsyncCpTask(comptime is_shell: bool) type {
             this.result = result;

             if (this.result == .err) {
-                this.result.err.path = bun.default_allocator.dupe(u8, this.result.err.path) catch "";
+                this.result.err = this.result.err.clone(bun.default_allocator) catch bun.outOfMemory();
             }

             if (this.evtloop == .js) {
```
```diff
@@ -693,8 +692,9 @@ pub fn NewAsyncCpTask(comptime is_shell: bool) type {
         }

         pub fn deinit(this: *ThisAsyncCpTask) void {
-            bun.assert(!this.deinitialized);
-            this.deinitialized = true;
+            if (this.result == .err) {
+                this.result.err.deinit();
+            }
             if (comptime !is_shell) this.ref.unref(this.evtloop);
             this.args.deinit();
             this.promise.deinit();
```
```diff
@@ -1248,7 +1248,7 @@ pub const AsyncReaddirRecursiveTask = struct {
     pub fn deinit(this: *AsyncReaddirRecursiveTask) void {
         bun.assert(this.root_fd == bun.invalid_fd); // should already have closed it
         if (this.pending_err) |*err| {
-            bun.default_allocator.free(err.path);
+            err.deinit();
         }

         this.ref.unref(this.globalObject.bunVM());
```
```diff
@@ -5951,11 +5951,7 @@ pub const NodeFS = struct {
     pub fn watch(_: *NodeFS, args: Arguments.Watch, _: Flavor) Maybe(Return.Watch) {
         return switch (args.createFSWatcher()) {
             .result => |result| .{ .result = result.js_this },
-            .err => |err| .{ .err = .{
-                .errno = err.errno,
-                .syscall = err.syscall,
-                .path = if (err.path.len > 0) args.path.slice() else "",
-            } },
+            .err => |err| .{ .err = err },
         };
     }
```
```diff
@@ -41,7 +41,13 @@ pub const FSWatcher = struct {

-    current_task: FSWatchTask = undefined,
+    /// While it's not closed, the pending activity
+    pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
+    current_task: FSWatchTask = .empty,
+
+    // NOTE: this may equal `.current_task`. do not free it!
     // if we create multiple on the same tick we use this buffer.
     last_task: ?*FSWatchTask = null,

+    js_thread: std.Thread.Id,
+
     pub fn eventLoop(this: FSWatcher) *EventLoop {
         return this.ctx.eventLoop();
```
```diff
@@ -60,20 +66,32 @@ pub const FSWatcher = struct {
     pub const finalize = deinit;

     pub const FSWatchTask = if (Environment.isWindows) FSWatchTaskWindows else FSWatchTaskPosix;
-    pub const FSWatchTaskPosix = struct {
-        pub const new = bun.TrivialNew(@This());
-
+    const FSWatchTaskPosix = struct {
+        concurrent_task: JSC.EventLoopTask = .{ .mini = .{ .callback = undefined } },
         ctx: *FSWatcher,
-        count: u8 = 0,
+        count: u8,
+        entries: [BATCH_SIZE]Entry = undefined,
+        // Track if we need to perform ref operation on main thread
+        needs_ref: bool = false,

-        entries: [8]Entry = undefined,
-        concurrent_task: JSC.ConcurrentTask = undefined,
+        pub const empty = FSWatchTask{
+            .concurrent_task = .{ .mini = .{ .callback = undefined } },
+            .ctx = undefined,
+            .count = 0,
+            .entries = undefined,
+            .needs_ref = false,
+        };

-        pub const Entry = struct {
+        const Entry = struct {
             event: Event,
             needs_free: bool,
         };

+        const BATCH_SIZE = 10;
+
+        pub const new = bun.TrivialNew(@This());
+
         pub fn deinit(this: *FSWatchTask) void {
             this.cleanEntries();
             if (comptime Environment.allow_assert) {
```
```diff
@@ -82,13 +100,15 @@ pub const FSWatcher = struct {
             bun.destroy(this);
         }

-        pub fn append(this: *FSWatchTask, event: Event, needs_free: bool) void {
-            if (this.count == 8) {
+        pub fn append(this: *FSWatchTaskPosix, event: Event, needs_free: bool) void {
+            if (this.count >= this.entries.len) {
                 this.enqueue();
                 const ctx = this.ctx;
                 this.* = .{
                     .ctx = ctx,
                     .count = 0,
+                    .concurrent_task = undefined,
+                    .needs_ref = false,
                 };
             }
```
```diff
@@ -101,25 +121,35 @@ pub const FSWatcher = struct {
         pub fn run(this: *FSWatchTask) void {
             // this runs on JS Context Thread
+            const ctx = this.ctx;
+
+            // Perform ref operation on main thread if needed
+            if (this.needs_ref and ctx.persistent) {
+                ctx.poll_ref.ref(ctx.ctx);
+            }
+
             for (this.entries[0..this.count]) |entry| {
                 switch (entry.event) {
                     inline .rename, .change => |file_path, t| {
-                        this.ctx.emit(file_path, t);
+                        if (comptime Environment.isWindows) {
+                            unreachable; // Windows uses FSWatchTaskWindows
+                        } else {
+                            ctx.emit(file_path, t);
+                        }
                     },
                     .@"error" => |err| {
-                        this.ctx.emitError(err);
+                        ctx.emitError(err);
                     },
                     .abort => {
-                        this.ctx.emitIfAborted();
+                        ctx.emitIfAborted();
                     },
                     .close => {
-                        this.ctx.emit("", .close);
+                        ctx.emit("", .close);
                     },
                 }
             }

-            this.ctx.unrefTask();
+            ctx.unrefTaskMainThread();
         }

         pub fn appendAbort(this: *FSWatchTask) void {
```
```diff
@@ -131,17 +161,33 @@ pub const FSWatcher = struct {
             if (this.count == 0)
                 return;

-            // if false is closed or detached (can still contain valid refs but will not create a new one)
-            if (this.ctx.refTask()) {
-                var that = FSWatchTask.new(this.*);
-                this.count = 0;
-                that.concurrent_task.task = JSC.Task.init(that);
-                this.ctx.enqueueTaskConcurrent(&that.concurrent_task);
+            // Check if we can proceed from watcher thread
+            const ctx = this.ctx;
+            ctx.mutex.lock();
+            defer ctx.mutex.unlock();
+
+            if (ctx.closed) {
+                this.cleanEntries();
                 return;
             }
-            // closed or detached so just cleanEntries
-            this.cleanEntries();
+
+            // Track pending activity
+            const current = ctx.pending_activity_count.fetchAdd(1, .monotonic);
+
+            var that = FSWatchTask.new(this.*);
+            // Clear the count before enqueueing to avoid double processing
+            const saved_count = this.count;
+            this.count = 0;
+            that.count = saved_count;
+
+            // If we went from 0 to 1, we need to ref on the main thread
+            that.needs_ref = (current == 0);
+
+            that.concurrent_task = JSC.EventLoopTask.init(.js);
+            that.concurrent_task.js.task = JSC.Task.init(that);
+            ctx.enqueueTaskConcurrent(&that.concurrent_task.js);
         }

         pub fn cleanEntries(this: *FSWatchTask) void {
             for (this.entries[0..this.count]) |*entry| {
                 if (entry.needs_free) {
```
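In the `enqueue`/`run` pair above, the watcher thread only bumps `pending_activity_count` and records whether it crossed 0 → 1; the actual poll ref happens later on the JS thread. A reduced sketch of that transition-detection idiom (names are illustrative, not Bun's API):

```zig
const std = @import("std");

var pending_activity = std.atomic.Value(u32).init(0);

// Watcher thread: record the 0 -> 1 transition, defer the real work.
fn enqueueFromWatcherThread() bool {
    // fetchAdd returns the previous value, so `prev == 0` means this
    // enqueue is the one that made the count non-zero.
    const prev = pending_activity.fetchAdd(1, .monotonic);
    return prev == 0; // becomes `needs_ref` on the task
}

// Main thread: perform the ref only for the task that crossed 0 -> 1.
fn runOnMainThread(needs_ref: bool) void {
    if (needs_ref) {
        // e.g. poll_ref.ref(vm) in the real code
    }
    // ... emit events ...
    // fetchSub also returns the previous value; 1 -> 0 releases the ref.
    if (pending_activity.fetchSub(1, .monotonic) == 1) {
        // e.g. poll_ref.unref(vm)
    }
}

pub fn main() void {
    const needs_ref = enqueueFromWatcherThread();
    runOnMainThread(needs_ref);
}
```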
```diff
@@ -167,6 +213,7 @@ pub const FSWatcher = struct {
         pub fn dupe(event: Event) !Event {
             return switch (event) {
                 inline .rename, .change => |path, t| @unionInit(Event, @tagName(t), try bun.default_allocator.dupe(u8, path)),
+                .@"error" => |err| @unionInit(Event, @tagName(.@"error"), try err.clone(bun.default_allocator)),
                 inline else => |value, t| @unionInit(Event, @tagName(t), value),
             };
         }
```
```diff
@@ -177,6 +224,7 @@ pub const FSWatcher = struct {
                 else => bun.default_allocator.free(path.*),
                 .windows => path.deinit(),
             },
+            .@"error" => |*err| err.deinit(),
             else => {},
         }
     }
```
```diff
@@ -206,6 +254,10 @@ pub const FSWatcher = struct {
         /// Unused: To match the API of the posix version
         count: u0 = 0,

+        pub const empty = FSWatchTaskWindows{
+            .ctx = undefined,
+        };
+
         pub const StringOrBytesToDecode = union(enum) {
             string: bun.String,
             bytes_to_free: []const u8,
```
```diff
@@ -432,9 +484,9 @@ pub const FSWatcher = struct {
     };

     pub fn initJS(this: *FSWatcher, listener: JSC.JSValue) void {
+        // We're on main thread during init
         if (this.persistent) {
             this.poll_ref.ref(this.ctx);
+            _ = this.pending_activity_count.fetchAdd(1, .monotonic);
         }

         const js_this = this.toJS(this.globalThis);
```
```diff
@@ -446,9 +498,8 @@ pub const FSWatcher = struct {
         // already aborted?
         if (s.aborted()) {
             // safely abort next tick
-            this.current_task = .{
-                .ctx = this,
-            };
+            this.current_task = .empty;
+            this.current_task.ctx = this;
             this.current_task.appendAbort();
         } else {
             // watch for abortion
```
```diff
@@ -554,6 +605,7 @@ pub const FSWatcher = struct {
     pub fn doRef(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
         if (!this.closed and !this.persistent) {
             this.persistent = true;
+            // We're on main thread here
             this.poll_ref.ref(this.ctx);
         }
         return .undefined;
```
```diff
@@ -562,6 +614,7 @@ pub const FSWatcher = struct {
     pub fn doUnref(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
         if (this.persistent) {
             this.persistent = false;
+            // We're on main thread here
             this.poll_ref.unref(this.ctx);
         }
         return .undefined;
```
```diff
@@ -571,13 +624,31 @@ pub const FSWatcher = struct {
         return JSC.JSValue.jsBoolean(this.persistent);
     }

-    // this can be called from Watcher Thread or JS Context Thread
+    // Only call from main thread
+    pub fn refTaskMainThread(this: *FSWatcher) void {
+        if (this.persistent) {
+            this.poll_ref.ref(this.ctx);
+        }
+    }
+
+    // Only call from main thread
+    pub fn unrefTaskMainThread(this: *FSWatcher) void {
+        this.mutex.lock();
+        defer this.mutex.unlock();
+
+        const current = this.pending_activity_count.fetchSub(1, .monotonic);
+        // If we went from 1 to 0, we need to unref the poll
+        if (current == 1 and this.persistent) {
+            this.poll_ref.unref(this.ctx);
+        }
+    }
+
+    // Can be called from any thread - delegates to main thread version when on JS thread
     pub fn refTask(this: *FSWatcher) bool {
         this.mutex.lock();
         defer this.mutex.unlock();
         if (this.closed) return false;
         _ = this.pending_activity_count.fetchAdd(1, .monotonic);

         return true;
     }
```
```diff
@@ -585,11 +656,15 @@ pub const FSWatcher = struct {
         return this.pending_activity_count.load(.acquire) > 0;
     }

+    // Can be called from any thread - delegates to main thread version when on JS thread
     pub fn unrefTask(this: *FSWatcher) void {
-        this.mutex.lock();
-        defer this.mutex.unlock();
-        // JSC eventually will free it
-        _ = this.pending_activity_count.fetchSub(1, .monotonic);
+        // If we're on the JS thread (where we have access to the VM), use the main thread version
+        // Otherwise just decrement the count
+        if (this.js_thread == std.Thread.getCurrentId()) {
+            this.unrefTaskMainThread();
+        } else {
+            _ = this.pending_activity_count.fetchSub(1, .monotonic);
+        }
     }

     pub fn close(this: *FSWatcher) void {
```
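`unrefTask` now dispatches on thread identity: only when the caller is the stored `js_thread` does it touch VM-affine state. A tiny sketch of that check (assumed, simplified types):

```zig
const std = @import("std");

const Watcher = struct {
    js_thread: std.Thread.Id,

    fn unrefTask(this: *Watcher) void {
        if (this.js_thread == std.Thread.getCurrentId()) {
            // JS thread: safe to touch VM-affine state (the poll ref).
        } else {
            // Any other thread: only atomic bookkeeping is allowed.
        }
    }
};

pub fn main() void {
    var w = Watcher{ .js_thread = std.Thread.getCurrentId() };
    w.unrefTask(); // takes the JS-thread branch
}
```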
```diff
@@ -602,14 +677,18 @@ pub const FSWatcher = struct {

         if (js_this != .zero) {
             if (FSWatcher.js.listenerGetCached(js_this)) |listener| {
-                _ = this.refTask();
+                // We're on main thread during close
+                const current = this.pending_activity_count.fetchAdd(1, .monotonic);
+                if (current == 0 and this.persistent) {
+                    this.poll_ref.ref(this.ctx);
+                }
                 log("emit('close')", .{});
                 emitJS(listener, this.globalThis, .undefined, .close);
-                this.unrefTask();
+                this.unrefTaskMainThread();
             }
         }

-        this.unrefTask();
+        this.unrefTaskMainThread();
     } else {
         this.mutex.unlock();
     }
```
```diff
@@ -624,7 +703,7 @@ pub const FSWatcher = struct {

         if (this.persistent) {
             this.persistent = false;
-            this.poll_ref.unref(this.ctx);
+            this.poll_ref.disable();
         }

         if (this.signal) |signal| {
```
```diff
@@ -641,30 +720,34 @@ pub const FSWatcher = struct {
     }

     pub fn init(args: Arguments) bun.JSC.Maybe(*FSWatcher) {
-        var buf: bun.PathBuffer = undefined;
-        var slice = args.path.slice();
-        if (bun.strings.startsWith(slice, "file://")) {
-            slice = slice[6..];
-        }
+        var joined_buf = bun.PathBufferPool.get();
+        defer bun.PathBufferPool.put(joined_buf);
+        const file_path = brk: {
+            var buf = bun.PathBufferPool.get();
+            defer bun.PathBufferPool.put(buf);

-        var parts = [_]string{
-            slice,
-        };
+            var slice = args.path.slice();
+            if (bun.strings.startsWith(slice, "file://")) {
+                slice = slice[6..];
+            }

-        const cwd = switch (bun.sys.getcwd(&buf)) {
-            .result => |r| r,
-            .err => |err| return .{ .err = err },
-        };
-        buf[cwd.len] = std.fs.path.sep;
+            var parts = [_]string{
+                slice,
+            };

-        var joined_buf: bun.PathBuffer = undefined;
-        const file_path = Path.joinAbsStringBuf(
-            buf[0 .. cwd.len + 1],
-            &joined_buf,
-            &parts,
-            .auto,
-        );
+            const cwd = switch (bun.sys.getcwd(buf)) {
+                .result => |r| r,
+                .err => |err| return .{ .err = err },
+            };
+            buf[cwd.len] = std.fs.path.sep;
+            break :brk Path.joinAbsStringBuf(
+                buf[0 .. cwd.len + 1],
+                joined_buf,
+                &parts,
+                .auto,
+            );
+        };

         joined_buf[file_path.len] = 0;
         const file_path_z = joined_buf[0..file_path.len :0];
```
```diff
@@ -672,19 +755,16 @@ pub const FSWatcher = struct {
         const ctx = bun.new(FSWatcher, .{
             .ctx = vm,
-            .current_task = .{
-                .ctx = undefined,
-                .count = 0,
-            },
             .mutex = .{},
             .signal = if (args.signal) |s| s.ref() else null,
             .persistent = args.persistent,
+            .closed = false,
             .path_watcher = null,
             .globalThis = args.global_this,
             .js_this = .zero,
-            .encoding = args.encoding,
-            .closed = false,
             .verbose = args.verbose,
+            .encoding = args.encoding,
+            .js_thread = std.Thread.getCurrentId(),
         });
+        ctx.current_task.ctx = ctx;
```
```diff
@@ -692,6 +772,8 @@ pub const FSWatcher = struct {
         switch (PathWatcher.watch(vm, file_path_z, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx))) {
             .result => |r| r,
             .err => |err| {
+                var err2 = err;
+                defer err2.deinit();
                 ctx.deinit();
                 return .{ .err = .{
                     .errno = err.errno,
```
```diff
@@ -59,11 +59,22 @@ pub const PathWatcherManager = struct {
     }

     fn unrefPendingTask(this: *PathWatcherManager) void {
-        this.mutex.lock();
-        defer this.mutex.unlock();
-        this.pending_tasks -= 1;
-        if (this.deinit_on_last_task and this.pending_tasks == 0) {
-            this.has_pending_tasks.store(false, .release);
+        const should_deinit = brk: {
+            this.mutex.lock();
+            defer this.mutex.unlock();
+
+            const pending_task_count = this.pending_tasks;
+            bun.debugAssert(pending_task_count > 0);
+            this.pending_tasks = pending_task_count - 1;
+
+            if (pending_task_count == 1) {
+                this.has_pending_tasks.store(false, .release);
+                break :brk this.deinit_on_last_task;
+            }
+            break :brk false;
+        };
+
+        if (should_deinit) {
             this.deinit();
         }
     }
```
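This rewrite, like `unrefPendingDirectory` and `unregisterWatcher` below, computes a `should_deinit` verdict inside a labeled block that holds the mutex and only calls `deinit()` after the lock is released, since `deinit` destroys the object that owns the mutex. A reduced sketch of the shape (hypothetical `Manager`):

```zig
const std = @import("std");

const Manager = struct {
    mutex: std.Thread.Mutex = .{},
    pending_tasks: usize = 1,
    deinit_on_last_task: bool = true,

    fn unrefPendingTask(this: *Manager) void {
        const should_deinit = brk: {
            this.mutex.lock();
            defer this.mutex.unlock();
            this.pending_tasks -= 1;
            break :brk this.deinit_on_last_task and this.pending_tasks == 0;
        }; // mutex released here, before deinit can destroy it

        if (should_deinit) {
            this.deinit();
        }
    }

    fn deinit(this: *Manager) void {
        // Would free `this` (and its mutex) in the real code.
        _ = this;
    }
};

pub fn main() void {
    var m = Manager{};
    m.unrefPendingTask();
}
```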
```diff
@@ -323,7 +334,7 @@ pub const PathWatcherManager = struct {
     const watchers = this.watchers.slice();
     const timestamp = std.time.milliTimestamp();

-    // stop all watchers
+    // stop all JS watchers by emitting the error
     for (watchers) |w| {
         if (w) |watcher| {
             log("[watch] error: {}", .{err});
```
```diff
@@ -432,75 +443,75 @@ pub const PathWatcherManager = struct {
         const manager = this.manager;
         const path = this.path;
         const fd = path.fd;
-        var iter = fd.stdDir().iterate();
+        var iter = bun.DirIterator.iterate(fd.stdDir(), .u8);

         // now we iterate over all files and directories
-        while (iter.next() catch |err| {
-            return .{
-                .err = .{
-                    .errno = @truncate(@intFromEnum(switch (err) {
-                        error.AccessDenied => bun.sys.E.ACCES,
-                        error.SystemResources => bun.sys.E.NOMEM,
-                        error.Unexpected,
-                        error.InvalidUtf8,
-                        => bun.sys.E.INVAL,
-                    })),
-                    .syscall = .watch,
+        while (true) {
+            const iteration = iter.next();
+            switch (iteration) {
+                .err => |err| {
+                    var err2 = err;
+                    err2.syscall = .watch;
+                    return .{ .err = err2 };
                 },
-            };
-        }) |entry| {
-            var parts = [2]string{ path.path, entry.name };
-            const entry_path = Path.joinAbsStringBuf(
-                Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
-                buf,
-                &parts,
-                .auto,
-            );
+                .result => |entry_or_eof| if (entry_or_eof) |entry| {
+                    var parts = [2]string{ path.path, entry.name.slice() };
+                    const entry_path = Path.joinAbsStringBuf(
+                        Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
+                        buf,
+                        &parts,
+                        .auto,
+                    );

-            buf[entry_path.len] = 0;
-            const entry_path_z = buf[0..entry_path.len :0];
+                    buf[entry_path.len] = 0;
+                    const entry_path_z = buf[0..entry_path.len :0];

-            const child_path = switch (manager._fdFromAbsolutePathZ(entry_path_z)) {
-                .result => |result| result,
-                .err => |e| return .{ .err = e },
-            };
+                    const child_path = switch (manager._fdFromAbsolutePathZ(entry_path_z)) {
+                        .result => |result| result,
+                        .err => |e| return .{ .err = e },
+                    };

-            {
-                watcher.mutex.lock();
-                defer watcher.mutex.unlock();
-                watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
-                    manager._decrementPathRef(entry_path_z);
-                    return switch (err) {
-                        error.OutOfMemory => .{ .err = .{
-                            .errno = @truncate(@intFromEnum(bun.sys.E.NOMEM)),
-                            .syscall = .watch,
-                        } },
-                    };
-                };
-            }
+                    {
+                        watcher.mutex.lock();
+                        defer watcher.mutex.unlock();
+                        watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
+                            manager._decrementPathRef(entry_path_z);
+                            return switch (err) {
+                                error.OutOfMemory => .{ .err = .{
+                                    .errno = @truncate(@intFromEnum(bun.sys.E.NOMEM)),
+                                    .syscall = .watch,
+                                } },
+                            };
+                        };
+                    }

-            // we need to call this unlocked
-            if (child_path.is_file) {
-                switch (manager.main_watcher.addFile(
-                    child_path.fd,
-                    child_path.path,
-                    child_path.hash,
-                    options.Loader.file,
-                    .invalid,
-                    null,
-                    false,
-                )) {
-                    .err => |err| return .{ .err = err },
-                    .result => {},
-                }
-            } else {
-                if (watcher.recursive and !watcher.isClosed()) {
-                    // this may trigger another thread with is desired when available to watch long trees
-                    switch (manager._addDirectory(watcher, child_path)) {
-                        .err => |err| return .{ .err = err },
-                        .result => {},
-                    }
-                }
-            }
-        }
+                    // we need to call this unlocked
+                    if (child_path.is_file) {
+                        switch (manager.main_watcher.addFile(
+                            child_path.fd,
+                            child_path.path,
+                            child_path.hash,
+                            options.Loader.file,
+                            .invalid,
+                            null,
+                            false,
+                        )) {
+                            .err => |err| return .{ .err = err.clone(bun.default_allocator) catch bun.outOfMemory() },
+                            .result => {},
+                        }
+                    } else {
+                        if (watcher.recursive and !watcher.isClosed()) {
+                            // this may trigger another thread with is desired when available to watch long trees
+                            switch (manager._addDirectory(watcher, child_path)) {
+                                .err => |err| return .{ .err = err },
+                                .result => {},
+                            }
+                        }
+                    }
+                } else {
+                    // EOF reached
+                    break;
+                },
+            }
+        }
         return .{ .result = {} };
```
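The loop above trades Zig's throwing directory iterator for a result-union iterator matched with `switch`, folding the error, entry, and EOF cases into one construct. A toy version of that iteration shape (the `NextResult`/`Iter` types are invented for illustration):

```zig
const std = @import("std");

// A toy result union in the spirit of Bun's Maybe(T): each next()
// yields either an error payload or an optional entry (null = EOF).
const NextResult = union(enum) {
    err: i32, // errno stand-in
    result: ?[]const u8,
};

const Iter = struct {
    names: []const []const u8,
    i: usize = 0,

    fn next(this: *Iter) NextResult {
        if (this.i >= this.names.len) return .{ .result = null }; // EOF
        defer this.i += 1;
        return .{ .result = this.names[this.i] };
    }
};

pub fn main() void {
    var iter = Iter{ .names = &.{ "a.txt", "b.txt" } };
    while (true) {
        switch (iter.next()) {
            .err => |errno| {
                std.debug.print("error: {d}\n", .{errno});
                return;
            },
            .result => |maybe_entry| if (maybe_entry) |entry| {
                std.debug.print("entry: {s}\n", .{entry});
            } else {
                // EOF reached
                break;
            },
        }
    }
}
```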
```diff
@@ -511,13 +522,15 @@ pub const PathWatcherManager = struct {
         return bun.todo(@src(), {});
     }

-    var buf: bun.PathBuffer = undefined;
-
     while (this.getNext()) |watcher| {
         defer watcher.unrefPendingDirectory();
-        switch (this.processWatcher(watcher, &buf)) {
+        const buf = bun.PathBufferPool.get();
+        defer bun.PathBufferPool.put(buf);
+        switch (this.processWatcher(watcher, buf)) {
             .err => |err| {
                 log("[watch] error registering directory: {s}", .{err});
+                var err2 = err.clone(bun.default_allocator) catch bun.outOfMemory();
+                defer err2.deinit();
                 watcher.emit(.{ .@"error" = err }, 0, std.time.milliTimestamp(), false);
                 watcher.flush();
             },
```
```diff
@@ -624,43 +637,46 @@ pub const PathWatcherManager = struct {

     // unregister is always called form main thread
     fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void {
-        this.mutex.lock();
-        defer this.mutex.unlock();
+        const should_deinit = brk: {
+            this.mutex.lock();
+            defer this.mutex.unlock();

-        var watchers = this.watchers.slice();
-        defer {
-            if (this.deinit_on_last_watcher and this.watcher_count == 0) {
-                this.deinit();
-            }
-        }
+            var watchers = this.watchers.slice();

-        for (watchers, 0..) |w, i| {
-            if (w) |item| {
-                if (item == watcher) {
-                    watchers[i] = null;
-                    // if is the last one just pop
-                    if (i == watchers.len - 1) {
-                        this.watchers.len -= 1;
-                    }
-                    this.watcher_count -= 1;
-
-                    this._decrementPathRefNoLock(watcher.path.path);
-                    if (comptime Environment.isMac) {
-                        if (watcher.fsevents_watcher != null) {
-                            break;
-                        }
-                    }
-
-                    {
-                        watcher.mutex.lock();
-                        defer watcher.mutex.unlock();
-                        while (watcher.file_paths.pop()) |file_path| {
-                            this._decrementPathRefNoLock(file_path);
-                        }
-                    }
-                    break;
-                }
-            }
-        }
+            for (watchers, 0..) |w, i| {
+                if (w) |item| {
+                    if (item == watcher) {
+                        watchers[i] = null;
+                        // if is the last one just pop
+                        if (i == watchers.len - 1) {
+                            this.watchers.len -= 1;
+                        }
+                        this.watcher_count -= 1;
+
+                        this._decrementPathRefNoLock(watcher.path.path);
+                        if (comptime Environment.isMac) {
+                            if (watcher.fsevents_watcher != null) {
+                                break;
+                            }
+                        }
+
+                        {
+                            watcher.mutex.lock();
+                            defer watcher.mutex.unlock();
+                            while (watcher.file_paths.pop()) |file_path| {
+                                this._decrementPathRefNoLock(file_path);
+                            }
+                        }
+                        break;
+                    }
+                }
+            }
+
+            break :brk this.deinit_on_last_watcher and this.watcher_count == 0;
+        };
+
+        if (should_deinit) {
+            this.deinit();
+        }
     }
```
```diff
@@ -687,7 +703,8 @@ pub const PathWatcherManager = struct {
         return;
     }

-    this.main_watcher.deinit(false);
+    // Stop the watcher thread and wait for it to finish
+    this.main_watcher.deinit(true);

     if (this.watcher_count > 0) {
         while (this.watchers.pop()) |watcher| {
```
```diff
@@ -760,8 +777,9 @@ pub const PathWatcher = struct {

     if (comptime Environment.isMac) {
         if (!path.is_file) {
-            var buffer: bun.PathBuffer = undefined;
-            const resolved_path_temp = std.os.getFdPath(path.fd.cast(), &buffer) catch |err| {
+            const buffer = bun.PathBufferPool.get();
+            defer bun.PathBufferPool.put(buffer);
+            const resolved_path_temp = bun.sys.getFdPath(path.fd, buffer).unwrap() catch |err| {
                 bun.default_allocator.destroy(this);
                 return err;
             };
```
```diff
@@ -845,11 +863,17 @@ pub const PathWatcher = struct {
     }

     pub fn unrefPendingDirectory(this: *PathWatcher) void {
-        this.mutex.lock();
-        defer this.mutex.unlock();
-        this.pending_directories -= 1;
-        if (this.isClosed() and this.pending_directories == 0) {
-            this.has_pending_directories.store(false, .release);
+        const should_deinit = brk: {
+            this.mutex.lock();
+            defer this.mutex.unlock();
+            this.pending_directories -= 1;
+            if (this.pending_directories == 0) {
+                this.has_pending_directories.store(false, .release);
+            }
+            break :brk this.pending_directories == 0 and this.closed.load(.acquire);
+        };
+
+        if (should_deinit) {
             this.deinit();
         }
     }
```
```diff
@@ -963,7 +987,7 @@ pub fn watch(
         .err => |_err| {
             var err = _err;
             err.syscall = .watch;
-            return .{ .err = err };
+            return .{ .err = err.clone(bun.default_allocator) catch bun.outOfMemory() };
         },
     };
```
```diff
@@ -998,6 +998,7 @@ pub const CopyFileWindows = struct {
     const globalThis = this.event_loop.global;
     const promise = this.promise.swap();
+    const err_instance = err.toJSC(globalThis);

     var event_loop = this.event_loop;
     event_loop.enter();
     defer event_loop.exit();
```
```diff
@@ -1109,9 +1110,10 @@ pub const CopyFileWindows = struct {
     fn onMkdirpComplete(this: *CopyFileWindows) void {
         this.event_loop.unrefConcurrently();

-        if (this.err) |err| {
+        if (bun.take(&this.err)) |err| {
             this.throw(err);
-            bun.default_allocator.free(err.path);
+            var err2 = err;
+            err2.deinit();
             return;
         }
```
```diff
@@ -25,6 +25,8 @@ pub const nocancel = struct {
     pub extern "c" fn @"write$NOCANCEL"(fd: c.fd_t, buf: [*]const u8, nbyte: usize) isize;
     pub extern "c" fn @"writev$NOCANCEL"(fd: c.fd_t, buf: [*]const std.posix.iovec_const, count: i32) isize;
     pub extern "c" fn @"pwritev$NOCANCEL"(fd: c.fd_t, buf: [*]const std.posix.iovec_const, count: i32, offset: c.off_t) isize;
+    pub extern "c" fn @"poll$NOCANCEL"(fds: [*]std.posix.pollfd, nfds: c_int, timeout: c_int) isize;
+    pub extern "c" fn @"ppoll$NOCANCEL"(fds: [*]std.posix.pollfd, nfds: c_int, timeout: ?*const std.posix.timespec, sigmask: ?*const std.posix.sigset_t) isize;
 };

 pub const OSLog = opaque {
```
```diff
@@ -352,12 +352,12 @@ pub fn batchedMoveTaskDone(this: *Mv, task: *ShellMvBatchedTask) void {

     var exec = &this.state.executing;

-    if (task.err) |err| {
+    if (task.err) |*err| {
         exec.error_signal.store(true, .seq_cst);
         if (exec.err == null) {
-            exec.err = err;
+            exec.err = err.*;
         } else {
-            bun.default_allocator.free(err.path);
+            err.deinit();
         }
     }
```
```diff
@@ -581,7 +581,8 @@ pub const ShellRmTask = struct {
                 this.task_manager.err = err;
                 this.task_manager.error_signal.store(true, .seq_cst);
             } else {
-                bun.default_allocator.free(err.path);
+                var err2 = err;
+                err2.deinit();
             }
         },
         .result => {},
```
```diff
@@ -596,7 +597,7 @@ pub const ShellRmTask = struct {
             this.task_manager.err = err;
             this.task_manager.error_signal.store(true, .seq_cst);
         } else {
-            bun.default_allocator.free(err.path);
+            this.task_manager.err.?.deinit();
         }
     }
```
43 src/sys.zig
```diff
@@ -271,6 +271,7 @@ pub const Tag = enum(u8) {
     futime,
     pidfd_open,
     poll,
+    ppoll,
     watch,
     scandir,
```
```diff
@@ -423,6 +424,18 @@ pub const Error = struct {
         };
     }

+    /// Only call this after it's been .clone()'d
+    pub fn deinit(this: *Error) void {
+        if (this.path.len > 0) {
+            bun.default_allocator.free(this.path);
+            this.path = "";
+        }
+        if (this.dest.len > 0) {
+            bun.default_allocator.free(this.dest);
+            this.dest = "";
+        }
+    }
+
     pub inline fn withPathDest(this: Error, path: anytype, dest: anytype) Error {
         if (std.meta.Child(@TypeOf(path)) == u16) {
             @compileError("Do not pass WString path to withPathDest, it needs the path encoded as utf8 (path)");
```
```diff
@@ -2202,6 +2215,36 @@ pub fn recvNonBlock(fd: bun.FileDescriptor, buf: []u8) Maybe(usize) {
     return recv(fd, buf, socket_flags_nonblock);
 }

+pub fn poll(fds: []std.posix.pollfd, timeout: i32) Maybe(usize) {
+    while (true) {
+        const rc = switch (Environment.os) {
+            .mac => darwin_nocancel.@"poll$NOCANCEL"(fds.ptr, fds.len, timeout),
+            .linux => linux.poll(fds.ptr, fds.len, timeout),
+            else => @compileError("poll is not implemented on this platform"),
+        };
+        if (Maybe(usize).errnoSys(rc, .poll)) |err| {
+            if (err.getErrno() == .INTR) continue;
+            return err;
+        }
+        return .{ .result = @as(usize, @intCast(rc)) };
+    }
+}
+
+pub fn ppoll(fds: []std.posix.pollfd, timeout: ?*std.posix.timespec, sigmask: ?*const std.posix.sigset_t) Maybe(usize) {
+    while (true) {
+        const rc = switch (Environment.os) {
+            .mac => darwin_nocancel.@"ppoll$NOCANCEL"(fds.ptr, fds.len, timeout, sigmask),
+            .linux => linux.ppoll(fds.ptr, fds.len, timeout, sigmask),
+            else => @compileError("ppoll is not implemented on this platform"),
+        };
+        if (Maybe(usize).errnoSys(rc, .ppoll)) |err| {
+            if (err.getErrno() == .INTR) continue;
+            return err;
+        }
+        return .{ .result = @as(usize, @intCast(rc)) };
+    }
+}
+
 pub fn recv(fd: bun.FileDescriptor, buf: []u8, flag: u32) Maybe(usize) {
     const adjusted_len = @min(buf.len, max_count);
     const debug_timer = bun.Output.DebugTimer.start();
```
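The new wrappers retry `poll`/`ppoll` on `EINTR` and surface errors as `Maybe(usize)`. A standalone sketch of the same retry loop using only `std.posix` (the `Maybe`/`errnoSys` plumbing is Bun-specific and omitted):

```zig
const std = @import("std");

// Minimal version of the retry-on-EINTR loop the new wrappers implement,
// with error handling reduced to a plain Zig error set.
fn pollRetry(fds: []std.posix.pollfd, timeout: i32) !usize {
    while (true) {
        const rc = std.posix.system.poll(fds.ptr, @intCast(fds.len), timeout);
        switch (std.posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue, // interrupted by a signal: retry, not an error
            else => |e| return std.posix.unexpectedErrno(e),
        }
    }
}

pub fn main() !void {
    // Poll stdin for readability with a 10ms timeout.
    var fds = [_]std.posix.pollfd{.{
        .fd = std.posix.STDIN_FILENO,
        .events = std.posix.POLL.IN,
        .revents = 0,
    }};
    const ready = try pollRetry(&fds, 10);
    std.debug.print("{d} fd(s) ready\n", .{ready});
}
```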
```diff
@@ -32,6 +32,11 @@ watch_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
 /// nanoseconds
 coalesce_interval: isize = 100_000,

+/// Waker to signal the watcher thread
+waker: bun.Async.Waker = undefined,
+/// Whether the watcher is still running
+running: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
+
 pub const EventListIndex = c_int;
 pub const Event = extern struct {
     watch_descriptor: EventListIndex,
```
```diff
@@ -64,7 +69,7 @@ pub const Event = extern struct {
 pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) {
     bun.assert(this.loaded);
     const old_count = this.watch_count.fetchAdd(1, .release);
-    defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
+    defer if (old_count == 0) this.waker.wake();
     const watch_file_mask = IN.EXCL_UNLINK | IN.MOVE_SELF | IN.DELETE_SELF | IN.MOVED_TO | IN.MODIFY;
     const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_file_mask);
     log("inotify_add_watch({}) = {}", .{ this.fd, rc });
```
```diff
@@ -75,7 +80,7 @@ pub fn watchPath(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(Ev
 pub fn watchDir(this: *INotifyWatcher, pathname: [:0]const u8) bun.JSC.Maybe(EventListIndex) {
     bun.assert(this.loaded);
     const old_count = this.watch_count.fetchAdd(1, .release);
-    defer if (old_count == 0) Futex.wake(&this.watch_count, 10);
+    defer if (old_count == 0) this.waker.wake();
     const watch_dir_mask = IN.EXCL_UNLINK | IN.DELETE | IN.DELETE_SELF | IN.CREATE | IN.MOVE_SELF | IN.ONLYDIR | IN.MOVED_TO;
     const rc = system.inotify_add_watch(this.fd.cast(), pathname, watch_dir_mask);
     log("inotify_add_watch({}) = {}", .{ this.fd, rc });
```
```diff
@@ -100,6 +105,7 @@ pub fn init(this: *INotifyWatcher, _: []const u8) !void {
     // TODO: convert to bun.sys.Error
     this.fd = .fromNative(try std.posix.inotify_init1(IN.CLOEXEC));
     this.eventlist_bytes = &(try bun.default_allocator.alignedAlloc(EventListBytes, @alignOf(Event), 1))[0];
+    this.waker = try bun.Async.Waker.init();
     log("{} init", .{this.fd});
 }
```
```diff
@@ -116,72 +122,105 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) {
     // We still don't correctly handle MOVED_FROM && MOVED_TO it seems.
     var i: u32 = 0;
     const read_eventlist_bytes = if (this.read_ptr) |ptr| brk: {
-        Futex.waitForever(&this.watch_count, 0);
         i = ptr.i;
         break :brk this.eventlist_bytes[0..ptr.len];
     } else outer: while (true) {
-        Futex.waitForever(&this.watch_count, 0);
-
-        const rc = std.posix.system.read(
-            this.fd.cast(),
-            this.eventlist_bytes,
-            this.eventlist_bytes.len,
-        );
-        const errno = std.posix.errno(rc);
-        switch (errno) {
-            .SUCCESS => {
-                var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
-                log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
-                if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
-
-                // IN_MODIFY is very noisy
-                // we do a 0.1ms sleep to try to coalesce events better
-                const double_read_threshold = Event.largest_size * (max_count / 2);
-                if (read_eventlist_bytes.len < double_read_threshold) {
-                    var fds = [_]std.posix.pollfd{.{
-                        .fd = this.fd.cast(),
-                        .events = std.posix.POLL.IN | std.posix.POLL.ERR,
-                        .revents = 0,
-                    }};
-                    var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
-                    if ((std.posix.ppoll(&fds, &timespec, null) catch 0) > 0) {
-                        inner: while (true) {
-                            const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
-                            bun.assert(rest.len > 0);
-                            const new_rc = std.posix.system.read(this.fd.cast(), rest.ptr, rest.len);
-                            // Output.warn("wapa {} {} = {}", .{ this.fd, rest.len, new_rc });
-                            const e = std.posix.errno(new_rc);
-                            switch (e) {
-                                .SUCCESS => {
-                                    read_eventlist_bytes.len += @intCast(new_rc);
-                                    break :outer read_eventlist_bytes;
-                                },
-                                .AGAIN, .INTR => continue :inner,
-                                else => return .{ .err = .{
-                                    .errno = @truncate(@intFromEnum(e)),
-                                    .syscall = .read,
-                                } },
-                            }
-                        }
-                    }
-                }
-
-                break :outer read_eventlist_bytes;
-            },
-            .AGAIN, .INTR => continue :outer,
-            .INVAL => {
-                if (Environment.isDebug) {
-                    bun.Output.err("EINVAL", "inotify read({}, {d})", .{ this.fd, this.eventlist_bytes.len });
-                }
-                return .{ .err = .{
-                    .errno = @truncate(@intFromEnum(errno)),
-                    .syscall = .read,
-                } };
-            },
-            else => return .{ .err = .{
-                .errno = @truncate(@intFromEnum(errno)),
-                .syscall = .read,
-            } },
-        }
+        // Check if we should stop
+        if (!this.running.load(.acquire)) return .{ .result = &.{} };
+
+        // Check if watch_count is 0, if so wait for waker
+        const count = this.watch_count.load(.acquire);
+        if (count == 0) {
+            // Wait on just the waker fd since there are no watches
+            var fds = [_]std.posix.pollfd{.{
+                .fd = this.waker.getFd().cast(),
+                .events = std.posix.POLL.IN,
+                .revents = 0,
+            }};
+
+            const poll_rc = switch (bun.sys.poll(&fds, -1)) {
+                .result => |rc| rc,
+                .err => |err| return .{ .err = err },
+            };
+
+            if (poll_rc > 0 and (fds[0].revents & std.posix.POLL.IN) != 0) {
+                // Consume the waker
+                this.waker.wait();
+            }
+
+            // Check again if we should stop or if watches were added
+            if (!this.running.load(.acquire)) return .{ .result = &.{} };
+            continue :outer;
+        }
+
+        // Wait on both inotify fd and waker fd
+        var fds = [_]std.posix.pollfd{
+            .{
+                .fd = this.fd.cast(),
+                .events = std.posix.POLL.IN,
+                .revents = 0,
+            },
+            .{
+                .fd = this.waker.getFd().cast(),
+                .events = std.posix.POLL.IN,
+                .revents = 0,
+            },
+        };
+
+        const poll_rc = switch (bun.sys.poll(&fds, -1)) {
+            .result => |rc| rc,
+            .err => |err| return .{ .err = err },
+        };
+
+        if (poll_rc > 0) {
+            // Check if waker was signaled
+            if ((fds[1].revents & std.posix.POLL.IN) != 0) {
+                // Consume the waker
+                this.waker.wait();
+                // Check if we should stop
+                if (!this.running.load(.acquire)) return .{ .result = &.{} };
+            }
+
+            // Check if inotify has events
+            if ((fds[0].revents & std.posix.POLL.IN) != 0) {
+                switch (bun.sys.read(
+                    this.fd,
+                    this.eventlist_bytes,
+                )) {
+                    .result => |rc| {
+                        var read_eventlist_bytes = this.eventlist_bytes[0..@intCast(rc)];
+                        log("{} read {} bytes", .{ this.fd, read_eventlist_bytes.len });
+                        if (read_eventlist_bytes.len == 0) return .{ .result = &.{} };
+
+                        // IN_MODIFY is very noisy
+                        // we do a 0.1ms sleep to try to coalesce events better
+                        const double_read_threshold = Event.largest_size * (max_count / 2);
+                        if (read_eventlist_bytes.len < double_read_threshold) {
+                            var timespec = std.posix.timespec{ .sec = 0, .nsec = this.coalesce_interval };
+                            if ((bun.sys.ppoll(fds[0..1], &timespec, null).unwrap() catch 0) > 0) {
+                                const rest = this.eventlist_bytes[read_eventlist_bytes.len..];
+                                bun.assert(rest.len > 0);
+                                switch (bun.sys.read(this.fd, rest)) {
+                                    .result => |rc2| {
+                                        read_eventlist_bytes.len += @intCast(rc2);
+                                        break :outer read_eventlist_bytes;
+                                    },
+                                    .err => |err| return .{ .err = err },
+                                }
+                            }
+                        }
+
+                        break :outer read_eventlist_bytes;
+                    },
+                    .err => |err| {
+                        if (err.getErrno() == .AGAIN) {
+                            continue :outer;
+                        }
+
+                        return .{ .err = err };
+                    },
+                }
+            }
+        }
     };
```
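The rewritten `read()` swaps the futex wait for a waker fd, so one `poll` call can block on both the inotify fd and a wake signal, and `stop()` only has to write to the waker. A self-contained sketch of that two-fd wait using a pipe as a stand-in for `bun.Async.Waker`:

```zig
const std = @import("std");

pub fn main() !void {
    // A pipe stands in for the waker: writing one byte wakes the poller.
    const pipe_fds = try std.posix.pipe();
    const wake_read = pipe_fds[0];
    const wake_write = pipe_fds[1];
    defer std.posix.close(wake_read);
    defer std.posix.close(wake_write);

    // Pretend another thread asked us to stop.
    _ = try std.posix.write(wake_write, "x");

    var fds = [_]std.posix.pollfd{
        // slot 0: the "event" fd we care about (stdin stands in for
        // the inotify fd)
        .{ .fd = std.posix.STDIN_FILENO, .events = std.posix.POLL.IN, .revents = 0 },
        // slot 1: the waker fd
        .{ .fd = wake_read, .events = std.posix.POLL.IN, .revents = 0 },
    };

    // Block until either fd is readable (-1 = no timeout).
    _ = try std.posix.poll(&fds, -1);

    if (fds[1].revents & std.posix.POLL.IN != 0) {
        // Consume the wake byte, then re-check the running flag.
        var byte: [1]u8 = undefined;
        _ = try std.posix.read(wake_read, &byte);
        std.debug.print("woken for shutdown\n", .{});
    }
    if (fds[0].revents & std.posix.POLL.IN != 0) {
        std.debug.print("events ready to read\n", .{});
    }
}
```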
```diff
@@ -213,11 +252,17 @@ pub fn read(this: *INotifyWatcher) bun.JSC.Maybe([]const *align(1) Event) {
         }
     }

+    // Clear read_ptr if we've processed all buffered events
+    this.read_ptr = null;
+
     return .{ .result = this.eventlist_ptrs[0..count] };
 }

 pub fn stop(this: *INotifyWatcher) void {
     log("{} stop", .{this.fd});
+    this.running.store(false, .release);
+    // Wake up any threads waiting in read()
+    this.waker.wake();
     if (this.fd != bun.invalid_fd) {
         this.fd.close();
         this.fd = bun.invalid_fd;
```
```diff
@@ -236,6 +281,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {

     // TODO: is this thread safe?
     var remaining_events = events.len;
+    var event_offset: usize = 0;

     const eventlist_index = this.watchlist.items(.eventlist_index);
```
```diff
@@ -244,7 +290,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
     var temp_name_list: [128]?[:0]u8 = undefined;
     var temp_name_off: u8 = 0;

-    const slice = events[0..@min(128, remaining_events, this.watch_events.len)];
+    const slice = events[event_offset..][0..@min(128, remaining_events, this.watch_events.len)];
     var watchevents = this.watch_events[0..slice.len];
     var watch_event_id: u32 = 0;
     for (slice) |event| {
```
```diff
@@ -297,6 +343,7 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.JSC.Maybe(void) {
     } else {
         break;
     }
+    event_offset += slice.len;
     remaining_events -= slice.len;
 }
```
```diff
@@ -319,7 +366,6 @@ const std = @import("std");
 const bun = @import("bun");
 const Environment = bun.Environment;
 const Output = bun.Output;
-const Futex = bun.Futex;
 const system = std.posix.system;
 const IN = std.os.linux.IN;
```