From 2a8bfbe4eaeb065899b4f871ecbfbd27eed4c3b8 Mon Sep 17 00:00:00 2001 From: Zack Radisic <56137411+zackradisic@users.noreply.github.com> Date: Fri, 23 Feb 2024 13:45:58 -0800 Subject: [PATCH] shell: Fix array buffer --- src/shell/interpreter.zig | 2 +- src/shell/subproc.zig | 89 +++++++++++++++++++++++------- test/js/bun/shell/bunshell.test.ts | 2 +- 3 files changed, 70 insertions(+), 23 deletions(-) diff --git a/src/shell/interpreter.zig b/src/shell/interpreter.zig index d01cf72476..4f04880b51 100644 --- a/src/shell/interpreter.zig +++ b/src/shell/interpreter.zig @@ -3375,7 +3375,7 @@ pub const Interpreter = struct { } else switch (redirect) { .jsbuf => |val| { // JS values in here is probably a bug - if (this.base.eventLoop() == .js) @panic("JS values not allowed in this context"); + if (this.base.eventLoop() != .js) @panic("JS values not allowed in this context"); const global = this.base.eventLoop().js.global; if (this.base.interpreter.jsobjs[val.idx].asArrayBuffer(global)) |buf| { diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig index 65165eb69f..3a8439dde6 100644 --- a/src/shell/subproc.zig +++ b/src/shell/subproc.zig @@ -28,7 +28,6 @@ const FileSink = JSC.WebCore.FileSink; const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor; -const BufferedOutput = struct {}; const BufferedInput = struct {}; /// TODO Set this to interpreter @@ -141,7 +140,14 @@ pub const ShellSubprocess = struct { .fd => Readable{ .fd = result.? }, .memfd => Readable{ .memfd = stdio.memfd }, .pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, false, out_type) }, - .array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), + .array_buffer => { + const readable = Readable{ .pipe = PipeReader.create(event_loop, process, result, false, out_type) }; + readable.pipe.buffered_output = .{ + .array_buffer = .{ .buf = stdio.array_buffer, .i = 0 }, + }; + return readable; + }, + .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}), .capture => Readable{ .pipe = PipeReader.create(event_loop, process, result, true, out_type) }, }; } @@ -681,11 +687,48 @@ pub const PipeReader = struct { err: bun.sys.Error, } = .{ .pending = {} }, stdio_result: StdioResult, - captured_writer: CapturedWriter = .{}, out_type: bun.shell.subproc.ShellSubprocess.OutKind, - buffered: bun.ByteList = .{}, + captured_writer: CapturedWriter = .{}, + buffered_output: BufferedOutput = .{ .bytelist = .{} }, ref_count: u32 = 1, + const BufferedOutput = union(enum) { + bytelist: bun.ByteList, + array_buffer: struct { + buf: JSC.ArrayBuffer.Strong, + i: u32 = 0, + }, + + pub fn slice(this: *BufferedOutput) []const u8 { + return switch (this.*) { + .bytelist => this.bytelist.slice(), + .array_buffer => this.array_buffer.buf.slice(), + }; + } + + pub fn append(this: *BufferedOutput, bytes: []const u8) void { + switch (this.*) { + .bytelist => { + this.bytelist.append(bun.default_allocator, bytes) catch bun.outOfMemory(); + }, + .array_buffer => { + const array_buf_slice = this.array_buffer.buf.slice(); + if (array_buf_slice.len - this.array_buffer.i < bytes.len) return; + @memcpy(array_buf_slice[this.array_buffer.i .. this.array_buffer.i + bytes.len], bytes); + }, + } + } + + pub fn deinit(this: *BufferedOutput) void { + switch (this.*) { + .bytelist => { + this.bytelist.deinitWithAllocator(bun.default_allocator); + }, + .array_buffer => {}, + } + } + }; + pub usingnamespace bun.NewRefCounted(PipeReader, deinit); pub const CapturedWriter = struct { @@ -730,7 +773,7 @@ pub const PipeReader = struct { } pub fn onWrite(this: *CapturedWriter, amount: usize, done: bool) void { - log("CapturedWriter onWrite({x}, {d}, {any})", .{ @intFromPtr(this), amount, done }); + log("CapturedWriter({x}, {s}) onWrite({d}, {any})", .{ @intFromPtr(this), @tagName(this.parent().out_type), amount, done }); this.written += amount; if (done) return; if (this.written >= this.parent().reader.buffer().items.len) { @@ -743,6 +786,7 @@ pub const PipeReader = struct { } pub fn onClose(this: *CapturedWriter) void { + log("CapturedWriter({x}, {s}) onClose()", .{ @intFromPtr(this), @tagName(this.parent().out_type) }); this.parent().onCapturedWriterDone(); } }; @@ -751,7 +795,7 @@ pub const PipeReader = struct { pub const Poll = IOReader; pub fn detach(this: *PipeReader) void { - log("PipeReader detach({x})", .{@intFromPtr(this)}); + log("PipeReader(0x{x}, {s}) detach()", .{ @intFromPtr(this), @tagName(this.out_type) }); this.process = null; this.deref(); } @@ -773,6 +817,7 @@ pub const PipeReader = struct { .stdio_result = result, .out_type = out_type, }); + log("PipeReader(0x{x}, {s}) create()", .{ @intFromPtr(this), @tagName(this.out_type) }); if (capture) { this.captured_writer.dead = false; @@ -820,18 +865,20 @@ pub const PipeReader = struct { pub fn onReadChunk(ptr: *anyopaque, chunk: []const u8, has_more: bun.io.ReadState) bool { var this: *PipeReader = @ptrCast(@alignCast(ptr)); - this.buffered.append(bun.default_allocator, chunk) catch bun.outOfMemory(); - log("PipeReader onReadChunk({x}, ...)", .{@intFromPtr(this)}); - if (this.captured_writer.writer.getPoll() == null) { - this.captured_writer.writer.handle = .{ .poll = Async.FilePoll.init(this.eventLoop(), if (this.out_type == .stdout) bun.STDOUT_FD else bun.STDERR_FD, .{}, @TypeOf(this.captured_writer.writer), &this.captured_writer.writer) }; - } - switch (this.captured_writer.writer.write(chunk)) { - .err => |e| { - const writer = std.io.getStdOut().writer(); - e.format("Yoops ", .{}, writer) catch @panic("oops"); - @panic("TODO SHELL SUBPROC onReadChunk error"); - }, - else => {}, + this.buffered_output.append(chunk); + log("PipeReader(0x{x}, {s}) onReadChunk(...)", .{ @intFromPtr(this), @tagName(this.out_type) }); + if (!this.captured_writer.dead) { + if (this.captured_writer.writer.getPoll() == null) { + this.captured_writer.writer.handle = .{ .poll = Async.FilePoll.init(this.eventLoop(), if (this.out_type == .stdout) bun.STDOUT_FD else bun.STDERR_FD, .{}, @TypeOf(this.captured_writer.writer), &this.captured_writer.writer) }; + } + switch (this.captured_writer.writer.write(chunk)) { + .err => |e| { + const writer = std.io.getStdOut().writer(); + e.format("Yoops ", .{}, writer) catch @panic("oops"); + @panic("TODO SHELL SUBPROC onReadChunk error"); + }, + else => {}, + } } return has_more != .eof; } @@ -883,7 +930,7 @@ pub const PipeReader = struct { } pub fn slice(this: *PipeReader) []const u8 { - return this.buffered.slice(); + return this.buffered_output.slice(); } pub fn toOwnedSlice(this: *PipeReader) []u8 { @@ -969,7 +1016,7 @@ pub const PipeReader = struct { } pub fn deinit(this: *PipeReader) void { - log("PipeReader deinit({x})", .{@intFromPtr(this)}); + log("PipeReader(0x{x}, {s}) deinit()", .{ @intFromPtr(this), @tagName(this.out_type) }); if (comptime Environment.isPosix) { std.debug.assert(this.reader.isDone()); } @@ -982,7 +1029,7 @@ pub const PipeReader = struct { bun.default_allocator.free(this.state.done); } - this.buffered.deinitWithAllocator(bun.default_allocator); + this.buffered_output.deinit(); this.reader.deinit(); this.destroy(); diff --git a/test/js/bun/shell/bunshell.test.ts b/test/js/bun/shell/bunshell.test.ts index 6b14c16247..41c53873ae 100644 --- a/test/js/bun/shell/bunshell.test.ts +++ b/test/js/bun/shell/bunshell.test.ts @@ -231,7 +231,7 @@ describe("bunshell", () => { const { stdout } = await $`FOO=${whatsupbro}; echo $FOO`; expect(stdout.toString("utf-8")).toEqual(whatsupbro + "\n"); }); - expect(error).toBeDefined(); + expect(error).toBeUndefined(); }); test("in compound word", async () => {