Compare commits

...

6 Commits

Author SHA1 Message Date
Jarred Sumner
5ab217da9e Propagate errors in open 2024-03-29 16:53:37 -07:00
Jarred Sumner
2c11b9e7e6 Merge branch 'main' into ciro/fix-file-reader 2024-03-29 16:13:02 -07:00
cirospaciari
19fe6db6de avoid possible memory leaks 2024-03-29 20:03:25 -03:00
cirospaciari
bab8d1bc96 also fix pipe writer 2024-03-29 19:48:10 -03:00
cirospaciari
2a1db65265 report continue errors and fix closing 2024-03-29 19:48:10 -03:00
cirospaciari
fab30db3e2 fix canceled onFileRead 2024-03-29 19:48:10 -03:00
3 changed files with 65 additions and 23 deletions

View File

@@ -368,29 +368,46 @@ pub fn WindowsPipeReader(
}
fn onFileRead(fs: *uv.fs_t) callconv(.C) void {
var this: *This = bun.cast(*This, fs.data);
const nread_int = fs.result.int();
const continue_reading = !this.flags.is_paused;
this.flags.is_paused = true;
const result = fs.result;
const nread_int = result.int();
bun.sys.syslog("onFileRead({}) = {d}", .{ bun.toFD(fs.file.fd), nread_int });
if (nread_int == uv.UV_ECANCELED) {
fs.deinit();
return;
}
var this: *This = bun.cast(*This, fs.data);
fs.deinit();
switch (nread_int) {
// 0 actually means EOF too
0, uv.UV_EOF => {
this.flags.is_paused = true;
this.onRead(.{ .result = 0 }, "", .eof);
},
uv.UV_ECANCELED => {
this.onRead(.{ .result = 0 }, "", .drained);
},
// UV_ECANCELED needs to be on the top so we avoid UAF
uv.UV_ECANCELED => unreachable,
else => {
if (fs.result.toError(.read)) |err| {
if (result.toError(.read)) |err| {
this.flags.is_paused = true;
this.onRead(.{ .err = err }, "", .progress);
return;
}
// continue reading
defer {
if (continue_reading) {
_ = this.startReading();
// if we are not paused we keep reading until EOF or err
if (!this.flags.is_paused) {
if (this.source) |source| {
if (source == .file) {
const file = source.file;
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
this.flags.is_paused = true;
// we should inform the error if we are unable to keep reading
this.onRead(.{ .err = err }, "", .progress);
}
}
}
}
}
@@ -455,9 +472,9 @@ pub fn WindowsPipeReader(
if (this.source) |source| {
switch (source) {
.sync_file, .file => |file| {
file.fs.deinit();
file.fs.data = file;
_ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onFileClose));
// always use close_fs here because we can have a operation in progress
file.close_fs.data = file;
_ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, @ptrCast(&onFileClose));
},
.pipe => |pipe| {
pipe.data = pipe;
@@ -487,7 +504,7 @@ pub fn WindowsPipeReader(
fn onFileClose(handle: *uv.fs_t) callconv(.C) void {
const file = bun.cast(*Source.File, handle.data);
file.fs.deinit();
handle.deinit();
bun.default_allocator.destroy(file);
}
@@ -1062,8 +1079,7 @@ pub const WindowsBufferedReader = struct {
this.source.?.setData(this);
this.buffer().clearRetainingCapacity();
this.flags.is_done = false;
this.unpause();
return .{ .result = {} };
return this.startReading();
}
pub fn startWithPipe(this: *WindowsOutputReader, pipe: *uv.Pipe) bun.JSC.Maybe(void) {

View File

@@ -767,7 +767,7 @@ fn BaseWindowsPipeWriter(
fn onFileClose(handle: *uv.fs_t) callconv(.C) void {
const file = bun.cast(*Source.File, handle.data);
file.fs.deinit();
handle.deinit();
bun.default_allocator.destroy(file);
}
@@ -786,9 +786,9 @@ fn BaseWindowsPipeWriter(
if (this.source) |source| {
switch (source) {
.file => |file| {
file.fs.deinit();
file.fs.data = file;
_ = uv.uv_fs_close(uv.Loop.get(), &file.fs, file.file, @ptrCast(&onFileClose));
// always use close_fs here because we can have a operation in progress
file.close_fs.data = file;
_ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, @ptrCast(&onFileClose));
},
.sync_file => {
// no-op
@@ -1205,8 +1205,15 @@ pub fn WindowsStreamingWriter(
}
fn onFsWriteComplete(fs: *uv.fs_t) callconv(.C) void {
const result = fs.result;
if (result.int() == uv.UV_ECANCELED) {
fs.deinit();
return;
}
const this = bun.cast(*WindowsWriter, fs.data);
if (fs.result.toError(.write)) |err| {
fs.deinit();
if (result.toError(.write)) |err| {
this.close();
onError(this.parent, err);
return;

View File

@@ -15,6 +15,10 @@ pub const Source = union(enum) {
pub const File = struct {
fs: uv.fs_t,
// we need a new fs_t to close the file
// the current one is used for write/reading/canceling
// we dont wanna to free any data that is being used in uv loop
close_fs: uv.fs_t,
iov: uv.uv_buf_t,
file: uv.uv_file,
};
@@ -145,8 +149,9 @@ pub const Source = union(enum) {
}
pub fn open(loop: *uv.Loop, fd: bun.FileDescriptor) bun.JSC.Maybe(Source) {
log("open (fd = {})", .{fd});
const rc = bun.windows.GetFileType(fd.cast());
log("open(fd: {}, type: {d})", .{ fd, rc });
switch (rc) {
bun.windows.FILE_TYPE_PIPE => {
switch (openPipe(loop, fd)) {
@@ -160,6 +165,20 @@ pub const Source = union(enum) {
.err => |err| return .{ .err = err },
}
},
bun.windows.FILE_TYPE_UNKNOWN => {
const errno = bun.windows.getLastErrno();
if (errno == .SUCCESS) {
// If it's nul, let's pretend its a pipe
// that seems to be the mode that libuv is happiest with.
return switch (openPipe(loop, fd)) {
.result => |pipe| return .{ .result = .{ .pipe = pipe } },
.err => |err| return .{ .err = err },
};
}
return .{ .err = bun.sys.Error.fromCode(errno, .open) };
},
else => {
return .{
.result = .{