Compare commits

...

2 Commits

Author SHA1 Message Date
Jarred Sumner
31d118796d WIP 2025-09-30 05:04:41 -07:00
Jarred Sumner
2555854240 Fix crash in Redis client observed in CI 2025-09-30 01:14:05 -07:00
5 changed files with 129 additions and 146 deletions

View File

@@ -1294,9 +1294,19 @@ pub fn setTLSDefaultCiphers(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObject, c
}
pub fn getValkeyDefaultClient(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObject) jsc.JSValue {
const valkey = jsc.API.Valkey.createFromJS(globalThis, &.{.js_undefined}) catch |err| {
if (err != error.JSError) {
_ = globalThis.throwError(err, "Failed to create Redis client") catch {};
return .zero;
}
return .zero;
};
const this_value = valkey.toJS(globalThis);
const SubscriptionCtx = @import("../../valkey/js_valkey.zig").SubscriptionCtx;
var valkey = jsc.API.Valkey.createNoJsNoPubsub(globalThis, &.{.js_undefined}) catch |err| {
valkey.this_value = .initWeak(this_value);
valkey._subscription_ctx = SubscriptionCtx.init(this_value, globalThis, valkey) catch |err| {
if (err != error.JSError) {
_ = globalThis.throwError(err, "Failed to create Redis client") catch {};
return .zero;
@@ -1304,18 +1314,7 @@ pub fn getValkeyDefaultClient(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObject)
return .zero;
};
const as_js = valkey.toJS(globalThis);
valkey.this_value = jsc.JSRef.initWeak(as_js);
valkey._subscription_ctx = SubscriptionCtx.init(valkey) catch |err| {
if (err != error.JSError) {
_ = globalThis.throwError(err, "Failed to create Redis client") catch {};
return .zero;
}
return .zero;
};
return as_js;
return this_value;
}
pub fn getValkeyClientConstructor(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObject) jsc.JSValue {

View File

@@ -6525,7 +6525,7 @@ CPP_DECL [[ZIG_EXPORT(check_slow)]] void JSC__JSMap__set(JSC::JSMap* map, JSC::J
map->set(arg1, JSC::JSValue::decode(JSValue2), JSC::JSValue::decode(JSValue3));
}
CPP_DECL [[ZIG_EXPORT(check_slow)]] uint32_t JSC__JSMap__size(JSC::JSMap* map, JSC::JSGlobalObject* arg1)
CPP_DECL [[ZIG_EXPORT(nothrow)]] uint32_t JSC__JSMap__size(JSC::JSMap* map, JSC::JSGlobalObject* arg1)
{
return map->size();
}

View File

@@ -1,19 +1,22 @@
pub const SubscriptionCtx = struct {
const Self = @This();
is_subscriber: bool,
original_enable_offline_queue: bool,
original_enable_auto_pipelining: bool,
pub const empty: SubscriptionCtx = .{
.original_enable_offline_queue = false,
.original_enable_auto_pipelining = false,
.is_subscriber = false,
};
const ParentJS = JSValkeyClient.js;
pub fn init(valkey_parent: *JSValkeyClient) bun.JSError!Self {
const callback_map = jsc.JSMap.create(valkey_parent.globalObject);
const parent_this = valkey_parent.this_value.tryGet() orelse unreachable;
pub fn init(parent_js_value: JSValue, globalObject: *jsc.JSGlobalObject, valkey_parent: *JSValkeyClient) bun.JSError!SubscriptionCtx {
const callback_map = jsc.JSMap.create(globalObject);
ParentJS.gc.set(.subscriptionCallbackMap, parent_this, valkey_parent.globalObject, callback_map);
ParentJS.gc.set(.subscriptionCallbackMap, parent_js_value, globalObject, callback_map);
const self = Self{
const self = SubscriptionCtx{
.original_enable_offline_queue = valkey_parent.client.flags.enable_offline_queue,
.original_enable_auto_pipelining = valkey_parent.client.flags.enable_auto_pipelining,
.is_subscriber = false,
@@ -25,7 +28,11 @@ pub const SubscriptionCtx = struct {
return @alignCast(@fieldParentPtr("_subscription_ctx", this));
}
fn subscriptionCallbackMap(this: *Self) *jsc.JSMap {
fn subscriptionCallbackMap(this: *SubscriptionCtx) *jsc.JSMap {
if (comptime bun.Environment.isDebug or bun.asan.enabled) {
bun.assert(!this.parent().client.flags.finalized); // can't access JSMap after JSValue is finalized.
}
const parent_this = this.parent().this_value.tryGet() orelse unreachable;
const value_js = ParentJS.gc.get(.subscriptionCallbackMap, parent_this).?;
@@ -33,20 +40,18 @@ pub const SubscriptionCtx = struct {
}
/// Get the total number of channels that this subscription context is subscribed to.
pub fn channelsSubscribedToCount(this: *Self, globalObject: *jsc.JSGlobalObject) bun.JSError!u32 {
const count = try this.subscriptionCallbackMap().size(globalObject);
return count;
pub fn channelsSubscribedToCount(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject) u32 {
return this.subscriptionCallbackMap().size(globalObject);
}
/// Test whether this context has any subscriptions. It is mandatory to
/// guard deinit with this function.
pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bun.JSError!bool {
return (try this.channelsSubscribedToCount(globalObject)) > 0;
pub fn hasSubscriptions(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject) bool {
return this.channelsSubscribedToCount(globalObject) > 0;
}
pub fn clearReceiveHandlers(
this: *Self,
this: *SubscriptionCtx,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
) bun.JSError!void {
@@ -54,7 +59,7 @@ pub const SubscriptionCtx = struct {
_ = try map.remove(globalObject, channelName);
}
pub fn clearAllReceiveHandlers(this: *Self, globalObject: *jsc.JSGlobalObject) bun.JSError!void {
pub fn clearAllReceiveHandlers(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject) bun.JSError!void {
try this.subscriptionCallbackMap().clear(globalObject);
}
@@ -65,7 +70,7 @@ pub const SubscriptionCtx = struct {
///
/// Note: This function will empty out the map entry if there are no more handlers registered.
pub fn removeReceiveHandler(
this: *Self,
this: *SubscriptionCtx,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
callback: JSValue,
@@ -117,7 +122,7 @@ pub const SubscriptionCtx = struct {
/// Add a handler for receiving messages on a specific channel
pub fn upsertReceiveHandler(
this: *Self,
this: *SubscriptionCtx,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
callback: JSValue,
@@ -153,7 +158,7 @@ pub const SubscriptionCtx = struct {
try map.set(globalObject, channelName, handlers_array);
}
pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
pub fn getCallbacks(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
const result = try this.subscriptionCallbackMap().get(globalObject, channelName);
if (result == .js_undefined) {
return null;
@@ -165,13 +170,14 @@ pub const SubscriptionCtx = struct {
/// Invoke callbacks for a channel with the given arguments
/// Handles both single callbacks and arrays of callbacks
pub fn invokeCallbacks(
this: *Self,
this: *SubscriptionCtx,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
args: []const JSValue,
) bun.JSError!void {
const callbacks = try this.getCallbacks(globalObject, channelName) orelse {
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
if (comptime bun.Environment.enable_logs)
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
return;
};
@@ -191,30 +197,16 @@ pub const SubscriptionCtx = struct {
// If callbacks is an array, iterate and call each one
var iter = try callbacks.arrayIterator(globalObject);
while (try iter.next()) |callback| {
if (comptime bun.Environment.isDebug) {
bun.assert(callback.isCallable());
}
event_loop.runCallback(callback, globalObject, .js_undefined, args);
}
}
/// Return whether the subscription context is ready to be deleted by the JS garbage collector.
pub fn isDeletable(this: *Self, global_object: *jsc.JSGlobalObject) bun.JSError!bool {
pub fn isDeletable(this: *SubscriptionCtx, global_object: *jsc.JSGlobalObject) bun.JSError!bool {
// The user may request .close(), in which case we can dispose of the subscription object. If that is the case,
// finalized will be true. Otherwise, we should treat the object as disposable if there are no active
// subscriptions.
return this.parent().client.flags.finalized or !(try this.hasSubscriptions(global_object));
}
pub fn deinit(this: *Self, global_object: *jsc.JSGlobalObject) void {
if (comptime bun.Environment.isDebug) {
bun.debugAssert(this.isDeletable(this.parent().globalObject) catch unreachable);
}
if (this.parent().this_value.tryGet()) |parent_this| {
ParentJS.gc.set(.subscriptionCallbackMap, parent_this, global_object, .js_undefined);
}
return this.parent().client.flags.finalized or !this.hasSubscriptions(global_object);
}
};
@@ -245,7 +237,6 @@ pub const JSValkeyClient = struct {
ref_count: RefCount,
pub const js = jsc.Codegen.JSRedisClient;
pub const toJS = js.toJS;
pub const fromJS = js.fromJS;
pub const fromJSDirect = js.fromJSDirect;
@@ -254,15 +245,21 @@ pub const JSValkeyClient = struct {
pub const deref = RefCount.deref;
pub const new = bun.TrivialNew(@This());
pub fn toJS(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject) JSValue {
if (comptime bun.Environment.isDebug or bun.asan.enabled) {
bun.assert(this.this_value.isEmpty());
}
return js.toJS(this, globalObject);
}
// Factory function to create a new Valkey client from JS
pub fn constructor(globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, js_this: JSValue) bun.JSError!*JSValkeyClient {
return try create(globalObject, callframe.arguments(), js_this);
}
/// Create a Valkey client that does not have an associated JS object nor a SubscriptionCtx.
///
/// This whole client needs a refactor.
pub fn createNoJsNoPubsub(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue) bun.JSError!*JSValkeyClient {
pub fn createFromJS(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue) bun.JSError!*JSValkeyClient {
const this_allocator = bun.default_allocator;
const vm = globalObject.bunVM();
@@ -382,13 +379,13 @@ pub const JSValkeyClient = struct {
}
pub fn create(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue, js_this: JSValue) bun.JSError!*JSValkeyClient {
var new_client = try JSValkeyClient.createNoJsNoPubsub(globalObject, arguments);
var new_client = try JSValkeyClient.createFromJS(globalObject, arguments);
// Initially, we only need to hold a weak reference to the JS object.
new_client.this_value = jsc.JSRef.initWeak(js_this);
// Need to associate the subscription context, after the JS ref has been populated.
new_client._subscription_ctx = try SubscriptionCtx.init(new_client);
new_client._subscription_ctx = try SubscriptionCtx.init(js_this, globalObject, new_client);
return new_client;
}
@@ -420,7 +417,11 @@ pub const JSValkeyClient = struct {
return JSValkeyClient.new(.{
.ref_count = .init(),
._subscription_ctx = undefined,
._subscription_ctx = .{
.is_subscriber = false,
.original_enable_offline_queue = false,
.original_enable_auto_pipelining = false,
},
.client = .{
.vm = vm,
.address = switch (this.client.protocol) {
@@ -495,12 +496,13 @@ pub const JSValkeyClient = struct {
}
pub fn removeSubscription(this: *JSValkeyClient) void {
debug("removeSubscription: entering, has subscriptions: {}", .{this._subscription_ctx.hasSubscriptions(this.globalObject) catch false});
if (comptime bun.Environment.enable_logs)
debug("removeSubscription: entering, has subscriptions: {}", .{this._subscription_ctx.hasSubscriptions(this.globalObject)});
this.ref();
defer this.deref();
// This is the last subscription, restore original flags
if (!(this._subscription_ctx.hasSubscriptions(this.globalObject) catch false)) {
if (!this._subscription_ctx.hasSubscriptions(this.globalObject)) {
this.client.flags.enable_offline_queue = this._subscription_ctx.original_enable_offline_queue;
this.client.flags.enable_auto_pipelining = this._subscription_ctx.original_enable_auto_pipelining;
this._subscription_ctx.is_subscriber = false;
@@ -510,32 +512,6 @@ pub const JSValkeyClient = struct {
debug("removeSubscription: exiting", .{});
}
pub fn getOrCreateSubscriptionCtx(
this: *JSValkeyClient,
) bun.JSError!*SubscriptionCtx {
if (this._subscription_ctx) |*ctx| {
// If we already have a subscription context, return it
return ctx;
}
// Save the original flag values and create a new subscription context
this._subscription_ctx = try SubscriptionCtx.init(
this,
this.client.flags.enable_offline_queue,
this.client.flags.enable_auto_pipelining,
);
// We need to make sure we disable the offline queue, but we actually want to make sure that our HELLO message
// goes through first. Consequently, we only disable the offline queue if we're already connected.
if (this.client.status == .connected) {
this.client.flags.enable_offline_queue = false;
}
this.client.flags.enable_auto_pipelining = false;
return &(this._subscription_ctx.?);
}
pub fn isSubscriber(this: *const JSValkeyClient) bool {
return this._subscription_ctx.is_subscriber;
}
@@ -561,16 +537,16 @@ pub const JSValkeyClient = struct {
// If already connected, resolve immediately
if (this.client.status == .connected) {
return jsc.JSPromise.resolvedPromiseValue(globalObject, js.helloGetCached(this_value) orelse .js_undefined);
return jsc.JSPromise.resolvedPromiseValue(globalObject, js.gc.hello.get(this_value) orelse .js_undefined);
}
if (js.connectionPromiseGetCached(this_value)) |promise| {
if (js.gc.connectionPromise.get(this_value)) |promise| {
return promise;
}
const promise_ptr = jsc.JSPromise.create(globalObject);
const promise = promise_ptr.toJS();
js.connectionPromiseSetCached(this_value, globalObject, promise);
js.gc.connectionPromise.set(this_value, globalObject, promise);
// If was manually closed, reset that flag
this.client.flags.is_manually_closed = false;
@@ -587,6 +563,7 @@ pub const JSValkeyClient = struct {
event_loop.enter();
defer event_loop.exit();
promise_ptr.reject(globalObject, err_value);
js.gc.connectionPromise.set(this_value, globalObject, .zero);
return promise;
};
@@ -624,25 +601,25 @@ pub const JSValkeyClient = struct {
}
pub fn getOnConnect(_: *JSValkeyClient, thisValue: JSValue, _: *jsc.JSGlobalObject) JSValue {
if (js.onconnectGetCached(thisValue)) |value| {
if (js.gc.onconnect.get(thisValue)) |value| {
return value;
}
return .js_undefined;
}
pub fn setOnConnect(_: *JSValkeyClient, thisValue: JSValue, globalObject: *jsc.JSGlobalObject, value: JSValue) void {
js.onconnectSetCached(thisValue, globalObject, value);
js.gc.onconnect.set(thisValue, globalObject, value);
}
pub fn getOnClose(_: *JSValkeyClient, thisValue: JSValue, _: *jsc.JSGlobalObject) JSValue {
if (js.oncloseGetCached(thisValue)) |value| {
if (js.gc.onclose.get(thisValue)) |value| {
return value;
}
return .js_undefined;
}
pub fn setOnClose(_: *JSValkeyClient, thisValue: JSValue, globalObject: *jsc.JSGlobalObject, value: JSValue) void {
js.oncloseSetCached(thisValue, globalObject, value);
js.gc.onclose.set(thisValue, globalObject, value);
}
/// Safely add a timer with proper reference counting and event loop keepalive
@@ -810,24 +787,28 @@ pub const JSValkeyClient = struct {
break :js_hello .js_undefined;
};
};
js.helloSetCached(this_value, globalObject, hello_value);
js.gc.hello.set(this_value, globalObject, hello_value);
// Call onConnect callback if defined by the user
if (js.onconnectGetCached(this_value)) |on_connect| {
if (js.gc.onconnect.get(this_value)) |on_connect| {
const js_value = this_value;
js_value.ensureStillAlive();
globalObject.queueMicrotask(on_connect, &[_]JSValue{ js_value, hello_value });
}
if (js.connectionPromiseGetCached(this_value)) |promise| {
js.connectionPromiseSetCached(this_value, globalObject, .zero);
const js_promise = promise.asPromise().?;
if (this.client.flags.connection_promise_returns_client) {
debug("Resolving connection promise with client instance", .{});
js_promise.resolve(globalObject, this_value);
} else {
debug("Resolving connection promise with HELLO response", .{});
js_promise.resolve(globalObject, hello_value);
if (js.gc.connectionPromise.get(this_value)) |promise_value| {
const promise = promise_value.asPromise().?;
if (promise.status(this.client.vm.jsc_vm) == .pending) {
if (this.client.flags.connection_promise_returns_client) {
debug("Resolving connection promise with client instance", .{});
promise.resolve(globalObject, this_value);
} else {
debug("Resolving connection promise with HELLO response", .{});
promise.resolve(globalObject, hello_value);
}
}
js.gc.connectionPromise.set(this_value, globalObject, .zero);
this.client.flags.connection_promise_returns_client = false;
}
}
@@ -930,28 +911,31 @@ pub const JSValkeyClient = struct {
const this_jsvalue = this.this_value.tryGet() orelse return;
this_jsvalue.ensureStillAlive();
defer this_jsvalue.ensureStillAlive();
// Create an error value
const error_value = protocol.valkeyErrorToJS(globalObject, "Connection closed", protocol.RedisError.ConnectionClosed);
const loop = this.client.vm.eventLoop();
loop.enter();
defer loop.exit();
{
loop.enter();
defer loop.exit();
if (!this_jsvalue.isUndefined()) {
if (js.connectionPromiseGetCached(this_jsvalue)) |promise| {
js.connectionPromiseSetCached(this_jsvalue, globalObject, .zero);
promise.asPromise().?.reject(globalObject, error_value);
if (js.gc.connectionPromise.get(this_jsvalue)) |promise_value| {
const promise = promise_value.asPromise().?;
if (promise.status(this.client.vm.jsc_vm) == .pending) {
promise.reject(globalObject, error_value);
}
}
}
// Call onClose callback if it exists
if (js.oncloseGetCached(this_jsvalue)) |on_close| {
_ = on_close.call(
globalObject,
this_jsvalue,
&[_]JSValue{error_value},
) catch |e| globalObject.reportActiveExceptionAsUnhandled(e);
// Call onClose callback if it exists
if (js.gc.onclose.get(this_jsvalue)) |on_close| {
_ = on_close.call(
globalObject,
this_jsvalue,
&[_]JSValue{error_value},
) catch |e| globalObject.reportActiveExceptionAsUnhandled(e);
}
}
}
@@ -967,7 +951,7 @@ pub const JSValkeyClient = struct {
pub fn failWithJSValue(this: *JSValkeyClient, value: JSValue) void {
const this_value = this.this_value.tryGet() orelse return;
const globalObject = this.globalObject;
if (js.oncloseGetCached(this_value)) |on_close| {
if (js.gc.onclose.get(this_value)) |on_close| {
const loop = this.client.vm.eventLoop();
loop.enter();
defer loop.exit();
@@ -1181,12 +1165,10 @@ pub const JSValkeyClient = struct {
// This is a mess beyond belief and it is incredibly fragile.
const has_pending_commands = this.client.hasAnyPendingCommands();
// isDeletable may throw an exception, and if it does, we have to assume
// that the object still has references. Best we can do is hope nothing
// catastrophic happens.
const subs_deletable: bool = !(this._subscription_ctx.hasSubscriptions(this.globalObject) catch false);
const has_active_subscriptions: bool = !this.client.flags.finalized and
(this._subscription_ctx.is_subscriber and this._subscription_ctx.hasSubscriptions(this.globalObject));
const has_activity = has_pending_commands or !subs_deletable or this.client.flags.is_reconnecting;
const has_activity = has_pending_commands or has_active_subscriptions or this.client.flags.is_reconnecting;
// There's a couple cases to handle here:
if (has_activity) {
@@ -1207,18 +1189,17 @@ pub const JSValkeyClient = struct {
// object.
switch (this.client.status) {
.connecting, .connected => {
// Whenever we're connected, we need to keep the object alive.
//
// TODO(markovejnovic): This is a leak.
// Note this is an intentional leak. Unless the user manually
// closes the connection, the object will stay alive forever,
// even if it falls out of scope. This is kind of stupid, since
// if the object is out of scope, and isn't subscribed upon,
// how exactly is the user going to call anything on the object?
//
// It is 100% safe to drop the strong reference there and let
// the object be GC'd, but we're not doing that now.
debug("upgrading this_value since we are connected/connecting", .{});
// Since we provide the RedisClient in the `this` value of
// callbacks, we must keep the this value alive.
//
// And for subscriptions, you want the connection to stay alive.
// so that you can continue to receive messages. That is the
// purpose of a subscription.
//
// And for other cases, where there are in-flight promises
// we want to make sure those stay alive while they're needed.
this.this_value.upgrade(this.globalObject);
},
.disconnected, .failed => {
@@ -1330,12 +1311,13 @@ fn SocketHandler(comptime ssl: bool) type {
}
fn onHandshake_(this: *JSValkeyClient, _: anytype, success: i32, ssl_error: uws.us_bun_verify_error_t) void {
debug("onHandshake: {d} error={d} reason={s} code={s}", .{
success,
ssl_error.error_no,
if (ssl_error.reason != null) bun.span(ssl_error.reason[0..bun.len(ssl_error.reason) :0]) else "no reason",
if (ssl_error.code != null) bun.span(ssl_error.code[0..bun.len(ssl_error.code) :0]) else "no code",
});
if (comptime bun.Environment.enable_logs)
debug("onHandshake: {d} error={d} reason={s} code={s}", .{
success,
ssl_error.error_no,
if (ssl_error.reason != null) bun.span(ssl_error.reason[0..bun.len(ssl_error.reason) :0]) else "no reason",
if (ssl_error.code != null) bun.span(ssl_error.code[0..bun.len(ssl_error.code) :0]) else "no code",
});
const handshake_success = if (success == 1) true else false;
this.ref();
defer this.deref();
@@ -1369,7 +1351,9 @@ fn SocketHandler(comptime ssl: bool) type {
pub const onHandshake = if (ssl) onHandshake_ else null;
pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void {
// No need to deref since this.client.onClose() invokes onValkeyClose which does the deref.
// We must ref/deref when it's a TLS connection failure.
this.ref();
defer this.deref();
debug("Socket closed.", .{});
@@ -1383,7 +1367,7 @@ fn SocketHandler(comptime ssl: bool) type {
pub fn onEnd(this: *JSValkeyClient, socket: SocketType) void {
_ = this;
_ = socket;
// Half-opened sockets are not allowed.
@panic("Assertion failure: onEnd should not be called for Valkey client");
}
pub fn onConnectError(this: *JSValkeyClient, _: SocketType, _: i32) void {

View File

@@ -959,7 +959,7 @@ pub fn duplicate(
const new_client_js = new_client.toJS(globalObject);
new_client.this_value = jsc.JSRef.initWeak(new_client_js);
new_client._subscription_ctx = try SubscriptionCtx.init(new_client);
new_client._subscription_ctx = try SubscriptionCtx.init(new_client_js, globalObject, new_client);
// If the original client is already connected and not manually closed, start connecting the new client.
if (this.client.status == .connected and !this.client.flags.is_manually_closed) {
// Use strong reference during connection to prevent premature GC

View File

@@ -675,7 +675,7 @@ pub const ValkeyClient = struct {
},
.Push => |push| {
const p = this.parent();
const sub_count = try p._subscription_ctx.channelsSubscribedToCount(globalThis);
const subscriptions_count = p._subscription_ctx.channelsSubscribedToCount(globalThis);
if (protocol.SubscriptionPushMessage.map.get(push.kind)) |msg_type| {
switch (msg_type) {
@@ -690,7 +690,7 @@ pub const ValkeyClient = struct {
// For SUBSCRIBE responses, only resolve the promise for the first channel confirmation
// Additional channel confirmations from multi-channel SUBSCRIBE commands don't need promise pairs
if (pair) |req_pair| {
req_pair.promise.promise.resolve(globalThis, .jsNumber(sub_count));
req_pair.promise.promise.resolve(globalThis, .jsNumber(subscriptions_count));
}
return .handled;
},