diff --git a/packages/bun-types/redis.d.ts b/packages/bun-types/redis.d.ts index 327c9a624b..39fa64d793 100644 --- a/packages/bun-types/redis.d.ts +++ b/packages/bun-types/redis.d.ts @@ -52,25 +52,21 @@ declare module "bun" { export namespace RedisClient { type KeyLike = string | ArrayBufferView | Blob; - type StringPubSubListener = (message: string, channel: string) => void; - - // Buffer subscriptions are not yet implemented - // type BufferPubSubListener = (message: Uint8Array, channel: string) => void; } export class RedisClient { /** * Creates a new Redis client - * - * @param url URL to connect to, defaults to `process.env.VALKEY_URL`, - * `process.env.REDIS_URL`, or `"valkey://localhost:6379"` + * @param url URL to connect to, defaults to process.env.VALKEY_URL, process.env.REDIS_URL, or "valkey://localhost:6379" * @param options Additional options * * @example * ```ts - * const redis = new RedisClient(); - * await redis.set("hello", "world"); - * console.log(await redis.get("hello")); + * const valkey = new RedisClient(); + * + * await valkey.set("hello", "world"); + * + * console.log(await valkey.get("hello")); * ``` */ constructor(url?: string, options?: RedisOptions); @@ -92,14 +88,12 @@ declare module "bun" { /** * Callback fired when the client disconnects from the Redis server - * * @param error The error that caused the disconnection */ onclose: ((this: RedisClient, error: Error) => void) | null; /** * Connect to the Redis server - * * @returns A promise that resolves when connected */ connect(): Promise; @@ -158,12 +152,10 @@ declare module "bun" { set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, px: "PX", milliseconds: number): Promise<"OK">; /** - * Set key to hold the string value with expiration at a specific Unix - * timestamp + * Set key to hold the string value with expiration at a specific Unix timestamp * @param key The key to set * @param value The value to set - * @param exat Set the specified Unix time at which the key will expire, in - * seconds + * @param exat Set the specified Unix time at which the key will expire, in seconds * @returns Promise that resolves with "OK" on success */ set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, exat: "EXAT", timestampSeconds: number): Promise<"OK">; @@ -187,8 +179,7 @@ declare module "bun" { * @param key The key to set * @param value The value to set * @param nx Only set the key if it does not already exist - * @returns Promise that resolves with "OK" on success, or null if the key - * already exists + * @returns Promise that resolves with "OK" on success, or null if the key already exists */ set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, nx: "NX"): Promise<"OK" | null>; @@ -197,8 +188,7 @@ declare module "bun" { * @param key The key to set * @param value The value to set * @param xx Only set the key if it already exists - * @returns Promise that resolves with "OK" on success, or null if the key - * does not exist + * @returns Promise that resolves with "OK" on success, or null if the key does not exist */ set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, xx: "XX"): Promise<"OK" | null>; @@ -206,10 +196,8 @@ declare module "bun" { * Set key to hold the string value and return the old value * @param key The key to set * @param value The value to set - * @param get Return the old string stored at key, or null if key did not - * exist - * @returns Promise that resolves with the old value, or null if key did not - * exist + * @param get Return the old string stored at key, or null if key did not exist + * @returns Promise that resolves with the old value, or null if key did not exist */ set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, get: "GET"): Promise; @@ -255,8 +243,7 @@ declare module "bun" { /** * Determine if a key exists * @param key The key to check - * @returns Promise that resolves with true if the key exists, false - * otherwise + * @returns Promise that resolves with true if the key exists, false otherwise */ exists(key: RedisClient.KeyLike): Promise; @@ -271,8 +258,7 @@ declare module "bun" { /** * Get the time to live for a key in seconds * @param key The key to get the TTL for - * @returns Promise that resolves with the TTL, -1 if no expiry, or -2 if - * key doesn't exist + * @returns Promise that resolves with the TTL, -1 if no expiry, or -2 if key doesn't exist */ ttl(key: RedisClient.KeyLike): Promise; @@ -296,8 +282,7 @@ declare module "bun" { * Check if a value is a member of a set * @param key The set key * @param member The member to check - * @returns Promise that resolves with true if the member exists, false - * otherwise + * @returns Promise that resolves with true if the member exists, false otherwise */ sismember(key: RedisClient.KeyLike, member: string): Promise; @@ -305,8 +290,7 @@ declare module "bun" { * Add a member to a set * @param key The set key * @param member The member to add - * @returns Promise that resolves with 1 if the member was added, 0 if it - * already existed + * @returns Promise that resolves with 1 if the member was added, 0 if it already existed */ sadd(key: RedisClient.KeyLike, member: string): Promise; @@ -314,8 +298,7 @@ declare module "bun" { * Remove a member from a set * @param key The set key * @param member The member to remove - * @returns Promise that resolves with 1 if the member was removed, 0 if it - * didn't exist + * @returns Promise that resolves with 1 if the member was removed, 0 if it didn't exist */ srem(key: RedisClient.KeyLike, member: string): Promise; @@ -329,16 +312,14 @@ declare module "bun" { /** * Get a random member from a set * @param key The set key - * @returns Promise that resolves with a random member, or null if the set - * is empty + * @returns Promise that resolves with a random member, or null if the set is empty */ srandmember(key: RedisClient.KeyLike): Promise; /** * Remove and return a random member from a set * @param key The set key - * @returns Promise that resolves with the removed member, or null if the - * set is empty + * @returns Promise that resolves with the removed member, or null if the set is empty */ spop(key: RedisClient.KeyLike): Promise; @@ -405,32 +386,28 @@ declare module "bun" { /** * Remove and get the first element in a list * @param key The list key - * @returns Promise that resolves with the first element, or null if the - * list is empty + * @returns Promise that resolves with the first element, or null if the list is empty */ lpop(key: RedisClient.KeyLike): Promise; /** * Remove the expiration from a key * @param key The key to persist - * @returns Promise that resolves with 1 if the timeout was removed, 0 if - * the key doesn't exist or has no timeout + * @returns Promise that resolves with 1 if the timeout was removed, 0 if the key doesn't exist or has no timeout */ persist(key: RedisClient.KeyLike): Promise; /** * Get the expiration time of a key as a UNIX timestamp in milliseconds * @param key The key to check - * @returns Promise that resolves with the timestamp, or -1 if the key has - * no expiration, or -2 if the key doesn't exist + * @returns Promise that resolves with the timestamp, or -1 if the key has no expiration, or -2 if the key doesn't exist */ pexpiretime(key: RedisClient.KeyLike): Promise; /** * Get the time to live for a key in milliseconds * @param key The key to check - * @returns Promise that resolves with the TTL in milliseconds, or -1 if the - * key has no expiration, or -2 if the key doesn't exist + * @returns Promise that resolves with the TTL in milliseconds, or -1 if the key has no expiration, or -2 if the key doesn't exist */ pttl(key: RedisClient.KeyLike): Promise; @@ -444,48 +421,42 @@ declare module "bun" { /** * Get the number of members in a set * @param key The set key - * @returns Promise that resolves with the cardinality (number of elements) - * of the set + * @returns Promise that resolves with the cardinality (number of elements) of the set */ scard(key: RedisClient.KeyLike): Promise; /** * Get the length of the value stored in a key * @param key The key to check - * @returns Promise that resolves with the length of the string value, or 0 - * if the key doesn't exist + * @returns Promise that resolves with the length of the string value, or 0 if the key doesn't exist */ strlen(key: RedisClient.KeyLike): Promise; /** * Get the number of members in a sorted set * @param key The sorted set key - * @returns Promise that resolves with the cardinality (number of elements) - * of the sorted set + * @returns Promise that resolves with the cardinality (number of elements) of the sorted set */ zcard(key: RedisClient.KeyLike): Promise; /** * Remove and return members with the highest scores in a sorted set * @param key The sorted set key - * @returns Promise that resolves with the removed member and its score, or - * null if the set is empty + * @returns Promise that resolves with the removed member and its score, or null if the set is empty */ zpopmax(key: RedisClient.KeyLike): Promise; /** * Remove and return members with the lowest scores in a sorted set * @param key The sorted set key - * @returns Promise that resolves with the removed member and its score, or - * null if the set is empty + * @returns Promise that resolves with the removed member and its score, or null if the set is empty */ zpopmin(key: RedisClient.KeyLike): Promise; /** * Get one or multiple random members from a sorted set * @param key The sorted set key - * @returns Promise that resolves with a random member, or null if the set - * is empty + * @returns Promise that resolves with a random member, or null if the set is empty */ zrandmember(key: RedisClient.KeyLike): Promise; @@ -493,8 +464,7 @@ declare module "bun" { * Append a value to a key * @param key The key to append to * @param value The value to append - * @returns Promise that resolves with the length of the string after the - * append operation + * @returns Promise that resolves with the length of the string after the append operation */ append(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -502,8 +472,7 @@ declare module "bun" { * Set the value of a key and return its old value * @param key The key to set * @param value The value to set - * @returns Promise that resolves with the old value, or null if the key - * didn't exist + * @returns Promise that resolves with the old value, or null if the key didn't exist */ getset(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -511,8 +480,7 @@ declare module "bun" { * Prepend one or multiple values to a list * @param key The list key * @param value The value to prepend - * @returns Promise that resolves with the length of the list after the push - * operation + * @returns Promise that resolves with the length of the list after the push operation */ lpush(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -520,8 +488,7 @@ declare module "bun" { * Prepend a value to a list, only if the list exists * @param key The list key * @param value The value to prepend - * @returns Promise that resolves with the length of the list after the push - * operation, or 0 if the list doesn't exist + * @returns Promise that resolves with the length of the list after the push operation, or 0 if the list doesn't exist */ lpushx(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -529,8 +496,7 @@ declare module "bun" { * Add one or more members to a HyperLogLog * @param key The HyperLogLog key * @param element The element to add - * @returns Promise that resolves with 1 if the HyperLogLog was altered, 0 - * otherwise + * @returns Promise that resolves with 1 if the HyperLogLog was altered, 0 otherwise */ pfadd(key: RedisClient.KeyLike, element: string): Promise; @@ -538,8 +504,7 @@ declare module "bun" { * Append one or multiple values to a list * @param key The list key * @param value The value to append - * @returns Promise that resolves with the length of the list after the push - * operation + * @returns Promise that resolves with the length of the list after the push operation */ rpush(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -547,8 +512,7 @@ declare module "bun" { * Append a value to a list, only if the list exists * @param key The list key * @param value The value to append - * @returns Promise that resolves with the length of the list after the push - * operation, or 0 if the list doesn't exist + * @returns Promise that resolves with the length of the list after the push operation, or 0 if the list doesn't exist */ rpushx(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -556,8 +520,7 @@ declare module "bun" { * Set the value of a key, only if the key does not exist * @param key The key to set * @param value The value to set - * @returns Promise that resolves with 1 if the key was set, 0 if the key - * was not set + * @returns Promise that resolves with 1 if the key was set, 0 if the key was not set */ setnx(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise; @@ -565,16 +528,14 @@ declare module "bun" { * Get the score associated with the given member in a sorted set * @param key The sorted set key * @param member The member to get the score for - * @returns Promise that resolves with the score of the member as a string, - * or null if the member or key doesn't exist + * @returns Promise that resolves with the score of the member as a string, or null if the member or key doesn't exist */ zscore(key: RedisClient.KeyLike, member: string): Promise; /** * Get the values of all specified keys * @param keys The keys to get - * @returns Promise that resolves with an array of values, with null for - * keys that don't exist + * @returns Promise that resolves with an array of values, with null for keys that don't exist */ mget(...keys: RedisClient.KeyLike[]): Promise<(string | null)[]>; @@ -588,46 +549,37 @@ declare module "bun" { /** * Return a serialized version of the value stored at the specified key * @param key The key to dump - * @returns Promise that resolves with the serialized value, or null if the - * key doesn't exist + * @returns Promise that resolves with the serialized value, or null if the key doesn't exist */ dump(key: RedisClient.KeyLike): Promise; /** * Get the expiration time of a key as a UNIX timestamp in seconds - * * @param key The key to check - * @returns Promise that resolves with the timestamp, or -1 if the key has - * no expiration, or -2 if the key doesn't exist + * @returns Promise that resolves with the timestamp, or -1 if the key has no expiration, or -2 if the key doesn't exist */ expiretime(key: RedisClient.KeyLike): Promise; /** * Get the value of a key and delete the key - * * @param key The key to get and delete - * @returns Promise that resolves with the value of the key, or null if the - * key doesn't exist + * @returns Promise that resolves with the value of the key, or null if the key doesn't exist */ getdel(key: RedisClient.KeyLike): Promise; /** * Get the value of a key and optionally set its expiration - * * @param key The key to get - * @returns Promise that resolves with the value of the key, or null if the - * key doesn't exist + * @returns Promise that resolves with the value of the key, or null if the key doesn't exist */ getex(key: RedisClient.KeyLike): Promise; /** * Get the value of a key and set its expiration in seconds - * * @param key The key to get * @param ex Set the specified expire time, in seconds * @param seconds The number of seconds until expiration - * @returns Promise that resolves with the value of the key, or null if the - * key doesn't exist + * @returns Promise that resolves with the value of the key, or null if the key doesn't exist */ getex(key: RedisClient.KeyLike, ex: "EX", seconds: number): Promise; @@ -642,7 +594,6 @@ declare module "bun" { /** * Get the value of a key and set its expiration at a specific Unix timestamp in seconds - * * @param key The key to get * @param exat Set the specified Unix time at which the key will expire, in seconds * @param timestampSeconds The Unix timestamp in seconds @@ -652,7 +603,6 @@ declare module "bun" { /** * Get the value of a key and set its expiration at a specific Unix timestamp in milliseconds - * * @param key The key to get * @param pxat Set the specified Unix time at which the key will expire, in milliseconds * @param timestampMilliseconds The Unix timestamp in milliseconds @@ -662,7 +612,6 @@ declare module "bun" { /** * Get the value of a key and remove its expiration - * * @param key The key to get * @param persist Remove the expiration from the key * @returns Promise that resolves with the value of the key, or null if the key doesn't exist @@ -677,133 +626,10 @@ declare module "bun" { /** * Ping the server with a message - * * @param message The message to send to the server * @returns Promise that resolves with the message if the server is reachable, or throws an error if the server is not reachable */ ping(message: RedisClient.KeyLike): Promise; - - /** - * Publish a message to a Redis channel. - * - * @param channel The channel to publish to. - * @param message The message to publish. - * - * @returns The number of clients that received the message. Note that in a - * cluster this returns the total number of clients in the same node. - */ - publish(channel: string, message: string): Promise; - - /** - * Subscribe to a Redis channel. - * - * Subscribing disables automatic pipelining, so all commands will be - * received immediately. - * - * Subscribing moves the channel to a dedicated subscription state which - * prevents most other commands from being executed until unsubscribed. Only - * {@link ping `.ping()`}, {@link subscribe `.subscribe()`}, and - * {@link unsubscribe `.unsubscribe()`} are legal to invoke in a subscribed - * upon channel. - * - * @param channel The channel to subscribe to. - * @param listener The listener to call when a message is received on the - * channel. The listener will receive the message as the first argument and - * the channel as the second argument. - * - * @example - * ```ts - * await client.subscribe("my-channel", (message, channel) => { - * console.log(`Received message on ${channel}: ${message}`); - * }); - * ``` - */ - subscribe(channel: string, listener: RedisClient.StringPubSubListener): Promise; - - /** - * Subscribe to multiple Redis channels. - * - * Subscribing disables automatic pipelining, so all commands will be - * received immediately. - * - * Subscribing moves the channels to a dedicated subscription state in which - * only a limited set of commands can be executed. - * - * @param channels An array of channels to subscribe to. - * @param listener The listener to call when a message is received on any of - * the subscribed channels. The listener will receive the message as the - * first argument and the channel as the second argument. - */ - subscribe(channels: string[], listener: RedisClient.StringPubSubListener): Promise; - - /** - * Unsubscribe from a singular Redis channel. - * - * @param channel The channel to unsubscribe from. - * - * If there are no more channels subscribed to, the client automatically - * re-enables pipelining if it was previously enabled. - * - * Unsubscribing moves the channel back to a normal state out of the - * subscription state if all channels have been unsubscribed from. For - * further details on the subscription state, see - * {@link subscribe `.subscribe()`}. - */ - unsubscribe(channel: string): Promise; - - /** - * Remove a listener from a given Redis channel. - * - * If there are no more channels subscribed to, the client automatically - * re-enables pipelining if it was previously enabled. - * - * Unsubscribing moves the channel back to a normal state out of the - * subscription state if all channels have been unsubscribed from. For - * further details on the subscription state, see - * {@link subscribe `.subscribe()`}. - * - * @param channel The channel to unsubscribe from. - * @param listener The listener to remove. This is tested against - * referential equality so you must pass the exact same listener instance as - * when subscribing. - */ - unsubscribe(channel: string, listener: RedisClient.StringPubSubListener): Promise; - - /** - * Unsubscribe from all registered Redis channels. - * - * The client will automatically re-enable pipelining if it was previously - * enabled. - * - * Unsubscribing moves the channel back to a normal state out of the - * subscription state if all channels have been unsubscribed from. For - * further details on the subscription state, see - * {@link subscribe `.subscribe()`}. - */ - unsubscribe(): Promise; - - /** - * Unsubscribe from multiple Redis channels. - * - * @param channels An array of channels to unsubscribe from. - * - * If there are no more channels subscribed to, the client automatically - * re-enables pipelining if it was previously enabled. - * - * Unsubscribing moves the channel back to a normal state out of the - * subscription state if all channels have been unsubscribed from. For - * further details on the subscription state, see - * {@link subscribe `.subscribe()`}. - */ - unsubscribe(channels: string[]): Promise; - - /** - * @brief Create a new RedisClient instance with the same configuration as - * the current instance. - * - * This will open up a new connection to the Redis server. - */ - duplicate(): Promise; } /** diff --git a/src/bun.js/api/valkey.classes.ts b/src/bun.js/api/valkey.classes.ts index 8d0ae0d976..9a7af095ff 100644 --- a/src/bun.js/api/valkey.classes.ts +++ b/src/bun.js/api/valkey.classes.ts @@ -9,7 +9,6 @@ export default [ configurable: false, JSType: "0b11101110", memoryCost: true, - hasPendingActivity: true, proto: { connected: { getter: "getConnected", @@ -223,12 +222,11 @@ export default [ zrank: { fn: "zrank" }, zrevrank: { fn: "zrevrank" }, subscribe: { fn: "subscribe" }, - duplicate: { fn: "duplicate" }, psubscribe: { fn: "psubscribe" }, unsubscribe: { fn: "unsubscribe" }, punsubscribe: { fn: "punsubscribe" }, pubsub: { fn: "pubsub" }, }, - values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptionCallbackMap"], + values: ["onconnect", "onclose", "connectionPromise", "hello"], }), ]; diff --git a/src/bun.js/bindings/JSMap.zig b/src/bun.js/bindings/JSMap.zig index 4e09b0c109..5c4ce35be9 100644 --- a/src/bun.js/bindings/JSMap.zig +++ b/src/bun.js/bindings/JSMap.zig @@ -9,18 +9,13 @@ pub const JSMap = opaque { return bun.cpp.JSC__JSMap__set(this, globalObject, key, value); } - extern fn JSC__JSMap__get(*JSMap, *JSGlobalObject, JSValue) JSValue; + pub fn get_(this: *JSMap, globalObject: *JSGlobalObject, key: JSValue) JSValue { + return bun.cpp.JSC__JSMap__get_(this, globalObject, key); + } - pub fn get(this: *JSMap, globalObject: *JSGlobalObject, key: JSValue) bun.JSError!?JSValue { - var scope: jsc.CatchScope = undefined; - scope.init(globalObject, @src()); - defer scope.deinit(); - - const value = JSC__JSMap__get(this, globalObject, key); - - try scope.returnIfException(); - - if (value == .zero) { + pub fn get(this: *JSMap, globalObject: *JSGlobalObject, key: JSValue) ?JSValue { + const value = get_(this, globalObject, key); + if (value.isEmpty()) { return null; } return value; @@ -34,10 +29,6 @@ pub const JSMap = opaque { return bun.cpp.JSC__JSMap__remove(this, globalObject, key); } - pub fn size(this: *JSMap, globalObject: *JSGlobalObject) usize { - return bun.cpp.JSC__JSMap__size(this, globalObject); - } - pub fn fromJS(value: JSValue) ?*JSMap { if (value.jsTypeLoose() == .Map) { return bun.cast(*JSMap, value.asEncoded().asPtr.?); diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index 0a4a66e200..aff6f36740 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -37,7 +37,6 @@ #include "JavaScriptCore/JSArrayInlines.h" #include "JavaScriptCore/ErrorInstanceInlines.h" #include "JavaScriptCore/BigIntObject.h" -#include "JavaScriptCore/OrderedHashTableHelper.h" #include "JavaScriptCore/JSCallbackObject.h" #include "JavaScriptCore/JSClassRef.h" @@ -6402,20 +6401,11 @@ CPP_DECL JSC::EncodedJSValue JSC__JSMap__create(JSC::JSGlobalObject* arg0) JSC::JSMap* map = JSC::JSMap::create(arg0->vm(), arg0->mapStructure()); return JSC::JSValue::encode(map); } - -// zero means "not found" or an exception was thrown -CPP_DECL JSC::EncodedJSValue JSC__JSMap__get(JSC::JSMap* map, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2) +CPP_DECL [[ZIG_EXPORT(nothrow)]] JSC::EncodedJSValue JSC__JSMap__get_(JSC::JSMap* map, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2) { - auto& vm = arg1->vm(); - auto scope = DECLARE_THROW_SCOPE(vm); - JSC::JSValue value = JSC::JSValue::decode(JSValue2); - JSValue entryValue = map->getImpl(arg1, [&](OrderedHashMap::Storage& storage) ALWAYS_INLINE_LAMBDA { - return OrderedHashMap::Helper::find(arg1, storage, value); - }); - - RELEASE_AND_RETURN(scope, JSC::JSValue::encode(entryValue)); + return JSC::JSValue::encode(map->get(arg1, value)); } CPP_DECL [[ZIG_EXPORT(nothrow)]] bool JSC__JSMap__has(JSC::JSMap* map, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2) { @@ -6432,11 +6422,6 @@ CPP_DECL [[ZIG_EXPORT(nothrow)]] void JSC__JSMap__set(JSC::JSMap* map, JSC::JSGl map->set(arg1, JSC::JSValue::decode(JSValue2), JSC::JSValue::decode(JSValue3)); } -CPP_DECL [[ZIG_EXPORT(nothrow)]] uint32_t JSC__JSMap__size(JSC::JSMap* map, JSC::JSGlobalObject* arg1) -{ - return map->size(); -} - CPP_DECL void JSC__VM__setControlFlowProfiler(JSC::VM* vm, bool isEnabled) { if (isEnabled) { diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index 0f3f542eaf..0428366adb 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -8,7 +8,7 @@ #define AUTO_EXTERN_C extern "C" #ifdef WIN32 - #define AUTO_EXTERN_C_ZIG extern "C" + #define AUTO_EXTERN_C_ZIG extern "C" #else #define AUTO_EXTERN_C_ZIG extern "C" __attribute__((weak)) #endif @@ -129,7 +129,7 @@ CPP_DECL void WebCore__AbortSignal__cleanNativeBindings(WebCore::AbortSignal* ar CPP_DECL JSC::EncodedJSValue WebCore__AbortSignal__create(JSC::JSGlobalObject* arg0); CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__fromJS(JSC::EncodedJSValue JSValue0); CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__ref(WebCore::AbortSignal* arg0); -CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__signal(WebCore::AbortSignal* arg0, JSC::JSGlobalObject*, uint8_t abortReason); +CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__signal(WebCore::AbortSignal* arg0, JSC::JSGlobalObject*, uint8_t abortReason); CPP_DECL JSC::EncodedJSValue WebCore__AbortSignal__toJS(WebCore::AbortSignal* arg0, JSC::JSGlobalObject* arg1); CPP_DECL void WebCore__AbortSignal__unref(WebCore::AbortSignal* arg0); @@ -186,11 +186,10 @@ CPP_DECL JSC::VM* JSC__JSGlobalObject__vm(JSC::JSGlobalObject* arg0); #pragma mark - JSC::JSMap CPP_DECL JSC::EncodedJSValue JSC__JSMap__create(JSC::JSGlobalObject* arg0); -CPP_DECL JSC::EncodedJSValue JSC__JSMap__get(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2); +CPP_DECL JSC::EncodedJSValue JSC__JSMap__get_(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2); CPP_DECL bool JSC__JSMap__has(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2); CPP_DECL bool JSC__JSMap__remove(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2); CPP_DECL void JSC__JSMap__set(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2, JSC::EncodedJSValue JSValue3); -CPP_DECL uint32_t JSC__JSMap__size(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1); #pragma mark - JSC::JSValue diff --git a/src/deps/uws/us_socket_t.zig b/src/deps/uws/us_socket_t.zig index bd502b0aef..bd84853b52 100644 --- a/src/deps/uws/us_socket_t.zig +++ b/src/deps/uws/us_socket_t.zig @@ -13,7 +13,7 @@ pub const us_socket_t = opaque { }; pub fn open(this: *us_socket_t, comptime is_ssl: bool, is_client: bool, ip_addr: ?[]const u8) void { - debug("us_socket_open({p}, is_client: {})", .{ this, is_client }); + debug("us_socket_open({d}, is_client: {})", .{ @intFromPtr(this), is_client }); const ssl = @intFromBool(is_ssl); if (ip_addr) |ip| { @@ -25,22 +25,22 @@ pub const us_socket_t = opaque { } pub fn pause(this: *us_socket_t, ssl: bool) void { - debug("us_socket_pause({p})", .{this}); + debug("us_socket_pause({d})", .{@intFromPtr(this)}); c.us_socket_pause(@intFromBool(ssl), this); } pub fn @"resume"(this: *us_socket_t, ssl: bool) void { - debug("us_socket_resume({p})", .{this}); + debug("us_socket_resume({d})", .{@intFromPtr(this)}); c.us_socket_resume(@intFromBool(ssl), this); } pub fn close(this: *us_socket_t, ssl: bool, code: CloseCode) void { - debug("us_socket_close({p}, {s})", .{ this, @tagName(code) }); + debug("us_socket_close({d}, {s})", .{ @intFromPtr(this), @tagName(code) }); _ = c.us_socket_close(@intFromBool(ssl), this, code, null); } pub fn shutdown(this: *us_socket_t, ssl: bool) void { - debug("us_socket_shutdown({p})", .{this}); + debug("us_socket_shutdown({d})", .{@intFromPtr(this)}); c.us_socket_shutdown(@intFromBool(ssl), this); } @@ -128,25 +128,25 @@ pub const us_socket_t = opaque { pub fn write(this: *us_socket_t, ssl: bool, data: []const u8) i32 { const rc = c.us_socket_write(@intFromBool(ssl), this, data.ptr, @intCast(data.len)); - debug("us_socket_write({p}, {d}) = {d}", .{ this, data.len, rc }); + debug("us_socket_write({d}, {d}) = {d}", .{ @intFromPtr(this), data.len, rc }); return rc; } pub fn writeFd(this: *us_socket_t, data: []const u8, file_descriptor: bun.FD) i32 { if (bun.Environment.isWindows) @compileError("TODO: implement writeFd on Windows"); const rc = c.us_socket_ipc_write_fd(this, data.ptr, @intCast(data.len), file_descriptor.native()); - debug("us_socket_ipc_write_fd({p}, {d}, {d}) = {d}", .{ this, data.len, file_descriptor.native(), rc }); + debug("us_socket_ipc_write_fd({d}, {d}, {d}) = {d}", .{ @intFromPtr(this), data.len, file_descriptor.native(), rc }); return rc; } pub fn write2(this: *us_socket_t, ssl: bool, first: []const u8, second: []const u8) i32 { const rc = c.us_socket_write2(@intFromBool(ssl), this, first.ptr, first.len, second.ptr, second.len); - debug("us_socket_write2({p}, {d}, {d}) = {d}", .{ this, first.len, second.len, rc }); + debug("us_socket_write2({d}, {d}, {d}) = {d}", .{ @intFromPtr(this), first.len, second.len, rc }); return rc; } pub fn rawWrite(this: *us_socket_t, ssl: bool, data: []const u8) i32 { - debug("us_socket_raw_write({p}, {d})", .{ this, data.len }); + debug("us_socket_raw_write({d}, {d})", .{ @intFromPtr(this), data.len }); return c.us_socket_raw_write(@intFromBool(ssl), this, data.ptr, @intCast(data.len)); } diff --git a/src/string/StringBuilder.zig b/src/string/StringBuilder.zig index ae89ee4f74..91e8b1f250 100644 --- a/src/string/StringBuilder.zig +++ b/src/string/StringBuilder.zig @@ -236,15 +236,6 @@ pub fn writable(this: *StringBuilder) []u8 { return ptr[this.len..this.cap]; } -/// Transfer ownership of the underlying memory to a slice. -/// -/// After calling this, you are responsible for freeing the underlying memory. -/// This StringBuilder should not be used after calling this function. -pub fn moveToSlice(this: *StringBuilder, into_slice: *[]u8) void { - into_slice.* = this.allocatedSlice(); - this.* = .{}; -} - const std = @import("std"); const Allocator = std.mem.Allocator; diff --git a/src/valkey/ValkeyCommand.zig b/src/valkey/ValkeyCommand.zig index 2349ecad4a..dfc9c448e6 100644 --- a/src/valkey/ValkeyCommand.zig +++ b/src/valkey/ValkeyCommand.zig @@ -137,7 +137,7 @@ pub const Promise = struct { self.promise.resolve(globalObject, js_value); } - pub fn reject(self: *Promise, globalObject: *jsc.JSGlobalObject, jsvalue: JSError!jsc.JSValue) void { + pub fn reject(self: *Promise, globalObject: *jsc.JSGlobalObject, jsvalue: jsc.JSValue) void { self.promise.reject(globalObject, jsvalue); } @@ -162,7 +162,6 @@ const protocol = @import("./valkey_protocol.zig"); const std = @import("std"); const bun = @import("bun"); -const JSError = bun.JSError; const jsc = bun.jsc; const node = bun.api.node; const Slice = jsc.ZigString.Slice; diff --git a/src/valkey/js_valkey.zig b/src/valkey/js_valkey.zig index c1f5800025..d55f5a4e5b 100644 --- a/src/valkey/js_valkey.zig +++ b/src/valkey/js_valkey.zig @@ -1,222 +1,9 @@ -pub const SubscriptionCtx = struct { - const Self = @This(); - - _parent: *JSValkeyClient, - original_enable_offline_queue: bool, - original_enable_auto_pipelining: bool, - - const ParentJS = JSValkeyClient.js; - - pub fn init(parent: *JSValkeyClient, enable_offline_queue: bool, enable_auto_pipelining: bool) Self { - const callback_map = jsc.JSMap.create(parent.globalObject); - ParentJS.gc.set(.subscriptionCallbackMap, parent.this_value.get(), parent.globalObject, callback_map); - - const self = Self{ - ._parent = parent, - .original_enable_offline_queue = enable_offline_queue, - .original_enable_auto_pipelining = enable_auto_pipelining, - }; - return self; - } - - fn subscriptionCallbackMap(this: *Self) *jsc.JSMap { - const value_js = ParentJS.gc.get(.subscriptionCallbackMap, this._parent.this_value.get()).?; - return jsc.JSMap.fromJS(value_js).?; - } - - /// Get the total number of channels that this subscription context is subscribed to. - pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize { - return this.subscriptionCallbackMap().size(globalObject); - } - - /// Test whether this context has any subscriptions. It is mandatory to - /// guard deinit with this function. - pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool { - return this.subscriptionCount(globalObject) > 0; - } - - pub fn clearReceiveHandlers( - this: *Self, - globalObject: *jsc.JSGlobalObject, - channelName: JSValue, - ) void { - const map = this.subscriptionCallbackMap(); - if (map.remove(globalObject, channelName)) { - this._parent.channel_subscription_count -= 1; - this._parent.updateHasPendingActivity(); - } - } - - /// Remove a specific receive handler. - /// - /// Returns: The total number of remaining handlers for this channel, or null if here were no listeners originally - /// registered. - /// - /// Note: This function will empty out the map entry if there are no more handlers registered. - pub fn removeReceiveHandler( - this: *Self, - globalObject: *jsc.JSGlobalObject, - channelName: JSValue, - callback: JSValue, - ) !?usize { - const map = this.subscriptionCallbackMap(); - - const existing = try map.get(globalObject, channelName); - - if (existing == null) { - // Could not find the channel, nothing to remove. - return null; - } - - if (existing.?.isUndefinedOrNull()) { - // Nothing to remove. - return null; - } - - // Existing is guaranteed to be an array of callbacks. - bun.assert(existing.?.isArray()); - - // TODO(markovejnovic): I can't find a better way to do this... I generate a new array, - // filtering out the callback we want to remove. This is woefully inefficient for large - // sets (and surprisingly fast for small sets of callbacks). - // - // Perhaps there is an avenue to build a generic iterator pattern? @taylor.fish and I have - // briefly expressed a desire for this, and I promised her I would look into it, but at - // this moment have no proposal. - var array_it = try existing.?.arrayIterator(globalObject); - const updated_array = try jsc.JSArray.createEmpty(globalObject, 0); - while (try array_it.next()) |iter| { - if (iter == callback) - continue; - - try updated_array.push(globalObject, iter); - } - - // Otherwise, we have ourselves an array of callbacks. We need to remove the element in the - // array that matches the callback. - _ = map.remove(globalObject, channelName); - - // Only populate the map if we have remaining callbacks for this channel. - const new_length = (updated_array.arrayIterator(globalObject) catch unreachable).len; - if (new_length != 0) { - map.set(globalObject, channelName, updated_array); - } else { - this._parent.channel_subscription_count -= 1; - this._parent.updateHasPendingActivity(); - } - - return new_length; - } - - /// Add a handler for receiving messages on a specific channel - pub fn upsertReceiveHandler( - this: *Self, - globalObject: *jsc.JSGlobalObject, - channelName: JSValue, - callback: JSValue, - ) bun.JSError!void { - const map = this.subscriptionCallbackMap(); - - var handlers_array: JSValue = undefined; - var is_new_channel = false; - if (try map.get(globalObject, channelName)) |existing_handler_arr| { - debug("Adding a new receive handler.", .{}); - if (existing_handler_arr.isUndefined()) { - // Create a new array if the existing_handler_arr is undefined/null - handlers_array = try jsc.JSArray.createEmpty(globalObject, 0); - is_new_channel = true; - } else if (existing_handler_arr.isArray()) { - // Use the existing array - handlers_array = existing_handler_arr; - } else unreachable; - } else { - // No existing_handler_arr exists, create a new array - handlers_array = try jsc.JSArray.createEmpty(globalObject, 0); - is_new_channel = true; - } - - // Append the new callback to the array - try handlers_array.push(globalObject, callback); - - // Set the updated array back in the map - map.set(globalObject, channelName, handlers_array); - - // Update subscription count if this is a new channel - if (is_new_channel) { - this._parent.channel_subscription_count += 1; - this._parent.updateHasPendingActivity(); - } - } - - pub fn registerCallback(this: *Self, globalObject: *jsc.JSGlobalObject, eventString: JSValue, callback: JSValue) bun.JSError!void { - this.subscriptionCallbackMap().set(globalObject, eventString, callback); - } - - pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue { - const result = try this.subscriptionCallbackMap().get(globalObject, channelName); - if (result) |r| { - if (r.isUndefinedOrNull()) { - return null; - } - } - - return result; - } - - /// Invoke callbacks for a channel with the given arguments - /// Handles both single callbacks and arrays of callbacks - pub fn invokeCallback( - this: *Self, - globalObject: *jsc.JSGlobalObject, - channelName: JSValue, - args: []const JSValue, - ) bun.JSError!void { - const callbacks = try this.getCallbacks(globalObject, channelName) orelse { - debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)}); - return; - }; - - // If callbacks is an array, iterate and call each one - if (callbacks.isArray()) { - var iter = try callbacks.arrayIterator(globalObject); - while (try iter.next()) |callback| { - if (callback.isCallable()) { - _ = callback.call(globalObject, .js_undefined, args) catch |e| { - return e; - }; - } - } - } else if (callbacks.isCallable()) { - _ = callbacks.call(globalObject, .js_undefined, args) catch |e| { - return e; - }; - } - } - - pub fn deinit(this: *Self) void { - this._parent.channel_subscription_count = 0; - this._parent.updateHasPendingActivity(); - - ParentJS.gc.set( - .subscriptionCallbackMap, - this._parent.this_value.get(), - this._parent.globalObject, - .js_undefined, - ); - } -}; - /// Valkey client wrapper for JavaScript pub const JSValkeyClient = struct { client: valkey.ValkeyClient, globalObject: *jsc.JSGlobalObject, this_value: jsc.JSRef = jsc.JSRef.empty(), poll_ref: bun.Async.KeepAlive = .{}, - - _subscription_ctx: ?SubscriptionCtx, - channel_subscription_count: u32 = 0, - has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true), - timer: Timer.EventLoopTimer = .{ .tag = .ValkeyConnectionTimeout, .next = .{ @@ -249,8 +36,6 @@ pub const JSValkeyClient = struct { } pub fn create(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue) bun.JSError!*JSValkeyClient { - const this_allocator = bun.default_allocator; - const vm = globalObject.bunVM(); const url_str = if (arguments.len < 1 or arguments[0].isUndefined()) if (vm.transpiler.env.get("REDIS_URL") orelse vm.transpiler.env.get("VALKEY_URL")) |url| @@ -261,7 +46,7 @@ pub const JSValkeyClient = struct { try arguments[0].toBunString(globalObject); defer url_str.deref(); - const url_utf8 = url_str.toUTF8WithoutRef(this_allocator); + const url_utf8 = url_str.toUTF8WithoutRef(bun.default_allocator); defer url_utf8.deinit(); const url = bun.URL.parse(url_utf8.slice()); @@ -303,7 +88,7 @@ pub const JSValkeyClient = struct { var connection_strings: []u8 = &.{}; errdefer { - this_allocator.free(connection_strings); + bun.default_allocator.free(connection_strings); } if (url.username.len > 0 or url.password.len > 0 or hostname.len > 0) { @@ -311,12 +96,11 @@ pub const JSValkeyClient = struct { b.count(url.username); b.count(url.password); b.count(hostname); - try b.allocate(this_allocator); - defer b.deinit(this_allocator); + try b.allocate(bun.default_allocator); username = b.append(url.username); password = b.append(url.password); hostname = b.append(hostname); - b.moveToSlice(&connection_strings); + connection_strings = b.allocatedSlice(); } const database = if (url.pathname.len > 0) std.fmt.parseInt(u32, url.pathname[1..], 10) catch 0 else 0; @@ -325,7 +109,6 @@ pub const JSValkeyClient = struct { return JSValkeyClient.new(.{ .ref_count = .init(), - ._subscription_ctx = null, .client = .{ .vm = vm, .address = switch (uri) { @@ -337,11 +120,10 @@ pub const JSValkeyClient = struct { }, }, }, - .protocol = uri, .username = username, .password = password, - .in_flight = .init(this_allocator), - .queue = .init(this_allocator), + .in_flight = .init(bun.default_allocator), + .queue = .init(bun.default_allocator), .status = .disconnected, .connection_strings = connection_strings, .socket = .{ @@ -352,7 +134,7 @@ pub const JSValkeyClient = struct { }, }, .database = database, - .allocator = this_allocator, + .allocator = bun.default_allocator, .flags = .{ .enable_auto_reconnect = options.enable_auto_reconnect, .enable_offline_queue = options.enable_offline_queue, @@ -366,122 +148,6 @@ pub const JSValkeyClient = struct { }); } - /// Clone this client while remaining in the initial disconnected state. This does not preserve - /// the - pub fn cloneWithoutConnecting(this: *const JSValkeyClient) bun.OOM!*JSValkeyClient { - const vm = this.globalObject.bunVM(); - - const relocate = struct { - // Given a slice within a window, move the slice to point to a new - // window, with the same offset relative to the window start. - fn slice(original: []const u8, old_base: [*]const u8, new_base: [*]const u8) []const u8 { - const offset = @intFromPtr(original.ptr) - @intFromPtr(old_base); - return new_base[offset..][0..original.len]; - } - }.slice; - - // Make a copy of connection_strings to avoid double-free - const connection_strings_copy = try this.client.allocator.dupe(u8, this.client.connection_strings); - - // Note that there is no need to copy username, password and address since the copies live - // within the connection_strings buffer. - const base_ptr = this.client.connection_strings.ptr; - const new_base = connection_strings_copy.ptr; - const username = relocate(this.client.username, base_ptr, new_base); - const password = relocate(this.client.password, base_ptr, new_base); - const orig_hostname = this.client.address.hostname(); - const hostname = relocate(orig_hostname, base_ptr, new_base); - - return JSValkeyClient.new(.{ - .ref_count = .init(), - ._subscription_ctx = null, - .client = .{ - .vm = vm, - .address = switch (this.client.protocol) { - .standalone_unix, .standalone_tls_unix => .{ .unix = hostname }, - else => .{ - .host = .{ - .host = hostname, - .port = this.client.address.host.port, - }, - }, - }, - .protocol = this.client.protocol, - .username = username, - .password = password, - .in_flight = .init(this.client.allocator), - .queue = .init(this.client.allocator), - .status = .disconnected, - .connection_strings = connection_strings_copy, - .socket = .{ - .SocketTCP = .{ - .socket = .{ - .detached = {}, - }, - }, - }, - .database = this.client.database, - .allocator = this.client.allocator, - .flags = .{ - // Because this starts in the disconnected state, we need to reset some flags. - .is_authenticated = false, - // If the user manually closed the connection, then duplicating a closed client - // means the new client remains finalized. - .is_manually_closed = this.client.flags.is_manually_closed, - .enable_offline_queue = if (this._subscription_ctx) |*ctx| ctx.original_enable_offline_queue else this.client.flags.enable_offline_queue, - .needs_to_open_socket = true, - .enable_auto_reconnect = this.client.flags.enable_auto_reconnect, - .is_reconnecting = false, - .auto_pipelining = if (this._subscription_ctx) |*ctx| ctx.original_enable_auto_pipelining else this.client.flags.auto_pipelining, - // Duplicating a finalized client means it stays finalized. - .finalized = this.client.flags.finalized, - }, - .max_retries = this.client.max_retries, - .connection_timeout_ms = this.client.connection_timeout_ms, - .idle_timeout_interval_ms = this.client.idle_timeout_interval_ms, - }, - .globalObject = this.globalObject, - }); - } - - pub fn getOrCreateSubscriptionCtxEnteringSubscriptionMode( - this: *JSValkeyClient, - ) *SubscriptionCtx { - if (this._subscription_ctx) |*ctx| { - // If we already have a subscription context, return it - return ctx; - } - - // Save the original flag values and create a new subscription context - this._subscription_ctx = SubscriptionCtx.init( - this, - this.client.flags.enable_offline_queue, - this.client.flags.auto_pipelining, - ); - - // We need to make sure we disable the offline queue. - this.client.flags.enable_offline_queue = false; - this.client.flags.auto_pipelining = false; - - return &(this._subscription_ctx.?); - } - - pub fn deleteSubscriptionCtx(this: *JSValkeyClient) void { - if (this._subscription_ctx) |*ctx| { - // Restore the original flag values when leaving subscription mode - this.client.flags.enable_offline_queue = ctx.original_enable_offline_queue; - this.client.flags.auto_pipelining = ctx.original_enable_auto_pipelining; - - ctx.deinit(); - } - - this._subscription_ctx = null; - } - - pub fn isSubscriber(this: *const JSValkeyClient) bool { - return this._subscription_ctx != null; - } - pub fn getConnected(this: *JSValkeyClient, _: *jsc.JSGlobalObject) JSValue { return JSValue.jsBoolean(this.client.status == .connected); } @@ -493,22 +159,16 @@ pub const JSValkeyClient = struct { return JSValue.jsNumber(len); } - pub fn doConnect( - this: *JSValkeyClient, - globalObject: *jsc.JSGlobalObject, - this_value: JSValue, - ) bun.JSError!JSValue { + pub fn doConnect(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, this_value: JSValue) bun.JSError!JSValue { this.ref(); defer this.deref(); // If already connected, resolve immediately if (this.client.status == .connected) { - debug("Connecting client is already connected.", .{}); return jsc.JSPromise.resolvedPromiseValue(globalObject, js.helloGetCached(this_value) orelse .js_undefined); } if (js.connectionPromiseGetCached(this_value)) |promise| { - debug("Connecting client is already connected.", .{}); return promise; } @@ -521,7 +181,6 @@ pub const JSValkeyClient = struct { this.this_value.setStrong(this_value, globalObject); if (this.client.flags.needs_to_open_socket) { - debug("Need to open socket, starting connection process.", .{}); this.poll_ref.ref(this.client.vm); this.connect() catch |err| { @@ -544,7 +203,6 @@ pub const JSValkeyClient = struct { }, .failed => { this.client.status = .disconnected; - this.updateHasPendingActivity(); this.client.flags.is_reconnecting = true; this.client.retry_attempts = 0; this.reconnect(); @@ -715,7 +373,6 @@ pub const JSValkeyClient = struct { defer this.deref(); this.client.status = .connecting; - this.updateHasPendingActivity(); // Ref the poll to keep event loop alive during connection this.poll_ref.disable(); @@ -757,17 +414,7 @@ pub const JSValkeyClient = struct { if (js.connectionPromiseGetCached(this_value)) |promise| { js.connectionPromiseSetCached(this_value, globalObject, .zero); - const js_promise = promise.asPromise().?; - if (this.client.flags.connection_promise_returns_client) { - debug("Resolving connection promise with client instance", .{}); - const this_js = this.toJS(globalObject); - this_js.unprotect(); - js_promise.resolve(globalObject, this_js); - } else { - debug("Resolving connection promise with HELLO response", .{}); - js_promise.resolve(globalObject, hello_value); - } - this.client.flags.connection_promise_returns_client = false; + promise.asPromise().?.resolve(globalObject, hello_value); } } @@ -775,86 +422,6 @@ pub const JSValkeyClient = struct { this.updatePollRef(); } - pub fn onValkeySubscribe(this: *JSValkeyClient, value: *protocol.RESPValue) void { - if (!this.isSubscriber()) { - debug("onSubscribe called but client is not in subscriber mode", .{}); - return; - } - - _ = value; - - this.client.onWritable(); - this.updatePollRef(); - } - - pub fn onValkeyUnsubscribe(this: *JSValkeyClient, value: *protocol.RESPValue) void { - if (!this.isSubscriber()) { - debug("onUnsubscribe called but client is not in subscriber mode", .{}); - return; - } - - var subscription_ctx = this._subscription_ctx.?; - - // Check if we have any remaining subscriptions - // If the callback map is empty, we can exit subscription mode - if (!subscription_ctx.hasSubscriptions(this.globalObject)) { - // No more subscriptions, exit subscription mode - this.deleteSubscriptionCtx(); - } - - _ = value; - - this.client.onWritable(); - this.updatePollRef(); - } - - pub fn onValkeyMessage(this: *JSValkeyClient, value: []protocol.RESPValue) void { - if (!this.isSubscriber()) { - debug("onMessage called but client is not in subscriber mode", .{}); - return; - } - - const globalObject = this.globalObject; - const event_loop = this.client.vm.eventLoop(); - event_loop.enter(); - defer event_loop.exit(); - - // The message push should be an array with [channel, message] - if (value.len < 2) { - debug("Message array has insufficient elements: {}", .{value.len}); - return; - } - - // Extract channel and message - const channel_value = value[0].toJS(globalObject) catch { - debug("Failed to convert channel to JS", .{}); - return; - }; - const message_value = value[1].toJS(globalObject) catch { - debug("Failed to convert message to JS", .{}); - return; - }; - - // Get the subscription context - const subs_ctx = &(this._subscription_ctx orelse { - debug("No subscription context found", .{}); - return; - }); - - // Invoke callbacks for this channel with message and channel as arguments - subs_ctx.invokeCallback( - globalObject, - channel_value, - &[_]JSValue{ message_value, channel_value }, - ) catch |e| { - debug("Failed to invoke callbacks. Error: {}", .{e}); - this.failWithJSValue(globalObject.takeException(e)); - }; - - this.client.onWritable(); - this.updatePollRef(); - } - // Callback for when Valkey client needs to reconnect pub fn onValkeyReconnect(this: *JSValkeyClient) void { // Schedule reconnection using our safe timer methods @@ -927,33 +494,6 @@ pub const JSValkeyClient = struct { } } - pub fn hasPendingActivity(this: *JSValkeyClient) bool { - // TODO(markovejnovic): Could this be .monotonic? My intuition says - // yes, because none of the things that may be freed will actually be - // read. The pointers don't move, so it should be safe, but I've - // decided here to be conservative. - return this.has_pending_activity.load(.acquire); - } - - pub fn updateHasPendingActivity(this: *JSValkeyClient) void { - if (this.client.hasAnyPendingCommands()) { - this.has_pending_activity.store(true, .release); - return; - } - - if (this.channel_subscription_count > 0) { - this.has_pending_activity.store(true, .release); - return; - } - - if (this.client.status != .connected and this.client.status != .disconnected) { - this.has_pending_activity.store(true, .release); - return; - } - - this.has_pending_activity.store(false, .release); - } - pub fn finalize(this: *JSValkeyClient) void { // Since this.stopTimers impacts the reference count potentially, we // need to ref/unref here as well. @@ -967,9 +507,6 @@ pub const JSValkeyClient = struct { } this.client.flags.finalized = true; this.client.close(); - if (this._subscription_ctx) |*ctx| { - ctx.deinit(); - } this.deref(); } @@ -984,7 +521,6 @@ pub const JSValkeyClient = struct { } fn connect(this: *JSValkeyClient) !void { - debug("Connecting to Redis.", .{}); this.client.flags.needs_to_open_socket = false; const vm = this.client.vm; @@ -1105,7 +641,6 @@ pub const JSValkeyClient = struct { pub const decr = fns.decr; pub const del = fns.del; pub const dump = fns.dump; - pub const duplicate = fns.duplicate; pub const exists = fns.exists; pub const expire = fns.expire; pub const expiretime = fns.expiretime; @@ -1207,7 +742,6 @@ fn SocketHandler(comptime ssl: bool) type { loop.enter(); defer loop.exit(); this.client.status = .failed; - this.updateHasPendingActivity(); this.client.flags.is_manually_closed = true; this.client.failWithJSValue(this.globalObject, ssl_error.toJS(this.globalObject)); this.client.close(); @@ -1222,6 +756,7 @@ fn SocketHandler(comptime ssl: bool) type { pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void { // Ensure the socket pointer is updated. + this.client.socket = .{ .SocketTCP = .detached }; this.client.onClose(); } diff --git a/src/valkey/js_valkey_functions.zig b/src/valkey/js_valkey_functions.zig index ee36988b8e..7092673dd8 100644 --- a/src/valkey/js_valkey_functions.zig +++ b/src/valkey/js_valkey_functions.zig @@ -1,19 +1,3 @@ -fn requireNotSubscriber(this: *const JSValkeyClient, function_name: []const u8) bun.JSError!void { - const fmt_string = "RedisClient.prototype.{s} cannot be called while in subscriber mode."; - - if (this.isSubscriber()) { - return this.globalObject.throw(fmt_string, .{function_name}); - } -} - -fn requireSubscriber(this: *const JSValkeyClient, function_name: []const u8) bun.JSError!void { - const fmt_string = "RedisClient.prototype.{s} can only be called while in subscriber mode."; - - if (!this.isSubscriber()) { - return this.globalObject.throw(fmt_string, .{function_name}); - } -} - pub fn jsSend(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { const command = try callframe.argument(0).toBunString(globalObject); defer command.deref(); @@ -57,8 +41,6 @@ pub fn jsSend(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfram } pub fn get(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("get", "key", "string or buffer"); }; @@ -79,8 +61,6 @@ pub fn get(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: } pub fn getBuffer(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("getBuffer", "key", "string or buffer"); }; @@ -101,8 +81,6 @@ pub fn getBuffer(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callf } pub fn set(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const args_view = callframe.arguments(); var stack_fallback = std.heap.stackFallback(512, bun.default_allocator); var args = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), args_view.len); @@ -149,8 +127,6 @@ pub fn set(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: } pub fn incr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("incr", "key", "string or buffer"); }; @@ -171,8 +147,6 @@ pub fn incr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: } pub fn decr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("decr", "key", "string or buffer"); }; @@ -193,8 +167,6 @@ pub fn decr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: } pub fn exists(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("exists", "key", "string or buffer"); }; @@ -216,8 +188,6 @@ pub fn exists(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfram } pub fn expire(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("expire", "key", "string or buffer"); }; @@ -249,8 +219,6 @@ pub fn expire(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfram } pub fn ttl(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("ttl", "key", "string or buffer"); }; @@ -272,8 +240,6 @@ pub fn ttl(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: // Implement srem (remove value from a set) pub fn srem(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("srem", "key", "string or buffer"); }; @@ -299,8 +265,6 @@ pub fn srem(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: // Implement srandmember (get random member from set) pub fn srandmember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("srandmember", "key", "string or buffer"); }; @@ -322,8 +286,6 @@ pub fn srandmember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, cal // Implement smembers (get all members of a set) pub fn smembers(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("smembers", "key", "string or buffer"); }; @@ -345,8 +307,6 @@ pub fn smembers(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfr // Implement spop (pop a random member from a set) pub fn spop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("spop", "key", "string or buffer"); }; @@ -368,8 +328,6 @@ pub fn spop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: // Implement sadd (add member to a set) pub fn sadd(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("sadd", "key", "string or buffer"); }; @@ -395,8 +353,6 @@ pub fn sadd(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: // Implement sismember (check if value is member of a set) pub fn sismember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("sismember", "key", "string or buffer"); }; @@ -423,8 +379,6 @@ pub fn sismember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callf // Implement hmget (get multiple values from hash) pub fn hmget(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = (try fromJS(globalObject, callframe.argument(0))) orelse { return globalObject.throwInvalidArgumentType("hmget", "key", "string or buffer"); }; @@ -472,8 +426,6 @@ pub fn hmget(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe // Implement hincrby (increment hash field by integer value) pub fn hincrby(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = try callframe.argument(0).toBunString(globalObject); defer key.deref(); const field = try callframe.argument(1).toBunString(globalObject); @@ -504,8 +456,6 @@ pub fn hincrby(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfra // Implement hincrbyfloat (increment hash field by float value) pub fn hincrbyfloat(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = try callframe.argument(0).toBunString(globalObject); defer key.deref(); const field = try callframe.argument(1).toBunString(globalObject); @@ -536,8 +486,6 @@ pub fn hincrbyfloat(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, ca // Implement hmset (set multiple values in hash) pub fn hmset(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - const key = try callframe.argument(0).toBunString(globalObject); defer key.deref(); @@ -630,494 +578,59 @@ pub fn ping(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: return promise.toJS(); } -pub fn publish( - this: *JSValkeyClient, - globalObject: *jsc.JSGlobalObject, - callframe: *jsc.CallFrame, -) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - - const args_view = callframe.arguments(); - var stack_fallback = std.heap.stackFallback(512, bun.default_allocator); - var args = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), args_view.len); - defer { - for (args.items) |*item| { - item.deinit(); - } - args.deinit(); - } - - const arg0 = callframe.argument(0); - if (!arg0.isString()) { - return globalObject.throwInvalidArgumentType("publish", "channel", "string"); - } - const channel = (try fromJS(globalObject, arg0)) orelse unreachable; - - args.appendAssumeCapacity(channel); - - const arg1 = callframe.argument(1); - if (!arg1.isString()) { - return globalObject.throwInvalidArgumentType("publish", "message", "string"); - } - const message = (try fromJS(globalObject, arg1)) orelse unreachable; - args.appendAssumeCapacity(message); - - const promise = this.send( - globalObject, - callframe.this(), - &.{ - .command = "PUBLISH", - .args = .{ .args = args.items }, - }, - ) catch |err| { - return protocol.valkeyErrorToJS(globalObject, "Failed to send PUBLISH command", err); - }; - - return promise.toJS(); -} - -pub fn subscribe( - this: *JSValkeyClient, - globalObject: *jsc.JSGlobalObject, - callframe: *jsc.CallFrame, -) bun.JSError!JSValue { - const channel_or_many, const handler_callback = callframe.argumentsAsArray(2); - var stack_fallback = std.heap.stackFallback(512, bun.default_allocator); - var redis_channels = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), 1); - defer { - for (redis_channels.items) |*item| { - item.deinit(); - } - redis_channels.deinit(); - } - - if (!handler_callback.isCallable()) { - return globalObject.throwInvalidArgumentType("subscribe", "listener", "function"); - } - - // We now need to register the callback with our subscription context, which may or may not exist. - var subscription_ctx = this.getOrCreateSubscriptionCtxEnteringSubscriptionMode(); - - // The first argument given is the channel or may be an array of channels. - if (channel_or_many.isArray()) { - if ((try channel_or_many.getLength(globalObject)) == 0) { - return globalObject.throwInvalidArguments("subscribe requires at least one channel", .{}); - } - try redis_channels.ensureTotalCapacity(try channel_or_many.getLength(globalObject)); - - var array_iter = try channel_or_many.arrayIterator(globalObject); - while (try array_iter.next()) |channel_arg| { - const channel = (try fromJS(globalObject, channel_arg)) orelse { - return globalObject.throwInvalidArgumentType("subscribe", "channel", "string"); - }; - redis_channels.appendAssumeCapacity(channel); - - try subscription_ctx.upsertReceiveHandler(globalObject, channel_arg, handler_callback); - } - } else if (channel_or_many.isString()) { - // It is a single string channel - const channel = (try fromJS(globalObject, channel_or_many)) orelse { - return globalObject.throwInvalidArgumentType("subscribe", "channel", "string"); - }; - redis_channels.appendAssumeCapacity(channel); - - try subscription_ctx.upsertReceiveHandler(globalObject, channel_or_many, handler_callback); - } else { - return globalObject.throwInvalidArgumentType("subscribe", "channel", "string or array"); - } - - const command: valkey.Command = .{ - .command = "SUBSCRIBE", - .args = .{ .args = redis_channels.items }, - }; - const promise = this.send( - globalObject, - callframe.this(), - &command, - ) catch |err| { - // If we find an error, we need to clean up the subscription context. - this.deleteSubscriptionCtx(); - return protocol.valkeyErrorToJS(globalObject, "Failed to send SUBSCRIBE command", err); - }; - - return promise.toJS(); -} - -/// Send redis the UNSUBSCRIBE RESP command and clean up anything necessary after the unsubscribe commoand. -/// -/// The subscription context must exist when calling this function. -fn sendUnsubscribeRequestAndCleanup( - this: *JSValkeyClient, - this_js: jsc.JSValue, - globalObject: *jsc.JSGlobalObject, - redis_channels: []JSArgument, -) !jsc.JSValue { - // Send UNSUBSCRIBE command - const command: valkey.Command = .{ - .command = "UNSUBSCRIBE", - .args = .{ .args = redis_channels }, - }; - const promise = this.send( - globalObject, - this_js, - &command, - ) catch |err| { - return protocol.valkeyErrorToJS(globalObject, "Failed to send UNSUBSCRIBE command", err); - }; - - // We do not delete the subscription context here, but rather when the - // onValkeyUnsubscribe callback is invoked. - - return promise.toJS(); -} - -pub fn unsubscribe( - this: *JSValkeyClient, - globalObject: *jsc.JSGlobalObject, - callframe: *jsc.CallFrame, -) bun.JSError!JSValue { - // Check if we're in subscription mode - try requireSubscriber(this, @src().fn_name); - - const args_view = callframe.arguments(); - - var stack_fallback = std.heap.stackFallback(512, bun.default_allocator); - var redis_channels = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), 1); - defer { - for (redis_channels.items) |*item| { - item.deinit(); - } - redis_channels.deinit(); - } - - // If no arguments, unsubscribe from all channels - if (args_view.len == 0) { - return try sendUnsubscribeRequestAndCleanup(this, callframe.this(), globalObject, redis_channels.items); - } - - // The first argument can be a channel or an array of channels - const channel_or_many = callframe.argument(0); - - // Get the subscription context - var subscription_ctx = this._subscription_ctx orelse { - return .js_undefined; - }; - - // Two arguments means .unsubscribe(channel, listener) is invoked. - if (callframe.arguments().len == 2) { - // In this case, the first argument is a channel string and the second - // argument is the handler to remove. - if (!channel_or_many.isString()) { - return globalObject.throwInvalidArgumentType( - "unsubscribe", - "channel", - "string", - ); - } - - const channel = channel_or_many; - const listener_cb = callframe.argument(1); - - if (!listener_cb.isCallable()) { - return globalObject.throwInvalidArgumentType( - "unsubscribe", - "listener", - "function", - ); - } - - // Populate the redis_channels list with the single channel to - // unsubscribe from. This s important since this list is used to send - // the UNSUBSCRIBE command to redis. Without this, we would end up - // unsubscribing from all channels. - redis_channels.appendAssumeCapacity((try fromJS(globalObject, channel)) orelse { - return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string"); - }); - - const remaining_listeners = subscription_ctx.removeReceiveHandler(globalObject, channel, listener_cb) catch { - return globalObject.throw( - "Failed to remove handler for channel {}", - .{channel.asString().getZigString(globalObject)}, - ); - } orelse { - // Listeners weren't present in the first place, so we can return a - // resolved promise. - const promise = jsc.JSPromise.create(globalObject); - promise.resolve(globalObject, .js_undefined); - return promise.toJS(); - }; - - // In this case, we only want to send the unsubscribe command to redis if there are no more listeners for this - // channel. - if (remaining_listeners == 0) { - return try sendUnsubscribeRequestAndCleanup(this, callframe.this(), globalObject, redis_channels.items); - } - - // Otherwise, in order to keep the API consistent, we need to return a resolved promise. - const promise = jsc.JSPromise.create(globalObject); - promise.resolve(globalObject, .js_undefined); - - return promise.toJS(); - } - - if (channel_or_many.isArray()) { - if ((try channel_or_many.getLength(globalObject)) == 0) { - return globalObject.throwInvalidArguments( - "unsubscribe requires at least one channel", - .{}, - ); - } - - try redis_channels.ensureTotalCapacity(try channel_or_many.getLength(globalObject)); - // It is an array, so let's iterate over it - var array_iter = try channel_or_many.arrayIterator(globalObject); - while (try array_iter.next()) |channel_arg| { - const channel = (try fromJS(globalObject, channel_arg)) orelse { - return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string"); - }; - redis_channels.appendAssumeCapacity(channel); - // Clear the handlers for this channel - subscription_ctx.clearReceiveHandlers(globalObject, channel_arg); - } - } else if (channel_or_many.isString()) { - // It is a single string channel - const channel = (try fromJS(globalObject, channel_or_many)) orelse { - return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string"); - }; - redis_channels.appendAssumeCapacity(channel); - // Clear the handlers for this channel - subscription_ctx.clearReceiveHandlers(globalObject, channel_or_many); - } else { - return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string or array"); - } - - // Now send the unsubscribe command and clean up if necessary - return try sendUnsubscribeRequestAndCleanup(this, callframe.this(), globalObject, redis_channels.items); -} - -pub fn bitcount(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("bitcount", "BITCOUNT", "key").call(this, globalObject, callframe); -} - -pub fn dump(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("dump", "DUMP", "key").call(this, globalObject, callframe); -} - -pub fn expiretime(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("expiretime", "EXPIRETIME", "key").call(this, globalObject, callframe); -} - -pub fn getdel(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("getdel", "GETDEL", "key").call(this, globalObject, callframe); -} - -pub fn getex(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("getex", "GETEX").call(this, globalObject, callframe); -} - -pub fn hgetall(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("hgetall", "HGETALL", "key").call(this, globalObject, callframe); -} - -pub fn hkeys(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("hkeys", "HKEYS", "key").call(this, globalObject, callframe); -} - -pub fn hlen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("hlen", "HLEN", "key").call(this, globalObject, callframe); -} - -pub fn hvals(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("hvals", "HVALS", "key").call(this, globalObject, callframe); -} - -pub fn keys(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("keys", "KEYS", "key").call(this, globalObject, callframe); -} - -pub fn llen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("llen", "LLEN", "key").call(this, globalObject, callframe); -} - -pub fn lpop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("lpop", "LPOP", "key").call(this, globalObject, callframe); -} - -pub fn persist(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("persist", "PERSIST", "key").call(this, globalObject, callframe); -} - -pub fn pexpiretime(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("pexpiretime", "PEXPIRETIME", "key").call(this, globalObject, callframe); -} - -pub fn pttl(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("pttl", "PTTL", "key").call(this, globalObject, callframe); -} - -pub fn rpop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("rpop", "RPOP", "key").call(this, globalObject, callframe); -} - -pub fn scard(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("scard", "SCARD", "key").call(this, globalObject, callframe); -} - -pub fn strlen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("strlen", "STRLEN", "key").call(this, globalObject, callframe); -} - -pub fn @"type"(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("type", "TYPE", "key").call(this, globalObject, callframe); -} - -pub fn zcard(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("zcard", "ZCARD", "key").call(this, globalObject, callframe); -} - -pub fn zpopmax(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("zpopmax", "ZPOPMAX", "key").call(this, globalObject, callframe); -} - -pub fn zpopmin(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("zpopmin", "ZPOPMIN", "key").call(this, globalObject, callframe); -} - -pub fn zrandmember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey)"("zrandmember", "ZRANDMEMBER", "key").call(this, globalObject, callframe); -} - -pub fn append(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue)"("append", "APPEND", "key", "value").call(this, globalObject, callframe); -} -pub fn getset(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue)"("getset", "GETSET", "key", "value").call(this, globalObject, callframe); -} -pub fn lpush(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpush", "LPUSH").call(this, globalObject, callframe); -} -pub fn lpushx(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpushx", "LPUSHX").call(this, globalObject, callframe); -} -pub fn pfadd(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue)"("pfadd", "PFADD", "key", "value").call(this, globalObject, callframe); -} -pub fn rpush(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpush", "RPUSH").call(this, globalObject, callframe); -} -pub fn rpushx(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpushx", "RPUSHX").call(this, globalObject, callframe); -} -pub fn setnx(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue)"("setnx", "SETNX", "key", "value").call(this, globalObject, callframe); -} -pub fn zscore(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, value: RedisValue)"("zscore", "ZSCORE", "key", "value").call(this, globalObject, callframe); -} - -pub fn del(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, ...args: RedisKey[])"("del", "DEL", "key").call(this, globalObject, callframe); -} -pub fn mget(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(key: RedisKey, ...args: RedisKey[])"("mget", "MGET", "key").call(this, globalObject, callframe); -} - -pub fn script(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("script", "SCRIPT").call(this, globalObject, callframe); -} -pub fn select(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("select", "SELECT").call(this, globalObject, callframe); -} -pub fn spublish(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("spublish", "SPUBLISH").call(this, globalObject, callframe); -} -pub fn smove(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("smove", "SMOVE").call(this, globalObject, callframe); -} -pub fn substr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("substr", "SUBSTR").call(this, globalObject, callframe); -} -pub fn hstrlen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("hstrlen", "HSTRLEN").call(this, globalObject, callframe); -} -pub fn zrank(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("zrank", "ZRANK").call(this, globalObject, callframe); -} -pub fn zrevrank(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue { - try requireNotSubscriber(this, @src().fn_name); - return compile.@"(...strings: string[])"("zrevrank", "ZREVRANK").call(this, globalObject, callframe); -} - -pub fn duplicate( - this: *JSValkeyClient, - globalObject: *jsc.JSGlobalObject, - callframe: *jsc.CallFrame, -) bun.JSError!JSValue { - // We ignore the arguments if the user provided any. - _ = callframe; - - var new_client: *JSValkeyClient = try this.cloneWithoutConnecting(); - var new_client_js = new_client.toJS(globalObject); - new_client.this_value = jsc.JSRef.initWeak(new_client_js); - - // If the original client is already connected and not manually closed, start connecting the new client. - if (this.client.status == .connected and !this.client.flags.is_manually_closed) { - new_client.client.flags.connection_promise_returns_client = true; - new_client_js.protect(); - return try new_client.doConnect(globalObject, new_client_js); - } - - // Otherwise, we create a dummy promise to yield the unconnected client. - const promise = jsc.JSPromise.create(globalObject); - promise.resolve(globalObject, new_client_js); - return promise.toJS(); -} - +pub const bitcount = compile.@"(key: RedisKey)"("bitcount", "BITCOUNT", "key").call; +pub const dump = compile.@"(key: RedisKey)"("dump", "DUMP", "key").call; +pub const expiretime = compile.@"(key: RedisKey)"("expiretime", "EXPIRETIME", "key").call; +pub const getdel = compile.@"(key: RedisKey)"("getdel", "GETDEL", "key").call; +pub const getex = compile.@"(...strings: string[])"("getex", "GETEX").call; +pub const hgetall = compile.@"(key: RedisKey)"("hgetall", "HGETALL", "key").call; +pub const hkeys = compile.@"(key: RedisKey)"("hkeys", "HKEYS", "key").call; +pub const hlen = compile.@"(key: RedisKey)"("hlen", "HLEN", "key").call; +pub const hvals = compile.@"(key: RedisKey)"("hvals", "HVALS", "key").call; +pub const keys = compile.@"(key: RedisKey)"("keys", "KEYS", "key").call; +pub const llen = compile.@"(key: RedisKey)"("llen", "LLEN", "key").call; +pub const lpop = compile.@"(key: RedisKey)"("lpop", "LPOP", "key").call; +pub const persist = compile.@"(key: RedisKey)"("persist", "PERSIST", "key").call; +pub const pexpiretime = compile.@"(key: RedisKey)"("pexpiretime", "PEXPIRETIME", "key").call; +pub const pttl = compile.@"(key: RedisKey)"("pttl", "PTTL", "key").call; +pub const rpop = compile.@"(key: RedisKey)"("rpop", "RPOP", "key").call; +pub const scard = compile.@"(key: RedisKey)"("scard", "SCARD", "key").call; +pub const strlen = compile.@"(key: RedisKey)"("strlen", "STRLEN", "key").call; +pub const @"type" = compile.@"(key: RedisKey)"("type", "TYPE", "key").call; +pub const zcard = compile.@"(key: RedisKey)"("zcard", "ZCARD", "key").call; +pub const zpopmax = compile.@"(key: RedisKey)"("zpopmax", "ZPOPMAX", "key").call; +pub const zpopmin = compile.@"(key: RedisKey)"("zpopmin", "ZPOPMIN", "key").call; +pub const zrandmember = compile.@"(key: RedisKey)"("zrandmember", "ZRANDMEMBER", "key").call; + +pub const append = compile.@"(key: RedisKey, value: RedisValue)"("append", "APPEND", "key", "value").call; +pub const getset = compile.@"(key: RedisKey, value: RedisValue)"("getset", "GETSET", "key", "value").call; +pub const lpush = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpush", "LPUSH").call; +pub const lpushx = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpushx", "LPUSHX").call; +pub const pfadd = compile.@"(key: RedisKey, value: RedisValue)"("pfadd", "PFADD", "key", "value").call; +pub const rpush = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpush", "RPUSH").call; +pub const rpushx = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpushx", "RPUSHX").call; +pub const setnx = compile.@"(key: RedisKey, value: RedisValue)"("setnx", "SETNX", "key", "value").call; +pub const zscore = compile.@"(key: RedisKey, value: RedisValue)"("zscore", "ZSCORE", "key", "value").call; + +pub const del = compile.@"(key: RedisKey, ...args: RedisKey[])"("del", "DEL", "key").call; +pub const mget = compile.@"(key: RedisKey, ...args: RedisKey[])"("mget", "MGET", "key").call; + +pub const publish = compile.@"(...strings: string[])"("publish", "PUBLISH").call; +pub const script = compile.@"(...strings: string[])"("script", "SCRIPT").call; +pub const select = compile.@"(...strings: string[])"("select", "SELECT").call; +pub const spublish = compile.@"(...strings: string[])"("spublish", "SPUBLISH").call; +pub const smove = compile.@"(...strings: string[])"("smove", "SMOVE").call; +pub const substr = compile.@"(...strings: string[])"("substr", "SUBSTR").call; +pub const hstrlen = compile.@"(...strings: string[])"("hstrlen", "HSTRLEN").call; +pub const zrank = compile.@"(...strings: string[])"("zrank", "ZRANK").call; +pub const zrevrank = compile.@"(...strings: string[])"("zrevrank", "ZREVRANK").call; +pub const subscribe = compile.@"(...strings: string[])"("subscribe", "SUBSCRIBE").call; pub const psubscribe = compile.@"(...strings: string[])"("psubscribe", "PSUBSCRIBE").call; +pub const unsubscribe = compile.@"(...strings: string[])"("unsubscribe", "UNSUBSCRIBE").call; pub const punsubscribe = compile.@"(...strings: string[])"("punsubscribe", "PUNSUBSCRIBE").call; pub const pubsub = compile.@"(...strings: string[])"("pubsub", "PUBSUB").call; +// publish(channel: RedisValue, message: RedisValue) // script(subcommand: "LOAD", script: RedisValue) // select(index: number | string) // spublish(shardchannel: RedisValue, message: RedisValue) diff --git a/src/valkey/valkey.zig b/src/valkey/valkey.zig index 67538e1098..87b9cea495 100644 --- a/src/valkey/valkey.zig +++ b/src/valkey/valkey.zig @@ -18,16 +18,6 @@ pub const ConnectionFlags = struct { is_reconnecting: bool = false, auto_pipelining: bool = true, finalized: bool = false, - // This flag is a slight hack to allow returning the client instance in the - // promise which resolves when the connection is established. There are two - // modes through which a client may connect: - // 1. Connect through `client.connect()` which has the semantics of - // resolving the promise with the connection information. - // 2. Through `client.duplicate()` which creates a promise through - // `onConnect()` which resolves with the client instance itself. - // This flag is set to true in the latter case to indicate to the promise - // resolution delegation to resolve the promise with the client. - connection_promise_returns_client: bool = false, }; /// Valkey connection status @@ -116,13 +106,6 @@ pub const Address = union(enum) { port: u16, }, - pub fn hostname(this: Address) []const u8 { - return switch (this) { - .unix => |unix_addr| return unix_addr, - .host => |h| return h.host, - }; - } - pub fn connect(this: *const Address, client: *ValkeyClient, ctx: *bun.uws.SocketContext, is_tls: bool) !uws.AnySocket { switch (is_tls) { inline else => |tls| { @@ -172,7 +155,6 @@ pub const ValkeyClient = struct { username: []const u8 = "", database: u32 = 0, address: Address, - protocol: Protocol, connection_strings: []u8 = &.{}, @@ -229,8 +211,6 @@ pub const ValkeyClient = struct { } this.allocator.free(this.connection_strings); - // Note there is no need to deallocate username, password and hostname since they are - // within the this.connection_strings buffer. this.write_buffer.deinit(this.allocator); this.read_buffer.deinit(this.allocator); this.tls.deinit(); @@ -279,7 +259,6 @@ pub const ValkeyClient = struct { .meta = command.meta, .promise = command.promise, }) catch |err| bun.handleOom(err); - this.parent().updateHasPendingActivity(); total += 1; total_bytelength += command.serialized_data.len; @@ -290,7 +269,6 @@ pub const ValkeyClient = struct { bun.handleOom(this.write_buffer.byte_list.ensureUnusedCapacity(this.allocator, total_bytelength)); for (pipelineable_commands) |*command| { bun.handleOom(this.write_buffer.write(this.allocator, command.serialized_data)); - this.parent().updateHasPendingActivity(); // Free the serialized data since we've copied it to the write buffer this.allocator.free(command.serialized_data); } @@ -376,7 +354,6 @@ pub const ValkeyClient = struct { if (wrote > 0) { this.write_buffer.consume(@intCast(wrote)); } - this.parent().updateHasPendingActivity(); return this.write_buffer.len() > 0; } @@ -406,14 +383,14 @@ pub const ValkeyClient = struct { /// Mark the connection as failed with error message pub fn fail(this: *ValkeyClient, message: []const u8, err: protocol.RedisError) void { - debug("failed: {s}: {}", .{ message, err }); + debug("failed: {s}: {s}", .{ message, @errorName(err) }); if (this.status == .failed) return; if (this.flags.finalized) { // We can't run promises inside finalizers. if (this.queue.count + this.in_flight.count > 0) { const vm = this.vm; - const deferred_failure = bun.new(DeferredFailure, .{ + const deferred_failrue = bun.new(DeferredFailure, .{ // This memory is not owned by us. .message = bun.handleOom(bun.default_allocator.dupe(u8, message)), @@ -424,7 +401,7 @@ pub const ValkeyClient = struct { }); this.in_flight = .init(this.allocator); this.queue = .init(this.allocator); - deferred_failure.enqueue(); + deferred_failrue.enqueue(); } // Allow the finalizer to call .close() @@ -438,7 +415,6 @@ pub const ValkeyClient = struct { pub fn failWithJSValue(this: *ValkeyClient, globalThis: *jsc.JSGlobalObject, jsvalue: jsc.JSValue) void { this.status = .failed; rejectAllPendingCommands(&this.in_flight, &this.queue, globalThis, this.allocator, jsvalue); - this.parent().updateHasPendingActivity(); if (!this.connectionReady()) { this.flags.is_manually_closed = true; @@ -487,7 +463,6 @@ pub const ValkeyClient = struct { debug("reconnect in {d}ms (attempt {d}/{d})", .{ delay_ms, this.retry_attempts, this.max_retries }); this.status = .disconnected; - this.parent().updateHasPendingActivity(); this.flags.is_reconnecting = true; this.flags.is_authenticated = false; this.flags.is_selecting_db_internal = false; @@ -523,8 +498,6 @@ pub const ValkeyClient = struct { // Without auto pipelining, wait for in-flight to empty before draining _ = this.drain(); } - - this.parent().updateHasPendingActivity(); } _ = this.flushData(); @@ -537,7 +510,6 @@ pub const ValkeyClient = struct { // Path 1: Buffer already has data, append and process from buffer if (this.read_buffer.remaining().len > 0) { this.read_buffer.write(this.allocator, data) catch @panic("failed to write to read buffer"); - this.parent().updateHasPendingActivity(); // Process as many complete messages from the buffer as possible while (true) { @@ -570,7 +542,6 @@ pub const ValkeyClient = struct { } this.read_buffer.consume(@truncate(bytes_consumed)); - this.parent().updateHasPendingActivity(); var value_to_handle = value; // Use temp var for defer this.handleResponse(&value_to_handle) catch |err| { @@ -642,55 +613,6 @@ pub const ValkeyClient = struct { // If the loop finishes, the entire 'data' was processed without needing the buffer. } - /// Try handling this response as a subscriber-state response. - /// Returns `handled` if we handled it, `fallthrough` if we did not. - fn handleSubscribeResponse(this: *ValkeyClient, value: *protocol.RESPValue, pair: *ValkeyCommand.PromisePair) enum { handled, fallthrough } { - // Resolve the promise with the potentially transformed value - var promise_ptr = &pair.promise; - const globalThis = this.globalObject(); - const loop = this.vm.eventLoop(); - - debug("Handling a subscribe response: {any}", .{value.*}); - loop.enter(); - defer loop.exit(); - - return switch (value.*) { - .Error => { - promise_ptr.reject(globalThis, value.toJS(globalThis)); - return .handled; - }, - .Push => |push| { - const p = this.parent(); - const subs_ctx = p.getOrCreateSubscriptionCtxEnteringSubscriptionMode(); - const sub_count = subs_ctx.subscriptionCount(globalThis); - - if (std.mem.eql(u8, push.kind, "subscribe")) { - this.onValkeySubscribe(value); - promise_ptr.promise.resolve(globalThis, .jsNumber(sub_count)); - return .handled; - } else if (std.mem.eql(u8, push.kind, "unsubscribe")) { - this.onValkeyUnsubscribe(value); - promise_ptr.promise.resolve(globalThis, .js_undefined); - return .handled; - } else { - // We should rarely reach this point. If we're guaranteed to be handling a subscribe/unsubscribe, - // then this is an unexpected path. - @branchHint(.cold); - this.fail( - "Push message is not a subscription message.", - protocol.RedisError.InvalidResponseType, - ); - return .handled; - } - }, - else => { - // This may be a regular command response. Let's pass it down - // to the next handler. - return .fallthrough; - }, - }; - } - fn handleHelloResponse(this: *ValkeyClient, value: *protocol.RESPValue) void { debug("Processing HELLO response", .{}); @@ -702,7 +624,6 @@ pub const ValkeyClient = struct { .SimpleString => |str| { if (std.mem.eql(u8, str, "OK")) { this.status = .connected; - this.parent().updateHasPendingActivity(); this.flags.is_authenticated = true; this.onValkeyConnect(value); return; @@ -736,7 +657,6 @@ pub const ValkeyClient = struct { // Authentication successful via HELLO this.status = .connected; - this.parent().updateHasPendingActivity(); this.flags.is_authenticated = true; this.onValkeyConnect(value); return; @@ -785,64 +705,9 @@ pub const ValkeyClient = struct { }, }; } - // Let's load the promise pair. - var pair_maybe = this.in_flight.readItem(); - - // We handle subscriptions specially because they are not regular - // commands and their failure will potentially cause the client to drop - // out of subscriber mode. - if (this.parent().isSubscriber()) { - debug("This client is a subscriber. Handling as subscriber...", .{}); - - // There are multiple different commands we may receive in - // subscriber mode. One is from a client.subscribe() call which - // requires that a promise is in-flight, but otherwise, we may also - // receive push messages from the server that do not have an - // associated promise. - if (pair_maybe) |*pair| { - debug("There is a request in flight. Handling as a subscribe request...", .{}); - if (this.handleSubscribeResponse(value, pair) == .handled) { - return; - } - } - - switch (value.*) { - .Error => |err| { - this.fail(err, protocol.RedisError.InvalidResponse); - return; - }, - .Push => |push| { - if (std.mem.eql(u8, push.kind, "message")) { - @branchHint(.likely); - debug("Received a message.", .{}); - this.onValkeyMessage(push.data); - return; - } else if (std.mem.eql(u8, push.kind, "subscribe")) { - @branchHint(.cold); - debug("Received subscription message without promise: {any}", .{push.data}); - return; - } else if (std.mem.eql(u8, push.kind, "unsubscribe")) { - @branchHint(.cold); - debug("Received unsubscribe message without promise: {any}", .{push.data}); - return; - } else { - @branchHint(.cold); - this.fail("Unexpected push message kind without promise", protocol.RedisError.InvalidResponseType); - return; - } - }, - else => { - // In the else case, we fall through to the regular - // handler. Subscribers can send .Push commands which have - // the same semantics as regular commands. - }, - } - - debug("Treating subscriber response as a regular command...", .{}); - } // For regular commands, get the next command+promise pair from the queue - var pair = pair_maybe orelse { + var pair = this.in_flight.readItem() orelse { debug("Received response but no promise in queue", .{}); return; }; @@ -964,7 +829,6 @@ pub const ValkeyClient = struct { .meta = offline_cmd.meta, .promise = offline_cmd.promise, }) catch |err| bun.handleOom(err); - this.parent().updateHasPendingActivity(); const data = offline_cmd.serialized_data; if (this.connectionReady() and this.write_buffer.remaining().len == 0) { @@ -977,7 +841,6 @@ pub const ValkeyClient = struct { if (unwritten.len > 0) { // Handle incomplete write. bun.handleOom(this.write_buffer.write(this.allocator, unwritten)); - this.parent().updateHasPendingActivity(); } return true; @@ -1042,7 +905,6 @@ pub const ValkeyClient = struct { // Add to queue with command type try this.in_flight.writeItem(cmd_pair); - this.parent().updateHasPendingActivity(); _ = this.flushData(); } @@ -1087,7 +949,6 @@ pub const ValkeyClient = struct { this.unregisterAutoFlusher(); if (this.status == .connected or this.status == .connecting) { this.status = .disconnected; - this.parent().updateHasPendingActivity(); this.close(); } } @@ -1100,7 +961,6 @@ pub const ValkeyClient = struct { /// Write data to the socket buffer fn write(this: *ValkeyClient, data: []const u8) !usize { try this.write_buffer.write(this.allocator, data); - this.parent().updateHasPendingActivity(); return data.len; } @@ -1125,18 +985,6 @@ pub const ValkeyClient = struct { this.parent().onValkeyConnect(value); } - pub fn onValkeySubscribe(this: *ValkeyClient, value: *protocol.RESPValue) void { - this.parent().onValkeySubscribe(value); - } - - pub fn onValkeyUnsubscribe(this: *ValkeyClient, value: *protocol.RESPValue) void { - this.parent().onValkeyUnsubscribe(value); - } - - pub fn onValkeyMessage(this: *ValkeyClient, value: []protocol.RESPValue) void { - this.parent().onValkeyMessage(value); - } - pub fn onValkeyReconnect(this: *ValkeyClient) void { this.parent().onValkeyReconnect(); } @@ -1151,9 +999,9 @@ pub const ValkeyClient = struct { }; // Auto-pipelining + const debug = bun.Output.scoped(.Redis, .visible); -const ValkeyCommand = @import("./ValkeyCommand.zig"); const protocol = @import("./valkey_protocol.zig"); const std = @import("std"); diff --git a/test/integration/bun-types/fixture/redis.ts b/test/integration/bun-types/fixture/redis.ts deleted file mode 100644 index f7e536200d..0000000000 --- a/test/integration/bun-types/fixture/redis.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { expectType } from "./utilities"; - -expectType(Bun.redis.publish("hello", "world")).is>(); - -const copy = await Bun.redis.duplicate(); -expectType(copy.connected).is(); -expectType(copy).is(); - -const listener: Bun.RedisClient.StringPubSubListener = (message, channel) => { - expectType(message).is(); - expectType(channel).is(); -}; -Bun.redis.subscribe("hello", listener); - -// Buffer subscriptions are not yet implemented -// const bufferListener: Bun.RedisClient.BufferPubSubListener = (message, channel) => { -// expectType(message).is>(); -// expectType(channel).is(); -// }; -// Bun.redis.subscribe("hello", bufferListener); - -expectType( - copy.subscribe("hello", message => { - expectType(message).is(); - }), -).is>(); - -await copy.unsubscribe(); -await copy.unsubscribe("hello"); - -expectType(copy.unsubscribe("hello", () => {})).is>(); diff --git a/test/js/valkey/test-utils.ts b/test/js/valkey/test-utils.ts index 264100262d..aa4ea31371 100644 --- a/test/js/valkey/test-utils.ts +++ b/test/js/valkey/test-utils.ts @@ -455,9 +455,9 @@ import { tmpdir } from "os"; * Create a new client with specific connection type */ export function createClient( - connectionType: ConnectionType = ConnectionType.TCP, - customOptions = {}, - dbId: number | undefined = undefined, + connectionType: ConnectionType = ConnectionType.TCP, + customOptions = {}, + dbId: number | undefined = undefined, ) { let url: string; const mkUrl = (baseUrl: string) => dbId ? `${baseUrl}/${dbId}`: baseUrl; @@ -765,14 +765,6 @@ async function getRedisContainerName(): Promise { /** * Restart the Redis container to simulate connection drop - * - * Restarts the container identified by the test harness and waits briefly for it - * to come back online (approximately 2 seconds). Use this to simulate a server - * restart or connection drop during tests. - * - * @returns A promise that resolves when the restart and short wait complete. - * @throws If the Docker restart command exits with a non-zero code; the error - * message includes the container's stderr output. */ export async function restartRedisContainer(): Promise { const containerName = await getRedisContainerName(); @@ -797,10 +789,3 @@ export async function restartRedisContainer(): Promise { console.log(`Redis container restarted: ${containerName}`); } - -/** - * @returns true or false with approximately equal probability - */ -export function randomCoinFlip(): boolean { - return Math.floor(Math.random() * 2) == 0; -} diff --git a/test/js/valkey/valkey.test.ts b/test/js/valkey/valkey.test.ts index 2891208107..95cbcda29b 100644 --- a/test/js/valkey/valkey.test.ts +++ b/test/js/valkey/valkey.test.ts @@ -1,14 +1,6 @@ -import { randomUUIDv7, RedisClient, sleep } from "bun"; +import { randomUUIDv7, RedisClient } from "bun"; import { beforeEach, describe, expect, test } from "bun:test"; -import { - ConnectionType, - createClient, - ctx, - DEFAULT_REDIS_URL, - expectType, - isEnabled, - randomCoinFlip, -} from "./test-utils"; +import { ConnectionType, createClient, ctx, DEFAULT_REDIS_URL, expectType, isEnabled } from "./test-utils"; describe.skipIf(!isEnabled)("Valkey Redis Client", () => { beforeEach(async () => { @@ -20,12 +12,6 @@ describe.skipIf(!isEnabled)("Valkey Redis Client", () => { await ctx.redis.send("FLUSHALL", ["SYNC"]); }); - const connectedRedis = async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - await redis.connect(); - return redis; - }; - describe("Basic Operations", () => { test("should set and get strings", async () => { const redis = ctx.redis; @@ -223,566 +209,4 @@ describe.skipIf(!isEnabled)("Valkey Redis Client", () => { expect(valueAfterStop).toBe(TEST_VALUE); }); }); - - describe("PUB/SUB", () => { - const testChannel = "test-channel"; - const testKey = "test-key"; - const testValue = "test-value"; - const testMessage = "test-message"; - const flushTimeoutMs = 300; - - test("publishing to a channel does not fail", async () => { - const redis = await connectedRedis(); - // no subs - expect(await redis.publish(testChannel, testMessage)).toBe(0); - }); - - test("setting in subscriber mode gracefully fails", async () => { - const redis = await connectedRedis(); - - await redis.subscribe(testChannel, () => {}); - - expect(() => redis.set(testKey, testValue)).toThrow( - "RedisClient.prototype.set cannot be called while in subscriber mode", - ); - - // Clean up subscription - await redis.unsubscribe(testChannel); - }); - - test("setting after unsubscribing works", async () => { - const redis = await connectedRedis(); - - await redis.subscribe(testChannel, () => {}); - await redis.unsubscribe(testChannel); - - expect(redis.set(testKey, testValue)).resolves.toEqual("OK"); - }); - - test("subscribing to a channel receives messages", async () => { - const TEST_MESSAGE_COUNT = 128; - const redis = await connectedRedis(); - const subscriber = await connectedRedis(); - - var receiveCount = 0; - await subscriber.subscribe(testChannel, (message, channel) => { - receiveCount++; - expect(channel).toBe(testChannel); - expect(message).toBe(testMessage); - }); - - Array.from({ length: TEST_MESSAGE_COUNT }).forEach(async () => { - expect(await redis.publish(testChannel, testMessage)).toBe(1); - }); - - // Wait a little bit just to ensure all the messages are flushed. - await sleep(flushTimeoutMs); - - expect(receiveCount).toBe(TEST_MESSAGE_COUNT); - - await subscriber.unsubscribe(testChannel); - }); - - test("messages are received in order", async () => { - const TEST_MESSAGE_COUNT = 1024; - const redis = await connectedRedis(); - const subscriber = await connectedRedis(); - - var receivedMessages: string[] = []; - await subscriber.subscribe(testChannel, message => { - receivedMessages.push(message); - }); - - var sentMessages: string[] = []; - Array.from({ length: TEST_MESSAGE_COUNT }).forEach(async () => { - const message = randomUUIDv7(); - expect(await redis.publish(testChannel, message)).toBe(1); - sentMessages.push(message); - }); - - // Wait a little bit just to ensure all the messages are flushed. - await sleep(flushTimeoutMs); - - expect(receivedMessages.length).toBe(sentMessages.length); - expect(receivedMessages).toEqual(sentMessages); - - await subscriber.unsubscribe(testChannel); - }); - - test("subscribing to multiple channels receives messages", async () => { - const TEST_MESSAGE_COUNT = 128; - const redis = await connectedRedis(); - const subscriber = await connectedRedis(); - - const channels = [testChannel, "another-test-channel"]; - - var receivedMessages: { [channel: string]: string[] } = {}; - await subscriber.subscribe(channels, (message, channel) => { - receivedMessages[channel] = receivedMessages[channel] || []; - receivedMessages[channel].push(message); - }); - - var sentMessages: { [channel: string]: string[] } = {}; - for (let i = 0; i < TEST_MESSAGE_COUNT; i++) { - const channel = channels[randomCoinFlip() ? 0 : 1]; - const message = randomUUIDv7(); - - expect(await redis.publish(channel, message)).toBe(1); - - sentMessages[channel] = sentMessages[channel] || []; - sentMessages[channel].push(message); - } - - // Wait a little bit just to ensure all the messages are flushed. - await sleep(flushTimeoutMs); - - // Check that we received messages on both channels - expect(Object.keys(receivedMessages).sort()).toEqual(Object.keys(sentMessages).sort()); - - // Check messages match for each channel - for (const channel of channels) { - if (sentMessages[channel]) { - expect(receivedMessages[channel]).toEqual(sentMessages[channel]); - } - } - - await subscriber.unsubscribe(channels); - }); - - test("unsubscribing from specific channels while remaining subscribed to others", async () => { - const channel1 = "channel-1"; - const channel2 = "channel-2"; - const channel3 = "channel-3"; - - const redis = await connectedRedis(); - const subscriber = await connectedRedis(); - - let receivedMessages: { [channel: string]: string[] } = {}; - - // Subscribe to three channels - await subscriber.subscribe([channel1, channel2, channel3], (message, channel) => { - receivedMessages[channel] = receivedMessages[channel] || []; - receivedMessages[channel].push(message); - }); - - // Send initial messages to all channels - expect(await redis.publish(channel1, "msg1-before")).toBe(1); - expect(await redis.publish(channel2, "msg2-before")).toBe(1); - expect(await redis.publish(channel3, "msg3-before")).toBe(1); - - await sleep(flushTimeoutMs); - - // Unsubscribe from channel2 - await subscriber.unsubscribe(channel2); - - // Send messages after unsubscribing from channel2 - expect(await redis.publish(channel1, "msg1-after")).toBe(1); - expect(await redis.publish(channel2, "msg2-after")).toBe(0); - expect(await redis.publish(channel3, "msg3-after")).toBe(1); - - await sleep(flushTimeoutMs); - - // Check we received messages only on subscribed channels - expect(receivedMessages[channel1]).toEqual(["msg1-before", "msg1-after"]); - expect(receivedMessages[channel2]).toEqual(["msg2-before"]); // No "msg2-after" - expect(receivedMessages[channel3]).toEqual(["msg3-before", "msg3-after"]); - - await subscriber.unsubscribe([channel1, channel3]); - }); - - test("subscribing to the same channel multiple times", async () => { - const redis = await connectedRedis(); - const subscriber = await connectedRedis(); - const channel = "duplicate-channel"; - - let callCount = 0; - const listener = () => { - callCount++; - }; - - let callCount2 = 0; - const listener2 = () => { - callCount2++; - }; - - // Subscribe to the same channel twice - await subscriber.subscribe(channel, listener); - await subscriber.subscribe(channel, listener2); - - // Publish a single message - expect(await redis.publish(channel, "test-message")).toBe(1); - - await sleep(flushTimeoutMs); - - // Both listeners should have been called once. - expect(callCount).toBe(1); - expect(callCount2).toBe(1); - - await subscriber.unsubscribe(channel); - }); - - test("empty string messages", async () => { - const redis = await connectedRedis(); - const channel = "empty-message-channel"; - const subscriber = await connectedRedis(); - - let receivedMessage: string | undefined = undefined; - await subscriber.subscribe(channel, message => { - receivedMessage = message; - }); - - expect(await redis.publish(channel, "")).toBe(1); - await sleep(flushTimeoutMs); - - expect(receivedMessage).not.toBeUndefined(); - expect(receivedMessage!).toBe(""); - - await subscriber.unsubscribe(channel); - }); - - test("special characters in channel names", async () => { - const redis = await connectedRedis(); - const subscriber = await connectedRedis(); - - const specialChannels = [ - "channel:with:colons", - "channel with spaces", - "channel-with-unicode-😀", - "channel[with]brackets", - "channel@with#special$chars", - ]; - - for (const channel of specialChannels) { - let received = false; - await subscriber.subscribe(channel, () => { - received = true; - }); - - expect(await redis.publish(channel, "test")).toBe(1); - await sleep(flushTimeoutMs); - - expect(received).toBe(true); - await subscriber.unsubscribe(channel); - } - }); - - test("ping works in subscription mode", async () => { - const redis = await connectedRedis(); - const channel = "ping-test-channel"; - - await redis.subscribe(channel, () => {}); - - // Ping should work in subscription mode - const pong = await redis.ping(); - expect(pong).toBe("PONG"); - - const customPing = await redis.ping("hello"); - expect(customPing).toBe("hello"); - - await redis.unsubscribe(channel); - }); - - test("publish does not work from a subscribed client", async () => { - const redis = await connectedRedis(); - const channel = "self-publish-channel"; - - await redis.subscribe(channel, () => {}); - - // Publishing from the same client should work - expect(async () => redis.publish(channel, "self-published")).toThrow(); - await sleep(flushTimeoutMs); - - await redis.unsubscribe(channel); - }); - - test("complete unsubscribe restores normal command mode", async () => { - const redis = await connectedRedis(); - const channel = "restore-test-channel"; - const testKey = "restore-test-key"; - - await redis.subscribe(channel, () => {}); - - // Should fail in subscription mode - expect(() => redis.set(testKey, testValue)).toThrow( - "RedisClient.prototype.set cannot be called while in subscriber mode.", - ); - - // Unsubscribe from all channels - await redis.unsubscribe(channel); - - // Should work after unsubscribing - const result = await redis.set(testKey, "value"); - expect(result).toBe("OK"); - - const value = await redis.get(testKey); - expect(value).toBe("value"); - }); - - test("publishing without subscribers succeeds", async () => { - const redis = await connectedRedis(); - const channel = "no-subscribers-channel"; - - // Publishing without subscribers should not throw - expect(await redis.publish(channel, "message")).toBe(0); - }); - - test("unsubscribing from non-subscribed channels", async () => { - const redis = await connectedRedis(); - const channel = "never-subscribed-channel"; - - expect(() => redis.unsubscribe(channel)).toThrow( - "RedisClient.prototype.unsubscribe can only be called while in subscriber mode.", - ); - }); - - test("callback errors don't crash the client", async () => { - const redis = await connectedRedis(); - const channel = "error-callback-channel"; - - const subscriber = await connectedRedis(); - - let messageCount = 0; - await subscriber.subscribe(channel, () => { - messageCount++; - if (messageCount === 2) { - throw new Error("Intentional callback error"); - } - }); - - // Send multiple messages - expect(await redis.publish(channel, "message1")).toBe(1); - expect(await redis.publish(channel, "message2")).toBe(1); - expect(await redis.publish(channel, "message3")).toBe(1); - - await sleep(flushTimeoutMs); - - expect(messageCount).toBe(3); - - await subscriber.unsubscribe(channel); - }); - - test("subscriptions return correct counts", async () => { - const subscriber = await connectedRedis(); - - expect(await subscriber.subscribe("chan1", () => {})).toBe(1); - expect(await subscriber.subscribe("chan2", () => {})).toBe(2); - - await subscriber.unsubscribe(); - }); - - test("unsubscribing from listeners", async () => { - const redis = await connectedRedis(); - const channel = "error-callback-channel"; - - const subscriber = await connectedRedis(); - - let messageCount1 = 0; - const listener1 = () => { - messageCount1++; - }; - await subscriber.subscribe(channel, listener1); - - let messageCount2 = 0; - const listener2 = () => { - messageCount2++; - }; - await subscriber.subscribe(channel, listener2); - - await redis.publish(channel, "message1"); - - await sleep(flushTimeoutMs); - - expect(messageCount1).toBe(1); - expect(messageCount2).toBe(1); - - await subscriber.unsubscribe(channel, listener2); - - await redis.publish(channel, "message1"); - - await sleep(flushTimeoutMs); - - expect(messageCount1).toBe(2); - expect(messageCount2).toBe(1); - - await subscriber.unsubscribe(); - - await redis.publish(channel, "message1"); - - await sleep(flushTimeoutMs); - - expect(messageCount1).toBe(2); - expect(messageCount2).toBe(1); - }); - }); - - describe("duplicate()", () => { - test("should create duplicate of unconnected client that remains unconnected", async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - expect(redis.connected).toBe(false); - - const duplicate = await redis.duplicate(); - expect(duplicate.connected).toBe(false); - expect(duplicate).not.toBe(redis); - }); - - test("should create duplicate of connected client that gets connected", async () => { - const redis = await connectedRedis(); - - const duplicate = await redis.duplicate(); - - expect(duplicate.connected).toBe(true); - expect(duplicate).not.toBe(redis); - - // Both should work independently - await redis.set("test-original", "original-value"); - await duplicate.set("test-duplicate", "duplicate-value"); - - expect(await redis.get("test-duplicate")).toBe("duplicate-value"); - expect(await duplicate.get("test-original")).toBe("original-value"); - - duplicate.close(); - }); - - test("should create duplicate of manually closed client that remains closed", async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - await redis.connect(); - redis.close?.(); - expect(redis.connected).toBe(false); - - const duplicate = await redis.duplicate(); - expect(duplicate.connected).toBe(false); - }); - - test("should preserve connection configuration in duplicate", async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - await redis.connect(); - - const duplicate = await redis.duplicate(); - - // Both clients should be able to perform the same operations - const testKey = `duplicate-config-test-${randomUUIDv7().substring(0, 8)}`; - const testValue = "test-value"; - - await redis.set(testKey, testValue); - const retrievedValue = await duplicate.get(testKey); - - expect(retrievedValue).toBe(testValue); - - duplicate.close?.(); - }); - - test("should allow duplicate to work independently from original", async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - await redis.connect(); - - const duplicate = await redis.duplicate(); - - // Close original, duplicate should still work - redis.close?.(); - - const testKey = `independent-test-${randomUUIDv7().substring(0, 8)}`; - const testValue = "independent-value"; - - await duplicate.set(testKey, testValue); - const retrievedValue = await duplicate.get(testKey); - - expect(retrievedValue).toBe(testValue); - - duplicate.close?.(); - }); - - test("should handle duplicate of client in subscriber mode", async () => { - const redis = await connectedRedis(); - const testChannel = "test-subscriber-duplicate"; - - // Put original client in subscriber mode - await redis.subscribe(testChannel, () => {}); - - const duplicate = await redis.duplicate(); - - // Duplicate should not be in subscriber mode - expect(() => duplicate.set("test-key", "test-value")).not.toThrow(); - - await redis.unsubscribe(testChannel); - duplicate.close?.(); - }); - - test("should create multiple duplicates from same client", async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - await redis.connect(); - - const duplicate1 = await redis.duplicate(); - const duplicate2 = await redis.duplicate(); - const duplicate3 = await redis.duplicate(); - - // All should be connected - expect(duplicate1.connected).toBe(true); - expect(duplicate2.connected).toBe(true); - expect(duplicate3.connected).toBe(true); - - // All should work independently - const testKey = `multi-duplicate-test-${randomUUIDv7().substring(0, 8)}`; - await duplicate1.set(`${testKey}-1`, "value-1"); - await duplicate2.set(`${testKey}-2`, "value-2"); - await duplicate3.set(`${testKey}-3`, "value-3"); - - expect(await duplicate1.get(`${testKey}-1`)).toBe("value-1"); - expect(await duplicate2.get(`${testKey}-2`)).toBe("value-2"); - expect(await duplicate3.get(`${testKey}-3`)).toBe("value-3"); - - // Cross-check: each duplicate can read what others wrote - expect(await duplicate1.get(`${testKey}-2`)).toBe("value-2"); - expect(await duplicate2.get(`${testKey}-3`)).toBe("value-3"); - expect(await duplicate3.get(`${testKey}-1`)).toBe("value-1"); - - duplicate1.close?.(); - duplicate2.close?.(); - duplicate3.close?.(); - redis.close?.(); - }); - - test("should duplicate client that failed to connect", async () => { - // Create client with invalid credentials to force connection failure - const url = new URL(DEFAULT_REDIS_URL); - url.username = "invaliduser"; - url.password = "invalidpassword"; - const failedRedis = new RedisClient(url.toString()); - - // Try to connect and expect it to fail - let connectionFailed = false; - try { - await failedRedis.connect(); - } catch { - connectionFailed = true; - } - - expect(connectionFailed).toBe(true); - expect(failedRedis.connected).toBe(false); - - // Duplicate should also remain unconnected - const duplicate = await failedRedis.duplicate(); - expect(duplicate.connected).toBe(false); - }); - - test("should handle duplicate timing with concurrent operations", async () => { - const redis = new RedisClient(DEFAULT_REDIS_URL); - await redis.connect(); - - // Start some operations on the original client - const testKey = `concurrent-test-${randomUUIDv7().substring(0, 8)}`; - const originalOperation = redis.set(testKey, "original-value"); - - // Create duplicate while operation is in flight - const duplicate = await redis.duplicate(); - - // Wait for original operation to complete - await originalOperation; - - // Duplicate should be able to read the value - expect(await duplicate.get(testKey)).toBe("original-value"); - - duplicate.close?.(); - redis.close?.(); - }); - }); });