implement "source" in PipeReader

This commit is contained in:
Georgijs Vilums
2024-02-19 21:25:38 -08:00
parent 7098a86acc
commit 091cdcf1fd
4 changed files with 142 additions and 71 deletions

View File

@@ -822,7 +822,7 @@ pub const Subprocess = struct {
.stdio_result = result,
});
if (Environment.isWindows) {
this.reader.pipe = this.stdio_result.buffer;
this.reader.source = .{ .pipe = this.stdio_result.buffer };
}
this.reader.setParent(this);
return this;
@@ -956,7 +956,7 @@ pub const Subprocess = struct {
}
if (comptime Environment.isWindows) {
std.debug.assert(this.reader.pipe == null or this.reader.pipe.?.isClosed());
std.debug.assert(this.reader.source == null or this.reader.source.?.isClosed());
}
if (this.state == .done) {

View File

@@ -131,8 +131,8 @@ pub const LifecycleScriptSubprocess = struct {
null,
};
if (Environment.isWindows) {
this.stdout.pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory();
this.stderr.pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory();
this.stdout.source = .{ .pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() };
this.stderr.source = .{ .pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory() };
}
const spawn_options = bun.spawn.SpawnOptions{
.stdin = .ignore,
@@ -142,7 +142,7 @@ pub const LifecycleScriptSubprocess = struct {
.buffer
else
.{
.buffer = this.stdout.pipe.?,
.buffer = this.stdout.source.?.pipe,
},
.stderr = if (this.manager.options.log_level.isVerbose())
.inherit
@@ -150,7 +150,7 @@ pub const LifecycleScriptSubprocess = struct {
.buffer
else
.{
.buffer = this.stderr.pipe.?,
.buffer = this.stderr.source.?.pipe,
},
.cwd = cwd,

View File

@@ -318,7 +318,91 @@ pub fn WindowsPipeReader(
comptime onError: fn (*This, bun.sys.Error) void,
) type {
return struct {
pub usingnamespace uv.StreamReaderMixin(This, .pipe);
// pub usingnamespace uv.StreamReaderMixin(This, .pipe);
fn uv_alloc_cb(handle: *uv.Handle, suggested_size: usize, buf: *uv.uv_buf_t) callconv(.C) void {
var this = bun.cast(*This, handle.data);
const result = this.getReadBufferWithStableMemoryAddress(suggested_size);
buf.* = uv.uv_buf_t.init(result);
}
fn uv_stream_read_cb(stream: *uv.uv_stream_t, nread: uv.ReturnCodeI64, buf: *const uv.uv_buf_t) callconv(.C) void {
var this = bun.cast(*This, stream.data);
const nread_int = nread.int();
switch (nread_int) {
0 => {
// EAGAIN or EWOULDBLOCK or canceled
return this.onRead(.{ .result = 0 }, buf, .drained);
},
uv.UV_EOF => {
// EOF
return this.onRead(.{ .result = 0 }, buf, .eof);
},
else => {
this.onRead(if (nread.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress);
},
}
}
fn uv_file_read_cb(fs: *uv.fs_t) callconv(.C) void {
var this: *This = bun.cast(*This, fs.data);
const nread_int = fs.result.int();
const buf = &this.*.source.?.file.iov;
switch (nread_int) {
0, uv.UV_ECANCELED =>
// EAGAIN or EWOULDBLOCK or canceled
this.onRead(.{ .result = 0 }, buf, .drained),
uv.UV_EOF =>
// EOF
this.onRead(.{ .result = 0 }, buf, .eof),
else => this.onRead(if (fs.result.toError(.recv)) |err| .{ .err = err } else .{ .result = @intCast(nread_int) }, buf, .progress),
}
uv.uv_fs_req_cleanup(fs);
}
pub fn startReading(this: *This) bun.JSC.Maybe(void) {
const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.BADF, .read) };
switch (source) {
.file => |file| {
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, uv_file_read_cb).toError(.write)) |err| {
return .{ .err = err };
}
},
else => {
if (uv.uv_read_start(source.toStream(), &uv_alloc_cb, @ptrCast(&uv_stream_read_cb)).toError(.open)) |err| {
return .{ .err = err };
}
},
}
return .{ .result = {} };
}
pub fn stopReading(this: *This) bun.JSC.Maybe(void) {
const source = this.source orelse return .{ .result = {} };
switch (source) {
.file => |file| {
_ = uv.uv_cancel(@ptrCast(&file.fs));
},
else => {
// can be safely ignored as per libuv documentation
_ = uv.uv_read_stop(source.toStream());
},
}
return .{ .result = {} };
}
pub fn close(this: *This) void {
_ = this.stopReading();
if (this.source) |source| {
source.getHandle().close(onCloseSource);
}
}
const vtable = .{
.getBuffer = getBuffer,
@@ -327,37 +411,12 @@ pub fn WindowsPipeReader(
.onError = onError,
};
fn _pipe(this: *This) ?*uv.Pipe {
switch (@TypeOf(this.pipe)) {
?*uv.Pipe, *uv.Pipe => return this.pipe,
uv.Pipe => return &this.pipe,
else => @compileError("StreamReaderMixin only works with Pipe, *Pipe or ?*Pipe"),
}
}
pub fn open(this: *This, loop: *uv.Loop, fd: bun.FileDescriptor, ipc: bool) bun.JSC.Maybe(void) {
const pipe = _pipe(this) orelse return .{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe) };
switch (pipe.init(loop, ipc)) {
.err => |err| {
return .{ .err = err };
},
fn onCloseSource(handle: *uv.Handle) callconv(.C) void {
const this = bun.cast(*This, handle.data);
switch (this.source.?) {
.file => |file| uv.uv_fs_req_cleanup(&file.fs),
else => {},
}
pipe.data = this;
switch (pipe.open(bun.uvfdcast(fd))) {
.err => |err| {
return .{ .err = err };
},
else => {},
}
return .{ .result = {} };
}
fn onClosePipe(pipe: *uv.Pipe) callconv(.C) void {
const this = bun.cast(*This, pipe.data);
done(this);
}
@@ -397,22 +456,13 @@ pub fn WindowsPipeReader(
}
pub fn unpause(this: *This) void {
const pipe = this._pipe() orelse return;
if (!pipe.isActive()) {
this.startReading().unwrap() catch {};
}
_ = this.startReading();
}
pub fn read(this: *This) void {
// we cannot sync read pipes on Windows so we just check if we are paused to resume the reading
this.unpause();
}
pub fn close(this: *This) void {
const pipe = this._pipe() orelse return;
this.stopReading().unwrap() catch unreachable;
pipe.close(&onClosePipe);
}
};
}
@@ -733,7 +783,7 @@ const WindowsOutputReaderVTable = struct {
pub const WindowsBufferedReader = struct {
/// The pointer to this pipe must be stable.
/// It cannot change because we don't know what libuv will do with it.
pipe: ?*uv.Pipe = null,
source: ?Source = null,
_buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
// for compatibility with Linux
flags: Flags = .{},
@@ -769,22 +819,22 @@ pub const WindowsBufferedReader = struct {
}
pub fn from(to: *WindowsOutputReader, other: anytype, parent: anytype) void {
std.debug.assert(other.pipe != null and to.pipe == null);
std.debug.assert(other.source != null and to.source == null);
to.* = .{
.vtable = to.vtable,
.flags = other.flags,
._buffer = other.buffer().*,
.has_inflight_read = other.has_inflight_read,
.pipe = other.pipe,
.source = other.source,
};
other.flags.is_done = true;
other.pipe = null;
other.source = null;
to.setParent(parent);
}
pub fn getFd(this: *const WindowsOutputReader) bun.FileDescriptor {
const pipe = this.pipe orelse return bun.invalid_fd;
return pipe.fd();
const source = this.source orelse return bun.invalid_fd;
return source.getFd();
}
pub fn watch(_: *WindowsOutputReader) void {
@@ -794,18 +844,18 @@ pub const WindowsBufferedReader = struct {
pub fn setParent(this: *WindowsOutputReader, parent: anytype) void {
this.parent = parent;
if (!this.flags.is_done) {
if (this.pipe) |pipe| {
pipe.data = this;
if (this.source) |source| {
source.setData(this);
}
}
}
pub fn updateRef(this: *WindowsOutputReader, value: bool) void {
if (this.pipe) |pipe| {
if (this.source) |source| {
if (value) {
pipe.ref();
source.ref();
} else {
pipe.unref();
source.unref();
}
}
}
@@ -833,8 +883,8 @@ pub const WindowsBufferedReader = struct {
}
pub fn hasPendingActivity(this: *const WindowsOutputReader) bool {
const pipe = this.pipe orelse return false;
return pipe.isClosed();
const source = this.source orelse return false;
return source.isClosed();
}
pub fn hasPendingRead(this: *const WindowsOutputReader) bool {
@@ -858,7 +908,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn done(this: *WindowsOutputReader) void {
std.debug.assert(this.pipe == null or this.pipe.?.isClosed());
std.debug.assert(if (this.source) |source| source.isClosed() else true);
this.finish();
@@ -877,7 +927,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn startWithCurrentPipe(this: *WindowsOutputReader) bun.JSC.Maybe(void) {
std.debug.assert(this.pipe != null);
std.debug.assert(this.source != null);
this.buffer().clearRetainingCapacity();
this.flags.is_done = false;
@@ -886,25 +936,30 @@ pub const WindowsBufferedReader = struct {
}
pub fn startWithPipe(this: *WindowsOutputReader, pipe: *uv.Pipe) bun.JSC.Maybe(void) {
std.debug.assert(this.pipe == null);
this.pipe = pipe;
std.debug.assert(this.source == null);
this.source = .{ .pipe = pipe };
return this.startWithCurrentPipe();
}
pub fn start(this: *WindowsOutputReader, fd: bun.FileDescriptor, _: bool) bun.JSC.Maybe(void) {
//TODO: check detect if its a tty here and use uv_tty_t instead of pipe
std.debug.assert(this.pipe == null);
this.pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory();
if (this.open(uv.Loop.get(), fd, false).asErr()) |err| return .{ .err = err };
std.debug.assert(this.source == null);
const source = switch (Source.open(uv.Loop.get(), fd)) {
.err => |err| return .{ .err = err },
.result => |source| source,
};
source.setData(this);
this.source = source;
return this.startWithCurrentPipe();
}
pub fn deinit(this: *WindowsOutputReader) void {
this.buffer().deinit();
var pipe = this.pipe orelse return;
std.debug.assert(pipe.isClosed());
this.pipe = null;
bun.default_allocator.destroy(pipe);
const source = this.source orelse return;
std.debug.assert(source.isClosed());
this.source = null;
switch (source) {
inline else => |ptr| bun.default_allocator.destroy(ptr),
}
}
};

View File

@@ -80,6 +80,22 @@ pub const Source = union(enum) {
}
}
pub fn isClosed(this: Source) bool {
switch (this) {
.pipe => |pipe| return pipe.isClosed(),
.tty => |tty| return tty.isClosed(),
.file => |file| return file.file == -1,
}
}
pub fn isActive(this: Source) bool {
switch (this) {
.pipe => |pipe| return pipe.isActive(),
.tty => |tty| return tty.isActive(),
.file => return false,
}
}
pub fn openPipe(loop: *uv.Loop, fd: bun.FileDescriptor, ipc: bool) bun.JSC.Maybe(*Source.Pipe) {
log("openPipe (fd = {})", .{fd});
const pipe = bun.default_allocator.create(Source.Pipe) catch bun.outOfMemory();