diff --git a/src/bun.js/api/valkey.classes.ts b/src/bun.js/api/valkey.classes.ts index 8d0ae0d976..c3ec23e0df 100644 --- a/src/bun.js/api/valkey.classes.ts +++ b/src/bun.js/api/valkey.classes.ts @@ -229,6 +229,6 @@ export default [ punsubscribe: { fn: "punsubscribe" }, pubsub: { fn: "pubsub" }, }, - values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptionCallbackMap"], + values: ["onconnect", "onclose", "connectionPromise", "hello"], }), ]; diff --git a/src/valkey/js_valkey.zig b/src/valkey/js_valkey.zig index c1f5800025..e35f03df28 100644 --- a/src/valkey/js_valkey.zig +++ b/src/valkey/js_valkey.zig @@ -1,219 +1,210 @@ -pub const SubscriptionCtx = struct { - const Self = @This(); +fn SubscriptionCtx(comptime ParentType: type, comptime _: []const u8) type { + return struct { + const Self = @This(); - _parent: *JSValkeyClient, - original_enable_offline_queue: bool, - original_enable_auto_pipelining: bool, + original_enable_offline_queue: bool, + original_enable_auto_pipelining: bool, + _subscriptionCallbackMap: jsc.JSValue, + parent_ref: *ParentType, - const ParentJS = JSValkeyClient.js; + const ParentJS = JSValkeyClient.js; - pub fn init(parent: *JSValkeyClient, enable_offline_queue: bool, enable_auto_pipelining: bool) Self { - const callback_map = jsc.JSMap.create(parent.globalObject); - ParentJS.gc.set(.subscriptionCallbackMap, parent.this_value.get(), parent.globalObject, callback_map); + fn parent(this: *Self) *ParentType { + return this.parent_ref; + } - const self = Self{ - ._parent = parent, - .original_enable_offline_queue = enable_offline_queue, - .original_enable_auto_pipelining = enable_auto_pipelining, - }; - return self; - } + pub fn init(global_object: *jsc.JSGlobalObject, parent_ptr: *ParentType, enable_offline_queue: bool, enable_auto_pipelining: bool) Self { + const self = Self{ + .original_enable_offline_queue = enable_offline_queue, + .original_enable_auto_pipelining = enable_auto_pipelining, + ._subscriptionCallbackMap = jsc.JSMap.create(global_object), + .parent_ref = parent_ptr, + }; + return self; + } - fn subscriptionCallbackMap(this: *Self) *jsc.JSMap { - const value_js = ParentJS.gc.get(.subscriptionCallbackMap, this._parent.this_value.get()).?; - return jsc.JSMap.fromJS(value_js).?; - } + fn subscriptionCallbackMap(this: *Self) *jsc.JSMap { + return jsc.JSMap.fromJS(this._subscriptionCallbackMap) orelse unreachable; + } - /// Get the total number of channels that this subscription context is subscribed to. - pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize { - return this.subscriptionCallbackMap().size(globalObject); - } + /// Get the total number of channels that this subscription context is subscribed to. + pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize { + return this.subscriptionCallbackMap().size(globalObject); + } - /// Test whether this context has any subscriptions. It is mandatory to - /// guard deinit with this function. - pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool { - return this.subscriptionCount(globalObject) > 0; - } + /// Test whether this context has any subscriptions. It is mandatory to + /// guard deinit with this function. + pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool { + return this.subscriptionCount(globalObject) > 0; + } - pub fn clearReceiveHandlers( + pub fn clearReceiveHandlers( this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue, ) void { - const map = this.subscriptionCallbackMap(); - if (map.remove(globalObject, channelName)) { - this._parent.channel_subscription_count -= 1; - this._parent.updateHasPendingActivity(); + if (this.subscriptionCallbackMap().remove(globalObject, channelName)) { + this.parent().channel_subscription_count -= 1; + this.parent().updateHasPendingActivity(); + } } - } - /// Remove a specific receive handler. - /// - /// Returns: The total number of remaining handlers for this channel, or null if here were no listeners originally - /// registered. - /// - /// Note: This function will empty out the map entry if there are no more handlers registered. - pub fn removeReceiveHandler( + /// Remove a specific receive handler. + /// + /// Returns: The total number of remaining handlers for this channel, or null if here were no listeners + /// originally registered. + /// + /// Note: This function will empty out the map entry if there are no more handlers registered. + pub fn removeReceiveHandler( this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue, callback: JSValue, ) !?usize { - const map = this.subscriptionCallbackMap(); + const existing = try this.subscriptionCallbackMap().get(globalObject, channelName); - const existing = try map.get(globalObject, channelName); + if (existing == null) { + // Could not find the channel, nothing to remove. + return null; + } - if (existing == null) { - // Could not find the channel, nothing to remove. - return null; - } + if (existing.?.isUndefinedOrNull()) { + // Nothing to remove. + return null; + } - if (existing.?.isUndefinedOrNull()) { - // Nothing to remove. - return null; - } + // Existing is guaranteed to be an array of callbacks. + bun.assert(existing.?.isArray()); - // Existing is guaranteed to be an array of callbacks. - bun.assert(existing.?.isArray()); - - // TODO(markovejnovic): I can't find a better way to do this... I generate a new array, - // filtering out the callback we want to remove. This is woefully inefficient for large - // sets (and surprisingly fast for small sets of callbacks). - // - // Perhaps there is an avenue to build a generic iterator pattern? @taylor.fish and I have - // briefly expressed a desire for this, and I promised her I would look into it, but at - // this moment have no proposal. - var array_it = try existing.?.arrayIterator(globalObject); - const updated_array = try jsc.JSArray.createEmpty(globalObject, 0); - while (try array_it.next()) |iter| { - if (iter == callback) + // TODO(markovejnovic): I can't find a better way to do this... I generate a new array, + // filtering out the callback we want to remove. This is woefully inefficient for large + // sets (and surprisingly fast for small sets of callbacks). + // + // Perhaps there is an avenue to build a generic iterator pattern? @taylor.fish and I have + // briefly expressed a desire for this, and I promised her I would look into it, but at + // this moment have no proposal. + var array_it = try existing.?.arrayIterator(globalObject); + const updated_array = try jsc.JSArray.createEmpty(globalObject, 0); + while (try array_it.next()) |iter| { + if (iter == callback) continue; - try updated_array.push(globalObject, iter); + try updated_array.push(globalObject, iter); + } + + // Otherwise, we have ourselves an array of callbacks. We need to remove the element in the + // array that matches the callback. + _ = this.subscriptionCallbackMap().remove(globalObject, channelName); + + // Only populate the map if we have remaining callbacks for this channel. + const new_length = (updated_array.arrayIterator(globalObject) catch unreachable).len; + if (new_length != 0) { + this.subscriptionCallbackMap().set(globalObject, channelName, updated_array); + } else { + this.parent().channel_subscription_count -= 1; + this.parent().updateHasPendingActivity(); + } + + return new_length; } - // Otherwise, we have ourselves an array of callbacks. We need to remove the element in the - // array that matches the callback. - _ = map.remove(globalObject, channelName); - - // Only populate the map if we have remaining callbacks for this channel. - const new_length = (updated_array.arrayIterator(globalObject) catch unreachable).len; - if (new_length != 0) { - map.set(globalObject, channelName, updated_array); - } else { - this._parent.channel_subscription_count -= 1; - this._parent.updateHasPendingActivity(); - } - - return new_length; - } - - /// Add a handler for receiving messages on a specific channel - pub fn upsertReceiveHandler( + /// Add a handler for receiving messages on a specific channel + pub fn upsertReceiveHandler( this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue, callback: JSValue, ) bun.JSError!void { - const map = this.subscriptionCallbackMap(); - - var handlers_array: JSValue = undefined; - var is_new_channel = false; - if (try map.get(globalObject, channelName)) |existing_handler_arr| { - debug("Adding a new receive handler.", .{}); - if (existing_handler_arr.isUndefined()) { - // Create a new array if the existing_handler_arr is undefined/null + var handlers_array: JSValue = undefined; + var is_new_channel = false; + if (try this.subscriptionCallbackMap().get(globalObject, channelName)) |existing_handler_arr| { + debug("Adding a new receive handler.", .{}); + if (existing_handler_arr.isUndefined()) { + // Create a new array if the existing_handler_arr is undefined/null + handlers_array = try jsc.JSArray.createEmpty(globalObject, 0); + is_new_channel = true; + } else if (existing_handler_arr.isArray()) { + // Use the existing array + handlers_array = existing_handler_arr; + } else unreachable; + } else { + // No existing_handler_arr exists, create a new array handlers_array = try jsc.JSArray.createEmpty(globalObject, 0); is_new_channel = true; - } else if (existing_handler_arr.isArray()) { - // Use the existing array - handlers_array = existing_handler_arr; - } else unreachable; - } else { - // No existing_handler_arr exists, create a new array - handlers_array = try jsc.JSArray.createEmpty(globalObject, 0); - is_new_channel = true; - } + } - // Append the new callback to the array - try handlers_array.push(globalObject, callback); - - // Set the updated array back in the map - map.set(globalObject, channelName, handlers_array); - - // Update subscription count if this is a new channel - if (is_new_channel) { - this._parent.channel_subscription_count += 1; - this._parent.updateHasPendingActivity(); - } - } - - pub fn registerCallback(this: *Self, globalObject: *jsc.JSGlobalObject, eventString: JSValue, callback: JSValue) bun.JSError!void { - this.subscriptionCallbackMap().set(globalObject, eventString, callback); - } - - pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue { - const result = try this.subscriptionCallbackMap().get(globalObject, channelName); - if (result) |r| { - if (r.isUndefinedOrNull()) { - return null; + try handlers_array.push(globalObject, callback); + this.subscriptionCallbackMap().set(globalObject, channelName, handlers_array); + // Update subscription count if this is a new channel + if (is_new_channel) { + this.parent().channel_subscription_count += 1; + this.parent().updateHasPendingActivity(); } } - return result; - } + pub fn registerCallback(this: *Self, globalObject: *jsc.JSGlobalObject, eventString: JSValue, callback: JSValue) bun.JSError!void { + this._.set(globalObject, eventString, callback); + } - /// Invoke callbacks for a channel with the given arguments - /// Handles both single callbacks and arrays of callbacks - pub fn invokeCallback( + pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue { + const result = try this.subscriptionCallbackMap().get(globalObject, channelName); + if (result) |r| { + if (r.isUndefinedOrNull()) { + return null; + } + } + + return result; + } + + /// Invoke callbacks for a channel with the given arguments + /// Handles both single callbacks and arrays of callbacks + pub fn invokeCallback( this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue, args: []const JSValue, ) bun.JSError!void { - const callbacks = try this.getCallbacks(globalObject, channelName) orelse { - debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)}); - return; - }; - - // If callbacks is an array, iterate and call each one - if (callbacks.isArray()) { - var iter = try callbacks.arrayIterator(globalObject); - while (try iter.next()) |callback| { - if (callback.isCallable()) { - _ = callback.call(globalObject, .js_undefined, args) catch |e| { - return e; - }; - } - } - } else if (callbacks.isCallable()) { - _ = callbacks.call(globalObject, .js_undefined, args) catch |e| { - return e; + const callbacks = try this.getCallbacks(globalObject, channelName) orelse { + debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)}); + return; }; + + // If callbacks is an array, iterate and call each one + if (callbacks.isArray()) { + var iter = try callbacks.arrayIterator(globalObject); + while (try iter.next()) |callback| { + if (callback.isCallable()) { + _ = callback.call(globalObject, .js_undefined, args) catch |e| { + return e; + }; + } + } + } else if (callbacks.isCallable()) { + _ = callbacks.call(globalObject, .js_undefined, args) catch |e| { + return e; + }; + } } - } - pub fn deinit(this: *Self) void { - this._parent.channel_subscription_count = 0; - this._parent.updateHasPendingActivity(); - - ParentJS.gc.set( - .subscriptionCallbackMap, - this._parent.this_value.get(), - this._parent.globalObject, - .js_undefined, - ); - } -}; + pub fn deinit(this: *Self) void { + this.parent().channel_subscription_count = 0; + this.parent().updateHasPendingActivity(); + this._subscriptionCallbackMap = .js_undefined; + } + }; +} /// Valkey client wrapper for JavaScript pub const JSValkeyClient = struct { + const SubscriptionCtxType = SubscriptionCtx(JSValkeyClient, "_subscription_ctx"); + client: valkey.ValkeyClient, globalObject: *jsc.JSGlobalObject, this_value: jsc.JSRef = jsc.JSRef.empty(), poll_ref: bun.Async.KeepAlive = .{}, - _subscription_ctx: ?SubscriptionCtx, + _subscription_ctx: ?SubscriptionCtxType, channel_subscription_count: u32 = 0, has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true), @@ -446,14 +437,15 @@ pub const JSValkeyClient = struct { pub fn getOrCreateSubscriptionCtxEnteringSubscriptionMode( this: *JSValkeyClient, - ) *SubscriptionCtx { + ) *SubscriptionCtxType { if (this._subscription_ctx) |*ctx| { // If we already have a subscription context, return it return ctx; } // Save the original flag values and create a new subscription context - this._subscription_ctx = SubscriptionCtx.init( + this._subscription_ctx = SubscriptionCtxType.init( + this.globalObject, this, this.client.flags.enable_offline_queue, this.client.flags.auto_pipelining, @@ -1222,6 +1214,7 @@ fn SocketHandler(comptime ssl: bool) type { pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void { // Ensure the socket pointer is updated. + this.client.socket = .{ .SocketTCP = .detached }; this.client.onClose(); }