diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index e1fc271a79..9034d64489 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -822,7 +822,7 @@ pub const Subprocess = struct { .stdio_result = result, }); if (Environment.isWindows) { - this.reader.pipe = this.stdio_result.buffer; + this.reader.source = .{ .pipe = this.stdio_result.buffer }; } this.reader.setParent(this); return this; @@ -956,7 +956,7 @@ pub const Subprocess = struct { } if (comptime Environment.isWindows) { - std.debug.assert(this.reader.pipe == null or this.reader.pipe.?.isClosed()); + std.debug.assert(this.reader.source == null or this.reader.source.?.isClosed()); } if (this.state == .done) { diff --git a/src/install/lifecycle_script_runner.zig b/src/install/lifecycle_script_runner.zig index 1e93321ee3..9726763298 100644 --- a/src/install/lifecycle_script_runner.zig +++ b/src/install/lifecycle_script_runner.zig @@ -131,8 +131,8 @@ pub const LifecycleScriptSubprocess = struct { null, }; if (Environment.isWindows) { - this.stdout.pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory(); - this.stderr.pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory(); + this.stdout.source = .{ .pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() }; + this.stderr.source = .{ .pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() }; } const spawn_options = bun.spawn.SpawnOptions{ .stdin = .ignore, @@ -142,7 +142,7 @@ pub const LifecycleScriptSubprocess = struct { .buffer else .{ - .buffer = this.stdout.pipe.?, + .buffer = this.stdout.source.?.pipe, }, .stderr = if (this.manager.options.log_level.isVerbose()) .inherit @@ -150,7 +150,7 @@ pub const LifecycleScriptSubprocess = struct { .buffer else .{ - .buffer = this.stderr.pipe.?, + .buffer = this.stderr.source.?.pipe, }, .cwd = cwd, diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 6cbb9b9979..e3177faa42 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -318,7 +318,91 @@ pub fn WindowsPipeReader( comptime onError: fn (*This, bun.sys.Error) void, ) type { return struct { - pub usingnamespace uv.StreamReaderMixin(This, .pipe); + // pub usingnamespace uv.StreamReaderMixin(This, .pipe); + + fn uv_alloc_cb(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { + var this = bun.cast(*This, handle.data); + const result = this.getReadBufferWithStableMemoryAddress(suggested_size); + buf.* = uv.uv_buf_t.init(result); + } + + fn uv_stream_read_cb(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { + var this = bun.cast(*This, stream.data); + + const nread_int = nread.int(); + + switch (nread_int) { + 0 => { + // EAGAIN or EWOULDBLOCK or canceled + return this.onRead(.{ .result = 0 }, buf, .drained); + }, + uv.UV_EOF => { + // EOF + return this.onRead(.{ .result = 0 }, buf, .eof); + }, + else => { + this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress); + }, + } + } + + fn uv_file_read_cb(fs: *uv.fs_t) callconv(.C) void { + var this: *This = bun.cast(*This, fs.data); + + const nread_int = fs.result.int(); + const buf = &this.*.source.?.file.iov; + + switch (nread_int) { + 0, uv.UV_ECANCELED => + // EAGAIN or EWOULDBLOCK or canceled + this.onRead(.{ .result = 0 }, buf, .drained), + uv.UV_EOF => + // EOF + this.onRead(.{ .result = 0 }, buf, .eof), + else => this.onRead(if (fs.result.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress), + } + uv.uv_fs_req_cleanup(fs); + } + + pub fn startReading(this: *This) bun.JSC.Maybe(void) { + const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) }; + + switch (source) { + .file => |file| { + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, uv_file_read_cb).toError(.write)) |err| { + return .{ .err = err }; + } + }, + else => { + if (uv.uv_read_start(source.toStream(), &uv_alloc_cb, @ptrCast(&uv_stream_read_cb)).toError(.open)) |err| { + return .{ .err = err }; + } + }, + } + + return .{ .result = {} }; + } + + pub fn stopReading(this: *This) bun.JSC.Maybe(void) { + const source = this.source orelse return .{ .result = {} }; + switch (source) { + .file => |file| { + _ = uv.uv_cancel(@ptrCast(&file.fs)); + }, + else => { + // can be safely ignored as per libuv documentation + _ = uv.uv_read_stop(source.toStream()); + }, + } + return .{ .result = {} }; + } + + pub fn close(this: *This) void { + _ = this.stopReading(); + if (this.source) |source| { + source.getHandle().close(onCloseSource); + } + } const vtable = .{ .getBuffer = getBuffer, @@ -327,37 +411,12 @@ pub fn WindowsPipeReader( .onError = onError, }; - fn _pipe(this: *This) ?*uv.Pipe { - switch (@TypeOf(this.pipe)) { - ?*uv.Pipe, *uv.Pipe => return this.pipe, - uv.Pipe => return &this.pipe, - else => @compileError("StreamReaderMixin only works with Pipe, *Pipe or ?*Pipe"), - } - } - - pub fn open(this: *This, loop: *uv.Loop, fd: bun.FileDescriptor, ipc: bool) bun.JSC.Maybe(void) { - const pipe = _pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) }; - switch (pipe.init(loop, ipc)) { - .err => |err| { - return .{ .err = err }; - }, + fn onCloseSource(handle: *uv.Handle) callconv(.C) void { + const this = bun.cast(*This, handle.data); + switch (this.source.?) { + .file => |file| uv.uv_fs_req_cleanup(&file.fs), else => {}, } - - pipe.data = this; - - switch (pipe.open(bun.uvfdcast(fd))) { - .err => |err| { - return .{ .err = err }; - }, - else => {}, - } - - return .{ .result = {} }; - } - - fn onClosePipe(pipe: *uv.Pipe) callconv(.C) void { - const this = bun.cast(*This, pipe.data); done(this); } @@ -397,22 +456,13 @@ pub fn WindowsPipeReader( } pub fn unpause(this: *This) void { - const pipe = this._pipe() orelse return; - if (!pipe.isActive()) { - this.startReading().unwrap() catch {}; - } + _ = this.startReading(); } pub fn read(this: *This) void { // we cannot sync read pipes on Windows so we just check if we are paused to resume the reading this.unpause(); } - - pub fn close(this: *This) void { - const pipe = this._pipe() orelse return; - this.stopReading().unwrap() catch unreachable; - pipe.close(&onClosePipe); - } }; } @@ -733,7 +783,7 @@ const WindowsOutputReaderVTable = struct { pub const WindowsBufferedReader = struct { /// The pointer to this pipe must be stable. /// It cannot change because we don't know what libuv will do with it. - pipe: ?*uv.Pipe = null, + source: ?Source = null, _buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), // for compatibility with Linux flags: Flags = .{}, @@ -769,22 +819,22 @@ pub const WindowsBufferedReader = struct { } pub fn from(to: *WindowsOutputReader, other: anytype, parent: anytype) void { - std.debug.assert(other.pipe != null and to.pipe == null); + std.debug.assert(other.source != null and to.source == null); to.* = .{ .vtable = to.vtable, .flags = other.flags, ._buffer = other.buffer().*, .has_inflight_read = other.has_inflight_read, - .pipe = other.pipe, + .source = other.source, }; other.flags.is_done = true; - other.pipe = null; + other.source = null; to.setParent(parent); } pub fn getFd(this: *const WindowsOutputReader) bun.FileDescriptor { - const pipe = this.pipe orelse return bun.invalid_fd; - return pipe.fd(); + const source = this.source orelse return bun.invalid_fd; + return source.getFd(); } pub fn watch(_: *WindowsOutputReader) void { @@ -794,18 +844,18 @@ pub const WindowsBufferedReader = struct { pub fn setParent(this: *WindowsOutputReader, parent: anytype) void { this.parent = parent; if (!this.flags.is_done) { - if (this.pipe) |pipe| { - pipe.data = this; + if (this.source) |source| { + source.setData(this); } } } pub fn updateRef(this: *WindowsOutputReader, value: bool) void { - if (this.pipe) |pipe| { + if (this.source) |source| { if (value) { - pipe.ref(); + source.ref(); } else { - pipe.unref(); + source.unref(); } } } @@ -833,8 +883,8 @@ pub const WindowsBufferedReader = struct { } pub fn hasPendingActivity(this: *const WindowsOutputReader) bool { - const pipe = this.pipe orelse return false; - return pipe.isClosed(); + const source = this.source orelse return false; + return source.isClosed(); } pub fn hasPendingRead(this: *const WindowsOutputReader) bool { @@ -858,7 +908,7 @@ pub const WindowsBufferedReader = struct { } pub fn done(this: *WindowsOutputReader) void { - std.debug.assert(this.pipe == null or this.pipe.?.isClosed()); + std.debug.assert(if (this.source) |source| source.isClosed() else true); this.finish(); @@ -877,7 +927,7 @@ pub const WindowsBufferedReader = struct { } pub fn startWithCurrentPipe(this: *WindowsOutputReader) bun.JSC.Maybe(void) { - std.debug.assert(this.pipe != null); + std.debug.assert(this.source != null); this.buffer().clearRetainingCapacity(); this.flags.is_done = false; @@ -886,25 +936,30 @@ pub const WindowsBufferedReader = struct { } pub fn startWithPipe(this: *WindowsOutputReader, pipe: *uv.Pipe) bun.JSC.Maybe(void) { - std.debug.assert(this.pipe == null); - this.pipe = pipe; + std.debug.assert(this.source == null); + this.source = .{ .pipe = pipe }; return this.startWithCurrentPipe(); } pub fn start(this: *WindowsOutputReader, fd: bun.FileDescriptor, _: bool) bun.JSC.Maybe(void) { - //TODO: check detect if its a tty here and use uv_tty_t instead of pipe - std.debug.assert(this.pipe == null); - this.pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory(); - if (this.open(uv.Loop.get(), fd, false).asErr()) |err| return .{ .err = err }; + std.debug.assert(this.source == null); + const source = switch (Source.open(uv.Loop.get(), fd)) { + .err => |err| return .{ .err = err }, + .result => |source| source, + }; + source.setData(this); + this.source = source; return this.startWithCurrentPipe(); } pub fn deinit(this: *WindowsOutputReader) void { this.buffer().deinit(); - var pipe = this.pipe orelse return; - std.debug.assert(pipe.isClosed()); - this.pipe = null; - bun.default_allocator.destroy(pipe); + const source = this.source orelse return; + std.debug.assert(source.isClosed()); + this.source = null; + switch (source) { + inline else => |ptr| bun.default_allocator.destroy(ptr), + } } }; diff --git a/src/io/source.zig b/src/io/source.zig index 9ea0e70df5..c5d36058a3 100644 --- a/src/io/source.zig +++ b/src/io/source.zig @@ -80,6 +80,22 @@ pub const Source = union(enum) { } } + pub fn isClosed(this: Source) bool { + switch (this) { + .pipe => |pipe| return pipe.isClosed(), + .tty => |tty| return tty.isClosed(), + .file => |file| return file.file == -1, + } + } + + pub fn isActive(this: Source) bool { + switch (this) { + .pipe => |pipe| return pipe.isActive(), + .tty => |tty| return tty.isActive(), + .file => return false, + } + } + pub fn openPipe(loop: *uv.Loop, fd: bun.FileDescriptor, ipc: bool) bun.JSC.Maybe(*Source.Pipe) { log("openPipe (fd = {})", .{fd}); const pipe = bun.default_allocator.create(Source.Pipe) catch bun.outOfMemory();