From 0662c2f52db2f8dde4d22ddec76f9c50d60f9fe3 Mon Sep 17 00:00:00 2001 From: Claude Bot Date: Sun, 13 Jul 2025 04:15:16 +0000 Subject: [PATCH] Implement Redis PubSub functionality for Valkey client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add pub/sub support with .on()/.off() methods, RESP3 push message handling, and subscription management. Includes comprehensive test suite covering regular subscriptions, pattern subscriptions, and callback management. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/bun.js/api/valkey.classes.ts | 10 +- src/valkey/js_valkey.zig | 164 ++++++++++++++ src/valkey/valkey.zig | 116 ++++++++++ test/js/bun/valkey/pubsub.test.ts | 347 ++++++++++++++++++++++++++++++ 4 files changed, 636 insertions(+), 1 deletion(-) create mode 100644 test/js/bun/valkey/pubsub.test.ts diff --git a/src/bun.js/api/valkey.classes.ts b/src/bun.js/api/valkey.classes.ts index 9a7af095ff..7685552201 100644 --- a/src/bun.js/api/valkey.classes.ts +++ b/src/bun.js/api/valkey.classes.ts @@ -226,7 +226,15 @@ export default [ unsubscribe: { fn: "unsubscribe" }, punsubscribe: { fn: "punsubscribe" }, pubsub: { fn: "pubsub" }, + on: { + fn: "on", + length: 2, + }, + off: { + fn: "off", + length: 2, + }, }, - values: ["onconnect", "onclose", "connectionPromise", "hello"], + values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptions"], }), ]; diff --git a/src/valkey/js_valkey.zig b/src/valkey/js_valkey.zig index 0513723634..e629de7ae4 100644 --- a/src/valkey/js_valkey.zig +++ b/src/valkey/js_valkey.zig @@ -477,6 +477,63 @@ pub const JSValkeyClient = struct { pub fn onValkeyTimeout(this: *JSValkeyClient) void { this.clientFail("Connection timeout", protocol.RedisError.ConnectionClosed); } + + /// Handle PubSub messages from Valkey + pub fn onPubSubMessage(this: *JSValkeyClient, kind: []const u8, pattern: []const u8, channel: []const u8, message: []const u8) void { + const this_value = this.this_value.tryGet() orelse return; + const globalObject = this.globalObject; + const loop = this.client.vm.eventLoop(); + + loop.enter(); + defer loop.exit(); + + // Get the subscriptions object + if (js.subscriptionsGetCached(this_value)) |subscriptions| { + // For pattern messages, use the pattern as the key, otherwise use the channel + const callback_key = if (pattern.len > 0) pattern else channel; + + // Get callbacks for this channel/pattern + if (subscriptions.get(globalObject, callback_key) catch .js_undefined) |callbacks_value| { + if (callbacks_value.isArray()) { + const callbacks_array = callbacks_value; + const length = callbacks_array.getLength(globalObject) catch return; + + // Create event data + var event_obj = JSC.JSValue.createEmptyObjectWithNullPrototype(globalObject); + + // Add event type + const kind_str = bun.String.createUTF8ForJS(globalObject, kind); + event_obj.put(globalObject, "type", kind_str); + + // Add channel + const channel_str = bun.String.createUTF8ForJS(globalObject, channel); + event_obj.put(globalObject, "channel", channel_str); + + // Add message + const message_str = bun.String.createUTF8ForJS(globalObject, message); + event_obj.put(globalObject, "message", message_str); + + // Add pattern if it's a pattern message + if (pattern.len > 0) { + const pattern_str = bun.String.createUTF8ForJS(globalObject, pattern); + event_obj.put(globalObject, "pattern", pattern_str); + } + + // Call all callbacks for this channel + for (0..@intCast(length)) |i| { + const callback = callbacks_array.getIndex(globalObject, @intCast(i)) catch continue; + if (!callback.isEmptyOrUndefinedOrNull()) { + if (callback.isCallable()) { + _ = callback.call(globalObject, this_value, &[_]JSValue{event_obj}) catch |e| { + globalObject.reportActiveExceptionAsUnhandled(e); + }; + } + } + } + } + } + } + } pub fn clientFail(this: *JSValkeyClient, message: []const u8, err: protocol.RedisError) void { this.client.fail(message, err); @@ -706,6 +763,113 @@ pub const JSValkeyClient = struct { pub const zrevrank = fns.zrevrank; pub const zscore = fns.zscore; + /// Add event listener for PubSub channels + pub fn on(this: *JSValkeyClient, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue { + const arguments = callframe.arguments(); + if (arguments.len < 2) { + return globalObject.throwInvalidArguments("Expected channel and callback arguments", .{}); + } + + const channel_arg = arguments[0]; + const callback_arg = arguments[1]; + + if (!callback_arg.isCallable()) { + return globalObject.throwInvalidArguments("Expected callback to be a function", .{}); + } + + const channel_str = try channel_arg.toBunString(globalObject); + defer channel_str.deref(); + + const this_value = callframe.this(); + + // Get or create subscriptions object + var subscriptions = if (js.subscriptionsGetCached(this_value)) |existing| + existing + else blk: { + const new_subscriptions = JSC.JSValue.createEmptyObjectWithNullPrototype(globalObject); + js.subscriptionsSetCached(this_value, globalObject, new_subscriptions); + break :blk new_subscriptions; + }; + + // Get or create callbacks array for this channel + const channel_slice = channel_str.toUTF8WithoutRef(bun.default_allocator); + defer channel_slice.deinit(); + var callbacks_array = if (subscriptions.get(globalObject, channel_slice.slice()) catch .js_undefined) |existing| existing else blk: { + const new_array = try JSC.JSValue.createEmptyArray(globalObject, 0); + subscriptions.put(globalObject, channel_slice.slice(), new_array); + break :blk new_array; + }; + + // Add callback to array if it's not already there + if (callbacks_array.isArray()) { + const length = try callbacks_array.getLength(globalObject); + callbacks_array.putIndex(globalObject, @intCast(length), callback_arg) catch { + return globalObject.throwOutOfMemory(); + }; + } + + // Disable pipelining when pubsub is first used + this.client.flags.auto_pipelining = false; + + return .js_undefined; + } + + /// Remove event listener for PubSub channels + pub fn off(_: *JSValkeyClient, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue { + const arguments = callframe.arguments(); + if (arguments.len < 1) { + return globalObject.throwInvalidArguments("Expected channel argument", .{}); + } + + const channel_arg = arguments[0]; + const channel_str = try channel_arg.toBunString(globalObject); + defer channel_str.deref(); + + const this_value = callframe.this(); + + // Get subscriptions object + if (js.subscriptionsGetCached(this_value)) |subscriptions| { + const channel_slice = channel_str.toUTF8WithoutRef(bun.default_allocator); + defer channel_slice.deinit(); + + if (arguments.len >= 2 and arguments[1].isCallable()) { + // Remove specific callback + const callback_to_remove = arguments[1]; + if (subscriptions.get(globalObject, channel_slice.slice()) catch .js_undefined) |callbacks_value| { + if (callbacks_value.isArray()) { + const callbacks_array = callbacks_value; + const length = try callbacks_array.getLength(globalObject); + + // Find and remove the specific callback + var new_callbacks = std.ArrayList(JSValue).init(bun.default_allocator); + defer new_callbacks.deinit(); + + for (0..@intCast(length)) |i| { + const callback = callbacks_array.getIndex(globalObject, @intCast(i)) catch continue; + if (!callback.isEmptyOrUndefinedOrNull()) { + if (!(callback.isSameValue(callback_to_remove, globalObject) catch false)) { + new_callbacks.append(callback) catch return globalObject.throwOutOfMemory(); + } + } + } + + // Create new array with remaining callbacks + const new_array = try JSC.JSValue.createEmptyArray(globalObject, new_callbacks.items.len); + for (new_callbacks.items, 0..) |callback, i| { + new_array.putIndex(globalObject, @intCast(i), callback) catch return globalObject.throwOutOfMemory(); + } + subscriptions.put(globalObject, channel_slice.slice(), new_array); + } + } + } else { + // Remove all callbacks for the channel + subscriptions.put(globalObject, channel_slice.slice(), .js_undefined); + } + } + + return .js_undefined; + } + const fns = @import("./js_valkey_functions.zig"); }; diff --git a/src/valkey/valkey.zig b/src/valkey/valkey.zig index af0947becc..be7e8278ce 100644 --- a/src/valkey/valkey.zig +++ b/src/valkey/valkey.zig @@ -170,6 +170,9 @@ pub const ValkeyClient = struct { auto_flusher: AutoFlusher = .{}, vm: *JSC.VirtualMachine, + + // PubSub callback for handling subscription messages + pubsub_callback: ?*const fn (*ValkeyClient, channel: []const u8, message: []const u8) void = null, /// Clean up resources used by the Valkey client pub fn deinit(this: *@This(), globalObjectOrFinalizing: ?*JSC.JSGlobalObject) void { @@ -662,9 +665,122 @@ pub const ValkeyClient = struct { } } + /// Handle Push messages for PubSub + fn handlePushMessage(this: *ValkeyClient, push: *protocol.Push) !void { + debug("handlePushMessage: kind={s}, data_len={d}", .{ push.kind, push.data.len }); + + // Handle different push message types + if (std.mem.eql(u8, push.kind, "message") or + std.mem.eql(u8, push.kind, "pmessage") or + std.mem.eql(u8, push.kind, "smessage")) { + + // Pubsub message format: [kind, channel, message] or [kind, pattern, channel, message] + if (push.data.len >= 2) { + var channel: []const u8 = ""; + var message: []const u8 = ""; + var pattern: []const u8 = ""; + + if (std.mem.eql(u8, push.kind, "pmessage") and push.data.len >= 3) { + // Pattern message: [pattern, channel, message] + switch (push.data[0]) { + .SimpleString => |str| { + pattern = str; + }, + .BulkString => |maybe_str| { + if (maybe_str) |str| { + pattern = str; + } + }, + else => return, + } + switch (push.data[1]) { + .SimpleString => |str| { + channel = str; + }, + .BulkString => |maybe_str| { + if (maybe_str) |str| { + channel = str; + } + }, + else => return, + } + switch (push.data[2]) { + .SimpleString => |str| { + message = str; + }, + .BulkString => |maybe_str| { + if (maybe_str) |str| { + message = str; + } + }, + else => return, + } + } else { + // Regular message: [channel, message] + switch (push.data[0]) { + .SimpleString => |str| { + channel = str; + }, + .BulkString => |maybe_str| { + if (maybe_str) |str| { + channel = str; + } + }, + else => return, + } + switch (push.data[1]) { + .SimpleString => |str| { + message = str; + }, + .BulkString => |maybe_str| { + if (maybe_str) |str| { + message = str; + } + }, + else => return, + } + } + + // Call the JavaScript callback + this.parent().onPubSubMessage(push.kind, pattern, channel, message); + } + } else if (std.mem.eql(u8, push.kind, "subscribe") or + std.mem.eql(u8, push.kind, "psubscribe") or + std.mem.eql(u8, push.kind, "ssubscribe") or + std.mem.eql(u8, push.kind, "unsubscribe") or + std.mem.eql(u8, push.kind, "punsubscribe") or + std.mem.eql(u8, push.kind, "sunsubscribe")) { + // Subscription acknowledgment: [channel, subscription_count] + debug("Subscription acknowledgment: {s}", .{push.kind}); + + // Resolve the pending subscription command + var pair = this.in_flight.readItem() orelse { + debug("Received subscription acknowledgment but no promise in queue", .{}); + return; + }; + + const globalThis = this.globalObject(); + const loop = this.vm.eventLoop(); + + loop.enter(); + defer loop.exit(); + + // Create a RESP value from the push data for the promise resolution + var resp_value = protocol.RESPValue{ .Array = push.data }; + pair.promise.resolve(globalThis, &resp_value); + } + } + /// Handle Valkey protocol response fn handleResponse(this: *ValkeyClient, value: *protocol.RESPValue) !void { debug("onData() {any}", .{value.*}); + + // Handle Push messages for PubSub + if (value.* == .Push) { + try this.handlePushMessage(&value.Push); + return; + } + // Special handling for the initial HELLO response if (!this.flags.is_authenticated) { this.handleHelloResponse(value); diff --git a/test/js/bun/valkey/pubsub.test.ts b/test/js/bun/valkey/pubsub.test.ts new file mode 100644 index 0000000000..498c7662e7 --- /dev/null +++ b/test/js/bun/valkey/pubsub.test.ts @@ -0,0 +1,347 @@ +import { test, expect, describe, beforeAll, afterAll } from "bun:test"; +import { RedisClient } from "bun"; +import { bunEnv } from "harness"; + +// Skip tests if no Redis/Valkey server is available +const skipIfNoRedis = test; // Always run tests locally + +describe("Valkey PubSub functionality", () => { + let publisher: RedisClient; + let subscriber: RedisClient; + + beforeAll(async () => { + // Create two separate clients - one for publishing, one for subscribing + publisher = new RedisClient(process.env.REDIS_URL || "redis://localhost:6379"); + subscriber = new RedisClient(process.env.REDIS_URL || "redis://localhost:6379"); + + await publisher.connect(); + await subscriber.connect(); + }); + + afterAll(async () => { + if (publisher?.connected) { + await publisher.close(); + } + if (subscriber?.connected) { + await subscriber.close(); + } + }); + + skipIfNoRedis("should register and receive messages on channels", async () => { + const messages: any[] = []; + + // Register event listener + subscriber.on("test-channel", (event: any) => { + messages.push(event); + }); + + // Subscribe to the channel + await subscriber.subscribe("test-channel"); + + // Give a moment for subscription to be processed + await Bun.sleep(10); + + // Publish a message + await publisher.publish("test-channel", "Hello, World!"); + + // Wait for message to be received + await Bun.sleep(50); + + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + type: "message", + channel: "test-channel", + message: "Hello, World!" + }); + + // Clean up + await subscriber.unsubscribe("test-channel"); + }); + + skipIfNoRedis("should handle multiple subscribers on same channel", async () => { + const messages1: any[] = []; + const messages2: any[] = []; + + // Register multiple event listeners + subscriber.on("multi-channel", (event: any) => { + messages1.push(event); + }); + + subscriber.on("multi-channel", (event: any) => { + messages2.push(event); + }); + + // Subscribe to the channel + await subscriber.subscribe("multi-channel"); + + // Give a moment for subscription to be processed + await Bun.sleep(10); + + // Publish a message + await publisher.publish("multi-channel", "Multi-subscriber test"); + + // Wait for message to be received + await Bun.sleep(50); + + // Both listeners should receive the message + expect(messages1).toHaveLength(1); + expect(messages2).toHaveLength(1); + expect(messages1[0]).toMatchObject({ + type: "message", + channel: "multi-channel", + message: "Multi-subscriber test" + }); + expect(messages2[0]).toMatchObject({ + type: "message", + channel: "multi-channel", + message: "Multi-subscriber test" + }); + + // Clean up + await subscriber.unsubscribe("multi-channel"); + }); + + skipIfNoRedis("should handle pattern subscriptions with psubscribe", async () => { + const messages: any[] = []; + + // Register event listener for pattern + subscriber.on("news.*", (event: any) => { + messages.push(event); + }); + + // Subscribe to pattern + await subscriber.psubscribe("news.*"); + + // Give a moment for subscription to be processed + await Bun.sleep(10); + + // Publish messages to matching channels + await publisher.publish("news.sports", "Sports update"); + await publisher.publish("news.weather", "Weather update"); + await publisher.publish("other.topic", "Should not match"); + + // Wait for messages to be received + await Bun.sleep(100); + + expect(messages).toHaveLength(2); + expect(messages[0]).toMatchObject({ + type: "pmessage", + pattern: "news.*", + message: "Sports update" + }); + expect(messages[1]).toMatchObject({ + type: "pmessage", + pattern: "news.*", + message: "Weather update" + }); + + // Clean up + await subscriber.punsubscribe("news.*"); + }); + + skipIfNoRedis("should remove specific callbacks with off()", async () => { + const messages1: any[] = []; + const messages2: any[] = []; + + const callback1 = (event: any) => { + messages1.push(event); + }; + + const callback2 = (event: any) => { + messages2.push(event); + }; + + // Register both callbacks + subscriber.on("removal-test", callback1); + subscriber.on("removal-test", callback2); + + // Subscribe to the channel + await subscriber.subscribe("removal-test"); + + // Give a moment for subscription to be processed + await Bun.sleep(10); + + // Publish first message - both should receive + await publisher.publish("removal-test", "Message 1"); + await Bun.sleep(50); + + expect(messages1).toHaveLength(1); + expect(messages2).toHaveLength(1); + + // Remove first callback + subscriber.off("removal-test", callback1); + + // Publish second message - only callback2 should receive + await publisher.publish("removal-test", "Message 2"); + await Bun.sleep(50); + + expect(messages1).toHaveLength(1); // Still 1 + expect(messages2).toHaveLength(2); // Now 2 + + // Clean up + await subscriber.unsubscribe("removal-test"); + }); + + skipIfNoRedis("should remove all callbacks for channel with off(channel)", async () => { + const messages: any[] = []; + + // Register multiple callbacks + subscriber.on("remove-all-test", (event: any) => { + messages.push({ id: 1, ...event }); + }); + + subscriber.on("remove-all-test", (event: any) => { + messages.push({ id: 2, ...event }); + }); + + // Subscribe to the channel + await subscriber.subscribe("remove-all-test"); + + // Give a moment for subscription to be processed + await Bun.sleep(10); + + // Publish first message - both should receive + await publisher.publish("remove-all-test", "Before removal"); + await Bun.sleep(50); + + expect(messages).toHaveLength(2); + + // Remove all callbacks for the channel + subscriber.off("remove-all-test"); + + // Publish second message - none should receive + await publisher.publish("remove-all-test", "After removal"); + await Bun.sleep(50); + + expect(messages).toHaveLength(2); // Still 2, no new messages + + // Clean up + await subscriber.unsubscribe("remove-all-test"); + }); + + skipIfNoRedis("should handle binary data in pubsub messages", async () => { + const messages: any[] = []; + + // Register event listener + subscriber.on("binary-channel", (event: any) => { + messages.push(event); + }); + + // Subscribe to the channel + await subscriber.subscribe("binary-channel"); + + // Give a moment for subscription to be processed + await Bun.sleep(10); + + // Publish binary data + const binaryData = new Uint8Array([0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x57, 0x6f, 0x72, 0x6c, 0x64]); + await publisher.publish("binary-channel", binaryData); + + // Wait for message to be received + await Bun.sleep(50); + + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + type: "message", + channel: "binary-channel" + }); + expect(typeof messages[0].message).toBe("string"); + + // Clean up + await subscriber.unsubscribe("binary-channel"); + }); + + skipIfNoRedis("should handle subscription acknowledgments", async () => { + // Subscribe and check that the promise resolves + const subscribeResult = await subscriber.subscribe("ack-test"); + + // Should return subscription count or similar acknowledgment + expect(subscribeResult).toBeDefined(); + + // Clean up + await subscriber.unsubscribe("ack-test"); + }); + + skipIfNoRedis("should disable pipelining when using pubsub", async () => { + const testClient = new RedisClient(process.env.REDIS_URL || "redis://localhost:6379"); + await testClient.connect(); + + // Register a pubsub listener (this should disable pipelining) + testClient.on("pipeline-test", () => {}); + + // The client should now have pipelining disabled + // This is more of an implementation detail that would be verified + // through internal client state, but we can at least verify + // that pubsub works correctly even with this change + + await testClient.subscribe("pipeline-test"); + await testClient.unsubscribe("pipeline-test"); + + await testClient.close(); + }); + + skipIfNoRedis("should handle multiple channels in single subscription", async () => { + const messages: Record = { + "channel1": [], + "channel2": [], + "channel3": [] + }; + + // Register listeners for multiple channels + subscriber.on("channel1", (event: any) => { + messages.channel1.push(event); + }); + + subscriber.on("channel2", (event: any) => { + messages.channel2.push(event); + }); + + subscriber.on("channel3", (event: any) => { + messages.channel3.push(event); + }); + + // Subscribe to multiple channels + await subscriber.subscribe("channel1", "channel2", "channel3"); + + // Give a moment for subscriptions to be processed + await Bun.sleep(10); + + // Publish to each channel + await publisher.publish("channel1", "Message for channel 1"); + await publisher.publish("channel2", "Message for channel 2"); + await publisher.publish("channel3", "Message for channel 3"); + + // Wait for messages to be received + await Bun.sleep(100); + + expect(messages.channel1).toHaveLength(1); + expect(messages.channel2).toHaveLength(1); + expect(messages.channel3).toHaveLength(1); + + expect(messages.channel1[0].message).toBe("Message for channel 1"); + expect(messages.channel2[0].message).toBe("Message for channel 2"); + expect(messages.channel3[0].message).toBe("Message for channel 3"); + + // Clean up + await subscriber.unsubscribe("channel1", "channel2", "channel3"); + }); + + skipIfNoRedis("should handle error cases gracefully", async () => { + // Test with invalid arguments + expect(() => { + subscriber.on(); // No arguments + }).toThrow(); + + expect(() => { + subscriber.on("channel"); // Missing callback + }).toThrow(); + + expect(() => { + subscriber.on("channel", "not-a-function"); // Invalid callback + }).toThrow(); + + expect(() => { + subscriber.off(); // No arguments + }).toThrow(); + }); +}); \ No newline at end of file