one more passing test ...

This commit is contained in:
snwy
2024-10-31 17:57:01 -07:00
parent c168e26987
commit 7798af8524
7 changed files with 167 additions and 102 deletions

View File

@@ -1442,7 +1442,7 @@ export function readableStreamDefaultControllerCallPullIfNeeded(controller) {
$alwaysInline = true;
export function isReadableStreamLocked(stream) {
$assert($isReadableStream(stream));
// $assert($isReadableStream(stream));
return (
// Case 1. Is there a reader actively using it?
!!$getByIdDirectPrivate(stream, "reader") ||

View File

@@ -157,6 +157,7 @@ export default {
ObjectSetPrototypeOf: Object.setPrototypeOf,
Promise,
PromiseResolve: Promise.resolve,
PromiseReject: Promise.reject,
PromisePrototypeThen: uncurryThis(Promise.prototype.then),
ReflectOwnKeys: Reflect.ownKeys,
RegExp,
@@ -273,4 +274,5 @@ export default {
BigUint64Array,
BigInt64Array,
uncurryThis,
FunctionPrototypeSymbolHasInstance: Function.prototype[Symbol.hasInstance].bind(Function.prototype),
};

View File

@@ -26,7 +26,7 @@
"use strict";
const primordials = require("internal/primordials");
const { ObjectDefineProperties, ObjectGetOwnPropertyDescriptor, ObjectKeys, ObjectSetPrototypeOf } = primordials;
const { ObjectGetOwnPropertyDescriptor, ObjectKeys, ObjectSetPrototypeOf } = primordials;
const Stream = require("internal/streams/legacy").Stream;
const Readable = require("internal/streams/readable");
@@ -37,22 +37,6 @@ const { addAbortSignal } = require("internal/streams/add-abort-signal");
const destroyImpl = require("internal/streams/destroy");
const { kOnConstructed } = require("internal/streams/utils");
Duplex.prototype = {};
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
ObjectSetPrototypeOf(Duplex, Readable);
{
const keys = ObjectKeys(Writable.prototype);
// Allow the keys array to be GC'ed.
for (let i = 0; i < keys.length; i++) {
const method = keys[i];
Duplex.prototype[method] ||= Writable.prototype[method];
}
}
// Use the `destroy` method of `Writable`.
Duplex.prototype.destroy = Writable.prototype.destroy;
function Duplex(options) {
if (!(this instanceof Duplex)) return new Duplex(options);
@@ -120,19 +104,39 @@ function Duplex(options) {
}
}
ObjectDefineProperties(Duplex.prototype, {
writable: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writable") },
Duplex.prototype = Object.create(Readable.prototype);
Object.defineProperty(Duplex, Symbol.toStringTag, {
value: "Duplex",
configurable: true,
});
const WritableProtoype = Writable.prototype;
ObjectSetPrototypeOf(Duplex, Readable);
{
const keys = ObjectKeys(WritableProtoype);
// Allow the keys array to be GC'ed.
for (let i = 0; i < keys.length; i++) {
const method = keys[i];
Duplex.prototype[method] ||= WritableProtoype[method];
}
}
// Use the `destroy` method of `Writable`.
Duplex.prototype.destroy = WritableProtoype.destroy;
Object.defineProperties(Duplex.prototype, {
writable: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writable") },
writableHighWaterMark: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableHighWaterMark"),
...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableHighWaterMark"),
},
writableObjectMode: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableObjectMode") },
writableBuffer: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableBuffer") },
writableLength: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableLength") },
writableFinished: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableFinished") },
writableCorked: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableCorked") },
writableEnded: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableEnded") },
writableNeedDrain: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableNeedDrain") },
writableObjectMode: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableObjectMode") },
writableBuffer: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableBuffer") },
writableLength: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableLength") },
writableFinished: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableFinished") },
writableCorked: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableCorked") },
writableEnded: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableEnded") },
writableNeedDrain: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableNeedDrain") },
destroyed: {
__proto__: null,

View File

@@ -9,8 +9,9 @@ const EE = require("node:events");
function Stream(opts) {
EE.$call(this, opts);
}
Stream.prototype = {};
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
Stream.prototype = {
__proto__: EE.prototype,
};
ObjectSetPrototypeOf(Stream, EE);
Stream.prototype.pipe = function (dest, options) {

View File

@@ -47,10 +47,6 @@ const { Buffer } = require("node:buffer");
const { addAbortSignal } = require("./add-abort-signal");
const eos = require("./end-of-stream");
let debug = require("../../node/util").debuglog("stream", fn => {
debug = fn;
});
const destroyImpl = require("./destroy");
const { getHighWaterMark, getDefaultHighWaterMark } = require("./state");
const {
@@ -376,7 +372,7 @@ Readable.prototype[SymbolAsyncDispose] = function () {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function (chunk, encoding) {
debug("push", chunk);
$debug("push", chunk);
const state = this._readableState;
return (state[kState] & kObjectMode) === 0
@@ -386,7 +382,7 @@ Readable.prototype.push = function (chunk, encoding) {
// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function (chunk, encoding) {
debug("unshift", chunk);
$debug("unshift", chunk);
const state = this._readableState;
return (state[kState] & kObjectMode) === 0
? readableAddChunkUnshiftByteMode(this, state, chunk, encoding)
@@ -623,7 +619,7 @@ function howMuchToRead(n, state) {
// You can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
debug("read", n);
$debug("read", n);
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed
if (n === undefined) {
n = NaN;
@@ -647,7 +643,7 @@ Readable.prototype.read = function (n) {
((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) ||
(state[kState] & kEnded) !== 0)
) {
debug("read: emitReadable");
$debug("read: emitReadable");
if (state.length === 0 && (state[kState] & kEnded) !== 0) endReadable(this);
else emitReadable(this);
return null;
@@ -685,12 +681,12 @@ Readable.prototype.read = function (n) {
// if we need a readable event, then we need to do some reading.
let doRead = (state[kState] & kNeedReadable) !== 0;
debug("need readable", doRead);
$debug("need readable", doRead);
// If we currently have less than the highWaterMark, then also read some.
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug("length less than watermark", doRead);
$debug("length less than watermark", doRead);
}
// However, if we've ended, then there's no point, if we're already
@@ -698,9 +694,9 @@ Readable.prototype.read = function (n) {
// and if we're destroyed or errored, then it's not allowed,
if ((state[kState] & (kReading | kEnded | kDestroyed | kErrored | kConstructed)) !== kConstructed) {
doRead = false;
debug("reading, ended or constructing", doRead);
$debug("reading, ended or constructing", doRead);
} else if (doRead) {
debug("do read");
$debug("do read");
state[kState] |= kReading | kSync;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0) state[kState] |= kNeedReadable;
@@ -752,7 +748,7 @@ Readable.prototype.read = function (n) {
};
function onEofChunk(stream, state) {
debug("onEofChunk");
$debug("onEofChunk");
if ((state[kState] & kEnded) !== 0) return;
const decoder = (state[kState] & kDecoder) !== 0 ? state[kDecoderValue] : null;
if (decoder) {
@@ -784,10 +780,10 @@ function onEofChunk(stream, state) {
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
const state = stream._readableState;
debug("emitReadable");
$debug("emitReadable");
state[kState] &= ~kNeedReadable;
if ((state[kState] & kEmittedReadable) === 0) {
debug("emitReadable", (state[kState] & kFlowing) !== 0);
$debug("emitReadable", (state[kState] & kFlowing) !== 0);
state[kState] |= kEmittedReadable;
process.nextTick(emitReadable_, stream);
}
@@ -795,7 +791,7 @@ function emitReadable(stream) {
function emitReadable_(stream) {
const state = stream._readableState;
debug("emitReadable_");
$debug("emitReadable_");
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) {
stream.emit("readable");
state[kState] &= ~kEmittedReadable;
@@ -854,7 +850,7 @@ function maybeReadMore_(stream, state) {
(state.length < state.highWaterMark || ((state[kState] & kFlowing) !== 0 && state.length === 0))
) {
const len = state.length;
debug("maybeReadMore read 0");
$debug("maybeReadMore read 0");
stream.read(0);
if (len === state.length)
// Didn't get any data, stop spinning.
@@ -883,7 +879,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
}
state.pipes.push(dest);
debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts);
$debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts);
const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
@@ -893,7 +889,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
dest.on("unpipe", onunpipe);
function onunpipe(readable, unpipeInfo) {
debug("onunpipe");
$debug("onunpipe");
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
@@ -903,7 +899,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
}
function onend() {
debug("onend");
$debug("onend");
dest.end();
}
@@ -911,7 +907,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
let cleanedUp = false;
function cleanup() {
debug("cleanup");
$debug("cleanup");
// Cleanup event handlers once the pipe is broken.
dest.removeListener("close", onclose);
dest.removeListener("finish", onfinish);
@@ -941,11 +937,11 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug("false write response, pause", 0);
$debug("false write response, pause", 0);
state.awaitDrainWriters = dest;
state[kState] &= ~kMultiAwaitDrain;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug("false write response, pause", state.awaitDrainWriters.size);
$debug("false write response, pause", state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
@@ -962,9 +958,9 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
src.on("data", ondata);
function ondata(chunk) {
debug("ondata");
$debug("ondata");
const ret = dest.write(chunk);
debug("dest.write", ret);
$debug("dest.write", ret);
if (ret === false) {
pause();
}
@@ -973,7 +969,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
// If the dest has an error, then stop piping into it.
// However, don't suppress the throwing behavior for this.
function onerror(er) {
debug("onerror", er);
$debug("onerror", er);
unpipe();
dest.removeListener("error", onerror);
if (dest.listenerCount("error") === 0) {
@@ -997,14 +993,14 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
}
dest.once("close", onclose);
function onfinish() {
debug("onfinish");
$debug("onfinish");
dest.removeListener("close", onclose);
unpipe();
}
dest.once("finish", onfinish);
function unpipe() {
debug("unpipe");
$debug("unpipe");
src.unpipe(dest);
}
@@ -1016,7 +1012,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
if (dest.writableNeedDrain === true) {
pause();
} else if ((state[kState] & kFlowing) === 0) {
debug("pipe resume");
$debug("pipe resume");
src.resume();
}
@@ -1031,10 +1027,10 @@ function pipeOnDrain(src, dest) {
// `this` maybe not a reference to dest,
// so we use the real dest here.
if (state.awaitDrainWriters === dest) {
debug("pipeOnDrain", 1);
$debug("pipeOnDrain", 1);
state.awaitDrainWriters = null;
} else if ((state[kState] & kMultiAwaitDrain) !== 0) {
debug("pipeOnDrain", state.awaitDrainWriters.size);
$debug("pipeOnDrain", state.awaitDrainWriters.size);
state.awaitDrainWriters.delete(dest);
}
@@ -1094,7 +1090,7 @@ Readable.prototype.on = function (ev, fn) {
if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) {
state[kState] |= kReadableListening | kNeedReadable | kHasFlowing;
state[kState] &= ~(kFlowing | kEmittedReadable);
debug("on readable");
$debug("on readable");
if (state.length) {
emitReadable(this);
} else if ((state[kState] & kReading) === 0) {
@@ -1167,7 +1163,7 @@ function updateReadableListening(self) {
}
function nReadingNextTick(self) {
debug("readable nexttick read 0");
$debug("readable nexttick read 0");
self.read(0);
}
@@ -1176,7 +1172,7 @@ function nReadingNextTick(self) {
Readable.prototype.resume = function () {
const state = this._readableState;
if ((state[kState] & kFlowing) === 0) {
debug("resume");
$debug("resume");
// We flow only if there is no one listening
// for readable, but we still have to call
// resume().
@@ -1201,7 +1197,7 @@ function resume(stream, state) {
}
function resume_(stream, state) {
debug("resume", (state[kState] & kReading) !== 0);
$debug("resume", (state[kState] & kReading) !== 0);
if ((state[kState] & kReading) === 0) {
stream.read(0);
}
@@ -1214,9 +1210,9 @@ function resume_(stream, state) {
Readable.prototype.pause = function () {
const state = this._readableState;
debug("call pause");
$debug("call pause");
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
debug("pause");
$debug("pause");
state[kState] |= kHasFlowing;
state[kState] &= ~kFlowing;
this.emit("pause");
@@ -1227,7 +1223,7 @@ Readable.prototype.pause = function () {
function flow(stream) {
const state = stream._readableState;
debug("flow");
$debug("flow");
while ((state[kState] & kFlowing) !== 0 && stream.read() !== null);
}
@@ -1628,7 +1624,7 @@ function fromList(n, state) {
function endReadable(stream) {
const state = stream._readableState;
debug("endReadable");
$debug("endReadable");
if ((state[kState] & kEndEmitted) === 0) {
state[kState] |= kEnded;
process.nextTick(endReadableNT, state, stream);
@@ -1636,7 +1632,7 @@ function endReadable(stream) {
}
function endReadableNT(state, stream) {
debug("endReadableNT");
$debug("endReadableNT");
// Check that we didn't get one last unshift.
if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) {

View File

@@ -23,12 +23,33 @@
// Implement an async ._write(chunk, encoding, cb), and it'll handle all
// the drain event emission and buffering.
"use strict";
const kSync = 1 << 9;
const kFinalCalled = 1 << 10;
const kNeedDrain = 1 << 11;
const kEnding = 1 << 12;
const kFinished = 1 << 13;
const kDecodeStrings = 1 << 14;
const kWriting = 1 << 15;
const kBufferProcessing = 1 << 16;
const kPrefinished = 1 << 17;
const kAllBuffers = 1 << 18;
const kAllNoop = 1 << 19;
const kOnFinished = 1 << 20;
const kHasWritable = 1 << 21;
const kWritable = 1 << 22;
const kCorked = 1 << 23;
const kDefaultUTF8Encoding = 1 << 24;
const kWriteCb = 1 << 25;
const kExpectWriteCb = 1 << 26;
const kAfterWriteTickInfo = 1 << 27;
const kAfterWritePending = 1 << 28;
const kBuffered = 1 << 29;
const kEnded = 1 << 30;
const primordials = require("internal/primordials");
const {
ArrayPrototypeSlice,
//Error,
FunctionPrototypeSymbolHasInstance,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectSetPrototypeOf,
@@ -36,10 +57,8 @@ const {
StringPrototypeToLowerCase,
Symbol,
SymbolAsyncDispose,
SymbolHasInstance,
} = primordials;
export default Writable;
Writable.WritableState = WritableState;
const EE = require("node:events");
@@ -87,29 +106,6 @@ const kWriteCbValue = Symbol("kWriteCbValue");
const kAfterWriteTickInfoValue = Symbol("kAfterWriteTickInfoValue");
const kBufferedValue = Symbol("kBufferedValue");
const kSync = 1 << 9;
const kFinalCalled = 1 << 10;
const kNeedDrain = 1 << 11;
const kEnding = 1 << 12;
const kFinished = 1 << 13;
const kDecodeStrings = 1 << 14;
const kWriting = 1 << 15;
const kBufferProcessing = 1 << 16;
const kPrefinished = 1 << 17;
const kAllBuffers = 1 << 18;
const kAllNoop = 1 << 19;
const kOnFinished = 1 << 20;
const kHasWritable = 1 << 21;
const kWritable = 1 << 22;
const kCorked = 1 << 23;
const kDefaultUTF8Encoding = 1 << 24;
const kWriteCb = 1 << 25;
const kExpectWriteCb = 1 << 26;
const kAfterWriteTickInfo = 1 << 27;
const kAfterWritePending = 1 << 28;
const kBuffered = 1 << 29;
const kEnded = 1 << 30;
// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
return {
@@ -428,15 +424,15 @@ function Writable(options) {
}
}
Writable.prototype = {};
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
Writable.prototype = Object.create(Stream.prototype);
ObjectSetPrototypeOf(Writable, Stream);
ObjectDefineProperty(Writable, SymbolHasInstance, {
const OriginalInstanceOf = Function.prototype[Symbol.hasInstance];
Object.defineProperty(Writable, Symbol.hasInstance, {
__proto__: null,
value: function (object) {
if (FunctionPrototypeSymbolHasInstance(this, object)) return true;
if (OriginalInstanceOf.$apply(this, arguments)) return true;
if (this !== Writable) return false;
return object && object._writableState instanceof WritableState;
@@ -1157,3 +1153,5 @@ Writable.prototype[SymbolAsyncDispose] = function () {
eos(this, err => (err && err.name !== "AbortError" ? reject(err) : resolve(null))),
);
};
export default Writable;

View File

@@ -0,0 +1,64 @@
'use strict';
require('../common');
const assert = require('assert');
const { Readable, Writable, Duplex, Transform } = require('stream');
const readable = new Readable({ read() {} });
const writable = new Writable({ write() {} });
const duplex = new Duplex({ read() {}, write() {} });
console.log(duplex);
const transform = new Transform({ transform() {} });
assert.ok(readable instanceof Readable);
assert.ok(!(writable instanceof Readable));
assert.ok(duplex instanceof Readable);
assert.ok(transform instanceof Readable);
assert.ok(!(readable instanceof Writable));
assert.ok(writable instanceof Writable);
assert.ok(duplex instanceof Writable);
assert.ok(transform instanceof Writable);
assert.ok(!(readable instanceof Duplex));
assert.ok(!(writable instanceof Duplex));
assert.ok(duplex instanceof Duplex);
assert.ok(transform instanceof Duplex);
assert.ok(!(readable instanceof Transform));
assert.ok(!(writable instanceof Transform));
assert.ok(!(duplex instanceof Transform));
assert.ok(transform instanceof Transform);
assert.ok(!(null instanceof Writable));
assert.ok(!(undefined instanceof Writable));
// Simple inheritance check for `Writable` works fine in a subclass constructor.
function CustomWritable() {
assert.ok(
this instanceof CustomWritable,
`${this} does not inherit from CustomWritable`
);
assert.ok(
this instanceof Writable,
`${this} does not inherit from Writable`
);
}
Object.setPrototypeOf(CustomWritable, Writable);
Object.setPrototypeOf(CustomWritable.prototype, Writable.prototype);
new CustomWritable();
assert.throws(
CustomWritable,
{
code: 'ERR_ASSERTION',
constructor: assert.AssertionError,
message: 'undefined does not inherit from CustomWritable'
}
);
class OtherCustomWritable extends Writable {}
assert(!(new OtherCustomWritable() instanceof CustomWritable));
assert(!(new CustomWritable() instanceof OtherCustomWritable));