mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
fix(spawn): memory leak in "pipe"d stdout/stderr (#18316)
Co-authored-by: DonIsaac <22823424+DonIsaac@users.noreply.github.com> Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
@@ -311,7 +311,7 @@ pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void {
|
||||
switch (out.*) {
|
||||
.pipe => |pipe| {
|
||||
if (pipe.state == .done) {
|
||||
out.* = .{ .buffer = pipe.state.done };
|
||||
out.* = .{ .buffer = CowString.initOwned(pipe.state.done, bun.default_allocator) };
|
||||
pipe.state = .{ .done = &.{} };
|
||||
} else {
|
||||
out.* = .{ .ignore = {} };
|
||||
@@ -376,12 +376,18 @@ const Readable = union(enum) {
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
closed: void,
|
||||
buffer: []u8,
|
||||
/// Eventually we will implement Readables created from blobs and array buffers.
|
||||
/// When we do that, `buffer` will be borrowed from those objects.
|
||||
///
|
||||
/// When a buffered `pipe` finishes reading from its file descriptor,
|
||||
/// the owning `Readable` will be convered into this variant and the pipe's
|
||||
/// buffer will be taken as an owned `CowString`.
|
||||
buffer: CowString,
|
||||
|
||||
pub fn memoryCost(this: *const Readable) usize {
|
||||
return switch (this.*) {
|
||||
.pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(),
|
||||
.buffer => this.buffer.len,
|
||||
.buffer => this.buffer.length(),
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
@@ -484,6 +490,9 @@ const Readable = union(enum) {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.buffer => |buf| {
|
||||
buf.deinit(bun.default_allocator);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
@@ -502,14 +511,17 @@ const Readable = union(enum) {
|
||||
this.* = .{ .closed = {} };
|
||||
return pipe.toJS(globalThis);
|
||||
},
|
||||
.buffer => |buffer| {
|
||||
.buffer => |*buffer| {
|
||||
defer this.* = .{ .closed = {} };
|
||||
|
||||
if (buffer.len == 0) {
|
||||
if (buffer.length() == 0) {
|
||||
return JSC.WebCore.ReadableStream.empty(globalThis);
|
||||
}
|
||||
|
||||
const blob = JSC.WebCore.Blob.init(buffer, bun.default_allocator, globalThis);
|
||||
const own = buffer.takeSlice(bun.default_allocator) catch {
|
||||
globalThis.throwOutOfMemory() catch return .zero;
|
||||
};
|
||||
const blob = JSC.WebCore.Blob.init(own, bun.default_allocator, globalThis);
|
||||
return JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, 0);
|
||||
},
|
||||
else => {
|
||||
@@ -535,10 +547,13 @@ const Readable = union(enum) {
|
||||
this.* = .{ .closed = {} };
|
||||
return pipe.toBuffer(globalThis);
|
||||
},
|
||||
.buffer => |buf| {
|
||||
this.* = .{ .closed = {} };
|
||||
.buffer => |*buf| {
|
||||
defer this.* = .{ .closed = {} };
|
||||
const own = buf.takeSlice(bun.default_allocator) catch {
|
||||
return globalThis.throwOutOfMemory();
|
||||
};
|
||||
|
||||
return JSC.MarkedArrayBuffer.fromBytes(buf, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
|
||||
return JSC.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
|
||||
},
|
||||
else => {
|
||||
return JSValue.jsUndefined();
|
||||
@@ -568,6 +583,9 @@ pub fn getStdout(
|
||||
globalThis: *JSGlobalObject,
|
||||
) JSValue {
|
||||
this.observable_getters.insert(.stdout);
|
||||
// NOTE: ownership of internal buffers is transferred to the JSValue, which
|
||||
// gets cached on JSSubprocess (created via bindgen). This makes it
|
||||
// re-accessable to JS code but not via `this.stdout`, which is now `.closed`.
|
||||
return this.stdout.toJS(globalThis, this.hasExited());
|
||||
}
|
||||
|
||||
@@ -2596,6 +2614,7 @@ const Global = bun.Global;
|
||||
const strings = bun.strings;
|
||||
const string = bun.string;
|
||||
const Output = bun.Output;
|
||||
const CowString = bun.ptr.CowString;
|
||||
const MutableString = bun.MutableString;
|
||||
const std = @import("std");
|
||||
const Allocator = std.mem.Allocator;
|
||||
|
||||
@@ -2066,6 +2066,7 @@ pub const Blob = struct {
|
||||
return store;
|
||||
}
|
||||
|
||||
/// Takes ownership of `bytes`, which must have been allocated with `allocator`.
|
||||
pub fn init(bytes: []u8, allocator: std.mem.Allocator) *Store {
|
||||
const store = Blob.Store.new(.{
|
||||
.data = .{
|
||||
@@ -3655,6 +3656,8 @@ pub const Blob = struct {
|
||||
/// Used by standalone module graph and the File constructor
|
||||
stored_name: bun.PathString = bun.PathString.empty,
|
||||
|
||||
/// Takes ownership of `bytes`, which must have been allocated with
|
||||
/// `allocator`.
|
||||
pub fn init(bytes: []u8, allocator: std.mem.Allocator) ByteStore {
|
||||
return .{
|
||||
.ptr = bytes.ptr,
|
||||
@@ -5000,6 +5003,7 @@ pub const Blob = struct {
|
||||
};
|
||||
}
|
||||
|
||||
/// Takes ownership of `bytes`, which must have been allocated with `allocator`.
|
||||
pub fn init(bytes: []u8, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) Blob {
|
||||
return Blob{
|
||||
.size = @as(SizeType, @truncate(bytes.len)),
|
||||
|
||||
@@ -114,6 +114,10 @@ pub fn CowSliceZ(T: type, comptime sentinel: ?T) type {
|
||||
return str.ptr[0..str.flags.len];
|
||||
}
|
||||
|
||||
pub inline fn length(str: Self) usize {
|
||||
return str.flags.len;
|
||||
}
|
||||
|
||||
/// Mutably borrow this `Cow`'s slice.
|
||||
///
|
||||
/// Borrowed `Cow`s will be automatically converted to owned, incurring
|
||||
@@ -132,6 +136,20 @@ pub fn CowSliceZ(T: type, comptime sentinel: ?T) type {
|
||||
return str.ptr[0..str.flags.len];
|
||||
}
|
||||
|
||||
/// Take ownership over this string's allocation. `str` is left in a
|
||||
/// valid, empty state.
|
||||
///
|
||||
/// Caller owns the returned memory and must deinitialize it when done.
|
||||
/// `str` may be re-used. An allocation will be incurred if and only if
|
||||
/// `str` is not owned.
|
||||
pub fn takeSlice(str: *Self, allocator: Allocator) Allocator.Error!SliceMut {
|
||||
if (!str.isOwned()) {
|
||||
try str.intoOwned(allocator);
|
||||
}
|
||||
defer str.* = Self.empty;
|
||||
return str.ptr[0..str.flags.len];
|
||||
}
|
||||
|
||||
/// Returns a new string that borrows this string's data.
|
||||
///
|
||||
/// The borrowed string should be deinitialized so that debug assertions
|
||||
@@ -169,7 +187,7 @@ pub fn CowSliceZ(T: type, comptime sentinel: ?T) type {
|
||||
if (comptime cow_str_assertions) {
|
||||
if (str.debug) |debug| {
|
||||
debug.mutex.lock();
|
||||
defer debug.mutext.unlock();
|
||||
defer debug.mutex.unlock();
|
||||
bun.assert(debug.borrows > 0);
|
||||
debug.borrows -= 1;
|
||||
str.debug = null;
|
||||
|
||||
79
test/js/bun/spawn/spawn-pipe-leak.test.ts
Normal file
79
test/js/bun/spawn/spawn-pipe-leak.test.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
/**
|
||||
* Memory leak measurement test for {@link Bun.spawn}
|
||||
*
|
||||
* This test spawns processes that write about 32 KB of data to stdout
|
||||
* and then exits. We only await the `process.exited` promise without reading
|
||||
* any of the output data to test for potential memory leaks.
|
||||
*/
|
||||
import { bunExe } from "harness";
|
||||
|
||||
describe("Bun.spawn", () => {
|
||||
const DEBUG_LOGS = false; // turn this on to see debug logs
|
||||
const log = (...args: any[]) => DEBUG_LOGS && console.log(...args);
|
||||
|
||||
const MB = 1024 * 1024;
|
||||
|
||||
test("'pipe' stdout should not leak memory", async () => {
|
||||
/**
|
||||
* @param batchSize # of processes to spawn in parallel in each batch
|
||||
* @param totalBatches # of batches to run
|
||||
*/
|
||||
async function testSpawnMemoryLeak(batchSize: number, totalBatches: number) {
|
||||
// Create a command that will generate ~512 KB of output
|
||||
const cmd = [bunExe(), "-e", "process.stdout.write(Buffer.alloc(32 * 1024, 'X'))"];
|
||||
|
||||
log("Starting memory leak test...");
|
||||
log(`Initial memory usage: ${Math.round(process.memoryUsage.rss() / MB)} MB`);
|
||||
|
||||
for (let batch = 0; batch < totalBatches; batch++) {
|
||||
const batchPromises: Promise<void>[] = [];
|
||||
|
||||
for (let i = 0; i < batchSize; i++) {
|
||||
// Use an async IIFE that doesn't return anything
|
||||
// This should help the GC clean up resources
|
||||
batchPromises.push(
|
||||
(async (): Promise<void> => {
|
||||
const process = Bun.spawn({
|
||||
cmd,
|
||||
stdout: "pipe", // We pipe stdout but never read from it
|
||||
stderr: "ignore",
|
||||
stdin: "ignore",
|
||||
});
|
||||
|
||||
// Only await the exit, don't read any data
|
||||
await process.exited;
|
||||
})(),
|
||||
);
|
||||
}
|
||||
|
||||
// Wait for all processes in this batch to complete
|
||||
await Promise.all(batchPromises);
|
||||
|
||||
// Log progress after each batch
|
||||
log(`Batch ${batch + 1}/${totalBatches} completed (${(batch + 1) * batchSize} processes)`);
|
||||
log(`Current memory usage: ${Math.round(process.memoryUsage.rss() / MB)} MB`);
|
||||
}
|
||||
|
||||
// Force garbage collection after all batches have completed
|
||||
Bun.gc(true);
|
||||
}
|
||||
|
||||
// Warmup
|
||||
await testSpawnMemoryLeak(10, 2);
|
||||
const memBefore = process.memoryUsage();
|
||||
|
||||
// Run the test
|
||||
await testSpawnMemoryLeak(25, 5);
|
||||
const memAfter = process.memoryUsage();
|
||||
|
||||
log("Memory leak test completed");
|
||||
log(`Final memory usage: ${Math.round(process.memoryUsage.rss() / MB)} MB`);
|
||||
log(`Memory difference: ${Math.round((process.memoryUsage.rss() - memBefore.rss) / MB)} MB`);
|
||||
|
||||
// should not have grown more than 50%
|
||||
const delta = memAfter.rss - memBefore.rss;
|
||||
const pct = delta / memBefore.rss;
|
||||
console.log(`RSS delta: ${delta / MB}MB (${Math.round(100 * pct)}%)`);
|
||||
expect(pct).toBeLessThan(0.5);
|
||||
}, 10_000); // NOTE: this test doesn't actually take this long, but keeping the limit high will help avoid flakyness
|
||||
});
|
||||
Reference in New Issue
Block a user