Files
bun.sh/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts
Jarred Sumner 2e02d9de28 Use ReadableStream.prototype.* in tests instead of new Response(...).* (#20937)
Co-authored-by: Jarred-Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Alistair Smith <hi@alistair.sh>
Co-authored-by: Claude Bot <claude-bot@bun.sh>
Co-authored-by: Claude <noreply@anthropic.com>
2025-07-14 00:47:53 -07:00

480 lines
13 KiB
TypeScript

/**
* Edge case tests for spawn with ReadableStream stdin.
*
* **IMPORTANT**: Many of these tests use `await` in ReadableStream constructors
* (e.g., `await Bun.sleep(0)`, `await 42`) to prevent Bun from optimizing
* the ReadableStream into a Blob. When a ReadableStream is synchronous and
* contains only string/buffer data, Bun may normalize it to a Blob for
* performance reasons. The `await` ensures the stream remains truly streaming
* and tests the actual ReadableStream code paths in spawn.
*/
import { spawn } from "bun";
import { describe, expect, test } from "bun:test";
import { bunEnv, bunExe } from "harness";
describe("spawn stdin ReadableStream edge cases", () => {
test("ReadableStream with exception in pull", async () => {
let pullCount = 0;
const stream = new ReadableStream({
pull(controller) {
pullCount++;
if (pullCount === 1) {
controller.enqueue("chunk 1\n");
} else if (pullCount === 2) {
controller.enqueue("chunk 2\n");
throw new Error("Pull error");
}
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
// Should receive data before the exception
expect(text).toContain("chunk 1\n");
expect(text).toContain("chunk 2\n");
});
test("ReadableStream writing after process closed", async () => {
let writeAttempts = 0;
let errorOccurred = false;
const stream = new ReadableStream({
async pull(controller) {
writeAttempts++;
if (writeAttempts <= 10) {
await Bun.sleep(100);
try {
controller.enqueue(`attempt ${writeAttempts}\n`);
} catch (e) {
errorOccurred = true;
throw e;
}
} else {
controller.close();
}
},
});
// Use a command that exits quickly after reading one line
const proc = spawn({
cmd: [
bunExe(),
"-e",
`const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', (line) => {
console.log(line);
process.exit(0);
});`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
await proc.exited;
// Give time for more pull attempts
await Bun.sleep(500);
// The stream should have attempted multiple writes but only the first succeeded
expect(writeAttempts).toBeGreaterThanOrEqual(1);
expect(text).toBe("attempt 1\n");
});
test("ReadableStream with mixed types", async () => {
const stream = new ReadableStream({
start(controller) {
// String
controller.enqueue("text ");
// Uint8Array
controller.enqueue(new TextEncoder().encode("binary "));
// ArrayBuffer
const buffer = new ArrayBuffer(5);
const view = new Uint8Array(buffer);
view.set([100, 97, 116, 97, 32]); // "data "
controller.enqueue(buffer);
// Another string
controller.enqueue("end");
controller.close();
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("text binary data end");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with process consuming data slowly", async () => {
const chunks: string[] = [];
for (let i = 0; i < 10; i++) {
chunks.push(`chunk ${i}\n`);
}
let currentChunk = 0;
const stream = new ReadableStream({
pull(controller) {
if (currentChunk < chunks.length) {
controller.enqueue(chunks[currentChunk]);
currentChunk++;
} else {
controller.close();
}
},
});
// Use a script that reads slowly
const proc = spawn({
cmd: [
bunExe(),
"-e",
`
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', async (line) => {
await Bun.sleep(10);
console.log(line);
});
`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
const lines = text.trim().split("\n");
expect(lines.length).toBe(10);
for (let i = 0; i < 10; i++) {
expect(lines[i]).toBe(`chunk ${i}`);
}
expect(await proc.exited).toBe(0);
});
test.todo("ReadableStream with cancel callback verification", async () => {
let cancelReason: any = null;
let cancelCalled = false;
const stream = new ReadableStream({
start(controller) {
// Start sending data
let count = 0;
const interval = setInterval(() => {
count++;
try {
controller.enqueue(`data ${count}\n`);
} catch (e) {
clearInterval(interval);
}
}, 50);
// Store interval for cleanup
(controller as any).interval = interval;
},
cancel(reason) {
cancelCalled = true;
cancelReason = reason;
// Clean up interval if exists
if ((this as any).interval) {
clearInterval((this as any).interval);
}
},
});
// Kill the process after some data
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
// Wait a bit then kill
await Bun.sleep(150);
proc.kill();
try {
await proc.exited;
} catch (e) {
// Expected - process was killed
}
// Give time for cancel to be called
await Bun.sleep(50);
expect(cancelCalled).toBe(true);
});
test("ReadableStream with high frequency small chunks", async () => {
const totalChunks = 1000;
let sentChunks = 0;
const stream = new ReadableStream({
pull(controller) {
// Send multiple small chunks per pull
for (let i = 0; i < 10 && sentChunks < totalChunks; i++) {
controller.enqueue(`${sentChunks}\n`);
sentChunks++;
}
if (sentChunks >= totalChunks) {
controller.close();
}
},
});
const proc = spawn({
cmd: [
bunExe(),
"-e",
`let count = 0;
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', () => count++);
rl.on('close', () => console.log(count));`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(parseInt(text.trim())).toBe(totalChunks);
expect(await proc.exited).toBe(0);
});
test("ReadableStream with several pulls", async () => {
let pullCount = 0;
const stream = new ReadableStream({
pull(controller) {
pullCount++;
if (pullCount <= 5) {
// Enqueue data larger than high water mark
controller.enqueue(Buffer.alloc(1024, "x"));
} else {
controller.close();
}
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("x".repeat(1024 * 5));
expect(await proc.exited).toBe(0);
// TODO: this is not quite right. But it's still godo to have
expect(pullCount).toBe(6);
});
test("ReadableStream reuse prevention", async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue("test data");
controller.close();
},
});
// First use
const proc1 = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text1 = await new Response(proc1.stdout).text();
expect(text1).toBe("test data");
expect(await proc1.exited).toBe(0);
// Second use should fail
expect(() => {
spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
env: bunEnv,
});
}).toThrow();
});
test("ReadableStream with byte stream", async () => {
const data = new Uint8Array(256);
for (let i = 0; i < 256; i++) {
data[i] = i;
}
const stream = new ReadableStream({
type: "bytes",
start(controller) {
// Enqueue as byte chunks
controller.enqueue(data.slice(0, 128));
controller.enqueue(data.slice(128, 256));
controller.close();
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const buffer = await new Response(proc.stdout).arrayBuffer();
const result = new Uint8Array(buffer);
expect(result).toEqual(data);
expect(await proc.exited).toBe(0);
});
test("ReadableStream with stdin and other pipes", async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue("stdin data");
controller.close();
},
});
// Create a script that also writes to stdout and stderr
const script = `
process.stdin.on('data', (data) => {
process.stdout.write('stdout: ' + data);
process.stderr.write('stderr: ' + data);
});
`;
const proc = spawn({
cmd: [bunExe(), "-e", script],
stdin: stream,
stdout: "pipe",
stderr: "pipe",
env: bunEnv,
});
const [stdout, stderr] = await Promise.all([proc.stdout.text(), proc.stderr.text()]);
expect(stdout).toBe("stdout: stdin data");
expect(stderr).toBe("stderr: stdin data");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with very long single chunk", async () => {
// Create a chunk larger than typical pipe buffer (64KB on most systems)
const size = 256 * 1024; // 256KB
const chunk = "a".repeat(size);
const stream = new ReadableStream({
start(controller) {
controller.enqueue(chunk);
controller.close();
},
});
const proc = spawn({
cmd: [
bunExe(),
"-e",
`let count = 0;
process.stdin.on('data', (chunk) => count += chunk.length);
process.stdin.on('end', () => console.log(count));`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(parseInt(text.trim())).toBe(size);
expect(await proc.exited).toBe(0);
});
test("ReadableStream with alternating data types", async () => {
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
// Alternate between strings and Uint8Arrays
controller.enqueue("string1 ");
controller.enqueue(new TextEncoder().encode("binary1 "));
controller.enqueue("string2 ");
controller.enqueue(new TextEncoder().encode("binary2"));
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("string1 binary1 string2 binary2");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with spawn options variations", async () => {
// Test with different spawn configurations
const configs = [
{ stdout: "pipe", stderr: "ignore" },
{ stdout: "pipe", stderr: "pipe" },
{ stdout: "pipe", stderr: "inherit" },
];
for (const config of configs) {
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue("test input");
controller.close();
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
...config,
env: bunEnv,
});
const stdout = await proc.stdout.text();
expect(stdout).toBe("test input");
expect(await proc.exited).toBe(0);
}
});
});