experiment

This commit is contained in:
Alistair Smith
2025-06-19 14:13:28 +01:00
parent 0b9435ddf4
commit ffa2ccb236
4 changed files with 44 additions and 14 deletions

View File

@@ -322,8 +322,11 @@ void MessagePort::dispatchMessages()
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
return;
auto messagesTakenHandler = [this, protectedThis = Ref { *this }](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
RefPtr<ScriptExecutionContext> context = scriptExecutionContext();
auto executionContextIdentifier = scriptExecutionContext()->identifier();
auto messagesTakenHandler = [this, protectedThis = Ref { *this }, executionContextIdentifier](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionCallback) mutable {
RefPtr<ScriptExecutionContext> context = ScriptExecutionContext::getScriptExecutionContext(executionContextIdentifier);
if (!context || !context->globalObject()) {
completionCallback();
return;
@@ -333,7 +336,7 @@ void MessagePort::dispatchMessages()
processMessages(*context, WTFMove(messages), WTFMove(completionCallback));
};
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(m_identifier, WTFMove(messagesTakenHandler));
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(executionContextIdentifier, m_identifier, WTFMove(messagesTakenHandler));
}
// synchronous for node:worker_threads.receiveMessageOnPort

View File

@@ -26,6 +26,7 @@
#pragma once
#include "ProcessIdentifier.h"
#include "ScriptExecutionContext.h"
#include <wtf/CompletionHandler.h>
#include <wtf/Vector.h>
@@ -58,7 +59,7 @@ public:
virtual void messagePortDisentangled(const MessagePortIdentifier& local) = 0;
virtual void messagePortClosed(const MessagePortIdentifier& local) = 0;
virtual void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
virtual void takeAllMessagesForPort(const ScriptExecutionContextIdentifier, const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
virtual void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) = 0;
virtual void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) = 0;
};

View File

@@ -27,9 +27,13 @@
#include "MessagePortChannelProviderImpl.h"
#include "MessagePort.h"
#include "ScriptExecutionContext.h"
#include "BunClientData.h"
#include <wtf/MainThread.h>
#include <wtf/RunLoop.h>
extern "C" void* Bun__getVM();
namespace WebCore {
MessagePortChannelProviderImpl::MessagePortChannelProviderImpl() = default;
@@ -82,17 +86,38 @@ void MessagePortChannelProviderImpl::postMessageToRemote(MessageWithMessagePorts
});
}
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const ScriptExecutionContextIdentifier identifier, const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
{
// It is the responsibility of outerCallback to get itself to the appropriate thread (e.g. WebWorker thread)
auto callback = [outerCallback = WTFMove(outerCallback)](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& messageDeliveryCallback) mutable {
ASSERT(isMainThread());
outerCallback(WTFMove(messages), WTFMove(messageDeliveryCallback));
};
if (WTF::isMainThread()) {
m_registry.takeAllMessagesForPort(port, WTFMove(outerCallback));
return;
}
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, port, callback = WTFMove(callback)](ScriptExecutionContext& context) mutable {
if (CheckedPtr registry = weakRegistry.get())
registry->takeAllMessagesForPort(port, WTFMove(callback));
auto currentVM = Bun__getVM();
if (!currentVM) {
outerCallback({}, [](){}); // already destroyed
return;
}
ScriptExecutionContext::ensureOnMainThread([weakRegistry = WeakPtr { m_registry }, port, outerCallback = WTFMove(outerCallback), identifier](ScriptExecutionContext& mainContext) mutable {
CheckedPtr registry = weakRegistry.get();
if (!registry) {
ScriptExecutionContext::ensureOnContextThread(identifier, [outerCallback = WTFMove(outerCallback)](ScriptExecutionContext&) mutable {
outerCallback({}, [](){});
});
return;
}
registry->takeAllMessagesForPort(port, [outerCallback = WTFMove(outerCallback), identifier](Vector<MessageWithMessagePorts>&& messages, CompletionHandler<void()>&& completionHandler) mutable {
ScriptExecutionContext::ensureOnContextThread(identifier, [outerCallback = WTFMove(outerCallback), messages = WTFMove(messages), completionHandler = WTFMove(completionHandler)](ScriptExecutionContext&) mutable {
auto wrappedCompletionHandler = [completionHandler = WTFMove(completionHandler)]() mutable {
ScriptExecutionContext::ensureOnMainThread([completionHandler = WTFMove(completionHandler)](ScriptExecutionContext&) mutable {
completionHandler();
});
};
outerCallback(WTFMove(messages), WTFMove(wrappedCompletionHandler));
});
});
});
}

View File

@@ -27,6 +27,7 @@
#include "MessagePortChannelProvider.h"
#include "MessagePortChannelRegistry.h"
#include "ScriptExecutionContext.h"
namespace WebCore {
@@ -41,7 +42,7 @@ private:
void messagePortDisentangled(const MessagePortIdentifier& local) final;
void messagePortClosed(const MessagePortIdentifier& local) final;
void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) final;
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
void takeAllMessagesForPort(const ScriptExecutionContextIdentifier identifier, const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) final;
MessagePortChannelRegistry m_registry;