From 3812335c478e49d8cfb1ba80e53a71f802b64409 Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Tue, 27 Feb 2024 19:55:27 -0800 Subject: [PATCH 1/2] closeIfPossible --- .../builtins/ReadableByteStreamInternals.ts | 5 +- .../builtins/ReadableStreamDefaultReader.ts | 9 ++-- src/js/builtins/ReadableStreamInternals.ts | 46 ++++++++++++------- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/src/js/builtins/ReadableByteStreamInternals.ts b/src/js/builtins/ReadableByteStreamInternals.ts index 8e395dfe53..1a87c977ee 100644 --- a/src/js/builtins/ReadableByteStreamInternals.ts +++ b/src/js/builtins/ReadableByteStreamInternals.ts @@ -139,7 +139,7 @@ export function readableByteStreamControllerClose(controller) { } } - $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); } export function readableByteStreamControllerClearPendingPullIntos(controller) { @@ -177,7 +177,7 @@ export function readableByteStreamControllerHandleQueueDrain(controller) { $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable, ); if (!$getByIdDirectPrivate(controller, "queue").size && $getByIdDirectPrivate(controller, "closeRequested")) - $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); else $readableByteStreamControllerCallPullIfNeeded(controller); } @@ -227,6 +227,7 @@ export function readableByteStreamControllerShouldCallPull(controller) { if (!stream) { return false; } + if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return false; if ($getByIdDirectPrivate(controller, "closeRequested")) return false; if (!($getByIdDirectPrivate(controller, "started") > 0)) return false; diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts index 360bfc33f5..2ff8e385f0 100644 --- a/src/js/builtins/ReadableStreamDefaultReader.ts +++ b/src/js/builtins/ReadableStreamDefaultReader.ts @@ -99,12 +99,11 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau $putByValDirect(outValues, i, values[i].value); } } - $resetQueue($getByIdDirectPrivate(controller, "queue")); - if ($getByIdDirectPrivate(controller, "closeRequested")) - $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); - else if ($isReadableStreamDefaultController(controller)) { + if ($getByIdDirectPrivate(controller, "closeRequested")) { + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); + } else if ($isReadableStreamDefaultController(controller)) { $readableStreamDefaultControllerCallPullIfNeeded(controller); } else if ($isReadableByteStreamController(controller)) { $readableByteStreamControllerCallPullIfNeeded(controller); @@ -141,7 +140,7 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau $resetQueue(queue); if ($getByIdDirectPrivate(controller, "closeRequested")) { - $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); } else if ($isReadableStreamDefaultController(controller)) { $readableStreamDefaultControllerCallPullIfNeeded(controller); } else if ($isReadableByteStreamController(controller)) { diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index 9d6afa94f4..5b81edd9b3 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -708,7 +708,7 @@ export async function readStreamIntoSink(stream, sink, isNative) { sink, stream, undefined, - () => !didThrow && $markPromiseAsHandled(stream.cancel()), + () => !didThrow && stream.$state !== $streamClosed && $markPromiseAsHandled(stream.cancel()), stream.$asyncContext, ); @@ -778,7 +778,7 @@ export async function readStreamIntoSink(stream, sink, isNative) { } if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) { - $readableStreamClose(stream); + $readableStreamCloseIfPossible(stream); } stream = undefined; } @@ -944,7 +944,7 @@ export function onCloseDirectStream(reason) { if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) { this._pendingRead = undefined; $fulfillPromise(_pendingRead, { value: flushed, done: false }); - $readableStreamClose(stream); + $readableStreamCloseIfPossible(stream); return; } } @@ -953,7 +953,7 @@ export function onCloseDirectStream(reason) { var requests = $getByIdDirectPrivate(reader, "readRequests"); if (requests?.isNotEmpty()) { $readableStreamFulfillReadRequest(stream, flushed, false); - $readableStreamClose(stream); + $readableStreamCloseIfPossible(stream); return; } @@ -964,7 +964,7 @@ export function onCloseDirectStream(reason) { done: false, }); flushed = undefined; - $readableStreamClose(stream); + $readableStreamCloseIfPossible(stream); stream = undefined; return thisResult; }; @@ -975,7 +975,7 @@ export function onCloseDirectStream(reason) { $fulfillPromise(read, { value: undefined, done: true }); } - $readableStreamClose(stream); + $readableStreamCloseIfPossible(stream); } export function onFlushDirectStream() { @@ -1374,9 +1374,9 @@ export function readableStreamDefaultControllerPull(controller) { var queue = $getByIdDirectPrivate(controller, "queue"); if (queue.content.isNotEmpty()) { const chunk = $dequeueValue(queue); - if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) - $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); - else $readableStreamDefaultControllerCallPullIfNeeded(controller); + if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) { + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); + } else $readableStreamDefaultControllerCallPullIfNeeded(controller); return $createFulfilledPromise({ value: chunk, done: false }); } @@ -1388,8 +1388,19 @@ export function readableStreamDefaultControllerPull(controller) { export function readableStreamDefaultControllerClose(controller) { $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); $putByIdDirectPrivate(controller, "closeRequested", true); - if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) - $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) { + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); + } +} + +export function readableStreamCloseIfPossible(stream) { + switch ($getByIdDirectPrivate(stream, "state")) { + case $streamReadable: + case $streamClosing: { + $readableStreamClose(stream); + break; + } + } } export function readableStreamClose(stream) { @@ -1398,12 +1409,13 @@ export function readableStreamClose(stream) { $getByIdDirectPrivate(stream, "state") === $streamClosing, ); $putByIdDirectPrivate(stream, "state", $streamClosed); - if (!$getByIdDirectPrivate(stream, "reader")) return; + const reader = $getByIdDirectPrivate(stream, "reader"); + if (!reader) return; - if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) { - const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests"); + if ($isReadableStreamDefaultReader(reader)) { + const requests = $getByIdDirectPrivate(reader, "readRequests"); if (requests.isNotEmpty()) { - $putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO()); + $putByIdDirectPrivate(reader, "readRequests", $createFIFO()); for (var request = requests.shift(); request; request = requests.shift()) $fulfillPromise(request, { value: undefined, done: true }); @@ -1895,7 +1907,7 @@ export function readableStreamToArrayBufferDirect(stream, underlyingSource) { return Promise.$reject(e); } finally { if (!$isPromise(firstPull)) { - if (!didError && stream) $readableStreamClose(stream); + if (!didError && stream) $readableStreamCloseIfPossible(stream); controller = close = sink = pull = stream = undefined; return capability.promise; } @@ -1904,7 +1916,7 @@ export function readableStreamToArrayBufferDirect(stream, underlyingSource) { $assert($isPromise(firstPull)); return firstPull.then( () => { - if (!didError && stream) $readableStreamClose(stream); + if (!didError && stream) $readableStreamCloseIfPossible(stream); controller = close = sink = pull = stream = undefined; return capability.promise; }, From ec07710c8ae23004aa14d3c8c2b3126ba75898a2 Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Tue, 27 Feb 2024 19:57:25 -0800 Subject: [PATCH 2/2] Better leak test --- test/js/bun/http/serve.test.ts | 107 +++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 37 deletions(-) diff --git a/test/js/bun/http/serve.test.ts b/test/js/bun/http/serve.test.ts index 51645fd669..e7ab070583 100644 --- a/test/js/bun/http/serve.test.ts +++ b/test/js/bun/http/serve.test.ts @@ -3,7 +3,7 @@ import { file, gc, Serve, serve, Server } from "bun"; import { afterEach, describe, it, expect, afterAll } from "bun:test"; import { readFileSync, writeFileSync } from "fs"; import { join, resolve } from "path"; -import { bunExe, bunEnv } from "harness"; +import { bunExe, bunEnv, dumpStats } from "harness"; // import { renderToReadableStream } from "react-dom/server"; // import app_jsx from "./app.jsx"; import { spawn } from "child_process"; @@ -51,46 +51,79 @@ afterAll(() => { } }); -it.todo("1000 simultaneous downloads do not leak ReadableStream", async () => {}); +describe("1000 simultaneous uploads & downloads do not leak ReadableStream", () => { + for (let isDirect of [true, false] as const) { + it( + isDirect ? "direct" : "default", + async () => { + const blob = new Blob([new Uint8Array(1024 * 768).fill(123)]); + Bun.gc(true); -it("1000 simultaneous uploads do not leak ReadableStream", async () => { - const blob = new Blob([new Uint8Array(128).fill(123)]); - Bun.gc(true); + const expected = Bun.CryptoHasher.hash("sha256", blob, "base64"); + const initialCount = heapStats().objectTypeCounts.ReadableStream || 0; - const expected = Bun.CryptoHasher.hash("sha256", blob, "base64"); - const initialCount = heapStats().objectTypeCounts.ReadableStream || 0; + await runTest( + { + async fetch(req) { + var hasher = new Bun.SHA256(); + for await (const chunk of req.body) { + await Bun.sleep(0); + hasher.update(chunk); + } + return new Response( + isDirect + ? new ReadableStream({ + type: "direct", + async pull(controller) { + await Bun.sleep(0); + controller.write(Buffer.from(hasher.digest("base64"))); + await controller.flush(); + controller.close(); + }, + }) + : new ReadableStream({ + async pull(controller) { + await Bun.sleep(0); + controller.enqueue(Buffer.from(hasher.digest("base64"))); + controller.close(); + }, + }), + ); + }, + }, + async server => { + const count = 1000; + async function callback() { + const response = await fetch(server.url, { body: blob, method: "POST" }); - await runTest( - { - async fetch(req) { - var hasher = new Bun.SHA256(); - for await (const chunk of req.body) { - await Bun.sleep(0); - hasher.update(chunk); - } - return new Response(hasher.digest("base64")); + // We are testing for ReadableStream leaks, so we use the ReadableStream here. + const chunks = []; + for await (const chunk of response.body) { + chunks.push(chunk); + } + + const digest = Buffer.from(Bun.concatArrayBuffers(chunks)).toString(); + + expect(digest).toBe(expected); + } + { + const promises = new Array(count); + for (let i = 0; i < count; i++) { + promises[i] = callback(); + } + + await Promise.all(promises); + } + + Bun.gc(true); + dumpStats(); + expect(heapStats().objectTypeCounts.ReadableStream).toBeWithin(initialCount - 50, initialCount + 50); + }, + ); }, - }, - async server => { - const count = 1000; - async function callback() { - const response = await fetch(server.url, { body: blob, method: "POST" }); - const digest = await response.text(); - expect(digest).toBe(expected); - } - { - const promises = new Array(count); - for (let i = 0; i < count; i++) { - promises[i] = callback(); - } - - await Promise.all(promises); - } - - Bun.gc(true); - expect(heapStats().objectTypeCounts.ReadableStream).toBeWithin(initialCount - 50, initialCount + 50); - }, - ); + 100000, + ); + } }); [200, 200n, 303, 418, 599, 599n].forEach(statusCode => {