diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 3a79545b16..930a4bb915 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -368,29 +368,46 @@ pub fn WindowsPipeReader( } fn onFileRead(fs: *uv.fs_t) callconv(.C) void { - var this: *This = bun.cast(*This, fs.data); - const nread_int = fs.result.int(); - const continue_reading = !this.flags.is_paused; - this.flags.is_paused = true; + const result = fs.result; + const nread_int = result.int(); bun.sys.syslog("onFileRead({}) = {d}", .{ bun.toFD(fs.file.fd), nread_int }); + if (nread_int == uv.UV_ECANCELED) { + fs.deinit(); + return; + } + var this: *This = bun.cast(*This, fs.data); + fs.deinit(); switch (nread_int) { // 0 actually means EOF too 0, uv.UV_EOF => { + this.flags.is_paused = true; this.onRead(.{ .result = 0 }, "", .eof); }, - uv.UV_ECANCELED => { - this.onRead(.{ .result = 0 }, "", .drained); - }, + // UV_ECANCELED needs to be on the top so we avoid UAF + uv.UV_ECANCELED => unreachable, else => { - if (fs.result.toError(.read)) |err| { + if (result.toError(.read)) |err| { + this.flags.is_paused = true; this.onRead(.{ .err = err }, "", .progress); return; } - // continue reading defer { - if (continue_reading) { - _ = this.startReading(); + // if we are not paused we keep reading until EOF or err + if (!this.flags.is_paused) { + if (this.source) |source| { + if (source == .file) { + const file = source.file; + source.setData(this); + const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); + file.iov = uv.uv_buf_t.init(buf); + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| { + this.flags.is_paused = true; + // we should inform the error if we are unable to keep reading + this.onRead(.{ .err = err }, "", .progress); + } + } + } } } @@ -455,9 +472,9 @@ pub fn WindowsPipeReader( if (this.source) |source| { switch (source) { .sync_file, .file => |file| { - file.fs.deinit(); - file.fs.data = file; - _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onFileClose)); + // always use close_fs here because we can have a operation in progress + file.close_fs.data = file; + _ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, @ptrCast(&onFileClose)); }, .pipe => |pipe| { pipe.data = pipe; @@ -487,7 +504,7 @@ pub fn WindowsPipeReader( fn onFileClose(handle: *uv.fs_t) callconv(.C) void { const file = bun.cast(*Source.File, handle.data); - file.fs.deinit(); + handle.deinit(); bun.default_allocator.destroy(file); } @@ -1062,8 +1079,7 @@ pub const WindowsBufferedReader = struct { this.source.?.setData(this); this.buffer().clearRetainingCapacity(); this.flags.is_done = false; - this.unpause(); - return .{ .result = {} }; + return this.startReading(); } pub fn startWithPipe(this: *WindowsOutputReader, pipe: *uv.Pipe) bun.JSC.Maybe(void) { diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index cd5e210c43..beee6c54d3 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -767,7 +767,7 @@ fn BaseWindowsPipeWriter( fn onFileClose(handle: *uv.fs_t) callconv(.C) void { const file = bun.cast(*Source.File, handle.data); - file.fs.deinit(); + handle.deinit(); bun.default_allocator.destroy(file); } @@ -786,9 +786,9 @@ fn BaseWindowsPipeWriter( if (this.source) |source| { switch (source) { .file => |file| { - file.fs.deinit(); - file.fs.data = file; - _ = uv.uv_fs_close(uv.Loop.get(), &file.fs, file.file, @ptrCast(&onFileClose)); + // always use close_fs here because we can have a operation in progress + file.close_fs.data = file; + _ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, @ptrCast(&onFileClose)); }, .sync_file => { // no-op @@ -1205,8 +1205,15 @@ pub fn WindowsStreamingWriter( } fn onFsWriteComplete(fs: *uv.fs_t) callconv(.C) void { + const result = fs.result; + if (result.int() == uv.UV_ECANCELED) { + fs.deinit(); + return; + } const this = bun.cast(*WindowsWriter, fs.data); - if (fs.result.toError(.write)) |err| { + + fs.deinit(); + if (result.toError(.write)) |err| { this.close(); onError(this.parent, err); return; diff --git a/src/io/source.zig b/src/io/source.zig index b001c32780..fd00b78630 100644 --- a/src/io/source.zig +++ b/src/io/source.zig @@ -15,6 +15,10 @@ pub const Source = union(enum) { pub const File = struct { fs: uv.fs_t, + // we need a new fs_t to close the file + // the current one is used for write/reading/canceling + // we dont wanna to free any data that is being used in uv loop + close_fs: uv.fs_t, iov: uv.uv_buf_t, file: uv.uv_file, }; @@ -145,8 +149,9 @@ pub const Source = union(enum) { } pub fn open(loop: *uv.Loop, fd: bun.FileDescriptor) bun.JSC.Maybe(Source) { - log("open (fd = {})", .{fd}); const rc = bun.windows.GetFileType(fd.cast()); + log("open(fd: {}, type: {d})", .{ fd, rc }); + switch (rc) { bun.windows.FILE_TYPE_PIPE => { switch (openPipe(loop, fd)) { @@ -160,6 +165,20 @@ pub const Source = union(enum) { .err => |err| return .{ .err = err }, } }, + bun.windows.FILE_TYPE_UNKNOWN => { + const errno = bun.windows.getLastErrno(); + + if (errno == .SUCCESS) { + // If it's nul, let's pretend its a pipe + // that seems to be the mode that libuv is happiest with. + return switch (openPipe(loop, fd)) { + .result => |pipe| return .{ .result = .{ .pipe = pipe } }, + .err => |err| return .{ .err = err }, + }; + } + + return .{ .err = bun.sys.Error.fromCode(errno, .open) }; + }, else => { return .{ .result = .{