Fall back to readev / writev

This commit is contained in:
Jarred Sumner
2022-01-24 23:30:23 -08:00
parent ecea12d206
commit 0e138bcc8f

View File

@@ -1,5 +1,6 @@
const std = @import("std");
const assert = std.debug.assert;
const Platform = @import("analytics").GenerateHeader.GeneratePlatform;
const os = struct {
pub usingnamespace std.os;
pub const EPERM = 1;
@@ -141,6 +142,16 @@ const os = struct {
pub const EHWPOISON = 133;
};
/// Debug switch: the default value of every flag in `Features`. Setting it to
/// `true` enables all fallback code paths without relying on the kernel-version
/// check in `init` to turn them on.
pub const pretend_older_kernel = false;

/// Runtime switches that select fallback implementations of individual
/// operations. Every flag starts out as `pretend_older_kernel`; `init` sets all
/// of them when the detected kernel version orders below 5.6.0.
const Features = struct {
    /// When set, `recv` delegates to `readev`.
    replace_recv_with_readv: bool = pretend_older_kernel,
    /// When set, `send` delegates to `writev`.
    replace_send_with_writev: bool = pretend_older_kernel,
    /// When set, `connect` builds a `.connect_poll` operation instead of `.connect`.
    connect_poll: bool = pretend_older_kernel,
    /// When set, `close` builds a `.close_poll` operation instead of `.close`.
    close_poll: bool = pretend_older_kernel,
};

/// File-scope instance of the flags, consulted by `close`, `connect`, `recv`
/// and `send`, and updated by `init`.
var features: Features = .{};
pub const Errno = error{
EPERM,
ENOENT,
@@ -444,6 +455,26 @@ completed: FIFO(Completion) = .{},
pub fn init(entries_: u12, flags: u32) !IO {
var ring: IO_Uring = undefined;
var entries = entries_;
const kernel = Platform.kernelVersion();
if (kernel.orderWithoutTag(@TypeOf(kernel){ .major = 5, .minor = 6, .patch = 0 }) == .lt) {
features.close_poll = true;
features.connect_poll = true;
features.replace_recv_with_readv = true;
features.replace_send_with_writev = true;
}
var limit = linux.rlimit{ .cur = 0, .max = 0 };
if (linux.getrlimit(.MEMLOCK, &limit) == 0) {
if (limit.cur < 16 * 1024) {
return error.@"memlock is too low. Please increase it to at least 64k";
}
if (limit.cur < 128 * 1024) {
entries = @minimum(256, entries);
}
}
while (true) {
ring = IO_Uring.init(entries, flags) catch |err| {
if (err == error.SystemResources) {
@@ -640,6 +671,12 @@ pub const Completion = struct {
op.address.getOsSockLen(),
);
},
.close_poll => |op| {
linux.io_uring_prep_poll_add(sqe, op.fd, linux.POLL.HUP | linux.POLL.IN | linux.POLL.OUT);
},
.connect_poll => |*op| {
linux.io_uring_prep_poll_add(sqe, op.socket, linux.POLL.HUP | linux.POLL.OUT);
},
.fsync => |op| {
linux.io_uring_prep_fsync(sqe, op.fd, 0);
},
@@ -651,6 +688,14 @@ pub const Completion = struct {
op.offset,
);
},
.readev => {
var op = &completion.operation.readev;
linux.io_uring_prep_readv(sqe, op.socket, &op.iovecs, 0);
},
.writev => {
var op = &completion.operation.writev;
linux.io_uring_prep_writev(sqe, op.socket, &op.iovecs, 0);
},
.recv => |op| {
linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL);
},
@@ -696,6 +741,21 @@ pub const Completion = struct {
} else @intCast(os.socket_t, completion.result);
completion.callback(completion.context, completion, &result);
},
.close_poll => {
var op = &completion.operation.close_poll;
const rc = linux.close(op.fd);
completion.result = @intCast(i32, rc);
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
os.EBADF => error.FileDescriptorInvalid,
os.EDQUOT => error.DiskQuota,
os.EIO => error.InputOutput,
os.ENOSPC => error.NoSpaceLeft,
else => |errno| asError(errno),
} else assert(completion.result == 0);
completion.callback(completion.context, completion, &result);
},
.close => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
@@ -707,6 +767,35 @@ pub const Completion = struct {
} else assert(completion.result == 0);
completion.callback(completion.context, completion, &result);
},
.connect_poll => {
var op = &completion.operation.connect_poll;
const rc = linux.connect(op.socket, &op.address.any, op.address.getOsSockLen());
completion.result = @intCast(i32, rc);
const result = if (completion.result < 0) switch (-completion.result) {
os.EAGAIN, os.EWOULDBLOCK, os.EINPROGRESS, os.EINTR => {
completion.io.enqueue(completion);
return;
},
os.EACCES => error.AccessDenied,
os.EADDRINUSE => error.AddressInUse,
os.EADDRNOTAVAIL => error.AddressNotAvailable,
os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
os.EALREADY => error.OpenAlreadyInProgress,
os.EBADF => error.FileDescriptorInvalid,
os.ECONNREFUSED => error.ConnectionRefused,
os.ECONNRESET => error.ConnectionResetByPeer,
os.EISCONN => error.AlreadyConnected,
os.ENETUNREACH => error.NetworkUnreachable,
os.ENOENT => error.FileNotFound,
os.ENOTSOCK => error.FileDescriptorNotASocket,
os.EPERM => error.PermissionDenied,
os.EPROTOTYPE => error.ProtocolNotSupported,
os.ETIMEDOUT => error.ConnectionTimedOut,
else => |errno| asError(errno),
} else assert(completion.result == 0);
completion.callback(completion.context, completion, &result);
},
.connect => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {
@@ -770,7 +859,7 @@ pub const Completion = struct {
} else @intCast(usize, completion.result);
completion.callback(completion.context, completion, &result);
},
.recv => {
.readev, .recv => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {
completion.io.enqueue(completion);
@@ -787,7 +876,7 @@ pub const Completion = struct {
} else @intCast(usize, completion.result);
completion.callback(completion.context, completion, &result);
},
.send => {
.writev, .send => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {
completion.io.enqueue(completion);
@@ -859,10 +948,17 @@ const Operation = union(enum) {
close: struct {
fd: os.fd_t,
},
close_poll: struct {
fd: os.fd_t,
},
connect: struct {
socket: os.socket_t,
address: std.net.Address,
},
connect_poll: struct {
socket: os.socket_t,
address: std.net.Address,
},
fsync: struct {
fd: os.fd_t,
},
@@ -871,6 +967,14 @@ const Operation = union(enum) {
buffer: []u8,
offset: u64,
},
readev: struct {
socket: os.socket_t,
iovecs: [1]os.iovec,
},
writev: struct {
socket: os.socket_t,
iovecs: [1]os.iovec_const,
},
recv: struct {
socket: os.socket_t,
buffer: []u8,
@@ -969,10 +1073,25 @@ pub fn close(
);
}
}.wrapper,
.operation = .{
.operation = if (features.close_poll) .{
.close_poll = .{ .fd = fd },
} else .{
.close = .{ .fd = fd },
},
};
if (features.close_poll) {
const rc = linux.close(fd);
switch (linux.getErrno(rc)) {
.AGAIN, .INPROGRESS, .INTR => {},
else => {
completion.result = @intCast(i32, rc);
self.completed.push(completion);
return;
},
}
}
self.enqueue(completion);
}
@@ -1019,13 +1138,30 @@ pub fn connect(
);
}
}.wrapper,
.operation = .{
.operation = if (features.connect_poll) .{
.connect_poll = .{
.socket = socket,
.address = address,
},
} else .{
.connect = .{
.socket = socket,
.address = address,
},
},
};
if (features.connect_poll) {
const rc = linux.connect(socket, &address.any, address.getOsSockLen());
switch (linux.getErrno(rc)) {
.AGAIN, .INPROGRESS, .INTR => {},
else => {
completion.result = @intCast(i32, rc);
self.completed.push(completion);
return;
},
}
}
self.enqueue(completion);
}
@@ -1141,6 +1277,11 @@ pub fn recv(
socket: os.socket_t,
buffer: []u8,
) void {
if (features.replace_recv_with_readv) {
readev(self, Context, context, callback, completion, socket, buffer);
return;
}
completion.* = .{
.io = self,
.context = context,
@@ -1163,6 +1304,41 @@ pub fn recv(
self.enqueue(completion);
}
/// Queues a vectored read from `socket` into `buffer`.
///
/// The completion is populated with a `.readev` operation holding a single
/// iovec that spans the whole of `buffer`, then handed to `self.enqueue`.
/// `recv` forwards here when `features.replace_recv_with_readv` is set, and the
/// result is delivered to `callback` as `RecvError!usize`, exactly as for `recv`.
///
/// NOTE(review): the iovec stored in `completion` points into `buffer`, so both
/// presumably have to stay valid until the callback has run — confirm with callers.
pub fn readev(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: RecvError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []u8,
) void {
    const Result = RecvError!usize;

    // Type-erased trampoline: restores the caller's context and result types
    // before invoking the user-supplied callback.
    const Trampoline = struct {
        fn invoke(erased_context: ?*anyopaque, finished: *Completion, erased_result: *const anyopaque) void {
            const typed_context = @intToPtr(Context, @ptrToInt(erased_context));
            const typed_result = @intToPtr(*const Result, @ptrToInt(erased_result));
            callback(typed_context, finished, typed_result.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Trampoline.invoke,
        .operation = .{
            .readev = .{
                .socket = socket,
                // One iovec covering the entire destination buffer.
                .iovecs = .{
                    .{ .iov_base = buffer.ptr, .iov_len = buffer.len },
                },
            },
        },
    };

    self.enqueue(completion);
}
pub const SendError = error{
AccessDenied,
WouldBlock,
@@ -1192,6 +1368,11 @@ pub fn send(
buffer: []const u8,
_: u32,
) void {
if (features.replace_send_with_writev) {
writev(self, Context, context, callback, completion, socket, buffer, 0);
return;
}
completion.* = .{
.io = self,
.context = context,
@@ -1214,6 +1395,44 @@ pub fn send(
self.enqueue(completion);
}
/// Queues a vectored write of `buffer` to `socket`.
///
/// The completion is populated with a `.writev` operation holding a single
/// iovec that spans the whole of `buffer`, then handed to `self.enqueue`.
/// `send` forwards here when `features.replace_send_with_writev` is set, and the
/// result is delivered to `callback` as `SendError!usize`, exactly as for `send`.
/// The trailing `u32` parameter is accepted for signature parity with `send`
/// and is ignored.
///
/// NOTE(review): the iovec stored in `completion` points into `buffer`, so both
/// presumably have to stay valid until the callback has run — confirm with callers.
pub fn writev(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
    _: u32,
) void {
    const Result = SendError!usize;

    // Type-erased trampoline: restores the caller's context and result types
    // before invoking the user-supplied callback.
    const Trampoline = struct {
        fn invoke(erased_context: ?*anyopaque, finished: *Completion, erased_result: *const anyopaque) void {
            const typed_context = @intToPtr(Context, @ptrToInt(erased_context));
            const typed_result = @intToPtr(*const Result, @ptrToInt(erased_result));
            callback(typed_context, finished, typed_result.*);
        }
    };

    completion.* = .{
        .io = self,
        .context = context,
        .callback = Trampoline.invoke,
        .operation = .{
            .writev = .{
                .socket = socket,
                // One iovec covering the entire source buffer.
                .iovecs = .{.{ .iov_base = buffer.ptr, .iov_len = buffer.len }},
            },
        },
    };

    self.enqueue(completion);
}
pub const TimeoutError = error{Canceled} || Errno;
pub fn timeout(
@@ -1314,7 +1533,7 @@ const SocketError = error{
const Syscall = struct {
pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!os.socket_t {
const rc = linux.socket(domain, socket_type, protocol);
return switch (linux.getErrno(rc)) {
return switch (linux.getErrno((rc))) {
.SUCCESS => @intCast(os.fd_t, rc),
.ACCES => return error.PermissionDenied,
.AFNOSUPPORT => return error.AddressFamilyNotSupported,
@@ -1325,13 +1544,13 @@ const Syscall = struct {
.NOMEM => return error.SystemResources,
.PROTONOSUPPORT => return error.ProtocolNotSupported,
.PROTOTYPE => return error.SocketTypeNotSupported,
else => |err| return asError(err),
else => |err| return asError(@enumToInt(err)),
};
}
};
/// Creates a socket by forwarding `family`, `sock_type` and `protocol` to
/// `Syscall.socket`, returning the new descriptor or the error it reports.
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
// NOTE(review): two consecutive `return` statements appear here. This looks like
// the before/after pair of a diff hunk (old line first, new line second) — confirm
// which one the file really contains. As written, only the first can execute.
return Syscall.socket(family, sock_type, protocol);
// This variant ORs `os.O.NONBLOCK | os.O.CLOEXEC` into the socket type before the call.
return Syscall.socket(family, sock_type | os.O.NONBLOCK | os.O.CLOEXEC, protocol);
}
pub var global: IO = undefined;