diff --git a/src/js/internal/streams/destroy.js b/src/js/internal/streams/destroy.js index e6b576da35..57d452152d 100644 --- a/src/js/internal/streams/destroy.js +++ b/src/js/internal/streams/destroy.js @@ -17,8 +17,27 @@ const { kAutoDestroy, kErrored, } = require("internal/streams/utils"); -const aggregateTwoErrors = (inner, outer) => { - return new AggregateError([inner, outer]); + +const aggregateTwoErrors = (innerError, outerError) => { + if (innerError && outerError && innerError !== outerError) { + if (Array.isArray(outerError.errors)) { + // If `outerError` is already an `AggregateError`. + outerError.errors.push(innerError); + return outerError; + } + let err; + + const limit = Error.stackTraceLimit; + Error.stackTraceLimit = 0; + // eslint-disable-next-line no-restricted-syntax + err = new AggregateError(new SafeArrayIterator([outerError, innerError]), outerError.message); + Error.stackTraceLimit = limit; + Error.captureStackTrace(err, aggregateTwoErrors); + + err.code = outerError.code; + return err; + } + return innerError || outerError; }; const kDestroy = Symbol("kDestroy"); diff --git a/src/js/internal/streams/duplex.js b/src/js/internal/streams/duplex.js index 0b0ed6a3fe..20c3addd2d 100644 --- a/src/js/internal/streams/duplex.js +++ b/src/js/internal/streams/duplex.js @@ -163,12 +163,12 @@ function lazyWebStreams() { return webStreamsAdapters; } -Duplex.fromWeb = function (pair, options) { - throw new Error("Not implemented"); +Duplex.fromWeb = function () { + throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported"); }; -Duplex.toWeb = function (duplex) { - throw new Error("Not implemented"); +Duplex.toWeb = function () { + throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported"); }; let duplexify; diff --git a/src/js/internal/streams/pipeline.js b/src/js/internal/streams/pipeline.js index 15f12f949d..114bfdce6e 100644 --- a/src/js/internal/streams/pipeline.js +++ b/src/js/internal/streams/pipeline.js @@ -283,7 +283,7 @@ function pipelineImpl(streams, callback, opts) { if (typeof stream === "function") { ret = stream({ signal }); if (!isIterable(ret)) { - throw $ERR_INVALID_RETURN_VALUE(); + throw $ERR_INVALID_RETURN_VALUE("Iterable, AsyncIterable or Stream", "source", ret); } } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { ret = stream; @@ -300,7 +300,7 @@ function pipelineImpl(streams, callback, opts) { if (reading) { if (!isIterable(ret, true)) { - throw $ERR_INVALID_RETURN_VALUE(); + throw $ERR_INVALID_RETURN_VALUE("AsyncIterable", `transform[${i - 1}]`, ret); } } else { PassThrough ??= require("internal/streams/passthrough"); @@ -344,7 +344,7 @@ function pipelineImpl(streams, callback, opts) { finishCount++; pumpToNode(toRead, pt, finish, { end }); } else { - throw $ERR_INVALID_RETURN_VALUE(); + throw $ERR_INVALID_RETURN_VALUE("AsyncIterable or Promise", "destination", ret); } ret = pt; @@ -370,7 +370,11 @@ function pipelineImpl(streams, callback, opts) { finishCount++; pumpToNode(ret, stream, finish, { end }); } else { - throw $ERR_INVALID_ARG_TYPE(); + throw $ERR_INVALID_ARG_TYPE( + "val", + ["Readable", "Iterable", "AsyncIterable", "ReadableStream", "TransformStream"], + ret, + ); } ret = stream; } else if (isWebStream(stream)) { @@ -384,7 +388,11 @@ function pipelineImpl(streams, callback, opts) { finishCount++; pumpToWeb(ret.readable, stream, finish, { end }); } else { - throw $ERR_INVALID_ARG_TYPE(); + throw $ERR_INVALID_ARG_TYPE( + "val", + ["Readable", "Iterable", "AsyncIterable", "ReadableStream", "TransformStream"], + ret, + ); } ret = stream; } else { diff --git a/src/js/internal/streams/readable.js b/src/js/internal/streams/readable.js index 35e6de540b..03955bbc80 100644 --- a/src/js/internal/streams/readable.js +++ b/src/js/internal/streams/readable.js @@ -868,7 +868,7 @@ function maybeReadMore_(stream, state) { // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. Readable.prototype._read = function (n) { - throw $ERR_METHOD_NOT_IMPLEMENTED(); + throw $ERR_METHOD_NOT_IMPLEMENTED("The _read() method is not implemented"); }; Readable.prototype.pipe = function (dest, pipeOpts) { @@ -1671,15 +1671,8 @@ function endWritableNT(stream) { } Readable.from = function (iterable, opts) { - return from(Readable, iterable, opts); -}; - -Readable.filter = function (fn, options) { - return require("./operators").filter(fn, options); -}; - -Readable.map = function (fn, options) { - return require("./operators").map(fn, options); + let result = from(Readable, iterable, opts); + return result; }; let webStreamsAdapters; @@ -1690,12 +1683,12 @@ function lazyWebStreams() { return webStreamsAdapters; } -Readable.fromWeb = function (readableStream, options) { - throw $ERR_METHOD_NOT_IMPLEMENTED(); +Readable.fromWeb = function () { + throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported"); }; -Readable.toWeb = function (streamReadable, options) { - throw $ERR_METHOD_NOT_IMPLEMENTED(); +Readable.toWeb = function () { + throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported"); }; Readable.wrap = function (src, options) { diff --git a/src/js/internal/streams/writable.js b/src/js/internal/streams/writable.js index 73da232c7c..6b43a66f7b 100644 --- a/src/js/internal/streams/writable.js +++ b/src/js/internal/streams/writable.js @@ -445,7 +445,7 @@ ObjectDefineProperty(Writable, SymbolHasInstance, { // Otherwise people can pipe Writable streams, which is just wrong. Writable.prototype.pipe = function () { - errorOrDestroy(this, $ERR_STREAM_CANNOT_PIPE()); + errorOrDestroy(this, $ERR_STREAM_CANNOT_PIPE("writable", this)); }; function _write(stream, chunk, encoding, cb) { @@ -456,7 +456,7 @@ function _write(stream, chunk, encoding, cb) { } if (chunk === null) { - throw $ERR_STREAM_NULL_VALUES(); + throw $ERR_STREAM_NULL_VALUES("chunk"); } if ((state[kState] & kObjectMode) === 0) { @@ -531,7 +531,9 @@ Writable.prototype.uncork = function () { Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { // node::ParseEncoding() requires lower case. if (typeof encoding === "string") encoding = StringPrototypeToLowerCase(encoding); - if (!Buffer.isEncoding(encoding)) throw $ERR_UNKNOWN_ENCODING(encoding); + if (!Buffer.isEncoding(encoding)) { + throw $ERR_UNKNOWN_ENCODING(encoding); + } this._writableState.defaultEncoding = encoding; return this; }; @@ -792,7 +794,7 @@ Writable.prototype._write = function (chunk, encoding, cb) { if (this._writev) { this._writev([{ chunk, encoding }], cb); } else { - throw $ERR_METHOD_NOT_IMPLEMENTED(); + throw $ERR_METHOD_NOT_IMPLEMENTED("The _write() method is not implemented"); } }; @@ -1137,12 +1139,12 @@ function lazyWebStreams() { return webStreamsAdapters; } -Writable.fromWeb = function (writableStream, options) { - throw $ERR_METHOD_NOT_IMPLEMENTED(); +Writable.fromWeb = function () { + throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported"); }; -Writable.toWeb = function (streamWritable) { - throw $ERR_METHOD_NOT_IMPLEMENTED(); +Writable.toWeb = function () { + throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported"); }; Writable.prototype[SymbolAsyncDispose] = function () { diff --git a/src/js/node/net.ts b/src/js/node/net.ts index 868de1d27b..e72cd20110 100644 --- a/src/js/node/net.ts +++ b/src/js/node/net.ts @@ -817,7 +817,7 @@ const Socket = (function (InternalSocket) { } _write(chunk, encoding, callback) { - if (typeof chunk == "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding); + if (typeof chunk === "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding); var written = this[bunSocketInternal]?.write(chunk); if (written == chunk.length) { diff --git a/src/js/node/stream.js b/src/js/node/stream.js index 3f42c89bdb..1bc3a2ad45 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -55,7 +55,7 @@ for (let i = 0; i < streamKeys.length; i++) { if (new.target) { throw $ERR_ILLEGAL_CONSTRUCTOR(); } - return Stream.Readable.from(Reflect.$apply(op, this, args)); + return Stream.Readable.from(op.$apply(this, args)); } Object.$defineProperty(fn, "name", { __proto__: null, value: op.name }); Object.$defineProperty(fn, "length", { __proto__: null, value: op.length }); @@ -75,7 +75,7 @@ for (let i = 0; i < promiseKeys.length; i++) { if (new.target) { throw $ERR_ILLEGAL_CONSTRUCTOR(); } - return Reflect.$apply(op, this, args); + return op.$apply(this, args); } Object.$defineProperty(fn, "name", { __proto__: null, value: op.name }); Object.$defineProperty(fn, "length", { __proto__: null, value: op.length }); diff --git a/test/js/node/test/parallel/test-stream-readable-with-unimplemented-_read.js b/test/js/node/test/parallel/test-stream-readable-with-unimplemented-_read.js new file mode 100644 index 0000000000..85e83aa3b6 --- /dev/null +++ b/test/js/node/test/parallel/test-stream-readable-with-unimplemented-_read.js @@ -0,0 +1,13 @@ +'use strict'; +const common = require('../common'); +const { Readable } = require('stream'); + +const readable = new Readable(); + +readable.read(); +readable.on('error', common.expectsError({ + code: 'ERR_METHOD_NOT_IMPLEMENTED', + name: 'Error', + message: 'The _read() method is not implemented' +})); +readable.on('close', common.mustCall()); diff --git a/test/js/node/test/parallel/test-stream-writable-constructor-set-methods.js b/test/js/node/test/parallel/test-stream-writable-constructor-set-methods.js new file mode 100644 index 0000000000..34fda8edda --- /dev/null +++ b/test/js/node/test/parallel/test-stream-writable-constructor-set-methods.js @@ -0,0 +1,41 @@ +'use strict'; +const common = require('../common'); + +const assert = require('assert'); +const { Writable } = require('stream'); + +const bufferBlerg = Buffer.from('blerg'); +const w = new Writable(); + +assert.throws( + () => { + w.end(bufferBlerg); + }, + { + name: 'Error', + code: 'ERR_METHOD_NOT_IMPLEMENTED', + message: 'The _write() method is not implemented' + } +); + +const _write = common.mustCall((chunk, _, next) => { + next(); +}); + +const _writev = common.mustCall((chunks, next) => { + assert.strictEqual(chunks.length, 2); + next(); +}); + +const w2 = new Writable({ write: _write, writev: _writev }); + +assert.strictEqual(w2._write, _write); +assert.strictEqual(w2._writev, _writev); + +w2.write(bufferBlerg); + +w2.cork(); +w2.write(bufferBlerg); +w2.write(bufferBlerg); + +w2.end(); diff --git a/test/js/node/test/parallel/test-stream-writable-invalid-chunk.js b/test/js/node/test/parallel/test-stream-writable-invalid-chunk.js new file mode 100644 index 0000000000..09032c07c5 --- /dev/null +++ b/test/js/node/test/parallel/test-stream-writable-invalid-chunk.js @@ -0,0 +1,36 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const assert = require('assert'); + +function testWriteType(val, objectMode, code) { + const writable = new stream.Writable({ + objectMode, + write: () => {} + }); + writable.on('error', common.mustNotCall()); + if (code) { + assert.throws(() => { + writable.write(val); + }, { code }); + } else { + writable.write(val); + } +} + +testWriteType([], false, 'ERR_INVALID_ARG_TYPE'); +testWriteType({}, false, 'ERR_INVALID_ARG_TYPE'); +testWriteType(0, false, 'ERR_INVALID_ARG_TYPE'); +testWriteType(true, false, 'ERR_INVALID_ARG_TYPE'); +testWriteType(0.0, false, 'ERR_INVALID_ARG_TYPE'); +testWriteType(undefined, false, 'ERR_INVALID_ARG_TYPE'); +testWriteType(null, false, 'ERR_STREAM_NULL_VALUES'); + +testWriteType([], true); +testWriteType({}, true); +testWriteType(0, true); +testWriteType(true, true); +testWriteType(0.0, true); +testWriteType(undefined, true); +testWriteType(null, true, 'ERR_STREAM_NULL_VALUES'); diff --git a/test/js/node/test/parallel/test-stream-writable-null.js b/test/js/node/test/parallel/test-stream-writable-null.js new file mode 100644 index 0000000000..99419f1cf9 --- /dev/null +++ b/test/js/node/test/parallel/test-stream-writable-null.js @@ -0,0 +1,47 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const stream = require('stream'); + +class MyWritable extends stream.Writable { + constructor(options) { + super({ autoDestroy: false, ...options }); + } + _write(chunk, encoding, callback) { + assert.notStrictEqual(chunk, null); + callback(); + } +} + +{ + const m = new MyWritable({ objectMode: true }); + m.on('error', common.mustNotCall()); + assert.throws(() => { + m.write(null); + }, { + code: 'ERR_STREAM_NULL_VALUES' + }); +} + +{ + const m = new MyWritable(); + m.on('error', common.mustNotCall()); + assert.throws(() => { + m.write(false); + }, { + code: 'ERR_INVALID_ARG_TYPE' + }); +} + +{ // Should not throw. + const m = new MyWritable({ objectMode: true }); + m.write(false, assert.ifError); +} + +{ // Should not throw. + const m = new MyWritable({ objectMode: true }).on('error', (e) => { + assert.ifError(e || new Error('should not get here')); + }); + m.write(false, assert.ifError); +} diff --git a/test/js/node/test/parallel/test-stream-writable-write-error.js b/test/js/node/test/parallel/test-stream-writable-write-error.js new file mode 100644 index 0000000000..069e32e1be --- /dev/null +++ b/test/js/node/test/parallel/test-stream-writable-write-error.js @@ -0,0 +1,75 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Writable } = require('stream'); + +function expectError(w, args, code, sync) { + if (sync) { + if (code) { + assert.throws(() => w.write(...args), { code }); + } else { + w.write(...args); + } + } else { + let errorCalled = false; + let ticked = false; + w.write(...args, common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(errorCalled, false); + assert.strictEqual(err.code, code); + })); + ticked = true; + w.on('error', common.mustCall((err) => { + errorCalled = true; + assert.strictEqual(err.code, code); + })); + } +} + +function test(autoDestroy) { + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + w.end(); + expectError(w, ['asd'], 'ERR_STREAM_WRITE_AFTER_END'); + } + + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + w.destroy(); + } + + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + expectError(w, [null], 'ERR_STREAM_NULL_VALUES', true); + } + + { + const w = new Writable({ + autoDestroy, + _write() {} + }); + expectError(w, [{}], 'ERR_INVALID_ARG_TYPE', true); + } + + { + const w = new Writable({ + decodeStrings: false, + autoDestroy, + _write() {} + }); + expectError(w, ['asd', 'noencoding'], 'ERR_UNKNOWN_ENCODING', true); + } +} + +test(false); +test(true);