diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 3dd2291cf5..49f0fac92e 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -805,7 +805,12 @@ pub const Subprocess = struct { pub fn disconnect(this: *Subprocess) void { if (this.ipc_mode == .none) return; - this.ipc.socket.close(0, null); + if (Environment.isWindows) { + this.ipc.pipe.data = this; + this.ipc.close(Subprocess); + } else { + this.ipc.socket.close(0, null); + } this.ipc_mode = .none; } @@ -2193,11 +2198,6 @@ pub const Subprocess = struct { } if (args.get(globalThis, "ipc")) |val| { - if (Environment.isWindows) { - globalThis.throwTODO("TODO: IPC is not yet supported on Windows"); - return .zero; - } - if (val.isCell() and val.isCallable(globalThis.vm())) { // In the future, we should add a way to use a different IPC serialization format, specifically `json`. // but the only use case this has is doing interop with node.js IPC and other programs. @@ -2228,6 +2228,24 @@ pub const Subprocess = struct { env_array.capacity = env_array.items.len; } + const pipe_prefix = "BUN_INTERNAL_IPC_PIPE=\\\\.\\pipe\\BUN_IPC_"; + var pipe_env_bytes: [pipe_prefix.len + 37]u8 = undefined; + + const pipe_name_bytes = pipe_env_bytes["BUN_INTERNAL_IPC_PIPE=".len..]; + + if (ipc_mode != .none) { + if (comptime is_sync) { + globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{}); + return .zero; + } + env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in uv_spawn"); + + const uuid = globalThis.bunVM().rareData().nextUUID(); + const pipe_env = std.fmt.bufPrintZ(&pipe_env_bytes, "{s}{s}", .{ pipe_prefix, uuid }) catch |err| return globalThis.handleError(err, "in uv_spawn"); + + env_array.appendAssumeCapacity(pipe_env); + } + env_array.append(allocator, null) catch { globalThis.throwOutOfMemory(); return .zero; @@ -2239,6 +2257,15 @@ pub const Subprocess = struct { 
globalThis.throwOutOfMemory(); return .zero; }; + subprocess.ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }; + if (ipc_mode != .none) { + const errno = subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes); + if (errno != 0) { + alloc.destroy(subprocess); + globalThis.throwValue(bun.sys.Error.fromCodeInt(errno, .uv_spawn).toJSC(globalThis)); + return .zero; + } + } var uv_stdio = [3]uv.uv_stdio_container_s{ stdio[0].setUpChildIoUvSpawn(0, &subprocess.pipes[0], true, bun.invalid_fd) catch |err| { @@ -2286,6 +2313,7 @@ pub const Subprocess = struct { .pid = subprocess.pid, .pidfd = 0, .stdin = Writable.initWithPipe(stdio[0], &subprocess.pipes[0], globalThis) catch { + alloc.destroy(subprocess); globalThis.throwOutOfMemory(); return .zero; }, @@ -2295,16 +2323,15 @@ pub const Subprocess = struct { .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, - .ipc = undefined, - .ipc_callback = undefined, + .ipc = subprocess.ipc, + .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else .{}, .flags = .{ .is_sync = is_sync, }, }; - subprocess.pid.data = subprocess; - std.debug.assert(ipc_mode == .none); //TODO: + subprocess.pid.data = subprocess; const out = if (comptime !is_sync) subprocess.toJS(globalThis) else .zero; subprocess.this_jsvalue = out; @@ -3396,8 +3423,7 @@ pub const Subprocess = struct { } } - pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void { - // uSocket is already freed so calling .close() on the socket can segfault + pub fn handleIPCClose(this: *Subprocess) void { this.ipc_mode = .none; this.updateHasPendingActivity(); } diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 9d8af127b7..5a5569a67a 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -94,14 +94,14 @@ pub fn decodeIPCMessage( pub const Socket = uws.NewSocketHandler(false); -pub const IPCData = struct { +const SocketIPCData = struct { 
socket: Socket, incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well outgoing: IPCBuffer = .{}, has_written_version: if (Environment.allow_assert) u1 else u0 = 0, - pub fn writeVersionPacket(this: *IPCData) void { + pub fn writeVersionPacket(this: *SocketIPCData) void { if (Environment.allow_assert) { std.debug.assert(this.has_written_version == 0); } @@ -120,7 +120,7 @@ pub const IPCData = struct { } } - pub fn serializeAndSend(ipc_data: *IPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + pub fn serializeAndSend(ipc_data: *SocketIPCData, globalThis: *JSGlobalObject, value: JSValue) bool { if (Environment.allow_assert) { std.debug.assert(ipc_data.has_written_version == 1); } @@ -156,17 +156,162 @@ pub const IPCData = struct { } }; -/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers -/// -/// `Context` must be a struct that implements this interface: -/// struct { -/// globalThis: ?*JSGlobalObject, -/// ipc: IPCData, -/// -/// fn handleIPCMessage(*Context, DecodedIPCMessage) void -/// fn handleIPCClose(*Context, Socket) void -/// } -pub fn NewIPCHandler(comptime Context: type) type { +const NamedPipeIPCData = struct { + const uv = bun.windows.libuv; + + pipe: uv.uv_pipe_t, + incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well + outgoing: IPCBuffer = .{}, + connected: bool = false, + has_written_version: if (Environment.allow_assert) u1 else u0 = 0, + connect_req: uv.uv_connect_t = std.mem.zeroes(uv.uv_connect_t), + server: uv.uv_pipe_t = std.mem.zeroes(uv.uv_pipe_t), + current_payload_len: usize = 0, + + pub fn processSend(this: *NamedPipeIPCData) void { + const bytes = this.outgoing.list.slice(); + log("processSend {d}", .{bytes.len}); + if (bytes.len == 0) return; + + const req = bun.new(uv.uv_write_t, std.mem.zeroes(uv.uv_write_t)); + req.data = @ptrCast(this); + req.write_buffer = uv.uv_buf_t.init(bytes); + log("processSend write_buffer {d}", 
.{req.write_buffer.len}); + this.current_payload_len = bytes.len; + const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.uvWriteCallback).int(); + if (write_err < 0) { + Output.printErrorln("Failed write IPC version", .{}); + return; + } + } + + fn uvWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void { + const this = bun.cast(*NamedPipeIPCData, req.data); + log("uvWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len }); + defer bun.destroy(req); + if (status.errEnum()) |_| { + Output.printErrorln("Failed write IPC data", .{}); + return; + } + const n = this.current_payload_len; + if (n == this.outgoing.list.len) { + this.outgoing.cursor = 0; + this.outgoing.list.len = 0; + } else { + this.outgoing.cursor += @intCast(n); + this.processSend(); + } + } + + pub fn writeVersionPacket(this: *NamedPipeIPCData) void { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 0); + } + const VersionPacket = extern struct { + type: IPCMessageType align(1) = .Version, + version: u32 align(1) = ipcVersion, + }; + + if (Environment.allow_assert) { + this.has_written_version = 1; + } + const bytes = comptime std.mem.asBytes(&VersionPacket{}); + // enqueue to be sent after connecting + var list = this.outgoing.list.listManaged(bun.default_allocator); + list.appendSlice(bytes) catch bun.outOfMemory(); + if (this.connected) { + this.processSend(); + } + } + + pub fn serializeAndSend(this: *NamedPipeIPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 1); + } + + const serialized = value.serialize(globalThis) orelse return false; + defer serialized.deinit(); + + const size: u32 = @intCast(serialized.data.len); + log("serializeAndSend {d}", .{size}); + + const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size; + + 
this.outgoing.list.ensureUnusedCapacity(bun.default_allocator, payload_length) catch @panic("OOM"); + const start_offset = this.outgoing.list.len; + + this.outgoing.list.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage)); + this.outgoing.list.writeTypeAsBytesAssumeCapacity(u32, size); + this.outgoing.list.appendSliceAssumeCapacity(serialized.data); + + std.debug.assert(this.outgoing.list.len == start_offset + payload_length); + + if (start_offset == 0) { + std.debug.assert(this.outgoing.cursor == 0); + if (this.connected) { + this.processSend(); + } + } + + return true; + } + + pub fn close(this: *NamedPipeIPCData, comptime Context: type) void { + if (this.server.loop != null) { + _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onServerClose); + } else { + _ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onClose); + } + } + + pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + log("configureServer", .{}); + const ipc_pipe = &this.server; + + var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 0); + if (errno != 0) { + return errno; + } + ipc_pipe.data = @ptrCast(instance); + errno = uv.uv_pipe_bind2(ipc_pipe, named_pipe.ptr, named_pipe.len, 0); + if (errno != 0) { + return errno; + } + errno = uv.uv_listen(@ptrCast(ipc_pipe), 0, NewNamedPipeIPCHandler(Context).onNewClientConnect); + if (errno != 0) { + return errno; + } + + uv.uv_pipe_pending_instances(ipc_pipe, 1); + + uv.uv_unref(@ptrCast(ipc_pipe)); + + this.writeVersionPacket(); + return 0; + } + + pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int { + log("configureClient", .{}); + const ipc_pipe = &this.pipe; + var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 1); + if (errno != 0) { + return errno; + } + ipc_pipe.data = @ptrCast(instance); + this.connect_req.data = @ptrCast(instance); + 
errno = uv.uv_pipe_connect2(&this.connect_req, ipc_pipe, named_pipe.ptr, named_pipe.len, 0, NewNamedPipeIPCHandler(Context).onConnect); + if (errno != 0) { + return errno; + } + + this.writeVersionPacket(); + return 0; + } +}; + +pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else SocketIPCData; + +fn NewSocketIPCHandler(comptime Context: type) type { return struct { pub fn onOpen( _: *anyopaque, @@ -183,13 +328,13 @@ pub fn NewIPCHandler(comptime Context: type) type { pub fn onClose( this: *Context, - socket: Socket, + _: Socket, _: c_int, _: ?*anyopaque, ) void { // ?! does uSockets .close call onClose? log("onClose\n", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); } pub fn onData( @@ -208,7 +353,7 @@ pub fn NewIPCHandler(comptime Context: type) type { if (this.globalThis) |global| { break :brk global; } - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -227,7 +372,7 @@ pub fn NewIPCHandler(comptime Context: type) type { }, error.InvalidFormat => { Output.printErrorln("InvalidFormatError during IPC message handling", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -257,7 +402,7 @@ pub fn NewIPCHandler(comptime Context: type) type { }, error.InvalidFormat => { Output.printErrorln("InvalidFormatError during IPC message handling", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -318,3 +463,157 @@ pub fn NewIPCHandler(comptime Context: type) type { ) void {} }; } + +fn NewNamedPipeIPCHandler(comptime Context: type) type { + const uv = bun.windows.libuv; + return struct { + fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void { + const this: *Context = @ptrCast(@alignCast(handle.data)); + + var available = this.ipc.incoming.available(); + if (available.len < suggested_size) { + 
this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory(); + available = this.ipc.incoming.available(); + } + log("uvStreamAllocCallback {d}", .{suggested_size}); + buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(suggested_size) }; + } + + fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void { + log("uvStreamReadCallback {d}", .{nread}); + const this: *Context = @ptrCast(@alignCast(handle.data)); + if (nread <= 0) { + switch (nread) { + 0 => { + // EAGAIN or EWOULDBLOCK + return; + }, + uv.UV_EOF => { + _ = uv.uv_read_stop(@ptrCast(handle)); + this.ipc.close(Context); + }, + else => { + _ = uv.uv_read_stop(@ptrCast(handle)); + this.ipc.close(Context); + }, + } + + // when nread < 0 buffer maybe not point to a valid address + return; + } + + this.ipc.incoming.len += @as(u32, @intCast(nread)); + var slice = this.ipc.incoming.slice(); + const globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) { + .Pointer => this.globalThis, + .Optional => brk: { + if (this.globalThis) |global| { + break :brk global; + } + this.handleIPCClose(); + this.ipc.close(Context); + return; + }, + else => @panic("Unexpected globalThis type: " ++ @typeName(@TypeOf(this.globalThis))), + }; + while (true) { + const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) { + error.NotEnoughBytes => { + // copy the remaining bytes to the start of the buffer + bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice); + this.ipc.incoming.len = @truncate(slice.len); + log("hit NotEnoughBytes2", .{}); + return; + }, + error.InvalidFormat => { + Output.printErrorln("InvalidFormatError during IPC message handling", .{}); + this.handleIPCClose(); + this.ipc.close(Context); + return; + }, + }; + + this.handleIPCMessage(result.message); + + if (result.bytes_consumed < slice.len) { + slice = slice[result.bytes_consumed..]; + } else { + // clear the buffer + 
this.ipc.incoming.len = 0; + return; + } + } + } + + pub fn onNewClientConnect(req: *uv.uv_stream_t, status: c_int) callconv(.C) void { + log("onNewClientConnect {d}", .{status}); + if (status < 0) { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + const this = bun.cast(*Context, req.data); + const client = &this.ipc.pipe; + const server = &this.ipc.server; + if (uv.uv_pipe_init(uv.Loop.get(), client, 1) != 0) { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + client.data = server.data; + + if (uv.uv_accept(@ptrCast(server), @ptrCast(client)) == 0) { + if (this.ipc.connected) { + this.ipc.close(Context); + return; + } + this.ipc.connected = true; + _ = uv.uv_read_start(@ptrCast(client), uvStreamAllocCallback, uvStreamReadCallback); + this.ipc.processSend(); + } else { + this.ipc.close(Context); + } + } + pub fn onConnect(req: *uv.uv_connect_t, status: c_int) callconv(.C) void { + log("onConnect {d}", .{status}); + if (status < 0) { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + const this = bun.cast(*Context, req.data); + _ = uv.uv_read_start(@ptrCast(&this.ipc.pipe), uvStreamAllocCallback, uvStreamReadCallback); + this.ipc.connected = true; + this.ipc.processSend(); + } + pub fn onServerClose(handler: *anyopaque) callconv(.C) void { + log("onServerClose", .{}); + const event = bun.cast(*uv.uv_pipe_t, handler); + const this = bun.cast(*Context, event.data); + this.handleIPCClose(); + } + + pub fn onClose(handler: *anyopaque) callconv(.C) void { + log("onClose", .{}); + const event = bun.cast(*uv.uv_pipe_t, handler); + const this = bun.cast(*Context, event.data); + if (this.ipc.server.loop != null) { + _ = uv.uv_close(@ptrCast(&this.ipc.server), onServerClose); + return; + } + this.handleIPCClose(); + } + }; +} + +/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers +/// +/// `Context` must be a struct that implements this interface: +/// struct { 
+/// globalThis: ?*JSGlobalObject, +/// ipc: IPCData, +/// +/// fn handleIPCMessage(*Context, DecodedIPCMessage) void +/// fn handleIPCClose(*Context) void +/// } +pub fn NewIPCHandler(comptime Context: type) type { + const IPCHandler = if (Environment.isWindows) NewNamedPipeIPCHandler else NewSocketIPCHandler; + return IPCHandler(Context); +} diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index d06d8b0701..ff28f6bb02 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -95,6 +95,7 @@ const Lock = @import("../lock.zig").Lock; const BuildMessage = JSC.BuildMessage; const ResolveMessage = JSC.ResolveMessage; const Async = bun.Async; +const uv = bun.windows.libuv; pub const OpaqueCallback = *const fn (current: ?*anyopaque) callconv(.C) void; pub fn OpaqueWrap(comptime Context: type, comptime Function: fn (this: *Context) void) OpaqueCallback { @@ -760,11 +761,22 @@ pub const VirtualMachine = struct { this.hide_bun_stackframes = false; } - if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| { - if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| { - this.initIPCInstance(bun.toFD(fd)); + if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_PIPE")) |kv| { + if (Environment.isWindows) { + this.initIPCInstance(kv.value.value); } else { + Output.printErrorln("Failed to connect into BUN_INTERNAL_IPC_PIPE", .{}); + } + } + if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| { + if (Environment.isWindows) { Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{}); + } else { + if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| { + this.initIPCInstance(bun.toFD(fd)); + } else { + Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{}); + } } } @@ -3000,7 +3012,8 @@ pub const VirtualMachine = struct { pub const IPCInstance = struct { globalThis: ?*JSGlobalObject, - uws_context: *uws.SocketContext, + context: if (Environment.isWindows) u0 else *uws.SocketContext, + ipc: IPC.IPCData, pub fn 
handleIPCMessage( @@ -3023,36 +3036,51 @@ pub const VirtualMachine = struct { } } - pub fn handleIPCClose(this: *IPCInstance, _: IPC.Socket) void { + pub fn handleIPCClose(this: *IPCInstance) void { JSC.markBinding(@src()); if (this.globalThis) |global| { var vm = global.bunVM(); vm.ipc = null; Process__emitDisconnectEvent(global); } - uws.us_socket_context_free(0, this.uws_context); + + if (!Environment.isWindows) { + uws.us_socket_context_free(0, this.context); + } bun.default_allocator.destroy(this); } pub const Handlers = IPC.NewIPCHandler(IPCInstance); }; - pub fn initIPCInstance(this: *VirtualMachine, fd: bun.FileDescriptor) void { + pub fn initIPCInstance(this: *VirtualMachine, source: if (Environment.isWindows) []const u8 else bun.FileDescriptor) void { + this.event_loop.ensureWaker(); + if (Environment.isWindows) { - Output.warn("IPC is not supported on Windows", .{}); + var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory(); + instance.* = .{ + .globalThis = this.global, + .context = 0, + .ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) }, + }; + const errno = instance.ipc.configureClient(IPCInstance, instance, source); + if (errno != 0) { + @panic("Unable to start IPC"); + } + this.ipc = instance; return; } - this.event_loop.ensureWaker(); + const context = uws.us_create_socket_context(0, this.event_loop_handle.?, @sizeOf(usize), .{}).?; IPC.Socket.configure(context, true, *IPCInstance, IPCInstance.Handlers); - var instance = bun.default_allocator.create(IPCInstance) catch @panic("OOM"); + var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory(); instance.* = .{ .globalThis = this.global, - .uws_context = context, + .context = context, .ipc = undefined, }; - const socket = IPC.Socket.fromFd(context, fd, IPCInstance, instance, null) orelse @panic("Unable to start IPC"); + const socket = IPC.Socket.fromFd(context, source, IPCInstance, instance, null) orelse @panic("Unable to start IPC"); socket.setTimeout(0); 
instance.ipc = .{ .socket = socket }; diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 2267766fed..bdb3eb3dfa 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -950,7 +950,7 @@ const struct_unnamed_385 = extern struct { write_reqs_pending: c_uint, shutdown_req: [*c]uv_shutdown_t, }; -pub const uv_connection_cb = ?*const fn ([*c]uv_stream_t, c_int) callconv(.C) void; +pub const uv_connection_cb = ?*const fn (*uv_stream_t, c_int) callconv(.C) void; const struct_unnamed_389 = extern struct { connection_cb: uv_connection_cb, }; @@ -1520,7 +1520,7 @@ const union_unnamed_441 = extern union { connect: struct_unnamed_443, }; pub const uv_connect_t = struct_uv_connect_s; -pub const uv_connect_cb = ?*const fn ([*c]uv_connect_t, c_int) callconv(.C) void; +pub const uv_connect_cb = ?*const fn (*uv_connect_t, c_int) callconv(.C) void; pub const struct_uv_connect_s = extern struct { data: ?*anyopaque, type: uv_req_type,