mirror of
https://github.com/oven-sh/bun
synced 2026-02-04 16:08:53 +00:00
Compare commits
1 Commits
dylan/pyth
...
claude/she
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4681cbfeec |
32
packages/bun-types/shell.d.ts
vendored
32
packages/bun-types/shell.d.ts
vendored
@@ -88,6 +88,38 @@ declare module "bun" {
|
||||
* ```
|
||||
*/
|
||||
class ShellPromise extends Promise<ShellOutput> {
|
||||
/**
|
||||
* Get a ReadableStream for stdout that streams data as the shell executes.
|
||||
*
|
||||
* This allows you to consume stdout incrementally rather than waiting for
|
||||
* the command to complete. The stream will emit chunks as they're written.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const shell = $`long-running-command`;
|
||||
* for await (const chunk of shell.stdout) {
|
||||
* console.log('Received:', new TextDecoder().decode(chunk));
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
get stdout(): ReadableStream<Uint8Array>;
|
||||
|
||||
/**
|
||||
* Get a ReadableStream for stderr that streams data as the shell executes.
|
||||
*
|
||||
* This allows you to consume stderr incrementally rather than waiting for
|
||||
* the command to complete. The stream will emit chunks as they're written.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const shell = $`long-running-command`;
|
||||
* for await (const chunk of shell.stderr) {
|
||||
* console.error('Error:', new TextDecoder().decode(chunk));
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
get stderr(): ReadableStream<Uint8Array>;
|
||||
|
||||
get stdin(): WritableStream;
|
||||
|
||||
/**
|
||||
|
||||
@@ -23,6 +23,14 @@ export default [
|
||||
fn: "getStarted",
|
||||
length: 0,
|
||||
},
|
||||
stdout: {
|
||||
getter: "getStdout",
|
||||
cache: true,
|
||||
},
|
||||
stderr: {
|
||||
getter: "getStderr",
|
||||
cache: true,
|
||||
},
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -40,9 +40,9 @@ function source(name) {
|
||||
isClosed: {
|
||||
getter: "getIsClosedFromJS",
|
||||
},
|
||||
...(name !== "File"
|
||||
...(name !== "File" && name !== "ShellOutputStream"
|
||||
? // Buffered versions
|
||||
// not implemented in File, yet.
|
||||
// not implemented in File and ShellOutputStream yet.
|
||||
{
|
||||
text: {
|
||||
fn: "textFromJS",
|
||||
@@ -80,6 +80,6 @@ function source(name) {
|
||||
});
|
||||
}
|
||||
|
||||
const sources = ["Blob", "File", "Bytes"];
|
||||
const sources = ["Blob", "File", "Bytes", "ShellOutputStream"];
|
||||
|
||||
export default sources.map(source);
|
||||
|
||||
@@ -70,6 +70,7 @@ pub const Classes = struct {
|
||||
pub const FileInternalReadableStreamSource = webcore.FileReader.Source;
|
||||
pub const BlobInternalReadableStreamSource = webcore.ByteBlobLoader.Source;
|
||||
pub const BytesInternalReadableStreamSource = webcore.ByteStream.Source;
|
||||
pub const ShellOutputStreamInternalReadableStreamSource = webcore.ShellOutputStream.Source;
|
||||
pub const PostgresSQLConnection = api.Postgres.PostgresSQLConnection;
|
||||
pub const MySQLConnection = api.MySQL.MySQLConnection;
|
||||
pub const PostgresSQLQuery = api.Postgres.PostgresSQLQuery;
|
||||
|
||||
@@ -40,6 +40,7 @@ pub const FetchHeaders = @import("./bindings/FetchHeaders.zig").FetchHeaders;
|
||||
pub const ByteBlobLoader = @import("./webcore/ByteBlobLoader.zig");
|
||||
pub const ByteStream = @import("./webcore/ByteStream.zig");
|
||||
pub const FileReader = @import("./webcore/FileReader.zig");
|
||||
pub const ShellOutputStream = @import("../shell/ShellOutputStream.zig");
|
||||
pub const ScriptExecutionContext = @import("./webcore/ScriptExecutionContext.zig");
|
||||
|
||||
pub const streams = @import("./webcore/streams.zig");
|
||||
|
||||
@@ -109,6 +109,7 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa
|
||||
#throws: boolean = true;
|
||||
#resolve: (code: number, stdout: Buffer, stderr: Buffer) => void;
|
||||
#reject: (code: number, stdout: Buffer, stderr: Buffer) => void;
|
||||
#interp: $ZigGeneratedClasses.ShellInterpreter | undefined = undefined;
|
||||
|
||||
constructor(args: $ZigGeneratedClasses.ParsedShellScript, throws: boolean) {
|
||||
// Create the error immediately so it captures the stacktrace at the point
|
||||
@@ -170,11 +171,22 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa
|
||||
this.#hasRun = true;
|
||||
|
||||
let interp = createShellInterpreter(this.#resolve, this.#reject, this.#args!);
|
||||
this.#interp = interp;
|
||||
this.#args = undefined;
|
||||
interp.run();
|
||||
}
|
||||
}
|
||||
|
||||
get stdout(): ReadableStream {
|
||||
this.#run();
|
||||
return this.#interp!.stdout;
|
||||
}
|
||||
|
||||
get stderr(): ReadableStream {
|
||||
this.#run();
|
||||
return this.#interp!.stderr;
|
||||
}
|
||||
|
||||
#quiet(): this {
|
||||
this.#throwIfRunning();
|
||||
this.#args!.setQuiet();
|
||||
|
||||
@@ -623,6 +623,7 @@ pub fn done(this: *Builtin, exit_code: anytype) Yield {
|
||||
bun.default_allocator,
|
||||
this.stdout.buf.items[0..],
|
||||
));
|
||||
cmd.base.shell.notifyStdoutData();
|
||||
}
|
||||
// Aggregate output data if shell state is piped and this cmd is piped
|
||||
if (cmd.io.stderr == .pipe and cmd.io.stderr == .pipe and this.stderr == .buf) {
|
||||
@@ -630,6 +631,7 @@ pub fn done(this: *Builtin, exit_code: anytype) Yield {
|
||||
bun.default_allocator,
|
||||
this.stderr.buf.items[0..],
|
||||
));
|
||||
cmd.base.shell.notifyStderrData();
|
||||
}
|
||||
|
||||
return cmd.parent.childDone(cmd, this.exit_code.?);
|
||||
|
||||
207
src/shell/ShellOutputStream.zig
Normal file
207
src/shell/ShellOutputStream.zig
Normal file
@@ -0,0 +1,207 @@
|
||||
const std = @import("std");
|
||||
const bun = @import("bun");
|
||||
const jsc = bun.jsc;
|
||||
const JSValue = jsc.JSValue;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const webcore = jsc.WebCore;
|
||||
const Blob = webcore.Blob;
|
||||
const streams = webcore.streams;
|
||||
const Output = bun.Output;
|
||||
|
||||
/// ShellOutputStream provides a ReadableStream interface over a ByteList that is
|
||||
/// being written to during shell execution. It allows streaming stdout/stderr
|
||||
/// while the shell is still running, rather than waiting for completion.
|
||||
const ShellOutputStream = @This();
|
||||
|
||||
/// Pointer to the ByteList being written to by the shell
|
||||
buffer: *bun.ByteList,
|
||||
/// Current read offset in the buffer
|
||||
offset: usize = 0,
|
||||
/// Whether the shell has finished and no more data will be written
|
||||
done: bool = false,
|
||||
/// Pending read operation
|
||||
pending: streams.Result.Pending = .{ .result = .{ .done = {} } },
|
||||
/// Buffer for pending read
|
||||
pending_buffer: []u8 = &.{},
|
||||
/// JSValue for the pending read
|
||||
pending_value: jsc.Strong.Optional = .empty,
|
||||
|
||||
pub const Source = webcore.ReadableStream.NewSource(
|
||||
@This(),
|
||||
"ShellOutputStream",
|
||||
onStart,
|
||||
onPull,
|
||||
onCancel,
|
||||
deinit,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
);
|
||||
|
||||
const log = Output.scoped(.ShellOutputStream, .visible);
|
||||
|
||||
pub fn init(buffer: *bun.ByteList) ShellOutputStream {
|
||||
return .{
|
||||
.buffer = buffer,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn parent(this: *@This()) *Source {
|
||||
return @fieldParentPtr("context", this);
|
||||
}
|
||||
|
||||
pub fn onStart(this: *@This()) streams.Start {
|
||||
// If we already have data, let the consumer know
|
||||
if (this.buffer.len > 0 and this.done) {
|
||||
return .{ .chunk_size = 16384 };
|
||||
}
|
||||
|
||||
return .{ .ready = {} };
|
||||
}
|
||||
|
||||
pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
|
||||
jsc.markBinding(@src());
|
||||
bun.assert(buffer.len > 0);
|
||||
|
||||
const available = this.buffer.len -| this.offset;
|
||||
|
||||
if (available > 0) {
|
||||
const to_copy = @min(available, buffer.len);
|
||||
@memcpy(buffer[0..to_copy], this.buffer.slice()[this.offset..][0..to_copy]);
|
||||
this.offset += to_copy;
|
||||
|
||||
// If we've read everything and the shell is done, signal completion
|
||||
if (this.done and this.offset >= this.buffer.len) {
|
||||
return .{
|
||||
.into_array_and_done = .{
|
||||
.value = view,
|
||||
.len = @as(Blob.SizeType, @truncate(to_copy)),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return .{
|
||||
.into_array = .{
|
||||
.value = view,
|
||||
.len = @as(Blob.SizeType, @truncate(to_copy)),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// No data available yet
|
||||
if (this.done) {
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
// Wait for data
|
||||
this.pending_buffer = buffer;
|
||||
this.pending_value.set(this.parent().globalThis, view);
|
||||
return .{
|
||||
.pending = &this.pending,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn onCancel(this: *@This()) void {
|
||||
jsc.markBinding(@src());
|
||||
this.done = true;
|
||||
this.pending_value.deinit();
|
||||
|
||||
if (this.pending.state == .pending) {
|
||||
this.pending_buffer = &.{};
|
||||
this.pending.result.deinit();
|
||||
this.pending.result = .{ .done = {} };
|
||||
this.pending.run();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(this: *@This()) void {
|
||||
jsc.markBinding(@src());
|
||||
this.pending_value.deinit();
|
||||
|
||||
if (!this.done) {
|
||||
this.done = true;
|
||||
if (this.pending.state == .pending) {
|
||||
this.pending_buffer = &.{};
|
||||
this.pending.result.deinit();
|
||||
this.pending.result = .{ .done = {} };
|
||||
if (this.pending.future == .promise) {
|
||||
this.pending.runOnNextTick();
|
||||
} else {
|
||||
this.pending.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.parent().deinit();
|
||||
}
|
||||
|
||||
/// Called when new data has been written to the buffer.
|
||||
/// Resumes any pending read operation.
|
||||
pub fn onData(this: *@This()) void {
|
||||
if (this.pending.state != .pending) {
|
||||
return;
|
||||
}
|
||||
|
||||
const available = this.buffer.len -| this.offset;
|
||||
if (available == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const to_copy = @min(available, this.pending_buffer.len);
|
||||
@memcpy(
|
||||
this.pending_buffer[0..to_copy],
|
||||
this.buffer.slice()[this.offset..][0..to_copy]
|
||||
);
|
||||
this.offset += to_copy;
|
||||
|
||||
const view = this.pending_value.get() orelse {
|
||||
return;
|
||||
};
|
||||
this.pending_value.clearWithoutDeallocation();
|
||||
this.pending_buffer = &.{};
|
||||
|
||||
const is_done = this.done and this.offset >= this.buffer.len;
|
||||
|
||||
if (is_done) {
|
||||
this.pending.result = .{
|
||||
.into_array_and_done = .{
|
||||
.value = view,
|
||||
.len = @as(Blob.SizeType, @truncate(to_copy)),
|
||||
},
|
||||
};
|
||||
} else {
|
||||
this.pending.result = .{
|
||||
.into_array = .{
|
||||
.value = view,
|
||||
.len = @as(Blob.SizeType, @truncate(to_copy)),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
this.pending.run();
|
||||
}
|
||||
|
||||
/// Called when the shell has finished and no more data will be written.
|
||||
pub fn setDone(this: *@This()) void {
|
||||
this.done = true;
|
||||
|
||||
// If we have a pending read and no more data, resolve it as done
|
||||
if (this.pending.state == .pending) {
|
||||
const available = this.buffer.len -| this.offset;
|
||||
if (available == 0) {
|
||||
this.pending_buffer = &.{};
|
||||
const view = this.pending_value.get();
|
||||
if (view) |v| {
|
||||
_ = v;
|
||||
this.pending_value.clearWithoutDeallocation();
|
||||
}
|
||||
this.pending.result.deinit();
|
||||
this.pending.result = .{ .done = {} };
|
||||
this.pending.run();
|
||||
} else {
|
||||
// We have data, let onData handle it
|
||||
this.onData();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -113,6 +113,8 @@ pub const CallstackGuard = enum(u0) { __i_know_what_i_am_doing };
|
||||
|
||||
pub const ExitCode = u16;
|
||||
|
||||
pub const ShellOutputStream = @import("./ShellOutputStream.zig");
|
||||
|
||||
pub const StateKind = enum(u8) {
|
||||
script,
|
||||
stmt,
|
||||
@@ -278,6 +280,9 @@ pub const Interpreter = struct {
|
||||
|
||||
__alloc_scope: if (bun.Environment.enableAllocScopes) bun.AllocationScope else void,
|
||||
|
||||
stdout_stream: ?*ShellOutputStream.Source = null,
|
||||
stderr_stream: ?*ShellOutputStream.Source = null,
|
||||
|
||||
// Here are all the state nodes:
|
||||
pub const State = @import("./states/Base.zig");
|
||||
pub const Script = @import("./states/Script.zig");
|
||||
@@ -351,6 +356,10 @@ pub const Interpreter = struct {
|
||||
|
||||
async_pids: SmolList(pid_t, 4) = SmolList(pid_t, 4).zeroes,
|
||||
|
||||
/// Reference to the interpreter for stream notifications
|
||||
/// Only set for the root shell
|
||||
interpreter: ?*ThisInterpreter = null,
|
||||
|
||||
__alloc_scope: if (bun.Environment.enableAllocScopes) *bun.AllocationScope else void,
|
||||
|
||||
const pid_t = if (bun.Environment.isPosix) std.posix.pid_t else uv.uv_pid_t;
|
||||
@@ -383,6 +392,20 @@ pub const Interpreter = struct {
|
||||
};
|
||||
}
|
||||
|
||||
/// Notify streams that new stdout data is available
|
||||
pub fn notifyStdoutData(this: *ShellExecEnv) void {
|
||||
if (this.interpreter) |interp| {
|
||||
interp.notifyStdoutData();
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify streams that new stderr data is available
|
||||
pub fn notifyStderrData(this: *ShellExecEnv) void {
|
||||
if (this.interpreter) |interp| {
|
||||
interp.notifyStderrData();
|
||||
}
|
||||
}
|
||||
|
||||
pub inline fn cwdZ(this: *ShellExecEnv) [:0]const u8 {
|
||||
if (this.__cwd.items.len == 0) return "";
|
||||
return this.__cwd.items[0..this.__cwd.items.len -| 1 :0];
|
||||
@@ -872,6 +895,7 @@ pub const Interpreter = struct {
|
||||
}
|
||||
|
||||
interpreter.root_shell.__alloc_scope = if (bun.Environment.enableAllocScopes) &interpreter.__alloc_scope else {};
|
||||
interpreter.root_shell.interpreter = interpreter;
|
||||
|
||||
return .{ .result = interpreter };
|
||||
}
|
||||
@@ -1139,6 +1163,9 @@ pub const Interpreter = struct {
|
||||
log("Interpreter(0x{x}) finish {d}", .{ @intFromPtr(this), exit_code });
|
||||
defer decrPendingActivityFlag(&this.has_pending_activity);
|
||||
|
||||
// Mark streams as done before resolving
|
||||
this.markStreamsDone();
|
||||
|
||||
if (this.event_loop == .js) {
|
||||
defer this.deinitAfterJSRun();
|
||||
this.exit_code = exit_code;
|
||||
@@ -1281,6 +1308,72 @@ pub const Interpreter = struct {
|
||||
return ioToJSValue(globalThis, this.root_shell.buffered_stderr());
|
||||
}
|
||||
|
||||
pub fn getStdout(this: *ThisInterpreter, globalThis: *JSGlobalObject) JSValue {
|
||||
if (this.stdout_stream) |stream| {
|
||||
return stream.toReadableStream(globalThis) catch |err| {
|
||||
globalThis.reportActiveExceptionAsUnhandled(err);
|
||||
return .zero;
|
||||
};
|
||||
}
|
||||
|
||||
// Create the stream
|
||||
var source = ShellOutputStream.Source.new(.{
|
||||
.globalThis = globalThis,
|
||||
.context = ShellOutputStream.init(this.root_shell.buffered_stdout()),
|
||||
});
|
||||
this.stdout_stream = source;
|
||||
|
||||
return source.toReadableStream(globalThis) catch |err| {
|
||||
globalThis.reportActiveExceptionAsUnhandled(err);
|
||||
return .zero;
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getStderr(this: *ThisInterpreter, globalThis: *JSGlobalObject) JSValue {
|
||||
if (this.stderr_stream) |stream| {
|
||||
return stream.toReadableStream(globalThis) catch |err| {
|
||||
globalThis.reportActiveExceptionAsUnhandled(err);
|
||||
return .zero;
|
||||
};
|
||||
}
|
||||
|
||||
// Create the stream
|
||||
var source = ShellOutputStream.Source.new(.{
|
||||
.globalThis = globalThis,
|
||||
.context = ShellOutputStream.init(this.root_shell.buffered_stderr()),
|
||||
});
|
||||
this.stderr_stream = source;
|
||||
|
||||
return source.toReadableStream(globalThis) catch |err| {
|
||||
globalThis.reportActiveExceptionAsUnhandled(err);
|
||||
return .zero;
|
||||
};
|
||||
}
|
||||
|
||||
/// Notify stdout stream that new data is available
|
||||
pub fn notifyStdoutData(this: *ThisInterpreter) void {
|
||||
if (this.stdout_stream) |stream| {
|
||||
stream.context.onData();
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify stderr stream that new data is available
|
||||
pub fn notifyStderrData(this: *ThisInterpreter) void {
|
||||
if (this.stderr_stream) |stream| {
|
||||
stream.context.onData();
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark streams as done when shell finishes
|
||||
fn markStreamsDone(this: *ThisInterpreter) void {
|
||||
if (this.stdout_stream) |stream| {
|
||||
stream.context.setDone();
|
||||
}
|
||||
if (this.stderr_stream) |stream| {
|
||||
stream.context.setDone();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalize(this: *ThisInterpreter) void {
|
||||
log("Interpreter(0x{x}) finalize", .{@intFromPtr(this)});
|
||||
this.deinitFromFinalizer();
|
||||
|
||||
@@ -155,6 +155,7 @@ const BufferedIoClosed = struct {
|
||||
if (cmd.io.stdout == .pipe and cmd.io.stdout == .pipe and !cmd.node.redirect.redirectsElsewhere(.stdout)) {
|
||||
const the_slice = readable.pipe.slice();
|
||||
bun.handleOom(cmd.base.shell.buffered_stdout().appendSlice(bun.default_allocator, the_slice));
|
||||
cmd.base.shell.notifyStdoutData();
|
||||
}
|
||||
|
||||
var buffer = readable.pipe.takeBuffer();
|
||||
@@ -169,6 +170,7 @@ const BufferedIoClosed = struct {
|
||||
if (cmd.io.stderr == .pipe and cmd.io.stderr == .pipe and !cmd.node.redirect.redirectsElsewhere(.stderr)) {
|
||||
const the_slice = readable.pipe.slice();
|
||||
bun.handleOom(cmd.base.shell.buffered_stderr().appendSlice(bun.default_allocator, the_slice));
|
||||
cmd.base.shell.notifyStderrData();
|
||||
}
|
||||
|
||||
var buffer = readable.pipe.takeBuffer();
|
||||
|
||||
113
test/js/bun/shell/shell-streaming.test.ts
Normal file
113
test/js/bun/shell/shell-streaming.test.ts
Normal file
@@ -0,0 +1,113 @@
|
||||
import { test, expect, describe } from "bun:test";
|
||||
import { $ } from "bun";
|
||||
|
||||
describe("Shell streaming stdout/stderr", () => {
|
||||
test("stdout returns a ReadableStream", async () => {
|
||||
const shell = $`echo "hello world"`;
|
||||
const stdout = shell.stdout;
|
||||
|
||||
expect(stdout).toBeInstanceOf(ReadableStream);
|
||||
|
||||
// Consume the stream
|
||||
const chunks: Uint8Array[] = [];
|
||||
for await (const chunk of stdout) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
const text = new TextDecoder().decode(Buffer.concat(chunks));
|
||||
expect(text.trim()).toBe("hello world");
|
||||
|
||||
// Wait for shell to complete
|
||||
await shell;
|
||||
});
|
||||
|
||||
test("stderr returns a ReadableStream", async () => {
|
||||
const shell = $`node -e "console.error('error message')"`.nothrow();
|
||||
const stderr = shell.stderr;
|
||||
|
||||
expect(stderr).toBeInstanceOf(ReadableStream);
|
||||
|
||||
// Consume the stream
|
||||
const chunks: Uint8Array[] = [];
|
||||
for await (const chunk of stderr) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
const text = new TextDecoder().decode(Buffer.concat(chunks));
|
||||
expect(text.trim()).toBe("error message");
|
||||
|
||||
// Wait for shell to complete
|
||||
await shell;
|
||||
});
|
||||
|
||||
test("can read stdout stream while command is running", async () => {
|
||||
const shell = $`node -e "
|
||||
for (let i = 0; i < 3; i++) {
|
||||
console.log('line ' + i);
|
||||
}
|
||||
"`;
|
||||
|
||||
const chunks: string[] = [];
|
||||
const reader = shell.stdout.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(decoder.decode(value, { stream: true }));
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
const output = chunks.join('');
|
||||
expect(output).toContain("line 0");
|
||||
expect(output).toContain("line 1");
|
||||
expect(output).toContain("line 2");
|
||||
|
||||
await shell;
|
||||
});
|
||||
|
||||
test("stdout and stderr work independently", async () => {
|
||||
const shell = $`node -e "
|
||||
console.log('stdout message');
|
||||
console.error('stderr message');
|
||||
"`.nothrow();
|
||||
|
||||
const stdoutPromise = (async () => {
|
||||
const chunks: Uint8Array[] = [];
|
||||
for await (const chunk of shell.stdout) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
return new TextDecoder().decode(Buffer.concat(chunks));
|
||||
})();
|
||||
|
||||
const stderrPromise = (async () => {
|
||||
const chunks: Uint8Array[] = [];
|
||||
for await (const chunk of shell.stderr) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
return new TextDecoder().decode(Buffer.concat(chunks));
|
||||
})();
|
||||
|
||||
const [stdoutText, stderrText] = await Promise.all([stdoutPromise, stderrPromise]);
|
||||
|
||||
expect(stdoutText.trim()).toBe("stdout message");
|
||||
expect(stderrText.trim()).toBe("stderr message");
|
||||
|
||||
await shell;
|
||||
});
|
||||
|
||||
test("can access stdout stream multiple times", async () => {
|
||||
const shell = $`echo "test"`;
|
||||
|
||||
const stream1 = shell.stdout;
|
||||
const stream2 = shell.stdout;
|
||||
|
||||
// Should return the same stream instance
|
||||
expect(stream1).toBe(stream2);
|
||||
|
||||
await shell;
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user