Files
bun.sh/src/bun.js/node/node_fs_watcher.zig
2023-07-27 03:34:18 -07:00

687 lines
23 KiB
Zig

const std = @import("std");
const JSC = @import("root").bun.JSC;
const bun = @import("root").bun;
const Fs = @import("../../fs.zig");
const Path = @import("../../resolver/resolve_path.zig");
const Encoder = JSC.WebCore.Encoder;
const Mutex = @import("../../lock.zig").Lock;
const FSEvents = @import("./fs_events.zig");
const PathWatcher = @import("./path_watcher.zig");
const VirtualMachine = JSC.VirtualMachine;
const EventLoop = JSC.EventLoop;
const PathLike = JSC.Node.PathLike;
const ArgumentsSlice = JSC.Node.ArgumentsSlice;
const Output = bun.Output;
const string = bun.string;
const StoredFileDescriptorType = bun.StoredFileDescriptorType;
const Environment = bun.Environment;
pub const FSWatcher = struct {
ctx: *VirtualMachine,
verbose: bool = false,
entry_path: ?string = null,
entry_dir: string = "",
// JSObject
mutex: Mutex,
signal: ?*JSC.AbortSignal,
persistent: bool,
default_watcher: ?*PathWatcher.PathWatcher,
fsevents_watcher: ?*FSEvents.FSEventsWatcher,
poll_ref: JSC.PollRef = .{},
globalThis: *JSC.JSGlobalObject,
js_this: JSC.JSValue,
encoding: JSC.Node.Encoding,
// user can call close and pre-detach so we need to track this
closed: bool,
// counts pending tasks so we only deinit after all tasks are done
task_count: u32,
has_pending_activity: std.atomic.Atomic(bool),
current_task: FSWatchTask = undefined,
pub usingnamespace JSC.Codegen.JSFSWatcher;
pub fn eventLoop(this: FSWatcher) *EventLoop {
return this.ctx.eventLoop();
}
pub fn enqueueTaskConcurrent(this: FSWatcher, task: *JSC.ConcurrentTask) void {
this.eventLoop().enqueueTaskConcurrent(task);
}
pub fn deinit(this: *FSWatcher) void {
// stop all managers and signals
this.detach();
if (this.entry_path) |path| {
this.entry_path = null;
bun.default_allocator.free(path);
}
bun.default_allocator.destroy(this);
}
pub const FSWatchTask = struct {
ctx: *FSWatcher,
count: u8 = 0,
entries: [8]Entry = undefined,
concurrent_task: JSC.ConcurrentTask = undefined,
pub const EventType = enum {
rename,
change,
@"error",
abort,
};
pub const Entry = struct {
file_path: string,
event_type: EventType,
needs_free: bool,
};
pub fn append(this: *FSWatchTask, file_path: string, event_type: EventType, needs_free: bool) void {
if (this.count == 8) {
this.enqueue();
var ctx = this.ctx;
this.* = .{
.ctx = ctx,
.count = 0,
};
}
this.entries[this.count] = .{
.file_path = file_path,
.event_type = event_type,
.needs_free = needs_free,
};
this.count += 1;
}
pub fn run(this: *FSWatchTask) void {
// this runs on JS Context Thread
for (this.entries[0..this.count]) |entry| {
switch (entry.event_type) {
.rename => {
this.ctx.emit(entry.file_path, "rename");
},
.change => {
this.ctx.emit(entry.file_path, "change");
},
.@"error" => {
// file_path is the error message in this case
this.ctx.emitError(entry.file_path);
},
.abort => {
this.ctx.emitIfAborted();
},
}
}
this.ctx.unrefTask();
}
pub fn enqueue(this: *FSWatchTask) void {
if (this.count == 0)
return;
// if false is closed or detached (can still contain valid refs but will not create a new one)
if (this.ctx.refTask()) {
var that = bun.default_allocator.create(FSWatchTask) catch unreachable;
that.* = this.*;
this.count = 0;
that.concurrent_task.task = JSC.Task.init(that);
this.ctx.enqueueTaskConcurrent(&that.concurrent_task);
return;
}
// closed or detached so just cleanEntries
this.cleanEntries();
}
pub fn cleanEntries(this: *FSWatchTask) void {
for (this.entries[0..this.count]) |entry| {
if (entry.needs_free) {
bun.default_allocator.free(entry.file_path);
}
}
this.count = 0;
}
pub fn deinit(this: *FSWatchTask) void {
this.cleanEntries();
bun.default_allocator.destroy(this);
}
};
pub fn onFSEventUpdate(
ctx: ?*anyopaque,
path: string,
is_file: bool,
is_rename: bool,
) void {
// only called by FSEventUpdate
const this = bun.cast(*FSWatcher, ctx.?);
const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable;
const event_type: FSWatchTask.EventType = if (is_rename) .rename else .change;
if (this.verbose) {
if (is_file) {
Output.prettyErrorln("<r> <d>File changed: {s}<r>", .{relative_path});
} else {
Output.prettyErrorln("<r> <d>Dir changed: {s}<r>", .{relative_path});
}
}
this.current_task.append(relative_path, event_type, true);
}
pub fn onPathUpdate(ctx: ?*anyopaque, path: string, is_file: bool, event_type: PathWatcher.PathWatcher.EventType) void {
// only called by PathWatcher
const this = bun.cast(*FSWatcher, ctx.?);
const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable;
if (this.verbose and event_type != .@"error") {
if (is_file) {
Output.prettyErrorln("<r> <d>File changed: {s}<r>", .{relative_path});
} else {
Output.prettyErrorln("<r> <d>Dir changed: {s}<r>", .{relative_path});
}
}
switch (event_type) {
.rename => {
this.current_task.append(relative_path, .rename, true);
},
.change => {
this.current_task.append(relative_path, .change, true);
},
else => {
this.current_task.append(relative_path, .@"error", true);
},
}
}
pub fn onUpdateEnd(ctx: ?*anyopaque) void {
const this = bun.cast(*FSWatcher, ctx.?);
if (this.verbose) {
Output.flush();
}
// we only enqueue after all events are processed
// this is called by FSEventsWatcher or PathWatcher
this.current_task.enqueue();
}
pub const Arguments = struct {
path: PathLike,
listener: JSC.JSValue,
global_this: JSC.C.JSContextRef,
signal: ?*JSC.AbortSignal,
persistent: bool,
recursive: bool,
encoding: JSC.Node.Encoding,
verbose: bool,
pub fn fromJS(ctx: JSC.C.JSContextRef, arguments: *ArgumentsSlice, exception: JSC.C.ExceptionRef) ?Arguments {
const vm = ctx.vm();
const path = PathLike.fromJS(ctx, arguments, exception) orelse {
if (exception.* == null) {
JSC.throwInvalidArguments(
"filename must be a string or TypedArray",
.{},
ctx,
exception,
);
}
return null;
};
if (exception.* != null) return null;
var listener: JSC.JSValue = .zero;
var signal: ?*JSC.AbortSignal = null;
var persistent: bool = true;
var recursive: bool = false;
var encoding: JSC.Node.Encoding = .utf8;
var verbose = false;
if (arguments.nextEat()) |options_or_callable| {
// options
if (options_or_callable.isObject()) {
if (options_or_callable.get(ctx, "persistent")) |persistent_| {
if (!persistent_.isBoolean()) {
JSC.throwInvalidArguments(
"persistent must be a boolean.",
.{},
ctx,
exception,
);
return null;
}
persistent = persistent_.toBoolean();
}
if (options_or_callable.get(ctx, "verbose")) |verbose_| {
if (!verbose_.isBoolean()) {
JSC.throwInvalidArguments(
"verbose must be a boolean.",
.{},
ctx,
exception,
);
return null;
}
verbose = verbose_.toBoolean();
}
if (options_or_callable.get(ctx, "encoding")) |encoding_| {
if (!encoding_.isString()) {
JSC.throwInvalidArguments(
"encoding must be a string.",
.{},
ctx,
exception,
);
return null;
}
if (JSC.Node.Encoding.fromJS(encoding_, ctx.ptr())) |node_encoding| {
encoding = node_encoding;
} else {
JSC.throwInvalidArguments(
"invalid encoding.",
.{},
ctx,
exception,
);
return null;
}
}
if (options_or_callable.get(ctx, "recursive")) |recursive_| {
if (!recursive_.isBoolean()) {
JSC.throwInvalidArguments(
"recursive must be a boolean.",
.{},
ctx,
exception,
);
return null;
}
recursive = recursive_.toBoolean();
}
// abort signal
if (options_or_callable.get(ctx, "signal")) |signal_| {
if (JSC.AbortSignal.fromJS(signal_)) |signal_obj| {
//Keep it alive
signal_.ensureStillAlive();
signal = signal_obj;
} else {
JSC.throwInvalidArguments(
"signal is not of type AbortSignal.",
.{},
ctx,
exception,
);
return null;
}
}
// listener
if (arguments.nextEat()) |callable| {
if (!callable.isCell() or !callable.isCallable(vm)) {
exception.* = JSC.toInvalidArguments("Expected \"listener\" callback to be a function", .{}, ctx).asObjectRef();
return null;
}
listener = callable;
}
} else {
if (!options_or_callable.isCell() or !options_or_callable.isCallable(vm)) {
exception.* = JSC.toInvalidArguments("Expected \"listener\" callback to be a function", .{}, ctx).asObjectRef();
return null;
}
listener = options_or_callable;
}
}
if (listener == .zero) {
exception.* = JSC.toInvalidArguments("Expected \"listener\" callback", .{}, ctx).asObjectRef();
return null;
}
return Arguments{
.path = path,
.listener = listener,
.global_this = ctx,
.signal = signal,
.persistent = persistent,
.recursive = recursive,
.encoding = encoding,
.verbose = verbose,
};
}
pub fn createFSWatcher(this: Arguments) !JSC.JSValue {
const obj = try FSWatcher.init(this);
if (obj.js_this != .zero) {
return obj.js_this;
}
return JSC.JSValue.jsUndefined();
}
};
pub fn initJS(this: *FSWatcher, listener: JSC.JSValue) void {
if (this.persistent) {
this.poll_ref.ref(this.ctx);
}
const js_this = FSWatcher.toJS(this, this.globalThis);
js_this.ensureStillAlive();
this.js_this = js_this;
FSWatcher.listenerSetCached(js_this, this.globalThis, listener);
if (this.signal) |s| {
// already aborted?
if (s.aborted()) {
// safely abort next tick
var current_task: FSWatchTask = .{
.ctx = this,
};
current_task.append("", .abort, false);
current_task.enqueue();
} else {
// watch for abortion
this.signal = s.listen(FSWatcher, this, FSWatcher.emitAbort);
}
}
}
pub fn emitIfAborted(this: *FSWatcher) void {
if (this.signal) |s| {
if (s.aborted()) {
const err = s.abortReason();
this.emitAbort(err);
}
}
}
pub fn emitAbort(this: *FSWatcher, err: JSC.JSValue) void {
if (this.closed) return;
defer this.close();
err.ensureStillAlive();
if (this.js_this != .zero) {
const js_this = this.js_this;
js_this.ensureStillAlive();
if (FSWatcher.listenerGetCached(js_this)) |listener| {
listener.ensureStillAlive();
var args = [_]JSC.JSValue{
JSC.ZigString.static("error").toValue(this.globalThis),
if (err.isEmptyOrUndefinedOrNull()) JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.globalThis) else err,
};
_ = listener.callWithGlobalThis(
this.globalThis,
&args,
);
}
}
}
pub fn emitError(this: *FSWatcher, err: string) void {
if (this.closed) return;
defer this.close();
if (this.js_this != .zero) {
const js_this = this.js_this;
js_this.ensureStillAlive();
if (FSWatcher.listenerGetCached(js_this)) |listener| {
listener.ensureStillAlive();
var args = [_]JSC.JSValue{
JSC.ZigString.static("error").toValue(this.globalThis),
JSC.ZigString.fromUTF8(err).toErrorInstance(this.globalThis),
};
_ = listener.callWithGlobalThis(
this.globalThis,
&args,
);
}
}
}
pub fn emit(this: *FSWatcher, file_name: string, comptime eventType: string) void {
if (this.js_this != .zero) {
const js_this = this.js_this;
js_this.ensureStillAlive();
if (FSWatcher.listenerGetCached(js_this)) |listener| {
listener.ensureStillAlive();
var filename: JSC.JSValue = JSC.JSValue.jsUndefined();
if (file_name.len > 0) {
if (this.encoding == .buffer)
filename = JSC.ArrayBuffer.createBuffer(this.globalThis, file_name)
else if (this.encoding == .utf8) {
filename = JSC.ZigString.fromUTF8(file_name).toValueGC(this.globalThis);
} else {
// convert to desired encoding
filename = Encoder.toStringAtRuntime(file_name.ptr, file_name.len, this.globalThis, this.encoding);
}
}
var args = [_]JSC.JSValue{
JSC.ZigString.static(eventType).toValue(this.globalThis),
filename,
};
_ = listener.callWithGlobalThis(
this.globalThis,
&args,
);
}
}
}
pub fn doRef(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
if (!this.closed and !this.persistent) {
this.persistent = true;
this.poll_ref.ref(this.ctx);
}
return JSC.JSValue.jsUndefined();
}
pub fn doUnref(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
if (this.persistent) {
this.persistent = false;
this.poll_ref.unref(this.ctx);
}
return JSC.JSValue.jsUndefined();
}
pub fn hasRef(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
return JSC.JSValue.jsBoolean(this.persistent);
}
// this can be called from Watcher Thread or JS Context Thread
pub fn refTask(this: *FSWatcher) bool {
this.mutex.lock();
defer this.mutex.unlock();
// stop new references
if (this.closed) return false;
this.task_count += 1;
return true;
}
pub fn hasPendingActivity(this: *FSWatcher) callconv(.C) bool {
@fence(.Acquire);
return this.has_pending_activity.load(.Acquire);
}
// only called from Main Thread
pub fn updateHasPendingActivity(this: *FSWatcher) void {
@fence(.Release);
this.has_pending_activity.store(false, .Release);
}
// unref is always called on main JS Context Thread
pub fn unrefTask(this: *FSWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
this.task_count -= 1;
if (this.closed and this.task_count == 0) {
this.updateHasPendingActivity();
}
}
pub fn close(
this: *FSWatcher,
) void {
this.mutex.lock();
if (!this.closed) {
this.closed = true;
// emit should only be called unlocked
this.mutex.unlock();
this.emit("", "close");
// we immediately detach here
this.detach();
// no need to lock again, because ref checks closed and unref is only called on main thread
if (this.task_count == 0) {
this.updateHasPendingActivity();
}
} else {
this.mutex.unlock();
}
}
// this can be called multiple times
pub fn detach(this: *FSWatcher) void {
if (this.signal) |signal| {
this.signal = null;
signal.detach(this);
}
if (this.default_watcher) |default_watcher| {
this.default_watcher = null;
default_watcher.deinit();
}
if (this.fsevents_watcher) |fsevents_watcher| {
this.fsevents_watcher = null;
fsevents_watcher.deinit();
}
if (this.persistent) {
this.persistent = false;
this.poll_ref.unref(this.ctx);
}
this.js_this = .zero;
}
pub fn doClose(this: *FSWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
this.close();
return JSC.JSValue.jsUndefined();
}
pub fn finalize(this: *FSWatcher) callconv(.C) void {
this.deinit();
}
const PathResult = struct {
fd: StoredFileDescriptorType = 0,
is_file: bool = true,
};
// TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages
fn fdFromAbsolutePathZ(
absolute_path_z: [:0]const u8,
) !PathResult {
if (std.fs.openIterableDirAbsoluteZ(absolute_path_z, .{
.access_sub_paths = true,
})) |iterable_dir| {
return PathResult{
.fd = iterable_dir.dir.fd,
.is_file = false,
};
} else |err| {
if (err == error.NotDir) {
var file = try std.fs.openFileAbsoluteZ(absolute_path_z, .{ .mode = .read_only });
return PathResult{
.fd = file.handle,
.is_file = true,
};
} else {
return err;
}
}
unreachable;
}
pub fn init(args: Arguments) !*FSWatcher {
var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
var slice = args.path.slice();
if (bun.strings.startsWith(slice, "file://")) {
slice = slice[6..];
}
var parts = [_]string{
slice,
};
var file_path = Path.joinAbsStringBuf(
Fs.FileSystem.instance.top_level_dir,
&buf,
&parts,
.auto,
);
buf[file_path.len] = 0;
var file_path_z = buf[0..file_path.len :0];
var fs_type = try fdFromAbsolutePathZ(file_path_z);
var ctx = try bun.default_allocator.create(FSWatcher);
const vm = args.global_this.bunVM();
ctx.* = .{
.ctx = vm,
.current_task = .{
.ctx = ctx,
.count = 0,
},
.mutex = Mutex.init(),
.signal = if (args.signal) |s| s.ref() else null,
.persistent = args.persistent,
.default_watcher = null,
.fsevents_watcher = null,
.globalThis = args.global_this,
.js_this = .zero,
.encoding = args.encoding,
.closed = false,
.task_count = 0,
.has_pending_activity = std.atomic.Atomic(bool).init(true),
.verbose = args.verbose,
};
errdefer ctx.deinit();
if (comptime Environment.isMac) {
if (!fs_type.is_file) {
var dir_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable;
ctx.entry_path = dir_path_clone;
ctx.entry_dir = dir_path_clone;
ctx.fsevents_watcher = try FSEvents.watch(dir_path_clone, args.recursive, onFSEventUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
ctx.initJS(args.listener);
return ctx;
}
}
var file_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable;
ctx.entry_path = file_path_clone;
ctx.entry_dir = std.fs.path.dirname(file_path_clone) orelse file_path_clone;
ctx.default_watcher = try PathWatcher.watch(vm, file_path_clone, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
ctx.initJS(args.listener);
return ctx;
}
};