Introduce stdin support to Bun.spawn

This commit is contained in:
Jarred Sumner
2025-06-22 22:42:49 -07:00
parent 4cc61a1b8c
commit 85258d64bb
12 changed files with 1433 additions and 33 deletions

View File

@@ -23,6 +23,7 @@ pub const Stdio = union(enum) {
memfd: bun.FileDescriptor,
pipe,
ipc,
readable_stream: JSC.WebCore.ReadableStream,
const log = bun.sys.syslog;
@@ -78,6 +79,9 @@ pub const Stdio = union(enum) {
.memfd => |fd| {
fd.close();
},
.readable_stream => {
// ReadableStream cleanup is handled by the subprocess
},
else => {},
}
}
@@ -191,7 +195,7 @@ pub const Stdio = union(enum) {
break :brk .{ .buffer = {} };
},
.dup2 => .{ .dup2 = .{ .out = stdio.dup2.out, .to = stdio.dup2.to } },
.capture, .pipe, .array_buffer => .{ .buffer = {} },
.capture, .pipe, .array_buffer, .readable_stream => .{ .buffer = {} },
.ipc => .{ .ipc = {} },
.fd => |fd| .{ .pipe = fd },
.memfd => |fd| .{ .pipe = fd },
@@ -244,7 +248,7 @@ pub const Stdio = union(enum) {
break :brk .{ .buffer = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() };
},
.ipc => .{ .ipc = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() },
.capture, .pipe, .array_buffer => .{ .buffer = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() },
.capture, .pipe, .array_buffer, .readable_stream => .{ .buffer = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() },
.fd => |fd| .{ .pipe = fd },
.dup2 => .{ .dup2 = .{ .out = stdio.dup2.out, .to = stdio.dup2.to } },
.path => |pathlike| .{ .path = pathlike.slice() },
@@ -277,7 +281,7 @@ pub const Stdio = union(enum) {
pub fn isPiped(self: Stdio) bool {
return switch (self) {
.capture, .array_buffer, .blob, .pipe => true,
.capture, .array_buffer, .blob, .pipe, .readable_stream => true,
.ipc => Environment.isWindows,
else => false,
};
@@ -351,27 +355,13 @@ pub const Stdio = union(enum) {
} else if (value.as(JSC.WebCore.Response)) |req| {
req.getBodyValue().toBlobIfPossible();
return out_stdio.extractBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i);
} else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| {
var req = req_const;
if (i == 0) {
if (req.toAnyBlob(globalThis)) |blob| {
return out_stdio.extractBlob(globalThis, blob, i);
}
switch (req.ptr) {
.File, .Blob => {
globalThis.throwTODO("Support fd/blob backed ReadableStream in spawn stdin. See https://github.com/oven-sh/bun/issues/8049") catch {};
return error.JSError;
},
.Direct, .JavaScript, .Bytes => {
// out_stdio.* = .{ .connect = req };
globalThis.throwTODO("Re-enable ReadableStream support in spawn stdin. ") catch {};
return error.JSError;
},
.Invalid => {
return globalThis.throwInvalidArguments("ReadableStream is in invalid state.", .{});
},
} else if (i == 0) {
if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |stream| {
if (stream.isDisturbed(globalThis)) {
return globalThis.throwInvalidArguments("stdin ReadableStream is already disturbed", .{});
}
out_stdio.* = .{ .readable_stream = stream };
return;
}
} else if (value.asArrayBuffer(globalThis)) |array_buffer| {
// Change in Bun v1.0.34: don't throw for empty ArrayBuffer

View File

@@ -446,6 +446,7 @@ const Readable = union(enum) {
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) },
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
.capture => Output.panic("TODO: implement capture support in Stdio readable", .{}),
.readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately
};
}
@@ -1265,16 +1266,17 @@ const Writable = union(enum) {
pub fn onStart(_: *Writable) void {}
pub fn init(
stdio: Stdio,
stdio: *Stdio,
event_loop: *JSC.EventLoop,
subprocess: *Subprocess,
result: StdioResult,
promise_for_stream: *JSC.JSValue,
) !Writable {
assertStdioResult(result);
if (Environment.isWindows) {
switch (stdio) {
.pipe => {
switch (stdio.*) {
.pipe, .readable_stream => {
if (result == .buffer) {
const pipe = JSC.WebCore.FileSink.createWithPipe(event_loop, result.buffer);
@@ -1328,14 +1330,14 @@ const Writable = union(enum) {
}
if (comptime Environment.isPosix) {
if (stdio == .pipe) {
if (stdio.* == .pipe) {
_ = bun.sys.setNonblocking(result.?);
}
}
switch (stdio) {
switch (stdio.*) {
.dup2 => @panic("TODO dup2 stdio"),
.pipe => {
.pipe, .readable_stream => {
const pipe = JSC.WebCore.FileSink.create(event_loop, result.?);
switch (pipe.writer.start(pipe.fd, true)) {
@@ -1343,16 +1345,24 @@ const Writable = union(enum) {
.err => |err| {
_ = err; // autofix
pipe.deref();
if (stdio.* == .readable_stream) {
stdio.readable_stream.cancel(event_loop.global);
}
return error.UnexpectedCreatingStdin;
},
}
pipe.writer.handle.poll.flags.insert(.socket);
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.has_stdin_destructor_called = false;
subprocess.flags.deref_on_stdin_destroyed = true;
pipe.writer.handle.poll.flags.insert(.socket);
if (stdio.* == .readable_stream) {
promise_for_stream.* = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
}
return Writable{
.pipe = pipe,
@@ -1518,6 +1528,7 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta
this.pid_rusage = rusage.*;
const is_sync = this.flags.is_sync;
this.clearAbortSignal();
defer this.deref();
defer this.disconnectIPC(true);
@@ -2335,16 +2346,19 @@ pub fn spawnMaybeSync(
MaxBuf.createForSubprocess(subprocess, &subprocess.stderr_maxbuf, maxBuffer);
MaxBuf.createForSubprocess(subprocess, &subprocess.stdout_maxbuf, maxBuffer);
var promise_for_stream: JSC.JSValue = .zero;
// When run synchronously, subprocess isn't garbage collected
subprocess.* = Subprocess{
.globalThis = globalThis,
.process = process,
.pid_rusage = null,
.stdin = Writable.init(
stdio[0],
&stdio[0],
loop,
subprocess,
spawned.stdin,
&promise_for_stream,
) catch {
subprocess.deref();
return globalThis.throwOutOfMemory();
@@ -2388,6 +2402,14 @@ pub fn spawnMaybeSync(
subprocess.process.setExitHandler(subprocess);
promise_for_stream.ensureStillAlive();
if (globalThis.hasException()) {
subprocess.deref();
_ = subprocess.tryKill(subprocess.killSignal);
return globalThis.throwValue(globalThis.takeError(error.JSError));
}
var posix_ipc_info: if (Environment.isPosix) IPC.Socket else void = undefined;
if (Environment.isPosix and !is_sync) {
if (maybe_ipc_mode) |mode| {
@@ -2446,6 +2468,10 @@ pub fn spawnMaybeSync(
JSC.Codegen.JSSubprocess.ipcCallbackSetCached(out, globalThis, ipc_callback);
}
if (stdio[0] == .readable_stream) {
JSC.Codegen.JSSubprocess.stdinSetCached(out, globalThis, stdio[0].readable_stream.value);
}
switch (subprocess.process.watch()) {
.result => {},
.err => {

View File

@@ -4406,6 +4406,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
return GlobalObject::PromiseFunctions::Bun__FileStreamWrapper__onResolveRequestStream;
} else if (handler == Bun__FileStreamWrapper__onRejectRequestStream) {
return GlobalObject::PromiseFunctions::Bun__FileStreamWrapper__onRejectRequestStream;
} else if (handler == Bun__FileSink__onResolveStream) {
return GlobalObject::PromiseFunctions::Bun__FileSink__onResolveStream;
} else if (handler == Bun__FileSink__onRejectStream) {
return GlobalObject::PromiseFunctions::Bun__FileSink__onRejectStream;
} else {
RELEASE_ASSERT_NOT_REACHED();
}

View File

@@ -379,8 +379,10 @@ public:
Bun__S3UploadStream__onResolveRequestStream,
Bun__FileStreamWrapper__onRejectRequestStream,
Bun__FileStreamWrapper__onResolveRequestStream,
Bun__FileSink__onResolveStream,
Bun__FileSink__onRejectStream,
};
static constexpr size_t promiseFunctionsSize = 34;
static constexpr size_t promiseFunctionsSize = 36;
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC::JSGlobalObject* arg0, JSC::CallFrame* arg1));

View File

@@ -688,6 +688,9 @@ BUN_DECLARE_HOST_FUNCTION(Bun__HTTPRequestContext__onResolveStream);
BUN_DECLARE_HOST_FUNCTION(Bun__NodeHTTPRequest__onResolve);
BUN_DECLARE_HOST_FUNCTION(Bun__NodeHTTPRequest__onReject);
BUN_DECLARE_HOST_FUNCTION(Bun__FileSink__onResolveStream);
BUN_DECLARE_HOST_FUNCTION(Bun__FileSink__onRejectStream);
#endif
#ifdef __cplusplus

View File

@@ -24,6 +24,9 @@ fd: bun.FileDescriptor = bun.invalid_fd,
auto_flusher: webcore.AutoFlusher = .{},
run_pending_later: FlushPendingTask = .{},
/// Currently, only used when `stdin` in `Bun.spawn` is a ReadableStream.
readable_stream: JSC.WebCore.ReadableStream.Strong = .{},
const log = Output.scoped(.FileSink, false);
pub const RefCount = bun.ptr.RefCount(FileSink, "ref_count", deinit, .{});
@@ -79,6 +82,14 @@ pub fn onAttachedProcessExit(this: *FileSink) void {
this.pending.result = .{ .err = .fromCode(.PIPE, .write) };
if (this.readable_stream.has()) {
if (this.event_loop_handle.globalObject()) |global| {
if (this.readable_stream.get(global)) |stream| {
stream.cancel(global);
}
}
}
this.runPending();
if (this.must_be_kept_alive_until_eof) {
@@ -182,6 +193,13 @@ pub fn onReady(this: *FileSink) void {
pub fn onClose(this: *FileSink) void {
log("onClose()", .{});
this.signal.close(null);
if (this.readable_stream.has()) {
if (this.event_loop_handle.globalObject()) |global| {
if (this.readable_stream.get(global)) |stream| {
stream.cancel(global);
}
}
}
}
pub fn createWithPipe(
@@ -225,6 +243,11 @@ pub fn create(
}
pub fn setup(this: *FileSink, options: *const FileSink.Options) JSC.Maybe(void) {
if (this.readable_stream.has()) {
// Already started.
return .{ .result = {} };
}
const result = bun.io.openForWriting(
bun.FileDescriptor.cwd(),
options.input_path,
@@ -414,6 +437,7 @@ pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, wait: bool) JSC
}
pub fn finalize(this: *FileSink) void {
this.readable_stream.deinit();
this.pending.deinit();
this.deref();
}
@@ -495,6 +519,7 @@ pub fn end(this: *FileSink, _: ?bun.sys.Error) JSC.Maybe(void) {
fn deinit(this: *FileSink) void {
this.pending.deinit();
this.writer.deinit();
this.readable_stream.deinit();
if (this.event_loop_handle.globalObject()) |global| {
webcore.AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, global.bunVM());
}
@@ -611,6 +636,96 @@ pub const FlushPendingTask = struct {
}
};
/// Does not ref or unref.
fn handleResolveStream(this: *FileSink, globalThis: *JSC.JSGlobalObject) void {
if (this.readable_stream.get(globalThis)) |*stream| {
stream.done(globalThis);
}
if (!this.done) {
this.writer.close();
}
}
/// Does not ref or unref.
fn handleRejectStream(this: *FileSink, globalThis: *JSC.JSGlobalObject, _: JSC.JSValue) void {
if (this.readable_stream.get(globalThis)) |*stream| {
stream.abort(globalThis);
}
if (!this.done) {
this.writer.close();
}
}
fn onResolveStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
log("onResolveStream", .{});
var args = callframe.arguments();
var this: *@This() = args[args.len - 1].asPromisePtr(@This());
defer this.deref();
this.handleResolveStream(globalThis);
return .js_undefined;
}
fn onRejectStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
log("onRejectStream", .{});
const args = callframe.arguments();
var this = args[args.len - 1].asPromisePtr(@This());
const err = args[0];
defer this.deref();
this.handleRejectStream(globalThis, err);
return .js_undefined;
}
pub fn assignToStream(this: *FileSink, stream: *JSC.WebCore.ReadableStream, globalThis: *JSGlobalObject) JSC.JSValue {
var signal = &this.signal;
signal.* = JSC.WebCore.FileSink.JSSink.SinkSignal.init(JSValue.zero);
this.ref();
defer this.deref();
// explicitly set it to a dead pointer
// we use this memory address to disable signals being sent
signal.clear();
this.readable_stream = .init(stream.*, globalThis);
const promise_result = JSC.WebCore.FileSink.JSSink.assignToStream(globalThis, stream.value, this, @as(**anyopaque, @ptrCast(&signal.ptr)));
if (promise_result.toError()) |err| {
this.readable_stream.deinit();
this.readable_stream = .{};
return err;
}
if (!promise_result.isEmptyOrUndefinedOrNull()) {
if (promise_result.asAnyPromise()) |promise| {
switch (promise.status(globalThis.vm())) {
.pending => {
this.ref();
promise_result.then(globalThis, this, onResolveStream, onRejectStream);
},
.fulfilled => {
// These don't ref().
this.handleResolveStream(globalThis);
},
.rejected => {
// These don't ref().
this.handleRejectStream(globalThis, promise.result(globalThis.vm()));
},
}
}
}
return promise_result;
}
comptime {
const export_prefix = "Bun__FileSink";
if (bun.Environment.export_cpp_apis) {
@export(&JSC.toJSHostFn(onResolveStream), .{ .name = export_prefix ++ "__onResolveStream" });
@export(&JSC.toJSHostFn(onRejectStream), .{ .name = export_prefix ++ "__onRejectStream" });
}
}
const std = @import("std");
const bun = @import("bun");
const uv = bun.windows.libuv;

View File

@@ -149,7 +149,7 @@ pub const ShellSubprocess = struct {
if (Environment.isWindows) {
switch (stdio) {
.pipe => {
.pipe, .readable_stream => {
if (result == .buffer) {
const pipe = JSC.WebCore.FileSink.createWithPipe(event_loop, result.buffer);
@@ -236,6 +236,10 @@ pub const ShellSubprocess = struct {
.ipc, .capture => {
return Writable{ .ignore = {} };
},
.readable_stream => {
// The shell never uses this
@panic("Unimplemented stdin readable_stream");
},
}
}
@@ -384,6 +388,7 @@ pub const ShellSubprocess = struct {
return readable;
},
.capture => Readable{ .pipe = PipeReader.create(event_loop, process, result, shellio, out_type) },
.readable_stream => Readable{ .ignore = {} }, // Shell doesn't use readable_stream
};
}
@@ -405,6 +410,7 @@ pub const ShellSubprocess = struct {
return readable;
},
.capture => Readable{ .pipe = PipeReader.create(event_loop, process, result, shellio, out_type) },
.readable_stream => Readable{ .ignore = {} }, // Shell doesn't use readable_stream
};
}