fix(node:net): add Socket helper functions to socket.ts

Added Socket-specific connection helper functions (lookupAndConnect,
internalConnect, etc.) from the middle section of the original net.ts
to socket.ts where they belong.

This fixes Socket.prototype.connect() which was calling undefined
functions.

Test improvements: 6 pass → 13 pass (18 failures remaining)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude Bot
2025-10-19 23:27:51 +00:00
parent be2a24b3c1
commit cfc2cdffb0
2 changed files with 639 additions and 17 deletions

View File

@@ -1,7 +1,6 @@
export const Duplex = require("internal/streams/duplex");
export const { getDefaultHighWaterMark } = require("internal/streams/state");
export const EventEmitter = require("node:events");
export let dns: typeof import("node:dns");
export const normalizedArgsSymbol = Symbol("normalizedArgs");
export const { ExceptionWithHostPort } = require("internal/shared");

View File

@@ -1,23 +1,76 @@
import type { Socket, SocketHandler, SocketListener } from "bun";
import type { Server as NetServer, Socket as NetSocket, ServerOpts } from "node:net";
import type { Server as NetServer, Socket as NetSocket } from "node:net";
import type { TLSSocket } from "node:tls";
const {
Duplex, getDefaultHighWaterMark, EventEmitter, dns,
normalizedArgsSymbol, ExceptionWithHostPort, kTimeout, getTimerDuration,
validateFunction, validateNumber, validateAbortSignal, validatePort, validateBoolean, validateInt32, validateString,
NodeAggregateError, ErrnoException,
ArrayPrototypeIncludes, ArrayPrototypePush, MathMax,
UV_ECANCELED, UV_ETIMEDOUT, isWindows,
getDefaultAutoSelectFamily, setDefaultAutoSelectFamily, getDefaultAutoSelectFamilyAttemptTimeout, setDefaultAutoSelectFamilyAttemptTimeout,
SocketAddress, BlockList, newDetachedSocket, doConnect,
addServerName, upgradeDuplexToTLS, isNamedPipeSocket, getBufferedAmount,
isIPv4, isIPv6, isIP,
bunTlsSymbol, bunSocketServerOptions, owner_symbol,
kServerSocket, kBytesWritten, bunTLSConnectOptions, kReinitializeHandle,
kRealListen, kSetNoDelay, kSetKeepAlive, kSetKeepAliveInitialDelay, kConnectOptions, kAttach, kCloseRawConnection,
kpendingRead, kupgraded, ksocket, khandlers, kclosed, kended, kwriteCallback, kSocketClass,
endNT, emitCloseNT, detachSocket, destroyNT, destroyWhenAborted, onSocketEnd, writeAfterFIN, onConnectEnd
Duplex,
getDefaultHighWaterMark,
EventEmitter,
normalizedArgsSymbol,
ExceptionWithHostPort,
kTimeout,
getTimerDuration,
validateFunction,
validateNumber,
validateAbortSignal,
validatePort,
validateBoolean,
validateInt32,
validateString,
NodeAggregateError,
ErrnoException,
ArrayPrototypeIncludes,
ArrayPrototypePush,
MathMax,
UV_ECANCELED,
UV_ETIMEDOUT,
isWindows,
getDefaultAutoSelectFamily,
setDefaultAutoSelectFamily,
getDefaultAutoSelectFamilyAttemptTimeout,
setDefaultAutoSelectFamilyAttemptTimeout,
SocketAddress,
BlockList,
newDetachedSocket,
doConnect,
addServerName,
upgradeDuplexToTLS,
isNamedPipeSocket,
getBufferedAmount,
isIPv4,
isIPv6,
isIP,
bunTlsSymbol,
bunSocketServerOptions,
owner_symbol,
kServerSocket,
kBytesWritten,
bunTLSConnectOptions,
kReinitializeHandle,
kRealListen,
kSetNoDelay,
kSetKeepAlive,
kSetKeepAliveInitialDelay,
kConnectOptions,
kAttach,
kCloseRawConnection,
kpendingRead,
kupgraded,
ksocket,
khandlers,
kclosed,
kended,
kwriteCallback,
kSocketClass,
endNT,
emitCloseNT,
detachSocket,
destroyNT,
destroyWhenAborted,
onSocketEnd,
writeAfterFIN,
onConnectEnd,
} = require("internal/net/shared");
let dns: typeof import("node:dns");
const SocketHandlers: SocketHandler = {
close(socket, err) {
@@ -1425,3 +1478,573 @@ function isPipeName(s) {
function toNumber(x) {
return (x = Number(x)) >= 0 ? x : false;
}
function createConnection(...args) {
const normalized = normalizeArgs(args);
const options = normalized[0];
const socket = new Socket(options);
if (options.timeout) {
socket.setTimeout(options.timeout);
}
return socket.connect(normalized);
}
function lookupAndConnect(self, options) {
const { localAddress, localPort } = options;
const host = options.host || "localhost";
let { port, autoSelectFamilyAttemptTimeout, autoSelectFamily } = options;
validateString(host, "options.host");
if (localAddress && !isIP(localAddress)) {
throw $ERR_INVALID_IP_ADDRESS(localAddress);
}
if (localPort) {
validateNumber(localPort, "options.localPort");
}
if (typeof port !== "undefined") {
if (typeof port !== "number" && typeof port !== "string") {
throw $ERR_INVALID_ARG_TYPE("options.port", ["number", "string"], port);
}
validatePort(port);
}
port |= 0;
if (autoSelectFamily != null) {
validateBoolean(autoSelectFamily, "options.autoSelectFamily");
} else {
autoSelectFamily = getDefaultAutoSelectFamily();
}
if (autoSelectFamilyAttemptTimeout != null) {
validateInt32(autoSelectFamilyAttemptTimeout, "options.autoSelectFamilyAttemptTimeout", 1);
if (autoSelectFamilyAttemptTimeout < 10) {
autoSelectFamilyAttemptTimeout = 10;
}
} else {
autoSelectFamilyAttemptTimeout = getDefaultAutoSelectFamilyAttemptTimeout();
}
// If host is an IP, skip performing a lookup
const addressType = isIP(host);
if (addressType) {
process.nextTick(() => {
if (self.connecting) {
internalConnect(self, options, host, port, addressType, localAddress, localPort);
}
});
return;
}
if (options.lookup != null) validateFunction(options.lookup, "options.lookup");
if (dns === undefined) dns = require("node:dns");
const dnsopts = {
family: socketToDnsFamily(options.family),
hints: options.hints || 0,
};
if (!isWindows && dnsopts.family !== 4 && dnsopts.family !== 6 && dnsopts.hints === 0) {
dnsopts.hints = dns.ADDRCONFIG;
}
$debug("connect: find host", host, addressType);
$debug("connect: dns options", dnsopts);
self._host = host;
self._port = port;
const lookup = options.lookup || dns.lookup;
if (dnsopts.family !== 4 && dnsopts.family !== 6 && !localAddress && autoSelectFamily) {
$debug("connect: autodetecting", host, port);
dnsopts.all = true;
lookupAndConnectMultiple(
self,
lookup,
host,
options,
dnsopts,
port,
localAddress,
localPort,
autoSelectFamilyAttemptTimeout,
);
return;
}
lookup(host, dnsopts, function emitLookup(err, ip, addressType) {
self.emit("lookup", err, ip, addressType, host);
if (!self.connecting) return;
if (err) {
process.nextTick(destroyNT, self, err);
} else if (!isIP(ip)) {
err = $ERR_INVALID_IP_ADDRESS(ip);
process.nextTick(destroyNT, self, err);
} else if (addressType !== 4 && addressType !== 6) {
err = $ERR_INVALID_ADDRESS_FAMILY(addressType, options.host, options.port);
process.nextTick(destroyNT, self, err);
} else {
self._unrefTimer();
internalConnect(self, options, ip, port, addressType, localAddress, localPort);
}
});
}
function socketToDnsFamily(family) {
switch (family) {
case "IPv4": return 4; // prettier-ignore
case "IPv6": return 6; // prettier-ignore
}
return family;
}
function lookupAndConnectMultiple(self, lookup, host, options, dnsopts, port, localAddress, localPort, timeout) {
lookup(host, dnsopts, function emitLookup(err, addresses) {
if (!self.connecting) {
return;
} else if (err) {
self.emit("lookup", err, undefined, undefined, host);
process.nextTick(destroyNT, self, err);
return;
}
const validAddresses = [[], []];
const validIps = [[], []];
let destinations;
for (let i = 0, l = addresses.length; i < l; i++) {
const address = addresses[i];
const { address: ip, family: addressType } = address;
self.emit("lookup", err, ip, addressType, host);
if (!self.connecting) {
return;
}
if (isIP(ip) && (addressType === 4 || addressType === 6)) {
destinations ||= addressType === 6 ? { 6: 0, 4: 1 } : { 4: 0, 6: 1 };
const destination = destinations[addressType];
// Only try an address once
if (!ArrayPrototypeIncludes.$call(validIps[destination], ip)) {
ArrayPrototypePush.$call(validAddresses[destination], address);
ArrayPrototypePush.$call(validIps[destination], ip);
}
}
}
// When no AAAA or A records are available, fail on the first one
if (!validAddresses[0].length && !validAddresses[1].length) {
const { address: firstIp, family: firstAddressType } = addresses[0];
if (!isIP(firstIp)) {
err = $ERR_INVALID_IP_ADDRESS(firstIp);
process.nextTick(destroyNT, self, err);
} else if (firstAddressType !== 4 && firstAddressType !== 6) {
err = $ERR_INVALID_ADDRESS_FAMILY(firstAddressType, options.host, options.port);
process.nextTick(destroyNT, self, err);
}
return;
}
// Sort addresses alternating families
const toAttempt = [];
for (let i = 0, l = MathMax(validAddresses[0].length, validAddresses[1].length); i < l; i++) {
if (i in validAddresses[0]) {
ArrayPrototypePush.$call(toAttempt, validAddresses[0][i]);
}
if (i in validAddresses[1]) {
ArrayPrototypePush.$call(toAttempt, validAddresses[1][i]);
}
}
if (toAttempt.length === 1) {
$debug("connect/multiple: only one address found, switching back to single connection");
const { address: ip, family: addressType } = toAttempt[0];
self._unrefTimer();
internalConnect(self, options, ip, port, addressType, localAddress, localPort);
return;
}
self.autoSelectFamilyAttemptedAddresses = [];
$debug("connect/multiple: will try the following addresses", toAttempt);
const context = {
socket: self,
addresses: toAttempt,
current: 0,
port,
localPort,
timeout,
[kTimeout]: null,
errors: [],
options,
};
self._unrefTimer();
internalConnectMultiple(context);
});
}
function internalConnect(self, options, path);
function internalConnect(self, options, address, port, addressType, localAddress, localPort, _flags?) {
$assert(self.connecting);
let err;
if (localAddress || localPort) {
if (addressType === 4) {
localAddress ||= "0.0.0.0";
// TODO:
// err = self._handle.bind(localAddress, localPort);
} else {
// addressType === 6
localAddress ||= "::";
// TODO:
// err = self._handle.bind6(localAddress, localPort, flags);
}
$debug(
"connect: binding to localAddress: %s and localPort: %d (addressType: %d)",
localAddress,
localPort,
addressType,
);
err = checkBindError(err, localPort, self._handle);
if (err) {
const ex = new ExceptionWithHostPort(err, "bind", localAddress, localPort);
self.destroy(ex);
return;
}
}
//TLS
let connection = self[ksocket];
if (options.socket) {
connection = options.socket;
}
let tls = undefined;
const bunTLS = self[bunTlsSymbol];
if (typeof bunTLS === "function") {
tls = bunTLS.$call(self, port, self._host, true);
self._requestCert = true; // Client always request Cert
if (tls) {
const { rejectUnauthorized, session, checkServerIdentity } = options;
if (typeof rejectUnauthorized !== "undefined") {
self._rejectUnauthorized = rejectUnauthorized;
tls.rejectUnauthorized = rejectUnauthorized;
} else {
self._rejectUnauthorized = tls.rejectUnauthorized;
}
tls.requestCert = true;
tls.session = session || tls.session;
self.servername = tls.servername;
tls.checkServerIdentity = checkServerIdentity || tls.checkServerIdentity;
self[bunTLSConnectOptions] = tls;
if (!connection && tls.socket) {
connection = tls.socket;
}
}
self.authorized = false;
self.secureConnecting = true;
self._secureEstablished = false;
self._securePending = true;
self[kConnectOptions] = options;
self.prependListener("end", onConnectEnd);
}
//TLS
$debug("connect: attempting to connect to %s:%d (addressType: %d)", address, port, addressType);
self.emit("connectionAttempt", address, port, addressType);
if (addressType === 6 || addressType === 4) {
if (self.blockList?.check(address, `ipv${addressType}`)) {
self.destroy($ERR_IP_BLOCKED(address));
return;
}
const req: any = {};
req.oncomplete = afterConnect;
req.address = address;
req.port = port;
req.localAddress = localAddress;
req.localPort = localPort;
req.addressType = addressType;
req.tls = tls;
err = kConnectTcp(self, addressType, req, address, port);
} else {
const req: any = {};
req.address = address;
req.oncomplete = afterConnect;
req.tls = tls;
err = kConnectPipe(self, req, address);
}
if (err) {
const ex = new ExceptionWithHostPort(err, "connect", address, port);
self.destroy(ex);
}
}
function internalConnectMultiple(context, canceled?) {
clearTimeout(context[kTimeout]);
const self = context.socket;
// We were requested to abort. Stop all operations
if (self._aborted) {
return;
}
// All connections have been tried without success, destroy with error
if (canceled || context.current === context.addresses.length) {
if (context.errors.length === 0) {
self.destroy($ERR_SOCKET_CONNECTION_TIMEOUT());
return;
}
self.destroy(new NodeAggregateError(context.errors));
return;
}
$assert(self.connecting);
const current = context.current++;
if (current > 0) {
self[kReinitializeHandle](newDetachedSocket(typeof self[bunTlsSymbol] === "function"));
}
const { localPort, port, _flags } = context;
const { address, family: addressType } = context.addresses[current];
let localAddress;
let err;
if (localPort) {
if (addressType === 4) {
localAddress = DEFAULT_IPV4_ADDR;
// TODO:
// err = self._handle.bind(localAddress, localPort);
} else {
// addressType === 6
localAddress = DEFAULT_IPV6_ADDR;
// TODO:
// err = self._handle.bind6(localAddress, localPort, flags);
}
$debug(
"connect/multiple: binding to localAddress: %s and localPort: %d (addressType: %d)",
localAddress,
localPort,
addressType,
);
err = checkBindError(err, localPort, self._handle);
if (err) {
ArrayPrototypePush.$call(context.errors, new ExceptionWithHostPort(err, "bind", localAddress, localPort));
internalConnectMultiple(context);
return;
}
}
if (self.blockList?.check(address, `ipv${addressType}`)) {
const ex = $ERR_IP_BLOCKED(address);
ArrayPrototypePush.$call(context.errors, ex);
self.emit("connectionAttemptFailed", address, port, addressType, ex);
internalConnectMultiple(context);
return;
}
//TLS
let connection = self[ksocket];
if (context.options.socket) {
connection = context.options.socket;
}
let tls = undefined;
const bunTLS = self[bunTlsSymbol];
if (typeof bunTLS === "function") {
tls = bunTLS.$call(self, port, self._host, true);
self._requestCert = true; // Client always request Cert
if (tls) {
const { rejectUnauthorized, session, checkServerIdentity } = context.options;
if (typeof rejectUnauthorized !== "undefined") {
self._rejectUnauthorized = rejectUnauthorized;
tls.rejectUnauthorized = rejectUnauthorized;
} else {
self._rejectUnauthorized = tls.rejectUnauthorized;
}
tls.requestCert = true;
tls.session = session || tls.session;
self.servername = tls.servername;
tls.checkServerIdentity = checkServerIdentity || tls.checkServerIdentity;
self[bunTLSConnectOptions] = tls;
if (!connection && tls.socket) {
connection = tls.socket;
}
}
self.authorized = false;
self.secureConnecting = true;
self._secureEstablished = false;
self._securePending = true;
self[kConnectOptions] = context.options;
self.prependListener("end", onConnectEnd);
}
//TLS
$debug("connect/multiple: attempting to connect to %s:%d (addressType: %d)", address, port, addressType);
self.emit("connectionAttempt", address, port, addressType);
// const req = new TCPConnectWrap();
const req = {};
req.oncomplete = afterConnectMultiple.bind(undefined, context, current);
req.address = address;
req.port = port;
req.localAddress = localAddress;
req.localPort = localPort;
req.addressType = addressType;
req.tls = tls;
ArrayPrototypePush.$call(self.autoSelectFamilyAttemptedAddresses, `${address}:${port}`);
err = kConnectTcp(self, addressType, req, address, port);
if (err) {
const ex = new ExceptionWithHostPort(err, "connect", address, port);
ArrayPrototypePush.$call(context.errors, ex);
self.emit("connectionAttemptFailed", address, port, addressType, ex);
internalConnectMultiple(context);
return;
}
if (current < context.addresses.length - 1) {
$debug("connect/multiple: setting the attempt timeout to %d ms", context.timeout);
// If the attempt has not returned an error, start the connection timer
context[kTimeout] = setTimeout(internalConnectMultipleTimeout, context.timeout, context, req, self._handle).unref();
}
}
function internalConnectMultipleTimeout(context, req, handle) {
$debug("connect/multiple: connection to %s:%s timed out", req.address, req.port);
context.socket.emit("connectionAttemptTimeout", req.address, req.port, req.addressType);
req.oncomplete = undefined;
ArrayPrototypePush.$call(context.errors, createConnectionError(req, UV_ETIMEDOUT));
handle.close();
// Try the next address, unless we were aborted
if (context.socket.connecting) {
internalConnectMultiple(context);
}
}
function afterConnect(status, handle, req, readable, writable) {
if (!handle) return;
const self = handle[owner_symbol];
if (!self) return;
// Callback may come after call to destroy
if (self.destroyed) {
return;
}
$debug("afterConnect", status, readable, writable);
$assert(self.connecting);
self.connecting = false;
self._sockname = null;
if (status === 0) {
if (self.readable && !readable) {
self.push(null);
self.read();
}
if (self.writable && !writable) {
self.end();
}
self._unrefTimer();
if (self[kSetNoDelay] && self._handle.setNoDelay) {
self._handle.setNoDelay(true);
}
if (self[kSetKeepAlive] && self._handle.setKeepAlive) {
self._handle.setKeepAlive(true, self[kSetKeepAliveInitialDelay]);
}
self.emit("connect");
self.emit("ready");
// Start the first read, or get an immediate EOF.
// this doesn't actually consume any bytes, because len=0.
if (readable && !self.isPaused()) self.read(0);
} else {
let details;
if (req.localAddress && req.localPort) {
details = req.localAddress + ":" + req.localPort;
}
const ex = new ExceptionWithHostPort(status, "connect", req.address, req.port);
if (details) {
ex.localAddress = req.localAddress;
ex.localPort = req.localPort;
}
self.emit("connectionAttemptFailed", req.address, req.port, req.addressType, ex);
self.destroy(ex);
}
}
function afterConnectMultiple(context, current, status, handle, req, readable, writable) {
$debug("connect/multiple: connection attempt to %s:%s completed with status %s", req.address, req.port, status);
// Make sure another connection is not spawned
$debug("clearTimeout", context[kTimeout]);
clearTimeout(context[kTimeout]);
// One of the connection has completed and correctly dispatched but after timeout, ignore this one
if (status === 0 && current !== context.current - 1) {
$debug("connect/multiple: ignoring successful but timedout connection to %s:%s", req.address, req.port);
handle.close();
return;
}
const self = context.socket;
// Some error occurred, add to the list of exceptions
if (status !== 0) {
const ex = createConnectionError(req, status);
ArrayPrototypePush.$call(context.errors, ex);
self.emit("connectionAttemptFailed", req.address, req.port, req.addressType, ex);
// Try the next address, unless we were aborted
if (context.socket.connecting) {
internalConnectMultiple(context, status === UV_ECANCELED);
}
return;
}
afterConnect(status, self._handle, req, readable, writable);
}
function createConnectionError(req, status) {
let details;
if (req.localAddress && req.localPort) {
details = req.localAddress + ":" + req.localPort;
}
const ex = new ExceptionWithHostPort(status, "connect", req.address, req.port);
if (details) {
ex.localAddress = req.localAddress;
ex.localPort = req.localPort;
}
return ex;
}
type MaybeListener = SocketListener<unknown> | null;