Files
bun.sh/src/js/node/worker_threads.ts
Jérôme Benoit 85a0d71c52 fix(worker_threads): off by one on threadId (#6671)
* fix(worker_threads): off by one on threadId

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: refine worker_threads threadId consistency test

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: improve worker_threads tests

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: fix worker_threads threadId consistency test

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: simplify worker_threads threadId consistency test

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: refine worker_threads threadId consistency test

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>

* refactor: cleanup import on worker_threads tests

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: switch worker_threads worker to TS

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: revert wrong refactoring

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: format

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: port worker_threads worker to ESM

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: cleanup worker_threads test

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>

* test: improve worker_threads coverage

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>

---------

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
2023-10-24 17:05:22 -07:00

337 lines
7.5 KiB
TypeScript

// import type { Readable, Writable } from "node:stream";
// import type { WorkerOptions } from "node:worker_threads";
declare const self: typeof globalThis;
type WebWorker = InstanceType<typeof globalThis.Worker>;
const EventEmitter = require("node:events");
const { throwNotImplemented } = require("../internal/shared");
const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis;
const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
const isMainThread = Bun.isMainThread;
let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");
type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;
const emittedWarnings = new Set();
function emitWarning(type, message) {
if (emittedWarnings.has(type)) return;
emittedWarnings.add(type);
// process.emitWarning(message); // our printing is bad
console.warn("[bun] Warning:", message);
}
function injectFakeEmitter(Class) {
function messageEventHandler(event: MessageEvent) {
return event.data;
}
function errorEventHandler(event: ErrorEvent) {
return event.error;
}
const wrappedListener = Symbol("wrappedListener");
function wrapped(run, listener) {
const callback = function (event) {
return listener(run(event));
};
listener[wrappedListener] = callback;
return callback;
}
function functionForEventType(event, listener) {
switch (event) {
case "error":
case "messageerror": {
return wrapped(errorEventHandler, listener);
}
default: {
return wrapped(messageEventHandler, listener);
}
}
}
Class.prototype.on = function (event, listener) {
this.addEventListener(event, functionForEventType(event, listener));
return this;
};
Class.prototype.off = function (event, listener) {
if (listener) {
this.removeEventListener(event, listener[wrappedListener] || listener);
} else {
this.removeEventListener(event);
}
return this;
};
Class.prototype.once = function (event, listener) {
this.addEventListener(event, functionForEventType(event, listener), { once: true });
return this;
};
function EventClass(eventName) {
if (eventName === "error" || eventName === "messageerror") {
return ErrorEvent;
}
return MessageEvent;
}
Class.prototype.emit = function (event, ...args) {
this.dispatchEvent(new (EventClass(event))(event, ...args));
return this;
};
Class.prototype.prependListener = Class.prototype.on;
Class.prototype.prependOnceListener = Class.prototype.once;
}
const _MessagePort = globalThis.MessagePort;
injectFakeEmitter(_MessagePort);
const MessagePort = _MessagePort;
let resourceLimits = {};
let workerData = _workerData;
let threadId = _threadId;
function receiveMessageOnPort(port: MessagePort) {
let res = _receiveMessageOnPort(port);
if (!res) return undefined;
return {
message: res,
};
}
function fakeParentPort() {
const fake = Object.create(MessagePort.prototype);
Object.defineProperty(fake, "onmessage", {
get() {
return self.onmessage;
},
set(value) {
self.onmessage = value;
},
});
Object.defineProperty(fake, "onmessageerror", {
get() {
return self.onmessageerror;
},
set(value) {},
});
Object.defineProperty(fake, "postMessage", {
value(...args: [any, any]) {
return self.postMessage(...args);
},
});
Object.defineProperty(fake, "close", {
value() {
return process.exit(0);
},
});
Object.defineProperty(fake, "start", {
value() {},
});
Object.defineProperty(fake, "unref", {
value() {},
});
Object.defineProperty(fake, "ref", {
value() {},
});
Object.defineProperty(fake, "hasRef", {
value() {
return false;
},
});
Object.defineProperty(fake, "setEncoding", {
value() {},
});
Object.defineProperty(fake, "addEventListener", {
value: self.addEventListener.bind(self),
});
Object.defineProperty(fake, "removeEventListener", {
value: self.removeEventListener.bind(self),
});
return fake;
}
let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
function getEnvironmentData() {
return process.env;
}
function setEnvironmentData(env: any) {
process.env = env;
}
function markAsUntransferable() {
throwNotImplemented("worker_threads.markAsUntransferable");
}
function moveMessagePortToContext() {
throwNotImplemented("worker_threads.moveMessagePortToContext");
}
const unsupportedOptions = [
"eval",
"argv",
"execArgv",
"stdin",
"stdout",
"stderr",
"trackedUnmanagedFds",
"resourceLimits",
];
class Worker extends EventEmitter {
#worker: WebWorker;
#performance;
// this is used by wt.Worker.terminate();
// either is the exit code if exited, a promise resolving to the exit code, or undefined if we haven't sent .terminate() yet
#onExitPromise: Promise<number> | number | undefined = undefined;
constructor(filename: string, options: NodeWorkerOptions = {}) {
super();
for (const key of unsupportedOptions) {
if (key in options) {
emitWarning("option." + key, `worker_threads.Worker option "${key}" is not implemented.`);
}
}
this.#worker = new WebWorker(filename, options);
this.#worker.addEventListener("close", this.#onClose.bind(this));
this.#worker.addEventListener("error", this.#onError.bind(this));
this.#worker.addEventListener("message", this.#onMessage.bind(this));
this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this));
this.#worker.addEventListener("open", this.#onOpen.bind(this));
}
get threadId() {
return this.#worker.threadId;
}
ref() {
this.#worker.ref();
}
unref() {
this.#worker.unref();
}
get stdin() {
// TODO:
return null;
}
get stdout() {
// TODO:
return null;
}
get stderr() {
// TODO:
return null;
}
get performance() {
return (this.#performance ??= {
eventLoopUtilization() {
emitWarning("performance", "worker_threads.Worker.performance is not implemented.");
return {
idle: 0,
active: 0,
utilization: 0,
};
},
});
}
terminate() {
const onExitPromise = this.#onExitPromise;
if (onExitPromise) {
return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise);
}
const { resolve, promise } = Promise.withResolvers();
this.#worker.addEventListener(
"close",
event => {
resolve(event.code);
},
{ once: true },
);
this.#worker.terminate();
return (this.#onExitPromise = promise);
}
postMessage(...args: [any, any]) {
return this.#worker.postMessage(...args);
}
#onClose(e) {
this.#onExitPromise = e.code;
this.emit("exit", e.code);
}
#onError(error: ErrorEvent) {
this.emit("error", error);
}
#onMessage(event: MessageEvent) {
// TODO: is this right?
this.emit("message", event.data);
}
#onMessageError(event: MessageEvent) {
// TODO: is this right?
this.emit("messageerror", (event as any).error ?? event.data ?? event);
}
#onOpen() {
this.emit("online");
}
async getHeapSnapshot() {
throwNotImplemented("worker_threads.Worker.getHeapSnapshot");
}
}
export default {
Worker,
workerData,
parentPort,
resourceLimits,
isMainThread,
MessageChannel,
BroadcastChannel,
MessagePort,
getEnvironmentData,
setEnvironmentData,
getHeapSnapshot() {
return {};
},
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
SHARE_ENV,
threadId,
};