mirror of
https://github.com/oven-sh/bun
synced 2026-02-13 20:39:05 +00:00
win IPC yay
This commit is contained in:
@@ -805,7 +805,12 @@ pub const Subprocess = struct {
|
||||
|
||||
pub fn disconnect(this: *Subprocess) void {
|
||||
if (this.ipc_mode == .none) return;
|
||||
this.ipc.socket.close(0, null);
|
||||
if (Environment.isWindows) {
|
||||
this.ipc.pipe.data = this;
|
||||
this.ipc.close(Subprocess);
|
||||
} else {
|
||||
this.ipc.socket.close(0, null);
|
||||
}
|
||||
this.ipc_mode = .none;
|
||||
}
|
||||
|
||||
@@ -2193,11 +2198,6 @@ pub const Subprocess = struct {
|
||||
}
|
||||
|
||||
if (args.get(globalThis, "ipc")) |val| {
|
||||
if (Environment.isWindows) {
|
||||
globalThis.throwTODO("TODO: IPC is not yet supported on Windows");
|
||||
return .zero;
|
||||
}
|
||||
|
||||
if (val.isCell() and val.isCallable(globalThis.vm())) {
|
||||
// In the future, we should add a way to use a different IPC serialization format, specifically `json`.
|
||||
// but the only use case this has is doing interop with node.js IPC and other programs.
|
||||
@@ -2228,6 +2228,24 @@ pub const Subprocess = struct {
|
||||
env_array.capacity = env_array.items.len;
|
||||
}
|
||||
|
||||
const pipe_prefix = "BUN_INTERNAL_IPC_PIPE=\\\\.\\pipe\\BUN_IPC_";
|
||||
var pipe_env_bytes: [pipe_prefix.len + 37]u8 = undefined;
|
||||
|
||||
const pipe_name_bytes = pipe_env_bytes["BUN_INTERNAL_IPC_PIPE=".len..];
|
||||
|
||||
if (ipc_mode != .none) {
|
||||
if (comptime is_sync) {
|
||||
globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{});
|
||||
return .zero;
|
||||
}
|
||||
env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in uv_spawn");
|
||||
|
||||
const uuid = globalThis.bunVM().rareData().nextUUID();
|
||||
const pipe_env = std.fmt.bufPrintZ(&pipe_env_bytes, "{s}{s}", .{ pipe_prefix, uuid }) catch |err| return globalThis.handleError(err, "in uv_spawn");
|
||||
|
||||
env_array.appendAssumeCapacity(pipe_env);
|
||||
}
|
||||
|
||||
env_array.append(allocator, null) catch {
|
||||
globalThis.throwOutOfMemory();
|
||||
return .zero;
|
||||
@@ -2239,6 +2257,15 @@ pub const Subprocess = struct {
|
||||
globalThis.throwOutOfMemory();
|
||||
return .zero;
|
||||
};
|
||||
subprocess.ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) };
|
||||
if (ipc_mode != .none) {
|
||||
const errno = subprocess.ipc.configureServer(Subprocess, subprocess, pipe_name_bytes);
|
||||
if (errno != 0) {
|
||||
alloc.destroy(subprocess);
|
||||
globalThis.throwValue(bun.sys.Error.fromCodeInt(errno, .uv_spawn).toJSC(globalThis));
|
||||
return .zero;
|
||||
}
|
||||
}
|
||||
|
||||
var uv_stdio = [3]uv.uv_stdio_container_s{
|
||||
stdio[0].setUpChildIoUvSpawn(0, &subprocess.pipes[0], true, bun.invalid_fd) catch |err| {
|
||||
@@ -2286,6 +2313,7 @@ pub const Subprocess = struct {
|
||||
.pid = subprocess.pid,
|
||||
.pidfd = 0,
|
||||
.stdin = Writable.initWithPipe(stdio[0], &subprocess.pipes[0], globalThis) catch {
|
||||
alloc.destroy(subprocess);
|
||||
globalThis.throwOutOfMemory();
|
||||
return .zero;
|
||||
},
|
||||
@@ -2295,16 +2323,15 @@ pub const Subprocess = struct {
|
||||
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
|
||||
|
||||
.ipc_mode = ipc_mode,
|
||||
.ipc = undefined,
|
||||
.ipc_callback = undefined,
|
||||
.ipc = subprocess.ipc,
|
||||
.ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined,
|
||||
|
||||
.flags = .{
|
||||
.is_sync = is_sync,
|
||||
},
|
||||
};
|
||||
subprocess.pid.data = subprocess;
|
||||
std.debug.assert(ipc_mode == .none); //TODO:
|
||||
|
||||
subprocess.pid.data = subprocess;
|
||||
const out = if (comptime !is_sync) subprocess.toJS(globalThis) else .zero;
|
||||
subprocess.this_jsvalue = out;
|
||||
|
||||
@@ -3396,8 +3423,7 @@ pub const Subprocess = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void {
|
||||
// uSocket is already freed so calling .close() on the socket can segfault
|
||||
pub fn handleIPCClose(this: *Subprocess) void {
|
||||
this.ipc_mode = .none;
|
||||
this.updateHasPendingActivity();
|
||||
}
|
||||
|
||||
@@ -94,14 +94,14 @@ pub fn decodeIPCMessage(
|
||||
|
||||
pub const Socket = uws.NewSocketHandler(false);
|
||||
|
||||
pub const IPCData = struct {
|
||||
const SocketIPCData = struct {
|
||||
socket: Socket,
|
||||
incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well
|
||||
outgoing: IPCBuffer = .{},
|
||||
|
||||
has_written_version: if (Environment.allow_assert) u1 else u0 = 0,
|
||||
|
||||
pub fn writeVersionPacket(this: *IPCData) void {
|
||||
pub fn writeVersionPacket(this: *SocketIPCData) void {
|
||||
if (Environment.allow_assert) {
|
||||
std.debug.assert(this.has_written_version == 0);
|
||||
}
|
||||
@@ -120,7 +120,7 @@ pub const IPCData = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serializeAndSend(ipc_data: *IPCData, globalThis: *JSGlobalObject, value: JSValue) bool {
|
||||
pub fn serializeAndSend(ipc_data: *SocketIPCData, globalThis: *JSGlobalObject, value: JSValue) bool {
|
||||
if (Environment.allow_assert) {
|
||||
std.debug.assert(ipc_data.has_written_version == 1);
|
||||
}
|
||||
@@ -156,17 +156,162 @@ pub const IPCData = struct {
|
||||
}
|
||||
};
|
||||
|
||||
/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers
|
||||
///
|
||||
/// `Context` must be a struct that implements this interface:
|
||||
/// struct {
|
||||
/// globalThis: ?*JSGlobalObject,
|
||||
/// ipc: IPCData,
|
||||
///
|
||||
/// fn handleIPCMessage(*Context, DecodedIPCMessage) void
|
||||
/// fn handleIPCClose(*Context, Socket) void
|
||||
/// }
|
||||
pub fn NewIPCHandler(comptime Context: type) type {
|
||||
const NamedPipeIPCData = struct {
|
||||
const uv = bun.windows.libuv;
|
||||
|
||||
pipe: uv.uv_pipe_t,
|
||||
incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well
|
||||
outgoing: IPCBuffer = .{},
|
||||
connected: bool = false,
|
||||
has_written_version: if (Environment.allow_assert) u1 else u0 = 0,
|
||||
connect_req: uv.uv_connect_t = std.mem.zeroes(uv.uv_connect_t),
|
||||
server: uv.uv_pipe_t = std.mem.zeroes(uv.uv_pipe_t),
|
||||
current_payload_len: usize = 0,
|
||||
|
||||
pub fn processSend(this: *NamedPipeIPCData) void {
|
||||
const bytes = this.outgoing.list.slice();
|
||||
log("processSend {d}", .{bytes.len});
|
||||
if (bytes.len == 0) return;
|
||||
|
||||
const req = bun.new(uv.uv_write_t, std.mem.zeroes(uv.uv_write_t));
|
||||
req.data = @ptrCast(this);
|
||||
req.write_buffer = uv.uv_buf_t.init(bytes);
|
||||
log("processSend write_buffer {d}", .{req.write_buffer.len});
|
||||
this.current_payload_len = bytes.len;
|
||||
const write_err = uv.uv_write(req, @ptrCast(&this.pipe), @ptrCast(&req.write_buffer), 1, NamedPipeIPCData.uvWriteCallback).int();
|
||||
if (write_err < 0) {
|
||||
Output.printErrorln("Failed write IPC version", .{});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
fn uvWriteCallback(req: *uv.uv_write_t, status: uv.ReturnCode) callconv(.C) void {
|
||||
const this = bun.cast(*NamedPipeIPCData, req.data);
|
||||
log("uvWriteCallback {d} {d} {d}", .{ status.int(), this.current_payload_len, this.outgoing.list.len });
|
||||
defer bun.destroy(req);
|
||||
if (status.errEnum()) |_| {
|
||||
Output.printErrorln("Failed write IPC data", .{});
|
||||
return;
|
||||
}
|
||||
const n = this.current_payload_len;
|
||||
if (n == this.outgoing.list.len) {
|
||||
this.outgoing.cursor = 0;
|
||||
this.outgoing.list.len = 0;
|
||||
} else {
|
||||
this.outgoing.cursor += @intCast(n);
|
||||
this.processSend();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn writeVersionPacket(this: *NamedPipeIPCData) void {
|
||||
if (Environment.allow_assert) {
|
||||
std.debug.assert(this.has_written_version == 0);
|
||||
}
|
||||
const VersionPacket = extern struct {
|
||||
type: IPCMessageType align(1) = .Version,
|
||||
version: u32 align(1) = ipcVersion,
|
||||
};
|
||||
|
||||
if (Environment.allow_assert) {
|
||||
this.has_written_version = 1;
|
||||
}
|
||||
const bytes = comptime std.mem.asBytes(&VersionPacket{});
|
||||
// enqueue to be sent after connecting
|
||||
var list = this.outgoing.list.listManaged(bun.default_allocator);
|
||||
list.appendSlice(bytes) catch bun.outOfMemory();
|
||||
if (this.connected) {
|
||||
this.processSend();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serializeAndSend(this: *NamedPipeIPCData, globalThis: *JSGlobalObject, value: JSValue) bool {
|
||||
if (Environment.allow_assert) {
|
||||
std.debug.assert(this.has_written_version == 1);
|
||||
}
|
||||
|
||||
const serialized = value.serialize(globalThis) orelse return false;
|
||||
defer serialized.deinit();
|
||||
|
||||
const size: u32 = @intCast(serialized.data.len);
|
||||
log("serializeAndSend {d}", .{size});
|
||||
|
||||
const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size;
|
||||
|
||||
this.outgoing.list.ensureUnusedCapacity(bun.default_allocator, payload_length) catch @panic("OOM");
|
||||
const start_offset = this.outgoing.list.len;
|
||||
|
||||
this.outgoing.list.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage));
|
||||
this.outgoing.list.writeTypeAsBytesAssumeCapacity(u32, size);
|
||||
this.outgoing.list.appendSliceAssumeCapacity(serialized.data);
|
||||
|
||||
std.debug.assert(this.outgoing.list.len == start_offset + payload_length);
|
||||
|
||||
if (start_offset == 0) {
|
||||
std.debug.assert(this.outgoing.cursor == 0);
|
||||
if (this.connected) {
|
||||
this.processSend();
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
pub fn close(this: *NamedPipeIPCData, comptime Context: type) void {
|
||||
if (this.server.loop != null) {
|
||||
_ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onServerClose);
|
||||
} else {
|
||||
_ = uv.uv_close(@ptrCast(&this.pipe), NewNamedPipeIPCHandler(Context).onClose);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn configureServer(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int {
|
||||
log("configureServer", .{});
|
||||
const ipc_pipe = &this.server;
|
||||
|
||||
var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 0);
|
||||
if (errno != 0) {
|
||||
return errno;
|
||||
}
|
||||
ipc_pipe.data = @ptrCast(instance);
|
||||
errno = uv.uv_pipe_bind2(ipc_pipe, named_pipe.ptr, named_pipe.len, 0);
|
||||
if (errno != 0) {
|
||||
return errno;
|
||||
}
|
||||
errno = uv.uv_listen(@ptrCast(ipc_pipe), 0, NewNamedPipeIPCHandler(Context).onNewClientConnect);
|
||||
if (errno != 0) {
|
||||
return errno;
|
||||
}
|
||||
|
||||
uv.uv_pipe_pending_instances(ipc_pipe, 1);
|
||||
|
||||
uv.uv_unref(@ptrCast(ipc_pipe));
|
||||
|
||||
this.writeVersionPacket();
|
||||
return 0;
|
||||
}
|
||||
|
||||
pub fn configureClient(this: *NamedPipeIPCData, comptime Context: type, instance: *Context, named_pipe: []const u8) c_int {
|
||||
log("configureClient", .{});
|
||||
const ipc_pipe = &this.pipe;
|
||||
var errno = uv.uv_pipe_init(uv.Loop.get(), ipc_pipe, 1);
|
||||
if (errno != 0) {
|
||||
return errno;
|
||||
}
|
||||
ipc_pipe.data = @ptrCast(instance);
|
||||
this.connect_req.data = @ptrCast(instance);
|
||||
errno = uv.uv_pipe_connect2(&this.connect_req, ipc_pipe, named_pipe.ptr, named_pipe.len, 0, NewNamedPipeIPCHandler(Context).onConnect);
|
||||
if (errno != 0) {
|
||||
return errno;
|
||||
}
|
||||
|
||||
this.writeVersionPacket();
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
pub const IPCData = if (Environment.isWindows) NamedPipeIPCData else NamedPipeIPCData;
|
||||
|
||||
fn NewSocketIPCHandler(comptime Context: type) type {
|
||||
return struct {
|
||||
pub fn onOpen(
|
||||
_: *anyopaque,
|
||||
@@ -183,13 +328,13 @@ pub fn NewIPCHandler(comptime Context: type) type {
|
||||
|
||||
pub fn onClose(
|
||||
this: *Context,
|
||||
socket: Socket,
|
||||
_: Socket,
|
||||
_: c_int,
|
||||
_: ?*anyopaque,
|
||||
) void {
|
||||
// ?! does uSockets .close call onClose?
|
||||
log("onClose\n", .{});
|
||||
this.handleIPCClose(socket);
|
||||
this.handleIPCClose();
|
||||
}
|
||||
|
||||
pub fn onData(
|
||||
@@ -208,7 +353,7 @@ pub fn NewIPCHandler(comptime Context: type) type {
|
||||
if (this.globalThis) |global| {
|
||||
break :brk global;
|
||||
}
|
||||
this.handleIPCClose(socket);
|
||||
this.handleIPCClose();
|
||||
socket.close(0, null);
|
||||
return;
|
||||
},
|
||||
@@ -227,7 +372,7 @@ pub fn NewIPCHandler(comptime Context: type) type {
|
||||
},
|
||||
error.InvalidFormat => {
|
||||
Output.printErrorln("InvalidFormatError during IPC message handling", .{});
|
||||
this.handleIPCClose(socket);
|
||||
this.handleIPCClose();
|
||||
socket.close(0, null);
|
||||
return;
|
||||
},
|
||||
@@ -257,7 +402,7 @@ pub fn NewIPCHandler(comptime Context: type) type {
|
||||
},
|
||||
error.InvalidFormat => {
|
||||
Output.printErrorln("InvalidFormatError during IPC message handling", .{});
|
||||
this.handleIPCClose(socket);
|
||||
this.handleIPCClose();
|
||||
socket.close(0, null);
|
||||
return;
|
||||
},
|
||||
@@ -318,3 +463,157 @@ pub fn NewIPCHandler(comptime Context: type) type {
|
||||
) void {}
|
||||
};
|
||||
}
|
||||
|
||||
fn NewNamedPipeIPCHandler(comptime Context: type) type {
|
||||
const uv = bun.windows.libuv;
|
||||
return struct {
|
||||
fn uvStreamAllocCallback(handle: *uv.uv_handle_t, suggested_size: usize, buffer: *uv.uv_buf_t) callconv(.C) void {
|
||||
const this: *Context = @ptrCast(@alignCast(handle.data));
|
||||
|
||||
var available = this.ipc.incoming.available();
|
||||
if (available.len < suggested_size) {
|
||||
this.ipc.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size) catch bun.outOfMemory();
|
||||
available = this.ipc.incoming.available();
|
||||
}
|
||||
log("uvStreamAllocCallback {d}", .{suggested_size});
|
||||
buffer.* = .{ .base = @ptrCast(available.ptr), .len = @intCast(suggested_size) };
|
||||
}
|
||||
|
||||
fn uvStreamReadCallback(handle: *uv.uv_handle_t, nread: isize, buffer: *const uv.uv_buf_t) callconv(.C) void {
|
||||
log("uvStreamReadCallback {d}", .{nread});
|
||||
const this: *Context = @ptrCast(@alignCast(handle.data));
|
||||
if (nread <= 0) {
|
||||
switch (nread) {
|
||||
0 => {
|
||||
// EAGAIN or EWOULDBLOCK
|
||||
return;
|
||||
},
|
||||
uv.UV_EOF => {
|
||||
_ = uv.uv_read_stop(@ptrCast(handle));
|
||||
this.ipc.close(Context);
|
||||
},
|
||||
else => {
|
||||
_ = uv.uv_read_stop(@ptrCast(handle));
|
||||
this.ipc.close(Context);
|
||||
},
|
||||
}
|
||||
|
||||
// when nread < 0 buffer maybe not point to a valid address
|
||||
return;
|
||||
}
|
||||
|
||||
this.ipc.incoming.len += @as(u32, @truncate(buffer.len));
|
||||
var slice = this.ipc.incoming.slice();
|
||||
const globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) {
|
||||
.Pointer => this.globalThis,
|
||||
.Optional => brk: {
|
||||
if (this.globalThis) |global| {
|
||||
break :brk global;
|
||||
}
|
||||
this.handleIPCClose();
|
||||
this.ipc.close(Context);
|
||||
return;
|
||||
},
|
||||
else => @panic("Unexpected globalThis type: " ++ @typeName(@TypeOf(this.globalThis))),
|
||||
};
|
||||
while (true) {
|
||||
const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) {
|
||||
error.NotEnoughBytes => {
|
||||
// copy the remaining bytes to the start of the buffer
|
||||
bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice);
|
||||
this.ipc.incoming.len = @truncate(slice.len);
|
||||
log("hit NotEnoughBytes2", .{});
|
||||
return;
|
||||
},
|
||||
error.InvalidFormat => {
|
||||
Output.printErrorln("InvalidFormatError during IPC message handling", .{});
|
||||
this.handleIPCClose();
|
||||
this.ipc.close(Context);
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
this.handleIPCMessage(result.message);
|
||||
|
||||
if (result.bytes_consumed < slice.len) {
|
||||
slice = slice[result.bytes_consumed..];
|
||||
} else {
|
||||
// clear the buffer
|
||||
this.ipc.incoming.len = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onNewClientConnect(req: *uv.uv_stream_t, status: c_int) callconv(.C) void {
|
||||
log("onNewClientConnect {d}", .{status});
|
||||
if (status < 0) {
|
||||
Output.printErrorln("Failed to connect IPC pipe", .{});
|
||||
return;
|
||||
}
|
||||
const this = bun.cast(*Context, req.data);
|
||||
const client = &this.ipc.pipe;
|
||||
const server = &this.ipc.server;
|
||||
if (uv.uv_pipe_init(uv.Loop.get(), client, 1) != 0) {
|
||||
Output.printErrorln("Failed to connect IPC pipe", .{});
|
||||
return;
|
||||
}
|
||||
client.data = server.data;
|
||||
|
||||
if (uv.uv_accept(@ptrCast(server), @ptrCast(client)) == 0) {
|
||||
if (this.ipc.connected) {
|
||||
this.ipc.close(Context);
|
||||
return;
|
||||
}
|
||||
this.ipc.connected = true;
|
||||
_ = uv.uv_read_start(@ptrCast(client), uvStreamAllocCallback, uvStreamReadCallback);
|
||||
this.ipc.processSend();
|
||||
} else {
|
||||
this.ipc.close(Context);
|
||||
}
|
||||
}
|
||||
pub fn onConnect(req: *uv.uv_connect_t, status: c_int) callconv(.C) void {
|
||||
log("onConnect {d}", .{status});
|
||||
if (status < 0) {
|
||||
Output.printErrorln("Failed to connect IPC pipe", .{});
|
||||
return;
|
||||
}
|
||||
const this = bun.cast(*Context, req.data);
|
||||
_ = uv.uv_read_start(@ptrCast(&this.ipc.pipe), uvStreamAllocCallback, uvStreamReadCallback);
|
||||
this.ipc.connected = true;
|
||||
this.ipc.processSend();
|
||||
}
|
||||
pub fn onServerClose(handler: *anyopaque) callconv(.C) void {
|
||||
log("onServerClose", .{});
|
||||
const event = bun.cast(*uv.uv_pipe_t, handler);
|
||||
const this = bun.cast(*Context, event.data);
|
||||
this.handleIPCClose();
|
||||
}
|
||||
|
||||
pub fn onClose(handler: *anyopaque) callconv(.C) void {
|
||||
log("onClose", .{});
|
||||
const event = bun.cast(*uv.uv_pipe_t, handler);
|
||||
const this = bun.cast(*Context, event.data);
|
||||
if (this.ipc.server.loop != null) {
|
||||
_ = uv.uv_close(@ptrCast(&this.ipc.server), onServerClose);
|
||||
return;
|
||||
}
|
||||
this.handleIPCClose();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers
|
||||
///
|
||||
/// `Context` must be a struct that implements this interface:
|
||||
/// struct {
|
||||
/// globalThis: ?*JSGlobalObject,
|
||||
/// ipc: IPCData,
|
||||
///
|
||||
/// fn handleIPCMessage(*Context, DecodedIPCMessage) void
|
||||
/// fn handleIPCClose(*Context) void
|
||||
/// }
|
||||
pub fn NewIPCHandler(comptime Context: type) type {
|
||||
const IPCHandler = if (Environment.isWindows) NewNamedPipeIPCHandler else NewSocketIPCHandler;
|
||||
return IPCHandler(Context);
|
||||
}
|
||||
|
||||
@@ -95,6 +95,7 @@ const Lock = @import("../lock.zig").Lock;
|
||||
const BuildMessage = JSC.BuildMessage;
|
||||
const ResolveMessage = JSC.ResolveMessage;
|
||||
const Async = bun.Async;
|
||||
const uv = bun.windows.libuv;
|
||||
|
||||
pub const OpaqueCallback = *const fn (current: ?*anyopaque) callconv(.C) void;
|
||||
pub fn OpaqueWrap(comptime Context: type, comptime Function: fn (this: *Context) void) OpaqueCallback {
|
||||
@@ -760,11 +761,22 @@ pub const VirtualMachine = struct {
|
||||
this.hide_bun_stackframes = false;
|
||||
}
|
||||
|
||||
if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| {
|
||||
if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| {
|
||||
this.initIPCInstance(bun.toFD(fd));
|
||||
if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_PIPE")) |kv| {
|
||||
if (Environment.isWindows) {
|
||||
this.initIPCInstance(kv.value.value);
|
||||
} else {
|
||||
Output.printErrorln("Failed to connect into BUN_INTERNAL_IPC_PIPE", .{});
|
||||
}
|
||||
}
|
||||
if (map.map.fetchSwapRemove("BUN_INTERNAL_IPC_FD")) |kv| {
|
||||
if (Environment.isWindows) {
|
||||
Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{});
|
||||
} else {
|
||||
if (std.fmt.parseInt(i32, kv.value.value, 10) catch null) |fd| {
|
||||
this.initIPCInstance(bun.toFD(fd));
|
||||
} else {
|
||||
Output.printErrorln("Failed to parse BUN_INTERNAL_IPC_FD", .{});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3000,7 +3012,8 @@ pub const VirtualMachine = struct {
|
||||
|
||||
pub const IPCInstance = struct {
|
||||
globalThis: ?*JSGlobalObject,
|
||||
uws_context: *uws.SocketContext,
|
||||
context: if (Environment.isWindows) u0 else *uws.SocketContext,
|
||||
|
||||
ipc: IPC.IPCData,
|
||||
|
||||
pub fn handleIPCMessage(
|
||||
@@ -3023,36 +3036,51 @@ pub const VirtualMachine = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handleIPCClose(this: *IPCInstance, _: IPC.Socket) void {
|
||||
pub fn handleIPCClose(this: *IPCInstance) void {
|
||||
JSC.markBinding(@src());
|
||||
if (this.globalThis) |global| {
|
||||
var vm = global.bunVM();
|
||||
vm.ipc = null;
|
||||
Process__emitDisconnectEvent(global);
|
||||
}
|
||||
uws.us_socket_context_free(0, this.uws_context);
|
||||
|
||||
if (!Environment.isWindows) {
|
||||
uws.us_socket_context_free(0, this.context);
|
||||
}
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
pub const Handlers = IPC.NewIPCHandler(IPCInstance);
|
||||
};
|
||||
|
||||
pub fn initIPCInstance(this: *VirtualMachine, fd: bun.FileDescriptor) void {
|
||||
pub fn initIPCInstance(this: *VirtualMachine, source: if (Environment.isWindows) []const u8 else bun.FileDescriptor) void {
|
||||
this.event_loop.ensureWaker();
|
||||
|
||||
if (Environment.isWindows) {
|
||||
Output.warn("IPC is not supported on Windows", .{});
|
||||
var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory();
|
||||
instance.* = .{
|
||||
.globalThis = this.global,
|
||||
.context = 0,
|
||||
.ipc = .{ .pipe = std.mem.zeroes(uv.uv_pipe_t) },
|
||||
};
|
||||
const errno = instance.ipc.configureClient(IPCInstance, instance, source);
|
||||
if (errno != 0) {
|
||||
@panic("Unable to start IPC");
|
||||
}
|
||||
this.ipc = instance;
|
||||
return;
|
||||
}
|
||||
this.event_loop.ensureWaker();
|
||||
|
||||
const context = uws.us_create_socket_context(0, this.event_loop_handle.?, @sizeOf(usize), .{}).?;
|
||||
IPC.Socket.configure(context, true, *IPCInstance, IPCInstance.Handlers);
|
||||
|
||||
var instance = bun.default_allocator.create(IPCInstance) catch @panic("OOM");
|
||||
var instance = bun.default_allocator.create(IPCInstance) catch bun.outOfMemory();
|
||||
instance.* = .{
|
||||
.globalThis = this.global,
|
||||
.uws_context = context,
|
||||
.context = context,
|
||||
.ipc = undefined,
|
||||
};
|
||||
const socket = IPC.Socket.fromFd(context, fd, IPCInstance, instance, null) orelse @panic("Unable to start IPC");
|
||||
const socket = IPC.Socket.fromFd(context, source, IPCInstance, instance, null) orelse @panic("Unable to start IPC");
|
||||
socket.setTimeout(0);
|
||||
instance.ipc = .{ .socket = socket };
|
||||
|
||||
|
||||
@@ -950,7 +950,7 @@ const struct_unnamed_385 = extern struct {
|
||||
write_reqs_pending: c_uint,
|
||||
shutdown_req: [*c]uv_shutdown_t,
|
||||
};
|
||||
pub const uv_connection_cb = ?*const fn ([*c]uv_stream_t, c_int) callconv(.C) void;
|
||||
pub const uv_connection_cb = ?*const fn (*uv_stream_t, c_int) callconv(.C) void;
|
||||
const struct_unnamed_389 = extern struct {
|
||||
connection_cb: uv_connection_cb,
|
||||
};
|
||||
@@ -1520,7 +1520,7 @@ const union_unnamed_441 = extern union {
|
||||
connect: struct_unnamed_443,
|
||||
};
|
||||
pub const uv_connect_t = struct_uv_connect_s;
|
||||
pub const uv_connect_cb = ?*const fn ([*c]uv_connect_t, c_int) callconv(.C) void;
|
||||
pub const uv_connect_cb = ?*const fn (*uv_connect_t, c_int) callconv(.C) void;
|
||||
pub const struct_uv_connect_s = extern struct {
|
||||
data: ?*anyopaque,
|
||||
type: uv_req_type,
|
||||
|
||||
Reference in New Issue
Block a user