From 6b577a3970f270a080f9e9ca43e0223ace23d0ef Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 15 Jan 2024 18:54:37 -0800 Subject: [PATCH] implement `events.on` (#8190) * exit event loop if there's unhandled errors * move FixedQueue to a special package * require(events) is also EventEmitter * implement events.on * move fixed_queue to internal and make it lazy in node:events * use better Promise intrinsics * move $shared to internal/shared * make test not dependent on cwd --- src/bun.js/javascript.zig | 6 +- .../internal-module-registry-scanner.ts | 3 - src/js/builtins/ProcessObjectInternals.ts | 126 +---------------- src/js/internal/fixed_queue.ts | 127 ++++++++++++++++++ src/js/node/cluster.ts | 2 +- src/js/node/dgram.ts | 2 +- src/js/node/events.js | 53 +++++++- src/js/node/http2.ts | 2 +- src/js/node/inspector.ts | 2 +- src/js/node/repl.ts | 2 +- src/js/node/v8.ts | 2 +- src/js/node/vm.ts | 2 +- src/js/tsconfig.json | 1 - test/js/node/events/events-cjs.test.js | 93 ++++++++++++- 14 files changed, 279 insertions(+), 144 deletions(-) create mode 100644 src/js/internal/fixed_queue.ts diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 9235a5a7b4..306308d939 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -655,9 +655,9 @@ pub const VirtualMachine = struct { } pub fn isEventLoopAlive(vm: *const VirtualMachine) bool { - return vm.event_loop_handle.?.isActive() or (vm.active_tasks + - vm.event_loop.tasks.count + - vm.event_loop.immediate_tasks.count + vm.event_loop.next_immediate_tasks.count > 0); + return vm.unhandled_error_counter == 0 and + (vm.event_loop_handle.?.isActive() or + vm.active_tasks + vm.event_loop.tasks.count + vm.event_loop.immediate_tasks.count + vm.event_loop.next_immediate_tasks.count > 0); } pub fn wakeup(this: *VirtualMachine) void { diff --git a/src/codegen/internal-module-registry-scanner.ts b/src/codegen/internal-module-registry-scanner.ts index 7bc2f9bddd..0c98d5004b 100644 --- a/src/codegen/internal-module-registry-scanner.ts +++ b/src/codegen/internal-module-registry-scanner.ts @@ -55,9 +55,6 @@ export function createInternalModuleRegistry(basedir: string) { } const requireTransformer = (specifier: string, from: string) => { - // this one is deprecated - if (specifier === "$shared") specifier = "./internal/shared.ts"; - const directMatch = internalRegistry.get(specifier); if (directMatch) return codegenRequireId(`${directMatch}/*${specifier}*/`); diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index f9736608dd..48495efdeb 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -258,130 +258,8 @@ export function initializeNextTickQueue(process, nextTickQueue, drainMicrotasksF var setup; setup = () => { - queue = (function createQueue() { - // Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two. - const kSize = 2048; - const kMask = kSize - 1; - - // The FixedQueue is implemented as a singly-linked list of fixed-size - // circular buffers. It looks something like this: - // - // head tail - // | | - // v v - // +-----------+ <-----\ +-----------+ <------\ +-----------+ - // | [null] | \----- | next | \------- | next | - // +-----------+ +-----------+ +-----------+ - // | item | <-- bottom | item | <-- bottom | [empty] | - // | item | | item | | [empty] | - // | item | | item | | [empty] | - // | item | | item | | [empty] | - // | item | | item | bottom --> | item | - // | item | | item | | item | - // | ... | | ... | | ... | - // | item | | item | | item | - // | item | | item | | item | - // | [empty] | <-- top | item | | item | - // | [empty] | | item | | item | - // | [empty] | | [empty] | <-- top top --> | [empty] | - // +-----------+ +-----------+ +-----------+ - // - // Or, if there is only one circular buffer, it looks something - // like either of these: - // - // head tail head tail - // | | | | - // v v v v - // +-----------+ +-----------+ - // | [null] | | [null] | - // +-----------+ +-----------+ - // | [empty] | | item | - // | [empty] | | item | - // | item | <-- bottom top --> | [empty] | - // | item | | [empty] | - // | [empty] | <-- top bottom --> | item | - // | [empty] | | item | - // +-----------+ +-----------+ - // - // Adding a value means moving `top` forward by one, removing means - // moving `bottom` forward by one. After reaching the end, the queue - // wraps around. - // - // When `top === bottom` the current queue is empty and when - // `top + 1 === bottom` it's full. This wastes a single space of storage - // but allows much quicker checks. - - class FixedCircularBuffer { - top: number; - bottom: number; - list: Array; - next: FixedCircularBuffer | null; - - constructor() { - this.bottom = 0; - this.top = 0; - this.list = $newArrayWithSize(kSize); - this.next = null; - } - - isEmpty() { - return this.top === this.bottom; - } - - isFull() { - return ((this.top + 1) & kMask) === this.bottom; - } - - push(data) { - this.list[this.top] = data; - this.top = (this.top + 1) & kMask; - } - - shift() { - var { list, bottom } = this; - const nextItem = list[bottom]; - if (nextItem === undefined) return null; - list[bottom] = undefined; - this.bottom = (bottom + 1) & kMask; - return nextItem; - } - } - - class FixedQueue { - head: FixedCircularBuffer; - tail: FixedCircularBuffer; - - constructor() { - this.head = this.tail = new FixedCircularBuffer(); - } - - isEmpty() { - return this.head.isEmpty(); - } - - push(data) { - if (this.head.isFull()) { - // Head is full: Creates a new queue, sets the old queue's `.next` to it, - // and sets it as the new main queue. - this.head = this.head.next = new FixedCircularBuffer(); - } - this.head.push(data); - } - - shift() { - const tail = this.tail; - const next = tail.shift(); - if (tail.isEmpty() && tail.next !== null) { - // If there is another queue, it forms the new tail. - this.tail = tail.next; - tail.next = null; - } - return next; - } - } - - return new FixedQueue(); - })(); + const { FixedQueue } = require("internal/fixed_queue"); + queue = new FixedQueue(); function processTicksAndRejections() { var tock; diff --git a/src/js/internal/fixed_queue.ts b/src/js/internal/fixed_queue.ts new file mode 100644 index 0000000000..767eabe32f --- /dev/null +++ b/src/js/internal/fixed_queue.ts @@ -0,0 +1,127 @@ +// https://github.com/nodejs/node/blob/bae03c4e30f927676203f61ff5a34fe0a0c0bbc9/lib/internal/fixed_queue.js + +// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two. +const kSize = 2048; +const kMask = kSize - 1; + +// The FixedQueue is implemented as a singly-linked list of fixed-size +// circular buffers. It looks something like this: +// +// head tail +// | | +// v v +// +-----------+ <-----\ +-----------+ <------\ +-----------+ +// | [null] | \----- | next | \------- | next | +// +-----------+ +-----------+ +-----------+ +// | item | <-- bottom | item | <-- bottom | [empty] | +// | item | | item | | [empty] | +// | item | | item | | [empty] | +// | item | | item | | [empty] | +// | item | | item | bottom --> | item | +// | item | | item | | item | +// | ... | | ... | | ... | +// | item | | item | | item | +// | item | | item | | item | +// | [empty] | <-- top | item | | item | +// | [empty] | | item | | item | +// | [empty] | | [empty] | <-- top top --> | [empty] | +// +-----------+ +-----------+ +-----------+ +// +// Or, if there is only one circular buffer, it looks something +// like either of these: +// +// head tail head tail +// | | | | +// v v v v +// +-----------+ +-----------+ +// | [null] | | [null] | +// +-----------+ +-----------+ +// | [empty] | | item | +// | [empty] | | item | +// | item | <-- bottom top --> | [empty] | +// | item | | [empty] | +// | [empty] | <-- top bottom --> | item | +// | [empty] | | item | +// +-----------+ +-----------+ +// +// Adding a value means moving `top` forward by one, removing means +// moving `bottom` forward by one. After reaching the end, the queue +// wraps around. +// +// When `top === bottom` the current queue is empty and when +// `top + 1 === bottom` it's full. This wastes a single space of storage +// but allows much quicker checks. + +class FixedCircularBuffer { + top: number; + bottom: number; + list: Array; + next: FixedCircularBuffer | null; + + constructor() { + this.bottom = 0; + this.top = 0; + this.list = $newArrayWithSize(kSize); + this.next = null; + } + + isEmpty() { + return this.top === this.bottom; + } + + isFull() { + return ((this.top + 1) & kMask) === this.bottom; + } + + push(data) { + this.list[this.top] = data; + this.top = (this.top + 1) & kMask; + } + + shift() { + var { list, bottom } = this; + const nextItem = list[bottom]; + if (nextItem === undefined) return null; + list[bottom] = undefined; + this.bottom = (bottom + 1) & kMask; + return nextItem; + } +} + +class FixedQueue { + head: FixedCircularBuffer; + tail: FixedCircularBuffer; + + constructor() { + this.head = this.tail = new FixedCircularBuffer(); + } + + isEmpty() { + return this.head.isEmpty(); + } + + push(data: T) { + if (this.head.isFull()) { + // Head is full: Creates a new queue, sets the old queue's `.next` to it, + // and sets it as the new main queue. + this.head = this.head.next = new FixedCircularBuffer(); + } + this.head.push(data); + } + + shift() { + const tail = this.tail; + const next = tail.shift(); + if (tail.isEmpty() && tail.next !== null) { + // If there is another queue, it forms the new tail. + this.tail = tail.next; + tail.next = null; + } + return next; + } +} + +export default { + FixedCircularBuffer, + FixedQueue, +}; diff --git a/src/js/node/cluster.ts b/src/js/node/cluster.ts index 8c1ebdd097..3b71f7439c 100644 --- a/src/js/node/cluster.ts +++ b/src/js/node/cluster.ts @@ -3,7 +3,7 @@ // We leave it in here to provide a better error message // TODO: implement node cluster const EventEmitter = require("node:events"); -const { throwNotImplemented } = require("$shared"); +const { throwNotImplemented } = require("internal/shared"); // TODO: is it okay for this to be a class? class Cluster extends EventEmitter { diff --git a/src/js/node/dgram.ts b/src/js/node/dgram.ts index dd6f2895d4..b7abe9156a 100644 --- a/src/js/node/dgram.ts +++ b/src/js/node/dgram.ts @@ -1,6 +1,6 @@ // Hardcoded module "node:dgram" // This is a stub! None of this is actually implemented yet. -const { hideFromStack, throwNotImplemented } = require("$shared"); +const { hideFromStack, throwNotImplemented } = require("internal/shared"); function createSocket() { throwNotImplemented("node:dgram createSocket", 1630); diff --git a/src/js/node/events.js b/src/js/node/events.js index 0c74345184..ba7bf39154 100644 --- a/src/js/node/events.js +++ b/src/js/node/events.js @@ -1,8 +1,10 @@ // Reimplementation of https://nodejs.org/api/events.html + // Reference: https://github.com/nodejs/node/blob/main/lib/events.js -const { throwNotImplemented } = require("$shared"); +const { throwNotImplemented } = require("internal/shared"); const SymbolFor = Symbol.for; + const kCapture = Symbol("kCapture"); const kErrorMonitor = SymbolFor("events.errorMonitor"); const kMaxEventTargetListeners = Symbol("events.maxEventTargetListeners"); @@ -354,9 +356,52 @@ function once(emitter, type, options) { }); } -function on(emitter, type, options) { - var { signal, close, highWatermark = Number.MAX_SAFE_INTEGER, lowWatermark = 1 } = options || {}; - throwNotImplemented("events.on", 2679); +async function* on(emitter, event, options = {}) { + const signal = options.signal; + if (signal?.aborted) throw new AbortError(undefined, { cause: signal?.reason }); + + const { FixedQueue } = require("internal/fixed_queue"); + const unconsumedPromises = new FixedQueue(); + const unconsumedEvents = new FixedQueue(); + const unconsumedErrors = new FixedQueue(); + let done = false; + + emitter.on(event, ev => { + if (!unconsumedPromises.isEmpty()) { + const { resolve } = unconsumedPromises.shift(); + return resolve([ev]); + } + unconsumedEvents.push([ev]); + }); + emitter.on("error", ex => { + if (!unconsumedPromises.isEmpty()) { + const { reject } = unconsumedPromises.shift(); + return reject(ex); + } + unconsumedErrors.push(ex); + }); + signal?.addEventListener("abort", () => { + emitter.emit("error", new AbortError(undefined, { cause: signal?.reason })); + }); + + for (const evName of options?.close || []) { + emitter.on(evName, () => { + emitter.emit(event, undefined); + done = true; + }); + } + + while (!done) { + if (!unconsumedEvents.isEmpty()) { + yield Promise.$resolve(unconsumedEvents.shift()); + } + if (!unconsumedErrors.isEmpty()) { + yield Promise.$reject(unconsumedErrors.shift()); + } + const { promise, reject, resolve } = $newPromiseCapability(Promise); + unconsumedPromises.push({ reject, resolve }); + yield promise; + } } function getEventListeners(emitter, type) { diff --git a/src/js/node/http2.ts b/src/js/node/http2.ts index c0f030d94a..4375f15869 100644 --- a/src/js/node/http2.ts +++ b/src/js/node/http2.ts @@ -3,7 +3,7 @@ const { isTypedArray } = require("node:util/types"); // This is a stub! None of this is actually implemented yet. -const { hideFromStack, throwNotImplemented } = require("$shared"); +const { hideFromStack, throwNotImplemented } = require("internal/shared"); const tls = require("node:tls"); const net = require("node:net"); diff --git a/src/js/node/inspector.ts b/src/js/node/inspector.ts index 1fd7316f74..4e32e6e428 100644 --- a/src/js/node/inspector.ts +++ b/src/js/node/inspector.ts @@ -1,6 +1,6 @@ // Hardcoded module "node:inspector" and "node:inspector/promises" // This is a stub! None of this is actually implemented yet. -const { hideFromStack, throwNotImplemented } = require("$shared"); +const { hideFromStack, throwNotImplemented } = require("internal/shared"); const EventEmitter = require("node:events"); function open() { diff --git a/src/js/node/repl.ts b/src/js/node/repl.ts index 10ca90e391..d613731db3 100644 --- a/src/js/node/repl.ts +++ b/src/js/node/repl.ts @@ -1,7 +1,7 @@ // Hardcoded module "node:repl" // This is a stub! None of this is actually implemented yet. // It only exists to make some packages which import this module work. -const { throwNotImplemented } = require("$shared"); +const { throwNotImplemented } = require("internal/shared"); function REPLServer() { throwNotImplemented("node:repl REPLServer"); diff --git a/src/js/node/v8.ts b/src/js/node/v8.ts index 2aa49451fd..a4968eebc6 100644 --- a/src/js/node/v8.ts +++ b/src/js/node/v8.ts @@ -1,6 +1,6 @@ // Hardcoded module "node:v8" // This is a stub! None of this is actually implemented yet. -const { hideFromStack, throwNotImplemented } = require("$shared"); +const { hideFromStack, throwNotImplemented } = require("internal/shared"); const jsc: typeof import("bun:jsc") = require("bun:jsc"); function notimpl(message) { diff --git a/src/js/node/vm.ts b/src/js/node/vm.ts index e3058780ae..7d2b5aa2cf 100644 --- a/src/js/node/vm.ts +++ b/src/js/node/vm.ts @@ -1,5 +1,5 @@ // Hardcoded module "node:vm" -const { throwNotImplemented } = require("$shared"); +const { throwNotImplemented } = require("internal/shared"); const vm = $lazy("vm"); diff --git a/src/js/tsconfig.json b/src/js/tsconfig.json index fdca4935e1..4d278a7c62 100644 --- a/src/js/tsconfig.json +++ b/src/js/tsconfig.json @@ -7,7 +7,6 @@ "noEmit": true, "emitDeclarationOnly": false, "paths": { - "$shared": ["./internal/shared.ts"], //deprecated "internal/*": ["./internal/*"] //deprecated } }, diff --git a/test/js/node/events/events-cjs.test.js b/test/js/node/events/events-cjs.test.js index 5bee9979fc..1f2e3399b0 100644 --- a/test/js/node/events/events-cjs.test.js +++ b/test/js/node/events/events-cjs.test.js @@ -1,4 +1,93 @@ test("in cjs, events is callable", () => { - const events = require("events"); - new events(); + const EventEmitter = require("events"); + new EventEmitter(); +}); + +test("events.on", async () => { + const { on, EventEmitter } = require("node:events"); + const process = require("node:process"); + + const ee = new EventEmitter(); + const output = []; + + // Emit later on + process.nextTick(() => { + ee.emit("foo", "bar"); + ee.emit("foo", 42); + }); + + setTimeout(() => { + ee.emit("error", "DONE"); + }, 1_000); + + try { + for await (const event of on(ee, "foo")) { + output.push([1, event]); + } + } catch (error) { + output.push([2, error]); + } + + expect(output).toEqual([ + [1, ["bar"]], + [1, [42]], + [2, "DONE"], + ]); +}); + +test("events.on with AbortController", () => { + const { on, EventEmitter } = require("node:events"); + + const ac = new AbortController(); + const ee = new EventEmitter(); + const output = []; + + process.nextTick(() => { + ee.emit("foo", "bar"); + ee.emit("foo", 42); + }); + (async () => { + try { + for await (const event of on(ee, "foo", { signal: ac.signal })) { + output.push([1, event]); + } + console.log("unreachable"); + } catch (error) { + const { code, message } = error; + output.push([2, { code, message }]); + + expect(output).toEqual([ + [1, ["bar"]], + [1, [42]], + [ + 2, + { + code: "ABORT_ERR", + message: "The operation was aborted", + }, + ], + ]); + } + })(); + + process.nextTick(() => ac.abort()); +}); + +test("readline.createInterface", async () => { + const { createInterface } = require("node:readline"); + const { createReadStream } = require("node:fs"); + const path = require("node:path"); + + const fpath = path.join(__filename, "..", "..", "child_process", "fixtures", "child-process-echo-options.js"); + console.log(fpath); + const interfaced = createInterface(createReadStream(fpath)); + const output = []; + + try { + for await (const line of interfaced) { + output.push(line); + } + } catch (e) { + expect(output).toBe(["// TODO - bun has no `send` method in the process", "process?.send({ env: process.env });"]); + } });