native readable rewrite

This commit is contained in:
chloe caruso
2025-01-19 22:50:45 -08:00
parent ed928969d7
commit be4ad4635a
15 changed files with 577 additions and 628 deletions

View File

@@ -25,11 +25,12 @@
*/
export function getStdioWriteStream(fd) {
$assert(typeof fd === "number", `Expected fd to be a number, got ${typeof fd}`);
const tty = require("node:tty");
let stream;
if (tty.isatty(fd)) {
stream = new tty.WriteStream(null, { fd });
stream = new tty.WriteStream(fd);
// TODO: this is the wrong place for this property.
// but the TTY is technically duplex
// see test-fs-syncwritestream.js
@@ -62,7 +63,9 @@ export function getStdioWriteStream(fd) {
stream._isStdio = true;
stream.fd = fd;
return [stream, stream[require("internal/shared").fileSinkSymbol]];
const underlyingSink = stream[require("internal/fs/streams").kWriteStreamFastPath];
$assert(underlyingSink);
return [stream, underlyingSink];
}
export function getStdinStream(fd) {

View File

@@ -157,4 +157,10 @@ export const bindgen = $zig("bindgen_test.zig", "getBindgenTestFunctions") as {
export const noOpForTesting = $cpp("NoOpForTesting.cpp", "createNoOpForTesting");
export const Dequeue = require("internal/fifo");
export const fs = require('node:fs/promises').$data;
export const fs = require("node:fs/promises").$data;
export const fsStreamInternals = {
writeStreamFastPath(str) {
return str[require("internal/fs/streams").kWriteStreamFastPath];
},
};

View File

@@ -1,78 +1,94 @@
// fs.ReadStream and fs.WriteStream are lazily loaded to avoid importing 'node:stream' until required
const { Readable, Writable } = require("node:stream");
import type { FileSink } from "bun";
const { Readable, Writable, finished } = require("node:stream");
const fs: typeof import("node:fs") = require("node:fs");
const {
read,
write,
fsync,
} = fs;
const {
FileHandle,
kRef,
kUnref,
kFd,
} = (fs.promises as any).$data as {
FileHandle: { new(): FileHandle };
const { read, write, fsync, writev } = fs;
const { FileHandle, kRef, kUnref, kFd } = (fs.promises as any).$data as {
FileHandle: { new (): FileHandle };
readonly kRef: unique symbol;
readonly kUnref: unique symbol;
readonly kFd: unique symbol;
fs: typeof fs;
};
type FileHandle = import('node:fs/promises').FileHandle & {
type FileHandle = import("node:fs/promises").FileHandle & {
on(event: any, listener: any): FileHandle;
};
type FSStream = import("node:fs").ReadStream & import("node:fs").WriteStream & {
fd: number | null;
path: string;
flags: string;
mode: number;
start: number;
end: number;
pos: number | undefined;
bytesRead: number;
flush: boolean;
open: () => void;
};
type FSStream = import("node:fs").ReadStream &
import("node:fs").WriteStream & {
fd: number | null;
path: string;
flags: string;
mode: number;
start: number;
end: number;
pos: number | undefined;
bytesRead: number;
flush: boolean;
open: () => void;
autoClose: boolean;
/**
* true = path must be opened
* sink = FileSink
*/
[kWriteStreamFastPath]?: undefined | true | FileSink;
};
type FD = number;
const { validateInteger, validateInt32, validateFunction } = require("internal/validators");
// Bun supports a fast path for `createReadStream("path.txt")` in `Bun.serve`,
// Bun supports a fast path for `createReadStream("path.txt")` with `.pipe(res)`,
// where the entire stream implementation can be bypassed, effectively making it
// `new Response(Bun.file("path.txt"))`. This makes an idomatic Node.js pattern
// much faster.
// `new Response(Bun.file("path.txt"))`.
// This makes an idomatic Node.js pattern much faster.
const kReadStreamFastPath = Symbol("kReadStreamFastPath");
// Bun supports a fast path for `createWriteStream("path.txt")` where instead of
// using `node:fs`, `Bun.file(...).writer()` is used instead.
const kWriteStreamFastPath = Symbol("kWriteStreamFastPath");
const kFs = Symbol("kFs");
// const readStreamSymbol = Symbol.for("Bun.NodeReadStream");
// const readStreamPathOrFdSymbol = Symbol.for("Bun.NodeReadStreamPathOrFd");
// const writeStreamSymbol = Symbol.for("Bun.NodeWriteStream");
// const writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath");
// const writeStreamPathFastPathCallSymbol = Symbol.for("Bun.NodeWriteStreamFastPathCall");
const kIoDone = Symbol("kIoDone");
const kIsPerformingIO = Symbol("kIsPerformingIO");
const { read: fileHandlePrototypeRead, write: fileHandlePrototypeWrite, fsync: fileHandlePrototypeFsync } = FileHandle.prototype;
const blobToStreamWithOffset = $newZigFunction("blob.zig", "Blob.toStreamWithOffset", 1);
const {
read: fileHandlePrototypeRead,
write: fileHandlePrototypeWrite,
fsync: fileHandlePrototypeFsync,
writev: fileHandlePrototypeWritev,
} = FileHandle.prototype;
const fileHandleStreamFs = (fh: FileHandle) => ({
// try to use the basic fs.read/write/fsync if available, since they are less
// abstractions. however, node.js allows patching the file handle, so this has
// to be checked for.
read: fh.read === fileHandlePrototypeRead ? read : function(fd, buf, offset, length, pos, cb) {
return fh.read(buf,offset,length,pos).then(({ bytesRead, buffer }) => cb(null, bytesRead, buffer), (err) => cb(err, 0, buf));
},
write: fh.write === fileHandlePrototypeWrite ? write : function(fd, buffer, offset, length, position, cb) {
return fh.write(buffer, offset, length, position).then(({ bytesWritten, buffer }) => cb(null, bytesWritten, buffer), (err) => cb(err, 0, buffer));
},
fsync: fh.sync === fileHandlePrototypeFsync ? fsync : function(fd, cb) {
return fh.sync().then(() => cb(), cb);
},
read:
fh.read === fileHandlePrototypeRead
? read
: function (fd, buf, offset, length, pos, cb) {
return fh.read(buf, offset, length, pos).then(
({ bytesRead, buffer }) => cb(null, bytesRead, buffer),
err => cb(err, 0, buf),
);
},
write:
fh.write === fileHandlePrototypeWrite
? write
: function (fd, buffer, offset, length, position, cb) {
return fh.write(buffer, offset, length, position).then(
({ bytesWritten, buffer }) => cb(null, bytesWritten, buffer),
err => cb(err, 0, buffer),
);
},
writev: fh.writev === fileHandlePrototypeWritev ? writev : undefined,
fsync:
fh.sync === fileHandlePrototypeFsync
? fsync
: function (fd, cb) {
return fh.sync().then(() => cb(), cb);
},
close: streamFileHandleClose.bind(fh),
});
function streamFileHandleClose(this: FileHandle, fd: FD, cb: (err?: any) => void) {
$assert(this[kFd] == fd, 'fd mismatch');
$assert(this[kFd] == fd, "fd mismatch");
this[kUnref]();
this.close().then(() => cb(), cb);
}
@@ -86,27 +102,26 @@ function getValidatedPath(p: any) {
function copyObject(source) {
const target = {};
// Node tests for prototype lookups, so { ...source } will not work.
for (const key in source)
target[key] = source[key];
for (const key in source) target[key] = source[key];
return target;
}
function getStreamOptions(options, defaultOptions = {}) {
if (options == null || typeof options === 'function') {
if (options == null || typeof options === "function") {
return defaultOptions;
}
if (typeof options === 'string') {
if (options !== 'buffer' && !Buffer.isEncoding(options)) {
if (typeof options === "string") {
if (options !== "buffer" && !Buffer.isEncoding(options)) {
throw $ERR_INVALID_ARG_VALUE("encoding", options, "is invalid encoding");
}
return { encoding: options };
} else if (typeof options !== 'object') {
throw $ERR_INVALID_ARG_TYPE('options', ['string', 'Object'], options);
} else if (typeof options !== "object") {
throw $ERR_INVALID_ARG_TYPE("options", ["string", "Object"], options);
}
let { encoding, signal = true } = options;
if (encoding && encoding !== 'buffer' && !Buffer.isEncoding(encoding)) {
if (encoding && encoding !== "buffer" && !Buffer.isEncoding(encoding)) {
throw $ERR_INVALID_ARG_VALUE("encoding", encoding, "is invalid encoding");
}
@@ -122,20 +137,13 @@ function ReadStream(this: FSStream, path, options): void {
if (!(this instanceof ReadStream)) {
return new ReadStream(path, options);
}
options = copyObject(getStreamOptions(options));
// Only buffers are supported.
options.decodeStrings = true;
let {
fd,
autoClose,
fs: customFs,
start = 0,
end = Infinity,
encoding,
} = options;
let { fd, autoClose, fs: customFs, start = 0, end = Infinity, encoding } = options;
if (fd == null) {
this[kFs] = customFs || fs;
this.fd = null;
@@ -158,27 +166,25 @@ function ReadStream(this: FSStream, path, options): void {
}
this.fd = fd;
this[kFs] = customFs || fs;
} else if (typeof fd === 'object' && fd instanceof FileHandle) {
} else if (typeof fd === "object" && fd instanceof FileHandle) {
if (options.fs) {
throw $ERR_METHOD_NOT_IMPLEMENTED("fs.FileHandle with custom fs operations");
}
this[kFs] = fileHandleStreamFs(fd);
this.fd = fd[kFd];
fd[kRef]();
fd.on('close', this.close.bind(this));
fd.on("close", this.close.bind(this));
} else {
throw $ERR_INVALID_ARG_TYPE('options.fd', 'number or FileHandle', fd);
throw $ERR_INVALID_ARG_TYPE("options.fd", "number or FileHandle", fd);
}
if (customFs) {
validateFunction(customFs.read, "options.fs.read");
}
$assert(this[kFs], 'fs implementation was not assigned');
$assert(this[kFs], "fs implementation was not assigned");
if((options.autoDestroy = autoClose === undefined
? true
: autoClose) && customFs) {
if ((options.autoDestroy = autoClose === undefined ? true : autoClose) && customFs) {
validateFunction(customFs.close, "options.fs.close");
}
@@ -215,7 +221,7 @@ function ReadStream(this: FSStream, path, options): void {
$toClass(ReadStream, "ReadStream", Readable);
const readStreamPrototype = ReadStream.prototype;
Object.defineProperty(readStreamPrototype, 'autoClose', {
Object.defineProperty(readStreamPrototype, "autoClose", {
get() {
return this._readableState.autoDestroy;
},
@@ -226,22 +232,29 @@ Object.defineProperty(readStreamPrototype, 'autoClose', {
const streamNoop = function open() {
// noop
}
};
function streamConstruct(this: FSStream, callback: (e?: any) => void) {
const { fd } = this;
if (typeof fd === "number") {
callback();
return;
}
const fastPath = this[kWriteStreamFastPath];
if (this.open !== streamNoop) {
if (fastPath) {
// disable fast path in this case
$assert(this[kWriteStreamFastPath] === true, "fastPath is not true");
this[kWriteStreamFastPath] = undefined;
}
// Backwards compat for monkey patching open().
const orgEmit: any = this.emit;
this.emit = function(...args) {
if (args[0] === 'open') {
this.emit = function (...args) {
if (args[0] === "open") {
this.emit = orgEmit;
callback();
orgEmit.$apply(this, args);
} else if (args[0] === 'error') {
} else if (args[0] === "error") {
this.emit = orgEmit;
callback(args[1]);
} else {
@@ -250,14 +263,21 @@ function streamConstruct(this: FSStream, callback: (e?: any) => void) {
} as any;
this.open();
} else {
if (fastPath) {
this.fd = NaN; // TODO: retrieve fd from file sink
callback();
this.emit("open", this.fd);
this.emit("ready");
return;
}
this[kFs].open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
this.emit('open', this.fd);
this.emit('ready');
this.emit("open", this.fd);
this.emit("ready");
}
});
}
@@ -267,10 +287,8 @@ readStreamPrototype.open = streamNoop;
readStreamPrototype._construct = streamConstruct;
readStreamPrototype._read = function(n) {
n = this.pos !== undefined ?
$min(this.end - this.pos + 1, n) :
$min(this.end - this.bytesRead + 1, n);
readStreamPrototype._read = function (n) {
n = this.pos !== undefined ? $min(this.end - this.pos + 1, n) : $min(this.end - this.bytesRead + 1, n);
if (n <= 0) {
this.push(null);
@@ -280,42 +298,41 @@ readStreamPrototype._read = function(n) {
const buf = Buffer.allocUnsafeSlow(n);
this[kIsPerformingIO] = true;
this[kFs]
.read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
this[kIsPerformingIO] = false;
this[kFs].read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
this.emit(kIoDone, er);
return;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
this.emit(kIoDone, er);
return;
}
if (er) {
require("internal/streams/destroy").errorOrDestroy(this, er);
} else if (bytesRead > 0) {
if (this.pos !== undefined) {
this.pos += bytesRead;
}
if (er) {
require('internal/streams/destroy').errorOrDestroy(this, er);
} else if (bytesRead > 0) {
if (this.pos !== undefined) {
this.pos += bytesRead;
}
this.bytesRead += bytesRead;
this.bytesRead += bytesRead;
if (bytesRead !== buf.length) {
// Slow path. Shrink to fit.
// Copy instead of slice so that we don't retain
// large backing buffer for small reads.
const dst = Buffer.allocUnsafeSlow(bytesRead);
buf.copy(dst, 0, 0, bytesRead);
buf = dst;
}
this.push(buf);
} else {
this.push(null);
if (bytesRead !== buf.length) {
// Slow path. Shrink to fit.
// Copy instead of slice so that we don't retain
// large backing buffer for small reads.
const dst = Buffer.allocUnsafeSlow(bytesRead);
buf.copy(dst, 0, 0, bytesRead);
buf = dst;
}
});
this.push(buf);
} else {
this.push(null);
}
});
};
readStreamPrototype._destroy = function(err, cb) {
readStreamPrototype._destroy = function (this: FSStream, err, cb) {
// Usually for async IO it is safe to close a file descriptor
// even when there are pending operations. However, due to platform
// differences file IO is implemented using synchronous operations
@@ -323,18 +340,18 @@ readStreamPrototype._destroy = function(err, cb) {
// to close while used in a pending read or write operation. Wait for
// any pending IO (kIsPerformingIO) to complete (kIoDone).
if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => close(this, err || er, cb));
this.once(kIoDone, er => close(this, err || er, cb));
} else {
close(this, err, cb);
}
};
readStreamPrototype.close = function(cb) {
if (typeof cb === 'function') require('node:stream').finished(this, cb);
readStreamPrototype.close = function (cb) {
if (typeof cb === "function") finished(this, cb);
this.destroy();
};
Object.defineProperty(readStreamPrototype, 'pending', {
Object.defineProperty(readStreamPrototype, "pending", {
get() {
return this.fd == null;
},
@@ -345,7 +362,7 @@ function close(stream, err, cb) {
if (!stream.fd) {
cb(err);
} else if (stream.flush) {
stream[kFs].fsync(stream.fd, (flushErr) => {
stream[kFs].fsync(stream.fd, flushErr => {
closeAfterSync(stream, err || flushErr, cb);
});
} else {
@@ -354,13 +371,20 @@ function close(stream, err, cb) {
}
function closeAfterSync(stream, err, cb) {
stream[kFs].close(stream.fd, (er) => {
stream[kFs].close(stream.fd, er => {
cb(er || err);
});
stream.fd = null;
}
function WriteStream(this: FSStream, path: string | null, options: any): void {
ReadStream.prototype.pipe = function (this: FSStream, dest, pipeOpts) {
// Fast path for streaming files:
// if (this[kReadStreamFastPath]) {
// }
return Readable.prototype.pipe.$call(this, dest, pipeOpts);
}
function WriteStream(this: FSStream, path: string | null, options?: any): void {
if (!(this instanceof WriteStream)) {
return new WriteStream(path, options);
}
@@ -370,13 +394,8 @@ function WriteStream(this: FSStream, path: string | null, options: any): void {
// Only buffers are supported.
options.decodeStrings = true;
let {
fd,
autoClose,
fs: customFs,
start,
flush,
} = options;
let fastPath = true;
let { fd, autoClose, fs: customFs, start, flush } = options;
if (fd == null) {
this[kFs] = customFs || fs;
this.fd = null;
@@ -387,6 +406,9 @@ function WriteStream(this: FSStream, path: string | null, options: any): void {
if (customFs) {
validateFunction(customFs.open, "options.fs.open");
}
if (flags !== undefined || mode !== undefined || customFs) {
fastPath = false;
}
} else if (typeof options.fd === "number") {
// When fd is a raw descriptor, we must keep our fingers crossed
// that the descriptor won't get closed, or worse, replaced with
@@ -399,21 +421,20 @@ function WriteStream(this: FSStream, path: string | null, options: any): void {
}
this.fd = fd;
this[kFs] = customFs || fs;
} else if (typeof fd === 'object' && fd instanceof FileHandle) {
} else if (typeof fd === "object" && fd instanceof FileHandle) {
if (options.fs) {
throw $ERR_METHOD_NOT_IMPLEMENTED("fs.FileHandle with custom fs operations");
}
this[kFs] = fileHandleStreamFs(fd);
this.fd = fd[kFd];
this[kFs] = customFs = fileHandleStreamFs(fd);
fd[kRef]();
fd.on('close', this.close.bind(this));
fd.on("close", this.close.bind(this));
this.fd = fd = fd[kFd];
fastPath = false;
} else {
throw $ERR_INVALID_ARG_TYPE('options.fd', 'number or FileHandle', fd);
throw $ERR_INVALID_ARG_TYPE("options.fd", "number or FileHandle", fd);
}
const autoDestroy = options.autoDestroy = autoClose === undefined
? true
: autoClose;
const autoDestroy = autoClose = (options.autoDestroy = autoClose === undefined ? true : autoClose);
if (customFs) {
const { write, writev, close, fsync } = customFs;
@@ -445,6 +466,15 @@ function WriteStream(this: FSStream, path: string | null, options: any): void {
this.pos = start;
}
// Enable fast path
if (!start && fastPath) {
this[kWriteStreamFastPath] = fd ? Bun.file(fd).writer() : true;
this._write = underscoreWriteFast;
this._writev = undefined;
this.write = writeFast as any;
this.end = endFast as any;
}
Writable.$call(this, options);
if (options.encoding) {
@@ -461,13 +491,13 @@ writeStreamPrototype._construct = streamConstruct;
function writeAll(data, size, pos, cb, retries = 0) {
this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
// No data currently available and operation should be retried later.
if (er?.code === 'EAGAIN') {
if (er?.code === "EAGAIN") {
er = null;
bytesWritten = 0;
}
if (this.destroyed || er) {
return cb(er || $ERR_STREAM_DESTROYED('write'));
return cb(er || $ERR_STREAM_DESTROYED("write"));
}
this.bytesWritten += bytesWritten;
@@ -479,7 +509,7 @@ function writeAll(data, size, pos, cb, retries = 0) {
// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
// cb($ERR_SYSTEM_ERROR('write failed'));
cb(new Error('write failed'));
cb(new Error("write failed"));
} else if (size) {
writeAll.$call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
} else {
@@ -491,13 +521,13 @@ function writeAll(data, size, pos, cb, retries = 0) {
function writevAll(chunks, size, pos, cb, retries = 0) {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
// No data currently available and operation should be retried later.
if (er?.code === 'EAGAIN') {
if (er?.code === "EAGAIN") {
er = null;
bytesWritten = 0;
}
if (this.destroyed || er) {
return cb(er || $ERR_STREAM_DESTROYED('writev'));
return cb(er || $ERR_STREAM_DESTROYED("writev"));
}
this.bytesWritten += bytesWritten;
@@ -509,7 +539,7 @@ function writevAll(chunks, size, pos, cb, retries = 0) {
// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
// cb($ERR_SYSTEM_ERROR('writev failed'));
cb(new Error('writev failed'));
cb(new Error("writev failed"));
} else if (size) {
writevAll.$call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
} else {
@@ -518,9 +548,9 @@ function writevAll(chunks, size, pos, cb, retries = 0) {
});
}
writeStreamPrototype._write = function(data, encoding, cb) {
function _write(data, encoding, cb) {
this[kIsPerformingIO] = true;
writeAll.$call(this, data, data.length, this.pos, (er) => {
writeAll.$call(this, data, data.length, this.pos, er => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
@@ -531,11 +561,75 @@ writeStreamPrototype._write = function(data, encoding, cb) {
cb(er);
});
if (this.pos !== undefined)
this.pos += data.length;
};
if (this.pos !== undefined) this.pos += data.length;
}
writeStreamPrototype._write = _write;
writeStreamPrototype._writev = function(data, cb) {
// Fast-path _write for fs.WriteStream: forwards chunks straight to a Bun
// FileSink instead of going through this[kFs].write.
// this[kWriteStreamFastPath] is one of:
//   - undefined: fast path disabled -> permanently fall back to generic _write
//   - true: sink not created yet -> create it lazily from this.path
//   - FileSink: forward writes to it
function underscoreWriteFast(this: FSStream, data: any, encoding: any, cb: any) {
  let fileSink = this[kWriteStreamFastPath];
  if (!fileSink) {
    // When the fast path is disabled, the write function gets reset.
    this._write = _write;
    return this._write(data, encoding, cb);
  }
  try {
    if (fileSink === true) {
      // Lazily open the sink on first write.
      fileSink = this[kWriteStreamFastPath] = Bun.file(this.path).writer();
    }
    const maybePromise = fileSink.write(data);
    // Completion is always reported asynchronously (next tick for sync writes).
    if (cb) thenIfPromise(maybePromise, cb);
  } catch (e) {
    if (cb) process.nextTick(cb, e);
  }
  return true;
}
// Fast-path public write() for fs.WriteStream. Unlike underscoreWriteFast,
// this replaces Writable.prototype.write entirely, bypassing the writable
// buffering machinery and writing directly to the FileSink.
function writeFast(this: FSStream, data: any, encoding: any, cb: any) {
  // Support the write(chunk, cb) call form: shift the callback over.
  if (encoding != null && typeof encoding === "function") {
    cb = encoding;
    encoding = null;
  }
  let fileSink = this[kWriteStreamFastPath];
  if (!fileSink) {
    // Fast path disabled: restore and delegate to the stock implementation.
    this.write = Writable.prototype.write;
    return this.write(data, encoding, cb);
  } else if (fileSink === true) {
    if (this.open !== streamNoop) {
      // open() was monkey-patched; the fast path cannot honor that, so bail.
      this.write = Writable.prototype.write;
      return this.write(data, encoding, cb);
    }
    // Lazily open the sink on first write.
    fileSink = this[kWriteStreamFastPath] = Bun.file(this.path).writer();
  }
  try {
    const maybePromise = fileSink.write(data);
    if (typeof cb === "function") thenIfPromise(maybePromise, cb);
  } catch (e) {
    if (typeof cb === "function") process.nextTick(cb, e);
    else throw e;
  }
  // Always reports writable; this path exposes no backpressure signal.
  return true;
}
// Fast-path end() for fs.WriteStream: writes any final chunk through the
// FileSink, then delegates stream teardown to Writable.prototype.end.
function endFast(this: FSStream, data: any, encoding: any, cb: any) {
  const fileSink = this[kWriteStreamFastPath];
  if (fileSink && fileSink !== true) {
    if (data) {
      const maybePromise = fileSink.write(data);
      if ($isPromise(maybePromise)) {
        // Async write: flush now, finish the stream once the write settles.
        fileSink.flush();
        maybePromise.then(() => Writable.prototype.end.$call(this, cb));
        return;
      }
    }
    // Data (if any) was accepted synchronously; finish on the next tick.
    // NOTE(review): the sink itself is not end()ed here — it appears to be
    // finalized in _destroy. Confirm end() always reaches _destroy.
    process.nextTick(() => Writable.prototype.end.$call(this, cb));
    return;
  }
  // Fast path inactive or sink never created: stock behavior.
  return Writable.prototype.end.$call(this, data, encoding, cb);
}
writeStreamPrototype._writev = function (data, cb) {
const len = data.length;
const chunks = new Array(len);
let size = 0;
@@ -548,7 +642,7 @@ writeStreamPrototype._writev = function(data, cb) {
}
this[kIsPerformingIO] = true;
writevAll.$call(this, chunks, size, this.pos, (er) => {
writevAll.$call(this, chunks, size, this.pos, er => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
@@ -559,11 +653,16 @@ writeStreamPrototype._writev = function(data, cb) {
cb(er);
});
if (this.pos !== undefined)
this.pos += size;
if (this.pos !== undefined) this.pos += size;
};
writeStreamPrototype._destroy = function(err, cb) {
writeStreamPrototype._destroy = function (err, cb) {
const fastPath: FileSink | true = this[kWriteStreamFastPath];
if (fastPath && fastPath !== true) {
const maybePromise = fastPath.end(err);
thenIfPromise(maybePromise, cb);
return;
}
// Usually for async IO it is safe to close a file descriptor
// even when there are pending operations. However, due to platform
// differences file IO is implemented using synchronous operations
@@ -571,25 +670,25 @@ writeStreamPrototype._destroy = function(err, cb) {
// to close while used in a pending read or write operation. Wait for
// any pending IO (kIsPerformingIO) to complete (kIoDone).
if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => close(this, err || er, cb));
this.once(kIoDone, er => close(this, err || er, cb));
} else {
close(this, err, cb);
}
};
writeStreamPrototype.close = function(cb) {
writeStreamPrototype.close = function (this: FSStream, cb) {
if (cb) {
if (this.closed) {
process.nextTick(cb);
return;
}
this.on('close', cb);
this.on("close", cb);
}
// If we are not autoClosing, we should call
// destroy on 'finish'.
if (!this.autoClose) {
this.on('finish', this.destroy);
this.on("finish", this.destroy);
}
// We use end() instead of destroy() because of
@@ -600,7 +699,7 @@ writeStreamPrototype.close = function(cb) {
// There is no shutdown() for files.
writeStreamPrototype.destroySoon = writeStreamPrototype.end;
Object.defineProperty(writeStreamPrototype, 'autoClose', {
Object.defineProperty(writeStreamPrototype, "autoClose", {
get() {
return this._writableState.autoDestroy;
},
@@ -609,9 +708,36 @@ Object.defineProperty(writeStreamPrototype, 'autoClose', {
},
});
Object.$defineProperty(writeStreamPrototype, 'pending', {
get() { return this.fd === null; },
Object.$defineProperty(writeStreamPrototype, "pending", {
get() {
return this.fd === null;
},
configurable: true,
});
export default { ReadStream, WriteStream, kReadStreamFastPath };
/**
 * Invoke `cb` in Node callback style once `maybePromise` settles.
 * Non-promise values complete successfully on the next tick, so the
 * callback is never invoked synchronously.
 */
function thenIfPromise<T>(maybePromise: Promise<T> | T, cb: any) {
  $assert(typeof cb === "function", "cb is not a function");
  if (!$isPromise(maybePromise)) {
    process.nextTick(cb, null);
    return;
  }
  (maybePromise as Promise<T>).then(() => cb(null), cb);
}
// Wrap an existing Bun FileSink in an fs.WriteStream-compatible object by
// constructing a path-less WriteStream and pre-wiring its fast path to the
// given sink.
function writableFromFileSink(fileSink: any) {
  $assert(typeof fileSink === "object", "fileSink is not an object");
  $assert(typeof fileSink.write === "function", "fileSink.write is not a function");
  $assert(typeof fileSink.end === "function", "fileSink.end is not a function");
  // "" as path still enables the fast path (asserted below); the path itself
  // is never used because the sink is installed before any write occurs.
  const w = new WriteStream("");
  $assert(w[kWriteStreamFastPath] === true, "fast path not enabled");
  w[kWriteStreamFastPath] = fileSink;
  // The sink already owns the destination; the stream has no path of its own.
  w.path = undefined;
  return w;
}
export default {
ReadStream,
WriteStream,
kWriteStreamFastPath,
writableFromFileSink,
};

View File

@@ -45,8 +45,6 @@ function warnNotImplementedOnce(feature: string, issue?: number) {
console.warn(new NotImplementedError(feature, issue));
}
const fileSinkSymbol = Symbol("fileSink");
//
let util: typeof import("node:util");
@@ -103,7 +101,6 @@ export default {
throwNotImplemented,
hideFromStack,
warnNotImplementedOnce,
fileSinkSymbol,
ExceptionWithHostPort,
once,
@@ -111,7 +108,6 @@ export default {
kAutoDestroyed: Symbol("kAutoDestroyed"),
kResistStopPropagation: Symbol("kResistStopPropagation"),
kWeakHandler: Symbol("kWeak"),
kEnsureConstructed: Symbol("kEnsureConstructed"),
kGetNativeReadableProto: Symbol("kGetNativeReadableProto"),
kEmptyObject,
};

View File

@@ -0,0 +1,253 @@
// NativeReadable is an implementation of ReadableStream which contains
// a pointer to a native handle. This is used, for example, to make
// child_process' stderr/out streams go through less hoops.
//
// Normally, Readable.fromWeb will wrap the ReadableStream in JavaScript. In
// Bun, `fromWeb` is able to check if the stream is backed by a native handle,
// to which it will take this path.
const Readable = require("node:stream").Readable;
const transferToNativeReadable = $newCppFunction("ReadableStream.cpp", "jsFunctionTransferToNativeReadableStream", 1);
const { errorOrDestroy } = require("internal/streams/destroy");
const kRefCount = Symbol("refCount");
const kCloseState = Symbol("closeState");
const kConstructed = Symbol("constructed");
const kHighWaterMark = Symbol("highWaterMark");
const kPendingRead = Symbol("pendingRead");
const kHasResized = Symbol("hasResized");
const kRemainingChunk = Symbol("remainingChunk");
const MIN_BUFFER_SIZE = 512;
// Whether the high-water mark may be grown dynamically (disabled by setting
// BUN_DISABLE_DYNAMIC_CHUNK_SIZE=1). Evaluated lazily once, then memoized by
// self-replacement so the env var is read at most one time.
// BUG FIX: the previous comma-expression form returned the replacement
// *function* (always truthy) on the first call instead of the boolean, so
// `!dynamicallyAdjustChunkSize()` was false on first use even when the env
// var disabled dynamic sizing.
let dynamicallyAdjustChunkSize = (): boolean => {
  const enabled = process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1";
  dynamicallyAdjustChunkSize = () => enabled;
  return enabled;
};
// Shape of the Readable produced by constructNativeReadable. Per-stream
// state is kept under symbols so it stays invisible to ordinary consumers.
type NativeReadable = typeof import("node:stream").Readable &
  typeof import("node:stream").Stream & {
    push: (chunk: any) => void;
    $bunNativePtr?: NativePtr;
    [kRefCount]: number; // ref()/unref() balance; drives ptr.updateRef
    [kCloseState]: [boolean]; // boxed flag passed to pull(); native side sets it on EOF
    [kPendingRead]: boolean; // true while an async pull() promise is outstanding
    [kHighWaterMark]: number; // current read-chunk budget (may be resized once)
    [kHasResized]: boolean; // set once the high-water mark was grown/pinned
    [kRemainingChunk]: Buffer; // leftover scratch buffer reused between reads
    debugId: number; // only assigned when $debug is enabled
  };
// Contract of the native handle backing the stream.
interface NativePtr {
  onClose: () => void;
  onDrain: (chunk: any) => void;
  // start() returns a number; values > 1 pin the high-water mark (see read()).
  start: (highWaterMark: number) => number;
  drain: () => any;
  // pull() may return a number (bytes written into `view`), a boolean (EOF),
  // a typed-array view, or a promise of any of those — see handleResult.
  pull: (view: any, closer: any) => any;
  updateRef: (ref: boolean) => void;
  cancel: (error: any) => void;
}
let debugId = 0;
// Build a node:stream Readable driven by the native handle attached to a web
// ReadableStream, then detach the original stream so it cannot be consumed
// through both interfaces.
function constructNativeReadable(readableStream: ReadableStream, options): NativeReadable {
  $assert(typeof readableStream === "object" && readableStream instanceof ReadableStream, "Invalid readable stream");
  const bunNativePtr = (readableStream as any).$bunNativePtr;
  $assert(typeof bunNativePtr === "object", "Invalid native ptr");
  const stream = new Readable(options);
  stream._read = read;
  stream._destroy = destroy;
  if (!!$debug) {
    // Debug builds tag each stream for correlated log lines.
    stream.debugId = ++debugId;
  }
  stream.$bunNativePtr = bunNativePtr;
  stream[kRefCount] = 0;
  stream[kConstructed] = false;
  stream[kPendingRead] = false;
  // If dynamic chunk sizing is disabled, behave as if already resized so
  // adjustHighWaterMark is never triggered.
  stream[kHasResized] = !dynamicallyAdjustChunkSize();
  stream[kCloseState] = [false];
  if (typeof options.highWaterMark === "number") {
    stream[kHighWaterMark] = options.highWaterMark;
  } else {
    stream[kHighWaterMark] = 256 * 1024;
  }
  stream.ref = ref;
  stream.unref = unref;
  if (process.platform === "win32") {
    // Only used by node:tty on Windows
    stream.$start = ensureConstructed;
  }
  // https://github.com/oven-sh/bun/pull/12801
  // https://github.com/oven-sh/bun/issues/9555
  // There may be a ReadableStream.Strong handle to the ReadableStream.
  // We can't update those handles to point to the NativeReadable from JS
  // So we instead mark it as no longer usable, and create a new NativeReadable
  transferToNativeReadable(readableStream);
  $debug(`[${stream.debugId}] constructed!`);
  return stream;
}
// Eagerly start the native handle (idempotent). Exposed as stream.$start for
// node:tty on Windows; read() performs the same lazy start otherwise.
function ensureConstructed(this: NativeReadable, cb: null | (() => void)) {
  $debug(`[${this.debugId}] ensureConstructed`);
  if (this[kConstructed]) return;
  this[kConstructed] = true;
  const ptr = this.$bunNativePtr;
  if (!ptr) return;
  $assert(typeof ptr.start === "function", "NativeReadable.start is not a function");
  ptr.start(this[kHighWaterMark]);
  // NOTE(review): cb is not invoked on the two early returns above — confirm
  // callers tolerate the callback being skipped when already constructed.
  if (cb) cb();
}
// maxToRead can be the highWaterMark (by default) or the remaining amount of the stream to read
// This is so the consumer of the stream can terminate the stream early if they know
// how many bytes they want to read (ie. when reading only part of a file)
// ObjectDefinePrivateProperty(NativeReadable.prototype, "_getRemainingChunk", );
// Return a scratch buffer of at least MIN_BUFFER_SIZE bytes for the next
// pull, reusing stream[kRemainingChunk] whenever it is still large enough.
// `maxToRead` defaults to the stream's current high-water mark.
function getRemainingChunk(stream: NativeReadable, maxToRead?: number) {
  maxToRead ??= stream[kHighWaterMark] as number;
  var chunk = stream[kRemainingChunk];
  // BUG FIX: was `chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE`, which parses as
  // `chunk?.byteLength ?? (0 < MIN_BUFFER_SIZE)`. That reallocated on every
  // call whenever a non-empty chunk existed, and never replaced an empty one.
  // Parenthesized so the byte count is actually compared against the minimum.
  if ((chunk?.byteLength ?? 0) < MIN_BUFFER_SIZE) {
    var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE;
    stream[kRemainingChunk] = chunk = Buffer.alloc(size);
  }
  $debug(`[${stream.debugId}] getRemainingChunk, ${chunk?.byteLength} bytes`);
  return chunk;
}
// Readable._read implementation. On first call it starts the native handle
// and pushes anything it had already buffered (drain). Each call then issues
// a pull into a reusable scratch buffer; the pull may complete synchronously
// or return a promise. kCloseState is a one-element box the native side sets
// to true when the source is exhausted.
function read(this: NativeReadable, maxToRead: number) {
  $debug(`[${this.debugId}] read${this[kPendingRead] ? ", is already pending" : ""}`);
  // An async pull is already in flight; its completion will deliver data.
  if (this[kPendingRead]) {
    return;
  }
  var ptr = this.$bunNativePtr;
  if (!ptr) {
    // Handle was detached: signal EOF.
    $debug(`[${this.debugId}] read, no ptr`);
    this.push(null);
    return;
  }
  if (!this[kConstructed]) {
    // Lazy start: start() returns a number; values > 1 pin the high-water
    // mark to at most that value and stop further dynamic resizing.
    const result = ptr.start(this[kHighWaterMark]);
    $debug(`[${this.debugId}] start, initial hwm: ${result}`);
    if (typeof result === "number" && result > 1) {
      this[kHasResized] = true;
      this[kHighWaterMark] = Math.min(this[kHighWaterMark], result);
    }
    // Push any data the native side buffered before we attached.
    const drainResult = ptr.drain();
    this[kConstructed] = true;
    $debug(`[${this.debugId}] drain result: ${drainResult?.byteLength ?? 'null'}`);
    if ((drainResult?.byteLength ?? 0) > 0) {
      this.push(drainResult);
    }
  }
  const chunk = getRemainingChunk(this, maxToRead);
  var result = ptr.pull(chunk, this[kCloseState]);
  $assert(result !== undefined);
  $debug(`[${this.debugId}] pull ${chunk?.byteLength} bytes, result: ${result instanceof Promise ? '<pending>' : result}, closeState: ${this[kCloseState][0]}`);
  if ($isPromise(result)) {
    // Async pull: mark pending so re-entrant _read calls are ignored until
    // the promise settles and the result is pushed.
    this[kPendingRead] = true;
    return result.then(
      result => {
        $debug(`[${this.debugId}] pull, resolved: ${result}, closeState: ${this[kCloseState][0]}`);
        this[kPendingRead] = false;
        this[kRemainingChunk] = handleResult(this, result, chunk, this[kCloseState][0]);
      },
      reason => {
        // NOTE(review): kPendingRead is not cleared here; errorOrDestroy
        // tears the stream down, so no further reads are expected — confirm.
        errorOrDestroy(this, reason);
      },
    );
  } else {
    this[kRemainingChunk] = handleResult(this, result, chunk, this[kCloseState][0]);
  }
}
/**
 * Interpret the value returned (or resolved) from NativePtr.pull:
 *   - number: bytes written into `chunk`; push that prefix downstream.
 *   - boolean: end of stream; queue the EOF push.
 *   - typed-array view: a standalone chunk to push as-is.
 * Returns the scratch buffer (or its remainder) to stash in kRemainingChunk,
 * or undefined when there is nothing worth keeping.
 */
function handleResult(stream: NativeReadable, result: any, chunk: Buffer, isClosed: boolean) {
  if (typeof result === "number") {
    $debug(`[${stream.debugId}] handleResult(${result})`);
    // A full read while sizing is still dynamic means the buffer is too small.
    if (result >= stream[kHighWaterMark] && !stream[kHasResized] && !isClosed) {
      adjustHighWaterMark(stream);
    }
    return handleNumberResult(stream, result, chunk, isClosed);
  } else if (typeof result === "boolean") {
    $debug(`[${stream.debugId}] handleResult(${result})`, chunk, isClosed);
    process.nextTick(() => {
      stream.push(null);
    });
    // FIX: was `(chunk?.byteLength ?? 0 > 0)`, which parses as
    // `chunk?.byteLength ?? (0 > 0)` and only worked via numeric truthiness.
    // Parenthesized so the byte count is actually compared against zero.
    return (chunk?.byteLength ?? 0) > 0 ? chunk : undefined;
  } else if ($isTypedArrayView(result)) {
    if (result.byteLength >= stream[kHighWaterMark] && !stream[kHasResized] && !isClosed) {
      adjustHighWaterMark(stream);
    }
    return handleArrayBufferViewResult(stream, result, chunk, isClosed);
  } else {
    $assert(false, "Invalid result from pull");
  }
}
/**
 * The native side wrote `result` bytes into `chunk`. Push the filled prefix
 * to the stream and return the unused tail of the buffer for reuse on the
 * next pull (or undefined when the buffer was fully consumed). If the native
 * stream reported closed, schedule EOF for the next tick.
 */
function handleNumberResult(stream: NativeReadable, result: number, chunk: any, isClosed: boolean) {
  let leftover = chunk;
  if (result > 0) {
    const filled = chunk.subarray(0, result);
    leftover = filled.byteLength < chunk.byteLength ? chunk.subarray(result) : undefined;
    if (filled.byteLength > 0) {
      stream.push(filled);
    }
  }
  if (isClosed) {
    process.nextTick(() => stream.push(null));
  }
  return leftover;
}
/**
 * The native side handed us a ready-made buffer instead of filling our
 * scratch buffer. Push it (when non-empty), schedule EOF on the next tick if
 * the native stream reported closed, and return the caller's scratch buffer
 * untouched so it can be reused on the following pull.
 */
function handleArrayBufferViewResult(stream: NativeReadable, result: any, chunk: any, isClosed: boolean) {
  if (result.byteLength > 0) stream.push(result);
  if (isClosed) {
    process.nextTick(() => stream.push(null));
  }
  return chunk;
}
// Double the stream's read-buffer watermark, capped at 2 MiB, and mark the
// stream as resized so we only ever grow it once per lifetime.
function adjustHighWaterMark(stream: NativeReadable) {
  stream[kHasResized] = true;
  stream[kHighWaterMark] = $min(stream[kHighWaterMark] * 2, 1024 * 1024 * 2);
}
// _destroy implementation: cancel the native stream (when still attached)
// with the triggering error, then complete asynchronously to match Node
// stream destroy semantics.
function destroy(this: NativeReadable, error: any, cb: () => void) {
  const native = this.$bunNativePtr;
  if (native) native.cancel(error);
  if (cb) process.nextTick(cb);
}
// Keep the event loop alive for this stream. Only the 0 -> 1 refcount
// transition reaches the native side; nested refs are counted locally.
function ref(this: NativeReadable) {
  const native = this.$bunNativePtr;
  if (native === undefined) return;
  const previous = this[kRefCount]++;
  if (previous === 0) native.updateRef(true);
}
// Release a previously-taken ref. Only the 1 -> 0 refcount transition tells
// the native side to stop keeping the event loop alive.
function unref(this: NativeReadable) {
  const native = this.$bunNativePtr;
  if (native === undefined) return;
  const previous = this[kRefCount]--;
  if (previous === 1) native.updateRef(false);
}
// Single entry point: consumers build wrapped native readables via
// require("internal/streams/native-readable").constructNativeReadable(...).
export default { constructNativeReadable };

View File

@@ -1,248 +0,0 @@
const { kEnsureConstructed } = require("internal/shared");
const { errorOrDestroy } = require("internal/streams/destroy");
const ProcessNextTick = process.nextTick;
var DYNAMICALLY_ADJUST_CHUNK_SIZE = process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1";
const MIN_BUFFER_SIZE = 512;
const refCount = Symbol("refCount");
const constructed = Symbol("constructed");
const remainingChunk = Symbol("remainingChunk");
const highWaterMark = Symbol("highWaterMark");
const pendingRead = Symbol("pendingRead");
const hasResized = Symbol("hasResized");
const _onClose = Symbol("_onClose");
const _onDrain = Symbol("_onDrain");
const _internalConstruct = Symbol("_internalConstruct");
const _getRemainingChunk = Symbol("_getRemainingChunk");
const _adjustHighWaterMark = Symbol("_adjustHighWaterMark");
const _handleResult = Symbol("_handleResult");
const _internalRead = Symbol("_internalRead");
export default function () {
const Readable = require("internal/streams/readable");
var closer = [false];
var handleNumberResult = function (nativeReadable, result, view, isClosed) {
if (result > 0) {
const slice = view.subarray(0, result);
view = slice.byteLength < view.byteLength ? view.subarray(result) : undefined;
if (slice.byteLength > 0) {
nativeReadable.push(slice);
}
}
if (isClosed) {
ProcessNextTick(() => {
nativeReadable.push(null);
});
}
return view;
};
var handleArrayBufferViewResult = function (nativeReadable, result, view, isClosed) {
if (result.byteLength > 0) {
nativeReadable.push(result);
}
if (isClosed) {
ProcessNextTick(() => {
nativeReadable.push(null);
});
}
return view;
};
function NativeReadable(ptr, options) {
if (!(this instanceof NativeReadable)) return Reflect.construct(NativeReadable, [ptr, options]);
this[refCount] = 0;
this[constructed] = false;
this[remainingChunk] = undefined;
this[pendingRead] = false;
this[hasResized] = !DYNAMICALLY_ADJUST_CHUNK_SIZE;
options ??= {};
Readable.$call(this, options);
if (typeof options.highWaterMark === "number") {
this[highWaterMark] = options.highWaterMark;
} else {
this[highWaterMark] = 256 * 1024;
}
this.$bunNativePtr = ptr;
this[constructed] = false;
this[remainingChunk] = undefined;
this[pendingRead] = false;
if (ptr) {
ptr.onClose = this[_onClose].bind(this);
ptr.onDrain = this[_onDrain].bind(this);
}
}
$toClass(NativeReadable, "NativeReadable", Readable);
NativeReadable.prototype[_onClose] = function () {
this.push(null);
};
NativeReadable.prototype[_onDrain] = function (chunk) {
this.push(chunk);
};
// maxToRead is by default the highWaterMark passed from the Readable.read call to this fn
// However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read
// which may be significantly less than the actual highWaterMark
NativeReadable.prototype._read = function _read(maxToRead) {
$debug("NativeReadable._read", this.__id);
if (this[pendingRead]) {
$debug("pendingRead is true", this.__id);
return;
}
var ptr = this.$bunNativePtr;
$debug("ptr @ NativeReadable._read", ptr, this.__id);
if (!ptr) {
this.push(null);
return;
}
if (!this[constructed]) {
$debug("NativeReadable not constructed yet", this.__id);
this[_internalConstruct](ptr);
}
return this[_internalRead](this[_getRemainingChunk](maxToRead), ptr);
};
NativeReadable.prototype[_internalConstruct] = function (ptr) {
$assert(this[constructed] === false);
this[constructed] = true;
const result = ptr.start(this[highWaterMark]);
$debug("NativeReadable internal `start` result", result, this.__id);
if (typeof result === "number" && result > 1) {
this[hasResized] = true;
$debug("NativeReadable resized", this.__id);
this[highWaterMark] = Math.min(this[highWaterMark], result);
}
const drainResult = ptr.drain();
$debug("NativeReadable drain result", drainResult, this.__id);
if ((drainResult?.byteLength ?? 0) > 0) {
this.push(drainResult);
}
};
// maxToRead can be the highWaterMark (by default) or the remaining amount of the stream to read
// This is so the consumer of the stream can terminate the stream early if they know
// how many bytes they want to read (ie. when reading only part of a file)
// ObjectDefinePrivateProperty(NativeReadable.prototype, "_getRemainingChunk", );
NativeReadable.prototype[_getRemainingChunk] = function (maxToRead) {
maxToRead ??= this[highWaterMark];
var chunk = this[remainingChunk];
$debug("chunk @ #getRemainingChunk", chunk, this.__id);
if (chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE) {
var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE;
this[remainingChunk] = chunk = new Buffer(size);
}
return chunk;
};
// ObjectDefinePrivateProperty(NativeReadable.prototype, "_adjustHighWaterMark", );
NativeReadable.prototype[_adjustHighWaterMark] = function () {
this[highWaterMark] = Math.min(this[highWaterMark] * 2, 1024 * 1024 * 2);
this[hasResized] = true;
$debug("Resized", this.__id);
};
// ObjectDefinePrivateProperty(NativeReadable.prototype, "_handleResult", );
NativeReadable.prototype[_handleResult] = function (result, view, isClosed) {
$debug("result, isClosed @ #handleResult", result, isClosed, this.__id);
if (typeof result === "number") {
if (result >= this[highWaterMark] && !this[hasResized] && !isClosed) {
this[_adjustHighWaterMark]();
}
return handleNumberResult(this, result, view, isClosed);
} else if (typeof result === "boolean") {
ProcessNextTick(() => {
this.push(null);
});
return (view?.byteLength ?? 0 > 0) ? view : undefined;
} else if ($isTypedArrayView(result)) {
if (result.byteLength >= this[highWaterMark] && !this[hasResized] && !isClosed) {
this[_adjustHighWaterMark]();
}
return handleArrayBufferViewResult(this, result, view, isClosed);
} else {
$debug("Unknown result type", result, this.__id);
throw new Error("Invalid result from pull");
}
};
NativeReadable.prototype[_internalRead] = function (view, ptr) {
$debug("#internalRead()", this.__id);
closer[0] = false;
var result = ptr.pull(view, closer);
if ($isPromise(result)) {
this[pendingRead] = true;
return result.then(
result => {
this[pendingRead] = false;
$debug("pending no longerrrrrrrr (result returned from pull)", this.__id);
const isClosed = closer[0];
this[remainingChunk] = this[_handleResult](result, view, isClosed);
},
reason => {
$debug("error from pull", reason, this.__id);
errorOrDestroy(this, reason);
},
);
} else {
this[remainingChunk] = this[_handleResult](result, view, closer[0]);
}
};
NativeReadable.prototype._destroy = function (error, callback) {
var ptr = this.$bunNativePtr;
if (!ptr) {
callback(error);
return;
}
this.$bunNativePtr = undefined;
ptr.updateRef(false);
$debug("NativeReadable destroyed", this.__id);
ptr.cancel(error);
callback(error);
};
NativeReadable.prototype.ref = function () {
var ptr = this.$bunNativePtr;
if (ptr === undefined) return;
if (this[refCount]++ === 0) {
ptr.updateRef(true);
}
};
NativeReadable.prototype.unref = function () {
var ptr = this.$bunNativePtr;
if (ptr === undefined) return;
if (this[refCount]-- === 1) {
ptr.updateRef(false);
}
};
NativeReadable.prototype[kEnsureConstructed] = function () {
if (this[constructed]) return;
this[_internalConstruct](this.$bunNativePtr);
};
return NativeReadable;
}

View File

@@ -1,135 +0,0 @@
const Writable = require("internal/streams/writable");
const ProcessNextTick = process.nextTick;
const _native = Symbol("native");
const _pathOrFdOrSink = Symbol("pathOrFdOrSink");
const { fileSinkSymbol: _fileSink } = require("internal/shared");
function NativeWritable(pathOrFdOrSink, options = {}) {
Writable.$call(this, options);
this[_native] = true;
this._construct = NativeWritable_internalConstruct;
this._final = NativeWritable_internalFinal;
this._write = NativeWritablePrototypeWrite;
this[_pathOrFdOrSink] = pathOrFdOrSink;
}
$toClass(NativeWritable, "NativeWritable", Writable);
// These are confusingly two different fns for construct which initially were the same thing because
// `_construct` is part of the lifecycle of Writable and is not called lazily,
// so we need to separate our _construct for Writable state and actual construction of the write stream
function NativeWritable_internalConstruct(cb) {
this._writableState.constructed = true;
this.constructed = true;
if (typeof cb === "function") ProcessNextTick(cb);
ProcessNextTick(() => {
this.emit("open", this.fd);
this.emit("ready");
});
}
function NativeWritable_internalFinal(cb) {
var sink = this[_fileSink];
if (sink) {
const end = sink.end(true);
if ($isPromise(end) && cb) {
end.then(() => {
if (cb) cb();
}, cb);
}
}
if (cb) cb();
}
function NativeWritablePrototypeWrite(chunk, encoding, cb) {
var fileSink = this[_fileSink] ?? NativeWritable_lazyConstruct(this);
var result = fileSink.write(chunk);
if (typeof encoding === "function") {
cb = encoding;
}
if ($isPromise(result)) {
// var writePromises = this.#writePromises;
// var i = writePromises.length;
// writePromises[i] = result;
result
.then(result => {
this.emit("drain");
if (cb) {
cb(null, result);
}
})
.catch(
cb
? err => {
cb(err);
}
: err => {
this.emit("error", err);
},
);
return false;
}
// TODO: Should we just have a calculation based on encoding and length of chunk?
if (cb) cb(null, chunk.byteLength);
return true;
}
function NativeWritable_lazyConstruct(stream) {
// TODO: Turn this check into check for instanceof FileSink
var sink = stream[_pathOrFdOrSink];
if (typeof sink === "object") {
if (typeof sink.write === "function") {
return (stream[_fileSink] = sink);
} else {
throw new Error("Invalid FileSink");
}
} else {
return (stream[_fileSink] = Bun.file(sink).writer());
}
}
const WritablePrototypeEnd = Writable.prototype.end;
NativeWritable.prototype.end = function end(chunk, encoding, cb, native) {
return WritablePrototypeEnd.$call(this, chunk, encoding, cb, native ?? this[_native]);
};
NativeWritable.prototype._destroy = function (error, cb) {
const w = this._writableState;
const r = this._readableState;
if (w) {
w.destroyed = true;
w.closeEmitted = true;
}
if (r) {
r.destroyed = true;
r.closeEmitted = true;
}
if (typeof cb === "function") cb(error);
if (w?.closeEmitted || r?.closeEmitted) {
this.emit("close");
}
};
NativeWritable.prototype.ref = function ref() {
const sink = (this[_fileSink] ||= NativeWritable_lazyConstruct(this));
sink.ref();
return this;
};
NativeWritable.prototype.unref = function unref() {
const sink = (this[_fileSink] ||= NativeWritable_lazyConstruct(this));
sink.unref();
return this;
};
export default NativeWritable;

View File

@@ -73,7 +73,6 @@ const MapPrototypeValues = uncurryThis(Map.prototype.values);
const MapPrototypeKeys = uncurryThis(Map.prototype.keys);
const MathFloor = Math.floor;
const MathMax = Math.max;
const MathMin = Math.min;
const MathRound = Math.round;
const MathSqrt = Math.sqrt;
const MathTrunc = Math.trunc;
@@ -1623,7 +1622,7 @@ function identicalSequenceRange(a, b) {
const rest = b.length - pos;
if (rest > 3) {
let len = 1;
const maxLen = MathMin(a.length - i, rest);
const maxLen = $min(a.length - i, rest);
// Count the number of consecutive entries.
while (maxLen > len && a[i + len] === b[pos + len]) {
len++;
@@ -1873,7 +1872,7 @@ function groupArrayElements(ctx, output, value) {
const averageBias = MathSqrt(actualMax - totalLength / output.length);
const biasedMax = MathMax(actualMax - 3 - averageBias, 1);
// Dynamically check how many columns seem possible.
const columns = MathMin(
const columns = $min(
// Ideally a square should be drawn. We expect a character to be about 2.5
// times as high as wide. This is the area formula to calculate a square
// which contains n rectangles of size `actualMax * approxCharHeights`.
@@ -1914,7 +1913,7 @@ function groupArrayElements(ctx, output, value) {
// Each iteration creates a single line of grouped entries.
for (let i = 0; i < outputLength; i += columns) {
// The last lines may contain less entries than columns.
const max = MathMin(i + columns, outputLength);
const max = $min(i + columns, outputLength);
let str = "";
let j = i;
for (; j < max - 1; j++) {
@@ -2114,7 +2113,7 @@ function formatArrayBuffer(ctx, value) {
return [ctx.stylize("(detached)", "special")];
}
let str = StringPrototypeTrim(
RegExpPrototypeSymbolReplace(/(.{2})/g, hexSlice(buffer, 0, MathMin(ctx.maxArrayLength, buffer.length)), "$1 "),
RegExpPrototypeSymbolReplace(/(.{2})/g, hexSlice(buffer, 0, $min(ctx.maxArrayLength, buffer.length)), "$1 "),
);
const remaining = buffer.length - ctx.maxArrayLength;
if (remaining > 0) str += ` ... ${remaining} more byte${remaining > 1 ? "s" : ""}`;
@@ -2123,7 +2122,7 @@ function formatArrayBuffer(ctx, value) {
function formatArray(ctx, value, recurseTimes) {
const valLen = value.length;
const len = MathMin(MathMax(0, ctx.maxArrayLength), valLen);
const len = $min(MathMax(0, ctx.maxArrayLength), valLen);
const remaining = valLen - len;
const output = [];
@@ -2144,9 +2143,9 @@ function formatTypedArray(value, length, ctx, ignored, recurseTimes) {
if (Buffer.isBuffer(value)) {
BufferModule ??= require("node:buffer");
const INSPECT_MAX_BYTES = $requireMap.$get("buffer")?.exports.INSPECT_MAX_BYTES ?? BufferModule.INSPECT_MAX_BYTES;
ctx.maxArrayLength = MathMin(ctx.maxArrayLength, INSPECT_MAX_BYTES);
ctx.maxArrayLength = $min(ctx.maxArrayLength, INSPECT_MAX_BYTES);
}
const maxLength = MathMin(MathMax(0, ctx.maxArrayLength), length);
const maxLength = $min(MathMax(0, ctx.maxArrayLength), length);
const remaining = value.length - maxLength;
const output = new Array(maxLength);
const elementFormatter = value.length > 0 && typeof value[0] === "number" ? formatNumber : formatBigInt;
@@ -2171,7 +2170,7 @@ function formatTypedArray(value, length, ctx, ignored, recurseTimes) {
function formatSet(value, ctx, ignored, recurseTimes) {
const length = value.size;
const maxLength = MathMin(MathMax(0, ctx.maxArrayLength), length);
const maxLength = $min(MathMax(0, ctx.maxArrayLength), length);
const remaining = length - maxLength;
const output = [];
ctx.indentationLvl += 2;
@@ -2190,7 +2189,7 @@ function formatSet(value, ctx, ignored, recurseTimes) {
function formatMap(value, ctx, ignored, recurseTimes) {
const length = value.size;
const maxLength = MathMin(MathMax(0, ctx.maxArrayLength), length);
const maxLength = $min(MathMax(0, ctx.maxArrayLength), length);
const remaining = length - maxLength;
const output = [];
ctx.indentationLvl += 2;
@@ -2209,7 +2208,7 @@ function formatMap(value, ctx, ignored, recurseTimes) {
function formatSetIterInner(ctx, recurseTimes, entries, state) {
const maxArrayLength = MathMax(ctx.maxArrayLength, 0);
const maxLength = MathMin(maxArrayLength, entries.length);
const maxLength = $min(maxArrayLength, entries.length);
const output = new Array(maxLength);
ctx.indentationLvl += 2;
for (let i = 0; i < maxLength; i++) {
@@ -2234,7 +2233,7 @@ function formatMapIterInner(ctx, recurseTimes, entries, state) {
// Entries exist as [key1, val1, key2, val2, ...]
const len = entries.length / 2;
const remaining = len - maxArrayLength;
const maxLength = MathMin(maxArrayLength, len);
const maxLength = $min(maxArrayLength, len);
const output = new Array(maxLength);
let i = 0;
ctx.indentationLvl += 2;

View File

@@ -28,31 +28,14 @@ const PromiseResolve = Promise.resolve.bind(Promise);
const PromisePrototypeThen = Promise.prototype.then;
const SafePromisePrototypeFinally = Promise.prototype.finally;
const constants_zlib = process.binding("constants").zlib;
const constants_zlib = $processBindingConstants.zlib;
//
//
const transferToNativeReadable = $newCppFunction("ReadableStream.cpp", "jsFunctionTransferToNativeReadableStream", 1);
function getNativeReadableStream(Readable, stream, options) {
function tryTransferToNativeReadable(stream, options) {
const ptr = stream.$bunNativePtr;
if (!ptr || ptr === -1) {
$debug("no native readable stream");
return undefined;
}
const type = stream.$bunNativeType;
$assert(typeof type === "number", "Invalid native type");
$assert(typeof ptr === "object", "Invalid native ptr");
const NativeReadable = require("node:stream")[kGetNativeReadableProto](type);
// https://github.com/oven-sh/bun/pull/12801
// https://github.com/oven-sh/bun/issues/9555
// There may be a ReadableStream.Strong handle to the ReadableStream.
// We can't update those handles to point to the NativeReadable from JS
// So we instead mark it as no longer usable, and create a new NativeReadable
transferToNativeReadable(stream);
return new NativeReadable(ptr, options);
return require("internal/streams/native-readable").constructNativeReadable(stream, options);
}
class ReadableFromWeb extends Readable {
@@ -177,8 +160,6 @@ class ReadableFromWeb extends Readable {
}
}
}
//
//
const encoder = new TextEncoder();
@@ -542,7 +523,7 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj
throw $ERR_INVALID_ARG_VALUE("options.encoding", encoding);
validateBoolean(objectMode, "options.objectMode");
const nativeStream = getNativeReadableStream(Readable, readableStream, options);
const nativeStream = tryTransferToNativeReadable(readableStream, options);
return (
nativeStream ||

View File

@@ -53,9 +53,6 @@ if ($debug) {
};
}
var NativeWritable;
var ReadableFromWeb;
// Sections:
// 1. Exported child_process functions
// 2. child_process helpers
@@ -1123,21 +1120,16 @@ class ChildProcess extends EventEmitter {
}
}
NativeWritable ||= StreamModule.NativeWritable;
ReadableFromWeb ||= StreamModule.Readable.fromWeb;
const io = this.#stdioOptions[i];
switch (i) {
case 0: {
switch (io) {
case "pipe": {
const stdin = this.#handle.stdin;
if (!stdin)
// This can happen if the process was already killed.
return new ShimmedStdin();
return new NativeWritable(stdin);
return require("internal/fs/streams").writableFromFileSink(stdin);
}
case "inherit":
return process.stdin || null;
@@ -1151,13 +1143,12 @@ class ChildProcess extends EventEmitter {
case 1: {
switch (io) {
case "pipe": {
const value = this.#handle[fdToStdioName(i)];
const value = this.#handle[fdToStdioName(i) as any as number];
// This can happen if the process was already killed.
if (!value)
// This can happen if the process was already killed.
return new ShimmedStdioOutStream();
const pipe = ReadableFromWeb(value, { encoding });
const pipe = require('internal/streams/native-readable').constructNativeReadable(value, { encoding });
this.#closesNeeded++;
pipe.once("close", () => this.#maybeClose());
if (autoResume) pipe.resume();

View File

@@ -1,35 +1,9 @@
// Hardcoded module "node:stream" / "readable-stream"
const { kEnsureConstructed, kGetNativeReadableProto } = require("internal/shared");
const EE = require("node:events").EventEmitter;
const exports = require("internal/stream");
$debug("node:stream loaded");
var nativeReadableStreamPrototypes = {
0: undefined,
1: undefined,
2: undefined,
3: undefined,
4: undefined,
5: undefined,
};
function getNativeReadableStreamPrototype(nativeType, Readable) {
return (nativeReadableStreamPrototypes[nativeType] ??= require("internal/streams/nativereadable")());
}
/** --- Bun native stream wrapper --- */
exports[kGetNativeReadableProto] = getNativeReadableStreamPrototype;
exports.NativeWritable = require("internal/streams/nativewritable");
const {
newStreamReadableFromReadableStream: _ReadableFromWeb,
_ReadableFromWeb: _ReadableFromWebForUndici,
} = require("internal/webstreams_adapters");
exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUndici, kEnsureConstructed };
exports.eos = require("internal/streams/end-of-stream");
exports.EventEmitter = EE;

View File

@@ -1,3 +1,5 @@
// Note: please keep this module's loading constraints light, as some users
// import it just to call `isatty`. In that case, `node:stream` is not needed.
const {
setRawMode: ttySetMode,
isatty,
@@ -6,7 +8,7 @@ const {
const { validateInteger } = require("internal/validators");
function ReadStream(fd) {
function ReadStream(fd): void {
if (!(this instanceof ReadStream)) {
return new ReadStream(fd);
}
@@ -46,7 +48,8 @@ Object.defineProperty(ReadStream, "prototype", {
// If you call setRawMode before you call on('data'), the stream will
// not be constructed, leading to EBADF
this[require("node:stream")[Symbol.for("::bunternal::")].kEnsureConstructed]();
// This corresponds to the `ensureConstructed` function in `native-readable.ts`
this.$start();
const err = handle.setRawMode(flag);
if (err) {
@@ -74,11 +77,10 @@ Object.defineProperty(ReadStream, "prototype", {
configurable: true,
});
function WriteStream(fd) {
function WriteStream(fd): void {
if (!(this instanceof WriteStream)) return new WriteStream(fd);
const stream = require("node:fs").WriteStream.$call(this, "", { fd });
const stream = require("node:fs").WriteStream.$call(this, null, { fd });
stream.columns = undefined;
stream.rows = undefined;
stream.isTTY = isatty(stream.fd);

View File

@@ -447,8 +447,6 @@ describe("child_process double pipe", () => {
}),
);
// TODO(Derrick): We don't implement the full API for this yet,
// So stdin has no 'drain' event.
// TODO(@jasnell): This does not appear to ever be
// emitted. It's not clear if it is necessary.
fakeGrep.stdin.on("drain", () => {

View File

@@ -427,9 +427,11 @@ it("it accepts stdio passthrough", async () => {
stdio: ["ignore", "pipe", "pipe"],
env: bunEnv,
}));
console.log(package_dir);
const [err, out, exitCode] = await Promise.all([new Response(stderr).text(), new Response(stdout).text(), exited]);
try {
// This command outputs in either `["hello", "world"]` or `["world", "hello"]` order.
console.log({err, out});
expect([err.split("\n")[0], ...err.split("\n").slice(1, -1).sort(), err.split("\n").at(-1)]).toEqual([
"$ run-p echo-hello echo-world",
"$ echo hello",

View File

@@ -2,6 +2,7 @@
const { expect, test } = require("bun:test");
const fs = require("fs");
const { tmpdir, devNull } = require("os");
const { fsStreamInternals } = require('bun:internal-for-testing');
function getMaxFd() {
const dev_null = fs.openSync(devNull, "r");