Implement Redis PubSub functionality for Valkey client

Add pub/sub support with .on()/.off() methods, RESP3 push message handling, and subscription management. Includes comprehensive test suite covering regular subscriptions, pattern subscriptions, and callback management.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude Bot
2025-07-13 04:15:16 +00:00
parent f24e8cb98a
commit 0662c2f52d
4 changed files with 636 additions and 1 deletions

View File

@@ -226,7 +226,15 @@ export default [
unsubscribe: { fn: "unsubscribe" },
punsubscribe: { fn: "punsubscribe" },
pubsub: { fn: "pubsub" },
on: {
fn: "on",
length: 2,
},
off: {
fn: "off",
length: 2,
},
},
values: ["onconnect", "onclose", "connectionPromise", "hello"],
values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptions"],
}),
];

View File

@@ -477,6 +477,63 @@ pub const JSValkeyClient = struct {
pub fn onValkeyTimeout(this: *JSValkeyClient) void {
this.clientFail("Connection timeout", protocol.RedisError.ConnectionClosed);
}
/// Handle PubSub messages from Valkey
pub fn onPubSubMessage(this: *JSValkeyClient, kind: []const u8, pattern: []const u8, channel: []const u8, message: []const u8) void {
const this_value = this.this_value.tryGet() orelse return;
const globalObject = this.globalObject;
const loop = this.client.vm.eventLoop();
loop.enter();
defer loop.exit();
// Get the subscriptions object
if (js.subscriptionsGetCached(this_value)) |subscriptions| {
// For pattern messages, use the pattern as the key, otherwise use the channel
const callback_key = if (pattern.len > 0) pattern else channel;
// Get callbacks for this channel/pattern
if (subscriptions.get(globalObject, callback_key) catch .js_undefined) |callbacks_value| {
if (callbacks_value.isArray()) {
const callbacks_array = callbacks_value;
const length = callbacks_array.getLength(globalObject) catch return;
// Create event data
var event_obj = JSC.JSValue.createEmptyObjectWithNullPrototype(globalObject);
// Add event type
const kind_str = bun.String.createUTF8ForJS(globalObject, kind);
event_obj.put(globalObject, "type", kind_str);
// Add channel
const channel_str = bun.String.createUTF8ForJS(globalObject, channel);
event_obj.put(globalObject, "channel", channel_str);
// Add message
const message_str = bun.String.createUTF8ForJS(globalObject, message);
event_obj.put(globalObject, "message", message_str);
// Add pattern if it's a pattern message
if (pattern.len > 0) {
const pattern_str = bun.String.createUTF8ForJS(globalObject, pattern);
event_obj.put(globalObject, "pattern", pattern_str);
}
// Call all callbacks for this channel
for (0..@intCast(length)) |i| {
const callback = callbacks_array.getIndex(globalObject, @intCast(i)) catch continue;
if (!callback.isEmptyOrUndefinedOrNull()) {
if (callback.isCallable()) {
_ = callback.call(globalObject, this_value, &[_]JSValue{event_obj}) catch |e| {
globalObject.reportActiveExceptionAsUnhandled(e);
};
}
}
}
}
}
}
}
pub fn clientFail(this: *JSValkeyClient, message: []const u8, err: protocol.RedisError) void {
this.client.fail(message, err);
@@ -706,6 +763,113 @@ pub const JSValkeyClient = struct {
pub const zrevrank = fns.zrevrank;
pub const zscore = fns.zscore;
/// Add event listener for PubSub channels
pub fn on(this: *JSValkeyClient, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue {
const arguments = callframe.arguments();
if (arguments.len < 2) {
return globalObject.throwInvalidArguments("Expected channel and callback arguments", .{});
}
const channel_arg = arguments[0];
const callback_arg = arguments[1];
if (!callback_arg.isCallable()) {
return globalObject.throwInvalidArguments("Expected callback to be a function", .{});
}
const channel_str = try channel_arg.toBunString(globalObject);
defer channel_str.deref();
const this_value = callframe.this();
// Get or create subscriptions object
var subscriptions = if (js.subscriptionsGetCached(this_value)) |existing|
existing
else blk: {
const new_subscriptions = JSC.JSValue.createEmptyObjectWithNullPrototype(globalObject);
js.subscriptionsSetCached(this_value, globalObject, new_subscriptions);
break :blk new_subscriptions;
};
// Get or create callbacks array for this channel
const channel_slice = channel_str.toUTF8WithoutRef(bun.default_allocator);
defer channel_slice.deinit();
var callbacks_array = if (subscriptions.get(globalObject, channel_slice.slice()) catch .js_undefined) |existing| existing else blk: {
const new_array = try JSC.JSValue.createEmptyArray(globalObject, 0);
subscriptions.put(globalObject, channel_slice.slice(), new_array);
break :blk new_array;
};
// Add callback to array if it's not already there
if (callbacks_array.isArray()) {
const length = try callbacks_array.getLength(globalObject);
callbacks_array.putIndex(globalObject, @intCast(length), callback_arg) catch {
return globalObject.throwOutOfMemory();
};
}
// Disable pipelining when pubsub is first used
this.client.flags.auto_pipelining = false;
return .js_undefined;
}
/// Remove event listener for PubSub channels
pub fn off(_: *JSValkeyClient, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue {
const arguments = callframe.arguments();
if (arguments.len < 1) {
return globalObject.throwInvalidArguments("Expected channel argument", .{});
}
const channel_arg = arguments[0];
const channel_str = try channel_arg.toBunString(globalObject);
defer channel_str.deref();
const this_value = callframe.this();
// Get subscriptions object
if (js.subscriptionsGetCached(this_value)) |subscriptions| {
const channel_slice = channel_str.toUTF8WithoutRef(bun.default_allocator);
defer channel_slice.deinit();
if (arguments.len >= 2 and arguments[1].isCallable()) {
// Remove specific callback
const callback_to_remove = arguments[1];
if (subscriptions.get(globalObject, channel_slice.slice()) catch .js_undefined) |callbacks_value| {
if (callbacks_value.isArray()) {
const callbacks_array = callbacks_value;
const length = try callbacks_array.getLength(globalObject);
// Find and remove the specific callback
var new_callbacks = std.ArrayList(JSValue).init(bun.default_allocator);
defer new_callbacks.deinit();
for (0..@intCast(length)) |i| {
const callback = callbacks_array.getIndex(globalObject, @intCast(i)) catch continue;
if (!callback.isEmptyOrUndefinedOrNull()) {
if (!(callback.isSameValue(callback_to_remove, globalObject) catch false)) {
new_callbacks.append(callback) catch return globalObject.throwOutOfMemory();
}
}
}
// Create new array with remaining callbacks
const new_array = try JSC.JSValue.createEmptyArray(globalObject, new_callbacks.items.len);
for (new_callbacks.items, 0..) |callback, i| {
new_array.putIndex(globalObject, @intCast(i), callback) catch return globalObject.throwOutOfMemory();
}
subscriptions.put(globalObject, channel_slice.slice(), new_array);
}
}
} else {
// Remove all callbacks for the channel
subscriptions.put(globalObject, channel_slice.slice(), .js_undefined);
}
}
return .js_undefined;
}
const fns = @import("./js_valkey_functions.zig");
};

View File

@@ -170,6 +170,9 @@ pub const ValkeyClient = struct {
auto_flusher: AutoFlusher = .{},
vm: *JSC.VirtualMachine,
// PubSub callback for handling subscription messages
pubsub_callback: ?*const fn (*ValkeyClient, channel: []const u8, message: []const u8) void = null,
/// Clean up resources used by the Valkey client
pub fn deinit(this: *@This(), globalObjectOrFinalizing: ?*JSC.JSGlobalObject) void {
@@ -662,9 +665,122 @@ pub const ValkeyClient = struct {
}
}
/// Handle Push messages for PubSub
fn handlePushMessage(this: *ValkeyClient, push: *protocol.Push) !void {
debug("handlePushMessage: kind={s}, data_len={d}", .{ push.kind, push.data.len });
// Handle different push message types
if (std.mem.eql(u8, push.kind, "message") or
std.mem.eql(u8, push.kind, "pmessage") or
std.mem.eql(u8, push.kind, "smessage")) {
// Pubsub message format: [kind, channel, message] or [kind, pattern, channel, message]
if (push.data.len >= 2) {
var channel: []const u8 = "";
var message: []const u8 = "";
var pattern: []const u8 = "";
if (std.mem.eql(u8, push.kind, "pmessage") and push.data.len >= 3) {
// Pattern message: [pattern, channel, message]
switch (push.data[0]) {
.SimpleString => |str| {
pattern = str;
},
.BulkString => |maybe_str| {
if (maybe_str) |str| {
pattern = str;
}
},
else => return,
}
switch (push.data[1]) {
.SimpleString => |str| {
channel = str;
},
.BulkString => |maybe_str| {
if (maybe_str) |str| {
channel = str;
}
},
else => return,
}
switch (push.data[2]) {
.SimpleString => |str| {
message = str;
},
.BulkString => |maybe_str| {
if (maybe_str) |str| {
message = str;
}
},
else => return,
}
} else {
// Regular message: [channel, message]
switch (push.data[0]) {
.SimpleString => |str| {
channel = str;
},
.BulkString => |maybe_str| {
if (maybe_str) |str| {
channel = str;
}
},
else => return,
}
switch (push.data[1]) {
.SimpleString => |str| {
message = str;
},
.BulkString => |maybe_str| {
if (maybe_str) |str| {
message = str;
}
},
else => return,
}
}
// Call the JavaScript callback
this.parent().onPubSubMessage(push.kind, pattern, channel, message);
}
} else if (std.mem.eql(u8, push.kind, "subscribe") or
std.mem.eql(u8, push.kind, "psubscribe") or
std.mem.eql(u8, push.kind, "ssubscribe") or
std.mem.eql(u8, push.kind, "unsubscribe") or
std.mem.eql(u8, push.kind, "punsubscribe") or
std.mem.eql(u8, push.kind, "sunsubscribe")) {
// Subscription acknowledgment: [channel, subscription_count]
debug("Subscription acknowledgment: {s}", .{push.kind});
// Resolve the pending subscription command
var pair = this.in_flight.readItem() orelse {
debug("Received subscription acknowledgment but no promise in queue", .{});
return;
};
const globalThis = this.globalObject();
const loop = this.vm.eventLoop();
loop.enter();
defer loop.exit();
// Create a RESP value from the push data for the promise resolution
var resp_value = protocol.RESPValue{ .Array = push.data };
pair.promise.resolve(globalThis, &resp_value);
}
}
/// Handle Valkey protocol response
fn handleResponse(this: *ValkeyClient, value: *protocol.RESPValue) !void {
debug("onData() {any}", .{value.*});
// Handle Push messages for PubSub
if (value.* == .Push) {
try this.handlePushMessage(&value.Push);
return;
}
// Special handling for the initial HELLO response
if (!this.flags.is_authenticated) {
this.handleHelloResponse(value);

View File

@@ -0,0 +1,347 @@
import { test, expect, describe, beforeAll, afterAll } from "bun:test";
import { RedisClient } from "bun";
import { bunEnv } from "harness";
// Skip tests if no Redis/Valkey server is available
const skipIfNoRedis = test; // Always run tests locally
describe("Valkey PubSub functionality", () => {
let publisher: RedisClient;
let subscriber: RedisClient;
beforeAll(async () => {
// Create two separate clients - one for publishing, one for subscribing
publisher = new RedisClient(process.env.REDIS_URL || "redis://localhost:6379");
subscriber = new RedisClient(process.env.REDIS_URL || "redis://localhost:6379");
await publisher.connect();
await subscriber.connect();
});
afterAll(async () => {
if (publisher?.connected) {
await publisher.close();
}
if (subscriber?.connected) {
await subscriber.close();
}
});
skipIfNoRedis("should register and receive messages on channels", async () => {
const messages: any[] = [];
// Register event listener
subscriber.on("test-channel", (event: any) => {
messages.push(event);
});
// Subscribe to the channel
await subscriber.subscribe("test-channel");
// Give a moment for subscription to be processed
await Bun.sleep(10);
// Publish a message
await publisher.publish("test-channel", "Hello, World!");
// Wait for message to be received
await Bun.sleep(50);
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
type: "message",
channel: "test-channel",
message: "Hello, World!"
});
// Clean up
await subscriber.unsubscribe("test-channel");
});
skipIfNoRedis("should handle multiple subscribers on same channel", async () => {
const messages1: any[] = [];
const messages2: any[] = [];
// Register multiple event listeners
subscriber.on("multi-channel", (event: any) => {
messages1.push(event);
});
subscriber.on("multi-channel", (event: any) => {
messages2.push(event);
});
// Subscribe to the channel
await subscriber.subscribe("multi-channel");
// Give a moment for subscription to be processed
await Bun.sleep(10);
// Publish a message
await publisher.publish("multi-channel", "Multi-subscriber test");
// Wait for message to be received
await Bun.sleep(50);
// Both listeners should receive the message
expect(messages1).toHaveLength(1);
expect(messages2).toHaveLength(1);
expect(messages1[0]).toMatchObject({
type: "message",
channel: "multi-channel",
message: "Multi-subscriber test"
});
expect(messages2[0]).toMatchObject({
type: "message",
channel: "multi-channel",
message: "Multi-subscriber test"
});
// Clean up
await subscriber.unsubscribe("multi-channel");
});
skipIfNoRedis("should handle pattern subscriptions with psubscribe", async () => {
const messages: any[] = [];
// Register event listener for pattern
subscriber.on("news.*", (event: any) => {
messages.push(event);
});
// Subscribe to pattern
await subscriber.psubscribe("news.*");
// Give a moment for subscription to be processed
await Bun.sleep(10);
// Publish messages to matching channels
await publisher.publish("news.sports", "Sports update");
await publisher.publish("news.weather", "Weather update");
await publisher.publish("other.topic", "Should not match");
// Wait for messages to be received
await Bun.sleep(100);
expect(messages).toHaveLength(2);
expect(messages[0]).toMatchObject({
type: "pmessage",
pattern: "news.*",
message: "Sports update"
});
expect(messages[1]).toMatchObject({
type: "pmessage",
pattern: "news.*",
message: "Weather update"
});
// Clean up
await subscriber.punsubscribe("news.*");
});
skipIfNoRedis("should remove specific callbacks with off()", async () => {
const messages1: any[] = [];
const messages2: any[] = [];
const callback1 = (event: any) => {
messages1.push(event);
};
const callback2 = (event: any) => {
messages2.push(event);
};
// Register both callbacks
subscriber.on("removal-test", callback1);
subscriber.on("removal-test", callback2);
// Subscribe to the channel
await subscriber.subscribe("removal-test");
// Give a moment for subscription to be processed
await Bun.sleep(10);
// Publish first message - both should receive
await publisher.publish("removal-test", "Message 1");
await Bun.sleep(50);
expect(messages1).toHaveLength(1);
expect(messages2).toHaveLength(1);
// Remove first callback
subscriber.off("removal-test", callback1);
// Publish second message - only callback2 should receive
await publisher.publish("removal-test", "Message 2");
await Bun.sleep(50);
expect(messages1).toHaveLength(1); // Still 1
expect(messages2).toHaveLength(2); // Now 2
// Clean up
await subscriber.unsubscribe("removal-test");
});
skipIfNoRedis("should remove all callbacks for channel with off(channel)", async () => {
const messages: any[] = [];
// Register multiple callbacks
subscriber.on("remove-all-test", (event: any) => {
messages.push({ id: 1, ...event });
});
subscriber.on("remove-all-test", (event: any) => {
messages.push({ id: 2, ...event });
});
// Subscribe to the channel
await subscriber.subscribe("remove-all-test");
// Give a moment for subscription to be processed
await Bun.sleep(10);
// Publish first message - both should receive
await publisher.publish("remove-all-test", "Before removal");
await Bun.sleep(50);
expect(messages).toHaveLength(2);
// Remove all callbacks for the channel
subscriber.off("remove-all-test");
// Publish second message - none should receive
await publisher.publish("remove-all-test", "After removal");
await Bun.sleep(50);
expect(messages).toHaveLength(2); // Still 2, no new messages
// Clean up
await subscriber.unsubscribe("remove-all-test");
});
skipIfNoRedis("should handle binary data in pubsub messages", async () => {
const messages: any[] = [];
// Register event listener
subscriber.on("binary-channel", (event: any) => {
messages.push(event);
});
// Subscribe to the channel
await subscriber.subscribe("binary-channel");
// Give a moment for subscription to be processed
await Bun.sleep(10);
// Publish binary data
const binaryData = new Uint8Array([0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x57, 0x6f, 0x72, 0x6c, 0x64]);
await publisher.publish("binary-channel", binaryData);
// Wait for message to be received
await Bun.sleep(50);
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
type: "message",
channel: "binary-channel"
});
expect(typeof messages[0].message).toBe("string");
// Clean up
await subscriber.unsubscribe("binary-channel");
});
skipIfNoRedis("should handle subscription acknowledgments", async () => {
// Subscribe and check that the promise resolves
const subscribeResult = await subscriber.subscribe("ack-test");
// Should return subscription count or similar acknowledgment
expect(subscribeResult).toBeDefined();
// Clean up
await subscriber.unsubscribe("ack-test");
});
skipIfNoRedis("should disable pipelining when using pubsub", async () => {
const testClient = new RedisClient(process.env.REDIS_URL || "redis://localhost:6379");
await testClient.connect();
// Register a pubsub listener (this should disable pipelining)
testClient.on("pipeline-test", () => {});
// The client should now have pipelining disabled
// This is more of an implementation detail that would be verified
// through internal client state, but we can at least verify
// that pubsub works correctly even with this change
await testClient.subscribe("pipeline-test");
await testClient.unsubscribe("pipeline-test");
await testClient.close();
});
skipIfNoRedis("should handle multiple channels in single subscription", async () => {
const messages: Record<string, any[]> = {
"channel1": [],
"channel2": [],
"channel3": []
};
// Register listeners for multiple channels
subscriber.on("channel1", (event: any) => {
messages.channel1.push(event);
});
subscriber.on("channel2", (event: any) => {
messages.channel2.push(event);
});
subscriber.on("channel3", (event: any) => {
messages.channel3.push(event);
});
// Subscribe to multiple channels
await subscriber.subscribe("channel1", "channel2", "channel3");
// Give a moment for subscriptions to be processed
await Bun.sleep(10);
// Publish to each channel
await publisher.publish("channel1", "Message for channel 1");
await publisher.publish("channel2", "Message for channel 2");
await publisher.publish("channel3", "Message for channel 3");
// Wait for messages to be received
await Bun.sleep(100);
expect(messages.channel1).toHaveLength(1);
expect(messages.channel2).toHaveLength(1);
expect(messages.channel3).toHaveLength(1);
expect(messages.channel1[0].message).toBe("Message for channel 1");
expect(messages.channel2[0].message).toBe("Message for channel 2");
expect(messages.channel3[0].message).toBe("Message for channel 3");
// Clean up
await subscriber.unsubscribe("channel1", "channel2", "channel3");
});
skipIfNoRedis("should handle error cases gracefully", async () => {
// Test with invalid arguments
expect(() => {
subscriber.on(); // No arguments
}).toThrow();
expect(() => {
subscriber.on("channel"); // Missing callback
}).toThrow();
expect(() => {
subscriber.on("channel", "not-a-function"); // Invalid callback
}).toThrow();
expect(() => {
subscriber.off(); // No arguments
}).toThrow();
});
});