diff --git a/src/watcher/INotifyWatcher.zig b/src/watcher/INotifyWatcher.zig
index 8781b37e95..65b634c566 100644
--- a/src/watcher/INotifyWatcher.zig
+++ b/src/watcher/INotifyWatcher.zig
@@ -199,7 +199,7 @@ pub fn read(this: *INotifyWatcher) bun.sys.Maybe([]const *align(1) Event) {
             event.watch_descriptor,
             event.cookie,
             event.mask,
-            bun.fmt.quote(event.name()),
+            bun.fmt.quote(if (event.name_len > 0) event.name() else ""),
         });
 
     // when under high load with short file paths, it is very easy to
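The `read()` fix above matters because an inotify record only carries a trailing name for events on entries inside a watched directory: `name_len` can be zero, in which case there are no name bytes to read and calling `event.name()` walks past the header. A sketch of that record framing, in TypeScript for illustration only (the field names mirror `struct inotify_event`; the parser is hypothetical, not Bun's code):

```ts
// Sketch of inotify's record framing: a fixed 16-byte header (wd, mask,
// cookie, len) optionally followed by `len` bytes of NUL-padded name.
interface InotifyRecord {
  wd: number;
  mask: number;
  cookie: number;
  name: string | null;
}

function parseInotifyBuffer(buf: ArrayBuffer): InotifyRecord[] {
  const view = new DataView(buf);
  const records: InotifyRecord[] = [];
  let off = 0;
  while (off + 16 <= buf.byteLength) {
    const wd = view.getInt32(off, true);
    const mask = view.getUint32(off + 4, true);
    const cookie = view.getUint32(off + 8, true);
    const len = view.getUint32(off + 12, true);
    // len == 0 means the record has no name field at all; decoding past the
    // header would read into the next record -- the bug the fix guards against.
    const name =
      len > 0
        ? new TextDecoder().decode(new Uint8Array(buf, off + 16, len)).replace(/\0+$/, "")
        : null;
    records.push({ wd, mask, cookie, name });
    off += 16 + len;
  }
  return records;
}
```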
@@ -229,76 +229,130 @@ pub fn stop(this: *INotifyWatcher) void {
 pub fn watchLoopCycle(this: *bun.Watcher) bun.sys.Maybe(void) {
     defer Output.flush();
-    var events = switch (this.platform.read()) {
+    const events = switch (this.platform.read()) {
         .result => |result| result,
         .err => |err| return .{ .err = err },
     };
     if (events.len == 0) return .success;
 
-    // TODO: is this thread safe?
-    var remaining_events = events.len;
-
     const eventlist_index = this.watchlist.items(.eventlist_index);
 
-    while (remaining_events > 0) {
+    var event_id: usize = 0;
+    var events_processed: usize = 0;
+
+    while (events_processed < events.len) {
         var name_off: u8 = 0;
         var temp_name_list: [128]?[:0]u8 = undefined;
         var temp_name_off: u8 = 0;
 
-        const slice = events[0..@min(128, remaining_events, this.watch_events.len)];
-        var watchevents = this.watch_events[0..slice.len];
-        var watch_event_id: u32 = 0;
-        for (slice) |event| {
-            watchevents[watch_event_id] = watchEventFromInotifyEvent(
+        // Process events one by one, batching when we hit limits
+        while (events_processed < events.len) {
+            const event = events[events_processed];
+
+            // Check if we're about to exceed the watch_events array capacity
+            if (event_id >= this.watch_events.len) {
+                // Process current batch of events
+                switch (processINotifyEventBatch(this, event_id, temp_name_list[0..temp_name_off])) {
+                    .err => |err| return .{ .err = err },
+                    .result => {},
+                }
+                // Reset event_id to start a new batch
+                event_id = 0;
+                name_off = 0;
+                temp_name_off = 0;
+            }
+
+            // Check if we can fit this event's name in temp_name_list
+            const will_have_name = event.name_len > 0;
+            if (will_have_name and temp_name_off >= temp_name_list.len) {
+                // Process current batch and start a new one
+                if (event_id > 0) {
+                    switch (processINotifyEventBatch(this, event_id, temp_name_list[0..temp_name_off])) {
+                        .err => |err| return .{ .err = err },
+                        .result => {},
+                    }
+                    event_id = 0;
+                    name_off = 0;
+                    temp_name_off = 0;
+                }
+            }
+
+            this.watch_events[event_id] = watchEventFromInotifyEvent(
                 event,
                 @intCast(std.mem.indexOfScalar(
                     EventListIndex,
                     eventlist_index,
                     event.watch_descriptor,
-                ) orelse continue),
+                ) orelse {
+                    events_processed += 1;
+                    continue;
+                }),
             );
-            temp_name_list[temp_name_off] = if (event.name_len > 0)
-                event.name()
-            else
-                null;
-            watchevents[watch_event_id].name_off = temp_name_off;
-            watchevents[watch_event_id].name_len = @as(u8, @intFromBool((event.name_len > 0)));
-            temp_name_off += @as(u8, @intFromBool((event.name_len > 0)));
-            watch_event_id += 1;
+            // Safely handle event names with bounds checking
+            if (event.name_len > 0 and temp_name_off < temp_name_list.len) {
+                temp_name_list[temp_name_off] = event.name();
+                this.watch_events[event_id].name_off = temp_name_off;
+                this.watch_events[event_id].name_len = 1;
+                temp_name_off += 1;
+            } else {
+                this.watch_events[event_id].name_off = temp_name_off;
+                this.watch_events[event_id].name_len = 0;
+            }
+
+            event_id += 1;
+            events_processed += 1;
         }
 
-        var all_events = watchevents[0..watch_event_id];
-        std.sort.pdq(WatchEvent, all_events, {}, WatchEvent.sortByIndex);
+        // Process any remaining events in the final batch
+        if (event_id > 0) {
+            switch (processINotifyEventBatch(this, event_id, temp_name_list[0..temp_name_off])) {
+                .err => |err| return .{ .err = err },
+                .result => {},
+            }
+        }
+        break;
+    }
 
-        var last_event_index: usize = 0;
-        var last_event_id: EventListIndex = std.math.maxInt(EventListIndex);
+    return .success;
+}
 
-        for (all_events, 0..) |_, i| {
-            if (all_events[i].name_len > 0) {
+fn processINotifyEventBatch(this: *bun.Watcher, event_count: usize, temp_name_list: []?[:0]u8) bun.sys.Maybe(void) {
+    if (event_count == 0) {
+        return .success;
+    }
+
+    var name_off: u8 = 0;
+    var all_events = this.watch_events[0..event_count];
+    std.sort.pdq(WatchEvent, all_events, {}, WatchEvent.sortByIndex);
+
+    var last_event_index: usize = 0;
+    var last_event_id: EventListIndex = std.math.maxInt(EventListIndex);
+
+    for (all_events, 0..) |_, i| {
+        if (all_events[i].name_len > 0) {
+            // Check bounds before accessing arrays
+            if (name_off < this.changed_filepaths.len and all_events[i].name_off < temp_name_list.len) {
                 this.changed_filepaths[name_off] = temp_name_list[all_events[i].name_off];
                 all_events[i].name_off = name_off;
                 name_off += 1;
             }
-
-            if (all_events[i].index == last_event_id) {
-                all_events[last_event_index].merge(all_events[i]);
-                continue;
-            }
-            last_event_index = i;
-            last_event_id = all_events[i].index;
         }
-        if (all_events.len == 0) return .success;
-        this.mutex.lock();
-        defer this.mutex.unlock();
-        if (this.running) {
-            // all_events.len == 0 is checked above, so last_event_index + 1 is safe
-            this.onFileUpdate(this.ctx, all_events[0 .. last_event_index + 1], this.changed_filepaths[0..name_off], this.watchlist);
-        } else {
-            break;
+        if (all_events[i].index == last_event_id) {
+            all_events[last_event_index].merge(all_events[i]);
+            continue;
         }
-        remaining_events -= slice.len;
+        last_event_index = i;
+        last_event_id = all_events[i].index;
+    }
+    if (all_events.len == 0) return .success;
+
+    this.mutex.lock();
+    defer this.mutex.unlock();
+    if (this.running) {
+        // all_events.len == 0 is checked above, so last_event_index + 1 is safe
+        this.onFileUpdate(this.ctx, all_events[0 .. last_event_index + 1], this.changed_filepaths[0..name_off], this.watchlist);
     }
 
     return .success;
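Both platform watchers now follow the same shape: fill a fixed-capacity scratch array, flush it through the batch processor before the next event would overflow, and flush the final partial batch at the end. This is what lets a single `read()` returning more than 128 events be delivered completely instead of truncated. A minimal TypeScript sketch of the pattern, with `capacity` standing in for `watch_events.len` and `flush` for `processINotifyEventBatch`/`processWatchEventBatch` (names illustrative):

```ts
// Minimal sketch of the capacity-bounded batching used by watchLoopCycle.
function processInBatches<T>(
  items: readonly T[],
  capacity: number,
  flush: (batch: T[]) => void,
): void {
  let batch: T[] = [];
  for (const item of items) {
    // Flush before overflowing the fixed-size buffer, then start a new batch.
    if (batch.length >= capacity) {
      flush(batch);
      batch = [];
    }
    batch.push(item);
  }
  // Flush whatever remains in the final, possibly partial batch.
  if (batch.length > 0) flush(batch);
}
```

In the inotify path the same flush is also triggered when the 128-slot name buffer (`temp_name_list`) fills up, so a batch can be cut short even before `watch_events` itself is full.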
diff --git a/src/watcher/WindowsWatcher.zig b/src/watcher/WindowsWatcher.zig
index b387097662..5b2267f7ac 100644
--- a/src/watcher/WindowsWatcher.zig
+++ b/src/watcher/WindowsWatcher.zig
@@ -238,18 +238,43 @@ pub fn watchLoopCycle(this: *bun.Watcher) bun.sys.Maybe(void) {
                 // skip unrelated items
                 if (rel == .unrelated) continue;
                 // if the event is for a parent dir of the item, only emit it if it's a delete or rename
+
+                // Check if we're about to exceed the watch_events array capacity
+                if (event_id >= this.watch_events.len) {
+                    // Process current batch of events
+                    switch (processWatchEventBatch(this, event_id)) {
+                        .err => |err| return .{ .err = err },
+                        .result => {},
+                    }
+                    // Reset event_id to start a new batch
+                    event_id = 0;
+                }
+
                 this.watch_events[event_id] = createWatchEvent(event, @truncate(item_idx));
                 event_id += 1;
             }
         }
     }
 
-    if (event_id == 0) {
+
+    // Process any remaining events in the final batch
+    if (event_id > 0) {
+        switch (processWatchEventBatch(this, event_id)) {
+            .err => |err| return .{ .err = err },
+            .result => {},
+        }
+    }
+
+    return .success;
+}
+
+fn processWatchEventBatch(this: *bun.Watcher, event_count: usize) bun.sys.Maybe(void) {
+    if (event_count == 0) {
         return .success;
     }
-    // log("event_id: {d}\n", .{event_id});
+    // log("event_count: {d}\n", .{event_count});
 
-    var all_events = this.watch_events[0..event_id];
+    var all_events = this.watch_events[0..event_count];
     std.sort.pdq(WatchEvent, all_events, {}, WatchEvent.sortByIndex);
 
     var last_event_index: usize = 0;
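`processWatchEventBatch` (like its inotify counterpart) keeps the original post-processing: sort the batch by watchlist index, then fold consecutive events that hit the same item into one, so each watched file yields a single callback entry per batch. A TypeScript sketch of that coalescing pass; combining via a flag union is an assumption standing in for what `WatchEvent.merge` does:

```ts
// Sketch of the sort-then-merge pass; `index` mirrors WatchEvent.index and
// the flag OR is an assumed stand-in for WatchEvent.merge.
interface Ev {
  index: number;
  flags: number;
}

function coalesceByIndex(events: Ev[]): Ev[] {
  events.sort((a, b) => a.index - b.index); // same role as WatchEvent.sortByIndex
  const merged: Ev[] = [];
  for (const ev of events) {
    const last = merged[merged.length - 1];
    if (last !== undefined && last.index === ev.index) {
      last.flags |= ev.flags; // duplicate index: fold into the previous event
    } else {
      merged.push({ ...ev });
    }
  }
  return merged;
}
```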
diff --git a/test/cli/hot/watch-many-dirs.test.ts b/test/cli/hot/watch-many-dirs.test.ts
new file mode 100644
index 0000000000..dad90166ad
--- /dev/null
+++ b/test/cli/hot/watch-many-dirs.test.ts
@@ -0,0 +1,101 @@
+import { spawn } from "bun";
+import { describe, expect, test } from "bun:test";
+import { bunEnv, bunExe, forEachLine, isASAN, isCI, tempDirWithFiles } from "harness";
+import { mkdirSync, writeFileSync } from "node:fs";
+import { join } from "node:path";
+
+describe("--hot with many directories", () => {
+  // TODO: fix watcher thread exit handling so the main thread waits for the
+  // watcher thread to exit. The missing wait causes a crash inside the libc
+  // exit() function, which ASAN catches.
+  test.skipIf(isCI && isASAN)(
+    "handles 129 directories being updated simultaneously",
+    async () => {
+      // Create initial test structure
+      const tmpdir = tempDirWithFiles("hot-many-dirs", {
+        "entry.js": `console.log('Initial load');`,
+      });
+
+      // Generate 129 directories with files
+      const dirCount = 129;
+      const maxCount = 3;
+      for (let i = 0; i < dirCount; i++) {
+        const dirName = `dir-${i.toString().padStart(4, "0")}`;
+        const dirPath = join(tmpdir, dirName);
+        mkdirSync(dirPath, { recursive: true });
+
+        // Create an index.js in each directory
+        writeFileSync(join(dirPath, "index.js"), `export const value${i} = ${i};`);
+      }
+
+      // Create the entry file that imports all of the directories
+      const imports = Array.from({ length: dirCount }, (_, i) => {
+        const dirName = `dir-${i.toString().padStart(4, "0")}`;
+        return `import * as dir${i} from './${dirName}/index.js';`;
+      }).join("\n");
+
+      writeFileSync(
+        join(tmpdir, "entry.js"),
+        `
+${imports}
+console.log('Loaded', ${dirCount}, 'directories');
+(globalThis.reloaded ??= 0);
+if (globalThis.reloaded++ >= ${maxCount}) process.exit(0);
+`,
+      );
+
+      // Start bun --hot
+      await using proc = spawn({
+        cmd: [bunExe(), "--hot", "entry.js"],
+        cwd: tmpdir,
+        env: bunEnv,
+        stdout: "pipe",
+        stderr: "inherit",
+      });
+
+      const stdout = proc.stdout;
+
+      const iter = forEachLine(stdout);
+
+      // Wait for initial load
+      let { value: line } = await iter.next();
+      expect(line).toContain(`Loaded ${dirCount} directories`);
+
+      // Trigger maxCount reload cycles
+      let reloadCount = 0;
+
+      for (let cycle = 0; cycle < maxCount; cycle++) {
+        // Update all files simultaneously
+        const timestamp = Date.now() + cycle;
+        const updatePromises = [];
+
+        for (let i = 0; i < dirCount; i++) {
+          const dirName = `dir-${i.toString().padStart(4, "0")}`;
+          const filePath = join(tmpdir, dirName, "index.js");
+          updatePromises.push(
+            Bun.write(filePath, `export const value${i} = ${i};\nexport const timestamp${i} = ${timestamp};`),
+          );
+        }
+
+        // Wait for all updates to complete
+        await Promise.all(updatePromises);
+
+        // Wait for the reload message
+        ({ value: line } = await iter.next());
+        expect(line).toContain(`Loaded ${dirCount} directories`);
+        reloadCount++;
+      }
+
+      // Verify we got maxCount successful reloads
+      expect(reloadCount).toBe(maxCount);
+
+      // Wait for the process to exit on its own after maxCount reloads
+      const exitCode = await proc.exited;
+
+      // Should exit with 0
+      expect(exitCode).toBe(0);
+    },
+    30000,
+  ); // 30 second timeout
+});
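A note on the test's directory count: 129 is a boundary value. The inotify path batches through fixed 128-slot scratch buffers (`temp_name_list` above is `[128]?[:0]u8`), so updating 129 watched directories in one cycle forces at least one mid-read batch flush, which is exactly the code path this change introduces. A sketch of that reasoning; treating 128 as the effective capacity is an assumption about the current buffer sizes, not a stable constant of Bun's watcher:

```ts
// Assumed capacity, mirroring the 128-slot scratch buffers in the watcher.
const BATCH_CAPACITY = 128;

// The smallest update burst that cannot fit in a single batch, so the new
// flush-and-continue path must run at least once per watch cycle.
const dirCount = BATCH_CAPACITY + 1; // 129, matching the test above
```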