mirror of
https://github.com/oven-sh/bun
synced 2026-02-14 12:51:54 +00:00
more wip
This commit is contained in:
@@ -175,7 +175,6 @@ pub const FilePoll = struct {
|
||||
const LifecycleScriptSubprocessOutputReader = bun.install.LifecycleScriptSubprocess.OutputReader;
|
||||
|
||||
pub const Owner = bun.TaggedPointerUnion(.{
|
||||
FileReader,
|
||||
FileSink,
|
||||
|
||||
// ShellBufferedWriter,
|
||||
@@ -187,9 +186,7 @@ pub const FilePoll = struct {
|
||||
// ShellBufferedOutput,
|
||||
// ShellBufferedOutputMini,
|
||||
|
||||
ProcessPipeReader,
|
||||
StaticPipeWriter,
|
||||
FileSink,
|
||||
|
||||
Deactivated,
|
||||
DNSResolver,
|
||||
@@ -347,10 +344,6 @@ pub const FilePoll = struct {
|
||||
// var loader = ptr.as(ShellSubprocessCapturedBufferedWriterMini);
|
||||
// loader.onPoll(size_or_offset, 0);
|
||||
// },
|
||||
@field(Owner.Tag, bun.meta.typeBase(@typeName(ProcessPipeReader))) => {
|
||||
var handler: *ProcessPipeReader = ptr.as(ProcessPipeReader);
|
||||
handler.onPoll(size_or_offset);
|
||||
},
|
||||
@field(Owner.Tag, bun.meta.typeBase(@typeName(StaticPipeWriter))) => {
|
||||
var handler: *StaticPipeWriter = ptr.as(StaticPipeWriter);
|
||||
handler.onPoll(size_or_offset);
|
||||
@@ -577,6 +570,11 @@ pub const FilePoll = struct {
|
||||
/// This decrements the active counter if it was previously incremented
|
||||
/// "active" controls whether or not the event loop should potentially idle
|
||||
pub fn disableKeepingProcessAlive(this: *FilePoll, event_loop_ctx_: anytype) void {
|
||||
if (comptime @TypeOf(event_loop_ctx_) == *JSC.EventLoop) {
|
||||
disableKeepingProcessAlive(this, JSC.EventLoopHandle.init(event_loop_ctx_));
|
||||
return;
|
||||
}
|
||||
|
||||
if (comptime @TypeOf(event_loop_ctx_) == JSC.EventLoopHandle) {
|
||||
event_loop_ctx_.loop().subActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count))));
|
||||
} else {
|
||||
@@ -594,7 +592,19 @@ pub const FilePoll = struct {
|
||||
return this.flags.contains(.keeps_event_loop_alive) and this.flags.contains(.has_incremented_poll_count);
|
||||
}
|
||||
|
||||
pub fn setKeepingProcessAlive(this: *FilePoll, event_loop_ctx_: anytype, value: bool) void {
|
||||
if (value) {
|
||||
this.enableKeepingProcessAlive(event_loop_ctx_);
|
||||
} else {
|
||||
this.disableKeepingProcessAlive(event_loop_ctx_);
|
||||
}
|
||||
}
|
||||
pub fn enableKeepingProcessAlive(this: *FilePoll, event_loop_ctx_: anytype) void {
|
||||
if (comptime @TypeOf(event_loop_ctx_) == *JSC.EventLoop) {
|
||||
enableKeepingProcessAlive(this, JSC.EventLoopHandle.init(event_loop_ctx_));
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.flags.contains(.closed))
|
||||
return;
|
||||
|
||||
|
||||
@@ -111,8 +111,8 @@ pub const ProcessExitHandler = struct {
|
||||
subprocess.onProcessExit(process, status, rusage);
|
||||
},
|
||||
@field(TaggedPointer.Tag, bun.meta.typeBaseName(@typeName(ShellSubprocess))) => {
|
||||
const subprocess = this.ptr.as(ShellSubprocess);
|
||||
subprocess.onProcessExit(process, status, rusage);
|
||||
// const subprocess = this.ptr.as(ShellSubprocess);
|
||||
// subprocess.onProcessExit(process, status, rusage);
|
||||
},
|
||||
else => {
|
||||
@panic("Internal Bun error: ProcessExitHandler has an invalid tag. Please file a bug report.");
|
||||
|
||||
@@ -127,8 +127,6 @@ pub const Subprocess = struct {
|
||||
stderr,
|
||||
};
|
||||
process: *Process = undefined,
|
||||
closed_streams: u8 = 0,
|
||||
deinit_onclose: bool = false,
|
||||
stdin: Writable,
|
||||
stdout: Readable,
|
||||
stderr: Readable,
|
||||
@@ -242,6 +240,8 @@ pub const Subprocess = struct {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void {
|
||||
@@ -251,12 +251,12 @@ pub const Subprocess = struct {
|
||||
.pipe => |pipe| {
|
||||
pipe.signal.clear();
|
||||
pipe.deref();
|
||||
this.stdin.* = .{ .ignore = {} };
|
||||
this.stdin = .{ .ignore = {} };
|
||||
},
|
||||
.buffer => {
|
||||
this.stdin.buffer.source.detach();
|
||||
this.stdin.buffer.deref();
|
||||
this.stdin.* = .{ .ignore = {} };
|
||||
this.stdin = .{ .ignore = {} };
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
@@ -327,6 +327,13 @@ pub const Subprocess = struct {
|
||||
ignore: void,
|
||||
closed: void,
|
||||
|
||||
pub fn hasPendingActivity(this: *const Readable) bool {
|
||||
return switch (this.*) {
|
||||
.pipe => this.pipe.hasPendingActivity(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn ref(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
@@ -362,6 +369,7 @@ pub const Subprocess = struct {
|
||||
.fd => Readable{ .fd = fd.? },
|
||||
.memfd => Readable{ .memfd = stdio.memfd },
|
||||
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, fd.?) },
|
||||
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -392,7 +400,7 @@ pub const Subprocess = struct {
|
||||
this.* = .{ .closed = {} };
|
||||
_ = bun.sys.close(fd);
|
||||
},
|
||||
.pipe => |*pipe| {
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
@@ -598,106 +606,116 @@ pub const Subprocess = struct {
|
||||
return array;
|
||||
}
|
||||
|
||||
pub const StaticPipeWriter = struct {
|
||||
writer: IOWriter = .{},
|
||||
fd: bun.FileDescriptor = bun.invalid_fd,
|
||||
source: Source = .{ .detached = {} },
|
||||
process: *Subprocess = undefined,
|
||||
event_loop: *JSC.EventLoop,
|
||||
ref_count: u32 = 1,
|
||||
pub const Source = union(enum) {
|
||||
blob: JSC.WebCore.AnyBlob,
|
||||
array_buffer: JSC.ArrayBuffer.Strong,
|
||||
detached: void,
|
||||
|
||||
pub usingnamespace bun.NewRefCounted(@This(), deinit);
|
||||
pub fn slice(this: *const Source) []const u8 {
|
||||
return switch (this.*) {
|
||||
.blob => this.blob.sharedView(),
|
||||
.array_buffer => this.array_buffer.slice(),
|
||||
else => @panic("Invalid source"),
|
||||
};
|
||||
}
|
||||
|
||||
pub const IOWriter = bun.io.BufferedWriter(StaticPipeWriter, onWrite, onError, onClose);
|
||||
pub const Poll = IOWriter;
|
||||
|
||||
pub fn updateRef(this: *StaticPipeWriter, add: bool) void {
|
||||
if (add) {
|
||||
this.writer.updateRef(this.event_loop, true);
|
||||
} else {
|
||||
this.writer.updateRef(this.event_loop, false);
|
||||
pub fn detach(this: *@This()) void {
|
||||
switch (this.*) {
|
||||
.blob => {
|
||||
this.blob.detach();
|
||||
},
|
||||
.array_buffer => {
|
||||
this.array_buffer.deinit();
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close(this: *StaticPipeWriter) void {
|
||||
this.writer.close();
|
||||
}
|
||||
|
||||
pub fn flush(this: *StaticPipeWriter) void {
|
||||
this.writer.flush();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: *JSC.EventLoop, subprocess: *Subprocess, fd: bun.FileDescriptor, source: Source) *StaticPipeWriter {
|
||||
return StaticPipeWriter.new(.{
|
||||
.event_loop = event_loop,
|
||||
.process = subprocess,
|
||||
.fd = fd,
|
||||
.source = source,
|
||||
});
|
||||
}
|
||||
|
||||
pub const Source = union(enum) {
|
||||
blob: JSC.WebCore.Blob,
|
||||
array_buffer: JSC.ArrayBuffer.Strong,
|
||||
detached: void,
|
||||
|
||||
pub fn slice(this: *const Source) []const u8 {
|
||||
return switch (this.*) {
|
||||
.blob => this.blob.sharedView(),
|
||||
.array_buffer => this.array_buffer.slice(),
|
||||
else => @panic("Invalid source"),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn detach(this: *@This()) void {
|
||||
switch (this.*) {
|
||||
.blob => {
|
||||
this.blob.detach();
|
||||
},
|
||||
.array_buffer => {
|
||||
this.array_buffer.deinit();
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
this.* = .detached;
|
||||
}
|
||||
};
|
||||
|
||||
pub fn onWrite(this: *StaticPipeWriter, amount: usize, is_done: bool) void {
|
||||
_ = amount; // autofix
|
||||
if (is_done) {
|
||||
this.writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onError(this: *StaticPipeWriter, err: bun.sys.Error) void {
|
||||
_ = err; // autofix
|
||||
this.source.detach();
|
||||
}
|
||||
|
||||
pub fn onClose(this: *StaticPipeWriter) void {
|
||||
this.source.detach();
|
||||
this.process.onCloseIO(.stdin);
|
||||
}
|
||||
|
||||
pub fn deinit(this: *StaticPipeWriter) void {
|
||||
this.writer.end();
|
||||
this.source.detach();
|
||||
this.destroy();
|
||||
}
|
||||
|
||||
pub fn loop(this: *StaticPipeWriter) *uws.Loop {
|
||||
return this.event_loop.virtual_machine.uwsLoop();
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *StaticPipeWriter) *JSC.EventLoop {
|
||||
return this.event_loop;
|
||||
this.* = .detached;
|
||||
}
|
||||
};
|
||||
|
||||
pub const StaticPipeWriter = NewStaticPipeWriter(Subprocess);
|
||||
|
||||
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
return struct {
|
||||
writer: IOWriter = .{},
|
||||
fd: bun.FileDescriptor = bun.invalid_fd,
|
||||
source: Source = .{ .detached = {} },
|
||||
process: *ProcessType = undefined,
|
||||
event_loop: JSC.EventLoopHandle,
|
||||
ref_count: u32 = 1,
|
||||
buffer: []const u8 = "",
|
||||
|
||||
pub usingnamespace bun.NewRefCounted(@This(), deinit);
|
||||
const This = @This();
|
||||
|
||||
pub const IOWriter = bun.io.BufferedWriter(This, onWrite, onError, onClose, getBuffer);
|
||||
pub const Poll = IOWriter;
|
||||
|
||||
pub fn updateRef(this: *This, add: bool) void {
|
||||
this.writer.updateRef(this.event_loop, add);
|
||||
}
|
||||
|
||||
pub fn getBuffer(this: *This) []const u8 {
|
||||
return this.buffer;
|
||||
}
|
||||
|
||||
pub fn close(this: *This) void {
|
||||
this.writer.close();
|
||||
}
|
||||
|
||||
pub fn flush(this: *This) void {
|
||||
this.writer.flush();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: anytype, subprocess: *ProcessType, fd: bun.FileDescriptor, source: Source) *This {
|
||||
return This.new(.{
|
||||
.event_loop = JSC.EventLoopHandle.init(event_loop),
|
||||
.process = subprocess,
|
||||
.fd = fd,
|
||||
.source = source,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn start(this: *This) JSC.Maybe(void) {
|
||||
return this.writer.start(this.fd, this.source.slice(), this.event_loop, true);
|
||||
}
|
||||
|
||||
pub fn onWrite(this: *This, amount: usize, is_done: bool) void {
|
||||
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
|
||||
if (is_done) {
|
||||
this.writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onError(this: *This, err: bun.sys.Error) void {
|
||||
_ = err; // autofix
|
||||
this.source.detach();
|
||||
}
|
||||
|
||||
pub fn onClose(this: *This) void {
|
||||
this.source.detach();
|
||||
this.process.onCloseIO(.stdin);
|
||||
}
|
||||
|
||||
pub fn deinit(this: *This) void {
|
||||
this.writer.end();
|
||||
this.source.detach();
|
||||
this.destroy();
|
||||
}
|
||||
|
||||
pub fn loop(this: *This) *uws.Loop {
|
||||
return this.event_loop.virtual_machine.uwsLoop();
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *This) JSC.EventLoopHandle {
|
||||
return this.event_loop;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub const PipeReader = struct {
|
||||
reader: IOReader = .{},
|
||||
process: *Subprocess = undefined,
|
||||
reader: IOReader = undefined,
|
||||
process: ?*Subprocess = null,
|
||||
event_loop: *JSC.EventLoop = undefined,
|
||||
ref_count: u32 = 1,
|
||||
state: union(enum) {
|
||||
@@ -707,23 +725,29 @@ pub const Subprocess = struct {
|
||||
} = .{ .pending = {} },
|
||||
fd: bun.FileDescriptor = bun.invalid_fd,
|
||||
|
||||
pub const IOReader = bun.io.BufferedReader(PipeReader);
|
||||
pub const IOReader = bun.io.BufferedReader;
|
||||
pub const Poll = IOReader;
|
||||
|
||||
// pub usingnamespace bun.NewRefCounted(@This(), deinit);
|
||||
pub usingnamespace bun.NewRefCounted(PipeReader, deinit);
|
||||
|
||||
pub fn hasPendingActivity(this: *const PipeReader) bool {
|
||||
return this.reader.hasPendingRead();
|
||||
}
|
||||
|
||||
pub fn detach(this: *PipeReader) void {
|
||||
this.process = undefined;
|
||||
this.reader.is_done = true;
|
||||
this.deinit();
|
||||
this.process = null;
|
||||
this.deref();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: *JSC.EventLoop, process: *Subprocess, fd: bun.FileDescriptor) *PipeReader {
|
||||
return PipeReader.new(.{
|
||||
var this = PipeReader.new(.{
|
||||
.process = process,
|
||||
.event_loop = event_loop,
|
||||
.fd = fd,
|
||||
.reader = IOReader.init(@This()),
|
||||
});
|
||||
this.reader.setParent(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn readAll(this: *PipeReader) void {
|
||||
@@ -743,8 +767,24 @@ pub const Subprocess = struct {
|
||||
const owned = this.toOwnedSlice();
|
||||
this.state = .{ .done = owned };
|
||||
this.reader.close();
|
||||
this.reader.deref();
|
||||
this.process.onCloseIO(this.kind());
|
||||
if (this.process) |process| {
|
||||
this.process = null;
|
||||
process.onCloseIO(this.kind(process));
|
||||
}
|
||||
|
||||
this.deref();
|
||||
}
|
||||
|
||||
pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind {
|
||||
if (process.stdout == .pipe and process.stdout.pipe == reader) {
|
||||
return .stdout;
|
||||
}
|
||||
|
||||
if (process.stderr == .pipe and process.stderr.pipe == reader) {
|
||||
return .stderr;
|
||||
}
|
||||
|
||||
@panic("We should be either stdout or stderr");
|
||||
}
|
||||
|
||||
pub fn toOwnedSlice(this: *PipeReader) []u8 {
|
||||
@@ -764,18 +804,15 @@ pub const Subprocess = struct {
|
||||
}
|
||||
|
||||
pub fn updateRef(this: *PipeReader, add: bool) void {
|
||||
if (add) {
|
||||
this.reader.updateRef(this.event_loop, true);
|
||||
} else {
|
||||
this.reader.updateRef(this.event_loop, false);
|
||||
}
|
||||
this.reader.updateRef(add);
|
||||
}
|
||||
|
||||
pub fn toReadableStream(this: *PipeReader, globalObject: *JSC.JSGlobalObject) JSC.JSValue {
|
||||
defer this.detach();
|
||||
|
||||
switch (this.state) {
|
||||
.pending => {
|
||||
const stream = JSC.WebCore.ReadableStream.fromPipe(globalObject, &this.reader);
|
||||
defer this.deref();
|
||||
this.state = .{ .done = &.{} };
|
||||
return stream;
|
||||
},
|
||||
@@ -787,7 +824,7 @@ pub const Subprocess = struct {
|
||||
.err => |err| {
|
||||
_ = err; // autofix
|
||||
const empty = JSC.WebCore.ReadableStream.empty(globalObject);
|
||||
JSC.WebCore.ReadableStream.cancel(JSC.WebCore.ReadableStream.fromJS(empty, globalObject), globalObject);
|
||||
JSC.WebCore.ReadableStream.cancel(&JSC.WebCore.ReadableStream.fromJS(empty, globalObject).?, globalObject);
|
||||
return empty;
|
||||
},
|
||||
}
|
||||
@@ -810,19 +847,8 @@ pub const Subprocess = struct {
|
||||
bun.default_allocator.free(this.state.done);
|
||||
}
|
||||
this.state = .{ .err = err };
|
||||
this.process.onCloseIO(this.kind());
|
||||
}
|
||||
|
||||
fn kind(this: *const PipeReader) StdioKind {
|
||||
if (this.process.stdout == .pipe and this.process.stdout.pipe == this) {
|
||||
return .stdout;
|
||||
}
|
||||
|
||||
if (this.process.stderr == .pipe and this.process.stderr.pipe == this) {
|
||||
return .stderr;
|
||||
}
|
||||
|
||||
@panic("We should be either stdout or stderr");
|
||||
if (this.process) |process|
|
||||
process.onCloseIO(this.kind());
|
||||
}
|
||||
|
||||
pub fn close(this: *PipeReader) void {
|
||||
@@ -869,7 +895,7 @@ pub const Subprocess = struct {
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
|
||||
pub fn hasPendingActivity(this: *Writable) bool {
|
||||
pub fn hasPendingActivity(this: *const Writable) bool {
|
||||
return switch (this.*) {
|
||||
// we mark them as .ignore when they are closed, so this must be true
|
||||
.pipe => true,
|
||||
@@ -937,12 +963,12 @@ pub const Subprocess = struct {
|
||||
|
||||
.blob => |blob| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, fd, .{ .blob = blob }),
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, fd.?, .{ .blob = blob }),
|
||||
};
|
||||
},
|
||||
.array_buffer => |array_buffer| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, .{ .array_buffer = array_buffer }),
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, fd.?, .{ .array_buffer = array_buffer }),
|
||||
};
|
||||
},
|
||||
.memfd => |memfd| {
|
||||
@@ -997,7 +1023,7 @@ pub const Subprocess = struct {
|
||||
pub fn close(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => |pipe| {
|
||||
pipe.end(null);
|
||||
_ = pipe.end(null);
|
||||
},
|
||||
inline .memfd, .fd => |fd| {
|
||||
_ = bun.sys.close(fd);
|
||||
@@ -1203,13 +1229,13 @@ pub const Subprocess = struct {
|
||||
|
||||
var stdio = [3]Stdio{
|
||||
.{ .ignore = {} },
|
||||
.{ .pipe = null },
|
||||
.{ .pipe = {} },
|
||||
.{ .inherit = {} },
|
||||
};
|
||||
|
||||
if (comptime is_sync) {
|
||||
stdio[1] = .{ .pipe = null };
|
||||
stdio[2] = .{ .pipe = null };
|
||||
stdio[1] = .{ .pipe = {} };
|
||||
stdio[2] = .{ .pipe = {} };
|
||||
}
|
||||
var lazy = false;
|
||||
var on_exit_callback = JSValue.zero;
|
||||
@@ -1612,18 +1638,18 @@ pub const Subprocess = struct {
|
||||
return .zero;
|
||||
},
|
||||
.stdout = Readable.init(
|
||||
stdio[1],
|
||||
jsc_vm.eventLoop(),
|
||||
subprocess,
|
||||
stdio[1],
|
||||
spawned.stdout,
|
||||
jsc_vm.allocator,
|
||||
default_max_buffer_size,
|
||||
is_sync,
|
||||
),
|
||||
.stderr = Readable.init(
|
||||
stdio[2],
|
||||
jsc_vm.eventLoop(),
|
||||
subprocess,
|
||||
stdio[2],
|
||||
spawned.stderr,
|
||||
jsc_vm.allocator,
|
||||
default_max_buffer_size,
|
||||
@@ -1678,7 +1704,7 @@ pub const Subprocess = struct {
|
||||
}
|
||||
|
||||
if (subprocess.stdin == .buffer) {
|
||||
subprocess.stdin.buffer.start(spawned.stdin.?, true);
|
||||
subprocess.stdin.buffer.start().assert();
|
||||
}
|
||||
|
||||
if (subprocess.stdout == .pipe) {
|
||||
@@ -1687,7 +1713,7 @@ pub const Subprocess = struct {
|
||||
}
|
||||
}
|
||||
|
||||
if (subprocess.stderr == .pie) {
|
||||
if (subprocess.stderr == .pipe) {
|
||||
if (is_sync or !lazy) {
|
||||
subprocess.stderr.pipe.readAll();
|
||||
}
|
||||
@@ -1745,315 +1771,6 @@ pub const Subprocess = struct {
|
||||
|
||||
const os = std.os;
|
||||
|
||||
const Stdio = union(enum) {
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
fd: bun.FileDescriptor,
|
||||
path: JSC.Node.PathLike,
|
||||
blob: JSC.WebCore.AnyBlob,
|
||||
array_buffer: JSC.ArrayBuffer.Strong,
|
||||
memfd: bun.FileDescriptor,
|
||||
pipe: void,
|
||||
|
||||
const PipeExtra = struct {
|
||||
fd: i32,
|
||||
fileno: i32,
|
||||
};
|
||||
|
||||
pub fn canUseMemfd(this: *const @This(), is_sync: bool) bool {
|
||||
if (comptime !Environment.isLinux) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return switch (this.*) {
|
||||
.blob => !this.blob.needsToReadFile(),
|
||||
.memfd, .array_buffer => true,
|
||||
.pipe => is_sync,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn byteSlice(this: *const @This()) []const u8 {
|
||||
return switch (this.*) {
|
||||
.blob => this.blob.slice(),
|
||||
.array_buffer => |array_buffer| array_buffer.slice(),
|
||||
else => "",
|
||||
};
|
||||
}
|
||||
|
||||
pub fn useMemfd(this: *@This(), index: u32) void {
|
||||
const label = switch (index) {
|
||||
0 => "spawn_stdio_stdin",
|
||||
1 => "spawn_stdio_stdout",
|
||||
2 => "spawn_stdio_stderr",
|
||||
else => "spawn_stdio_memory_file",
|
||||
};
|
||||
|
||||
// We use the linux syscall api because the glibc requirement is 2.27, which is a little close for comfort.
|
||||
const rc = std.os.linux.memfd_create(label, 0);
|
||||
|
||||
log("memfd_create({s}) = {d}", .{ label, rc });
|
||||
|
||||
switch (std.os.linux.getErrno(rc)) {
|
||||
.SUCCESS => {},
|
||||
else => |errno| {
|
||||
log("Failed to create memfd: {s}", .{@tagName(errno)});
|
||||
return;
|
||||
},
|
||||
}
|
||||
|
||||
const fd = bun.toFD(rc);
|
||||
|
||||
var remain = this.byteSlice();
|
||||
|
||||
if (remain.len > 0)
|
||||
// Hint at the size of the file
|
||||
_ = bun.sys.ftruncate(fd, @intCast(remain.len));
|
||||
|
||||
// Dump all the bytes in there
|
||||
var written: isize = 0;
|
||||
while (remain.len > 0) {
|
||||
switch (bun.sys.pwrite(fd, remain, written)) {
|
||||
.err => |err| {
|
||||
if (err.getErrno() == .AGAIN) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Output.debugWarn("Failed to write to memfd: {s}", .{@tagName(err.getErrno())});
|
||||
_ = bun.sys.close(fd);
|
||||
return;
|
||||
},
|
||||
.result => |result| {
|
||||
if (result == 0) {
|
||||
Output.debugWarn("Failed to write to memfd: EOF", .{});
|
||||
_ = bun.sys.close(fd);
|
||||
return;
|
||||
}
|
||||
written += @intCast(result);
|
||||
remain = remain[result..];
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
switch (this.*) {
|
||||
.array_buffer => this.array_buffer.deinit(),
|
||||
.blob => this.blob.detach(),
|
||||
else => {},
|
||||
}
|
||||
|
||||
this.* = .{ .memfd = fd };
|
||||
}
|
||||
|
||||
fn toPosix(
|
||||
stdio: *@This(),
|
||||
) bun.spawn.SpawnOptions.Stdio {
|
||||
return switch (stdio) {
|
||||
.pipe, .array_buffer, .blob => .{ .buffer = {} },
|
||||
.fd => |fd| .{ .pipe = fd },
|
||||
.memfd => |fd| .{ .pipe = fd },
|
||||
.path => |pathlike| .{ .path = pathlike.slice() },
|
||||
.inherit => .{ .inherit = {} },
|
||||
.ignore => .{ .ignore = {} },
|
||||
};
|
||||
}
|
||||
|
||||
fn toWindows(
|
||||
stdio: *@This(),
|
||||
) bun.spawn.SpawnOptions.Stdio {
|
||||
return switch (stdio) {
|
||||
.pipe, .array_buffer, .blob, .pipe => .{ .buffer = {} },
|
||||
.fd => |fd| .{ .pipe = fd },
|
||||
.path => |pathlike| .{ .path = pathlike.slice() },
|
||||
.inherit => .{ .inherit = {} },
|
||||
.ignore => .{ .ignore = {} },
|
||||
|
||||
.memfd => @panic("This should never happen"),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn asSpawnOption(
|
||||
stdio: *@This(),
|
||||
) bun.spawn.SpawnOptions.Stdio {
|
||||
if (comptime Environment.isWindows) {
|
||||
return stdio.toWindows();
|
||||
} else {
|
||||
return stdio.toPosix();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
fn extractStdioBlob(
|
||||
globalThis: *JSC.JSGlobalObject,
|
||||
blob: JSC.WebCore.AnyBlob,
|
||||
i: u32,
|
||||
out_stdio: *Stdio,
|
||||
) bool {
|
||||
const fd = bun.stdio(i);
|
||||
|
||||
if (blob.needsToReadFile()) {
|
||||
if (blob.store()) |store| {
|
||||
if (store.data.file.pathlike == .fd) {
|
||||
if (store.data.file.pathlike.fd == fd) {
|
||||
out_stdio.* = Stdio{ .inherit = {} };
|
||||
} else {
|
||||
switch (bun.FDTag.get(i)) {
|
||||
.stdin => {
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
|
||||
return false;
|
||||
}
|
||||
},
|
||||
.stdout, .stderr => {
|
||||
if (i == 0) {
|
||||
globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
|
||||
return false;
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
|
||||
out_stdio.* = Stdio{ .fd = store.data.file.pathlike.fd };
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
out_stdio.* = .{ .path = store.data.file.pathlike.path };
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
out_stdio.* = .{ .blob = blob };
|
||||
return true;
|
||||
}
|
||||
|
||||
fn extractStdio(
|
||||
globalThis: *JSC.JSGlobalObject,
|
||||
i: u32,
|
||||
value: JSValue,
|
||||
out_stdio: *Stdio,
|
||||
) bool {
|
||||
if (value.isEmptyOrUndefinedOrNull()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (value.isString()) {
|
||||
const str = value.getZigString(globalThis);
|
||||
if (str.eqlComptime("inherit")) {
|
||||
out_stdio.* = Stdio{ .inherit = {} };
|
||||
} else if (str.eqlComptime("ignore")) {
|
||||
out_stdio.* = Stdio{ .ignore = {} };
|
||||
} else if (str.eqlComptime("pipe") or str.eqlComptime("overlapped")) {
|
||||
out_stdio.* = Stdio{ .pipe = null };
|
||||
} else if (str.eqlComptime("ipc")) {
|
||||
out_stdio.* = Stdio{ .pipe = null }; // TODO:
|
||||
} else {
|
||||
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'pipe', 'ignore', Bun.file(pathOrFd), number, or null", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
} else if (value.isNumber()) {
|
||||
const fd = value.asFileDescriptor();
|
||||
if (fd.int() < 0) {
|
||||
globalThis.throwInvalidArguments("file descriptor must be a positive integer", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fd.int() >= std.math.maxInt(i32)) {
|
||||
var formatter = JSC.ConsoleObject.Formatter{ .globalThis = globalThis };
|
||||
globalThis.throwInvalidArguments("file descriptor must be a valid integer, received: {}", .{
|
||||
value.toFmt(globalThis, &formatter),
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (bun.FDTag.get(fd)) {
|
||||
.stdin => {
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
out_stdio.* = Stdio{ .inherit = {} };
|
||||
return true;
|
||||
},
|
||||
|
||||
.stdout, .stderr => |tag| {
|
||||
if (i == 0) {
|
||||
globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
if (i == 1 and tag == .stdout) {
|
||||
out_stdio.* = .{ .inherit = {} };
|
||||
return true;
|
||||
} else if (i == 2 and tag == .stderr) {
|
||||
out_stdio.* = .{ .inherit = {} };
|
||||
return true;
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
|
||||
out_stdio.* = Stdio{ .fd = fd };
|
||||
|
||||
return true;
|
||||
} else if (value.as(JSC.WebCore.Blob)) |blob| {
|
||||
return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, out_stdio);
|
||||
} else if (value.as(JSC.WebCore.Request)) |req| {
|
||||
req.getBodyValue().toBlobIfPossible();
|
||||
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio);
|
||||
} else if (value.as(JSC.WebCore.Response)) |req| {
|
||||
req.getBodyValue().toBlobIfPossible();
|
||||
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio);
|
||||
} else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| {
|
||||
var req = req_const;
|
||||
if (i == 0) {
|
||||
if (req.toAnyBlob(globalThis)) |blob| {
|
||||
return extractStdioBlob(globalThis, blob, i, out_stdio);
|
||||
}
|
||||
|
||||
switch (req.ptr) {
|
||||
.File, .Blob => {
|
||||
globalThis.throwTODO("Support fd/blob backed ReadableStream in spawn stdin. See https://github.com/oven-sh/bun/issues/8049");
|
||||
return false;
|
||||
},
|
||||
.Direct, .JavaScript, .Bytes => {
|
||||
if (req.isLocked(globalThis)) {
|
||||
globalThis.throwInvalidArguments("ReadableStream cannot be locked", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
out_stdio.* = .{ .pipe = req };
|
||||
return true;
|
||||
},
|
||||
.Invalid => {
|
||||
globalThis.throwInvalidArguments("ReadableStream is in invalid state.", .{});
|
||||
return false;
|
||||
},
|
||||
}
|
||||
}
|
||||
} else if (value.asArrayBuffer(globalThis)) |array_buffer| {
|
||||
if (array_buffer.slice().len == 0) {
|
||||
globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
out_stdio.* = .{
|
||||
.array_buffer = JSC.ArrayBuffer.Strong{
|
||||
.array_buffer = array_buffer,
|
||||
.held = JSC.Strong.create(array_buffer.value, globalThis),
|
||||
},
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn handleIPCMessage(
|
||||
this: *Subprocess,
|
||||
message: IPC.DecodedIPCMessage,
|
||||
|
||||
@@ -662,7 +662,7 @@ pub const DeferredTaskQueue = struct {
|
||||
return existing.found_existing;
|
||||
}
|
||||
|
||||
pub fn unregisterTask(this: *EventLoop, ctx: ?*anyopaque) bool {
|
||||
pub fn unregisterTask(this: *DeferredTaskQueue, ctx: ?*anyopaque) bool {
|
||||
return this.map.swapRemove(ctx);
|
||||
}
|
||||
|
||||
@@ -757,34 +757,41 @@ pub const EventLoop = struct {
|
||||
defer counter += 1;
|
||||
switch (task.tag()) {
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellLsTask))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_ls_task: *ShellLsTask = task.get(ShellLsTask).?;
|
||||
shell_ls_task.runFromMainThread();
|
||||
// shell_ls_task.deinit();
|
||||
},
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellMvBatchedTask))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_mv_batched_task: *ShellMvBatchedTask = task.get(ShellMvBatchedTask).?;
|
||||
shell_mv_batched_task.task.runFromMainThread();
|
||||
},
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellMvCheckTargetTask))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_mv_check_target_task: *ShellMvCheckTargetTask = task.get(ShellMvCheckTargetTask).?;
|
||||
shell_mv_check_target_task.task.runFromMainThread();
|
||||
},
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellRmTask))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_rm_task: *ShellRmTask = task.get(ShellRmTask).?;
|
||||
shell_rm_task.runFromMainThread();
|
||||
// shell_rm_task.deinit();
|
||||
},
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellRmDirTask))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_rm_task: *ShellRmDirTask = task.get(ShellRmDirTask).?;
|
||||
shell_rm_task.runFromMainThread();
|
||||
// shell_rm_task.deinit();
|
||||
},
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellRmDirTaskMini))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_rm_task: *ShellRmDirTaskMini = task.get(ShellRmDirTaskMini).?;
|
||||
shell_rm_task.runFromMainThread();
|
||||
// shell_rm_task.deinit();
|
||||
},
|
||||
@field(Task.Tag, typeBaseName(@typeName(ShellGlobTask))) => {
|
||||
if (comptime true) @panic("TODO");
|
||||
var shell_glob_task: *ShellGlobTask = task.get(ShellGlobTask).?;
|
||||
shell_glob_task.runFromMainThread();
|
||||
shell_glob_task.deinit();
|
||||
|
||||
@@ -545,12 +545,15 @@ pub const Body = struct {
|
||||
|
||||
switch (readable.ptr) {
|
||||
.Blob => |blob| {
|
||||
var store = blob.store orelse {
|
||||
return Body.Value{ .Blob = Blob.initEmpty(globalThis) };
|
||||
};
|
||||
store.ref();
|
||||
readable.forceDetach(globalThis);
|
||||
|
||||
const result: Value = .{
|
||||
.Blob = Blob.initWithStore(blob.store, globalThis),
|
||||
.Blob = Blob.initWithStore(store, globalThis),
|
||||
};
|
||||
blob.store.ref();
|
||||
|
||||
if (!blob.done) {
|
||||
blob.done = true;
|
||||
|
||||
@@ -119,7 +119,7 @@ pub const ReadableStream = struct {
|
||||
|
||||
switch (stream.ptr) {
|
||||
.Blob => |blobby| {
|
||||
var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis);
|
||||
var blob = JSC.WebCore.Blob.initWithStore(blobby.store orelse return null, globalThis);
|
||||
blob.offset = blobby.offset;
|
||||
blob.size = blobby.remain;
|
||||
blob.store.?.ref();
|
||||
@@ -339,6 +339,7 @@ pub const ReadableStream = struct {
|
||||
var reader = FileReader.Source.new(.{
|
||||
.globalThis = globalThis,
|
||||
.context = .{
|
||||
.event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()),
|
||||
.lazy = .{
|
||||
.blob = store,
|
||||
},
|
||||
@@ -357,7 +358,9 @@ pub const ReadableStream = struct {
|
||||
JSC.markBinding(@src());
|
||||
var source = FileReader.Source.new(.{
|
||||
.globalThis = globalThis,
|
||||
.context = .{},
|
||||
.context = .{
|
||||
.event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()),
|
||||
},
|
||||
});
|
||||
source.context.reader.from(buffered_reader, &source.context);
|
||||
|
||||
@@ -2861,7 +2864,7 @@ pub const FileSink = struct {
|
||||
.fd = fd,
|
||||
.event_loop_handle = JSC.EventLoopHandle.init(event_loop_handle),
|
||||
});
|
||||
this.writer.setParent(this);
|
||||
this.writer.parent = this;
|
||||
|
||||
return this;
|
||||
}
|
||||
@@ -2992,16 +2995,17 @@ pub const FileSink = struct {
|
||||
};
|
||||
|
||||
pub const FileReader = struct {
|
||||
reader: IOReader = .{},
|
||||
reader: IOReader = IOReader.init(FileReader),
|
||||
done: bool = false,
|
||||
pending: StreamResult.Pending = .{},
|
||||
pending_value: JSC.Strong = .{},
|
||||
pending_view: []u8 = &.{},
|
||||
fd: bun.FileDescriptor = bun.invalid_fd,
|
||||
|
||||
started: bool = false,
|
||||
event_loop: JSC.EventLoopHandle,
|
||||
lazy: Lazy = .{ .none = {} },
|
||||
|
||||
pub const IOReader = bun.io.BufferedReader(@This());
|
||||
pub const IOReader = bun.io.BufferedReader;
|
||||
pub const Poll = IOReader;
|
||||
pub const tag = ReadableStream.Tag.File;
|
||||
|
||||
@@ -3010,13 +3014,12 @@ pub const FileReader = struct {
|
||||
blob: *Blob.Store,
|
||||
};
|
||||
|
||||
pub fn eventLoop(this: *FileReader) JSC.EventLoopHandle {
|
||||
return this.parent().globalThis.bunVM().eventLoop();
|
||||
pub fn eventLoop(this: *const FileReader) JSC.EventLoopHandle {
|
||||
return this.event_loop;
|
||||
}
|
||||
|
||||
pub fn loop(this: *FileReader) *uws.Loop {
|
||||
_ = this; // autofix
|
||||
return uws.Loop.get();
|
||||
pub fn loop(this: *const FileReader) *Async.Loop {
|
||||
return this.eventLoop().loop();
|
||||
}
|
||||
|
||||
pub fn setup(
|
||||
@@ -3028,6 +3031,8 @@ pub const FileReader = struct {
|
||||
.done = false,
|
||||
.fd = fd,
|
||||
};
|
||||
|
||||
this.event_loop = this.parent().globalThis.bunVM().eventLoop();
|
||||
}
|
||||
|
||||
pub fn onStart(this: *FileReader) StreamStart {
|
||||
@@ -3038,6 +3043,9 @@ pub const FileReader = struct {
|
||||
},
|
||||
}
|
||||
|
||||
this.started = true;
|
||||
this.event_loop = JSC.EventLoopHandle.init(this.parent().globalThis.bunVM().eventLoop());
|
||||
|
||||
return .{ .ready = {} };
|
||||
}
|
||||
|
||||
@@ -3125,7 +3133,7 @@ pub const FileReader = struct {
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
this.pending_value.set(this.parent().globalThis(), array);
|
||||
this.pending_value.set(this.parent().globalThis, array);
|
||||
this.pending_view = buffer;
|
||||
|
||||
return .{ .pending = &this.pending };
|
||||
@@ -3175,10 +3183,11 @@ pub const FileReader = struct {
|
||||
|
||||
pub const ByteBlobLoader = struct {
|
||||
offset: Blob.SizeType = 0,
|
||||
store: *Blob.Store,
|
||||
store: ?*Blob.Store = null,
|
||||
chunk_size: Blob.SizeType = 1024 * 1024 * 2,
|
||||
remain: Blob.SizeType = 1024 * 1024 * 2,
|
||||
done: bool = false,
|
||||
pulled: bool = false,
|
||||
|
||||
pub const tag = ReadableStream.Tag.Blob;
|
||||
|
||||
@@ -3197,7 +3206,10 @@ pub const ByteBlobLoader = struct {
|
||||
this.* = ByteBlobLoader{
|
||||
.offset = blobe.offset,
|
||||
.store = blobe.store.?,
|
||||
.chunk_size = if (user_chunk_size > 0) @min(user_chunk_size, blobe.size) else @min(1024 * 1024 * 2, blobe.size),
|
||||
.chunk_size = @min(
|
||||
if (user_chunk_size > 0) @min(user_chunk_size, blobe.size) else blobe.size,
|
||||
1024 * 1024 * 2,
|
||||
),
|
||||
.remain = blobe.size,
|
||||
.done = false,
|
||||
};
|
||||
@@ -3210,16 +3222,18 @@ pub const ByteBlobLoader = struct {
|
||||
pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
|
||||
array.ensureStillAlive();
|
||||
defer array.ensureStillAlive();
|
||||
this.pulled = true;
|
||||
const store = this.store orelse return .{ .done = {} };
|
||||
if (this.done) {
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
var temporary = this.store.sharedView();
|
||||
temporary = temporary[this.offset..];
|
||||
var temporary = store.sharedView();
|
||||
temporary = temporary[@min(this.offset, temporary.len)..];
|
||||
|
||||
temporary = temporary[0..@min(buffer.len, @min(temporary.len, this.remain))];
|
||||
if (temporary.len == 0) {
|
||||
this.store.deref();
|
||||
this.clearStore();
|
||||
this.done = true;
|
||||
return .{ .done = {} };
|
||||
}
|
||||
@@ -3237,19 +3251,26 @@ pub const ByteBlobLoader = struct {
|
||||
return .{ .into_array = .{ .value = array, .len = copied } };
|
||||
}
|
||||
|
||||
pub fn onCancel(_: *ByteBlobLoader) void {}
|
||||
pub fn onCancel(this: *ByteBlobLoader) void {
|
||||
this.clearStore();
|
||||
}
|
||||
|
||||
pub fn deinit(this: *ByteBlobLoader) void {
|
||||
if (!this.done) {
|
||||
this.done = true;
|
||||
this.store.deref();
|
||||
}
|
||||
this.clearStore();
|
||||
|
||||
this.parent().destroy();
|
||||
}
|
||||
|
||||
fn clearStore(this: *ByteBlobLoader) void {
|
||||
if (this.store) |store| {
|
||||
this.store = null;
|
||||
store.deref();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn drain(this: *ByteBlobLoader) bun.ByteList {
|
||||
var temporary = this.store.sharedView();
|
||||
const store = this.store orelse return .{};
|
||||
var temporary = store.sharedView();
|
||||
temporary = temporary[this.offset..];
|
||||
temporary = temporary[0..@min(16384, @min(temporary.len, this.remain))];
|
||||
|
||||
@@ -3358,7 +3379,7 @@ pub const ByteStream = struct {
|
||||
pub fn unpipe(this: *@This()) void {
|
||||
this.pipe.ctx = null;
|
||||
this.pipe.onPipe = null;
|
||||
this.parent().decrementCount();
|
||||
_ = this.parent().decrementCount();
|
||||
}
|
||||
|
||||
pub fn onData(
|
||||
@@ -3379,8 +3400,8 @@ pub const ByteStream = struct {
|
||||
std.debug.assert(!this.has_received_last_chunk);
|
||||
this.has_received_last_chunk = stream.isDone();
|
||||
|
||||
if (this.pipe.ctx != null) {
|
||||
this.pipe.onPipe.?(this.pipe.ctx.?, stream, allocator);
|
||||
if (this.pipe.ctx) |ctx| {
|
||||
this.pipe.onPipe.?(ctx, stream, allocator);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -20,8 +20,8 @@ pub const LifecycleScriptSubprocess = struct {
|
||||
|
||||
finished_fds: u8 = 0,
|
||||
process: ?*Process = null,
|
||||
stdout: OutputReader = .{},
|
||||
stderr: OutputReader = .{},
|
||||
stdout: OutputReader = OutputReader.init(@This()),
|
||||
stderr: OutputReader = OutputReader.init(@This()),
|
||||
manager: *PackageManager,
|
||||
envp: [:null]?[*:0]u8,
|
||||
|
||||
@@ -35,7 +35,7 @@ pub const LifecycleScriptSubprocess = struct {
|
||||
|
||||
const uv = bun.windows.libuv;
|
||||
|
||||
pub const OutputReader = bun.io.BufferedReader(LifecycleScriptSubprocess);
|
||||
pub const OutputReader = bun.io.BufferedReader;
|
||||
|
||||
pub fn loop(this: *const LifecycleScriptSubprocess) *bun.uws.Loop {
|
||||
return this.manager.event_loop.loop();
|
||||
@@ -92,6 +92,8 @@ pub const LifecycleScriptSubprocess = struct {
|
||||
const original_script = this.scripts[next_script_index].?;
|
||||
const cwd = bun.path.z(original_script.cwd, &cwd_z_buf);
|
||||
const env = manager.env;
|
||||
this.stdout.setParent(this);
|
||||
this.stderr.setParent(this);
|
||||
|
||||
if (manager.scripts_node) |scripts_node| {
|
||||
manager.setNodeName(
|
||||
@@ -162,20 +164,11 @@ pub const LifecycleScriptSubprocess = struct {
|
||||
|
||||
if (comptime Environment.isPosix) {
|
||||
if (spawned.stdout) |stdout| {
|
||||
this.stdout = .{
|
||||
.parent = this,
|
||||
.poll = Async.FilePoll.init(manager, stdout, .{}, OutputReader, &this.stdout),
|
||||
};
|
||||
try this.stdout.start().unwrap();
|
||||
try this.stdout.start(stdout, true).unwrap();
|
||||
}
|
||||
|
||||
if (spawned.stderr) |stderr| {
|
||||
this.stderr = .{
|
||||
.parent = this,
|
||||
.poll = Async.FilePoll.init(manager, stderr, .{}, OutputReader, &this.stderr),
|
||||
};
|
||||
|
||||
try this.stderr.start().unwrap();
|
||||
try this.stdout.start(stderr, true).unwrap();
|
||||
}
|
||||
} else if (comptime Environment.isWindows) {
|
||||
if (spawned.stdout == .buffer) {
|
||||
@@ -205,21 +198,21 @@ pub const LifecycleScriptSubprocess = struct {
|
||||
|
||||
pub fn printOutput(this: *LifecycleScriptSubprocess) void {
|
||||
if (!this.manager.options.log_level.isVerbose()) {
|
||||
if (this.stdout.buffer.items.len +| this.stderr.buffer.items.len == 0) {
|
||||
if (this.stdout.buffer().items.len +| this.stderr.buffer().items.len == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
Output.disableBuffering();
|
||||
Output.flush();
|
||||
|
||||
if (this.stdout.buffer.items.len > 0) {
|
||||
Output.errorWriter().print("{s}\n", .{this.stdout.buffer.items}) catch {};
|
||||
this.stdout.buffer.clearAndFree();
|
||||
if (this.stdout.buffer().items.len > 0) {
|
||||
Output.errorWriter().print("{s}\n", .{this.stdout.buffer().items}) catch {};
|
||||
this.stdout.buffer().clearAndFree();
|
||||
}
|
||||
|
||||
if (this.stderr.buffer.items.len > 0) {
|
||||
Output.errorWriter().print("{s}\n", .{this.stderr.buffer.items}) catch {};
|
||||
this.stderr.buffer.clearAndFree();
|
||||
if (this.stderr.buffer().items.len > 0) {
|
||||
Output.errorWriter().print("{s}\n", .{this.stderr.buffer().items}) catch {};
|
||||
this.stderr.buffer().clearAndFree();
|
||||
}
|
||||
|
||||
Output.enableBuffering();
|
||||
@@ -350,8 +343,8 @@ pub const LifecycleScriptSubprocess = struct {
|
||||
this.resetPolls();
|
||||
|
||||
if (!this.manager.options.log_level.isVerbose()) {
|
||||
this.stdout.buffer.clearAndFree();
|
||||
this.stderr.buffer.clearAndFree();
|
||||
this.stdout.deinit();
|
||||
this.stderr.deinit();
|
||||
}
|
||||
|
||||
this.destroy();
|
||||
|
||||
@@ -1,24 +1,22 @@
|
||||
const bun = @import("root").bun;
|
||||
const std = @import("std");
|
||||
|
||||
const PipeReaderVTable = struct {
|
||||
getFd: *const fn (*anyopaque) bun.FileDescriptor,
|
||||
getBuffer: *const fn (*anyopaque) *std.ArrayList(u8),
|
||||
onReadChunk: ?*const fn (*anyopaque, chunk: []u8) void = null,
|
||||
registerPoll: ?*const fn (*anyopaque) void = null,
|
||||
done: *const fn (*anyopaque) void,
|
||||
onError: *const fn (*anyopaque, bun.sys.Error) void,
|
||||
};
|
||||
|
||||
/// Read a blocking pipe without blocking the current thread.
|
||||
pub fn PosixPipeReader(
|
||||
comptime This: type,
|
||||
comptime vtable: PipeReaderVTable,
|
||||
comptime vtable: struct {
|
||||
getFd: *const fn (*This) bun.FileDescriptor,
|
||||
getBuffer: *const fn (*This) *std.ArrayList(u8),
|
||||
onReadChunk: ?*const fn (*This, chunk: []u8) void = null,
|
||||
registerPoll: ?*const fn (*This) void = null,
|
||||
done: *const fn (*This) void,
|
||||
onError: *const fn (*This, bun.sys.Error) void,
|
||||
},
|
||||
) type {
|
||||
return struct {
|
||||
pub fn read(this: *This) void {
|
||||
const buffer = @call(.always_inline, vtable.getBuffer, .{this});
|
||||
const fd = @call(.always_inline, vtable.getFd, .{this});
|
||||
const buffer = vtable.getBuffer(this);
|
||||
const fd = vtable.getFd(this);
|
||||
if (comptime bun.Environment.isLinux) {
|
||||
if (bun.C.linux.RWFFlagSupport.isMaybeSupported()) {
|
||||
readFromBlockingPipeWithoutBlockingLinux(this, buffer, fd, 0);
|
||||
@@ -40,7 +38,7 @@ pub fn PosixPipeReader(
|
||||
|
||||
pub fn onPoll(parent: *This, size_hint: isize) void {
|
||||
const resizable_buffer = vtable.getBuffer(parent);
|
||||
const fd = @call(.always_inline, vtable.getFd, .{parent});
|
||||
const fd = vtable.getFd(parent);
|
||||
|
||||
readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint);
|
||||
}
|
||||
@@ -48,7 +46,7 @@ pub fn PosixPipeReader(
|
||||
const stack_buffer_len = 64 * 1024;
|
||||
|
||||
inline fn drainChunk(parent: *This, resizable_buffer: *std.ArrayList(u8), start_length: usize) void {
|
||||
if (comptime vtable.onReadChunk) |onRead| {
|
||||
if (vtable.onReadChunk) |onRead| {
|
||||
if (resizable_buffer.items[start_length..].len > 0) {
|
||||
const chunk = resizable_buffer.items[start_length..];
|
||||
onRead(parent, chunk);
|
||||
@@ -63,6 +61,7 @@ pub fn PosixPipeReader(
|
||||
}
|
||||
|
||||
const start_length: usize = resizable_buffer.items.len;
|
||||
const streaming = parent.vtable.isStreamingEnabled();
|
||||
|
||||
while (true) {
|
||||
var buffer: []u8 = resizable_buffer.unusedCapacitySlice();
|
||||
@@ -88,8 +87,8 @@ pub fn PosixPipeReader(
|
||||
buffer = resizable_buffer.items;
|
||||
}
|
||||
|
||||
if (comptime vtable.onReadChunk) |onRead| {
|
||||
onRead(parent, buffer);
|
||||
if (streaming) {
|
||||
parent.vtable.onReadChunk(buffer);
|
||||
} else if (buffer.ptr != &stack_buffer) {
|
||||
resizable_buffer.items.len += bytes_read;
|
||||
}
|
||||
@@ -127,6 +126,7 @@ pub fn PosixPipeReader(
|
||||
}
|
||||
|
||||
const start_length: usize = resizable_buffer.items.len;
|
||||
const streaming = parent.vtable.isStreamingEnabled();
|
||||
|
||||
while (true) {
|
||||
var buffer: []u8 = resizable_buffer.unusedCapacitySlice();
|
||||
@@ -153,8 +153,8 @@ pub fn PosixPipeReader(
|
||||
buffer = resizable_buffer.items;
|
||||
}
|
||||
|
||||
if (comptime vtable.onReadChunk) |onRead| {
|
||||
onRead(parent, buffer);
|
||||
if (streaming) {
|
||||
parent.vtable.onReadChunk(buffer);
|
||||
} else if (buffer.ptr != &stack_buffer) {
|
||||
resizable_buffer.items.len += bytes_read;
|
||||
}
|
||||
@@ -290,46 +290,59 @@ const Async = bun.Async;
|
||||
|
||||
// This is a runtime type instead of comptime due to bugs in Zig.
|
||||
// https://github.com/ziglang/zig/issues/18664
|
||||
//
|
||||
const BufferedReaderVTable = struct {
|
||||
parent: *anyopaque = undefined,
|
||||
fns: *const Fn = undefined,
|
||||
|
||||
pub fn init(comptime Type: type) BufferedReaderVTable {
|
||||
return .{
|
||||
.fns = Fn.init(Type),
|
||||
};
|
||||
}
|
||||
|
||||
pub const Fn = struct {
|
||||
onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void = null,
|
||||
onReaderDone: *const fn (*anyopaque) void,
|
||||
onReaderError: *const fn (*anyopaque, bun.sys.Error) void,
|
||||
loop: *const fn (*anyopaque) JSC.EventLoopHandle,
|
||||
loop: *const fn (*anyopaque) *Async.Loop,
|
||||
eventLoop: *const fn (*anyopaque) JSC.EventLoopHandle,
|
||||
|
||||
pub fn init(comptime Type: type) *const BufferedReaderVTable.Fn {
|
||||
const loop_fn = &struct {
|
||||
pub fn loop_fn(this: *anyopaque) *Async.Loop {
|
||||
return Type.loop(@alignCast(@ptrCast(this)));
|
||||
}
|
||||
}.loop_fn;
|
||||
|
||||
const eventLoop_fn = &struct {
|
||||
pub fn eventLoop_fn(this: *anyopaque) JSC.EventLoopHandle {
|
||||
return JSC.EventLoopHandle.init(Type.eventLoop(@alignCast(@ptrCast(this))));
|
||||
}
|
||||
}.eventLoop_fn;
|
||||
return comptime &BufferedReaderVTable.Fn{
|
||||
.onReadChunk = if (@hasDecl(Type, "onReadChunk")) @ptrCast(&Type.onReadChunk) else null,
|
||||
.onReaderDone = @ptrCast(&Type.onReaderDone),
|
||||
.onReaderError = @ptrCast(&Type.onReaderError),
|
||||
.eventLoop = eventLoop_fn,
|
||||
.loop = loop_fn,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
pub fn init(comptime Type: type) *const BufferedReaderVTable.Fn {
|
||||
const loop_fn = &struct {
|
||||
pub fn doLoop(this: *anyopaque) *Async.Loop {
|
||||
return Type.loop(@alignCast(@ptrCast(this)));
|
||||
}
|
||||
}.loop;
|
||||
|
||||
const eventLoop = &struct {
|
||||
pub fn doLoop(this: *anyopaque) JSC.EventLoopHandle {
|
||||
return JSC.EventLoopHandle.init(Type.eventLoop(@alignCast(@ptrCast(this))));
|
||||
}
|
||||
}.eventLoop;
|
||||
return comptime &BufferedReaderVTable.Fn{
|
||||
.onReadChunk = if (@hasDecl(Type, "onReadChunk")) @ptrCast(&Type.onReadChunk) else null,
|
||||
.onReaderDone = @ptrCast(&Type.onReaderDone),
|
||||
.onReaderError = @ptrCast(&Type.onReaderError),
|
||||
.eventLoop = eventLoop,
|
||||
.loop = loop_fn,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn loop(this: @This()) JSC.EventLoopHandle {
|
||||
pub fn eventLoop(this: @This()) JSC.EventLoopHandle {
|
||||
return this.fns.eventLoop(this.parent);
|
||||
}
|
||||
|
||||
pub fn loop(this: @This()) *Async.Loop {
|
||||
return this.fns.loop(this.parent);
|
||||
}
|
||||
|
||||
pub fn isStreamingEnabled(this: @This()) bool {
|
||||
return this.fns.onReadChunk != null;
|
||||
}
|
||||
|
||||
pub fn onReadChunk(this: @This(), chunk: []const u8) void {
|
||||
this.fns.onReadChunk(this.parent, chunk);
|
||||
this.fns.onReadChunk.?(this.parent, chunk);
|
||||
}
|
||||
|
||||
pub fn onReaderDone(this: @This()) void {
|
||||
@@ -345,39 +358,43 @@ const PosixBufferedReader = struct {
|
||||
handle: PollOrFd = .{ .closed = {} },
|
||||
_buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
|
||||
is_done: bool = false,
|
||||
vtable: BufferedReaderVTable = .{},
|
||||
vtable: BufferedReaderVTable,
|
||||
|
||||
pub fn @"for"(comptime Type: type) PosixBufferedReader {
|
||||
pub fn init(comptime Type: type) PosixBufferedReader {
|
||||
return .{
|
||||
.vtable = BufferedReaderVTable.init(Type),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn from(to: *@This(), other: anytype, parent_: *anyopaque) void {
|
||||
pub fn updateRef(this: *const PosixBufferedReader, value: bool) void {
|
||||
const poll = this.handle.getPoll() orelse return;
|
||||
poll.setKeepingProcessAlive(this.vtable.eventLoop(), value);
|
||||
}
|
||||
|
||||
pub fn from(to: *@This(), other: *PosixBufferedReader, parent_: *anyopaque) void {
|
||||
to.* = .{
|
||||
.handle = other.handle,
|
||||
._buffer = other.buffer().*,
|
||||
.is_done = other.is_done,
|
||||
._parent = parent_,
|
||||
.vtable = .{
|
||||
.fns = to.vtable.fns,
|
||||
.parent = parent_,
|
||||
},
|
||||
};
|
||||
other.buffer().* = std.ArrayList(u8).init(bun.default_allocator);
|
||||
to.setParent(parent_);
|
||||
|
||||
other.is_done = true;
|
||||
other.handle = .{ .closed = {} };
|
||||
}
|
||||
|
||||
pub fn setParent(this: *@This(), parent_: *anyopaque) void {
|
||||
this._parent = parent_;
|
||||
if (!this.is_done) {
|
||||
this.handle.setOwner(this);
|
||||
}
|
||||
pub fn setParent(this: *PosixBufferedReader, parent_: *anyopaque) void {
|
||||
this.vtable.parent = parent_;
|
||||
this.handle.setOwner(this);
|
||||
}
|
||||
|
||||
pub usingnamespace PosixPipeReader(@This(), .{
|
||||
.getFd = @ptrCast(&getFd),
|
||||
.getBuffer = @ptrCast(&buffer),
|
||||
.onReadChunk = if (vtable.onReadChunk != null) @ptrCast(&_onReadChunk) else null,
|
||||
.onReadChunk = @ptrCast(&_onReadChunk),
|
||||
.registerPoll = @ptrCast(®isterPoll),
|
||||
.done = @ptrCast(&done),
|
||||
.onError = @ptrCast(&onError),
|
||||
@@ -398,7 +415,7 @@ const PosixBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn buffer(this: *PosixBufferedReader) *std.ArrayList(u8) {
|
||||
return &this._buffer;
|
||||
return &@as(*PosixBufferedReader, @alignCast(@ptrCast(this)))._buffer;
|
||||
}
|
||||
|
||||
pub fn disableKeepingProcessAlive(this: *@This(), event_loop_ctx: anytype) void {
|
||||
@@ -456,7 +473,7 @@ const PosixBufferedReader = struct {
|
||||
return .{ .result = {} };
|
||||
}
|
||||
|
||||
const poll = Async.FilePoll.init(this.loop(), fd, .readable, @This(), this);
|
||||
const poll = Async.FilePoll.init(this.eventLoop(), fd, .{}, @This(), this);
|
||||
const maybe = poll.register(this.loop(), .readable, true);
|
||||
if (maybe != .result) {
|
||||
poll.deinit();
|
||||
@@ -476,32 +493,24 @@ const PosixBufferedReader = struct {
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn loop(this: *const PosixBufferedReader) JSC.EventLoopHandle {
|
||||
return vtable.loop(this._parent);
|
||||
pub fn loop(this: *const PosixBufferedReader) *Async.Loop {
|
||||
return this.vtable.loop();
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *const PosixBufferedReader) JSC.EventLoopHandle {
|
||||
return this.vtable.eventLoop();
|
||||
}
|
||||
};
|
||||
|
||||
pub fn PosixBufferedReader(comptime vtable: anytype) type {
|
||||
return PosixBufferedReaderWithVTable(.{
|
||||
.onReaderDone = @ptrCast(&vtable.onReaderDone),
|
||||
.onReaderError = @ptrCast(&vtable.onReaderError),
|
||||
.onReadChunk = if (@hasDecl(vtable, "onReadChunk")) @ptrCast(&vtable.onReadChunk) else null,
|
||||
.loop = &struct {
|
||||
pub fn doLoop(this: *anyopaque) JSC.EventLoopHandle {
|
||||
_ = this; // autofix
|
||||
// return JSC.EventLoopHandle.init(Parent.eventLoop(@alignCast(@ptrCast(this))));
|
||||
return undefined;
|
||||
}
|
||||
}.doLoop,
|
||||
});
|
||||
}
|
||||
|
||||
const JSC = bun.JSC;
|
||||
|
||||
const WindowsOutputReaderVTable = struct {
|
||||
onReaderDone: *const fn (*anyopaque) void,
|
||||
onReaderError: *const fn (*anyopaque, bun.sys.Error) void,
|
||||
onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void = null,
|
||||
onReadChunk: ?*const fn (
|
||||
*anyopaque,
|
||||
chunk: []const u8,
|
||||
) void = null,
|
||||
};
|
||||
|
||||
pub const GenericWindowsBufferedReader = struct {
|
||||
|
||||
@@ -131,30 +131,26 @@ const PollOrFd = @import("./pipes.zig").PollOrFd;
|
||||
|
||||
pub fn PosixBufferedWriter(
|
||||
comptime Parent: type,
|
||||
comptime onWrite: fn (*Parent, amount: usize, done: bool) void,
|
||||
comptime onError: fn (*Parent, bun.sys.Error) void,
|
||||
comptime onClose: fn (*Parent) void,
|
||||
comptime onWrite: *const fn (*Parent, amount: usize, done: bool) void,
|
||||
comptime onError: *const fn (*Parent, bun.sys.Error) void,
|
||||
comptime onClose: *const fn (*Parent) void,
|
||||
comptime getBuffer: *const fn (*Parent) []const u8,
|
||||
) type {
|
||||
return struct {
|
||||
buffer: []const u8 = "",
|
||||
handle: PollOrFd = .{ .closed = {} },
|
||||
parent: *Parent = undefined,
|
||||
is_done: bool = false,
|
||||
|
||||
const PosixWriter = @This();
|
||||
|
||||
pub fn getPoll(this: *@This()) ?*Async.FilePoll {
|
||||
pub fn getPoll(this: *const @This()) ?*Async.FilePoll {
|
||||
return this.handle.getPoll();
|
||||
}
|
||||
|
||||
pub fn getFd(this: *PosixWriter) bun.FileDescriptor {
|
||||
pub fn getFd(this: *const PosixWriter) bun.FileDescriptor {
|
||||
return this.handle.getFd();
|
||||
}
|
||||
|
||||
pub fn getBuffer(this: *PosixWriter) []const u8 {
|
||||
return this.buffer;
|
||||
}
|
||||
|
||||
fn _onError(
|
||||
this: *PosixWriter,
|
||||
err: bun.sys.Error,
|
||||
@@ -172,7 +168,6 @@ pub fn PosixBufferedWriter(
|
||||
done: bool,
|
||||
) void {
|
||||
const was_done = this.is_done == true;
|
||||
this.buffer = this.buffer[written..];
|
||||
const parent = this.parent;
|
||||
|
||||
onWrite(parent, written, done);
|
||||
@@ -209,19 +204,19 @@ pub fn PosixBufferedWriter(
|
||||
return poll.canEnableKeepingProcessAlive();
|
||||
}
|
||||
|
||||
pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void {
|
||||
if (this.is_done) return;
|
||||
|
||||
const poll = this.getPoll() orelse return;
|
||||
poll.enableKeepingProcessAlive(event_loop);
|
||||
pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: anytype) void {
|
||||
this.updateRef(event_loop, true);
|
||||
}
|
||||
|
||||
pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void {
|
||||
const poll = this.getPoll() orelse return;
|
||||
poll.disableKeepingProcessAlive(event_loop);
|
||||
pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: anytype) void {
|
||||
this.updateRef(event_loop, false);
|
||||
}
|
||||
|
||||
pub usingnamespace PosixPipeWriter(@This(), getFd, getBuffer, _onWrite, registerPoll, _onError, _onWritable);
|
||||
fn getBufferInternal(this: *PosixWriter) []const u8 {
|
||||
return getBuffer(this.parent);
|
||||
}
|
||||
|
||||
pub usingnamespace PosixPipeWriter(@This(), getFd, getBufferInternal, _onWrite, registerPoll, _onError, _onWritable);
|
||||
|
||||
pub fn end(this: *PosixWriter) void {
|
||||
if (this.is_done) {
|
||||
@@ -236,16 +231,12 @@ pub fn PosixBufferedWriter(
|
||||
this.handle.close(this.parent, onClose);
|
||||
}
|
||||
|
||||
pub fn updateRef(this: *PosixWriter, value: bool, event_loop: JSC.EventLoopHandle) void {
|
||||
if (value) {
|
||||
this.enableKeepingProcessAlive(event_loop);
|
||||
} else {
|
||||
this.disableKeepingProcessAlive(event_loop);
|
||||
}
|
||||
pub fn updateRef(this: *const PosixWriter, event_loop: anytype, value: bool) void {
|
||||
const poll = this.getPoll() orelse return;
|
||||
poll.setKeepingProcessAlive(event_loop, value);
|
||||
}
|
||||
|
||||
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8, pollable: bool) JSC.Maybe(void) {
|
||||
this.buffer = bytes;
|
||||
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, pollable: bool) JSC.Maybe(void) {
|
||||
if (!pollable) {
|
||||
std.debug.assert(this.handle != .poll);
|
||||
this.handle = .{ .fd = fd };
|
||||
@@ -253,7 +244,7 @@ pub fn PosixBufferedWriter(
|
||||
}
|
||||
const loop = @as(*Parent, @ptrCast(this.parent)).loop();
|
||||
var poll = this.getPoll() orelse brk: {
|
||||
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) };
|
||||
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .{}, PosixWriter, this) };
|
||||
break :brk this.handle.poll;
|
||||
};
|
||||
|
||||
@@ -344,7 +335,7 @@ pub fn PosixStreamingWriter(
|
||||
const poll = this.getPoll() orelse return;
|
||||
switch (poll.registerWithFd(@as(*Parent, @ptrCast(this.parent)).loop(), .writable, true, poll.fd)) {
|
||||
.err => |err| {
|
||||
onError(this, err);
|
||||
onError(this.parent, err);
|
||||
this.close();
|
||||
},
|
||||
.result => {},
|
||||
@@ -432,6 +423,7 @@ pub fn PosixStreamingWriter(
|
||||
|
||||
return .{ .done = amt };
|
||||
},
|
||||
else => |r| return r,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -536,7 +528,7 @@ pub fn PosixStreamingWriter(
|
||||
|
||||
const loop = @as(*Parent, @ptrCast(this.parent)).loop();
|
||||
var poll = this.getPoll() orelse brk: {
|
||||
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) };
|
||||
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .{}, PosixWriter, this) };
|
||||
break :brk this.handle.poll;
|
||||
};
|
||||
|
||||
|
||||
@@ -38,12 +38,12 @@ pub const PollOrFd = union(enum) {
|
||||
}
|
||||
|
||||
if (fd != bun.invalid_fd) {
|
||||
this.handle = .{ .closed = {} };
|
||||
this.* = .{ .closed = {} };
|
||||
_ = bun.sys.close(fd);
|
||||
if (comptime onCloseFn != void)
|
||||
onCloseFn(@ptrCast(ctx.?));
|
||||
if (comptime @TypeOf(onCloseFn) != void)
|
||||
onCloseFn(@alignCast(@ptrCast(ctx.?)));
|
||||
} else {
|
||||
this.handle = .{ .closed = {} };
|
||||
this.* = .{ .closed = {} };
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -17,128 +17,326 @@ const os = std.os;
|
||||
pub const OutKind = enum { stdout, stderr };
|
||||
|
||||
pub const Stdio = union(enum) {
|
||||
/// When set to true, it means to capture the output
|
||||
inherit: struct { captured: ?*bun.ByteList = null },
|
||||
inherit: void,
|
||||
capture: *bun.ByteList,
|
||||
ignore: void,
|
||||
fd: bun.FileDescriptor,
|
||||
path: JSC.Node.PathLike,
|
||||
blob: JSC.WebCore.AnyBlob,
|
||||
pipe: ?JSC.WebCore.ReadableStream,
|
||||
array_buffer: struct { buf: JSC.ArrayBuffer.Strong, from_jsc: bool = false },
|
||||
array_buffer: JSC.ArrayBuffer.Strong,
|
||||
memfd: bun.FileDescriptor,
|
||||
pipe: void,
|
||||
|
||||
pub fn toPosix(self: Stdio) bun.spawn.SpawnOptions.Stdio {
|
||||
return switch (self) {
|
||||
.pipe, .blob, .array_buffer => .{ .buffer = {} },
|
||||
.inherit => |inherit| if (inherit.captured == null) .{ .inherit = {} } else .{ .buffer = {} },
|
||||
.fd => .{ .pipe = self.fd },
|
||||
.path => .{ .path = self.path.slice() },
|
||||
.ignore => .{ .ignore = {} },
|
||||
};
|
||||
const log = bun.sys.syslog;
|
||||
|
||||
pub fn deinit(this: *Stdio) void {
|
||||
switch (this.*) {
|
||||
.array_buffer => |*array_buffer| {
|
||||
array_buffer.deinit();
|
||||
},
|
||||
.blob => |*blob| {
|
||||
blob.detach();
|
||||
},
|
||||
.memfd => |fd| {
|
||||
_ = bun.sys.close(fd);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn isPiped(self: Stdio) bool {
|
||||
return switch (self) {
|
||||
.array_buffer, .blob, .pipe => true,
|
||||
.inherit => self.inherit.captured != null,
|
||||
pub fn canUseMemfd(this: *const @This(), is_sync: bool) bool {
|
||||
if (comptime !Environment.isLinux) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return switch (this.*) {
|
||||
.blob => !this.blob.needsToReadFile(),
|
||||
.memfd, .array_buffer => true,
|
||||
.pipe => is_sync,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn setUpChildIoPosixSpawn(
|
||||
stdio: @This(),
|
||||
actions: *PosixSpawn.Actions,
|
||||
pipe_fd: [2]bun.FileDescriptor,
|
||||
comptime std_fileno: bun.FileDescriptor,
|
||||
) !void {
|
||||
switch (stdio) {
|
||||
.array_buffer, .blob, .pipe => {
|
||||
std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile()));
|
||||
const idx: usize = if (std_fileno == bun.STDIN_FD) 0 else 1;
|
||||
pub fn useMemfd(this: *@This(), index: u32) void {
|
||||
const label = switch (index) {
|
||||
0 => "spawn_stdio_stdin",
|
||||
1 => "spawn_stdio_stdout",
|
||||
2 => "spawn_stdio_stderr",
|
||||
else => "spawn_stdio_memory_file",
|
||||
};
|
||||
|
||||
try actions.dup2(pipe_fd[idx], std_fileno);
|
||||
try actions.close(pipe_fd[1 - idx]);
|
||||
},
|
||||
.inherit => {
|
||||
if (stdio.inherit.captured != null) {
|
||||
// Same as above
|
||||
std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile()));
|
||||
const idx: usize = if (std_fileno == bun.STDIN_FD) 0 else 1;
|
||||
// We use the linux syscall api because the glibc requirement is 2.27, which is a little close for comfort.
|
||||
const rc = std.os.linux.memfd_create(label, 0);
|
||||
|
||||
try actions.dup2(pipe_fd[idx], std_fileno);
|
||||
try actions.close(pipe_fd[1 - idx]);
|
||||
return;
|
||||
}
|
||||
log("memfd_create({s}) = {d}", .{ label, rc });
|
||||
|
||||
if (comptime Environment.isMac) {
|
||||
try actions.inherit(std_fileno);
|
||||
} else {
|
||||
try actions.dup2(std_fileno, std_fileno);
|
||||
}
|
||||
},
|
||||
.fd => |fd| {
|
||||
try actions.dup2(fd, std_fileno);
|
||||
},
|
||||
.path => |pathlike| {
|
||||
const flag = if (std_fileno == bun.STDIN_FD) @as(u32, os.O.RDONLY) else @as(u32, std.os.O.WRONLY);
|
||||
try actions.open(std_fileno, pathlike.slice(), flag | std.os.O.CREAT, 0o664);
|
||||
},
|
||||
.ignore => {
|
||||
const flag = if (std_fileno == bun.STDIN_FD) @as(u32, os.O.RDONLY) else @as(u32, std.os.O.WRONLY);
|
||||
try actions.openZ(std_fileno, "/dev/null", flag, 0o664);
|
||||
switch (std.os.linux.getErrno(rc)) {
|
||||
.SUCCESS => {},
|
||||
else => |errno| {
|
||||
log("Failed to create memfd: {s}", .{@tagName(errno)});
|
||||
return;
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub fn extractStdioBlob(
|
||||
globalThis: *JSC.JSGlobalObject,
|
||||
blob: JSC.WebCore.AnyBlob,
|
||||
i: u32,
|
||||
stdio_array: []Stdio,
|
||||
) bool {
|
||||
const fd = bun.stdio(i);
|
||||
const fd = bun.toFD(rc);
|
||||
|
||||
if (blob.needsToReadFile()) {
|
||||
if (blob.store()) |store| {
|
||||
if (store.data.file.pathlike == .fd) {
|
||||
if (store.data.file.pathlike.fd == fd) {
|
||||
stdio_array[i] = Stdio{ .inherit = .{} };
|
||||
} else {
|
||||
switch (bun.FDTag.get(i)) {
|
||||
.stdin => {
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
|
||||
return false;
|
||||
}
|
||||
},
|
||||
var remain = this.byteSlice();
|
||||
|
||||
.stdout, .stderr => {
|
||||
if (i == 0) {
|
||||
globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
|
||||
return false;
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
if (remain.len > 0)
|
||||
// Hint at the size of the file
|
||||
_ = bun.sys.ftruncate(fd, @intCast(remain.len));
|
||||
|
||||
// Dump all the bytes in there
|
||||
var written: isize = 0;
|
||||
while (remain.len > 0) {
|
||||
switch (bun.sys.pwrite(fd, remain, written)) {
|
||||
.err => |err| {
|
||||
if (err.getErrno() == .AGAIN) {
|
||||
continue;
|
||||
}
|
||||
|
||||
stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd };
|
||||
}
|
||||
|
||||
return true;
|
||||
Output.debugWarn("Failed to write to memfd: {s}", .{@tagName(err.getErrno())});
|
||||
_ = bun.sys.close(fd);
|
||||
return;
|
||||
},
|
||||
.result => |result| {
|
||||
if (result == 0) {
|
||||
Output.debugWarn("Failed to write to memfd: EOF", .{});
|
||||
_ = bun.sys.close(fd);
|
||||
return;
|
||||
}
|
||||
written += @intCast(result);
|
||||
remain = remain[result..];
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
stdio_array[i] = .{ .path = store.data.file.pathlike.path };
|
||||
return true;
|
||||
switch (this.*) {
|
||||
.array_buffer => this.array_buffer.deinit(),
|
||||
.blob => this.blob.detach(),
|
||||
else => {},
|
||||
}
|
||||
|
||||
this.* = .{ .memfd = fd };
|
||||
}
|
||||
|
||||
fn toPosix(
|
||||
stdio: *@This(),
|
||||
) bun.spawn.SpawnOptions.Stdio {
|
||||
return switch (stdio.*) {
|
||||
.capture, .pipe, .array_buffer, .blob => .{ .buffer = {} },
|
||||
.fd => |fd| .{ .pipe = fd },
|
||||
.memfd => |fd| .{ .pipe = fd },
|
||||
.path => |pathlike| .{ .path = pathlike.slice() },
|
||||
.inherit => .{ .inherit = {} },
|
||||
.ignore => .{ .ignore = {} },
|
||||
};
|
||||
}
|
||||
|
||||
fn toWindows(
|
||||
stdio: *@This(),
|
||||
) bun.spawn.SpawnOptions.Stdio {
|
||||
return switch (stdio.*) {
|
||||
.capture, .pipe, .array_buffer, .blob => .{ .buffer = {} },
|
||||
.fd => |fd| .{ .pipe = fd },
|
||||
.path => |pathlike| .{ .path = pathlike.slice() },
|
||||
.inherit => .{ .inherit = {} },
|
||||
.ignore => .{ .ignore = {} },
|
||||
|
||||
.memfd => @panic("This should never happen"),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn asSpawnOption(
|
||||
stdio: *@This(),
|
||||
) bun.spawn.SpawnOptions.Stdio {
|
||||
if (comptime Environment.isWindows) {
|
||||
return stdio.toWindows();
|
||||
} else {
|
||||
return stdio.toPosix();
|
||||
}
|
||||
}
|
||||
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("Blobs are immutable, and cannot be used for stdout/stderr", .{});
|
||||
pub fn isPiped(self: Stdio) bool {
|
||||
return switch (self) {
|
||||
.capture, .array_buffer, .blob, .pipe => true,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
fn extractStdio(
|
||||
out_stdio: *Stdio,
|
||||
globalThis: *JSC.JSGlobalObject,
|
||||
i: u32,
|
||||
value: JSValue,
|
||||
) bool {
|
||||
if (value.isEmptyOrUndefinedOrNull()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (value.isString()) {
|
||||
const str = value.getZigString(globalThis);
|
||||
if (str.eqlComptime("inherit")) {
|
||||
out_stdio.* = Stdio{ .inherit = {} };
|
||||
} else if (str.eqlComptime("ignore")) {
|
||||
out_stdio.* = Stdio{ .ignore = {} };
|
||||
} else if (str.eqlComptime("pipe") or str.eqlComptime("overlapped")) {
|
||||
out_stdio.* = Stdio{ .pipe = {} };
|
||||
} else if (str.eqlComptime("ipc")) {
|
||||
out_stdio.* = Stdio{ .pipe = {} }; // TODO:
|
||||
} else {
|
||||
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'pipe', 'ignore', Bun.file(pathOrFd), number, or null", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
} else if (value.isNumber()) {
|
||||
const fd = value.asFileDescriptor();
|
||||
if (fd.int() < 0) {
|
||||
globalThis.throwInvalidArguments("file descriptor must be a positive integer", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fd.int() >= std.math.maxInt(i32)) {
|
||||
var formatter = JSC.ConsoleObject.Formatter{ .globalThis = globalThis };
|
||||
globalThis.throwInvalidArguments("file descriptor must be a valid integer, received: {}", .{
|
||||
value.toFmt(globalThis, &formatter),
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (bun.FDTag.get(fd)) {
|
||||
.stdin => {
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
out_stdio.* = Stdio{ .inherit = {} };
|
||||
return true;
|
||||
},
|
||||
|
||||
.stdout, .stderr => |tag| {
|
||||
if (i == 0) {
|
||||
globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
if (i == 1 and tag == .stdout) {
|
||||
out_stdio.* = .{ .inherit = {} };
|
||||
return true;
|
||||
} else if (i == 2 and tag == .stderr) {
|
||||
out_stdio.* = .{ .inherit = {} };
|
||||
return true;
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
|
||||
out_stdio.* = Stdio{ .fd = fd };
|
||||
|
||||
return true;
|
||||
} else if (value.as(JSC.WebCore.Blob)) |blob| {
|
||||
return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, out_stdio);
|
||||
} else if (value.as(JSC.WebCore.Request)) |req| {
|
||||
req.getBodyValue().toBlobIfPossible();
|
||||
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio);
|
||||
} else if (value.as(JSC.WebCore.Response)) |req| {
|
||||
req.getBodyValue().toBlobIfPossible();
|
||||
return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, out_stdio);
|
||||
} else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |req_const| {
|
||||
var req = req_const;
|
||||
if (i == 0) {
|
||||
if (req.toAnyBlob(globalThis)) |blob| {
|
||||
return extractStdioBlob(globalThis, blob, i, out_stdio);
|
||||
}
|
||||
|
||||
switch (req.ptr) {
|
||||
.File, .Blob => {
|
||||
globalThis.throwTODO("Support fd/blob backed ReadableStream in spawn stdin. See https://github.com/oven-sh/bun/issues/8049");
|
||||
return false;
|
||||
},
|
||||
.Direct, .JavaScript, .Bytes => {
|
||||
// out_stdio.* = .{ .connect = req };
|
||||
globalThis.throwTODO("Re-enable ReadableStream support in spawn stdin. ");
|
||||
return false;
|
||||
},
|
||||
.Invalid => {
|
||||
globalThis.throwInvalidArguments("ReadableStream is in invalid state.", .{});
|
||||
return false;
|
||||
},
|
||||
}
|
||||
}
|
||||
} else if (value.asArrayBuffer(globalThis)) |array_buffer| {
|
||||
if (array_buffer.slice().len == 0) {
|
||||
globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
out_stdio.* = .{
|
||||
.array_buffer = JSC.ArrayBuffer.Strong{
|
||||
.array_buffer = array_buffer,
|
||||
.held = JSC.Strong.create(array_buffer.value, globalThis),
|
||||
},
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
stdio_array[i] = .{ .blob = blob };
|
||||
return true;
|
||||
}
|
||||
pub fn extractStdioBlob(
|
||||
globalThis: *JSC.JSGlobalObject,
|
||||
blob: JSC.WebCore.AnyBlob,
|
||||
i: u32,
|
||||
stdio_array: []Stdio,
|
||||
) bool {
|
||||
const fd = bun.stdio(i);
|
||||
|
||||
if (blob.needsToReadFile()) {
|
||||
if (blob.store()) |store| {
|
||||
if (store.data.file.pathlike == .fd) {
|
||||
if (store.data.file.pathlike.fd == fd) {
|
||||
stdio_array[i] = Stdio{ .inherit = .{} };
|
||||
} else {
|
||||
switch (bun.FDTag.get(i)) {
|
||||
.stdin => {
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
|
||||
return false;
|
||||
}
|
||||
},
|
||||
|
||||
.stdout, .stderr => {
|
||||
if (i == 0) {
|
||||
globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
|
||||
return false;
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
|
||||
stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd };
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
stdio_array[i] = .{ .path = store.data.file.pathlike.path };
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (i == 1 or i == 2) {
|
||||
globalThis.throwInvalidArguments("Blobs are immutable, and cannot be used for stdout/stderr", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
stdio_array[i] = .{ .blob = blob };
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
pub const WatchFd = if (Environment.isLinux) std.os.fd_t else i32;
|
||||
|
||||
Reference in New Issue
Block a user