Compare commits

...

2 Commits

Author SHA1 Message Date
Marko Vejnovic
a46601a564 Improve valkey.zig lifetimes 2025-09-10 22:20:24 -07:00
Marko Vejnovic
1c893931de Hotfix socket pointer reset 2025-09-10 00:02:46 -07:00
4 changed files with 20 additions and 65 deletions

View File

@@ -9,7 +9,6 @@ export default [
configurable: false,
JSType: "0b11101110",
memoryCost: true,
hasPendingActivity: true,
proto: {
connected: {
getter: "getConnected",
@@ -229,6 +228,6 @@ export default [
punsubscribe: { fn: "punsubscribe" },
pubsub: { fn: "pubsub" },
},
values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptionCallbackMap"],
values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptionCallbackMap" ],
}),
];

View File

@@ -25,14 +25,14 @@ pub const SubscriptionCtx = struct {
}
/// Get the total number of channels that this subscription context is subscribed to.
pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize {
pub fn channelsSubscribedToCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize {
return this.subscriptionCallbackMap().size(globalObject);
}
/// Test whether this context has any subscriptions. It is mandatory to
/// guard deinit with this function.
pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool {
return this.subscriptionCount(globalObject) > 0;
return this.channelsSubscribedToCount(globalObject) > 0;
}
pub fn clearReceiveHandlers(
@@ -43,7 +43,6 @@ pub const SubscriptionCtx = struct {
const map = this.subscriptionCallbackMap();
if (map.remove(globalObject, channelName)) {
this._parent.channel_subscription_count -= 1;
this._parent.updateHasPendingActivity();
}
}
@@ -102,7 +101,6 @@ pub const SubscriptionCtx = struct {
map.set(globalObject, channelName, updated_array);
} else {
this._parent.channel_subscription_count -= 1;
this._parent.updateHasPendingActivity();
}
return new_length;
@@ -144,7 +142,6 @@ pub const SubscriptionCtx = struct {
// Update subscription count if this is a new channel
if (is_new_channel) {
this._parent.channel_subscription_count += 1;
this._parent.updateHasPendingActivity();
}
}
@@ -195,7 +192,6 @@ pub const SubscriptionCtx = struct {
pub fn deinit(this: *Self) void {
this._parent.channel_subscription_count = 0;
this._parent.updateHasPendingActivity();
ParentJS.gc.set(
.subscriptionCallbackMap,
@@ -215,7 +211,6 @@ pub const JSValkeyClient = struct {
_subscription_ctx: ?SubscriptionCtx,
channel_subscription_count: u32 = 0,
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
timer: Timer.EventLoopTimer = .{
.tag = .ValkeyConnectionTimeout,
@@ -544,7 +539,6 @@ pub const JSValkeyClient = struct {
},
.failed => {
this.client.status = .disconnected;
this.updateHasPendingActivity();
this.client.flags.is_reconnecting = true;
this.client.retry_attempts = 0;
this.reconnect();
@@ -715,7 +709,6 @@ pub const JSValkeyClient = struct {
defer this.deref();
this.client.status = .connecting;
this.updateHasPendingActivity();
// Ref the poll to keep event loop alive during connection
this.poll_ref.disable();
@@ -927,33 +920,6 @@ pub const JSValkeyClient = struct {
}
}
pub fn hasPendingActivity(this: *JSValkeyClient) bool {
// TODO(markovejnovic): Could this be .monotonic? My intuition says
// yes, because none of the things that may be freed will actually be
// read. The pointers don't move, so it should be safe, but I've
// decided here to be conservative.
return this.has_pending_activity.load(.acquire);
}
pub fn updateHasPendingActivity(this: *JSValkeyClient) void {
if (this.client.hasAnyPendingCommands()) {
this.has_pending_activity.store(true, .release);
return;
}
if (this.channel_subscription_count > 0) {
this.has_pending_activity.store(true, .release);
return;
}
if (this.client.status != .connected and this.client.status != .disconnected) {
this.has_pending_activity.store(true, .release);
return;
}
this.has_pending_activity.store(false, .release);
}
pub fn finalize(this: *JSValkeyClient) void {
// Since this.stopTimers impacts the reference count potentially, we
// need to ref/unref here as well.
@@ -1083,13 +1049,17 @@ pub const JSValkeyClient = struct {
/// Keep the event loop alive, or don't keep it alive
pub fn updatePollRef(this: *JSValkeyClient) void {
if (!this.client.hasAnyPendingCommands() and this.client.status == .connected) {
const pending_commands = this.client.hasAnyPendingCommands();
const have_subs = if (this._subscription_ctx) |*ctx| ctx.hasSubscriptions(this.globalObject) else false;
const has_activity = pending_commands or have_subs;
if (!has_activity and this.client.status == .connected) {
this.poll_ref.unref(this.client.vm);
// If we don't have any pending commands and we're connected, we don't need to keep the object alive.
if (this.this_value.tryGet()) |value| {
this.this_value.setWeak(value);
}
} else if (this.client.hasAnyPendingCommands()) {
} else if (has_activity) {
this.poll_ref.ref(this.client.vm);
// If we have pending commands, we need to keep the object alive.
if (this.this_value == .weak) {
@@ -1207,7 +1177,6 @@ fn SocketHandler(comptime ssl: bool) type {
loop.enter();
defer loop.exit();
this.client.status = .failed;
this.updateHasPendingActivity();
this.client.flags.is_manually_closed = true;
this.client.failWithJSValue(this.globalObject, ssl_error.toJS(this.globalObject));
this.client.close();
@@ -1222,6 +1191,7 @@ fn SocketHandler(comptime ssl: bool) type {
pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void {
// Ensure the socket pointer is updated.
this.client.socket = .{ .SocketTCP = .detached };
this.client.onClose();
}

View File

@@ -279,7 +279,6 @@ pub const ValkeyClient = struct {
.meta = command.meta,
.promise = command.promise,
}) catch |err| bun.handleOom(err);
this.parent().updateHasPendingActivity();
total += 1;
total_bytelength += command.serialized_data.len;
@@ -290,7 +289,6 @@ pub const ValkeyClient = struct {
bun.handleOom(this.write_buffer.byte_list.ensureUnusedCapacity(this.allocator, total_bytelength));
for (pipelineable_commands) |*command| {
bun.handleOom(this.write_buffer.write(this.allocator, command.serialized_data));
this.parent().updateHasPendingActivity();
// Free the serialized data since we've copied it to the write buffer
this.allocator.free(command.serialized_data);
}
@@ -376,7 +374,6 @@ pub const ValkeyClient = struct {
if (wrote > 0) {
this.write_buffer.consume(@intCast(wrote));
}
this.parent().updateHasPendingActivity();
return this.write_buffer.len() > 0;
}
@@ -438,7 +435,6 @@ pub const ValkeyClient = struct {
pub fn failWithJSValue(this: *ValkeyClient, globalThis: *jsc.JSGlobalObject, jsvalue: jsc.JSValue) void {
this.status = .failed;
rejectAllPendingCommands(&this.in_flight, &this.queue, globalThis, this.allocator, jsvalue);
this.parent().updateHasPendingActivity();
if (!this.connectionReady()) {
this.flags.is_manually_closed = true;
@@ -487,7 +483,6 @@ pub const ValkeyClient = struct {
debug("reconnect in {d}ms (attempt {d}/{d})", .{ delay_ms, this.retry_attempts, this.max_retries });
this.status = .disconnected;
this.parent().updateHasPendingActivity();
this.flags.is_reconnecting = true;
this.flags.is_authenticated = false;
this.flags.is_selecting_db_internal = false;
@@ -524,7 +519,6 @@ pub const ValkeyClient = struct {
_ = this.drain();
}
this.parent().updateHasPendingActivity();
}
_ = this.flushData();
@@ -537,7 +531,6 @@ pub const ValkeyClient = struct {
// Path 1: Buffer already has data, append and process from buffer
if (this.read_buffer.remaining().len > 0) {
this.read_buffer.write(this.allocator, data) catch @panic("failed to write to read buffer");
this.parent().updateHasPendingActivity();
// Process as many complete messages from the buffer as possible
while (true) {
@@ -570,7 +563,6 @@ pub const ValkeyClient = struct {
}
this.read_buffer.consume(@truncate(bytes_consumed));
this.parent().updateHasPendingActivity();
var value_to_handle = value; // Use temp var for defer
this.handleResponse(&value_to_handle) catch |err| {
@@ -662,7 +654,7 @@ pub const ValkeyClient = struct {
.Push => |push| {
const p = this.parent();
const subs_ctx = p.getOrCreateSubscriptionCtxEnteringSubscriptionMode();
const sub_count = subs_ctx.subscriptionCount(globalThis);
const sub_count = subs_ctx.channelsSubscribedToCount(globalThis);
if (std.mem.eql(u8, push.kind, "subscribe")) {
this.onValkeySubscribe(value);
@@ -702,7 +694,6 @@ pub const ValkeyClient = struct {
.SimpleString => |str| {
if (std.mem.eql(u8, str, "OK")) {
this.status = .connected;
this.parent().updateHasPendingActivity();
this.flags.is_authenticated = true;
this.onValkeyConnect(value);
return;
@@ -736,7 +727,6 @@ pub const ValkeyClient = struct {
// Authentication successful via HELLO
this.status = .connected;
this.parent().updateHasPendingActivity();
this.flags.is_authenticated = true;
this.onValkeyConnect(value);
return;
@@ -964,7 +954,6 @@ pub const ValkeyClient = struct {
.meta = offline_cmd.meta,
.promise = offline_cmd.promise,
}) catch |err| bun.handleOom(err);
this.parent().updateHasPendingActivity();
const data = offline_cmd.serialized_data;
if (this.connectionReady() and this.write_buffer.remaining().len == 0) {
@@ -977,7 +966,6 @@ pub const ValkeyClient = struct {
if (unwritten.len > 0) {
// Handle incomplete write.
bun.handleOom(this.write_buffer.write(this.allocator, unwritten));
this.parent().updateHasPendingActivity();
}
return true;
@@ -1042,7 +1030,6 @@ pub const ValkeyClient = struct {
// Add to queue with command type
try this.in_flight.writeItem(cmd_pair);
this.parent().updateHasPendingActivity();
_ = this.flushData();
}
@@ -1087,7 +1074,6 @@ pub const ValkeyClient = struct {
this.unregisterAutoFlusher();
if (this.status == .connected or this.status == .connecting) {
this.status = .disconnected;
this.parent().updateHasPendingActivity();
this.close();
}
}
@@ -1100,7 +1086,6 @@ pub const ValkeyClient = struct {
/// Write data to the socket buffer
fn write(this: *ValkeyClient, data: []const u8) !usize {
try this.write_buffer.write(this.allocator, data);
this.parent().updateHasPendingActivity();
return data.len;
}

View File

@@ -11,17 +11,17 @@ import {
} from "./test-utils";
describe.skipIf(!isEnabled)("Valkey Redis Client", () => {
beforeEach(async () => {
if (ctx.redis?.connected) {
ctx.redis.close?.();
}
ctx.redis = createClient(ConnectionType.TCP);
//beforeEach(async () => {
// if (ctx.redis?.connected) {
// ctx.redis.close?.();
// }
// ctx.redis = createClient(ConnectionType.TCP);
await ctx.redis.send("FLUSHALL", ["SYNC"]);
});
// await ctx.redis.send("FLUSHALL", ["SYNC"]);
//});
const connectedRedis = async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
const redis = new RedisClient("redis://localhost:6379");
await redis.connect();
return redis;
};
@@ -595,6 +595,7 @@ describe.skipIf(!isEnabled)("Valkey Redis Client", () => {
expect(messageCount1).toBe(1);
expect(messageCount2).toBe(1);
console.log("Unsubscribing listener2");
await subscriber.unsubscribe(channel, listener2);
await redis.publish(channel, "message1");