diff --git a/packages/bun-types/shell.d.ts b/packages/bun-types/shell.d.ts index 278ce91127..285e9d3100 100644 --- a/packages/bun-types/shell.d.ts +++ b/packages/bun-types/shell.d.ts @@ -88,6 +88,38 @@ declare module "bun" { * ``` */ class ShellPromise extends Promise { + /** + * Get a ReadableStream for stdout that streams data as the shell executes. + * + * This allows you to consume stdout incrementally rather than waiting for + * the command to complete. The stream will emit chunks as they're written. + * + * @example + * ```ts + * const shell = $`long-running-command`; + * for await (const chunk of shell.stdout) { + * console.log('Received:', new TextDecoder().decode(chunk)); + * } + * ``` + */ + get stdout(): ReadableStream; + + /** + * Get a ReadableStream for stderr that streams data as the shell executes. + * + * This allows you to consume stderr incrementally rather than waiting for + * the command to complete. The stream will emit chunks as they're written. + * + * @example + * ```ts + * const shell = $`long-running-command`; + * for await (const chunk of shell.stderr) { + * console.error('Error:', new TextDecoder().decode(chunk)); + * } + * ``` + */ + get stderr(): ReadableStream; + get stdin(): WritableStream; /** diff --git a/src/bun.js/api/Shell.classes.ts b/src/bun.js/api/Shell.classes.ts index a86b04e861..b5905af95e 100644 --- a/src/bun.js/api/Shell.classes.ts +++ b/src/bun.js/api/Shell.classes.ts @@ -23,6 +23,14 @@ export default [ fn: "getStarted", length: 0, }, + stdout: { + getter: "getStdout", + cache: true, + }, + stderr: { + getter: "getStderr", + cache: true, + }, }, }), ]; diff --git a/src/bun.js/api/streams.classes.ts b/src/bun.js/api/streams.classes.ts index 140ac646e6..b399001a9a 100644 --- a/src/bun.js/api/streams.classes.ts +++ b/src/bun.js/api/streams.classes.ts @@ -40,9 +40,9 @@ function source(name) { isClosed: { getter: "getIsClosedFromJS", }, - ...(name !== "File" + ...(name !== "File" && name !== "ShellOutputStream" ? // Buffered versions - // not implemented in File, yet. + // not implemented in File and ShellOutputStream yet. { text: { fn: "textFromJS", @@ -80,6 +80,6 @@ function source(name) { }); } -const sources = ["Blob", "File", "Bytes"]; +const sources = ["Blob", "File", "Bytes", "ShellOutputStream"]; export default sources.map(source); diff --git a/src/bun.js/bindings/generated_classes_list.zig b/src/bun.js/bindings/generated_classes_list.zig index 41705dbd11..8184a005fe 100644 --- a/src/bun.js/bindings/generated_classes_list.zig +++ b/src/bun.js/bindings/generated_classes_list.zig @@ -70,6 +70,7 @@ pub const Classes = struct { pub const FileInternalReadableStreamSource = webcore.FileReader.Source; pub const BlobInternalReadableStreamSource = webcore.ByteBlobLoader.Source; pub const BytesInternalReadableStreamSource = webcore.ByteStream.Source; + pub const ShellOutputStreamInternalReadableStreamSource = webcore.ShellOutputStream.Source; pub const PostgresSQLConnection = api.Postgres.PostgresSQLConnection; pub const MySQLConnection = api.MySQL.MySQLConnection; pub const PostgresSQLQuery = api.Postgres.PostgresSQLQuery; diff --git a/src/bun.js/webcore.zig b/src/bun.js/webcore.zig index 5bf55087b4..4957103baf 100644 --- a/src/bun.js/webcore.zig +++ b/src/bun.js/webcore.zig @@ -40,6 +40,7 @@ pub const FetchHeaders = @import("./bindings/FetchHeaders.zig").FetchHeaders; pub const ByteBlobLoader = @import("./webcore/ByteBlobLoader.zig"); pub const ByteStream = @import("./webcore/ByteStream.zig"); pub const FileReader = @import("./webcore/FileReader.zig"); +pub const ShellOutputStream = @import("../shell/ShellOutputStream.zig"); pub const ScriptExecutionContext = @import("./webcore/ScriptExecutionContext.zig"); pub const streams = @import("./webcore/streams.zig"); diff --git a/src/js/builtins/shell.ts b/src/js/builtins/shell.ts index 2b52695099..0794b39548 100644 --- a/src/js/builtins/shell.ts +++ b/src/js/builtins/shell.ts @@ -109,6 +109,7 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa #throws: boolean = true; #resolve: (code: number, stdout: Buffer, stderr: Buffer) => void; #reject: (code: number, stdout: Buffer, stderr: Buffer) => void; + #interp: $ZigGeneratedClasses.ShellInterpreter | undefined = undefined; constructor(args: $ZigGeneratedClasses.ParsedShellScript, throws: boolean) { // Create the error immediately so it captures the stacktrace at the point @@ -170,11 +171,22 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa this.#hasRun = true; let interp = createShellInterpreter(this.#resolve, this.#reject, this.#args!); + this.#interp = interp; this.#args = undefined; interp.run(); } } + get stdout(): ReadableStream { + this.#run(); + return this.#interp!.stdout; + } + + get stderr(): ReadableStream { + this.#run(); + return this.#interp!.stderr; + } + #quiet(): this { this.#throwIfRunning(); this.#args!.setQuiet(); diff --git a/src/shell/Builtin.zig b/src/shell/Builtin.zig index 8578485590..247cfc93a2 100644 --- a/src/shell/Builtin.zig +++ b/src/shell/Builtin.zig @@ -623,6 +623,7 @@ pub fn done(this: *Builtin, exit_code: anytype) Yield { bun.default_allocator, this.stdout.buf.items[0..], )); + cmd.base.shell.notifyStdoutData(); } // Aggregate output data if shell state is piped and this cmd is piped if (cmd.io.stderr == .pipe and cmd.io.stderr == .pipe and this.stderr == .buf) { @@ -630,6 +631,7 @@ pub fn done(this: *Builtin, exit_code: anytype) Yield { bun.default_allocator, this.stderr.buf.items[0..], )); + cmd.base.shell.notifyStderrData(); } return cmd.parent.childDone(cmd, this.exit_code.?); diff --git a/src/shell/ShellOutputStream.zig b/src/shell/ShellOutputStream.zig new file mode 100644 index 0000000000..631b122194 --- /dev/null +++ b/src/shell/ShellOutputStream.zig @@ -0,0 +1,207 @@ +const std = @import("std"); +const bun = @import("bun"); +const jsc = bun.jsc; +const JSValue = jsc.JSValue; +const JSGlobalObject = jsc.JSGlobalObject; +const webcore = jsc.WebCore; +const Blob = webcore.Blob; +const streams = webcore.streams; +const Output = bun.Output; + +/// ShellOutputStream provides a ReadableStream interface over a ByteList that is +/// being written to during shell execution. It allows streaming stdout/stderr +/// while the shell is still running, rather than waiting for completion. +const ShellOutputStream = @This(); + +/// Pointer to the ByteList being written to by the shell +buffer: *bun.ByteList, +/// Current read offset in the buffer +offset: usize = 0, +/// Whether the shell has finished and no more data will be written +done: bool = false, +/// Pending read operation +pending: streams.Result.Pending = .{ .result = .{ .done = {} } }, +/// Buffer for pending read +pending_buffer: []u8 = &.{}, +/// JSValue for the pending read +pending_value: jsc.Strong.Optional = .empty, + +pub const Source = webcore.ReadableStream.NewSource( + @This(), + "ShellOutputStream", + onStart, + onPull, + onCancel, + deinit, + null, + null, + null, + null, +); + +const log = Output.scoped(.ShellOutputStream, .visible); + +pub fn init(buffer: *bun.ByteList) ShellOutputStream { + return .{ + .buffer = buffer, + }; +} + +pub fn parent(this: *@This()) *Source { + return @fieldParentPtr("context", this); +} + +pub fn onStart(this: *@This()) streams.Start { + // If we already have data, let the consumer know + if (this.buffer.len > 0 and this.done) { + return .{ .chunk_size = 16384 }; + } + + return .{ .ready = {} }; +} + +pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result { + jsc.markBinding(@src()); + bun.assert(buffer.len > 0); + + const available = this.buffer.len -| this.offset; + + if (available > 0) { + const to_copy = @min(available, buffer.len); + @memcpy(buffer[0..to_copy], this.buffer.slice()[this.offset..][0..to_copy]); + this.offset += to_copy; + + // If we've read everything and the shell is done, signal completion + if (this.done and this.offset >= this.buffer.len) { + return .{ + .into_array_and_done = .{ + .value = view, + .len = @as(Blob.SizeType, @truncate(to_copy)), + }, + }; + } + + return .{ + .into_array = .{ + .value = view, + .len = @as(Blob.SizeType, @truncate(to_copy)), + }, + }; + } + + // No data available yet + if (this.done) { + return .{ .done = {} }; + } + + // Wait for data + this.pending_buffer = buffer; + this.pending_value.set(this.parent().globalThis, view); + return .{ + .pending = &this.pending, + }; +} + +pub fn onCancel(this: *@This()) void { + jsc.markBinding(@src()); + this.done = true; + this.pending_value.deinit(); + + if (this.pending.state == .pending) { + this.pending_buffer = &.{}; + this.pending.result.deinit(); + this.pending.result = .{ .done = {} }; + this.pending.run(); + } +} + +pub fn deinit(this: *@This()) void { + jsc.markBinding(@src()); + this.pending_value.deinit(); + + if (!this.done) { + this.done = true; + if (this.pending.state == .pending) { + this.pending_buffer = &.{}; + this.pending.result.deinit(); + this.pending.result = .{ .done = {} }; + if (this.pending.future == .promise) { + this.pending.runOnNextTick(); + } else { + this.pending.run(); + } + } + } + + this.parent().deinit(); +} + +/// Called when new data has been written to the buffer. +/// Resumes any pending read operation. +pub fn onData(this: *@This()) void { + if (this.pending.state != .pending) { + return; + } + + const available = this.buffer.len -| this.offset; + if (available == 0) { + return; + } + + const to_copy = @min(available, this.pending_buffer.len); + @memcpy( + this.pending_buffer[0..to_copy], + this.buffer.slice()[this.offset..][0..to_copy] + ); + this.offset += to_copy; + + const view = this.pending_value.get() orelse { + return; + }; + this.pending_value.clearWithoutDeallocation(); + this.pending_buffer = &.{}; + + const is_done = this.done and this.offset >= this.buffer.len; + + if (is_done) { + this.pending.result = .{ + .into_array_and_done = .{ + .value = view, + .len = @as(Blob.SizeType, @truncate(to_copy)), + }, + }; + } else { + this.pending.result = .{ + .into_array = .{ + .value = view, + .len = @as(Blob.SizeType, @truncate(to_copy)), + }, + }; + } + + this.pending.run(); +} + +/// Called when the shell has finished and no more data will be written. +pub fn setDone(this: *@This()) void { + this.done = true; + + // If we have a pending read and no more data, resolve it as done + if (this.pending.state == .pending) { + const available = this.buffer.len -| this.offset; + if (available == 0) { + this.pending_buffer = &.{}; + const view = this.pending_value.get(); + if (view) |v| { + _ = v; + this.pending_value.clearWithoutDeallocation(); + } + this.pending.result.deinit(); + this.pending.result = .{ .done = {} }; + this.pending.run(); + } else { + // We have data, let onData handle it + this.onData(); + } + } +} diff --git a/src/shell/interpreter.zig b/src/shell/interpreter.zig index b14bcb5b99..8c88333aac 100644 --- a/src/shell/interpreter.zig +++ b/src/shell/interpreter.zig @@ -113,6 +113,8 @@ pub const CallstackGuard = enum(u0) { __i_know_what_i_am_doing }; pub const ExitCode = u16; +pub const ShellOutputStream = @import("./ShellOutputStream.zig"); + pub const StateKind = enum(u8) { script, stmt, @@ -278,6 +280,9 @@ pub const Interpreter = struct { __alloc_scope: if (bun.Environment.enableAllocScopes) bun.AllocationScope else void, + stdout_stream: ?*ShellOutputStream.Source = null, + stderr_stream: ?*ShellOutputStream.Source = null, + // Here are all the state nodes: pub const State = @import("./states/Base.zig"); pub const Script = @import("./states/Script.zig"); @@ -351,6 +356,10 @@ pub const Interpreter = struct { async_pids: SmolList(pid_t, 4) = SmolList(pid_t, 4).zeroes, + /// Reference to the interpreter for stream notifications + /// Only set for the root shell + interpreter: ?*ThisInterpreter = null, + __alloc_scope: if (bun.Environment.enableAllocScopes) *bun.AllocationScope else void, const pid_t = if (bun.Environment.isPosix) std.posix.pid_t else uv.uv_pid_t; @@ -383,6 +392,20 @@ pub const Interpreter = struct { }; } + /// Notify streams that new stdout data is available + pub fn notifyStdoutData(this: *ShellExecEnv) void { + if (this.interpreter) |interp| { + interp.notifyStdoutData(); + } + } + + /// Notify streams that new stderr data is available + pub fn notifyStderrData(this: *ShellExecEnv) void { + if (this.interpreter) |interp| { + interp.notifyStderrData(); + } + } + pub inline fn cwdZ(this: *ShellExecEnv) [:0]const u8 { if (this.__cwd.items.len == 0) return ""; return this.__cwd.items[0..this.__cwd.items.len -| 1 :0]; @@ -872,6 +895,7 @@ pub const Interpreter = struct { } interpreter.root_shell.__alloc_scope = if (bun.Environment.enableAllocScopes) &interpreter.__alloc_scope else {}; + interpreter.root_shell.interpreter = interpreter; return .{ .result = interpreter }; } @@ -1139,6 +1163,9 @@ pub const Interpreter = struct { log("Interpreter(0x{x}) finish {d}", .{ @intFromPtr(this), exit_code }); defer decrPendingActivityFlag(&this.has_pending_activity); + // Mark streams as done before resolving + this.markStreamsDone(); + if (this.event_loop == .js) { defer this.deinitAfterJSRun(); this.exit_code = exit_code; @@ -1281,6 +1308,72 @@ pub const Interpreter = struct { return ioToJSValue(globalThis, this.root_shell.buffered_stderr()); } + pub fn getStdout(this: *ThisInterpreter, globalThis: *JSGlobalObject) JSValue { + if (this.stdout_stream) |stream| { + return stream.toReadableStream(globalThis) catch |err| { + globalThis.reportActiveExceptionAsUnhandled(err); + return .zero; + }; + } + + // Create the stream + var source = ShellOutputStream.Source.new(.{ + .globalThis = globalThis, + .context = ShellOutputStream.init(this.root_shell.buffered_stdout()), + }); + this.stdout_stream = source; + + return source.toReadableStream(globalThis) catch |err| { + globalThis.reportActiveExceptionAsUnhandled(err); + return .zero; + }; + } + + pub fn getStderr(this: *ThisInterpreter, globalThis: *JSGlobalObject) JSValue { + if (this.stderr_stream) |stream| { + return stream.toReadableStream(globalThis) catch |err| { + globalThis.reportActiveExceptionAsUnhandled(err); + return .zero; + }; + } + + // Create the stream + var source = ShellOutputStream.Source.new(.{ + .globalThis = globalThis, + .context = ShellOutputStream.init(this.root_shell.buffered_stderr()), + }); + this.stderr_stream = source; + + return source.toReadableStream(globalThis) catch |err| { + globalThis.reportActiveExceptionAsUnhandled(err); + return .zero; + }; + } + + /// Notify stdout stream that new data is available + pub fn notifyStdoutData(this: *ThisInterpreter) void { + if (this.stdout_stream) |stream| { + stream.context.onData(); + } + } + + /// Notify stderr stream that new data is available + pub fn notifyStderrData(this: *ThisInterpreter) void { + if (this.stderr_stream) |stream| { + stream.context.onData(); + } + } + + /// Mark streams as done when shell finishes + fn markStreamsDone(this: *ThisInterpreter) void { + if (this.stdout_stream) |stream| { + stream.context.setDone(); + } + if (this.stderr_stream) |stream| { + stream.context.setDone(); + } + } + pub fn finalize(this: *ThisInterpreter) void { log("Interpreter(0x{x}) finalize", .{@intFromPtr(this)}); this.deinitFromFinalizer(); diff --git a/src/shell/states/Cmd.zig b/src/shell/states/Cmd.zig index 8a732f27d6..41d67c07ab 100644 --- a/src/shell/states/Cmd.zig +++ b/src/shell/states/Cmd.zig @@ -155,6 +155,7 @@ const BufferedIoClosed = struct { if (cmd.io.stdout == .pipe and cmd.io.stdout == .pipe and !cmd.node.redirect.redirectsElsewhere(.stdout)) { const the_slice = readable.pipe.slice(); bun.handleOom(cmd.base.shell.buffered_stdout().appendSlice(bun.default_allocator, the_slice)); + cmd.base.shell.notifyStdoutData(); } var buffer = readable.pipe.takeBuffer(); @@ -169,6 +170,7 @@ const BufferedIoClosed = struct { if (cmd.io.stderr == .pipe and cmd.io.stderr == .pipe and !cmd.node.redirect.redirectsElsewhere(.stderr)) { const the_slice = readable.pipe.slice(); bun.handleOom(cmd.base.shell.buffered_stderr().appendSlice(bun.default_allocator, the_slice)); + cmd.base.shell.notifyStderrData(); } var buffer = readable.pipe.takeBuffer(); diff --git a/test/js/bun/shell/shell-streaming.test.ts b/test/js/bun/shell/shell-streaming.test.ts new file mode 100644 index 0000000000..108515b808 --- /dev/null +++ b/test/js/bun/shell/shell-streaming.test.ts @@ -0,0 +1,113 @@ +import { test, expect, describe } from "bun:test"; +import { $ } from "bun"; + +describe("Shell streaming stdout/stderr", () => { + test("stdout returns a ReadableStream", async () => { + const shell = $`echo "hello world"`; + const stdout = shell.stdout; + + expect(stdout).toBeInstanceOf(ReadableStream); + + // Consume the stream + const chunks: Uint8Array[] = []; + for await (const chunk of stdout) { + chunks.push(chunk); + } + + const text = new TextDecoder().decode(Buffer.concat(chunks)); + expect(text.trim()).toBe("hello world"); + + // Wait for shell to complete + await shell; + }); + + test("stderr returns a ReadableStream", async () => { + const shell = $`node -e "console.error('error message')"`.nothrow(); + const stderr = shell.stderr; + + expect(stderr).toBeInstanceOf(ReadableStream); + + // Consume the stream + const chunks: Uint8Array[] = []; + for await (const chunk of stderr) { + chunks.push(chunk); + } + + const text = new TextDecoder().decode(Buffer.concat(chunks)); + expect(text.trim()).toBe("error message"); + + // Wait for shell to complete + await shell; + }); + + test("can read stdout stream while command is running", async () => { + const shell = $`node -e " + for (let i = 0; i < 3; i++) { + console.log('line ' + i); + } + "`; + + const chunks: string[] = []; + const reader = shell.stdout.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(decoder.decode(value, { stream: true })); + } + } finally { + reader.releaseLock(); + } + + const output = chunks.join(''); + expect(output).toContain("line 0"); + expect(output).toContain("line 1"); + expect(output).toContain("line 2"); + + await shell; + }); + + test("stdout and stderr work independently", async () => { + const shell = $`node -e " + console.log('stdout message'); + console.error('stderr message'); + "`.nothrow(); + + const stdoutPromise = (async () => { + const chunks: Uint8Array[] = []; + for await (const chunk of shell.stdout) { + chunks.push(chunk); + } + return new TextDecoder().decode(Buffer.concat(chunks)); + })(); + + const stderrPromise = (async () => { + const chunks: Uint8Array[] = []; + for await (const chunk of shell.stderr) { + chunks.push(chunk); + } + return new TextDecoder().decode(Buffer.concat(chunks)); + })(); + + const [stdoutText, stderrText] = await Promise.all([stdoutPromise, stderrPromise]); + + expect(stdoutText.trim()).toBe("stdout message"); + expect(stderrText.trim()).toBe("stderr message"); + + await shell; + }); + + test("can access stdout stream multiple times", async () => { + const shell = $`echo "test"`; + + const stream1 = shell.stdout; + const stream2 = shell.stdout; + + // Should return the same stream instance + expect(stream1).toBe(stream2); + + await shell; + }); +});