From abdedcb29aafebb2c643e6fda47bffd42549cd2e Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Thu, 7 Mar 2024 02:33:24 -0800 Subject: [PATCH] Fix a bunch of things --- src/bun.js/webcore/streams.zig | 50 +++++++++++++++++++++++++--------- src/io/PipeWriter.zig | 6 +++- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index ff80ad994c..6eab92a6da 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2909,11 +2909,23 @@ pub const FileSink = struct { pub fn onWrite(this: *FileSink, amount: usize, done: bool) void { log("onWrite({d}, {any})", .{ amount, done }); + this.written += amount; + // Only keep the event loop ref'd while there's a pending write in progress. // If there's no pending write, no need to keep the event loop ref'd. this.writer.updateRef(this.eventLoop(), false); - this.written += amount; + // If the developer requested to close the writer (.end() in node streams) + // + // but: + // 1) We haven't finished writing yet + // 2) We haven't received EOF + if (this.done and !done and this.writer.hasPendingData()) { + if (this.pending.state == .pending) { + this.pending.consumed += @truncate(amount); + } + return; + } if (this.pending.state == .pending) { this.pending.consumed += @truncate(amount); @@ -2925,7 +2937,7 @@ pub const FileSink = struct { this.runPending(); - if (this.done and done) { + if (this.done and !done and this.writer.getBuffer().len == 0) { // if we call end/endFromJS and we have some pending returned from .flush() we should call writer.end() this.writer.end(); } @@ -3276,7 +3288,7 @@ pub const FileReader = struct { pending_view: []u8 = &.{}, fd: bun.FileDescriptor = bun.invalid_fd, started: bool = false, - started_from_js: bool = false, + waiting_for_onReaderDone: bool = false, event_loop: JSC.EventLoopHandle, lazy: Lazy = .{ .none = {} }, buffered: std.ArrayListUnmanaged(u8) = .{}, @@ -3419,6 +3431,7 @@ pub const FileReader = struct { pollable = opened.pollable; file_type = opened.file_type; this.reader.flags.nonblocking = opened.nonblocking; + this.reader.flags.pollable = pollable; }, } }, @@ -3436,23 +3449,32 @@ pub const FileReader = struct { if (was_lazy) { _ = this.parent().incrementCount(); - this.started_from_js = true; + this.waiting_for_onReaderDone = true; switch (this.reader.start(this.fd, pollable)) { .result => {}, .err => |e| { return .{ .err = e }; }, } + } else if (comptime Environment.isPosix) { + if (this.reader.flags.pollable and !this.reader.isDone()) { + this.waiting_for_onReaderDone = true; + _ = this.parent().incrementCount(); + } } if (comptime Environment.isPosix) { - if (this.reader.handle.getPoll()) |poll| { - if (file_type == .pipe or file_type == .nonblocking_pipe) { - poll.flags.insert(.fifo); - } + if (file_type == .socket) { + this.reader.flags.socket = true; + } - if (file_type == .socket) { + if (this.reader.handle.getPoll()) |poll| { + if (file_type == .socket or this.reader.flags.socket) { poll.flags.insert(.socket); + } else { + // if it's a TTY, we report it as a fifo + // we want the behavior to be as though it were a blocking pipe. + poll.flags.insert(.fifo); } if (this.reader.flags.nonblocking) { @@ -3470,6 +3492,10 @@ pub const FileReader = struct { this.buffered = .{}; return .{ .owned_and_done = bun.ByteList.init(buffered.items) }; } + } else if (comptime Environment.isPosix) { + if (!was_lazy and this.reader.flags.pollable) { + this.reader.read(); + } } return .{ .ready = {} }; @@ -3536,8 +3562,6 @@ pub const FileReader = struct { this.pending_value.clear(); this.pending_view = &.{}; this.reader.buffer().clearAndFree(); - this.reader.close(); - this.done = true; this.pending.run(); return false; } @@ -3775,8 +3799,8 @@ pub const FileReader = struct { } this.parent().onClose(); - if (this.started_from_js) { - this.started_from_js = false; + if (this.waiting_for_onReaderDone) { + this.waiting_for_onReaderDone = false; _ = this.parent().decrementCount(); } } diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index fb85d95bf7..053e8de714 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -454,6 +454,10 @@ pub fn PosixStreamingWriter( } } + pub fn hasPendingData(this: *const PosixWriter) bool { + return this.buffer.items.len > 0; + } + fn closeWithoutReporting(this: *PosixWriter) void { if (this.getFd() != bun.invalid_fd) { std.debug.assert(!this.closed_without_reporting); @@ -1071,7 +1075,7 @@ pub fn WindowsStreamingWriter( return .{ .result = {} }; } - fn hasPendingData(this: *WindowsWriter) bool { + pub fn hasPendingData(this: *const WindowsWriter) bool { return (this.outgoing.isNotEmpty() or this.current_payload.isNotEmpty()); }