cross-platform

This commit is contained in:
Jarred Sumner
2025-06-23 02:25:44 -07:00
parent 29ee24a6cd
commit 36e23882bc
4 changed files with 132 additions and 38 deletions

View File

@@ -401,7 +401,12 @@ pub const Stdio = union(enum) {
} else if (value.as(JSC.WebCore.Response)) |res| {
return extractBodyValue(out_stdio, globalThis, i, res.getBodyValue(), is_sync);
} else if (i == 0) {
if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |stream| {
if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |stream_| {
var stream = stream_;
if (stream.toAnyBlob(globalThis)) |blob| {
return out_stdio.extractBlob(globalThis, blob, i);
}
if (is_sync) {
return globalThis.throwInvalidArguments("'stdin' ReadableStream cannot be used in sync mode", .{});
}

View File

@@ -1361,7 +1361,13 @@ const Writable = union(enum) {
subprocess.flags.deref_on_stdin_destroyed = true;
if (stdio.* == .readable_stream) {
promise_for_stream.* = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
if (assign_result.toError()) |err| {
pipe.deref();
subprocess.deref();
return event_loop.global.throwValue(err);
}
promise_for_stream.* = assign_result;
}
return Writable{
@@ -2045,7 +2051,7 @@ pub fn spawnMaybeSync(
var stdio_iter = try stdio_val.arrayIterator(globalThis);
var i: u31 = 0;
while (try stdio_iter.next()) |value| : (i += 1) {
try stdio[i].extract(globalThis, i, value);
try stdio[i].extract(globalThis, i, value, is_sync);
if (i == 2)
break;
}
@@ -2053,7 +2059,7 @@ pub fn spawnMaybeSync(
while (try stdio_iter.next()) |value| : (i += 1) {
var new_item: Stdio = undefined;
try new_item.extract(globalThis, i, value);
try new_item.extract(globalThis, i, value, is_sync);
const opt = switch (new_item.asSpawnOption(i)) {
.result => |opt| opt,
@@ -2072,15 +2078,15 @@ pub fn spawnMaybeSync(
}
} else {
if (try args.get(globalThis, "stdin")) |value| {
try stdio[0].extract(globalThis, 0, value);
try stdio[0].extract(globalThis, 0, value, is_sync);
}
if (try args.get(globalThis, "stderr")) |value| {
try stdio[2].extract(globalThis, 2, value);
try stdio[2].extract(globalThis, 2, value, is_sync);
}
if (try args.get(globalThis, "stdout")) |value| {
try stdio[1].extract(globalThis, 1, value);
try stdio[1].extract(globalThis, 1, value, is_sync);
}
}
@@ -2404,10 +2410,22 @@ pub fn spawnMaybeSync(
promise_for_stream.ensureStillAlive();
if (promise_for_stream != .zero and !globalThis.hasException()) {
if (promise_for_stream.toError()) |err| {
_ = globalThis.throwValue(err) catch {};
}
}
if (globalThis.hasException()) {
subprocess.deref();
const err = globalThis.takeException(error.JSError);
// Ensure we kill the process so we don't leave things in an unexpected state.
_ = subprocess.tryKill(subprocess.killSignal);
return globalThis.throwValue(globalThis.takeError(error.JSError));
if (globalThis.hasException()) {
return error.JSError;
}
return globalThis.throwValue(err);
}
var posix_ipc_info: if (Environment.isPosix) IPC.Socket else void = undefined;

View File

@@ -18,9 +18,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -50,11 +51,25 @@ describe("spawn stdin ReadableStream edge cases", () => {
},
});
// Use a command that exits quickly
// Use a command that exits quickly after reading one line
const proc = spawn({
cmd: ["sh", "-c", "head -n 1"],
cmd: [
bunExe(),
"-e",
`const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', (line) => {
console.log(line);
process.exit(0);
});`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -87,9 +102,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -180,9 +196,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
// Kill the process after some data
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
// Wait a bit then kill
@@ -220,9 +237,22 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["wc", "-l"],
cmd: [
bunExe(),
"-e",
`let count = 0;
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', () => count++);
rl.on('close', () => console.log(count));`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -246,9 +276,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -269,9 +300,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
// First use
const proc1 = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text1 = await new Response(proc1.stdout).text();
@@ -281,8 +313,9 @@ describe("spawn stdin ReadableStream edge cases", () => {
// Second use should fail
expect(() => {
spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
env: bunEnv,
});
}).toThrow();
});
@@ -304,9 +337,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const buffer = await new Response(proc.stdout).arrayBuffer();
@@ -359,9 +393,16 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["wc", "-c"],
cmd: [
bunExe(),
"-e",
`let count = 0;
process.stdin.on('data', (chunk) => count += chunk.length);
process.stdin.on('end', () => console.log(count));`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -382,9 +423,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -410,9 +452,10 @@ describe("spawn stdin ReadableStream edge cases", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
...config,
env: bunEnv,
});
const stdout = await new Response(proc.stdout).text();

View File

@@ -1,5 +1,6 @@
import { spawn } from "bun";
import { describe, expect, test } from "bun:test";
import { bunEnv, bunExe } from "harness";
describe("spawn stdin ReadableStream integration", () => {
test("example from documentation", async () => {
@@ -12,8 +13,10 @@ describe("spawn stdin ReadableStream integration", () => {
});
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await new Response(proc.stdout).text();
@@ -33,11 +36,24 @@ describe("spawn stdin ReadableStream integration", () => {
},
});
// Count lines using wc -l
// Count lines using Bun subprocess
const proc = spawn({
cmd: ["wc", "-l"],
cmd: [
bunExe(),
"-e",
`let count = 0;
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', () => count++);
rl.on('close', () => console.log(count));`,
],
stdin: responseStream,
stdout: "pipe",
env: bunEnv,
});
const output = await new Response(proc.stdout).text();
@@ -67,9 +83,10 @@ describe("spawn stdin ReadableStream integration", () => {
const transformedStream = dataStream.pipeThrough(upperCaseTransform);
const proc = spawn({
cmd: ["cat"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: transformedStream,
stdout: "pipe",
env: bunEnv,
});
const result = await new Response(proc.stdout).text();
@@ -94,21 +111,15 @@ describe("spawn stdin ReadableStream integration", () => {
},
});
// Process the stream (e.g., compress it)
// Process the stream (just echo it for cross-platform compatibility)
const proc = spawn({
cmd: ["gzip"],
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: fileStream,
stdout: "pipe",
env: bunEnv,
});
// Decompress to verify
const decompress = spawn({
cmd: ["gunzip"],
stdin: proc.stdout,
stdout: "pipe",
});
const result = await new Response(decompress.stdout).text();
const result = await new Response(proc.stdout).text();
const lines = result.trim().split("\n");
expect(lines.length).toBe(numChunks);
expect(lines[0]).toStartWith("Chunk 0:");
@@ -136,11 +147,28 @@ describe("spawn stdin ReadableStream integration", () => {
},
});
// Process the CSV data
// Process the CSV data using Bun
const proc = spawn({
cmd: ["awk", "-F,", "{ sum += $2; count++ } END { print sum/count }"],
cmd: [
bunExe(),
"-e",
`let sum = 0, count = 0;
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', (line) => {
const [_, value] = line.split(',');
sum += parseFloat(value);
count++;
});
rl.on('close', () => console.log(sum / count));`,
],
stdin: dataStream,
stdout: "pipe",
env: bunEnv,
});
const avgStr = await new Response(proc.stdout).text();