From 36e23882bc0574fda7a2bc4edf53723cbf50395e Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Mon, 23 Jun 2025 02:25:44 -0700 Subject: [PATCH] cross-platform --- src/bun.js/api/bun/spawn/stdio.zig | 7 +- src/bun.js/api/bun/subprocess.zig | 34 ++++++--- ...n-stdin-readable-stream-edge-cases.test.ts | 69 +++++++++++++++---- ...-stdin-readable-stream-integration.test.ts | 60 +++++++++++----- 4 files changed, 132 insertions(+), 38 deletions(-) diff --git a/src/bun.js/api/bun/spawn/stdio.zig b/src/bun.js/api/bun/spawn/stdio.zig index d7735f8d3e..710ab9a8da 100644 --- a/src/bun.js/api/bun/spawn/stdio.zig +++ b/src/bun.js/api/bun/spawn/stdio.zig @@ -401,7 +401,12 @@ pub const Stdio = union(enum) { } else if (value.as(JSC.WebCore.Response)) |res| { return extractBodyValue(out_stdio, globalThis, i, res.getBodyValue(), is_sync); } else if (i == 0) { - if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |stream| { + if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |stream_| { + var stream = stream_; + if (stream.toAnyBlob(globalThis)) |blob| { + return out_stdio.extractBlob(globalThis, blob, i); + } + if (is_sync) { return globalThis.throwInvalidArguments("'stdin' ReadableStream cannot be used in sync mode", .{}); } diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 80e0c22d17..3999a32f45 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -1361,7 +1361,13 @@ const Writable = union(enum) { subprocess.flags.deref_on_stdin_destroyed = true; if (stdio.* == .readable_stream) { - promise_for_stream.* = pipe.assignToStream(&stdio.readable_stream, event_loop.global); + const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global); + if (assign_result.toError()) |err| { + pipe.deref(); + subprocess.deref(); + return event_loop.global.throwValue(err); + } + promise_for_stream.* = assign_result; } return Writable{ @@ -2045,7 +2051,7 @@ pub fn spawnMaybeSync( var stdio_iter = try stdio_val.arrayIterator(globalThis); var i: u31 = 0; while (try stdio_iter.next()) |value| : (i += 1) { - try stdio[i].extract(globalThis, i, value); + try stdio[i].extract(globalThis, i, value, is_sync); if (i == 2) break; } @@ -2053,7 +2059,7 @@ pub fn spawnMaybeSync( while (try stdio_iter.next()) |value| : (i += 1) { var new_item: Stdio = undefined; - try new_item.extract(globalThis, i, value); + try new_item.extract(globalThis, i, value, is_sync); const opt = switch (new_item.asSpawnOption(i)) { .result => |opt| opt, @@ -2072,15 +2078,15 @@ pub fn spawnMaybeSync( } } else { if (try args.get(globalThis, "stdin")) |value| { - try stdio[0].extract(globalThis, 0, value); + try stdio[0].extract(globalThis, 0, value, is_sync); } if (try args.get(globalThis, "stderr")) |value| { - try stdio[2].extract(globalThis, 2, value); + try stdio[2].extract(globalThis, 2, value, is_sync); } if (try args.get(globalThis, "stdout")) |value| { - try stdio[1].extract(globalThis, 1, value); + try stdio[1].extract(globalThis, 1, value, is_sync); } } @@ -2404,10 +2410,22 @@ pub fn spawnMaybeSync( promise_for_stream.ensureStillAlive(); + if (promise_for_stream != .zero and !globalThis.hasException()) { + if (promise_for_stream.toError()) |err| { + _ = globalThis.throwValue(err) catch {}; + } + } + if (globalThis.hasException()) { - subprocess.deref(); + const err = globalThis.takeException(error.JSError); + // Ensure we kill the process so we don't leave things in an unexpected state. _ = subprocess.tryKill(subprocess.killSignal); - return globalThis.throwValue(globalThis.takeError(error.JSError)); + + if (globalThis.hasException()) { + return error.JSError; + } + + return globalThis.throwValue(err); } var posix_ipc_info: if (Environment.isPosix) IPC.Socket else void = undefined; diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts index be3edc1d24..aa87aa790a 100644 --- a/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts +++ b/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts @@ -18,9 +18,10 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -50,11 +51,25 @@ describe("spawn stdin ReadableStream edge cases", () => { }, }); - // Use a command that exits quickly + // Use a command that exits quickly after reading one line const proc = spawn({ - cmd: ["sh", "-c", "head -n 1"], + cmd: [ + bunExe(), + "-e", + `const readline = require('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false + }); + rl.on('line', (line) => { + console.log(line); + process.exit(0); + });`, + ], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -87,9 +102,10 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -180,9 +196,10 @@ describe("spawn stdin ReadableStream edge cases", () => { // Kill the process after some data const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); // Wait a bit then kill @@ -220,9 +237,22 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["wc", "-l"], + cmd: [ + bunExe(), + "-e", + `let count = 0; + const readline = require('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false + }); + rl.on('line', () => count++); + rl.on('close', () => console.log(count));`, + ], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -246,9 +276,10 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -269,9 +300,10 @@ describe("spawn stdin ReadableStream edge cases", () => { // First use const proc1 = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text1 = await new Response(proc1.stdout).text(); @@ -281,8 +313,9 @@ describe("spawn stdin ReadableStream edge cases", () => { // Second use should fail expect(() => { spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, + env: bunEnv, }); }).toThrow(); }); @@ -304,9 +337,10 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); const buffer = await new Response(proc.stdout).arrayBuffer(); @@ -359,9 +393,16 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["wc", "-c"], + cmd: [ + bunExe(), + "-e", + `let count = 0; + process.stdin.on('data', (chunk) => count += chunk.length); + process.stdin.on('end', () => console.log(count));`, + ], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -382,9 +423,10 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -410,9 +452,10 @@ describe("spawn stdin ReadableStream edge cases", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, ...config, + env: bunEnv, }); const stdout = await new Response(proc.stdout).text(); diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts index 6bb51d63a2..ad06d4e5e8 100644 --- a/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts +++ b/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts @@ -1,5 +1,6 @@ import { spawn } from "bun"; import { describe, expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; describe("spawn stdin ReadableStream integration", () => { test("example from documentation", async () => { @@ -12,8 +13,10 @@ describe("spawn stdin ReadableStream integration", () => { }); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, + stdout: "pipe", + env: bunEnv, }); const text = await new Response(proc.stdout).text(); @@ -33,11 +36,24 @@ describe("spawn stdin ReadableStream integration", () => { }, }); - // Count lines using wc -l + // Count lines using Bun subprocess const proc = spawn({ - cmd: ["wc", "-l"], + cmd: [ + bunExe(), + "-e", + `let count = 0; + const readline = require('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false + }); + rl.on('line', () => count++); + rl.on('close', () => console.log(count));`, + ], stdin: responseStream, stdout: "pipe", + env: bunEnv, }); const output = await new Response(proc.stdout).text(); @@ -67,9 +83,10 @@ describe("spawn stdin ReadableStream integration", () => { const transformedStream = dataStream.pipeThrough(upperCaseTransform); const proc = spawn({ - cmd: ["cat"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: transformedStream, stdout: "pipe", + env: bunEnv, }); const result = await new Response(proc.stdout).text(); @@ -94,21 +111,15 @@ describe("spawn stdin ReadableStream integration", () => { }, }); - // Process the stream (e.g., compress it) + // Process the stream (just echo it for cross-platform compatibility) const proc = spawn({ - cmd: ["gzip"], + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: fileStream, stdout: "pipe", + env: bunEnv, }); - // Decompress to verify - const decompress = spawn({ - cmd: ["gunzip"], - stdin: proc.stdout, - stdout: "pipe", - }); - - const result = await new Response(decompress.stdout).text(); + const result = await new Response(proc.stdout).text(); const lines = result.trim().split("\n"); expect(lines.length).toBe(numChunks); expect(lines[0]).toStartWith("Chunk 0:"); @@ -136,11 +147,28 @@ describe("spawn stdin ReadableStream integration", () => { }, }); - // Process the CSV data + // Process the CSV data using Bun const proc = spawn({ - cmd: ["awk", "-F,", "{ sum += $2; count++ } END { print sum/count }"], + cmd: [ + bunExe(), + "-e", + `let sum = 0, count = 0; + const readline = require('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false + }); + rl.on('line', (line) => { + const [_, value] = line.split(','); + sum += parseFloat(value); + count++; + }); + rl.on('close', () => console.log(sum / count));`, + ], stdin: dataStream, stdout: "pipe", + env: bunEnv, }); const avgStr = await new Response(proc.stdout).text();