diff --git a/src/js/node/events.js b/src/js/node/events.js index ba7bf39154..1202347455 100644 --- a/src/js/node/events.js +++ b/src/js/node/events.js @@ -11,6 +11,7 @@ const kMaxEventTargetListeners = Symbol("events.maxEventTargetListeners"); const kMaxEventTargetListenersWarned = Symbol("events.maxEventTargetListenersWarned"); const kWatermarkData = SymbolFor("nodejs.watermarkData"); const kRejection = SymbolFor("nodejs.rejection"); +const kFirstEventParam = SymbolFor("nodejs.kFirstEventParam"); const captureRejectionSymbol = SymbolFor("nodejs.rejection"); const ArrayPrototypeSlice = Array.prototype.slice; @@ -356,9 +357,8 @@ function once(emitter, type, options) { }); } -async function* on(emitter, event, options = {}) { +function on(emitter, event, options = {}) { const signal = options.signal; - if (signal?.aborted) throw new AbortError(undefined, { cause: signal?.reason }); const { FixedQueue } = require("internal/fixed_queue"); const unconsumedPromises = new FixedQueue(); @@ -366,42 +366,58 @@ async function* on(emitter, event, options = {}) { const unconsumedErrors = new FixedQueue(); let done = false; - emitter.on(event, ev => { + const eventHandlerBody = ev => { + // If there is a pending Promise -> resolve with current event value. if (!unconsumedPromises.isEmpty()) { const { resolve } = unconsumedPromises.shift(); - return resolve([ev]); + return resolve(ev); } - unconsumedEvents.push([ev]); - }); - emitter.on("error", ex => { + // Else: Add event value to queue so it can be consumed by a future Promise. + unconsumedEvents.push(ev); + }; + const eventHandler = options[kFirstEventParam] ? eventHandlerBody : (...args) => eventHandlerBody(args); + emitter.on(event, eventHandler); + + const errorHandler = ex => { if (!unconsumedPromises.isEmpty()) { const { reject } = unconsumedPromises.shift(); return reject(ex); } unconsumedErrors.push(ex); - }); + }; + emitter.on("error", errorHandler); + signal?.addEventListener("abort", () => { emitter.emit("error", new AbortError(undefined, { cause: signal?.reason })); }); + // If any of the close events is emitted -> remove listeners + // and yield only the remaining queued-up values in iterator. for (const evName of options?.close || []) { emitter.on(evName, () => { - emitter.emit(event, undefined); + emitter.removeListener(event, eventHandler); + emitter.removeListener("error", errorHandler); done = true; }); } - while (!done) { - if (!unconsumedEvents.isEmpty()) { - yield Promise.$resolve(unconsumedEvents.shift()); + // Create AsyncGeneratorFunction which handles the Iterator logic + const iterator = async function* () { + while (!done || !unconsumedEvents.isEmpty() || !unconsumedErrors.isEmpty()) { + if (!unconsumedEvents.isEmpty()) { + yield Promise.$resolve(unconsumedEvents.shift()); + } else if (!unconsumedErrors.isEmpty()) { + yield Promise.$reject(unconsumedErrors.shift()); + } else { + const { promise, reject, resolve } = $newPromiseCapability(Promise); + unconsumedPromises.push({ reject, resolve }); + yield promise; + } } - if (!unconsumedErrors.isEmpty()) { - yield Promise.$reject(unconsumedErrors.shift()); - } - const { promise, reject, resolve } = $newPromiseCapability(Promise); - unconsumedPromises.push({ reject, resolve }); - yield promise; - } + }; + + // Return AsyncGenerator + return iterator(); } function getEventListeners(emitter, type) { diff --git a/src/js/node/readline.js b/src/js/node/readline.js index 1205020425..7536b80172 100644 --- a/src/js/node/readline.js +++ b/src/js/node/readline.js @@ -1226,7 +1226,7 @@ var kYanking = Symbol("_yanking"); var kYankPop = Symbol("_yankPop"); // Event symbols -var kFirstEventParam = Symbol("nodejs.kFirstEventParam"); +var kFirstEventParam = SymbolFor("nodejs.kFirstEventParam"); // class InterfaceConstructor extends EventEmitter { // #onSelfCloseWithTerminal; diff --git a/test/js/node/events/event-emitter.test.ts b/test/js/node/events/event-emitter.test.ts index 687e90910e..9ff1061b77 100644 --- a/test/js/node/events/event-emitter.test.ts +++ b/test/js/node/events/event-emitter.test.ts @@ -65,22 +65,6 @@ describe("node:events", () => { await promise; expect(emitter.listenerCount("hey")).toBe(0); }); - - // TODO: extensive events.on tests - // test("on", () => { - // const emitter = new EventEmitter(); - // const asyncIterator = EventEmitter.on(emitter, "hey"); - - // expect(asyncIterator.next).toBeDefined(); - // expect(asyncIterator[Symbol.asyncIterator]).toBeDefined(); - - // const fn = async () => { - // const { value } = await asyncIterator.next(); - // expect(value).toBe(1); - // }; - - // emitter.emit("hey", 1, 2, 3); - // }); }); describe("EventEmitter", () => { @@ -422,6 +406,207 @@ describe("EventEmitter", () => { }); }); +describe("EventEmitter.on", () => { + test("Basic test", async () => { + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey"); + + expect(asyncIterator.next).toBeDefined(); + expect(asyncIterator[Symbol.asyncIterator]).toBeDefined(); + + process.nextTick(() => { + emitter.emit("hey", 1); + }); + + const { value } = await asyncIterator.next(); + expect(value).toEqual([1]); + }); + + test("Basic test with for await...of", async () => { + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey", { close: ["close"] } as any); + + process.nextTick(() => { + emitter.emit("hey", 1); + emitter.emit("hey", 2); + emitter.emit("hey", 3); + emitter.emit("hey", 4); + emitter.emit("close"); + }); + + const result = []; + for await (const ev of asyncIterator) { + result.push(ev); + } + + expect(result).toEqual([[1], [2], [3], [4]]); + }); + + test("Stop reading events after 'close' event is emitted", async () => { + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey", { close: ["close"] } as any); + + process.nextTick(() => { + emitter.emit("hey", 1); + emitter.emit("hey", 2); + emitter.emit("close"); + emitter.emit("hey", 3); + }); + + const result = []; + for await (const ev of asyncIterator) { + result.push(ev); + } + + expect(result).toEqual([[1], [2]]); + }); + + test("Queue events before first next() call", async () => { + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey"); + + emitter.emit("hey", 1); + emitter.emit("hey", 2); + emitter.emit("hey", 3); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + expect((await asyncIterator.next()).value).toEqual([1]); + expect((await asyncIterator.next()).value).toEqual([2]); + expect((await asyncIterator.next()).value).toEqual([3]); + }); + + test("Emit multiple values", async () => { + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey"); + + emitter.emit("hey", 1, 2, 3); + + const { value } = await asyncIterator.next(); + expect(value).toEqual([1, 2, 3]); + }); + + test("kFirstEventParam", async () => { + const kFirstEventParam = Symbol.for("nodejs.kFirstEventParam"); + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey", { [kFirstEventParam]: true } as any); + + emitter.emit("hey", 1, 2, 3); + emitter.emit("hey", [4, 5, 6]); + + expect((await asyncIterator.next()).value).toBe(1); + expect((await asyncIterator.next()).value).toEqual([4, 5, 6]); + }); + + test("Cancel via error event", async () => { + const { on, EventEmitter } = require("node:events"); + const process = require("node:process"); + + const ee = new EventEmitter(); + const output = []; + + // Emit later on + process.nextTick(() => { + ee.emit("foo", "bar"); + ee.emit("foo", 42); + ee.emit("foo", "baz"); + }); + + setTimeout(() => { + ee.emit("error", "DONE"); + }, 1_000); + + try { + for await (const event of on(ee, "foo")) { + output.push([1, event]); + } + } catch (error) { + output.push([2, error]); + } + + expect(output).toEqual([ + [1, ["bar"]], + [1, [42]], + [1, ["baz"]], + [2, "DONE"], + ]); + }); + + test("AbortController", () => { + const { on, EventEmitter } = require("node:events"); + + const ac = new AbortController(); + const ee = new EventEmitter(); + const output = []; + + process.nextTick(() => { + ee.emit("foo", "bar"); + ee.emit("foo", 42); + ee.emit("foo", "baz"); + }); + (async () => { + try { + for await (const event of on(ee, "foo", { signal: ac.signal })) { + output.push([1, event]); + } + console.log("unreachable"); + } catch (error: any) { + const { code, message } = error; + output.push([2, { code, message }]); + + expect(output).toEqual([ + [1, ["bar"]], + [1, [42]], + [1, ["baz"]], + [ + 2, + { + code: "ABORT_ERR", + message: "The operation was aborted", + }, + ], + ]); + } + })(); + + process.nextTick(() => ac.abort()); + }); + + // Checks for potential issues with FixedQueue size + test("Queue many events", async () => { + const emitter = new EventEmitter(); + const asyncIterator = EventEmitter.on(emitter, "hey"); + + for (let i = 0; i < 2500; i += 1) { + emitter.emit("hey", i); + } + + expect((await asyncIterator.next()).value).toEqual([0]); + }); + + test("readline.createInterface", async () => { + const { createInterface } = require("node:readline"); + const { createReadStream } = require("node:fs"); + const path = require("node:path"); + + const fpath = path.join(__filename, "..", "..", "child_process", "fixtures", "child-process-echo-options.js"); + console.log(fpath); + const interfaced = createInterface(createReadStream(fpath)); + const output = []; + + try { + for await (const line of interfaced) { + output.push(line); + } + } catch (e) { + expect(output).toBe([ + "// TODO - bun has no `send` method in the process", + "process?.send({ env: process.env });", + ]); + } + }); +}); + describe("EventEmitter error handling", () => { test("unhandled error event throws on emit", () => { const myEmitter = new EventEmitter(); @@ -630,4 +815,9 @@ describe("EventEmitter constructors", () => { const events = req("events"); new events(); }); + + test("in cjs, events is callable", () => { + const EventEmitter = require("events"); + new EventEmitter(); + }); }); diff --git a/test/js/node/events/events-cjs.test.js b/test/js/node/events/events-cjs.test.js deleted file mode 100644 index 1f2e3399b0..0000000000 --- a/test/js/node/events/events-cjs.test.js +++ /dev/null @@ -1,93 +0,0 @@ -test("in cjs, events is callable", () => { - const EventEmitter = require("events"); - new EventEmitter(); -}); - -test("events.on", async () => { - const { on, EventEmitter } = require("node:events"); - const process = require("node:process"); - - const ee = new EventEmitter(); - const output = []; - - // Emit later on - process.nextTick(() => { - ee.emit("foo", "bar"); - ee.emit("foo", 42); - }); - - setTimeout(() => { - ee.emit("error", "DONE"); - }, 1_000); - - try { - for await (const event of on(ee, "foo")) { - output.push([1, event]); - } - } catch (error) { - output.push([2, error]); - } - - expect(output).toEqual([ - [1, ["bar"]], - [1, [42]], - [2, "DONE"], - ]); -}); - -test("events.on with AbortController", () => { - const { on, EventEmitter } = require("node:events"); - - const ac = new AbortController(); - const ee = new EventEmitter(); - const output = []; - - process.nextTick(() => { - ee.emit("foo", "bar"); - ee.emit("foo", 42); - }); - (async () => { - try { - for await (const event of on(ee, "foo", { signal: ac.signal })) { - output.push([1, event]); - } - console.log("unreachable"); - } catch (error) { - const { code, message } = error; - output.push([2, { code, message }]); - - expect(output).toEqual([ - [1, ["bar"]], - [1, [42]], - [ - 2, - { - code: "ABORT_ERR", - message: "The operation was aborted", - }, - ], - ]); - } - })(); - - process.nextTick(() => ac.abort()); -}); - -test("readline.createInterface", async () => { - const { createInterface } = require("node:readline"); - const { createReadStream } = require("node:fs"); - const path = require("node:path"); - - const fpath = path.join(__filename, "..", "..", "child_process", "fixtures", "child-process-echo-options.js"); - console.log(fpath); - const interfaced = createInterface(createReadStream(fpath)); - const output = []; - - try { - for await (const line of interfaced) { - output.push(line); - } - } catch (e) { - expect(output).toBe(["// TODO - bun has no `send` method in the process", "process?.send({ env: process.env });"]); - } -}); diff --git a/test/js/node/readline/readline.node.test.ts b/test/js/node/readline/readline.node.test.ts index a30afcc829..7f4b270344 100644 --- a/test/js/node/readline/readline.node.test.ts +++ b/test/js/node/readline/readline.node.test.ts @@ -1880,6 +1880,23 @@ describe("readline.createInterface()", () => { input.write("abc\n"); }); + it("should support reading-in lines via for await...of loop", async () => { + const sampleTextBuffer = new Buffer.from("Line1\nLine2\nLine3\nLine4"); + const bufferStream = new PassThrough(); + + const rl = readline.createInterface({ + input: bufferStream, + }); + + process.nextTick(() => { + bufferStream.end(sampleTextBuffer); + }); + + const result = []; + for await (const line of rl) result.push(line); + expect(result).toEqual(["Line1", "Line2", "Line3", "Line4"]); + }); + it("should respond to home and end sequences for common pttys ", () => { const input = new PassThrough(); const rl = readline.createInterface({