Merge branch 'jarred/process-change' of https://github.com/oven-sh/bun into jarred/process-change

This commit is contained in:
Georgijs Vilums
2024-02-19 21:25:41 -08:00
14 changed files with 170 additions and 33 deletions

View File

@@ -212,6 +212,20 @@ pub const FilePoll = struct {
try writer.print("FilePoll({}) = {}", .{ poll.fd, Flags.Formatter{ .data = poll.flags } });
}
pub fn fileType(poll: *const FilePoll) bun.io.FileType {
const flags = poll.flags;
if (flags.contains(.socket)) {
return .socket;
}
if (flags.contains(.nonblocking)) {
return .nonblocking_pipe;
}
return .pipe;
}
pub fn onKQueueEvent(poll: *FilePoll, _: *Loop, kqueue_event: *const std.os.system.kevent64_s) void {
if (KQueueGenerationNumber != u0)
std.debug.assert(poll.generation_number == kqueue_event.ext[0]);
@@ -436,6 +450,8 @@ pub const FilePoll = struct {
/// Was O_NONBLOCK set on the file descriptor?
nonblock,
socket,
pub fn poll(this: Flags) Flags {
return switch (this) {
.readable => .poll_readable,

View File

@@ -1084,7 +1084,7 @@ pub fn spawnProcessPosix(
defer {
for (to_set_cloexec.items) |fd| {
const fcntl_flags = bun.sys.fcntl(fd, std.os.F.GETFD, 0).unwrap() catch continue;
_ = bun.sys.fcntl(fd, std.os.F.SETFD, std.os.FD_CLOEXEC | fcntl_flags);
_ = bun.sys.fcntl(fd, std.os.F.SETFD, bun.C.FD_CLOEXEC | fcntl_flags);
}
to_set_cloexec.clearAndFree();
@@ -1125,19 +1125,26 @@ pub fn spawnProcessPosix(
try actions.open(fileno, path, flag | std.os.O.CREAT, 0o664);
},
.buffer => {
const pipe = try bun.sys.pipe().unwrap();
const idx: usize = comptime if (i == 0) 0 else 1;
const theirs = pipe[idx];
const ours = pipe[1 - idx];
const fds: [2]bun.FileDescriptor = brk: {
var fds_: [2]std.c.fd_t = undefined;
const rc = std.c.socketpair(std.os.AF.UNIX, std.os.SOCK.STREAM, 0, &fds_);
if (rc != 0) {
return error.SystemResources;
}
try actions.dup2(theirs, fileno);
try actions.close(ours);
const before = std.c.fcntl(fds_[if (i == 0) 1 else 0], std.os.F.GETFL);
_ = std.c.fcntl(fds_[if (i == 0) 1 else 0], std.os.F.SETFL, before | bun.C.FD_CLOEXEC);
try to_close_at_end.append(theirs);
try to_close_on_error.append(ours);
try to_set_cloexec.append(ours);
break :brk .{ bun.toFD(fds_[if (i == 0) 1 else 0]), bun.toFD(fds_[if (i == 0) 0 else 1]) };
};
stdio.* = ours;
try to_close_at_end.append(fds[1]);
try to_close_on_error.append(fds[0]);
try actions.dup2(fds[1], fileno);
try actions.close(fds[1]);
stdio.* = fds[0];
},
.pipe => |fd| {
try actions.dup2(fd, fileno);
@@ -1170,9 +1177,8 @@ pub fn spawnProcessPosix(
// enable non-block
const before = std.c.fcntl(fds_[0], std.os.F.GETFL);
// disable sigpipe
_ = std.c.fcntl(fds_[0], std.os.F.SETFL, before | std.os.O.NONBLOCK | std.os.FD_CLOEXEC);
_ = std.c.fcntl(fds_[0], std.os.F.SETFL, before | std.os.O.NONBLOCK | bun.C.FD_CLOEXEC);
break :brk .{ bun.toFD(fds_[0]), bun.toFD(fds_[1]) };
};

View File

@@ -743,7 +743,19 @@ pub const Subprocess = struct {
if (Environment.isWindows) {
return this.writer.startWithCurrentPipe();
}
return this.writer.start(this.stdio_result.?, true);
switch (this.writer.start(this.stdio_result.?, true)) {
.err => |err| {
return .{ .err = err };
},
.result => {
if (comptime Environment.isPosix) {
const poll = this.writer.handle.poll;
poll.flags.insert(.socket);
}
return .{ .result = {} };
},
}
}
pub fn onWrite(this: *This, amount: usize, is_done: bool) void {
@@ -841,7 +853,20 @@ pub const Subprocess = struct {
return this.reader.startWithCurrentPipe();
}
return this.reader.start(this.stdio_result.?, true);
switch (this.reader.start(this.stdio_result.?, true)) {
.err => |err| {
return .{ .err = err };
},
.result => {
if (comptime Environment.isPosix) {
const poll = this.reader.handle.poll;
poll.flags.insert(.nonblocking);
poll.flags.insert(.socket);
}
return .{ .result = {} };
},
}
}
pub const toJS = toReadableStream;
@@ -1101,6 +1126,8 @@ pub const Subprocess = struct {
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.flags.has_stdin_destructor_called = false;
pipe.writer.handle.poll.flags.insert(.socket);
return Writable{
.pipe = pipe,
};

View File

@@ -2895,6 +2895,7 @@ pub const FileSink = struct {
// we should not duplicate these fields...
pollable: bool = false,
nonblocking: bool = false,
is_socket: bool = false,
fd: bun.FileDescriptor = bun.invalid_fd,
has_js_called_unref: bool = false,
@@ -2926,7 +2927,7 @@ pub const FileSink = struct {
// Only keep the event loop ref'd while there's a pending write in progress.
// If there's no pending write, no need to keep the event loop ref'd.
this.writer.updateRef(this.eventLoop(), !done);
this.writer.updateRef(this.eventLoop(), false);
this.written += amount;
@@ -3018,6 +3019,7 @@ pub const FileSink = struct {
.result => |stat| {
this.pollable = bun.sys.isPollable(stat.mode) or std.os.isatty(fd.int());
this.fd = fd;
this.is_socket = std.os.S.ISSOCK(stat.mode);
this.nonblocking = this.pollable and switch (options.input_path) {
.path => true,
.fd => |fd_| bun.FDTag.get(fd_) == .none,
@@ -3047,6 +3049,12 @@ pub const FileSink = struct {
if (this.nonblocking) {
this.writer.getPoll().?.flags.insert(.nonblocking);
}
if (this.is_socket) {
this.writer.getPoll().?.flags.insert(.socket);
} else if (this.pollable) {
this.writer.getPoll().?.flags.insert(.fifo);
}
}
},
}
@@ -3302,6 +3310,7 @@ pub const FileReader = struct {
fd: bun.FileDescriptor,
pollable: bool = false,
nonblocking: bool = true,
file_type: bun.io.FileType = .file,
};
pub fn openFileBlob(
@@ -3358,7 +3367,12 @@ pub const FileReader = struct {
}
this.pollable = bun.sys.isPollable(stat.mode) or (file.is_atty orelse false);
this.file_type = if (bun.S.ISFIFO(stat.mode)) .pipe else if (bun.S.ISSOCK(stat.mode)) .socket else .file;
this.nonblocking = this.pollable and !(file.is_atty orelse false);
if (this.nonblocking and this.file_type == .pipe) {
this.file_type = .nonblocking_pipe;
}
}
this.fd = fd;
@@ -3392,6 +3406,7 @@ pub const FileReader = struct {
this.reader.setParent(this);
const was_lazy = this.lazy != .none;
var pollable = false;
var file_type: bun.io.FileType = .file;
if (this.lazy == .blob) {
switch (this.lazy.blob.data) {
.bytes => @panic("Invalid state in FileReader: expected file "),
@@ -3408,6 +3423,7 @@ pub const FileReader = struct {
.result => |opened| {
this.fd = opened.fd;
pollable = opened.pollable;
file_type = opened.file_type;
this.reader.flags.nonblocking = opened.nonblocking;
},
}
@@ -3435,8 +3451,18 @@ pub const FileReader = struct {
}
if (comptime Environment.isPosix) {
if (this.reader.flags.nonblocking) {
if (this.reader.handle.getPoll()) |poll| poll.flags.insert(.nonblocking);
if (this.reader.handle.getPoll()) |poll| {
if (file_type == .pipe or file_type == .nonblocking_pipe) {
poll.flags.insert(.fifo);
}
if (file_type == .socket) {
poll.flags.insert(.socket);
}
if (this.reader.flags.nonblocking) {
poll.flags.insert(.nonblocking);
}
}
}

View File

@@ -767,6 +767,7 @@ pub const sockaddr_dl = extern struct {
pub usingnamespace @cImport({
@cInclude("sys/spawn.h");
@cInclude("sys/fcntl.h");
@cInclude("sys/socket.h");
});
pub const F = struct {

View File

@@ -34,6 +34,10 @@ pub fn PosixPipeReader(
readFile(this, buffer, fd, 0, false);
return;
},
.socket => {
readSocket(this, buffer, fd, 0, false);
return;
},
.pipe => {
switch (bun.isReadable(fd)) {
.ready => {
@@ -64,6 +68,9 @@ pub fn PosixPipeReader(
.file => {
readFile(parent, resizable_buffer, fd, size_hint, received_hup);
},
.socket => {
readSocket(parent, resizable_buffer, fd, size_hint, received_hup);
},
.pipe => {
readFromBlockingPipeWithoutBlocking(parent, resizable_buffer, fd, size_hint, received_hup);
},
@@ -86,6 +93,10 @@ pub fn PosixPipeReader(
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, bun.sys.read);
}
fn readSocket(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, bun.sys.recvNonBlock);
}
fn readPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, bun.sys.readNonblocking);
}

View File

@@ -6,6 +6,7 @@ const uv = bun.windows.libuv;
const Source = @import("./source.zig").Source;
const log = bun.Output.scoped(.PipeWriter, false);
const FileType = @import("./pipes.zig").FileType;
pub const WriteResult = union(enum) {
done: usize,
@@ -24,15 +25,22 @@ pub fn PosixPipeWriter(
comptime registerPoll: ?fn (*This) void,
comptime onError: fn (*This, bun.sys.Error) void,
comptime onWritable: fn (*This) void,
comptime getFileType: *const fn (*This) FileType,
) type {
_ = onWritable; // autofix
return struct {
pub fn _tryWrite(this: *This, buf_: []const u8) WriteResult {
return switch (getFileType(this)) {
inline else => |ft| return _tryWriteWithWriteFn(this, buf_, comptime writeToFileType(ft)),
};
}
fn _tryWriteWithWriteFn(this: *This, buf_: []const u8, comptime write_fn: *const fn (bun.FileDescriptor, []const u8) JSC.Maybe(usize)) WriteResult {
const fd = getFd(this);
var buf = buf_;
while (buf.len > 0) {
switch (writeNonBlocking(fd, buf)) {
switch (write_fn(fd, buf)) {
.err => |err| {
if (err.isRetry()) {
return .{ .pending = buf_.len - buf.len };
@@ -54,7 +62,15 @@ pub fn PosixPipeWriter(
return .{ .wrote = buf_.len - buf.len };
}
fn writeNonBlocking(fd: bun.FileDescriptor, buf: []const u8) JSC.Maybe(usize) {
fn writeToFileType(comptime file_type: FileType) *const (fn (bun.FileDescriptor, []const u8) JSC.Maybe(usize)) {
comptime return switch (file_type) {
.nonblocking_pipe, .file => &bun.sys.write,
.pipe => &writeToBlockingPipe,
.socket => &bun.sys.sendNonBlock,
};
}
fn writeToBlockingPipe(fd: bun.FileDescriptor, buf: []const u8) JSC.Maybe(usize) {
if (comptime bun.Environment.isLinux) {
if (bun.C.linux.RWFFlagSupport.isMaybeSupported()) {
return bun.sys.writeNonblocking(fd, buf);
@@ -171,6 +187,12 @@ pub fn PosixBufferedWriter(
return this.handle.getPoll();
}
pub fn getFileType(this: *const @This()) FileType {
const poll = getPoll(this) orelse return FileType.file;
return poll.fileType();
}
pub fn getFd(this: *const PosixWriter) bun.FileDescriptor {
return this.handle.getFd();
}
@@ -247,7 +269,7 @@ pub fn PosixBufferedWriter(
return getBuffer(this.parent);
}
pub usingnamespace PosixPipeWriter(@This(), getFd, getBufferInternal, _onWrite, registerPoll, _onError, _onWritable);
pub usingnamespace PosixPipeWriter(@This(), getFd, getBufferInternal, _onWrite, registerPoll, _onError, _onWritable, getFileType);
pub fn end(this: *PosixWriter) void {
if (this.is_done) {
@@ -355,6 +377,12 @@ pub fn PosixStreamingWriter(
return this.handle.getFd();
}
pub fn getFileType(this: *PosixWriter) FileType {
const poll = this.getPoll() orelse return FileType.file;
return poll.fileType();
}
const PosixWriter = @This();
pub fn getBuffer(this: *PosixWriter) []const u8 {
@@ -558,7 +586,7 @@ pub fn PosixStreamingWriter(
return rc;
}
pub usingnamespace PosixPipeWriter(@This(), getFd, getBuffer, _onWrite, registerPoll, _onError, _onWritable);
pub usingnamespace PosixPipeWriter(@This(), getFd, getBuffer, _onWrite, registerPoll, _onError, _onWritable, getFileType);
pub fn flush(this: *PosixWriter) WriteResult {
if (this.closed_without_reporting or this.is_done) {

View File

@@ -933,3 +933,4 @@ pub const BufferedWriter = @import("./PipeWriter.zig").BufferedWriter;
pub const WriteResult = @import("./PipeWriter.zig").WriteResult;
pub const StreamingWriter = @import("./PipeWriter.zig").StreamingWriter;
pub const StreamBuffer = @import("./PipeWriter.zig").StreamBuffer;
pub const FileType = @import("./pipes.zig").FileType;

View File

@@ -77,9 +77,10 @@ pub const FileType = enum {
file,
pipe,
nonblocking_pipe,
socket,
pub fn isPollable(this: FileType) bool {
return this == .pipe or this == .nonblocking_pipe;
return this == .pipe or this == .nonblocking_pipe or this == .socket;
}
pub fn isBlocking(this: FileType) bool {

View File

@@ -567,13 +567,18 @@ const net_c = @cImport({
@cInclude("ifaddrs.h"); // getifaddrs, freeifaddrs
@cInclude("net/if.h"); // IFF_RUNNING, IFF_UP
@cInclude("fcntl.h"); // F_DUPFD_CLOEXEC
@cInclude("sys/socket.h");
});
pub const ifaddrs = net_c.ifaddrs;
pub const getifaddrs = net_c.getifaddrs;
pub const FD_CLOEXEC = net_c.FD_CLOEXEC;
pub const freeifaddrs = net_c.freeifaddrs;
pub const getifaddrs = net_c.getifaddrs;
pub const ifaddrs = net_c.ifaddrs;
pub const IFF_LOOPBACK = net_c.IFF_LOOPBACK;
pub const IFF_RUNNING = net_c.IFF_RUNNING;
pub const IFF_UP = net_c.IFF_UP;
pub const IFF_LOOPBACK = net_c.IFF_LOOPBACK;
pub const MSG_DONTWAIT = net_c.MSG_DONTWAIT;
pub const MSG_NOSIGNAL = net_c.MSG_NOSIGNAL;
pub const F = struct {
pub const DUPFD_CLOEXEC = net_c.F_DUPFD_CLOEXEC;

View File

@@ -1326,6 +1326,12 @@ pub fn read(fd: bun.FileDescriptor, buf: []u8) Maybe(usize) {
};
}
const socket_flags_nonblock = bun.C.MSG_DONTWAIT | bun.C.MSG_NOSIGNAL;
pub fn recvNonBlock(fd: bun.FileDescriptor, buf: []u8) Maybe(usize) {
return recv(fd, buf, socket_flags_nonblock);
}
pub fn recv(fd: bun.FileDescriptor, buf: []u8, flag: u32) Maybe(usize) {
const adjusted_len = @min(buf.len, max_count);
if (comptime Environment.allow_assert) {
@@ -1336,7 +1342,7 @@ pub fn recv(fd: bun.FileDescriptor, buf: []u8, flag: u32) Maybe(usize) {
if (comptime Environment.isMac) {
const rc = system.@"recvfrom$NOCANCEL"(fd.cast(), buf.ptr, adjusted_len, flag, null, null);
log("recv({d}, {d}, {d}) = {d}", .{ fd, adjusted_len, flag, rc });
log("recv({}, {d}) = {d}", .{ fd, adjusted_len, rc });
if (Maybe(usize).errnoSys(rc, .recv)) |err| {
return err;
@@ -1346,7 +1352,7 @@ pub fn recv(fd: bun.FileDescriptor, buf: []u8, flag: u32) Maybe(usize) {
} else {
while (true) {
const rc = linux.recvfrom(fd.cast(), buf.ptr, adjusted_len, flag | os.SOCK.CLOEXEC | linux.MSG.CMSG_CLOEXEC, null, null);
log("recv({d}, {d}, {d}) = {d}", .{ fd, adjusted_len, flag, rc });
log("recv({}, {d}) = {d}", .{ fd, adjusted_len, rc });
if (Maybe(usize).errnoSysFd(rc, .recv, fd)) |err| {
if (err.getErrno() == .INTR) continue;
@@ -1357,16 +1363,25 @@ pub fn recv(fd: bun.FileDescriptor, buf: []u8, flag: u32) Maybe(usize) {
}
}
pub fn sendNonBlock(fd: bun.FileDescriptor, buf: []const u8) Maybe(usize) {
return send(fd, buf, socket_flags_nonblock);
}
pub fn send(fd: bun.FileDescriptor, buf: []const u8, flag: u32) Maybe(usize) {
if (comptime Environment.isMac) {
const rc = system.@"sendto$NOCANCEL"(fd, buf.ptr, buf.len, flag, null, 0);
const rc = system.@"sendto$NOCANCEL"(fd.cast(), buf.ptr, buf.len, flag, null, 0);
syslog("send({}, {d}) = {d}", .{ fd, buf.len, rc });
if (Maybe(usize).errnoSys(rc, .send)) |err| {
return err;
}
return Maybe(usize){ .result = @as(usize, @intCast(rc)) };
} else {
while (true) {
const rc = linux.sendto(fd, buf.ptr, buf.len, flag | os.SOCK.CLOEXEC | os.MSG.NOSIGNAL, null, 0);
const rc = linux.sendto(fd.cast(), buf.ptr, buf.len, flag, null, 0);
syslog("send({}, {d}) = {d}", .{ fd, buf.len, rc });
if (Maybe(usize).errnoSys(rc, .send)) |err| {
if (err.getErrno() == .INTR) continue;

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env bash
myvar=$(cat /dev/stdin)
# On Linux/Cygwin, $(</dev/stdin) doesn't work when stdin is a socket.
myvar=$(cat)
echo -e "$myvar"

View File

@@ -94,7 +94,6 @@ describe("process.stdin", () => {
child.stdout
.on("readable", () => {
let chunk;
console.log("called");
while ((chunk = child.stdout.read()) !== null) {
data += chunk;
}

View File

@@ -511,7 +511,7 @@ describe("websocket in subprocess", () => {
if (isWindows) {
expect(await subprocess.exited).toBe(1);
} else {
expect(await subprocess.exited).toBe(129);
expect(await subprocess.exited).toBe(143);
}
});