diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 512f016859..fbe433c84a 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -277,8 +277,6 @@ pub const Subprocess = struct { const Readable = union(enum) { fd: bun.FileDescriptor, memfd: bun.FileDescriptor, - sync_buffered_output: *BufferedOutput, - pipe: Pipe, inherit: void, ignore: void, @@ -320,7 +318,7 @@ pub const Subprocess = struct { pub const Pipe = union(enum) { stream: JSC.WebCore.ReadableStream, - buffer: StreamingOutput, + buffer: PipeReader, detached: void, pub fn finish(this: *@This()) void { @@ -760,6 +758,17 @@ pub const Subprocess = struct { pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub fn readAll(this: *PipeReader) void { + if (this.state == .pending) + this.reader.read(); + } + + pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *JSC.EventLoop) JSC.Maybe(void) { + this.process = process; + this.event_loop = event_loop; + return this.reader.start(); + } + pub fn onOutputDone(this: *PipeReader) void { const owned = this.toOwnedSlice(); this.state = .{ .done = owned }; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 8d01175036..d91fbee5ad 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -826,7 +826,7 @@ pub const VirtualMachine = struct { bun.reloadProcess(bun.default_allocator, !strings.eqlComptime(this.bundler.env.map.get("BUN_CONFIG_NO_CLEAR_TERMINAL_ON_RELOAD") orelse "0", "true")); } - if (!strings.eqlComptime(this.bundler.env.map.get("BUN_CONFIG_NO_CLEAR_TERMINAL_ON_RELOAD") orelse "0", "true")) { + if (!strings.eqlComptime(this.bundler.env.get("BUN_CONFIG_NO_CLEAR_TERMINAL_ON_RELOAD") orelse "0", "true")) { Output.flush(); Output.disableBuffering(); Output.resetTerminalAll(); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index f4751167c4..6e6ad5fd19 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -242,7 +242,7 @@ pub const ReadableStream = struct { Bytes: *ByteStream, - Pipe: *ReadableStreamPipe, + Pipe: *PipeReader, }; extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag; @@ -303,7 +303,7 @@ pub const ReadableStream = struct { .Pipe => ReadableStream{ .value = value, .ptr = .{ - .Pipe = ptr.asPtr(ReadableStreamPipe), + .Pipe = ptr.asPtr(PipeReader), }, }, @@ -366,7 +366,7 @@ pub const ReadableStream = struct { buffered_reader: anytype, ) JSC.JSValue { JSC.markBinding(@src()); - var source = bun.default_allocator.create(ReadableStreamPipe.Source) catch bun.outOfMemory(); + var source = bun.default_allocator.create(PipeReader.Source) catch bun.outOfMemory(); source.* = .{ .globalThis = globalThis, .context = undefined, @@ -440,9 +440,9 @@ pub const StreamStart = union(Tag) { chunk_size, ArrayBufferSink, FileSink, + PipeSink, HTTPSResponseSink, HTTPResponseSink, - UVStreamSink, ready, }; @@ -498,12 +498,12 @@ pub const StreamStart = union(Tag) { empty = false; } - if (value.get(globalThis, "stream")) |as_array| { + if (value.fastGet(globalThis, .stream)) |as_array| { stream = as_array.toBoolean(); empty = false; } - if (value.get(globalThis, "highWaterMark")) |chunkSize| { + if (value.fastGet(globalThis, .highWaterMark)) |chunkSize| { if (chunkSize.isNumber()) { empty = false; chunk_size = @as(JSC.WebCore.Blob.SizeType, @intCast(@max(0, @as(i51, @truncate(chunkSize.toInt64()))))); @@ -523,12 +523,12 @@ pub const StreamStart = union(Tag) { .FileSink => { var chunk_size: JSC.WebCore.Blob.SizeType = 0; - if (value.getTruthy(globalThis, "highWaterMark")) |chunkSize| { + if (value.fastGet(globalThis, .highWaterMark)) |chunkSize| { if (chunkSize.isNumber()) chunk_size = @as(JSC.WebCore.Blob.SizeType, @intCast(@max(0, @as(i51, @truncate(chunkSize.toInt64()))))); } - if (value.getTruthy(globalThis, "path")) |path| { + if (value.fastGet(globalThis, .path)) |path| { if (!path.isString()) { return .{ .err = Syscall.Error{ @@ -586,7 +586,7 @@ pub const StreamStart = union(Tag) { var empty = true; var chunk_size: JSC.WebCore.Blob.SizeType = 2048; - if (value.getTruthy(globalThis, "highWaterMark")) |chunkSize| { + if (value.fastGet(globalThis, .highWaterMark)) |chunkSize| { if (chunkSize.isNumber()) { empty = false; chunk_size = @as(JSC.WebCore.Blob.SizeType, @intCast(@max(256, @as(i51, @truncate(chunkSize.toInt64()))))); @@ -3662,7 +3662,7 @@ pub fn ReadableStreamSource( }; } -pub const ReadableStreamPipe = struct { +pub const PipeReader = struct { reader: bun.io.BufferedOutputReader(@This(), onReadChunk) = .{}, done: bool = false, pending: StreamResult.Pending = .{}, @@ -3670,10 +3670,10 @@ pub const ReadableStreamPipe = struct { pending_view: []u8 = []u8{}, pub fn setup( - this: *ReadableStreamPipe, + this: *PipeReader, other_reader: anytype, ) void { - this.* = ReadableStreamPipe{ + this.* = PipeReader{ .reader = .{}, .done = false, }; @@ -3681,8 +3681,13 @@ pub const ReadableStreamPipe = struct { this.reader.fromOutputReader(other_reader, this); } - pub fn onStart(this: *ReadableStreamPipe) StreamStart { - _ = this; // autofix + pub fn onStart(this: *PipeReader) StreamStart { + switch (this.reader.start()) { + .result => {}, + .err => |e| { + return .{ .err = e }; + }, + } return .{ .ready = {} }; } @@ -3691,13 +3696,13 @@ pub const ReadableStreamPipe = struct { return @fieldParentPtr(Source, "context", this); } - pub fn onCancel(this: *ReadableStreamPipe) void { + pub fn onCancel(this: *PipeReader) void { if (this.done) return; this.done = true; this.reader.close(); } - pub fn deinit(this: *ReadableStreamPipe) void { + pub fn deinit(this: *PipeReader) void { this.reader.deinit(); this.pending_value.deinit(); } @@ -3738,7 +3743,7 @@ pub const ReadableStreamPipe = struct { } } - pub fn onPull(this: *ReadableStreamPipe, buffer: []u8, array: JSC.JSValue) StreamResult { + pub fn onPull(this: *PipeReader, buffer: []u8, array: JSC.JSValue) StreamResult { array.ensureStillAlive(); defer array.ensureStillAlive(); const drained = this.drain(); @@ -3773,7 +3778,7 @@ pub const ReadableStreamPipe = struct { return .{ .pending = &this.pending }; } - pub fn drain(this: *ReadableStreamPipe) bun.ByteList { + pub fn drain(this: *PipeReader) bun.ByteList { if (this.reader.hasPendingRead()) { return .{}; } @@ -3783,7 +3788,7 @@ pub const ReadableStreamPipe = struct { return bun.ByteList.fromList(out); } - pub fn setRefOrUnref(this: *ReadableStreamPipe, enable: bool) void { + pub fn setRefOrUnref(this: *PipeReader, enable: bool) void { if (this.done) return; if (enable) { this.reader.enableKeepingProcessAlive(JSC.EventLoopHandle.init(this.parent().globalThis.bunVM().eventLoop())); diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 0749723676..428e224bca 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -397,8 +397,10 @@ pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?* } } - pub fn start(this: *PosixOutputReader) bun.JSC.Maybe(void) { - const maybe = this.poll.register(this.parent.loop(), .readable, true); + pub fn start(this: *PosixOutputReader, fd: bun.FileDescriptor) bun.JSC.Maybe(void) { + const poll = Async.FilePoll.init(this.parent.loop(), fd, .readable, @This(), this); + this.poll = poll; + const maybe = poll.register(this.parent.loop(), .readable, true); if (maybe != .result) { return maybe; } @@ -515,7 +517,7 @@ pub const GenericWindowsBufferedOutputReader = struct { return this._buffer.allocatedSlice()[this._buffer.items.len..]; } - pub fn start(this: *WindowsOutputReader) JSC.Maybe(void) { + pub fn start(this: *@This(), _: bun.FileDescriptor) bun.JSC.Maybe(void) { this.buffer.clearRetainingCapacity(); this.is_done = false; } @@ -577,13 +579,13 @@ pub fn WindowsBufferedOutputReader(comptime Parent: type, comptime onReadChunk: reader.deref(); } - pub fn start(this: *@This()) bun.JSC.Maybe(void) { + pub fn start(this: *@This(), fd: bun.FileDescriptor) bun.JSC.Maybe(void) { const reader = this.reader orelse brk: { this.reader = this.newReader(); break :brk this.reader.?; }; - return reader.start(); + return reader.start(fd); } pub fn end(this: *@This()) void { diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index ba46146200..967f743260 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -6,7 +6,7 @@ const JSC = bun.JSC; pub const WriteResult = union(enum) { done: usize, wrote: usize, - pending: void, + pending: usize, err: bun.sys.Error, }; @@ -21,6 +21,7 @@ pub fn PosixPipeWriter( comptime onError: fn (*This, bun.sys.Error) void, comptime onWritable: fn (*This) void, ) type { + _ = onWritable; // autofix return struct { pub fn _tryWrite(this: *This, buf_: []const u8) WriteResult { const fd = getFd(this); @@ -30,7 +31,7 @@ pub fn PosixPipeWriter( switch (writeNonBlocking(fd, buf)) { .err => |err| { if (err.isRetry()) { - break; + return .{ .pending = buf_.len - buf.len }; } return .{ .err = err }; @@ -63,10 +64,30 @@ pub fn PosixPipeWriter( pub fn onPoll(parent: *This, size_hint: isize) void { _ = size_hint; // autofix - drain(parent); + switch (drainBufferedData(parent)) { + .pending => { + if (comptime registerPoll) |register| { + register(parent); + } + }, + .wrote => |amt| { + if (getBuffer(parent).len > 0) { + if (comptime registerPoll) |register| { + register(parent); + } + } + onWrite(parent, amt, false); + }, + .err => |err| { + onError(parent, err); + }, + .done => |amt| { + onWrite(parent, amt, true); + }, + } } - fn drain(parent: *This) bool { + pub fn drainBufferedData(parent: *This) WriteResult { var buf = getBuffer(parent); const original_buf = buf; while (buf.len > 0) { @@ -77,40 +98,70 @@ pub fn PosixPipeWriter( buf = buf[amt..]; }, .err => |err| { - std.debug.assert(!err.isRetry()); const wrote = original_buf.len - buf.len; - if (wrote > 0) { - onWrite(parent, wrote, false); + if (err.isRetry()) { + return .{ .pending = wrote }; + } + + if (wrote > 0) { + onError(parent, err); + return .{ .wrote = wrote }; + } else { + return .{ .err = err }; } - onError(parent, err); }, .done => |amt| { buf = buf[amt..]; const wrote = original_buf.len - buf.len; - onWrite(parent, wrote, true); - - return false; + return .{ .done = wrote }; }, } } const wrote = original_buf.len - buf.len; - if (wrote < original_buf.len) { - if (comptime registerPoll) |register| { - register(parent); - } - } - - if (wrote == 0) { - onWritable(parent); - } else { - onWrite(parent, wrote, false); - } + return .{ .wrote = wrote }; } }; } +pub const PollOrFd = union(enum) { + /// When it's a pipe/fifo + poll: *Async.FilePoll, + + fd: bun.FileDescriptor, + closed: void, + + pub fn getFd(this: *const PollOrFd) bun.FileDescriptor { + return switch (this.*) { + .closed => bun.invalid_fd, + .fd => this.fd, + .poll => this.poll.fd, + }; + } + + pub fn getPoll(this: *const PollOrFd) ?*Async.FilePoll { + return switch (this.*) { + .closed => null, + .fd => null, + .poll => this.poll, + }; + } + + pub fn close(this: *PollOrFd, ctx: ?*anyopaque, comptime onCloseFn: anytype) void { + const fd = this.getFd(); + if (this.* == .poll) { + this.poll.deinit(); + this.* = .{ .closed = {} }; + } + + if (fd != bun.invalid_fd) { + this.handle = .{ .closed = {} }; + onCloseFn(@ptrCast(ctx.?)); + } + } +}; + pub fn PosixBufferedWriter( comptime Parent: type, comptime onWrite: fn (*Parent, amount: usize, done: bool) void, @@ -119,14 +170,18 @@ pub fn PosixBufferedWriter( ) type { return struct { buffer: []const u8 = "", - poll: ?*Async.FilePoll = null, + handle: PollOrFd = .{ .closed = {} }, parent: *Parent = undefined, is_done: bool = false, const PosixWriter = @This(); + pub fn getPoll(this: *@This()) ?*Async.FilePoll { + return this.handle.getPoll(); + } + pub fn getFd(this: *PosixWriter) bun.FileDescriptor { - return this.poll.fd; + return this.handle.getFd(); } pub fn getBuffer(this: *PosixWriter) []const u8 { @@ -138,9 +193,10 @@ pub fn PosixBufferedWriter( err: bun.sys.Error, ) void { std.debug.assert(!err.isRetry()); - clearPoll(this); onError(this.parent, err); + + this.close(); } fn _onWrite( @@ -155,7 +211,7 @@ pub fn PosixBufferedWriter( onWrite(parent, written, done); if (done and !was_done) { - this.clearPoll(); + this.close(); } } @@ -166,7 +222,7 @@ pub fn PosixBufferedWriter( } fn registerPoll(this: *PosixWriter) void { - var poll = this.poll orelse return; + var poll = this.getPoll() orelse return; switch (poll.registerWithFd(bun.uws.Loop.get(), .writable, true, poll.fd)) { .err => |err| { onError(this, err); @@ -178,18 +234,23 @@ pub fn PosixBufferedWriter( pub const tryWrite = @This()._tryWrite; pub fn hasRef(this: *PosixWriter) bool { - return !this.is_done and this.poll.canEnableKeepingProcessAlive(); + if (this.is_done) { + return false; + } + + const poll = this.getPoll() orelse return false; + return poll.canEnableKeepingProcessAlive(); } pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void { if (this.is_done) return; - const poll = this.poll orelse return; + const poll = this.getPoll() orelse return; poll.enableKeepingProcessAlive(event_loop); } pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void { - const poll = this.poll orelse return; + const poll = this.getPoll() orelse return; poll.disableKeepingProcessAlive(event_loop); } @@ -201,26 +262,23 @@ pub fn PosixBufferedWriter( } this.is_done = true; - clearPoll(this); + this.close(); } - fn clearPoll(this: *PosixWriter) void { - if (this.poll) |poll| { - const fd = poll.fd; - this.poll = null; - if (fd != bun.invalid_fd) { - _ = bun.sys.close(fd); - onClose(@ptrCast(this.parent)); - } - poll.deinit(); - } + pub fn close(this: *PosixWriter) void { + this.handle.close(this.parent, onClose); } - pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8) JSC.Maybe(void) { + pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8, pollable: bool) JSC.Maybe(void) { this.buffer = bytes; + if (!pollable) { + std.debug.assert(this.handle != .poll); + this.handle = .{ .fd = fd }; + return JSC.Maybe(void){ .result = {} }; + } const loop = @as(*Parent, @ptrCast(this.parent)).loop(); var poll = this.poll orelse brk: { - this.poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this); + this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) }; break :brk this.poll.?; }; @@ -245,17 +303,24 @@ pub fn PosixStreamingWriter( ) type { return struct { buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), - poll: ?*Async.FilePoll = null, + handle: PollOrFd = .{ .closed = {} }, parent: *anyopaque = undefined, - is_done: bool = false, head: usize = 0, + is_done: bool = false, - const PosixWriter = @This(); + // TODO: + chunk_size: usize = 0, + + pub fn getPoll(this: *@This()) ?*Async.FilePoll { + return this.handle.getPoll(); + } pub fn getFd(this: *PosixWriter) bun.FileDescriptor { - return this.poll.?.fd; + return this.handle.getFd(); } + const PosixWriter = @This(); + pub fn getBuffer(this: *PosixWriter) []const u8 { return this.buffer.items[this.head..]; } @@ -266,7 +331,9 @@ pub fn PosixStreamingWriter( ) void { std.debug.assert(!err.isRetry()); this.is_done = true; + onError(@ptrCast(this.parent), err); + this.close(); } fn _onWrite( @@ -274,10 +341,12 @@ pub fn PosixStreamingWriter( written: usize, done: bool, ) void { - this.buffer = this.buffer[written..]; this.head += written; if (this.buffer.items.len == this.head) { + if (this.buffer.capacity > 32 * 1024 and !done) { + this.buffer.shrinkAndFree(std.mem.page_size); + } this.buffer.clearRetainingCapacity(); this.head = 0; } @@ -297,9 +366,11 @@ pub fn PosixStreamingWriter( } fn registerPoll(this: *PosixWriter) void { - switch (this.poll.?.registerWithFd(@as(*Parent, @ptrCast(this.parent)).loop(), .writable, true, this.poll.fd)) { + const poll = this.getPoll() orelse return; + switch (poll.registerWithFd(@as(*Parent, @ptrCast(this.parent)).loop(), .writable, true, poll.fd)) { .err => |err| { onError(this, err); + this.close(); }, .result => {}, } @@ -315,14 +386,94 @@ pub fn PosixStreamingWriter( return .{ .err = bun.sys.Error.oom }; }; - return .{ .pending = {} }; + return .{ .pending = 0 }; } return @This()._tryWrite(this, buf); } + pub fn writeUTF16(this: *PosixWriter, buf: []const u16) WriteResult { + if (this.is_done) { + return .{ .done = 0 }; + } + + const had_buffered_data = this.buffer.items.len > 0; + { + var byte_list = bun.ByteList.fromList(this.buffer); + defer this.buffer = byte_list.listManaged(bun.default_allocator); + + byte_list.writeUTF16(bun.default_allocator, buf) catch { + return .{ .err = bun.sys.Error.oom }; + }; + } + + if (had_buffered_data) { + return .{ .pending = 0 }; + } + + return this._tryWriteNewlyBufferedData(); + } + + pub fn writeLatin1(this: *PosixWriter, buf: []const u8) WriteResult { + if (this.is_done) { + return .{ .done = 0 }; + } + + if (bun.strings.isAllASCII(buf)) { + return this.write(buf); + } + + const had_buffered_data = this.buffer.items.len > 0; + { + var byte_list = bun.ByteList.fromList(this.buffer); + defer this.buffer = byte_list.listManaged(bun.default_allocator); + + byte_list.writeLatin1(bun.default_allocator, buf) catch { + return .{ .err = bun.sys.Error.oom }; + }; + } + + if (had_buffered_data) { + return .{ .pending = 0 }; + } + + return this._tryWriteNewlyBufferedData(); + } + + fn _tryWriteNewlyBufferedData(this: *PosixWriter) WriteResult { + std.debug.assert(!this.is_done); + + switch (@This()._tryWrite(this, this.buffer.items)) { + .wrote => |amt| { + if (amt == this.buffer.items.len) { + this.buffer.clearRetainingCapacity(); + } else { + this.head = amt; + } + return .{ .wrote = amt }; + }, + .done => |amt| { + this.buffer.clearRetainingCapacity(); + + return .{ .done = amt }; + }, + } + } + pub fn write(this: *PosixWriter, buf: []const u8) WriteResult { - const rc = tryWrite(this, buf); + if (this.is_done) { + return .{ .done = 0 }; + } + + if (this.buffer.items.len + buf.len < this.chunk_size) { + this.buffer.appendSlice(buf) catch { + return .{ .err = bun.sys.Error.oom }; + }; + + return .{ .pending = 0 }; + } + + const rc = @This()._tryWrite(this, buf); if (rc == .pending) { registerPoll(this); return rc; @@ -351,23 +502,30 @@ pub fn PosixStreamingWriter( pub usingnamespace PosixPipeWriter(@This(), getFd, getBuffer, _onWrite, registerPoll, _onError, _onWritable); + pub fn flush(this: *PosixWriter) WriteResult { + return this.drainBufferedData(); + } + pub fn deinit(this: *PosixWriter) void { this.buffer.clearAndFree(); - this.clearPoll(); + this.close(); } pub fn hasRef(this: *PosixWriter) bool { - return !this.is_done and this.poll.?.canEnableKeepingProcessAlive(); + const poll = this.poll orelse return false; + return !this.is_done and poll.canEnableKeepingProcessAlive(); } pub fn enableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void { if (this.is_done) return; + const poll = this.getPoll() orelse return; - this.poll.?.enableKeepingProcessAlive(event_loop); + poll.enableKeepingProcessAlive(event_loop); } pub fn disableKeepingProcessAlive(this: *PosixWriter, event_loop: JSC.EventLoopHandle) void { - this.poll.?.disableKeepingProcessAlive(event_loop); + const poll = this.getPoll() orelse return; + poll.disableKeepingProcessAlive(event_loop); } pub fn end(this: *PosixWriter) void { @@ -376,25 +534,23 @@ pub fn PosixStreamingWriter( } this.is_done = true; - clearPoll(this); + this.close(); } - fn clearPoll(this: *PosixWriter) void { - if (this.poll) |poll| { - const fd = poll.fd; - poll.deinit(); - this.poll = null; + pub fn close(this: *PosixWriter) void { + this.handle.close(@ptrCast(this.parent), onClose); + } - if (fd != bun.invalid_fd) { - onClose(@ptrCast(this.parent)); - } + pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, is_pollable: bool) JSC.Maybe(void) { + if (!is_pollable) { + this.close(); + this.handle = .{ .fd = fd }; + return JSC.Maybe(void){ .result = {} }; } - } - pub fn start(this: *PosixWriter, fd: bun.FileDescriptor) JSC.Maybe(void) { const loop = @as(*Parent, @ptrCast(this.parent)).loop(); var poll = this.poll orelse brk: { - this.poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this); + this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) }; break :brk this.poll.?; };