Compare commits

...

1 Commits

Author SHA1 Message Date
Jarred Sumner
ddc5da1464 wip doesnt work 2024-02-17 05:50:48 -08:00
5 changed files with 102 additions and 35 deletions

View File

@@ -11,6 +11,7 @@ pub const Loop = uws.Loop;
/// This is not reference counted. It only tracks active or inactive.
pub const KeepAlive = struct {
status: Status = .inactive,
debug_poll_id: if (Environment.isDebug) i64 else void = if (Environment.isDebug) -1 else {},
const log = Output.scoped(.KeepAlive, false);
@@ -54,7 +55,10 @@ pub const KeepAlive = struct {
if (this.status != .active)
return;
this.status = .inactive;
// vm.event_loop_handle.?.subActive(1);
if (comptime Environment.isDebug) {
if (comptime @TypeOf(event_loop_ctx) == *JSC.VirtualMachine)
event_loop_ctx.vm.eventLoop().debug.removePoll(this.debug_poll_id);
}
event_loop_ctx.platformEventLoop().subActive(1);
}
@@ -65,6 +69,10 @@ pub const KeepAlive = struct {
return;
this.status = .inactive;
// vm.event_loop_handle.?.unrefConcurrently();
if (comptime Environment.isDebug) {
if (comptime @TypeOf(event_loop_ctx) == *JSC.VirtualMachine)
event_loop_ctx.vm.eventLoop().debug.removePoll(this.debug_poll_id);
}
event_loop_ctx.platformEventLoop().unrefConcurrently();
}
@@ -75,6 +83,10 @@ pub const KeepAlive = struct {
return;
this.status = .inactive;
// vm.pending_unref_counter +|= 1;
if (comptime Environment.isDebug) {
if (comptime @TypeOf(event_loop_ctx) == *JSC.VirtualMachine)
event_loop_ctx.vm.eventLoop().debug.removePoll(this.debug_poll_id);
}
event_loop_ctx.incrementPendingUnrefCounter();
}
@@ -83,6 +95,9 @@ pub const KeepAlive = struct {
if (this.status != .active)
return;
this.status = .inactive;
if (comptime Environment.isDebug) {
vm.eventLoop().debug.removePoll(this.debug_poll_id);
}
_ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic);
}
@@ -92,6 +107,11 @@ pub const KeepAlive = struct {
if (this.status != .inactive)
return;
this.status = .active;
if (comptime Environment.isDebug) {
if (comptime @TypeOf(event_loop_ctx) == *JSC.VirtualMachine)
event_loop_ctx.vm.eventLoop().debug.addPoll(&this.debug_poll_id);
}
event_loop_ctx.platformEventLoop().ref();
// vm.event_loop_handle.?.ref();
}
@@ -103,6 +123,11 @@ pub const KeepAlive = struct {
return;
this.status = .active;
// vm.event_loop_handle.?.refConcurrently();
if (comptime Environment.isDebug) {
if (comptime @TypeOf(event_loop_ctx) == *JSC.VirtualMachine)
event_loop_ctx.vm.eventLoop().debug.addPoll(&this.debug_poll_id);
}
event_loop_ctx.platformEventLoop().refConcurrently();
}
@@ -122,6 +147,7 @@ pub const FilePoll = struct {
fd: bun.FileDescriptor = invalid_fd,
flags: Flags.Set = Flags.Set{},
owner: Owner = undefined,
debug_poll_id: if (Environment.isDebug) i64 else void = if (Environment.isDebug) -1 else {},
/// We re-use FilePoll objects to avoid allocating new ones.
///
@@ -598,6 +624,13 @@ pub const FilePoll = struct {
/// Only intended to be used from EventLoop.Pollable
fn deactivate(this: *FilePoll, loop: *Loop) void {
if (comptime Environment.isDebug) {
if (this.event_loop_kind == .js) {
if (this.flags.contains(.has_incremented_poll_count))
JSC.VirtualMachine.get().eventLoop().debug.removePoll(this.debug_poll_id);
}
}
loop.num_polls -= @as(i32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
this.flags.remove(.has_incremented_poll_count);
@@ -610,6 +643,13 @@ pub const FilePoll = struct {
fn activate(this: *FilePoll, loop: *Loop) void {
this.flags.remove(.closed);
if (comptime Environment.isDebug) {
if (this.event_loop_kind == .js) {
if (!this.flags.contains(.has_incremented_poll_count))
JSC.VirtualMachine.get().eventLoop().debug.addPoll(&this.debug_poll_id);
}
}
loop.num_polls += @as(i32, @intFromBool(!this.flags.contains(.has_incremented_poll_count)));
this.flags.insert(.has_incremented_poll_count);

View File

@@ -1416,6 +1416,8 @@ pub fn dump_mimalloc(globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) callc
if (comptime bun.is_heap_breakdown_enabled) {
dump_zone_malloc_stats();
}
if (comptime Environment.isDebug)
globalObject.bunVM().eventLoop().debug.queryActivePolls();
return .undefined;
}
@@ -3624,7 +3626,6 @@ pub const Timer = struct {
const CallbackJob = struct {
id: i32 = 0,
task: JSC.AnyTask = undefined,
ref: JSC.Ref = JSC.Ref.init(),
globalThis: *JSC.JSGlobalObject,
callback: JSC.Strong = .{},
arguments: JSC.Strong = .{},
@@ -3658,7 +3659,6 @@ pub const Timer = struct {
pub fn deinit(this: *CallbackJob) void {
this.callback.deinit();
this.arguments.deinit();
this.ref.unref(this.globalThis.bunVM());
bun.default_allocator.destroy(this);
}
@@ -3815,12 +3815,6 @@ pub const Timer = struct {
if (vm.timer.maps.get(this.kind).getPtr(this.id)) |val_| {
if (val_.*) |*val| {
val.poll_ref.ref(vm);
if (val.did_unref_timer) {
val.did_unref_timer = false;
if (comptime Environment.isPosix)
vm.event_loop_handle.?.num_polls += 1;
}
}
}
},
@@ -3869,7 +3863,6 @@ pub const Timer = struct {
job.* = cb;
job.task = CallbackJob.Task.init(job);
job.ref.ref(vm);
// cancel the current event if exists before re-adding it
if (map.fetchSwapRemove(this.id)) |timer| {
@@ -3903,8 +3896,6 @@ pub const Timer = struct {
}
timeout.timer.?.interval = this.interval;
timeout.poll_ref.ref(vm);
// cancel the current event if exists before re-adding it
if (map.fetchSwapRemove(this.id)) |timer| {
if (timer.value != null) {
@@ -3931,12 +3922,6 @@ pub const Timer = struct {
if (vm.timer.maps.get(this.kind).getPtr(this.id)) |val_| {
if (val_.*) |*val| {
val.poll_ref.unref(vm);
if (!val.did_unref_timer) {
val.did_unref_timer = true;
if (comptime Environment.isPosix)
vm.event_loop_handle.?.num_polls -= 1;
}
}
}
},
@@ -4222,7 +4207,6 @@ pub const Timer = struct {
);
job.* = cb;
job.task = CallbackJob.Task.init(job);
job.ref.ref(vm);
if (vm.isInspectorEnabled()) {
Debugger.didScheduleAsyncCall(globalThis, .DOMTimer, timer_id.asyncID(), !repeats);
@@ -4307,7 +4291,6 @@ pub const Timer = struct {
job.* = cb;
job.task = CallbackJob.Task.init(job);
job.ref.ref(vm);
vm.enqueueTask(JSC.Task.init(&job.task));
if (vm.isInspectorEnabled()) {
@@ -4318,7 +4301,7 @@ pub const Timer = struct {
pub fn deinit(this: *Timeout) void {
JSC.markBinding(@src());
var vm = this.globalThis.bunVM();
const vm = this.globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -4326,10 +4309,6 @@ pub const Timer = struct {
timer.cancelled = true;
}
if (comptime Environment.isPosix)
// balance double unreffing in doUnref
vm.event_loop_handle.?.num_polls += @as(i32, @intFromBool(this.did_unref_timer));
this.callback.deinit();
this.arguments.deinit();
}
@@ -4369,7 +4348,6 @@ pub const Timer = struct {
job.* = cb;
job.task = CallbackJob.Task.init(job);
job.ref.ref(vm);
vm.enqueueImmediateTask(JSC.Task.init(&job.task));
if (vm.isInspectorEnabled()) {
@@ -4397,7 +4375,6 @@ pub const Timer = struct {
timeout.arguments = JSC.Strong.create(arguments_array_or_zero, globalThis);
}
timeout.poll_ref.ref(vm);
map.put(vm.allocator, id, timeout) catch unreachable;
if (vm.isInspectorEnabled()) {

View File

@@ -951,8 +951,8 @@ pub const CAresNameInfo = struct {
var promise = this.promise;
const globalThis = this.globalThis;
this.promise = .{};
promise.resolve(globalThis, result);
this.deinit();
promise.resolve(globalThis, result);
}
pub fn deinit(this: *@This()) void {
@@ -1143,8 +1143,12 @@ pub const GetAddrInfoRequest = struct {
const this = @as(*GetAddrInfoRequest, @ptrFromInt(@intFromPtr(arg)));
log("getAddrInfoAsyncCallback: status={d}", .{status});
if (this.backend == .libinfo) {
if (this.backend.libinfo.file_poll) |poll| poll.deinit();
if (comptime Environment.isMac) {
if (this.backend == .libinfo) {
if (this.backend.libinfo.file_poll) |poll| {
poll.deinit();
}
}
}
if (this.resolver_for_caching) |resolver| {
@@ -1369,8 +1373,8 @@ pub const CAresReverse = struct {
var promise = this.promise;
const globalThis = this.globalThis;
this.promise = .{};
promise.resolve(globalThis, result);
this.deinit();
promise.resolve(globalThis, result);
}
pub fn deinit(this: *@This()) void {
@@ -1448,8 +1452,8 @@ pub fn CAresLookup(comptime cares_type: type, comptime type_name: []const u8) ty
var promise = this.promise;
const globalThis = this.globalThis;
this.promise = .{};
promise.resolve(globalThis, result);
this.deinit();
promise.resolve(globalThis, result);
}
pub fn deinit(this: *@This()) void {
@@ -1538,15 +1542,16 @@ pub const DNSLookup = struct {
if (result == null or result.?.node == null) {
var promise = this.promise;
var globalThis = this.globalThis;
this.promise = .{};
const error_value = globalThis.createErrorInstance("DNS lookup failed: {s}", .{"No results"});
error_value.put(
globalThis,
JSC.ZigString.static("code"),
JSC.ZigString.init("EUNREACHABLE").toValueGC(globalThis),
);
promise.reject(globalThis, error_value);
this.deinit();
promise.reject(globalThis, error_value);
return;
}
this.onComplete(result.?);
@@ -1564,8 +1569,8 @@ pub const DNSLookup = struct {
var promise = this.promise;
this.promise = .{};
const globalThis = this.globalThis;
promise.resolve(globalThis, result);
this.deinit();
promise.resolve(globalThis, result);
}
pub fn deinit(this: *DNSLookup) void {

View File

@@ -671,6 +671,49 @@ pub const EventLoop = struct {
_prev_is_inside_tick_queue: bool = false,
last_fn_name: bun.String = bun.String.empty,
track_last_fn_name: bool = false,
lock: bun.Lock = bun.Lock.init(),
last_poll_id: i64 = 1,
active_polls: std.AutoHashMapUnmanaged(i64, void) = .{},
pub fn addPoll(this: *@This(), id: *i64) void {
this.lock.lock();
defer this.lock.unlock();
if (id.* <= 0) {
this.last_poll_id += 1;
id.* = this.last_poll_id;
}
// so it can be referenced in a debugger
const this_id = id.*;
const gpe = this.active_polls.getOrPut(bun.default_allocator, this_id) catch unreachable;
if (!gpe.found_existing) {
log("+ poll({d})", .{this_id});
}
}
pub fn queryActivePolls(this: *@This()) void {
var iter = this.active_polls.keyIterator();
const writer = bun.Output.writer();
writer.writeAll("\nPolls: [") catch unreachable;
while (iter.next()) |id| {
writer.print(" {d}, ", .{id}) catch unreachable;
}
writer.writeAll("]\n") catch unreachable;
bun.Output.flush();
}
pub fn removePoll(this: *@This(), id: i64) void {
if (id < 0) return;
this.lock.lock();
defer this.lock.unlock();
if (this.active_polls.remove(id)) {
log("- poll({d})", .{id});
}
}
pub fn enter(this: *Debug) void {
this._prev_is_inside_tick_queue = this.is_inside_tick_queue;

View File

@@ -933,11 +933,13 @@ class ClientHttp2Session extends Http2Session {
}
#onClose() {
this[bunHTTP2Socket]?.unref?.();
this.#parser = null;
this[bunHTTP2Socket] = null;
this.emit("close");
}
#onError(error: Error) {
this[bunHTTP2Socket]?.unref?.();
this.#parser = null;
this[bunHTTP2Socket] = null;
this.emit("error", error);