diff --git a/src/bake/DevServer/WatcherAtomics.zig b/src/bake/DevServer/WatcherAtomics.zig index 2de6123054..86f5e5c37a 100644 --- a/src/bake/DevServer/WatcherAtomics.zig +++ b/src/bake/DevServer/WatcherAtomics.zig @@ -132,6 +132,8 @@ pub fn watcherReleaseAndSubmitEvent(self: *Self, ev: *HotReloadEvent) void { self.dbg_server_event = ev; } ev.concurrent_task = .{ + .auto_delete = false, + .next = null, .task = jsc.Task.init(ev), }; ev.owner.vm.event_loop.enqueueTaskConcurrent(&ev.concurrent_task); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index a922b6b38a..281238dcf7 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -321,7 +321,7 @@ pub fn tickConcurrentWithCount(this: *EventLoop) usize { dest.deinit(); } - if (task.autoDelete()) { + if (task.auto_delete) { to_destroy = task; } diff --git a/src/bun.js/event_loop/ConcurrentTask.zig b/src/bun.js/event_loop/ConcurrentTask.zig index e449564608..5f8ca7c73e 100644 --- a/src/bun.js/event_loop/ConcurrentTask.zig +++ b/src/bun.js/event_loop/ConcurrentTask.zig @@ -11,63 +11,8 @@ const ConcurrentTask = @This(); task: Task = undefined, -/// Packed representation of the next pointer and auto_delete flag. -/// Uses the low bit to store auto_delete (since pointers are at least 2-byte aligned). -next: PackedNextPtr = .none, - -/// Packed next pointer that encodes both the next ConcurrentTask pointer and the auto_delete flag. -/// Uses the low bit for auto_delete since ConcurrentTask pointers are at least 2-byte aligned. -pub const PackedNextPtr = enum(usize) { - none = 0, - auto_delete = 1, - _, - - pub inline fn init(ptr: ?*ConcurrentTask, auto_del: bool) PackedNextPtr { - const ptr_bits = if (ptr) |p| @intFromPtr(p) else 0; - return @enumFromInt(ptr_bits | @intFromBool(auto_del)); - } - - pub inline fn getPtr(self: PackedNextPtr) ?*ConcurrentTask { - const addr = @intFromEnum(self) & ~@as(usize, 1); - return if (addr == 0) null else @ptrFromInt(addr); - } - - pub inline fn setPtr(self: *PackedNextPtr, ptr: ?*ConcurrentTask) void { - const auto_del = @intFromEnum(self.*) & 1; - const ptr_bits = if (ptr) |p| @intFromPtr(p) else 0; - self.* = @enumFromInt(ptr_bits | auto_del); - } - - pub inline fn isAutoDelete(self: PackedNextPtr) bool { - return (@intFromEnum(self) & 1) != 0; - } - - pub inline fn atomicLoadPtr(self: *const PackedNextPtr, ordering: std.builtin.AtomicOrder) ?*ConcurrentTask { - const value = @atomicLoad(usize, @as(*const usize, @ptrCast(self)), ordering); - const addr = value & ~@as(usize, 1); - return if (addr == 0) null else @ptrFromInt(addr); - } - - pub inline fn atomicStorePtr(self: *PackedNextPtr, ptr: ?*ConcurrentTask, ordering: std.builtin.AtomicOrder) void { - const ptr_bits = if (ptr) |p| @intFromPtr(p) else 0; - // auto_delete is immutable after construction, so we can safely read it - // with a relaxed load and preserve it in the new value. - const self_ptr: *usize = @ptrCast(self); - const auto_del_bit = @atomicLoad(usize, self_ptr, .monotonic) & 1; - @atomicStore(usize, self_ptr, ptr_bits | auto_del_bit, ordering); - } -}; - -comptime { - if (@sizeOf(ConcurrentTask) != 16) { - @compileError("ConcurrentTask should be 16 bytes, but is " ++ std.fmt.comptimePrint("{}", .{@sizeOf(ConcurrentTask)}) ++ " bytes"); - } - // PackedNextPtr stores a pointer in the upper bits and auto_delete in bit 0. - // This requires ConcurrentTask to be at least 2-byte aligned. - if (@alignOf(ConcurrentTask) < 2) { - @compileError("ConcurrentTask must be at least 2-byte aligned for pointer packing, but alignment is " ++ std.fmt.comptimePrint("{}", .{@alignOf(ConcurrentTask)})); - } -} +next: ?*ConcurrentTask = null, +auto_delete: bool = false, pub const Queue = UnboundedQueue(ConcurrentTask, .next); pub const new = bun.TrivialNew(@This()); @@ -77,11 +22,11 @@ pub const AutoDeinit = enum { manual_deinit, auto_deinit, }; - pub fn create(task: Task) *ConcurrentTask { return ConcurrentTask.new(.{ .task = task, - .next = .auto_delete, + .next = null, + .auto_delete = true, }); } @@ -101,20 +46,16 @@ pub fn from(this: *ConcurrentTask, of: anytype, auto_deinit: AutoDeinit) *Concur this.* = .{ .task = Task.init(of), - .next = if (auto_deinit == .auto_deinit) .auto_delete else .none, + .next = null, + .auto_delete = auto_deinit == .auto_deinit, }; return this; } -/// Returns whether this task should be automatically deallocated after execution. -pub fn autoDelete(this: *const ConcurrentTask) bool { - return this.next.isAutoDelete(); -} - const std = @import("std"); const bun = @import("bun"); -const UnboundedQueue = bun.UnboundedQueue; +const UnboundedQueue = bun.threading.UnboundedQueue; const jsc = bun.jsc; const ManagedTask = jsc.ManagedTask; diff --git a/src/bun.js/event_loop/JSCScheduler.zig b/src/bun.js/event_loop/JSCScheduler.zig index b2f48786ac..dbce749657 100644 --- a/src/bun.js/event_loop/JSCScheduler.zig +++ b/src/bun.js/event_loop/JSCScheduler.zig @@ -27,7 +27,8 @@ export fn Bun__queueJSCDeferredWorkTaskConcurrently(jsc_vm: *VirtualMachine, tas var loop = jsc_vm.eventLoop(); loop.enqueueTaskConcurrent(ConcurrentTask.new(.{ .task = Task.init(task), - .next = .auto_delete, + .next = null, + .auto_delete = true, })); } diff --git a/src/bun.js/hot_reloader.zig b/src/bun.js/hot_reloader.zig index ff4e580423..2e311bd484 100644 --- a/src/bun.js/hot_reloader.zig +++ b/src/bun.js/hot_reloader.zig @@ -225,7 +225,7 @@ pub fn NewHotReloader(comptime Ctx: type, comptime EventLoopType: type, comptime .hashes = this.hashes, .concurrent_task = undefined, }); - that.concurrent_task = .{ .task = jsc.Task.init(that) }; + that.concurrent_task = .{ .task = jsc.Task.init(that), .auto_delete = false }; that.reloader.enqueueTaskConcurrent(&that.concurrent_task); this.count = 0; } diff --git a/src/threading/unbounded_queue.zig b/src/threading/unbounded_queue.zig index f5f71fcd30..612a635a77 100644 --- a/src/threading/unbounded_queue.zig +++ b/src/threading/unbounded_queue.zig @@ -6,56 +6,10 @@ pub const cache_line_length = switch (@import("builtin").target.cpu.arch) { }; pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type { - const field_info = meta.fieldInfo(T, next_field); - const next_name = field_info.name; - const FieldType = field_info.type; - - // Check if the field type has custom accessors (for packed pointer types). - // If so, use the accessor methods instead of direct field access. - const has_custom_accessors = @typeInfo(FieldType) != .optional and - @hasDecl(FieldType, "getPtr") and - @hasDecl(FieldType, "setPtr") and - @hasDecl(FieldType, "atomicLoadPtr") and - @hasDecl(FieldType, "atomicStorePtr"); - + const next_name = meta.fieldInfo(T, next_field).name; return struct { const Self = @This(); - inline fn getNext(item: *T) ?*T { - if (comptime has_custom_accessors) { - return @field(item, next_name).getPtr(); - } else { - return @field(item, next_name); - } - } - - inline fn setNext(item: *T, ptr: ?*T) void { - if (comptime has_custom_accessors) { - const field_ptr: *FieldType = &@field(item, next_name); - field_ptr.setPtr(ptr); - } else { - @field(item, next_name) = ptr; - } - } - - inline fn atomicLoadNext(item: *T, ordering: std.builtin.AtomicOrder) ?*T { - if (comptime has_custom_accessors) { - const field_ptr: *FieldType = &@field(item, next_name); - return field_ptr.atomicLoadPtr(ordering); - } else { - return @atomicLoad(?*T, &@field(item, next_name), ordering); - } - } - - inline fn atomicStoreNext(item: *T, ptr: ?*T, ordering: std.builtin.AtomicOrder) void { - if (comptime has_custom_accessors) { - const field_ptr: *FieldType = &@field(item, next_name); - field_ptr.atomicStorePtr(ptr, ordering); - } else { - @atomicStore(?*T, &@field(item, next_name), ptr, ordering); - } - } - pub const Batch = struct { pub const Iterator = struct { batch: Self.Batch, @@ -63,7 +17,7 @@ pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) pub fn next(self: *Self.Batch.Iterator) ?*T { if (self.batch.count == 0) return null; const front = self.batch.front orelse unreachable; - self.batch.front = getNext(front); + self.batch.front = @field(front, next_name); self.batch.count -= 1; return front; } @@ -78,6 +32,7 @@ pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) } }; + const next = next_name; pub const queue_padding_length = cache_line_length / 2; back: std.atomic.Value(?*T) align(queue_padding_length) = .init(null), @@ -88,31 +43,31 @@ pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) } pub fn pushBatch(self: *Self, first: *T, last: *T) void { - setNext(last, null); + @field(last, next) = null; if (comptime bun.Environment.allow_assert) { var item = first; - while (getNext(item)) |next_item| { + while (@field(item, next)) |next_item| { item = next_item; } assertf(item == last, "`last` should be reachable from `first`", .{}); } - if (self.back.swap(last, .acq_rel)) |old_back| { - atomicStoreNext(old_back, first, .release); - } else { - self.front.store(first, .release); - } + const prev_next_ptr = if (self.back.swap(last, .acq_rel)) |old_back| + &@field(old_back, next) + else + &self.front.raw; + @atomicStore(?*T, prev_next_ptr, first, .release); } pub fn pop(self: *Self) ?*T { var first = self.front.load(.acquire) orelse return null; const next_item = while (true) { - const next_ptr = atomicLoadNext(first, .acquire); + const next_item = @atomicLoad(?*T, &@field(first, next), .acquire); const maybe_first = self.front.cmpxchgWeak( first, - next_ptr, + next_item, .release, // not .acq_rel because we already loaded this value with .acquire .acquire, - ) orelse break next_ptr; + ) orelse break next_item; first = maybe_first orelse return null; }; if (next_item != null) return first; @@ -130,7 +85,7 @@ pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) // Another item was added to the queue before we could finish removing this one. const new_first = while (true) : (atomic.spinLoopHint()) { // Wait for push/pushBatch to set `next`. - break atomicLoadNext(first, .acquire) orelse continue; + break @atomicLoad(?*T, &@field(first, next), .acquire) orelse continue; }; self.front.store(new_first, .release); @@ -153,7 +108,7 @@ pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) while (next_item != last) : (batch.count += 1) { next_item = while (true) : (atomic.spinLoopHint()) { // Wait for push/pushBatch to set `next`. - break atomicLoadNext(next_item, .acquire) orelse continue; + break @atomicLoad(?*T, &@field(next_item, next), .acquire) orelse continue; }; }