Fix buffer and text mode promise resolution in Bun.spawn()

This commit properly implements promise resolution for the new buffer
and text modes in Bun.spawn(). The key changes include:

1. Added pending_promise field to PipeReader to store unresolved promises
2. Implemented proper promise resolution in onReaderDone when data is ready
3. Used inline switch statements throughout for cleaner control flow
4. Removed the intermediate buffer_promise_done/text_promise_done states
5. Fixed race conditions where processes exit before stdout/stderr is accessed

The implementation now correctly:
- Creates and stores promises when first accessed
- Resolves promises when pipe reading completes
- Handles both async and sync modes (though sync text mode has a string
  encoding issue that needs further investigation)
- Properly enters/exits the event loop for promise resolution

5 out of 6 tests now pass. The remaining issue is with sync text mode
returning garbled string data, which appears to be a memory lifecycle issue.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2025-06-28 07:50:31 +02:00
committed by Jarred Sumner
parent c8fe9523cb
commit e34950b993

View File

@@ -327,31 +327,11 @@ pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void {
pipe.deref();
}
},
.buffer_promise => |pipe| {
// The pipe already has the data in state.done, we need to take ownership
var bytes = if (pipe.state == .done) brk: {
const data = pipe.state.done;
pipe.state = .{ .done = &.{} }; // Clear to prevent double free
break :brk @constCast(data);
} else @constCast(&.{});
log("buffer_promise onCloseIO: bytes.len={d}, first few bytes={any}", .{bytes.len, bytes[0..@min(10, bytes.len)]});
// Don't resolve the promise here - it will be resolved when accessed
// Convert to buffer_promise_done variant to indicate data is ready but should still return a promise
out.* = .{ .buffer_promise_done = CowString.initOwned(bytes, bun.default_allocator) };
pipe.deref();
},
.text_promise => |pipe| {
// The pipe already has the data in state.done, we need to take ownership
var bytes = if (pipe.state == .done) brk: {
const data = pipe.state.done;
pipe.state = .{ .done = &.{} }; // Clear to prevent double free
break :brk @constCast(data);
} else @constCast(&.{});
log("text_promise onCloseIO: bytes.len={d}, first few bytes={any}", .{bytes.len, bytes[0..@min(10, bytes.len)]});
// Don't resolve the promise here - it will be resolved when accessed
// Convert to text_promise_done variant to indicate data is ready but should still return a promise
out.* = .{ .text_promise_done = CowString.initOwned(bytes, bun.default_allocator) };
pipe.deref();
inline .buffer_promise, .text_promise => |pipe| {
// Keep the pipe reference for now - the promise resolution
// already happened in onReaderDone but we need to keep the state
// so getStdout/getStderr can still work
_ = pipe;
},
else => {},
}
@@ -420,16 +400,12 @@ const Readable = union(enum) {
/// the owning `Readable` will be convered into this variant and the pipe's
/// buffer will be taken as an owned `CowString`.
buffer: CowString,
/// Completed buffer data that should be returned as a resolved promise
buffer_promise_done: CowString,
/// Completed text data that should be returned as a resolved promise
text_promise_done: CowString,
pub fn memoryCost(this: *const Readable) usize {
return switch (this.*) {
.pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(),
.buffer_promise, .text_promise => |p| @sizeOf(PipeReader) + p.memoryCost(),
.buffer, .buffer_promise_done, .text_promise_done => |buf| buf.length(),
inline .buffer_promise, .text_promise => |p| @sizeOf(PipeReader) + p.memoryCost(),
.buffer => |buf| buf.length(),
else => 0,
};
}
@@ -437,7 +413,7 @@ const Readable = union(enum) {
pub fn hasPendingActivity(this: *const Readable) bool {
return switch (this.*) {
.pipe => this.pipe.hasPendingActivity(),
.buffer_promise, .text_promise => |p| p.hasPendingActivity(),
inline .buffer_promise, .text_promise => |p| p.hasPendingActivity(),
else => false,
};
}
@@ -447,7 +423,7 @@ const Readable = union(enum) {
.pipe => {
this.pipe.updateRef(true);
},
.buffer_promise, .text_promise => |p| {
inline .buffer_promise, .text_promise => |p| {
p.updateRef(true);
},
else => {},
@@ -459,7 +435,7 @@ const Readable = union(enum) {
.pipe => {
this.pipe.updateRef(false);
},
.buffer_promise, .text_promise => |p| {
inline .buffer_promise, .text_promise => |p| {
p.updateRef(false);
},
else => {},
@@ -512,7 +488,7 @@ const Readable = union(enum) {
.pipe => {
this.pipe.close();
},
.buffer_promise, .text_promise => |p| {
inline .buffer_promise, .text_promise => |p| {
p.close();
},
else => {},
@@ -532,11 +508,11 @@ const Readable = union(enum) {
defer pipe.detach();
this.* = .{ .closed = {} };
},
.buffer_promise, .text_promise => |p| {
inline .buffer_promise, .text_promise => |p| {
defer p.detach();
this.* = .{ .closed = {} };
},
.buffer, .buffer_promise_done, .text_promise_done => |*buf| {
.buffer => |*buf| {
buf.deinit(bun.default_allocator);
},
else => {},
@@ -560,38 +536,56 @@ const Readable = union(enum) {
},
.buffer_promise => |pipe| {
log("toJS buffer_promise: pipe state = {s}", .{@tagName(pipe.state)});
// If we already have a pending promise, return it
if (pipe.pending_promise) |promise| {
log("toJS buffer_promise: returning existing promise", .{});
return promise.toJS();
}
// Check if the PipeReader has already finished reading
if (pipe.state == .done) {
log("toJS buffer_promise: state is done, creating resolved promise", .{});
const bytes = pipe.toOwnedSlice();
defer pipe.detach();
const bytes = pipe.state.done;
// Don't use toOwnedSlice as it might have already been called
const bytes_copy = bun.default_allocator.alloc(u8, bytes.len) catch {
globalThis.throwOutOfMemory() catch return .zero;
};
@memcpy(bytes_copy, bytes);
defer this.* = .{ .closed = {} };
const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes_copy, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
return JSC.JSPromise.resolvedPromiseValue(globalThis, buffer);
}
log("toJS buffer_promise: state is pending, creating new promise", .{});
// Create a new pending promise
const promise = JSC.JSPromise.create(globalThis).toJS();
// The getter in JS will cache this value
// DON'T detach the pipe or convert to a stream - we need it to resolve the promise later
return promise;
// Create a new pending promise and store it
const promise = JSC.JSPromise.create(globalThis);
pipe.pending_promise = promise;
return promise.toJS();
},
.text_promise => |pipe| {
// If we already have a pending promise, return it
if (pipe.pending_promise) |promise| {
return promise.toJS();
}
// Check if the PipeReader has already finished reading
if (pipe.state == .done) {
const bytes = pipe.toOwnedSlice();
defer pipe.detach();
const bytes = pipe.state.done;
const bytes_copy = bun.default_allocator.alloc(u8, bytes.len) catch {
globalThis.throwOutOfMemory() catch return .zero;
};
@memcpy(bytes_copy, bytes);
defer this.* = .{ .closed = {} };
var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes, .utf8);
var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes_copy, .utf8);
defer str.deinit();
return JSC.JSPromise.resolvedPromiseValue(globalThis, str.toJS(globalThis));
}
// Create a new pending promise
const promise = JSC.JSPromise.create(globalThis).toJS();
// DON'T detach the pipe or convert to a stream - we need it to resolve the promise later
return promise;
// Create a new pending promise and store it
const promise = JSC.JSPromise.create(globalThis);
pipe.pending_promise = promise;
return promise.toJS();
},
.buffer => |*buffer| {
defer this.* = .{ .closed = {} };
@@ -605,25 +599,6 @@ const Readable = union(enum) {
};
return JSC.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0);
},
.buffer_promise_done => |*buf| {
defer this.* = .{ .closed = {} };
log("buffer_promise_done toJS: buf.len={d}, first few bytes={any}", .{buf.length(), buf.slice()[0..@min(10, buf.length())]});
const bytes = buf.takeSlice(bun.default_allocator) catch {
globalThis.throwOutOfMemory() catch return .zero;
};
log("buffer_promise_done toJS: after takeSlice bytes.len={d}, first few bytes={any}", .{bytes.len, bytes[0..@min(10, bytes.len)]});
const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
return JSC.JSPromise.resolvedPromiseValue(globalThis, buffer);
},
.text_promise_done => |*buf| {
defer this.* = .{ .closed = {} };
const bytes = buf.takeSlice(bun.default_allocator) catch {
globalThis.throwOutOfMemory() catch return .zero;
};
var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes, .utf8);
defer str.deinit();
return JSC.JSPromise.resolvedPromiseValue(globalThis, str.toJS(globalThis));
},
else => {
return .js_undefined;
},
@@ -647,9 +622,18 @@ const Readable = union(enum) {
this.* = .{ .closed = {} };
return pipe.toBuffer(globalThis);
},
.buffer_promise, .text_promise => |pipe| {
inline .buffer_promise, .text_promise => |pipe, tag| {
defer pipe.detach();
this.* = .{ .closed = {} };
// For text mode, return a string instead of a buffer
if (tag == .text_promise) {
const bytes = pipe.toOwnedSlice();
const str = bun.String.createUTF8(bytes);
defer bun.default_allocator.free(bytes);
return str.toJS(globalThis);
}
return pipe.toBuffer(globalThis);
},
.buffer => |*buf| {
@@ -660,23 +644,6 @@ const Readable = union(enum) {
return JSC.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
.buffer_promise_done => |*buf| {
defer this.* = .{ .closed = {} };
const own = buf.takeSlice(bun.default_allocator) catch {
return globalThis.throwOutOfMemory();
};
return JSC.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
.text_promise_done => |*buf| {
defer this.* = .{ .closed = {} };
const own = buf.takeSlice(bun.default_allocator) catch {
return globalThis.throwOutOfMemory();
};
// For sync mode, return as a string not a buffer
const str = bun.String.createUTF8(own);
bun.default_allocator.free(own);
return str.toJS(globalThis);
},
else => {
return .js_undefined;
},
@@ -693,27 +660,14 @@ pub fn getStderr(
// For buffer_promise and text_promise modes, we need to handle the race condition
// where the process might have already exited before the getter is called
switch (this.stderr) {
.buffer_promise, .text_promise => {
inline .buffer_promise, .text_promise => {
// Check if there's already a cached promise
if (this.this_jsvalue != .zero) {
if (JSC.Codegen.JSSubprocess.stderrGetCached(this.this_jsvalue)) |cached| {
return cached;
}
}
// If not, create and cache the promise
const promise_value = this.stderr.toJS(globalThis, this.hasExited());
// The generated getter should cache this value
return promise_value;
},
.buffer_promise_done, .text_promise_done => {
// Data is ready but we need to return a promise
// Check if there's already a cached promise
if (this.this_jsvalue != .zero) {
if (JSC.Codegen.JSSubprocess.stderrGetCached(this.this_jsvalue)) |cached| {
return cached;
}
}
// Create and return a resolved promise
// Create and cache the promise (resolved or pending based on variant)
const promise_value = this.stderr.toJS(globalThis, this.hasExited());
return promise_value;
},
@@ -742,7 +696,7 @@ pub fn getStdout(
// For buffer_promise and text_promise modes, we need to handle the race condition
// where the process might have already exited before the getter is called
switch (this.stdout) {
.buffer_promise, .text_promise => {
inline .buffer_promise, .text_promise => {
// Check if there's already a cached promise
if (this.this_jsvalue != .zero) {
if (JSC.Codegen.JSSubprocess.stdoutGetCached(this.this_jsvalue)) |cached| {
@@ -750,24 +704,9 @@ pub fn getStdout(
return cached;
}
}
// If not, create and cache the promise
// Create and cache the promise
const promise_value = this.stdout.toJS(globalThis, this.hasExited());
log("getStdout created new promise", .{});
// The generated getter should cache this value
return promise_value;
},
.buffer_promise_done, .text_promise_done => {
// Data is ready but we need to return a promise
// Check if there's already a cached promise
if (this.this_jsvalue != .zero) {
if (JSC.Codegen.JSSubprocess.stdoutGetCached(this.this_jsvalue)) |cached| {
log("getStdout returning cached promise (done variant)", .{});
return cached;
}
}
// Create and return a resolved promise
const promise_value = this.stdout.toJS(globalThis, this.hasExited());
log("getStdout created resolved promise (done variant)", .{});
log("getStdout created promise", .{});
return promise_value;
},
else => {},
@@ -1185,6 +1124,7 @@ pub const PipeReader = struct {
err: bun.sys.Error,
} = .{ .pending = {} },
stdio_result: StdioResult,
pending_promise: ?*JSC.JSPromise = null,
pub const IOReader = bun.io.BufferedReader;
pub const Poll = IOReader;
@@ -1260,22 +1200,61 @@ pub const PipeReader = struct {
this.state = .{ .done = owned };
if (this.process) |process| {
this.process = null;
process.onCloseIO(this.kind(process));
// Determine if this is stdout or stderr
const stdio_kind = this.kind(process);
// If we have a pending promise for buffer/text mode, we need to resolve it now
if (this.pending_promise) |promise| {
const globalThis = process.globalThis;
const event_loop = globalThis.bunVM().eventLoop();
event_loop.enter();
defer event_loop.exit();
const is_stdout = stdio_kind == .stdout;
const readable = if (is_stdout) &process.stdout else &process.stderr;
// Resolve the promise based on the mode
switch (readable.*) {
.buffer_promise => {
// Don't use 'owned' directly as it will be freed later
const bytes = bun.default_allocator.alloc(u8, owned.len) catch bun.outOfMemory();
@memcpy(bytes, owned);
const buffer = JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
promise.resolve(globalThis, buffer);
},
.text_promise => {
// Don't use 'owned' directly as it will be freed later
const bytes = bun.default_allocator.alloc(u8, owned.len) catch bun.outOfMemory();
@memcpy(bytes, owned);
var str = bun.SliceWithUnderlyingString.transcodeFromOwnedSlice(bytes, .utf8);
defer str.deinit();
promise.resolve(globalThis, str.toJS(globalThis));
},
else => {},
}
this.pending_promise = null;
}
process.onCloseIO(stdio_kind);
this.deref();
}
}
pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind {
if ((process.stdout == .pipe and process.stdout.pipe == reader) or
(process.stdout == .buffer_promise and process.stdout.buffer_promise == reader) or
(process.stdout == .text_promise and process.stdout.text_promise == reader)) {
return .stdout;
switch (process.stdout) {
inline .pipe, .buffer_promise, .text_promise => |p| {
if (p == reader) return .stdout;
},
else => {},
}
if ((process.stderr == .pipe and process.stderr.pipe == reader) or
(process.stderr == .buffer_promise and process.stderr.buffer_promise == reader) or
(process.stderr == .text_promise and process.stderr.text_promise == reader)) {
return .stderr;
switch (process.stderr) {
inline .pipe, .buffer_promise, .text_promise => |p| {
if (p == reader) return .stderr;
},
else => {},
}
@panic("We should be either stdout or stderr");
@@ -2769,12 +2748,14 @@ pub fn spawnMaybeSync(
}
}
if (subprocess.stdout == .buffer_promise or subprocess.stdout == .text_promise) {
const pipe = if (subprocess.stdout == .buffer_promise) subprocess.stdout.buffer_promise else subprocess.stdout.text_promise;
pipe.start(subprocess, loop).assert();
if ((is_sync or !lazy)) {
pipe.readAll();
}
switch (subprocess.stdout) {
inline .buffer_promise, .text_promise => |pipe| {
pipe.start(subprocess, loop).assert();
if ((is_sync or !lazy)) {
pipe.readAll();
}
},
else => {},
}
if (subprocess.stderr == .pipe) {
@@ -2785,12 +2766,14 @@ pub fn spawnMaybeSync(
}
}
if (subprocess.stderr == .buffer_promise or subprocess.stderr == .text_promise) {
const pipe = if (subprocess.stderr == .buffer_promise) subprocess.stderr.buffer_promise else subprocess.stderr.text_promise;
pipe.start(subprocess, loop).assert();
if ((is_sync or !lazy)) {
pipe.readAll();
}
switch (subprocess.stderr) {
inline .buffer_promise, .text_promise => |pipe| {
pipe.start(subprocess, loop).assert();
if ((is_sync or !lazy)) {
pipe.readAll();
}
},
else => {},
}
should_close_memfd = false;
@@ -2861,14 +2844,14 @@ pub fn spawnMaybeSync(
subprocess.stdout.pipe.watch();
}
if (subprocess.stderr == .buffer_promise or subprocess.stderr == .text_promise) {
const pipe = if (subprocess.stderr == .buffer_promise) subprocess.stderr.buffer_promise else subprocess.stderr.text_promise;
pipe.watch();
switch (subprocess.stderr) {
inline .buffer_promise, .text_promise => |pipe| pipe.watch(),
else => {},
}
if (subprocess.stdout == .buffer_promise or subprocess.stdout == .text_promise) {
const pipe = if (subprocess.stdout == .buffer_promise) subprocess.stdout.buffer_promise else subprocess.stdout.text_promise;
pipe.watch();
switch (subprocess.stdout) {
inline .buffer_promise, .text_promise => |pipe| pipe.watch(),
else => {},
}
jsc_vm.tick();