Files
bun.sh/src/sync.zig
Jarred Sumner 6362414d65 Bun gets a new bundler (#2312)
* alright now just gotta try running it

* fix a gajillion compiler errors

* even more code

* okay i fixed more errors

* wip

* Update launch.json

* Update string_builder.zig

* `fast_debug_build_mode` makes debug build 2x faster

* Update bundle_v2.zig

* more code!

* It bundles!

* Rename `Bun.Transpiler` to `Bun.Bundler`

* `import()` expressions almost work

* wip attempt to get import() expr to work

* Bundle namespace imports

* Attempt to fix the issue with import() unsuccessfully

* consider current working directory when resolving relative paths (#2313)

* consider current working directory when resolving relative paths

fixes #2298

* comment test

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>

* support `expect().toThrow(/pattern/)` (#2314)

- fix time-zone-dependent test failure

* fix missing `Blob` error messages on Linux (#2315)

* fix & clean up tests (#2318)

- skip flaky tests when running as `root`
- use `expect().toThrow()`
- clean up temporary files after tests

* feat(tty): add some `tty.WriteStream` methods to `process.{stdout, stderr}` (#2320)

* feat(stdio): add some `tty.WriteStream` methods

* chore(builtins): add process builtin gen'd code

* Fix docker install command

* `bun test` on macOS in GitHub Actions (#2322)

* Fixes #2323

* throw invalid parameter errors in `crypto.scryptSync` (#2331)

* throw invalid parameter errors

* remove comptime, add empty buffer function

* remove error_name comptime

* Add reference documentation for bun:test (#2327)

* Reorganize tests (#2332)

* Fix html-rewriter.test.js

* fix the wrong thing being incremented in hmr example (#2334)

* Add more test harness

* Improve Benchmarking page, small fixes (#2339)

* Improve benchmarking page

* WIP

* Add typescript instructions to hot

* Document preload in Plugins. Fix loader in plugin types.

* Fix typo

* Fix links

* run prettier

* Document openInEditor

* improve `Buffer` compatibility with Node.js (#2341)

* improve `Buffer` compatibility with Node.js

* use `memmove()`
allow `encoding` to be `undefined`

* run `bun test` after macOS builds (#2343)

* "binary" is an alias of "latin1"

Fixes https://github.com/oven-sh/bun/issues/2110

* More spec compliant `Blob.prototype.type` (#2340)

* Make `Blob.prototype.type` more spec compliant

* Add a few more checks for isNumber()

* Fix `make headers`

* Safer JSValue.isString()

* More tests for blob.slice

* Make `Blob.prototype.type` more spec compliant

* Add isASCII check

* Fix types

* Fix failing type test

* Update blob.zig

* Update blob.zig

* Fix .eql check on empty values

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>

* Fix bug in test runner

* Support `import()` expressions

* Implement `require()`

* clean up bit_set.zig slightly

* Move some things around

* misc cleanup

* Cleanup some things

* Fix a lot of stuff

* Fix `module.exports.fn = fn;` in ESM entry point

* Fix crash when printing file

* Fix issue with class names

* Fix issue with `export default identifier`

* Update js_parser.zig

* optimization: inline single-property object accesses and arrays

* Fix undefined memory in renamed symbols list

* Handle call target

* wip

* Inline it

* Fix undefined memory issue when reclaiming blocks in ast

* Halt linking on any parse errors

* alias

* Rename `enable_bundling` to `enable_legacy_bundling`

* Workaround anonymous struct literal zig bug

* Use slower approach (without bitset) because it doesn't break after 8 symbols

* Fix incorrectly-renaming statically defined symbols

* Handle more edgecases in our bit_set fork

* Reduce number of allocations for `define`

* Do not rename unbound symbols

* Clean up dot defines a little more

* Make the generated names prettier

* Workaround runtime symbol missing issue

* Fail the build on errors

* Support export * from

* Support `--outfile`

* partially fix renaming

* fancier symbol renaming impl

* misc, extremely revertible cleanup

* Fix up some bugs with symbol renaming

* formatting

* Update launch.json

* Parse `__PURE__` comments

* clean up simd code for pure comments

* changes to merge

* workaround runtime issue

* Fix issue with `export * as` not propagating correctly

* Make all top-level declarations `var` when bundling

* Fix missing prefix

* Fix assigning to stack copy

* Fix missing runtime symbol

* Fix bug with namespace exports

* Dramatically reduce allocations

* Update launch.json

* Add missing flags

* Update js_parser.zig

* small cleanup

* Make the export name better

* Fix unnecessary `var foo = foo`

* Implement CommonJS -> ESM conversion

* Implement module redirects

* Port esbuild bundler tests for new bundler (#2380)

* started porting esbuild tests

* clean up test names and api before moving on

* port tests using a program i wrote

* replace todo generated comment

* fix generated tests not including some files

* work on tests

* [github web editor] add define, external, inject, minifySyntax, minifyWhitespace options.

* get most of the todo comments out of the way, but expectBundled does not handle most of the cases

* continue working on esbuild tests

* use test.skip for unsupported tests

* Fixups for test runner

* Hoist imports & exports

* Fix test

* Hoist classes

* bundler test refining, 51/835

* Fix runtime require

* bundler test refining, 81/835

* bundler test refining, 93/835

* Make the test work in any timezone

* feat(expect): update toBeInstanceOf (#2396)

* feat: update instanceof binding

* fix: according to PR comments

* Rename `expectObjectTypeCount` to `expectMaxObjectTypeCount`

* Fix socket tests with connection errors (#2403)

* release pending activity with connection error handler

* unref poll_ref

* remove trailing comma

* Organize Dockerfiles for official status

* Remove test Dockerfile

* Remove old Docker workflow

* Feat(test): add toMatch (#2404)

* Fix various fetch/response/request tests (#2416)

* fix most fetch tests, skip a few

* fastGet, toValueGC, and invalid init

* bigint unreachable, range error, log process as process

* remove extra fetch_headers

* remove js_type parameter, check isObject()

* throw invalid mime type error, use enum literal

* switch back to promise rejection

* RangeError pascal case

* Fix several bugs (#2418)

* utf16 codepoint with replacement character

* Fix test failure with `TextEncoder("ascii")`

* Add missing type

* Fix Response.prototype.bodyUsed and Request.prototype.bodyUsed

* Fix bug with scrypt error not clearing

* Update server.zig

* oopsie

* 💅

* docs: Use correct url in the 'Issues' link in README header (#2420)

* Fix crash when rendering error page and the server or network is slow

* [fetch] Make the default body value `null` when unspecified

This is better aligned with the fetch spec

* Make node-net tests less flaky

* [node:net] Fix issue with `listen` callback firing before it's listening

* Always clear timers in node test harness

* Fix out of bounds access

Repro'd in Buffer tests

* Update UWS

cc @cirospaciari

* Make this test more thorough

* Hanging abort test

* 0 length body is a null stream

* Several bug fixes (#2427)

* Fix test

* Fix segfault when unexpected type is passed in `expect().toThrow`

* Fix issues with request constructor

* Don't bother cloning headers when its empty

* woops

* more tests

* fix incorrect test

* Make the fetch error messages better

* Update response.zig

* Fix test that failed on macOS

* Fix test

* Remove extra hash table lookups

* Support running dummy registry directly

cc @alexlamsl

* Update test

* Update test

* fixup

* Workaround crash in test runner

* Fixup test

* Fixup test

* Update os.test.js

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>

* Remove usages of port numbers in tests

* Set -O2 and -fno-rtti

* Remove -g

* Prevent undefined memory access

* [bun test] Implement `--rerun-each` flag to run each test N times

* Reduce number of module scopes created

* add some extra abort checks into streams (#2430)

* add some checks to avoid UAF

* avoid multiple calls to finalize if endFromJS is called more than once

* fix no-op comment

* mark as requested_end on abort

* remove requested_end from abort

* remove unnecessary check (#2432)

* Fix bug with scoped aliased dependencies in bun install on macOS

* remove `addLog`, remove `--prominent-compile-errors`

* Finish the upgrade

* Optional chaining flag

* Implement same_target_becomes_destructuring optimization

* bundler test refining, 109/835

* Reset bindings

* Support multiple entry points

* Implement `--entry-names` flag

* Use a tempdir with a better name

* prettier

* Log file name

* Update js_parser.zig

* Mark all bun builtins as external

* Make resolve errors actually errors

* Update bundler_default.test.ts

* Fix `await import(foo)`

* WIP react server components

* Do more stuff at runtime

* ✂️

* Support automatic JSX imports

* Use a module cache for now

* Update tsconfig.base.json

* Fix ThisOutsideFunctionNotRenamed

* woopsie

* moar cpu

* clamp it

* fixup

* Add a bunch of assertions

* Bun uses automatic runtime by default

* Parse Import Attributes

* Add a note about Valgrind

* Update developing.md

* Fix up code splitting for React Server Components

* Implement client component manifest

* Fix crash with --react-server-components and no client components

* Backport 4d31e3c917

* Update launch.json

* Fix for latest zig

* Workaround bug with ?[]const string

Occasionally saw alignment errors in this code

Workaround https://github.com/ziglang/zig/issues/15085

related: https://github.com/ziglang/zig/pull/15089

* switch to regular slice

* Avoid initializing named_imports and named_exports as undefined

* Reduce usages of `undefined`

* Add more assertions

* --watch wip

* Update javascript.zig

* Possibly fix the race condition

* Faster `do`

* bump allocator

* Reduce the size of `Symbol` slightly

* Alphabetically sort runtime import symbols, for determinism

* Prepare for code splitting

* handle overlapping stdout

* pure

* clean up some things

* Fix bug with `$$typeof`

* Address CommonJS -> ESM hoisting bug

* Support `"use server"` in manifest

* Implement `"use server"`

* Fix importing bun builtins when bundling

* Make `commonjs_to_esm` a feature flag, fix some splitting bugs

* ✂️

* fixme remove this

* Fix crash in longestCommonPath

* Chunking! Just need to do import paths now.

* Import paths work...now trying to figure out how to make runtime symbols work

* add workaround

* Replace `bun bun` with `bun build`

* Fix crash with dual package hazard

* Fix many CommonJS <> ESM interop bugs

* Support package.json `"sideEffects"`

also skip loading unnecessary package.json data in `bun run`

* add a not good --watch implementation

* bundler test refining, 140/831

* remove accidentally committed file

* do not return status code 1 on successful bundles

* bundler test refining, 159/830

* pass exit code to exitOrWatch

* clean up help menu

-remove two spaces to line up bun build
-moved all <r> tags to the end of the text they are colorizing
-moved other colors to the start of the text they colorize
-removed unneeded <r> tags, keeping only one at the start of the block

* importstar is fully ported

* wip

* you can run code in this branch now

* Disable this transform

* organize and document bundler tests

* Fix double import

* Fix sloppy mode function declarations

* Disable our CommonJS transform for now

* add `assertNotPresent` to make splitting cases easier

* Bump!

* Update bun.d.ts

* use import.meta.require in runtime code

* Disable this again

* Fix dirname

* Fix ESM -> CJS wrapper

* 💅

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Alex Lam S.L <alexlamsl@gmail.com>
Co-authored-by: Derrick Farris <mr.dcfarris@gmail.com>
Co-authored-by: Ashcon Partovi <ashcon@partovi.net>
Co-authored-by: Dylan Conway <35280289+dylan-conway@users.noreply.github.com>
Co-authored-by: pfg <pfg@pfg.pw>
Co-authored-by: Colin McDonnell <colinmcd94@gmail.com>
Co-authored-by: dave caruso <me@paperdave.net>
Co-authored-by: zhiyuan <32867472+zhiyuang@users.noreply.github.com>
Co-authored-by: Dylan Conway <dylan.conway567@gmail.com>
Co-authored-by: Kamil Ogórek <kamil.ogorek@gmail.com>
Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com>
2023-04-07 20:08:01 -07:00

1227 lines
36 KiB
Zig

const std = @import("std");
const system = std.system;
const bun = @import("bun");
// https://gist.github.com/kprotty/0d2dc3da4840341d6ff361b27bdac7dc
/// Thread pool that runs heap-allocated closures on a set of worker threads.
/// Every worker has its own run queue; the pool additionally owns a shared
/// run queue that `spawn` uses when the caller is not a pool worker.
pub const ThreadPool = struct {
/// Packed `State` word (shutdown flag, notified flag, idle-worker count); read and written atomically.
state: usize = 0,
/// Number of workers that `init` has started so far; read atomically by `Worker.steal`.
spawned: usize = 0,
/// Shared queue that receives tasks spawned from threads that are not pool workers.
run_queue: Queue,
/// Semaphore that idle workers block on in `wait`; posted by `notify` and `shutdown`.
idle_semaphore: Semaphore,
/// Allocator used for the worker array and for the closures created by `spawn`.
allocator: std.mem.Allocator,
/// Worker slots allocated by `init`; an empty slice until then.
workers: []Worker = &[_]Worker{},
/// Options accepted by `ThreadPool.init`.
pub const InitConfig = struct {
/// Allocator the pool should use; when null, `init` falls back to the
/// general-purpose allocator declared below.
allocator: ?std.mem.Allocator = null,
/// Requested number of worker threads; when null, `init` asks
/// `std.Thread.cpuCount()` and uses 1 if that fails.
max_threads: ?usize = null,
// Process-wide fallback allocator state shared by every pool that does not
// supply its own allocator.
var default_gpa = std.heap.GeneralPurposeAllocator(.{}){};
// NOTE(review): this takes the address of `default_gpa.allocator`. If the std
// version in use exposes `allocator` as a method returning `std.mem.Allocator`
// by value, this is not an `Allocator` and cannot be assigned to
// `ThreadPool.allocator` — confirm against the targeted Zig version.
var default_allocator = &default_gpa.allocator;
};
/// Initializes the pool in place and starts its worker threads.
///
/// `config.allocator` defaults to the process-wide general-purpose allocator
/// in `InitConfig`; `config.max_threads` defaults to the CPU count (or 1 when
/// that cannot be determined) and is clamped to at least 1.
///
/// On failure everything started so far is torn down through `deinit` and the
/// error from allocation or thread creation is returned.
pub fn init(self: *ThreadPool, config: InitConfig) !void {
    self.* = ThreadPool{
        .run_queue = Queue.init(),
        .idle_semaphore = Semaphore.init(0),
        // The field holds an `Allocator` by value, so obtain one from the
        // fallback GPA instead of storing a pointer.
        .allocator = config.allocator orelse InitConfig.default_gpa.allocator(),
    };
    errdefer self.deinit();

    const num_workers = std.math.max(1, config.max_threads orelse std.Thread.cpuCount() catch 1);
    self.workers = try self.allocator.alloc(Worker, num_workers);

    // `self.workers` is already a slice; iterate it directly to get element
    // pointers (taking `&self.workers` yields a pointer-to-slice, which is not
    // iterable).
    for (self.workers) |*worker| {
        try worker.init(self);
        // Publish the worker only after its thread exists so that `deinit`
        // and `Worker.steal` never touch an uninitialized slot.
        @atomicStore(usize, &self.spawned, self.spawned + 1, .SeqCst);
    }
}
/// Shuts the pool down and releases everything it owns.
///
/// Order matters: workers are told to stop and joined first, then any tasks
/// still sitting in the shared queue are run on the calling thread so their
/// closures are released, and only then are the worker array and the
/// synchronization primitives destroyed.
pub fn deinit(self: *ThreadPool) void {
    self.shutdown();

    // Only the first `spawned` slots were initialized by `init`. Slice the
    // worker array directly; the address of a slice expression is not iterable.
    for (self.workers[0..self.spawned]) |*worker|
        worker.deinit();

    while (self.run_queue.pop()) |run_node|
        (run_node.data.runFn)(&run_node.data);

    self.allocator.free(self.workers);
    self.idle_semaphore.deinit();
    self.run_queue.deinit();
    self.* = undefined;
}
/// Schedules `func(args...)` to run on the pool.
///
/// The arguments are copied into a heap-allocated closure that destroys
/// itself after `func` returns; the return value of `func` is discarded.
/// When called from a pool worker the task goes onto that worker's own queue,
/// otherwise onto the pool's shared queue. One idle worker is then notified.
///
/// Returns an error only if the closure cannot be allocated.
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
    const Args = @TypeOf(args);
    const Closure = struct {
        func_args: Args,
        allocator: std.mem.Allocator,
        run_node: RunNode = .{ .data = .{ .runFn = runFn } },

        fn runFn(runnable: *Runnable) void {
            // Walk back from the embedded `Runnable` to the closure that owns it.
            const run_node = @fieldParentPtr(RunNode, "data", runnable);
            const closure = @fieldParentPtr(@This(), "run_node", run_node);
            _ = @call(.auto, func, closure.func_args);
            closure.allocator.destroy(closure);
        }
    };

    const allocator = self.allocator;
    const closure = try allocator.create(Closure);
    // `create` must be paired with `destroy`; `free` is for slices.
    errdefer allocator.destroy(closure);
    closure.* = Closure{
        .func_args = args,
        .allocator = allocator,
    };

    const run_node = &closure.run_node;
    if (Worker.current) |worker| {
        worker.run_queue.push(run_node);
    } else {
        self.run_queue.push(run_node);
    }
    self.notify();
}
/// Decoded view of `ThreadPool.state`.
///
/// Bit 0 of the packed word is the shutdown flag, bit 1 the notified flag,
/// and the remaining upper bits hold the idle-worker count.
const State = struct {
    is_shutdown: bool = false,
    is_notified: bool = false,
    idle_workers: usize = 0,

    /// Encodes the three fields into a single word.
    fn pack(self: State) usize {
        const shutdown_bit: usize = if (self.is_shutdown) 0b01 else 0;
        const notified_bit: usize = if (self.is_notified) 0b10 else 0;
        const idle_bits: usize = self.idle_workers << 2;
        return shutdown_bit | notified_bit | idle_bits;
    }

    /// Decodes a word produced by `pack`.
    fn unpack(value: usize) State {
        const shutdown = (value & 0b01) != 0;
        const notified = (value & 0b10) != 0;
        return .{
            .is_shutdown = shutdown,
            .is_notified = notified,
            .idle_workers = value >> 2,
        };
    }
};
/// Parks the calling worker until there may be work to do.
///
/// If a notification is pending it is consumed and the call returns without
/// blocking; otherwise the caller is counted as idle and blocks on
/// `idle_semaphore`. Returns `error.Shutdown` once the pool is shut down.
fn wait(self: *ThreadPool) error{Shutdown}!void {
    var observed = State.unpack(@atomicLoad(usize, &self.state, .SeqCst));
    while (!observed.is_shutdown) {
        var desired = observed;
        if (observed.is_notified) {
            desired.is_notified = false;
        } else {
            desired.idle_workers += 1;
        }

        // A null result means the exchange succeeded; otherwise retry with
        // the value another thread installed.
        const actual = @cmpxchgWeak(
            usize,
            &self.state,
            observed.pack(),
            desired.pack(),
            .SeqCst,
            .SeqCst,
        ) orelse {
            if (!observed.is_notified)
                self.idle_semaphore.wait();
            return;
        };
        observed = State.unpack(actual);
    }
    return error.Shutdown;
}
/// Signals that new work is available.
///
/// Does nothing when the pool is shut down or a notification is already
/// pending. If no worker is idle the notified flag is set so the next `wait`
/// returns immediately; otherwise one idle worker is released through
/// `idle_semaphore`.
fn notify(self: *ThreadPool) void {
    var observed = State.unpack(@atomicLoad(usize, &self.state, .SeqCst));
    while (!observed.is_shutdown and !observed.is_notified) {
        var desired = observed;
        const nobody_idle = observed.idle_workers == 0;
        if (nobody_idle) {
            desired.is_notified = true;
        } else {
            desired.idle_workers -= 1;
        }

        const actual = @cmpxchgWeak(
            usize,
            &self.state,
            observed.pack(),
            desired.pack(),
            .SeqCst,
            .SeqCst,
        ) orelse {
            // Exchange succeeded: wake a sleeper only if one was claimed.
            if (!desired.is_notified)
                self.idle_semaphore.post();
            return;
        };
        observed = State.unpack(actual);
    }
}
/// Marks the pool as shut down and releases every worker that was counted as
/// idle at that moment so it can observe the shutdown flag.
fn shutdown(self: *ThreadPool) void {
    const shutdown_word = (State{ .is_shutdown = true }).pack();
    const previous = State.unpack(@atomicRmw(usize, &self.state, .Xchg, shutdown_word, .SeqCst));

    var sleepers = previous.idle_workers;
    while (sleepers > 0) : (sleepers -= 1) {
        self.idle_semaphore.post();
    }
}
/// One pool thread together with its private run queue.
const Worker = struct {
/// Handle of the thread started by `init`; joined in `deinit`.
thread: *std.Thread,
/// Tasks pushed by `ThreadPool.spawn` when it is called from this worker.
run_queue: Queue,
/// Initializes the worker in place and starts its thread running `run`.
/// If the thread cannot be started the queue is torn down again and the
/// spawn error is returned.
fn init(self: *Worker, pool: *ThreadPool) !void {
self.* = Worker{
.thread = undefined,
.run_queue = Queue.init(),
};
self.thread = std.Thread.spawn(
Worker.run,
RunConfig{
.worker = self,
.pool = pool,
},
) catch |err| {
self.run_queue.deinit();
return err;
};
}
/// Waits for the worker thread to finish, then destroys the run queue.
fn deinit(self: *Worker) void {
self.thread.wait();
self.run_queue.deinit();
self.* = undefined;
}
/// The worker running on the current thread, or null outside of `run`.
threadlocal var current: ?*Worker = null;
/// Arguments handed to the worker thread's entry point.
const RunConfig = struct {
worker: *Worker,
pool: *ThreadPool,
};
/// Thread entry point: repeatedly polls for a task and runs it, parking in
/// `ThreadPool.wait` when nothing is found and exiting once that reports
/// shutdown.
fn run(config: RunConfig) void {
const self = config.worker;
const pool = config.pool;
// Expose this worker through the thread-local for the duration of the loop
// so `ThreadPool.spawn` can target its queue; restored on exit.
const old_current = current;
current = self;
defer current = old_current;
// The worker's address seeds both the tick counter and the PRNG.
var tick = @ptrToInt(self);
var prng = std.rand.DefaultPrng.init(tick);
while (true) {
const run_node = self.poll(tick, pool, &prng.random) orelse {
pool.wait() catch break;
continue;
};
// The tick only advances when a task was actually found.
tick +%= 1;
(run_node.data.runFn)(&run_node.data);
}
}
/// Looks for the next task to run, or returns null when none was found.
/// Order of attempts: a fair steal from another worker every 128th tick, a
/// fair steal from the pool's shared queue every 64th tick, the worker's own
/// queue, up to 8 unfair steals from other workers (yielding between
/// attempts), and finally an unfair steal from the shared queue.
fn poll(self: *Worker, tick: usize, pool: *ThreadPool, rand: *std.rand.Random) ?*RunNode {
if (tick % 128 == 0) {
if (self.steal(pool, rand, .fair)) |run_node|
return run_node;
}
if (tick % 64 == 0) {
if (self.run_queue.steal(&pool.run_queue, .fair)) |run_node|
return run_node;
}
if (self.run_queue.pop()) |run_node|
return run_node;
var attempts: usize = 8;
while (attempts > 0) : (attempts -= 1) {
if (self.steal(pool, rand, .unfair)) |run_node| {
return run_node;
} else {
// Give other threads a chance before retrying; fall back to a spin hint
// if yielding fails.
std.os.sched_yield() catch spinLoopHint();
}
}
if (self.run_queue.steal(&pool.run_queue, .unfair)) |run_node|
return run_node;
return null;
}
/// Tries to take one task from another worker's queue. Starts at a random
/// worker index and visits each spawned worker once, skipping this worker.
/// Returns null when fewer than two workers are spawned or nothing was found.
fn steal(self: *Worker, pool: *ThreadPool, rand: *std.rand.Random, mode: anytype) ?*RunNode {
const spawned = @atomicLoad(usize, &pool.spawned, .SeqCst);
if (spawned < 2)
return null;
var index = rand.uintLessThan(usize, spawned);
var iter = spawned;
while (iter > 0) : (iter -= 1) {
const target = &pool.workers[index];
// Advance with wrap-around before any `continue` below.
index += 1;
if (index == spawned)
index = 0;
if (target == self)
continue;
if (self.run_queue.steal(&target.run_queue, mode)) |run_node|
return run_node;
}
return null;
}
};
/// Mutex-protected list of run nodes with an atomically readable size, so
/// that an empty queue can be detected without taking the lock.
const Queue = struct {
    mutex: Mutex,
    size: usize,
    list: List,

    /// Creates an empty queue.
    fn init() Queue {
        return .{ .mutex = Mutex.init(), .size = 0, .list = .{} };
    }

    /// Destroys the mutex and invalidates the queue.
    fn deinit(self: *Queue) void {
        self.mutex.deinit();
        self.* = undefined;
    }

    /// Inserts `node` at the head of the list.
    fn push(self: *Queue, node: *List.Node) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.list.prepend(node);
        const grown = self.size + 1;
        @atomicStore(usize, &self.size, grown, .SeqCst);
    }

    /// Removes and returns the node at the head, or null when empty.
    fn pop(self: *Queue) ?*List.Node {
        return self.popFrom(.head);
    }

    /// Takes one node out of `target`: from its tail in `.fair` mode, from
    /// its head in `.unfair` mode. The receiving queue is not modified.
    fn steal(_: *Queue, target: *Queue, mode: enum { fair, unfair }) ?*RunNode {
        return switch (mode) {
            .fair => target.popFrom(.tail),
            .unfair => target.popFrom(.head),
        };
    }

    /// Removes a node from the requested end, or returns null when empty.
    fn popFrom(self: *Queue, side: enum { head, tail }) ?*RunNode {
        // Lock-free emptiness check.
        const visible = @atomicLoad(usize, &self.size, .SeqCst);
        if (visible == 0)
            return null;

        self.mutex.lock();
        defer self.mutex.unlock();

        // potential deadlock when all pops are fair..
        const taken = (switch (side) {
            .head => self.list.popFirst(),
            .tail => self.list.pop(),
        }) orelse return null;

        @atomicStore(usize, &self.size, self.size - 1, .SeqCst);
        return taken;
    }
};
// Doubly-linked list type that `Queue` stores its tasks in.
const List = std.TailQueue(Runnable);
// List node whose `data` field is the embedded `Runnable`.
const RunNode = List.Node;
// Type-erased task. `runFn` receives a pointer to this struct; `spawn`'s
// closure recovers its own address from it with `@fieldParentPtr`.
const Runnable = struct {
runFn: *const (fn (*Runnable) void),
};
};
/// Returns a blocking FIFO channel of `T`, protected by a `Mutex` and two
/// `Condvar`s and backed by `std.fifo.LinearFifo(T, buffer_type)`.
///
/// `init` takes no arguments for `.Static`, the backing slice for `.Slice`,
/// and an allocator for `.Dynamic`.
///
/// After `close`, writes fail with `error.Closed`; reads keep draining
/// buffered items and fail with `error.Closed` once the buffer is empty.
pub fn Channel(
    comptime T: type,
    comptime buffer_type: std.fifo.LinearFifoBufferType,
) type {
    return struct {
        mutex: Mutex,
        /// Writers waiting for free space.
        putters: Condvar,
        /// Readers waiting for an item.
        getters: Condvar,
        buffer: Buffer,
        is_closed: bool,

        const Self = @This();
        const Buffer = std.fifo.LinearFifo(T, buffer_type);

        pub usingnamespace switch (buffer_type) {
            .Static => struct {
                pub fn init() Self {
                    return Self.withBuffer(Buffer.init());
                }
            },
            .Slice => struct {
                pub fn init(buf: []T) Self {
                    return Self.withBuffer(Buffer.init(buf));
                }
            },
            .Dynamic => struct {
                pub fn init(allocator: std.mem.Allocator) Self {
                    return Self.withBuffer(Buffer.init(allocator));
                }
            },
        };

        fn withBuffer(buffer: Buffer) Self {
            return Self{
                .mutex = Mutex.init(),
                .putters = Condvar.init(),
                .getters = Condvar.init(),
                .buffer = buffer,
                .is_closed = false,
            };
        }

        /// Destroys the synchronization primitives and the buffer.
        pub fn deinit(self: *Self) void {
            self.mutex.deinit();
            self.putters.deinit();
            self.getters.deinit();
            self.buffer.deinit();
            self.* = undefined;
        }

        /// Closes the channel and wakes every blocked reader and writer.
        /// Closing an already closed channel is a no-op.
        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.is_closed)
                return;

            self.is_closed = true;
            self.putters.broadcast();
            self.getters.broadcast();
        }

        /// Non-blocking single-item write; returns whether the item was enqueued.
        pub fn tryWriteItem(self: *Self, item: T) !bool {
            const wrote = try self.write(&[1]T{item});
            return wrote == 1;
        }

        /// Blocking single-item write.
        pub fn writeItem(self: *Self, item: T) !void {
            return self.writeAll(&[1]T{item});
        }

        /// Non-blocking write; returns how many leading items were enqueued.
        pub fn write(self: *Self, items: []const T) !usize {
            return self.writeItems(items, false);
        }

        /// Non-blocking single-item read; returns null when nothing is buffered.
        pub fn tryReadItem(self: *Self) !?T {
            var items: [1]T = undefined;
            if ((try self.read(&items)) != 1)
                return null;
            return items[0];
        }

        /// Blocking single-item read.
        pub fn readItem(self: *Self) !T {
            var items: [1]T = undefined;
            try self.readAll(&items);
            return items[0];
        }

        /// Non-blocking read; returns how many items were stored into `items`.
        pub fn read(self: *Self, items: []T) !usize {
            return self.readItems(items, false);
        }

        /// Blocks until every item has been enqueued.
        pub fn writeAll(self: *Self, items: []const T) !void {
            std.debug.assert((try self.writeItems(items, true)) == items.len);
        }

        /// Blocks until `items` has been filled completely.
        pub fn readAll(self: *Self, items: []T) !void {
            std.debug.assert((try self.readItems(items, true)) == items.len);
        }

        /// Enqueues `items` one at a time and returns the number enqueued.
        /// When the buffer is full, either waits for a reader (`should_block`)
        /// or stops early. For `.Dynamic` buffers an allocation failure is
        /// returned to the caller instead of being treated as "full".
        fn writeItems(self: *Self, items: []const T, should_block: bool) !usize {
            self.mutex.lock();
            defer self.mutex.unlock();

            var pushed: usize = 0;
            while (pushed < items.len) {
                const did_push = blk: {
                    if (self.is_closed)
                        return error.Closed;

                    // Enqueue exactly the next pending item. Writing the whole
                    // slice here would enqueue every item once per iteration
                    // while `pushed` only advances by one.
                    self.buffer.writeItem(items[pushed]) catch |err| {
                        if (buffer_type == .Dynamic)
                            return err;
                        break :blk false;
                    };

                    self.getters.signal();
                    break :blk true;
                };

                if (did_push) {
                    pushed += 1;
                } else if (should_block) {
                    self.putters.wait(&self.mutex);
                } else {
                    break;
                }
            }
            return pushed;
        }

        /// Dequeues into `items` one at a time and returns the number stored.
        /// When the buffer is empty, fails with `error.Closed` if the channel
        /// is closed, otherwise waits for a writer (`should_block`) or stops
        /// early.
        fn readItems(self: *Self, items: []T, should_block: bool) !usize {
            self.mutex.lock();
            defer self.mutex.unlock();

            var popped: usize = 0;
            while (popped < items.len) {
                const new_item = blk: {
                    if (self.buffer.readItem()) |item| {
                        self.putters.signal();
                        break :blk item;
                    }
                    if (self.is_closed)
                        return error.Closed;
                    break :blk null;
                };

                if (new_item) |item| {
                    items[popped] = item;
                    popped += 1;
                } else if (should_block) {
                    self.getters.wait(&self.mutex);
                } else {
                    break;
                }
            }
            return popped;
        }
    };
}
/// Reader-writer lock.
///
/// When libc is linked on a non-Windows target this wraps `pthread_rwlock_t`;
/// otherwise it falls back to an implementation built from an atomic state
/// word, a `Mutex` and a `Semaphore`.
pub const RwLock = if (@import("builtin").os.tag != .windows and @import("builtin").link_libc)
    struct {
        rwlock: pthread_rwlock_t,

        pub fn init() RwLock {
            return .{ .rwlock = PTHREAD_RWLOCK_INITIALIZER };
        }

        /// Destroys the underlying pthread lock. On DragonFly and NetBSD an
        /// `EAGAIN` result is tolerated in addition to success.
        pub fn deinit(self: *RwLock) void {
            const eagain_is_benign = switch (@import("builtin").os.tag) {
                .dragonfly, .netbsd => true,
                else => false,
            };

            // Use the extern declared below so the pointer type matches the
            // `pthread_rwlock_t` defined in this struct.
            const rc = pthread_rwlock_destroy(&self.rwlock);
            std.debug.assert(rc == .SUCCESS or (eagain_is_benign and rc == .AGAIN));

            self.* = undefined;
        }

        /// Attempts to take the lock exclusively without blocking.
        pub fn tryLock(self: *RwLock) bool {
            return pthread_rwlock_trywrlock(&self.rwlock) == .SUCCESS;
        }

        /// Takes the lock exclusively.
        pub fn lock(self: *RwLock) void {
            const rc = pthread_rwlock_wrlock(&self.rwlock);
            std.debug.assert(rc == .SUCCESS);
        }

        /// Releases an exclusive hold.
        pub fn unlock(self: *RwLock) void {
            const rc = pthread_rwlock_unlock(&self.rwlock);
            std.debug.assert(rc == .SUCCESS);
        }

        /// Attempts to take the lock for reading without blocking.
        pub fn tryLockShared(self: *RwLock) bool {
            return pthread_rwlock_tryrdlock(&self.rwlock) == .SUCCESS;
        }

        /// Takes the lock for reading.
        pub fn lockShared(self: *RwLock) void {
            const rc = pthread_rwlock_rdlock(&self.rwlock);
            std.debug.assert(rc == .SUCCESS);
        }

        /// Releases a shared hold.
        pub fn unlockShared(self: *RwLock) void {
            const rc = pthread_rwlock_unlock(&self.rwlock);
            std.debug.assert(rc == .SUCCESS);
        }

        const PTHREAD_RWLOCK_INITIALIZER = pthread_rwlock_t{};

        // Per-platform layout of `pthread_rwlock_t`; the field defaults form
        // the static initializer.
        const pthread_rwlock_t = switch (@import("builtin").os.tag) {
            .macos, .ios, .watchos, .tvos => extern struct {
                __sig: c_long = 0x2DA8B3B4,
                __opaque: [192]u8 = [_]u8{0} ** 192,
            },
            .linux => switch (@import("builtin").abi) {
                .android => switch (@sizeOf(usize)) {
                    4 => extern struct {
                        lock: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER,
                        cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER,
                        numLocks: c_int = 0,
                        writerThreadId: c_int = 0,
                        pendingReaders: c_int = 0,
                        pendingWriters: c_int = 0,
                        attr: i32 = 0,
                        // The initializer length must match the declared array length.
                        __reserved: [12]u8 = [_]u8{0} ** 12,
                    },
                    8 => extern struct {
                        numLocks: c_int = 0,
                        writerThreadId: c_int = 0,
                        pendingReaders: c_int = 0,
                        pendingWriters: c_int = 0,
                        attr: i32 = 0,
                        __reserved: [36]u8 = [_]u8{0} ** 36,
                    },
                    else => unreachable,
                },
                else => extern struct {
                    size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56,
                },
            },
            .fuchsia => extern struct {
                size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56,
            },
            .emscripten => extern struct {
                size: [32]u8 align(4) = [_]u8{0} ** 32,
            },
            .netbsd => extern struct {
                ptr_magic: c_uint = 0x99990009,
                ptr_interlock: switch (@import("builtin").target.cpu.arch) {
                    .aarch64, .sparc, .x86_64 => u8,
                    .arm, .powerpc => c_int,
                    else => unreachable,
                } = 0,
                ptr_rblocked_first: ?*u8 = null,
                ptr_rblocked_last: ?*u8 = null,
                ptr_wblocked_first: ?*u8 = null,
                ptr_wblocked_last: ?*u8 = null,
                ptr_nreaders: c_uint = 0,
                ptr_owner: std.c.pthread_t = null,
                ptr_private: ?*anyopaque = null,
            },
            .haiku => extern struct {
                flags: u32 = 0,
                owner: i32 = -1,
                lock_sem: i32 = 0,
                lock_count: i32 = 0,
                reader_count: i32 = 0,
                writer_count: i32 = 0,
                waiters: [2]?*anyopaque = [_]?*anyopaque{ null, null },
            },
            .kfreebsd, .freebsd, .openbsd => extern struct {
                ptr: ?*anyopaque = null,
            },
            .hermit => extern struct {
                ptr: usize = std.math.maxInt(usize),
            },
            else => @compileError("pthread_rwlock_t not implemented for this platform"),
        };

        extern "c" fn pthread_rwlock_destroy(p: *pthread_rwlock_t) callconv(.C) std.os.E;
        extern "c" fn pthread_rwlock_rdlock(p: *pthread_rwlock_t) callconv(.C) std.os.E;
        extern "c" fn pthread_rwlock_wrlock(p: *pthread_rwlock_t) callconv(.C) std.os.E;
        extern "c" fn pthread_rwlock_tryrdlock(p: *pthread_rwlock_t) callconv(.C) std.os.E;
        extern "c" fn pthread_rwlock_trywrlock(p: *pthread_rwlock_t) callconv(.C) std.os.E;
        extern "c" fn pthread_rwlock_unlock(p: *pthread_rwlock_t) callconv(.C) std.os.E;
    }
else
    struct {
        /// https://github.com/bloomberg/rwl-bench/blob/master/bench11.cpp
        ///
        /// Layout of `state`: bit 0 is `IS_WRITING`, the next `Count`-sized
        /// field counts writers waiting for the mutex, and the field above
        /// that counts active readers.
        state: usize,
        mutex: Mutex,
        semaphore: Semaphore,

        const IS_WRITING: usize = 1;
        const WRITER: usize = 1 << 1;
        const READER: usize = 1 << (1 + @bitSizeOf(Count));
        const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(WRITER);
        const READER_MASK: usize = std.math.maxInt(Count) << @ctz(READER);
        const Count = std.meta.Int(.unsigned, @divFloor(@bitSizeOf(usize) - 1, 2));

        pub fn init() RwLock {
            return .{
                .state = 0,
                .mutex = Mutex.init(),
                .semaphore = Semaphore.init(0),
            };
        }

        pub fn deinit(self: *RwLock) void {
            self.semaphore.deinit();
            self.mutex.deinit();
            self.* = undefined;
        }

        /// Attempts to take the lock exclusively without blocking. Succeeds
        /// only if the mutex is free and no reader is active; on success the
        /// mutex stays held until `unlock`.
        pub fn tryLock(self: *RwLock) bool {
            if (self.mutex.tryLock()) {
                const state = @atomicLoad(usize, &self.state, .SeqCst);
                if (state & READER_MASK == 0) {
                    _ = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .SeqCst);
                    return true;
                }

                self.mutex.unlock();
            }

            return false;
        }

        /// Takes the lock exclusively, waiting for active readers to leave.
        pub fn lock(self: *RwLock) void {
            // Announce a pending writer so new readers stop using the
            // lock-free path and queue up on the mutex instead.
            _ = @atomicRmw(usize, &self.state, .Add, WRITER, .SeqCst);
            self.mutex.lock();

            // Convert the pending-writer registration into the IS_WRITING
            // flag in one step. Without removing the WRITER increment the
            // pending count would grow on every `lock` and readers would
            // never get their fast path back.
            const state = @atomicRmw(usize, &self.state, .Add, IS_WRITING -% WRITER, .SeqCst);
            if (state & READER_MASK != 0)
                self.semaphore.wait();
        }

        /// Releases an exclusive hold.
        pub fn unlock(self: *RwLock) void {
            _ = @atomicRmw(usize, &self.state, .And, ~IS_WRITING, .SeqCst);
            self.mutex.unlock();
        }

        /// Attempts to take the lock for reading without blocking.
        pub fn tryLockShared(self: *RwLock) bool {
            const state = @atomicLoad(usize, &self.state, .SeqCst);
            if (state & (IS_WRITING | WRITER_MASK) == 0) {
                // No writer active or pending: try to register lock-free.
                _ = @cmpxchgStrong(
                    usize,
                    &self.state,
                    state,
                    state + READER,
                    .SeqCst,
                    .SeqCst,
                ) orelse return true;
            }

            if (self.mutex.tryLock()) {
                _ = @atomicRmw(usize, &self.state, .Add, READER, .SeqCst);
                self.mutex.unlock();
                return true;
            }

            return false;
        }

        /// Takes the lock for reading. Uses a lock-free increment while no
        /// writer is active or pending, otherwise queues on the mutex.
        pub fn lockShared(self: *RwLock) void {
            var state = @atomicLoad(usize, &self.state, .SeqCst);
            while (state & (IS_WRITING | WRITER_MASK) == 0) {
                state = @cmpxchgWeak(
                    usize,
                    &self.state,
                    state,
                    state + READER,
                    .SeqCst,
                    .SeqCst,
                ) orelse return;
            }

            self.mutex.lock();
            _ = @atomicRmw(usize, &self.state, .Add, READER, .SeqCst);
            self.mutex.unlock();
        }

        /// Releases a shared hold; the last reader to leave wakes a writer
        /// that has already set `IS_WRITING`.
        pub fn unlockShared(self: *RwLock) void {
            const state = @atomicRmw(usize, &self.state, .Sub, READER, .SeqCst);

            if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
                self.semaphore.post();
        }
    };
/// Counter that lets threads block until a set of tasks has finished.
pub const WaitGroup = struct {
    mutex: Mutex,
    cond: Condvar,
    /// Number of outstanding tasks.
    active: usize,

    pub fn init() WaitGroup {
        return .{
            .mutex = Mutex.init(),
            .cond = Condvar.init(),
            .active = 0,
        };
    }

    pub fn deinit(self: *WaitGroup) void {
        self.mutex.deinit();
        self.cond.deinit();
        self.* = undefined;
    }

    /// Registers `n` additional outstanding tasks.
    pub fn addN(self: *WaitGroup, n: usize) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.active += n;
    }

    /// Registers one additional outstanding task.
    pub fn add(self: *WaitGroup) void {
        return self.addN(1);
    }

    /// Marks one task as finished. Must be balanced with a prior `add`.
    pub fn done(self: *WaitGroup) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.active -= 1;
        // Several threads may be blocked in `wait`; all of them must be
        // released when the count reaches zero, not just one.
        if (self.active == 0)
            self.cond.broadcast();
    }

    /// Blocks until the number of outstanding tasks is zero.
    pub fn wait(self: *WaitGroup) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (self.active != 0)
            self.cond.wait(&self.mutex);
    }
};
/// Counting semaphore built from a `Mutex` and a `Condvar`.
pub const Semaphore = struct {
    mutex: Mutex,
    cond: Condvar,
    permits: usize,

    /// Creates a semaphore that starts out with `permits` permits.
    pub fn init(permits: usize) Semaphore {
        return Semaphore{
            .mutex = Mutex.init(),
            .cond = Condvar.init(),
            .permits = permits,
        };
    }

    /// Destroys the mutex and condition variable.
    pub fn deinit(self: *Semaphore) void {
        self.mutex.deinit();
        self.cond.deinit();
        self.* = undefined;
    }

    /// Blocks until a permit is available and takes it. If permits remain
    /// afterwards, another waiter is signalled.
    pub fn wait(self: *Semaphore) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (true) {
            if (self.permits != 0) break;
            self.cond.wait(&self.mutex);
        }

        const left = self.permits - 1;
        self.permits = left;
        if (left != 0) {
            self.cond.signal();
        }
    }

    /// Adds one permit and signals a waiter.
    pub fn post(self: *Semaphore) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.permits = self.permits + 1;
        self.cond.signal();
    }
};
pub const Mutex = if (@import("builtin").os.tag == .windows)
struct {
srwlock: SRWLOCK,
pub fn init() Mutex {
return .{ .srwlock = SRWLOCK_INIT };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE;
}
pub fn lock(self: *Mutex) void {
AcquireSRWLockExclusive(&self.srwlock);
}
pub fn unlock(self: *Mutex) void {
ReleaseSRWLockExclusive(&self.srwlock);
}
const SRWLOCK = usize;
const SRWLOCK_INIT: SRWLOCK = 0;
extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) system.BOOL;
extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void;
extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void;
}
else if (@import("builtin").link_libc)
struct {
mutex: if (@import("builtin").link_libc) std.c.pthread_mutex_t else void,
pub fn init() Mutex {
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
}
pub fn deinit(self: *Mutex) void {
const safe_rc = switch (@import("builtin").os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const rc = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(rc == .SUCCESS or rc == safe_rc);
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return pthread_mutex_trylock(&self.mutex) == 0;
}
pub fn lock(self: *Mutex) void {
const rc = std.c.pthread_mutex_lock(&self.mutex);
std.debug.assert(rc == .SUCCESS);
}
pub fn unlock(self: *Mutex) void {
const rc = std.c.pthread_mutex_unlock(&self.mutex);
std.debug.assert(rc == .SUCCESS);
}
extern "c" fn pthread_mutex_trylock(m: *std.c.pthread_mutex_t) callconv(.C) c_int;
}
else if (@import("builtin").os.tag == .linux)
struct {
/// Current lock state; all accesses in this struct go through atomic builtins.
state: State,
/// Lock states. Backed by `i32` so the field can be reinterpreted as the
/// `*const i32` that `Futex.wait`/`Futex.wake` take (see the `@ptrCast`s below).
const State = enum(i32) {
unlocked,
locked,
waiting,
};
/// Returns a mutex in the `.unlocked` state.
pub fn init() Mutex {
return .{ .state = .unlocked };
}
/// Poisons `self`; there is nothing else to release.
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
/// Attempts a single `.unlocked` -> `.locked` compare-exchange.
/// Returns `true` when the exchange succeeded (the builtin returned `null`).
pub fn tryLock(self: *Mutex) bool {
return @cmpxchgStrong(
State,
&self.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
) == null;
}
/// Acquires the mutex. Unconditionally swaps `.locked` into the state; if the
/// previous value was `.unlocked` the lock is now held, otherwise the previous
/// value is handed to `lockSlow`.
pub fn lock(self: *Mutex) void {
switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) {
.unlocked => {},
else => |s| self.lockSlow(s),
}
}
/// Contended path of `lock`. `current_state` is the value `lock` observed
/// before its swap and is the state this thread first tries to install.
fn lockSlow(self: *Mutex, current_state: State) void {
@setCold(true);
var new_state = current_state;
while (true) {
// Bounded spin phase: up to 100 attempts to move `.unlocked` -> `new_state`.
var spin: u8 = 0;
while (spin < 100) : (spin += 1) {
// A successful exchange yields `null`, which means the lock was acquired.
const state = @cmpxchgWeak(
State,
&self.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return;
switch (state) {
// `.unlocked` can be reported here when the weak exchange fails; just retry.
.unlocked => {},
.locked => {},
// Stop spinning once the state is observed as `.waiting`.
.waiting => break,
}
// Issue `spin + 1` hint instructions before the next attempt, so the
// pause grows with each failed try.
var iter = spin + 1;
while (iter > 0) : (iter -= 1)
spinLoopHint();
}
// From here on this thread always installs `.waiting`.
new_state = .waiting;
switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) {
// The previous value was `.unlocked`: the lock is now held (state stays `.waiting`).
.unlocked => return,
else => {},
}
// Block on the state word while it still equals `.waiting`, then loop and retry.
Futex.wait(
@ptrCast(*const i32, &self.state),
@enumToInt(new_state),
);
}
}
/// Releases the mutex by swapping in `.unlocked`. Releasing a mutex that was
/// already `.unlocked` is treated as unreachable. If the previous state was
/// `.waiting`, `unlockSlow` is called.
pub fn unlock(self: *Mutex) void {
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.waiting => self.unlockSlow(),
}
}
/// Contended path of `unlock`: calls `Futex.wake` on the state word.
fn unlockSlow(self: *Mutex) void {
@setCold(true);
Futex.wake(@ptrCast(*const i32, &self.state));
}
}
else
struct {
    /// `true` while the lock is held; read and written only through atomics.
    is_locked: bool,

    /// Returns a mutex whose flag is clear (not locked).
    pub fn init() Mutex {
        const fresh: Mutex = .{ .is_locked = false };
        return fresh;
    }

    /// Poisons `self`; this implementation owns no other resources.
    pub fn deinit(self: *Mutex) void {
        self.* = undefined;
    }

    /// Atomically sets the flag and returns `true` when it was previously clear,
    /// i.e. when this call took the lock.
    pub fn tryLock(self: *Mutex) bool {
        const was_locked = @atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire);
        return !was_locked;
    }

    /// Busy-waits until `tryLock` succeeds, issuing a spin hint after each
    /// failed attempt.
    pub fn lock(self: *Mutex) void {
        while (true) {
            if (self.tryLock()) return;
            spinLoopHint();
        }
    }

    /// Clears the flag with release ordering.
    pub fn unlock(self: *Mutex) void {
        @atomicStore(bool, &self.is_locked, false, .Release);
    }
};
/// Condition variable used together with `Mutex`. The implementation is chosen
/// at compile time: a Win32 condition variable on Windows, a pthread condition
/// variable when libc is linked, and otherwise a list of waiters guarded by an
/// internal `Mutex`.
pub const Condvar = if (@import("builtin").os.tag == .windows)
    struct {
        cond: CONDITION_VARIABLE,

        /// Returns a condition variable set to `CONDITION_VARIABLE_INIT`.
        pub fn init() Condvar {
            return .{ .cond = CONDITION_VARIABLE_INIT };
        }

        /// Poisons `self`; nothing else is released.
        pub fn deinit(self: *Condvar) void {
            self.* = undefined;
        }

        /// Sleeps on the condition variable with `mutex`'s SRW lock, using an
        /// infinite timeout and flags of 0. Asserts the call did not return FALSE.
        pub fn wait(self: *Condvar, mutex: *Mutex) void {
            const rc = SleepConditionVariableSRW(
                &self.cond,
                &mutex.srwlock,
                system.INFINITE,
                @as(system.ULONG, 0),
            );
            std.debug.assert(rc != system.FALSE);
        }

        /// Calls `WakeConditionVariable`.
        pub fn signal(self: *Condvar) void {
            WakeConditionVariable(&self.cond);
        }

        /// Calls `WakeAllConditionVariable`.
        pub fn broadcast(self: *Condvar) void {
            WakeAllConditionVariable(&self.cond);
        }

        // Both Win32 objects are declared as plain `usize` words; zero is the
        // initial value of the condition variable.
        const SRWLOCK = usize;
        const CONDITION_VARIABLE = usize;
        const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0;

        extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void;
        extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void;
        extern "kernel32" fn SleepConditionVariableSRW(
            c: *CONDITION_VARIABLE,
            s: *SRWLOCK,
            t: system.DWORD,
            f: system.ULONG,
        ) callconv(system.WINAPI) system.BOOL;
    }
else if (@import("builtin").link_libc)
    struct {
        /// The wrapped pthread condition variable.
        cond: if (@import("builtin").link_libc) std.c.pthread_cond_t else void,

        /// Returns a condition variable initialised with `PTHREAD_COND_INITIALIZER`.
        pub fn init() Condvar {
            return .{ .cond = std.c.PTHREAD_COND_INITIALIZER };
        }

        /// Destroys the pthread condition variable and poisons `self`.
        /// Asserts that `pthread_cond_destroy` succeeded; on DragonFly and NetBSD
        /// an `AGAIN` result is tolerated as well.
        /// NOTE(review): the reason those two targets may report `AGAIN` is not
        /// visible in this file — confirm before relying on it.
        pub fn deinit(self: *Condvar) void {
            const rc = std.c.pthread_cond_destroy(&self.cond);
            // Type the tolerated code from `rc` itself so both sides of the
            // comparison below are the same errno type as `.SUCCESS`.
            const safe_rc: @TypeOf(rc) = switch (@import("builtin").os.tag) {
                .dragonfly, .netbsd => .AGAIN,
                else => .SUCCESS,
            };
            std.debug.assert(rc == .SUCCESS or rc == safe_rc);
            self.* = undefined;
        }

        /// Waits via `pthread_cond_wait` using `mutex`'s pthread mutex; asserts success.
        pub fn wait(self: *Condvar, mutex: *Mutex) void {
            const rc = std.c.pthread_cond_wait(&self.cond, &mutex.mutex);
            std.debug.assert(rc == .SUCCESS);
        }

        /// Calls `pthread_cond_signal`; asserts success.
        pub fn signal(self: *Condvar) void {
            const rc = std.c.pthread_cond_signal(&self.cond);
            std.debug.assert(rc == .SUCCESS);
        }

        /// Calls `pthread_cond_broadcast`; asserts success.
        pub fn broadcast(self: *Condvar) void {
            const rc = std.c.pthread_cond_broadcast(&self.cond);
            std.debug.assert(rc == .SUCCESS);
        }
    }
else
    struct {
        /// Protects `notified` and `waiters`.
        mutex: Mutex,
        /// Set when a notification arrived that no queued waiter consumed; the
        /// next `wait` clears it and returns immediately.
        notified: bool,
        /// Waiting threads. Each node lives on the stack of the thread inside
        /// `wait`, so a node must not be referenced after its event is set.
        waiters: std.SinglyLinkedList(Event),

        /// Returns a condition variable with no pending notification and no waiters.
        pub fn init() Condvar {
            return .{
                .mutex = Mutex.init(),
                .notified = false,
                .waiters = .{},
            };
        }

        /// Deinitialises the internal mutex and poisons `self`.
        pub fn deinit(self: *Condvar) void {
            self.mutex.deinit();
            self.* = undefined;
        }

        /// Waits for a notification. `mutex` is released while blocked and
        /// re-acquired before returning. If a notification is already pending it
        /// is consumed and the call returns without releasing `mutex`.
        pub fn wait(self: *Condvar, mutex: *Mutex) void {
            self.mutex.lock();
            if (self.notified) {
                self.notified = false;
                self.mutex.unlock();
                return;
            }

            // Queue a stack-allocated node, then drop both locks before blocking.
            var wait_node = @TypeOf(self.waiters).Node{ .data = .{} };
            self.waiters.prepend(&wait_node);
            self.mutex.unlock();

            mutex.unlock();
            wait_node.data.wait();
            mutex.lock();
        }

        /// Wakes one queued waiter, or records a pending notification when the
        /// queue is empty.
        pub fn signal(self: *Condvar) void {
            self.mutex.lock();
            const maybe_wait_node = self.waiters.popFirst();
            if (maybe_wait_node == null)
                self.notified = true;
            self.mutex.unlock();

            // Set the event outside the internal lock.
            if (maybe_wait_node) |wait_node|
                wait_node.data.set();
        }

        /// Wakes every queued waiter and records a pending notification.
        pub fn broadcast(self: *Condvar) void {
            self.mutex.lock();
            var waiters = self.waiters;
            // Detach the queue while the lock is held. Without this the condvar
            // would keep pointing at stack nodes of threads that are about to
            // return from `wait`, and a later `signal`/`broadcast` would write
            // through those dangling pointers.
            self.waiters = .{};
            self.notified = true;
            self.mutex.unlock();

            // `popFirst` unlinks the node before its event is set, so the node is
            // not touched again after its owner may have resumed.
            while (waiters.popFirst()) |wait_node|
                wait_node.data.set();
        }

        /// One-shot event a single waiter blocks on: 0 = not set, 1 = set.
        const Event = struct {
            futex: i32 = 0,

            // `Futex` is `void` on targets without a futex implementation, and
            // `@hasDecl` operates on container types, so only query it when
            // `Futex` is not `void`.
            const has_futex_wait = Futex != void and @hasDecl(Futex, "wait");
            const has_futex_wake = Futex != void and @hasDecl(Futex, "wake");

            /// Blocks until `set` has stored a non-zero value, sleeping through
            /// `Futex.wait` when available and spinning otherwise.
            fn wait(self: *Event) void {
                while (@atomicLoad(i32, &self.futex, .Acquire) == 0) {
                    if (has_futex_wait) {
                        Futex.wait(&self.futex, 0);
                    } else {
                        spinLoopHint();
                    }
                }
            }

            /// Marks the event as set and, when available, issues `Futex.wake`.
            fn set(self: *Event) void {
                @atomicStore(i32, &self.futex, 1, .Release);
                if (has_futex_wake)
                    Futex.wake(&self.futex);
            }
        };
    };
/// Thin wrapper over the Linux futex system calls. On every other OS this is
/// `void`, i.e. no futex operations are available.
const Futex = switch (@import("builtin").os.tag) {
    .linux => struct {
        /// Issues a private `FUTEX_WAIT` on `ptr` with expected value `cmp` and
        /// no timeout. Success, `INTR` and `AGAIN` all simply return, so callers
        /// must re-check their condition in a loop. Any other errno is treated
        /// as unreachable.
        fn wait(ptr: *const i32, cmp: i32) void {
            // Errno cases are written as enum tags, matching the `.SUCCESS`
            // comparisons used for the pthread calls in this file.
            switch (system.getErrno(system.futex_wait(
                ptr,
                system.FUTEX.PRIVATE_FLAG | system.FUTEX.WAIT,
                cmp,
                null,
            ))) {
                .SUCCESS => {},
                .INTR => {},
                .AGAIN => {},
                else => unreachable,
            }
        }

        /// Issues a private `FUTEX_WAKE` on `ptr` with a count of 1. `FAULT` is
        /// tolerated; any other errno is treated as unreachable.
        /// NOTE(review): `FAULT` is presumably accepted because the woken
        /// thread may already have released the memory behind `ptr` — confirm.
        fn wake(ptr: *const i32) void {
            switch (system.getErrno(system.futex_wake(
                ptr,
                system.FUTEX.PRIVATE_FLAG | system.FUTEX.WAKE,
                @as(i32, 1),
            ))) {
                .SUCCESS => {},
                .FAULT => {},
                else => unreachable,
            }
        }
    },
    else => void,
};
/// Tells the CPU that the caller is inside a busy-wait loop.
/// Delegates to `std.atomic.spinLoopHint` rather than switching on CPU
/// architecture tags by hand, so no arch tag names are spelled out here and the
/// instruction choice follows the standard library.
fn spinLoopHint() void {
    std.atomic.spinLoopHint();
}