Fix pipeline stack errors on Windows (#21800)

### What does this PR do?

### How did you verify your code works?

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Zack Radisic
2025-08-14 18:03:26 -07:00
committed by GitHub
parent 7dd85f9dd4
commit 0845231a1e
11 changed files with 161 additions and 25 deletions

View File

@@ -138,7 +138,21 @@ pub const OutKind = union(enum) {
return switch (this) {
.fd => |val| brk: {
shellio.* = val.writer.refSelf();
break :brk if (val.captured) |cap| .{ .capture = .{ .buf = cap, .fd = val.writer.fd } } else .{ .fd = val.writer.fd };
break :brk if (val.captured) |cap| .{
.capture = .{
.buf = cap,
},
} else .{
// Windows notes:
// Since `val.writer.fd` is `MovableFD`, it could
// technically be moved to libuv for ownership.
//
// But since this file descriptor never going to be touched by this
// process, except to hand off to the subprocess when we
// spawn it, we don't really care if the file descriptor
// ends up being invalid.
.fd = val.writer.fd.get().?,
};
},
.pipe => .pipe,
.ignore => .ignore,

View File

@@ -159,6 +159,7 @@ pub fn onReadChunk(ptr: *anyopaque, chunk: []const u8, has_more: bun.io.ReadStat
}
pub fn onReaderError(this: *IOReader, err: bun.sys.Error) void {
log("IOReader(0x{x}.onReaderError({err}) ", .{ @intFromPtr(this), err });
this.setReading(false);
this.err = err.toShellSystemError();
for (this.readers.slice()) |r| {

View File

@@ -19,8 +19,13 @@ pub const ref = RefCount.ref;
pub const deref = RefCount.deref;
ref_count: RefCount,
writer: WriterImpl = if (bun.Environment.isWindows) .{} else .{ .close_fd = false },
fd: bun.FileDescriptor,
writer: WriterImpl = if (bun.Environment.isWindows) .{
// Tell the Windows PipeWriter impl to *not* close the file descriptor,
// unfortunately this won't work if it creates a uv_pipe or uv_tty as those
// types own their file descriptor
.owns_fd = false,
} else .{ .close_fd = false },
fd: MovableIfWindowsFd,
writers: Writers = .{ .inlined = .{} },
buf: std.ArrayListUnmanaged(u8) = .{},
/// quick hack to get windows working
@@ -81,7 +86,7 @@ pub const Flags = packed struct(u8) {
pub fn init(fd: bun.FileDescriptor, flags: Flags, evtloop: jsc.EventLoopHandle) *IOWriter {
const this = bun.new(IOWriter, .{
.ref_count = .init(),
.fd = fd,
.fd = MovableIfWindowsFd.init(fd),
.evtloop = evtloop,
.concurrent_task = jsc.EventLoopTask.fromEventLoop(evtloop),
.concurrent_task2 = jsc.EventLoopTask.fromEventLoop(evtloop),
@@ -96,8 +101,9 @@ pub fn init(fd: bun.FileDescriptor, flags: Flags, evtloop: jsc.EventLoopHandle)
}
pub fn __start(this: *IOWriter) Maybe(void) {
bun.assert(this.fd.isOwned());
debug("IOWriter(0x{x}, fd={}) __start()", .{ @intFromPtr(this), this.fd });
if (this.writer.start(this.fd, this.flags.pollable).asErr()) |e_| {
if (this.writer.start(&this.fd, this.flags.pollable).asErr()) |e_| {
const e: bun.sys.Error = e_;
if (bun.Environment.isPosix) {
// We get this if we pass in a file descriptor that is not
@@ -140,7 +146,7 @@ pub fn __start(this: *IOWriter) Maybe(void) {
this.flags.pollable = false;
this.flags.nonblocking = false;
this.flags.is_socket = false;
return this.writer.startWithFile(this.fd);
return this.writer.startWithFile(this.fd.get().?);
}
}
return .{ .err = e };
@@ -157,6 +163,10 @@ pub fn __start(this: *IOWriter) Maybe(void) {
}
}
if (comptime bun.Environment.isWindows) {
log("IOWriter(0x{x}, {}) starting with source={s}", .{ @intFromPtr(this), this.fd, if (this.writer.source) |src| @tagName(src) else "no source lol" });
}
return .success;
}
@@ -637,6 +647,7 @@ pub fn enqueueFmt(
fn asyncDeinit(this: *@This()) void {
debug("IOWriter(0x{x}, fd={}) asyncDeinit", .{ @intFromPtr(this), this.fd });
bun.assert(!this.is_writing);
this.async_deinit.enqueue();
}
@@ -648,7 +659,10 @@ pub fn deinitOnMainThread(this: *IOWriter) void {
if (this.writer.handle == .poll and this.writer.handle.poll.isRegistered()) {
this.writer.handle.closeImpl(null, {}, false);
}
} else this.winbuf.deinit(bun.default_allocator);
} else {
this.writer.close();
this.winbuf.deinit(bun.default_allocator);
}
if (this.fd.isValid()) this.fd.close();
this.writer.disableKeepingProcessAlive(this.evtloop);
bun.destroy(this);
@@ -760,6 +774,7 @@ fn tryWriteWithWriteFn(fd: bun.FileDescriptor, buf: []const u8, comptime write_f
}
pub fn drainBufferedData(parent: *IOWriter, buf: []const u8, max_write_size: usize, received_hup: bool) bun.io.WriteResult {
bun.assert(bun.Environment.isPosix);
_ = received_hup;
const trimmed = if (max_write_size < buf.len and max_write_size > 0) buf[0..max_write_size] else buf;
@@ -767,7 +782,7 @@ pub fn drainBufferedData(parent: *IOWriter, buf: []const u8, max_write_size: usi
var drained: usize = 0;
while (drained < trimmed.len) {
const attempt = tryWriteWithWriteFn(parent.fd, buf, bun.sys.write);
const attempt = tryWriteWithWriteFn(parent.fd.get().?, buf, bun.sys.write);
switch (attempt) {
.pending => |pending| {
drained += pending;
@@ -840,6 +855,7 @@ const log = bun.Output.scoped(.IOWriter, .hidden);
const std = @import("std");
const bun = @import("bun");
const MovableIfWindowsFd = bun.MovableIfWindowsFd;
const assert = bun.assert;
const jsc = bun.jsc;
const Maybe = bun.sys.Maybe;

View File

@@ -76,7 +76,7 @@ pub const Yield = union(enum) {
bun.debugAssert(_dbg_catch_exec_within_exec <= MAX_DEPTH);
if (comptime Environment.isDebug) _dbg_catch_exec_within_exec += 1;
defer {
if (comptime Environment.isDebug) log("Yield({s}) _dbg_catch_exec_within_exec = {d} - 1 = {d}", .{ @tagName(this), _dbg_catch_exec_within_exec, _dbg_catch_exec_within_exec + 1 });
if (comptime Environment.isDebug) log("Yield({s}) _dbg_catch_exec_within_exec = {d} - 1 = {d}", .{ @tagName(this), _dbg_catch_exec_within_exec, _dbg_catch_exec_within_exec - 1 });
if (comptime Environment.isDebug) _dbg_catch_exec_within_exec -= 1;
}
@@ -108,6 +108,7 @@ pub const Yield = union(enum) {
}
continue :state x.next();
}
bun.assert_eql(std.mem.indexOfScalar(*Pipeline, pipeline_stack.items, x), null);
pipeline_stack.append(x) catch bun.outOfMemory();
continue :state x.next();
},

View File

@@ -282,12 +282,10 @@ pub fn deinit(this: *Pipeline) void {
fn initializePipes(pipes: []Pipe, set_count: *u32) Maybe(void) {
for (pipes) |*pipe| {
if (bun.Environment.isWindows) {
var fds: [2]uv.uv_file = undefined;
if (uv.uv_pipe(&fds, 0, 0).errEnum()) |e| {
return .{ .err = Syscall.Error.fromCode(e, .pipe) };
}
pipe[0] = .fromUV(fds[0]);
pipe[1] = .fromUV(fds[1]);
pipe.* = switch (bun.sys.pipe()) {
.result => |p| p,
.err => |e| return .{ .err = e },
};
} else {
switch (bun.sys.socketpairForShell(
// switch (bun.sys.socketpair(
@@ -353,9 +351,5 @@ const Subshell = bun.shell.Interpreter.Subshell;
const Pipe = bun.shell.interpret.Pipe;
const StatePtrUnion = bun.shell.interpret.StatePtrUnion;
const Syscall = bun.shell.interpret.Syscall;
const closefd = bun.shell.interpret.closefd;
const log = bun.shell.interpret.log;
const windows = bun.windows;
const uv = windows.libuv;