Merge branch 'jarred/process-change' of github.com:oven-sh/bun into jarred/process-change

This commit is contained in:
Zack Radisic
2024-02-27 21:41:21 -08:00
4 changed files with 106 additions and 61 deletions

View File

@@ -139,7 +139,7 @@ export function readableByteStreamControllerClose(controller) {
}
}
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
}
export function readableByteStreamControllerClearPendingPullIntos(controller) {
@@ -177,7 +177,7 @@ export function readableByteStreamControllerHandleQueueDrain(controller) {
$getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable,
);
if (!$getByIdDirectPrivate(controller, "queue").size && $getByIdDirectPrivate(controller, "closeRequested"))
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
else $readableByteStreamControllerCallPullIfNeeded(controller);
}
@@ -227,6 +227,7 @@ export function readableByteStreamControllerShouldCallPull(controller) {
if (!stream) {
return false;
}
if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return false;
if ($getByIdDirectPrivate(controller, "closeRequested")) return false;
if (!($getByIdDirectPrivate(controller, "started") > 0)) return false;

View File

@@ -99,12 +99,11 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$putByValDirect(outValues, i, values[i].value);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));
if ($getByIdDirectPrivate(controller, "closeRequested"))
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
else if ($isReadableStreamDefaultController(controller)) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
@@ -141,7 +140,7 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$resetQueue(queue);
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {

View File

@@ -708,7 +708,7 @@ export async function readStreamIntoSink(stream, sink, isNative) {
sink,
stream,
undefined,
() => !didThrow && $markPromiseAsHandled(stream.cancel()),
() => !didThrow && stream.$state !== $streamClosed && $markPromiseAsHandled(stream.cancel()),
stream.$asyncContext,
);
@@ -778,7 +778,7 @@ export async function readStreamIntoSink(stream, sink, isNative) {
}
if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) {
$readableStreamClose(stream);
$readableStreamCloseIfPossible(stream);
}
stream = undefined;
}
@@ -944,7 +944,7 @@ export function onCloseDirectStream(reason) {
if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) {
this._pendingRead = undefined;
$fulfillPromise(_pendingRead, { value: flushed, done: false });
$readableStreamClose(stream);
$readableStreamCloseIfPossible(stream);
return;
}
}
@@ -953,7 +953,7 @@ export function onCloseDirectStream(reason) {
var requests = $getByIdDirectPrivate(reader, "readRequests");
if (requests?.isNotEmpty()) {
$readableStreamFulfillReadRequest(stream, flushed, false);
$readableStreamClose(stream);
$readableStreamCloseIfPossible(stream);
return;
}
@@ -964,7 +964,7 @@ export function onCloseDirectStream(reason) {
done: false,
});
flushed = undefined;
$readableStreamClose(stream);
$readableStreamCloseIfPossible(stream);
stream = undefined;
return thisResult;
};
@@ -975,7 +975,7 @@ export function onCloseDirectStream(reason) {
$fulfillPromise(read, { value: undefined, done: true });
}
$readableStreamClose(stream);
$readableStreamCloseIfPossible(stream);
}
export function onFlushDirectStream() {
@@ -1374,9 +1374,9 @@ export function readableStreamDefaultControllerPull(controller) {
var queue = $getByIdDirectPrivate(controller, "queue");
if (queue.content.isNotEmpty()) {
const chunk = $dequeueValue(queue);
if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty())
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
else $readableStreamDefaultControllerCallPullIfNeeded(controller);
if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else $readableStreamDefaultControllerCallPullIfNeeded(controller);
return $createFulfilledPromise({ value: chunk, done: false });
}
@@ -1388,8 +1388,19 @@ export function readableStreamDefaultControllerPull(controller) {
export function readableStreamDefaultControllerClose(controller) {
$assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller));
$putByIdDirectPrivate(controller, "closeRequested", true);
if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty())
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
}
}
export function readableStreamCloseIfPossible(stream) {
switch ($getByIdDirectPrivate(stream, "state")) {
case $streamReadable:
case $streamClosing: {
$readableStreamClose(stream);
break;
}
}
}
export function readableStreamClose(stream) {
@@ -1398,12 +1409,13 @@ export function readableStreamClose(stream) {
$getByIdDirectPrivate(stream, "state") === $streamClosing,
);
$putByIdDirectPrivate(stream, "state", $streamClosed);
if (!$getByIdDirectPrivate(stream, "reader")) return;
const reader = $getByIdDirectPrivate(stream, "reader");
if (!reader) return;
if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) {
const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests");
if ($isReadableStreamDefaultReader(reader)) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
if (requests.isNotEmpty()) {
$putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO());
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift())
$fulfillPromise(request, { value: undefined, done: true });
@@ -1895,7 +1907,7 @@ export function readableStreamToArrayBufferDirect(stream, underlyingSource) {
return Promise.$reject(e);
} finally {
if (!$isPromise(firstPull)) {
if (!didError && stream) $readableStreamClose(stream);
if (!didError && stream) $readableStreamCloseIfPossible(stream);
controller = close = sink = pull = stream = undefined;
return capability.promise;
}
@@ -1904,7 +1916,7 @@ export function readableStreamToArrayBufferDirect(stream, underlyingSource) {
$assert($isPromise(firstPull));
return firstPull.then(
() => {
if (!didError && stream) $readableStreamClose(stream);
if (!didError && stream) $readableStreamCloseIfPossible(stream);
controller = close = sink = pull = stream = undefined;
return capability.promise;
},

View File

@@ -3,7 +3,7 @@ import { file, gc, Serve, serve, Server } from "bun";
import { afterEach, describe, it, expect, afterAll } from "bun:test";
import { readFileSync, writeFileSync } from "fs";
import { join, resolve } from "path";
import { bunExe, bunEnv } from "harness";
import { bunExe, bunEnv, dumpStats } from "harness";
// import { renderToReadableStream } from "react-dom/server";
// import app_jsx from "./app.jsx";
import { spawn } from "child_process";
@@ -51,46 +51,79 @@ afterAll(() => {
}
});
it.todo("1000 simultaneous downloads do not leak ReadableStream", async () => {});
describe("1000 simultaneous uploads & downloads do not leak ReadableStream", () => {
for (let isDirect of [true, false] as const) {
it(
isDirect ? "direct" : "default",
async () => {
const blob = new Blob([new Uint8Array(1024 * 768).fill(123)]);
Bun.gc(true);
it("1000 simultaneous uploads do not leak ReadableStream", async () => {
const blob = new Blob([new Uint8Array(128).fill(123)]);
Bun.gc(true);
const expected = Bun.CryptoHasher.hash("sha256", blob, "base64");
const initialCount = heapStats().objectTypeCounts.ReadableStream || 0;
const expected = Bun.CryptoHasher.hash("sha256", blob, "base64");
const initialCount = heapStats().objectTypeCounts.ReadableStream || 0;
await runTest(
{
async fetch(req) {
var hasher = new Bun.SHA256();
for await (const chunk of req.body) {
await Bun.sleep(0);
hasher.update(chunk);
}
return new Response(
isDirect
? new ReadableStream({
type: "direct",
async pull(controller) {
await Bun.sleep(0);
controller.write(Buffer.from(hasher.digest("base64")));
await controller.flush();
controller.close();
},
})
: new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue(Buffer.from(hasher.digest("base64")));
controller.close();
},
}),
);
},
},
async server => {
const count = 1000;
async function callback() {
const response = await fetch(server.url, { body: blob, method: "POST" });
await runTest(
{
async fetch(req) {
var hasher = new Bun.SHA256();
for await (const chunk of req.body) {
await Bun.sleep(0);
hasher.update(chunk);
}
return new Response(hasher.digest("base64"));
// We are testing for ReadableStream leaks, so we use the ReadableStream here.
const chunks = [];
for await (const chunk of response.body) {
chunks.push(chunk);
}
const digest = Buffer.from(Bun.concatArrayBuffers(chunks)).toString();
expect(digest).toBe(expected);
}
{
const promises = new Array(count);
for (let i = 0; i < count; i++) {
promises[i] = callback();
}
await Promise.all(promises);
}
Bun.gc(true);
dumpStats();
expect(heapStats().objectTypeCounts.ReadableStream).toBeWithin(initialCount - 50, initialCount + 50);
},
);
},
},
async server => {
const count = 1000;
async function callback() {
const response = await fetch(server.url, { body: blob, method: "POST" });
const digest = await response.text();
expect(digest).toBe(expected);
}
{
const promises = new Array(count);
for (let i = 0; i < count; i++) {
promises[i] = callback();
}
await Promise.all(promises);
}
Bun.gc(true);
expect(heapStats().objectTypeCounts.ReadableStream).toBeWithin(initialCount - 50, initialCount + 50);
},
);
100000,
);
}
});
[200, 200n, 303, 418, 599, 599n].forEach(statusCode => {