diff --git a/test/js/web/fetch/fetch-tcp-stress.test.ts b/test/js/web/fetch/fetch-tcp-stress.test.ts index 0188b9e4cc..6766fbfb5b 100644 --- a/test/js/web/fetch/fetch-tcp-stress.test.ts +++ b/test/js/web/fetch/fetch-tcp-stress.test.ts @@ -14,90 +14,96 @@ async function runStressTest({ onServerWritten: (socket) => void; onFetchWritten: (socket) => void; }) { - const total = PORT_EXHAUSTION_THRESHOLD * 2; - let sockets = []; - const batch = 48; - let toClose = 0; - let pendingClose = Promise.withResolvers(); - const objects = []; - for (let i = 0; i < total; i++) { - objects.push({ - method: "POST", - body: "--BYTEMARKER: " + (10 + i) + " ", - keepalive: false, - }); - } + let initialMaxFD = -1; + + { + const total = PORT_EXHAUSTION_THRESHOLD * 2; + let sockets = []; + const batch = 48; + let toClose = 0; + let pendingClose = Promise.withResolvers(); + const objects = []; + for (let i = 0; i < total; i++) { + objects.push({ + method: "POST", + body: "--BYTEMARKER: " + (10 + i) + " ", + keepalive: false, + }); + } + + const server = await Bun.listen({ + port: 0, + socket: { + open(socket) {}, + data(socket, data) { + const text = new TextDecoder().decode(data); + const i = parseInt(text.slice(text.indexOf("--BYTEMARKER: ") + "--BYTEMARKER: ".length).slice(0, 3)) - 10; + if (text.includes(objects[i].body)) { + socket.data ??= {}; + socket.data.read = true; + sockets[i] = socket; + if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) { + socket.data.written = true; + onServerWritten(socket); + } + return; + } + + console.log("Data is missing!"); + }, + drain(socket) { + if (!socket.data?.read || socket.data?.written) { + return; + } - const server = await Bun.listen({ - port: 0, - socket: { - open(socket) {}, - data(socket, data) { - const text = new TextDecoder().decode(data); - const i = parseInt(text.slice(text.indexOf("--BYTEMARKER: ") + "--BYTEMARKER: ".length).slice(0, 3)) - 10; - if (text.includes(objects[i].body)) { - socket.data ??= {}; - socket.data.read = true; - sockets[i] = socket; if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) { socket.data.written = true; onServerWritten(socket); } - return; + }, + error(socket, err) { + console.log(err); + }, + timeout() {}, + close(socket) { + toClose--; + if (toClose === 0) { + pendingClose.resolve(); + } + }, + }, + hostname: "127.0.0.1", + }); + for (let remaining = total; remaining > 0; remaining -= batch) { + pendingClose = Promise.withResolvers(); + { + const promises = []; + toClose = batch; + for (let i = 0; i < batch; i++) { + promises.push( + fetch(`http://127.0.0.1:${server.port}`, objects[i]).finally(() => { + onFetchWritten(sockets[i]); + }), + ); } + await Promise.allSettled(promises); - console.log("Data is missing!"); - }, - drain(socket) { - if (!socket.data?.read || socket.data?.written) { - return; - } - - if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) { - socket.data.written = true; - onServerWritten(socket); - } - }, - error(socket, err) { - console.log(err); - }, - timeout() {}, - close(socket) { - toClose--; - if (toClose === 0) { - pendingClose.resolve(); - } - }, - }, - hostname: "127.0.0.1", - }); - let initialMaxFD = -1; - for (let remaining = total; remaining > 0; remaining -= batch) { - pendingClose = Promise.withResolvers(); - { - const promises = []; - toClose = batch; - for (let i = 0; i < batch; i++) { - promises.push( - fetch(`http://127.0.0.1:${server.port}`, objects[i]).finally(() => { - onFetchWritten(sockets[i]); - }), - ); + promises.length = 0; } - await Promise.allSettled(promises); - promises.length = 0; - } - - await pendingClose.promise; - if (total) sockets = []; - - if (initialMaxFD === -1) { - initialMaxFD = getMaxFD(); + await pendingClose.promise; + if (total) sockets = []; + + if (initialMaxFD === -1) { + initialMaxFD = getMaxFD(); + } } + server.stop(true); } - server.stop(true); - await Bun.sleep(10); + Bun.gc(true); + + await Bun.sleep(80); + expect(getMaxFD()).toBeLessThan(initialMaxFD + 10); } diff --git a/test/js/web/fetch/tcp-socket-leak.test.ts b/test/js/web/fetch/tcp-socket-leak.test.ts new file mode 100644 index 0000000000..d83b5b3563 --- /dev/null +++ b/test/js/web/fetch/tcp-socket-leak.test.ts @@ -0,0 +1,59 @@ +import { expect, test } from "bun:test"; +import { connect, listen } from "bun"; +import { getMaxFD } from "harness"; + +test("tcp socket doesn't leak", async () => { + const init = getMaxFD(); + { + let onClose = () => {}; + const server = listen({ + port: 0, + hostname: "0.0.0.0", + socket: { + data(socket, data) { + socket.write("hi"); + }, + open(socket) {}, + close(socket) { + onClose(socket); + }, + }, + }); + + let attempts = 1000; + while ((attempts -= 50) >= 0) { + let batch = []; + let closed = []; + for (let i = 0; i < 50; i++) { + const onClose = Promise.withResolvers(); + closed.push(onClose.promise); + batch.push( + connect({ + port: server.port, + hostname: server.hostname, + socket: { + close(socket) { + onClose.resolve(socket); + }, + data(socket, data) {}, + open(socket) { + socket.write("hi"); + }, + }, + }), + ); + } + + const sockets = await Promise.all(batch); + sockets.forEach(socket => socket.end()); + await Promise.all(closed); + } + server.stop(true); + } + Bun.gc(true); + await Bun.sleep(1000); + Bun.gc(true); + const end = getMaxFD(); + console.log({ init, end }); + expect(end - init).toBeLessThan(100); +});