diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index 7d5ef31d12..03dd972f1a 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -109,22 +109,7 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType // Releasing the lock is not possible as there are active reads // we will instead pretend we are unref'd, and release the lock once the reads are finished. - shouldUnref = true; - - // unref the native part of the stream - try { - $getByIdDirectPrivate( - $getByIdDirectPrivate(native, "readableStreamController"), - "underlyingByteSource", - ).$resume(false); - } catch (e) { - if (IS_BUN_DEVELOPMENT) { - // we assume this isn't possible, but because we aren't sure - // we will ignore if error during release, but make a big deal in debug - console.error(e); - $assert(!"reachable"); - } - } + source?.updateRef?.(false); } } else if (source) { source.updateRef(false); @@ -188,25 +173,11 @@ export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType async function internalRead(stream) { $debug("internalRead();"); try { - var done: boolean, value: Uint8Array[]; $assert(reader); - const pendingRead = reader.readMany(); + const { done, value } = await reader.read(); - if ($isPromise(pendingRead)) { - ({ done, value } = await pendingRead); - } else { - $debug("readMany() did not return a promise"); - ({ done, value } = pendingRead); - } - - if (!done) { - stream.push(value[0]); - - // shouldn't actually happen, but just in case - const length = value.length; - for (let i = 1; i < length; i++) { - stream.push(value[i]); - } + if (value) { + stream.push(value); if (shouldUnref) unref(); } else { diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts index c1004488a3..c79d3a7848 100644 --- a/src/js/builtins/ReadableStreamDefaultReader.ts +++ b/src/js/builtins/ReadableStreamDefaultReader.ts @@ -51,19 +51,23 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau const state = $getByIdDirectPrivate(stream, "state"); stream.$disturbed = true; - if (state === $streamClosed) return { value: [], size: 0, done: true }; - else if (state === $streamErrored) { + if (state === $streamErrored) { throw $getByIdDirectPrivate(stream, "storedError"); } var controller = $getByIdDirectPrivate(stream, "readableStreamController"); - var queue = $getByIdDirectPrivate(controller, "queue"); - if (!queue) { + if (controller) { + var queue = $getByIdDirectPrivate(controller, "queue"); + } + + if (!queue && state !== $streamClosed) { // This is a ReadableStream direct controller implemented in JS // It hasn't been started yet. return controller.$pull(controller).$then(function ({ done, value }) { - return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false }; + return done ? { done: true, value: value ? [value] : [], size: 0 } : { value: [value], size: 1, done: false }; }); + } else if (!queue) { + return { done: true, value: [], size: 0 }; } const content = queue.content; @@ -98,27 +102,31 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau $putByValDirect(outValues, i, values[i].value); } } - $resetQueue($getByIdDirectPrivate(controller, "queue")); - if ($getByIdDirectPrivate(controller, "closeRequested")) { - $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); - } else if ($isReadableStreamDefaultController(controller)) { - $readableStreamDefaultControllerCallPullIfNeeded(controller); - } else if ($isReadableByteStreamController(controller)) { - $readableByteStreamControllerCallPullIfNeeded(controller); + if (state !== $streamClosed) { + if ($getByIdDirectPrivate(controller, "closeRequested")) { + $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); + } else if ($isReadableStreamDefaultController(controller)) { + $readableStreamDefaultControllerCallPullIfNeeded(controller); + } else if ($isReadableByteStreamController(controller)) { + $readableByteStreamControllerCallPullIfNeeded(controller); + } } + $resetQueue($getByIdDirectPrivate(controller, "queue")); return { value: outValues, size, done: false }; } var onPullMany = result => { + const resultValue = result.value; + if (result.done) { - return { value: [], size: 0, done: true }; + return { value: resultValue ? [resultValue] : [], size: 0, done: true }; } var controller = $getByIdDirectPrivate(stream, "readableStreamController"); var queue = $getByIdDirectPrivate(controller, "queue"); - var value = [result.value].concat(queue.content.toArray(false)); + var value = [resultValue].concat(queue.content.toArray(false)); var length = value.length; if ($isReadableByteStreamController(controller)) { @@ -136,8 +144,6 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau } var size = queue.size; - $resetQueue(queue); - if ($getByIdDirectPrivate(controller, "closeRequested")) { $readableStreamCloseIfPossible($getByIdDirectPrivate(controller, "controlledReadableStream")); } else if ($isReadableStreamDefaultController(controller)) { @@ -146,12 +152,18 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau $readableByteStreamControllerCallPullIfNeeded(controller); } + $resetQueue($getByIdDirectPrivate(controller, "queue")); + return { value: value, size: size, done: false }; }; + if (state === $streamClosed) { + return { value: [], size: 0, done: true }; + } + var pullResult = controller.$pull(controller); if (pullResult && $isPromise(pullResult)) { - return pullResult.$then(onPullMany) as any; + return pullResult.then(onPullMany) as any; } return onPullMany(pullResult); diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index f81b71d6bd..83e249ccd5 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -1638,9 +1638,7 @@ export function readableStreamReaderGenericRelease(reader) { var stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); if (stream.$bunNativePtr) { - $getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingByteSource").$resume( - false, - ); + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingSource").$resume(false); } $putByIdDirectPrivate(stream, "reader", undefined); $putByIdDirectPrivate(reader, "ownerReadableStream", undefined); @@ -1768,162 +1766,252 @@ export function readableStreamFromAsyncIterator(target, fn) { }); } +export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultController { + const closer = [false]; + + function callClose(controller) { + try { + var source = controller.$underlyingSource; + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + if (!stream) { + return; + } + + if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + controller.close(); + } catch (e) { + globalThis.reportError(e); + } finally { + if (source?.$stream) { + source.$stream = undefined; + } + + if (source) { + source.$data = undefined; + } + } + } + + // This was a type: "bytes" until Bun v1.1.44, but pendingPullIntos was not really + // compatible with how we send data to the stream, and "mode: 'byob'" wasn't + // supported so changing it isn't an observable change. + // + // When we receive chunks of data from native code, we sometimes read more + // than what the input buffer provided. When that happens, we return a typed + // array instead of the number of bytes read. + // + // When that happens, the ReadableByteStreamController creates (byteLength / autoAllocateChunkSize) pending pull into descriptors. + // So if that number is something like 16 * 1024, and we actually read 2 MB, you're going to create 128 pending pull into descriptors. + // + // And those pendingPullIntos were often never actually drained. + class NativeReadableStreamSource { + constructor(handle, autoAllocateChunkSize, drainValue) { + $putByIdDirectPrivate(this, "stream", handle); + this.pull = this.#pull.bind(this); + this.cancel = this.#cancel.bind(this); + this.autoAllocateChunkSize = autoAllocateChunkSize; + + if (drainValue !== undefined) { + this.start = controller => { + this.start = undefined; + this.#controller = new WeakRef(controller); + controller.enqueue(drainValue); + }; + } + + handle.onClose = this.#onClose.bind(this); + handle.onDrain = this.#onDrain.bind(this); + } + + #onDrain(chunk) { + var controller = this.#controller?.deref?.(); + if (controller) { + controller.enqueue(chunk); + } + } + + #hasResized = false; + + #adjustHighWaterMark(result) { + const autoAllocateChunkSize = this.autoAllocateChunkSize; + if (result >= autoAllocateChunkSize && !this.#hasResized) { + this.#hasResized = true; + this.autoAllocateChunkSize = Math.min(autoAllocateChunkSize * 2, 1024 * 1024 * 2); + } + } + + #controller: WeakRef; + + pull; + cancel; + start; + + autoAllocateChunkSize = 0; + #chunk; + #closed = false; + + $data?: Uint8Array; + + #onClose() { + this.#closed = true; + this.#controller = undefined; + this.$data = undefined; + + var controller = this.#controller?.deref?.(); + + $putByIdDirectPrivate(this, "stream", undefined); + if (controller) { + $enqueueJob(callClose, controller); + } + } + + #getInternalBuffer(chunkSize) { + var chunk = this.$data; + if (!chunk || chunk.length < chunkSize) { + this.$data = chunk = new Uint8Array(chunkSize); + } + return chunk; + } + + #handleArrayBufferViewResult(result, view, isClosed, controller) { + if (result.byteLength > 0) { + controller.enqueue(result); + } + + if (isClosed) { + $enqueueJob(callClose, controller); + return undefined; + } + + return view; + } + + #handleNumberResult(result, view, isClosed, controller) { + if (result > 0) { + const remaining = view.length - result; + let toEnqueue = view; + + if (remaining > 0) { + toEnqueue = view.subarray(0, result); + view = view.subarray(result); + } else { + view = undefined; + } + + controller.enqueue(toEnqueue); + } + + if (isClosed) { + $enqueueJob(callClose, controller); + return undefined; + } + + return view; + } + + #onNativeReadableStreamResult(result, view, isClosed, controller) { + if (typeof result === "number") { + if (!isClosed) this.#adjustHighWaterMark(result); + return this.#handleNumberResult(result, view, isClosed, controller); + } else if (typeof result === "boolean") { + $enqueueJob(callClose, controller); + return undefined; + } else if ($isTypedArrayView(result)) { + if (!isClosed) this.#adjustHighWaterMark(result.byteLength); + return this.#handleArrayBufferViewResult(result, view, isClosed, controller); + } + + $debug("Unknown result type", result); + throw $ERR_INVALID_STATE("Internal error: invalid result from pull. This is a bug in Bun. Please report it."); + } + + #pull(controller) { + var handle = $getByIdDirectPrivate(this, "stream"); + + if (!handle || this.#closed) { + this.#controller = undefined; + this.#closed = true; + $putByIdDirectPrivate(this, "stream", undefined); + $enqueueJob(callClose, controller); + this.$data = undefined; + return; + } + + if (!this.#controller) { + this.#controller = new WeakRef(controller); + } + + closer[0] = false; + + if (this.$data) { + let drainResult = handle.drain(); + if (drainResult) { + this.$data = this.#onNativeReadableStreamResult(drainResult, this.$data, closer[0], controller); + return; + } + } + + const view = this.#getInternalBuffer(this.autoAllocateChunkSize); + const result = handle.pull(view, closer); + if ($isPromise(result)) { + return result.$then( + result => { + this.$data = this.#onNativeReadableStreamResult(result, view, closer[0], controller); + if (this.#closed) { + this.$data = undefined; + } + }, + err => { + this.$data = undefined; + this.#closed = true; + this.#controller = undefined; + controller.error(err); + this.#onClose(); + }, + ); + } + + this.$data = this.#onNativeReadableStreamResult(result, view, closer[0], controller); + if (this.#closed) { + this.$data = undefined; + } + } + + #cancel(reason) { + var handle = $getByIdDirectPrivate(this, "stream"); + this.$data = undefined; + if (handle) { + handle.updateRef(false); + handle.cancel(reason); + $putByIdDirectPrivate(this, "stream", undefined); + } + } + } + // this is reuse of an existing private symbol + NativeReadableStreamSource.prototype.$resume = function (has_ref) { + var handle = $getByIdDirectPrivate(this, "stream"); + if (handle) handle.updateRef(has_ref); + }; + + return NativeReadableStreamSource; +} + export function lazyLoadStream(stream, autoAllocateChunkSize) { $debug("lazyLoadStream", stream, autoAllocateChunkSize); var handle = stream.$bunNativePtr; if (handle === -1) return; var Prototype = $lazyStreamPrototypeMap.$get($getPrototypeOf(handle)); if (Prototype === undefined) { - var closer = [false]; - var handleResult; - function handleNativeReadableStreamPromiseResult(val) { - var { c, v } = this; - this.c = undefined; - this.v = undefined; - handleResult(val, c, v); - } - - function callClose(controller) { - try { - var underlyingByteSource = controller.$underlyingByteSource; - const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); - if (!stream) { - return; - } - - if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return; - controller.close(); - } catch (e) { - globalThis.reportError(e); - } finally { - if (underlyingByteSource?.$stream) { - underlyingByteSource.$stream = undefined; - } - } - } - - handleResult = function handleResult(result, controller, view) { - $assert(controller, "controller is missing"); - - if (result && $isPromise(result)) { - return result.$then( - handleNativeReadableStreamPromiseResult.bind({ - c: controller, - v: view, - }), - err => controller.error(err), - ); - } else if (typeof result === "number") { - if (view && view.byteLength === result && view.buffer === controller?.byobRequest?.view?.buffer) { - controller.byobRequest.respondWithNewView(view); - } else { - controller.byobRequest.respond(result); - } - } else if ($isTypedArrayView(result)) { - controller.enqueue(result); - } - - if (closer[0] || result === false) { - $enqueueJob(callClose, controller); - closer[0] = false; - } - }; - - function createResult(handle, controller, view, closer) { - closer[0] = false; - - var result; - try { - result = handle.pull(view, closer); - } catch (err) { - return controller.error(err); - } - - return handleResult(result, controller, view); - } - - Prototype = class NativeReadableStreamSource { - constructor(handle, autoAllocateChunkSize, drainValue) { - $putByIdDirectPrivate(this, "stream", handle); - this.pull = this.#pull.bind(this); - this.cancel = this.#cancel.bind(this); - this.autoAllocateChunkSize = autoAllocateChunkSize; - - if (drainValue !== undefined) { - this.start = controller => { - this.#controller = new WeakRef(controller); - controller.enqueue(drainValue); - }; - } - - handle.onClose = this.#onClose.bind(this); - handle.onDrain = this.#onDrain.bind(this); - } - - #onDrain(chunk) { - var controller = this.#controller?.deref?.(); - if (controller) { - controller.enqueue(chunk); - } - } - - #controller: WeakRef; - - pull; - cancel; - start; - - type = "bytes"; - autoAllocateChunkSize = 0; - #closed = false; - - #onClose() { - this.#closed = true; - this.#controller = undefined; - - var controller = this.#controller?.deref?.(); - - $putByIdDirectPrivate(this, "stream", undefined); - if (controller) { - $enqueueJob(callClose, controller); - } - } - - #pull(controller) { - var handle = $getByIdDirectPrivate(this, "stream"); - - if (!handle || this.#closed) { - this.#controller = undefined; - $putByIdDirectPrivate(this, "stream", undefined); - $enqueueJob(callClose, controller); - return; - } - - if (!this.#controller) { - this.#controller = new WeakRef(controller); - } - - createResult(handle, controller, controller.byobRequest.view, closer); - } - - #cancel(reason) { - var handle = $getByIdDirectPrivate(this, "stream"); - if (handle) { - handle.updateRef(false); - handle.cancel(reason); - $putByIdDirectPrivate(this, "stream", undefined); - } - } - }; - // this is reuse of an existing private symbol - Prototype.prototype.$resume = function (has_ref) { - var handle = $getByIdDirectPrivate(this, "stream"); - if (handle) handle.updateRef(has_ref); - }; - $lazyStreamPrototypeMap.$set($getPrototypeOf(handle), Prototype); + $lazyStreamPrototypeMap.$set($getPrototypeOf(handle), (Prototype = $createLazyLoadedStreamPrototype())); } stream.$disturbed = true; + + if (autoAllocateChunkSize === undefined) { + // This default is what Node.js uses as well. + autoAllocateChunkSize = 256 * 1024; + } + const chunkSizeOrCompleteBuffer = handle.start(autoAllocateChunkSize); let chunkSize, drainValue; if ($isTypedArrayView(chunkSizeOrCompleteBuffer)) { @@ -1945,7 +2033,6 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) { pull(controller) { controller.close(); }, - type: "bytes", }; } @@ -1956,11 +2043,10 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) { pull(controller) { controller.close(); }, - type: "bytes", }; } - return new Prototype(handle, chunkSize, drainValue); + return new Prototype(handle, Math.max(chunkSize, autoAllocateChunkSize), drainValue); } export function readableStreamIntoArray(stream) { diff --git a/test/js/node/http/fixtures/log-events.mjs b/test/js/node/http/fixtures/log-events.mjs index bb5dcdd820..c3e6ec55f2 100644 --- a/test/js/node/http/fixtures/log-events.mjs +++ b/test/js/node/http/fixtures/log-events.mjs @@ -1,27 +1,36 @@ import * as http from "node:http"; -const options = { - hostname: "www.example.com", - port: 80, - path: "/", - method: "GET", - headers: {}, -}; - -const req = http.request(options, res => { - patchEmitter(res, "res"); - console.log(`STATUS: ${res.statusCode}`); - res.setEncoding("utf8"); +let server = http.createServer((req, res) => { + res.end("Hello, World!"); }); -patchEmitter(req, "req"); - -req.end(); - -function patchEmitter(emitter, prefix) { - var oldEmit = emitter.emit; - - emitter.emit = function () { - console.log([prefix, arguments[0]]); - oldEmit.apply(emitter, arguments); +server.listen(0, "localhost", 0, () => { + const options = { + hostname: "localhost", + port: server.address().port, + path: "/", + method: "GET", + headers: {}, }; -} + + const req = http.request(options, res => { + patchEmitter(res, "res"); + console.log(`STATUS: ${res.statusCode}`); + res.setEncoding("utf8"); + }); + patchEmitter(req, "req"); + + req.end().once("close", () => { + setTimeout(() => { + server.close(); + }, 1); + }); + + function patchEmitter(emitter, prefix) { + var oldEmit = emitter.emit; + + emitter.emit = function () { + console.log([prefix, arguments[0]]); + oldEmit.apply(emitter, arguments); + }; + } +}); diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index a65e3bbb4a..bbade224e6 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -1859,17 +1859,14 @@ it("#11425 http no payload limit", done => { }); it("should emit events in the right order", async () => { - const { stdout, stderr, exited } = Bun.spawn({ + const { stdout, exited } = Bun.spawn({ cmd: [bunExe(), "run", path.join(import.meta.dir, "fixtures/log-events.mjs")], stdout: "pipe", stdin: "ignore", - stderr: "pipe", + stderr: "inherit", env: bunEnv, }); - const err = await new Response(stderr).text(); - expect(err).toBeEmpty(); const out = await new Response(stdout).text(); - // TODO prefinish and socket are not emitted in the right order expect(out.split("\n")).toEqual([ `[ "req", "prefinish" ]`, `[ "req", "socket" ]`, @@ -1884,6 +1881,7 @@ it("should emit events in the right order", async () => { // `[ "res", "close" ]`, "", ]); + expect(await exited).toBe(0); }); it("destroy should end download", async () => { diff --git a/test/js/web/streams/streams-leak.test.ts b/test/js/web/streams/streams-leak.test.ts new file mode 100644 index 0000000000..29e9a61af7 --- /dev/null +++ b/test/js/web/streams/streams-leak.test.ts @@ -0,0 +1,62 @@ +import { expect, test } from "bun:test"; +import { isWindows } from "harness"; + +const BYTES_TO_WRITE = 500_000; + +// https://github.com/oven-sh/bun/issues/12198 +test.skipIf(isWindows)( + "Absolute memory usage remains relatively constant when reading and writing to a pipe", + async () => { + async function write(bytes: number) { + const buf = Buffer.alloc(bytes, "foo"); + await cat.stdin.write(buf); + } + async function read(bytes: number) { + let i = 0; + while (true) { + const { value } = await r.read(); + i += value?.length ?? 0; + if (i >= bytes) { + return; + } + } + } + + async function readAndWrite(bytes = BYTES_TO_WRITE) { + await Promise.all([write(bytes), read(bytes)]); + } + + await using cat = Bun.spawn(["cat"], { + stdin: "pipe", + stdout: "pipe", + stderr: "inherit", + }); + const r = cat.stdout.getReader() as any; + + const rounds = 5000; + + for (let i = 0; i < rounds; i++) { + await readAndWrite(BYTES_TO_WRITE); + } + Bun.gc(true); + const before = process.memoryUsage.rss(); + + for (let i = 0; i < rounds; i++) { + await readAndWrite(); + } + Bun.gc(true); + const after = process.memoryUsage.rss(); + + for (let i = 0; i < rounds; i++) { + await readAndWrite(); + } + Bun.gc(true); + const after2 = process.memoryUsage.rss(); + console.log({ after, after2 }); + console.log(require("bun:jsc").heapStats()); + console.log("RSS delta", ((after - before) | 0) / 1024 / 1024); + console.log("RSS total", (after / 1024 / 1024) | 0, "MB"); + expect(after).toBeLessThan(250 * 1024 * 1024); + expect(after).toBeLessThan(before * 1.5); + }, +); diff --git a/test/regression/issue/11297/11297.fixture.ts b/test/regression/issue/11297/11297.fixture.ts index 97de15a551..bc6ad09128 100644 --- a/test/regression/issue/11297/11297.fixture.ts +++ b/test/regression/issue/11297/11297.fixture.ts @@ -19,7 +19,7 @@ const writer = (async function () { // 1. Remove "await" from proc.stdin.write(string) (keep the .end() await) // 2. Run `hyperfine "bun test/regression/issue/011297.fixture.ts"` (or run this many times on macOS.) // - await proc.stdin.write(string); + proc.stdin.write(string); } await proc.stdin.end(); console.timeEnd("Sent " + string.length + " bytes x 10");