diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 4eab8074ae..3821765c72 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -1534,20 +1534,18 @@ pub const Subprocess = struct { // This must run before the stdio parsing happens if (args.getTruthy(globalThis, "ipc")) |val| { - if (Environment.isWindows) { - globalThis.throwTODO("TODO: IPC is not yet supported on Windows"); - return .zero; - } - if (val.isCell() and val.isCallable(globalThis.vm())) { // In the future, we should add a way to use a different IPC serialization format, specifically `json`. // but the only use case this has is doing interop with node.js IPC and other programs. ipc_mode = .bun; ipc_callback = val.withAsyncContextIfNeeded(globalThis); - extra_fds.append(.{ .buffer = {} }) catch { - globalThis.throwOutOfMemory(); - return .zero; - }; + + if (Environment.isPosix) { + extra_fds.append(.{ .buffer = {} }) catch { + globalThis.throwOutOfMemory(); + return .zero; + }; + } } } @@ -1706,27 +1704,34 @@ pub const Subprocess = struct { } } - // IPC is currently implemented in a very limited way. - // - // Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special - // runtime-owned version of "pipe" (in which pipe is a misleading name since they're bidirectional sockets). - // - // Bun currently only supports three fds: stdin, stdout, and stderr, which are all unidirectional - // - // And then fd 3 is assigned specifically and only for IPC. This is quite lame, because Node.js allows - // the ipc fd to be any number and it just works. But most people only care about the default `.fork()` - // behavior, where this workaround suffices. - // - // When Bun.spawn() is given an `.ipc` callback, it enables IPC as follows: - var socket: IPC.Socket = undefined; + var ipc_info: if (Environment.isPosix) IPC.Socket else [74]u8 = undefined; if (ipc_mode != .none) { if (comptime is_sync) { globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{}); return .zero; } - env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn"); - env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3"); + if (Environment.isPosix) { + // IPC is currently implemented in a very limited way. + // + // Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special + // runtime-owned version of "pipe" (in which pipe is a misleading name since they're bidirectional sockets). + // + // Bun currently only supports three fds: stdin, stdout, and stderr, which are all unidirectional + // + // And then fd 3 is assigned specifically and only for IPC. This is quite lame, because Node.js allows + // the ipc fd to be any number and it just works. But most people only care about the default `.fork()` + // behavior, where this workaround suffices. + // + // When Bun.spawn() is given an `.ipc` callback, it enables IPC as follows: + env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn"); + env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3"); + } else { + env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn"); + const uuid = globalThis.bunVM().rareData().nextUUID(); + const pipe_env = std.fmt.bufPrintZ(&ipc_info, "BUN_INTERNAL_IPC_FD=\\\\.\\pipe\\BUN_IPC_{s}", .{uuid}) catch |err| return globalThis.handleError(err, "in uv_spawn"); + env_array.appendAssumeCapacity(pipe_env); + } } env_array.append(allocator, null) catch { @@ -1777,22 +1782,21 @@ pub const Subprocess = struct { .result => |result| result, }; - if (ipc_mode != .none) { - if (Environment.isWindows) { - @panic("TODO: IPC"); + if (Environment.isPosix) { + if (ipc_mode != .none) { + ipc_info = .{ + // we initialize ext later in the function + .socket = uws.us_socket_from_fd( + jsc_vm.rareData().spawnIPCContext(jsc_vm), + @sizeOf(*Subprocess), + spawned.extra_pipes.items[0].cast(), + ) orelse { + globalThis.throw("failed to create socket pair", .{}); + // TODO: + return .zero; + }, + }; } - socket = .{ - // we initialize ext later in the function - .socket = uws.us_socket_from_fd( - jsc_vm.rareData().spawnIPCContext(jsc_vm), - @sizeOf(*Subprocess), - spawned.extra_pipes.items[0].cast(), - ) orelse { - globalThis.throw("failed to create socket pair", .{}); - // TODO: - return .zero; - }, - }; } var subprocess = globalThis.allocator().create(Subprocess) catch { @@ -1843,7 +1847,7 @@ pub const Subprocess = struct { .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .ipc_mode = ipc_mode, // will be assigned in the block below - .ipc = .{ .socket = socket }, + .ipc = if (Environment.isWindows) .{} else .{ .socket = ipc_info }, .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined, .flags = .{ .is_sync = is_sync, @@ -1852,8 +1856,16 @@ pub const Subprocess = struct { subprocess.process.setExitHandler(subprocess); if (ipc_mode != .none) { - const ptr = socket.ext(*Subprocess); - ptr.?.* = subprocess; + if (Environment.isPosix) { + const ptr = ipc_info.ext(*Subprocess); + ptr.?.* = subprocess; + } else { + if (subprocess.ipc.configureServer(Subprocess, subprocess, ipc_info[20..]).asErr()) |err| { + globalThis.allocator().destroy(subprocess); + globalThis.throwValue(err.toJSC(globalThis)); + return .zero; + } + } subprocess.ipc.writeVersionPacket(); } @@ -1986,7 +1998,7 @@ pub const Subprocess = struct { } } - pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void { + pub fn handleIPCClose(this: *Subprocess) void { // uSocket is already freed so calling .close() on the socket can segfault this.ipc_mode = .none; this.updateHasPendingActivity(); diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 9d8af127b7..b3601cfffc 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -35,11 +35,6 @@ pub const IPCMessageType = enum(u8) { _, }; -pub const IPCBuffer = struct { - list: bun.ByteList = .{}, - cursor: u32 = 0, -}; - /// Given potentially unfinished buffer `data`, attempt to decode and process a message from it. /// Returns `NotEnoughBytes` if there werent enough bytes /// Returns `InvalidFormat` if the message was invalid, probably close the socket in this case @@ -94,14 +89,14 @@ pub fn decodeIPCMessage( pub const Socket = uws.NewSocketHandler(false); -pub const IPCData = struct { +pub const SocketIPCData = struct { socket: Socket, - incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well - outgoing: IPCBuffer = .{}, + incoming: bun.ByteList = .{}, // Maybe we should use StreamBuffer here as well + outgoing: bun.io.StreamBuffer = .{}, has_written_version: if (Environment.allow_assert) u1 else u0 = 0, - pub fn writeVersionPacket(this: *IPCData) void { + pub fn writeVersionPacket(this: *SocketIPCData) void { if (Environment.allow_assert) { std.debug.assert(this.has_written_version == 0); } @@ -112,15 +107,14 @@ pub const IPCData = struct { const bytes = comptime std.mem.asBytes(&VersionPacket{}); const n = this.socket.write(bytes, false); if (n != bytes.len) { - var list = this.outgoing.list.listManaged(bun.default_allocator); - list.appendSlice(bytes) catch @panic("OOM"); + this.outgoing.write(bytes) catch bun.outOfMemory(); } if (Environment.allow_assert) { this.has_written_version = 1; } } - pub fn serializeAndSend(ipc_data: *IPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + pub fn serializeAndSend(ipc_data: *SocketIPCData, globalThis: *JSGlobalObject, value: JSValue) bool { if (Environment.allow_assert) { std.debug.assert(ipc_data.has_written_version == 1); } @@ -132,21 +126,22 @@ pub const IPCData = struct { const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size; - ipc_data.outgoing.list.ensureUnusedCapacity(bun.default_allocator, payload_length) catch @panic("OOM"); - const start_offset = ipc_data.outgoing.list.len; + ipc_data.outgoing.ensureUnusedCapacity(payload_length) catch bun.outOfMemory(); + //TODO: probably we should not direct access ipc_data.outgoing.list.items here + const start_offset = ipc_data.outgoing.list.items.len; - ipc_data.outgoing.list.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage)); - ipc_data.outgoing.list.writeTypeAsBytesAssumeCapacity(u32, size); - ipc_data.outgoing.list.appendSliceAssumeCapacity(serialized.data); + ipc_data.outgoing.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage)); + ipc_data.outgoing.writeTypeAsBytesAssumeCapacity(u32, size); + ipc_data.outgoing.writeAssumeCapacity(serialized.data); - std.debug.assert(ipc_data.outgoing.list.len == start_offset + payload_length); + std.debug.assert(ipc_data.outgoing.list.items.len == start_offset + payload_length); if (start_offset == 0) { std.debug.assert(ipc_data.outgoing.cursor == 0); - const n = ipc_data.socket.write(ipc_data.outgoing.list.ptr[start_offset..payload_length], false); + const n = ipc_data.socket.write(ipc_data.outgoing.list.items.ptr[start_offset..payload_length], false); if (n == payload_length) { - ipc_data.outgoing.list.len = 0; + ipc_data.outgoing.reset(); } else if (n > 0) { ipc_data.outgoing.cursor = @intCast(n); } @@ -156,17 +151,180 @@ pub const IPCData = struct { } }; -/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers -/// -/// `Context` must be a struct that implements this interface: -/// struct { -/// globalThis: ?*JSGlobalObject, -/// ipc: IPCData, -/// -/// fn handleIPCMessage(*Context, DecodedIPCMessage) void -/// fn handleIPCClose(*Context, Socket) void -/// } -pub fn NewIPCHandler(comptime Context: type) type { +const NamedPipeIPCData = struct { + const uv = bun.windows.libuv; + // we will use writer pipe as Duplex + writer: bun.io.StreamingWriter(NamedPipeIPCData, onWrite, onError, null, onClientClose) = .{}, + + incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well + connected: bool = false, + has_written_version: if (Environment.allow_assert) u1 else u0 = 0, + connect_req: uv.uv_connect_t = std.mem.zeroes(uv.uv_connect_t), + server: ?*uv.Pipe = null, + onClose: ?CloseHandler = null, + const CloseHandler = struct { + callback: *const fn (*anyopaque) void, + context: *anyopaque, + }; + + fn onWrite(_: *NamedPipeIPCData, amount: usize, done: bool) void { + log("onWrite {d} {}", .{ amount, done }); + } + + fn onError(_: *NamedPipeIPCData, err: bun.sys.Error) void { + log("Failed to write outgoing data {}", .{err}); + } + + fn onClientClose(this: *NamedPipeIPCData) void { + log("onClisentClose", .{}); + this.connected = false; + if (this.server) |server| { + // we must close the server too + server.close(onServerClose); + } else { + if (this.onClose) |handler| { + handler.callback(handler.context); + } + this.deinit(); + } + } + + fn onServerClose(pipe: *uv.Pipe) callconv(.C) void { + log("onServerClose", .{}); + const this = bun.cast(*NamedPipeIPCData, pipe.data); + this.server = null; + if (this.connected) { + // close and deinit client if connected + this.writer.deinit(); + return; + } + if (this.onClose) |handler| { + handler.callback(handler.context); + } + this.deinit(); + } + + pub fn writeVersionPacket(this: *NamedPipeIPCData) void { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 0); + } + const VersionPacket = extern struct { + type: IPCMessageType align(1) = .Version, + version: u32 align(1) = ipcVersion, + }; + + if (Environment.allow_assert) { + this.has_written_version = 1; + } + const bytes = comptime std.mem.asBytes(&VersionPacket{}); + if (this.connected) { + _ = this.writer.write(bytes); + } else { + // enqueue to be sent after connecting + this.writer.outgoing.write(bytes) catch bun.outOfMemory(); + } + } + + pub fn serializeAndSend(this: *NamedPipeIPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 1); + } + + const serialized = value.serialize(globalThis) orelse return false; + defer serialized.deinit(); + + const size: u32 = @intCast(serialized.data.len); + log("serializeAndSend {d}", .{size}); + + const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size; + + this.writer.outgoing.ensureUnusedCapacity(payload_length) catch @panic("OOM"); + const start_offset = this.writer.outgoing.list.items.len; + + this.writer.outgoing.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage)); + this.writer.outgoing.writeTypeAsBytesAssumeCapacity(u32, size); + this.writer.outgoing.writeAssumeCapacity(serialized.data); + + std.debug.assert(this.writer.outgoing.list.items.len == start_offset + payload_length); + + if (start_offset == 0) { + std.debug.assert(this.writer.outgoing.cursor == 0); + if (this.connected) { + _ = this.writer.flush(); + } + } + + return true; + } + + pub fn close(this: *NamedPipeIPCData) void { + if (this.server) |server| { + server.close(onServerClose); + } else { + this.writer.close(); + } + } + + pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) JSC.Maybe(void) { + log("configureServer", .{}); + const ipc_pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory(); + this.server = ipc_pipe; + ipc_pipe.data = this; + if (ipc_pipe.init(uv.Loop.get(), false).asErr()) |err| { + bun.default_allocator.destroy(ipc_pipe); + this.server = null; + return .{ .err = err }; + } + ipc_pipe.data = @ptrCast(instance); + this.onClose = .{ + .callback = @ptrCast(&NewNamedPipeIPCHandler(Context).onClose), + .context = @ptrCast(instance), + }; + if (ipc_pipe.listenNamedPipe(named_pipe, 0, instance, NewNamedPipeIPCHandler(Context).onNewClientConnect).asErr()) |err| { + bun.default_allocator.destroy(ipc_pipe); + this.server = null; + return .{ .err = err }; + } + + ipc_pipe.setPendingInstancesCount(1); + + ipc_pipe.unref(); + + return .{ .result = {} }; + } + + pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) !void { + log("configureClient", .{}); + const ipc_pipe = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory(); + ipc_pipe.init(uv.Loop.get(), true).unwrap() catch |err| { + bun.default_allocator.destroy(ipc_pipe); + return err; + }; + this.writer.startWithPipe(ipc_pipe).unwrap() catch |err| { + bun.default_allocator.destroy(ipc_pipe); + return err; + }; + this.connect_req.data = @ptrCast(instance); + this.onClose = .{ + .callback = @ptrCast(&NewNamedPipeIPCHandler(Context).onClose), + .context = @ptrCast(instance), + }; + try ipc_pipe.connect(&this.connect_req, named_pipe, instance, NewNamedPipeIPCHandler(Context).onConnect).unwrap(); + } + + fn deinit(this: *NamedPipeIPCData) void { + log("deinit", .{}); + this.writer.deinit(); + if (this.server) |server| { + bun.default_allocator.destroy(server); + } + this.incoming.deinitWithAllocator(bun.default_allocator); + } +}; + +pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else SocketIPCData; + +pub fn NewSocketIPCHandler(comptime Context: type) type { return struct { pub fn onOpen( _: *anyopaque, @@ -183,13 +341,13 @@ pub fn NewIPCHandler(comptime Context: type) type { pub fn onClose( this: *Context, - socket: Socket, + _: Socket, _: c_int, _: ?*anyopaque, ) void { // ?! does uSockets .close call onClose? log("onClose\n", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); } pub fn onData( @@ -208,7 +366,7 @@ pub fn NewIPCHandler(comptime Context: type) type { if (this.globalThis) |global| { break :brk global; } - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -221,13 +379,13 @@ pub fn NewIPCHandler(comptime Context: type) type { while (true) { const result = decodeIPCMessage(data, globalThis) catch |e| switch (e) { error.NotEnoughBytes => { - _ = this.ipc.incoming.write(bun.default_allocator, data) catch @panic("OOM"); + _ = this.ipc.incoming.write(bun.default_allocator, data) catch bun.outOfMemory(); log("hit NotEnoughBytes", .{}); return; }, error.InvalidFormat => { Output.printErrorln("InvalidFormatError during IPC message handling", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -243,7 +401,7 @@ pub fn NewIPCHandler(comptime Context: type) type { } } - _ = this.ipc.incoming.write(bun.default_allocator, data) catch @panic("OOM"); + _ = this.ipc.incoming.write(bun.default_allocator, data) catch bun.outOfMemory(); var slice = this.ipc.incoming.slice(); while (true) { @@ -257,7 +415,7 @@ pub fn NewIPCHandler(comptime Context: type) type { }, error.InvalidFormat => { Output.printErrorln("InvalidFormatError during IPC message handling", .{}); - this.handleIPCClose(socket); + this.handleIPCClose(); socket.close(0, null); return; }, @@ -279,16 +437,16 @@ pub fn NewIPCHandler(comptime Context: type) type { context: *Context, socket: Socket, ) void { - const to_write = context.ipc.outgoing.list.ptr[context.ipc.outgoing.cursor..context.ipc.outgoing.list.len]; + const to_write = context.ipc.outgoing.slice(); if (to_write.len == 0) { - context.ipc.outgoing.cursor = 0; - context.ipc.outgoing.list.len = 0; + context.ipc.outgoing.reset(); + context.ipc.outgoing.reset(); return; } const n = socket.write(to_write, false); if (n == to_write.len) { - context.ipc.outgoing.cursor = 0; - context.ipc.outgoing.list.len = 0; + context.ipc.outgoing.reset(); + context.ipc.outgoing.reset(); } else if (n > 0) { context.ipc.outgoing.cursor += @intCast(n); } @@ -318,3 +476,141 @@ pub fn NewIPCHandler(comptime Context: type) type { ) void {} }; } + +fn NewNamedPipeIPCHandler(comptime Context: type) type { + const uv = bun.windows.libuv; + return struct { + fn onReadAlloc(this: *Context, suggested_size: usize) []u8 { + var available = this.ipc.incoming.available(); + if (available.len < suggested_size) { + this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory(); + available = this.ipc.incoming.available(); + } + log("onReadAlloc {d}", .{suggested_size}); + return available.ptr[0..suggested_size]; + } + + fn onReadError(this: *Context, err: bun.C.E) void { + log("onReadError {}", .{err}); + this.ipc.close(); + } + + fn onRead(this: *Context, buffer: []const u8) void { + log("onRead {d}", .{buffer.len}); + this.ipc.incoming.len += @as(u32, @truncate(buffer.len)); + var slice = this.ipc.incoming.slice(); + const globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) { + .Pointer => this.globalThis, + .Optional => brk: { + if (this.globalThis) |global| { + break :brk global; + } + this.ipc.close(); + return; + }, + else => @panic("Unexpected globalThis type: " ++ @typeName(@TypeOf(this.globalThis))), + }; + while (true) { + const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) { + error.NotEnoughBytes => { + // copy the remaining bytes to the start of the buffer + bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice); + this.ipc.incoming.len = @truncate(slice.len); + log("hit NotEnoughBytes2", .{}); + return; + }, + error.InvalidFormat => { + Output.printErrorln("InvalidFormatError during IPC message handling", .{}); + this.ipc.close(); + return; + }, + }; + + this.handleIPCMessage(result.message); + + if (result.bytes_consumed < slice.len) { + slice = slice[result.bytes_consumed..]; + } else { + // clear the buffer + this.ipc.incoming.len = 0; + return; + } + } + } + + pub fn onNewClientConnect(this: *Context, status: uv.ReturnCode) void { + log("onNewClientConnect {d}", .{status.int()}); + if (status.errEnum()) |_| { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + const server = this.ipc.server orelse { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; + var client = bun.default_allocator.create(uv.Pipe) catch bun.outOfMemory(); + client.init(uv.Loop.get(), true).unwrap() catch { + bun.default_allocator.destroy(client); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; + + this.ipc.writer.startWithPipe(client).unwrap() catch { + bun.default_allocator.destroy(client); + Output.printErrorln("Failed to start IPC pipe", .{}); + return; + }; + + switch (server.accept(client)) { + .err => { + this.ipc.close(); + return; + }, + .result => { + this.ipc.connected = true; + client.readStart(this, onReadAlloc, onReadError, onRead).unwrap() catch { + this.ipc.close(); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; + _ = this.ipc.writer.flush(); + }, + } + } + + pub fn onClose(this: *Context) void { + this.handleIPCClose(); + } + + fn onConnect(this: *Context, status: uv.ReturnCode) void { + log("onConnect {d}", .{status.int()}); + this.ipc.connected = true; + + if (status.errEnum()) |_| { + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + } + this.ipc.writer.pipe.?.readStart(this, onReadAlloc, onReadError, onRead).unwrap() catch { + this.ipc.close(); + Output.printErrorln("Failed to connect IPC pipe", .{}); + return; + }; + _ = this.ipc.writer.flush(); + } + }; +} + +/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers +/// +/// `Context` must be a struct that implements this interface: +/// struct { +/// globalThis: ?*JSGlobalObject, +/// ipc: IPCData, +/// +/// fn handleIPCMessage(*Context, DecodedIPCMessage) void +/// fn handleIPCClose(*Context) void +/// } +pub fn NewIPCHandler(comptime Context: type) type { + const IPCHandler = if (Environment.isWindows) NewNamedPipeIPCHandler else NewSocketIPCHandler; + return IPCHandler(Context); +} diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 3913a0a9e8..dda1cb404e 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -115,6 +115,8 @@ const SourceMap = @import("../sourcemap/sourcemap.zig"); const ParsedSourceMap = SourceMap.Mapping.ParsedSourceMap; const MappingList = SourceMap.Mapping.List; +const uv = bun.windows.libuv; + pub const SavedSourceMap = struct { pub const vlq_offset = 24; @@ -756,7 +758,9 @@ pub const VirtualMachine = struct { } if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| { - if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| { + if (Environment.isWindows) { + this.initIPCInstance(kv.value.value); + } else if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| { this.initIPCInstance(bun.toFD(fd)); } else { Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{}); @@ -3128,9 +3132,11 @@ pub const VirtualMachine = struct { pub const IPCInstance = struct { globalThis: ?*JSGlobalObject, - uws_context: *uws.SocketContext, + context: if (Environment.isPosix) *uws.SocketContext else u0, ipc: IPC.IPCData, + pub usingnamespace bun.New(@This()); + pub fn handleIPCMessage( this: *IPCInstance, message: IPC.DecodedIPCMessage, @@ -3151,36 +3157,56 @@ pub const VirtualMachine = struct { } } - pub fn handleIPCClose(this: *IPCInstance, _: IPC.Socket) void { + pub fn handleIPCClose(this: *IPCInstance) void { JSC.markBinding(@src()); if (this.globalThis) |global| { var vm = global.bunVM(); vm.ipc = null; Process__emitDisconnectEvent(global); } - uws.us_socket_context_free(0, this.uws_context); - bun.default_allocator.destroy(this); + if (Environment.isPosix) { + uws.us_socket_context_free(0, this.context); + } + this.destroy(); } pub const Handlers = IPC.NewIPCHandler(IPCInstance); }; - pub fn initIPCInstance(this: *VirtualMachine, fd: bun.FileDescriptor) void { + const IPCInfoType = if (Environment.isWindows) []const u8 else bun.FileDescriptor; + pub fn initIPCInstance(this: *VirtualMachine, info: IPCInfoType) void { if (Environment.isWindows) { Output.warn("IPC is not supported on Windows", .{}); + + var instance = IPCInstance.new(.{ + .globalThis = this.global, + .context = 0, + .ipc = .{}, + }); + instance.ipc.configureClient(IPCInstance, instance, info) catch { + instance.destroy(); + Output.printErrorln("Unable to start IPC pipe", .{}); + return; + }; + + this.ipc = instance; + instance.ipc.writeVersionPacket(); return; } this.event_loop.ensureWaker(); const context = uws.us_create_socket_context(0, this.event_loop_handle.?, @sizeOf(usize), .{}).?; IPC.Socket.configure(context, true, *IPCInstance, IPCInstance.Handlers); - var instance = bun.default_allocator.create(IPCInstance) catch @panic("OOM"); - instance.* = .{ + var instance = IPCInstance.new(.{ .globalThis = this.global, - .uws_context = context, + .context = context, .ipc = undefined, + }); + const socket = IPC.Socket.fromFd(context, info, IPCInstance, instance, null) orelse { + instance.destroy(); + Output.printErrorln("Unable to start IPC socket", .{}); + return; }; - const socket = IPC.Socket.fromFd(context, fd, IPCInstance, instance, null) orelse @panic("Unable to start IPC"); socket.setTimeout(0); instance.ipc = .{ .socket = socket }; diff --git a/src/deps/libuv.zig b/src/deps/libuv.zig index 0e11e49536..c5301e8bc6 100644 --- a/src/deps/libuv.zig +++ b/src/deps/libuv.zig @@ -952,7 +952,7 @@ const struct_unnamed_385 = extern struct { write_reqs_pending: c_uint, shutdown_req: [*c]uv_shutdown_t, }; -pub const uv_connection_cb = ?*const fn ([*c]uv_stream_t, c_int) callconv(.C) void; +pub const uv_connection_cb = ?*const fn (*uv_stream_t, ReturnCode) callconv(.C) void; const struct_unnamed_389 = extern struct { connection_cb: uv_connection_cb, }; @@ -1261,6 +1261,38 @@ pub const Pipe = extern struct { return .{ .result = {} }; } + + pub fn listenNamedPipe(this: *@This(), named_pipe: []const u8, backlog: i32, context: anytype, comptime onClientConnect: *const (fn (@TypeOf(context), ReturnCode) void)) Maybe(void) { + if (this.bind(named_pipe, 0).asErr()) |err| { + return .{ .err = err }; + } + return this.listen(backlog, context, onClientConnect); + } + + pub fn bind(this: *@This(), named_pipe: []const u8, flags: i32) Maybe(void) { + if (uv_pipe_bind2(this, named_pipe.ptr, named_pipe.len, @intCast(flags)).toError(.bind2)) |err| { + return .{ .err = err }; + } + return .{ .result = {} }; + } + + pub fn connect(this: *@This(), req: *uv_connect_t, name: []const u8, context: anytype, comptime onConnect: *const (fn (@TypeOf(context), ReturnCode) void)) Maybe(void) { + this.data = @ptrCast(context); + const Wrapper = struct { + pub fn uvConnectCb(handle: *uv_connect_t, status: ReturnCode) callconv(.C) void { + onConnect(@ptrCast(@alignCast(handle.data)), status); + } + }; + + if (uv_pipe_connect2(req, this, @ptrCast(name.ptr), name.len, 0, &Wrapper.uvConnectCb).toError(.connect2)) |err| { + return .{ .err = err }; + } + return .{ .result = {} }; + } + + pub fn setPendingInstancesCount(this: *@This(), count: i32) void { + uv_pipe_pending_instances(this, count); + } }; const union_unnamed_416 = extern union { fd: c_int, @@ -1588,7 +1620,7 @@ const union_unnamed_441 = extern union { connect: struct_unnamed_443, }; pub const uv_connect_t = struct_uv_connect_s; -pub const uv_connect_cb = ?*const fn ([*c]uv_connect_t, c_int) callconv(.C) void; +pub const uv_connect_cb = ?*const fn (*uv_connect_t, ReturnCode) callconv(.C) void; pub const struct_uv_connect_s = extern struct { data: ?*anyopaque, type: uv_req_type, @@ -1974,8 +2006,8 @@ pub extern fn uv_buf_init(base: [*]u8, len: c_uint) uv_buf_t; pub extern fn uv_pipe(fds: *[2]uv_file, read_flags: c_int, write_flags: c_int) ReturnCode; pub extern fn uv_socketpair(@"type": c_int, protocol: c_int, socket_vector: [*]uv_os_sock_t, flags0: c_int, flags1: c_int) ReturnCode; pub extern fn uv_stream_get_write_queue_size(stream: [*c]const uv_stream_t) usize; -pub extern fn uv_listen(stream: [*c]uv_stream_t, backlog: c_int, cb: uv_connection_cb) c_int; -pub extern fn uv_accept(server: [*c]uv_stream_t, client: [*c]uv_stream_t) c_int; +pub extern fn uv_listen(stream: [*c]uv_stream_t, backlog: c_int, cb: uv_connection_cb) ReturnCode; +pub extern fn uv_accept(server: [*c]uv_stream_t, client: [*c]uv_stream_t) ReturnCode; pub extern fn uv_read_start(*uv_stream_t, alloc_cb: uv_alloc_cb, read_cb: uv_read_cb) ReturnCode; pub extern fn uv_read_stop(*uv_stream_t) ReturnCode; pub extern fn uv_write(req: *uv_write_t, handle: *uv_stream_t, bufs: [*]const uv_buf_t, nbufs: c_uint, cb: uv_write_cb) ReturnCode; @@ -2047,9 +2079,9 @@ const enum_unnamed_462 = c_uint; pub extern fn uv_pipe_init(*uv_loop_t, handle: *Pipe, ipc: c_int) ReturnCode; pub extern fn uv_pipe_open(*Pipe, file: uv_file) ReturnCode; pub extern fn uv_pipe_bind(handle: *Pipe, name: [*]const u8) c_int; -pub extern fn uv_pipe_bind2(handle: *Pipe, name: [*]const u8, namelen: usize, flags: c_uint) c_int; +pub extern fn uv_pipe_bind2(handle: *Pipe, name: [*]const u8, namelen: usize, flags: c_uint) ReturnCode; pub extern fn uv_pipe_connect(req: [*c]uv_connect_t, handle: *Pipe, name: [*]const u8, cb: uv_connect_cb) void; -pub extern fn uv_pipe_connect2(req: [*c]uv_connect_t, handle: *Pipe, name: [*]const u8, namelen: usize, flags: c_uint, cb: uv_connect_cb) c_int; +pub extern fn uv_pipe_connect2(req: [*c]uv_connect_t, handle: *Pipe, name: [*]const u8, namelen: usize, flags: c_uint, cb: uv_connect_cb) ReturnCode; pub extern fn uv_pipe_getsockname(handle: *const Pipe, buffer: [*]u8, size: [*c]usize) c_int; pub extern fn uv_pipe_getpeername(handle: *const Pipe, buffer: [*]u8, size: [*c]usize) c_int; pub extern fn uv_pipe_pending_instances(handle: *Pipe, count: c_int) void; @@ -2639,6 +2671,10 @@ pub const ReturnCodeI64 = enum(i64) { zero = 0, _, + pub fn init(i: i64) ReturnCodeI64 { + return @enumFromInt(i); + } + pub fn format(this: ReturnCodeI64, comptime fmt_: []const u8, options_: std.fmt.FormatOptions, writer: anytype) !void { _ = fmt_; _ = options_; @@ -2759,22 +2795,26 @@ fn StreamMixin(comptime Type: type) type { onConnect(@ptrCast(@alignCast(handle.data)), status); } }; - const rc = uv_listen(@ptrCast(this), backlog, &Wrapper.uvConnectCb); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .listen } }; + if (uv_listen(@ptrCast(this), backlog, &Wrapper.uvConnectCb).toError(.listen)) |err| { + return .{ .err = err }; } return .{ .result = {} }; } pub fn accept(this: *Type, client: *Type) Maybe(void) { - const rc = uv_accept(@ptrCast(this), @ptrCast(client)); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .accept } }; + if (uv_accept(@ptrCast(this), @ptrCast(client)).toError(.accept)) |err| { + return .{ .err = err }; } return .{ .result = {} }; } - pub fn readStart(this: *Type, context: anytype, comptime alloc_cb: *const (fn (@TypeOf(context), suggested_size: usize) []u8), comptime error_cb: *const (fn (@TypeOf(context), err: bun.C.E) void), comptime read_cb: *const (fn (@TypeOf(context), data: []const u8) void)) Maybe(void) { + pub fn readStart( + this: *Type, + context: anytype, + comptime alloc_cb: *const (fn (@TypeOf(context), suggested_size: usize) []u8), + comptime error_cb: *const (fn (@TypeOf(context), err: bun.C.E) void), + comptime read_cb: *const (fn (@TypeOf(context), data: []const u8) void), + ) Maybe(void) { const Context = @TypeOf(context); this.data = @ptrCast(context); const Wrapper = struct { @@ -2787,16 +2827,15 @@ fn StreamMixin(comptime Type: type) type { if (nreads == 0) return; // EAGAIN or EWOULDBLOCK if (nreads < 0) { req.readStop(); - const rc = ReturnCodeI64{ .value = nreads }; - error_cb(context_data, rc.errEnum() orelse bun.C.E.CANCELED); + error_cb(context_data, ReturnCodeI64.init(nreads).errEnum() orelse bun.C.E.CANCELED); } else { read_cb(context_data, buffer.slice()); } } }; - const rc = uv_read_start(@ptrCast(this), @ptrCast(&Wrapper.uvAllocb), @ptrCast(&Wrapper.uvReadcb)); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .listen } }; + + if (uv_read_start(@ptrCast(this), @ptrCast(&Wrapper.uvAllocb), @ptrCast(&Wrapper.uvReadcb)).toError(.listen)) |err| { + return .{ .err = err }; } return .{ .result = {} }; } @@ -2821,33 +2860,31 @@ fn StreamMixin(comptime Type: type) type { uv_data.data = context; uv_data.write_buffer = uv_buf_t.init(input); - const rc = uv_write(uv_data, @ptrCast(this), @ptrCast(&uv_data.write_buffer), 1, &Wrapper.uvWriteCb); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .write } }; + if (uv_write(uv_data, @ptrCast(this), @ptrCast(&uv_data.write_buffer), 1, &Wrapper.uvWriteCb).toError(.write)) |err| { + return .{ .err = err }; } return .{ .result = {} }; } var req: uv_write_t = std.mem.zeroes(uv_write_t); - const rc = uv_write(&req, this, @ptrCast(&uv_buf_t.init(input)), 1, null); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .write } }; + if (uv_write(&req, this, @ptrCast(&uv_buf_t.init(input)), 1, null).toError(.write)) |err| { + return .{ .err = err }; } return .{ .result = {} }; } pub fn tryWrite(this: *Type, input: []const u8) Maybe(usize) { const rc = uv_try_write(@ptrCast(this), @ptrCast(&uv_buf_t.init(input)), 1); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .try_write } }; + if (rc.toError(.try_write)) |err| { + return .{ .err = err }; } return .{ .result = @intCast(rc.int()) }; } pub fn tryWrite2(this: *Type, input: []const u8, send_handle: *uv_stream_t) ReturnCode { const rc = uv_try_write2(@ptrCast(this), @ptrCast(&uv_buf_t.init(input)), 1, send_handle); - if (rc.errno()) |errno| { - return .{ .err = .{ .errno = errno, .syscall = .try_write2 } }; + if (rc.toError(.try_write2)) |err| { + return .{ .err = err }; } return .{ .result = @intCast(rc.int()) }; } diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index e1271f48ae..6d20fd616e 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -889,6 +889,7 @@ pub const WindowsBufferedReader = struct { this.pipe = pipe; return this.startWithCurrentPipe(); } + pub fn start(this: *WindowsOutputReader, fd: bun.FileDescriptor, _: bool) bun.JSC.Maybe(void) { //TODO: check detect if its a tty here and use uv_tty_t instead of pipe std.debug.assert(this.pipe == null); diff --git a/src/io/PipeWriter.zig b/src/io/PipeWriter.zig index fa1d45d6b2..e3b34f85c0 100644 --- a/src/io/PipeWriter.zig +++ b/src/io/PipeWriter.zig @@ -334,6 +334,7 @@ pub fn PosixStreamingWriter( comptime onClose: fn (*Parent) void, ) type { return struct { + // TODO: replace buffer + head for StreamBuffer buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), handle: PollOrFd = .{ .closed = {} }, parent: *Parent = undefined, @@ -1036,7 +1037,7 @@ pub fn WindowsBufferedWriter( } /// Basic std.ArrayList(u8) + u32 cursor wrapper -const StreamBuffer = struct { +pub const StreamBuffer = struct { list: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator), // should cursor be usize? cursor: u32 = 0, @@ -1065,6 +1066,29 @@ const StreamBuffer = struct { _ = try this.list.appendSlice(buffer); } + pub fn writeAssumeCapacity(this: *StreamBuffer, buffer: []const u8) void { + var byte_list = bun.ByteList.fromList(this.list); + defer this.list = byte_list.listManaged(this.list.allocator); + byte_list.appendSliceAssumeCapacity(buffer); + } + + pub fn ensureUnusedCapacity(this: *StreamBuffer, capacity: usize) !void { + var byte_list = bun.ByteList.fromList(this.list); + defer this.list = byte_list.listManaged(this.list.allocator); + + _ = try byte_list.ensureUnusedCapacity(this.list.allocator, capacity); + } + + pub fn writeTypeAsBytes(this: *StreamBuffer, comptime T: type, data: *const T) !void { + _ = try this.write(std.mem.asBytes(data)); + } + + pub fn writeTypeAsBytesAssumeCapacity(this: *StreamBuffer, comptime T: type, data: T) void { + var byte_list = bun.ByteList.fromList(this.list); + defer this.list = byte_list.listManaged(this.list.allocator); + byte_list.writeTypeAsBytesAssumeCapacity(T, data); + } + pub fn writeLatin1(this: *StreamBuffer, buffer: []const u8) !void { if (bun.strings.isAllASCII(buffer)) { return this.write(buffer); diff --git a/src/io/io.zig b/src/io/io.zig index e291a51dcd..452f477dc0 100644 --- a/src/io/io.zig +++ b/src/io/io.zig @@ -932,3 +932,4 @@ pub const BufferedReader = @import("./PipeReader.zig").BufferedReader; pub const BufferedWriter = @import("./PipeWriter.zig").BufferedWriter; pub const WriteResult = @import("./PipeWriter.zig").WriteResult; pub const StreamingWriter = @import("./PipeWriter.zig").StreamingWriter; +pub const StreamBuffer = @import("./PipeWriter.zig").StreamBuffer; diff --git a/src/sys.zig b/src/sys.zig index 1481aa915a..bb9accc9c4 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -132,10 +132,14 @@ pub const Tag = enum(u8) { preadv, ioctl_ficlone, - uv_spawn, - uv_pipe, + accept, + bind2, + connect2, + listen, pipe, try_write, + uv_spawn, + uv_pipe, WriteFile, NtQueryDirectoryFile, NtSetInformationFile, diff --git a/test/js/bun/spawn/spawn.ipc.test.ts b/test/js/bun/spawn/spawn.ipc.test.ts new file mode 100644 index 0000000000..f492aa3ce1 --- /dev/null +++ b/test/js/bun/spawn/spawn.ipc.test.ts @@ -0,0 +1,36 @@ +import { spawn } from "bun"; +import { describe, expect, it } from "bun:test"; +import { gcTick, bunExe } from "harness"; +import path from "path"; + +describe("ipc", () => { + it("the subprocess should be defined and the child should send", done => { + gcTick(); + const returned_subprocess = spawn([bunExe(), path.join(__dirname, "bun-ipc-child.js")], { + ipc: (message, subProcess) => { + expect(subProcess).toBe(returned_subprocess); + expect(message).toBe("hello"); + subProcess.kill(); + done(); + gcTick(); + }, + }); + }); + + it("the subprocess should receive the parent message and respond back", done => { + gcTick(); + + const parentMessage = "I am your father"; + const childProc = spawn([bunExe(), path.join(__dirname, "bun-ipc-child-respond.js")], { + ipc: (message, subProcess) => { + expect(message).toBe(`pong:${parentMessage}`); + subProcess.kill(); + done(); + gcTick(); + }, + }); + + childProc.send(parentMessage); + gcTick(); + }); +}); diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index bccb55510b..4bf2882032 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -471,38 +471,6 @@ for (let [gcTick, label] of [ } }); - describe("ipc", () => { - it("the subprocess should be defined and the child should send", done => { - gcTick(); - const returned_subprocess = spawn([bunExe(), path.join(__dirname, "bun-ipc-child.js")], { - ipc: (message, subProcess) => { - expect(subProcess).toBe(returned_subprocess); - expect(message).toBe("hello"); - subProcess.kill(); - done(); - gcTick(); - }, - }); - }); - - it("the subprocess should receive the parent message and respond back", done => { - gcTick(); - - const parentMessage = "I am your father"; - const childProc = spawn([bunExe(), path.join(__dirname, "bun-ipc-child-respond.js")], { - ipc: (message, subProcess) => { - expect(message).toBe(`pong:${parentMessage}`); - subProcess.kill(); - done(); - gcTick(); - }, - }); - - childProc.send(parentMessage); - gcTick(); - }); - }); - it("throws errors for invalid arguments", async () => { expect(() => { spawnSync({