events.on: Add compatibility with readline module (#8885)

* fix: make event.on work with for...await of readline module

* fix: queueing events before next() call and multiple event vals

* test: add tests for events.on and readline for await...of

* style, docs

* [autofix.ci] apply automated fixes

* refactor: use removeListeners instead of check for done

* test: merge test files

* test: remove todo comment

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
Yannik Schröder
2024-02-19 20:44:09 +01:00
committed by GitHub
parent 42d15ea853
commit 182d052d4b
5 changed files with 259 additions and 129 deletions

View File

@@ -11,6 +11,7 @@ const kMaxEventTargetListeners = Symbol("events.maxEventTargetListeners");
const kMaxEventTargetListenersWarned = Symbol("events.maxEventTargetListenersWarned");
const kWatermarkData = SymbolFor("nodejs.watermarkData");
const kRejection = SymbolFor("nodejs.rejection");
const kFirstEventParam = SymbolFor("nodejs.kFirstEventParam");
const captureRejectionSymbol = SymbolFor("nodejs.rejection");
const ArrayPrototypeSlice = Array.prototype.slice;
@@ -356,9 +357,8 @@ function once(emitter, type, options) {
});
}
async function* on(emitter, event, options = {}) {
function on(emitter, event, options = {}) {
const signal = options.signal;
if (signal?.aborted) throw new AbortError(undefined, { cause: signal?.reason });
const { FixedQueue } = require("internal/fixed_queue");
const unconsumedPromises = new FixedQueue();
@@ -366,42 +366,58 @@ async function* on(emitter, event, options = {}) {
const unconsumedErrors = new FixedQueue();
let done = false;
emitter.on(event, ev => {
const eventHandlerBody = ev => {
// If there is a pending Promise -> resolve with current event value.
if (!unconsumedPromises.isEmpty()) {
const { resolve } = unconsumedPromises.shift();
return resolve([ev]);
return resolve(ev);
}
unconsumedEvents.push([ev]);
});
emitter.on("error", ex => {
// Else: Add event value to queue so it can be consumed by a future Promise.
unconsumedEvents.push(ev);
};
const eventHandler = options[kFirstEventParam] ? eventHandlerBody : (...args) => eventHandlerBody(args);
emitter.on(event, eventHandler);
const errorHandler = ex => {
if (!unconsumedPromises.isEmpty()) {
const { reject } = unconsumedPromises.shift();
return reject(ex);
}
unconsumedErrors.push(ex);
});
};
emitter.on("error", errorHandler);
signal?.addEventListener("abort", () => {
emitter.emit("error", new AbortError(undefined, { cause: signal?.reason }));
});
// If any of the close events is emitted -> remove listeners
// and yield only the remaining queued-up values in iterator.
for (const evName of options?.close || []) {
emitter.on(evName, () => {
emitter.emit(event, undefined);
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
done = true;
});
}
while (!done) {
if (!unconsumedEvents.isEmpty()) {
yield Promise.$resolve(unconsumedEvents.shift());
// Create AsyncGeneratorFunction which handles the Iterator logic
const iterator = async function* () {
while (!done || !unconsumedEvents.isEmpty() || !unconsumedErrors.isEmpty()) {
if (!unconsumedEvents.isEmpty()) {
yield Promise.$resolve(unconsumedEvents.shift());
} else if (!unconsumedErrors.isEmpty()) {
yield Promise.$reject(unconsumedErrors.shift());
} else {
const { promise, reject, resolve } = $newPromiseCapability(Promise);
unconsumedPromises.push({ reject, resolve });
yield promise;
}
}
if (!unconsumedErrors.isEmpty()) {
yield Promise.$reject(unconsumedErrors.shift());
}
const { promise, reject, resolve } = $newPromiseCapability(Promise);
unconsumedPromises.push({ reject, resolve });
yield promise;
}
};
// Return AsyncGenerator
return iterator();
}
function getEventListeners(emitter, type) {

View File

@@ -1226,7 +1226,7 @@ var kYanking = Symbol("_yanking");
var kYankPop = Symbol("_yankPop");
// Event symbols
var kFirstEventParam = Symbol("nodejs.kFirstEventParam");
var kFirstEventParam = SymbolFor("nodejs.kFirstEventParam");
// class InterfaceConstructor extends EventEmitter {
// #onSelfCloseWithTerminal;

View File

@@ -65,22 +65,6 @@ describe("node:events", () => {
await promise;
expect(emitter.listenerCount("hey")).toBe(0);
});
// TODO: extensive events.on tests
// test("on", () => {
// const emitter = new EventEmitter();
// const asyncIterator = EventEmitter.on(emitter, "hey");
// expect(asyncIterator.next).toBeDefined();
// expect(asyncIterator[Symbol.asyncIterator]).toBeDefined();
// const fn = async () => {
// const { value } = await asyncIterator.next();
// expect(value).toBe(1);
// };
// emitter.emit("hey", 1, 2, 3);
// });
});
describe("EventEmitter", () => {
@@ -422,6 +406,207 @@ describe("EventEmitter", () => {
});
});
describe("EventEmitter.on", () => {
test("Basic test", async () => {
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey");
expect(asyncIterator.next).toBeDefined();
expect(asyncIterator[Symbol.asyncIterator]).toBeDefined();
process.nextTick(() => {
emitter.emit("hey", 1);
});
const { value } = await asyncIterator.next();
expect(value).toEqual([1]);
});
test("Basic test with for await...of", async () => {
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey", { close: ["close"] } as any);
process.nextTick(() => {
emitter.emit("hey", 1);
emitter.emit("hey", 2);
emitter.emit("hey", 3);
emitter.emit("hey", 4);
emitter.emit("close");
});
const result = [];
for await (const ev of asyncIterator) {
result.push(ev);
}
expect(result).toEqual([[1], [2], [3], [4]]);
});
test("Stop reading events after 'close' event is emitted", async () => {
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey", { close: ["close"] } as any);
process.nextTick(() => {
emitter.emit("hey", 1);
emitter.emit("hey", 2);
emitter.emit("close");
emitter.emit("hey", 3);
});
const result = [];
for await (const ev of asyncIterator) {
result.push(ev);
}
expect(result).toEqual([[1], [2]]);
});
test("Queue events before first next() call", async () => {
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey");
emitter.emit("hey", 1);
emitter.emit("hey", 2);
emitter.emit("hey", 3);
await new Promise(resolve => setTimeout(resolve, 1000));
expect((await asyncIterator.next()).value).toEqual([1]);
expect((await asyncIterator.next()).value).toEqual([2]);
expect((await asyncIterator.next()).value).toEqual([3]);
});
test("Emit multiple values", async () => {
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey");
emitter.emit("hey", 1, 2, 3);
const { value } = await asyncIterator.next();
expect(value).toEqual([1, 2, 3]);
});
test("kFirstEventParam", async () => {
const kFirstEventParam = Symbol.for("nodejs.kFirstEventParam");
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey", { [kFirstEventParam]: true } as any);
emitter.emit("hey", 1, 2, 3);
emitter.emit("hey", [4, 5, 6]);
expect((await asyncIterator.next()).value).toBe(1);
expect((await asyncIterator.next()).value).toEqual([4, 5, 6]);
});
test("Cancel via error event", async () => {
const { on, EventEmitter } = require("node:events");
const process = require("node:process");
const ee = new EventEmitter();
const output = [];
// Emit later on
process.nextTick(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42);
ee.emit("foo", "baz");
});
setTimeout(() => {
ee.emit("error", "DONE");
}, 1_000);
try {
for await (const event of on(ee, "foo")) {
output.push([1, event]);
}
} catch (error) {
output.push([2, error]);
}
expect(output).toEqual([
[1, ["bar"]],
[1, [42]],
[1, ["baz"]],
[2, "DONE"],
]);
});
test("AbortController", () => {
const { on, EventEmitter } = require("node:events");
const ac = new AbortController();
const ee = new EventEmitter();
const output = [];
process.nextTick(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42);
ee.emit("foo", "baz");
});
(async () => {
try {
for await (const event of on(ee, "foo", { signal: ac.signal })) {
output.push([1, event]);
}
console.log("unreachable");
} catch (error: any) {
const { code, message } = error;
output.push([2, { code, message }]);
expect(output).toEqual([
[1, ["bar"]],
[1, [42]],
[1, ["baz"]],
[
2,
{
code: "ABORT_ERR",
message: "The operation was aborted",
},
],
]);
}
})();
process.nextTick(() => ac.abort());
});
// Checks for potential issues with FixedQueue size
test("Queue many events", async () => {
const emitter = new EventEmitter();
const asyncIterator = EventEmitter.on(emitter, "hey");
for (let i = 0; i < 2500; i += 1) {
emitter.emit("hey", i);
}
expect((await asyncIterator.next()).value).toEqual([0]);
});
test("readline.createInterface", async () => {
const { createInterface } = require("node:readline");
const { createReadStream } = require("node:fs");
const path = require("node:path");
const fpath = path.join(__filename, "..", "..", "child_process", "fixtures", "child-process-echo-options.js");
console.log(fpath);
const interfaced = createInterface(createReadStream(fpath));
const output = [];
try {
for await (const line of interfaced) {
output.push(line);
}
} catch (e) {
expect(output).toBe([
"// TODO - bun has no `send` method in the process",
"process?.send({ env: process.env });",
]);
}
});
});
describe("EventEmitter error handling", () => {
test("unhandled error event throws on emit", () => {
const myEmitter = new EventEmitter();
@@ -630,4 +815,9 @@ describe("EventEmitter constructors", () => {
const events = req("events");
new events();
});
test("in cjs, events is callable", () => {
const EventEmitter = require("events");
new EventEmitter();
});
});

View File

@@ -1,93 +0,0 @@
test("in cjs, events is callable", () => {
const EventEmitter = require("events");
new EventEmitter();
});
test("events.on", async () => {
const { on, EventEmitter } = require("node:events");
const process = require("node:process");
const ee = new EventEmitter();
const output = [];
// Emit later on
process.nextTick(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42);
});
setTimeout(() => {
ee.emit("error", "DONE");
}, 1_000);
try {
for await (const event of on(ee, "foo")) {
output.push([1, event]);
}
} catch (error) {
output.push([2, error]);
}
expect(output).toEqual([
[1, ["bar"]],
[1, [42]],
[2, "DONE"],
]);
});
test("events.on with AbortController", () => {
const { on, EventEmitter } = require("node:events");
const ac = new AbortController();
const ee = new EventEmitter();
const output = [];
process.nextTick(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42);
});
(async () => {
try {
for await (const event of on(ee, "foo", { signal: ac.signal })) {
output.push([1, event]);
}
console.log("unreachable");
} catch (error) {
const { code, message } = error;
output.push([2, { code, message }]);
expect(output).toEqual([
[1, ["bar"]],
[1, [42]],
[
2,
{
code: "ABORT_ERR",
message: "The operation was aborted",
},
],
]);
}
})();
process.nextTick(() => ac.abort());
});
test("readline.createInterface", async () => {
const { createInterface } = require("node:readline");
const { createReadStream } = require("node:fs");
const path = require("node:path");
const fpath = path.join(__filename, "..", "..", "child_process", "fixtures", "child-process-echo-options.js");
console.log(fpath);
const interfaced = createInterface(createReadStream(fpath));
const output = [];
try {
for await (const line of interfaced) {
output.push(line);
}
} catch (e) {
expect(output).toBe(["// TODO - bun has no `send` method in the process", "process?.send({ env: process.env });"]);
}
});

View File

@@ -1880,6 +1880,23 @@ describe("readline.createInterface()", () => {
input.write("abc\n");
});
it("should support reading-in lines via for await...of loop", async () => {
const sampleTextBuffer = new Buffer.from("Line1\nLine2\nLine3\nLine4");
const bufferStream = new PassThrough();
const rl = readline.createInterface({
input: bufferStream,
});
process.nextTick(() => {
bufferStream.end(sampleTextBuffer);
});
const result = [];
for await (const line of rl) result.push(line);
expect(result).toEqual(["Line1", "Line2", "Line3", "Line4"]);
});
it("should respond to home and end sequences for common pttys ", () => {
const input = new PassThrough();
const rl = readline.createInterface({