More careful code for FileBlobLoader and FileSink

Jarred Sumner
2022-10-09 02:02:25 -07:00
parent edfd2af949
commit 25918ddd2a
2 changed files with 101 additions and 12 deletions


@@ -359,10 +359,15 @@ pub const Prompt = struct {
pub const Crypto = struct {
const UUID = @import("./uuid.zig");
const BoringSSL = @import("boringssl");
pub const Class = JSC.NewClass(void, .{ .name = "crypto" }, .{
.getRandomValues = JSC.DOMCall("Crypto", @This(), "getRandomValues", JSC.JSValue, JSC.DOMEffect.top),
.randomUUID = JSC.DOMCall("Crypto", @This(), "randomUUID", *JSC.JSString, JSC.DOMEffect.top),
}, .{});
pub const Class = JSC.NewClass(
void,
.{ .name = "crypto" },
.{
.getRandomValues = JSC.DOMCall("Crypto", @This(), "getRandomValues", JSC.JSValue, JSC.DOMEffect.top),
.randomUUID = JSC.DOMCall("Crypto", @This(), "randomUUID", *JSC.JSString, JSC.DOMEffect.top),
},
.{},
);
pub const Prototype = JSC.NewClass(
void,
.{ .name = "Crypto" },


@@ -759,13 +759,22 @@ pub const Signal = struct {
) VTable {
const Functions = struct {
fn onClose(this: *anyopaque, err: ?Syscall.Error) void {
Wrapped.close(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), err);
if (comptime !@hasDecl(Wrapped, "onClose"))
Wrapped.close(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), err)
else
Wrapped.onClose(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), err);
}
fn onReady(this: *anyopaque, amount: ?Blob.SizeType, offset: ?Blob.SizeType) void {
Wrapped.ready(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), amount, offset);
if (comptime !@hasDecl(Wrapped, "onReady"))
Wrapped.ready(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), amount, offset)
else
Wrapped.onReady(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)), amount, offset);
}
fn onStart(this: *anyopaque) void {
Wrapped.start(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)));
if (comptime !@hasDecl(Wrapped, "onStart"))
Wrapped.start(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)))
else
Wrapped.onStart(@ptrCast(*Wrapped, @alignCast(std.meta.alignment(Wrapped), this)));
}
};
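For readers skimming the diff, here is a minimal, self-contained sketch of the comptime dispatch these hooks now use; the `WithCustomClose`/`DefaultOnly` types and the `dispatchClose` helper are invented for illustration only. When the wrapped type declares an `onClose` handler it is called, otherwise the older `close` name is used, and the choice is made entirely at compile time via `@hasDecl`, so the missing name in the other branch is never analyzed.

```zig
const std = @import("std");

// Hypothetical wrapper types used only for this sketch.
const WithCustomClose = struct {
    closed_via: []const u8 = "",

    pub fn close(self: *WithCustomClose, err: ?u8) void {
        _ = err;
        self.closed_via = "close";
    }

    pub fn onClose(self: *WithCustomClose, err: ?u8) void {
        _ = err;
        self.closed_via = "onClose";
    }
};

const DefaultOnly = struct {
    closed_via: []const u8 = "",

    pub fn close(self: *DefaultOnly, err: ?u8) void {
        _ = err;
        self.closed_via = "close";
    }
};

// Prefer `onClose` when the wrapped type declares it, otherwise fall back
// to `close`; the branch is selected at compile time.
fn dispatchClose(comptime Wrapped: type, this: *Wrapped, err: ?u8) void {
    if (comptime !@hasDecl(Wrapped, "onClose"))
        Wrapped.close(this, err)
    else
        Wrapped.onClose(this, err);
}

test "onClose wins over close when declared" {
    var a = WithCustomClose{};
    var b = DefaultOnly{};
    dispatchClose(WithCustomClose, &a, null);
    dispatchClose(DefaultOnly, &b, null);
    try std.testing.expectEqualStrings("onClose", a.closed_via);
    try std.testing.expectEqualStrings("close", b.closed_via);
}
```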
@@ -1019,6 +1028,7 @@ pub const FileSink = struct {
pub usingnamespace NewReadyWatcher(@This(), .write, ready);
const max_fifo_size = 64 * 1024;
pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) {
var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
const auto_close = this.auto_close;
@@ -1091,6 +1101,10 @@ pub const FileSink = struct {
}
pub fn flush(this: *FileSink) StreamResult.Writable {
return flushMaybePoll(this);
}
pub fn flushMaybePoll(this: *FileSink) StreamResult.Writable {
std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor));
var total: usize = this.written;
@@ -1111,7 +1125,9 @@ pub const FileSink = struct {
}
}
while (remain.len > 0) {
const res = JSC.Node.Syscall.write(fd, remain);
const max_to_write = if (std.os.S.ISFIFO(this.mode)) max_fifo_size else remain.len;
const write_buf = remain[0..@minimum(remain.len, max_to_write)];
const res = JSC.Node.Syscall.write(fd, write_buf);
if (res == .err) {
const retry =
std.os.E.AGAIN;
@@ -1134,6 +1150,20 @@ pub const FileSink = struct {
remain = remain[res.result..];
total += res.result;
if (res.result == 0) break;
// we flushed an entire fifo
// but we still have more to write
// let's wait for the next poll
if (std.os.S.ISFIFO(this.mode) and
max_to_write == max_fifo_size and
res.result == max_fifo_size and
remain.len > 0)
{
this.watch(this.fd);
return .{
.pending = &this.pending,
};
}
}
this.pending.result = .{
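The capped-write loop above boils down to the sketch below. Everything in it is simplified: `FakePipe` stands in for `Syscall.write`, and the `pending` result stands in for re-arming the poll. Writes to a FIFO are limited to one pipe-buffer worth of bytes, and if a full buffer was accepted while data still remains, the loop stops and defers to the next poll instead of spinning.

```zig
const std = @import("std");

const max_fifo_size = 64 * 1024;

// A stand-in for write(2): the "kernel" accepts at most `capacity` bytes per
// call, the way a pipe with a fixed buffer size would.
const FakePipe = struct {
    capacity: usize,

    fn write(self: FakePipe, buf: []const u8) usize {
        return if (buf.len < self.capacity) buf.len else self.capacity;
    }
};

// Either everything was flushed, or we stopped after a full FIFO-sized chunk
// and the caller should re-arm its poll before trying again.
const FlushStatus = union(enum) {
    done: usize,
    pending: usize,
};

fn flushChunked(pipe: FakePipe, is_fifo: bool, data: []const u8) FlushStatus {
    var remain = data;
    var total: usize = 0;

    while (remain.len > 0) {
        // Never offer a FIFO more than one pipe-buffer worth of bytes at a time.
        const max_to_write = if (is_fifo) max_fifo_size else remain.len;
        const len = if (remain.len < max_to_write) remain.len else max_to_write;
        const wrote = pipe.write(remain[0..len]);

        remain = remain[wrote..];
        total += wrote;
        if (wrote == 0) break;

        // We filled an entire pipe buffer but still have more to send:
        // stop and wait for the next poll instead of spinning on EAGAIN.
        if (is_fifo and wrote == max_fifo_size and remain.len > 0)
            return .{ .pending = total };
    }

    return .{ .done = total };
}

test "fifo flush stops after one full pipe buffer" {
    const pipe = FakePipe{ .capacity = max_fifo_size };
    const data = [_]u8{0} ** (max_fifo_size + 1);
    const status = flushChunked(pipe, true, &data);
    try std.testing.expect(status == .pending);
    try std.testing.expectEqual(@as(usize, max_fifo_size), status.pending);
}
```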
@@ -1192,6 +1222,7 @@ pub const FileSink = struct {
}
pub fn finalize(this: *FileSink) void {
this.signal.close(null);
this.cleanup();
this.reachable_from_js = false;
@@ -1225,7 +1256,7 @@ pub const FileSink = struct {
}
pub fn ready(this: *FileSink, _: i64) void {
_ = this.flush();
_ = this.flushMaybePoll();
}
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
@@ -1308,6 +1339,10 @@ pub const FileSink = struct {
if (this.next) |*next| {
return next.end(err);
}
if (this.requested_end or this.done)
return .{ .result = void{} };
this.requested_end = true;
const flushy = this.flush();
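The new guard makes `end()` idempotent. A toy model of the idea, where the hypothetical `Sink` type only counts how often it actually flushes:

```zig
const std = @import("std");

// Hypothetical sink that only records how often it actually flushed.
const Sink = struct {
    requested_end: bool = false,
    done: bool = false,
    flush_count: usize = 0,

    fn end(this: *Sink) void {
        // Ending twice, or ending after the sink finished on its own,
        // must not trigger a second flush.
        if (this.requested_end or this.done) return;
        this.requested_end = true;
        this.flush_count += 1; // stands in for the real flush-and-close
    }
};

test "end is idempotent" {
    var sink = Sink{};
    sink.end();
    sink.end();
    try std.testing.expectEqual(@as(usize, 1), sink.flush_count);
}
```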
@@ -3006,6 +3041,7 @@ pub const FileBlobLoader = struct {
total_read: Blob.SizeType = 0,
finalized: bool = false,
callback: anyframe = undefined,
buffered_data: bun.ByteList = .{},
pending: StreamResult.Pending = StreamResult.Pending{
.frame = undefined,
.state = .none,
@@ -3019,6 +3055,8 @@ pub const FileBlobLoader = struct {
stored_global_this_: ?*JSC.JSGlobalObject = null,
poll_ref: JSC.PollRef = .{},
has_adjusted_pipe_size_on_linux: bool = false,
finished: bool = false,
signal: JSC.WebCore.Signal = .{},
pub usingnamespace NewReadyWatcher(@This(), .read, ready);
@@ -3042,6 +3080,25 @@ pub const FileBlobLoader = struct {
};
}
pub fn finish(this: *FileBlobLoader) void {
if (this.finished) return;
this.finished = true;
// we are done
// resolve any pending promises with done
// but there could still be data in the pipe
// so we shouldn't actually end it; it just means we will no longer retry on EAGAIN
if (this.pending.state == .pending) {
if (this.buffered_data.len > 0) {
this.pending.result = .{ .owned = this.buffered_data };
this.buffered_data = .{};
} else {
this.pending.result = .{ .done = {} };
}
this.pending.run();
}
}
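A simplified model of what `finish()` does with a waiting reader; the `Result` union and `Loader` struct below are stand-ins (the pending promise is modeled as an optional pointer), not the real bun types. Buffered bytes are handed over first, and only when nothing is buffered is the stream reported as done.

```zig
const std = @import("std");

// Simplified stand-ins for the real stream result and loader types.
const Result = union(enum) {
    owned: []const u8,
    done: void,
};

const Loader = struct {
    finished: bool = false,
    buffered_data: []const u8 = "",
    pending: ?*Result = null,

    fn finish(this: *Loader) void {
        if (this.finished) return;
        this.finished = true;

        if (this.pending) |result| {
            if (this.buffered_data.len > 0) {
                // Hand the leftover bytes to the waiting reader...
                result.* = .{ .owned = this.buffered_data };
                this.buffered_data = "";
            } else {
                // ...otherwise tell it the stream is over.
                result.* = .{ .done = {} };
            }
            this.pending = null;
        }
    }
};

test "finish drains buffered bytes before reporting done" {
    var result: Result = .{ .done = {} };
    var loader = Loader{ .buffered_data = "leftover", .pending = &result };
    loader.finish();
    try std.testing.expect(loader.finished);
    try std.testing.expectEqualStrings("leftover", result.owned);
}
```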
const Concurrent = struct {
read: Blob.SizeType = 0,
task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback },
@@ -3202,8 +3259,11 @@ pub const FileBlobLoader = struct {
NetworkThread.global.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 });
}
const default_fifo_chunk_size = 1024;
// macOS default pipe size is page_size, 16k, or 64k. It changes based on how much was written
// Linux default pipe size is 16 pages of memory
const default_fifo_chunk_size = 64 * 1024;
const default_file_chunk_size = 1024 * 1024 * 2;
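For reference, the new FIFO chunk size lines up with the Linux default pipe capacity mentioned in the comment, assuming the usual 4 KiB page size:

```zig
const std = @import("std");

const page_size = 4096; // typical 4 KiB page; an assumption of this sketch
const default_fifo_chunk_size = 64 * 1024;

comptime {
    // 16 pages * 4096 bytes = 65536 bytes = 64 KiB
    std.debug.assert(16 * page_size == default_fifo_chunk_size);
    std.debug.assert(default_fifo_chunk_size == 65536);
}
```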
pub fn onStart(this: *FileBlobLoader) StreamStart {
var file = &this.store.data.file;
std.debug.assert(!this.started);
@@ -3293,6 +3353,7 @@ pub const FileBlobLoader = struct {
this.auto_close = auto_close;
const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
this.signal.start();
return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
}
@@ -3316,6 +3377,13 @@ pub const FileBlobLoader = struct {
const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
std.debug.assert(this.started);
if (this.buffered_data.len > 0) {
const data = this.buffered_data;
this.buffered_data.len = 0;
this.buffered_data.cap = 0;
return .{ .owned = data };
}
switch (chunk_size) {
0 => {
std.debug.assert(this.store.data.file.seekable orelse false);
@@ -3371,7 +3439,6 @@ pub const FileBlobLoader = struct {
if (owned) {
return .{ .owned_and_done = bun.ByteList.init(buf) };
}
return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result), .value = view } };
}
@@ -3392,6 +3459,10 @@ pub const FileBlobLoader = struct {
std.debug.assert(this.started);
std.debug.assert(read_buf.len > 0);
if (this.fd == std.math.maxInt(JSC.Node.FileDescriptor)) {
return .{ .done = {} };
}
var buf_to_use = read_buf;
var free_buffer_on_error: bool = false;
@@ -3467,6 +3538,10 @@ pub const FileBlobLoader = struct {
switch (err.getErrno()) {
retry => {
if (this.finished) {
return .{ .done = {} };
}
if (free_buffer_on_error) {
bun.default_allocator.free(buf_to_use);
buf_to_use = read_buf;
@@ -3562,7 +3637,16 @@ pub const FileBlobLoader = struct {
if (this.finalized)
return;
this.unwatch(this.fd);
this.signal.close(null);
if (this.buffered_data.cap > 0) {
this.buffered_data.listManaged(bun.default_allocator).deinit();
}
this.finished = true;
if (this.poll_ref.isActive())
this.unwatch(this.fd);
this.finalized = true;
this.pending.result = .{ .done = {} };
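Putting the finalize pieces together, here is a rough sketch of the cleanup ordering, using `std.ArrayList` in place of `bun.ByteList` and a boolean in place of the poll ref (both assumptions of this example): the scratch buffer is freed at most once, the fd is unwatched only if a poll is still registered, and the whole routine is guarded so repeated calls are harmless.

```zig
const std = @import("std");

// Simplified loader: std.ArrayList stands in for bun.ByteList and `watching`
// stands in for the poll ref; both are assumptions of this sketch.
const Loader = struct {
    finalized: bool = false,
    finished: bool = false,
    watching: bool = false,
    buffered: std.ArrayList(u8),

    fn finalize(this: *Loader) void {
        if (this.finalized) return;

        // Free the scratch buffer at most once.
        if (this.buffered.capacity > 0) this.buffered.deinit();

        this.finished = true;

        // Only unregister the fd if a poll is still active.
        if (this.watching) this.watching = false;

        this.finalized = true;
        // ...and the pending read would be resolved with `.done` here.
    }
};

test "finalize is safe to call more than once" {
    var loader = Loader{ .buffered = std.ArrayList(u8).init(std.testing.allocator) };
    try loader.buffered.appendSlice("leftover");
    loader.finalize();
    loader.finalize(); // second call is a no-op; nothing is freed twice
    try std.testing.expect(loader.finalized);
}
```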