diff --git a/test/js/web/workers/worker-memory-leak.test.ts b/test/js/web/workers/worker-memory-leak.test.ts index 57e66d89d0..29d9ca4357 100644 --- a/test/js/web/workers/worker-memory-leak.test.ts +++ b/test/js/web/workers/worker-memory-leak.test.ts @@ -1,13 +1,13 @@ -import { beforeAll, describe, expect, test } from "bun:test"; +import { describe, expect, test } from "bun:test"; import { Worker as WebWorker } from "worker_threads"; const CONFIG = { - SMALL_BATCH_SIZE: 5, - LARGE_BATCH_SIZE: 50, - STRESS_ITERATIONS: 3, - MEMORY_THRESHOLD_MB: 200, + WARMUP_ITERATIONS: 5, + TEST_ITERATIONS: 20, + BATCH_SIZE: 10, + MEMORY_THRESHOLD_MB: 20, GC_SETTLE_TIME: 200, - TEST_TIMEOUT_MS: 10000, + TEST_TIMEOUT_MS: 30000, }; interface MemorySnapshot { @@ -42,7 +42,7 @@ function logMemoryDiff(before: MemorySnapshot, after: MemorySnapshot, label: str heap: `${before.heapUsed}MB -> ${after.heapUsed}MB (${heapDiff >= 0 ? "+" : ""}${heapDiff}MB)`, }); - if (rssDiff > 100) { + if (rssDiff > 50) { console.warn(`⚠️ Large memory increase detected: +${rssDiff}MB RSS`); } } @@ -54,54 +54,68 @@ async function withTimeout(promise: Promise, ms: number, description: stri return Promise.race([promise, timeout]); } +async function runWorkerBatch(workerCode: string, batchSize: number = CONFIG.BATCH_SIZE): Promise { + const workers: WebWorker[] = []; + + for (let i = 0; i < batchSize; i++) { + const worker = new WebWorker(workerCode, { eval: true }); + workers.push(worker); + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + worker.removeAllListeners(); + reject(new Error(`Worker ${i} failed to respond within timeout`)); + }, 5000); + + worker.once("message", msg => { + clearTimeout(timeout); + if (msg.error) { + reject(new Error(msg.error)); + } else { + resolve(); + } + }); + }); + } + + await Promise.all(workers.map(worker => worker.terminate())); + await forceGCAndSettle(); +} + describe("Worker Memory Leak Tests", () => { - let initialMemory: MemorySnapshot; - - beforeAll(async () => { - await forceGCAndSettle(); - initialMemory = takeMemorySnapshot(); - console.log("Initial memory:", initialMemory); - }); - test( "workers should not leak memory with basic create/terminate cycles", async () => { - const beforeTest = takeMemorySnapshot(); + const workerCode = ` + const { parentPort } = require('worker_threads'); + parentPort.postMessage('ready'); + `; - const testPromise = (async () => { - for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) { - const workers: WebWorker[] = []; + console.log(`Running ${CONFIG.WARMUP_ITERATIONS} warmup iterations...`); - for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) { - const worker = new WebWorker( - ` - const { parentPort } = require('worker_threads'); - parentPort.postMessage('ready'); - `, - { eval: true }, - ); + // warmup + for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) { + await runWorkerBatch(workerCode); + console.log(`Warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`); + } - workers.push(worker); + const baselineMemory = takeMemorySnapshot(); + console.log("Baseline memory after warmup:", baselineMemory); - await new Promise(resolve => { - worker.once("message", () => resolve()); - }); - } - - await Promise.all(workers.map(worker => worker.terminate())); - await forceGCAndSettle(); + console.log(`Running ${CONFIG.TEST_ITERATIONS} test iterations...`); + for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) { + await runWorkerBatch(workerCode); + if ((i + 1) % 5 === 0) { const currentMemory = takeMemorySnapshot(); - console.log(`Iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`); + console.log(`Test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`); } - })(); + } - await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "Basic worker test"); + const finalMemory = takeMemorySnapshot(); + logMemoryDiff(baselineMemory, finalMemory, "Basic create/terminate test"); - const afterTest = takeMemorySnapshot(); - logMemoryDiff(beforeTest, afterTest, "Basic create/terminate test"); - - const memoryIncrease = afterTest.rss - beforeTest.rss; + const memoryIncrease = finalMemory.rss - baselineMemory.rss; expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB); }, CONFIG.TEST_TIMEOUT_MS, @@ -110,132 +124,123 @@ describe("Worker Memory Leak Tests", () => { test( "workers with HTTP activity should not leak memory", async () => { - const beforeTest = takeMemorySnapshot(); - - const server = Bun.serve({ + using server = Bun.serve({ port: 0, fetch() { return new Response("OK"); }, }); - try { - const testPromise = (async () => { - for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) { - const workers: WebWorker[] = []; - - for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) { - const worker = new WebWorker( - ` - const { parentPort } = require('worker_threads'); - - async function doWork() { - try { - const response = await fetch('http://localhost:${server.port}'); - await response.text(); - parentPort.postMessage('done'); - } catch (err) { - parentPort.postMessage({ error: err.message }); - } - } - - doWork(); - `, - { eval: true }, - ); - - workers.push(worker); - - await new Promise((resolve, reject) => { - worker.once("message", msg => { - if (msg.error) { - reject(new Error(msg.error)); - } else { - resolve(); - } - }); - }); - } - - await Promise.all(workers.map(worker => worker.terminate())); - await forceGCAndSettle(); - - const currentMemory = takeMemorySnapshot(); - console.log(`HTTP iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`); + const workerCode = ` + const { parentPort } = require('worker_threads'); + + async function doWork() { + try { + const response = await fetch('http://localhost:${server.port}'); + await response.text(); + parentPort.postMessage('done'); + } catch (err) { + parentPort.postMessage({ error: err.message }); } - })(); + } + + doWork(); + `; - await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "HTTP worker test"); - } finally { - server.stop(true); + console.log(`Running ${CONFIG.WARMUP_ITERATIONS} HTTP warmup iterations...`); + + // warmup + for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) { + await runWorkerBatch(workerCode); + console.log(`HTTP warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`); } - const afterTest = takeMemorySnapshot(); - logMemoryDiff(beforeTest, afterTest, "HTTP activity test"); + const baselineMemory = takeMemorySnapshot(); + console.log("HTTP baseline memory after warmup:", baselineMemory); - const memoryIncrease = afterTest.rss - beforeTest.rss; + console.log(`Running ${CONFIG.TEST_ITERATIONS} HTTP test iterations...`); + for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) { + await runWorkerBatch(workerCode); + + if ((i + 1) % 5 === 0) { + const currentMemory = takeMemorySnapshot(); + console.log(`HTTP test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`); + } + } + + const finalMemory = takeMemorySnapshot(); + logMemoryDiff(baselineMemory, finalMemory, "HTTP activity test"); + + const memoryIncrease = finalMemory.rss - baselineMemory.rss; expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB); }, CONFIG.TEST_TIMEOUT_MS, ); test( - "workers with simple message passing should not leak memory", + "workers with message passing should not leak memory", async () => { - const beforeTest = takeMemorySnapshot(); + const workerCode = ` + const { parentPort } = require('worker_threads'); + + parentPort.on('message', (msg) => { + if (msg === 'start') { + for (let j = 0; j < 10; j++) { + parentPort.postMessage({ count: j, data: 'x'.repeat(1000) }); + } + parentPort.postMessage('done'); + } + }); + `; - const testPromise = (async () => { - for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) { - const workers: WebWorker[] = []; + async function runMessagePassingBatch(): Promise { + const workers: WebWorker[] = []; - for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) { - const worker = new WebWorker( - ` - const { parentPort } = require('worker_threads'); - - parentPort.on('message', (msg) => { - if (msg === 'start') { - for (let j = 0; j < 5; j++) { - parentPort.postMessage({ count: j, data: 'x'.repeat(100) }); - } - parentPort.postMessage('done'); + for (let i = 0; i < CONFIG.BATCH_SIZE; i++) { + const worker = new WebWorker(workerCode, { eval: true }); + workers.push(worker); + + await new Promise(resolve => { + worker.on("message", msg => { + if (msg === "done") { + resolve(); } }); - `, - { eval: true }, - ); + worker.postMessage("start"); + }); + } - workers.push(worker); + await Promise.all(workers.map(worker => worker.terminate())); + await forceGCAndSettle(); + } - await new Promise(resolve => { - let messageCount = 0; - worker.on("message", msg => { - if (msg === "done") { - resolve(); - } else { - messageCount++; - } - }); - worker.postMessage("start"); - }); - } + console.log(`Running ${CONFIG.WARMUP_ITERATIONS} message passing warmup iterations...`); - await Promise.all(workers.map(worker => worker.terminate())); - await forceGCAndSettle(); + // warmup + for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) { + await runMessagePassingBatch(); + console.log(`Message passing warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`); + } + const baselineMemory = takeMemorySnapshot(); + console.log("Message passing baseline memory after warmup:", baselineMemory); + + console.log(`Running ${CONFIG.TEST_ITERATIONS} message passing test iterations...`); + for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) { + await runMessagePassingBatch(); + + if ((i + 1) % 5 === 0) { const currentMemory = takeMemorySnapshot(); console.log( - `Message passing iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`, + `Message passing test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`, ); } - })(); + } - await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "Message passing test"); + const finalMemory = takeMemorySnapshot(); + logMemoryDiff(baselineMemory, finalMemory, "Message passing test"); - const afterTest = takeMemorySnapshot(); - logMemoryDiff(beforeTest, afterTest, "Message passing test"); - - const memoryIncrease = afterTest.rss - beforeTest.rss; + const memoryIncrease = finalMemory.rss - baselineMemory.rss; expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB); }, CONFIG.TEST_TIMEOUT_MS, @@ -244,72 +249,45 @@ describe("Worker Memory Leak Tests", () => { test( "workers with timers should not leak memory", async () => { - const beforeTest = takeMemorySnapshot(); - - const testPromise = (async () => { - for (let iteration = 0; iteration < CONFIG.STRESS_ITERATIONS; iteration++) { - const workers: WebWorker[] = []; - - for (let i = 0; i < CONFIG.SMALL_BATCH_SIZE; i++) { - const worker = new WebWorker( - ` - const { parentPort } = require('worker_threads'); - - const timers = []; - for (let i = 0; i < 3; i++) { - timers.push(setTimeout(() => {}, 10000)); - timers.push(setInterval(() => {}, 1000)); - } - - parentPort.postMessage('ready'); - `, - { eval: true }, - ); - - workers.push(worker); - - await new Promise(resolve => { - worker.once("message", () => resolve()); - }); - } - - await Promise.all(workers.map(worker => worker.terminate())); - await forceGCAndSettle(); - - const currentMemory = takeMemorySnapshot(); - console.log(`Timer iteration ${iteration + 1}/${CONFIG.STRESS_ITERATIONS} - RSS: ${currentMemory.rss}MB`); + const workerCode = ` + const { parentPort } = require('worker_threads'); + + const timers = []; + for (let i = 0; i < 5; i++) { + timers.push(setTimeout(() => {}, 10000)); + timers.push(setInterval(() => {}, 1000)); } - })(); + + parentPort.postMessage('ready'); + `; - await withTimeout(testPromise, CONFIG.TEST_TIMEOUT_MS, "Timer test"); + console.log(`Running ${CONFIG.WARMUP_ITERATIONS} timer warmup iterations...`); - const afterTest = takeMemorySnapshot(); - logMemoryDiff(beforeTest, afterTest, "Timer cleanup test"); + // warmup + for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) { + await runWorkerBatch(workerCode); + console.log(`Timer warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`); + } - const memoryIncrease = afterTest.rss - beforeTest.rss; + const baselineMemory = takeMemorySnapshot(); + console.log("Timer baseline memory after warmup:", baselineMemory); + + console.log(`Running ${CONFIG.TEST_ITERATIONS} timer test iterations...`); + for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) { + await runWorkerBatch(workerCode); + + if ((i + 1) % 5 === 0) { + const currentMemory = takeMemorySnapshot(); + console.log(`Timer test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`); + } + } + + const finalMemory = takeMemorySnapshot(); + logMemoryDiff(baselineMemory, finalMemory, "Timer cleanup test"); + + const memoryIncrease = finalMemory.rss - baselineMemory.rss; expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB); }, CONFIG.TEST_TIMEOUT_MS, ); - - test("memory leak detection summary", async () => { - await forceGCAndSettle(); - - const finalMemory = takeMemorySnapshot(); - logMemoryDiff(initialMemory, finalMemory, "Overall test suite"); - - const totalIncrease = finalMemory.rss - initialMemory.rss; - console.log(`Total memory increase from start: ${totalIncrease}MB`); - - if (totalIncrease > 500) { - console.error(`🚨 MAJOR MEMORY LEAK DETECTED: +${totalIncrease}MB`); - console.error("This indicates workers are not properly cleaning up resources"); - } else if (totalIncrease > 100) { - console.warn(`⚠️ Moderate memory increase: +${totalIncrease}MB`); - } else { - console.log(`✅ Memory usage looks reasonable: +${totalIncrease}MB`); - } - - expect(totalIncrease).toBeLessThan(3000); - }, 15000); });