From d50acbc0bdf00eb14915d23a66e4db464294a3db Mon Sep 17 00:00:00 2001 From: Georgijs <48869301+gvilums@users.noreply.github.com> Date: Wed, 22 May 2024 16:57:05 -0700 Subject: [PATCH] Fix `refConcurrent` and `unrefConcurrent` (#11280) Co-authored-by: Georgijs Vilums <=> --- src/async/posix_event_loop.zig | 12 +++------ src/async/windows_event_loop.zig | 7 ++---- src/bun.js/event_loop.zig | 35 +++++++++++++++++++++++++-- src/bun.js/web_worker.zig | 3 +-- src/bun.js/webcore/blob.zig | 4 +-- src/bun.js/webcore/blob/WriteFile.zig | 8 +++--- src/deps/libuv.zig | 10 -------- src/deps/uws.zig | 10 -------- 8 files changed, 46 insertions(+), 43 deletions(-) diff --git a/src/async/posix_event_loop.zig b/src/async/posix_event_loop.zig index f7ca031337..f531940a69 100644 --- a/src/async/posix_event_loop.zig +++ b/src/async/posix_event_loop.zig @@ -63,13 +63,11 @@ pub const KeepAlive = struct { } /// From another thread, Prevent a poll from keeping the process alive. - pub fn unrefConcurrently(this: *KeepAlive, event_loop_ctx_: anytype) void { - const event_loop_ctx = JSC.AbstractVM(event_loop_ctx_); + pub fn unrefConcurrently(this: *KeepAlive, vm: *JSC.VirtualMachine) void { if (this.status != .active) return; this.status = .inactive; - // vm.event_loop_handle.?.unrefConcurrently(); - event_loop_ctx.platformEventLoop().unrefConcurrently(); + vm.event_loop.unrefConcurrently(); } /// Prevent a poll from keeping the process alive on the next tick. @@ -106,13 +104,11 @@ pub const KeepAlive = struct { } /// Allow a poll to keep the process alive. - pub fn refConcurrently(this: *KeepAlive, event_loop_ctx_: anytype) void { - const event_loop_ctx = JSC.AbstractVM(event_loop_ctx_); + pub fn refConcurrently(this: *KeepAlive, vm: *JSC.VirtualMachine) void { if (this.status != .inactive) return; this.status = .active; - // vm.event_loop_handle.?.refConcurrently(); - event_loop_ctx.platformEventLoop().refConcurrently(); + vm.event_loop.refConcurrently(); } pub fn refConcurrentlyFromEventLoop(this: *KeepAlive, loop: *JSC.EventLoop) void { diff --git a/src/async/windows_event_loop.zig b/src/async/windows_event_loop.zig index 978bbd69a3..09262e2895 100644 --- a/src/async/windows_event_loop.zig +++ b/src/async/windows_event_loop.zig @@ -68,9 +68,7 @@ pub const KeepAlive = struct { if (this.status != .active) return; this.status = .inactive; - - // TODO: https://github.com/oven-sh/bun/pull/4410#discussion_r1317326194 - vm.event_loop_handle.?.dec(); + vm.event_loop.unrefConcurrently(); } /// Prevent a poll from keeping the process alive on the next tick. @@ -109,8 +107,7 @@ pub const KeepAlive = struct { if (this.status != .inactive) return; this.status = .active; - // TODO: https://github.com/oven-sh/bun/pull/4410#discussion_r1317326194 - vm.event_loop_handle.?.inc(); + vm.event_loop.refConcurrently(); } pub fn refConcurrentlyFromEventLoop(this: *KeepAlive, loop: *JSC.EventLoop) void { diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 59ce94dd0b..fb27a411e7 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -310,9 +310,9 @@ pub const JSCScheduler = struct { JSC.markBinding(@src()); if (delta > 0) { - jsc_vm.event_loop_handle.?.refConcurrently(); + jsc_vm.event_loop.refConcurrently(); } else { - jsc_vm.event_loop_handle.?.unrefConcurrently(); + jsc_vm.event_loop.unrefConcurrently(); } } @@ -775,6 +775,7 @@ pub const EventLoop = struct { debug: Debug = .{}, entered_event_loop_count: isize = 0, + concurrent_ref: std.atomic.Value(i32) = std.atomic.Value(i32).init(0), pub const Debug = if (Environment.isDebug) struct { is_inside_tick_queue: bool = false, @@ -1270,6 +1271,24 @@ pub const EventLoop = struct { pub fn tickConcurrentWithCount(this: *EventLoop) usize { JSC.markBinding(@src()); + const delta = this.concurrent_ref.swap(0, .Monotonic); + const loop = this.virtual_machine.event_loop_handle.?; + if (comptime Environment.isWindows) { + if (delta > 0) { + loop.active_handles += @intCast(delta); + } else { + loop.active_handles -= @intCast(-delta); + } + } else { + if (delta > 0) { + loop.num_polls += @intCast(delta); + loop.active += @intCast(delta); + } else { + loop.num_polls -= @intCast(-delta); + loop.active -= @intCast(-delta); + } + } + var concurrent = this.concurrent_tasks.popBatch(); const count = concurrent.count; if (count == 0) @@ -1639,6 +1658,18 @@ pub const EventLoop = struct { this.concurrent_tasks.pushBatch(batch.front.?, batch.last.?, batch.count); this.wakeup(); } + + pub fn refConcurrently(this: *EventLoop) void { + // TODO maybe this should be AcquireRelease + _ = this.concurrent_ref.fetchAdd(1, .Monotonic); + this.wakeup(); + } + + pub fn unrefConcurrently(this: *EventLoop) void { + // TODO maybe this should be AcquireRelease + _ = this.concurrent_ref.fetchSub(1, .Monotonic); + this.wakeup(); + } }; pub const JsVM = struct { diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index 665ca7d428..02e033d4ed 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -123,7 +123,7 @@ pub const WebWorker = struct { .execArgv = if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else null, }; - worker.parent_poll_ref.refConcurrently(parent); + worker.parent_poll_ref.ref(parent); return worker; } @@ -190,7 +190,6 @@ pub const WebWorker = struct { fn deinit(this: *WebWorker) void { log("[{d}] deinit", .{this.execution_context_id}); this.parent_poll_ref.unrefConcurrently(this.parent); - this.parent.event_loop_handle.?.wakeup(); bun.default_allocator.free(this.specifier); bun.default_allocator.destroy(this); } diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index 33383ba77f..059f24f8cd 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -2021,7 +2021,7 @@ pub const Blob = struct { }); return; } - loop.refConcurrently(); + this.event_loop.refConcurrently(); } pub fn throw(this: *CopyFileWindows, err: bun.sys.Error) void { @@ -2039,7 +2039,7 @@ pub const Blob = struct { var this: *CopyFileWindows = @fieldParentPtr(CopyFileWindows, "io_request", req); assert(req.data == @as(?*anyopaque, @ptrCast(this))); var event_loop = this.event_loop; - event_loop.virtual_machine.event_loop_handle.?.unrefConcurrently(); + event_loop.unrefConcurrently(); const rc = req.result; bun.sys.syslog("uv_fs_copyfile() = {}", .{rc}); diff --git a/src/bun.js/webcore/blob/WriteFile.zig b/src/bun.js/webcore/blob/WriteFile.zig index 055dcebc15..77a604daec 100644 --- a/src/bun.js/webcore/blob/WriteFile.zig +++ b/src/bun.js/webcore/blob/WriteFile.zig @@ -416,7 +416,7 @@ pub const WriteFileWindows = struct { }, } - write_file.loop().refConcurrently(); + write_file.event_loop.refConcurrently(); return write_file; } pub const ResultType = WriteFile.ResultType; @@ -492,7 +492,7 @@ pub const WriteFileWindows = struct { fn mkdirp(this: *WriteFileWindows) void { log("mkdirp", .{}); this.mkdirp_if_not_exists = false; - this.loop().refConcurrently(); + this.event_loop.refConcurrently(); const path = this.file_blob.store.?.data.file.pathlike.path.slice(); JSC.Node.Async.AsyncMkdirp.new(.{ @@ -505,7 +505,7 @@ pub const WriteFileWindows = struct { } fn onMkdirpComplete(this: *WriteFileWindows) void { - this.loop().unrefConcurrently(); + this.event_loop.unrefConcurrently(); if (this.err) |err| { this.throw(err); @@ -540,7 +540,7 @@ pub const WriteFileWindows = struct { } pub fn onFinish(container: *WriteFileWindows) void { - container.loop().unrefConcurrently(); + container.event_loop.unrefConcurrently(); var event_loop = container.event_loop; event_loop.enter(); defer event_loop.exit(); diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 8c737764a7..73dc0f7d9e 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -682,16 +682,6 @@ pub const Loop = extern struct { this.wq_async.send(); } - pub fn refConcurrently(this: *Loop) void { - log("refConcurrently", .{}); - _ = @atomicRmw(c_uint, &this.active_handles, std.builtin.AtomicRmwOp.Add, 1, .Monotonic); - } - - pub fn unrefConcurrently(this: *Loop) void { - log("unrefConcurrently", .{}); - _ = @atomicRmw(c_uint, &this.active_handles, std.builtin.AtomicRmwOp.Sub, 1, .Monotonic); - } - pub fn unrefCount(this: *Loop, count: i32) void { log("unrefCount({d})", .{count}); this.active_handles -= @intCast(count); diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 1374585f14..a8997594ad 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -1226,16 +1226,6 @@ pub const PosixLoop = extern struct { this.num_polls += 1; this.active += 1; } - pub fn refConcurrently(this: *PosixLoop) void { - _ = @atomicRmw(@TypeOf(this.num_polls), &this.num_polls, .Add, 1, .Monotonic); - _ = @atomicRmw(@TypeOf(this.active), &this.active, .Add, 1, .Monotonic); - log("refConcurrently ({d}, {d})", .{ this.num_polls, this.active }); - } - pub fn unrefConcurrently(this: *PosixLoop) void { - _ = @atomicRmw(@TypeOf(this.num_polls), &this.num_polls, .Sub, 1, .Monotonic); - _ = @atomicRmw(@TypeOf(this.active), &this.active, .Sub, 1, .Monotonic); - log("unrefConcurrently ({d}, {d})", .{ this.num_polls, this.active }); - } pub fn unref(this: *PosixLoop) void { log("unref", .{});