improve worker memory leak test ?

This commit is contained in:
Alistair Smith
2025-06-11 15:31:08 -07:00
parent a8683d4d2a
commit be8ef90134

View File

@@ -1,13 +1,13 @@
import { beforeAll, describe, expect, test } from "bun:test";
import { describe, expect, test } from "bun:test";
import { Worker as WebWorker } from "worker_threads";
const CONFIG = {
SMALL_BATCH_SIZE: 5,
LARGE_BATCH_SIZE: 50,
STRESS_ITERATIONS: 3,
MEMORY_THRESHOLD_MB: 200,
WARMUP_ITERATIONS: 5,
TEST_ITERATIONS: 20,
BATCH_SIZE: 10,
MEMORY_THRESHOLD_MB: 20,
GC_SETTLE_TIME: 200,
TEST_TIMEOUT_MS: 10000,
TEST_TIMEOUT_MS: 30000,
};
interface MemorySnapshot {
@@ -42,7 +42,7 @@ function logMemoryDiff(before: MemorySnapshot, after: MemorySnapshot, label: str
heap: `${before.heapUsed}MB -> ${after.heapUsed}MB (${heapDiff >= 0 ? "+" : ""}${heapDiff}MB)`,
});
if (rssDiff > 100) {
if (rssDiff > 50) {
console.warn(`⚠️ Large memory increase detected: +${rssDiff}MB RSS`);
}
}
@@ -54,54 +54,68 @@ async function withTimeout<T>(promise: Promise<T>, ms: number, description: stri
return Promise.race([promise, timeout]);
}
async function runWorkerBatch(workerCode: string, batchSize: number = CONFIG.BATCH_SIZE): Promise<void> {
const workers: WebWorker[] = [];
for (let i = 0; i < batchSize; i++) {
const worker = new WebWorker(workerCode, { eval: true });
workers.push(worker);
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
worker.removeAllListeners();
reject(new Error(`Worker ${i} failed to respond within timeout`));
}, 5000);
worker.once("message", msg => {
clearTimeout(timeout);
if (msg.error) {
reject(new Error(msg.error));
} else {
resolve();
}
});
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
}
describe("Worker Memory Leak Tests", () => {
let initialMemory: MemorySnapshot;
beforeAll(async () => {
await forceGCAndSettle();
initialMemory = takeMemorySnapshot();
console.log("Initial memory:", initialMemory);
});
test(
"workers should not leak memory with basic create/terminate cycles",
async () => {
const beforeTest = takeMemorySnapshot();
const workerCode = `
const { parentPort } = require('worker_threads');
parentPort.postMessage('ready');
`;
const testPromise = (async () => {
for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) {
const workers: WebWorker[] = [];
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} warmup iterations...`);
for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) {
const worker = new WebWorker(
`
const { parentPort } = require('worker_threads');
parentPort.postMessage('ready');
`,
{ eval: true },
);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`Warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
workers.push(worker);
const baselineMemory = takeMemorySnapshot();
console.log("Baseline memory after warmup:", baselineMemory);
await new Promise<void>(resolve => {
worker.once("message", () => resolve());
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
console.log(`Running ${CONFIG.TEST_ITERATIONS} test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 5 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`Iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
console.log(`Test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
})();
}
await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "Basic worker test");
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Basic create/terminate test");
const afterTest = takeMemorySnapshot();
logMemoryDiff(beforeTest, afterTest, "Basic create/terminate test");
const memoryIncrease = afterTest.rss - beforeTest.rss;
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
@@ -110,132 +124,123 @@ describe("Worker Memory Leak Tests", () => {
test(
"workers with HTTP activity should not leak memory",
async () => {
const beforeTest = takeMemorySnapshot();
const server = Bun.serve({
using server = Bun.serve({
port: 0,
fetch() {
return new Response("OK");
},
});
try {
const testPromise = (async () => {
for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) {
const workers: WebWorker[] = [];
for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) {
const worker = new WebWorker(
`
const { parentPort } = require('worker_threads');
async function doWork() {
try {
const response = await fetch('http://localhost:${server.port}');
await response.text();
parentPort.postMessage('done');
} catch (err) {
parentPort.postMessage({ error: err.message });
}
}
doWork();
`,
{ eval: true },
);
workers.push(worker);
await new Promise<void>((resolve, reject) => {
worker.once("message", msg => {
if (msg.error) {
reject(new Error(msg.error));
} else {
resolve();
}
});
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
const currentMemory = takeMemorySnapshot();
console.log(`HTTP iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
const workerCode = `
const { parentPort } = require('worker_threads');
async function doWork() {
try {
const response = await fetch('http://localhost:${server.port}');
await response.text();
parentPort.postMessage('done');
} catch (err) {
parentPort.postMessage({ error: err.message });
}
})();
}
doWork();
`;
await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "HTTP worker test");
} finally {
server.stop(true);
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} HTTP warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`HTTP warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const afterTest = takeMemorySnapshot();
logMemoryDiff(beforeTest, afterTest, "HTTP activity test");
const baselineMemory = takeMemorySnapshot();
console.log("HTTP baseline memory after warmup:", baselineMemory);
const memoryIncrease = afterTest.rss - beforeTest.rss;
console.log(`Running ${CONFIG.TEST_ITERATIONS} HTTP test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 5 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`HTTP test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "HTTP activity test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
test(
"workers with simple message passing should not leak memory",
"workers with message passing should not leak memory",
async () => {
const beforeTest = takeMemorySnapshot();
const workerCode = `
const { parentPort } = require('worker_threads');
parentPort.on('message', (msg) => {
if (msg === 'start') {
for (let j = 0; j < 10; j++) {
parentPort.postMessage({ count: j, data: 'x'.repeat(1000) });
}
parentPort.postMessage('done');
}
});
`;
const testPromise = (async () => {
for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) {
const workers: WebWorker[] = [];
async function runMessagePassingBatch(): Promise<void> {
const workers: WebWorker[] = [];
for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) {
const worker = new WebWorker(
`
const { parentPort } = require('worker_threads');
parentPort.on('message', (msg) => {
if (msg === 'start') {
for (let j = 0; j < 5; j++) {
parentPort.postMessage({ count: j, data: 'x'.repeat(100) });
}
parentPort.postMessage('done');
for (let i = 0; i < CONFIG.BATCH_SIZE; i++) {
const worker = new WebWorker(workerCode, { eval: true });
workers.push(worker);
await new Promise<void>(resolve => {
worker.on("message", msg => {
if (msg === "done") {
resolve();
}
});
`,
{ eval: true },
);
worker.postMessage("start");
});
}
workers.push(worker);
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
}
await new Promise<void>(resolve => {
let messageCount = 0;
worker.on("message", msg => {
if (msg === "done") {
resolve();
} else {
messageCount++;
}
});
worker.postMessage("start");
});
}
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} message passing warmup iterations...`);
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runMessagePassingBatch();
console.log(`Message passing warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Message passing baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} message passing test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runMessagePassingBatch();
if ((i + 1) % 5 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(
`Message passing iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`,
`Message passing test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`,
);
}
})();
}
await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "Message passing test");
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Message passing test");
const afterTest = takeMemorySnapshot();
logMemoryDiff(beforeTest, afterTest, "Message passing test");
const memoryIncrease = afterTest.rss - beforeTest.rss;
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
@@ -244,72 +249,45 @@ describe("Worker Memory Leak Tests", () => {
test(
"workers with timers should not leak memory",
async () => {
const beforeTest = takeMemorySnapshot();
const testPromise = (async () => {
for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) {
const workers: WebWorker[] = [];
for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) {
const worker = new WebWorker(
`
const { parentPort } = require('worker_threads');
const timers = [];
for (let i = 0; i < 3; i++) {
timers.push(setTimeout(() => {}, 10000));
timers.push(setInterval(() => {}, 1000));
}
parentPort.postMessage('ready');
`,
{ eval: true },
);
workers.push(worker);
await new Promise<void>(resolve => {
worker.once("message", () => resolve());
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
const currentMemory = takeMemorySnapshot();
console.log(`Timer iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
const workerCode = `
const { parentPort } = require('worker_threads');
const timers = [];
for (let i = 0; i < 5; i++) {
timers.push(setTimeout(() => {}, 10000));
timers.push(setInterval(() => {}, 1000));
}
})();
parentPort.postMessage('ready');
`;
await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "Timer test");
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} timer warmup iterations...`);
const afterTest = takeMemorySnapshot();
logMemoryDiff(beforeTest, afterTest, "Timer cleanup test");
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`Timer warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const memoryIncrease = afterTest.rss - beforeTest.rss;
const baselineMemory = takeMemorySnapshot();
console.log("Timer baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} timer test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 5 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`Timer test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Timer cleanup test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
test("memory leak detection summary", async () => {
await forceGCAndSettle();
const finalMemory = takeMemorySnapshot();
logMemoryDiff(initialMemory, finalMemory, "Overall test suite");
const totalIncrease = finalMemory.rss - initialMemory.rss;
console.log(`Total memory increase from start: ${totalIncrease}MB`);
if (totalIncrease > 500) {
console.error(`🚨 MAJOR MEMORY LEAK DETECTED: +${totalIncrease}MB`);
console.error("This indicates workers are not properly cleaning up resources");
} else if (totalIncrease > 100) {
console.warn(`⚠️ Moderate memory increase: +${totalIncrease}MB`);
} else {
console.log(`✅ Memory usage looks reasonable: +${totalIncrease}MB`);
}
expect(totalIncrease).toBeLessThan(3000);
}, 15000);
});