From afba1deb6e3c8dfc4976f29a75c475faa86e69ee Mon Sep 17 00:00:00 2001
From: snwy
Date: Mon, 4 Nov 2024 15:54:51 -0800
Subject: [PATCH] so cooked it's unreal

---
 src/js/internal/streams/pipeline.js                | 17 +++--
 src/js/node/stream.js                              |  2 +-
 .../test-stream-pipeline-listeners.js              | 76 +++++++++++++++++++
 .../parallel/test-stream-pipeline-uncaught.js      | 22 ++++++
 .../test-stream-pipeline-with-empty-string.js      | 18 +++++
 5 files changed, 128 insertions(+), 7 deletions(-)
 create mode 100644 test/js/node/test/parallel/test-stream-pipeline-listeners.js
 create mode 100644 test/js/node/test/parallel/test-stream-pipeline-uncaught.js
 create mode 100644 test/js/node/test/parallel/test-stream-pipeline-with-empty-string.js

diff --git a/src/js/internal/streams/pipeline.js b/src/js/internal/streams/pipeline.js
index 84583deab4..0fba2e597a 100644
--- a/src/js/internal/streams/pipeline.js
+++ b/src/js/internal/streams/pipeline.js
@@ -2,6 +2,9 @@
 // permission from the author, Mathias Buus (@mafintosh).
 "use strict";
 
+
+const duplexify = require("internal/streams/duplexify");
+
 const primordials = require("internal/primordials");
 const { ArrayIsArray, Promise, SymbolAsyncIterator, SymbolDispose } = primordials;
 
@@ -71,10 +74,11 @@ function popCallback(streams) {
 function makeAsyncIterable(val) {
   if (isIterable(val)) {
     return val;
-  } else if (isReadableNodeStream(val)) {
+  } else if (isReadableNodeStream(val, false)) {
     // Legacy streams are not Iterable.
     return fromReadable(val);
   }
+  console.log("makeAsyncIterable", val);
   throw $ERR_INVALID_ARG_TYPE("val", ["Readable", "Iterable", "AsyncIterable"], val);
 }
 
@@ -195,10 +199,10 @@ function pipelineImpl(streams, callback, opts) {
   validateAbortSignal(outerSignal, "options.signal");
 
   function abort() {
-    finishImpl(new AbortError());
+    finishImpl(new AbortError(undefined, { cause: outerSignal?.reason }));
   }
 
-  addAbortListener ??= require("../../node/events").addAbortListener;
+  addAbortListener ??= require("node:events").addAbortListener;
   let disposable;
   if (outerSignal) {
     disposable = addAbortListener(outerSignal, abort);
@@ -238,7 +242,6 @@ function pipelineImpl(streams, callback, opts) {
       if (!error) {
         lastStreamCleanup.forEach(fn => fn());
       }
-
       process.nextTick(callback, error, value);
     }
   }
@@ -289,13 +292,13 @@ function pipelineImpl(streams, callback, opts) {
       } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
         ret = stream;
       } else {
-        ret = Duplex.from(stream);
+        ret = duplexify(stream);
       }
     } else if (typeof stream === "function") {
       if (isTransformStream(ret)) {
         ret = makeAsyncIterable(ret?.readable);
       } else {
-        // AAAA FUCK AAAAAAA
+        console.log(streams.length, i);
         ret = makeAsyncIterable(ret);
       }
       ret = stream(ret, { signal });
@@ -382,7 +385,9 @@ function pipelineImpl(streams, callback, opts) {
     } else if (isWebStream(stream)) {
       if (isReadableNodeStream(ret)) {
         finishCount++;
+        console.log("ret5", ret);
         pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
+        console.log("ret6");
       } else if (isReadableStream(ret) || isIterable(ret)) {
         finishCount++;
         pumpToWeb(ret, stream, finish, { end });
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 1bc3a2ad45..d30489f7d7 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -110,7 +110,7 @@ Object.$defineProperty(Stream, "promises", {
   },
 });
 
-Object.$defineProperty(Stream, "pipeline", {
+Object.$defineProperty(pipeline, customPromisify, {
   __proto__: null,
   enumerable: true,
   get() {
diff --git a/test/js/node/test/parallel/test-stream-pipeline-listeners.js b/test/js/node/test/parallel/test-stream-pipeline-listeners.js
new file mode 100644
index 0000000000..81e287b77c
--- /dev/null
+++ b/test/js/node/test/parallel/test-stream-pipeline-listeners.js
@@ -0,0 +1,76 @@
+'use strict';
+
+const common = require('../common');
+const { pipeline, Duplex, PassThrough, Writable } = require('stream');
+const assert = require('assert');
+
+process.on('uncaughtException', common.mustCall((err) => {
+  assert.strictEqual(err.message, 'no way');
+}, 2));
+
+// Ensure that listeners is removed if last stream is readable
+// And other stream's listeners unchanged
+const a = new PassThrough();
+a.end('foobar');
+const b = new Duplex({
+  write(chunk, encoding, callback) {
+    callback();
+  }
+});
+pipeline(a, b, common.mustCall((error) => {
+  if (error) {
+    assert.ifError(error);
+  }
+
+  assert(a.listenerCount('error') > 0);
+  assert.strictEqual(b.listenerCount('error'), 0);
+  setTimeout(() => {
+    assert.strictEqual(b.listenerCount('error'), 0);
+    b.destroy(new Error('no way'));
+  }, 100);
+}));
+
+// Async generators
+const c = new PassThrough();
+c.end('foobar');
+const d = pipeline(
+  c,
+  async function* (source) {
+    for await (const chunk of source) {
+      yield String(chunk).toUpperCase();
+    }
+  },
+  common.mustCall((error) => {
+    if (error) {
+      assert.ifError(error);
+    }
+
+    assert(c.listenerCount('error') > 0);
+    assert.strictEqual(d.listenerCount('error'), 0);
+    setTimeout(() => {
+      assert.strictEqual(b.listenerCount('error'), 0);
+      d.destroy(new Error('no way'));
+    }, 100);
+  })
+);
+
+// If last stream is not readable, will not throw and remove listeners
+const e = new PassThrough();
+e.end('foobar');
+const f = new Writable({
+  write(chunk, encoding, callback) {
+    callback();
+  }
+});
+pipeline(e, f, common.mustCall((error) => {
+  if (error) {
+    assert.ifError(error);
+  }
+
+  assert(e.listenerCount('error') > 0);
+  assert(f.listenerCount('error') > 0);
+  setTimeout(() => {
+    assert(f.listenerCount('error') > 0);
+    f.destroy(new Error('no way'));
+  }, 100);
+}));
diff --git a/test/js/node/test/parallel/test-stream-pipeline-uncaught.js b/test/js/node/test/parallel/test-stream-pipeline-uncaught.js
new file mode 100644
index 0000000000..8aa1c47b7f
--- /dev/null
+++ b/test/js/node/test/parallel/test-stream-pipeline-uncaught.js
@@ -0,0 +1,22 @@
+'use strict';
+
+const common = require('../common');
+const {
+  pipeline,
+  PassThrough
+} = require('stream');
+const assert = require('assert');
+
+process.on('uncaughtException', common.mustCall((err) => {
+  assert.strictEqual(err.message, 'error');
+}));
+
+// Ensure that pipeline that ends with Promise
+// still propagates error to uncaughtException.
+const s = new PassThrough();
+s.end('data');
+pipeline(s, async function(source) {
+  for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty
+}, common.mustSucceed(() => {
+  throw new Error('error');
+}));
diff --git a/test/js/node/test/parallel/test-stream-pipeline-with-empty-string.js b/test/js/node/test/parallel/test-stream-pipeline-with-empty-string.js
new file mode 100644
index 0000000000..5df1ff9edf
--- /dev/null
+++ b/test/js/node/test/parallel/test-stream-pipeline-with-empty-string.js
@@ -0,0 +1,18 @@
+'use strict';
+
+const common = require('../common');
+const {
+  pipeline,
+  PassThrough
+} = require('stream');
+
+
+async function runTest() {
+  await pipeline(
+    '',
+    new PassThrough({ objectMode: true }),
+    common.mustCall(),
+  );
+}
+
+runTest().then(common.mustCall());
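
Context for reviewers (not part of the patch): the src/js/node/stream.js hunk attaches the promise-returning implementation to pipeline itself through the customPromisify symbol (util.promisify.custom), instead of redefining Stream.pipeline, which is what the uncaught/empty-string tests exercise. A minimal usage sketch of that behavior, assuming a Node-compatible stream/util surface; the demo() wrapper and variable names are illustrative only.

// Sketch: promisify picks up pipeline's util.promisify.custom getter,
// so the returned function resolves/rejects instead of taking a callback.
const { pipeline, PassThrough } = require("node:stream");
const { promisify } = require("node:util");

const pipelineAsync = promisify(pipeline);

async function demo() {
  const src = new PassThrough();
  src.end("hello");
  // Resolves once the data has been fully piped into the destination.
  await pipelineAsync(src, new PassThrough());
}

demo().then(() => console.log("pipeline finished"));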