mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
fix(worker_threads): use MessagePort for parentPort instead of dispatching to self
Node.js worker_threads delivers messages only to parentPort, not to
self.onmessage. Libraries like fflate set both handlers expecting only
parentPort.on('message') to fire.
Previously, Bun dispatched messages to both self.onmessage AND
parentPort event listeners, causing handlers to run twice.
This fix creates a real MessagePort pair for Node workers:
- Parent keeps port1 (m_parentPort) for worker.postMessage()
- Worker gets port2 as parentPort via entangle()
- WorkerMessageForwarder forwards port1 messages to Worker object
This matches Node.js architecture where worker.postMessage() goes
through a MessagePort channel, not the global scope.
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%)
Claude-Steers: 0
Claude-Permission-Prompts: 4
Claude-Escapes: 0
This commit is contained in:
@@ -60,6 +60,8 @@
|
||||
#include <JavaScriptCore/JSModuleLoader.h>
|
||||
#include <JavaScriptCore/DeferredWorkTimer.h>
|
||||
#include "MessageEvent.h"
|
||||
#include "MessageChannel.h"
|
||||
#include "AddEventListenerOptions.h"
|
||||
#include "BunWorkerGlobalScope.h"
|
||||
#include "CloseEvent.h"
|
||||
#include "JSMessagePort.h"
|
||||
@@ -69,6 +71,59 @@ namespace WebCore {
|
||||
|
||||
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
|
||||
|
||||
// Event listener that forwards messages from a MessagePort to a Worker object
|
||||
// https://github.com/nodejs/node/blob/e1fc3dc2fcf19d9278ab59c353aa1fa59290378b/lib/internal/worker.js#L331-L335
|
||||
class WorkerMessageForwarder final : public EventListener {
|
||||
public:
|
||||
static Ref<WorkerMessageForwarder> create(Worker& worker)
|
||||
{
|
||||
return adoptRef(*new WorkerMessageForwarder(worker));
|
||||
}
|
||||
|
||||
bool operator==(const EventListener& other) const override
|
||||
{
|
||||
return this == &other;
|
||||
}
|
||||
|
||||
void handleEvent(ScriptExecutionContext& context, Event& event) override
|
||||
{
|
||||
if (!m_worker)
|
||||
return;
|
||||
|
||||
if (event.type() != eventNames().messageEvent)
|
||||
return;
|
||||
|
||||
auto& messageEvent = static_cast<MessageEvent&>(event);
|
||||
|
||||
// Get the data value from the event's cache
|
||||
JSC::JSValue dataValue = messageEvent.cachedData().getValue(JSC::jsNull());
|
||||
auto& vm = context.vm();
|
||||
|
||||
// Queue a task to dispatch the message to the Worker after the current event finishes
|
||||
// This avoids the "event is already being dispatched" assertion
|
||||
// We're already on the parent context (where m_parentPort lives), so use postTask
|
||||
context.postTask([protectedWorker = Ref { *m_worker }, ports = messageEvent.ports(), strongData = JSC::Strong<JSC::Unknown>(vm, dataValue)](ScriptExecutionContext& ctx) mutable {
|
||||
// Create a new event with the data
|
||||
MessageEvent::Init init;
|
||||
init.data = strongData.get();
|
||||
init.ports = WTF::move(ports);
|
||||
auto newEvent = MessageEvent::create(eventNames().messageEvent, WTF::move(init), EventIsTrusted::Yes);
|
||||
protectedWorker->dispatchEvent(newEvent);
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
explicit WorkerMessageForwarder(Worker& worker)
|
||||
: EventListener(NativeEventListenerType)
|
||||
, m_worker(&worker)
|
||||
{
|
||||
}
|
||||
|
||||
// Raw pointer is safe because the Worker owns the MessagePort which owns this listener.
|
||||
// When the Worker is destroyed, the MessagePort is destroyed first.
|
||||
Worker* m_worker;
|
||||
};
|
||||
|
||||
extern "C" void WebWorker__notifyNeedTermination(
|
||||
void* worker);
|
||||
|
||||
@@ -151,6 +206,25 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
|
||||
{
|
||||
auto worker = adoptRef(*new Worker(context, WTF::move(options)));
|
||||
|
||||
// For Node workers, create a MessagePort pair for parent-worker communication.
|
||||
// The parent keeps port1 (m_parentPort) and the child gets port2 (via options).
|
||||
if (worker->m_options.kind == WorkerOptions::Kind::Node) {
|
||||
auto channel = MessageChannel::create(context);
|
||||
worker->m_parentPort = &channel->port1();
|
||||
worker->m_parentPort->entangle();
|
||||
|
||||
// Set up a listener on the parent port that forwards messages to the Worker object
|
||||
// This allows worker.on('message', ...) to receive messages sent via parentPort.postMessage()
|
||||
auto forwarder = WorkerMessageForwarder::create(worker.get());
|
||||
static_cast<EventTarget*>(worker->m_parentPort.get())->addEventListener(eventNames().messageEvent, WTF::move(forwarder), {});
|
||||
worker->m_parentPort->start();
|
||||
|
||||
// Disentangle the child port from the parent context so it can be transferred to the worker
|
||||
MessagePort& childPort = channel->port2();
|
||||
auto disentangledPort = childPort.disentangle();
|
||||
worker->m_options.parentPortTransferred = WTF::move(disentangledPort);
|
||||
}
|
||||
|
||||
WTF::String url = urlInit;
|
||||
if (url.startsWith("file://"_s)) {
|
||||
WTF::URL urlObject = WTF::URL(url);
|
||||
@@ -224,6 +298,11 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
|
||||
|
||||
Worker::~Worker()
|
||||
{
|
||||
// Close the parent port before member destruction begins.
|
||||
// This removes the WorkerMessageForwarder listener while Worker is still fully valid.
|
||||
if (m_parentPort)
|
||||
m_parentPort->close();
|
||||
|
||||
{
|
||||
Locker locker { allWorkersLock };
|
||||
allWorkers().remove(m_clientIdentifier);
|
||||
@@ -236,6 +315,14 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
|
||||
if (m_terminationFlags & TerminatedFlag)
|
||||
return Exception { InvalidStateError, "Worker has been terminated"_s };
|
||||
|
||||
// For Node workers, post through the MessagePort (m_parentPort) which delivers
|
||||
// to the worker's parentPort. This avoids triggering self.onmessage which is
|
||||
// Web Worker behavior, not Node worker_threads behavior.
|
||||
if (m_options.kind == WorkerOptions::Kind::Node && m_parentPort) {
|
||||
return m_parentPort->postMessage(state, messageValue, WTF::move(options));
|
||||
}
|
||||
|
||||
// For Web Workers, dispatch to globalEventScope (which triggers self.onmessage)
|
||||
Vector<RefPtr<MessagePort>> ports;
|
||||
auto serialized = SerializedScriptValue::create(state, messageValue, WTF::move(options.transfer), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
|
||||
if (serialized.hasException())
|
||||
@@ -562,6 +649,7 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
|
||||
JSValue workerData = jsNull();
|
||||
JSValue threadId = jsNumber(0);
|
||||
JSMap* environmentData = nullptr;
|
||||
JSValue parentPortValue = jsNull();
|
||||
|
||||
if (auto* worker = WebWorker__getParentWorker(globalObject->bunVM())) {
|
||||
auto& options = worker->options();
|
||||
@@ -583,6 +671,16 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
|
||||
|
||||
// Main thread starts at 1
|
||||
threadId = jsNumber(worker->clientIdentifier() - 1);
|
||||
|
||||
// Entangle the parentPort MessagePort for Node workers (transferred from parent)
|
||||
if (options.parentPortTransferred.has_value()) {
|
||||
auto* context = globalObject->scriptExecutionContext();
|
||||
if (context) {
|
||||
auto parentPort = MessagePort::entangle(*context, WTF::move(*options.parentPortTransferred));
|
||||
parentPort->start();
|
||||
parentPortValue = toJS(globalObject, globalObject, parentPort.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!environmentData) {
|
||||
environmentData = JSMap::create(vm, globalObject->mapStructure());
|
||||
@@ -591,12 +689,13 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
|
||||
ASSERT(environmentData);
|
||||
globalObject->setNodeWorkerEnvironmentData(environmentData);
|
||||
|
||||
JSObject* array = constructEmptyArray(globalObject, nullptr, 4);
|
||||
JSObject* array = constructEmptyArray(globalObject, nullptr, 5);
|
||||
RETURN_IF_EXCEPTION(scope, {});
|
||||
array->putDirectIndex(globalObject, 0, workerData);
|
||||
array->putDirectIndex(globalObject, 1, threadId);
|
||||
array->putDirectIndex(globalObject, 2, JSFunction::create(vm, globalObject, 1, "receiveMessageOnPort"_s, jsReceiveMessageOnPort, ImplementationVisibility::Public, NoIntrinsic));
|
||||
array->putDirectIndex(globalObject, 3, environmentData);
|
||||
array->putDirectIndex(globalObject, 4, parentPortValue);
|
||||
return array;
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "ActiveDOMObject.h"
|
||||
#include "EventTarget.h"
|
||||
#include "WorkerOptions.h"
|
||||
#include "MessagePort.h"
|
||||
#include <JavaScriptCore/RuntimeFlags.h>
|
||||
#include <wtf/Deque.h>
|
||||
#include <wtf/MonotonicTime.h>
|
||||
@@ -120,6 +121,9 @@ private:
|
||||
std::atomic<uint8_t> m_terminationFlags { 0 };
|
||||
const ScriptExecutionContextIdentifier m_clientIdentifier;
|
||||
void* impl_ { nullptr };
|
||||
|
||||
// For Node workers: the parent-side MessagePort for communicating with the worker's parentPort
|
||||
RefPtr<MessagePort> m_parentPort;
|
||||
};
|
||||
|
||||
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);
|
||||
|
||||
@@ -34,6 +34,10 @@ struct WorkerOptions {
|
||||
Vector<String> argv;
|
||||
// If nullopt, inherit execArgv from the parent thread
|
||||
std::optional<Vector<String>> execArgv;
|
||||
|
||||
// For Node workers: the transferred parentPort
|
||||
// This is disentangled from the parent and entangled in the worker
|
||||
std::optional<TransferredMessagePort> parentPortTransferred;
|
||||
};
|
||||
|
||||
} // namespace WebCore
|
||||
|
||||
@@ -25,11 +25,13 @@ const {
|
||||
1: _threadId,
|
||||
2: _receiveMessageOnPort,
|
||||
3: environmentData,
|
||||
4: _parentPort,
|
||||
} = $cpp("Worker.cpp", "createNodeWorkerThreadsBinding") as [
|
||||
unknown,
|
||||
number,
|
||||
(port: unknown) => unknown,
|
||||
Map<unknown, unknown>,
|
||||
MessagePort | null,
|
||||
];
|
||||
|
||||
type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;
|
||||
@@ -127,81 +129,15 @@ function receiveMessageOnPort(port: MessagePort) {
|
||||
};
|
||||
}
|
||||
|
||||
// TODO: parent port emulation is not complete
|
||||
function fakeParentPort() {
|
||||
const fake = Object.create(MessagePort.prototype);
|
||||
Object.defineProperty(fake, "onmessage", {
|
||||
get() {
|
||||
return self.onmessage;
|
||||
},
|
||||
set(value) {
|
||||
self.onmessage = value;
|
||||
},
|
||||
});
|
||||
// For Node workers, parentPort is a real MessagePort (separate from self.onmessage).
|
||||
// Messages sent via worker.postMessage() only trigger parentPort listeners,
|
||||
// not self.onmessage (which is Web Worker behavior).
|
||||
let parentPort: MessagePort | null = isMainThread ? null : _parentPort;
|
||||
|
||||
Object.defineProperty(fake, "onmessageerror", {
|
||||
get() {
|
||||
return self.onmessageerror;
|
||||
},
|
||||
set(value) {
|
||||
self.onmessageerror = value;
|
||||
},
|
||||
});
|
||||
|
||||
const postMessage = $newCppFunction("ZigGlobalObject.cpp", "jsFunctionPostMessage", 1);
|
||||
Object.defineProperty(fake, "postMessage", {
|
||||
value(...args: [any, any]) {
|
||||
return postMessage.$apply(null, args);
|
||||
},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "close", {
|
||||
value() {},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "start", {
|
||||
value() {},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "unref", {
|
||||
value() {},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "ref", {
|
||||
value() {},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "hasRef", {
|
||||
value() {
|
||||
return false;
|
||||
},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "setEncoding", {
|
||||
value() {},
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "addEventListener", {
|
||||
value: self.addEventListener.bind(self),
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "removeEventListener", {
|
||||
value: self.removeEventListener.bind(self),
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "removeListener", {
|
||||
value: self.removeEventListener.bind(self),
|
||||
enumerable: false,
|
||||
});
|
||||
|
||||
Object.defineProperty(fake, "addListener", {
|
||||
value: self.addEventListener.bind(self),
|
||||
enumerable: false,
|
||||
});
|
||||
|
||||
return fake;
|
||||
// Add setEncoding which is a Node.js-specific no-op for stream compatibility.
|
||||
if (parentPort && !("setEncoding" in parentPort)) {
|
||||
Object.defineProperty(parentPort, "setEncoding", { value() {}, enumerable: false });
|
||||
}
|
||||
let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
|
||||
|
||||
function getEnvironmentData(key: unknown): unknown {
|
||||
return environmentData.get(key);
|
||||
|
||||
@@ -32,6 +32,34 @@ test("support eval in worker", async () => {
|
||||
await worker.terminate();
|
||||
});
|
||||
|
||||
// In Node.js worker_threads, messages go to parentPort only, not self.onmessage.
|
||||
// Libraries like fflate set both handlers, expecting only parentPort to fire.
|
||||
test("worker_threads messages should not trigger self.onmessage", async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
let selfOnMessageCount = 0;
|
||||
let parentPortOnMessageCount = 0;
|
||||
|
||||
self.onmessage = () => { selfOnMessageCount++; };
|
||||
parentPort.on('message', () => {
|
||||
parentPortOnMessageCount++;
|
||||
parentPort.postMessage({ selfOnMessageCount, parentPortOnMessageCount });
|
||||
});
|
||||
`;
|
||||
const worker = new Worker(workerCode, { eval: true });
|
||||
const result = await new Promise<{ selfOnMessageCount: number; parentPortOnMessageCount: number }>(
|
||||
(resolve, reject) => {
|
||||
worker.on("message", resolve);
|
||||
worker.on("error", reject);
|
||||
worker.postMessage({ test: 1 });
|
||||
},
|
||||
);
|
||||
await worker.terminate();
|
||||
|
||||
expect(result.parentPortOnMessageCount).toBe(1);
|
||||
expect(result.selfOnMessageCount).toBe(0);
|
||||
});
|
||||
|
||||
test("all worker_threads module properties are present", () => {
|
||||
expect(wt).toHaveProperty("getEnvironmentData");
|
||||
expect(wt).toHaveProperty("isMainThread");
|
||||
|
||||
Reference in New Issue
Block a user