Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
4681cbfeec Add streaming support for Bun Shell stdout/stderr
Implements ReadableStream interface for shell stdout and stderr, allowing
real-time consumption of command output instead of waiting for buffering.

- Created ShellOutputStream.zig implementing ReadableStream source
- Added stdout/stderr getters to ShellInterpreter returning streams
- Exposed streams on ShellPromise in shell.ts
- Added data notification hooks in Cmd.zig and Builtin.zig
- Updated TypeScript definitions in shell.d.ts
- Added comprehensive tests in shell-streaming.test.ts

The streams are lazily created on first access and share the same
underlying ByteList buffer used for buffered output. Data notifications
wake pending reads when new output arrives.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-30 22:09:02 +00:00
11 changed files with 474 additions and 3 deletions

View File

@@ -88,6 +88,38 @@ declare module "bun" {
* ```
*/
class ShellPromise extends Promise<ShellOutput> {
/**
* Get a ReadableStream for stdout that streams data as the shell executes.
*
* This allows you to consume stdout incrementally rather than waiting for
* the command to complete. The stream will emit chunks as they're written.
*
* @example
* ```ts
* const shell = $`long-running-command`;
* for await (const chunk of shell.stdout) {
* console.log('Received:', new TextDecoder().decode(chunk));
* }
* ```
*/
get stdout(): ReadableStream<Uint8Array>;
/**
* Get a ReadableStream for stderr that streams data as the shell executes.
*
* This allows you to consume stderr incrementally rather than waiting for
* the command to complete. The stream will emit chunks as they're written.
*
* @example
* ```ts
* const shell = $`long-running-command`;
* for await (const chunk of shell.stderr) {
* console.error('Error:', new TextDecoder().decode(chunk));
* }
* ```
*/
get stderr(): ReadableStream<Uint8Array>;
get stdin(): WritableStream;
/**

View File

@@ -23,6 +23,14 @@ export default [
fn: "getStarted",
length: 0,
},
stdout: {
getter: "getStdout",
cache: true,
},
stderr: {
getter: "getStderr",
cache: true,
},
},
}),
];

View File

@@ -40,9 +40,9 @@ function source(name) {
isClosed: {
getter: "getIsClosedFromJS",
},
...(name !== "File"
...(name !== "File" && name !== "ShellOutputStream"
? // Buffered versions
// not implemented in File, yet.
// not implemented in File and ShellOutputStream yet.
{
text: {
fn: "textFromJS",
@@ -80,6 +80,6 @@ function source(name) {
});
}
const sources = ["Blob", "File", "Bytes"];
const sources = ["Blob", "File", "Bytes", "ShellOutputStream"];
export default sources.map(source);

View File

@@ -70,6 +70,7 @@ pub const Classes = struct {
pub const FileInternalReadableStreamSource = webcore.FileReader.Source;
pub const BlobInternalReadableStreamSource = webcore.ByteBlobLoader.Source;
pub const BytesInternalReadableStreamSource = webcore.ByteStream.Source;
pub const ShellOutputStreamInternalReadableStreamSource = webcore.ShellOutputStream.Source;
pub const PostgresSQLConnection = api.Postgres.PostgresSQLConnection;
pub const MySQLConnection = api.MySQL.MySQLConnection;
pub const PostgresSQLQuery = api.Postgres.PostgresSQLQuery;

View File

@@ -40,6 +40,7 @@ pub const FetchHeaders = @import("./bindings/FetchHeaders.zig").FetchHeaders;
pub const ByteBlobLoader = @import("./webcore/ByteBlobLoader.zig");
pub const ByteStream = @import("./webcore/ByteStream.zig");
pub const FileReader = @import("./webcore/FileReader.zig");
pub const ShellOutputStream = @import("../shell/ShellOutputStream.zig");
pub const ScriptExecutionContext = @import("./webcore/ScriptExecutionContext.zig");
pub const streams = @import("./webcore/streams.zig");

View File

@@ -109,6 +109,7 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa
#throws: boolean = true;
#resolve: (code: number, stdout: Buffer, stderr: Buffer) => void;
#reject: (code: number, stdout: Buffer, stderr: Buffer) => void;
#interp: $ZigGeneratedClasses.ShellInterpreter | undefined = undefined;
constructor(args: $ZigGeneratedClasses.ParsedShellScript, throws: boolean) {
// Create the error immediately so it captures the stacktrace at the point
@@ -170,11 +171,22 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa
this.#hasRun = true;
let interp = createShellInterpreter(this.#resolve, this.#reject, this.#args!);
this.#interp = interp;
this.#args = undefined;
interp.run();
}
}
get stdout(): ReadableStream {
this.#run();
return this.#interp!.stdout;
}
get stderr(): ReadableStream {
this.#run();
return this.#interp!.stderr;
}
#quiet(): this {
this.#throwIfRunning();
this.#args!.setQuiet();

View File

@@ -623,6 +623,7 @@ pub fn done(this: *Builtin, exit_code: anytype) Yield {
bun.default_allocator,
this.stdout.buf.items[0..],
));
cmd.base.shell.notifyStdoutData();
}
// Aggregate output data if shell state is piped and this cmd is piped
if (cmd.io.stderr == .pipe and cmd.io.stderr == .pipe and this.stderr == .buf) {
@@ -630,6 +631,7 @@ pub fn done(this: *Builtin, exit_code: anytype) Yield {
bun.default_allocator,
this.stderr.buf.items[0..],
));
cmd.base.shell.notifyStderrData();
}
return cmd.parent.childDone(cmd, this.exit_code.?);

View File

@@ -0,0 +1,207 @@
const std = @import("std");
const bun = @import("bun");
const jsc = bun.jsc;
const JSValue = jsc.JSValue;
const JSGlobalObject = jsc.JSGlobalObject;
const webcore = jsc.WebCore;
const Blob = webcore.Blob;
const streams = webcore.streams;
const Output = bun.Output;
/// ShellOutputStream provides a ReadableStream interface over a ByteList that is
/// being written to during shell execution. It allows streaming stdout/stderr
/// while the shell is still running, rather than waiting for completion.
const ShellOutputStream = @This();
/// Pointer to the ByteList being written to by the shell
buffer: *bun.ByteList,
/// Current read offset in the buffer
offset: usize = 0,
/// Whether the shell has finished and no more data will be written
done: bool = false,
/// Pending read operation
pending: streams.Result.Pending = .{ .result = .{ .done = {} } },
/// Buffer for pending read
pending_buffer: []u8 = &.{},
/// JSValue for the pending read
pending_value: jsc.Strong.Optional = .empty,
pub const Source = webcore.ReadableStream.NewSource(
@This(),
"ShellOutputStream",
onStart,
onPull,
onCancel,
deinit,
null,
null,
null,
null,
);
const log = Output.scoped(.ShellOutputStream, .visible);
pub fn init(buffer: *bun.ByteList) ShellOutputStream {
return .{
.buffer = buffer,
};
}
pub fn parent(this: *@This()) *Source {
return @fieldParentPtr("context", this);
}
pub fn onStart(this: *@This()) streams.Start {
// If we already have data, let the consumer know
if (this.buffer.len > 0 and this.done) {
return .{ .chunk_size = 16384 };
}
return .{ .ready = {} };
}
pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
jsc.markBinding(@src());
bun.assert(buffer.len > 0);
const available = this.buffer.len -| this.offset;
if (available > 0) {
const to_copy = @min(available, buffer.len);
@memcpy(buffer[0..to_copy], this.buffer.slice()[this.offset..][0..to_copy]);
this.offset += to_copy;
// If we've read everything and the shell is done, signal completion
if (this.done and this.offset >= this.buffer.len) {
return .{
.into_array_and_done = .{
.value = view,
.len = @as(Blob.SizeType, @truncate(to_copy)),
},
};
}
return .{
.into_array = .{
.value = view,
.len = @as(Blob.SizeType, @truncate(to_copy)),
},
};
}
// No data available yet
if (this.done) {
return .{ .done = {} };
}
// Wait for data
this.pending_buffer = buffer;
this.pending_value.set(this.parent().globalThis, view);
return .{
.pending = &this.pending,
};
}
pub fn onCancel(this: *@This()) void {
jsc.markBinding(@src());
this.done = true;
this.pending_value.deinit();
if (this.pending.state == .pending) {
this.pending_buffer = &.{};
this.pending.result.deinit();
this.pending.result = .{ .done = {} };
this.pending.run();
}
}
pub fn deinit(this: *@This()) void {
jsc.markBinding(@src());
this.pending_value.deinit();
if (!this.done) {
this.done = true;
if (this.pending.state == .pending) {
this.pending_buffer = &.{};
this.pending.result.deinit();
this.pending.result = .{ .done = {} };
if (this.pending.future == .promise) {
this.pending.runOnNextTick();
} else {
this.pending.run();
}
}
}
this.parent().deinit();
}
/// Called when new data has been written to the buffer.
/// Resumes any pending read operation.
pub fn onData(this: *@This()) void {
if (this.pending.state != .pending) {
return;
}
const available = this.buffer.len -| this.offset;
if (available == 0) {
return;
}
const to_copy = @min(available, this.pending_buffer.len);
@memcpy(
this.pending_buffer[0..to_copy],
this.buffer.slice()[this.offset..][0..to_copy]
);
this.offset += to_copy;
const view = this.pending_value.get() orelse {
return;
};
this.pending_value.clearWithoutDeallocation();
this.pending_buffer = &.{};
const is_done = this.done and this.offset >= this.buffer.len;
if (is_done) {
this.pending.result = .{
.into_array_and_done = .{
.value = view,
.len = @as(Blob.SizeType, @truncate(to_copy)),
},
};
} else {
this.pending.result = .{
.into_array = .{
.value = view,
.len = @as(Blob.SizeType, @truncate(to_copy)),
},
};
}
this.pending.run();
}
/// Called when the shell has finished and no more data will be written.
pub fn setDone(this: *@This()) void {
this.done = true;
// If we have a pending read and no more data, resolve it as done
if (this.pending.state == .pending) {
const available = this.buffer.len -| this.offset;
if (available == 0) {
this.pending_buffer = &.{};
const view = this.pending_value.get();
if (view) |v| {
_ = v;
this.pending_value.clearWithoutDeallocation();
}
this.pending.result.deinit();
this.pending.result = .{ .done = {} };
this.pending.run();
} else {
// We have data, let onData handle it
this.onData();
}
}
}

View File

@@ -113,6 +113,8 @@ pub const CallstackGuard = enum(u0) { __i_know_what_i_am_doing };
pub const ExitCode = u16;
pub const ShellOutputStream = @import("./ShellOutputStream.zig");
pub const StateKind = enum(u8) {
script,
stmt,
@@ -278,6 +280,9 @@ pub const Interpreter = struct {
__alloc_scope: if (bun.Environment.enableAllocScopes) bun.AllocationScope else void,
stdout_stream: ?*ShellOutputStream.Source = null,
stderr_stream: ?*ShellOutputStream.Source = null,
// Here are all the state nodes:
pub const State = @import("./states/Base.zig");
pub const Script = @import("./states/Script.zig");
@@ -351,6 +356,10 @@ pub const Interpreter = struct {
async_pids: SmolList(pid_t, 4) = SmolList(pid_t, 4).zeroes,
/// Reference to the interpreter for stream notifications
/// Only set for the root shell
interpreter: ?*ThisInterpreter = null,
__alloc_scope: if (bun.Environment.enableAllocScopes) *bun.AllocationScope else void,
const pid_t = if (bun.Environment.isPosix) std.posix.pid_t else uv.uv_pid_t;
@@ -383,6 +392,20 @@ pub const Interpreter = struct {
};
}
/// Notify streams that new stdout data is available
pub fn notifyStdoutData(this: *ShellExecEnv) void {
if (this.interpreter) |interp| {
interp.notifyStdoutData();
}
}
/// Notify streams that new stderr data is available
pub fn notifyStderrData(this: *ShellExecEnv) void {
if (this.interpreter) |interp| {
interp.notifyStderrData();
}
}
pub inline fn cwdZ(this: *ShellExecEnv) [:0]const u8 {
if (this.__cwd.items.len == 0) return "";
return this.__cwd.items[0..this.__cwd.items.len -| 1 :0];
@@ -872,6 +895,7 @@ pub const Interpreter = struct {
}
interpreter.root_shell.__alloc_scope = if (bun.Environment.enableAllocScopes) &interpreter.__alloc_scope else {};
interpreter.root_shell.interpreter = interpreter;
return .{ .result = interpreter };
}
@@ -1139,6 +1163,9 @@ pub const Interpreter = struct {
log("Interpreter(0x{x}) finish {d}", .{ @intFromPtr(this), exit_code });
defer decrPendingActivityFlag(&this.has_pending_activity);
// Mark streams as done before resolving
this.markStreamsDone();
if (this.event_loop == .js) {
defer this.deinitAfterJSRun();
this.exit_code = exit_code;
@@ -1281,6 +1308,72 @@ pub const Interpreter = struct {
return ioToJSValue(globalThis, this.root_shell.buffered_stderr());
}
pub fn getStdout(this: *ThisInterpreter, globalThis: *JSGlobalObject) JSValue {
if (this.stdout_stream) |stream| {
return stream.toReadableStream(globalThis) catch |err| {
globalThis.reportActiveExceptionAsUnhandled(err);
return .zero;
};
}
// Create the stream
var source = ShellOutputStream.Source.new(.{
.globalThis = globalThis,
.context = ShellOutputStream.init(this.root_shell.buffered_stdout()),
});
this.stdout_stream = source;
return source.toReadableStream(globalThis) catch |err| {
globalThis.reportActiveExceptionAsUnhandled(err);
return .zero;
};
}
pub fn getStderr(this: *ThisInterpreter, globalThis: *JSGlobalObject) JSValue {
if (this.stderr_stream) |stream| {
return stream.toReadableStream(globalThis) catch |err| {
globalThis.reportActiveExceptionAsUnhandled(err);
return .zero;
};
}
// Create the stream
var source = ShellOutputStream.Source.new(.{
.globalThis = globalThis,
.context = ShellOutputStream.init(this.root_shell.buffered_stderr()),
});
this.stderr_stream = source;
return source.toReadableStream(globalThis) catch |err| {
globalThis.reportActiveExceptionAsUnhandled(err);
return .zero;
};
}
/// Notify stdout stream that new data is available
pub fn notifyStdoutData(this: *ThisInterpreter) void {
if (this.stdout_stream) |stream| {
stream.context.onData();
}
}
/// Notify stderr stream that new data is available
pub fn notifyStderrData(this: *ThisInterpreter) void {
if (this.stderr_stream) |stream| {
stream.context.onData();
}
}
/// Mark streams as done when shell finishes
fn markStreamsDone(this: *ThisInterpreter) void {
if (this.stdout_stream) |stream| {
stream.context.setDone();
}
if (this.stderr_stream) |stream| {
stream.context.setDone();
}
}
pub fn finalize(this: *ThisInterpreter) void {
log("Interpreter(0x{x}) finalize", .{@intFromPtr(this)});
this.deinitFromFinalizer();

View File

@@ -155,6 +155,7 @@ const BufferedIoClosed = struct {
if (cmd.io.stdout == .pipe and cmd.io.stdout == .pipe and !cmd.node.redirect.redirectsElsewhere(.stdout)) {
const the_slice = readable.pipe.slice();
bun.handleOom(cmd.base.shell.buffered_stdout().appendSlice(bun.default_allocator, the_slice));
cmd.base.shell.notifyStdoutData();
}
var buffer = readable.pipe.takeBuffer();
@@ -169,6 +170,7 @@ const BufferedIoClosed = struct {
if (cmd.io.stderr == .pipe and cmd.io.stderr == .pipe and !cmd.node.redirect.redirectsElsewhere(.stderr)) {
const the_slice = readable.pipe.slice();
bun.handleOom(cmd.base.shell.buffered_stderr().appendSlice(bun.default_allocator, the_slice));
cmd.base.shell.notifyStderrData();
}
var buffer = readable.pipe.takeBuffer();

View File

@@ -0,0 +1,113 @@
import { test, expect, describe } from "bun:test";
import { $ } from "bun";
describe("Shell streaming stdout/stderr", () => {
test("stdout returns a ReadableStream", async () => {
const shell = $`echo "hello world"`;
const stdout = shell.stdout;
expect(stdout).toBeInstanceOf(ReadableStream);
// Consume the stream
const chunks: Uint8Array[] = [];
for await (const chunk of stdout) {
chunks.push(chunk);
}
const text = new TextDecoder().decode(Buffer.concat(chunks));
expect(text.trim()).toBe("hello world");
// Wait for shell to complete
await shell;
});
test("stderr returns a ReadableStream", async () => {
const shell = $`node -e "console.error('error message')"`.nothrow();
const stderr = shell.stderr;
expect(stderr).toBeInstanceOf(ReadableStream);
// Consume the stream
const chunks: Uint8Array[] = [];
for await (const chunk of stderr) {
chunks.push(chunk);
}
const text = new TextDecoder().decode(Buffer.concat(chunks));
expect(text.trim()).toBe("error message");
// Wait for shell to complete
await shell;
});
test("can read stdout stream while command is running", async () => {
const shell = $`node -e "
for (let i = 0; i < 3; i++) {
console.log('line ' + i);
}
"`;
const chunks: string[] = [];
const reader = shell.stdout.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(decoder.decode(value, { stream: true }));
}
} finally {
reader.releaseLock();
}
const output = chunks.join('');
expect(output).toContain("line 0");
expect(output).toContain("line 1");
expect(output).toContain("line 2");
await shell;
});
test("stdout and stderr work independently", async () => {
const shell = $`node -e "
console.log('stdout message');
console.error('stderr message');
"`.nothrow();
const stdoutPromise = (async () => {
const chunks: Uint8Array[] = [];
for await (const chunk of shell.stdout) {
chunks.push(chunk);
}
return new TextDecoder().decode(Buffer.concat(chunks));
})();
const stderrPromise = (async () => {
const chunks: Uint8Array[] = [];
for await (const chunk of shell.stderr) {
chunks.push(chunk);
}
return new TextDecoder().decode(Buffer.concat(chunks));
})();
const [stdoutText, stderrText] = await Promise.all([stdoutPromise, stderrPromise]);
expect(stdoutText.trim()).toBe("stdout message");
expect(stderrText.trim()).toBe("stderr message");
await shell;
});
test("can access stdout stream multiple times", async () => {
const shell = $`echo "test"`;
const stream1 = shell.stdout;
const stream2 = shell.stdout;
// Should return the same stream instance
expect(stream1).toBe(stream2);
await shell;
});
});