diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 04342020bb..b9121328a3 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -6,598 +6,8 @@ const Source = @import("./source.zig").Source; const ReadState = @import("./pipes.zig").ReadState; const FileType = @import("./pipes.zig").FileType; -/// Read a blocking pipe without blocking the current thread. -pub fn PosixPipeReader( - comptime This: type, - comptime vtable: struct { - getFd: *const fn (*This) bun.FileDescriptor, - getBuffer: *const fn (*This) *std.ArrayList(u8), - getFileType: *const fn (*This) FileType, - onReadChunk: ?*const fn (*This, chunk: []u8, state: ReadState) void = null, - registerPoll: ?*const fn (*This) void = null, - done: *const fn (*This) void, - close: *const fn (*This) void, - onError: *const fn (*This, bun.sys.Error) void, - }, -) type { - return struct { - pub fn read(this: *This) void { - const buffer = vtable.getBuffer(this); - const fd = vtable.getFd(this); - - switch (vtable.getFileType(this)) { - .nonblocking_pipe => { - readPipe(this, buffer, fd, 0, false); - return; - }, - .file => { - readFile(this, buffer, fd, 0, false); - return; - }, - .socket => { - readSocket(this, buffer, fd, 0, false); - return; - }, - .pipe => { - switch (bun.isReadable(fd)) { - .ready => { - readFromBlockingPipeWithoutBlocking(this, buffer, fd, 0, false); - }, - .hup => { - readFromBlockingPipeWithoutBlocking(this, buffer, fd, 0, true); - }, - .not_ready => { - if (comptime vtable.registerPoll) |register| { - register(this); - } - }, - } - }, - } - } - - pub fn onPoll(parent: *This, size_hint: isize, received_hup: bool) void { - const resizable_buffer = vtable.getBuffer(parent); - const fd = vtable.getFd(parent); - bun.sys.syslog("onPoll({}) = {d}", .{ fd, size_hint }); - - switch (vtable.getFileType(parent)) { - .nonblocking_pipe => { - readPipe(parent, resizable_buffer, fd, size_hint, received_hup); - }, - .file => { - readFile(parent, resizable_buffer, fd, size_hint, received_hup); - }, - .socket => { - readSocket(parent, resizable_buffer, fd, size_hint, received_hup); - }, - .pipe => { - readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint, received_hup); - }, - } - } - - const stack_buffer_len = 64 * 1024; - - inline fn drainChunk(parent: *This, chunk: []const u8, hasMore: ReadState) bool { - if (parent.vtable.isStreamingEnabled()) { - if (chunk.len > 0) { - return parent.vtable.onReadChunk(chunk, hasMore); - } - } - - return false; - } - - fn wrapReadFn(comptime func: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize) { - return struct { - pub fn call(fd: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) { - _ = offset; - return func(fd, buffer); - } - }.call; - } - - fn readFile(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - const preadFn = struct { - pub fn call(fd1: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) { - return bun.sys.pread(fd1, buffer, @intCast(offset)); - } - }.call; - if (parent.flags.use_pread) { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, preadFn); - } else { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, wrapReadFn(bun.sys.read)); - } - } - - fn readSocket(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, 
resizable_buffer, fd, size_hint, received_hup, .socket, wrapReadFn(bun.sys.recvNonBlock)); - } - - fn readPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking)); - } - - fn readBlockingPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking)); - } - - fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void { - _ = size_hint; // autofix - const streaming = parent.vtable.isStreamingEnabled(); - - var received_hup = received_hup_; - - if (streaming) { - const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer(); - while (resizable_buffer.capacity == 0) { - const stack_buffer_cutoff = stack_buffer.len / 2; - var stack_buffer_head = stack_buffer; - while (stack_buffer_head.len > 16 * 1024) { - var buffer = stack_buffer_head; - - switch (sys_fn( - fd, - buffer, - parent._offset, - )) { - .result => |bytes_read| { - parent._offset += bytes_read; - buffer = stack_buffer_head[0..bytes_read]; - stack_buffer_head = stack_buffer_head[bytes_read..]; - - if (bytes_read == 0) { - vtable.close(parent); - if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) - _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], .eof); - vtable.done(parent); - return; - } - - if (comptime file_type == .pipe) { - if (bun.Environment.isMac or !bun.C.RWFFlagSupport.isMaybeSupported()) { - switch (bun.isReadable(fd)) { - .ready => {}, - .hup => { - received_hup = true; - }, - .not_ready => { - if (received_hup) { - vtable.close(parent); - } - defer { - if (received_hup) { - vtable.done(parent); - } - } - if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) { - if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .drained)) { - return; - } - } - - if (!received_hup) { - if (comptime vtable.registerPoll) |register| { - register(parent); - } - } - - return; - }, - } - } - } - - if (comptime file_type != .pipe) { - // blocking pipes block a process, so we have to keep reading as much as we can - // otherwise, we do want to stream the data - if (stack_buffer_head.len < stack_buffer_cutoff) { - if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress)) { - return; - } - stack_buffer_head = stack_buffer; - } - } - }, - .err => |err| { - if (err.isRetry()) { - if (comptime file_type == .file) { - bun.Output.debugWarn("Received EAGAIN while reading from a file. This is a bug.", .{}); - } else { - if (comptime vtable.registerPoll) |register| { - register(parent); - } - } - - if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) - _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], .drained); - return; - } - - if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) - _ = parent.vtable.onReadChunk(stack_buffer[0 .. 
stack_buffer.len - stack_buffer_head.len], .progress); - vtable.onError(parent, err); - return; - }, - } - } - - if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) { - if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress) and !received_hup) { - return; - } - } - - if (!parent.vtable.isStreamingEnabled()) break; - } - } - - while (true) { - resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory(); - var buffer: []u8 = resizable_buffer.unusedCapacitySlice(); - - switch (sys_fn(fd, buffer, parent._offset)) { - .result => |bytes_read| { - parent._offset += bytes_read; - buffer = buffer[0..bytes_read]; - resizable_buffer.items.len += bytes_read; - - if (bytes_read == 0) { - vtable.close(parent); - _ = drainChunk(parent, resizable_buffer.items, .eof); - vtable.done(parent); - return; - } - - if (comptime file_type == .pipe) { - if (bun.Environment.isMac or !bun.C.RWFFlagSupport.isMaybeSupported()) { - switch (bun.isReadable(fd)) { - .ready => {}, - .hup => { - received_hup = true; - }, - .not_ready => { - if (received_hup) { - vtable.close(parent); - } - defer { - if (received_hup) { - vtable.done(parent); - } - } - - if (parent.vtable.isStreamingEnabled()) { - defer { - resizable_buffer.clearRetainingCapacity(); - } - if (!parent.vtable.onReadChunk(resizable_buffer.items, if (received_hup) .eof else .drained) and !received_hup) { - return; - } - } - - if (!received_hup) { - if (comptime vtable.registerPoll) |register| { - register(parent); - } - } - - return; - }, - } - } - } - - if (comptime file_type != .pipe) { - if (parent.vtable.isStreamingEnabled()) { - if (resizable_buffer.items.len > 128_000) { - defer { - resizable_buffer.clearRetainingCapacity(); - } - if (!parent.vtable.onReadChunk(resizable_buffer.items, .progress)) { - return; - } - - continue; - } - } - } - }, - .err => |err| { - if (parent.vtable.isStreamingEnabled()) { - if (resizable_buffer.items.len > 0) { - _ = parent.vtable.onReadChunk(resizable_buffer.items, .drained); - resizable_buffer.clearRetainingCapacity(); - } - } - - if (err.isRetry()) { - if (comptime file_type == .file) { - bun.Output.debugWarn("Received EAGAIN while reading from a file. 
This is a bug.", .{}); - } else { - if (comptime vtable.registerPoll) |register| { - register(parent); - } - } - return; - } - vtable.onError(parent, err); - return; - }, - } - } - } - - fn readFromBlockingPipeWithoutBlocking(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - if (parent.vtable.isStreamingEnabled()) { - resizable_buffer.clearRetainingCapacity(); - } - - readBlockingPipe(parent, resizable_buffer, fd, size_hint, received_hup); - } - }; -} - const PollOrFd = @import("./pipes.zig").PollOrFd; -pub fn WindowsPipeReader( - comptime This: type, - comptime _: anytype, - comptime getBuffer: fn (*This) *std.ArrayList(u8), - comptime onReadChunk: fn (*This, chunk: []u8, ReadState) bool, - comptime registerPoll: ?fn (*This) void, - comptime done: fn (*This) void, - comptime onError: fn (*This, bun.sys.Error) void, -) type { - return struct { - fn onStreamAlloc(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { - var this = bun.cast(*This, handle.data); - const result = this.getReadBufferWithStableMemoryAddress(suggested_size); - buf.* = uv.uv_buf_t.init(result); - } - - fn onStreamRead(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { - var this = bun.cast(*This, stream.data); - - const nread_int = nread.int(); - - bun.sys.syslog("onStreamRead(0x{d}) = {d}", .{ @intFromPtr(this), nread_int }); - - // NOTE: pipes/tty need to call stopReading on errors (yeah) - switch (nread_int) { - 0 => { - // EAGAIN or EWOULDBLOCK or canceled (buf is not safe to access here) - return this.onRead(.{ .result = 0 }, "", .drained); - }, - uv.UV_EOF => { - _ = this.stopReading(); - // EOF (buf is not safe to access here) - return this.onRead(.{ .result = 0 }, "", .eof); - }, - else => { - if (nread.toError(.recv)) |err| { - _ = this.stopReading(); - // ERROR (buf is not safe to access here) - this.onRead(.{ .err = err }, "", .progress); - return; - } - // we got some data we can slice the buffer! 
- const len: usize = @intCast(nread_int); - var slice = buf.slice(); - this.onRead(.{ .result = len }, slice[0..len], .progress); - }, - } - } - - fn onFileRead(fs: *uv.fs_t) callconv(.C) void { - const result = fs.result; - const nread_int = result.int(); - bun.sys.syslog("onFileRead({}) = {d}", .{ bun.toFD(fs.file.fd), nread_int }); - if (nread_int == uv.UV_ECANCELED) { - fs.deinit(); - return; - } - var this: *This = bun.cast(*This, fs.data); - fs.deinit(); - if (this.flags.is_done) return; - - switch (nread_int) { - // 0 actually means EOF too - 0, uv.UV_EOF => { - this.flags.is_paused = true; - this.onRead(.{ .result = 0 }, "", .eof); - }, - // UV_ECANCELED needs to be on the top so we avoid UAF - uv.UV_ECANCELED => unreachable, - else => { - if (result.toError(.read)) |err| { - this.flags.is_paused = true; - this.onRead(.{ .err = err }, "", .progress); - return; - } - defer { - // if we are not paused we keep reading until EOF or err - if (!this.flags.is_paused) { - if (this.source) |source| { - if (source == .file) { - const file = source.file; - source.setData(this); - const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); - file.iov = uv.uv_buf_t.init(buf); - if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| { - this.flags.is_paused = true; - // we should inform the error if we are unable to keep reading - this.onRead(.{ .err = err }, "", .progress); - } - } - } - } - } - - const len: usize = @intCast(nread_int); - this._offset += len; - // we got some data lets get the current iov - if (this.source) |source| { - if (source == .file) { - var buf = source.file.iov.slice(); - return this.onRead(.{ .result = len }, buf[0..len], .progress); - } - } - // ops we should not hit this lets fail with EPIPE - bun.assert(false); - return this.onRead(.{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .read) }, "", .progress); - }, - } - } - - pub fn startReading(this: *This) bun.JSC.Maybe(void) { - if (this.flags.is_done or !this.flags.is_paused) return .{ .result = {} }; - this.flags.is_paused = false; - const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) }; - bun.assert(!source.isClosed()); - - switch (source) { - .file => |file| { - file.fs.deinit(); - source.setData(this); - const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); - file.iov = uv.uv_buf_t.init(buf); - if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| { - return .{ .err = err }; - } - }, - else => { - if (uv.uv_read_start(source.toStream(), &onStreamAlloc, @ptrCast(&onStreamRead)).toError(.open)) |err| { - bun.windows.libuv.log("uv_read_start() = {s}", .{err.name()}); - return .{ .err = err }; - } - }, - } - - return .{ .result = {} }; - } - - pub fn stopReading(this: *This) bun.JSC.Maybe(void) { - if (this.flags.is_done or this.flags.is_paused) return .{ .result = {} }; - this.flags.is_paused = true; - const source = this.source orelse return .{ .result = {} }; - switch (source) { - .file => |file| { - file.fs.cancel(); - }, - else => { - source.toStream().readStop(); - }, - } - return .{ .result = {} }; - } - - pub fn closeImpl(this: *This, comptime callDone: bool) void { - if (this.source) |source| { - switch (source) { - .sync_file, .file => |file| { - if (!this.flags.is_paused) { - // always cancel the current one - 
file.fs.cancel(); - this.flags.is_paused = true; - } - // always use close_fs here because we can have a operation in progress - file.close_fs.data = file; - _ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, onFileClose); - }, - .pipe => |pipe| { - pipe.data = pipe; - pipe.close(onPipeClose); - }, - .tty => |tty| { - if (tty == &Source.stdin_tty) { - Source.stdin_tty = undefined; - Source.stdin_tty_init = false; - } - - tty.data = tty; - tty.close(onTTYClose); - }, - } - this.source = null; - if (comptime callDone) done(this); - } - } - - pub fn close(this: *This) void { - _ = this.stopReading(); - this.closeImpl(true); - } - - const vtable = .{ - .getBuffer = getBuffer, - .registerPoll = registerPoll, - .done = done, - .onError = onError, - }; - - fn onFileClose(handle: *uv.fs_t) callconv(.C) void { - const file = bun.cast(*Source.File, handle.data); - handle.deinit(); - bun.default_allocator.destroy(file); - } - - fn onPipeClose(handle: *uv.Pipe) callconv(.C) void { - const this = bun.cast(*uv.Pipe, handle.data); - bun.default_allocator.destroy(this); - } - - fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void { - const this = bun.cast(*uv.uv_tty_t, handle.data); - bun.default_allocator.destroy(this); - } - - pub fn onRead(this: *This, amount: bun.JSC.Maybe(usize), slice: []u8, hasMore: ReadState) void { - if (amount == .err) { - onError(this, amount.err); - return; - } - - switch (hasMore) { - .eof => { - // we call report EOF and close - _ = onReadChunk(this, slice, hasMore); - close(this); - }, - .drained => { - // we call drained so we know if we should stop here - _ = onReadChunk(this, slice, hasMore); - }, - else => { - var buffer = getBuffer(this); - if (comptime bun.Environment.allow_assert) { - if (slice.len > 0 and !bun.isSliceInBuffer(slice, buffer.allocatedSlice())) { - @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); - } - } - // move cursor foward - buffer.items.len += amount.result; - _ = onReadChunk(this, slice, hasMore); - }, - } - } - - pub fn pause(this: *This) void { - _ = this.stopReading(); - } - - pub fn unpause(this: *This) void { - _ = this.startReading(); - } - - pub fn read(this: *This) void { - // we cannot sync read pipes on Windows so we just check if we are paused to resume the reading - this.unpause(); - } - }; -} - -pub const PipeReader = if (bun.Environment.isWindows) WindowsPipeReader else PosixPipeReader; const Async = bun.Async; // This is a runtime type instead of comptime due to bugs in Zig. 
@@ -620,23 +30,29 @@ const BufferedReaderVTable = struct { eventLoop: *const fn (*anyopaque) JSC.EventLoopHandle, pub fn init(comptime Type: type) *const BufferedReaderVTable.Fn { - const loop_fn = &struct { - pub fn loop_fn(this: *anyopaque) *Async.Loop { - return Type.loop(@alignCast(@ptrCast(this))); + const fns = struct { + fn onReadChunk(this: *anyopaque, chunk: []const u8, hasMore: ReadState) bool { + return Type.onReadChunk(@as(*Type, @alignCast(@ptrCast(this))), chunk, hasMore); } - }.loop_fn; - - const eventLoop_fn = &struct { - pub fn eventLoop_fn(this: *anyopaque) JSC.EventLoopHandle { - return JSC.EventLoopHandle.init(Type.eventLoop(@alignCast(@ptrCast(this)))); + fn onReaderDone(this: *anyopaque) void { + return Type.onReaderDone(@as(*Type, @alignCast(@ptrCast(this)))); } - }.eventLoop_fn; + fn onReaderError(this: *anyopaque, err: bun.sys.Error) void { + return Type.onReaderError(@as(*Type, @alignCast(@ptrCast(this))), err); + } + fn eventLoop(this: *anyopaque) JSC.EventLoopHandle { + return JSC.EventLoopHandle.init(Type.eventLoop(@as(*Type, @alignCast(@ptrCast(this))))); + } + fn loop(this: *anyopaque) *Async.Loop { + return Type.loop(@as(*Type, @alignCast(@ptrCast(this)))); + } + }; return comptime &BufferedReaderVTable.Fn{ - .onReadChunk = if (@hasDecl(Type, "onReadChunk")) @ptrCast(&Type.onReadChunk) else null, - .onReaderDone = @ptrCast(&Type.onReaderDone), - .onReaderError = @ptrCast(&Type.onReaderError), - .eventLoop = eventLoop_fn, - .loop = loop_fn, + .onReadChunk = if (@hasDecl(Type, "onReadChunk")) &fns.onReadChunk else null, + .onReaderDone = &fns.onReaderDone, + .onReaderError = &fns.onReaderError, + .eventLoop = &fns.eventLoop, + .loop = &fns.loop, }; } }; @@ -739,17 +155,6 @@ const PosixBufferedReader = struct { this.handle = .{ .fd = fd }; } - pub usingnamespace PosixPipeReader(@This(), .{ - .getFd = @ptrCast(&getFd), - .getBuffer = @ptrCast(&buffer), - .onReadChunk = @ptrCast(&_onReadChunk), - .registerPoll = @ptrCast(®isterPoll), - .done = @ptrCast(&done), - .close = @ptrCast(&closeWithoutReporting), - .onError = @ptrCast(&onError), - .getFileType = @ptrCast(&getFileType), - }); - fn getFileType(this: *const PosixBufferedReader) FileType { const flags = this.flags; if (flags.socket) { @@ -804,7 +209,7 @@ const PosixBufferedReader = struct { } pub fn buffer(this: *PosixBufferedReader) *std.ArrayList(u8) { - return &@as(*PosixBufferedReader, @alignCast(@ptrCast(this)))._buffer; + return &this._buffer; } pub fn finalBuffer(this: *PosixBufferedReader) *std.ArrayList(u8) { @@ -943,6 +348,311 @@ const PosixBufferedReader = struct { return this.vtable.eventLoop(); } + pub fn read(this: *PosixBufferedReader) void { + const buf = this.buffer(); + const fd = this.getFd(); + + switch (this.getFileType()) { + .nonblocking_pipe => { + readPipe(this, buf, fd, 0, false); + return; + }, + .file => { + readFile(this, buf, fd, 0, false); + return; + }, + .socket => { + readSocket(this, buf, fd, 0, false); + return; + }, + .pipe => { + switch (bun.isReadable(fd)) { + .ready => { + readFromBlockingPipeWithoutBlocking(this, buf, fd, 0, false); + }, + .hup => { + readFromBlockingPipeWithoutBlocking(this, buf, fd, 0, true); + }, + .not_ready => { + this.registerPoll(); + }, + } + }, + } + } + + pub fn onPoll(parent: *PosixBufferedReader, size_hint: isize, received_hup: bool) void { + const resizable_buffer = parent.buffer(); + const fd = parent.getFd(); + bun.sys.syslog("onPoll({}) = {d}", .{ fd, size_hint }); + + switch (parent.getFileType()) { + .nonblocking_pipe => { + 
readPipe(parent, resizable_buffer, fd, size_hint, received_hup); + }, + .file => { + readFile(parent, resizable_buffer, fd, size_hint, received_hup); + }, + .socket => { + readSocket(parent, resizable_buffer, fd, size_hint, received_hup); + }, + .pipe => { + readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint, received_hup); + }, + } + } + + const stack_buffer_len = 64 * 1024; + + inline fn drainChunk(parent: *PosixBufferedReader, chunk: []const u8, hasMore: ReadState) bool { + if (parent.vtable.isStreamingEnabled()) { + if (chunk.len > 0) { + return parent.vtable.onReadChunk(chunk, hasMore); + } + } + + return false; + } + + fn wrapReadFn(comptime func: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize) { + return struct { + pub fn call(fd: bun.FileDescriptor, buf: []u8, offset: usize) JSC.Maybe(usize) { + _ = offset; + return func(fd, buf); + } + }.call; + } + + fn readFile(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { + const preadFn = struct { + pub fn call(fd1: bun.FileDescriptor, buf: []u8, offset: usize) JSC.Maybe(usize) { + return bun.sys.pread(fd1, buf, @intCast(offset)); + } + }.call; + if (parent.flags.use_pread) { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, preadFn); + } else { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, wrapReadFn(bun.sys.read)); + } + } + + fn readSocket(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, wrapReadFn(bun.sys.recvNonBlock)); + } + + fn readPipe(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking)); + } + + fn readBlockingPipe(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking)); + } + + fn readWithFn(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void { + _ = size_hint; // autofix + const streaming = parent.vtable.isStreamingEnabled(); + + var received_hup = received_hup_; + + if (streaming) { + const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer(); + while (resizable_buffer.capacity == 0) { + const stack_buffer_cutoff = stack_buffer.len / 2; + var stack_buffer_head = stack_buffer; + while (stack_buffer_head.len > 16 * 1024) { + var buf = stack_buffer_head; + + switch (sys_fn( + fd, + buf, + parent._offset, + )) { + .result => |bytes_read| { + parent._offset += bytes_read; + buf = stack_buffer_head[0..bytes_read]; + stack_buffer_head = stack_buffer_head[bytes_read..]; + + if (bytes_read == 0) { + parent.closeWithoutReporting(); + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) + _ = parent.vtable.onReadChunk(stack_buffer[0 .. 
stack_buffer.len - stack_buffer_head.len], .eof); + parent.done(); + return; + } + + if (comptime file_type == .pipe) { + if (bun.Environment.isMac or !bun.C.RWFFlagSupport.isMaybeSupported()) { + switch (bun.isReadable(fd)) { + .ready => {}, + .hup => { + received_hup = true; + }, + .not_ready => { + if (received_hup) { + parent.closeWithoutReporting(); + } + defer { + if (received_hup) { + parent.done(); + } + } + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) { + if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .drained)) { + return; + } + } + + if (!received_hup) { + parent.registerPoll(); + } + + return; + }, + } + } + } + + if (comptime file_type != .pipe) { + // blocking pipes block a process, so we have to keep reading as much as we can + // otherwise, we do want to stream the data + if (stack_buffer_head.len < stack_buffer_cutoff) { + if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress)) { + return; + } + stack_buffer_head = stack_buffer; + } + } + }, + .err => |err| { + if (err.isRetry()) { + if (comptime file_type == .file) { + bun.Output.debugWarn("Received EAGAIN while reading from a file. This is a bug.", .{}); + } else { + parent.registerPoll(); + } + + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) + _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], .drained); + return; + } + + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) + _ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], .progress); + parent.onError(err); + return; + }, + } + } + + if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) { + if (!parent.vtable.onReadChunk(stack_buffer[0 .. 
stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress) and !received_hup) { + return; + } + } + + if (!parent.vtable.isStreamingEnabled()) break; + } + } + + while (true) { + resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory(); + var buf: []u8 = resizable_buffer.unusedCapacitySlice(); + + switch (sys_fn(fd, buf, parent._offset)) { + .result => |bytes_read| { + parent._offset += bytes_read; + buf = buf[0..bytes_read]; + resizable_buffer.items.len += bytes_read; + + if (bytes_read == 0) { + parent.closeWithoutReporting(); + _ = drainChunk(parent, resizable_buffer.items, .eof); + parent.done(); + return; + } + + if (comptime file_type == .pipe) { + if (bun.Environment.isMac or !bun.C.RWFFlagSupport.isMaybeSupported()) { + switch (bun.isReadable(fd)) { + .ready => {}, + .hup => { + received_hup = true; + }, + .not_ready => { + if (received_hup) { + parent.closeWithoutReporting(); + } + defer { + if (received_hup) { + parent.done(); + } + } + + if (parent.vtable.isStreamingEnabled()) { + defer { + resizable_buffer.clearRetainingCapacity(); + } + if (!parent.vtable.onReadChunk(resizable_buffer.items, if (received_hup) .eof else .drained) and !received_hup) { + return; + } + } + + if (!received_hup) { + parent.registerPoll(); + } + + return; + }, + } + } + } + + if (comptime file_type != .pipe) { + if (parent.vtable.isStreamingEnabled()) { + if (resizable_buffer.items.len > 128_000) { + defer { + resizable_buffer.clearRetainingCapacity(); + } + if (!parent.vtable.onReadChunk(resizable_buffer.items, .progress)) { + return; + } + + continue; + } + } + } + }, + .err => |err| { + if (parent.vtable.isStreamingEnabled()) { + if (resizable_buffer.items.len > 0) { + _ = parent.vtable.onReadChunk(resizable_buffer.items, .drained); + resizable_buffer.clearRetainingCapacity(); + } + } + + if (err.isRetry()) { + if (comptime file_type == .file) { + bun.Output.debugWarn("Received EAGAIN while reading from a file. This is a bug.", .{}); + } else { + parent.registerPoll(); + } + return; + } + parent.onError(err); + return; + }, + } + } + } + + fn readFromBlockingPipeWithoutBlocking(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { + if (parent.vtable.isStreamingEnabled()) { + resizable_buffer.clearRetainingCapacity(); + } + + readBlockingPipe(parent, resizable_buffer, fd, size_hint, received_hup); + } + comptime { bun.meta.banFieldType(@This(), bool); // put them in flags instead. 
} @@ -994,11 +704,22 @@ pub const WindowsBufferedReader = struct { }; pub fn init(comptime Type: type) WindowsOutputReader { + const fns = struct { + fn onReadChunk(this: *anyopaque, chunk: []const u8, hasMore: ReadState) bool { + return Type.onReadChunk(@as(*Type, @alignCast(@ptrCast(this))), chunk, hasMore); + } + fn onReaderDone(this: *anyopaque) void { + return Type.onReaderDone(@as(*Type, @alignCast(@ptrCast(this)))); + } + fn onReaderError(this: *anyopaque, err: bun.sys.Error) void { + return Type.onReaderError(@as(*Type, @alignCast(@ptrCast(this))), err); + } + }; return .{ .vtable = .{ - .onReadChunk = if (@hasDecl(Type, "onReadChunk")) @ptrCast(&Type.onReadChunk) else null, - .onReaderDone = @ptrCast(&Type.onReaderDone), - .onReaderError = @ptrCast(&Type.onReaderError), + .onReadChunk = if (@hasDecl(Type, "onReadChunk")) &fns.onReadChunk else null, + .onReaderDone = &fns.onReaderDone, + .onReaderError = &fns.onReaderError, }, }; } @@ -1059,16 +780,6 @@ pub const WindowsBufferedReader = struct { this.updateRef(false); } - pub usingnamespace WindowsPipeReader( - @This(), - {}, - buffer, - _onReadChunk, - null, - done, - onError, - ); - pub fn takeBuffer(this: *WindowsOutputReader) std.ArrayList(u8) { const out = this._buffer; this._buffer = std.ArrayList(u8).init(out.allocator); @@ -1175,6 +886,245 @@ pub const WindowsBufferedReader = struct { return source.setRawMode(value); } + fn onStreamAlloc(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void { + var this = bun.cast(*WindowsBufferedReader, handle.data); + const result = this.getReadBufferWithStableMemoryAddress(suggested_size); + buf.* = uv.uv_buf_t.init(result); + } + + fn onStreamRead(handle: *uv.uv_handle_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void { + const stream = bun.cast(*uv.uv_stream_t, handle); + var this = bun.cast(*WindowsBufferedReader, stream.data); + + const nread_int = nread.int(); + + bun.sys.syslog("onStreamRead(0x{d}) = {d}", .{ @intFromPtr(this), nread_int }); + + // NOTE: pipes/tty need to call stopReading on errors (yeah) + switch (nread_int) { + 0 => { + // EAGAIN or EWOULDBLOCK or canceled (buf is not safe to access here) + return this.onRead(.{ .result = 0 }, "", .drained); + }, + uv.UV_EOF => { + _ = this.stopReading(); + // EOF (buf is not safe to access here) + return this.onRead(.{ .result = 0 }, "", .eof); + }, + else => { + if (nread.toError(.recv)) |err| { + _ = this.stopReading(); + // ERROR (buf is not safe to access here) + this.onRead(.{ .err = err }, "", .progress); + return; + } + // we got some data we can slice the buffer! 
+ const len: usize = @intCast(nread_int); + var slice = buf.slice(); + this.onRead(.{ .result = len }, slice[0..len], .progress); + }, + } + } + + fn onFileRead(fs: *uv.fs_t) callconv(.C) void { + const result = fs.result; + const nread_int = result.int(); + bun.sys.syslog("onFileRead({}) = {d}", .{ bun.toFD(fs.file.fd), nread_int }); + if (nread_int == uv.UV_ECANCELED) { + fs.deinit(); + return; + } + var this: *WindowsBufferedReader = bun.cast(*WindowsBufferedReader, fs.data); + fs.deinit(); + if (this.flags.is_done) return; + + switch (nread_int) { + // 0 actually means EOF too + 0, uv.UV_EOF => { + this.flags.is_paused = true; + this.onRead(.{ .result = 0 }, "", .eof); + }, + // UV_ECANCELED needs to be on the top so we avoid UAF + uv.UV_ECANCELED => unreachable, + else => { + if (result.toError(.read)) |err| { + this.flags.is_paused = true; + this.onRead(.{ .err = err }, "", .progress); + return; + } + defer { + // if we are not paused we keep reading until EOF or err + if (!this.flags.is_paused) { + if (this.source) |source| { + if (source == .file) { + const file = source.file; + source.setData(this); + const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); + file.iov = uv.uv_buf_t.init(buf); + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| { + this.flags.is_paused = true; + // we should inform the error if we are unable to keep reading + this.onRead(.{ .err = err }, "", .progress); + } + } + } + } + } + + const len: usize = @intCast(nread_int); + this._offset += len; + // we got some data lets get the current iov + if (this.source) |source| { + if (source == .file) { + var buf = source.file.iov.slice(); + return this.onRead(.{ .result = len }, buf[0..len], .progress); + } + } + // ops we should not hit this lets fail with EPIPE + bun.assert(false); + return this.onRead(.{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .read) }, "", .progress); + }, + } + } + + pub fn startReading(this: *WindowsBufferedReader) bun.JSC.Maybe(void) { + if (this.flags.is_done or !this.flags.is_paused) return .{ .result = {} }; + this.flags.is_paused = false; + const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) }; + bun.assert(!source.isClosed()); + + switch (source) { + .file => |file| { + file.fs.deinit(); + source.setData(this); + const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); + file.iov = uv.uv_buf_t.init(buf); + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| { + return .{ .err = err }; + } + }, + else => { + if (uv.uv_read_start(source.toStream(), &onStreamAlloc, &onStreamRead).toError(.open)) |err| { + bun.windows.libuv.log("uv_read_start() = {s}", .{err.name()}); + return .{ .err = err }; + } + }, + } + + return .{ .result = {} }; + } + + pub fn stopReading(this: *WindowsBufferedReader) bun.JSC.Maybe(void) { + if (this.flags.is_done or this.flags.is_paused) return .{ .result = {} }; + this.flags.is_paused = true; + const source = this.source orelse return .{ .result = {} }; + switch (source) { + .file => |file| { + file.fs.cancel(); + }, + else => { + source.toStream().readStop(); + }, + } + return .{ .result = {} }; + } + + pub fn closeImpl(this: *WindowsBufferedReader, comptime callDone: bool) void { + if (this.source) |source| { + switch (source) { + .sync_file, .file => 
|file| { + if (!this.flags.is_paused) { + // always cancel the current one + file.fs.cancel(); + this.flags.is_paused = true; + } + // always use close_fs here because we can have a operation in progress + file.close_fs.data = file; + _ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, onFileClose); + }, + .pipe => |pipe| { + pipe.data = pipe; + pipe.close(onPipeClose); + }, + .tty => |tty| { + if (tty == &Source.stdin_tty) { + Source.stdin_tty = undefined; + Source.stdin_tty_init = false; + } + + tty.data = tty; + tty.close(onTTYClose); + }, + } + this.source = null; + if (comptime callDone) this.done(); + } + } + + pub fn close(this: *WindowsBufferedReader) void { + _ = this.stopReading(); + this.closeImpl(true); + } + + fn onFileClose(handle: *uv.fs_t) callconv(.C) void { + const file = bun.cast(*Source.File, handle.data); + handle.deinit(); + bun.default_allocator.destroy(file); + } + + fn onPipeClose(handle: *uv.Pipe) callconv(.C) void { + const this = bun.cast(*uv.Pipe, handle.data); + bun.default_allocator.destroy(this); + } + + fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void { + const this = bun.cast(*uv.uv_tty_t, handle.data); + bun.default_allocator.destroy(this); + } + + pub fn onRead(this: *WindowsBufferedReader, amount: bun.JSC.Maybe(usize), slice: []u8, hasMore: ReadState) void { + if (amount == .err) { + this.onError(amount.err); + return; + } + + switch (hasMore) { + .eof => { + // we call report EOF and close + _ = this._onReadChunk(slice, hasMore); + close(this); + }, + .drained => { + // we call drained so we know if we should stop here + _ = this._onReadChunk(slice, hasMore); + }, + else => { + var buf = this.buffer(); + if (comptime bun.Environment.allow_assert) { + if (slice.len > 0 and !bun.isSliceInBuffer(slice, buf.allocatedSlice())) { + @panic("uv_read_cb: buf is not in buffer! This is a bug in bun. Please report it."); + } + } + // move cursor foward + buf.items.len += amount.result; + _ = this._onReadChunk(slice, hasMore); + }, + } + } + + pub fn pause(this: *WindowsBufferedReader) void { + _ = this.stopReading(); + } + + pub fn unpause(this: *WindowsBufferedReader) void { + _ = this.startReading(); + } + + pub fn read(this: *WindowsBufferedReader) void { + // we cannot sync read pipes on Windows so we just check if we are paused to resume the reading + this.unpause(); + } + comptime { bun.meta.banFieldType(WindowsOutputReader, bool); // Don't increase the size of the struct. Put them in flags instead. } diff --git a/test/internal/ban-words.test.ts b/test/internal/ban-words.test.ts index f466ea4033..5d84fc16fb 100644 --- a/test/internal/ban-words.test.ts +++ b/test/internal/ban-words.test.ts @@ -28,7 +28,7 @@ const words: Record "== alloc.ptr": { reason: "The std.mem.Allocator context pointer can be undefined, which makes this comparison undefined behavior" }, "!= alloc.ptr": { reason: "The std.mem.Allocator context pointer can be undefined, which makes this comparison undefined behavior" }, [String.raw`: [a-zA-Z0-9_\.\*\?\[\]\(\)]+ = undefined,`]: { reason: "Do not default a struct field to undefined", limit: 246, regex: true }, - "usingnamespace": { reason: "This brings Bun away from incremental / faster compile times.", limit: 496 }, + "usingnamespace": { reason: "This brings Bun away from incremental / faster compile times.", limit: 494 }, }; const words_keys = [...Object.keys(words)];
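
For context on the refactor above: the removed `usingnamespace PosixPipeReader(...)` / `WindowsPipeReader(...)` mixins are replaced by methods defined directly on `PosixBufferedReader` and `WindowsBufferedReader`, and the vtable construction switches from `@ptrCast(&Type.onReaderDone)`-style function-pointer casts to correctly typed wrapper functions that cast the `*anyopaque` context back to the concrete type. Below is a minimal, self-contained sketch of that wrapper pattern; the `VTable` and `Example` names are hypothetical and not part of the diff.

const std = @import("std");

// Hypothetical, minimal vtable mirroring the BufferedReaderVTable.init change:
// typed wrapper fns instead of `@ptrCast(&Type.onReaderDone)`.
const VTable = struct {
    onReaderDone: *const fn (*anyopaque) void,

    pub fn init(comptime Type: type) VTable {
        const fns = struct {
            fn onReaderDone(ctx: *anyopaque) void {
                return Type.onReaderDone(@as(*Type, @alignCast(@ptrCast(ctx))));
            }
        };
        return .{ .onReaderDone = &fns.onReaderDone };
    }
};

// Hypothetical concrete reader type; stands in for the vtable's parent object.
const Example = struct {
    done: bool = false,

    pub fn onReaderDone(this: *Example) void {
        this.done = true;
    }
};

pub fn main() void {
    var example = Example{};
    const vtable = VTable.init(Example);
    // The type-erased call goes through the typed wrapper, so no function-pointer
    // cast is needed and the compiler checks the signature.
    vtable.onReaderDone(&example);
    std.debug.print("done = {}\n", .{example.done});
}

Removing the two `usingnamespace` mixin sites is also what lets the `usingnamespace` limit in test/internal/ban-words.test.ts drop from 496 to 494.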