diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 01127ed85e..75ae381c5f 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -468,7 +468,7 @@ fn ReqMixin(comptime Type: type) type { uv_req_set_data(@ptrCast(handle), ptr); } pub fn cancel(this: *Type) void { - uv_cancel(@ptrCast(this)); + _ = uv_cancel(@ptrCast(this)); } }; } @@ -1712,6 +1712,7 @@ pub const fs_t = extern struct { sys_errno_: DWORD, file: union_unnamed_450, fs: union_unnamed_451, + pub usingnamespace ReqMixin(@This()); pub inline fn deinit(this: *fs_t) void { this.assert(); @@ -2723,63 +2724,6 @@ pub const ReturnCodeI64 = enum(i64) { pub const addrinfo = std.os.windows.ws2_32.addrinfo; -pub fn StreamReaderMixin(comptime Type: type, comptime pipe_field_name: std.meta.FieldEnum(Type)) type { - return struct { - fn uv_alloc_cb(pipe: *uv_stream_t, suggested_size: usize, buf: *uv_buf_t) callconv(.C) void { - var this = bun.cast(*Type, pipe.data); - const result = this.getReadBufferWithStableMemoryAddress(suggested_size); - buf.* = uv_buf_t.init(result); - } - - fn uv_read_cb(pipe: *uv_stream_t, nread: ReturnCodeI64, buf: *const uv_buf_t) callconv(.C) void { - var this = bun.cast(*Type, pipe.data); - - const read = nread.int(); - - switch (read) { - 0 => { - // EAGAIN or EWOULDBLOCK - return this.onRead(.{ .result = 0 }, buf, .drained); - }, - UV_EOF => { - // EOF - return this.onRead(.{ .result = 0 }, buf, .eof); - }, - else => { - this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(read) }, buf, .progress); - }, - } - } - - fn __get_pipe(this: *Type) ?*uv_stream_t { - switch (@TypeOf(@field(this, @tagName(pipe_field_name)))) { - ?*Pipe, ?*uv_tcp_t, ?*uv_tty_t => return if (@field(this, @tagName(pipe_field_name))) |ptr| @ptrCast(ptr) else null, - *Pipe, *uv_tcp_t, *uv_tty_t => return @ptrCast(@field(this, @tagName(pipe_field_name))), - Pipe, uv_tcp_t, uv_tty_t => return @ptrCast(&@field(this, @tagName(pipe_field_name))), - else => @compileError("StreamWriterMixin only works with Pipe, uv_tcp_t, uv_tty_t"), - } - } - - pub fn startReading(this: *Type) Maybe(void) { - const pipe = __get_pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) }; - - //TODO: change to pipe.readStart - if (uv_read_start(pipe, @ptrCast(&@This().uv_alloc_cb), @ptrCast(&@This().uv_read_cb)).toError(.open)) |err| { - return .{ .err = err }; - } - - return .{ .result = {} }; - } - - pub fn stopReading(this: *Type) Maybe(void) { - const pipe = __get_pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) }; - pipe.readStop(); - - return .{ .result = {} }; - } - }; -} - // https://docs.libuv.org/en/v1.x/stream.html fn StreamMixin(comptime Type: type) type { return struct { diff --git a/src/fd.zig b/src/fd.zig index 20faf783dd..b8bac224ae 100644 --- a/src/fd.zig +++ b/src/fd.zig @@ -34,13 +34,15 @@ fn numberToHandle(handle: FDImpl.SystemAsInt) FDImpl.System { pub fn uv_get_osfhandle(in: c_int) libuv.uv_os_fd_t { const out = libuv.uv_get_osfhandle(in); - log("uv_get_osfhandle({d}) = {d}", .{ in, @intFromPtr(out) }); + // TODO: this is causing a dead lock because is also used on fd format + // log("uv_get_osfhandle({d}) = {d}", .{ in, @intFromPtr(out) }); return out; } pub fn uv_open_osfhandle(in: libuv.uv_os_fd_t) c_int { const out = libuv.uv_open_osfhandle(in); - log("uv_open_osfhandle({d}) = {d}", .{ @intFromPtr(in), out }); + // TODO: this is causing a dead lock because is also used on fd format + // log("uv_open_osfhandle({d}) = {d}", .{ @intFromPtr(in), out }); return out; } diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index f0361e853a..c38d0b3897 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -329,66 +329,100 @@ pub fn WindowsPipeReader( comptime onError: fn (*This, bun.sys.Error) void, ) type { return struct { - fn uv_alloc_cb(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { + fn onStreamAlloc(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { var this = bun.cast(*This, handle.data); const result = this.getReadBufferWithStableMemoryAddress(suggested_size); buf.* = uv.uv_buf_t.init(result); } - fn uv_stream_read_cb(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { + fn onStreamRead(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { var this = bun.cast(*This, stream.data); const nread_int = nread.int(); switch (nread_int) { 0 => { - // EAGAIN or EWOULDBLOCK or canceled - return this.onRead(.{ .result = 0 }, buf, .drained); + // EAGAIN or EWOULDBLOCK or canceled (buf is not safe to access here) + return this.onRead(.{ .result = 0 }, "", .drained); }, uv.UV_EOF => { - // EOF - return this.onRead(.{ .result = 0 }, buf, .eof); + // EOF (buf is not safe to access here) + return this.onRead(.{ .result = 0 }, "", .eof); }, else => { - this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress); + if (nread.toError(.recv)) |err| { + // ERROR (buf is not safe to access here) + this.onRead(.{ .err = err }, "", .progress); + return; + } + // we got some data we can slice the buffer! + const len: usize = @intCast(nread_int); + var slice = buf.slice(); + this.onRead(.{ .result = len }, slice[0..len], .progress); }, } } - fn uv_file_read_cb(fs: *uv.fs_t) callconv(.C) void { + fn onFileRead(fs: *uv.fs_t) callconv(.C) void { var this: *This = bun.cast(*This, fs.data); - const nread_int = fs.result.int(); - const buf = &this.*.source.?.file.iov; - switch (nread_int) { - 0, uv.UV_ECANCELED => - // EAGAIN or EWOULDBLOCK or canceled - this.onRead(.{ .result = 0 }, buf, .drained), - uv.UV_EOF => - // EOF - this.onRead(.{ .result = 0 }, buf, .eof), - else => this.onRead(if (fs.result.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress), + // EAGAIN or EWOULDBLOCK + 0 => { + // continue reading + if (!this.is_paused) { + _ = this.startReading(); + } + }, + uv.UV_ECANCELED => { + this.onRead(.{ .result = 0 }, "", .drained); + }, + uv.UV_EOF => { + this.onRead(.{ .result = 0 }, "", .eof); + }, + else => { + if (fs.result.toError(.recv)) |err| { + this.onRead(.{ .err = err }, "", .progress); + return; + } + // continue reading + defer { + if (!this.is_paused) { + _ = this.startReading(); + } + } + + const len: usize = @intCast(nread_int); + // we got some data lets get the current iov + if (this.*.source) |source| { + if (source == .file) { + var buf = source.file.iov.slice(); + return this.onRead(.{ .result = len }, buf[0..len], .progress); + } + } + // ops we should not hit this lets fail with EPIPE + std.debug.assert(false); + return this.onRead(.{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .read) }, "", .progress); + }, } - uv.uv_fs_req_cleanup(fs); } pub fn startReading(this: *This) bun.JSC.Maybe(void) { + if (!this.is_paused) return .{ .result = {} }; + this.is_paused = false; const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) }; switch (source) { .file => |file| { - if (file.iov.len == 0) { - const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); - file.iov = uv.uv_buf_t.init(buf); - std.debug.assert(file.iov.len > 0); - } - if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, uv_file_read_cb).toError(.write)) |err| { + file.fs.deinit(); + const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); + file.iov = uv.uv_buf_t.init(buf); + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| { return .{ .err = err }; } }, else => { - if (uv.uv_read_start(source.toStream(), &uv_alloc_cb, @ptrCast(&uv_stream_read_cb)).toError(.open)) |err| { + if (uv.uv_read_start(source.toStream(), &onStreamAlloc, @ptrCast(&onStreamRead)).toError(.open)) |err| { return .{ .err = err }; } }, @@ -398,14 +432,15 @@ pub fn WindowsPipeReader( } pub fn stopReading(this: *This) bun.JSC.Maybe(void) { + if (this.is_paused) return .{ .result = {} }; + this.is_paused = true; const source = this.source orelse return .{ .result = {} }; switch (source) { .file => |file| { - _ = uv.uv_cancel(@ptrCast(&file.fs)); + file.fs.cancel(); }, else => { - // can be safely ignored as per libuv documentation - _ = uv.uv_read_stop(source.toStream()); + source.toStream().readStop(); }, } return .{ .result = {} }; @@ -414,6 +449,12 @@ pub fn WindowsPipeReader( pub fn close(this: *This) void { _ = this.stopReading(); if (this.source) |source| { + if (source == .file) { + source.file.fs.deinit(); + // TODO: handle this error instead of ignoring it + _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onCloseSource)); + return; + } source.getHandle().close(onCloseSource); } } @@ -428,38 +469,45 @@ pub fn WindowsPipeReader( fn onCloseSource(handle: *uv.Handle) callconv(.C) void { const this = bun.cast(*This, handle.data); switch (this.source.?) { - .file => |file| uv.uv_fs_req_cleanup(&file.fs), + .file => |file| file.fs.deinit(), else => {}, } done(this); } - pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), buf: *const uv.uv_buf_t, hasMore: ReadState) void { + pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), slice: []u8, hasMore: ReadState) void { if (amount == .err) { onError(this, amount.err); return; } - if (hasMore == .eof) { - _ = onReadChunk(this, "", hasMore); - close(this); - return; - } - - var buffer = getBuffer(this); - - if (comptime bun.Environment.allow_assert) { - if (!bun.isSliceInBuffer(buf.slice()[0..amount.result], buffer.allocatedSlice())) { - std.debug.print("buf len: {d}, buffer ln: {d}\n", .{ buf.slice().len, buffer.allocatedSlice().len }); - @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); - } - } - - buffer.items.len += amount.result; - - const keep_reading = onReadChunk(this, buf.slice()[0..amount.result], hasMore); - if (!keep_reading) { - close(this); + switch (hasMore) { + .eof => { + // we call report EOF and close + _ = onReadChunk(this, slice, hasMore); + close(this); + }, + .drained => { + // we call drained so we know if we should stop here + const keep_reading = onReadChunk(this, slice, hasMore); + if (!keep_reading) { + close(this); + } + }, + else => { + var buffer = getBuffer(this); + if (comptime bun.Environment.allow_assert) { + if (slice.len > 0 and !bun.isSliceInBuffer(slice, buffer.allocatedSlice())) { + @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); + } + } + // move cursor foward + buffer.items.len += amount.result; + const keep_reading = onReadChunk(this, slice, hasMore); + if (!keep_reading) { + close(this); + } + }, } } @@ -804,6 +852,7 @@ pub const WindowsBufferedReader = struct { flags: Flags = .{}, has_inflight_read: bool = false, + is_paused: bool = true, parent: *anyopaque = undefined, vtable: WindowsOutputReaderVTable = undefined, ref_count: u32 = 1, @@ -939,14 +988,11 @@ pub const WindowsBufferedReader = struct { this.has_inflight_read = true; this._buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory(); const res = this._buffer.allocatedSlice()[this._buffer.items.len..]; - std.debug.print("getReadBufferWithStableMemoryAddress({d}) = {d}\n", .{ suggested_size, res.len }); return res; } pub fn startWithCurrentPipe(this: *WindowsOutputReader) bun.JSC.Maybe(void) { std.debug.assert(this.source != null); - - std.debug.print("clearRetainingCapacity\n", .{}); this.buffer().clearRetainingCapacity(); this.flags.is_done = false; this.unpause(); diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index bbc75171a4..0e67a7aa71 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -721,7 +721,7 @@ fn BaseWindowsPipeWriter( this.is_done = true; if (this.source) |source| { if (source == .file) { - uv.uv_fs_req_cleanup(&source.file.fs); + source.file.fs.deinit(); // TODO: handle this error instead of ignoring it _ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&WindowsPipeWriter.onCloseSource)); return;