Compare commits

...

7 Commits

Author SHA1 Message Date
Jarred Sumner
157c6ffe85 Update fetch-tcp-stress.test.ts 2024-09-27 03:34:31 -07:00
Jarred Sumner
70cd12ef2b Update runner.node.mjs 2024-09-27 03:23:30 -07:00
Jarred Sumner
fc1ae69989 Update runner.node.mjs 2024-09-27 03:23:04 -07:00
Jarred Sumner
0bbce0f0c0 Update runner.node.mjs 2024-09-27 03:03:34 -07:00
Jarred Sumner
2c5690a0c8 Reduce concurrent connection count on macOS 2024-09-27 02:28:23 -07:00
Jarred Sumner
378631f531 Update fetch-tcp-stress.test.ts 2024-09-27 01:39:34 -07:00
Jarred Sumner
856c64fc9f Attempt to deflake tcp socket test 2024-09-26 22:08:53 -07:00
4 changed files with 172 additions and 85 deletions

View File

@@ -24,7 +24,11 @@
#include <arpa/inet.h>
#endif
#ifdef __APPLE__
#define CONCURRENT_CONNECTIONS 2
#else
#define CONCURRENT_CONNECTIONS 4
#endif
// clang-format off
int default_is_low_prio_handler(struct us_socket_t *s) {

View File

@@ -7,23 +7,23 @@
// - It cannot use Bun APIs, since it is run using Node.js.
// - It does not import dependencies, so it's faster to start.
import { spawn, spawnSync } from "node:child_process";
import {
constants as fs,
readFileSync,
mkdtempSync,
existsSync,
statSync,
mkdirSync,
accessSync,
appendFileSync,
existsSync,
constants as fs,
mkdirSync,
mkdtempSync,
readdirSync,
readFileSync,
rmSync,
statSync,
} from "node:fs";
import { spawn, spawnSync } from "node:child_process";
import { tmpdir, hostname, userInfo, homedir } from "node:os";
import { join, basename, dirname, relative, sep } from "node:path";
import { normalize as normalizeWindows } from "node:path/win32";
import { isIP } from "node:net";
import { homedir, hostname, tmpdir, userInfo } from "node:os";
import { basename, dirname, isAbsolute, join, relative, resolve, sep } from "node:path";
import { normalize as normalizeWindows } from "node:path/win32";
import { parseArgs } from "node:util";
const spawnTimeout = 5_000;
@@ -222,7 +222,11 @@ async function runTests() {
if (results.every(({ ok }) => ok)) {
for (const testPath of tests) {
const title = relative(cwd, join(testsPath, testPath)).replace(/\\/g, "/");
await runTest(title, async () => spawnBunTest(execPath, join("test", testPath)));
// Make it an absolute path so that the test directory is not scanned, which reduces the load on the filesystem
const abs = resolve("test", testPath);
await runTest(title, async () => spawnBunTest(execPath, abs, join("test", testPath)));
}
}
@@ -531,12 +535,12 @@ async function spawnBun(execPath, { args, cwd, timeout, env, stdout, stderr }) {
* @param {string} testPath
* @returns {Promise<TestResult>}
*/
async function spawnBunTest(execPath, testPath) {
async function spawnBunTest(execPath, absoluteTestPath, testPath) {
const timeout = getTestTimeout(testPath);
const perTestTimeout = Math.ceil(timeout / 2);
const isReallyTest = isTestStrict(testPath);
const { ok, error, stdout } = await spawnBun(execPath, {
args: isReallyTest ? ["test", `--timeout=${perTestTimeout}`, testPath] : [testPath],
args: isReallyTest ? ["test", `--timeout=${perTestTimeout}`, absoluteTestPath] : [absoluteTestPath],
cwd: cwd,
timeout: isReallyTest ? timeout : 30_000,
env: {
@@ -1265,6 +1269,10 @@ function getBuildLabel() {
* @returns {string | undefined}
*/
function getFileUrl(file, line) {
if (isAbsolute(filePath) && filePath.includes(cwd)) {
file = relative(cwd, filePath);
}
const filePath = file.replace(/\\/g, "/");
let url;

View File

@@ -14,90 +14,106 @@ async function runStressTest({
onServerWritten: (socket) => void;
onFetchWritten: (socket) => void;
}) {
const total = PORT_EXHAUSTION_THRESHOLD * 2;
let sockets = [];
const batch = 48;
let toClose = 0;
let pendingClose = Promise.withResolvers();
const objects = [];
for (let i = 0; i < total; i++) {
objects.push({
method: "POST",
body: "--BYTEMARKER: " + (10 + i) + " ",
keepalive: false,
});
}
let initialMaxFD = -1;
const server = await Bun.listen({
port: 0,
socket: {
open(socket) {},
data(socket, data) {
const text = new TextDecoder().decode(data);
const i = parseInt(text.slice(text.indexOf("--BYTEMARKER: ") + "--BYTEMARKER: ".length).slice(0, 3)) - 10;
if (text.includes(objects[i].body)) {
socket.data ??= {};
socket.data.read = true;
sockets[i] = socket;
if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) {
{
const total = PORT_EXHAUSTION_THRESHOLD * 2;
let sockets = [];
const batch = 48;
let toClose = 0;
let pendingClose = Promise.withResolvers();
const objects = [];
for (let i = 0; i < total; i++) {
objects.push({
method: "POST",
body: "--BYTEMARKER: " + (10 + i) + " ",
keepalive: false,
});
}
const server = await Bun.listen({
port: 0,
socket: {
open(socket) {},
data(socket, data) {
const text = new TextDecoder().decode(data);
const i = parseInt(text.slice(text.indexOf("--BYTEMARKER: ") + "--BYTEMARKER: ".length).slice(0, 3)) - 10;
if (text.includes(objects[i].body)) {
socket.data ??= {};
socket.data.read = true;
sockets[i] = socket;
if (socket.write("200 OK\r\n\r\n") === "200 OK\r\n\r\n".length) {
socket.data.written = true;
onServerWritten(socket);
}
return;
}
console.log("Data is missing!");
},
drain(socket) {
if (!socket.data?.read || socket.data?.written) {
return;
}
if (socket.write("200 OK\r\n\r\n") === "200 OK\r\n\r\n".length) {
socket.data.written = true;
onServerWritten(socket);
}
return;
},
error(socket, err) {
console.log(err);
},
timeout() {},
close(socket) {
toClose--;
if (toClose === 0) {
pendingClose.resolve();
}
},
},
hostname: "127.0.0.1",
});
for (let remaining = total; remaining > 0; remaining -= batch) {
pendingClose = Promise.withResolvers();
{
const promises = [];
toClose = batch;
for (let i = 0; i < batch; i++) {
promises.push(
fetch(`http://127.0.0.1:${server.port}`, objects[i])
.then(r => r.blob())
.finally(() => {
onFetchWritten(sockets[i]);
}),
);
}
await Promise.allSettled(promises);
console.log("Data is missing!");
},
drain(socket) {
if (!socket.data?.read || socket.data?.written) {
return;
}
if (socket.write("200 OK\r\nCo") === "200 OK\r\nCo".length) {
socket.data.written = true;
onServerWritten(socket);
}
},
error(socket, err) {
console.log(err);
},
timeout() {},
close(socket) {
toClose--;
if (toClose === 0) {
pendingClose.resolve();
}
},
},
hostname: "127.0.0.1",
});
let initialMaxFD = -1;
for (let remaining = total; remaining > 0; remaining -= batch) {
pendingClose = Promise.withResolvers();
{
const promises = [];
toClose = batch;
for (let i = 0; i < batch; i++) {
promises.push(
fetch(`http://127.0.0.1:${server.port}`, objects[i]).finally(() => {
onFetchWritten(sockets[i]);
}),
);
promises.length = 0;
}
await Promise.allSettled(promises);
promises.length = 0;
await pendingClose.promise;
if (total) sockets = [];
if (initialMaxFD === -1) {
initialMaxFD = getMaxFD();
}
}
server.stop(true);
}
Bun.gc(true);
await pendingClose.promise;
if (total) sockets = [];
await Bun.sleep(80);
if (initialMaxFD === -1) {
initialMaxFD = getMaxFD();
for (let i = 0; i < 10; i++) {
let max = getMaxFD();
if (max > initialMaxFD + 10) {
await Bun.sleep(10);
console.log("Max FD is still high!", { max, initialMaxFD });
}
}
server.stop(true);
await Bun.sleep(10);
expect(getMaxFD()).toBeLessThan(initialMaxFD + 10);
}
@@ -106,7 +122,7 @@ test(
async () => {
await runStressTest({
onServerWritten(socket) {
socket.end();
socket.shutdown();
},
onFetchWritten(socket) {},
});

View File

@@ -0,0 +1,59 @@
import { expect, test } from "bun:test";
import { connect, listen } from "bun";
import { getMaxFD } from "harness";
test("tcp socket doesn't leak", async () => {
const init = getMaxFD();
{
let onClose = () => {};
const server = listen({
port: 0,
hostname: "0.0.0.0",
socket: {
data(socket, data) {
socket.write("hi");
},
open(socket) {},
close(socket) {
onClose(socket);
},
},
});
let attempts = 1000;
while ((attempts -= 50) >= 0) {
let batch = [];
let closed = [];
for (let i = 0; i < 50; i++) {
const onClose = Promise.withResolvers();
closed.push(onClose.promise);
batch.push(
connect({
port: server.port,
hostname: server.hostname,
socket: {
close(socket) {
onClose.resolve(socket);
},
data(socket, data) {},
open(socket) {
socket.write("hi");
},
},
}),
);
}
const sockets = await Promise.all(batch);
sockets.forEach(socket => socket.end());
await Promise.all(closed);
}
server.stop(true);
}
Bun.gc(true);
await Bun.sleep(1000);
Bun.gc(true);
const end = getMaxFD();
console.log({ init, end });
expect(end - init).toBeLessThan(100);
});