Files
bun.sh/src/js/node/worker_threads.ts
Alistair Smith 9df2b09768 push
2025-09-22 21:55:37 -07:00

434 lines
11 KiB
TypeScript

// import type { Readable, Writable } from "node:stream";
// import type { WorkerOptions } from "node:worker_threads";
// Ambient declaration so `self` can be referenced in this file; it is typed like `globalThis`.
declare const self: typeof globalThis;
// Instance type of the global (Web) `Worker` constructor.
type WebWorker = InstanceType<typeof globalThis.Worker>;
const EventEmitter = require("node:events");
const { Readable } = require("node:stream");
const { throwNotImplemented, warnNotImplementedOnce } = require("internal/shared");
// Take the Web implementations from `globalThis`. The global `Worker` is bound to the local name
// `WebWorker` because this file declares its own `Worker` class.
const {
MessageChannel,
BroadcastChannel,
Worker: WebWorker,
} = globalThis as typeof globalThis & {
// The Worker constructor secretly takes an extra parameter to provide the node:worker_threads
// instance. This is so that it can emit the `worker` event on the process with the
// node:worker_threads instance instead of the Web Worker instance.
Worker: new (...args: [...ConstructorParameters<typeof globalThis.Worker>, nodeWorker: Worker]) => WebWorker;
};
// Unique symbol exported as `SHARE_ENV`.
const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
// Snapshot of `Bun.isMainThread` taken when this module is evaluated.
const isMainThread = Bun.isMainThread;
// Native binding, destructured by index into (per the tuple type below): the worker data value,
// the numeric thread id, the native receive-message function, and the environment-data map.
const {
0: _workerData,
1: _threadId,
2: _receiveMessageOnPort,
3: environmentData,
} = $cpp("Worker.cpp", "createNodeWorkerThreadsBinding") as [
unknown,
number,
(port: unknown) => unknown,
Map<unknown, unknown>,
];
// Alias for the `WorkerOptions` type declared by `node:worker_threads`.
type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;
// Used to ensure that Blobs created to hold the source code for `eval: true` Workers get cleaned up
// after their Worker exits
// Created lazily: it stays `undefined` until the `Worker` constructor first needs it.
let urlRevokeRegistry: FinalizationRegistry<string> | undefined = undefined;
/**
 * Adds a small Node.js `EventEmitter`-style surface (`on`, `off`, `once`, `emit`,
 * `prependListener`, `prependOnceListener`) to the prototype of an `EventTarget`-based class.
 *
 * Listeners registered through `on` / `once` are invoked with the event's payload
 * (`event.error` for "error" and "messageerror", `event.data` for every other event name)
 * instead of the event object itself.
 *
 * @param Class Constructor whose prototype is patched in place. Its instances must provide
 *   `addEventListener`, `removeEventListener` and `dispatchEvent`.
 */
function injectFakeEmitter(Class) {
  // Key under which the most recent wrapper created for a listener is stored on that listener,
  // so that `off` can find the function that was actually handed to `addEventListener`.
  const wrappedListener = Symbol("wrappedListener");

  function isErrorEventType(event) {
    return event === "error" || event === "messageerror";
  }

  function messageEventHandler(event: MessageEvent) {
    return event.data;
  }

  function errorEventHandler(event: ErrorEvent) {
    return event.error;
  }

  // Builds the EventTarget callback that extracts the payload with `run` and forwards it.
  function wrapped(run, listener) {
    const callback = function (event) {
      return listener(run(event));
    };
    listener[wrappedListener] = callback;
    return callback;
  }

  function functionForEventType(event, listener) {
    return wrapped(isErrorEventType(event) ? errorEventHandler : messageEventHandler, listener);
  }

  Class.prototype.on = function (event, listener) {
    this.addEventListener(event, functionForEventType(event, listener));
    return this;
  };

  Class.prototype.off = function (event, listener) {
    if (listener) {
      // Fall back to the listener itself for callbacks that were added via `addEventListener`.
      this.removeEventListener(event, listener[wrappedListener] || listener);
    } else {
      this.removeEventListener(event);
    }
    return this;
  };

  Class.prototype.once = function (event, listener) {
    this.addEventListener(event, functionForEventType(event, listener), { once: true });
    return this;
  };

  Class.prototype.emit = function (event, ...args) {
    // Listeners added via `on` / `once` read the payload back out of `event.data` /
    // `event.error`, so the emitted value must be placed in the event's init dictionary.
    // Passing it positionally would make the event constructor treat the payload itself as the
    // init dictionary, and the listener would never see the emitted value.
    const payload = args[0];
    const dispatched = isErrorEventType(event)
      ? new ErrorEvent(event, { error: payload })
      : new MessageEvent(event, { data: payload });
    this.dispatchEvent(dispatched);
    return this;
  };

  // EventTarget has no notion of prepending, so these are plain aliases.
  Class.prototype.prependListener = Class.prototype.on;
  Class.prototype.prependOnceListener = Class.prototype.once;
}
// Patch the global MessagePort prototype with the EventEmitter-style methods, then expose the same
// constructor under the local name `MessagePort`.
const _MessagePort = globalThis.MessagePort;
injectFakeEmitter(_MessagePort);
const MessagePort = _MessagePort;
// Exported as `resourceLimits`; initialised to an empty object.
let resourceLimits = {};
// Module-level copies of the worker data and thread id obtained from the native binding.
let workerData = _workerData;
let threadId = _threadId;
/**
 * Asks the native binding for a message on `port`.
 *
 * @param port Port to query.
 * @returns `{ message }` wrapping the native result when that result is truthy, otherwise
 *   `undefined`.
 */
function receiveMessageOnPort(port: MessagePort) {
  // NOTE(review): a falsy native result is reported as "no message"; confirm that the native
  // side never hands back a falsy value for a real message.
  const received = _receiveMessageOnPort(port);
  return received ? { message: received } : undefined;
}
// TODO: parent port emulation is not complete
/**
 * Builds the object used as `parentPort`.
 *
 * The object inherits from `MessagePort.prototype` (so it picks up `on` / `off` / `once` from
 * there) but forwards listener registration and the `onmessage` / `onmessageerror` handler slots
 * to the global `self`, and sends messages through a native `postMessage` function.
 *
 * @returns The emulated port.
 */
function fakeParentPort() {
  const fake = Object.create(MessagePort.prototype);

  // Handler slots are proxied to the ones on `self`.
  Object.defineProperty(fake, "onmessage", {
    get() {
      return self.onmessage;
    },
    set(value) {
      self.onmessage = value;
    },
  });
  Object.defineProperty(fake, "onmessageerror", {
    get() {
      return self.onmessageerror;
    },
    set(value) {
      self.onmessageerror = value;
    },
  });

  const postMessage = $newCppFunction("ZigGlobalObject.cpp", "jsFunctionPostMessage", 1);
  Object.defineProperty(fake, "postMessage", {
    value(...args: [any, any]) {
      return postMessage.$apply(null, args);
    },
  });

  // The following methods exist only so that calls do not fail; they do nothing.
  Object.defineProperty(fake, "close", {
    value() {},
  });
  Object.defineProperty(fake, "start", {
    value() {},
  });
  Object.defineProperty(fake, "unref", {
    value() {},
  });
  Object.defineProperty(fake, "ref", {
    value() {},
  });
  Object.defineProperty(fake, "hasRef", {
    value() {
      return false;
    },
  });
  Object.defineProperty(fake, "setEncoding", {
    value() {},
  });

  // Raw EventTarget registration goes straight to `self`; the inherited `on` / `off` / `once`
  // call these through `this.addEventListener` / `this.removeEventListener`.
  Object.defineProperty(fake, "addEventListener", {
    value: self.addEventListener.bind(self),
  });
  Object.defineProperty(fake, "removeEventListener", {
    value: self.removeEventListener.bind(self),
  });

  // `removeListener` / `addListener` are the EventEmitter aliases of `off` / `on`, so they must
  // go through the same payload-unwrapping path. Binding them directly to the raw EventTarget
  // methods would hand listeners the event object instead of the message. They call through
  // `fake` rather than `this` so that a detached reference keeps working.
  Object.defineProperty(fake, "removeListener", {
    value(event, listener) {
      return fake.off(event, listener);
    },
    enumerable: false,
  });
  Object.defineProperty(fake, "addListener", {
    value(event, listener) {
      return fake.on(event, listener);
    },
    enumerable: false,
  });
  return fake;
}
// `null` on the main thread, the emulated port inside a worker.
let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
/**
 * Looks up `key` in the environment-data map.
 *
 * @param key Key to look up.
 * @returns The stored value, or `undefined` when the map has no entry for `key`.
 */
function getEnvironmentData(key: unknown): unknown {
  const stored = environmentData.get(key);
  return stored;
}
/**
 * Stores `value` under `key` in the environment-data map.
 *
 * @param key Key to write.
 * @param value Value to store; passing `undefined` removes the entry instead.
 */
function setEnvironmentData(key: unknown, value: unknown): void {
  if (value !== undefined) {
    environmentData.set(key, value);
    return;
  }
  environmentData.delete(key);
}
/**
 * Placeholder for `worker_threads.markAsUntransferable`: any arguments are ignored and the call
 * is reported through `throwNotImplemented`.
 */
function markAsUntransferable(): void {
  const feature = "worker_threads.markAsUntransferable";
  throwNotImplemented(feature);
}
/**
 * Placeholder for `worker_threads.moveMessagePortToContext`: any arguments are ignored and the
 * call is reported through `throwNotImplemented`.
 */
function moveMessagePortToContext(): void {
  const feature = "worker_threads.moveMessagePortToContext";
  throwNotImplemented(feature);
}
// Option names for which the Worker constructor emits a one-time "not implemented" warning when
// a non-nullish value is supplied.
const unsupportedOptions = ["stdin", "stdout", "stderr", "trackedUnmanagedFds", "resourceLimits"];

/**
 * `node:worker_threads` `Worker`: an `EventEmitter` wrapped around a Web `Worker`.
 *
 * Events of the underlying worker are re-emitted on this object:
 * "open" as "online", "close" as "exit" (with the exit code), and "message", "messageerror"
 * and "error" under the same names with the payload unwrapped.
 */
class Worker extends EventEmitter {
  #worker: WebWorker;
  // Lazily created object returned by the `performance` getter.
  #performance;
  // this is used by terminate();
  // either is the exit code if exited, a promise resolving to the exit code, or undefined if we haven't sent .terminate() yet
  #onExitPromise: Promise<number> | number | undefined = undefined;
  // Value handed to `URL.revokeObjectURL` on construction failure or worker finalization;
  // empty when there is nothing to revoke.
  #urlToRevoke = "";

  /**
   * @param filename Script to run, or the source text itself when the eval option is truthy.
   * @param options Node-style worker options; they are passed on to the Web `Worker` constructor.
   * @throws Whatever the underlying Web `Worker` constructor throws.
   */
  constructor(filename: string, options: NodeWorkerOptions = {}) {
    super();
    for (const key of unsupportedOptions) {
      if (key in options && options[key] != null) {
        warnNotImplementedOnce(`worker_threads.Worker option "${key}"`);
      }
    }

    // The option name is assembled at runtime (see the variable name for why).
    const builtinsGeneratorHatesEval = "ev" + "a" + "l"[0];
    if (options && builtinsGeneratorHatesEval in options) {
      if (options[builtinsGeneratorHatesEval]) {
        // TODO: consider doing this step in native code and letting the Blob be cleaned up by the
        // C++ Worker object's destructor
        const blob = new Blob([filename], { type: "" });
        this.#urlToRevoke = filename = URL.createObjectURL(blob);
      } else {
        // if options.eval = false, allow the constructor below to fail, if
        // we convert the code to a blob, it will succeed.
        this.#urlToRevoke = filename;
      }
    }

    try {
      this.#worker = new WebWorker(filename, options as Bun.WorkerOptions, this);
    } catch (e) {
      if (this.#urlToRevoke) {
        URL.revokeObjectURL(this.#urlToRevoke);
      }
      throw e;
    }

    this.#worker.addEventListener("close", this.#onClose.bind(this), { once: true });
    this.#worker.addEventListener("error", this.#onError.bind(this));
    this.#worker.addEventListener("message", this.#onMessage.bind(this));
    this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this));
    this.#worker.addEventListener("open", this.#onOpen.bind(this), { once: true });

    if (this.#urlToRevoke) {
      if (!urlRevokeRegistry) {
        urlRevokeRegistry = new FinalizationRegistry<string>(url => {
          URL.revokeObjectURL(url);
        });
      }
      // Revoke the URL once the underlying worker object has been garbage collected.
      urlRevokeRegistry.register(this.#worker, this.#urlToRevoke);
    }
  }

  /** Thread id reported by the underlying worker. */
  get threadId() {
    return this.#worker.threadId;
  }

  ref() {
    this.#worker.ref();
  }

  unref() {
    this.#worker.unref();
  }

  get stdin() {
    // TODO:
    return null;
  }

  get stdout() {
    // TODO:
    return null;
  }

  get stderr() {
    // TODO:
    return null;
  }

  /**
   * Stub performance object: `eventLoopUtilization()` warns once that it is not implemented and
   * reports all-zero values.
   */
  get performance() {
    return (this.#performance ??= {
      eventLoopUtilization() {
        warnNotImplementedOnce("worker_threads.Worker.performance");
        return {
          idle: 0,
          active: 0,
          utilization: 0,
        };
      },
    });
  }

  /**
   * Terminates the worker.
   *
   * @param callback Deprecated. When a function is given, a deprecation warning (DEP0132) is
   *   emitted and the function is called with `(null, exitCode)` on the worker's next "close"
   *   event.
   * @returns A promise for the exit code. Repeated calls share the first call's promise; if the
   *   worker has already exited, the promise is resolved with the recorded exit code.
   */
  terminate(callback: unknown) {
    if (typeof callback === "function") {
      process.emitWarning(
        "Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.",
        "DeprecationWarning",
        "DEP0132",
      );
      this.#worker.addEventListener("close", event => callback(null, event.code), { once: true });
    }

    const onExitPromise = this.#onExitPromise;
    // Compare against `undefined` rather than testing truthiness: a recorded exit code of 0 is a
    // valid "already exited" state, and treating it as "not terminated yet" would return a
    // promise that only settles on another "close" event.
    if (onExitPromise !== undefined) {
      return $isPromise(onExitPromise) ? onExitPromise : Promise.$resolve(onExitPromise);
    }

    const { resolve, promise } = Promise.withResolvers();
    this.#worker.addEventListener(
      "close",
      event => {
        resolve(event.code);
      },
      { once: true },
    );
    this.#worker.terminate();
    return (this.#onExitPromise = promise);
  }

  postMessage(...args: Parameters<Bun.Worker["postMessage"]>) {
    return this.#worker.postMessage.$apply(this.#worker, args);
  }

  /**
   * @param options Passed through to the underlying worker's `getHeapSnapshot`.
   * @returns A promise for a readable stream that yields the snapshot string.
   */
  getHeapSnapshot(options: unknown) {
    const stringPromise = this.#worker.getHeapSnapshot(options);
    return stringPromise.then(s => new HeapSnapshotStream(s));
  }

  // Records the exit code (so later terminate() calls can return it) and emits "exit".
  #onClose(e) {
    this.#onExitPromise = e.code;
    this.emit("exit", e.code);
  }

  #onError(event: ErrorEvent) {
    let error = event?.error;
    // if the thrown value serialized successfully, the message will be empty
    // if not the message is the actual error
    if (event.message !== "") {
      error = new Error(event.message, { cause: event });
      const stack = event?.stack;
      if (stack) {
        error.stack = stack;
      }
    }
    this.emit("error", error);
  }

  #onMessage(event: MessageEvent) {
    // TODO: is this right?
    this.emit("message", event.data);
  }

  #onMessageError(event: MessageEvent) {
    // TODO: is this right?
    this.emit("messageerror", (event as any).error ?? event.data ?? event);
  }

  #onOpen() {
    this.emit("online");
  }

  /** Disposal hook: terminates the worker and waits for the returned promise. */
  async [Symbol.asyncDispose]() {
    await this.terminate();
  }
}
/**
 * Readable stream that delivers one pre-computed heap snapshot string and then ends.
 */
class HeapSnapshotStream extends Readable {
  // Snapshot text that has not been pushed yet; `undefined` once it has been delivered.
  #json: string | undefined;

  /**
   * @param json Snapshot text to emit as the stream's only chunk.
   */
  constructor(json: string) {
    super();
    this.#json = json;
  }

  /** Pushes the snapshot followed by end-of-stream on the first call; later calls do nothing. */
  _read() {
    const pending = this.#json;
    if (pending === undefined) {
      return;
    }
    this.push(pending);
    this.push(null);
    this.#json = undefined;
  }
}
// Public surface of the module. `workerData`, `parentPort`, `resourceLimits` and `threadId` are
// copied by value from the module-level bindings when this object is created.
export default {
Worker,
workerData,
parentPort,
resourceLimits,
isMainThread,
MessageChannel,
BroadcastChannel,
MessagePort,
getEnvironmentData,
setEnvironmentData,
// Module-level stub: ignores its arguments and returns an empty object.
getHeapSnapshot() {
return {};
},
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
SHARE_ENV,
threadId,
};