Compare commits

...

2 Commits

Author SHA1 Message Date
Don Isaac
ce568fda8d Merge branch 'main' of https://github.com/oven-sh/bun into don/fix/worker-threads 2025-04-22 09:25:23 -07:00
Don Isaac
4cfcd6ade7 wip 2025-04-21 16:57:42 -04:00
21 changed files with 272 additions and 84 deletions

View File

@@ -69,7 +69,7 @@ namespace WebCore {
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
extern "C" void WebWorker__requestTerminate(
void* worker);
WebWorker* worker);
static Lock allWorkersLock;
static HashMap<ScriptExecutionContextIdentifier, Worker*>& allWorkers() WTF_REQUIRES_LOCK(allWorkersLock)
@@ -107,8 +107,8 @@ Worker::Worker(ScriptExecutionContext& context, WorkerOptions&& options)
auto addResult = allWorkers().add(m_clientIdentifier, this);
ASSERT_UNUSED(addResult, addResult.isNewEntry);
}
extern "C" bool WebWorker__updatePtr(void* worker, Worker* ptr);
extern "C" void* WebWorker__create(
extern "C" bool WebWorker__updatePtr(WebWorker* worker, Worker* ptr);
extern "C" WebWorker* WebWorker__create(
Worker* worker,
void* parent,
BunString name,
@@ -188,7 +188,7 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
return { reinterpret_cast<WTF::StringImpl**>(vec.data()), vec.size() };
})
.value_or(std::span<WTF::StringImpl*> {});
void* impl = WebWorker__create(
WebWorker* impl = WebWorker__create(
worker.ptr(),
jsCast<Zig::GlobalObject*>(context.jsGlobalObject())->bunVM(),
nameStr,
@@ -467,7 +467,7 @@ extern "C" void WebWorker__dispatchError(Zig::GlobalObject* globalObject, Worker
worker->dispatchError(message.toWTFString(BunString::ZeroCopy));
}
extern "C" WebCore::Worker* WebWorker__getParentWorker(void*);
extern "C" WebCore::Worker* WebWorker__getParentWorker(void* bunVm);
JSC_DEFINE_HOST_FUNCTION(jsReceiveMessageOnPort, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
{

View File

@@ -55,6 +55,9 @@ class WorkerGlobalScopeProxy;
struct StructuredSerializeOptions;
struct WorkerOptions;
// see web_worker.zig
struct WebWorker;
class Worker final : public ThreadSafeRefCounted<Worker>, public EventTargetWithInlineData, private ContextDestructionObserver {
WTF_MAKE_TZONE_ALLOCATED(Worker);
@@ -142,7 +145,7 @@ private:
// Tracks TerminateRequestedFlag and TerminatedFlag
std::atomic<uint8_t> m_terminationFlags { 0 };
const ScriptExecutionContextIdentifier m_clientIdentifier;
void* impl_ { nullptr };
WebWorker* impl_ { nullptr };
};
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);

View File

@@ -212,8 +212,11 @@ class Worker extends EventEmitter {
#urlToRevoke = "";
#isRunning = false;
constructor(filename: string, options: NodeWorkerOptions = {}) {
constructor(filename: URL | string, options: NodeWorkerOptions = {}) {
super();
if ($isUndefinedOrNull(filename) || !(typeof filename === "string" || filename instanceof URL)) {
throw $ERR_INVALID_ARG_TYPE("filename", "string or an instance of URL", filename);
}
for (const key of unsupportedOptions) {
if (key in options && options[key] != null) {
warnNotImplementedOnce(`worker_threads.Worker option "${key}"`);

View File

@@ -0,0 +1,56 @@
'use strict';
// This test ensures that CryptoKey instances can be correctly
// sent to a Worker via postMessage.
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const { subtle } = globalThis.crypto;
const { once } = require('events');
const {
Worker,
parentPort,
} = require('worker_threads');
const keyData =
Buffer.from(
'000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f', 'hex');
const sig = '13691a79fb55a0417e4d6699a32f91ad29283fa2c1439865cc0632931f4f48dc';
async function doSig(key) {
const signature = await subtle.sign({
name: 'HMAC'
}, key, Buffer.from('some data'));
assert.strictEqual(Buffer.from(signature).toString('hex'), sig);
}
if (process.env.HAS_STARTED_WORKER) {
return parentPort.once('message', (key) => {
assert.strictEqual(key.algorithm.name, 'HMAC');
doSig(key).then(common.mustCall());
});
}
// Don't use isMainThread to allow running this test inside a worker.
process.env.HAS_STARTED_WORKER = 1;
(async function() {
const worker = new Worker(__filename);
await once(worker, 'online');
const key = await subtle.importKey(
'raw',
keyData,
{ name: 'HMAC', hash: 'SHA-256' },
true, ['sign', 'verify']);
worker.postMessage(key);
await doSig(key);
})().then(common.mustCall());

View File

@@ -0,0 +1,30 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
const { once } = require('events');
(async function() {
const w = new Worker('', { eval: true });
await once(w, 'exit');
await assert.rejects(() => w.getHeapSnapshot(), {
name: 'Error',
code: 'ERR_WORKER_NOT_RUNNING'
});
})().then(common.mustCall());
(async function() {
const worker = new Worker('setInterval(() => {}, 1000);', { eval: true });
await once(worker, 'online');
[1, true, [], null, Infinity, NaN].forEach((i) => {
assert.throws(() => worker.getHeapSnapshot(i), {
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError',
message: 'The "options" argument must be of type object.' +
common.invalidArgTypeHelper(i)
});
});
await worker.terminate();
})().then(common.mustCall());

View File

@@ -0,0 +1,11 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
// Regression for https://github.com/nodejs/node/issues/43182.
const w = new Worker(new URL('data:text/javascript,process.exit(1);await new Promise(()=>{ process.exit(2); })'));
w.on('exit', common.mustCall((code) => {
assert.strictEqual(code, 1);
}));

View File

@@ -0,0 +1,16 @@
'use strict';
const common = require('../common');
const { once } = require('events');
const { Worker } = require('worker_threads');
// Test that calling worker.terminate() on an unref()ed Worker instance
// still resolves the returned Promise.
async function test() {
const worker = new Worker('setTimeout(() => {}, 1000000);', { eval: true });
await once(worker, 'online');
worker.unref();
await worker.terminate();
}
test().then(common.mustCall());

View File

@@ -0,0 +1,29 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
{
[
undefined,
null,
false,
0,
Symbol('test'),
{},
[],
() => {},
].forEach((val) => {
assert.throws(
() => new Worker(val),
{
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError',
message: 'The "filename" argument must be of type string ' +
'or an instance of URL.' +
common.invalidArgTypeHelper(val)
}
);
});
}

View File

@@ -1,8 +1,16 @@
import { describe, expect, test } from "bun:test";
import { describe, expect, test, it } from "bun:test";
import { bunEnv, bunExe } from "harness";
import path from "path";
import wt from "worker_threads";
function fixtureUrl(fixture: string): string;
function fixtureUrl(fixture: string, url: false): string;
function fixtureUrl(fixture: string, url: true): URL;
function fixtureUrl(fixture: string, url = false) {
const u = new URL(path.posix.join("fixtures", fixture), import.meta.url);
return url ? u : u.href;
}
describe("web worker", () => {
async function waitForWorkerResult(worker: Worker, message: any): Promise<any> {
const promise = new Promise((resolve, reject) => {
@@ -29,35 +37,32 @@ describe("web worker", () => {
});
test("string", async () => {
const worker = new Worker(new URL("worker-fixture-preload-entry.js", import.meta.url).href, {
preload: new URL("worker-fixture-preload.js", import.meta.url).href,
const worker = new Worker(fixtureUrl("worker-fixture-preload-entry.js"), {
preload: fixtureUrl("worker-fixture-preload.js"),
});
const result = await waitForWorkerResult(worker, "hello world");
expect(result).toEqual("hello world");
});
test("array of 2 strings", async () => {
const worker = new Worker(new URL("worker-fixture-preload-entry.js", import.meta.url).href, {
preload: [
new URL("worker-fixture-preload.js", import.meta.url).href,
new URL("worker-fixture-preload-2.js", import.meta.url).href,
],
const worker = new Worker(fixtureUrl("worker-fixture-preload-entry.js"), {
preload: [fixtureUrl("worker-fixture-preload.js"), fixtureUrl("worker-fixture-preload-2.js")],
});
const result = await waitForWorkerResult(worker, "hello world world");
expect(result).toEqual("hello world world");
});
test("array of string", async () => {
const worker = new Worker(new URL("worker-fixture-preload-entry.js", import.meta.url).href, {
preload: [new URL("worker-fixture-preload.js", import.meta.url).href],
const worker = new Worker(fixtureUrl("worker-fixture-preload-entry.js"), {
preload: [fixtureUrl("worker-fixture-preload.js")],
});
const result = await waitForWorkerResult(worker, "hello world");
expect(result).toEqual("hello world");
});
test("error in preload doesn't crash parent", async () => {
const worker = new Worker(new URL("worker-fixture-preload-entry.js", import.meta.url).href, {
preload: [new URL("worker-fixture-preload-bad.js", import.meta.url).href],
const worker = new Worker(fixtureUrl("worker-fixture-preload-entry.js"), {
preload: [fixtureUrl("worker-fixture-preload-bad.js")],
});
const { resolve, promise } = Promise.withResolvers();
worker.onerror = e => {
@@ -71,7 +76,7 @@ describe("web worker", () => {
});
test("worker", done => {
const worker = new Worker(new URL("worker-fixture.js", import.meta.url).href, {
const worker = new Worker(fixtureUrl("worker-fixture.js"), {
smol: true,
});
expect(worker.threadId).toBeGreaterThan(0);
@@ -94,7 +99,7 @@ describe("web worker", () => {
});
test("worker-env", done => {
const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, {
const worker = new Worker(fixtureUrl("worker-fixture-env.js"), {
env: {
// Verify that we use putDirectMayBeIndex instead of putDirect
[0]: "123",
@@ -135,7 +140,7 @@ describe("web worker", () => {
obj["prop " + i] = Math.random().toString();
}
const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, {
const worker = new Worker(fixtureUrl("worker-fixture-env.js"), {
env: obj,
});
worker.postMessage("hello");
@@ -158,7 +163,7 @@ describe("web worker", () => {
});
test("argv / execArgv defaults", async () => {
const worker = new Worker(new URL("worker-fixture-argv.js", import.meta.url).href, {});
const worker = new Worker(fixtureUrl("worker-fixture-argv.js"), {});
worker.postMessage("hello");
const result = await waitForWorkerResult(worker, "hello");
@@ -171,14 +176,18 @@ describe("web worker", () => {
const worker_execArgv = ["--no-warnings", "--no-deprecation", "--tls-min-v1.2"];
const original_argv = [...process.argv];
const original_execArgv = [...process.execArgv];
const worker = new Worker(new URL("worker-fixture-argv.js", import.meta.url).href, {
const worker = new Worker(fixtureUrl("worker-fixture-argv.js"), {
argv: worker_argv,
execArgv: worker_execArgv,
});
const result = await waitForWorkerResult(worker, "hello");
expect(result).toEqual({
argv: [original_argv[0], original_argv[1].replace(import.meta.file, "worker-fixture-argv.js"), ...worker_argv],
argv: [
original_argv[0],
original_argv[1].replace(import.meta.file, "fixtures/worker-fixture-argv.js"),
...worker_argv,
],
execArgv: worker_execArgv,
});
// ensure they didn't change for the main thread
@@ -187,7 +196,7 @@ describe("web worker", () => {
});
test("sending 50 messages should just work", done => {
const worker = new Worker(new URL("worker-fixture-many-messages.js", import.meta.url).href, {});
const worker = new Worker(fixtureUrl("worker-fixture-many-messages.js"), {});
worker.postMessage("initial message");
worker.addEventListener("message", ({ data }) => {
@@ -202,7 +211,11 @@ describe("web worker", () => {
test("worker with event listeners doesn't close event loop", done => {
const x = Bun.spawn({
cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.js"), "worker-fixture-many-messages.js"],
cmd: [
bunExe(),
path.join(import.meta.dir, "many-messages-event-loop.js"),
"fixtures/worker-fixture-many-messages.js",
],
env: bunEnv,
stdio: ["inherit", "pipe", "inherit"],
});
@@ -230,7 +243,11 @@ describe("web worker", () => {
test("worker with event listeners doesn't close event loop 2", done => {
const x = Bun.spawn({
cmd: [bunExe(), path.join(import.meta.dir, "many-messages-event-loop.js"), "worker-fixture-many-messages2.js"],
cmd: [
bunExe(),
path.join(import.meta.dir, "many-messages-event-loop.js"),
"fixtures/worker-fixture-many-messages2.js",
],
env: bunEnv,
stdio: ["inherit", "pipe", "inherit"],
});
@@ -257,7 +274,7 @@ describe("web worker", () => {
});
test("worker with process.exit", done => {
const worker = new Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
const worker = new Worker(fixtureUrl("worker-fixture-process-exit.js"), {
smol: true,
});
worker.addEventListener("close", e => {
@@ -273,8 +290,22 @@ describe("web worker", () => {
// TODO: move to node:worker_threads tests directory
describe("worker_threads", () => {
describe("Worker constructor", () => {
// @ts-expect-error
it("is not callable", () => expect(() => wt.Worker()).toThrow(TypeError));
it("is named 'Worker'", () => expect(wt.Worker.name).toBe("Worker"));
it("has a length of 1", () => expect(wt.Worker.length).toBe(1));
it("must have a filename argument", () => {
// @ts-expect-error
expect(() => new wt.Worker()).toThrowWithCode(TypeError, "ERR_INVALID_ARG_TYPE");
});
it.each([undefined, null, 0, 1, true, false, Symbol("hi"), {}])("throws when filename is %p", (badFilename: any) =>
expect(() => new wt.Worker(badFilename)).toThrowWithCode(TypeError, "ERR_INVALID_ARG_TYPE"),
);
});
test("worker with process.exit", done => {
const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
const worker = new wt.Worker(fixtureUrl("worker-fixture-process-exit.js"), {
smol: true,
});
worker.on("exit", code => {
@@ -288,63 +319,71 @@ describe("worker_threads", () => {
});
});
test("worker terminate", async () => {
const worker = new wt.Worker(new URL("worker-fixture-hang.js", import.meta.url).href, {
smol: true,
describe("terminate()", () => {
test("exits with code 0", async () => {
const worker = new wt.Worker(fixtureUrl("worker-fixture-hang.js"), {
smol: true,
});
const code = await worker.terminate();
expect(code).toBe(0);
});
test.todo("worker terminating forcefully properly interrupts", async () => {
const worker = new wt.Worker(fixtureUrl("worker-fixture-while-true.js"), {});
await new Promise<void>(done => {
worker.on("message", () => done());
});
const code = await worker.terminate();
expect(code).toBe(0);
});
test("when worker exits with code 2 after delay, exit code is 2", async () => {
const worker = new wt.Worker(fixtureUrl("worker-fixture-process-exit.js"), {
smol: true,
});
await Bun.sleep(200);
const code = await worker.terminate();
expect(code).toBe(2);
});
const code = await worker.terminate();
expect(code).toBe(0);
});
test("worker with process.exit (delay) and terminate", async () => {
const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
smol: true,
});
await Bun.sleep(200);
const code = await worker.terminate();
expect(code).toBe(2);
});
describe("argv/execArgv", () => {
test("when not set, defaults to process.{argv,execArgv}", async () => {
const worker = new wt.Worker(fixtureUrl("worker-fixture-argv.js", true), {});
const promise = new Promise<any>(resolve => worker.on("message", resolve));
worker.postMessage("hello");
const result = await promise;
test.todo("worker terminating forcefully properly interrupts", async () => {
const worker = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {});
await new Promise<void>(done => {
worker.on("message", () => done());
});
const code = await worker.terminate();
expect(code).toBe(0);
});
test("worker without argv/execArgv", async () => {
const worker = new wt.Worker(new URL("worker-fixture-argv.js", import.meta.url), {});
const promise = new Promise<any>(resolve => worker.on("message", resolve));
worker.postMessage("hello");
const result = await promise;
expect(result.argv).toHaveLength(process.argv.length);
expect(result.execArgv).toHaveLength(process.execArgv.length);
});
test("worker with argv/execArgv", async () => {
const worker_argv = ["--some-arg=1", "--some-arg=2"];
const worker_execArgv = ["--no-warnings", "--no-deprecation", "--tls-min-v1.2"];
const original_argv = [...process.argv];
const original_execArgv = [...process.execArgv];
const worker = new wt.Worker(new URL("worker-fixture-argv.js", import.meta.url), {
argv: worker_argv,
execArgv: worker_execArgv,
});
const promise = new Promise<any>(resolve => worker.once("message", resolve));
worker.postMessage("hello");
const result = await promise;
expect(result).toEqual({
argv: [original_argv[0], original_argv[1].replace(import.meta.file, "worker-fixture-argv.js"), ...worker_argv],
execArgv: worker_execArgv,
expect(result.argv).toHaveLength(process.argv.length);
expect(result.execArgv).toHaveLength(process.execArgv.length);
});
// ensure they didn't change for the main thread
expect(process.argv).toEqual(original_argv);
expect(process.execArgv).toEqual(original_execArgv);
test("can be passed to the worker", async () => {
const worker_argv = ["--some-arg=1", "--some-arg=2"];
const worker_execArgv = ["--no-warnings", "--no-deprecation", "--tls-min-v1.2"];
const original_argv = [...process.argv];
const original_execArgv = [...process.execArgv];
const worker = new wt.Worker(new URL("fixtures/worker-fixture-argv.js", import.meta.url), {
argv: worker_argv,
execArgv: worker_execArgv,
});
const promise = new Promise<any>(resolve => worker.once("message", resolve));
worker.postMessage("hello");
const result = await promise;
expect(result).toEqual({
argv: [
original_argv[0],
original_argv[1].replace(import.meta.file, "fixtures/worker-fixture-argv.js"),
...worker_argv,
],
execArgv: worker_execArgv,
});
// ensure they didn't change for the main thread
expect(process.argv).toEqual(original_argv);
expect(process.execArgv).toEqual(original_execArgv);
});
});
test("worker with eval = false fails with code", async () => {
@@ -352,15 +391,16 @@ describe("worker_threads", () => {
try {
const worker = new wt.Worker("console.log('this should not get printed')", { eval: false });
} catch (err) {
expect(err.constructor.name).toEqual("TypeError");
expect(err.message).toMatch(/BuildMessage: ModuleNotFound.+/);
expect(err).toBeInstanceOf(TypeError);
expect((err as TypeError).constructor.name).toEqual("TypeError");
expect((err as TypeError).message).toMatch(/BuildMessage: ModuleNotFound.+/);
has_error = true;
}
expect(has_error).toBe(true);
});
test("worker with eval = true succeeds with valid code", async () => {
let message;
let message: unknown;
const worker = new wt.Worker("postMessage('hello')", { eval: true });
worker.on("message", e => {
message = e;