From 2fdf9c489a56520775aafbad432cf602aa67efd1 Mon Sep 17 00:00:00 2001 From: Don Isaac Date: Tue, 4 Mar 2025 16:24:12 -0800 Subject: [PATCH] add some tests --- src/js/internal/net/socket.ts | 136 ++++++++++++++---- .../node/net/fixtures/socket/ping.fixture.js | 46 ++++++ test/js/node/net/socket.spec.ts | 55 +++++++ 3 files changed, 206 insertions(+), 31 deletions(-) create mode 100644 test/js/node/net/fixtures/socket/ping.fixture.js create mode 100644 test/js/node/net/socket.spec.ts diff --git a/src/js/internal/net/socket.ts b/src/js/internal/net/socket.ts index b029fb7dac..b796b52476 100644 --- a/src/js/internal/net/socket.ts +++ b/src/js/internal/net/socket.ts @@ -1,6 +1,6 @@ const Duplex: DuplexConstructor = require("internal/streams/duplex"); const { validateNumber, validateFunction, validateUint32 } = require("internal/validators"); -const { isIP } = require("internal/net/ip"); +const { isIP } = require("internal/net/ip") as Readonly<{ isIP: (ip: string) => 0 | 4 | 6 }>; const { getTimerDuration } = require("internal/timers"); const { upgradeDuplexToTLS, @@ -18,12 +18,13 @@ const { bunSocketServerOptions: symbol; }>; -import type { TCPSocket, TLSSocket } from "bun"; +import type { TCPSocket, TCPSocketConnectOptions, TLSSocket, UnixSocketOptions } from "bun"; import type { Duplex as IDuplex } from "node:stream"; const { connect: bunConnect } = Bun; const kServerSocket = Symbol("kServerSocket"); +const kTimeout = Symbol("kTimeout"); const kBytesWritten = Symbol("kBytesWritten"); const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::"); const kSetNoDelay = Symbol("kSetNoDelay"); @@ -362,6 +363,7 @@ class Socket extends Duplex { declare _requestCert?: boolean; declare _rejectUnauthorized?: boolean; timeout = 0; + [kTimeout]: Timer | null = null; #writeCallback; _pendingData; _pendingEncoding; // for compatibility @@ -409,7 +411,7 @@ class Socket extends Duplex { // Handle strings directly. decodeStrings: false, }); - this._parent = this; + this._parent = null; this._parentWrap = this; this.#pendingRead = undefined; this.#upgraded = null; @@ -582,15 +584,14 @@ class Socket extends Duplex { }); } - this.pauseOnConnect = pauseOnConnect; - if (!pauseOnConnect) { - process.nextTick(() => { - this.resume(); - }); - this.connecting = true; - } + // this.pauseOnConnect = pauseOnConnect; if (fd) { + // if (!pauseOnConnect) { + // process.nextTick(() => { + // this.resume(); + // }); + // } return this; } @@ -743,33 +744,17 @@ class Socket extends Duplex { } } else if (path) { // start using unix socket - bunConnect({ + this.connecting = true; + this.#internalConnect({ data: this, unix: path, socket: this.#handlers, tls, allowHalfOpen: this.allowHalfOpen, - }).catch(error => { - if (!this.destroyed) { - this.emit("error", error); - this.emit("close"); - } }); } else { // default start - bunConnect({ - data: this, - hostname: host || "localhost", - port: port, - socket: this.#handlers, - tls, - allowHalfOpen: this.allowHalfOpen, - }).catch(error => { - if (!this.destroyed) { - this.emit("error", error); - this.emit("close"); - } - }); + this.#lookupAndConnect(port, family, host, tls); } } catch (error) { process.nextTick(emitErrorAndCloseNextTick, this, error); @@ -777,6 +762,90 @@ class Socket extends Duplex { return this; } + async #lookupAndConnect( + port: number, + family: 4 | 6 | 0 | "IPv4" | "IPv6" | "any" | undefined, + hostname: string | undefined, + tls, + ) { + this.connecting = true; + try { + // TODO: options.lookup + var lookup = await Bun.dns.lookup(hostname || "localhost", { + family, + port, + socketType: "tcp", + }); + } catch (error) { + // It's possible we were destroyed while looking this up. + if (!this.connecting) return; + this.emit("lookup", error, undefined, undefined, hostname); + process.nextTick(connectErrorNT, this, error); + return; + } + + // It's possible we were destroyed while looking this up. + if (!this.connecting) return; + $assert(lookup.length > 0); + if (lookup.length === 0) { + this.emit("lookup", new Error("getaddrinfo ENOTFOUND"), undefined, undefined, hostname); + process.nextTick(connectErrorNT, this, new Error("getaddrinfo ENOTFOUND")); + return; + } + + // NOTE: Node uses all the addresses returned by dns.lookup, but our + // Bun.connect API doesn't support this + const { address: ip, family: addressType } = lookup[0]; + $assert(isIP(ip) == addressType); + this.emit("lookup", null, ip, addressType, hostname); + $debug("attempting to connect to %s:%d (addressType: %d)", ip, port, addressType); + // console.log("attempting to connect to %s:%d (addressType: %d)", ip, port, addressType); + this.emit("connectionAttempt", ip, port, addressType); + this._unrefTimer(); + this.#internalConnect({ + data: this, + port, + host: ip, + family: addressType, + socket: this.#handlers, + allowHalfOpen: this.allowHalfOpen, + tls, + }); + } + + // #lookupAndConnect(port: number, family: 4 | 6 | 0 | "IPv4" | "IPv6" | "any", hostname = "localhost") { + // this.connecting = true; + // try { + // var lookup = await Bun.dns.lookup(hostname, { + // family, + // port, + // socketType: "tcp", + // }); + // } catch (error) { + // if (!this.destroyed) { + // this.emit("error", error); + // this.emit("close"); + // } + // return; + // } + // } + + #internalConnect(options: TCPSocketConnectOptions): Promise; + #internalConnect(options: UnixSocketOptions): Promise; + async #internalConnect(options: TCPSocketConnectOptions | UnixSocketOptions): Promise { + $assert(this.connecting); + + try { + await bunConnect(options as any); + } catch (error) { + if (!this.destroyed) { + this.emit("error", error); + this.emit("close"); + connectErrorNT(this, error); + } + } + } + end(...args) { if (!this._readableState.endEmitted) { this.secureConnecting = false; @@ -886,6 +955,12 @@ class Socket extends Duplex { return "IPv4"; } + private _unrefTimer() { + for (let socket = this; socket != null; socket = socket._parent) { + socket[kTimeout]?.refresh(); + } + } + resetAndDestroy() { if (this._handle) { if (this.connecting) { @@ -949,8 +1024,6 @@ class Socket extends Duplex { } return this; } - // for compatibility - _unrefTimer() {} unref() { const socket = this._handle; if (!socket) { @@ -1074,6 +1147,7 @@ function finishSocket(hasError) { function destroyNT(self, err) { self.destroy(err); } +const connectErrorNT = destroyNT; function destroyWhenAborted(err) { if (!this.destroyed) { this.destroy(err.target.reason); diff --git a/test/js/node/net/fixtures/socket/ping.fixture.js b/test/js/node/net/fixtures/socket/ping.fixture.js new file mode 100644 index 0000000000..9f9cec7fce --- /dev/null +++ b/test/js/node/net/fixtures/socket/ping.fixture.js @@ -0,0 +1,46 @@ +const net = require("node:net"); + +const server = new net.Server(); +const client = new net.Socket(); + +const serverEmit = server.emit, + clientEmit = client.emit; + +const verboseEmit = (name, originalEmit) => { + const log = (...args) => console.log(`[${name}]`, ...args); + return function verboseEmit(...args) { + const [eventName, ...rest] = args; + switch (eventName) { + case "data": + log("data:", ...rest.map(d => d.toString())); + break; + default: + if (args[1] && args[1] instanceof Error) { + log(eventName, args[1].message); + } else { + log(eventName); + } + } + return originalEmit.apply(this, args); + }; +}; + +Object.defineProperty(server, "emit", { value: verboseEmit("server", serverEmit) }); +Object.defineProperty(client, "emit", { value: verboseEmit("client", clientEmit) }); + +server.on("connection", socket => { + socket.on("data", data => { + console.log("[server] socket data:", data.toString()); + socket.write(data); + process.nextTick(() => socket.end()); + }); + socket.on("close", () => server.close()); +}); + +client.on("data", () => client.end()); + +server.listen(0, () => { + client.connect(server.address(), () => { + client.write("ping"); + }); +}); diff --git a/test/js/node/net/socket.spec.ts b/test/js/node/net/socket.spec.ts new file mode 100644 index 0000000000..2ae48fff54 --- /dev/null +++ b/test/js/node/net/socket.spec.ts @@ -0,0 +1,55 @@ +import { bunRun } from "harness"; +import path from "node:path"; + +const fixturePath = (...segs: string[]): string => path.join(import.meta.dirname, "fixtures", "socket", ...segs); +function cleanLogs(text: string): string { + return text + .split("\n") + .filter(line => !line.startsWith("mimalloc:")) + .join("\n"); +} + +describe("Given a ping server/client", () => { + let logs: string[], clientLogs, serverLogs; + + beforeAll(() => { + logs = bunRun(fixturePath("ping.fixture.js")).stdout.split("\n"); + clientLogs = logs.filter(line => line.startsWith("[client]")); + serverLogs = logs.filter(line => line.startsWith("[server]")); + }); + + it("no errors occur", () => { + expect(logs.find(line => /error/i.test(line))).toBeUndefined(); + }); + + describe("the client", () => { + it("emits a connect event", () => expect(clientLogs).toContain("[client] connect")); + it("emits a DNS 'lookup' event before attempting connect", () => { + expect(clientLogs).toContain("[client] lookup"); + expect(clientLogs.indexOf("[client] lookup")).toBeLessThan(clientLogs.indexOf("[client] connectionAttempt")); + }); + it("emits a 'connectionAttempt' event before connecting", () => { + expect(clientLogs).toContain("[client] connectionAttempt"); + expect(clientLogs.indexOf("[client] connectionAttempt")).toBeLessThan(clientLogs.indexOf("[client] connect")); + }); + it('receives "ping" from the server', () => expect(clientLogs).toContain("[client] data: ping")); + it("emits 'prefinish' before 'finish'", () => { + expect(clientLogs).toContain("[client] prefinish"); + expect(clientLogs).toContain("[client] finish"); + expect(clientLogs.indexOf("[client] prefinish")).toBeLessThan(clientLogs.indexOf("[client] finish")); + }); + it("finishes with a close event", () => { + expect(logs).toContain("[client] close"); + expect(clientLogs.at(-1)).toEqual("[client] close"); + }); + }); // the client + + describe("the server", () => { + it("emits a 'connection' event", () => expect(serverLogs).toContain("[server] connection")); + it("receives 'ping' from the client", () => expect(serverLogs).toContain("[server] socket data: ping")); + it("finishes with a 'close' event", () => { + expect(logs).toContain("[server] close"); + expect(serverLogs.at(-1)).toEqual("[server] close"); + }); + }); // the server +});