From 3b2289d76c3c5eb0cfc20bec3b3198370d64da13 Mon Sep 17 00:00:00 2001 From: "taylor.fish" Date: Tue, 15 Jul 2025 01:16:40 -0700 Subject: [PATCH] Sync `Mutex` and `Futex` with upstream and port `std.Thread.Condition` (#21060) Co-authored-by: Jarred Sumner --- cmake/sources/ZigSources.txt | 6 +- src/bun.zig | 6 +- src/cli/create_command.zig | 2 +- src/thread_pool.zig | 2 +- src/threading.zig | 3 + src/threading/Condition.zig | 276 +++++++++++++++++++++++++ src/{futex.zig => threading/Futex.zig} | 81 ++++++-- src/{ => threading}/Mutex.zig | 21 +- 8 files changed, 367 insertions(+), 30 deletions(-) create mode 100644 src/threading.zig create mode 100644 src/threading/Condition.zig rename src/{futex.zig => threading/Futex.zig} (80%) rename src/{ => threading}/Mutex.zig (94%) diff --git a/cmake/sources/ZigSources.txt b/cmake/sources/ZigSources.txt index 2261399b49..a7ce947952 100644 --- a/cmake/sources/ZigSources.txt +++ b/cmake/sources/ZigSources.txt @@ -510,7 +510,6 @@ src/feature_flags.zig src/fmt.zig src/fs.zig src/fs/stat_hash.zig -src/futex.zig src/generated_perf_trace_events.zig src/generated_versions_list.zig src/glob.zig @@ -641,7 +640,6 @@ src/main_wasm.zig src/main.zig src/meta.zig src/multi_array_list.zig -src/Mutex.zig src/napi/napi.zig src/node_fallbacks.zig src/open.zig @@ -836,6 +834,10 @@ src/system_timer.zig src/test/fixtures.zig src/test/recover.zig src/thread_pool.zig +src/threading.zig +src/threading/Condition.zig +src/threading/Futex.zig +src/threading/Mutex.zig src/tmp.zig src/toml/toml_lexer.zig src/toml/toml_parser.zig diff --git a/src/bun.zig b/src/bun.zig index a39e0cb9b9..be32d13b87 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -1847,7 +1847,9 @@ pub const Loader = bundle_v2.Loader; pub const BundleV2 = bundle_v2.BundleV2; pub const ParseTask = bundle_v2.ParseTask; -pub const Mutex = @import("./Mutex.zig"); +pub const threading = @import("./threading.zig"); +pub const Mutex = threading.Mutex; +pub const Futex = threading.Futex; pub const 
UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue; pub fn threadlocalAllocator() std.mem.Allocator { @@ -2975,8 +2977,6 @@ pub fn SliceIterator(comptime T: type) type { }; } -pub const Futex = @import("./futex.zig"); - // TODO: migrate pub const ArenaAllocator = std.heap.ArenaAllocator; diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index 38c158716f..56678e94ea 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -32,7 +32,7 @@ const clap = bun.clap; const Headers = bun.http.Headers; const CopyFile = @import("../copy_file.zig"); var bun_path_buf: bun.PathBuffer = undefined; -const Futex = @import("../futex.zig"); +const Futex = bun.Futex; const target_nextjs_version = "12.2.3"; pub var initialized_store = false; diff --git a/src/thread_pool.zig b/src/thread_pool.zig index de16fddf77..0e9ab1394c 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -4,7 +4,7 @@ const std = @import("std"); const bun = @import("bun"); const ThreadPool = @This(); -const Futex = @import("./futex.zig"); +const Futex = bun.threading.Futex; const Environment = bun.Environment; const assert = bun.assert; diff --git a/src/threading.zig b/src/threading.zig new file mode 100644 index 0000000000..df1ba2bd2a --- /dev/null +++ b/src/threading.zig @@ -0,0 +1,3 @@ +pub const Mutex = @import("./threading/Mutex.zig"); +pub const Futex = @import("./threading/Futex.zig"); +pub const Condition = @import("./threading/Condition.zig"); diff --git a/src/threading/Condition.zig b/src/threading/Condition.zig new file mode 100644 index 0000000000..c07a033b44 --- /dev/null +++ b/src/threading/Condition.zig @@ -0,0 +1,276 @@ +//! Copy of std.Thread.Condition, but uses Bun's Mutex and Futex. +//! Synchronized with std as of Zig 0.14.1. +//! +//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur. +//! 
It does this by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex. +//! Condition can be statically initialized and is at most `@sizeOf(u64)` large. +//! +//! Example: +//! ``` +//! var m = Mutex{}; +//! var c = Condition{}; +//! var predicate = false; +//! +//! fn consumer() void { +//! m.lock(); +//! defer m.unlock(); +//! +//! while (!predicate) { +//! c.wait(&m); +//! } +//! } +//! +//! fn producer() void { +//! { +//! m.lock(); +//! defer m.unlock(); +//! predicate = true; +//! } +//! c.signal(); +//! } +//! +//! const thread = try std.Thread.spawn(.{}, producer, .{}); +//! consumer(); +//! thread.join(); +//! ``` +//! +//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex. +//! This means that the following is allowed to deadlock: +//! ``` +//! thread-1: mutex.lock() +//! thread-1: condition.wait(&mutex) +//! +//! thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1) +//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called while holding the mutex) +//! thread-2: condition.signal() +//! ``` + +const std = @import("std"); +const builtin = @import("builtin"); +const bun = @import("bun"); +const Condition = @This(); +const Mutex = bun.Mutex; + +const os = std.os; +const assert = bun.assert; +const Futex = bun.Futex; + +impl: Impl = .{}, + +/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return. +/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex. +/// +/// The Mutex must be locked by the caller's thread when this function is called. +/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite. +/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently. 
+/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex. +/// +/// A blocking call to wait() is unblocked from one of the following conditions: +/// - a spurious ("at random") wake up occurs +/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`. +/// +/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously +/// irrespective of any notifications from `signal()` or `broadcast()`. +pub fn wait(self: *Condition, mutex: *Mutex) void { + self.impl.wait(mutex, null) catch |err| switch (err) { + error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out + }; +} + +/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return. +/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex. +/// +/// The Mutex must be locked by the caller's thread when this function is called. +/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite. +/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently. +/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex. +/// +/// A blocking call to `timedWait()` is unblocked from one of the following conditions: +/// - a spurious ("at random") wake occurs +/// - the caller was blocked for around `timeout_ns` nanoseconds, in which `error.Timeout` is returned. +/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`. +/// +/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously +/// irrespective of any notifications from `signal()` or `broadcast()`. 
+pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void { + return self.impl.wait(mutex, timeout_ns); +} + +/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex. +/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking. +/// `signal()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads. +pub fn signal(self: *Condition) void { + self.impl.wake(.one); +} + +/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex. +/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking. +/// `broadcast()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads. +pub fn broadcast(self: *Condition) void { + self.impl.wake(.all); +} + +const Impl = if (builtin.os.tag == .windows) + WindowsImpl +else + FutexImpl; + +const Notify = enum { + one, // wake up only one thread + all, // wake up all threads +}; + +const WindowsImpl = struct { + condition: os.windows.CONDITION_VARIABLE = .{}, + + fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { + var timeout_overflowed = false; + var timeout_ms: os.windows.DWORD = os.windows.INFINITE; + + if (timeout) |timeout_ns| { + // Round the nanoseconds to the nearest millisecond, + // then saturating cast it to windows DWORD for use in kernel32 call. + const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms; + timeout_ms = std.math.cast(os.windows.DWORD, ms) orelse std.math.maxInt(os.windows.DWORD); + + // Track if the timeout overflowed into INFINITE and make sure not to wait forever. 
+ if (timeout_ms == os.windows.INFINITE) { + timeout_overflowed = true; + timeout_ms -= 1; + } + } + + if (builtin.mode == .Debug) { + // The internal state of the DebugMutex needs to be handled here as well. + mutex.impl.locking_thread.store(0, .unordered); + } + const rc = os.windows.kernel32.SleepConditionVariableSRW( + &self.condition, + if (builtin.mode == .Debug) &mutex.impl.impl.srwlock else &mutex.impl.srwlock, + timeout_ms, + 0, // the srwlock was assumed to be acquired in exclusive mode not shared + ); + if (builtin.mode == .Debug) { + // The internal state of the DebugMutex needs to be handled here as well. + mutex.impl.locking_thread.store(std.Thread.getCurrentId(), .unordered); + } + + // Return error.Timeout if we know the timeout elapsed correctly. + if (rc == os.windows.FALSE) { + assert(os.windows.GetLastError() == .TIMEOUT); + if (!timeout_overflowed) return error.Timeout; + } + } + + fn wake(self: *Impl, comptime notify: Notify) void { + switch (notify) { + .one => os.windows.kernel32.WakeConditionVariable(&self.condition), + .all => os.windows.kernel32.WakeAllConditionVariable(&self.condition), + } + } +}; + +const FutexImpl = struct { + state: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + epoch: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + + const one_waiter = 1; + const waiter_mask = 0xffff; + + const one_signal = 1 << 16; + const signal_mask = 0xffff << 16; + + fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { + // Observe the epoch, then check the state again to see if we should wake up. 
+ // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: + // + // - T1: s = LOAD(&state) + // - T2: UPDATE(&s, signal) + // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) + // - T1: e = LOAD(&epoch) (was reordered after the state load) + // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) + // + // Acquire barrier to ensure the epoch load happens before the state load. + var epoch = self.epoch.load(.acquire); + var state = self.state.fetchAdd(one_waiter, .monotonic); + assert(state & waiter_mask != waiter_mask); + state += one_waiter; + + mutex.unlock(); + defer mutex.lock(); + + var futex_deadline = Futex.Deadline.init(timeout); + + while (true) { + futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) { + // On timeout, we must decrement the waiter we added above. + error.Timeout => { + while (true) { + // If there's a signal when we're timing out, consume it and report being woken up instead. + // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. + while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = self.state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; + } + + // Remove the waiter we added and officially return timed out. + const new_state = state - one_waiter; + state = self.state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err; + } + }, + }; + + epoch = self.epoch.load(.acquire); + state = self.state.load(.monotonic); + + // Try to wake up by consuming a signal and decremented the waiter we added previously. + // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. 
+ while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = self.state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; + } + } + } + + fn wake(self: *Impl, comptime notify: Notify) void { + var state = self.state.load(.monotonic); + while (true) { + const waiters = (state & waiter_mask) / one_waiter; + const signals = (state & signal_mask) / one_signal; + + // Reserves which waiters to wake up by incrementing the signals count. + // Therefore, the signals count is always less than or equal to the waiters count. + // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters. + const wakeable = waiters - signals; + if (wakeable == 0) { + return; + } + + const to_wake = switch (notify) { + .one => 1, + .all => wakeable, + }; + + // Reserve the amount of waiters to wake by incrementing the signals count. + // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads. + const new_state = state + (one_signal * to_wake); + state = self.state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse { + // Wake up the waiting threads we reserved above by changing the epoch value. + // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it. + // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption. + // + // Release barrier ensures the signal being added to the state happens before the epoch is changed. 
+ // If not, the waiting thread could potentially deadlock from missing both the state and epoch change: + // + // - T2: UPDATE(&epoch, 1) (reordered before the state change) + // - T1: e = LOAD(&epoch) + // - T1: s = LOAD(&state) + // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) + // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) + _ = self.epoch.fetchAdd(1, .release); + Futex.wake(&self.epoch, to_wake); + return; + }; + } + } +}; diff --git a/src/futex.zig b/src/threading/Futex.zig similarity index 80% rename from src/futex.zig rename to src/threading/Futex.zig index 76ae3ab0ff..22655e3267 100644 --- a/src/futex.zig +++ b/src/threading/Futex.zig @@ -1,4 +1,6 @@ //! This is a copy-pasta of std.Thread.Futex, except without `unreachable` +//! Synchronized with std as of Zig 0.14.1 +//! //! A mechanism used to block (`wait`) and unblock (`wake`) threads using a //! 32bit memory address as hints. //! @@ -87,7 +89,8 @@ const UnsupportedImpl = struct { return unsupported(.{ ptr, max_waiters }); } - fn unsupported(_: anytype) noreturn { + fn unsupported(unused: anytype) noreturn { + _ = unused; @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag)); } }; @@ -161,7 +164,10 @@ const DarwinImpl = struct { var timeout_overflowed = false; const addr: *const anyopaque = ptr; - const flags: c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true }; + const flags: c.UL = .{ + .op = .COMPARE_AND_WAIT, + .NO_ERRNO = true, + }; const status = blk: { if (supports_ulock_wait2) { break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0); @@ -193,8 +199,11 @@ const DarwinImpl = struct { } fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { - var flags: c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true }; - if (max_waiters > 1) flags.WAKE_ALL = true; + const flags: c.UL = .{ + .op = .COMPARE_AND_WAIT, + .NO_ERRNO = true, + .WAKE_ALL = max_waiters > 1, + }; while (true) { const addr: *const 
anyopaque = ptr; @@ -215,13 +224,11 @@ const DarwinImpl = struct { // https://man7.org/linux/man-pages/man2/futex.2.html const LinuxImpl = struct { fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { - const ts: linux.timespec = if (timeout) |timeout_ns| - .{ - .sec = @intCast(timeout_ns / std.time.ns_per_s), - .nsec = @intCast(timeout_ns % std.time.ns_per_s), - } - else - undefined; + var ts: linux.timespec = undefined; + if (timeout) |timeout_ns| { + ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s)); + ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s)); + } const rc = linux.futex_wait( @as(*const i32, @ptrCast(&ptr.raw)), @@ -266,7 +273,7 @@ const WasmImpl = struct { @compileError("WASI target missing cpu feature 'atomics'"); } const to: i64 = if (timeout) |to| @intCast(to) else -1; - const result = asm ( + const result = asm volatile ( \\local.get %[ptr] \\local.get %[expected] \\local.get %[timeout] @@ -290,7 +297,7 @@ const WasmImpl = struct { @compileError("WASI target missing cpu feature 'atomics'"); } assert(max_waiters != 0); - const woken_count = asm ( + const woken_count = asm volatile ( \\local.get %[ptr] \\local.get %[waiters] \\memory.atomic.notify 0 @@ -302,3 +309,51 @@ const WasmImpl = struct { _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled } }; + +/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout. +/// +/// Futex's timedWait() api uses a relative duration which suffers from over-waiting +/// when used in a loop which is often required due to the possibility of spurious wakeups. +/// +/// Deadline instead converts the relative timeout to an absolute one so that multiple calls +/// to Futex timedWait() can block for and report more accurate error.Timeouts. 
+pub const Deadline = struct { + timeout: ?u64, + started: std.time.Timer, + + /// Create the deadline to expire after the given amount of time in nanoseconds passes. + /// Pass in `null` to have the deadline call `Futex.wait()` and never expire. + pub fn init(expires_in_ns: ?u64) Deadline { + var deadline: Deadline = undefined; + deadline.timeout = expires_in_ns; + + // std.time.Timer is required to be supported for somewhat accurate reportings of error.Timeout. + if (deadline.timeout != null) { + deadline.started = std.time.Timer.start() catch unreachable; + } + + return deadline; + } + + /// Wait until either: + /// - the `ptr`'s value changes from `expect`. + /// - `Futex.wake()` is called on the `ptr`. + /// - A spurious wake occurs. + /// - The deadline expires; In which case `error.Timeout` is returned. + pub fn wait(self: *Deadline, ptr: *const atomic.Value(u32), expect: u32) error{Timeout}!void { + @branchHint(.cold); + + // Check if we actually have a timeout to wait until. + // If not just wait "forever". + const timeout_ns = self.timeout orelse { + return Futex.waitForever(ptr, expect); + }; + + // Get how much time has passed since we started waiting + // then subtract that from the init() timeout to get how much longer to wait. + // Use overflow to detect when we've been waiting longer than the init() timeout. + const elapsed_ns = self.started.read(); + const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0; + return Futex.wait(ptr, expect, until_timeout_ns); + } +}; diff --git a/src/Mutex.zig b/src/threading/Mutex.zig similarity index 94% rename from src/Mutex.zig rename to src/threading/Mutex.zig index ea36c70996..9c53158bbc 100644 --- a/src/Mutex.zig +++ b/src/threading/Mutex.zig @@ -1,6 +1,8 @@ //! This is a copy-pasta of std.Thread.Mutex with some changes. //! - No assert with unreachable //! - uses bun.Futex instead of std.Thread.Futex +//! Synchronized with std as of Zig 0.14.1 +//! //! 
Mutex is a synchronization primitive which enforces atomic access to a shared region of code known as the "critical section". //! It does this by blocking ensuring only one thread is in the critical section at any given point in time by blocking the others. //! Mutex can be statically initialized and is at most `@sizeOf(u64)` large. @@ -56,13 +58,12 @@ const Impl = if (builtin.mode == .Debug and !builtin.single_threaded) else ReleaseImpl; -pub const ReleaseImpl = - if (builtin.os.tag == .windows) - WindowsImpl - else if (builtin.os.tag.isDarwin()) - DarwinImpl - else - FutexImpl; +pub const ReleaseImpl = if (builtin.os.tag == .windows) + WindowsImpl +else if (builtin.os.tag.isDarwin()) + DarwinImpl +else + FutexImpl; pub const ExternImpl = ReleaseImpl.Type; @@ -94,8 +95,8 @@ const DebugImpl = struct { } }; -// SRWLOCK on windows is almost always faster than Futex solution. -// It also implements an efficient Condition with requeue support for us. +/// SRWLOCK on windows is almost always faster than Futex solution. +/// It also implements an efficient Condition with requeue support for us. const WindowsImpl = struct { srwlock: Type = .{}, @@ -116,7 +117,7 @@ const WindowsImpl = struct { pub const Type = windows.SRWLOCK; }; -// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions. +/// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions. const DarwinImpl = struct { oul: Type = .{},