Compare commits

...

1 Commits

Author SHA1 Message Date
Cursor Agent
fb516e7264 Implement IPC socket handling and file descriptor passing in Bun 2025-06-05 23:48:28 +00:00
6 changed files with 192 additions and 20 deletions

View File

@@ -40,7 +40,7 @@ static StringView extractCookieName(const StringView& cookie)
{
auto nameEnd = cookie.find('=');
if (nameEnd == notFound)
return String();
return StringView();
return cookie.substring(0, nameEnd);
}

View File

@@ -1003,8 +1003,16 @@ pub fn doSend(ipc: ?*SendQueue, globalObject: *JSC.JSGlobalObject, callFrame: *J
},
.none => {},
}
} else if (JSC.API.TCPSocket.fromJS(handle)) |socket| {
log("got TCPSocket", .{});
const fd = socket.socket.fd();
zig_handle = .init(fd, handle);
} else if (JSC.API.TLSSocket.fromJS(handle)) |socket| {
log("got TLSSocket", .{});
const fd = socket.socket.fd();
zig_handle = .init(fd, handle);
} else {
//
log("unknown handle type", .{});
}
}

View File

@@ -145,16 +145,16 @@
* @param {{ keepOpen?: boolean } | undefined} options
* @returns {[unknown, Serialized] | null}
*/
export function serialize(_message, _handle, _options) {
// sending file descriptors is not supported yet
return null; // send the message without the file descriptor
/*
export function serialize(message, handle, options) {
const net = require("node:net");
const dgram = require("node:dgram");
console.log("[IPC serialize] message:", message, "handle:", handle, "handle type:", handle?.constructor?.name);
if (handle instanceof net.Server) {
// this one doesn't need a close function, but the fd needs to be kept alive until it is sent
const server = handle as unknown as (typeof net)["Server"] & { _handle: Bun.TCPSocketListener<unknown> };
if (!server._handle) return null;
return [server._handle, { cmd: "NODE_HANDLE", message, type: "net.Server" }];
} else if (handle instanceof net.Socket) {
const new_message: { cmd: "NODE_HANDLE"; message: unknown; type: "net.Socket"; key?: string } = {
@@ -162,25 +162,23 @@ export function serialize(_message, _handle, _options) {
message,
type: "net.Socket",
};
const socket = handle as unknown as (typeof net)["Socket"] & {
_handle: Bun.Socket;
server: (typeof net)["Server"] | null;
setTimeout(timeout: number): void;
};
const socket = handle as import("node:net").Socket;
console.log("[IPC serialize] socket._handle:", socket._handle);
if (!socket._handle) return null; // failed
// If the socket was created by net.Server
if (socket.server) {
// The worker should keep track of the socket
new_message.key = socket.server._connectionKey;
new_message.key = (socket.server as any)._connectionKey;
const firstTime = !this[kChannelHandle].sockets.send[message.key];
const socketList = getSocketList("send", this, message.key);
// TODO: Handle socket lists when cluster support is added
// const firstTime = !this[kChannelHandle].sockets.send[message.key];
// const socketList = getSocketList("send", this, message.key);
// The server should no longer expose a .connection property
// and when asked to close it should query the socket status from
// the workers
if (firstTime) socket.server._setupWorker(socketList);
// if (firstTime) socket.server._setupWorker(socketList);
// Act like socket is detached
if (!options?.keepOpen) socket.server._connections--;
@@ -192,10 +190,11 @@ export function serialize(_message, _handle, _options) {
// will be sent
if (!options?.keepOpen) {
// we can use a $newZigFunction to have it unset the callback
internal_handle.onread = nop;
socket._handle = null;
// internal_handle.onread = nop;
(socket as any)._handle = null;
socket.setTimeout(0);
}
console.log("[IPC serialize] returning handle and message");
return [internal_handle, new_message];
} else if (handle instanceof dgram.Socket) {
// this one doesn't need a close function, but the fd needs to be kept alive until it is sent
@@ -203,7 +202,6 @@ export function serialize(_message, _handle, _options) {
} else {
throw $ERR_INVALID_HANDLE_TYPE();
}
*/
}
/**
* @param {Serialized} serialized
@@ -212,6 +210,7 @@ export function serialize(_message, _handle, _options) {
* @returns {void}
*/
export function parseHandle(target, serialized, fd) {
console.log("[IPC parseHandle] target:", target, "serialized:", serialized, "fd:", fd);
const emit = $newZigFunction("ipc.zig", "emitHandleIPCMessage", 3);
const net = require("node:net");
// const dgram = require("node:dgram");
@@ -224,7 +223,25 @@ export function parseHandle(target, serialized, fd) {
return;
}
case "net.Socket": {
throw new Error("TODO case net.Socket");
console.log("[IPC parseHandle] Creating net.Socket with fd:", fd);
const socket = new net.Socket({
fd: fd,
readable: true,
writable: true,
});
// If the socket was created by net.Server we will track the socket
if (serialized.key) {
// TODO: Add socket to connections list when cluster support is added
// const socketList = getSocketList("got", this, message.key);
// socketList.add({
// socket: socket,
// });
}
console.log("[IPC parseHandle] Emitting socket, message:", serialized.message);
emit(target, serialized.message, socket);
return;
}
case "dgram.Socket": {
throw new Error("TODO case dgram.Socket");

View File

@@ -751,6 +751,34 @@ function Socket(options?) {
if (options?.fd !== undefined) {
const { fd } = options;
validateInt32(fd, "fd", 0);
// Create socket from fd immediately
if (!this._handle) {
this._handle = newDetachedSocket(false);
initSocketHandle(this);
}
// Connect using the fd
doConnect(this._handle, {
data: this,
fd: fd,
socket: this[khandlers],
allowHalfOpen: this.allowHalfOpen,
})
.then(() => {
// Socket is ready
this.connecting = false;
process.nextTick(() => {
this.emit("connect", this);
this.emit("ready");
});
})
.catch(error => {
if (!this.destroyed) {
this.emit("error", error);
this.emit("close", true);
}
});
}
if (socket instanceof Socket) {

69
test-ipc-socket.js Normal file
View File

@@ -0,0 +1,69 @@
const cp = require("child_process");
const net = require("net");
if (process.argv[2] !== "child") {
// Parent process
console.log("[Parent] Starting test...");
const child = cp.fork(__filename, ["child"]);
child.on("exit", code => {
console.log("[Parent] Child exited with code:", code);
});
child.on("message", msg => {
console.log("[Parent] Received message from child:", msg);
});
const server = net.createServer(socket => {
console.log("[Parent] Client connected, sending socket to child...");
// Send the socket to the child
child.send("socket", socket, { keepOpen: true }, err => {
if (err) {
console.error("[Parent] Error sending socket:", err);
} else {
console.log("[Parent] Socket sent successfully");
}
});
});
server.listen(0, () => {
const port = server.address().port;
console.log("[Parent] Server listening on port:", port);
// Connect to the server
const client = net.connect(port, "127.0.0.1");
client.on("connect", () => {
console.log("[Parent] Connected to server");
});
client.on("data", data => {
console.log("[Parent] Received data:", data.toString());
server.close();
child.disconnect();
});
});
} else {
// Child process
console.log("[Child] Started");
process.on("message", (msg, socket) => {
console.log("[Child] Received message:", msg);
console.log("[Child] Socket:", socket);
console.log("[Child] Socket type:", socket?.constructor?.name);
if (msg === "socket" && socket) {
try {
socket.write("Hello from child!", () => {
console.log("[Child] Data written to socket");
process.send("done");
});
} catch (err) {
console.error("[Child] Error writing to socket:", err);
}
} else {
console.error("[Child] Socket is undefined or null!");
}
});
}

View File

@@ -0,0 +1,50 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const cp = require('child_process');
const net = require('net');
if (process.argv[2] !== 'child') {
// The parent process forks a child process, starts a TCP server, and connects
// to the server. The accepted connection is passed to the child process,
// where the socket is written. Then, the child signals the parent process to
// write to the same socket.
let result = '';
process.on('exit', () => {
assert.strictEqual(result, 'childparent');
});
const child = cp.fork(__filename, ['child']);
// Verify that the child exits successfully
child.on('exit', common.mustCall((exitCode, signalCode) => {
assert.strictEqual(exitCode, 0);
assert.strictEqual(signalCode, null);
}));
const server = net.createServer((socket) => {
child.on('message', common.mustCall((msg) => {
assert.strictEqual(msg, 'child_done');
socket.end('parent', () => {
server.close();
child.disconnect();
});
}));
child.send('socket', socket, { keepOpen: true }, common.mustSucceed());
});
server.listen(0, () => {
const socket = net.connect(server.address().port, common.localhostIPv4);
socket.setEncoding('utf8');
socket.on('data', (data) => result += data);
});
} else {
// The child process receives the socket from the parent, writes data to
// the socket, then signals the parent process to write
process.on('message', common.mustCall((msg, socket) => {
assert.strictEqual(msg, 'socket');
socket.write('child', () => process.send('child_done'));
}));
}