This commit is contained in:
Jarred Sumner
2024-02-07 13:48:09 -08:00
parent e708629265
commit baef6ea6f4
5 changed files with 155 additions and 809 deletions

View File

@@ -24,6 +24,15 @@ pub const Stdio = union(enum) {
const log = bun.sys.syslog;
pub fn byteSlice(this: *const Stdio) []const u8 {
return switch (this.*) {
.capture => this.capture.slice(),
.array_buffer => this.array_buffer.array_buffer.byteSlice(),
.blob => this.blob.slice(),
else => &[_]u8{},
};
}
pub fn deinit(this: *Stdio) void {
switch (this.*) {
.array_buffer => |*array_buffer| {
@@ -294,7 +303,7 @@ pub const Stdio = union(enum) {
if (blob.store()) |store| {
if (store.data.file.pathlike == .fd) {
if (store.data.file.pathlike.fd == fd) {
stdio.* = Stdio{ .inherit = .{} };
stdio.* = Stdio{ .inherit = {} };
} else {
switch (bun.FDTag.get(i)) {
.stdin => {

View File

@@ -371,6 +371,7 @@ pub const Subprocess = struct {
.memfd => Readable{ .memfd = stdio.memfd },
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, fd.?) },
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
.capture => Output.panic("TODO: implement capture support in Stdio readable", .{}),
};
}
@@ -649,7 +650,14 @@ pub const Subprocess = struct {
pub usingnamespace bun.NewRefCounted(@This(), deinit);
const This = @This();
pub const IOWriter = bun.io.BufferedWriter(This, onWrite, onError, onClose, getBuffer);
pub const IOWriter = bun.io.BufferedWriter(
This,
onWrite,
onError,
onClose,
getBuffer,
null,
);
pub const Poll = IOWriter;
pub fn updateRef(this: *This, add: bool) void {
@@ -665,7 +673,8 @@ pub const Subprocess = struct {
}
pub fn flush(this: *This) void {
this.writer.flush();
_ = this; // autofix
// this.writer.flush();
}
pub fn create(event_loop: anytype, subprocess: *ProcessType, fd: bun.FileDescriptor, source: Source) *This {
@@ -678,7 +687,7 @@ pub const Subprocess = struct {
}
pub fn start(this: *This) JSC.Maybe(void) {
return this.writer.start(this.fd, this.source.slice(), this.event_loop, true);
return this.writer.start(this.fd, true);
}
pub fn onWrite(this: *This, amount: usize, is_done: bool) void {
@@ -986,6 +995,9 @@ pub const Subprocess = struct {
.path, .ignore => {
return Writable{ .ignore = {} };
},
.capture => {
return Writable{ .ignore = {} };
},
}
}
@@ -993,7 +1005,7 @@ pub const Subprocess = struct {
return switch (this.*) {
.fd => |fd| JSValue.jsNumber(fd),
.memfd, .ignore => JSValue.jsUndefined(),
.buffer, .inherit => JSValue.jsUndefined(),
.capture, .buffer, .inherit => JSValue.jsUndefined(),
.pipe => |pipe| {
this.* = .{ .ignore = {} };
return pipe.toJS(globalThis);
@@ -1429,7 +1441,7 @@ pub const Subprocess = struct {
while (stdio_iter.next()) |value| : (i += 1) {
var new_item: Stdio = undefined;
if (&new_item.extract(globalThis, i, value))
if (new_item.extract(globalThis, i, value))
return JSC.JSValue.jsUndefined();
switch (new_item) {
.pipe => {

View File

@@ -133,8 +133,9 @@ pub fn PosixBufferedWriter(
comptime Parent: type,
comptime onWrite: *const fn (*Parent, amount: usize, done: bool) void,
comptime onError: *const fn (*Parent, bun.sys.Error) void,
comptime onClose: *const fn (*Parent) void,
comptime onClose: ?*const fn (*Parent) void,
comptime getBuffer: *const fn (*Parent) []const u8,
comptime onWritable: ?*const fn (*Parent) void,
) type {
return struct {
handle: PollOrFd = .{ .closed = {} },
@@ -171,7 +172,6 @@ pub fn PosixBufferedWriter(
const parent = this.parent;
onWrite(parent, written, done);
if (done and !was_done) {
this.close();
}
@@ -181,6 +181,10 @@ pub fn PosixBufferedWriter(
if (this.is_done) {
return;
}
if (onWritable) |cb| {
cb(this.parent);
}
}
fn registerPoll(this: *PosixWriter) void {
@@ -228,7 +232,8 @@ pub fn PosixBufferedWriter(
}
pub fn close(this: *PosixWriter) void {
this.handle.close(this.parent, onClose);
if (onClose) |closer|
this.handle.close(this.parent, closer);
}
pub fn updateRef(this: *const PosixWriter, event_loop: anytype, value: bool) void {

View File

@@ -2681,7 +2681,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
const HandleIOWrite = struct {
fn run(pipeline: *Pipeline, bufw: BufferedWriter) void {
pipeline.state = .{ .waiting_write_err = bufw };
pipeline.state.waiting_write_err.writeIfPossible(false);
pipeline.state.waiting_write_err.write();
}
};
@@ -3001,7 +3001,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
const HandleIOWrite = struct {
fn run(cmd: *Cmd, bufw: BufferedWriter) void {
cmd.state = .{ .waiting_write_err = bufw };
cmd.state.waiting_write_err.writeIfPossible(false);
cmd.state.waiting_write_err.write();
}
};
_ = this.base.shell.writeFailingError(buf, this, HandleIOWrite.run);
@@ -3014,7 +3014,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
// .parent = BufferedWriter.ParentPtr.init(this),
// .bytelist = val.captured,
// } };
// this.state.waiting_write_err.writeIfPossible(false);
// this.state.waiting_write_err.write();
// },
// .fd => {
// this.state = .{ .waiting_write_err = BufferedWriter{
@@ -3022,7 +3022,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
// .remain = buf,
// .parent = BufferedWriter.ParentPtr.init(this),
// } };
// this.state.waiting_write_err.writeIfPossible(false);
// this.state.waiting_write_err.write();
// },
// .pipe, .ignore => {
// this.parent.childDone(this, 1);
@@ -4167,7 +4167,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(io_kind),
},
};
this.print_state.?.bufwriter.writeIfPossible(false);
this.print_state.?.bufwriter.write();
return Maybe(void).success;
}
@@ -4240,7 +4240,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
},
};
this.print_state.?.bufwriter.writeIfPossible(false);
this.print_state.?.bufwriter.write();
// if (this.print_state.?.isDone()) {
// if (this.print_state.?.bufwriter.err) |e| {
@@ -4333,7 +4333,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stdout),
};
this.state = .waiting;
this.io_write_state.?.writeIfPossible(false);
this.io_write_state.?.write();
return Maybe(void).success;
}
@@ -4404,7 +4404,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
},
},
};
this.state.one_arg.writer.writeIfPossible(false);
this.state.one_arg.writer.write();
return Maybe(void).success;
}
@@ -4469,7 +4469,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stdout),
},
};
multiargs.state.waiting_write.writeIfPossible(false);
multiargs.state.waiting_write.write();
// yield execution
return;
};
@@ -4483,7 +4483,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stdout),
},
};
multiargs.state.waiting_write.writeIfPossible(false);
multiargs.state.waiting_write.write();
return;
}
@@ -4550,7 +4550,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
},
},
};
this.state.waiting_write_stderr.buffered_writer.writeIfPossible(false);
this.state.waiting_write_stderr.buffered_writer.write();
}
pub fn start(this: *Cd) Maybe(void) {
@@ -4680,7 +4680,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
},
},
};
this.state.waiting_io.writer.writeIfPossible(false);
this.state.waiting_io.writer.write();
return Maybe(void).success;
}
@@ -4706,7 +4706,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
},
},
};
this.state.waiting_io.writer.writeIfPossible(false);
this.state.waiting_io.writer.write();
return Maybe(void).success;
}
@@ -4800,7 +4800,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stderr),
},
};
this.state.waiting_write_err.writeIfPossible(false);
this.state.waiting_write_err.write();
return Maybe(void).success;
}
@@ -4890,7 +4890,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
if (this.state.exec.output_queue.len == 1 and do_run) {
// if (do_run and !this.state.exec.started_output_queue) {
this.state.exec.started_output_queue = true;
this.state.exec.output_queue.first.?.data.writer.writeIfPossible(false);
this.state.exec.output_queue.first.?.data.writer.write();
return .yield;
}
return .cont;
@@ -4898,7 +4898,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
fn scheduleBlockingOutput(this: *Ls) CoroutineResult {
if (this.state.exec.output_queue.len > 0) {
this.state.exec.output_queue.first.?.data.writer.writeIfPossible(false);
this.state.exec.output_queue.first.?.data.writer.write();
return .yield;
}
return .cont;
@@ -4919,7 +4919,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
bun.default_allocator.destroy(first);
}
if (first.next) |next_writer| {
next_writer.data.writer.writeIfPossible(false);
next_writer.data.writer.write();
return;
}
@@ -5789,7 +5789,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.exit_code = exit_code,
},
};
this.state.waiting_write_err.writer.writeIfPossible(false);
this.state.waiting_write_err.writer.write();
return Maybe(void).success;
}
@@ -6267,7 +6267,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stderr),
},
};
parse_opts.state.wait_write_err.writeIfPossible(false);
parse_opts.state.wait_write_err.write();
return Maybe(void).success;
}
@@ -6305,7 +6305,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stderr),
},
};
parse_opts.state.wait_write_err.writeIfPossible(false);
parse_opts.state.wait_write_err.write();
continue;
}
@@ -6350,7 +6350,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stderr),
},
};
parse_opts.state.wait_write_err.writeIfPossible(false);
parse_opts.state.wait_write_err.write();
return Maybe(void).success;
}
@@ -6389,7 +6389,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stderr),
},
};
parse_opts.state.wait_write_err.writeIfPossible(false);
parse_opts.state.wait_write_err.write();
return Maybe(void).success;
}
@@ -6412,7 +6412,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
.bytelist = this.bltn.stdBufferedBytelist(.stderr),
},
};
parse_opts.state.wait_write_err.writeIfPossible(false);
parse_opts.state.wait_write_err.write();
return Maybe(void).success;
}
@@ -6495,7 +6495,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
bun.default_allocator.destroy(first);
}
if (first.next) |next_writer| {
next_writer.data.writer.writeIfPossible(false);
next_writer.data.writer.write();
} else {
if (this.state.exec.state.tasksDone() >= this.state.exec.total_tasks and this.state.exec.getOutputCount(.output_done) >= this.state.exec.getOutputCount(.output_count)) {
this.bltn.done(if (this.state.exec.err != null) 1 else 0);
@@ -6684,7 +6684,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
// Need to start it
if (this.state.exec.output_queue.len == 1) {
this.state.exec.output_queue.first.?.data.writer.writeIfPossible(false);
this.state.exec.output_queue.first.?.data.writer.write();
}
}
@@ -7287,9 +7287,7 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
/// it. IT DOES NOT CLOSE FILE DESCRIPTORS
pub const BufferedWriter =
struct {
remain: []const u8 = "",
fd: bun.FileDescriptor,
poll_ref: ?*bun.Async.FilePoll = null,
writer: Writer = .{},
written: usize = 0,
parent: ParentPtr,
err: ?Syscall.Error = null,
@@ -7304,6 +7302,48 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
const BuiltinJs = bun.shell.Interpreter.Builtin;
const BuiltinMini = bun.shell.InterpreterMini.Builtin;
pub const Writer = bun.io.BufferedWriter(
@This(),
onWrite,
onError,
onClose,
getBuffer,
onReady,
);
pub const Status = union(enum) {
pending: void,
done: void,
err: bun.sys.Error,
};
pub fn getBuffer(this: *BufferedWriter) []const u8 {
_ = this; // autofix
// TODO:
return "";
}
pub fn onWrite(this: *BufferedWriter, amount: usize, done: bool) void {
_ = done; // autofix
if (this.bytelist) |bytelist| {
bytelist.append(bun.default_allocator, this.getBuffer()[this.getBuffer().len - amount ..]) catch bun.outOfMemory();
}
}
pub fn onError(this: *BufferedWriter, err: bun.sys.Error) void {
_ = this; // autofix
_ = err; // autofix
}
pub fn onReady(this: *BufferedWriter) void {
_ = this; // autofix
}
pub fn onClose(this: *BufferedWriter) void {
_ = this; // autofix
}
pub const ParentPtr = struct {
const Types = .{
BuiltinJs.Export,
@@ -7373,132 +7413,8 @@ pub fn NewInterpreter(comptime EventLoopKind: JSC.EventLoopKind) type {
pub const event_loop_kind = EventLoopKind;
pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedWriter, .writable, onReady);
pub fn onReady(this: *BufferedWriter, _: i64) void {
if (this.fd == bun.invalid_fd) {
return;
}
this.__write();
}
pub fn writeIfPossible(this: *BufferedWriter, comptime is_sync: bool) void {
if (this.remain.len == 0) return this.deinit();
if (comptime !is_sync) {
// we ask, "Is it possible to write right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the write
switch (bun.isWritable(this.fd)) {
.ready => {
if (this.poll_ref) |poll| {
poll.flags.insert(.writable);
poll.flags.insert(.fifo);
std.debug.assert(poll.flags.contains(.poll_writable));
}
},
.hup => {
this.deinit();
return;
},
.not_ready => {
if (!this.isWatching()) this.watch(this.fd);
return;
},
}
}
this.writeAllowBlocking(is_sync);
}
/// Calling this directly will block if the fd is not opened with non
/// blocking option. If the fd is blocking, you should call
/// `writeIfPossible()` first, which will check if the fd is writable. If so
/// it will then call this function, if not, then it will poll for the fd to
/// be writable
pub fn __write(this: *BufferedWriter) void {
this.writeAllowBlocking(false);
}
pub fn writeAllowBlocking(this: *BufferedWriter, allow_blocking: bool) void {
var to_write = this.remain;
if (to_write.len == 0) {
// we are done!
this.deinit();
return;
}
if (comptime bun.Environment.allow_assert) {
// bun.assertNonBlocking(this.fd);
}
while (to_write.len > 0) {
switch (bun.sys.write(this.fd, to_write)) {
.err => |e| {
if (e.isRetry()) {
log("write({d}) retry", .{
to_write.len,
});
this.watch(this.fd);
this.poll_ref.?.flags.insert(.fifo);
return;
}
if (e.getErrno() == .PIPE) {
this.deinit();
return;
}
// fail
log("write({d}) fail: {d}", .{ to_write.len, e.errno });
this.err = e;
this.deinit();
return;
},
.result => |bytes_written| {
if (this.bytelist) |blist| {
blist.append(bun.default_allocator, to_write[0..bytes_written]) catch bun.outOfMemory();
}
this.written += bytes_written;
log(
"write({d}) {d}",
.{
to_write.len,
bytes_written,
},
);
this.remain = this.remain[@min(bytes_written, this.remain.len)..];
to_write = to_write[bytes_written..];
// we are done or it accepts no more input
if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) {
this.deinit();
return;
}
},
}
}
}
fn close(this: *BufferedWriter) void {
if (this.poll_ref) |poll| {
this.poll_ref = null;
poll.deinit();
}
if (this.fd != bun.invalid_fd) {
// _ = bun.sys.close(this.fd);
// this.fd = bun.invalid_fd;
}
}
pub fn deinit(this: *BufferedWriter) void {
this.close();
this.parent.onDone(this.err);
this.writer.deinit();
}
};
};
@@ -7770,11 +7686,20 @@ pub fn NewBufferedWriter(comptime Src: type, comptime Parent: type, comptime Eve
return struct {
src: Src,
fd: bun.FileDescriptor,
poll_ref: ?*bun.Async.FilePoll = null,
written: usize = 0,
parent: Parent,
err: ?Syscall.Error = null,
writer: Writer = .{},
pub const Writer = bun.io.BufferedWriter(
@This(),
onWrite,
onError,
// we don't close it
null,
getBuffer,
onReady,
);
pub const ParentType = Parent;
@@ -7787,141 +7712,48 @@ pub fn NewBufferedWriter(comptime Src: type, comptime Parent: type, comptime Eve
pub const event_loop_kind = EventLoopKind;
pub usingnamespace JSC.WebCore.NewReadyWatcher(@This(), .writable, onReady);
pub fn onReady(this: *@This(), _: i64) void {
if (this.fd == bun.invalid_fd) {
pub fn onReady(this: *@This()) void {
if (this.src.isDone(this.written)) {
this.parent.onDone(this.err);
return;
}
this.__write();
const buf = this.getBuffer();
this.writer.write(buf);
}
pub fn writeIfPossible(this: *@This(), comptime is_sync: bool) void {
if (SrcHandler.bufToWrite(this.src, 0).len == 0) return this.deinit();
if (comptime !is_sync) {
// we ask, "Is it possible to write right now?"
// we do this rather than epoll or kqueue()
// because we don't want to block the thread waiting for the write
switch (bun.isWritable(this.fd)) {
.ready => {
if (this.poll_ref) |poll| {
poll.flags.insert(.writable);
poll.flags.insert(.fifo);
std.debug.assert(poll.flags.contains(.poll_writable));
}
},
.hup => {
this.deinit();
return;
},
.not_ready => {
if (!this.isWatching()) this.watch(this.fd);
return;
},
}
}
this.writeAllowBlocking(is_sync);
pub fn getBuffer(this: *@This()) []const u8 {
return SrcHandler.bufToWrite(this.src, this.written);
}
/// Calling this directly will block if the fd is not opened with non
/// blocking option. If the fd is blocking, you should call
/// `writeIfPossible()` first, which will check if the fd is writable. If so
/// it will then call this function, if not, then it will poll for the fd to
/// be writable
pub fn __write(this: *@This()) void {
this.writeAllowBlocking(false);
}
pub fn writeAllowBlocking(this: *@This(), allow_blocking: bool) void {
_ = allow_blocking; // autofix
var to_write = SrcHandler.bufToWrite(this.src, this.written);
if (to_write.len == 0) {
// we are done!
// this.closeFDIfOpen();
if (SrcHandler.isDone(this.src, this.written)) {
this.deinit();
}
pub fn write(this: *@This()) void {
if (this.src.isDone(this.written)) {
return;
}
if (comptime bun.Environment.allow_assert) {
// bun.assertNonBlocking(this.fd);
}
const buf = this.getBuffer();
this.writer.write(buf);
}
while (to_write.len > 0) {
switch (bun.sys.write(this.fd, to_write)) {
.err => |e| {
if (e.isRetry()) {
log("write({d}) retry", .{
to_write.len,
});
pub fn onWrite(this: *@This(), amount: usize, done: bool) void {
this.written += amount;
this.watch(this.fd);
this.poll_ref.?.flags.insert(.fifo);
return;
}
if (e.getErrno() == .PIPE) {
this.deinit();
return;
}
// fail
log("write({d}) fail: {d}", .{ to_write.len, e.errno });
this.err = e;
this.deinit();
return;
},
.result => |bytes_written| {
this.written += bytes_written;
log(
"write({d}) {d}",
.{
to_write.len,
bytes_written,
},
);
// this.remain = this.remain[@min(bytes_written, this.remain.len)..];
// to_write = to_write[bytes_written..];
// // we are done or it accepts no more input
// if (this.remain.len == 0 or (allow_blocking and bytes_written == 0)) {
// this.deinit();
// return;
// }
to_write = SrcHandler.bufToWrite(this.src, this.written);
if (to_write.len == 0) {
if (SrcHandler.isDone(this.src, this.written)) {
this.deinit();
return;
}
}
},
}
if (done or this.src.isDone(this.written)) {
this.parent.onDone(this.err);
} else {
const buf = this.getBuffer();
this.writer.write(buf);
}
}
fn close(this: *@This()) void {
if (this.poll_ref) |poll| {
this.poll_ref = null;
poll.deinit();
}
pub fn onError(this: *@This(), err: bun.sys.Error) void {
this.err = err;
if (this.fd != bun.invalid_fd) {
// _ = bun.sys.close(this.fd);
// this.fd = bun.invalid_fd;
}
this.parent.onDone(this.err);
}
pub fn deinit(this: *@This()) void {
this.close();
this.parent.onDone(this.err);
this.writer.deinit();
}
};
}

View File

@@ -50,6 +50,7 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
};
}
};
_ = get_vm; // autofix
// const ShellCmd = switch (EventLoopKind) {
// .js => bun.shell.interpret.Interpreter.Cmd,
@@ -73,9 +74,9 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
process: *Process,
stdin: Writable,
stdout: Readable,
stderr: Readable,
stdin: *Writable = undefined,
stdout: *Readable = undefined,
stderr: *Readable = undefined,
globalThis: GlobalRef,
@@ -90,6 +91,9 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
pub const OutKind = util.OutKind;
const Readable = opaque {};
const Writable = opaque {};
pub const Flags = packed struct(u3) {
is_sync: bool = false,
killed: bool = false,
@@ -97,335 +101,10 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
};
pub const SignalCode = bun.SignalCode;
pub const Writable = union(enum) {
pipe: *FileSink,
pipe_to_readable_stream: struct {
pipe: *FileSink,
readable_stream: JSC.WebCore.ReadableStream,
},
fd: bun.FileDescriptor,
buffered_input: BufferedInput,
inherit: void,
ignore: void,
pub fn ref(this: *Writable) void {
switch (this.*) {
.pipe => {
if (this.pipe.poll_ref) |poll| {
poll.enableKeepingProcessAlive(get_vm.get());
}
},
else => {},
}
}
pub fn unref(this: *Writable) void {
switch (this.*) {
.pipe => {
if (this.pipe.poll_ref) |poll| {
poll.enableKeepingProcessAlive(get_vm.get());
}
},
else => {},
}
}
// When the stream has closed we need to be notified to prevent a use-after-free
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
pub fn onClose(this: *Writable, _: ?bun.sys.Error) void {
this.* = .{
.ignore = {},
};
}
pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
pub fn init(subproc: *Subprocess, stdio: Stdio, fd: ?bun.FileDescriptor, globalThis: GlobalRef) !Writable {
switch (stdio) {
.pipe => {
// var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
var sink = try GlobalHandle.init(globalThis).allocator().create(FileSink);
sink.* = .{
.fd = fd.?,
.buffer = bun.ByteList{},
.allocator = GlobalHandle.init(globalThis).allocator(),
.auto_close = true,
};
sink.mode = std.os.S.IFIFO;
sink.watch(fd.?);
if (stdio == .pipe) {
if (stdio.pipe) |readable| {
if (comptime EventLoopKind == .mini) @panic("FIXME TODO error gracefully but wait can this even happen");
return Writable{
.pipe_to_readable_stream = .{
.pipe = sink,
.readable_stream = readable,
},
};
}
}
return Writable{ .pipe = sink };
},
.array_buffer, .blob => {
var buffered_input: BufferedInput = .{ .fd = fd.?, .source = undefined, .subproc = subproc };
switch (stdio) {
.array_buffer => |array_buffer| {
buffered_input.source = .{ .array_buffer = array_buffer.buf };
},
.blob => |blob| {
buffered_input.source = .{ .blob = blob };
},
else => unreachable,
}
return Writable{ .buffered_input = buffered_input };
},
.fd => {
return Writable{ .fd = fd.? };
},
.inherit => {
return Writable{ .inherit = {} };
},
.path, .ignore => {
return Writable{ .ignore = {} };
},
}
}
pub fn toJS(this: Writable, globalThis: *JSC.JSGlobalObject) JSValue {
return switch (this) {
.pipe => |pipe| pipe.toJS(globalThis),
.fd => |fd| JSValue.jsNumber(fd),
.ignore => JSValue.jsUndefined(),
.inherit => JSValue.jsUndefined(),
.buffered_input => JSValue.jsUndefined(),
.pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value,
};
}
pub fn finalize(this: *Writable) void {
return switch (this.*) {
.pipe => |pipe| {
pipe.deref();
},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
_ = pipe_to_readable_stream.pipe.end(null);
},
.fd => |fd| {
_ = bun.sys.close(fd);
this.* = .{ .ignore = {} };
},
.buffered_input => {
this.buffered_input.deinit();
},
.ignore => {},
.inherit => {},
};
}
pub fn close(this: *Writable) void {
return switch (this.*) {
.pipe => {},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
_ = pipe_to_readable_stream.pipe.end(null);
},
.fd => |fd| {
_ = bun.sys.close(fd);
this.* = .{ .ignore = {} };
},
.buffered_input => {
this.buffered_input.deinit();
},
.ignore => {},
.inherit => {},
};
}
};
pub const Readable = union(enum) {
fd: bun.FileDescriptor,
pipe: Pipe,
inherit: void,
ignore: void,
closed: void,
pub fn ref(this: *Readable) void {
switch (this.*) {
.pipe => {
if (this.pipe == .buffer) {
// if (this.pipe.buffer.fifo.poll_ref) |poll| {
// poll.enableKeepingProcessAlive(get_vm.get());
// }
}
},
else => {},
}
}
pub fn unref(this: *Readable) void {
switch (this.*) {
.pipe => {
if (this.pipe == .buffer) {
// if (this.pipe.buffer.fifo.poll_ref) |poll| {
// poll.enableKeepingProcessAlive(get_vm.get());
// }
}
},
else => {},
}
}
pub fn init(subproc: *Subprocess, comptime kind: OutKind, stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32) Readable {
return switch (stdio) {
.ignore => Readable{ .ignore = {} },
.pipe => {
var subproc_readable_ptr = subproc.getIO(kind);
subproc_readable_ptr.* = Readable{ .pipe = .{ .buffer = undefined } };
BufferedOutput.initWithAllocator(subproc, &subproc_readable_ptr.pipe.buffer, kind, allocator, fd.?, max_size);
return subproc_readable_ptr.*;
},
.inherit => {
return Readable{ .inherit = {} };
},
.captured => |captured| {
var subproc_readable_ptr = subproc.getIO(kind);
subproc_readable_ptr.* = Readable{ .pipe = .{ .buffer = undefined } };
BufferedOutput.initWithAllocator(subproc, &subproc_readable_ptr.pipe.buffer, kind, allocator, fd.?, max_size);
subproc_readable_ptr.pipe.buffer.out = captured;
subproc_readable_ptr.pipe.buffer.writer = BufferedOutput.CapturedBufferedWriter{
.src = WriterSrc{
.inner = &subproc_readable_ptr.pipe.buffer,
},
.fd = if (kind == .stdout) bun.STDOUT_FD else bun.STDERR_FD,
.parent = .{ .parent = &subproc_readable_ptr.pipe.buffer },
};
return subproc_readable_ptr.*;
},
.path => Readable{ .ignore = {} },
.blob, .fd => Readable{ .fd = fd.? },
.array_buffer => {
var subproc_readable_ptr = subproc.getIO(kind);
subproc_readable_ptr.* = Readable{
.pipe = .{
.buffer = undefined,
},
};
if (stdio.array_buffer.from_jsc) {
BufferedOutput.initWithArrayBuffer(subproc, &subproc_readable_ptr.pipe.buffer, kind, fd.?, stdio.array_buffer.buf);
} else {
subproc_readable_ptr.pipe.buffer = BufferedOutput.initWithSlice(subproc, kind, fd.?, stdio.array_buffer.buf.slice());
}
return subproc_readable_ptr.*;
},
};
}
pub fn onClose(this: *Readable, _: ?bun.sys.Error) void {
this.* = .closed;
}
pub fn onReady(_: *Readable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Readable) void {}
pub fn close(this: *Readable) void {
log("READABLE close", .{});
switch (this.*) {
.fd => |fd| {
_ = bun.sys.close(fd);
},
.pipe => {
this.pipe.done();
},
else => {},
}
}
pub fn finalize(this: *Readable) void {
log("Readable::finalize", .{});
switch (this.*) {
.fd => |fd| {
_ = bun.sys.close(fd);
},
.pipe => {
if (this.pipe == .stream and this.pipe.stream.ptr == .File) {
this.close();
return;
}
// this.pipe.buffer.close();
},
else => {},
}
}
pub fn toJS(this: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue {
switch (this.*) {
.fd => |fd| {
return JSValue.jsNumber(fd);
},
.pipe => {
return this.pipe.toJS(this, globalThis, exited);
},
else => {
return JSValue.jsUndefined();
},
}
}
pub fn toSlice(this: *Readable) ?[]const u8 {
switch (this.*) {
.fd => return null,
.pipe => {
this.pipe.buffer.fifo.close_on_empty_read = true;
this.pipe.buffer.readAll();
const bytes = this.pipe.buffer.internal_buffer.slice();
// this.pipe.buffer.internal_buffer = .{};
if (bytes.len > 0) {
return bytes;
}
return "";
},
else => {
return null;
},
}
}
pub fn toBufferedValue(this: *Readable, globalThis: *JSC.JSGlobalObject) JSValue {
switch (this.*) {
.fd => |fd| {
return JSValue.jsNumber(fd);
},
.pipe => {
this.pipe.buffer.fifo.close_on_empty_read = true;
this.pipe.buffer.readAll();
const bytes = this.pipe.buffer.internal_buffer.slice();
this.pipe.buffer.internal_buffer = .{};
if (bytes.len > 0) {
// Return a Buffer so that they can do .toString() on it
return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator);
}
return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator);
},
else => {
return JSValue.jsUndefined();
},
}
}
};
pub const CapturedBufferedWriter = bun.shell.eval.NewBufferedWriter(
WriterSrc,
struct {
parent: *Out,
parent: *BufferedOutput,
pub inline fn onDone(this: @This(), e: ?bun.sys.Error) void {
this.parent.onBufferedWriterDone(e);
}
@@ -434,7 +113,7 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
);
const WriterSrc = struct {
inner: *Out,
inner: *BufferedOutput,
pub inline fn bufToWrite(this: WriterSrc, written: usize) []const u8 {
if (written >= this.inner.internal_buffer.len) return "";
@@ -448,210 +127,16 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
}
};
pub const Out = struct {
internal_buffer: bun.ByteList = .{},
owns_internal_buffer: bool = true,
subproc: *Subprocess,
out_type: OutKind,
// pub const Pipe = struct {
// writer: Writer = Writer{},
// parent: *Subprocess,
// src: WriterSrc,
writer: ?CapturedBufferedWriter = null,
// writer: ?CapturedBufferedWriter = null,
status: Status = .{
.pending = {},
},
pub const Status = union(enum) {
pending: void,
done: void,
err: bun.sys.Error,
};
};
// pub const BufferedOutput = struct {
// fifo: FIFO = undefined,
// internal_buffer: bun.ByteList = .{},
// auto_sizer: ?JSC.WebCore.AutoSizer = null,
// subproc: *Subprocess,
// out_type: OutKind,
// /// Sometimes the `internal_buffer` may be filled with memory from JSC,
// /// for example an array buffer. In that case we shouldn't dealloc
// /// memory and let the GC do it.
// from_jsc: bool = false,
// status: Status = .{
// .pending = {},
// },
// recall_readall: bool = true,
// /// Used to allow to write to fd and also capture the data
// writer: ?CapturedBufferedWriter = null,
// out: ?*bun.ByteList = null,
// pub const Status = union(enum) {
// pending: void,
// done: void,
// err: bun.sys.Error,
// };
// pub fn init(subproc: *Subprocess, out_type: OutKind, fd: bun.FileDescriptor) BufferedOutput {
// return BufferedOutput{
// .out_type = out_type,
// .subproc = subproc,
// .internal_buffer = .{},
// .fifo = FIFO{
// .fd = fd,
// },
// };
// }
// pub fn initWithArrayBuffer(subproc: *Subprocess, out: *BufferedOutput, comptime out_type: OutKind, fd: bun.FileDescriptor, array_buf: JSC.ArrayBuffer.Strong) void {
// out.* = BufferedOutput.initWithSlice(subproc, out_type, fd, array_buf.slice());
// out.from_jsc = true;
// out.fifo.view = array_buf.held;
// out.fifo.buf = out.internal_buffer.ptr[0..out.internal_buffer.cap];
// }
// pub fn initWithSlice(subproc: *Subprocess, comptime out_type: OutKind, fd: bun.FileDescriptor, slice: []u8) BufferedOutput {
// return BufferedOutput{
// // fixed capacity
// .internal_buffer = bun.ByteList.initWithBuffer(slice),
// .auto_sizer = null,
// .subproc = subproc,
// .fifo = FIFO{
// .fd = fd,
// },
// .out_type = out_type,
// };
// }
// pub fn initWithAllocator(subproc: *Subprocess, out: *BufferedOutput, comptime out_type: OutKind, allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) void {
// out.* = init(subproc, out_type, fd);
// out.auto_sizer = .{
// .max = max_size,
// .allocator = allocator,
// .buffer = &out.internal_buffer,
// };
// out.fifo.auto_sizer = &out.auto_sizer.?;
// }
// pub fn onBufferedWriterDone(this: *BufferedOutput, e: ?bun.sys.Error) void {
// _ = e; // autofix
// defer this.signalDoneToCmd();
// // if (e) |err| {
// // this.status = .{ .err = err };
// // }
// }
// pub fn isDone(this: *BufferedOutput) bool {
// if (this.status != .done and this.status != .err) return false;
// if (this.writer != null) {
// return this.writer.?.isDone();
// }
// return true;
// }
// pub fn signalDoneToCmd(this: *BufferedOutput) void {
// log("signalDoneToCmd ({x}: {s}) isDone={any}", .{ @intFromPtr(this), @tagName(this.out_type), this.isDone() });
// // `this.fifo.close()` will be called from the parent
// // this.fifo.close();
// if (!this.isDone()) return;
// if (this.subproc.cmd_parent) |cmd| {
// if (this.writer != null) {
// if (this.writer.?.err) |e| {
// if (this.status != .err) {
// this.status = .{ .err = e };
// }
// }
// }
// cmd.bufferedOutputClose(this.out_type);
// }
// }
// /// This is called after it is read (it's confusing because "on read" could
// /// be interpreted as present or past tense)
// pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void {
// log("ON READ {s} result={s}", .{ @tagName(this.out_type), @tagName(result) });
// defer {
// if (this.status == .err or this.status == .done) {
// this.signalDoneToCmd();
// } else if (this.recall_readall and this.recall_readall) {
// this.readAll();
// }
// }
// switch (result) {
// .pending => {
// this.watch();
// return;
// },
// .err => |err| {
// if (err == .Error) {
// this.status = .{ .err = err.Error };
// } else {
// this.status = .{ .err = bun.sys.Error.fromCode(.CANCELED, .read) };
// }
// // this.fifo.close();
// // this.closeFifoSignalCmd();
// return;
// },
// .done => {
// this.status = .{ .done = {} };
// // this.fifo.close();
// // this.closeFifoSignalCmd();
// return;
// },
// else => {
// const slice = switch (result) {
// .into_array => this.fifo.buf[0..result.into_array.len],
// else => result.slice(),
// };
// log("buffered output ({s}) onRead: {s}", .{ @tagName(this.out_type), slice });
// this.internal_buffer.len += @as(u32, @truncate(slice.len));
// if (slice.len > 0)
// std.debug.assert(this.internal_buffer.contains(slice));
// if (this.writer != null) {
// this.writer.?.writeIfPossible(false);
// }
// this.fifo.buf = this.internal_buffer.ptr[@min(this.internal_buffer.len, this.internal_buffer.cap)..this.internal_buffer.cap];
// if (result.isDone() or (slice.len == 0 and this.fifo.poll_ref != null and this.fifo.poll_ref.?.isHUP())) {
// this.status = .{ .done = {} };
// // this.fifo.close();
// // this.closeFifoSignalCmd();
// }
// },
// }
// }
// pub fn readAll(this: *BufferedOutput) void {
// log("ShellBufferedOutput.readAll doing nothing", .{});
// this.watch();
// }
// pub fn watch(this: *BufferedOutput) void {
// std.debug.assert(this.fifo.fd != bun.invalid_fd);
// this.fifo.pending.set(BufferedOutput, this, onRead);
// if (!this.fifo.isWatching()) this.fifo.watch(this.fifo.fd);
// return;
// }
// pub fn close(this: *BufferedOutput) void {
// log("BufferedOutput close", .{});
// switch (this.status) {
// .done => {},
// .pending => {
// this.fifo.close();
// this.status = .{ .done = {} };
// },
// .err => {},
// }
// if (this.internal_buffer.cap > 0 and !this.from_jsc) {
// this.internal_buffer.listManaged(bun.default_allocator).deinit();
// this.internal_buffer = .{};
// }
// }
// };
pub const StaticPipeWriter = JSC.Subprocess.NewStaticPipeWriter(Subprocess);
@@ -948,6 +433,9 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
spawn_args: *SpawnArgs,
out_subproc: **@This(),
) bun.shell.Result(*@This()) {
if (comptime true) {
@panic("TODO");
}
const globalThis = GlobalHandle.init(globalThis_);
const is_sync = config.is_sync;