Compare commits

...

1 Commit

Author SHA1 Message Date
Ashcon Partovi
953757bb88 Revert "Deflake fetch tests (#16000)"
This reverts commit 7b06872abb.
2024-12-31 09:13:33 -08:00
11 changed files with 63 additions and 88 deletions

View File

@@ -1,6 +1,5 @@
const server = Bun.serve({
port: 0,
idleTimeout: 0,
async fetch(req) {
return new Response(

View File

@@ -3,7 +3,6 @@ const content = Buffer.alloc(3 * 15360000, "Bun").toString();
const server = Bun.serve({
port: 0,
idleTimeout: 0,
fetch: async req => {
const data = await req.formData();
return new Response(data.get("name") === content ? "OK" : "NO");

View File

@@ -1,6 +1,5 @@
const server = Bun.serve({
port: 0,
idleTimeout: 0,
async fetch(req: Request) {
const url = req.url;
if (url.endsWith("/report")) {

View File

@@ -1,6 +1,6 @@
const server = Bun.serve({
port: 0,
idleTimeout: 0,
error(err) {
return new Response("Failed", { status: 555 });
},

View File

@@ -1,6 +1,5 @@
const server = Bun.serve({
hostname: "localhost",
idleTimeout: 0,
async fetch() {
throw new Error("Error");
},

View File

@@ -9,9 +9,15 @@ const totalCount = 10_000;
const zeroCopyPayload = new Blob([payload]);
const zeroCopyJSONPayload = new Blob([JSON.stringify({ bun: payload })]);
async function getURL() {
let defer = Promise.withResolvers<string>();
const process = Bun.spawn([bunExe(), "--smol", join(import.meta.dirname, "body-leak-test-fixture.ts")], {
let url: URL;
let process: Subprocess<"ignore", "pipe", "inherit"> | null = null;
beforeEach(async () => {
if (process) {
process?.kill();
}
let defer = Promise.withResolvers();
process = Bun.spawn([bunExe(), "--smol", join(import.meta.dirname, "body-leak-test-fixture.ts")], {
env: bunEnv,
stdout: "inherit",
stderr: "inherit",
@@ -20,17 +26,19 @@ async function getURL() {
defer.resolve(message);
},
});
const url: URL = new URL(await defer.promise);
url = new URL(await defer.promise);
process.unref();
await warmup(url);
return { url, process };
}
await warmup();
});
afterEach(() => {
process?.kill();
});
async function getMemoryUsage(url: URL): Promise<number> {
async function getMemoryUsage(): Promise<number> {
return (await fetch(`${url.origin}/report`).then(res => res.json())) as number;
}
async function warmup(url: URL) {
async function warmup() {
var remaining = totalCount;
while (remaining > 0) {
@@ -46,17 +54,17 @@ async function warmup(url: URL) {
remaining -= batchSize;
}
// clean up memory before first test
await getMemoryUsage(url);
await getMemoryUsage();
}
async function callBuffering(url: URL) {
async function callBuffering() {
const result = await fetch(`${url.origin}/buffering`, {
method: "POST",
body: zeroCopyPayload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callJSONBuffering(url: URL) {
async function callJSONBuffering() {
const result = await fetch(`${url.origin}/json-buffering`, {
method: "POST",
body: zeroCopyJSONPayload,
@@ -64,35 +72,35 @@ async function callJSONBuffering(url: URL) {
expect(result).toBe("Ok");
}
async function callBufferingBodyGetter(url: URL) {
async function callBufferingBodyGetter() {
const result = await fetch(`${url.origin}/buffering+body-getter`, {
method: "POST",
body: zeroCopyPayload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callStreaming(url: URL) {
async function callStreaming() {
const result = await fetch(`${url.origin}/streaming`, {
method: "POST",
body: zeroCopyPayload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callIncompleteStreaming(url: URL) {
async function callIncompleteStreaming() {
const result = await fetch(`${url.origin}/incomplete-streaming`, {
method: "POST",
body: zeroCopyPayload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callStreamingEcho(url: URL) {
async function callStreamingEcho() {
const result = await fetch(`${url.origin}/streaming-echo`, {
method: "POST",
body: zeroCopyPayload,
}).then(res => res.text());
expect(result).toBe(payload);
}
async function callIgnore(url: URL) {
async function callIgnore() {
const result = await fetch(url, {
method: "POST",
body: zeroCopyPayload,
@@ -100,8 +108,8 @@ async function callIgnore(url: URL) {
expect(result).toBe("Ok");
}
async function calculateMemoryLeak(fn: (url: URL) => Promise<void>, url: URL) {
const start_memory = await getMemoryUsage(url);
async function calculateMemoryLeak(fn: () => Promise<void>) {
const start_memory = await getMemoryUsage();
const memory_examples: Array<number> = [];
let peak_memory = start_memory;
@@ -109,14 +117,14 @@ async function calculateMemoryLeak(fn: (url: URL) => Promise<void>, url: URL) {
while (remaining > 0) {
const batch = new Array(batchSize);
for (let j = 0; j < batchSize; j++) {
batch[j] = fn(url);
batch[j] = fn();
}
await Promise.all(batch);
remaining -= batchSize;
// garbage collect and check memory usage every 1000 requests
if (remaining > 0 && remaining % 1000 === 0) {
const report = await getMemoryUsage(url);
const report = await getMemoryUsage();
if (report > peak_memory) {
peak_memory = report;
}
@@ -125,7 +133,7 @@ async function calculateMemoryLeak(fn: (url: URL) => Promise<void>, url: URL) {
}
// wait for the last memory usage to be stable
const end_memory = await getMemoryUsage(url);
const end_memory = await getMemoryUsage();
if (end_memory > peak_memory) {
peak_memory = end_memory;
}
@@ -152,9 +160,7 @@ for (const test_info of [
it.todoIf(skip)(
testName,
async () => {
const { url, process } = await getURL();
await using processHandle = process;
const report = await calculateMemoryLeak(fn, url);
const report = await calculateMemoryLeak(fn);
// peak memory is too high
expect(report.peak_memory).not.toBeGreaterThan(report.start_memory * 2.5);
// acceptable memory leak

View File

@@ -9,7 +9,6 @@
let pending = [];
using server = Bun.serve({
port: 0,
idleTimeout: 0,
websocket: {
open(ws) {
globalThis.sockets ??= [];

View File

@@ -1,15 +1,8 @@
// Reduce memory pressure by not cloning the buffer each Response.
const payload = new Blob([Buffer.alloc(64 * 64 * 1024, "X")]);
const payload = Buffer.alloc(64 * 64 * 1024, "X");
const server = Bun.serve({
port: 0,
idleTimeout: 0,
async fetch(req) {
return new Response(payload);
},
});
if (process.send) {
process.send(server.url.href);
} else {
console.log(server.url.href);
}
console.log(server.url.href);

View File

@@ -23,16 +23,8 @@ try {
Bun.gc(true);
await Bun.sleep(10);
const stats = getHeapStats();
let { Response, Promise } = stats;
Response ||= 0;
Promise ||= 0;
console.log({
rss: ((process.memoryUsage.rss() / 1024 / 1024) | 0) + " MB",
Response,
Promise,
});
expect(Response).toBeLessThanOrEqual(threshold);
expect(Promise).toBeLessThanOrEqual(threshold);
expect(stats.Response || 0).toBeLessThanOrEqual(threshold);
expect(stats.Promise || 0).toBeLessThanOrEqual(threshold);
}
}
process.exit(0);

View File

@@ -10,7 +10,7 @@ describe("fetch doesn't leak", () => {
var count = 0;
using server = Bun.serve({
port: 0,
idleTimeout: 0,
fetch(req) {
count++;
return new Response(body);
@@ -20,7 +20,7 @@ describe("fetch doesn't leak", () => {
const proc = Bun.spawn({
env: {
...bunEnv,
SERVER: server.url.href,
SERVER: `http://${server.hostname}:${server.port}`,
COUNT: "200",
},
stderr: "inherit",
@@ -49,7 +49,6 @@ describe("fetch doesn't leak", () => {
const serveOptions = {
port: 0,
idleTimeout: 0,
fetch(req) {
return new Response(body, { headers });
},
@@ -63,8 +62,8 @@ describe("fetch doesn't leak", () => {
const env = {
...bunEnv,
SERVER: server.url.href,
BUN_JSC_forceRAMSize: (1024 * 1024 * 64).toString(10),
SERVER: `${tls ? "https" : "http"}://${server.hostname}:${server.port}`,
BUN_JSC_forceRAMSize: (1024 * 1024 * 64).toString("10"),
NAME: name,
};
@@ -106,7 +105,6 @@ describe.each(["FormData", "Blob", "Buffer", "String", "URLSearchParams", "strea
async () => {
using server = Bun.serve({
port: 0,
idleTimeout: 0,
fetch(req) {
return new Response();
},
@@ -153,7 +151,7 @@ test("do not leak", async () => {
let url;
let isDone = false;
server.listen(0, "127.0.0.1", function attack() {
server.listen(0, function attack() {
if (isDone) {
return;
}
@@ -167,7 +165,7 @@ test("do not leak", async () => {
let prev = Infinity;
let count = 0;
var interval = setInterval(() => {
const interval = setInterval(() => {
isDone = true;
gc();
const next = process.memoryUsage().heapUsed;

View File

@@ -17,7 +17,6 @@ const fetchFixture4 = join(import.meta.dir, "fetch-leak-test-fixture-4.js");
let server: Server;
function startServer({ fetch, ...options }: ServeOptions) {
server = serve({
idleTimeout: 0,
...options,
fetch,
port: 0,
@@ -1315,16 +1314,9 @@ describe("Response", () => {
method: "POST",
body: await Bun.file(import.meta.dir + "/fixtures/file.txt").arrayBuffer(),
});
const input = await response.bytes();
var input = await response.arrayBuffer();
var output = await Bun.file(import.meta.dir + "/fixtures/file.txt").stream();
let chunks: Uint8Array[] = [];
const reader = output.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(input).toEqual(Buffer.concat(chunks));
expect(new Uint8Array(input)).toEqual((await output.getReader().read()).value);
});
});
@@ -2026,31 +2018,35 @@ describe("http/1.1 response body length", () => {
});
describe("fetch Response life cycle", () => {
it("should not keep Response alive if not consumed", async () => {
let deferred = Promise.withResolvers<string>();
await using serverProcess = Bun.spawn({
const serverProcess = Bun.spawn({
cmd: [bunExe(), "--smol", fetchFixture3],
stderr: "inherit",
stdout: "inherit",
stdin: "inherit",
stdout: "pipe",
stdin: "ignore",
env: bunEnv,
ipc(message) {
deferred.resolve(message);
},
});
const serverUrl = await deferred.promise;
await using clientProcess = Bun.spawn({
async function getServerUrl() {
const reader = serverProcess.stdout.getReader();
const { done, value } = await reader.read();
return new TextDecoder().decode(value);
}
const serverUrl = await getServerUrl();
const clientProcess = Bun.spawn({
cmd: [bunExe(), "--smol", fetchFixture4, serverUrl],
stderr: "inherit",
stdout: "inherit",
stdin: "inherit",
stdout: "pipe",
stdin: "ignore",
env: bunEnv,
});
expect(await clientProcess.exited).toBe(0);
try {
expect(await clientProcess.exited).toBe(0);
} finally {
serverProcess.kill();
}
});
it("should allow to get promise result after response is GC'd", async () => {
using server = Bun.serve({
const server = Bun.serve({
port: 0,
async fetch(request: Request) {
return new Response(
@@ -2239,7 +2235,6 @@ describe("fetch should allow duplex", () => {
it("should work in redirects .manual when using duplex", async () => {
using server = Bun.serve({
port: 0,
idleTimeout: 0,
async fetch(req) {
if (req.url.indexOf("/redirect") === -1) {
return Response.redirect("/");
@@ -2303,11 +2298,7 @@ it("should allow to follow redirect if connection is closed, abort should work e
await once(server.listen(0), "listening");
try {
let { address, port } = server.address() as AddressInfo;
if (address === "::") {
address = "[::]";
}
const response = await fetch(`http://${address}:${port}/redirect`, {
const response = await fetch(`http://localhost:${(server.address() as AddressInfo).port}/redirect`, {
signal: AbortSignal.timeout(150),
});
if (type === "delay") {