shell: Fix array buffer

This commit is contained in:
Zack Radisic
2024-02-23 13:45:58 -08:00
parent c367ce1620
commit 2a8bfbe4ea
3 changed files with 70 additions and 23 deletions

View File

@@ -3375,7 +3375,7 @@ pub const Interpreter = struct {
} else switch (redirect) {
.jsbuf => |val| {
// JS values in here is probably a bug
if (this.base.eventLoop() == .js) @panic("JS values not allowed in this context");
if (this.base.eventLoop() != .js) @panic("JS values not allowed in this context");
const global = this.base.eventLoop().js.global;
if (this.base.interpreter.jsobjs[val.idx].asArrayBuffer(global)) |buf| {

View File

@@ -28,7 +28,6 @@ const FileSink = JSC.WebCore.FileSink;
const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor;
const BufferedOutput = struct {};
const BufferedInput = struct {};
/// TODO Set this to interpreter
@@ -141,7 +140,14 @@ pub const ShellSubprocess = struct {
.fd => Readable{ .fd = result.? },
.memfd => Readable{ .memfd = stdio.memfd },
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, false, out_type) },
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
.array_buffer => {
const readable = Readable{ .pipe = PipeReader.create(event_loop, process, result, false, out_type) };
readable.pipe.buffered_output = .{
.array_buffer = .{ .buf = stdio.array_buffer, .i = 0 },
};
return readable;
},
.blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
.capture => Readable{ .pipe = PipeReader.create(event_loop, process, result, true, out_type) },
};
}
@@ -681,11 +687,48 @@ pub const PipeReader = struct {
err: bun.sys.Error,
} = .{ .pending = {} },
stdio_result: StdioResult,
captured_writer: CapturedWriter = .{},
out_type: bun.shell.subproc.ShellSubprocess.OutKind,
buffered: bun.ByteList = .{},
captured_writer: CapturedWriter = .{},
buffered_output: BufferedOutput = .{ .bytelist = .{} },
ref_count: u32 = 1,
const BufferedOutput = union(enum) {
bytelist: bun.ByteList,
array_buffer: struct {
buf: JSC.ArrayBuffer.Strong,
i: u32 = 0,
},
pub fn slice(this: *BufferedOutput) []const u8 {
return switch (this.*) {
.bytelist => this.bytelist.slice(),
.array_buffer => this.array_buffer.buf.slice(),
};
}
pub fn append(this: *BufferedOutput, bytes: []const u8) void {
switch (this.*) {
.bytelist => {
this.bytelist.append(bun.default_allocator, bytes) catch bun.outOfMemory();
},
.array_buffer => {
const array_buf_slice = this.array_buffer.buf.slice();
if (array_buf_slice.len - this.array_buffer.i < bytes.len) return;
@memcpy(array_buf_slice[this.array_buffer.i .. this.array_buffer.i + bytes.len], bytes);
},
}
}
pub fn deinit(this: *BufferedOutput) void {
switch (this.*) {
.bytelist => {
this.bytelist.deinitWithAllocator(bun.default_allocator);
},
.array_buffer => {},
}
}
};
pub usingnamespace bun.NewRefCounted(PipeReader, deinit);
pub const CapturedWriter = struct {
@@ -730,7 +773,7 @@ pub const PipeReader = struct {
}
pub fn onWrite(this: *CapturedWriter, amount: usize, done: bool) void {
log("CapturedWriter onWrite({x}, {d}, {any})", .{ @intFromPtr(this), amount, done });
log("CapturedWriter({x}, {s}) onWrite({d}, {any})", .{ @intFromPtr(this), @tagName(this.parent().out_type), amount, done });
this.written += amount;
if (done) return;
if (this.written >= this.parent().reader.buffer().items.len) {
@@ -743,6 +786,7 @@ pub const PipeReader = struct {
}
pub fn onClose(this: *CapturedWriter) void {
log("CapturedWriter({x}, {s}) onClose()", .{ @intFromPtr(this), @tagName(this.parent().out_type) });
this.parent().onCapturedWriterDone();
}
};
@@ -751,7 +795,7 @@ pub const PipeReader = struct {
pub const Poll = IOReader;
pub fn detach(this: *PipeReader) void {
log("PipeReader detach({x})", .{@intFromPtr(this)});
log("PipeReader(0x{x}, {s}) detach()", .{ @intFromPtr(this), @tagName(this.out_type) });
this.process = null;
this.deref();
}
@@ -773,6 +817,7 @@ pub const PipeReader = struct {
.stdio_result = result,
.out_type = out_type,
});
log("PipeReader(0x{x}, {s}) create()", .{ @intFromPtr(this), @tagName(this.out_type) });
if (capture) {
this.captured_writer.dead = false;
@@ -820,18 +865,20 @@ pub const PipeReader = struct {
pub fn onReadChunk(ptr: *anyopaque, chunk: []const u8, has_more: bun.io.ReadState) bool {
var this: *PipeReader = @ptrCast(@alignCast(ptr));
this.buffered.append(bun.default_allocator, chunk) catch bun.outOfMemory();
log("PipeReader onReadChunk({x}, ...)", .{@intFromPtr(this)});
if (this.captured_writer.writer.getPoll() == null) {
this.captured_writer.writer.handle = .{ .poll = Async.FilePoll.init(this.eventLoop(), if (this.out_type == .stdout) bun.STDOUT_FD else bun.STDERR_FD, .{}, @TypeOf(this.captured_writer.writer), &this.captured_writer.writer) };
}
switch (this.captured_writer.writer.write(chunk)) {
.err => |e| {
const writer = std.io.getStdOut().writer();
e.format("Yoops ", .{}, writer) catch @panic("oops");
@panic("TODO SHELL SUBPROC onReadChunk error");
},
else => {},
this.buffered_output.append(chunk);
log("PipeReader(0x{x}, {s}) onReadChunk(...)", .{ @intFromPtr(this), @tagName(this.out_type) });
if (!this.captured_writer.dead) {
if (this.captured_writer.writer.getPoll() == null) {
this.captured_writer.writer.handle = .{ .poll = Async.FilePoll.init(this.eventLoop(), if (this.out_type == .stdout) bun.STDOUT_FD else bun.STDERR_FD, .{}, @TypeOf(this.captured_writer.writer), &this.captured_writer.writer) };
}
switch (this.captured_writer.writer.write(chunk)) {
.err => |e| {
const writer = std.io.getStdOut().writer();
e.format("Yoops ", .{}, writer) catch @panic("oops");
@panic("TODO SHELL SUBPROC onReadChunk error");
},
else => {},
}
}
return has_more != .eof;
}
@@ -883,7 +930,7 @@ pub const PipeReader = struct {
}
pub fn slice(this: *PipeReader) []const u8 {
return this.buffered.slice();
return this.buffered_output.slice();
}
pub fn toOwnedSlice(this: *PipeReader) []u8 {
@@ -969,7 +1016,7 @@ pub const PipeReader = struct {
}
pub fn deinit(this: *PipeReader) void {
log("PipeReader deinit({x})", .{@intFromPtr(this)});
log("PipeReader(0x{x}, {s}) deinit()", .{ @intFromPtr(this), @tagName(this.out_type) });
if (comptime Environment.isPosix) {
std.debug.assert(this.reader.isDone());
}
@@ -982,7 +1029,7 @@ pub const PipeReader = struct {
bun.default_allocator.free(this.state.done);
}
this.buffered.deinitWithAllocator(bun.default_allocator);
this.buffered_output.deinit();
this.reader.deinit();
this.destroy();

View File

@@ -231,7 +231,7 @@ describe("bunshell", () => {
const { stdout } = await $`FOO=${whatsupbro}; echo $FOO`;
expect(stdout.toString("utf-8")).toEqual(whatsupbro + "\n");
});
expect(error).toBeDefined();
expect(error).toBeUndefined();
});
test("in compound word", async () => {