From 7798af85248cc1049b826db159bc0cb2bbd8b249 Mon Sep 17 00:00:00 2001 From: snwy Date: Thu, 31 Oct 2024 17:57:01 -0700 Subject: [PATCH] one more passing test ... --- src/js/builtins/ReadableStreamInternals.ts | 2 +- src/js/internal/primordials.js | 2 + src/js/internal/streams/duplex.js | 58 +++++++------- src/js/internal/streams/legacy.js | 5 +- src/js/internal/streams/readable.js | 76 +++++++++---------- src/js/internal/streams/writable.js | 62 ++++++++------- .../test/parallel/test-stream-inheritance.js | 64 ++++++++++++++++ 7 files changed, 167 insertions(+), 102 deletions(-) create mode 100644 test/js/node/test/parallel/test-stream-inheritance.js diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index be82bca346..7bccabdbc9 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -1442,7 +1442,7 @@ export function readableStreamDefaultControllerCallPullIfNeeded(controller) { $alwaysInline = true; export function isReadableStreamLocked(stream) { - $assert($isReadableStream(stream)); + // $assert($isReadableStream(stream)); return ( // Case 1. Is there a reader actively using it? !!$getByIdDirectPrivate(stream, "reader") || diff --git a/src/js/internal/primordials.js b/src/js/internal/primordials.js index 16018b4e45..bc62a7cb5b 100644 --- a/src/js/internal/primordials.js +++ b/src/js/internal/primordials.js @@ -157,6 +157,7 @@ export default { ObjectSetPrototypeOf: Object.setPrototypeOf, Promise, PromiseResolve: Promise.resolve, + PromiseReject: Promise.reject, PromisePrototypeThen: uncurryThis(Promise.prototype.then), ReflectOwnKeys: Reflect.ownKeys, RegExp, @@ -273,4 +274,5 @@ export default { BigUint64Array, BigInt64Array, uncurryThis, + FunctionPrototypeSymbolHasInstance: Function.prototype[Symbol.hasInstance].bind(Function.prototype), }; diff --git a/src/js/internal/streams/duplex.js b/src/js/internal/streams/duplex.js index 89b2119392..b904608767 100644 --- a/src/js/internal/streams/duplex.js +++ b/src/js/internal/streams/duplex.js @@ -26,7 +26,7 @@ "use strict"; const primordials = require("internal/primordials"); -const { ObjectDefineProperties, ObjectGetOwnPropertyDescriptor, ObjectKeys, ObjectSetPrototypeOf } = primordials; +const { ObjectGetOwnPropertyDescriptor, ObjectKeys, ObjectSetPrototypeOf } = primordials; const Stream = require("internal/streams/legacy").Stream; const Readable = require("internal/streams/readable"); @@ -37,22 +37,6 @@ const { addAbortSignal } = require("internal/streams/add-abort-signal"); const destroyImpl = require("internal/streams/destroy"); const { kOnConstructed } = require("internal/streams/utils"); -Duplex.prototype = {}; -ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); -ObjectSetPrototypeOf(Duplex, Readable); - -{ - const keys = ObjectKeys(Writable.prototype); - // Allow the keys array to be GC'ed. - for (let i = 0; i < keys.length; i++) { - const method = keys[i]; - Duplex.prototype[method] ||= Writable.prototype[method]; - } -} - -// Use the `destroy` method of `Writable`. -Duplex.prototype.destroy = Writable.prototype.destroy; - function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); @@ -120,19 +104,39 @@ function Duplex(options) { } } -ObjectDefineProperties(Duplex.prototype, { - writable: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writable") }, +Duplex.prototype = Object.create(Readable.prototype); +Object.defineProperty(Duplex, Symbol.toStringTag, { + value: "Duplex", + configurable: true, +}); +const WritableProtoype = Writable.prototype; +ObjectSetPrototypeOf(Duplex, Readable); + +{ + const keys = ObjectKeys(WritableProtoype); + // Allow the keys array to be GC'ed. + for (let i = 0; i < keys.length; i++) { + const method = keys[i]; + Duplex.prototype[method] ||= WritableProtoype[method]; + } +} + +// Use the `destroy` method of `Writable`. +Duplex.prototype.destroy = WritableProtoype.destroy; + +Object.defineProperties(Duplex.prototype, { + writable: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writable") }, writableHighWaterMark: { __proto__: null, - ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableHighWaterMark"), + ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableHighWaterMark"), }, - writableObjectMode: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableObjectMode") }, - writableBuffer: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableBuffer") }, - writableLength: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableLength") }, - writableFinished: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableFinished") }, - writableCorked: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableCorked") }, - writableEnded: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableEnded") }, - writableNeedDrain: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(Writable.prototype, "writableNeedDrain") }, + writableObjectMode: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableObjectMode") }, + writableBuffer: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableBuffer") }, + writableLength: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableLength") }, + writableFinished: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableFinished") }, + writableCorked: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableCorked") }, + writableEnded: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableEnded") }, + writableNeedDrain: { __proto__: null, ...ObjectGetOwnPropertyDescriptor(WritableProtoype, "writableNeedDrain") }, destroyed: { __proto__: null, diff --git a/src/js/internal/streams/legacy.js b/src/js/internal/streams/legacy.js index 8beaf98252..2c7d1726f3 100644 --- a/src/js/internal/streams/legacy.js +++ b/src/js/internal/streams/legacy.js @@ -9,8 +9,9 @@ const EE = require("node:events"); function Stream(opts) { EE.$call(this, opts); } -Stream.prototype = {}; -ObjectSetPrototypeOf(Stream.prototype, EE.prototype); +Stream.prototype = { + __proto__: EE.prototype, +}; ObjectSetPrototypeOf(Stream, EE); Stream.prototype.pipe = function (dest, options) { diff --git a/src/js/internal/streams/readable.js b/src/js/internal/streams/readable.js index 910a334771..0319bb23ac 100644 --- a/src/js/internal/streams/readable.js +++ b/src/js/internal/streams/readable.js @@ -47,10 +47,6 @@ const { Buffer } = require("node:buffer"); const { addAbortSignal } = require("./add-abort-signal"); const eos = require("./end-of-stream"); -let debug = require("../../node/util").debuglog("stream", fn => { - debug = fn; -}); - const destroyImpl = require("./destroy"); const { getHighWaterMark, getDefaultHighWaterMark } = require("./state"); const { @@ -376,7 +372,7 @@ Readable.prototype[SymbolAsyncDispose] = function () { // similar to how Writable.write() returns true if you should // write() some more. Readable.prototype.push = function (chunk, encoding) { - debug("push", chunk); + $debug("push", chunk); const state = this._readableState; return (state[kState] & kObjectMode) === 0 @@ -386,7 +382,7 @@ Readable.prototype.push = function (chunk, encoding) { // Unshift should *always* be something directly out of read(). Readable.prototype.unshift = function (chunk, encoding) { - debug("unshift", chunk); + $debug("unshift", chunk); const state = this._readableState; return (state[kState] & kObjectMode) === 0 ? readableAddChunkUnshiftByteMode(this, state, chunk, encoding) @@ -623,7 +619,7 @@ function howMuchToRead(n, state) { // You can override either this method, or the async _read(n) below. Readable.prototype.read = function (n) { - debug("read", n); + $debug("read", n); // Same as parseInt(undefined, 10), however V8 7.3 performance regressed if (n === undefined) { n = NaN; @@ -647,7 +643,7 @@ Readable.prototype.read = function (n) { ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || (state[kState] & kEnded) !== 0) ) { - debug("read: emitReadable"); + $debug("read: emitReadable"); if (state.length === 0 && (state[kState] & kEnded) !== 0) endReadable(this); else emitReadable(this); return null; @@ -685,12 +681,12 @@ Readable.prototype.read = function (n) { // if we need a readable event, then we need to do some reading. let doRead = (state[kState] & kNeedReadable) !== 0; - debug("need readable", doRead); + $debug("need readable", doRead); // If we currently have less than the highWaterMark, then also read some. if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; - debug("length less than watermark", doRead); + $debug("length less than watermark", doRead); } // However, if we've ended, then there's no point, if we're already @@ -698,9 +694,9 @@ Readable.prototype.read = function (n) { // and if we're destroyed or errored, then it's not allowed, if ((state[kState] & (kReading | kEnded | kDestroyed | kErrored | kConstructed)) !== kConstructed) { doRead = false; - debug("reading, ended or constructing", doRead); + $debug("reading, ended or constructing", doRead); } else if (doRead) { - debug("do read"); + $debug("do read"); state[kState] |= kReading | kSync; // If the length is currently zero, then we *need* a readable event. if (state.length === 0) state[kState] |= kNeedReadable; @@ -752,7 +748,7 @@ Readable.prototype.read = function (n) { }; function onEofChunk(stream, state) { - debug("onEofChunk"); + $debug("onEofChunk"); if ((state[kState] & kEnded) !== 0) return; const decoder = (state[kState] & kDecoder) !== 0 ? state[kDecoderValue] : null; if (decoder) { @@ -784,10 +780,10 @@ function onEofChunk(stream, state) { // a nextTick recursion warning, but that's not so bad. function emitReadable(stream) { const state = stream._readableState; - debug("emitReadable"); + $debug("emitReadable"); state[kState] &= ~kNeedReadable; if ((state[kState] & kEmittedReadable) === 0) { - debug("emitReadable", (state[kState] & kFlowing) !== 0); + $debug("emitReadable", (state[kState] & kFlowing) !== 0); state[kState] |= kEmittedReadable; process.nextTick(emitReadable_, stream); } @@ -795,7 +791,7 @@ function emitReadable(stream) { function emitReadable_(stream) { const state = stream._readableState; - debug("emitReadable_"); + $debug("emitReadable_"); if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) { stream.emit("readable"); state[kState] &= ~kEmittedReadable; @@ -854,7 +850,7 @@ function maybeReadMore_(stream, state) { (state.length < state.highWaterMark || ((state[kState] & kFlowing) !== 0 && state.length === 0)) ) { const len = state.length; - debug("maybeReadMore read 0"); + $debug("maybeReadMore read 0"); stream.read(0); if (len === state.length) // Didn't get any data, stop spinning. @@ -883,7 +879,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { } state.pipes.push(dest); - debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts); + $debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts); const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr; @@ -893,7 +889,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { dest.on("unpipe", onunpipe); function onunpipe(readable, unpipeInfo) { - debug("onunpipe"); + $debug("onunpipe"); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; @@ -903,7 +899,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { } function onend() { - debug("onend"); + $debug("onend"); dest.end(); } @@ -911,7 +907,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { let cleanedUp = false; function cleanup() { - debug("cleanup"); + $debug("cleanup"); // Cleanup event handlers once the pipe is broken. dest.removeListener("close", onclose); dest.removeListener("finish", onfinish); @@ -941,11 +937,11 @@ Readable.prototype.pipe = function (dest, pipeOpts) { // => Check whether `dest` is still a piping destination. if (!cleanedUp) { if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug("false write response, pause", 0); + $debug("false write response, pause", 0); state.awaitDrainWriters = dest; state[kState] &= ~kMultiAwaitDrain; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug("false write response, pause", state.awaitDrainWriters.size); + $debug("false write response, pause", state.awaitDrainWriters.size); state.awaitDrainWriters.add(dest); } src.pause(); @@ -962,9 +958,9 @@ Readable.prototype.pipe = function (dest, pipeOpts) { src.on("data", ondata); function ondata(chunk) { - debug("ondata"); + $debug("ondata"); const ret = dest.write(chunk); - debug("dest.write", ret); + $debug("dest.write", ret); if (ret === false) { pause(); } @@ -973,7 +969,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { // If the dest has an error, then stop piping into it. // However, don't suppress the throwing behavior for this. function onerror(er) { - debug("onerror", er); + $debug("onerror", er); unpipe(); dest.removeListener("error", onerror); if (dest.listenerCount("error") === 0) { @@ -997,14 +993,14 @@ Readable.prototype.pipe = function (dest, pipeOpts) { } dest.once("close", onclose); function onfinish() { - debug("onfinish"); + $debug("onfinish"); dest.removeListener("close", onclose); unpipe(); } dest.once("finish", onfinish); function unpipe() { - debug("unpipe"); + $debug("unpipe"); src.unpipe(dest); } @@ -1016,7 +1012,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { if (dest.writableNeedDrain === true) { pause(); } else if ((state[kState] & kFlowing) === 0) { - debug("pipe resume"); + $debug("pipe resume"); src.resume(); } @@ -1031,10 +1027,10 @@ function pipeOnDrain(src, dest) { // `this` maybe not a reference to dest, // so we use the real dest here. if (state.awaitDrainWriters === dest) { - debug("pipeOnDrain", 1); + $debug("pipeOnDrain", 1); state.awaitDrainWriters = null; } else if ((state[kState] & kMultiAwaitDrain) !== 0) { - debug("pipeOnDrain", state.awaitDrainWriters.size); + $debug("pipeOnDrain", state.awaitDrainWriters.size); state.awaitDrainWriters.delete(dest); } @@ -1094,7 +1090,7 @@ Readable.prototype.on = function (ev, fn) { if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) { state[kState] |= kReadableListening | kNeedReadable | kHasFlowing; state[kState] &= ~(kFlowing | kEmittedReadable); - debug("on readable"); + $debug("on readable"); if (state.length) { emitReadable(this); } else if ((state[kState] & kReading) === 0) { @@ -1167,7 +1163,7 @@ function updateReadableListening(self) { } function nReadingNextTick(self) { - debug("readable nexttick read 0"); + $debug("readable nexttick read 0"); self.read(0); } @@ -1176,7 +1172,7 @@ function nReadingNextTick(self) { Readable.prototype.resume = function () { const state = this._readableState; if ((state[kState] & kFlowing) === 0) { - debug("resume"); + $debug("resume"); // We flow only if there is no one listening // for readable, but we still have to call // resume(). @@ -1201,7 +1197,7 @@ function resume(stream, state) { } function resume_(stream, state) { - debug("resume", (state[kState] & kReading) !== 0); + $debug("resume", (state[kState] & kReading) !== 0); if ((state[kState] & kReading) === 0) { stream.read(0); } @@ -1214,9 +1210,9 @@ function resume_(stream, state) { Readable.prototype.pause = function () { const state = this._readableState; - debug("call pause"); + $debug("call pause"); if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) { - debug("pause"); + $debug("pause"); state[kState] |= kHasFlowing; state[kState] &= ~kFlowing; this.emit("pause"); @@ -1227,7 +1223,7 @@ Readable.prototype.pause = function () { function flow(stream) { const state = stream._readableState; - debug("flow"); + $debug("flow"); while ((state[kState] & kFlowing) !== 0 && stream.read() !== null); } @@ -1628,7 +1624,7 @@ function fromList(n, state) { function endReadable(stream) { const state = stream._readableState; - debug("endReadable"); + $debug("endReadable"); if ((state[kState] & kEndEmitted) === 0) { state[kState] |= kEnded; process.nextTick(endReadableNT, state, stream); @@ -1636,7 +1632,7 @@ function endReadable(stream) { } function endReadableNT(state, stream) { - debug("endReadableNT"); + $debug("endReadableNT"); // Check that we didn't get one last unshift. if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) { diff --git a/src/js/internal/streams/writable.js b/src/js/internal/streams/writable.js index 91371033d6..df777e9f84 100644 --- a/src/js/internal/streams/writable.js +++ b/src/js/internal/streams/writable.js @@ -23,12 +23,33 @@ // Implement an async ._write(chunk, encoding, cb), and it'll handle all // the drain event emission and buffering. -"use strict"; +const kSync = 1 << 9; +const kFinalCalled = 1 << 10; +const kNeedDrain = 1 << 11; +const kEnding = 1 << 12; +const kFinished = 1 << 13; +const kDecodeStrings = 1 << 14; +const kWriting = 1 << 15; +const kBufferProcessing = 1 << 16; +const kPrefinished = 1 << 17; +const kAllBuffers = 1 << 18; +const kAllNoop = 1 << 19; +const kOnFinished = 1 << 20; +const kHasWritable = 1 << 21; +const kWritable = 1 << 22; +const kCorked = 1 << 23; +const kDefaultUTF8Encoding = 1 << 24; +const kWriteCb = 1 << 25; +const kExpectWriteCb = 1 << 26; +const kAfterWriteTickInfo = 1 << 27; +const kAfterWritePending = 1 << 28; +const kBuffered = 1 << 29; +const kEnded = 1 << 30; + const primordials = require("internal/primordials"); const { ArrayPrototypeSlice, //Error, - FunctionPrototypeSymbolHasInstance, ObjectDefineProperties, ObjectDefineProperty, ObjectSetPrototypeOf, @@ -36,10 +57,8 @@ const { StringPrototypeToLowerCase, Symbol, SymbolAsyncDispose, - SymbolHasInstance, } = primordials; -export default Writable; Writable.WritableState = WritableState; const EE = require("node:events"); @@ -87,29 +106,6 @@ const kWriteCbValue = Symbol("kWriteCbValue"); const kAfterWriteTickInfoValue = Symbol("kAfterWriteTickInfoValue"); const kBufferedValue = Symbol("kBufferedValue"); -const kSync = 1 << 9; -const kFinalCalled = 1 << 10; -const kNeedDrain = 1 << 11; -const kEnding = 1 << 12; -const kFinished = 1 << 13; -const kDecodeStrings = 1 << 14; -const kWriting = 1 << 15; -const kBufferProcessing = 1 << 16; -const kPrefinished = 1 << 17; -const kAllBuffers = 1 << 18; -const kAllNoop = 1 << 19; -const kOnFinished = 1 << 20; -const kHasWritable = 1 << 21; -const kWritable = 1 << 22; -const kCorked = 1 << 23; -const kDefaultUTF8Encoding = 1 << 24; -const kWriteCb = 1 << 25; -const kExpectWriteCb = 1 << 26; -const kAfterWriteTickInfo = 1 << 27; -const kAfterWritePending = 1 << 28; -const kBuffered = 1 << 29; -const kEnded = 1 << 30; - // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { return { @@ -428,15 +424,15 @@ function Writable(options) { } } -Writable.prototype = {}; - -ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); +Writable.prototype = Object.create(Stream.prototype); ObjectSetPrototypeOf(Writable, Stream); -ObjectDefineProperty(Writable, SymbolHasInstance, { +const OriginalInstanceOf = Function.prototype[Symbol.hasInstance]; +Object.defineProperty(Writable, Symbol.hasInstance, { __proto__: null, value: function (object) { - if (FunctionPrototypeSymbolHasInstance(this, object)) return true; + if (OriginalInstanceOf.$apply(this, arguments)) return true; + if (this !== Writable) return false; return object && object._writableState instanceof WritableState; @@ -1157,3 +1153,5 @@ Writable.prototype[SymbolAsyncDispose] = function () { eos(this, err => (err && err.name !== "AbortError" ? reject(err) : resolve(null))), ); }; + +export default Writable; diff --git a/test/js/node/test/parallel/test-stream-inheritance.js b/test/js/node/test/parallel/test-stream-inheritance.js new file mode 100644 index 0000000000..caa956289e --- /dev/null +++ b/test/js/node/test/parallel/test-stream-inheritance.js @@ -0,0 +1,64 @@ +'use strict'; +require('../common'); +const assert = require('assert'); +const { Readable, Writable, Duplex, Transform } = require('stream'); + +const readable = new Readable({ read() {} }); +const writable = new Writable({ write() {} }); +const duplex = new Duplex({ read() {}, write() {} }); +console.log(duplex); +const transform = new Transform({ transform() {} }); + +assert.ok(readable instanceof Readable); +assert.ok(!(writable instanceof Readable)); +assert.ok(duplex instanceof Readable); +assert.ok(transform instanceof Readable); + +assert.ok(!(readable instanceof Writable)); +assert.ok(writable instanceof Writable); +assert.ok(duplex instanceof Writable); +assert.ok(transform instanceof Writable); + +assert.ok(!(readable instanceof Duplex)); +assert.ok(!(writable instanceof Duplex)); +assert.ok(duplex instanceof Duplex); +assert.ok(transform instanceof Duplex); + +assert.ok(!(readable instanceof Transform)); +assert.ok(!(writable instanceof Transform)); +assert.ok(!(duplex instanceof Transform)); +assert.ok(transform instanceof Transform); + +assert.ok(!(null instanceof Writable)); +assert.ok(!(undefined instanceof Writable)); + +// Simple inheritance check for `Writable` works fine in a subclass constructor. +function CustomWritable() { + assert.ok( + this instanceof CustomWritable, + `${this} does not inherit from CustomWritable` + ); + assert.ok( + this instanceof Writable, + `${this} does not inherit from Writable` + ); +} + +Object.setPrototypeOf(CustomWritable, Writable); +Object.setPrototypeOf(CustomWritable.prototype, Writable.prototype); + +new CustomWritable(); + +assert.throws( + CustomWritable, + { + code: 'ERR_ASSERTION', + constructor: assert.AssertionError, + message: 'undefined does not inherit from CustomWritable' + } +); + +class OtherCustomWritable extends Writable {} + +assert(!(new OtherCustomWritable() instanceof CustomWritable)); +assert(!(new CustomWritable() instanceof OtherCustomWritable));