Fix reading FIFO files

This commit is contained in:
Jarred Sumner
2022-11-25 00:05:14 -08:00
parent 04328c163b
commit 7b23cb5cd7
2 changed files with 59 additions and 19 deletions

View File

@@ -4146,6 +4146,8 @@ pub const FilePoll = struct {
disable,
nonblocking,
pub fn poll(this: Flags) Flags {
return switch (this) {
.readable => .poll_readable,

View File

@@ -292,6 +292,7 @@ pub const ReadableStream = struct {
.globalThis = globalThis,
.context = .{
.buffered_data = buffered_data,
.started = true,
.lazy_readable = .{
.readable = .{
.FIFO = fifo.*,
@@ -1142,7 +1143,16 @@ pub const FileSink = struct {
const log = Output.scoped(.FileSink, false);
pub fn isReachable(this: *const FileSink) bool {
return this.reachable_from_js or this.signal.isDead();
return this.reachable_from_js or !this.signal.isDead();
}
pub fn updateRef(this: *FileSink, value: bool) void {
if (this.poll_ref) |poll| {
if (value)
poll.enableKeepingProcessAlive(JSC.VirtualMachine.vm)
else
poll.disableKeepingProcessAlive(JSC.VirtualMachine.vm);
}
}
const max_fifo_size = 64 * 1024;
@@ -1294,7 +1304,7 @@ pub const FileSink = struct {
}
switch (bun.isWritable(fd)) {
.not_writable => {
.not_ready => {
if (this.poll_ref) |poll| {
poll.flags.remove(.writable);
}
@@ -1318,7 +1328,7 @@ pub const FileSink = struct {
.done = {},
};
},
.writable => break :brk this.max_write_size,
.ready => break :brk this.max_write_size,
}
} else remain.len;
@@ -1373,14 +1383,14 @@ pub const FileSink = struct {
// lets check if its writable, so we avoid blocking
if (is_fifo and remain.len > 0) {
switch (bun.isWritable(fd)) {
.writable => {
.ready => {
if (this.poll_ref) |poll_ref| {
poll_ref.flags.insert(.writable);
poll_ref.flags.insert(.fifo);
std.debug.assert(poll_ref.flags.contains(.poll_writable));
}
},
.not_writable => {
.not_ready => {
if (!this.isWatching())
this.watch(this.fd);
@@ -2227,8 +2237,16 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
.@"end" = end,
.@"construct" = construct,
.@"endWithSink" = endWithSink,
.@"updateRef" = updateRef,
});
pub fn updateRef(ptr: *anyopaque, value: bool) callconv(.C) void {
JSC.markBinding(@src());
var this = bun.cast(*ThisSink, ptr);
if (comptime @hasDecl(SinkType, "updateRef"))
this.sink.updateRef(value);
}
comptime {
if (!JSC.is_bindgen) {
@export(finalize, .{ .name = Export[0].symbol_name });
@@ -2239,6 +2257,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
@export(end, .{ .name = Export[5].symbol_name });
@export(construct, .{ .name = Export[6].symbol_name });
@export(endWithSink, .{ .name = Export[7].symbol_name });
@export(updateRef, .{ .name = Export[8].symbol_name });
}
}
@@ -3040,8 +3059,9 @@ pub fn ReadableStreamSource(
});
comptime {
if (!JSC.is_bindgen)
if (!JSC.is_bindgen) {
@export(load, .{ .name = Export[0].symbol_name });
}
}
};
};
@@ -3601,19 +3621,36 @@ pub const FIFO = struct {
if (!is_readable and (this.close_on_empty_read or poll.isHUP())) {
// it might be readable actually
this.close_on_empty_read = true;
if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
return null;
switch (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
.ready => {
this.close_on_empty_read = false;
return null;
},
else => {},
}
return .done;
} else if (!is_readable and poll.isWatching()) {
// if the file was opened non-blocking
// we don't risk anything by attempting to read it!
if (poll.flags.contains(.nonblocking))
return null;
// this happens if we've registered a watcher but we haven't
// ticked the event loop since registering it
if (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
return null;
switch (bun.isReadable(@intCast(std.os.fd_t, poll.fd))) {
.ready => {
poll.flags.insert(.readable);
return null;
},
.hup => {
poll.flags.insert(.hup);
return .done;
},
else => {
return .pending;
},
}
return .pending;
}
}
@@ -3624,13 +3661,10 @@ pub const FIFO = struct {
}
} else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
// we don't know if it's readable or not
if (!bun.isReadable(this.fd)) {
// we hung up
if (this.close_on_empty_read)
return .done;
return .pending;
}
return switch (bun.isReadable(this.fd)) {
.hup, .ready => null,
else => ReadResult{ .pending = {} },
};
}
return null;
@@ -4361,6 +4395,8 @@ pub const FileReader = struct {
},
},
};
this.lazy_readable.readable.FIFO.watch(readable_file.fd);
this.lazy_readable.readable.FIFO.poll_ref.?.flags.insert(.nonblocking);
} else {
this.lazy_readable = .{
.readable = .{ .File = readable_file },
@@ -4435,6 +4471,8 @@ pub const FileReader = struct {
}
}
pub const setRef = setRefOrUnref;
pub fn drainInternalBuffer(this: *FileReader) bun.ByteList {
const buffered = this.buffered_data;
if (buffered.cap > 0) {