From 0aeb5004df31d5b3dcc9c595d79b35a4113bc153 Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Tue, 20 Feb 2024 20:07:06 -0300 Subject: [PATCH] fix closing on PipeWriter and PipeReader --- src/io/PipeReader.zig | 44 ++++++++++++++++++++++++----------- src/io/PipeWriter.zig | 53 +++++++++++++++++++++++++++++++++---------- src/io/source.zig | 2 +- 3 files changed, 73 insertions(+), 26 deletions(-) diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 1da21acd15..e9e0a36de3 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -449,17 +449,24 @@ pub fn WindowsPipeReader( pub fn close(this: *This) void { _ = this.stopReading(); if (this.source) |source| { - if (source == .file) { - source.file.fs.deinit(); - source.setData(this); - _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, null); - source.file.fs.deinit(); - // mark "closed" - this.source.?.file.file = -1; - done(this); - return; + switch (source) { + .file => |file| { + file.fs.deinit(); + file.fs.data = file; + // TODO: handle this error instead of ignoring it + _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onFileClose)); + }, + .pipe => |pipe| { + pipe.data = pipe; + pipe.close(onPipeClose); + }, + .tty => |tty| { + tty.data = tty; + tty.close(onTTYClose); + }, } - source.getHandle().close(onCloseSource); + this.source = null; + done(this); } } @@ -470,9 +477,20 @@ pub fn WindowsPipeReader( .onError = onError, }; - fn onCloseSource(handle: *uv.Handle) callconv(.C) void { - const this = bun.cast(*This, handle.data); - done(this); + fn onFileClose(handle: *uv.fs_t) callconv(.C) void { + const file = bun.cast(*Source.File, handle.data); + file.fs.deinit(); + bun.default_allocator.destroy(file); + } + + fn onPipeClose(handle: *uv.Pipe) callconv(.C) void { + const this = bun.cast(*uv.Pipe, handle.data); + bun.default_allocator.destroy(this); + } + + fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void { + const this = bun.cast(*uv.uv_tty_t, handle.data); + bun.default_allocator.destroy(this); } pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), slice: []u8, hasMore: ReadState) void { diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index 0e67a7aa71..1f3612c015 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -717,16 +717,43 @@ fn BaseWindowsPipeWriter( this.updateRef(event_loop, false); } + fn onFileClose(handle: *uv.fs_t) callconv(.C) void { + const file = bun.cast(*Source.File, handle.data); + file.fs.deinit(); + bun.default_allocator.destroy(file); + } + + fn onPipeClose(handle: *uv.Pipe) callconv(.C) void { + const this = bun.cast(*uv.Pipe, handle.data); + bun.default_allocator.destroy(this); + } + + fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void { + const this = bun.cast(*uv.uv_tty_t, handle.data); + bun.default_allocator.destroy(this); + } + pub fn close(this: *WindowsPipeWriter) void { this.is_done = true; if (this.source) |source| { - if (source == .file) { - source.file.fs.deinit(); - // TODO: handle this error instead of ignoring it - _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&WindowsPipeWriter.onCloseSource)); - return; + switch (source) { + .file => |file| { + file.fs.deinit(); + file.fs.data = file; + // TODO: handle this error instead of ignoring it + _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onFileClose)); + }, + .pipe => |pipe| { + pipe.data = pipe; + pipe.close(onPipeClose); + }, + .tty => |tty| { + tty.data = tty; + tty.close(onTTYClose); + }, } - source.getHandle().close(&WindowsPipeWriter.onCloseSource); + this.source = null; + this.onCloseSource(); } } @@ -806,8 +833,7 @@ pub fn WindowsBufferedWriter( pub usingnamespace BaseWindowsPipeWriter(WindowsWriter, Parent); - fn onCloseSource(pipe: *uv.Handle) callconv(.C) void { - const this = bun.cast(*WindowsWriter, pipe.data); + fn onCloseSource(this: *WindowsWriter) void { if (onClose) |onCloseFn| { onCloseFn(this.parent); } @@ -868,8 +894,10 @@ pub fn WindowsBufferedWriter( switch (pipe) { .file => |file| { this.pending_payload_size = buffer.len; - uv.uv_fs_req_cleanup(&file.fs); + file.fs.deinit(); + file.fs.setData(this); this.write_buffer = uv.uv_buf_t.init(buffer); + if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| { this.close(); onError(this.parent, err); @@ -1015,8 +1043,7 @@ pub fn WindowsStreamingWriter( pub usingnamespace BaseWindowsPipeWriter(WindowsWriter, Parent); - fn onCloseSource(pipe: *uv.Handle) callconv(.C) void { - const this = bun.cast(*WindowsWriter, pipe.data); + fn onCloseSource(this: *WindowsWriter) void { this.source = null; if (!this.closed_without_reporting) { onClose(this.parent); @@ -1114,8 +1141,10 @@ pub fn WindowsStreamingWriter( this.outgoing = temp; switch (pipe) { .file => |file| { - uv.uv_fs_req_cleanup(&file.fs); + file.fs.deinit(); + file.fs.setData(this); this.write_buffer = uv.uv_buf_t.init(bytes); + if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| { this.last_write_result = .{ .err = err }; onError(this.parent, err); diff --git a/src/io/source.zig b/src/io/source.zig index f4f1dea723..1ed036f095 100644 --- a/src/io/source.zig +++ b/src/io/source.zig @@ -11,7 +11,7 @@ pub const Source = union(enum) { const Pipe = uv.Pipe; const Tty = uv.uv_tty_t; - const File = struct { + pub const File = struct { fs: uv.fs_t, iov: uv.uv_buf_t, file: uv.uv_file,