Co-authored-by: taylor.fish <contact@taylor.fish>
This commit is contained in:
Alistair Smith
2025-08-29 18:45:08 -07:00
parent 2210add1a8
commit ee68193de7

View File

@@ -12,7 +12,6 @@ const log = Output.scoped(.Worker, .hidden);
vm: ?*jsc.VirtualMachine = null,
status: std.atomic.Value(Status) = .init(.start),
/// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop.
requested_terminate: std.atomic.Value(bool) = .init(false),
execution_context_id: u32 = 0,
parent_context_id: u32 = 0,
parent: *jsc.VirtualMachine,
@@ -65,22 +64,19 @@ export fn WebWorker__getParentWorker(vm: *jsc.VirtualMachine) ?*anyopaque {
}
pub fn hasRequestedTerminate(this: *const WebWorker) bool {
return this.requested_terminate.load(.monotonic);
}
pub fn setRequestedTerminate(this: *WebWorker) bool {
return this.requested_terminate.swap(true, .release);
return this.lifecycle_handle.worker.load(.acquire) == null;
}
export fn WebWorker__updatePtr(handle: *WebWorkerLifecycleHandle, ptr: *anyopaque) bool {
handle.worker.?.cpp_worker = ptr;
const worker = handle.worker.swap(null, .monotonic) orelse return false;
worker.cpp_worker = ptr;
var thread = std.Thread.spawn(
.{ .stack_size = bun.default_thread_stack_size },
startWithErrorHandling,
.{handle.worker.?},
.{worker},
) catch {
handle.worker.?.destroy();
worker.destroy();
return false;
};
thread.detach();
@@ -453,7 +449,7 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
// notifyNeedTermination uses vm.global.requestTermination() instead of
// vm.jsc.notifyNeedTermination() which avoid VMTraps assertion failures
worker_.exit_called = true;
worker_.lifecycle_handle.requestTermination();
_ = worker_.lifecycle_handle.requestTermination();
}
}
@@ -559,10 +555,11 @@ fn spin(this: *WebWorker) void {
/// This is worker.ref()/.unref() from JS (Caller thread)
pub fn setRef(handle: *WebWorkerLifecycleHandle, value: bool) callconv(.c) void {
if (handle.worker) |worker| {
if (handle.worker.load(.acquire)) |worker| {
if (worker.hasRequestedTerminate()) {
return;
}
worker.setRefInternal(value);
}
}
@@ -578,18 +575,15 @@ pub fn setRefInternal(this: *WebWorker, value: bool) void {
/// Implement process.exit(). May only be called from the Worker thread.
pub fn exit(this: *WebWorker) void {
this.exit_called = true;
this.notifyNeedTermination();
_ = this.lifecycle_handle.requestTermination();
}
/// Request a terminate from any thread.
pub fn notifyNeedTermination(this: *WebWorker) void {
fn notifyNeedTermination(this: *WebWorker) void {
if (this.status.load(.acquire) == .terminated) {
return;
}
if (this.setRequestedTerminate()) {
return;
}
log("[{d}] notifyNeedTermination", .{this.execution_context_id});
if (this.vm) |vm| {
@@ -632,7 +626,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
vm_to_deinit = vm;
}
var arena = this.arena;
this.lifecycle_handle.onTermination();
this.lifecycle_handle.deref();
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
if (loop) |loop_| {
loop_.internal_loop_data.jsc_vm = null;
@@ -656,7 +650,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
pub export fn WebWorkerLifecycleHandle__requestTermination(handle: ?*WebWorkerLifecycleHandle) void {
if (handle) |h| {
h.requestTermination();
_ = h.requestTermination();
}
}
@@ -672,9 +666,7 @@ const WebWorkerLifecycleHandle = struct {
pub const ref = WebWorkerLifecycleHandle.RefCount.ref;
pub const deref = WebWorkerLifecycleHandle.RefCount.deref;
mutex: bun.Mutex = .{},
worker: ?*WebWorker = null,
requested_terminate: std.atomic.Value(bool) = .init(false),
worker: std.atomic.Value(?*WebWorker),
ref_count: WebWorkerLifecycleHandle.RefCount,
pub const new = bun.TrivialNew(WebWorkerLifecycleHandle);
@@ -699,13 +691,14 @@ const WebWorkerLifecycleHandle = struct {
preload_modules_len: usize,
) callconv(.c) *WebWorkerLifecycleHandle {
const handle = WebWorkerLifecycleHandle.new(.{
.worker = null,
.worker = .init(null),
.ref_count = .init(),
});
const worker = create(cpp_worker, parent, name_str, specifier_str, error_message, parent_context_id, this_context_id, mini, default_unref, eval_mode, argv_ptr, argv_len, inherit_execArgv, execArgv_ptr, execArgv_len, preload_modules_ptr, preload_modules_len, handle);
handle.worker = worker;
handle.worker.store(worker, .release);
// Worker.cpp holds a reference to this
handle.ref();
@@ -716,44 +709,15 @@ const WebWorkerLifecycleHandle = struct {
bun.destroy(this);
}
pub fn requestTermination(self: *WebWorkerLifecycleHandle) void {
if (self.requested_terminate.load(.acquire)) {
return;
}
pub const RequestTerminationResult = enum { success, already_requested };
self.ref();
self.mutex.lock();
pub fn requestTermination(self: *WebWorkerLifecycleHandle) RequestTerminationResult {
const worker = self.worker.swap(null, .acq_rel) orelse return .already_requested;
if (self.requested_terminate.swap(true, .monotonic)) {
self.mutex.unlock();
self.deref();
return;
}
worker.notifyNeedTermination();
worker.deref();
if (self.worker) |worker| {
self.worker = null;
worker.notifyNeedTermination();
self.mutex.unlock();
worker.deref();
} else {
self.mutex.unlock();
// Let the reference counting system handle deinitialization
self.deref();
}
self.deref();
}
pub fn onTermination(self: *WebWorkerLifecycleHandle) void {
self.mutex.lock();
self.worker = null;
const was_requested = self.requested_terminate.swap(false, .acquire);
self.mutex.unlock();
// requestTermination() adds an extra ref that we need to release
if (was_requested) {
self.deref();
}
return .success;
}
};
@@ -762,53 +726,6 @@ comptime {
@export(&setRef, .{ .name = "WebWorker__setRef" });
}
const TerminationHandle = struct {
mutex: bun.Mutex = .{},
worker: ?*WebWorker = null,
requested_terminate: std.atomic.Value(bool) = .init(false),
pub fn new(worker: *WebWorker) *TerminationHandle {
const handle = bun.default_allocator.create(TerminationHandle) catch bun.outOfMemory();
handle.worker = worker;
return handle;
}
pub fn deinit(this: *TerminationHandle) void {
this.worker.?.deinit();
this.mutex.deinit();
bun.default_allocator.destroy(this);
}
pub fn requestTermination(this: *TerminationHandle) void {
this.mutex.lock();
this.requested_terminate.store(true, .acquire);
const worker = this.worker orelse {
// the worker already terminated.
this.mutex.unlock();
this.deinit();
return;
};
this.worker = null;
worker.notifyNeedTermination();
this.mutex.unlock();
worker.deref();
}
pub fn onTermination(this: *TerminationHandle) void {
this.mutex.lock();
if (this.requested_terminate.swap(false, .acquire)) {
// we already requested to terminate, therefore this handle has
// already been consumed on the other thread and we are able to free
// it.
this.mutex.unlock();
this.deinit();
return;
}
this.worker = null;
this.mutex.unlock();
}
};
const std = @import("std");
const WTFStringImpl = @import("../string.zig").WTFStringImpl;