From d4db29c1645bcbecefb36ea194fdfd732d9ba932 Mon Sep 17 00:00:00 2001 From: Georgijs <48869301+gvilums@users.noreply.github.com> Date: Tue, 7 May 2024 16:05:48 -0700 Subject: [PATCH] Fix fd offset handling in ReadStream (#10883) Co-authored-by: Georgijs Vilums <=> Co-authored-by: gvilums Co-authored-by: Jarred Sumner --- src/bun.js/webcore.zig | 4 +-- src/bun.js/webcore/blob.zig | 14 +++++++++ src/bun.js/webcore/streams.zig | 52 ++++++++++++++++++++++++++++--- src/io/PipeReader.zig | 57 +++++++++++++++++++++++++++++----- src/js/node/fs.ts | 53 ++++++++----------------------- test/js/node/fs/fs.test.ts | 18 +++++++++++ 6 files changed, 143 insertions(+), 55 deletions(-) diff --git a/src/bun.js/webcore.zig b/src/bun.js/webcore.zig index be2c26a022..dba5be0478 100644 --- a/src/bun.js/webcore.zig +++ b/src/bun.js/webcore.zig @@ -127,7 +127,7 @@ fn confirm(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callcon // They may have said yes, but the stdin is invalid. return .false; }; - if(next_byte == '\n'){ + if (next_byte == '\n') { return .false; } }, @@ -142,7 +142,7 @@ fn confirm(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callcon // 8. If the user responded positively, return true; // otherwise, the user responded negatively: return false. return .true; - }else if(next_byte == '\r'){ + } else if (next_byte == '\r') { //Check Windows style const second_byte = reader.readByte() catch { return .false; diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index 8adc5a87ad..bfd8e51970 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -2829,6 +2829,20 @@ pub const Blob = struct { return stream; } + pub fn toStreamWithOffset( + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSC.JSValue { + const this = callframe.this().as(Blob) orelse @panic("this is not a Blob"); + const args = callframe.arguments(1).slice(); + + return JSC.WebCore.ReadableStream.fromFileBlobWithOffset( + globalThis, + this, + @intCast(args[0].toInt64()), + ); + } + fn promisified( value: JSC.JSValue, global: *JSGlobalObject, diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index a142b9919f..21f8e15f4a 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -348,6 +348,38 @@ pub const ReadableStream = struct { } } + pub fn fromFileBlobWithOffset( + globalThis: *JSGlobalObject, + blob: *const Blob, + offset: usize, + ) JSC.JSValue { + JSC.markBinding(@src()); + var store = blob.store orelse { + return ReadableStream.empty(globalThis); + }; + switch (store.data) { + .file => { + var reader = FileReader.Source.new(.{ + .globalThis = globalThis, + .context = .{ + .event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()), + .start_offset = offset, + .lazy = .{ + .blob = store, + }, + }, + }); + store.ref(); + + return reader.toReadableStream(globalThis); + }, + else => { + globalThis.throw("Expected FileBlob", .{}); + return .zero; + }, + } + } + pub fn fromPipe( globalThis: *JSGlobalObject, parent: anytype, @@ -3415,6 +3447,7 @@ pub const FileReader = struct { pending_value: JSC.Strong = .{}, pending_view: []u8 = &.{}, fd: bun.FileDescriptor = bun.invalid_fd, + start_offset: ?usize = null, started: bool = false, waiting_for_onReaderDone: bool = false, event_loop: JSC.EventLoopHandle, @@ -3606,11 +3639,20 @@ pub const FileReader = struct { if (was_lazy) { _ = this.parent().incrementCount(); this.waiting_for_onReaderDone = true; - switch (this.reader.start(this.fd, pollable)) { - .result => {}, - .err => |e| { - return .{ .err = e }; - }, + if (this.start_offset) |offset| { + switch (this.reader.startFileOffset(this.fd, pollable, offset)) { + .result => {}, + .err => |e| { + return .{ .err = e }; + }, + } + } else { + switch (this.reader.start(this.fd, pollable)) { + .result => {}, + .err => |e| { + return .{ .err = e }; + }, + } } } else if (comptime Environment.isPosix) { if (this.reader.flags.pollable and !this.reader.isDone()) { diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index c5553599cb..40f047f99e 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -89,23 +89,41 @@ pub fn PosixPipeReader( return false; } + fn wrapReadFn(comptime func: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize) { + return struct { + pub fn call(fd: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) { + _ = offset; + return func(fd, buffer); + } + }.call; + } + fn readFile(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, bun.sys.read); + const preadFn = struct { + pub fn call(fd1: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) { + return bun.sys.pread(fd1, buffer, @intCast(offset)); + } + }.call; + if (parent.flags.use_pread) { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, preadFn); + } else { + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, wrapReadFn(bun.sys.read)); + } } fn readSocket(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, bun.sys.recvNonBlock); + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, wrapReadFn(bun.sys.recvNonBlock)); } fn readPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, bun.sys.readNonblocking); + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking)); } fn readBlockingPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, bun.sys.readNonblocking); + return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking)); } - fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) void { + fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void { _ = size_hint; // autofix const streaming = parent.vtable.isStreamingEnabled(); @@ -122,8 +140,10 @@ pub fn PosixPipeReader( switch (sys_fn( fd, buffer, + parent._offset, )) { .result => |bytes_read| { + parent._offset += bytes_read; buffer = stack_buffer_head[0..bytes_read]; stack_buffer_head = stack_buffer_head[bytes_read..]; @@ -217,8 +237,9 @@ pub fn PosixPipeReader( resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory(); var buffer: []u8 = resizable_buffer.unusedCapacitySlice(); - switch (sys_fn(fd, buffer)) { + switch (sys_fn(fd, buffer, parent._offset)) { .result => |bytes_read| { + parent._offset += bytes_read; buffer = buffer[0..bytes_read]; resizable_buffer.items.len += bytes_read; @@ -402,7 +423,7 @@ pub fn WindowsPipeReader( source.setData(this); const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); file.iov = uv.uv_buf_t.init(buf); - if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| { + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| { this.flags.is_paused = true; // we should inform the error if we are unable to keep reading this.onRead(.{ .err = err }, "", .progress); @@ -413,6 +434,7 @@ pub fn WindowsPipeReader( } const len: usize = @intCast(nread_int); + this._offset += len; // we got some data lets get the current iov if (this.source) |source| { if (source == .file) { @@ -439,7 +461,7 @@ pub fn WindowsPipeReader( source.setData(this); const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024); file.iov = uv.uv_buf_t.init(buf); - if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| { + if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| { return .{ .err = err }; } }, @@ -650,6 +672,7 @@ const BufferedReaderVTable = struct { const PosixBufferedReader = struct { handle: PollOrFd = .{ .closed = {} }, _buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), + _offset: usize = 0, vtable: BufferedReaderVTable, flags: Flags = .{}, @@ -662,6 +685,7 @@ const PosixBufferedReader = struct { closed_without_reporting: bool = false, close_handle: bool = true, memfd: bool = false, + use_pread: bool = false, }; pub fn init(comptime Type: type) PosixBufferedReader { @@ -683,6 +707,7 @@ const PosixBufferedReader = struct { to.* = .{ .handle = other.handle, ._buffer = other.buffer().*, + ._offset = other._offset, .flags = other.flags, .vtable = .{ .fns = to.vtable.fns, @@ -692,6 +717,7 @@ const PosixBufferedReader = struct { other.buffer().* = std.ArrayList(u8).init(bun.default_allocator); other.flags.is_done = true; other.handle = .{ .closed = {} }; + other._offset = 0; to.handle.setOwner(to); // note: the caller is supposed to drain the buffer themselves @@ -879,6 +905,12 @@ const PosixBufferedReader = struct { }; } + pub fn startFileOffset(this: *PosixBufferedReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.JSC.Maybe(void) { + this._offset = offset; + this.flags.use_pread = true; + return this.start(fd, poll); + } + // Exists for consistentcy with Windows. pub fn hasPendingRead(this: *const PosixBufferedReader) bool { return this.handle == .poll and this.handle.poll.isRegistered(); @@ -927,6 +959,7 @@ pub const WindowsBufferedReader = struct { /// The pointer to this pipe must be stable. /// It cannot change because we don't know what libuv will do with it. source: ?Source = null, + _offset: usize = 0, _buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), // for compatibility with Linux flags: Flags = .{}, @@ -948,6 +981,7 @@ pub const WindowsBufferedReader = struct { is_paused: bool = true, has_inflight_read: bool = false, + use_pread: bool = false, }; pub fn init(comptime Type: type) WindowsOutputReader { @@ -970,6 +1004,7 @@ pub const WindowsBufferedReader = struct { .vtable = to.vtable, .flags = other.flags, ._buffer = other.buffer().*, + ._offset = other._offset, .source = other.source, }; other.flags.is_done = true; @@ -1103,6 +1138,12 @@ pub const WindowsBufferedReader = struct { return this.startWithCurrentPipe(); } + pub fn startFileOffset(this: *WindowsOutputReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.JSC.Maybe(void) { + this._offset = offset; + this.flags.use_pread = true; + return this.start(fd, poll); + } + pub fn deinit(this: *WindowsOutputReader) void { this.buffer().deinit(); const source = this.source orelse return; diff --git a/src/js/node/fs.ts b/src/js/node/fs.ts index 196aa2fecb..10f54e01ea 100644 --- a/src/js/node/fs.ts +++ b/src/js/node/fs.ts @@ -537,6 +537,8 @@ var defaultReadStreamOptions = { let kHandle = Symbol("kHandle"); +const blobToStreamWithOffset = $newZigFunction("blob.zig", "Blob.toStreamWithOffset", 1); + var ReadStreamClass; ReadStream = (function (InternalReadStream) { @@ -633,7 +635,7 @@ ReadStream = (function (InternalReadStream) { // Get the stream controller // We need the pointer to the underlying stream controller for the NativeReadable - var stream = fileRef.stream(); + const stream = blobToStreamWithOffset.$apply(fileRef, [start]); var ptr = stream.$bunNativePtr; if (!ptr) { throw new Error("Failed to get internal stream controller. This is a bug in Bun"); @@ -672,9 +674,8 @@ ReadStream = (function (InternalReadStream) { this._readableState.autoClose = autoDestroy = autoClose; this._readableState.highWaterMark = highWaterMark; - if (start !== undefined) { - this.pos = start; - } + this.pos = start || 0; + this.bytesRead = start || 0; $assert(overridden_fs); this.#fs = overridden_fs; @@ -740,51 +741,23 @@ ReadStream = (function (InternalReadStream) { } push(chunk) { - // Is it even possible for this to be less than 1? - var bytesRead = chunk?.length ?? 0; + let bytesRead = chunk?.length ?? 0; if (bytesRead > 0) { this.bytesRead += bytesRead; - var currPos = this.pos; - // Handle case of going through bytes before pos if bytesRead is less than pos - // If pos is undefined, we are reading through the whole file - // Otherwise we started from somewhere in the middle of the file - if (currPos !== undefined) { - // At this point we still haven't hit our `start` point - // We should discard this chunk and exit - if (this.bytesRead < currPos) { - return true; - } - // At this point, bytes read is greater than our starting position - // If the current position is still the starting position, that means - // this is the first chunk where we care about the bytes read - // and we need to subtract the bytes read from the start position (n) and slice the last n bytes - if (currPos === this.start) { - var n = this.bytesRead - currPos; - chunk = chunk.slice(-n); - var [_, ...rest] = arguments; - this.pos = this.bytesRead; - if (this.end !== undefined && this.bytesRead > this.end) { - chunk = chunk.slice(0, this.end - this.start + 1); - } - return super.push(chunk, ...rest); - } - var end = this.end; - // This is multi-chunk read case where we go passed the end of the what we want to read in the last chunk - if (end !== undefined && this.bytesRead > end) { - chunk = chunk.slice(0, end - currPos + 1); - var [_, ...rest] = arguments; - this.pos = this.bytesRead; - return super.push(chunk, ...rest); - } + let end = this.end; + // truncate the chunk if we go past the end + if (end !== undefined && this.bytesRead > end) { + chunk = chunk.slice(0, end - this.pos + 1); + var [_, ...rest] = arguments; this.pos = this.bytesRead; + return super.push(chunk, ...rest); } + this.pos = this.bytesRead; } return super.push(...arguments); } - // # - // n should be the highwatermark passed from Readable.read when calling internal _read (_read is set to this private fn in this class) #internalRead(n) { // pos is the current position in the file diff --git a/test/js/node/fs/fs.test.ts b/test/js/node/fs/fs.test.ts index ac326e2ac7..50005c10d4 100644 --- a/test/js/node/fs/fs.test.ts +++ b/test/js/node/fs/fs.test.ts @@ -1815,6 +1815,24 @@ describe("createReadStream", () => { done(); }); }); + + it( + "correctly handles file descriptors with an offset", + done => { + const path = `${tmpdir()}/bun-fs-createReadStream-${Date.now()}.txt`; + const fd = fs.openSync(path, "w+"); + + const stream = fs.createReadStream("", { fd: fd, start: 2 }); + stream.on("data", chunk => { + expect(chunk.toString()).toBe("llo, world!"); + done(); + }); + stream.on("error", done); + + fs.writeSync(fd, "Hello, world!"); + }, + { timeout: 100 }, + ); }); describe("fs.WriteStream", () => {