This commit is contained in:
Jarred Sumner
2024-02-01 00:38:03 -08:00
parent 989edb99ed
commit 65c6ace259
5 changed files with 267 additions and 95 deletions

View File

@@ -277,8 +277,6 @@ pub const Subprocess = struct {
const Readable = union(enum) {
fd: bun.FileDescriptor,
memfd: bun.FileDescriptor,
sync_buffered_output: *BufferedOutput,
pipe: Pipe,
inherit: void,
ignore: void,
@@ -320,7 +318,7 @@ pub const Subprocess = struct {
pub const Pipe = union(enum) {
stream: JSC.WebCore.ReadableStream,
buffer: StreamingOutput,
buffer: PipeReader,
detached: void,
pub fn finish(this: *@This()) void {
@@ -760,6 +758,17 @@ pub const Subprocess = struct {
pub usingnamespace bun.NewRefCounted(@This(), deinit);
pub fn readAll(this: *PipeReader) void {
if (this.state == .pending)
this.reader.read();
}
pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *JSC.EventLoop) JSC.Maybe(void) {
this.process = process;
this.event_loop = event_loop;
return this.reader.start();
}
pub fn onOutputDone(this: *PipeReader) void {
const owned = this.toOwnedSlice();
this.state = .{ .done = owned };

View File

@@ -826,7 +826,7 @@ pub const VirtualMachine = struct {
bun.reloadProcess(bun.default_allocator, !strings.eqlComptime(this.bundler.env.map.get("BUN_CONFIG_NO_CLEAR_TERMINAL_ON_RELOAD") orelse "0", "true"));
}
if (!strings.eqlComptime(this.bundler.env.map.get("BUN_CONFIG_NO_CLEAR_TERMINAL_ON_RELOAD") orelse "0", "true")) {
if (!strings.eqlComptime(this.bundler.env.get("BUN_CONFIG_NO_CLEAR_TERMINAL_ON_RELOAD") orelse "0", "true")) {
Output.flush();
Output.disableBuffering();
Output.resetTerminalAll();

View File

@@ -242,7 +242,7 @@ pub const ReadableStream = struct {
Bytes: *ByteStream,
Pipe: *ReadableStreamPipe,
Pipe: *PipeReader,
};
extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag;
@@ -303,7 +303,7 @@ pub const ReadableStream = struct {
.Pipe => ReadableStream{
.value = value,
.ptr = .{
.Pipe = ptr.asPtr(ReadableStreamPipe),
.Pipe = ptr.asPtr(PipeReader),
},
},
@@ -366,7 +366,7 @@ pub const ReadableStream = struct {
buffered_reader: anytype,
) JSC.JSValue {
JSC.markBinding(@src());
var source = bun.default_allocator.create(ReadableStreamPipe.Source) catch bun.outOfMemory();
var source = bun.default_allocator.create(PipeReader.Source) catch bun.outOfMemory();
source.* = .{
.globalThis = globalThis,
.context = undefined,
@@ -440,9 +440,9 @@ pub const StreamStart = union(Tag) {
chunk_size,
ArrayBufferSink,
FileSink,
PipeSink,
HTTPSResponseSink,
HTTPResponseSink,
UVStreamSink,
ready,
};
@@ -498,12 +498,12 @@ pub const StreamStart = union(Tag) {
empty = false;
}
if (value.get(globalThis, "stream")) |as_array| {
if (value.fastGet(globalThis, .stream)) |as_array| {
stream = as_array.toBoolean();
empty = false;
}
if (value.get(globalThis, "highWaterMark")) |chunkSize| {
if (value.fastGet(globalThis, .highWaterMark)) |chunkSize| {
if (chunkSize.isNumber()) {
empty = false;
chunk_size = @as(JSC.WebCore.Blob.SizeType, @intCast(@max(0, @as(i51, @truncate(chunkSize.toInt64())))));
@@ -523,12 +523,12 @@ pub const StreamStart = union(Tag) {
.FileSink => {
var chunk_size: JSC.WebCore.Blob.SizeType = 0;
if (value.getTruthy(globalThis, "highWaterMark")) |chunkSize| {
if (value.fastGet(globalThis, .highWaterMark)) |chunkSize| {
if (chunkSize.isNumber())
chunk_size = @as(JSC.WebCore.Blob.SizeType, @intCast(@max(0, @as(i51, @truncate(chunkSize.toInt64())))));
}
if (value.getTruthy(globalThis, "path")) |path| {
if (value.fastGet(globalThis, .path)) |path| {
if (!path.isString()) {
return .{
.err = Syscall.Error{
@@ -586,7 +586,7 @@ pub const StreamStart = union(Tag) {
var empty = true;
var chunk_size: JSC.WebCore.Blob.SizeType = 2048;
if (value.getTruthy(globalThis, "highWaterMark")) |chunkSize| {
if (value.fastGet(globalThis, .highWaterMark)) |chunkSize| {
if (chunkSize.isNumber()) {
empty = false;
chunk_size = @as(JSC.WebCore.Blob.SizeType, @intCast(@max(256, @as(i51, @truncate(chunkSize.toInt64())))));
@@ -3662,7 +3662,7 @@ pub fn ReadableStreamSource(
};
}
pub const ReadableStreamPipe = struct {
pub const PipeReader = struct {
reader: bun.io.BufferedOutputReader(@This(), onReadChunk) = .{},
done: bool = false,
pending: StreamResult.Pending = .{},
@@ -3670,10 +3670,10 @@ pub const ReadableStreamPipe = struct {
pending_view: []u8 = []u8{},
pub fn setup(
this: *ReadableStreamPipe,
this: *PipeReader,
other_reader: anytype,
) void {
this.* = ReadableStreamPipe{
this.* = PipeReader{
.reader = .{},
.done = false,
};
@@ -3681,8 +3681,13 @@ pub const ReadableStreamPipe = struct {
this.reader.fromOutputReader(other_reader, this);
}
pub fn onStart(this: *ReadableStreamPipe) StreamStart {
_ = this; // autofix
pub fn onStart(this: *PipeReader) StreamStart {
switch (this.reader.start()) {
.result => {},
.err => |e| {
return .{ .err = e };
},
}
return .{ .ready = {} };
}
@@ -3691,13 +3696,13 @@ pub const ReadableStreamPipe = struct {
return @fieldParentPtr(Source, "context", this);
}
pub fn onCancel(this: *ReadableStreamPipe) void {
pub fn onCancel(this: *PipeReader) void {
if (this.done) return;
this.done = true;
this.reader.close();
}
pub fn deinit(this: *ReadableStreamPipe) void {
pub fn deinit(this: *PipeReader) void {
this.reader.deinit();
this.pending_value.deinit();
}
@@ -3738,7 +3743,7 @@ pub const ReadableStreamPipe = struct {
}
}
pub fn onPull(this: *ReadableStreamPipe, buffer: []u8, array: JSC.JSValue) StreamResult {
pub fn onPull(this: *PipeReader, buffer: []u8, array: JSC.JSValue) StreamResult {
array.ensureStillAlive();
defer array.ensureStillAlive();
const drained = this.drain();
@@ -3773,7 +3778,7 @@ pub const ReadableStreamPipe = struct {
return .{ .pending = &this.pending };
}
pub fn drain(this: *ReadableStreamPipe) bun.ByteList {
pub fn drain(this: *PipeReader) bun.ByteList {
if (this.reader.hasPendingRead()) {
return .{};
}
@@ -3783,7 +3788,7 @@ pub const ReadableStreamPipe = struct {
return bun.ByteList.fromList(out);
}
pub fn setRefOrUnref(this: *ReadableStreamPipe, enable: bool) void {
pub fn setRefOrUnref(this: *PipeReader, enable: bool) void {
if (this.done) return;
if (enable) {
this.reader.enableKeepingProcessAlive(JSC.EventLoopHandle.init(this.parent().globalThis.bunVM().eventLoop()));

View File

@@ -397,8 +397,10 @@ pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*
}
}
pub fn start(this: *PosixOutputReader) bun.JSC.Maybe(void) {
const maybe = this.poll.register(this.parent.loop(), .readable, true);
pub fn start(this: *PosixOutputReader, fd: bun.FileDescriptor) bun.JSC.Maybe(void) {
const poll = Async.FilePoll.init(this.parent.loop(), fd, .readable, @This(), this);
this.poll = poll;
const maybe = poll.register(this.parent.loop(), .readable, true);
if (maybe != .result) {
return maybe;
}
@@ -515,7 +517,7 @@ pub const GenericWindowsBufferedOutputReader = struct {
return this._buffer.allocatedSlice()[this._buffer.items.len..];
}
pub fn start(this: *WindowsOutputReader) JSC.Maybe(void) {
pub fn start(this: *@This(), _: bun.FileDescriptor) bun.JSC.Maybe(void) {
this.buffer.clearRetainingCapacity();
this.is_done = false;
}
@@ -577,13 +579,13 @@ pub fn WindowsBufferedOutputReader(comptime Parent: type, comptime onReadChunk:
reader.deref();
}
pub fn start(this: *@This()) bun.JSC.Maybe(void) {
pub fn start(this: *@This(), fd: bun.FileDescriptor) bun.JSC.Maybe(void) {
const reader = this.reader orelse brk: {
this.reader = this.newReader();
break :brk this.reader.?;
};
return reader.start();
return reader.start(fd);
}
pub fn end(this: *@This()) void {

View File

@@ -6,7 +6,7 @@ const JSC = bun.JSC;
pub const WriteResult = union(enum) {
done: usize,
wrote: usize,
pending: void,
pending: usize,
err: bun.sys.Error,
};
@@ -21,6 +21,7 @@ pub fn PosixPipeWriter(
comptime onError: fn (*This, bun.sys.Error) void,
comptime onWritable: fn (*This) void,
) type {
_ = onWritable; // autofix
return struct {
pub fn _tryWrite(this: *This, buf_: []const u8) WriteResult {
const fd = getFd(this);
@@ -30,7 +31,7 @@ pub fn PosixPipeWriter(
switch (writeNonBlocking(fd, buf)) {
.err => |err| {
if (err.isRetry()) {
break;
return .{ .pending = buf_.len - buf.len };
}
return .{ .err = err };
@@ -63,10 +64,30 @@ pub fn PosixPipeWriter(
pub fn onPoll(parent: *This, size_hint: isize) void {
_ = size_hint; // autofix
drain(parent);
switch (drainBufferedData(parent)) {
.pending => {
if (comptime registerPoll) |register| {
register(parent);
}
},
.wrote => |amt| {
if (getBuffer(parent).len > 0) {
if (comptime registerPoll) |register| {
register(parent);
}
}
onWrite(parent, amt, false);
},
.err => |err| {
onError(parent, err);
},
.done => |amt| {
onWrite(parent, amt, true);
},
}
}
fn drain(parent: *This) bool {
pub fn drainBufferedData(parent: *This) WriteResult {
var buf = getBuffer(parent);
const original_buf = buf;
while (buf.len > 0) {
@@ -77,40 +98,70 @@ pub fn PosixPipeWriter(
buf = buf[amt..];
},
.err => |err| {
std.debug.assert(!err.isRetry());
const wrote = original_buf.len - buf.len;
if (wrote > 0) {
onWrite(parent, wrote, false);
if (err.isRetry()) {
return .{ .pending = wrote };
}
if (wrote > 0) {
onError(parent, err);
return .{ .wrote = wrote };
} else {
return .{ .err = err };
}
onError(parent, err);
},
.done => |amt| {
buf = buf[amt..];
const wrote = original_buf.len - buf.len;
onWrite(parent, wrote, true);
return false;
return .{ .done = wrote };
},
}
}
const wrote = original_buf.len - buf.len;
if (wrote < original_buf.len) {
if (comptime registerPoll) |register| {
register(parent);
}
}
if (wrote == 0) {
onWritable(parent);
} else {
onWrite(parent, wrote, false);
}
return .{ .wrote = wrote };
}
};
}
pub const PollOrFd = union(enum) {
/// When it's a pipe/fifo
poll: *Async.FilePoll,
fd: bun.FileDescriptor,
closed: void,
pub fn getFd(this: *const PollOrFd) bun.FileDescriptor {
return switch (this.*) {
.closed => bun.invalid_fd,
.fd => this.fd,
.poll => this.poll.fd,
};
}
pub fn getPoll(this: *const PollOrFd) ?*Async.FilePoll {
return switch (this.*) {
.closed => null,
.fd => null,
.poll => this.poll,
};
}
pub fn close(this: *PollOrFd, ctx: ?*anyopaque, comptime onCloseFn: anytype) void {
const fd = this.getFd();
if (this.* == .poll) {
this.poll.deinit();
this.* = .{ .closed = {} };
}
if (fd != bun.invalid_fd) {
this.handle = .{ .closed = {} };
onCloseFn(@ptrCast(ctx.?));
}
}
};
pub fn PosixBufferedWriter(
comptime Parent: type,
comptime onWrite: fn (*Parent, amount: usize, done: bool) void,
@@ -119,14 +170,18 @@ pub fn PosixBufferedWriter(
) type {
return struct {
buffer: []const u8 = "",
poll: ?*Async.FilePoll = null,
handle: PollOrFd = .{ .closed = {} },
parent: *Parent = undefined,
is_done: bool = false,
const PosixWriter = @This();
pub fn getPoll(this: *@This()) ?*Async.FilePoll {
return this.handle.getPoll();
}
pub fn getFd(this: *PosixWriter) bun.FileDescriptor {
return this.poll.fd;
return this.handle.getFd();
}
pub fn getBuffer(this: *PosixWriter) []const u8 {
@@ -138,9 +193,10 @@ pub fn PosixBufferedWriter(
err: bun.sys.Error,
) void {
std.debug.assert(!err.isRetry());
clearPoll(this);
onError(this.parent, err);
this.close();
}
fn _onWrite(
@@ -155,7 +211,7 @@ pub fn PosixBufferedWriter(
onWrite(parent, written, done);
if (done and !was_done) {
this.clearPoll();
this.close();
}
}
@@ -166,7 +222,7 @@ pub fn PosixBufferedWriter(
}
fn registerPoll(this: *PosixWriter) void {
var poll = this.poll orelse return;
var poll = this.getPoll() orelse return;
switch (poll.registerWithFd(bun.uws.Loop.get(), .writable, true, poll.fd)) {
.err => |err| {
onError(this, err);
@@ -178,18 +234,23 @@ pub fn PosixBufferedWriter(
pub const tryWrite = @This()._tryWrite;
pub fn hasRef(this: *PosixWriter) bool {
return !this.is_done and this.poll.canEnableKeepingProcessAlive();
if (this.is_done) {
return false;
}
const poll = this.getPoll() orelse return false;
return poll.canEnableKeepingProcessAlive();
}
pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void {
if (this.is_done) return;
const poll = this.poll orelse return;
const poll = this.getPoll() orelse return;
poll.enableKeepingProcessAlive(event_loop);
}
pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void {
const poll = this.poll orelse return;
const poll = this.getPoll() orelse return;
poll.disableKeepingProcessAlive(event_loop);
}
@@ -201,26 +262,23 @@ pub fn PosixBufferedWriter(
}
this.is_done = true;
clearPoll(this);
this.close();
}
fn clearPoll(this: *PosixWriter) void {
if (this.poll) |poll| {
const fd = poll.fd;
this.poll = null;
if (fd != bun.invalid_fd) {
_ = bun.sys.close(fd);
onClose(@ptrCast(this.parent));
}
poll.deinit();
}
pub fn close(this: *PosixWriter) void {
this.handle.close(this.parent, onClose);
}
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8) JSC.Maybe(void) {
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8, pollable: bool) JSC.Maybe(void) {
this.buffer = bytes;
if (!pollable) {
std.debug.assert(this.handle != .poll);
this.handle = .{ .fd = fd };
return JSC.Maybe(void){ .result = {} };
}
const loop = @as(*Parent, @ptrCast(this.parent)).loop();
var poll = this.poll orelse brk: {
this.poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this);
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) };
break :brk this.poll.?;
};
@@ -245,17 +303,24 @@ pub fn PosixStreamingWriter(
) type {
return struct {
buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
poll: ?*Async.FilePoll = null,
handle: PollOrFd = .{ .closed = {} },
parent: *anyopaque = undefined,
is_done: bool = false,
head: usize = 0,
is_done: bool = false,
const PosixWriter = @This();
// TODO:
chunk_size: usize = 0,
pub fn getPoll(this: *@This()) ?*Async.FilePoll {
return this.handle.getPoll();
}
pub fn getFd(this: *PosixWriter) bun.FileDescriptor {
return this.poll.?.fd;
return this.handle.getFd();
}
const PosixWriter = @This();
pub fn getBuffer(this: *PosixWriter) []const u8 {
return this.buffer.items[this.head..];
}
@@ -266,7 +331,9 @@ pub fn PosixStreamingWriter(
) void {
std.debug.assert(!err.isRetry());
this.is_done = true;
onError(@ptrCast(this.parent), err);
this.close();
}
fn _onWrite(
@@ -274,10 +341,12 @@ pub fn PosixStreamingWriter(
written: usize,
done: bool,
) void {
this.buffer = this.buffer[written..];
this.head += written;
if (this.buffer.items.len == this.head) {
if (this.buffer.capacity > 32 * 1024 and !done) {
this.buffer.shrinkAndFree(std.mem.page_size);
}
this.buffer.clearRetainingCapacity();
this.head = 0;
}
@@ -297,9 +366,11 @@ pub fn PosixStreamingWriter(
}
fn registerPoll(this: *PosixWriter) void {
switch (this.poll.?.registerWithFd(@as(*Parent, @ptrCast(this.parent)).loop(), .writable, true, this.poll.fd)) {
const poll = this.getPoll() orelse return;
switch (poll.registerWithFd(@as(*Parent, @ptrCast(this.parent)).loop(), .writable, true, poll.fd)) {
.err => |err| {
onError(this, err);
this.close();
},
.result => {},
}
@@ -315,14 +386,94 @@ pub fn PosixStreamingWriter(
return .{ .err = bun.sys.Error.oom };
};
return .{ .pending = {} };
return .{ .pending = 0 };
}
return @This()._tryWrite(this, buf);
}
pub fn writeUTF16(this: *PosixWriter, buf: []const u16) WriteResult {
if (this.is_done) {
return .{ .done = 0 };
}
const had_buffered_data = this.buffer.items.len > 0;
{
var byte_list = bun.ByteList.fromList(this.buffer);
defer this.buffer = byte_list.listManaged(bun.default_allocator);
byte_list.writeUTF16(bun.default_allocator, buf) catch {
return .{ .err = bun.sys.Error.oom };
};
}
if (had_buffered_data) {
return .{ .pending = 0 };
}
return this._tryWriteNewlyBufferedData();
}
pub fn writeLatin1(this: *PosixWriter, buf: []const u8) WriteResult {
if (this.is_done) {
return .{ .done = 0 };
}
if (bun.strings.isAllASCII(buf)) {
return this.write(buf);
}
const had_buffered_data = this.buffer.items.len > 0;
{
var byte_list = bun.ByteList.fromList(this.buffer);
defer this.buffer = byte_list.listManaged(bun.default_allocator);
byte_list.writeLatin1(bun.default_allocator, buf) catch {
return .{ .err = bun.sys.Error.oom };
};
}
if (had_buffered_data) {
return .{ .pending = 0 };
}
return this._tryWriteNewlyBufferedData();
}
fn _tryWriteNewlyBufferedData(this: *PosixWriter) WriteResult {
std.debug.assert(!this.is_done);
switch (@This()._tryWrite(this, this.buffer.items)) {
.wrote => |amt| {
if (amt == this.buffer.items.len) {
this.buffer.clearRetainingCapacity();
} else {
this.head = amt;
}
return .{ .wrote = amt };
},
.done => |amt| {
this.buffer.clearRetainingCapacity();
return .{ .done = amt };
},
}
}
pub fn write(this: *PosixWriter, buf: []const u8) WriteResult {
const rc = tryWrite(this, buf);
if (this.is_done) {
return .{ .done = 0 };
}
if (this.buffer.items.len + buf.len < this.chunk_size) {
this.buffer.appendSlice(buf) catch {
return .{ .err = bun.sys.Error.oom };
};
return .{ .pending = 0 };
}
const rc = @This()._tryWrite(this, buf);
if (rc == .pending) {
registerPoll(this);
return rc;
@@ -351,23 +502,30 @@ pub fn PosixStreamingWriter(
pub usingnamespace PosixPipeWriter(@This(), getFd, getBuffer, _onWrite, registerPoll, _onError, _onWritable);
pub fn flush(this: *PosixWriter) WriteResult {
return this.drainBufferedData();
}
pub fn deinit(this: *PosixWriter) void {
this.buffer.clearAndFree();
this.clearPoll();
this.close();
}
pub fn hasRef(this: *PosixWriter) bool {
return !this.is_done and this.poll.?.canEnableKeepingProcessAlive();
const poll = this.poll orelse return false;
return !this.is_done and poll.canEnableKeepingProcessAlive();
}
pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void {
if (this.is_done) return;
const poll = this.getPoll() orelse return;
this.poll.?.enableKeepingProcessAlive(event_loop);
poll.enableKeepingProcessAlive(event_loop);
}
pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void {
this.poll.?.disableKeepingProcessAlive(event_loop);
const poll = this.getPoll() orelse return;
poll.disableKeepingProcessAlive(event_loop);
}
pub fn end(this: *PosixWriter) void {
@@ -376,25 +534,23 @@ pub fn PosixStreamingWriter(
}
this.is_done = true;
clearPoll(this);
this.close();
}
fn clearPoll(this: *PosixWriter) void {
if (this.poll) |poll| {
const fd = poll.fd;
poll.deinit();
this.poll = null;
pub fn close(this: *PosixWriter) void {
this.handle.close(@ptrCast(this.parent), onClose);
}
if (fd != bun.invalid_fd) {
onClose(@ptrCast(this.parent));
}
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, is_pollable: bool) JSC.Maybe(void) {
if (!is_pollable) {
this.close();
this.handle = .{ .fd = fd };
return JSC.Maybe(void){ .result = {} };
}
}
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor) JSC.Maybe(void) {
const loop = @as(*Parent, @ptrCast(this.parent)).loop();
var poll = this.poll orelse brk: {
this.poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this);
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) };
break :brk this.poll.?;
};