From 0845231a1e4afe0b2f528bb3c0ea767a4f2e4d3e Mon Sep 17 00:00:00 2001 From: Zack Radisic <56137411+zackradisic@users.noreply.github.com> Date: Thu, 14 Aug 2025 18:03:26 -0700 Subject: [PATCH] Fix pipeline stack errors on Windows (#21800) ### What does this PR do? ### How did you verify your code works? --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- src/bun.js/api/bun/spawn/stdio.zig | 2 +- src/bun.zig | 1 + src/fd.zig | 78 +++++++++++++++++++++++- src/io/PipeWriter.zig | 25 +++++++- src/shell/IO.zig | 16 ++++- src/shell/IOReader.zig | 1 + src/shell/IOWriter.zig | 30 ++++++--- src/shell/Yield.zig | 3 +- src/shell/states/Pipeline.zig | 14 ++--- src/sys.zig | 14 ++++- test/js/bun/shell/pipeline_stack.test.ts | 2 +- 11 files changed, 161 insertions(+), 25 deletions(-) diff --git a/src/bun.js/api/bun/spawn/stdio.zig b/src/bun.js/api/bun/spawn/stdio.zig index 291c800f44..e11696cd2c 100644 --- a/src/bun.js/api/bun/spawn/stdio.zig +++ b/src/bun.js/api/bun/spawn/stdio.zig @@ -1,6 +1,6 @@ pub const Stdio = union(enum) { inherit, - capture: struct { fd: bun.FileDescriptor, buf: *bun.ByteList }, + capture: struct { buf: *bun.ByteList }, ignore, fd: bun.FileDescriptor, dup2: struct { diff --git a/src/bun.zig b/src/bun.zig index fa2654c779..2f063d380c 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -190,6 +190,7 @@ pub const Output = @import("./output.zig"); pub const Global = @import("./Global.zig"); pub const FD = @import("./fd.zig").FD; +pub const MovableIfWindowsFd = @import("./fd.zig").MovableIfWindowsFd; /// Deprecated: Use `FD` instead. pub const FileDescriptor = FD; diff --git a/src/fd.zig b/src/fd.zig index f32750007a..f92459eba9 100644 --- a/src/fd.zig +++ b/src/fd.zig @@ -225,7 +225,8 @@ pub const FD = packed struct(backing_int) { /// In debug, fd assertion failure can print where the FD was actually /// closed. pub fn close(fd: FD) void { - bun.debugAssert(fd.closeAllowingBadFileDescriptor(@returnAddress()) == null); // use after close! + const err = fd.closeAllowingBadFileDescriptor(@returnAddress()); + bun.debugAssert(err == null); // use after close! } /// fd function will NOT CLOSE stdin/stdout/stderr. @@ -652,6 +653,81 @@ pub fn uv_open_osfhandle(in: libuv.uv_os_fd_t) error{SystemFdQuotaExceeded}!c_in return out; } +/// On Windows we use libuv and often pass file descriptors to functions +/// like `uv_pipe_open`, `uv_tty_init`. +/// +/// But `uv_pipe` and `uv_tty` **take ownership of the file descriptor**. +/// +/// This can easily cause use-after-frees, double closing the FD, etc. +/// +/// So this type represents an FD that could possibly be moved to libuv. +/// +/// Note that on Posix, this is just a wrapper over FD and does nothing. +pub const MovableIfWindowsFd = union(enum) { + const Self = @This(); + + _inner: if (bun.Environment.isWindows) ?FD else FD, + + pub fn init(fd: FD) Self { + return .{ ._inner = fd }; + } + + pub fn get(self: *const Self) ?FD { + return self._inner; + } + + pub fn getPosix(self: *const Self) FD { + if (comptime bun.Environment.isWindows) + @compileError("MovableIfWindowsFd.getPosix is not available on Windows"); + + return self._inner; + } + + pub fn close(self: *Self) void { + if (comptime bun.Environment.isPosix) { + self._inner.close(); + self._inner = FD.invalid; + return; + } + if (self._inner) |fd| { + fd.close(); + self._inner = null; + } + } + + pub fn isValid(self: *const Self) bool { + if (comptime bun.Environment.isPosix) return self._inner.isValid(); + return self._inner != null and self._inner.?.isValid(); + } + + pub fn isOwned(self: *const Self) bool { + if (comptime bun.Environment.isPosix) return true; + return self._inner != null; + } + + /// Takes the FD, leaving `self` in a "moved-from" state. Only available on Windows. + pub fn take(self: *Self) ?FD { + if (comptime bun.Environment.isPosix) { + @compileError("MovableIfWindowsFd.take is not available on Posix"); + } + const result = self._inner; + self._inner = null; + return result; + } + + pub fn format(self: *const Self, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { + if (comptime bun.Environment.isPosix) { + try writer.print("{}", .{self.get().?}); + return; + } + if (self._inner) |fd| { + try writer.print("{}", .{fd}); + return; + } + try writer.print("[moved]", .{}); + } +}; + pub var windows_cached_fd_set: if (Environment.isDebug) bool else void = if (Environment.isDebug) false; pub var windows_cached_stdin: FD = undefined; pub var windows_cached_stdout: FD = undefined; diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index 7511b5bb85..b172cf133d 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -339,7 +339,13 @@ pub fn PosixBufferedWriter(Parent: type, function_table: anytype) type { } } - pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, pollable: bool) bun.sys.Maybe(void) { + pub fn start(this: *PosixWriter, rawfd: anytype, pollable: bool) bun.sys.Maybe(void) { + const FDType = @TypeOf(rawfd); + const fd = switch (FDType) { + bun.FileDescriptor => rawfd, + *bun.MovableIfWindowsFd, bun.MovableIfWindowsFd => rawfd.getPosix(), + else => @compileError("Expected `bun.FileDescriptor`, `*bun.MovableIfWindowsFd` or `bun.MovableIfWindowsFd` but got: " ++ @typeName(rawfd)), + }; this.pollable = pollable; if (!pollable) { bun.assert(this.handle != .poll); @@ -888,12 +894,27 @@ fn BaseWindowsPipeWriter( return this.startWithCurrentPipe(); } - pub fn start(this: *WindowsPipeWriter, fd: bun.FileDescriptor, _: bool) bun.sys.Maybe(void) { + pub fn start(this: *WindowsPipeWriter, rawfd: anytype, _: bool) bun.sys.Maybe(void) { + const FDType = @TypeOf(rawfd); + const fd = switch (FDType) { + bun.FileDescriptor => rawfd, + *bun.MovableIfWindowsFd => rawfd.get().?, + else => @compileError("Expected `bun.FileDescriptor` or `*bun.MovableIfWindowsFd` but got: " ++ @typeName(rawfd)), + }; bun.assert(this.source == null); const source = switch (Source.open(uv.Loop.get(), fd)) { .result => |source| source, .err => |err| return .{ .err = err }, }; + // Creating a uv_pipe/uv_tty takes ownership of the file descriptor + // TODO: Change the type of the parameter and update all places to + // use MovableFD + if (switch (source) { + .pipe, .tty => true, + else => false, + } and FDType == *bun.MovableIfWindowsFd) { + _ = rawfd.take(); + } source.setData(this); this.source = source; this.setParent(this.parent); diff --git a/src/shell/IO.zig b/src/shell/IO.zig index c0d141b23f..556ed894ad 100644 --- a/src/shell/IO.zig +++ b/src/shell/IO.zig @@ -138,7 +138,21 @@ pub const OutKind = union(enum) { return switch (this) { .fd => |val| brk: { shellio.* = val.writer.refSelf(); - break :brk if (val.captured) |cap| .{ .capture = .{ .buf = cap, .fd = val.writer.fd } } else .{ .fd = val.writer.fd }; + break :brk if (val.captured) |cap| .{ + .capture = .{ + .buf = cap, + }, + } else .{ + // Windows notes: + // Since `val.writer.fd` is `MovableFD`, it could + // technically be moved to libuv for ownership. + // + // But since this file descriptor never going to be touched by this + // process, except to hand off to the subprocess when we + // spawn it, we don't really care if the file descriptor + // ends up being invalid. + .fd = val.writer.fd.get().?, + }; }, .pipe => .pipe, .ignore => .ignore, diff --git a/src/shell/IOReader.zig b/src/shell/IOReader.zig index 95050534d3..4049e49075 100644 --- a/src/shell/IOReader.zig +++ b/src/shell/IOReader.zig @@ -159,6 +159,7 @@ pub fn onReadChunk(ptr: *anyopaque, chunk: []const u8, has_more: bun.io.ReadStat } pub fn onReaderError(this: *IOReader, err: bun.sys.Error) void { + log("IOReader(0x{x}.onReaderError({err}) ", .{ @intFromPtr(this), err }); this.setReading(false); this.err = err.toShellSystemError(); for (this.readers.slice()) |r| { diff --git a/src/shell/IOWriter.zig b/src/shell/IOWriter.zig index 43f14e8843..dda47780a3 100644 --- a/src/shell/IOWriter.zig +++ b/src/shell/IOWriter.zig @@ -19,8 +19,13 @@ pub const ref = RefCount.ref; pub const deref = RefCount.deref; ref_count: RefCount, -writer: WriterImpl = if (bun.Environment.isWindows) .{} else .{ .close_fd = false }, -fd: bun.FileDescriptor, +writer: WriterImpl = if (bun.Environment.isWindows) .{ + // Tell the Windows PipeWriter impl to *not* close the file descriptor, + // unfortunately this won't work if it creates a uv_pipe or uv_tty as those + // types own their file descriptor + .owns_fd = false, +} else .{ .close_fd = false }, +fd: MovableIfWindowsFd, writers: Writers = .{ .inlined = .{} }, buf: std.ArrayListUnmanaged(u8) = .{}, /// quick hack to get windows working @@ -81,7 +86,7 @@ pub const Flags = packed struct(u8) { pub fn init(fd: bun.FileDescriptor, flags: Flags, evtloop: jsc.EventLoopHandle) *IOWriter { const this = bun.new(IOWriter, .{ .ref_count = .init(), - .fd = fd, + .fd = MovableIfWindowsFd.init(fd), .evtloop = evtloop, .concurrent_task = jsc.EventLoopTask.fromEventLoop(evtloop), .concurrent_task2 = jsc.EventLoopTask.fromEventLoop(evtloop), @@ -96,8 +101,9 @@ pub fn init(fd: bun.FileDescriptor, flags: Flags, evtloop: jsc.EventLoopHandle) } pub fn __start(this: *IOWriter) Maybe(void) { + bun.assert(this.fd.isOwned()); debug("IOWriter(0x{x}, fd={}) __start()", .{ @intFromPtr(this), this.fd }); - if (this.writer.start(this.fd, this.flags.pollable).asErr()) |e_| { + if (this.writer.start(&this.fd, this.flags.pollable).asErr()) |e_| { const e: bun.sys.Error = e_; if (bun.Environment.isPosix) { // We get this if we pass in a file descriptor that is not @@ -140,7 +146,7 @@ pub fn __start(this: *IOWriter) Maybe(void) { this.flags.pollable = false; this.flags.nonblocking = false; this.flags.is_socket = false; - return this.writer.startWithFile(this.fd); + return this.writer.startWithFile(this.fd.get().?); } } return .{ .err = e }; @@ -157,6 +163,10 @@ pub fn __start(this: *IOWriter) Maybe(void) { } } + if (comptime bun.Environment.isWindows) { + log("IOWriter(0x{x}, {}) starting with source={s}", .{ @intFromPtr(this), this.fd, if (this.writer.source) |src| @tagName(src) else "no source lol" }); + } + return .success; } @@ -637,6 +647,7 @@ pub fn enqueueFmt( fn asyncDeinit(this: *@This()) void { debug("IOWriter(0x{x}, fd={}) asyncDeinit", .{ @intFromPtr(this), this.fd }); + bun.assert(!this.is_writing); this.async_deinit.enqueue(); } @@ -648,7 +659,10 @@ pub fn deinitOnMainThread(this: *IOWriter) void { if (this.writer.handle == .poll and this.writer.handle.poll.isRegistered()) { this.writer.handle.closeImpl(null, {}, false); } - } else this.winbuf.deinit(bun.default_allocator); + } else { + this.writer.close(); + this.winbuf.deinit(bun.default_allocator); + } if (this.fd.isValid()) this.fd.close(); this.writer.disableKeepingProcessAlive(this.evtloop); bun.destroy(this); @@ -760,6 +774,7 @@ fn tryWriteWithWriteFn(fd: bun.FileDescriptor, buf: []const u8, comptime write_f } pub fn drainBufferedData(parent: *IOWriter, buf: []const u8, max_write_size: usize, received_hup: bool) bun.io.WriteResult { + bun.assert(bun.Environment.isPosix); _ = received_hup; const trimmed = if (max_write_size < buf.len and max_write_size > 0) buf[0..max_write_size] else buf; @@ -767,7 +782,7 @@ pub fn drainBufferedData(parent: *IOWriter, buf: []const u8, max_write_size: usi var drained: usize = 0; while (drained < trimmed.len) { - const attempt = tryWriteWithWriteFn(parent.fd, buf, bun.sys.write); + const attempt = tryWriteWithWriteFn(parent.fd.get().?, buf, bun.sys.write); switch (attempt) { .pending => |pending| { drained += pending; @@ -840,6 +855,7 @@ const log = bun.Output.scoped(.IOWriter, .hidden); const std = @import("std"); const bun = @import("bun"); +const MovableIfWindowsFd = bun.MovableIfWindowsFd; const assert = bun.assert; const jsc = bun.jsc; const Maybe = bun.sys.Maybe; diff --git a/src/shell/Yield.zig b/src/shell/Yield.zig index e678947ea0..967522a823 100644 --- a/src/shell/Yield.zig +++ b/src/shell/Yield.zig @@ -76,7 +76,7 @@ pub const Yield = union(enum) { bun.debugAssert(_dbg_catch_exec_within_exec <= MAX_DEPTH); if (comptime Environment.isDebug) _dbg_catch_exec_within_exec += 1; defer { - if (comptime Environment.isDebug) log("Yield({s}) _dbg_catch_exec_within_exec = {d} - 1 = {d}", .{ @tagName(this), _dbg_catch_exec_within_exec, _dbg_catch_exec_within_exec + 1 }); + if (comptime Environment.isDebug) log("Yield({s}) _dbg_catch_exec_within_exec = {d} - 1 = {d}", .{ @tagName(this), _dbg_catch_exec_within_exec, _dbg_catch_exec_within_exec - 1 }); if (comptime Environment.isDebug) _dbg_catch_exec_within_exec -= 1; } @@ -108,6 +108,7 @@ pub const Yield = union(enum) { } continue :state x.next(); } + bun.assert_eql(std.mem.indexOfScalar(*Pipeline, pipeline_stack.items, x), null); pipeline_stack.append(x) catch bun.outOfMemory(); continue :state x.next(); }, diff --git a/src/shell/states/Pipeline.zig b/src/shell/states/Pipeline.zig index 88828c6639..a466d892e1 100644 --- a/src/shell/states/Pipeline.zig +++ b/src/shell/states/Pipeline.zig @@ -282,12 +282,10 @@ pub fn deinit(this: *Pipeline) void { fn initializePipes(pipes: []Pipe, set_count: *u32) Maybe(void) { for (pipes) |*pipe| { if (bun.Environment.isWindows) { - var fds: [2]uv.uv_file = undefined; - if (uv.uv_pipe(&fds, 0, 0).errEnum()) |e| { - return .{ .err = Syscall.Error.fromCode(e, .pipe) }; - } - pipe[0] = .fromUV(fds[0]); - pipe[1] = .fromUV(fds[1]); + pipe.* = switch (bun.sys.pipe()) { + .result => |p| p, + .err => |e| return .{ .err = e }, + }; } else { switch (bun.sys.socketpairForShell( // switch (bun.sys.socketpair( @@ -353,9 +351,5 @@ const Subshell = bun.shell.Interpreter.Subshell; const Pipe = bun.shell.interpret.Pipe; const StatePtrUnion = bun.shell.interpret.StatePtrUnion; -const Syscall = bun.shell.interpret.Syscall; const closefd = bun.shell.interpret.closefd; const log = bun.shell.interpret.log; - -const windows = bun.windows; -const uv = windows.libuv; diff --git a/src/sys.zig b/src/sys.zig index 400588d39b..d7908f4f7c 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -3371,12 +3371,24 @@ pub fn disableLinger(fd: bun.FileDescriptor) void { pub fn pipe() Maybe([2]bun.FileDescriptor) { if (comptime Environment.isWindows) { - @panic("TODO: Implement `pipe()` for Windows"); + const uv = bun.windows.libuv; + var out: [2]bun.FileDescriptor = undefined; + var fds: [2]uv.uv_file = undefined; + if (uv.uv_pipe(&fds, 0, 0).errEnum()) |e| { + const err = Error.fromCode(e, .pipe); + log("pipe() = {}", .{err}); + return .{ .err = err }; + } + out[0] = .fromUV(fds[0]); + out[1] = .fromUV(fds[1]); + log("pipe() = [{}, {}]", .{ out[0], out[1] }); + return .{ .result = out }; } var fds: [2]i32 = undefined; const rc = syscall.pipe(&fds); if (Maybe([2]bun.FileDescriptor).errnoSys(rc, .pipe)) |err| { + log("pipe() = {}", .{err}); return err; } log("pipe() = [{d}, {d}]", .{ fds[0], fds[1] }); diff --git a/test/js/bun/shell/pipeline_stack.test.ts b/test/js/bun/shell/pipeline_stack.test.ts index f0edbf3935..8e04ea6132 100644 --- a/test/js/bun/shell/pipeline_stack.test.ts +++ b/test/js/bun/shell/pipeline_stack.test.ts @@ -79,7 +79,7 @@ describe("pipeline stack edge cases", () => { .ensureTempDir() .runAsTest("cd | pwd - cd doesn't affect next command in pipeline"); - TestBuilder.command`cd / | cd /tmp | pwd` + TestBuilder.command`mkdir foo; mkdir foo/bar; cd foo | cd foo/bar | pwd` .stdout(s => s.includes("$TEMP_DIR")) .ensureTempDir() .runAsTest("cd | cd | pwd - multiple cd's don't affect");