diff --git a/test/js/valkey/valkey.failing-subscriber-no-ipc.ts b/test/js/valkey/valkey.failing-subscriber-no-ipc.ts new file mode 100644 index 0000000000..cb2d5c346b --- /dev/null +++ b/test/js/valkey/valkey.failing-subscriber-no-ipc.ts @@ -0,0 +1,90 @@ +// Program which sets up a subscriber outside the scope of the main Jest process. +// Used within valkey.test.ts. +// +// DO NOT IMPORT FROM test-utils.ts. That import is janky and will have different state at different from different +// importers. +// +// These tests communicate over jsonb. +import { RedisClient } from "bun"; + +const CHANNEL = "error-callback-channel"; + +export interface Message { + event: string; +} + +export interface RunInfoMessage extends Message { + event: "start"; + url: string; + tlsPaths?: { cert: string; key: string; ca: string }; +} + +export interface ValkeyReceivedMessage extends Message { + event: "message"; + index: number; +} + +export interface ExceptionMessage extends Message { + event: "exception"; + exMsg: string; +} + +export interface ReadyMessage extends Message { + event: "ready"; +} + +async function messageParent(msg: MsgT): Promise { + process.stdout.write(JSON.stringify(msg) + "\n"); +} + +async function waitForParentMessage(expectedEvent: MsgT["event"]): Promise { + for await (const line of console) { + const parsed = JSON.parse(line); + if (typeof(parsed) !== "object") { + throw new Error("Expected object message"); + } + + if (parsed.event === undefined || typeof(parsed.event) !== "string") { + throw new Error("Expected event field as a string"); + } + + if (parsed.event !== expectedEvent) { + throw new Error(`Expected event ${expectedEvent} but got ${parsed.event}`); + } + + return parsed as MsgT; + } + + throw new Error("Input stream unexpectedly closed"); +} + +if (import.meta.main) { + await messageParent({ event: "ready-for-url" }); + const runInfo = await waitForParentMessage("start"); + const subscriber = new RedisClient(runInfo.url, { + tls: runInfo.tlsPaths + ? { + cert: Bun.file(runInfo.tlsPaths.cert), + key: Bun.file(runInfo.tlsPaths.key), + ca: Bun.file(runInfo.tlsPaths.ca), + } + : undefined, + }); + await subscriber.connect(); + + let counter = 0; + await subscriber.subscribe(CHANNEL, () => { + if ((counter++) === 1) { + throw new Error("Intentional callback error"); + } + + messageParent({ event: "message", index: counter }); + }); + + + process.on("uncaughtException", e => { + messageParent({ event: "exception", exMsg: e.message }); + }); + + await messageParent({ event: "ready" }); +} diff --git a/test/js/valkey/valkey.test.ts b/test/js/valkey/valkey.test.ts index df4132a30c..6f0364d6ca 100644 --- a/test/js/valkey/valkey.test.ts +++ b/test/js/valkey/valkey.test.ts @@ -16,6 +16,7 @@ import { TLS_REDIS_URL, } from "./test-utils"; import type { RedisTestStartMessage } from "./valkey.failing-subscriber"; +import type { Message } from "./valkey.failing-subscriber-no-ipc"; for (const connectionType of [ConnectionType.TLS, ConnectionType.TCP]) { const ctx = { ..._ctx, redis: connectionType ? _ctx.redis : (_ctx.redisTLS as RedisClient) }; @@ -6590,6 +6591,84 @@ for (const connectionType of [ConnectionType.TLS, ConnectionType.TCP]) { subscriber.close(); }); + test("callback errors don't crash the client (without IPC)", async () => { + const channel = "error-callback-channel"; + + const subscriberProc = spawn({ + cmd: [bunExe(), `${__dirname}/valkey.failing-subscriber-no-ipc.ts`], + stdout: "pipe", + stderr: "inherit", + stdin: "pipe", + env: { ...process.env, NODE_ENV: "development" }, + }); + + const reader = subscriberProc.stdout.getReader(); + async function* readLines() { + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + + for (const line of lines) { + yield line; + } + } + } + + async function waitForChildMessage(expectedEvent: MsgT["event"]): Promise { + for await (const line of readLines()) { + const parsed = JSON.parse(line); + if (typeof parsed !== "object") { + throw new Error("Expected object message"); + } + if (parsed.event === undefined || typeof parsed.event !== "string") { + throw new Error("Expected event field as a string"); + } + if (parsed.event !== expectedEvent) { + throw new Error(`Expected event ${expectedEvent} but got ${parsed.event}`); + } + return parsed as MsgT; + } + throw new Error("Input stream unexpectedly closed"); + } + + async function messageChild(msg: MsgT): Promise { + subscriberProc.stdin!.write(JSON.stringify(msg) + "\n"); + } + + try { + // Wait for the process to announce it is ready for messages. + await waitForChildMessage("ready-for-url"); + + // Tell the child to start and connect to Redis. + await messageChild({ + event: "start", + url: connectionType === ConnectionType.TLS ? TLS_REDIS_URL : DEFAULT_REDIS_URL, + tlsPaths: connectionType === ConnectionType.TLS ? TLS_REDIS_OPTIONS.tlsPaths : undefined, + }); + await waitForChildMessage("ready"); + + expect(await ctx.redis.publish(channel, "message1")).toBeGreaterThanOrEqual(1); + expect(await waitForChildMessage("message")).toMatchObject({ index: 1 }); + + // This should throw inside the child process, so it should notify us. + expect(await ctx.redis.publish(channel, "message2")).toBeGreaterThanOrEqual(1); + await waitForChildMessage("exception"); + + expect(await ctx.redis.publish(channel, "message1")).toBeGreaterThanOrEqual(1); + expect(await waitForChildMessage("message")).toMatchObject({ index: 3 }); + } finally { + subscriberProc.kill(); + await subscriberProc.exited; + } + }); + test("callback errors don't crash the client", async () => { const channel = "error-callback-channel";