Redis PUB/SUB (#21728)

### What does this PR do?

The goal of this PR is to introduce PUB/SUB functionality to the
built-in Redis client. Based on the fact that the current Redis API does
not appear to have compatibility with `io-redis` or `redis-node`, I've
decided to do away with existing APIs and API compatibility with these
existing libraries.

I have decided to base my implementation on the [`redis-node` pub/sub
API](https://github.com/redis/node-redis/blob/master/docs/pub-sub.md).



### How did you verify your code works?

I've written a set of unit tests to hopefully catch the major use-cases
of this feature. They all appear to pass:

<img width="368" height="71" alt="image"
src="https://github.com/user-attachments/assets/36527386-c8fe-47f6-b69a-a11d4b614fa0"
/>


#### Future Improvements

I would have a lot more confidence in our Redis implementation if we
tested it with a test suite running over a network which emulates a high
network failure rate. There are large amounts of edge cases that are
worthwhile to grab, but I think we can roll that out in a future PR.

### Future Tasks

- [ ] Tests over flaky network
- [ ] Use the custom private members over `_<member>`.

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Dylan Conway <dylan.conway567@gmail.com>
Co-authored-by: Alistair Smith <hi@alistair.sh>
This commit is contained in:
Marko Vejnovic
2025-09-09 22:13:25 -07:00
committed by GitHub
parent 3ee477fc5b
commit dc3c8f79c4
14 changed files with 2069 additions and 132 deletions

View File

@@ -52,21 +52,25 @@ declare module "bun" {
export namespace RedisClient {
type KeyLike = string | ArrayBufferView | Blob;
type StringPubSubListener = (message: string, channel: string) => void;
// Buffer subscriptions are not yet implemented
// type BufferPubSubListener = (message: Uint8Array<ArrayBuffer>, channel: string) => void;
}
export class RedisClient {
/**
* Creates a new Redis client
* @param url URL to connect to, defaults to process.env.VALKEY_URL, process.env.REDIS_URL, or "valkey://localhost:6379"
*
* @param url URL to connect to, defaults to `process.env.VALKEY_URL`,
* `process.env.REDIS_URL`, or `"valkey://localhost:6379"`
* @param options Additional options
*
* @example
* ```ts
* const valkey = new RedisClient();
*
* await valkey.set("hello", "world");
*
* console.log(await valkey.get("hello"));
* const redis = new RedisClient();
* await redis.set("hello", "world");
* console.log(await redis.get("hello"));
* ```
*/
constructor(url?: string, options?: RedisOptions);
@@ -88,12 +92,14 @@ declare module "bun" {
/**
* Callback fired when the client disconnects from the Redis server
*
* @param error The error that caused the disconnection
*/
onclose: ((this: RedisClient, error: Error) => void) | null;
/**
* Connect to the Redis server
*
* @returns A promise that resolves when connected
*/
connect(): Promise<void>;
@@ -152,10 +158,12 @@ declare module "bun" {
set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, px: "PX", milliseconds: number): Promise<"OK">;
/**
* Set key to hold the string value with expiration at a specific Unix timestamp
* Set key to hold the string value with expiration at a specific Unix
* timestamp
* @param key The key to set
* @param value The value to set
* @param exat Set the specified Unix time at which the key will expire, in seconds
* @param exat Set the specified Unix time at which the key will expire, in
* seconds
* @returns Promise that resolves with "OK" on success
*/
set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, exat: "EXAT", timestampSeconds: number): Promise<"OK">;
@@ -179,7 +187,8 @@ declare module "bun" {
* @param key The key to set
* @param value The value to set
* @param nx Only set the key if it does not already exist
* @returns Promise that resolves with "OK" on success, or null if the key already exists
* @returns Promise that resolves with "OK" on success, or null if the key
* already exists
*/
set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, nx: "NX"): Promise<"OK" | null>;
@@ -188,7 +197,8 @@ declare module "bun" {
* @param key The key to set
* @param value The value to set
* @param xx Only set the key if it already exists
* @returns Promise that resolves with "OK" on success, or null if the key does not exist
* @returns Promise that resolves with "OK" on success, or null if the key
* does not exist
*/
set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, xx: "XX"): Promise<"OK" | null>;
@@ -196,8 +206,10 @@ declare module "bun" {
* Set key to hold the string value and return the old value
* @param key The key to set
* @param value The value to set
* @param get Return the old string stored at key, or null if key did not exist
* @returns Promise that resolves with the old value, or null if key did not exist
* @param get Return the old string stored at key, or null if key did not
* exist
* @returns Promise that resolves with the old value, or null if key did not
* exist
*/
set(key: RedisClient.KeyLike, value: RedisClient.KeyLike, get: "GET"): Promise<string | null>;
@@ -243,7 +255,8 @@ declare module "bun" {
/**
* Determine if a key exists
* @param key The key to check
* @returns Promise that resolves with true if the key exists, false otherwise
* @returns Promise that resolves with true if the key exists, false
* otherwise
*/
exists(key: RedisClient.KeyLike): Promise<boolean>;
@@ -258,7 +271,8 @@ declare module "bun" {
/**
* Get the time to live for a key in seconds
* @param key The key to get the TTL for
* @returns Promise that resolves with the TTL, -1 if no expiry, or -2 if key doesn't exist
* @returns Promise that resolves with the TTL, -1 if no expiry, or -2 if
* key doesn't exist
*/
ttl(key: RedisClient.KeyLike): Promise<number>;
@@ -282,7 +296,8 @@ declare module "bun" {
* Check if a value is a member of a set
* @param key The set key
* @param member The member to check
* @returns Promise that resolves with true if the member exists, false otherwise
* @returns Promise that resolves with true if the member exists, false
* otherwise
*/
sismember(key: RedisClient.KeyLike, member: string): Promise<boolean>;
@@ -290,7 +305,8 @@ declare module "bun" {
* Add a member to a set
* @param key The set key
* @param member The member to add
* @returns Promise that resolves with 1 if the member was added, 0 if it already existed
* @returns Promise that resolves with 1 if the member was added, 0 if it
* already existed
*/
sadd(key: RedisClient.KeyLike, member: string): Promise<number>;
@@ -298,7 +314,8 @@ declare module "bun" {
* Remove a member from a set
* @param key The set key
* @param member The member to remove
* @returns Promise that resolves with 1 if the member was removed, 0 if it didn't exist
* @returns Promise that resolves with 1 if the member was removed, 0 if it
* didn't exist
*/
srem(key: RedisClient.KeyLike, member: string): Promise<number>;
@@ -312,14 +329,16 @@ declare module "bun" {
/**
* Get a random member from a set
* @param key The set key
* @returns Promise that resolves with a random member, or null if the set is empty
* @returns Promise that resolves with a random member, or null if the set
* is empty
*/
srandmember(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Remove and return a random member from a set
* @param key The set key
* @returns Promise that resolves with the removed member, or null if the set is empty
* @returns Promise that resolves with the removed member, or null if the
* set is empty
*/
spop(key: RedisClient.KeyLike): Promise<string | null>;
@@ -386,28 +405,32 @@ declare module "bun" {
/**
* Remove and get the first element in a list
* @param key The list key
* @returns Promise that resolves with the first element, or null if the list is empty
* @returns Promise that resolves with the first element, or null if the
* list is empty
*/
lpop(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Remove the expiration from a key
* @param key The key to persist
* @returns Promise that resolves with 1 if the timeout was removed, 0 if the key doesn't exist or has no timeout
* @returns Promise that resolves with 1 if the timeout was removed, 0 if
* the key doesn't exist or has no timeout
*/
persist(key: RedisClient.KeyLike): Promise<number>;
/**
* Get the expiration time of a key as a UNIX timestamp in milliseconds
* @param key The key to check
* @returns Promise that resolves with the timestamp, or -1 if the key has no expiration, or -2 if the key doesn't exist
* @returns Promise that resolves with the timestamp, or -1 if the key has
* no expiration, or -2 if the key doesn't exist
*/
pexpiretime(key: RedisClient.KeyLike): Promise<number>;
/**
* Get the time to live for a key in milliseconds
* @param key The key to check
* @returns Promise that resolves with the TTL in milliseconds, or -1 if the key has no expiration, or -2 if the key doesn't exist
* @returns Promise that resolves with the TTL in milliseconds, or -1 if the
* key has no expiration, or -2 if the key doesn't exist
*/
pttl(key: RedisClient.KeyLike): Promise<number>;
@@ -421,42 +444,48 @@ declare module "bun" {
/**
* Get the number of members in a set
* @param key The set key
* @returns Promise that resolves with the cardinality (number of elements) of the set
* @returns Promise that resolves with the cardinality (number of elements)
* of the set
*/
scard(key: RedisClient.KeyLike): Promise<number>;
/**
* Get the length of the value stored in a key
* @param key The key to check
* @returns Promise that resolves with the length of the string value, or 0 if the key doesn't exist
* @returns Promise that resolves with the length of the string value, or 0
* if the key doesn't exist
*/
strlen(key: RedisClient.KeyLike): Promise<number>;
/**
* Get the number of members in a sorted set
* @param key The sorted set key
* @returns Promise that resolves with the cardinality (number of elements) of the sorted set
* @returns Promise that resolves with the cardinality (number of elements)
* of the sorted set
*/
zcard(key: RedisClient.KeyLike): Promise<number>;
/**
* Remove and return members with the highest scores in a sorted set
* @param key The sorted set key
* @returns Promise that resolves with the removed member and its score, or null if the set is empty
* @returns Promise that resolves with the removed member and its score, or
* null if the set is empty
*/
zpopmax(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Remove and return members with the lowest scores in a sorted set
* @param key The sorted set key
* @returns Promise that resolves with the removed member and its score, or null if the set is empty
* @returns Promise that resolves with the removed member and its score, or
* null if the set is empty
*/
zpopmin(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Get one or multiple random members from a sorted set
* @param key The sorted set key
* @returns Promise that resolves with a random member, or null if the set is empty
* @returns Promise that resolves with a random member, or null if the set
* is empty
*/
zrandmember(key: RedisClient.KeyLike): Promise<string | null>;
@@ -464,7 +493,8 @@ declare module "bun" {
* Append a value to a key
* @param key The key to append to
* @param value The value to append
* @returns Promise that resolves with the length of the string after the append operation
* @returns Promise that resolves with the length of the string after the
* append operation
*/
append(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<number>;
@@ -472,7 +502,8 @@ declare module "bun" {
* Set the value of a key and return its old value
* @param key The key to set
* @param value The value to set
* @returns Promise that resolves with the old value, or null if the key didn't exist
* @returns Promise that resolves with the old value, or null if the key
* didn't exist
*/
getset(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<string | null>;
@@ -480,7 +511,8 @@ declare module "bun" {
* Prepend one or multiple values to a list
* @param key The list key
* @param value The value to prepend
* @returns Promise that resolves with the length of the list after the push operation
* @returns Promise that resolves with the length of the list after the push
* operation
*/
lpush(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<number>;
@@ -488,7 +520,8 @@ declare module "bun" {
* Prepend a value to a list, only if the list exists
* @param key The list key
* @param value The value to prepend
* @returns Promise that resolves with the length of the list after the push operation, or 0 if the list doesn't exist
* @returns Promise that resolves with the length of the list after the push
* operation, or 0 if the list doesn't exist
*/
lpushx(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<number>;
@@ -496,7 +529,8 @@ declare module "bun" {
* Add one or more members to a HyperLogLog
* @param key The HyperLogLog key
* @param element The element to add
* @returns Promise that resolves with 1 if the HyperLogLog was altered, 0 otherwise
* @returns Promise that resolves with 1 if the HyperLogLog was altered, 0
* otherwise
*/
pfadd(key: RedisClient.KeyLike, element: string): Promise<number>;
@@ -504,7 +538,8 @@ declare module "bun" {
* Append one or multiple values to a list
* @param key The list key
* @param value The value to append
* @returns Promise that resolves with the length of the list after the push operation
* @returns Promise that resolves with the length of the list after the push
* operation
*/
rpush(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<number>;
@@ -512,7 +547,8 @@ declare module "bun" {
* Append a value to a list, only if the list exists
* @param key The list key
* @param value The value to append
* @returns Promise that resolves with the length of the list after the push operation, or 0 if the list doesn't exist
* @returns Promise that resolves with the length of the list after the push
* operation, or 0 if the list doesn't exist
*/
rpushx(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<number>;
@@ -520,7 +556,8 @@ declare module "bun" {
* Set the value of a key, only if the key does not exist
* @param key The key to set
* @param value The value to set
* @returns Promise that resolves with 1 if the key was set, 0 if the key was not set
* @returns Promise that resolves with 1 if the key was set, 0 if the key
* was not set
*/
setnx(key: RedisClient.KeyLike, value: RedisClient.KeyLike): Promise<number>;
@@ -528,14 +565,16 @@ declare module "bun" {
* Get the score associated with the given member in a sorted set
* @param key The sorted set key
* @param member The member to get the score for
* @returns Promise that resolves with the score of the member as a string, or null if the member or key doesn't exist
* @returns Promise that resolves with the score of the member as a string,
* or null if the member or key doesn't exist
*/
zscore(key: RedisClient.KeyLike, member: string): Promise<string | null>;
/**
* Get the values of all specified keys
* @param keys The keys to get
* @returns Promise that resolves with an array of values, with null for keys that don't exist
* @returns Promise that resolves with an array of values, with null for
* keys that don't exist
*/
mget(...keys: RedisClient.KeyLike[]): Promise<(string | null)[]>;
@@ -549,37 +588,46 @@ declare module "bun" {
/**
* Return a serialized version of the value stored at the specified key
* @param key The key to dump
* @returns Promise that resolves with the serialized value, or null if the key doesn't exist
* @returns Promise that resolves with the serialized value, or null if the
* key doesn't exist
*/
dump(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Get the expiration time of a key as a UNIX timestamp in seconds
*
* @param key The key to check
* @returns Promise that resolves with the timestamp, or -1 if the key has no expiration, or -2 if the key doesn't exist
* @returns Promise that resolves with the timestamp, or -1 if the key has
* no expiration, or -2 if the key doesn't exist
*/
expiretime(key: RedisClient.KeyLike): Promise<number>;
/**
* Get the value of a key and delete the key
*
* @param key The key to get and delete
* @returns Promise that resolves with the value of the key, or null if the key doesn't exist
* @returns Promise that resolves with the value of the key, or null if the
* key doesn't exist
*/
getdel(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Get the value of a key and optionally set its expiration
*
* @param key The key to get
* @returns Promise that resolves with the value of the key, or null if the key doesn't exist
* @returns Promise that resolves with the value of the key, or null if the
* key doesn't exist
*/
getex(key: RedisClient.KeyLike): Promise<string | null>;
/**
* Get the value of a key and set its expiration in seconds
*
* @param key The key to get
* @param ex Set the specified expire time, in seconds
* @param seconds The number of seconds until expiration
* @returns Promise that resolves with the value of the key, or null if the key doesn't exist
* @returns Promise that resolves with the value of the key, or null if the
* key doesn't exist
*/
getex(key: RedisClient.KeyLike, ex: "EX", seconds: number): Promise<string | null>;
@@ -594,6 +642,7 @@ declare module "bun" {
/**
* Get the value of a key and set its expiration at a specific Unix timestamp in seconds
*
* @param key The key to get
* @param exat Set the specified Unix time at which the key will expire, in seconds
* @param timestampSeconds The Unix timestamp in seconds
@@ -603,6 +652,7 @@ declare module "bun" {
/**
* Get the value of a key and set its expiration at a specific Unix timestamp in milliseconds
*
* @param key The key to get
* @param pxat Set the specified Unix time at which the key will expire, in milliseconds
* @param timestampMilliseconds The Unix timestamp in milliseconds
@@ -612,6 +662,7 @@ declare module "bun" {
/**
* Get the value of a key and remove its expiration
*
* @param key The key to get
* @param persist Remove the expiration from the key
* @returns Promise that resolves with the value of the key, or null if the key doesn't exist
@@ -626,10 +677,133 @@ declare module "bun" {
/**
* Ping the server with a message
*
* @param message The message to send to the server
* @returns Promise that resolves with the message if the server is reachable, or throws an error if the server is not reachable
*/
ping(message: RedisClient.KeyLike): Promise<string>;
/**
* Publish a message to a Redis channel.
*
* @param channel The channel to publish to.
* @param message The message to publish.
*
* @returns The number of clients that received the message. Note that in a
* cluster this returns the total number of clients in the same node.
*/
publish(channel: string, message: string): Promise<number>;
/**
* Subscribe to a Redis channel.
*
* Subscribing disables automatic pipelining, so all commands will be
* received immediately.
*
* Subscribing moves the channel to a dedicated subscription state which
* prevents most other commands from being executed until unsubscribed. Only
* {@link ping `.ping()`}, {@link subscribe `.subscribe()`}, and
* {@link unsubscribe `.unsubscribe()`} are legal to invoke in a subscribed
* upon channel.
*
* @param channel The channel to subscribe to.
* @param listener The listener to call when a message is received on the
* channel. The listener will receive the message as the first argument and
* the channel as the second argument.
*
* @example
* ```ts
* await client.subscribe("my-channel", (message, channel) => {
* console.log(`Received message on ${channel}: ${message}`);
* });
* ```
*/
subscribe(channel: string, listener: RedisClient.StringPubSubListener): Promise<number>;
/**
* Subscribe to multiple Redis channels.
*
* Subscribing disables automatic pipelining, so all commands will be
* received immediately.
*
* Subscribing moves the channels to a dedicated subscription state in which
* only a limited set of commands can be executed.
*
* @param channels An array of channels to subscribe to.
* @param listener The listener to call when a message is received on any of
* the subscribed channels. The listener will receive the message as the
* first argument and the channel as the second argument.
*/
subscribe(channels: string[], listener: RedisClient.StringPubSubListener): Promise<number>;
/**
* Unsubscribe from a singular Redis channel.
*
* @param channel The channel to unsubscribe from.
*
* If there are no more channels subscribed to, the client automatically
* re-enables pipelining if it was previously enabled.
*
* Unsubscribing moves the channel back to a normal state out of the
* subscription state if all channels have been unsubscribed from. For
* further details on the subscription state, see
* {@link subscribe `.subscribe()`}.
*/
unsubscribe(channel: string): Promise<void>;
/**
* Remove a listener from a given Redis channel.
*
* If there are no more channels subscribed to, the client automatically
* re-enables pipelining if it was previously enabled.
*
* Unsubscribing moves the channel back to a normal state out of the
* subscription state if all channels have been unsubscribed from. For
* further details on the subscription state, see
* {@link subscribe `.subscribe()`}.
*
* @param channel The channel to unsubscribe from.
* @param listener The listener to remove. This is tested against
* referential equality so you must pass the exact same listener instance as
* when subscribing.
*/
unsubscribe(channel: string, listener: RedisClient.StringPubSubListener): Promise<void>;
/**
* Unsubscribe from all registered Redis channels.
*
* The client will automatically re-enable pipelining if it was previously
* enabled.
*
* Unsubscribing moves the channel back to a normal state out of the
* subscription state if all channels have been unsubscribed from. For
* further details on the subscription state, see
* {@link subscribe `.subscribe()`}.
*/
unsubscribe(): Promise<void>;
/**
* Unsubscribe from multiple Redis channels.
*
* @param channels An array of channels to unsubscribe from.
*
* If there are no more channels subscribed to, the client automatically
* re-enables pipelining if it was previously enabled.
*
* Unsubscribing moves the channel back to a normal state out of the
* subscription state if all channels have been unsubscribed from. For
* further details on the subscription state, see
* {@link subscribe `.subscribe()`}.
*/
unsubscribe(channels: string[]): Promise<void>;
/**
* @brief Create a new RedisClient instance with the same configuration as
* the current instance.
*
* This will open up a new connection to the Redis server.
*/
duplicate(): Promise<RedisClient>;
}
/**

View File

@@ -9,6 +9,7 @@ export default [
configurable: false,
JSType: "0b11101110",
memoryCost: true,
hasPendingActivity: true,
proto: {
connected: {
getter: "getConnected",
@@ -222,11 +223,12 @@ export default [
zrank: { fn: "zrank" },
zrevrank: { fn: "zrevrank" },
subscribe: { fn: "subscribe" },
duplicate: { fn: "duplicate" },
psubscribe: { fn: "psubscribe" },
unsubscribe: { fn: "unsubscribe" },
punsubscribe: { fn: "punsubscribe" },
pubsub: { fn: "pubsub" },
},
values: ["onconnect", "onclose", "connectionPromise", "hello"],
values: ["onconnect", "onclose", "connectionPromise", "hello", "subscriptionCallbackMap"],
}),
];

View File

@@ -9,13 +9,18 @@ pub const JSMap = opaque {
return bun.cpp.JSC__JSMap__set(this, globalObject, key, value);
}
pub fn get_(this: *JSMap, globalObject: *JSGlobalObject, key: JSValue) JSValue {
return bun.cpp.JSC__JSMap__get_(this, globalObject, key);
}
extern fn JSC__JSMap__get(*JSMap, *JSGlobalObject, JSValue) JSValue;
pub fn get(this: *JSMap, globalObject: *JSGlobalObject, key: JSValue) ?JSValue {
const value = get_(this, globalObject, key);
if (value.isEmpty()) {
pub fn get(this: *JSMap, globalObject: *JSGlobalObject, key: JSValue) bun.JSError!?JSValue {
var scope: jsc.CatchScope = undefined;
scope.init(globalObject, @src());
defer scope.deinit();
const value = JSC__JSMap__get(this, globalObject, key);
try scope.returnIfException();
if (value == .zero) {
return null;
}
return value;
@@ -29,6 +34,10 @@ pub const JSMap = opaque {
return bun.cpp.JSC__JSMap__remove(this, globalObject, key);
}
pub fn size(this: *JSMap, globalObject: *JSGlobalObject) usize {
return bun.cpp.JSC__JSMap__size(this, globalObject);
}
pub fn fromJS(value: JSValue) ?*JSMap {
if (value.jsTypeLoose() == .Map) {
return bun.cast(*JSMap, value.asEncoded().asPtr.?);

View File

@@ -37,6 +37,7 @@
#include "JavaScriptCore/JSArrayInlines.h"
#include "JavaScriptCore/ErrorInstanceInlines.h"
#include "JavaScriptCore/BigIntObject.h"
#include "JavaScriptCore/OrderedHashTableHelper.h"
#include "JavaScriptCore/JSCallbackObject.h"
#include "JavaScriptCore/JSClassRef.h"
@@ -6401,11 +6402,20 @@ CPP_DECL JSC::EncodedJSValue JSC__JSMap__create(JSC::JSGlobalObject* arg0)
JSC::JSMap* map = JSC::JSMap::create(arg0->vm(), arg0->mapStructure());
return JSC::JSValue::encode(map);
}
CPP_DECL [[ZIG_EXPORT(nothrow)]] JSC::EncodedJSValue JSC__JSMap__get_(JSC::JSMap* map, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2)
// zero means "not found" or an exception was thrown
CPP_DECL JSC::EncodedJSValue JSC__JSMap__get(JSC::JSMap* map, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2)
{
auto& vm = arg1->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
JSC::JSValue value = JSC::JSValue::decode(JSValue2);
return JSC::JSValue::encode(map->get(arg1, value));
JSValue entryValue = map->getImpl(arg1, [&](OrderedHashMap::Storage& storage) ALWAYS_INLINE_LAMBDA {
return OrderedHashMap::Helper::find(arg1, storage, value);
});
RELEASE_AND_RETURN(scope, JSC::JSValue::encode(entryValue));
}
CPP_DECL [[ZIG_EXPORT(nothrow)]] bool JSC__JSMap__has(JSC::JSMap* map, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2)
{
@@ -6422,6 +6432,11 @@ CPP_DECL [[ZIG_EXPORT(nothrow)]] void JSC__JSMap__set(JSC::JSMap* map, JSC::JSGl
map->set(arg1, JSC::JSValue::decode(JSValue2), JSC::JSValue::decode(JSValue3));
}
CPP_DECL [[ZIG_EXPORT(nothrow)]] uint32_t JSC__JSMap__size(JSC::JSMap* map, JSC::JSGlobalObject* arg1)
{
return map->size();
}
CPP_DECL void JSC__VM__setControlFlowProfiler(JSC::VM* vm, bool isEnabled)
{
if (isEnabled) {

View File

@@ -8,7 +8,7 @@
#define AUTO_EXTERN_C extern "C"
#ifdef WIN32
#define AUTO_EXTERN_C_ZIG extern "C"
#define AUTO_EXTERN_C_ZIG extern "C"
#else
#define AUTO_EXTERN_C_ZIG extern "C" __attribute__((weak))
#endif
@@ -129,7 +129,7 @@ CPP_DECL void WebCore__AbortSignal__cleanNativeBindings(WebCore::AbortSignal* ar
CPP_DECL JSC::EncodedJSValue WebCore__AbortSignal__create(JSC::JSGlobalObject* arg0);
CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__fromJS(JSC::EncodedJSValue JSValue0);
CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__ref(WebCore::AbortSignal* arg0);
CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__signal(WebCore::AbortSignal* arg0, JSC::JSGlobalObject*, uint8_t abortReason);
CPP_DECL WebCore::AbortSignal* WebCore__AbortSignal__signal(WebCore::AbortSignal* arg0, JSC::JSGlobalObject*, uint8_t abortReason);
CPP_DECL JSC::EncodedJSValue WebCore__AbortSignal__toJS(WebCore::AbortSignal* arg0, JSC::JSGlobalObject* arg1);
CPP_DECL void WebCore__AbortSignal__unref(WebCore::AbortSignal* arg0);
@@ -186,10 +186,11 @@ CPP_DECL JSC::VM* JSC__JSGlobalObject__vm(JSC::JSGlobalObject* arg0);
#pragma mark - JSC::JSMap
CPP_DECL JSC::EncodedJSValue JSC__JSMap__create(JSC::JSGlobalObject* arg0);
CPP_DECL JSC::EncodedJSValue JSC__JSMap__get_(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL JSC::EncodedJSValue JSC__JSMap__get(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL bool JSC__JSMap__has(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL bool JSC__JSMap__remove(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL void JSC__JSMap__set(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2, JSC::EncodedJSValue JSValue3);
CPP_DECL uint32_t JSC__JSMap__size(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1);
#pragma mark - JSC::JSValue

View File

@@ -13,7 +13,7 @@ pub const us_socket_t = opaque {
};
pub fn open(this: *us_socket_t, comptime is_ssl: bool, is_client: bool, ip_addr: ?[]const u8) void {
debug("us_socket_open({d}, is_client: {})", .{ @intFromPtr(this), is_client });
debug("us_socket_open({p}, is_client: {})", .{ this, is_client });
const ssl = @intFromBool(is_ssl);
if (ip_addr) |ip| {
@@ -25,22 +25,22 @@ pub const us_socket_t = opaque {
}
pub fn pause(this: *us_socket_t, ssl: bool) void {
debug("us_socket_pause({d})", .{@intFromPtr(this)});
debug("us_socket_pause({p})", .{this});
c.us_socket_pause(@intFromBool(ssl), this);
}
pub fn @"resume"(this: *us_socket_t, ssl: bool) void {
debug("us_socket_resume({d})", .{@intFromPtr(this)});
debug("us_socket_resume({p})", .{this});
c.us_socket_resume(@intFromBool(ssl), this);
}
pub fn close(this: *us_socket_t, ssl: bool, code: CloseCode) void {
debug("us_socket_close({d}, {s})", .{ @intFromPtr(this), @tagName(code) });
debug("us_socket_close({p}, {s})", .{ this, @tagName(code) });
_ = c.us_socket_close(@intFromBool(ssl), this, code, null);
}
pub fn shutdown(this: *us_socket_t, ssl: bool) void {
debug("us_socket_shutdown({d})", .{@intFromPtr(this)});
debug("us_socket_shutdown({p})", .{this});
c.us_socket_shutdown(@intFromBool(ssl), this);
}
@@ -128,25 +128,25 @@ pub const us_socket_t = opaque {
pub fn write(this: *us_socket_t, ssl: bool, data: []const u8) i32 {
const rc = c.us_socket_write(@intFromBool(ssl), this, data.ptr, @intCast(data.len));
debug("us_socket_write({d}, {d}) = {d}", .{ @intFromPtr(this), data.len, rc });
debug("us_socket_write({p}, {d}) = {d}", .{ this, data.len, rc });
return rc;
}
pub fn writeFd(this: *us_socket_t, data: []const u8, file_descriptor: bun.FD) i32 {
if (bun.Environment.isWindows) @compileError("TODO: implement writeFd on Windows");
const rc = c.us_socket_ipc_write_fd(this, data.ptr, @intCast(data.len), file_descriptor.native());
debug("us_socket_ipc_write_fd({d}, {d}, {d}) = {d}", .{ @intFromPtr(this), data.len, file_descriptor.native(), rc });
debug("us_socket_ipc_write_fd({p}, {d}, {d}) = {d}", .{ this, data.len, file_descriptor.native(), rc });
return rc;
}
pub fn write2(this: *us_socket_t, ssl: bool, first: []const u8, second: []const u8) i32 {
const rc = c.us_socket_write2(@intFromBool(ssl), this, first.ptr, first.len, second.ptr, second.len);
debug("us_socket_write2({d}, {d}, {d}) = {d}", .{ @intFromPtr(this), first.len, second.len, rc });
debug("us_socket_write2({p}, {d}, {d}) = {d}", .{ this, first.len, second.len, rc });
return rc;
}
pub fn rawWrite(this: *us_socket_t, ssl: bool, data: []const u8) i32 {
debug("us_socket_raw_write({d}, {d})", .{ @intFromPtr(this), data.len });
debug("us_socket_raw_write({p}, {d})", .{ this, data.len });
return c.us_socket_raw_write(@intFromBool(ssl), this, data.ptr, @intCast(data.len));
}

View File

@@ -236,6 +236,15 @@ pub fn writable(this: *StringBuilder) []u8 {
return ptr[this.len..this.cap];
}
/// Transfer ownership of the underlying memory to a slice.
///
/// After calling this, you are responsible for freeing the underlying memory.
/// This StringBuilder should not be used after calling this function.
pub fn moveToSlice(this: *StringBuilder, into_slice: *[]u8) void {
into_slice.* = this.allocatedSlice();
this.* = .{};
}
const std = @import("std");
const Allocator = std.mem.Allocator;

View File

@@ -137,7 +137,7 @@ pub const Promise = struct {
self.promise.resolve(globalObject, js_value);
}
pub fn reject(self: *Promise, globalObject: *jsc.JSGlobalObject, jsvalue: jsc.JSValue) void {
pub fn reject(self: *Promise, globalObject: *jsc.JSGlobalObject, jsvalue: JSError!jsc.JSValue) void {
self.promise.reject(globalObject, jsvalue);
}
@@ -162,6 +162,7 @@ const protocol = @import("./valkey_protocol.zig");
const std = @import("std");
const bun = @import("bun");
const JSError = bun.JSError;
const jsc = bun.jsc;
const node = bun.api.node;
const Slice = jsc.ZigString.Slice;

View File

@@ -1,9 +1,222 @@
pub const SubscriptionCtx = struct {
const Self = @This();
_parent: *JSValkeyClient,
original_enable_offline_queue: bool,
original_enable_auto_pipelining: bool,
const ParentJS = JSValkeyClient.js;
pub fn init(parent: *JSValkeyClient, enable_offline_queue: bool, enable_auto_pipelining: bool) Self {
const callback_map = jsc.JSMap.create(parent.globalObject);
ParentJS.gc.set(.subscriptionCallbackMap, parent.this_value.get(), parent.globalObject, callback_map);
const self = Self{
._parent = parent,
.original_enable_offline_queue = enable_offline_queue,
.original_enable_auto_pipelining = enable_auto_pipelining,
};
return self;
}
fn subscriptionCallbackMap(this: *Self) *jsc.JSMap {
const value_js = ParentJS.gc.get(.subscriptionCallbackMap, this._parent.this_value.get()).?;
return jsc.JSMap.fromJS(value_js).?;
}
/// Get the total number of channels that this subscription context is subscribed to.
pub fn subscriptionCount(this: *Self, globalObject: *jsc.JSGlobalObject) usize {
return this.subscriptionCallbackMap().size(globalObject);
}
/// Test whether this context has any subscriptions. It is mandatory to
/// guard deinit with this function.
pub fn hasSubscriptions(this: *Self, globalObject: *jsc.JSGlobalObject) bool {
return this.subscriptionCount(globalObject) > 0;
}
pub fn clearReceiveHandlers(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
) void {
const map = this.subscriptionCallbackMap();
if (map.remove(globalObject, channelName)) {
this._parent.channel_subscription_count -= 1;
this._parent.updateHasPendingActivity();
}
}
/// Remove a specific receive handler.
///
/// Returns: The total number of remaining handlers for this channel, or null if here were no listeners originally
/// registered.
///
/// Note: This function will empty out the map entry if there are no more handlers registered.
pub fn removeReceiveHandler(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
callback: JSValue,
) !?usize {
const map = this.subscriptionCallbackMap();
const existing = try map.get(globalObject, channelName);
if (existing == null) {
// Could not find the channel, nothing to remove.
return null;
}
if (existing.?.isUndefinedOrNull()) {
// Nothing to remove.
return null;
}
// Existing is guaranteed to be an array of callbacks.
bun.assert(existing.?.isArray());
// TODO(markovejnovic): I can't find a better way to do this... I generate a new array,
// filtering out the callback we want to remove. This is woefully inefficient for large
// sets (and surprisingly fast for small sets of callbacks).
//
// Perhaps there is an avenue to build a generic iterator pattern? @taylor.fish and I have
// briefly expressed a desire for this, and I promised her I would look into it, but at
// this moment have no proposal.
var array_it = try existing.?.arrayIterator(globalObject);
const updated_array = try jsc.JSArray.createEmpty(globalObject, 0);
while (try array_it.next()) |iter| {
if (iter == callback)
continue;
try updated_array.push(globalObject, iter);
}
// Otherwise, we have ourselves an array of callbacks. We need to remove the element in the
// array that matches the callback.
_ = map.remove(globalObject, channelName);
// Only populate the map if we have remaining callbacks for this channel.
const new_length = (updated_array.arrayIterator(globalObject) catch unreachable).len;
if (new_length != 0) {
map.set(globalObject, channelName, updated_array);
} else {
this._parent.channel_subscription_count -= 1;
this._parent.updateHasPendingActivity();
}
return new_length;
}
/// Add a handler for receiving messages on a specific channel
pub fn upsertReceiveHandler(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
callback: JSValue,
) bun.JSError!void {
const map = this.subscriptionCallbackMap();
var handlers_array: JSValue = undefined;
var is_new_channel = false;
if (try map.get(globalObject, channelName)) |existing_handler_arr| {
debug("Adding a new receive handler.", .{});
if (existing_handler_arr.isUndefined()) {
// Create a new array if the existing_handler_arr is undefined/null
handlers_array = try jsc.JSArray.createEmpty(globalObject, 0);
is_new_channel = true;
} else if (existing_handler_arr.isArray()) {
// Use the existing array
handlers_array = existing_handler_arr;
} else unreachable;
} else {
// No existing_handler_arr exists, create a new array
handlers_array = try jsc.JSArray.createEmpty(globalObject, 0);
is_new_channel = true;
}
// Append the new callback to the array
try handlers_array.push(globalObject, callback);
// Set the updated array back in the map
map.set(globalObject, channelName, handlers_array);
// Update subscription count if this is a new channel
if (is_new_channel) {
this._parent.channel_subscription_count += 1;
this._parent.updateHasPendingActivity();
}
}
pub fn registerCallback(this: *Self, globalObject: *jsc.JSGlobalObject, eventString: JSValue, callback: JSValue) bun.JSError!void {
this.subscriptionCallbackMap().set(globalObject, eventString, callback);
}
pub fn getCallbacks(this: *Self, globalObject: *jsc.JSGlobalObject, channelName: JSValue) bun.JSError!?JSValue {
const result = try this.subscriptionCallbackMap().get(globalObject, channelName);
if (result) |r| {
if (r.isUndefinedOrNull()) {
return null;
}
}
return result;
}
/// Invoke callbacks for a channel with the given arguments
/// Handles both single callbacks and arrays of callbacks
pub fn invokeCallback(
this: *Self,
globalObject: *jsc.JSGlobalObject,
channelName: JSValue,
args: []const JSValue,
) bun.JSError!void {
const callbacks = try this.getCallbacks(globalObject, channelName) orelse {
debug("No callbacks found for channel {s}", .{channelName.asString().getZigString(globalObject)});
return;
};
// If callbacks is an array, iterate and call each one
if (callbacks.isArray()) {
var iter = try callbacks.arrayIterator(globalObject);
while (try iter.next()) |callback| {
if (callback.isCallable()) {
_ = callback.call(globalObject, .js_undefined, args) catch |e| {
return e;
};
}
}
} else if (callbacks.isCallable()) {
_ = callbacks.call(globalObject, .js_undefined, args) catch |e| {
return e;
};
}
}
pub fn deinit(this: *Self) void {
this._parent.channel_subscription_count = 0;
this._parent.updateHasPendingActivity();
ParentJS.gc.set(
.subscriptionCallbackMap,
this._parent.this_value.get(),
this._parent.globalObject,
.js_undefined,
);
}
};
/// Valkey client wrapper for JavaScript
pub const JSValkeyClient = struct {
client: valkey.ValkeyClient,
globalObject: *jsc.JSGlobalObject,
this_value: jsc.JSRef = jsc.JSRef.empty(),
poll_ref: bun.Async.KeepAlive = .{},
_subscription_ctx: ?SubscriptionCtx,
channel_subscription_count: u32 = 0,
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
timer: Timer.EventLoopTimer = .{
.tag = .ValkeyConnectionTimeout,
.next = .{
@@ -36,6 +249,8 @@ pub const JSValkeyClient = struct {
}
pub fn create(globalObject: *jsc.JSGlobalObject, arguments: []const JSValue) bun.JSError!*JSValkeyClient {
const this_allocator = bun.default_allocator;
const vm = globalObject.bunVM();
const url_str = if (arguments.len < 1 or arguments[0].isUndefined())
if (vm.transpiler.env.get("REDIS_URL") orelse vm.transpiler.env.get("VALKEY_URL")) |url|
@@ -46,7 +261,7 @@ pub const JSValkeyClient = struct {
try arguments[0].toBunString(globalObject);
defer url_str.deref();
const url_utf8 = url_str.toUTF8WithoutRef(bun.default_allocator);
const url_utf8 = url_str.toUTF8WithoutRef(this_allocator);
defer url_utf8.deinit();
const url = bun.URL.parse(url_utf8.slice());
@@ -88,7 +303,7 @@ pub const JSValkeyClient = struct {
var connection_strings: []u8 = &.{};
errdefer {
bun.default_allocator.free(connection_strings);
this_allocator.free(connection_strings);
}
if (url.username.len > 0 or url.password.len > 0 or hostname.len > 0) {
@@ -96,11 +311,12 @@ pub const JSValkeyClient = struct {
b.count(url.username);
b.count(url.password);
b.count(hostname);
try b.allocate(bun.default_allocator);
try b.allocate(this_allocator);
defer b.deinit(this_allocator);
username = b.append(url.username);
password = b.append(url.password);
hostname = b.append(hostname);
connection_strings = b.allocatedSlice();
b.moveToSlice(&connection_strings);
}
const database = if (url.pathname.len > 0) std.fmt.parseInt(u32, url.pathname[1..], 10) catch 0 else 0;
@@ -109,6 +325,7 @@ pub const JSValkeyClient = struct {
return JSValkeyClient.new(.{
.ref_count = .init(),
._subscription_ctx = null,
.client = .{
.vm = vm,
.address = switch (uri) {
@@ -120,10 +337,11 @@ pub const JSValkeyClient = struct {
},
},
},
.protocol = uri,
.username = username,
.password = password,
.in_flight = .init(bun.default_allocator),
.queue = .init(bun.default_allocator),
.in_flight = .init(this_allocator),
.queue = .init(this_allocator),
.status = .disconnected,
.connection_strings = connection_strings,
.socket = .{
@@ -134,7 +352,7 @@ pub const JSValkeyClient = struct {
},
},
.database = database,
.allocator = bun.default_allocator,
.allocator = this_allocator,
.flags = .{
.enable_auto_reconnect = options.enable_auto_reconnect,
.enable_offline_queue = options.enable_offline_queue,
@@ -148,6 +366,122 @@ pub const JSValkeyClient = struct {
});
}
/// Clone this client while remaining in the initial disconnected state. This does not preserve
/// the
pub fn cloneWithoutConnecting(this: *const JSValkeyClient) bun.OOM!*JSValkeyClient {
const vm = this.globalObject.bunVM();
const relocate = struct {
// Given a slice within a window, move the slice to point to a new
// window, with the same offset relative to the window start.
fn slice(original: []const u8, old_base: [*]const u8, new_base: [*]const u8) []const u8 {
const offset = @intFromPtr(original.ptr) - @intFromPtr(old_base);
return new_base[offset..][0..original.len];
}
}.slice;
// Make a copy of connection_strings to avoid double-free
const connection_strings_copy = try this.client.allocator.dupe(u8, this.client.connection_strings);
// Note that there is no need to copy username, password and address since the copies live
// within the connection_strings buffer.
const base_ptr = this.client.connection_strings.ptr;
const new_base = connection_strings_copy.ptr;
const username = relocate(this.client.username, base_ptr, new_base);
const password = relocate(this.client.password, base_ptr, new_base);
const orig_hostname = this.client.address.hostname();
const hostname = relocate(orig_hostname, base_ptr, new_base);
return JSValkeyClient.new(.{
.ref_count = .init(),
._subscription_ctx = null,
.client = .{
.vm = vm,
.address = switch (this.client.protocol) {
.standalone_unix, .standalone_tls_unix => .{ .unix = hostname },
else => .{
.host = .{
.host = hostname,
.port = this.client.address.host.port,
},
},
},
.protocol = this.client.protocol,
.username = username,
.password = password,
.in_flight = .init(this.client.allocator),
.queue = .init(this.client.allocator),
.status = .disconnected,
.connection_strings = connection_strings_copy,
.socket = .{
.SocketTCP = .{
.socket = .{
.detached = {},
},
},
},
.database = this.client.database,
.allocator = this.client.allocator,
.flags = .{
// Because this starts in the disconnected state, we need to reset some flags.
.is_authenticated = false,
// If the user manually closed the connection, then duplicating a closed client
// means the new client remains finalized.
.is_manually_closed = this.client.flags.is_manually_closed,
.enable_offline_queue = if (this._subscription_ctx) |*ctx| ctx.original_enable_offline_queue else this.client.flags.enable_offline_queue,
.needs_to_open_socket = true,
.enable_auto_reconnect = this.client.flags.enable_auto_reconnect,
.is_reconnecting = false,
.auto_pipelining = if (this._subscription_ctx) |*ctx| ctx.original_enable_auto_pipelining else this.client.flags.auto_pipelining,
// Duplicating a finalized client means it stays finalized.
.finalized = this.client.flags.finalized,
},
.max_retries = this.client.max_retries,
.connection_timeout_ms = this.client.connection_timeout_ms,
.idle_timeout_interval_ms = this.client.idle_timeout_interval_ms,
},
.globalObject = this.globalObject,
});
}
pub fn getOrCreateSubscriptionCtxEnteringSubscriptionMode(
this: *JSValkeyClient,
) *SubscriptionCtx {
if (this._subscription_ctx) |*ctx| {
// If we already have a subscription context, return it
return ctx;
}
// Save the original flag values and create a new subscription context
this._subscription_ctx = SubscriptionCtx.init(
this,
this.client.flags.enable_offline_queue,
this.client.flags.auto_pipelining,
);
// We need to make sure we disable the offline queue.
this.client.flags.enable_offline_queue = false;
this.client.flags.auto_pipelining = false;
return &(this._subscription_ctx.?);
}
pub fn deleteSubscriptionCtx(this: *JSValkeyClient) void {
if (this._subscription_ctx) |*ctx| {
// Restore the original flag values when leaving subscription mode
this.client.flags.enable_offline_queue = ctx.original_enable_offline_queue;
this.client.flags.auto_pipelining = ctx.original_enable_auto_pipelining;
ctx.deinit();
}
this._subscription_ctx = null;
}
pub fn isSubscriber(this: *const JSValkeyClient) bool {
return this._subscription_ctx != null;
}
pub fn getConnected(this: *JSValkeyClient, _: *jsc.JSGlobalObject) JSValue {
return JSValue.jsBoolean(this.client.status == .connected);
}
@@ -159,16 +493,22 @@ pub const JSValkeyClient = struct {
return JSValue.jsNumber(len);
}
pub fn doConnect(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, this_value: JSValue) bun.JSError!JSValue {
pub fn doConnect(
this: *JSValkeyClient,
globalObject: *jsc.JSGlobalObject,
this_value: JSValue,
) bun.JSError!JSValue {
this.ref();
defer this.deref();
// If already connected, resolve immediately
if (this.client.status == .connected) {
debug("Connecting client is already connected.", .{});
return jsc.JSPromise.resolvedPromiseValue(globalObject, js.helloGetCached(this_value) orelse .js_undefined);
}
if (js.connectionPromiseGetCached(this_value)) |promise| {
debug("Connecting client is already connected.", .{});
return promise;
}
@@ -181,6 +521,7 @@ pub const JSValkeyClient = struct {
this.this_value.setStrong(this_value, globalObject);
if (this.client.flags.needs_to_open_socket) {
debug("Need to open socket, starting connection process.", .{});
this.poll_ref.ref(this.client.vm);
this.connect() catch |err| {
@@ -203,6 +544,7 @@ pub const JSValkeyClient = struct {
},
.failed => {
this.client.status = .disconnected;
this.updateHasPendingActivity();
this.client.flags.is_reconnecting = true;
this.client.retry_attempts = 0;
this.reconnect();
@@ -373,6 +715,7 @@ pub const JSValkeyClient = struct {
defer this.deref();
this.client.status = .connecting;
this.updateHasPendingActivity();
// Ref the poll to keep event loop alive during connection
this.poll_ref.disable();
@@ -414,7 +757,17 @@ pub const JSValkeyClient = struct {
if (js.connectionPromiseGetCached(this_value)) |promise| {
js.connectionPromiseSetCached(this_value, globalObject, .zero);
promise.asPromise().?.resolve(globalObject, hello_value);
const js_promise = promise.asPromise().?;
if (this.client.flags.connection_promise_returns_client) {
debug("Resolving connection promise with client instance", .{});
const this_js = this.toJS(globalObject);
this_js.unprotect();
js_promise.resolve(globalObject, this_js);
} else {
debug("Resolving connection promise with HELLO response", .{});
js_promise.resolve(globalObject, hello_value);
}
this.client.flags.connection_promise_returns_client = false;
}
}
@@ -422,6 +775,86 @@ pub const JSValkeyClient = struct {
this.updatePollRef();
}
pub fn onValkeySubscribe(this: *JSValkeyClient, value: *protocol.RESPValue) void {
if (!this.isSubscriber()) {
debug("onSubscribe called but client is not in subscriber mode", .{});
return;
}
_ = value;
this.client.onWritable();
this.updatePollRef();
}
pub fn onValkeyUnsubscribe(this: *JSValkeyClient, value: *protocol.RESPValue) void {
if (!this.isSubscriber()) {
debug("onUnsubscribe called but client is not in subscriber mode", .{});
return;
}
var subscription_ctx = this._subscription_ctx.?;
// Check if we have any remaining subscriptions
// If the callback map is empty, we can exit subscription mode
if (!subscription_ctx.hasSubscriptions(this.globalObject)) {
// No more subscriptions, exit subscription mode
this.deleteSubscriptionCtx();
}
_ = value;
this.client.onWritable();
this.updatePollRef();
}
pub fn onValkeyMessage(this: *JSValkeyClient, value: []protocol.RESPValue) void {
if (!this.isSubscriber()) {
debug("onMessage called but client is not in subscriber mode", .{});
return;
}
const globalObject = this.globalObject;
const event_loop = this.client.vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
// The message push should be an array with [channel, message]
if (value.len < 2) {
debug("Message array has insufficient elements: {}", .{value.len});
return;
}
// Extract channel and message
const channel_value = value[0].toJS(globalObject) catch {
debug("Failed to convert channel to JS", .{});
return;
};
const message_value = value[1].toJS(globalObject) catch {
debug("Failed to convert message to JS", .{});
return;
};
// Get the subscription context
const subs_ctx = &(this._subscription_ctx orelse {
debug("No subscription context found", .{});
return;
});
// Invoke callbacks for this channel with message and channel as arguments
subs_ctx.invokeCallback(
globalObject,
channel_value,
&[_]JSValue{ message_value, channel_value },
) catch |e| {
debug("Failed to invoke callbacks. Error: {}", .{e});
this.failWithJSValue(globalObject.takeException(e));
};
this.client.onWritable();
this.updatePollRef();
}
// Callback for when Valkey client needs to reconnect
pub fn onValkeyReconnect(this: *JSValkeyClient) void {
// Schedule reconnection using our safe timer methods
@@ -494,6 +927,33 @@ pub const JSValkeyClient = struct {
}
}
pub fn hasPendingActivity(this: *JSValkeyClient) bool {
// TODO(markovejnovic): Could this be .monotonic? My intuition says
// yes, because none of the things that may be freed will actually be
// read. The pointers don't move, so it should be safe, but I've
// decided here to be conservative.
return this.has_pending_activity.load(.acquire);
}
pub fn updateHasPendingActivity(this: *JSValkeyClient) void {
if (this.client.hasAnyPendingCommands()) {
this.has_pending_activity.store(true, .release);
return;
}
if (this.channel_subscription_count > 0) {
this.has_pending_activity.store(true, .release);
return;
}
if (this.client.status != .connected and this.client.status != .disconnected) {
this.has_pending_activity.store(true, .release);
return;
}
this.has_pending_activity.store(false, .release);
}
pub fn finalize(this: *JSValkeyClient) void {
// Since this.stopTimers impacts the reference count potentially, we
// need to ref/unref here as well.
@@ -507,6 +967,9 @@ pub const JSValkeyClient = struct {
}
this.client.flags.finalized = true;
this.client.close();
if (this._subscription_ctx) |*ctx| {
ctx.deinit();
}
this.deref();
}
@@ -521,6 +984,7 @@ pub const JSValkeyClient = struct {
}
fn connect(this: *JSValkeyClient) !void {
debug("Connecting to Redis.", .{});
this.client.flags.needs_to_open_socket = false;
const vm = this.client.vm;
@@ -641,6 +1105,7 @@ pub const JSValkeyClient = struct {
pub const decr = fns.decr;
pub const del = fns.del;
pub const dump = fns.dump;
pub const duplicate = fns.duplicate;
pub const exists = fns.exists;
pub const expire = fns.expire;
pub const expiretime = fns.expiretime;
@@ -742,6 +1207,7 @@ fn SocketHandler(comptime ssl: bool) type {
loop.enter();
defer loop.exit();
this.client.status = .failed;
this.updateHasPendingActivity();
this.client.flags.is_manually_closed = true;
this.client.failWithJSValue(this.globalObject, ssl_error.toJS(this.globalObject));
this.client.close();
@@ -756,7 +1222,6 @@ fn SocketHandler(comptime ssl: bool) type {
pub fn onClose(this: *JSValkeyClient, _: SocketType, _: i32, _: ?*anyopaque) void {
// Ensure the socket pointer is updated.
this.client.socket = .{ .SocketTCP = .detached };
this.client.onClose();
}

View File

@@ -1,3 +1,19 @@
fn requireNotSubscriber(this: *const JSValkeyClient, function_name: []const u8) bun.JSError!void {
const fmt_string = "RedisClient.prototype.{s} cannot be called while in subscriber mode.";
if (this.isSubscriber()) {
return this.globalObject.throw(fmt_string, .{function_name});
}
}
fn requireSubscriber(this: *const JSValkeyClient, function_name: []const u8) bun.JSError!void {
const fmt_string = "RedisClient.prototype.{s} can only be called while in subscriber mode.";
if (!this.isSubscriber()) {
return this.globalObject.throw(fmt_string, .{function_name});
}
}
pub fn jsSend(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
const command = try callframe.argument(0).toBunString(globalObject);
defer command.deref();
@@ -41,6 +57,8 @@ pub fn jsSend(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfram
}
pub fn get(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("get", "key", "string or buffer");
};
@@ -61,6 +79,8 @@ pub fn get(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
}
pub fn getBuffer(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("getBuffer", "key", "string or buffer");
};
@@ -81,6 +101,8 @@ pub fn getBuffer(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callf
}
pub fn set(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const args_view = callframe.arguments();
var stack_fallback = std.heap.stackFallback(512, bun.default_allocator);
var args = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), args_view.len);
@@ -127,6 +149,8 @@ pub fn set(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
}
pub fn incr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("incr", "key", "string or buffer");
};
@@ -147,6 +171,8 @@ pub fn incr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
}
pub fn decr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("decr", "key", "string or buffer");
};
@@ -167,6 +193,8 @@ pub fn decr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
}
pub fn exists(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("exists", "key", "string or buffer");
};
@@ -188,6 +216,8 @@ pub fn exists(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfram
}
pub fn expire(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("expire", "key", "string or buffer");
};
@@ -219,6 +249,8 @@ pub fn expire(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfram
}
pub fn ttl(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("ttl", "key", "string or buffer");
};
@@ -240,6 +272,8 @@ pub fn ttl(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
// Implement srem (remove value from a set)
pub fn srem(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("srem", "key", "string or buffer");
};
@@ -265,6 +299,8 @@ pub fn srem(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
// Implement srandmember (get random member from set)
pub fn srandmember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("srandmember", "key", "string or buffer");
};
@@ -286,6 +322,8 @@ pub fn srandmember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, cal
// Implement smembers (get all members of a set)
pub fn smembers(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("smembers", "key", "string or buffer");
};
@@ -307,6 +345,8 @@ pub fn smembers(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfr
// Implement spop (pop a random member from a set)
pub fn spop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("spop", "key", "string or buffer");
};
@@ -328,6 +368,8 @@ pub fn spop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
// Implement sadd (add member to a set)
pub fn sadd(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("sadd", "key", "string or buffer");
};
@@ -353,6 +395,8 @@ pub fn sadd(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
// Implement sismember (check if value is member of a set)
pub fn sismember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("sismember", "key", "string or buffer");
};
@@ -379,6 +423,8 @@ pub fn sismember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callf
// Implement hmget (get multiple values from hash)
pub fn hmget(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = (try fromJS(globalObject, callframe.argument(0))) orelse {
return globalObject.throwInvalidArgumentType("hmget", "key", "string or buffer");
};
@@ -426,6 +472,8 @@ pub fn hmget(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe
// Implement hincrby (increment hash field by integer value)
pub fn hincrby(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = try callframe.argument(0).toBunString(globalObject);
defer key.deref();
const field = try callframe.argument(1).toBunString(globalObject);
@@ -456,6 +504,8 @@ pub fn hincrby(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callfra
// Implement hincrbyfloat (increment hash field by float value)
pub fn hincrbyfloat(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = try callframe.argument(0).toBunString(globalObject);
defer key.deref();
const field = try callframe.argument(1).toBunString(globalObject);
@@ -486,6 +536,8 @@ pub fn hincrbyfloat(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, ca
// Implement hmset (set multiple values in hash)
pub fn hmset(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
const key = try callframe.argument(0).toBunString(globalObject);
defer key.deref();
@@ -578,59 +630,494 @@ pub fn ping(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe:
return promise.toJS();
}
pub const bitcount = compile.@"(key: RedisKey)"("bitcount", "BITCOUNT", "key").call;
pub const dump = compile.@"(key: RedisKey)"("dump", "DUMP", "key").call;
pub const expiretime = compile.@"(key: RedisKey)"("expiretime", "EXPIRETIME", "key").call;
pub const getdel = compile.@"(key: RedisKey)"("getdel", "GETDEL", "key").call;
pub const getex = compile.@"(...strings: string[])"("getex", "GETEX").call;
pub const hgetall = compile.@"(key: RedisKey)"("hgetall", "HGETALL", "key").call;
pub const hkeys = compile.@"(key: RedisKey)"("hkeys", "HKEYS", "key").call;
pub const hlen = compile.@"(key: RedisKey)"("hlen", "HLEN", "key").call;
pub const hvals = compile.@"(key: RedisKey)"("hvals", "HVALS", "key").call;
pub const keys = compile.@"(key: RedisKey)"("keys", "KEYS", "key").call;
pub const llen = compile.@"(key: RedisKey)"("llen", "LLEN", "key").call;
pub const lpop = compile.@"(key: RedisKey)"("lpop", "LPOP", "key").call;
pub const persist = compile.@"(key: RedisKey)"("persist", "PERSIST", "key").call;
pub const pexpiretime = compile.@"(key: RedisKey)"("pexpiretime", "PEXPIRETIME", "key").call;
pub const pttl = compile.@"(key: RedisKey)"("pttl", "PTTL", "key").call;
pub const rpop = compile.@"(key: RedisKey)"("rpop", "RPOP", "key").call;
pub const scard = compile.@"(key: RedisKey)"("scard", "SCARD", "key").call;
pub const strlen = compile.@"(key: RedisKey)"("strlen", "STRLEN", "key").call;
pub const @"type" = compile.@"(key: RedisKey)"("type", "TYPE", "key").call;
pub const zcard = compile.@"(key: RedisKey)"("zcard", "ZCARD", "key").call;
pub const zpopmax = compile.@"(key: RedisKey)"("zpopmax", "ZPOPMAX", "key").call;
pub const zpopmin = compile.@"(key: RedisKey)"("zpopmin", "ZPOPMIN", "key").call;
pub const zrandmember = compile.@"(key: RedisKey)"("zrandmember", "ZRANDMEMBER", "key").call;
pub fn publish(
this: *JSValkeyClient,
globalObject: *jsc.JSGlobalObject,
callframe: *jsc.CallFrame,
) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
pub const append = compile.@"(key: RedisKey, value: RedisValue)"("append", "APPEND", "key", "value").call;
pub const getset = compile.@"(key: RedisKey, value: RedisValue)"("getset", "GETSET", "key", "value").call;
pub const lpush = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpush", "LPUSH").call;
pub const lpushx = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpushx", "LPUSHX").call;
pub const pfadd = compile.@"(key: RedisKey, value: RedisValue)"("pfadd", "PFADD", "key", "value").call;
pub const rpush = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpush", "RPUSH").call;
pub const rpushx = compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpushx", "RPUSHX").call;
pub const setnx = compile.@"(key: RedisKey, value: RedisValue)"("setnx", "SETNX", "key", "value").call;
pub const zscore = compile.@"(key: RedisKey, value: RedisValue)"("zscore", "ZSCORE", "key", "value").call;
const args_view = callframe.arguments();
var stack_fallback = std.heap.stackFallback(512, bun.default_allocator);
var args = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), args_view.len);
defer {
for (args.items) |*item| {
item.deinit();
}
args.deinit();
}
pub const del = compile.@"(key: RedisKey, ...args: RedisKey[])"("del", "DEL", "key").call;
pub const mget = compile.@"(key: RedisKey, ...args: RedisKey[])"("mget", "MGET", "key").call;
const arg0 = callframe.argument(0);
if (!arg0.isString()) {
return globalObject.throwInvalidArgumentType("publish", "channel", "string");
}
const channel = (try fromJS(globalObject, arg0)) orelse unreachable;
args.appendAssumeCapacity(channel);
const arg1 = callframe.argument(1);
if (!arg1.isString()) {
return globalObject.throwInvalidArgumentType("publish", "message", "string");
}
const message = (try fromJS(globalObject, arg1)) orelse unreachable;
args.appendAssumeCapacity(message);
const promise = this.send(
globalObject,
callframe.this(),
&.{
.command = "PUBLISH",
.args = .{ .args = args.items },
},
) catch |err| {
return protocol.valkeyErrorToJS(globalObject, "Failed to send PUBLISH command", err);
};
return promise.toJS();
}
pub fn subscribe(
this: *JSValkeyClient,
globalObject: *jsc.JSGlobalObject,
callframe: *jsc.CallFrame,
) bun.JSError!JSValue {
const channel_or_many, const handler_callback = callframe.argumentsAsArray(2);
var stack_fallback = std.heap.stackFallback(512, bun.default_allocator);
var redis_channels = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), 1);
defer {
for (redis_channels.items) |*item| {
item.deinit();
}
redis_channels.deinit();
}
if (!handler_callback.isCallable()) {
return globalObject.throwInvalidArgumentType("subscribe", "listener", "function");
}
// We now need to register the callback with our subscription context, which may or may not exist.
var subscription_ctx = this.getOrCreateSubscriptionCtxEnteringSubscriptionMode();
// The first argument given is the channel or may be an array of channels.
if (channel_or_many.isArray()) {
if ((try channel_or_many.getLength(globalObject)) == 0) {
return globalObject.throwInvalidArguments("subscribe requires at least one channel", .{});
}
try redis_channels.ensureTotalCapacity(try channel_or_many.getLength(globalObject));
var array_iter = try channel_or_many.arrayIterator(globalObject);
while (try array_iter.next()) |channel_arg| {
const channel = (try fromJS(globalObject, channel_arg)) orelse {
return globalObject.throwInvalidArgumentType("subscribe", "channel", "string");
};
redis_channels.appendAssumeCapacity(channel);
try subscription_ctx.upsertReceiveHandler(globalObject, channel_arg, handler_callback);
}
} else if (channel_or_many.isString()) {
// It is a single string channel
const channel = (try fromJS(globalObject, channel_or_many)) orelse {
return globalObject.throwInvalidArgumentType("subscribe", "channel", "string");
};
redis_channels.appendAssumeCapacity(channel);
try subscription_ctx.upsertReceiveHandler(globalObject, channel_or_many, handler_callback);
} else {
return globalObject.throwInvalidArgumentType("subscribe", "channel", "string or array");
}
const command: valkey.Command = .{
.command = "SUBSCRIBE",
.args = .{ .args = redis_channels.items },
};
const promise = this.send(
globalObject,
callframe.this(),
&command,
) catch |err| {
// If we find an error, we need to clean up the subscription context.
this.deleteSubscriptionCtx();
return protocol.valkeyErrorToJS(globalObject, "Failed to send SUBSCRIBE command", err);
};
return promise.toJS();
}
/// Send redis the UNSUBSCRIBE RESP command and clean up anything necessary after the unsubscribe commoand.
///
/// The subscription context must exist when calling this function.
fn sendUnsubscribeRequestAndCleanup(
this: *JSValkeyClient,
this_js: jsc.JSValue,
globalObject: *jsc.JSGlobalObject,
redis_channels: []JSArgument,
) !jsc.JSValue {
// Send UNSUBSCRIBE command
const command: valkey.Command = .{
.command = "UNSUBSCRIBE",
.args = .{ .args = redis_channels },
};
const promise = this.send(
globalObject,
this_js,
&command,
) catch |err| {
return protocol.valkeyErrorToJS(globalObject, "Failed to send UNSUBSCRIBE command", err);
};
// We do not delete the subscription context here, but rather when the
// onValkeyUnsubscribe callback is invoked.
return promise.toJS();
}
pub fn unsubscribe(
this: *JSValkeyClient,
globalObject: *jsc.JSGlobalObject,
callframe: *jsc.CallFrame,
) bun.JSError!JSValue {
// Check if we're in subscription mode
try requireSubscriber(this, @src().fn_name);
const args_view = callframe.arguments();
var stack_fallback = std.heap.stackFallback(512, bun.default_allocator);
var redis_channels = try std.ArrayList(JSArgument).initCapacity(stack_fallback.get(), 1);
defer {
for (redis_channels.items) |*item| {
item.deinit();
}
redis_channels.deinit();
}
// If no arguments, unsubscribe from all channels
if (args_view.len == 0) {
return try sendUnsubscribeRequestAndCleanup(this, callframe.this(), globalObject, redis_channels.items);
}
// The first argument can be a channel or an array of channels
const channel_or_many = callframe.argument(0);
// Get the subscription context
var subscription_ctx = this._subscription_ctx orelse {
return .js_undefined;
};
// Two arguments means .unsubscribe(channel, listener) is invoked.
if (callframe.arguments().len == 2) {
// In this case, the first argument is a channel string and the second
// argument is the handler to remove.
if (!channel_or_many.isString()) {
return globalObject.throwInvalidArgumentType(
"unsubscribe",
"channel",
"string",
);
}
const channel = channel_or_many;
const listener_cb = callframe.argument(1);
if (!listener_cb.isCallable()) {
return globalObject.throwInvalidArgumentType(
"unsubscribe",
"listener",
"function",
);
}
// Populate the redis_channels list with the single channel to
// unsubscribe from. This s important since this list is used to send
// the UNSUBSCRIBE command to redis. Without this, we would end up
// unsubscribing from all channels.
redis_channels.appendAssumeCapacity((try fromJS(globalObject, channel)) orelse {
return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string");
});
const remaining_listeners = subscription_ctx.removeReceiveHandler(globalObject, channel, listener_cb) catch {
return globalObject.throw(
"Failed to remove handler for channel {}",
.{channel.asString().getZigString(globalObject)},
);
} orelse {
// Listeners weren't present in the first place, so we can return a
// resolved promise.
const promise = jsc.JSPromise.create(globalObject);
promise.resolve(globalObject, .js_undefined);
return promise.toJS();
};
// In this case, we only want to send the unsubscribe command to redis if there are no more listeners for this
// channel.
if (remaining_listeners == 0) {
return try sendUnsubscribeRequestAndCleanup(this, callframe.this(), globalObject, redis_channels.items);
}
// Otherwise, in order to keep the API consistent, we need to return a resolved promise.
const promise = jsc.JSPromise.create(globalObject);
promise.resolve(globalObject, .js_undefined);
return promise.toJS();
}
if (channel_or_many.isArray()) {
if ((try channel_or_many.getLength(globalObject)) == 0) {
return globalObject.throwInvalidArguments(
"unsubscribe requires at least one channel",
.{},
);
}
try redis_channels.ensureTotalCapacity(try channel_or_many.getLength(globalObject));
// It is an array, so let's iterate over it
var array_iter = try channel_or_many.arrayIterator(globalObject);
while (try array_iter.next()) |channel_arg| {
const channel = (try fromJS(globalObject, channel_arg)) orelse {
return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string");
};
redis_channels.appendAssumeCapacity(channel);
// Clear the handlers for this channel
subscription_ctx.clearReceiveHandlers(globalObject, channel_arg);
}
} else if (channel_or_many.isString()) {
// It is a single string channel
const channel = (try fromJS(globalObject, channel_or_many)) orelse {
return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string");
};
redis_channels.appendAssumeCapacity(channel);
// Clear the handlers for this channel
subscription_ctx.clearReceiveHandlers(globalObject, channel_or_many);
} else {
return globalObject.throwInvalidArgumentType("unsubscribe", "channel", "string or array");
}
// Now send the unsubscribe command and clean up if necessary
return try sendUnsubscribeRequestAndCleanup(this, callframe.this(), globalObject, redis_channels.items);
}
pub fn bitcount(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("bitcount", "BITCOUNT", "key").call(this, globalObject, callframe);
}
pub fn dump(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("dump", "DUMP", "key").call(this, globalObject, callframe);
}
pub fn expiretime(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("expiretime", "EXPIRETIME", "key").call(this, globalObject, callframe);
}
pub fn getdel(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("getdel", "GETDEL", "key").call(this, globalObject, callframe);
}
pub fn getex(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("getex", "GETEX").call(this, globalObject, callframe);
}
pub fn hgetall(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("hgetall", "HGETALL", "key").call(this, globalObject, callframe);
}
pub fn hkeys(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("hkeys", "HKEYS", "key").call(this, globalObject, callframe);
}
pub fn hlen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("hlen", "HLEN", "key").call(this, globalObject, callframe);
}
pub fn hvals(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("hvals", "HVALS", "key").call(this, globalObject, callframe);
}
pub fn keys(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("keys", "KEYS", "key").call(this, globalObject, callframe);
}
pub fn llen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("llen", "LLEN", "key").call(this, globalObject, callframe);
}
pub fn lpop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("lpop", "LPOP", "key").call(this, globalObject, callframe);
}
pub fn persist(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("persist", "PERSIST", "key").call(this, globalObject, callframe);
}
pub fn pexpiretime(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("pexpiretime", "PEXPIRETIME", "key").call(this, globalObject, callframe);
}
pub fn pttl(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("pttl", "PTTL", "key").call(this, globalObject, callframe);
}
pub fn rpop(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("rpop", "RPOP", "key").call(this, globalObject, callframe);
}
pub fn scard(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("scard", "SCARD", "key").call(this, globalObject, callframe);
}
pub fn strlen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("strlen", "STRLEN", "key").call(this, globalObject, callframe);
}
pub fn @"type"(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("type", "TYPE", "key").call(this, globalObject, callframe);
}
pub fn zcard(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("zcard", "ZCARD", "key").call(this, globalObject, callframe);
}
pub fn zpopmax(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("zpopmax", "ZPOPMAX", "key").call(this, globalObject, callframe);
}
pub fn zpopmin(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("zpopmin", "ZPOPMIN", "key").call(this, globalObject, callframe);
}
pub fn zrandmember(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey)"("zrandmember", "ZRANDMEMBER", "key").call(this, globalObject, callframe);
}
pub fn append(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue)"("append", "APPEND", "key", "value").call(this, globalObject, callframe);
}
pub fn getset(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue)"("getset", "GETSET", "key", "value").call(this, globalObject, callframe);
}
pub fn lpush(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpush", "LPUSH").call(this, globalObject, callframe);
}
pub fn lpushx(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("lpushx", "LPUSHX").call(this, globalObject, callframe);
}
pub fn pfadd(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue)"("pfadd", "PFADD", "key", "value").call(this, globalObject, callframe);
}
pub fn rpush(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpush", "RPUSH").call(this, globalObject, callframe);
}
pub fn rpushx(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue, ...args: RedisValue)"("rpushx", "RPUSHX").call(this, globalObject, callframe);
}
pub fn setnx(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue)"("setnx", "SETNX", "key", "value").call(this, globalObject, callframe);
}
pub fn zscore(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, value: RedisValue)"("zscore", "ZSCORE", "key", "value").call(this, globalObject, callframe);
}
pub fn del(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, ...args: RedisKey[])"("del", "DEL", "key").call(this, globalObject, callframe);
}
pub fn mget(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(key: RedisKey, ...args: RedisKey[])"("mget", "MGET", "key").call(this, globalObject, callframe);
}
pub fn script(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("script", "SCRIPT").call(this, globalObject, callframe);
}
pub fn select(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("select", "SELECT").call(this, globalObject, callframe);
}
pub fn spublish(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("spublish", "SPUBLISH").call(this, globalObject, callframe);
}
pub fn smove(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("smove", "SMOVE").call(this, globalObject, callframe);
}
pub fn substr(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("substr", "SUBSTR").call(this, globalObject, callframe);
}
pub fn hstrlen(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("hstrlen", "HSTRLEN").call(this, globalObject, callframe);
}
pub fn zrank(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("zrank", "ZRANK").call(this, globalObject, callframe);
}
pub fn zrevrank(this: *JSValkeyClient, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!JSValue {
try requireNotSubscriber(this, @src().fn_name);
return compile.@"(...strings: string[])"("zrevrank", "ZREVRANK").call(this, globalObject, callframe);
}
pub fn duplicate(
this: *JSValkeyClient,
globalObject: *jsc.JSGlobalObject,
callframe: *jsc.CallFrame,
) bun.JSError!JSValue {
// We ignore the arguments if the user provided any.
_ = callframe;
var new_client: *JSValkeyClient = try this.cloneWithoutConnecting();
var new_client_js = new_client.toJS(globalObject);
new_client.this_value = jsc.JSRef.initWeak(new_client_js);
// If the original client is already connected and not manually closed, start connecting the new client.
if (this.client.status == .connected and !this.client.flags.is_manually_closed) {
new_client.client.flags.connection_promise_returns_client = true;
new_client_js.protect();
return try new_client.doConnect(globalObject, new_client_js);
}
// Otherwise, we create a dummy promise to yield the unconnected client.
const promise = jsc.JSPromise.create(globalObject);
promise.resolve(globalObject, new_client_js);
return promise.toJS();
}
pub const publish = compile.@"(...strings: string[])"("publish", "PUBLISH").call;
pub const script = compile.@"(...strings: string[])"("script", "SCRIPT").call;
pub const select = compile.@"(...strings: string[])"("select", "SELECT").call;
pub const spublish = compile.@"(...strings: string[])"("spublish", "SPUBLISH").call;
pub const smove = compile.@"(...strings: string[])"("smove", "SMOVE").call;
pub const substr = compile.@"(...strings: string[])"("substr", "SUBSTR").call;
pub const hstrlen = compile.@"(...strings: string[])"("hstrlen", "HSTRLEN").call;
pub const zrank = compile.@"(...strings: string[])"("zrank", "ZRANK").call;
pub const zrevrank = compile.@"(...strings: string[])"("zrevrank", "ZREVRANK").call;
pub const subscribe = compile.@"(...strings: string[])"("subscribe", "SUBSCRIBE").call;
pub const psubscribe = compile.@"(...strings: string[])"("psubscribe", "PSUBSCRIBE").call;
pub const unsubscribe = compile.@"(...strings: string[])"("unsubscribe", "UNSUBSCRIBE").call;
pub const punsubscribe = compile.@"(...strings: string[])"("punsubscribe", "PUNSUBSCRIBE").call;
pub const pubsub = compile.@"(...strings: string[])"("pubsub", "PUBSUB").call;
// publish(channel: RedisValue, message: RedisValue)
// script(subcommand: "LOAD", script: RedisValue)
// select(index: number | string)
// spublish(shardchannel: RedisValue, message: RedisValue)

View File

@@ -18,6 +18,16 @@ pub const ConnectionFlags = struct {
is_reconnecting: bool = false,
auto_pipelining: bool = true,
finalized: bool = false,
// This flag is a slight hack to allow returning the client instance in the
// promise which resolves when the connection is established. There are two
// modes through which a client may connect:
// 1. Connect through `client.connect()` which has the semantics of
// resolving the promise with the connection information.
// 2. Through `client.duplicate()` which creates a promise through
// `onConnect()` which resolves with the client instance itself.
// This flag is set to true in the latter case to indicate to the promise
// resolution delegation to resolve the promise with the client.
connection_promise_returns_client: bool = false,
};
/// Valkey connection status
@@ -106,6 +116,13 @@ pub const Address = union(enum) {
port: u16,
},
pub fn hostname(this: Address) []const u8 {
return switch (this) {
.unix => |unix_addr| return unix_addr,
.host => |h| return h.host,
};
}
pub fn connect(this: *const Address, client: *ValkeyClient, ctx: *bun.uws.SocketContext, is_tls: bool) !uws.AnySocket {
switch (is_tls) {
inline else => |tls| {
@@ -155,6 +172,7 @@ pub const ValkeyClient = struct {
username: []const u8 = "",
database: u32 = 0,
address: Address,
protocol: Protocol,
connection_strings: []u8 = &.{},
@@ -211,6 +229,8 @@ pub const ValkeyClient = struct {
}
this.allocator.free(this.connection_strings);
// Note there is no need to deallocate username, password and hostname since they are
// within the this.connection_strings buffer.
this.write_buffer.deinit(this.allocator);
this.read_buffer.deinit(this.allocator);
this.tls.deinit();
@@ -259,6 +279,7 @@ pub const ValkeyClient = struct {
.meta = command.meta,
.promise = command.promise,
}) catch |err| bun.handleOom(err);
this.parent().updateHasPendingActivity();
total += 1;
total_bytelength += command.serialized_data.len;
@@ -269,6 +290,7 @@ pub const ValkeyClient = struct {
bun.handleOom(this.write_buffer.byte_list.ensureUnusedCapacity(this.allocator, total_bytelength));
for (pipelineable_commands) |*command| {
bun.handleOom(this.write_buffer.write(this.allocator, command.serialized_data));
this.parent().updateHasPendingActivity();
// Free the serialized data since we've copied it to the write buffer
this.allocator.free(command.serialized_data);
}
@@ -354,6 +376,7 @@ pub const ValkeyClient = struct {
if (wrote > 0) {
this.write_buffer.consume(@intCast(wrote));
}
this.parent().updateHasPendingActivity();
return this.write_buffer.len() > 0;
}
@@ -383,14 +406,14 @@ pub const ValkeyClient = struct {
/// Mark the connection as failed with error message
pub fn fail(this: *ValkeyClient, message: []const u8, err: protocol.RedisError) void {
debug("failed: {s}: {s}", .{ message, @errorName(err) });
debug("failed: {s}: {}", .{ message, err });
if (this.status == .failed) return;
if (this.flags.finalized) {
// We can't run promises inside finalizers.
if (this.queue.count + this.in_flight.count > 0) {
const vm = this.vm;
const deferred_failrue = bun.new(DeferredFailure, .{
const deferred_failure = bun.new(DeferredFailure, .{
// This memory is not owned by us.
.message = bun.handleOom(bun.default_allocator.dupe(u8, message)),
@@ -401,7 +424,7 @@ pub const ValkeyClient = struct {
});
this.in_flight = .init(this.allocator);
this.queue = .init(this.allocator);
deferred_failrue.enqueue();
deferred_failure.enqueue();
}
// Allow the finalizer to call .close()
@@ -415,6 +438,7 @@ pub const ValkeyClient = struct {
pub fn failWithJSValue(this: *ValkeyClient, globalThis: *jsc.JSGlobalObject, jsvalue: jsc.JSValue) void {
this.status = .failed;
rejectAllPendingCommands(&this.in_flight, &this.queue, globalThis, this.allocator, jsvalue);
this.parent().updateHasPendingActivity();
if (!this.connectionReady()) {
this.flags.is_manually_closed = true;
@@ -463,6 +487,7 @@ pub const ValkeyClient = struct {
debug("reconnect in {d}ms (attempt {d}/{d})", .{ delay_ms, this.retry_attempts, this.max_retries });
this.status = .disconnected;
this.parent().updateHasPendingActivity();
this.flags.is_reconnecting = true;
this.flags.is_authenticated = false;
this.flags.is_selecting_db_internal = false;
@@ -498,6 +523,8 @@ pub const ValkeyClient = struct {
// Without auto pipelining, wait for in-flight to empty before draining
_ = this.drain();
}
this.parent().updateHasPendingActivity();
}
_ = this.flushData();
@@ -510,6 +537,7 @@ pub const ValkeyClient = struct {
// Path 1: Buffer already has data, append and process from buffer
if (this.read_buffer.remaining().len > 0) {
this.read_buffer.write(this.allocator, data) catch @panic("failed to write to read buffer");
this.parent().updateHasPendingActivity();
// Process as many complete messages from the buffer as possible
while (true) {
@@ -542,6 +570,7 @@ pub const ValkeyClient = struct {
}
this.read_buffer.consume(@truncate(bytes_consumed));
this.parent().updateHasPendingActivity();
var value_to_handle = value; // Use temp var for defer
this.handleResponse(&value_to_handle) catch |err| {
@@ -613,6 +642,55 @@ pub const ValkeyClient = struct {
// If the loop finishes, the entire 'data' was processed without needing the buffer.
}
/// Try handling this response as a subscriber-state response.
/// Returns `handled` if we handled it, `fallthrough` if we did not.
fn handleSubscribeResponse(this: *ValkeyClient, value: *protocol.RESPValue, pair: *ValkeyCommand.PromisePair) enum { handled, fallthrough } {
// Resolve the promise with the potentially transformed value
var promise_ptr = &pair.promise;
const globalThis = this.globalObject();
const loop = this.vm.eventLoop();
debug("Handling a subscribe response: {any}", .{value.*});
loop.enter();
defer loop.exit();
return switch (value.*) {
.Error => {
promise_ptr.reject(globalThis, value.toJS(globalThis));
return .handled;
},
.Push => |push| {
const p = this.parent();
const subs_ctx = p.getOrCreateSubscriptionCtxEnteringSubscriptionMode();
const sub_count = subs_ctx.subscriptionCount(globalThis);
if (std.mem.eql(u8, push.kind, "subscribe")) {
this.onValkeySubscribe(value);
promise_ptr.promise.resolve(globalThis, .jsNumber(sub_count));
return .handled;
} else if (std.mem.eql(u8, push.kind, "unsubscribe")) {
this.onValkeyUnsubscribe(value);
promise_ptr.promise.resolve(globalThis, .js_undefined);
return .handled;
} else {
// We should rarely reach this point. If we're guaranteed to be handling a subscribe/unsubscribe,
// then this is an unexpected path.
@branchHint(.cold);
this.fail(
"Push message is not a subscription message.",
protocol.RedisError.InvalidResponseType,
);
return .handled;
}
},
else => {
// This may be a regular command response. Let's pass it down
// to the next handler.
return .fallthrough;
},
};
}
fn handleHelloResponse(this: *ValkeyClient, value: *protocol.RESPValue) void {
debug("Processing HELLO response", .{});
@@ -624,6 +702,7 @@ pub const ValkeyClient = struct {
.SimpleString => |str| {
if (std.mem.eql(u8, str, "OK")) {
this.status = .connected;
this.parent().updateHasPendingActivity();
this.flags.is_authenticated = true;
this.onValkeyConnect(value);
return;
@@ -657,6 +736,7 @@ pub const ValkeyClient = struct {
// Authentication successful via HELLO
this.status = .connected;
this.parent().updateHasPendingActivity();
this.flags.is_authenticated = true;
this.onValkeyConnect(value);
return;
@@ -705,9 +785,64 @@ pub const ValkeyClient = struct {
},
};
}
// Let's load the promise pair.
var pair_maybe = this.in_flight.readItem();
// We handle subscriptions specially because they are not regular
// commands and their failure will potentially cause the client to drop
// out of subscriber mode.
if (this.parent().isSubscriber()) {
debug("This client is a subscriber. Handling as subscriber...", .{});
// There are multiple different commands we may receive in
// subscriber mode. One is from a client.subscribe() call which
// requires that a promise is in-flight, but otherwise, we may also
// receive push messages from the server that do not have an
// associated promise.
if (pair_maybe) |*pair| {
debug("There is a request in flight. Handling as a subscribe request...", .{});
if (this.handleSubscribeResponse(value, pair) == .handled) {
return;
}
}
switch (value.*) {
.Error => |err| {
this.fail(err, protocol.RedisError.InvalidResponse);
return;
},
.Push => |push| {
if (std.mem.eql(u8, push.kind, "message")) {
@branchHint(.likely);
debug("Received a message.", .{});
this.onValkeyMessage(push.data);
return;
} else if (std.mem.eql(u8, push.kind, "subscribe")) {
@branchHint(.cold);
debug("Received subscription message without promise: {any}", .{push.data});
return;
} else if (std.mem.eql(u8, push.kind, "unsubscribe")) {
@branchHint(.cold);
debug("Received unsubscribe message without promise: {any}", .{push.data});
return;
} else {
@branchHint(.cold);
this.fail("Unexpected push message kind without promise", protocol.RedisError.InvalidResponseType);
return;
}
},
else => {
// In the else case, we fall through to the regular
// handler. Subscribers can send .Push commands which have
// the same semantics as regular commands.
},
}
debug("Treating subscriber response as a regular command...", .{});
}
// For regular commands, get the next command+promise pair from the queue
var pair = this.in_flight.readItem() orelse {
var pair = pair_maybe orelse {
debug("Received response but no promise in queue", .{});
return;
};
@@ -829,6 +964,7 @@ pub const ValkeyClient = struct {
.meta = offline_cmd.meta,
.promise = offline_cmd.promise,
}) catch |err| bun.handleOom(err);
this.parent().updateHasPendingActivity();
const data = offline_cmd.serialized_data;
if (this.connectionReady() and this.write_buffer.remaining().len == 0) {
@@ -841,6 +977,7 @@ pub const ValkeyClient = struct {
if (unwritten.len > 0) {
// Handle incomplete write.
bun.handleOom(this.write_buffer.write(this.allocator, unwritten));
this.parent().updateHasPendingActivity();
}
return true;
@@ -905,6 +1042,7 @@ pub const ValkeyClient = struct {
// Add to queue with command type
try this.in_flight.writeItem(cmd_pair);
this.parent().updateHasPendingActivity();
_ = this.flushData();
}
@@ -949,6 +1087,7 @@ pub const ValkeyClient = struct {
this.unregisterAutoFlusher();
if (this.status == .connected or this.status == .connecting) {
this.status = .disconnected;
this.parent().updateHasPendingActivity();
this.close();
}
}
@@ -961,6 +1100,7 @@ pub const ValkeyClient = struct {
/// Write data to the socket buffer
fn write(this: *ValkeyClient, data: []const u8) !usize {
try this.write_buffer.write(this.allocator, data);
this.parent().updateHasPendingActivity();
return data.len;
}
@@ -985,6 +1125,18 @@ pub const ValkeyClient = struct {
this.parent().onValkeyConnect(value);
}
pub fn onValkeySubscribe(this: *ValkeyClient, value: *protocol.RESPValue) void {
this.parent().onValkeySubscribe(value);
}
pub fn onValkeyUnsubscribe(this: *ValkeyClient, value: *protocol.RESPValue) void {
this.parent().onValkeyUnsubscribe(value);
}
pub fn onValkeyMessage(this: *ValkeyClient, value: []protocol.RESPValue) void {
this.parent().onValkeyMessage(value);
}
pub fn onValkeyReconnect(this: *ValkeyClient) void {
this.parent().onValkeyReconnect();
}
@@ -999,9 +1151,9 @@ pub const ValkeyClient = struct {
};
// Auto-pipelining
const debug = bun.Output.scoped(.Redis, .visible);
const ValkeyCommand = @import("./ValkeyCommand.zig");
const protocol = @import("./valkey_protocol.zig");
const std = @import("std");

View File

@@ -0,0 +1,31 @@
import { expectType } from "./utilities";
expectType(Bun.redis.publish("hello", "world")).is<Promise<number>>();
const copy = await Bun.redis.duplicate();
expectType(copy.connected).is<boolean>();
expectType(copy).is<Bun.RedisClient>();
const listener: Bun.RedisClient.StringPubSubListener = (message, channel) => {
expectType(message).is<string>();
expectType(channel).is<string>();
};
Bun.redis.subscribe("hello", listener);
// Buffer subscriptions are not yet implemented
// const bufferListener: Bun.RedisClient.BufferPubSubListener = (message, channel) => {
// expectType(message).is<Uint8Array<ArrayBuffer>>();
// expectType(channel).is<string>();
// };
// Bun.redis.subscribe("hello", bufferListener);
expectType(
copy.subscribe("hello", message => {
expectType(message).is<string>();
}),
).is<Promise<number>>();
await copy.unsubscribe();
await copy.unsubscribe("hello");
expectType(copy.unsubscribe("hello", () => {})).is<Promise<void>>();

View File

@@ -455,9 +455,9 @@ import { tmpdir } from "os";
* Create a new client with specific connection type
*/
export function createClient(
connectionType: ConnectionType = ConnectionType.TCP,
customOptions = {},
dbId: number | undefined = undefined,
connectionType: ConnectionType = ConnectionType.TCP,
customOptions = {},
dbId: number | undefined = undefined,
) {
let url: string;
const mkUrl = (baseUrl: string) => dbId ? `${baseUrl}/${dbId}`: baseUrl;
@@ -765,6 +765,14 @@ async function getRedisContainerName(): Promise<string> {
/**
* Restart the Redis container to simulate connection drop
*
* Restarts the container identified by the test harness and waits briefly for it
* to come back online (approximately 2 seconds). Use this to simulate a server
* restart or connection drop during tests.
*
* @returns A promise that resolves when the restart and short wait complete.
* @throws If the Docker restart command exits with a non-zero code; the error
* message includes the container's stderr output.
*/
export async function restartRedisContainer(): Promise<void> {
const containerName = await getRedisContainerName();
@@ -789,3 +797,10 @@ export async function restartRedisContainer(): Promise<void> {
console.log(`Redis container restarted: ${containerName}`);
}
/**
* @returns true or false with approximately equal probability
*/
export function randomCoinFlip(): boolean {
return Math.floor(Math.random() * 2) == 0;
}

View File

@@ -1,6 +1,14 @@
import { randomUUIDv7, RedisClient } from "bun";
import { randomUUIDv7, RedisClient, sleep } from "bun";
import { beforeEach, describe, expect, test } from "bun:test";
import { ConnectionType, createClient, ctx, DEFAULT_REDIS_URL, expectType, isEnabled } from "./test-utils";
import {
ConnectionType,
createClient,
ctx,
DEFAULT_REDIS_URL,
expectType,
isEnabled,
randomCoinFlip,
} from "./test-utils";
describe.skipIf(!isEnabled)("Valkey Redis Client", () => {
beforeEach(async () => {
@@ -12,6 +20,12 @@ describe.skipIf(!isEnabled)("Valkey Redis Client", () => {
await ctx.redis.send("FLUSHALL", ["SYNC"]);
});
const connectedRedis = async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
await redis.connect();
return redis;
};
describe("Basic Operations", () => {
test("should set and get strings", async () => {
const redis = ctx.redis;
@@ -209,4 +223,566 @@ describe.skipIf(!isEnabled)("Valkey Redis Client", () => {
expect(valueAfterStop).toBe(TEST_VALUE);
});
});
describe("PUB/SUB", () => {
const testChannel = "test-channel";
const testKey = "test-key";
const testValue = "test-value";
const testMessage = "test-message";
const flushTimeoutMs = 300;
test("publishing to a channel does not fail", async () => {
const redis = await connectedRedis();
// no subs
expect(await redis.publish(testChannel, testMessage)).toBe(0);
});
test("setting in subscriber mode gracefully fails", async () => {
const redis = await connectedRedis();
await redis.subscribe(testChannel, () => {});
expect(() => redis.set(testKey, testValue)).toThrow(
"RedisClient.prototype.set cannot be called while in subscriber mode",
);
// Clean up subscription
await redis.unsubscribe(testChannel);
});
test("setting after unsubscribing works", async () => {
const redis = await connectedRedis();
await redis.subscribe(testChannel, () => {});
await redis.unsubscribe(testChannel);
expect(redis.set(testKey, testValue)).resolves.toEqual("OK");
});
test("subscribing to a channel receives messages", async () => {
const TEST_MESSAGE_COUNT = 128;
const redis = await connectedRedis();
const subscriber = await connectedRedis();
var receiveCount = 0;
await subscriber.subscribe(testChannel, (message, channel) => {
receiveCount++;
expect(channel).toBe(testChannel);
expect(message).toBe(testMessage);
});
Array.from({ length: TEST_MESSAGE_COUNT }).forEach(async () => {
expect(await redis.publish(testChannel, testMessage)).toBe(1);
});
// Wait a little bit just to ensure all the messages are flushed.
await sleep(flushTimeoutMs);
expect(receiveCount).toBe(TEST_MESSAGE_COUNT);
await subscriber.unsubscribe(testChannel);
});
test("messages are received in order", async () => {
const TEST_MESSAGE_COUNT = 1024;
const redis = await connectedRedis();
const subscriber = await connectedRedis();
var receivedMessages: string[] = [];
await subscriber.subscribe(testChannel, message => {
receivedMessages.push(message);
});
var sentMessages: string[] = [];
Array.from({ length: TEST_MESSAGE_COUNT }).forEach(async () => {
const message = randomUUIDv7();
expect(await redis.publish(testChannel, message)).toBe(1);
sentMessages.push(message);
});
// Wait a little bit just to ensure all the messages are flushed.
await sleep(flushTimeoutMs);
expect(receivedMessages.length).toBe(sentMessages.length);
expect(receivedMessages).toEqual(sentMessages);
await subscriber.unsubscribe(testChannel);
});
test("subscribing to multiple channels receives messages", async () => {
const TEST_MESSAGE_COUNT = 128;
const redis = await connectedRedis();
const subscriber = await connectedRedis();
const channels = [testChannel, "another-test-channel"];
var receivedMessages: { [channel: string]: string[] } = {};
await subscriber.subscribe(channels, (message, channel) => {
receivedMessages[channel] = receivedMessages[channel] || [];
receivedMessages[channel].push(message);
});
var sentMessages: { [channel: string]: string[] } = {};
for (let i = 0; i < TEST_MESSAGE_COUNT; i++) {
const channel = channels[randomCoinFlip() ? 0 : 1];
const message = randomUUIDv7();
expect(await redis.publish(channel, message)).toBe(1);
sentMessages[channel] = sentMessages[channel] || [];
sentMessages[channel].push(message);
}
// Wait a little bit just to ensure all the messages are flushed.
await sleep(flushTimeoutMs);
// Check that we received messages on both channels
expect(Object.keys(receivedMessages).sort()).toEqual(Object.keys(sentMessages).sort());
// Check messages match for each channel
for (const channel of channels) {
if (sentMessages[channel]) {
expect(receivedMessages[channel]).toEqual(sentMessages[channel]);
}
}
await subscriber.unsubscribe(channels);
});
test("unsubscribing from specific channels while remaining subscribed to others", async () => {
const channel1 = "channel-1";
const channel2 = "channel-2";
const channel3 = "channel-3";
const redis = await connectedRedis();
const subscriber = await connectedRedis();
let receivedMessages: { [channel: string]: string[] } = {};
// Subscribe to three channels
await subscriber.subscribe([channel1, channel2, channel3], (message, channel) => {
receivedMessages[channel] = receivedMessages[channel] || [];
receivedMessages[channel].push(message);
});
// Send initial messages to all channels
expect(await redis.publish(channel1, "msg1-before")).toBe(1);
expect(await redis.publish(channel2, "msg2-before")).toBe(1);
expect(await redis.publish(channel3, "msg3-before")).toBe(1);
await sleep(flushTimeoutMs);
// Unsubscribe from channel2
await subscriber.unsubscribe(channel2);
// Send messages after unsubscribing from channel2
expect(await redis.publish(channel1, "msg1-after")).toBe(1);
expect(await redis.publish(channel2, "msg2-after")).toBe(0);
expect(await redis.publish(channel3, "msg3-after")).toBe(1);
await sleep(flushTimeoutMs);
// Check we received messages only on subscribed channels
expect(receivedMessages[channel1]).toEqual(["msg1-before", "msg1-after"]);
expect(receivedMessages[channel2]).toEqual(["msg2-before"]); // No "msg2-after"
expect(receivedMessages[channel3]).toEqual(["msg3-before", "msg3-after"]);
await subscriber.unsubscribe([channel1, channel3]);
});
test("subscribing to the same channel multiple times", async () => {
const redis = await connectedRedis();
const subscriber = await connectedRedis();
const channel = "duplicate-channel";
let callCount = 0;
const listener = () => {
callCount++;
};
let callCount2 = 0;
const listener2 = () => {
callCount2++;
};
// Subscribe to the same channel twice
await subscriber.subscribe(channel, listener);
await subscriber.subscribe(channel, listener2);
// Publish a single message
expect(await redis.publish(channel, "test-message")).toBe(1);
await sleep(flushTimeoutMs);
// Both listeners should have been called once.
expect(callCount).toBe(1);
expect(callCount2).toBe(1);
await subscriber.unsubscribe(channel);
});
test("empty string messages", async () => {
const redis = await connectedRedis();
const channel = "empty-message-channel";
const subscriber = await connectedRedis();
let receivedMessage: string | undefined = undefined;
await subscriber.subscribe(channel, message => {
receivedMessage = message;
});
expect(await redis.publish(channel, "")).toBe(1);
await sleep(flushTimeoutMs);
expect(receivedMessage).not.toBeUndefined();
expect(receivedMessage!).toBe("");
await subscriber.unsubscribe(channel);
});
test("special characters in channel names", async () => {
const redis = await connectedRedis();
const subscriber = await connectedRedis();
const specialChannels = [
"channel:with:colons",
"channel with spaces",
"channel-with-unicode-😀",
"channel[with]brackets",
"channel@with#special$chars",
];
for (const channel of specialChannels) {
let received = false;
await subscriber.subscribe(channel, () => {
received = true;
});
expect(await redis.publish(channel, "test")).toBe(1);
await sleep(flushTimeoutMs);
expect(received).toBe(true);
await subscriber.unsubscribe(channel);
}
});
test("ping works in subscription mode", async () => {
const redis = await connectedRedis();
const channel = "ping-test-channel";
await redis.subscribe(channel, () => {});
// Ping should work in subscription mode
const pong = await redis.ping();
expect(pong).toBe("PONG");
const customPing = await redis.ping("hello");
expect(customPing).toBe("hello");
await redis.unsubscribe(channel);
});
test("publish does not work from a subscribed client", async () => {
const redis = await connectedRedis();
const channel = "self-publish-channel";
await redis.subscribe(channel, () => {});
// Publishing from the same client should work
expect(async () => redis.publish(channel, "self-published")).toThrow();
await sleep(flushTimeoutMs);
await redis.unsubscribe(channel);
});
test("complete unsubscribe restores normal command mode", async () => {
const redis = await connectedRedis();
const channel = "restore-test-channel";
const testKey = "restore-test-key";
await redis.subscribe(channel, () => {});
// Should fail in subscription mode
expect(() => redis.set(testKey, testValue)).toThrow(
"RedisClient.prototype.set cannot be called while in subscriber mode.",
);
// Unsubscribe from all channels
await redis.unsubscribe(channel);
// Should work after unsubscribing
const result = await redis.set(testKey, "value");
expect(result).toBe("OK");
const value = await redis.get(testKey);
expect(value).toBe("value");
});
test("publishing without subscribers succeeds", async () => {
const redis = await connectedRedis();
const channel = "no-subscribers-channel";
// Publishing without subscribers should not throw
expect(await redis.publish(channel, "message")).toBe(0);
});
test("unsubscribing from non-subscribed channels", async () => {
const redis = await connectedRedis();
const channel = "never-subscribed-channel";
expect(() => redis.unsubscribe(channel)).toThrow(
"RedisClient.prototype.unsubscribe can only be called while in subscriber mode.",
);
});
test("callback errors don't crash the client", async () => {
const redis = await connectedRedis();
const channel = "error-callback-channel";
const subscriber = await connectedRedis();
let messageCount = 0;
await subscriber.subscribe(channel, () => {
messageCount++;
if (messageCount === 2) {
throw new Error("Intentional callback error");
}
});
// Send multiple messages
expect(await redis.publish(channel, "message1")).toBe(1);
expect(await redis.publish(channel, "message2")).toBe(1);
expect(await redis.publish(channel, "message3")).toBe(1);
await sleep(flushTimeoutMs);
expect(messageCount).toBe(3);
await subscriber.unsubscribe(channel);
});
test("subscriptions return correct counts", async () => {
const subscriber = await connectedRedis();
expect(await subscriber.subscribe("chan1", () => {})).toBe(1);
expect(await subscriber.subscribe("chan2", () => {})).toBe(2);
await subscriber.unsubscribe();
});
test("unsubscribing from listeners", async () => {
const redis = await connectedRedis();
const channel = "error-callback-channel";
const subscriber = await connectedRedis();
let messageCount1 = 0;
const listener1 = () => {
messageCount1++;
};
await subscriber.subscribe(channel, listener1);
let messageCount2 = 0;
const listener2 = () => {
messageCount2++;
};
await subscriber.subscribe(channel, listener2);
await redis.publish(channel, "message1");
await sleep(flushTimeoutMs);
expect(messageCount1).toBe(1);
expect(messageCount2).toBe(1);
await subscriber.unsubscribe(channel, listener2);
await redis.publish(channel, "message1");
await sleep(flushTimeoutMs);
expect(messageCount1).toBe(2);
expect(messageCount2).toBe(1);
await subscriber.unsubscribe();
await redis.publish(channel, "message1");
await sleep(flushTimeoutMs);
expect(messageCount1).toBe(2);
expect(messageCount2).toBe(1);
});
});
describe("duplicate()", () => {
test("should create duplicate of unconnected client that remains unconnected", async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
expect(redis.connected).toBe(false);
const duplicate = await redis.duplicate();
expect(duplicate.connected).toBe(false);
expect(duplicate).not.toBe(redis);
});
test("should create duplicate of connected client that gets connected", async () => {
const redis = await connectedRedis();
const duplicate = await redis.duplicate();
expect(duplicate.connected).toBe(true);
expect(duplicate).not.toBe(redis);
// Both should work independently
await redis.set("test-original", "original-value");
await duplicate.set("test-duplicate", "duplicate-value");
expect(await redis.get("test-duplicate")).toBe("duplicate-value");
expect(await duplicate.get("test-original")).toBe("original-value");
duplicate.close();
});
test("should create duplicate of manually closed client that remains closed", async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
await redis.connect();
redis.close?.();
expect(redis.connected).toBe(false);
const duplicate = await redis.duplicate();
expect(duplicate.connected).toBe(false);
});
test("should preserve connection configuration in duplicate", async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
await redis.connect();
const duplicate = await redis.duplicate();
// Both clients should be able to perform the same operations
const testKey = `duplicate-config-test-${randomUUIDv7().substring(0, 8)}`;
const testValue = "test-value";
await redis.set(testKey, testValue);
const retrievedValue = await duplicate.get(testKey);
expect(retrievedValue).toBe(testValue);
duplicate.close?.();
});
test("should allow duplicate to work independently from original", async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
await redis.connect();
const duplicate = await redis.duplicate();
// Close original, duplicate should still work
redis.close?.();
const testKey = `independent-test-${randomUUIDv7().substring(0, 8)}`;
const testValue = "independent-value";
await duplicate.set(testKey, testValue);
const retrievedValue = await duplicate.get(testKey);
expect(retrievedValue).toBe(testValue);
duplicate.close?.();
});
test("should handle duplicate of client in subscriber mode", async () => {
const redis = await connectedRedis();
const testChannel = "test-subscriber-duplicate";
// Put original client in subscriber mode
await redis.subscribe(testChannel, () => {});
const duplicate = await redis.duplicate();
// Duplicate should not be in subscriber mode
expect(() => duplicate.set("test-key", "test-value")).not.toThrow();
await redis.unsubscribe(testChannel);
duplicate.close?.();
});
test("should create multiple duplicates from same client", async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
await redis.connect();
const duplicate1 = await redis.duplicate();
const duplicate2 = await redis.duplicate();
const duplicate3 = await redis.duplicate();
// All should be connected
expect(duplicate1.connected).toBe(true);
expect(duplicate2.connected).toBe(true);
expect(duplicate3.connected).toBe(true);
// All should work independently
const testKey = `multi-duplicate-test-${randomUUIDv7().substring(0, 8)}`;
await duplicate1.set(`${testKey}-1`, "value-1");
await duplicate2.set(`${testKey}-2`, "value-2");
await duplicate3.set(`${testKey}-3`, "value-3");
expect(await duplicate1.get(`${testKey}-1`)).toBe("value-1");
expect(await duplicate2.get(`${testKey}-2`)).toBe("value-2");
expect(await duplicate3.get(`${testKey}-3`)).toBe("value-3");
// Cross-check: each duplicate can read what others wrote
expect(await duplicate1.get(`${testKey}-2`)).toBe("value-2");
expect(await duplicate2.get(`${testKey}-3`)).toBe("value-3");
expect(await duplicate3.get(`${testKey}-1`)).toBe("value-1");
duplicate1.close?.();
duplicate2.close?.();
duplicate3.close?.();
redis.close?.();
});
test("should duplicate client that failed to connect", async () => {
// Create client with invalid credentials to force connection failure
const url = new URL(DEFAULT_REDIS_URL);
url.username = "invaliduser";
url.password = "invalidpassword";
const failedRedis = new RedisClient(url.toString());
// Try to connect and expect it to fail
let connectionFailed = false;
try {
await failedRedis.connect();
} catch {
connectionFailed = true;
}
expect(connectionFailed).toBe(true);
expect(failedRedis.connected).toBe(false);
// Duplicate should also remain unconnected
const duplicate = await failedRedis.duplicate();
expect(duplicate.connected).toBe(false);
});
test("should handle duplicate timing with concurrent operations", async () => {
const redis = new RedisClient(DEFAULT_REDIS_URL);
await redis.connect();
// Start some operations on the original client
const testKey = `concurrent-test-${randomUUIDv7().substring(0, 8)}`;
const originalOperation = redis.set(testKey, "original-value");
// Create duplicate while operation is in flight
const duplicate = await redis.duplicate();
// Wait for original operation to complete
await originalOperation;
// Duplicate should be able to read the value
expect(await duplicate.get(testKey)).toBe("original-value");
duplicate.close?.();
redis.close?.();
});
});
});