add some tests

This commit is contained in:
Don Isaac
2025-03-04 16:24:12 -08:00
parent 3f089a80d1
commit 2fdf9c489a
3 changed files with 206 additions and 31 deletions

View File

@@ -1,6 +1,6 @@
const Duplex: DuplexConstructor = require("internal/streams/duplex");
const { validateNumber, validateFunction, validateUint32 } = require("internal/validators");
const { isIP } = require("internal/net/ip");
const { isIP } = require("internal/net/ip") as Readonly<{ isIP: (ip: string) => 0 | 4 | 6 }>;
const { getTimerDuration } = require("internal/timers");
const {
upgradeDuplexToTLS,
@@ -18,12 +18,13 @@ const {
bunSocketServerOptions: symbol;
}>;
import type { TCPSocket, TLSSocket } from "bun";
import type { TCPSocket, TCPSocketConnectOptions, TLSSocket, UnixSocketOptions } from "bun";
import type { Duplex as IDuplex } from "node:stream";
const { connect: bunConnect } = Bun;
const kServerSocket = Symbol("kServerSocket");
const kTimeout = Symbol("kTimeout");
const kBytesWritten = Symbol("kBytesWritten");
const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::");
const kSetNoDelay = Symbol("kSetNoDelay");
@@ -362,6 +363,7 @@ class Socket extends Duplex {
declare _requestCert?: boolean;
declare _rejectUnauthorized?: boolean;
timeout = 0;
[kTimeout]: Timer | null = null;
#writeCallback;
_pendingData;
_pendingEncoding; // for compatibility
@@ -409,7 +411,7 @@ class Socket extends Duplex {
// Handle strings directly.
decodeStrings: false,
});
this._parent = this;
this._parent = null;
this._parentWrap = this;
this.#pendingRead = undefined;
this.#upgraded = null;
@@ -582,15 +584,14 @@ class Socket extends Duplex {
});
}
this.pauseOnConnect = pauseOnConnect;
if (!pauseOnConnect) {
process.nextTick(() => {
this.resume();
});
this.connecting = true;
}
// this.pauseOnConnect = pauseOnConnect;
if (fd) {
// if (!pauseOnConnect) {
// process.nextTick(() => {
// this.resume();
// });
// }
return this;
}
@@ -743,33 +744,17 @@ class Socket extends Duplex {
}
} else if (path) {
// start using unix socket
bunConnect({
this.connecting = true;
this.#internalConnect({
data: this,
unix: path,
socket: this.#handlers,
tls,
allowHalfOpen: this.allowHalfOpen,
}).catch(error => {
if (!this.destroyed) {
this.emit("error", error);
this.emit("close");
}
});
} else {
// default start
bunConnect({
data: this,
hostname: host || "localhost",
port: port,
socket: this.#handlers,
tls,
allowHalfOpen: this.allowHalfOpen,
}).catch(error => {
if (!this.destroyed) {
this.emit("error", error);
this.emit("close");
}
});
this.#lookupAndConnect(port, family, host, tls);
}
} catch (error) {
process.nextTick(emitErrorAndCloseNextTick, this, error);
@@ -777,6 +762,90 @@ class Socket extends Duplex {
return this;
}
async #lookupAndConnect(
port: number,
family: 4 | 6 | 0 | "IPv4" | "IPv6" | "any" | undefined,
hostname: string | undefined,
tls,
) {
this.connecting = true;
try {
// TODO: options.lookup
var lookup = await Bun.dns.lookup(hostname || "localhost", {
family,
port,
socketType: "tcp",
});
} catch (error) {
// It's possible we were destroyed while looking this up.
if (!this.connecting) return;
this.emit("lookup", error, undefined, undefined, hostname);
process.nextTick(connectErrorNT, this, error);
return;
}
// It's possible we were destroyed while looking this up.
if (!this.connecting) return;
$assert(lookup.length > 0);
if (lookup.length === 0) {
this.emit("lookup", new Error("getaddrinfo ENOTFOUND"), undefined, undefined, hostname);
process.nextTick(connectErrorNT, this, new Error("getaddrinfo ENOTFOUND"));
return;
}
// NOTE: Node uses all the addresses returned by dns.lookup, but our
// Bun.connect API doesn't support this
const { address: ip, family: addressType } = lookup[0];
$assert(isIP(ip) == addressType);
this.emit("lookup", null, ip, addressType, hostname);
$debug("attempting to connect to %s:%d (addressType: %d)", ip, port, addressType);
// console.log("attempting to connect to %s:%d (addressType: %d)", ip, port, addressType);
this.emit("connectionAttempt", ip, port, addressType);
this._unrefTimer();
this.#internalConnect({
data: this,
port,
host: ip,
family: addressType,
socket: this.#handlers,
allowHalfOpen: this.allowHalfOpen,
tls,
});
}
// #lookupAndConnect(port: number, family: 4 | 6 | 0 | "IPv4" | "IPv6" | "any", hostname = "localhost") {
// this.connecting = true;
// try {
// var lookup = await Bun.dns.lookup(hostname, {
// family,
// port,
// socketType: "tcp",
// });
// } catch (error) {
// if (!this.destroyed) {
// this.emit("error", error);
// this.emit("close");
// }
// return;
// }
// }
#internalConnect(options: TCPSocketConnectOptions<this>): Promise<void>;
#internalConnect(options: UnixSocketOptions<this>): Promise<void>;
async #internalConnect(options: TCPSocketConnectOptions<this> | UnixSocketOptions<this>): Promise<void> {
$assert(this.connecting);
try {
await bunConnect(options as any);
} catch (error) {
if (!this.destroyed) {
this.emit("error", error);
this.emit("close");
connectErrorNT(this, error);
}
}
}
end(...args) {
if (!this._readableState.endEmitted) {
this.secureConnecting = false;
@@ -886,6 +955,12 @@ class Socket extends Duplex {
return "IPv4";
}
private _unrefTimer() {
for (let socket = this; socket != null; socket = socket._parent) {
socket[kTimeout]?.refresh();
}
}
resetAndDestroy() {
if (this._handle) {
if (this.connecting) {
@@ -949,8 +1024,6 @@ class Socket extends Duplex {
}
return this;
}
// for compatibility
_unrefTimer() {}
unref() {
const socket = this._handle;
if (!socket) {
@@ -1074,6 +1147,7 @@ function finishSocket(hasError) {
function destroyNT(self, err) {
self.destroy(err);
}
const connectErrorNT = destroyNT;
function destroyWhenAborted(err) {
if (!this.destroyed) {
this.destroy(err.target.reason);

View File

@@ -0,0 +1,46 @@
const net = require("node:net");
const server = new net.Server();
const client = new net.Socket();
const serverEmit = server.emit,
clientEmit = client.emit;
const verboseEmit = (name, originalEmit) => {
const log = (...args) => console.log(`[${name}]`, ...args);
return function verboseEmit(...args) {
const [eventName, ...rest] = args;
switch (eventName) {
case "data":
log("data:", ...rest.map(d => d.toString()));
break;
default:
if (args[1] && args[1] instanceof Error) {
log(eventName, args[1].message);
} else {
log(eventName);
}
}
return originalEmit.apply(this, args);
};
};
Object.defineProperty(server, "emit", { value: verboseEmit("server", serverEmit) });
Object.defineProperty(client, "emit", { value: verboseEmit("client", clientEmit) });
server.on("connection", socket => {
socket.on("data", data => {
console.log("[server] socket data:", data.toString());
socket.write(data);
process.nextTick(() => socket.end());
});
socket.on("close", () => server.close());
});
client.on("data", () => client.end());
server.listen(0, () => {
client.connect(server.address(), () => {
client.write("ping");
});
});

View File

@@ -0,0 +1,55 @@
import { bunRun } from "harness";
import path from "node:path";
const fixturePath = (...segs: string[]): string => path.join(import.meta.dirname, "fixtures", "socket", ...segs);
function cleanLogs(text: string): string {
return text
.split("\n")
.filter(line => !line.startsWith("mimalloc:"))
.join("\n");
}
describe("Given a ping server/client", () => {
let logs: string[], clientLogs, serverLogs;
beforeAll(() => {
logs = bunRun(fixturePath("ping.fixture.js")).stdout.split("\n");
clientLogs = logs.filter(line => line.startsWith("[client]"));
serverLogs = logs.filter(line => line.startsWith("[server]"));
});
it("no errors occur", () => {
expect(logs.find(line => /error/i.test(line))).toBeUndefined();
});
describe("the client", () => {
it("emits a connect event", () => expect(clientLogs).toContain("[client] connect"));
it("emits a DNS 'lookup' event before attempting connect", () => {
expect(clientLogs).toContain("[client] lookup");
expect(clientLogs.indexOf("[client] lookup")).toBeLessThan(clientLogs.indexOf("[client] connectionAttempt"));
});
it("emits a 'connectionAttempt' event before connecting", () => {
expect(clientLogs).toContain("[client] connectionAttempt");
expect(clientLogs.indexOf("[client] connectionAttempt")).toBeLessThan(clientLogs.indexOf("[client] connect"));
});
it('receives "ping" from the server', () => expect(clientLogs).toContain("[client] data: ping"));
it("emits 'prefinish' before 'finish'", () => {
expect(clientLogs).toContain("[client] prefinish");
expect(clientLogs).toContain("[client] finish");
expect(clientLogs.indexOf("[client] prefinish")).toBeLessThan(clientLogs.indexOf("[client] finish"));
});
it("finishes with a close event", () => {
expect(logs).toContain("[client] close");
expect(clientLogs.at(-1)).toEqual("[client] close");
});
}); // the client
describe("the server", () => {
it("emits a 'connection' event", () => expect(serverLogs).toContain("[server] connection"));
it("receives 'ping' from the client", () => expect(serverLogs).toContain("[server] socket data: ping"));
it("finishes with a 'close' event", () => {
expect(logs).toContain("[server] close");
expect(serverLogs.at(-1)).toEqual("[server] close");
});
}); // the server
});