so cooked it's unreal

Author: snwy
Date: 2024-11-04 15:54:51 -08:00
Parent: b592d8a3d2
Commit: afba1deb6e
5 changed files with 128 additions and 7 deletions

View File

@@ -2,6 +2,9 @@
// permission from the author, Mathias Buus (@mafintosh).
"use strict";
const duplexify = require("internal/streams/duplexify");
const primordials = require("internal/primordials");
const { ArrayIsArray, Promise, SymbolAsyncIterator, SymbolDispose } = primordials;
@@ -71,10 +74,11 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
-} else if (isReadableNodeStream(val)) {
+} else if (isReadableNodeStream(val, false)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
console.log("makeAsyncIterable", val);
throw $ERR_INVALID_ARG_TYPE("val", ["Readable", "Iterable", "AsyncIterable"], val);
}
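
For orientation, a rough sketch of the case makeAsyncIterable serves (not part of this commit; assumes stock node:stream APIs): a function stage in pipeline receives the previous stage wrapped as an async iterable.

    const { pipeline, PassThrough, Writable } = require('node:stream');

    const src = new PassThrough();
    src.end('abc');

    const sink = new Writable({
      write(chunk, encoding, callback) { callback(); },
    });

    pipeline(
      src,
      // The generator stage is handed the previous stage wrapped as an async iterable.
      async function* (source) {
        for await (const chunk of source) yield String(chunk).toUpperCase();
      },
      sink,
      (err) => console.log(err ?? 'done'),
    );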
@@ -195,10 +199,10 @@ function pipelineImpl(streams, callback, opts) {
validateAbortSignal(outerSignal, "options.signal");
function abort() {
-finishImpl(new AbortError());
+finishImpl(new AbortError(undefined, { cause: outerSignal?.reason }));
}
addAbortListener ??= require("../../node/events").addAbortListener;
addAbortListener ??= require("node:events").addAbortListener;
let disposable;
if (outerSignal) {
disposable = addAbortListener(outerSignal, abort);
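
A minimal sketch of the behavior the AbortError change targets, assuming the node:stream/promises pipeline and an AbortController; the names here are illustrative, not from the commit:

    const { pipeline } = require('node:stream/promises');
    const { PassThrough } = require('node:stream');

    const ac = new AbortController();
    const src = new PassThrough();
    const dst = new PassThrough();

    pipeline(src, dst, { signal: ac.signal }).catch((err) => {
      // Expectation under this change: an AbortError whose `cause`
      // carries the reason passed to ac.abort().
      console.log(err.name, err.cause);
    });

    ac.abort(new Error('stop'));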
@@ -238,7 +242,6 @@ function pipelineImpl(streams, callback, opts) {
if (!error) {
lastStreamCleanup.forEach(fn => fn());
}
process.nextTick(callback, error, value);
}
}
@@ -289,13 +292,13 @@ function pipelineImpl(streams, callback, opts) {
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
} else {
-ret = Duplex.from(stream);
+ret = duplexify(stream);
}
} else if (typeof stream === "function") {
if (isTransformStream(ret)) {
ret = makeAsyncIterable(ret?.readable);
} else {
// A plain function stage consumes the previous stage as an async iterable.
ret = makeAsyncIterable(ret);
}
ret = stream(ret, { signal });
@@ -382,7 +385,9 @@ function pipelineImpl(streams, callback, opts) {
} else if (isWebStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount++;
console.log("ret5", ret);
pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
console.log("ret6");
} else if (isReadableStream(ret) || isIterable(ret)) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
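
The pumpToWeb branch above covers handing a Node readable off to a WHATWG stream destination. A rough usage sketch of the case it serves, assuming a runtime where pipeline accepts web streams and WritableStream is global:

    const { pipeline, PassThrough } = require('node:stream');

    const src = new PassThrough();
    src.end('data');

    const received = [];
    const webSink = new WritableStream({
      write(chunk) { received.push(chunk); },
    });

    // pipeline pumps the Node readable into the web WritableStream and
    // invokes the callback once the sink has been closed.
    pipeline(src, webSink, (err) => {
      console.log(err ?? `received ${received.length} chunk(s)`);
    });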

View File

@@ -110,7 +110,7 @@ Object.$defineProperty(Stream, "promises", {
},
});
Object.$defineProperty(Stream, "pipeline", {
Object.$defineProperty(pipeline, customPromisify, {
__proto__: null,
enumerable: true,
get() {
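
This hunk appears to attach the promise-returning pipeline via the util.promisify custom symbol. A hedged sketch of what that should enable, assuming stock node:util promisify behavior:

    const { promisify } = require('node:util');
    const { pipeline, PassThrough, Writable } = require('node:stream');

    // With the custom-promisify property in place, util.promisify(pipeline)
    // should hand back the promise-based pipeline.
    const pipelineAsync = promisify(pipeline);

    const src = new PassThrough();
    src.end('hello');
    const sink = new Writable({
      write(chunk, encoding, callback) { callback(); },
    });

    pipelineAsync(src, sink).then(() => console.log('done'));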

View File

@@ -0,0 +1,76 @@
'use strict';
const common = require('../common');
const { pipeline, Duplex, PassThrough, Writable } = require('stream');
const assert = require('assert');
process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'no way');
}, 2));
// Ensure that error listeners are removed if the last stream is readable,
// and that the other streams' listeners are left unchanged.
const a = new PassThrough();
a.end('foobar');
const b = new Duplex({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(a, b, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}
assert(a.listenerCount('error') > 0);
assert.strictEqual(b.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
b.destroy(new Error('no way'));
}, 100);
}));
// Async generators
const c = new PassThrough();
c.end('foobar');
const d = pipeline(
c,
async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
},
common.mustCall((error) => {
if (error) {
assert.ifError(error);
}
assert(c.listenerCount('error') > 0);
assert.strictEqual(d.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(d.listenerCount('error'), 0);
d.destroy(new Error('no way'));
}, 100);
})
);
// If the last stream is not readable, its error listeners are kept,
// so destroying it with an error will not throw an uncaught exception.
const e = new PassThrough();
e.end('foobar');
const f = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(e, f, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}
assert(e.listenerCount('error') > 0);
assert(f.listenerCount('error') > 0);
setTimeout(() => {
assert(f.listenerCount('error') > 0);
f.destroy(new Error('no way'));
}, 100);
}));

View File

@@ -0,0 +1,22 @@
'use strict';
const common = require('../common');
const {
pipeline,
PassThrough
} = require('stream');
const assert = require('assert');
process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'error');
}));
// Ensure that a pipeline whose last stage returns a Promise
// still propagates errors to uncaughtException.
const s = new PassThrough();
s.end('data');
pipeline(s, async function(source) {
for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty
}, common.mustSucceed(() => {
throw new Error('error');
}));

View File

@@ -0,0 +1,18 @@
'use strict';
const common = require('../common');
const {
pipeline,
PassThrough
} = require('stream');
async function runTest() {
await pipeline(
'',
new PassThrough({ objectMode: true }),
common.mustCall(),
);
}
runTest().then(common.mustCall());