diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index 3b5d322b0b..4ca0db7c35 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -136,9 +136,8 @@ void Worker::setKeepAlive(bool keepAlive) bool Worker::updatePtr() { if (!WebWorker__updatePtr(impl_, this)) { - m_wasTerminated = true; - m_isClosing = true; - m_isOnline = false; + m_onlineClosingFlags = ClosingFlag; + m_terminationFlags.fetch_or(TerminatedFlag); return false; } @@ -198,6 +197,8 @@ ExceptionOr> Worker::create(ScriptExecutionContext& context, const S execArgv ? static_cast(execArgv->size()) : 0, preloadModules.size() ? preloadModules.data() : nullptr, static_cast(preloadModules.size())); + // now referenced by Zig + worker->ref(); preloadModuleStrings->clear(); @@ -222,7 +223,7 @@ Worker::~Worker() ExceptionOr Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue messageValue, StructuredSerializeOptions&& options) { - if (m_wasTerminated) + if (m_terminationFlags & TerminatedFlag) return Exception { InvalidStateError, "Worker has been terminated"_s }; Vector> ports; @@ -251,7 +252,7 @@ ExceptionOr Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m void Worker::terminate() { // m_contextProxy.terminateWorkerGlobalScope(); - m_wasTerminated = true; + m_terminationFlags.fetch_or(TerminateRequestedFlag); WebWorker__requestTerminate(impl_); } @@ -283,16 +284,17 @@ void Worker::terminate() bool Worker::hasPendingActivity() const { - if (this->m_isOnline) { - return !this->m_isClosing; + auto onlineClosingFlags = m_onlineClosingFlags.load(); + if (onlineClosingFlags & OnlineFlag) { + return !(onlineClosingFlags & ClosingFlag); } - return !this->m_wasTerminated; + return !(m_terminationFlags & TerminatedFlag); } void Worker::dispatchEvent(Event& event) { - if (!m_wasTerminated) + if (!m_terminationFlags) EventTargetWithInlineData::dispatchEvent(event); } @@ -338,7 +340,7 @@ void Worker::dispatchOnline(Zig::GlobalObject* workerGlobalObject) Locker lock(this->m_pendingTasksMutex); - this->m_isOnline = true; + m_onlineClosingFlags.fetch_or(OnlineFlag); auto* thisContext = workerGlobalObject->scriptExecutionContext(); if (!thisContext) { return; @@ -386,20 +388,19 @@ void Worker::dispatchExit(int32_t exitCode) return; ScriptExecutionContext::postTaskTo(ctx->identifier(), [exitCode, protectedThis = Ref { *this }](ScriptExecutionContext& context) -> void { - protectedThis->m_isOnline = false; - protectedThis->m_isClosing = true; + protectedThis->m_onlineClosingFlags = ClosingFlag; if (protectedThis->hasEventListeners(eventNames().closeEvent)) { auto event = CloseEvent::create(exitCode == 0, static_cast(exitCode), exitCode == 0 ? "Worker terminated normally"_s : "Worker exited abnormally"_s); protectedThis->dispatchCloseEvent(event); } - protectedThis->m_wasTerminated = true; + protectedThis->m_terminationFlags.fetch_or(TerminatedFlag); }); } void Worker::postTaskToWorkerGlobalScope(Function&& task) { - if (!this->m_isOnline) { + if (!(m_onlineClosingFlags & OnlineFlag)) { Locker lock(this->m_pendingTasksMutex); this->m_pendingTasks.append(WTFMove(task)); return; @@ -417,9 +418,8 @@ void Worker::forEachWorker(const Functionref(); worker->dispatchExit(exitCode); + // no longer referenced by Zig worker->deref(); if (globalObject) { diff --git a/src/bun.js/bindings/webcore/Worker.h b/src/bun.js/bindings/webcore/Worker.h index a83f97bbbe..9fe3aba7ac 100644 --- a/src/bun.js/bindings/webcore/Worker.h +++ b/src/bun.js/bindings/webcore/Worker.h @@ -68,7 +68,7 @@ public: using ThreadSafeRefCounted::ref; void terminate(); - bool wasTerminated() const { return m_wasTerminated; } + bool wasTerminated() const { return m_terminationFlags & TerminatedFlag; } bool hasPendingActivity() const; bool updatePtr(); @@ -120,6 +120,11 @@ private: static void networkStateChanged(bool isOnLine); + static constexpr uint8_t OnlineFlag = 1 << 0; + static constexpr uint8_t ClosingFlag = 1 << 1; + static constexpr uint8_t TerminateRequestedFlag = 1 << 0; + static constexpr uint8_t TerminatedFlag = 1 << 1; + // RefPtr m_scriptLoader; WorkerOptions m_options; String m_identifier; @@ -132,10 +137,11 @@ private: Deque> m_pendingEvents; Lock m_pendingTasksMutex; Deque> m_pendingTasks; - std::atomic m_wasTerminated { false }; bool m_didStartWorkerGlobalScope { false }; - bool m_isOnline { false }; - bool m_isClosing { false }; + // Tracks OnlineFlag and ClosingFlag + std::atomic m_onlineClosingFlags { 0 }; + // Tracks TerminateRequestedFlag and TerminatedFlag + std::atomic m_terminationFlags { 0 }; const ScriptExecutionContextIdentifier m_clientIdentifier; void* impl_ { nullptr }; }; diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index 4cd282398f..8d5eabb642 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -234,11 +234,11 @@ class Worker extends EventEmitter { } throw e; } - this.#worker.addEventListener("close", this.#onClose.bind(this)); + this.#worker.addEventListener("close", this.#onClose.bind(this), { once: true }); this.#worker.addEventListener("error", this.#onError.bind(this)); this.#worker.addEventListener("message", this.#onMessage.bind(this)); this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this)); - this.#worker.addEventListener("open", this.#onOpen.bind(this)); + this.#worker.addEventListener("open", this.#onOpen.bind(this), { once: true }); if (this.#urlToRevoke) { const url = this.#urlToRevoke;