diff --git a/cmake/sources/ZigSources.txt b/cmake/sources/ZigSources.txt index 02e47eb067..6bda03b2eb 100644 --- a/cmake/sources/ZigSources.txt +++ b/cmake/sources/ZigSources.txt @@ -242,7 +242,6 @@ src/bun.js/test/jest.zig src/bun.js/test/pretty_format.zig src/bun.js/test/snapshot.zig src/bun.js/test/test.zig -src/bun.js/unbounded_queue.zig src/bun.js/uuid.zig src/bun.js/virtual_machine_exports.zig src/bun.js/VirtualMachine.zig @@ -834,17 +833,19 @@ src/string/StringJoiner.zig src/string/unicode.zig src/string/visible.zig src/string/WTFStringImpl.zig -src/sync.zig src/sys_uv.zig src/sys.zig src/system_timer.zig src/test/fixtures.zig src/test/recover.zig -src/thread_pool.zig src/threading.zig +src/threading/channel.zig src/threading/Condition.zig src/threading/Futex.zig src/threading/Mutex.zig +src/threading/ThreadPool.zig +src/threading/unbounded_queue.zig +src/threading/WaitGroup.zig src/tmp.zig src/toml/toml_lexer.zig src/toml/toml_parser.zig diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig index 63c6d7e172..8acdb085b0 100644 --- a/misctools/http_bench.zig +++ b/misctools/http_bench.zig @@ -196,7 +196,7 @@ pub fn main() anyerror!void { response_body: MutableString = undefined, context: HTTP.HTTPChannelContext = undefined, }; - const Batch = @import("../src/thread_pool.zig").Batch; + const Batch = bun.ThreadPool.Batch; var groups = try default_allocator.alloc(Group, args.count); var repeat_i: usize = 0; while (repeat_i < args.repeat + 1) : (repeat_i += 1) { diff --git a/src/bun.js/event_loop/ConcurrentTask.zig b/src/bun.js/event_loop/ConcurrentTask.zig index 0535c33b91..c5f3fa1f28 100644 --- a/src/bun.js/event_loop/ConcurrentTask.zig +++ b/src/bun.js/event_loop/ConcurrentTask.zig @@ -55,5 +55,5 @@ const std = @import("std"); const bun = @import("bun"); const JSC = bun.JSC; const Task = JSC.Task; -const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; +const UnboundedQueue = bun.threading.UnboundedQueue; const ManagedTask = JSC.ManagedTask; diff --git a/src/bun.js/event_loop/MiniEventLoop.zig b/src/bun.js/event_loop/MiniEventLoop.zig index acf496a6f1..99c0b0fcb8 100644 --- a/src/bun.js/event_loop/MiniEventLoop.zig +++ b/src/bun.js/event_loop/MiniEventLoop.zig @@ -387,7 +387,7 @@ const bun = @import("bun"); const JSC = bun.JSC; const Async = bun.Async; const VirtualMachine = JSC.VirtualMachine; -const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; +const UnboundedQueue = bun.threading.UnboundedQueue; const AnyTaskWithExtraContext = JSC.AnyTaskWithExtraContext; const uws = bun.uws; const EventLoop = JSC.EventLoop; diff --git a/src/bun.js/node/fs_events.zig b/src/bun.js/node/fs_events.zig index 01085c0987..b10318d4c4 100644 --- a/src/bun.js/node/fs_events.zig +++ b/src/bun.js/node/fs_events.zig @@ -1,9 +1,8 @@ const std = @import("std"); const bun = @import("bun"); const Mutex = bun.Mutex; -const sync = @import("../../sync.zig"); -const Semaphore = sync.Semaphore; -const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; +const Semaphore = std.Thread.Semaphore; +const UnboundedQueue = bun.threading.UnboundedQueue; const string = bun.string; const PathWatcher = @import("./path_watcher.zig").PathWatcher; @@ -217,9 +216,9 @@ fn InitLibrary() void { pub const FSEventsLoop = struct { signal_source: CFRunLoopSourceRef, - mutex: Mutex, + mutex: Mutex = .{}, loop: CFRunLoopRef = null, - sem: Semaphore, + sem: Semaphore = .{}, thread: std.Thread = undefined, tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, watchers: 
bun.BabyList(?*FSEventsWatcher) = .{}, @@ -322,7 +321,7 @@ pub const FSEventsLoop = struct { return error.FailedToCreateCoreFoudationSourceLoop; } - const fs_loop = FSEventsLoop{ .sem = Semaphore.init(0), .mutex = .{}, .signal_source = signal_source }; + const fs_loop = FSEventsLoop{ .signal_source = signal_source }; this.* = fs_loop; this.thread = try std.Thread.spawn(.{}, FSEventsLoop.CFThreadLoop, .{this}); @@ -547,8 +546,6 @@ pub const FSEventsLoop = struct { CF.Release(this.signal_source); this.signal_source = null; - this.sem.deinit(); - if (this.watcher_count > 0) { while (this.watchers.pop()) |watcher| { if (watcher) |w| { diff --git a/src/bun.js/node/node_fs_stat_watcher.zig b/src/bun.js/node/node_fs_stat_watcher.zig index cd0b105e91..3851654199 100644 --- a/src/bun.js/node/node_fs_stat_watcher.zig +++ b/src/bun.js/node/node_fs_stat_watcher.zig @@ -4,7 +4,7 @@ const bun = @import("bun"); const Fs = @import("../../fs.zig"); const Path = @import("../../resolver/resolve_path.zig"); -const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; +const UnboundedQueue = bun.threading.UnboundedQueue; const EventLoopTimer = @import("../api/Timer.zig").EventLoopTimer; const VirtualMachine = JSC.VirtualMachine; const EventLoop = JSC.EventLoop; diff --git a/src/bun.zig b/src/bun.zig index 9b3fe8e491..5201575581 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -695,7 +695,6 @@ pub const webcore = @import("bun.js/webcore.zig"); pub const api = @import("bun.js/api.zig"); pub const logger = @import("./logger.zig"); -pub const ThreadPool = @import("./thread_pool.zig"); pub const default_thread_stack_size = ThreadPool.default_thread_stack_size; pub const picohttp = @import("./deps/picohttp.zig"); pub const uws = @import("./deps/uws.zig"); @@ -1850,7 +1849,8 @@ pub const ParseTask = bundle_v2.ParseTask; pub const threading = @import("./threading.zig"); pub const Mutex = threading.Mutex; pub const Futex = threading.Futex; -pub const UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue; +pub const ThreadPool = threading.ThreadPool; +pub const UnboundedQueue = threading.UnboundedQueue; pub fn threadlocalAllocator() std.mem.Allocator { if (comptime use_mimalloc) { diff --git a/src/bundler/BundleThread.zig b/src/bundler/BundleThread.zig index 49ae6044b9..5915006c21 100644 --- a/src/bundler/BundleThread.zig +++ b/src/bundler/BundleThread.zig @@ -135,7 +135,6 @@ pub fn BundleThread(CompletionStruct: type) type { completion.transpiler = this; defer { - this.graph.pool.reset(); ast_memory_allocator.pop(); this.deinitWithoutFreeingArena(); } diff --git a/src/bundler/LinkerContext.zig b/src/bundler/LinkerContext.zig index 3389628624..bd60c77530 100644 --- a/src/bundler/LinkerContext.zig +++ b/src/bundler/LinkerContext.zig @@ -19,8 +19,6 @@ pub const LinkerContext = struct { options: LinkerOptions = .{}, - wait_group: ThreadPoolLib.WaitGroup = .{}, - ambiguous_result_pool: std.ArrayList(MatchImport) = undefined, loop: EventLoop, @@ -76,10 +74,10 @@ pub const LinkerContext = struct { }; pub const SourceMapData = struct { - line_offset_wait_group: sync.WaitGroup = .{}, + line_offset_wait_group: sync.WaitGroup = .init(), line_offset_tasks: []Task = &.{}, - quoted_contents_wait_group: sync.WaitGroup = .{}, + quoted_contents_wait_group: sync.WaitGroup = .init(), quoted_contents_tasks: []Task = &.{}, pub const Task = struct { @@ -207,7 +205,6 @@ pub const LinkerContext = struct { try this.graph.load(entry_points, sources, server_component_boundaries, 
bundle.dynamic_import_entry_points.keys()); bundle.dynamic_import_entry_points.deinit(); - this.wait_group.init(); this.ambiguous_result_pool = std.ArrayList(MatchImport).init(this.allocator); var runtime_named_exports = &this.graph.ast.items(.named_exports)[Index.runtime.get()]; @@ -252,10 +249,8 @@ pub const LinkerContext = struct { reachable: []const Index.Int, ) void { bun.assert(this.options.source_maps != .none); - this.source_maps.line_offset_wait_group.init(); - this.source_maps.quoted_contents_wait_group.init(); - this.source_maps.line_offset_wait_group.counter = @as(u32, @truncate(reachable.len)); - this.source_maps.quoted_contents_wait_group.counter = @as(u32, @truncate(reachable.len)); + this.source_maps.line_offset_wait_group = .initWithCount(reachable.len); + this.source_maps.quoted_contents_wait_group = .initWithCount(reachable.len); this.source_maps.line_offset_tasks = this.allocator.alloc(SourceMapData.Task, reachable.len) catch unreachable; this.source_maps.quoted_contents_tasks = this.allocator.alloc(SourceMapData.Task, reachable.len) catch unreachable; @@ -283,7 +278,7 @@ pub const LinkerContext = struct { } pub fn scheduleTasks(this: *LinkerContext, batch: ThreadPoolLib.Batch) void { - _ = this.pending_task_count.fetchAdd(@as(u32, @truncate(batch.len)), .monotonic); + _ = this.pending_task_count.fetchAdd(@as(u32, @intCast(batch.len)), .monotonic); this.parse_graph.pool.worker_pool.schedule(batch); } @@ -587,7 +582,6 @@ pub const LinkerContext = struct { pub const computeCrossChunkDependencies = @import("linker_context/computeCrossChunkDependencies.zig").computeCrossChunkDependencies; pub const GenerateChunkCtx = struct { - wg: *sync.WaitGroup, c: *LinkerContext, chunks: []Chunk, chunk: *Chunk, @@ -597,7 +591,6 @@ pub const LinkerContext = struct { pub const postProcessCSSChunk = @import("linker_context/postProcessCSSChunk.zig").postProcessCSSChunk; pub const postProcessHTMLChunk = @import("linker_context/postProcessHTMLChunk.zig").postProcessHTMLChunk; pub fn generateChunk(ctx: GenerateChunkCtx, chunk: *Chunk, chunk_index: usize) void { - defer ctx.wg.finish(); const worker = ThreadPool.Worker.get(@fieldParentPtr("linker", ctx.c)); defer worker.unget(); switch (chunk.content) { @@ -610,7 +603,6 @@ pub const LinkerContext = struct { pub const renameSymbolsInChunk = @import("linker_context/renameSymbolsInChunk.zig").renameSymbolsInChunk; pub fn generateJSRenamer(ctx: GenerateChunkCtx, chunk: *Chunk, chunk_index: usize) void { - defer ctx.wg.finish(); var worker = ThreadPool.Worker.get(@fieldParentPtr("linker", ctx.c)); defer worker.unget(); switch (chunk.content) { @@ -2488,11 +2480,11 @@ const sourcemap = bun.sourcemap; const StringJoiner = bun.StringJoiner; const base64 = bun.base64; pub const Ref = @import("../ast/base.zig").Ref; -pub const ThreadPoolLib = @import("../thread_pool.zig"); +pub const ThreadPoolLib = bun.ThreadPool; const BabyList = @import("../baby_list.zig").BabyList; pub const Fs = @import("../fs.zig"); const _resolver = @import("../resolver/resolver.zig"); -const sync = bun.ThreadPool; +const sync = bun.threading; const ImportRecord = bun.ImportRecord; const runtime = @import("../runtime.zig"); diff --git a/src/bundler/ParseTask.zig b/src/bundler/ParseTask.zig index b12b1c8ef4..adfbb69a14 100644 --- a/src/bundler/ParseTask.zig +++ b/src/bundler/ParseTask.zig @@ -1293,7 +1293,7 @@ pub fn runFromThreadPool(this: *ParseTask) void { } }; } - if (this.ctx.graph.pool.usesIOPool()) { + if (ThreadPool.usesIOPool()) { 
this.ctx.graph.pool.scheduleInsideThreadPool(this); return; } @@ -1392,7 +1392,7 @@ const js_ast = @import("../js_ast.zig"); const linker = @import("../linker.zig"); const base64 = bun.base64; pub const Ref = @import("../ast/base.zig").Ref; -const ThreadPoolLib = @import("../thread_pool.zig"); +const ThreadPoolLib = bun.ThreadPool; const BabyList = @import("../baby_list.zig").BabyList; const Fs = @import("../fs.zig"); const _resolver = @import("../resolver/resolver.zig"); diff --git a/src/bundler/ServerComponentParseTask.zig b/src/bundler/ServerComponentParseTask.zig index 3bd3974c1d..9bfecfdd84 100644 --- a/src/bundler/ServerComponentParseTask.zig +++ b/src/bundler/ServerComponentParseTask.zig @@ -212,7 +212,7 @@ const options = @import("../options.zig"); const js_parser = bun.js_parser; const js_ast = @import("../js_ast.zig"); pub const Ref = @import("../ast/base.zig").Ref; -const ThreadPoolLib = @import("../thread_pool.zig"); +const ThreadPoolLib = bun.ThreadPool; const BabyList = @import("../baby_list.zig").BabyList; const OOM = bun.OOM; diff --git a/src/bundler/ThreadPool.zig b/src/bundler/ThreadPool.zig index 1ba0f27fe8..64fa289f39 100644 --- a/src/bundler/ThreadPool.zig +++ b/src/bundler/ThreadPool.zig @@ -4,75 +4,120 @@ pub const ThreadPool = struct { /// On Windows, this seemed to be a small performance improvement. /// On Linux, this was a performance regression. /// In some benchmarks on macOS, this yielded up to a 60% performance improvement in microbenchmarks that load ~10,000 files. - io_pool: *ThreadPoolLib = undefined, - worker_pool: *ThreadPoolLib = undefined, + io_pool: *ThreadPoolLib, + worker_pool: *ThreadPoolLib, + worker_pool_is_owned: bool = false, workers_assignments: std.AutoArrayHashMap(std.Thread.Id, *Worker) = std.AutoArrayHashMap(std.Thread.Id, *Worker).init(bun.default_allocator), workers_assignments_lock: bun.Mutex = .{}, - v2: *BundleV2 = undefined, + v2: *BundleV2, const debug = Output.scoped(.ThreadPool, false); - pub fn reset(this: *ThreadPool) void { - if (this.usesIOPool()) { - if (this.io_pool.threadpool_context == @as(?*anyopaque, @ptrCast(this))) { - this.io_pool.threadpool_context = null; + const IOThreadPool = struct { + var thread_pool: ThreadPoolLib = undefined; + // Protects initialization and deinitialization of the IO thread pool. + var mutex = bun.threading.Mutex{}; + // 0 means not initialized. 1 means initialized but not used. + // N > 1 means N-1 `ThreadPool`s are using the IO thread pool. + var ref_count = std.atomic.Value(usize).init(0); + + pub fn acquire() *ThreadPoolLib { + var count = ref_count.load(.acquire); + while (true) { + if (count == 0) break; + // .monotonic is okay because we already loaded this value with .acquire, + // and we don't need the store to be .release because the only store that + // matters is the one that goes from 0 to 1, and that one is .release. 
+ count = ref_count.cmpxchgWeak( + count, + count + 1, + .monotonic, + .monotonic, + ) orelse return &thread_pool; } - } - if (this.worker_pool.threadpool_context == @as(?*anyopaque, @ptrCast(this))) { - this.worker_pool.threadpool_context = null; - } - } + mutex.lock(); + defer mutex.unlock(); - pub fn go(this: *ThreadPool, allocator: std.mem.Allocator, comptime Function: anytype) !ThreadPoolLib.ConcurrentFunction(Function) { - return this.worker_pool.go(allocator, Function); - } - - pub fn start(this: *ThreadPool, v2: *BundleV2, existing_thread_pool: ?*ThreadPoolLib) !void { - this.v2 = v2; - - if (existing_thread_pool) |pool| { - this.worker_pool = pool; - } else { - const cpu_count = bun.getThreadCount(); - this.worker_pool = try v2.graph.allocator.create(ThreadPoolLib); - this.worker_pool.* = ThreadPoolLib.init(.{ - .max_threads = cpu_count, + // .monotonic because the store we care about (the one that stores a nonzero + // value to indicate the thread pool is initialized) is guarded by the mutex. + if (ref_count.load(.monotonic) != 0) { + // Another thread initialized the pool while we waited for the mutex; + // count this new reference before returning. + _ = ref_count.fetchAdd(1, .monotonic); + return &thread_pool; + } + thread_pool = .init(.{ + .max_threads = @max(@min(bun.getThreadCount(), 4), 2), + // Use a much smaller stack size for the IO thread pool + .stack_size = 512 * 1024, }); - debug("{d} workers", .{cpu_count}); + // 2 means initialized and referenced by one `ThreadPool`. + ref_count.store(2, .release); + return &thread_pool; } - this.worker_pool.setThreadContext(this); + pub fn release() void { + const old = ref_count.fetchSub(1, .release); + bun.assertf(old > 1, "IOThreadPool: too many calls to release()", .{}); + } - this.worker_pool.warm(8); - - const IOThreadPool = struct { - var thread_pool: ThreadPoolLib = undefined; - var once = bun.once(startIOThreadPool); - - fn startIOThreadPool() void { - thread_pool = ThreadPoolLib.init(.{ - .max_threads = @max(@min(bun.getThreadCount(), 4), 2), - - // Use a much smaller stack size for the IO thread pool - .stack_size = 512 * 1024, - }); + pub fn shutdown() bool { + // .acquire instead of .acq_rel is okay because we only need to ensure that other + // threads are done using the IO pool if we read 1 from the ref count. + // + // .monotonic is okay because this function is only guaranteed to succeed when we + // can ensure that no `ThreadPool`s exist. + if (ref_count.cmpxchgStrong(1, 0, .acquire, .monotonic) != null) { + // At least one `ThreadPool` still exists. + return false; } - pub fn get() *ThreadPoolLib { - once.call(.{}); - return &thread_pool; - } + mutex.lock(); + defer mutex.unlock(); + + // .monotonic is okay because the only store that could happen at this point + // is guarded by the mutex. 
+ if (ref_count.load(.monotonic) != 0) return false; + thread_pool.deinit(); + thread_pool = undefined; + return true; + } + }; + + pub fn init(v2: *BundleV2, worker_pool: ?*ThreadPoolLib) !ThreadPool { + const pool = worker_pool orelse blk: { + const cpu_count = bun.getThreadCount(); + const new_pool = try v2.graph.allocator.create(ThreadPoolLib); + new_pool.* = .init(.{ .max_threads = cpu_count }); + debug("{d} workers", .{cpu_count}); + break :blk new_pool; }; + var this = initWithPool(v2, pool); + this.worker_pool_is_owned = worker_pool == null; + return this; + } - if (this.usesIOPool()) { - this.io_pool = IOThreadPool.get(); - this.io_pool.setThreadContext(this); + pub fn initWithPool(v2: *BundleV2, worker_pool: *ThreadPoolLib) ThreadPool { + return .{ + .worker_pool = worker_pool, + .io_pool = if (usesIOPool()) IOThreadPool.acquire() else undefined, + .v2 = v2, + }; + } + + pub fn deinit(this: *ThreadPool) void { + if (this.worker_pool_is_owned) { + this.worker_pool.deinit(); + this.v2.graph.allocator.destroy(this.worker_pool); + } + if (usesIOPool()) { + IOThreadPool.release(); + } + } + + pub fn start(this: *ThreadPool) void { + this.worker_pool.warm(8); + if (usesIOPool()) { this.io_pool.warm(1); } } - pub fn usesIOPool(_: *const ThreadPool) bool { + pub fn usesIOPool() bool { if (bun.getRuntimeFeatureFlag(.BUN_FEATURE_FLAG_FORCE_IO_POOL)) { // For testing. return true; @@ -91,6 +136,13 @@ return false; } + /// Shut down the IO pool if and only if no `ThreadPool`s exist right now. + /// If a `ThreadPool` exists, this function is a no-op and returns false. + /// Blocks until the IO pool is shut down. + pub fn shutdownIOPool() bool { + return if (usesIOPool()) IOThreadPool.shutdown() else true; + } + pub fn scheduleWithOptions(this: *ThreadPool, parse_task: *ParseTask, is_inside_thread_pool: bool) void { if (parse_task.contents_or_fd == .contents and parse_task.stage == .needs_source_code) { parse_task.stage = .{ @@ -103,7 +155,7 @@ const scheduleFn = if (is_inside_thread_pool) &ThreadPoolLib.scheduleInsideThreadPool else &ThreadPoolLib.schedule; - if (this.usesIOPool()) { + if (usesIOPool()) { switch (parse_task.stage) { .needs_parse => { scheduleFn(this.worker_pool, .from(&parse_task.task)); @@ -283,9 +335,6 @@ if (!this.has_created) { this.create(ctx); } - - // no funny business mr. 
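For reference, the acquire/release/shutdown trio above implements a refcounted, lazily initialized singleton: a CAS loop bumps the count on the fast path while the pool is known to be live, and the 0 -> nonzero transition is serialized by a mutex with a re-check. A minimal standalone sketch of the same pattern, with a plain buffer standing in for the shared `ThreadPoolLib` (all names here are illustrative, not the PR's code):

```zig
const std = @import("std");

var resource: [4096]u8 = undefined; // stands in for the shared IO thread pool
var mutex = std.Thread.Mutex{};
// 0 = uninitialized; N > 0 = initialized, with N - 1 active users.
var ref_count = std.atomic.Value(usize).init(0);

fn acquire() *[4096]u8 {
    var count = ref_count.load(.acquire);
    while (count != 0) {
        // Fast path: bump the count while the resource is known to be live.
        count = ref_count.cmpxchgWeak(count, count + 1, .monotonic, .monotonic) orelse
            return &resource;
    }
    mutex.lock();
    defer mutex.unlock();
    if (ref_count.load(.monotonic) != 0) {
        // Initialized while we waited for the mutex; count this reference too.
        _ = ref_count.fetchAdd(1, .monotonic);
        return &resource;
    }
    @memset(&resource, 0); // the expensive one-time initialization goes here
    ref_count.store(2, .release); // initialized + one user
    return &resource;
}

fn release() void {
    const old = ref_count.fetchSub(1, .release);
    std.debug.assert(old > 1); // 1 is reserved for "initialized but unused"
}

pub fn main() void {
    const buf = acquire();
    defer release();
    buf[0] = 42;
}
```

Keeping 1 as the "initialized but unused" floor is what lets `shutdown()` claim teardown with a single `cmpxchgStrong(1, 0, ...)` only when no users remain.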
cache - } }; }; @@ -302,7 +351,7 @@ const Logger = @import("../logger.zig"); const js_ast = @import("../js_ast.zig"); const linker = @import("../linker.zig"); pub const Ref = @import("../ast/base.zig").Ref; -const ThreadPoolLib = @import("../thread_pool.zig"); +const ThreadPoolLib = bun.ThreadPool; const ThreadlocalArena = @import("../allocators/mimalloc_arena.zig").Arena; const allocators = @import("../allocators.zig"); diff --git a/src/bundler/bundle_v2.zig b/src/bundler/bundle_v2.zig index 8844b3207d..cd5d34eead 100644 --- a/src/bundler/bundle_v2.zig +++ b/src/bundler/bundle_v2.zig @@ -865,20 +865,16 @@ pub const BundleV2 = struct { this.linker.dev_server = transpiler.options.dev_server; - var pool = try this.graph.allocator.create(ThreadPool); + const pool = try this.graph.allocator.create(ThreadPool); if (cli_watch_flag) { Watcher.enableHotModuleReloading(this); } // errdefer pool.destroy(); errdefer this.graph.heap.deinit(); - pool.* = ThreadPool{}; + pool.* = try .init(this, thread_pool); this.graph.pool = pool; - try pool.start( - this, - thread_pool, - ); - + pool.start(); return this; } @@ -2226,6 +2222,7 @@ pub const BundleV2 = struct { this.graph.pool.worker_pool.wakeForIdleEvents(); } + this.graph.pool.deinit(); for (this.free_list.items) |free| { bun.default_allocator.free(free); @@ -4130,14 +4127,13 @@ pub const sourcemap = bun.sourcemap; pub const StringJoiner = bun.StringJoiner; pub const base64 = bun.base64; pub const Ref = @import("../ast/base.zig").Ref; -pub const ThreadPoolLib = @import("../thread_pool.zig"); +const ThreadPoolLib = bun.ThreadPool; pub const ThreadlocalArena = @import("../allocators/mimalloc_arena.zig").Arena; pub const BabyList = @import("../baby_list.zig").BabyList; pub const Fs = @import("../fs.zig"); pub const schema = @import("../api/schema.zig"); pub const Api = schema.Api; pub const _resolver = @import("../resolver/resolver.zig"); -pub const sync = bun.ThreadPool; pub const ImportRecord = bun.ImportRecord; pub const ImportKind = bun.ImportKind; pub const allocators = @import("../allocators.zig"); diff --git a/src/bundler/linker_context/computeCrossChunkDependencies.zig b/src/bundler/linker_context/computeCrossChunkDependencies.zig index 2135c98a1c..25e2b5bcf7 100644 --- a/src/bundler/linker_context/computeCrossChunkDependencies.zig +++ b/src/bundler/linker_context/computeCrossChunkDependencies.zig @@ -41,9 +41,8 @@ pub fn computeCrossChunkDependencies(c: *LinkerContext, chunks: []Chunk) !void { .symbols = &c.graph.symbols, }; - c.parse_graph.pool.worker_pool.doPtr( + c.parse_graph.pool.worker_pool.eachPtr( c.allocator, - &c.wait_group, cross_chunk_dependencies, CrossChunkDependencies.walk, chunks, diff --git a/src/bundler/linker_context/generateChunksInParallel.zig b/src/bundler/linker_context/generateChunksInParallel.zig index e09ce74aea..762c8bfd15 100644 --- a/src/bundler/linker_context/generateChunksInParallel.zig +++ b/src/bundler/linker_context/generateChunksInParallel.zig @@ -13,15 +13,8 @@ pub fn generateChunksInParallel(c: *LinkerContext, chunks: []Chunk, comptime is_ // TODO(@paperclover/bake): instead of running a renamer per chunk, run it per file debug(" START {d} renamers", .{chunks.len}); defer debug(" DONE {d} renamers", .{chunks.len}); - var wait_group = try c.allocator.create(sync.WaitGroup); - wait_group.init(); - defer { - wait_group.deinit(); - c.allocator.destroy(wait_group); - } - wait_group.counter = @as(u32, @truncate(chunks.len)); - const ctx = GenerateChunkCtx{ .chunk = &chunks[0], .wg = wait_group, .c = c, .chunks = 
chunks }; - try c.parse_graph.pool.worker_pool.doPtr(c.allocator, wait_group, ctx, LinkerContext.generateJSRenamer, chunks); + const ctx = GenerateChunkCtx{ .chunk = &chunks[0], .c = c, .chunks = chunks }; + try c.parse_graph.pool.worker_pool.eachPtr(c.allocator, ctx, LinkerContext.generateJSRenamer, chunks); } if (c.source_maps.line_offset_tasks.len > 0) { @@ -37,12 +30,6 @@ pub fn generateChunksInParallel(c: *LinkerContext, chunks: []Chunk, comptime is_ // Remove duplicate rules across files. This must be done in serial, not // in parallel, and must be done from the last rule to the first rule. if (c.parse_graph.css_file_count > 0) { - var wait_group = try c.allocator.create(sync.WaitGroup); - wait_group.init(); - defer { - wait_group.deinit(); - c.allocator.destroy(wait_group); - } const total_count = total_count: { var total_count: usize = 0; for (chunks) |*chunk| { @@ -65,15 +52,13 @@ pub fn generateChunksInParallel(c: *LinkerContext, chunks: []Chunk, comptime is_ }, .chunk = chunk, .linker = c, - .wg = wait_group, }; batch.push(.from(&tasks[i].task)); i += 1; } } - wait_group.counter = @as(u32, @truncate(total_count)); c.parse_graph.pool.worker_pool.schedule(batch); - wait_group.wait(); + c.parse_graph.pool.worker_pool.waitForAll(); } else if (Environment.isDebug) { for (chunks) |*chunk| { bun.assert(chunk.content != .css); @@ -84,34 +69,27 @@ pub fn generateChunksInParallel(c: *LinkerContext, chunks: []Chunk, comptime is_ { const chunk_contexts = c.allocator.alloc(GenerateChunkCtx, chunks.len) catch unreachable; defer c.allocator.free(chunk_contexts); - var wait_group = try c.allocator.create(sync.WaitGroup); - wait_group.init(); - defer { - wait_group.deinit(); - c.allocator.destroy(wait_group); - } - errdefer wait_group.wait(); { var total_count: usize = 0; for (chunks, chunk_contexts) |*chunk, *chunk_ctx| { switch (chunk.content) { .javascript => { - chunk_ctx.* = .{ .wg = wait_group, .c = c, .chunks = chunks, .chunk = chunk }; + chunk_ctx.* = .{ .c = c, .chunks = chunks, .chunk = chunk }; total_count += chunk.content.javascript.parts_in_chunk_in_order.len; chunk.compile_results_for_chunk = c.allocator.alloc(CompileResult, chunk.content.javascript.parts_in_chunk_in_order.len) catch bun.outOfMemory(); has_js_chunk = true; }, .css => { has_css_chunk = true; - chunk_ctx.* = .{ .wg = wait_group, .c = c, .chunks = chunks, .chunk = chunk }; + chunk_ctx.* = .{ .c = c, .chunks = chunks, .chunk = chunk }; total_count += chunk.content.css.imports_in_chunk_in_order.len; chunk.compile_results_for_chunk = c.allocator.alloc(CompileResult, chunk.content.css.imports_in_chunk_in_order.len) catch bun.outOfMemory(); }, .html => { has_html_chunk = true; // HTML gets only one chunk. 
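One detail riding along with these wait-group removals: the deleted call sites seeded counters with `@as(u32, @truncate(total_count))`, while the surviving cast in `scheduleTasks` now uses `@intCast`. For counts that fit in `u32` the two are identical, but their failure modes differ: `@truncate` silently drops high bits (an undercounted wait group would unblock early), whereas `@intCast` trips a safety check in Debug/ReleaseSafe builds. A tiny illustration with a deliberately oversized, hypothetical count:

```zig
const std = @import("std");

pub fn main() void {
    const task_count: u64 = (1 << 32) + 7; // implausibly large, for illustration only
    const wrapped: u32 = @truncate(task_count); // silently 7: waiters would return after 7 tasks
    std.debug.print("truncated count: {d}\n", .{wrapped});
    // const checked: u32 = @intCast(task_count); // would panic in safe builds instead of wrapping
}
```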
- chunk_ctx.* = .{ .wg = wait_group, .c = c, .chunks = chunks, .chunk = chunk }; + chunk_ctx.* = .{ .c = c, .chunks = chunks, .chunk = chunk }; total_count += 1; chunk.compile_results_for_chunk = c.allocator.alloc(CompileResult, 1) catch bun.outOfMemory(); }, @@ -183,9 +161,8 @@ pub fn generateChunksInParallel(c: *LinkerContext, chunks: []Chunk, comptime is_ }, } } - wait_group.counter = @as(u32, @truncate(total_count)); c.parse_graph.pool.worker_pool.schedule(batch); - wait_group.wait(); + c.parse_graph.pool.worker_pool.waitForAll(); } if (c.source_maps.quoted_contents_tasks.len > 0) { @@ -202,12 +179,9 @@ pub fn generateChunksInParallel(c: *LinkerContext, chunks: []Chunk, comptime is_ bun.assert(chunks_to_do.len > 0); debug(" START {d} postprocess chunks", .{chunks_to_do.len}); defer debug(" DONE {d} postprocess chunks", .{chunks_to_do.len}); - wait_group.init(); - wait_group.counter = @as(u32, @truncate(chunks_to_do.len)); - try c.parse_graph.pool.worker_pool.doPtr( + try c.parse_graph.pool.worker_pool.eachPtr( c.allocator, - wait_group, chunk_contexts[0], generateChunk, chunks_to_do, @@ -576,7 +550,6 @@ pub const ThreadPool = bun.bundle_v2.ThreadPool; const Loc = Logger.Loc; const Chunk = bun.bundle_v2.Chunk; -const sync = bun.ThreadPool; const GenerateChunkCtx = LinkerContext.GenerateChunkCtx; const CompileResult = LinkerContext.CompileResult; const PendingPartRange = LinkerContext.PendingPartRange; @@ -599,4 +572,4 @@ const base64 = bun.base64; const JSC = bun.JSC; -pub const ThreadPoolLib = bun.ThreadPool; +const ThreadPoolLib = bun.ThreadPool; diff --git a/src/bundler/linker_context/generateCompileResultForCssChunk.zig b/src/bundler/linker_context/generateCompileResultForCssChunk.zig index 926075c99d..b868aa2ead 100644 --- a/src/bundler/linker_context/generateCompileResultForCssChunk.zig +++ b/src/bundler/linker_context/generateCompileResultForCssChunk.zig @@ -1,7 +1,6 @@ pub fn generateCompileResultForCssChunk(task: *ThreadPoolLib.Task) void { const part_range: *const PendingPartRange = @fieldParentPtr("task", task); const ctx = part_range.ctx; - defer ctx.wg.finish(); var worker = ThreadPool.Worker.get(@fieldParentPtr("linker", ctx.c)); defer worker.unget(); diff --git a/src/bundler/linker_context/generateCompileResultForHtmlChunk.zig b/src/bundler/linker_context/generateCompileResultForHtmlChunk.zig index 21c4ee7db6..aeab9c31a2 100644 --- a/src/bundler/linker_context/generateCompileResultForHtmlChunk.zig +++ b/src/bundler/linker_context/generateCompileResultForHtmlChunk.zig @@ -20,7 +20,6 @@ pub fn generateCompileResultForHtmlChunk(task: *ThreadPoolLib.Task) void { const part_range: *const PendingPartRange = @fieldParentPtr("task", task); const ctx = part_range.ctx; - defer ctx.wg.finish(); var worker = ThreadPool.Worker.get(@fieldParentPtr("linker", ctx.c)); defer worker.unget(); diff --git a/src/bundler/linker_context/generateCompileResultForJSChunk.zig b/src/bundler/linker_context/generateCompileResultForJSChunk.zig index 0c6e6a1466..a67251e593 100644 --- a/src/bundler/linker_context/generateCompileResultForJSChunk.zig +++ b/src/bundler/linker_context/generateCompileResultForJSChunk.zig @@ -1,7 +1,6 @@ pub fn generateCompileResultForJSChunk(task: *ThreadPoolLib.Task) void { const part_range: *const PendingPartRange = @fieldParentPtr("task", task); const ctx = part_range.ctx; - defer ctx.wg.finish(); var worker = ThreadPool.Worker.get(@fieldParentPtr("linker", ctx.c)); defer worker.unget(); diff --git a/src/bundler/linker_context/prepareCssAstsForChunk.zig 
b/src/bundler/linker_context/prepareCssAstsForChunk.zig index f1da7f8082..bc8589c55c 100644 --- a/src/bundler/linker_context/prepareCssAstsForChunk.zig +++ b/src/bundler/linker_context/prepareCssAstsForChunk.zig @@ -2,12 +2,10 @@ pub const PrepareCssAstTask = struct { task: ThreadPoolLib.Task, chunk: *Chunk, linker: *LinkerContext, - wg: *sync.WaitGroup, }; pub fn prepareCssAstsForChunk(task: *ThreadPoolLib.Task) void { const prepare_css_asts: *const PrepareCssAstTask = @fieldParentPtr("task", task); - defer prepare_css_asts.wg.finish(); var worker = ThreadPool.Worker.get(@fieldParentPtr("linker", prepare_css_asts.linker)); defer worker.unget(); @@ -277,7 +275,6 @@ const LinkerContext = bun.bundle_v2.LinkerContext; const ThreadPoolLib = bun.ThreadPool; const std = @import("std"); -const sync = bun.ThreadPool; const ImportRecord = bun.ImportRecord; const bundler = bun.bundle_v2; diff --git a/src/bundler/linker_context/scanImportsAndExports.zig b/src/bundler/linker_context/scanImportsAndExports.zig index 2d53e3fabe..a3b25df521 100644 --- a/src/bundler/linker_context/scanImportsAndExports.zig +++ b/src/bundler/linker_context/scanImportsAndExports.zig @@ -370,7 +370,12 @@ pub fn scanImportsAndExports(this: *LinkerContext) !void { // for CommonJS files, and is also necessary for other files if they are // imported using an import star statement. - // Note: `do` will wait for all to finish before moving forward + // Note: `each` will wait for all to finish before moving forward - try this.parse_graph.pool.worker_pool.do(this.allocator, &this.wait_group, this, LinkerContext.doStep5, this.graph.reachable_files); + try this.parse_graph.pool.worker_pool.each( + this.allocator, + this, + LinkerContext.doStep5, + this.graph.reachable_files, + ); } if (comptime FeatureFlags.help_catch_memory_issues) { diff --git a/src/cli.zig b/src/cli.zig index 3224c3af42..2e55153a6f 100644 --- a/src/cli.zig +++ b/src/cli.zig @@ -14,7 +14,6 @@ const File = bun.sys.File; const debug = Output.scoped(.CLI, true); -const sync = @import("./sync.zig"); const Api = @import("api/schema.zig").Api; const clap = bun.clap; const BunJS = @import("./bun_js.zig"); @@ -33,7 +32,6 @@ pub var Bun__Node__ProcessTitle: ?string = null; pub const Cli = struct { pub const CompileTarget = @import("./compile_target.zig"); - var wait_group: sync.WaitGroup = undefined; pub var log_: logger.Log = undefined; pub fn startTransform(_: std.mem.Allocator, _: Api.TransformOptions, _: *logger.Log) anyerror!void {} pub fn start(allocator: std.mem.Allocator) void { diff --git a/src/deps/boringssl.translated.zig b/src/deps/boringssl.translated.zig index 3c1cdca6aa..ee30274dbe 100644 --- a/src/deps/boringssl.translated.zig +++ b/src/deps/boringssl.translated.zig @@ -1,7 +1,7 @@ const std = @import("std"); const bun = @import("bun"); const C = @import("std").zig.c_builtins; -const pthread_rwlock_t = if (bun.Environment.isPosix) @import("../sync.zig").RwLock.pthread_rwlock_t else *anyopaque; +const pthread_rwlock_t = if (bun.Environment.isPosix) std.c.pthread_rwlock_t else *anyopaque; const time_t = C.time_t; const va_list = C.va_list; const struct_timeval = C.struct_timeval; diff --git a/src/http/AsyncHTTP.zig b/src/http/AsyncHTTP.zig index c9d9f3800d..816ef1fa8c 100644 --- a/src/http/AsyncHTTP.zig +++ b/src/http/AsyncHTTP.zig @@ -500,7 +500,7 @@ const Batch = bun.ThreadPool.Batch; const SSLConfig = @import("../bun.js/api/server.zig").ServerConfig.SSLConfig; const HTTPCallbackPair = .{ *AsyncHTTP, HTTPClientResult }; -const Channel = @import("../sync.zig").Channel; +const Channel = bun.threading.Channel; pub const HTTPChannel 
= Channel(HTTPCallbackPair, .{ .Static = 1000 }); // 32 pointers much cheaper than 1000 pointers const SingleHTTPChannel = struct { diff --git a/src/http/HTTPThread.zig b/src/http/HTTPThread.zig index 23e1a088e6..83c7829bb1 100644 --- a/src/http/HTTPThread.zig +++ b/src/http/HTTPThread.zig @@ -468,7 +468,7 @@ const strings = bun.strings; const stringZ = bun.stringZ; const JSC = bun.JSC; const NewHTTPContext = bun.http.NewHTTPContext; -const UnboundedQueue = @import("../bun.js/unbounded_queue.zig").UnboundedQueue; +const UnboundedQueue = bun.threading.UnboundedQueue; const AsyncHTTP = bun.http.AsyncHTTP; pub const Queue = UnboundedQueue(AsyncHTTP, .next); diff --git a/src/sync.zig b/src/sync.zig deleted file mode 100644 index ea63c40920..0000000000 --- a/src/sync.zig +++ /dev/null @@ -1,1226 +0,0 @@ -const std = @import("std"); -const system = if (bun.Environment.isWindows) std.os.windows else std.posix.system; -const bun = @import("bun"); - -// https://gist.github.com/kprotty/0d2dc3da4840341d6ff361b27bdac7dc -pub const ThreadPool = struct { - state: usize = 0, - spawned: usize = 0, - run_queue: Queue, - idle_semaphore: Semaphore, - allocator: std.mem.Allocator, - workers: []Worker = &[_]Worker{}, - - pub const InitConfig = struct { - allocator: ?std.mem.Allocator = null, - max_threads: ?usize = null, - - var default_gpa = std.heap.GeneralPurposeAllocator(.{}){}; - var default_allocator = &default_gpa.allocator; - }; - - pub fn init(self: *ThreadPool, config: InitConfig) !void { - self.* = ThreadPool{ - .run_queue = Queue.init(), - .idle_semaphore = Semaphore.init(0), - .allocator = config.allocator orelse InitConfig.default_allocator, - }; - - errdefer self.deinit(); - - const num_workers = @max(1, config.max_threads orelse std.Thread.cpuCount() catch 1); - self.workers = try self.allocator.alloc(Worker, num_workers); - - for (&self.workers) |*worker| { - try worker.init(self); - @atomicStore(usize, &self.spawned, self.spawned + 1, .seq_cst); - } - } - - pub fn deinit(self: *ThreadPool) void { - self.shutdown(); - - for (&self.workers[0..self.spawned]) |*worker| - worker.deinit(); - - while (self.run_queue.pop()) |run_node| - (run_node.data.runFn)(&run_node.data); - - self.allocator.free(self.workers); - self.idle_semaphore.deinit(); - self.run_queue.deinit(); - self.* = undefined; - } - - pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { - const Args = @TypeOf(args); - const Closure = struct { - func_args: Args, - allocator: std.mem.Allocator, - run_node: RunNode = .{ .data = .{ .runFn = runFn } }, - - fn runFn(runnable: *Runnable) void { - const run_node: *RunNode = @fieldParentPtr("data", runnable); - const closure: *@This() = @fieldParentPtr("run_node", run_node); - _ = @call(.auto, func, closure.func_args); - closure.allocator.destroy(closure); - } - }; - - const allocator = self.allocator; - const closure = try allocator.create(Closure); - errdefer allocator.destroy(closure); - closure.* = Closure{ - .func_args = args, - .allocator = allocator, - }; - - const run_node = &closure.run_node; - if (Worker.current) |worker| { - worker.run_queue.push(run_node); - } else { - self.run_queue.push(run_node); - } - - self.notify(); - } - - const State = struct { - is_shutdown: bool = false, - is_notified: bool = false, - idle_workers: usize = 0, - - fn pack(self: State) usize { - return ((@as(usize, @intFromBool(self.is_shutdown)) << 0) | - (@as(usize, @intFromBool(self.is_notified)) << 1) | - (self.idle_workers << 2)); - } - - fn unpack(value: usize) State { - 
return State{ - .is_shutdown = value & (1 << 0) != 0, - .is_notified = value & (1 << 1) != 0, - .idle_workers = value >> 2, - }; - } - }; - - fn wait(self: *ThreadPool) error{Shutdown}!void { - var state = State.unpack(@atomicLoad(usize, &self.state, .seq_cst)); - while (true) { - if (state.is_shutdown) - return error.Shutdown; - - var new_state = state; - if (state.is_notified) { - new_state.is_notified = false; - } else { - new_state.idle_workers += 1; - } - - if (@cmpxchgWeak( - usize, - &self.state, - state.pack(), - new_state.pack(), - .seq_cst, - .seq_cst, - )) |updated| { - state = State.unpack(updated); - continue; - } - - if (!state.is_notified) - self.idle_semaphore.wait(); - return; - } - } - - fn notify(self: *ThreadPool) void { - var state = State.unpack(@atomicLoad(usize, &self.state, .seq_cst)); - while (true) { - if (state.is_shutdown) - return; - - var new_state = state; - if (state.is_notified) { - return; - } else if (state.idle_workers == 0) { - new_state.is_notified = true; - } else { - new_state.idle_workers -= 1; - } - - if (@cmpxchgWeak( - usize, - &self.state, - state.pack(), - new_state.pack(), - .seq_cst, - .seq_cst, - )) |updated| { - state = State.unpack(updated); - continue; - } - - if (!new_state.is_notified) - self.idle_semaphore.post(); - return; - } - } - - fn shutdown(self: *ThreadPool) void { - var state = State.unpack(@atomicRmw( - usize, - &self.state, - .Xchg, - (State{ .is_shutdown = true }).pack(), - .seq_cst, - )); - - while (state.idle_workers > 0) : (state.idle_workers -= 1) - self.idle_semaphore.post(); - } - - const Worker = struct { - thread: *std.Thread, - run_queue: Queue, - - fn init(self: *Worker, pool: *ThreadPool) !void { - self.* = Worker{ - .thread = undefined, - .run_queue = Queue.init(), - }; - - self.thread = std.Thread.spawn( - Worker.run, - RunConfig{ - .worker = self, - .pool = pool, - }, - ) catch |err| { - self.run_queue.deinit(); - return err; - }; - } - - fn deinit(self: *Worker) void { - self.thread.wait(); - self.run_queue.deinit(); - self.* = undefined; - } - - threadlocal var current: ?*Worker = null; - - const RunConfig = struct { - worker: *Worker, - pool: *ThreadPool, - }; - - fn run(config: RunConfig) void { - const self = config.worker; - const pool = config.pool; - - const old_current = current; - current = self; - defer current = old_current; - - var tick = @intFromPtr(self); - var prng = std.rand.DefaultPrng.init(tick); - - while (true) { - const run_node = self.poll(tick, pool, &prng.random) orelse { - pool.wait() catch break; - continue; - }; - - tick +%= 1; - (run_node.data.runFn)(&run_node.data); - } - } - - fn poll(self: *Worker, tick: usize, pool: *ThreadPool, rand: *std.rand.Random) ?*RunNode { - if (tick % 128 == 0) { - if (self.steal(pool, rand, .fair)) |run_node| - return run_node; - } - - if (tick % 64 == 0) { - if (self.run_queue.steal(&pool.run_queue, .fair)) |run_node| - return run_node; - } - - if (self.run_queue.pop()) |run_node| - return run_node; - - var attempts: usize = 8; - while (attempts > 0) : (attempts -= 1) { - if (self.steal(pool, rand, .unfair)) |run_node| { - return run_node; - } else { - std.posix.sched_yield() catch spinLoopHint(); - } - } - - if (self.run_queue.steal(&pool.run_queue, .unfair)) |run_node| - return run_node; - - return null; - } - - fn steal(self: *Worker, pool: *ThreadPool, rand: *std.rand.Random, mode: anytype) ?*RunNode { - const spawned = @atomicLoad(usize, &pool.spawned, .seq_cst); - if (spawned < 2) - return null; - - var index = rand.uintLessThan(usize, 
spawned); - - var iter = spawned; - while (iter > 0) : (iter -= 1) { - const target = &pool.workers[index]; - - index += 1; - if (index == spawned) - index = 0; - - if (target == self) - continue; - if (self.run_queue.steal(&target.run_queue, mode)) |run_node| - return run_node; - } - - return null; - } - }; - - const Queue = struct { - mutex: Mutex, - size: usize, - list: List, - - fn init() Queue { - return Queue{ - .mutex = Mutex.init(), - .size = 0, - .list = .{}, - }; - } - - fn deinit(self: *Queue) void { - self.mutex.deinit(); - self.* = undefined; - } - - fn push(self: *Queue, node: *List.Node) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - self.list.prepend(node); - @atomicStore(usize, &self.size, self.size + 1, .seq_cst); - } - - fn pop(self: *Queue) ?*List.Node { - return self.popFrom(.head); - } - - fn steal(_: *Queue, target: *Queue, mode: enum { fair, unfair }) ?*RunNode { - return target.popFrom(switch (mode) { - .fair => .tail, - .unfair => .head, - }); - } - - fn popFrom(self: *Queue, side: enum { head, tail }) ?*RunNode { - if (@atomicLoad(usize, &self.size, .seq_cst) == 0) - return null; - - self.mutex.lock(); - defer self.mutex.unlock(); - - // potential deadlock when all pops are fair.. - const run_node = switch (side) { - .head => self.list.popFirst(), - .tail => self.list.pop(), - }; - - if (run_node != null) - @atomicStore(usize, &self.size, self.size - 1, .seq_cst); - - return run_node; - } - }; - - const List = std.TailQueue(Runnable); - const RunNode = List.Node; - const Runnable = struct { - runFn: *const (fn (*Runnable) void), - }; -}; - -pub fn Channel( - comptime T: type, - comptime buffer_type: std.fifo.LinearFifoBufferType, -) type { - return struct { - mutex: Mutex, - putters: Condvar, - getters: Condvar, - buffer: Buffer, - is_closed: bool, - - const Self = @This(); - const Buffer = std.fifo.LinearFifo(T, buffer_type); - - pub const init = switch (buffer_type) { - .Static => initStatic, - .Slice => initSlice, - .Dynamic => initDynamic, - }; - - pub inline fn initStatic() Self { - return .withBuffer(Buffer.init()); - } - - pub inline fn initSlice(buf: []T) Self { - return .withBuffer(Buffer.init(buf)); - } - - pub inline fn initDynamic(allocator: std.mem.Allocator) Self { - return .withBuffer(Buffer.init(allocator)); - } - - fn withBuffer(buffer: Buffer) Self { - return Self{ - .mutex = Mutex.init(), - .putters = Condvar.init(), - .getters = Condvar.init(), - .buffer = buffer, - .is_closed = false, - }; - } - - pub fn deinit(self: *Self) void { - self.mutex.deinit(); - self.putters.deinit(); - self.getters.deinit(); - self.buffer.deinit(); - self.* = undefined; - } - - pub fn close(self: *Self) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - if (self.is_closed) - return; - - self.is_closed = true; - self.putters.broadcast(); - self.getters.broadcast(); - } - - pub fn tryWriteItem(self: *Self, item: T) !bool { - const wrote = try self.write(&[1]T{item}); - return wrote == 1; - } - - pub fn writeItem(self: *Self, item: T) !void { - return self.writeAll(&[1]T{item}); - } - - pub fn write(self: *Self, items: []const T) !usize { - return self.writeItems(items, false); - } - - pub fn tryReadItem(self: *Self) !?T { - var items: [1]T = undefined; - if ((try self.read(&items)) != 1) - return null; - return items[0]; - } - - pub fn readItem(self: *Self) !T { - var items: [1]T = undefined; - try self.readAll(&items); - return items[0]; - } - - pub fn read(self: *Self, items: []T) !usize { - return self.readItems(items, false); - } - - pub 
fn writeAll(self: *Self, items: []const T) !void { - bun.assert((try self.writeItems(items, true)) == items.len); - } - - pub fn readAll(self: *Self, items: []T) !void { - bun.assert((try self.readItems(items, true)) == items.len); - } - - fn writeItems(self: *Self, items: []const T, should_block: bool) !usize { - self.mutex.lock(); - defer self.mutex.unlock(); - - var pushed: usize = 0; - while (pushed < items.len) { - const did_push = blk: { - if (self.is_closed) - return error.Closed; - - self.buffer.write(items) catch |err| { - if (buffer_type == .Dynamic) - return err; - break :blk false; - }; - - self.getters.signal(); - break :blk true; - }; - - if (did_push) { - pushed += 1; - } else if (should_block) { - self.putters.wait(&self.mutex); - } else { - break; - } - } - - return pushed; - } - - fn readItems(self: *Self, items: []T, should_block: bool) !usize { - self.mutex.lock(); - defer self.mutex.unlock(); - - var popped: usize = 0; - while (popped < items.len) { - const new_item = blk: { - // Buffer can contain null items but readItem will return null if the buffer is empty. - // we need to check if the buffer is empty before trying to read an item. - if (self.buffer.count == 0) { - if (self.is_closed) - return error.Closed; - break :blk null; - } - - const item = self.buffer.readItem(); - self.putters.signal(); - break :blk item; - }; - - if (new_item) |item| { - items[popped] = item; - popped += 1; - } else if (should_block) { - self.getters.wait(&self.mutex); - } else { - break; - } - } - - return popped; - } - }; -} - -pub const RwLock = if (@import("builtin").os.tag != .windows and @import("builtin").link_libc) - struct { - rwlock: if (@import("builtin").os.tag != .windows) pthread_rwlock_t else void, - - pub fn init() RwLock { - return .{ .rwlock = PTHREAD_RWLOCK_INITIALIZER }; - } - - pub fn deinit(self: *RwLock) void { - const safe_rc = switch (@import("builtin").os.tag) { - .dragonfly, .netbsd => std.posix.EAGAIN, - else => std.c.E.SUCCESS, - }; - - const rc = std.c.pthread_rwlock_destroy(&self.rwlock); - bun.assert(rc == .SUCCESS or rc == safe_rc); - - self.* = undefined; - } - - pub fn tryLock(self: *RwLock) bool { - return pthread_rwlock_trywrlock(&self.rwlock) == 0; - } - - pub fn lock(self: *RwLock) void { - const rc = pthread_rwlock_wrlock(&self.rwlock); - bun.assert(rc == .SUCCESS); - } - - pub fn unlock(self: *RwLock) void { - const rc = pthread_rwlock_unlock(&self.rwlock); - bun.assert(rc == .SUCCESS); - } - - pub fn tryLockShared(self: *RwLock) bool { - return pthread_rwlock_tryrdlock(&self.rwlock) == 0; - } - - pub fn lockShared(self: *RwLock) void { - const rc = pthread_rwlock_rdlock(&self.rwlock); - bun.assert(rc == .SUCCESS); - } - - pub fn unlockShared(self: *RwLock) void { - const rc = pthread_rwlock_unlock(&self.rwlock); - bun.assert(rc == .SUCCESS); - } - - const PTHREAD_RWLOCK_INITIALIZER = pthread_rwlock_t{}; - pub const pthread_rwlock_t = switch (@import("builtin").os.tag) { - .macos, .ios, .watchos, .tvos => extern struct { - __sig: c_long = 0x2DA8B3B4, - __opaque: [192]u8 = [_]u8{0} ** 192, - }, - .linux => switch (@import("builtin").abi) { - .android => switch (@sizeOf(usize)) { - 4 => extern struct { - lock: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, - cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, - numLocks: c_int = 0, - writerThreadId: c_int = 0, - pendingReaders: c_int = 0, - pendingWriters: c_int = 0, - attr: i32 = 0, - __reserved: [12]u8 = [_]u8{0} ** 2, - }, - 8 => extern struct { - numLocks: c_int = 0, - 
writerThreadId: c_int = 0, - pendingReaders: c_int = 0, - pendingWriters: c_int = 0, - attr: i32 = 0, - __reserved: [36]u8 = [_]u8{0} ** 36, - }, - else => @compileError("unreachable"), - }, - else => extern struct { - size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, - }, - }, - .fuchsia => extern struct { - size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, - }, - .emscripten => extern struct { - size: [32]u8 align(4) = [_]u8{0} ** 32, - }, - .netbsd => extern struct { - ptr_magic: c_uint = 0x99990009, - ptr_interlock: switch (@import("builtin").target.cpu.arch) { - .aarch64, .sparc, .x86_64 => u8, - .arm, .powerpc => c_int, - else => @compileError("unreachable"), - } = 0, - ptr_rblocked_first: ?*u8 = null, - ptr_rblocked_last: ?*u8 = null, - ptr_wblocked_first: ?*u8 = null, - ptr_wblocked_last: ?*u8 = null, - ptr_nreaders: c_uint = 0, - ptr_owner: std.c.pthread_t = null, - ptr_private: ?*anyopaque = null, - }, - .haiku => extern struct { - flags: u32 = 0, - owner: i32 = -1, - lock_sem: i32 = 0, - lock_count: i32 = 0, - reader_count: i32 = 0, - writer_count: i32 = 0, - waiters: [2]?*anyopaque = [_]?*anyopaque{ null, null }, - }, - .freebsd, .openbsd => extern struct { - ptr: ?*anyopaque = null, - }, - .hermit => extern struct { - ptr: usize = std.math.maxInt(usize), - }, - else => @compileError("pthread_rwlock_t not implemented for this platform"), - }; - - extern "c" fn pthread_rwlock_destroy(p: *pthread_rwlock_t) callconv(.C) std.posix.E; - extern "c" fn pthread_rwlock_rdlock(p: *pthread_rwlock_t) callconv(.C) std.posix.E; - extern "c" fn pthread_rwlock_wrlock(p: *pthread_rwlock_t) callconv(.C) std.posix.E; - extern "c" fn pthread_rwlock_tryrdlock(p: *pthread_rwlock_t) callconv(.C) std.posix.E; - extern "c" fn pthread_rwlock_trywrlock(p: *pthread_rwlock_t) callconv(.C) std.posix.E; - extern "c" fn pthread_rwlock_unlock(p: *pthread_rwlock_t) callconv(.C) std.posix.E; - } -else - struct { - /// https://github.com/bloomberg/rwl-bench/blob/master/bench11.cpp - state: usize, - mutex: Mutex, - semaphore: Semaphore, - - const IS_WRITING: usize = 1; - const WRITER: usize = 1 << 1; - const READER: usize = 1 << (1 + std.meta.bitCount(Count)); - const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(WRITER); - const READER_MASK: usize = std.math.maxInt(Count) << @ctz(READER); - const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2)); - - pub fn init() RwLock { - return .{ - .state = 0, - .mutex = Mutex.init(), - .semaphore = Semaphore.init(0), - }; - } - - pub fn deinit(self: *RwLock) void { - self.semaphore.deinit(); - self.mutex.deinit(); - self.* = undefined; - } - - pub fn tryLock(self: *RwLock) bool { - if (self.mutex.tryLock()) { - const state = @atomicLoad(usize, &self.state, .seq_cst); - if (state & READER_MASK == 0) { - _ = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .seq_cst); - return true; - } - - self.mutex.unlock(); - } - - return false; - } - - pub fn lock(self: *RwLock) void { - _ = @atomicRmw(usize, &self.state, .Add, WRITER, .seq_cst); - self.mutex.lock(); - - const state = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .seq_cst); - if (state & READER_MASK != 0) - self.semaphore.wait(); - } - - pub fn unlock(self: *RwLock) void { - _ = @atomicRmw(usize, &self.state, .And, ~IS_WRITING, .seq_cst); - self.mutex.unlock(); - } - - pub fn tryLockShared(self: *RwLock) bool { - const state = @atomicLoad(usize, &self.state, .seq_cst); - if (state & (IS_WRITING | WRITER_MASK) == 0) { - _ = @cmpxchgStrong( - usize, - &self.state, - state, - state + 
READER, - .seq_cst, - .seq_cst, - ) orelse return true; - } - - if (self.mutex.tryLock()) { - _ = @atomicRmw(usize, &self.state, .Add, READER, .seq_cst); - self.mutex.unlock(); - return true; - } - - return false; - } - - pub fn lockShared(self: *RwLock) void { - var state = @atomicLoad(usize, &self.state, .seq_cst); - while (state & (IS_WRITING | WRITER_MASK) == 0) { - state = @cmpxchgWeak( - usize, - &self.state, - state, - state + READER, - .seq_cst, - .seq_cst, - ) orelse return; - } - - self.mutex.lock(); - _ = @atomicRmw(usize, &self.state, .Add, READER, .seq_cst); - self.mutex.unlock(); - } - - pub fn unlockShared(self: *RwLock) void { - const state = @atomicRmw(usize, &self.state, .Sub, READER, .seq_cst); - - if ((state & READER_MASK == READER) and (state & IS_WRITING != 0)) - self.semaphore.post(); - } - }; - -pub const WaitGroup = struct { - mutex: Mutex, - cond: Condvar, - active: usize, - - pub fn init() WaitGroup { - return .{ - .mutex = Mutex.init(), - .cond = Condvar.init(), - .active = 0, - }; - } - - pub fn deinit(self: *WaitGroup) void { - self.mutex.deinit(); - self.cond.deinit(); - self.* = undefined; - } - - pub fn addN(self: *WaitGroup, n: usize) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - self.active += n; - } - - pub fn add(self: *WaitGroup) void { - return self.addN(1); - } - - pub fn done(self: *WaitGroup) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - self.active -= 1; - if (self.active == 0) - self.cond.signal(); - } - - pub fn wait(self: *WaitGroup) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - while (self.active != 0) - self.cond.wait(&self.mutex); - } -}; - -pub const Semaphore = struct { - mutex: Mutex, - cond: Condvar, - permits: usize, - - pub fn init(permits: usize) Semaphore { - return .{ - .mutex = Mutex.init(), - .cond = Condvar.init(), - .permits = permits, - }; - } - - pub fn deinit(self: *Semaphore) void { - self.mutex.deinit(); - self.cond.deinit(); - self.* = undefined; - } - - pub fn wait(self: *Semaphore) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - while (self.permits == 0) - self.cond.wait(&self.mutex); - - self.permits -= 1; - if (self.permits > 0) - self.cond.signal(); - } - - pub fn post(self: *Semaphore) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - self.permits += 1; - self.cond.signal(); - } -}; - -pub const Mutex = if (@import("builtin").os.tag == .windows) - struct { - srwlock: SRWLOCK, - - pub fn init() Mutex { - return .{ .srwlock = SRWLOCK_INIT }; - } - - pub fn deinit(self: *Mutex) void { - self.* = undefined; - } - - pub fn tryLock(self: *Mutex) bool { - return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE; - } - - pub fn lock(self: *Mutex) void { - AcquireSRWLockExclusive(&self.srwlock); - } - - pub fn unlock(self: *Mutex) void { - ReleaseSRWLockExclusive(&self.srwlock); - } - - const SRWLOCK = usize; - const SRWLOCK_INIT: SRWLOCK = 0; - - extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) system.BOOL; - extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void; - extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void; - } -else if (@import("builtin").link_libc) - struct { - mutex: if (@import("builtin").link_libc) std.c.pthread_mutex_t else void, - - pub fn init() Mutex { - return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER }; - } - - pub fn deinit(self: *Mutex) void { - const safe_rc = switch (@import("builtin").os.tag) { - .dragonfly, .netbsd => 
std.posix.EAGAIN, - else => std.c.E.SUCCESS, - }; - - const rc = std.c.pthread_mutex_destroy(&self.mutex); - bun.assert(rc == .SUCCESS or rc == safe_rc); - - self.* = undefined; - } - - pub fn tryLock(self: *Mutex) bool { - return pthread_mutex_trylock(&self.mutex) == 0; - } - - pub fn lock(self: *Mutex) void { - const rc = std.c.pthread_mutex_lock(&self.mutex); - bun.assert(rc == .SUCCESS); - } - - pub fn unlock(self: *Mutex) void { - const rc = std.c.pthread_mutex_unlock(&self.mutex); - bun.assert(rc == .SUCCESS); - } - - extern "c" fn pthread_mutex_trylock(m: *std.c.pthread_mutex_t) callconv(.C) c_int; - } -else if (@import("builtin").os.tag == .linux) - struct { - state: State, - - const State = enum(i32) { - unlocked, - locked, - waiting, - }; - - pub fn init() Mutex { - return .{ .state = .unlocked }; - } - - pub fn deinit(self: *Mutex) void { - self.* = undefined; - } - - pub fn tryLock(self: *Mutex) bool { - return @cmpxchgStrong( - State, - &self.state, - .unlocked, - .locked, - .acquire, - .monotonic, - ) == null; - } - - pub fn lock(self: *Mutex) void { - switch (@atomicRmw(State, &self.state, .Xchg, .locked, .acquire)) { - .unlocked => {}, - else => |s| self.lockSlow(s), - } - } - - fn lockSlow(self: *Mutex, current_state: State) void { - @branchHint(.cold); - - var new_state = current_state; - while (true) { - for (0..100) |spin| { - const state = @cmpxchgWeak( - State, - &self.state, - .unlocked, - new_state, - .acquire, - .monotonic, - ) orelse return; - - switch (state) { - .unlocked => {}, - .locked => {}, - .waiting => break, - } - - for (0..spin) |_| - spinLoopHint(); - } - - new_state = .waiting; - switch (@atomicRmw(State, &self.state, .Xchg, new_state, .acquire)) { - .unlocked => return, - else => {}, - } - - Futex.wait( - @as(*const i32, @ptrCast(&self.state)), - @intFromEnum(new_state), - ); - } - } - - pub fn unlock(self: *Mutex) void { - switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .release)) { - .unlocked => unreachable, - .locked => {}, - .waiting => self.unlockSlow(), - } - } - - fn unlockSlow(self: *Mutex) void { - @branchHint(.cold); - - Futex.wake(@as(*const i32, @ptrCast(&self.state))); - } - } -else - struct { - is_locked: bool, - - pub fn init() Mutex { - return .{ .is_locked = false }; - } - - pub fn deinit(self: *Mutex) void { - self.* = undefined; - } - - pub fn tryLock(self: *Mutex) bool { - return @atomicRmw(bool, &self.is_locked, .Xchg, true, .acquire) == false; - } - - pub fn lock(self: *Mutex) void { - while (!self.tryLock()) - spinLoopHint(); - } - - pub fn unlock(self: *Mutex) void { - @atomicStore(bool, &self.is_locked, false, .release); - } - }; - -pub const Condvar = if (@import("builtin").os.tag == .windows) - struct { - cond: CONDITION_VARIABLE, - - pub fn init() Condvar { - return .{ .cond = CONDITION_VARIABLE_INIT }; - } - - pub fn deinit(self: *Condvar) void { - self.* = undefined; - } - - pub fn wait(self: *Condvar, mutex: *Mutex) void { - const rc = SleepConditionVariableSRW( - &self.cond, - &mutex.srwlock, - system.INFINITE, - @as(system.ULONG, 0), - ); - - bun.assert(rc != system.FALSE); - } - - pub fn signal(self: *Condvar) void { - WakeConditionVariable(&self.cond); - } - - pub fn broadcast(self: *Condvar) void { - WakeAllConditionVariable(&self.cond); - } - - const SRWLOCK = usize; - const CONDITION_VARIABLE = usize; - const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0; - - extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void; - extern "kernel32" fn 
WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void; - extern "kernel32" fn SleepConditionVariableSRW( - c: *CONDITION_VARIABLE, - s: *SRWLOCK, - t: system.DWORD, - f: system.ULONG, - ) callconv(system.WINAPI) system.BOOL; - } -else if (@import("builtin").link_libc) - struct { - cond: if (@import("builtin").link_libc) std.c.pthread_cond_t else void, - - pub fn init() Condvar { - return .{ .cond = std.c.PTHREAD_COND_INITIALIZER }; - } - - pub fn deinit(self: *Condvar) void { - const safe_rc = switch (@import("builtin").os.tag) { - .dragonfly, .netbsd => std.posix.EAGAIN, - else => std.c.E.SUCCESS, - }; - - const rc = std.c.pthread_cond_destroy(&self.cond); - bun.assert(rc == .SUCCESS or rc == safe_rc); - - self.* = undefined; - } - - pub fn wait(self: *Condvar, mutex: *Mutex) void { - const rc = std.c.pthread_cond_wait(&self.cond, &mutex.mutex); - bun.assert(rc == .SUCCESS); - } - - pub fn signal(self: *Condvar) void { - const rc = std.c.pthread_cond_signal(&self.cond); - bun.assert(rc == .SUCCESS); - } - - pub fn broadcast(self: *Condvar) void { - const rc = std.c.pthread_cond_broadcast(&self.cond); - bun.assert(rc == .SUCCESS); - } - } -else - struct { - mutex: Mutex, - notified: bool, - waiters: std.SinglyLinkedList(Event), - - pub fn init() Condvar { - return .{ - .mutex = Mutex.init(), - .notified = false, - .waiters = .{}, - }; - } - - pub fn deinit(self: *Condvar) void { - self.mutex.deinit(); - self.* = undefined; - } - - pub fn wait(self: *Condvar, mutex: *Mutex) void { - self.mutex.lock(); - - if (self.notified) { - self.notified = false; - self.mutex.unlock(); - return; - } - - var wait_node = @TypeOf(self.waiters).Node{ .data = .{} }; - self.waiters.prepend(&wait_node); - self.mutex.unlock(); - - mutex.unlock(); - wait_node.data.wait(); - mutex.lock(); - } - - pub fn signal(self: *Condvar) void { - self.mutex.lock(); - - const maybe_wait_node = self.waiters.popFirst(); - if (maybe_wait_node == null) - self.notified = true; - - self.mutex.unlock(); - - if (maybe_wait_node) |wait_node| - wait_node.data.set(); - } - - pub fn broadcast(self: *Condvar) void { - self.mutex.lock(); - - var waiters = self.waiters; - self.notified = true; - - self.mutex.unlock(); - - while (waiters.popFirst()) |wait_node| - wait_node.data.set(); - } - - const Event = struct { - futex: i32 = 0, - - fn wait(self: *Event) void { - while (@atomicLoad(i32, &self.futex, .acquire) == 0) { - if (@hasDecl(Futex, "wait")) { - Futex.wait(&self.futex, 0); - } else { - spinLoopHint(); - } - } - } - - fn set(self: *Event) void { - @atomicStore(i32, &self.futex, 1, .release); - - if (@hasDecl(Futex, "wake")) - Futex.wake(&self.futex); - } - }; - }; - -const Futex = switch (@import("builtin").os.tag) { - .linux => struct { - fn wait(ptr: *const i32, cmp: i32) void { - switch (system.getErrno(system.futex_wait( - ptr, - system.FUTEX.PRIVATE_FLAG | system.FUTEX.WAIT, - cmp, - null, - ))) { - 0 => {}, - std.posix.EINTR => {}, - std.posix.EAGAIN => {}, - else => unreachable, - } - } - - fn wake(ptr: *const i32) void { - switch (system.getErrno(system.futex_wake( - ptr, - system.FUTEX.PRIVATE_FLAG | system.FUTEX.WAKE, - @as(i32, 1), - ))) { - 0 => {}, - std.posix.EFAULT => {}, - else => unreachable, - } - } - }, - else => void, -}; - -fn spinLoopHint() void { - switch (@import("builtin").cpu.arch) { - .i386, .x86_64 => asm volatile ("pause" ::: "memory"), - .arm, .aarch64 => asm volatile ("yield" ::: "memory"), - else => {}, - } -} diff --git a/src/threading.zig b/src/threading.zig index 
diff --git a/src/threading.zig b/src/threading.zig
index df1ba2bd2a..ffde37c41b 100644
--- a/src/threading.zig
+++ b/src/threading.zig
@@ -1,3 +1,7 @@
 pub const Mutex = @import("./threading/Mutex.zig");
 pub const Futex = @import("./threading/Futex.zig");
 pub const Condition = @import("./threading/Condition.zig");
+pub const WaitGroup = @import("./threading/WaitGroup.zig");
+pub const ThreadPool = @import("./threading/ThreadPool.zig");
+pub const Channel = @import("./threading/channel.zig").Channel;
+pub const UnboundedQueue = @import("./threading/unbounded_queue.zig").UnboundedQueue;
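With every primitive now re-exported from src/threading.zig, consumers reach them through the `bun.threading` namespace instead of relative paths into src/bun.js/ or the old src/sync.zig. A hypothetical consumer file would import them like this:

```zig
const bun = @import("bun");

// All synchronization primitives now come from one namespace
// (the surrounding file is hypothetical; the names are from threading.zig):
const Mutex = bun.threading.Mutex;
const Condition = bun.threading.Condition;
const WaitGroup = bun.threading.WaitGroup;
const ThreadPool = bun.threading.ThreadPool;
const Channel = bun.threading.Channel;
const UnboundedQueue = bun.threading.UnboundedQueue;
```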
diff --git a/src/thread_pool.zig b/src/threading/ThreadPool.zig
similarity index 83%
rename from src/thread_pool.zig
rename to src/threading/ThreadPool.zig
index 0e9ab1394c..6f12a4dc45 100644
--- a/src/thread_pool.zig
+++ b/src/threading/ThreadPool.zig
@@ -1,29 +1,54 @@
 // Thank you @kprotty.
-// https://github.com/kprotty/zap/blob/blog/src/thread_pool.zig
+//
+// This file contains code derived from the following source:
+// https://github.com/kprotty/zap/blob/blog/src/thread_pool.zig
+//
+// That code is covered by the following copyright and license notice:
+// MIT License
+//
+// Copyright (c) 2021 kprotty
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
 
 const std = @import("std");
 const bun = @import("bun");
 const ThreadPool = @This();
 const Futex = bun.threading.Futex;
+const WaitGroup = bun.threading.WaitGroup;
 const Environment = bun.Environment;
 const assert = bun.assert;
 const Atomic = std.atomic.Value;
 
-pub const OnSpawnCallback = *const fn (ctx: ?*anyopaque) ?*anyopaque;
 
 sleep_on_idle_network_thread: bool = true,
-/// executed on the thread
-on_thread_spawn: ?OnSpawnCallback = null,
-threadpool_context: ?*anyopaque = null,
 stack_size: u32,
 max_threads: u32,
-sync: Atomic(u32) = Atomic(u32).init(@as(u32, @bitCast(Sync{}))),
+sync: Atomic(u32) = .init(@as(u32, @bitCast(Sync{}))),
 idle_event: Event = .{},
 join_event: Event = .{},
 run_queue: Node.Queue = .{},
-threads: Atomic(?*Thread) = Atomic(?*Thread).init(null),
+threads: Atomic(?*Thread) = .init(null),
 name: []const u8 = "",
-spawned_thread_count: Atomic(u32) = Atomic(u32).init(0),
+spawned_thread_count: Atomic(u32) = .init(0),
+wait_group: WaitGroup = .init(),
+/// Used by `schedule` to optimize for the case where the thread pool isn't running yet.
+is_running: Atomic(bool) = .init(false),
 
 const Sync = packed struct {
     /// Tracks the number of threads not searching for Tasks
@@ -69,8 +94,9 @@ pub fn wakeForIdleEvents(this: *ThreadPool) void {
     this.idle_event.wake(Event.NOTIFIED, std.math.maxInt(u32));
 }
 
-/// Wait for a thread to call shutdown() on the thread pool and kill the worker threads.
+/// Shut down the thread pool and stop the worker threads.
 pub fn deinit(self: *ThreadPool) void {
+    self.shutdown();
     self.join();
     self.* = undefined;
 }
@@ -131,170 +157,45 @@ pub const Batch = struct {
     }
 };
 
-pub const WaitGroup = struct {
-    mutex: bun.Mutex = .{},
-    counter: u32 = 0,
-    event: std.Thread.ResetEvent = .{},
-
-    pub fn init(self: *WaitGroup) void {
-        self.* = .{};
-    }
-
-    pub fn deinit(self: *WaitGroup) void {
-        self.event.reset();
-        self.* = undefined;
-    }
-
-    pub fn start(self: *WaitGroup) void {
-        self.mutex.lock();
-        defer self.mutex.unlock();
-
-        self.counter += 1;
-    }
-
-    pub fn isDone(this: *WaitGroup) bool {
-        return @atomicLoad(u32, &this.counter, .monotonic) == 0;
-    }
-
-    pub fn finish(self: *WaitGroup) void {
-        self.mutex.lock();
-        defer self.mutex.unlock();
-
-        self.counter -= 1;
-
-        if (self.counter == 0) {
-            self.event.set();
-        }
-    }
-
-    pub fn wait(self: *WaitGroup) void {
-        while (true) {
-            self.mutex.lock();
-
-            if (self.counter == 0) {
-                self.mutex.unlock();
-                return;
-            }
-
-            self.mutex.unlock();
-            self.event.wait();
-        }
-    }
-
-    pub fn reset(self: *WaitGroup) void {
-        self.event.reset();
-    }
-};
-
-pub fn ConcurrentFunction(
-    comptime Function: anytype,
-) type {
-    return struct {
-        const Fn = Function;
-        const Args = std.meta.ArgsTuple(@TypeOf(Fn));
-        const Runner = @This();
-        thread_pool: *ThreadPool,
-        states: []Routine = undefined,
-        batch: Batch = .{},
-        allocator: std.mem.Allocator,
-
-        pub fn init(allocator: std.mem.Allocator, thread_pool: *ThreadPool, count: usize) !Runner {
-            return Runner{
-                .allocator = allocator,
-                .thread_pool = thread_pool,
-                .states = try allocator.alloc(Routine, count),
-                .batch = .{},
-            };
-        }
-
-        pub fn call(this: *@This(), args: Args) void {
-            this.states[this.batch.len] = .{
-                .args = args,
-            };
-            this.batch.push(Batch.from(&this.states[this.batch.len].task));
-        }
-
-        pub fn run(this: *@This()) void {
-            this.thread_pool.schedule(this.batch);
-        }
-
-        pub const Routine = struct {
-            args: Args,
-            task: Task = .{ .callback = callback },
-
-            pub fn callback(task: *Task) void {
-                const routine: *@This() = @fieldParentPtr("task", task);
-                @call(bun.callmod_inline, Fn, routine.args);
-            }
-        };
-
-        pub fn deinit(this: *@This()) void {
-            this.allocator.free(this.states);
-        }
-    };
-}
-
-pub fn runner(
-    this: *ThreadPool,
-    allocator: std.mem.Allocator,
-    comptime Function: anytype,
-    count: usize,
-) !ConcurrentFunction(Function) {
-    return try ConcurrentFunction(Function).init(allocator, this, count);
-}
-
-/// Loop over an array of tasks and invoke `Run` on each one in a different thread
+/// Loop over an array of tasks and invoke `run_fn` on each one in a different thread.
 /// **Blocks the calling thread** until all tasks are completed.
-pub fn do(
+///
+/// This function does not shut down or deinit the thread pool.
+pub fn each(
     this: *ThreadPool,
     allocator: std.mem.Allocator,
-    wg: ?*WaitGroup,
     ctx: anytype,
-    comptime Run: anytype,
+    comptime run_fn: anytype,
     values: anytype,
 ) !void {
-    return try Do(this, allocator, wg, @TypeOf(ctx), ctx, Run, @TypeOf(values), values, false);
+    return try eachImpl(this, allocator, ctx, run_fn, values, false);
 }
 
-pub fn doPtr(
+/// Like `each`, but calls `run_fn` with a pointer to the value.
+pub fn eachPtr(
     this: *ThreadPool,
     allocator: std.mem.Allocator,
-    wg: ?*WaitGroup,
     ctx: anytype,
-    comptime Run: anytype,
+    comptime run_fn: anytype,
     values: anytype,
 ) !void {
-    return try Do(this, allocator, wg, @TypeOf(ctx), ctx, Run, @TypeOf(values), values, true);
+    return try eachImpl(this, allocator, ctx, run_fn, values, true);
 }
 
-pub fn Do(
+fn eachImpl(
     this: *ThreadPool,
     allocator: std.mem.Allocator,
-    wg: ?*WaitGroup,
-    comptime Context: type,
-    ctx: Context,
-    comptime Function: anytype,
-    comptime ValuesType: type,
-    values: ValuesType,
+    ctx: anytype,
+    comptime run_fn: anytype,
+    values: anytype,
     comptime as_ptr: bool,
 ) !void {
-    if (values.len == 0)
-        return;
-    var allocated_wait_group: ?*WaitGroup = null;
-    defer {
-        if (allocated_wait_group) |group| {
-            group.deinit();
-            allocator.destroy(group);
-        }
-    }
+    const Context = @TypeOf(ctx);
+    const ValuesType = @TypeOf(values);
+
+    if (values.len == 0) return;
 
-    var wait_group = wg orelse brk: {
-        allocated_wait_group = try allocator.create(WaitGroup);
-        allocated_wait_group.?.init();
-        break :brk allocated_wait_group.?;
-    };
     const WaitContext = struct {
-        wait_group: *WaitGroup = undefined,
         ctx: Context,
         values: ValuesType,
     };
@@ -307,94 +208,97 @@ pub fn Do(
         pub fn call(task: *Task) void {
             var runner_task: *@This() = @fieldParentPtr("task", task);
             const i = runner_task.i;
-            if (comptime as_ptr) {
-                Function(runner_task.ctx.ctx, &runner_task.ctx.values[i], i);
-            } else {
-                Function(runner_task.ctx.ctx, runner_task.ctx.values[i], i);
-            }
-
-            runner_task.ctx.wait_group.finish();
+            const value = &runner_task.ctx.values[i];
+            run_fn(runner_task.ctx.ctx, if (comptime as_ptr) value else value.*, i);
         }
     };
-    const wait_context = allocator.create(WaitContext) catch unreachable;
-    wait_context.* = .{
+
+    var wait_context = WaitContext{
         .ctx = ctx,
-        .wait_group = wait_group,
        .values = values,
     };
-    defer allocator.destroy(wait_context);
 
-    var tasks = allocator.alloc(RunnerTask, values.len) catch unreachable;
-    defer allocator.free(tasks);
-    var batch: Batch = undefined;
-    var offset = tasks.len - 1;
-
-    {
-        tasks[0] = .{
+    const tasks = allocator.alloc(RunnerTask, values.len) catch unreachable;
+    defer allocator.free(tasks);
+    var batch: Batch = .{};
+    var offset = tasks.len;
+
+    for (tasks) |*runner_task| {
+        offset -= 1;
+        runner_task.* = .{
             .i = offset,
             .task = .{ .callback = RunnerTask.call },
-            .ctx = wait_context,
+            .ctx = &wait_context,
         };
-        batch = Batch.from(&tasks[0].task);
+        batch.push(Batch.from(&runner_task.task));
     }
 
-    if (tasks.len > 1) {
-        for (tasks[1..]) |*runner_task| {
-            offset -= 1;
-            runner_task.* = .{
-                .i = offset,
-                .task = .{ .callback = RunnerTask.call },
-                .ctx = wait_context,
-            };
-            batch.push(Batch.from(&runner_task.task));
-        }
-    }
-
-    wait_group.counter += @as(u32, @intCast(values.len));
     this.schedule(batch);
-    wait_group.wait();
+    this.waitForAll();
 }
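Callers no longer thread an optional `*WaitGroup` through the call; completion is tracked by the pool itself. A hedged usage sketch of the renamed API — the context struct and callback here are hypothetical, not from this diff:

```zig
const std = @import("std");
const bun = @import("bun");

// Hypothetical context and callback illustrating the new signature:
// `run_fn(ctx, value, index)`, where `eachPtr` passes `*u32` values.
const Ctx = struct { multiplier: u32 };

fn scale(ctx: *const Ctx, value: *u32, index: usize) void {
    _ = index; // position of `value` in the slice
    value.* *= ctx.multiplier;
}

fn scaleAll(pool: *bun.ThreadPool, allocator: std.mem.Allocator, values: []u32) !void {
    const ctx = Ctx{ .multiplier = 2 };
    // Fans out one task per element and blocks until all have run;
    // completion is tracked by the pool-wide wait group, not a caller-supplied one.
    try pool.eachPtr(allocator, &ctx, scale, values);
}
```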
 
-/// Schedule a batch of tasks to be executed by some thread on the thread pool.
-pub fn schedule(self: *ThreadPool, batch: Batch) void {
+fn scheduleImpl(self: *ThreadPool, batch: Batch, try_current: bool) void {
     // Sanity check
     if (batch.len == 0) {
         return;
     }
 
-    // Extract out the Node's from the Tasks
+    // Extract out the `Node`s from the `Task`s
     var list = Node.List{
         .head = &batch.head.?.node,
         .tail = &batch.tail.?.node,
     };
 
-    // Push the task Nodes to the most appropriate queue
-    if (Thread.current) |thread| {
+    // .monotonic access is okay because:
+    //
+    // * If the thread pool hasn't started yet, no thread could concurrently set
+    //   `is_running` to true, because thread pool initialization should only
+    //   happen on one thread.
+    //
+    // * If the thread pool is running, the current thread could be one of the threads
+    //   in the thread pool, but `is_running` was necessarily set to true before the
+    //   thread was created.
+    if (self.is_running.load(.monotonic)) {
+        self.wait_group.add(batch.len);
+    } else {
+        self.wait_group.addUnsynchronized(batch.len);
+    }
+
+    const current = blk: {
+        if (!try_current) break :blk null;
+        const current = Thread.current orelse break :blk null;
+        // Make sure the thread is part of this thread pool, not a different one.
+        break :blk if (current.thread_pool == self) current else null;
+    };
+    if (current) |thread| {
         thread.run_buffer.push(&list) catch thread.run_queue.push(list);
     } else {
         self.run_queue.push(list);
     }
-
     forceSpawn(self);
 }
 
+/// Schedule a batch of tasks to be executed by some thread on the thread pool.
+pub fn schedule(self: *ThreadPool, batch: Batch) void {
+    self.scheduleImpl(batch, false);
+}
+
+/// This function should only be called from threads that are part of the thread pool.
 pub fn scheduleInsideThreadPool(self: *ThreadPool, batch: Batch) void {
-    // Sanity check
-    if (batch.len == 0) {
-        return;
-    }
-
-    // Extract out the Node's from the Tasks
-    const list = Node.List{
-        .head = &batch.head.?.node,
-        .tail = &batch.tail.?.node,
-    };
-
-    // Push the task Nodes to the most appropriate queue
-    self.run_queue.push(list);
-
-    forceSpawn(self);
+    self.scheduleImpl(batch, true);
 }
 
-pub fn forceSpawn(self: *ThreadPool) void {
+/// Wait for all tasks to complete. This does not shut down or deinit the thread pool.
+pub fn waitForAll(self: *ThreadPool) void {
+    self.wait_group.wait();
+}
+
+/// Wait for all tasks to complete, then shut down and deinit the thread pool.
+pub fn waitAndDeinit(self: *ThreadPool) void {
+    self.waitForAll();
+    self.deinit();
+}
+
+fn forceSpawn(self: *ThreadPool) void {
     // Try to notify a thread
     const is_waking = false;
     return self.notify(is_waking);
@@ -416,11 +320,9 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void {
 pub const default_thread_stack_size = brk: {
     // 4mb
     const default = 4 * 1024 * 1024;
-
     if (!Environment.isMac) break :brk default;
 
     const size = default - (default % std.heap.page_size_max);
-
     // stack size must be a multiple of page_size
     // macOS will fail to spawn a thread if the stack size is not a multiple of page_size
     if (size % std.heap.page_size_max != 0)
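Since `schedule` now adds `batch.len` to the pool-wide wait group before enqueueing, "schedule a batch and block until it drains" needs no per-batch bookkeeping. A sketch of the new lifecycle at a hypothetical call site (`work` and `runBatch` are illustrative names):

```zig
const bun = @import("bun");

// Hypothetical task callback.
fn work(task: *bun.ThreadPool.Task) void {
    _ = task; // do some work here
}

fn runBatch(pool: *bun.ThreadPool, tasks: []bun.ThreadPool.Task) void {
    var batch = bun.ThreadPool.Batch{};
    for (tasks) |*task| {
        task.* = .{ .callback = work };
        batch.push(bun.ThreadPool.Batch.from(task));
    }
    // `schedule` bumps the pool-wide WaitGroup by `batch.len`,
    // so `waitForAll` blocks until every scheduled task has run.
    pool.schedule(batch);
    pool.waitForAll();
    // `deinit` now calls `shutdown()` itself before joining the workers;
    // `waitAndDeinit` combines both steps.
}
```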
@@ -432,6 +334,7 @@ pub const default_thread_stack_size = brk: {
 /// Warm the thread pool up to the given number of threads.
 /// https://www.youtube.com/watch?v=ys3qcbO5KWw
 pub fn warm(self: *ThreadPool, count: u14) void {
+    self.is_running.store(true, .monotonic);
     var sync = @as(Sync, @bitCast(self.sync.load(.monotonic)));
     if (sync.spawned >= count)
         return;
@@ -453,6 +356,7 @@ pub fn warm(self: *ThreadPool, count: u14) void {
 }
 
 noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
+    self.is_running.store(true, .monotonic);
     var sync = @as(Sync, @bitCast(self.sync.load(.monotonic)));
     while (sync.state != .shutdown) {
         const can_wake = is_waking or (sync.state == .pending);
@@ -592,17 +496,6 @@ fn register(noalias self: *ThreadPool, noalias thread: *Thread) void {
     }
 }
 
-pub fn setThreadContext(noalias pool: *ThreadPool, ctx: ?*anyopaque) void {
-    pool.threadpool_context = ctx;
-
-    var thread = pool.threads.load(.monotonic) orelse return;
-    thread.ctx = pool.threadpool_context;
-    while (thread.next) |next| {
-        next.ctx = pool.threadpool_context;
-        thread = next;
-    }
-}
-
 fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
     // Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting.
     const one_spawned = @as(u32, @bitCast(Sync{ .spawned = 1 }));
@@ -620,7 +513,7 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
     thread.join_event.wait();
 
     // After receiving the shutdown signal, shutdown the next thread in the pool.
-    // We have to do that without touching the thread pool itself since it's memory is invalidated by now.
+    // We have to do that without touching the thread pool itself since its memory is invalidated by now.
     // So just follow our .next link.
     const next_thread = thread.next orelse return;
     next_thread.join_event.notify();
@@ -652,7 +545,7 @@ pub const Thread = struct {
     run_queue: Node.Queue = .{},
     idle_queue: Node.Queue = .{},
     run_buffer: Node.Buffer = .{},
-    ctx: ?*anyopaque = null,
+    thread_pool: *ThreadPool,
 
     pub threadlocal var current: ?*Thread = null;
 
@@ -663,7 +556,7 @@ pub const Thread = struct {
         };
         self.idle_queue.push(list);
     }
-    var counter: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
+    var counter: std.atomic.Value(u32) = .init(0);
 
     /// Thread entry point which runs a worker for the ThreadPool
     fn run(thread_pool: *ThreadPool) void {
@@ -674,16 +567,12 @@ pub const Thread = struct {
             Output.Source.configureNamedThread(named);
         }
 
-        var self_ = Thread{};
+        var self_ = Thread{ .thread_pool = thread_pool };
         var self = &self_;
         current = self;
-
-        if (thread_pool.on_thread_spawn) |spawn| {
-            current.?.ctx = spawn(thread_pool.threadpool_context);
-        }
+        defer current = null;
 
         thread_pool.register(self);
-
        defer thread_pool.unregister(self);
 
         var is_waking = false;
@@ -697,10 +586,10 @@ pub const Thread = struct {
                 const task: *Task = @fieldParentPtr("node", result.node);
                 (task.callback)(task);
+                thread_pool.wait_group.finish();
             }
 
             Output.flush();
-            self.drainIdleEvents();
         }
     }
@@ -767,7 +656,7 @@ pub const Thread = struct {
     /// The event can be shutdown(), waking up all wait()ing threads and
     /// making subsequent wait()'s return immediately.
     const Event = struct {
-        state: Atomic(u32) = Atomic(u32).init(EMPTY),
+        state: Atomic(u32) = .init(EMPTY),
 
         const EMPTY = 0;
         const WAITING = 1;
@@ -861,7 +750,7 @@ pub const Node = struct {
 
     /// An unbounded multi-producer-(non blocking)-multi-consumer queue of Node pointers.
     const Queue = struct {
-        stack: Atomic(usize) = Atomic(usize).init(0),
+        stack: Atomic(usize) = .init(0),
         cache: ?*Node = null,
 
         const HAS_CACHE: usize = 0b01;
@@ -961,8 +850,8 @@ pub const Node = struct {
 
     /// A bounded single-producer, multi-consumer ring buffer for node pointers.
     const Buffer = struct {
-        head: Atomic(Index) = Atomic(Index).init(0),
-        tail: Atomic(Index) = Atomic(Index).init(0),
+        head: Atomic(Index) = .init(0),
+        tail: Atomic(Index) = .init(0),
         array: [capacity]Atomic(*Node) = undefined,
 
         const Index = u32;
@@ -980,7 +869,7 @@ pub const Node = struct {
             var size = tail -% head;
             assert(size <= capacity);
 
-            // Push nodes from the list to the buffer if it's not empty..
+            // Push nodes from the list to the buffer if it's not empty.
             if (size < capacity) {
                 var nodes: ?*Node = list.head;
                 while (size < capacity) : (size += 1) {
diff --git a/src/threading/WaitGroup.zig b/src/threading/WaitGroup.zig
new file mode 100644
index 0000000000..3dadbe1895
--- /dev/null
+++ b/src/threading/WaitGroup.zig
@@ -0,0 +1,61 @@
+// This file contains code derived from the following source:
+// https://gist.github.com/kprotty/0d2dc3da4840341d6ff361b27bdac7dc#file-sync-zig
+//
+// That code contains the following license and copyright notice:
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2020 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+const bun = @import("bun");
+const Mutex = bun.threading.Mutex;
+const Condition = bun.threading.Condition;
+
+const Self = @This();
+
+mutex: Mutex = .{},
+cond: Condition = .{},
+active: usize = 0,
+
+pub fn init() Self {
+    return .{};
+}
+
+pub fn initWithCount(count: usize) Self {
+    return .{ .active = count };
+}
+
+pub fn addUnsynchronized(self: *Self, n: usize) void {
+    self.active += n;
+}
+
+pub fn add(self: *Self, n: usize) void {
+    self.mutex.lock();
+    defer self.mutex.unlock();
+
+    self.addUnsynchronized(n);
+}
+
+pub fn addOne(self: *Self) void {
+    return self.add(1);
+}
+
+pub fn finish(self: *Self) void {
+    {
+        self.mutex.lock();
+        defer self.mutex.unlock();
+
+        self.active -= 1;
+        if (self.active != 0) return;
+    }
+    self.cond.signal();
+}
+
+pub fn wait(self: *Self) void {
+    self.mutex.lock();
+    defer self.mutex.unlock();
+
+    while (self.active != 0)
+        self.cond.wait(&self.mutex);
+}
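Unlike the `WaitGroup` deleted from the old thread_pool.zig (a mutex plus `std.Thread.ResetEvent`), this extracted version is a plain mutex/condition-variable counter, and `addUnsynchronized` exists so `scheduleImpl` can skip locking before the pool starts. A minimal usage sketch, with the worker hand-off elided:

```zig
const bun = @import("bun");
const WaitGroup = bun.threading.WaitGroup;

// Illustrative only: `wait` blocks until `finish` has been
// called once per unit previously passed to `add`.
fn waitGroupExample() void {
    var wg = WaitGroup.init();
    wg.add(2);

    // ... hand `&wg` to two workers, each of which calls wg.finish() ...
    wg.finish();
    wg.finish();

    wg.wait(); // returns once the active count reaches zero
}
```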
diff --git a/src/threading/channel.zig b/src/threading/channel.zig
new file mode 100644
index 0000000000..bfb5fd14ab
--- /dev/null
+++ b/src/threading/channel.zig
@@ -0,0 +1,171 @@
+// This file contains code derived from the following source:
+// https://gist.github.com/kprotty/0d2dc3da4840341d6ff361b27bdac7dc#file-sync2-zig
+
+const std = @import("std");
+const bun = @import("bun");
+const Mutex = bun.threading.Mutex;
+const Condition = bun.threading.Condition;
+
+pub fn Channel(
+    comptime T: type,
+    comptime buffer_type: std.fifo.LinearFifoBufferType,
+) type {
+    return struct {
+        mutex: Mutex,
+        putters: Condition,
+        getters: Condition,
+        buffer: Buffer,
+        is_closed: bool,
+
+        const Self = @This();
+        const Buffer = std.fifo.LinearFifo(T, buffer_type);
+
+        pub const init = switch (buffer_type) {
+            .Static => initStatic,
+            .Slice => initSlice,
+            .Dynamic => initDynamic,
+        };
+
+        pub inline fn initStatic() Self {
+            return .withBuffer(Buffer.init());
+        }
+
+        pub inline fn initSlice(buf: []T) Self {
+            return .withBuffer(Buffer.init(buf));
+        }
+
+        pub inline fn initDynamic(allocator: std.mem.Allocator) Self {
+            return .withBuffer(Buffer.init(allocator));
+        }
+
+        fn withBuffer(buffer: Buffer) Self {
+            return Self{
+                .mutex = .{},
+                .putters = .{},
+                .getters = .{},
+                .buffer = buffer,
+                .is_closed = false,
+            };
+        }
+
+        pub fn deinit(self: *Self) void {
+            self.buffer.deinit();
+            self.* = undefined;
+        }
+
+        pub fn close(self: *Self) void {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+
+            if (self.is_closed)
+                return;
+
+            self.is_closed = true;
+            self.putters.broadcast();
+            self.getters.broadcast();
+        }
+
+        pub fn tryWriteItem(self: *Self, item: T) !bool {
+            const wrote = try self.write(&[1]T{item});
+            return wrote == 1;
+        }
+
+        pub fn writeItem(self: *Self, item: T) !void {
+            return self.writeAll(&[1]T{item});
+        }
+
+        pub fn write(self: *Self, items: []const T) !usize {
+            return self.writeItems(items, false);
+        }
+
+        pub fn tryReadItem(self: *Self) !?T {
+            var items: [1]T = undefined;
+            if ((try self.read(&items)) != 1)
+                return null;
+            return items[0];
+        }
+
+        pub fn readItem(self: *Self) !T {
+            var items: [1]T = undefined;
+            try self.readAll(&items);
+            return items[0];
+        }
+
+        pub fn read(self: *Self, items: []T) !usize {
+            return self.readItems(items, false);
+        }
+
+        pub fn writeAll(self: *Self, items: []const T) !void {
+            bun.assert((try self.writeItems(items, true)) == items.len);
+        }
+
+        pub fn readAll(self: *Self, items: []T) !void {
+            bun.assert((try self.readItems(items, true)) == items.len);
+        }
+
+        fn writeItems(self: *Self, items: []const T, should_block: bool) !usize {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+
+            var pushed: usize = 0;
+            while (pushed < items.len) {
+                const did_push = blk: {
+                    if (self.is_closed)
+                        return error.Closed;
+
+                    self.buffer.write(items) catch |err| {
+                        if (buffer_type == .Dynamic)
+                            return err;
+                        break :blk false;
+                    };
+
+                    self.getters.signal();
+                    break :blk true;
+                };
+
+                if (did_push) {
+                    pushed += 1;
+                } else if (should_block) {
+                    self.putters.wait(&self.mutex);
+                } else {
+                    break;
+                }
+            }
+
+            return pushed;
+        }
+
+        fn readItems(self: *Self, items: []T, should_block: bool) !usize {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+
+            var popped: usize = 0;
+            while (popped < items.len) {
+                const new_item = blk: {
+                    // The buffer can contain null items, but `readItem` returns null when
+                    // the buffer is empty, so we check the count before trying to read an item.
+                    if (self.buffer.count == 0) {
+                        if (self.is_closed)
+                            return error.Closed;
+                        break :blk null;
+                    }
+
+                    const item = self.buffer.readItem();
+                    self.putters.signal();
+                    break :blk item;
+                };
+
+                if (new_item) |item| {
+                    items[popped] = item;
+                    popped += 1;
+                } else if (should_block) {
+                    self.getters.wait(&self.mutex);
+                } else {
+                    break;
+                }
+            }
+
+            return popped;
+        }
+    };
+}
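The new `Channel` wraps `std.fifo.LinearFifo` with blocking and non-blocking reads and writes plus close semantics. An illustrative single-threaded round trip — the element type and values are hypothetical:

```zig
const std = @import("std");
const bun = @import("bun");

// Illustrative usage of the new Channel with a dynamically sized buffer.
fn channelExample(allocator: std.mem.Allocator) !void {
    var channel = bun.threading.Channel(u32, .Dynamic).init(allocator);
    defer channel.deinit();

    try channel.writeItem(42); // blocks only if a bounded buffer is full
    const value = try channel.readItem(); // blocks until an item is available
    std.debug.assert(value == 42);

    // After close(), blocked readers and writers return error.Closed.
    channel.close();
}
```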
diff --git a/src/bun.js/unbounded_queue.zig b/src/threading/unbounded_queue.zig
similarity index 100%
rename from src/bun.js/unbounded_queue.zig
rename to src/threading/unbounded_queue.zig
diff --git a/test/internal/ban-words.test.ts b/test/internal/ban-words.test.ts
index 333ef03119..98713a5b86 100644
--- a/test/internal/ban-words.test.ts
+++ b/test/internal/ban-words.test.ts
@@ -32,7 +32,7 @@ const words: Record
   "== alloc.ptr": { reason: "The std.mem.Allocator context pointer can be undefined, which makes this comparison undefined behavior" },
   "!= alloc.ptr": { reason: "The std.mem.Allocator context pointer can be undefined, which makes this comparison undefined behavior" },
-  [String.raw`: [a-zA-Z0-9_\.\*\?\[\]\(\)]+ = undefined,`]: { reason: "Do not default a struct field to undefined", limit: 235, regex: true },
+  [String.raw`: [a-zA-Z0-9_\.\*\?\[\]\(\)]+ = undefined,`]: { reason: "Do not default a struct field to undefined", limit: 230, regex: true },
   "usingnamespace": { reason: "Zig 0.15 will remove `usingnamespace`" },
   "std.fs.Dir": { reason: "Prefer bun.sys + bun.FD instead of std.fs", limit: 170 },