Compare commits

...

34 Commits

Author SHA1 Message Date
Marko Vejnovic
03165707a9 Callback lifetime 2025-11-17 16:55:37 -08:00
Marko Vejnovic
49d855d357 before claude 2025-11-04 14:53:33 -08:00
Marko Vejnovic
cd2b0b3496 Redo subscribe/unsubscribe to support out-of-band 2025-11-04 14:29:08 -08:00
Marko Vejnovic
d0acbffdbd implement basic unsubscribe 2025-11-03 14:38:14 -08:00
Marko Vejnovic
8f2d8b569f implement subscribe!! 2025-11-03 11:58:27 -08:00
Marko Vejnovic
a32d8a5c4e add subscription api 2025-11-03 10:37:11 -08:00
Marko Vejnovic
046a682ba5 Implement subscribe/unsubscribe 2025-10-28 22:23:48 -07:00
Marko Vejnovic
6f695a6637 Inch closer towards PUB/SUB 2025-10-28 13:55:24 -07:00
Marko Vejnovic
dea79c57af Start working on subscriptions 2025-10-24 22:01:37 -07:00
Marko Vejnovic
a681ad9bda Fix connection errors 2025-10-24 19:57:34 -07:00
Marko Vejnovic
cfdd2dd373 feat(valkey.zig): Add close() support 2025-10-24 14:59:16 -07:00
Marko Vejnovic
15d089d32f Implement onconnect 2025-10-09 12:20:59 -07:00
Marko Vejnovic
cd0a56989a Add support for onconnect 2025-10-08 14:11:32 -07:00
Marko Vejnovic
f8ce752ce9 Some more sadd tests 2025-10-08 12:17:05 -07:00
Marko Vejnovic
feb0765779 Implement bufferedAmount 2025-10-08 11:52:28 -07:00
Marko Vejnovic
9aa2a53be1 Implement expire 2025-10-08 10:49:04 -07:00
Marko Vejnovic
0fe36fff1e Implement smove, spop and spush 2025-10-08 10:38:04 -07:00
Marko Vejnovic
5fe16534ee Implement sadd 2025-10-08 10:27:16 -07:00
Marko Vejnovic
ad2e6039d6 Implement getBuffer 2025-10-08 09:55:37 -07:00
Marko Vejnovic
4f97832dff Fix exists 2025-10-08 09:37:48 -07:00
autofix-ci[bot]
eb55d19839 [autofix.ci] apply automated fixes 2025-10-04 04:18:49 +00:00
Marko Vejnovic
5ed5e1375b Add more commands 2025-10-03 21:16:32 -07:00
Marko Vejnovic
12b15312b0 Implement almost all the methods 2025-10-03 17:00:05 -07:00
Marko Vejnovic
cc2c20a76c Autoflushing 2025-10-03 12:07:35 -07:00
Marko Vejnovic
175dd0a762 95% of auto flushing 2025-10-02 22:25:35 -07:00
Marko Vejnovic
cef06659e0 Fix slow tests 2025-10-02 20:24:06 -07:00
Marko Vejnovic
507b7675dd Implement sending 0-arity commands 2025-10-02 18:59:14 -07:00
Marko Vejnovic
a64dc8da4b Fix connecting 2025-10-02 14:58:30 -07:00
Marko Vejnovic
a6096ccbfc Write some stronger tests 2025-10-01 15:58:21 -07:00
Marko Vejnovic
cb6bb2c6f0 Fix URL parsing 2025-09-30 22:19:30 -07:00
Marko Vejnovic
f7c386bfea Fix use-after-null 2025-09-30 21:57:23 -07:00
Marko Vejnovic
af6b9c02da Cleanup connection error handling 2025-09-30 21:37:21 -07:00
Marko Vejnovic
47918654d6 Get builds passing, for now... 2025-09-30 21:33:11 -07:00
Marko Vejnovic
31baea524e Scaffold Valkey2 2025-09-30 16:35:31 -07:00
30 changed files with 12882 additions and 1 deletions

View File

@@ -3339,6 +3339,9 @@ declare module "bun" {
): Promise<[string, [string, number][]] | null>;
}
// TODO(markovejnovic): Delete
export const RedisClient2 = RedisClient;
/**
* Default Redis client
*

View File

@@ -49,6 +49,7 @@ pub const ResolveMessage = @import("./ResolveMessage.zig").ResolveMessage;
pub const Shell = @import("../shell/shell.zig");
pub const UDPSocket = @import("./api/bun/udp_socket.zig").UDPSocket;
pub const Valkey = @import("../valkey/js_valkey.zig").JSValkeyClient;
pub const Valkey2 = @import("../valkey2/js_valkey.zig").JsValkey;
pub const BlockList = @import("./node/net/BlockList.zig");
pub const NativeZstd = @import("./node/zlib/NativeZstd.zig");

View File

@@ -77,6 +77,7 @@ pub const BunObject = struct {
pub const s3 = toJSLazyPropertyCallback(Bun.getS3DefaultClient);
pub const ValkeyClient = toJSLazyPropertyCallback(Bun.getValkeyClientConstructor);
pub const valkey = toJSLazyPropertyCallback(Bun.getValkeyDefaultClient);
pub const ValkeyClient2 = toJSLazyPropertyCallback(Bun.getValkeyClient2Constructor);
// --- Lazy property callbacks ---
// --- Getters ---
@@ -143,6 +144,7 @@ pub const BunObject = struct {
@export(&BunObject.s3, .{ .name = lazyPropertyCallbackName("s3") });
@export(&BunObject.ValkeyClient, .{ .name = lazyPropertyCallbackName("ValkeyClient") });
@export(&BunObject.valkey, .{ .name = lazyPropertyCallbackName("valkey") });
@export(&BunObject.ValkeyClient2, .{ .name = lazyPropertyCallbackName("ValkeyClient2") });
// --- Lazy property callbacks ---
// --- Callbacks ---
@@ -1323,6 +1325,10 @@ pub fn getValkeyClientConstructor(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObj
return jsc.API.Valkey.js.getConstructor(globalThis);
}
pub fn getValkeyClient2Constructor(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObject) jsc.JSValue {
return jsc.API.Valkey2.js.getConstructor(globalThis);
}
pub fn getEmbeddedFiles(globalThis: *jsc.JSGlobalObject, _: *jsc.JSObject) bun.JSError!jsc.JSValue {
const vm = globalThis.bunVM();
const graph = vm.standalone_module_graph orelse return try jsc.JSValue.createEmptyArray(globalThis, 0);

View File

@@ -0,0 +1,188 @@
import { define } from "../../codegen/class-definitions";
export default [
define({
name: "RedisClient2",
construct: true,
constructNeedsThis: true,
call: false,
finalize: true,
configurable: false,
JSType: "0b11101111",
memoryCost: true,
proto: {
append: { fn: "append" },
bitcount: { fn: "bitcount" },
blmove: { fn: "blmove", length: 5 },
blmpop: { fn: "blmpop", length: 3 },
blpop: { fn: "blpop", length: 2 },
brpop: { fn: "brpop", length: 2 },
brpoplpush: { fn: "brpoplpush", length: 3 },
bufferedAmount: { getter: "getBufferedAmount" },
bzmpop: { fn: "bzmpop", length: 3 },
bzpopmax: { fn: "bzpopmax", length: 2 },
bzpopmin: { fn: "bzpopmin", length: 2 },
close: { fn: "close", length: 0 },
connect: { fn: "connect", length: 0 },
connected: { getter: "getConnected" },
copy: { fn: "copy" },
decr: { fn: "decr", length: 1 },
decrby: { fn: "decrby", length: 2 },
del: { fn: "del", length: 1 },
dump: { fn: "dump" },
//duplicate: { fn: "duplicate" },
exists: { fn: "exists", length: 1 },
expire: { fn: "expire", length: 2 },
expireat: { fn: "expireat", length: 2 },
expiretime: { fn: "expiretime" },
get: { fn: "get", length: 1 },
getBuffer: { fn: "getBuffer", length: 1 },
getbit: { fn: "getbit", length: 2 },
getdel: { fn: "getdel" },
getex: { fn: "getex" },
getrange: { fn: "getrange", length: 3 },
getset: { fn: "getset" },
hdel: { fn: "hdel", length: 2 },
hexists: { fn: "hexists", length: 2 },
hexpire: { fn: "hexpire", length: 3 },
hexpireat: { fn: "hexpireat", length: 3 },
hexpiretime: { fn: "hexpiretime", length: 2 },
hget: { fn: "hget", length: 2 },
hgetall: { fn: "hgetall" },
hgetdel: { fn: "hgetdel", length: 2 },
hgetex: { fn: "hgetex", length: 2 },
hincrby: { fn: "hincrby", length: 3 },
hincrbyfloat: { fn: "hincrbyfloat", length: 3 },
hkeys: { fn: "hkeys" },
hlen: { fn: "hlen" },
hmget: { fn: "hmget", length: 2 },
hmset: { fn: "hmset", length: 2 },
hpersist: { fn: "hpersist", length: 2 },
hpexpire: { fn: "hpexpire", length: 3 },
hpexpireat: { fn: "hpexpireat", length: 3 },
hpexpiretime: { fn: "hpexpiretime", length: 2 },
hpttl: { fn: "hpttl", length: 2 },
hrandfield: { fn: "hrandfield", length: 1 },
hscan: { fn: "hscan", length: 2 },
hset: { fn: "hset", length: 2 },
hsetex: { fn: "hsetex", length: 3 },
hsetnx: { fn: "hsetnx", length: 3 },
hstrlen: { fn: "hstrlen" },
httl: { fn: "httl", length: 2 },
hvals: { fn: "hvals" },
incr: { fn: "incr", length: 1 },
incrby: { fn: "incrby", length: 2 },
incrbyfloat: { fn: "incrbyfloat", length: 2 },
keys: { fn: "keys" },
lindex: { fn: "lindex", length: 2 },
linsert: { fn: "linsert", length: 4 },
llen: { fn: "llen" },
lmove: { fn: "lmove", length: 4 },
lmpop: { fn: "lmpop", length: 2 },
lpop: { fn: "lpop" },
lpos: { fn: "lpos", length: 2 },
lpush: { fn: "lpush" },
lpushx: { fn: "lpushx" },
lrange: { fn: "lrange", length: 3 },
lrem: { fn: "lrem", length: 3 },
lset: { fn: "lset", length: 3 },
ltrim: { fn: "ltrim", length: 3 },
mget: { fn: "mget" },
mset: { fn: "mset" },
msetnx: { fn: "msetnx" },
//onclose: { getter: "getOnClose", setter: "setOnClose", this: true, cache: true },
onconnect: { getter: "getOnConnect", setter: "setOnConnect", this: true, cache: true },
persist: { fn: "persist" },
pexpire: { fn: "pexpire", length: 2 },
pexpireat: { fn: "pexpireat", length: 2 },
pexpiretime: { fn: "pexpiretime" },
pfadd: { fn: "pfadd" },
ping: { fn: "ping" },
psetex: { fn: "psetex", length: 3 },
//psubscribe: { fn: "psubscribe" },
pttl: { fn: "pttl" },
publish: { fn: "publish" },
pubsub: { fn: "pubsub" },
//punsubscribe: { fn: "punsubscribe" },
randomkey: { fn: "randomkey", length: 0 },
rename: { fn: "rename", length: 2 },
renamenx: { fn: "renamenx", length: 2 },
rpop: { fn: "rpop" },
rpoplpush: { fn: "rpoplpush", length: 2 },
rpush: { fn: "rpush" },
rpushx: { fn: "rpushx" },
scan: { fn: "scan" },
scard: { fn: "scard" },
script: { fn: "script" },
sdiff: { fn: "sdiff", length: 1 },
sdiffstore: { fn: "sdiffstore", length: 2 },
select: { fn: "select" },
send: { fn: "send", length: 2 },
set: { fn: "set", length: 2 },
setbit: { fn: "setbit", length: 3 },
setex: { fn: "setex", length: 3 },
setnx: { fn: "setnx" },
setrange: { fn: "setrange", length: 3 },
sinter: { fn: "sinter", length: 1 },
sintercard: { fn: "sintercard", length: 1 },
sinterstore: { fn: "sinterstore", length: 2 },
sadd: { fn: "sadd", length: 2 },
sismember: { fn: "sismember", length: 2 },
smembers: { fn: "smembers", length: 1 },
smismember: { fn: "smismember", length: 2 },
smove: { fn: "smove" },
spop: { fn: "spop", length: 1 },
spublish: { fn: "spublish" },
srandmember: { fn: "srandmember", length: 1 },
srem: { fn: "srem", length: 2 },
sscan: { fn: "sscan", length: 2 },
strlen: { fn: "strlen" },
subscribe: { fn: "subscribe" },
substr: { fn: "substr" },
sunion: { fn: "sunion", length: 1 },
sunionstore: { fn: "sunionstore", length: 2 },
touch: { fn: "touch" },
ttl: { fn: "ttl", length: 1 },
type: { fn: "type", length: 1 },
unlink: { fn: "unlink" },
unsubscribe: { fn: "unsubscribe" },
zadd: { fn: "zadd", length: 3 },
zcard: { fn: "zcard" },
zcount: { fn: "zcount", length: 3 },
zdiff: { fn: "zdiff", length: 1 },
zdiffstore: { fn: "zdiffstore", length: 2 },
zincrby: { fn: "zincrby", length: 3 },
zinter: { fn: "zinter", length: 2 },
zintercard: { fn: "zintercard", length: 1 },
zinterstore: { fn: "zinterstore", length: 3 },
zlexcount: { fn: "zlexcount", length: 3 },
zmpop: { fn: "zmpop", length: 2 },
zmscore: { fn: "zmscore" },
zpopmax: { fn: "zpopmax" },
zpopmin: { fn: "zpopmin" },
zrandmember: { fn: "zrandmember" },
zrange: { fn: "zrange", length: 3 },
zrangebylex: { fn: "zrangebylex", length: 3 },
zrangebyscore: { fn: "zrangebyscore", length: 3 },
zrangestore: { fn: "zrangestore", length: 4 },
zrank: { fn: "zrank" },
zrem: { fn: "zrem", length: 2 },
zremrangebylex: { fn: "zremrangebylex", length: 3 },
zremrangebyrank: { fn: "zremrangebyrank", length: 3 },
zremrangebyscore: { fn: "zremrangebyscore", length: 3 },
zrevrange: { fn: "zrevrange", length: 3 },
zrevrangebylex: { fn: "zrevrangebylex", length: 3 },
zrevrangebyscore: { fn: "zrevrangebyscore", length: 3 },
zrevrank: { fn: "zrevrank" },
zscan: { fn: "zscan", length: 2 },
zscore: { fn: "zscore" },
zunion: { fn: "zunion", length: 2 },
zunionstore: { fn: "zunionstore", length: 3 },
},
//values: ["onclose", "hello", "subscriptionCallbackMap"],
values: [
"connectionPromise", // Used to track the progress of the connection. Not exposed to JS.
"pushCallbacks", // Set of subscription handlers to keep them alive for GC
],
}),
];

View File

@@ -21,6 +21,7 @@
macro(YAML) \
macro(Transpiler) \
macro(ValkeyClient) \
macro(ValkeyClient2) \
macro(argv) \
macro(assetPrefix) \
macro(cwd) \

View File

@@ -800,6 +800,7 @@ JSC_DEFINE_HOST_FUNCTION(functionFileURLToPath, (JSC::JSGlobalObject * globalObj
version constructBunVersion ReadOnly|DontDelete|PropertyCallback
which BunObject_callback_which DontDelete|Function 1
RedisClient BunObject_lazyPropCb_wrap_ValkeyClient DontDelete|PropertyCallback
RedisClient2 BunObject_lazyPropCb_wrap_ValkeyClient2 DontDelete|PropertyCallback
redis BunObject_lazyPropCb_wrap_valkey DontDelete|PropertyCallback
secrets constructSecretsObject DontDelete|PropertyCallback
write BunObject_callback_write DontDelete|Function 1

View File

@@ -0,0 +1,35 @@
/// Opaque type for working with JavaScript `Set` objects.
pub const JSSet = opaque {
pub const create = bun.cpp.JSC__JSSet__create;
/// Add a value to this JS Set object.
pub const add = bun.cpp.JSC__JSSet__add;
/// Test whether this JS Set object has a given value.
pub const has = bun.cpp.JSC__JSSet__has;
/// Attempt to remove a value from this JS Set object.
pub const remove = bun.cpp.JSC__JSSet__remove;
/// Clear all entries from this JS Set object.
pub const clear = bun.cpp.JSC__JSSet__clear;
/// Retrieve the number of entries in this JS Set object.
pub const size = bun.cpp.JSC__JSSet__size;
/// Attempt to convert a `JSValue` to a `*JSSet`.
///
/// Returns `null` if the value is not a Set.
pub fn fromJS(value: JSValue) ?*JSSet {
if (value.jsTypeLoose() == .Set) {
return bun.cast(*JSSet, value.asEncoded().asPtr.?);
}
return null;
}
};
const bun = @import("bun");
const jsc = bun.jsc;
const JSValue = jsc.JSValue;

View File

@@ -54,6 +54,8 @@
#include "JavaScriptCore/JSInternalPromise.h"
#include "JavaScriptCore/JSMap.h"
#include "JavaScriptCore/JSMapIterator.h"
#include "JavaScriptCore/JSSet.h"
#include "JavaScriptCore/JSSetIterator.h"
#include "JavaScriptCore/JSModuleLoader.h"
#include "JavaScriptCore/JSModuleRecord.h"
#include "JavaScriptCore/JSNativeStdFunction.h"
@@ -129,6 +131,7 @@
#include "JSDOMFormData.h"
#include "ZigGeneratedClasses.h"
#include "JavaScriptCore/JSMapInlines.h"
#include "JavaScriptCore/JSSetInlines.h"
#include <JavaScriptCore/JSWeakMap.h>
#include "JSURLSearchParams.h"
@@ -6556,6 +6559,38 @@ CPP_DECL [[ZIG_EXPORT(check_slow)]] uint32_t JSC__JSMap__size(JSC::JSMap* map, J
return map->size();
}
CPP_DECL [[ZIG_EXPORT(nothrow)]] JSC::EncodedJSValue JSC__JSSet__create(JSC::JSGlobalObject* arg0)
{
return JSC::JSValue::encode(JSC::JSSet::create(arg0->vm(), arg0->setStructure()));
}
CPP_DECL [[ZIG_EXPORT(check_slow)]] void JSC__JSSet__add(JSC::JSSet* set, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2)
{
set->add(arg1, JSC::JSValue::decode(JSValue2));
}
CPP_DECL [[ZIG_EXPORT(check_slow)]] bool JSC__JSSet__has(JSC::JSSet* set, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2)
{
const JSC::JSValue value = JSC::JSValue::decode(JSValue2);
return set->has(arg1, value);
}
CPP_DECL [[ZIG_EXPORT(check_slow)]] bool JSC__JSSet__remove(JSC::JSSet* set, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2)
{
const JSC::JSValue value = JSC::JSValue::decode(JSValue2);
return set->remove(arg1, value);
}
CPP_DECL [[ZIG_EXPORT(check_slow)]] void JSC__JSSet__clear(JSC::JSSet* set, JSC::JSGlobalObject* arg1)
{
set->clear(arg1);
}
CPP_DECL [[ZIG_EXPORT(check_slow)]] uint32_t JSC__JSSet__size(JSC::JSSet* set, JSC::JSGlobalObject* arg1)
{
return set->size();
}
CPP_DECL void JSC__VM__setControlFlowProfiler(JSC::VM* vm, bool isEnabled)
{
if (isEnabled) {

View File

@@ -86,6 +86,7 @@ pub const Classes = struct {
pub const ResumableS3UploadSink = webcore.ResumableS3UploadSink;
pub const HTMLBundle = api.HTMLBundle;
pub const RedisClient = api.Valkey;
pub const RedisClient2 = api.Valkey2;
pub const BlockList = api.BlockList;
pub const NativeZstd = api.NativeZstd;
pub const SourceMap = bun.sourcemap.JSSourceMap;

View File

@@ -192,6 +192,15 @@ CPP_DECL bool JSC__JSMap__remove(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JS
CPP_DECL void JSC__JSMap__set(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2, JSC::EncodedJSValue JSValue3);
CPP_DECL uint32_t JSC__JSMap__size(JSC::JSMap* arg0, JSC::JSGlobalObject* arg1);
#pragma mark - JSC::JSSet
CPP_DECL JSC::EncodedJSValue JSC__JSSet__create(JSC::JSGlobalObject* arg0);
CPP_DECL void JSC__JSSet__add(JSC::JSSet* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL bool JSC__JSSet__has(JSC::JSSet* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL bool JSC__JSSet__remove(JSC::JSSet* arg0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2);
CPP_DECL void JSC__JSSet__clear(JSC::JSSet* arg0, JSC::JSGlobalObject* arg1);
CPP_DECL uint32_t JSC__JSSet__size(JSC::JSSet* arg0, JSC::JSGlobalObject* arg1);
#pragma mark - JSC::JSValue
CPP_DECL void JSC__JSValue__then(JSC::EncodedJSValue JSValue0, JSC::JSGlobalObject* arg1, JSC::EncodedJSValue JSValue2, SYSV_ABI JSC::EncodedJSValue(* ArgFn3)(JSC::JSGlobalObject* arg0, JSC::CallFrame* arg1), SYSV_ABI JSC::EncodedJSValue(* ArgFn4)(JSC::JSGlobalObject* arg0, JSC::CallFrame* arg1));

View File

@@ -110,6 +110,17 @@ pub fn pipeReadBuffer(this: *const EventLoop) []u8 {
return this.virtual_machine.rareData().pipeReadBuffer();
}
pub inline fn rejectPromise(
this: *EventLoop,
promise: *jsc.JSPromise,
global_object: *jsc.JSGlobalObject,
err_value: jsc.JSValue,
) void {
this.enter();
defer this.exit();
promise.reject(global_object, err_value);
}
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
const log = bun.Output.scoped(.EventLoop, .hidden);

View File

@@ -61,6 +61,7 @@ pub const JSFunction = @import("./bindings/JSFunction.zig").JSFunction;
pub const JSGlobalObject = @import("./bindings/JSGlobalObject.zig").JSGlobalObject;
pub const JSInternalPromise = @import("./bindings/JSInternalPromise.zig").JSInternalPromise;
pub const JSMap = @import("./bindings/JSMap.zig").JSMap;
pub const JSSet = @import("./bindings/JSSet.zig").JSSet;
pub const JSModuleLoader = @import("./bindings/JSModuleLoader.zig").JSModuleLoader;
pub const JSObject = @import("./bindings/JSObject.zig").JSObject;
pub const JSPromise = @import("./bindings/JSPromise.zig").JSPromise;

View File

@@ -52,6 +52,7 @@ export const sharedTypes: Record<string, string> = {
"ZigString": "bun.jsc.ZigString",
"JSC::JSPromise": "bun.jsc.JSPromise",
"JSC::JSMap": "bun.jsc.JSMap",
"JSC::JSSet": "bun.jsc.JSSet",
"JSC::CustomGetterSetter": "bun.jsc.CustomGetterSetter",
"JSC::SourceProvider": "bun.jsc.SourceProvider",
"JSC::CallFrame": "bun.jsc.CallFrame",

View File

@@ -662,6 +662,11 @@ pub const OffsetByteList = struct {
self.deinit(allocator);
self.* = .{};
}
/// Non-automatic memory usage of this object.
pub fn memoryCost(self: *const Self) usize {
return self.byte_list.memoryCost();
}
};
pub const safety_checks = Environment.ci_assert;

View File

@@ -152,7 +152,7 @@ pub fn LinearFifo(
}
/// Returns a writable slice from the 'read' end of the fifo
fn readableSliceMut(self: SliceSelfArg, offset: usize) []T {
pub fn readableSliceMut(self: SliceSelfArg, offset: usize) []T {
if (offset > self.count) return &[_]T{};
var start = self.head + offset;

View File

@@ -1151,6 +1151,13 @@ pub inline fn debugWarn(comptime fmt: []const u8, args: anytype) void {
}
}
/// Panic the application with a debug message, only in debug mode
pub inline fn debugPanic(comptime fmt: []const u8, args: anytype) void {
if (bun.Environment.isDebug) {
panic("<red>debug panic<r><d>:<r> " ++ fmt, args);
}
}
/// Print a red error message. The first argument takes an `error_name` value, which can be either
/// be a Zig error, or a string or enum. The error name is converted to a string and displayed
/// in place of "error:", making it useful to print things like "EACCES: Couldn't open package.json"

View File

@@ -23,6 +23,12 @@ pub fn count(this: *StringBuilder, slice: []const u8) void {
this.cap += slice.len;
}
pub fn countMany(this: *StringBuilder, slices: anytype) void {
inline for (slices) |slice| {
this.count(slice);
}
}
pub fn allocate(this: *StringBuilder, allocator: Allocator) Allocator.Error!void {
const slice = try allocator.alloc(u8, this.cap);
this.ptr = slice.ptr;
@@ -102,6 +108,32 @@ pub fn append(this: *StringBuilder, slice: []const u8) []const u8 {
return result;
}
/// Append many slices at once, returning an array of slices.
pub fn appendMany(
this: *StringBuilder,
slices: anytype,
) [slices.len][]const u8 {
var result: [slices.len][]const u8 = undefined;
inline for (slices, 0..) |slice, idx| {
result[idx] = this.append(slice);
}
return result;
}
/// Measure many slices, allocate and then append them all at once.
///
/// This is more efficient than appending them one by one due to the fact that
/// it allocates once.
pub fn measureAllocateAppend(
this: *StringBuilder,
allocator: std.mem.Allocator,
slices: anytype,
) ![slices.len][]const u8 {
this.countMany(slices);
try this.allocate(allocator);
return this.appendMany(slices);
}
pub fn addConcat(this: *StringBuilder, slices: []const []const u8) bun.StringPointer {
var remain = this.allocatedSlice()[this.len..];
var len: usize = 0;

View File

@@ -21,6 +21,14 @@ pub fn containsChar(self: string, char: u8) callconv(bun.callconv_inline) bool {
return indexOfChar(self, char) != null;
}
pub fn intToStr(
integer: anytype,
) [std.fmt.count("{d}", .{std.math.maxInt(i32)})]u8 {
var buf: [std.fmt.count("{d}", .{std.math.maxInt(i32)})]u8 = undefined;
std.fmt.format(buf[0..], "{d}", .{integer}) catch unreachable;
return buf;
}
pub fn containsCharT(comptime T: type, self: []const T, char: u8) callconv(bun.callconv_inline) bool {
return switch (T) {
u8 => containsChar(self, char),

View File

@@ -315,6 +315,7 @@ pub const JSValkeyClient = struct {
else
valkey.Options{};
// TODO(markovejnovic): This feels like it is leaked?
var connection_strings: []u8 = &.{};
errdefer {
this_allocator.free(connection_strings);
@@ -1152,6 +1153,8 @@ pub const JSValkeyClient = struct {
pub fn run(self: *@This()) void {
defer bun.default_allocator.destroy(self);
// TODO(markovejnovic): This true seems like a bug -- shouldn't
// we only do this in the SSL case?
self.ctx.deinit(true);
}
};

View File

@@ -1411,6 +1411,7 @@ const compile = struct {
}
};
}
pub fn @"(key: RedisKey, value: RedisValue)"(
comptime name: []const u8,
comptime command: []const u8,

View File

@@ -1,3 +1,4 @@
// TODO(markovejnovic): Remove JS from this file. Why oh why is this not decoupled?
pub const RedisError = error{
AuthenticationFailed,
ConnectionClosed,

295
src/valkey2/command.zig Normal file
View File

@@ -0,0 +1,295 @@
pub const CommandDescriptor = enum {
APPEND,
BITCOUNT,
HMSET,
HMGET,
BLMOVE,
BLMPOP,
HINCRBY,
BLPOP,
BRPOP,
BZMPOP,
BZPOPMAX,
BZPOPMIN,
COPY,
DECR,
DECRBY,
DEL,
DUMP,
EXISTS,
EXPIRE,
EXPIREAT,
EXPIRETIME,
GET,
GETBIT,
GETDEL,
GETEX,
GETSET,
HDEL,
HELLO,
HGET,
HGETALL,
HINCRBYFLOAT,
HKEYS,
HLEN,
HRANDFIELD,
HSCAN,
HSTRLEN,
HVALS,
INCR,
INCRBY,
INCRBYFLOAT,
KEYS,
LINDEX,
LINSERT,
LLEN,
LMOVE,
LMPOP,
LPOP,
LPOS,
LPUSH,
LPUSHX,
MGET,
MSET,
MSETNX,
PERSIST,
PEXPIRE,
PEXPIREAT,
PEXPIRETIME,
RPUSH,
RPUSHX,
ZMSCORE,
ZREMRANGEBYLEX,
ZREMRANGEBYRANK,
ZREMRANGEBYSCORE,
PFADD,
PING,
PTTL,
PUBLISH,
PUBSUB,
RANDOMKEY,
RENAME,
RENAMENX,
RPOP,
RPOPLPUSH,
SCAN,
SCARD,
SCRIPT,
SDIFF,
SDIFFSTORE,
SELECT,
SET,
SETNX,
SINTER,
SINTERCARD,
SINTERSTORE,
SMISMEMBER,
SPUBLISH,
SSCAN,
STRLEN,
SUBSCRIBE,
SUNION,
SUNIONSTORE,
TOUCH,
TYPE,
UNLINK,
/// Do not directly use UNSUBSCRIBE -- use the Pub/Sub API instead.
UNSUBSCRIBE,
ZADD,
ZCARD,
ZDIFF,
ZDIFFSTORE,
ZINTER,
ZINTERCARD,
ZINTERSTORE,
ZMPOP,
ZPOPMAX,
ZPOPMIN,
ZRANDMEMBER,
ZRANGE,
ZRANGEBYLEX,
ZRANGEBYSCORE,
ZRANGESTORE,
ZRANK,
ZREM,
ZREVRANGE,
ZREVRANGEBYLEX,
ZREVRANGEBYSCORE,
ZREVRANK,
ZSCAN,
ZSCORE,
ZUNION,
ZUNIONSTORE,
BRPOPLPUSH,
TTL,
SREM,
SETBIT,
GETRANGE,
SETRANGE,
LRANGE,
LREM,
LSET,
LTRIM,
SMEMBERS,
ZCOUNT,
ZLEXCOUNT,
SETEX,
PSETEX,
ZINCRBY,
SUBSTR,
SISMEMBER,
HTTL,
SMOVE,
SADD,
SPOP,
SRANDMEMBER,
HSETNX,
HPTTL,
HGETDEL,
HGETEX,
HSETEX,
HSET,
HEXPIRE,
HEXPIREAT,
HEXPIRETIME,
HPERSIST,
HPEXPIRE,
HPEXPIREAT,
HPEXPIRETIME,
// TODO(markovejnovic): Test this better
HEXISTS,
pub fn returnsBool(self: CommandDescriptor) bool {
return switch (self) {
.EXISTS, .SISMEMBER, .HSETNX, .HEXISTS, .SMOVE => true,
else => false,
};
}
pub fn toString(self: CommandDescriptor) []const u8 {
return @tagName(self);
}
/// Whether this command can be pipelined or not.
///
/// This is pretty important as some commands cannot really be pipelined and require flushing
/// any pending commands before executing them.
pub fn canBePipelined(self: CommandDescriptor) bool {
return switch (self) {
.HELLO => false,
else => true,
};
}
};
pub const Command = struct {
const Self = @This();
command: union(enum) {
inline_str: []const u8,
command_id: CommandDescriptor,
/// Test whether this command is one which returns a boolean value.
pub fn returnsBool(self: @This()) bool {
return switch (self) {
.inline_str => |_| false,
.command_id => |id| id.returnsBool(),
};
}
pub fn toString(self: @This()) []const u8 {
return switch (self) {
.inline_str => |s| s,
.command_id => |id| id.toString(),
};
}
},
args: CommandArgs,
pub fn initDirect(command: []const u8, args: CommandArgs) Self {
return Self{ .command = .{ .inline_str = command }, .args = args };
}
pub fn initById(command: CommandDescriptor, args: CommandArgs) Self {
return Self{ .command = .{ .command_id = command }, .args = args };
}
pub fn serialize(self: *const Self, allocator: std.mem.Allocator) ![]u8 {
var buf = try std.ArrayList(u8).initCapacity(allocator, self.byteLength());
errdefer buf.deinit();
try self.write(buf.writer());
return buf.items;
}
pub fn format(
this: Self,
comptime _: []const u8,
_: std.fmt.FormatOptions,
writer: anytype,
) !void {
try this.write(writer);
}
pub fn byteLength(self: *const Self) usize {
return std.fmt.count("{}", .{self.*});
}
pub fn canBePipelined(self: *const Self) bool {
return switch (self.command) {
// TODO(markovejnovic): This doesn't make too much sense to me since we don't know what
// the command is. Maybe we should assume the worst and say it can't be pipelined?
// However, this was the legacy behavior so I decided not to change it for now.
.inline_str => |_| return true,
.command_id => |id| return id.canBePipelined(),
};
}
pub fn returnsBool(self: *const Self) bool {
return self.command.returnsBool();
}
/// Write the command in RESP format to the given writer
pub fn write(this: *const Self, writer: anytype) !void {
try writer.print("*{d}\r\n", .{1 + this.args.len()});
try writer.print("${d}\r\n{s}\r\n", .{
this.command.toString().len,
this.command.toString(),
});
switch (this.args) {
inline .slices, .args => |args| {
for (args) |*arg| {
const slice = arg.slice();
try writer.print("${d}\r\n", .{arg.byteLength()});
try writer.writeAll(slice);
try writer.writeAll("\r\n");
}
},
.raw => |args| {
for (args) |arg| {
try writer.print("${d}\r\n", .{arg.len});
try writer.writeAll(arg);
try writer.writeAll("\r\n");
}
},
}
}
};
pub const CommandArgs = union(enum) {
slices: []const bun.jsc.ZigString.Slice,
args: []const bun.api.node.BlobOrStringOrBuffer,
raw: []const []const u8,
pub fn len(this: *const @This()) usize {
return switch (this.*) {
inline .slices, .args, .raw => |args| args.len,
};
}
};
const bun = @import("bun");
const std = @import("std");

1642
src/valkey2/js_valkey.zig Normal file

File diff suppressed because it is too large Load Diff

714
src/valkey2/protocol.zig Normal file
View File

@@ -0,0 +1,714 @@
//! Represents the RESP protocol used by Valkey and Redis.
//!
//! Note that this implementation has been blindly ported from the legacy
//! Valkey implementation and may not be optimal, idiomatic or correct.
//!
//! TODO(markovejnovic): This code should not need to rely on JS objects.
pub const RedisError = error{
AuthenticationFailed,
ConnectionClosed,
InvalidArgument,
InvalidArray,
InvalidAttribute,
InvalidBigNumber,
InvalidBlobError,
InvalidBoolean,
InvalidBulkString,
InvalidCommand,
InvalidDouble,
InvalidErrorString,
InvalidInteger,
InvalidMap,
InvalidNull,
InvalidPush,
InvalidResponse,
InvalidResponseType,
InvalidSet,
InvalidSimpleString,
InvalidVerbatimString,
JSError,
OutOfMemory,
UnsupportedProtocol,
ConnectionTimeout,
IdleTimeout,
};
pub fn valkeyErrorToJS(
globalObject: *jsc.JSGlobalObject,
err: RedisError,
comptime msg_fmt: ?[:0]const u8,
msg_args: anytype,
) jsc.JSValue {
const error_code: jsc.Error = switch (err) {
error.ConnectionClosed => .REDIS_CONNECTION_CLOSED,
error.InvalidResponse => .REDIS_INVALID_RESPONSE,
error.InvalidBulkString => .REDIS_INVALID_BULK_STRING,
error.InvalidArray => .REDIS_INVALID_ARRAY,
error.InvalidInteger => .REDIS_INVALID_INTEGER,
error.InvalidSimpleString => .REDIS_INVALID_SIMPLE_STRING,
error.InvalidErrorString => .REDIS_INVALID_ERROR_STRING,
error.InvalidDouble,
error.InvalidBoolean,
error.InvalidNull,
error.InvalidMap,
error.InvalidSet,
error.InvalidBigNumber,
error.InvalidVerbatimString,
error.InvalidBlobError,
error.InvalidAttribute,
error.InvalidPush,
=> .REDIS_INVALID_RESPONSE,
error.AuthenticationFailed => .REDIS_AUTHENTICATION_FAILED,
error.InvalidCommand => .REDIS_INVALID_COMMAND,
error.InvalidArgument => .REDIS_INVALID_ARGUMENT,
error.UnsupportedProtocol => .REDIS_INVALID_RESPONSE,
error.InvalidResponseType => .REDIS_INVALID_RESPONSE_TYPE,
error.ConnectionTimeout => .REDIS_CONNECTION_TIMEOUT,
error.IdleTimeout => .REDIS_IDLE_TIMEOUT,
error.JSError => return globalObject.takeException(error.JSError),
error.OutOfMemory => globalObject.throwOutOfMemory() catch
return globalObject.takeException(error.JSError),
};
return error_code.fmt(
globalObject,
if (msg_fmt) |f| f else "Valkey error: {s}",
if (msg_fmt != null) msg_args else .{@errorName(err)},
);
}
// RESP protocol types
pub const RESPType = enum(u8) {
// RESP2 types
SimpleString = '+',
Error = '-',
Integer = ':',
BulkString = '$',
Array = '*',
// RESP3 types
Null = '_',
Double = ',',
Boolean = '#',
BlobError = '!',
VerbatimString = '=',
Map = '%',
Set = '~',
Attribute = '|',
Push = '>',
BigNumber = '(',
pub fn fromByte(byte: u8) ?RESPType {
return switch (byte) {
@intFromEnum(RESPType.SimpleString) => .SimpleString,
@intFromEnum(RESPType.Error) => .Error,
@intFromEnum(RESPType.Integer) => .Integer,
@intFromEnum(RESPType.BulkString) => .BulkString,
@intFromEnum(RESPType.Array) => .Array,
@intFromEnum(RESPType.Null) => .Null,
@intFromEnum(RESPType.Double) => .Double,
@intFromEnum(RESPType.Boolean) => .Boolean,
@intFromEnum(RESPType.BlobError) => .BlobError,
@intFromEnum(RESPType.VerbatimString) => .VerbatimString,
@intFromEnum(RESPType.Map) => .Map,
@intFromEnum(RESPType.Set) => .Set,
@intFromEnum(RESPType.Attribute) => .Attribute,
@intFromEnum(RESPType.Push) => .Push,
@intFromEnum(RESPType.BigNumber) => .BigNumber,
else => null,
};
}
};
pub const RESPValue = union(RESPType) {
// RESP2 types
SimpleString: []const u8,
Error: []const u8,
Integer: i64,
BulkString: ?[]const u8,
Array: []RESPValue,
// RESP3 types
Null: void,
Double: f64,
Boolean: bool,
BlobError: []const u8,
VerbatimString: VerbatimString,
Map: []MapEntry,
Set: []RESPValue,
Attribute: Attribute,
Push: Push,
BigNumber: []const u8,
pub fn deinit(self: *RESPValue, allocator: std.mem.Allocator) void {
switch (self.*) {
.SimpleString => |str| allocator.free(str),
.Error => |str| allocator.free(str),
.Integer => {},
.BulkString => |maybe_str| if (maybe_str) |str| allocator.free(str),
.Array => |array| {
for (array) |*value| {
value.deinit(allocator);
}
allocator.free(array);
},
.Null => {},
.Double => {},
.Boolean => {},
.BlobError => |str| allocator.free(str),
.VerbatimString => |*verbatim| {
allocator.free(verbatim.format);
allocator.free(verbatim.content);
},
.Map => |entries| {
for (entries) |*entry| {
entry.deinit(allocator);
}
allocator.free(entries);
},
.Set => |set| {
for (set) |*value| {
value.deinit(allocator);
}
allocator.free(set);
},
.Attribute => |*attribute| {
attribute.deinit(allocator);
},
.Push => |*push| {
push.deinit(allocator);
},
.BigNumber => |str| allocator.free(str),
}
}
pub fn format(self: @This(), comptime _: []const u8, options: anytype, writer: anytype) !void {
switch (self) {
.SimpleString => |str| try writer.writeAll(str),
.Error => |str| try writer.writeAll(str),
.Integer => |int| try writer.print("{d}", .{int}),
.BulkString => |maybe_str| {
try writer.writeAll(if (maybe_str) |str| str else "(nil)");
},
.Array => |array| {
try writer.writeAll("[");
for (array, 0..) |value, i| {
if (i > 0) try writer.writeAll(", ");
try value.format("", options, writer);
}
try writer.writeAll("]");
},
.Null => try writer.writeAll("(nil)"),
.Double => |d| try writer.print("{d}", .{d}),
.Boolean => |b| try writer.print("{}", .{b}),
.BlobError => |str| try writer.print("Error: {s}", .{str}),
.VerbatimString => |verbatim| try writer.print("{s}:{s}", .{
verbatim.format,
verbatim.content,
}),
.Map => |entries| {
try writer.writeAll("{");
for (entries, 0..) |entry, i| {
if (i > 0) try writer.writeAll(", ");
try entry.key.format("", options, writer);
try writer.writeAll(": ");
try entry.value.format("", options, writer);
}
try writer.writeAll("}");
},
.Set => |set| {
try writer.writeAll("Set{");
for (set, 0..) |value, i| {
if (i > 0) try writer.writeAll(", ");
try value.format("", options, writer);
}
try writer.writeAll("}");
},
.Attribute => |attribute| {
try writer.writeAll("(Attr: ");
try writer.writeAll("{");
for (attribute.attributes, 0..) |entry, i| {
if (i > 0) try writer.writeAll(", ");
try entry.key.format("", options, writer);
try writer.writeAll(": ");
try entry.value.format("", options, writer);
}
try writer.writeAll("} => ");
try attribute.value.format("", options, writer);
try writer.writeAll(")");
},
.Push => |push| {
try writer.print("Push({s}: [", .{push.kind});
for (push.data, 0..) |value, i| {
if (i > 0) try writer.writeAll(", ");
try value.format("", options, writer);
}
try writer.writeAll("])");
},
.BigNumber => |str| try writer.print("BigNumber({s})", .{str}),
}
}
pub fn toJS(
self: *const RESPValue,
globalObject: *jsc.JSGlobalObject,
) bun.JSError!jsc.JSValue {
return self.toJSWithOptions(globalObject, .{});
}
pub const ToJSOptions = struct {
return_as_buffer: bool = false,
};
fn valkeyStrToJSValue(
globalObject: *jsc.JSGlobalObject,
str: []const u8,
options: *const ToJSOptions,
) bun.JSError!jsc.JSValue {
if (options.return_as_buffer) {
// TODO: handle values > 4.7 GB
return try jsc.ArrayBuffer.createBuffer(globalObject, str);
} else {
return bun.String.createUTF8ForJS(globalObject, str);
}
}
pub fn toJSWithOptions(
self: *const RESPValue,
globalObject: *jsc.JSGlobalObject,
options: ToJSOptions,
) bun.JSError!jsc.JSValue {
switch (self.*) {
.SimpleString => |str| return valkeyStrToJSValue(globalObject, str, &options),
.Error => |str| return jsc.Error.REDIS_INVALID_RESPONSE.fmt(
globalObject,
"{s}",
.{str},
),
.Integer => |int| return jsc.JSValue.jsNumber(int),
.BulkString => |maybe_str| {
if (maybe_str) |str| {
return valkeyStrToJSValue(globalObject, str, &options);
} else {
return jsc.JSValue.jsNull();
}
},
.Array => |array| {
var js_array = try jsc.JSValue.createEmptyArray(globalObject, array.len);
for (array, 0..) |*item, i| {
const js_item = try item.toJSWithOptions(globalObject, options);
try js_array.putIndex(globalObject, @intCast(i), js_item);
}
return js_array;
},
.Null => return jsc.JSValue.jsNull(),
.Double => |d| return jsc.JSValue.jsNumber(d),
.Boolean => |b| return jsc.JSValue.jsBoolean(b),
.BlobError => |str| return jsc.Error.REDIS_INVALID_RESPONSE.fmt(
globalObject,
"{s}",
.{str},
),
.VerbatimString => |verbatim| return valkeyStrToJSValue(
globalObject,
verbatim.content,
&options,
),
.Map => |entries| {
var js_obj = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
for (entries) |*entry| {
const js_key = try entry.key.toJSWithOptions(globalObject, .{});
var key_str = try js_key.toBunString(globalObject);
defer key_str.deref();
const js_value = try entry.value.toJSWithOptions(globalObject, options);
try js_obj.putMayBeIndex(globalObject, &key_str, js_value);
}
return js_obj;
},
.Set => |set| {
var js_array = try jsc.JSValue.createEmptyArray(globalObject, set.len);
for (set, 0..) |*item, i| {
const js_item = try item.toJSWithOptions(globalObject, options);
try js_array.putIndex(globalObject, @intCast(i), js_item);
}
return js_array;
},
.Attribute => |attribute| {
// For now, we just return the value and ignore attributes
// In the future, we could attach the attributes as a hidden property
return try attribute.value.toJSWithOptions(globalObject, options);
},
.Push => |push| {
var js_obj = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
// Add the push type
const kind_str = try bun.String.createUTF8ForJS(globalObject, push.kind);
js_obj.put(globalObject, "type", kind_str);
// Add the data as an array
var data_array = try jsc.JSValue.createEmptyArray(globalObject, push.data.len);
for (push.data, 0..) |*item, i| {
const js_item = try item.toJSWithOptions(globalObject, options);
try data_array.putIndex(globalObject, @intCast(i), js_item);
}
js_obj.put(globalObject, "data", data_array);
return js_obj;
},
.BigNumber => |str| {
// Try to parse as number if possible
return if (std.fmt.parseInt(i64, str, 10)) |int|
jsc.JSValue.jsNumber(int)
else |_|
// If it doesn't fit in an i64, return as string
return bun.String.createUTF8ForJS(globalObject, str);
},
}
}
};
pub const ValkeyReader = struct {
buffer: []const u8,
pos: usize = 0,
pub fn init(buffer: []const u8) ValkeyReader {
return .{
.buffer = buffer,
};
}
pub fn readByte(self: *ValkeyReader) RedisError!u8 {
if (self.pos >= self.buffer.len) return error.InvalidResponse;
const byte = self.buffer[self.pos];
self.pos += 1;
return byte;
}
pub fn readUntilCRLF(self: *ValkeyReader) RedisError![]const u8 {
const buffer = self.buffer[self.pos..];
for (buffer, 0..) |byte, i| {
if (byte == '\r' and buffer.len > i + 1 and buffer[i + 1] == '\n') {
const result = buffer[0..i];
self.pos += i + 2;
return result;
}
}
return error.InvalidResponse;
}
pub fn readInteger(self: *ValkeyReader) RedisError!i64 {
const str = try self.readUntilCRLF();
return std.fmt.parseInt(i64, str, 10) catch return error.InvalidInteger;
}
pub fn readDouble(self: *ValkeyReader) RedisError!f64 {
const str = try self.readUntilCRLF();
// Handle special values
if (std.mem.eql(u8, str, "inf")) return std.math.inf(f64);
if (std.mem.eql(u8, str, "-inf")) return -std.math.inf(f64);
if (std.mem.eql(u8, str, "nan")) return std.math.nan(f64);
// Parse normal double
return std.fmt.parseFloat(f64, str) catch return error.InvalidDouble;
}
pub fn readBoolean(self: *ValkeyReader) RedisError!bool {
const str = try self.readUntilCRLF();
if (str.len != 1) return error.InvalidBoolean;
return switch (str[0]) {
't' => true,
'f' => false,
else => error.InvalidBoolean,
};
}
pub fn readVerbatimString(self: *ValkeyReader, allocator: std.mem.Allocator) RedisError!VerbatimString {
const len = try self.readInteger();
if (len < 0) return error.InvalidVerbatimString;
if (self.pos + @as(usize, @intCast(len)) > self.buffer.len) return error.InvalidVerbatimString;
const content_with_format = self.buffer[self.pos .. self.pos + @as(usize, @intCast(len))];
self.pos += @as(usize, @intCast(len));
// Expect CRLF after content
const crlf = try self.readUntilCRLF();
if (crlf.len != 0) return error.InvalidVerbatimString;
// Format should be "xxx:" followed by content
if (content_with_format.len < 4 or content_with_format[3] != ':') {
return error.InvalidVerbatimString;
}
const format = try allocator.dupe(u8, content_with_format[0..3]);
const content = try allocator.dupe(u8, content_with_format[4..]);
return VerbatimString{
.format = format,
.content = content,
};
}
pub fn readValue(self: *ValkeyReader, allocator: std.mem.Allocator) RedisError!RESPValue {
const type_byte = try self.readByte();
return switch (RESPType.fromByte(type_byte) orelse return error.InvalidResponseType) {
// RESP2 types
.SimpleString => {
const str = try self.readUntilCRLF();
const owned = try allocator.dupe(u8, str);
return RESPValue{ .SimpleString = owned };
},
.Error => {
const str = try self.readUntilCRLF();
const owned = try allocator.dupe(u8, str);
return RESPValue{ .Error = owned };
},
.Integer => {
const int = try self.readInteger();
return RESPValue{ .Integer = int };
},
.BulkString => {
const len = try self.readInteger();
if (len < 0) return RESPValue{ .BulkString = null };
if (self.pos + @as(usize, @intCast(len)) > self.buffer.len) return error.InvalidResponse;
const str = self.buffer[self.pos .. self.pos + @as(usize, @intCast(len))];
self.pos += @as(usize, @intCast(len));
const crlf = try self.readUntilCRLF();
if (crlf.len != 0) return error.InvalidBulkString;
const owned = try allocator.dupe(u8, str);
return RESPValue{ .BulkString = owned };
},
.Array => {
const len = try self.readInteger();
if (len < 0) return RESPValue{ .Array = &[_]RESPValue{} };
const array = try allocator.alloc(RESPValue, @as(usize, @intCast(len)));
errdefer allocator.free(array);
var i: usize = 0;
errdefer {
for (array[0..i]) |*item| {
item.deinit(allocator);
}
}
while (i < len) : (i += 1) {
array[i] = try self.readValue(allocator);
}
return RESPValue{ .Array = array };
},
// RESP3 types
.Null => {
_ = try self.readUntilCRLF(); // Read and discard CRLF
return RESPValue{ .Null = {} };
},
.Double => {
const d = try self.readDouble();
return RESPValue{ .Double = d };
},
.Boolean => {
const b = try self.readBoolean();
return RESPValue{ .Boolean = b };
},
.BlobError => {
const len = try self.readInteger();
if (len < 0) return error.InvalidBlobError;
if (self.pos + @as(usize, @intCast(len)) > self.buffer.len) return error.InvalidBlobError;
const str = self.buffer[self.pos .. self.pos + @as(usize, @intCast(len))];
self.pos += @as(usize, @intCast(len));
const crlf = try self.readUntilCRLF();
if (crlf.len != 0) return error.InvalidBlobError;
const owned = try allocator.dupe(u8, str);
return RESPValue{ .BlobError = owned };
},
.VerbatimString => {
return RESPValue{ .VerbatimString = try self.readVerbatimString(allocator) };
},
.Map => {
const len = try self.readInteger();
if (len < 0) return error.InvalidMap;
const entries = try allocator.alloc(MapEntry, @as(usize, @intCast(len)));
errdefer allocator.free(entries);
var i: usize = 0;
errdefer {
for (entries[0..i]) |*entry| {
entry.deinit(allocator);
}
}
while (i < len) : (i += 1) {
entries[i] = .{ .key = try self.readValue(allocator), .value = try self.readValue(allocator) };
}
return RESPValue{ .Map = entries };
},
.Set => {
const len = try self.readInteger();
if (len < 0) return error.InvalidSet;
var set = try allocator.alloc(RESPValue, @as(usize, @intCast(len)));
errdefer allocator.free(set);
var i: usize = 0;
errdefer {
for (set[0..i]) |*item| {
item.deinit(allocator);
}
}
while (i < len) : (i += 1) {
set[i] = try self.readValue(allocator);
}
return RESPValue{ .Set = set };
},
.Attribute => {
const len = try self.readInteger();
if (len < 0) return error.InvalidAttribute;
var attrs = try allocator.alloc(MapEntry, @as(usize, @intCast(len)));
errdefer allocator.free(attrs);
var i: usize = 0;
errdefer {
for (attrs[0..i]) |*entry| {
entry.deinit(allocator);
}
}
while (i < len) : (i += 1) {
var key = try self.readValue(allocator);
errdefer key.deinit(allocator);
const value = try self.readValue(allocator);
attrs[i] = .{ .key = key, .value = value };
}
// Read the actual value that follows the attributes
const value_ptr = try allocator.create(RESPValue);
errdefer {
allocator.destroy(value_ptr);
}
value_ptr.* = try self.readValue(allocator);
return RESPValue{ .Attribute = .{
.attributes = attrs,
.value = value_ptr,
} };
},
.Push => {
const len = try self.readInteger();
if (len < 0 or len == 0) return error.InvalidPush;
// First element is the push type
const push_type = try self.readValue(allocator);
var push_type_str: []const u8 = "";
switch (push_type) {
.SimpleString => |str| push_type_str = str,
.BulkString => |maybe_str| {
if (maybe_str) |str| {
push_type_str = str;
} else {
return error.InvalidPush;
}
},
else => return error.InvalidPush,
}
// Copy the push type string since the original will be freed
const push_type_dup = try allocator.dupe(u8, push_type_str);
errdefer allocator.free(push_type_dup);
// Read the rest of the data
var data = try allocator.alloc(RESPValue, @as(usize, @intCast(len - 1)));
errdefer allocator.free(data);
var i: usize = 0;
errdefer {
for (data[0..i]) |*item| {
item.deinit(allocator);
}
}
while (i < len - 1) : (i += 1) {
data[i] = try self.readValue(allocator);
}
return RESPValue{ .Push = .{
.kind = push_type_dup,
.data = data,
} };
},
.BigNumber => {
const str = try self.readUntilCRLF();
const owned = try allocator.dupe(u8, str);
return RESPValue{ .BigNumber = owned };
},
};
}
};
pub const MapEntry = struct {
key: RESPValue,
value: RESPValue,
pub fn deinit(self: *MapEntry, allocator: std.mem.Allocator) void {
self.key.deinit(allocator);
self.value.deinit(allocator);
}
};
pub const VerbatimString = struct {
format: []const u8, // e.g. "txt" or "mkd"
content: []const u8,
pub fn deinit(self: *VerbatimString, allocator: std.mem.Allocator) void {
allocator.free(self.format);
allocator.free(self.content);
}
};
pub const Push = struct {
kind: []const u8,
data: []RESPValue,
pub fn deinit(self: *Push, allocator: std.mem.Allocator) void {
allocator.free(self.kind);
for (self.data) |*item| {
item.deinit(allocator);
}
allocator.free(self.data);
}
};
pub const Attribute = struct {
attributes: []MapEntry,
value: *RESPValue,
pub fn deinit(self: *Attribute, allocator: std.mem.Allocator) void {
for (self.attributes) |*entry| {
entry.deinit(allocator);
}
allocator.free(self.attributes);
self.value.deinit(allocator);
allocator.destroy(self.value);
}
};
pub const SubscriptionPushMessageKind = enum(u2) {
const Self = @This();
message,
subscribe,
unsubscribe,
const String = bun.ComptimeStringMap(SubscriptionPushMessageKind, .{
.{ "message", .message },
.{ "subscribe", .subscribe },
.{ "unsubscribe", .unsubscribe },
});
pub fn fromString(str: []const u8) ?Self {
return Self.String.get(str);
}
};
const std = @import("std");
const bun = @import("bun");
const String = bun.String;
const jsc = bun.jsc;

2547
src/valkey2/valkey.zig Normal file

File diff suppressed because it is too large Load Diff

4
test/_util/algo.ts Normal file
View File

@@ -0,0 +1,4 @@
/** Python's zip. Does not bounds checking. Very simplistic. */
export function zip<T1, T2>(a: T1[], b: T2[]): [T1, T2][] {
return a.map((k, i) => [k, b[i]] as [T1, T2]);
}

136
test/_util/promises.ts Normal file
View File

@@ -0,0 +1,136 @@
/**
* Promise-related utilities for testing asynchronous behavior.
*/
/**
* Tracks a value over time and provides async waiting until it reaches a threshold.
* Useful for testing conditions that change asynchronously without hardcoded timeouts.
* Supports custom comparison functions for complex types.
*
* @template T - Type of value being tracked
*
* @example
* // Basic usage with numbers
* const counter = new PromiseStateTracker(0);
* setTimeout(() => counter.value = 5, 100);
* await counter.untilValue(5); // Waits until counter reaches 5
*
* @example
* // Custom comparison for objects
* const tracker = new PromiseStateTracker({ count: 0 }, 5000, (a, b) => a.count - b.count);
* await tracker.untilValue({ count: 10 });
*/
export class PromiseStateTracker<T> {
#value: T;
#timeoutMs: number;
#activeResolvers: [T, number, (value: T | PromiseLike<T>) => void][] = [];
#compareFn: (a: T, b: T) => number;
/**
* @param initialValue - Starting value for the tracker
* @param defaultTimeoutMs - Default timeout in ms for untilValue() calls (default: 5000)
* @param compareFn - Comparison function returning <0 if a<b, 0 if a=b, >0 if a>b (default: standard comparison)
* @example
* const tracker = new PromiseStateTracker(0, 10000); // 10s timeout
* @example
* // With custom comparison
* const tracker = new PromiseStateTracker({ x: 0 }, 5000, (a, b) => a.x - b.x);
*/
constructor(
initialValue: T,
defaultTimeoutMs: number = 5000,
compareFn: (a: T, b: T) => number = (a, b) => (a < b ? -1 : a > b ? 1 : 0)
) {
this.#value = initialValue;
this.#timeoutMs = defaultTimeoutMs;
this.#compareFn = compareFn;
}
/**
* Gets the current tracked value.
* @example
* expect(tracker.value).toBe(5);
*/
get value(): T {
return this.#value;
}
/**
* Sets a new value and resolves any waiting promises that meet their threshold.
* Uses the compare function to determine if the new value satisfies waiting conditions.
* @example
* tracker.value = 10; // Resolves all untilValue() calls waiting for <= 10
*/
set value(newValue: T) {
this.#value = newValue;
const toResolve = this.#activeResolvers
.filter(([expected]) => this.#compareFn(newValue, expected) >= 0);
toResolve.forEach(([, alarm, resolve]) => {
clearTimeout(alarm);
resolve(newValue);
});
this.#activeResolvers = this.#activeResolvers
.filter(([expected]) => this.#compareFn(newValue, expected) < 0);
}
/**
* Returns a promise that resolves when the tracked value reaches or exceeds the threshold.
* If already at or above the threshold, resolves immediately.
* Comparison is done using the compare function provided in the constructor.
* @param expectedValue - Threshold value to wait for (resolves when compareFn(value, expectedValue) >= 0)
* @param timeoutMs - Optional timeout override in ms (uses constructor default if omitted)
* @returns Promise that resolves with the current value when condition is met
* @example
* await tracker.untilValue(10); // Wait until value >= 10
* await tracker.untilValue(5, 1000); // Wait with 1s timeout
*/
untilValue(expectedValue: T, timeoutMs: number | undefined = undefined): Promise<T> {
return new Promise<T>((resolve, reject) => {
if (this.#compareFn(this.#value, expectedValue) >= 0) {
resolve(this.#value);
return;
}
const timeout = timeoutMs !== undefined ? timeoutMs : this.#timeoutMs;
const alarm = setTimeout(() => {
reject(new Error(`Timeout waiting for counter to reach ${expectedValue}, current is ${this.#value}.`));
}, timeout);
this.#activeResolvers.push([expectedValue, alarm, resolve]);
});
}
};
/**
* A specialized counter that can be awaited until it reaches a target value.
* Extends PromiseStateTracker with numeric comparison and an increment helper.
*
* @example
* const counter = new AwaitableCounter();
* setTimeout(() => counter.increment(), 50);
* setTimeout(() => counter.increment(), 100);
* await counter.untilValue(2); // Waits until counter reaches 2
*/
export class AwaitableCounter extends PromiseStateTracker<number> {
/**
* @param initialValue - Starting counter value (default: 0)
* @param defaultTimeoutMs - Default timeout in ms for untilValue() calls (default: 5000)
* @example
* const counter = new AwaitableCounter(10); // Starts at 10
*/
constructor(initialValue: number = 0, defaultTimeoutMs: number = 5000) {
super(initialValue, defaultTimeoutMs, (a, b) => a - b);
}
/**
* Increments the counter by 1 and resolves any waiting promises.
* @example
* counter.increment(); // counter.value is now counter.value + 1
*/
increment() {
this.value += 1;
}
}

280
test/_util/random.ts Normal file
View File

@@ -0,0 +1,280 @@
type UnitInterval = number & { readonly __brand: 'UnitInterval' };
function isUnitInterval(n: number): n is UnitInterval {
return n >= 0 && n < 1;
}
function asUnitInterval(n: number): UnitInterval {
if (!isUnitInterval(n)) {
throw new Error(`Expected number in [0, 1), got ${n}`);
}
return n as UnitInterval;
}
export type RandomEngine = () => UnitInterval;
/**
* Return a seed based on the current month and year.
*
* Tests which depend on randomness should be stable and reproducible and without a seed that is impossible.
* This function provides a seed that changes monthly, allowing tests to vary over time while remaining stable
* within a given month.
*
* @returns A seed value based on the current month and year.
*/
export function currentMonthSeed(): number {
const now = new Date();
return now.getFullYear() * 100 + (now.getMonth() + 1);
}
/**
* Seedable PRNG implementation using Mulberry32 algorithm.
*
* @param seed The seed value.
* @returns A function that generates a pseudo-random value [0, 1). Subsequent invokations will produce a sequence of
* pseudo-random values in the same range.
*/
export function mulberry32Prng(seed: number): RandomEngine {
let state = seed;
return () => {
let t = state += 0x6D2B79F5;
t = Math.imul(t ^ t >>> 15, t | 1);
t ^= t + Math.imul(t ^ t >>> 7, t | 61);
return asUnitInterval(((t ^ t >>> 14) >>> 0) / 4294967296);
}
}
/** Generate a random integer between min (inclusive) and max (exclusive). */
export function range(randomEngine: RandomEngine, min: number, max: number): number {
return Math.floor(randomEngine() * (max - min)) + min;
}
/** Simulate a coin flip using the given random engine. */
export function coinFlip(randomEngine: RandomEngine): boolean {
return randomEngine() < 0.5;
}
/**
* Select `n` random elements from an array, ensuring that the resulting array contains at least one element from the
* given list.
*
* `n` must be greater than or equal to the length of `universe`.
*/
export function selectNUniversal<T>(universe: T[], count: number, randomEngine: RandomEngine): T[] {
if (count < universe.length) {
throw new Error("Count must be >= universe length");
}
// Inefficient claude implementation.
const remaining = count - universe.length;
const extras = Array.from({ length: remaining }, () =>
universe[Math.floor(randomEngine() * universe.length)]
);
return shuffle([...universe, ...extras], randomEngine);
}
/**
* Shuffle an array using the Fisher-Yates algorithm and a custom random engine.
*/
export function shuffle<T>(array: T[], randomEngine: () => number): T[] {
const result = [...array];
for (let i = result.length - 1; i > 0; i--) {
const j = Math.floor(randomEngine() * (i + 1));
[result[i], result[j]] = [result[j], result[i]];
}
return result;
}
/** Geneerates a random UTF-8 string */
export function utf8String(randomEngine: RandomEngine, length: number): string {
// TODO(markovejnovic): This is a sucky Claude-generated implementation. Improve it.
let result = '';
for (let i = 0; i < length; i++) {
const rand = randomEngine();
// Distribute across different Unicode ranges
// 80% ASCII, 15% Latin-1 Supplement, 5% other BMP
if (rand < 0.8) {
// ASCII printable: 0x20-0x7E
result += String.fromCharCode(range(randomEngine, 0x20, 0x7F));
} else if (rand < 0.95) {
// Latin-1 Supplement: 0xA0-0xFF
result += String.fromCharCode(range(randomEngine, 0xA0, 0x100));
} else {
// Basic Multilingual Plane: 0x0100-0xD7FF (excludes surrogates)
result += String.fromCharCode(range(randomEngine, 0x0100, 0xD800));
}
}
return result;
}
/** Utilities random filesystem operations. */
export namespace FileSystem {
/** Generate a fake file/directory name for the given platform. */
export function fakeInodeName(randomEngine: RandomEngine, platform: "posix" | "windows"): string {
switch (platform) {
case "posix": {
const NAME_MAX = 255; // TODO(markovejnovic): Technically not true, since it really depends on the FS.
const len = range(randomEngine, 1, NAME_MAX);
return utf8String(randomEngine, len);
}
case "windows": {
throw new Error("Not implemented.");
}
}
}
/** Generate a filesystem path -- does not need to exist on disk. */
export function fakeAbsPath(
randomEngine: RandomEngine,
platform: "posix" | "windows",
ext: string | undefined = undefined,
): string {
const generatePosix = () => {
// TODO(markovejnovic): Claude-generated implementation, improve.
const MAX_PATH = 4096;
const parts: string[] = [];
let currentLength = 1; // Start with leading "/"
// Add extension length if provided
const extToAdd = ext ? (ext.startsWith('.') ? ext : '.' + ext) : '';
while (currentLength < MAX_PATH - extToAdd.length - 1) {
const part = fakeInodeName(randomEngine, platform);
const newLength = currentLength + part.length + 1; // +1 for "/"
// Would this exceed our limit?
if (newLength + extToAdd.length > MAX_PATH) {
break;
}
parts.push(part);
currentLength = newLength;
// Randomly stop to create varying depths (30% chance after at least 2 components)
if (parts.length >= 2 && randomEngine() < 0.3) {
break;
}
}
// Ensure we have at least one component
if (parts.length === 0) {
parts.push(fakeInodeName(randomEngine, platform).slice(0, 10));
}
// Add extension to last component
if (extToAdd) {
parts[parts.length - 1] += extToAdd;
}
return '/' + parts.join('/');
};
const generateWindows = () => {
throw new Error("Not implemented.");
};
switch (platform) {
case "posix": return generatePosix();
case "windows": return generateWindows();
}
}
}
/**
* Generates a very dirty string containing arbitrary byte values, encoded as latin1.
*
* This is useful for testing binary-safe operations, since latin1 encoding maps byte values 0-255 directly to Unicode
* code points 0-255.
*/
export function dirtyLatin1String(randomEngine: RandomEngine, length: number): string {
// Generate a random size between 1 byte and maxSize
const size = range(randomEngine, 1, length + 1);
// Create buffer and fill with random bytes
const buffer = Buffer.allocUnsafe(size);
// Process in chunks of 4 bytes for efficiency
const fullChunks = (size / 4) | 0; // Faster than Math.floor
const remainder = size % 4;
let i = 0;
// Handle complete 4-byte chunks without conditionals
for (let chunk = 0; chunk < fullChunks; chunk++) {
// Get 32 bits of randomness (| 0 coerces to int32, faster than Math.floor)
const rand = (randomEngine() * 0x100000000) | 0;
buffer[i++] = rand & 0xFF;
buffer[i++] = (rand >>> 8) & 0xFF;
buffer[i++] = (rand >>> 16) & 0xFF;
buffer[i++] = (rand >>> 24) & 0xFF;
}
// Handle remaining bytes
if (remainder > 0) {
const rand = (randomEngine() * 0x100000000) | 0;
if (remainder >= 1) buffer[i++] = rand & 0xFF;
if (remainder >= 2) buffer[i++] = (rand >>> 8) & 0xFF;
if (remainder >= 3) buffer[i++] = (rand >>> 16) & 0xFF;
}
// Use latin1 encoding to preserve all byte values
return buffer.toString('latin1');
}
/** Utilities for working with network operations. */
export namespace Net {
/** Generate a fake IP address (IPv4). */
export function fakeIpv4(randomEngine: RandomEngine): string {
return Array.from({ length: 4 }, () => range(randomEngine, 0, 256)).join('.');
}
/** Generate a fake IP address (IPv6). */
export function fakeIpv6(randomEngine: RandomEngine): string {
const segments = Array.from({ length: 8 }, () => range(randomEngine, 0, 0x10000).toString(16));
return segments.join(':');
}
/** Generate a fake IP address (either IPv4 or IPv6). */
export function fakeIp(randomEngine: RandomEngine): string {
return coinFlip(randomEngine) ? fakeIpv4(randomEngine) : fakeIpv6(randomEngine);
}
/** Generate a fake hostname. */
export function fakeHostname(randomEngine: RandomEngine): string {
// TODO(markovejnovic): Claude-generated implementation, improve.
// What the hell is this even?
const alphanumeric = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
const withHyphen = alphanumeric + '-';
const label = (len: number) => Array.from(
{ length: len },
(_, i) => (i === 0 || i === len - 1 ? alphanumeric : withHyphen)[
range(randomEngine, 0, i === 0 || i === len - 1 ? alphanumeric.length : withHyphen.length)
]
).join('');
const labels = Array.from({ length: 10 }, () => label(range(randomEngine, 1, 64)))
.reduce((acc, l) =>
acc.join('.').length + l.length + 1 <= 253 ? [...acc, l] : acc,
[] as string[]
)
.slice(0, range(randomEngine, 1, 10));
return labels.length > 0 ? labels.join('.') : label(range(randomEngine, 1, 64));
}
export function location(randomEngine: RandomEngine): string {
return coinFlip(randomEngine) ? fakeIp(randomEngine) : fakeHostname(randomEngine);
}
export function port(randomEngine: RandomEngine, min = 1, max = 65536): number {
return range(randomEngine, min, max);
}
}

View File

@@ -0,0 +1,215 @@
import * as random from "_util/random";
import {RedisClient2} from "bun";
/**
* Options for {@link describeValkey}.
*/
export interface ValkeyOptions {
server: "docker" | string
}
const defaultValkeyOptions: ValkeyOptions = {
server: "docker"
};
/**
* Context passed to the test suite function of {@link describeValkey}.
*/
export interface ValkeyContext {
serverUrl: string,
/** Fetch a Redis client. Subsequent invocations return the same object. */
client: () => RedisClient2,
connectedClient: () => Promise<RedisClient2>,
/** Create a new disconnected client. Each invocation creates a new instance. */
newDisconnectedClient: () => RedisClient2,
/** Restart the server. */
restartServer: () => Promise<void>,
};
/**
* Helper which manages the lifetime of a Valkey instance.
*
* All valkey tests which require a Valkey server should be using this fixture instead of {@link describe}. The
* semantics are the same as of {@link describe}.
*/
export function describeValkey(
description: string,
testSuite: (context: ValkeyContext) => void | Promise<void>,
options: ValkeyOptions = defaultValkeyOptions,
) {
if (options.server === "docker") {
throw new Error("Not implemented.");
}
let clientInstance: RedisClient2 | null = null;
let clientConnected = false;
const context: ValkeyContext = {
serverUrl: options.server,
client: () => {
if (clientInstance === null) {
clientInstance = new RedisClient2(context.serverUrl);
}
return clientInstance;
},
connectedClient: async () => {
const client = context.client();
if (!clientConnected) {
// Note: this is an async operation, but we don't want to make the entire context async.
// The test suite should await .connect() itself if it needs to.
await client.connect();
clientConnected = true;
}
return client;
},
newDisconnectedClient: () => new RedisClient2(context.serverUrl),
restartServer: async () => {
if (options.server !== "docker") {
// We're not the ones managing the server, so there's absolutely nothing we can do here.
throw new Error("This test is not supported when running against a non-Docker server.");
}
},
};
beforeEach(async () => {
// If the client was closed by a previous test, reset it
if (clientInstance && !clientInstance.connected) {
clientInstance = null;
clientConnected = false;
}
const client = await context.connectedClient();
await client.send("FLUSHALL", ["SYNC"]);
});
describe(description, () => {
beforeAll(async () => {
clientInstance = null;
clientConnected = false;
});
testSuite.bind(null, context)();
});
}
/** Utilities for working with Valkey URLs. */
export namespace Url {
/** List of protocols supported by Valkey. Valid in the context of `<protocol>://...` */
export const VALID_PROTOCOLS = [
"valkey", "valkeys", "valkey+tls", "valkey+unix", "valkey+tls+unix", "redis", "rediss", "redis+tls", "redis+unix",
"redis+tls+unix",
];
/** Valid range of database IDs. Redis normally lets you have up to 16 DBs, but this is configurable. */
export const VALID_DB_ID_RANGE = [0, 0xFFFFFF];
/** Generate a set of valid URLs covering all supported protocols, with other parameters randomized. */
export function generateValidSet(count: number, randomEngine: random.RandomEngine): string[] {
const protos = random.selectNUniversal(VALID_PROTOCOLS, count, randomEngine);
function generateUrl(proto: string) {
if (proto.includes("+unix")) {
return `${proto}://${random.FileSystem.fakeAbsPath(randomEngine, "posix")}`;
}
const dbId: number | undefined =
random.coinFlip(randomEngine) ? random.range(randomEngine, VALID_DB_ID_RANGE[0], VALID_DB_ID_RANGE[1])
: undefined;
const dbStr = dbId !== undefined ? `/${dbId}` : "";
return `${proto}://${random.Net.location(randomEngine)}:${random.Net.port(randomEngine)}${dbStr}`;
}
return protos.map(generateUrl);
}
}
/** Constructor options for {@link ValkeyFaker}. */
export interface ValkeyFakerOptions {
unfuzzy?: boolean;
};
/** Faker-eseque utilities for Valkey. */
export class ValkeyFaker {
#randomEngine: random.RandomEngine;
#options: ValkeyFakerOptions;
#unfuzzyGenerator: number;
constructor(randomEngine: random.RandomEngine, options: ValkeyFakerOptions = {}) {
this.#randomEngine = randomEngine;
this.#options = options;
this.#unfuzzyGenerator = 0;
}
get randomEngine(): random.RandomEngine {
return this.#randomEngine;
}
/**
* Generate a random binary-safe string suitable for use as a Redis/Valkey key.
*
* Uses uniform distribution across all byte values (0-255) for maximum randomness.
* The size of the generated string is randomly chosen between 1 byte and maxSize.
*
* The manual states that the key name is a binary safe string up to 512 MB in length.
*
* @param randomEngine The random number generator to use
* @param maxSize Maximum size in bytes (default: 512 MB)
* @returns A binary-safe random string
*/
key(maxSize: number = 512 * 1024 * 1024): string {
if (this.#options.unfuzzy) {
return `key:${this.#unfuzzyGenerator++}`;
}
return random.dirtyLatin1String(this.#randomEngine, maxSize);
}
edgeCaseKeys(count: number): string[] {
return Array.from({ length: count }, () => this.key(512 * 1024));
}
keys(count: number): string[] {
// Use 1 KB max size for regular keys to keep tests fast. 1kB is still a reasonably large key.
return Array.from({ length: count }, () => this.key(1024));
}
/** Generate a random binary-safe string suitable for use as a Redis/Valkey value. */
value(maxSize: number = 512 * 1024 * 1024): string {
if (this.#options.unfuzzy) {
return `value:${this.#unfuzzyGenerator++}`;
}
return random.dirtyLatin1String(this.#randomEngine, random.range(this.randomEngine, 0, maxSize));
}
edgeCaseValues(count: number): string[] {
// Use 1 KB max size for regular values to keep tests fast. 1kB is still a reasonably large value.
return Array.from({ length: count }, () => this.value(512 * 1024));
}
values(count: number): string[] {
// Use 1 KB max size for regular values to keep tests fast. 1kB is still a reasonably large value.
return Array.from({ length: count }, () => this.value(1024));
}
channel(maxSize: number = 256): string {
if (this.#options.unfuzzy) {
return `channel:${this.#unfuzzyGenerator++}`;
}
return random.dirtyLatin1String(this.#randomEngine, maxSize);
}
channels(count: number): string[] {
return Array.from({ length: count }, () => this.channel(256));
}
publishMessage(maxSize: number = 512 * 1024 * 1024): string {
return this.value(maxSize);
}
}

File diff suppressed because it is too large Load Diff