Implement Jarred's advice

This commit is contained in:
Marko Vejnovic
2025-09-09 23:44:08 -07:00
parent dc3c8f79c4
commit 89feea25c8
2 changed files with 157 additions and 164 deletions

View File

@@ -229,6 +229,6 @@ export default [
punsubscribe: { fn: "punsubscribe" },
pubsub: { fn: "pubsub" },
},
values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptionCallbackMap"],
values: ["onconnect", "onclose", "connectionPromise", "hello"],
}),
];

View File

@@ -1,219 +1,210 @@
pub const SubscriptionCtx = struct {
const Self = @This();
fn SubscriptionCtx(comptime ParentType: type, comptime _: []const u8) type {
return struct {
const Self = @This();
_parent: *JSValkeyClient,
original_enable_offline_queue: bool,
original_enable_auto_pipelining: bool,
original_enable_offline_queue: bool,
original_enable_auto_pipelining: bool,
_subscriptionCallbackMap: jsc.JSValue,
parent_ref: *ParentType,
const ParentJS = JSValkeyClient.js;
const ParentJS = JSValkeyClient.js;
pub fn init(parent: *JSValkeyClient, enable_offline_queue: bool, enable_auto_pipelining: bool) Self {
const callback_map = jsc.JSMap.create(parent.globalObject);
ParentJS.gc.set(.subscriptionCallbackMap, parent.this_value.get(), parent.globalObject, callback_map);
fn parent(this: *Self) *ParentType {
return this.parent_ref;
}
const self = Self{
._parent = parent,
.original_enable_offline_queue = enable_offline_queue,
.original_enable_auto_pipelining = enable_auto_pipelining,
};
return self;
}
pub fn init(global_object: *jsc.JSGlobalObject, parent_ptr: *ParentType, enable_offline_queue: bool, enable_auto_pipelining: bool) Self {
const self = Self{
.original_enable_offline_queue = enable_offline_queue,
.original_enable_auto_pipelining = enable_auto_pipelining,
._subscriptionCallbackMap = jsc.JSMap.create(global_object),
.parent_ref = parent_ptr,
};
return self;
}
fn subscriptionCallbackMap(this: *Self) *jsc.JSMap {
const value_js = ParentJS.gc.get(.subscriptionCallbackMap, this._parent.this_value.get()).?;
return jsc.JSMap.fromJS(value_js).?;
}
fn subscriptionCallbackMap(this: *Self) *jsc.JSMap {
return jsc.JSMap.fromJS(this._subscriptionCallbackMap) orelse unreachable;
}
/// Get the total number of channels that this subscription context is subscribed to.
pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize {
return this.subscriptionCallbackMap().size(globalObject);
}
/// Get the total number of channels that this subscription context is subscribed to.
pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize {
return this.subscriptionCallbackMap().size(globalObject);
}
/// Test whether this context has any subscriptions. It is mandatory to
/// guard deinit with this function.
pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool {
return this.subscriptionCount(globalObject) > 0;
}
/// Test whether this context has any subscriptions. It is mandatory to
/// guard deinit with this function.
pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool {
return this.subscriptionCount(globalObject) > 0;
}
pub fn clearReceiveHandlers(
pub fn clearReceiveHandlers(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
) void {
const map = this.subscriptionCallbackMap();
if (map.remove(globalObject, channelName)) {
this._parent.channel_subscription_count -= 1;
this._parent.updateHasPendingActivity();
if (this.subscriptionCallbackMap().remove(globalObject, channelName)) {
this.parent().channel_subscription_count -= 1;
this.parent().updateHasPendingActivity();
}
}
}
/// Remove a specific receive handler.
///
/// Returns: The total number of remaining handlers for this channel, or null if here were no listeners originally
/// registered.
///
/// Note: This function will empty out the map entry if there are no more handlers registered.
pub fn removeReceiveHandler(
/// Remove a specific receive handler.
///
/// Returns: The total number of remaining handlers for this channel, or null if here were no listeners
/// originally registered.
///
/// Note: This function will empty out the map entry if there are no more handlers registered.
pub fn removeReceiveHandler(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
callback: JSValue,
) !?usize {
const map = this.subscriptionCallbackMap();
const existing = try this.subscriptionCallbackMap().get(globalObject, channelName);
const existing = try map.get(globalObject, channelName);
if (existing == null) {
// Could not find the channel, nothing to remove.
return null;
}
if (existing == null) {
// Could not find the channel, nothing to remove.
return null;
}
if (existing.?.isUndefinedOrNull()) {
// Nothing to remove.
return null;
}
if (existing.?.isUndefinedOrNull()) {
// Nothing to remove.
return null;
}
// Existing is guaranteed to be an array of callbacks.
bun.assert(existing.?.isArray());
// Existing is guaranteed to be an array of callbacks.
bun.assert(existing.?.isArray());
// TODO(markovejnovic): I can't find a better way to do this... I generate a new array,
// filtering out the callback we want to remove. This is woefully inefficient for large
// sets (and surprisingly fast for small sets of callbacks).
//
// Perhaps there is an avenue to build a generic iterator pattern? @taylor.fish and I have
// briefly expressed a desire for this, and I promised her I would look into it, but at
// this moment have no proposal.
var array_it = try existing.?.arrayIterator(globalObject);
const updated_array = try jsc.JSArray.createEmpty(globalObject, 0);
while (try array_it.next()) |iter| {
if (iter == callback)
// TODO(markovejnovic): I can't find a better way to do this... I generate a new array,
// filtering out the callback we want to remove. This is woefully inefficient for large
// sets (and surprisingly fast for small sets of callbacks).
//
// Perhaps there is an avenue to build a generic iterator pattern? @taylor.fish and I have
// briefly expressed a desire for this, and I promised her I would look into it, but at
// this moment have no proposal.
var array_it = try existing.?.arrayIterator(globalObject);
const updated_array = try jsc.JSArray.createEmpty(globalObject, 0);
while (try array_it.next()) |iter| {
if (iter == callback)
continue;
try updated_array.push(globalObject, iter);
try updated_array.push(globalObject, iter);
}
// Otherwise, we have ourselves an array of callbacks. We need to remove the element in the
// array that matches the callback.
_ = this.subscriptionCallbackMap().remove(globalObject, channelName);
// Only populate the map if we have remaining callbacks for this channel.
const new_length = (updated_array.arrayIterator(globalObject) catch unreachable).len;
if (new_length != 0) {
this.subscriptionCallbackMap().set(globalObject, channelName, updated_array);
} else {
this.parent().channel_subscription_count -= 1;
this.parent().updateHasPendingActivity();
}
return new_length;
}
// Otherwise, we have ourselves an array of callbacks. We need to remove the element in the
// array that matches the callback.
_ = map.remove(globalObject, channelName);
// Only populate the map if we have remaining callbacks for this channel.
const new_length = (updated_array.arrayIterator(globalObject) catch unreachable).len;
if (new_length != 0) {
map.set(globalObject, channelName, updated_array);
} else {
this._parent.channel_subscription_count -= 1;
this._parent.updateHasPendingActivity();
}
return new_length;
}
/// Add a handler for receiving messages on a specific channel
pub fn upsertReceiveHandler(
/// Add a handler for receiving messages on a specific channel
pub fn upsertReceiveHandler(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
callback: JSValue,
) bun.JSError!void {
const map = this.subscriptionCallbackMap();
var handlers_array: JSValue = undefined;
var is_new_channel = false;
if (try map.get(globalObject, channelName)) |existing_handler_arr| {
debug("Adding a new receive handler.", .{});
if (existing_handler_arr.isUndefined()) {
// Create a new array if the existing_handler_arr is undefined/null
var handlers_array: JSValue = undefined;
var is_new_channel = false;
if (try this.subscriptionCallbackMap().get(globalObject, channelName)) |existing_handler_arr| {
debug("Adding a new receive handler.", .{});
if (existing_handler_arr.isUndefined()) {
// Create a new array if the existing_handler_arr is undefined/null
handlers_array = try jsc.JSArray.createEmpty(globalObject, 0);
is_new_channel = true;
} else if (existing_handler_arr.isArray()) {
// Use the existing array
handlers_array = existing_handler_arr;
} else unreachable;
} else {
// No existing_handler_arr exists, create a new array
handlers_array = try jsc.JSArray.createEmpty(globalObject, 0);
is_new_channel = true;
} else if (existing_handler_arr.isArray()) {
// Use the existing array
handlers_array = existing_handler_arr;
} else unreachable;
} else {
// No existing_handler_arr exists, create a new array
handlers_array = try jsc.JSArray.createEmpty(globalObject, 0);
is_new_channel = true;
}
}
// Append the new callback to the array
try handlers_array.push(globalObject, callback);
// Set the updated array back in the map
map.set(globalObject, channelName, handlers_array);
// Update subscription count if this is a new channel
if (is_new_channel) {
this._parent.channel_subscription_count += 1;
this._parent.updateHasPendingActivity();
}
}
pub fn registerCallback(this: *Self, globalObject: *jsc.JSGlobalObject, eventString: JSValue, callback: JSValue) bun.JSError!void {
this.subscriptionCallbackMap().set(globalObject, eventString, callback);
}
pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
const result = try this.subscriptionCallbackMap().get(globalObject, channelName);
if (result) |r| {
if (r.isUndefinedOrNull()) {
return null;
try handlers_array.push(globalObject, callback);
this.subscriptionCallbackMap().set(globalObject, channelName, handlers_array);
// Update subscription count if this is a new channel
if (is_new_channel) {
this.parent().channel_subscription_count += 1;
this.parent().updateHasPendingActivity();
}
}
return result;
}
pub fn registerCallback(this: *Self, globalObject: *jsc.JSGlobalObject, eventString: JSValue, callback: JSValue) bun.JSError!void {
this._.set(globalObject, eventString, callback);
}
/// Invoke callbacks for a channel with the given arguments
/// Handles both single callbacks and arrays of callbacks
pub fn invokeCallback(
pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
const result = try this.subscriptionCallbackMap().get(globalObject, channelName);
if (result) |r| {
if (r.isUndefinedOrNull()) {
return null;
}
}
return result;
}
/// Invoke callbacks for a channel with the given arguments
/// Handles both single callbacks and arrays of callbacks
pub fn invokeCallback(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
args: []const JSValue,
) bun.JSError!void {
const callbacks = try this.getCallbacks(globalObject, channelName) orelse {
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
return;
};
// If callbacks is an array, iterate and call each one
if (callbacks.isArray()) {
var iter = try callbacks.arrayIterator(globalObject);
while (try iter.next()) |callback| {
if (callback.isCallable()) {
_ = callback.call(globalObject, .js_undefined, args) catch |e| {
return e;
};
}
}
} else if (callbacks.isCallable()) {
_ = callbacks.call(globalObject, .js_undefined, args) catch |e| {
return e;
const callbacks = try this.getCallbacks(globalObject, channelName) orelse {
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
return;
};
// If callbacks is an array, iterate and call each one
if (callbacks.isArray()) {
var iter = try callbacks.arrayIterator(globalObject);
while (try iter.next()) |callback| {
if (callback.isCallable()) {
_ = callback.call(globalObject, .js_undefined, args) catch |e| {
return e;
};
}
}
} else if (callbacks.isCallable()) {
_ = callbacks.call(globalObject, .js_undefined, args) catch |e| {
return e;
};
}
}
}
pub fn deinit(this: *Self) void {
this._parent.channel_subscription_count = 0;
this._parent.updateHasPendingActivity();
ParentJS.gc.set(
.subscriptionCallbackMap,
this._parent.this_value.get(),
this._parent.globalObject,
.js_undefined,
);
}
};
pub fn deinit(this: *Self) void {
this.parent().channel_subscription_count = 0;
this.parent().updateHasPendingActivity();
this._subscriptionCallbackMap = .js_undefined;
}
};
}
/// Valkey client wrapper for JavaScript
pub const JSValkeyClient = struct {
const SubscriptionCtxType = SubscriptionCtx(JSValkeyClient, "_subscription_ctx");
client: valkey.ValkeyClient,
globalObject: *jsc.JSGlobalObject,
this_value: jsc.JSRef = jsc.JSRef.empty(),
poll_ref: bun.Async.KeepAlive = .{},
_subscription_ctx: ?SubscriptionCtx,
_subscription_ctx: ?SubscriptionCtxType,
channel_subscription_count: u32 = 0,
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
@@ -446,14 +437,15 @@ pub const JSValkeyClient = struct {
pub fn getOrCreateSubscriptionCtxEnteringSubscriptionMode(
this: *JSValkeyClient,
) *SubscriptionCtx {
) *SubscriptionCtxType {
if (this._subscription_ctx) |*ctx| {
// If we already have a subscription context, return it
return ctx;
}
// Save the original flag values and create a new subscription context
this._subscription_ctx = SubscriptionCtx.init(
this._subscription_ctx = SubscriptionCtxType.init(
this.globalObject,
this,
this.client.flags.enable_offline_queue,
this.client.flags.auto_pipelining,
@@ -1222,6 +1214,7 @@ fn SocketHandler(comptime ssl: bool) type {
pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void {
// Ensure the socket pointer is updated.
this.client.socket = .{ .SocketTCP = .detached };
this.client.onClose();
}