From 2465ccae53ef430cfd9d07c2161011d77aa46139 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Wed, 8 Jan 2025 21:09:27 -0800 Subject: [PATCH] Re-sync our Mutex implementation with zig stdlib (#16271) --- src/Mutex.zig | 211 ++++++++++ src/Progress.zig | 5 +- src/StandaloneModuleGraph.zig | 2 +- src/allocators.zig | 2 +- src/allocators/mimalloc_arena.zig | 2 +- src/bake/DevServer.zig | 2 +- src/bun.js/ConsoleObject.zig | 4 +- src/bun.js/api/bun/dns_resolver.zig | 2 +- src/bun.js/bindings/exports.zig | 2 +- src/bun.js/event_loop.zig | 2 +- src/bun.js/javascript.zig | 8 +- src/bun.js/module_loader.zig | 2 +- src/bun.js/node/fs_events.zig | 2 +- src/bun.js/node/node_fs.zig | 2 +- src/bun.js/node/node_fs_stat_watcher.zig | 2 +- src/bun.js/node/node_fs_watcher.zig | 2 +- src/bun.js/node/path_watcher.zig | 2 +- src/bun.js/node/win_watcher.zig | 2 +- src/bun.js/rare_data.zig | 4 +- src/bun.js/uuid.zig | 2 +- src/bun.js/webcore/ObjectURLRegistry.zig | 2 +- src/bun.js/webcore/response.zig | 2 +- src/bun.zig | 7 +- src/bundler/bundle_v2.zig | 4 +- src/cache.zig | 1 - src/cli/create_command.zig | 2 +- src/cli/install_completions_command.zig | 2 +- src/cli/upgrade_command.zig | 2 +- src/crash_handler.zig | 11 +- src/deps/uws.zig | 2 +- src/fs.zig | 2 +- src/futex.zig | 508 +++++++++-------------- src/http.zig | 6 +- src/http/zlib.zig | 2 +- src/install/install.zig | 4 +- src/install/lockfile.zig | 2 +- src/js_ast.zig | 3 - src/js_printer.zig | 2 +- src/lock.zig | 150 ------- src/napi/napi.zig | 4 +- src/output.zig | 2 +- src/resolver/resolver.zig | 2 +- src/s3/download_stream.zig | 2 +- src/shell/interpreter.zig | 6 +- src/shell/shell.zig | 2 +- src/thread_pool.zig | 53 +-- src/transpiler.zig | 2 +- src/watcher.zig | 9 +- 48 files changed, 484 insertions(+), 574 deletions(-) create mode 100644 src/Mutex.zig delete mode 100644 src/lock.zig diff --git a/src/Mutex.zig b/src/Mutex.zig new file mode 100644 index 0000000000..f344fd2f5e --- /dev/null +++ b/src/Mutex.zig @@ -0,0 +1,211 @@ +//! This is a copy-pasta of std.Thread.Mutex with some changes. +//! - No assert with unreachable +//! - uses bun.Futex instead of std.Thread.Futex +//! Mutex is a synchronization primitive which enforces atomic access to a shared region of code known as the "critical section". +//! It does this by blocking ensuring only one thread is in the critical section at any given point in time by blocking the others. +//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large. +//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it. +//! +//! Example: +//! ``` +//! var m = Mutex{}; +//! +//! { +//! m.lock(); +//! defer m.unlock(); +//! // ... critical section code +//! } +//! +//! if (m.tryLock()) { +//! defer m.unlock(); +//! // ... critical section code +//! } +//! ``` + +const std = @import("std"); +const builtin = @import("builtin"); +const bun = @import("root").bun; +const assert = bun.assert; +const testing = std.testing; +const Thread = std.Thread; +const Futex = bun.Futex; + +impl: Impl = .{}, + +/// Tries to acquire the mutex without blocking the caller's thread. +/// Returns `false` if the calling thread would have to block to acquire it. +/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it. +pub fn tryLock(self: *Mutex) bool { + return self.impl.tryLock(); +} + +/// Acquires the mutex, blocking the caller's thread until it can. +/// It is undefined behavior if the mutex is already held by the caller's thread. 
+/// Once acquired, call `unlock()` on the Mutex to release it. +pub fn lock(self: *Mutex) void { + self.impl.lock(); +} + +/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`. +/// It is undefined behavior if the mutex is unlocked from a different thread that it was locked from. +pub fn unlock(self: *Mutex) void { + self.impl.unlock(); +} + +const Impl = if (builtin.mode == .Debug and !builtin.single_threaded) + DebugImpl +else + ReleaseImpl; + +const ReleaseImpl = + if (builtin.os.tag == .windows) + WindowsImpl +else if (builtin.os.tag.isDarwin()) + DarwinImpl +else + FutexImpl; + +const DebugImpl = struct { + locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked. + impl: ReleaseImpl = .{}, + + inline fn tryLock(self: *@This()) bool { + const locking = self.impl.tryLock(); + if (locking) { + self.locking_thread.store(Thread.getCurrentId(), .unordered); + } + return locking; + } + + inline fn lock(self: *@This()) void { + const current_id = Thread.getCurrentId(); + if (self.locking_thread.load(.unordered) == current_id and current_id != 0) { + @panic("Deadlock detected"); + } + self.impl.lock(); + self.locking_thread.store(current_id, .unordered); + } + + inline fn unlock(self: *@This()) void { + assert(self.locking_thread.load(.unordered) == Thread.getCurrentId()); + self.locking_thread.store(0, .unordered); + self.impl.unlock(); + } +}; + +// SRWLOCK on windows is almost always faster than Futex solution. +// It also implements an efficient Condition with requeue support for us. +const WindowsImpl = struct { + srwlock: windows.SRWLOCK = .{}, + + fn tryLock(self: *@This()) bool { + return windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE; + } + + fn lock(self: *@This()) void { + windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); + } + + fn unlock(self: *@This()) void { + windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock); + } + + const windows = std.os.windows; +}; + +// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions. +const DarwinImpl = struct { + oul: c.os_unfair_lock = .{}, + + fn tryLock(self: *@This()) bool { + return c.os_unfair_lock_trylock(&self.oul); + } + + fn lock(self: *@This()) void { + c.os_unfair_lock_lock(&self.oul); + } + + fn unlock(self: *@This()) void { + c.os_unfair_lock_unlock(&self.oul); + } + + const c = std.c; +}; + +const FutexImpl = struct { + state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked), + + const unlocked: u32 = 0b00; + const locked: u32 = 0b01; + const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below + + fn lock(self: *@This()) void { + if (!self.tryLock()) + self.lockSlow(); + } + + fn tryLock(self: *@This()) bool { + // On x86, use `lock bts` instead of `lock cmpxchg` as: + // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048 + // - `lock bts` is smaller instruction-wise which makes it better for inlining + if (comptime builtin.target.cpu.arch.isX86()) { + const locked_bit = @ctz(locked); + return self.state.bitSet(locked_bit, .acquire) == 0; + } + + // Acquire barrier ensures grabbing the lock happens before the critical section + // and that the previous lock holder's critical section happens before we grab the lock. 
+ return self.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null; + } + + fn lockSlow(self: *@This()) void { + @setCold(true); + + // Avoid doing an atomic swap below if we already know the state is contended. + // An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily. + if (self.state.load(.monotonic) == contended) { + Futex.waitForever(&self.state, contended); + } + + // Try to acquire the lock while also telling the existing lock holder that there are threads waiting. + // + // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`. + // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock. + // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake + // but this is better than having to wake all waiting threads on mutex unlock. + // + // Acquire barrier ensures grabbing the lock happens before the critical section + // and that the previous lock holder's critical section happens before we grab the lock. + while (self.state.swap(contended, .acquire) != unlocked) { + Futex.waitForever(&self.state, contended); + } + } + + fn unlock(self: *@This()) void { + // Unlock the mutex and wake up a waiting thread if any. + // + // A waiting thread will acquire with `contended` instead of `locked` + // which ensures that it wakes up another thread on the next unlock(). + // + // Release barrier ensures the critical section happens before we let go of the lock + // and that our critical section happens before the next lock holder grabs the lock. + const state = self.state.swap(unlocked, .release); + assert(state != unlocked); + + if (state == contended) { + Futex.wake(&self.state, 1); + } + } +}; + +const Mutex = @This(); + +pub fn spinCycle() void {} + +export fn Bun__lock(ptr: *Mutex) void { + ptr.lock(); +} + +export fn Bun__unlock(ptr: *Mutex) void { + ptr.unlock(); +} diff --git a/src/Progress.zig b/src/Progress.zig index 822feb7576..9fbbe12a77 100644 --- a/src/Progress.zig +++ b/src/Progress.zig @@ -18,8 +18,9 @@ const std = @import("std"); const builtin = @import("builtin"); const windows = std.os.windows; const testing = std.testing; -const assert = (std.debug).assert; +const assert = bun.assert; const Progress = @This(); +const bun = @import("root").bun; /// `null` if the current node (and its children) should /// not print on update() @@ -64,7 +65,7 @@ done: bool = true, /// Protects the `refresh` function, as well as `node.recently_updated_child`. /// Without this, callsites would call `Node.end` and then free `Node` memory /// while it was still being accessed by the `refresh` function. -update_mutex: std.Thread.Mutex = .{}, +update_mutex: bun.Mutex = .{}, /// Keeps track of how many columns in the terminal have been output, so that /// we can move the cursor back later. 
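Most of the files below make the same mechanical substitution as Progress.zig above: `std.Thread.Mutex`, `bun.Lock`, and `@import("./lock.zig").Lock` all become `bun.Mutex`. A minimal sketch of what a converted call site looks like; the `Counter` struct and `increment` function are illustrative and not taken from this patch, only the `bun.Mutex` API is:

const bun = @import("root").bun;

const Counter = struct {
    // Previously `std.Thread.Mutex = .{}` or `bun.Lock = .{}`. bun.Mutex keeps the
    // same lock()/tryLock()/unlock() surface and, via DebugImpl, panics on a
    // same-thread re-lock in debug builds.
    mutex: bun.Mutex = .{},
    value: u64 = 0,

    fn increment(self: *Counter) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.value += 1;
    }
};
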
diff --git a/src/StandaloneModuleGraph.zig b/src/StandaloneModuleGraph.zig index f067674855..aae3c32a17 100644 --- a/src/StandaloneModuleGraph.zig +++ b/src/StandaloneModuleGraph.zig @@ -168,7 +168,7 @@ pub const StandaloneModuleGraph = struct { none, /// It probably is not possible to run two decoding jobs on the same file - var init_lock: bun.Lock = .{}; + var init_lock: bun.Mutex = .{}; pub fn load(this: *LazySourceMap) ?*SourceMap.ParsedSourceMap { init_lock.lock(); diff --git a/src/allocators.zig b/src/allocators.zig index 32b3e5fd8a..a71791466a 100644 --- a/src/allocators.zig +++ b/src/allocators.zig @@ -267,7 +267,7 @@ pub fn BSSList(comptime ValueType: type, comptime _count: anytype) type { }; } -const Mutex = @import("./lock.zig").Lock; +const Mutex = bun.Mutex; /// Append-only list. /// Stores an initial count in .bss section of the object file diff --git a/src/allocators/mimalloc_arena.zig b/src/allocators/mimalloc_arena.zig index 48d0a30acc..d249ee6f6e 100644 --- a/src/allocators/mimalloc_arena.zig +++ b/src/allocators/mimalloc_arena.zig @@ -76,7 +76,7 @@ pub const GlobalArena = struct { const ArenaRegistry = struct { arenas: std.AutoArrayHashMap(?*mimalloc.Heap, std.Thread.Id) = std.AutoArrayHashMap(?*mimalloc.Heap, std.Thread.Id).init(bun.default_allocator), - mutex: std.Thread.Mutex = .{}, + mutex: bun.Mutex = .{}, var registry = ArenaRegistry{}; diff --git a/src/bake/DevServer.zig b/src/bake/DevServer.zig index 09c6cbf997..096220b9c7 100644 --- a/src/bake/DevServer.zig +++ b/src/bake/DevServer.zig @@ -4446,7 +4446,7 @@ pub const EntryPointList = struct { const std = @import("std"); const Allocator = std.mem.Allocator; -const Mutex = std.Thread.Mutex; +const Mutex = bun.Mutex; const ArrayListUnmanaged = std.ArrayListUnmanaged; const AutoArrayHashMapUnmanaged = std.AutoArrayHashMapUnmanaged; diff --git a/src/bun.js/ConsoleObject.zig b/src/bun.js/ConsoleObject.zig index 727c707559..cadd785ed9 100644 --- a/src/bun.js/ConsoleObject.zig +++ b/src/bun.js/ConsoleObject.zig @@ -72,8 +72,8 @@ pub const MessageType = enum(u32) { _, }; -var stderr_mutex: bun.Lock = .{}; -var stdout_mutex: bun.Lock = .{}; +var stderr_mutex: bun.Mutex = .{}; +var stdout_mutex: bun.Mutex = .{}; threadlocal var stderr_lock_count: u16 = 0; threadlocal var stdout_lock_count: u16 = 0; diff --git a/src/bun.js/api/bun/dns_resolver.zig b/src/bun.js/api/bun/dns_resolver.zig index 44a36a9e02..4151f720e6 100644 --- a/src/bun.js/api/bun/dns_resolver.zig +++ b/src/bun.js/api/bun/dns_resolver.zig @@ -1239,7 +1239,7 @@ pub const InternalDNS = struct { const GlobalCache = struct { const MAX_ENTRIES = 256; - lock: bun.Lock = .{}, + lock: bun.Mutex = .{}, cache: [MAX_ENTRIES]*Request = undefined, len: usize = 0, diff --git a/src/bun.js/bindings/exports.zig b/src/bun.js/bindings/exports.zig index 2fead91daa..a6b716c523 100644 --- a/src/bun.js/bindings/exports.zig +++ b/src/bun.js/bindings/exports.zig @@ -373,7 +373,7 @@ pub const Process = extern struct { pub const shim = Shimmer("Bun", "Process", @This()); pub const name = "Process"; pub const namespace = shim.namespace; - var title_mutex = std.Thread.Mutex{}; + var title_mutex = bun.Mutex{}; pub fn getTitle(_: *JSGlobalObject, title: *ZigString) callconv(.C) void { title_mutex.lock(); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index d31753a19e..232be9a227 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -3,7 +3,7 @@ const JSC = bun.JSC; const JSGlobalObject = JSC.JSGlobalObject; const VirtualMachine = JSC.VirtualMachine; 
const Allocator = std.mem.Allocator; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const bun = @import("root").bun; const Environment = bun.Environment; const Fetch = JSC.WebCore.Fetch; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 57bd18ade7..13b08f72bd 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -92,7 +92,7 @@ const TaggedPointerUnion = @import("../tagged_pointer.zig").TaggedPointerUnion; const Task = JSC.Task; pub const Buffer = MarkedArrayBuffer; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const BuildMessage = JSC.BuildMessage; const ResolveMessage = JSC.ResolveMessage; const Async = bun.Async; @@ -123,7 +123,7 @@ const uv = bun.windows.libuv; pub const SavedSourceMap = struct { /// This is a pointer to the map located on the VirtualMachine struct map: *HashTable, - mutex: bun.Lock = .{}, + mutex: bun.Mutex = .{}, pub const vlq_offset = 24; @@ -1604,7 +1604,7 @@ pub const VirtualMachine = struct { Debugger.log("spin", .{}); while (futex_atomic.load(.monotonic) > 0) { - std.Thread.Futex.wait(&futex_atomic, 1); + bun.Futex.waitForever(&futex_atomic, 1); } if (comptime Environment.enable_logs) Debugger.log("waitForDebugger: {}", .{Output.ElapsedFormatter{ @@ -1768,7 +1768,7 @@ pub const VirtualMachine = struct { log("wake", .{}); futex_atomic.store(0, .monotonic); - std.Thread.Futex.wake(&futex_atomic, 1); + bun.Futex.wake(&futex_atomic, 1); other_vm.eventLoop().wakeup(); diff --git a/src/bun.js/module_loader.zig b/src/bun.js/module_loader.zig index db12727318..fe982165b3 100644 --- a/src/bun.js/module_loader.zig +++ b/src/bun.js/module_loader.zig @@ -120,7 +120,7 @@ fn dumpSourceStringFailiable(vm: *VirtualMachine, specifier: string, written: [] const BunDebugHolder = struct { pub var dir: ?std.fs.Dir = null; - pub var lock: bun.Lock = .{}; + pub var lock: bun.Mutex = .{}; }; BunDebugHolder.lock.lock(); diff --git a/src/bun.js/node/fs_events.zig b/src/bun.js/node/fs_events.zig index fd019882fb..9953527a6e 100644 --- a/src/bun.js/node/fs_events.zig +++ b/src/bun.js/node/fs_events.zig @@ -1,7 +1,7 @@ const std = @import("std"); const bun = @import("root").bun; const Environment = bun.Environment; -const Mutex = @import("../../lock.zig").Lock; +const Mutex = bun.Mutex; const sync = @import("../../sync.zig"); const Semaphore = sync.Semaphore; const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig index 8086084209..d0500f31cb 100644 --- a/src/bun.js/node/node_fs.zig +++ b/src/bun.js/node/node_fs.zig @@ -950,7 +950,7 @@ pub const AsyncReaddirRecursiveTask = struct { root_path: PathString = PathString.empty, pending_err: ?Syscall.Error = null, - pending_err_mutex: bun.Lock = .{}, + pending_err_mutex: bun.Mutex = .{}, pub usingnamespace bun.New(@This()); diff --git a/src/bun.js/node/node_fs_stat_watcher.zig b/src/bun.js/node/node_fs_stat_watcher.zig index 55418ffca8..8071bfbc2c 100644 --- a/src/bun.js/node/node_fs_stat_watcher.zig +++ b/src/bun.js/node/node_fs_stat_watcher.zig @@ -4,7 +4,7 @@ const bun = @import("root").bun; const Fs = @import("../../fs.zig"); const Path = @import("../../resolver/resolve_path.zig"); const Encoder = JSC.WebCore.Encoder; -const Mutex = @import("../../lock.zig").Lock; +const Mutex = bun.Mutex; const uws = @import("../../deps/uws.zig"); const PathWatcher = @import("./path_watcher.zig"); diff --git a/src/bun.js/node/node_fs_watcher.zig b/src/bun.js/node/node_fs_watcher.zig index 
f2ccb8258c..05cf2b4987 100644 --- a/src/bun.js/node/node_fs_watcher.zig +++ b/src/bun.js/node/node_fs_watcher.zig @@ -4,7 +4,7 @@ const bun = @import("root").bun; const Fs = @import("../../fs.zig"); const Path = @import("../../resolver/resolve_path.zig"); const Encoder = JSC.WebCore.Encoder; -const Mutex = @import("../../lock.zig").Lock; +const Mutex = bun.Mutex; const VirtualMachine = JSC.VirtualMachine; const EventLoop = JSC.EventLoop; diff --git a/src/bun.js/node/path_watcher.zig b/src/bun.js/node/path_watcher.zig index c5045fc542..4dd3aae178 100644 --- a/src/bun.js/node/path_watcher.zig +++ b/src/bun.js/node/path_watcher.zig @@ -3,7 +3,7 @@ const std = @import("std"); const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; const Path = @import("../../resolver/resolve_path.zig"); const Fs = @import("../../fs.zig"); -const Mutex = @import("../../lock.zig").Lock; +const Mutex = bun.Mutex; const FSEvents = @import("./fs_events.zig"); const bun = @import("root").bun; diff --git a/src/bun.js/node/win_watcher.zig b/src/bun.js/node/win_watcher.zig index 9469ccc62a..93a96faf4a 100644 --- a/src/bun.js/node/win_watcher.zig +++ b/src/bun.js/node/win_watcher.zig @@ -4,7 +4,7 @@ const windows = bun.windows; const uv = windows.libuv; const Path = @import("../../resolver/resolve_path.zig"); const Fs = @import("../../fs.zig"); -const Mutex = @import("../../lock.zig").Lock; +const Mutex = bun.Mutex; const string = bun.string; const JSC = bun.JSC; const VirtualMachine = JSC.VirtualMachine; diff --git a/src/bun.js/rare_data.zig b/src/bun.js/rare_data.zig index 5bad28c278..3cc1880157 100644 --- a/src/bun.js/rare_data.zig +++ b/src/bun.js/rare_data.zig @@ -44,7 +44,7 @@ mime_types: ?bun.http.MimeType.Map = null, node_fs_stat_watcher_scheduler: ?*StatWatcherScheduler = null, listening_sockets_for_watch_mode: std.ArrayListUnmanaged(bun.FileDescriptor) = .{}, -listening_sockets_for_watch_mode_lock: bun.Lock = .{}, +listening_sockets_for_watch_mode_lock: bun.Mutex = .{}, temp_pipe_read_buffer: ?*PipeReadBuffer = null, @@ -55,7 +55,7 @@ const DIGESTED_HMAC_256_LEN = 32; pub const AWSSignatureCache = struct { cache: bun.StringArrayHashMap([DIGESTED_HMAC_256_LEN]u8) = bun.StringArrayHashMap([DIGESTED_HMAC_256_LEN]u8).init(bun.default_allocator), date: u64 = 0, - lock: bun.Lock = .{}, + lock: bun.Mutex = .{}, pub fn clean(this: *@This()) void { for (this.cache.keys()) |cached_key| { diff --git a/src/bun.js/uuid.zig b/src/bun.js/uuid.zig index 0d59acd532..428e12b5cc 100644 --- a/src/bun.js/uuid.zig +++ b/src/bun.js/uuid.zig @@ -140,7 +140,7 @@ pub fn newV4() UUID { pub const UUID7 = struct { bytes: [16]u8, - var uuid_v7_lock = bun.Lock{}; + var uuid_v7_lock = bun.Mutex{}; var uuid_v7_last_timestamp: std.atomic.Value(u64) = .{ .raw = 0 }; var uuid_v7_counter: std.atomic.Value(u32) = .{ .raw = 0 }; diff --git a/src/bun.js/webcore/ObjectURLRegistry.zig b/src/bun.js/webcore/ObjectURLRegistry.zig index 846c1c0434..5bf2ed7803 100644 --- a/src/bun.js/webcore/ObjectURLRegistry.zig +++ b/src/bun.js/webcore/ObjectURLRegistry.zig @@ -5,7 +5,7 @@ const UUID = bun.UUID; const assert = bun.assert; const ObjectURLRegistry = @This(); -lock: bun.Lock = .{}, +lock: bun.Mutex = .{}, map: std.AutoHashMap(UUID, *RegistryEntry) = std.AutoHashMap(UUID, *RegistryEntry).init(bun.default_allocator), pub const RegistryEntry = struct { diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 341e8c01aa..1e1341081f 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ 
-41,7 +41,7 @@ const JSPrinter = bun.js_printer; const picohttp = bun.picohttp; const StringJoiner = bun.StringJoiner; const uws = bun.uws; -const Mutex = @import("../../lock.zig").Lock; +const Mutex = bun.Mutex; const InlineBlob = JSC.WebCore.InlineBlob; const AnyBlob = JSC.WebCore.AnyBlob; diff --git a/src/bun.zig b/src/bun.zig index 0dd0dfdfad..d3351ed033 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -1963,7 +1963,8 @@ pub const bundle_v2 = @import("./bundler/bundle_v2.zig"); pub const BundleV2 = bundle_v2.BundleV2; pub const ParseTask = bundle_v2.ParseTask; -pub const Lock = @import("./lock.zig").Lock; +pub const Lock = @compileError("Use bun.Mutex instead"); +pub const Mutex = @import("./Mutex.zig"); pub const UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue; pub fn threadlocalAllocator() std.mem.Allocator { @@ -3439,7 +3440,7 @@ pub fn selfExePath() ![:0]u8 { 4096 + 1 // + 1 for the null terminator ]u8 = undefined; var len: usize = 0; - var lock: Lock = .{}; + var lock: Mutex = .{}; pub fn load() ![:0]u8 { const init = try std.fs.selfExePath(&value); @@ -4075,7 +4076,7 @@ pub fn Once(comptime f: anytype) type { done: bool = false, payload: Return = undefined, - mutex: std.Thread.Mutex = .{}, + mutex: bun.Mutex = .{}, /// Call the function `f`. /// If `call` is invoked multiple times `f` will be executed only the diff --git a/src/bundler/bundle_v2.zig b/src/bundler/bundle_v2.zig index 6cd32b15dd..55ce5fe86e 100644 --- a/src/bundler/bundle_v2.zig +++ b/src/bundler/bundle_v2.zig @@ -93,7 +93,7 @@ const OOM = bun.OOM; const HTMLScanner = @import("../HTMLScanner.zig"); const Router = @import("../router.zig"); const isPackagePath = _resolver.isPackagePath; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const NodeFallbackModules = @import("../node_fallbacks.zig"); const CacheEntry = @import("../cache.zig").Fs.Entry; const Analytics = @import("../analytics/analytics_thread.zig"); @@ -141,7 +141,7 @@ fn tracer(comptime src: std.builtin.SourceLocation, comptime name: [:0]const u8) pub const ThreadPool = struct { pool: *ThreadPoolLib = undefined, workers_assignments: std.AutoArrayHashMap(std.Thread.Id, *Worker) = std.AutoArrayHashMap(std.Thread.Id, *Worker).init(bun.default_allocator), - workers_assignments_lock: bun.Lock = .{}, + workers_assignments_lock: bun.Mutex = .{}, v2: *BundleV2 = undefined, diff --git a/src/cache.zig b/src/cache.zig index ee164f1258..96ecf3484c 100644 --- a/src/cache.zig +++ b/src/cache.zig @@ -20,7 +20,6 @@ const Define = @import("./defines.zig").Define; const std = @import("std"); const fs = @import("./fs.zig"); const sync = @import("sync.zig"); -const Mutex = @import("./lock.zig").Lock; const import_record = @import("./import_record.zig"); diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index d611d8e259..2723e33f5a 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -38,7 +38,7 @@ const DotEnv = @import("../env_loader.zig"); const NPMClient = @import("../which_npm_client.zig").NPMClient; const which = @import("../which.zig").which; const clap = bun.clap; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const Headers = bun.http.Headers; const CopyFile = @import("../copy_file.zig"); var bun_path_buf: bun.PathBuffer = undefined; diff --git a/src/cli/install_completions_command.zig b/src/cli/install_completions_command.zig index ba1053397d..ed9e5d02ac 100644 --- a/src/cli/install_completions_command.zig +++ b/src/cli/install_completions_command.zig @@ -35,7 +35,7 
@@ const DotEnv = @import("../env_loader.zig"); const NPMClient = @import("../which_npm_client.zig").NPMClient; const which = @import("../which.zig").which; const clap = bun.clap; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const Headers = bun.http.Headers; const CopyFile = @import("../copy_file.zig"); const ShellCompletions = @import("./shell_completions.zig"); diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig index 1ce8301f15..f39e4b906e 100644 --- a/src/cli/upgrade_command.zig +++ b/src/cli/upgrade_command.zig @@ -36,7 +36,7 @@ const JSPrinter = bun.js_printer; const DotEnv = @import("../env_loader.zig"); const which = @import("../which.zig").which; const clap = bun.clap; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const Headers = bun.http.Headers; const CopyFile = @import("../copy_file.zig"); diff --git a/src/crash_handler.zig b/src/crash_handler.zig index 15d6821ec6..b8d4e5a4e4 100644 --- a/src/crash_handler.zig +++ b/src/crash_handler.zig @@ -44,7 +44,8 @@ var has_printed_message = false; var panicking = std.atomic.Value(u8).init(0); // Locked to avoid interleaving panic messages from multiple threads. -var panic_mutex = std.Thread.Mutex{}; +// TODO: I don't think it's safe to lock/unlock a mutex inside a signal handler. +var panic_mutex = bun.Mutex{}; /// Counts how many times the panic handler is invoked by this thread. /// This is used to catch and handle panics triggered by the panic handler. @@ -66,7 +67,9 @@ export fn CrashHandler__setInsideNativePlugin(name: ?[*:0]const u8) callconv(.C) pub threadlocal var current_action: ?Action = null; var before_crash_handlers: std.ArrayListUnmanaged(struct { *anyopaque, *const OnBeforeCrash }) = .{}; -var before_crash_handlers_mutex: std.Thread.Mutex = .{}; + +// TODO: I don't think it's safe to lock/unlock a mutex inside a signal handler. 
+var before_crash_handlers_mutex: bun.Mutex = .{}; const CPUFeatures = @import("./bun.js/bindings/CPUFeatures.zig").CPUFeatures; @@ -971,7 +974,7 @@ fn waitForOtherThreadToFinishPanicking() void { // Sleep forever without hammering the CPU var futex = std.atomic.Value(u32).init(0); - while (true) std.Thread.Futex.wait(&futex, 0); + while (true) bun.Futex.waitForever(&futex, 0); comptime unreachable; } } @@ -987,7 +990,7 @@ pub fn sleepForeverIfAnotherThreadIsCrashing() void { if (panicking.load(.acquire) > 0) { // Sleep forever without hammering the CPU var futex = std.atomic.Value(u32).init(0); - while (true) std.Thread.Futex.wait(&futex, 0); + while (true) bun.Futex.waitForever(&futex, 0); comptime unreachable; } } diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 044149b731..96d3be9657 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -62,7 +62,7 @@ pub const InternalLoopData = extern struct { low_prio_budget: i32, dns_ready_head: *ConnectingSocket, closed_connecting_head: *ConnectingSocket, - mutex: u32, // this is actually a bun.Lock + mutex: u32, // this is actually a bun.Mutex parent_ptr: ?*anyopaque, parent_tag: c_char, iteration_nr: usize, diff --git a/src/fs.zig b/src/fs.zig index b1feeadd4b..d64b827091 100644 --- a/src/fs.zig +++ b/src/fs.zig @@ -13,7 +13,7 @@ const stringZ = bun.stringZ; const default_allocator = bun.default_allocator; const C = bun.C; const sync = @import("sync.zig"); -const Mutex = @import("./lock.zig").Lock; +const Mutex = bun.Mutex; const Semaphore = sync.Semaphore; const Fs = @This(); const path_handler = @import("./resolver/resolve_path.zig"); diff --git a/src/futex.zig b/src/futex.zig index 49f4e83e77..24d66b056e 100644 --- a/src/futex.zig +++ b/src/futex.zig @@ -1,182 +1,142 @@ -//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32bit memory address as hints. -//! Blocking a thread is acknowledged only if the 32bit memory address is equal to a given value. -//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`. -//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals. - -// This is copy-pasted from Zig's source code to fix an issue with linking on macOS Catalina and earlier. +//! This is a copy-pasta of std.Thread.Futex, except without `unreachable` +//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a +//! 32bit memory address as hints. +//! +//! Blocking a thread is acknowledged only if the 32bit memory address is equal +//! to a given value. This check helps avoid block/unblock deadlocks which +//! occur if a `wake()` happens before a `wait()`. +//! +//! Using Futex, other Thread synchronization primitives can be built which +//! efficiently wait for cross-thread events or signals. const std = @import("std"); -const bun = @import("root").bun; const builtin = @import("builtin"); const Futex = @This(); - -const target = builtin.target; -const single_threaded = builtin.single_threaded; - +const windows = std.os.windows; +const linux = std.os.linux; +const c = std.c; +const bun = @import("root").bun; const assert = bun.assert; -const testing = std.testing; - -const Atomic = std.atomic.Value; -const spinLoopHint = std.atomic.spinLoopHint; +const atomic = std.atomic; /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: /// - The value at `ptr` is no longer equal to `expect`. 
/// - The caller is unblocked by a matching `wake()`. -/// - The caller is unblocked spuriously by an arbitrary internal signal. -/// -/// If `timeout` is provided, and the caller is blocked for longer than `timeout` nanoseconds`, `error.TimedOut` is returned. +/// - The caller is unblocked spuriously ("at random"). +/// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned. /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. -pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - if (single_threaded) { - // check whether the caller should block - if (ptr.raw != expect) { - return; - } +pub fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout_ns: ?u64) error{Timeout}!void { + @setCold(true); - // There are no other threads which could notify the caller on single_threaded. - // Therefore a wait() without a timeout would block indefinitely. - const timeout_ns = timeout orelse { - @panic("deadlock"); - }; - - // Simulate blocking with the timeout knowing that: - // - no other thread can change the ptr value - // - no other thread could unblock us if we waiting on the ptr - std.time.sleep(timeout_ns); - return error.TimedOut; - } - - // Avoid calling into the OS for no-op waits() - if (timeout) |timeout_ns| { - if (timeout_ns == 0) { + // Avoid calling into the OS for no-op timeouts. + if (timeout_ns) |t| { + if (t == 0) { if (ptr.load(.seq_cst) != expect) return; - return error.TimedOut; + return error.Timeout; } } - return OsFutex.wait(ptr, expect, timeout); + return Impl.wait(ptr, expect, timeout_ns); } -/// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`. -/// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`. -pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - if (single_threaded) return; - if (num_waiters == 0) return; +pub fn waitForever(ptr: *const atomic.Value(u32), expect: u32) void { + @setCold(true); - return OsFutex.wake(ptr, num_waiters); + while (true) { + Impl.wait(ptr, expect, null) catch |err| switch (err) { + // Shouldn't happen, but people can override system calls sometimes. + error.Timeout => continue, + }; + break; + } } -const OsFutex = if (target.os.tag == .windows) - WindowsFutex -else if (target.os.tag == .linux) - LinuxFutex -else if (target.isDarwin()) - DarwinFutex -else if (builtin.link_libc) - PosixFutex +/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`. +pub fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { + @setCold(true); + + // Avoid calling into the OS if there's nothing to wake up. + if (max_waiters == 0) { + return; + } + + Impl.wake(ptr, max_waiters); +} + +const Impl = if (builtin.os.tag == .windows) + WindowsImpl +else if (builtin.os.tag.isDarwin()) + DarwinImpl +else if (builtin.os.tag == .linux) + LinuxImpl +else if (builtin.target.isWasm()) + WasmImpl else - UnsupportedFutex; + UnsupportedImpl; -const UnsupportedFutex = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { +/// We can't do @compileError() in the `Impl` switch statement above as its eagerly evaluated. +/// So instead, we @compileError() on the methods themselves for platforms which don't support futex. 
+const UnsupportedImpl = struct { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { return unsupported(.{ ptr, expect, timeout }); } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - return unsupported(.{ ptr, num_waiters }); + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { + return unsupported(.{ ptr, max_waiters }); } - fn unsupported(unused: anytype) noreturn { - _ = unused; - @compileError("Unsupported operating system: " ++ @tagName(target.os.tag)); + fn unsupported(_: anytype) noreturn { + @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag)); } }; -const WindowsFutex = struct { - const windows = std.os.windows; - - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { +// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll +// as it's generally already a linked target and is autoloaded into all processes anyway. +const WindowsImpl = struct { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { var timeout_value: windows.LARGE_INTEGER = undefined; var timeout_ptr: ?*const windows.LARGE_INTEGER = null; // NTDLL functions work with time in units of 100 nanoseconds. - // Positive values for timeouts are absolute time while negative is relative. - if (timeout) |timeout_ns| { + // Positive values are absolute deadlines while negative values are relative durations. + if (timeout) |delay| { + timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100)); + timeout_value = -timeout_value; timeout_ptr = &timeout_value; - timeout_value = -@as(windows.LARGE_INTEGER, @intCast(timeout_ns / 100)); } - switch (windows.ntdll.RtlWaitOnAddress( - @as(?*const anyopaque, @ptrCast(ptr)), - @as(?*const anyopaque, @ptrCast(&expect)), + const rc = windows.ntdll.RtlWaitOnAddress( + ptr, + &expect, @sizeOf(@TypeOf(expect)), timeout_ptr, - )) { + ); + + switch (rc) { .SUCCESS => {}, - .TIMEOUT => return error.TimedOut, - else => unreachable, + .TIMEOUT => { + assert(timeout != null); + return error.Timeout; + }, + else => @panic("Unexpected RtlWaitOnAddress() return code"), } } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - const address = @as(?*const anyopaque, @ptrCast(ptr)); - switch (num_waiters) { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { + const address: ?*const anyopaque = ptr; + assert(max_waiters != 0); + + switch (max_waiters) { 1 => windows.ntdll.RtlWakeAddressSingle(address), else => windows.ntdll.RtlWakeAddressAll(address), } } }; -const LinuxFutex = struct { - const linux = std.os.linux; - - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - var ts: std.posix.timespec = undefined; - var ts_ptr: ?*std.posix.timespec = null; - - // Futex timespec timeout is already in relative time. 
- if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); - ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s)); - } - - switch (bun.C.getErrno(linux.futex_wait( - @as(*const i32, @ptrCast(ptr)), - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, - @as(i32, @bitCast(expect)), - ts_ptr, - ))) { - .SUCCESS => {}, // notified by `wake()` - .INTR => {}, // spurious wakeup - .AGAIN => {}, // ptr.* != expect - .TIMEDOUT => return error.TimedOut, - .INVAL => {}, // possibly timeout overflow - .FAULT => unreachable, - else => unreachable, - } - } - - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - switch (bun.C.getErrno(linux.futex_wake( - @as(*const i32, @ptrCast(ptr)), - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, - std.math.cast(i32, num_waiters) orelse std.math.maxInt(i32), - ))) { - .SUCCESS => {}, // successful wake up - .INVAL => {}, // invalid futex_wait() on ptr done elsewhere - .FAULT => {}, // pointer became invalid while doing the wake - else => unreachable, - } - } -}; - -const DarwinFutex = struct { - const darwin = std.c; - - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { +const DarwinImpl = struct { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it: // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6 // @@ -185,218 +145,160 @@ const DarwinFutex = struct { // // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention) + const supports_ulock_wait2 = builtin.target.os.version_range.semver.min.major >= 11; + var timeout_ns: u64 = 0; - if (timeout) |timeout_value| { - // This should be checked by the caller. - assert(timeout_value != 0); - timeout_ns = timeout_value; + if (timeout) |delay| { + assert(delay != 0); // handled by timedWait() + timeout_ns = delay; } - const addr = @as(*const anyopaque, @ptrCast(ptr)); - const flags = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO; + // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users // should handle spurious wakeups), but we need to remember that we did so, so that - // we don't return `TimedOut` incorrectly. If that happens, we set this variable to + // we don't return `Timeout` incorrectly. If that happens, we set this variable to // true so that we we know to ignore the ETIMEDOUT result. 
var timeout_overflowed = false; + + const addr: *const anyopaque = ptr; + const flags = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO; const status = blk: { - const timeout_us = cast: { - const timeout_u32 = std.math.cast(u32, timeout_ns / std.time.ns_per_us); - timeout_overflowed = timeout_u32 == null; - break :cast timeout_u32 orelse std.math.maxInt(u32); + if (supports_ulock_wait2) { + break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0); + } + + const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: { + timeout_overflowed = true; + break :overflow std.math.maxInt(u32); }; - break :blk darwin.__ulock_wait(flags, addr, expect, timeout_us); + + break :blk c.__ulock_wait(flags, addr, expect, timeout_us); }; if (status >= 0) return; - switch (@as(std.posix.E, @enumFromInt(-status))) { + switch (@as(c.E, @enumFromInt(-status))) { + // Wait was interrupted by the OS or other spurious signalling. .INTR => {}, - // Address of the futex is paged out. This is unlikely, but possible in theory, and + // Address of the futex was paged out. This is unlikely, but possible in theory, and // pthread/libdispatch on darwin bother to handle it. In this case we'll return // without waiting, but the caller should retry anyway. .FAULT => {}, - .TIMEDOUT => if (!timeout_overflowed) return error.TimedOut, - else => unreachable, + // Only report Timeout if we didn't have to cap the timeout + .TIMEDOUT => { + assert(timeout != null); + if (!timeout_overflowed) return error.Timeout; + }, + else => @panic("Unexpected __ulock_wait() return code"), } } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - var flags: u32 = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO; - if (num_waiters > 1) { - flags |= darwin.ULF_WAKE_ALL; - } + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { + const default_flags: u32 = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO; + const flags: u32 = default_flags | (if (max_waiters > 1) c.ULF_WAKE_ALL else @as(u32, 0)); while (true) { - const addr = @as(*const anyopaque, @ptrCast(ptr)); - const status = darwin.__ulock_wake(flags, addr, 0); + const addr: *const anyopaque = ptr; + const status = c.__ulock_wake(flags, addr, 0); if (status >= 0) return; - switch (@as(std.posix.E, @enumFromInt(-status))) { + switch (@as(c.E, @enumFromInt(-status))) { .INTR => continue, // spurious wake() - .FAULT => continue, // address of the lock was paged out + .FAULT => @panic("__ulock_wake() returned EFAULT unexpectedly"), // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t .NOENT => return, // nothing was woken up - .ALREADY => unreachable, // only for ULF_WAKE_THREAD - else => unreachable, + .ALREADY => @panic("__ulock_wake() returned EALREADY unexpectedly"), // only for ULF_WAKE_THREAD + else => @panic("Unexpected __ulock_wake() return code"), } } } }; -const PosixFutex = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - const address = @intFromPtr(ptr); - const bucket = Bucket.from(address); - var waiter: List.Node = undefined; - - { - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - if (ptr.load(.seq_cst) != expect) { - return; +// https://man7.org/linux/man-pages/man2/futex.2.html +const LinuxImpl = struct { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + const ts: linux.timespec = if (timeout) |timeout_ns| + .{ + .tv_sec = @intCast(timeout_ns / 
std.time.ns_per_s), + .tv_nsec = @intCast(timeout_ns % std.time.ns_per_s), } + else + undefined; - waiter.data = .{ .address = address }; - bucket.list.prepend(&waiter); - } + const rc = linux.futex_wait( + @as(*const i32, @ptrCast(&ptr.raw)), + linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, + @as(i32, @bitCast(expect)), + if (timeout != null) &ts else null, + ); - var timed_out = false; - waiter.data.wait(timeout) catch { - defer if (!timed_out) { - waiter.data.wait(null) catch unreachable; - }; - - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - if (waiter.data.address == address) { - timed_out = true; - bucket.list.remove(&waiter); - } - }; - - waiter.data.deinit(); - if (timed_out) { - return error.TimedOut; + switch (linux.E.init(rc)) { + .SUCCESS => {}, // notified by `wake()` + .INTR => {}, // spurious wakeup + .AGAIN => {}, // ptr.* != expect + .TIMEDOUT => { + assert(timeout != null); + return error.Timeout; + }, + .INVAL => {}, // possibly timeout overflow + .FAULT => @panic("futex_wait() returned EFAULT unexpectedly"), // ptr was invalid + else => |err| bun.Output.panic("Unexpected futex_wait() return code: {d} - {s}", .{ rc, std.enums.tagName(linux.E, err) orelse "unknown" }), } } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - const address = @intFromPtr(ptr); - const bucket = Bucket.from(address); - var can_notify = num_waiters; + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { + const rc = linux.futex_wake( + @as(*const i32, @ptrCast(&ptr.raw)), + linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, + std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32), + ); - var notified = List{}; - defer while (notified.popFirst()) |waiter| { - waiter.data.notify(); - }; - - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - var waiters = bucket.list.first; - while (waiters) |waiter| { - assert(waiter.data.address != null); - waiters = waiter.next; - - if (waiter.data.address != address) continue; - if (can_notify == 0) break; - can_notify -= 1; - - bucket.list.remove(waiter); - waiter.data.address = null; - notified.prepend(waiter); + switch (linux.E.init(rc)) { + .SUCCESS => {}, // successful wake up + .INVAL => {}, // invalid futex_wait() on ptr done elsewhere + .FAULT => @panic("futex_wake() returned EFAULT unexpectedly"), // pointer became invalid while doing the wake + else => @panic("Unexpected futex_wake() return code"), } } - - const Bucket = struct { - mutex: std.c.pthread_mutex_t = .{}, - list: List = .{}, - - var buckets = [_]Bucket{.{}} ** 64; - - fn from(address: usize) *Bucket { - return &buckets[address % buckets.len]; - } - }; - - const List = std.TailQueue(struct { - address: ?usize, - state: State = .empty, - cond: std.c.pthread_cond_t = .{}, - mutex: std.c.pthread_mutex_t = .{}, - - const Self = @This(); - const State = enum { - empty, - waiting, - notified, - }; - - fn deinit(self: *Self) void { - _ = std.c.pthread_cond_destroy(&self.cond); - _ = std.c.pthread_mutex_destroy(&self.mutex); - } - - fn wait(self: *Self, timeout: ?u64) error{TimedOut}!void { - assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); - - switch (self.state) { - .empty => self.state = .waiting, - .waiting => unreachable, - .notified => return, - } - - var ts: std.posix.timespec = undefined; - var ts_ptr: ?*const 
std.posix.timespec = null; - if (timeout) |timeout_ns| { - ts_ptr = &ts; - std.posix.clock_gettime(std.posix.CLOCK_REALTIME, &ts) catch unreachable; - ts.tv_sec += @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); - ts.tv_nsec += @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s)); - if (ts.tv_nsec >= std.time.ns_per_s) { - ts.tv_sec += 1; - ts.tv_nsec -= std.time.ns_per_s; - } - } - - while (true) { - switch (self.state) { - .empty => unreachable, - .waiting => {}, - .notified => return, - } - - const ts_ref = ts_ptr orelse { - assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == .SUCCESS); - continue; - }; - - const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ref); - switch (rc) { - .SUCCESS => {}, - .TIMEDOUT => { - self.state = .empty; - return error.TimedOut; - }, - else => unreachable, - } - } - } - - fn notify(self: *Self) void { - assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); - - switch (self.state) { - .empty => self.state = .notified, - .waiting => { - self.state = .notified; - assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS); - }, - .notified => unreachable, - } - } - }); +}; + +const WasmImpl = struct { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) { + @compileError("WASI target missing cpu feature 'atomics'"); + } + const to: i64 = if (timeout) |to| @intCast(to) else -1; + const result = asm ( + \\local.get %[ptr] + \\local.get %[expected] + \\local.get %[timeout] + \\memory.atomic.wait32 0 + \\local.set %[ret] + : [ret] "=r" (-> u32), + : [ptr] "r" (&ptr.raw), + [expected] "r" (@as(i32, @bitCast(expect))), + [timeout] "r" (to), + ); + switch (result) { + 0 => {}, // ok + 1 => {}, // expected =! 
loaded + 2 => return error.Timeout, + else => @panic("Unexpected memory.atomic.wait32() return code"), + } + } + + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { + if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) { + @compileError("WASI target missing cpu feature 'atomics'"); + } + assert(max_waiters != 0); + const woken_count = asm ( + \\local.get %[ptr] + \\local.get %[waiters] + \\memory.atomic.notify 0 + \\local.set %[ret] + : [ret] "=r" (-> u32), + : [ptr] "r" (&ptr.raw), + [waiters] "r" (max_waiters), + ); + _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled + } }; diff --git a/src/http.zig b/src/http.zig index 49e077a037..9a40d91427 100644 --- a/src/http.zig +++ b/src/http.zig @@ -18,7 +18,7 @@ const URL = @import("./url.zig").URL; const PercentEncoding = @import("./url.zig").PercentEncoding; pub const Method = @import("./http/method.zig").Method; const Api = @import("./api/schema.zig").Api; -const Lock = @import("./lock.zig").Lock; +const Lock = bun.Mutex; const HTTPClient = @This(); const Zlib = @import("./zlib.zig"); const Brotli = bun.brotli; @@ -1041,8 +1041,8 @@ pub const HTTPThread = struct { queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){}, queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){}, - queued_shutdowns_lock: bun.Lock = .{}, - queued_writes_lock: bun.Lock = .{}, + queued_shutdowns_lock: bun.Mutex = .{}, + queued_writes_lock: bun.Mutex = .{}, queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){}, diff --git a/src/http/zlib.zig b/src/http/zlib.zig index 3055ac99f1..c492ce743a 100644 --- a/src/http/zlib.zig +++ b/src/http/zlib.zig @@ -1,4 +1,4 @@ -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const std = @import("std"); const MutableString = bun.MutableString; const getAllocator = @import("../http.zig").getAllocator; diff --git a/src/install/install.zig b/src/install/install.zig index 6464ccac49..400b5a0c12 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -44,7 +44,7 @@ const which = @import("../which.zig").which; const Run = @import("../bun_js.zig").Run; const Fs = @import("../fs.zig"); const FileSystem = Fs.FileSystem; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const URL = @import("../url.zig").URL; const HTTP = bun.http; const AsyncHTTP = HTTP.AsyncHTTP; @@ -3068,7 +3068,7 @@ pub const PackageManager = struct { duration: u64, }; - mutex: std.Thread.Mutex = .{}, + mutex: bun.Mutex = .{}, list: std.ArrayListUnmanaged(Entry) = .{}, pub fn appendConcurrent(log: *LifecycleScriptTimeLog, allocator: std.mem.Allocator, entry: Entry) void { diff --git a/src/install/lockfile.zig b/src/install/lockfile.zig index 63b2114352..969a31c1dd 100644 --- a/src/install/lockfile.zig +++ b/src/install/lockfile.zig @@ -41,7 +41,7 @@ const Run = @import("../bun_js.zig").Run; const HeaderBuilder = bun.http.HeaderBuilder; const Fs = @import("../fs.zig"); const FileSystem = Fs.FileSystem; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const URL = @import("../url.zig").URL; const AsyncHTTP = bun.http.AsyncHTTP; const HTTPChannel = bun.http.HTTPChannel; diff --git a/src/js_ast.zig b/src/js_ast.zig index 331e0bbc1d..6b4fd31d32 100644 --- a/src/js_ast.zig +++ b/src/js_ast.zig @@ -132,7 +132,6 @@ pub fn NewStore(comptime types: []const type, comptime count: usize) type { } pub fn reset(store: *Store) 
void { - store.debug_lock.assertUnlocked(); log("reset", .{}); if (Environment.isDebug) { @@ -153,8 +152,6 @@ pub fn NewStore(comptime types: []const type, comptime count: usize) type { @compileError("Store does not know about type: " ++ @typeName(T)); }; - store.debug_lock.assertUnlocked(); - if (store.current.tryAlloc(T)) |ptr| return ptr; diff --git a/src/js_printer.zig b/src/js_printer.zig index fe3e6d4cd4..8cb2b6327d 100644 --- a/src/js_printer.zig +++ b/src/js_printer.zig @@ -6,7 +6,7 @@ const js_ast = bun.JSAst; const options = @import("options.zig"); const rename = @import("renamer.zig"); const runtime = @import("runtime.zig"); -const Lock = @import("./lock.zig").Lock; +const Lock = bun.Mutex; const Api = @import("./api/schema.zig").Api; const fs = @import("fs.zig"); const bun = @import("root").bun; diff --git a/src/lock.zig b/src/lock.zig deleted file mode 100644 index 4334986ade..0000000000 --- a/src/lock.zig +++ /dev/null @@ -1,150 +0,0 @@ -// Note(2024-10-01): there is little reason to use this over std.Thread.Mutex, -// as we have dropped old macOS versions it didnt support. Additionally, the -// Zig Standard Library has deadlock protections in debug builds. -const std = @import("std"); -const Atomic = std.atomic.Value; -const Futex = @import("./futex.zig"); - -// Credit: this is copypasta from @kprotty. Thank you @kprotty! -pub const Mutex = struct { - state: Atomic(u32) = Atomic(u32).init(UNLOCKED), // if changed update loop.c in usockets - - const UNLOCKED = 0; // if changed update loop.c in usockets - const LOCKED = 0b01; - const CONTENDED = 0b11; - const is_x86 = @import("builtin").target.cpu.arch.isX86(); - - pub fn tryAcquire(self: *Mutex) bool { - return self.acquireFast(true); - } - - pub fn acquire(self: *Mutex) void { - if (!self.acquireFast(false)) { - self.acquireSlow(); - } - } - - inline fn acquireFast(self: *Mutex, comptime strong: bool) bool { - // On x86, "lock bts" uses less i-cache & can be faster than "lock cmpxchg" below. - if (comptime is_x86) { - return self.state.bitSet(@ctz(@as(u32, LOCKED)), .acquire) == UNLOCKED; - } - - const cas_fn = comptime switch (strong) { - true => Atomic(u32).cmpxchgStrong, - else => Atomic(u32).cmpxchgWeak, - }; - - return cas_fn( - &self.state, - UNLOCKED, - LOCKED, - .acquire, - .monotonic, - ) == null; - } - - noinline fn acquireSlow(self: *Mutex) void { - // Spin a little bit on the Mutex state in the hopes that - // we can acquire it without having to call Futex.wait(). - // Give up spinning if the Mutex is contended. - // This helps acquire() latency under micro-contention. - // - var spin: u8 = 100; - while (spin > 0) : (spin -= 1) { - std.atomic.spinLoopHint(); - - switch (self.state.load(.monotonic)) { - UNLOCKED => _ = self.state.cmpxchgWeak( - UNLOCKED, - LOCKED, - .acquire, - .monotonic, - ) orelse return, - LOCKED => continue, - CONTENDED => break, - else => unreachable, // invalid Mutex state - } - } - - // Make sure the state is CONTENDED before sleeping with Futex so release() can wake us up. - // Transitioning to CONTENDED may also acquire the mutex in the process. - // - // If we sleep, we must acquire the Mutex with CONTENDED to ensure that other threads - // sleeping on the Futex having seen CONTENDED before are eventually woken up by release(). - // This unfortunately ends up in an extra Futex.wake() for the last thread but that's ok. - while (true) : (Futex.wait(&self.state, CONTENDED, null) catch unreachable) { - // On x86, "xchg" can be faster than "lock cmpxchg" below. 
- if (comptime is_x86) { - switch (self.state.swap(CONTENDED, .acquire)) { - UNLOCKED => return, - LOCKED, CONTENDED => continue, - else => unreachable, // invalid Mutex state - } - } - - var state = self.state.load(.monotonic); - while (state != CONTENDED) { - state = switch (state) { - UNLOCKED => self.state.cmpxchgWeak(state, CONTENDED, .acquire, .monotonic) orelse return, - LOCKED => self.state.cmpxchgWeak(state, CONTENDED, .monotonic, .monotonic) orelse break, - CONTENDED => unreachable, // checked above - else => unreachable, // invalid Mutex state - }; - } - } - } - - pub fn release(self: *Mutex) void { - switch (self.state.swap(UNLOCKED, .release)) { - UNLOCKED => unreachable, // released without being acquired - LOCKED => {}, - CONTENDED => Futex.wake(&self.state, 1), - else => unreachable, // invalid Mutex state - } - } -}; - -pub const Lock = struct { - mutex: Mutex = .{}, - - pub inline fn lock(this: *Lock) void { - this.mutex.acquire(); - } - - pub inline fn unlock(this: *Lock) void { - this.mutex.release(); - } - - pub inline fn releaseAssertUnlocked(this: *Lock, comptime message: []const u8) void { - if (this.mutex.state.load(.monotonic) != 0) { - @panic(message); - } - } - - pub inline fn assertUnlocked(this: *Lock) void { - if (std.debug.runtime_safety) { - if (this.mutex.state.load(.monotonic) != 0) { - @panic("Mutex is expected to be unlocked"); - } - } - } - - pub inline fn assertLocked(this: *Lock) void { - if (std.debug.runtime_safety) { - if (this.mutex.state.load(.monotonic) == 0) { - @panic("Mutex is expected to be locked"); - } - } - } -}; - -pub fn spinCycle() void {} - -export fn Bun__lock(lock: *Lock) void { - lock.lock(); -} - -export fn Bun__unlock(lock: *Lock) void { - lock.unlock(); -} diff --git a/src/napi/napi.zig b/src/napi/napi.zig index 85650c51d8..ed9e2ab9e0 100644 --- a/src/napi/napi.zig +++ b/src/napi/napi.zig @@ -2,7 +2,7 @@ const std = @import("std"); const JSC = bun.JSC; const strings = bun.strings; const bun = @import("root").bun; -const Lock = @import("../lock.zig").Lock; +const Lock = bun.Mutex; const JSValue = JSC.JSValue; const ZigString = JSC.ZigString; const TODO_EXCEPTION: JSC.C.ExceptionRef = null; @@ -1420,7 +1420,9 @@ pub const ThreadSafeFunction = struct { // User implementation error can cause this number to go negative. 
thread_count: std.atomic.Value(i64) = std.atomic.Value(i64).init(0), + // for std.condvar lock: std.Thread.Mutex = .{}, + event_loop: *JSC.EventLoop, tracker: JSC.AsyncTaskTracker, diff --git a/src/output.zig b/src/output.zig index 29fb3019d2..6aa50b0f24 100644 --- a/src/output.zig +++ b/src/output.zig @@ -746,7 +746,7 @@ fn ScopedLogger(comptime tagname: []const u8, comptime disabled: bool) type { var out_set = false; var really_disable = disabled; var evaluated_disable = false; - var lock = std.Thread.Mutex{}; + var lock = bun.Mutex{}; pub fn isVisible() bool { if (!evaluated_disable) { diff --git a/src/resolver/resolver.zig b/src/resolver/resolver.zig index d737f4af3d..d427747c82 100644 --- a/src/resolver/resolver.zig +++ b/src/resolver/resolver.zig @@ -28,7 +28,7 @@ const DataURL = @import("./data_url.zig").DataURL; pub const DirInfo = @import("./dir_info.zig"); const ResolvePath = @import("./resolve_path.zig"); const NodeFallbackModules = @import("../node_fallbacks.zig"); -const Mutex = @import("../lock.zig").Lock; +const Mutex = bun.Mutex; const StringBoolMap = bun.StringHashMap(bool); const FileDescriptorType = bun.FileDescriptor; const JSC = bun.JSC; diff --git a/src/s3/download_stream.zig b/src/s3/download_stream.zig index 9adfbb65af..fd2470b19b 100644 --- a/src/s3/download_stream.zig +++ b/src/s3/download_stream.zig @@ -27,7 +27,7 @@ pub const S3HttpDownloadStreamingTask = struct { .capacity = 0, }, }, - mutex: bun.Lock = .{}, + mutex: bun.Mutex = .{}, reported_response_buffer: bun.MutableString = .{ .allocator = bun.default_allocator, .list = .{ diff --git a/src/shell/interpreter.zig b/src/shell/interpreter.zig index 4fbb9a564b..6fb90f25ac 100644 --- a/src/shell/interpreter.zig +++ b/src/shell/interpreter.zig @@ -8765,7 +8765,7 @@ pub const Interpreter = struct { filepath_args: []const [*:0]const u8, total_tasks: usize, err: ?Syscall.Error = null, - lock: std.Thread.Mutex = std.Thread.Mutex{}, + lock: bun.Mutex = bun.Mutex{}, error_signal: std.atomic.Value(bool) = .{ .raw = false }, output_done: std.atomic.Value(usize) = .{ .raw = 0 }, output_count: std.atomic.Value(usize) = .{ .raw = 0 }, @@ -9243,7 +9243,7 @@ pub const Interpreter = struct { root_is_absolute: bool, error_signal: *std.atomic.Value(bool), - err_mutex: bun.Lock = .{}, + err_mutex: bun.Mutex = .{}, err: ?Syscall.Error = null, event_loop: JSC.EventLoopHandle, @@ -10746,7 +10746,7 @@ pub const Interpreter = struct { src_absolute: ?[:0]const u8 = null, tgt_absolute: ?[:0]const u8 = null, cwd_path: [:0]const u8, - verbose_output_lock: std.Thread.Mutex = .{}, + verbose_output_lock: bun.Mutex = .{}, verbose_output: ArrayList(u8) = ArrayList(u8).init(bun.default_allocator), task: JSC.WorkPoolTask = .{ .callback = &runFromThreadPool }, diff --git a/src/shell/shell.zig b/src/shell/shell.zig index ae35c27140..481c0581e9 100644 --- a/src/shell/shell.zig +++ b/src/shell/shell.zig @@ -3566,7 +3566,7 @@ fn isValidVarNameAscii(var_name: []const u8) bool { return true; } -var stderr_mutex = std.Thread.Mutex{}; +var stderr_mutex = bun.Mutex{}; pub fn hasEqSign(str: []const u8) ?u32 { if (isAllAscii(str)) { diff --git a/src/thread_pool.zig b/src/thread_pool.zig index e7e8b8d107..4186d489c2 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -132,7 +132,7 @@ pub const Batch = struct { }; pub const WaitGroup = struct { - mutex: std.Thread.Mutex = .{}, + mutex: bun.Mutex = .{}, counter: u32 = 0, event: std.Thread.ResetEvent = .{}, @@ -806,57 +806,6 @@ const Event = struct { } } - /// Wait for and consume a notification - /// 
or wait for the event to be shutdown entirely - noinline fn waitFor(self: *Event, timeout: usize) void { - var acquire_with: u32 = EMPTY; - var state = self.state.load(.monotonic); - - while (true) { - // If we're shutdown then exit early. - // Acquire barrier to ensure operations before the shutdown() are seen after the wait(). - // Shutdown is rare so it's better to have an Acquire barrier here instead of on CAS failure + load which are common. - if (state == SHUTDOWN) { - @fence(.acquire); - return; - } - - // Consume a notification when it pops up. - // Acquire barrier to ensure operations before the notify() appear after the wait(). - if (state == NOTIFIED) { - state = self.state.cmpxchgWeak( - state, - acquire_with, - .acquire, - .monotonic, - ) orelse return; - continue; - } - - // There is no notification to consume, we should wait on the event by ensuring its WAITING. - if (state != WAITING) blk: { - state = self.state.cmpxchgWeak( - state, - WAITING, - .monotonic, - .monotonic, - ) orelse break :blk; - continue; - } - - // Wait on the event until a notify() or shutdown(). - // If we wake up to a notification, we must acquire it with WAITING instead of EMPTY - // since there may be other threads sleeping on the Futex who haven't been woken up yet. - // - // Acquiring to WAITING will make the next notify() or shutdown() wake a sleeping futex thread - // who will either exit on SHUTDOWN or acquire with WAITING again, ensuring all threads are awoken. - // This unfortunately results in the last notify() or shutdown() doing an extra futex wake but that's fine. - Futex.wait(&self.state, WAITING, timeout) catch {}; - state = self.state.load(.monotonic); - acquire_with = WAITING; - } - } - /// Post a notification to the event if it doesn't have one already /// then wake up a waiting thread if there is one as well. fn notify(self: *Event) void { diff --git a/src/transpiler.zig b/src/transpiler.zig index ead4afecc5..5836e0e20c 100644 --- a/src/transpiler.zig +++ b/src/transpiler.zig @@ -41,7 +41,7 @@ const Router = @import("./router.zig"); const isPackagePath = _resolver.isPackagePath; const Css = @import("css_scanner.zig"); const DotEnv = @import("./env_loader.zig"); -const Lock = @import("./lock.zig").Lock; +const Lock = bun.Mutex; const NodeFallbackModules = @import("./node_fallbacks.zig"); const CacheEntry = @import("./cache.zig").FsCacheEntry; const Analytics = @import("./analytics/analytics_thread.zig"); diff --git a/src/watcher.zig b/src/watcher.zig index 1a412eb7ef..1cd75586b5 100644 --- a/src/watcher.zig +++ b/src/watcher.zig @@ -9,7 +9,7 @@ const stringZ = bun.stringZ; const FeatureFlags = bun.FeatureFlags; const options = @import("./options.zig"); -const Mutex = @import("./lock.zig").Lock; +const Mutex = bun.Mutex; const Futex = @import("./futex.zig"); pub const WatchItemIndex = u16; const PackageJSON = @import("./resolver/package_json.zig").PackageJSON; @@ -117,9 +117,7 @@ const INotify = struct { bun.assert(this.loaded_inotify); restart: while (true) { - Futex.wait(&this.watch_count, 0, null) catch |err| switch (err) { - error.TimedOut => unreachable, // timeout is infinite - }; + Futex.waitForever(&this.watch_count, 0); const rc = std.posix.system.read( this.inotify_fd, @@ -647,9 +645,6 @@ pub const NewWatcher = if (true) this.close_descriptors = close_descriptors; this.running = false; } else { - // if the mutex is locked, then that's now a UAF. 
- this.mutex.releaseAssertUnlocked("Watcher mutex is locked when it should not be."); - if (close_descriptors and this.running) { const fds = this.watchlist.items(.fd); for (fds) |fd| {
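
The bulk of this patch is the mechanical swap visible above: every `@import("./lock.zig").Lock`, `bun.Lock`, and ad-hoc `std.Thread.Mutex` field becomes `bun.Mutex`, and the debug-only helpers that only the old wrapper provided (`assertUnlocked`, `releaseAssertUnlocked`) have their call sites deleted rather than ported. The following is a minimal sketch of what the migration looks like at a typical call site; `Cache`, `entries`, and `put` are illustrative names, not code from this patch. Only the field declaration changes, since the old `Lock` already exposed `lock()`/`unlock()`.

```zig
const std = @import("std");
const bun = @import("root").bun;

const Cache = struct {
    // Before: mutex: @import("./lock.zig").Lock = .{},
    // After:  the shared bun.Mutex from src/Mutex.zig, default-initialized the same way.
    mutex: bun.Mutex = .{},
    entries: std.StringHashMapUnmanaged(u32) = .{},

    // Call sites are unchanged by the migration: lock()/unlock() keep the same shape.
    fn put(self: *Cache, allocator: std.mem.Allocator, key: []const u8, value: u32) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        try self.entries.put(allocator, key, value);
    }
};
```

Because the replacement keeps the same two-method surface, nearly every hunk in the patch is a one-line type or import substitution; the only behavioral deletions are the old wrapper's assert helpers (as in the watcher `deinit` above), which have no direct equivalent on `bun.Mutex`.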