mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
Implement Bun.spawnSync
This commit is contained in:
@@ -1198,6 +1198,9 @@ pub const Class = NewClass(
|
||||
.spawn = .{
|
||||
.rfn = JSC.wrapWithHasContainer(JSC.Subprocess, "spawn", false, false, false),
|
||||
},
|
||||
.spawnSync = .{
|
||||
.rfn = JSC.wrapWithHasContainer(JSC.Subprocess, "spawnSync", false, false, false),
|
||||
},
|
||||
},
|
||||
.{
|
||||
.main = .{
|
||||
|
||||
@@ -174,6 +174,37 @@ pub const Subprocess = struct {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toBufferedValue(this: *Readable, globalThis: *JSC.JSGlobalObject) JSValue {
|
||||
switch (this.*) {
|
||||
.fd => |fd| {
|
||||
return JSValue.jsNumber(fd);
|
||||
},
|
||||
.pipe => {
|
||||
defer this.close();
|
||||
|
||||
// TODO: handle when there's pending unread data in the pipe
|
||||
// For some reason, this currently hangs forever
|
||||
if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
|
||||
if (this.pipe.buffer.canRead())
|
||||
this.pipe.buffer.readIfPossible(true);
|
||||
}
|
||||
|
||||
var bytes = this.pipe.buffer.internal_buffer.slice();
|
||||
this.pipe.buffer.internal_buffer = .{};
|
||||
|
||||
if (bytes.len > 0) {
|
||||
// Return a Buffer so that they can do .toString() on it
|
||||
return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator);
|
||||
}
|
||||
|
||||
return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator);
|
||||
},
|
||||
else => {
|
||||
return JSValue.jsUndefined();
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub fn getStderr(
|
||||
@@ -321,27 +352,30 @@ pub const Subprocess = struct {
|
||||
|
||||
pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady);
|
||||
|
||||
pub fn onReady(this: *BufferedInput, size_or_offset: i64) void {
|
||||
this.write(@intCast(usize, @maximum(size_or_offset, 0)));
|
||||
pub fn onReady(this: *BufferedInput, _: i64) void {
|
||||
this.write();
|
||||
}
|
||||
|
||||
pub fn canWrite(this: *BufferedInput) bool {
|
||||
return bun.isWritable(this.fd);
|
||||
}
|
||||
|
||||
pub fn writeIfPossible(this: *BufferedInput) void {
|
||||
// we ask, "Is it possible to write right now?"
|
||||
// we do this rather than epoll or kqueue()
|
||||
// because we don't want to block the thread waiting for the write
|
||||
if (!this.canWrite()) {
|
||||
this.watch(this.fd);
|
||||
return;
|
||||
pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void {
|
||||
if (comptime !is_sync) {
|
||||
|
||||
// we ask, "Is it possible to write right now?"
|
||||
// we do this rather than epoll or kqueue()
|
||||
// because we don't want to block the thread waiting for the write
|
||||
if (!this.canWrite()) {
|
||||
this.watch(this.fd);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.write(0);
|
||||
this.write();
|
||||
}
|
||||
|
||||
pub fn write(this: *BufferedInput, _: usize) void {
|
||||
pub fn write(this: *BufferedInput) void {
|
||||
var to_write = this.remain;
|
||||
|
||||
if (to_write.len == 0) {
|
||||
@@ -367,6 +401,11 @@ pub const Subprocess = struct {
|
||||
return;
|
||||
}
|
||||
|
||||
if (e.getErrno() == .PIPE) {
|
||||
this.deinit();
|
||||
return;
|
||||
}
|
||||
|
||||
// fail
|
||||
log("write({d}) fail: {d}", .{ to_write.len, e.errno });
|
||||
this.deinit();
|
||||
@@ -432,28 +471,30 @@ pub const Subprocess = struct {
|
||||
|
||||
pub fn ready(this: *BufferedOutput, _: i64) void {
|
||||
// TODO: what happens if the task was already enqueued after unwatch()?
|
||||
this.readAll();
|
||||
this.readAll(false);
|
||||
}
|
||||
|
||||
pub fn canRead(this: *BufferedOutput) bool {
|
||||
return bun.isReadable(this.fd);
|
||||
}
|
||||
|
||||
pub fn readIfPossible(this: *BufferedOutput) void {
|
||||
// we ask, "Is it possible to read right now?"
|
||||
// we do this rather than epoll or kqueue()
|
||||
// because we don't want to block the thread waiting for the read
|
||||
if (!this.canRead()) {
|
||||
this.watch(this.fd);
|
||||
return;
|
||||
pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void {
|
||||
if (comptime !force) {
|
||||
// we ask, "Is it possible to read right now?"
|
||||
// we do this rather than epoll or kqueue()
|
||||
// because we don't want to block the thread waiting for the read
|
||||
// and because kqueue or epoll might return other unrelated events
|
||||
// and we don't want this to become an event loop ticking point
|
||||
if (!this.canRead()) {
|
||||
this.watch(this.fd);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.readAll();
|
||||
this.readAll(force);
|
||||
}
|
||||
|
||||
pub fn readAll(
|
||||
this: *BufferedOutput,
|
||||
) void {
|
||||
pub fn readAll(this: *BufferedOutput, comptime force: bool) void {
|
||||
// read as much as we can from the pipe
|
||||
while (this.internal_buffer.len <= this.max_internal_buffer) {
|
||||
var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined;
|
||||
@@ -465,10 +506,6 @@ pub const Subprocess = struct {
|
||||
buf = available;
|
||||
}
|
||||
|
||||
if (comptime bun.Environment.allow_assert) {
|
||||
// bun.assertNonBlocking(this.fd);
|
||||
}
|
||||
|
||||
switch (JSC.Node.Syscall.read(this.fd, buf)) {
|
||||
.err => |e| {
|
||||
if (e.isRetry()) {
|
||||
@@ -476,6 +513,16 @@ pub const Subprocess = struct {
|
||||
return;
|
||||
}
|
||||
|
||||
// INTR is returned on macOS when the process is killed
|
||||
// It probably sent SIGPIPE but we have the handler for
|
||||
// that disabled.
|
||||
// We know it's the "real" INTR because we use read$NOCANCEL
|
||||
if (e.getErrno() == .INTR) {
|
||||
this.received_eof = true;
|
||||
this.autoCloseFileDescriptor();
|
||||
return;
|
||||
}
|
||||
|
||||
// fail
|
||||
log("readAll() fail: {s}", .{@tagName(e.getErrno())});
|
||||
this.pending_error = e;
|
||||
@@ -495,10 +542,19 @@ pub const Subprocess = struct {
|
||||
}
|
||||
}
|
||||
|
||||
if (buf[bytes_read..].len > 0 or !this.canRead()) {
|
||||
this.watch(this.fd);
|
||||
this.received_eof = true;
|
||||
return;
|
||||
if (comptime !force) {
|
||||
if (buf[bytes_read..].len > 0 or !this.canRead()) {
|
||||
this.watch(this.fd);
|
||||
this.received_eof = true;
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// we consider a short read as being EOF
|
||||
this.received_eof = this.received_eof or bytes_read < buf.len;
|
||||
if (this.received_eof) {
|
||||
this.autoCloseFileDescriptor();
|
||||
return;
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
@@ -732,6 +788,18 @@ pub const Subprocess = struct {
|
||||
}
|
||||
|
||||
pub fn spawn(globalThis: *JSC.JSGlobalObject, args: JSValue) JSValue {
|
||||
return spawnMaybeSync(globalThis, args, false);
|
||||
}
|
||||
|
||||
pub fn spawnSync(globalThis: *JSC.JSGlobalObject, args: JSValue) JSValue {
|
||||
return spawnMaybeSync(globalThis, args, true);
|
||||
}
|
||||
|
||||
pub fn spawnMaybeSync(
|
||||
globalThis: *JSC.JSGlobalObject,
|
||||
args: JSValue,
|
||||
comptime is_sync: bool,
|
||||
) JSValue {
|
||||
var arena = std.heap.ArenaAllocator.init(bun.default_allocator);
|
||||
defer arena.deinit();
|
||||
var allocator = arena.allocator();
|
||||
@@ -751,6 +819,11 @@ pub const Subprocess = struct {
|
||||
.{ .pipe = null },
|
||||
};
|
||||
|
||||
if (comptime is_sync) {
|
||||
stdio[1] = .{ .pipe = null };
|
||||
stdio[2] = .{ .pipe = null };
|
||||
}
|
||||
|
||||
var on_exit_callback = JSValue.zero;
|
||||
var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse "";
|
||||
var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined;
|
||||
@@ -992,7 +1065,7 @@ pub const Subprocess = struct {
|
||||
const kernel = @import("../../../analytics.zig").GenerateHeader.GeneratePlatform.kernelVersion();
|
||||
|
||||
// pidfd_nonblock only supported in 5.10+
|
||||
const flags: u32 = if (kernel.orderWithoutTag(.{ .major = 5, .minor = 10, .patch = 0 }).compare(.gte))
|
||||
const flags: u32 = if (!is_sync and kernel.orderWithoutTag(.{ .major = 5, .minor = 10, .patch = 0 }).compare(.gte))
|
||||
std.os.O.NONBLOCK
|
||||
else
|
||||
0;
|
||||
@@ -1014,15 +1087,12 @@ pub const Subprocess = struct {
|
||||
}
|
||||
};
|
||||
|
||||
// set non-blocking stdin
|
||||
// if (stdio[0].isPiped())
|
||||
// _ = std.os.fcntl(stdin_pipe[1], std.os.F.SETFL, std.os.O.NONBLOCK) catch 0;
|
||||
|
||||
var subprocess = globalThis.allocator().create(Subprocess) catch {
|
||||
globalThis.throw("out of memory", .{});
|
||||
return JSValue.jsUndefined();
|
||||
};
|
||||
|
||||
// When run synchronously, subprocess isn't garbage collected
|
||||
subprocess.* = Subprocess{
|
||||
.globalThis = globalThis,
|
||||
.pid = pid,
|
||||
@@ -1039,25 +1109,28 @@ pub const Subprocess = struct {
|
||||
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
|
||||
}
|
||||
|
||||
const out = subprocess.toJS(globalThis);
|
||||
subprocess.this_jsvalue.set(globalThis, out);
|
||||
const out = if (comptime !is_sync) subprocess.toJS(globalThis) else JSValue.zero;
|
||||
if (comptime !is_sync)
|
||||
subprocess.this_jsvalue.set(globalThis, out);
|
||||
|
||||
switch (globalThis.bunVM().poller.watch(
|
||||
@intCast(JSC.Node.FileDescriptor, pidfd),
|
||||
.process,
|
||||
Subprocess,
|
||||
subprocess,
|
||||
)) {
|
||||
.result => {},
|
||||
.err => |err| {
|
||||
if (err.getErrno() == .SRCH) {
|
||||
@panic("This shouldn't happen");
|
||||
}
|
||||
if (comptime !is_sync) {
|
||||
switch (globalThis.bunVM().poller.watch(
|
||||
@intCast(JSC.Node.FileDescriptor, pidfd),
|
||||
.process,
|
||||
Subprocess,
|
||||
subprocess,
|
||||
)) {
|
||||
.result => {},
|
||||
.err => |err| {
|
||||
if (err.getErrno() == .SRCH) {
|
||||
@panic("This shouldn't happen");
|
||||
}
|
||||
|
||||
// process has already exited
|
||||
// https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
|
||||
subprocess.onExitNotification();
|
||||
},
|
||||
// process has already exited
|
||||
// https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007
|
||||
subprocess.onExitNotification();
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if (subprocess.stdin == .buffered_input) {
|
||||
@@ -1065,29 +1138,54 @@ pub const Subprocess = struct {
|
||||
.blob => subprocess.stdin.buffered_input.source.blob.slice(),
|
||||
.array_buffer => |array_buffer| array_buffer.slice(),
|
||||
};
|
||||
subprocess.stdin.buffered_input.writeIfPossible();
|
||||
subprocess.stdin.buffered_input.writeIfPossible(is_sync);
|
||||
}
|
||||
|
||||
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
|
||||
// bun.ensureNonBlocking(subprocess.stdout.pipe.buffer.fd);
|
||||
subprocess.stdout.pipe.buffer.readIfPossible();
|
||||
if (comptime is_sync) {
|
||||
if (subprocess.stderr.pipe.buffer.canRead()) {
|
||||
subprocess.stderr.pipe.buffer.readAll(true);
|
||||
}
|
||||
} else {
|
||||
subprocess.stdout.pipe.buffer.readIfPossible(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
|
||||
// bun.ensureNonBlocking(subprocess.stderr.pipe.buffer.fd);
|
||||
subprocess.stderr.pipe.buffer.readIfPossible();
|
||||
if (comptime is_sync) {
|
||||
if (subprocess.stderr.pipe.buffer.canRead()) {
|
||||
subprocess.stderr.pipe.buffer.readAll(true);
|
||||
}
|
||||
} else {
|
||||
subprocess.stderr.pipe.buffer.readIfPossible(false);
|
||||
}
|
||||
}
|
||||
|
||||
return out;
|
||||
if (comptime !is_sync) {
|
||||
return out;
|
||||
}
|
||||
|
||||
subprocess.wait(true);
|
||||
const exitCode = subprocess.exit_code orelse 1;
|
||||
const stdout = subprocess.stdout.toBufferedValue(globalThis);
|
||||
const stderr = subprocess.stderr.toBufferedValue(globalThis);
|
||||
subprocess.finalize();
|
||||
|
||||
const sync_value = JSC.JSValue.createEmptyObject(globalThis, 4);
|
||||
sync_value.put(globalThis, JSC.ZigString.static("exitCode"), JSValue.jsNumber(@intCast(i32, exitCode) * -1));
|
||||
sync_value.put(globalThis, JSC.ZigString.static("stdout"), stdout);
|
||||
sync_value.put(globalThis, JSC.ZigString.static("stderr"), stderr);
|
||||
sync_value.put(globalThis, JSC.ZigString.static("success"), JSValue.jsBoolean(exitCode == 0));
|
||||
return sync_value;
|
||||
}
|
||||
|
||||
pub fn onExitNotification(
|
||||
this: *Subprocess,
|
||||
) void {
|
||||
this.wait(this.globalThis.bunVM());
|
||||
this.wait(false);
|
||||
}
|
||||
|
||||
pub fn wait(this: *Subprocess, vm: *JSC.VirtualMachine) void {
|
||||
pub fn wait(this: *Subprocess, sync: bool) void {
|
||||
if (this.has_waitpid_task) {
|
||||
return;
|
||||
}
|
||||
@@ -1103,9 +1201,11 @@ pub const Subprocess = struct {
|
||||
},
|
||||
}
|
||||
|
||||
this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this);
|
||||
this.has_waitpid_task = true;
|
||||
vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task));
|
||||
if (!sync) {
|
||||
this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this);
|
||||
this.has_waitpid_task = true;
|
||||
this.globalThis.bunVM().eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task));
|
||||
}
|
||||
}
|
||||
|
||||
fn onExit(this: *Subprocess) void {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { it, expect } from "bun:test";
|
||||
import { basename, dirname, join } from "path";
|
||||
import * as fs from "fs";
|
||||
import { readableStreamToText, spawn } from "bun";
|
||||
import { readableStreamToText, spawnSync } from "bun";
|
||||
|
||||
it("should not log .env when quiet", async () => {
|
||||
writeDirectoryTree("/tmp/log-test-silent", {
|
||||
@@ -9,17 +9,12 @@ it("should not log .env when quiet", async () => {
|
||||
"bunfig.toml": `logLevel = "error"`,
|
||||
"index.ts": "export default console.log('Here');",
|
||||
});
|
||||
const out = spawn({
|
||||
const { stderr } = spawnSync({
|
||||
cmd: ["bun", "index.ts"],
|
||||
stdout: "pipe",
|
||||
stderr: "pipe",
|
||||
cwd: "/tmp/log-test-silent",
|
||||
});
|
||||
|
||||
out.ref();
|
||||
await out.exited;
|
||||
const text = await readableStreamToText(out.stderr);
|
||||
expect(text).toBe("");
|
||||
expect(stderr.toString()).toBe("");
|
||||
});
|
||||
|
||||
it("should log .env by default", async () => {
|
||||
@@ -29,17 +24,12 @@ it("should log .env by default", async () => {
|
||||
"index.ts": "export default console.log('Here');",
|
||||
});
|
||||
|
||||
const out = spawn({
|
||||
const { stderr } = spawnSync({
|
||||
cmd: ["bun", "index.ts"],
|
||||
stdout: "pipe",
|
||||
stderr: "pipe",
|
||||
cwd: "/tmp/log-test-silent",
|
||||
});
|
||||
|
||||
out.ref();
|
||||
await out.exited;
|
||||
const text = await readableStreamToText(out.stderr);
|
||||
expect(text.includes(".env")).toBe(true);
|
||||
expect(stderr.toString().includes(".env")).toBe(true);
|
||||
});
|
||||
|
||||
function writeDirectoryTree(base, paths) {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { readableStreamToText, spawn, write } from "bun";
|
||||
import { readableStreamToText, spawn, spawnSync, write } from "bun";
|
||||
import { describe, expect, it } from "bun:test";
|
||||
import { gcTick as _gcTick } from "gc";
|
||||
import { rmdirSync, unlinkSync, rmSync, writeFileSync } from "node:fs";
|
||||
@@ -8,9 +8,49 @@ for (let [gcTick, label] of [
|
||||
[() => {}, "no gc tick"],
|
||||
]) {
|
||||
describe(label, () => {
|
||||
describe("spawnSync", () => {
|
||||
const hugeString = "hello".repeat(10000).slice();
|
||||
|
||||
it("Uint8Array works as stdin", async () => {
|
||||
const { stdout, stderr } = spawnSync({
|
||||
cmd: ["cat"],
|
||||
stdin: new TextEncoder().encode(hugeString),
|
||||
});
|
||||
|
||||
expect(stdout.toString()).toBe(hugeString);
|
||||
expect(stderr.byteLength).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("spawn", () => {
|
||||
const hugeString = "hello".repeat(10000).slice();
|
||||
|
||||
it("Uint8Array works as stdin", async () => {
|
||||
rmSync("/tmp/out.123.txt", { force: true });
|
||||
gcTick();
|
||||
const { exited } = spawn({
|
||||
cmd: ["cat"],
|
||||
stdin: new TextEncoder().encode(hugeString),
|
||||
stdout: Bun.file("/tmp/out.123.txt"),
|
||||
});
|
||||
|
||||
await exited;
|
||||
expect(await Bun.file("/tmp/out.123.txt").text()).toBe(hugeString);
|
||||
});
|
||||
|
||||
it("Blob works as stdin", async () => {
|
||||
rmSync("/tmp/out.123.txt", { force: true });
|
||||
gcTick();
|
||||
const { exited } = spawn({
|
||||
cmd: ["cat"],
|
||||
stdin: new Blob([new TextEncoder().encode(hugeString)]),
|
||||
stdout: Bun.file("/tmp/out.123.txt"),
|
||||
});
|
||||
|
||||
await exited;
|
||||
expect(await Bun.file("/tmp/out.123.txt").text()).toBe(hugeString);
|
||||
});
|
||||
|
||||
it("Bun.file() works as stdout", async () => {
|
||||
rmSync("/tmp/out.123.txt", { force: true });
|
||||
gcTick();
|
||||
|
||||
Reference in New Issue
Block a user