From 60823348c5b27309ff9bd86ae3f02527dabea6de Mon Sep 17 00:00:00 2001 From: "taylor.fish" Date: Fri, 25 Jul 2025 18:12:12 -0700 Subject: [PATCH] Optimize `WaitGroup` implementation (#21260) `add` no longer locks a mutex, and `finish` no longer locks a mutex except for the last task. This could meaningfully improve performance in cases where we spawn a large number of tasks on a thread pool. This change doesn't alter the semantics of the type, unlike the standard library's `WaitGroup`, which also uses atomics but has to be explicitly reset. (For internal tracking: fixes ENG-19722) --------- Co-authored-by: Jarred Sumner --- scripts/sort-imports.ts | 39 ++++++++++++++++++++++++++----------- src/threading/WaitGroup.zig | 39 +++++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/scripts/sort-imports.ts b/scripts/sort-imports.ts index b1c05bfef8..b83c34cdc1 100755 --- a/scripts/sort-imports.ts +++ b/scripts/sort-imports.ts @@ -102,6 +102,10 @@ function parseDeclarations( continue; } + if (declarations.has(name)) { + unusedLineIndices.push(i); + continue; + } declarations.set(name, { whole: line, index: i, @@ -341,20 +345,33 @@ async function processFile(filePath: string): Promise { } } if (thisDeclaration) { - sortedLines[thisDeclaration.index] = DELETED_LINE; - } - if (thisDeclaration) { - let firstNonFileCommentLine = 0; - for (const line of sortedLines) { - if (line.startsWith("//!")) { - firstNonFileCommentLine++; - } else { + var onlyCommentsBeforeThis = true; + for (const [i, line] of sortedLines.entries()) { + if (i >= thisDeclaration.index) { + break; + } + if (line === "" || line === DELETED_LINE) { + continue; + } + if (!line.startsWith("//")) { + onlyCommentsBeforeThis = false; break; } } - const insert = [thisDeclaration.whole, ""]; - if (firstNonFileCommentLine > 0) insert.unshift(""); - sortedLines.splice(firstNonFileCommentLine, 0, ...insert); + if (!onlyCommentsBeforeThis) { + sortedLines[thisDeclaration.index] = DELETED_LINE; + let firstNonFileCommentLine = 0; + for (const line of sortedLines) { + if (line.startsWith("//!")) { + firstNonFileCommentLine++; + } else { + break; + } + } + const insert = [thisDeclaration.whole, ""]; + if (firstNonFileCommentLine > 0) insert.unshift(""); + sortedLines.splice(firstNonFileCommentLine, 0, ...insert); + } } fileContents = sortedLines.join("\n"); } diff --git a/src/threading/WaitGroup.zig b/src/threading/WaitGroup.zig index 580ac6dc1f..eeca160f0c 100644 --- a/src/threading/WaitGroup.zig +++ b/src/threading/WaitGroup.zig @@ -1,5 +1,3 @@ -const Self = @This(); - // This file contains code derived from the following source: // https://gist.github.com/kprotty/0d2dc3da4840341d6ff361b27bdac7dc#file-sync-zig // @@ -10,41 +8,47 @@ const Self = @This(); // The MIT license requires this copyright notice to be included in all copies // and substantial portions of the software. +const Self = @This(); + +raw_count: std.atomic.Value(usize) = .init(0), mutex: Mutex = .{}, cond: Condition = .{}, -active: usize = 0, pub fn init() Self { return .{}; } pub fn initWithCount(count: usize) Self { - return .{ .active = count }; + return .{ .raw_count = .init(count) }; } pub fn addUnsynchronized(self: *Self, n: usize) void { - self.active += n; + self.raw_count.raw += n; } pub fn add(self: *Self, n: usize) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - self.addUnsynchronized(n); + // Not .acquire because we don't need to synchronize with other tasks (each runs independently). + // Not .release because there are no side effects that other threads depend on when they see + // the *start* of a task (only finishing a task has such requirements). + _ = self.raw_count.fetchAdd(n, .monotonic); } pub fn addOne(self: *Self) void { - return self.add(1); + self.add(1); } pub fn finish(self: *Self) void { - { - self.mutex.lock(); - defer self.mutex.unlock(); + const old_count = self.raw_count.fetchSub(1, .acq_rel); + if (old_count > 1) return; - self.active -= 1; - if (self.active != 0) return; - } + // This is the last task, so we need to signal the condition. If we were to call `cond.signal` + // right now, a concurrent call to `wait` which has read a non-zero count (from before we + // decremented it above) but which has not yet called `cond.wait` will miss the signal and + // end up blocking forever. A thread in this state (in between reading the count and calling + // `cond.wait`) is necessarily holding the mutex, so by locking and unlocking the mutex here, + // we ensure that it reaches the call to `cond.wait` before we call `cond.signal`. + self.mutex.lock(); + self.mutex.unlock(); self.cond.signal(); } @@ -52,11 +56,12 @@ pub fn wait(self: *Self) void { self.mutex.lock(); defer self.mutex.unlock(); - while (self.active != 0) + while (self.raw_count.load(.acquire) > 0) self.cond.wait(&self.mutex); } const bun = @import("bun"); +const std = @import("std"); const Condition = bun.threading.Condition; const Mutex = bun.threading.Mutex;