|
|
|
|
@@ -1,19 +1,22 @@
|
|
|
|
|
pub const SubscriptionCtx = struct {
|
|
|
|
|
const Self = @This();
|
|
|
|
|
|
|
|
|
|
is_subscriber: bool,
|
|
|
|
|
original_enable_offline_queue: bool,
|
|
|
|
|
original_enable_auto_pipelining: bool,
|
|
|
|
|
|
|
|
|
|
pub const empty: SubscriptionCtx = .{
|
|
|
|
|
.original_enable_offline_queue = false,
|
|
|
|
|
.original_enable_auto_pipelining = false,
|
|
|
|
|
.is_subscriber = false,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const ParentJS = JSValkeyClient.js;
|
|
|
|
|
|
|
|
|
|
pub fn init(valkey_parent: *JSValkeyClient) bun.JSError!Self {
|
|
|
|
|
const callback_map = jsc.JSMap.create(valkey_parent.globalObject);
|
|
|
|
|
const parent_this = valkey_parent.this_value.tryGet() orelse unreachable;
|
|
|
|
|
pub fn init(parent_js_value: JSValue, globalObject: *jsc.JSGlobalObject, valkey_parent: *JSValkeyClient) bun.JSError!SubscriptionCtx {
|
|
|
|
|
const callback_map = jsc.JSMap.create(globalObject);
|
|
|
|
|
|
|
|
|
|
ParentJS.gc.set(.subscriptionCallbackMap, parent_this, valkey_parent.globalObject, callback_map);
|
|
|
|
|
ParentJS.gc.set(.subscriptionCallbackMap, parent_js_value, globalObject, callback_map);
|
|
|
|
|
|
|
|
|
|
const self = Self{
|
|
|
|
|
const self = SubscriptionCtx{
|
|
|
|
|
.original_enable_offline_queue = valkey_parent.client.flags.enable_offline_queue,
|
|
|
|
|
.original_enable_auto_pipelining = valkey_parent.client.flags.enable_auto_pipelining,
|
|
|
|
|
.is_subscriber = false,
|
|
|
|
|
@@ -25,7 +28,11 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
return @alignCast(@fieldParentPtr("_subscription_ctx", this));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn subscriptionCallbackMap(this: *Self) *jsc.JSMap {
|
|
|
|
|
fn subscriptionCallbackMap(this: *SubscriptionCtx) *jsc.JSMap {
|
|
|
|
|
if (comptime bun.Environment.isDebug or bun.asan.enabled) {
|
|
|
|
|
bun.assert(!this.parent().client.flags.finalized); // can't access JSMap after JSValue is finalized.
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const parent_this = this.parent().this_value.tryGet() orelse unreachable;
|
|
|
|
|
|
|
|
|
|
const value_js = ParentJS.gc.get(.subscriptionCallbackMap, parent_this).?;
|
|
|
|
|
@@ -33,20 +40,18 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get the total number of channels that this subscription context is subscribed to.
|
|
|
|
|
pub fn channelsSubscribedToCount(this: *Self, globalObject: *jsc.JSGlobalObject) bun.JSError!u32 {
|
|
|
|
|
const count = try this.subscriptionCallbackMap().size(globalObject);
|
|
|
|
|
|
|
|
|
|
return count;
|
|
|
|
|
pub fn channelsSubscribedToCount(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject) u32 {
|
|
|
|
|
return this.subscriptionCallbackMap().size(globalObject);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Test whether this context has any subscriptions. It is mandatory to
|
|
|
|
|
/// guard deinit with this function.
|
|
|
|
|
pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bun.JSError!bool {
|
|
|
|
|
return (try this.channelsSubscribedToCount(globalObject)) > 0;
|
|
|
|
|
pub fn hasSubscriptions(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject) bool {
|
|
|
|
|
return this.channelsSubscribedToCount(globalObject) > 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn clearReceiveHandlers(
|
|
|
|
|
this: *Self,
|
|
|
|
|
this: *SubscriptionCtx,
|
|
|
|
|
globalObject: *jsc.JSGlobalObject,
|
|
|
|
|
channelName: JSValue,
|
|
|
|
|
) bun.JSError!void {
|
|
|
|
|
@@ -54,7 +59,7 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
_ = try map.remove(globalObject, channelName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn clearAllReceiveHandlers(this: *Self, globalObject: *jsc.JSGlobalObject) bun.JSError!void {
|
|
|
|
|
pub fn clearAllReceiveHandlers(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject) bun.JSError!void {
|
|
|
|
|
try this.subscriptionCallbackMap().clear(globalObject);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -65,7 +70,7 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
///
|
|
|
|
|
/// Note: This function will empty out the map entry if there are no more handlers registered.
|
|
|
|
|
pub fn removeReceiveHandler(
|
|
|
|
|
this: *Self,
|
|
|
|
|
this: *SubscriptionCtx,
|
|
|
|
|
globalObject: *jsc.JSGlobalObject,
|
|
|
|
|
channelName: JSValue,
|
|
|
|
|
callback: JSValue,
|
|
|
|
|
@@ -117,7 +122,7 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
|
|
|
|
|
/// Add a handler for receiving messages on a specific channel
|
|
|
|
|
pub fn upsertReceiveHandler(
|
|
|
|
|
this: *Self,
|
|
|
|
|
this: *SubscriptionCtx,
|
|
|
|
|
globalObject: *jsc.JSGlobalObject,
|
|
|
|
|
channelName: JSValue,
|
|
|
|
|
callback: JSValue,
|
|
|
|
|
@@ -153,7 +158,7 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
try map.set(globalObject, channelName, handlers_array);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
|
|
|
|
|
pub fn getCallbacks(this: *SubscriptionCtx, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
|
|
|
|
|
const result = try this.subscriptionCallbackMap().get(globalObject, channelName);
|
|
|
|
|
if (result == .js_undefined) {
|
|
|
|
|
return null;
|
|
|
|
|
@@ -165,13 +170,14 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
/// Invoke callbacks for a channel with the given arguments
|
|
|
|
|
/// Handles both single callbacks and arrays of callbacks
|
|
|
|
|
pub fn invokeCallbacks(
|
|
|
|
|
this: *Self,
|
|
|
|
|
this: *SubscriptionCtx,
|
|
|
|
|
globalObject: *jsc.JSGlobalObject,
|
|
|
|
|
channelName: JSValue,
|
|
|
|
|
args: []const JSValue,
|
|
|
|
|
) bun.JSError!void {
|
|
|
|
|
const callbacks = try this.getCallbacks(globalObject, channelName) orelse {
|
|
|
|
|
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
|
|
|
|
|
if (comptime bun.Environment.enable_logs)
|
|
|
|
|
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -191,30 +197,16 @@ pub const SubscriptionCtx = struct {
|
|
|
|
|
// If callbacks is an array, iterate and call each one
|
|
|
|
|
var iter = try callbacks.arrayIterator(globalObject);
|
|
|
|
|
while (try iter.next()) |callback| {
|
|
|
|
|
if (comptime bun.Environment.isDebug) {
|
|
|
|
|
bun.assert(callback.isCallable());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
event_loop.runCallback(callback, globalObject, .js_undefined, args);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Return whether the subscription context is ready to be deleted by the JS garbage collector.
|
|
|
|
|
pub fn isDeletable(this: *Self, global_object: *jsc.JSGlobalObject) bun.JSError!bool {
|
|
|
|
|
pub fn isDeletable(this: *SubscriptionCtx, global_object: *jsc.JSGlobalObject) bun.JSError!bool {
|
|
|
|
|
// The user may request .close(), in which case we can dispose of the subscription object. If that is the case,
|
|
|
|
|
// finalized will be true. Otherwise, we should treat the object as disposable if there are no active
|
|
|
|
|
// subscriptions.
|
|
|
|
|
return this.parent().client.flags.finalized or !(try this.hasSubscriptions(global_object));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn deinit(this: *Self, global_object: *jsc.JSGlobalObject) void {
|
|
|
|
|
if (comptime bun.Environment.isDebug) {
|
|
|
|
|
bun.debugAssert(this.isDeletable(this.parent().globalObject) catch unreachable);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (this.parent().this_value.tryGet()) |parent_this| {
|
|
|
|
|
ParentJS.gc.set(.subscriptionCallbackMap, parent_this, global_object, .js_undefined);
|
|
|
|
|
}
|
|
|
|
|
return this.parent().client.flags.finalized or !this.hasSubscriptions(global_object);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -245,7 +237,6 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
ref_count: RefCount,
|
|
|
|
|
|
|
|
|
|
pub const js = jsc.Codegen.JSRedisClient;
|
|
|
|
|
pub const toJS = js.toJS;
|
|
|
|
|
pub const fromJS = js.fromJS;
|
|
|
|
|
pub const fromJSDirect = js.fromJSDirect;
|
|
|
|
|
|
|
|
|
|
@@ -254,15 +245,21 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
pub const deref = RefCount.deref;
|
|
|
|
|
pub const new = bun.TrivialNew(@This());
|
|
|
|
|
|
|
|
|
|
pub fn toJS(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject) JSValue {
|
|
|
|
|
if (comptime bun.Environment.isDebug or bun.asan.enabled) {
|
|
|
|
|
bun.assert(this.this_value.isEmpty());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return js.toJS(this, globalObject);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Factory function to create a new Valkey client from JS
|
|
|
|
|
pub fn constructor(globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, js_this: JSValue) bun.JSError!*JSValkeyClient {
|
|
|
|
|
return try create(globalObject, callframe.arguments(), js_this);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Create a Valkey client that does not have an associated JS object nor a SubscriptionCtx.
|
|
|
|
|
///
|
|
|
|
|
/// This whole client needs a refactor.
|
|
|
|
|
pub fn createNoJsNoPubsub(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue) bun.JSError!*JSValkeyClient {
|
|
|
|
|
pub fn createFromJS(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue) bun.JSError!*JSValkeyClient {
|
|
|
|
|
const this_allocator = bun.default_allocator;
|
|
|
|
|
|
|
|
|
|
const vm = globalObject.bunVM();
|
|
|
|
|
@@ -382,13 +379,13 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn create(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue, js_this: JSValue) bun.JSError!*JSValkeyClient {
|
|
|
|
|
var new_client = try JSValkeyClient.createNoJsNoPubsub(globalObject, arguments);
|
|
|
|
|
var new_client = try JSValkeyClient.createFromJS(globalObject, arguments);
|
|
|
|
|
|
|
|
|
|
// Initially, we only need to hold a weak reference to the JS object.
|
|
|
|
|
new_client.this_value = jsc.JSRef.initWeak(js_this);
|
|
|
|
|
|
|
|
|
|
// Need to associate the subscription context, after the JS ref has been populated.
|
|
|
|
|
new_client._subscription_ctx = try SubscriptionCtx.init(new_client);
|
|
|
|
|
new_client._subscription_ctx = try SubscriptionCtx.init(js_this, globalObject, new_client);
|
|
|
|
|
|
|
|
|
|
return new_client;
|
|
|
|
|
}
|
|
|
|
|
@@ -420,7 +417,11 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
|
|
|
|
|
return JSValkeyClient.new(.{
|
|
|
|
|
.ref_count = .init(),
|
|
|
|
|
._subscription_ctx = undefined,
|
|
|
|
|
._subscription_ctx = .{
|
|
|
|
|
.is_subscriber = false,
|
|
|
|
|
.original_enable_offline_queue = false,
|
|
|
|
|
.original_enable_auto_pipelining = false,
|
|
|
|
|
},
|
|
|
|
|
.client = .{
|
|
|
|
|
.vm = vm,
|
|
|
|
|
.address = switch (this.client.protocol) {
|
|
|
|
|
@@ -495,12 +496,13 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn removeSubscription(this: *JSValkeyClient) void {
|
|
|
|
|
debug("removeSubscription: entering, has subscriptions: {}", .{this._subscription_ctx.hasSubscriptions(this.globalObject) catch false});
|
|
|
|
|
if (comptime bun.Environment.enable_logs)
|
|
|
|
|
debug("removeSubscription: entering, has subscriptions: {}", .{this._subscription_ctx.hasSubscriptions(this.globalObject)});
|
|
|
|
|
this.ref();
|
|
|
|
|
defer this.deref();
|
|
|
|
|
|
|
|
|
|
// This is the last subscription, restore original flags
|
|
|
|
|
if (!(this._subscription_ctx.hasSubscriptions(this.globalObject) catch false)) {
|
|
|
|
|
if (!this._subscription_ctx.hasSubscriptions(this.globalObject)) {
|
|
|
|
|
this.client.flags.enable_offline_queue = this._subscription_ctx.original_enable_offline_queue;
|
|
|
|
|
this.client.flags.enable_auto_pipelining = this._subscription_ctx.original_enable_auto_pipelining;
|
|
|
|
|
this._subscription_ctx.is_subscriber = false;
|
|
|
|
|
@@ -510,32 +512,6 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
debug("removeSubscription: exiting", .{});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn getOrCreateSubscriptionCtx(
|
|
|
|
|
this: *JSValkeyClient,
|
|
|
|
|
) bun.JSError!*SubscriptionCtx {
|
|
|
|
|
if (this._subscription_ctx) |*ctx| {
|
|
|
|
|
// If we already have a subscription context, return it
|
|
|
|
|
return ctx;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Save the original flag values and create a new subscription context
|
|
|
|
|
this._subscription_ctx = try SubscriptionCtx.init(
|
|
|
|
|
this,
|
|
|
|
|
this.client.flags.enable_offline_queue,
|
|
|
|
|
this.client.flags.enable_auto_pipelining,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// We need to make sure we disable the offline queue, but we actually want to make sure that our HELLO message
|
|
|
|
|
// goes through first. Consequently, we only disable the offline queue if we're already connected.
|
|
|
|
|
if (this.client.status == .connected) {
|
|
|
|
|
this.client.flags.enable_offline_queue = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.client.flags.enable_auto_pipelining = false;
|
|
|
|
|
|
|
|
|
|
return &(this._subscription_ctx.?);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn isSubscriber(this: *const JSValkeyClient) bool {
|
|
|
|
|
return this._subscription_ctx.is_subscriber;
|
|
|
|
|
}
|
|
|
|
|
@@ -561,16 +537,16 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
|
|
|
|
|
// If already connected, resolve immediately
|
|
|
|
|
if (this.client.status == .connected) {
|
|
|
|
|
return jsc.JSPromise.resolvedPromiseValue(globalObject, js.helloGetCached(this_value) orelse .js_undefined);
|
|
|
|
|
return jsc.JSPromise.resolvedPromiseValue(globalObject, js.gc.hello.get(this_value) orelse .js_undefined);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (js.connectionPromiseGetCached(this_value)) |promise| {
|
|
|
|
|
if (js.gc.connectionPromise.get(this_value)) |promise| {
|
|
|
|
|
return promise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const promise_ptr = jsc.JSPromise.create(globalObject);
|
|
|
|
|
const promise = promise_ptr.toJS();
|
|
|
|
|
js.connectionPromiseSetCached(this_value, globalObject, promise);
|
|
|
|
|
js.gc.connectionPromise.set(this_value, globalObject, promise);
|
|
|
|
|
|
|
|
|
|
// If was manually closed, reset that flag
|
|
|
|
|
this.client.flags.is_manually_closed = false;
|
|
|
|
|
@@ -587,6 +563,7 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
event_loop.enter();
|
|
|
|
|
defer event_loop.exit();
|
|
|
|
|
promise_ptr.reject(globalObject, err_value);
|
|
|
|
|
js.gc.connectionPromise.set(this_value, globalObject, .zero);
|
|
|
|
|
return promise;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -624,25 +601,25 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn getOnConnect(_: *JSValkeyClient, thisValue: JSValue, _: *jsc.JSGlobalObject) JSValue {
|
|
|
|
|
if (js.onconnectGetCached(thisValue)) |value| {
|
|
|
|
|
if (js.gc.onconnect.get(thisValue)) |value| {
|
|
|
|
|
return value;
|
|
|
|
|
}
|
|
|
|
|
return .js_undefined;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn setOnConnect(_: *JSValkeyClient, thisValue: JSValue, globalObject: *jsc.JSGlobalObject, value: JSValue) void {
|
|
|
|
|
js.onconnectSetCached(thisValue, globalObject, value);
|
|
|
|
|
js.gc.onconnect.set(thisValue, globalObject, value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn getOnClose(_: *JSValkeyClient, thisValue: JSValue, _: *jsc.JSGlobalObject) JSValue {
|
|
|
|
|
if (js.oncloseGetCached(thisValue)) |value| {
|
|
|
|
|
if (js.gc.onclose.get(thisValue)) |value| {
|
|
|
|
|
return value;
|
|
|
|
|
}
|
|
|
|
|
return .js_undefined;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn setOnClose(_: *JSValkeyClient, thisValue: JSValue, globalObject: *jsc.JSGlobalObject, value: JSValue) void {
|
|
|
|
|
js.oncloseSetCached(thisValue, globalObject, value);
|
|
|
|
|
js.gc.onclose.set(thisValue, globalObject, value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Safely add a timer with proper reference counting and event loop keepalive
|
|
|
|
|
@@ -810,24 +787,28 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
break :js_hello .js_undefined;
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|
js.helloSetCached(this_value, globalObject, hello_value);
|
|
|
|
|
js.gc.hello.set(this_value, globalObject, hello_value);
|
|
|
|
|
// Call onConnect callback if defined by the user
|
|
|
|
|
if (js.onconnectGetCached(this_value)) |on_connect| {
|
|
|
|
|
if (js.gc.onconnect.get(this_value)) |on_connect| {
|
|
|
|
|
const js_value = this_value;
|
|
|
|
|
js_value.ensureStillAlive();
|
|
|
|
|
globalObject.queueMicrotask(on_connect, &[_]JSValue{ js_value, hello_value });
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (js.connectionPromiseGetCached(this_value)) |promise| {
|
|
|
|
|
js.connectionPromiseSetCached(this_value, globalObject, .zero);
|
|
|
|
|
const js_promise = promise.asPromise().?;
|
|
|
|
|
if (this.client.flags.connection_promise_returns_client) {
|
|
|
|
|
debug("Resolving connection promise with client instance", .{});
|
|
|
|
|
js_promise.resolve(globalObject, this_value);
|
|
|
|
|
} else {
|
|
|
|
|
debug("Resolving connection promise with HELLO response", .{});
|
|
|
|
|
js_promise.resolve(globalObject, hello_value);
|
|
|
|
|
if (js.gc.connectionPromise.get(this_value)) |promise_value| {
|
|
|
|
|
const promise = promise_value.asPromise().?;
|
|
|
|
|
if (promise.status(this.client.vm.jsc_vm) == .pending) {
|
|
|
|
|
if (this.client.flags.connection_promise_returns_client) {
|
|
|
|
|
debug("Resolving connection promise with client instance", .{});
|
|
|
|
|
promise.resolve(globalObject, this_value);
|
|
|
|
|
} else {
|
|
|
|
|
debug("Resolving connection promise with HELLO response", .{});
|
|
|
|
|
promise.resolve(globalObject, hello_value);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
js.gc.connectionPromise.set(this_value, globalObject, .zero);
|
|
|
|
|
|
|
|
|
|
this.client.flags.connection_promise_returns_client = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -930,28 +911,31 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
|
|
|
|
|
const this_jsvalue = this.this_value.tryGet() orelse return;
|
|
|
|
|
this_jsvalue.ensureStillAlive();
|
|
|
|
|
defer this_jsvalue.ensureStillAlive();
|
|
|
|
|
|
|
|
|
|
// Create an error value
|
|
|
|
|
const error_value = protocol.valkeyErrorToJS(globalObject, "Connection closed", protocol.RedisError.ConnectionClosed);
|
|
|
|
|
|
|
|
|
|
const loop = this.client.vm.eventLoop();
|
|
|
|
|
loop.enter();
|
|
|
|
|
defer loop.exit();
|
|
|
|
|
{
|
|
|
|
|
loop.enter();
|
|
|
|
|
defer loop.exit();
|
|
|
|
|
|
|
|
|
|
if (!this_jsvalue.isUndefined()) {
|
|
|
|
|
if (js.connectionPromiseGetCached(this_jsvalue)) |promise| {
|
|
|
|
|
js.connectionPromiseSetCached(this_jsvalue, globalObject, .zero);
|
|
|
|
|
promise.asPromise().?.reject(globalObject, error_value);
|
|
|
|
|
if (js.gc.connectionPromise.get(this_jsvalue)) |promise_value| {
|
|
|
|
|
const promise = promise_value.asPromise().?;
|
|
|
|
|
if (promise.status(this.client.vm.jsc_vm) == .pending) {
|
|
|
|
|
promise.reject(globalObject, error_value);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Call onClose callback if it exists
|
|
|
|
|
if (js.oncloseGetCached(this_jsvalue)) |on_close| {
|
|
|
|
|
_ = on_close.call(
|
|
|
|
|
globalObject,
|
|
|
|
|
this_jsvalue,
|
|
|
|
|
&[_]JSValue{error_value},
|
|
|
|
|
) catch |e| globalObject.reportActiveExceptionAsUnhandled(e);
|
|
|
|
|
// Call onClose callback if it exists
|
|
|
|
|
if (js.gc.onclose.get(this_jsvalue)) |on_close| {
|
|
|
|
|
_ = on_close.call(
|
|
|
|
|
globalObject,
|
|
|
|
|
this_jsvalue,
|
|
|
|
|
&[_]JSValue{error_value},
|
|
|
|
|
) catch |e| globalObject.reportActiveExceptionAsUnhandled(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -967,7 +951,7 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
pub fn failWithJSValue(this: *JSValkeyClient, value: JSValue) void {
|
|
|
|
|
const this_value = this.this_value.tryGet() orelse return;
|
|
|
|
|
const globalObject = this.globalObject;
|
|
|
|
|
if (js.oncloseGetCached(this_value)) |on_close| {
|
|
|
|
|
if (js.gc.onclose.get(this_value)) |on_close| {
|
|
|
|
|
const loop = this.client.vm.eventLoop();
|
|
|
|
|
loop.enter();
|
|
|
|
|
defer loop.exit();
|
|
|
|
|
@@ -1181,12 +1165,10 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
// This is a mess beyond belief and it is incredibly fragile.
|
|
|
|
|
const has_pending_commands = this.client.hasAnyPendingCommands();
|
|
|
|
|
|
|
|
|
|
// isDeletable may throw an exception, and if it does, we have to assume
|
|
|
|
|
// that the object still has references. Best we can do is hope nothing
|
|
|
|
|
// catastrophic happens.
|
|
|
|
|
const subs_deletable: bool = !(this._subscription_ctx.hasSubscriptions(this.globalObject) catch false);
|
|
|
|
|
const has_active_subscriptions: bool = !this.client.flags.finalized and
|
|
|
|
|
(this._subscription_ctx.is_subscriber and this._subscription_ctx.hasSubscriptions(this.globalObject));
|
|
|
|
|
|
|
|
|
|
const has_activity = has_pending_commands or !subs_deletable or this.client.flags.is_reconnecting;
|
|
|
|
|
const has_activity = has_pending_commands or has_active_subscriptions or this.client.flags.is_reconnecting;
|
|
|
|
|
|
|
|
|
|
// There's a couple cases to handle here:
|
|
|
|
|
if (has_activity) {
|
|
|
|
|
@@ -1207,18 +1189,17 @@ pub const JSValkeyClient = struct {
|
|
|
|
|
// object.
|
|
|
|
|
switch (this.client.status) {
|
|
|
|
|
.connecting, .connected => {
|
|
|
|
|
// Whenever we're connected, we need to keep the object alive.
|
|
|
|
|
//
|
|
|
|
|
// TODO(markovejnovic): This is a leak.
|
|
|
|
|
// Note this is an intentional leak. Unless the user manually
|
|
|
|
|
// closes the connection, the object will stay alive forever,
|
|
|
|
|
// even if it falls out of scope. This is kind of stupid, since
|
|
|
|
|
// if the object is out of scope, and isn't subscribed upon,
|
|
|
|
|
// how exactly is the user going to call anything on the object?
|
|
|
|
|
//
|
|
|
|
|
// It is 100% safe to drop the strong reference there and let
|
|
|
|
|
// the object be GC'd, but we're not doing that now.
|
|
|
|
|
debug("upgrading this_value since we are connected/connecting", .{});
|
|
|
|
|
|
|
|
|
|
// Since we provide the RedisClient in the `this` value of
|
|
|
|
|
// callbacks, we must keep the this value alive.
|
|
|
|
|
//
|
|
|
|
|
// And for subscriptions, you want the connection to stay alive.
|
|
|
|
|
// so that you can continue to receive messages. That is the
|
|
|
|
|
// purpose of a subscription.
|
|
|
|
|
//
|
|
|
|
|
// And for other cases, where there are in-flight promises
|
|
|
|
|
// we want to make sure those stay alive while they're needed.
|
|
|
|
|
this.this_value.upgrade(this.globalObject);
|
|
|
|
|
},
|
|
|
|
|
.disconnected, .failed => {
|
|
|
|
|
@@ -1330,12 +1311,13 @@ fn SocketHandler(comptime ssl: bool) type {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn onHandshake_(this: *JSValkeyClient, _: anytype, success: i32, ssl_error: uws.us_bun_verify_error_t) void {
|
|
|
|
|
debug("onHandshake: {d} error={d} reason={s} code={s}", .{
|
|
|
|
|
success,
|
|
|
|
|
ssl_error.error_no,
|
|
|
|
|
if (ssl_error.reason != null) bun.span(ssl_error.reason[0..bun.len(ssl_error.reason) :0]) else "no reason",
|
|
|
|
|
if (ssl_error.code != null) bun.span(ssl_error.code[0..bun.len(ssl_error.code) :0]) else "no code",
|
|
|
|
|
});
|
|
|
|
|
if (comptime bun.Environment.enable_logs)
|
|
|
|
|
debug("onHandshake: {d} error={d} reason={s} code={s}", .{
|
|
|
|
|
success,
|
|
|
|
|
ssl_error.error_no,
|
|
|
|
|
if (ssl_error.reason != null) bun.span(ssl_error.reason[0..bun.len(ssl_error.reason) :0]) else "no reason",
|
|
|
|
|
if (ssl_error.code != null) bun.span(ssl_error.code[0..bun.len(ssl_error.code) :0]) else "no code",
|
|
|
|
|
});
|
|
|
|
|
const handshake_success = if (success == 1) true else false;
|
|
|
|
|
this.ref();
|
|
|
|
|
defer this.deref();
|
|
|
|
|
@@ -1369,7 +1351,9 @@ fn SocketHandler(comptime ssl: bool) type {
|
|
|
|
|
pub const onHandshake = if (ssl) onHandshake_ else null;
|
|
|
|
|
|
|
|
|
|
pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void {
|
|
|
|
|
// No need to deref since this.client.onClose() invokes onValkeyClose which does the deref.
|
|
|
|
|
// We must ref/deref when it's a TLS connection failure.
|
|
|
|
|
this.ref();
|
|
|
|
|
defer this.deref();
|
|
|
|
|
|
|
|
|
|
debug("Socket closed.", .{});
|
|
|
|
|
|
|
|
|
|
@@ -1383,7 +1367,7 @@ fn SocketHandler(comptime ssl: bool) type {
|
|
|
|
|
pub fn onEnd(this: *JSValkeyClient, socket: SocketType) void {
|
|
|
|
|
_ = this;
|
|
|
|
|
_ = socket;
|
|
|
|
|
// Half-opened sockets are not allowed.
|
|
|
|
|
@panic("Assertion failure: onEnd should not be called for Valkey client");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn onConnectError(this: *JSValkeyClient, _: SocketType, _: i32) void {
|
|
|
|
|
|