From 489ff14e4c8a9f6bdb497b4ae2288fe93010754d Mon Sep 17 00:00:00 2001 From: cirospaciari Date: Fri, 26 Jan 2024 12:00:36 -0300 Subject: [PATCH] WIP mixins --- src/bun.js/api/bun/subprocess.zig | 49 +++++---- src/bun.js/ipc.zig | 93 ++++++++-------- src/bun.js/javascript.zig | 15 +-- src/bun.js/webcore/blob.zig | 9 +- src/deps/libuv.zig | 172 ++++++++++++++++++++++++++++-- src/sys.zig | 8 ++ 6 files changed, 253 insertions(+), 93 deletions(-) diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 49f0fac92e..b2cfc0684e 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -411,7 +411,7 @@ pub const Subprocess = struct { .pipe => { if (this.pipe == .buffer) { if (Environment.isWindows) { - uv.uv_ref(@ptrCast(&this.pipe.buffer.stream)); + this.pipe.buffer.stream.unref(); return; } if (this.pipe.buffer.stream.poll_ref) |poll| { @@ -428,7 +428,7 @@ pub const Subprocess = struct { .pipe => { if (this.pipe == .buffer) { if (Environment.isWindows) { - uv.uv_unref(@ptrCast(&this.pipe.buffer.stream)); + this.pipe.buffer.stream.unref(); return; } if (this.pipe.buffer.stream.poll_ref) |poll| { @@ -554,7 +554,7 @@ pub const Subprocess = struct { switch (this.*) { .pipe => { if (Environment.isWindows) { - if (uv.uv_is_closed(@ptrCast(this.pipe.buffer.stream))) { + if (this.pipe.buffer.stream.isClosed()) { return false; } this.pipe.buffer.closeCallback = callback; @@ -958,7 +958,7 @@ pub const Subprocess = struct { if (this.pipe) |pipe| { pipe.data = this; - _ = uv.uv_close(@ptrCast(pipe), BufferedPipeInput.uvClosedCallback); + pipe.close(BufferedPipeInput.uvClosedCallback); } } @@ -1242,7 +1242,7 @@ pub const Subprocess = struct { } } - fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { + fn uvStreamReadCallback(handle: *uv.uv_stream_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); if (nread <= 0) { switch (nread) { @@ -1252,7 +1252,7 @@ pub const Subprocess = struct { }, uv.UV_EOF => { this.status = .{ .done = {} }; - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.flushBufferedDataIntoReadableStream(); }, else => { @@ -1261,7 +1261,7 @@ pub const Subprocess = struct { }; const err = rt.errEnum() orelse bun.C.E.CANCELED; this.status = .{ .err = bun.sys.Error.fromCode(err, .read) }; - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.signalStreamError(); }, } @@ -1274,7 +1274,7 @@ pub const Subprocess = struct { this.flushBufferedDataIntoReadableStream(); } - fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { + fn uvStreamAllocCallback(handle: *uv.uv_stream_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { const this: *BufferedOutput = @ptrCast(@alignCast(handle.data)); var size: usize = 0; var available = this.internal_buffer.available(); @@ -1296,7 +1296,7 @@ pub const Subprocess = struct { } buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(size) }; if (size == 0) { - _ = uv.uv_read_stop(@ptrCast(@alignCast(handle))); + handle.readStop(); this.status = .{ .done = {} }; } } @@ -1305,7 +1305,7 @@ pub const Subprocess = struct { if (Environment.isWindows) { if (this.status == .pending) { this.stream.data = this; - _ = uv.uv_read_start(@ptrCast(this.stream), BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback); + _ = this.stream.readStart(BufferedOutput.uvStreamAllocCallback, BufferedOutput.uvStreamReadCallback); } return; } @@ -1557,12 +1557,12 @@ pub const Subprocess = struct { .pending => { if (Environment.isWindows) { needCallbackCall = false; - _ = uv.uv_read_stop(@ptrCast(&this.stream)); - if (uv.uv_is_closed(@ptrCast(&this.stream))) { + this.stream.readStop(); + if (this.stream.isClosed()) { this.readable_stream_ref.deinit(); this.closeCallback.run(); } else { - _ = uv.uv_close(@ptrCast(&this.stream), BufferedOutput.uvClosedCallback); + this.stream.close(BufferedOutput.uvClosedCallback); } } else { this.stream.close(); @@ -1602,7 +1602,9 @@ pub const Subprocess = struct { switch (this.*) { .pipe => { if (Environment.isWindows) { - _ = uv.uv_ref(@ptrCast(this.pipe.stream)); + if (this.pipe.stream) |stream| { + stream.unref(); + } } else if (this.pipe.poll_ref) |poll| { poll.enableKeepingProcessAlive(JSC.VirtualMachine.get()); } @@ -1615,7 +1617,9 @@ pub const Subprocess = struct { switch (this.*) { .pipe => { if (Environment.isWindows) { - _ = uv.uv_unref(@ptrCast(this.pipe.stream)); + if (this.pipe.stream) |stream| { + stream.unref(); + } } else if (this.pipe.poll_ref) |poll| { poll.disableKeepingProcessAlive(JSC.VirtualMachine.get()); } @@ -2259,11 +2263,9 @@ pub const Subprocess = struct { }; subprocess.ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }; if (ipc_mode != .none) { - const errno = subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes); - if (errno != 0) { + if (subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes).asErr()) |err| { alloc.destroy(subprocess); - globalThis.throwValue(bun.sys.Error.fromCodeInt(errno, .uv_spawn).toJSC(globalThis)); - return .zero; + globalThis.throwValue(err.toJSC(globalThis)); } } @@ -3203,14 +3205,11 @@ pub const Subprocess = struct { ) !uv.uv_stdio_container_s { return switch (stdio) { .array_buffer, .blob, .pipe => { - if (uv.uv_pipe_init(uv.Loop.get(), pipe, 0) != 0) { - return error.FailedToCreatePipe; - } + try pipe.init(uv.Loop.get(), false).unwrap(); + if (fd != bun.invalid_fd) { // we receive a FD so we open this into our pipe - if (uv.uv_pipe_open(pipe, bun.uvfdcast(fd)).errEnum()) |_| { - return error.FailedToCreatePipe; - } + try pipe.open(bun.uvfdcast(fd)).unwrap(); return uv.uv_stdio_container_s{ .flags = @intCast(uv.UV_INHERIT_STREAM), .data = .{ .stream = @ptrCast(pipe) }, diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 8498dd6b0a..21c302a26f 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -11,6 +11,7 @@ const Allocator = std.mem.Allocator; const JSC = @import("root").bun.JSC; const JSValue = JSC.JSValue; const JSGlobalObject = JSC.JSGlobalObject; +const Maybe = JSC.Maybe; pub const log = Output.scoped(.IPC, false); @@ -162,11 +163,12 @@ const NamedPipeIPCData = struct { pipe: uv.uv_pipe_t, incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well outgoing: IPCBuffer = .{}, + current_payload_len: usize = 0, + connected: bool = false, has_written_version: if (Environment.allow_assert) u1 else u0 = 0, connect_req: uv.uv_connect_t = std.mem.zeroes(uv.uv_connect_t), server: uv.uv_pipe_t = std.mem.zeroes(uv.uv_pipe_t), - current_payload_len: usize = 0, pub fn processSend(this: *NamedPipeIPCData) void { const bytes = this.outgoing.list.slice()[this.outgoing.cursor..]; @@ -178,16 +180,16 @@ const NamedPipeIPCData = struct { req.write_buffer = uv.uv_buf_t.init(bytes); log("processSend write_buffer {d}", .{req.write_buffer.len}); this.current_payload_len = bytes.len; - const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.uvWriteCallback).int(); + const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.onWriteCallback).int(); if (write_err < 0) { Output.printErrorln("Failed write IPC version", .{}); return; } } - fn uvWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { + fn onWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { const this = bun.cast(*NamedPipeIPCData, req.data); - log("uvWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); + log("onWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); defer bun.destroy(req); if (status.errEnum()) |_| { Output.printErrorln("Failed write IPC data", .{}); @@ -258,54 +260,41 @@ const NamedPipeIPCData = struct { pub fn close(this: *NamedPipeIPCData, comptime Context: type) void { if (this.server.loop != null) { - _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onServerClose); + this.server.close(NewNamedPipeIPCHandler(Context).onServerClose); } else { - _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onClose); + this.pipe.close(NewNamedPipeIPCHandler(Context).onClose); } } - pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) Maybe(void) { log("configureServer", .{}); const ipc_pipe = &this.server; - var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 0); - if (errno != 0) { - return errno; + if (ipc_pipe.init(uv.Loop.get(), false).asErr()) |err| { + return .{ .err = err }; } ipc_pipe.data = @ptrCast(instance); - errno = uv.uv_pipe_bind2(ipc_pipe, named_pipe.ptr, named_pipe.len, 0); - if (errno != 0) { - return errno; - } - errno = uv.uv_listen(@ptrCast(ipc_pipe), 0, NewNamedPipeIPCHandler(Context).onNewClientConnect); - if (errno != 0) { - return errno; + if (ipc_pipe.listenNamedPipe(named_pipe, 0, NewNamedPipeIPCHandler(Context).onNewClientConnect).asErr()) |err| { + return .{ .err = err }; } - uv.uv_pipe_pending_instances(ipc_pipe, 1); + ipc_pipe.setPendingInstances(1); - uv.uv_unref(@ptrCast(ipc_pipe)); + ipc_pipe.unref(); this.writeVersionPacket(); - return 0; + return .{ .result = {} }; } - pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) !void { log("configureClient", .{}); const ipc_pipe = &this.pipe; - var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 1); - if (errno != 0) { - return errno; - } + try ipc_pipe.init(uv.Loop.get(), true).unwrap(); ipc_pipe.data = @ptrCast(instance); this.connect_req.data = @ptrCast(instance); - errno = uv.uv_pipe_connect2(&this.connect_req, ipc_pipe, named_pipe.ptr, named_pipe.len, 0, NewNamedPipeIPCHandler(Context).onConnect); - if (errno != 0) { - return errno; - } + try ipc_pipe.connect(&this.connect_req, named_pipe, NewNamedPipeIPCHandler(Context).onConnect).unwrap(); this.writeVersionPacket(); - return 0; } }; @@ -467,7 +456,7 @@ fn NewSocketIPCHandler(comptime Context: type) type { fn NewNamedPipeIPCHandler(comptime Context: type) type { const uv = bun.windows.libuv; return struct { - fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { + fn onStreamAlloc(handle: *uv.uv_stream_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { const this: *Context = @ptrCast(@alignCast(handle.data)); var available = this.ipc.incoming.available(); @@ -475,12 +464,12 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory(); available = this.ipc.incoming.available(); } - log("uvStreamAllocCallback {d}", .{suggested_size}); + log("onStreamAlloc {d}", .{suggested_size}); buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(suggested_size) }; } - fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { - log("uvStreamReadCallback {d}", .{nread}); + fn onRead(handle: *uv.uv_stream_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { + log("onRead {d}", .{nread}); const this: *Context = @ptrCast(@alignCast(handle.data)); if (nread <= 0) { switch (nread) { @@ -489,11 +478,11 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { return; }, uv.UV_EOF => { - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.ipc.close(Context); }, else => { - _ = uv.uv_read_stop(@ptrCast(handle)); + handle.readStop(); this.ipc.close(Context); }, } @@ -554,22 +543,26 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { const this = bun.cast(*Context, req.data); const client = &this.ipc.pipe; const server = &this.ipc.server; - if (uv.uv_pipe_init(uv.Loop.get(), client, 1) != 0) { + client.init(uv.Loop.get(), true).unwrap() catch { Output.printErrorln("Failed to connect IPC pipe", .{}); return; - } + }; client.data = server.data; - if (uv.uv_accept(@ptrCast(server), @ptrCast(client)) == 0) { - if (this.ipc.connected) { + switch (server.accept(client)) { + .err => { this.ipc.close(Context); return; - } - this.ipc.connected = true; - _ = uv.uv_read_start(@ptrCast(client), uvStreamAllocCallback, uvStreamReadCallback); - this.ipc.processSend(); - } else { - this.ipc.close(Context); + }, + .result => { + this.ipc.connected = true; + client.readStart(onStreamAlloc, onRead).unwrap() catch { + this.ipc.close(Context); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; + this.ipc.processSend(); + }, } } pub fn onConnect(req: *uv.uv_connect_t, status: c_int) callconv(.C) void { @@ -579,7 +572,11 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { return; } const this = bun.cast(*Context, req.data); - _ = uv.uv_read_start(@ptrCast(&this.ipc.pipe), uvStreamAllocCallback, uvStreamReadCallback); + this.ipc.pipe.readStart(onStreamAlloc, onRead).unwrap() catch { + this.ipc.close(Context); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; this.ipc.connected = true; this.ipc.processSend(); } @@ -595,7 +592,7 @@ fn NewNamedPipeIPCHandler(comptime Context: type) type { const event = bun.cast(*uv.uv_pipe_t, handler); const this = bun.cast(*Context, event.data); if (this.ipc.server.loop != null) { - _ = uv.uv_close(@ptrCast(&this.ipc.server), onServerClose); + this.ipc.server.close(onServerClose); return; } this.handleIPCClose(); diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index ff28f6bb02..82d3843a54 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -3016,6 +3016,8 @@ pub const VirtualMachine = struct { ipc: IPC.IPCData, + pub usingnamespace bun.New(@This()); + pub fn handleIPCMessage( this: *IPCInstance, message: IPC.DecodedIPCMessage, @@ -3057,16 +3059,17 @@ pub const VirtualMachine = struct { this.event_loop.ensureWaker(); if (Environment.isWindows) { - var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory(); - instance.* = .{ + var instance = IPCInstance.new(.{ .globalThis = this.global, .context = 0, .ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }, + }); + instance.ipc.configureClient(IPCInstance, instance, source) catch { + instance.destroy(); + Output.printErrorln("Unable to start IPC pipe", .{}); + return; }; - const errno = instance.ipc.configureClient(IPCInstance, instance, source); - if (errno != 0) { - @panic("Unable to start IPC"); - } + this.ipc = instance; return; } diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index 85a7dc2c87..0489803cfb 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -2959,17 +2959,18 @@ pub const Blob = struct { var pipe_ptr = &(this.store.?.data.file.pipe); if (store.data.file.pipe.loop == null) { - if (libuv.uv_pipe_init(libuv.Loop.get(), pipe_ptr, 0) != 0) { + pipe_ptr.init(libuv.Loop.get(), false).unwrap() catch { pipe_ptr.loop = null; globalThis.throwInvalidArguments("Failed to create UVStreamSink", .{}); return JSValue.jsUndefined(); - } + }; + const file_fd = bun.uvfdcast(fd); - if (libuv.uv_pipe_open(pipe_ptr, file_fd).errEnum()) |err| { + pipe_ptr.open(file_fd).unwrap() catch |err| { pipe_ptr.loop = null; globalThis.throwInvalidArguments("Failed to create UVStreamSink: uv_pipe_open({d}) {}", .{ file_fd, err }); return JSValue.jsUndefined(); - } + }; } var sink = JSC.WebCore.UVStreamSink.init(globalThis.allocator(), @ptrCast(pipe_ptr), null) catch |err| { diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index bdb3eb3dfa..6b82d6dc1d 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -1,4 +1,5 @@ const bun = @import("root").bun; +const Maybe = bun.JSC.Maybe; const WORD = c_ushort; const LARGE_INTEGER = i64; @@ -279,7 +280,6 @@ pub const UV_IF_NAMESIZE = @as(c_int, 16) + @as(c_int, 1); pub const uv__queue = struct_uv__queue; pub const uv_req_s = struct_uv_req_s; -pub const uv_handle_s = Handle; pub const uv_prepare_s = struct_uv_prepare_s; pub const uv_check_s = struct_uv_check_s; pub const uv_idle_s = struct_uv_idle_s; @@ -438,6 +438,10 @@ fn HandleMixin(comptime Type: type) type { pub fn isActive(this: *const Type) bool { return uv_is_active(@ptrCast(this)) != 0; } + + pub fn isClosed(this: *const Type) bool { + return uv_is_closed(@ptrCast(this)); + } }; } @@ -458,7 +462,104 @@ fn ReqMixin(comptime Type: type) type { } }; } -pub const uv_handle_t = Handle; + +// https://docs.libuv.org/en/v1.x/stream.html +fn StreamMixin(comptime Type: type) type { + return struct { + + // pub extern fn uv_write(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, cb: uv_write_cb) ReturnCode; + // pub extern fn uv_write2(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t, cb: uv_write_cb) ReturnCode; + pub fn getWriteQueueSize(this: *Type) usize { + return uv_stream_get_write_queue_size(@ptrCast(this)); + } + + pub fn listen(this: *Type, backlog: i32, cb: uv_connection_cb) Maybe(void) { + const rc = uv_listen(@ptrCast(this), backlog, cb); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .listen, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn accept(this: *Type, client: *Type) Maybe(void) { + const rc = uv_accept(@ptrCast(this), @ptrCast(client)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .accept, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub const stream_read_cb = ?*const fn (*uv_stream_t, isize, *const uv_buf_t) callconv(.C) void; + pub const stream_alloc_cb = ?*const fn (*uv_stream_t, usize, *uv_buf_t) callconv(.C) void; + + pub fn readStart(this: *Type, alloc_cb: stream_alloc_cb, read_cb: stream_read_cb) Maybe(void) { + const rc = uv_read_start(@ptrCast(this), @ptrCast(alloc_cb), @ptrCast(read_cb)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .listen, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn readStop(this: *Type) void { + // always succeed see https://docs.libuv.org/en/v1.x/stream.html#c.uv_read_stop + _ = uv_read_stop(@ptrCast(this)); + } + + pub fn tryWrite(this: *Type, buffer: *uv_buf_t) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(buffer), 1); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn tryWrite2(this: *Type, buffer: *uv_buf_t, send_handle: *uv_stream_t) ReturnCode { + const rc = uv_try_write2(@ptrCast(this), @ptrCast(buffer), 1, send_handle); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write2, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn tryWriteMany(this: *Type, buffers: []uv_buf_t) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(buffers.ptr), @intCast(buffers.len)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn tryWriteMany2(this: *Type, buffers: []uv_buf_t, send_handle: *uv_stream_t) Maybe(usize) { + const rc = uv_try_write(@ptrCast(this), @ptrCast(buffers.ptr), @intCast(buffers.len), send_handle); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .try_write2, .from_libuv = true } }; + } + return .{ .result = @intCast(rc.int()) }; + } + + pub fn isReadable(this: *Type) bool { + return uv_is_readable(@ptrCast(this)) != 0; + } + + pub fn isWritable(this: *@This()) bool { + return uv_is_writable(@ptrCast(this)) != 0; + } + }; +} + +pub const uv_handle_s = extern struct { + data: ?*anyopaque, + loop: ?*uv_loop_t, + type: uv_handle_type, + close_cb: uv_close_cb, + handle_queue: struct_uv__queue, + u: union_unnamed_378, + endgame_next: [*c]uv_handle_t, + flags: c_uint, + + pub usingnamespace HandleMixin(@This()); +}; +pub const uv_handle_t = uv_handle_s; const union_unnamed_375 = extern union { fd: c_int, reserved: [4]?*anyopaque, @@ -895,7 +996,7 @@ const union_unnamed_380 = extern union { }; pub const uv_alloc_cb = ?*const fn (*uv_handle_t, usize, *uv_buf_t) callconv(.C) void; pub const uv_stream_t = struct_uv_stream_s; -/// *uv.uv_handle_t is actually *uv_stream_t, just changed to avoid dependency loop error on Zig +/// *uv_handle_t is actually *uv_stream_t, just changed to avoid dependency loop error on Zig pub const uv_read_cb = ?*const fn (*uv_handle_t, isize, *const uv_buf_t) callconv(.C) void; const struct_unnamed_382 = extern struct { overlapped: OVERLAPPED, @@ -974,6 +1075,9 @@ pub const struct_uv_stream_s = extern struct { activecnt: c_int, read_req: uv_read_t, stream: union_unnamed_384, + + pub usingnamespace HandleMixin(@This()); + pub usingnamespace StreamMixin(@This()); }; const union_unnamed_390 = extern union { fd: c_int, @@ -1216,6 +1320,54 @@ pub const struct_uv_pipe_s = extern struct { handle: HANDLE, name: [*]WCHAR, pipe: union_unnamed_405, + + pub usingnamespace HandleMixin(@This()); + pub usingnamespace StreamMixin(@This()); + + pub fn init(this: *@This(), loop: *Loop, isIPC: bool) Maybe(void) { + @memset(std.mem.asBytes(this), 0); + + const rc = uv_pipe_init(loop, this, if (isIPC) 1 else 0); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .pipe, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn listenNamedPipe(this: *@This(), named_pipe: []const u8, backlog: i32, onClientConnect: uv_connection_cb) Maybe(void) { + if (this.bind(named_pipe, 0).asErr()) |err| { + return .{ .err = err }; + } + return this.listen(backlog, onClientConnect); + } + + pub fn bind(this: *@This(), named_pipe: []const u8, flags: i32) Maybe(void) { + const rc = uv_pipe_bind2(this, named_pipe.ptr, named_pipe.len, @intCast(flags)); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .bind2, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn connect(this: *@This(), req: *uv_connect_t, name: []const u8, cb: uv_connect_cb) Maybe(void) { + const rc = uv_pipe_connect2(req, this, @ptrCast(name.ptr), name.len, 0, cb); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .connect2, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn open(this: *@This(), file: uv_file) Maybe(void) { + const rc = uv_pipe_open(this, file); + if (rc.errno()) |errno| { + return .{ .err = .{ .errno = errno, .syscall = .open, .from_libuv = true } }; + } + return .{ .result = {} }; + } + + pub fn setPendingInstances(this: *@This(), count: i32) void { + uv_pipe_pending_instances(this, count); + } }; pub const uv_pipe_t = struct_uv_pipe_s; const union_unnamed_416 = extern union { @@ -1907,14 +2059,14 @@ pub extern fn uv_buf_init(base: [*]u8, len: c_uint) uv_buf_t; pub extern fn uv_pipe(fds: *[2]uv_file, read_flags: c_int, write_flags: c_int) ReturnCode; pub extern fn uv_socketpair(@"type": c_int, protocol: c_int, socket_vector: [*c]uv_os_sock_t, flags0: c_int, flags1: c_int) c_int; pub extern fn uv_stream_get_write_queue_size(stream: [*c]const uv_stream_t) usize; -pub extern fn uv_listen(stream: [*c]uv_stream_t, backlog: c_int, cb: uv_connection_cb) c_int; -pub extern fn uv_accept(server: [*c]uv_stream_t, client: [*c]uv_stream_t) c_int; -pub extern fn uv_read_start([*c]uv_stream_t, alloc_cb: uv_alloc_cb, read_cb: uv_read_cb) c_int; +pub extern fn uv_listen(stream: [*c]uv_stream_t, backlog: c_int, cb: uv_connection_cb) ReturnCode; +pub extern fn uv_accept(server: [*c]uv_stream_t, client: [*c]uv_stream_t) ReturnCode; +pub extern fn uv_read_start([*c]uv_stream_t, alloc_cb: uv_alloc_cb, read_cb: uv_read_cb) ReturnCode; pub extern fn uv_read_stop([*c]uv_stream_t) c_int; pub extern fn uv_write(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, cb: uv_write_cb) ReturnCode; pub extern fn uv_write2(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t, cb: uv_write_cb) ReturnCode; pub extern fn uv_try_write(handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint) ReturnCode; -pub extern fn uv_try_write2(handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t) c_int; +pub extern fn uv_try_write2(handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, send_handle: *uv_stream_t) ReturnCode; pub extern fn uv_is_readable(handle: *const uv_stream_t) c_int; pub extern fn uv_is_writable(handle: *const uv_stream_t) c_int; pub extern fn uv_stream_set_blocking(handle: *uv_stream_t, blocking: c_int) c_int; @@ -1977,12 +2129,12 @@ pub extern fn uv_tty_get_vterm_state(state: [*c]uv_tty_vtermstate_t) c_int; pub extern fn uv_guess_handle(file: uv_file) uv_handle_type; pub const UV_PIPE_NO_TRUNCATE: c_int = 1; const enum_unnamed_462 = c_uint; -pub extern fn uv_pipe_init(*uv_loop_t, handle: *uv_pipe_t, ipc: c_int) c_int; +pub extern fn uv_pipe_init(*uv_loop_t, handle: *uv_pipe_t, ipc: c_int) ReturnCode; pub extern fn uv_pipe_open([*c]uv_pipe_t, file: uv_file) ReturnCode; pub extern fn uv_pipe_bind(handle: *uv_pipe_t, name: [*]const u8) c_int; -pub extern fn uv_pipe_bind2(handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint) c_int; +pub extern fn uv_pipe_bind2(handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint) ReturnCode; pub extern fn uv_pipe_connect(req: [*c]uv_connect_t, handle: *uv_pipe_t, name: [*]const u8, cb: uv_connect_cb) void; -pub extern fn uv_pipe_connect2(req: [*c]uv_connect_t, handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint, cb: uv_connect_cb) c_int; +pub extern fn uv_pipe_connect2(req: [*c]uv_connect_t, handle: *uv_pipe_t, name: [*]const u8, namelen: usize, flags: c_uint, cb: uv_connect_cb) ReturnCode; pub extern fn uv_pipe_getsockname(handle: *const uv_pipe_t, buffer: [*]u8, size: [*c]usize) c_int; pub extern fn uv_pipe_getpeername(handle: *const uv_pipe_t, buffer: [*]u8, size: [*c]usize) c_int; pub extern fn uv_pipe_pending_instances(handle: *uv_pipe_t, count: c_int) void; diff --git a/src/sys.zig b/src/sys.zig index f978375452..8181c8955e 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -126,6 +126,14 @@ pub const Tag = enum(u8) { uv_spawn, uv_pipe, pipe, + connect, + connect2, + accept, + bind, + bind2, + listen, + try_write, + try_write2, WriteFile, NtQueryDirectoryFile,