mirror of
https://github.com/oven-sh/bun
synced 2026-02-13 04:18:58 +00:00
Refactor Socket to use jsc.JSRef instead of hasPendingActivity
This migrates the Socket implementation from using the hasPendingActivity pattern to using jsc.JSRef for managing JavaScript object lifecycle. Changes: - Replace `this_value: jsc.JSValue` with `this_value: jsc.JSRef` - Remove `has_pending_activity` atomic field and `hasPendingActivity()` function - Add `computeHasPendingActivity()` to determine pending activity state - Add `updateHasPendingActivity()` to upgrade/downgrade JSRef based on activity - Update `getThisValue()` to use JSRef.tryGet() and setWeak() - Update `finalize()` to call this_value.finalize() - Remove `hasPendingActivity: true` from sockets.classes.ts - Update all socket initialization sites to use jsc.JSRef.empty() - Update setData() to get JSValue from JSRef before passing to dataSetCached() This follows the same pattern as the subprocess JSRef refactor and provides better GC lifecycle management by using weak/strong references instead of the hasPendingActivity callback. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -61,7 +61,7 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
ref_count: RefCount,
|
||||
wrapped: WrappedType = .none,
|
||||
handlers: ?*Handlers,
|
||||
this_value: jsc.JSValue = .zero,
|
||||
this_value: jsc.JSRef = jsc.JSRef.empty(),
|
||||
poll_ref: Async.KeepAlive = Async.KeepAlive.init(),
|
||||
ref_pollref_on_connect: bool = true,
|
||||
connection: ?Listener.UnixOrHost = null,
|
||||
@@ -70,14 +70,24 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
buffered_data_for_node_net: bun.ByteList = .{},
|
||||
bytes_written: u64 = 0,
|
||||
|
||||
// TODO: switch to something that uses `visitAggregate` and have the
|
||||
// `Listener` keep a list of all the sockets JSValue in there
|
||||
// This is wasteful because it means we are keeping a JSC::Weak for every single open socket
|
||||
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
|
||||
native_callback: NativeCallbacks = .none,
|
||||
|
||||
pub fn hasPendingActivity(this: *This) callconv(.C) bool {
|
||||
return this.has_pending_activity.load(.acquire);
|
||||
pub fn computeHasPendingActivity(this: *const This) bool {
|
||||
return this.flags.is_active;
|
||||
}
|
||||
|
||||
pub fn updateHasPendingActivity(this: *This) void {
|
||||
const has_pending = this.computeHasPendingActivity();
|
||||
if (comptime Environment.isDebug) {
|
||||
log("updateHasPendingActivity() -> {any}", .{has_pending});
|
||||
}
|
||||
|
||||
// Upgrade or downgrade the reference based on pending activity
|
||||
if (has_pending) {
|
||||
this.this_value.upgrade(this.getHandlers().globalObject);
|
||||
} else {
|
||||
this.this_value.downgrade();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn memoryCost(this: *This) usize {
|
||||
@@ -321,11 +331,6 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
if (callback == .zero) {
|
||||
if (handlers.promise.trySwap()) |promise| {
|
||||
handlers.promise.deinit();
|
||||
if (this.this_value != .zero) {
|
||||
this.this_value = .zero;
|
||||
}
|
||||
this.has_pending_activity.store(false, .release);
|
||||
|
||||
// reject the promise on connect() error
|
||||
const err_value = err.toErrorInstance(globalObject);
|
||||
promise.asPromise().?.reject(globalObject, err_value) catch {}; // TODO: properly propagate exception upwards
|
||||
@@ -335,8 +340,6 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
}
|
||||
|
||||
const this_value = this.getThisValue(globalObject);
|
||||
this.this_value = .zero;
|
||||
this.has_pending_activity.store(false, .release);
|
||||
|
||||
const err_value = err.toErrorInstance(globalObject);
|
||||
const result = callback.call(globalObject, this_value, &[_]JSValue{ this_value, err_value }) catch |e| globalObject.takeException(e);
|
||||
@@ -362,7 +365,7 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
if (!this.flags.is_active) {
|
||||
this.getHandlers().markActive();
|
||||
this.flags.is_active = true;
|
||||
this.has_pending_activity.store(true, .release);
|
||||
this.updateHasPendingActivity();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -392,7 +395,7 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
const vm = handlers.vm;
|
||||
handlers.markInactive();
|
||||
this.poll_ref.unref(vm);
|
||||
this.has_pending_activity.store(false, .release);
|
||||
this.updateHasPendingActivity();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,14 +491,16 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
}
|
||||
|
||||
pub fn getThisValue(this: *This, globalObject: *jsc.JSGlobalObject) JSValue {
|
||||
if (this.this_value == .zero) {
|
||||
const value = this.toJS(globalObject);
|
||||
value.ensureStillAlive();
|
||||
this.this_value = value;
|
||||
if (this.this_value.tryGet()) |value| {
|
||||
return value;
|
||||
}
|
||||
|
||||
return this.this_value;
|
||||
const value = this.toJS(globalObject);
|
||||
value.ensureStillAlive();
|
||||
this.this_value.setWeak(value);
|
||||
// Immediately upgrade if there's pending activity
|
||||
this.updateHasPendingActivity();
|
||||
return value;
|
||||
}
|
||||
|
||||
pub fn onEnd(this: *This, _: Socket) void {
|
||||
@@ -683,7 +688,8 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
|
||||
pub fn setData(this: *This, globalObject: *jsc.JSGlobalObject, value: jsc.JSValue) void {
|
||||
log("setData()", .{});
|
||||
This.js.dataSetCached(this.this_value, globalObject, value);
|
||||
const this_value = this.getThisValue(globalObject);
|
||||
This.js.dataSetCached(this_value, globalObject, value);
|
||||
}
|
||||
|
||||
pub fn getListener(this: *This, _: *jsc.JSGlobalObject) JSValue {
|
||||
@@ -1322,6 +1328,10 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
pub fn finalize(this: *This) void {
|
||||
log("finalize() {d} {}", .{ @intFromPtr(this), this.socket_context != null });
|
||||
this.flags.finalizing = true;
|
||||
|
||||
// Finalize the JSRef before closing
|
||||
this.this_value.finalize();
|
||||
|
||||
if (!this.socket.isClosed()) {
|
||||
this.closeAndDetach(.failure);
|
||||
}
|
||||
@@ -1436,7 +1446,7 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
var tls = bun.new(TLSSocket, .{
|
||||
.ref_count = .init(),
|
||||
.handlers = handlers_ptr,
|
||||
.this_value = .zero,
|
||||
.this_value = jsc.JSRef.empty(),
|
||||
.socket = TLSSocket.Socket.detached,
|
||||
.connection = if (this.connection) |c| c.clone() else null,
|
||||
.wrapped = .tls,
|
||||
@@ -1516,7 +1526,7 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
const raw = bun.new(TLSSocket, .{
|
||||
.ref_count = .init(),
|
||||
.handlers = raw_handlers_ptr,
|
||||
.this_value = .zero,
|
||||
.this_value = jsc.JSRef.empty(),
|
||||
.socket = new_socket,
|
||||
.connection = if (this.connection) |c| c.clone() else null,
|
||||
.wrapped = .tcp,
|
||||
@@ -1552,7 +1562,7 @@ pub fn NewSocket(comptime ssl: bool) type {
|
||||
// the connection can be upgraded inside a handler call so we need to guarantee that it will be still alive
|
||||
this.getHandlers().markInactive();
|
||||
|
||||
this.has_pending_activity.store(false, .release);
|
||||
this.updateHasPendingActivity();
|
||||
}
|
||||
|
||||
const array = try jsc.JSValue.createEmptyArray(globalObject, 2);
|
||||
@@ -1934,7 +1944,7 @@ pub fn jsUpgradeDuplexToTLS(globalObject: *jsc.JSGlobalObject, callframe: *jsc.C
|
||||
var tls = bun.new(TLSSocket, .{
|
||||
.ref_count = .init(),
|
||||
.handlers = handlers_ptr,
|
||||
.this_value = .zero,
|
||||
.this_value = jsc.JSRef.empty(),
|
||||
.socket = TLSSocket.Socket.detached,
|
||||
.connection = null,
|
||||
.wrapped = .tls,
|
||||
|
||||
@@ -365,7 +365,7 @@ pub fn onCreate(comptime ssl: bool, socket: uws.NewSocketHandler(ssl)) void {
|
||||
const this_socket = bun.new(Socket, .{
|
||||
.ref_count = .init(),
|
||||
.handlers = &listener.handlers,
|
||||
.this_value = .zero,
|
||||
.this_value = jsc.JSRef.empty(),
|
||||
.socket = socket,
|
||||
.protos = listener.protos,
|
||||
.flags = .{ .owned_protos = false },
|
||||
@@ -758,7 +758,7 @@ pub fn connectInner(globalObject: *jsc.JSGlobalObject, prev_maybe_tcp: ?*TCPSock
|
||||
prev_maybe_tcp;
|
||||
|
||||
const socket = if (maybe_previous) |prev| blk: {
|
||||
bun.assert(prev.this_value != .zero);
|
||||
bun.assert(prev.this_value.tryGet() != null);
|
||||
if (prev.handlers) |prev_handlers| {
|
||||
prev_handlers.deinit();
|
||||
handlers.vm.allocator.destroy(prev_handlers);
|
||||
@@ -773,7 +773,7 @@ pub fn connectInner(globalObject: *jsc.JSGlobalObject, prev_maybe_tcp: ?*TCPSock
|
||||
} else bun.new(SocketType, .{
|
||||
.ref_count = .init(),
|
||||
.handlers = handlers_ptr,
|
||||
.this_value = .zero,
|
||||
.this_value = jsc.JSRef.empty(),
|
||||
.socket = SocketType.Socket.detached,
|
||||
.connection = connection,
|
||||
.protos = if (ssl) |s| s.takeProtos() else null,
|
||||
|
||||
@@ -4,7 +4,6 @@ function generate(ssl) {
|
||||
return define({
|
||||
name: !ssl ? "TCPSocket" : "TLSSocket",
|
||||
JSType: "0b11101110",
|
||||
hasPendingActivity: true,
|
||||
noConstructor: true,
|
||||
configurable: false,
|
||||
memoryCost: true,
|
||||
|
||||
Reference in New Issue
Block a user