Introduce PipeSink

This commit is contained in:
Jarred Sumner
2024-02-01 04:52:19 -08:00
parent cdb089b406
commit 59e17f6728
8 changed files with 584 additions and 401 deletions

View File

@@ -212,6 +212,10 @@ pub const Subprocess = struct {
return true;
}
if (this.hasPendingActivityStdio()) {
return true;
}
return this.process.hasRef();
}
@@ -229,6 +233,48 @@ pub const Subprocess = struct {
);
}
pub fn hasPendingActivityStdio(this: *const Subprocess) bool {
if (this.stdin.hasPendingActivity()) {
return true;
}
inline for (.{ StdioKind.stdout, StdioKind.stderr }) |kind| {
if (@field(this, @tagName(kind)).hasPendingActivity()) {
return true;
}
}
}
pub fn onCloseIO(this: *Subprocess, kind: StdioKind) void {
switch (kind) {
.stdin => {
switch (this.stdin) {
.pipe => |pipe| {
pipe.signal.clear();
pipe.deref();
this.stdin.* = .{ .ignore = {} };
},
.buffer => {
this.stdin.buffer.source.detach();
this.stdin.buffer.deref();
this.stdin.* = .{ .ignore = {} };
},
else => {},
}
},
inline .stdout, .stderr => |tag| {
const out: *Readable = &@field(this, @tagName(tag));
switch (out.*) {
.pipe => |pipe| {
out.* = .{ .ignore = {} };
pipe.deref();
},
else => {},
}
},
}
}
pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
@fence(.Acquire);
return this.has_pending_activity.load(.Acquire);
@@ -277,7 +323,7 @@ pub const Subprocess = struct {
const Readable = union(enum) {
fd: bun.FileDescriptor,
memfd: bun.FileDescriptor,
pipe: Pipe,
pipe: *PipeReader,
inherit: void,
ignore: void,
closed: void,
@@ -285,15 +331,7 @@ pub const Subprocess = struct {
pub fn ref(this: *Readable) void {
switch (this.*) {
.pipe => {
if (this.pipe == .buffer) {
if (Environment.isWindows) {
uv.uv_ref(@ptrCast(&this.pipe.buffer.stream));
return;
}
if (this.pipe.buffer.stream.poll_ref) |poll| {
poll.enableKeepingProcessAlive(JSC.VirtualMachine.get());
}
}
this.pipe.updateRef(true);
},
else => {},
}
@@ -302,89 +340,16 @@ pub const Subprocess = struct {
pub fn unref(this: *Readable) void {
switch (this.*) {
.pipe => {
if (this.pipe == .buffer) {
if (Environment.isWindows) {
uv.uv_unref(@ptrCast(&this.pipe.buffer.stream));
return;
}
if (this.pipe.buffer.stream.poll_ref) |poll| {
poll.disableKeepingProcessAlive(JSC.VirtualMachine.get());
}
}
this.pipe.updateRef(false);
},
else => {},
}
}
pub const Pipe = union(enum) {
stream: JSC.WebCore.ReadableStream,
buffer: PipeReader,
detached: void,
pub fn finish(this: *@This()) void {
if (this.* == .stream and this.stream.ptr == .File) {
this.stream.ptr.File.finish();
}
}
pub fn done(this: *@This()) void {
if (this.* == .detached)
return;
if (this.* == .stream) {
if (this.stream.ptr == .File) this.stream.ptr.File.setSignal(JSC.WebCore.Signal{});
this.stream.done();
return;
}
this.buffer.close();
}
pub fn toJS(this: *@This(), readable: *Readable, globalThis: *JSC.JSGlobalObject, exited: bool) JSValue {
if (comptime Environment.allow_assert)
std.debug.assert(this.* != .detached); // this should be cached by the getter
if (this.* != .stream) {
const stream = this.buffer.toReadableStream(globalThis, exited);
// we do not detach on windows
if (Environment.isWindows) {
return stream.toJS();
}
this.* = .{ .stream = stream };
}
if (this.stream.ptr == .File) {
this.stream.ptr.File.setSignal(JSC.WebCore.Signal.init(readable));
}
const result = this.stream.toJS();
this.* = .detached;
return result;
}
};
pub fn initWithPipe(stdio: Stdio, pipe: *uv.Pipe, allocator: std.mem.Allocator, max_size: u32) Readable {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
break :brk .{
.pipe = .{
.buffer = StreamingOutput.initWithPipeAndAllocator(allocator, pipe, max_size),
},
};
},
.path => Readable{ .ignore = {} },
.blob, .fd => @panic("use init() instead"),
.memfd => Readable{ .memfd = stdio.memfd },
.array_buffer => Readable{
.pipe = .{
.buffer = StreamingOutput.initWithPipeAndSlice(pipe, stdio.array_buffer.slice()),
},
},
};
}
pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32, is_sync: bool) Readable {
pub fn init(stdio: Stdio, event_loop: *JSC.EventLoop, process: *Subprocess, fd: ?bun.FileDescriptor, allocator: std.mem.Allocator, max_size: u32, is_sync: bool) Readable {
_ = allocator; // autofix
_ = max_size; // autofix
_ = is_sync; // autofix
if (comptime Environment.allow_assert) {
if (fd) |fd_| {
std.debug.assert(fd_ != bun.invalid_fd);
@@ -394,22 +359,10 @@ pub const Subprocess = struct {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
if (is_sync) {}
break :brk .{
.pipe = .{
.buffer = StreamingOutput.initWithAllocator(allocator, fd.?, max_size),
},
};
},
.path => Readable{ .ignore = {} },
.blob, .fd => Readable{ .fd = fd.? },
.fd => Readable{ .fd = fd.? },
.memfd => Readable{ .memfd = stdio.memfd },
.array_buffer => Readable{
.pipe = .{
.buffer = StreamingOutput.initWithSlice(fd.?, stdio.array_buffer.slice()),
},
},
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, fd.?) },
};
}
@@ -428,28 +381,12 @@ pub const Subprocess = struct {
_ = bun.sys.close(fd);
},
.pipe => {
this.pipe.done();
this.pipe.close();
},
else => {},
}
}
pub fn setCloseCallbackIfPossible(this: *Readable, callback: CloseCallbackHandler) bool {
switch (this.*) {
.pipe => {
if (Environment.isWindows) {
if (uv.uv_is_closed(@ptrCast(this.pipe.buffer.stream))) {
return false;
}
this.pipe.buffer.closeCallback = callback;
return true;
}
return false;
},
else => return false,
}
}
pub fn finalize(this: *Readable) void {
switch (this.*) {
inline .memfd, .fd => |fd| {
@@ -457,16 +394,8 @@ pub const Subprocess = struct {
_ = bun.sys.close(fd);
},
.pipe => |*pipe| {
if (pipe.* == .detached) {
return;
}
if (pipe.* == .stream and pipe.stream.ptr == .File) {
this.close();
return;
}
pipe.buffer.close();
defer pipe.detach();
this.* = .{ .closed = {} };
},
else => {},
}
@@ -480,8 +409,10 @@ pub const Subprocess = struct {
.fd => |fd| {
return JSValue.jsNumber(fd);
},
.pipe => {
return this.pipe.toJS(this, globalThis, exited);
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
return pipe.toJS(this, globalThis, exited);
},
else => {
return JSValue.jsUndefined();
@@ -501,28 +432,10 @@ pub const Subprocess = struct {
this.* = .{ .closed = {} };
return JSC.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis);
},
.sync_buffered_output => |*sync_buffered_output| {
const slice = sync_buffered_output.toOwnedSlice(globalThis);
.pipe => |pipe| {
defer pipe.detach();
this.* = .{ .closed = {} };
return JSC.MarkedArrayBuffer
.fromBytes(slice, bun.default_allocator, .Uint8Array)
.toNodeBuffer(globalThis);
},
.pipe => {
if (!Environment.isWindows) {
this.pipe.buffer.stream.close_on_empty_read = true;
this.pipe.buffer.readAll();
}
const bytes = this.pipe.buffer.internal_buffer.slice();
this.pipe.buffer.internal_buffer = .{};
if (bytes.len > 0) {
// Return a Buffer so that they can do .toString() on it
return JSC.JSValue.createBuffer(globalThis, bytes, bun.default_allocator);
}
return JSC.JSValue.createBuffer(globalThis, &.{}, bun.default_allocator);
return pipe.toBuffer(globalThis);
},
else => {
return JSValue.jsUndefined();
@@ -694,11 +607,44 @@ pub const Subprocess = struct {
pub usingnamespace bun.NewRefCounted(@This(), deinit);
pub fn updateRef(this: *StaticPipeWriter, add: bool) void {
if (add) {
this.writer.updateRef(this.event_loop, true);
} else {
this.writer.updateRef(this.event_loop, false);
}
}
pub fn close(this: *StaticPipeWriter) void {
this.writer.close();
}
pub fn flush(this: *StaticPipeWriter) void {
this.writer.flush();
}
pub fn create(event_loop: *JSC.EventLoop, subprocess: *Subprocess, fd: bun.FileDescriptor, source: Source) *StaticPipeWriter {
return StaticPipeWriter.new(.{
.event_loop = event_loop,
.process = subprocess,
.fd = fd,
.source = source,
});
}
pub const Source = union(enum) {
blob: JSC.WebCore.Blob,
array_buffer: JSC.ArrayBuffer.Strong,
detached: void,
pub fn slice(this: *const Source) []const u8 {
return switch (this.*) {
.blob => this.blob.sharedView(),
.array_buffer => this.array_buffer.slice(),
else => @panic("Invalid source"),
};
}
pub fn detach(this: *@This()) void {
switch (this.*) {
.blob => {
@@ -755,9 +701,23 @@ pub const Subprocess = struct {
done: []u8,
err: bun.sys.Error,
} = .{ .pending = {} },
fd: bun.FileDescriptor = bun.invalid_fd,
pub usingnamespace bun.NewRefCounted(@This(), deinit);
pub fn detach(this: *PipeReader) void {
this.process = undefined;
this.deref();
}
pub fn create(event_loop: *JSC.EventLoop, process: *Subprocess, fd: bun.FileDescriptor) *PipeReader {
return PipeReader.new(.{
.process = process,
.event_loop = event_loop,
.fd = fd,
});
}
pub fn readAll(this: *PipeReader) void {
if (this.state == .pending)
this.reader.read();
@@ -788,33 +748,46 @@ pub const Subprocess = struct {
return out;
}
pub fn toReadableStream(this: *PipeReader) JSC.JSValue {
pub fn setFd(this: *PipeReader, fd: bun.FileDescriptor) *PipeReader {
this.fd = fd;
return this;
}
pub fn updateRef(this: *PipeReader, add: bool) void {
if (add) {
this.reader.updateRef(this.event_loop, true);
} else {
this.reader.updateRef(this.event_loop, false);
}
}
pub fn toReadableStream(this: *PipeReader, globalObject: *JSC.JSGlobalObject) JSC.JSValue {
switch (this.state) {
.pending => {
const stream = JSC.WebCore.ReadableStream.fromPipe(this.event_loop.global, &this.reader);
const stream = JSC.WebCore.ReadableStream.fromPipe(globalObject, &this.reader);
defer this.reader.deref();
this.state = .{ .done = .{} };
return stream;
},
.done => |bytes| {
const blob = JSC.WebCore.Blob.init(bytes, bun.default_allocator, this.event_loop.global);
const blob = JSC.WebCore.Blob.init(bytes, bun.default_allocator, globalObject);
this.state = .{ .done = .{} };
return JSC.WebCore.ReadableStream.fromBlob(this.event_loop.global, &blob, 0);
return JSC.WebCore.ReadableStream.fromBlob(globalObject, &blob, 0);
},
.err => |err| {
_ = err; // autofix
const empty = JSC.WebCore.ReadableStream.empty(this.event_loop.global);
JSC.WebCore.ReadableStream.cancel(JSC.WebCore.ReadableStream.fromJS(empty, this.event_loop.global), this.event_loop.global);
const empty = JSC.WebCore.ReadableStream.empty(globalObject);
JSC.WebCore.ReadableStream.cancel(JSC.WebCore.ReadableStream.fromJS(empty, globalObject), globalObject);
return empty;
},
}
}
pub fn toBuffer(this: *PipeReader) JSC.JSValue {
pub fn toBuffer(this: *PipeReader, globalThis: *JSC.JSGlobalObject) JSC.JSValue {
switch (this.state) {
.done => |bytes| {
defer this.state = .{ .done = &.{} };
return JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(this.event_loop.global);
return JSC.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
},
else => {
return JSC.JSValue.undefined;
@@ -823,16 +796,19 @@ pub const Subprocess = struct {
}
pub fn onOutputError(this: *PipeReader, err: bun.sys.Error) void {
if (this.state == .done) {
bun.default_allocator.free(this.state.done);
}
this.state = .{ .err = err };
this.process.onCloseIO(this.kind());
}
fn kind(this: *const PipeReader) StdioKind {
if (this.process.stdout == .pipe and this.process.stdout.sync_buffered_output == this) {
// are we stdout?
if (this.process.stdout == .pipe and this.process.stdout.pipe == this) {
return .stdout;
} else if (this.process.stderr == .sync_buffered_output and this.process.stderr.sync_buffered_output == this) {
// are we stderr?
}
if (this.process.stderr == .pipe and this.process.stderr.pipe == this) {
return .stderr;
}
@@ -870,32 +846,35 @@ pub const Subprocess = struct {
bun.default_allocator.free(this.state.done);
}
this.reader.deinit();
this.destroy();
}
};
const SinkType = if (Environment.isWindows) *JSC.WebCore.UVStreamSink else *JSC.WebCore.FileSink;
const BufferedInputType = BufferedInput;
const Writable = union(enum) {
pipe: SinkType,
pipe: struct {
pipe: SinkType,
readable_stream: JSC.WebCore.ReadableStream,
},
pipe: *JSC.WebCore.PipeSink,
fd: bun.FileDescriptor,
buffered_input: BufferedInputType,
buffer: *StaticPipeWriter,
memfd: bun.FileDescriptor,
inherit: void,
ignore: void,
pub fn hasPendingActivity(this: *Writable) bool {
return switch (this.*) {
// we mark them as .ignore when they are closed, so this must be true
.pipe => true,
.buffer => true,
else => false,
};
}
pub fn ref(this: *Writable) void {
switch (this.*) {
.pipe => {
if (Environment.isWindows) {
_ = uv.uv_ref(@ptrCast(this.pipe.stream));
} else if (this.pipe.poll_ref) |poll| {
poll.enableKeepingProcessAlive(JSC.VirtualMachine.get());
}
this.pipe.updateRef(true);
},
.buffer => {
this.buffer.updateRef(true);
},
else => {},
}
@@ -904,11 +883,10 @@ pub const Subprocess = struct {
pub fn unref(this: *Writable) void {
switch (this.*) {
.pipe => {
if (Environment.isWindows) {
_ = uv.uv_unref(@ptrCast(this.pipe.stream));
} else if (this.pipe.poll_ref) |poll| {
poll.disableKeepingProcessAlive(JSC.VirtualMachine.get());
}
this.pipe.updateRef(false);
},
.buffer => {
this.buffer.updateRef(false);
},
else => {},
}
@@ -917,6 +895,15 @@ pub const Subprocess = struct {
// When the stream has closed we need to be notified to prevent a use-after-free
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
pub fn onClose(this: *Writable, _: ?bun.sys.Error) void {
switch (this.*) {
.buffer => {
this.buffer.deref();
},
.pipe => {
this.pipe.deref();
},
else => {},
}
this.* = .{
.ignore = {},
};
@@ -924,7 +911,7 @@ pub const Subprocess = struct {
pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
pub fn init(stdio: Stdio, fd: ?bun.FileDescriptor, globalThis: *JSC.JSGlobalObject) !Writable {
pub fn init(stdio: Stdio, event_loop: *JSC.EventLoop, subprocess: *Subprocess, fd: ?bun.FileDescriptor) !Writable {
if (comptime Environment.allow_assert) {
if (fd) |fd_| {
std.debug.assert(fd_ != bun.invalid_fd);
@@ -932,44 +919,21 @@ pub const Subprocess = struct {
}
switch (stdio) {
.pipe => |maybe_readable| {
if (Environment.isWindows) @panic("TODO");
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
sink.* = .{
.fd = fd.?,
.buffer = bun.ByteList{},
.allocator = globalThis.bunVM().allocator,
.auto_close = true,
.pipe => {
return Writable{
.pipe = JSC.WebCore.PipeSink.create(event_loop, fd.?),
};
sink.mode = bun.S.IFIFO;
sink.watch(fd.?);
if (maybe_readable) |readable| {
return Writable{
.pipe_to_readable_stream = .{
.pipe = sink,
.readable_stream = readable,
},
};
}
},
return Writable{ .pipe = sink };
.blob => |blob| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, fd, .{ .blob = blob }),
};
},
.sync_buffered_output => |buffer| {
_ = buffer; // autofix
@panic("This should never be called");
},
.array_buffer, .blob => {
var buffered_input: BufferedInput = .{ .fd = fd.?, .source = undefined };
switch (stdio) {
.array_buffer => |array_buffer| {
buffered_input.source = .{ .array_buffer = array_buffer };
},
.blob => |blob| {
buffered_input.source = .{ .blob = blob };
},
else => unreachable,
}
return Writable{ .buffered_input = buffered_input };
.array_buffer => |array_buffer| {
return Writable{
.buffer = StaticPipeWriter.create(event_loop, subprocess, .{ .array_buffer = array_buffer }),
};
},
.memfd => |memfd| {
std.debug.assert(memfd != bun.invalid_fd);
@@ -988,82 +952,49 @@ pub const Subprocess = struct {
}
}
pub fn toJS(this: Writable, globalThis: *JSC.JSGlobalObject) JSValue {
return switch (this) {
.pipe => |pipe| pipe.toJS(globalThis),
pub fn toJS(this: *Writable, globalThis: *JSC.JSGlobalObject) JSValue {
return switch (this.*) {
.fd => |fd| JSValue.jsNumber(fd),
.memfd, .ignore => JSValue.jsUndefined(),
.inherit => JSValue.jsUndefined(),
.buffered_input => JSValue.jsUndefined(),
.pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value,
.buffer, .inherit => JSValue.jsUndefined(),
.pipe => |pipe| {
this.* = .{ .ignore = {} };
return pipe.toJS(globalThis);
},
};
}
pub fn finalize(this: *Writable) void {
return switch (this.*) {
.pipe => |pipe| {
pipe.close();
pipe.deref();
this.* = .{ .ignore = {} };
},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
_ = pipe_to_readable_stream.pipe.end(null);
.buffer => {
this.buffer.updateRef(false);
this.buffer.deref();
},
.memfd => |fd| {
_ = bun.sys.close(fd);
this.* = .{ .ignore = {} };
},
.buffered_input => {
this.buffered_input.deinit();
},
.ignore => {},
.fd, .inherit => {},
};
}
pub fn setCloseCallbackIfPossible(this: *Writable, callback: CloseCallbackHandler) bool {
switch (this.*) {
.pipe => |pipe| {
if (Environment.isWindows) {
if (pipe.isClosed()) {
return false;
}
pipe.closeCallback = callback;
return true;
}
return false;
},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
if (Environment.isWindows) {
if (pipe_to_readable_stream.pipe.isClosed()) {
return false;
}
pipe_to_readable_stream.pipe.closeCallback = callback;
return true;
}
return false;
},
.buffered_input => {
if (Environment.isWindows) {
this.buffered_input.closeCallback = callback;
return true;
}
return false;
},
else => return false,
}
}
pub fn close(this: *Writable) void {
switch (this.*) {
.pipe => {},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
_ = pipe_to_readable_stream.pipe.end(null);
.pipe => |pipe| {
pipe.end(null);
},
inline .memfd, .fd => |fd| {
_ = bun.sys.close(fd);
this.* = .{ .ignore = {} };
},
.buffered_input => {
this.buffered_input.deinit();
.buffer => {
this.buffer.close();
},
.ignore => {},
.inherit => {},
@@ -1574,34 +1505,34 @@ pub const Subprocess = struct {
return .zero;
};
if (comptime is_sync) {
if (stdio[1] == .pipe and stdio[1].pipe == null) {
stdio[1] = .{ .sync_buffered_output = BufferedOutput.new(.{}) };
}
// if (comptime is_sync) {
// if (stdio[1] == .pipe and stdio[1].pipe == null) {
// stdio[1] = .{ .sync_buffered_output = BufferedOutput.new(.{}) };
// }
if (stdio[2] == .pipe and stdio[2].pipe == null) {
stdio[2] = .{ .sync_buffered_output = BufferedOutput.new(.{}) };
}
} else {
if (stdio[1] == .pipe and stdio[1].pipe == null) {
stdio[1] = .{ .buffer = {} };
}
// if (stdio[2] == .pipe and stdio[2].pipe == null) {
// stdio[2] = .{ .sync_buffered_output = BufferedOutput.new(.{}) };
// }
// } else {
// if (stdio[1] == .pipe and stdio[1].pipe == null) {
// stdio[1] = .{ .buffer = {} };
// }
if (stdio[2] == .pipe and stdio[2].pipe == null) {
stdio[2] = .{ .buffer = {} };
}
}
defer {
if (comptime is_sync) {
if (stdio[1] == .sync_buffered_output) {
stdio[1].sync_buffered_output.deref();
}
// if (stdio[2] == .pipe and stdio[2].pipe == null) {
// stdio[2] = .{ .buffer = {} };
// }
// }
// defer {
// if (comptime is_sync) {
// if (stdio[1] == .sync_buffered_output) {
// stdio[1].sync_buffered_output.deref();
// }
if (stdio[2] == .sync_buffered_output) {
stdio[2].sync_buffered_output.deref();
}
}
}
// if (stdio[2] == .sync_buffered_output) {
// stdio[2].sync_buffered_output.deref();
// }
// }
// }
const spawn_options = bun.spawn.SpawnOptions{
.cwd = cwd,
@@ -1661,12 +1592,33 @@ pub const Subprocess = struct {
is_sync,
),
.pid_rusage = null,
.stdin = Writable.init(stdio[0], spawned.stdin, globalThis) catch {
.stdin = Writable.init(
stdio[0],
jsc_vm.eventLoop(),
subprocess,
spawned.stdin,
) catch {
globalThis.throwOutOfMemory();
return .zero;
},
.stdout = Readable.init(stdio[1], spawned.stdout, jsc_vm.allocator, default_max_buffer_size, is_sync),
.stderr = Readable.init(stdio[2], spawned.stderr, jsc_vm.allocator, default_max_buffer_size, is_sync),
.stdout = Readable.init(
jsc_vm.eventLoop(),
subprocess,
stdio[1],
spawned.stdout,
jsc_vm.allocator,
default_max_buffer_size,
is_sync,
),
.stderr = Readable.init(
jsc_vm.eventLoop(),
subprocess,
stdio[2],
spawned.stderr,
jsc_vm.allocator,
default_max_buffer_size,
is_sync,
),
.stdio_pipes = spawned.extra_pipes.moveToUnmanaged(),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.ipc_mode = ipc_mode,
@@ -1715,23 +1667,19 @@ pub const Subprocess = struct {
}
}
if (subprocess.stdin == .buffered_input) {
subprocess.stdin.buffered_input.remain = switch (subprocess.stdin.buffered_input.source) {
.blob => subprocess.stdin.buffered_input.source.blob.slice(),
.array_buffer => |array_buffer| array_buffer.slice(),
};
subprocess.stdin.buffered_input.writeIfPossible(is_sync);
if (subprocess.stdin == .buffer) {
subprocess.stdin.buffer.start(spawned.stdin.?, true);
}
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
if (subprocess.stdout == .pipe) {
if (is_sync or !lazy) {
subprocess.stdout.pipe.buffer.readAll();
subprocess.stdout.pipe.readAll();
}
}
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
if (subprocess.stderr == .pie) {
if (is_sync or !lazy) {
subprocess.stderr.pipe.buffer.readAll();
subprocess.stderr.pipe.readAll();
}
}
@@ -1741,14 +1689,6 @@ pub const Subprocess = struct {
return out;
}
if (subprocess.stdin == .buffered_input) {
while (subprocess.stdin.buffered_input.remain.len > 0) {
subprocess.stdin.buffered_input.writeIfPossible(true);
}
}
subprocess.closeIO(.stdin);
if (comptime is_sync) {
switch (subprocess.process.watch(jsc_vm)) {
.result => {},
@@ -1759,12 +1699,16 @@ pub const Subprocess = struct {
}
while (!subprocess.hasExited()) {
if (subprocess.stderr == .pipe and subprocess.stderr.pipe == .buffer) {
subprocess.stderr.pipe.buffer.readAll();
if (subprocess.stdin == .buffer) {
subprocess.stdin.buffer.flush();
}
if (subprocess.stdout == .pipe and subprocess.stdout.pipe == .buffer) {
subprocess.stdout.pipe.buffer.readAll();
if (subprocess.stderr == .pipe) {
subprocess.stderr.pipe.readAll();
}
if (subprocess.stdout == .pipe) {
subprocess.stdout.pipe.readAll();
}
jsc_vm.tick();
@@ -1797,10 +1741,9 @@ pub const Subprocess = struct {
fd: bun.FileDescriptor,
path: JSC.Node.PathLike,
blob: JSC.WebCore.AnyBlob,
pipe: ?JSC.WebCore.ReadableStream,
array_buffer: JSC.ArrayBuffer.Strong,
memfd: bun.FileDescriptor,
sync_buffered_output: *BufferedOutput,
pipe: void,
const PipeExtra = struct {
fd: i32,
@@ -1815,7 +1758,7 @@ pub const Subprocess = struct {
return switch (this.*) {
.blob => !this.blob.needsToReadFile(),
.memfd, .array_buffer => true,
.pipe => |pipe| pipe == null and is_sync,
.pipe => is_sync,
else => false,
};
}
@@ -1892,10 +1835,10 @@ pub const Subprocess = struct {
}
fn toPosix(
stdio: @This(),
stdio: *@This(),
) bun.spawn.SpawnOptions.Stdio {
return switch (stdio) {
.array_buffer, .blob, .pipe => .{ .buffer = {} },
.pipe, .array_buffer, .blob => .{ .buffer = {} },
.fd => |fd| .{ .pipe = fd },
.memfd => |fd| .{ .pipe = fd },
.path => |pathlike| .{ .path = pathlike.slice() },
@@ -1905,22 +1848,21 @@ pub const Subprocess = struct {
}
fn toWindows(
stdio: @This(),
stdio: *@This(),
) bun.spawn.SpawnOptions.Stdio {
return switch (stdio) {
.array_buffer, .blob, .pipe => .{ .buffer = {} },
.pipe, .array_buffer, .blob, .pipe => .{ .buffer = {} },
.fd => |fd| .{ .pipe = fd },
.path => |pathlike| .{ .path = pathlike.slice() },
.inherit => .{ .inherit = {} },
.ignore => .{ .ignore = {} },
.sync_buffer => .{ .buffer = &stdio.sync_buffer.reader.pipe },
.memfd => @panic("This should never happen"),
};
}
pub fn asSpawnOption(
stdio: @This(),
stdio: *@This(),
) bun.spawn.SpawnOptions.Stdio {
if (comptime Environment.isWindows) {
return stdio.toWindows();
@@ -1928,54 +1870,6 @@ pub const Subprocess = struct {
return stdio.toPosix();
}
}
fn setUpChildIoUvSpawn(
stdio: @This(),
std_fileno: i32,
pipe: *uv.Pipe,
isReadable: bool,
fd: bun.FileDescriptor,
) !uv.uv_stdio_container_s {
return switch (stdio) {
.array_buffer, .blob, .pipe => {
if (uv.uv_pipe_init(uv.Loop.get(), pipe, 0) != 0) {
return error.FailedToCreatePipe;
}
if (fd != bun.invalid_fd) {
// we receive a FD so we open this into our pipe
if (uv.uv_pipe_open(pipe, bun.uvfdcast(fd)).errEnum()) |_| {
return error.FailedToCreatePipe;
}
return uv.uv_stdio_container_s{
.flags = @intCast(uv.UV_INHERIT_STREAM),
.data = .{ .stream = @ptrCast(pipe) },
};
}
// we dont have any fd so we create a new pipe
return uv.uv_stdio_container_s{
.flags = @intCast(uv.UV_CREATE_PIPE | if (isReadable) uv.UV_READABLE_PIPE else uv.UV_WRITABLE_PIPE),
.data = .{ .stream = @ptrCast(pipe) },
};
},
.fd => |_fd| uv.uv_stdio_container_s{
.flags = uv.UV_INHERIT_FD,
.data = .{ .fd = bun.uvfdcast(_fd) },
},
.path => |pathlike| {
_ = pathlike;
@panic("TODO");
},
.inherit => uv.uv_stdio_container_s{
.flags = uv.UV_INHERIT_FD,
.data = .{ .fd = std_fileno },
},
.ignore => uv.uv_stdio_container_s{
.flags = uv.UV_IGNORE,
.data = undefined,
},
.memfd => unreachable,
};
}
};
fn extractStdioBlob(

View File

@@ -65,6 +65,15 @@ pub fn Maybe(comptime ResultType: type) type {
.result = std.mem.zeroes(ReturnType),
};
pub fn assert(this: @This()) ReturnType {
switch (this) {
.err => |err| {
bun.Output.panic("Unexpected error\n{}", .{err});
},
.result => |result| return result,
}
}
pub inline fn todo() @This() {
if (Environment.allow_assert) {
if (comptime ResultType == void) {

View File

@@ -189,6 +189,7 @@ pub const ReadableStream = struct {
.Blob => |blob| blob.parent().decrementCount(),
.File => |file| file.parent().decrementCount(),
.Bytes => |bytes| bytes.parent().decrementCount(),
.Pipe => |bytes| bytes.parent().decrementCount(),
else => 0,
};
@@ -422,6 +423,7 @@ pub const StreamStart = union(Tag) {
as_uint8array: bool,
stream: bool,
},
PipeSink: void,
FileSink: struct {
chunk_size: Blob.SizeType = 16384,
input_path: PathOrFileDescriptor,
@@ -644,6 +646,11 @@ pub const StreamResult = union(Tag) {
into_array_and_done,
};
pub fn slice16(this: *const StreamResult) []const u16 {
const bytes = this.slice();
return @as([*]const u16, @ptrCast(@alignCast(bytes.ptr)))[0..std.mem.bytesAsSlice(u16, bytes).len];
}
pub fn slice(this: *const StreamResult) []const u8 {
return switch (this.*) {
.owned => |owned| owned.slice(),
@@ -673,6 +680,10 @@ pub const StreamResult = union(Tag) {
consumed: Blob.SizeType = 0,
state: StreamResult.Pending.State = .none,
pub fn deinit(_: *@This()) void {
// TODO:
}
pub const Future = union(enum) {
promise: struct {
promise: *JSPromise,
@@ -1477,8 +1488,6 @@ pub fn NewFileSink(comptime EventLoop: JSC.EventLoopKind) type {
while (remain.len > 0) {
const write_buf = remain[0..@min(remain.len, max_to_write)];
const res = bun.sys.write(fd, write_buf);
// this does not fix the issue with writes not showing up
// const res = bun.sys.sys_uv.write(fd, write_buf);
if (res == .err) {
const retry =
@@ -2632,7 +2641,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
defer {
if ((comptime @hasField(SinkType, "done")) and this.sink.done) {
callframe.this().unprotect();
this.unprotect();
}
}
@@ -3662,27 +3671,279 @@ pub fn ReadableStreamSource(
};
}
pub const PipeSink = struct {
writer: bun.io.StreamingWriter(@This(), onWrite, onError, onReady, onClose) = .{},
done: bool = false,
event_loop_handle: JSC.EventLoopHandle,
fd: bun.FileDescriptor = bun.invalid_fd,
written: usize = 0,
pending: StreamResult.Writable.Pending = .{},
signal: Signal = Signal{},
const log = Output.scoped(.Pipe);
pub usingnamespace bun.NewRefCounted(PipeSink, deinit);
pub fn onWrite(this: *PipeSink, amount: usize, done: bool) void {
log("onWrite({d}, {any})", .{ amount, done });
this.written += amount;
if (this.pending.state == .pending)
this.pending.consumed += amount;
if (done) {
if (this.pending.state == .pending) {
this.pending.result = .{ .owned = this.pending.consumed };
this.pending.run();
}
}
}
pub fn onError(this: *PipeSink, err: bun.sys.Error) void {
log("onError({any})", .{err});
if (this.pending.state == .pending) {
this.pending.result = .{ .err = err };
this.pending.run();
}
}
pub fn onReady(this: *PipeSink) void {
log("onReady()", .{});
this.signal.ready(null, null);
}
pub fn onClose(this: *PipeSink) void {
log("onClose()", .{});
this.signal.close(null);
}
pub fn create(
event_loop: *JSC.EventLoop,
fd: bun.FileDescriptor,
) *PipeSink {
return PipeSink.new(.{
.event_loop_handle = JSC.EventLoopHandle.init(event_loop),
.fd = fd,
});
}
pub fn setup(
this: *PipeSink,
fd: bun.FileDescriptor,
) void {
this.fd = fd;
this.writer.start(fd, true).assert();
}
pub fn loop(this: *PipeSink) *Async.Loop {
return this.event_loop_handle.loop();
}
pub fn eventLoop(this: *PipeSink) JSC.EventLoopHandle {
return this.event_loop_handle;
}
pub fn connect(this: *PipeSink, signal: Signal) void {
this.signal = signal;
}
pub fn start(this: *PipeSink, stream_start: StreamStart) JSC.Node.Maybe(void) {
switch (stream_start) {
.PipeSink => {},
else => {},
}
this.done = false;
this.signal.start();
return .{ .result = {} };
}
pub fn flush(_: *PipeSink) JSC.Node.Maybe(void) {
return .{ .result = {} };
}
pub fn flushFromJS(this: *PipeSink, globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
_ = globalThis; // autofix
_ = wait; // autofix
if (this.done or this.pending.state == .pending) {
return .{ .result = JSC.JSValue.jsUndefined() };
}
return this.toResult(this.writer.flush());
}
pub fn finalize(this: *PipeSink) void {
this.pending.deinit();
this.deref();
}
pub fn init(fd: bun.FileDescriptor) *PipeSink {
return PipeSink.new(.{
.writer = .{},
.fd = fd,
});
}
pub fn construct(
this: *PipeSink,
allocator: std.mem.Allocator,
) void {
_ = allocator; // autofix
this.* = PipeSink{
.event_loop_handle = JSC.EventLoopHandle.init(JSC.VirtualMachine.get().eventLoop()),
};
}
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.next) |*next| {
return next.writeBytes(data);
}
return this.toResult(this.writer.write(data.slice()));
}
pub const writeBytes = write;
pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.next) |*next| {
return next.writeLatin1(data);
}
return this.toResult(this.writer.writeLatin1(data.slice()));
}
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.next) |*next| {
return next.writeUTF16(data);
}
return this.toResult(this.writer.writeUTF16(data.slice16()));
}
pub fn end(this: *PipeSink, err: ?Syscall.Error) JSC.Node.Maybe(void) {
if (this.next) |*next| {
return next.end(err);
}
switch (this.writer.flush()) {
.done => {
this.writer.end();
return .{ .result = {} };
},
.err => |e| {
return .{ .err = e };
},
.pending => |pending_written| {
_ = pending_written; // autofix
this.ref();
this.done = true;
this.writer.close();
return .{ .result = {} };
},
.written => |written| {
_ = written; // autofix
this.writer.end();
return .{ .result = {} };
},
}
}
pub fn deinit(this: *PipeSink) void {
this.writer.deinit();
}
pub fn toJS(this: *PipeSink, globalThis: *JSGlobalObject) JSValue {
return JSSink.createObject(globalThis, this);
}
pub fn endFromJS(this: *PipeSink, globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
if (this.done) {
if (this.pending.state == .pending) {
return .{ .result = this.pending.future.promise.promise.asValue(globalThis) };
}
return .{ .result = JSValue.jsNumber(this.written) };
}
switch (this.writer.flush()) {
.done => {
this.writer.end();
return .{ .result = JSValue.jsNumber(this.written) };
},
.err => |err| {
this.writer.close();
return .{ .err = err };
},
.pending => |pending_written| {
this.written += pending_written;
this.done = true;
this.pending.result = .{ .owned = pending_written };
return .{ .result = this.pending.promise(globalThis).asValue(globalThis) };
},
.written => |written| {
this.writer.end();
return .{ .result = JSValue.jsNumber(written) };
},
}
}
pub fn sink(this: *PipeSink) Sink {
return Sink.init(this);
}
pub fn updateRef(this: *PipeSink, value: bool) void {
if (value) {
this.writer.enableKeepingProcessAlive(this.event_loop_handle);
} else {
this.writer.disableKeepingProcessAlive(this.event_loop_handle);
}
}
pub const JSSink = NewJSSink(@This(), "PipeSink");
fn toResult(this: *PipeSink, write_result: bun.io.WriteResult) StreamResult.Writable {
switch (write_result) {
.done => |amt| {
if (amt > 0)
return .{ .owned_and_done = @truncate(amt) };
return .{ .done = {} };
},
.wrote => |amt| {
if (amt > 0)
return .{ .owned = @truncate(amt) };
return .{ .temporary = @truncate(amt) };
},
.err => |err| {
return .{ .err = err };
},
.pending => |pending_written| {
this.pending.consumed += pending_written;
this.pending.result = .{ .owned = pending_written };
return .{ .pending = &this.pending };
},
}
}
};
pub const PipeReader = struct {
reader: bun.io.BufferedOutputReader(@This(), onReadChunk) = .{},
done: bool = false,
pending: StreamResult.Pending = .{},
pending_value: JSC.Strong = .{},
pending_view: []u8 = []u8{},
fd: bun.io.FileDescriptor = bun.invalid_fd,
pub fn setup(
this: *PipeReader,
other_reader: anytype,
fd: bun.io.FileDescriptor,
) void {
this.* = PipeReader{
.reader = .{},
.done = false,
.fd = fd,
};
this.reader.fromOutputReader(other_reader, this);
}
pub fn onStart(this: *PipeReader) StreamStart {
switch (this.reader.start()) {
switch (this.reader.start(this.fd, true)) {
.result => {},
.err => |e| {
return .{ .err = e };
@@ -3752,8 +4013,12 @@ pub const PipeReader = struct {
this.pending_value.clear();
this.pending_view = &.{};
if (buffer.len >= drained.len) {
if (buffer.len >= @as(usize, drained.len)) {
@memcpy(buffer[0..drained.len], drained);
// give it back!
this.reader.buffer().* = drained;
if (this.done) {
return .{ .into_array_and_done = .{ .value = array, .len = drained.len } };
} else {
@@ -3801,7 +4066,7 @@ pub const PipeReader = struct {
pub const Source = ReadableStreamSource(
@This(),
"ReadableStreamPipe",
"PipeReader",
onStart,
onPull,
onCancel,
@@ -4355,7 +4620,7 @@ pub const File = struct {
var fd = if (file.pathlike != .path)
// We will always need to close the file descriptor.
switch (Syscall.dup(file.pathlike.fd)) {
.result => |_fd| if (Environment.isWindows) bun.toLibUVOwnedFD(_fd) else _fd,
.result => |_fd| _fd,
.err => |err| {
return .{ .err = err.withFd(file.pathlike.fd) };
},
@@ -4829,11 +5094,6 @@ pub const FileReader = struct {
} else if (this.lazy_readable == .empty)
return .{ .empty = {} };
if (this.readable().* == .File) {
const chunk_size = this.readable().File.calculateChunkSize(std.math.maxInt(usize));
return .{ .chunk_size = @as(Blob.SizeType, @truncate(chunk_size)) };
}
return .{ .chunk_size = if (this.user_chunk_size == 0) default_fifo_chunk_size else this.user_chunk_size };
}

View File

@@ -1,6 +1,6 @@
import { resolve, join } from "path";
const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink", "UVStreamSink"];
const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink", "UVStreamSink", "PipeSink"];
function names(name) {
return {

View File

@@ -195,11 +195,6 @@ pub fn PosixPipeReader(
}
pub fn close(this: *This) void {
const fd = getFd(this);
if (fd != bun.invalid_fd) {
_ = bun.sys.close();
this.handle.getPoll().deinit();
}
vtable.done(this);
}
};
@@ -377,6 +372,10 @@ pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*
}
pub fn done(this: *PosixOutputReader) void {
if (this.handle != .closed) {
this.handle.close(this, done);
return;
}
this.finish();
this.parent.onOutputDone();
}
@@ -393,6 +392,7 @@ pub fn PosixBufferedOutputReader(comptime Parent: type, comptime onReadChunk: ?*
pub fn registerPoll(this: *PosixOutputReader) void {
const poll = this.handle.getPoll() orelse return;
poll.owner.set(this);
switch (poll.register(this.parent.loop(), .readable, true)) {
.err => |err| {
this.onError(err);

View File

@@ -236,6 +236,14 @@ pub fn PosixBufferedWriter(
this.handle.close(this.parent, onClose);
}
pub fn updateRef(this: *PosixWriter, value: bool, event_loop: JSC.EventLoopHandle) void {
if (value) {
this.enableKeepingProcessAlive(event_loop);
} else {
this.disableKeepingProcessAlive(event_loop);
}
}
pub fn start(this: *PosixWriter, fd: bun.FileDescriptor, bytes: []const u8, pollable: bool) JSC.Maybe(void) {
this.buffer = bytes;
if (!pollable) {
@@ -244,9 +252,9 @@ pub fn PosixBufferedWriter(
return JSC.Maybe(void){ .result = {} };
}
const loop = @as(*Parent, @ptrCast(this.parent)).loop();
var poll = this.poll orelse brk: {
var poll = this.getPoll() orelse brk: {
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) };
break :brk this.poll.?;
break :brk this.handle.poll;
};
switch (poll.registerWithFd(loop, .writable, true, fd)) {
@@ -479,7 +487,7 @@ pub fn PosixStreamingWriter(
}
pub fn hasRef(this: *PosixWriter) bool {
const poll = this.poll orelse return false;
const poll = this.getPoll() orelse return false;
return !this.is_done and poll.canEnableKeepingProcessAlive();
}
@@ -495,6 +503,14 @@ pub fn PosixStreamingWriter(
poll.disableKeepingProcessAlive(event_loop);
}
pub fn updateRef(this: *PosixWriter, event_loop: JSC.EventLoopHandle, value: bool) void {
if (value) {
this.enableKeepingProcessAlive(event_loop);
} else {
this.disableKeepingProcessAlive(event_loop);
}
}
pub fn end(this: *PosixWriter) void {
if (this.is_done) {
return;
@@ -516,9 +532,9 @@ pub fn PosixStreamingWriter(
}
const loop = @as(*Parent, @ptrCast(this.parent)).loop();
var poll = this.poll orelse brk: {
var poll = this.getPoll() orelse brk: {
this.handle = .{ .poll = Async.FilePoll.init(loop, fd, .writable, PosixWriter, this) };
break :brk this.poll.?;
break :brk this.handle.poll;
};
switch (poll.registerWithFd(loop, .writable, true, fd)) {

View File

@@ -929,4 +929,5 @@ pub const retry = bun.C.E.AGAIN;
pub const PipeReader = @import("./PipeReader.zig").PipeReader;
pub const BufferedOutputReader = @import("./PipeReader.zig").BufferedOutputReader;
pub const BufferedWriter = @import("./PipeWriter.zig").BufferedWriter;
pub const WriteResult = @import("./PipeWriter.zig").WriteResult;
pub const StreamingWriter = @import("./PipeWriter.zig").StreamingWriter;

View File

@@ -33,8 +33,11 @@ pub const PollOrFd = union(enum) {
if (fd != bun.invalid_fd) {
this.handle = .{ .closed = {} };
_ = bun.sys.close(fd);
if (comptime onCloseFn != void)
onCloseFn(@ptrCast(ctx.?));
} else {
this.handle = .{ .closed = {} };
}
}
};