bun.sh/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts
Jarred Sumner (2e02d9de28): Use ReadableStream.prototype.* in tests instead of new Response(...).* (#20937)
Co-authored-by: Jarred-Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Alistair Smith <hi@alistair.sh>
Co-authored-by: Claude Bot <claude-bot@bun.sh>
Co-authored-by: Claude <noreply@anthropic.com>
2025-07-14 00:47:53 -07:00


import { spawn } from "bun";
import { describe, expect, mock, test } from "bun:test";
import { bunEnv, bunExe, expectMaxObjectTypeCount, isASAN, isCI } from "harness";
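// Every test below spawns `bun -e "process.stdin.pipe(process.stdout)"` (or a small
// variant) as an echo child, so whatever the ReadableStream produces for stdin is
// expected to come back verbatim on stdout.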
describe("spawn stdin ReadableStream", () => {
test("basic ReadableStream as stdin", async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello from stream");
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("hello from stream");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with multiple chunks", async () => {
const chunks = ["chunk1\n", "chunk2\n", "chunk3\n"];
const stream = new ReadableStream({
start(controller) {
for (const chunk of chunks) {
controller.enqueue(chunk);
}
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe(chunks.join(""));
expect(await proc.exited).toBe(0);
});
test("ReadableStream with Uint8Array chunks", async () => {
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode("binary "));
controller.enqueue(encoder.encode("data "));
controller.enqueue(encoder.encode("stream"));
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("binary data stream");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with delays between chunks", async () => {
const stream = new ReadableStream({
async start(controller) {
controller.enqueue("first\n");
await Bun.sleep(50);
controller.enqueue("second\n");
await Bun.sleep(50);
controller.enqueue("third\n");
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("first\nsecond\nthird\n");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with pull method", async () => {
let pullCount = 0;
const stream = new ReadableStream({
pull(controller) {
pullCount++;
if (pullCount <= 3) {
controller.enqueue(`pull ${pullCount}\n`);
} else {
controller.close();
}
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("pull 1\npull 2\npull 3\n");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with async pull and delays", async () => {
let pullCount = 0;
const stream = new ReadableStream({
async pull(controller) {
pullCount++;
if (pullCount <= 3) {
await Bun.sleep(30);
controller.enqueue(`async pull ${pullCount}\n`);
} else {
controller.close();
}
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("async pull 1\nasync pull 2\nasync pull 3\n");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with large data", async () => {
const largeData = "x".repeat(1024 * 1024); // 1MB
const stream = new ReadableStream({
start(controller) {
controller.enqueue(largeData);
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe(largeData);
expect(await proc.exited).toBe(0);
});
test("ReadableStream with very large chunked data", async () => {
const chunkSize = 64 * 1024; // 64KB chunks
const numChunks = 16; // 1MB total
let pushedChunks = 0;
const stream = new ReadableStream({
pull(controller) {
if (pushedChunks < numChunks) {
controller.enqueue("x".repeat(chunkSize));
pushedChunks++;
} else {
controller.close();
}
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text.length).toBe(chunkSize * numChunks);
expect(text).toBe("x".repeat(chunkSize * numChunks));
expect(await proc.exited).toBe(0);
});
test.todo("ReadableStream cancellation when process exits early", async () => {
let cancelled = false;
let chunksEnqueued = 0;
const stream = new ReadableStream({
async pull(controller) {
// Keep enqueueing data slowly
await Bun.sleep(50);
chunksEnqueued++;
controller.enqueue(`chunk ${chunksEnqueued}\n`);
},
cancel(_reason) {
cancelled = true;
},
});
await using proc = spawn({
cmd: [
bunExe(),
"-e",
`const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
let lines = 0;
rl.on('line', (line) => {
console.log(line);
lines++;
if (lines >= 2) process.exit(0);
});`,
],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
await proc.exited;
// Give some time for cancellation to happen
await Bun.sleep(100);
expect(cancelled).toBe(true);
expect(chunksEnqueued).toBeGreaterThanOrEqual(2);
// The child script exits after echoing 2 lines, so stdout should contain exactly 2 lines
expect(text.trim().split("\n").length).toBe(2);
});
test("ReadableStream error handling", async () => {
const stream = new ReadableStream({
async start(controller) {
controller.enqueue("before error\n");
// Give time for the data to be consumed
await Bun.sleep(10);
controller.error(new Error("Stream error"));
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
// Process should receive data before the error
expect(text).toBe("before error\n");
// Process should exit normally (the stream error happens after data is sent)
expect(await proc.exited).toBe(0);
});
test("ReadableStream with process that exits immediately", async () => {
const stream = new ReadableStream({
start(controller) {
// Enqueue a lot of data
for (let i = 0; i < 1000; i++) {
controller.enqueue(`line ${i}\n`);
}
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.exit(0)"], // exits immediately
stdin: stream,
env: bunEnv,
});
expect(await proc.exited).toBe(0);
// Give time for any pending operations
await Bun.sleep(50);
// The stream might be cancelled since the process exits before reading
// This is implementation-dependent behavior
});
test("ReadableStream with process that fails", async () => {
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue("data for failing process\n");
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.exit(1)"],
stdin: stream,
env: bunEnv,
});
expect(await proc.exited).toBe(1);
});
test("already disturbed ReadableStream throws error", async () => {
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue("data");
controller.close();
},
});
// Disturb the stream by reading from it
const reader = stream.getReader();
await reader.read();
reader.releaseLock();
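// Even a single read() marks the stream as disturbed, so spawn() must reject it.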
expect(() => {
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
env: bunEnv,
});
}).toThrow("'stdin' ReadableStream has already been used");
});
test("ReadableStream with abort signal calls cancel", async () => {
const controller = new AbortController();
const cancel = mock();
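// The mock records whether the stream's cancel() hook runs when the subprocess is
// aborted via the AbortSignal.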
const stream = new ReadableStream({
start(controller) {
controller.enqueue("data before abort\n");
},
async pull(controller) {
// Keep the stream open
// but don't block the event loop.
await Bun.sleep(1);
controller.enqueue("more data\n");
},
cancel,
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
signal: controller.signal,
env: bunEnv,
});
// Give it some time to start
await Bun.sleep(10);
// Abort the process
controller.abort();
try {
await proc.exited;
} catch (e) {
// Process was aborted
}
// The process should have been killed
expect(proc.killed).toBe(true);
expect(cancel).toHaveBeenCalledTimes(1);
});
test("ReadableStream with backpressure", async () => {
let pullCalls = 0;
const maxChunks = 5;
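// With an async pull(), chunks should be requested one at a time as the pipe drains,
// exercising backpressure instead of buffering everything eagerly.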
const stream = new ReadableStream({
async pull(controller) {
pullCalls++;
if (pullCalls <= maxChunks) {
// The await keeps pull() async, preventing the stream from being optimized into a single blob
await Bun.sleep(0);
controller.enqueue(`chunk ${pullCalls}\n`);
} else {
controller.close();
}
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
await proc.exited;
// The pull method should have been called multiple times
expect(pullCalls).toBeGreaterThan(1);
expect(pullCalls).toBeLessThanOrEqual(maxChunks + 1); // +1 for the close pull
expect(text).toContain("chunk 1\n");
expect(text).toContain(`chunk ${maxChunks}\n`);
});
test("ReadableStream with multiple processes", async () => {
const stream1 = new ReadableStream({
start(controller) {
controller.enqueue("stream1 data");
controller.close();
},
});
const stream2 = new ReadableStream({
start(controller) {
controller.enqueue("stream2 data");
controller.close();
},
});
await using proc1 = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream1,
stdout: "pipe",
env: bunEnv,
});
await using proc2 = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream2,
stdout: "pipe",
env: bunEnv,
});
const [text1, text2] = await Promise.all([new Response(proc1.stdout).text(), new Response(proc2.stdout).text()]);
expect(text1).toBe("stream1 data");
expect(text2).toBe("stream2 data");
expect(await proc1.exited).toBe(0);
expect(await proc2.exited).toBe(0);
});
test("ReadableStream with empty stream", async () => {
const stream = new ReadableStream({
start(controller) {
// Close immediately without enqueueing anything
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with null bytes", async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([72, 101, 108, 108, 111, 0, 87, 111, 114, 108, 100])); // "Hello\0World"
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
const buffer = await new Response(proc.stdout).arrayBuffer();
const bytes = new Uint8Array(buffer);
expect(bytes).toEqual(new Uint8Array([72, 101, 108, 108, 111, 0, 87, 111, 114, 108, 100]));
expect(await proc.exited).toBe(0);
});
test("ReadableStream with transform stream", async () => {
// Create a transform stream that uppercases text
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const originalStream = new ReadableStream({
start(controller) {
controller.enqueue("hello ");
controller.enqueue("world");
controller.close();
},
});
const transformedStream = originalStream.pipeThrough(upperCaseTransform);
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: transformedStream,
stdout: "pipe",
env: bunEnv,
});
const text = await proc.stdout.text();
expect(text).toBe("HELLO WORLD");
expect(await proc.exited).toBe(0);
});
test("ReadableStream with tee", async () => {
const originalStream = new ReadableStream({
start(controller) {
controller.enqueue("shared data");
controller.close();
},
});
const [stream1, stream2] = originalStream.tee();
// Use the first branch for the process
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream1,
stdout: "pipe",
env: bunEnv,
});
// Read from the second branch independently
const text2 = await new Response(stream2).text();
const text1 = await proc.stdout.text();
expect(text1).toBe("shared data");
expect(text2).toBe("shared data");
expect(await proc.exited).toBe(0);
});
test("ReadableStream object type count", async () => {
const iterations =
isASAN && isCI
? // With ASAN, the entire process gets killed, including the test runner in CI. Likely an OOM or file-descriptor exhaustion.
10
: 50;
async function main() {
async function iterate(i: number) {
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue(`iteration ${i}`);
controller.close();
},
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
stderr: "inherit",
env: bunEnv,
});
await Promise.all([proc.stdout.text(), proc.exited]);
}
const promises = Array.from({ length: iterations }, (_, i) => iterate(i));
await Promise.all(promises);
}
await main();
await Bun.sleep(1);
Bun.gc(true);
await Bun.sleep(1);
// Check that we're not leaking objects
await expectMaxObjectTypeCount(expect, "ReadableStream", 10);
await expectMaxObjectTypeCount(expect, "Subprocess", 5);
});
});