Attempt to deflake tcp socket test

This commit is contained in:
Jarred Sumner
2024-09-26 22:08:53 -07:00
parent 392a58b0ed
commit 856c64fc9f
2 changed files with 138 additions and 73 deletions

View File

@@ -14,90 +14,96 @@ async function runStressTest({
onServerWritten: (socket) => void;
onFetchWritten: (socket) => void;
}) {
const total = PORT_EXHAUSTION_THRESHOLD * 2;
let sockets = [];
const batch = 48;
let toClose = 0;
let pendingClose = Promise.withResolvers();
const objects = [];
for (let i = 0; i < total; i++) {
objects.push({
method: "POST",
body: "--BYTEMARKER: " + (10 + i) + " ",
keepalive: false,
});
}
let initialMaxFD = -1;
{
const total = PORT_EXHAUSTION_THRESHOLD * 2;
let sockets = [];
const batch = 48;
let toClose = 0;
let pendingClose = Promise.withResolvers();
const objects = [];
for (let i = 0; i < total; i++) {
objects.push({
method: "POST",
body: "--BYTEMARKER: " + (10 + i) + " ",
keepalive: false,
});
}
const server = await Bun.listen({
port: 0,
socket: {
open(socket) {},
data(socket, data) {
const text = new TextDecoder().decode(data);
const i = parseInt(text.slice(text.indexOf("--BYTEMARKER: ") + "--BYTEMARKER: ".length).slice(0, 3)) - 10;
if (text.includes(objects[i].body)) {
socket.data ??= {};
socket.data.read = true;
sockets[i] = socket;
if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) {
socket.data.written = true;
onServerWritten(socket);
}
return;
}
console.log("Data is missing!");
},
drain(socket) {
if (!socket.data?.read || socket.data?.written) {
return;
}
const server = await Bun.listen({
port: 0,
socket: {
open(socket) {},
data(socket, data) {
const text = new TextDecoder().decode(data);
const i = parseInt(text.slice(text.indexOf("--BYTEMARKER: ") + "--BYTEMARKER: ".length).slice(0, 3)) - 10;
if (text.includes(objects[i].body)) {
socket.data ??= {};
socket.data.read = true;
sockets[i] = socket;
if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) {
socket.data.written = true;
onServerWritten(socket);
}
return;
},
error(socket, err) {
console.log(err);
},
timeout() {},
close(socket) {
toClose--;
if (toClose === 0) {
pendingClose.resolve();
}
},
},
hostname: "127.0.0.1",
});
for (let remaining = total; remaining > 0; remaining -= batch) {
pendingClose = Promise.withResolvers();
{
const promises = [];
toClose = batch;
for (let i = 0; i < batch; i++) {
promises.push(
fetch(`http://127.0.0.1:${server.port}`, objects[i]).finally(() => {
onFetchWritten(sockets[i]);
}),
);
}
await Promise.allSettled(promises);
console.log("Data is missing!");
},
drain(socket) {
if (!socket.data?.read || socket.data?.written) {
return;
}
if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) {
socket.data.written = true;
onServerWritten(socket);
}
},
error(socket, err) {
console.log(err);
},
timeout() {},
close(socket) {
toClose--;
if (toClose === 0) {
pendingClose.resolve();
}
},
},
hostname: "127.0.0.1",
});
let initialMaxFD = -1;
for (let remaining = total; remaining > 0; remaining -= batch) {
pendingClose = Promise.withResolvers();
{
const promises = [];
toClose = batch;
for (let i = 0; i < batch; i++) {
promises.push(
fetch(`http://127.0.0.1:${server.port}`, objects[i]).finally(() => {
onFetchWritten(sockets[i]);
}),
);
promises.length = 0;
}
await Promise.allSettled(promises);
promises.length = 0;
}
await pendingClose.promise;
if (total) sockets = [];
if (initialMaxFD === -1) {
initialMaxFD = getMaxFD();
await pendingClose.promise;
if (total) sockets = [];
if (initialMaxFD === -1) {
initialMaxFD = getMaxFD();
}
}
server.stop(true);
}
server.stop(true);
await Bun.sleep(10);
Bun.gc(true);
await Bun.sleep(80);
expect(getMaxFD()).toBeLessThan(initialMaxFD + 10);
}

View File

@@ -0,0 +1,59 @@
import { expect, test } from "bun:test";
import { connect, listen } from "bun";
import { getMaxFD } from "harness";
test("tcp socket doesn't leak", async () => {
const init = getMaxFD();
{
let onClose = () => {};
const server = listen({
port: 0,
hostname: "0.0.0.0",
socket: {
data(socket, data) {
socket.write("hi");
},
open(socket) {},
close(socket) {
onClose(socket);
},
},
});
let attempts = 1000;
while ((attempts -= 50) >= 0) {
let batch = [];
let closed = [];
for (let i = 0; i < 50; i++) {
const onClose = Promise.withResolvers();
closed.push(onClose.promise);
batch.push(
connect({
port: server.port,
hostname: server.hostname,
socket: {
close(socket) {
onClose.resolve(socket);
},
data(socket, data) {},
open(socket) {
socket.write("hi");
},
},
}),
);
}
const sockets = await Promise.all(batch);
sockets.forEach(socket => socket.end());
await Promise.all(closed);
}
server.stop(true);
}
Bun.gc(true);
await Bun.sleep(1000);
Bun.gc(true);
const end = getMaxFD();
console.log({ init, end });
expect(end - init).toBeLessThan(100);
});