From ee68193de75e4530b477cf57515087fdbea186ca Mon Sep 17 00:00:00 2001 From: Alistair Smith Date: Fri, 29 Aug 2025 18:45:08 -0700 Subject: [PATCH] changes Co-authored-by: taylor.fish --- src/bun.js/web_worker.zig | 127 +++++++------------------------------- 1 file changed, 22 insertions(+), 105 deletions(-) diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index ff56da58f5..ddcdc8c595 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -12,7 +12,6 @@ const log = Output.scoped(.Worker, .hidden); vm: ?*jsc.VirtualMachine = null, status: std.atomic.Value(Status) = .init(.start), /// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop. -requested_terminate: std.atomic.Value(bool) = .init(false), execution_context_id: u32 = 0, parent_context_id: u32 = 0, parent: *jsc.VirtualMachine, @@ -65,22 +64,19 @@ export fn WebWorker__getParentWorker(vm: *jsc.VirtualMachine) ?*anyopaque { } pub fn hasRequestedTerminate(this: *const WebWorker) bool { - return this.requested_terminate.load(.monotonic); -} - -pub fn setRequestedTerminate(this: *WebWorker) bool { - return this.requested_terminate.swap(true, .release); + return this.lifecycle_handle.worker.load(.acquire) == null; } export fn WebWorker__updatePtr(handle: *WebWorkerLifecycleHandle, ptr: *anyopaque) bool { - handle.worker.?.cpp_worker = ptr; + const worker = handle.worker.swap(null, .monotonic) orelse return false; + worker.cpp_worker = ptr; var thread = std.Thread.spawn( .{ .stack_size = bun.default_thread_stack_size }, startWithErrorHandling, - .{handle.worker.?}, + .{worker}, ) catch { - handle.worker.?.destroy(); + worker.destroy(); return false; }; thread.detach(); @@ -453,7 +449,7 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje // notifyNeedTermination uses vm.global.requestTermination() instead of // vm.jsc.notifyNeedTermination() which avoid VMTraps assertion failures worker_.exit_called = true; - worker_.lifecycle_handle.requestTermination(); + _ = worker_.lifecycle_handle.requestTermination(); } } @@ -559,10 +555,11 @@ fn spin(this: *WebWorker) void { /// This is worker.ref()/.unref() from JS (Caller thread) pub fn setRef(handle: *WebWorkerLifecycleHandle, value: bool) callconv(.c) void { - if (handle.worker) |worker| { + if (handle.worker.load(.acquire)) |worker| { if (worker.hasRequestedTerminate()) { return; } + worker.setRefInternal(value); } } @@ -578,18 +575,15 @@ pub fn setRefInternal(this: *WebWorker, value: bool) void { /// Implement process.exit(). May only be called from the Worker thread. pub fn exit(this: *WebWorker) void { this.exit_called = true; - this.notifyNeedTermination(); + _ = this.lifecycle_handle.requestTermination(); } /// Request a terminate from any thread. -pub fn notifyNeedTermination(this: *WebWorker) void { +fn notifyNeedTermination(this: *WebWorker) void { if (this.status.load(.acquire) == .terminated) { return; } - if (this.setRequestedTerminate()) { - return; - } log("[{d}] notifyNeedTermination", .{this.execution_context_id}); if (this.vm) |vm| { @@ -632,7 +626,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn { vm_to_deinit = vm; } var arena = this.arena; - this.lifecycle_handle.onTermination(); + this.lifecycle_handle.deref(); WebWorker__dispatchExit(globalObject, cpp_worker, exit_code); if (loop) |loop_| { loop_.internal_loop_data.jsc_vm = null; @@ -656,7 +650,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn { pub export fn WebWorkerLifecycleHandle__requestTermination(handle: ?*WebWorkerLifecycleHandle) void { if (handle) |h| { - h.requestTermination(); + _ = h.requestTermination(); } } @@ -672,9 +666,7 @@ const WebWorkerLifecycleHandle = struct { pub const ref = WebWorkerLifecycleHandle.RefCount.ref; pub const deref = WebWorkerLifecycleHandle.RefCount.deref; - mutex: bun.Mutex = .{}, - worker: ?*WebWorker = null, - requested_terminate: std.atomic.Value(bool) = .init(false), + worker: std.atomic.Value(?*WebWorker), ref_count: WebWorkerLifecycleHandle.RefCount, pub const new = bun.TrivialNew(WebWorkerLifecycleHandle); @@ -699,13 +691,14 @@ const WebWorkerLifecycleHandle = struct { preload_modules_len: usize, ) callconv(.c) *WebWorkerLifecycleHandle { const handle = WebWorkerLifecycleHandle.new(.{ - .worker = null, + .worker = .init(null), .ref_count = .init(), }); const worker = create(cpp_worker, parent, name_str, specifier_str, error_message, parent_context_id, this_context_id, mini, default_unref, eval_mode, argv_ptr, argv_len, inherit_execArgv, execArgv_ptr, execArgv_len, preload_modules_ptr, preload_modules_len, handle); - handle.worker = worker; + handle.worker.store(worker, .release); + // Worker.cpp holds a reference to this handle.ref(); @@ -716,44 +709,15 @@ const WebWorkerLifecycleHandle = struct { bun.destroy(this); } - pub fn requestTermination(self: *WebWorkerLifecycleHandle) void { - if (self.requested_terminate.load(.acquire)) { - return; - } + pub const RequestTerminationResult = enum { success, already_requested }; - self.ref(); - self.mutex.lock(); + pub fn requestTermination(self: *WebWorkerLifecycleHandle) RequestTerminationResult { + const worker = self.worker.swap(null, .acq_rel) orelse return .already_requested; - if (self.requested_terminate.swap(true, .monotonic)) { - self.mutex.unlock(); - self.deref(); - return; - } + worker.notifyNeedTermination(); + worker.deref(); - if (self.worker) |worker| { - self.worker = null; - worker.notifyNeedTermination(); - self.mutex.unlock(); - worker.deref(); - } else { - self.mutex.unlock(); - // Let the reference counting system handle deinitialization - self.deref(); - } - - self.deref(); - } - - pub fn onTermination(self: *WebWorkerLifecycleHandle) void { - self.mutex.lock(); - self.worker = null; - const was_requested = self.requested_terminate.swap(false, .acquire); - self.mutex.unlock(); - - // requestTermination() adds an extra ref that we need to release - if (was_requested) { - self.deref(); - } + return .success; } }; @@ -762,53 +726,6 @@ comptime { @export(&setRef, .{ .name = "WebWorker__setRef" }); } -const TerminationHandle = struct { - mutex: bun.Mutex = .{}, - worker: ?*WebWorker = null, - requested_terminate: std.atomic.Value(bool) = .init(false), - - pub fn new(worker: *WebWorker) *TerminationHandle { - const handle = bun.default_allocator.create(TerminationHandle) catch bun.outOfMemory(); - handle.worker = worker; - return handle; - } - - pub fn deinit(this: *TerminationHandle) void { - this.worker.?.deinit(); - this.mutex.deinit(); - bun.default_allocator.destroy(this); - } - - pub fn requestTermination(this: *TerminationHandle) void { - this.mutex.lock(); - this.requested_terminate.store(true, .acquire); - const worker = this.worker orelse { - // the worker already terminated. - this.mutex.unlock(); - this.deinit(); - return; - }; - this.worker = null; - worker.notifyNeedTermination(); - this.mutex.unlock(); - worker.deref(); - } - - pub fn onTermination(this: *TerminationHandle) void { - this.mutex.lock(); - if (this.requested_terminate.swap(false, .acquire)) { - // we already requested to terminate, therefore this handle has - // already been consumed on the other thread and we are able to free - // it. - this.mutex.unlock(); - this.deinit(); - return; - } - this.worker = null; - this.mutex.unlock(); - } -}; - const std = @import("std"); const WTFStringImpl = @import("../string.zig").WTFStringImpl;