more passing

This commit is contained in:
snwy
2024-10-29 15:51:31 -07:00
parent 188778412d
commit 120bfbfafa
12 changed files with 270 additions and 36 deletions

View File

@@ -17,8 +17,27 @@ const {
kAutoDestroy,
kErrored,
} = require("internal/streams/utils");
const aggregateTwoErrors = (inner, outer) => {
return new AggregateError([inner, outer]);
const aggregateTwoErrors = (innerError, outerError) => {
if (innerError && outerError && innerError !== outerError) {
if (Array.isArray(outerError.errors)) {
// If `outerError` is already an `AggregateError`.
outerError.errors.push(innerError);
return outerError;
}
let err;
const limit = Error.stackTraceLimit;
Error.stackTraceLimit = 0;
// eslint-disable-next-line no-restricted-syntax
err = new AggregateError(new SafeArrayIterator([outerError, innerError]), outerError.message);
Error.stackTraceLimit = limit;
Error.captureStackTrace(err, aggregateTwoErrors);
err.code = outerError.code;
return err;
}
return innerError || outerError;
};
const kDestroy = Symbol("kDestroy");

View File

@@ -163,12 +163,12 @@ function lazyWebStreams() {
return webStreamsAdapters;
}
Duplex.fromWeb = function (pair, options) {
throw new Error("Not implemented");
Duplex.fromWeb = function () {
throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported");
};
Duplex.toWeb = function (duplex) {
throw new Error("Not implemented");
Duplex.toWeb = function () {
throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported");
};
let duplexify;

View File

@@ -283,7 +283,7 @@ function pipelineImpl(streams, callback, opts) {
if (typeof stream === "function") {
ret = stream({ signal });
if (!isIterable(ret)) {
throw $ERR_INVALID_RETURN_VALUE();
throw $ERR_INVALID_RETURN_VALUE("Iterable, AsyncIterable or Stream", "source", ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
@@ -300,7 +300,7 @@ function pipelineImpl(streams, callback, opts) {
if (reading) {
if (!isIterable(ret, true)) {
throw $ERR_INVALID_RETURN_VALUE();
throw $ERR_INVALID_RETURN_VALUE("AsyncIterable", `transform[${i - 1}]`, ret);
}
} else {
PassThrough ??= require("internal/streams/passthrough");
@@ -344,7 +344,7 @@ function pipelineImpl(streams, callback, opts) {
finishCount++;
pumpToNode(toRead, pt, finish, { end });
} else {
throw $ERR_INVALID_RETURN_VALUE();
throw $ERR_INVALID_RETURN_VALUE("AsyncIterable or Promise", "destination", ret);
}
ret = pt;
@@ -370,7 +370,11 @@ function pipelineImpl(streams, callback, opts) {
finishCount++;
pumpToNode(ret, stream, finish, { end });
} else {
throw $ERR_INVALID_ARG_TYPE();
throw $ERR_INVALID_ARG_TYPE(
"val",
["Readable", "Iterable", "AsyncIterable", "ReadableStream", "TransformStream"],
ret,
);
}
ret = stream;
} else if (isWebStream(stream)) {
@@ -384,7 +388,11 @@ function pipelineImpl(streams, callback, opts) {
finishCount++;
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw $ERR_INVALID_ARG_TYPE();
throw $ERR_INVALID_ARG_TYPE(
"val",
["Readable", "Iterable", "AsyncIterable", "ReadableStream", "TransformStream"],
ret,
);
}
ret = stream;
} else {

View File

@@ -868,7 +868,7 @@ function maybeReadMore_(stream, state) {
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function (n) {
throw $ERR_METHOD_NOT_IMPLEMENTED();
throw $ERR_METHOD_NOT_IMPLEMENTED("The _read() method is not implemented");
};
Readable.prototype.pipe = function (dest, pipeOpts) {
@@ -1671,15 +1671,8 @@ function endWritableNT(stream) {
}
Readable.from = function (iterable, opts) {
return from(Readable, iterable, opts);
};
Readable.filter = function (fn, options) {
return require("./operators").filter(fn, options);
};
Readable.map = function (fn, options) {
return require("./operators").map(fn, options);
let result = from(Readable, iterable, opts);
return result;
};
let webStreamsAdapters;
@@ -1690,12 +1683,12 @@ function lazyWebStreams() {
return webStreamsAdapters;
}
Readable.fromWeb = function (readableStream, options) {
throw $ERR_METHOD_NOT_IMPLEMENTED();
Readable.fromWeb = function () {
throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported");
};
Readable.toWeb = function (streamReadable, options) {
throw $ERR_METHOD_NOT_IMPLEMENTED();
Readable.toWeb = function () {
throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported");
};
Readable.wrap = function (src, options) {

View File

@@ -445,7 +445,7 @@ ObjectDefineProperty(Writable, SymbolHasInstance, {
// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function () {
errorOrDestroy(this, $ERR_STREAM_CANNOT_PIPE());
errorOrDestroy(this, $ERR_STREAM_CANNOT_PIPE("writable", this));
};
function _write(stream, chunk, encoding, cb) {
@@ -456,7 +456,7 @@ function _write(stream, chunk, encoding, cb) {
}
if (chunk === null) {
throw $ERR_STREAM_NULL_VALUES();
throw $ERR_STREAM_NULL_VALUES("chunk");
}
if ((state[kState] & kObjectMode) === 0) {
@@ -531,7 +531,9 @@ Writable.prototype.uncork = function () {
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
// node::ParseEncoding() requires lower case.
if (typeof encoding === "string") encoding = StringPrototypeToLowerCase(encoding);
if (!Buffer.isEncoding(encoding)) throw $ERR_UNKNOWN_ENCODING(encoding);
if (!Buffer.isEncoding(encoding)) {
throw $ERR_UNKNOWN_ENCODING(encoding);
}
this._writableState.defaultEncoding = encoding;
return this;
};
@@ -792,7 +794,7 @@ Writable.prototype._write = function (chunk, encoding, cb) {
if (this._writev) {
this._writev([{ chunk, encoding }], cb);
} else {
throw $ERR_METHOD_NOT_IMPLEMENTED();
throw $ERR_METHOD_NOT_IMPLEMENTED("The _write() method is not implemented");
}
};
@@ -1137,12 +1139,12 @@ function lazyWebStreams() {
return webStreamsAdapters;
}
Writable.fromWeb = function (writableStream, options) {
throw $ERR_METHOD_NOT_IMPLEMENTED();
Writable.fromWeb = function () {
throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported");
};
Writable.toWeb = function (streamWritable) {
throw $ERR_METHOD_NOT_IMPLEMENTED();
Writable.toWeb = function () {
throw $ERR_METHOD_NOT_IMPLEMENTED("webStreams unsupported");
};
Writable.prototype[SymbolAsyncDispose] = function () {

View File

@@ -817,7 +817,7 @@ const Socket = (function (InternalSocket) {
}
_write(chunk, encoding, callback) {
if (typeof chunk == "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding);
if (typeof chunk === "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding);
var written = this[bunSocketInternal]?.write(chunk);
if (written == chunk.length) {

View File

@@ -55,7 +55,7 @@ for (let i = 0; i < streamKeys.length; i++) {
if (new.target) {
throw $ERR_ILLEGAL_CONSTRUCTOR();
}
return Stream.Readable.from(Reflect.$apply(op, this, args));
return Stream.Readable.from(op.$apply(this, args));
}
Object.$defineProperty(fn, "name", { __proto__: null, value: op.name });
Object.$defineProperty(fn, "length", { __proto__: null, value: op.length });
@@ -75,7 +75,7 @@ for (let i = 0; i < promiseKeys.length; i++) {
if (new.target) {
throw $ERR_ILLEGAL_CONSTRUCTOR();
}
return Reflect.$apply(op, this, args);
return op.$apply(this, args);
}
Object.$defineProperty(fn, "name", { __proto__: null, value: op.name });
Object.$defineProperty(fn, "length", { __proto__: null, value: op.length });

View File

@@ -0,0 +1,13 @@
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const readable = new Readable();
readable.read();
readable.on('error', common.expectsError({
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The _read() method is not implemented'
}));
readable.on('close', common.mustCall());

View File

@@ -0,0 +1,41 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Writable } = require('stream');
const bufferBlerg = Buffer.from('blerg');
const w = new Writable();
assert.throws(
() => {
w.end(bufferBlerg);
},
{
name: 'Error',
code: 'ERR_METHOD_NOT_IMPLEMENTED',
message: 'The _write() method is not implemented'
}
);
const _write = common.mustCall((chunk, _, next) => {
next();
});
const _writev = common.mustCall((chunks, next) => {
assert.strictEqual(chunks.length, 2);
next();
});
const w2 = new Writable({ write: _write, writev: _writev });
assert.strictEqual(w2._write, _write);
assert.strictEqual(w2._writev, _writev);
w2.write(bufferBlerg);
w2.cork();
w2.write(bufferBlerg);
w2.write(bufferBlerg);
w2.end();

View File

@@ -0,0 +1,36 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');
function testWriteType(val, objectMode, code) {
const writable = new stream.Writable({
objectMode,
write: () => {}
});
writable.on('error', common.mustNotCall());
if (code) {
assert.throws(() => {
writable.write(val);
}, { code });
} else {
writable.write(val);
}
}
testWriteType([], false, 'ERR_INVALID_ARG_TYPE');
testWriteType({}, false, 'ERR_INVALID_ARG_TYPE');
testWriteType(0, false, 'ERR_INVALID_ARG_TYPE');
testWriteType(true, false, 'ERR_INVALID_ARG_TYPE');
testWriteType(0.0, false, 'ERR_INVALID_ARG_TYPE');
testWriteType(undefined, false, 'ERR_INVALID_ARG_TYPE');
testWriteType(null, false, 'ERR_STREAM_NULL_VALUES');
testWriteType([], true);
testWriteType({}, true);
testWriteType(0, true);
testWriteType(true, true);
testWriteType(0.0, true);
testWriteType(undefined, true);
testWriteType(null, true, 'ERR_STREAM_NULL_VALUES');

View File

@@ -0,0 +1,47 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const stream = require('stream');
class MyWritable extends stream.Writable {
constructor(options) {
super({ autoDestroy: false, ...options });
}
_write(chunk, encoding, callback) {
assert.notStrictEqual(chunk, null);
callback();
}
}
{
const m = new MyWritable({ objectMode: true });
m.on('error', common.mustNotCall());
assert.throws(() => {
m.write(null);
}, {
code: 'ERR_STREAM_NULL_VALUES'
});
}
{
const m = new MyWritable();
m.on('error', common.mustNotCall());
assert.throws(() => {
m.write(false);
}, {
code: 'ERR_INVALID_ARG_TYPE'
});
}
{ // Should not throw.
const m = new MyWritable({ objectMode: true });
m.write(false, assert.ifError);
}
{ // Should not throw.
const m = new MyWritable({ objectMode: true }).on('error', (e) => {
assert.ifError(e || new Error('should not get here'));
});
m.write(false, assert.ifError);
}

View File

@@ -0,0 +1,75 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Writable } = require('stream');
function expectError(w, args, code, sync) {
if (sync) {
if (code) {
assert.throws(() => w.write(...args), { code });
} else {
w.write(...args);
}
} else {
let errorCalled = false;
let ticked = false;
w.write(...args, common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(errorCalled, false);
assert.strictEqual(err.code, code);
}));
ticked = true;
w.on('error', common.mustCall((err) => {
errorCalled = true;
assert.strictEqual(err.code, code);
}));
}
}
function test(autoDestroy) {
{
const w = new Writable({
autoDestroy,
_write() {}
});
w.end();
expectError(w, ['asd'], 'ERR_STREAM_WRITE_AFTER_END');
}
{
const w = new Writable({
autoDestroy,
_write() {}
});
w.destroy();
}
{
const w = new Writable({
autoDestroy,
_write() {}
});
expectError(w, [null], 'ERR_STREAM_NULL_VALUES', true);
}
{
const w = new Writable({
autoDestroy,
_write() {}
});
expectError(w, [{}], 'ERR_INVALID_ARG_TYPE', true);
}
{
const w = new Writable({
decodeStrings: false,
autoDestroy,
_write() {}
});
expectError(w, ['asd', 'noencoding'], 'ERR_UNKNOWN_ENCODING', true);
}
}
test(false);
test(true);