Compare commits

...

186 Commits

Author SHA1 Message Date
Alistair Smith
a45c2632c3 Merge branch 'main' into ali/piscina 2025-10-21 08:06:46 +09:00
Alistair Smith
a5e5ea2e60 Merge branch 'main' into ali/piscina 2025-09-26 23:54:08 -07:00
Alistair Smith
499201be48 try notifyNeedTermination in process.exit() when in a worker 2025-09-23 15:45:01 -07:00
Alistair Smith
7e557e10fd Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-09-23 15:20:41 -07:00
Alistair Smith
b6ee037172 rewrite bun in rust 2025-09-23 00:46:19 -07:00
Meghan Denny
9effd4df31 fix worker.test.ts 2025-09-23 00:05:54 -07:00
Alistair Smith
0c0dc471da change name of test to be unique 2025-09-22 23:14:04 -07:00
Alistair Smith
41457fb37b fix: Failing error msg 2025-09-22 22:31:32 -07:00
Alistair Smith
c43a251d9c Remove unnecessary cast for Bun.jest 2025-09-22 22:18:50 -07:00
Alistair Smith
8d0eeba251 revert runner 2025-09-22 22:17:15 -07:00
autofix-ci[bot]
c354cfd86e [autofix.ci] apply automated fixes 2025-09-23 05:16:21 +00:00
Alistair Smith
16ad3d9f9d remove unused header 2025-09-22 22:14:45 -07:00
Alistair Smith
ca1e7022eb remove return value 2025-09-22 21:57:15 -07:00
Alistair Smith
cbdd965643 checks 2025-09-22 21:57:15 -07:00
Alistair Smith
ee68193de7 changes
Co-authored-by: taylor.fish <contact@taylor.fish>
2025-09-22 21:57:15 -07:00
Alistair Smith
2210add1a8 fix double export 2025-09-22 21:57:15 -07:00
Alistair Smith
019c0c12d0 some lifecycle change 2025-09-22 21:57:15 -07:00
Alistair Smith
ca8d9968fe use bun.new 2025-09-22 21:57:15 -07:00
Alistair Smith
1015e0f978 smaller diff 2025-09-22 21:57:15 -07:00
Alistair Smith
3b8abfb5e1 fix compilation error 2025-09-22 21:57:15 -07:00
Alistair Smith
4720de1be4 revert http.zig 2025-09-22 21:57:15 -07:00
Alistair Smith
f452ca7c36 fix issues arisen from rebase 2025-09-22 21:57:15 -07:00
Alistair Smith
9afa4249e1 generate it 2025-09-22 21:57:15 -07:00
Alistair Smith
a8db2da0c9 revert 2025-09-22 21:57:15 -07:00
Alistair Smith
b7b2fbe2a4 restore lockfile 2025-09-22 21:57:15 -07:00
autofix-ci[bot]
36f98aad85 [autofix.ci] apply automated fixes 2025-09-22 21:57:14 -07:00
alii
d038264002 bun run clang-format 2025-09-22 21:57:14 -07:00
Alistair Smith
bc3af0c0f5 recomment (exception checks help) 2025-09-22 21:57:14 -07:00
Alistair Smith
853cbf42ee copy from webkit 2025-09-22 21:57:14 -07:00
Alistair Smith
cae404e99b changes 2025-09-22 21:57:14 -07:00
Alistair Smith
976052e541 helps wiht reprod 2025-09-22 21:57:14 -07:00
Alistair Smith
501088189a piscina atomics test (panicky) 2025-09-22 21:57:14 -07:00
Alistair Smith
6ad7b8a8b7 dont unref early 2025-09-22 21:57:14 -07:00
Alistair Smith
80684fe9c6 comments 2025-09-22 21:57:14 -07:00
Alistair Smith
2ade52c24e check exit is actually called before parent exits (ref issue?) 2025-09-22 21:57:14 -07:00
Alistair Smith
1b11a433eb test now passes 2025-09-22 21:57:14 -07:00
Alistair Smith
0c86f6dcc9 let's see what ci thinks 2025-09-22 21:57:14 -07:00
Alistair Smith
e4a780ac64 faster 2025-09-22 21:57:14 -07:00
Alistair Smith
6ffcc53014 improve worker memory leak test ? 2025-09-22 21:57:14 -07:00
Alistair Smith
8c15bde769 this does fix the process exit test 2025-09-22 21:57:14 -07:00
Alistair Smith
04353da520 for now 2025-09-22 21:57:14 -07:00
Alistair Smith
a9b9276ed5 rm dupe test 2025-09-22 21:57:14 -07:00
Alistair Smith
3598b9401d rm 2025-09-22 21:57:14 -07:00
Alistair Smith
5c754dc8cb process.exit() failure 2025-09-22 21:57:14 -07:00
Alistair Smith
96c18bdf26 changes 2025-09-22 21:57:14 -07:00
Alistair Smith
e34d7592fd unclear if useful 2025-09-22 21:57:14 -07:00
Alistair Smith
8b6aad0490 experiment: try request termination immediately 2025-09-22 21:57:14 -07:00
Alistair Smith
b3f8adad0b we should not tick after process.exit() was called in a webworker 2025-09-22 21:57:14 -07:00
Alistair Smith
1963ddab8c non-nullable lifecycle_handle 2025-09-22 21:57:14 -07:00
Alistair Smith
26e6cd8d1f requestTermination instead of exiting immediately 2025-09-22 21:57:14 -07:00
Alistair Smith
9981c87c56 can→could (fixes a piscina test case) 2025-09-22 21:57:14 -07:00
Alistair Smith
222ff4d7b4 Revert "experiment: clear socket_async_http_abort_tracker?"
This reverts commit 74581471c0.
2025-09-22 21:57:14 -07:00
Alistair Smith
6550f8c251 experiment: clear socket_async_http_abort_tracker? 2025-09-22 21:57:14 -07:00
Alistair Smith
5719568ec0 mv 2025-09-22 21:57:14 -07:00
Alistair Smith
80a23c7142 rm 2025-09-22 21:57:14 -07:00
Alistair Smith
975caa75b1 dont miss this 2025-09-22 21:57:14 -07:00
Alistair Smith
8143c0690c node worker is async disposable 2025-09-22 21:57:14 -07:00
Alistair Smith
cdb7368151 coverage in node to be sure 2025-09-22 21:57:14 -07:00
Alistair Smith
041c62efbc fix tests (updated to match node behaviour) 2025-09-22 21:57:14 -07:00
Alistair Smith
18653f2a65 plausible 2025-09-22 21:57:14 -07:00
Alistair Smith
5d8d430a14 remove comment 2025-09-22 21:57:14 -07:00
Alistair Smith
28bf87f586 coverage (ran in both node & bun) 2025-09-22 21:57:14 -07:00
Alistair Smith
30d546b7aa clean up exit logic of worker_threads.Worker 2025-09-22 21:57:14 -07:00
Alistair Smith
41763a0a79 fix test in ci (?) 2025-09-22 21:55:38 -07:00
Alistair Smith
2f671aacdc Attempt for CI only 2025-09-22 21:55:38 -07:00
Alistair Smith
b72e83cd5a movie 2025-09-22 21:55:38 -07:00
Alistair Smith
8dbaccf5e1 hm 2025-09-22 21:55:38 -07:00
Alistair Smith
79b56ca9c3 change 2025-09-22 21:55:38 -07:00
Alistair Smith
77640c6290 further reduce diff 2025-09-22 21:55:38 -07:00
Alistair Smith
252efe8dc5 postImmediateCppTask was unused 2025-09-22 21:55:38 -07:00
Alistair Smith
4b015cf585 ignore vibe tools rule in future 2025-09-22 21:55:37 -07:00
Alistair Smith
dc3bbdab96 remove vibe tools since not everybody will have it installed 2025-09-22 21:55:37 -07:00
Alistair Smith
e3d3015d47 revert some 2025-09-22 21:55:37 -07:00
Alistair Smith
9b6f4822f0 not async (unrelated to test failure) 2025-09-22 21:55:37 -07:00
Alistair Smith
49d1090dbb do the fetch(unrelated to test failure) 2025-09-22 21:55:37 -07:00
Alistair Smith
8a0c11c331 lifecycleHandle_ might be gone already 2025-09-22 21:55:37 -07:00
Alistair Smith
637325f4b4 script execution context is already gone at this point 2025-09-22 21:55:37 -07:00
Alistair Smith
7df0e39660 extract terminate test for node compat work 2025-09-22 21:55:37 -07:00
Alistair Smith
f5b2d2181f fwd declare WebWorkerLifecycleHandle 2025-09-22 21:55:37 -07:00
Alistair Smith
35f69d9001 make nullable for cyclic lifecycle handle 2025-09-22 21:55:37 -07:00
Alistair Smith
3bee77904a revert preloads free 2025-09-22 21:55:37 -07:00
Alistair Smith
5bfed41d1a rm exports 2025-09-22 21:55:37 -07:00
Alistair Smith
461c9db98a set ref only if not gone 2025-09-22 21:55:37 -07:00
Alistair Smith
363edf7b36 rm 2025-09-22 21:55:37 -07:00
Alistair Smith
ca18b5e410 whoops 2025-09-22 21:55:37 -07:00
Alistair Smith
92c19a2dd3 whoops 2025-09-22 21:55:37 -07:00
alii
7be4234c0e bun run clang-format 2025-09-22 21:55:37 -07:00
Alistair Smith
7499f99fc8 seems to work 2025-09-22 21:55:37 -07:00
Alistair Smith
9ffc850887 seems to work 2025-09-22 21:55:37 -07:00
Alistair Smith
468e2b9ee4 fix 2025-09-22 21:55:37 -07:00
Alistair Smith
bc743a9766 changes 2025-09-22 21:55:37 -07:00
Alistair Smith
8a2fa506bb cjange 2025-09-22 21:55:37 -07:00
Alistair Smith
e6bc9fbb77 whoops 2025-09-22 21:55:37 -07:00
Alistair Smith
9f4157d934 WebWorkerLifecycleHandle 2025-09-22 21:55:37 -07:00
Alistair Smith
8980ec7829 invert ownership 2025-09-22 21:55:37 -07:00
Alistair Smith
7bfdcd8da2 . 2025-09-22 21:55:37 -07:00
alii
fa5a6d4380 bun run clang-format 2025-09-22 21:55:37 -07:00
Alistair Smith
b6026d75df freeWithoutDeinit 2025-09-22 21:55:37 -07:00
Alistair Smith
98c265fd36 not much 2025-09-22 21:55:37 -07:00
Alistair Smith
9df2b09768 push 2025-09-22 21:55:37 -07:00
alii
a951a85d82 bun run clang-format 2025-09-22 21:55:37 -07:00
Alistair Smith
963ba1b632 uaf 2025-09-22 21:55:37 -07:00
Alistair Smith
84bb592f87 hm 2025-09-22 21:55:37 -07:00
Alistair Smith
db0de23737 addressing some pr feedback 2025-09-22 21:55:37 -07:00
Alistair Smith
5b9abfa756 uaf 2025-09-22 21:55:37 -07:00
alii
696343b9fd bun run clang-format 2025-09-22 21:55:37 -07:00
Alistair Smith
799d7058d9 use a coutnter 2025-09-22 21:55:37 -07:00
Alistair Smith
2066644e16 Bun__queueImmediateCppTask again 2025-09-22 21:55:37 -07:00
Alistair Smith
224c9bfc05 start removing currentTickNr behaviour 2025-09-22 21:55:36 -07:00
Alistair Smith
978b94644c remove immediate_cpp_tasks 2025-09-22 21:55:36 -07:00
Alistair Smith
48734ac20b nitpick @heimskr 2025-09-22 21:55:36 -07:00
Alistair Smith
7fcf9095c3 use Buffer.alloc 2025-09-22 21:55:36 -07:00
Alistair Smith
4e451f9521 remove immediate cpp task 2025-09-22 21:55:36 -07:00
Ben Grant
d23afd6c9b try more retries for suspicious tests 2025-09-22 21:55:36 -07:00
Alistair Smith
c9e92e4a5a attempt: dont process messages if suspended/not entangled 2025-09-22 21:54:37 -07:00
Alistair Smith
ecf262f250 ignore .astro dist/build folder in fixture 2025-09-22 21:54:37 -07:00
Alistair Smith
89ef4c6b52 dont use spawnSync (broken behaviour) 2025-09-22 21:54:37 -07:00
Alistair Smith
2420b0726d wait 2025-09-22 21:54:37 -07:00
Alistair Smith
2ce5b9eab7 tick immediate tasks to process MessagePort postMessage stuff 2025-09-22 21:54:37 -07:00
alii
d94996bf68 bun run prettier 2025-09-22 21:54:37 -07:00
Alistair Smith
fbb21b8ebc use assert here so we can test in node 2025-09-22 21:54:37 -07:00
Alistair Smith
b4282814ff ignore 2025-09-22 21:54:37 -07:00
Alistair Smith
833ee60b5c config 2025-09-22 21:53:25 -07:00
Alistair Smith
0793881e56 fix 2025-09-22 21:53:25 -07:00
Alistair Smith
b2d0899442 @eastlondoner 2025-09-22 21:53:25 -07:00
Alistair Smith
182470909e irrelevant 2025-09-22 21:53:25 -07:00
Alistair Smith
bc39cd2806 avoid assertion error 2025-09-22 21:53:25 -07:00
Alistair Smith
5fac9749e1 couple of tests 2025-09-22 21:53:25 -07:00
Alistair Smith
07fc765d9c merge issue 2025-09-22 21:53:25 -07:00
alii
b50d5dfc83 bun run clang-format 2025-09-22 21:53:25 -07:00
Alistair Smith
b77139967e use RELEASE_ASSERT (https://github.com/oven-sh/bun/pull/19940#discussion_r2116938040) 2025-09-22 21:53:25 -07:00
Alistair Smith
eacd40daab revert rule 2025-09-22 21:53:25 -07:00
alii
09cbcfe20a bun run clang-format 2025-09-22 21:53:24 -07:00
Alistair Smith
1b332ec147 address some comments 2025-09-22 21:53:24 -07:00
Alistair Smith
da657db887 put _compile on actual Module prototype 2025-09-22 21:53:24 -07:00
Alistair Smith
2a09ad8e75 clean 2025-09-22 21:49:39 -07:00
alii
922bc0e0b3 bun run clang-format 2025-09-22 21:49:39 -07:00
Alistair Smith
c59fd309c4 changes 2025-09-22 21:49:39 -07:00
Alistair Smith
4bf3836f0b 1 2025-09-22 21:49:39 -07:00
Alistair Smith
bf0510aea4 fields 2025-09-22 21:49:39 -07:00
alii
7bb256ef4f bun run clang-format 2025-09-22 21:49:39 -07:00
Alistair Smith
35a528fb99 drain 2025-09-22 21:49:39 -07:00
Alistair Smith
8a8a23c5d6 xchanges 2025-09-22 21:49:38 -07:00
Alistair Smith
10551fcc90 chages 2025-09-22 21:49:38 -07:00
Alistair Smith
2c0f8f29b6 rule 2025-09-22 21:49:38 -07:00
alii
0d9ca788c6 bun run prettier 2025-09-22 21:49:38 -07:00
Alistair Smith
832b74a239 Update .vscode/settings.json
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
2025-09-22 21:49:38 -07:00
Alistair Smith
cd67a3a024 move 2025-09-22 21:49:38 -07:00
Alistair Smith
5f0c054160 worker lifecycle port test 2025-09-22 21:49:38 -07:00
Alistair Smith
3e64050b67 notifyNeedTermination() 2025-09-22 21:49:38 -07:00
alii
5326514afc bun run clang-format 2025-09-22 21:49:38 -07:00
Alistair Smith
a2bd8ea622 revert 2025-09-22 21:49:38 -07:00
Alistair Smith
836fbfefe8 and here 2025-09-22 21:49:38 -07:00
Alistair Smith
b105877790 fix piscina@5 exactly 2025-09-22 21:49:38 -07:00
Alistair Smith
daf21ad44d dont drain microtasks queue during spawnSync() 2025-09-22 21:49:38 -07:00
Alistair Smith
2d01a3d1e7 unnecessary 2025-09-22 21:49:38 -07:00
Alistair Smith
a10e268584 delegate ref to subprocess 2025-09-22 21:49:38 -07:00
Alistair Smith
3bc5724ff4 better stub child_process channel 2025-09-22 21:49:38 -07:00
Alistair Smith
b6d4a15496 always dispatch close 2025-09-22 21:49:38 -07:00
alii
1d1e88faa7 bun run clang-format 2025-09-22 21:49:38 -07:00
Alistair Smith
54788db012 rm/clean 2025-09-22 21:49:38 -07:00
Alistair Smith
59d2941e25 rm 2025-09-22 21:49:38 -07:00
Alistair Smith
9276453ca4 . 2025-09-22 21:49:38 -07:00
Alistair Smith
c192b8e474 pass identifier not ptr 2025-09-22 21:49:38 -07:00
Alistair Smith
b0f756777c use .fetch_or() 2025-09-22 21:49:38 -07:00
Alistair Smith
6640b4dde4 only close MessagePort if message listeners 2025-09-22 21:49:38 -07:00
Alistair Smith
a9a4d483aa rm 2025-09-22 21:49:38 -07:00
Alistair Smith
fe95423dd8 rm 2025-09-22 21:49:38 -07:00
Alistair Smith
5e79575215 rm 2025-09-22 21:49:38 -07:00
Alistair Smith
9f74448006 fix timeouts? 2025-09-22 21:49:38 -07:00
alii
93650f7317 bun run clang-format 2025-09-22 21:49:38 -07:00
Alistair Smith
aa32166827 some logs 2025-09-22 21:49:38 -07:00
Alistair Smith
72e7177fff move 2025-09-22 21:42:24 -07:00
Alistair Smith
63b9d04a38 msg 2025-09-22 21:42:24 -07:00
Alistair Smith
006cdf353b plenty of debug logs 2025-09-22 21:42:24 -07:00
Alistair Smith
919454ea30 we should test p.destroy() 2025-09-22 21:42:24 -07:00
Alistair Smith
b736d6b364 this back 2025-09-22 21:42:24 -07:00
Alistair Smith
39f9aeb500 rm 2025-09-22 21:42:24 -07:00
Alistair Smith
0f5e9b2dcd testing pass? 2025-09-22 21:42:24 -07:00
alii
4c44a553d3 bun run clang-format 2025-09-22 21:42:24 -07:00
Alistair Smith
2ec4f39747 changes 2025-09-22 21:42:24 -07:00
Alistair Smith
236ff12a2d be clear about async operation order 2025-09-22 21:42:24 -07:00
Alistair Smith
b1e5826ad6 10s is ok 2025-09-22 21:42:24 -07:00
Alistair Smith
b06997e2b1 rm 2025-09-22 21:42:24 -07:00
Alistair Smith
4e6c44d5bc notifyPortClosed 2025-09-22 21:42:24 -07:00
Alistair Smith
1884a8197e simples piscina third party suite 2025-09-22 21:42:13 -07:00
45 changed files with 1358 additions and 131 deletions

View File

@@ -1,6 +1,6 @@
---
description:
globs: src/**/*.cpp,src/**/*.zig
description: How to build Bun
globs: **/*.zig,**/*.ts,**/*.cpp,**/*.c,src/**/*.cpp,src/**/*.zig
alwaysApply: false
---
@@ -23,6 +23,12 @@ bun bd <file> <...args>
**CRITICAL**: Never use `bun <file>` directly. It will not have your changes.
You can also pass arguments to `bun bd` as if it were a regular build of bun. For example:
```bash
bun bd test math-utils.test.ts
```
### Logging
`BUN_DEBUG_$(SCOPE)=1` enables debug logs for a specific debug log scope.

4
.gitignore vendored
View File

@@ -1,3 +1,7 @@
.cursor/rules/vibe-tools.mdc
vibe-tools.config.json
.repomix-output.txt
repomix.config.json
.claude/settings.local.json
.DS_Store
.env

View File

@@ -1703,6 +1703,10 @@ pub fn spawnMaybeSync(
defer {
jsc_vm.uwsLoop().internal_loop_data.jsc_vm = old_vm;
}
jsc_vm.eventLoop().is_inside_spawn_sync = true;
defer jsc_vm.eventLoop().is_inside_spawn_sync = false;
while (subprocess.hasPendingActivityNonThreadsafe()) {
if (subprocess.stdin == .buffer) {
subprocess.stdin.buffer.watch();

View File

@@ -57,7 +57,7 @@ static const DOMException::Description descriptions[] = {
{ "QuotaExceededError"_s, "The quota has been exceeded."_s, 22 },
{ "TimeoutError"_s, "The operation timed out."_s, 23 },
{ "InvalidNodeTypeError"_s, "The supplied node is incorrect or has an incorrect ancestor for this operation."_s, 24 },
{ "DataCloneError"_s, "The object can not be cloned."_s, 25 },
{ "DataCloneError"_s, "The object could not be cloned."_s, 25 }, // TODO: This should include an inspection of the object (e.g. "DOMException [DataCloneError]: hello () {} could not be cloned.")
{ "EncodingError"_s, "The encoding operation (either encoded or decoding) failed."_s, 0 },
{ "NotReadableError"_s, "The I/O read operation failed."_s, 0 },
{ "UnknownError"_s, "The operation failed for an unknown transient reason (e.g. out of memory)."_s, 0 },

View File

@@ -600,9 +600,8 @@ pub const JSGlobalObject = opaque {
return @as(*jsc.VirtualMachine, @ptrCast(@alignCast(this.bunVMUnsafe())));
}
extern fn JSC__JSGlobalObject__handleRejectedPromises(*JSGlobalObject) void;
pub fn handleRejectedPromises(this: *JSGlobalObject) void {
return bun.jsc.fromJSHostCallGeneric(this, @src(), JSC__JSGlobalObject__handleRejectedPromises, .{this}) catch @panic("unreachable");
return bun.cpp.JSC__JSGlobalObject__handleRejectedPromises(this);
}
extern fn ZigGlobalObject__readableStreamToArrayBuffer(*JSGlobalObject, JSValue) JSValue;

View File

@@ -383,6 +383,19 @@ void ScriptExecutionContext::postTask(EventLoopTask* task)
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
}
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
void ScriptExecutionContext::queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda)
{
auto* task = new EventLoopTask(WTFMove(lambda));
queueImmediateCppTask(task);
}
void ScriptExecutionContext::queueImmediateCppTask(EventLoopTask* task)
{
Bun__queueImmediateCppTask(m_globalObject, task);
}
// Zig bindings
extern "C" ScriptExecutionContextIdentifier ScriptExecutionContextIdentifier__forGlobalObject(JSC::JSGlobalObject* globalObject)
{

View File

@@ -131,6 +131,9 @@ public:
// Executes the task on context's thread asynchronously.
void postTask(EventLoopTask* task);
void queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda);
void queueImmediateCppTask(EventLoopTask* task);
template<typename... Arguments>
void postCrossThreadTask(Arguments&&... arguments)
{
@@ -153,6 +156,14 @@ public:
static ScriptExecutionContext* getMainThreadScriptExecutionContext();
bool canSendMessage()
{
static constexpr size_t maxMessagesPerTick = 1000;
return m_messagesSentThisTick < maxMessagesPerTick;
}
void incrementMessageCount() { m_messagesSentThisTick++; }
void resetMessageCount() { m_messagesSentThisTick = 0; }
private:
JSC::VM* m_vm = nullptr;
JSC::JSGlobalObject* m_globalObject = nullptr;
@@ -165,6 +176,7 @@ private:
LazyRef<ScriptExecutionContext, BunBroadcastChannelRegistry> m_broadcastChannelRegistry;
bool m_willProcessMessageWithMessagePortsSoon { false };
size_t m_messagesSentThisTick { 0 };
us_socket_context_t* webSocketContextSSL();
us_socket_context_t* webSocketContextNoSSL();

View File

@@ -2933,6 +2933,7 @@ extern "C" void JSGlobalObject__clearTerminationException(JSC::JSGlobalObject* g
}
extern "C" void Bun__queueTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
extern "C" void Bun__queueTaskConcurrently(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
extern "C" [[ZIG_EXPORT(check_slow)]] void Bun__performTask(Zig::GlobalObject* globalObject, WebCore::EventLoopTask* task)
{
@@ -2956,6 +2957,11 @@ void GlobalObject::queueTask(WebCore::EventLoopTask* task)
Bun__queueTask(this, task);
}
void GlobalObject::queueImmediateCppTask(WebCore::EventLoopTask* task)
{
Bun__queueImmediateCppTask(this, task);
}
void GlobalObject::queueTaskConcurrently(WebCore::EventLoopTask* task)
{
Bun__queueTaskConcurrently(this, task);
@@ -2972,7 +2978,10 @@ void GlobalObject::handleRejectedPromises()
continue;
Bun__handleRejectedPromise(this, promise);
if (auto ex = scope.exception()) this->reportUncaughtExceptionAtEventLoop(this, ex);
if (auto ex = scope.exception()) {
scope.clearException();
this->reportUncaughtExceptionAtEventLoop(this, ex);
}
}
}

View File

@@ -171,6 +171,7 @@ public:
void queueTask(WebCore::EventLoopTask* task);
void queueTaskConcurrently(WebCore::EventLoopTask* task);
void queueImmediateCppTask(WebCore::EventLoopTask* task);
JSDOMStructureMap& structures() WTF_REQUIRES_LOCK(m_gcLock) { return m_structures; }
JSDOMStructureMap& structures(NoLockingNecessaryTag) WTF_IGNORES_THREAD_SAFETY_ANALYSIS

View File

@@ -3667,6 +3667,7 @@ JSC::EncodedJSValue JSC__JSGlobalObject__generateHeapSnapshot(JSC::JSGlobalObjec
JSC::VM* JSC__JSGlobalObject__vm(JSC::JSGlobalObject* arg0) { return &arg0->vm(); };
[[ZIG_EXPORT(nothrow)]]
void JSC__JSGlobalObject__handleRejectedPromises(JSC::JSGlobalObject* arg0)
{
return jsCast<Zig::GlobalObject*>(arg0)->handleRejectedPromises();

View File

@@ -243,6 +243,71 @@ void MessagePort::close()
removeAllEventListeners();
}
void MessagePort::processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback)
{
auto& vm = context.vm();
auto* globalObject = defaultGlobalObject(context.globalObject());
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running) {
completionCallback();
return;
}
Vector<MessageWithMessagePorts> deferredMessages;
for (auto&& message : messages) {
if (!context.canSendMessage()) {
deferredMessages.append(WTFMove(message));
continue;
}
context.incrementMessageCount();
auto scope = DECLARE_CATCH_SCOPE(vm);
auto ports = MessagePort::entanglePorts(context, WTFMove(message.transferredPorts));
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
dispatchEvent(event.event);
if (scope.exception()) [[unlikely]] {
RELEASE_ASSERT(vm.hasPendingTerminationException());
return;
}
// if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running) {
globalObject->drainMicrotasks();
// }
}
if (!deferredMessages.isEmpty()) {
auto* globalObject = defaultGlobalObject(context.globalObject());
auto scriptExecutionStatus = Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject);
if (scriptExecutionStatus != ScriptExecutionStatus::Running || context.isJSExecutionForbidden()) {
completionCallback();
return;
}
// remaining messages should happen on the next tick on the immediate cpp task queue
auto contextIdentifier = context.identifier();
context.queueImmediateCppTask(
[protectedThis = Ref { *this }, contextIdentifier, deferred = WTFMove(deferredMessages), completionCallback = WTFMove(completionCallback)](ScriptExecutionContext& ctx) mutable {
if (auto* validContext = ScriptExecutionContext::getScriptExecutionContext(contextIdentifier)) {
if (validContext == &ctx && !validContext->activeDOMObjectsAreSuspended() && protectedThis->isEntangled()) {
// then reset for next tick
ctx.resetMessageCount();
protectedThis->processMessages(ctx, WTFMove(deferred), WTFMove(completionCallback));
return;
}
}
// context was destroyed or conditions not met, just complete
completionCallback();
});
} else {
completionCallback();
}
}
void MessagePort::dispatchMessages()
{
// Messages for contexts that are not fully active get dispatched too, but JSAbstractEventListener::handleEvent() doesn't call handlers for these.
@@ -254,41 +319,14 @@ void MessagePort::dispatchMessages()
return;
auto messagesTakenHandler = [this, protectedThis = Ref { *this }](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
auto scopeExit = makeScopeExit(WTFMove(completionCallback));
// LOG(MessagePorts, "MessagePort %s (%p) dispatching %zu messages", m_identifier.logString().utf8().data(), this, messages.size());
RefPtr<ScriptExecutionContext> context = scriptExecutionContext();
if (!context || !context->globalObject())
if (!context || !context->globalObject()) {
completionCallback();
return;
}
ASSERT(context->isContextThread());
auto* globalObject = defaultGlobalObject(context->globalObject());
Ref vm = globalObject->vm();
auto scope = DECLARE_CATCH_SCOPE(vm);
for (auto& message : messages) {
// close() in Worker onmessage handler should prevent next message from dispatching.
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running)
return;
auto ports = MessagePort::entanglePorts(*context, WTFMove(message.transferredPorts));
if (scope.exception()) [[unlikely]] {
// Currently, we assume that the only way we can get here is if we have a termination.
RELEASE_ASSERT(vm->hasPendingTerminationException());
return;
}
// Per specification, each MessagePort object has a task source called the port message queue.
// queueTaskKeepingObjectAlive(context, *this, TaskSource::PostedMessageQueue, [this, event = WTFMove(event)] {
// dispatchEvent(event.event);
// });
ScriptExecutionContext::postTaskTo(context->identifier(), [protectedThis = Ref { *this }, ports = WTFMove(ports), message = WTFMove(message)](ScriptExecutionContext& context) mutable {
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
protectedThis->dispatchEvent(event.event);
});
}
processMessages(*context, WTFMove(messages), WTFMove(completionCallback));
};
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(m_identifier, WTFMove(messagesTakenHandler));

View File

@@ -144,9 +144,9 @@ private:
MessagePortIdentifier m_remoteIdentifier;
mutable std::atomic<unsigned> m_refCount { 1 };
void processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback);
bool m_hasRef { false };
uint32_t m_messageEventCount { 0 };
static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
};

View File

@@ -69,8 +69,8 @@ namespace WebCore {
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
extern "C" void WebWorker__notifyNeedTermination(
void* worker);
extern "C" void WebWorkerLifecycleHandle__requestTermination(WebWorkerLifecycleHandle* worker);
extern "C" void WebWorkerLifecycleHandle__release(WebWorkerLifecycleHandle* worker);
static Lock allWorkersLock;
static HashMap<ScriptExecutionContextIdentifier, Worker*>& allWorkers() WTF_REQUIRES_LOCK(allWorkersLock)
@@ -109,7 +109,7 @@ Worker::Worker(ScriptExecutionContext& context, WorkerOptions&& options)
ASSERT_UNUSED(addResult, addResult.isNewEntry);
}
extern "C" bool WebWorker__updatePtr(void* worker, Worker* ptr);
extern "C" void* WebWorker__create(
extern "C" WebWorkerLifecycleHandle* WebWorkerLifecycleHandle__createWebWorker(
Worker* worker,
void* parent,
BunString name,
@@ -133,12 +133,12 @@ extern "C" void WebWorker__setRef(
void Worker::setKeepAlive(bool keepAlive)
{
WebWorker__setRef(impl_, keepAlive);
WebWorker__setRef(lifecycleHandle_, keepAlive);
}
bool Worker::updatePtr()
{
if (!WebWorker__updatePtr(impl_, this)) {
if (!WebWorker__updatePtr(lifecycleHandle_, this)) {
m_onlineClosingFlags = ClosingFlag;
m_terminationFlags.fetch_or(TerminatedFlag);
return false;
@@ -189,7 +189,7 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
return { reinterpret_cast<WTF::StringImpl**>(vec.begin()), vec.size() };
})
.value_or(std::span<WTF::StringImpl*> {});
void* impl = WebWorker__create(
WebWorkerLifecycleHandle* lifecycleHandle = WebWorkerLifecycleHandle__createWebWorker(
worker.ptr(),
bunVM(context.jsGlobalObject()),
nameStr,
@@ -212,11 +212,11 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
preloadModuleStrings.clear();
if (!impl) {
if (!lifecycleHandle) {
return Exception { TypeError, errorMessage.toWTFString(BunString::ZeroCopy) };
}
worker->impl_ = impl;
worker->lifecycleHandle_ = lifecycleHandle;
worker->m_workerCreationTime = MonotonicTime::now();
return worker;
@@ -228,6 +228,14 @@ Worker::~Worker()
Locker locker { allWorkersLock };
allWorkers().remove(m_clientIdentifier);
}
if (lifecycleHandle_) {
auto* impl = lifecycleHandle_;
lifecycleHandle_ = nullptr;
WebWorkerLifecycleHandle__requestTermination(impl);
// release our reference back in web_worker.zig
WebWorkerLifecycleHandle__release(impl);
}
// m_contextProxy.workerObjectDestroyed();
}
@@ -261,9 +269,11 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
void Worker::terminate()
{
// m_contextProxy.terminateWorkerGlobalScope();
m_terminationFlags.fetch_or(TerminateRequestedFlag);
WebWorker__notifyNeedTermination(impl_);
auto* impl = lifecycleHandle_;
lifecycleHandle_ = nullptr;
WebWorkerLifecycleHandle__requestTermination(impl);
}
// const char* Worker::activeDOMObjectName() const
@@ -468,6 +478,7 @@ void Worker::forEachWorker(const Function<Function<void(ScriptExecutionContext&)
extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* worker, int32_t exitCode)
{
worker->dispatchExit(exitCode);
// no longer referenced by Zig
worker->deref();

View File

@@ -50,6 +50,7 @@ class WorkerGlobalScopeProxy;
struct StructuredSerializeOptions;
struct WorkerOptions;
struct WebWorkerLifecycleHandle;
class Worker final : public ThreadSafeRefCounted<Worker>, public EventTargetWithInlineData, private ContextDestructionObserver {
WTF_MAKE_TZONE_ALLOCATED(Worker);
@@ -119,7 +120,7 @@ private:
// Tracks TerminateRequestedFlag and TerminatedFlag
std::atomic<uint8_t> m_terminationFlags { 0 };
const ScriptExecutionContextIdentifier m_clientIdentifier;
void* impl_ { nullptr };
WebWorkerLifecycleHandle* lifecycleHandle_ { nullptr };
};
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);

View File

@@ -12,7 +12,9 @@ tasks: Queue = undefined,
///
/// Having two queues avoids infinite loops created by calling `setImmediate` in a `setImmediate` callback.
immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
next_immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
next_immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
global: *jsc.JSGlobalObject = undefined,
@@ -29,6 +31,13 @@ imminent_gc_timer: std.atomic.Value(?*Timer.WTFTimer) = .{ .raw = null },
signal_handler: if (Environment.isPosix) ?*PosixSignalHandle else void = if (Environment.isPosix) null,
// this exists because while we're inside a spawnSync call, some tasks can actually
// still complete which leads to a case where module resolution can partially complete and
// some modules are only partially evaluated which causes reference errors.
// TODO: A better fix here could be a second event loop so we can come off the main one
// while processing spawnSync, then resume back to here afterwards
is_inside_spawn_sync: bool = false,
pub const Debug = if (Environment.isDebug) struct {
is_inside_tick_queue: bool = false,
js_call_count_outside_tick_queue: usize = 0,
@@ -126,6 +135,12 @@ const DrainMicrotasksResult = enum(u8) {
extern fn JSC__JSGlobalObject__drainMicrotasks(*jsc.JSGlobalObject) DrainMicrotasksResult;
pub fn drainMicrotasksWithGlobal(this: *EventLoop, globalObject: *jsc.JSGlobalObject, jsc_vm: *jsc.VM) bun.JSTerminated!void {
jsc.markBinding(@src());
// see is_inside_spawn_sync doc comment
if (this.is_inside_spawn_sync) {
return;
}
jsc_vm.releaseWeakRefs();
switch (JSC__JSGlobalObject__drainMicrotasks(globalObject)) {
@@ -220,10 +235,19 @@ fn tickWithCount(this: *EventLoop, virtual_machine: *VirtualMachine) u32 {
pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) void {
var to_run_now = this.immediate_tasks;
var to_run_now_cpp = this.immediate_cpp_tasks;
this.immediate_tasks = this.next_immediate_tasks;
this.next_immediate_tasks = .{};
this.immediate_cpp_tasks = this.next_immediate_cpp_tasks;
this.next_immediate_cpp_tasks = .{};
for (to_run_now_cpp.items) |task| {
log("running immediate cpp task", .{});
task.run(virtual_machine.global);
}
var exception_thrown = false;
for (to_run_now.items) |task| {
exception_thrown = task.runImmediateTask(virtual_machine);
@@ -234,6 +258,22 @@ pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) vo
this.maybeDrainMicrotasks();
}
if (this.next_immediate_cpp_tasks.capacity > 0) {
// this would only occur if we were recursively running tickImmediateTasks.
@branchHint(.unlikely);
this.immediate_cpp_tasks.appendSlice(bun.default_allocator, this.next_immediate_cpp_tasks.items) catch bun.outOfMemory();
this.next_immediate_cpp_tasks.deinit(bun.default_allocator);
}
if (to_run_now_cpp.capacity > 1024 * 128) {
// once in a while, deinit the array to free up memory
to_run_now_cpp.clearAndFree(bun.default_allocator);
} else {
to_run_now_cpp.clearRetainingCapacity();
}
this.next_immediate_cpp_tasks = to_run_now_cpp;
if (this.next_immediate_tasks.capacity > 0) {
// this would only occur if we were recursively running tickImmediateTasks.
@branchHint(.unlikely);
@@ -549,6 +589,10 @@ pub fn enqueueTask(this: *EventLoop, task: Task) void {
this.tasks.writeItem(task) catch unreachable;
}
/// Queue a C++ task to run on the next `tickImmediateTasks` pass.
/// Uses the same scheduling bucket as `setImmediate` callbacks, but for
/// C++-originated work (see `Bun__queueImmediateCppTask`).
pub fn enqueueImmediateCppTask(this: *EventLoop, task: *CppTask) void {
this.immediate_cpp_tasks.append(bun.default_allocator, task) catch bun.outOfMemory();
}
/// Queue a JS `setImmediate` timer object to run on the next immediate tick.
/// NOTE(review): this uses `bun.handleOom` while the C++ variant above uses
/// `catch bun.outOfMemory()` — presumably equivalent; confirm which is the
/// preferred OOM idiom in this file.
pub fn enqueueImmediateTask(this: *EventLoop, task: *Timer.ImmediateObject) void {
bun.handleOom(this.immediate_tasks.append(bun.default_allocator, task));
}

View File

@@ -283,14 +283,14 @@ fn setCwd_(globalObject: *jsc.JSGlobalObject, to: *jsc.ZigString) bun.JSError!js
}
}
// TODO(@190n) this may need to be noreturn
pub fn exit(globalObject: *jsc.JSGlobalObject, code: u8) callconv(.c) void {
var vm = globalObject.bunVM();
vm.exit_handler.exit_code = code;
if (vm.worker) |worker| {
// TODO(@190n) we may need to use requestTerminate or throwTerminationException
// instead to terminate the worker sooner
// sets exit_called and requests termination
worker.exit();
// and then fire the trap to immediately interrupt js execution
vm.jsc_vm.notifyNeedTermination();
} else {
vm.onExit();
vm.globalExit();

View File

@@ -78,6 +78,12 @@ pub export fn Bun__queueTask(global: *JSGlobalObject, task: *jsc.CppTask) void {
global.bunVM().eventLoop().enqueueTask(jsc.Task.init(task));
}
/// C++ -> Zig binding: enqueue a CppTask onto the immediate-task queue of the
/// event loop owned by `global`'s VM.
pub export fn Bun__queueImmediateCppTask(global: *JSGlobalObject, task: *jsc.CppTask) void {
jsc.markBinding(@src());
global.bunVM().eventLoop().enqueueImmediateCppTask(task);
}
pub export fn Bun__reportUnhandledError(globalObject: *JSGlobalObject, value: JSValue) callconv(.C) JSValue {
jsc.markBinding(@src());

View File

@@ -1,6 +1,10 @@
//! Shared implementation of Web and Node `Worker`
const WebWorker = @This();
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", destroy, .{});
pub const new = bun.TrivialNew(@This());
pub const ref = RefCount.ref;
pub const deref = RefCount.deref;
const log = Output.scoped(.Worker, .hidden);
@@ -8,11 +12,13 @@ const log = Output.scoped(.Worker, .hidden);
vm: ?*jsc.VirtualMachine = null,
status: std.atomic.Value(Status) = .init(.start),
/// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop.
requested_terminate: std.atomic.Value(bool) = .init(false),
execution_context_id: u32 = 0,
parent_context_id: u32 = 0,
parent: *jsc.VirtualMachine,
ref_count: RefCount,
lifecycle_handle: *WebWorkerLifecycleHandle,
/// To be resolved on the Worker thread at startup, in spin().
unresolved_specifier: []const u8,
preloads: [][]const u8 = &.{},
@@ -58,14 +64,11 @@ export fn WebWorker__getParentWorker(vm: *jsc.VirtualMachine) ?*anyopaque {
}
pub fn hasRequestedTerminate(this: *const WebWorker) bool {
return this.requested_terminate.load(.monotonic);
return this.lifecycle_handle.worker.load(.acquire) == null;
}
pub fn setRequestedTerminate(this: *WebWorker) bool {
return this.requested_terminate.swap(true, .release);
}
export fn WebWorker__updatePtr(worker: *WebWorker, ptr: *anyopaque) bool {
export fn WebWorker__updatePtr(handle: *WebWorkerLifecycleHandle, ptr: *anyopaque) bool {
const worker = handle.worker.load(.acquire).?;
worker.cpp_worker = ptr;
var thread = std.Thread.spawn(
@@ -73,7 +76,7 @@ export fn WebWorker__updatePtr(worker: *WebWorker, ptr: *anyopaque) bool {
startWithErrorHandling,
.{worker},
) catch {
worker.deinit();
worker.destroy();
return false;
};
thread.detach();
@@ -203,6 +206,7 @@ pub fn create(
execArgv_len: usize,
preload_modules_ptr: ?[*]bun.String,
preload_modules_len: usize,
lifecycle_handle: *WebWorkerLifecycleHandle,
) callconv(.c) ?*WebWorker {
jsc.markBinding(@src());
log("[{d}] WebWorker.create", .{this_context_id});
@@ -233,8 +237,9 @@ pub fn create(
}
}
var worker = bun.handleOom(bun.default_allocator.create(WebWorker));
worker.* = WebWorker{
var worker = bun.new(WebWorker, WebWorker{
.lifecycle_handle = lifecycle_handle,
.ref_count = .init(),
.cpp_worker = cpp_worker,
.parent = parent,
.parent_context_id = parent_context_id,
@@ -254,10 +259,10 @@ pub fn create(
.argv = if (argv_ptr) |ptr| ptr[0..argv_len] else &.{},
.execArgv = if (inherit_execArgv) null else (if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else &.{}),
.preloads = preloads.items,
};
});
worker.parent_poll_ref.ref(parent);
worker.ref();
return worker;
}
@@ -273,6 +278,7 @@ pub fn startWithErrorHandling(
pub fn start(
this: *WebWorker,
) anyerror!void {
errdefer this.deref();
if (this.name.len > 0) {
Output.Source.configureNamedThread(this.name);
} else {
@@ -373,7 +379,10 @@ fn deinit(this: *WebWorker) void {
bun.default_allocator.free(preload);
}
bun.default_allocator.free(this.preloads);
bun.default_allocator.destroy(this);
}
fn destroy(this: *WebWorker) void {
bun.destroy(this);
}
fn flushLogs(this: *WebWorker) void {
@@ -408,7 +417,7 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
var buffered_writer_ = bun.MutableString.BufferedWriter{ .context = &array };
var buffered_writer = &buffered_writer_;
var worker = vm.worker orelse @panic("Assertion failure: no worker");
const worker = vm.worker orelse @panic("Assertion failure: no worker");
const writer = buffered_writer.writer();
const Writer = @TypeOf(writer);
@@ -442,9 +451,12 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
jsc.markBinding(@src());
WebWorker__dispatchError(globalObject, worker.cpp_worker, bun.String.cloneUTF8(array.slice()), error_instance);
if (vm.worker) |worker_| {
_ = worker.setRequestedTerminate();
worker.parent_poll_ref.unrefConcurrently(worker.parent);
worker_.exitAndDeinit();
// During unhandled rejection, we're already holding the API lock - now
// is the right time to set exit_called to true so that
// notifyNeedTermination uses vm.global.requestTermination() instead of
// vm.jsc.notifyNeedTermination() which avoid VMTraps assertion failures
worker_.exit_called = true;
worker_.lifecycle_handle.requestTermination();
}
}
@@ -454,6 +466,10 @@ fn setStatus(this: *WebWorker, status: Status) void {
this.status.store(status, .release);
}
fn unhandledError(this: *WebWorker, _: anyerror) void {
this.flushLogs();
}
fn spin(this: *WebWorker) void {
log("[{d}] spin start", .{this.execution_context_id});
@@ -545,12 +561,14 @@ fn spin(this: *WebWorker) void {
}
/// This is worker.ref()/.unref() from JS (Caller thread)
pub fn setRef(this: *WebWorker, value: bool) callconv(.c) void {
if (this.hasRequestedTerminate()) {
return;
}
pub fn setRef(handle: *WebWorkerLifecycleHandle, value: bool) callconv(.c) void {
if (handle.worker.load(.acquire)) |worker| {
if (worker.hasRequestedTerminate()) {
return;
}
this.setRefInternal(value);
worker.setRefInternal(value);
}
}
pub fn setRefInternal(this: *WebWorker, value: bool) void {
@@ -564,26 +582,30 @@ pub fn setRefInternal(this: *WebWorker, value: bool) void {
/// Implement process.exit(). May only be called from the Worker thread.
pub fn exit(this: *WebWorker) void {
this.exit_called = true;
this.notifyNeedTermination();
this.lifecycle_handle.requestTermination();
}
/// Request a terminate from any thread.
pub fn notifyNeedTermination(this: *WebWorker) callconv(.c) void {
fn notifyNeedTermination(this: *WebWorker) void {
if (this.status.load(.acquire) == .terminated) {
return;
}
if (this.setRequestedTerminate()) {
return;
}
log("[{d}] notifyNeedTermination", .{this.execution_context_id});
if (this.vm) |vm| {
vm.eventLoop().wakeup();
// TODO(@190n) notifyNeedTermination
}
// TODO(@190n) delete
this.setRefInternal(false);
if (this.exit_called) {
// For process.exit() called from JavaScript, use JSC's termination
// exception mechanism to interrupt ongoing JS execution
vm.global.requestTermination();
} else {
// For external terminate requests (e.g worker.terminate() from parent thread),
// use VM traps system
vm.jsc_vm.notifyNeedTermination();
}
}
}
/// This handles cleanup, emitting the "close" event, and deinit.
@@ -596,6 +618,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
log("[{d}] exitAndDeinit", .{this.execution_context_id});
const cpp_worker = this.cpp_worker;
var exit_code: i32 = 0;
var globalObject: ?*jsc.JSGlobalObject = null;
var vm_to_deinit: ?*jsc.VirtualMachine = null;
@@ -610,31 +633,100 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
vm_to_deinit = vm;
}
var arena = this.arena;
this.lifecycle_handle.deref();
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
if (loop) |loop_| {
loop_.internal_loop_data.jsc_vm = null;
}
bun.uws.onThreadExit();
this.deinit();
if (vm_to_deinit) |vm| {
vm.gc_controller.deinit();
vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
}
bun.deleteAllPoolsForThreadExit();
if (arena) |*arena_| {
arena_.deinit();
}
this.deref();
bun.exitThread();
}
/// C++ binding: request termination of the worker behind `handle`.
/// Tolerates a null handle so C++ callers need no null check of their own.
pub export fn WebWorkerLifecycleHandle__requestTermination(handle: ?*WebWorkerLifecycleHandle) void {
if (handle) |h| {
h.requestTermination();
}
}
/// C++ binding: drop one reference on the lifecycle handle (the reference
/// taken for Worker.cpp in `createWebWorker`). Tolerates a null handle.
pub export fn WebWorkerLifecycleHandle__release(handle: ?*WebWorkerLifecycleHandle) void {
if (handle) |h| {
h.deref();
}
}
/// Manages the complex timing surrounding web worker creation and destruction.
/// Thread-safe, ref-counted indirection between the C++ `Worker` object and
/// the Zig `WebWorker`: callers go through the handle so that a terminated
/// worker can be detached atomically instead of racing a raw pointer.
const WebWorkerLifecycleHandle = struct {
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", WebWorkerLifecycleHandle.deinit, .{});
pub const ref = WebWorkerLifecycleHandle.RefCount.ref;
pub const deref = WebWorkerLifecycleHandle.RefCount.deref;
/// The live worker, or null once `requestTermination` has claimed it.
/// The null-swap in `requestTermination` is the single point that marks the
/// worker as terminating; readers load with `.acquire`.
worker: std.atomic.Value(?*WebWorker),
ref_count: WebWorkerLifecycleHandle.RefCount,
pub const new = bun.TrivialNew(WebWorkerLifecycleHandle);
/// C-callconv factory: allocates the handle first, creates the WebWorker
/// with a back-pointer to it, then publishes the worker with a `.release`
/// store so other threads observe a fully-initialized worker. Takes an
/// extra ref for the reference Worker.cpp holds.
pub fn createWebWorker(
cpp_worker: *void,
parent: *jsc.VirtualMachine,
name_str: bun.String,
specifier_str: bun.String,
error_message: *bun.String,
parent_context_id: u32,
this_context_id: u32,
mini: bool,
default_unref: bool,
eval_mode: bool,
argv_ptr: ?[*]WTFStringImpl,
argv_len: usize,
inherit_execArgv: bool,
execArgv_ptr: ?[*]WTFStringImpl,
execArgv_len: usize,
preload_modules_ptr: ?[*]bun.String,
preload_modules_len: usize,
) callconv(.c) *WebWorkerLifecycleHandle {
const handle = WebWorkerLifecycleHandle.new(.{
.worker = .init(null),
.ref_count = .init(),
});
// NOTE(review): `create` returns an optional, but the result is unwrapped
// unconditionally here — confirm the error path cannot return null.
const worker = create(cpp_worker, parent, name_str, specifier_str, error_message, parent_context_id, this_context_id, mini, default_unref, eval_mode, argv_ptr, argv_len, inherit_execArgv, execArgv_ptr, execArgv_len, preload_modules_ptr, preload_modules_len, handle).?;
handle.worker.store(worker, .release);
// Worker.cpp holds a reference to this
handle.ref();
return handle;
}
/// RefCount destructor: frees the handle itself once the last ref drops.
pub fn deinit(this: *WebWorkerLifecycleHandle) void {
bun.destroy(this);
}
/// Atomically detach the worker (swap to null) so only the first caller —
/// from any thread — forwards the termination request; it also drops the
/// handle's reference on the worker. Subsequent calls are no-ops.
pub fn requestTermination(self: *WebWorkerLifecycleHandle) void {
const worker = self.worker.swap(null, .acq_rel) orelse return;
worker.notifyNeedTermination();
worker.deref();
}
};
// Export the C-callconv entry points under the symbol names Worker.cpp links
// against; `WebWorker__updatePtr` is referenced so its `export` is retained.
comptime {
@export(&create, .{ .name = "WebWorker__create" });
@export(&notifyNeedTermination, .{ .name = "WebWorker__notifyNeedTermination" });
@export(&WebWorkerLifecycleHandle.createWebWorker, .{ .name = "WebWorkerLifecycleHandle__createWebWorker" });
@export(&setRef, .{ .name = "WebWorker__setRef" });
_ = WebWorker__updatePtr;
}
const std = @import("std");

View File

@@ -4,7 +4,6 @@ const HTTPClient = @This();
pub var default_allocator: std.mem.Allocator = undefined;
pub var default_arena: Arena = undefined;
pub var http_thread: HTTPThread = undefined;
//TODO: this needs to be freed when Worker Threads are implemented
pub var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.AnySocket).init(bun.default_allocator);
pub var async_http_id_monotonic: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);

View File

@@ -1,3 +1,5 @@
import type { Pipe as NodeStreamPipe } from "node:stream";
// Hardcoded module "node:child_process"
const EventEmitter = require("node:events");
const OsModule = require("node:os");
@@ -1041,13 +1043,15 @@ class ChildProcess extends EventEmitter {
#handle;
#closesNeeded = 1;
#closesGot = 0;
disconnect: undefined | (() => void);
signalCode = null;
exitCode = null;
spawnfile;
spawnargs;
pid;
channel;
channel: NodeStreamPipe | undefined;
killed = false;
[Symbol.dispose]() {
@@ -1371,7 +1375,7 @@ class ChildProcess extends EventEmitter {
if (has_ipc) {
this.send = this.#send;
this.disconnect = this.#disconnect;
this.channel = new Control();
this.channel = new SubprocessChannel(this);
Object.defineProperty(this, "_channel", {
get() {
return this.channel;
@@ -1730,9 +1734,46 @@ function abortChildProcess(child, killSignal, reason) {
}
}
class Control extends EventEmitter {
constructor() {
/**
 * The object exposed as `child.channel` / `child._channel` when IPC is
 * enabled. Implements the `node:stream` `Pipe` surface (close/hasRef/ref/unref)
 * by delegating to the owning ChildProcess.
 */
class SubprocessChannel extends EventEmitter implements NodeStreamPipe {
#subprocess: ChildProcess;
// Set once close() has run; every other method becomes a no-op afterwards.
#closed: boolean = false;
public constructor(childProcess: ChildProcess) {
super();
this.#subprocess = childProcess;
}
/**
 * Close the IPC channel: disconnect if still connected, then emit "close"
 * on the next tick (deferred to match Node's asynchronous event ordering).
 * Idempotent.
 */
public close(): void {
if (this.#closed) return;
this.#closed = true;
if (this.#subprocess.connected) {
this.#subprocess.disconnect?.();
}
process.nextTick(() => {
this.emit("close");
});
}
/**
 * Whether the channel keeps the event loop referenced: requires an open
 * channel, a live native handle (`kHandle`), and an active IPC connection.
 */
public hasRef(): boolean {
if (this.#closed) return false;
const handle = this.#subprocess[kHandle];
if (!handle) return false;
return this.#subprocess.connected;
}
/** Ref the owning subprocess (keeps the event loop alive). No-op once closed. */
public ref(): void {
if (this.#closed) return;
this.#subprocess.ref();
}
/** Unref the owning subprocess. No-op once closed. */
public unref(): void {
if (this.#closed) return;
this.#subprocess.unref();
}
}

View File

@@ -1,4 +1,5 @@
// import type { Readable, Writable } from "node:stream";
// import type { WorkerOptions } from "node:worker_threads";
declare const self: typeof globalThis;
type WebWorker = InstanceType<typeof globalThis.Worker>;
@@ -225,15 +226,19 @@ function moveMessagePortToContext() {
const unsupportedOptions = ["stdin", "stdout", "stderr", "trackedUnmanagedFds", "resourceLimits"];
class Worker extends EventEmitter {
class Worker extends EventEmitter implements AsyncDisposable {
#worker: WebWorker;
#performance;
// this is used by terminate();
// either is the exit code if exited, a promise resolving to the exit code, or undefined if we haven't sent .terminate() yet
#onExitPromise: Promise<number> | number | undefined = undefined;
#onExitResolvers = Promise.withResolvers<number | void>();
#urlToRevoke = "";
async [Symbol.asyncDispose]() {
await this.terminate();
}
constructor(filename: string, options: NodeWorkerOptions = {}) {
super();
for (const key of unsupportedOptions) {
@@ -319,7 +324,13 @@ class Worker extends EventEmitter {
});
}
terminate(callback: unknown) {
terminate(callback?: unknown): Promise<number | void> {
// threadId = -1 signifies the worker was closed already. Node returns PromiseResolve() in this case
// https://github.com/nodejs/node/blob/61601089f7f2f0e5e7abe8240f198585f585704c/lib/internal/worker.js#L390
if (this.threadId === -1) {
return Promise.resolve<void>();
}
if (typeof callback === "function") {
process.emitWarning(
"Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.",
@@ -329,12 +340,8 @@ class Worker extends EventEmitter {
this.#worker.addEventListener("close", event => callback(null, event.code), { once: true });
}
const onExitPromise = this.#onExitPromise;
if (onExitPromise) {
return $isPromise(onExitPromise) ? onExitPromise : Promise.$resolve(onExitPromise);
}
const { resolve, promise } = this.#onExitResolvers;
const { resolve, promise } = Promise.withResolvers();
this.#worker.addEventListener(
"close",
event => {
@@ -342,12 +349,13 @@ class Worker extends EventEmitter {
},
{ once: true },
);
this.#worker.terminate();
return (this.#onExitPromise = promise);
return promise;
}
postMessage(...args: [any, any]) {
postMessage(...args: Parameters<Bun.Worker["postMessage"]>) {
return this.#worker.postMessage.$apply(this.#worker, args);
}
@@ -356,8 +364,8 @@ class Worker extends EventEmitter {
return stringPromise.then(s => new HeapSnapshotStream(s));
}
#onClose(e) {
this.#onExitPromise = e.code;
#onClose(e: Event & { code: number }) {
this.#onExitResolvers.resolve(e.code);
this.emit("exit", e.code);
}

View File

@@ -69,6 +69,7 @@
"pg-gateway": "0.3.0-beta.4",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"piscina": "5.0.0",
"postgres": "3.3.5",
"prisma": "5.1.1",
"prompts": "2.4.2",
@@ -460,6 +461,42 @@
"@napi-rs/canvas-win32-x64-msvc": ["@napi-rs/canvas-win32-x64-msvc@0.1.65", "", { "os": "win32", "cpu": "x64" }, "sha512-RZQX3luWnlNWgdMnLMQ1hyfQraeAn9lnxWWVCHuUM4tAWEV8UDdeb7cMwmJW7eyt8kAosmjeHt3cylQMHOxGFg=="],
"@napi-rs/nice": ["@napi-rs/nice@1.1.1", "", { "optionalDependencies": { "@napi-rs/nice-android-arm-eabi": "1.1.1", "@napi-rs/nice-android-arm64": "1.1.1", "@napi-rs/nice-darwin-arm64": "1.1.1", "@napi-rs/nice-darwin-x64": "1.1.1", "@napi-rs/nice-freebsd-x64": "1.1.1", "@napi-rs/nice-linux-arm-gnueabihf": "1.1.1", "@napi-rs/nice-linux-arm64-gnu": "1.1.1", "@napi-rs/nice-linux-arm64-musl": "1.1.1", "@napi-rs/nice-linux-ppc64-gnu": "1.1.1", "@napi-rs/nice-linux-riscv64-gnu": "1.1.1", "@napi-rs/nice-linux-s390x-gnu": "1.1.1", "@napi-rs/nice-linux-x64-gnu": "1.1.1", "@napi-rs/nice-linux-x64-musl": "1.1.1", "@napi-rs/nice-openharmony-arm64": "1.1.1", "@napi-rs/nice-win32-arm64-msvc": "1.1.1", "@napi-rs/nice-win32-ia32-msvc": "1.1.1", "@napi-rs/nice-win32-x64-msvc": "1.1.1" } }, "sha512-xJIPs+bYuc9ASBl+cvGsKbGrJmS6fAKaSZCnT0lhahT5rhA2VVy9/EcIgd2JhtEuFOJNx7UHNn/qiTPTY4nrQw=="],
"@napi-rs/nice-android-arm-eabi": ["@napi-rs/nice-android-arm-eabi@1.1.1", "", { "os": "android", "cpu": "arm" }, "sha512-kjirL3N6TnRPv5iuHw36wnucNqXAO46dzK9oPb0wj076R5Xm8PfUVA9nAFB5ZNMmfJQJVKACAPd/Z2KYMppthw=="],
"@napi-rs/nice-android-arm64": ["@napi-rs/nice-android-arm64@1.1.1", "", { "os": "android", "cpu": "arm64" }, "sha512-blG0i7dXgbInN5urONoUCNf+DUEAavRffrO7fZSeoRMJc5qD+BJeNcpr54msPF6qfDD6kzs9AQJogZvT2KD5nw=="],
"@napi-rs/nice-darwin-arm64": ["@napi-rs/nice-darwin-arm64@1.1.1", "", { "os": "darwin", "cpu": "arm64" }, "sha512-s/E7w45NaLqTGuOjC2p96pct4jRfo61xb9bU1unM/MJ/RFkKlJyJDx7OJI/O0ll/hrfpqKopuAFDV8yo0hfT7A=="],
"@napi-rs/nice-darwin-x64": ["@napi-rs/nice-darwin-x64@1.1.1", "", { "os": "darwin", "cpu": "x64" }, "sha512-dGoEBnVpsdcC+oHHmW1LRK5eiyzLwdgNQq3BmZIav+9/5WTZwBYX7r5ZkQC07Nxd3KHOCkgbHSh4wPkH1N1LiQ=="],
"@napi-rs/nice-freebsd-x64": ["@napi-rs/nice-freebsd-x64@1.1.1", "", { "os": "freebsd", "cpu": "x64" }, "sha512-kHv4kEHAylMYmlNwcQcDtXjklYp4FCf0b05E+0h6nDHsZ+F0bDe04U/tXNOqrx5CmIAth4vwfkjjUmp4c4JktQ=="],
"@napi-rs/nice-linux-arm-gnueabihf": ["@napi-rs/nice-linux-arm-gnueabihf@1.1.1", "", { "os": "linux", "cpu": "arm" }, "sha512-E1t7K0efyKXZDoZg1LzCOLxgolxV58HCkaEkEvIYQx12ht2pa8hoBo+4OB3qh7e+QiBlp1SRf+voWUZFxyhyqg=="],
"@napi-rs/nice-linux-arm64-gnu": ["@napi-rs/nice-linux-arm64-gnu@1.1.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-CIKLA12DTIZlmTaaKhQP88R3Xao+gyJxNWEn04wZwC2wmRapNnxCUZkVwggInMJvtVElA+D4ZzOU5sX4jV+SmQ=="],
"@napi-rs/nice-linux-arm64-musl": ["@napi-rs/nice-linux-arm64-musl@1.1.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-+2Rzdb3nTIYZ0YJF43qf2twhqOCkiSrHx2Pg6DJaCPYhhaxbLcdlV8hCRMHghQ+EtZQWGNcS2xF4KxBhSGeutg=="],
"@napi-rs/nice-linux-ppc64-gnu": ["@napi-rs/nice-linux-ppc64-gnu@1.1.1", "", { "os": "linux", "cpu": "ppc64" }, "sha512-4FS8oc0GeHpwvv4tKciKkw3Y4jKsL7FRhaOeiPei0X9T4Jd619wHNe4xCLmN2EMgZoeGg+Q7GY7BsvwKpL22Tg=="],
"@napi-rs/nice-linux-riscv64-gnu": ["@napi-rs/nice-linux-riscv64-gnu@1.1.1", "", { "os": "linux", "cpu": "none" }, "sha512-HU0nw9uD4FO/oGCCk409tCi5IzIZpH2agE6nN4fqpwVlCn5BOq0MS1dXGjXaG17JaAvrlpV5ZeyZwSon10XOXw=="],
"@napi-rs/nice-linux-s390x-gnu": ["@napi-rs/nice-linux-s390x-gnu@1.1.1", "", { "os": "linux", "cpu": "s390x" }, "sha512-2YqKJWWl24EwrX0DzCQgPLKQBxYDdBxOHot1KWEq7aY2uYeX+Uvtv4I8xFVVygJDgf6/92h9N3Y43WPx8+PAgQ=="],
"@napi-rs/nice-linux-x64-gnu": ["@napi-rs/nice-linux-x64-gnu@1.1.1", "", { "os": "linux", "cpu": "x64" }, "sha512-/gaNz3R92t+dcrfCw/96pDopcmec7oCcAQ3l/M+Zxr82KT4DljD37CpgrnXV+pJC263JkW572pdbP3hP+KjcIg=="],
"@napi-rs/nice-linux-x64-musl": ["@napi-rs/nice-linux-x64-musl@1.1.1", "", { "os": "linux", "cpu": "x64" }, "sha512-xScCGnyj/oppsNPMnevsBe3pvNaoK7FGvMjT35riz9YdhB2WtTG47ZlbxtOLpjeO9SqqQ2J2igCmz6IJOD5JYw=="],
"@napi-rs/nice-openharmony-arm64": ["@napi-rs/nice-openharmony-arm64@1.1.1", "", { "os": "none", "cpu": "arm64" }, "sha512-6uJPRVwVCLDeoOaNyeiW0gp2kFIM4r7PL2MczdZQHkFi9gVlgm+Vn+V6nTWRcu856mJ2WjYJiumEajfSm7arPQ=="],
"@napi-rs/nice-win32-arm64-msvc": ["@napi-rs/nice-win32-arm64-msvc@1.1.1", "", { "os": "win32", "cpu": "arm64" }, "sha512-uoTb4eAvM5B2aj/z8j+Nv8OttPf2m+HVx3UjA5jcFxASvNhQriyCQF1OB1lHL43ZhW+VwZlgvjmP5qF3+59atA=="],
"@napi-rs/nice-win32-ia32-msvc": ["@napi-rs/nice-win32-ia32-msvc@1.1.1", "", { "os": "win32", "cpu": "ia32" }, "sha512-CNQqlQT9MwuCsg1Vd/oKXiuH+TcsSPJmlAFc5frFyX/KkOh0UpBLEj7aoY656d5UKZQMQFP7vJNa1DNUNORvug=="],
"@napi-rs/nice-win32-x64-msvc": ["@napi-rs/nice-win32-x64-msvc@1.1.1", "", { "os": "win32", "cpu": "x64" }, "sha512-vB+4G/jBQCAh0jelMTY3+kgFy00Hlx2f2/1zjMoH821IbplbWZOkLiTYXQkygNTzQJTq5cvwBDgn2ppHD+bglQ=="],
"@nestjs/common": ["@nestjs/common@11.0.3", "", { "dependencies": { "iterare": "1.2.1", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "class-transformer": "*", "class-validator": "*", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["class-transformer", "class-validator"] }, "sha512-fTkJWQ20+jvPKfrv3A+T3wsPwwYSJyoJ+pcBzyKtv5fCpK/yX/rJalFUIpw1CDmarfqIaMX9SdkplNyxtvH6RA=="],
"@nestjs/core": ["@nestjs/core@11.0.3", "", { "dependencies": { "@nuxt/opencollective": "0.4.1", "fast-safe-stringify": "2.1.1", "iterare": "1.2.1", "path-to-regexp": "8.2.0", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "@nestjs/common": "^11.0.0", "@nestjs/microservices": "^11.0.0", "@nestjs/platform-express": "^11.0.0", "@nestjs/websockets": "^11.0.0", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["@nestjs/microservices", "@nestjs/platform-express", "@nestjs/websockets"] }, "sha512-6UoVHpwa23HJxMNtuTXQCiqx/NHTG3lRBRgnZ8EDHTjgaNnR7P+xBS68zN3gLH7rBIrhhQ5Q1hVs7WswRxrw7Q=="],
@@ -2092,6 +2129,8 @@
"pino-std-serializers": ["pino-std-serializers@7.0.0", "", {}, "sha512-e906FRY0+tV27iq4juKzSYPbUj2do2X2JX4EzSca1631EB2QJQUqGbDuERal7LCtOpxl6x3+nvo9NPZcmjkiFA=="],
"piscina": ["piscina@5.0.0", "", { "optionalDependencies": { "@napi-rs/nice": "^1.0.1" } }, "sha512-R+arufwL7sZvGjAhSMK3TfH55YdGOqhpKXkcwQJr432AAnJX/xxX19PA4QisrmJ+BTTfZVggaz6HexbkQq1l1Q=="],
"pixelmatch": ["pixelmatch@5.3.0", "", { "dependencies": { "pngjs": "^6.0.0" }, "bin": { "pixelmatch": "bin/pixelmatch" } }, "sha512-o8mkY4E/+LNUf6LzX96ht6k6CEDi65k9G2rjMtBe9Oo+VPKSvl+0GKHuH/AlG+GA5LPG/i5hrekkxUc3s2HU+Q=="],
"pkg-dir": ["pkg-dir@4.2.0", "", { "dependencies": { "find-up": "^4.0.0" } }, "sha512-HRDzbaKjC+AOWVXxAU/x54COGeIv9eb+6CkDSQoNTt4XyWoIJvuPsXizxu/Fr23EiekbtZwmh1IcIG/l/a10GQ=="],

View File

@@ -5,7 +5,7 @@
* without always needing to run `bun install` in development.
*/
import { gc as bunGC, sleepSync, spawnSync, unsafe, which, write } from "bun";
import { gc as bunGC, readableStreamToText, sleepSync, spawnSync, unsafe, which, write } from "bun";
import { heapStats } from "bun:jsc";
import { beforeAll, describe, expect } from "bun:test";
import { ChildProcess, execSync, fork } from "child_process";
@@ -496,11 +496,43 @@ if (expect.extend)
}
}
},
// Async variant of `toRun`: spawns `bun <cmds>` without blocking the test
// thread, capturing stdout/stderr via pipes, and resolves once the process
// has exited.
async toRunAsync(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
const result = Bun.spawn({
cmd: [bunExe(), ...cmds],
env: bunEnv,
stdio: ["inherit", "pipe", "pipe"],
});
// Drain both pipes while awaiting exit so the child cannot stall on a full pipe.
const [stdout, stderr, exitCode] = await Promise.all([
readableStreamToText(result.stdout),
readableStreamToText(result.stderr),
result.exited,
]);
if (exitCode !== expectedCode) {
return {
pass: false,
message: () => `Command ${cmds.join(" ")} failed:` + "\n" + stdout + "\n" + stderr,
};
}
// When an expected stdout is supplied, require an exact match.
if (optionalStdout != null) {
return {
pass: stdout === optionalStdout,
message: () => `Expected ${cmds.join(" ")} to output ${optionalStdout} but got ${stdout}`,
};
}
return {
pass: true,
message: () => `Expected ${cmds.join(" ")} to run`,
};
},
toRun(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
const result = Bun.spawnSync({
cmd: [bunExe(), ...cmds],
env: bunEnv,
stdio: ["inherit", "pipe", "inherit"],
stdio: ["ignore", "pipe", "inherit"],
});
if (result.exitCode !== expectedCode) {
@@ -1371,6 +1403,7 @@ interface BunHarnessTestMatchers {
toHaveTestTimedOutAfter(expected: number): void;
toBeBinaryType(expected: keyof typeof binaryTypes): void;
toRun(optionalStdout?: string, expectedCode?: number): void;
toRunAsync(optionalStdout?: string, expectedCode?: number): Promise<void>;
toThrowWithCode(cls: CallableFunction, code: string): void;
toThrowWithCodeAsync(cls: CallableFunction, code: string): Promise<void>;
}

View File

@@ -187,3 +187,6 @@ tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: ["ignore", "inherit"
tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: [null, null, null] }));
tsd.expectAssignable<SyncSubprocess<Bun.SpawnOptions.Readable, Bun.SpawnOptions.Readable>>(Bun.spawnSync([], {}));
Bun.spawnSync({ cmd: ["echo", "hello"] });
Bun.spawnSync(["echo", "hello"], { stdio: ["ignore", "pipe", "pipe"] });

View File

@@ -0,0 +1,8 @@
import * as common from '../common/index.mjs';
import { Worker } from 'node:worker_threads';
{
// Verifies that the worker is async disposable
await using worker = new Worker('for(;;) {}', { eval: true });
worker.on('exit', common.mustCall());
}

View File

@@ -0,0 +1,11 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
// Regression for https://github.com/nodejs/node/issues/43182.
const w = new Worker(new URL('data:text/javascript,process.exit(1);await new Promise(()=>{ process.exit(2); })'));
w.on('exit', common.mustCall((code) => {
assert.strictEqual(code, 1);
}));

View File

@@ -7,7 +7,7 @@ const eachSizeMiB = 100;
const iterations = 5;
function test() {
const code = " ".repeat(eachSizeMiB * 1024 * 1024);
const code = Buffer.alloc(eachSizeMiB * 1024 * 1024, " ").toString();
return new Promise((resolve, reject) => {
const worker = new Worker(code, { eval: true });
worker.on("exit", () => resolve());

View File

@@ -0,0 +1,49 @@
import assert from "node:assert";
import { test } from "node:test";
import { fileURLToPath } from "node:url";
import { MessageChannel, MessagePort, parentPort, Worker } from "node:worker_threads";
interface StartupMessage {
port: MessagePort;
}
if (parentPort) {
parentPort.on("message", (message: StartupMessage) => {
console.log("Worker received startup message");
message.port.postMessage("hello");
message.port.close();
});
} else {
test("worker lifecycle message port", async () => {
const worker = new Worker(fileURLToPath(import.meta.url));
const { port1, port2 } = new MessageChannel();
const { promise, resolve, reject } = Promise.withResolvers<string>();
port1.on("message", (message: string) => {
console.log("Received message:", message);
assert.equal(message, "hello");
worker.terminate();
resolve(message);
});
worker.on("online", () => {
console.log("Worker is online");
const startupMessage: StartupMessage = { port: port2 };
worker.postMessage(startupMessage, [port2]);
});
worker.on("exit", () => {
console.log("Worker exited");
reject();
});
worker.on("error", err => {
reject(err);
});
assert.equal(await promise, "hello");
});
}

View File

@@ -0,0 +1,23 @@
import assert from "node:assert";
import { setTimeout as sleep } from "node:timers/promises";
import { fileURLToPath } from "node:url";
import { Worker, isMainThread, threadId } from "node:worker_threads";
const sleeptime = 100;
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url));
assert.strictEqual(threadId, 0);
assert.strictEqual(worker.threadId, 1);
console.log(" (main) threadId:", worker.threadId);
await sleep(sleeptime);
assert.strictEqual(await worker.terminate(), 1);
assert.strictEqual(worker.threadId, -1); // should be -1 after termination
assert.strictEqual(await worker.terminate(), undefined); // sequential calling is basically no-op
assert.strictEqual(worker.threadId, -1);
} else {
console.log("(worker) threadId:", threadId);
assert.strictEqual(threadId, 1);
await sleep(sleeptime * 2); // keep it alive definitely longer than the parent
}

View File

@@ -0,0 +1,31 @@
import { once } from "node:events";
import { fileURLToPath } from "node:url";
import { Worker, isMainThread, parentPort } from "node:worker_threads";
if (isMainThread) {
const { test, expect } = await import("bun:test");
test("process.exit() works", async () => {
const worker = new Worker(fileURLToPath(import.meta.url));
worker.on("message", () => expect().fail("worker should not keep executing after process.exit()"));
worker.postMessage("boom");
const [exitCode] = await once(worker, "exit");
expect(exitCode).toBe(2);
});
} else {
console.log("Worker thread started");
parentPort!.on("message", message => {
console.log(`Worker received: ${message}`);
console.log("About to call process.exit(2)...");
process.exit(2);
console.log("process.exit(2) called");
parentPort!.postMessage("i'm still alive!");
});
console.log("Worker is ready, waiting for messages...");
}

View File

@@ -6,7 +6,7 @@ import { Worker, isMainThread, workerData } from "worker_threads";
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
const actions = {
async ["Bun.connect"](port) {
async ["Bun.connect"](port: number) {
await Bun.connect({
hostname: "localhost",
port,
@@ -19,7 +19,7 @@ const actions = {
},
});
},
async ["Bun.listen"](port) {
async ["Bun.listen"](port: number) {
const server = Bun.listen({
hostname: "localhost",
port: 0,
@@ -32,7 +32,7 @@ const actions = {
},
});
},
async ["fetch"](port) {
async ["fetch"](port: number) {
const resp = await fetch("http://localhost:" + port);
await resp.blob();
},
@@ -65,12 +65,10 @@ if (isMainThread) {
const { promise, resolve, reject } = Promise.withResolvers();
promises.push(promise);
worker.on("online", () => {
sleep(1)
.then(() => {
return worker.terminate();
})
.finally(resolve);
worker.once("online", async () => {
await sleep(1);
await worker.terminate();
resolve();
});
worker.on("error", e => reject(e));
}

View File

@@ -4,21 +4,22 @@ import { bunEnv, nodeExe } from "harness";
import { join } from "path";
const fixtureDir = join(import.meta.dirname, "fixtures");
function postNodeFormData(port) {
const result = Bun.spawnSync({
async function postNodeFormData(port) {
const result = Bun.spawn({
cmd: [nodeExe(), join(fixtureDir, "node-form-data.fetch.fixture.js"), port?.toString()],
env: bunEnv,
stdio: ["inherit", "inherit", "inherit"],
});
expect(result.exitCode).toBe(0);
expect(await result.exited).toBe(0);
}
function postNodeAction(port) {
const result = Bun.spawnSync({
async function postNodeAction(port) {
const result = Bun.spawn({
cmd: [nodeExe(), join(fixtureDir, "node-action.fetch.fixture.js"), port?.toString()],
env: bunEnv,
stdio: ["inherit", "inherit", "inherit"],
});
expect(result.exitCode).toBe(0);
expect(await result.exited).toBe(0);
}
describe("astro", async () => {
@@ -66,7 +67,7 @@ describe("astro", async () => {
});
test("is able todo a POST request to an astro action using node", async () => {
postNodeAction(previewServer.port);
await postNodeAction(previewServer.port);
});
test("is able to post form data to an astro using bun", async () => {
@@ -89,6 +90,6 @@ describe("astro", async () => {
});
});
test("is able to post form data to an astro using node", async () => {
postNodeFormData(previewServer.port);
await postNodeFormData(previewServer.port);
});
});

View File

@@ -0,0 +1 @@
.astro

View File

@@ -0,0 +1,6 @@
{
"name": "piscina-test",
"dependencies": {
"piscina": "5.0.0"
}
}

View File

@@ -0,0 +1,22 @@
import { resolve } from "node:path";
import { test } from "node:test";
import { Piscina } from "piscina";
// Stress-tests Piscina's synchronous atomics wait mode: a fixed two-thread
// pool is flooded with many no-op tasks and must drain them all.
test("piscina atomics", async () => {
  const pool = new Piscina<void, void>({
    filename: resolve(__dirname, "simple.fixture.ts"),
    minThreads: 2,
    maxThreads: 2,
    atomics: "sync",
  });
  // Queue 10k tasks up front so the workers repeatedly exercise the
  // atomics wait/notify path.
  const pending = Array.from({ length: 10000 }, () => pool.run());
  await Promise.all(pending);
  await pool.destroy();
});

View File

@@ -0,0 +1,63 @@
import { expect, test } from "bun:test";
import { join } from "node:path";
import { Piscina } from "piscina";
// Watchdog: if the suite wedges (e.g. a worker never terminates), force a
// nonzero exit after 10s so CI reports a failure instead of hanging.
// unref() keeps this timer from holding the process open on the happy path.
const watchdog = setTimeout(() => {
  console.error(new Error("Catastrophic failure, exiting so test can fail"));
  process.exit(1);
}, 10 * 1000);
watchdog.unref();
test("Piscina basic functionality", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker.fixture.ts"),
});
const result = await piscina.run({ a: 4, b: 6 });
expect(result).toBe(10);
await piscina.destroy();
});
test("Piscina event loop cleanup", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker.fixture.ts"),
});
const results = await Promise.all([
piscina.run({ a: 1, b: 2 }),
piscina.run({ a: 3, b: 4 }),
piscina.run({ a: 5, b: 6 }),
]);
expect(results).toEqual([3, 7, 11]);
await piscina.destroy();
});
test("Piscina with idleTimeout", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker.fixture.ts"),
idleTimeout: 100,
maxThreads: 1,
});
const result = await piscina.run({ a: 10, b: 20 });
expect(result).toBe(30);
await piscina.destroy();
});
// A task that throws inside the worker must surface as a rejection of run().
// The previous boolean-collapse (`.then(() => true, () => false)`) accepted
// ANY rejection — even an unrelated failure such as the fixture failing to
// load — so we assert the fixture's exact error message instead.
test("Piscina error handling", async () => {
  const piscina = new Piscina({
    filename: join(import.meta.dir, "worker-error.fixture.ts"),
  });
  await expect(piscina.run({ shouldThrow: true })).rejects.toThrow("Worker error for testing");
  await piscina.destroy();
});

View File

@@ -0,0 +1,10 @@
// https://github.com/piscinajs/piscina/blob/ba396ced7afc08a8c16f65fbc367a9b7f4d7e84c/test/fixtures/simple-isworkerthread.ts#L7
import assert from "assert";
import Piscina from "piscina";
// This fixture must only ever be loaded inside a Piscina worker thread;
// fail fast at import time if that invariant is broken.
assert.strictEqual(Piscina.isWorkerThread, true);

// Trivial task body: every invocation simply reports completion.
export default function (): string {
  return "done";
}

View File

@@ -0,0 +1,7 @@
// Task entry point: rejects the run() promise when asked to, otherwise
// resolves with a sentinel string.
export default function ({ shouldThrow }: { shouldThrow: boolean }): string {
  if (!shouldThrow) {
    return "success";
  }
  throw new Error("Worker error for testing");
}

View File

@@ -0,0 +1,4 @@
// Task entry point: logs its operands (exercising worker stdout) and
// returns their sum.
export default function ({ a, b }: { a: number; b: number }): number {
  console.log("Worker: calculating", a, "+", b);
  return a + b;
}

View File

@@ -0,0 +1,308 @@
import assert from "node:assert";
import { test } from "node:test";
import { MessageChannel, receiveMessageOnPort, Worker } from "worker_threads";
// Delivery of MessagePort messages is asynchronous: all three setImmediate
// callbacks (and the postMessage calls they make) run before any "message"
// handler fires, so every send is recorded ahead of every receive.
test("MessagePort postMessage respects event loop timing", async () => {
const { port1, port2 } = new MessageChannel();
const messages: string[] = [];
let messageCount = 0;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", msg => {
messages.push(`received: ${msg}`);
messageCount++;
// All three messages observed; unblock the assertions below.
if (messageCount === 3) {
resolve();
}
});
port2.start();
setImmediate(() => {
messages.push("setImmediate 1");
port1.postMessage("message1");
});
setImmediate(() => {
messages.push("setImmediate 2");
port1.postMessage("message2");
});
setImmediate(() => {
messages.push("setImmediate 3");
port1.postMessage("message3");
});
await promise;
// Sends first (in scheduling order), then receives (in send order).
assert.deepStrictEqual(messages, [
"setImmediate 1",
"setImmediate 2",
"setImmediate 3",
"received: message1",
"received: message2",
"received: message3",
]);
port1.close();
port2.close();
});
// Node.js compatibility: process.nextTick callbacks queued after a
// postMessage must still run before the pending message is delivered.
test("MessagePort messages execute after process.nextTick (Node.js compatibility)", async () => {
const { port1, port2 } = new MessageChannel();
const executionOrder: string[] = [];
let messageReceived = false;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", () => {
executionOrder.push("message received");
messageReceived = true;
resolve();
});
port2.start();
port1.postMessage("test");
process.nextTick(() => {
executionOrder.push("nextTick 1");
});
process.nextTick(() => {
executionOrder.push("nextTick 2");
});
await promise;
// Let one extra macrotask turn pass so any late callbacks would be visible.
await new Promise(resolve => setImmediate(resolve));
assert.strictEqual(messageReceived, true);
assert.strictEqual(executionOrder[0], "nextTick 1");
assert.strictEqual(executionOrder[1], "nextTick 2");
assert.strictEqual(executionOrder[2], "message received");
port1.close();
port2.close();
});
// End-to-end check: transfer one end of a channel into a worker, have the
// worker echo three messages back over it, and confirm both the echoes and
// the worker's completion notification arrive.
test("MessagePort message delivery works with workers", async () => {
const worker = new Worker(
`
const { parentPort, MessageChannel } = require('worker_threads');
parentPort.on('message', ({ port }) => {
let count = 0;
port.on('message', (msg) => {
count++;
port.postMessage(\`echo-\${count}: \${msg}\`);
if (count >= 3) {
port.close();
parentPort.postMessage('done');
}
});
port.start();
parentPort.postMessage('ready');
});
`,
{ eval: true },
);
const { port1, port2 } = new MessageChannel();
const messages: string[] = [];
let readyReceived = false;
let doneReceived = false;
const { promise, resolve, reject: rejectWorker } = Promise.withResolvers<void>();
const {
promise: allMessagesReceived,
resolve: resolveAllMessages,
reject: rejectAllMessages,
} = Promise.withResolvers<void>();
// Watchdog: if the round trip stalls, kill the worker and fail both waits.
AbortSignal.timeout(100).addEventListener("abort", () => {
worker.terminate();
rejectWorker(new Error("timeout"));
rejectAllMessages(new Error("timeout"));
});
worker.on("message", msg => {
if (msg === "ready") {
readyReceived = true;
// The worker has wired up its end of the channel; start the exchange.
port1.postMessage("hello1");
port1.postMessage("hello2");
port1.postMessage("hello3");
} else if (msg === "done") {
doneReceived = true;
resolve();
}
});
port1.on("message", msg => {
messages.push(msg);
if (messages.length === 3) {
resolveAllMessages();
}
});
// port2 is transferred to the worker along with the envelope message.
worker.postMessage({ port: port2 }, [port2]);
await Promise.all([promise, allMessagesReceived]);
assert.strictEqual(readyReceived, true);
assert.strictEqual(doneReceived, true);
assert.strictEqual(messages.length, 3);
assert.deepStrictEqual(messages, ["echo-1: hello1", "echo-2: hello2", "echo-3: hello3"]);
port1.close();
worker.terminate();
});
// Interleaving queueMicrotask with postMessage: message delivery must not
// starve the microtask queue, and vice versa.
test("MessagePort messages don't starve microtasks", async () => {
const { port1, port2 } = new MessageChannel();
const executionOrder: string[] = [];
let messageCount = 0;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", () => {
messageCount++;
executionOrder.push(`message-${messageCount}`);
if (messageCount === 3) {
resolve();
}
});
port2.start();
queueMicrotask(() => {
executionOrder.push("microtask-1");
});
port1.postMessage("msg1");
queueMicrotask(() => {
executionOrder.push("microtask-2");
});
port1.postMessage("msg2");
queueMicrotask(() => {
executionOrder.push("microtask-3");
});
port1.postMessage("msg3");
await promise;
// Only completion is asserted — exact interleaving is deliberately left
// unspecified; the test requires that every callback eventually ran.
assert(executionOrder.includes("microtask-1"));
assert(executionOrder.includes("microtask-2"));
assert(executionOrder.includes("microtask-3"));
assert(executionOrder.includes("message-1"));
assert(executionOrder.includes("message-2"));
assert(executionOrder.includes("message-3"));
port1.close();
port2.close();
});
// Posting a burst of messages must preserve FIFO order end to end.
test("high volume MessagePort operations maintain order", async () => {
  const { port1, port2 } = new MessageChannel();
  const TOTAL_MESSAGES = 100;
  const received: number[] = [];
  const { promise, resolve } = Promise.withResolvers<void>();
  port2.on("message", value => {
    received.push(value);
    if (received.length === TOTAL_MESSAGES) {
      resolve();
    }
  });
  port2.start();
  const sent = Array.from({ length: TOTAL_MESSAGES }, (_, i) => i);
  for (const i of sent) {
    port1.postMessage(i);
  }
  await promise;
  // Every message arrived, in exactly the order it was sent.
  assert.deepStrictEqual(received, sent);
  port1.close();
  port2.close();
});
// Closing the receiving port while handling a message must not cause a later
// postMessage from the peer to throw.
test("MessagePort close behavior during message handling", async () => {
const { port1, port2 } = new MessageChannel();
let messageReceived = false;
let errorThrown = false;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", () => {
messageReceived = true;
port2.close();
try {
// Posting into the now-closed channel is expected not to throw.
port1.postMessage("after-close");
} catch (e) {
errorThrown = true;
}
// Small delay so any asynchronous fallout would surface before asserting.
setTimeout(resolve, 10);
});
port2.start();
port1.postMessage("test");
await promise;
assert.strictEqual(messageReceived, true);
assert.strictEqual(errorThrown, false);
port1.close();
});
// receiveMessageOnPort drains queued messages synchronously, in FIFO order,
// and reports undefined once the queue is empty.
test("receiveMessageOnPort synchronous message retrieval", () => {
  const { port1, port2 } = new MessageChannel();
  const sent = ["msg1", "msg2", "msg3"];
  for (const msg of sent) {
    port1.postMessage(msg);
  }
  for (const expected of sent) {
    assert.strictEqual(receiveMessageOnPort(port2)?.message, expected);
  }
  // Queue exhausted: no envelope object at all, not an empty message.
  assert.strictEqual(receiveMessageOnPort(port2), undefined);
  port1.close();
  port2.close();
});

View File

@@ -116,7 +116,7 @@ for (const testType of testTypes) {
if (!testType.isTransferable) {
expect(() =>
structuredCloneAdvanced(original, transferList, !!isForTransfer, isForStorage, context),
).toThrowError("The object can not be cloned.");
).toThrowError("The object could not be cloned.");
} else {
const cloned = structuredCloneAdvanced(original, transferList, !!isForTransfer, isForStorage, context);
testType.expectedAfterClone(original, cloned, isForTransfer, isForStorage);

View File

@@ -0,0 +1,27 @@
import assert from "node:assert";
import { spawnSync } from "node:child_process";
import { test } from "node:test";
import { fileURLToPath } from "node:url";
import { Worker } from "node:worker_threads";
import stripAnsi from "strip-ansi";
// This file runs in two modes: as the test (default) and as its own child
// process (IS_CHILD=true), re-spawned via spawnSync below.
const IS_CHILD = process.env.IS_CHILD === "true";
// At the time of writing, this test file passes in Node.js and fails in Bun.
// Node.js seems to wait for the exit event to happen before the parent process
// exits, which means that the Worker's exit code is printed to stdout
if (IS_CHILD) {
// Child mode: spawn a worker that exits immediately with code 1 and print
// that code from the "exit" listener. Whether this line reaches stdout
// depends on the runtime draining the worker's "exit" event before exiting.
const worker = new Worker("process.exit(1)", { eval: true });
worker.on("exit", code => console.log(code));
} else {
test("The worker exit event is emitted before the parent exits", async () => {
// Re-run this same file in IS_CHILD mode and assert that the worker's
// exit code (1) made it to the child's stdout before the child exited.
const file = fileURLToPath(import.meta.url);
const { stdout } = spawnSync(process.execPath, [file], {
env: { ...process.env, IS_CHILD: "true" },
});
assert.strictEqual(stripAnsi(stdout.toString()).trim(), "1");
});
}

View File

@@ -0,0 +1,293 @@
import { describe, expect, test } from "bun:test";
import { Worker as WebWorker } from "worker_threads";
// Tunables shared by all of the worker memory-leak tests below.
const CONFIG = {
WARMUP_ITERATIONS: 2, // batches run (and discarded) before the baseline snapshot
TEST_ITERATIONS: 10, // measured batches per test
BATCH_SIZE: 5, // workers created per batch
MEMORY_THRESHOLD_MB: 20, // max allowed RSS growth over baseline before failing
GC_SETTLE_TIME: 50, // ms to wait after each GC pass
TEST_TIMEOUT_MS: 15000, // per-test timeout passed as the third test() argument
};
// One process.memoryUsage() reading, with every field converted to whole MB.
interface MemorySnapshot {
rss: number; // resident set size
heapUsed: number;
heapTotal: number;
external: number;
}
// Capture the process's current memory usage, rounded to whole megabytes.
function takeMemorySnapshot(): { rss: number; heapUsed: number; heapTotal: number; external: number } {
  const toMB = (bytes: number): number => Math.round(bytes / 1024 / 1024);
  const usage = process.memoryUsage();
  return {
    rss: toMB(usage.rss),
    heapUsed: toMB(usage.heapUsed),
    heapTotal: toMB(usage.heapTotal),
    external: toMB(usage.external),
  };
}
// Run two forced GC passes, pausing after each so deferred finalization can
// drain before the next memory snapshot is taken.
async function forceGCAndSettle(): Promise<void> {
  let remaining = 2;
  while (remaining > 0) {
    Bun.gc(true);
    await Bun.sleep(CONFIG.GC_SETTLE_TIME);
    remaining -= 1;
  }
}
// Print a before/after memory comparison for one test phase, and warn loudly
// when RSS grew by more than 50MB.
function logMemoryDiff(
  before: { rss: number; heapUsed: number; heapTotal: number; external: number },
  after: { rss: number; heapUsed: number; heapTotal: number; external: number },
  label: string,
) {
  const format = (from: number, to: number): string => {
    const diff = to - from;
    return `${from}MB -> ${to}MB (${diff >= 0 ? "+" : ""}${diff}MB)`;
  };
  const rssDiff = after.rss - before.rss;
  console.log(`${label}:`, {
    rss: format(before.rss, after.rss),
    heap: format(before.heapUsed, after.heapUsed),
  });
  if (rssDiff > 50) {
    console.warn(`⚠️ Large memory increase detected: +${rssDiff}MB RSS`);
  }
}
// Race `promise` against a deadline; rejects with a descriptive error after
// `ms` milliseconds. The timer is cleared once the race settles — previously
// it was never cleared, so a fast promise still left a live timer keeping the
// event loop busy for the full `ms`.
async function withTimeout<T>(promise: Promise<T>, ms: number, description: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${description} timed out after ${ms}ms`)), ms);
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    clearTimeout(timer);
  }
}
// Spawn `batchSize` workers running `workerCode` (eval'd), wait for each
// worker's first message, then terminate every worker — even when one fails —
// and let the GC settle before returning.
//
// Two robustness fixes: a worker "error" event now rejects immediately
// instead of waiting out the 5s watchdog, and termination happens in a
// `finally` so an early failure no longer leaks the workers already spawned
// in this batch.
async function runWorkerBatch(workerCode: string, batchSize: number = CONFIG.BATCH_SIZE): Promise<void> {
  const workers: WebWorker[] = [];
  try {
    for (let i = 0; i < batchSize; i++) {
      const worker = new WebWorker(workerCode, { eval: true });
      workers.push(worker);
      await new Promise<void>((resolve, reject) => {
        const timeout = setTimeout(() => {
          worker.removeAllListeners();
          reject(new Error(`Worker ${i} failed to respond within timeout`));
        }, 5000);
        worker.once("error", err => {
          clearTimeout(timeout);
          reject(err);
        });
        worker.once("message", msg => {
          clearTimeout(timeout);
          if (msg.error) {
            reject(new Error(msg.error));
          } else {
            resolve();
          }
        });
      });
    }
  } finally {
    // Terminate everything spawned so far so a failed batch cannot leak workers.
    await Promise.all(workers.map(worker => worker.terminate()));
  }
  await forceGCAndSettle();
}
describe("Worker Memory Leak Tests", () => {
// Baseline leak check: repeatedly create and terminate trivial workers and
// require RSS growth over the warmed-up baseline to stay under the threshold.
test(
"workers should not leak memory with basic create/terminate cycles",
async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
parentPort.postMessage('ready');
`;
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`Warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
// Periodic progress logging so a hang is diagnosable from CI output.
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`Test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Basic create/terminate test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
// Leak check for HTTP activity: each worker fetches from a local server once,
// then the batch is torn down; RSS growth over baseline must stay bounded.
test(
"workers with HTTP activity should not leak memory",
async () => {
// `using` disposes the server automatically when this test scope exits.
using server = Bun.serve({
port: 0,
fetch() {
return new Response("OK");
},
});
// NOTE: ${server.port} below is interpolated HERE, in the parent, when the
// worker source string is built.
const workerCode = `
const { parentPort } = require('worker_threads');
async function doWork() {
try {
const response = await fetch('http://localhost:${server.port}');
await response.text();
parentPort.postMessage('done');
} catch (err) {
parentPort.postMessage({ error: err.message });
}
}
doWork();
`;
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} HTTP warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`HTTP warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("HTTP baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} HTTP test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`HTTP test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "HTTP activity test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
// Leak check for message passing: each worker streams ten ~1KB payloads back
// to the parent before the batch is terminated.
test(
"workers with message passing should not leak memory",
async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
parentPort.on('message', (msg) => {
if (msg === 'start') {
for (let j = 0; j < 10; j++) {
parentPort.postMessage({ count: j, data: 'x'.repeat(1000) });
}
parentPort.postMessage('done');
}
});
`;
// Local batch helper: unlike runWorkerBatch, the parent must prompt each
// worker with 'start' and wait for its 'done' reply.
async function runMessagePassingBatch(): Promise<void> {
const workers: WebWorker[] = [];
for (let i = 0; i < CONFIG.BATCH_SIZE; i++) {
const worker = new WebWorker(workerCode, { eval: true });
workers.push(worker);
await new Promise<void>(resolve => {
worker.on("message", msg => {
if (msg === "done") {
resolve();
}
});
worker.postMessage("start");
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
}
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} message passing warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runMessagePassingBatch();
console.log(`Message passing warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Message passing baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} message passing test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runMessagePassingBatch();
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(
`Message passing test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`,
);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Message passing test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
// Leak check for timers: each worker registers long-lived setTimeout and
// setInterval timers that are still pending when the worker is terminated;
// termination must reclaim them.
test(
"workers with timers should not leak memory",
async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
const timers = [];
for (let i = 0; i < 5; i++) {
timers.push(setTimeout(() => {}, 10000));
timers.push(setInterval(() => {}, 1000));
}
parentPort.postMessage('ready');
`;
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} timer warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`Timer warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Timer baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} timer test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`Timer test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Timer cleanup test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
});

View File

@@ -94,7 +94,7 @@ describe("web worker", () => {
};
});
test("worker-env", done => {
test("worker-env without a lot of properties", done => {
const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, {
env: {
// Verify that we use putDirectMayBeIndex instead of putDirect
@@ -342,12 +342,12 @@ describe("worker_threads", () => {
const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
smol: true,
});
await Bun.sleep(200);
const code = await worker.terminate();
expect(code).toBe(2);
const [exitCode] = await once(worker, "exit");
expect<number | undefined>(await worker.terminate()).toBe(undefined);
expect<number | undefined>(exitCode).toBe(2);
});
test.todo("worker terminating forcefully properly interrupts", async () => {
test("worker terminating forcefully properly interrupts", async () => {
const worker = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {});
await new Promise<void>(done => {
worker.on("message", () => done());

View File

@@ -74,6 +74,7 @@
"pg-gateway": "0.3.0-beta.4",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"piscina": "5.0.0",
"postgres": "3.3.5",
"prisma": "5.1.1",
"prompts": "2.4.2",