This commit is contained in:
Jarred Sumner
2024-02-08 22:54:29 -08:00
parent 147d810516
commit 39ecac0a99
3 changed files with 150 additions and 90 deletions

View File

@@ -430,6 +430,16 @@ pub const Subprocess = struct {
this.* = .{ .closed = {} };
return pipe.toJS(globalThis);
},
.buffer => |buffer| {
defer this.* = .{ .closed = {} };
if (buffer.len == 0) {
return JSC.WebCore.ReadableStream.empty(globalThis);
}
const blob = JSC.WebCore.Blob.init(buffer, bun.default_allocator, globalThis);
return JSC.WebCore.ReadableStream.fromBlob(globalThis, &blob, 0);
},
else => {
return JSValue.jsUndefined();
},

View File

@@ -319,6 +319,12 @@ pub const ReadableStream = struct {
return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@intFromEnum(id)));
}
/// Wrap an owned byte slice in a native ReadableStream and return its JS value.
/// NOTE(review): ownership of `bytes` appears to transfer to the ByteStream —
/// confirm that `ByteStream.new` takes ownership and frees it on stream close.
pub fn fromOwnedSlice(globalThis: *JSGlobalObject, bytes: []u8) JSC.JSValue {
JSC.markBinding(@src());
// Create the native byte-stream source backed by `bytes`.
var stream = ByteStream.new(globalThis, bytes);
// Expose the native stream to JavaScript.
return stream.toJS(globalThis);
}
pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
JSC.markBinding(@src());
var store = blob.store orelse {
@@ -3016,6 +3022,7 @@ pub const FileReader = struct {
lazy: Lazy = .{ .none = {} },
buffered: std.ArrayListUnmanaged(u8) = .{},
read_inside_on_pull: ReadDuringJSOnPullResult = .{ .none = {} },
highwater_mark: usize = 16384,
pub const IOReader = bun.io.BufferedReader;
pub const Poll = IOReader;
@@ -3026,6 +3033,7 @@ pub const FileReader = struct {
js: []u8,
amount_read: usize,
temporary: []const u8,
use_buffered: usize,
};
pub const Lazy = union(enum) {
@@ -3099,24 +3107,31 @@ pub const FileReader = struct {
this.pending_value.deinit();
}
pub fn onReadChunk(this: *@This(), buf: []const u8) void {
pub fn onReadChunk(this: *@This(), buf: []const u8, hasMore: bool) bool {
log("onReadChunk() = {d}", .{buf.len});
if (this.done) {
this.reader.close();
return;
return false;
}
if (this.read_inside_on_pull != .none) {
switch (this.read_inside_on_pull) {
.js => |in_progress| {
if (in_progress.len >= buf.len) {
if (in_progress.len >= buf.len and !hasMore) {
@memcpy(in_progress[0..buf.len], buf);
this.read_inside_on_pull = .{ .amount_read = buf.len };
} else {
this.read_inside_on_pull = .{ .js = in_progress[buf.len..] };
} else if (in_progress.len > 0 and !hasMore) {
this.read_inside_on_pull = .{ .temporary = buf };
} else if (hasMore and !bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) {
this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory();
this.read_inside_on_pull = .{ .use_buffered = buf.len };
}
},
.use_buffered => |original| {
this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory();
this.read_inside_on_pull = .{ .use_buffered = buf.len + original };
},
.none => unreachable,
else => @panic("Invalid state"),
}
@@ -3129,7 +3144,7 @@ pub const FileReader = struct {
this.reader.close();
this.done = true;
this.pending.run();
return;
return false;
}
if (this.pending_view.len >= buf.len) {
@@ -3145,15 +3160,13 @@ pub const FileReader = struct {
this.pending_value.clear();
this.pending_view = &.{};
this.pending.run();
return;
}
} else if (!bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) {
if (this.reader.isDone() and this.reader.buffer().capacity == 0) {
this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory();
} else {
this.reader.buffer().appendSlice(buf) catch bun.outOfMemory();
return false;
}
} else if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) {
this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory();
}
return this.read_inside_on_pull != .temporary and this.buffered.items.len + this.reader.buffer().items.len < this.highwater_mark;
}
pub fn onPull(this: *FileReader, buffer: []u8, array: JSC.JSValue) StreamResult {
@@ -3161,17 +3174,15 @@ pub const FileReader = struct {
defer array.ensureStillAlive();
const drained = this.drain();
log("onPull({d}) = {d}", .{ buffer.len, drained.len });
if (drained.len > 0) {
log("onPull({d}) = {d}", .{ buffer.len, drained.len });
this.pending_value.clear();
this.pending_view = &.{};
if (buffer.len >= @as(usize, drained.len)) {
@memcpy(buffer[0..drained.len], drained.slice());
// give it back!
this.reader.buffer().* = drained.listManaged(bun.default_allocator);
this.buffered.clearAndFree(bun.default_allocator);
if (this.reader.isDone()) {
return .{ .into_array_and_done = .{ .value = array, .len = drained.len } };
@@ -3194,9 +3205,14 @@ pub const FileReader = struct {
if (!this.reader.hasPendingRead()) {
this.read_inside_on_pull = .{ .js = buffer };
this.reader.read();
defer this.read_inside_on_pull = .{ .none = {} };
switch (this.read_inside_on_pull) {
.amount_read => |amount_read| {
.js => |remaining_buf| {
const amount_read = buffer.len - remaining_buf.len;
log("onPull({d}) = {d}", .{ buffer.len, amount_read });
if (amount_read > 0) {
if (this.reader.isDone()) {
return .{ .into_array_and_done = .{ .value = array, .len = @truncate(amount_read) } };
@@ -3210,16 +3226,29 @@ pub const FileReader = struct {
}
},
.temporary => |buf| {
log("onPull({d}) = {d}", .{ buffer.len, buf.len });
if (this.reader.isDone()) {
return .{ .temporary_and_done = bun.ByteList.init(buf) };
}
return .{ .temporary = bun.ByteList.init(buf) };
},
.use_buffered => {
const buffered = this.buffered;
this.buffered = .{};
log("onPull({d}) = {d}", .{ buffer.len, buffered.items.len });
if (this.reader.isDone()) {
return .{ .owned_and_done = bun.ByteList.init(buffered.items) };
}
return .{ .owned = bun.ByteList.init(buffered.items) };
},
else => {},
}
if (this.reader.isDone()) {
log("onPull({d}) = done", .{buffer.len});
return .{ .done = {} };
}
}
@@ -3227,6 +3256,8 @@ pub const FileReader = struct {
this.pending_value.set(this.parent().globalThis, array);
this.pending_view = buffer;
log("onPull({d}) = pending", .{buffer.len});
return .{ .pending = &this.pending };
}

View File

@@ -7,7 +7,7 @@ pub fn PosixPipeReader(
comptime vtable: struct {
getFd: *const fn (*This) bun.FileDescriptor,
getBuffer: *const fn (*This) *std.ArrayList(u8),
onReadChunk: ?*const fn (*This, chunk: []u8) void = null,
onReadChunk: ?*const fn (*This, chunk: []u8, hasMore: bool) void = null,
registerPoll: ?*const fn (*This) void = null,
done: *const fn (*This) void,
onError: *const fn (*This, bun.sys.Error) void,
@@ -40,18 +40,20 @@ pub fn PosixPipeReader(
pub fn onPoll(parent: *This, size_hint: isize) void {
const resizable_buffer = vtable.getBuffer(parent);
const fd = vtable.getFd(parent);
bun.sys.syslog("onPoll({d}) = {d}", .{ fd, size_hint });
readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint);
}
const stack_buffer_len = 64 * 1024;
/// Forward `chunk` to the consumer's onReadChunk callback when streaming
/// is enabled and the chunk is non-empty.
///
/// `hasMore` signals that the reader may still have more data buffered.
/// Returns true when the consumer wants the reader to keep reading;
/// false when streaming is disabled, the chunk was empty, or the
/// consumer asked to stop.
inline fn drainChunk(parent: *This, chunk: []const u8, hasMore: bool) bool {
    if (parent.vtable.isStreamingEnabled()) {
        if (chunk.len > 0) {
            return parent.vtable.onReadChunk(chunk, hasMore);
        }
    }
    return false;
}
// On Linux, we use preadv2 to read without blocking.
@@ -79,19 +81,15 @@ pub fn PosixPipeReader(
stack_buffer_head = stack_buffer_head[bytes_read..];
if (bytes_read == 0) {
drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]);
drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false);
close(parent);
return;
}
if (streaming) {
parent.vtable.onReadChunk(buffer);
}
},
.err => |err| {
if (err.isRetry()) {
resizable_buffer.appendSlice(buffer) catch bun.outOfMemory();
drainChunk(parent, resizable_buffer.items[0..resizable_buffer.items.len]);
drainChunk(parent, resizable_buffer.items[0..resizable_buffer.items.len], false);
if (comptime vtable.registerPoll) |register| {
register(parent);
@@ -152,61 +150,71 @@ pub fn PosixPipeReader(
}
fn readFromBlockingPipeWithoutBlockingPOSIX(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize) void {
if (size_hint > stack_buffer_len) {
resizable_buffer.ensureUnusedCapacity(@intCast(size_hint)) catch bun.outOfMemory();
}
_ = size_hint; // autofix
const start_length: usize = resizable_buffer.items.len;
const streaming = parent.vtable.isStreamingEnabled();
if (streaming and resizable_buffer.capacity == 0) {
if (streaming) {
const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer();
var stack_buffer_head = stack_buffer;
while (resizable_buffer.capacity == 0) {
var stack_buffer_head = stack_buffer;
while (stack_buffer_head.len > 16 * 1024) {
var buffer = stack_buffer_head;
while (stack_buffer_head.len > 16 * 1024) {
var buffer = stack_buffer_head;
switch (bun.sys.readNonblocking(
fd,
buffer,
)) {
.result => |bytes_read| {
buffer = stack_buffer_head[0..bytes_read];
stack_buffer_head = stack_buffer_head[bytes_read..];
switch (bun.sys.readNonblocking(
fd,
buffer,
)) {
.result => |bytes_read| {
buffer = stack_buffer_head[0..bytes_read];
stack_buffer_head = stack_buffer_head[bytes_read..];
if (bytes_read == 0) {
drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]);
close(parent);
return;
}
switch (bun.isReadable(fd)) {
.ready, .hup => continue,
.not_ready => {
drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]);
if (comptime vtable.registerPoll) |register| {
register(parent);
}
return;
},
}
},
.err => |err| {
drainChunk(parent, stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]);
if (err.isRetry()) {
if (comptime vtable.registerPoll) |register| {
register(parent);
if (bytes_read == 0) {
if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0)
_ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false);
close(parent);
return;
}
}
vtable.onError(parent, err);
return;
},
}
}
},
.err => |err| {
if (err.isRetry()) {
if (comptime vtable.registerPoll) |register| {
register(parent);
_ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false);
return;
}
}
resizable_buffer.appendSlice(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len]) catch bun.outOfMemory();
if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0)
_ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false);
vtable.onError(parent, err);
return;
},
}
switch (bun.isReadable(fd)) {
.ready, .hup => {},
.not_ready => {
if (comptime vtable.registerPoll) |register| {
register(parent);
}
if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0)
_ = parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false);
return;
},
}
}
if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) {
if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], false)) {
return;
}
}
if (!parent.vtable.isStreamingEnabled()) break;
}
}
while (true) {
@@ -219,19 +227,27 @@ pub fn PosixPipeReader(
resizable_buffer.items.len += bytes_read;
if (bytes_read == 0) {
drainChunk(parent, resizable_buffer.items[start_length..]);
_ = drainChunk(parent, resizable_buffer.items[start_length..], false);
close(parent);
return;
}
if (streaming) {
parent.vtable.onReadChunk(buffer);
switch (bun.isReadable(fd)) {
.ready, .hup => continue,
.not_ready => {
_ = drainChunk(parent, resizable_buffer.items[start_length..], false);
if (comptime vtable.registerPoll) |register| {
register(parent);
}
return;
},
}
},
.err => |err| {
if (err.isRetry()) {
drainChunk(parent, resizable_buffer.items[start_length..]);
_ = drainChunk(parent, resizable_buffer.items[start_length..], false);
if (err.isRetry()) {
if (comptime vtable.registerPoll) |register| {
register(parent);
return;
@@ -257,7 +273,7 @@ pub fn WindowsPipeReader(
comptime This: type,
comptime _: anytype,
comptime getBuffer: fn (*This) *std.ArrayList(u8),
comptime onReadChunk: fn (*This, chunk: []u8) void,
comptime onReadChunk: fn (*This, chunk: []u8, bool) bool,
comptime registerPoll: ?fn (*This) void,
comptime done: fn (*This) void,
comptime onError: fn (*This, bun.sys.Error) void,
@@ -358,7 +374,7 @@ const BufferedReaderVTable = struct {
}
pub const Fn = struct {
onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void = null,
onReadChunk: ?*const fn (*anyopaque, chunk: []const u8, hasMore: bool) bool = null,
onReaderDone: *const fn (*anyopaque) void,
onReaderError: *const fn (*anyopaque, bun.sys.Error) void,
loop: *const fn (*anyopaque) *Async.Loop,
@@ -398,8 +414,12 @@ const BufferedReaderVTable = struct {
return this.fns.onReadChunk != null;
}
/// Deliver a chunk of read data to the consumer.
///
/// `hasMore` == true means there might be more data to read after this chunk.
/// Returning false tells the reader to stop reading further data.
///
/// Caller must ensure streaming is enabled (`fns.onReadChunk` non-null);
/// the `.?` unwrap panics in safe modes otherwise.
pub fn onReadChunk(this: @This(), chunk: []const u8, hasMore: bool) bool {
    return this.fns.onReadChunk.?(this.parent, chunk, hasMore);
}
pub fn onReaderDone(this: @This()) void {
@@ -464,8 +484,8 @@ const PosixBufferedReader = struct {
.onError = @ptrCast(&onError),
});
/// Adapter handed to the generic POSIX pipe reader: forwards the chunk and
/// `hasMore` flag to the vtable and propagates the consumer's
/// "keep reading" decision back to the reader.
fn _onReadChunk(this: *PosixBufferedReader, chunk: []u8, hasMore: bool) bool {
    return this.vtable.onReadChunk(chunk, hasMore);
}
pub fn getFd(this: *PosixBufferedReader) bun.FileDescriptor {
@@ -547,9 +567,8 @@ const PosixBufferedReader = struct {
return .{ .result = {} };
}
this.pollable = true;
this.handle = .{ .fd = fd };
this.read();
this.registerPoll();
return .{
.result = {},
@@ -642,11 +661,11 @@ pub const GenericWindowsBufferedReader = struct {
return this.has_inflight_read;
}
/// Windows adapter: marks the in-flight read as completed, then forwards the
/// chunk and `hasMore` flag to the parent's callback.
///
/// Returns the consumer's "keep reading" decision. When no callback or no
/// parent is registered, returns false to stop further reads.
/// NOTE(review): a bare `orelse return` is type-invalid once this function
/// returns bool — `return false` is the minimal type-correct fix; confirm
/// that "stop reading" is the intended behavior in the callback-less case.
fn _onReadChunk(this: *WindowsOutputReader, buf: []u8, hasMore: bool) bool {
    this.has_inflight_read = false;
    const onReadChunkFn = this.vtable.onReadChunk orelse return false;
    return onReadChunkFn(this.parent() orelse return false, buf, hasMore);
}
fn finish(this: *WindowsOutputReader) void {
@@ -689,7 +708,7 @@ pub const GenericWindowsBufferedReader = struct {
}
};
pub fn WindowsBufferedReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, chunk: []const u8) void) type {
pub fn WindowsBufferedReader(comptime Parent: type, comptime onReadChunk: ?*const fn (*anyopaque, chunk: []const u8, more: bool) bool) type {
return struct {
reader: ?*GenericWindowsBufferedReader = null,