This commit is contained in:
Jarred Sumner
2024-01-30 10:04:03 -08:00
parent 58132dc9da
commit cb63d2bf69
9 changed files with 365 additions and 193 deletions

View File

@@ -166,7 +166,7 @@ pub const FilePoll = struct {
const Process = bun.spawn.Process;
const Subprocess = JSC.Subprocess;
const BufferedInput = Subprocess.BufferedInput;
const BufferedOutput = Subprocess.BufferedOutput;
const BufferedOutput = Subprocess.StreamingOutput;
const DNSResolver = JSC.DNS.DNSResolver;
const GetAddrInfoRequest = JSC.DNS.GetAddrInfoRequest;
const Deactivated = opaque {
@@ -376,7 +376,7 @@ pub const FilePoll = struct {
loader.onMachportChange();
},
@field(Owner.Tag, "PosixOutputReader") => {
@field(Owner.Tag, bun.meta.typeBaseName(@typeName(LifecycleScriptSubprocessOutputReader))) => {
log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) OutputReader", .{poll.fd});
var output: *LifecycleScriptSubprocessOutputReader = ptr.as(LifecycleScriptSubprocessOutputReader);
output.onPoll(size_or_offset);

View File

@@ -185,12 +185,18 @@ pub const Process = struct {
return this.status.signalCode();
}
pub fn wait(this: *Process, sync: bool) void {
pub fn waitPosix(this: *Process, sync: bool) void {
var rusage = std.mem.zeroes(Rusage);
const waitpid_result = PosixSpawn.wait4(this.pid, if (sync) 0 else std.os.W.NOHANG, &rusage);
this.onWaitPid(&waitpid_result, &rusage);
}
pub fn wait(this: *Process, sync: bool) void {
if (comptime Environment.isPosix) {
this.waitPosix(sync);
} else if (comptime Environment.isWindows) {}
}
pub fn onWaitPidFromWaiterThread(this: *Process, waitpid_result: *const JSC.Maybe(PosixSpawn.WaitPidResult)) void {
if (comptime Environment.isWindows) {
@compileError("not implemented on this platform");
@@ -526,19 +532,19 @@ pub const PollerPosix = union(enum) {
detached: void,
pub fn deinit(this: *PollerPosix) void {
if (this.poller == .fd) {
this.poller.fd.deinit();
} else if (this.poller == .waiter_thread) {
this.poller.waiter_thread.disable();
if (this.* == .fd) {
this.fd.deinit();
} else if (this.* == .waiter_thread) {
this.waiter_thread.disable();
}
}
pub fn enableKeepingEventLoopAlive(this: *Poller, event_loop: JSC.EventLoopHandle) void {
switch (this.*) {
.fd => |poll| {
poll.enableKeepingEventLoopAlive(event_loop);
poll.enableKeepingProcessAlive(event_loop);
},
.waiter_thread => |waiter| {
.waiter_thread => |*waiter| {
waiter.ref(event_loop);
},
else => {},
@@ -548,9 +554,9 @@ pub const PollerPosix = union(enum) {
pub fn disableKeepingEventLoopAlive(this: *PollerPosix, event_loop: JSC.EventLoopHandle) void {
switch (this.*) {
.fd => |poll| {
poll.disableKeepingEventLoopAlive(event_loop);
poll.disableKeepingProcessAlive(event_loop);
},
.waiter_thread => |waiter| {
.waiter_thread => |*waiter| {
waiter.unref(event_loop);
},
else => {},
@@ -559,7 +565,7 @@ pub const PollerPosix = union(enum) {
pub fn hasRef(this: *const PollerPosix) bool {
return switch (this.*) {
.fd => this.fd.hasRef(),
.fd => this.fd.canEnableKeepingProcessAlive(),
.waiter_thread => this.waiter_thread.isActive(),
else => false,
};

View File

@@ -122,6 +122,11 @@ pub const Subprocess = struct {
const log = Output.scoped(.Subprocess, false);
pub usingnamespace JSC.Codegen.JSSubprocess;
const default_max_buffer_size = 1024 * 1024 * 4;
pub const StdioKind = enum {
stdin,
stdout,
stderr,
};
process: *Process = undefined,
closed_streams: u8 = 0,
deinit_onclose: bool = false,
@@ -141,11 +146,7 @@ pub const Subprocess = struct {
stderr,
stdio,
}) = .{},
closed: std.enums.EnumSet(enum {
stdin,
stdout,
stderr,
}) = .{},
closed: std.enums.EnumSet(StdioKind) = .{},
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
this_jsvalue: JSC.JSValue = .zero,
@@ -276,6 +277,7 @@ pub const Subprocess = struct {
const Readable = union(enum) {
fd: bun.FileDescriptor,
memfd: bun.FileDescriptor,
sync_buffered_output: *BufferedOutput,
pipe: Pipe,
inherit: void,
@@ -318,7 +320,7 @@ pub const Subprocess = struct {
pub const Pipe = union(enum) {
stream: JSC.WebCore.ReadableStream,
buffer: BufferedOutput,
buffer: StreamingOutput,
detached: void,
pub fn finish(this: *@This()) void {
@@ -370,7 +372,7 @@ pub const Subprocess = struct {
.pipe => brk: {
break :brk .{
.pipe = .{
.buffer = BufferedOutput.initWithPipeAndAllocator(allocator, pipe, max_size),
.buffer = StreamingOutput.initWithPipeAndAllocator(allocator, pipe, max_size),
},
};
},
@@ -379,24 +381,26 @@ pub const Subprocess = struct {
.memfd => Readable{ .memfd = stdio.memfd },
.array_buffer => Readable{
.pipe = .{
.buffer = BufferedOutput.initWithPipeAndSlice(pipe, stdio.array_buffer.slice()),
.buffer = StreamingOutput.initWithPipeAndSlice(pipe, stdio.array_buffer.slice()),
},
},
};
}
pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32) Readable {
pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32, is_sync: bool) Readable {
if (comptime Environment.allow_assert) {
if (fd) |fd_| {
std.debug.assert(fd_ != bun.invalid_fd);
}
}
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
if (is_sync) {}
break :brk .{
.pipe = .{
.buffer = BufferedOutput.initWithAllocator(allocator, fd.?, max_size),
.buffer = StreamingOutput.initWithAllocator(allocator, fd.?, max_size),
},
};
},
@@ -405,7 +409,7 @@ pub const Subprocess = struct {
.memfd => Readable{ .memfd = stdio.memfd },
.array_buffer => Readable{
.pipe = .{
.buffer = BufferedOutput.initWithSlice(fd.?, stdio.array_buffer.slice()),
.buffer = StreamingOutput.initWithSlice(fd.?, stdio.array_buffer.slice()),
},
},
};
@@ -499,6 +503,13 @@ pub const Subprocess = struct {
this.* = .{ .closed = {} };
return JSC.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis);
},
.sync_buffered_output => |*sync_buffered_output| {
const slice = sync_buffered_output.toOwnedSlice(globalThis);
this.* = .{ .closed = {} };
return JSC.MarkedArrayBuffer
.fromBytes(slice, bun.default_allocator, .Uint8Array)
.toNodeBuffer(globalThis);
},
.pipe => {
if (!Environment.isWindows) {
this.pipe.buffer.stream.close_on_empty_read = true;
@@ -819,6 +830,67 @@ pub const Subprocess = struct {
};
pub const BufferedOutput = struct {
reader: bun.io.BufferedOutputReader(BufferedOutput, null) = .{},
process: *Subprocess = undefined,
event_loop: *JSC.EventLoop = undefined,
ref_count: u32 = 1,
pub usingnamespace bun.NewRefCounted(@This(), deinit);
pub fn onOutputDone(this: *BufferedOutput) void {
this.process.onCloseIO(this.kind());
}
pub fn toOwnedSlice(this: *BufferedOutput) []u8 {
// we do not use .toOwnedSlice() because we don't want to reallocate memory.
const out = this.reader.buffer.items;
this.reader.buffer.items = &.{};
this.reader.buffer.capacity = 0;
return out;
}
pub fn onOutputError(this: *BufferedOutput, err: bun.sys.Error) void {
_ = this; // autofix
Output.panic("BufferedOutput should never error. If it does, it's a bug in the code.\n{}", .{err});
}
fn kind(this: *const BufferedOutput) StdioKind {
if (this.process.stdout == .sync_buffered_output and this.process.stdout.sync_buffered_output == this) {
// are we stdout?
return .stdout;
} else if (this.process.stderr == .sync_buffered_output and this.process.stderr.sync_buffered_output == this) {
// are we stderr?
return .stderr;
}
@panic("We should be either stdout or stderr");
}
pub fn close(this: *BufferedOutput) void {
if (!this.reader.is_done)
this.reader.close();
}
pub fn eventLoop(this: *BufferedOutput) *JSC.EventLoop {
return this.event_loop;
}
pub fn loop(this: *BufferedOutput) *uws.Loop {
return this.event_loop.virtual_machine.uwsLoop();
}
fn deinit(this: *BufferedOutput) void {
std.debug.assert(this.reader.is_done);
if (comptime Environment.isWindows) {
std.debug.assert(this.reader.pipe.isClosed());
}
this.destroy();
}
};
pub const StreamingOutput = struct {
internal_buffer: bun.ByteList = .{},
stream: FIFOType = undefined,
auto_sizer: ?JSC.WebCore.AutoSizer = null,
@@ -836,13 +908,13 @@ pub const Subprocess = struct {
err: bun.sys.Error,
};
pub fn init(fd: bun.FileDescriptor) BufferedOutput {
pub fn init(fd: bun.FileDescriptor) StreamingOutput {
if (Environment.isWindows) {
@compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipe");
}
std.debug.assert(fd != .zero and fd != bun.invalid_fd);
return BufferedOutput{
return StreamingOutput{
.internal_buffer = .{},
.stream = JSC.WebCore.FIFO{
.fd = fd,
@@ -850,18 +922,18 @@ pub const Subprocess = struct {
};
}
pub fn initWithPipe(pipe: *uv.Pipe) BufferedOutput {
pub fn initWithPipe(pipe: *uv.Pipe) StreamingOutput {
if (!Environment.isWindows) {
@compileError("uv.Pipe can only be used on Windows");
}
return BufferedOutput{ .internal_buffer = .{}, .stream = pipe };
return StreamingOutput{ .internal_buffer = .{}, .stream = pipe };
}
pub fn initWithSlice(fd: bun.FileDescriptor, slice: []u8) BufferedOutput {
pub fn initWithSlice(fd: bun.FileDescriptor, slice: []u8) StreamingOutput {
if (Environment.isWindows) {
@compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipeAndSlice");
}
return BufferedOutput{
return StreamingOutput{
// fixed capacity
.internal_buffer = bun.ByteList.initWithBuffer(slice),
.auto_sizer = null,
@@ -871,11 +943,11 @@ pub const Subprocess = struct {
};
}
pub fn initWithPipeAndSlice(pipe: *uv.Pipe, slice: []u8) BufferedOutput {
pub fn initWithPipeAndSlice(pipe: *uv.Pipe, slice: []u8) StreamingOutput {
if (!Environment.isWindows) {
@compileError("uv.Pipe can only be used on Window");
}
return BufferedOutput{
return StreamingOutput{
// fixed capacity
.internal_buffer = bun.ByteList.initWithBuffer(slice),
.auto_sizer = null,
@@ -883,7 +955,7 @@ pub const Subprocess = struct {
};
}
pub fn initWithAllocator(allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) BufferedOutput {
pub fn initWithAllocator(allocator: std.mem.Allocator, fd: bun.FileDescriptor, max_size: u32) StreamingOutput {
if (Environment.isWindows) {
@compileError("Cannot use BufferedOutput with fd on Windows please use .initWithPipeAndAllocator");
}
@@ -896,7 +968,7 @@ pub const Subprocess = struct {
return this;
}
pub fn initWithPipeAndAllocator(allocator: std.mem.Allocator, pipe: *uv.Pipe, max_size: u32) BufferedOutput {
pub fn initWithPipeAndAllocator(allocator: std.mem.Allocator, pipe: *uv.Pipe, max_size: u32) StreamingOutput {
if (!Environment.isWindows) {
@compileError("uv.Pipe can only be used on Window");
}
@@ -909,7 +981,7 @@ pub const Subprocess = struct {
return this;
}
pub fn onRead(this: *BufferedOutput, result: JSC.WebCore.StreamResult) void {
pub fn onRead(this: *StreamingOutput, result: JSC.WebCore.StreamResult) void {
if (Environment.isWindows) {
@compileError("uv.Pipe can only be used on Window");
}
@@ -948,7 +1020,7 @@ pub const Subprocess = struct {
}
fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: uv.ReturnCodeI64, _: *const uv.uv_buf_t) callconv(.C) void {
const this: *BufferedOutput = @ptrCast(@alignCast(handle.data));
const this: *StreamingOutput = @ptrCast(@alignCast(handle.data));
if (nread.int() == uv.UV_EOF) {
this.status = .{ .done = {} };
_ = uv.uv_read_stop(@ptrCast(handle));
@@ -968,7 +1040,7 @@ pub const Subprocess = struct {
}
fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void {
const this: *BufferedOutput = @ptrCast(@alignCast(handle.data));
const this: *StreamingOutput = @ptrCast(@alignCast(handle.data));
var size: usize = 0;
var available = this.internal_buffer.available();
if (this.auto_sizer) |auto_sizer| {
@@ -994,11 +1066,11 @@ pub const Subprocess = struct {
}
}
pub fn readAll(this: *BufferedOutput) void {
pub fn readAll(this: *StreamingOutput) void {
if (Environment.isWindows) {
if (this.status == .pending) {
this.stream.data = this;
_ = uv.uv_read_start(@ptrCast(this.stream), BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback);
_ = uv.uv_read_start(@ptrCast(this.stream), StreamingOutput.uvStreamAllocCallback, StreamingOutput.uvStreamReadCallback);
}
return;
}
@@ -1079,25 +1151,25 @@ pub const Subprocess = struct {
}
}
fn watch(this: *BufferedOutput) void {
fn watch(this: *StreamingOutput) void {
if (Environment.isWindows) {
this.readAll();
} else {
std.debug.assert(this.stream.fd != bun.invalid_fd);
this.stream.pending.set(BufferedOutput, this, onRead);
this.stream.pending.set(StreamingOutput, this, onRead);
if (!this.stream.isWatching()) this.stream.watch(this.stream.fd);
}
return;
}
pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob {
pub fn toBlob(this: *StreamingOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob {
const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis);
this.internal_buffer = bun.ByteList.init("");
return blob;
}
pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
const this = bun.cast(*BufferedOutput, ctx);
const this = bun.cast(*StreamingOutput, ctx);
this.readAll();
const internal_buffer = this.internal_buffer;
this.internal_buffer = bun.ByteList.init("");
@@ -1110,7 +1182,7 @@ pub const Subprocess = struct {
};
}
fn signalStreamError(this: *BufferedOutput) void {
fn signalStreamError(this: *StreamingOutput) void {
if (this.status == .err) {
// if we are streaming update with error
if (this.readable_stream_ref.get()) |readable| {
@@ -1127,7 +1199,7 @@ pub const Subprocess = struct {
this.readable_stream_ref.deinit();
}
}
fn flushBufferedDataIntoReadableStream(this: *BufferedOutput) void {
fn flushBufferedDataIntoReadableStream(this: *StreamingOutput) void {
if (this.readable_stream_ref.get()) |readable| {
if (readable.ptr != .Bytes) return;
@@ -1159,13 +1231,13 @@ pub const Subprocess = struct {
}
fn onReadableStreamAvailable(ctx: *anyopaque, readable: JSC.WebCore.ReadableStream) void {
const this = bun.cast(*BufferedOutput, ctx);
const this = bun.cast(*StreamingOutput, ctx);
if (this.globalThis) |globalThis| {
this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis) catch .{};
}
}
fn toReadableStream(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream {
fn toReadableStream(this: *StreamingOutput, globalThis: *JSC.JSGlobalObject, exited: bool) JSC.WebCore.ReadableStream {
if (Environment.isWindows) {
if (this.readable_stream_ref.get()) |readable| {
return readable;
@@ -1208,8 +1280,8 @@ pub const Subprocess = struct {
.size_hint = 0,
.task = this,
.global = globalThis,
.onStartStreaming = BufferedOutput.onStartStreamingRequestBodyCallback,
.onReadableStreamAvailable = BufferedOutput.onReadableStreamAvailable,
.onStartStreaming = StreamingOutput.onStartStreamingRequestBodyCallback,
.onReadableStreamAvailable = StreamingOutput.onReadableStreamAvailable,
},
};
return JSC.WebCore.ReadableStream.fromJS(body.toReadableStream(globalThis), globalThis).?;
@@ -1238,12 +1310,12 @@ pub const Subprocess = struct {
fn uvClosedCallback(handler: *anyopaque) callconv(.C) void {
const event = bun.cast(*uv.Pipe, handler);
var this = bun.cast(*BufferedOutput, event.data);
var this = bun.cast(*StreamingOutput, event.data);
this.readable_stream_ref.deinit();
this.closeCallback.run();
}
pub fn close(this: *BufferedOutput) void {
pub fn close(this: *StreamingOutput) void {
switch (this.status) {
.done => {},
.pending => {
@@ -1341,6 +1413,10 @@ pub const Subprocess = struct {
return Writable{ .pipe = sink };
},
.sync_buffered_output => |buffer| {
_ = buffer; // autofix
@panic("This should never be called");
},
.array_buffer, .blob => {
var buffered_input: BufferedInput = .{ .fd = fd.?, .source = undefined };
switch (stdio) {
@@ -1957,6 +2033,35 @@ pub const Subprocess = struct {
return .zero;
};
if (comptime is_sync) {
if (stdio[1] == .pipe and stdio[1].pipe == null) {
stdio[1] = .{ .sync_buffered_output = BufferedOutput.new(.{}) };
}
if (stdio[2] == .pipe and stdio[2].pipe == null) {
stdio[2] = .{ .sync_buffered_output = BufferedOutput.new(.{}) };
}
} else {
if (stdio[1] == .pipe and stdio[1].pipe == null) {
stdio[1] = .{ .buffer = {} };
}
if (stdio[2] == .pipe and stdio[2].pipe == null) {
stdio[2] = .{ .buffer = {} };
}
}
defer {
if (comptime is_sync) {
if (stdio[1] == .sync_buffered_output) {
stdio[1].sync_buffered_output.deref();
}
if (stdio[2] == .sync_buffered_output) {
stdio[2].sync_buffered_output.deref();
}
}
}
const spawn_options = bun.spawn.SpawnOptions{
.cwd = cwd,
.detached = detached,
@@ -1964,6 +2069,11 @@ pub const Subprocess = struct {
.stdout = stdio[1].asSpawnOption(),
.stderr = stdio[2].asSpawnOption(),
.extra_fds = extra_fds.items,
.windows = if (Environment.isWindows) bun.spawn.WindowsSpawnOptions.WindowsOptions{
.hide_window = windows_hide,
.loop = jsc_vm.eventLoop().uws_loop,
} else {},
};
var spawned = switch (bun.spawn.spawnProcess(
@@ -2014,8 +2124,8 @@ pub const Subprocess = struct {
globalThis.throwOutOfMemory();
return .zero;
},
.stdout = Readable.init(stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size),
.stderr = Readable.init(stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size),
.stdout = Readable.init(stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size, is_sync),
.stderr = Readable.init(stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size, is_sync),
.stdio_pipes = spawned.extra_pipes.moveToUnmanaged(),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.ipc_mode = ipc_mode,
@@ -2149,6 +2259,7 @@ pub const Subprocess = struct {
pipe: ?JSC.WebCore.ReadableStream,
array_buffer: JSC.ArrayBuffer.Strong,
memfd: bun.FileDescriptor,
sync_buffered_output: *BufferedOutput,
const PipeExtra = struct {
fd: i32,
@@ -2261,6 +2372,7 @@ pub const Subprocess = struct {
.path => |pathlike| .{ .path = pathlike.slice() },
.inherit => .{ .inherit = {} },
.ignore => .{ .ignore = {} },
.sync_buffer => .{ .buffer = &stdio.sync_buffer.reader.pipe },
.memfd => @panic("This should never happen"),
};

View File

@@ -409,6 +409,7 @@ pub const ArrayBuffer = extern struct {
return Bun__createUint8ArrayForCopy(globalThis, bytes.ptr, bytes.len, true);
}
extern "C" fn Bun__createUint8ArrayForCopy(*JSC.JSGlobalObject, ptr: ?*const anyopaque, len: usize, buffer: bool) JSValue;
extern "C" fn Bun__createArrayBufferForCopy(*JSC.JSGlobalObject, ptr: ?*const anyopaque, len: usize) JSValue;

View File

@@ -4154,7 +4154,7 @@ pub fn NewFIFO(comptime EventLoop: JSC.EventLoopKind) type {
return struct {
buf: []u8 = &[_]u8{},
view: JSC.Strong = .{},
poll_ref: ?*Async.FilePoll = null,
fd: bun.FileDescriptor = bun.invalid_fd,
to_read: ?u32 = null,
close_on_empty_read: bool = false,
@@ -4165,7 +4165,6 @@ pub fn NewFIFO(comptime EventLoop: JSC.EventLoopKind) type {
.result = .{ .done = {} },
},
signal: JSC.WebCore.Signal = .{},
is_first_read: bool = true,
has_adjusted_pipe_size_on_linux: bool = false,
drained: bool = true,

View File

@@ -35,137 +35,15 @@ pub const LifecycleScriptSubprocess = struct {
const uv = bun.windows.libuv;
const PosixOutputReader = struct {
poll: *Async.FilePoll = undefined,
buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
is_done: bool = false,
pub const OutputReader = bun.io.BufferedOutputReader(LifecycleScriptSubprocess, null);
// This is a workaround for "Dependency loop detected"
parent: *LifecycleScriptSubprocess = undefined,
pub fn loop(this: *const LifecycleScriptSubprocess) *bun.uws.Loop {
return this.manager.event_loop.loop();
}
pub usingnamespace bun.io.PipeReader(
@This(),
getFd,
getBuffer,
null,
registerPoll,
done,
onError,
);
pub fn getFd(this: *PosixOutputReader) bun.FileDescriptor {
return this.poll.fd;
}
pub fn getBuffer(this: *PosixOutputReader) *std.ArrayList(u8) {
return &this.buffer;
}
fn finish(this: *PosixOutputReader) void {
this.poll.flags.insert(.ignore_updates);
this.subprocess().manager.event_loop.putFilePoll(this.poll);
std.debug.assert(!this.is_done);
this.is_done = true;
}
pub fn done(this: *PosixOutputReader) void {
this.finish();
this.subprocess().onOutputDone();
}
pub fn onError(this: *PosixOutputReader, err: bun.sys.Error) void {
this.finish();
this.subprocess().onOutputError(err);
}
pub fn registerPoll(this: *PosixOutputReader) void {
switch (this.poll.register(this.subprocess().manager.event_loop.loop(), .readable, true)) {
.err => |err| {
Output.prettyErrorln("<r><red>error<r>: Failed to register poll for <b>{s}<r> script output from \"<b>{s}<r>\" due to error <b>{d} {s}<r>", .{
this.subprocess().scriptName(),
this.subprocess().package_name,
err.errno,
@tagName(err.getErrno()),
});
},
.result => {},
}
}
pub inline fn subprocess(this: *PosixOutputReader) *LifecycleScriptSubprocess {
return this.parent;
}
pub fn start(this: *PosixOutputReader) JSC.Maybe(void) {
const maybe = this.poll.register(this.subprocess().manager.event_loop.loop(), .readable, true);
if (maybe != .result) {
return maybe;
}
this.read();
return .{
.result = {},
};
}
};
const WindowsOutputReader = struct {
pipe: uv.Pipe = std.mem.zeroes(uv.Pipe),
buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
is_done: bool = false,
// This is a workaround for "Dependency loop detected"
parent: *LifecycleScriptSubprocess = undefined,
pub usingnamespace bun.io.PipeReader(
@This(),
{},
getBuffer,
null,
null,
done,
onError,
);
pub fn getBuffer(this: *WindowsOutputReader) *std.ArrayList(u8) {
return &this.buffer;
}
fn finish(this: *WindowsOutputReader) void {
std.debug.assert(!this.is_done);
this.is_done = true;
}
pub fn done(this: *WindowsOutputReader) void {
std.debug.assert(this.pipe.isClosed());
this.finish();
this.subprocess().onOutputDone();
}
pub fn onError(this: *WindowsOutputReader, err: bun.sys.Error) void {
this.finish();
this.subprocess().onOutputError(err);
}
pub inline fn subprocess(this: *WindowsOutputReader) *LifecycleScriptSubprocess {
return this.parent;
}
pub fn getReadBufferWithStableMemoryAddress(this: *WindowsOutputReader, suggested_size: usize) []u8 {
this.buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory();
return this.buffer.allocatedSlice()[this.buffer.items.len..];
}
pub fn start(this: *WindowsOutputReader) JSC.Maybe(void) {
this.buffer.clearRetainingCapacity();
this.is_done = false;
return this.startReading();
}
};
pub const OutputReader = if (Environment.isPosix) PosixOutputReader else WindowsOutputReader;
pub fn eventLoop(this: *const LifecycleScriptSubprocess) *JSC.AnyEventLoop {
return &this.manager.event_loop;
}
pub fn scriptName(this: *const LifecycleScriptSubprocess) []const u8 {
std.debug.assert(this.current_script_index < Lockfile.Scripts.names.len);

View File

@@ -45,7 +45,7 @@ pub fn PosixPipeReader(
readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint);
}
const stack_buffer_len = 16384;
const stack_buffer_len = 64 * 1024;
fn readFromBlockingPipeWithoutBlocking(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize) void {
if (size_hint > stack_buffer_len) {
@@ -65,7 +65,7 @@ pub fn PosixPipeReader(
switch (bun.sys.read(fd, buffer)) {
.result => |bytes_read| {
if (bytes_read == 0) {
vtable.done(parent);
parent.close();
return;
}
@@ -113,15 +113,19 @@ pub fn PosixPipeReader(
}
}
}
pub fn close(this: *This) void {
_ = bun.sys.close(getFd(this));
this.poll.deinit();
vtable.done(this);
}
};
}
const uv = bun.windows.libuv;
pub fn WindowsPipeReader(
comptime This: type,
// Originally this was the comptime vtable struct like the below
// But that caused a Zig compiler segfault as of 0.12.0-dev.1604+caae40c21
comptime getFd: anytype,
comptime _: anytype,
comptime getBuffer: fn (*This) *std.ArrayList(u8),
comptime onReadChunk: ?fn (*This, chunk: []u8) void,
comptime registerPoll: ?fn (*This) void,
@@ -132,9 +136,7 @@ pub fn WindowsPipeReader(
pub usingnamespace uv.StreamReaderMixin(This, .pipe);
const vtable = .{
.getFd = getFd,
.getBuffer = getBuffer,
.onReadChunk = onReadChunk,
.registerPoll = registerPoll,
.done = done,
.onError = onError,
@@ -173,13 +175,13 @@ pub fn WindowsPipeReader(
return;
}
var buffer = getBuffer(this);
if (amount.result == 0) {
close(this);
return;
}
var buffer = getBuffer(this);
if (comptime bun.Environment.allow_assert) {
if (!bun.isSliceInBuffer(buf.slice()[0..amount.result], buffer.allocatedSlice())) {
@panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it.");
@@ -201,3 +203,176 @@ pub fn WindowsPipeReader(
}
pub const PipeReader = if (bun.Environment.isWindows) WindowsPipeReader else PosixPipeReader;
const Async = bun.Async;
pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void) type {
return struct {
poll: *Async.FilePoll = undefined,
buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
is_done: bool = false,
parent: *Parent = undefined,
const PosixOutputReader = @This();
pub fn setParent(this: *@This(), parent: *Parent) void {
this.parent = parent;
if (!this.is_done) {
this.poll.owner = Async.FilePoll.Owner.init(this);
}
}
pub usingnamespace PosixPipeReader(
@This(),
getFd,
getBuffer,
if (onReadChunk != null) _onReadChunk else null,
registerPoll,
done,
onError,
);
fn _onReadChunk(this: *PosixOutputReader, chunk: []u8) void {
onReadChunk.?(this.parent, chunk);
}
pub fn getFd(this: *PosixOutputReader) bun.FileDescriptor {
return this.poll.fd;
}
pub fn getBuffer(this: *PosixOutputReader) *std.ArrayList(u8) {
return &this.buffer;
}
pub fn ref(this: *@This(), event_loop_ctx: anytype) void {
this.poll.ref(event_loop_ctx);
}
pub fn unref(this: *@This(), event_loop_ctx: anytype) void {
this.poll.unref(event_loop_ctx);
}
fn finish(this: *PosixOutputReader) void {
this.poll.flags.insert(.ignore_updates);
this.parent.eventLoop().putFilePoll(this.poll);
std.debug.assert(!this.is_done);
this.is_done = true;
}
pub fn done(this: *PosixOutputReader) void {
this.finish();
this.parent.onOutputDone();
}
pub fn deinit(this: *PosixOutputReader) void {
this.buffer.deinit();
this.poll.deinit();
}
pub fn onError(this: *PosixOutputReader, err: bun.sys.Error) void {
this.finish();
this.parent.onOutputError(err);
}
pub fn registerPoll(this: *PosixOutputReader) void {
switch (this.poll.register(this.parent.loop(), .readable, true)) {
.err => |err| {
this.onError(err);
},
.result => {},
}
}
pub fn start(this: *PosixOutputReader) bun.JSC.Maybe(void) {
const maybe = this.poll.register(this.parent.loop(), .readable, true);
if (maybe != .result) {
return maybe;
}
this.read();
return .{
.result = {},
};
}
};
}
const JSC = bun.JSC;
fn WindowsBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, buf: []u8) void) type {
return struct {
pipe: uv.Pipe = std.mem.zeroes(uv.Pipe),
buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
is_done: bool = false,
parent: *Parent = undefined,
const WindowsOutputReader = @This();
pub fn setParent(this: *@This(), parent: *Parent) void {
this.parent = parent;
if (!this.is_done) {
this.pipe.data = this;
}
}
pub fn ref(this: *@This()) void {
this.pipe.ref();
}
pub fn unref(this: *@This()) void {
this.pipe.unref();
}
pub usingnamespace WindowsPipeReader(
@This(),
{},
getBuffer,
if (onReadChunk != null) _onReadChunk else null,
null,
done,
onError,
);
pub fn getBuffer(this: *WindowsOutputReader) *std.ArrayList(u8) {
return &this.buffer;
}
fn _onReadChunk(this: *WindowsOutputReader, buf: []u8) void {
onReadChunk.?(this.parent, buf);
}
fn finish(this: *WindowsOutputReader) void {
std.debug.assert(!this.is_done);
this.is_done = true;
}
pub fn done(this: *WindowsOutputReader) void {
std.debug.assert(this.pipe.isClosed());
this.finish();
this.parent.onOutputDone();
}
pub fn onError(this: *WindowsOutputReader, err: bun.sys.Error) void {
this.finish();
this.parent.onOutputError(err);
}
pub fn getReadBufferWithStableMemoryAddress(this: *WindowsOutputReader, suggested_size: usize) []u8 {
this.buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory();
return this.buffer.allocatedSlice()[this.buffer.items.len..];
}
pub fn start(this: *WindowsOutputReader) JSC.Maybe(void) {
this.buffer.clearRetainingCapacity();
this.is_done = false;
return this.startReading();
}
pub fn deinit(this: *WindowsOutputReader) void {
this.buffer.deinit();
std.debug.assert(this.pipe.isClosed());
}
};
}
pub const BufferedOutputReader = if (bun.Environment.isPosix) PosixBufferedOutputReader else WindowsBufferedOutputReader;

View File

@@ -927,3 +927,4 @@ pub const Poll = struct {
pub const retry = bun.C.E.AGAIN;
pub const PipeReader = @import("./PipeReader.zig").PipeReader;
pub const BufferedOutputReader = @import("./PipeReader.zig").BufferedOutputReader;

View File

@@ -1279,7 +1279,7 @@ pub fn NewShellSubprocess(comptime EventLoopKind: JSC.EventLoopKind, comptime Sh
}
pub fn wait(this: *@This(), sync: bool) void {
return this.process.wait(sync);
return this.process.waitPosix(sync);
}
pub fn onProcessExit(this: *@This(), _: *Process, status: bun.spawn.Status, _: *const bun.spawn.Rusage) void {