Fix atomic operations to preserve auto_delete flag

The previous implementation had a critical bug where atomic stores in
pushBatch would overwrite the auto_delete flag. This fix ensures:

1. When atomically updating the next pointer, we preserve the existing
   auto_delete flag by doing a read-modify-write
2. All atomic operations use consistent types:
   - u64 for packed structs (via bitcast)
   - ?*T for plain pointers (front/back queue pointers)
3. Added an initPreserveAutoDelete() method that creates new PackedNext
   values while preserving the auto_delete bit (see the sketch after
   this list)
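
To make the failure mode concrete, here is a minimal sketch (not the
repository's actual code): the field layout follows the diff below (a
u63 ptr_bits plus a one-bit auto_delete inside a packed struct(u64)),
ConcurrentTask is reduced to a stand-in with only a next field, and
the body of init() is an assumption that reproduces the clobbering
behavior described above.

    const std = @import("std");

    // Packed link; layout inferred from the diff below.
    const PackedNext = packed struct(u64) {
        ptr_bits: u63 = 0,
        auto_delete: bool = false,

        // Assumed shape of the pre-existing init(): it builds a fresh value,
        // so auto_delete silently falls back to its default.
        pub fn init(ptr: ?*ConcurrentTask) PackedNext {
            const addr: usize = if (ptr) |p| @intFromPtr(p) else 0;
            return .{ .ptr_bits = @as(u63, @truncate(addr)) };
        }

        // The fix: rebuild the pointer bits but carry the current flag along.
        pub fn initPreserveAutoDelete(self: PackedNext, ptr: ?*ConcurrentTask) PackedNext {
            const addr: usize = if (ptr) |p| @intFromPtr(p) else 0;
            return .{ .ptr_bits = @as(u63, @truncate(addr)), .auto_delete = self.auto_delete };
        }
    };

    // Stand-in for the real task type; only the packed `next` link matters here.
    const ConcurrentTask = struct {
        next: PackedNext = .{},
    };

    test "only the preserving variant keeps auto_delete across a link update" {
        var successor: ConcurrentTask = .{};
        var task: ConcurrentTask = .{};
        task.next.auto_delete = true;

        // Old behavior: re-initializing the link loses the flag.
        try std.testing.expect(!PackedNext.init(&successor).auto_delete);
        // Fixed behavior: the flag survives the pointer update.
        try std.testing.expect(task.next.initPreserveAutoDelete(&successor).auto_delete);
    }

Running `zig test` on this sketch shows the plain init() dropping a
previously set flag while initPreserveAutoDelete() keeps it.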

The key insight is that front/back hold plain pointers to tasks, while
each task's 'next' field holds a PackedNext struct. When updating a
task's next pointer via pushBatch, we must preserve that task's
auto_delete flag.
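
To make that split concrete, here is a hypothetical miniature of the
push path, reusing the PackedNext and ConcurrentTask stand-ins from
the sketch above (MiniQueue and append are invented names, not the
real UnboundedQueuePacked API): the queue ends are stored as plain
optional pointers, while the per-task link is loaded as a u64,
bit-cast, rebuilt with initPreserveAutoDelete(), and stored back.

    // Hypothetical miniature of the push path; not the real queue implementation.
    const MiniQueue = struct {
        front: std.atomic.Value(?*ConcurrentTask) = .{ .raw = null },
        back: std.atomic.Value(?*ConcurrentTask) = .{ .raw = null },

        fn append(self: *MiniQueue, task: *ConcurrentTask) void {
            if (self.back.swap(task, .acq_rel)) |old_back| {
                // The task's link is a packed u64: read-modify-write so the
                // previous back task's auto_delete bit survives the update.
                const next_ptr: *u64 = @ptrCast(&old_back.next);
                const current: PackedNext = @bitCast(@atomicLoad(u64, next_ptr, .monotonic));
                @atomicStore(u64, next_ptr, @bitCast(current.initPreserveAutoDelete(task)), .release);
            } else {
                // The queue ends hold plain optional pointers, so a pointer-typed store is enough.
                @atomicStore(?*ConcurrentTask, &self.front.raw, task, .release);
            }
        }
    };

The asymmetry to notice is the first argument of each @atomicStore:
u64 for the packed per-task link, but an optional task pointer
(?*T in the generic code) for the queue ends.
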
Author: Claude Bot
Date: 2025-11-02 12:05:59 +00:00
Parent: 0c52688060
Commit: ade70ec09e
2 changed files with 32 additions and 20 deletions

@@ -28,6 +28,20 @@ pub const PackedNext = packed struct(u64) {
         return .{};
     }
+    pub fn initPreserveAutoDelete(self: PackedNext, ptr: ?*ConcurrentTask) PackedNext {
+        if (ptr) |p| {
+            const addr = @intFromPtr(p);
+            return .{
+                .ptr_bits = @as(u63, @truncate(addr)),
+                .auto_delete = self.auto_delete,
+            };
+        }
+        return .{
+            .ptr_bits = 0,
+            .auto_delete = self.auto_delete,
+        };
+    }
     pub fn get(self: PackedNext) ?*ConcurrentTask {
         if (self.ptr_bits == 0) return null;
         const ptr: u64 = @as(u64, self.ptr_bits);
@@ -39,6 +53,8 @@ pub const PackedNext = packed struct(u64) {
     }
     pub fn setAutoDelete(self: *PackedNext, value: bool) void {
+        // Non-atomic write is safe because this is only called during initialization
+        // before the task is shared with other threads
        self.auto_delete = value;
    }

@@ -24,8 +24,7 @@ pub fn UnboundedQueuePacked(comptime T: type, comptime next_field: meta.FieldEnu
         fn setter(instance: *T, value: ?*T) void {
             if (packed_or_unpacked == .@"packed") {
-                const FieldType = @TypeOf(@field(instance, next_name));
-                @field(instance, next_name) = FieldType.init(value);
+                @field(instance, next_name) = @field(instance, next_name).initPreserveAutoDelete(value);
             } else {
                 @field(instance, next_name) = value;
             }
@@ -72,26 +71,23 @@ pub fn UnboundedQueuePacked(comptime T: type, comptime next_field: meta.FieldEnu
                 }
                 assertf(item == last, "`last` should be reachable from `first`", .{});
             }
-            const prev_next_ptr = if (self.back.swap(last, .acq_rel)) |old_back| blk: {
-                if (packed_or_unpacked == .@"packed") {
-                    break :blk @as(*u64, @ptrCast(&@field(old_back, next)));
-                } else {
-                    break :blk &@field(old_back, next);
-                }
-            } else blk: {
-                if (packed_or_unpacked == .@"packed") {
-                    break :blk @as(*u64, @ptrCast(&self.front.raw));
-                } else {
-                    break :blk &self.front.raw;
-                }
-            };
-            if (packed_or_unpacked == .@"packed") {
-                const FieldType = @TypeOf(@field(first, next_name));
-                const packed_val = FieldType.init(first);
-                @atomicStore(u64, @as(*u64, @ptrCast(prev_next_ptr)), @bitCast(packed_val), .release);
+            if (self.back.swap(last, .acq_rel)) |old_back| {
+                // There was a previous back item - set its `next` field to point to `first`
+                if (packed_or_unpacked == .@"packed") {
+                    // We need to preserve old_back's auto_delete flag while updating the pointer
+                    const prev_next_ptr = @as(*u64, @ptrCast(&@field(old_back, next)));
+                    const current_packed = @atomicLoad(u64, prev_next_ptr, .monotonic);
+                    const current_typed = @as(@TypeOf(@field(first, next_name)), @bitCast(current_packed));
+                    const new_packed = current_typed.initPreserveAutoDelete(first);
+                    @atomicStore(u64, prev_next_ptr, @bitCast(new_packed), .release);
+                } else {
+                    @atomicStore(?*T, &@field(old_back, next), first, .release);
+                }
             } else {
-                @atomicStore(?*T, prev_next_ptr, first, .release);
+                // Queue was empty - set front to point to `first`
+                // front/back atomics hold plain pointers, not PackedNext
+                @atomicStore(?*T, &self.front.raw, first, .release);
             }
         }