Compare commits

...

3 Commits

Author SHA1 Message Date
Claude Bot
9ca5870ab8 refactor: extract channelForPort helper for lock-guarded access
Per code review feedback, moved the lock acquisition to a single helper
function channelForPort() that returns a RefPtr<MessagePortChannel>.
This reduces code duplication and makes the locking pattern clearer.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 23:04:26 +00:00
Claude Bot
eb67910ea8 test: remove stderr assertions per code review
Remove the stderr assertions that check for "Segmentation fault" and
"panic" strings, as these tests will never fail in CI. Rely on the
JSON parsing, result assertions, and exit code checks instead.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 20:24:31 +00:00
Claude Bot
f66c737409 fix(worker_threads): add thread-safety to MessagePortChannelRegistry
The MessagePortChannelRegistry's m_openChannels HashMap was being
accessed from multiple threads without synchronization, causing
race conditions and segfaults when using worker_threads with
MessageChannel (e.g., when running oxfmt).

This adds a Lock to protect all accesses to m_openChannels, fixing
the thread-safety issue.

Fixes #25610

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 20:12:41 +00:00
3 changed files with 201 additions and 7 deletions

View File

@@ -43,9 +43,16 @@ MessagePortChannelRegistry::MessagePortChannelRegistry() = default;
MessagePortChannelRegistry::~MessagePortChannelRegistry()
{
Locker locker { m_openChannelsLock };
ASSERT(m_openChannels.isEmpty());
}
RefPtr<MessagePortChannel> MessagePortChannelRegistry::channelForPort(const MessagePortIdentifier& port)
{
Locker locker { m_openChannelsLock };
return m_openChannels.get(port);
}
void MessagePortChannelRegistry::didCreateMessagePortChannel(const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
{
// LOG(MessagePorts, "Registry: Creating MessagePortChannel %p linking %s and %s", this, port1.logString().utf8().data(), port2.logString().utf8().data());
@@ -58,6 +65,7 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c
{
// ASSERT(isMainThread());
Locker locker { m_openChannelsLock };
auto result = m_openChannels.add(channel.port1(), channel);
ASSERT_UNUSED(result, result.isNewEntry);
@@ -69,6 +77,7 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel&
{
// ASSERT(isMainThread());
Locker locker { m_openChannelsLock };
ASSERT(m_openChannels.get(channel.port1()) == &channel);
ASSERT(m_openChannels.get(channel.port2()) == &channel);
@@ -83,7 +92,7 @@ void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdent
// ASSERT(isMainThread());
// The channel might be gone if the remote side was closed.
RefPtr channel = m_openChannels.get(local);
auto channel = channelForPort(local);
if (!channel)
return;
@@ -97,7 +106,7 @@ void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIden
// ASSERT(isMainThread());
// The channel might be gone if the remote side was closed.
if (RefPtr channel = m_openChannels.get(port))
if (auto channel = channelForPort(port))
channel->disentanglePort(port);
}
@@ -107,7 +116,7 @@ void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier
// LOG(MessagePorts, "Registry: MessagePort %s closed in registry", port.logString().utf8().data());
RefPtr channel = m_openChannels.get(port);
auto channel = channelForPort(port);
if (!channel)
return;
@@ -129,7 +138,7 @@ bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts&
// LOG(MessagePorts, "Registry: Posting message to MessagePort %s in registry", remoteTarget.logString().utf8().data());
// The channel might be gone if the remote side was closed.
RefPtr channel = m_openChannels.get(remoteTarget);
auto channel = channelForPort(remoteTarget);
if (!channel) {
// LOG(MessagePorts, "Registry: Could not find MessagePortChannel for port %s; It was probably closed. Message will be dropped.", remoteTarget.logString().utf8().data());
return false;
@@ -143,7 +152,7 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif
// ASSERT(isMainThread());
// The channel might be gone if the remote side was closed.
RefPtr channel = m_openChannels.get(port);
auto channel = channelForPort(port);
if (!channel) {
callback({}, [] {});
return;
@@ -159,7 +168,7 @@ std::optional<MessageWithMessagePorts> MessagePortChannelRegistry::tryTakeMessag
// LOG(MessagePorts, "Registry: Trying to take a message for MessagePort %s", port.logString().utf8().data());
// The channel might be gone if the remote side was closed.
auto* channel = m_openChannels.get(port);
auto channel = channelForPort(port);
if (!channel)
return std::nullopt;
@@ -170,6 +179,7 @@ MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(co
{
// ASSERT(isMainThread());
Locker locker { m_openChannelsLock };
return m_openChannels.get(port);
}

View File

@@ -31,6 +31,7 @@
#include "ProcessIdentifier.h"
#include <wtf/HashMap.h>
#include <wtf/CheckedRef.h>
#include <wtf/Lock.h>
namespace WebCore {
@@ -57,7 +58,10 @@ public:
WEBCORE_EXPORT void messagePortChannelDestroyed(MessagePortChannel&);
private:
UncheckedKeyHashMap<MessagePortIdentifier, WeakRef<MessagePortChannel>> m_openChannels;
RefPtr<MessagePortChannel> channelForPort(const MessagePortIdentifier&);
Lock m_openChannelsLock;
UncheckedKeyHashMap<MessagePortIdentifier, WeakRef<MessagePortChannel>> m_openChannels WTF_GUARDED_BY_LOCK(m_openChannelsLock);
};
} // namespace WebCore

View File

@@ -0,0 +1,180 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe, tempDir } from "harness";
// Test for GitHub issue #25610: Segfault and DataCloneError when using worker threads with MessageChannel
// This tests thread-safety of MessagePortChannelRegistry
test("MessageChannel between multiple workers should not crash", async () => {
using dir = tempDir("25610", {
"worker.js": `
const { parentPort, workerData } = require('worker_threads');
const { port } = workerData;
// Simulate some work
let result = 0;
for (let i = 0; i < 1000; i++) {
result += i;
}
// Send result through the port
port.postMessage({ result, workerId: workerData.id });
port.close();
parentPort.postMessage('done');
`,
"main.js": `
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
async function createWorker(id) {
const { port1, port2 } = new MessageChannel();
return new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, 'worker.js'), {
workerData: { port: port2, id },
transferList: [port2]
});
let result = null;
port1.on('message', (msg) => {
result = msg;
});
worker.on('message', () => {
port1.close();
resolve(result);
});
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0 && !result) {
reject(new Error(\`Worker \${id} exited with code \${code}\`));
}
});
});
}
async function main() {
const numWorkers = 10;
const promises = [];
// Create multiple workers concurrently to stress test MessageChannel registry
for (let i = 0; i < numWorkers; i++) {
promises.push(createWorker(i));
}
try {
const results = await Promise.all(promises);
console.log(JSON.stringify({ success: true, workerCount: results.length }));
} catch (error) {
console.log(JSON.stringify({ success: false, error: error.message }));
process.exit(1);
}
}
main();
`,
});
await using proc = Bun.spawn({
cmd: [bunExe(), "main.js"],
cwd: String(dir),
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});
const [stdout, , exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
const result = JSON.parse(stdout.trim());
expect(result.success).toBe(true);
expect(result.workerCount).toBe(10);
expect(exitCode).toBe(0);
});
test("MessageChannel with many concurrent workers should not crash", async () => {
// This tests the thread-safety of MessagePortChannelRegistry by spawning many workers
// with MessageChannel simultaneously
using dir = tempDir("25610-concurrent", {
"worker.js": `
const { parentPort, workerData } = require('worker_threads');
const { port } = workerData;
// Do some work
let sum = 0;
for (let i = 0; i < 1000; i++) {
sum += i;
}
// Signal we're done via the port
port.postMessage({ done: true, id: workerData.id, sum });
port.close();
parentPort.postMessage('finished');
`,
"main.js": `
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
async function main() {
const numWorkers = 20;
const promises = [];
for (let i = 0; i < numWorkers; i++) {
const { port1, port2 } = new MessageChannel();
const promise = new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, 'worker.js'), {
workerData: { port: port2, id: i },
transferList: [port2]
});
let result = null;
port1.on('message', (msg) => {
result = msg;
});
worker.on('message', () => {
port1.close();
resolve(result);
});
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0 && !result) {
reject(new Error('Worker ' + i + ' exited with code ' + code));
}
});
});
promises.push(promise);
}
const results = await Promise.all(promises);
console.log(JSON.stringify({ success: true, workerCount: results.length }));
}
main().catch(err => {
console.log(JSON.stringify({ success: false, error: err.message }));
process.exit(1);
});
`,
});
await using proc = Bun.spawn({
cmd: [bunExe(), "main.js"],
cwd: String(dir),
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});
const [stdout, , exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
const result = JSON.parse(stdout.trim());
expect(result.success).toBe(true);
expect(result.workerCount).toBe(20);
expect(exitCode).toBe(0);
});