test(valkey): Add a failing subscriber test without IPC (#23253)

### What does this PR do?

Adds a new test which mirrors the _callback errors don't crash the
client_ test but doesn't rely on IPC.

### How did you verify your code works?

Hopefully, CI
This commit is contained in:
Marko Vejnovic
2025-10-06 17:03:39 -07:00
committed by GitHub
parent fc9db832dc
commit 90c0c72212
2 changed files with 169 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
// Program which sets up a subscriber outside the scope of the main Jest process.
// Used within valkey.test.ts.
//
// DO NOT IMPORT FROM test-utils.ts. That import is janky and will have different state at different from different
// importers.
//
// These tests communicate over jsonb.
import { RedisClient } from "bun";
const CHANNEL = "error-callback-channel";
export interface Message {
event: string;
}
export interface RunInfoMessage extends Message {
event: "start";
url: string;
tlsPaths?: { cert: string; key: string; ca: string };
}
export interface ValkeyReceivedMessage extends Message {
event: "message";
index: number;
}
export interface ExceptionMessage extends Message {
event: "exception";
exMsg: string;
}
export interface ReadyMessage extends Message {
event: "ready";
}
async function messageParent<MsgT extends Message>(msg: MsgT): Promise<void> {
process.stdout.write(JSON.stringify(msg) + "\n");
}
async function waitForParentMessage<MsgT extends Message>(expectedEvent: MsgT["event"]): Promise<MsgT> {
for await (const line of console) {
const parsed = JSON.parse(line);
if (typeof(parsed) !== "object") {
throw new Error("Expected object message");
}
if (parsed.event === undefined || typeof(parsed.event) !== "string") {
throw new Error("Expected event field as a string");
}
if (parsed.event !== expectedEvent) {
throw new Error(`Expected event ${expectedEvent} but got ${parsed.event}`);
}
return parsed as MsgT;
}
throw new Error("Input stream unexpectedly closed");
}
if (import.meta.main) {
await messageParent({ event: "ready-for-url" });
const runInfo = await waitForParentMessage<RunInfoMessage>("start");
const subscriber = new RedisClient(runInfo.url, {
tls: runInfo.tlsPaths
? {
cert: Bun.file(runInfo.tlsPaths.cert),
key: Bun.file(runInfo.tlsPaths.key),
ca: Bun.file(runInfo.tlsPaths.ca),
}
: undefined,
});
await subscriber.connect();
let counter = 0;
await subscriber.subscribe(CHANNEL, () => {
if ((counter++) === 1) {
throw new Error("Intentional callback error");
}
messageParent<ValkeyReceivedMessage>({ event: "message", index: counter });
});
process.on("uncaughtException", e => {
messageParent<ExceptionMessage>({ event: "exception", exMsg: e.message });
});
await messageParent({ event: "ready" });
}

View File

@@ -16,6 +16,7 @@ import {
TLS_REDIS_URL,
} from "./test-utils";
import type { RedisTestStartMessage } from "./valkey.failing-subscriber";
import type { Message } from "./valkey.failing-subscriber-no-ipc";
for (const connectionType of [ConnectionType.TLS, ConnectionType.TCP]) {
const ctx = { ..._ctx, redis: connectionType ? _ctx.redis : (_ctx.redisTLS as RedisClient) };
@@ -6590,6 +6591,84 @@ for (const connectionType of [ConnectionType.TLS, ConnectionType.TCP]) {
subscriber.close();
});
test("callback errors don't crash the client (without IPC)", async () => {
const channel = "error-callback-channel";
const subscriberProc = spawn({
cmd: [bunExe(), `${__dirname}/valkey.failing-subscriber-no-ipc.ts`],
stdout: "pipe",
stderr: "inherit",
stdin: "pipe",
env: { ...process.env, NODE_ENV: "development" },
});
const reader = subscriberProc.stdout.getReader();
async function* readLines() {
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
yield line;
}
}
}
async function waitForChildMessage<MsgT extends Message>(expectedEvent: MsgT["event"]): Promise<MsgT> {
for await (const line of readLines()) {
const parsed = JSON.parse(line);
if (typeof parsed !== "object") {
throw new Error("Expected object message");
}
if (parsed.event === undefined || typeof parsed.event !== "string") {
throw new Error("Expected event field as a string");
}
if (parsed.event !== expectedEvent) {
throw new Error(`Expected event ${expectedEvent} but got ${parsed.event}`);
}
return parsed as MsgT;
}
throw new Error("Input stream unexpectedly closed");
}
async function messageChild<MsgT extends Message>(msg: MsgT): Promise<void> {
subscriberProc.stdin!.write(JSON.stringify(msg) + "\n");
}
try {
// Wait for the process to announce it is ready for messages.
await waitForChildMessage("ready-for-url");
// Tell the child to start and connect to Redis.
await messageChild({
event: "start",
url: connectionType === ConnectionType.TLS ? TLS_REDIS_URL : DEFAULT_REDIS_URL,
tlsPaths: connectionType === ConnectionType.TLS ? TLS_REDIS_OPTIONS.tlsPaths : undefined,
});
await waitForChildMessage("ready");
expect(await ctx.redis.publish(channel, "message1")).toBeGreaterThanOrEqual(1);
expect(await waitForChildMessage("message")).toMatchObject({ index: 1 });
// This should throw inside the child process, so it should notify us.
expect(await ctx.redis.publish(channel, "message2")).toBeGreaterThanOrEqual(1);
await waitForChildMessage("exception");
expect(await ctx.redis.publish(channel, "message1")).toBeGreaterThanOrEqual(1);
expect(await waitForChildMessage("message")).toMatchObject({ index: 3 });
} finally {
subscriberProc.kill();
await subscriberProc.exited;
}
});
test("callback errors don't crash the client", async () => {
const channel = "error-callback-channel";