Compare commits

...

8 Commits

Author SHA1 Message Date
Jarred Sumner
8bc9b2e741 Merge branch 'main' into dylan/fix-worker-threads-double-dispatch 2026-01-29 01:28:02 -08:00
Dylan Conway
45302e60b1 Merge branch 'main' into dylan/fix-worker-threads-double-dispatch 2026-01-23 10:43:15 -08:00
Jarred Sumner
657feebdf6 Merge branch 'main' into dylan/fix-worker-threads-double-dispatch 2026-01-20 22:43:03 -08:00
Claude Bot
0f59bb00b2 fix(worker_threads): dispatch messages synchronously to prevent race with exit
The previous implementation used postTask to defer message dispatch from
the parent's MessagePort to the Worker object. This caused a race condition
where the worker could exit before the queued message was processed,
resulting in the message event never firing.

Changed to dispatch the message event synchronously. This is safe because:
1. Worker is a different EventTarget from MessagePort, so no "event
   already being dispatched" assertion will fire
2. Synchronous dispatch ensures messages are processed before exit events

This fixes the test-worker-sharedarraybuffer-from-worker-thread.js failure
on Windows CI.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 01:21:49 +00:00
Claude Bot
c1da935e5e fix(test): update worker-fixture-argv.js for new parentPort behavior
The fixture was using `globalThis.addEventListener` first (which is
available in worker threads), but with the new MessagePort-based
parentPort, messages from `worker.postMessage()` now correctly go to
parentPort rather than self.onmessage. Updated the fixture to
explicitly check for parentPort availability first.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 00:42:01 +00:00
Dylan Conway
ffd80531a1 update 2026-01-19 22:05:51 -08:00
Dylan Conway
a85133aa3b fix(worker_threads): remove unused self declaration
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%)
Claude-Steers: 0
Claude-Permission-Prompts: 1
Claude-Escapes: 0
2026-01-19 21:27:35 -08:00
Dylan Conway
94d609cd69 fix(worker_threads): use MessagePort for parentPort instead of dispatching to self
Node.js worker_threads delivers messages only to parentPort, not to
self.onmessage. Libraries like fflate set both handlers expecting only
parentPort.on('message') to fire.

Previously, Bun dispatched messages to both self.onmessage AND
parentPort event listeners, causing handlers to run twice.

This fix creates a real MessagePort pair for Node workers:
- Parent keeps port1 (m_parentPort) for worker.postMessage()
- Worker gets port2 as parentPort via entangle()
- WorkerMessageForwarder forwards port1 messages to Worker object

This matches Node.js architecture where worker.postMessage() goes
through a MessagePort channel, not the global scope.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%)
Claude-Steers: 0
Claude-Permission-Prompts: 4
Claude-Escapes: 0
2026-01-19 21:22:48 -08:00
7 changed files with 169 additions and 82 deletions

View File

@@ -202,7 +202,12 @@ static inline bool setJSMessagePort_onmessageSetter(JSGlobalObject& lexicalGloba
vm.writeBarrier(&thisObject, value);
ensureStillAliveHere(value);
thisObject.wrapped().jsRef(&lexicalGlobalObject);
// Only ref when setting to a callable function; unref when clearing or setting to non-function.
// This matches Node.js behavior where setting onmessage to a non-function allows the event loop to exit.
if (value.isCallable())
thisObject.wrapped().jsRef(&lexicalGlobalObject);
else
thisObject.wrapped().jsUnref(&lexicalGlobalObject);
return true;
}

View File

@@ -60,6 +60,8 @@
#include <JavaScriptCore/JSModuleLoader.h>
#include <JavaScriptCore/DeferredWorkTimer.h>
#include "MessageEvent.h"
#include "MessageChannel.h"
#include "AddEventListenerOptions.h"
#include "BunWorkerGlobalScope.h"
#include "CloseEvent.h"
#include "JSMessagePort.h"
@@ -69,6 +71,56 @@ namespace WebCore {
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
// Event listener that forwards messages from a MessagePort to a Worker object
// https://github.com/nodejs/node/blob/e1fc3dc2fcf19d9278ab59c353aa1fa59290378b/lib/internal/worker.js#L331-L335
class WorkerMessageForwarder final : public EventListener {
public:
static Ref<WorkerMessageForwarder> create(Worker& worker)
{
return adoptRef(*new WorkerMessageForwarder(worker));
}
bool operator==(const EventListener& other) const override
{
return this == &other;
}
void handleEvent(ScriptExecutionContext& context, Event& event) override
{
if (!m_worker)
return;
if (event.type() != eventNames().messageEvent)
return;
auto& messageEvent = static_cast<MessageEvent&>(event);
// Get the data value from the event's cache
JSC::JSValue dataValue = messageEvent.cachedData().getValue(JSC::jsNull());
// Create and dispatch the message event to the Worker object synchronously.
// This is safe because Worker is a different EventTarget from the MessagePort,
// so we won't trigger "event is already being dispatched" assertions.
// Dispatching synchronously ensures message events are processed before exit events.
MessageEvent::Init init;
init.data = dataValue;
init.ports = messageEvent.ports();
auto newEvent = MessageEvent::create(eventNames().messageEvent, WTF::move(init), EventIsTrusted::Yes);
m_worker->dispatchEvent(newEvent);
}
private:
explicit WorkerMessageForwarder(Worker& worker)
: EventListener(NativeEventListenerType)
, m_worker(&worker)
{
}
// Raw pointer is safe because the Worker owns the MessagePort which owns this listener.
// When the Worker is destroyed, the MessagePort is destroyed first.
Worker* m_worker;
};
extern "C" void WebWorker__notifyNeedTermination(
void* worker);
@@ -151,6 +203,25 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
{
auto worker = adoptRef(*new Worker(context, WTF::move(options)));
// For Node workers, create a MessagePort pair for parent-worker communication.
// The parent keeps port1 (m_parentPort) and the child gets port2 (via options).
if (worker->m_options.kind == WorkerOptions::Kind::Node) {
auto channel = MessageChannel::create(context);
worker->m_parentPort = &channel->port1();
worker->m_parentPort->entangle();
// Set up a listener on the parent port that forwards messages to the Worker object
// This allows worker.on('message', ...) to receive messages sent via parentPort.postMessage()
auto forwarder = WorkerMessageForwarder::create(worker.get());
static_cast<EventTarget*>(worker->m_parentPort.get())->addEventListener(eventNames().messageEvent, WTF::move(forwarder), {});
worker->m_parentPort->start();
// Disentangle the child port from the parent context so it can be transferred to the worker
MessagePort& childPort = channel->port2();
auto disentangledPort = childPort.disentangle();
worker->m_options.parentPortTransferred = WTF::move(disentangledPort);
}
WTF::String url = urlInit;
if (url.startsWith("file://"_s)) {
WTF::URL urlObject = WTF::URL(url);
@@ -224,6 +295,11 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
Worker::~Worker()
{
// Close the parent port before member destruction begins.
// This removes the WorkerMessageForwarder listener while Worker is still fully valid.
if (m_parentPort)
m_parentPort->close();
{
Locker locker { allWorkersLock };
allWorkers().remove(m_clientIdentifier);
@@ -236,6 +312,14 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
if (m_terminationFlags & TerminatedFlag)
return Exception { InvalidStateError, "Worker has been terminated"_s };
// For Node workers, post through the MessagePort (m_parentPort) which delivers
// to the worker's parentPort. This avoids triggering self.onmessage which is
// Web Worker behavior, not Node worker_threads behavior.
if (m_options.kind == WorkerOptions::Kind::Node && m_parentPort) {
return m_parentPort->postMessage(state, messageValue, WTF::move(options));
}
// For Web Workers, dispatch to globalEventScope (which triggers self.onmessage)
Vector<RefPtr<MessagePort>> ports;
auto serialized = SerializedScriptValue::create(state, messageValue, WTF::move(options.transfer), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
if (serialized.hasException())
@@ -562,6 +646,7 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
JSValue workerData = jsNull();
JSValue threadId = jsNumber(0);
JSMap* environmentData = nullptr;
JSValue parentPortValue = jsNull();
if (auto* worker = WebWorker__getParentWorker(globalObject->bunVM())) {
auto& options = worker->options();
@@ -583,6 +668,16 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
// Main thread starts at 1
threadId = jsNumber(worker->clientIdentifier() - 1);
// Entangle the parentPort MessagePort for Node workers (transferred from parent)
if (options.parentPortTransferred.has_value()) {
auto* context = globalObject->scriptExecutionContext();
if (context) {
auto parentPort = MessagePort::entangle(*context, WTF::move(*options.parentPortTransferred));
parentPort->start();
parentPortValue = toJS(globalObject, globalObject, parentPort.get());
}
}
}
if (!environmentData) {
environmentData = JSMap::create(vm, globalObject->mapStructure());
@@ -591,12 +686,13 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
ASSERT(environmentData);
globalObject->setNodeWorkerEnvironmentData(environmentData);
JSObject* array = constructEmptyArray(globalObject, nullptr, 4);
JSObject* array = constructEmptyArray(globalObject, nullptr, 5);
RETURN_IF_EXCEPTION(scope, {});
array->putDirectIndex(globalObject, 0, workerData);
array->putDirectIndex(globalObject, 1, threadId);
array->putDirectIndex(globalObject, 2, JSFunction::create(vm, globalObject, 1, "receiveMessageOnPort"_s, jsReceiveMessageOnPort, ImplementationVisibility::Public, NoIntrinsic));
array->putDirectIndex(globalObject, 3, environmentData);
array->putDirectIndex(globalObject, 4, parentPortValue);
return array;
}

View File

@@ -28,6 +28,7 @@
#include "ActiveDOMObject.h"
#include "EventTarget.h"
#include "WorkerOptions.h"
#include "MessagePort.h"
#include <JavaScriptCore/RuntimeFlags.h>
#include <wtf/Deque.h>
#include <wtf/MonotonicTime.h>
@@ -120,6 +121,9 @@ private:
std::atomic<uint8_t> m_terminationFlags { 0 };
const ScriptExecutionContextIdentifier m_clientIdentifier;
void* impl_ { nullptr };
// For Node workers: the parent-side MessagePort for communicating with the worker's parentPort
RefPtr<MessagePort> m_parentPort;
};
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);

View File

@@ -34,6 +34,10 @@ struct WorkerOptions {
Vector<String> argv;
// If nullopt, inherit execArgv from the parent thread
std::optional<Vector<String>> execArgv;
// For Node workers: the transferred parentPort
// This is disentangled from the parent and entangled in the worker
std::optional<TransferredMessagePort> parentPortTransferred;
};
} // namespace WebCore

View File

@@ -1,6 +1,5 @@
// import type { Readable, Writable } from "node:stream";
// import type { WorkerOptions } from "node:worker_threads";
declare const self: typeof globalThis;
type WebWorker = InstanceType<typeof globalThis.Worker>;
const EventEmitter = require("node:events");
@@ -25,11 +24,13 @@ const {
1: _threadId,
2: _receiveMessageOnPort,
3: environmentData,
4: _parentPort,
} = $cpp("Worker.cpp", "createNodeWorkerThreadsBinding") as [
unknown,
number,
(port: unknown) => unknown,
Map<unknown, unknown>,
MessagePort | null,
];
type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;
@@ -127,81 +128,15 @@ function receiveMessageOnPort(port: MessagePort) {
};
}
// TODO: parent port emulation is not complete
function fakeParentPort() {
const fake = Object.create(MessagePort.prototype);
Object.defineProperty(fake, "onmessage", {
get() {
return self.onmessage;
},
set(value) {
self.onmessage = value;
},
});
// For Node workers, parentPort is a real MessagePort (separate from self.onmessage).
// Messages sent via worker.postMessage() only trigger parentPort listeners,
// not self.onmessage (which is Web Worker behavior).
let parentPort: MessagePort | null = isMainThread ? null : _parentPort;
Object.defineProperty(fake, "onmessageerror", {
get() {
return self.onmessageerror;
},
set(value) {
self.onmessageerror = value;
},
});
const postMessage = $newCppFunction("ZigGlobalObject.cpp", "jsFunctionPostMessage", 1);
Object.defineProperty(fake, "postMessage", {
value(...args: [any, any]) {
return postMessage.$apply(null, args);
},
});
Object.defineProperty(fake, "close", {
value() {},
});
Object.defineProperty(fake, "start", {
value() {},
});
Object.defineProperty(fake, "unref", {
value() {},
});
Object.defineProperty(fake, "ref", {
value() {},
});
Object.defineProperty(fake, "hasRef", {
value() {
return false;
},
});
Object.defineProperty(fake, "setEncoding", {
value() {},
});
Object.defineProperty(fake, "addEventListener", {
value: self.addEventListener.bind(self),
});
Object.defineProperty(fake, "removeEventListener", {
value: self.removeEventListener.bind(self),
});
Object.defineProperty(fake, "removeListener", {
value: self.removeEventListener.bind(self),
enumerable: false,
});
Object.defineProperty(fake, "addListener", {
value: self.addEventListener.bind(self),
enumerable: false,
});
return fake;
// Add setEncoding which is a Node.js-specific no-op for stream compatibility.
if (parentPort && !("setEncoding" in parentPort)) {
Object.defineProperty(parentPort, "setEncoding", { value() {}, enumerable: false });
}
let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
function getEnvironmentData(key: unknown): unknown {
return environmentData.get(key);

View File

@@ -32,6 +32,34 @@ test("support eval in worker", async () => {
await worker.terminate();
});
// In Node.js worker_threads, messages go to parentPort only, not self.onmessage.
// Libraries like fflate set both handlers, expecting only parentPort to fire.
test("worker_threads messages should not trigger self.onmessage", async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
let selfOnMessageCount = 0;
let parentPortOnMessageCount = 0;
self.onmessage = () => { selfOnMessageCount++; };
parentPort.on('message', () => {
parentPortOnMessageCount++;
parentPort.postMessage({ selfOnMessageCount, parentPortOnMessageCount });
});
`;
const worker = new Worker(workerCode, { eval: true });
const result = await new Promise<{ selfOnMessageCount: number; parentPortOnMessageCount: number }>(
(resolve, reject) => {
worker.on("message", resolve);
worker.on("error", reject);
worker.postMessage({ test: 1 });
},
);
await worker.terminate();
expect(result.parentPortOnMessageCount).toBe(1);
expect(result.selfOnMessageCount).toBe(0);
});
test("all worker_threads module properties are present", () => {
expect(wt).toHaveProperty("getEnvironmentData");
expect(wt).toHaveProperty("isMainThread");

View File

@@ -1,7 +1,22 @@
(globalThis.addEventListener || require("node:worker_threads").parentPort.on)("message", () => {
const postMessage = globalThis.postMessage || require("node:worker_threads").parentPort.postMessage;
postMessage({
argv: process.argv,
execArgv: process.execArgv,
// For Node workers, parentPort receives messages (not globalThis/self).
// For Web Workers, globalThis.addEventListener receives messages.
const wt = require("node:worker_threads");
const parentPort = wt.parentPort;
if (parentPort) {
// Node worker_threads
parentPort.on("message", () => {
parentPort.postMessage({
argv: process.argv,
execArgv: process.execArgv,
});
});
});
} else {
// Web Worker
globalThis.addEventListener("message", () => {
globalThis.postMessage({
argv: process.argv,
execArgv: process.execArgv,
});
});
}