Compare commits

...

212 Commits

Author SHA1 Message Date
Alistair Smith
cfb379737c Merge branch 'main' of github.com:oven-sh/bun into ali/piscina-message-port-webkit 2025-07-02 09:26:43 -07:00
alii
58644c6bb0 bun run clang-format 2025-06-19 13:17:47 +00:00
Alistair Smith
ffa2ccb236 experiment 2025-06-19 14:13:28 +01:00
alii
0b9435ddf4 bun run clang-format 2025-06-17 01:16:32 +00:00
Alistair Smith
b2fa37add0 hm 2025-06-16 17:33:30 -07:00
Alistair Smith
b7ba06152c . 2025-06-16 17:01:22 -07:00
Alistair Smith
39f8e378bd don't wait here 2025-06-16 16:52:01 -07:00
Alistair Smith
562790bcd9 Close 2025-06-16 16:51:20 -07:00
Alistair Smith
b267eb01c2 make postTaskOnMainThreadAndWait 2025-06-16 16:42:53 -07:00
Alistair Smith
ffba9b572f downstream MessagePortChannel.h 2025-06-16 15:26:30 -07:00
alii
2ae44bd778 bun run clang-format 2025-06-16 22:20:57 +00:00
Alistair Smith
bcb09c9d20 Merge branch 'ali/piscina-message-port-webkit' of github.com:oven-sh/bun into ali/piscina-message-port-webkit 2025-06-16 15:18:11 -07:00
Alistair Smith
6325d40050 indent + changes 2025-06-16 15:18:09 -07:00
alii
c1192a3607 bun run clang-format 2025-06-16 20:07:59 +00:00
Alistair Smith
4aac8e33b0 try use WTF::ensureOnMainThread 2025-06-13 21:16:15 -07:00
Alistair Smith
82f20a5375 changes 2025-06-13 20:56:13 -07:00
Alistair Smith
e256665836 this one 2025-06-13 20:08:31 -07:00
Alistair Smith
045fe31935 changes 2025-06-13 19:48:58 -07:00
Alistair Smith
e1df2bea9d import 2025-06-13 19:44:20 -07:00
Alistair Smith
72a6e19478 changes 2025-06-13 19:43:51 -07:00
Alistair Smith
130f106211 start 2025-06-13 19:33:49 -07:00
Alistair Smith
aa8be1f73c changes 2025-06-13 17:49:40 -07:00
Alistair Smith
6746aa4a12 helps wiht reprod 2025-06-13 17:49:08 -07:00
Alistair Smith
c4a5420cb0 piscina atomics test (panicky) 2025-06-13 16:06:27 -07:00
Alistair Smith
d650aa723c change 2025-06-12 15:06:09 -07:00
Alistair Smith
dbeb9cc7e4 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-12 14:16:21 -07:00
Alistair Smith
8bf3da7ed7 dont unref early 2025-06-11 16:48:14 -07:00
Alistair Smith
41ea1152ee comments 2025-06-11 16:41:39 -07:00
Alistair Smith
7654a5c851 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-11 16:41:31 -07:00
Alistair Smith
48491ca7a6 check exit is actually called before parent exits (ref issue?) 2025-06-11 16:39:07 -07:00
Alistair Smith
4c4eb6caf3 test now passes 2025-06-11 16:11:53 -07:00
Alistair Smith
db018fab9c let's see what ci thinks 2025-06-11 15:57:47 -07:00
Alistair Smith
0da86bba5a faster 2025-06-11 15:31:12 -07:00
Alistair Smith
be8ef90134 improve worker memory leak test ? 2025-06-11 15:31:08 -07:00
Alistair Smith
a8683d4d2a this does fix the process exit test 2025-06-11 15:26:17 -07:00
Alistair Smith
88d8b0f258 for now 2025-06-11 14:55:36 -07:00
Alistair Smith
344c097cd6 rm dupe test 2025-06-11 14:47:20 -07:00
Alistair Smith
e63712ac91 rm 2025-06-11 14:37:59 -07:00
Alistair Smith
8a132a96c1 process.exit() failure 2025-06-11 14:37:13 -07:00
Alistair Smith
6b9897e107 changes 2025-06-11 13:43:46 -07:00
Alistair Smith
69e1d46fae unclear if useful 2025-06-10 21:54:49 -07:00
Alistair Smith
7c8da8f982 experiment: try request termination immediately 2025-06-10 18:39:10 -07:00
Alistair Smith
274e8d8b50 we should not tick after process.exit() was called in a webworker 2025-06-10 18:25:39 -07:00
alii
408ac4efe8 bun run prettier 2025-06-10 20:00:44 +00:00
Alistair Smith
fcb6f6cf79 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-10 12:58:00 -07:00
Alistair Smith
4b36cb0b91 Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-06-09 13:51:58 -07:00
Alistair Smith
21304e842a Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-09 13:51:50 -07:00
Alistair Smith
5c43d4de9c Merge branch 'main' into ali/piscina 2025-06-08 16:27:34 -07:00
Alistair Smith
01d66e8053 non-nullable lifecycle_handle 2025-06-06 18:23:30 -07:00
Alistair Smith
87e82a1ab7 requestTermination instead of exiting immediately 2025-06-06 18:16:44 -07:00
Alistair Smith
19a2432e82 can→could (fixes a piscina test case) 2025-06-06 16:49:00 -07:00
Alistair Smith
8486cb922f Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-06 11:09:53 -07:00
Alistair Smith
3e2b64efc1 Revert "experiment: clear socket_async_http_abort_tracker?"
This reverts commit 74581471c0.
2025-06-05 16:03:01 -07:00
Alistair Smith
74581471c0 experiment: clear socket_async_http_abort_tracker? 2025-06-05 15:54:43 -07:00
Alistair Smith
e199369151 mv 2025-06-05 15:26:56 -07:00
Alistair Smith
dd0509c606 rm 2025-06-05 14:48:30 -07:00
Alistair Smith
861c03e266 dont miss this 2025-06-05 14:27:22 -07:00
Alistair Smith
64361b2929 node worker is async disposable 2025-06-05 14:18:50 -07:00
Alistair Smith
0c53b78c96 coverage in node to be sure 2025-06-05 13:52:41 -07:00
Alistair Smith
f6dc66925e fix tests (updated to match node behaviour) 2025-06-05 13:43:26 -07:00
Alistair Smith
64e9e1d978 plausible 2025-06-05 13:28:57 -07:00
Alistair Smith
e011d3dc8c Merge branch 'main' into ali/piscina 2025-06-05 12:55:57 -07:00
Alistair Smith
64f82a8c10 remove comment 2025-06-05 12:55:37 -07:00
Alistair Smith
656d1a3098 coverage (ran in both node & bun) 2025-06-05 12:54:27 -07:00
Alistair Smith
9166d43c5e clean up exit logic of worker_threads.Worker 2025-06-05 12:45:53 -07:00
Alistair Smith
75e8e86336 fix test in ci (?) 2025-06-04 19:59:44 -07:00
Alistair Smith
b1fdd29b38 Attempt for CI only 2025-06-04 18:55:04 -07:00
Alistair Smith
3c12e72de6 movie 2025-06-04 18:20:56 -07:00
Alistair Smith
c1b9878607 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-04 17:46:00 -07:00
Alistair Smith
635ff1afe1 hm 2025-06-04 17:33:20 -07:00
Alistair Smith
32976f9136 change 2025-06-04 16:23:56 -07:00
Alistair Smith
fde3eb6d84 further reduce diff 2025-06-04 16:00:53 -07:00
Alistair Smith
e88e2b73dc postImmediateCppTask was unused 2025-06-04 15:56:39 -07:00
Alistair Smith
afee33a37c ignore vibe tools rule in future 2025-06-04 15:41:40 -07:00
Alistair Smith
e7579aa4ac remove vibe tools since not everybody will have it installed 2025-06-04 15:41:11 -07:00
Alistair Smith
c32784fcb9 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-04 15:27:27 -07:00
Alistair Smith
b2593bad58 revert some 2025-06-04 15:11:35 -07:00
Alistair Smith
3c8f1a6cb1 not async (unrelated to test failure) 2025-06-04 15:08:49 -07:00
Alistair Smith
7c4df2543b do the fetch(unrelated to test failure) 2025-06-04 15:06:14 -07:00
Alistair Smith
ca499d43dd lifecycleHandle_ might be gone already 2025-06-04 14:14:30 -07:00
Alistair Smith
d04ebc8029 script execution context is already gone at this point 2025-06-04 13:55:12 -07:00
Alistair Smith
3a0fcab5d6 extract terminate test for node compat work 2025-06-04 13:32:22 -07:00
Alistair Smith
4e4bb0a2b7 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-04 13:03:16 -07:00
Alistair Smith
95e7fdcb36 fwd declare WebWorkerLifecycleHandle 2025-06-04 13:03:13 -07:00
Alistair Smith
d96cb24e0d make nullable for cyclic lifecycle handle 2025-06-04 12:49:51 -07:00
Alistair Smith
20f376b50e revert preloads free 2025-06-04 12:43:05 -07:00
Alistair Smith
e601a5a405 rm exports 2025-06-04 12:32:57 -07:00
Alistair Smith
dfab7869a9 set ref only if not gone 2025-06-04 12:32:26 -07:00
Alistair Smith
622a643f2b rm 2025-06-04 12:29:27 -07:00
Alistair Smith
8f5acf3591 whoops 2025-06-04 10:20:38 -07:00
Alistair Smith
e965f113d6 whoops 2025-06-04 10:19:08 -07:00
alii
51c83890c0 bun run clang-format 2025-06-04 09:48:46 +00:00
Alistair Smith
8ce468a727 Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-06-04 02:45:52 -07:00
Alistair Smith
505d1be9bf seems to work 2025-06-04 02:45:24 -07:00
Alistair Smith
2ff973df24 seems to work 2025-06-04 02:18:39 -07:00
Alistair Smith
6049fee6d4 fix 2025-06-04 02:07:43 -07:00
Alistair Smith
76503c8b04 changes 2025-06-04 02:01:12 -07:00
Alistair Smith
559f79443d cjange 2025-06-04 00:27:46 -07:00
Alistair Smith
ba43ff4159 whoops 2025-06-04 00:15:40 -07:00
Alistair Smith
50e8d6cf03 WebWorkerLifecycleHandle 2025-06-04 00:14:39 -07:00
Alistair Smith
3168501f37 invert ownership 2025-06-03 23:40:36 -07:00
Alistair Smith
2e5737f506 . 2025-06-03 22:11:30 -07:00
alii
53f719ef34 bun run clang-format 2025-06-04 04:27:09 +00:00
Alistair Smith
0a4ca0f036 freeWithoutDeinit 2025-06-03 21:24:15 -07:00
Alistair Smith
89f1925f83 not much 2025-06-03 20:56:42 -07:00
Alistair Smith
42023a34d1 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-03 18:52:59 -07:00
Alistair Smith
915a82a27a push 2025-06-03 18:50:57 -07:00
alii
f6a0d58bde bun run clang-format 2025-06-03 20:57:22 +00:00
Alistair Smith
a2cb442429 uaf 2025-06-03 13:54:12 -07:00
Alistair Smith
e194911a1d hm 2025-06-03 13:49:39 -07:00
Alistair Smith
fea76395a7 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-03 13:43:56 -07:00
Alistair Smith
d613409de0 addressing some pr feedback 2025-06-03 13:41:44 -07:00
Alistair Smith
ed46108ff2 uaf 2025-06-03 13:09:16 -07:00
Alistair Smith
0e957c28d8 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-03 12:34:38 -07:00
Alistair Smith
e1ac3373cd Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-02 22:32:47 -07:00
alii
9085bbcaab bun run clang-format 2025-06-03 04:27:04 +00:00
Alistair Smith
6df3ff2776 use a coutnter 2025-06-02 21:24:12 -07:00
Alistair Smith
0e101a87cd Bun__queueImmediateCppTask again 2025-06-02 21:17:15 -07:00
Alistair Smith
bda588ad30 start removing currentTickNr behaviour 2025-06-02 19:25:50 -07:00
Alistair Smith
d5d81d9728 remove immediate_cpp_tasks 2025-06-02 19:25:21 -07:00
Alistair Smith
f3267a2734 nitpick @heimskr 2025-06-02 17:04:47 -07:00
Alistair Smith
71ca5ef648 use Buffer.alloc 2025-06-02 17:01:59 -07:00
Alistair Smith
74f510616d remove immediate cpp task 2025-06-02 16:44:02 -07:00
Alistair Smith
1b6b081e08 Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-06-02 16:20:41 -07:00
Alistair Smith
db03da9fb9 attempt: dont process messages if suspended/not entangled 2025-06-02 16:20:37 -07:00
Ben Grant
fe8dfee059 try more retries for suspicious tests 2025-06-02 16:15:43 -07:00
Alistair Smith
5768d6705e ignore .astro dist/build folder in fixture 2025-06-02 15:59:01 -07:00
Alistair Smith
ce175ac95e dont use spawnSync (broken behaviour) 2025-06-02 15:55:05 -07:00
Alistair Smith
a04dd16092 wait 2025-06-02 15:50:53 -07:00
Alistair Smith
82d3de493d tick immediate tasks to process MessagePort postMessage stuff 2025-06-02 14:53:56 -07:00
alii
2d0ccf26c8 bun run prettier 2025-06-02 21:43:18 +00:00
Alistair Smith
f1a2cd04bd Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-02 14:38:42 -07:00
Alistair Smith
03a77a8e9d use assert here so we can test in node 2025-06-02 14:34:56 -07:00
Alistair Smith
87bac43baf ignore 2025-06-02 14:24:42 -07:00
Alistair Smith
6dc55d1124 config 2025-06-02 14:19:30 -07:00
Alistair Smith
0313f7cfe3 fix 2025-06-02 14:10:47 -07:00
Alistair Smith
3c633842da @eastlondoner 2025-06-02 13:22:40 -07:00
Alistair Smith
012e73b95a irrelevant 2025-06-02 12:53:07 -07:00
Alistair Smith
dba909c235 avoid assertion error 2025-06-02 12:43:24 -07:00
Alistair Smith
96ab3a0909 couple of tests 2025-06-02 12:11:18 -07:00
Alistair Smith
5a79317782 merge issue 2025-06-02 11:36:31 -07:00
Alistair Smith
e01e416d13 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-06-02 11:31:09 -07:00
alii
cad8540dab bun run clang-format 2025-05-31 02:02:39 +00:00
Alistair Smith
304714d636 Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-05-30 18:59:57 -07:00
Alistair Smith
3d0672c744 Merge branch 'main' into ali/piscina 2025-05-30 18:59:24 -07:00
Alistair Smith
bed24f3a51 use RELEASE_ASSERT (https://github.com/oven-sh/bun/pull/19940#discussion_r2116938040) 2025-05-30 18:55:03 -07:00
Alistair Smith
693947673e revert rule 2025-05-30 18:41:49 -07:00
alii
397227e59c bun run clang-format 2025-05-31 01:28:50 +00:00
Alistair Smith
e86ce16afb Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-05-30 18:26:03 -07:00
Alistair Smith
5846c43f90 address some comments 2025-05-30 18:26:01 -07:00
Alistair Smith
01b7d9c293 Update src/bun.js/event_loop.zig
Co-authored-by: 190n <ben@bun.sh>
2025-05-30 18:21:29 -07:00
Alistair Smith
dca7569615 put _compile on actual Module prototype 2025-05-30 17:58:41 -07:00
Alistair Smith
0ce858e605 clean 2025-05-30 15:25:33 -07:00
Alistair Smith
b81e640c82 Merge branch 'main' into ali/piscina 2025-05-30 15:20:36 -07:00
alii
fd8addebdc bun run clang-format 2025-05-30 22:17:47 +00:00
Alistair Smith
437a19691c changes 2025-05-30 15:14:50 -07:00
Alistair Smith
c663ccd83b 1 2025-05-30 15:09:56 -07:00
Alistair Smith
53958f369d fields 2025-05-30 14:52:59 -07:00
Alistair Smith
d7a517cdfc Enhance EventLoop to support immediate C++ tasks. Added immediate_cpp_tasks and next_immediate_cpp_tasks for handling C++ queued tasks, ensuring consistent execution behavior. Updated tickImmediateTasks to log and run C++ tasks appropriately. 2025-05-30 14:26:20 -07:00
Alistair Smith
684e597460 Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-05-30 13:55:27 -07:00
Alistair Smith
1fcd442373 drain 2025-05-30 13:55:09 -07:00
alii
964f1cd177 bun run clang-format 2025-05-30 20:50:36 +00:00
Alistair Smith
3dce9aeabd xchanges 2025-05-30 13:42:32 -07:00
Alistair Smith
031ad7adc6 chages 2025-05-30 13:27:48 -07:00
Alistair Smith
3493d31e47 rule 2025-05-30 11:55:20 -07:00
alii
bdc7d047ed bun run prettier 2025-05-30 18:41:44 +00:00
Alistair Smith
622b290908 Merge branch 'ali/piscina' of github.com:oven-sh/bun into ali/piscina 2025-05-30 11:39:01 -07:00
Alistair Smith
f9a563f0c4 move 2025-05-30 11:38:58 -07:00
Alistair Smith
337270b80b Update .vscode/settings.json
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
2025-05-30 00:33:50 -07:00
Alistair Smith
f11e925756 worker lifecycle port test 2025-05-30 00:00:13 -07:00
Alistair Smith
c34af38f18 notifyNeedTermination() 2025-05-29 18:12:21 -07:00
alii
390abc7687 bun run clang-format 2025-05-30 00:26:48 +00:00
Alistair Smith
cdd959def7 revert 2025-05-29 17:24:20 -07:00
Alistair Smith
816f805c3e and here 2025-05-29 16:34:17 -07:00
Alistair Smith
773216ed2e fix piscina@5 exactly 2025-05-29 16:33:52 -07:00
Alistair Smith
42ae6e6c7e comment 2025-05-29 16:18:52 -07:00
Alistair Smith
6cbd258201 dont drain microtasks queue during spawnSync() 2025-05-29 16:14:05 -07:00
Alistair Smith
689376ea80 unnecessary 2025-05-29 15:37:40 -07:00
alii
cd6fd2cfd0 bun run prettier 2025-05-29 18:50:14 +00:00
Alistair Smith
4804b316b8 delegate ref to subprocess 2025-05-29 11:49:13 -07:00
Alistair Smith
df7c1b3221 Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-05-29 11:48:00 -07:00
Alistair Smith
ab3af5bfd3 better stub child_process channel 2025-05-29 11:45:18 -07:00
Alistair Smith
84e42a0580 always dispatch close 2025-05-29 10:37:17 -07:00
alii
11efc56a07 bun run clang-format 2025-05-28 21:27:30 +00:00
Alistair Smith
ce24d4ea39 rm/clean 2025-05-28 14:24:40 -07:00
Alistair Smith
4ab50bf065 rm 2025-05-28 14:15:51 -07:00
Alistair Smith
97cf7903be Merge branch 'main' of github.com:oven-sh/bun into ali/piscina 2025-05-28 13:21:03 -07:00
Alistair Smith
eec22bfec3 . 2025-05-28 13:01:11 -07:00
Alistair Smith
c9df308835 pass identifier not ptr 2025-05-28 11:30:46 -07:00
Alistair Smith
aa0d8b3baf use .fetch_or() 2025-05-28 00:33:50 -07:00
Alistair Smith
bd2bdfd17b Merge branch 'main' into ali/piscina 2025-05-28 00:19:36 -07:00
Alistair Smith
c11dac53dc only close MessagePort if message listeners 2025-05-28 00:19:07 -07:00
Alistair Smith
a4ee48503d rm 2025-05-27 20:29:36 -07:00
Alistair Smith
e1e44df206 rm 2025-05-27 20:28:53 -07:00
Alistair Smith
cc452f3b4f rm 2025-05-27 20:27:09 -07:00
Alistair Smith
d529390128 fix timeouts? 2025-05-27 20:21:25 -07:00
alii
34089a2af0 bun run clang-format 2025-05-28 03:10:54 +00:00
Alistair Smith
7e571f9dfb some logs 2025-05-27 18:55:01 -07:00
Alistair Smith
23b813db53 move 2025-05-27 18:36:20 -07:00
Alistair Smith
0ac77fee2b msg 2025-05-27 17:20:51 -07:00
Alistair Smith
c0a0fe3e22 plenty of debug logs 2025-05-27 16:19:38 -07:00
Alistair Smith
d37841f2b1 we should test p.destroy() 2025-05-27 16:19:27 -07:00
Alistair Smith
a2c6e613f3 this back 2025-05-27 15:48:07 -07:00
Alistair Smith
940fa4fd4e rm 2025-05-27 15:45:34 -07:00
Alistair Smith
ed8d84158b testing pass? 2025-05-27 15:15:01 -07:00
alii
962ef477da bun run clang-format 2025-05-27 21:59:41 +00:00
Alistair Smith
ef203ce9c7 changes 2025-05-27 14:55:26 -07:00
Alistair Smith
f86e962f25 be clear about async operation order 2025-05-27 13:48:37 -07:00
Alistair Smith
1b4fc47fd9 10s is ok 2025-05-27 13:44:56 -07:00
Alistair Smith
a4ccaa5822 rm 2025-05-27 13:39:49 -07:00
Alistair Smith
aa7f696df0 notifyPortClosed 2025-05-27 13:35:57 -07:00
Alistair Smith
5578f1ec1e simples piscina third party suite 2025-05-27 12:35:01 -07:00
54 changed files with 1625 additions and 217 deletions

.gitignore vendored
View File

@@ -1,3 +1,7 @@
.cursor/rules/vibe-tools.mdc
vibe-tools.config.json
.repomix-output.txt
repomix.config.json
.DS_Store
.env
.envrc
@@ -183,4 +187,4 @@ codegen-for-zig-team.tar.gz
*.sock
scratch*.{js,ts,tsx,cjs,mjs}
*.bun-build
*.bun-build

View File

@@ -885,6 +885,7 @@ if(NOT WIN32)
-Wno-unused-function
-Wno-c++23-lambda-attributes
-Wno-nullability-completeness
-Wmisleading-indentation
-Werror
)
else()

View File

@@ -2,7 +2,7 @@ export {};
declare global {
namespace NodeJS {
interface ProcessEnv extends Bun.Env, ImportMetaEnv {}
interface ProcessEnv extends Bun.Env {}
interface Process {
readonly version: string;

View File

@@ -334,7 +334,7 @@ async function runTests() {
const okResults = [];
const flakyResults = [];
const failedResults = [];
const maxAttempts = 1 + (parseInt(options["retries"]) || 0);
const defaultMaxAttempts = 1 + (parseInt(options["retries"]) || 0);
/**
* @param {string} title
@@ -342,11 +342,23 @@ async function runTests() {
* @returns {Promise<TestResult>}
*/
const runTest = async (title, fn) => {
// suspicious tests are run a minimum number of times, larger than the normal retry count, even
// if they pass on the first attempt. we are giving them N chances to fail instead of N chances
// to pass.
const suspiciousTests = [
"test-worker-arraybuffer-zerofill.js",
"worker_destruction.test.ts",
"worker.test.ts",
"test-worker-message-port-transfer-terminate.js",
"worker-lifecycle-message-port.test.ts",
];
const suspicious = suspiciousTests.some(name => title.includes(name));
const maxAttempts = suspicious ? 50 : defaultMaxAttempts;
const index = ++i;
let result, failure, flaky;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (attempt > 1) {
if (attempt > 1 && !suspicious) {
await new Promise(resolve => setTimeout(resolve, 5000 + Math.random() * 10_000));
}
@@ -364,7 +376,11 @@ async function runTests() {
} else {
okResults.push(result);
}
break;
if (suspicious) {
continue;
} else {
break;
}
}
const color = attempt >= maxAttempts ? "red" : "yellow";
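The runner hunk above gives a hard-coded list of "suspicious" worker tests a fixed budget of 50 attempts and keeps re-running them even after a pass, so an intermittent failure still has many chances to surface ("N chances to fail instead of N chances to pass"). A minimal sketch of that control flow, written in C++ for consistency with the rest of this change; runOnce and runWithRetries are hypothetical stand-ins for the runner's JavaScript, and the between-attempt backoff sleep is omitted:

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for a single test invocation: returns true on pass.
static bool runOnce(const std::string& title) { (void)title; return true; }

struct Attempt { bool passed; };

// Suspicious tests get a larger, fixed attempt budget and are run to
// exhaustion even after a pass; normal tests stop at the first pass.
std::vector<Attempt> runWithRetries(const std::string& title,
                                    const std::vector<std::string>& suspiciousTests,
                                    int defaultMaxAttempts) {
    bool suspicious = false;
    for (const auto& name : suspiciousTests) {
        if (title.find(name) != std::string::npos) { suspicious = true; break; }
    }

    const int maxAttempts = suspicious ? 50 : defaultMaxAttempts;
    std::vector<Attempt> results;
    for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
        bool passed = runOnce(title);
        results.push_back({ passed });
        if (passed && !suspicious)
            break; // a normal test is done as soon as it passes
        // a suspicious test falls through and keeps running every attempt
    }
    return results;
}
```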

View File

@@ -2631,6 +2631,10 @@ pub fn spawnMaybeSync(
defer {
jsc_vm.uwsLoop().internal_loop_data.jsc_vm = old_vm;
}
jsc_vm.eventLoop().is_inside_spawn_sync = true;
defer jsc_vm.eventLoop().is_inside_spawn_sync = false;
while (subprocess.hasPendingActivityNonThreadsafe()) {
if (subprocess.stdin == .buffer) {
subprocess.stdin.buffer.watch();

View File

@@ -57,7 +57,7 @@ static const DOMException::Description descriptions[] = {
{ "QuotaExceededError"_s, "The quota has been exceeded."_s, 22 },
{ "TimeoutError"_s, "The operation timed out."_s, 23 },
{ "InvalidNodeTypeError"_s, "The supplied node is incorrect or has an incorrect ancestor for this operation."_s, 24 },
{ "DataCloneError"_s, "The object can not be cloned."_s, 25 },
{ "DataCloneError"_s, "The object could not be cloned."_s, 25 }, // TODO: This should include an inspection of the object (e.g. "DOMException [DataCloneError]: hello () {} could not be cloned.")
{ "EncodingError"_s, "The encoding operation (either encoded or decoding) failed."_s, 0 },
{ "NotReadableError"_s, "The I/O read operation failed."_s, 0 },
{ "UnknownError"_s, "The operation failed for an unknown transient reason (e.g. out of memory)."_s, 0 },

View File

@@ -8,6 +8,7 @@
#include "BunClientData.h"
#include "EventLoopTask.h"
#include "BunBroadcastChannelRegistry.h"
#include <wtf/threads/BinarySemaphore.h>
#include <wtf/LazyRef.h>
extern "C" void Bun__startLoop(us_loop_t* loop);
@@ -215,10 +216,40 @@ bool ScriptExecutionContext::ensureOnMainThread(Function<void(ScriptExecutionCon
return false;
}
if (WTF::isMainThread()) {
task(*context);
return true;
}
context->postTaskConcurrently(WTFMove(task));
return true;
}
bool ScriptExecutionContext::ensureOnMainThreadAndWait(Function<void(ScriptExecutionContext&)>&& task)
{
auto* context = ScriptExecutionContext::getMainThreadScriptExecutionContext();
if (!context) {
return false;
}
if (WTF::isMainThread()) {
task(*context);
return true;
}
BinarySemaphore semaphore;
context->postTaskConcurrently(
[task = WTFMove(task), &semaphore](ScriptExecutionContext& context) {
task(context);
semaphore.signal();
});
semaphore.wait();
return true;
}
ScriptExecutionContext* ScriptExecutionContext::getMainThreadScriptExecutionContext()
{
Locker locker { allScriptExecutionContextsMapLock };
@@ -366,23 +397,23 @@ ScriptExecutionContext* executionContext(JSC::JSGlobalObject* globalObject)
void ScriptExecutionContext::postTaskConcurrently(Function<void(ScriptExecutionContext&)>&& lambda)
{
auto* task = new EventLoopTask(WTFMove(lambda));
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskConcurrently(task);
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskConcurrently(task);
}
// Executes the task on context's thread asynchronously.
void ScriptExecutionContext::postTask(Function<void(ScriptExecutionContext&)>&& lambda)
{
auto* task = new EventLoopTask(WTFMove(lambda));
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
}
// Executes the task on context's thread asynchronously.
void ScriptExecutionContext::postTask(EventLoopTask* task)
{
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
}
// Executes the task on context's thread asynchronously.
void ScriptExecutionContext::postTaskOnTimeout(EventLoopTask* task, Seconds timeout)
{
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskOnTimeout(task, static_cast<int>(timeout.milliseconds()));
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskOnTimeout(task, static_cast<int>(timeout.milliseconds()));
}
// Executes the task on context's thread asynchronously.
void ScriptExecutionContext::postTaskOnTimeout(Function<void(ScriptExecutionContext&)>&& lambda, Seconds timeout)
@@ -391,6 +422,19 @@ void ScriptExecutionContext::postTaskOnTimeout(Function<void(ScriptExecutionCont
postTaskOnTimeout(task, timeout);
}
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
void ScriptExecutionContext::queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda)
{
auto* task = new EventLoopTask(WTFMove(lambda));
queueImmediateCppTask(task);
}
void ScriptExecutionContext::queueImmediateCppTask(EventLoopTask* task)
{
Bun__queueImmediateCppTask(m_globalObject, task);
}
// Zig bindings
extern "C" ScriptExecutionContextIdentifier ScriptExecutionContextIdentifier__forGlobalObject(JSC::JSGlobalObject* globalObject)
{
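
The new ScriptExecutionContext::ensureOnMainThreadAndWait posts the task to the main-thread context and then blocks the calling thread on a WTF::BinarySemaphore until the task has executed. A self-contained sketch of that post-and-wait handshake, using std::binary_semaphore (C++20) and a plain locked queue in place of WTF::BinarySemaphore and postTaskConcurrently; the "main thread" loop is illustrative, not Bun's event loop:

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <semaphore>
#include <thread>

std::mutex queueLock;
std::queue<std::function<void()>> mainThreadTasks; // stand-in for postTaskConcurrently

// Post a task to the "main thread" and block until it has run, mirroring
// the BinarySemaphore handshake in ensureOnMainThreadAndWait.
void postAndWait(std::function<void()> task) {
    std::binary_semaphore done { 0 };
    {
        std::lock_guard<std::mutex> lock(queueLock);
        mainThreadTasks.push([&] {
            task();         // runs on the main thread
            done.release(); // then wakes the waiting thread
        });
    }
    done.acquire(); // the caller blocks here until the task signals
}

int main() {
    std::thread worker([] {
        postAndWait([] { /* work that must happen on the main thread */ });
    });
    for (;;) { // illustrative "main thread": drain one task, then stop
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> lock(queueLock);
            if (mainThreadTasks.empty())
                continue;
            task = std::move(mainThreadTasks.front());
            mainThreadTasks.pop();
        }
        task();
        break;
    }
    worker.join();
}
```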

View File

@@ -108,6 +108,7 @@ public:
WEBCORE_EXPORT static bool postTaskTo(ScriptExecutionContextIdentifier identifier, Function<void(ScriptExecutionContext&)>&& task);
WEBCORE_EXPORT static bool ensureOnContextThread(ScriptExecutionContextIdentifier, Function<void(ScriptExecutionContext&)>&& task);
WEBCORE_EXPORT static bool ensureOnMainThread(Function<void(ScriptExecutionContext&)>&& task);
WEBCORE_EXPORT static bool ensureOnMainThreadAndWait(Function<void(ScriptExecutionContext&)>&& task);
WEBCORE_EXPORT JSC::JSGlobalObject* globalObject();
@@ -135,6 +136,9 @@ public:
// Executes the task on context's thread asynchronously.
void postTaskOnTimeout(Function<void(ScriptExecutionContext&)>&& lambda, Seconds timeout);
void queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda);
void queueImmediateCppTask(EventLoopTask* task);
template<typename... Arguments>
void postCrossThreadTask(Arguments&&... arguments)
{
@@ -157,6 +161,14 @@ public:
static ScriptExecutionContext* getMainThreadScriptExecutionContext();
bool canSendMessage()
{
static constexpr size_t maxMessagesPerTick = 1000;
return m_messagesSentThisTick < maxMessagesPerTick;
}
void incrementMessageCount() { m_messagesSentThisTick++; }
void resetMessageCount() { m_messagesSentThisTick = 0; }
private:
JSC::VM* m_vm = nullptr;
JSC::JSGlobalObject* m_globalObject = nullptr;
@@ -169,6 +181,7 @@ private:
LazyRef<ScriptExecutionContext, BunBroadcastChannelRegistry> m_broadcastChannelRegistry;
bool m_willProcessMessageWithMessagePortsSoon { false };
size_t m_messagesSentThisTick { 0 };
us_socket_context_t* webSocketContextSSL();
us_socket_context_t* webSocketContextNoSSL();

View File

@@ -3928,6 +3928,7 @@ extern "C" void JSGlobalObject__clearTerminationException(JSC::JSGlobalObject* g
}
extern "C" void Bun__queueTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
extern "C" void Bun__queueTaskWithTimeout(JSC::JSGlobalObject*, WebCore::EventLoopTask* task, int timeout);
extern "C" void Bun__queueTaskConcurrently(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
extern "C" void Bun__performTask(Zig::GlobalObject* globalObject, WebCore::EventLoopTask* task)
@@ -3952,6 +3953,11 @@ void GlobalObject::queueTask(WebCore::EventLoopTask* task)
Bun__queueTask(this, task);
}
void GlobalObject::queueImmediateCppTask(WebCore::EventLoopTask* task)
{
Bun__queueImmediateCppTask(this, task);
}
void GlobalObject::queueTaskOnTimeout(WebCore::EventLoopTask* task, int timeout)
{
Bun__queueTaskWithTimeout(this, task, timeout);

View File

@@ -170,6 +170,7 @@ public:
void queueTask(WebCore::EventLoopTask* task);
void queueTaskOnTimeout(WebCore::EventLoopTask* task, int timeout);
void queueTaskConcurrently(WebCore::EventLoopTask* task);
void queueImmediateCppTask(WebCore::EventLoopTask* task);
JSDOMStructureMap& structures() WTF_REQUIRES_LOCK(m_gcLock) { return m_structures; }
JSDOMStructureMap& structures(NoLockingNecessaryTag) WTF_IGNORES_THREAD_SAFETY_ANALYSIS

View File

@@ -44,6 +44,7 @@
#include <wtf/TZoneMallocInlines.h>
#include <wtf/Lock.h>
#include <wtf/Scope.h>
#include <wtf/threads/BinarySemaphore.h>
extern "C" void Bun__eventLoop__incrementRefConcurrently(void* bunVM, int delta);
@@ -238,11 +239,79 @@ void MessagePort::close()
return;
m_isDetached = true;
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
ScriptExecutionContext::ensureOnMainThread(
[this](ScriptExecutionContext&) {
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
});
removeAllEventListeners();
}
void MessagePort::processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback)
{
auto& vm = context.vm();
auto* globalObject = defaultGlobalObject(context.globalObject());
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running) {
completionCallback();
return;
}
Vector<MessageWithMessagePorts> deferredMessages;
for (auto&& message : messages) {
if (!context.canSendMessage()) {
deferredMessages.append(WTFMove(message));
continue;
}
context.incrementMessageCount();
auto scope = DECLARE_CATCH_SCOPE(vm);
auto ports = MessagePort::entanglePorts(context, WTFMove(message.transferredPorts));
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
dispatchEvent(event.event);
if (scope.exception()) [[unlikely]] {
RELEASE_ASSERT(vm.hasPendingTerminationException());
return;
}
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running) {
globalObject->drainMicrotasks();
}
}
if (!deferredMessages.isEmpty()) {
auto* globalObject = defaultGlobalObject(context.globalObject());
auto scriptExecutionStatus = Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject);
if (scriptExecutionStatus != ScriptExecutionStatus::Running || context.isJSExecutionForbidden()) {
completionCallback();
return;
}
// remaining messages should be processed on the next tick via the immediate cpp task queue
auto contextIdentifier = context.identifier();
context.queueImmediateCppTask(
[protectedThis = Ref { *this }, contextIdentifier, deferred = WTFMove(deferredMessages), completionCallback = WTFMove(completionCallback)](ScriptExecutionContext& ctx) mutable {
if (auto* validContext = ScriptExecutionContext::getScriptExecutionContext(contextIdentifier)) {
if (validContext == &ctx && !validContext->activeDOMObjectsAreSuspended() && protectedThis->isEntangled()) {
// then reset for next tick
ctx.resetMessageCount();
protectedThis->processMessages(ctx, WTFMove(deferred), WTFMove(completionCallback));
return;
}
}
// context was destroyed or conditions not met, just complete
completionCallback();
});
} else {
completionCallback();
}
}
void MessagePort::dispatchMessages()
{
// Messages for contexts that are not fully active get dispatched too, but JSAbstractEventListener::handleEvent() doesn't call handlers for these.
@@ -253,60 +322,49 @@ void MessagePort::dispatchMessages()
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
return;
auto messagesTakenHandler = [this, protectedThis = Ref { *this }](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
auto scopeExit = makeScopeExit(WTFMove(completionCallback));
auto executionContextIdentifier = scriptExecutionContext()->identifier();
// LOG(MessagePorts, "MessagePort %s (%p) dispatching %zu messages", m_identifier.logString().utf8().data(), this, messages.size());
auto messagesTakenHandler = [this, protectedThis = Ref { *this }, executionContextIdentifier](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
RefPtr<ScriptExecutionContext> context = ScriptExecutionContext::getScriptExecutionContext(executionContextIdentifier);
RefPtr<ScriptExecutionContext> context = scriptExecutionContext();
if (!context || !context->globalObject())
if (!context || !context->globalObject()) {
completionCallback();
return;
}
ASSERT(context->isContextThread());
auto* globalObject = defaultGlobalObject(context->globalObject());
Ref vm = globalObject->vm();
auto scope = DECLARE_CATCH_SCOPE(vm);
for (auto& message : messages) {
// close() in Worker onmessage handler should prevent next message from dispatching.
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running)
return;
auto ports = MessagePort::entanglePorts(*context, WTFMove(message.transferredPorts));
if (scope.exception()) [[unlikely]] {
// Currently, we assume that the only way we can get here is if we have a termination.
RELEASE_ASSERT(vm->hasPendingTerminationException());
return;
}
// Per specification, each MessagePort object has a task source called the port message queue.
// queueTaskKeepingObjectAlive(context, *this, TaskSource::PostedMessageQueue, [this, event = WTFMove(event)] {
// dispatchEvent(event.event);
// });
ScriptExecutionContext::postTaskTo(context->identifier(), [protectedThis = Ref { *this }, ports = WTFMove(ports), message = WTFMove(message)](ScriptExecutionContext& context) mutable {
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
protectedThis->dispatchEvent(event.event);
});
}
processMessages(*context, WTFMove(messages), WTFMove(completionCallback));
};
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(m_identifier, WTFMove(messagesTakenHandler));
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(executionContextIdentifier, m_identifier, WTFMove(messagesTakenHandler));
}
// synchronous for node:worker_threads.receiveMessageOnPort
JSValue MessagePort::tryTakeMessage(JSGlobalObject* lexicalGlobalObject)
{
auto* context = scriptExecutionContext();
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
return jsUndefined();
std::optional<MessageWithMessagePorts> messageWithPorts = MessagePortChannelProvider::fromContext(*context).tryTakeMessageForPort(m_identifier);
std::optional<MessageWithMessagePorts> result;
BinarySemaphore semaphore;
if (!messageWithPorts)
auto callback = [&](std::optional<MessageWithMessagePorts>&& messageWithPorts) {
result = WTFMove(messageWithPorts);
semaphore.signal();
};
ScriptExecutionContext::ensureOnMainThread([identifier = m_identifier, callback](ScriptExecutionContext& context) mutable {
MessagePortChannelProvider::fromContext(context).tryTakeMessageForPort(identifier, WTFMove(callback));
});
semaphore.wait();
if (!result)
return jsUndefined();
auto ports = MessagePort::entanglePorts(*context, WTFMove(messageWithPorts->transferredPorts));
auto message = messageWithPorts->message.releaseNonNull();
auto ports = MessagePort::entanglePorts(*context, WTFMove(result->transferredPorts));
auto message = result->message.releaseNonNull();
return message->deserialize(*lexicalGlobalObject, lexicalGlobalObject, WTFMove(ports), SerializationErrorMode::NonThrowing);
}
@@ -427,8 +485,7 @@ Ref<MessagePort> MessagePort::entangle(ScriptExecutionContext& context, Transfer
bool MessagePort::addEventListener(const AtomString& eventType, Ref<EventListener>&& listener, const AddEventListenerOptions& options)
{
if (eventType == eventNames().messageEvent) {
if (listener->isAttribute())
start();
start();
m_hasMessageEventListener = true;
}
return EventTarget::addEventListener(eventType, WTFMove(listener), options);
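MessagePort::processMessages now dispatches only as many queued messages as the context's per-tick budget allows (canSendMessage, capped at 1000 per tick) and pushes the remainder onto the immediate C++ task queue, where the counter is reset and the method re-enters itself on the next turn. A rough sketch of that defer-and-reenter loop; the callback queue, the tiny budget, and the names stand in for queueImmediateCppTask and the real limit:

```cpp
#include <cstddef>
#include <cstdio>
#include <functional>
#include <queue>
#include <vector>

struct Message { int payload; };

// Stand-ins for the per-tick budget and the immediate C++ task queue.
static constexpr size_t maxMessagesPerTick = 4; // tiny so the sketch defers something
static size_t messagesSentThisTick = 0;
static std::queue<std::function<void()>> immediateTasks;

void processMessages(std::vector<Message>&& messages, std::function<void()> completion) {
    std::vector<Message> deferred;
    for (auto& message : messages) {
        if (messagesSentThisTick >= maxMessagesPerTick) {
            deferred.push_back(message); // over budget: keep it for the next turn
            continue;
        }
        ++messagesSentThisTick;
        std::printf("dispatch %d\n", message.payload); // would dispatchEvent(MessageEvent) here
    }
    if (deferred.empty()) {
        completion();
        return;
    }
    // The remainder is handled on the immediate task queue of the next turn.
    immediateTasks.push([deferred = std::move(deferred),
                         completion = std::move(completion)]() mutable {
        messagesSentThisTick = 0; // resetMessageCount() before re-entering
        processMessages(std::move(deferred), std::move(completion));
    });
}

int main() {
    std::vector<Message> batch;
    for (int i = 0; i < 10; ++i)
        batch.push_back({ i });
    processMessages(std::move(batch), [] { std::printf("batch complete\n"); });
    while (!immediateTasks.empty()) { // drive the fake immediate-task queue
        auto task = std::move(immediateTasks.front());
        immediateTasks.pop();
        task();
    }
}
```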

View File

@@ -27,6 +27,7 @@
#pragma once
#include "ActiveDOMObject.h"
#include "ContextDestructionObserver.h"
#include "EventTarget.h"
#include "ExceptionOr.h"
#include "MessagePortChannel.h"
@@ -144,9 +145,9 @@ private:
MessagePortIdentifier m_remoteIdentifier;
mutable std::atomic<unsigned> m_refCount { 1 };
void processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback);
bool m_hasRef { false };
uint32_t m_messageEventCount { 0 };
static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
};

View File

@@ -42,6 +42,8 @@ MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, con
: m_ports { port1, port2 }
, m_registry(registry)
{
ASSERT(isMainThread());
relaxAdoptionRequirement();
m_processes[0] = port1.processIdentifier;
@@ -49,16 +51,22 @@ MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, con
m_processes[1] = port2.processIdentifier;
m_entangledToProcessProtectors[1] = this;
m_registry.messagePortChannelCreated(*this);
checkedRegistry()->messagePortChannelCreated(*this);
}
MessagePortChannel::~MessagePortChannel()
{
m_registry.messagePortChannelDestroyed(*this);
checkedRegistry()->messagePortChannelDestroyed(*this);
}
CheckedRef<MessagePortChannelRegistry> MessagePortChannel::checkedRegistry() const
{
return m_registry;
}
std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
return m_processes[i];
@@ -66,11 +74,15 @@ std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const Messag
bool MessagePortChannel::includesPort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
return m_ports[0] == port || m_ports[1] == port;
}
void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& port, ProcessIdentifier process)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
@@ -84,6 +96,8 @@ void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& po
void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
// LOG(MessagePorts, "MessagePortChannel %s (%p) disentangling port %s", logString().utf8().data(), this, port.logString().utf8().data());
ASSERT(port == m_ports[0] || port == m_ports[1]);
@@ -100,16 +114,14 @@ void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
void MessagePortChannel::closePort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
m_processes[i] = std::nullopt;
m_isClosed[i] = true;
// This set of steps is to guarantee that the lock is unlocked before the
// last ref to this object is released.
Ref protectedThis { *this };
m_pendingMessages[i].clear();
m_pendingMessagePortTransfers[i].clear();
m_pendingMessageProtectors[i] = nullptr;
@@ -118,6 +130,8 @@ void MessagePortChannel::closePort(const MessagePortIdentifier& port)
bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
{
ASSERT(isMainThread());
ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]);
size_t i = remoteTarget == m_ports[0] ? 0 : 1;
@@ -135,6 +149,8 @@ bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message,
void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& callback)
{
ASSERT(isMainThread());
// LOG(MessagePorts, "MessagePortChannel %p taking all messages for port %s", this, port.logString().utf8().data());
ASSERT(port == m_ports[0] || port == m_ports[1]);
@@ -145,7 +161,7 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por
return;
}
ASSERT(m_pendingMessageProtectors[i]);
ASSERT(m_pendingMessageProtectors[i] == this);
Vector<MessageWithMessagePorts> result;
result.swap(m_pendingMessages[i]);
@@ -154,24 +170,36 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por
// LOG(MessagePorts, "There are %zu messages to take for port %s. Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight);
callback(WTFMove(result), [this, port, protectedThis = WTFMove(m_pendingMessageProtectors[i])] {
auto size = result.size();
callback(WTFMove(result), [size, port, protectedThis = WTFMove(m_pendingMessageProtectors[i])] {
UNUSED_PARAM(port);
--m_messageBatchesInFlight;
// LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, logString().utf8().data(), size, port.logString().utf8().data(), m_messageBatchesInFlight);
UNUSED_PARAM(size);
--(protectedThis->m_messageBatchesInFlight);
// LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, protectedThis->logString().utf8().data(), size, port.logString().utf8().data(), protectedThis->m_messageBatchesInFlight);
});
}
std::optional<MessageWithMessagePorts> MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port)
bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
{
ASSERT(isMainThread());
return m_messageBatchesInFlight || !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty();
}
void MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
if (m_pendingMessages[i].isEmpty())
return std::nullopt;
if (m_pendingMessages[i].isEmpty()) {
callback(std::nullopt);
return;
}
auto message = m_pendingMessages[i].first();
m_pendingMessages[i].removeAt(0);
return WTFMove(message);
callback(WTFMove(message));
}
} // namespace WebCore
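tryTakeMessageForPort changes here from synchronously returning std::optional<MessageWithMessagePorts> to delivering its result through a CompletionHandler, which is what lets the provider answer from the main thread while node:worker_threads' receiveMessageOnPort waits on its own thread. A small before/after sketch of that shape change; FakeQueue and Message are stand-ins for the channel's pending-message storage:

```cpp
#include <cstdio>
#include <deque>
#include <functional>
#include <optional>
#include <string>

struct Message { std::string body; };

struct FakeQueue {
    std::deque<Message> pending;

    // Old shape: synchronous, returns the message (or nothing) directly.
    std::optional<Message> tryTakeSync() {
        if (pending.empty())
            return std::nullopt;
        Message m = std::move(pending.front());
        pending.pop_front();
        return m;
    }

    // New shape: the result goes through a completion handler, so the real
    // implementation can hop to the main thread before answering.
    void tryTake(std::function<void(std::optional<Message>&&)>&& callback) {
        if (pending.empty()) {
            callback(std::nullopt);
            return;
        }
        Message m = std::move(pending.front());
        pending.pop_front();
        callback(std::move(m));
    }
};

int main() {
    FakeQueue queue;
    queue.pending.push_back({ "hello" });
    queue.tryTake([](std::optional<Message>&& message) {
        std::printf("%s\n", message ? message->body.c_str() : "(no message)");
    });
}
```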

View File

@@ -30,9 +30,10 @@
#include "MessageWithMessagePorts.h"
#include "ProcessIdentifier.h"
#include <wtf/HashSet.h>
#include <wtf/RefCounted.h>
#include <wtf/text/WTFString.h>
#include <wtf/RefCountedAndCanMakeWeakPtr.h>
#include <wtf/WeakPtr.h>
#include <wtf/text/MakeString.h>
#include <wtf/text/WTFString.h>
namespace WebCore {
@@ -55,32 +56,31 @@ public:
bool postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier);
void tryTakeMessageForPort(const MessagePortIdentifier, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&);
WEBCORE_EXPORT bool hasAnyMessagesPendingOrInFlight() const;
uint64_t beingTransferredCount();
#if !LOG_DISABLED
String logString() const
{
return makeString(m_ports[0].logString(), ":"_s, m_ports[1].logString());
}
String logString() const { return makeString(m_ports[0].logString(), ':', m_ports[1].logString()); }
#endif
private:
MessagePortChannel(MessagePortChannelRegistry&, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2);
MessagePortIdentifier m_ports[2];
bool m_isClosed[2] { false, false };
std::optional<ProcessIdentifier> m_processes[2];
RefPtr<MessagePortChannel> m_entangledToProcessProtectors[2];
Vector<MessageWithMessagePorts> m_pendingMessages[2];
UncheckedKeyHashSet<RefPtr<MessagePortChannel>> m_pendingMessagePortTransfers[2];
RefPtr<MessagePortChannel> m_pendingMessageProtectors[2];
CheckedRef<MessagePortChannelRegistry> checkedRegistry() const;
std::array<MessagePortIdentifier, 2> m_ports;
std::array<bool, 2> m_isClosed { false, false };
std::array<std::optional<ProcessIdentifier>, 2> m_processes;
std::array<RefPtr<MessagePortChannel>, 2> m_entangledToProcessProtectors;
std::array<Vector<MessageWithMessagePorts>, 2> m_pendingMessages;
std::array<UncheckedKeyHashSet<RefPtr<MessagePortChannel>>, 2> m_pendingMessagePortTransfers;
std::array<RefPtr<MessagePortChannel>, 2> m_pendingMessageProtectors;
uint64_t m_messageBatchesInFlight { 0 };
MessagePortChannelRegistry& m_registry;
CheckedRef<MessagePortChannelRegistry> m_registry;
};
} // namespace WebCore
} // namespace WebCore

View File

@@ -24,13 +24,10 @@
*/
#include "config.h"
// #include "MessagePortChannelProvider.h"
// #include "Document.h"
#include "MessagePortChannelProvider.h"
#include "MessagePortChannelProviderImpl.h"
// #include "WorkerGlobalScope.h"
// #include "WorkletGlobalScope.h"
#include <wtf/MainThread.h>
#include "ScriptExecutionContext.h"
#include "BunWorkerGlobalScope.h"
namespace WebCore {

View File

@@ -26,8 +26,7 @@
#pragma once
#include "ProcessIdentifier.h"
#include "BunWorkerGlobalScope.h"
#include "MessageWithMessagePorts.h"
#include "ScriptExecutionContext.h"
#include <wtf/CompletionHandler.h>
#include <wtf/Vector.h>
@@ -37,7 +36,7 @@ class MessagePortChannelProvider;
namespace WTF {
template<typename T> struct IsDeprecatedWeakRefSmartPointerException;
template<> struct IsDeprecatedWeakRefSmartPointerException<WebCore::MessagePortChannelProvider> : std::true_type {};
template<> struct IsDeprecatedWeakRefSmartPointerException<WebCore::MessagePortChannelProvider> : std::true_type { };
}
namespace WebCore {
@@ -50,18 +49,19 @@ class MessagePortChannelProvider : public CanMakeWeakPtr<MessagePortChannelProvi
public:
static MessagePortChannelProvider& fromContext(ScriptExecutionContext&);
static MessagePortChannelProvider& singleton();
static void setSharedProvider(MessagePortChannelProvider&);
virtual ~MessagePortChannelProvider() {}
virtual ~MessagePortChannelProvider() { }
// Operations that WebProcesses perform
virtual void createNewMessagePortChannel(const MessagePortIdentifier& local, const MessagePortIdentifier& remote) = 0;
virtual void entangleLocalPortInThisProcessToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote) = 0;
virtual void messagePortDisentangled(const MessagePortIdentifier& local) = 0;
virtual void messagePortClosed(const MessagePortIdentifier& local) = 0;
virtual void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
virtual std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) = 0;
virtual void takeAllMessagesForPort(const ScriptExecutionContextIdentifier, const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
virtual void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) = 0;
virtual void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) = 0;
};
} // namespace WebCore
} // namespace WebCore

View File

@@ -22,14 +22,18 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "root.h"
#include "config.h"
#include "MessagePortChannelProviderImpl.h"
#include "MessagePort.h"
#include "ScriptExecutionContext.h"
#include "BunClientData.h"
#include <wtf/MainThread.h>
#include <wtf/RunLoop.h>
extern "C" void* Bun__getVM();
namespace WebCore {
MessagePortChannelProviderImpl::MessagePortChannelProviderImpl() = default;
@@ -41,44 +45,85 @@ MessagePortChannelProviderImpl::~MessagePortChannelProviderImpl()
void MessagePortChannelProviderImpl::createNewMessagePortChannel(const MessagePortIdentifier& local, const MessagePortIdentifier& remote)
{
m_registry.didCreateMessagePortChannel(local, remote);
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local, remote](ScriptExecutionContext& context) {
if (CheckedPtr registry = weakRegistry.get())
registry->didCreateMessagePortChannel(local, remote);
});
}
void MessagePortChannelProviderImpl::entangleLocalPortInThisProcessToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote)
{
m_registry.didEntangleLocalToRemote(local, remote, Process::identifier());
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local, remote](ScriptExecutionContext& context) {
if (CheckedPtr registry = weakRegistry.get())
registry->didEntangleLocalToRemote(local, remote, Process::identifier());
});
}
void MessagePortChannelProviderImpl::messagePortDisentangled(const MessagePortIdentifier& local)
{
m_registry.didDisentangleMessagePort(local);
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local](ScriptExecutionContext& context) {
if (CheckedPtr registry = weakRegistry.get())
registry->didDisentangleMessagePort(local);
});
}
void MessagePortChannelProviderImpl::messagePortClosed(const MessagePortIdentifier& local)
{
m_registry.didCloseMessagePort(local);
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local](ScriptExecutionContext& context) {
if (CheckedPtr registry = weakRegistry.get())
registry->didCloseMessagePort(local);
});
}
void MessagePortChannelProviderImpl::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
{
if (m_registry.didPostMessageToRemote(WTFMove(message), remoteTarget))
MessagePort::notifyMessageAvailable(remoteTarget);
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, message = WTFMove(message), remoteTarget](ScriptExecutionContext& context) mutable {
CheckedPtr registry = weakRegistry.get();
if (!registry)
return;
if (registry->didPostMessageToRemote(WTFMove(message), remoteTarget))
MessagePort::notifyMessageAvailable(remoteTarget);
});
}
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const ScriptExecutionContextIdentifier identifier, const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
{
// It is the responsibility of outerCallback to get itself to the appropriate thread (e.g. WebWorker thread)
auto callback = [outerCallback = WTFMove(outerCallback)](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& messageDeliveryCallback) mutable {
// ASSERT(isMainThread());
outerCallback(WTFMove(messages), WTFMove(messageDeliveryCallback));
};
if (WTF::isMainThread()) {
m_registry.takeAllMessagesForPort(port, WTFMove(outerCallback));
return;
}
m_registry.takeAllMessagesForPort(port, WTFMove(callback));
auto currentVM = Bun__getVM();
if (!currentVM) {
outerCallback({}, []() {}); // already destroyed
return;
}
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, port, outerCallback = WTFMove(outerCallback), identifier](ScriptExecutionContext& mainContext) mutable {
CheckedPtr registry = weakRegistry.get();
if (!registry) {
ScriptExecutionContext::ensureOnContextThread(identifier, [outerCallback = WTFMove(outerCallback)](ScriptExecutionContext&) mutable {
outerCallback({}, []() {});
});
return;
}
registry->takeAllMessagesForPort(port, [outerCallback = WTFMove(outerCallback), identifier](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionHandler) mutable {
ScriptExecutionContext::ensureOnContextThread(identifier, [outerCallback = WTFMove(outerCallback), messages = WTFMove(messages), completionHandler = WTFMove(completionHandler)](ScriptExecutionContext&) mutable {
auto wrappedCompletionHandler = [completionHandler = WTFMove(completionHandler)]() mutable {
ScriptExecutionContext::ensureOnMainThread([completionHandler = WTFMove(completionHandler)](ScriptExecutionContext&) mutable {
completionHandler();
});
};
outerCallback(WTFMove(messages), WTFMove(wrappedCompletionHandler));
});
});
});
}
std::optional<MessageWithMessagePorts> MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port)
void MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
{
return m_registry.tryTakeMessageForPort(port);
m_registry.tryTakeMessageForPort(port, WTFMove(callback));
}
} // namespace WebCore
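MessagePortChannelProviderImpl::takeAllMessagesForPort now makes a round trip: hop to the main thread to consult the registry, hop back to the requesting context's thread to hand over the batch, and wrap the completion handler so the "delivery finished" notification lands on the main thread again, where the in-flight counters live. A sketch of that three-hop pattern using two plain task queues in place of ensureOnMainThread and ensureOnContextThread:

```cpp
#include <cstdio>
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Illustrative queues standing in for the main thread and a worker context's
// thread; the real code uses ensureOnMainThread and ensureOnContextThread.
static std::queue<std::function<void()>> mainThreadQueue;
static std::queue<std::function<void()>> contextThreadQueue;

using Messages = std::vector<std::string>;
using TakenHandler = std::function<void(Messages&&, std::function<void()>)>;

void takeAllMessagesForPort(TakenHandler outerCallback) {
    // Hop 1: the registry may only be touched from the main thread.
    mainThreadQueue.push([outerCallback = std::move(outerCallback)]() mutable {
        Messages messages { "a", "b" }; // registry->takeAllMessagesForPort(...)
        // Hop 2: deliver the batch on the requesting context's thread.
        contextThreadQueue.push([outerCallback = std::move(outerCallback),
                                 messages = std::move(messages)]() mutable {
            auto completion = [] {
                // Hop 3: the delivery-finished notification returns to the
                // main thread, where the in-flight counter is decremented.
                mainThreadQueue.push([] { std::puts("main: batch acknowledged"); });
            };
            outerCallback(std::move(messages), completion);
        });
    });
}

int main() {
    takeAllMessagesForPort([](Messages&& messages, std::function<void()> done) {
        std::printf("context: received %zu messages\n", messages.size());
        done();
    });
    // Drive the two fake queues until everything has run.
    while (!mainThreadQueue.empty() || !contextThreadQueue.empty()) {
        if (!mainThreadQueue.empty()) {
            auto task = std::move(mainThreadQueue.front());
            mainThreadQueue.pop();
            task();
        }
        if (!contextThreadQueue.empty()) {
            auto task = std::move(contextThreadQueue.front());
            contextThreadQueue.pop();
            task();
        }
    }
}
```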

View File

@@ -27,7 +27,7 @@
#include "MessagePortChannelProvider.h"
#include "MessagePortChannelRegistry.h"
#include "MessageWithMessagePorts.h"
#include "ScriptExecutionContext.h"
namespace WebCore {
@@ -42,10 +42,10 @@ private:
void messagePortDisentangled(const MessagePortIdentifier& local) final;
void messagePortClosed(const MessagePortIdentifier& local) final;
void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) final;
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) final;
void takeAllMessagesForPort(const ScriptExecutionContextIdentifier identifier, const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) final;
MessagePortChannelRegistry m_registry;
};
} // namespace WebCore
} // namespace WebCore

View File

@@ -49,14 +49,14 @@ MessagePortChannelRegistry::~MessagePortChannelRegistry()
void MessagePortChannelRegistry::didCreateMessagePortChannel(const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
{
// LOG(MessagePorts, "Registry: Creating MessagePortChannel %p linking %s and %s", this, port1.logString().utf8().data(), port2.logString().utf8().data());
// ASSERT(isMainThread());
ASSERT(isMainThread());
MessagePortChannel::create(*this, port1, port2);
}
void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& channel)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
auto result = m_openChannels.add(channel.port1(), channel);
ASSERT_UNUSED(result, result.isNewEntry);
@@ -67,20 +67,27 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c
void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& channel)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
ASSERT(m_openChannels.get(channel.port1()) == &channel);
ASSERT(m_openChannels.get(channel.port2()) == &channel);
m_openChannels.remove(channel.port1());
m_openChannels.remove(channel.port2());
// auto* port1Channel = m_openChannels.get(channel.port1());
// if (port1Channel == &channel)
// m_openChannels.remove(channel.port1());
// auto* port2Channel = m_openChannels.get(channel.port2());
// if (port2Channel == &channel)
// m_openChannels.remove(channel.port2());
// LOG(MessagePorts, "Registry: After removing channel %s there are %u channels left in the registry:", channel.logString().utf8().data(), m_openChannels.size());
}
void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote, ProcessIdentifier process)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
// The channel might be gone if the remote side was closed.
RefPtr channel = m_openChannels.get(local);
@@ -94,7 +101,7 @@ void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdent
void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIdentifier& port)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
// The channel might be gone if the remote side was closed.
if (RefPtr channel = m_openChannels.get(port))
@@ -103,7 +110,7 @@ void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIden
void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier& port)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
// LOG(MessagePorts, "Registry: MessagePort %s closed in registry", port.logString().utf8().data());
@@ -124,7 +131,7 @@ void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier
bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
// LOG(MessagePorts, "Registry: Posting message to MessagePort %s in registry", remoteTarget.logString().utf8().data());
@@ -140,7 +147,7 @@ bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts&
void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& callback)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
// The channel might be gone if the remote side was closed.
RefPtr channel = m_openChannels.get(port);
@@ -152,23 +159,25 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif
channel->takeAllMessagesForPort(port, WTFMove(callback));
}
std::optional<MessageWithMessagePorts> MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port)
void MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
// LOG(MessagePorts, "Registry: Trying to take a message for MessagePort %s", port.logString().utf8().data());
// The channel might be gone if the remote side was closed.
auto* channel = m_openChannels.get(port);
if (!channel)
return std::nullopt;
if (!channel) {
callback(std::nullopt);
return;
}
return channel->tryTakeMessageForPort(port);
channel->tryTakeMessageForPort(port, WTFMove(callback));
}
MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(const MessagePortIdentifier& port)
{
// ASSERT(isMainThread());
ASSERT(isMainThread());
return m_openChannels.get(port);
}

View File

@@ -49,7 +49,7 @@ public:
WEBCORE_EXPORT void didCloseMessagePort(const MessagePortIdentifier& local);
WEBCORE_EXPORT bool didPostMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
WEBCORE_EXPORT void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
WEBCORE_EXPORT std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&);
WEBCORE_EXPORT void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&);
WEBCORE_EXPORT MessagePortChannel* existingChannelContainingPort(const MessagePortIdentifier&);

View File

@@ -69,8 +69,7 @@ namespace WebCore {
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
extern "C" void WebWorker__notifyNeedTermination(
void* worker);
extern "C" void WebWorkerLifecycleHandle__requestTermination(WebWorkerLifecycleHandle* worker);
static Lock allWorkersLock;
static HashMap<ScriptExecutionContextIdentifier, Worker*>& allWorkers() WTF_REQUIRES_LOCK(allWorkersLock)
@@ -109,7 +108,7 @@ Worker::Worker(ScriptExecutionContext& context, WorkerOptions&& options)
ASSERT_UNUSED(addResult, addResult.isNewEntry);
}
extern "C" bool WebWorker__updatePtr(void* worker, Worker* ptr);
extern "C" void* WebWorker__create(
extern "C" WebWorkerLifecycleHandle* WebWorkerLifecycleHandle__createWebWorker(
Worker* worker,
void* parent,
BunString name,
@@ -133,12 +132,12 @@ extern "C" void WebWorker__setRef(
void Worker::setKeepAlive(bool keepAlive)
{
WebWorker__setRef(impl_, keepAlive);
WebWorker__setRef(lifecycleHandle_, keepAlive);
}
bool Worker::updatePtr()
{
if (!WebWorker__updatePtr(impl_, this)) {
if (!WebWorker__updatePtr(lifecycleHandle_, this)) {
m_onlineClosingFlags = ClosingFlag;
m_terminationFlags.fetch_or(TerminatedFlag);
return false;
@@ -189,7 +188,7 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
return { reinterpret_cast<WTF::StringImpl**>(vec.begin()), vec.size() };
})
.value_or(std::span<WTF::StringImpl*> {});
void* impl = WebWorker__create(
WebWorkerLifecycleHandle* lifecycleHandle = WebWorkerLifecycleHandle__createWebWorker(
worker.ptr(),
bunVM(context.jsGlobalObject()),
nameStr,
@@ -212,11 +211,11 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
preloadModuleStrings.clear();
if (!impl) {
if (!lifecycleHandle) {
return Exception { TypeError, errorMessage.toWTFString(BunString::ZeroCopy) };
}
worker->impl_ = impl;
worker->lifecycleHandle_ = lifecycleHandle;
worker->m_workerCreationTime = MonotonicTime::now();
return worker;
@@ -228,6 +227,12 @@ Worker::~Worker()
Locker locker { allWorkersLock };
allWorkers().remove(m_clientIdentifier);
}
if (lifecycleHandle_) {
auto* impl = lifecycleHandle_;
lifecycleHandle_ = nullptr;
WebWorkerLifecycleHandle__requestTermination(impl);
}
// m_contextProxy.workerObjectDestroyed();
}
@@ -261,9 +266,11 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
void Worker::terminate()
{
// m_contextProxy.terminateWorkerGlobalScope();
m_terminationFlags.fetch_or(TerminateRequestedFlag);
WebWorker__notifyNeedTermination(impl_);
auto* impl = lifecycleHandle_;
lifecycleHandle_ = nullptr;
WebWorkerLifecycleHandle__requestTermination(impl);
}
// const char* Worker::activeDOMObjectName() const
@@ -468,6 +475,7 @@ void Worker::forEachWorker(const Function<Function<void(ScriptExecutionContext&)
extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* worker, int32_t exitCode)
{
worker->dispatchExit(exitCode);
// no longer referenced by Zig
worker->deref();

View File

@@ -50,6 +50,7 @@ class WorkerGlobalScopeProxy;
struct StructuredSerializeOptions;
struct WorkerOptions;
struct WebWorkerLifecycleHandle;
class Worker final : public ThreadSafeRefCounted<Worker>, public EventTargetWithInlineData, private ContextDestructionObserver {
WTF_MAKE_TZONE_ALLOCATED(Worker);
@@ -119,7 +120,7 @@ private:
// Tracks TerminateRequestedFlag and TerminatedFlag
std::atomic<uint8_t> m_terminationFlags { 0 };
const ScriptExecutionContextIdentifier m_clientIdentifier;
void* impl_ { nullptr };
WebWorkerLifecycleHandle* lifecycleHandle_ { nullptr };
};
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);

View File

@@ -11,8 +11,13 @@ tasks: Queue = undefined,
/// - immediate_tasks: tasks that will run on the current tick
///
/// Having two queues avoids infinite loops created by calling `setImmediate` in a `setImmediate` callback.
/// We also have immediate_cpp_tasks and next_immediate_cpp_tasks, which are
/// exactly the same thing except the tasks come from C++ code. The behaviour and theory
/// for executing them is the same. Call "globalObject->queueImmediateCppTask()" to queue a task from C++.
immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
next_immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
next_immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
global: *JSC.JSGlobalObject = undefined,
@@ -29,6 +34,13 @@ imminent_gc_timer: std.atomic.Value(?*Timer.WTFTimer) = .{ .raw = null },
signal_handler: if (Environment.isPosix) ?*PosixSignalHandle else void = if (Environment.isPosix) null,
// this exists because while we're inside a spawnSync call, some tasks can actually
// still complete, which leads to a case where module resolution can partially complete and
// some modules are only partially evaluated, which causes reference errors.
// TODO: A better fix here could be a second event loop so we can come off the main one
// while processing spawnSync, then resume back to here afterwards
is_inside_spawn_sync: bool = false,
pub const Debug = if (Environment.isDebug) struct {
is_inside_tick_queue: bool = false,
js_call_count_outside_tick_queue: usize = 0,
@@ -113,6 +125,11 @@ pub fn drainMicrotasksWithGlobal(this: *EventLoop, globalObject: *JSC.JSGlobalOb
scope.init(globalObject, @src());
defer scope.deinit();
// see is_inside_spawn_sync doc comment
if (this.is_inside_spawn_sync) {
return;
}
jsc_vm.releaseWeakRefs();
JSC__JSGlobalObject__drainMicrotasks(globalObject);
try scope.assertNoExceptionExceptTermination();
@@ -196,10 +213,19 @@ fn tickWithCount(this: *EventLoop, virtual_machine: *VirtualMachine) u32 {
pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) void {
var to_run_now = this.immediate_tasks;
var to_run_now_cpp = this.immediate_cpp_tasks;
this.immediate_tasks = this.next_immediate_tasks;
this.next_immediate_tasks = .{};
this.immediate_cpp_tasks = this.next_immediate_cpp_tasks;
this.next_immediate_cpp_tasks = .{};
for (to_run_now_cpp.items) |task| {
log("running immediate cpp task", .{});
task.run(virtual_machine.global);
}
var exception_thrown = false;
for (to_run_now.items) |task| {
exception_thrown = task.runImmediateTask(virtual_machine);
@@ -210,6 +236,22 @@ pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) vo
this.maybeDrainMicrotasks();
}
if (this.next_immediate_cpp_tasks.capacity > 0) {
// this would only occur if we were recursively running tickImmediateTasks.
@branchHint(.unlikely);
this.immediate_cpp_tasks.appendSlice(bun.default_allocator, this.next_immediate_cpp_tasks.items) catch bun.outOfMemory();
this.next_immediate_cpp_tasks.deinit(bun.default_allocator);
}
if (to_run_now_cpp.capacity > 1024 * 128) {
// once in a while, deinit the array to free up memory
to_run_now_cpp.clearAndFree(bun.default_allocator);
} else {
to_run_now_cpp.clearRetainingCapacity();
}
this.next_immediate_cpp_tasks = to_run_now_cpp;
if (this.next_immediate_tasks.capacity > 0) {
// this would only occur if we were recursively running tickImmediateTasks.
@branchHint(.unlikely);
@@ -521,6 +563,10 @@ pub fn enqueueTask(this: *EventLoop, task: Task) void {
this.tasks.writeItem(task) catch unreachable;
}
pub fn enqueueImmediateCppTask(this: *EventLoop, task: *CppTask) void {
this.immediate_cpp_tasks.append(bun.default_allocator, task) catch bun.outOfMemory();
}
pub fn enqueueImmediateTask(this: *EventLoop, task: *Timer.ImmediateObject) void {
this.immediate_tasks.append(bun.default_allocator, task) catch bun.outOfMemory();
}
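The user-visible contract of this queue is easiest to state from JavaScript: work queued as an immediate C++ task (which, per the tests added later in this diff, includes MessagePort message delivery) must not jump ahead of `process.nextTick` or the microtask queue. A minimal sketch of that ordering, written against plain `node:worker_threads` APIs:

```ts
import { MessageChannel } from "node:worker_threads";

const { port1, port2 } = new MessageChannel();
const order: string[] = [];

port2.on("message", () => {
  order.push("port message");
  // nextTick and microtask callbacks queued alongside the postMessage call
  // are expected to run before the message is delivered.
  console.log(order); // ["nextTick", "microtask", "port message"]
  port1.close();
  port2.close();
});
port2.start();

port1.postMessage("hello");
process.nextTick(() => order.push("nextTick"));
queueMicrotask(() => order.push("microtask"));
```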

View File

@@ -13,6 +13,7 @@
#include "JavaScriptCore/Completion.h"
#include "JavaScriptCore/JSNativeStdFunction.h"
#include "JSCommonJSExtensions.h"
#include "JSCommonJSModule.h"
#include "PathInlines.h"
#include "ZigGlobalObject.h"
@@ -713,6 +714,16 @@ static JSValue getModulePrototypeObject(VM& vm, JSObject* moduleObject)
setterRequireFunction),
0);
prototype->putDirectNativeFunction(
vm,
globalObject,
JSC::Identifier::fromString(vm, "_compile"_s),
2,
functionJSCommonJSModule_compile,
JSC::ImplementationVisibility::Public,
JSC::NoIntrinsic,
static_cast<unsigned>(JSC::PropertyAttribute::DontEnum));
return prototype;
}
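`_compile` here mirrors the hook Node's CommonJS loader exposes on `Module.prototype`; a hedged usage sketch (the module id and filename are only illustrative):

```ts
// Sketch: exercising the `_compile` method the hunk above adds to the
// CommonJS module prototype. It wraps and evaluates the source in the
// module's context, populating `module.exports`.
const { Module } = require("node:module");

const mod = new Module("demo");
// `_compile` is not part of the public typings, hence the cast.
(mod as any)._compile("module.exports = { answer: 21 * 2 };", "/tmp/demo.js");
console.log(mod.exports.answer); // 42
```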

View File

@@ -260,7 +260,7 @@ fn setCwd_(globalObject: *JSC.JSGlobalObject, to: *JSC.ZigString) bun.JSError!JS
// TODO(@190n) this may need to be noreturn
pub fn exit(globalObject: *JSC.JSGlobalObject, code: u8) callconv(.c) void {
var vm = globalObject.bunVM();
vm.exit_handler.exit_code = code;
vm.exit_handler.exit_code = code; // TODO(@alii): https://github.com/oven-sh/bun/pull/20213
if (vm.worker) |worker| {
// TODO(@190n) we may need to use requestTerminate or throwTerminationException
// instead to terminate the worker sooner

View File

@@ -78,6 +78,12 @@ pub export fn Bun__queueTask(global: *JSGlobalObject, task: *JSC.CppTask) void {
global.bunVM().eventLoop().enqueueTask(JSC.Task.init(task));
}
pub export fn Bun__queueImmediateCppTask(global: *JSGlobalObject, task: *JSC.CppTask) void {
JSC.markBinding(@src());
global.bunVM().eventLoop().enqueueImmediateCppTask(task);
}
pub export fn Bun__queueTaskWithTimeout(global: *JSGlobalObject, task: *JSC.CppTask, milliseconds: i32) void {
JSC.markBinding(@src());

View File

@@ -8,6 +8,10 @@ const JSValue = jsc.JSValue;
const Async = bun.Async;
const WTFStringImpl = @import("../string.zig").WTFStringImpl;
const WebWorker = @This();
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", destroy, .{});
pub const new = bun.TrivialNew(@This());
pub const ref = RefCount.ref;
pub const deref = RefCount.deref;
/// null when the worker hasn't started yet
vm: ?*jsc.VirtualMachine = null,
@@ -18,6 +22,9 @@ execution_context_id: u32 = 0,
parent_context_id: u32 = 0,
parent: *jsc.VirtualMachine,
ref_count: RefCount,
lifecycle_handle: *WebWorkerLifecycleHandle,
/// To be resolved on the Worker thread at startup, in spin().
unresolved_specifier: []const u8,
preloads: [][]const u8 = &.{},
@@ -70,15 +77,15 @@ pub fn setRequestedTerminate(this: *WebWorker) bool {
return this.requested_terminate.swap(true, .release);
}
export fn WebWorker__updatePtr(worker: *WebWorker, ptr: *anyopaque) bool {
worker.cpp_worker = ptr;
export fn WebWorker__updatePtr(handle: *WebWorkerLifecycleHandle, ptr: *anyopaque) bool {
handle.worker.?.cpp_worker = ptr;
var thread = std.Thread.spawn(
.{ .stack_size = bun.default_thread_stack_size },
startWithErrorHandling,
.{worker},
.{handle.worker.?},
) catch {
worker.deinit();
handle.worker.?.destroy();
return false;
};
thread.detach();
@@ -194,6 +201,7 @@ pub fn create(
execArgv_len: usize,
preload_modules_ptr: ?[*]bun.String,
preload_modules_len: usize,
lifecycle_handle: *WebWorkerLifecycleHandle,
) callconv(.c) ?*WebWorker {
jsc.markBinding(@src());
log("[{d}] WebWorker.create", .{this_context_id});
@@ -224,8 +232,9 @@ pub fn create(
}
}
var worker = bun.default_allocator.create(WebWorker) catch bun.outOfMemory();
worker.* = WebWorker{
const worker = WebWorker.new(.{
.lifecycle_handle = lifecycle_handle,
.ref_count = .init(),
.cpp_worker = cpp_worker,
.parent = parent,
.parent_context_id = parent_context_id,
@@ -245,10 +254,10 @@ pub fn create(
.argv = if (argv_ptr) |ptr| ptr[0..argv_len] else &.{},
.execArgv = if (inherit_execArgv) null else (if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else &.{}),
.preloads = preloads.items,
};
});
worker.parent_poll_ref.ref(parent);
worker.ref();
return worker;
}
@@ -264,6 +273,7 @@ pub fn startWithErrorHandling(
pub fn start(
this: *WebWorker,
) anyerror!void {
errdefer this.deref();
if (this.name.len > 0) {
Output.Source.configureNamedThread(this.name);
} else {
@@ -364,7 +374,10 @@ fn deinit(this: *WebWorker) void {
bun.default_allocator.free(preload);
}
bun.default_allocator.free(this.preloads);
bun.default_allocator.destroy(this);
}
fn destroy(this: *WebWorker) void {
bun.destroy(this);
}
fn flushLogs(this: *WebWorker) void {
@@ -388,7 +401,7 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
var buffered_writer_ = bun.MutableString.BufferedWriter{ .context = &array };
var buffered_writer = &buffered_writer_;
var worker = vm.worker orelse @panic("Assertion failure: no worker");
const worker = vm.worker orelse @panic("Assertion failure: no worker");
const writer = buffered_writer.writer();
const Writer = @TypeOf(writer);
@@ -421,9 +434,12 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
jsc.markBinding(@src());
WebWorker__dispatchError(globalObject, worker.cpp_worker, bun.String.createUTF8(array.slice()), error_instance);
if (vm.worker) |worker_| {
_ = worker.setRequestedTerminate();
worker.parent_poll_ref.unrefConcurrently(worker.parent);
worker_.exitAndDeinit();
// During unhandled rejection, we're already holding the API lock - now
// is the right time to set exit_called to true so that
// notifyNeedTermination uses vm.global.requestTermination() instead of
// vm.jsc.notifyNeedTermination(), which avoids VMTraps assertion failures
worker_.exit_called = true;
worker_.lifecycle_handle.requestTermination();
}
}
@@ -528,12 +544,13 @@ fn spin(this: *WebWorker) void {
}
/// This is worker.ref()/.unref() from JS (Caller thread)
pub fn setRef(this: *WebWorker, value: bool) callconv(.c) void {
if (this.hasRequestedTerminate()) {
return;
pub fn setRef(handle: *WebWorkerLifecycleHandle, value: bool) callconv(.c) void {
if (handle.worker) |worker| {
if (worker.hasRequestedTerminate()) {
return;
}
worker.setRefInternal(value);
}
this.setRefInternal(value);
}
pub fn setRefInternal(this: *WebWorker, value: bool) void {
@@ -551,10 +568,11 @@ pub fn exit(this: *WebWorker) void {
}
/// Request a terminate from any thread.
pub fn notifyNeedTermination(this: *WebWorker) callconv(.c) void {
pub fn notifyNeedTermination(this: *WebWorker) void {
if (this.status.load(.acquire) == .terminated) {
return;
}
if (this.setRequestedTerminate()) {
return;
}
@@ -562,11 +580,17 @@ pub fn notifyNeedTermination(this: *WebWorker) callconv(.c) void {
if (this.vm) |vm| {
vm.eventLoop().wakeup();
// TODO(@190n) notifyNeedTermination
}
// TODO(@190n) delete
this.setRefInternal(false);
if (this.exit_called) {
// For process.exit() called from JavaScript, use JSC's termination
// exception mechanism to interrupt ongoing JS execution
vm.global.requestTermination();
} else {
// For external terminate requests (e.g. worker.terminate() from the parent thread),
// use the VM traps system
vm.jsc.notifyNeedTermination();
}
}
}
/// This handles cleanup, emitting the "close" event, and deinit.
@@ -579,6 +603,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
log("[{d}] exitAndDeinit", .{this.execution_context_id});
const cpp_worker = this.cpp_worker;
var exit_code: i32 = 0;
var globalObject: ?*jsc.JSGlobalObject = null;
var vm_to_deinit: ?*jsc.VirtualMachine = null;
@@ -593,7 +618,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
vm_to_deinit = vm;
}
var arena = this.arena;
this.lifecycle_handle.onTermination();
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
if (loop) |loop_| {
loop_.internal_loop_data.jsc_vm = null;
@@ -606,18 +631,117 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
}
bun.deleteAllPoolsForThreadExit();
if (arena) |*arena_| {
arena_.deinit();
}
this.deref();
bun.exitThread();
}
pub export fn WebWorkerLifecycleHandle__requestTermination(handle: ?*WebWorkerLifecycleHandle) void {
if (handle) |h| {
h.requestTermination();
}
}
/// Manages the complex timing surrounding web worker creation and destruction
const WebWorkerLifecycleHandle = struct {
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", WebWorkerLifecycleHandle.deinit, .{});
pub const ref = WebWorkerLifecycleHandle.RefCount.ref;
pub const deref = WebWorkerLifecycleHandle.RefCount.deref;
mutex: bun.Mutex = .{},
worker: ?*WebWorker = null,
requested_terminate: std.atomic.Value(bool) = .init(false),
ref_count: WebWorkerLifecycleHandle.RefCount,
pub const new = bun.TrivialNew(WebWorkerLifecycleHandle);
pub fn createWebWorker(
cpp_worker: *void,
parent: *jsc.VirtualMachine,
name_str: bun.String,
specifier_str: bun.String,
error_message: *bun.String,
parent_context_id: u32,
this_context_id: u32,
mini: bool,
default_unref: bool,
eval_mode: bool,
argv_ptr: ?[*]WTFStringImpl,
argv_len: usize,
inherit_execArgv: bool,
execArgv_ptr: ?[*]WTFStringImpl,
execArgv_len: usize,
preload_modules_ptr: ?[*]bun.String,
preload_modules_len: usize,
) callconv(.c) *WebWorkerLifecycleHandle {
const handle = WebWorkerLifecycleHandle.new(.{
.worker = null,
.ref_count = .init(),
});
const worker = create(cpp_worker, parent, name_str, specifier_str, error_message, parent_context_id, this_context_id, mini, default_unref, eval_mode, argv_ptr, argv_len, inherit_execArgv, execArgv_ptr, execArgv_len, preload_modules_ptr, preload_modules_len, handle);
handle.worker = worker;
return handle;
}
pub fn deinit(this: *WebWorkerLifecycleHandle) void {
bun.destroy(this);
}
pub fn requestTermination(self: *WebWorkerLifecycleHandle) void {
if (self.requested_terminate.load(.acquire)) {
return;
}
self.ref();
self.mutex.lock();
if (self.requested_terminate.swap(true, .monotonic)) {
self.mutex.unlock();
self.deref();
return;
}
if (self.worker) |worker| {
self.worker = null;
worker.notifyNeedTermination();
self.mutex.unlock();
worker.deref();
} else {
self.mutex.unlock();
// Let the reference counting system handle deinitialization
self.deref();
}
self.deref();
}
pub fn onTermination(self: *WebWorkerLifecycleHandle) void {
self.ref();
self.mutex.lock();
if (self.requested_terminate.swap(false, .acquire)) {
// we already requested to terminate, therefore this handle has
// already been consumed on the other thread and we are able to free
// it. Let the reference counting system handle deinitialization.
self.mutex.unlock();
self.deref();
return;
}
self.worker = null;
self.mutex.unlock();
self.deref();
}
};
comptime {
@export(&create, .{ .name = "WebWorker__create" });
@export(&notifyNeedTermination, .{ .name = "WebWorker__notifyNeedTermination" });
@export(&WebWorkerLifecycleHandle.createWebWorker, .{ .name = "WebWorkerLifecycleHandle__createWebWorker" });
@export(&setRef, .{ .name = "WebWorker__setRef" });
_ = WebWorker__updatePtr;
}
const assert = bun.assert;
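From JavaScript, the two branches above correspond to `process.exit()` inside the worker versus `worker.terminate()` from the parent. A minimal sketch of both paths (exit codes follow the tests added later in this diff):

```ts
import { Worker } from "node:worker_threads";

// Path 1: the worker calls process.exit(2) itself (the exit_called branch,
// which goes through requestTermination()); the parent sees code 2.
const selfExiting = new Worker("process.exit(2)", { eval: true });
selfExiting.on("exit", code => console.log("self exit:", code)); // 2

// Path 2: the parent asks for termination (the VM traps branch);
// terminate() resolves with the exit code once the thread is gone.
const spinning = new Worker("setInterval(() => {}, 1000)", { eval: true });
spinning.once("online", async () => {
  console.log("terminated with:", await spinning.terminate()); // 1
});
```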

View File

@@ -357,8 +357,9 @@ JSC_DEFINE_HOST_FUNCTION(functionStartDirectStream, (JSC::JSGlobalObject * lexic
templ += `
void ${className}::ref() {
if (!m_sinkPtr)
return;
if (!m_sinkPtr) {
return;
}
m_refCount++;
if (m_refCount == 1) {
@@ -367,14 +368,14 @@ JSC_DEFINE_HOST_FUNCTION(functionStartDirectStream, (JSC::JSGlobalObject * lexic
}
void ${className}::unref() {
if (!m_sinkPtr)
return;
if (!m_sinkPtr) {
return;
}
m_refCount = std::max(0, m_refCount - 1);
if (!m_refCount)
{
m_refCount = std::max(0, m_refCount - 1);
if (!m_refCount) {
${name}__updateRef(m_sinkPtr, false);
}
}
}
JSC_DEFINE_HOST_FUNCTION(${name}__ref, (JSC::JSGlobalObject * lexicalGlobalObject, JSC::CallFrame *callFrame))

View File

@@ -1,3 +1,5 @@
import type { Pipe as NodeStreamPipe } from "node:stream";
// Hardcoded module "node:child_process"
const EventEmitter = require("node:events");
const OsModule = require("node:os");
@@ -1035,13 +1037,15 @@ class ChildProcess extends EventEmitter {
#handle;
#closesNeeded = 1;
#closesGot = 0;
disconnect: undefined | (() => void);
signalCode = null;
exitCode = null;
spawnfile;
spawnargs;
pid;
channel;
channel: NodeStreamPipe | undefined;
killed = false;
[Symbol.dispose]() {
@@ -1332,7 +1336,7 @@ class ChildProcess extends EventEmitter {
if (has_ipc) {
this.send = this.#send;
this.disconnect = this.#disconnect;
this.channel = new Control();
this.channel = new SubprocessChannel(this);
Object.defineProperty(this, "_channel", {
get() {
return this.channel;
@@ -1624,9 +1628,46 @@ function abortChildProcess(child, killSignal, reason) {
}
}
class Control extends EventEmitter {
constructor() {
class SubprocessChannel extends EventEmitter implements NodeStreamPipe {
#subprocess: ChildProcess;
#closed: boolean = false;
public constructor(childProcess: ChildProcess) {
super();
this.#subprocess = childProcess;
}
public close(): void {
if (this.#closed) return;
this.#closed = true;
if (this.#subprocess.connected) {
this.#subprocess.disconnect?.();
}
process.nextTick(() => {
this.emit("close");
});
}
public hasRef(): boolean {
if (this.#closed) return false;
const handle = this.#subprocess[kHandle];
if (!handle) return false;
return this.#subprocess.connected;
}
public ref(): void {
if (this.#closed) return;
this.#subprocess.ref();
}
public unref(): void {
if (this.#closed) return;
this.#subprocess.unref();
}
}
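A hedged sketch of how this surfaces to user code: `fork()` always sets up IPC, so `child.channel` is an instance of the class above (the `./ipc-child.js` path is a placeholder):

```ts
import { fork } from "node:child_process";

const child = fork("./ipc-child.js"); // placeholder script that uses process.send()

// hasRef() reports whether the live IPC channel is still connected.
console.log(child.channel?.hasRef());

// ref()/unref() forward to the subprocess, controlling whether the channel
// keeps the parent's event loop alive.
child.channel?.unref();
child.channel?.ref();

// close() disconnects (if still connected) and emits "close" on the next tick.
child.channel?.close();
```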

View File

@@ -1,4 +1,5 @@
// import type { Readable, Writable } from "node:stream";
// import type { WorkerOptions } from "node:worker_threads";
declare const self: typeof globalThis;
type WebWorker = InstanceType<typeof globalThis.Worker>;
@@ -225,15 +226,19 @@ function moveMessagePortToContext() {
const unsupportedOptions = ["stdin", "stdout", "stderr", "trackedUnmanagedFds", "resourceLimits"];
class Worker extends EventEmitter {
class Worker extends EventEmitter implements AsyncDisposable {
#worker: WebWorker;
#performance;
// this is used by terminate();
// it is either the exit code if the worker already exited, a promise resolving to the exit code, or undefined if we haven't called .terminate() yet
#onExitPromise: Promise<number> | number | undefined = undefined;
#onExitResolvers = Promise.withResolvers<number | void>();
#urlToRevoke = "";
async [Symbol.asyncDispose]() {
await this.terminate();
}
constructor(filename: string, options: NodeWorkerOptions = {}) {
super();
for (const key of unsupportedOptions) {
@@ -319,7 +324,13 @@ class Worker extends EventEmitter {
});
}
terminate(callback: unknown) {
terminate(callback?: unknown): Promise<number | void> {
// threadId = -1 signifies the worker was closed already. Node returns PromiseResolve() in this case
// https://github.com/nodejs/node/blob/61601089f7f2f0e5e7abe8240f198585f585704c/lib/internal/worker.js#L390
if (this.threadId === -1) {
return Promise.resolve<void>();
}
if (typeof callback === "function") {
process.emitWarning(
"Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.",
@@ -329,12 +340,8 @@ class Worker extends EventEmitter {
this.#worker.addEventListener("close", event => callback(null, event.code), { once: true });
}
const onExitPromise = this.#onExitPromise;
if (onExitPromise) {
return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise);
}
const { resolve, promise } = this.#onExitResolvers;
const { resolve, promise } = Promise.withResolvers();
this.#worker.addEventListener(
"close",
event => {
@@ -342,12 +349,13 @@ class Worker extends EventEmitter {
},
{ once: true },
);
this.#worker.terminate();
return (this.#onExitPromise = promise);
return promise;
}
postMessage(...args: [any, any]) {
postMessage(...args: Parameters<Bun.Worker["postMessage"]>) {
return this.#worker.postMessage.$apply(this.#worker, args);
}
@@ -356,8 +364,8 @@ class Worker extends EventEmitter {
return stringPromise.then(s => new HeapSnapshotStream(s));
}
#onClose(e) {
this.#onExitPromise = e.code;
#onClose(e: Event & { code: number }) {
this.#onExitResolvers.resolve(e.code);
this.emit("exit", e.code);
}

View File

@@ -62,6 +62,7 @@
"pg-gateway": "0.3.0-beta.4",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"piscina": "5.0.0",
"postgres": "3.3.5",
"prisma": "5.1.1",
"prompts": "2.4.2",
@@ -444,6 +445,40 @@
"@napi-rs/canvas-win32-x64-msvc": ["@napi-rs/canvas-win32-x64-msvc@0.1.65", "", { "os": "win32", "cpu": "x64" }, "sha512-RZQX3luWnlNWgdMnLMQ1hyfQraeAn9lnxWWVCHuUM4tAWEV8UDdeb7cMwmJW7eyt8kAosmjeHt3cylQMHOxGFg=="],
"@napi-rs/nice": ["@napi-rs/nice@1.0.1", "", { "optionalDependencies": { "@napi-rs/nice-android-arm-eabi": "1.0.1", "@napi-rs/nice-android-arm64": "1.0.1", "@napi-rs/nice-darwin-arm64": "1.0.1", "@napi-rs/nice-darwin-x64": "1.0.1", "@napi-rs/nice-freebsd-x64": "1.0.1", "@napi-rs/nice-linux-arm-gnueabihf": "1.0.1", "@napi-rs/nice-linux-arm64-gnu": "1.0.1", "@napi-rs/nice-linux-arm64-musl": "1.0.1", "@napi-rs/nice-linux-ppc64-gnu": "1.0.1", "@napi-rs/nice-linux-riscv64-gnu": "1.0.1", "@napi-rs/nice-linux-s390x-gnu": "1.0.1", "@napi-rs/nice-linux-x64-gnu": "1.0.1", "@napi-rs/nice-linux-x64-musl": "1.0.1", "@napi-rs/nice-win32-arm64-msvc": "1.0.1", "@napi-rs/nice-win32-ia32-msvc": "1.0.1", "@napi-rs/nice-win32-x64-msvc": "1.0.1" } }, "sha512-zM0mVWSXE0a0h9aKACLwKmD6nHcRiKrPpCfvaKqG1CqDEyjEawId0ocXxVzPMCAm6kkWr2P025msfxXEnt8UGQ=="],
"@napi-rs/nice-android-arm-eabi": ["@napi-rs/nice-android-arm-eabi@1.0.1", "", { "os": "android", "cpu": "arm" }, "sha512-5qpvOu5IGwDo7MEKVqqyAxF90I6aLj4n07OzpARdgDRfz8UbBztTByBp0RC59r3J1Ij8uzYi6jI7r5Lws7nn6w=="],
"@napi-rs/nice-android-arm64": ["@napi-rs/nice-android-arm64@1.0.1", "", { "os": "android", "cpu": "arm64" }, "sha512-GqvXL0P8fZ+mQqG1g0o4AO9hJjQaeYG84FRfZaYjyJtZZZcMjXW5TwkL8Y8UApheJgyE13TQ4YNUssQaTgTyvA=="],
"@napi-rs/nice-darwin-arm64": ["@napi-rs/nice-darwin-arm64@1.0.1", "", { "os": "darwin", "cpu": "arm64" }, "sha512-91k3HEqUl2fsrz/sKkuEkscj6EAj3/eZNCLqzD2AA0TtVbkQi8nqxZCZDMkfklULmxLkMxuUdKe7RvG/T6s2AA=="],
"@napi-rs/nice-darwin-x64": ["@napi-rs/nice-darwin-x64@1.0.1", "", { "os": "darwin", "cpu": "x64" }, "sha512-jXnMleYSIR/+TAN/p5u+NkCA7yidgswx5ftqzXdD5wgy/hNR92oerTXHc0jrlBisbd7DpzoaGY4cFD7Sm5GlgQ=="],
"@napi-rs/nice-freebsd-x64": ["@napi-rs/nice-freebsd-x64@1.0.1", "", { "os": "freebsd", "cpu": "x64" }, "sha512-j+iJ/ezONXRQsVIB/FJfwjeQXX7A2tf3gEXs4WUGFrJjpe/z2KB7sOv6zpkm08PofF36C9S7wTNuzHZ/Iiccfw=="],
"@napi-rs/nice-linux-arm-gnueabihf": ["@napi-rs/nice-linux-arm-gnueabihf@1.0.1", "", { "os": "linux", "cpu": "arm" }, "sha512-G8RgJ8FYXYkkSGQwywAUh84m946UTn6l03/vmEXBYNJxQJcD+I3B3k5jmjFG/OPiU8DfvxutOP8bi+F89MCV7Q=="],
"@napi-rs/nice-linux-arm64-gnu": ["@napi-rs/nice-linux-arm64-gnu@1.0.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-IMDak59/W5JSab1oZvmNbrms3mHqcreaCeClUjwlwDr0m3BoR09ZiN8cKFBzuSlXgRdZ4PNqCYNeGQv7YMTjuA=="],
"@napi-rs/nice-linux-arm64-musl": ["@napi-rs/nice-linux-arm64-musl@1.0.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-wG8fa2VKuWM4CfjOjjRX9YLIbysSVV1S3Kgm2Fnc67ap/soHBeYZa6AGMeR5BJAylYRjnoVOzV19Cmkco3QEPw=="],
"@napi-rs/nice-linux-ppc64-gnu": ["@napi-rs/nice-linux-ppc64-gnu@1.0.1", "", { "os": "linux", "cpu": "ppc64" }, "sha512-lxQ9WrBf0IlNTCA9oS2jg/iAjQyTI6JHzABV664LLrLA/SIdD+I1i3Mjf7TsnoUbgopBcCuDztVLfJ0q9ubf6Q=="],
"@napi-rs/nice-linux-riscv64-gnu": ["@napi-rs/nice-linux-riscv64-gnu@1.0.1", "", { "os": "linux", "cpu": "none" }, "sha512-3xs69dO8WSWBb13KBVex+yvxmUeEsdWexxibqskzoKaWx9AIqkMbWmE2npkazJoopPKX2ULKd8Fm9veEn0g4Ig=="],
"@napi-rs/nice-linux-s390x-gnu": ["@napi-rs/nice-linux-s390x-gnu@1.0.1", "", { "os": "linux", "cpu": "s390x" }, "sha512-lMFI3i9rlW7hgToyAzTaEybQYGbQHDrpRkg+1gJWEpH0PLAQoZ8jiY0IzakLfNWnVda1eTYYlxxFYzW8Rqczkg=="],
"@napi-rs/nice-linux-x64-gnu": ["@napi-rs/nice-linux-x64-gnu@1.0.1", "", { "os": "linux", "cpu": "x64" }, "sha512-XQAJs7DRN2GpLN6Fb+ZdGFeYZDdGl2Fn3TmFlqEL5JorgWKrQGRUrpGKbgZ25UeZPILuTKJ+OowG2avN8mThBA=="],
"@napi-rs/nice-linux-x64-musl": ["@napi-rs/nice-linux-x64-musl@1.0.1", "", { "os": "linux", "cpu": "x64" }, "sha512-/rodHpRSgiI9o1faq9SZOp/o2QkKQg7T+DK0R5AkbnI/YxvAIEHf2cngjYzLMQSQgUhxym+LFr+UGZx4vK4QdQ=="],
"@napi-rs/nice-win32-arm64-msvc": ["@napi-rs/nice-win32-arm64-msvc@1.0.1", "", { "os": "win32", "cpu": "arm64" }, "sha512-rEcz9vZymaCB3OqEXoHnp9YViLct8ugF+6uO5McifTedjq4QMQs3DHz35xBEGhH3gJWEsXMUbzazkz5KNM5YUg=="],
"@napi-rs/nice-win32-ia32-msvc": ["@napi-rs/nice-win32-ia32-msvc@1.0.1", "", { "os": "win32", "cpu": "ia32" }, "sha512-t7eBAyPUrWL8su3gDxw9xxxqNwZzAqKo0Szv3IjVQd1GpXXVkb6vBBQUuxfIYaXMzZLwlxRQ7uzM2vdUE9ULGw=="],
"@napi-rs/nice-win32-x64-msvc": ["@napi-rs/nice-win32-x64-msvc@1.0.1", "", { "os": "win32", "cpu": "x64" }, "sha512-JlF+uDcatt3St2ntBG8H02F1mM45i5SF9W+bIKiReVE6wiy3o16oBP/yxt+RZ+N6LbCImJXJ6bXNO2kn9AXicg=="],
"@nestjs/common": ["@nestjs/common@11.0.3", "", { "dependencies": { "iterare": "1.2.1", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "class-transformer": "*", "class-validator": "*", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["class-transformer", "class-validator"] }, "sha512-fTkJWQ20+jvPKfrv3A+T3wsPwwYSJyoJ+pcBzyKtv5fCpK/yX/rJalFUIpw1CDmarfqIaMX9SdkplNyxtvH6RA=="],
"@nestjs/core": ["@nestjs/core@11.0.3", "", { "dependencies": { "@nuxt/opencollective": "0.4.1", "fast-safe-stringify": "2.1.1", "iterare": "1.2.1", "path-to-regexp": "8.2.0", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "@nestjs/common": "^11.0.0", "@nestjs/microservices": "^11.0.0", "@nestjs/platform-express": "^11.0.0", "@nestjs/websockets": "^11.0.0", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["@nestjs/microservices", "@nestjs/platform-express", "@nestjs/websockets"] }, "sha512-6UoVHpwa23HJxMNtuTXQCiqx/NHTG3lRBRgnZ8EDHTjgaNnR7P+xBS68zN3gLH7rBIrhhQ5Q1hVs7WswRxrw7Q=="],
@@ -2028,6 +2063,8 @@
"pino-std-serializers": ["pino-std-serializers@7.0.0", "", {}, "sha512-e906FRY0+tV27iq4juKzSYPbUj2do2X2JX4EzSca1631EB2QJQUqGbDuERal7LCtOpxl6x3+nvo9NPZcmjkiFA=="],
"piscina": ["piscina@5.0.0", "", { "optionalDependencies": { "@napi-rs/nice": "^1.0.1" } }, "sha512-R+arufwL7sZvGjAhSMK3TfH55YdGOqhpKXkcwQJr432AAnJX/xxX19PA4QisrmJ+BTTfZVggaz6HexbkQq1l1Q=="],
"pixelmatch": ["pixelmatch@5.3.0", "", { "dependencies": { "pngjs": "^6.0.0" }, "bin": { "pixelmatch": "bin/pixelmatch" } }, "sha512-o8mkY4E/+LNUf6LzX96ht6k6CEDi65k9G2rjMtBe9Oo+VPKSvl+0GKHuH/AlG+GA5LPG/i5hrekkxUc3s2HU+Q=="],
"pkg-dir": ["pkg-dir@4.2.0", "", { "dependencies": { "find-up": "^4.0.0" } }, "sha512-HRDzbaKjC+AOWVXxAU/x54COGeIv9eb+6CkDSQoNTt4XyWoIJvuPsXizxu/Fr23EiekbtZwmh1IcIG/l/a10GQ=="],

View File

@@ -87,7 +87,7 @@ export function testForFile(file: string): BunTestExports {
var testFile = testFiles.get(file);
if (!testFile) {
testFile = Bun.jest(file);
testFile = (Bun as typeof Bun & { jest: (absoluteSourceFilePath: string) => BunTestExports }).jest(file);
testFiles.set(file, testFile);
}
return testFile;

View File

@@ -5,7 +5,7 @@
* without always needing to run `bun install` in development.
*/
import { gc as bunGC, sleepSync, spawnSync, unsafe, which, write } from "bun";
import { gc as bunGC, readableStreamToText, sleepSync, spawnSync, unsafe, which, write } from "bun";
import { heapStats } from "bun:jsc";
import { afterAll, beforeAll, describe, expect, test } from "bun:test";
import { ChildProcess, fork } from "child_process";
@@ -469,11 +469,43 @@ if (expect.extend)
}
}
},
async toRunAsync(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
const result = Bun.spawn({
cmd: [bunExe(), ...cmds],
env: bunEnv,
stdio: ["inherit", "pipe", "pipe"],
});
const [stdout, stderr, exitCode] = await Promise.all([
readableStreamToText(result.stdout),
readableStreamToText(result.stderr),
result.exited,
]);
if (exitCode !== expectedCode) {
return {
pass: false,
message: () => `Command ${cmds.join(" ")} failed:` + "\n" + stdout + "\n" + stderr,
};
}
if (optionalStdout != null) {
return {
pass: stdout === optionalStdout,
message: () => `Expected ${cmds.join(" ")} to output ${optionalStdout} but got ${stdout}`,
};
}
return {
pass: true,
message: () => `Expected ${cmds.join(" ")} to run`,
};
},
toRun(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
const result = Bun.spawnSync({
cmd: [bunExe(), ...cmds],
env: bunEnv,
stdio: ["inherit", "pipe", "inherit"],
stdio: ["ignore", "pipe", "inherit"],
});
if (result.exitCode !== expectedCode) {
@@ -1279,6 +1311,7 @@ interface BunHarnessTestMatchers {
toHaveTestTimedOutAfter(expected: number): void;
toBeBinaryType(expected: keyof typeof binaryTypes): void;
toRun(optionalStdout?: string, expectedCode?: number): void;
toRunAsync(optionalStdout?: string, expectedCode?: number): Promise<void>;
toThrowWithCode(cls: CallableFunction, code: string): void;
toThrowWithCodeAsync(cls: CallableFunction, code: string): Promise<void>;
}
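A hypothetical usage of the new async matcher (the command and expected stdout are illustrative; the harness prepends `bunExe()` to the argument list):

```ts
import { expect, test } from "bun:test";

test("toRunAsync example", async () => {
  // Only checks that the spawned `bun --version` exits with code 0.
  await expect(["--version"]).toRunAsync();

  // With optionalStdout, the captured stdout must match exactly
  // (console.log output includes the trailing newline).
  await expect(["-e", "console.log('hi')"]).toRunAsync("hi\n");
});
```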

View File

@@ -187,3 +187,6 @@ tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: ["ignore", "inherit"
tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: [null, null, null] }));
tsd.expectAssignable<SyncSubprocess<Bun.SpawnOptions.Readable, Bun.SpawnOptions.Readable>>(Bun.spawnSync([], {}));
Bun.spawnSync({ cmd: ["echo", "hello"] });
Bun.spawnSync(["echo", "hello"], { stdio: ["ignore", "pipe", "pipe"] });

View File

@@ -0,0 +1,8 @@
import * as common from '../common/index.mjs';
import { Worker } from 'node:worker_threads';
{
// Verifies that the worker is async disposable
await using worker = new Worker('for(;;) {}', { eval: true });
worker.on('exit', common.mustCall());
}

View File

@@ -0,0 +1,11 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
// Regression for https://github.com/nodejs/node/issues/43182.
const w = new Worker(new URL('data:text/javascript,process.exit(1);await new Promise(()=>{ process.exit(2); })'));
w.on('exit', common.mustCall((code) => {
assert.strictEqual(code, 1);
}));

View File

@@ -7,7 +7,7 @@ const eachSizeMiB = 100;
const iterations = 5;
function test() {
const code = " ".repeat(eachSizeMiB * 1024 * 1024);
const code = Buffer.alloc(eachSizeMiB * 1024 * 1024, " ").toString();
return new Promise((resolve, reject) => {
const worker = new Worker(code, { eval: true });
worker.on("exit", () => resolve());

View File

@@ -0,0 +1,49 @@
import assert from "node:assert";
import { test } from "node:test";
import { fileURLToPath } from "node:url";
import { MessageChannel, MessagePort, parentPort, Worker } from "node:worker_threads";
interface StartupMessage {
port: MessagePort;
}
if (parentPort) {
parentPort.on("message", (message: StartupMessage) => {
console.log("Worker received startup message");
message.port.postMessage("hello");
message.port.close();
});
} else {
test("worker lifecycle message port", async () => {
const worker = new Worker(fileURLToPath(import.meta.url));
const { port1, port2 } = new MessageChannel();
const { promise, resolve, reject } = Promise.withResolvers<string>();
port1.on("message", (message: string) => {
console.log("Received message:", message);
assert.equal(message, "hello");
worker.terminate();
resolve(message);
});
worker.on("online", () => {
console.log("Worker is online");
const startupMessage: StartupMessage = { port: port2 };
worker.postMessage(startupMessage, [port2]);
});
worker.on("exit", () => {
console.log("Worker exited");
reject();
});
worker.on("error", err => {
reject(err);
});
assert.equal(await promise, "hello");
});
}

View File

@@ -0,0 +1,23 @@
import assert from "node:assert";
import { setTimeout as sleep } from "node:timers/promises";
import { fileURLToPath } from "node:url";
import { Worker, isMainThread, threadId } from "node:worker_threads";
const sleeptime = 100;
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url));
assert.strictEqual(threadId, 0);
assert.strictEqual(worker.threadId, 1);
console.log(" (main) threadId:", worker.threadId);
await sleep(sleeptime);
assert.strictEqual(await worker.terminate(), 1);
assert.strictEqual(worker.threadId, -1); // should be -1 after termination
assert.strictEqual(await worker.terminate(), undefined); // calling terminate() again is basically a no-op
assert.strictEqual(worker.threadId, -1);
} else {
console.log("(worker) threadId:", threadId);
assert.strictEqual(threadId, 1);
await sleep(sleeptime * 2); // keep it alive definitely longer than the parent
}

View File

@@ -0,0 +1,31 @@
import { once } from "node:events";
import { fileURLToPath } from "node:url";
import { Worker, isMainThread, parentPort } from "node:worker_threads";
if (isMainThread) {
const { test, expect } = await import("bun:test");
test("process.exit() works", async () => {
const worker = new Worker(fileURLToPath(import.meta.url));
worker.on("message", () => expect().fail("worker should not keep executing after process.exit()"));
worker.postMessage("boom");
const [exitCode] = await once(worker, "exit");
expect(exitCode).toBe(2);
});
} else {
console.log("Worker thread started");
parentPort!.on("message", message => {
console.log(`Worker received: ${message}`);
console.log("About to call process.exit(2)...");
process.exit(2);
console.log("process.exit(2) called");
parentPort!.postMessage("i'm still alive!");
});
console.log("Worker is ready, waiting for messages...");
}

View File

@@ -6,7 +6,7 @@ import { Worker, isMainThread, workerData } from "worker_threads";
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
const actions = {
async ["Bun.connect"](port) {
async ["Bun.connect"](port: number) {
await Bun.connect({
hostname: "localhost",
port,
@@ -19,7 +19,7 @@ const actions = {
},
});
},
async ["Bun.listen"](port) {
async ["Bun.listen"](port: number) {
const server = Bun.listen({
hostname: "localhost",
port: 0,
@@ -32,7 +32,7 @@ const actions = {
},
});
},
async ["fetch"](port) {
async ["fetch"](port: number) {
const resp = await fetch("http://localhost:" + port);
await resp.blob();
},
@@ -65,12 +65,10 @@ if (isMainThread) {
const { promise, resolve, reject } = Promise.withResolvers();
promises.push(promise);
worker.on("online", () => {
sleep(1)
.then(() => {
return worker.terminate();
})
.finally(resolve);
worker.once("online", async () => {
await sleep(1);
await worker.terminate();
resolve();
});
worker.on("error", e => reject(e));
}

View File

@@ -4,21 +4,22 @@ import { bunEnv, nodeExe } from "harness";
import { join } from "path";
const fixtureDir = join(import.meta.dirname, "fixtures");
function postNodeFormData(port) {
const result = Bun.spawnSync({
async function postNodeFormData(port) {
const result = Bun.spawn({
cmd: [nodeExe(), join(fixtureDir, "node-form-data.fetch.fixture.js"), port?.toString()],
env: bunEnv,
stdio: ["inherit", "inherit", "inherit"],
});
expect(result.exitCode).toBe(0);
expect(await result.exited).toBe(0);
}
function postNodeAction(port) {
const result = Bun.spawnSync({
async function postNodeAction(port) {
const result = Bun.spawn({
cmd: [nodeExe(), join(fixtureDir, "node-action.fetch.fixture.js"), port?.toString()],
env: bunEnv,
stdio: ["inherit", "inherit", "inherit"],
});
expect(result.exitCode).toBe(0);
expect(await result.exited).toBe(0);
}
describe("astro", async () => {
@@ -66,7 +67,7 @@ describe("astro", async () => {
});
test("is able todo a POST request to an astro action using node", async () => {
postNodeAction(previewServer.port);
await postNodeAction(previewServer.port);
});
test("is able to post form data to an astro using bun", async () => {
@@ -89,6 +90,6 @@ describe("astro", async () => {
});
});
test("is able to post form data to an astro using node", async () => {
postNodeFormData(previewServer.port);
await postNodeFormData(previewServer.port);
});
});

View File

@@ -0,0 +1 @@
.astro

View File

@@ -0,0 +1,6 @@
{
"name": "piscina-test",
"dependencies": {
"piscina": "5.0.0"
}
}

View File

@@ -0,0 +1,22 @@
import { resolve } from "node:path";
import { test } from "node:test";
import { Piscina } from "piscina";
test("piscina atomics", async () => {
const pool = new Piscina<void, void>({
filename: resolve(__dirname, "simple.fixture.ts"),
minThreads: 2,
maxThreads: 2,
atomics: "sync",
});
const tasks: Promise<void>[] = [];
for (let i = 1; i <= 10000; i++) {
tasks.push(pool.run());
}
await Promise.all(tasks);
await pool.destroy();
});

View File

@@ -0,0 +1,63 @@
import { expect, test } from "bun:test";
import { join } from "node:path";
import { Piscina } from "piscina";
setTimeout(() => {
console.error(new Error("Catastrophic failure, exiting so test can fail"));
process.exit(1);
}, 10 * 1000).unref();
test("Piscina basic functionality", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker.fixture.ts"),
});
const result = await piscina.run({ a: 4, b: 6 });
expect(result).toBe(10);
await piscina.destroy();
});
test("Piscina event loop cleanup", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker.fixture.ts"),
});
const results = await Promise.all([
piscina.run({ a: 1, b: 2 }),
piscina.run({ a: 3, b: 4 }),
piscina.run({ a: 5, b: 6 }),
]);
expect(results).toEqual([3, 7, 11]);
await piscina.destroy();
});
test("Piscina with idleTimeout", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker.fixture.ts"),
idleTimeout: 100,
maxThreads: 1,
});
const result = await piscina.run({ a: 10, b: 20 });
expect(result).toBe(30);
await piscina.destroy();
});
test("Piscina error handling", async () => {
const piscina = new Piscina({
filename: join(import.meta.dir, "worker-error.fixture.ts"),
});
const p = await piscina.run({ shouldThrow: true }).then(
() => true,
() => false,
);
expect(p).toBe(false);
await piscina.destroy();
});

View File

@@ -0,0 +1,10 @@
// https://github.com/piscinajs/piscina/blob/ba396ced7afc08a8c16f65fbc367a9b7f4d7e84c/test/fixtures/simple-isworkerthread.ts#L7
import assert from "assert";
import Piscina from "piscina";
assert.strictEqual(Piscina.isWorkerThread, true);
export default function () {
return "done";
}

View File

@@ -0,0 +1,7 @@
export default ({ shouldThrow }: { shouldThrow: boolean }) => {
if (shouldThrow) {
throw new Error("Worker error for testing");
}
return "success";
};

View File

@@ -0,0 +1,4 @@
export default ({ a, b }: { a: number; b: number }) => {
console.log("Worker: calculating", a, "+", b);
return a + b;
};

View File

@@ -0,0 +1,308 @@
import assert from "node:assert";
import { test } from "node:test";
import { MessageChannel, receiveMessageOnPort, Worker } from "worker_threads";
test("MessagePort postMessage respects event loop timing", async () => {
const { port1, port2 } = new MessageChannel();
const messages: string[] = [];
let messageCount = 0;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", msg => {
messages.push(`received: ${msg}`);
messageCount++;
if (messageCount === 3) {
resolve();
}
});
port2.start();
setImmediate(() => {
messages.push("setImmediate 1");
port1.postMessage("message1");
});
setImmediate(() => {
messages.push("setImmediate 2");
port1.postMessage("message2");
});
setImmediate(() => {
messages.push("setImmediate 3");
port1.postMessage("message3");
});
await promise;
assert.deepStrictEqual(messages, [
"setImmediate 1",
"setImmediate 2",
"setImmediate 3",
"received: message1",
"received: message2",
"received: message3",
]);
port1.close();
port2.close();
});
test("MessagePort messages execute after process.nextTick (Node.js compatibility)", async () => {
const { port1, port2 } = new MessageChannel();
const executionOrder: string[] = [];
let messageReceived = false;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", () => {
executionOrder.push("message received");
messageReceived = true;
resolve();
});
port2.start();
port1.postMessage("test");
process.nextTick(() => {
executionOrder.push("nextTick 1");
});
process.nextTick(() => {
executionOrder.push("nextTick 2");
});
await promise;
await new Promise(resolve => setImmediate(resolve));
assert.strictEqual(messageReceived, true);
assert.strictEqual(executionOrder[0], "nextTick 1");
assert.strictEqual(executionOrder[1], "nextTick 2");
assert.strictEqual(executionOrder[2], "message received");
port1.close();
port2.close();
});
test("MessagePort message delivery works with workers", async () => {
const worker = new Worker(
`
const { parentPort, MessageChannel } = require('worker_threads');
parentPort.on('message', ({ port }) => {
let count = 0;
port.on('message', (msg) => {
count++;
port.postMessage(\`echo-\${count}: \${msg}\`);
if (count >= 3) {
port.close();
parentPort.postMessage('done');
}
});
port.start();
parentPort.postMessage('ready');
});
`,
{ eval: true },
);
const { port1, port2 } = new MessageChannel();
const messages: string[] = [];
let readyReceived = false;
let doneReceived = false;
const { promise, resolve, reject: rejectWorker } = Promise.withResolvers<void>();
const {
promise: allMessagesReceived,
resolve: resolveAllMessages,
reject: rejectAllMessages,
} = Promise.withResolvers<void>();
AbortSignal.timeout(100).addEventListener("abort", () => {
worker.terminate();
rejectWorker(new Error("timeout"));
rejectAllMessages(new Error("timeout"));
});
worker.on("message", msg => {
if (msg === "ready") {
readyReceived = true;
port1.postMessage("hello1");
port1.postMessage("hello2");
port1.postMessage("hello3");
} else if (msg === "done") {
doneReceived = true;
resolve();
}
});
port1.on("message", msg => {
messages.push(msg);
if (messages.length === 3) {
resolveAllMessages();
}
});
worker.postMessage({ port: port2 }, [port2]);
await Promise.all([promise, allMessagesReceived]);
assert.strictEqual(readyReceived, true);
assert.strictEqual(doneReceived, true);
assert.strictEqual(messages.length, 3);
assert.deepStrictEqual(messages, ["echo-1: hello1", "echo-2: hello2", "echo-3: hello3"]);
port1.close();
worker.terminate();
});
test("MessagePort messages don't starve microtasks", async () => {
const { port1, port2 } = new MessageChannel();
const executionOrder: string[] = [];
let messageCount = 0;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", () => {
messageCount++;
executionOrder.push(`message-${messageCount}`);
if (messageCount === 3) {
resolve();
}
});
port2.start();
queueMicrotask(() => {
executionOrder.push("microtask-1");
});
port1.postMessage("msg1");
queueMicrotask(() => {
executionOrder.push("microtask-2");
});
port1.postMessage("msg2");
queueMicrotask(() => {
executionOrder.push("microtask-3");
});
port1.postMessage("msg3");
await promise;
assert(executionOrder.includes("microtask-1"));
assert(executionOrder.includes("microtask-2"));
assert(executionOrder.includes("microtask-3"));
assert(executionOrder.includes("message-1"));
assert(executionOrder.includes("message-2"));
assert(executionOrder.includes("message-3"));
port1.close();
port2.close();
});
test("high volume MessagePort operations maintain order", async () => {
const { port1, port2 } = new MessageChannel();
const TOTAL_MESSAGES = 100;
const receivedMessages: number[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", msg => {
receivedMessages.push(msg);
if (receivedMessages.length === TOTAL_MESSAGES) {
resolve();
}
});
port2.start();
for (let i = 0; i < TOTAL_MESSAGES; i++) {
port1.postMessage(i);
}
await promise;
assert.strictEqual(receivedMessages.length, TOTAL_MESSAGES);
for (let i = 0; i < TOTAL_MESSAGES; i++) {
assert.strictEqual(receivedMessages[i], i);
}
port1.close();
port2.close();
});
test("MessagePort close behavior during message handling", async () => {
const { port1, port2 } = new MessageChannel();
let messageReceived = false;
let errorThrown = false;
const { promise, resolve } = Promise.withResolvers<void>();
port2.on("message", () => {
messageReceived = true;
port2.close();
try {
port1.postMessage("after-close");
} catch (e) {
errorThrown = true;
}
setTimeout(resolve, 10);
});
port2.start();
port1.postMessage("test");
await promise;
assert.strictEqual(messageReceived, true);
assert.strictEqual(errorThrown, false);
port1.close();
});
test("receiveMessageOnPort synchronous message retrieval", () => {
const { port1, port2 } = new MessageChannel();
port1.postMessage("msg1");
port1.postMessage("msg2");
port1.postMessage("msg3");
const result1 = receiveMessageOnPort(port2);
const result2 = receiveMessageOnPort(port2);
const result3 = receiveMessageOnPort(port2);
const result4 = receiveMessageOnPort(port2);
assert.strictEqual(result1?.message, "msg1");
assert.strictEqual(result2?.message, "msg2");
assert.strictEqual(result3?.message, "msg3");
assert.strictEqual(result4, undefined);
port1.close();
port2.close();
});

View File

@@ -0,0 +1,27 @@
import assert from "node:assert";
import { spawnSync } from "node:child_process";
import { test } from "node:test";
import { fileURLToPath } from "node:url";
import { Worker } from "node:worker_threads";
import stripAnsi from "strip-ansi";
const IS_CHILD = process.env.IS_CHILD === "true";
// At the time of writing, this test file passes in Node.js and fails in Bun.
// Node.js seems to wait for the exit event to happen before the parent process
// exits, which means that the Worker's exit code is printed to stdout
if (IS_CHILD) {
const worker = new Worker("process.exit(1)", { eval: true });
worker.on("exit", code => console.log(code));
} else {
test("The worker exit event is emitted before the parent exits", async () => {
const file = fileURLToPath(import.meta.url);
const { stdout } = spawnSync(process.execPath, [file], {
env: { ...process.env, IS_CHILD: "true" },
});
assert.strictEqual(stripAnsi(stdout.toString()).trim(), "1");
});
}

View File

@@ -0,0 +1,293 @@
import { describe, expect, test } from "bun:test";
import { Worker as WebWorker } from "worker_threads";
const CONFIG = {
WARMUP_ITERATIONS: 2,
TEST_ITERATIONS: 10,
BATCH_SIZE: 5,
MEMORY_THRESHOLD_MB: 20,
GC_SETTLE_TIME: 50,
TEST_TIMEOUT_MS: 15000,
};
interface MemorySnapshot {
rss: number;
heapUsed: number;
heapTotal: number;
external: number;
}
function takeMemorySnapshot(): MemorySnapshot {
const mem = process.memoryUsage();
return {
rss: Math.round(mem.rss / 1024 / 1024),
heapUsed: Math.round(mem.heapUsed / 1024 / 1024),
heapTotal: Math.round(mem.heapTotal / 1024 / 1024),
external: Math.round(mem.external / 1024 / 1024),
};
}
async function forceGCAndSettle(): Promise<void> {
for (let i = 0; i < 2; i++) {
Bun.gc(true);
await Bun.sleep(CONFIG.GC_SETTLE_TIME);
}
}
function logMemoryDiff(before: MemorySnapshot, after: MemorySnapshot, label: string) {
const rssDiff = after.rss - before.rss;
const heapDiff = after.heapUsed - before.heapUsed;
console.log(`${label}:`, {
rss: `${before.rss}MB -> ${after.rss}MB (${rssDiff >= 0 ? "+" : ""}${rssDiff}MB)`,
heap: `${before.heapUsed}MB -> ${after.heapUsed}MB (${heapDiff >= 0 ? "+" : ""}${heapDiff}MB)`,
});
if (rssDiff > 50) {
console.warn(`⚠️ Large memory increase detected: +${rssDiff}MB RSS`);
}
}
async function withTimeout<T>(promise: Promise<T>, ms: number, description: string): Promise<T> {
const timeout = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`${description} timed out after ${ms}ms`)), ms);
});
return Promise.race([promise, timeout]);
}
async function runWorkerBatch(workerCode: string, batchSize: number = CONFIG.BATCH_SIZE): Promise<void> {
const workers: WebWorker[] = [];
for (let i = 0; i < batchSize; i++) {
const worker = new WebWorker(workerCode, { eval: true });
workers.push(worker);
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
worker.removeAllListeners();
reject(new Error(`Worker ${i} failed to respond within timeout`));
}, 5000);
worker.once("message", msg => {
clearTimeout(timeout);
if (msg.error) {
reject(new Error(msg.error));
} else {
resolve();
}
});
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
}
describe("Worker Memory Leak Tests", () => {
test(
"workers should not leak memory with basic create/terminate cycles",
async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
parentPort.postMessage('ready');
`;
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`Warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`Test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Basic create/terminate test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
test(
"workers with HTTP activity should not leak memory",
async () => {
using server = Bun.serve({
port: 0,
fetch() {
return new Response("OK");
},
});
const workerCode = `
const { parentPort } = require('worker_threads');
async function doWork() {
try {
const response = await fetch('http://localhost:${server.port}');
await response.text();
parentPort.postMessage('done');
} catch (err) {
parentPort.postMessage({ error: err.message });
}
}
doWork();
`;
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} HTTP warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`HTTP warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("HTTP baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} HTTP test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`HTTP test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "HTTP activity test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
test(
"workers with message passing should not leak memory",
async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
parentPort.on('message', (msg) => {
if (msg === 'start') {
for (let j = 0; j < 10; j++) {
parentPort.postMessage({ count: j, data: 'x'.repeat(1000) });
}
parentPort.postMessage('done');
}
});
`;
async function runMessagePassingBatch(): Promise<void> {
const workers: WebWorker[] = [];
for (let i = 0; i < CONFIG.BATCH_SIZE; i++) {
const worker = new WebWorker(workerCode, { eval: true });
workers.push(worker);
await new Promise<void>(resolve => {
worker.on("message", msg => {
if (msg === "done") {
resolve();
}
});
worker.postMessage("start");
});
}
await Promise.all(workers.map(worker => worker.terminate()));
await forceGCAndSettle();
}
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} message passing warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runMessagePassingBatch();
console.log(`Message passing warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Message passing baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} message passing test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runMessagePassingBatch();
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(
`Message passing test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`,
);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Message passing test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
test(
"workers with timers should not leak memory",
async () => {
const workerCode = `
const { parentPort } = require('worker_threads');
const timers = [];
for (let i = 0; i < 5; i++) {
timers.push(setTimeout(() => {}, 10000));
timers.push(setInterval(() => {}, 1000));
}
parentPort.postMessage('ready');
`;
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} timer warmup iterations...`);
// warmup
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
console.log(`Timer warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
}
const baselineMemory = takeMemorySnapshot();
console.log("Timer baseline memory after warmup:", baselineMemory);
console.log(`Running ${CONFIG.TEST_ITERATIONS} timer test iterations...`);
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
await runWorkerBatch(workerCode);
if ((i + 1) % 3 === 0) {
const currentMemory = takeMemorySnapshot();
console.log(`Timer test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
}
}
const finalMemory = takeMemorySnapshot();
logMemoryDiff(baselineMemory, finalMemory, "Timer cleanup test");
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
},
CONFIG.TEST_TIMEOUT_MS,
);
});

View File

@@ -342,12 +342,12 @@ describe("worker_threads", () => {
const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
smol: true,
});
await Bun.sleep(200);
const code = await worker.terminate();
expect(code).toBe(2);
const [exitCode] = await once(worker, "exit");
expect<number | undefined>(await worker.terminate()).toBe(undefined);
expect<number | undefined>(exitCode).toBe(2);
});
test.todo("worker terminating forcefully properly interrupts", async () => {
test("worker terminating forcefully properly interrupts", async () => {
const worker = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {});
await new Promise<void>(done => {
worker.on("message", () => done());

View File

@@ -67,6 +67,7 @@
"pg-gateway": "0.3.0-beta.4",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"piscina": "5.0.0",
"postgres": "3.3.5",
"prisma": "5.1.1",
"prompts": "2.4.2",