diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index a78efd9f88..98af9f8431 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -327,31 +327,11 @@ pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void { pipe.deref(); } }, - .buffer_promise => |pipe| { - // The pipe already has the data in state.done, we need to take ownership - var bytes = if (pipe.state == .done) brk: { - const data = pipe.state.done; - pipe.state = .{ .done = &.{} }; // Clear to prevent double free - break :brk @constCast(data); - } else @constCast(&.{}); - log("buffer_promise onCloseIO: bytes.len={d}, first few bytes={any}", .{bytes.len, bytes[0..@min(10, bytes.len)]}); - // Don't resolve the promise here - it will be resolved when accessed - // Convert to buffer_promise_done variant to indicate data is ready but should still return a promise - out.* = .{ .buffer_promise_done = CowString.initOwned(bytes, bun.default_allocator) }; - pipe.deref(); - }, - .text_promise => |pipe| { - // The pipe already has the data in state.done, we need to take ownership - var bytes = if (pipe.state == .done) brk: { - const data = pipe.state.done; - pipe.state = .{ .done = &.{} }; // Clear to prevent double free - break :brk @constCast(data); - } else @constCast(&.{}); - log("text_promise onCloseIO: bytes.len={d}, first few bytes={any}", .{bytes.len, bytes[0..@min(10, bytes.len)]}); - // Don't resolve the promise here - it will be resolved when accessed - // Convert to text_promise_done variant to indicate data is ready but should still return a promise - out.* = .{ .text_promise_done = CowString.initOwned(bytes, bun.default_allocator) }; - pipe.deref(); + inline .buffer_promise, .text_promise => |pipe| { + // Keep the pipe reference for now - the promise resolution + // already happened in onReaderDone but we need to keep the state + // so getStdout/getStderr can still work + _ = pipe; }, else => {}, } @@ -420,16 +400,12 @@ const Readable = union(enum) { /// the owning `Readable` will be convered into this variant and the pipe's /// buffer will be taken as an owned `CowString`. buffer: CowString, - /// Completed buffer data that should be returned as a resolved promise - buffer_promise_done: CowString, - /// Completed text data that should be returned as a resolved promise - text_promise_done: CowString, pub fn memoryCost(this: *const Readable) usize { return switch (this.*) { .pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(), - .buffer_promise, .text_promise => |p| @sizeOf(PipeReader) + p.memoryCost(), - .buffer, .buffer_promise_done, .text_promise_done => |buf| buf.length(), + inline .buffer_promise, .text_promise => |p| @sizeOf(PipeReader) + p.memoryCost(), + .buffer => |buf| buf.length(), else => 0, }; } @@ -437,7 +413,7 @@ const Readable = union(enum) { pub fn hasPendingActivity(this: *const Readable) bool { return switch (this.*) { .pipe => this.pipe.hasPendingActivity(), - .buffer_promise, .text_promise => |p| p.hasPendingActivity(), + inline .buffer_promise, .text_promise => |p| p.hasPendingActivity(), else => false, }; } @@ -447,7 +423,7 @@ const Readable = union(enum) { .pipe => { this.pipe.updateRef(true); }, - .buffer_promise, .text_promise => |p| { + inline .buffer_promise, .text_promise => |p| { p.updateRef(true); }, else => {}, @@ -459,7 +435,7 @@ const Readable = union(enum) { .pipe => { this.pipe.updateRef(false); }, - .buffer_promise, .text_promise => |p| { + inline .buffer_promise, .text_promise => |p| { p.updateRef(false); }, else => {}, @@ -512,7 +488,7 @@ const Readable = union(enum) { .pipe => { this.pipe.close(); }, - .buffer_promise, .text_promise => |p| { + inline .buffer_promise, .text_promise => |p| { p.close(); }, else => {}, @@ -532,11 +508,11 @@ const Readable = union(enum) { defer pipe.detach(); this.* = .{ .closed = {} }; }, - .buffer_promise, .text_promise => |p| { + inline .buffer_promise, .text_promise => |p| { defer p.detach(); this.* = .{ .closed = {} }; }, - .buffer, .buffer_promise_done, .text_promise_done => |*buf| { + .buffer => |*buf| { buf.deinit(bun.default_allocator); }, else => {}, @@ -560,38 +536,56 @@ const Readable = union(enum) { }, .buffer_promise => |pipe| { log("toJS buffer_promise: pipe state = {s}", .{@tagName(pipe.state)}); + + // If we already have a pending promise, return it + if (pipe.pending_promise) |promise| { + log("toJS buffer_promise: returning existing promise", .{}); + return promise.toJS(); + } + // Check if the PipeReader has already finished reading if (pipe.state == .done) { log("toJS buffer_promise: state is done, creating resolved promise", .{}); - const bytes = pipe.toOwnedSlice(); - defer pipe.detach(); + const bytes = pipe.state.done; + // Don't use toOwnedSlice as it might have already been called + const bytes_copy = bun.default_allocator.alloc(u8, bytes.len) catch { + globalThis.throwOutOfMemory() catch return .zero; + }; + @memcpy(bytes_copy, bytes); defer this.* = .{ .closed = {} }; - const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); + const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes_copy, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); return JSC.JSPromise.resolvedPromiseValue(globalThis, buffer); } log("toJS buffer_promise: state is pending, creating new promise", .{}); - // Create a new pending promise - const promise = JSC.JSPromise.create(globalThis).toJS(); - // The getter in JS will cache this value - // DON'T detach the pipe or convert to a stream - we need it to resolve the promise later - return promise; + // Create a new pending promise and store it + const promise = JSC.JSPromise.create(globalThis); + pipe.pending_promise = promise; + return promise.toJS(); }, .text_promise => |pipe| { + // If we already have a pending promise, return it + if (pipe.pending_promise) |promise| { + return promise.toJS(); + } + // Check if the PipeReader has already finished reading if (pipe.state == .done) { - const bytes = pipe.toOwnedSlice(); - defer pipe.detach(); + const bytes = pipe.state.done; + const bytes_copy = bun.default_allocator.alloc(u8, bytes.len) catch { + globalThis.throwOutOfMemory() catch return .zero; + }; + @memcpy(bytes_copy, bytes); defer this.* = .{ .closed = {} }; - var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes, .utf8); + var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes_copy, .utf8); defer str.deinit(); return JSC.JSPromise.resolvedPromiseValue(globalThis, str.toJS(globalThis)); } - // Create a new pending promise - const promise = JSC.JSPromise.create(globalThis).toJS(); - // DON'T detach the pipe or convert to a stream - we need it to resolve the promise later - return promise; + // Create a new pending promise and store it + const promise = JSC.JSPromise.create(globalThis); + pipe.pending_promise = promise; + return promise.toJS(); }, .buffer => |*buffer| { defer this.* = .{ .closed = {} }; @@ -605,25 +599,6 @@ const Readable = union(enum) { }; return JSC.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0); }, - .buffer_promise_done => |*buf| { - defer this.* = .{ .closed = {} }; - log("buffer_promise_done toJS: buf.len={d}, first few bytes={any}", .{buf.length(), buf.slice()[0..@min(10, buf.length())]}); - const bytes = buf.takeSlice(bun.default_allocator) catch { - globalThis.throwOutOfMemory() catch return .zero; - }; - log("buffer_promise_done toJS: after takeSlice bytes.len={d}, first few bytes={any}", .{bytes.len, bytes[0..@min(10, bytes.len)]}); - const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); - return JSC.JSPromise.resolvedPromiseValue(globalThis, buffer); - }, - .text_promise_done => |*buf| { - defer this.* = .{ .closed = {} }; - const bytes = buf.takeSlice(bun.default_allocator) catch { - globalThis.throwOutOfMemory() catch return .zero; - }; - var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes, .utf8); - defer str.deinit(); - return JSC.JSPromise.resolvedPromiseValue(globalThis, str.toJS(globalThis)); - }, else => { return .js_undefined; }, @@ -647,9 +622,18 @@ const Readable = union(enum) { this.* = .{ .closed = {} }; return pipe.toBuffer(globalThis); }, - .buffer_promise, .text_promise => |pipe| { + inline .buffer_promise, .text_promise => |pipe, tag| { defer pipe.detach(); this.* = .{ .closed = {} }; + + // For text mode, return a string instead of a buffer + if (tag == .text_promise) { + const bytes = pipe.toOwnedSlice(); + const str = bun.String.createUTF8(bytes); + defer bun.default_allocator.free(bytes); + return str.toJS(globalThis); + } + return pipe.toBuffer(globalThis); }, .buffer => |*buf| { @@ -660,23 +644,6 @@ const Readable = union(enum) { return JSC.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); }, - .buffer_promise_done => |*buf| { - defer this.* = .{ .closed = {} }; - const own = buf.takeSlice(bun.default_allocator) catch { - return globalThis.throwOutOfMemory(); - }; - return JSC.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); - }, - .text_promise_done => |*buf| { - defer this.* = .{ .closed = {} }; - const own = buf.takeSlice(bun.default_allocator) catch { - return globalThis.throwOutOfMemory(); - }; - // For sync mode, return as a string not a buffer - const str = bun.String.createUTF8(own); - bun.default_allocator.free(own); - return str.toJS(globalThis); - }, else => { return .js_undefined; }, @@ -693,27 +660,14 @@ pub fn getStderr( // For buffer_promise and text_promise modes, we need to handle the race condition // where the process might have already exited before the getter is called switch (this.stderr) { - .buffer_promise, .text_promise => { + inline .buffer_promise, .text_promise => { // Check if there's already a cached promise if (this.this_jsvalue != .zero) { if (JSC.Codegen.JSSubprocess.stderrGetCached(this.this_jsvalue)) |cached| { return cached; } } - // If not, create and cache the promise - const promise_value = this.stderr.toJS(globalThis, this.hasExited()); - // The generated getter should cache this value - return promise_value; - }, - .buffer_promise_done, .text_promise_done => { - // Data is ready but we need to return a promise - // Check if there's already a cached promise - if (this.this_jsvalue != .zero) { - if (JSC.Codegen.JSSubprocess.stderrGetCached(this.this_jsvalue)) |cached| { - return cached; - } - } - // Create and return a resolved promise + // Create and cache the promise (resolved or pending based on variant) const promise_value = this.stderr.toJS(globalThis, this.hasExited()); return promise_value; }, @@ -742,7 +696,7 @@ pub fn getStdout( // For buffer_promise and text_promise modes, we need to handle the race condition // where the process might have already exited before the getter is called switch (this.stdout) { - .buffer_promise, .text_promise => { + inline .buffer_promise, .text_promise => { // Check if there's already a cached promise if (this.this_jsvalue != .zero) { if (JSC.Codegen.JSSubprocess.stdoutGetCached(this.this_jsvalue)) |cached| { @@ -750,24 +704,9 @@ pub fn getStdout( return cached; } } - // If not, create and cache the promise + // Create and cache the promise const promise_value = this.stdout.toJS(globalThis, this.hasExited()); - log("getStdout created new promise", .{}); - // The generated getter should cache this value - return promise_value; - }, - .buffer_promise_done, .text_promise_done => { - // Data is ready but we need to return a promise - // Check if there's already a cached promise - if (this.this_jsvalue != .zero) { - if (JSC.Codegen.JSSubprocess.stdoutGetCached(this.this_jsvalue)) |cached| { - log("getStdout returning cached promise (done variant)", .{}); - return cached; - } - } - // Create and return a resolved promise - const promise_value = this.stdout.toJS(globalThis, this.hasExited()); - log("getStdout created resolved promise (done variant)", .{}); + log("getStdout created promise", .{}); return promise_value; }, else => {}, @@ -1185,6 +1124,7 @@ pub const PipeReader = struct { err: bun.sys.Error, } = .{ .pending = {} }, stdio_result: StdioResult, + pending_promise: ?*JSC.JSPromise = null, pub const IOReader = bun.io.BufferedReader; pub const Poll = IOReader; @@ -1260,22 +1200,61 @@ pub const PipeReader = struct { this.state = .{ .done = owned }; if (this.process) |process| { this.process = null; - process.onCloseIO(this.kind(process)); + + // Determine if this is stdout or stderr + const stdio_kind = this.kind(process); + + // If we have a pending promise for buffer/text mode, we need to resolve it now + if (this.pending_promise) |promise| { + const globalThis = process.globalThis; + const event_loop = globalThis.bunVM().eventLoop(); + event_loop.enter(); + defer event_loop.exit(); + + const is_stdout = stdio_kind == .stdout; + const readable = if (is_stdout) &process.stdout else &process.stderr; + + // Resolve the promise based on the mode + switch (readable.*) { + .buffer_promise => { + // Don't use 'owned' directly as it will be freed later + const bytes = bun.default_allocator.alloc(u8, owned.len) catch bun.outOfMemory(); + @memcpy(bytes, owned); + const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis); + promise.resolve(globalThis, buffer); + }, + .text_promise => { + // Don't use 'owned' directly as it will be freed later + const bytes = bun.default_allocator.alloc(u8, owned.len) catch bun.outOfMemory(); + @memcpy(bytes, owned); + var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes, .utf8); + defer str.deinit(); + promise.resolve(globalThis, str.toJS(globalThis)); + }, + else => {}, + } + + this.pending_promise = null; + } + + process.onCloseIO(stdio_kind); this.deref(); } } pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind { - if ((process.stdout == .pipe and process.stdout.pipe == reader) or - (process.stdout == .buffer_promise and process.stdout.buffer_promise == reader) or - (process.stdout == .text_promise and process.stdout.text_promise == reader)) { - return .stdout; + switch (process.stdout) { + inline .pipe, .buffer_promise, .text_promise => |p| { + if (p == reader) return .stdout; + }, + else => {}, } - - if ((process.stderr == .pipe and process.stderr.pipe == reader) or - (process.stderr == .buffer_promise and process.stderr.buffer_promise == reader) or - (process.stderr == .text_promise and process.stderr.text_promise == reader)) { - return .stderr; + + switch (process.stderr) { + inline .pipe, .buffer_promise, .text_promise => |p| { + if (p == reader) return .stderr; + }, + else => {}, } @panic("We should be either stdout or stderr"); @@ -2769,12 +2748,14 @@ pub fn spawnMaybeSync( } } - if (subprocess.stdout == .buffer_promise or subprocess.stdout == .text_promise) { - const pipe = if (subprocess.stdout == .buffer_promise) subprocess.stdout.buffer_promise else subprocess.stdout.text_promise; - pipe.start(subprocess, loop).assert(); - if ((is_sync or !lazy)) { - pipe.readAll(); - } + switch (subprocess.stdout) { + inline .buffer_promise, .text_promise => |pipe| { + pipe.start(subprocess, loop).assert(); + if ((is_sync or !lazy)) { + pipe.readAll(); + } + }, + else => {}, } if (subprocess.stderr == .pipe) { @@ -2785,12 +2766,14 @@ pub fn spawnMaybeSync( } } - if (subprocess.stderr == .buffer_promise or subprocess.stderr == .text_promise) { - const pipe = if (subprocess.stderr == .buffer_promise) subprocess.stderr.buffer_promise else subprocess.stderr.text_promise; - pipe.start(subprocess, loop).assert(); - if ((is_sync or !lazy)) { - pipe.readAll(); - } + switch (subprocess.stderr) { + inline .buffer_promise, .text_promise => |pipe| { + pipe.start(subprocess, loop).assert(); + if ((is_sync or !lazy)) { + pipe.readAll(); + } + }, + else => {}, } should_close_memfd = false; @@ -2861,14 +2844,14 @@ pub fn spawnMaybeSync( subprocess.stdout.pipe.watch(); } - if (subprocess.stderr == .buffer_promise or subprocess.stderr == .text_promise) { - const pipe = if (subprocess.stderr == .buffer_promise) subprocess.stderr.buffer_promise else subprocess.stderr.text_promise; - pipe.watch(); + switch (subprocess.stderr) { + inline .buffer_promise, .text_promise => |pipe| pipe.watch(), + else => {}, } - if (subprocess.stdout == .buffer_promise or subprocess.stdout == .text_promise) { - const pipe = if (subprocess.stdout == .buffer_promise) subprocess.stdout.buffer_promise else subprocess.stdout.text_promise; - pipe.watch(); + switch (subprocess.stdout) { + inline .buffer_promise, .text_promise => |pipe| pipe.watch(), + else => {}, } jsc_vm.tick();