From 85258d64bb7fe41cb021cd368de335c367b3af6d Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Sun, 22 Jun 2025 22:42:49 -0700 Subject: [PATCH] Introduce stdin support to Bun.spawn --- docs/api/spawn.md | 34 + src/bun.js/api/bun/spawn/stdio.zig | 36 +- src/bun.js/api/bun/subprocess.zig | 42 +- src/bun.js/bindings/ZigGlobalObject.cpp | 4 + src/bun.js/bindings/ZigGlobalObject.h | 4 +- src/bun.js/bindings/headers.h | 3 + src/bun.js/webcore/FileSink.zig | 115 ++++ src/shell/subproc.zig | 8 +- ...n-stdin-readable-stream-edge-cases.test.ts | 448 +++++++++++++ ...-stdin-readable-stream-integration.test.ts | 153 +++++ .../spawn-stdin-readable-stream-sync.test.ts | 33 + .../spawn/spawn-stdin-readable-stream.test.ts | 586 ++++++++++++++++++ 12 files changed, 1433 insertions(+), 33 deletions(-) create mode 100644 test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts create mode 100644 test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts create mode 100644 test/js/bun/spawn/spawn-stdin-readable-stream-sync.test.ts create mode 100644 test/js/bun/spawn/spawn-stdin-readable-stream.test.ts diff --git a/docs/api/spawn.md b/docs/api/spawn.md index d570dc09f2..2cc6081630 100644 --- a/docs/api/spawn.md +++ b/docs/api/spawn.md @@ -113,6 +113,40 @@ proc.stdin.flush(); proc.stdin.end(); ``` +The `ReadableStream` option lets you pipe data from a JavaScript `ReadableStream` directly to the subprocess's input: + +```ts +const stream = new ReadableStream({ + start(controller) { + controller.enqueue("Hello from "); + controller.enqueue("ReadableStream!"); + controller.close(); + }, +}); + +const proc = Bun.spawn(["cat"], { + stdin: stream, + stdout: "pipe", +}); + +const output = await new Response(proc.stdout).text(); +console.log(output); // "Hello from ReadableStream!" +``` + +This is particularly useful for streaming data from HTTP responses, file streams, or any other source that provides a `ReadableStream`: + +```ts +// Stream an HTTP response through a subprocess +const response = await fetch("https://example.com/large-file.txt"); +const proc = Bun.spawn(["gzip"], { + stdin: response.body, + stdout: "pipe", +}); + +// Save the compressed output +await Bun.write("compressed.gz", proc.stdout); +``` + ## Output streams You can read results from the subprocess via the `stdout` and `stderr` properties. By default these are instances of `ReadableStream`. diff --git a/src/bun.js/api/bun/spawn/stdio.zig b/src/bun.js/api/bun/spawn/stdio.zig index 1d640e532a..ce2086210a 100644 --- a/src/bun.js/api/bun/spawn/stdio.zig +++ b/src/bun.js/api/bun/spawn/stdio.zig @@ -23,6 +23,7 @@ pub const Stdio = union(enum) { memfd: bun.FileDescriptor, pipe, ipc, + readable_stream: JSC.WebCore.ReadableStream, const log = bun.sys.syslog; @@ -78,6 +79,9 @@ pub const Stdio = union(enum) { .memfd => |fd| { fd.close(); }, + .readable_stream => { + // ReadableStream cleanup is handled by the subprocess + }, else => {}, } } @@ -191,7 +195,7 @@ pub const Stdio = union(enum) { break :brk .{ .buffer = {} }; }, .dup2 => .{ .dup2 = .{ .out = stdio.dup2.out, .to = stdio.dup2.to } }, - .capture, .pipe, .array_buffer => .{ .buffer = {} }, + .capture, .pipe, .array_buffer, .readable_stream => .{ .buffer = {} }, .ipc => .{ .ipc = {} }, .fd => |fd| .{ .pipe = fd }, .memfd => |fd| .{ .pipe = fd }, @@ -244,7 +248,7 @@ pub const Stdio = union(enum) { break :brk .{ .buffer = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() }; }, .ipc => .{ .ipc = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() }, - .capture, .pipe, .array_buffer => .{ .buffer = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() }, + .capture, .pipe, .array_buffer, .readable_stream => .{ .buffer = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() }, .fd => |fd| .{ .pipe = fd }, .dup2 => .{ .dup2 = .{ .out = stdio.dup2.out, .to = stdio.dup2.to } }, .path => |pathlike| .{ .path = pathlike.slice() }, @@ -277,7 +281,7 @@ pub const Stdio = union(enum) { pub fn isPiped(self: Stdio) bool { return switch (self) { - .capture, .array_buffer, .blob, .pipe => true, + .capture, .array_buffer, .blob, .pipe, .readable_stream => true, .ipc => Environment.isWindows, else => false, }; @@ -351,27 +355,13 @@ pub const Stdio = union(enum) { } else if (value.as(JSC.WebCore.Response)) |req| { req.getBodyValue().toBlobIfPossible(); return out_stdio.extractBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i); - } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| { - var req = req_const; - if (i == 0) { - if (req.toAnyBlob(globalThis)) |blob| { - return out_stdio.extractBlob(globalThis, blob, i); - } - - switch (req.ptr) { - .File, .Blob => { - globalThis.throwTODO("Support fd/blob backed ReadableStream in spawn stdin. See https://github.com/oven-sh/bun/issues/8049") catch {}; - return error.JSError; - }, - .Direct, .JavaScript, .Bytes => { - // out_stdio.* = .{ .connect = req }; - globalThis.throwTODO("Re-enable ReadableStream support in spawn stdin. ") catch {}; - return error.JSError; - }, - .Invalid => { - return globalThis.throwInvalidArguments("ReadableStream is in invalid state.", .{}); - }, + } else if (i == 0) { + if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |stream| { + if (stream.isDisturbed(globalThis)) { + return globalThis.throwInvalidArguments("stdin ReadableStream is already disturbed", .{}); } + out_stdio.* = .{ .readable_stream = stream }; + return; } } else if (value.asArrayBuffer(globalThis)) |array_buffer| { // Change in Bun v1.0.34: don't throw for empty ArrayBuffer diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 0e25964198..80e0c22d17 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -446,6 +446,7 @@ const Readable = union(enum) { .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) }, .array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), .capture => Output.panic("TODO: implement capture support in Stdio readable", .{}), + .readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately }; } @@ -1265,16 +1266,17 @@ const Writable = union(enum) { pub fn onStart(_: *Writable) void {} pub fn init( - stdio: Stdio, + stdio: *Stdio, event_loop: *JSC.EventLoop, subprocess: *Subprocess, result: StdioResult, + promise_for_stream: *JSC.JSValue, ) !Writable { assertStdioResult(result); if (Environment.isWindows) { - switch (stdio) { - .pipe => { + switch (stdio.*) { + .pipe, .readable_stream => { if (result == .buffer) { const pipe = JSC.WebCore.FileSink.createWithPipe(event_loop, result.buffer); @@ -1328,14 +1330,14 @@ const Writable = union(enum) { } if (comptime Environment.isPosix) { - if (stdio == .pipe) { + if (stdio.* == .pipe) { _ = bun.sys.setNonblocking(result.?); } } - switch (stdio) { + switch (stdio.*) { .dup2 => @panic("TODO dup2 stdio"), - .pipe => { + .pipe, .readable_stream => { const pipe = JSC.WebCore.FileSink.create(event_loop, result.?); switch (pipe.writer.start(pipe.fd, true)) { @@ -1343,16 +1345,24 @@ const Writable = union(enum) { .err => |err| { _ = err; // autofix pipe.deref(); + if (stdio.* == .readable_stream) { + stdio.readable_stream.cancel(event_loop.global); + } + return error.UnexpectedCreatingStdin; }, } + pipe.writer.handle.poll.flags.insert(.socket); + subprocess.weak_file_sink_stdin_ptr = pipe; subprocess.ref(); subprocess.flags.has_stdin_destructor_called = false; subprocess.flags.deref_on_stdin_destroyed = true; - pipe.writer.handle.poll.flags.insert(.socket); + if (stdio.* == .readable_stream) { + promise_for_stream.* = pipe.assignToStream(&stdio.readable_stream, event_loop.global); + } return Writable{ .pipe = pipe, @@ -1518,6 +1528,7 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta this.pid_rusage = rusage.*; const is_sync = this.flags.is_sync; this.clearAbortSignal(); + defer this.deref(); defer this.disconnectIPC(true); @@ -2335,16 +2346,19 @@ pub fn spawnMaybeSync( MaxBuf.createForSubprocess(subprocess, &subprocess.stderr_maxbuf, maxBuffer); MaxBuf.createForSubprocess(subprocess, &subprocess.stdout_maxbuf, maxBuffer); + var promise_for_stream: JSC.JSValue = .zero; + // When run synchronously, subprocess isn't garbage collected subprocess.* = Subprocess{ .globalThis = globalThis, .process = process, .pid_rusage = null, .stdin = Writable.init( - stdio[0], + &stdio[0], loop, subprocess, spawned.stdin, + &promise_for_stream, ) catch { subprocess.deref(); return globalThis.throwOutOfMemory(); @@ -2388,6 +2402,14 @@ pub fn spawnMaybeSync( subprocess.process.setExitHandler(subprocess); + promise_for_stream.ensureStillAlive(); + + if (globalThis.hasException()) { + subprocess.deref(); + _ = subprocess.tryKill(subprocess.killSignal); + return globalThis.throwValue(globalThis.takeError(error.JSError)); + } + var posix_ipc_info: if (Environment.isPosix) IPC.Socket else void = undefined; if (Environment.isPosix and !is_sync) { if (maybe_ipc_mode) |mode| { @@ -2446,6 +2468,10 @@ pub fn spawnMaybeSync( JSC.Codegen.JSSubprocess.ipcCallbackSetCached(out, globalThis, ipc_callback); } + if (stdio[0] == .readable_stream) { + JSC.Codegen.JSSubprocess.stdinSetCached(out, globalThis, stdio[0].readable_stream.value); + } + switch (subprocess.process.watch()) { .result => {}, .err => { diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 615d9486de..d41ace2ea4 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -4406,6 +4406,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h return GlobalObject::PromiseFunctions::Bun__FileStreamWrapper__onResolveRequestStream; } else if (handler == Bun__FileStreamWrapper__onRejectRequestStream) { return GlobalObject::PromiseFunctions::Bun__FileStreamWrapper__onRejectRequestStream; + } else if (handler == Bun__FileSink__onResolveStream) { + return GlobalObject::PromiseFunctions::Bun__FileSink__onResolveStream; + } else if (handler == Bun__FileSink__onRejectStream) { + return GlobalObject::PromiseFunctions::Bun__FileSink__onRejectStream; } else { RELEASE_ASSERT_NOT_REACHED(); } diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h index f086237c67..dfed8b13d1 100644 --- a/src/bun.js/bindings/ZigGlobalObject.h +++ b/src/bun.js/bindings/ZigGlobalObject.h @@ -379,8 +379,10 @@ public: Bun__S3UploadStream__onResolveRequestStream, Bun__FileStreamWrapper__onRejectRequestStream, Bun__FileStreamWrapper__onResolveRequestStream, + Bun__FileSink__onResolveStream, + Bun__FileSink__onRejectStream, }; - static constexpr size_t promiseFunctionsSize = 34; + static constexpr size_t promiseFunctionsSize = 36; static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC::JSGlobalObject* arg0, JSC::CallFrame* arg1)); diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index c41dda57be..368167e7f4 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -688,6 +688,9 @@ BUN_DECLARE_HOST_FUNCTION(Bun__HTTPRequestContext__onResolveStream); BUN_DECLARE_HOST_FUNCTION(Bun__NodeHTTPRequest__onResolve); BUN_DECLARE_HOST_FUNCTION(Bun__NodeHTTPRequest__onReject); +BUN_DECLARE_HOST_FUNCTION(Bun__FileSink__onResolveStream); +BUN_DECLARE_HOST_FUNCTION(Bun__FileSink__onRejectStream); + #endif #ifdef __cplusplus diff --git a/src/bun.js/webcore/FileSink.zig b/src/bun.js/webcore/FileSink.zig index 837cfe7b8a..5174af3257 100644 --- a/src/bun.js/webcore/FileSink.zig +++ b/src/bun.js/webcore/FileSink.zig @@ -24,6 +24,9 @@ fd: bun.FileDescriptor = bun.invalid_fd, auto_flusher: webcore.AutoFlusher = .{}, run_pending_later: FlushPendingTask = .{}, +/// Currently, only used when `stdin` in `Bun.spawn` is a ReadableStream. +readable_stream: JSC.WebCore.ReadableStream.Strong = .{}, + const log = Output.scoped(.FileSink, false); pub const RefCount = bun.ptr.RefCount(FileSink, "ref_count", deinit, .{}); @@ -79,6 +82,14 @@ pub fn onAttachedProcessExit(this: *FileSink) void { this.pending.result = .{ .err = .fromCode(.PIPE, .write) }; + if (this.readable_stream.has()) { + if (this.event_loop_handle.globalObject()) |global| { + if (this.readable_stream.get(global)) |stream| { + stream.cancel(global); + } + } + } + this.runPending(); if (this.must_be_kept_alive_until_eof) { @@ -182,6 +193,13 @@ pub fn onReady(this: *FileSink) void { pub fn onClose(this: *FileSink) void { log("onClose()", .{}); this.signal.close(null); + if (this.readable_stream.has()) { + if (this.event_loop_handle.globalObject()) |global| { + if (this.readable_stream.get(global)) |stream| { + stream.cancel(global); + } + } + } } pub fn createWithPipe( @@ -225,6 +243,11 @@ pub fn create( } pub fn setup(this: *FileSink, options: *const FileSink.Options) JSC.Maybe(void) { + if (this.readable_stream.has()) { + // Already started. + return .{ .result = {} }; + } + const result = bun.io.openForWriting( bun.FileDescriptor.cwd(), options.input_path, @@ -414,6 +437,7 @@ pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, wait: bool) JSC } pub fn finalize(this: *FileSink) void { + this.readable_stream.deinit(); this.pending.deinit(); this.deref(); } @@ -495,6 +519,7 @@ pub fn end(this: *FileSink, _: ?bun.sys.Error) JSC.Maybe(void) { fn deinit(this: *FileSink) void { this.pending.deinit(); this.writer.deinit(); + this.readable_stream.deinit(); if (this.event_loop_handle.globalObject()) |global| { webcore.AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, global.bunVM()); } @@ -611,6 +636,96 @@ pub const FlushPendingTask = struct { } }; +/// Does not ref or unref. +fn handleResolveStream(this: *FileSink, globalThis: *JSC.JSGlobalObject) void { + if (this.readable_stream.get(globalThis)) |*stream| { + stream.done(globalThis); + } + + if (!this.done) { + this.writer.close(); + } +} + +/// Does not ref or unref. +fn handleRejectStream(this: *FileSink, globalThis: *JSC.JSGlobalObject, _: JSC.JSValue) void { + if (this.readable_stream.get(globalThis)) |*stream| { + stream.abort(globalThis); + } + + if (!this.done) { + this.writer.close(); + } +} + +fn onResolveStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { + log("onResolveStream", .{}); + var args = callframe.arguments(); + var this: *@This() = args[args.len - 1].asPromisePtr(@This()); + defer this.deref(); + this.handleResolveStream(globalThis); + return .js_undefined; +} +fn onRejectStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue { + log("onRejectStream", .{}); + const args = callframe.arguments(); + var this = args[args.len - 1].asPromisePtr(@This()); + const err = args[0]; + defer this.deref(); + + this.handleRejectStream(globalThis, err); + return .js_undefined; +} + +pub fn assignToStream(this: *FileSink, stream: *JSC.WebCore.ReadableStream, globalThis: *JSGlobalObject) JSC.JSValue { + var signal = &this.signal; + signal.* = JSC.WebCore.FileSink.JSSink.SinkSignal.init(JSValue.zero); + this.ref(); + defer this.deref(); + + // explicitly set it to a dead pointer + // we use this memory address to disable signals being sent + signal.clear(); + + this.readable_stream = .init(stream.*, globalThis); + const promise_result = JSC.WebCore.FileSink.JSSink.assignToStream(globalThis, stream.value, this, @as(**anyopaque, @ptrCast(&signal.ptr))); + + if (promise_result.toError()) |err| { + this.readable_stream.deinit(); + this.readable_stream = .{}; + return err; + } + + if (!promise_result.isEmptyOrUndefinedOrNull()) { + if (promise_result.asAnyPromise()) |promise| { + switch (promise.status(globalThis.vm())) { + .pending => { + this.ref(); + promise_result.then(globalThis, this, onResolveStream, onRejectStream); + }, + .fulfilled => { + // These don't ref(). + this.handleResolveStream(globalThis); + }, + .rejected => { + // These don't ref(). + this.handleRejectStream(globalThis, promise.result(globalThis.vm())); + }, + } + } + } + + return promise_result; +} + +comptime { + const export_prefix = "Bun__FileSink"; + if (bun.Environment.export_cpp_apis) { + @export(&JSC.toJSHostFn(onResolveStream), .{ .name = export_prefix ++ "__onResolveStream" }); + @export(&JSC.toJSHostFn(onRejectStream), .{ .name = export_prefix ++ "__onRejectStream" }); + } +} + const std = @import("std"); const bun = @import("bun"); const uv = bun.windows.libuv; diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig index 93a2e17b41..5f796bcb9a 100644 --- a/src/shell/subproc.zig +++ b/src/shell/subproc.zig @@ -149,7 +149,7 @@ pub const ShellSubprocess = struct { if (Environment.isWindows) { switch (stdio) { - .pipe => { + .pipe, .readable_stream => { if (result == .buffer) { const pipe = JSC.WebCore.FileSink.createWithPipe(event_loop, result.buffer); @@ -236,6 +236,10 @@ pub const ShellSubprocess = struct { .ipc, .capture => { return Writable{ .ignore = {} }; }, + .readable_stream => { + // The shell never uses this + @panic("Unimplemented stdin readable_stream"); + }, } } @@ -384,6 +388,7 @@ pub const ShellSubprocess = struct { return readable; }, .capture => Readable{ .pipe = PipeReader.create(event_loop, process, result, shellio, out_type) }, + .readable_stream => Readable{ .ignore = {} }, // Shell doesn't use readable_stream }; } @@ -405,6 +410,7 @@ pub const ShellSubprocess = struct { return readable; }, .capture => Readable{ .pipe = PipeReader.create(event_loop, process, result, shellio, out_type) }, + .readable_stream => Readable{ .ignore = {} }, // Shell doesn't use readable_stream }; } diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts new file mode 100644 index 0000000000..118fe3d111 --- /dev/null +++ b/test/js/bun/spawn/spawn-stdin-readable-stream-edge-cases.test.ts @@ -0,0 +1,448 @@ +import { spawn } from "bun"; +import { describe, expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; + +describe("spawn stdin ReadableStream edge cases", () => { + test("ReadableStream with exception in start", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("before exception\n"); + throw new Error("Start error"); + }, + }); + + // The stream should still work with the data before the exception + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("before exception\n"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with exception in pull", async () => { + let pullCount = 0; + const stream = new ReadableStream({ + pull(controller) { + pullCount++; + if (pullCount === 1) { + controller.enqueue("chunk 1\n"); + } else if (pullCount === 2) { + controller.enqueue("chunk 2\n"); + throw new Error("Pull error"); + } + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + // Should receive data before the exception + expect(text).toContain("chunk 1\n"); + expect(text).toContain("chunk 2\n"); + }); + + test("ReadableStream writing after process closed", async () => { + let writeAttempts = 0; + let errorOccurred = false; + + const stream = new ReadableStream({ + async pull(controller) { + writeAttempts++; + if (writeAttempts <= 10) { + await Bun.sleep(100); + try { + controller.enqueue(`attempt ${writeAttempts}\n`); + } catch (e) { + errorOccurred = true; + throw e; + } + } else { + controller.close(); + } + }, + }); + + // Use a command that exits quickly + const proc = spawn({ + cmd: ["sh", "-c", "head -n 1"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + await proc.exited; + + // Give time for more pull attempts + await Bun.sleep(500); + + // The stream should have attempted multiple writes but only the first succeeded + expect(writeAttempts).toBeGreaterThanOrEqual(1); + expect(text).toBe("attempt 1\n"); + }); + + test("ReadableStream with mixed types", async () => { + const stream = new ReadableStream({ + start(controller) { + // String + controller.enqueue("text "); + // Uint8Array + controller.enqueue(new TextEncoder().encode("binary ")); + // ArrayBuffer + const buffer = new ArrayBuffer(5); + const view = new Uint8Array(buffer); + view.set([100, 97, 116, 97, 32]); // "data " + controller.enqueue(buffer); + // Another string + controller.enqueue("end"); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("text binary data end"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with process consuming data slowly", async () => { + const chunks: string[] = []; + for (let i = 0; i < 10; i++) { + chunks.push(`chunk ${i}\n`); + } + + let currentChunk = 0; + const stream = new ReadableStream({ + pull(controller) { + if (currentChunk < chunks.length) { + controller.enqueue(chunks[currentChunk]); + currentChunk++; + } else { + controller.close(); + } + }, + }); + + // Use a script that reads slowly + const proc = spawn({ + cmd: [ + bunExe(), + "-e", + ` + const readline = require('readline'); + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false + }); + + rl.on('line', async (line) => { + await Bun.sleep(10); + console.log(line); + }); + `, + ], + stdin: stream, + stdout: "pipe", + env: bunEnv, + }); + + const text = await new Response(proc.stdout).text(); + const lines = text.trim().split("\n"); + expect(lines.length).toBe(10); + for (let i = 0; i < 10; i++) { + expect(lines[i]).toBe(`chunk ${i}`); + } + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with cancel callback verification", async () => { + let cancelReason: any = null; + let cancelCalled = false; + + const stream = new ReadableStream({ + start(controller) { + // Start sending data + let count = 0; + const interval = setInterval(() => { + count++; + try { + controller.enqueue(`data ${count}\n`); + } catch (e) { + clearInterval(interval); + } + }, 50); + + // Store interval for cleanup + (controller as any).interval = interval; + }, + cancel(reason) { + cancelCalled = true; + cancelReason = reason; + // Clean up interval if exists + if ((this as any).interval) { + clearInterval((this as any).interval); + } + }, + }); + + // Kill the process after some data + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + // Wait a bit then kill + await Bun.sleep(150); + proc.kill(); + + try { + await proc.exited; + } catch (e) { + // Expected - process was killed + } + + // Give time for cancel to be called + await Bun.sleep(50); + + expect(cancelCalled).toBe(true); + }); + + test("ReadableStream with high frequency small chunks", async () => { + const totalChunks = 1000; + let sentChunks = 0; + + const stream = new ReadableStream({ + pull(controller) { + // Send multiple small chunks per pull + for (let i = 0; i < 10 && sentChunks < totalChunks; i++) { + controller.enqueue(`${sentChunks}\n`); + sentChunks++; + } + + if (sentChunks >= totalChunks) { + controller.close(); + } + }, + }); + + const proc = spawn({ + cmd: ["wc", "-l"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(parseInt(text.trim())).toBe(totalChunks); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with queuing strategy", async () => { + let pullCount = 0; + + const stream = new ReadableStream( + { + pull(controller) { + pullCount++; + if (pullCount <= 5) { + // Enqueue data larger than high water mark + controller.enqueue("x".repeat(1024)); + } else { + controller.close(); + } + }, + }, + { + // Small high water mark to test backpressure + highWaterMark: 1024, + }, + ); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("x".repeat(1024 * 5)); + expect(await proc.exited).toBe(0); + + // Should have been pulled exactly as needed + expect(pullCount).toBe(5); + }); + + test("ReadableStream reuse prevention", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("test data"); + controller.close(); + }, + }); + + // First use + const proc1 = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text1 = await new Response(proc1.stdout).text(); + expect(text1).toBe("test data"); + expect(await proc1.exited).toBe(0); + + // Second use should fail + expect(() => { + spawn({ + cmd: ["cat"], + stdin: stream, + }); + }).toThrow(); + }); + + test("ReadableStream with byte stream", async () => { + const data = new Uint8Array(256); + for (let i = 0; i < 256; i++) { + data[i] = i; + } + + const stream = new ReadableStream({ + type: "bytes", + start(controller) { + // Enqueue as byte chunks + controller.enqueue(data.slice(0, 128)); + controller.enqueue(data.slice(128, 256)); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const buffer = await new Response(proc.stdout).arrayBuffer(); + const result = new Uint8Array(buffer); + expect(result).toEqual(data); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with stdin and other pipes", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("stdin data"); + controller.close(); + }, + }); + + // Create a script that also writes to stdout and stderr + const script = ` + process.stdin.on('data', (data) => { + process.stdout.write('stdout: ' + data); + process.stderr.write('stderr: ' + data); + }); + `; + + const proc = spawn({ + cmd: [bunExe(), "-e", script], + stdin: stream, + stdout: "pipe", + stderr: "pipe", + env: bunEnv, + }); + + const [stdout, stderr] = await Promise.all([new Response(proc.stdout).text(), new Response(proc.stderr).text()]); + + expect(stdout).toBe("stdout: stdin data"); + expect(stderr).toBe("stderr: stdin data"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with very long single chunk", async () => { + // Create a chunk larger than typical pipe buffer (64KB on most systems) + const size = 256 * 1024; // 256KB + const chunk = "a".repeat(size); + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(chunk); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["wc", "-c"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(parseInt(text.trim())).toBe(size); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with alternating data types", async () => { + const stream = new ReadableStream({ + start(controller) { + // Alternate between strings and Uint8Arrays + controller.enqueue("string1 "); + controller.enqueue(new TextEncoder().encode("binary1 ")); + controller.enqueue("string2 "); + controller.enqueue(new TextEncoder().encode("binary2")); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("string1 binary1 string2 binary2"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with spawn options variations", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("test input"); + controller.close(); + }, + }); + + // Test with different spawn configurations + const configs = [ + { stdout: "pipe", stderr: "ignore" }, + { stdout: "pipe", stderr: "pipe" }, + { stdout: "pipe", stderr: "inherit" }, + ]; + + for (const config of configs) { + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + ...config, + }); + + const stdout = await new Response(proc.stdout).text(); + expect(stdout).toBe("test input"); + expect(await proc.exited).toBe(0); + } + }); +}); diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts new file mode 100644 index 0000000000..6bb51d63a2 --- /dev/null +++ b/test/js/bun/spawn/spawn-stdin-readable-stream-integration.test.ts @@ -0,0 +1,153 @@ +import { spawn } from "bun"; +import { describe, expect, test } from "bun:test"; + +describe("spawn stdin ReadableStream integration", () => { + test("example from documentation", async () => { + const stream = new ReadableStream({ + async pull(controller) { + await Bun.sleep(1); + controller.enqueue("some data from a stream"); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + }); + + const text = await new Response(proc.stdout).text(); + console.log(text); // "some data from a stream" + expect(text).toBe("some data from a stream"); + }); + + test("piping HTTP response to process", async () => { + // Simulate an HTTP response stream + const responseStream = new ReadableStream({ + async pull(controller) { + await Bun.sleep(1); + controller.enqueue("Line 1\n"); + controller.enqueue("Line 2\n"); + controller.enqueue("Line 3\n"); + controller.close(); + }, + }); + + // Count lines using wc -l + const proc = spawn({ + cmd: ["wc", "-l"], + stdin: responseStream, + stdout: "pipe", + }); + + const output = await new Response(proc.stdout).text(); + expect(parseInt(output.trim())).toBe(3); + }); + + test("transforming data before passing to process", async () => { + // Original data stream + const dataStream = new ReadableStream({ + async pull(controller) { + await Bun.sleep(1); + controller.enqueue("hello world"); + controller.enqueue("\n"); + controller.enqueue("foo bar"); + controller.close(); + }, + }); + + // Transform to uppercase + const upperCaseTransform = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk.toUpperCase()); + }, + }); + + // Pipe through transform then to process + const transformedStream = dataStream.pipeThrough(upperCaseTransform); + + const proc = spawn({ + cmd: ["cat"], + stdin: transformedStream, + stdout: "pipe", + }); + + const result = await new Response(proc.stdout).text(); + expect(result).toBe("HELLO WORLD\nFOO BAR"); + }); + + test("streaming large file through process", async () => { + // Simulate streaming a large file in chunks + const chunkSize = 1024; + const numChunks = 100; + let currentChunk = 0; + + const fileStream = new ReadableStream({ + pull(controller) { + if (currentChunk < numChunks) { + // Simulate file chunk + controller.enqueue(`Chunk ${currentChunk}: ${"x".repeat(chunkSize - 20)}\n`); + currentChunk++; + } else { + controller.close(); + } + }, + }); + + // Process the stream (e.g., compress it) + const proc = spawn({ + cmd: ["gzip"], + stdin: fileStream, + stdout: "pipe", + }); + + // Decompress to verify + const decompress = spawn({ + cmd: ["gunzip"], + stdin: proc.stdout, + stdout: "pipe", + }); + + const result = await new Response(decompress.stdout).text(); + const lines = result.trim().split("\n"); + expect(lines.length).toBe(numChunks); + expect(lines[0]).toStartWith("Chunk 0:"); + expect(lines[99]).toStartWith("Chunk 99:"); + }); + + test("real-time data processing", async () => { + let dataPoints = 0; + const maxDataPoints = 5; + + // Simulate real-time data stream + const dataStream = new ReadableStream({ + async pull(controller) { + if (dataPoints < maxDataPoints) { + const timestamp = Date.now(); + const value = Math.random() * 100; + controller.enqueue(`${timestamp},${value.toFixed(2)}\n`); + dataPoints++; + + // Simulate real-time delay + await Bun.sleep(10); + } else { + controller.close(); + } + }, + }); + + // Process the CSV data + const proc = spawn({ + cmd: ["awk", "-F,", "{ sum += $2; count++ } END { print sum/count }"], + stdin: dataStream, + stdout: "pipe", + }); + + const avgStr = await new Response(proc.stdout).text(); + const avg = parseFloat(avgStr.trim()); + + // Average should be between 0 and 100 + expect(avg).toBeGreaterThanOrEqual(0); + expect(avg).toBeLessThanOrEqual(100); + }); +}); diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream-sync.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream-sync.test.ts new file mode 100644 index 0000000000..e3486257ca --- /dev/null +++ b/test/js/bun/spawn/spawn-stdin-readable-stream-sync.test.ts @@ -0,0 +1,33 @@ +import { spawnSync } from "bun"; +import { describe, expect, test } from "bun:test"; + +describe("spawnSync with ReadableStream stdin", () => { + test("spawnSync should throw or handle ReadableStream appropriately", () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("test data"); + controller.close(); + }, + }); + + // spawnSync with ReadableStream should either: + // 1. Throw an error because async streams can't be used synchronously + // 2. Handle it in some special way + + try { + const result = spawnSync({ + cmd: ["cat"], + stdin: stream as any, // Type assertion because it may not be in the types yet + stdout: "pipe", + }); + + // If it doesn't throw, check what happens + if (result.stdout) { + console.log("spawnSync accepted ReadableStream, output:", result.stdout.toString()); + } + } catch (error: any) { + // This is expected - spawnSync shouldn't support async ReadableStream + expect(error.message).toContain("ReadableStream"); + } + }); +}); diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts new file mode 100644 index 0000000000..1b78e5d807 --- /dev/null +++ b/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts @@ -0,0 +1,586 @@ +import { spawn } from "bun"; +import { describe, expect, test } from "bun:test"; +import { expectMaxObjectTypeCount, getMaxFD } from "harness"; + +describe("spawn stdin ReadableStream", () => { + test("basic ReadableStream as stdin", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("hello from stream"); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("hello from stream"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with multiple chunks", async () => { + const chunks = ["chunk1\n", "chunk2\n", "chunk3\n"]; + const stream = new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe(chunks.join("")); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with Uint8Array chunks", async () => { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode("binary ")); + controller.enqueue(encoder.encode("data ")); + controller.enqueue(encoder.encode("stream")); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("binary data stream"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with delays between chunks", async () => { + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue("first\n"); + await Bun.sleep(50); + controller.enqueue("second\n"); + await Bun.sleep(50); + controller.enqueue("third\n"); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("first\nsecond\nthird\n"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with pull method", async () => { + let pullCount = 0; + const stream = new ReadableStream({ + pull(controller) { + pullCount++; + if (pullCount <= 3) { + controller.enqueue(`pull ${pullCount}\n`); + } else { + controller.close(); + } + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("pull 1\npull 2\npull 3\n"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with async pull and delays", async () => { + let pullCount = 0; + const stream = new ReadableStream({ + async pull(controller) { + pullCount++; + if (pullCount <= 3) { + await Bun.sleep(30); + controller.enqueue(`async pull ${pullCount}\n`); + } else { + controller.close(); + } + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("async pull 1\nasync pull 2\nasync pull 3\n"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with large data", async () => { + const largeData = "x".repeat(1024 * 1024); // 1MB + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(largeData); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe(largeData); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with very large chunked data", async () => { + const chunkSize = 64 * 1024; // 64KB chunks + const numChunks = 16; // 1MB total + let pushedChunks = 0; + + const stream = new ReadableStream({ + pull(controller) { + if (pushedChunks < numChunks) { + controller.enqueue("x".repeat(chunkSize)); + pushedChunks++; + } else { + controller.close(); + } + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text.length).toBe(chunkSize * numChunks); + expect(text).toBe("x".repeat(chunkSize * numChunks)); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream cancellation when process exits early", async () => { + let cancelled = false; + let chunksEnqueued = 0; + + const stream = new ReadableStream({ + async pull(controller) { + // Keep enqueueing data slowly + await Bun.sleep(50); + chunksEnqueued++; + controller.enqueue(`chunk ${chunksEnqueued}\n`); + }, + cancel(reason) { + cancelled = true; + }, + }); + + const proc = spawn({ + cmd: ["head", "-n", "2"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + await proc.exited; + + // Give some time for cancellation to happen + await Bun.sleep(100); + + expect(cancelled).toBe(true); + expect(chunksEnqueued).toBeGreaterThanOrEqual(2); + // head -n 2 should only output 2 lines + expect(text.trim().split("\n").length).toBe(2); + }); + + test("ReadableStream error handling", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("before error\n"); + controller.error(new Error("Stream error")); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + // Process should receive data before the error + expect(text).toBe("before error\n"); + + // Process should exit normally (the stream error happens after data is sent) + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with process that exits immediately", async () => { + let cancelled = false; + const stream = new ReadableStream({ + start(controller) { + // Enqueue a lot of data + for (let i = 0; i < 1000; i++) { + controller.enqueue(`line ${i}\n`); + } + controller.close(); + }, + cancel() { + cancelled = true; + }, + }); + + const proc = spawn({ + cmd: ["true"], // exits immediately + stdin: stream, + }); + + expect(await proc.exited).toBe(0); + + // Give time for any pending operations + await Bun.sleep(50); + + // The stream might be cancelled since the process exits before reading + // This is implementation-dependent behavior + }); + + test("ReadableStream with process that fails", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("data for failing process\n"); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["sh", "-c", "exit 1"], + stdin: stream, + }); + + expect(await proc.exited).toBe(1); + }); + + test("already disturbed ReadableStream throws error", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("data"); + controller.close(); + }, + }); + + // Disturb the stream by getting a reader + const reader = stream.getReader(); + reader.releaseLock(); + + expect(() => { + spawn({ + cmd: ["cat"], + stdin: stream, + }); + }).toThrow("stdin ReadableStream is already disturbed"); + }); + + test("ReadableStream with abort signal", async () => { + const controller = new AbortController(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("data before abort\n"); + }, + pull(controller) { + // Keep the stream open + controller.enqueue("more data\n"); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + signal: controller.signal, + }); + + // Give it some time to start + await Bun.sleep(50); + + // Abort the process + controller.abort(); + + try { + await proc.exited; + } catch (e) { + // Process was aborted + } + + // The process should have been killed + expect(proc.killed).toBe(true); + }); + + test("ReadableStream with backpressure", async () => { + let pullCalls = 0; + let totalBytesEnqueued = 0; + const chunkSize = 64 * 1024; // 64KB chunks + + const stream = new ReadableStream({ + pull(controller) { + pullCalls++; + if (totalBytesEnqueued < 1024 * 1024 * 2) { + // 2MB total + const chunk = "x".repeat(chunkSize); + controller.enqueue(chunk); + totalBytesEnqueued += chunk.length; + } else { + controller.close(); + } + }, + }); + + // Use a slow reader to create backpressure + const proc = spawn({ + cmd: ["sh", "-c", 'while IFS= read -r line; do echo "$line"; sleep 0.01; done'], + stdin: stream, + stdout: "pipe", + }); + + const startTime = Date.now(); + let outputLength = 0; + + const reader = proc.stdout.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) { + outputLength += value.length; + // Break after some data to not wait forever + if (outputLength > chunkSize * 2) break; + } + } + } finally { + reader.releaseLock(); + } + + proc.kill(); + await proc.exited; + + // The pull method should have been called multiple times due to backpressure + expect(pullCalls).toBeGreaterThan(1); + }); + + test("ReadableStream with multiple processes", async () => { + const stream1 = new ReadableStream({ + start(controller) { + controller.enqueue("stream1 data"); + controller.close(); + }, + }); + + const stream2 = new ReadableStream({ + start(controller) { + controller.enqueue("stream2 data"); + controller.close(); + }, + }); + + const proc1 = spawn({ + cmd: ["cat"], + stdin: stream1, + stdout: "pipe", + }); + + const proc2 = spawn({ + cmd: ["cat"], + stdin: stream2, + stdout: "pipe", + }); + + const [text1, text2] = await Promise.all([new Response(proc1.stdout).text(), new Response(proc2.stdout).text()]); + + expect(text1).toBe("stream1 data"); + expect(text2).toBe("stream2 data"); + expect(await proc1.exited).toBe(0); + expect(await proc2.exited).toBe(0); + }); + + test("ReadableStream file descriptor cleanup", async () => { + const maxFD = getMaxFD(); + const iterations = 10; + + for (let i = 0; i < iterations; i++) { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(`iteration ${i}\n`); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe(`iteration ${i}\n`); + expect(await proc.exited).toBe(0); + } + + // Force garbage collection + Bun.gc(true); + await Bun.sleep(50); + + // Check that we didn't leak file descriptors + const newMaxFD = getMaxFD(); + expect(newMaxFD).toBeLessThanOrEqual(maxFD + 10); // Allow some variance + }); + + test("ReadableStream with empty stream", async () => { + const stream = new ReadableStream({ + start(controller) { + // Close immediately without enqueueing anything + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe(""); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with null bytes", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([72, 101, 108, 108, 111, 0, 87, 111, 114, 108, 100])); // "Hello\0World" + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + const buffer = await new Response(proc.stdout).arrayBuffer(); + const bytes = new Uint8Array(buffer); + expect(bytes).toEqual(new Uint8Array([72, 101, 108, 108, 111, 0, 87, 111, 114, 108, 100])); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with transform stream", async () => { + // Create a transform stream that uppercases text + const upperCaseTransform = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk.toUpperCase()); + }, + }); + + const originalStream = new ReadableStream({ + start(controller) { + controller.enqueue("hello "); + controller.enqueue("world"); + controller.close(); + }, + }); + + const transformedStream = originalStream.pipeThrough(upperCaseTransform); + + const proc = spawn({ + cmd: ["cat"], + stdin: transformedStream, + stdout: "pipe", + }); + + const text = await new Response(proc.stdout).text(); + expect(text).toBe("HELLO WORLD"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream with tee", async () => { + const originalStream = new ReadableStream({ + start(controller) { + controller.enqueue("shared data"); + controller.close(); + }, + }); + + const [stream1, stream2] = originalStream.tee(); + + // Use the first branch for the process + const proc = spawn({ + cmd: ["cat"], + stdin: stream1, + stdout: "pipe", + }); + + // Read from the second branch independently + const text2 = await new Response(stream2).text(); + + const text1 = await new Response(proc.stdout).text(); + expect(text1).toBe("shared data"); + expect(text2).toBe("shared data"); + expect(await proc.exited).toBe(0); + }); + + test("ReadableStream object type count", async () => { + const iterations = 5; + + for (let i = 0; i < iterations; i++) { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(`iteration ${i}`); + controller.close(); + }, + }); + + const proc = spawn({ + cmd: ["cat"], + stdin: stream, + stdout: "pipe", + }); + + await new Response(proc.stdout).text(); + await proc.exited; + } + + // Force cleanup + Bun.gc(true); + await Bun.sleep(100); + + // Check that we're not leaking objects + await expectMaxObjectTypeCount(expect, "ReadableStream", 10); + await expectMaxObjectTypeCount(expect, "Subprocess", 5); + }); +});