some PipeReader fixes

This commit is contained in:
cirospaciari
2024-02-20 17:13:49 -03:00
parent 78291af8de
commit 30c4ba2fff
4 changed files with 107 additions and 115 deletions

View File

@@ -468,7 +468,7 @@ fn ReqMixin(comptime Type: type) type {
uv_req_set_data(@ptrCast(handle), ptr);
}
pub fn cancel(this: *Type) void {
uv_cancel(@ptrCast(this));
_ = uv_cancel(@ptrCast(this));
}
};
}
@@ -1712,6 +1712,7 @@ pub const fs_t = extern struct {
sys_errno_: DWORD,
file: union_unnamed_450,
fs: union_unnamed_451,
pub usingnamespace ReqMixin(@This());
pub inline fn deinit(this: *fs_t) void {
this.assert();
@@ -2723,63 +2724,6 @@ pub const ReturnCodeI64 = enum(i64) {
pub const addrinfo = std.os.windows.ws2_32.addrinfo;
pub fn StreamReaderMixin(comptime Type: type, comptime pipe_field_name: std.meta.FieldEnum(Type)) type {
return struct {
fn uv_alloc_cb(pipe: *uv_stream_t, suggested_size: usize, buf: *uv_buf_t) callconv(.C) void {
var this = bun.cast(*Type, pipe.data);
const result = this.getReadBufferWithStableMemoryAddress(suggested_size);
buf.* = uv_buf_t.init(result);
}
fn uv_read_cb(pipe: *uv_stream_t, nread: ReturnCodeI64, buf: *const uv_buf_t) callconv(.C) void {
var this = bun.cast(*Type, pipe.data);
const read = nread.int();
switch (read) {
0 => {
// EAGAIN or EWOULDBLOCK
return this.onRead(.{ .result = 0 }, buf, .drained);
},
UV_EOF => {
// EOF
return this.onRead(.{ .result = 0 }, buf, .eof);
},
else => {
this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(read) }, buf, .progress);
},
}
}
fn __get_pipe(this: *Type) ?*uv_stream_t {
switch (@TypeOf(@field(this, @tagName(pipe_field_name)))) {
?*Pipe, ?*uv_tcp_t, ?*uv_tty_t => return if (@field(this, @tagName(pipe_field_name))) |ptr| @ptrCast(ptr) else null,
*Pipe, *uv_tcp_t, *uv_tty_t => return @ptrCast(@field(this, @tagName(pipe_field_name))),
Pipe, uv_tcp_t, uv_tty_t => return @ptrCast(&@field(this, @tagName(pipe_field_name))),
else => @compileError("StreamWriterMixin only works with Pipe, uv_tcp_t, uv_tty_t"),
}
}
pub fn startReading(this: *Type) Maybe(void) {
const pipe = __get_pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) };
//TODO: change to pipe.readStart
if (uv_read_start(pipe, @ptrCast(&@This().uv_alloc_cb), @ptrCast(&@This().uv_read_cb)).toError(.open)) |err| {
return .{ .err = err };
}
return .{ .result = {} };
}
pub fn stopReading(this: *Type) Maybe(void) {
const pipe = __get_pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) };
pipe.readStop();
return .{ .result = {} };
}
};
}
// https://docs.libuv.org/en/v1.x/stream.html
fn StreamMixin(comptime Type: type) type {
return struct {

View File

@@ -34,13 +34,15 @@ fn numberToHandle(handle: FDImpl.SystemAsInt) FDImpl.System {
pub fn uv_get_osfhandle(in: c_int) libuv.uv_os_fd_t {
const out = libuv.uv_get_osfhandle(in);
log("uv_get_osfhandle({d}) = {d}", .{ in, @intFromPtr(out) });
// TODO: this is causing a dead lock because is also used on fd format
// log("uv_get_osfhandle({d}) = {d}", .{ in, @intFromPtr(out) });
return out;
}
pub fn uv_open_osfhandle(in: libuv.uv_os_fd_t) c_int {
const out = libuv.uv_open_osfhandle(in);
log("uv_open_osfhandle({d}) = {d}", .{ @intFromPtr(in), out });
// TODO: this is causing a dead lock because is also used on fd format
// log("uv_open_osfhandle({d}) = {d}", .{ @intFromPtr(in), out });
return out;
}

View File

@@ -329,66 +329,100 @@ pub fn WindowsPipeReader(
comptime onError: fn (*This, bun.sys.Error) void,
) type {
return struct {
fn uv_alloc_cb(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void {
fn onStreamAlloc(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void {
var this = bun.cast(*This, handle.data);
const result = this.getReadBufferWithStableMemoryAddress(suggested_size);
buf.* = uv.uv_buf_t.init(result);
}
fn uv_stream_read_cb(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void {
fn onStreamRead(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void {
var this = bun.cast(*This, stream.data);
const nread_int = nread.int();
switch (nread_int) {
0 => {
// EAGAIN or EWOULDBLOCK or canceled
return this.onRead(.{ .result = 0 }, buf, .drained);
// EAGAIN or EWOULDBLOCK or canceled (buf is not safe to access here)
return this.onRead(.{ .result = 0 }, "", .drained);
},
uv.UV_EOF => {
// EOF
return this.onRead(.{ .result = 0 }, buf, .eof);
// EOF (buf is not safe to access here)
return this.onRead(.{ .result = 0 }, "", .eof);
},
else => {
this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress);
if (nread.toError(.recv)) |err| {
// ERROR (buf is not safe to access here)
this.onRead(.{ .err = err }, "", .progress);
return;
}
// we got some data we can slice the buffer!
const len: usize = @intCast(nread_int);
var slice = buf.slice();
this.onRead(.{ .result = len }, slice[0..len], .progress);
},
}
}
fn uv_file_read_cb(fs: *uv.fs_t) callconv(.C) void {
fn onFileRead(fs: *uv.fs_t) callconv(.C) void {
var this: *This = bun.cast(*This, fs.data);
const nread_int = fs.result.int();
const buf = &this.*.source.?.file.iov;
switch (nread_int) {
0, uv.UV_ECANCELED =>
// EAGAIN or EWOULDBLOCK or canceled
this.onRead(.{ .result = 0 }, buf, .drained),
uv.UV_EOF =>
// EOF
this.onRead(.{ .result = 0 }, buf, .eof),
else => this.onRead(if (fs.result.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress),
// EAGAIN or EWOULDBLOCK
0 => {
// continue reading
if (!this.is_paused) {
_ = this.startReading();
}
},
uv.UV_ECANCELED => {
this.onRead(.{ .result = 0 }, "", .drained);
},
uv.UV_EOF => {
this.onRead(.{ .result = 0 }, "", .eof);
},
else => {
if (fs.result.toError(.recv)) |err| {
this.onRead(.{ .err = err }, "", .progress);
return;
}
// continue reading
defer {
if (!this.is_paused) {
_ = this.startReading();
}
}
const len: usize = @intCast(nread_int);
// we got some data lets get the current iov
if (this.*.source) |source| {
if (source == .file) {
var buf = source.file.iov.slice();
return this.onRead(.{ .result = len }, buf[0..len], .progress);
}
}
// ops we should not hit this lets fail with EPIPE
std.debug.assert(false);
return this.onRead(.{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .read) }, "", .progress);
},
}
uv.uv_fs_req_cleanup(fs);
}
pub fn startReading(this: *This) bun.JSC.Maybe(void) {
if (!this.is_paused) return .{ .result = {} };
this.is_paused = false;
const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) };
switch (source) {
.file => |file| {
if (file.iov.len == 0) {
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
std.debug.assert(file.iov.len > 0);
}
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, uv_file_read_cb).toError(.write)) |err| {
file.fs.deinit();
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
return .{ .err = err };
}
},
else => {
if (uv.uv_read_start(source.toStream(), &uv_alloc_cb, @ptrCast(&uv_stream_read_cb)).toError(.open)) |err| {
if (uv.uv_read_start(source.toStream(), &onStreamAlloc, @ptrCast(&onStreamRead)).toError(.open)) |err| {
return .{ .err = err };
}
},
@@ -398,14 +432,15 @@ pub fn WindowsPipeReader(
}
pub fn stopReading(this: *This) bun.JSC.Maybe(void) {
if (this.is_paused) return .{ .result = {} };
this.is_paused = true;
const source = this.source orelse return .{ .result = {} };
switch (source) {
.file => |file| {
_ = uv.uv_cancel(@ptrCast(&file.fs));
file.fs.cancel();
},
else => {
// can be safely ignored as per libuv documentation
_ = uv.uv_read_stop(source.toStream());
source.toStream().readStop();
},
}
return .{ .result = {} };
@@ -414,6 +449,12 @@ pub fn WindowsPipeReader(
pub fn close(this: *This) void {
_ = this.stopReading();
if (this.source) |source| {
if (source == .file) {
source.file.fs.deinit();
// TODO: handle this error instead of ignoring it
_ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&onCloseSource));
return;
}
source.getHandle().close(onCloseSource);
}
}
@@ -428,38 +469,45 @@ pub fn WindowsPipeReader(
fn onCloseSource(handle: *uv.Handle) callconv(.C) void {
const this = bun.cast(*This, handle.data);
switch (this.source.?) {
.file => |file| uv.uv_fs_req_cleanup(&file.fs),
.file => |file| file.fs.deinit(),
else => {},
}
done(this);
}
pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), buf: *const uv.uv_buf_t, hasMore: ReadState) void {
pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), slice: []u8, hasMore: ReadState) void {
if (amount == .err) {
onError(this, amount.err);
return;
}
if (hasMore == .eof) {
_ = onReadChunk(this, "", hasMore);
close(this);
return;
}
var buffer = getBuffer(this);
if (comptime bun.Environment.allow_assert) {
if (!bun.isSliceInBuffer(buf.slice()[0..amount.result], buffer.allocatedSlice())) {
std.debug.print("buf len: {d}, buffer ln: {d}\n", .{ buf.slice().len, buffer.allocatedSlice().len });
@panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it.");
}
}
buffer.items.len += amount.result;
const keep_reading = onReadChunk(this, buf.slice()[0..amount.result], hasMore);
if (!keep_reading) {
close(this);
switch (hasMore) {
.eof => {
// we call report EOF and close
_ = onReadChunk(this, slice, hasMore);
close(this);
},
.drained => {
// we call drained so we know if we should stop here
const keep_reading = onReadChunk(this, slice, hasMore);
if (!keep_reading) {
close(this);
}
},
else => {
var buffer = getBuffer(this);
if (comptime bun.Environment.allow_assert) {
if (slice.len > 0 and !bun.isSliceInBuffer(slice, buffer.allocatedSlice())) {
@panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it.");
}
}
// move cursor foward
buffer.items.len += amount.result;
const keep_reading = onReadChunk(this, slice, hasMore);
if (!keep_reading) {
close(this);
}
},
}
}
@@ -804,6 +852,7 @@ pub const WindowsBufferedReader = struct {
flags: Flags = .{},
has_inflight_read: bool = false,
is_paused: bool = true,
parent: *anyopaque = undefined,
vtable: WindowsOutputReaderVTable = undefined,
ref_count: u32 = 1,
@@ -939,14 +988,11 @@ pub const WindowsBufferedReader = struct {
this.has_inflight_read = true;
this._buffer.ensureUnusedCapacity(suggested_size) catch bun.outOfMemory();
const res = this._buffer.allocatedSlice()[this._buffer.items.len..];
std.debug.print("getReadBufferWithStableMemoryAddress({d}) = {d}\n", .{ suggested_size, res.len });
return res;
}
pub fn startWithCurrentPipe(this: *WindowsOutputReader) bun.JSC.Maybe(void) {
std.debug.assert(this.source != null);
std.debug.print("clearRetainingCapacity\n", .{});
this.buffer().clearRetainingCapacity();
this.flags.is_done = false;
this.unpause();

View File

@@ -721,7 +721,7 @@ fn BaseWindowsPipeWriter(
this.is_done = true;
if (this.source) |source| {
if (source == .file) {
uv.uv_fs_req_cleanup(&source.file.fs);
source.file.fs.deinit();
// TODO: handle this error instead of ignoring it
_ = uv.uv_fs_close(uv.Loop.get(), &source.file.fs, source.file.file, @ptrCast(&WindowsPipeWriter.onCloseSource));
return;