diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index faaa9b2a76..b8d68de8b0 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -25,11 +25,12 @@ */ export function getStdioWriteStream(fd) { + $assert(typeof fd === "number", `Expected fd to be a number, got ${typeof fd}`); const tty = require("node:tty"); let stream; if (tty.isatty(fd)) { - stream = new tty.WriteStream(null, { fd }); + stream = new tty.WriteStream(fd); // TODO: this is the wrong place for this property. // but the TTY is technically duplex // see test-fs-syncwritestream.js @@ -62,7 +63,9 @@ export function getStdioWriteStream(fd) { stream._isStdio = true; stream.fd = fd; - return [stream, stream[require("internal/shared").fileSinkSymbol]]; + const underlyingSink = stream[require("internal/fs/streams").kWriteStreamFastPath]; + $assert(underlyingSink); + return [stream, underlyingSink]; } export function getStdinStream(fd) { diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index cef9ca23fa..cffb35bec0 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -157,4 +157,10 @@ export const bindgen = $zig("bindgen_test.zig", "getBindgenTestFunctions") as { export const noOpForTesting = $cpp("NoOpForTesting.cpp", "createNoOpForTesting"); export const Dequeue = require("internal/fifo"); -export const fs = require('node:fs/promises').$data; +export const fs = require("node:fs/promises").$data; + +export const fsStreamInternals = { + writeStreamFastPath(str) { + return str[require("internal/fs/streams").kWriteStreamFastPath]; + }, +}; diff --git a/src/js/internal/fs/streams.ts b/src/js/internal/fs/streams.ts index e269646a2f..7673d27b62 100644 --- a/src/js/internal/fs/streams.ts +++ b/src/js/internal/fs/streams.ts @@ -1,78 +1,94 @@ // fs.ReadStream and fs.WriteStream are lazily loaded to avoid importing 'node:stream' until required -const { Readable, Writable } = require("node:stream"); +import type { FileSink } from "bun"; +const { Readable, Writable, finished } = require("node:stream"); const fs: typeof import("node:fs") = require("node:fs"); -const { - read, - write, - fsync, -} = fs; -const { - FileHandle, - kRef, - kUnref, - kFd, -} = (fs.promises as any).$data as { - FileHandle: { new(): FileHandle }; +const { read, write, fsync, writev } = fs; +const { FileHandle, kRef, kUnref, kFd } = (fs.promises as any).$data as { + FileHandle: { new (): FileHandle }; readonly kRef: unique symbol; readonly kUnref: unique symbol; readonly kFd: unique symbol; fs: typeof fs; }; -type FileHandle = import('node:fs/promises').FileHandle & { +type FileHandle = import("node:fs/promises").FileHandle & { on(event: any, listener: any): FileHandle; }; -type FSStream = import("node:fs").ReadStream & import("node:fs").WriteStream & { - fd: number | null; - path: string; - flags: string; - mode: number; - start: number; - end: number; - pos: number | undefined; - bytesRead: number; - flush: boolean; - open: () => void; -}; +type FSStream = import("node:fs").ReadStream & + import("node:fs").WriteStream & { + fd: number | null; + path: string; + flags: string; + mode: number; + start: number; + end: number; + pos: number | undefined; + bytesRead: number; + flush: boolean; + open: () => void; + autoClose: boolean; + /** + * true = path must be opened + * sink = FileSink + */ + [kWriteStreamFastPath]?: undefined | true | FileSink; + }; type FD = number; const { validateInteger, validateInt32, validateFunction } = require("internal/validators"); -// Bun supports a fast path for `createReadStream("path.txt")` in `Bun.serve`, +// Bun supports a fast path for `createReadStream("path.txt")` with `.pipe(res)`, // where the entire stream implementation can be bypassed, effectively making it -// `new Response(Bun.file("path.txt"))`. This makes an idomatic Node.js pattern -// much faster. +// `new Response(Bun.file("path.txt"))`. +// This makes an idomatic Node.js pattern much faster. const kReadStreamFastPath = Symbol("kReadStreamFastPath"); +// Bun supports a fast path for `createWriteStream("path.txt")` where instead of +// using `node:fs`, `Bun.file(...).writer()` is used instead. +const kWriteStreamFastPath = Symbol("kWriteStreamFastPath"); const kFs = Symbol("kFs"); -// const readStreamSymbol = Symbol.for("Bun.NodeReadStream"); -// const readStreamPathOrFdSymbol = Symbol.for("Bun.NodeReadStreamPathOrFd"); -// const writeStreamSymbol = Symbol.for("Bun.NodeWriteStream"); -// const writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath"); -// const writeStreamPathFastPathCallSymbol = Symbol.for("Bun.NodeWriteStreamFastPathCall"); const kIoDone = Symbol("kIoDone"); const kIsPerformingIO = Symbol("kIsPerformingIO"); -const { read: fileHandlePrototypeRead, write: fileHandlePrototypeWrite, fsync: fileHandlePrototypeFsync } = FileHandle.prototype; - -const blobToStreamWithOffset = $newZigFunction("blob.zig", "Blob.toStreamWithOffset", 1); +const { + read: fileHandlePrototypeRead, + write: fileHandlePrototypeWrite, + fsync: fileHandlePrototypeFsync, + writev: fileHandlePrototypeWritev, +} = FileHandle.prototype; const fileHandleStreamFs = (fh: FileHandle) => ({ // try to use the basic fs.read/write/fsync if available, since they are less // abstractions. however, node.js allows patching the file handle, so this has // to be checked for. - read: fh.read === fileHandlePrototypeRead ? read : function(fd, buf, offset, length, pos, cb) { - return fh.read(buf,offset,length,pos).then(({ bytesRead, buffer }) => cb(null, bytesRead, buffer), (err) => cb(err, 0, buf)); - }, - write: fh.write === fileHandlePrototypeWrite ? write : function(fd, buffer, offset, length, position, cb) { - return fh.write(buffer, offset, length, position).then(({ bytesWritten, buffer }) => cb(null, bytesWritten, buffer), (err) => cb(err, 0, buffer)); - }, - fsync: fh.sync === fileHandlePrototypeFsync ? fsync : function(fd, cb) { - return fh.sync().then(() => cb(), cb); - }, + read: + fh.read === fileHandlePrototypeRead + ? read + : function (fd, buf, offset, length, pos, cb) { + return fh.read(buf, offset, length, pos).then( + ({ bytesRead, buffer }) => cb(null, bytesRead, buffer), + err => cb(err, 0, buf), + ); + }, + write: + fh.write === fileHandlePrototypeWrite + ? write + : function (fd, buffer, offset, length, position, cb) { + return fh.write(buffer, offset, length, position).then( + ({ bytesWritten, buffer }) => cb(null, bytesWritten, buffer), + err => cb(err, 0, buffer), + ); + }, + writev: fh.writev === fileHandlePrototypeWritev ? writev : undefined, + fsync: + fh.sync === fileHandlePrototypeFsync + ? fsync + : function (fd, cb) { + return fh.sync().then(() => cb(), cb); + }, close: streamFileHandleClose.bind(fh), }); function streamFileHandleClose(this: FileHandle, fd: FD, cb: (err?: any) => void) { - $assert(this[kFd] == fd, 'fd mismatch'); + $assert(this[kFd] == fd, "fd mismatch"); this[kUnref](); this.close().then(() => cb(), cb); } @@ -86,27 +102,26 @@ function getValidatedPath(p: any) { function copyObject(source) { const target = {}; // Node tests for prototype lookups, so { ...source } will not work. - for (const key in source) - target[key] = source[key]; + for (const key in source) target[key] = source[key]; return target; } function getStreamOptions(options, defaultOptions = {}) { - if (options == null || typeof options === 'function') { + if (options == null || typeof options === "function") { return defaultOptions; } - if (typeof options === 'string') { - if (options !== 'buffer' && !Buffer.isEncoding(options)) { + if (typeof options === "string") { + if (options !== "buffer" && !Buffer.isEncoding(options)) { throw $ERR_INVALID_ARG_VALUE("encoding", options, "is invalid encoding"); } return { encoding: options }; - } else if (typeof options !== 'object') { - throw $ERR_INVALID_ARG_TYPE('options', ['string', 'Object'], options); + } else if (typeof options !== "object") { + throw $ERR_INVALID_ARG_TYPE("options", ["string", "Object"], options); } let { encoding, signal = true } = options; - if (encoding && encoding !== 'buffer' && !Buffer.isEncoding(encoding)) { + if (encoding && encoding !== "buffer" && !Buffer.isEncoding(encoding)) { throw $ERR_INVALID_ARG_VALUE("encoding", encoding, "is invalid encoding"); } @@ -122,20 +137,13 @@ function ReadStream(this: FSStream, path, options): void { if (!(this instanceof ReadStream)) { return new ReadStream(path, options); } - + options = copyObject(getStreamOptions(options)); // Only buffers are supported. options.decodeStrings = true; - let { - fd, - autoClose, - fs: customFs, - start = 0, - end = Infinity, - encoding, - } = options; + let { fd, autoClose, fs: customFs, start = 0, end = Infinity, encoding } = options; if (fd == null) { this[kFs] = customFs || fs; this.fd = null; @@ -158,27 +166,25 @@ function ReadStream(this: FSStream, path, options): void { } this.fd = fd; this[kFs] = customFs || fs; - } else if (typeof fd === 'object' && fd instanceof FileHandle) { + } else if (typeof fd === "object" && fd instanceof FileHandle) { if (options.fs) { throw $ERR_METHOD_NOT_IMPLEMENTED("fs.FileHandle with custom fs operations"); } this[kFs] = fileHandleStreamFs(fd); this.fd = fd[kFd]; fd[kRef](); - fd.on('close', this.close.bind(this)); + fd.on("close", this.close.bind(this)); } else { - throw $ERR_INVALID_ARG_TYPE('options.fd', 'number or FileHandle', fd); + throw $ERR_INVALID_ARG_TYPE("options.fd", "number or FileHandle", fd); } if (customFs) { validateFunction(customFs.read, "options.fs.read"); } - $assert(this[kFs], 'fs implementation was not assigned'); + $assert(this[kFs], "fs implementation was not assigned"); - if((options.autoDestroy = autoClose === undefined - ? true - : autoClose) && customFs) { + if ((options.autoDestroy = autoClose === undefined ? true : autoClose) && customFs) { validateFunction(customFs.close, "options.fs.close"); } @@ -215,7 +221,7 @@ function ReadStream(this: FSStream, path, options): void { $toClass(ReadStream, "ReadStream", Readable); const readStreamPrototype = ReadStream.prototype; -Object.defineProperty(readStreamPrototype, 'autoClose', { +Object.defineProperty(readStreamPrototype, "autoClose", { get() { return this._readableState.autoDestroy; }, @@ -226,22 +232,29 @@ Object.defineProperty(readStreamPrototype, 'autoClose', { const streamNoop = function open() { // noop -} +}; function streamConstruct(this: FSStream, callback: (e?: any) => void) { const { fd } = this; if (typeof fd === "number") { callback(); return; } + const fastPath = this[kWriteStreamFastPath]; if (this.open !== streamNoop) { + if (fastPath) { + // disable fast path in this case + $assert(this[kWriteStreamFastPath] === true, "fastPath is not true"); + this[kWriteStreamFastPath] = undefined; + } + // Backwards compat for monkey patching open(). const orgEmit: any = this.emit; - this.emit = function(...args) { - if (args[0] === 'open') { + this.emit = function (...args) { + if (args[0] === "open") { this.emit = orgEmit; callback(); orgEmit.$apply(this, args); - } else if (args[0] === 'error') { + } else if (args[0] === "error") { this.emit = orgEmit; callback(args[1]); } else { @@ -250,14 +263,21 @@ function streamConstruct(this: FSStream, callback: (e?: any) => void) { } as any; this.open(); } else { + if (fastPath) { + this.fd = NaN; // TODO: retrieve fd from file sink + callback(); + this.emit("open", this.fd); + this.emit("ready"); + return; + } this[kFs].open(this.path, this.flags, this.mode, (err, fd) => { if (err) { callback(err); } else { this.fd = fd; callback(); - this.emit('open', this.fd); - this.emit('ready'); + this.emit("open", this.fd); + this.emit("ready"); } }); } @@ -267,10 +287,8 @@ readStreamPrototype.open = streamNoop; readStreamPrototype._construct = streamConstruct; -readStreamPrototype._read = function(n) { - n = this.pos !== undefined ? - $min(this.end - this.pos + 1, n) : - $min(this.end - this.bytesRead + 1, n); +readStreamPrototype._read = function (n) { + n = this.pos !== undefined ? $min(this.end - this.pos + 1, n) : $min(this.end - this.bytesRead + 1, n); if (n <= 0) { this.push(null); @@ -280,42 +298,41 @@ readStreamPrototype._read = function(n) { const buf = Buffer.allocUnsafeSlow(n); this[kIsPerformingIO] = true; - this[kFs] - .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => { - this[kIsPerformingIO] = false; + this[kFs].read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => { + this[kIsPerformingIO] = false; - // Tell ._destroy() that it's safe to close the fd now. - if (this.destroyed) { - this.emit(kIoDone, er); - return; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) { + this.emit(kIoDone, er); + return; + } + + if (er) { + require("internal/streams/destroy").errorOrDestroy(this, er); + } else if (bytesRead > 0) { + if (this.pos !== undefined) { + this.pos += bytesRead; } - if (er) { - require('internal/streams/destroy').errorOrDestroy(this, er); - } else if (bytesRead > 0) { - if (this.pos !== undefined) { - this.pos += bytesRead; - } + this.bytesRead += bytesRead; - this.bytesRead += bytesRead; - - if (bytesRead !== buf.length) { - // Slow path. Shrink to fit. - // Copy instead of slice so that we don't retain - // large backing buffer for small reads. - const dst = Buffer.allocUnsafeSlow(bytesRead); - buf.copy(dst, 0, 0, bytesRead); - buf = dst; - } - - this.push(buf); - } else { - this.push(null); + if (bytesRead !== buf.length) { + // Slow path. Shrink to fit. + // Copy instead of slice so that we don't retain + // large backing buffer for small reads. + const dst = Buffer.allocUnsafeSlow(bytesRead); + buf.copy(dst, 0, 0, bytesRead); + buf = dst; } - }); + + this.push(buf); + } else { + this.push(null); + } + }); }; -readStreamPrototype._destroy = function(err, cb) { +readStreamPrototype._destroy = function (this: FSStream, err, cb) { // Usually for async IO it is safe to close a file descriptor // even when there are pending operations. However, due to platform // differences file IO is implemented using synchronous operations @@ -323,18 +340,18 @@ readStreamPrototype._destroy = function(err, cb) { // to close while used in a pending read or write operation. Wait for // any pending IO (kIsPerformingIO) to complete (kIoDone). if (this[kIsPerformingIO]) { - this.once(kIoDone, (er) => close(this, err || er, cb)); + this.once(kIoDone, er => close(this, err || er, cb)); } else { close(this, err, cb); } }; -readStreamPrototype.close = function(cb) { - if (typeof cb === 'function') require('node:stream').finished(this, cb); +readStreamPrototype.close = function (cb) { + if (typeof cb === "function") finished(this, cb); this.destroy(); }; -Object.defineProperty(readStreamPrototype, 'pending', { +Object.defineProperty(readStreamPrototype, "pending", { get() { return this.fd == null; }, @@ -345,7 +362,7 @@ function close(stream, err, cb) { if (!stream.fd) { cb(err); } else if (stream.flush) { - stream[kFs].fsync(stream.fd, (flushErr) => { + stream[kFs].fsync(stream.fd, flushErr => { closeAfterSync(stream, err || flushErr, cb); }); } else { @@ -354,13 +371,20 @@ function close(stream, err, cb) { } function closeAfterSync(stream, err, cb) { - stream[kFs].close(stream.fd, (er) => { + stream[kFs].close(stream.fd, er => { cb(er || err); }); stream.fd = null; } -function WriteStream(this: FSStream, path: string | null, options: any): void { +ReadStream.prototype.pipe = function (this: FSStream, dest, pipeOpts) { + // Fast path for streaming files: + // if (this[kReadStreamFastPath]) { + // } + return Readable.prototype.pipe.$call(this, dest, pipeOpts); +} + +function WriteStream(this: FSStream, path: string | null, options?: any): void { if (!(this instanceof WriteStream)) { return new WriteStream(path, options); } @@ -370,13 +394,8 @@ function WriteStream(this: FSStream, path: string | null, options: any): void { // Only buffers are supported. options.decodeStrings = true; - let { - fd, - autoClose, - fs: customFs, - start, - flush, - } = options; + let fastPath = true; + let { fd, autoClose, fs: customFs, start, flush } = options; if (fd == null) { this[kFs] = customFs || fs; this.fd = null; @@ -387,6 +406,9 @@ function WriteStream(this: FSStream, path: string | null, options: any): void { if (customFs) { validateFunction(customFs.open, "options.fs.open"); } + if (flags !== undefined || mode !== undefined || customFs) { + fastPath = false; + } } else if (typeof options.fd === "number") { // When fd is a raw descriptor, we must keep our fingers crossed // that the descriptor won't get closed, or worse, replaced with @@ -399,21 +421,20 @@ function WriteStream(this: FSStream, path: string | null, options: any): void { } this.fd = fd; this[kFs] = customFs || fs; - } else if (typeof fd === 'object' && fd instanceof FileHandle) { + } else if (typeof fd === "object" && fd instanceof FileHandle) { if (options.fs) { throw $ERR_METHOD_NOT_IMPLEMENTED("fs.FileHandle with custom fs operations"); } - this[kFs] = fileHandleStreamFs(fd); - this.fd = fd[kFd]; + this[kFs] = customFs = fileHandleStreamFs(fd); fd[kRef](); - fd.on('close', this.close.bind(this)); + fd.on("close", this.close.bind(this)); + this.fd = fd = fd[kFd]; + fastPath = false; } else { - throw $ERR_INVALID_ARG_TYPE('options.fd', 'number or FileHandle', fd); + throw $ERR_INVALID_ARG_TYPE("options.fd", "number or FileHandle", fd); } - const autoDestroy = options.autoDestroy = autoClose === undefined - ? true - : autoClose; + const autoDestroy = autoClose = (options.autoDestroy = autoClose === undefined ? true : autoClose); if (customFs) { const { write, writev, close, fsync } = customFs; @@ -445,6 +466,15 @@ function WriteStream(this: FSStream, path: string | null, options: any): void { this.pos = start; } + // Enable fast path + if (!start && fastPath) { + this[kWriteStreamFastPath] = fd ? Bun.file(fd).writer() : true; + this._write = underscoreWriteFast; + this._writev = undefined; + this.write = writeFast as any; + this.end = endFast as any; + } + Writable.$call(this, options); if (options.encoding) { @@ -461,13 +491,13 @@ writeStreamPrototype._construct = streamConstruct; function writeAll(data, size, pos, cb, retries = 0) { this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => { // No data currently available and operation should be retried later. - if (er?.code === 'EAGAIN') { + if (er?.code === "EAGAIN") { er = null; bytesWritten = 0; } if (this.destroyed || er) { - return cb(er || $ERR_STREAM_DESTROYED('write')); + return cb(er || $ERR_STREAM_DESTROYED("write")); } this.bytesWritten += bytesWritten; @@ -479,7 +509,7 @@ function writeAll(data, size, pos, cb, retries = 0) { // Try writing non-zero number of bytes up to 5 times. if (retries > 5) { // cb($ERR_SYSTEM_ERROR('write failed')); - cb(new Error('write failed')); + cb(new Error("write failed")); } else if (size) { writeAll.$call(this, buffer.slice(bytesWritten), size, pos, cb, retries); } else { @@ -491,13 +521,13 @@ function writeAll(data, size, pos, cb, retries = 0) { function writevAll(chunks, size, pos, cb, retries = 0) { this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => { // No data currently available and operation should be retried later. - if (er?.code === 'EAGAIN') { + if (er?.code === "EAGAIN") { er = null; bytesWritten = 0; } if (this.destroyed || er) { - return cb(er || $ERR_STREAM_DESTROYED('writev')); + return cb(er || $ERR_STREAM_DESTROYED("writev")); } this.bytesWritten += bytesWritten; @@ -509,7 +539,7 @@ function writevAll(chunks, size, pos, cb, retries = 0) { // Try writing non-zero number of bytes up to 5 times. if (retries > 5) { // cb($ERR_SYSTEM_ERROR('writev failed')); - cb(new Error('writev failed')); + cb(new Error("writev failed")); } else if (size) { writevAll.$call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries); } else { @@ -518,9 +548,9 @@ function writevAll(chunks, size, pos, cb, retries = 0) { }); } -writeStreamPrototype._write = function(data, encoding, cb) { +function _write(data, encoding, cb) { this[kIsPerformingIO] = true; - writeAll.$call(this, data, data.length, this.pos, (er) => { + writeAll.$call(this, data, data.length, this.pos, er => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -531,11 +561,75 @@ writeStreamPrototype._write = function(data, encoding, cb) { cb(er); }); - if (this.pos !== undefined) - this.pos += data.length; -}; + if (this.pos !== undefined) this.pos += data.length; +} +writeStreamPrototype._write = _write; -writeStreamPrototype._writev = function(data, cb) { +function underscoreWriteFast(this: FSStream, data: any, encoding: any, cb: any) { + let fileSink = this[kWriteStreamFastPath]; + if (!fileSink) { + // When the fast path is disabled, the write function gets reset. + this._write = _write; + return this._write(data, encoding, cb); + } + try { + if (fileSink === true) { + fileSink = this[kWriteStreamFastPath] = Bun.file(this.path).writer(); + } + const maybePromise = fileSink.write(data); + if (cb) thenIfPromise(maybePromise, cb); + } catch (e) { + if (cb) process.nextTick(cb, e); + } + return true; +} + +function writeFast(this: FSStream, data: any, encoding: any, cb: any) { + if (encoding != null && typeof encoding === "function") { + cb = encoding; + encoding = null; + } + + let fileSink = this[kWriteStreamFastPath]; + if (!fileSink) { + this.write = Writable.prototype.write; + return this.write(data, encoding, cb); + } else if (fileSink === true) { + if (this.open !== streamNoop) { + this.write = Writable.prototype.write; + return this.write(data, encoding, cb); + } + fileSink = this[kWriteStreamFastPath] = Bun.file(this.path).writer(); + } + + try { + const maybePromise = fileSink.write(data); + if (typeof cb === "function") thenIfPromise(maybePromise, cb); + } catch (e) { + if (typeof cb === "function") process.nextTick(cb, e); + else throw e; + } + return true; +} + +function endFast(this: FSStream, data: any, encoding: any, cb: any) { + const fileSink = this[kWriteStreamFastPath]; + if (fileSink && fileSink !== true) { + if (data) { + const maybePromise = fileSink.write(data); + if ($isPromise(maybePromise)) { + fileSink.flush(); + maybePromise.then(() => Writable.prototype.end.$call(this, cb)); + return; + } + } + process.nextTick(() => Writable.prototype.end.$call(this, cb)); + return; + } + return Writable.prototype.end.$call(this, data, encoding, cb); +} + +writeStreamPrototype._writev = function (data, cb) { const len = data.length; const chunks = new Array(len); let size = 0; @@ -548,7 +642,7 @@ writeStreamPrototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - writevAll.$call(this, chunks, size, this.pos, (er) => { + writevAll.$call(this, chunks, size, this.pos, er => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -559,11 +653,16 @@ writeStreamPrototype._writev = function(data, cb) { cb(er); }); - if (this.pos !== undefined) - this.pos += size; + if (this.pos !== undefined) this.pos += size; }; -writeStreamPrototype._destroy = function(err, cb) { +writeStreamPrototype._destroy = function (err, cb) { + const fastPath: FileSink | true = this[kWriteStreamFastPath]; + if (fastPath && fastPath !== true) { + const maybePromise = fastPath.end(err); + thenIfPromise(maybePromise, cb); + return; + } // Usually for async IO it is safe to close a file descriptor // even when there are pending operations. However, due to platform // differences file IO is implemented using synchronous operations @@ -571,25 +670,25 @@ writeStreamPrototype._destroy = function(err, cb) { // to close while used in a pending read or write operation. Wait for // any pending IO (kIsPerformingIO) to complete (kIoDone). if (this[kIsPerformingIO]) { - this.once(kIoDone, (er) => close(this, err || er, cb)); + this.once(kIoDone, er => close(this, err || er, cb)); } else { close(this, err, cb); } }; -writeStreamPrototype.close = function(cb) { +writeStreamPrototype.close = function (this: FSStream, cb) { if (cb) { if (this.closed) { process.nextTick(cb); return; } - this.on('close', cb); + this.on("close", cb); } // If we are not autoClosing, we should call // destroy on 'finish'. if (!this.autoClose) { - this.on('finish', this.destroy); + this.on("finish", this.destroy); } // We use end() instead of destroy() because of @@ -600,7 +699,7 @@ writeStreamPrototype.close = function(cb) { // There is no shutdown() for files. writeStreamPrototype.destroySoon = writeStreamPrototype.end; -Object.defineProperty(writeStreamPrototype, 'autoClose', { +Object.defineProperty(writeStreamPrototype, "autoClose", { get() { return this._writableState.autoDestroy; }, @@ -609,9 +708,36 @@ Object.defineProperty(writeStreamPrototype, 'autoClose', { }, }); -Object.$defineProperty(writeStreamPrototype, 'pending', { - get() { return this.fd === null; }, +Object.$defineProperty(writeStreamPrototype, "pending", { + get() { + return this.fd === null; + }, configurable: true, }); -export default { ReadStream, WriteStream, kReadStreamFastPath }; \ No newline at end of file +function thenIfPromise(maybePromise: Promise | T, cb: any) { + $assert(typeof cb === "function", "cb is not a function"); + if ($isPromise(maybePromise)) { + maybePromise.then(() => cb(null), cb); + } else { + process.nextTick(cb, null); + } +} + +function writableFromFileSink(fileSink: any) { + $assert(typeof fileSink === "object", "fileSink is not an object"); + $assert(typeof fileSink.write === "function", "fileSink.write is not a function"); + $assert(typeof fileSink.end === "function", "fileSink.end is not a function"); + const w = new WriteStream(""); + $assert(w[kWriteStreamFastPath] === true, "fast path not enabled"); + w[kWriteStreamFastPath] = fileSink; + w.path = undefined; + return w; +} + +export default { + ReadStream, + WriteStream, + kWriteStreamFastPath, + writableFromFileSink, +}; diff --git a/src/js/internal/shared.ts b/src/js/internal/shared.ts index af82b5c0ba..ae29c54452 100644 --- a/src/js/internal/shared.ts +++ b/src/js/internal/shared.ts @@ -45,8 +45,6 @@ function warnNotImplementedOnce(feature: string, issue?: number) { console.warn(new NotImplementedError(feature, issue)); } -const fileSinkSymbol = Symbol("fileSink"); - // let util: typeof import("node:util"); @@ -103,7 +101,6 @@ export default { throwNotImplemented, hideFromStack, warnNotImplementedOnce, - fileSinkSymbol, ExceptionWithHostPort, once, @@ -111,7 +108,6 @@ export default { kAutoDestroyed: Symbol("kAutoDestroyed"), kResistStopPropagation: Symbol("kResistStopPropagation"), kWeakHandler: Symbol("kWeak"), - kEnsureConstructed: Symbol("kEnsureConstructed"), kGetNativeReadableProto: Symbol("kGetNativeReadableProto"), kEmptyObject, }; diff --git a/src/js/internal/streams/native-readable.ts b/src/js/internal/streams/native-readable.ts new file mode 100644 index 0000000000..9d4badd6b3 --- /dev/null +++ b/src/js/internal/streams/native-readable.ts @@ -0,0 +1,253 @@ +// NativeReadable is an implementation of ReadableStream which contains +// a pointer to a native handle. This is used, for example, to make +// child_process' stderr/out streams go through less hoops. +// +// Normally, Readable.fromWeb will wrap the ReadableStream in JavaScript. In +// Bun, `fromWeb` is able to check if the stream is backed by a native handle, +// to which it will take this path. +const Readable = require("node:stream").Readable; +const transferToNativeReadable = $newCppFunction("ReadableStream.cpp", "jsFunctionTransferToNativeReadableStream", 1); +const { errorOrDestroy } = require("internal/streams/destroy"); + +const kRefCount = Symbol("refCount"); +const kCloseState = Symbol("closeState"); +const kConstructed = Symbol("constructed"); +const kHighWaterMark = Symbol("highWaterMark"); +const kPendingRead = Symbol("pendingRead"); +const kHasResized = Symbol("hasResized"); +const kRemainingChunk = Symbol("remainingChunk"); + +const MIN_BUFFER_SIZE = 512; +let dynamicallyAdjustChunkSize = (_?) => ( + (_ = process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1"), (dynamicallyAdjustChunkSize = () => _) +); + +type NativeReadable = typeof import("node:stream").Readable & + typeof import("node:stream").Stream & { + push: (chunk: any) => void; + $bunNativePtr?: NativePtr; + [kRefCount]: number; + [kCloseState]: [boolean]; + [kPendingRead]: boolean; + [kHighWaterMark]: number; + [kHasResized]: boolean; + [kRemainingChunk]: Buffer; + debugId: number; + }; + +interface NativePtr { + onClose: () => void; + onDrain: (chunk: any) => void; + start: (highWaterMark: number) => number; + drain: () => any; + pull: (view: any, closer: any) => any; + updateRef: (ref: boolean) => void; + cancel: (error: any) => void; +} + +let debugId = 0; + +function constructNativeReadable(readableStream: ReadableStream, options): NativeReadable { + $assert(typeof readableStream === "object" && readableStream instanceof ReadableStream, "Invalid readable stream"); + const bunNativePtr = (readableStream as any).$bunNativePtr; + $assert(typeof bunNativePtr === "object", "Invalid native ptr"); + + const stream = new Readable(options); + stream._read = read; + stream._destroy = destroy; + + if (!!$debug) { + stream.debugId = ++debugId; + } + + stream.$bunNativePtr = bunNativePtr; + stream[kRefCount] = 0; + stream[kConstructed] = false; + stream[kPendingRead] = false; + stream[kHasResized] = !dynamicallyAdjustChunkSize(); + stream[kCloseState] = [false]; + + if (typeof options.highWaterMark === "number") { + stream[kHighWaterMark] = options.highWaterMark; + } else { + stream[kHighWaterMark] = 256 * 1024; + } + + stream.ref = ref; + stream.unref = unref; + if (process.platform === "win32") { + // Only used by node:tty on Windows + stream.$start = ensureConstructed; + } + + // https://github.com/oven-sh/bun/pull/12801 + // https://github.com/oven-sh/bun/issues/9555 + // There may be a ReadableStream.Strong handle to the ReadableStream. + // We can't update those handles to point to the NativeReadable from JS + // So we instead mark it as no longer usable, and create a new NativeReadable + transferToNativeReadable(readableStream); + + $debug(`[${stream.debugId}] constructed!`); + + return stream; +} + +function ensureConstructed(this: NativeReadable, cb: null | (() => void)) { + $debug(`[${this.debugId}] ensureConstructed`); + if (this[kConstructed]) return; + this[kConstructed] = true; + const ptr = this.$bunNativePtr; + if (!ptr) return; + $assert(typeof ptr.start === "function", "NativeReadable.start is not a function"); + ptr.start(this[kHighWaterMark]); + if (cb) cb(); +} + +// maxToRead can be the highWaterMark (by default) or the remaining amount of the stream to read +// This is so the consumer of the stream can terminate the stream early if they know +// how many bytes they want to read (ie. when reading only part of a file) +// ObjectDefinePrivateProperty(NativeReadable.prototype, "_getRemainingChunk", ); +function getRemainingChunk(stream: NativeReadable, maxToRead?: number) { + maxToRead ??= stream[kHighWaterMark] as number; + var chunk = stream[kRemainingChunk]; + if (chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE) { + var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE; + stream[kRemainingChunk] = chunk = Buffer.alloc(size); + } + $debug(`[${stream.debugId}] getRemainingChunk, ${chunk?.byteLength} bytes`); + return chunk; +} + +function read(this: NativeReadable, maxToRead: number) { + $debug(`[${this.debugId}] read${this[kPendingRead] ? ", is already pending" : ""}`); + if (this[kPendingRead]) { + return; + } + var ptr = this.$bunNativePtr; + if (!ptr) { + $debug(`[${this.debugId}] read, no ptr`); + this.push(null); + return; + } + if (!this[kConstructed]) { + const result = ptr.start(this[kHighWaterMark]); + $debug(`[${this.debugId}] start, initial hwm: ${result}`); + if (typeof result === "number" && result > 1) { + this[kHasResized] = true; + this[kHighWaterMark] = Math.min(this[kHighWaterMark], result); + } + const drainResult = ptr.drain(); + this[kConstructed] = true; + $debug(`[${this.debugId}] drain result: ${drainResult?.byteLength ?? 'null'}`); + if ((drainResult?.byteLength ?? 0) > 0) { + this.push(drainResult); + } + } + const chunk = getRemainingChunk(this, maxToRead); + var result = ptr.pull(chunk, this[kCloseState]); + $assert(result !== undefined); + $debug(`[${this.debugId}] pull ${chunk?.byteLength} bytes, result: ${result instanceof Promise ? '' : result}, closeState: ${this[kCloseState][0]}`); + if ($isPromise(result)) { + this[kPendingRead] = true; + return result.then( + result => { + $debug(`[${this.debugId}] pull, resolved: ${result}, closeState: ${this[kCloseState][0]}`); + this[kPendingRead] = false; + this[kRemainingChunk] = handleResult(this, result, chunk, this[kCloseState][0]); + }, + reason => { + errorOrDestroy(this, reason); + }, + ); + } else { + this[kRemainingChunk] = handleResult(this, result, chunk, this[kCloseState][0]); + } +} + +function handleResult(stream: NativeReadable, result: any, chunk: Buffer, isClosed: boolean) { + if (typeof result === "number") { + $debug(`[${stream.debugId}] handleResult(${result})`); + if (result >= stream[kHighWaterMark] && !stream[kHasResized] && !isClosed) { + adjustHighWaterMark(stream); + } + return handleNumberResult(stream, result, chunk, isClosed); + } else if (typeof result === "boolean") { + $debug(`[${stream.debugId}] handleResult(${result})`, chunk, isClosed); + process.nextTick(() => { + stream.push(null); + }); + return (chunk?.byteLength ?? 0 > 0) ? chunk : undefined; + } else if ($isTypedArrayView(result)) { + if (result.byteLength >= stream[kHighWaterMark] && !stream[kHasResized] && !isClosed) { + adjustHighWaterMark(stream); + } + return handleArrayBufferViewResult(stream, result, chunk, isClosed); + } else { + $assert(false, "Invalid result from pull"); + } +} + +function handleNumberResult(stream: NativeReadable, result: number, chunk: any, isClosed: boolean) { + if (result > 0) { + const slice = chunk.subarray(0, result); + chunk = slice.byteLength < chunk.byteLength ? chunk.subarray(result) : undefined; + if (slice.byteLength > 0) { + stream.push(slice); + } + } + + if (isClosed) { + process.nextTick(() => { + stream.push(null); + }); + } + + return chunk; +} + +function handleArrayBufferViewResult(stream: NativeReadable, result: any, chunk: any, isClosed: boolean) { + if (result.byteLength > 0) { + stream.push(result); + } + + if (isClosed) { + process.nextTick(() => { + stream.push(null); + }); + } + + return chunk; +} + +function adjustHighWaterMark(stream: NativeReadable) { + stream[kHighWaterMark] = $min(stream[kHighWaterMark] * 2, 1024 * 1024 * 2); + stream[kHasResized] = true; +} + +function destroy(this: NativeReadable, error: any, cb: () => void) { + const ptr = this.$bunNativePtr; + if (ptr) { + ptr.cancel(error); + } + if (cb) { + process.nextTick(cb); + } +} + +function ref(this: NativeReadable) { + const ptr = this.$bunNativePtr; + if (ptr === undefined) return; + if (this[kRefCount]++ === 0) { + ptr.updateRef(true); + } +} + +function unref(this: NativeReadable) { + const ptr = this.$bunNativePtr; + if (ptr === undefined) return; + if (this[kRefCount]-- === 1) { + ptr.updateRef(false); + } +} + +export default { constructNativeReadable }; diff --git a/src/js/internal/streams/nativereadable.ts b/src/js/internal/streams/nativereadable.ts deleted file mode 100644 index 1ac623dd57..0000000000 --- a/src/js/internal/streams/nativereadable.ts +++ /dev/null @@ -1,248 +0,0 @@ -const { kEnsureConstructed } = require("internal/shared"); -const { errorOrDestroy } = require("internal/streams/destroy"); - -const ProcessNextTick = process.nextTick; - -var DYNAMICALLY_ADJUST_CHUNK_SIZE = process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1"; - -const MIN_BUFFER_SIZE = 512; - -const refCount = Symbol("refCount"); -const constructed = Symbol("constructed"); -const remainingChunk = Symbol("remainingChunk"); -const highWaterMark = Symbol("highWaterMark"); -const pendingRead = Symbol("pendingRead"); -const hasResized = Symbol("hasResized"); -const _onClose = Symbol("_onClose"); -const _onDrain = Symbol("_onDrain"); -const _internalConstruct = Symbol("_internalConstruct"); -const _getRemainingChunk = Symbol("_getRemainingChunk"); -const _adjustHighWaterMark = Symbol("_adjustHighWaterMark"); -const _handleResult = Symbol("_handleResult"); -const _internalRead = Symbol("_internalRead"); - -export default function () { - const Readable = require("internal/streams/readable"); - - var closer = [false]; - var handleNumberResult = function (nativeReadable, result, view, isClosed) { - if (result > 0) { - const slice = view.subarray(0, result); - view = slice.byteLength < view.byteLength ? view.subarray(result) : undefined; - if (slice.byteLength > 0) { - nativeReadable.push(slice); - } - } - - if (isClosed) { - ProcessNextTick(() => { - nativeReadable.push(null); - }); - } - - return view; - }; - - var handleArrayBufferViewResult = function (nativeReadable, result, view, isClosed) { - if (result.byteLength > 0) { - nativeReadable.push(result); - } - - if (isClosed) { - ProcessNextTick(() => { - nativeReadable.push(null); - }); - } - - return view; - }; - - function NativeReadable(ptr, options) { - if (!(this instanceof NativeReadable)) return Reflect.construct(NativeReadable, [ptr, options]); - - this[refCount] = 0; - this[constructed] = false; - this[remainingChunk] = undefined; - this[pendingRead] = false; - this[hasResized] = !DYNAMICALLY_ADJUST_CHUNK_SIZE; - - options ??= {}; - Readable.$call(this, options); - - if (typeof options.highWaterMark === "number") { - this[highWaterMark] = options.highWaterMark; - } else { - this[highWaterMark] = 256 * 1024; - } - this.$bunNativePtr = ptr; - this[constructed] = false; - this[remainingChunk] = undefined; - this[pendingRead] = false; - if (ptr) { - ptr.onClose = this[_onClose].bind(this); - ptr.onDrain = this[_onDrain].bind(this); - } - } - $toClass(NativeReadable, "NativeReadable", Readable); - - NativeReadable.prototype[_onClose] = function () { - this.push(null); - }; - - NativeReadable.prototype[_onDrain] = function (chunk) { - this.push(chunk); - }; - - // maxToRead is by default the highWaterMark passed from the Readable.read call to this fn - // However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read - // which may be significantly less than the actual highWaterMark - NativeReadable.prototype._read = function _read(maxToRead) { - $debug("NativeReadable._read", this.__id); - if (this[pendingRead]) { - $debug("pendingRead is true", this.__id); - return; - } - var ptr = this.$bunNativePtr; - $debug("ptr @ NativeReadable._read", ptr, this.__id); - if (!ptr) { - this.push(null); - return; - } - if (!this[constructed]) { - $debug("NativeReadable not constructed yet", this.__id); - this[_internalConstruct](ptr); - } - return this[_internalRead](this[_getRemainingChunk](maxToRead), ptr); - }; - - NativeReadable.prototype[_internalConstruct] = function (ptr) { - $assert(this[constructed] === false); - this[constructed] = true; - - const result = ptr.start(this[highWaterMark]); - - $debug("NativeReadable internal `start` result", result, this.__id); - - if (typeof result === "number" && result > 1) { - this[hasResized] = true; - $debug("NativeReadable resized", this.__id); - - this[highWaterMark] = Math.min(this[highWaterMark], result); - } - - const drainResult = ptr.drain(); - $debug("NativeReadable drain result", drainResult, this.__id); - if ((drainResult?.byteLength ?? 0) > 0) { - this.push(drainResult); - } - }; - - // maxToRead can be the highWaterMark (by default) or the remaining amount of the stream to read - // This is so the consumer of the stream can terminate the stream early if they know - // how many bytes they want to read (ie. when reading only part of a file) - // ObjectDefinePrivateProperty(NativeReadable.prototype, "_getRemainingChunk", ); - NativeReadable.prototype[_getRemainingChunk] = function (maxToRead) { - maxToRead ??= this[highWaterMark]; - var chunk = this[remainingChunk]; - $debug("chunk @ #getRemainingChunk", chunk, this.__id); - if (chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE) { - var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE; - this[remainingChunk] = chunk = new Buffer(size); - } - return chunk; - }; - - // ObjectDefinePrivateProperty(NativeReadable.prototype, "_adjustHighWaterMark", ); - NativeReadable.prototype[_adjustHighWaterMark] = function () { - this[highWaterMark] = Math.min(this[highWaterMark] * 2, 1024 * 1024 * 2); - this[hasResized] = true; - $debug("Resized", this.__id); - }; - - // ObjectDefinePrivateProperty(NativeReadable.prototype, "_handleResult", ); - NativeReadable.prototype[_handleResult] = function (result, view, isClosed) { - $debug("result, isClosed @ #handleResult", result, isClosed, this.__id); - - if (typeof result === "number") { - if (result >= this[highWaterMark] && !this[hasResized] && !isClosed) { - this[_adjustHighWaterMark](); - } - return handleNumberResult(this, result, view, isClosed); - } else if (typeof result === "boolean") { - ProcessNextTick(() => { - this.push(null); - }); - return (view?.byteLength ?? 0 > 0) ? view : undefined; - } else if ($isTypedArrayView(result)) { - if (result.byteLength >= this[highWaterMark] && !this[hasResized] && !isClosed) { - this[_adjustHighWaterMark](); - } - - return handleArrayBufferViewResult(this, result, view, isClosed); - } else { - $debug("Unknown result type", result, this.__id); - throw new Error("Invalid result from pull"); - } - }; - - NativeReadable.prototype[_internalRead] = function (view, ptr) { - $debug("#internalRead()", this.__id); - closer[0] = false; - var result = ptr.pull(view, closer); - if ($isPromise(result)) { - this[pendingRead] = true; - return result.then( - result => { - this[pendingRead] = false; - $debug("pending no longerrrrrrrr (result returned from pull)", this.__id); - const isClosed = closer[0]; - this[remainingChunk] = this[_handleResult](result, view, isClosed); - }, - reason => { - $debug("error from pull", reason, this.__id); - errorOrDestroy(this, reason); - }, - ); - } else { - this[remainingChunk] = this[_handleResult](result, view, closer[0]); - } - }; - - NativeReadable.prototype._destroy = function (error, callback) { - var ptr = this.$bunNativePtr; - if (!ptr) { - callback(error); - return; - } - - this.$bunNativePtr = undefined; - ptr.updateRef(false); - - $debug("NativeReadable destroyed", this.__id); - ptr.cancel(error); - callback(error); - }; - - NativeReadable.prototype.ref = function () { - var ptr = this.$bunNativePtr; - if (ptr === undefined) return; - if (this[refCount]++ === 0) { - ptr.updateRef(true); - } - }; - - NativeReadable.prototype.unref = function () { - var ptr = this.$bunNativePtr; - if (ptr === undefined) return; - if (this[refCount]-- === 1) { - ptr.updateRef(false); - } - }; - - NativeReadable.prototype[kEnsureConstructed] = function () { - if (this[constructed]) return; - this[_internalConstruct](this.$bunNativePtr); - }; - - return NativeReadable; -} diff --git a/src/js/internal/streams/nativewritable.ts b/src/js/internal/streams/nativewritable.ts deleted file mode 100644 index fcc371efd2..0000000000 --- a/src/js/internal/streams/nativewritable.ts +++ /dev/null @@ -1,135 +0,0 @@ -const Writable = require("internal/streams/writable"); - -const ProcessNextTick = process.nextTick; - -const _native = Symbol("native"); -const _pathOrFdOrSink = Symbol("pathOrFdOrSink"); -const { fileSinkSymbol: _fileSink } = require("internal/shared"); - -function NativeWritable(pathOrFdOrSink, options = {}) { - Writable.$call(this, options); - - this[_native] = true; - - this._construct = NativeWritable_internalConstruct; - this._final = NativeWritable_internalFinal; - this._write = NativeWritablePrototypeWrite; - - this[_pathOrFdOrSink] = pathOrFdOrSink; -} -$toClass(NativeWritable, "NativeWritable", Writable); - -// These are confusingly two different fns for construct which initially were the same thing because -// `_construct` is part of the lifecycle of Writable and is not called lazily, -// so we need to separate our _construct for Writable state and actual construction of the write stream -function NativeWritable_internalConstruct(cb) { - this._writableState.constructed = true; - this.constructed = true; - if (typeof cb === "function") ProcessNextTick(cb); - ProcessNextTick(() => { - this.emit("open", this.fd); - this.emit("ready"); - }); -} - -function NativeWritable_internalFinal(cb) { - var sink = this[_fileSink]; - if (sink) { - const end = sink.end(true); - if ($isPromise(end) && cb) { - end.then(() => { - if (cb) cb(); - }, cb); - } - } - if (cb) cb(); -} - -function NativeWritablePrototypeWrite(chunk, encoding, cb) { - var fileSink = this[_fileSink] ?? NativeWritable_lazyConstruct(this); - var result = fileSink.write(chunk); - - if (typeof encoding === "function") { - cb = encoding; - } - - if ($isPromise(result)) { - // var writePromises = this.#writePromises; - // var i = writePromises.length; - // writePromises[i] = result; - result - .then(result => { - this.emit("drain"); - if (cb) { - cb(null, result); - } - }) - .catch( - cb - ? err => { - cb(err); - } - : err => { - this.emit("error", err); - }, - ); - return false; - } - - // TODO: Should we just have a calculation based on encoding and length of chunk? - if (cb) cb(null, chunk.byteLength); - return true; -} - -function NativeWritable_lazyConstruct(stream) { - // TODO: Turn this check into check for instanceof FileSink - var sink = stream[_pathOrFdOrSink]; - if (typeof sink === "object") { - if (typeof sink.write === "function") { - return (stream[_fileSink] = sink); - } else { - throw new Error("Invalid FileSink"); - } - } else { - return (stream[_fileSink] = Bun.file(sink).writer()); - } -} - -const WritablePrototypeEnd = Writable.prototype.end; -NativeWritable.prototype.end = function end(chunk, encoding, cb, native) { - return WritablePrototypeEnd.$call(this, chunk, encoding, cb, native ?? this[_native]); -}; - -NativeWritable.prototype._destroy = function (error, cb) { - const w = this._writableState; - const r = this._readableState; - - if (w) { - w.destroyed = true; - w.closeEmitted = true; - } - if (r) { - r.destroyed = true; - r.closeEmitted = true; - } - - if (typeof cb === "function") cb(error); - - if (w?.closeEmitted || r?.closeEmitted) { - this.emit("close"); - } -}; - -NativeWritable.prototype.ref = function ref() { - const sink = (this[_fileSink] ||= NativeWritable_lazyConstruct(this)); - sink.ref(); - return this; -}; - -NativeWritable.prototype.unref = function unref() { - const sink = (this[_fileSink] ||= NativeWritable_lazyConstruct(this)); - sink.unref(); - return this; -}; - -export default NativeWritable; diff --git a/src/js/internal/util/inspect.js b/src/js/internal/util/inspect.js index de79cb0a33..6908ff9ca5 100644 --- a/src/js/internal/util/inspect.js +++ b/src/js/internal/util/inspect.js @@ -73,7 +73,6 @@ const MapPrototypeValues = uncurryThis(Map.prototype.values); const MapPrototypeKeys = uncurryThis(Map.prototype.keys); const MathFloor = Math.floor; const MathMax = Math.max; -const MathMin = Math.min; const MathRound = Math.round; const MathSqrt = Math.sqrt; const MathTrunc = Math.trunc; @@ -1623,7 +1622,7 @@ function identicalSequenceRange(a, b) { const rest = b.length - pos; if (rest > 3) { let len = 1; - const maxLen = MathMin(a.length - i, rest); + const maxLen = $min(a.length - i, rest); // Count the number of consecutive entries. while (maxLen > len && a[i + len] === b[pos + len]) { len++; @@ -1873,7 +1872,7 @@ function groupArrayElements(ctx, output, value) { const averageBias = MathSqrt(actualMax - totalLength / output.length); const biasedMax = MathMax(actualMax - 3 - averageBias, 1); // Dynamically check how many columns seem possible. - const columns = MathMin( + const columns = $min( // Ideally a square should be drawn. We expect a character to be about 2.5 // times as high as wide. This is the area formula to calculate a square // which contains n rectangles of size `actualMax * approxCharHeights`. @@ -1914,7 +1913,7 @@ function groupArrayElements(ctx, output, value) { // Each iteration creates a single line of grouped entries. for (let i = 0; i < outputLength; i += columns) { // The last lines may contain less entries than columns. - const max = MathMin(i + columns, outputLength); + const max = $min(i + columns, outputLength); let str = ""; let j = i; for (; j < max - 1; j++) { @@ -2114,7 +2113,7 @@ function formatArrayBuffer(ctx, value) { return [ctx.stylize("(detached)", "special")]; } let str = StringPrototypeTrim( - RegExpPrototypeSymbolReplace(/(.{2})/g, hexSlice(buffer, 0, MathMin(ctx.maxArrayLength, buffer.length)), "$1 "), + RegExpPrototypeSymbolReplace(/(.{2})/g, hexSlice(buffer, 0, $min(ctx.maxArrayLength, buffer.length)), "$1 "), ); const remaining = buffer.length - ctx.maxArrayLength; if (remaining > 0) str += ` ... ${remaining} more byte${remaining > 1 ? "s" : ""}`; @@ -2123,7 +2122,7 @@ function formatArrayBuffer(ctx, value) { function formatArray(ctx, value, recurseTimes) { const valLen = value.length; - const len = MathMin(MathMax(0, ctx.maxArrayLength), valLen); + const len = $min(MathMax(0, ctx.maxArrayLength), valLen); const remaining = valLen - len; const output = []; @@ -2144,9 +2143,9 @@ function formatTypedArray(value, length, ctx, ignored, recurseTimes) { if (Buffer.isBuffer(value)) { BufferModule ??= require("node:buffer"); const INSPECT_MAX_BYTES = $requireMap.$get("buffer")?.exports.INSPECT_MAX_BYTES ?? BufferModule.INSPECT_MAX_BYTES; - ctx.maxArrayLength = MathMin(ctx.maxArrayLength, INSPECT_MAX_BYTES); + ctx.maxArrayLength = $min(ctx.maxArrayLength, INSPECT_MAX_BYTES); } - const maxLength = MathMin(MathMax(0, ctx.maxArrayLength), length); + const maxLength = $min(MathMax(0, ctx.maxArrayLength), length); const remaining = value.length - maxLength; const output = new Array(maxLength); const elementFormatter = value.length > 0 && typeof value[0] === "number" ? formatNumber : formatBigInt; @@ -2171,7 +2170,7 @@ function formatTypedArray(value, length, ctx, ignored, recurseTimes) { function formatSet(value, ctx, ignored, recurseTimes) { const length = value.size; - const maxLength = MathMin(MathMax(0, ctx.maxArrayLength), length); + const maxLength = $min(MathMax(0, ctx.maxArrayLength), length); const remaining = length - maxLength; const output = []; ctx.indentationLvl += 2; @@ -2190,7 +2189,7 @@ function formatSet(value, ctx, ignored, recurseTimes) { function formatMap(value, ctx, ignored, recurseTimes) { const length = value.size; - const maxLength = MathMin(MathMax(0, ctx.maxArrayLength), length); + const maxLength = $min(MathMax(0, ctx.maxArrayLength), length); const remaining = length - maxLength; const output = []; ctx.indentationLvl += 2; @@ -2209,7 +2208,7 @@ function formatMap(value, ctx, ignored, recurseTimes) { function formatSetIterInner(ctx, recurseTimes, entries, state) { const maxArrayLength = MathMax(ctx.maxArrayLength, 0); - const maxLength = MathMin(maxArrayLength, entries.length); + const maxLength = $min(maxArrayLength, entries.length); const output = new Array(maxLength); ctx.indentationLvl += 2; for (let i = 0; i < maxLength; i++) { @@ -2234,7 +2233,7 @@ function formatMapIterInner(ctx, recurseTimes, entries, state) { // Entries exist as [key1, val1, key2, val2, ...] const len = entries.length / 2; const remaining = len - maxArrayLength; - const maxLength = MathMin(maxArrayLength, len); + const maxLength = $min(maxArrayLength, len); const output = new Array(maxLength); let i = 0; ctx.indentationLvl += 2; diff --git a/src/js/internal/webstreams_adapters.ts b/src/js/internal/webstreams_adapters.ts index e8931e1ccb..f69d1bb1b4 100644 --- a/src/js/internal/webstreams_adapters.ts +++ b/src/js/internal/webstreams_adapters.ts @@ -28,31 +28,14 @@ const PromiseResolve = Promise.resolve.bind(Promise); const PromisePrototypeThen = Promise.prototype.then; const SafePromisePrototypeFinally = Promise.prototype.finally; -const constants_zlib = process.binding("constants").zlib; +const constants_zlib = $processBindingConstants.zlib; -// -// -const transferToNativeReadable = $newCppFunction("ReadableStream.cpp", "jsFunctionTransferToNativeReadableStream", 1); - -function getNativeReadableStream(Readable, stream, options) { +function tryTransferToNativeReadable(stream, options) { const ptr = stream.$bunNativePtr; if (!ptr || ptr === -1) { - $debug("no native readable stream"); return undefined; } - const type = stream.$bunNativeType; - $assert(typeof type === "number", "Invalid native type"); - $assert(typeof ptr === "object", "Invalid native ptr"); - - const NativeReadable = require("node:stream")[kGetNativeReadableProto](type); - // https://github.com/oven-sh/bun/pull/12801 - // https://github.com/oven-sh/bun/issues/9555 - // There may be a ReadableStream.Strong handle to the ReadableStream. - // We can't update those handles to point to the NativeReadable from JS - // So we instead mark it as no longer usable, and create a new NativeReadable - transferToNativeReadable(stream); - - return new NativeReadable(ptr, options); + return require("internal/streams/native-readable").constructNativeReadable(stream, options); } class ReadableFromWeb extends Readable { @@ -177,8 +160,6 @@ class ReadableFromWeb extends Readable { } } } -// -// const encoder = new TextEncoder(); @@ -542,7 +523,7 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj throw $ERR_INVALID_ARG_VALUE("options.encoding", encoding); validateBoolean(objectMode, "options.objectMode"); - const nativeStream = getNativeReadableStream(Readable, readableStream, options); + const nativeStream = tryTransferToNativeReadable(readableStream, options); return ( nativeStream || diff --git a/src/js/node/child_process.ts b/src/js/node/child_process.ts index 096ee95907..8dc4c2f0e0 100644 --- a/src/js/node/child_process.ts +++ b/src/js/node/child_process.ts @@ -53,9 +53,6 @@ if ($debug) { }; } -var NativeWritable; -var ReadableFromWeb; - // Sections: // 1. Exported child_process functions // 2. child_process helpers @@ -1123,21 +1120,16 @@ class ChildProcess extends EventEmitter { } } - NativeWritable ||= StreamModule.NativeWritable; - ReadableFromWeb ||= StreamModule.Readable.fromWeb; - const io = this.#stdioOptions[i]; switch (i) { case 0: { switch (io) { case "pipe": { const stdin = this.#handle.stdin; - if (!stdin) // This can happen if the process was already killed. return new ShimmedStdin(); - - return new NativeWritable(stdin); + return require("internal/fs/streams").writableFromFileSink(stdin); } case "inherit": return process.stdin || null; @@ -1151,13 +1143,12 @@ class ChildProcess extends EventEmitter { case 1: { switch (io) { case "pipe": { - const value = this.#handle[fdToStdioName(i)]; - + const value = this.#handle[fdToStdioName(i) as any as number]; + // This can happen if the process was already killed. if (!value) - // This can happen if the process was already killed. return new ShimmedStdioOutStream(); - const pipe = ReadableFromWeb(value, { encoding }); + const pipe = require('internal/streams/native-readable').constructNativeReadable(value, { encoding }); this.#closesNeeded++; pipe.once("close", () => this.#maybeClose()); if (autoResume) pipe.resume(); diff --git a/src/js/node/stream.ts b/src/js/node/stream.ts index 9d261544c1..306191f914 100644 --- a/src/js/node/stream.ts +++ b/src/js/node/stream.ts @@ -1,35 +1,9 @@ // Hardcoded module "node:stream" / "readable-stream" - -const { kEnsureConstructed, kGetNativeReadableProto } = require("internal/shared"); const EE = require("node:events").EventEmitter; const exports = require("internal/stream"); $debug("node:stream loaded"); -var nativeReadableStreamPrototypes = { - 0: undefined, - 1: undefined, - 2: undefined, - 3: undefined, - 4: undefined, - 5: undefined, -}; - -function getNativeReadableStreamPrototype(nativeType, Readable) { - return (nativeReadableStreamPrototypes[nativeType] ??= require("internal/streams/nativereadable")()); -} - -/** --- Bun native stream wrapper --- */ - -exports[kGetNativeReadableProto] = getNativeReadableStreamPrototype; -exports.NativeWritable = require("internal/streams/nativewritable"); - -const { - newStreamReadableFromReadableStream: _ReadableFromWeb, - _ReadableFromWeb: _ReadableFromWebForUndici, -} = require("internal/webstreams_adapters"); - -exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUndici, kEnsureConstructed }; exports.eos = require("internal/streams/end-of-stream"); exports.EventEmitter = EE; diff --git a/src/js/node/tty.ts b/src/js/node/tty.ts index ac05466827..e2bfce85d3 100644 --- a/src/js/node/tty.ts +++ b/src/js/node/tty.ts @@ -1,3 +1,5 @@ +// Note: please keep this module's loading constrants light, as some users +// import it just to call `isatty`. In that case, `node:stream` is not needed. const { setRawMode: ttySetMode, isatty, @@ -6,7 +8,7 @@ const { const { validateInteger } = require("internal/validators"); -function ReadStream(fd) { +function ReadStream(fd): void { if (!(this instanceof ReadStream)) { return new ReadStream(fd); } @@ -46,7 +48,8 @@ Object.defineProperty(ReadStream, "prototype", { // If you call setRawMode before you call on('data'), the stream will // not be constructed, leading to EBADF - this[require("node:stream")[Symbol.for("::bunternal::")].kEnsureConstructed](); + // This corresponds to the `ensureConstructed` function in `native-readable.ts` + this.$start(); const err = handle.setRawMode(flag); if (err) { @@ -74,11 +77,10 @@ Object.defineProperty(ReadStream, "prototype", { configurable: true, }); -function WriteStream(fd) { +function WriteStream(fd): void { if (!(this instanceof WriteStream)) return new WriteStream(fd); - const stream = require("node:fs").WriteStream.$call(this, "", { fd }); - + const stream = require("node:fs").WriteStream.$call(this, null, { fd }); stream.columns = undefined; stream.rows = undefined; stream.isTTY = isatty(stream.fd); diff --git a/test/js/node/child_process/child_process-node.test.js b/test/js/node/child_process/child_process-node.test.js index ff4699e1e1..710eb5be67 100644 --- a/test/js/node/child_process/child_process-node.test.js +++ b/test/js/node/child_process/child_process-node.test.js @@ -447,8 +447,6 @@ describe("child_process double pipe", () => { }), ); - // TODO(Derrick): We don't implement the full API for this yet, - // So stdin has no 'drain' event. // TODO(@jasnell): This does not appear to ever be // emitted. It's not clear if it is necessary. fakeGrep.stdin.on("drain", () => { diff --git a/test/js/node/child_process/child_process.test.ts b/test/js/node/child_process/child_process.test.ts index 961f9634d3..c87999089c 100644 --- a/test/js/node/child_process/child_process.test.ts +++ b/test/js/node/child_process/child_process.test.ts @@ -427,9 +427,11 @@ it("it accepts stdio passthrough", async () => { stdio: ["ignore", "pipe", "pipe"], env: bunEnv, })); + console.log(package_dir); const [err, out, exitCode] = await Promise.all([new Response(stderr).text(), new Response(stdout).text(), exited]); try { // This command outputs in either `["hello", "world"]` or `["world", "hello"]` order. + console.log({err, out}); expect([err.split("\n")[0], ...err.split("\n").slice(1, -1).sort(), err.split("\n").at(-1)]).toEqual([ "$ run-p echo-hello echo-world", "$ echo hello", diff --git a/test/js/node/fs/fs-leak.test.js b/test/js/node/fs/fs-leak.test.js index 228d6d5a95..cb3a70e7f0 100644 --- a/test/js/node/fs/fs-leak.test.js +++ b/test/js/node/fs/fs-leak.test.js @@ -2,6 +2,7 @@ const { expect, test } = require("bun:test"); const fs = require("fs"); const { tmpdir, devNull } = require("os"); +const { fsStreamInternals } = require('bun:internal-for-testing'); function getMaxFd() { const dev_null = fs.openSync(devNull, "r");