Files
bun.sh/src/thread_pool.zig
Jarred Sumner 6362414d65 Bun gets a new bundler (#2312)
2023-04-07 20:08:01 -07:00

1154 lines
42 KiB
Zig

// Thank you @kprotty.
// https://github.com/kprotty/zap/blob/blog/src/thread_pool.zig
const std = @import("std");
const bun = @import("bun");
const ThreadPool = @This();
const Futex = @import("./futex.zig");
const Environment = bun.Environment;
const assert = std.debug.assert;
const Atomic = std.atomic.Atomic;
pub const OnSpawnCallback = *const fn (ctx: ?*anyopaque) ?*anyopaque;
sleep_on_idle_network_thread: bool = true,
/// Callback executed on each worker thread when it spawns; its return value becomes that thread's `ctx`.
on_thread_spawn: ?OnSpawnCallback = null,
threadpool_context: ?*anyopaque = null,
stack_size: u32,
max_threads: u32,
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
idle_event: Event = .{},
join_event: Event = .{},
run_queue: Node.Queue = .{},
threads: Atomic(?*Thread) = Atomic(?*Thread).init(null),
name: []const u8 = "",
const Sync = packed struct {
/// Tracks the number of threads not searching for Tasks
idle: u14 = 0,
/// Tracks the number of threads spawned
spawned: u14 = 0,
/// What you see is what you get
unused: bool = false,
/// Used to not miss notifications while state = waking
notified: bool = false,
/// The current state of the thread pool
state: enum(u2) {
/// A notification can be issued to wake up a sleeping thread as the "waking thread".
pending = 0,
/// The state was notified with a signal. A thread is woken up.
/// The first thread to transition to `waking` becomes the "waking thread".
signaled,
/// There is a "waking thread" among us.
/// No other thread should be woken up until the waking thread transitions the state.
waking,
/// The thread pool was terminated. `spawned` is decremented as threads exit so that the pool can be joined.
shutdown,
} = .pending,
};
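// Illustrative sketch (not from the upstream zap code): because `Sync` is a 32-bit
// packed struct, the entire scheduler state round-trips through the `u32` stored in
// `sync` above and can be updated with a single compare-and-swap.
test "Sync round-trips through a u32 (sketch)" {
    try std.testing.expectEqual(@as(u32, 0), @bitCast(u32, Sync{}));
    const example = Sync{ .idle = 1, .spawned = 2, .state = .waking };
    const unpacked = @bitCast(Sync, @bitCast(u32, example));
    try std.testing.expectEqual(@as(u14, 1), unpacked.idle);
    try std.testing.expectEqual(@as(u14, 2), unpacked.spawned);
    try std.testing.expect(unpacked.state == .waking);
}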
/// Configuration options for the thread pool.
/// TODO: add CPU core affinity?
pub const Config = struct {
stack_size: u32 = (std.Thread.SpawnConfig{}).stack_size,
max_threads: u32,
};
/// Statically initialize the thread pool using the configuration.
pub fn init(config: Config) ThreadPool {
return .{
.stack_size = std.math.max(1, config.stack_size),
.max_threads = std.math.max(1, config.max_threads),
};
}
/// Wait for shutdown() to be called and for all worker threads to exit, then invalidate the pool.
pub fn deinit(self: *ThreadPool) void {
self.join();
self.* = undefined;
}
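// Minimal lifecycle sketch, assuming nothing beyond the API above: a pool is statically
// initialized, shut down, and joined. No Tasks are scheduled here, so no OS threads are
// ever spawned and deinit()'s join() returns immediately.
test "ThreadPool init/shutdown/deinit (sketch)" {
    var pool = ThreadPool.init(.{ .max_threads = 2 });
    pool.shutdown();
    pool.deinit();
}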
/// A Task represents the unit of Work / Job / Execution that the ThreadPool schedules.
/// The user provides a `callback` which is invoked when the Task is run on a thread.
pub const Task = struct {
node: Node = .{},
callback: *const (fn (*Task) void),
};
/// An unordered collection of Tasks which can be submitted for scheduling as a group.
pub const Batch = struct {
len: usize = 0,
head: ?*Task = null,
tail: ?*Task = null,
pub fn pop(this: *Batch) ?*Task {
const len = @atomicLoad(usize, &this.len, .Monotonic);
if (len == 0) {
return null;
}
var task = this.head.?;
if (task.node.next) |node| {
this.head = @fieldParentPtr(Task, "node", node);
} else {
if (task != this.tail.?) unreachable;
this.tail = null;
this.head = null;
}
this.len -= 1;
if (len == 0) {
this.tail = null;
}
return task;
}
/// Create a batch from a single task.
pub fn from(task: *Task) Batch {
return Batch{
.len = 1,
.head = task,
.tail = task,
};
}
/// Push another batch into this one, taking ownership of its tasks.
pub fn push(self: *Batch, batch: Batch) void {
if (batch.len == 0) return;
if (self.len == 0) {
self.* = batch;
} else {
self.tail.?.node.next = if (batch.head) |h| &h.node else null;
self.tail = batch.tail;
self.len += batch.len;
}
}
};
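// Usage sketch for Batch: tasks are linked intrusively through their `node` field, so
// building and draining a batch allocates nothing. The no-op callback is only for
// illustration.
test "Batch from/push/pop (sketch)" {
    const Noop = struct {
        fn callback(_: *Task) void {}
    };
    var task_a = Task{ .callback = Noop.callback };
    var task_b = Task{ .callback = Noop.callback };
    var batch = Batch.from(&task_a);
    batch.push(Batch.from(&task_b));
    try std.testing.expectEqual(@as(usize, 2), batch.len);
    var drained: usize = 0;
    while (batch.pop()) |_| drained += 1;
    try std.testing.expectEqual(@as(usize, 2), drained);
}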
pub const WaitGroup = struct {
mutex: std.Thread.Mutex = .{},
counter: u32 = 0,
event: std.Thread.ResetEvent,
pub fn init(self: *WaitGroup) void {
self.* = .{
.mutex = .{},
.counter = 0,
.event = .{}, // default-initialize so set()/wait()/reset() operate on defined state
};
}
pub fn deinit(self: *WaitGroup) void {
self.event.reset();
self.* = undefined;
}
pub fn start(self: *WaitGroup) void {
self.mutex.lock();
defer self.mutex.unlock();
self.counter += 1;
}
pub fn finish(self: *WaitGroup) void {
self.mutex.lock();
defer self.mutex.unlock();
self.counter -= 1;
if (self.counter == 0) {
self.event.set();
}
}
pub fn wait(self: *WaitGroup) void {
while (true) {
self.mutex.lock();
if (self.counter == 0) {
self.mutex.unlock();
return;
}
self.mutex.unlock();
self.event.wait();
}
}
pub fn reset(self: *WaitGroup) void {
self.event.reset();
}
};
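// Usage sketch for WaitGroup: every start() must be matched by a finish(), and wait()
// blocks until the counter reaches zero. Everything here runs on a single thread, so
// wait() returns immediately; `do()` below shows the multi-threaded pattern.
test "WaitGroup start/finish/wait (sketch)" {
    var wg: WaitGroup = undefined;
    wg.init();
    defer wg.deinit();
    wg.start();
    wg.start();
    wg.finish();
    wg.finish();
    wg.wait();
}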
pub fn ConcurrentFunction(
comptime Function: anytype,
) type {
return struct {
const Fn = Function;
const Args = std.meta.ArgsTuple(@TypeOf(Fn));
const Runner = @This();
thread_pool: *ThreadPool,
states: []Routine = undefined,
batch: Batch = .{},
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator, thread_pool: *ThreadPool, count: usize) !Runner {
return Runner{
.allocator = allocator,
.thread_pool = thread_pool,
.states = try allocator.alloc(Routine, count),
.batch = .{},
};
}
pub fn call(this: *@This(), args: Args) void {
this.states[this.batch.len] = .{
.args = args,
};
this.batch.push(Batch.from(&this.states[this.batch.len].task));
}
pub fn run(this: *@This()) void {
this.thread_pool.schedule(this.batch);
}
pub const Routine = struct {
args: Args,
task: Task = .{ .callback = callback },
pub fn callback(task: *Task) void {
var routine = @fieldParentPtr(@This(), "task", task);
@call(.always_inline, Fn, routine.args);
}
};
pub fn deinit(this: *@This()) void {
this.allocator.free(this.states);
}
};
}
pub fn runner(
this: *ThreadPool,
allocator: std.mem.Allocator,
comptime Function: anytype,
count: usize,
) !ConcurrentFunction(Function) {
return try ConcurrentFunction(Function).init(allocator, this, count);
}
/// Invoke `Run` on each element of `values`, each invocation running on a thread pool thread.
/// **Blocks the calling thread** until all tasks are completed.
pub fn do(
this: *ThreadPool,
allocator: std.mem.Allocator,
wg: ?*WaitGroup,
ctx: anytype,
comptime Run: anytype,
values: anytype,
) !void {
return try Do(this, allocator, wg, @TypeOf(ctx), ctx, Run, @TypeOf(values), values, false);
}
pub fn doPtr(
this: *ThreadPool,
allocator: std.mem.Allocator,
wg: ?*WaitGroup,
ctx: anytype,
comptime Run: anytype,
values: anytype,
) !void {
return try Do(this, allocator, wg, @TypeOf(ctx), ctx, Run, @TypeOf(values), values, true);
}
pub fn Do(
this: *ThreadPool,
allocator: std.mem.Allocator,
wg: ?*WaitGroup,
comptime Context: type,
ctx: Context,
comptime Function: anytype,
comptime ValuesType: type,
values: ValuesType,
comptime as_ptr: bool,
) !void {
if (values.len == 0)
return;
var allocated_wait_group: ?*WaitGroup = null;
defer {
if (allocated_wait_group) |group| {
group.deinit();
allocator.destroy(group);
}
}
var wait_group = wg orelse brk: {
allocated_wait_group = try allocator.create(WaitGroup);
allocated_wait_group.?.init();
break :brk allocated_wait_group.?;
};
const WaitContext = struct {
wait_group: *WaitGroup = undefined,
ctx: Context,
values: ValuesType,
};
const RunnerTask = struct {
task: Task,
ctx: *WaitContext,
i: usize = 0,
pub fn call(task: *Task) void {
var runner_task = @fieldParentPtr(@This(), "task", task);
const i = runner_task.i;
if (comptime as_ptr) {
Function(runner_task.ctx.ctx, &runner_task.ctx.values[i], i);
} else {
Function(runner_task.ctx.ctx, runner_task.ctx.values[i], i);
}
runner_task.ctx.wait_group.finish();
}
};
var wait_context = allocator.create(WaitContext) catch unreachable;
wait_context.* = .{
.ctx = ctx,
.wait_group = wait_group,
.values = values,
};
defer allocator.destroy(wait_context);
var tasks = allocator.alloc(RunnerTask, values.len) catch unreachable;
defer allocator.free(tasks);
var batch: Batch = undefined;
var offset = tasks.len - 1;
{
tasks[0] = .{
.i = offset,
.task = .{ .callback = RunnerTask.call },
.ctx = wait_context,
};
batch = Batch.from(&tasks[0].task);
}
if (tasks.len > 1) {
for (tasks[1..]) |*runner_task| {
offset -= 1;
runner_task.* = .{
.i = offset,
.task = .{ .callback = RunnerTask.call },
.ctx = wait_context,
};
batch.push(Batch.from(&runner_task.task));
}
}
wait_group.counter += @intCast(u32, values.len);
this.schedule(batch);
wait_group.wait();
}
test "parallel for loop" {
Output.initTest();
var thread_pool = ThreadPool.init(.{ .max_threads = 12 });
// Shut the pool down at the end so worker threads do not outlive the test frame.
defer thread_pool.deinit();
defer thread_pool.shutdown();
var sleepy_time: u32 = 100;
var huge_array = &[_]u32{
sleepy_time + std.rand.DefaultPrng.init(1).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(2).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(3).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(4).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(5).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(6).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(7).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(8).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(9).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(10).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(11).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(12).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(13).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(14).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(15).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(16).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(17).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(18).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(19).random().uintAtMost(u32, 20),
sleepy_time + std.rand.DefaultPrng.init(20).random().uintAtMost(u32, 20),
};
const Runner = struct {
completed: usize = 0,
total: usize = 0,
pub fn run(ctx: *@This(), value: u32, _: usize) void {
std.time.sleep(value);
// `completed` is updated from multiple worker threads, so increment it atomically.
const prev = @atomicRmw(usize, &ctx.completed, .Add, 1, .Monotonic);
std.debug.assert(prev < ctx.total);
}
};
var runny = try std.heap.page_allocator.create(Runner);
runny.* = .{ .total = huge_array.len };
try thread_pool.do(std.heap.page_allocator, null, runny, Runner.run, std.mem.span(huge_array));
try std.testing.expectEqual(huge_array.len, runny.completed);
}
/// Schedule a batch of tasks to be executed by some thread on the thread pool.
pub fn schedule(self: *ThreadPool, batch: Batch) void {
// Sanity check
if (batch.len == 0) {
return;
}
// Extract the Nodes from the Tasks
var list = Node.List{
.head = &batch.head.?.node,
.tail = &batch.tail.?.node,
};
// Push the task Nodes to the most appropriate queue
if (Thread.current) |thread| {
thread.run_buffer.push(&list) catch thread.run_queue.push(list);
} else {
self.run_queue.push(list);
}
forceSpawn(self);
}
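// Usage sketch for schedule(): a single hypothetical Task that bumps an atomic counter.
// The spin-wait is only for illustration — real callers typically pair schedule() with a
// WaitGroup (see `do` above) or their own completion signal.
test "schedule a single task (sketch)" {
    Output.initTest();
    const Job = struct {
        var ran = Atomic(u32).init(0);
        fn callback(_: *Task) void {
            _ = ran.fetchAdd(1, .Monotonic);
        }
    };
    var pool = ThreadPool.init(.{ .max_threads = 2 });
    defer pool.deinit();
    defer pool.shutdown();
    var task = Task{ .callback = Job.callback };
    pool.schedule(Batch.from(&task));
    // Good enough for a sketch; this spins until a worker thread has run the task.
    while (Job.ran.load(.Monotonic) == 0) std.atomic.spinLoopHint();
    try std.testing.expectEqual(@as(u32, 1), Job.ran.load(.Monotonic));
}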
pub fn forceSpawn(self: *ThreadPool) void {
// Try to notify a thread
const is_waking = false;
return self.notify(is_waking);
}
inline fn notify(self: *ThreadPool, is_waking: bool) void {
// Fast path to check the Sync state to avoid calling into notifySlow().
// If we're waking, then we need to update the state regardless
if (!is_waking) {
const sync = @bitCast(Sync, self.sync.load(.Monotonic));
if (sync.notified) {
return;
}
}
return self.notifySlow(is_waking);
}
noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (sync.state != .shutdown) {
const can_wake = is_waking or (sync.state == .pending);
if (is_waking) {
assert(sync.state == .waking);
}
var new_sync = sync;
new_sync.notified = true;
if (can_wake and sync.idle > 0) { // wake up an idle thread
new_sync.state = .signaled;
} else if (can_wake and sync.spawned < self.max_threads) { // spawn a new thread
new_sync.state = .signaled;
new_sync.spawned += 1;
} else if (is_waking) { // no other thread to pass on "waking" status
new_sync.state = .pending;
} else if (sync.notified) { // nothing to update
return;
}
// Release barrier synchronizes with Acquire in wait()
// to ensure pushes to run queues happen before observing a posted notification.
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Release,
.Monotonic,
) orelse {
// We signaled to notify an idle thread
if (can_wake and sync.idle > 0) {
return self.idle_event.notify();
}
// We signaled to spawn a new thread
if (can_wake and sync.spawned < self.max_threads) {
const spawn_config = if (Environment.isMac)
// stack size must be a multiple of page_size
// macOS will fail to spawn a thread if the stack size is not a multiple of page_size
std.Thread.SpawnConfig{ .stack_size = (((std.Thread.SpawnConfig{}).stack_size + (std.mem.page_size / 2)) / std.mem.page_size) * std.mem.page_size }
else
std.Thread.SpawnConfig{};
const thread = std.Thread.spawn(spawn_config, Thread.run, .{self}) catch return self.unregister(null);
// if (self.name.len > 0) thread.setName(self.name) catch {};
return thread.detach();
}
return;
});
}
}
// sleep_on_idle seems to impact `bun install` performance negatively
// so we can just not sleep for that
noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
var is_idle = false;
var is_waking = _is_waking;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
if (sync.state == .shutdown) return error.Shutdown;
if (is_waking) assert(sync.state == .waking);
// Consume a notification made by notify().
if (sync.notified) {
var new_sync = sync;
new_sync.notified = false;
if (is_idle)
new_sync.idle -= 1;
if (sync.state == .signaled)
new_sync.state = .waking;
// Acquire barrier synchronizes with notify()
// to ensure that pushes to run queue are observed after wait() returns.
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Acquire,
.Monotonic,
) orelse {
return is_waking or (sync.state == .signaled);
});
} else if (!is_idle) {
var new_sync = sync;
new_sync.idle += 1;
if (is_waking)
new_sync.state = .pending;
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Monotonic,
.Monotonic,
) orelse {
is_waking = false;
is_idle = true;
continue;
});
} else {
self.idle_event.wait();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
}
}
/// Marks the thread pool as shutdown
pub noinline fn shutdown(self: *ThreadPool) void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (sync.state != .shutdown) {
var new_sync = sync;
new_sync.notified = true;
new_sync.state = .shutdown;
new_sync.idle = 0;
// Full barrier to synchronize with both wait() and notify()
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.AcqRel,
.Monotonic,
) orelse {
// Wake up any threads sleeping on the idle_event.
// TODO: I/O polling notification here.
if (sync.idle > 0) self.idle_event.shutdown();
return;
});
}
}
fn register(noalias self: *ThreadPool, noalias thread: *Thread) void {
// Push the thread onto the threads stack in a lock-free manner.
var threads = self.threads.load(.Monotonic);
while (true) {
thread.next = threads;
threads = self.threads.tryCompareAndSwap(
threads,
thread,
.Release,
.Monotonic,
) orelse break;
}
}
fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
// Un-spawn one thread, either because OS thread spawning failed or because the thread is exiting.
const one_spawned = @bitCast(u32, Sync{ .spawned = 1 });
const sync = @bitCast(Sync, self.sync.fetchSub(one_spawned, .Release));
assert(sync.spawned > 0);
// The last thread to exit must wake up the thread pool join()er
// who will start the chain to shutdown all the threads.
if (sync.state == .shutdown and sync.spawned == 1) {
self.join_event.notify();
}
// If this is a thread pool thread, wait for a shutdown signal by the thread pool join()er.
const thread = maybe_thread orelse return;
thread.join_event.wait();
// After receiving the shutdown signal, shutdown the next thread in the pool.
// We have to do that without touching the thread pool itself since its memory may be invalidated by now.
// So just follow our .next link.
const next_thread = thread.next orelse return;
next_thread.join_event.notify();
}
fn join(self: *ThreadPool) void {
// Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
if (!(sync.state == .shutdown and sync.spawned == 0)) {
self.join_event.wait();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
assert(sync.state == .shutdown);
assert(sync.spawned == 0);
// If there are threads, start off the chain sending it the shutdown signal.
// The thread receives the shutdown signal and sends it to the next thread, and the next..
const thread = self.threads.load(.Acquire) orelse return;
thread.join_event.notify();
}
const Output = @import("bun").Output;
pub const Thread = struct {
next: ?*Thread = null,
target: ?*Thread = null,
join_event: Event = .{},
run_queue: Node.Queue = .{},
run_buffer: Node.Buffer = .{},
ctx: ?*anyopaque = null,
pub threadlocal var current: ?*Thread = null;
/// Thread entry point which runs a worker for the ThreadPool
fn run(thread_pool: *ThreadPool) void {
Output.Source.configureThread();
var self = Thread{};
current = &self;
if (thread_pool.on_thread_spawn) |spawn| {
current.?.ctx = spawn(thread_pool.threadpool_context);
}
thread_pool.register(&self);
defer thread_pool.unregister(&self);
var is_waking = false;
while (true) {
is_waking = thread_pool.wait(is_waking) catch return;
while (self.pop(thread_pool)) |result| {
if (result.pushed or is_waking)
thread_pool.notify(is_waking);
is_waking = false;
const task = @fieldParentPtr(Task, "node", result.node);
(task.callback)(task);
}
Output.flush();
}
}
/// Try to dequeue a Node/Task from the ThreadPool.
/// Spurious reports of dequeue() returning empty are allowed.
fn pop(noalias self: *Thread, noalias thread_pool: *ThreadPool) ?Node.Buffer.Stole {
// Check our local buffer first
if (self.run_buffer.pop()) |node| {
return Node.Buffer.Stole{
.node = node,
.pushed = false,
};
}
// Then check our local queue
if (self.run_buffer.consume(&self.run_queue)) |stole| {
return stole;
}
// Then the global queue
if (self.run_buffer.consume(&thread_pool.run_queue)) |stole| {
return stole;
}
// Then try work stealing from other threads
var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;
while (num_threads > 0) : (num_threads -= 1) {
// Traverse the stack of registered threads on the thread pool
const target = self.target orelse thread_pool.threads.load(.Acquire) orelse unreachable;
self.target = target.next;
// Try to steal from their queue first to avoid contention (the target steals from its own queue last).
if (self.run_buffer.consume(&target.run_queue)) |stole| {
return stole;
}
// Skip stealing from the buffer if we're the target.
// We still steal from our own queue above given it may have just been locked the first time we tried.
if (target == self) {
continue;
}
// Steal from the buffer of a remote thread as a last resort
if (self.run_buffer.steal(&target.run_buffer)) |stole| {
return stole;
}
}
return null;
}
};
/// An event which stores 1 semaphore token and is thread-safe.
/// The event can be shutdown(), waking up all wait()ing threads and
/// making subsequent wait()'s return immediately.
const Event = struct {
state: Atomic(u32) = Atomic(u32).init(EMPTY),
const EMPTY = 0;
const WAITING = 1;
const NOTIFIED = 2;
const SHUTDOWN = 3;
/// Wait for and consume a notification
/// or wait for the event to be shutdown entirely
noinline fn wait(self: *Event) void {
var acquire_with: u32 = EMPTY;
var state = self.state.load(.Monotonic);
while (true) {
// If we're shutdown then exit early.
// Acquire barrier to ensure operations before the shutdown() are seen after the wait().
// Shutdown is rare so it's better to have an Acquire barrier here instead of on CAS failure + load which are common.
if (state == SHUTDOWN) {
std.atomic.fence(.Acquire);
return;
}
// Consume a notification when it pops up.
// Acquire barrier to ensure operations before the notify() appear after the wait().
if (state == NOTIFIED) {
state = self.state.tryCompareAndSwap(
state,
acquire_with,
.Acquire,
.Monotonic,
) orelse return;
continue;
}
// There is no notification to consume, we should wait on the event by ensuring it is WAITING.
if (state != WAITING) blk: {
state = self.state.tryCompareAndSwap(
state,
WAITING,
.Monotonic,
.Monotonic,
) orelse break :blk;
continue;
}
// Wait on the event until a notify() or shutdown().
// If we wake up to a notification, we must acquire it with WAITING instead of EMPTY
// since there may be other threads sleeping on the Futex who haven't been woken up yet.
//
// Acquiring to WAITING will make the next notify() or shutdown() wake a sleeping futex thread
// who will either exit on SHUTDOWN or acquire with WAITING again, ensuring all threads are awoken.
// This unfortunately results in the last notify() or shutdown() doing an extra futex wake but that's fine.
Futex.wait(&self.state, WAITING, null) catch unreachable;
state = self.state.load(.Monotonic);
acquire_with = WAITING;
}
}
/// Wait for and consume a notification
/// or wait for the event to be shutdown entirely
noinline fn waitFor(self: *Event, timeout: usize) void {
var acquire_with: u32 = EMPTY;
var state = self.state.load(.Monotonic);
while (true) {
// If we're shutdown then exit early.
// Acquire barrier to ensure operations before the shutdown() are seen after the wait().
// Shutdown is rare so it's better to have an Acquire barrier here instead of on CAS failure + load which are common.
if (state == SHUTDOWN) {
std.atomic.fence(.Acquire);
return;
}
// Consume a notification when it pops up.
// Acquire barrier to ensure operations before the notify() appear after the wait().
if (state == NOTIFIED) {
state = self.state.tryCompareAndSwap(
state,
acquire_with,
.Acquire,
.Monotonic,
) orelse return;
continue;
}
// There is no notification to consume, we should wait on the event by ensuring it is WAITING.
if (state != WAITING) blk: {
state = self.state.tryCompareAndSwap(
state,
WAITING,
.Monotonic,
.Monotonic,
) orelse break :blk;
continue;
}
// Wait on the event until a notify() or shutdown().
// If we wake up to a notification, we must acquire it with WAITING instead of EMPTY
// since there may be other threads sleeping on the Futex who haven't been woken up yet.
//
// Acquiring to WAITING will make the next notify() or shutdown() wake a sleeping futex thread
// who will either exit on SHUTDOWN or acquire with WAITING again, ensuring all threads are awoken.
// This unfortunately results in the last notify() or shutdown() doing an extra futex wake but that's fine.
Futex.wait(&self.state, WAITING, timeout) catch {};
state = self.state.load(.Monotonic);
acquire_with = WAITING;
}
}
/// Post a notification to the event if it doesn't have one already
/// then wake up a waiting thread if there is one as well.
fn notify(self: *Event) void {
return self.wake(NOTIFIED, 1);
}
/// Marks the event as shutdown, making all future wait()'s return immediately.
/// Then wakes up any threads currently waiting on the Event.
fn shutdown(self: *Event) void {
return self.wake(SHUTDOWN, std.math.maxInt(u32));
}
fn wake(self: *Event, release_with: u32, wake_threads: u32) void {
// Update the Event to notify it with the new `release_with` state (either NOTIFIED or SHUTDOWN).
// Release barrier to ensure any operations before this happen before the wait() in the other threads observes the new state.
const state = self.state.swap(release_with, .Release);
// Only wake threads sleeping in futex if the state is WAITING.
// Avoids unnecessary wake ups.
if (state == WAITING) {
Futex.wake(&self.state, wake_threads);
}
}
};
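// Single-threaded sketch of the Event state machine: notify() parks one token that the
// next wait() consumes without blocking, and after shutdown() every wait() returns
// immediately. None of these calls touch the Futex because no thread is ever WAITING.
test "Event notify/wait/shutdown (sketch)" {
    var event = Event{};
    event.notify();
    event.wait(); // consumes the NOTIFIED token without blocking
    event.shutdown();
    event.wait(); // returns immediately once shutdown
    event.wait();
}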
/// Linked list intrusive memory node and lock-free data structures to operate with it
pub const Node = struct {
next: ?*Node = null,
/// A linked list of Nodes
const List = struct {
head: *Node,
tail: *Node,
};
/// An unbounded multi-producer-(non blocking)-multi-consumer queue of Node pointers.
const Queue = struct {
stack: Atomic(usize) = Atomic(usize).init(0),
cache: ?*Node = null,
const HAS_CACHE: usize = 0b01;
const IS_CONSUMING: usize = 0b10;
const PTR_MASK: usize = ~(HAS_CACHE | IS_CONSUMING);
comptime {
assert(@alignOf(Node) >= ((IS_CONSUMING | HAS_CACHE) + 1));
}
fn push(noalias self: *Queue, list: List) void {
var stack = self.stack.load(.Monotonic);
while (true) {
// Attach the list to the stack (pt. 1)
list.tail.next = @intToPtr(?*Node, stack & PTR_MASK);
// Update the stack with the list (pt. 2).
// Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer.
var new_stack = @ptrToInt(list.head);
assert(new_stack & ~PTR_MASK == 0);
new_stack |= (stack & ~PTR_MASK);
// Push to the stack with a release barrier for the consumer to see the proper list links.
stack = self.stack.tryCompareAndSwap(
stack,
new_stack,
.Release,
.Monotonic,
) orelse break;
}
}
fn tryAcquireConsumer(self: *Queue) error{ Empty, Contended }!?*Node {
var stack = self.stack.load(.Monotonic);
while (true) {
if (stack & IS_CONSUMING != 0)
return error.Contended; // The queue already has a consumer.
if (stack & (HAS_CACHE | PTR_MASK) == 0)
return error.Empty; // The queue is empty when there's nothing cached and nothing in the stack.
// When we acquire the consumer, also consume the pushed stack if the cache is empty.
var new_stack = stack | HAS_CACHE | IS_CONSUMING;
if (stack & HAS_CACHE == 0) {
assert(stack & PTR_MASK != 0);
new_stack &= ~PTR_MASK;
}
// Acquire barrier on getting the consumer to see cache/Node updates done by previous consumers
// and to ensure our cache/Node updates in pop() happen after that of previous consumers.
stack = self.stack.tryCompareAndSwap(
stack,
new_stack,
.Acquire,
.Monotonic,
) orelse return self.cache orelse @intToPtr(*Node, stack & PTR_MASK);
}
}
fn releaseConsumer(noalias self: *Queue, noalias consumer: ?*Node) void {
// Stop consuming and remove the HAS_CACHE bit as well if the consumer's cache is empty.
// When HAS_CACHE bit is zeroed, the next consumer will acquire the pushed stack nodes.
var remove = IS_CONSUMING;
if (consumer == null)
remove |= HAS_CACHE;
// Release the consumer with a release barrier to ensure cache/node accesses
// happen before the consumer was released and before the next consumer starts using the cache.
self.cache = consumer;
const stack = self.stack.fetchSub(remove, .Release);
assert(stack & remove != 0);
}
fn pop(noalias self: *Queue, noalias consumer_ref: *?*Node) ?*Node {
// Check the consumer cache (fast path)
if (consumer_ref.*) |node| {
consumer_ref.* = node.next;
return node;
}
// Load the stack to see if there was anything pushed that we could grab.
var stack = self.stack.load(.Monotonic);
assert(stack & IS_CONSUMING != 0);
if (stack & PTR_MASK == 0) {
return null;
}
// Nodes have been pushed to the stack, grab them with an Acquire barrier to see the Node links.
stack = self.stack.swap(HAS_CACHE | IS_CONSUMING, .Acquire);
assert(stack & IS_CONSUMING != 0);
assert(stack & PTR_MASK != 0);
const node = @intToPtr(*Node, stack & PTR_MASK);
consumer_ref.* = node.next;
return node;
}
};
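// Usage sketch for Queue: push() a pre-linked List, then drain it through a single
// acquired consumer. The Nodes here live on the test's stack purely for illustration;
// in the pool they are embedded in Tasks.
test "Node.Queue push and consume (sketch)" {
    var queue = Queue{};
    var nodes = [_]Node{ .{}, .{}, .{} };
    nodes[0].next = &nodes[1];
    nodes[1].next = &nodes[2];
    queue.push(List{ .head = &nodes[0], .tail = &nodes[2] });
    var consumer = try queue.tryAcquireConsumer();
    defer queue.releaseConsumer(consumer);
    var popped: usize = 0;
    while (queue.pop(&consumer)) |_| popped += 1;
    try std.testing.expectEqual(@as(usize, 3), popped);
}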
/// A bounded single-producer, multi-consumer ring buffer for node pointers.
const Buffer = struct {
head: Atomic(Index) = Atomic(Index).init(0),
tail: Atomic(Index) = Atomic(Index).init(0),
array: [capacity]Atomic(*Node) = undefined,
const Index = u32;
const capacity = 256; // Appears to be a pretty good trade-off in space vs contended throughput
comptime {
assert(std.math.maxInt(Index) >= capacity);
assert(std.math.isPowerOfTwo(capacity));
}
fn push(noalias self: *Buffer, noalias list: *List) error{Overflow}!void {
var head = self.head.load(.Monotonic);
var tail = self.tail.loadUnchecked(); // we're the only thread that can change this
while (true) {
var size = tail -% head;
assert(size <= capacity);
// Push nodes from the list into the buffer while the buffer is not full.
if (size < capacity) {
var nodes: ?*Node = list.head;
while (size < capacity) : (size += 1) {
const node = nodes orelse break;
nodes = node.next;
// Array written atomically with weakest ordering since it could be getting atomically read by steal().
self.array[tail % capacity].store(node, .Unordered);
tail +%= 1;
}
// Release barrier synchronizes with Acquire loads for steal()ers to see the array writes.
self.tail.store(tail, .Release);
// Update the list with the nodes we pushed to the buffer and try again if there's more.
list.head = nodes orelse return;
std.atomic.spinLoopHint();
head = self.head.load(.Monotonic);
continue;
}
// Try to steal/overflow half of the tasks in the buffer to make room for future push()es.
// Migrating half amortizes the cost of stealing while requiring future pops to still use the buffer.
// Acquire barrier to ensure the linked list creation after the steal only happens after we successfully steal.
var migrate = size / 2;
head = self.head.tryCompareAndSwap(
head,
head +% migrate,
.Acquire,
.Monotonic,
) orelse {
// Link the migrated Nodes together
const first = self.array[head % capacity].loadUnchecked();
while (migrate > 0) : (migrate -= 1) {
const prev = self.array[head % capacity].loadUnchecked();
head +%= 1;
prev.next = self.array[head % capacity].loadUnchecked();
}
// Append the list that was supposed to be pushed to the end of the migrated Nodes
const last = self.array[(head -% 1) % capacity].loadUnchecked();
last.next = list.head;
list.tail.next = null;
// Return the migrated nodes + the original list as overflowed
list.head = first;
return error.Overflow;
};
}
}
fn pop(self: *Buffer) ?*Node {
var head = self.head.load(.Monotonic);
var tail = self.tail.loadUnchecked(); // we're the only thread that can change this
while (true) {
// Quick sanity check and return null when empty
var size = tail -% head;
assert(size <= capacity);
if (size == 0) {
return null;
}
// Dequeue with an acquire barrier to ensure any writes done to the Node
// only happens after we successfully claim it from the array.
head = self.head.tryCompareAndSwap(
head,
head +% 1,
.Acquire,
.Monotonic,
) orelse return self.array[head % capacity].loadUnchecked();
}
}
const Stole = struct {
node: *Node,
pushed: bool,
};
fn consume(noalias self: *Buffer, noalias queue: *Queue) ?Stole {
var consumer = queue.tryAcquireConsumer() catch return null;
defer queue.releaseConsumer(consumer);
const head = self.head.load(.Monotonic);
const tail = self.tail.loadUnchecked(); // we're the only thread that can change this
const size = tail -% head;
assert(size <= capacity);
assert(size == 0); // we should only be consuming if our array is empty
// Pop nodes from the queue and push them to our array.
// Atomic stores to the array as steal() threads may be atomically reading from it.
var pushed: Index = 0;
while (pushed < capacity) : (pushed += 1) {
const node = queue.pop(&consumer) orelse break;
self.array[(tail +% pushed) % capacity].store(node, .Unordered);
}
// We will be returning one node that we stole from the queue.
// Get an extra, and if that's not possible, take one from our array.
const node = queue.pop(&consumer) orelse blk: {
if (pushed == 0) return null;
pushed -= 1;
break :blk self.array[(tail +% pushed) % capacity].loadUnchecked();
};
// Update the array tail with the nodes we pushed to it.
// Release barrier to synchronize with Acquire barrier in steal()'s to see the written array Nodes.
if (pushed > 0) self.tail.store(tail +% pushed, .Release);
return Stole{
.node = node,
.pushed = pushed > 0,
};
}
fn steal(noalias self: *Buffer, noalias buffer: *Buffer) ?Stole {
const head = self.head.load(.Monotonic);
const tail = self.tail.loadUnchecked(); // we're the only thread that can change this
const size = tail -% head;
assert(size <= capacity);
assert(size == 0); // we should only be stealing if our array is empty
while (true) : (std.atomic.spinLoopHint()) {
const buffer_head = buffer.head.load(.Acquire);
const buffer_tail = buffer.tail.load(.Acquire);
// Overly large size indicates that the tail was updated a lot after the head was loaded.
// Reload both and try again.
const buffer_size = buffer_tail -% buffer_head;
if (buffer_size > capacity) {
continue;
}
// Try to steal half (divCeil) to amortize the cost of stealing from other threads.
const steal_size = buffer_size - (buffer_size / 2);
if (steal_size == 0) {
return null;
}
// Copy the nodes we will steal from the target's array to our own.
// Atomically load from the target buffer array as it may be pushing and atomically storing to it.
// Atomic store to our array as other steal() threads may be atomically loading from it as above.
var i: Index = 0;
while (i < steal_size) : (i += 1) {
const node = buffer.array[(buffer_head +% i) % capacity].load(.Unordered);
self.array[(tail +% i) % capacity].store(node, .Unordered);
}
// Try to commit the steal from the target buffer using:
// - an Acquire barrier to ensure that we only interact with the stolen Nodes after the steal was committed.
// - a Release barrier to ensure that the Nodes are copied above prior to the committing of the steal
// because if they're copied after the steal, they could be getting rewritten by the target's push().
_ = buffer.head.compareAndSwap(
buffer_head,
buffer_head +% steal_size,
.AcqRel,
.Monotonic,
) orelse {
// Pop one from the nodes we stole as we'll be returning it
const pushed = steal_size - 1;
const node = self.array[(tail +% pushed) % capacity].loadUnchecked();
// Update the array tail with the nodes we pushed to it.
// Release barrier to synchronize with Acquire barrier in steal()'s to see the written array Nodes.
if (pushed > 0) self.tail.store(tail +% pushed, .Release);
return Stole{
.node = node,
.pushed = pushed > 0,
};
};
}
}
};
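// Usage sketch for Buffer: the single owning thread push()es a linked List into the ring
// and pop()s from it; other threads would go through steal()/consume() instead. The two
// Nodes here exist only for illustration.
test "Node.Buffer push and pop (sketch)" {
    var ring = Buffer{};
    var nodes = [_]Node{ .{}, .{} };
    nodes[0].next = &nodes[1];
    var list = List{ .head = &nodes[0], .tail = &nodes[1] };
    try ring.push(&list);
    try std.testing.expect(ring.pop() != null);
    try std.testing.expect(ring.pop() != null);
    try std.testing.expect(ring.pop() == null);
}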
};