diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index f7c151f9b5..e5e4ed6734 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -61,7 +61,7 @@ pub fn NewSocket(comptime ssl: bool) type { ref_count: RefCount, wrapped: WrappedType = .none, handlers: ?*Handlers, - this_value: jsc.JSValue = .zero, + this_value: jsc.JSRef = jsc.JSRef.empty(), poll_ref: Async.KeepAlive = Async.KeepAlive.init(), ref_pollref_on_connect: bool = true, connection: ?Listener.UnixOrHost = null, @@ -70,14 +70,24 @@ pub fn NewSocket(comptime ssl: bool) type { buffered_data_for_node_net: bun.ByteList = .{}, bytes_written: u64 = 0, - // TODO: switch to something that uses `visitAggregate` and have the - // `Listener` keep a list of all the sockets JSValue in there - // This is wasteful because it means we are keeping a JSC::Weak for every single open socket - has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true), native_callback: NativeCallbacks = .none, - pub fn hasPendingActivity(this: *This) callconv(.C) bool { - return this.has_pending_activity.load(.acquire); + pub fn computeHasPendingActivity(this: *const This) bool { + return this.flags.is_active; + } + + pub fn updateHasPendingActivity(this: *This) void { + const has_pending = this.computeHasPendingActivity(); + if (comptime Environment.isDebug) { + log("updateHasPendingActivity() -> {any}", .{has_pending}); + } + + // Upgrade or downgrade the reference based on pending activity + if (has_pending) { + this.this_value.upgrade(this.getHandlers().globalObject); + } else { + this.this_value.downgrade(); + } } pub fn memoryCost(this: *This) usize { @@ -321,11 +331,6 @@ pub fn NewSocket(comptime ssl: bool) type { if (callback == .zero) { if (handlers.promise.trySwap()) |promise| { handlers.promise.deinit(); - if (this.this_value != .zero) { - this.this_value = .zero; - } - this.has_pending_activity.store(false, .release); - // reject the promise on connect() error const err_value = err.toErrorInstance(globalObject); promise.asPromise().?.reject(globalObject, err_value) catch {}; // TODO: properly propagate exception upwards @@ -335,8 +340,6 @@ pub fn NewSocket(comptime ssl: bool) type { } const this_value = this.getThisValue(globalObject); - this.this_value = .zero; - this.has_pending_activity.store(false, .release); const err_value = err.toErrorInstance(globalObject); const result = callback.call(globalObject, this_value, &[_]JSValue{ this_value, err_value }) catch |e| globalObject.takeException(e); @@ -362,7 +365,7 @@ pub fn NewSocket(comptime ssl: bool) type { if (!this.flags.is_active) { this.getHandlers().markActive(); this.flags.is_active = true; - this.has_pending_activity.store(true, .release); + this.updateHasPendingActivity(); } } @@ -392,7 +395,7 @@ pub fn NewSocket(comptime ssl: bool) type { const vm = handlers.vm; handlers.markInactive(); this.poll_ref.unref(vm); - this.has_pending_activity.store(false, .release); + this.updateHasPendingActivity(); } } @@ -488,14 +491,16 @@ pub fn NewSocket(comptime ssl: bool) type { } pub fn getThisValue(this: *This, globalObject: *jsc.JSGlobalObject) JSValue { - if (this.this_value == .zero) { - const value = this.toJS(globalObject); - value.ensureStillAlive(); - this.this_value = value; + if (this.this_value.tryGet()) |value| { return value; } - return this.this_value; + const value = this.toJS(globalObject); + value.ensureStillAlive(); + this.this_value.setWeak(value); + // Immediately upgrade if there's pending activity + this.updateHasPendingActivity(); + return value; } pub fn onEnd(this: *This, _: Socket) void { @@ -683,7 +688,8 @@ pub fn NewSocket(comptime ssl: bool) type { pub fn setData(this: *This, globalObject: *jsc.JSGlobalObject, value: jsc.JSValue) void { log("setData()", .{}); - This.js.dataSetCached(this.this_value, globalObject, value); + const this_value = this.getThisValue(globalObject); + This.js.dataSetCached(this_value, globalObject, value); } pub fn getListener(this: *This, _: *jsc.JSGlobalObject) JSValue { @@ -1322,6 +1328,10 @@ pub fn NewSocket(comptime ssl: bool) type { pub fn finalize(this: *This) void { log("finalize() {d} {}", .{ @intFromPtr(this), this.socket_context != null }); this.flags.finalizing = true; + + // Finalize the JSRef before closing + this.this_value.finalize(); + if (!this.socket.isClosed()) { this.closeAndDetach(.failure); } @@ -1436,7 +1446,7 @@ pub fn NewSocket(comptime ssl: bool) type { var tls = bun.new(TLSSocket, .{ .ref_count = .init(), .handlers = handlers_ptr, - .this_value = .zero, + .this_value = jsc.JSRef.empty(), .socket = TLSSocket.Socket.detached, .connection = if (this.connection) |c| c.clone() else null, .wrapped = .tls, @@ -1516,7 +1526,7 @@ pub fn NewSocket(comptime ssl: bool) type { const raw = bun.new(TLSSocket, .{ .ref_count = .init(), .handlers = raw_handlers_ptr, - .this_value = .zero, + .this_value = jsc.JSRef.empty(), .socket = new_socket, .connection = if (this.connection) |c| c.clone() else null, .wrapped = .tcp, @@ -1552,7 +1562,7 @@ pub fn NewSocket(comptime ssl: bool) type { // the connection can be upgraded inside a handler call so we need to guarantee that it will be still alive this.getHandlers().markInactive(); - this.has_pending_activity.store(false, .release); + this.updateHasPendingActivity(); } const array = try jsc.JSValue.createEmptyArray(globalObject, 2); @@ -1934,7 +1944,7 @@ pub fn jsUpgradeDuplexToTLS(globalObject: *jsc.JSGlobalObject, callframe: *jsc.C var tls = bun.new(TLSSocket, .{ .ref_count = .init(), .handlers = handlers_ptr, - .this_value = .zero, + .this_value = jsc.JSRef.empty(), .socket = TLSSocket.Socket.detached, .connection = null, .wrapped = .tls, diff --git a/src/bun.js/api/bun/socket/Listener.zig b/src/bun.js/api/bun/socket/Listener.zig index 75252b70e2..f0b3df2c7f 100644 --- a/src/bun.js/api/bun/socket/Listener.zig +++ b/src/bun.js/api/bun/socket/Listener.zig @@ -365,7 +365,7 @@ pub fn onCreate(comptime ssl: bool, socket: uws.NewSocketHandler(ssl)) void { const this_socket = bun.new(Socket, .{ .ref_count = .init(), .handlers = &listener.handlers, - .this_value = .zero, + .this_value = jsc.JSRef.empty(), .socket = socket, .protos = listener.protos, .flags = .{ .owned_protos = false }, @@ -758,7 +758,7 @@ pub fn connectInner(globalObject: *jsc.JSGlobalObject, prev_maybe_tcp: ?*TCPSock prev_maybe_tcp; const socket = if (maybe_previous) |prev| blk: { - bun.assert(prev.this_value != .zero); + bun.assert(prev.this_value.tryGet() != null); if (prev.handlers) |prev_handlers| { prev_handlers.deinit(); handlers.vm.allocator.destroy(prev_handlers); @@ -773,7 +773,7 @@ pub fn connectInner(globalObject: *jsc.JSGlobalObject, prev_maybe_tcp: ?*TCPSock } else bun.new(SocketType, .{ .ref_count = .init(), .handlers = handlers_ptr, - .this_value = .zero, + .this_value = jsc.JSRef.empty(), .socket = SocketType.Socket.detached, .connection = connection, .protos = if (ssl) |s| s.takeProtos() else null, diff --git a/src/bun.js/api/sockets.classes.ts b/src/bun.js/api/sockets.classes.ts index 13d89453ff..883f2c6000 100644 --- a/src/bun.js/api/sockets.classes.ts +++ b/src/bun.js/api/sockets.classes.ts @@ -4,7 +4,6 @@ function generate(ssl) { return define({ name: !ssl ? "TCPSocket" : "TLSSocket", JSType: "0b11101110", - hasPendingActivity: true, noConstructor: true, configurable: false, memoryCost: true,