const std = @import("std");

const os = std.os;
const mem = std.mem;
const meta = std.meta;
const atomic = std.atomic;
const builtin = std.builtin;
const testing = std.testing;

const assert = std.debug.assert;

const mpsc = @This();
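
/// Cache-line padding granularity per architecture, used to keep the
/// producer- and consumer-owned queue fields from sharing a line (false
/// sharing). x86_64 and aarch64 use 128 to also cover adjacent-cache-line
/// prefetching.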
pub const cache_line_length = switch (@import("builtin").target.cpu.arch) {
    .x86_64, .aarch64, .powerpc64 => 128,
    .arm, .mips, .mips64, .riscv64 => 32,
    .s390x => 256,
    else => 64,
};
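
/// Intrusive, unbounded multi-producer single-consumer (MPSC) queue.
/// `T` must have an optional self-pointer field (e.g. `next: ?*T = null`),
/// selected by `next_field`, which the queue uses to link nodes; the queue
/// itself never allocates. `push`/`pushBatch` may be called from many
/// threads concurrently, but `pop`/`popBatch` must only run on a single
/// consumer thread.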
pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type {
    const next_name = meta.fieldInfo(T, next_field).name;
    return struct {
        const Self = @This();
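
        /// A run of nodes detached from the queue in one `popBatch` call;
        /// traverse it with `iterator()`.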
        pub const Batch = struct {
            pub const Iterator = struct {
                batch: Self.Batch,

                pub fn next(self: *Self.Batch.Iterator) ?*T {
                    if (self.batch.count == 0) return null;
                    const front = self.batch.front orelse unreachable;
                    self.batch.front = @field(front, next_name);
                    self.batch.count -= 1;
                    return front;
                }
            };

            front: ?*T = null,
            last: ?*T = null,
            count: usize = 0,

            pub fn iterator(self: Self.Batch) Self.Batch.Iterator {
                return .{ .batch = self };
            }
        };

        const next = next_name;

        pub const queue_padding_length = cache_line_length / 2;

        // `front` is a permanently-embedded stub node (Vyukov-style): the
        // queue is empty when the stub's `next` pointer is null. `back` and
        // `front` are aligned apart to limit false sharing between the
        // producers and the consumer.
        back: ?*T align(queue_padding_length) = null,
        count: usize = 0,
        front: T align(queue_padding_length) = init: {
            var stub: T = undefined;
            @field(stub, next) = null;
            break :init stub;
        },
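
        // Producer side: any thread may push. The Xchg publishes the new
        // tail first; until the old tail's `next` is stored, the node is
        // momentarily unreachable from the front, and the consumer spins on
        // that window. `pushBatch` expects the caller to have pre-linked the
        // chain of `count` nodes from `first` to `last`.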
        pub fn push(self: *Self, src: *T) void {
            assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0);

            @field(src, next) = null;
            const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front;
            @field(old_back, next) = src;
        }

        pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void {
            assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0);

            @field(last, next) = null;
            const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front;
            @field(old_back, next) = first;
        }
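
        // Consumer side (single thread only). This follows Dmitry Vyukov's
        // intrusive MPSC design: dequeue through the stub `front` node, and
        // treat a null `next` on the apparent tail as a possibly in-flight
        // push.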
        pub fn pop(self: *Self) ?*T {
            const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null;

            // Fast path: `first` already has a linked successor, so the head
            // can be advanced without racing any producer.
            if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| {
                @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
                assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
                return first;
            }

            // `first` looks like the tail. If `back` disagrees, a producer is
            // mid-push; report empty and let the caller retry later.
            const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
            if (first != last) return null;

            // Try to swing `back` to the stub so the queue reads as empty.
            @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
            if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
                assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
                return first;
            }

            // A producer won the race; wait for it to finish linking its node.
            var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
            while (next_item == null) : (atomic.spinLoopHint()) {
                next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
            }

            @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
            assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
            return first;
        }
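
        // Drains all linked nodes in one pass and returns them as a `Batch`,
        // using the same tail endgame as `pop` for the final node.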
        pub fn popBatch(self: *Self) Self.Batch {
            var batch: Self.Batch = .{};

            var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch;
            batch.front = front;

            // Collect every node that already has a linked successor.
            var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
            while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) {
                batch.count += 1;
                batch.last = front;

                front = next_node;
            }

            // The final node needs `pop`'s endgame: either leave it queued
            // behind a mid-push producer, swing `back` to the stub, or spin
            // until the in-flight link becomes visible.
            const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
            if (front != last) {
                @atomicStore(?*T, &@field(self.front, next), front, .Release);
                assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
                return batch;
            }

            @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
            if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
                batch.count += 1;
                batch.last = front;
                assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
                return batch;
            }

            next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
            while (next_item == null) : (atomic.spinLoopHint()) {
                next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
            }

            batch.count += 1;
            @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
            batch.last = front;
            assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
            return batch;
        }
        pub fn peek(self: *Self) usize {
            const count = @atomicLoad(usize, &self.count, .Acquire);
            assert(count >= 0);
            return count;
        }

        pub fn isEmpty(self: *Self) bool {
            return self.peek() == 0;
        }
    };
}
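
// Illustrative usage sketch (not part of the original file): `Task` is a
// hypothetical node type; any struct with an optional self-pointer field
// selected via `next_field` can be queued intrusively.
test "UnboundedQueue: FIFO push/pop with an intrusive node type" {
    const Task = struct {
        id: usize = 0,
        next: ?*@This() = null,
    };

    var queue: UnboundedQueue(Task, .next) = .{};
    var tasks = [_]Task{ .{ .id = 0 }, .{ .id = 1 }, .{ .id = 2 } };

    // Nodes are linked in place; the queue takes pointers, not copies.
    for (tasks) |*task| queue.push(task);
    try testing.expectEqual(@as(usize, 3), queue.peek());

    // Nodes come back out in push order.
    var expected: usize = 0;
    while (queue.pop()) |task| : (expected += 1) {
        try testing.expectEqual(expected, task.id);
    }
    try testing.expect(queue.isEmpty());
}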