diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index 8db7b462b3..ccc3d20f46 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -60,6 +60,8 @@ #include #include #include "MessageEvent.h" +#include "MessageChannel.h" +#include "AddEventListenerOptions.h" #include "BunWorkerGlobalScope.h" #include "CloseEvent.h" #include "JSMessagePort.h" @@ -69,6 +71,59 @@ namespace WebCore { WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker); +// Event listener that forwards messages from a MessagePort to a Worker object +// https://github.com/nodejs/node/blob/e1fc3dc2fcf19d9278ab59c353aa1fa59290378b/lib/internal/worker.js#L331-L335 +class WorkerMessageForwarder final : public EventListener { +public: + static Ref create(Worker& worker) + { + return adoptRef(*new WorkerMessageForwarder(worker)); + } + + bool operator==(const EventListener& other) const override + { + return this == &other; + } + + void handleEvent(ScriptExecutionContext& context, Event& event) override + { + if (!m_worker) + return; + + if (event.type() != eventNames().messageEvent) + return; + + auto& messageEvent = static_cast(event); + + // Get the data value from the event's cache + JSC::JSValue dataValue = messageEvent.cachedData().getValue(JSC::jsNull()); + auto& vm = context.vm(); + + // Queue a task to dispatch the message to the Worker after the current event finishes + // This avoids the "event is already being dispatched" assertion + // We're already on the parent context (where m_parentPort lives), so use postTask + context.postTask([protectedWorker = Ref { *m_worker }, ports = messageEvent.ports(), strongData = JSC::Strong(vm, dataValue)](ScriptExecutionContext& ctx) mutable { + // Create a new event with the data + MessageEvent::Init init; + init.data = strongData.get(); + init.ports = WTF::move(ports); + auto newEvent = MessageEvent::create(eventNames().messageEvent, WTF::move(init), EventIsTrusted::Yes); + protectedWorker->dispatchEvent(newEvent); + }); + } + +private: + explicit WorkerMessageForwarder(Worker& worker) + : EventListener(NativeEventListenerType) + , m_worker(&worker) + { + } + + // Raw pointer is safe because the Worker owns the MessagePort which owns this listener. + // When the Worker is destroyed, the MessagePort is destroyed first. + Worker* m_worker; +}; + extern "C" void WebWorker__notifyNeedTermination( void* worker); @@ -151,6 +206,25 @@ ExceptionOr> Worker::create(ScriptExecutionContext& context, const S { auto worker = adoptRef(*new Worker(context, WTF::move(options))); + // For Node workers, create a MessagePort pair for parent-worker communication. + // The parent keeps port1 (m_parentPort) and the child gets port2 (via options). + if (worker->m_options.kind == WorkerOptions::Kind::Node) { + auto channel = MessageChannel::create(context); + worker->m_parentPort = &channel->port1(); + worker->m_parentPort->entangle(); + + // Set up a listener on the parent port that forwards messages to the Worker object + // This allows worker.on('message', ...) to receive messages sent via parentPort.postMessage() + auto forwarder = WorkerMessageForwarder::create(worker.get()); + static_cast(worker->m_parentPort.get())->addEventListener(eventNames().messageEvent, WTF::move(forwarder), {}); + worker->m_parentPort->start(); + + // Disentangle the child port from the parent context so it can be transferred to the worker + MessagePort& childPort = channel->port2(); + auto disentangledPort = childPort.disentangle(); + worker->m_options.parentPortTransferred = WTF::move(disentangledPort); + } + WTF::String url = urlInit; if (url.startsWith("file://"_s)) { WTF::URL urlObject = WTF::URL(url); @@ -224,6 +298,11 @@ ExceptionOr> Worker::create(ScriptExecutionContext& context, const S Worker::~Worker() { + // Close the parent port before member destruction begins. + // This removes the WorkerMessageForwarder listener while Worker is still fully valid. + if (m_parentPort) + m_parentPort->close(); + { Locker locker { allWorkersLock }; allWorkers().remove(m_clientIdentifier); @@ -236,6 +315,14 @@ ExceptionOr Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m if (m_terminationFlags & TerminatedFlag) return Exception { InvalidStateError, "Worker has been terminated"_s }; + // For Node workers, post through the MessagePort (m_parentPort) which delivers + // to the worker's parentPort. This avoids triggering self.onmessage which is + // Web Worker behavior, not Node worker_threads behavior. + if (m_options.kind == WorkerOptions::Kind::Node && m_parentPort) { + return m_parentPort->postMessage(state, messageValue, WTF::move(options)); + } + + // For Web Workers, dispatch to globalEventScope (which triggers self.onmessage) Vector> ports; auto serialized = SerializedScriptValue::create(state, messageValue, WTF::move(options.transfer), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage); if (serialized.hasException()) @@ -562,6 +649,7 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject) JSValue workerData = jsNull(); JSValue threadId = jsNumber(0); JSMap* environmentData = nullptr; + JSValue parentPortValue = jsNull(); if (auto* worker = WebWorker__getParentWorker(globalObject->bunVM())) { auto& options = worker->options(); @@ -583,6 +671,16 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject) // Main thread starts at 1 threadId = jsNumber(worker->clientIdentifier() - 1); + + // Entangle the parentPort MessagePort for Node workers (transferred from parent) + if (options.parentPortTransferred.has_value()) { + auto* context = globalObject->scriptExecutionContext(); + if (context) { + auto parentPort = MessagePort::entangle(*context, WTF::move(*options.parentPortTransferred)); + parentPort->start(); + parentPortValue = toJS(globalObject, globalObject, parentPort.get()); + } + } } if (!environmentData) { environmentData = JSMap::create(vm, globalObject->mapStructure()); @@ -591,12 +689,13 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject) ASSERT(environmentData); globalObject->setNodeWorkerEnvironmentData(environmentData); - JSObject* array = constructEmptyArray(globalObject, nullptr, 4); + JSObject* array = constructEmptyArray(globalObject, nullptr, 5); RETURN_IF_EXCEPTION(scope, {}); array->putDirectIndex(globalObject, 0, workerData); array->putDirectIndex(globalObject, 1, threadId); array->putDirectIndex(globalObject, 2, JSFunction::create(vm, globalObject, 1, "receiveMessageOnPort"_s, jsReceiveMessageOnPort, ImplementationVisibility::Public, NoIntrinsic)); array->putDirectIndex(globalObject, 3, environmentData); + array->putDirectIndex(globalObject, 4, parentPortValue); return array; } diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h index bbc73053dd..cacdade6f9 100644 --- a/src/bun.js/bindings/webcore/Worker.h +++ b/src/bun.js/bindings/webcore/Worker.h @@ -28,6 +28,7 @@ #include "ActiveDOMObject.h" #include "EventTarget.h" #include "WorkerOptions.h" +#include "MessagePort.h" #include #include #include @@ -120,6 +121,9 @@ private: std::atomic m_terminationFlags { 0 }; const ScriptExecutionContextIdentifier m_clientIdentifier; void* impl_ { nullptr }; + + // For Node workers: the parent-side MessagePort for communicating with the worker's parentPort + RefPtr m_parentPort; }; JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject); diff --git a/src/bun.js/bindings/webcore/WorkerOptions.h b/src/bun.js/bindings/webcore/WorkerOptions.h index 3feed8512c..07b99e36fe 100644 --- a/src/bun.js/bindings/webcore/WorkerOptions.h +++ b/src/bun.js/bindings/webcore/WorkerOptions.h @@ -34,6 +34,10 @@ struct WorkerOptions { Vector argv; // If nullopt, inherit execArgv from the parent thread std::optional> execArgv; + + // For Node workers: the transferred parentPort + // This is disentangled from the parent and entangled in the worker + std::optional parentPortTransferred; }; } // namespace WebCore diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index c6d0633dde..2bc007f327 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -25,11 +25,13 @@ const { 1: _threadId, 2: _receiveMessageOnPort, 3: environmentData, + 4: _parentPort, } = $cpp("Worker.cpp", "createNodeWorkerThreadsBinding") as [ unknown, number, (port: unknown) => unknown, Map, + MessagePort | null, ]; type NodeWorkerOptions = import("node:worker_threads").WorkerOptions; @@ -127,81 +129,15 @@ function receiveMessageOnPort(port: MessagePort) { }; } -// TODO: parent port emulation is not complete -function fakeParentPort() { - const fake = Object.create(MessagePort.prototype); - Object.defineProperty(fake, "onmessage", { - get() { - return self.onmessage; - }, - set(value) { - self.onmessage = value; - }, - }); +// For Node workers, parentPort is a real MessagePort (separate from self.onmessage). +// Messages sent via worker.postMessage() only trigger parentPort listeners, +// not self.onmessage (which is Web Worker behavior). +let parentPort: MessagePort | null = isMainThread ? null : _parentPort; - Object.defineProperty(fake, "onmessageerror", { - get() { - return self.onmessageerror; - }, - set(value) { - self.onmessageerror = value; - }, - }); - - const postMessage = $newCppFunction("ZigGlobalObject.cpp", "jsFunctionPostMessage", 1); - Object.defineProperty(fake, "postMessage", { - value(...args: [any, any]) { - return postMessage.$apply(null, args); - }, - }); - - Object.defineProperty(fake, "close", { - value() {}, - }); - - Object.defineProperty(fake, "start", { - value() {}, - }); - - Object.defineProperty(fake, "unref", { - value() {}, - }); - - Object.defineProperty(fake, "ref", { - value() {}, - }); - - Object.defineProperty(fake, "hasRef", { - value() { - return false; - }, - }); - - Object.defineProperty(fake, "setEncoding", { - value() {}, - }); - - Object.defineProperty(fake, "addEventListener", { - value: self.addEventListener.bind(self), - }); - - Object.defineProperty(fake, "removeEventListener", { - value: self.removeEventListener.bind(self), - }); - - Object.defineProperty(fake, "removeListener", { - value: self.removeEventListener.bind(self), - enumerable: false, - }); - - Object.defineProperty(fake, "addListener", { - value: self.addEventListener.bind(self), - enumerable: false, - }); - - return fake; +// Add setEncoding which is a Node.js-specific no-op for stream compatibility. +if (parentPort && !("setEncoding" in parentPort)) { + Object.defineProperty(parentPort, "setEncoding", { value() {}, enumerable: false }); } -let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort(); function getEnvironmentData(key: unknown): unknown { return environmentData.get(key); diff --git a/test/js/node/worker_threads/worker_threads.test.ts b/test/js/node/worker_threads/worker_threads.test.ts index 555c205c6f..6afe983ed1 100644 --- a/test/js/node/worker_threads/worker_threads.test.ts +++ b/test/js/node/worker_threads/worker_threads.test.ts @@ -32,6 +32,34 @@ test("support eval in worker", async () => { await worker.terminate(); }); +// In Node.js worker_threads, messages go to parentPort only, not self.onmessage. +// Libraries like fflate set both handlers, expecting only parentPort to fire. +test("worker_threads messages should not trigger self.onmessage", async () => { + const workerCode = ` +const { parentPort } = require('worker_threads'); +let selfOnMessageCount = 0; +let parentPortOnMessageCount = 0; + +self.onmessage = () => { selfOnMessageCount++; }; +parentPort.on('message', () => { + parentPortOnMessageCount++; + parentPort.postMessage({ selfOnMessageCount, parentPortOnMessageCount }); +}); +`; + const worker = new Worker(workerCode, { eval: true }); + const result = await new Promise<{ selfOnMessageCount: number; parentPortOnMessageCount: number }>( + (resolve, reject) => { + worker.on("message", resolve); + worker.on("error", reject); + worker.postMessage({ test: 1 }); + }, + ); + await worker.terminate(); + + expect(result.parentPortOnMessageCount).toBe(1); + expect(result.selfOnMessageCount).toBe(0); +}); + test("all worker_threads module properties are present", () => { expect(wt).toHaveProperty("getEnvironmentData"); expect(wt).toHaveProperty("isMainThread");