diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index b651b4d52d..b548de0c99 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -486,7 +486,7 @@ const Readable = union(enum) {
                 defer pipe.detach();
                 this.* = .{ .closed = {} };
             },
-            .buffer => |buf| {
+            .buffer => |*buf| {
                 buf.deinit(bun.default_allocator);
             },
             else => {},
@@ -517,8 +517,7 @@ const Readable = union(enum) {
                 const own = buffer.takeSlice(bun.default_allocator) catch {
                     globalThis.throwOutOfMemory() catch return .zero;
                 };
-                const blob = JSC.WebCore.Blob.init(own, bun.default_allocator, globalThis);
-                return JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, 0);
+                return JSC.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0);
             },
             else => {
                 return JSValue.jsUndefined();
@@ -1087,6 +1086,12 @@ pub const PipeReader = struct {
         const out = this.reader._buffer;
         this.reader._buffer.items = &.{};
         this.reader._buffer.capacity = 0;
+
+        if (out.capacity > 0 and out.items.len == 0) {
+            out.deinit();
+            return &.{};
+        }
+
         return out.items;
     }
@@ -1109,9 +1114,8 @@ pub const PipeReader = struct {
                 return stream;
             },
             .done => |bytes| {
-                const blob = JSC.WebCore.Blob.init(bytes, bun.default_allocator, globalObject);
                 this.state = .{ .done = &.{} };
-                return JSC.WebCore.ReadableStream.fromBlob(globalObject, &blob, 0);
+                return JSC.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0);
             },
             .err => |err| {
                 _ = err; // autofix
diff --git a/src/bun.js/api/server/NodeHTTPResponse.zig b/src/bun.js/api/server/NodeHTTPResponse.zig
index 82596c3956..007e443d0b 100644
--- a/src/bun.js/api/server/NodeHTTPResponse.zig
+++ b/src/bun.js/api/server/NodeHTTPResponse.zig
@@ -252,7 +252,7 @@ pub fn shouldRequestBePending(this: *const NodeHTTPResponse) bool {
 }
 
 pub fn dumpRequestBody(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame, thisValue: JSC.JSValue) bun.JSError!JSC.JSValue {
-    if (this.buffered_request_body_data_during_pause.len > 0) {
+    if (this.buffered_request_body_data_during_pause.cap > 0) {
         this.buffered_request_body_data_during_pause.deinitWithAllocator(bun.default_allocator);
     }
     if (!this.flags.request_has_completed) {
diff --git a/src/bun.js/bindings/Uint8Array.cpp b/src/bun.js/bindings/Uint8Array.cpp
index 9acc6cca9d..9e37d9225f 100644
--- a/src/bun.js/bindings/Uint8Array.cpp
+++ b/src/bun.js/bindings/Uint8Array.cpp
@@ -26,18 +26,17 @@ extern "C" JSC::EncodedJSValue JSUint8Array__fromDefaultAllocator(JSC::JSGlobalO
 
 extern "C" JSC::EncodedJSValue JSArrayBuffer__fromDefaultAllocator(JSC::JSGlobalObject* lexicalGlobalObject, uint8_t* ptr, size_t length)
 {
-    JSC::JSArrayBuffer* arrayBuffer;
+    RefPtr<ArrayBuffer> buffer;
 
     if (length > 0) [[likely]] {
-        RefPtr buffer = ArrayBuffer::createFromBytes({ ptr, length }, createSharedTask<void(void*)>([](void* p) {
+        buffer = ArrayBuffer::createFromBytes({ ptr, length }, createSharedTask<void(void*)>([](void* p) {
             mi_free(p);
         }));
-
-        arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), WTFMove(buffer));
     } else {
-        arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), nullptr);
+        buffer = ArrayBuffer::create(0, 1);
     }
 
+    auto arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), WTFMove(buffer));
     return JSC::JSValue::encode(arrayBuffer);
 }
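The Zig hunks above and below keep hitting one bug shape: a buffer that was grown but never (or only partially) filled, so `items.len == 0` while `capacity > 0`. Handing such a list off via `items` alone orphans the allocation. Below is a self-contained sketch of the pattern and the guard, using plain `std.ArrayList` rather than Bun's internal reader types (an assumption; it also glosses over that the non-empty path relies on Bun's default allocator being able to free by base pointer):

```zig
const std = @import("std");

// Sketch of the takeChunk guard: copy the whole struct (keeping capacity),
// detach the source, then free empty-but-allocated buffers instead of
// returning a zero-length slice that can never be freed.
fn takeChunk(list: *std.ArrayList(u8)) []u8 {
    const out = list.*; // struct copy preserves ptr/len/capacity
    list.items = &.{};
    list.capacity = 0;

    if (out.capacity > 0 and out.items.len == 0) {
        var copy = out;
        copy.deinit(); // frees the orphaned capacity
        return &.{};
    }
    return out.items;
}

test "takeChunk does not orphan preallocated capacity" {
    var list = std.ArrayList(u8).init(std.testing.allocator);
    try list.ensureTotalCapacity(4096); // capacity > 0, items.len == 0
    const chunk = takeChunk(&list);
    try std.testing.expectEqual(@as(usize, 0), chunk.len);
    // std.testing.allocator fails the test if the 4096 bytes leaked.
}
```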
diff --git a/src/bun.js/bindings/ZigString.zig b/src/bun.js/bindings/ZigString.zig
index 8610862ecc..eb25070e01 100644
--- a/src/bun.js/bindings/ZigString.zig
+++ b/src/bun.js/bindings/ZigString.zig
@@ -270,6 +270,11 @@ pub const ZigString = extern struct {
             list.items.ptr[list.items.len] = 0;
         }
 
+        if (list.capacity > 0 and list.items.len == 0) {
+            list.deinit();
+            return &.{};
+        }
+
         return list.items;
     }
diff --git a/src/bun.js/webcore/Blob.zig b/src/bun.js/webcore/Blob.zig
index 77c4d4487c..bd364b445f 100644
--- a/src/bun.js/webcore/Blob.zig
+++ b/src/bun.js/webcore/Blob.zig
@@ -1040,7 +1040,7 @@ pub fn writeFileWithSourceDestination(
         return file_copier.promise.value();
     } else if (destination_type == .file and source_type == .s3) {
         const s3 = &source_store.data.s3;
-        if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlob(
+        if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlobCopyRef(
             ctx,
             source_blob,
             @truncate(s3.options.partSize),
@@ -1077,7 +1077,7 @@ pub fn writeFileWithSourceDestination(
     switch (source_store.data) {
         .bytes => |bytes| {
             if (bytes.len > S3.MultiPartUploadOptions.MAX_SINGLE_UPLOAD_SIZE) {
-                if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlob(
+                if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlobCopyRef(
                     ctx,
                     source_blob,
                     @truncate(s3.options.partSize),
@@ -1144,7 +1144,7 @@ pub fn writeFileWithSourceDestination(
         },
         .file, .s3 => {
             // stream
-            if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlob(
+            if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlobCopyRef(
                 ctx,
                 source_blob,
                 @truncate(s3.options.partSize),
@@ -1970,7 +1970,7 @@ pub fn getStream(
         recommended_chunk_size = @as(SizeType, @intCast(@max(0, @as(i52, @truncate(arguments[0].toInt64())))));
     }
 
-    const stream = JSC.WebCore.ReadableStream.fromBlob(
+    const stream = JSC.WebCore.ReadableStream.fromBlobCopyRef(
         globalThis,
         this,
         recommended_chunk_size,
@@ -4250,10 +4250,6 @@ pub const Any = union(enum) {
             //     return value;
             // },
             .InternalBlob => {
-                if (this.InternalBlob.bytes.items.len == 0) {
-                    return JSC.ArrayBuffer.create(global, "", TypedArrayView);
-                }
-
                 const bytes = this.InternalBlob.toOwnedSlice();
                 this.* = .{ .Blob = .{} };
@@ -4413,8 +4409,15 @@ pub const Internal = struct {
     pub fn toOwnedSlice(this: *@This()) []u8 {
         const bytes = this.bytes.items;
+        const capacity = this.bytes.capacity;
+        if (bytes.len == 0 and capacity > 0) {
+            this.bytes.clearAndFree();
+            return &.{};
+        }
+
         this.bytes.items = &.{};
         this.bytes.capacity = 0;
+
         return bytes;
     }
diff --git a/src/bun.js/webcore/Body.zig b/src/bun.js/webcore/Body.zig
index 770b360df8..809d1f17f6 100644
--- a/src/bun.js/webcore/Body.zig
+++ b/src/bun.js/webcore/Body.zig
@@ -460,7 +460,7 @@ pub const Value = union(Tag) {
             var blob = this.use();
             defer blob.detach();
             blob.resolveSize();
-            const value = JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, blob.size);
+            const value = JSC.WebCore.ReadableStream.fromBlobCopyRef(globalThis, &blob, blob.size);
 
             this.* = .{
                 .Locked = .{
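The FileReader.zig hunks that follow swap `bun.ByteList.init(list.items)` for `bun.ByteList.fromList(list)`. A sketch of why, using an assumed ptr/len/cap layout for ByteList (the real field names may differ, though the `.cap` check in NodeHTTPResponse.zig above suggests this shape): `init` only sees a slice, so it records `cap = len` and the ArrayList's extra capacity is forgotten; any later free or resize through the ByteList then under-reports the allocation, leaking the tail. `fromList` carries the true capacity across.

```zig
const std = @import("std");

// Hypothetical stand-in for bun.ByteList, for illustration only.
const ByteList = struct {
    ptr: [*]u8 = undefined,
    len: u32 = 0,
    cap: u32 = 0,

    pub fn init(bytes: []u8) ByteList {
        return .{
            .ptr = bytes.ptr,
            .len = @intCast(bytes.len),
            .cap = @intCast(bytes.len), // capacity beyond len is lost here
        };
    }

    pub fn fromList(list: std.ArrayListUnmanaged(u8)) ByteList {
        return .{
            .ptr = list.items.ptr,
            .len = @intCast(list.items.len),
            .cap = @intCast(list.capacity), // full allocation is carried over
        };
    }
};
```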
diff --git a/src/bun.js/webcore/FileReader.zig b/src/bun.js/webcore/FileReader.zig
index a50dd143c5..32549f9e92 100644
--- a/src/bun.js/webcore/FileReader.zig
+++ b/src/bun.js/webcore/FileReader.zig
@@ -255,7 +255,7 @@ pub fn onStart(this: *FileReader) streams.Start {
         if (this.buffered.items.len > 0) {
             const buffered = this.buffered;
             this.buffered = .{};
-            return .{ .owned_and_done = bun.ByteList.init(buffered.items) };
+            return .{ .owned_and_done = bun.ByteList.fromList(buffered) };
         }
     } else if (comptime Environment.isPosix) {
         if (!was_lazy and this.reader.flags.pollable) {
@@ -524,10 +524,10 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: JSC.JSValue) streams.Resul
                 this.buffered = .{};
                 log("onPull({d}) = {d}", .{ buffer.len, buffered.items.len });
 
                 if (this.reader.isDone()) {
-                    return .{ .owned_and_done = bun.ByteList.init(buffered.items) };
+                    return .{ .owned_and_done = bun.ByteList.fromList(buffered) };
                 }
-                return .{ .owned = bun.ByteList.init(buffered.items) };
+                return .{ .owned = bun.ByteList.fromList(buffered) };
             },
             else => {},
         }
@@ -549,7 +549,7 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: JSC.JSValue) streams.Resul
 
 pub fn drain(this: *FileReader) bun.ByteList {
     if (this.buffered.items.len > 0) {
-        const out = bun.ByteList.init(this.buffered.items);
+        const out = bun.ByteList.fromList(this.buffered);
         this.buffered = .{};
         if (comptime Environment.allow_assert) {
             bun.assert(this.reader.buffer().items.ptr != out.ptr);
@@ -626,6 +626,10 @@ pub fn onReaderDone(this: *FileReader) void {
 
 pub fn onReaderError(this: *FileReader, err: bun.sys.Error) void {
     this.consumeReaderBuffer();
+    if (this.buffered.capacity > 0 and this.buffered.items.len == 0) {
+        this.buffered.deinit(bun.default_allocator);
+        this.buffered = .{};
+    }
 
     this.pending.result = .{ .err = .{ .Error = err } };
     this.pending.run();
diff --git a/src/bun.js/webcore/ReadableStream.zig b/src/bun.js/webcore/ReadableStream.zig
index 1161224fb0..b9f4b0ba79 100644
--- a/src/bun.js/webcore/ReadableStream.zig
+++ b/src/bun.js/webcore/ReadableStream.zig
@@ -290,7 +290,13 @@ pub fn fromNative(globalThis: *JSGlobalObject, native: JSC.JSValue) JSC.JSValue
     return ZigGlobalObject__createNativeReadableStream(globalThis, native);
 }
 
-pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
+pub fn fromOwnedSlice(globalThis: *JSGlobalObject, bytes: []u8, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
+    var blob = Blob.init(bytes, bun.default_allocator, globalThis);
+    defer blob.deinit();
+    return fromBlobCopyRef(globalThis, &blob, recommended_chunk_size);
+}
+
+pub fn fromBlobCopyRef(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
     JSC.markBinding(@src());
     var store = blob.store orelse {
         return ReadableStream.empty(globalThis);
diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig
index 1383a8eaaf..8ed7e72970 100644
--- a/src/bun.js/webcore/fetch.zig
+++ b/src/bun.js/webcore/fetch.zig
@@ -2385,7 +2385,7 @@ pub fn Bun__fetch_(
 
             prepare_body: {
                 // is a S3 file we can use chunked here
-                if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlob(globalThis, &body.AnyBlob.Blob, s3.MultiPartUploadOptions.DefaultPartSize), globalThis)) |stream| {
+                if (JSC.WebCore.ReadableStream.fromJS(JSC.WebCore.ReadableStream.fromBlobCopyRef(globalThis, &body.AnyBlob.Blob, s3.MultiPartUploadOptions.DefaultPartSize), globalThis)) |stream| {
                     var old = body;
                     defer old.detach();
                     body = .{ .ReadableStream = JSC.WebCore.ReadableStream.Strong.init(stream, globalThis) };
diff --git a/src/cli/pack_command.zig b/src/cli/pack_command.zig
index 6f90649f6f..89c2403d35 100644
--- a/src/cli/pack_command.zig
+++ b/src/cli/pack_command.zig
@@ -2411,7 +2411,7 @@ pub const PackCommand = struct {
         }
 
         pub fn deinit(this: *const IgnorePatterns, allocator: std.mem.Allocator) void {
-            for (this.list) |pattern_info| {
+            for (this.list) |*pattern_info| {
                 pattern_info.glob.deinit(allocator);
             }
             allocator.free(this.list);
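The ReadableStream.zig hunk above carries the meaning of the rename: `fromBlobCopyRef` takes its own reference to the blob's store, so every caller keeps, and must release, its own. `fromOwnedSlice` wraps the common case of handing a heap slice straight to a stream, pairing `Blob.init` with a `defer blob.deinit()` that is now unambiguously safe. A self-contained sketch of that contract, with a toy `Store` standing in for `Blob.Store` (an assumption about how the real refcounting works):

```zig
const std = @import("std");

// Toy refcounted store, for illustration only.
const Store = struct {
    refs: u32 = 1,

    fn ref(this: *Store) void {
        this.refs += 1;
    }

    fn deref(this: *Store) void {
        this.refs -= 1;
        // refs == 0 would free the underlying bytes in the real thing.
    }
};

// "CopyRef" constructor: the stream owns its own +1 from here on.
fn fromBlobCopyRef(store: *Store) *Store {
    store.ref();
    return store;
}

test "caller keeps its reference, stream takes its own" {
    var store = Store{};
    const stream_store = fromBlobCopyRef(&store);
    store.deref(); // caller releases its reference (the defer blob.deinit())
    try std.testing.expectEqual(@as(u32, 1), store.refs); // stream's ref remains
    stream_store.deref();
    try std.testing.expectEqual(@as(u32, 0), store.refs);
}
```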
diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig
index c9b8bccd27..ff026ca4c0 100644
--- a/src/io/PipeReader.zig
+++ b/src/io/PipeReader.zig
@@ -388,8 +388,6 @@ const PosixBufferedReader = struct {
         }
     }
 
-    const stack_buffer_len = 64 * 1024;
-
     inline fn drainChunk(parent: *PosixBufferedReader, chunk: []const u8, hasMore: ReadState) bool {
         if (parent.vtable.isStreamingEnabled()) {
             if (chunk.len > 0) {
diff --git a/src/ptr/CowSlice.zig b/src/ptr/CowSlice.zig
index 108a34efd1..33411b21ce 100644
--- a/src/ptr/CowSlice.zig
+++ b/src/ptr/CowSlice.zig
@@ -229,7 +229,7 @@ pub fn CowSliceZ(T: type, comptime sentinel: ?T) type {
         ///
         /// In debug builds, deinitializing borrowed strings performs debug
        /// checks. In release builds it is a no-op.
-        pub fn deinit(str: Self, allocator: Allocator) void {
+        pub fn deinit(str: *const Self, allocator: Allocator) void {
            if (comptime cow_str_assertions) if (str.debug) |debug| {
                debug.mutex.lock();
                bun.assertf(
diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig
index 08757cfd60..969e51df8b 100644
--- a/src/shell/subproc.zig
+++ b/src/shell/subproc.zig
@@ -1249,6 +1249,11 @@ pub const PipeReader = struct {
         const out = this.reader._buffer;
         this.reader._buffer.items = &.{};
         this.reader._buffer.capacity = 0;
+
+        if (out.capacity > 0 and out.items.len == 0) {
+            out.deinit();
+            return &.{};
+        }
         return out.items;
     }
@@ -1271,9 +1276,8 @@ pub const PipeReader = struct {
                 return stream;
             },
             .done => |bytes| {
-                const blob = JSC.WebCore.Blob.init(bytes, bun.default_allocator, globalObject);
                 this.state = .{ .done = &.{} };
-                return JSC.WebCore.ReadableStream.fromBlob(globalObject, &blob, 0);
+                return JSC.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0);
             },
             .err => |err| {
                 _ = err; // autofix
diff --git a/src/string/MutableString.zig b/src/string/MutableString.zig
index 7ac7f0c0ee..0cdb26f374 100644
--- a/src/string/MutableString.zig
+++ b/src/string/MutableString.zig
@@ -276,7 +276,7 @@ pub fn sliceWithSentinel(self: *MutableString) [:0]u8 {
 }
 
 pub fn toOwnedSliceLength(self: *MutableString, length: usize) string {
-    self.list.shrinkAndFree(self.allocator, length);
+    self.list.items.len = length;
     return self.list.toOwnedSlice(self.allocator) catch bun.outOfMemory(); // TODO
 }
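In the MutableString.zig hunk just above, `shrinkAndFree` followed by `toOwnedSlice` paid for two allocator round-trips: the shrink reallocates (possibly copying) down to `length`, and `toOwnedSlice` then resizes again to `items.len`. Truncating the length field is free, and `toOwnedSlice` already performs the single resize down to the final size. A standalone sketch of the new shape (assuming `length <= items.len`, which the old shrink call also required):

```zig
const std = @import("std");

// Truncate by adjusting items.len (no allocator traffic), then let
// toOwnedSlice do the one resize/copy down to the final size.
fn toOwnedSliceLength(
    list: *std.ArrayListUnmanaged(u8),
    allocator: std.mem.Allocator,
    length: usize,
) ![]u8 {
    std.debug.assert(length <= list.items.len);
    list.items.len = length; // just drops the tail
    return list.toOwnedSlice(allocator);
}

test "toOwnedSliceLength truncates without an extra realloc" {
    var list: std.ArrayListUnmanaged(u8) = .{};
    try list.appendSlice(std.testing.allocator, "hello world");
    const owned = try toOwnedSliceLength(&list, std.testing.allocator, 5);
    defer std.testing.allocator.free(owned);
    try std.testing.expectEqualStrings("hello", owned);
}
```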
diff --git a/test/js/bun/spawn/spawn-pipe-leak.test.ts b/test/js/bun/spawn/spawn-pipe-leak.test.ts
index b6b68acbd9..4cd64995e8 100644
--- a/test/js/bun/spawn/spawn-pipe-leak.test.ts
+++ b/test/js/bun/spawn/spawn-pipe-leak.test.ts
@@ -5,23 +5,71 @@
  * and then exits. We only await the `process.exited` promise without reading
  * any of the output data to test for potential memory leaks.
  */
-import { bunExe } from "harness";
+import { bunExe, isWindows } from "harness";
 
 describe("Bun.spawn", () => {
-  const DEBUG_LOGS = false; // turn this on to see debug logs
+  const DEBUG_LOGS = true; // set to false to silence debug logs
   const log = (...args: any[]) => DEBUG_LOGS && console.log(...args);
   const MB = 1024 * 1024;
 
-  test("'pipe' stdout should not leak memory", async () => {
+  // Create a command that will generate ~8 MB of output
+  const cmd = [
+    bunExe(),
+    "-e",
+    `for (let buffer = Buffer.alloc(1024 * 1024 * 8, 'X'); buffer.length > 0;) {
+  const written = require('fs').writeSync(1, buffer);
+  buffer = buffer.slice(written);
+}`,
+  ];
+
+  const cmd10 = [
+    bunExe(),
+    "-e",
+    `for (let buffer = Buffer.alloc(10, 'X'); buffer.length > 0;) {
+  const written = require('fs').writeSync(1, buffer);
+  buffer = buffer.slice(written);
+}`,
+  ];
+
+  async function readPipeAfterExit() {
+    const process = Bun.spawn({
+      cmd,
+      stdout: "pipe",
+      stderr: "ignore",
+      stdin: "ignore",
+    });
+    await process.exited;
+    await Bun.readableStreamToBlob(process.stdout);
+  }
+
+  async function dontRead() {
+    const process = Bun.spawn({
+      cmd: cmd10,
+      stdout: "pipe",
+      stderr: "ignore",
+      stdin: "ignore",
+    });
+    await process.exited;
+  }
+
+  async function readPipeBeforeExit() {
+    const process = Bun.spawn({
+      cmd,
+      stdout: "pipe",
+      stderr: "ignore",
+      stdin: "ignore",
+    });
+    await Bun.readableStreamToBlob(process.stdout);
+    await process.exited;
+  }
+
+  async function run(iterate: () => Promise<void>) {
     /**
      * @param batchSize # of processes to spawn in parallel in each batch
      * @param totalBatches # of batches to run
      */
     async function testSpawnMemoryLeak(batchSize: number, totalBatches: number) {
-      // Create a command that will generate ~512 KB of output
-      const cmd = [bunExe(), "-e", "process.stdout.write(Buffer.alloc(32 * 1024, 'X'))"];
-
       log("Starting memory leak test...");
       log(`Initial memory usage: ${Math.round(process.memoryUsage.rss() / MB)} MB`);
@@ -31,19 +79,7 @@ describe("Bun.spawn", () => {
       for (let i = 0; i < batchSize; i++) {
         // Use an async IIFE that doesn't return anything
         // This should help the GC clean up resources
-        batchPromises.push(
-          (async (): Promise<void> => {
-            const process = Bun.spawn({
-              cmd,
-              stdout: "pipe", // We pipe stdout but never read from it
-              stderr: "ignore",
-              stdin: "ignore",
-            });
-
-            // Only await the exit, don't read any data
-            await process.exited;
-          })(),
-        );
+        batchPromises.push(iterate());
       }
 
       // Wait for all processes in this batch to complete
@@ -58,12 +94,14 @@
       Bun.gc(true);
     }
 
+    const batchSize = process.platform === "win32" ? 10 : 50;
+
     // Warmup
-    await testSpawnMemoryLeak(10, 2);
+    await testSpawnMemoryLeak(batchSize, 5);
     const memBefore = process.memoryUsage();
 
     // Run the test
-    await testSpawnMemoryLeak(25, 5);
+    await testSpawnMemoryLeak(batchSize, 10);
     const memAfter = process.memoryUsage();
 
     log("Memory leak test completed");
@@ -75,5 +113,21 @@ describe("Bun.spawn", () => {
     const pct = delta / memBefore.rss;
     console.log(`RSS delta: ${delta / MB}MB (${Math.round(100 * pct)}%)`);
     expect(pct).toBeLessThan(0.5);
-  }, 10_000); // NOTE: this test doesn't actually take this long, but keeping the limit high will help avoid flakyness
+  }
+
+  test("'pipe' stdout if read after exit should not leak memory", async () => {
+    await run(readPipeAfterExit);
+  }, 30_000);
+
+  test("'pipe' stdout if not read should not leak memory", async () => {
+    await run(dontRead);
+  }, 30_000);
+
+  test.todoIf(isWindows)(
+    "'pipe' stdout if read before exit should not leak memory",
+    async () => {
+      await run(readPipeBeforeExit);
+    },
+    30_000,
+  );
 });
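One last pattern worth calling out: the `|buf|` to `|*buf|` captures in subprocess.zig and pack_command.zig, and the `*const Self` receiver in CowSlice.zig, all stop operating on a by-value copy. In Zig, a by-value capture is a const copy, so a `deinit` that takes a mutable pointer cannot even be called on it; a pointer capture reaches the payload stored in the union itself. A minimal sketch (using `std.ArrayListUnmanaged`, whose `deinit` takes `*Self`; Bun's actual buffer type is an assumption here):

```zig
const std = @import("std");

const Readable = union(enum) {
    closed: void,
    buffer: std.ArrayListUnmanaged(u8),

    fn deinit(this: *Readable, allocator: std.mem.Allocator) void {
        switch (this.*) {
            // `|buf|` would capture a const copy, and buf.deinit(allocator)
            // needs *ArrayListUnmanaged, so it would not compile. `|*buf|`
            // points at the payload inside the union: the real list is freed.
            .buffer => |*buf| buf.deinit(allocator),
            else => {},
        }
        this.* = .{ .closed = {} };
    }
};

test "pointer capture frees the stored buffer" {
    var readable = Readable{ .buffer = .{} };
    try readable.buffer.appendSlice(std.testing.allocator, "some output");
    readable.deinit(std.testing.allocator); // a leak would fail the test
}
```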