mirror of
https://github.com/oven-sh/bun
synced 2026-02-03 07:28:53 +00:00
Compare commits
186 Commits
dylan/pyth
...
ali/piscin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a45c2632c3 | ||
|
|
a5e5ea2e60 | ||
|
|
499201be48 | ||
|
|
7e557e10fd | ||
|
|
b6ee037172 | ||
|
|
9effd4df31 | ||
|
|
0c0dc471da | ||
|
|
41457fb37b | ||
|
|
c43a251d9c | ||
|
|
8d0eeba251 | ||
|
|
c354cfd86e | ||
|
|
16ad3d9f9d | ||
|
|
ca1e7022eb | ||
|
|
cbdd965643 | ||
|
|
ee68193de7 | ||
|
|
2210add1a8 | ||
|
|
019c0c12d0 | ||
|
|
ca8d9968fe | ||
|
|
1015e0f978 | ||
|
|
3b8abfb5e1 | ||
|
|
4720de1be4 | ||
|
|
f452ca7c36 | ||
|
|
9afa4249e1 | ||
|
|
a8db2da0c9 | ||
|
|
b7b2fbe2a4 | ||
|
|
36f98aad85 | ||
|
|
d038264002 | ||
|
|
bc3af0c0f5 | ||
|
|
853cbf42ee | ||
|
|
cae404e99b | ||
|
|
976052e541 | ||
|
|
501088189a | ||
|
|
6ad7b8a8b7 | ||
|
|
80684fe9c6 | ||
|
|
2ade52c24e | ||
|
|
1b11a433eb | ||
|
|
0c86f6dcc9 | ||
|
|
e4a780ac64 | ||
|
|
6ffcc53014 | ||
|
|
8c15bde769 | ||
|
|
04353da520 | ||
|
|
a9b9276ed5 | ||
|
|
3598b9401d | ||
|
|
5c754dc8cb | ||
|
|
96c18bdf26 | ||
|
|
e34d7592fd | ||
|
|
8b6aad0490 | ||
|
|
b3f8adad0b | ||
|
|
1963ddab8c | ||
|
|
26e6cd8d1f | ||
|
|
9981c87c56 | ||
|
|
222ff4d7b4 | ||
|
|
6550f8c251 | ||
|
|
5719568ec0 | ||
|
|
80a23c7142 | ||
|
|
975caa75b1 | ||
|
|
8143c0690c | ||
|
|
cdb7368151 | ||
|
|
041c62efbc | ||
|
|
18653f2a65 | ||
|
|
5d8d430a14 | ||
|
|
28bf87f586 | ||
|
|
30d546b7aa | ||
|
|
41763a0a79 | ||
|
|
2f671aacdc | ||
|
|
b72e83cd5a | ||
|
|
8dbaccf5e1 | ||
|
|
79b56ca9c3 | ||
|
|
77640c6290 | ||
|
|
252efe8dc5 | ||
|
|
4b015cf585 | ||
|
|
dc3bbdab96 | ||
|
|
e3d3015d47 | ||
|
|
9b6f4822f0 | ||
|
|
49d1090dbb | ||
|
|
8a0c11c331 | ||
|
|
637325f4b4 | ||
|
|
7df0e39660 | ||
|
|
f5b2d2181f | ||
|
|
35f69d9001 | ||
|
|
3bee77904a | ||
|
|
5bfed41d1a | ||
|
|
461c9db98a | ||
|
|
363edf7b36 | ||
|
|
ca18b5e410 | ||
|
|
92c19a2dd3 | ||
|
|
7be4234c0e | ||
|
|
7499f99fc8 | ||
|
|
9ffc850887 | ||
|
|
468e2b9ee4 | ||
|
|
bc743a9766 | ||
|
|
8a2fa506bb | ||
|
|
e6bc9fbb77 | ||
|
|
9f4157d934 | ||
|
|
8980ec7829 | ||
|
|
7bfdcd8da2 | ||
|
|
fa5a6d4380 | ||
|
|
b6026d75df | ||
|
|
98c265fd36 | ||
|
|
9df2b09768 | ||
|
|
a951a85d82 | ||
|
|
963ba1b632 | ||
|
|
84bb592f87 | ||
|
|
db0de23737 | ||
|
|
5b9abfa756 | ||
|
|
696343b9fd | ||
|
|
799d7058d9 | ||
|
|
2066644e16 | ||
|
|
224c9bfc05 | ||
|
|
978b94644c | ||
|
|
48734ac20b | ||
|
|
7fcf9095c3 | ||
|
|
4e451f9521 | ||
|
|
d23afd6c9b | ||
|
|
c9e92e4a5a | ||
|
|
ecf262f250 | ||
|
|
89ef4c6b52 | ||
|
|
2420b0726d | ||
|
|
2ce5b9eab7 | ||
|
|
d94996bf68 | ||
|
|
fbb21b8ebc | ||
|
|
b4282814ff | ||
|
|
833ee60b5c | ||
|
|
0793881e56 | ||
|
|
b2d0899442 | ||
|
|
182470909e | ||
|
|
bc39cd2806 | ||
|
|
5fac9749e1 | ||
|
|
07fc765d9c | ||
|
|
b50d5dfc83 | ||
|
|
b77139967e | ||
|
|
eacd40daab | ||
|
|
09cbcfe20a | ||
|
|
1b332ec147 | ||
|
|
da657db887 | ||
|
|
2a09ad8e75 | ||
|
|
922bc0e0b3 | ||
|
|
c59fd309c4 | ||
|
|
4bf3836f0b | ||
|
|
bf0510aea4 | ||
|
|
7bb256ef4f | ||
|
|
35a528fb99 | ||
|
|
8a8a23c5d6 | ||
|
|
10551fcc90 | ||
|
|
2c0f8f29b6 | ||
|
|
0d9ca788c6 | ||
|
|
832b74a239 | ||
|
|
cd67a3a024 | ||
|
|
5f0c054160 | ||
|
|
3e64050b67 | ||
|
|
5326514afc | ||
|
|
a2bd8ea622 | ||
|
|
836fbfefe8 | ||
|
|
b105877790 | ||
|
|
daf21ad44d | ||
|
|
2d01a3d1e7 | ||
|
|
a10e268584 | ||
|
|
3bc5724ff4 | ||
|
|
b6d4a15496 | ||
|
|
1d1e88faa7 | ||
|
|
54788db012 | ||
|
|
59d2941e25 | ||
|
|
9276453ca4 | ||
|
|
c192b8e474 | ||
|
|
b0f756777c | ||
|
|
6640b4dde4 | ||
|
|
a9a4d483aa | ||
|
|
fe95423dd8 | ||
|
|
5e79575215 | ||
|
|
9f74448006 | ||
|
|
93650f7317 | ||
|
|
aa32166827 | ||
|
|
72e7177fff | ||
|
|
63b9d04a38 | ||
|
|
006cdf353b | ||
|
|
919454ea30 | ||
|
|
b736d6b364 | ||
|
|
39f9aeb500 | ||
|
|
0f5e9b2dcd | ||
|
|
4c44a553d3 | ||
|
|
2ec4f39747 | ||
|
|
236ff12a2d | ||
|
|
b1e5826ad6 | ||
|
|
b06997e2b1 | ||
|
|
4e6c44d5bc | ||
|
|
1884a8197e |
@@ -1,6 +1,6 @@
|
||||
---
|
||||
description:
|
||||
globs: src/**/*.cpp,src/**/*.zig
|
||||
description: How to build Bun
|
||||
globs: **/*.zig,**/*.ts,**/*.cpp,**/*.c,src/**/*.cpp,src/**/*.zig
|
||||
alwaysApply: false
|
||||
---
|
||||
|
||||
@@ -23,6 +23,12 @@ bun bd <file> <...args>
|
||||
|
||||
**CRITICAL**: Never use `bun <file>` directly. It will not have your changes.
|
||||
|
||||
You can also pass arguments to `bun bd` as if it were a regular build of bun. For example:
|
||||
|
||||
```bash
|
||||
bun bd test math-utils.test.ts
|
||||
```
|
||||
|
||||
### Logging
|
||||
|
||||
`BUN_DEBUG_$(SCOPE)=1` enables debug logs for a specific debug log scope.
|
||||
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1,3 +1,7 @@
|
||||
.cursor/rules/vibe-tools.mdc
|
||||
vibe-tools.config.json
|
||||
.repomix-output.txt
|
||||
repomix.config.json
|
||||
.claude/settings.local.json
|
||||
.DS_Store
|
||||
.env
|
||||
|
||||
@@ -1703,6 +1703,10 @@ pub fn spawnMaybeSync(
|
||||
defer {
|
||||
jsc_vm.uwsLoop().internal_loop_data.jsc_vm = old_vm;
|
||||
}
|
||||
|
||||
jsc_vm.eventLoop().is_inside_spawn_sync = true;
|
||||
defer jsc_vm.eventLoop().is_inside_spawn_sync = false;
|
||||
|
||||
while (subprocess.hasPendingActivityNonThreadsafe()) {
|
||||
if (subprocess.stdin == .buffer) {
|
||||
subprocess.stdin.buffer.watch();
|
||||
|
||||
@@ -57,7 +57,7 @@ static const DOMException::Description descriptions[] = {
|
||||
{ "QuotaExceededError"_s, "The quota has been exceeded."_s, 22 },
|
||||
{ "TimeoutError"_s, "The operation timed out."_s, 23 },
|
||||
{ "InvalidNodeTypeError"_s, "The supplied node is incorrect or has an incorrect ancestor for this operation."_s, 24 },
|
||||
{ "DataCloneError"_s, "The object can not be cloned."_s, 25 },
|
||||
{ "DataCloneError"_s, "The object could not be cloned."_s, 25 }, // TODO: This should include an inspection of the object (e.g. "DOMException [DataCloneError]: hello () {} could not be cloned.")
|
||||
{ "EncodingError"_s, "The encoding operation (either encoded or decoding) failed."_s, 0 },
|
||||
{ "NotReadableError"_s, "The I/O read operation failed."_s, 0 },
|
||||
{ "UnknownError"_s, "The operation failed for an unknown transient reason (e.g. out of memory)."_s, 0 },
|
||||
|
||||
@@ -600,9 +600,8 @@ pub const JSGlobalObject = opaque {
|
||||
return @as(*jsc.VirtualMachine, @ptrCast(@alignCast(this.bunVMUnsafe())));
|
||||
}
|
||||
|
||||
extern fn JSC__JSGlobalObject__handleRejectedPromises(*JSGlobalObject) void;
|
||||
pub fn handleRejectedPromises(this: *JSGlobalObject) void {
|
||||
return bun.jsc.fromJSHostCallGeneric(this, @src(), JSC__JSGlobalObject__handleRejectedPromises, .{this}) catch @panic("unreachable");
|
||||
return bun.cpp.JSC__JSGlobalObject__handleRejectedPromises(this);
|
||||
}
|
||||
|
||||
extern fn ZigGlobalObject__readableStreamToArrayBuffer(*JSGlobalObject, JSValue) JSValue;
|
||||
|
||||
@@ -383,6 +383,19 @@ void ScriptExecutionContext::postTask(EventLoopTask* task)
|
||||
static_cast<Zig::GlobalObject*>(m_globalObject)->queueTask(task);
|
||||
}
|
||||
|
||||
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
|
||||
void ScriptExecutionContext::queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda)
|
||||
{
|
||||
auto* task = new EventLoopTask(WTFMove(lambda));
|
||||
queueImmediateCppTask(task);
|
||||
}
|
||||
|
||||
void ScriptExecutionContext::queueImmediateCppTask(EventLoopTask* task)
|
||||
{
|
||||
Bun__queueImmediateCppTask(m_globalObject, task);
|
||||
}
|
||||
|
||||
// Zig bindings
|
||||
extern "C" ScriptExecutionContextIdentifier ScriptExecutionContextIdentifier__forGlobalObject(JSC::JSGlobalObject* globalObject)
|
||||
{
|
||||
|
||||
@@ -131,6 +131,9 @@ public:
|
||||
// Executes the task on context's thread asynchronously.
|
||||
void postTask(EventLoopTask* task);
|
||||
|
||||
void queueImmediateCppTask(Function<void(ScriptExecutionContext&)>&& lambda);
|
||||
void queueImmediateCppTask(EventLoopTask* task);
|
||||
|
||||
template<typename... Arguments>
|
||||
void postCrossThreadTask(Arguments&&... arguments)
|
||||
{
|
||||
@@ -153,6 +156,14 @@ public:
|
||||
|
||||
static ScriptExecutionContext* getMainThreadScriptExecutionContext();
|
||||
|
||||
bool canSendMessage()
|
||||
{
|
||||
static constexpr size_t maxMessagesPerTick = 1000;
|
||||
return m_messagesSentThisTick < maxMessagesPerTick;
|
||||
}
|
||||
void incrementMessageCount() { m_messagesSentThisTick++; }
|
||||
void resetMessageCount() { m_messagesSentThisTick = 0; }
|
||||
|
||||
private:
|
||||
JSC::VM* m_vm = nullptr;
|
||||
JSC::JSGlobalObject* m_globalObject = nullptr;
|
||||
@@ -165,6 +176,7 @@ private:
|
||||
LazyRef<ScriptExecutionContext, BunBroadcastChannelRegistry> m_broadcastChannelRegistry;
|
||||
|
||||
bool m_willProcessMessageWithMessagePortsSoon { false };
|
||||
size_t m_messagesSentThisTick { 0 };
|
||||
|
||||
us_socket_context_t* webSocketContextSSL();
|
||||
us_socket_context_t* webSocketContextNoSSL();
|
||||
|
||||
@@ -2933,6 +2933,7 @@ extern "C" void JSGlobalObject__clearTerminationException(JSC::JSGlobalObject* g
|
||||
}
|
||||
|
||||
extern "C" void Bun__queueTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
extern "C" void Bun__queueImmediateCppTask(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
extern "C" void Bun__queueTaskConcurrently(JSC::JSGlobalObject*, WebCore::EventLoopTask* task);
|
||||
extern "C" [[ZIG_EXPORT(check_slow)]] void Bun__performTask(Zig::GlobalObject* globalObject, WebCore::EventLoopTask* task)
|
||||
{
|
||||
@@ -2956,6 +2957,11 @@ void GlobalObject::queueTask(WebCore::EventLoopTask* task)
|
||||
Bun__queueTask(this, task);
|
||||
}
|
||||
|
||||
void GlobalObject::queueImmediateCppTask(WebCore::EventLoopTask* task)
|
||||
{
|
||||
Bun__queueImmediateCppTask(this, task);
|
||||
}
|
||||
|
||||
void GlobalObject::queueTaskConcurrently(WebCore::EventLoopTask* task)
|
||||
{
|
||||
Bun__queueTaskConcurrently(this, task);
|
||||
@@ -2972,7 +2978,10 @@ void GlobalObject::handleRejectedPromises()
|
||||
continue;
|
||||
|
||||
Bun__handleRejectedPromise(this, promise);
|
||||
if (auto ex = scope.exception()) this->reportUncaughtExceptionAtEventLoop(this, ex);
|
||||
if (auto ex = scope.exception()) {
|
||||
scope.clearException();
|
||||
this->reportUncaughtExceptionAtEventLoop(this, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -171,6 +171,7 @@ public:
|
||||
|
||||
void queueTask(WebCore::EventLoopTask* task);
|
||||
void queueTaskConcurrently(WebCore::EventLoopTask* task);
|
||||
void queueImmediateCppTask(WebCore::EventLoopTask* task);
|
||||
|
||||
JSDOMStructureMap& structures() WTF_REQUIRES_LOCK(m_gcLock) { return m_structures; }
|
||||
JSDOMStructureMap& structures(NoLockingNecessaryTag) WTF_IGNORES_THREAD_SAFETY_ANALYSIS
|
||||
|
||||
@@ -3667,6 +3667,7 @@ JSC::EncodedJSValue JSC__JSGlobalObject__generateHeapSnapshot(JSC::JSGlobalObjec
|
||||
|
||||
JSC::VM* JSC__JSGlobalObject__vm(JSC::JSGlobalObject* arg0) { return &arg0->vm(); };
|
||||
|
||||
[[ZIG_EXPORT(nothrow)]]
|
||||
void JSC__JSGlobalObject__handleRejectedPromises(JSC::JSGlobalObject* arg0)
|
||||
{
|
||||
return jsCast<Zig::GlobalObject*>(arg0)->handleRejectedPromises();
|
||||
|
||||
@@ -243,6 +243,71 @@ void MessagePort::close()
|
||||
removeAllEventListeners();
|
||||
}
|
||||
|
||||
void MessagePort::processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback)
|
||||
{
|
||||
auto& vm = context.vm();
|
||||
auto* globalObject = defaultGlobalObject(context.globalObject());
|
||||
|
||||
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running) {
|
||||
completionCallback();
|
||||
return;
|
||||
}
|
||||
|
||||
Vector<MessageWithMessagePorts> deferredMessages;
|
||||
|
||||
for (auto&& message : messages) {
|
||||
if (!context.canSendMessage()) {
|
||||
deferredMessages.append(WTFMove(message));
|
||||
continue;
|
||||
}
|
||||
|
||||
context.incrementMessageCount();
|
||||
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
auto ports = MessagePort::entanglePorts(context, WTFMove(message.transferredPorts));
|
||||
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
|
||||
dispatchEvent(event.event);
|
||||
|
||||
if (scope.exception()) [[unlikely]] {
|
||||
RELEASE_ASSERT(vm.hasPendingTerminationException());
|
||||
return;
|
||||
}
|
||||
|
||||
// if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running) {
|
||||
globalObject->drainMicrotasks();
|
||||
// }
|
||||
}
|
||||
|
||||
if (!deferredMessages.isEmpty()) {
|
||||
auto* globalObject = defaultGlobalObject(context.globalObject());
|
||||
auto scriptExecutionStatus = Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject);
|
||||
|
||||
if (scriptExecutionStatus != ScriptExecutionStatus::Running || context.isJSExecutionForbidden()) {
|
||||
completionCallback();
|
||||
return;
|
||||
}
|
||||
|
||||
// remaining messages should happen on the next on the immediate cpp task queue
|
||||
auto contextIdentifier = context.identifier();
|
||||
context.queueImmediateCppTask(
|
||||
[protectedThis = Ref { *this }, contextIdentifier, deferred = WTFMove(deferredMessages), completionCallback = WTFMove(completionCallback)](ScriptExecutionContext& ctx) mutable {
|
||||
if (auto* validContext = ScriptExecutionContext::getScriptExecutionContext(contextIdentifier)) {
|
||||
if (validContext == &ctx && !validContext->activeDOMObjectsAreSuspended() && protectedThis->isEntangled()) {
|
||||
// then reset for next tick
|
||||
ctx.resetMessageCount();
|
||||
protectedThis->processMessages(ctx, WTFMove(deferred), WTFMove(completionCallback));
|
||||
return;
|
||||
}
|
||||
}
|
||||
// context was destroyed or conditions not met, just complete
|
||||
completionCallback();
|
||||
});
|
||||
} else {
|
||||
completionCallback();
|
||||
}
|
||||
}
|
||||
|
||||
void MessagePort::dispatchMessages()
|
||||
{
|
||||
// Messages for contexts that are not fully active get dispatched too, but JSAbstractEventListener::handleEvent() doesn't call handlers for these.
|
||||
@@ -254,41 +319,14 @@ void MessagePort::dispatchMessages()
|
||||
return;
|
||||
|
||||
auto messagesTakenHandler = [this, protectedThis = Ref { *this }](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
|
||||
auto scopeExit = makeScopeExit(WTFMove(completionCallback));
|
||||
|
||||
// LOG(MessagePorts, "MessagePort %s (%p) dispatching %zu messages", m_identifier.logString().utf8().data(), this, messages.size());
|
||||
|
||||
RefPtr<ScriptExecutionContext> context = scriptExecutionContext();
|
||||
if (!context || !context->globalObject())
|
||||
if (!context || !context->globalObject()) {
|
||||
completionCallback();
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(context->isContextThread());
|
||||
auto* globalObject = defaultGlobalObject(context->globalObject());
|
||||
Ref vm = globalObject->vm();
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
for (auto& message : messages) {
|
||||
// close() in Worker onmessage handler should prevent next message from dispatching.
|
||||
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running)
|
||||
return;
|
||||
|
||||
auto ports = MessagePort::entanglePorts(*context, WTFMove(message.transferredPorts));
|
||||
if (scope.exception()) [[unlikely]] {
|
||||
// Currently, we assume that the only way we can get here is if we have a termination.
|
||||
RELEASE_ASSERT(vm->hasPendingTerminationException());
|
||||
return;
|
||||
}
|
||||
|
||||
// Per specification, each MessagePort object has a task source called the port message queue.
|
||||
// queueTaskKeepingObjectAlive(context, *this, TaskSource::PostedMessageQueue, [this, event = WTFMove(event)] {
|
||||
// dispatchEvent(event.event);
|
||||
// });
|
||||
|
||||
ScriptExecutionContext::postTaskTo(context->identifier(), [protectedThis = Ref { *this }, ports = WTFMove(ports), message = WTFMove(message)](ScriptExecutionContext& context) mutable {
|
||||
auto event = MessageEvent::create(*context.jsGlobalObject(), message.message.releaseNonNull(), {}, {}, {}, WTFMove(ports));
|
||||
protectedThis->dispatchEvent(event.event);
|
||||
});
|
||||
}
|
||||
processMessages(*context, WTFMove(messages), WTFMove(completionCallback));
|
||||
};
|
||||
|
||||
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(m_identifier, WTFMove(messagesTakenHandler));
|
||||
|
||||
@@ -144,9 +144,9 @@ private:
|
||||
MessagePortIdentifier m_remoteIdentifier;
|
||||
|
||||
mutable std::atomic<unsigned> m_refCount { 1 };
|
||||
void processMessages(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback);
|
||||
|
||||
bool m_hasRef { false };
|
||||
|
||||
uint32_t m_messageEventCount { 0 };
|
||||
static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
|
||||
};
|
||||
|
||||
@@ -69,8 +69,8 @@ namespace WebCore {
|
||||
|
||||
WTF_MAKE_TZONE_ALLOCATED_IMPL(Worker);
|
||||
|
||||
extern "C" void WebWorker__notifyNeedTermination(
|
||||
void* worker);
|
||||
extern "C" void WebWorkerLifecycleHandle__requestTermination(WebWorkerLifecycleHandle* worker);
|
||||
extern "C" void WebWorkerLifecycleHandle__release(WebWorkerLifecycleHandle* worker);
|
||||
|
||||
static Lock allWorkersLock;
|
||||
static HashMap<ScriptExecutionContextIdentifier, Worker*>& allWorkers() WTF_REQUIRES_LOCK(allWorkersLock)
|
||||
@@ -109,7 +109,7 @@ Worker::Worker(ScriptExecutionContext& context, WorkerOptions&& options)
|
||||
ASSERT_UNUSED(addResult, addResult.isNewEntry);
|
||||
}
|
||||
extern "C" bool WebWorker__updatePtr(void* worker, Worker* ptr);
|
||||
extern "C" void* WebWorker__create(
|
||||
extern "C" WebWorkerLifecycleHandle* WebWorkerLifecycleHandle__createWebWorker(
|
||||
Worker* worker,
|
||||
void* parent,
|
||||
BunString name,
|
||||
@@ -133,12 +133,12 @@ extern "C" void WebWorker__setRef(
|
||||
|
||||
void Worker::setKeepAlive(bool keepAlive)
|
||||
{
|
||||
WebWorker__setRef(impl_, keepAlive);
|
||||
WebWorker__setRef(lifecycleHandle_, keepAlive);
|
||||
}
|
||||
|
||||
bool Worker::updatePtr()
|
||||
{
|
||||
if (!WebWorker__updatePtr(impl_, this)) {
|
||||
if (!WebWorker__updatePtr(lifecycleHandle_, this)) {
|
||||
m_onlineClosingFlags = ClosingFlag;
|
||||
m_terminationFlags.fetch_or(TerminatedFlag);
|
||||
return false;
|
||||
@@ -189,7 +189,7 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
|
||||
return { reinterpret_cast<WTF::StringImpl**>(vec.begin()), vec.size() };
|
||||
})
|
||||
.value_or(std::span<WTF::StringImpl*> {});
|
||||
void* impl = WebWorker__create(
|
||||
WebWorkerLifecycleHandle* lifecycleHandle = WebWorkerLifecycleHandle__createWebWorker(
|
||||
worker.ptr(),
|
||||
bunVM(context.jsGlobalObject()),
|
||||
nameStr,
|
||||
@@ -212,11 +212,11 @@ ExceptionOr<Ref<Worker>> Worker::create(ScriptExecutionContext& context, const S
|
||||
|
||||
preloadModuleStrings.clear();
|
||||
|
||||
if (!impl) {
|
||||
if (!lifecycleHandle) {
|
||||
return Exception { TypeError, errorMessage.toWTFString(BunString::ZeroCopy) };
|
||||
}
|
||||
|
||||
worker->impl_ = impl;
|
||||
worker->lifecycleHandle_ = lifecycleHandle;
|
||||
worker->m_workerCreationTime = MonotonicTime::now();
|
||||
|
||||
return worker;
|
||||
@@ -228,6 +228,14 @@ Worker::~Worker()
|
||||
Locker locker { allWorkersLock };
|
||||
allWorkers().remove(m_clientIdentifier);
|
||||
}
|
||||
|
||||
if (lifecycleHandle_) {
|
||||
auto* impl = lifecycleHandle_;
|
||||
lifecycleHandle_ = nullptr;
|
||||
WebWorkerLifecycleHandle__requestTermination(impl);
|
||||
// release our reference back in web_worker.zig
|
||||
WebWorkerLifecycleHandle__release(impl);
|
||||
}
|
||||
// m_contextProxy.workerObjectDestroyed();
|
||||
}
|
||||
|
||||
@@ -261,9 +269,11 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
|
||||
|
||||
void Worker::terminate()
|
||||
{
|
||||
// m_contextProxy.terminateWorkerGlobalScope();
|
||||
m_terminationFlags.fetch_or(TerminateRequestedFlag);
|
||||
WebWorker__notifyNeedTermination(impl_);
|
||||
|
||||
auto* impl = lifecycleHandle_;
|
||||
lifecycleHandle_ = nullptr;
|
||||
WebWorkerLifecycleHandle__requestTermination(impl);
|
||||
}
|
||||
|
||||
// const char* Worker::activeDOMObjectName() const
|
||||
@@ -468,6 +478,7 @@ void Worker::forEachWorker(const Function<Function<void(ScriptExecutionContext&)
|
||||
extern "C" void WebWorker__dispatchExit(Zig::GlobalObject* globalObject, Worker* worker, int32_t exitCode)
|
||||
{
|
||||
worker->dispatchExit(exitCode);
|
||||
|
||||
// no longer referenced by Zig
|
||||
worker->deref();
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ class WorkerGlobalScopeProxy;
|
||||
|
||||
struct StructuredSerializeOptions;
|
||||
struct WorkerOptions;
|
||||
struct WebWorkerLifecycleHandle;
|
||||
|
||||
class Worker final : public ThreadSafeRefCounted<Worker>, public EventTargetWithInlineData, private ContextDestructionObserver {
|
||||
WTF_MAKE_TZONE_ALLOCATED(Worker);
|
||||
@@ -119,7 +120,7 @@ private:
|
||||
// Tracks TerminateRequestedFlag and TerminatedFlag
|
||||
std::atomic<uint8_t> m_terminationFlags { 0 };
|
||||
const ScriptExecutionContextIdentifier m_clientIdentifier;
|
||||
void* impl_ { nullptr };
|
||||
WebWorkerLifecycleHandle* lifecycleHandle_ { nullptr };
|
||||
};
|
||||
|
||||
JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject);
|
||||
|
||||
@@ -12,7 +12,9 @@ tasks: Queue = undefined,
|
||||
///
|
||||
/// Having two queues avoids infinite loops creating by calling `setImmediate` in a `setImmediate` callback.
|
||||
immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
|
||||
immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
|
||||
next_immediate_tasks: std.ArrayListUnmanaged(*Timer.ImmediateObject) = .{},
|
||||
next_immediate_cpp_tasks: std.ArrayListUnmanaged(*CppTask) = .{},
|
||||
|
||||
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
|
||||
global: *jsc.JSGlobalObject = undefined,
|
||||
@@ -29,6 +31,13 @@ imminent_gc_timer: std.atomic.Value(?*Timer.WTFTimer) = .{ .raw = null },
|
||||
|
||||
signal_handler: if (Environment.isPosix) ?*PosixSignalHandle else void = if (Environment.isPosix) null,
|
||||
|
||||
// this exists because while we're inside a spawnSync call, some tasks can actually
|
||||
// still complete which leads to a case where module resolution can partially complete and
|
||||
// some modules are only partialy evaluated which causes reference errors.
|
||||
// TODO: A better fix here could be a second event loop so we can come off the main one
|
||||
// while processing spawnSync, then resume back to here afterwards
|
||||
is_inside_spawn_sync: bool = false,
|
||||
|
||||
pub const Debug = if (Environment.isDebug) struct {
|
||||
is_inside_tick_queue: bool = false,
|
||||
js_call_count_outside_tick_queue: usize = 0,
|
||||
@@ -126,6 +135,12 @@ const DrainMicrotasksResult = enum(u8) {
|
||||
extern fn JSC__JSGlobalObject__drainMicrotasks(*jsc.JSGlobalObject) DrainMicrotasksResult;
|
||||
pub fn drainMicrotasksWithGlobal(this: *EventLoop, globalObject: *jsc.JSGlobalObject, jsc_vm: *jsc.VM) bun.JSTerminated!void {
|
||||
jsc.markBinding(@src());
|
||||
|
||||
// see is_inside_spawn_sync doc comment
|
||||
if (this.is_inside_spawn_sync) {
|
||||
return;
|
||||
}
|
||||
|
||||
jsc_vm.releaseWeakRefs();
|
||||
|
||||
switch (JSC__JSGlobalObject__drainMicrotasks(globalObject)) {
|
||||
@@ -220,10 +235,19 @@ fn tickWithCount(this: *EventLoop, virtual_machine: *VirtualMachine) u32 {
|
||||
|
||||
pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) void {
|
||||
var to_run_now = this.immediate_tasks;
|
||||
var to_run_now_cpp = this.immediate_cpp_tasks;
|
||||
|
||||
this.immediate_tasks = this.next_immediate_tasks;
|
||||
this.next_immediate_tasks = .{};
|
||||
|
||||
this.immediate_cpp_tasks = this.next_immediate_cpp_tasks;
|
||||
this.next_immediate_cpp_tasks = .{};
|
||||
|
||||
for (to_run_now_cpp.items) |task| {
|
||||
log("running immediate cpp task", .{});
|
||||
task.run(virtual_machine.global);
|
||||
}
|
||||
|
||||
var exception_thrown = false;
|
||||
for (to_run_now.items) |task| {
|
||||
exception_thrown = task.runImmediateTask(virtual_machine);
|
||||
@@ -234,6 +258,22 @@ pub fn tickImmediateTasks(this: *EventLoop, virtual_machine: *VirtualMachine) vo
|
||||
this.maybeDrainMicrotasks();
|
||||
}
|
||||
|
||||
if (this.next_immediate_cpp_tasks.capacity > 0) {
|
||||
// this would only occur if we were recursively running tickImmediateTasks.
|
||||
@branchHint(.unlikely);
|
||||
this.immediate_cpp_tasks.appendSlice(bun.default_allocator, this.next_immediate_cpp_tasks.items) catch bun.outOfMemory();
|
||||
this.next_immediate_cpp_tasks.deinit(bun.default_allocator);
|
||||
}
|
||||
|
||||
if (to_run_now_cpp.capacity > 1024 * 128) {
|
||||
// once in a while, deinit the array to free up memory
|
||||
to_run_now_cpp.clearAndFree(bun.default_allocator);
|
||||
} else {
|
||||
to_run_now_cpp.clearRetainingCapacity();
|
||||
}
|
||||
|
||||
this.next_immediate_cpp_tasks = to_run_now_cpp;
|
||||
|
||||
if (this.next_immediate_tasks.capacity > 0) {
|
||||
// this would only occur if we were recursively running tickImmediateTasks.
|
||||
@branchHint(.unlikely);
|
||||
@@ -549,6 +589,10 @@ pub fn enqueueTask(this: *EventLoop, task: Task) void {
|
||||
this.tasks.writeItem(task) catch unreachable;
|
||||
}
|
||||
|
||||
pub fn enqueueImmediateCppTask(this: *EventLoop, task: *CppTask) void {
|
||||
this.immediate_cpp_tasks.append(bun.default_allocator, task) catch bun.outOfMemory();
|
||||
}
|
||||
|
||||
pub fn enqueueImmediateTask(this: *EventLoop, task: *Timer.ImmediateObject) void {
|
||||
bun.handleOom(this.immediate_tasks.append(bun.default_allocator, task));
|
||||
}
|
||||
|
||||
@@ -283,14 +283,14 @@ fn setCwd_(globalObject: *jsc.JSGlobalObject, to: *jsc.ZigString) bun.JSError!js
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(@190n) this may need to be noreturn
|
||||
pub fn exit(globalObject: *jsc.JSGlobalObject, code: u8) callconv(.c) void {
|
||||
var vm = globalObject.bunVM();
|
||||
vm.exit_handler.exit_code = code;
|
||||
if (vm.worker) |worker| {
|
||||
// TODO(@190n) we may need to use requestTerminate or throwTerminationException
|
||||
// instead to terminate the worker sooner
|
||||
// sets exit_called and requests termination
|
||||
worker.exit();
|
||||
// and then fire the trap to immediately interrupt js execution
|
||||
vm.jsc_vm.notifyNeedTermination();
|
||||
} else {
|
||||
vm.onExit();
|
||||
vm.globalExit();
|
||||
|
||||
@@ -78,6 +78,12 @@ pub export fn Bun__queueTask(global: *JSGlobalObject, task: *jsc.CppTask) void {
|
||||
global.bunVM().eventLoop().enqueueTask(jsc.Task.init(task));
|
||||
}
|
||||
|
||||
pub export fn Bun__queueImmediateCppTask(global: *JSGlobalObject, task: *jsc.CppTask) void {
|
||||
jsc.markBinding(@src());
|
||||
|
||||
global.bunVM().eventLoop().enqueueImmediateCppTask(task);
|
||||
}
|
||||
|
||||
pub export fn Bun__reportUnhandledError(globalObject: *JSGlobalObject, value: JSValue) callconv(.C) JSValue {
|
||||
jsc.markBinding(@src());
|
||||
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
//! Shared implementation of Web and Node `Worker`
|
||||
|
||||
const WebWorker = @This();
|
||||
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", destroy, .{});
|
||||
pub const new = bun.TrivialNew(@This());
|
||||
pub const ref = RefCount.ref;
|
||||
pub const deref = RefCount.deref;
|
||||
|
||||
const log = Output.scoped(.Worker, .hidden);
|
||||
|
||||
@@ -8,11 +12,13 @@ const log = Output.scoped(.Worker, .hidden);
|
||||
vm: ?*jsc.VirtualMachine = null,
|
||||
status: std.atomic.Value(Status) = .init(.start),
|
||||
/// To prevent UAF, the `spin` function (aka the worker's event loop) will call deinit once this is set and properly exit the loop.
|
||||
requested_terminate: std.atomic.Value(bool) = .init(false),
|
||||
execution_context_id: u32 = 0,
|
||||
parent_context_id: u32 = 0,
|
||||
parent: *jsc.VirtualMachine,
|
||||
|
||||
ref_count: RefCount,
|
||||
lifecycle_handle: *WebWorkerLifecycleHandle,
|
||||
|
||||
/// To be resolved on the Worker thread at startup, in spin().
|
||||
unresolved_specifier: []const u8,
|
||||
preloads: [][]const u8 = &.{},
|
||||
@@ -58,14 +64,11 @@ export fn WebWorker__getParentWorker(vm: *jsc.VirtualMachine) ?*anyopaque {
|
||||
}
|
||||
|
||||
pub fn hasRequestedTerminate(this: *const WebWorker) bool {
|
||||
return this.requested_terminate.load(.monotonic);
|
||||
return this.lifecycle_handle.worker.load(.acquire) == null;
|
||||
}
|
||||
|
||||
pub fn setRequestedTerminate(this: *WebWorker) bool {
|
||||
return this.requested_terminate.swap(true, .release);
|
||||
}
|
||||
|
||||
export fn WebWorker__updatePtr(worker: *WebWorker, ptr: *anyopaque) bool {
|
||||
export fn WebWorker__updatePtr(handle: *WebWorkerLifecycleHandle, ptr: *anyopaque) bool {
|
||||
const worker = handle.worker.load(.acquire).?;
|
||||
worker.cpp_worker = ptr;
|
||||
|
||||
var thread = std.Thread.spawn(
|
||||
@@ -73,7 +76,7 @@ export fn WebWorker__updatePtr(worker: *WebWorker, ptr: *anyopaque) bool {
|
||||
startWithErrorHandling,
|
||||
.{worker},
|
||||
) catch {
|
||||
worker.deinit();
|
||||
worker.destroy();
|
||||
return false;
|
||||
};
|
||||
thread.detach();
|
||||
@@ -203,6 +206,7 @@ pub fn create(
|
||||
execArgv_len: usize,
|
||||
preload_modules_ptr: ?[*]bun.String,
|
||||
preload_modules_len: usize,
|
||||
lifecycle_handle: *WebWorkerLifecycleHandle,
|
||||
) callconv(.c) ?*WebWorker {
|
||||
jsc.markBinding(@src());
|
||||
log("[{d}] WebWorker.create", .{this_context_id});
|
||||
@@ -233,8 +237,9 @@ pub fn create(
|
||||
}
|
||||
}
|
||||
|
||||
var worker = bun.handleOom(bun.default_allocator.create(WebWorker));
|
||||
worker.* = WebWorker{
|
||||
var worker = bun.new(WebWorker, WebWorker{
|
||||
.lifecycle_handle = lifecycle_handle,
|
||||
.ref_count = .init(),
|
||||
.cpp_worker = cpp_worker,
|
||||
.parent = parent,
|
||||
.parent_context_id = parent_context_id,
|
||||
@@ -254,10 +259,10 @@ pub fn create(
|
||||
.argv = if (argv_ptr) |ptr| ptr[0..argv_len] else &.{},
|
||||
.execArgv = if (inherit_execArgv) null else (if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else &.{}),
|
||||
.preloads = preloads.items,
|
||||
};
|
||||
});
|
||||
|
||||
worker.parent_poll_ref.ref(parent);
|
||||
|
||||
worker.ref();
|
||||
return worker;
|
||||
}
|
||||
|
||||
@@ -273,6 +278,7 @@ pub fn startWithErrorHandling(
|
||||
pub fn start(
|
||||
this: *WebWorker,
|
||||
) anyerror!void {
|
||||
errdefer this.deref();
|
||||
if (this.name.len > 0) {
|
||||
Output.Source.configureNamedThread(this.name);
|
||||
} else {
|
||||
@@ -373,7 +379,10 @@ fn deinit(this: *WebWorker) void {
|
||||
bun.default_allocator.free(preload);
|
||||
}
|
||||
bun.default_allocator.free(this.preloads);
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
fn destroy(this: *WebWorker) void {
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
fn flushLogs(this: *WebWorker) void {
|
||||
@@ -408,7 +417,7 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
|
||||
|
||||
var buffered_writer_ = bun.MutableString.BufferedWriter{ .context = &array };
|
||||
var buffered_writer = &buffered_writer_;
|
||||
var worker = vm.worker orelse @panic("Assertion failure: no worker");
|
||||
const worker = vm.worker orelse @panic("Assertion failure: no worker");
|
||||
|
||||
const writer = buffered_writer.writer();
|
||||
const Writer = @TypeOf(writer);
|
||||
@@ -442,9 +451,12 @@ fn onUnhandledRejection(vm: *jsc.VirtualMachine, globalObject: *jsc.JSGlobalObje
|
||||
jsc.markBinding(@src());
|
||||
WebWorker__dispatchError(globalObject, worker.cpp_worker, bun.String.cloneUTF8(array.slice()), error_instance);
|
||||
if (vm.worker) |worker_| {
|
||||
_ = worker.setRequestedTerminate();
|
||||
worker.parent_poll_ref.unrefConcurrently(worker.parent);
|
||||
worker_.exitAndDeinit();
|
||||
// During unhandled rejection, we're already holding the API lock - now
|
||||
// is the right time to set exit_called to true so that
|
||||
// notifyNeedTermination uses vm.global.requestTermination() instead of
|
||||
// vm.jsc.notifyNeedTermination() which avoid VMTraps assertion failures
|
||||
worker_.exit_called = true;
|
||||
worker_.lifecycle_handle.requestTermination();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -454,6 +466,10 @@ fn setStatus(this: *WebWorker, status: Status) void {
|
||||
this.status.store(status, .release);
|
||||
}
|
||||
|
||||
fn unhandledError(this: *WebWorker, _: anyerror) void {
|
||||
this.flushLogs();
|
||||
}
|
||||
|
||||
fn spin(this: *WebWorker) void {
|
||||
log("[{d}] spin start", .{this.execution_context_id});
|
||||
|
||||
@@ -545,12 +561,14 @@ fn spin(this: *WebWorker) void {
|
||||
}
|
||||
|
||||
/// This is worker.ref()/.unref() from JS (Caller thread)
|
||||
pub fn setRef(this: *WebWorker, value: bool) callconv(.c) void {
|
||||
if (this.hasRequestedTerminate()) {
|
||||
return;
|
||||
}
|
||||
pub fn setRef(handle: *WebWorkerLifecycleHandle, value: bool) callconv(.c) void {
|
||||
if (handle.worker.load(.acquire)) |worker| {
|
||||
if (worker.hasRequestedTerminate()) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.setRefInternal(value);
|
||||
worker.setRefInternal(value);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn setRefInternal(this: *WebWorker, value: bool) void {
|
||||
@@ -564,26 +582,30 @@ pub fn setRefInternal(this: *WebWorker, value: bool) void {
|
||||
/// Implement process.exit(). May only be called from the Worker thread.
|
||||
pub fn exit(this: *WebWorker) void {
|
||||
this.exit_called = true;
|
||||
this.notifyNeedTermination();
|
||||
this.lifecycle_handle.requestTermination();
|
||||
}
|
||||
|
||||
/// Request a terminate from any thread.
|
||||
pub fn notifyNeedTermination(this: *WebWorker) callconv(.c) void {
|
||||
fn notifyNeedTermination(this: *WebWorker) void {
|
||||
if (this.status.load(.acquire) == .terminated) {
|
||||
return;
|
||||
}
|
||||
if (this.setRequestedTerminate()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log("[{d}] notifyNeedTermination", .{this.execution_context_id});
|
||||
|
||||
if (this.vm) |vm| {
|
||||
vm.eventLoop().wakeup();
|
||||
// TODO(@190n) notifyNeedTermination
|
||||
}
|
||||
|
||||
// TODO(@190n) delete
|
||||
this.setRefInternal(false);
|
||||
if (this.exit_called) {
|
||||
// For process.exit() called from JavaScript, use JSC's termination
|
||||
// exception mechanism to interrupt ongoing JS execution
|
||||
vm.global.requestTermination();
|
||||
} else {
|
||||
// For external terminate requests (e.g worker.terminate() from parent thread),
|
||||
// use VM traps system
|
||||
vm.jsc_vm.notifyNeedTermination();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This handles cleanup, emitting the "close" event, and deinit.
|
||||
@@ -596,6 +618,7 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
|
||||
|
||||
log("[{d}] exitAndDeinit", .{this.execution_context_id});
|
||||
const cpp_worker = this.cpp_worker;
|
||||
|
||||
var exit_code: i32 = 0;
|
||||
var globalObject: ?*jsc.JSGlobalObject = null;
|
||||
var vm_to_deinit: ?*jsc.VirtualMachine = null;
|
||||
@@ -610,31 +633,100 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
|
||||
vm_to_deinit = vm;
|
||||
}
|
||||
var arena = this.arena;
|
||||
|
||||
this.lifecycle_handle.deref();
|
||||
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
|
||||
if (loop) |loop_| {
|
||||
loop_.internal_loop_data.jsc_vm = null;
|
||||
}
|
||||
|
||||
bun.uws.onThreadExit();
|
||||
this.deinit();
|
||||
|
||||
if (vm_to_deinit) |vm| {
|
||||
vm.gc_controller.deinit();
|
||||
vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
|
||||
}
|
||||
bun.deleteAllPoolsForThreadExit();
|
||||
|
||||
if (arena) |*arena_| {
|
||||
arena_.deinit();
|
||||
}
|
||||
|
||||
this.deref();
|
||||
bun.exitThread();
|
||||
}
|
||||
|
||||
pub export fn WebWorkerLifecycleHandle__requestTermination(handle: ?*WebWorkerLifecycleHandle) void {
|
||||
if (handle) |h| {
|
||||
h.requestTermination();
|
||||
}
|
||||
}
|
||||
|
||||
pub export fn WebWorkerLifecycleHandle__release(handle: ?*WebWorkerLifecycleHandle) void {
|
||||
if (handle) |h| {
|
||||
h.deref();
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages the complex timing surrounding web worker creation and destruction
|
||||
const WebWorkerLifecycleHandle = struct {
|
||||
const RefCount = bun.ptr.ThreadSafeRefCount(@This(), "ref_count", WebWorkerLifecycleHandle.deinit, .{});
|
||||
pub const ref = WebWorkerLifecycleHandle.RefCount.ref;
|
||||
pub const deref = WebWorkerLifecycleHandle.RefCount.deref;
|
||||
|
||||
worker: std.atomic.Value(?*WebWorker),
|
||||
ref_count: WebWorkerLifecycleHandle.RefCount,
|
||||
|
||||
pub const new = bun.TrivialNew(WebWorkerLifecycleHandle);
|
||||
|
||||
pub fn createWebWorker(
|
||||
cpp_worker: *void,
|
||||
parent: *jsc.VirtualMachine,
|
||||
name_str: bun.String,
|
||||
specifier_str: bun.String,
|
||||
error_message: *bun.String,
|
||||
parent_context_id: u32,
|
||||
this_context_id: u32,
|
||||
mini: bool,
|
||||
default_unref: bool,
|
||||
eval_mode: bool,
|
||||
argv_ptr: ?[*]WTFStringImpl,
|
||||
argv_len: usize,
|
||||
inherit_execArgv: bool,
|
||||
execArgv_ptr: ?[*]WTFStringImpl,
|
||||
execArgv_len: usize,
|
||||
preload_modules_ptr: ?[*]bun.String,
|
||||
preload_modules_len: usize,
|
||||
) callconv(.c) *WebWorkerLifecycleHandle {
|
||||
const handle = WebWorkerLifecycleHandle.new(.{
|
||||
.worker = .init(null),
|
||||
.ref_count = .init(),
|
||||
});
|
||||
|
||||
const worker = create(cpp_worker, parent, name_str, specifier_str, error_message, parent_context_id, this_context_id, mini, default_unref, eval_mode, argv_ptr, argv_len, inherit_execArgv, execArgv_ptr, execArgv_len, preload_modules_ptr, preload_modules_len, handle).?;
|
||||
|
||||
handle.worker.store(worker, .release);
|
||||
|
||||
// Worker.cpp holds a reference to this
|
||||
handle.ref();
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
pub fn deinit(this: *WebWorkerLifecycleHandle) void {
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
pub fn requestTermination(self: *WebWorkerLifecycleHandle) void {
|
||||
const worker = self.worker.swap(null, .acq_rel) orelse return;
|
||||
|
||||
worker.notifyNeedTermination();
|
||||
worker.deref();
|
||||
}
|
||||
};
|
||||
|
||||
comptime {
|
||||
@export(&create, .{ .name = "WebWorker__create" });
|
||||
@export(¬ifyNeedTermination, .{ .name = "WebWorker__notifyNeedTermination" });
|
||||
@export(&WebWorkerLifecycleHandle.createWebWorker, .{ .name = "WebWorkerLifecycleHandle__createWebWorker" });
|
||||
@export(&setRef, .{ .name = "WebWorker__setRef" });
|
||||
_ = WebWorker__updatePtr;
|
||||
}
|
||||
|
||||
const std = @import("std");
|
||||
|
||||
@@ -4,7 +4,6 @@ const HTTPClient = @This();
|
||||
pub var default_allocator: std.mem.Allocator = undefined;
|
||||
pub var default_arena: Arena = undefined;
|
||||
pub var http_thread: HTTPThread = undefined;
|
||||
|
||||
//TODO: this needs to be freed when Worker Threads are implemented
|
||||
pub var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.AnySocket).init(bun.default_allocator);
|
||||
pub var async_http_id_monotonic: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import type { Pipe as NodeStreamPipe } from "node:stream";
|
||||
|
||||
// Hardcoded module "node:child_process"
|
||||
const EventEmitter = require("node:events");
|
||||
const OsModule = require("node:os");
|
||||
@@ -1041,13 +1043,15 @@ class ChildProcess extends EventEmitter {
|
||||
#handle;
|
||||
#closesNeeded = 1;
|
||||
#closesGot = 0;
|
||||
disconnect: undefined | (() => void);
|
||||
|
||||
signalCode = null;
|
||||
exitCode = null;
|
||||
spawnfile;
|
||||
spawnargs;
|
||||
pid;
|
||||
channel;
|
||||
|
||||
channel: NodeStreamPipe | undefined;
|
||||
killed = false;
|
||||
|
||||
[Symbol.dispose]() {
|
||||
@@ -1371,7 +1375,7 @@ class ChildProcess extends EventEmitter {
|
||||
if (has_ipc) {
|
||||
this.send = this.#send;
|
||||
this.disconnect = this.#disconnect;
|
||||
this.channel = new Control();
|
||||
this.channel = new SubprocessChannel(this);
|
||||
Object.defineProperty(this, "_channel", {
|
||||
get() {
|
||||
return this.channel;
|
||||
@@ -1730,9 +1734,46 @@ function abortChildProcess(child, killSignal, reason) {
|
||||
}
|
||||
}
|
||||
|
||||
class Control extends EventEmitter {
|
||||
constructor() {
|
||||
class SubprocessChannel extends EventEmitter implements NodeStreamPipe {
|
||||
#subprocess: ChildProcess;
|
||||
#closed: boolean = false;
|
||||
|
||||
public constructor(childProcess: ChildProcess) {
|
||||
super();
|
||||
this.#subprocess = childProcess;
|
||||
}
|
||||
|
||||
public close(): void {
|
||||
if (this.#closed) return;
|
||||
|
||||
this.#closed = true;
|
||||
|
||||
if (this.#subprocess.connected) {
|
||||
this.#subprocess.disconnect?.();
|
||||
}
|
||||
|
||||
process.nextTick(() => {
|
||||
this.emit("close");
|
||||
});
|
||||
}
|
||||
|
||||
public hasRef(): boolean {
|
||||
if (this.#closed) return false;
|
||||
|
||||
const handle = this.#subprocess[kHandle];
|
||||
if (!handle) return false;
|
||||
|
||||
return this.#subprocess.connected;
|
||||
}
|
||||
|
||||
public ref(): void {
|
||||
if (this.#closed) return;
|
||||
this.#subprocess.ref();
|
||||
}
|
||||
|
||||
public unref(): void {
|
||||
if (this.#closed) return;
|
||||
this.#subprocess.unref();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// import type { Readable, Writable } from "node:stream";
|
||||
|
||||
// import type { WorkerOptions } from "node:worker_threads";
|
||||
declare const self: typeof globalThis;
|
||||
type WebWorker = InstanceType<typeof globalThis.Worker>;
|
||||
@@ -225,15 +226,19 @@ function moveMessagePortToContext() {
|
||||
|
||||
const unsupportedOptions = ["stdin", "stdout", "stderr", "trackedUnmanagedFds", "resourceLimits"];
|
||||
|
||||
class Worker extends EventEmitter {
|
||||
class Worker extends EventEmitter implements AsyncDisposable {
|
||||
#worker: WebWorker;
|
||||
#performance;
|
||||
|
||||
// this is used by terminate();
|
||||
// either is the exit code if exited, a promise resolving to the exit code, or undefined if we haven't sent .terminate() yet
|
||||
#onExitPromise: Promise<number> | number | undefined = undefined;
|
||||
#onExitResolvers = Promise.withResolvers<number | void>();
|
||||
#urlToRevoke = "";
|
||||
|
||||
async [Symbol.asyncDispose]() {
|
||||
await this.terminate();
|
||||
}
|
||||
|
||||
constructor(filename: string, options: NodeWorkerOptions = {}) {
|
||||
super();
|
||||
for (const key of unsupportedOptions) {
|
||||
@@ -319,7 +324,13 @@ class Worker extends EventEmitter {
|
||||
});
|
||||
}
|
||||
|
||||
terminate(callback: unknown) {
|
||||
terminate(callback?: unknown): Promise<number | void> {
|
||||
// threadId = -1 signifies the worker was closed already. Node returns PromiseResolve() in this case
|
||||
// https://github.com/nodejs/node/blob/61601089f7f2f0e5e7abe8240f198585f585704c/lib/internal/worker.js#L390
|
||||
if (this.threadId === -1) {
|
||||
return Promise.resolve<void>();
|
||||
}
|
||||
|
||||
if (typeof callback === "function") {
|
||||
process.emitWarning(
|
||||
"Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.",
|
||||
@@ -329,12 +340,8 @@ class Worker extends EventEmitter {
|
||||
this.#worker.addEventListener("close", event => callback(null, event.code), { once: true });
|
||||
}
|
||||
|
||||
const onExitPromise = this.#onExitPromise;
|
||||
if (onExitPromise) {
|
||||
return $isPromise(onExitPromise) ? onExitPromise : Promise.$resolve(onExitPromise);
|
||||
}
|
||||
const { resolve, promise } = this.#onExitResolvers;
|
||||
|
||||
const { resolve, promise } = Promise.withResolvers();
|
||||
this.#worker.addEventListener(
|
||||
"close",
|
||||
event => {
|
||||
@@ -342,12 +349,13 @@ class Worker extends EventEmitter {
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
|
||||
this.#worker.terminate();
|
||||
|
||||
return (this.#onExitPromise = promise);
|
||||
return promise;
|
||||
}
|
||||
|
||||
postMessage(...args: [any, any]) {
|
||||
postMessage(...args: Parameters<Bun.Worker["postMessage"]>) {
|
||||
return this.#worker.postMessage.$apply(this.#worker, args);
|
||||
}
|
||||
|
||||
@@ -356,8 +364,8 @@ class Worker extends EventEmitter {
|
||||
return stringPromise.then(s => new HeapSnapshotStream(s));
|
||||
}
|
||||
|
||||
#onClose(e) {
|
||||
this.#onExitPromise = e.code;
|
||||
#onClose(e: Event & { code: number }) {
|
||||
this.#onExitResolvers.resolve(e.code);
|
||||
this.emit("exit", e.code);
|
||||
}
|
||||
|
||||
|
||||
@@ -69,6 +69,7 @@
|
||||
"pg-gateway": "0.3.0-beta.4",
|
||||
"pino": "9.4.0",
|
||||
"pino-pretty": "11.2.2",
|
||||
"piscina": "5.0.0",
|
||||
"postgres": "3.3.5",
|
||||
"prisma": "5.1.1",
|
||||
"prompts": "2.4.2",
|
||||
@@ -460,6 +461,42 @@
|
||||
|
||||
"@napi-rs/canvas-win32-x64-msvc": ["@napi-rs/canvas-win32-x64-msvc@0.1.65", "", { "os": "win32", "cpu": "x64" }, "sha512-RZQX3luWnlNWgdMnLMQ1hyfQraeAn9lnxWWVCHuUM4tAWEV8UDdeb7cMwmJW7eyt8kAosmjeHt3cylQMHOxGFg=="],
|
||||
|
||||
"@napi-rs/nice": ["@napi-rs/nice@1.1.1", "", { "optionalDependencies": { "@napi-rs/nice-android-arm-eabi": "1.1.1", "@napi-rs/nice-android-arm64": "1.1.1", "@napi-rs/nice-darwin-arm64": "1.1.1", "@napi-rs/nice-darwin-x64": "1.1.1", "@napi-rs/nice-freebsd-x64": "1.1.1", "@napi-rs/nice-linux-arm-gnueabihf": "1.1.1", "@napi-rs/nice-linux-arm64-gnu": "1.1.1", "@napi-rs/nice-linux-arm64-musl": "1.1.1", "@napi-rs/nice-linux-ppc64-gnu": "1.1.1", "@napi-rs/nice-linux-riscv64-gnu": "1.1.1", "@napi-rs/nice-linux-s390x-gnu": "1.1.1", "@napi-rs/nice-linux-x64-gnu": "1.1.1", "@napi-rs/nice-linux-x64-musl": "1.1.1", "@napi-rs/nice-openharmony-arm64": "1.1.1", "@napi-rs/nice-win32-arm64-msvc": "1.1.1", "@napi-rs/nice-win32-ia32-msvc": "1.1.1", "@napi-rs/nice-win32-x64-msvc": "1.1.1" } }, "sha512-xJIPs+bYuc9ASBl+cvGsKbGrJmS6fAKaSZCnT0lhahT5rhA2VVy9/EcIgd2JhtEuFOJNx7UHNn/qiTPTY4nrQw=="],
|
||||
|
||||
"@napi-rs/nice-android-arm-eabi": ["@napi-rs/nice-android-arm-eabi@1.1.1", "", { "os": "android", "cpu": "arm" }, "sha512-kjirL3N6TnRPv5iuHw36wnucNqXAO46dzK9oPb0wj076R5Xm8PfUVA9nAFB5ZNMmfJQJVKACAPd/Z2KYMppthw=="],
|
||||
|
||||
"@napi-rs/nice-android-arm64": ["@napi-rs/nice-android-arm64@1.1.1", "", { "os": "android", "cpu": "arm64" }, "sha512-blG0i7dXgbInN5urONoUCNf+DUEAavRffrO7fZSeoRMJc5qD+BJeNcpr54msPF6qfDD6kzs9AQJogZvT2KD5nw=="],
|
||||
|
||||
"@napi-rs/nice-darwin-arm64": ["@napi-rs/nice-darwin-arm64@1.1.1", "", { "os": "darwin", "cpu": "arm64" }, "sha512-s/E7w45NaLqTGuOjC2p96pct4jRfo61xb9bU1unM/MJ/RFkKlJyJDx7OJI/O0ll/hrfpqKopuAFDV8yo0hfT7A=="],
|
||||
|
||||
"@napi-rs/nice-darwin-x64": ["@napi-rs/nice-darwin-x64@1.1.1", "", { "os": "darwin", "cpu": "x64" }, "sha512-dGoEBnVpsdcC+oHHmW1LRK5eiyzLwdgNQq3BmZIav+9/5WTZwBYX7r5ZkQC07Nxd3KHOCkgbHSh4wPkH1N1LiQ=="],
|
||||
|
||||
"@napi-rs/nice-freebsd-x64": ["@napi-rs/nice-freebsd-x64@1.1.1", "", { "os": "freebsd", "cpu": "x64" }, "sha512-kHv4kEHAylMYmlNwcQcDtXjklYp4FCf0b05E+0h6nDHsZ+F0bDe04U/tXNOqrx5CmIAth4vwfkjjUmp4c4JktQ=="],
|
||||
|
||||
"@napi-rs/nice-linux-arm-gnueabihf": ["@napi-rs/nice-linux-arm-gnueabihf@1.1.1", "", { "os": "linux", "cpu": "arm" }, "sha512-E1t7K0efyKXZDoZg1LzCOLxgolxV58HCkaEkEvIYQx12ht2pa8hoBo+4OB3qh7e+QiBlp1SRf+voWUZFxyhyqg=="],
|
||||
|
||||
"@napi-rs/nice-linux-arm64-gnu": ["@napi-rs/nice-linux-arm64-gnu@1.1.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-CIKLA12DTIZlmTaaKhQP88R3Xao+gyJxNWEn04wZwC2wmRapNnxCUZkVwggInMJvtVElA+D4ZzOU5sX4jV+SmQ=="],
|
||||
|
||||
"@napi-rs/nice-linux-arm64-musl": ["@napi-rs/nice-linux-arm64-musl@1.1.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-+2Rzdb3nTIYZ0YJF43qf2twhqOCkiSrHx2Pg6DJaCPYhhaxbLcdlV8hCRMHghQ+EtZQWGNcS2xF4KxBhSGeutg=="],
|
||||
|
||||
"@napi-rs/nice-linux-ppc64-gnu": ["@napi-rs/nice-linux-ppc64-gnu@1.1.1", "", { "os": "linux", "cpu": "ppc64" }, "sha512-4FS8oc0GeHpwvv4tKciKkw3Y4jKsL7FRhaOeiPei0X9T4Jd619wHNe4xCLmN2EMgZoeGg+Q7GY7BsvwKpL22Tg=="],
|
||||
|
||||
"@napi-rs/nice-linux-riscv64-gnu": ["@napi-rs/nice-linux-riscv64-gnu@1.1.1", "", { "os": "linux", "cpu": "none" }, "sha512-HU0nw9uD4FO/oGCCk409tCi5IzIZpH2agE6nN4fqpwVlCn5BOq0MS1dXGjXaG17JaAvrlpV5ZeyZwSon10XOXw=="],
|
||||
|
||||
"@napi-rs/nice-linux-s390x-gnu": ["@napi-rs/nice-linux-s390x-gnu@1.1.1", "", { "os": "linux", "cpu": "s390x" }, "sha512-2YqKJWWl24EwrX0DzCQgPLKQBxYDdBxOHot1KWEq7aY2uYeX+Uvtv4I8xFVVygJDgf6/92h9N3Y43WPx8+PAgQ=="],
|
||||
|
||||
"@napi-rs/nice-linux-x64-gnu": ["@napi-rs/nice-linux-x64-gnu@1.1.1", "", { "os": "linux", "cpu": "x64" }, "sha512-/gaNz3R92t+dcrfCw/96pDopcmec7oCcAQ3l/M+Zxr82KT4DljD37CpgrnXV+pJC263JkW572pdbP3hP+KjcIg=="],
|
||||
|
||||
"@napi-rs/nice-linux-x64-musl": ["@napi-rs/nice-linux-x64-musl@1.1.1", "", { "os": "linux", "cpu": "x64" }, "sha512-xScCGnyj/oppsNPMnevsBe3pvNaoK7FGvMjT35riz9YdhB2WtTG47ZlbxtOLpjeO9SqqQ2J2igCmz6IJOD5JYw=="],
|
||||
|
||||
"@napi-rs/nice-openharmony-arm64": ["@napi-rs/nice-openharmony-arm64@1.1.1", "", { "os": "none", "cpu": "arm64" }, "sha512-6uJPRVwVCLDeoOaNyeiW0gp2kFIM4r7PL2MczdZQHkFi9gVlgm+Vn+V6nTWRcu856mJ2WjYJiumEajfSm7arPQ=="],
|
||||
|
||||
"@napi-rs/nice-win32-arm64-msvc": ["@napi-rs/nice-win32-arm64-msvc@1.1.1", "", { "os": "win32", "cpu": "arm64" }, "sha512-uoTb4eAvM5B2aj/z8j+Nv8OttPf2m+HVx3UjA5jcFxASvNhQriyCQF1OB1lHL43ZhW+VwZlgvjmP5qF3+59atA=="],
|
||||
|
||||
"@napi-rs/nice-win32-ia32-msvc": ["@napi-rs/nice-win32-ia32-msvc@1.1.1", "", { "os": "win32", "cpu": "ia32" }, "sha512-CNQqlQT9MwuCsg1Vd/oKXiuH+TcsSPJmlAFc5frFyX/KkOh0UpBLEj7aoY656d5UKZQMQFP7vJNa1DNUNORvug=="],
|
||||
|
||||
"@napi-rs/nice-win32-x64-msvc": ["@napi-rs/nice-win32-x64-msvc@1.1.1", "", { "os": "win32", "cpu": "x64" }, "sha512-vB+4G/jBQCAh0jelMTY3+kgFy00Hlx2f2/1zjMoH821IbplbWZOkLiTYXQkygNTzQJTq5cvwBDgn2ppHD+bglQ=="],
|
||||
|
||||
"@nestjs/common": ["@nestjs/common@11.0.3", "", { "dependencies": { "iterare": "1.2.1", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "class-transformer": "*", "class-validator": "*", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["class-transformer", "class-validator"] }, "sha512-fTkJWQ20+jvPKfrv3A+T3wsPwwYSJyoJ+pcBzyKtv5fCpK/yX/rJalFUIpw1CDmarfqIaMX9SdkplNyxtvH6RA=="],
|
||||
|
||||
"@nestjs/core": ["@nestjs/core@11.0.3", "", { "dependencies": { "@nuxt/opencollective": "0.4.1", "fast-safe-stringify": "2.1.1", "iterare": "1.2.1", "path-to-regexp": "8.2.0", "tslib": "2.8.1", "uid": "2.0.2" }, "peerDependencies": { "@nestjs/common": "^11.0.0", "@nestjs/microservices": "^11.0.0", "@nestjs/platform-express": "^11.0.0", "@nestjs/websockets": "^11.0.0", "reflect-metadata": "^0.1.12 || ^0.2.0", "rxjs": "^7.1.0" }, "optionalPeers": ["@nestjs/microservices", "@nestjs/platform-express", "@nestjs/websockets"] }, "sha512-6UoVHpwa23HJxMNtuTXQCiqx/NHTG3lRBRgnZ8EDHTjgaNnR7P+xBS68zN3gLH7rBIrhhQ5Q1hVs7WswRxrw7Q=="],
|
||||
@@ -2092,6 +2129,8 @@
|
||||
|
||||
"pino-std-serializers": ["pino-std-serializers@7.0.0", "", {}, "sha512-e906FRY0+tV27iq4juKzSYPbUj2do2X2JX4EzSca1631EB2QJQUqGbDuERal7LCtOpxl6x3+nvo9NPZcmjkiFA=="],
|
||||
|
||||
"piscina": ["piscina@5.0.0", "", { "optionalDependencies": { "@napi-rs/nice": "^1.0.1" } }, "sha512-R+arufwL7sZvGjAhSMK3TfH55YdGOqhpKXkcwQJr432AAnJX/xxX19PA4QisrmJ+BTTfZVggaz6HexbkQq1l1Q=="],
|
||||
|
||||
"pixelmatch": ["pixelmatch@5.3.0", "", { "dependencies": { "pngjs": "^6.0.0" }, "bin": { "pixelmatch": "bin/pixelmatch" } }, "sha512-o8mkY4E/+LNUf6LzX96ht6k6CEDi65k9G2rjMtBe9Oo+VPKSvl+0GKHuH/AlG+GA5LPG/i5hrekkxUc3s2HU+Q=="],
|
||||
|
||||
"pkg-dir": ["pkg-dir@4.2.0", "", { "dependencies": { "find-up": "^4.0.0" } }, "sha512-HRDzbaKjC+AOWVXxAU/x54COGeIv9eb+6CkDSQoNTt4XyWoIJvuPsXizxu/Fr23EiekbtZwmh1IcIG/l/a10GQ=="],
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* without always needing to run `bun install` in development.
|
||||
*/
|
||||
|
||||
import { gc as bunGC, sleepSync, spawnSync, unsafe, which, write } from "bun";
|
||||
import { gc as bunGC, readableStreamToText, sleepSync, spawnSync, unsafe, which, write } from "bun";
|
||||
import { heapStats } from "bun:jsc";
|
||||
import { beforeAll, describe, expect } from "bun:test";
|
||||
import { ChildProcess, execSync, fork } from "child_process";
|
||||
@@ -496,11 +496,43 @@ if (expect.extend)
|
||||
}
|
||||
}
|
||||
},
|
||||
async toRunAsync(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
|
||||
const result = Bun.spawn({
|
||||
cmd: [bunExe(), ...cmds],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
const [stdout, stderr, exitCode] = await Promise.all([
|
||||
readableStreamToText(result.stdout),
|
||||
readableStreamToText(result.stderr),
|
||||
result.exited,
|
||||
]);
|
||||
|
||||
if (exitCode !== expectedCode) {
|
||||
return {
|
||||
pass: false,
|
||||
message: () => `Command ${cmds.join(" ")} failed:` + "\n" + stdout + "\n" + stderr,
|
||||
};
|
||||
}
|
||||
|
||||
if (optionalStdout != null) {
|
||||
return {
|
||||
pass: stdout === optionalStdout,
|
||||
message: () => `Expected ${cmds.join(" ")} to output ${optionalStdout} but got ${stdout}`,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
pass: true,
|
||||
message: () => `Expected ${cmds.join(" ")} to run`,
|
||||
};
|
||||
},
|
||||
toRun(cmds: string[], optionalStdout?: string, expectedCode: number = 0) {
|
||||
const result = Bun.spawnSync({
|
||||
cmd: [bunExe(), ...cmds],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "pipe", "inherit"],
|
||||
stdio: ["ignore", "pipe", "inherit"],
|
||||
});
|
||||
|
||||
if (result.exitCode !== expectedCode) {
|
||||
@@ -1371,6 +1403,7 @@ interface BunHarnessTestMatchers {
|
||||
toHaveTestTimedOutAfter(expected: number): void;
|
||||
toBeBinaryType(expected: keyof typeof binaryTypes): void;
|
||||
toRun(optionalStdout?: string, expectedCode?: number): void;
|
||||
toRunAsync(optionalStdout?: string, expectedCode?: number): Promise<void>;
|
||||
toThrowWithCode(cls: CallableFunction, code: string): void;
|
||||
toThrowWithCodeAsync(cls: CallableFunction, code: string): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -187,3 +187,6 @@ tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: ["ignore", "inherit"
|
||||
tsd.expectAssignable<NullSubprocess>(Bun.spawn([], { stdio: [null, null, null] }));
|
||||
|
||||
tsd.expectAssignable<SyncSubprocess<Bun.SpawnOptions.Readable, Bun.SpawnOptions.Readable>>(Bun.spawnSync([], {}));
|
||||
|
||||
Bun.spawnSync({ cmd: ["echo", "hello"] });
|
||||
Bun.spawnSync(["echo", "hello"], { stdio: ["ignore", "pipe", "pipe"] });
|
||||
|
||||
8
test/js/node/test/parallel/test-worker-dispose.mjs
Normal file
8
test/js/node/test/parallel/test-worker-dispose.mjs
Normal file
@@ -0,0 +1,8 @@
|
||||
import * as common from '../common/index.mjs';
|
||||
import { Worker } from 'node:worker_threads';
|
||||
|
||||
{
|
||||
// Verifies that the worker is async disposable
|
||||
await using worker = new Worker('for(;;) {}', { eval: true });
|
||||
worker.on('exit', common.mustCall());
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { Worker } = require('worker_threads');
|
||||
|
||||
// Regression for https://github.com/nodejs/node/issues/43182.
|
||||
const w = new Worker(new URL('data:text/javascript,process.exit(1);await new Promise(()=>{ process.exit(2); })'));
|
||||
w.on('exit', common.mustCall((code) => {
|
||||
assert.strictEqual(code, 1);
|
||||
}));
|
||||
@@ -7,7 +7,7 @@ const eachSizeMiB = 100;
|
||||
const iterations = 5;
|
||||
|
||||
function test() {
|
||||
const code = " ".repeat(eachSizeMiB * 1024 * 1024);
|
||||
const code = Buffer.alloc(eachSizeMiB * 1024 * 1024, " ").toString();
|
||||
return new Promise((resolve, reject) => {
|
||||
const worker = new Worker(code, { eval: true });
|
||||
worker.on("exit", () => resolve());
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
import assert from "node:assert";
|
||||
import { test } from "node:test";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { MessageChannel, MessagePort, parentPort, Worker } from "node:worker_threads";
|
||||
|
||||
interface StartupMessage {
|
||||
port: MessagePort;
|
||||
}
|
||||
|
||||
if (parentPort) {
|
||||
parentPort.on("message", (message: StartupMessage) => {
|
||||
console.log("Worker received startup message");
|
||||
|
||||
message.port.postMessage("hello");
|
||||
message.port.close();
|
||||
});
|
||||
} else {
|
||||
test("worker lifecycle message port", async () => {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url));
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const { promise, resolve, reject } = Promise.withResolvers<string>();
|
||||
|
||||
port1.on("message", (message: string) => {
|
||||
console.log("Received message:", message);
|
||||
assert.equal(message, "hello");
|
||||
worker.terminate();
|
||||
resolve(message);
|
||||
});
|
||||
|
||||
worker.on("online", () => {
|
||||
console.log("Worker is online");
|
||||
const startupMessage: StartupMessage = { port: port2 };
|
||||
worker.postMessage(startupMessage, [port2]);
|
||||
});
|
||||
|
||||
worker.on("exit", () => {
|
||||
console.log("Worker exited");
|
||||
reject();
|
||||
});
|
||||
|
||||
worker.on("error", err => {
|
||||
reject(err);
|
||||
});
|
||||
|
||||
assert.equal(await promise, "hello");
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
import assert from "node:assert";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { Worker, isMainThread, threadId } from "node:worker_threads";
|
||||
|
||||
const sleeptime = 100;
|
||||
|
||||
if (isMainThread) {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url));
|
||||
assert.strictEqual(threadId, 0);
|
||||
assert.strictEqual(worker.threadId, 1);
|
||||
console.log(" (main) threadId:", worker.threadId);
|
||||
|
||||
await sleep(sleeptime);
|
||||
assert.strictEqual(await worker.terminate(), 1);
|
||||
assert.strictEqual(worker.threadId, -1); // should be -1 after termination
|
||||
assert.strictEqual(await worker.terminate(), undefined); // sequential calling is basically no-op
|
||||
assert.strictEqual(worker.threadId, -1);
|
||||
} else {
|
||||
console.log("(worker) threadId:", threadId);
|
||||
assert.strictEqual(threadId, 1);
|
||||
await sleep(sleeptime * 2); // keep it alive definitely longer than the parent
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
import { once } from "node:events";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { Worker, isMainThread, parentPort } from "node:worker_threads";
|
||||
|
||||
if (isMainThread) {
|
||||
const { test, expect } = await import("bun:test");
|
||||
|
||||
test("process.exit() works", async () => {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url));
|
||||
|
||||
worker.on("message", () => expect().fail("worker should not keep executing after process.exit()"));
|
||||
worker.postMessage("boom");
|
||||
|
||||
const [exitCode] = await once(worker, "exit");
|
||||
expect(exitCode).toBe(2);
|
||||
});
|
||||
} else {
|
||||
console.log("Worker thread started");
|
||||
|
||||
parentPort!.on("message", message => {
|
||||
console.log(`Worker received: ${message}`);
|
||||
|
||||
console.log("About to call process.exit(2)...");
|
||||
process.exit(2);
|
||||
console.log("process.exit(2) called");
|
||||
|
||||
parentPort!.postMessage("i'm still alive!");
|
||||
});
|
||||
|
||||
console.log("Worker is ready, waiting for messages...");
|
||||
}
|
||||
@@ -6,7 +6,7 @@ import { Worker, isMainThread, workerData } from "worker_threads";
|
||||
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
|
||||
|
||||
const actions = {
|
||||
async ["Bun.connect"](port) {
|
||||
async ["Bun.connect"](port: number) {
|
||||
await Bun.connect({
|
||||
hostname: "localhost",
|
||||
port,
|
||||
@@ -19,7 +19,7 @@ const actions = {
|
||||
},
|
||||
});
|
||||
},
|
||||
async ["Bun.listen"](port) {
|
||||
async ["Bun.listen"](port: number) {
|
||||
const server = Bun.listen({
|
||||
hostname: "localhost",
|
||||
port: 0,
|
||||
@@ -32,7 +32,7 @@ const actions = {
|
||||
},
|
||||
});
|
||||
},
|
||||
async ["fetch"](port) {
|
||||
async ["fetch"](port: number) {
|
||||
const resp = await fetch("http://localhost:" + port);
|
||||
await resp.blob();
|
||||
},
|
||||
@@ -65,12 +65,10 @@ if (isMainThread) {
|
||||
const { promise, resolve, reject } = Promise.withResolvers();
|
||||
promises.push(promise);
|
||||
|
||||
worker.on("online", () => {
|
||||
sleep(1)
|
||||
.then(() => {
|
||||
return worker.terminate();
|
||||
})
|
||||
.finally(resolve);
|
||||
worker.once("online", async () => {
|
||||
await sleep(1);
|
||||
await worker.terminate();
|
||||
resolve();
|
||||
});
|
||||
worker.on("error", e => reject(e));
|
||||
}
|
||||
|
||||
17
test/js/third_party/astro/astro-post.test.js
vendored
17
test/js/third_party/astro/astro-post.test.js
vendored
@@ -4,21 +4,22 @@ import { bunEnv, nodeExe } from "harness";
|
||||
import { join } from "path";
|
||||
|
||||
const fixtureDir = join(import.meta.dirname, "fixtures");
|
||||
function postNodeFormData(port) {
|
||||
const result = Bun.spawnSync({
|
||||
async function postNodeFormData(port) {
|
||||
const result = Bun.spawn({
|
||||
cmd: [nodeExe(), join(fixtureDir, "node-form-data.fetch.fixture.js"), port?.toString()],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "inherit", "inherit"],
|
||||
});
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(await result.exited).toBe(0);
|
||||
}
|
||||
function postNodeAction(port) {
|
||||
const result = Bun.spawnSync({
|
||||
async function postNodeAction(port) {
|
||||
const result = Bun.spawn({
|
||||
cmd: [nodeExe(), join(fixtureDir, "node-action.fetch.fixture.js"), port?.toString()],
|
||||
env: bunEnv,
|
||||
stdio: ["inherit", "inherit", "inherit"],
|
||||
});
|
||||
expect(result.exitCode).toBe(0);
|
||||
|
||||
expect(await result.exited).toBe(0);
|
||||
}
|
||||
|
||||
describe("astro", async () => {
|
||||
@@ -66,7 +67,7 @@ describe("astro", async () => {
|
||||
});
|
||||
|
||||
test("is able todo a POST request to an astro action using node", async () => {
|
||||
postNodeAction(previewServer.port);
|
||||
await postNodeAction(previewServer.port);
|
||||
});
|
||||
|
||||
test("is able to post form data to an astro using bun", async () => {
|
||||
@@ -89,6 +90,6 @@ describe("astro", async () => {
|
||||
});
|
||||
});
|
||||
test("is able to post form data to an astro using node", async () => {
|
||||
postNodeFormData(previewServer.port);
|
||||
await postNodeFormData(previewServer.port);
|
||||
});
|
||||
});
|
||||
|
||||
1
test/js/third_party/astro/fixtures/.gitignore
vendored
Normal file
1
test/js/third_party/astro/fixtures/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
.astro
|
||||
6
test/js/third_party/piscina/package.json
vendored
Normal file
6
test/js/third_party/piscina/package.json
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "piscina-test",
|
||||
"dependencies": {
|
||||
"piscina": "5.0.0"
|
||||
}
|
||||
}
|
||||
22
test/js/third_party/piscina/piscina-atomics.test.ts
vendored
Normal file
22
test/js/third_party/piscina/piscina-atomics.test.ts
vendored
Normal file
@@ -0,0 +1,22 @@
|
||||
import { resolve } from "node:path";
|
||||
import { test } from "node:test";
|
||||
import { Piscina } from "piscina";
|
||||
|
||||
test("piscina atomics", async () => {
|
||||
const pool = new Piscina<void, void>({
|
||||
filename: resolve(__dirname, "simple.fixture.ts"),
|
||||
minThreads: 2,
|
||||
maxThreads: 2,
|
||||
atomics: "sync",
|
||||
});
|
||||
|
||||
const tasks: Promise<void>[] = [];
|
||||
|
||||
for (let i = 1; i <= 10000; i++) {
|
||||
tasks.push(pool.run());
|
||||
}
|
||||
|
||||
await Promise.all(tasks);
|
||||
|
||||
await pool.destroy();
|
||||
});
|
||||
63
test/js/third_party/piscina/piscina.test.ts
vendored
Normal file
63
test/js/third_party/piscina/piscina.test.ts
vendored
Normal file
@@ -0,0 +1,63 @@
|
||||
import { expect, test } from "bun:test";
|
||||
import { join } from "node:path";
|
||||
import { Piscina } from "piscina";
|
||||
|
||||
setTimeout(() => {
|
||||
console.error(new Error("Catastrophic failure, exiting so test can fail"));
|
||||
process.exit(1);
|
||||
}, 10 * 1000).unref();
|
||||
|
||||
test("Piscina basic functionality", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker.fixture.ts"),
|
||||
});
|
||||
|
||||
const result = await piscina.run({ a: 4, b: 6 });
|
||||
expect(result).toBe(10);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
|
||||
test("Piscina event loop cleanup", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker.fixture.ts"),
|
||||
});
|
||||
|
||||
const results = await Promise.all([
|
||||
piscina.run({ a: 1, b: 2 }),
|
||||
piscina.run({ a: 3, b: 4 }),
|
||||
piscina.run({ a: 5, b: 6 }),
|
||||
]);
|
||||
|
||||
expect(results).toEqual([3, 7, 11]);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
|
||||
test("Piscina with idleTimeout", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker.fixture.ts"),
|
||||
idleTimeout: 100,
|
||||
maxThreads: 1,
|
||||
});
|
||||
|
||||
const result = await piscina.run({ a: 10, b: 20 });
|
||||
expect(result).toBe(30);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
|
||||
test("Piscina error handling", async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: join(import.meta.dir, "worker-error.fixture.ts"),
|
||||
});
|
||||
|
||||
const p = await piscina.run({ shouldThrow: true }).then(
|
||||
() => true,
|
||||
() => false,
|
||||
);
|
||||
|
||||
expect(p).toBe(false);
|
||||
|
||||
await piscina.destroy();
|
||||
});
|
||||
10
test/js/third_party/piscina/simple.fixture.ts
vendored
Normal file
10
test/js/third_party/piscina/simple.fixture.ts
vendored
Normal file
@@ -0,0 +1,10 @@
|
||||
// https://github.com/piscinajs/piscina/blob/ba396ced7afc08a8c16f65fbc367a9b7f4d7e84c/test/fixtures/simple-isworkerthread.ts#L7
|
||||
|
||||
import assert from "assert";
|
||||
import Piscina from "piscina";
|
||||
|
||||
assert.strictEqual(Piscina.isWorkerThread, true);
|
||||
|
||||
export default function () {
|
||||
return "done";
|
||||
}
|
||||
7
test/js/third_party/piscina/worker-error.fixture.ts
vendored
Normal file
7
test/js/third_party/piscina/worker-error.fixture.ts
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
export default ({ shouldThrow }: { shouldThrow: boolean }) => {
|
||||
if (shouldThrow) {
|
||||
throw new Error("Worker error for testing");
|
||||
}
|
||||
|
||||
return "success";
|
||||
};
|
||||
4
test/js/third_party/piscina/worker.fixture.ts
vendored
Normal file
4
test/js/third_party/piscina/worker.fixture.ts
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
export default ({ a, b }: { a: number; b: number }) => {
|
||||
console.log("Worker: calculating", a, "+", b);
|
||||
return a + b;
|
||||
};
|
||||
308
test/js/web/workers/message-port-lifecycle.test.ts
Normal file
308
test/js/web/workers/message-port-lifecycle.test.ts
Normal file
@@ -0,0 +1,308 @@
|
||||
import assert from "node:assert";
|
||||
import { test } from "node:test";
|
||||
import { MessageChannel, receiveMessageOnPort, Worker } from "worker_threads";
|
||||
|
||||
test("MessagePort postMessage respects event loop timing", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const messages: string[] = [];
|
||||
let messageCount = 0;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", msg => {
|
||||
messages.push(`received: ${msg}`);
|
||||
messageCount++;
|
||||
|
||||
if (messageCount === 3) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
setImmediate(() => {
|
||||
messages.push("setImmediate 1");
|
||||
port1.postMessage("message1");
|
||||
});
|
||||
|
||||
setImmediate(() => {
|
||||
messages.push("setImmediate 2");
|
||||
port1.postMessage("message2");
|
||||
});
|
||||
|
||||
setImmediate(() => {
|
||||
messages.push("setImmediate 3");
|
||||
port1.postMessage("message3");
|
||||
});
|
||||
|
||||
await promise;
|
||||
|
||||
assert.deepStrictEqual(messages, [
|
||||
"setImmediate 1",
|
||||
"setImmediate 2",
|
||||
"setImmediate 3",
|
||||
"received: message1",
|
||||
"received: message2",
|
||||
"received: message3",
|
||||
]);
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("MessagePort messages execute after process.nextTick (Node.js compatibility)", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const executionOrder: string[] = [];
|
||||
let messageReceived = false;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", () => {
|
||||
executionOrder.push("message received");
|
||||
messageReceived = true;
|
||||
resolve();
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
port1.postMessage("test");
|
||||
|
||||
process.nextTick(() => {
|
||||
executionOrder.push("nextTick 1");
|
||||
});
|
||||
|
||||
process.nextTick(() => {
|
||||
executionOrder.push("nextTick 2");
|
||||
});
|
||||
|
||||
await promise;
|
||||
|
||||
await new Promise(resolve => setImmediate(resolve));
|
||||
|
||||
assert.strictEqual(messageReceived, true);
|
||||
assert.strictEqual(executionOrder[0], "nextTick 1");
|
||||
assert.strictEqual(executionOrder[1], "nextTick 2");
|
||||
assert.strictEqual(executionOrder[2], "message received");
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("MessagePort message delivery works with workers", async () => {
|
||||
const worker = new Worker(
|
||||
`
|
||||
const { parentPort, MessageChannel } = require('worker_threads');
|
||||
|
||||
parentPort.on('message', ({ port }) => {
|
||||
let count = 0;
|
||||
|
||||
port.on('message', (msg) => {
|
||||
count++;
|
||||
|
||||
port.postMessage(\`echo-\${count}: \${msg}\`);
|
||||
|
||||
if (count >= 3) {
|
||||
port.close();
|
||||
parentPort.postMessage('done');
|
||||
}
|
||||
});
|
||||
|
||||
port.start();
|
||||
parentPort.postMessage('ready');
|
||||
});
|
||||
`,
|
||||
{ eval: true },
|
||||
);
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const messages: string[] = [];
|
||||
let readyReceived = false;
|
||||
let doneReceived = false;
|
||||
|
||||
const { promise, resolve, reject: rejectWorker } = Promise.withResolvers<void>();
|
||||
const {
|
||||
promise: allMessagesReceived,
|
||||
resolve: resolveAllMessages,
|
||||
reject: rejectAllMessages,
|
||||
} = Promise.withResolvers<void>();
|
||||
|
||||
AbortSignal.timeout(100).addEventListener("abort", () => {
|
||||
worker.terminate();
|
||||
rejectWorker(new Error("timeout"));
|
||||
rejectAllMessages(new Error("timeout"));
|
||||
});
|
||||
|
||||
worker.on("message", msg => {
|
||||
if (msg === "ready") {
|
||||
readyReceived = true;
|
||||
|
||||
port1.postMessage("hello1");
|
||||
port1.postMessage("hello2");
|
||||
port1.postMessage("hello3");
|
||||
} else if (msg === "done") {
|
||||
doneReceived = true;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port1.on("message", msg => {
|
||||
messages.push(msg);
|
||||
|
||||
if (messages.length === 3) {
|
||||
resolveAllMessages();
|
||||
}
|
||||
});
|
||||
|
||||
worker.postMessage({ port: port2 }, [port2]);
|
||||
|
||||
await Promise.all([promise, allMessagesReceived]);
|
||||
|
||||
assert.strictEqual(readyReceived, true);
|
||||
assert.strictEqual(doneReceived, true);
|
||||
assert.strictEqual(messages.length, 3);
|
||||
assert.deepStrictEqual(messages, ["echo-1: hello1", "echo-2: hello2", "echo-3: hello3"]);
|
||||
|
||||
port1.close();
|
||||
worker.terminate();
|
||||
});
|
||||
|
||||
test("MessagePort messages don't starve microtasks", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const executionOrder: string[] = [];
|
||||
let messageCount = 0;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", () => {
|
||||
messageCount++;
|
||||
executionOrder.push(`message-${messageCount}`);
|
||||
|
||||
if (messageCount === 3) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
queueMicrotask(() => {
|
||||
executionOrder.push("microtask-1");
|
||||
});
|
||||
|
||||
port1.postMessage("msg1");
|
||||
|
||||
queueMicrotask(() => {
|
||||
executionOrder.push("microtask-2");
|
||||
});
|
||||
|
||||
port1.postMessage("msg2");
|
||||
|
||||
queueMicrotask(() => {
|
||||
executionOrder.push("microtask-3");
|
||||
});
|
||||
|
||||
port1.postMessage("msg3");
|
||||
|
||||
await promise;
|
||||
|
||||
assert(executionOrder.includes("microtask-1"));
|
||||
assert(executionOrder.includes("microtask-2"));
|
||||
assert(executionOrder.includes("microtask-3"));
|
||||
assert(executionOrder.includes("message-1"));
|
||||
assert(executionOrder.includes("message-2"));
|
||||
assert(executionOrder.includes("message-3"));
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("high volume MessagePort operations maintain order", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
const TOTAL_MESSAGES = 100;
|
||||
const receivedMessages: number[] = [];
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", msg => {
|
||||
receivedMessages.push(msg);
|
||||
|
||||
if (receivedMessages.length === TOTAL_MESSAGES) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
port2.start();
|
||||
|
||||
for (let i = 0; i < TOTAL_MESSAGES; i++) {
|
||||
port1.postMessage(i);
|
||||
}
|
||||
|
||||
await promise;
|
||||
|
||||
assert.strictEqual(receivedMessages.length, TOTAL_MESSAGES);
|
||||
for (let i = 0; i < TOTAL_MESSAGES; i++) {
|
||||
assert.strictEqual(receivedMessages[i], i);
|
||||
}
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
|
||||
test("MessagePort close behavior during message handling", async () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
let messageReceived = false;
|
||||
let errorThrown = false;
|
||||
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
|
||||
port2.on("message", () => {
|
||||
messageReceived = true;
|
||||
|
||||
port2.close();
|
||||
|
||||
try {
|
||||
port1.postMessage("after-close");
|
||||
} catch (e) {
|
||||
errorThrown = true;
|
||||
}
|
||||
|
||||
setTimeout(resolve, 10);
|
||||
});
|
||||
|
||||
port2.start();
|
||||
port1.postMessage("test");
|
||||
|
||||
await promise;
|
||||
|
||||
assert.strictEqual(messageReceived, true);
|
||||
|
||||
assert.strictEqual(errorThrown, false);
|
||||
|
||||
port1.close();
|
||||
});
|
||||
|
||||
test("receiveMessageOnPort synchronous message retrieval", () => {
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
port1.postMessage("msg1");
|
||||
port1.postMessage("msg2");
|
||||
port1.postMessage("msg3");
|
||||
|
||||
const result1 = receiveMessageOnPort(port2);
|
||||
const result2 = receiveMessageOnPort(port2);
|
||||
const result3 = receiveMessageOnPort(port2);
|
||||
const result4 = receiveMessageOnPort(port2);
|
||||
|
||||
assert.strictEqual(result1?.message, "msg1");
|
||||
assert.strictEqual(result2?.message, "msg2");
|
||||
assert.strictEqual(result3?.message, "msg3");
|
||||
assert.strictEqual(result4, undefined);
|
||||
|
||||
port1.close();
|
||||
port2.close();
|
||||
});
|
||||
@@ -116,7 +116,7 @@ for (const testType of testTypes) {
|
||||
if (!testType.isTransferable) {
|
||||
expect(() =>
|
||||
structuredCloneAdvanced(original, transferList, !!isForTransfer, isForStorage, context),
|
||||
).toThrowError("The object can not be cloned.");
|
||||
).toThrowError("The object could not be cloned.");
|
||||
} else {
|
||||
const cloned = structuredCloneAdvanced(original, transferList, !!isForTransfer, isForStorage, context);
|
||||
testType.expectedAfterClone(original, cloned, isForTransfer, isForStorage);
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
import assert from "node:assert";
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { test } from "node:test";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { Worker } from "node:worker_threads";
|
||||
import stripAnsi from "strip-ansi";
|
||||
|
||||
const IS_CHILD = process.env.IS_CHILD === "true";
|
||||
|
||||
// At the time of writing, this test file passes in Node.js and fails in Bun.
|
||||
// Node.js seems to wait for the exit event to happen before the parent process
|
||||
// exits, which means that the Worker's exit code is printed to stdout
|
||||
|
||||
if (IS_CHILD) {
|
||||
const worker = new Worker("process.exit(1)", { eval: true });
|
||||
worker.on("exit", code => console.log(code));
|
||||
} else {
|
||||
test("The worker exit event is emitted before the parent exits", async () => {
|
||||
const file = fileURLToPath(import.meta.url);
|
||||
|
||||
const { stdout } = spawnSync(process.execPath, [file], {
|
||||
env: { ...process.env, IS_CHILD: "true" },
|
||||
});
|
||||
|
||||
assert.strictEqual(stripAnsi(stdout.toString()).trim(), "1");
|
||||
});
|
||||
}
|
||||
293
test/js/web/workers/worker-memory-leak.test.ts
Normal file
293
test/js/web/workers/worker-memory-leak.test.ts
Normal file
@@ -0,0 +1,293 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { Worker as WebWorker } from "worker_threads";
|
||||
|
||||
const CONFIG = {
|
||||
WARMUP_ITERATIONS: 2,
|
||||
TEST_ITERATIONS: 10,
|
||||
BATCH_SIZE: 5,
|
||||
MEMORY_THRESHOLD_MB: 20,
|
||||
GC_SETTLE_TIME: 50,
|
||||
TEST_TIMEOUT_MS: 15000,
|
||||
};
|
||||
|
||||
interface MemorySnapshot {
|
||||
rss: number;
|
||||
heapUsed: number;
|
||||
heapTotal: number;
|
||||
external: number;
|
||||
}
|
||||
|
||||
function takeMemorySnapshot(): MemorySnapshot {
|
||||
const mem = process.memoryUsage();
|
||||
return {
|
||||
rss: Math.round(mem.rss / 1024 / 1024),
|
||||
heapUsed: Math.round(mem.heapUsed / 1024 / 1024),
|
||||
heapTotal: Math.round(mem.heapTotal / 1024 / 1024),
|
||||
external: Math.round(mem.external / 1024 / 1024),
|
||||
};
|
||||
}
|
||||
|
||||
async function forceGCAndSettle(): Promise<void> {
|
||||
for (let i = 0; i < 2; i++) {
|
||||
Bun.gc(true);
|
||||
await Bun.sleep(CONFIG.GC_SETTLE_TIME);
|
||||
}
|
||||
}
|
||||
|
||||
function logMemoryDiff(before: MemorySnapshot, after: MemorySnapshot, label: string) {
|
||||
const rssDiff = after.rss - before.rss;
|
||||
const heapDiff = after.heapUsed - before.heapUsed;
|
||||
console.log(`${label}:`, {
|
||||
rss: `${before.rss}MB -> ${after.rss}MB (${rssDiff >= 0 ? "+" : ""}${rssDiff}MB)`,
|
||||
heap: `${before.heapUsed}MB -> ${after.heapUsed}MB (${heapDiff >= 0 ? "+" : ""}${heapDiff}MB)`,
|
||||
});
|
||||
|
||||
if (rssDiff > 50) {
|
||||
console.warn(`⚠️ Large memory increase detected: +${rssDiff}MB RSS`);
|
||||
}
|
||||
}
|
||||
|
||||
async function withTimeout<T>(promise: Promise<T>, ms: number, description: string): Promise<T> {
|
||||
const timeout = new Promise<never>((_, reject) => {
|
||||
setTimeout(() => reject(new Error(`${description} timed out after ${ms}ms`)), ms);
|
||||
});
|
||||
return Promise.race([promise, timeout]);
|
||||
}
|
||||
|
||||
async function runWorkerBatch(workerCode: string, batchSize: number = CONFIG.BATCH_SIZE): Promise<void> {
|
||||
const workers: WebWorker[] = [];
|
||||
|
||||
for (let i = 0; i < batchSize; i++) {
|
||||
const worker = new WebWorker(workerCode, { eval: true });
|
||||
workers.push(worker);
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
worker.removeAllListeners();
|
||||
reject(new Error(`Worker ${i} failed to respond within timeout`));
|
||||
}, 5000);
|
||||
|
||||
worker.once("message", msg => {
|
||||
clearTimeout(timeout);
|
||||
if (msg.error) {
|
||||
reject(new Error(msg.error));
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
await Promise.all(workers.map(worker => worker.terminate()));
|
||||
await forceGCAndSettle();
|
||||
}
|
||||
|
||||
describe("Worker Memory Leak Tests", () => {
|
||||
test(
|
||||
"workers should not leak memory with basic create/terminate cycles",
|
||||
async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
parentPort.postMessage('ready');
|
||||
`;
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
console.log(`Warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("Baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(`Test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "Basic create/terminate test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
test(
|
||||
"workers with HTTP activity should not leak memory",
|
||||
async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
fetch() {
|
||||
return new Response("OK");
|
||||
},
|
||||
});
|
||||
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
|
||||
async function doWork() {
|
||||
try {
|
||||
const response = await fetch('http://localhost:${server.port}');
|
||||
await response.text();
|
||||
parentPort.postMessage('done');
|
||||
} catch (err) {
|
||||
parentPort.postMessage({ error: err.message });
|
||||
}
|
||||
}
|
||||
|
||||
doWork();
|
||||
`;
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} HTTP warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
console.log(`HTTP warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("HTTP baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} HTTP test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(`HTTP test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "HTTP activity test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
test(
|
||||
"workers with message passing should not leak memory",
|
||||
async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
|
||||
parentPort.on('message', (msg) => {
|
||||
if (msg === 'start') {
|
||||
for (let j = 0; j < 10; j++) {
|
||||
parentPort.postMessage({ count: j, data: 'x'.repeat(1000) });
|
||||
}
|
||||
parentPort.postMessage('done');
|
||||
}
|
||||
});
|
||||
`;
|
||||
|
||||
async function runMessagePassingBatch(): Promise<void> {
|
||||
const workers: WebWorker[] = [];
|
||||
|
||||
for (let i = 0; i < CONFIG.BATCH_SIZE; i++) {
|
||||
const worker = new WebWorker(workerCode, { eval: true });
|
||||
workers.push(worker);
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
worker.on("message", msg => {
|
||||
if (msg === "done") {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
worker.postMessage("start");
|
||||
});
|
||||
}
|
||||
|
||||
await Promise.all(workers.map(worker => worker.terminate()));
|
||||
await forceGCAndSettle();
|
||||
}
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} message passing warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runMessagePassingBatch();
|
||||
console.log(`Message passing warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("Message passing baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} message passing test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runMessagePassingBatch();
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(
|
||||
`Message passing test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "Message passing test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
test(
|
||||
"workers with timers should not leak memory",
|
||||
async () => {
|
||||
const workerCode = `
|
||||
const { parentPort } = require('worker_threads');
|
||||
|
||||
const timers = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
timers.push(setTimeout(() => {}, 10000));
|
||||
timers.push(setInterval(() => {}, 1000));
|
||||
}
|
||||
|
||||
parentPort.postMessage('ready');
|
||||
`;
|
||||
|
||||
console.log(`Running ${CONFIG.WARMUP_ITERATIONS} timer warmup iterations...`);
|
||||
|
||||
// warmup
|
||||
for (let i = 0; i < CONFIG.WARMUP_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
console.log(`Timer warmup ${i + 1}/${CONFIG.WARMUP_ITERATIONS} completed`);
|
||||
}
|
||||
|
||||
const baselineMemory = takeMemorySnapshot();
|
||||
console.log("Timer baseline memory after warmup:", baselineMemory);
|
||||
|
||||
console.log(`Running ${CONFIG.TEST_ITERATIONS} timer test iterations...`);
|
||||
for (let i = 0; i < CONFIG.TEST_ITERATIONS; i++) {
|
||||
await runWorkerBatch(workerCode);
|
||||
|
||||
if ((i + 1) % 3 === 0) {
|
||||
const currentMemory = takeMemorySnapshot();
|
||||
console.log(`Timer test iteration ${i + 1}/${CONFIG.TEST_ITERATIONS} - RSS: ${currentMemory.rss}MB`);
|
||||
}
|
||||
}
|
||||
|
||||
const finalMemory = takeMemorySnapshot();
|
||||
logMemoryDiff(baselineMemory, finalMemory, "Timer cleanup test");
|
||||
|
||||
const memoryIncrease = finalMemory.rss - baselineMemory.rss;
|
||||
expect(memoryIncrease).toBeLessThan(CONFIG.MEMORY_THRESHOLD_MB);
|
||||
},
|
||||
CONFIG.TEST_TIMEOUT_MS,
|
||||
);
|
||||
});
|
||||
@@ -94,7 +94,7 @@ describe("web worker", () => {
|
||||
};
|
||||
});
|
||||
|
||||
test("worker-env", done => {
|
||||
test("worker-env without a lot of properties", done => {
|
||||
const worker = new Worker(new URL("worker-fixture-env.js", import.meta.url).href, {
|
||||
env: {
|
||||
// Verify that we use putDirectMayBeIndex instead of putDirect
|
||||
@@ -342,12 +342,12 @@ describe("worker_threads", () => {
|
||||
const worker = new wt.Worker(new URL("worker-fixture-process-exit.js", import.meta.url).href, {
|
||||
smol: true,
|
||||
});
|
||||
await Bun.sleep(200);
|
||||
const code = await worker.terminate();
|
||||
expect(code).toBe(2);
|
||||
const [exitCode] = await once(worker, "exit");
|
||||
expect<number | undefined>(await worker.terminate()).toBe(undefined);
|
||||
expect<number | undefined>(exitCode).toBe(2);
|
||||
});
|
||||
|
||||
test.todo("worker terminating forcefully properly interrupts", async () => {
|
||||
test("worker terminating forcefully properly interrupts", async () => {
|
||||
const worker = new wt.Worker(new URL("worker-fixture-while-true.js", import.meta.url).href, {});
|
||||
await new Promise<void>(done => {
|
||||
worker.on("message", () => done());
|
||||
|
||||
@@ -74,6 +74,7 @@
|
||||
"pg-gateway": "0.3.0-beta.4",
|
||||
"pino": "9.4.0",
|
||||
"pino-pretty": "11.2.2",
|
||||
"piscina": "5.0.0",
|
||||
"postgres": "3.3.5",
|
||||
"prisma": "5.1.1",
|
||||
"prompts": "2.4.2",
|
||||
|
||||
Reference in New Issue
Block a user