Fix a bunch of things

This commit is contained in:
Jarred Sumner
2024-03-07 02:33:24 -08:00
parent 7a26d18968
commit abdedcb29a
2 changed files with 42 additions and 14 deletions

View File

@@ -2909,11 +2909,23 @@ pub const FileSink = struct {
pub fn onWrite(this: *FileSink, amount: usize, done: bool) void {
log("onWrite({d}, {any})", .{ amount, done });
this.written += amount;
// Only keep the event loop ref'd while there's a pending write in progress.
// If there's no pending write, no need to keep the event loop ref'd.
this.writer.updateRef(this.eventLoop(), false);
this.written += amount;
// If the developer requested to close the writer (.end() in node streams)
//
// but:
// 1) We haven't finished writing yet
// 2) We haven't received EOF
if (this.done and !done and this.writer.hasPendingData()) {
if (this.pending.state == .pending) {
this.pending.consumed += @truncate(amount);
}
return;
}
if (this.pending.state == .pending) {
this.pending.consumed += @truncate(amount);
@@ -2925,7 +2937,7 @@ pub const FileSink = struct {
this.runPending();
if (this.done and done) {
if (this.done and !done and this.writer.getBuffer().len == 0) {
// if we call end/endFromJS and we have some pending returned from .flush() we should call writer.end()
this.writer.end();
}
@@ -3276,7 +3288,7 @@ pub const FileReader = struct {
pending_view: []u8 = &.{},
fd: bun.FileDescriptor = bun.invalid_fd,
started: bool = false,
started_from_js: bool = false,
waiting_for_onReaderDone: bool = false,
event_loop: JSC.EventLoopHandle,
lazy: Lazy = .{ .none = {} },
buffered: std.ArrayListUnmanaged(u8) = .{},
@@ -3419,6 +3431,7 @@ pub const FileReader = struct {
pollable = opened.pollable;
file_type = opened.file_type;
this.reader.flags.nonblocking = opened.nonblocking;
this.reader.flags.pollable = pollable;
},
}
},
@@ -3436,23 +3449,32 @@ pub const FileReader = struct {
if (was_lazy) {
_ = this.parent().incrementCount();
this.started_from_js = true;
this.waiting_for_onReaderDone = true;
switch (this.reader.start(this.fd, pollable)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
}
} else if (comptime Environment.isPosix) {
if (this.reader.flags.pollable and !this.reader.isDone()) {
this.waiting_for_onReaderDone = true;
_ = this.parent().incrementCount();
}
}
if (comptime Environment.isPosix) {
if (this.reader.handle.getPoll()) |poll| {
if (file_type == .pipe or file_type == .nonblocking_pipe) {
poll.flags.insert(.fifo);
}
if (file_type == .socket) {
this.reader.flags.socket = true;
}
if (file_type == .socket) {
if (this.reader.handle.getPoll()) |poll| {
if (file_type == .socket or this.reader.flags.socket) {
poll.flags.insert(.socket);
} else {
// if it's a TTY, we report it as a fifo
// we want the behavior to be as though it were a blocking pipe.
poll.flags.insert(.fifo);
}
if (this.reader.flags.nonblocking) {
@@ -3470,6 +3492,10 @@ pub const FileReader = struct {
this.buffered = .{};
return .{ .owned_and_done = bun.ByteList.init(buffered.items) };
}
} else if (comptime Environment.isPosix) {
if (!was_lazy and this.reader.flags.pollable) {
this.reader.read();
}
}
return .{ .ready = {} };
@@ -3536,8 +3562,6 @@ pub const FileReader = struct {
this.pending_value.clear();
this.pending_view = &.{};
this.reader.buffer().clearAndFree();
this.reader.close();
this.done = true;
this.pending.run();
return false;
}
@@ -3775,8 +3799,8 @@ pub const FileReader = struct {
}
this.parent().onClose();
if (this.started_from_js) {
this.started_from_js = false;
if (this.waiting_for_onReaderDone) {
this.waiting_for_onReaderDone = false;
_ = this.parent().decrementCount();
}
}

View File

@@ -454,6 +454,10 @@ pub fn PosixStreamingWriter(
}
}
pub fn hasPendingData(this: *const PosixWriter) bool {
return this.buffer.items.len > 0;
}
fn closeWithoutReporting(this: *PosixWriter) void {
if (this.getFd() != bun.invalid_fd) {
std.debug.assert(!this.closed_without_reporting);
@@ -1071,7 +1075,7 @@ pub fn WindowsStreamingWriter(
return .{ .result = {} };
}
fn hasPendingData(this: *WindowsWriter) bool {
pub fn hasPendingData(this: *const WindowsWriter) bool {
return (this.outgoing.isNotEmpty() or this.current_payload.isNotEmpty());
}