// Mirror of https://github.com/oven-sh/bun (synced 2026-02-13 12:29:07 +00:00).
// Source file: Zig, 1741 lines, 54 KiB.
const std = @import("std");
|
|
const assert = std.debug.assert;
|
|
const Platform = @import("root").bun.analytics.GenerateHeader.GeneratePlatform;
|
|
/// `std.os`, extended with the Linux errno numbers as plain integer constants.
///
/// Completion results arrive as a negated errno in an `i32` (`cqe.res`), so having the
/// numbers as comptime integers lets `complete()` switch on `-result` directly.
const os = struct {
    pub usingnamespace std.os;

    // --- 1-34: the classic POSIX set -------------------------------------------------
    pub const EPERM = 1;
    pub const ENOENT = 2;
    pub const ESRCH = 3;
    pub const EINTR = 4;
    pub const EIO = 5;
    pub const ENXIO = 6;
    pub const E2BIG = 7;
    pub const ENOEXEC = 8;
    pub const EBADF = 9;
    pub const ECHILD = 10;
    pub const EAGAIN = 11;
    pub const ENOMEM = 12;
    pub const EACCES = 13;
    pub const EFAULT = 14;
    pub const ENOTBLK = 15;
    pub const EBUSY = 16;
    pub const EEXIST = 17;
    pub const EXDEV = 18;
    pub const ENODEV = 19;
    pub const ENOTDIR = 20;
    pub const EISDIR = 21;
    pub const EINVAL = 22;
    pub const ENFILE = 23;
    pub const EMFILE = 24;
    pub const ENOTTY = 25;
    pub const ETXTBSY = 26;
    pub const EFBIG = 27;
    pub const ENOSPC = 28;
    pub const ESPIPE = 29;
    pub const EROFS = 30;
    pub const EMLINK = 31;
    pub const EPIPE = 32;
    pub const EDOM = 33;
    pub const ERANGE = 34;

    // --- 35-87: extended Linux set ----------------------------------------------------
    pub const EDEADLK = 35;
    pub const ENAMETOOLONG = 36;
    pub const ENOLCK = 37;
    pub const ENOSYS = 38;
    pub const ENOTEMPTY = 39;
    pub const ELOOP = 40;
    // NOTE(review): Linux's own headers define EWOULDBLOCK as an alias of EAGAIN (11) and
    // EDEADLOCK as an alias of EDEADLK (35). The values 41 and 58 used here follow this
    // table's sequential numbering; confirm before switching on them.
    pub const EWOULDBLOCK = 41;
    pub const ENOMSG = 42;
    pub const EIDRM = 43;
    pub const ECHRNG = 44;
    pub const EL2NSYNC = 45;
    pub const EL3HLT = 46;
    pub const EL3RST = 47;
    pub const ELNRNG = 48;
    pub const EUNATCH = 49;
    pub const ENOCSI = 50;
    pub const EL2HLT = 51;
    pub const EBADE = 52;
    pub const EBADR = 53;
    pub const EXFULL = 54;
    pub const ENOANO = 55;
    pub const EBADRQC = 56;
    pub const EBADSLT = 57;
    pub const EDEADLOCK = 58;
    pub const EBFONT = 59;
    pub const ENOSTR = 60;
    pub const ENODATA = 61;
    pub const ETIME = 62;
    pub const ENOSR = 63;
    pub const ENONET = 64;
    pub const ENOPKG = 65;
    pub const EREMOTE = 66;
    pub const ENOLINK = 67;
    pub const EADV = 68;
    pub const ESRMNT = 69;
    pub const ECOMM = 70;
    pub const EPROTO = 71;
    pub const EMULTIHOP = 72;
    pub const EDOTDOT = 73;
    pub const EBADMSG = 74;
    pub const EOVERFLOW = 75;
    pub const ENOTUNIQ = 76;
    pub const EBADFD = 77;
    pub const EREMCHG = 78;
    pub const ELIBACC = 79;
    pub const ELIBBAD = 80;
    pub const ELIBSCN = 81;
    pub const ELIBMAX = 82;
    pub const ELIBEXEC = 83;
    pub const EILSEQ = 84;
    pub const ERESTART = 85;
    pub const ESTRPIPE = 86;
    pub const EUSERS = 87;

    // --- 88-122: sockets and networking -----------------------------------------------
    pub const ENOTSOCK = 88;
    pub const EDESTADDRREQ = 89;
    pub const EMSGSIZE = 90;
    pub const EPROTOTYPE = 91;
    pub const ENOPROTOOPT = 92;
    pub const EPROTONOSUPPORT = 93;
    pub const ESOCKTNOSUPPORT = 94;
    /// On Linux, EOPNOTSUPP and ENOTSUP share the value 95. Both names are exposed with that
    /// value, because the distinction is not portable across operating systems:
    /// https://lists.gnu.org/archive/html/bug-glibc/2002-08/msg00017.html
    pub const ENOTSUP = 95;
    pub const EOPNOTSUPP = ENOTSUP;
    pub const EPFNOSUPPORT = 96;
    pub const EAFNOSUPPORT = 97;
    pub const EADDRINUSE = 98;
    pub const EADDRNOTAVAIL = 99;
    pub const ENETDOWN = 100;
    pub const ENETUNREACH = 101;
    pub const ENETRESET = 102;
    pub const ECONNABORTED = 103;
    pub const ECONNRESET = 104;
    pub const ENOBUFS = 105;
    pub const EISCONN = 106;
    pub const ENOTCONN = 107;
    pub const ESHUTDOWN = 108;
    pub const ETOOMANYREFS = 109;
    pub const ETIMEDOUT = 110;
    pub const ECONNREFUSED = 111;
    pub const EHOSTDOWN = 112;
    pub const EHOSTUNREACH = 113;
    pub const EALREADY = 114;
    pub const EINPROGRESS = 115;
    pub const ESTALE = 116;
    pub const EUCLEAN = 117;
    pub const ENOTNAM = 118;
    pub const ENAVAIL = 119;
    pub const EISNAM = 120;
    pub const EREMOTEIO = 121;
    pub const EDQUOT = 122;

    // --- 123-133: media, cancellation, keys, robust mutexes, hardware -----------------
    pub const ENOMEDIUM = 123;
    pub const EMEDIUMTYPE = 124;
    pub const ECANCELED = 125;
    pub const ENOKEY = 126;
    pub const EKEYEXPIRED = 127;
    pub const EKEYREVOKED = 128;
    pub const EKEYREJECTED = 129;
    pub const EOWNERDEAD = 130;
    pub const ENOTRECOVERABLE = 131;
    pub const ERFKILL = 132;
    pub const EHWPOISON = 133;
};
|
|
|
|
/// Behaviour switches that depend on the running kernel's io_uring support.
/// `init()` turns them on for kernels older than the versions noted on each field.
const Features = struct {
    /// Enabled by `init()` on kernels older than 5.5.
    connect_blocking: bool = false,
    /// Enabled by `init()` on kernels older than 5.6.
    close_blocking: bool = false,
    /// Enabled by `init()` on kernels older than 5.6.
    replace_recv_with_readv: bool = false,
    /// Enabled by `init()` on kernels older than 5.6.
    replace_send_with_writev: bool = false,
};

/// Global (process-wide) feature switches; every flag starts out disabled.
var features = Features{};
|
|
|
|
/// One Zig error per Linux errno known to `os` (1-133), plus `Unexpected` for anything else.
/// `errno_map` / `asError` translate raw errno integers into members of this set.
pub const Errno = error{
    // 1-34: classic POSIX
    EPERM,
    ENOENT,
    ESRCH,
    EINTR,
    EIO,
    ENXIO,
    E2BIG,
    ENOEXEC,
    EBADF,
    ECHILD,
    EAGAIN,
    ENOMEM,
    EACCES,
    EFAULT,
    ENOTBLK,
    EBUSY,
    EEXIST,
    EXDEV,
    ENODEV,
    ENOTDIR,
    EISDIR,
    EINVAL,
    ENFILE,
    EMFILE,
    ENOTTY,
    ETXTBSY,
    EFBIG,
    ENOSPC,
    ESPIPE,
    EROFS,
    EMLINK,
    EPIPE,
    EDOM,
    ERANGE,
    // 35-87: extended Linux
    EDEADLK,
    ENAMETOOLONG,
    ENOLCK,
    ENOSYS,
    ENOTEMPTY,
    ELOOP,
    EWOULDBLOCK,
    ENOMSG,
    EIDRM,
    ECHRNG,
    EL2NSYNC,
    EL3HLT,
    EL3RST,
    ELNRNG,
    EUNATCH,
    ENOCSI,
    EL2HLT,
    EBADE,
    EBADR,
    EXFULL,
    ENOANO,
    EBADRQC,
    EBADSLT,
    EDEADLOCK,
    EBFONT,
    ENOSTR,
    ENODATA,
    ETIME,
    ENOSR,
    ENONET,
    ENOPKG,
    EREMOTE,
    ENOLINK,
    EADV,
    ESRMNT,
    ECOMM,
    EPROTO,
    EMULTIHOP,
    EDOTDOT,
    EBADMSG,
    EOVERFLOW,
    ENOTUNIQ,
    EBADFD,
    EREMCHG,
    ELIBACC,
    ELIBBAD,
    ELIBSCN,
    ELIBMAX,
    ELIBEXEC,
    EILSEQ,
    ERESTART,
    ESTRPIPE,
    EUSERS,
    // 88-122: sockets and networking (EOPNOTSUPP is represented by ENOTSUP)
    ENOTSOCK,
    EDESTADDRREQ,
    EMSGSIZE,
    EPROTOTYPE,
    ENOPROTOOPT,
    EPROTONOSUPPORT,
    ESOCKTNOSUPPORT,
    ENOTSUP,
    EPFNOSUPPORT,
    EAFNOSUPPORT,
    EADDRINUSE,
    EADDRNOTAVAIL,
    ENETDOWN,
    ENETUNREACH,
    ENETRESET,
    ECONNABORTED,
    ECONNRESET,
    ENOBUFS,
    EISCONN,
    ENOTCONN,
    ESHUTDOWN,
    ETOOMANYREFS,
    ETIMEDOUT,
    ECONNREFUSED,
    EHOSTDOWN,
    EHOSTUNREACH,
    EALREADY,
    EINPROGRESS,
    ESTALE,
    EUCLEAN,
    ENOTNAM,
    ENAVAIL,
    EISNAM,
    EREMOTEIO,
    EDQUOT,
    // 123-133: media, cancellation, keys, robust mutexes, hardware
    ENOMEDIUM,
    EMEDIUMTYPE,
    ECANCELED,
    ENOKEY,
    EKEYEXPIRED,
    EKEYREVOKED,
    EKEYREJECTED,
    EOWNERDEAD,
    ENOTRECOVERABLE,
    ERFKILL,
    EHWPOISON,
    // Fallback for errno values outside the table.
    Unexpected,
};
|
|
/// Lookup table from a raw Linux errno (the array index) to its `Errno` member.
/// Index 0 and the final index 134 hold `error.Unexpected`. Because the array type fixes the
/// length at 135, the compiler rejects any entry added or dropped by mistake.
pub const errno_map: [135]Errno = [_]Errno{
    error.Unexpected, // 0
    error.EPERM, // 1
    error.ENOENT, // 2
    error.ESRCH, // 3
    error.EINTR, // 4
    error.EIO, // 5
    error.ENXIO, // 6
    error.E2BIG, // 7
    error.ENOEXEC, // 8
    error.EBADF, // 9
    error.ECHILD, // 10
    error.EAGAIN, // 11
    error.ENOMEM, // 12
    error.EACCES, // 13
    error.EFAULT, // 14
    error.ENOTBLK, // 15
    error.EBUSY, // 16
    error.EEXIST, // 17
    error.EXDEV, // 18
    error.ENODEV, // 19
    error.ENOTDIR, // 20
    error.EISDIR, // 21
    error.EINVAL, // 22
    error.ENFILE, // 23
    error.EMFILE, // 24
    error.ENOTTY, // 25
    error.ETXTBSY, // 26
    error.EFBIG, // 27
    error.ENOSPC, // 28
    error.ESPIPE, // 29
    error.EROFS, // 30
    error.EMLINK, // 31
    error.EPIPE, // 32
    error.EDOM, // 33
    error.ERANGE, // 34
    error.EDEADLK, // 35
    error.ENAMETOOLONG, // 36
    error.ENOLCK, // 37
    error.ENOSYS, // 38
    error.ENOTEMPTY, // 39
    error.ELOOP, // 40
    error.EWOULDBLOCK, // 41
    error.ENOMSG, // 42
    error.EIDRM, // 43
    error.ECHRNG, // 44
    error.EL2NSYNC, // 45
    error.EL3HLT, // 46
    error.EL3RST, // 47
    error.ELNRNG, // 48
    error.EUNATCH, // 49
    error.ENOCSI, // 50
    error.EL2HLT, // 51
    error.EBADE, // 52
    error.EBADR, // 53
    error.EXFULL, // 54
    error.ENOANO, // 55
    error.EBADRQC, // 56
    error.EBADSLT, // 57
    error.EDEADLOCK, // 58
    error.EBFONT, // 59
    error.ENOSTR, // 60
    error.ENODATA, // 61
    error.ETIME, // 62
    error.ENOSR, // 63
    error.ENONET, // 64
    error.ENOPKG, // 65
    error.EREMOTE, // 66
    error.ENOLINK, // 67
    error.EADV, // 68
    error.ESRMNT, // 69
    error.ECOMM, // 70
    error.EPROTO, // 71
    error.EMULTIHOP, // 72
    error.EDOTDOT, // 73
    error.EBADMSG, // 74
    error.EOVERFLOW, // 75
    error.ENOTUNIQ, // 76
    error.EBADFD, // 77
    error.EREMCHG, // 78
    error.ELIBACC, // 79
    error.ELIBBAD, // 80
    error.ELIBSCN, // 81
    error.ELIBMAX, // 82
    error.ELIBEXEC, // 83
    error.EILSEQ, // 84
    error.ERESTART, // 85
    error.ESTRPIPE, // 86
    error.EUSERS, // 87
    error.ENOTSOCK, // 88
    error.EDESTADDRREQ, // 89
    error.EMSGSIZE, // 90
    error.EPROTOTYPE, // 91
    error.ENOPROTOOPT, // 92
    error.EPROTONOSUPPORT, // 93
    error.ESOCKTNOSUPPORT, // 94
    error.ENOTSUP, // 95
    error.EPFNOSUPPORT, // 96
    error.EAFNOSUPPORT, // 97
    error.EADDRINUSE, // 98
    error.EADDRNOTAVAIL, // 99
    error.ENETDOWN, // 100
    error.ENETUNREACH, // 101
    error.ENETRESET, // 102
    error.ECONNABORTED, // 103
    error.ECONNRESET, // 104
    error.ENOBUFS, // 105
    error.EISCONN, // 106
    error.ENOTCONN, // 107
    error.ESHUTDOWN, // 108
    error.ETOOMANYREFS, // 109
    error.ETIMEDOUT, // 110
    error.ECONNREFUSED, // 111
    error.EHOSTDOWN, // 112
    error.EHOSTUNREACH, // 113
    error.EALREADY, // 114
    error.EINPROGRESS, // 115
    error.ESTALE, // 116
    error.EUCLEAN, // 117
    error.ENOTNAM, // 118
    error.ENAVAIL, // 119
    error.EISNAM, // 120
    error.EREMOTEIO, // 121
    error.EDQUOT, // 122
    error.ENOMEDIUM, // 123
    error.EMEDIUMTYPE, // 124
    error.ECANCELED, // 125
    error.ENOKEY, // 126
    error.EKEYEXPIRED, // 127
    error.EKEYREVOKED, // 128
    error.EKEYREJECTED, // 129
    error.EOWNERDEAD, // 130
    error.ENOTRECOVERABLE, // 131
    error.ERFKILL, // 132
    error.EHWPOISON, // 133
    error.Unexpected, // 134
};
|
|
/// Convert a raw Linux errno into the matching `Errno` error.
///
/// `err` may be an integer errno or an enum whose integer value is the errno.
/// 0, negative values, and anything at or past the end of `errno_map` map to
/// `error.Unexpected`.
pub fn asError(err: anytype) Errno {
    const errnum = if (@typeInfo(@TypeOf(err)) == .Enum)
        @enumToInt(err)
    else
        err;

    return switch (errnum) {
        // `errno_map.len` is one past the last valid index, so the inclusive range must
        // stop at `len - 1`. Including `len` would index out of bounds.
        1...errno_map.len - 1 => errno_map[@intCast(u8, errnum)],
        else => error.Unexpected,
    };
}
|
|
|
|
const timespec = linux.timespec;
|
|
const linux = os.linux;
|
|
const IO_Uring = linux.IO_Uring;
|
|
const io_uring_cqe = linux.io_uring_cqe;
|
|
const io_uring_sqe = linux.io_uring_sqe;
|
|
|
|
const FIFO = @import("./fifo.zig").FIFO;
|
|
const IO = @This();
|
|
|
|
/// The io_uring instance that every operation is submitted to.
ring: IO_Uring,

/// Operations not yet submitted to the kernel, waiting for free space in the
/// submission queue. `flush()` retries them via `enqueue()`.
unqueued: FIFO(Completion) = .{},

/// Completions whose CQE has been reaped and whose callbacks are ready to run.
completed: FIFO(Completion) = .{},

/// Completions run directly at the start of `tick()` and `run_for_ns()`, before any
/// submission or reaping happens.
next_tick: FIFO(Completion) = .{},

/// File descriptor read by `queueForWakeup()`. `init()` sets it to `Waker.fd`.
event_fd: linux.fd_t = 0,

/// Destination buffer for the pending wakeup read on `event_fd`.
eventfd_buf: [16]u8 = undefined,

/// Number of wakeup reads currently outstanding (updated with saturating arithmetic).
has_queued: usize = 0,

/// Completion slot reserved for the wakeup read on `event_fd`.
wakeup_completion: Completion = undefined,
|
|
|
|
/// Queue a read on `event_fd` into `eventfd_buf`, using the dedicated `wakeup_completion`.
///
/// When that read completes, whatever its outcome, the read-result is ignored,
/// `has_queued` is decremented (saturating), and `cb(ctx)` is invoked.
fn queueForWakeup(this: *@This(), comptime Type: type, ctx: Type, comptime cb: anytype) void {
    const Wrapper = struct {
        pub fn onEventfdRead(that: Type, completion: *Completion, _: ReadError!usize) void {
            // `completion` is always `&io.wakeup_completion`, so recover the owning IO from it.
            const io = @fieldParentPtr(IO, "wakeup_completion", completion);
            io.has_queued -|= 1;
            cb(that);
        }
    };

    @memset(&this.eventfd_buf, 0, this.eventfd_buf.len);
    this.read(Type, ctx, Wrapper.onEventfdRead, &this.wakeup_completion, this.event_fd, &this.eventfd_buf, null);
    this.has_queued +|= 1;
}
|
|
|
|
/// Run one `tick()` and, if a wakeup read is still outstanding, block in io_uring_enter
/// until at least one completion is available.
///
/// If no wakeup read is pending, one is queued first, so that a write to `event_fd`
/// (e.g. by `Waker.wake()`) ends the wait and calls `onReady(ptr)`.
/// Errors from `tick()` and from io_uring_enter are deliberately ignored.
pub fn wait(this: *@This(), ptr: anytype, comptime onReady: anytype) void {
    if (this.has_queued == 0) this.queueForWakeup(@TypeOf(ptr), ptr, onReady);

    this.tick() catch {};

    if (this.has_queued != 0) {
        const submitted = this.ring.flush_sq();
        _ = this.ring.enter(submitted, 1, linux.IORING_ENTER_GETEVENTS) catch 0;
    }
}
|
|
|
|
/// Create an event loop backed by a fresh io_uring instance.
///
/// - Enables entries in the global `features` for kernels older than 5.5 / 5.6.
/// - If RLIMIT_MEMLOCK can be read: fails below 16 KiB, and caps the ring at 256 entries
///   below 128 KiB.
/// - When `IO_Uring.init` reports `error.SystemResources`, retries with a quarter of the
///   entries and gives up once the count is 8 or fewer.
/// `waker.fd` becomes the `event_fd` used for wakeups.
pub fn init(entries_: u12, flags: u32, waker: Waker) !IO {
    var entries = entries_;

    const kernel = Platform.kernelVersion();
    const Version = @TypeOf(kernel);
    if (kernel.orderWithoutTag(Version{ .major = 5, .minor = 5, .patch = 0 }) == .lt) {
        features.connect_blocking = true;
    }
    if (kernel.orderWithoutTag(Version{ .major = 5, .minor = 6, .patch = 0 }) == .lt) {
        features.close_blocking = true;
        features.replace_recv_with_readv = true;
        features.replace_send_with_writev = true;
    }

    var limit = linux.rlimit{ .cur = 0, .max = 0 };
    if (linux.getrlimit(.MEMLOCK, &limit) == 0) {
        // NOTE(review): the threshold checked here is 16 KiB, but the error text asks for 64k.
        if (limit.cur < 16 * 1024) {
            return error.@"memlock is too low. Please increase it to at least 64k";
        }
        if (limit.cur < 128 * 1024) entries = @min(256, entries);
    }

    var ring: IO_Uring = undefined;
    while (true) {
        ring = IO_Uring.init(entries, flags) catch |err| switch (err) {
            error.SystemResources => {
                if (entries <= 8) return error.SystemResources;
                // Shrink by 4x rather than 2x, so that a child process that uses io_uring
                // can still function.
                entries /= 4;
                continue;
            },
            else => return err,
        };
        break;
    }

    return IO{ .ring = ring, .event_fd = waker.fd };
}
|
|
|
|
/// Tear down the io_uring instance. `event_fd` is not touched here.
pub fn deinit(self: *IO) void {
    const ring = &self.ring;
    ring.deinit();
}
|
|
|
|
/// Pass all queued submissions to the kernel and peek for completions.
///
/// Runs every `next_tick` completion first, then does one non-waiting `flush()`.
pub fn tick(self: *IO) !void {
    while (self.next_tick.pop()) |pending| pending.complete();

    // Timeouts are assumed to be submitted and fully reaped by `run_for_ns()` alone, and
    // `tick()` never runs concurrently with it. So no timeout CQE is seen here: `timeouts`
    // is never decremented and `etime` stays false.
    var timeouts: usize = 0;
    var etime = false;

    try self.flush(0, &timeouts, &etime);
    assert(!etime);

    // Completion callbacks run inside `flush()` may have queued new SQEs. Submit them now,
    // instead of waiting for the next tick, but do not reap CQEs again, since SQEs may
    // complete synchronously. `sq_ready()` counts flushed and unflushed SQEs alike, so
    // compare the local tail/head instead, and skip the io_uring_enter() syscall when
    // nothing is waiting.
    const unflushed = self.ring.sq.sqe_tail -% self.ring.sq.sqe_head;
    if (unflushed != 0) {
        try self.flush_submissions(0, &timeouts, &etime);
        assert(!etime);
    }
}
|
|
|
|
/// Pass all queued submissions to the kernel and run for `nanoseconds`.
/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
/// in the timespec struct.
///
/// Returns once an absolute CLOCK_MONOTONIC deadline `nanoseconds` from now has passed,
/// after reaping every timeout SQE this call submitted.
pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
    assert(nanoseconds > 0);

    while (self.next_tick.pop()) |completion| {
        completion.complete();
    }

    // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the
    // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are
    // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC.
    var current_ts: timespec = undefined;
    os.clock_gettime(os.CLOCK.MONOTONIC, &current_ts) catch unreachable;

    // The absolute CLOCK_MONOTONIC time after which we may return from this function.
    // Add whole seconds and the sub-second remainder separately, so that a large
    // `nanoseconds` cannot overflow `tv_nsec`. Then carry, so `tv_nsec` stays below one second.
    const ns_per_s = std.time.ns_per_s;
    const timeout_ts: timespec = deadline: {
        var ts: timespec = .{
            .tv_sec = current_ts.tv_sec + nanoseconds / ns_per_s,
            .tv_nsec = current_ts.tv_nsec + nanoseconds % ns_per_s,
        };
        if (ts.tv_nsec >= ns_per_s) {
            ts.tv_sec += 1;
            ts.tv_nsec -= ns_per_s;
        }
        break :deadline ts;
    };

    var timeouts: usize = 0;
    var etime = false;
    while (!etime) {
        const timeout_sqe = self.ring.get_sqe() catch blk: {
            // The submission queue is full, so flush submissions to make space:
            try self.flush_submissions(0, &timeouts, &etime);
            break :blk self.ring.get_sqe() catch unreachable;
        };
        // Submit an absolute timeout that will be canceled if any other SQE completes first.
        // `user_data == 0` marks it as a timeout for `flush_completions()`.
        linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, linux.IORING_TIMEOUT_ABS);
        timeout_sqe.user_data = 0;
        timeouts += 1;
        // The amount of time this call will block is bounded by the timeout we just submitted:
        try self.flush(1, &timeouts, &etime);
    }

    // Reap any remaining timeouts, which reference the timespec in the current stack frame.
    // The busy loop here is required to avoid a potential deadlock, as the kernel determines
    // when the timeouts are pushed to the completion queue, not us.
    while (timeouts > 0) try self.flush_completions(0, &timeouts, &etime);
}
|
|
|
|
/// Submit queued SQEs (optionally waiting for `wait_nr` completions in the same syscall),
/// reap available CQEs, run their callbacks, and then retry operations that earlier
/// found the submission queue full.
fn flush(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
    try self.flush_submissions(wait_nr, timeouts, etime);
    // Submission already entered the kernel, so just peek without waiting.
    try self.flush_completions(0, timeouts, etime);

    // Callbacks run only after all CQEs have been reaped. Each list is detached before it
    // is drained, so anything appended while draining waits for the next round of the loop
    // instead of looping forever.
    var ready = self.completed;
    self.completed = .{};
    while (ready.pop()) |completion| completion.complete();

    var deferred = self.unqueued;
    self.unqueued = .{};
    while (deferred.pop()) |completion| self.enqueue(completion);
}
|
|
|
|
/// Reap CQEs in batches of up to 256, waiting for at least `wait_nr` of them in total.
///
/// Timeout CQEs (`user_data == 0`) decrement `timeouts.*` and set `etime.*` when they fired
/// because time ran out. Every other CQE has its `res` stored in the owning `Completion`,
/// which is then appended to `self.completed`. Callbacks are not run here.
fn flush_completions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
    // Only the first `completed` entries are read after each `copy_cqes()` call, so the
    // buffer needs no zeroing. Zeroing it would cost a 4 KiB memset per call on the hot path.
    var cqes: [256]std.os.linux.io_uring_cqe = undefined;
    var wait_remaining = wait_nr;
    while (true) {
        // Guard against waiting indefinitely (if there are too few requests inflight),
        // especially if this is not the first time round the loop:
        const completed = self.ring.copy_cqes(&cqes, wait_remaining) catch |err| switch (err) {
            error.SignalInterrupt => continue,
            else => return err,
        };
        wait_remaining -|= completed;

        for (cqes[0..completed]) |cqe| {
            if (cqe.user_data == 0) {
                timeouts.* -= 1;
                // We are only done if the timeout submitted was completed due to time, not if
                // it was completed due to the completion of an event, in which case `cqe.res`
                // would be 0. It is possible for multiple timeout operations to complete at the
                // same time if the nanoseconds value passed to `run_for_ns()` is very short.
                if (-cqe.res == os.ETIME) etime.* = true;
                continue;
            }
            const completion = @intToPtr(*Completion, @intCast(usize, cqe.user_data));
            completion.result = cqe.res;
            // We do not run the completion here (instead appending to a linked list) to avoid:
            // * recursion through `flush_submissions()` and `flush_completions()`,
            // * unbounded stack usage, and
            // * confusing stack traces.
            self.completed.push(completion);
        }

        // A short batch means the completion queue has been drained.
        if (completed < cqes.len) break;
    }
}
|
|
|
|
/// Submit all queued SQEs, waiting for `wait_nr` completions, and retry until the kernel
/// accepts them.
///
/// Interrupted calls are retried. When the completion queue is overcommitted, or the
/// kernel reports `SystemResources`, one CQE batch is reaped (waiting for at least one)
/// before retrying. All other errors propagate.
fn flush_submissions(self: *IO, wait_nr: u32, timeouts: *usize, etime: *bool) !void {
    while (true) {
        if (self.ring.submit_and_wait(wait_nr)) |_| {
            return;
        } else |err| {
            switch (err) {
                error.SignalInterrupt => {},
                // See https://github.com/axboe/liburing/issues/281 re: error.SystemResources.
                // copy_cqes() flushes before entering to wait, which this relies on:
                // https://github.com/axboe/liburing/commit/35c199c48dfd54ad46b96e386882e7ac341314c5
                error.CompletionQueueOvercommitted, error.SystemResources => {
                    try self.flush_completions(1, timeouts, etime);
                },
                else => return err,
            }
        }
    }
}
|
|
|
|
/// Prepare `completion` into a free SQE. If the submission queue is full, park it in
/// `unqueued` so that `flush()` retries it later.
fn enqueue(self: *IO, completion: *Completion) void {
    if (self.ring.get_sqe()) |sqe| {
        completion.prep(sqe);
    } else |err| {
        switch (err) {
            error.SubmissionQueueFull => self.unqueued.push(completion),
        }
    }
}
|
|
|
|
/// This struct holds the data needed for a single io_uring operation.
///
/// Lifecycle:
/// - `prep()` writes an SQE for `operation` and stores this completion's address in the
///   SQE's `user_data`.
/// - `flush_completions()` maps the CQE back to the completion and copies `cqe.res` into
///   `result`.
/// - `complete()` decodes `result` into a typed value and invokes `callback`.
pub const Completion = struct {
    /// Event loop that owns this completion; used to re-submit on retryable errnos.
    io: *IO,
    /// Raw `cqe.res`: a non-negative success value, or a negated errno.
    result: i32 = undefined,
    /// Intrusive link; presumably consumed by `FIFO(Completion)` — TODO confirm in fifo.zig.
    next: ?*Completion = null,
    /// The operation to run, together with its arguments.
    operation: Operation,
    /// Type-erased caller context passed back to `callback`.
    context: ?*anyopaque,
    /// Type-erased callback. `result` points at the operation's typed result
    /// (for example `AcceptError!os.socket_t` for `.accept`).
    callback: *const fn (context: ?*anyopaque, completion: *Completion, result: *const anyopaque) void,

    /// Fill `sqe` for this completion's operation and tag it with the completion's address.
    fn prep(completion: *Completion, sqe: *io_uring_sqe) void {
        switch (completion.operation) {
            // Pointer captures: the kernel writes into (or reads from) these fields in place.
            .accept => |*op| linux.io_uring_prep_accept(sqe, op.socket, &op.address, &op.address_size, os.SOCK.CLOEXEC),
            .close => |op| linux.io_uring_prep_close(sqe, op.fd),
            .connect => |*op| linux.io_uring_prep_connect(sqe, op.socket, &op.address.any, op.address.getOsSockLen()),
            .fsync => |op| linux.io_uring_prep_fsync(sqe, op.fd, 0),
            .read => |op| linux.io_uring_prep_read(sqe, op.fd, op.buffer[0..buffer_limit(op.buffer.len)], op.offset),
            .open => |op| linux.io_uring_prep_openat(sqe, linux.AT.FDCWD, op.path, op.flags, op.mode),
            .readev => |*op| linux.io_uring_prep_readv(sqe, op.socket, &op.iovecs, 0),
            .writev => |*op| linux.io_uring_prep_writev(sqe, op.socket, &op.iovecs, 0),
            .recv => |op| linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL),
            .send => |op| linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL),
            .timeout => |*op| linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0),
            .write => |op| linux.io_uring_prep_write(sqe, op.fd, op.buffer[0..buffer_limit(op.buffer.len)], op.offset),
        }
        sqe.user_data = @ptrToInt(completion);
    }

    /// Decode `result` for the active operation and invoke `callback` with a pointer to
    /// the typed result.
    ///
    /// Retryable errnos (EINTR always; EAGAIN/EINPROGRESS for some operations) re-submit
    /// the completion via `io.enqueue()` and return without calling back.
    fn complete(completion: *Completion) void {
        const res = completion.result;
        switch (completion.operation) {
            .accept => {
                const result: AcceptError!os.socket_t = if (res >= 0) @intCast(os.socket_t, res) else switch (-res) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EAGAIN => error.WouldBlock,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNABORTED => error.ConnectionAborted,
                    os.EINVAL => error.SocketNotListening,
                    os.EMFILE => error.ProcessFdQuotaExceeded,
                    os.ENFILE => error.SystemFdQuotaExceeded,
                    os.ENOBUFS, os.ENOMEM => error.SystemResources,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.EOPNOTSUPP => error.OperationNotSupported,
                    os.EPERM => error.PermissionDenied,
                    os.EPROTO => error.ProtocolFailure,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .close => {
                const result = if (res < 0) switch (-res) {
                    os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
                    os.EBADF => error.FileDescriptorInvalid,
                    os.EDQUOT => error.DiskQuota,
                    os.EIO => error.InputOutput,
                    os.ENOSPC => error.NoSpaceLeft,
                    else => |errno| asError(errno),
                } else assert(res == 0);
                completion.callback(completion.context, completion, &result);
            },

            .open => {
                const result: OpenError!linux.fd_t = if (res >= 0) @intCast(linux.fd_t, res) else switch (-res) {
                    0 => unreachable,
                    os.EAGAIN, os.EINPROGRESS, os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EACCES, os.EPERM => error.AccessDenied,
                    os.EFBIG, os.EOVERFLOW => error.FileTooBig,
                    os.EISDIR => error.IsDir,
                    os.ELOOP => error.SymLinkLoop,
                    os.EMFILE => error.ProcessFdQuotaExceeded,
                    os.ENAMETOOLONG => error.NameTooLong,
                    os.ENFILE => error.SystemFdQuotaExceeded,
                    os.ENODEV => error.NoDevice,
                    os.ENOENT => error.FileNotFound,
                    os.ENOMEM => error.SystemResources,
                    os.ENOSPC => error.NoSpaceLeft,
                    os.ENOTDIR => error.NotDir,
                    os.EEXIST => error.PathAlreadyExists,
                    os.EBUSY => error.DeviceBusy,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .connect => {
                const result: ConnectError!void = if (res >= 0) assert(res == 0) else switch (-res) {
                    os.EAGAIN, os.EINPROGRESS, os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EACCES => error.AccessDenied,
                    os.EADDRINUSE => error.AddressInUse,
                    os.EADDRNOTAVAIL => error.AddressNotAvailable,
                    os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
                    os.EALREADY => error.OpenAlreadyInProgress,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNREFUSED => error.ConnectionRefused,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    os.EISCONN => error.AlreadyConnected,
                    os.ENETUNREACH => error.NetworkUnreachable,
                    os.ENOENT => error.FileNotFound,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.EPERM => error.PermissionDenied,
                    os.EPROTOTYPE => error.ProtocolNotSupported,
                    os.ETIMEDOUT => error.ConnectionTimedOut,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .fsync => {
                const result: FsyncError!void = if (res >= 0) assert(res == 0) else switch (-res) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EBADF => error.FileDescriptorInvalid,
                    os.EDQUOT => error.DiskQuota,
                    os.EINVAL => error.ArgumentsInvalid,
                    os.EIO => error.InputOutput,
                    os.ENOSPC => error.NoSpaceLeft,
                    os.EROFS => error.ReadOnlyFileSystem,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .read => {
                const result: ReadError!usize = if (res >= 0) @intCast(usize, res) else switch (-res) {
                    os.EAGAIN, os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EBADF => error.NotOpenForReading,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    os.EINVAL => error.Alignment,
                    os.EIO => error.InputOutput,
                    os.EISDIR => error.IsDir,
                    os.ENOBUFS, os.ENOMEM => error.SystemResources,
                    os.ENXIO, os.EOVERFLOW, os.ESPIPE => error.Unseekable,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .readev, .recv => {
                const result: RecvError!usize = if (res >= 0) @intCast(usize, res) else switch (-res) {
                    os.EAGAIN, os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNREFUSED => error.ConnectionRefused,
                    os.ENOMEM => error.SystemResources,
                    os.ENOTCONN => error.SocketNotConnected,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .writev, .send => {
                const result: SendError!usize = if (res >= 0) @intCast(usize, res) else switch (-res) {
                    os.EAGAIN, os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EACCES => error.AccessDenied,
                    os.EALREADY => error.FastOpenAlreadyInProgress,
                    os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
                    os.EBADF => error.FileDescriptorInvalid,
                    os.ECONNRESET => error.ConnectionResetByPeer,
                    os.EMSGSIZE => error.MessageTooBig,
                    os.ENOBUFS, os.ENOMEM => error.SystemResources,
                    os.ENOTCONN => error.SocketNotConnected,
                    os.ENOTSOCK => error.FileDescriptorNotASocket,
                    os.EOPNOTSUPP => error.OperationNotSupported,
                    os.EPIPE => error.BrokenPipe,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .timeout => {
                const result: TimeoutError!void = if (res >= 0) {} else switch (-res) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.ECANCELED => error.Canceled,
                    os.ETIME => {}, // A success.
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },

            .write => {
                const result: WriteError!usize = if (res >= 0) @intCast(usize, res) else switch (-res) {
                    os.EINTR => {
                        completion.io.enqueue(completion);
                        return;
                    },
                    os.EAGAIN => error.WouldBlock,
                    os.EBADF => error.NotOpenForWriting,
                    os.EDESTADDRREQ => error.NotConnected,
                    os.EDQUOT => error.DiskQuota,
                    os.EFBIG => error.FileTooBig,
                    os.EINVAL => error.Alignment,
                    os.EIO => error.InputOutput,
                    os.ENOSPC => error.NoSpaceLeft,
                    os.ENXIO, os.EOVERFLOW, os.ESPIPE => error.Unseekable,
                    os.EPERM => error.AccessDenied,
                    os.EPIPE => error.BrokenPipe,
                    else => |errno| asError(errno),
                };
                completion.callback(completion.context, completion, &result);
            },
        }
    }
};
|
|
|
|
/// Wraps an eventfd used to wake the event loop from another context.
///
/// eventfd reads and writes always transfer exactly one 8-byte counter value, so the
/// counter is held in a `u64`. The previous `usize` was only 4 bytes on 32-bit targets,
/// which made the 8-byte read/write overrun the variable.
pub const Waker = struct {
    fd: os.fd_t,

    /// Create a waker backed by a new eventfd (initial counter 0, no flags).
    pub fn init(allocator: std.mem.Allocator) !Waker {
        return initWithFileDescriptor(allocator, @intCast(os.fd_t, try std.os.eventfd(0, 0)));
    }

    /// Wrap an existing eventfd. The allocator is accepted for API symmetry and is unused.
    pub fn initWithFileDescriptor(_: std.mem.Allocator, fd: os.fd_t) Waker {
        return Waker{
            .fd = fd,
        };
    }

    /// Block reading the eventfd and return the counter value that was read.
    /// Best-effort: a failed read is ignored and reported as 0.
    pub fn wait(this: Waker) !u64 {
        var counter: u64 = 0;
        _ = std.os.read(this.fd, std.mem.asBytes(&counter)) catch 0;
        return counter;
    }

    /// Add 1 to the eventfd counter, waking any reader.
    /// Best-effort: a failed write is ignored.
    pub fn wake(this: *const Waker) !void {
        const increment: u64 = 1;
        _ = std.os.write(this.fd, std.mem.asBytes(&increment)) catch 0;
    }
};
|
|
|
|
/// This union encodes the set of operations supported as well as their arguments.
/// `Completion.prep()` turns the active variant into an SQE.
const Operation = union(enum) {
    accept: struct {
        socket: os.socket_t,
        /// Filled in by the kernel with the peer address.
        address: os.sockaddr = undefined,
        address_size: os.socklen_t = @sizeOf(os.sockaddr),
    },
    close: struct {
        fd: os.fd_t,
    },
    connect: struct {
        socket: os.socket_t,
        address: std.net.Address,
    },
    fsync: struct {
        fd: os.fd_t,
    },
    open: struct {
        /// NUL-terminated path; `prep()` opens it relative to `AT.FDCWD`.
        path: [*:0]const u8,
        flags: u32,
        mode: os.mode_t,
    },
    read: struct {
        fd: os.fd_t,
        buffer: []u8,
        offset: u64,
    },
    /// Vectored read with a single iovec.
    readev: struct {
        socket: os.socket_t,
        iovecs: [1]os.iovec,
    },
    /// Vectored write with a single iovec.
    writev: struct {
        socket: os.socket_t,
        iovecs: [1]os.iovec_const,
    },
    recv: struct {
        socket: os.socket_t,
        buffer: []u8,
    },
    send: struct {
        socket: os.socket_t,
        buffer: []const u8,
    },
    timeout: struct {
        timespec: linux.timespec,
    },
    write: struct {
        fd: os.fd_t,
        buffer: []const u8,
        offset: u64,
    },

    /// The data buffer of a `.read`, `.recv`, `.send`, or `.write` operation.
    /// Every other variant yields an empty slice.
    pub fn slice(this: Operation) []const u8 {
        switch (this) {
            .read => |op| return op.buffer,
            .recv => |op| return op.buffer,
            .send => |op| return op.buffer,
            .write => |op| return op.buffer,
            else => return &[_]u8{},
        }
    }
};
|
|
|
|
/// Errors an `accept` completion may hand to its callback.
pub const AcceptError = error{
    WouldBlock,
    FileDescriptorInvalid,
    ConnectionAborted,
    SocketNotListening,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    FileDescriptorNotASocket,
    OperationNotSupported,
    PermissionDenied,
    ProtocolFailure,
} || Errno;

/// Fills `completion` with an accept on the listening `socket` and submits it
/// through `enqueueNew`. `callback` is later invoked with `context`, the
/// completion, and either the accepted socket or an `AcceptError`.
pub fn accept(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: AcceptError!os.socket_t,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const AcceptError!os.socket_t, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .accept = .{
            .socket = socket,
            .address = undefined,
            .address_size = @sizeOf(os.sockaddr),
        } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `close` completion may hand to its callback.
pub const CloseError = error{
    FileDescriptorInvalid,
    DiskQuota,
    InputOutput,
    NoSpaceLeft,
} || Errno;

/// Prepares `completion` as a close of `fd`.
///
/// When `features.close_blocking` is set, the close runs synchronously here:
/// its raw result is stored in `completion.result` and the completion is
/// pushed onto `next_tick`. Otherwise the completion is submitted through
/// `enqueueNew`. In both cases `callback` later receives `context`, the
/// completion, and the outcome.
pub fn close(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: CloseError!void,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const CloseError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{ .close = .{ .fd = fd } },
    };

    if (features.close_blocking) {
        const rc = linux.close(fd);
        // Raw `linux.*` wrappers return a `usize` that encodes failure as
        // `-errno` (e.g. 0xFFFF...FFF7 for EBADF). A direct `@intCast(i32, rc)`
        // traps in safe builds (UB in ReleaseFast) on every failed close.
        // Reinterpret as signed first, so `result` holds the negative errno the
        // completion handlers expect.
        completion.result = @intCast(i32, @bitCast(isize, rc));
        // NOTE(review): the blocking connect path pushes onto `completed`
        // rather than `next_tick`; confirm the difference is intentional.
        self.next_tick.push(completion);
        return;
    }

    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `connect` completion may hand to its callback.
pub const ConnectError = error{
    AccessDenied,
    AddressInUse,
    AddressNotAvailable,
    AddressFamilyNotSupported,
    WouldBlock,
    OpenAlreadyInProgress,
    FileDescriptorInvalid,
    ConnectionRefused,
    AlreadyConnected,
    NetworkUnreachable,
    FileNotFound,
    FileDescriptorNotASocket,
    PermissionDenied,
    ProtocolNotSupported,
    ConnectionTimedOut,
    ConnectionResetByPeer,
} || Errno;

/// Prepares `completion` as a connect of `socket` to `address`.
///
/// When `features.connect_blocking` is set, `connect(2)` runs synchronously
/// here: its raw result is stored in `completion.result` and the completion is
/// pushed onto `completed`. Otherwise the completion is submitted through
/// `enqueueNew`. In both cases `callback` later receives `context`, the
/// completion, and the outcome.
pub fn connect(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: ConnectError!void,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    address: std.net.Address,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const ConnectError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .connect = .{
                .socket = socket,
                .address = address,
            },
        },
    };

    if (features.connect_blocking) {
        const rc = linux.connect(socket, &address.any, address.getOsSockLen());
        // Raw `linux.*` wrappers return a `usize` that encodes failure as
        // `-errno`. A direct `@intCast(i32, rc)` traps in safe builds (UB in
        // ReleaseFast) for every failed connect, including the common
        // EINPROGRESS on non-blocking sockets. Reinterpret as signed first.
        completion.result = @intCast(i32, @bitCast(isize, rc));
        self.completed.push(completion);
        return;
    }

    self.enqueueNew(completion);
}
|
|
|
|
/// Errors an `fsync` completion may hand to its callback.
pub const FsyncError = error{
    FileDescriptorInvalid,
    DiskQuota,
    ArgumentsInvalid,
    InputOutput,
    NoSpaceLeft,
    ReadOnlyFileSystem,
} || Errno;

/// Fills `completion` with an fsync of `fd` and submits it through
/// `enqueueNew`. `callback` later receives `context`, the completion, and the
/// outcome.
pub fn fsync(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: FsyncError!void,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const FsyncError!void, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .fsync = .{ .fd = fd } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `read` completion may hand to its callback.
pub const ReadError = error{
    WouldBlock,
    NotOpenForReading,
    ConnectionResetByPeer,
    Alignment,
    InputOutput,
    IsDir,
    SystemResources,
    Unseekable,
} || Errno;

/// Fills `completion` with a read from `fd` into `buffer` and submits it
/// through `enqueueNew`. `callback` later receives `context`, the completion,
/// and the byte count or a `ReadError`.
pub fn read(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: ReadError!usize,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
    buffer: []u8,
    offset: ?u64,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const ReadError!usize, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    // The original notes that pread semantics are irrelevant here, so a
    // missing offset is simply recorded as 0.
    const effective_offset: u64 = offset orelse 0;

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .read = .{
            .fd = fd,
            .buffer = buffer,
            .offset = effective_offset,
        } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `recv` (or `readev`) completion may hand to its callback.
pub const RecvError = error{
    WouldBlock,
    FileDescriptorInvalid,
    ConnectionRefused,
    SystemResources,
    SocketNotConnected,
    FileDescriptorNotASocket,
    ConnectionResetByPeer,
} || Errno;

/// Fills `completion` with a receive from `socket` into `buffer` and submits
/// it through `enqueueNew`. When `features.replace_recv_with_readv` is set, the
/// request is delegated to `readev` instead. `callback` later receives
/// `context`, the completion, and the byte count or a `RecvError`.
pub fn recv(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: RecvError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []u8,
) void {
    if (features.replace_recv_with_readv) {
        return self.readev(Context, context, callback, completion, socket, buffer);
    }

    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const RecvError!usize, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .recv = .{ .socket = socket, .buffer = buffer } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Fills `completion` with a vectored receive on `socket` into `buffer`, as a
/// single-element iovec, and submits it through `enqueueNew`. `callback` later
/// receives `context`, the completion, and the byte count or a `RecvError`.
pub fn readev(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: RecvError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []u8,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const RecvError!usize, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{
            .readev = .{
                .socket = socket,
                .iovecs = .{
                    .{ .iov_base = buffer.ptr, .iov_len = buffer.len },
                },
            },
        },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `send` (or `writev`) completion may hand to its callback.
pub const SendError = error{
    AccessDenied,
    WouldBlock,
    FastOpenAlreadyInProgress,
    AddressFamilyNotSupported,
    FileDescriptorInvalid,
    ConnectionResetByPeer,
    MessageTooBig,
    SystemResources,
    SocketNotConnected,
    FileDescriptorNotASocket,
    OperationNotSupported,
    BrokenPipe,
} || Errno;

/// Fills `completion` with a send of `buffer` on `socket` and submits it
/// through `enqueueNew`. When `features.replace_send_with_writev` is set, the
/// request is delegated to `writev` instead. The trailing `u32` argument is
/// ignored. `callback` later receives `context`, the completion, and the byte
/// count or a `SendError`.
pub fn send(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
    _: u32,
) void {
    if (features.replace_send_with_writev) {
        return self.writev(Context, context, callback, completion, socket, buffer, 0);
    }

    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const SendError!usize, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .send = .{ .socket = socket, .buffer = buffer } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors an `open` completion may hand to its callback.
pub const OpenError = error{
    /// Under WASI this can also mean the directory descriptor lacks the
    /// rights needed to open a new resource relative to it.
    AccessDenied,
    SymLinkLoop,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    NoDevice,
    FileNotFound,

    /// The path is longer than `MAX_PATH_BYTES` bytes.
    NameTooLong,

    /// The kernel ran out of memory, or the target is a FIFO and the per-user
    /// hard limit on pipe memory has been hit.
    SystemResources,

    /// The file is too large to open. Cannot occur on 64-bit targets or when
    /// opening directories.
    FileTooBig,

    /// The path names a directory but `O.DIRECTORY` was not given.
    IsDir,

    /// No room on the device to create the new file. Only reachable together
    /// with `O.CREAT`.
    NoSpaceLeft,

    /// A path component used as a directory is not a directory, or
    /// `O.DIRECTORY` was given and the path is not a directory.
    NotDir,

    /// The path exists and both `O.CREAT` and `O.EXCL` were given.
    PathAlreadyExists,
    DeviceBusy,

    /// The filesystem does not support file locks.
    FileLocksNotSupported,

    BadPathName,
    InvalidUtf8,

    /// Any of:
    /// * the path is an executable image currently being executed and write
    ///   access was requested;
    /// * the path is a file currently in use as swap and O_TRUNC was given;
    /// * the path is a file the kernel is currently reading (e.g. for
    ///   module/firmware loading) and write access was requested.
    FileBusy,

    WouldBlock,
} || Errno;

/// Fills `completion` with an open of the NUL-terminated `path` and submits it
/// through `enqueueNew`. `flags` is narrowed to the `u32` stored in the
/// operation. `callback` later receives `context`, the completion, and the new
/// descriptor or an `OpenError`.
pub fn open(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: OpenError!linux.fd_t,
    ) void,
    completion: *Completion,
    path: [:0]const u8,
    flags: os.mode_t,
    mode: os.mode_t,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const OpenError!linux.fd_t, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    const open_flags = @intCast(u32, flags);

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .open = .{
            .path = path,
            .flags = open_flags,
            .mode = mode,
        } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Fills `completion` with a vectored send of `buffer` on `socket`, as a
/// single-element iovec, and submits it through `enqueueNew`. The trailing
/// `u32` argument is ignored. `callback` later receives `context`, the
/// completion, and the byte count or a `SendError`.
pub fn writev(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
    _: u32,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const SendError!usize, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .writev = .{
            .socket = socket,
            .iovecs = .{.{ .iov_base = buffer.ptr, .iov_len = buffer.len }},
        } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `timeout` completion may hand to its callback. Expiry itself is
/// reported as success.
pub const TimeoutError = error{Canceled} || Errno;

/// Fills `completion` with a relative timeout of `nanoseconds` and submits it
/// through `enqueueNew`. `callback` later receives `context`, the completion,
/// and the outcome.
pub fn timeout(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: TimeoutError!void,
    ) void,
    completion: *Completion,
    nanoseconds: u63,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const TimeoutError!void, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .timeout = .{
                // Normalize into whole seconds plus a sub-second remainder.
                // Putting the full duration in tv_nsec yields tv_nsec >= 1e9
                // for any timeout of a second or more. That is not a valid
                // timespec and is rejected with EINVAL by timespec-consuming
                // syscalls such as nanosleep(2).
                .timespec = .{
                    .tv_sec = nanoseconds / std.time.ns_per_s,
                    .tv_nsec = nanoseconds % std.time.ns_per_s,
                },
            },
        },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Errors a `write` completion may hand to its callback.
pub const WriteError = error{
    WouldBlock,
    NotOpenForWriting,
    NotConnected,
    DiskQuota,
    FileTooBig,
    Alignment,
    InputOutput,
    NoSpaceLeft,
    Unseekable,
    AccessDenied,
    BrokenPipe,
} || Errno;

/// Fills `completion` with a write of `buffer` to `fd` at `offset` and submits
/// it through `enqueueNew`. `callback` later receives `context`, the
/// completion, and the byte count or a `WriteError`.
pub fn write(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: *const fn (
        context: Context,
        completion: *Completion,
        result: WriteError!usize,
    ) void,
    completion: *Completion,
    fd: os.fd_t,
    buffer: []const u8,
    offset: u64,
) void {
    // Converts the type-erased completion callback arguments back to typed ones.
    const Thunk = struct {
        fn invoke(erased_ctx: ?*anyopaque, comp: *Completion, erased_res: *const anyopaque) void {
            const typed_ctx = @intToPtr(Context, @ptrToInt(erased_ctx));
            const typed_res = @intToPtr(*const WriteError!usize, @ptrToInt(erased_res));
            callback(typed_ctx, comp, typed_res.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Thunk.invoke,
        .operation = .{ .write = .{
            .fd = fd,
            .buffer = buffer,
            .offset = offset,
        } },
    };
    self.enqueueNew(completion);
}
|
|
|
|
/// Submission hook for freshly prepared completions; currently a direct
/// forward to `enqueue`.
inline fn enqueueNew(self: *IO, completion: *Completion) void {
    return self.enqueue(completion);
}
|
|
|
|
/// Re-submits `completion` by forwarding it to `enqueue`.
pub fn wake(self: *IO, completion: *Completion) void {
    return self.enqueue(completion);
}
|
|
|
|
/// Errors `Syscall.socket` can report, in addition to the shared `Errno` set.
const SocketError = error{
    AddressFamilyNotSupported,
    ProtocolFamilyNotAvailable,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    ProtocolNotSupported,
    SocketTypeNotSupported,
    PermissionDenied,
} || Errno;

/// Thin wrappers around raw `linux` syscalls that translate errno codes into
/// Zig errors.
const Syscall = struct {
    /// Calls `linux.socket` and returns the new descriptor. Known errno values
    /// map to `SocketError` members; anything else is routed through `asError`.
    pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!os.socket_t {
        const rc = linux.socket(domain, socket_type, protocol);
        const errno = linux.getErrno(rc);
        switch (errno) {
            .SUCCESS => return @intCast(os.fd_t, rc),
            .ACCES => return error.PermissionDenied,
            .AFNOSUPPORT => return error.AddressFamilyNotSupported,
            .INVAL => return error.ProtocolFamilyNotAvailable,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .PROTONOSUPPORT => return error.ProtocolNotSupported,
            .PROTOTYPE => return error.SocketTypeNotSupported,
            else => |code| return asError(@enumToInt(code)),
        }
    }
};
|
|
|
|
/// Creates a socket via `Syscall.socket`, always OR-ing `SOCK.CLOEXEC` and
/// `SOCK.NONBLOCK` into `sock_type`.
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
    const flags = sock_type | os.SOCK.CLOEXEC | os.SOCK.NONBLOCK;
    return Syscall.socket(family, flags, protocol);
}

/// Shared IO instance; left `undefined` until initialized elsewhere.
pub var global: IO = undefined;
/// Presumably flipped to `true` once `global` has been initialized; confirm at
/// the initialization site.
pub var global_loaded: bool = false;
|
|
|
|
/// Clamps `buffer_len` to the largest byte count a single read/write-style
/// call accepts on the compilation target's OS.
fn buffer_limit(buffer_len: usize) usize {
    // Linux caps one pwrite()/pread() at 0x7ffff000 bytes on both 32-bit and
    // 64-bit systems: the return value is a signed C int, and the last 4096
    // values are reserved for errno codes.
    // Darwin rejects writes above 0x7fffffff bytes with EINVAL.
    // Elsewhere, the POSIX limit `std.math.maxInt(isize)` applies.
    const max_len: usize = switch (@import("builtin").target.os.tag) {
        .linux => 0x7ffff000,
        .macos, .ios, .watchos, .tvos => std.math.maxInt(i32),
        else => std.math.maxInt(isize),
    };
    return if (buffer_len > max_len) max_len else buffer_len;
}
|