diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig
index 5ac9975619..05a1889485 100644
--- a/src/bun.js/ipc.zig
+++ b/src/bun.js/ipc.zig
@@ -420,6 +420,8 @@ pub const SendQueue = struct {
     internal_msg_queue: node_cluster_binding.InternalMsgHolder = .{},
     incoming: bun.ByteList = .{}, // Maybe we should use StreamBuffer here as well
     incoming_fd: ?bun.FileDescriptor = null,
+    /// Messages processed since the last `reclaimMemory()` run; reset to 0 when it fires.
+    messages_processed: u32 = 0,
 
     socket: SocketUnion,
     owner: SendQueueOwner,
@@ -791,6 +793,30 @@ pub const SendQueue = struct {
         };
     }
 
+    /// Release excess capacity held by the incoming buffer and the send queue.
+    /// Called periodically after processing messages.
+    ///
+    /// std.ArrayList.shrinkAndFree takes the new *length* (asserts `new_len <= items.len`
+    /// and truncates to it), not a target capacity, so we shrink to the current length.
+    /// NOTE(review): bun.ByteList.shrinkAndFree is assumed to share that contract — confirm.
+    fn reclaimMemory(this: *SendQueue) void {
+        // Only reclaim allocations above 2MB to avoid frequent reallocations.
+        const shrink_threshold = 2 * 1024 * 1024;
+
+        // Incoming buffer: shrink to fit when less than a quarter is in use.
+        const incoming = &this.incoming;
+        if (incoming.cap > shrink_threshold and incoming.len < incoming.cap / 4) {
+            incoming.shrinkAndFree(bun.default_allocator, incoming.len);
+        }
+
+        // Send queue: same policy, with the 2MB threshold expressed in SendHandle entries.
+        const queue_shrink_threshold = shrink_threshold / @sizeOf(SendHandle);
+        const queue = &this.queue;
+        if (queue.capacity > queue_shrink_threshold and queue.items.len < queue.capacity / 4) {
+            queue.shrinkAndFree(queue.items.len);
+        }
+    }
+
     /// starts a write request. on posix, this always calls _onWriteComplete immediately. on windows, it may
     /// call _onWriteComplete later.
     fn _write(this: *SendQueue, data: []const u8, fd: ?bun.FileDescriptor) void {
@@ -1053,6 +1079,7 @@ fn handleIPCMessage(send_queue: *SendQueue, message: DecodedIPCMessage, globalTh
                     _ = globalThis.takeException(e);
                     break :handle_message;
                 };
+                defer cmd_str.deref();
                 if (cmd_str.eqlComptime("NODE_HANDLE")) {
                     internal_command = .{ .handle = msg_data };
                 } else if (cmd_str.eqlComptime("NODE_HANDLE_ACK")) {
@@ -1135,11 +1162,13 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void {
     // Decode the message with just the temporary buffer, and if that
     // fails (not enough bytes) then we allocate to .ipc_buffer
     if (send_queue.incoming.len == 0) {
+        var messages_in_batch: u32 = 0;
         while (true) {
             const result = decodeIPCMessage(send_queue.mode, data, globalThis) catch |e| switch (e) {
                 error.NotEnoughBytes => {
                     _ = bun.handleOom(send_queue.incoming.write(bun.default_allocator, data));
                     log("hit NotEnoughBytes", .{});
+                    send_queue.messages_processed +%= messages_in_batch;
                     return;
                 },
                 error.InvalidFormat, error.JSError, error.JSTerminated => {
@@ -1154,10 +1183,16 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void {
             };
 
             handleIPCMessage(send_queue, result.message, globalThis);
+            messages_in_batch += 1;
 
             if (result.bytes_consumed < data.len) {
                 data = data[result.bytes_consumed..];
             } else {
+                send_queue.messages_processed +%= messages_in_batch;
+                if (send_queue.messages_processed >= 256) { // reclaim ~every 256 messages, once drained
+                    send_queue.messages_processed = 0;
+                    send_queue.reclaimMemory();
+                }
                 return;
             }
         }
@@ -1166,6 +1201,7 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void {
     _ = bun.handleOom(send_queue.incoming.write(bun.default_allocator, data));
 
     var slice = send_queue.incoming.slice();
+    var messages_in_batch: u32 = 0;
     while (true) {
         const result = decodeIPCMessage(send_queue.mode, slice, globalThis) catch |e| switch (e) {
             error.NotEnoughBytes => {
@@ -1173,6 +1209,7 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void {
                 bun.copy(u8, send_queue.incoming.ptr[0..slice.len], slice);
                 send_queue.incoming.len = @truncate(slice.len);
                 log("hit NotEnoughBytes2", .{});
+                send_queue.messages_processed +%= messages_in_batch;
                 return;
             },
             error.InvalidFormat, error.JSError, error.JSTerminated => {
@@ -1187,12 +1224,18 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void {
         };
 
         handleIPCMessage(send_queue, result.message, globalThis);
+        messages_in_batch += 1;
 
         if (result.bytes_consumed < slice.len) {
             slice = slice[result.bytes_consumed..];
         } else {
             // clear the buffer
             send_queue.incoming.len = 0;
+            send_queue.messages_processed +%= messages_in_batch;
+            if (send_queue.messages_processed >= 256) { // reclaim ~every 256 messages, once drained
+                send_queue.messages_processed = 0;
+                send_queue.reclaimMemory();
+            }
             return;
         }
     }
@@ -1327,6 +1370,7 @@ pub const IPCHandlers = struct {
             bun.assert(send_queue.incoming.len <= send_queue.incoming.cap);
             bun.assert(bun.isSliceInBuffer(buffer, send_queue.incoming.allocatedSlice()));
 
+            var messages_in_batch: u32 = 0;
             while (true) {
                 const result = decodeIPCMessage(send_queue.mode, slice, globalThis) catch |e| switch (e) {
                     error.NotEnoughBytes => {
@@ -1334,6 +1378,7 @@ pub const IPCHandlers = struct {
                         bun.copy(u8, send_queue.incoming.ptr[0..slice.len], slice);
                         send_queue.incoming.len = @truncate(slice.len);
                         log("hit NotEnoughBytes3", .{});
+                        send_queue.messages_processed +%= messages_in_batch;
                         return;
                     },
                     error.InvalidFormat, error.JSError, error.JSTerminated => {
@@ -1348,12 +1393,18 @@ pub const IPCHandlers = struct {
                 };
 
                 handleIPCMessage(send_queue, result.message, globalThis);
+                messages_in_batch += 1;
 
                 if (result.bytes_consumed < slice.len) {
                     slice = slice[result.bytes_consumed..];
                 } else {
                     // clear the buffer
                     send_queue.incoming.len = 0;
+                    send_queue.messages_processed +%= messages_in_batch;
+                    if (send_queue.messages_processed >= 256) { // reclaim ~every 256 messages, once drained
+                        send_queue.messages_processed = 0;
+                        send_queue.reclaimMemory();
+                    }
                     return;
                 }
             }
diff --git 
a/test/js/bun/spawn/spawn-ipc-memory.test.ts b/test/js/bun/spawn/spawn-ipc-memory.test.ts
new file mode 100644
index 0000000000..8ef6b9344a
--- /dev/null
+++ b/test/js/bun/spawn/spawn-ipc-memory.test.ts
@@ -0,0 +1,299 @@
+import { describe, expect, test } from "bun:test";
+import { bunEnv, bunExe, tempDir } from "harness";
+
+describe("IPC memory leak", () => {
+  test("sustained IPC messaging should reach steady state memory", async () => {
+    // This test runs multiple rounds of IPC messaging and checks that
+    // memory stabilizes rather than growing continuously.
+    // We use heapUsed instead of rss for more accurate JS memory tracking.
+    // NOTE(review): heapUsed reflects the JS heap, so growth in natively allocated
+    // IPC buffers is presumably not visible here; consider also sampling rss.
+    using dir = tempDir("ipc-sustained", {
+      "child.js": `
+        process.on("message", (msg) => {
+          if (msg.type === "ping") {
+            process.send({ type: "pong", round: msg.round });
+          } else if (msg.type === "done") {
+            process.exit(0);
+          }
+        });
+      `,
+      "parent.js": `
+        const messagesPerRound = 500;
+        const rounds = 6;
+        let currentRound = 0;
+        let messagesInRound = 0;
+        const memoryByRound = [];
+
+        const proc = Bun.spawn([process.execPath, "child.js"], {
+          cwd: import.meta.dir,
+          env: { ...process.env },
+          ipc: (message, subprocess) => {
+            if (message.type === "pong") {
+              messagesInRound++;
+
+              if (messagesInRound >= messagesPerRound) {
+                // Round complete, record memory after GC
+                Bun.gc(true);
+                memoryByRound.push(process.memoryUsage().heapUsed);
+
+                currentRound++;
+                messagesInRound = 0;
+
+                if (currentRound >= rounds) {
+                  subprocess.send({ type: "done" });
+
+                  // Analyze memory trend - skip first round as warmup
+                  const stableRounds = memoryByRound.slice(1);
+                  const firstStable = stableRounds[0];
+                  const lastStable = stableRounds[stableRounds.length - 1];
+                  const growthOverRounds = lastStable - firstStable;
+                  const growthPerRound = growthOverRounds / (stableRounds.length - 1);
+
+                  // Calculate average to check for stability
+                  const avg = stableRounds.reduce((a, b) => a + b, 0) / stableRounds.length;
+                  const maxDev = Math.max(...stableRounds.map(m => Math.abs(m - avg)));
+
+                  console.log(JSON.stringify({
+                    memoryByRound,
+                    stableRounds,
+                    growthPerRound,
+                    totalGrowth: growthOverRounds,
+                    avgMemory: avg,
+                    maxDeviation: maxDev,
+                    // Relative deviation should be small for stable memory
+                    relativeDeviation: maxDev / avg
+                  }));
+                  process.exit(0);
+                } else {
+                  // Start next round
+                  for (let i = 0; i < messagesPerRound; i++) {
+                    subprocess.send({ type: "ping", round: currentRound });
+                  }
+                }
+              }
+            }
+          },
+          stdio: ["inherit", "inherit", "inherit"],
+          serialization: "json",
+        });
+
+        // Start first round
+        for (let i = 0; i < messagesPerRound; i++) {
+          proc.send({ type: "ping", round: currentRound });
+        }
+      `,
+    });
+
+    await using proc = Bun.spawn({
+      cmd: [bunExe(), "parent.js"],
+      env: bunEnv,
+      cwd: String(dir),
+      stdout: "pipe",
+      stderr: "pipe",
+    });
+
+    const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+    if (stderr) {
+      console.error("stderr:", stderr);
+    }
+
+    expect(exitCode).toBe(0);
+
+    // The parent prints exactly one JSON report. Require it, otherwise an empty
+    // stdout would skip every assertion below and the test would pass vacuously.
+    const report = stdout.trim();
+    expect(report).not.toBe("");
+    const result = JSON.parse(report);
+    console.log("Sustained IPC memory:", result);
+
+    // Memory should stabilize after warmup - growth per round should be minimal
+    // Allow up to 100KB per round which accounts for normal variation
+    // A memory leak would show continuous growth much larger than this
+    expect(result.growthPerRound).toBeLessThan(100 * 1024);
+
+    // Relative deviation from average should be small (less than 50%)
+    // indicating memory has reached a steady state
+    expect(result.relativeDeviation).toBeLessThan(0.5);
+  });
+
+  test("large message batches should not accumulate memory in incoming buffer", async () => {
+    // This test sends large batches of messages to stress the incoming buffer handling
+    using dir = tempDir("ipc-batch", {
+      "child.js": `
+        // Send messages in batches
+        const batchSize = 100;
+        const batches = 10;
+
+        for (let b = 0; b < batches; b++) {
+          for (let i = 0; i < batchSize; i++) {
+            process.send({ batch: b, index: i, padding: "x".repeat(50) });
+          }
+        }
+        process.send({ done: true, totalSent: batchSize * batches });
+      `,
+      "parent.js": `
+        let messageCount = 0;
+        const memorySnapshots = [];
+
+        const proc = Bun.spawn([process.execPath, "child.js"], {
+          cwd: import.meta.dir,
+          env: { ...process.env },
+          ipc: (message, subprocess) => {
+            messageCount++;
+
+            // Take memory snapshots at intervals
+            if (messageCount % 250 === 0) {
+              Bun.gc(true);
+              memorySnapshots.push(process.memoryUsage().heapUsed);
+            }
+
+            if (message.done) {
+              subprocess.kill();
+
+              Bun.gc(true);
+              memorySnapshots.push(process.memoryUsage().heapUsed);
+
+              // Check if memory grew linearly (indicating leak) or stayed stable
+              const firstSnapshot = memorySnapshots[0];
+              const lastSnapshot = memorySnapshots[memorySnapshots.length - 1];
+
+              console.log(JSON.stringify({
+                messageCount,
+                expectedMessages: message.totalSent + 1,
+                memorySnapshots,
+                firstSnapshot,
+                lastSnapshot,
+                growth: lastSnapshot - firstSnapshot
+              }));
+              process.exit(0);
+            }
+          },
+          stdio: ["inherit", "inherit", "inherit"],
+          serialization: "json",
+        });
+      `,
+    });
+
+    await using proc = Bun.spawn({
+      cmd: [bunExe(), "parent.js"],
+      env: bunEnv,
+      cwd: String(dir),
+      stdout: "pipe",
+      stderr: "pipe",
+    });
+
+    const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+    if (stderr) {
+      console.error("stderr:", stderr);
+    }
+
+    expect(exitCode).toBe(0);
+
+    // The parent prints exactly one JSON report. Require it, otherwise an empty
+    // stdout would skip every assertion below and the test would pass vacuously.
+    const report = stdout.trim();
+    expect(report).not.toBe("");
+    const result = JSON.parse(report);
+    console.log("Batch IPC memory:", result);
+
+    // All messages should have been received
+    expect(result.messageCount).toBe(result.expectedMessages);
+
+    // Memory growth should be bounded - not growing linearly with message count
+    // Allow up to 500KB total growth (not per message!)
+    expect(result.growth).toBeLessThan(500 * 1024);
+  });
+
+  test("IPC with callbacks should not leak callback memory", async () => {
+    // Tests that send() callbacks are properly cleaned up
+    using dir = tempDir("ipc-callback", {
+      "child.js": `
+        // Echo back messages
+        process.on("message", (msg) => {
+          if (msg.type === "done") {
+            process.exit(0);
+          }
+          process.send({ echo: msg.index });
+        });
+      `,
+      "parent.js": `
+        const messageCount = 1000;
+        let received = 0;
+        let callbacksCalled = 0;
+        const memorySnapshots = [];
+
+        const proc = Bun.spawn([process.execPath, "child.js"], {
+          cwd: import.meta.dir,
+          env: { ...process.env },
+          ipc: (message, subprocess) => {
+            received++;
+
+            if (received % 250 === 0) {
+              Bun.gc(true);
+              memorySnapshots.push(process.memoryUsage().heapUsed);
+            }
+
+            if (received >= messageCount) {
+              subprocess.send({ type: "done" });
+
+              Bun.gc(true);
+              memorySnapshots.push(process.memoryUsage().heapUsed);
+
+              console.log(JSON.stringify({
+                received,
+                callbacksCalled,
+                memorySnapshots,
+                growth: memorySnapshots[memorySnapshots.length - 1] - memorySnapshots[0]
+              }));
+              process.exit(0);
+            }
+          },
+          stdio: ["inherit", "inherit", "inherit"],
+          serialization: "json",
+        });
+
+        // Send messages with callbacks
+        for (let i = 0; i < messageCount; i++) {
+          proc.send({ index: i }, () => {
+            callbacksCalled++;
+          });
+        }
+      `,
+    });
+
+    await using proc = Bun.spawn({
+      cmd: [bunExe(), "parent.js"],
+      env: bunEnv,
+      cwd: String(dir),
+      stdout: "pipe",
+      stderr: "pipe",
+    });
+
+    const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
+
+    if (stderr) {
+      console.error("stderr:", stderr);
+    }
+
+    expect(exitCode).toBe(0);
+
+    // The parent prints exactly one JSON report. Require it, otherwise an empty
+    // stdout would skip every assertion below and the test would pass vacuously.
+    const report = stdout.trim();
+    expect(report).not.toBe("");
+    const result = JSON.parse(report);
+    console.log("Callback IPC memory:", result);
+
+    // All messages received
+    expect(result.received).toBe(1000);
+
+    // Callbacks should have been called
+    expect(result.callbacksCalled).toBe(1000);
+
+    // Memory growth should be bounded
+    expect(result.growth).toBeLessThan(500 * 1024);
+  });
+});