mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
Co-authored-by: Jarred-Sumner <709451+Jarred-Sumner@users.noreply.github.com> Co-authored-by: Alistair Smith <hi@alistair.sh> Co-authored-by: Claude Bot <claude-bot@bun.sh> Co-authored-by: Claude <noreply@anthropic.com>
480 lines
13 KiB
TypeScript
480 lines
13 KiB
TypeScript
/**
|
|
* Edge case tests for spawn with ReadableStream stdin.
|
|
*
|
|
* **IMPORTANT**: Many of these tests use `await` in ReadableStream constructors
|
|
* (e.g., `await Bun.sleep(0)`, `await 42`) to prevent Bun from optimizing
|
|
* the ReadableStream into a Blob. When a ReadableStream is synchronous and
|
|
* contains only string/buffer data, Bun may normalize it to a Blob for
|
|
* performance reasons. The `await` ensures the stream remains truly streaming
|
|
* and tests the actual ReadableStream code paths in spawn.
|
|
*/
|
|
|
|
import { spawn } from "bun";
|
|
import { describe, expect, test } from "bun:test";
|
|
import { bunEnv, bunExe } from "harness";
|
|
|
|
describe("spawn stdin ReadableStream edge cases", () => {
|
|
test("ReadableStream with exception in pull", async () => {
|
|
let pullCount = 0;
|
|
const stream = new ReadableStream({
|
|
pull(controller) {
|
|
pullCount++;
|
|
if (pullCount === 1) {
|
|
controller.enqueue("chunk 1\n");
|
|
} else if (pullCount === 2) {
|
|
controller.enqueue("chunk 2\n");
|
|
throw new Error("Pull error");
|
|
}
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
// Should receive data before the exception
|
|
expect(text).toContain("chunk 1\n");
|
|
expect(text).toContain("chunk 2\n");
|
|
});
|
|
|
|
test("ReadableStream writing after process closed", async () => {
|
|
let writeAttempts = 0;
|
|
let errorOccurred = false;
|
|
|
|
const stream = new ReadableStream({
|
|
async pull(controller) {
|
|
writeAttempts++;
|
|
if (writeAttempts <= 10) {
|
|
await Bun.sleep(100);
|
|
try {
|
|
controller.enqueue(`attempt ${writeAttempts}\n`);
|
|
} catch (e) {
|
|
errorOccurred = true;
|
|
throw e;
|
|
}
|
|
} else {
|
|
controller.close();
|
|
}
|
|
},
|
|
});
|
|
|
|
// Use a command that exits quickly after reading one line
|
|
const proc = spawn({
|
|
cmd: [
|
|
bunExe(),
|
|
"-e",
|
|
`const readline = require('readline');
|
|
const rl = readline.createInterface({
|
|
input: process.stdin,
|
|
output: process.stdout,
|
|
terminal: false
|
|
});
|
|
rl.on('line', (line) => {
|
|
console.log(line);
|
|
process.exit(0);
|
|
});`,
|
|
],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
await proc.exited;
|
|
|
|
// Give time for more pull attempts
|
|
await Bun.sleep(500);
|
|
|
|
// The stream should have attempted multiple writes but only the first succeeded
|
|
expect(writeAttempts).toBeGreaterThanOrEqual(1);
|
|
expect(text).toBe("attempt 1\n");
|
|
});
|
|
|
|
test("ReadableStream with mixed types", async () => {
|
|
const stream = new ReadableStream({
|
|
start(controller) {
|
|
// String
|
|
controller.enqueue("text ");
|
|
// Uint8Array
|
|
controller.enqueue(new TextEncoder().encode("binary "));
|
|
// ArrayBuffer
|
|
const buffer = new ArrayBuffer(5);
|
|
const view = new Uint8Array(buffer);
|
|
view.set([100, 97, 116, 97, 32]); // "data "
|
|
controller.enqueue(buffer);
|
|
// Another string
|
|
controller.enqueue("end");
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
expect(text).toBe("text binary data end");
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test("ReadableStream with process consuming data slowly", async () => {
|
|
const chunks: string[] = [];
|
|
for (let i = 0; i < 10; i++) {
|
|
chunks.push(`chunk ${i}\n`);
|
|
}
|
|
|
|
let currentChunk = 0;
|
|
const stream = new ReadableStream({
|
|
pull(controller) {
|
|
if (currentChunk < chunks.length) {
|
|
controller.enqueue(chunks[currentChunk]);
|
|
currentChunk++;
|
|
} else {
|
|
controller.close();
|
|
}
|
|
},
|
|
});
|
|
|
|
// Use a script that reads slowly
|
|
const proc = spawn({
|
|
cmd: [
|
|
bunExe(),
|
|
"-e",
|
|
`
|
|
const readline = require('readline');
|
|
const rl = readline.createInterface({
|
|
input: process.stdin,
|
|
output: process.stdout,
|
|
terminal: false
|
|
});
|
|
|
|
rl.on('line', async (line) => {
|
|
await Bun.sleep(10);
|
|
console.log(line);
|
|
});
|
|
`,
|
|
],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
const lines = text.trim().split("\n");
|
|
expect(lines.length).toBe(10);
|
|
for (let i = 0; i < 10; i++) {
|
|
expect(lines[i]).toBe(`chunk ${i}`);
|
|
}
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test.todo("ReadableStream with cancel callback verification", async () => {
|
|
let cancelReason: any = null;
|
|
let cancelCalled = false;
|
|
|
|
const stream = new ReadableStream({
|
|
start(controller) {
|
|
// Start sending data
|
|
let count = 0;
|
|
const interval = setInterval(() => {
|
|
count++;
|
|
try {
|
|
controller.enqueue(`data ${count}\n`);
|
|
} catch (e) {
|
|
clearInterval(interval);
|
|
}
|
|
}, 50);
|
|
|
|
// Store interval for cleanup
|
|
(controller as any).interval = interval;
|
|
},
|
|
cancel(reason) {
|
|
cancelCalled = true;
|
|
cancelReason = reason;
|
|
// Clean up interval if exists
|
|
if ((this as any).interval) {
|
|
clearInterval((this as any).interval);
|
|
}
|
|
},
|
|
});
|
|
|
|
// Kill the process after some data
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
// Wait a bit then kill
|
|
await Bun.sleep(150);
|
|
proc.kill();
|
|
|
|
try {
|
|
await proc.exited;
|
|
} catch (e) {
|
|
// Expected - process was killed
|
|
}
|
|
|
|
// Give time for cancel to be called
|
|
await Bun.sleep(50);
|
|
|
|
expect(cancelCalled).toBe(true);
|
|
});
|
|
|
|
test("ReadableStream with high frequency small chunks", async () => {
|
|
const totalChunks = 1000;
|
|
let sentChunks = 0;
|
|
|
|
const stream = new ReadableStream({
|
|
pull(controller) {
|
|
// Send multiple small chunks per pull
|
|
for (let i = 0; i < 10 && sentChunks < totalChunks; i++) {
|
|
controller.enqueue(`${sentChunks}\n`);
|
|
sentChunks++;
|
|
}
|
|
|
|
if (sentChunks >= totalChunks) {
|
|
controller.close();
|
|
}
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [
|
|
bunExe(),
|
|
"-e",
|
|
`let count = 0;
|
|
const readline = require('readline');
|
|
const rl = readline.createInterface({
|
|
input: process.stdin,
|
|
output: process.stdout,
|
|
terminal: false
|
|
});
|
|
rl.on('line', () => count++);
|
|
rl.on('close', () => console.log(count));`,
|
|
],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
expect(parseInt(text.trim())).toBe(totalChunks);
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test("ReadableStream with several pulls", async () => {
|
|
let pullCount = 0;
|
|
|
|
const stream = new ReadableStream({
|
|
pull(controller) {
|
|
pullCount++;
|
|
if (pullCount <= 5) {
|
|
// Enqueue data larger than high water mark
|
|
controller.enqueue(Buffer.alloc(1024, "x"));
|
|
} else {
|
|
controller.close();
|
|
}
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
expect(text).toBe("x".repeat(1024 * 5));
|
|
expect(await proc.exited).toBe(0);
|
|
|
|
// TODO: this is not quite right. But it's still godo to have
|
|
expect(pullCount).toBe(6);
|
|
});
|
|
|
|
test("ReadableStream reuse prevention", async () => {
|
|
const stream = new ReadableStream({
|
|
start(controller) {
|
|
controller.enqueue("test data");
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
// First use
|
|
const proc1 = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text1 = await new Response(proc1.stdout).text();
|
|
expect(text1).toBe("test data");
|
|
expect(await proc1.exited).toBe(0);
|
|
|
|
// Second use should fail
|
|
expect(() => {
|
|
spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
env: bunEnv,
|
|
});
|
|
}).toThrow();
|
|
});
|
|
|
|
test("ReadableStream with byte stream", async () => {
|
|
const data = new Uint8Array(256);
|
|
for (let i = 0; i < 256; i++) {
|
|
data[i] = i;
|
|
}
|
|
|
|
const stream = new ReadableStream({
|
|
type: "bytes",
|
|
start(controller) {
|
|
// Enqueue as byte chunks
|
|
controller.enqueue(data.slice(0, 128));
|
|
controller.enqueue(data.slice(128, 256));
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const buffer = await new Response(proc.stdout).arrayBuffer();
|
|
const result = new Uint8Array(buffer);
|
|
expect(result).toEqual(data);
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test("ReadableStream with stdin and other pipes", async () => {
|
|
const stream = new ReadableStream({
|
|
start(controller) {
|
|
controller.enqueue("stdin data");
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
// Create a script that also writes to stdout and stderr
|
|
const script = `
|
|
process.stdin.on('data', (data) => {
|
|
process.stdout.write('stdout: ' + data);
|
|
process.stderr.write('stderr: ' + data);
|
|
});
|
|
`;
|
|
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", script],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
stderr: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const [stdout, stderr] = await Promise.all([proc.stdout.text(), proc.stderr.text()]);
|
|
|
|
expect(stdout).toBe("stdout: stdin data");
|
|
expect(stderr).toBe("stderr: stdin data");
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test("ReadableStream with very long single chunk", async () => {
|
|
// Create a chunk larger than typical pipe buffer (64KB on most systems)
|
|
const size = 256 * 1024; // 256KB
|
|
const chunk = "a".repeat(size);
|
|
|
|
const stream = new ReadableStream({
|
|
start(controller) {
|
|
controller.enqueue(chunk);
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [
|
|
bunExe(),
|
|
"-e",
|
|
`let count = 0;
|
|
process.stdin.on('data', (chunk) => count += chunk.length);
|
|
process.stdin.on('end', () => console.log(count));`,
|
|
],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
expect(parseInt(text.trim())).toBe(size);
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test("ReadableStream with alternating data types", async () => {
|
|
const stream = new ReadableStream({
|
|
async pull(controller) {
|
|
await Bun.sleep(0);
|
|
|
|
// Alternate between strings and Uint8Arrays
|
|
controller.enqueue("string1 ");
|
|
controller.enqueue(new TextEncoder().encode("binary1 "));
|
|
controller.enqueue("string2 ");
|
|
controller.enqueue(new TextEncoder().encode("binary2"));
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
await using proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
stdout: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
|
|
const text = await proc.stdout.text();
|
|
expect(text).toBe("string1 binary1 string2 binary2");
|
|
expect(await proc.exited).toBe(0);
|
|
});
|
|
|
|
test("ReadableStream with spawn options variations", async () => {
|
|
// Test with different spawn configurations
|
|
const configs = [
|
|
{ stdout: "pipe", stderr: "ignore" },
|
|
{ stdout: "pipe", stderr: "pipe" },
|
|
{ stdout: "pipe", stderr: "inherit" },
|
|
];
|
|
|
|
for (const config of configs) {
|
|
const stream = new ReadableStream({
|
|
async pull(controller) {
|
|
await Bun.sleep(0);
|
|
controller.enqueue("test input");
|
|
controller.close();
|
|
},
|
|
});
|
|
|
|
const proc = spawn({
|
|
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
|
|
stdin: stream,
|
|
...config,
|
|
env: bunEnv,
|
|
});
|
|
|
|
const stdout = await proc.stdout.text();
|
|
expect(stdout).toBe("test input");
|
|
expect(await proc.exited).toBe(0);
|
|
}
|
|
});
|
|
});
|