diff --git a/src/js/internal/fs/FSWatcher-lazy.ts b/src/js/internal/fs/FSWatcher-lazy.ts new file mode 100644 index 0000000000..48b984d65b --- /dev/null +++ b/src/js/internal/fs/FSWatcher-lazy.ts @@ -0,0 +1,22 @@ +var FSWatcher; + +var load = () => { + FSWatcher = require("internal/fs/FSWatcher").FSWatcher; + load = () => {}; + return FSWatcher; +}; + +export default { + FSWatcherPropertyDescriptor: { + enumerable: true, + get() { + load(); + return FSWatcher; + }, + set() {}, + }, + watch: function watch(...args) { + load(); + return new FSWatcher(...args); + }, +}; diff --git a/src/js/internal/fs/FSWatcher.ts b/src/js/internal/fs/FSWatcher.ts new file mode 100644 index 0000000000..657011ac46 --- /dev/null +++ b/src/js/internal/fs/FSWatcher.ts @@ -0,0 +1,71 @@ +var watchNodeFS: typeof import("node:fs").watch; + +const EventEmitter = require("node:events"); +class FSWatcher extends EventEmitter { + #watcher; + #listener; + constructor(path, options, listener) { + super(); + + if (!watchNodeFS) { + const nodeFS = Bun.fs(); + watchNodeFS = nodeFS.watch.bind(nodeFS) as any; + } + + if (typeof options === "function") { + listener = options; + options = {}; + } else if (typeof options === "string") { + options = { encoding: options }; + } + + if (typeof listener !== "function") { + listener = () => {}; + } + + this.#listener = listener; + try { + this.#watcher = watchNodeFS(path, options || {}, this.#onEvent.bind(this)); + } catch (e) { + if (!e.message?.startsWith("FileNotFound")) { + throw e; + } + const notFound = new Error(`ENOENT: no such file or directory, watch '${path}'`); + notFound.code = "ENOENT"; + notFound.errno = -2; + notFound.path = path; + notFound.syscall = "watch"; + notFound.filename = path; + throw notFound; + } + } + + #onEvent(eventType, filenameOrError) { + if (eventType === "error" || eventType === "close") { + this.emit(eventType, filenameOrError); + } else { + this.emit("change", eventType, filenameOrError); + this.#listener(eventType, filenameOrError); + } + } + + close() { + this.#watcher?.close(); + this.#watcher = null; + } + + ref() { + this.#watcher?.ref(); + } + + unref() { + this.#watcher?.unref(); + } + + // https://github.com/nodejs/node/blob/9f51c55a47702dc6a0ca3569853dd7ba022bf7bb/lib/internal/fs/watchers.js#L259-L263 + start() {} +} + +export default { + FSWatcher, +}; diff --git a/src/js/internal/fs/ReadStream-lazy.ts b/src/js/internal/fs/ReadStream-lazy.ts new file mode 100644 index 0000000000..a0c6e71918 --- /dev/null +++ b/src/js/internal/fs/ReadStream-lazy.ts @@ -0,0 +1,20 @@ +var ReadStream; +function load() { + return require("internal/fs/ReadStream").ReadStream; +} + +export default { + ReadStreamPropertyDescriptor: { + enumerable: true, + + get() { + return (ReadStream ??= load()); + }, + + set() {}, + }, + + createReadStream(path, options) { + return new (ReadStream ??= load())(path, options); + }, +}; diff --git a/src/js/internal/fs/ReadStream.ts b/src/js/internal/fs/ReadStream.ts new file mode 100644 index 0000000000..3140f0d1ce --- /dev/null +++ b/src/js/internal/fs/ReadStream.ts @@ -0,0 +1,359 @@ +const readStreamPathFastPathSymbol = Symbol.for("Bun.Node.readStreamPathFastPath"); +const readStreamSymbol = Symbol.for("Bun.NodeReadStream"); +const readStreamPathOrFdSymbol = Symbol.for("Bun.NodeReadStreamPathOrFd"); +const { kIoDone, writeStreamPathFastPathCallSymbol, writeStreamPathFastPathSymbol } = require("internal/symbols"); +const { read, open, openSync, close, fstatSync } = require("node:fs"); +const Stream = require("node:stream"); +var ReadStreamClass; +var defaultReadStreamOptions = { + file: undefined, + fd: null, + flags: "r", + encoding: undefined, + mode: 0o666, + autoClose: true, + emitClose: true, + start: 0, + end: Infinity, + highWaterMark: 64 * 1024, + fs: { + read, + open, + openSync, + close, + }, + autoDestroy: true, +}; + +var ReadStream = (function (InternalReadStream) { + ReadStreamClass = InternalReadStream; + Object.defineProperty(ReadStreamClass.prototype, Symbol.toStringTag, { + value: "ReadStream", + enumerable: false, + }); + function ReadStream(path, options) { + return new InternalReadStream(path, options); + } + ReadStream.prototype = InternalReadStream.prototype; + return Object.defineProperty(ReadStream, Symbol.hasInstance, { + value(instance) { + return instance instanceof InternalReadStream; + }, + }); +})( + class ReadStream extends Stream._getNativeReadableStreamPrototype(2, Stream.Readable) { + constructor(pathOrFd, options = defaultReadStreamOptions) { + if (typeof options !== "object" || !options) { + throw new TypeError("Expected options to be an object"); + } + + var { + flags = defaultReadStreamOptions.flags, + encoding = defaultReadStreamOptions.encoding, + mode = defaultReadStreamOptions.mode, + autoClose = defaultReadStreamOptions.autoClose, + emitClose = defaultReadStreamOptions.emitClose, + start = defaultReadStreamOptions.start, + end = defaultReadStreamOptions.end, + autoDestroy = defaultReadStreamOptions.autoClose, + fs = defaultReadStreamOptions.fs, + highWaterMark = defaultReadStreamOptions.highWaterMark, + fd = defaultReadStreamOptions.fd, + } = options; + + if (pathOrFd?.constructor?.name === "URL") { + pathOrFd = Bun.fileURLToPath(pathOrFd); + } + + // This is kinda hacky but we create a temporary object to assign props that we will later pull into the `this` context after we call super + var tempThis = {}; + if (fd != null) { + if (typeof fd !== "number") { + throw new TypeError("Expected options.fd to be a number"); + } + tempThis.fd = tempThis[readStreamPathOrFdSymbol] = fd; + tempThis.autoClose = false; + } else if (typeof pathOrFd === "string") { + if (pathOrFd.startsWith("file://")) { + pathOrFd = Bun.fileURLToPath(pathOrFd); + } + if (pathOrFd.length === 0) { + throw new TypeError("Expected path to be a non-empty string"); + } + tempThis.path = tempThis.file = tempThis[readStreamPathOrFdSymbol] = pathOrFd; + } else if (typeof pathOrFd === "number") { + pathOrFd |= 0; + if (pathOrFd < 0) { + throw new TypeError("Expected fd to be a positive integer"); + } + tempThis.fd = tempThis[readStreamPathOrFdSymbol] = pathOrFd; + + tempThis.autoClose = false; + } else { + throw new TypeError("Expected a path or file descriptor"); + } + + // If fd not open for this file, open it + if (tempThis.fd === undefined) { + // NOTE: this fs is local to constructor, from options + tempThis.fd = fs.openSync(pathOrFd, flags, mode); + } + // Get FileRef from fd + var fileRef = Bun.file(tempThis.fd); + + // Get the stream controller + // We need the pointer to the underlying stream controller for the NativeReadable + var stream = fileRef.stream(); + var native = $direct(stream); + if (!native) { + $debug("no native readable stream"); + throw new Error("no native readable stream"); + } + var { stream: ptr } = native; + + super(ptr, { + ...options, + encoding, + autoDestroy, + autoClose, + emitClose, + highWaterMark, + }); + + // Assign the tempThis props to this + Object.assign(this, tempThis); + this.#fileRef = fileRef; + + this.end = end; + this._read = this.#internalRead; + this.start = start; + this.flags = flags; + this.mode = mode; + this.emitClose = emitClose; + + this[readStreamPathFastPathSymbol] = + start === 0 && + end === Infinity && + autoClose && + fs === defaultReadStreamOptions.fs && + // is it an encoding which we don't need to decode? + (encoding === "buffer" || + encoding === "binary" || + encoding == null || + encoding === "utf-8" || + encoding === "utf8"); + this._readableState.autoClose = autoDestroy = autoClose; + this._readableState.highWaterMark = highWaterMark; + + if (start !== undefined) { + this.pos = start; + } + } + #fileRef; + #fs; + file; + path; + fd = null; + flags; + mode; + start; + end; + pos; + bytesRead = 0; + #fileSize = -1; + _read; + + [readStreamSymbol] = true; + [readStreamPathOrFdSymbol]; + [readStreamPathFastPathSymbol]; + + _construct(callback) { + if (super._construct) { + super._construct(callback); + } else { + callback(); + } + this.emit("open", this.fd); + this.emit("ready"); + } + + _destroy(err, cb) { + super._destroy(err, cb); + try { + var fd = this.fd; + this[readStreamPathFastPathSymbol] = false; + + if (!fd) { + cb(err); + } else { + this.#fs.close(fd, er => { + cb(er || err); + }); + this.fd = null; + } + } catch (e) { + throw e; + } + } + + close(cb) { + if (typeof cb === "function") Stream.eos(this, cb); + this.destroy(); + } + + push(chunk) { + // Is it even possible for this to be less than 1? + var bytesRead = chunk?.length ?? 0; + if (bytesRead > 0) { + this.bytesRead += bytesRead; + var currPos = this.pos; + // Handle case of going through bytes before pos if bytesRead is less than pos + // If pos is undefined, we are reading through the whole file + // Otherwise we started from somewhere in the middle of the file + if (currPos !== undefined) { + // At this point we still haven't hit our `start` point + // We should discard this chunk and exit + if (this.bytesRead < currPos) { + return true; + } + // At this point, bytes read is greater than our starting position + // If the current position is still the starting position, that means + // this is the first chunk where we care about the bytes read + // and we need to subtract the bytes read from the start position (n) and slice the last n bytes + if (currPos === this.start) { + var n = this.bytesRead - currPos; + chunk = chunk.slice(-n); + var [_, ...rest] = arguments; + this.pos = this.bytesRead; + if (this.end !== undefined && this.bytesRead > this.end) { + chunk = chunk.slice(0, this.end - this.start + 1); + } + return super.push(chunk, ...rest); + } + var end = this.end; + // This is multi-chunk read case where we go passed the end of the what we want to read in the last chunk + if (end !== undefined && this.bytesRead > end) { + chunk = chunk.slice(0, end - currPos + 1); + var [_, ...rest] = arguments; + this.pos = this.bytesRead; + return super.push(chunk, ...rest); + } + this.pos = this.bytesRead; + } + } + + return super.push(...arguments); + } + + // # + + // n should be the the highwatermark passed from Readable.read when calling internal _read (_read is set to this private fn in this class) + #internalRead(n) { + // pos is the current position in the file + // by default, if a start value is provided, pos starts at this.start + var { pos, end, bytesRead, fd, encoding } = this; + + n = + pos !== undefined // if there is a pos, then we are reading from that specific position in the file + ? Math.min(end - pos + 1, n) // takes smaller of length of the rest of the file to read minus the cursor position, or the highwatermark + : Math.min(end - bytesRead + 1, n); // takes the smaller of the length of the rest of the file from the bytes that we have marked read, or the highwatermark + + $debug("n @ fs.ReadStream.#internalRead, after clamp", n); + + // If n is 0 or less, then we read all the file, push null to stream, ending it + if (n <= 0) { + this.push(null); + return; + } + + // At this point, n is the lesser of the length of the rest of the file to read or the highwatermark + // Which means n is the maximum number of bytes to read + + // Basically if we don't know the file size yet, then check it + // Then if n is bigger than fileSize, set n to be fileSize + // This is a fast path to avoid allocating more than the file size for a small file (is this respected by native stream though) + if (this.#fileSize === -1 && bytesRead === 0 && pos === undefined) { + var stat = fstatSync(fd); + this.#fileSize = stat.size; + if (this.#fileSize > 0 && n > this.#fileSize) { + n = this.#fileSize + 1; + } + $debug("fileSize", this.#fileSize); + } + + // At this point, we know the file size and how much we want to read of the file + this[kIoDone] = false; + var res = super._read(n); + $debug("res -- undefined? why?", res); + if ($isPromise(res)) { + var then = res?.then; + if (then && $isCallable(then)) { + res.then( + () => { + this[kIoDone] = true; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) { + this.emit(kIoDone); + } + }, + er => { + this[kIoDone] = true; + this.#errorOrDestroy(er); + }, + ); + } + } else { + this[kIoDone] = true; + if (this.destroyed) { + this.emit(kIoDone); + this.#errorOrDestroy(new Error("ERR_STREAM_PREMATURE_CLOSE")); + } + } + } + + #errorOrDestroy(err, sync = null) { + var { + _readableState: r = { destroyed: false, autoDestroy: false }, + _writableState: w = { destroyed: false, autoDestroy: false }, + } = this; + + if (w?.destroyed || r?.destroyed) { + return this; + } + if (r?.autoDestroy || w?.autoDestroy) this.destroy(err); + else if (err) { + this.emit("error", err); + } + } + + pause() { + this[readStreamPathFastPathSymbol] = false; + return super.pause(); + } + + resume() { + this[readStreamPathFastPathSymbol] = false; + return super.resume(); + } + + unshift(...args) { + this[readStreamPathFastPathSymbol] = false; + return super.unshift(...args); + } + + pipe(dest, pipeOpts) { + if (this[readStreamPathFastPathSymbol] && (pipeOpts?.end ?? true) && this._readableState?.pipes?.length === 0) { + if (writeStreamPathFastPathSymbol in dest && dest[writeStreamPathFastPathSymbol]) { + if (dest[writeStreamPathFastPathCallSymbol](this, pipeOpts)) { + return this; + } + } + } + + this[readStreamPathFastPathSymbol] = false; + return super.pipe(dest, pipeOpts); + } + }, +); + +export default { ReadStream: ReadStreamClass }; diff --git a/src/js/internal/fs/StatWatcher.ts b/src/js/internal/fs/StatWatcher.ts new file mode 100644 index 0000000000..fb8e5f63fc --- /dev/null +++ b/src/js/internal/fs/StatWatcher.ts @@ -0,0 +1,110 @@ +var StatWatcher; +const statWatchers = new Map(); +let _pathModule; + +function getValidatedPath(p) { + if (p instanceof URL) return Bun.fileURLToPath(p); + if (typeof p !== "string") throw new TypeError("Path must be a string or URL."); + return (_pathModule ??= require("node:path")).resolve(p); +} + +function watchFile(filename, options, listener) { + filename = getValidatedPath(filename); + + if (typeof options === "function") { + listener = options; + options = {}; + } + + if (typeof listener !== "function") { + throw new TypeError("listener must be a function"); + } + + var stat = statWatchers.get(filename); + if (!stat) { + stat = new (StatWatcher ??= load())(filename, options); + statWatchers.set(filename, stat); + } + stat.addListener("change", listener); + return stat; +} + +function load() { + var bunFS; + const EventEmitter = require("node:events"); + var doWatchFile = (a, b, c) => { + bunFS = Bun.fs(); + doWatchFile = bunFS.watchFile.bind(bunFS); + return bunFS.watchFile(a, b, c); + }; + /** Implemented in `node_fs_stat_watcher.zig` */ + // interface StatWatcherHandle { + // ref(); + // unref(); + // close(); + // } + class StatWatcher extends EventEmitter { + // _handle: StatWatcherHandle; + + constructor(path, options) { + super(); + this._handle = doWatchFile(path, options, this.#onChange.bind(this)); + } + + #onChange(curr, prev) { + this.emit("change", curr, prev); + } + + // https://github.com/nodejs/node/blob/9f51c55a47702dc6a0ca3569853dd7ba022bf7bb/lib/internal/fs/watchers.js#L259-L263 + start() {} + + stop() { + this._handle?.close(); + this._handle = null; + } + + ref() { + this._handle?.ref(); + } + + unref() { + this._handle?.unref(); + } + } + + return StatWatcher; +} + +export default { + StatWatcherPropertyDescriptor: { + enumerable: true, + get() { + return (StatWatcher ??= load()); + }, + set() {}, + }, + watchFile, + unwatchFile, +}; + +// TODO: move this entire thing into native code. +// the reason it's not done right now is because there isnt a great way to have multiple +// listeners per StatWatcher with the current implementation in native code. the downside +// of this means we need to do path validation in the js side of things + +function unwatchFile(filename, listener) { + filename = getValidatedPath(filename); + + var stat = statWatchers.get(filename); + if (!stat) return; + if (listener) { + stat.removeListener("change", listener); + if (stat.listenerCount("change") !== 0) { + return; + } + } else { + stat.removeAllListeners("change"); + } + stat.stop(); + statWatchers.delete(filename); +} diff --git a/src/js/internal/fs/WriteStream-lazy.ts b/src/js/internal/fs/WriteStream-lazy.ts new file mode 100644 index 0000000000..9c82868b78 --- /dev/null +++ b/src/js/internal/fs/WriteStream-lazy.ts @@ -0,0 +1,18 @@ +var WriteStream; + +function load() { + return require("internal/fs/WriteStream").WriteStream; +} + +export default { + WriteStreamPropertyDescriptor: { + get() { + return (WriteStream ??= load()); + }, + set() {}, + enumerable: true, + }, + createWriteStream(path, options) { + return new (WriteStream ??= load())(path, options); + }, +}; diff --git a/src/js/internal/fs/WriteStream.ts b/src/js/internal/fs/WriteStream.ts new file mode 100644 index 0000000000..6ba3a764fb --- /dev/null +++ b/src/js/internal/fs/WriteStream.ts @@ -0,0 +1,307 @@ +const { write, close, open, openSync } = require("node:fs"); +const { NativeWritable } = require("node:stream"); +var writeStreamPathFastPathCallSymbol = Symbol.for("Bun.NodeWriteStreamFastPathCall"); +const writeStreamSymbol = Symbol.for("Bun.NodeWriteStream"); +const { kIoDone, writeStreamPathFastSymbol: _writeStreamPathFastPathSymbol } = require("internal/symbols"); +const _fs = Symbol("fs"); +var WriteStream; +var defaultWriteStreamOptions = { + fd: null, + start: undefined, + pos: undefined, + encoding: undefined, + flags: "w", + mode: 0o666, + fs: { + write, + close, + open, + openSync, + }, +}; + +var WriteStreamClass = (WriteStream = function WriteStream(path, options = defaultWriteStreamOptions) { + if (!(this instanceof WriteStream)) { + return new WriteStream(path, options); + } + + if (!options) { + throw new TypeError("Expected options to be an object"); + } + + var { + fs = defaultWriteStreamOptions.fs, + start = defaultWriteStreamOptions.start, + flags = defaultWriteStreamOptions.flags, + mode = defaultWriteStreamOptions.mode, + autoClose = true, + emitClose = false, + autoDestroy = autoClose, + encoding = defaultWriteStreamOptions.encoding, + fd = defaultWriteStreamOptions.fd, + pos = defaultWriteStreamOptions.pos, + } = options; + + var tempThis = {}; + if (fd != null) { + if (typeof fd !== "number") { + throw new Error("Expected options.fd to be a number"); + } + tempThis.fd = fd; + tempThis[_writeStreamPathFastPathSymbol] = false; + } else if (typeof path === "string") { + if (path.length === 0) { + throw new TypeError("Expected a non-empty path"); + } + + if (path.startsWith("file:")) { + path = Bun.fileURLToPath(path); + } + + tempThis.path = path; + tempThis.fd = null; + tempThis[_writeStreamPathFastPathSymbol] = + autoClose && + (start === undefined || start === 0) && + fs.write === defaultWriteStreamOptions.fs.write && + fs.close === defaultWriteStreamOptions.fs.close; + } + + if (tempThis.fd == null) { + tempThis.fd = fs.openSync(path, flags, mode); + } + + NativeWritable.$call(this, tempThis.fd, { + ...options, + decodeStrings: false, + autoDestroy, + emitClose, + fd: tempThis, + }); + Object.assign(this, tempThis); + + if (typeof fs?.write !== "function") { + throw new TypeError("Expected fs.write to be a function"); + } + + if (typeof fs?.close !== "function") { + throw new TypeError("Expected fs.close to be a function"); + } + + if (typeof fs?.open !== "function") { + throw new TypeError("Expected fs.open to be a function"); + } + + if (typeof path === "object" && path) { + if (path instanceof URL) { + path = Bun.fileURLToPath(path); + } + } + + if (typeof path !== "string" && typeof fd !== "number") { + throw new TypeError("Expected a path or file descriptor"); + } + + this.start = start; + this[_fs] = fs; + this.flags = flags; + this.mode = mode; + this.bytesWritten = 0; + this[writeStreamSymbol] = true; + this[kIoDone] = false; + // _write = undefined; + // _writev = undefined; + + if (this.start !== undefined) { + this.pos = this.start; + } + + if (encoding !== defaultWriteStreamOptions.encoding) { + this.setDefaultEncoding(encoding); + if (encoding !== "buffer" && encoding !== "utf8" && encoding !== "utf-8" && encoding !== "binary") { + this[_writeStreamPathFastPathSymbol] = false; + } + } + + return this; +}); +const WriteStreamPrototype = (WriteStream.prototype = Object.create(NativeWritable.prototype)); + +Object.defineProperties(WriteStreamPrototype, { + autoClose: { + get() { + return this._writableState.autoDestroy; + }, + set(val) { + this._writableState.autoDestroy = val; + }, + }, + pending: { + get() { + return this.fd === null; + }, + }, +}); + +// TODO: what is this for? +WriteStreamPrototype.destroySoon = WriteStreamPrototype.end; + +// noop, node has deprecated this +WriteStreamPrototype.open = function open() {}; + +WriteStreamPrototype[writeStreamPathFastPathCallSymbol] = function WriteStreamPathFastPathCallSymbol( + readStream, + pipeOpts, +) { + if (!this[_writeStreamPathFastPathSymbol]) { + return false; + } + + if (this.fd !== null) { + this[_writeStreamPathFastPathSymbol] = false; + return false; + } + + this[kIoDone] = false; + readStream[kIoDone] = false; + return Bun.write(this[_writeStreamPathFastPathSymbol], readStream[readStreamPathOrFdSymbol]).then( + bytesWritten => { + readStream[kIoDone] = this[kIoDone] = true; + this.bytesWritten += bytesWritten; + readStream.bytesRead += bytesWritten; + this.end(); + readStream.close(); + }, + err => { + readStream[kIoDone] = this[kIoDone] = true; + WriteStream_errorOrDestroy.$call(this, err); + readStream.emit("error", err); + }, + ); +}; + +WriteStreamPrototype.isBunFastPathEnabled = function isBunFastPathEnabled() { + return this[_writeStreamPathFastPathSymbol]; +}; + +WriteStreamPrototype.disableBunFastPath = function disableBunFastPath() { + this[_writeStreamPathFastPathSymbol] = false; +}; + +function WriteStream_handleWrite(er, bytes) { + if (er) { + return WriteStream_errorOrDestroy.$call(this, er); + } + + this.bytesWritten += bytes; +} + +function WriteStream_internalClose(err, cb) { + this[_writeStreamPathFastPathSymbol] = false; + var fd = this.fd; + this[_fs].close(fd, er => { + this.fd = null; + cb(err || er); + }); +} + +WriteStreamPrototype._construct = function _construct(callback) { + if (typeof this.fd === "number") { + callback(); + return; + } + + callback(); + this.emit("open", this.fd); + this.emit("ready"); +}; + +WriteStreamPrototype._destroy = function _destroy(err, cb) { + if (this.fd === null) { + return cb(err); + } + + if (this[kIoDone]) { + this.once(kIoDone, () => WriteStream_internalClose.$call(this, err, cb)); + return; + } + + WriteStream_internalClose.$call(this, err, cb); +}; + +WriteStreamPrototype.close = function close(cb) { + if (cb) { + if (this.closed) { + process.nextTick(cb); + return; + } + this.on("close", cb); + } + + // If we are not autoClosing, we should call + // destroy on 'finish'. + if (!this.autoClose) { + this.on("finish", this.destroy); + } + + // We use end() instead of destroy() because of + // https://github.com/nodejs/node/issues/2006 + this.end(); +}; + +WriteStreamPrototype.write = function write(chunk, encoding, cb) { + encoding ??= this._writableState?.defaultEncoding; + this[_writeStreamPathFastPathSymbol] = false; + if (typeof chunk === "string") { + chunk = Buffer.from(chunk, encoding); + } + + // TODO: Replace this when something like lseek is available + var native = this.pos === undefined; + const callback = native + ? (err, bytes) => { + this[kIoDone] = false; + WriteStream_handleWrite.$call(this, err, bytes); + this.emit(kIoDone); + if (cb) !err ? cb() : cb(err); + } + : () => {}; + this[kIoDone] = true; + if (this._write) { + return this._write(chunk, encoding, callback); + } else { + return NativeWritable.prototype.write.$call(this, chunk, encoding, callback, native); + } +}; + +// Do not inherit +WriteStreamPrototype._write = undefined; +WriteStreamPrototype._writev = undefined; + +WriteStreamPrototype.end = function end(chunk, encoding, cb) { + var native = this.pos === undefined; + return NativeWritable.prototype.end.$call(this, chunk, encoding, cb, native); +}; + +WriteStreamPrototype._destroy = function _destroy(err, cb) { + this.close(err, cb); +}; + +function WriteStream_errorOrDestroy(err) { + var { + _readableState: r = { destroyed: false, autoDestroy: false }, + _writableState: w = { destroyed: false, autoDestroy: false }, + } = this; + + if (w?.destroyed || r?.destroyed) { + return this; + } + if (r?.autoDestroy || w?.autoDestroy) this.destroy(err); + else if (err) { + this.emit("error", err); + } +} + +export default { + WriteStream: WriteStreamClass, +}; diff --git a/src/js/internal/symbols.js b/src/js/internal/symbols.js new file mode 100644 index 0000000000..91c9df9c51 --- /dev/null +++ b/src/js/internal/symbols.js @@ -0,0 +1,6 @@ +export default { + // TODO: does this need to be exposed as Symbol.for? + kIoDone: Symbol.for("kIoDone"), + kCustomPromisifiedSymbol: Symbol.for("nodejs.util.promisify.custom"), + writeStreamPathFastSymbol: Symbol.for("Bun.NodeWriteStreamFastPath"), +}; diff --git a/src/js/node/fs.js b/src/js/node/fs.js index e5129c46b9..d5d904cd4d 100644 --- a/src/js/node/fs.js +++ b/src/js/node/fs.js @@ -1,13 +1,10 @@ // Hardcoded module "node:fs" -var ReadStream; -var WriteStream; -const EventEmitter = require("node:events"); -const promises = require("node:fs/promises"); -const Stream = require("node:stream"); - -var _writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath"); -var _fs = Symbol.for("#fs"); - +const { WriteStreamPropertyDescriptor, createWriteStream } = require("internal/fs/WriteStream-lazy"); +const { ReadStreamPropertyDescriptor, createReadStream } = require("internal/fs/ReadStream-lazy"); +const { FSWatcherPropertyDescriptor, watch } = require("internal/fs/FSWatcher-lazy"); +const { StatWatcherPropertyDescriptor, watchFile, unwatchFile } = require("internal/fs/StatWatcher"); +// TODO: make symbols a separate export somewhere +var { kCustomPromisifiedSymbol } = require("internal/symbols"); const constants = $processBindingConstants.fs; function ensureCallback(callback) { @@ -21,101 +18,6 @@ function ensureCallback(callback) { } var fs = Bun.fs(); -class FSWatcher extends EventEmitter { - #watcher; - #listener; - constructor(path, options, listener) { - super(); - - if (typeof options === "function") { - listener = options; - options = {}; - } else if (typeof options === "string") { - options = { encoding: options }; - } - - if (typeof listener !== "function") { - listener = () => {}; - } - - this.#listener = listener; - try { - this.#watcher = fs.watch(path, options || {}, this.#onEvent.bind(this)); - } catch (e) { - if (!e.message?.startsWith("FileNotFound")) { - throw e; - } - const notFound = new Error(`ENOENT: no such file or directory, watch '${path}'`); - notFound.code = "ENOENT"; - notFound.errno = -2; - notFound.path = path; - notFound.syscall = "watch"; - notFound.filename = path; - throw notFound; - } - } - - #onEvent(eventType, filenameOrError) { - if (eventType === "error" || eventType === "close") { - this.emit(eventType, filenameOrError); - } else { - this.emit("change", eventType, filenameOrError); - this.#listener(eventType, filenameOrError); - } - } - - close() { - this.#watcher?.close(); - this.#watcher = null; - } - - ref() { - this.#watcher?.ref(); - } - - unref() { - this.#watcher?.unref(); - } - - // https://github.com/nodejs/node/blob/9f51c55a47702dc6a0ca3569853dd7ba022bf7bb/lib/internal/fs/watchers.js#L259-L263 - start() {} -} - -/** Implemented in `node_fs_stat_watcher.zig` */ -// interface StatWatcherHandle { -// ref(); -// unref(); -// close(); -// } - -class StatWatcher extends EventEmitter { - // _handle: StatWatcherHandle; - - constructor(path, options) { - super(); - this._handle = fs.watchFile(path, options, this.#onChange.bind(this)); - } - - #onChange(curr, prev) { - this.emit("change", curr, prev); - } - - // https://github.com/nodejs/node/blob/9f51c55a47702dc6a0ca3569853dd7ba022bf7bb/lib/internal/fs/watchers.js#L259-L263 - start() {} - - stop() { - this._handle?.close(); - this._handle = null; - } - - ref() { - this._handle?.ref(); - } - - unref() { - this._handle?.unref(); - } -} var access = function access(...args) { callbackify(fs.access, args); @@ -350,13 +252,7 @@ var access = function access(...args) { }, readvSync = fs.readvSync.bind(fs), Dirent = fs.Dirent, - Stats = fs.Stats, - watch = function watch(path, options, listener) { - return new FSWatcher(path, options, listener); - }; - -// TODO: make symbols a separate export somewhere -var kCustomPromisifiedSymbol = Symbol.for("nodejs.util.promisify.custom"); + Stats = fs.Stats; read[kCustomPromisifiedSymbol] = async function (fd, bufferOrOptions, ...rest) { const { isArrayBufferView } = require("node:util/types"); @@ -382,53 +278,15 @@ write[kCustomPromisifiedSymbol] = async function (fd, stringOrBuffer, ...rest) { return { bytesWritten, buffer: stringOrBuffer }; }; -// TODO: move this entire thing into native code. -// the reason it's not done right now is because there isnt a great way to have multiple -// listeners per StatWatcher with the current implementation in native code. the downside -// of this means we need to do path validation in the js side of things -const statWatchers = new Map(); -let _pathModule; -function getValidatedPath(p) { - if (p instanceof URL) return Bun.fileURLToPath(p); - if (typeof p !== "string") throw new TypeError("Path must be a string or URL."); - return (_pathModule ??= require("node:path")).resolve(p); -} -function watchFile(filename, options, listener) { - filename = getValidatedPath(filename); +writev[kCustomPromisifiedSymbol] = async function (fd, buffers, ...rest) { + const bytesWritten = await fs.writev(fd, buffers, ...rest); + return { bytesWritten, buffers }; +}; - if (typeof options === "function") { - listener = options; - options = {}; - } - - if (typeof listener !== "function") { - throw new TypeError("listener must be a function"); - } - - var stat = statWatchers.get(filename); - if (!stat) { - stat = new StatWatcher(filename, options); - statWatchers.set(filename, stat); - } - stat.addListener("change", listener); - return stat; -} -function unwatchFile(filename, listener) { - filename = getValidatedPath(filename); - - var stat = statWatchers.get(filename); - if (!stat) return; - if (listener) { - stat.removeListener("change", listener); - if (stat.listenerCount("change") !== 0) { - return; - } - } else { - stat.removeAllListeners("change"); - } - stat.stop(); - statWatchers.delete(filename); -} +readv[kCustomPromisifiedSymbol] = async function (fd, buffers, ...rest) { + const bytesRead = await fs.readv(fd, buffers, ...rest); + return { bytesRead, buffers }; +}; function callbackify(fsFunction, args) { const callback = args[args.length - 1]; @@ -460,701 +318,6 @@ function callbackify(fsFunction, args) { // _events // _eventsCount // _maxListener -var readStreamPathFastPathSymbol = Symbol.for("Bun.Node.readStreamPathFastPath"); -const readStreamSymbol = Symbol.for("Bun.NodeReadStream"); -const readStreamPathOrFdSymbol = Symbol.for("Bun.NodeReadStreamPathOrFd"); -const writeStreamSymbol = Symbol.for("Bun.NodeWriteStream"); -var writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath"); -var writeStreamPathFastPathCallSymbol = Symbol.for("Bun.NodeWriteStreamFastPathCall"); -var kIoDone = Symbol.for("kIoDone"); - -var defaultReadStreamOptions = { - file: undefined, - fd: null, - flags: "r", - encoding: undefined, - mode: 0o666, - autoClose: true, - emitClose: true, - start: 0, - end: Infinity, - highWaterMark: 64 * 1024, - fs: { - read, - open: (path, flags, mode, cb) => { - var fd; - try { - fd = openSync(path, flags, mode); - } catch (e) { - cb(e); - return; - } - - cb(null, fd); - }, - openSync, - close, - }, - autoDestroy: true, -}; - -var ReadStreamClass; - -ReadStream = (function (InternalReadStream) { - ReadStreamClass = InternalReadStream; - Object.defineProperty(ReadStreamClass.prototype, Symbol.toStringTag, { - value: "ReadStream", - enumerable: false, - }); - function ReadStream(path, options) { - return new InternalReadStream(path, options); - } - ReadStream.prototype = InternalReadStream.prototype; - return Object.defineProperty(ReadStream, Symbol.hasInstance, { - value(instance) { - return instance instanceof InternalReadStream; - }, - }); -})( - class ReadStream extends Stream._getNativeReadableStreamPrototype(2, Stream.Readable) { - constructor(pathOrFd, options = defaultReadStreamOptions) { - if (typeof options !== "object" || !options) { - throw new TypeError("Expected options to be an object"); - } - - var { - flags = defaultReadStreamOptions.flags, - encoding = defaultReadStreamOptions.encoding, - mode = defaultReadStreamOptions.mode, - autoClose = defaultReadStreamOptions.autoClose, - emitClose = defaultReadStreamOptions.emitClose, - start = defaultReadStreamOptions.start, - end = defaultReadStreamOptions.end, - autoDestroy = defaultReadStreamOptions.autoClose, - fs = defaultReadStreamOptions.fs, - highWaterMark = defaultReadStreamOptions.highWaterMark, - fd = defaultReadStreamOptions.fd, - } = options; - - if (pathOrFd?.constructor?.name === "URL") { - pathOrFd = Bun.fileURLToPath(pathOrFd); - } - - // This is kinda hacky but we create a temporary object to assign props that we will later pull into the `this` context after we call super - var tempThis = {}; - if (fd != null) { - if (typeof fd !== "number") { - throw new TypeError("Expected options.fd to be a number"); - } - tempThis.fd = tempThis[readStreamPathOrFdSymbol] = fd; - tempThis.autoClose = false; - } else if (typeof pathOrFd === "string") { - if (pathOrFd.startsWith("file://")) { - pathOrFd = Bun.fileURLToPath(pathOrFd); - } - if (pathOrFd.length === 0) { - throw new TypeError("Expected path to be a non-empty string"); - } - tempThis.path = tempThis.file = tempThis[readStreamPathOrFdSymbol] = pathOrFd; - } else if (typeof pathOrFd === "number") { - pathOrFd |= 0; - if (pathOrFd < 0) { - throw new TypeError("Expected fd to be a positive integer"); - } - tempThis.fd = tempThis[readStreamPathOrFdSymbol] = pathOrFd; - - tempThis.autoClose = false; - } else { - throw new TypeError("Expected a path or file descriptor"); - } - - // If fd not open for this file, open it - if (tempThis.fd === undefined) { - // NOTE: this fs is local to constructor, from options - tempThis.fd = fs.openSync(pathOrFd, flags, mode); - } - // Get FileRef from fd - var fileRef = Bun.file(tempThis.fd); - - // Get the stream controller - // We need the pointer to the underlying stream controller for the NativeReadable - var stream = fileRef.stream(); - var native = $direct(stream); - if (!native) { - $debug("no native readable stream"); - throw new Error("no native readable stream"); - } - var { stream: ptr } = native; - - super(ptr, { - ...options, - encoding, - autoDestroy, - autoClose, - emitClose, - highWaterMark, - }); - - // Assign the tempThis props to this - Object.assign(this, tempThis); - this.#fileRef = fileRef; - - this.end = end; - this._read = this.#internalRead; - this.start = start; - this.flags = flags; - this.mode = mode; - this.emitClose = emitClose; - - this[readStreamPathFastPathSymbol] = - start === 0 && - end === Infinity && - autoClose && - fs === defaultReadStreamOptions.fs && - // is it an encoding which we don't need to decode? - (encoding === "buffer" || - encoding === "binary" || - encoding == null || - encoding === "utf-8" || - encoding === "utf8"); - this._readableState.autoClose = autoDestroy = autoClose; - this._readableState.highWaterMark = highWaterMark; - - if (start !== undefined) { - this.pos = start; - } - } - #fileRef; - #fs; - file; - path; - fd = null; - flags; - mode; - start; - end; - pos; - bytesRead = 0; - #fileSize = -1; - _read; - - [readStreamSymbol] = true; - [readStreamPathOrFdSymbol]; - [readStreamPathFastPathSymbol]; - - _construct(callback) { - if (super._construct) { - super._construct(callback); - } else { - callback(); - } - this.emit("open", this.fd); - this.emit("ready"); - } - - _destroy(err, cb) { - super._destroy(err, cb); - try { - var fd = this.fd; - this[readStreamPathFastPathSymbol] = false; - - if (!fd) { - cb(err); - } else { - this.#fs.close(fd, er => { - cb(er || err); - }); - this.fd = null; - } - } catch (e) { - throw e; - } - } - - close(cb) { - if (typeof cb === "function") Stream.eos(this, cb); - this.destroy(); - } - - push(chunk) { - // Is it even possible for this to be less than 1? - var bytesRead = chunk?.length ?? 0; - if (bytesRead > 0) { - this.bytesRead += bytesRead; - var currPos = this.pos; - // Handle case of going through bytes before pos if bytesRead is less than pos - // If pos is undefined, we are reading through the whole file - // Otherwise we started from somewhere in the middle of the file - if (currPos !== undefined) { - // At this point we still haven't hit our `start` point - // We should discard this chunk and exit - if (this.bytesRead < currPos) { - return true; - } - // At this point, bytes read is greater than our starting position - // If the current position is still the starting position, that means - // this is the first chunk where we care about the bytes read - // and we need to subtract the bytes read from the start position (n) and slice the last n bytes - if (currPos === this.start) { - var n = this.bytesRead - currPos; - chunk = chunk.slice(-n); - var [_, ...rest] = arguments; - this.pos = this.bytesRead; - if (this.end !== undefined && this.bytesRead > this.end) { - chunk = chunk.slice(0, this.end - this.start + 1); - } - return super.push(chunk, ...rest); - } - var end = this.end; - // This is multi-chunk read case where we go passed the end of the what we want to read in the last chunk - if (end !== undefined && this.bytesRead > end) { - chunk = chunk.slice(0, end - currPos + 1); - var [_, ...rest] = arguments; - this.pos = this.bytesRead; - return super.push(chunk, ...rest); - } - this.pos = this.bytesRead; - } - } - - return super.push(...arguments); - } - - // # - - // n should be the the highwatermark passed from Readable.read when calling internal _read (_read is set to this private fn in this class) - #internalRead(n) { - // pos is the current position in the file - // by default, if a start value is provided, pos starts at this.start - var { pos, end, bytesRead, fd, encoding } = this; - - n = - pos !== undefined // if there is a pos, then we are reading from that specific position in the file - ? Math.min(end - pos + 1, n) // takes smaller of length of the rest of the file to read minus the cursor position, or the highwatermark - : Math.min(end - bytesRead + 1, n); // takes the smaller of the length of the rest of the file from the bytes that we have marked read, or the highwatermark - - $debug("n @ fs.ReadStream.#internalRead, after clamp", n); - - // If n is 0 or less, then we read all the file, push null to stream, ending it - if (n <= 0) { - this.push(null); - return; - } - - // At this point, n is the lesser of the length of the rest of the file to read or the highwatermark - // Which means n is the maximum number of bytes to read - - // Basically if we don't know the file size yet, then check it - // Then if n is bigger than fileSize, set n to be fileSize - // This is a fast path to avoid allocating more than the file size for a small file (is this respected by native stream though) - if (this.#fileSize === -1 && bytesRead === 0 && pos === undefined) { - var stat = fstatSync(fd); - this.#fileSize = stat.size; - if (this.#fileSize > 0 && n > this.#fileSize) { - n = this.#fileSize + 1; - } - $debug("fileSize", this.#fileSize); - } - - // At this point, we know the file size and how much we want to read of the file - this[kIoDone] = false; - var res = super._read(n); - $debug("res -- undefined? why?", res); - if ($isPromise(res)) { - var then = res?.then; - if (then && $isCallable(then)) { - res.then( - () => { - this[kIoDone] = true; - // Tell ._destroy() that it's safe to close the fd now. - if (this.destroyed) { - this.emit(kIoDone); - } - }, - er => { - this[kIoDone] = true; - this.#errorOrDestroy(er); - }, - ); - } - } else { - this[kIoDone] = true; - if (this.destroyed) { - this.emit(kIoDone); - this.#errorOrDestroy(new Error("ERR_STREAM_PREMATURE_CLOSE")); - } - } - } - - #errorOrDestroy(err, sync = null) { - var { - _readableState: r = { destroyed: false, autoDestroy: false }, - _writableState: w = { destroyed: false, autoDestroy: false }, - } = this; - - if (w?.destroyed || r?.destroyed) { - return this; - } - if (r?.autoDestroy || w?.autoDestroy) this.destroy(err); - else if (err) { - this.emit("error", err); - } - } - - pause() { - this[readStreamPathFastPathSymbol] = false; - return super.pause(); - } - - resume() { - this[readStreamPathFastPathSymbol] = false; - return super.resume(); - } - - unshift(...args) { - this[readStreamPathFastPathSymbol] = false; - return super.unshift(...args); - } - - pipe(dest, pipeOpts) { - if (this[readStreamPathFastPathSymbol] && (pipeOpts?.end ?? true) && this._readableState?.pipes?.length === 0) { - if (writeStreamPathFastPathSymbol in dest && dest[writeStreamPathFastPathSymbol]) { - if (dest[writeStreamPathFastPathCallSymbol](this, pipeOpts)) { - return this; - } - } - } - - this[readStreamPathFastPathSymbol] = false; - return super.pipe(dest, pipeOpts); - } - }, -); - -function createReadStream(path, options) { - return new ReadStream(path, options); -} - -var defaultWriteStreamOptions = { - fd: null, - start: undefined, - pos: undefined, - encoding: undefined, - flags: "w", - mode: 0o666, - fs: { - write, - close, - open, - openSync, - }, -}; - -var WriteStreamClass = (WriteStream = function WriteStream(path, options = defaultWriteStreamOptions) { - if (!(this instanceof WriteStream)) { - return new WriteStream(path, options); - } - - if (!options) { - throw new TypeError("Expected options to be an object"); - } - - var { - fs = defaultWriteStreamOptions.fs, - start = defaultWriteStreamOptions.start, - flags = defaultWriteStreamOptions.flags, - mode = defaultWriteStreamOptions.mode, - autoClose = true, - emitClose = false, - autoDestroy = autoClose, - encoding = defaultWriteStreamOptions.encoding, - fd = defaultWriteStreamOptions.fd, - pos = defaultWriteStreamOptions.pos, - } = options; - - var tempThis = {}; - if (fd != null) { - if (typeof fd !== "number") { - throw new Error("Expected options.fd to be a number"); - } - tempThis.fd = fd; - tempThis[_writeStreamPathFastPathSymbol] = false; - } else if (typeof path === "string") { - if (path.length === 0) { - throw new TypeError("Expected a non-empty path"); - } - - if (path.startsWith("file:")) { - path = Bun.fileURLToPath(path); - } - - tempThis.path = path; - tempThis.fd = null; - tempThis[_writeStreamPathFastPathSymbol] = - autoClose && - (start === undefined || start === 0) && - fs.write === defaultWriteStreamOptions.fs.write && - fs.close === defaultWriteStreamOptions.fs.close; - } - - if (tempThis.fd == null) { - tempThis.fd = fs.openSync(path, flags, mode); - } - - NativeWritable.$call(this, tempThis.fd, { - ...options, - decodeStrings: false, - autoDestroy, - emitClose, - fd: tempThis, - }); - Object.assign(this, tempThis); - - if (typeof fs?.write !== "function") { - throw new TypeError("Expected fs.write to be a function"); - } - - if (typeof fs?.close !== "function") { - throw new TypeError("Expected fs.close to be a function"); - } - - if (typeof fs?.open !== "function") { - throw new TypeError("Expected fs.open to be a function"); - } - - if (typeof path === "object" && path) { - if (path instanceof URL) { - path = Bun.fileURLToPath(path); - } - } - - if (typeof path !== "string" && typeof fd !== "number") { - throw new TypeError("Expected a path or file descriptor"); - } - - this.start = start; - this[_fs] = fs; - this.flags = flags; - this.mode = mode; - this.bytesWritten = 0; - this[writeStreamSymbol] = true; - this[kIoDone] = false; - // _write = undefined; - // _writev = undefined; - - if (this.start !== undefined) { - this.pos = this.start; - } - - if (encoding !== defaultWriteStreamOptions.encoding) { - this.setDefaultEncoding(encoding); - if (encoding !== "buffer" && encoding !== "utf8" && encoding !== "utf-8" && encoding !== "binary") { - this[_writeStreamPathFastPathSymbol] = false; - } - } - - return this; -}); -const NativeWritable = Stream.NativeWritable; -const WriteStreamPrototype = (WriteStream.prototype = Object.create(NativeWritable.prototype)); - -Object.defineProperties(WriteStreamPrototype, { - autoClose: { - get() { - return this._writableState.autoDestroy; - }, - set(val) { - this._writableState.autoDestroy = val; - }, - }, - pending: { - get() { - return this.fd === null; - }, - }, -}); - -// TODO: what is this for? -WriteStreamPrototype.destroySoon = WriteStreamPrototype.end; - -// noop, node has deprecated this -WriteStreamPrototype.open = function open() {}; - -WriteStreamPrototype[writeStreamPathFastPathCallSymbol] = function WriteStreamPathFastPathCallSymbol( - readStream, - pipeOpts, -) { - if (!this[_writeStreamPathFastPathSymbol]) { - return false; - } - - if (this.fd !== null) { - this[_writeStreamPathFastPathSymbol] = false; - return false; - } - - this[kIoDone] = false; - readStream[kIoDone] = false; - return Bun.write(this[_writeStreamPathFastPathSymbol], readStream[readStreamPathOrFdSymbol]).then( - bytesWritten => { - readStream[kIoDone] = this[kIoDone] = true; - this.bytesWritten += bytesWritten; - readStream.bytesRead += bytesWritten; - this.end(); - readStream.close(); - }, - err => { - readStream[kIoDone] = this[kIoDone] = true; - WriteStream_errorOrDestroy.$call(this, err); - readStream.emit("error", err); - }, - ); -}; - -WriteStreamPrototype.isBunFastPathEnabled = function isBunFastPathEnabled() { - return this[_writeStreamPathFastPathSymbol]; -}; - -WriteStreamPrototype.disableBunFastPath = function disableBunFastPath() { - this[_writeStreamPathFastPathSymbol] = false; -}; - -function WriteStream_handleWrite(er, bytes) { - if (er) { - return WriteStream_errorOrDestroy.$call(this, er); - } - - this.bytesWritten += bytes; -} - -function WriteStream_internalClose(err, cb) { - this[_writeStreamPathFastPathSymbol] = false; - var fd = this.fd; - this[_fs].close(fd, er => { - this.fd = null; - cb(err || er); - }); -} - -WriteStreamPrototype._construct = function _construct(callback) { - if (typeof this.fd === "number") { - callback(); - return; - } - - callback(); - this.emit("open", this.fd); - this.emit("ready"); -}; - -WriteStreamPrototype._destroy = function _destroy(err, cb) { - if (this.fd === null) { - return cb(err); - } - - if (this[kIoDone]) { - this.once(kIoDone, () => WriteStream_internalClose.$call(this, err, cb)); - return; - } - - WriteStream_internalClose.$call(this, err, cb); -}; - -WriteStreamPrototype.close = function close(cb) { - if (cb) { - if (this.closed) { - process.nextTick(cb); - return; - } - this.on("close", cb); - } - - // If we are not autoClosing, we should call - // destroy on 'finish'. - if (!this.autoClose) { - this.on("finish", this.destroy); - } - - // We use end() instead of destroy() because of - // https://github.com/nodejs/node/issues/2006 - this.end(); -}; - -WriteStreamPrototype.write = function write(chunk, encoding, cb) { - encoding ??= this._writableState?.defaultEncoding; - this[_writeStreamPathFastPathSymbol] = false; - if (typeof chunk === "string") { - chunk = Buffer.from(chunk, encoding); - } - - // TODO: Replace this when something like lseek is available - var native = this.pos === undefined; - const callback = native - ? (err, bytes) => { - this[kIoDone] = false; - WriteStream_handleWrite.$call(this, err, bytes); - this.emit(kIoDone); - if (cb) !err ? cb() : cb(err); - } - : () => {}; - this[kIoDone] = true; - if (this._write) { - return this._write(chunk, encoding, callback); - } else { - return NativeWritable.prototype.write.$call(this, chunk, encoding, callback, native); - } -}; - -// Do not inherit -WriteStreamPrototype._write = undefined; -WriteStreamPrototype._writev = undefined; - -WriteStreamPrototype.end = function end(chunk, encoding, cb) { - var native = this.pos === undefined; - return NativeWritable.prototype.end.$call(this, chunk, encoding, cb, native); -}; - -WriteStreamPrototype._destroy = function _destroy(err, cb) { - this.close(err, cb); -}; - -function WriteStream_errorOrDestroy(err) { - var { - _readableState: r = { destroyed: false, autoDestroy: false }, - _writableState: w = { destroyed: false, autoDestroy: false }, - } = this; - - if (w?.destroyed || r?.destroyed) { - return this; - } - if (r?.autoDestroy || w?.autoDestroy) this.destroy(err); - else if (err) { - this.emit("error", err); - } -} - -function createWriteStream(path, options) { - return new WriteStream(path, options); -} - -// NOTE: This was too smart and doesn't actually work -// WriteStream = Object.defineProperty( -// function WriteStream(path, options) { -// var _InternalWriteStream = getLazyWriteStream(); -// return new _InternalWriteStream(path, options); -// }, -// Symbol.hasInstance, -// { value: (instance) => instance[writeStreamSymbol] === true }, -// ); - -// ReadStream = Object.defineProperty( -// function ReadStream(path, options) { -// var _InternalReadStream = getLazyReadStream(); -// return new _InternalReadStream(path, options); -// }, -// Symbol.hasInstance, -// { value: (instance) => instance[readStreamSymbol] === true }, -// ); Object.defineProperties(fs, { createReadStream: { @@ -1163,18 +326,19 @@ Object.defineProperties(fs, { createWriteStream: { value: createWriteStream, }, - ReadStream: { - value: ReadStream, + ReadStream: ReadStreamPropertyDescriptor, + WriteStream: WriteStreamPropertyDescriptor, + FSWatcher: FSWatcherPropertyDescriptor, + watch: { + value: watch, }, - WriteStream: { - value: WriteStream, + StatWatcher: StatWatcherPropertyDescriptor, + watchFile: { + value: watchFile, + }, + unwatchFile: { + value: unwatchFile, }, - // ReadStream: { - // get: () => getLazyReadStream(), - // }, - // WriteStream: { - // get: () => getLazyWriteStream(), - // }, }); // lol @@ -1203,7 +367,9 @@ function cp(src, dest, options, callback) { callback = options; options = undefined; } - promises.cp(src, dest, options).then(() => callback(), callback); + require("node:fs/promises") + .cp(src, dest, options) + .then(() => callback(), callback); } function _toUnixTimestamp(time, name = "time") { @@ -1223,12 +389,9 @@ function _toUnixTimestamp(time, name = "time") { throw new TypeError(`Expected ${name} to be a number or Date`); } -export default { +const defaultObject = { Dirent, - FSWatcher, - ReadStream, Stats, - WriteStream, _toUnixTimestamp, access, accessSync, @@ -1277,7 +440,10 @@ export default { mkdtempSync, open, openSync, - promises, + get promises() { + return require("node:fs/promises"); + }, + set promises(v) {}, read, readFile, readFileSync, @@ -1315,14 +481,13 @@ export default { writeSync, writev, writevSync, - [Symbol.for("::bunternal::")]: { - ReadStreamClass, - WriteStreamClass, - }, - // get WriteStream() { - // return getLazyWriteStream(); - // }, - // get ReadStream() { - // return getLazyReadStream(); - // }, }; + +Object.defineProperties(defaultObject, { + WriteStream: WriteStreamPropertyDescriptor, + ReadStream: ReadStreamPropertyDescriptor, + StatWatcher: StatWatcherPropertyDescriptor, + FSWatcher: FSWatcherPropertyDescriptor, +}); + +export default defaultObject; diff --git a/src/js/node/readline.js b/src/js/node/readline.js index 16f4e19957..809a266092 100644 --- a/src/js/node/readline.js +++ b/src/js/node/readline.js @@ -27,6 +27,7 @@ // ---------------------------------------------------------------------------- const EventEmitter = require("node:events"); const { StringDecoder } = require("node:string_decoder"); +var { kCustomPromisifiedSymbol } = require("internal/symbols"); var isWritable; var { inspect } = Bun; @@ -211,7 +212,6 @@ function stripVTControlCharacters(str) { // Promisify -var kCustomPromisifiedSymbol = SymbolFor("nodejs.util.promisify.custom"); var kCustomPromisifyArgsSymbol = Symbol("customPromisifyArgs"); function promisify(original) { diff --git a/src/js/node/util.js b/src/js/node/util.js index 2002eed252..1c698a08d1 100644 --- a/src/js/node/util.js +++ b/src/js/node/util.js @@ -2,6 +2,7 @@ const types = require("node:util/types"); /** @type {import('node-inspect-extracted')} */ const utl = require("internal/util/inspect"); +var { kCustomPromisifiedSymbol } = require("internal/symbols"); var cjs_exports = {}; @@ -147,7 +148,7 @@ var _extend = function (origin, add) { } return origin; }; -var kCustomPromisifiedSymbol = Symbol.for("nodejs.util.promisify.custom"); +const GlobalPromise = globalThis.Promise; var promisify = function promisify(original) { if (typeof original !== "function") throw new TypeError('The "original" argument must be of type Function'); if (kCustomPromisifiedSymbol && original[kCustomPromisifiedSymbol]) { @@ -164,22 +165,17 @@ var promisify = function promisify(original) { return fn; } function fn() { - var promiseResolve, promiseReject; - var promise = new Promise(function (resolve, reject) { - promiseResolve = resolve; - promiseReject = reject; - }); - var args = []; - for (var i = 0; i < arguments.length; i++) { - args.push(arguments[i]); - } - args.push(function (err, value) { - if (err) { - promiseReject(err); - } else { - promiseResolve(value); - } - }); + var { promise, resolve: promiseResolve, reject: promiseReject } = $newPromiseCapability(GlobalPromise); + var args = [ + ...arguments, + function (err, value) { + if (err) { + promiseReject(err); + } else { + promiseResolve(value); + } + }, + ]; try { original.$apply(this, args); } catch (err) { diff --git a/test/js/node/util/util.test.js b/test/js/node/util/util.test.js index df995955a2..f7c7dbf2ef 100644 --- a/test/js/node/util/util.test.js +++ b/test/js/node/util/util.test.js @@ -36,6 +36,22 @@ const deepStrictEqual = (...args) => { // Tests adapted from https://github.com/nodejs/node/blob/main/test/parallel/test-util.js describe("util", () => { + it("callbackify", done => { + const fn = util.callbackify(async arg => { + await 1; + if (arg !== "foo") { + throw new Error("bar"); + } + + return "baz"; + }); + fn("foo", (err, ret) => { + expect(err).toBeNull(); + expect(ret).toBe("baz"); + done(); + }); + }); + it("toUSVString", () => { const strings = [ // Lone high surrogate