Rewrite internal Web Streams to use less memory (#16860)

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
Co-authored-by: pfg <pfg@pfg.pw>
This commit is contained in:
chloe caruso
2025-02-15 01:16:28 -08:00
committed by GitHub
parent 905fbee768
commit 78e52006c5
7 changed files with 370 additions and 232 deletions

View File

@@ -109,22 +109,7 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType
// Releasing the lock is not possible as there are active reads
// we will instead pretend we are unref'd, and release the lock once the reads are finished.
shouldUnref = true;
// unref the native part of the stream
try {
$getByIdDirectPrivate(
$getByIdDirectPrivate(native, "readableStreamController"),
"underlyingByteSource",
).$resume(false);
} catch (e) {
if (IS_BUN_DEVELOPMENT) {
// we assume this isn't possible, but because we aren't sure
// we will ignore if error during release, but make a big deal in debug
console.error(e);
$assert(!"reachable");
}
}
source?.updateRef?.(false);
}
} else if (source) {
source.updateRef(false);
@@ -188,25 +173,11 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType
async function internalRead(stream) {
$debug("internalRead();");
try {
var done: boolean, value: Uint8Array[];
$assert(reader);
const pendingRead = reader.readMany();
const { done, value } = await reader.read();
if ($isPromise(pendingRead)) {
({ done, value } = await pendingRead);
} else {
$debug("readMany() did not return a promise");
({ done, value } = pendingRead);
}
if (!done) {
stream.push(value[0]);
// shouldn't actually happen, but just in case
const length = value.length;
for (let i = 1; i < length; i++) {
stream.push(value[i]);
}
if (value) {
stream.push(value);
if (shouldUnref) unref();
} else {

View File

@@ -51,19 +51,23 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
const state = $getByIdDirectPrivate(stream, "state");
stream.$disturbed = true;
if (state === $streamClosed) return { value: [], size: 0, done: true };
else if (state === $streamErrored) {
if (state === $streamErrored) {
throw $getByIdDirectPrivate(stream, "storedError");
}
var controller = $getByIdDirectPrivate(stream, "readableStreamController");
var queue = $getByIdDirectPrivate(controller, "queue");
if (!queue) {
if (controller) {
var queue = $getByIdDirectPrivate(controller, "queue");
}
if (!queue && state !== $streamClosed) {
// This is a ReadableStream direct controller implemented in JS
// It hasn't been started yet.
return controller.$pull(controller).$then(function ({ done, value }) {
return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false };
return done ? { done: true, value: value ? [value] : [], size: 0 } : { value: [value], size: 1, done: false };
});
} else if (!queue) {
return { done: true, value: [], size: 0 };
}
const content = queue.content;
@@ -98,27 +102,31 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$putByValDirect(outValues, i, values[i].value);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
if (state !== $streamClosed) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
} else if ($isReadableByteStreamController(controller)) {
$readableByteStreamControllerCallPullIfNeeded(controller);
}
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));
return { value: outValues, size, done: false };
}
var onPullMany = result => {
const resultValue = result.value;
if (result.done) {
return { value: [], size: 0, done: true };
return { value: resultValue ? [resultValue] : [], size: 0, done: true };
}
var controller = $getByIdDirectPrivate(stream, "readableStreamController");
var queue = $getByIdDirectPrivate(controller, "queue");
var value = [result.value].concat(queue.content.toArray(false));
var value = [resultValue].concat(queue.content.toArray(false));
var length = value.length;
if ($isReadableByteStreamController(controller)) {
@@ -136,8 +144,6 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
}
var size = queue.size;
$resetQueue(queue);
if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream"));
} else if ($isReadableStreamDefaultController(controller)) {
@@ -146,12 +152,18 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
$readableByteStreamControllerCallPullIfNeeded(controller);
}
$resetQueue($getByIdDirectPrivate(controller, "queue"));
return { value: value, size: size, done: false };
};
if (state === $streamClosed) {
return { value: [], size: 0, done: true };
}
var pullResult = controller.$pull(controller);
if (pullResult && $isPromise(pullResult)) {
return pullResult.$then(onPullMany) as any;
return pullResult.then(onPullMany) as any;
}
return onPullMany(pullResult);

View File

@@ -1638,9 +1638,7 @@ export function readableStreamReaderGenericRelease(reader) {
var stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
if (stream.$bunNativePtr) {
$getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingByteSource").$resume(
false,
);
$getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingSource").$resume(false);
}
$putByIdDirectPrivate(stream, "reader", undefined);
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
@@ -1768,162 +1766,252 @@ export function readableStreamFromAsyncIterator(target, fn) {
});
}
/**
 * Builds and returns the `NativeReadableStreamSource` class: an underlying
 * source object (it exposes `pull`, `cancel` and optionally `start`) that
 * feeds a stream controller with data obtained from a native `handle`.
 *
 * The class is created inside this function so that it can close over the
 * `closer` flag and the `callClose` helper declared below.
 */
export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultController {
// Single-slot flag shared by every instance of the class below. It is reset to
// `false` before each `handle.pull(view, closer)` call and read back afterwards
// as `isClosed`. Presumably native code sets it to `true` to signal that the
// stream has ended — confirm against the native handle implementation.
// NOTE(review): because the array is shared, the asynchronous pull path reads
// `closer[0]` when its promise settles, after other pulls may have reset it —
// confirm this cannot lose a close signal.
const closer = [false];
// Closes `controller` if its stream is still in the readable state. In this
// block it is only ever scheduled through `$enqueueJob`, never called directly.
// Exceptions are reported via `globalThis.reportError` instead of propagating.
function callClose(controller) {
try {
// `var` so that `source` is still visible in the `finally` clause below.
var source = controller.$underlyingSource;
const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
if (!stream) {
return;
}
if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
controller.close();
} catch (e) {
globalThis.reportError(e);
} finally {
// Runs on every exit path (including the early returns above): drop the
// source's references to the native handle and to its scratch buffer.
if (source?.$stream) {
source.$stream = undefined;
}
if (source) {
source.$data = undefined;
}
}
}
// This was a type: "bytes" until Bun v1.1.44, but pendingPullIntos was not really
// compatible with how we send data to the stream, and "mode: 'byob'" wasn't
// supported so changing it isn't an observable change.
//
// When we receive chunks of data from native code, we sometimes read more
// than what the input buffer provided. When that happens, we return a typed
// array instead of the number of bytes read.
//
// When that happens, the ReadableByteStreamController creates (byteLength / autoAllocateChunkSize) pending pull into descriptors.
// So if that number is something like 16 * 1024, and we actually read 2 MB, you're going to create 128 pending pull into descriptors.
//
// And those pendingPullIntos were often never actually drained.
class NativeReadableStreamSource {
/**
 * @param handle native stream handle; stored in the private "stream" slot and
 *   given `onClose` / `onDrain` callbacks bound to this instance.
 * @param autoAllocateChunkSize initial size of the scratch buffer handed to
 *   `handle.pull`.
 * @param drainValue optional chunk that is enqueued from `start`, i.e. before
 *   the first pull; when `undefined` no `start` method is installed.
 */
constructor(handle, autoAllocateChunkSize, drainValue) {
$putByIdDirectPrivate(this, "stream", handle);
this.pull = this.#pull.bind(this);
this.cancel = this.#cancel.bind(this);
this.autoAllocateChunkSize = autoAllocateChunkSize;
if (drainValue !== undefined) {
this.start = controller => {
// One-shot: clear the property so the closure (and `drainValue`) is no
// longer referenced from this object once it has run.
this.start = undefined;
this.#controller = new WeakRef(controller);
controller.enqueue(drainValue);
};
}
handle.onClose = this.#onClose.bind(this);
handle.onDrain = this.#onDrain.bind(this);
}
// Callback installed as `handle.onDrain`: forwards `chunk` to the controller
// if the weakly-held controller is still alive; otherwise the chunk is dropped.
#onDrain(chunk) {
var controller = this.#controller?.deref?.();
if (controller) {
controller.enqueue(chunk);
}
}
// Set once the chunk size has been grown; growth happens at most one time.
#hasResized = false;
// Despite the name, this adjusts `autoAllocateChunkSize`: the first time a
// result is at least as large as the current chunk size, the chunk size is
// doubled, capped at 2 * 1024 * 1024.
#adjustHighWaterMark(result) {
const autoAllocateChunkSize = this.autoAllocateChunkSize;
if (result >= autoAllocateChunkSize && !this.#hasResized) {
this.#hasResized = true;
this.autoAllocateChunkSize = Math.min(autoAllocateChunkSize * 2, 1024 * 1024 * 2);
}
}
// Weak reference to the controller, recorded in `start` or on the first pull.
#controller: WeakRef<ReadableByteStreamController>;
pull;
cancel;
start;
autoAllocateChunkSize = 0;
// NOTE(review): `#chunk` is not read or written anywhere else in this class.
#chunk;
#closed = false;
// Scratch buffer passed to `handle.pull`; the unused tail is kept here between
// pulls so it can be reused (see #handleNumberResult).
$data?: Uint8Array;
// Callback installed as `handle.onClose`; also invoked from the rejection
// handler in #pull. Marks the source closed and drops the handle and buffer.
#onClose() {
this.#closed = true;
this.#controller = undefined;
this.$data = undefined;
// NOTE(review): `#controller` was set to `undefined` two statements above, so
// this always evaluates to `undefined` and the `callClose` job below is never
// enqueued from this method. Confirm whether the dereference was meant to
// happen before the field is cleared.
var controller = this.#controller?.deref?.();
$putByIdDirectPrivate(this, "stream", undefined);
if (controller) {
$enqueueJob(callClose, controller);
}
}
// Returns the cached scratch buffer when it holds at least `chunkSize`
// elements; otherwise allocates a new `Uint8Array(chunkSize)` and caches it.
#getInternalBuffer(chunkSize) {
var chunk = this.$data;
if (!chunk || chunk.length < chunkSize) {
this.$data = chunk = new Uint8Array(chunkSize);
}
return chunk;
}
// Handles a pull/drain result that is itself a typed-array view: a non-empty
// result is enqueued as-is. Returns the scratch `view` unchanged for reuse, or
// `undefined` (after scheduling a close) when `isClosed` is set.
#handleArrayBufferViewResult(result, view, isClosed, controller) {
if (result.byteLength > 0) {
controller.enqueue(result);
}
if (isClosed) {
$enqueueJob(callClose, controller);
return undefined;
}
return view;
}
// Handles a numeric result, treated here as the count of leading elements of
// `view` that hold data. Enqueues that prefix and returns what remains of the
// buffer for the next pull: the unused tail, or `undefined` when the view was
// completely filled or the stream is closing.
#handleNumberResult(result, view, isClosed, controller) {
if (result > 0) {
const remaining = view.length - result;
let toEnqueue = view;
if (remaining > 0) {
// Partially filled: hand the filled prefix to the consumer and keep the
// untouched tail as the new scratch buffer.
toEnqueue = view.subarray(0, result);
view = view.subarray(result);
} else {
// Entirely filled: the whole view is handed to the consumer, so it must
// not be reused as a scratch buffer.
view = undefined;
}
controller.enqueue(toEnqueue);
}
if (isClosed) {
$enqueueJob(callClose, controller);
return undefined;
}
return view;
}
// Dispatches on the type of a result from `handle.pull` / `handle.drain`:
//   number     -> #handleNumberResult
//   boolean    -> schedule a close; no buffer is retained
//   typed array -> #handleArrayBufferViewResult
// Anything else throws ERR_INVALID_STATE. The return value is the buffer to
// store back into `this.$data` (or `undefined`). The chunk size is only
// reconsidered when the stream is not closing.
#onNativeReadableStreamResult(result, view, isClosed, controller) {
if (typeof result === "number") {
if (!isClosed) this.#adjustHighWaterMark(result);
return this.#handleNumberResult(result, view, isClosed, controller);
} else if (typeof result === "boolean") {
$enqueueJob(callClose, controller);
return undefined;
} else if ($isTypedArrayView(result)) {
if (!isClosed) this.#adjustHighWaterMark(result.byteLength);
return this.#handleArrayBufferViewResult(result, view, isClosed, controller);
}
$debug("Unknown result type", result);
throw $ERR_INVALID_STATE("Internal error: invalid result from pull. This is a bug in Bun. Please report it.");
}
// Implementation behind the bound `pull` property. Returns `undefined`, or a
// promise when `handle.pull` returned one.
#pull(controller) {
var handle = $getByIdDirectPrivate(this, "stream");
if (!handle || this.#closed) {
// The handle is gone or the source was closed: release everything and
// schedule a close of the controller.
this.#controller = undefined;
this.#closed = true;
$putByIdDirectPrivate(this, "stream", undefined);
$enqueueJob(callClose, controller);
this.$data = undefined;
return;
}
if (!this.#controller) {
this.#controller = new WeakRef(controller);
}
closer[0] = false;
// When a scratch buffer is left over from an earlier pull, first ask the
// handle for data via `drain()`; a truthy result is processed and this pull
// ends without calling `handle.pull`.
if (this.$data) {
let drainResult = handle.drain();
if (drainResult) {
// `closer[0]` was reset just above and `drain()` is not passed `closer`,
// so `isClosed` is presumably always false on this path — confirm.
this.$data = this.#onNativeReadableStreamResult(drainResult, this.$data, closer[0], controller);
return;
}
}
const view = this.#getInternalBuffer(this.autoAllocateChunkSize);
const result = handle.pull(view, closer);
if ($isPromise(result)) {
return result.$then(
result => {
this.$data = this.#onNativeReadableStreamResult(result, view, closer[0], controller);
// If the source was closed while the result was being processed, do not
// keep the scratch buffer alive.
if (this.#closed) {
this.$data = undefined;
}
},
err => {
// Rejected pull: release state, error the controller, then run the
// regular close bookkeeping.
this.$data = undefined;
this.#closed = true;
this.#controller = undefined;
controller.error(err);
this.#onClose();
},
);
}
this.$data = this.#onNativeReadableStreamResult(result, view, closer[0], controller);
if (this.#closed) {
this.$data = undefined;
}
}
// Implementation behind the bound `cancel` property: drops the scratch buffer
// and, if the handle is still attached, unrefs it, forwards `reason` to
// `handle.cancel` and detaches it.
#cancel(reason) {
var handle = $getByIdDirectPrivate(this, "stream");
this.$data = undefined;
if (handle) {
handle.updateRef(false);
handle.cancel(reason);
$putByIdDirectPrivate(this, "stream", undefined);
}
}
}
// this is reuse of an existing private symbol
// `$resume(has_ref)` forwards to `handle.updateRef(has_ref)` when a handle is
// still attached; it is a no-op otherwise.
NativeReadableStreamSource.prototype.$resume = function (has_ref) {
var handle = $getByIdDirectPrivate(this, "stream");
if (handle) handle.updateRef(has_ref);
};
return NativeReadableStreamSource;
}
export function lazyLoadStream(stream, autoAllocateChunkSize) {
$debug("lazyLoadStream", stream, autoAllocateChunkSize);
var handle = stream.$bunNativePtr;
if (handle === -1) return;
var Prototype = $lazyStreamPrototypeMap.$get($getPrototypeOf(handle));
if (Prototype === undefined) {
var closer = [false];
var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
var { c, v } = this;
this.c = undefined;
this.v = undefined;
handleResult(val, c, v);
}
function callClose(controller) {
try {
var underlyingByteSource = controller.$underlyingByteSource;
const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
if (!stream) {
return;
}
if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
controller.close();
} catch (e) {
globalThis.reportError(e);
} finally {
if (underlyingByteSource?.$stream) {
underlyingByteSource.$stream = undefined;
}
}
}
handleResult = function handleResult(result, controller, view) {
$assert(controller, "controller is missing");
if (result && $isPromise(result)) {
return result.$then(
handleNativeReadableStreamPromiseResult.bind({
c: controller,
v: view,
}),
err => controller.error(err),
);
} else if (typeof result === "number") {
if (view && view.byteLength === result && view.buffer === controller?.byobRequest?.view?.buffer) {
controller.byobRequest.respondWithNewView(view);
} else {
controller.byobRequest.respond(result);
}
} else if ($isTypedArrayView(result)) {
controller.enqueue(result);
}
if (closer[0] || result === false) {
$enqueueJob(callClose, controller);
closer[0] = false;
}
};
function createResult(handle, controller, view, closer) {
closer[0] = false;
var result;
try {
result = handle.pull(view, closer);
} catch (err) {
return controller.error(err);
}
return handleResult(result, controller, view);
}
Prototype = class NativeReadableStreamSource {
constructor(handle, autoAllocateChunkSize, drainValue) {
$putByIdDirectPrivate(this, "stream", handle);
this.pull = this.#pull.bind(this);
this.cancel = this.#cancel.bind(this);
this.autoAllocateChunkSize = autoAllocateChunkSize;
if (drainValue !== undefined) {
this.start = controller => {
this.#controller = new WeakRef(controller);
controller.enqueue(drainValue);
};
}
handle.onClose = this.#onClose.bind(this);
handle.onDrain = this.#onDrain.bind(this);
}
#onDrain(chunk) {
var controller = this.#controller?.deref?.();
if (controller) {
controller.enqueue(chunk);
}
}
#controller: WeakRef<ReadableByteStreamController>;
pull;
cancel;
start;
type = "bytes";
autoAllocateChunkSize = 0;
#closed = false;
#onClose() {
this.#closed = true;
this.#controller = undefined;
var controller = this.#controller?.deref?.();
$putByIdDirectPrivate(this, "stream", undefined);
if (controller) {
$enqueueJob(callClose, controller);
}
}
#pull(controller) {
var handle = $getByIdDirectPrivate(this, "stream");
if (!handle || this.#closed) {
this.#controller = undefined;
$putByIdDirectPrivate(this, "stream", undefined);
$enqueueJob(callClose, controller);
return;
}
if (!this.#controller) {
this.#controller = new WeakRef(controller);
}
createResult(handle, controller, controller.byobRequest.view, closer);
}
#cancel(reason) {
var handle = $getByIdDirectPrivate(this, "stream");
if (handle) {
handle.updateRef(false);
handle.cancel(reason);
$putByIdDirectPrivate(this, "stream", undefined);
}
}
};
// this is reuse of an existing private symbol
Prototype.prototype.$resume = function (has_ref) {
var handle = $getByIdDirectPrivate(this, "stream");
if (handle) handle.updateRef(has_ref);
};
$lazyStreamPrototypeMap.$set($getPrototypeOf(handle), Prototype);
$lazyStreamPrototypeMap.$set($getPrototypeOf(handle), (Prototype = $createLazyLoadedStreamPrototype()));
}
stream.$disturbed = true;
if (autoAllocateChunkSize === undefined) {
// This default is what Node.js uses as well.
autoAllocateChunkSize = 256 * 1024;
}
const chunkSizeOrCompleteBuffer = handle.start(autoAllocateChunkSize);
let chunkSize, drainValue;
if ($isTypedArrayView(chunkSizeOrCompleteBuffer)) {
@@ -1945,7 +2033,6 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
pull(controller) {
controller.close();
},
type: "bytes",
};
}
@@ -1956,11 +2043,10 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
pull(controller) {
controller.close();
},
type: "bytes",
};
}
return new Prototype(handle, chunkSize, drainValue);
return new Prototype(handle, Math.max(chunkSize, autoAllocateChunkSize), drainValue);
}
export function readableStreamIntoArray(stream) {

View File

@@ -1,27 +1,36 @@
import * as http from "node:http";
const options = {
hostname: "www.example.com",
port: 80,
path: "/",
method: "GET",
headers: {},
};
const req = http.request(options, res => {
patchEmitter(res, "res");
console.log(`STATUS: ${res.statusCode}`);
res.setEncoding("utf8");
let server = http.createServer((req, res) => {
res.end("Hello, World!");
});
patchEmitter(req, "req");
req.end();
function patchEmitter(emitter, prefix) {
var oldEmit = emitter.emit;
emitter.emit = function () {
console.log([prefix, arguments[0]]);
oldEmit.apply(emitter, arguments);
server.listen(0, "localhost", 0, () => {
const options = {
hostname: "localhost",
port: server.address().port,
path: "/",
method: "GET",
headers: {},
};
}
const req = http.request(options, res => {
patchEmitter(res, "res");
console.log(`STATUS: ${res.statusCode}`);
res.setEncoding("utf8");
});
patchEmitter(req, "req");
req.end().once("close", () => {
setTimeout(() => {
server.close();
}, 1);
});
function patchEmitter(emitter, prefix) {
var oldEmit = emitter.emit;
emitter.emit = function () {
console.log([prefix, arguments[0]]);
oldEmit.apply(emitter, arguments);
};
}
});

View File

@@ -1859,17 +1859,14 @@ it("#11425 http no payload limit", done => {
});
it("should emit events in the right order", async () => {
const { stdout, stderr, exited } = Bun.spawn({
const { stdout, exited } = Bun.spawn({
cmd: [bunExe(), "run", path.join(import.meta.dir, "fixtures/log-events.mjs")],
stdout: "pipe",
stdin: "ignore",
stderr: "pipe",
stderr: "inherit",
env: bunEnv,
});
const err = await new Response(stderr).text();
expect(err).toBeEmpty();
const out = await new Response(stdout).text();
// TODO prefinish and socket are not emitted in the right order
expect(out.split("\n")).toEqual([
`[ "req", "prefinish" ]`,
`[ "req", "socket" ]`,
@@ -1884,6 +1881,7 @@ it("should emit events in the right order", async () => {
// `[ "res", "close" ]`,
"",
]);
expect(await exited).toBe(0);
});
it("destroy should end download", async () => {

View File

@@ -0,0 +1,62 @@
import { expect, test } from "bun:test";
import { isWindows } from "harness";
const BYTES_TO_WRITE = 500_000;
// https://github.com/oven-sh/bun/issues/12198
test.skipIf(isWindows)(
  "Absolute memory usage remains relatively constant when reading and writing to a pipe",
  async () => {
    /** Writes `bytes` bytes of filler data to the child's stdin. */
    async function write(bytes: number) {
      const buf = Buffer.alloc(bytes, "foo");
      await cat.stdin.write(buf);
    }

    /**
     * Reads from the child's stdout until at least `bytes` bytes have arrived.
     *
     * Throws if the stream ends first. Without that check a closed stream keeps
     * resolving `{ done: true, value: undefined }`, the byte counter never
     * advances, and the test would spin forever instead of failing.
     */
    async function read(bytes: number) {
      let received = 0;
      while (true) {
        const { done, value } = await r.read();
        received += value?.length ?? 0;
        if (received >= bytes) {
          return;
        }
        if (done) {
          throw new Error(`stdout ended after ${received} of ${bytes} expected bytes`);
        }
      }
    }

    /** Pushes `bytes` through `cat` and reads them back concurrently. */
    async function readAndWrite(bytes = BYTES_TO_WRITE) {
      await Promise.all([write(bytes), read(bytes)]);
    }

    // `await using` disposes the subprocess when the test body exits,
    // including when an assertion or `read()` throws.
    await using cat = Bun.spawn(["cat"], {
      stdin: "pipe",
      stdout: "pipe",
      stderr: "inherit",
    });
    const r = cat.stdout.getReader() as any;

    const rounds = 5000;

    // Warm-up pass so that one-time allocations are not counted in `before`.
    for (let i = 0; i < rounds; i++) {
      await readAndWrite(BYTES_TO_WRITE);
    }
    Bun.gc(true);
    const before = process.memoryUsage.rss();

    for (let i = 0; i < rounds; i++) {
      await readAndWrite();
    }
    Bun.gc(true);
    const after = process.memoryUsage.rss();

    // A third pass is measured for diagnostics only; it is logged, not asserted.
    for (let i = 0; i < rounds; i++) {
      await readAndWrite();
    }
    Bun.gc(true);
    const after2 = process.memoryUsage.rss();

    console.log({ after, after2 });
    console.log(require("bun:jsc").heapStats());
    console.log("RSS delta", ((after - before) | 0) / 1024 / 1024);
    console.log("RSS total", (after / 1024 / 1024) | 0, "MB");

    // RSS must stay under an absolute ceiling and must not have grown by more
    // than 50% relative to the post-warm-up baseline.
    expect(after).toBeLessThan(250 * 1024 * 1024);
    expect(after).toBeLessThan(before * 1.5);
  },
);

View File

@@ -19,7 +19,7 @@ const writer = (async function () {
// 1. Remove "await" from proc.stdin.write(string) (keep the .end() await)
// 2. Run `hyperfine "bun test/regression/issue/011297.fixture.ts"` (or run this many times on macOS.)
//
await proc.stdin.write(string);
proc.stdin.write(string);
}
await proc.stdin.end();
console.timeEnd("Sent " + string.length + " bytes x 10");