mirror of
https://github.com/oven-sh/bun
synced 2026-02-17 14:22:01 +00:00
Compare commits
212 Commits
claude/fix
...
ali/piscin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cfb379737c | ||
|
|
58644c6bb0 | ||
|
|
ffa2ccb236 | ||
|
|
0b9435ddf4 | ||
|
|
b2fa37add0 | ||
|
|
b7ba06152c | ||
|
|
39f8e378bd | ||
|
|
562790bcd9 | ||
|
|
b267eb01c2 | ||
|
|
ffba9b572f | ||
|
|
2ae44bd778 | ||
|
|
bcb09c9d20 | ||
|
|
6325d40050 | ||
|
|
c1192a3607 | ||
|
|
4aac8e33b0 | ||
|
|
82f20a5375 | ||
|
|
e256665836 | ||
|
|
045fe31935 | ||
|
|
e1df2bea9d | ||
|
|
72a6e19478 | ||
|
|
130f106211 | ||
|
|
aa8be1f73c | ||
|
|
6746aa4a12 | ||
|
|
c4a5420cb0 | ||
|
|
d650aa723c | ||
|
|
dbeb9cc7e4 | ||
|
|
8bf3da7ed7 | ||
|
|
41ea1152ee | ||
|
|
7654a5c851 | ||
|
|
48491ca7a6 | ||
|
|
4c4eb6caf3 | ||
|
|
db018fab9c | ||
|
|
0da86bba5a | ||
|
|
be8ef90134 | ||
|
|
a8683d4d2a | ||
|
|
88d8b0f258 | ||
|
|
344c097cd6 | ||
|
|
e63712ac91 | ||
|
|
8a132a96c1 | ||
|
|
6b9897e107 | ||
|
|
69e1d46fae | ||
|
|
7c8da8f982 | ||
|
|
274e8d8b50 | ||
|
|
408ac4efe8 | ||
|
|
fcb6f6cf79 | ||
|
|
4b36cb0b91 | ||
|
|
21304e842a | ||
|
|
5c43d4de9c | ||
|
|
01d66e8053 | ||
|
|
87e82a1ab7 | ||
|
|
19a2432e82 | ||
|
|
8486cb922f | ||
|
|
3e2b64efc1 | ||
|
|
74581471c0 | ||
|
|
e199369151 | ||
|
|
dd0509c606 | ||
|
|
861c03e266 | ||
|
|
64361b2929 | ||
|
|
0c53b78c96 | ||
|
|
f6dc66925e | ||
|
|
64e9e1d978 | ||
|
|
e011d3dc8c | ||
|
|
64f82a8c10 | ||
|
|
656d1a3098 | ||
|
|
9166d43c5e | ||
|
|
75e8e86336 | ||
|
|
b1fdd29b38 | ||
|
|
3c12e72de6 | ||
|
|
c1b9878607 | ||
|
|
635ff1afe1 | ||
|
|
32976f9136 | ||
|
|
fde3eb6d84 | ||
|
|
e88e2b73dc | ||
|
|
afee33a37c | ||
|
|
e7579aa4ac | ||
|
|
c32784fcb9 | ||
|
|
b2593bad58 | ||
|
|
3c8f1a6cb1 | ||
|
|
7c4df2543b | ||
|
|
ca499d43dd | ||
|
|
d04ebc8029 | ||
|
|
3a0fcab5d6 | ||
|
|
4e4bb0a2b7 | ||
|
|
95e7fdcb36 | ||
|
|
d96cb24e0d | ||
|
|
20f376b50e | ||
|
|
e601a5a405 | ||
|
|
dfab7869a9 | ||
|
|
622a643f2b | ||
|
|
8f5acf3591 | ||
|
|
e965f113d6 | ||
|
|
51c83890c0 | ||
|
|
8ce468a727 | ||
|
|
505d1be9bf | ||
|
|
2ff973df24 | ||
|
|
6049fee6d4 | ||
|
|
76503c8b04 | ||
|
|
559f79443d | ||
|
|
ba43ff4159 | ||
|
|
50e8d6cf03 | ||
|
|
3168501f37 | ||
|
|
2e5737f506 | ||
|
|
53f719ef34 | ||
|
|
0a4ca0f036 | ||
|
|
89f1925f83 | ||
|
|
42023a34d1 | ||
|
|
915a82a27a | ||
|
|
f6a0d58bde | ||
|
|
a2cb442429 | ||
|
|
e194911a1d | ||
|
|
fea76395a7 | ||
|
|
d613409de0 | ||
|
|
ed46108ff2 | ||
|
|
0e957c28d8 | ||
|
|
e1ac3373cd | ||
|
|
9085bbcaab | ||
|
|
6df3ff2776 | ||
|
|
0e101a87cd | ||
|
|
bda588ad30 | ||
|
|
d5d81d9728 | ||
|
|
f3267a2734 | ||
|
|
71ca5ef648 | ||
|
|
74f510616d | ||
|
|
1b6b081e08 | ||
|
|
db03da9fb9 | ||
|
|
fe8dfee059 | ||
|
|
5768d6705e | ||
|
|
ce175ac95e | ||
|
|
a04dd16092 | ||
|
|
82d3de493d | ||
|
|
2d0ccf26c8 | ||
|
|
f1a2cd04bd | ||
|
|
03a77a8e9d | ||
|
|
87bac43baf | ||
|
|
6dc55d1124 | ||
|
|
0313f7cfe3 | ||
|
|
3c633842da | ||
|
|
012e73b95a | ||
|
|
dba909c235 | ||
|
|
96ab3a0909 | ||
|
|
5a79317782 | ||
|
|
e01e416d13 | ||
|
|
cad8540dab | ||
|
|
304714d636 | ||
|
|
3d0672c744 | ||
|
|
bed24f3a51 | ||
|
|
693947673e | ||
|
|
397227e59c | ||
|
|
e86ce16afb | ||
|
|
5846c43f90 | ||
|
|
01b7d9c293 | ||
|
|
dca7569615 | ||
|
|
0ce858e605 | ||
|
|
b81e640c82 | ||
|
|
fd8addebdc | ||
|
|
437a19691c | ||
|
|
c663ccd83b | ||
|
|
53958f369d | ||
|
|
d7a517cdfc | ||
|
|
684e597460 | ||
|
|
1fcd442373 | ||
|
|
964f1cd177 | ||
|
|
3dce9aeabd | ||
|
|
031ad7adc6 | ||
|
|
3493d31e47 | ||
|
|
bdc7d047ed | ||
|
|
622b290908 | ||
|
|
f9a563f0c4 | ||
|
|
337270b80b | ||
|
|
f11e925756 | ||
|
|
c34af38f18 | ||
|
|
390abc7687 | ||
|
|
cdd959def7 | ||
|
|
816f805c3e | ||
|
|
773216ed2e | ||
|
|
42ae6e6c7e | ||
|
|
6cbd258201 | ||
|
|
689376ea80 | ||
|
|
cd6fd2cfd0 | ||
|
|
4804b316b8 | ||
|
|
df7c1b3221 | ||
|
|
ab3af5bfd3 | ||
|
|
84e42a0580 | ||
|
|
11efc56a07 | ||
|
|
ce24d4ea39 | ||
|
|
4ab50bf065 | ||
|
|
97cf7903be | ||
|
|
eec22bfec3 | ||
|
|
c9df308835 | ||
|
|
aa0d8b3baf | ||
|
|
bd2bdfd17b | ||
|
|
c11dac53dc | ||
|
|
a4ee48503d | ||
|
|
e1e44df206 | ||
|
|
cc452f3b4f | ||
|
|
d529390128 | ||
|
|
34089a2af0 | ||
|
|
7e571f9dfb | ||
|
|
23b813db53 | ||
|
|
0ac77fee2b | ||
|
|
c0a0fe3e22 | ||
|
|
d37841f2b1 | ||
|
|
a2c6e613f3 | ||
|
|
940fa4fd4e | ||
|
|
ed8d84158b | ||
|
|
962ef477da | ||
|
|
ef203ce9c7 | ||
|
|
f86e962f25 | ||
|
|
1b4fc47fd9 | ||
|
|
a4ccaa5822 | ||
|
|
aa7f696df0 | ||
|
|
5578f1ec1e |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -1,3 +1,7 @@
|
||||
.cursor/rules/vibe-tools.mdc
|
||||
vibe-tools.config.json
|
||||
.repomix-output.txt
|
||||
repomix.config.json
|
||||
.DS_Store
|
||||
.env
|
||||
.envrc
|
||||
@@ -183,4 +187,4 @@ codegen-for-zig-team.tar.gz
|
||||
*.sock
|
||||
scratch*.{js,ts,tsx,cjs,mjs}
|
||||
|
||||
*.bun-build
|
||||
*.bun-build
|
||||
|
||||
@@ -885,6 +885,7 @@ if(NOT WIN32)
|
||||
-Wno-unused-function
|
||||
-Wno-c++23-lambda-attributes
|
||||
-Wno-nullability-completeness
|
||||
-Wmisleading-indentation
|
||||
-Werror
|
||||
)
|
||||
else()
|
||||
|
||||
2
packages/bun-types/overrides.d.ts
vendored
2
packages/bun-types/overrides.d.ts
vendored
@@ -2,7 +2,7 @@ export {};
|
||||
|
||||
declare global {
|
||||
namespace NodeJS {
|
||||
interface ProcessEnv extends Bun.Env, ImportMetaEnv {}
|
||||
interface ProcessEnv extends Bun.Env {}
|
||||
|
||||
interface Process {
|
||||
readonly version: string;
|
||||
|
||||
@@ -334,7 +334,7 @@ async function runTests() {
|
||||
const okResults = [];
|
||||
const flakyResults = [];
|
||||
const failedResults = [];
|
||||
const maxAttempts = 1 + (parseInt(options["retries"]) || 0);
|
||||
const defaultMaxAttempts = 1 + (parseInt(options["retries"]) || 0);
|
||||
|
||||
/**
|
||||
* @param {string} title
|
||||
@@ -342,11 +342,23 @@ async function runTests() {
|
||||
* @returns {Promise<TestResult>}
|
||||
*/
|
||||
const runTest = async (title, fn) => {
|
||||
// suspicious tests are run a minimum number of times, larger than the normal retry count, even
|
||||
// if they pass on the first attempt. we are giving them N chances to fail instead of N chances
|
||||
// to pass.
|
||||
const suspiciousTests = [
|
||||
"test-worker-arraybuffer-zerofill.js",
|
||||
"worker_destruction.test.ts",
|
||||
"worker.test.ts",
|
||||
"test-worker-message-port-transfer-terminate.js",
|
||||
"worker-lifecycle-message-port.test.ts",
|
||||
];
|
||||
const suspicious = suspiciousTests.some(name => title.includes(name));
|
||||
const maxAttempts = suspicious ? 50 : defaultMaxAttempts;
|
||||
const index = ++i;
|
||||
|
||||
let result, failure, flaky;
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
if (attempt > 1) {
|
||||
if (attempt > 1 && !suspicious) {
|
||||
await new Promise(resolve => setTimeout(resolve, 5000 + Math.random() * 10_000));
|
||||
}
|
||||
|
||||
@@ -364,7 +376,11 @@ async function runTests() {
|
||||
} else {
|
||||
okResults.push(result);
|
||||
}
|
||||
break;
|
||||
if (suspicious) {
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const color = attempt >= maxAttempts ? "red" : "yellow";
|
||||
|
||||
@@ -2631,6 +2631,10 @@ pub fn spawnMaybeSync(
|
||||
defer {
|
||||
jsc_vm.uwsLoop().internal_loop_data.jsc_vm = old_vm;
|
||||
}
|
||||
|
||||
jsc_vm.eventLoop().is_inside_spawn_sync = true;
|
||||
defer jsc_vm.eventLoop().is_inside_spawn_sync = false;
|
||||
|
||||
while (subprocess.hasPendingActivityNonThreadsafe()) {
|
||||
if (subprocess.stdin == .buffer) {
|
||||
subprocess.stdin.buffer.watch();
|
||||
|
||||
@@ -57,7 +57,7 @@ static const DOMException::Description descriptions[] = {
|
||||
{ "QuotaExceededError"_s, "The quota has been exceeded."_s, 22 },
|
||||
{ "TimeoutError"_s, "The operation timed out."_s, 23 },
|
||||
{ "InvalidNodeTypeError"_s, "The supplied node is incorrect or has an incorrect ancestor for this operation."_s, 24 },
|
||||
{ "DataCloneError"_s, "The object can not be cloned."_s, 25 },
|
||||
{ "DataCloneError"_s, "The object could not be cloned."_s, 25 }, // TODO: This should include an inspection of the object (e.g. "DOMException [DataCloneError]: hello () {} could not be cloned.")
|
||||
{ "EncodingError"_s, "The encoding operation (either encoded or decoding) failed."_s, 0 },
|
||||
{ "NotReadableError"_s, "The I/O read operation failed."_s, 0 },
|
||||
{ "UnknownError"_s, "The operation failed for an unknown transient reason (e.g. out of memory)."_s, 0 },
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include "BunClientData.h"
|
||||
#include "EventLoopTask.h"
|
||||
#include "BunBroadcastChannelRegistry.h"
|
||||
#include <wtf/threads/BinarySemaphore.h>
|
||||
#include <wtf/LazyRef.h>
|
||||
extern "C" void Bun__startLoop(us_loop_t* loop);
|
||||
|
||||
@@ -215,10 +216,40 @@ bool ScriptExecutionContext::ensureOnMainThread(Function<void(ScriptExecutionCon
|
||||
return false;
|
||||
}
|
||||
|
||||
if (WTF::isMainThread()) {
|
||||
task(*context);
|
||||
return true;
|
||||
}
|
||||
|
||||
context->postTaskConcurrently(WTFMove(task));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ScriptExecutionContext::ensureOnMainThreadAndWait(Function<void(ScriptExecutionContext&)>&& task)
|
||||
{
|
||||
auto* context = ScriptExecutionContext::getMainThreadScriptExecutionContext();
|
||||
|
||||
if (!context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (WTF::isMainThread()) {
|
||||
task(*context);
|
||||
return true;
|
||||
}
|
||||
|
||||
BinarySemaphore semaphore;
|
||||
|
||||
context->postTaskConcurrently(
|
||||
[task = WTFMove(task), &semaphore](ScriptExecutionContext& context) {
|
||||
task(context);
|
||||
semaphore.signal();
|
||||
});
|
||||
|
||||
semaphore.wait();
|
||||
return true;
|
||||
}
|
||||
|
||||
ScriptExecutionContext* ScriptExecutionContext::getMainThreadScriptExecutionContext()
|
||||
{
|
||||
Locker locker { allScriptExecutionContextsMapLock };
|
||||
@@ -366,23 +397,23 @@ ScriptExecutionContext* executionContext(JSC::JSGlobalObject* globalObject)
|
||||
void ScriptExecutionContext::postTaskConcurrently(Function<void(ScriptExecutionContext&)>&& lambda)
|
||||
{
|
||||
auto* task = new EventLoopTask(WTFMove(lambda));
|
||||
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskConcurrently(task);
|
||||
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskConcurrently(task);
|
||||
}
|
||||
// Executes the task on context's thread asynchronously.
|
||||
void ScriptExecutionContext::postTask(Function<void(ScriptExecutionContext&)>&& lambda)
|
||||
{
|
||||
auto* task = new EventLoopTask(WTFMove(lambda));
|
||||
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
|
||||
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
|
||||
}
|
||||
// Executes the task on context's thread asynchronously.
|
||||
void ScriptExecutionContext::postTask(EventLoopTask* task)
|
||||
{
|
||||
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
|
||||
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
|
||||
}
|
||||
// Executes the task on context's thread asynchronously.
|
||||
void ScriptExecutionContext::postTaskOnTimeout(EventLoopTask* task, Seconds timeout)
|
||||
{
|
||||
reinterpret_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskOnTimeout(task, static_cast<int>(timeout.milliseconds()));
|
||||
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTaskOnTimeout(task, static_cast<int>(timeout.milliseconds()));
|
||||
}
|
||||
// Executes the task on context's thread asynchronously.
|
||||
void ScriptExecutionContext::postTaskOnTimeout(Function<void(ScriptExecutionContext&)>&& lambda, Seconds timeout)
|
||||
@@ -391,6 +422,19 @@ void ScriptExecutionContext::postTaskOnTimeout(Function<void(ScriptExecutionCont
|
||||
postTaskOnTimeout(task, timeout);
|
||||
}
|
||||
|
||||
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
|
||||
void ScriptExecutionContext::queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda)
|
||||
{
|
||||
auto* task = new EventLoopTask(WTFMove(lambda));
|
||||
queueImmediateCppTask(task);
|
||||
}
|
||||
|
||||
void ScriptExecutionContext::queueImmediateCppTask(EventLoopTask* task)
|
||||
{
|
||||
Bun__queueImmediateCppTask(m_globalObject, task);
|
||||
}
|
||||
|
||||
// Zig bindings
|
||||
extern "C" ScriptExecutionContextIdentifier ScriptExecutionContextIdentifier__forGlobalObject(JSC::JSGlobalObject* globalObject)
|
||||
{
|
||||
|
||||
@@ -108,6 +108,7 @@ public:
|
||||
WEBCORE_EXPORT static bool postTaskTo(ScriptExecutionContextIdentifier identifier, Function<void(ScriptExecutionContext&)>&& task);
|
||||
WEBCORE_EXPORT static bool ensureOnContextThread(ScriptExecutionContextIdentifier, Function<void(ScriptExecutionContext&)>&& task);
|
||||
WEBCORE_EXPORT static bool ensureOnMainThread(Function<void(ScriptExecutionContext&)>&& task);
|
||||
WEBCORE_EXPORT static bool ensureOnMainThreadAndWait(Function<void(ScriptExecutionContext&)>&& task);
|
||||
|
||||
WEBCORE_EXPORT JSC::JSGlobalObject* globalObject();
|
||||
|
||||
@@ -135,6 +136,9 @@ public:
|
||||
// Executes the task on context's thread asynchronously.
|
||||
void postTaskOnTimeout(Function<void(ScriptExecutionContext&)>&& lambda, Seconds timeout);
|
||||
|
||||
void queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda);
|
||||
void queueImmediateCppTask(EventLoopTask* task);
|
||||
|
||||
template<typename... Arguments>
|
||||
void postCrossThreadTask(Arguments&&... arguments)
|
||||
{
|
||||
@@ -157,6 +161,14 @@ public:
|
||||
|
||||
static ScriptExecutionContext* getMainThreadScriptExecutionContext();
|
||||
|
||||
bool canSendMessage()
|
||||
{
|
||||
static constexpr size_t maxMessagesPerTick = 1000;
|
||||
return m_messagesSentThisTick < maxMessagesPerTick;
|
||||
}
|
||||
void incrementMessageCount() { m_messagesSentThisTick++; }
|
||||
void resetMessageCount() { m_messagesSentThisTick = 0; }
|
||||
|
||||
private:
|
||||
JSC::VM* m_vm = nullptr;
|
||||
JSC::JSGlobalObject* m_globalObject = nullptr;
|
||||
@@ -169,6 +181,7 @@ private:
|
||||
LazyRef<ScriptExecutionContext, BunBroadcastChannelRegistry> m_broadcastChannelRegistry;
|
||||
|
||||
bool m_willProcessMessageWithMessagePortsSoon { false };
|
||||
size_t m_messagesSentThisTick { 0 };
|
||||
|
||||
us_socket_context_t* webSocketContextSSL();
|
||||
us_socket_context_t* webSocketContextNoSSL();
|
||||
|
||||
@@ -3928,6 +3928,7 @@ extern "C" void JSGlobalObject__clearTerminationException(JSC::JSGlobalObject* g
|
||||
}
|
||||
|
||||
extern "C" void Bun__queueTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
extern "C" void Bun__queueTaskWithTimeout(JSC::JSGlobalObject*, WebCore::EventLoopTask* task, int timeout);
|
||||
extern "C" void Bun__queueTaskConcurrently(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
extern "C" void Bun__performTask(Zig::GlobalObject* globalObject, WebCore::EventLoopTask* task)
|
||||
@@ -3952,6 +3953,11 @@ void GlobalObject::queueTask(WebCore::EventLoopTask* task)
|
||||
Bun__queueTask(this, task);
|
||||
}
|
||||
|
||||
void GlobalObject::queueImmediateCppTask(WebCore::EventLoopTask* task)
|
||||
{
|
||||
Bun__queueImmediateCppTask(this, task);
|
||||
}
|
||||
|
||||
void GlobalObject::queueTaskOnTimeout(WebCore::EventLoopTask* task, int timeout)
|
||||
{
|
||||
Bun__queueTaskWithTimeout(this, task, timeout);
|
||||
|
||||
@@ -170,6 +170,7 @@ public:
|
||||
void queueTask(WebCore::EventLoopTask* task);
|
||||
void queueTaskOnTimeout(WebCore::EventLoopTask* task, int timeout);
|
||||
void queueTaskConcurrently(WebCore::EventLoopTask* task);
|
||||
void queueImmediateCppTask(WebCore::EventLoopTask* task);
|
||||
|
||||
JSDOMStructureMap& structures() WTF_REQUIRES_LOCK(m_gcLock) { return m_structures; }
|
||||
JSDOMStructureMap& structures(NoLockingNecessaryTag) WTF_IGNORES_THREAD_SAFETY_ANALYSIS
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include <wtf/TZoneMallocInlines.h>
|
||||
#include <wtf/Lock.h>
|
||||
#include <wtf/Scope.h>
|
||||
#include <wtf/threads/BinarySemaphore.h>
|
||||
|
||||
extern "C" void Bun__eventLoop__incrementRefConcurrently(void* bunVM, int delta);
|
||||
|
||||
@@ -238,11 +239,79 @@ void MessagePort::close()
|
||||
return;
|
||||
m_isDetached = true;
|
||||
|
||||
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
|
||||
ScriptExecutionContext::ensureOnMainThread(
|
||||
[this](ScriptExecutionContext&) {
|
||||
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
|
||||
});
|
||||
|
||||
removeAllEventListeners();
|
||||
}
|
||||
|
||||
void MessagePort::processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback)
|
||||
{
|
||||
auto& vm = context.vm();
|
||||
auto* globalObject = defaultGlobalObject(context.globalObject());
|
||||
|
||||
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running) {
|
||||
completionCallback();
|
||||
return;
|
||||
}
|
||||
|
||||
Vector<MessageWithMessagePorts> deferredMessages;
|
||||
|
||||
for (auto&& message : messages) {
|
||||
if (!context.canSendMessage()) {
|
||||
deferredMessages.append(WTFMove(message));
|
||||
continue;
|
||||
}
|
||||
|
||||
context.incrementMessageCount();
|
||||
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
auto ports = MessagePort::entanglePorts(context, WTFMove(message.transferredPorts));
|
||||
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
|
||||
dispatchEvent(event.event);
|
||||
|
||||
if (scope.exception()) [[unlikely]] {
|
||||
RELEASE_ASSERT(vm.hasPendingTerminationException());
|
||||
return;
|
||||
}
|
||||
|
||||
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running) {
|
||||
globalObject->drainMicrotasks();
|
||||
}
|
||||
}
|
||||
|
||||
if (!deferredMessages.isEmpty()) {
|
||||
auto* globalObject = defaultGlobalObject(context.globalObject());
|
||||
auto scriptExecutionStatus = Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject);
|
||||
|
||||
if (scriptExecutionStatus != ScriptExecutionStatus::Running || context.isJSExecutionForbidden()) {
|
||||
completionCallback();
|
||||
return;
|
||||
}
|
||||
|
||||
// remaining messages should happen on the next on the immediate cpp task queue
|
||||
auto contextIdentifier = context.identifier();
|
||||
context.queueImmediateCppTask(
|
||||
[protectedThis = Ref { *this }, contextIdentifier, deferred = WTFMove(deferredMessages), completionCallback = WTFMove(completionCallback)](ScriptExecutionContext& ctx) mutable {
|
||||
if (auto* validContext = ScriptExecutionContext::getScriptExecutionContext(contextIdentifier)) {
|
||||
if (validContext == &ctx && !validContext->activeDOMObjectsAreSuspended() && protectedThis->isEntangled()) {
|
||||
// then reset for next tick
|
||||
ctx.resetMessageCount();
|
||||
protectedThis->processMessages(ctx, WTFMove(deferred), WTFMove(completionCallback));
|
||||
return;
|
||||
}
|
||||
}
|
||||
// context was destroyed or conditions not met, just complete
|
||||
completionCallback();
|
||||
});
|
||||
} else {
|
||||
completionCallback();
|
||||
}
|
||||
}
|
||||
|
||||
void MessagePort::dispatchMessages()
|
||||
{
|
||||
// Messages for contexts that are not fully active get dispatched too, but JSAbstractEventListener::handleEvent() doesn't call handlers for these.
|
||||
@@ -253,60 +322,49 @@ void MessagePort::dispatchMessages()
|
||||
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
|
||||
return;
|
||||
|
||||
auto messagesTakenHandler = [this, protectedThis = Ref { *this }](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
|
||||
auto scopeExit = makeScopeExit(WTFMove(completionCallback));
|
||||
auto executionContextIdentifier = scriptExecutionContext()->identifier();
|
||||
|
||||
// LOG(MessagePorts, "MessagePort %s (%p) dispatching %zu messages", m_identifier.logString().utf8().data(), this, messages.size());
|
||||
auto messagesTakenHandler = [this, protectedThis = Ref { *this }, executionContextIdentifier](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
|
||||
RefPtr<ScriptExecutionContext> context = ScriptExecutionContext::getScriptExecutionContext(executionContextIdentifier);
|
||||
|
||||
RefPtr<ScriptExecutionContext> context = scriptExecutionContext();
|
||||
if (!context || !context->globalObject())
|
||||
if (!context || !context->globalObject()) {
|
||||
completionCallback();
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(context->isContextThread());
|
||||
auto* globalObject = defaultGlobalObject(context->globalObject());
|
||||
Ref vm = globalObject->vm();
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
for (auto& message : messages) {
|
||||
// close() in Worker onmessage handler should prevent next message from dispatching.
|
||||
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running)
|
||||
return;
|
||||
|
||||
auto ports = MessagePort::entanglePorts(*context, WTFMove(message.transferredPorts));
|
||||
if (scope.exception()) [[unlikely]] {
|
||||
// Currently, we assume that the only way we can get here is if we have a termination.
|
||||
RELEASE_ASSERT(vm->hasPendingTerminationException());
|
||||
return;
|
||||
}
|
||||
|
||||
// Per specification, each MessagePort object has a task source called the port message queue.
|
||||
// queueTaskKeepingObjectAlive(context, *this, TaskSource::PostedMessageQueue, [this, event = WTFMove(event)] {
|
||||
// dispatchEvent(event.event);
|
||||
// });
|
||||
|
||||
ScriptExecutionContext::postTaskTo(context->identifier(), [protectedThis = Ref { *this }, ports = WTFMove(ports), message = WTFMove(message)](ScriptExecutionContext& context) mutable {
|
||||
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
|
||||
protectedThis->dispatchEvent(event.event);
|
||||
});
|
||||
}
|
||||
processMessages(*context, WTFMove(messages), WTFMove(completionCallback));
|
||||
};
|
||||
|
||||
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(m_identifier, WTFMove(messagesTakenHandler));
|
||||
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(executionContextIdentifier, m_identifier, WTFMove(messagesTakenHandler));
|
||||
}
|
||||
|
||||
// synchronous for node:worker_threads.receiveMessageOnPort
|
||||
JSValue MessagePort::tryTakeMessage(JSGlobalObject* lexicalGlobalObject)
|
||||
{
|
||||
auto* context = scriptExecutionContext();
|
||||
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
|
||||
return jsUndefined();
|
||||
|
||||
std::optional<MessageWithMessagePorts> messageWithPorts = MessagePortChannelProvider::fromContext(*context).tryTakeMessageForPort(m_identifier);
|
||||
std::optional<MessageWithMessagePorts> result;
|
||||
BinarySemaphore semaphore;
|
||||
|
||||
if (!messageWithPorts)
|
||||
auto callback = [&](std::optional<MessageWithMessagePorts>&& messageWithPorts) {
|
||||
result = WTFMove(messageWithPorts);
|
||||
semaphore.signal();
|
||||
};
|
||||
|
||||
ScriptExecutionContext::ensureOnMainThread([identifier = m_identifier, callback](ScriptExecutionContext& context) mutable {
|
||||
MessagePortChannelProvider::fromContext(context).tryTakeMessageForPort(identifier, WTFMove(callback));
|
||||
});
|
||||
|
||||
semaphore.wait();
|
||||
|
||||
if (!result)
|
||||
return jsUndefined();
|
||||
|
||||
auto ports = MessagePort::entanglePorts(*context, WTFMove(messageWithPorts->transferredPorts));
|
||||
auto message = messageWithPorts->message.releaseNonNull();
|
||||
auto ports = MessagePort::entanglePorts(*context, WTFMove(result->transferredPorts));
|
||||
auto message = result->message.releaseNonNull();
|
||||
return message->deserialize(*lexicalGlobalObject, lexicalGlobalObject, WTFMove(ports), SerializationErrorMode::NonThrowing);
|
||||
}
|
||||
|
||||
@@ -427,8 +485,7 @@ Ref<MessagePort> MessagePort::entangle(ScriptExecutionContext& context, Transfer
|
||||
bool MessagePort::addEventListener(const AtomString& eventType, Ref<EventListener>&& listener, const AddEventListenerOptions& options)
|
||||
{
|
||||
if (eventType == eventNames().messageEvent) {
|
||||
if (listener->isAttribute())
|
||||
start();
|
||||
start();
|
||||
m_hasMessageEventListener = true;
|
||||
}
|
||||
return EventTarget::addEventListener(eventType, WTFMove(listener), options);
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "ActiveDOMObject.h"
|
||||
#include "ContextDestructionObserver.h"
|
||||
#include "EventTarget.h"
|
||||
#include "ExceptionOr.h"
|
||||
#include "MessagePortChannel.h"
|
||||
@@ -144,9 +145,9 @@ private:
|
||||
MessagePortIdentifier m_remoteIdentifier;
|
||||
|
||||
mutable std::atomic<unsigned> m_refCount { 1 };
|
||||
void processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback);
|
||||
|
||||
bool m_hasRef { false };
|
||||
|
||||
uint32_t m_messageEventCount { 0 };
|
||||
static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
|
||||
};
|
||||
|
||||
@@ -42,6 +42,8 @@ MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, con
|
||||
: m_ports { port1, port2 }
|
||||
, m_registry(registry)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
relaxAdoptionRequirement();
|
||||
|
||||
m_processes[0] = port1.processIdentifier;
|
||||
@@ -49,16 +51,22 @@ MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, con
|
||||
m_processes[1] = port2.processIdentifier;
|
||||
m_entangledToProcessProtectors[1] = this;
|
||||
|
||||
m_registry.messagePortChannelCreated(*this);
|
||||
checkedRegistry()->messagePortChannelCreated(*this);
|
||||
}
|
||||
|
||||
MessagePortChannel::~MessagePortChannel()
|
||||
{
|
||||
m_registry.messagePortChannelDestroyed(*this);
|
||||
checkedRegistry()->messagePortChannelDestroyed(*this);
|
||||
}
|
||||
|
||||
CheckedRef<MessagePortChannelRegistry> MessagePortChannel::checkedRegistry() const
|
||||
{
|
||||
return m_registry;
|
||||
}
|
||||
|
||||
std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const MessagePortIdentifier& port)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
size_t i = port == m_ports[0] ? 0 : 1;
|
||||
return m_processes[i];
|
||||
@@ -66,11 +74,15 @@ std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const Messag
|
||||
|
||||
bool MessagePortChannel::includesPort(const MessagePortIdentifier& port)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
return m_ports[0] == port || m_ports[1] == port;
|
||||
}
|
||||
|
||||
void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& port, ProcessIdentifier process)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
size_t i = port == m_ports[0] ? 0 : 1;
|
||||
|
||||
@@ -84,6 +96,8 @@ void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& po
|
||||
|
||||
void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// LOG(MessagePorts, "MessagePortChannel %s (%p) disentangling port %s", logString().utf8().data(), this, port.logString().utf8().data());
|
||||
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
@@ -100,16 +114,14 @@ void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
|
||||
|
||||
void MessagePortChannel::closePort(const MessagePortIdentifier& port)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
size_t i = port == m_ports[0] ? 0 : 1;
|
||||
|
||||
m_processes[i] = std::nullopt;
|
||||
m_isClosed[i] = true;
|
||||
|
||||
// This set of steps is to guarantee that the lock is unlocked before the
|
||||
// last ref to this object is released.
|
||||
Ref protectedThis { *this };
|
||||
|
||||
m_pendingMessages[i].clear();
|
||||
m_pendingMessagePortTransfers[i].clear();
|
||||
m_pendingMessageProtectors[i] = nullptr;
|
||||
@@ -118,6 +130,8 @@ void MessagePortChannel::closePort(const MessagePortIdentifier& port)
|
||||
|
||||
bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]);
|
||||
size_t i = remoteTarget == m_ports[0] ? 0 : 1;
|
||||
|
||||
@@ -135,6 +149,8 @@ bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message,
|
||||
|
||||
void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& callback)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// LOG(MessagePorts, "MessagePortChannel %p taking all messages for port %s", this, port.logString().utf8().data());
|
||||
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
@@ -145,7 +161,7 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(m_pendingMessageProtectors[i]);
|
||||
ASSERT(m_pendingMessageProtectors[i] == this);
|
||||
|
||||
Vector<MessageWithMessagePorts> result;
|
||||
result.swap(m_pendingMessages[i]);
|
||||
@@ -154,24 +170,36 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por
|
||||
|
||||
// LOG(MessagePorts, "There are %zu messages to take for port %s. Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight);
|
||||
|
||||
callback(WTFMove(result), [this, port, protectedThis = WTFMove(m_pendingMessageProtectors[i])] {
|
||||
auto size = result.size();
|
||||
callback(WTFMove(result), [size, port, protectedThis = WTFMove(m_pendingMessageProtectors[i])] {
|
||||
UNUSED_PARAM(port);
|
||||
--m_messageBatchesInFlight;
|
||||
// LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, logString().utf8().data(), size, port.logString().utf8().data(), m_messageBatchesInFlight);
|
||||
UNUSED_PARAM(size);
|
||||
--(protectedThis->m_messageBatchesInFlight);
|
||||
// LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, protectedThis->logString().utf8().data(), size, port.logString().utf8().data(), protectedThis->m_messageBatchesInFlight);
|
||||
});
|
||||
}
|
||||
|
||||
std::optional<MessageWithMessagePorts> MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port)
|
||||
bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
return m_messageBatchesInFlight || !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty();
|
||||
}
|
||||
|
||||
void MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
size_t i = port == m_ports[0] ? 0 : 1;
|
||||
|
||||
if (m_pendingMessages[i].isEmpty())
|
||||
return std::nullopt;
|
||||
if (m_pendingMessages[i].isEmpty()) {
|
||||
callback(std::nullopt);
|
||||
return;
|
||||
}
|
||||
|
||||
auto message = m_pendingMessages[i].first();
|
||||
m_pendingMessages[i].removeAt(0);
|
||||
return WTFMove(message);
|
||||
callback(WTFMove(message));
|
||||
}
|
||||
|
||||
} // namespace WebCore
|
||||
|
||||
@@ -30,9 +30,10 @@
|
||||
#include "MessageWithMessagePorts.h"
|
||||
#include "ProcessIdentifier.h"
|
||||
#include <wtf/HashSet.h>
|
||||
#include <wtf/RefCounted.h>
|
||||
#include <wtf/text/WTFString.h>
|
||||
#include <wtf/RefCountedAndCanMakeWeakPtr.h>
|
||||
#include <wtf/WeakPtr.h>
|
||||
#include <wtf/text/MakeString.h>
|
||||
#include <wtf/text/WTFString.h>
|
||||
|
||||
namespace WebCore {
|
||||
|
||||
@@ -55,32 +56,31 @@ public:
|
||||
bool postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
|
||||
|
||||
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
|
||||
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier);
|
||||
void tryTakeMessageForPort(const MessagePortIdentifier, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&);
|
||||
|
||||
WEBCORE_EXPORT bool hasAnyMessagesPendingOrInFlight() const;
|
||||
|
||||
uint64_t beingTransferredCount();
|
||||
|
||||
#if !LOG_DISABLED
|
||||
String logString() const
|
||||
{
|
||||
return makeString(m_ports[0].logString(), ":"_s, m_ports[1].logString());
|
||||
}
|
||||
String logString() const { return makeString(m_ports[0].logString(), ':', m_ports[1].logString()); }
|
||||
#endif
|
||||
|
||||
private:
|
||||
MessagePortChannel(MessagePortChannelRegistry&, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2);
|
||||
|
||||
MessagePortIdentifier m_ports[2];
|
||||
bool m_isClosed[2] { false, false };
|
||||
std::optional<ProcessIdentifier> m_processes[2];
|
||||
RefPtr<MessagePortChannel> m_entangledToProcessProtectors[2];
|
||||
Vector<MessageWithMessagePorts> m_pendingMessages[2];
|
||||
UncheckedKeyHashSet<RefPtr<MessagePortChannel>> m_pendingMessagePortTransfers[2];
|
||||
RefPtr<MessagePortChannel> m_pendingMessageProtectors[2];
|
||||
CheckedRef<MessagePortChannelRegistry> checkedRegistry() const;
|
||||
|
||||
std::array<MessagePortIdentifier, 2> m_ports;
|
||||
std::array<bool, 2> m_isClosed { false, false };
|
||||
std::array<std::optional<ProcessIdentifier>, 2> m_processes;
|
||||
std::array<RefPtr<MessagePortChannel>, 2> m_entangledToProcessProtectors;
|
||||
std::array<Vector<MessageWithMessagePorts>, 2> m_pendingMessages;
|
||||
std::array<UncheckedKeyHashSet<RefPtr<MessagePortChannel>>, 2> m_pendingMessagePortTransfers;
|
||||
std::array<RefPtr<MessagePortChannel>, 2> m_pendingMessageProtectors;
|
||||
uint64_t m_messageBatchesInFlight { 0 };
|
||||
|
||||
MessagePortChannelRegistry& m_registry;
|
||||
CheckedRef<MessagePortChannelRegistry> m_registry;
|
||||
};
|
||||
|
||||
} // namespace WebCore
|
||||
} // namespace WebCore
|
||||
@@ -24,13 +24,10 @@
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
// #include "MessagePortChannelProvider.h"
|
||||
|
||||
// #include "Document.h"
|
||||
#include "MessagePortChannelProvider.h"
|
||||
#include "MessagePortChannelProviderImpl.h"
|
||||
// #include "WorkerGlobalScope.h"
|
||||
// #include "WorkletGlobalScope.h"
|
||||
#include <wtf/MainThread.h>
|
||||
#include "ScriptExecutionContext.h"
|
||||
#include "BunWorkerGlobalScope.h"
|
||||
|
||||
namespace WebCore {
|
||||
|
||||
|
||||
@@ -26,8 +26,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "ProcessIdentifier.h"
|
||||
#include "BunWorkerGlobalScope.h"
|
||||
#include "MessageWithMessagePorts.h"
|
||||
#include "ScriptExecutionContext.h"
|
||||
#include <wtf/CompletionHandler.h>
|
||||
#include <wtf/Vector.h>
|
||||
|
||||
@@ -37,7 +36,7 @@ class MessagePortChannelProvider;
|
||||
|
||||
namespace WTF {
|
||||
template<typename T> struct IsDeprecatedWeakRefSmartPointerException;
|
||||
template<> struct IsDeprecatedWeakRefSmartPointerException<WebCore::MessagePortChannelProvider> : std::true_type {};
|
||||
template<> struct IsDeprecatedWeakRefSmartPointerException<WebCore::MessagePortChannelProvider> : std::true_type { };
|
||||
}
|
||||
|
||||
namespace WebCore {
|
||||
@@ -50,18 +49,19 @@ class MessagePortChannelProvider : public CanMakeWeakPtr<MessagePortChannelProvi
|
||||
public:
|
||||
static MessagePortChannelProvider& fromContext(ScriptExecutionContext&);
|
||||
static MessagePortChannelProvider& singleton();
|
||||
static void setSharedProvider(MessagePortChannelProvider&);
|
||||
|
||||
virtual ~MessagePortChannelProvider() {}
|
||||
virtual ~MessagePortChannelProvider() { }
|
||||
|
||||
// Operations that WebProcesses perform
|
||||
virtual void createNewMessagePortChannel(const MessagePortIdentifier& local, const MessagePortIdentifier& remote) = 0;
|
||||
virtual void entangleLocalPortInThisProcessToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote) = 0;
|
||||
virtual void messagePortDisentangled(const MessagePortIdentifier& local) = 0;
|
||||
virtual void messagePortClosed(const MessagePortIdentifier& local) = 0;
|
||||
|
||||
virtual void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
|
||||
virtual std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) = 0;
|
||||
|
||||
virtual void takeAllMessagesForPort(const ScriptExecutionContextIdentifier, const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
|
||||
virtual void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) = 0;
|
||||
virtual void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) = 0;
|
||||
};
|
||||
|
||||
} // namespace WebCore
|
||||
} // namespace WebCore
|
||||
@@ -22,14 +22,18 @@
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
||||
* THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
#include "root.h"
|
||||
|
||||
#include "config.h"
|
||||
#include "MessagePortChannelProviderImpl.h"
|
||||
|
||||
#include "MessagePort.h"
|
||||
#include "ScriptExecutionContext.h"
|
||||
#include "BunClientData.h"
|
||||
#include <wtf/MainThread.h>
|
||||
#include <wtf/RunLoop.h>
|
||||
|
||||
extern "C" void* Bun__getVM();
|
||||
|
||||
namespace WebCore {
|
||||
|
||||
MessagePortChannelProviderImpl::MessagePortChannelProviderImpl() = default;
|
||||
@@ -41,44 +45,85 @@ MessagePortChannelProviderImpl::~MessagePortChannelProviderImpl()
|
||||
|
||||
void MessagePortChannelProviderImpl::createNewMessagePortChannel(const MessagePortIdentifier& local, const MessagePortIdentifier& remote)
|
||||
{
|
||||
m_registry.didCreateMessagePortChannel(local, remote);
|
||||
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local, remote](ScriptExecutionContext& context) {
|
||||
if (CheckedPtr registry = weakRegistry.get())
|
||||
registry->didCreateMessagePortChannel(local, remote);
|
||||
});
|
||||
}
|
||||
|
||||
void MessagePortChannelProviderImpl::entangleLocalPortInThisProcessToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote)
|
||||
{
|
||||
m_registry.didEntangleLocalToRemote(local, remote, Process::identifier());
|
||||
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local, remote](ScriptExecutionContext& context) {
|
||||
if (CheckedPtr registry = weakRegistry.get())
|
||||
registry->didEntangleLocalToRemote(local, remote, Process::identifier());
|
||||
});
|
||||
}
|
||||
|
||||
void MessagePortChannelProviderImpl::messagePortDisentangled(const MessagePortIdentifier& local)
|
||||
{
|
||||
m_registry.didDisentangleMessagePort(local);
|
||||
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local](ScriptExecutionContext& context) {
|
||||
if (CheckedPtr registry = weakRegistry.get())
|
||||
registry->didDisentangleMessagePort(local);
|
||||
});
|
||||
}
|
||||
|
||||
void MessagePortChannelProviderImpl::messagePortClosed(const MessagePortIdentifier& local)
|
||||
{
|
||||
m_registry.didCloseMessagePort(local);
|
||||
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, local](ScriptExecutionContext& context) {
|
||||
if (CheckedPtr registry = weakRegistry.get())
|
||||
registry->didCloseMessagePort(local);
|
||||
});
|
||||
}
|
||||
|
||||
void MessagePortChannelProviderImpl::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
|
||||
{
|
||||
if (m_registry.didPostMessageToRemote(WTFMove(message), remoteTarget))
|
||||
MessagePort::notifyMessageAvailable(remoteTarget);
|
||||
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, message = WTFMove(message), remoteTarget](ScriptExecutionContext& context) mutable {
|
||||
CheckedPtr registry = weakRegistry.get();
|
||||
if (!registry)
|
||||
return;
|
||||
if (registry->didPostMessageToRemote(WTFMove(message), remoteTarget))
|
||||
MessagePort::notifyMessageAvailable(remoteTarget);
|
||||
});
|
||||
}
|
||||
|
||||
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
|
||||
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const ScriptExecutionContextIdentifier identifier, const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
|
||||
{
|
||||
// It is the responsibility of outerCallback to get itself to the appropriate thread (e.g. WebWorker thread)
|
||||
auto callback = [outerCallback = WTFMove(outerCallback)](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& messageDeliveryCallback) mutable {
|
||||
// ASSERT(isMainThread());
|
||||
outerCallback(WTFMove(messages), WTFMove(messageDeliveryCallback));
|
||||
};
|
||||
if (WTF::isMainThread()) {
|
||||
m_registry.takeAllMessagesForPort(port, WTFMove(outerCallback));
|
||||
return;
|
||||
}
|
||||
|
||||
m_registry.takeAllMessagesForPort(port, WTFMove(callback));
|
||||
auto currentVM = Bun__getVM();
|
||||
if (!currentVM) {
|
||||
outerCallback({}, []() {}); // already destroyed
|
||||
return;
|
||||
}
|
||||
|
||||
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, port, outerCallback = WTFMove(outerCallback), identifier](ScriptExecutionContext& mainContext) mutable {
|
||||
CheckedPtr registry = weakRegistry.get();
|
||||
if (!registry) {
|
||||
ScriptExecutionContext::ensureOnContextThread(identifier, [outerCallback = WTFMove(outerCallback)](ScriptExecutionContext&) mutable {
|
||||
outerCallback({}, []() {});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
registry->takeAllMessagesForPort(port, [outerCallback = WTFMove(outerCallback), identifier](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionHandler) mutable {
|
||||
ScriptExecutionContext::ensureOnContextThread(identifier, [outerCallback = WTFMove(outerCallback), messages = WTFMove(messages), completionHandler = WTFMove(completionHandler)](ScriptExecutionContext&) mutable {
|
||||
auto wrappedCompletionHandler = [completionHandler = WTFMove(completionHandler)]() mutable {
|
||||
ScriptExecutionContext::ensureOnMainThread([completionHandler = WTFMove(completionHandler)](ScriptExecutionContext&) mutable {
|
||||
completionHandler();
|
||||
});
|
||||
};
|
||||
outerCallback(WTFMove(messages), WTFMove(wrappedCompletionHandler));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
std::optional<MessageWithMessagePorts> MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port)
|
||||
void MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
|
||||
{
|
||||
return m_registry.tryTakeMessageForPort(port);
|
||||
m_registry.tryTakeMessageForPort(port, WTFMove(callback));
|
||||
}
|
||||
|
||||
} // namespace WebCore
|
||||
|
||||
@@ -27,7 +27,7 @@
|
||||
|
||||
#include "MessagePortChannelProvider.h"
|
||||
#include "MessagePortChannelRegistry.h"
|
||||
#include "MessageWithMessagePorts.h"
|
||||
#include "ScriptExecutionContext.h"
|
||||
|
||||
namespace WebCore {
|
||||
|
||||
@@ -42,10 +42,10 @@ private:
|
||||
void messagePortDisentangled(const MessagePortIdentifier& local) final;
|
||||
void messagePortClosed(const MessagePortIdentifier& local) final;
|
||||
void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) final;
|
||||
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
|
||||
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) final;
|
||||
void takeAllMessagesForPort(const ScriptExecutionContextIdentifier identifier, const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
|
||||
void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) final;
|
||||
|
||||
MessagePortChannelRegistry m_registry;
|
||||
};
|
||||
|
||||
} // namespace WebCore
|
||||
} // namespace WebCore
|
||||
@@ -49,14 +49,14 @@ MessagePortChannelRegistry::~MessagePortChannelRegistry()
|
||||
void MessagePortChannelRegistry::didCreateMessagePortChannel(const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
|
||||
{
|
||||
// LOG(MessagePorts, "Registry: Creating MessagePortChannel %p linking %s and %s", this, port1.logString().utf8().data(), port2.logString().utf8().data());
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
MessagePortChannel::create(*this, port1, port2);
|
||||
}
|
||||
|
||||
void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& channel)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
auto result = m_openChannels.add(channel.port1(), channel);
|
||||
ASSERT_UNUSED(result, result.isNewEntry);
|
||||
@@ -67,20 +67,27 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c
|
||||
|
||||
void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& channel)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
ASSERT(m_openChannels.get(channel.port1()) == &channel);
|
||||
ASSERT(m_openChannels.get(channel.port2()) == &channel);
|
||||
|
||||
m_openChannels.remove(channel.port1());
|
||||
m_openChannels.remove(channel.port2());
|
||||
// auto* port1Channel = m_openChannels.get(channel.port1());
|
||||
// if (port1Channel == &channel)
|
||||
// m_openChannels.remove(channel.port1());
|
||||
|
||||
// auto* port2Channel = m_openChannels.get(channel.port2());
|
||||
// if (port2Channel == &channel)
|
||||
// m_openChannels.remove(channel.port2());
|
||||
|
||||
// LOG(MessagePorts, "Registry: After removing channel %s there are %u channels left in the registry:", channel.logString().utf8().data(), m_openChannels.size());
|
||||
}
|
||||
|
||||
void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote, ProcessIdentifier process)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// The channel might be gone if the remote side was closed.
|
||||
RefPtr channel = m_openChannels.get(local);
|
||||
@@ -94,7 +101,7 @@ void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdent
|
||||
|
||||
void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIdentifier& port)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// The channel might be gone if the remote side was closed.
|
||||
if (RefPtr channel = m_openChannels.get(port))
|
||||
@@ -103,7 +110,7 @@ void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIden
|
||||
|
||||
void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier& port)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// LOG(MessagePorts, "Registry: MessagePort %s closed in registry", port.logString().utf8().data());
|
||||
|
||||
@@ -124,7 +131,7 @@ void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier
|
||||
|
||||
bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// LOG(MessagePorts, "Registry: Posting message to MessagePort %s in registry", remoteTarget.logString().utf8().data());
|
||||
|
||||
@@ -140,7 +147,7 @@ bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts&
|
||||
|
||||
void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& callback)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// The channel might be gone if the remote side was closed.
|
||||
RefPtr channel = m_openChannels.get(port);
|
||||
@@ -152,23 +159,25 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif
|
||||
channel->takeAllMessagesForPort(port, WTFMove(callback));
|
||||
}
|
||||
|
||||
std::optional<MessageWithMessagePorts> MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port)
|
||||
void MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// LOG(MessagePorts, "Registry: Trying to take a message for MessagePort %s", port.logString().utf8().data());
|
||||
|
||||
// The channel might be gone if the remote side was closed.
|
||||
auto* channel = m_openChannels.get(port);
|
||||
if (!channel)
|
||||
return std::nullopt;
|
||||
if (!channel) {
|
||||
callback(std::nullopt);
|
||||
return;
|
||||
}
|
||||
|
||||
return channel->tryTakeMessageForPort(port);
|
||||
channel->tryTakeMessageForPort(port, WTFMove(callback));
|
||||
}
|
||||
|
||||
MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(const MessagePortIdentifier& port)
|
||||
{
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
return m_openChannels.get(port);
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ public:
|
||||
WEBCORE_EXPORT void didCloseMessagePort(const MessagePortIdentifier& local);
|
||||
WEBCORE_EXPORT bool didPostMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
|
||||
WEBCORE_EXPORT void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
|
||||
WEBCORE_EXPORT std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&);
|
||||
WEBCORE_EXPORT void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&);
|
||||
|
||||
WEBCORE_EXPORT MessagePortChannel* existingChannelContainingPort(const MessagePortIdentifier&);
|
||||
|
||||
|
||||
@@ -69,8 +69,7 @@ namespace WebCore {
|
||||
|
||||
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
|
||||
|
||||
extern "C" void WebWorker__notifyNeedTermination(
|
||||
void* worker);
|
||||
extern "C" void WebWorkerLifecycleHandle__requestTermination(WebWorkerLifecycleHandle* worker);
|
||||
|
||||
static Lock allWorkersLock;
|
||||
static HashMap<ScriptExecutionContextIdentifier, Worker*>& allWorkers() WTF_REQUIRES_LOCK(allWorkersLock)
|
||||
@@ -109,7 +108,7 @@ Worker::Worker(ScriptExecutionContext& context, WorkerOptions&& options)
|
||||
ASSERT_UNUSED(addResult, addResult.isNewEntry);
|
||||
}
|
||||
extern "C" bool WebWorker__updatePtr(void* worker, Worker* ptr);
|
||||
extern "C" void* WebWorker__create(
|
||||
extern "C" WebWorkerLifecycleHandle* WebWorkerLifecycleHandle__createWebWorker(
|
||||
Worker* worker,
|
||||
void* parent,
|
||||
BunString name,
|
||||
@@ -133,12 +132,12 @@ extern "C" void WebWorker__setRef(
|
||||
|
||||
void Worker::setKeepAlive(bool keepAlive)
|
||||
{
|
||||
WebWorker__setRef(impl_, keepAlive);
|
||||
WebWorker__setRef(lifecycleHandle_, keepAlive);
|
||||
}
|
||||
|
||||
bool Worker::updatePtr()
|
||||
{
|
||||
if (!WebWorker__updatePtr(impl_, this)) {
|
||||
if (!WebWorker__updatePtr(lifecycleHandle_, this)) {
|
||||
m_onlineClosingFlags = ClosingFlag;
|
||||
m_terminationFlags.fetch_or(TerminatedFlag);
|
||||
return false;
|
||||
@@ -189,7 +188,7 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
|
||||
return { reinterpret_cast<WTF::StringImpl**>(vec.begin()), vec.size() };
|
||||
})
|
||||
.value_or(std::span<WTF::StringImpl*> {});
|
||||
void* impl = WebWorker__create(
|
||||
WebWorkerLifecycleHandle* lifecycleHandle = WebWorkerLifecycleHandle__createWebWorker(
|
||||
worker.ptr(),
|
||||
bunVM(context.jsGlobalObject()),
|
||||
nameStr,
|
||||
@@ -212,11 +211,11 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
|
||||
|
||||
preloadModuleStrings.clear();
|
||||
|
||||
if (!impl) {
|
||||
if (!lifecycleHandle) {
|
||||
return Exception { TypeError, errorMessage.toWTFString(BunString::ZeroCopy) };
|
||||
}
|
||||
|
||||
worker->impl_ = impl;
|
||||
worker->lifecycleHandle_ = lifecycleHandle;
|
||||
worker->m_workerCreationTime = MonotonicTime::now();
|
||||
|
||||
return worker;
|
||||
@@ -228,6 +227,12 @@ Worker::~Worker()
|
||||
Locker locker { allWorkersLock };
|
||||
allWorkers().remove(m_clientIdentifier);
|
||||
}
|
||||
|
||||
if (lifecycleHandle_) {
|
||||
auto* impl = lifecycleHandle_;
|
||||
lifecycleHandle_ = nullptr;
|
||||
WebWorkerLifecycleHandle__requestTermination(impl);
|
||||
}
|
||||
// m_contextProxy.workerObjectDestroyed();
|
||||
}
|
||||
|
||||
@@ -261,9 +266,11 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
|
||||
|
||||
void Worker::terminate()
|
||||
{
|
||||
// m_contextProxy.terminateWorkerGlobalScope();
|
||||
m_terminationFlags.fetch_or(TerminateRequestedFlag);
|
||||
WebWorker__notifyNeedTermination(impl_);
|
||||
|
||||
auto* impl = lifecycleHandle_;
|
||||
lifecycleHandle_ = nullptr;
|
||||
WebWorkerLifecycleHandle__requestTermination(impl);
|
||||
}
|
||||
|
||||
// const char* Worker::activeDOMObjectName() const
|
||||
@@ -468,6 +475,7 @@ void Worker::forEachWorker(const Function<Function<void(ScriptExecutionContext&)
|
||||
extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* worker, int32_t exitCode)
|
||||
{
|
||||
worker->dispatchExit(exitCode);
|
||||
|
||||
// no longer referenced by Zig
|
||||
worker->deref();
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ class WorkerGlobalScopeProxy;
|
||||
|
||||
struct StructuredSerializeOptions;
|
||||
struct WorkerOptions;
|
||||
struct WebWorkerLifecycleHandle;
|
||||
|
||||
class Worker final : public ThreadSafeRefCounted<Worker>, public EventTargetWithInlineData, private ContextDestructionObserver {
|
||||
WTF_MAKE_TZONE_ALLOCATED(Worker);
|
||||
@@ -119,7 +120,7 @@ private:
|
||||
// Tracks TerminateRequestedFlag and TerminatedFlag
|
||||
std::atomic<uint8_t> m_terminationFlags { 0 };
|
||||
const ScriptExecutionContextIdentifier m_clientIdentifier;
|
||||
void* impl_ { nullptr };
|
||||
WebWorkerLifecycleHandle* lifecycleHandle_ { nullptr };
|
||||
};
|
||||
|
||||
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);
|
||||
|
||||
@@ -11,8 +11,13 @@ tasks: Queue = undefined,
|
||||
/// - immediate_tasks: tasks that will run on the current tick
|
||||
///
|
||||
/// Having two queues avoids infinite loops creating by calling `setImmediate` in a `setImmediate` callback.
|
||||
/// We also have immediate_cpp_tasks and next_immediate_cpp_tasks which are basically
|
||||
/// exactly the same thing, except these just come from c++ code. The behaviour and theory
|
||||
/// for executing them is the same. You can call "globalObject->queueImmediateCppTask()" to queue a task from cpp
|
||||
immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
|
||||
immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
|
||||
next_immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
|
||||
next_immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
|
||||
|
||||
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
|
||||
global: *JSC.JSGlobalObject = undefined,
|
||||
@@ -29,6 +34,13 @@ imminent_gc_timer: std.atomic.Value(?*Timer.WTFTimer) = .{ .raw = null },
|
||||
|
||||
signal_handler: if (Environment.isPosix) ?*PosixSignalHandle else void = if (Environment.isPosix) null,
|
||||
|
||||
// this exists because while we're inside a spawnSync call, some tasks can actually
|
||||
// still complete which leads to a case where module resolution can partially complete and
|
||||
// some modules are only partialy evaluated which causes reference errors.
|
||||
// TODO: A better fix here could be a second event loop so we can come off the main one
|
||||
// while processing spawnSync, then resume back to here afterwards
|
||||
is_inside_spawn_sync: bool = false,
|
||||
|
||||
pub const Debug = if (Environment.isDebug) struct {
|
||||
is_inside_tick_queue: bool = false,
|
||||
js_call_count_outside_tick_queue: usize = 0,
|
||||
@@ -113,6 +125,11 @@ pub fn drainMicrotasksWithGlobal(this: *EventLoop, globalObject: *JSC.JSGlobalOb
|
||||
scope.init(globalObject, @src());
|
||||
defer scope.deinit();
|
||||
|
||||
// see is_inside_spawn_sync doc comment
|
||||
if (this.is_inside_spawn_sync) {
|
||||
return;
|
||||
}
|
||||
|
||||
jsc_vm.releaseWeakRefs();
|
||||
JSC__JSGlobalObject__drainMicrotasks(globalObject);
|
||||
try scope.assertNoExceptionExceptTermination();
|
||||
@@ -196,10 +213,19 @@ fn tickWithCount(this: *EventLoop, virtual_machine: *VirtualMachine) u32 {
|
||||
|
||||
pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) void {
|
||||
var to_run_now = this.immediate_tasks;
|
||||
var to_run_now_cpp = this.immediate_cpp_tasks;
|
||||
|
||||
this.immediate_tasks = this.next_immediate_tasks;
|
||||
this.next_immediate_tasks = .{};
|
||||
|
||||
this.immediate_cpp_tasks = this.next_immediate_cpp_tasks;
|
||||
this.next_immediate_cpp_tasks = .{};
|
||||
|
||||
for (to_run_now_cpp.items) |task| {
|
||||
log("running immediate cpp task", .{});
|
||||
task.run(virtual_machine.global);
|
||||
}
|
||||
|
||||
var exception_thrown = false;
|
||||
for (to_run_now.items) |task| {
|
||||
exception_thrown = task.runImmediateTask(virtual_machine);
|
||||
@@ -210,6 +236,22 @@ pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) vo
|
||||
this.maybeDrainMicrotasks();
|
||||
}
|
||||
|
||||
if (this.next_immediate_cpp_tasks.capacity > 0) {
|
||||
// this would only occur if we were recursively running tickImmediateTasks.
|
||||
@branchHint(.unlikely);
|
||||
this.immediate_cpp_tasks.appendSlice(bun.default_allocator, this.next_immediate_cpp_tasks.items) catch bun.outOfMemory();
|
||||
this.next_immediate_cpp_tasks.deinit(bun.default_allocator);
|
||||
}
|
||||
|
||||
if (to_run_now_cpp.capacity > 1024 * 128) {
|
||||
// once in a while, deinit the array to free up memory
|
||||
to_run_now_cpp.clearAndFree(bun.default_allocator);
|
||||
} else {
|
||||
to_run_now_cpp.clearRetainingCapacity();
|
||||
}
|
||||
|
||||
this.next_immediate_cpp_tasks = to_run_now_cpp;
|
||||
|
||||
if (this.next_immediate_tasks.capacity > 0) {
|
||||
// this would only occur if we were recursively running tickImmediateTasks.
|
||||
@branchHint(.unlikely);
|
||||
@@ -521,6 +563,10 @@ pub fn enqueueTask(this: *EventLoop, task: Task) void {
|
||||
this.tasks.writeItem(task) catch unreachable;
|
||||
}
|
||||
|
||||
pub fn enqueueImmediateCppTask(this: *EventLoop, task: *CppTask) void {
|
||||
this.immediate_cpp_tasks.append(bun.default_allocator, task) catch bun.outOfMemory();
|
||||
}
|
||||
|
||||
pub fn enqueueImmediateTask(this: *EventLoop, task: *Timer.ImmediateObject) void {
|
||||
this.immediate_tasks.append(bun.default_allocator, task) catch bun.outOfMemory();
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "JavaScriptCore/Completion.h"
|
||||
#include "JavaScriptCore/JSNativeStdFunction.h"
|
||||
#include "JSCommonJSExtensions.h"
|
||||
#include "JSCommonJSModule.h"
|
||||
|
||||
#include "PathInlines.h"
|
||||
#include "ZigGlobalObject.h"
|
||||
@@ -713,6 +714,16 @@ static JSValue getModulePrototypeObject(VM& vm, JSObject* moduleObject)
|
||||
setterRequireFunction),
|
||||
0);
|
||||
|
||||
prototype->putDirectNativeFunction(
|
||||
vm,
|
||||
globalObject,
|
||||
JSC::Identifier::fromString(vm, "_compile"_s),
|
||||
2,
|
||||
functionJSCommonJSModule_compile,
|
||||
JSC::ImplementationVisibility::Public,
|
||||
JSC::NoIntrinsic,
|
||||
static_cast<unsigned>(JSC::PropertyAttribute::DontEnum));
|
||||
|
||||
return prototype;
|
||||
}
|
||||
|
||||
|
||||
@@ -260,7 +260,7 @@ fn setCwd_(globalObject: *JSC.JSGlobalObject, to: *JSC.ZigString) bun.JSError!JS
|
||||
// TODO(@190n) this may need to be noreturn
|
||||
pub fn exit(globalObject: *JSC.JSGlobalObject, code: u8) callconv(.c) void {
|
||||
var vm = globalObject.bunVM();
|
||||
vm.exit_handler.exit_code = code;
|
||||
vm.exit_handler.exit_code = code; // TODO(@alii): https://github.com/oven-sh/bun/pull/20213
|
||||
if (vm.worker) |worker| {
|
||||
// TODO(@190n) we may need to use requestTerminate or throwTerminationException
|
||||
// instead to terminate the worker sooner
|
||||
|
||||
@@ -78,6 +78,12 @@ pub export fn Bun__queueTask(global: *JSGlobalObject, task: *JSC.CppTask) void {
|
||||
global.bunVM().eventLoop().enqueueTask(JSC.Task.init(task));
|
||||
}
|
||||
|
||||
pub export fn Bun__queueImmediateCppTask(global: *JSGlobalObject, task: *JSC.CppTask) void {
|
||||
JSC.markBinding(@src());
|
||||
|
||||
global.bunVM().eventLoop().enqueueImmediateCppTask(task);
|
||||
}
|
||||
|
||||
pub export fn Bun__queueTaskWithTimeout(global: *JSGlobalObject, task: *JSC.CppTask, milliseconds: i32) void {
|
||||
JSC.markBinding(@src());
|
||||
|
||||
|
||||
@@ -8,6 +8,10 @@ const JSValue = jsc.JSValue;
|
||||
const Async = bun.Async;
|
||||
const WTFStringImpl = @import("../string.zig").WTFStringImpl;
|
||||
const WebWorker = @This();
|
||||
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", destroy, .{});
|
||||
pub const new = bun.TrivialNew(@This());
|
||||
pub const ref = RefCount.ref;
|
||||
pub const deref = RefCount.deref;
|
||||
|
||||
/// null when haven't started yet
|
||||
vm: ?*jsc.VirtualMachine = null,
|
||||
@@ -18,6 +22,9 @@ execution_context_id: u32 = 0,
|
||||
parent_context_id: u32 = 0,
|
||||
parent: *jsc.VirtualMachine,
|
||||
|
||||
ref_count: RefCount,
|
||||
lifecycle_handle: *WebWorkerLifecycleHandle,
|
||||
|
||||
/// To be resolved on the Worker thread at startup, in spin().
|
||||
unresolved_specifier: []const u8,
|
||||
preloads: [][]const u8 = &.{},
|
||||
@@ -70,15 +77,15 @@ pub fn setRequestedTerminate(this: *WebWorker) bool {
|
||||
return this.requested_terminate.swap(true, .release);
|
||||
}
|
||||
|
||||
export fn WebWorker__updatePtr(worker: *WebWorker, ptr: *anyopaque) bool {
|
||||
worker.cpp_worker = ptr;
|
||||
export fn WebWorker__updatePtr(handle: *WebWorkerLifecycleHandle, ptr: *anyopaque) bool {
|
||||
handle.worker.?.cpp_worker = ptr;
|
||||
|
||||
var thread = std.Thread.spawn(
|
||||
.{ .stack_size = bun.default_thread_stack_size },
|
||||
startWithErrorHandling,
|
||||
.{worker},
|
||||
.{handle.worker.?},
|
||||
) catch {
|
||||
worker.deinit();
|
||||
handle.worker.?.destroy();
|
||||
return false;
|
||||
};
|
||||
thread.detach();
|
||||
@@ -194,6 +201,7 @@ pub fn create(
|
||||
execArgv_len: usize,
|
||||
preload_modules_ptr: ?[*]bun.String,
|
||||
preload_modules_len: usize,
|
||||
lifecycle_handle: *WebWorkerLifecycleHandle,
|
||||
) callconv(.c) ?*WebWorker {
|
||||
jsc.markBinding(@src());
|
||||
log("[{d}] WebWorker.create", .{this_context_id});
|
||||
@@ -224,8 +232,9 @@ pub fn create(
|
||||
}
|
||||
}
|
||||
|
||||
var worker = bun.default_allocator.create(WebWorker) catch bun.outOfMemory();
|
||||
worker.* = WebWorker{
|
||||
const worker = WebWorker.new(.{
|
||||
.lifecycle_handle = lifecycle_handle,
|
||||
.ref_count = .init(),
|
||||
.cpp_worker = cpp_worker,
|
||||
.parent = parent,
|
||||
.parent_context_id = parent_context_id,
|
||||
@@ -245,10 +254,10 @@ pub fn create(
|
||||
.argv = if (argv_ptr) |ptr| ptr[0..argv_len] else &.{},
|
||||
.execArgv = if (inherit_execArgv) null else (if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else &.{}),
|
||||
.preloads = preloads.items,
|
||||
};
|
||||
});
|
||||
|
||||
worker.parent_poll_ref.ref(parent);
|
||||
|
||||
worker.ref();
|
||||
return worker;
|
||||
}
|
||||
|
||||
@@ -264,6 +273,7 @@ pub fn startWithErrorHandling(
|
||||
pub fn start(
|
||||
this: *WebWorker,
|
||||
) anyerror!void {
|
||||
errdefer this.deref();
|
||||
if (this.name.len > 0) {
|
||||
Output.Source.configureNamedThread(this.name);
|
||||
} else {
|
||||
@@ -364,7 +374,10 @@ fn deinit(this: *WebWorker) void {
|
||||
bun.default_allocator.free(preload);
|
||||
}
|
||||
bun.default_allocator.free(this.preloads);
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
fn destroy(this: *WebWorker) void {
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
fn flushLogs(this: *WebWorker) void {
|
||||
@@ -388,7 +401,7 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
|
||||
|
||||
var buffered_writer_ = bun.MutableString.BufferedWriter{ .context = &array };
|
||||
var buffered_writer = &buffered_writer_;
|
||||
var worker = vm.worker orelse @panic("Assertion failure: no worker");
|
||||
const worker = vm.worker orelse @panic("Assertion failure: no worker");
|
||||
|
||||
const writer = buffered_writer.writer();
|
||||
const Writer = @TypeOf(writer);
|
||||
@@ -421,9 +434,12 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
|
||||
jsc.markBinding(@src());
|
||||
WebWorker__dispatchError(globalObject, worker.cpp_worker, bun.String.createUTF8(array.slice()), error_instance);
|
||||
if (vm.worker) |worker_| {
|
||||
_ = worker.setRequestedTerminate();
|
||||
worker.parent_poll_ref.unrefConcurrently(worker.parent);
|
||||
worker_.exitAndDeinit();
|
||||
// During unhandled rejection, we're already holding the API lock - now
|
||||
// is the right time to set exit_called to true so that
|
||||
// notifyNeedTermination uses vm.global.requestTermination() instead of
|
||||
// vm.jsc.notifyNeedTermination() which avoid VMTraps assertion failures
|
||||
worker_.exit_called = true;
|
||||
worker_.lifecycle_handle.requestTermination();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -528,12 +544,13 @@ fn spin(this: *WebWorker) void {
|
||||
}
|
||||
|
||||
/// This is worker.ref()/.unref() from JS (Caller thread)
|
||||
pub fn setRef(this: *WebWorker, value: bool) callconv(.c) void {
|
||||
if (this.hasRequestedTerminate()) {
|
||||
return;
|
||||
pub fn setRef(handle: *WebWorkerLifecycleHandle, value: bool) callconv(.c) void {
|
||||
if (handle.worker) |worker| {
|
||||
if (worker.hasRequestedTerminate()) {
|
||||
return;
|
||||
}
|
||||
worker.setRefInternal(value);
|
||||
}
|
||||
|
||||
this.setRefInternal(value);
|
||||
}
|
||||
|
||||
pub fn setRefInternal(this: *WebWorker, value: bool) void {
|
||||
@@ -551,10 +568,11 @@ pub fn exit(this: *WebWorker) void {
|
||||
}
|
||||
|
||||
/// Request a terminate from any thread.
|
||||
pub fn notifyNeedTermination(this: *WebWorker) callconv(.c) void {
|
||||
pub fn notifyNeedTermination(this: *WebWorker) void {
|
||||
if (this.status.load(.acquire) == .terminated) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.setRequestedTerminate()) {
|
||||
return;
|
||||
}
|
||||
@@ -562,11 +580,17 @@ pub fn notifyNeedTermination(this: *WebWorker) callconv(.c) void {
|
||||
|
||||
if (this.vm) |vm| {
|
||||
vm.eventLoop().wakeup();
|
||||
// TODO(@190n) notifyNeedTermination
|
||||
}
|
||||
|
||||
// TODO(@190n) delete
|
||||
this.setRefInternal(false);
|
||||
if (this.exit_called) {
|
||||
// For process.exit() called from JavaScript, use JSC's termination
|
||||
// exception mechanism to interrupt ongoing JS execution
|
||||
vm.global.requestTermination();
|
||||
} else {
|
||||
// For external terminate requests (e.g worker.terminate() from parent thread),
|
||||
// use VM traps system
|
||||
vm.jsc.notifyNeedTermination();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This handles cleanup, emitting the "close" event, and deinit.
|
||||
@@ -579,6 +603,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
|
||||
|
||||
log("[{d}] exitAndDeinit", .{this.execution_context_id});
|
||||
const cpp_worker = this.cpp_worker;
|
||||
|
||||
var exit_code: i32 = 0;
|
||||
var globalObject: ?*jsc.JSGlobalObject = null;
|
||||
var vm_to_deinit: ?*jsc.VirtualMachine = null;
|
||||
@@ -593,7 +618,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
|
||||
vm_to_deinit = vm;
|
||||
}
|
||||
var arena = this.arena;
|
||||
|
||||
this.lifecycle_handle.onTermination();
|
||||
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
|
||||
if (loop) |loop_| {
|
||||
loop_.internal_loop_data.jsc_vm = null;
|
||||
@@ -606,18 +631,117 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
|
||||
vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
|
||||
}
|
||||
bun.deleteAllPoolsForThreadExit();
|
||||
|
||||
if (arena) |*arena_| {
|
||||
arena_.deinit();
|
||||
}
|
||||
|
||||
this.deref();
|
||||
bun.exitThread();
|
||||
}
|
||||
|
||||
pub export fn WebWorkerLifecycleHandle__requestTermination(handle: ?*WebWorkerLifecycleHandle) void {
|
||||
if (handle) |h| {
|
||||
h.requestTermination();
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages the complex timing surrounding web worker creation and destruction
|
||||
const WebWorkerLifecycleHandle = struct {
|
||||
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", WebWorkerLifecycleHandle.deinit, .{});
|
||||
pub const ref = WebWorkerLifecycleHandle.RefCount.ref;
|
||||
pub const deref = WebWorkerLifecycleHandle.RefCount.deref;
|
||||
|
||||
mutex: bun.Mutex = .{},
|
||||
worker: ?*WebWorker = null,
|
||||
requested_terminate: std.atomic.Value(bool) = .init(false),
|
||||
ref_count: WebWorkerLifecycleHandle.RefCount,
|
||||
|
||||
pub const new = bun.TrivialNew(WebWorkerLifecycleHandle);
|
||||
|
||||
pub fn createWebWorker(
|
||||
cpp_worker: *void,
|
||||
parent: *jsc.VirtualMachine,
|
||||
name_str: bun.String,
|
||||
specifier_str: bun.String,
|
||||
error_message: *bun.String,
|
||||
parent_context_id: u32,
|
||||
this_context_id: u32,
|
||||
mini: bool,
|
||||
default_unref: bool,
|
||||
eval_mode: bool,
|
||||
argv_ptr: ?[*]WTFStringImpl,
|
||||
argv_len: usize,
|
||||
inherit_execArgv: bool,
|
||||
execArgv_ptr: ?[*]WTFStringImpl,
|
||||
execArgv_len: usize,
|
||||
preload_modules_ptr: ?[*]bun.String,
|
||||
preload_modules_len: usize,
|
||||
) callconv(.c) *WebWorkerLifecycleHandle {
|
||||
const handle = WebWorkerLifecycleHandle.new(.{
|
||||
.worker = null,
|
||||
.ref_count = .init(),
|
||||
});
|
||||
|
||||
const worker = create(cpp_worker, parent, name_str, specifier_str, error_message, parent_context_id, this_context_id, mini, default_unref, eval_mode, argv_ptr, argv_len, inherit_execArgv, execArgv_ptr, execArgv_len, preload_modules_ptr, preload_modules_len, handle);
|
||||
|
||||
handle.worker = worker;
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
pub fn deinit(this: *WebWorkerLifecycleHandle) void {
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
pub fn requestTermination(self: *WebWorkerLifecycleHandle) void {
|
||||
if (self.requested_terminate.load(.acquire)) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.ref();
|
||||
self.mutex.lock();
|
||||
|
||||
if (self.requested_terminate.swap(true, .monotonic)) {
|
||||
self.mutex.unlock();
|
||||
self.deref();
|
||||
return;
|
||||
}
|
||||
|
||||
if (self.worker) |worker| {
|
||||
self.worker = null;
|
||||
worker.notifyNeedTermination();
|
||||
self.mutex.unlock();
|
||||
worker.deref();
|
||||
} else {
|
||||
self.mutex.unlock();
|
||||
// Let the reference counting system handle deinitialization
|
||||
self.deref();
|
||||
}
|
||||
|
||||
self.deref();
|
||||
}
|
||||
|
||||
pub fn onTermination(self: *WebWorkerLifecycleHandle) void {
|
||||
self.ref();
|
||||
self.mutex.lock();
|
||||
if (self.requested_terminate.swap(false, .acquire)) {
|
||||
// we already requested to terminate, therefore this handle has
|
||||
// already been consumed on the other thread and we are able to free
|
||||
// it. Let the reference counting system handle deinitialization.
|
||||
self.mutex.unlock();
|
||||
self.deref();
|
||||
return;
|
||||
}
|
||||
self.worker = null;
|
||||
self.mutex.unlock();
|
||||
self.deref();
|
||||
}
|
||||
};
|
||||
|
||||
comptime {
|
||||
@export(&create, .{ .name = "WebWorker__create" });
|
||||
@export(¬ifyNeedTermination, .{ .name = "WebWorker__notifyNeedTermination" });
|
||||
@export(&WebWorkerLifecycleHandle.createWebWorker, .{ .name = "WebWorkerLifecycleHandle__createWebWorker" });
|
||||
@export(&setRef, .{ .name = "WebWorker__setRef" });
|
||||
_ = WebWorker__updatePtr;
|
||||
}
|
||||
|
||||
const assert = bun.assert;
|
||||
|
||||
@@ -357,8 +357,9 @@ JSC_DEFINE_HOST_FUNCTION(functionStartDirectStream, (JSC::JSGlobalObject * lexic
|
||||
templ += `
|
||||
|
||||
void ${className}::ref() {
|
||||
if (!m_sinkPtr)
|
||||
return;
|
||||
if (!m_sinkPtr) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_refCount++;
|
||||
if (m_refCount == 1) {
|
||||
@@ -367,14 +368,14 @@ JSC_DEFINE_HOST_FUNCTION(functionStartDirectStream, (JSC::JSGlobalObject * lexic
|
||||
}
|
||||
|
||||
void ${className}::unref() {
|
||||
if (!m_sinkPtr)
|
||||
return;
|
||||
if (!m_sinkPtr) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_refCount = std::max(0, m_refCount - 1);
|
||||
if (!m_refCount)
|
||||
{
|
||||
m_refCount = std::max(0, m_refCount - 1);
|
||||
if (!m_refCount) {
|
||||
${name}__updateRef(m_sinkPtr, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JSC_DEFINE_HOST_FUNCTION(${name}__ref, (JSC::JSGlobalObject * lexicalGlobalObject, JSC::CallFrame *callFrame))
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import type { Pipe as NodeStreamPipe } from "node:stream";
|
||||
|
||||
// Hardcoded module "node:child_process"
|
||||
const EventEmitter = require("node:events");
|
||||
const OsModule = require("node:os");
|
||||
@@ -1035,13 +1037,15 @@ class ChildProcess extends EventEmitter {
|
||||
#handle;
|
||||
#closesNeeded = 1;
|
||||
#closesGot = 0;
|
||||
disconnect: undefined | (() => void);
|
||||
|
||||
signalCode = null;
|
||||
exitCode = null;
|
||||
spawnfile;
|
||||
spawnargs;
|
||||
pid;
|
||||
channel;
|
||||
|
||||
channel: NodeStreamPipe | undefined;
|
||||
killed = false;
|
||||
|
||||
[Symbol.dispose]() {
|
||||
@@ -1332,7 +1336,7 @@ class ChildProcess extends EventEmitter {
|
||||
if (has_ipc) {
|
||||
this.send = this.#send;
|
||||
this.disconnect = this.#disconnect;
|
||||
this.channel = new Control();
|
||||
this.channel = new SubprocessChannel(this);
|
||||
Object.defineProperty(this, "_channel", {
|
||||
get() {
|
||||
return this.channel;
|
||||
@@ -1624,9 +1628,46 @@ function abortChildProcess(child, killSignal, reason) {
|
||||
}
|
||||
}
|
||||
|
||||
class Control extends EventEmitter {
|
||||
constructor() {
|
||||
class SubprocessChannel extends EventEmitter implements NodeStreamPipe {
|
||||
#subprocess: ChildProcess;
|
||||
#closed: boolean = false;
|
||||
|
||||
public constructor(childProcess: ChildProcess) {
|
||||
super();
|
||||
this.#subprocess = childProcess;
|
||||
}
|
||||
|
||||
public close(): void {
|
||||
if (this.#closed) return;
|
||||
|
||||
this.#closed = true;
|
||||
|
||||
if (this.#subprocess.connected) {
|
||||
this.#subprocess.disconnect?.();
|
||||
}
|
||||
|
||||
process.nextTick(() => {
|
||||
this.emit("close");
|
||||
});
|
||||
}
|
||||
|
||||
public hasRef(): boolean {
|
||||
if (this.#closed) return false;
|
||||
|
||||
const handle = this.#subprocess[kHandle];
|
||||
if (!handle) return false;
|
||||
|
||||
return this.#subprocess.connected;
|
||||
}
|
||||
|
||||
public ref(): void {
|
||||
if (this.#closed) return;
|
||||
this.#subprocess.ref();
|
||||
}
|
||||
|
||||
public unref(): void {
|
||||
if (this.#closed) return;
|
||||
this.#subprocess.unref();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// import type { Readable, Writable } from "node:stream";
|
||||
|
||||
// import type { WorkerOptions } from "node:worker_threads";
|
||||
declare const self: typeof globalThis;
|
||||
type WebWorker = InstanceType<typeof globalThis.Worker>;
|
||||
@@ -225,15 +226,19 @@ function moveMessagePortToContext() {
|
||||
|
||||
const unsupportedOptions = ["stdin", "stdout", "stderr", "trackedUnmanagedFds", "resourceLimits"];
|
||||
|
||||
class Worker extends EventEmitter {
|
||||
class Worker extends EventEmitter implements AsyncDisposable {
|
||||
#worker: WebWorker;
|
||||
#performance;
|
||||
|
||||
// this is used by terminate();
|
||||
// either is the exit code if exited, a promise resolving to the exit code, or undefined if we haven't sent .terminate() yet
|
||||
#onExitPromise: Promise<number> | number | undefined = undefined;
|
||||
#onExitResolvers = Promise.withResolvers<number | void>();
|
||||
#urlToRevoke = "";
|
||||
|
||||
async [Symbol.asyncDispose]() {
|
||||
await this.terminate();
|
||||
}
|
||||
|
||||
constructor(filename: string, options: NodeWorkerOptions = {}) {
|
||||
super();
|
||||
for (const key of unsupportedOptions) {
|
||||
@@ -319,7 +324,13 @@ class Worker extends EventEmitter {
|
||||
});
|
||||
}
|
||||
|
||||
terminate(callback: unknown) {
|
||||
terminate(callback?: unknown): Promise<number | void> {
|
||||
// threadId = -1 signifies the worker was closed already. Node returns PromiseResolve() in this case
|
||||
// https://github.com/nodejs/node/blob/61601089f7f2f0e5e7abe8240f198585f585704c/lib/internal/worker.js#L390
|
||||
if (this.threadId === -1) {
|
||||
return Promise.resolve<void>();
|
||||
}
|
||||
|
||||
if (typeof callback === "function") {
|
||||
process.emitWarning(
|
||||
"Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.",
|
||||
@@ -329,12 +340,8 @@ class Worker extends EventEmitter {
|
||||
this.#worker.addEventListener("close", event => callback(null, event.code), { once: true });
|
||||
}
|
||||
|
||||
const onExitPromise = this.#onExitPromise;
|
||||
if (onExitPromise) {
|
||||
return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise);
|
||||
}
|
||||
const { resolve, promise } = this.#onExitResolvers;
|
||||
|
||||
const { resolve, promise } = Promise.withResolvers();
|
||||
this.#worker.addEventListener(
|
||||
"close",
|
||||
event => {
|
||||
@@ -342,12 +349,13 @@ class Worker extends EventEmitter {
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
|
||||
this.#worker.terminate();
|
||||
|
||||
return (this.#onExitPromise = promise);
|
||||
return promise;
|
||||
}
|
||||
|
||||
postMessage(...args: [any, any]) {
|
||||
postMessage(...args: Parameters<Bun.Worker["postMessage"]>) {
|
||||
return this.#worker.postMessage.$apply(this.#worker, args);
|
||||
}
|
||||
|
||||
@@ -356,8 +364,8 @@ class Worker extends EventEmitter {
|
||||
return stringPromise.then(s => new HeapSnapshotStream(s));
|
||||
}
|
||||
|
||||
#onClose(e) {
|
||||
this.#onExitPromise = e.code;
|
||||
#onClose(e: Event & { code: number }) {
|
||||
this.#onExitResolvers.resolve(e.code);
|
||||
this.emit("exit", e.code);
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@
|
||||
"pg-gateway": "0.3.0-beta.4",
|
||||
"pino": "9.4.0",
|
||||
"pino-pretty": "11.2.2",
|
||||
"piscina": "5.0.0",
|
||||
"postgres": "3.3.5",
|
||||
"prisma": "5.1.1",
|
||||
"prompts": "2.4.2",
|
||||
@@ -444,6 +445,40 @@
|
||||
|
||||
"@napi-rs/canvas-win32-x64-msvc": ["@napi-rs/canvas-win32-x64-msvc@0.1.65", "", { "os": "win32", "cpu": "x64" }, "sha512-RZQX3luWnlNWgdMnLMQ1hyfQraeAn9lnxWWVCHuUM4tAWEV8UDdeb7cMwmJW7eyt8kAosmjeHt3cylQMHOxGFg=="],
|
||||
|
||||
"@napi-rs/nice": ["@napi-rs/nice@1.0.1", "", { "optionalDependencies": { "@napi-rs/nice-android-arm-eabi": "1.0.1", "@napi-rs/nice-android-arm64": "1.0.1", "@napi-rs/nice-darwin-arm64": "1.0.1", "@napi-rs/nice-darwin-x64": "1.0.1", "@napi-rs/nice-freebsd-x64": "1.0.1", "@napi-rs/nice-linux-arm-gnueabihf": "1.0.1", "@napi-rs/nice-linux-arm64-gnu": "1.0.1", "@napi-rs/nice-linux-arm64-musl": "1.0.1", "@napi-rs/nice-linux-ppc64-gnu": "1.0.1", "@napi-rs/nice-linux-riscv64-gnu": "1.0.1", "@napi-rs/nice-linux-s390x-gnu": "1.0.1", "@napi-rs/nice-linux-x64-gnu": "1.0.1", "@napi-rs/nice-linux-x64-musl": "1.0.1", "@napi-rs/nice-win32-arm64-msvc": "1.0.1", "@napi-rs/nice-win32-ia32-msvc": "1.0.1", "@napi-rs/nice-win32-x64-msvc": "1.0.1" } }, "sha512-zM0mVWSXE0a0h9aKACLwKmD6nHcRiKrPpCfvaKqG1CqDEyjEawId0ocXxVzPMCAm6kkWr2P025msfxXEnt8UGQ=="],
|
||||
|
||||
"@napi-rs/nice-android-arm-eabi": ["@napi-rs/nice-android-arm-eabi@1.0.1", "", { "os": "android", "cpu": "arm" }, "sha512-5qpvOu5IGwDo7MEKVqqyAxF90I6aLj4n07OzpARdgDRfz8UbBztTByBp0RC59r3J1Ij8uzYi6jI7r5Lws7nn6w=="],
|
||||
|
||||
"@napi-rs/nice-android-arm64": ["@napi-rs/nice-android-arm64@1.0.1", "", { "os": "android", "cpu": "arm64" }, "sha512-GqvXL0P8fZ+mQqG1g0o4AO9hJjQaeYG84FRfZaYjyJtZZZcMjXW5TwkL8Y8UApheJgyE13TQ4YNUssQaTgTyvA=="],
|
||||
|
||||
"@napi-rs/nice-darwin-arm64": ["@napi-rs/nice-darwin-arm64@1.0.1", "", { "os": "darwin", "cpu": "arm64" }, "sha512-91k3HEqUl2fsrz/sKkuEkscj6EAj3/eZNCLqzD2AA0TtVbkQi8nqxZCZDMkfklULmxLkMxuUdKe7RvG/T6s2AA=="],
|
||||
|
||||
"@napi-rs/nice-darwin-x64": ["@napi-rs/nice-darwin-x64@1.0.1", "", { "os": "darwin", "cpu": "x64" }, "sha512-jXnMleYSIR/+TAN/p5u+NkCA7yidgswx5ftqzXdD5wgy/hNR92oerTXHc0jrlBisbd7DpzoaGY4cFD7Sm5GlgQ=="],
|
||||
|
||||
"@napi-rs/nice-freebsd-x64": ["@napi-rs/nice-freebsd-x64@1.0.1", "", { "os": "freebsd", "cpu": "x64" }, "sha512-j+iJ/ezONXRQsVIB/FJfwjeQXX7A2tf3gEXs4WUGFrJjpe/z2KB7sOv6zpkm08PofF36C9S7wTNuzHZ/Iiccfw=="],
|
||||
|
||||
"@napi-rs/nice-linux-arm-gnueabihf": ["@napi-rs/nice-linux-arm-gnueabihf@1.0.1", "", { "os": "linux", "cpu": "arm" }, "sha512-G8RgJ8FYXYkkSGQwywAUh84m946UTn6l03/vmEXBYNJxQJcD+I3B3k5jmjFG/OPiU8DfvxutOP8bi+F89MCV7Q=="],
|
||||
|
||||
"@napi-rs/nice-linux-arm64-gnu": ["@napi-rs/nice-linux-arm64-gnu@1.0.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-IMDak59/W5JSab1oZvmNbrms3mHqcreaCeClUjwlwDr0m3BoR09ZiN8cKFBzuSlXgRdZ4PNqCYNeGQv7YMTjuA=="],
|
||||
|
||||
"@napi-rs/nice-linux-arm64-musl": ["@napi-rs/nice-linux-arm64-musl@1.0.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-wG8fa2VKuWM4CfjOjjRX9YLIbysSVV1S3Kgm2Fnc67ap/soHBeYZa6AGMeR5BJAylYRjnoVOzV19Cmkco3QEPw=="],
|
||||
|
||||
"@napi-rs/nice-linux-ppc64-gnu": ["@napi-rs/nice-linux-ppc64-gnu@1.0.1", "", { "os": "linux", "cpu": "ppc64" }, "sha512-lxQ9WrBf0IlNTCA9oS2jg/iAjQyTI6JHzABV664LLrLA/SIdD+I1i3Mjf7TsnoUbgopBcCuDztVLfJ0q9ubf6Q=="],
|
||||
|
||||
"@napi-rs/nice-linux-riscv64-gnu": ["@napi-rs/nice-linux-riscv64-gnu@1.0.1", "", { "os": "linux", "cpu": "none" }, "sha512-3xs69dO8WSWBb13KBVex+yvxmUeEsdWexxibqskzoKaWx9AIqkMbWmE2npkazJoopPKX2ULKd8Fm9veEn0g4Ig=="],
|
||||
|
||||
"@napi-rs/nice-linux-s390x-gnu": ["@napi-rs/nice-linux-s390x-gnu@1.0.1", "", { "os": "linux", "cpu": "s390x" }, "sha512-lMFI3i9rlW7hgToyAzTaEybQYGbQHDrpRkg+1gJWEpH0PLAQoZ8jiY0IzakLfNWnVda1eTYYlxxFYzW8Rqczkg=="],
|
||||
|
||||
"@napi-rs/nice-linux-x64-gnu": ["@napi-rs/nice-linux-x64-gnu@1.0.1", "", { "os": "linux", "cpu": "x64" }, "sha512-XQAJs7DRN2GpLN6Fb+ZdGFeYZDdGl2Fn3TmFlqEL5JorgWKrQGRUrpGKbgZ25UeZPILuTKJ+OowG2avN8mThBA=="],
|
||||
|
||||
"@napi-rs/nice-linux-x64-musl": ["@napi-rs/nice-linux-x64-musl@1.0.1", "", { "os": "linux", "cpu": "x64" }, "sha512-/rodHpRSgiI9o1faq9SZOp/o2QkKQg7T+DK0R5AkbnI/YxvAIEHf2cngjYzLMQSQgUhxym+LFr+UGZx4vK4QdQ=="],
|
||||
|
||||
"@napi-rs/nice-win32-arm64-msvc": ["@napi-rs/nice-win32-arm64-msvc@1.0.1", "", { "os": "win32", "cpu": "arm64" }, "sha512-rEcz9vZymaCB3OqEXoHnp9YViLct8ugF+6uO5McifTedjq4QMQs3DHz35xBEGhH3gJWEsXMUbzazkz5KNM5YUg=="],
|
||||
|
||||
"@napi-rs/nice-win32-ia32-msvc": ["@napi-rs/nice-win32-ia32-msvc@1.0.1", "", { "os": "win32", "cpu": "ia32" }, "sha512-t7eBAyPUrWL8su3gDxw9xxxqNwZzAqKo0Szv3IjVQd1GpXXVkb6vBBQUuxfIYaXMzZLwlxRQ7uzM2vdUE9ULGw=="],
|
||||
|
||||
"@napi-rs/nice-win32-x64-msvc": ["@napi-rs/nice-win32-x64-msvc@1.0.1", "", { "os": "win32", "cpu": "x64" }, "sha512-JlF+uDcatt3St2ntBG8H02F1mM45i5SF9W+bIKiReVE6wiy3o16oBP/yxt+RZ+N6LbCImJXJ6bXNO2kn9AXicg=="],
|
||||
|
||||
"@nestjs/common": ["@nestjs/common@11.0.3", "", { "dependencies": { "iterare": "1.2.1", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "class-transformer": "*", "class-validator": "*", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["class-transformer", "class-validator"] }, "sha512-fTkJWQ20+jvPKfrv3A+T3wsPwwYSJyoJ+pcBzyKtv5fCpK/yX/rJalFUIpw1CDmarfqIaMX9SdkplNyxtvH6RA=="],
|
||||
|
||||
"@nestjs/core": ["@nestjs/core@11.0.3", "", { "dependencies": { "@nuxt/opencollective": "0.4.1", "fast-safe-stringify": "2.1.1", "iterare": "1.2.1", "path-to-regexp": "8.2.0", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "@nestjs/common": "^11.0.0", "@nestjs/microservices": "^11.0.0", "@nestjs/platform-express": "^11.0.0", "@nestjs/websockets": "^11.0.0", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["@nestjs/microservices", "@nestjs/platform-express", "@nestjs/websockets"] }, "sha512-6UoVHpwa23HJxMNtuTXQCiqx/NHTG3lRBRgnZ8EDHTjgaNnR7P+xBS68zN3gLH7rBIrhhQ5Q1hVs7WswRxrw7Q=="],
|
||||
@@ -2028,6 +2063,8 @@
|
||||
|
||||
"pino-std-serializers": ["pino-std-serializers@7.0.0", "", {}, "sha512-e906FRY0+tV27iq4juKzSYPbUj2do2X2JX4EzSca1631EB2QJQUqGbDuERal7LCtOpxl6x3+nvo9NPZcmjkiFA=="],
|
||||
|
||||
"piscina": ["piscina@5.0.0", "", { "optionalDependencies": { "@napi-rs/nice": "^1.0.1" } }, "sha512-R+arufwL7sZvGjAhSMK3TfH55YdGOqhpKXkcwQJr432AAnJX/xxX19PA4QisrmJ+BTTfZVggaz6HexbkQq1l1Q=="],
|
||||
|
||||
"pixelmatch": ["pixelmatch@5.3.0", "", { "dependencies": { "pngjs": "^6.0.0" }, "bin": { "pixelmatch": "bin/pixelmatch" } }, "sha512-o8mkY4E/+LNUf6LzX96ht6k6CEDi65k9G2rjMtBe9Oo+VPKSvl+0GKHuH/AlG+GA5LPG/i5hrekkxUc3s2HU+Q=="],
|
||||
|
||||
"pkg-dir": ["pkg-dir@4.2.0", "", { "dependencies": { "find-up": "^4.0.0" } }, "sha512-HRDzbaKjC+AOWVXxAU/x54COGeIv9eb+6CkDSQoNTt4XyWoIJvuPsXizxu/Fr23EiekbtZwmh1IcIG/l/a10GQ=="],
|
||||
|
||||
@@ -87,7 +87,7 @@ export function testForFile(file: string): BunTestExports {
|
||||
|
||||
var testFile = testFiles.get(file);
|
||||
if (!testFile) {
|
||||
testFile = Bun.jest(file);
|
||||
testFile = (Bun as typeof Bun & { jest: (absoluteSourceFilePath: string) => BunTestExports }).jest(file);
|
||||
testFiles.set(file, testFile);
|
||||
}
|
||||
return testFile;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* without always needing to run `bun install` in development.
|
||||
*/
|
||||
|
||||
import { gc as bunGC, sleepSync, spawnSync, unsafe, which, write } from "bun";
|
||||
import { gc as bunGC, readableStreamToText, sleepSync, spawnSync, unsafe, which, write } from "bun";
|
||||
import { heapStats } from "bun:jsc";
|
||||
import { afterAll, beforeAll, describe, expect, test } from "bun:test";
|
||||
import { ChildProcess, fork } from "child_process";
|
||||
@@ -469,11 +469,43 @@ if (expect.extend)
|
||||
}
|
||||
}
|
||||
},
|
||||
async toRunAsync(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
|
||||
const result = Bun.spawn({
|
||||
cmd: [bunExe(), ...cmds],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
const [stdout, stderr, exitCode] = await Promise.all([
|
||||
readableStreamToText(result.stdout),
|
||||
readableStreamToText(result.stderr),
|
||||
result.exited,
|
||||
]);
|
||||
|
||||
if (exitCode !== expectedCode) {
|
||||
return {
|
||||
pass: false,
|
||||
message: () => `Command ${cmds.join(" ")} failed:` + "\n" + stdout + "\n" + stderr,
|
||||
};
|
||||
}
|
||||
|
||||
if (optionalStdout != null) {
|
||||
return {
|
||||
pass: stdout === optionalStdout,
|
||||
message: () => `Expected ${cmds.join(" ")} to output ${optionalStdout} but got ${stdout}`,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
pass: true,
|
||||
message: () => `Expected ${cmds.join(" ")} to run`,
|
||||
};
|
||||
},
|
||||
toRun(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
|
||||
const result = Bun.spawnSync({
|
||||
cmd: [bunExe(), ...cmds],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "pipe", "inherit"],
|
||||
stdio: ["ignore", "pipe", "inherit"],
|
||||
});
|
||||
|
||||
if (result.exitCode !== expectedCode) {
|
||||
@@ -1279,6 +1311,7 @@ interface BunHarnessTestMatchers {
|
||||
toHaveTestTimedOutAfter(expected: number): void;
|
||||
toBeBinaryType(expected: keyof typeof binaryTypes): void;
|
||||
toRun(optionalStdout?: string, expectedCode?: number): void;
|
||||
toRunAsync(optionalStdout?: string, expectedCode?: number): Promise<void>;
|
||||
toThrowWithCode(cls: CallableFunction, code: string): void;
|
||||
toThrowWithCodeAsync(cls: CallableFunction, code: string): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -187,3 +187,6 @@ tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: ["ignore", "inherit"
|
||||
tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: [null, null, null] }));
|
||||
|
||||
tsd.expectAssignable<SyncSubprocess<Bun.SpawnOptions.Readable, Bun.SpawnOptions.Readable>>(Bun.spawnSync([], {}));
|
||||
|
||||
Bun.spawnSync({ cmd: ["echo", "hello"] });
|
||||
Bun.spawnSync(["echo", "hello"], { stdio: ["ignore", "pipe", "pipe"] });
|
||||
|
||||
8
test/js/node/test/parallel/test-worker-dispose.mjs
Normal file
8
test/js/node/test/parallel/test-worker-dispose.mjs
Normal file
@@ -0,0 +1,8 @@
|
||||
import * as common from '../common/index.mjs';
|
||||
import { Worker } from 'node:worker_threads';
|
||||
|
||||
{
|
||||
// Verifies that the worker is async disposable
|
||||
await using worker = new Worker('for(;;) {}', { eval: true });
|
||||
worker.on('exit', common.mustCall());
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { Worker } = require('worker_threads');
|
||||
|
||||
// Regression for https://github.com/nodejs/node/issues/43182.
|
||||
const w = new Worker(new URL('data:text/javascript,process.exit(1);await new Promise(()=>{ process.exit(2); })'));
|
||||
w.on('exit', common.mustCall((code) => {
|
||||
assert.strictEqual(code, 1);
|
||||
}));
|
||||
@@ -7,7 +7,7 @@ const eachSizeMiB = 100;
|
||||
const iterations = 5;
|
||||
|
||||
function test() {
|
||||
const code = " ".repeat(eachSizeMiB * 1024 * 1024);
|
||||
const code = Buffer.alloc(eachSizeMiB * 1024 * 1024, " ").toString();
|
||||
return new Promise((resolve, reject) => {
|
||||
const worker = new Worker(code, { eval: true });
|
||||
worker.on("exit", () => resolve());
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
import assert from "node:assert";
|
||||
import { test } from "node:test";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { MessageChannel, MessagePort, parentPort, Worker } from "node:worker_threads";
|
||||
|
||||
interface StartupMessage {
|
||||
port: MessagePort;
|
||||
}
|
||||
|
||||
if (parentPort) {
|
||||
parentPort.on("message", (message: StartupMessage) => {
|
||||
console.log("Worker received startup message");
|
||||
|
||||
message.port.postMessage("hello");
|
||||
message.port.close();
|
||||
});
|
||||
} else {
|
||||
test("worker lifecycle message port", async () => {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url));
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const { promise, resolve, reject } = Promise.withResolvers<string>();
|
||||
|
||||
port1.on("message", (message: string) => {
|
||||
console.log("Received message:", message);
|
||||
assert.equal(message, "hello");
|
||||
worker.terminate();
|
||||
resolve(message);
|
||||
});
|
||||
|
||||
worker.on("online", () => {
|
||||
console.log("Worker is online");
|
||||
const startupMessage: StartupMessage = { port: port2 };
|
||||
worker.postMessage(startupMessage, [port2]);
|
||||
});
|
||||
|
||||
worker.on("exit", () => {
|
||||
console.log("Worker exited");
|
||||
reject();
|
||||
});
|
||||
|
||||
worker.on("error", err => {
|
||||
reject(err);
|
||||
});
|
||||
|
||||
assert.equal(await promise, "hello");
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
import assert from "node:assert";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { Worker, isMainThread, threadId } from "node:worker_threads";
|
||||
|
||||
const sleeptime = 100;
|
||||
|
||||
if (isMainThread) {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url));
|
||||
assert.strictEqual(threadId, 0);
|
||||
assert.strictEqual(worker.threadId, 1);
|
||||
console.log(" (main) threadId:", worker.threadId);
|
||||
|
||||
await sleep(sleeptime);
|
||||
assert.strictEqual(await worker.terminate(), 1);
|
||||
assert.strictEqual(worker.threadId, -1); // should be -1 after termination
|
||||
assert.strictEqual(await worker.terminate(), undefined); // sequential calling is basically no-op
|
||||
assert.strictEqual(worker.threadId, -1);
|
||||
} else {
|
||||
console.log("(worker) threadId:", threadId);
|
||||
assert.strictEqual(threadId, 1);
|
||||
await sleep(sleeptime * 2); // keep it alive definitely longer than the parent
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
import { once } from "node:events";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { Worker, isMainThread, parentPort } from "node:worker_threads";
|
||||
|
||||
if (isMainThread) {
|
||||
const { test, expect } = await import("bun:test");
|
||||
|
||||
test("process.exit() works", async () => {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url));
|
||||
|
||||
worker.on("message", () => expect().fail("worker should not keep executing after process.exit()"));
|
||||
worker.postMessage("boom");
|
||||
|
||||
const [exitCode] = await once(worker, "exit");
|
||||
expect(exitCode).toBe(2);
|
||||
});
|
||||
} else {
|
||||
console.log("Worker thread started");
|
||||
|
||||
parentPort!.on("message", message => {
|
||||
console.log(`Worker received: ${message}`);
|
||||
|
||||
console.log("About to call process.exit(2)...");
|
||||
process.exit(2);
|
||||
console.log("process.exit(2) called");
|
||||
|
||||
parentPort!.postMessage("i'm still alive!");
|
||||
});
|
||||
|
||||
console.log("Worker is ready, waiting for messages...");
|
||||
}
|
||||
@@ -6,7 +6,7 @@ import { Worker, isMainThread, workerData } from "worker_threads";
|
||||
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
|
||||
|
||||
const actions = {
|
||||
async ["Bun.connect"](port) {
|
||||
async ["Bun.connect"](port: number) {
|
||||
await Bun.connect({
|
||||
hostname: "localhost",
|
||||
port,
|
||||
@@ -19,7 +19,7 @@ const actions = {
|
||||
},
|
||||
});
|
||||
},
|
||||
async ["Bun.listen"](port) {
|
||||
async ["Bun.listen"](port: number) {
|
||||
const server = Bun.listen({
|
||||
hostname: "localhost",
|
||||
port: 0,
|
||||
@@ -32,7 +32,7 @@ const actions = {
|
||||
},
|
||||
});
|
||||
},
|
||||
async ["fetch"](port) {
|
||||
async ["fetch"](port: number) {
|
||||
const resp = await fetch("http://localhost:" + port);
|
||||
await resp.blob();
|
||||
},
|
||||
@@ -65,12 +65,10 @@ if (isMainThread) {
|
||||
const { promise, resolve, reject } = Promise.withResolvers();
|
||||
promises.push(promise);
|
||||
|
||||
worker.on("online", () => {
|
||||
sleep(1)
|
||||
.then(() => {
|
||||
return worker.terminate();
|
||||
})
|
||||
.finally(resolve);
|
||||
worker.once("online", async () => {
|
||||
await sleep(1);
|
||||
await worker.terminate();
|
||||
resolve();
|
||||
});
|
||||
worker.on("error", e => reject(e));
|
||||
}
|
||||
|
||||
17
test/js/third_party/astro/astro-post.test.js
vendored
17
test/js/third_party/astro/astro-post.test.js
vendored
@@ -4,21 +4,22 @@ import { bunEnv, nodeExe } from "harness";
|
||||
import { join } from "path";
|
||||
|
||||
const fixtureDir = join(import.meta.dirname, "fixtures");
|
||||
function postNodeFormData(port) {
|
||||
const result = Bun.spawnSync({
|
||||
async function postNodeFormData(port) {
|
||||
const result = Bun.spawn({
|
||||
cmd: [nodeExe(), join(fixtureDir, "node-form-data.fetch.fixture.js"), port?.toString()],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "inherit", "inherit"],
|
||||
});
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(await result.exited).toBe(0);
|
||||
}
|
||||
function postNodeAction(port) {
|
||||
const result = Bun.spawnSync({
|
||||
async function postNodeAction(port) {
|
||||
const result = Bun.spawn({
|
||||
cmd: [nodeExe(), join(fixtureDir, "node-action.fetch.fixture.js"), port?.toString()],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "inherit", "inherit"],
|
||||
});
|
||||
expect(result.exitCode).toBe(0);
|
||||
|
||||
expect(await result.exited).toBe(0);
|
||||
}
|
||||
|
||||
describe("astro", async () => {
|
||||
@@ -66,7 +67,7 @@ describe("astro", async () => {
|
||||
});
|
||||
|
||||
test("is able todo a POST request to an astro action using node", async () => {
|
||||
postNodeAction(previewServer.port);
|
||||
await postNodeAction(previewServer.port);
|
||||
});
|
||||
|
||||
test("is able to post form data to an astro using bun", async () => {
|
||||
@@ -89,6 +90,6 @@ describe("astro", async () => {
|
||||
});
|
||||
});
|
||||
test("is able to post form data to an astro using node", async () => {
|
||||
postNodeFormData(previewServer.port);
|
||||
await postNodeFormData(previewServer.port);
|
||||
});
|
||||
});
|
||||
|
||||
1
test/js/third_party/astro/fixtures/.gitignore
vendored
Normal file
1
test/js/third_party/astro/fixtures/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
.astro
|
||||
6
test/js/third_party/piscina/package.json
vendored
Normal file
6
test/js/third_party/piscina/package.json
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "piscina-test",
|
||||
"dependencies": {
|
||||
"piscina": "5.0.0"
|
||||
}
|
||||
}
|
||||
22
test/js/third_party/piscina/piscina-atomics.test.ts
vendored
Normal file
22
test/js/third_party/piscina/piscina-atomics.test.ts
vendored
Normal file
@@ -0,0 +1,22 @@
|
||||
import { resolve } from "node:path";
|
||||
import { test } from "node:test";
|
||||
import { Piscina } from "piscina";
|
||||
|
||||
test("piscina atomics", async () => {
|
||||
const pool = new Piscina<void, void>({
|
||||
filename: resolve(__dirname, "simple.fixture.ts"),
|
||||
minThreads: 2,
|
||||
maxThreads: 2,
|
||||
atomics: "sync",
|
||||
});
|
||||
|
||||
const tasks: Promise<void>[] = [];
|
||||
|
||||
for (let i = 1; i <= 10000; i++) {
|
||||
tasks.push(pool.run());
|
||||
}
|
||||
|
||||
await Promise.all(tasks);
|
||||
|
||||
await pool.destroy();
|
||||
});
|
||||
63
test/js/third_party/piscina/piscina.test.ts
vendored
Normal file
63
test/js/third_party/piscina/piscina.test.ts
vendored
Normal file
@@ -0,0 +1,63 @@
|
||||
import { expect, test } from "bun:test";
|
||||
import { join } from "node:path";
|
||||
import { Piscina } from "piscina";
|
||||
|
||||
setTimeout(() => {
|
||||
console.error(new Error("Catastrophic failure, exiting so test can fail"));
|
||||
process.exit(1);
|
||||
}, 10 * 1000).unref();
|
||||
|
||||
test("Piscina basic functionality", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker.fixture.ts"),
|
||||
});
|
||||
|
||||
const result = await piscina.run({ a: 4, b: 6 });
|
||||
expect(result).toBe(10);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
|
||||
test("Piscina event loop cleanup", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker.fixture.ts"),
|
||||
});
|
||||
|
||||
const results = await Promise.all([
|
||||
piscina.run({ a: 1, b: 2 }),
|
||||
piscina.run({ a: 3, b: 4 }),
|
||||
piscina.run({ a: 5, b: 6 }),
|
||||
]);
|
||||
|
||||
expect(results).toEqual([3, 7, 11]);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
|
||||
test("Piscina with idleTimeout", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker.fixture.ts"),
|
||||
idleTimeout: 100,
|
||||
maxThreads: 1,
|
||||
});
|
||||
|
||||
const result = await piscina.run({ a: 10, b: 20 });
|
||||
expect(result).toBe(30);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
|
||||
test("Piscina error handling", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker-error.fixture.ts"),
|
||||
});
|
||||
|
||||
const p = await piscina.run({ shouldThrow: true }).then(
|
||||
() => true,
|
||||
() => false,
|
||||
);
|
||||
|
||||
expect(p).toBe(false);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
10
test/js/third_party/piscina/simple.fixture.ts
vendored
Normal file
10
test/js/third_party/piscina/simple.fixture.ts
vendored
Normal file
@@ -0,0 +1,10 @@
|
||||
// https://github.com/piscinajs/piscina/blob/ba396ced7afc08a8c16f65fbc367a9b7f4d7e84c/test/fixtures/simple-isworkerthread.ts#L7
|
||||
|
||||
import assert from "assert";
|
||||
import Piscina from "piscina";
|
||||
|
||||
assert.strictEqual(Piscina.isWorkerThread, true);
|
||||
|
||||
export default function () {
|
||||
return "done";
|
||||
}
|
||||
7
test/js/third_party/piscina/worker-error.fixture.ts
vendored
Normal file
7
test/js/third_party/piscina/worker-error.fixture.ts
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
export default ({ shouldThrow }: { shouldThrow: boolean }) => {
|
||||
if (shouldThrow) {
|
||||
throw new Error("Worker error for testing");
|
||||
}
|
||||
|
||||
return "success";
|
||||
};
|
||||
4
test/js/third_party/piscina/worker.fixture.ts
vendored
Normal file
4
test/js/third_party/piscina/worker.fixture.ts
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
export default ({ a, b }: { a: number; b: number }) => {
|
||||
console.log("Worker: calculating", a, "+", b);
|
||||
return a + b;
|
||||
};
|
||||
308
test/js/web/workers/message-port-lifecycle.test.ts
Normal file
308
test/js/web/workers/message-port-lifecycle.test.ts
Normal file
@@ -0,0 +1,308 @@
|
||||
import assert from "node:assert";
|
||||
import { test } from "node:test";
|
||||
import { MessageChannel, receiveMessageOnPort, Worker } from "worker_threads";
|
||||
|
||||
test("MessagePort postMessage respects event loop timing", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const messages: string[] = [];
|
||||
let messageCount = 0;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", msg => {
|
||||
messages.push(`received: ${msg}`);
|
||||
messageCount++;
|
||||
|
||||
if (messageCount === 3) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
setImmediate(() => {
|
||||
messages.push("setImmediate 1");
|
||||
port1.postMessage("message1");
|
||||
});
|
||||
|
||||
setImmediate(() => {
|
||||
messages.push("setImmediate 2");
|
||||
port1.postMessage("message2");
|
||||
});
|
||||
|
||||
setImmediate(() => {
|
||||
messages.push("setImmediate 3");
|
||||
port1.postMessage("message3");
|
||||
});
|
||||
|
||||
await promise;
|
||||
|
||||
assert.deepStrictEqual(messages, [
|
||||
"setImmediate 1",
|
||||
"setImmediate 2",
|
||||
"setImmediate 3",
|
||||
"received: message1",
|
||||
"received: message2",
|
||||
"received: message3",
|
||||
]);
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("MessagePort messages execute after process.nextTick (Node.js compatibility)", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const executionOrder: string[] = [];
|
||||
let messageReceived = false;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", () => {
|
||||
executionOrder.push("message received");
|
||||
messageReceived = true;
|
||||
resolve();
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
port1.postMessage("test");
|
||||
|
||||
process.nextTick(() => {
|
||||
executionOrder.push("nextTick 1");
|
||||
});
|
||||
|
||||
process.nextTick(() => {
|
||||
executionOrder.push("nextTick 2");
|
||||
});
|
||||
|
||||
await promise;
|
||||
|
||||
await new Promise(resolve => setImmediate(resolve));
|
||||
|
||||
assert.strictEqual(messageReceived, true);
|
||||
assert.strictEqual(executionOrder[0], "nextTick 1");
|
||||
assert.strictEqual(executionOrder[1], "nextTick 2");
|
||||
assert.strictEqual(executionOrder[2], "message received");
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("MessagePort message delivery works with workers", async () => {
|
||||
const worker = new Worker(
|
||||
`
|
||||
const { parentPort, MessageChannel } = require('worker_threads');
|
||||
|
||||
parentPort.on('message', ({ port }) => {
|
||||
let count = 0;
|
||||
|
||||
port.on('message', (msg) => {
|
||||
count++;
|
||||
|
||||
port.postMessage(\`echo-\${count}: \${msg}\`);
|
||||
|
||||
if (count >= 3) {
|
||||
port.close();
|
||||
parentPort.postMessage('done');
|
||||
}
|
||||
});
|
||||
|
||||
port.start();
|
||||
parentPort.postMessage('ready');
|
||||
});
|
||||
`,
|
||||
{ eval: true },
|
||||
);
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const messages: string[] = [];
|
||||
let readyReceived = false;
|
||||
let doneReceived = false;
|
||||
|
||||
const { promise, resolve, reject: rejectWorker } = Promise.withResolvers<void>();
|
||||
const {
|
||||
promise: allMessagesReceived,
|
||||
resolve: resolveAllMessages,
|
||||
reject: rejectAllMessages,
|
||||
} = Promise.withResolvers<void>();
|
||||
|
||||
AbortSignal.timeout(100).addEventListener("abort", () => {
|
||||
worker.terminate();
|
||||
rejectWorker(new Error("timeout"));
|
||||
rejectAllMessages(new Error("timeout"));
|
||||
});
|
||||
|
||||
worker.on("message", msg => {
|
||||
if (msg === "ready") {
|
||||
readyReceived = true;
|
||||
|
||||
port1.postMessage("hello1");
|
||||
port1.postMessage("hello2");
|
||||
port1.postMessage("hello3");
|
||||
} else if (msg === "done") {
|
||||
doneReceived = true;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port1.on("message", msg => {
|
||||
messages.push(msg);
|
||||
|
||||
if (messages.length === 3) {
|
||||
resolveAllMessages();
|
||||
}
|
||||
});
|
||||
|
||||
worker.postMessage({ port: port2 }, [port2]);
|
||||
|
||||
await Promise.all([promise, allMessagesReceived]);
|
||||
|
||||
assert.strictEqual(readyReceived, true);
|
||||
assert.strictEqual(doneReceived, true);
|
||||
assert.strictEqual(messages.length, 3);
|
||||
assert.deepStrictEqual(messages, ["echo-1: hello1", "echo-2: hello2", "echo-3: hello3"]);
|
||||
|
||||
port1.close();
|
||||
worker.terminate();
|
||||
});
|
||||
|
||||
test("MessagePort messages don't starve microtasks", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const executionOrder: string[] = [];
|
||||
let messageCount = 0;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", () => {
|
||||
messageCount++;
|
||||
executionOrder.push(`message-${messageCount}`);
|
||||
|
||||
if (messageCount === 3) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
queueMicrotask(() => {
|
||||
executionOrder.push("microtask-1");
|
||||
});
|
||||
|
||||
port1.postMessage("msg1");
|
||||
|
||||
queueMicrotask(() => {
|
||||
executionOrder.push("microtask-2");
|
||||
});
|
||||
|
||||
port1.postMessage("msg2");
|
||||
|
||||
queueMicrotask(() => {
|
||||
executionOrder.push("microtask-3");
|
||||
});
|
||||
|
||||
port1.postMessage("msg3");
|
||||
|
||||
await promise;
|
||||
|
||||
assert(executionOrder.includes("microtask-1"));
|
||||
assert(executionOrder.includes("microtask-2"));
|
||||
assert(executionOrder.includes("microtask-3"));
|
||||
assert(executionOrder.includes("message-1"));
|
||||
assert(executionOrder.includes("message-2"));
|
||||
assert(executionOrder.includes("message-3"));
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("high volume MessagePort operations maintain order", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const TOTAL_MESSAGES = 100;
|
||||
const receivedMessages: number[] = [];
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", msg => {
|
||||
receivedMessages.push(msg);
|
||||
|
||||
if (receivedMessages.length === TOTAL_MESSAGES) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
for (let i = 0; i < TOTAL_MESSAGES; i++) {
|
||||
port1.postMessage(i);
|
||||
}
|
||||
|
||||
await promise;
|
||||
|
||||
assert.strictEqual(receivedMessages.length, TOTAL_MESSAGES);
|
||||
for (let i = 0; i < TOTAL_MESSAGES; i++) {
|
||||
assert.strictEqual(receivedMessages[i], i);
|
||||
}
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("MessagePort close behavior during message handling", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
let messageReceived = false;
|
||||
let errorThrown = false;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", () => {
|
||||
messageReceived = true;
|
||||
|
||||
port2.close();
|
||||
|
||||
try {
|
||||
port1.postMessage("after-close");
|
||||
} catch (e) {
|
||||
errorThrown = true;
|
||||
}
|
||||
|
||||
setTimeout(resolve, 10);
|
||||
});
|
||||
|
||||
port2.start();
|
||||
port1.postMessage("test");
|
||||
|
||||
await promise;
|
||||
|
||||
assert.strictEqual(messageReceived, true);
|
||||
|
||||
assert.strictEqual(errorThrown, false);
|
||||
|
||||
port1.close();
|
||||
});
|
||||
|
||||
test("receiveMessageOnPort synchronous message retrieval", () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
port1.postMessage("msg1");
|
||||
port1.postMessage("msg2");
|
||||
port1.postMessage("msg3");
|
||||
|
||||
const result1 = receiveMessageOnPort(port2);
|
||||
const result2 = receiveMessageOnPort(port2);
|
||||
const result3 = receiveMessageOnPort(port2);
|
||||
const result4 = receiveMessageOnPort(port2);
|
||||
|
||||
assert.strictEqual(result1?.message, "msg1");
|
||||
assert.strictEqual(result2?.message, "msg2");
|
||||
assert.strictEqual(result3?.message, "msg3");
|
||||
assert.strictEqual(result4, undefined);
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
@@ -0,0 +1,27 @@
|
||||
import assert from "node:assert";
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { test } from "node:test";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { Worker } from "node:worker_threads";
|
||||
import stripAnsi from "strip-ansi";
|
||||
|
||||
const IS_CHILD = process.env.IS_CHILD === "true";
|
||||
|
||||
// At the time of writing, this test file passes in Node.js and fails in Bun.
|
||||
// Node.js seems to wait for the exit event to happen before the parent process
|
||||
// exits, which means that the Worker's exit code is printed to stdout
|
||||
|
||||
if (IS_CHILD) {
|
||||
const worker = new Worker("process.exit(1)", { eval: true });
|
||||
worker.on("exit", code => console.log(code));
|
||||
} else {
|
||||
test("The worker exit event is emitted before the parent exits", async () => {
|
||||
const file = fileURLToPath(import.meta.url);
|
||||
|
||||
const { stdout } = spawnSync(process.execPath, [file], {
|
||||
env: { ...process.env, IS_CHILD: "true" },
|
||||
});
|
||||
|
||||
assert.strictEqual(stripAnsi(stdout.toString()).trim(), "1");
|
||||
});
|
||||
}
|
||||
293
test/js/web/workers/worker-memory-leak.test.ts
Normal file
293
test/js/web/workers/worker-memory-leak.test.ts
Normal file
@@ -0,0 +1,293 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { Worker as WebWorker } from "worker_threads";
|
||||
|
||||
const CONFIG = {
|
||||
WARMUP_ITERATIONS: 2,
|
||||
TEST_ITERATIONS: 10,
|
||||
BATCH_SIZE: 5,
|
||||
MEMORY_THRESHOLD_MB: 20,
|
||||
GC_SETTLE_TIME: 50,
|
||||
TEST_TIMEOUT_MS: 15000,
|
||||
};
|
||||
|
||||
interface MemorySnapshot {
|
||||
rss: number;
|
||||
heapUsed: number;
|
||||
heapTotal: number;
|
||||
external: number;
|
||||
}
|
||||
|
||||
function takeMemorySnapshot(): MemorySnapshot {
|
||||
const mem = process.memoryUsage();
|
||||
return {
|
||||
rss: Math.round(mem.rss / 1024 / 1024),
|
||||
heapUsed: Math.round(mem.heapUsed / 1024 / 1024),
|
||||
heapTotal: Math.round(mem.heapTotal / 1024 / 1024),
|
||||
external: Math.round(mem.external / 1024 / 1024),
|
||||
};
|
||||
}
|
||||
|
||||
async function forceGCAndSettle(): Promise<void> {
|
||||
for (let i = 0; i < 2; i++) {
|
||||
Bun.gc(true);
|
||||
await Bun.sleep(CONFIG.GC_SETTLE_TIME);
|
||||
}
|
||||
}
|
||||
|
||||
function logMemoryDiff(before: MemorySnapshot, after: MemorySnapshot, label: string) {
|
||||
const rssDiff = after.rss - before.rss;
|
||||
const heapDiff = after.heapUsed - before.heapUsed;
|
||||
console.log(`${label}:`, {
|
||||
rss: `${before.rss}MB -> ${after.rss}MB (${rssDiff >= 0 ? "+" : ""}${rssDiff}MB)`,
|
||||
heap: `${before.heapUsed}MB -> ${after.heapUsed}MB (${heapDiff >= 0 ? "+" : ""}${heapDiff}MB)`,
|
||||
});
|
||||
|
||||
if (rssDiff > 50) {
|
||||
console.warn(`⚠️ Large memory increase detected: +${rssDiff}MB RSS`);
|
||||
}
|
||||
}
|
||||
|
||||
async function withTimeout<T>(promise: Promise<T>, ms: number, description: string): Promise<T> {
|
||||
const timeout = new Promise<never>((_, reject) => {
|
||||
setTimeout(() => reject(new Error(`${description} timed out after ${ms}ms`)), ms);
|
||||
});
|
||||
return Promise.race([promise, timeout]);
|
||||
}
|
||||
|
||||
async function runWorkerBatch(workerCode: string, batchSize: number = CONFIG.BATCH_SIZE): Promise<void> {
|
||||
const workers: WebWorker[] = [];
|
||||
|
||||
for (let i = 0; i < batchSize; i++) {
|
||||
const worker = new WebWorker(workerCode, { eval: true });
|
||||
workers.push(worker);
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
worker.removeAllListeners();
|
||||
reject(new Error(`Worker ${i} failed to respond within timeout`));
|
||||
}, 5000);
|
||||
|
||||
worker.once("message", msg => {
|
||||
clearTimeout(timeout);
|
||||
if (msg.error) {
|
||||
reject(new Error(msg.error));
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
await Promise.all(workers.map(worker => worker.terminate()));
|
||||
await forceGCAndSettle();
|
||||
}
|
||||
|
||||
describe("Worker Memory Leak Tests", () => {
|
||||
test(
|
||||
"workers should not leak memory with basic create/terminate cycles",
|
||||
async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
parentPort.postMessage('ready');
|
||||
`;
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
console.log(`Warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("Baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(`Test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "Basic create/terminate test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
test(
|
||||
"workers with HTTP activity should not leak memory",
|
||||
async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
fetch() {
|
||||
return new Response("OK");
|
||||
},
|
||||
});
|
||||
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
|
||||
async function doWork() {
|
||||
try {
|
||||
const response = await fetch('http://localhost:${server.port}');
|
||||
await response.text();
|
||||
parentPort.postMessage('done');
|
||||
} catch (err) {
|
||||
parentPort.postMessage({ error: err.message });
|
||||
}
|
||||
}
|
||||
|
||||
doWork();
|
||||
`;
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} HTTP warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
console.log(`HTTP warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("HTTP baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} HTTP test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(`HTTP test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "HTTP activity test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
test(
|
||||
"workers with message passing should not leak memory",
|
||||
async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
|
||||
parentPort.on('message', (msg) => {
|
||||
if (msg === 'start') {
|
||||
for (let j = 0; j < 10; j++) {
|
||||
parentPort.postMessage({ count: j, data: 'x'.repeat(1000) });
|
||||
}
|
||||
parentPort.postMessage('done');
|
||||
}
|
||||
});
|
||||
`;
|
||||
|
||||
async function runMessagePassingBatch(): Promise<void> {
|
||||
const workers: WebWorker[] = [];
|
||||
|
||||
for (let i = 0; i < CONFIG.BATCH_SIZE; i++) {
|
||||
const worker = new WebWorker(workerCode, { eval: true });
|
||||
workers.push(worker);
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
worker.on("message", msg => {
|
||||
if (msg === "done") {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
worker.postMessage("start");
|
||||
});
|
||||
}
|
||||
|
||||
await Promise.all(workers.map(worker => worker.terminate()));
|
||||
await forceGCAndSettle();
|
||||
}
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} message passing warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runMessagePassingBatch();
|
||||
console.log(`Message passing warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("Message passing baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} message passing test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runMessagePassingBatch();
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(
|
||||
`Message passing test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "Message passing test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
test(
|
||||
"workers with timers should not leak memory",
|
||||
async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
|
||||
const timers = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
timers.push(setTimeout(() => {}, 10000));
|
||||
timers.push(setInterval(() => {}, 1000));
|
||||
}
|
||||
|
||||
parentPort.postMessage('ready');
|
||||
`;
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} timer warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
console.log(`Timer warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("Timer baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} timer test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(`Timer test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "Timer cleanup test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
});
|
||||
@@ -342,12 +342,12 @@ describe("worker_threads", () => {
|
||||
const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
|
||||
smol: true,
|
||||
});
|
||||
await Bun.sleep(200);
|
||||
const code = await worker.terminate();
|
||||
expect(code).toBe(2);
|
||||
const [exitCode] = await once(worker, "exit");
|
||||
expect<number | undefined>(await worker.terminate()).toBe(undefined);
|
||||
expect<number | undefined>(exitCode).toBe(2);
|
||||
});
|
||||
|
||||
test.todo("worker terminating forcefully properly interrupts", async () => {
|
||||
test("worker terminating forcefully properly interrupts", async () => {
|
||||
const worker = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {});
|
||||
await new Promise<void>(done => {
|
||||
worker.on("message", () => done());
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"pg-gateway": "0.3.0-beta.4",
|
||||
"pino": "9.4.0",
|
||||
"pino-pretty": "11.2.2",
|
||||
"piscina": "5.0.0",
|
||||
"postgres": "3.3.5",
|
||||
"prisma": "5.1.1",
|
||||
"prompts": "2.4.2",
|
||||
|
||||
Reference in New Issue
Block a user