mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 05:42:43 +00:00
indent + changes
This commit is contained in:
@@ -861,6 +861,7 @@ if(NOT WIN32)
|
||||
-Wno-unused-function
|
||||
-Wno-c++23-lambda-attributes
|
||||
-Wno-nullability-completeness
|
||||
-Wmisleading-indentation
|
||||
-Werror
|
||||
)
|
||||
else()
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include <wtf/TZoneMallocInlines.h>
|
||||
#include <wtf/Lock.h>
|
||||
#include <wtf/Scope.h>
|
||||
#include <wtf/threads/BinarySemaphore.h>
|
||||
|
||||
extern "C" void Bun__eventLoop__incrementRefConcurrently(void* bunVM, int delta);
|
||||
|
||||
@@ -238,7 +239,11 @@ void MessagePort::close()
|
||||
return;
|
||||
m_isDetached = true;
|
||||
|
||||
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
|
||||
WTF::ensureOnMainThread(
|
||||
[this]() {
|
||||
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
|
||||
}
|
||||
);
|
||||
|
||||
removeAllEventListeners();
|
||||
}
|
||||
@@ -339,13 +344,29 @@ JSValue MessagePort::tryTakeMessage(JSGlobalObject* lexicalGlobalObject)
|
||||
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
|
||||
return jsUndefined();
|
||||
|
||||
std::optional<MessageWithMessagePorts> messageWithPorts = MessagePortChannelProvider::fromContext(*context).tryTakeMessageForPort(m_identifier);
|
||||
std::optional<MessageWithMessagePorts> result;
|
||||
BinarySemaphore semaphore;
|
||||
|
||||
if (!messageWithPorts)
|
||||
auto callback = [&](std::optional<MessageWithMessagePorts>&& messageWithPorts) {
|
||||
printf("got message\n");
|
||||
result = WTFMove(messageWithPorts);
|
||||
semaphore.signal();
|
||||
};
|
||||
|
||||
WTF::ensureOnMainThread([identifier = m_identifier, callback = WTFMove(callback), context]() mutable {
|
||||
printf("taking message on main thread\n");
|
||||
MessagePortChannelProvider::fromContext(*context).tryTakeMessageForPort(identifier, WTFMove(callback));
|
||||
});
|
||||
|
||||
printf("waiting for message\n");
|
||||
|
||||
semaphore.wait();
|
||||
|
||||
if (!result)
|
||||
return jsUndefined();
|
||||
|
||||
auto ports = MessagePort::entanglePorts(*context, WTFMove(messageWithPorts->transferredPorts));
|
||||
auto message = messageWithPorts->message.releaseNonNull();
|
||||
auto ports = MessagePort::entanglePorts(*context, WTFMove(result->transferredPorts));
|
||||
auto message = result->message.releaseNonNull();
|
||||
return message->deserialize(*lexicalGlobalObject, lexicalGlobalObject, WTFMove(ports), SerializationErrorMode::NonThrowing);
|
||||
}
|
||||
|
||||
@@ -510,4 +531,4 @@ void MessagePort::jsUnref(JSGlobalObject* lexicalGlobalObject)
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace WebCore
|
||||
} // namespace WebCore
|
||||
@@ -185,17 +185,21 @@ bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
|
||||
return m_messageBatchesInFlight || !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty();
|
||||
}
|
||||
|
||||
std::optional<MessageWithMessagePorts> MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port)
|
||||
void MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
|
||||
{
|
||||
ASSERT(isMainThread());
|
||||
|
||||
ASSERT(port == m_ports[0] || port == m_ports[1]);
|
||||
size_t i = port == m_ports[0] ? 0 : 1;
|
||||
|
||||
if (m_pendingMessages[i].isEmpty())
|
||||
return std::nullopt;
|
||||
if (m_pendingMessages[i].isEmpty()) {
|
||||
callback(std::nullopt);
|
||||
return;
|
||||
}
|
||||
|
||||
auto message = m_pendingMessages[i].first();
|
||||
m_pendingMessages[i].removeAt(0);
|
||||
return WTFMove(message);
|
||||
callback(WTFMove(message));
|
||||
}
|
||||
|
||||
} // namespace WebCore
|
||||
@@ -56,7 +56,7 @@ public:
|
||||
bool postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
|
||||
|
||||
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
|
||||
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier);
|
||||
void tryTakeMessageForPort(const MessagePortIdentifier, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&);
|
||||
|
||||
WEBCORE_EXPORT bool hasAnyMessagesPendingOrInFlight() const;
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ public:
|
||||
virtual void messagePortClosed(const MessagePortIdentifier& local) = 0;
|
||||
|
||||
virtual void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
|
||||
virtual std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) = 0;
|
||||
virtual void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) = 0;
|
||||
virtual void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) = 0;
|
||||
};
|
||||
|
||||
|
||||
@@ -96,9 +96,9 @@ void MessagePortChannelProviderImpl::takeAllMessagesForPort(const MessagePortIde
|
||||
});
|
||||
}
|
||||
|
||||
std::optional<MessageWithMessagePorts> MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port)
|
||||
void MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
|
||||
{
|
||||
return m_registry.tryTakeMessageForPort(port);
|
||||
m_registry.tryTakeMessageForPort(port, WTFMove(callback));
|
||||
}
|
||||
|
||||
} // namespace WebCore
|
||||
@@ -42,7 +42,7 @@ private:
|
||||
void messagePortClosed(const MessagePortIdentifier& local) final;
|
||||
void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) final;
|
||||
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
|
||||
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) final;
|
||||
void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&) final;
|
||||
|
||||
MessagePortChannelRegistry m_registry;
|
||||
};
|
||||
|
||||
@@ -159,25 +159,25 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif
|
||||
channel->takeAllMessagesForPort(port, WTFMove(callback));
|
||||
}
|
||||
|
||||
std::optional<MessageWithMessagePorts> MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port)
|
||||
void MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&& callback)
|
||||
{
|
||||
// Bun calls this from worker threads
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
// LOG(MessagePorts, "Registry: Trying to take a message for MessagePort %s", port.logString().utf8().data());
|
||||
|
||||
// The channel might be gone if the remote side was closed.
|
||||
auto* channel = m_openChannels.get(port);
|
||||
if (!channel)
|
||||
return std::nullopt;
|
||||
if (!channel) {
|
||||
callback(std::nullopt);
|
||||
return;
|
||||
}
|
||||
|
||||
return channel->tryTakeMessageForPort(port);
|
||||
channel->tryTakeMessageForPort(port, WTFMove(callback));
|
||||
}
|
||||
|
||||
MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(const MessagePortIdentifier& port)
|
||||
{
|
||||
// Bun calls this from worker threads
|
||||
// ASSERT(isMainThread());
|
||||
ASSERT(isMainThread());
|
||||
|
||||
return m_openChannels.get(port);
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ public:
|
||||
WEBCORE_EXPORT void didCloseMessagePort(const MessagePortIdentifier& local);
|
||||
WEBCORE_EXPORT bool didPostMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
|
||||
WEBCORE_EXPORT void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
|
||||
WEBCORE_EXPORT std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&);
|
||||
WEBCORE_EXPORT void tryTakeMessageForPort(const MessagePortIdentifier&, CompletionHandler<void(std::optional<MessageWithMessagePorts>&&)>&&);
|
||||
|
||||
WEBCORE_EXPORT MessagePortChannel* existingChannelContainingPort(const MessagePortIdentifier&);
|
||||
|
||||
|
||||
@@ -357,8 +357,9 @@ JSC_DEFINE_HOST_FUNCTION(functionStartDirectStream, (JSC::JSGlobalObject * lexic
|
||||
templ += `
|
||||
|
||||
void ${className}::ref() {
|
||||
if (!m_sinkPtr)
|
||||
return;
|
||||
if (!m_sinkPtr) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_refCount++;
|
||||
if (m_refCount == 1) {
|
||||
@@ -367,14 +368,14 @@ JSC_DEFINE_HOST_FUNCTION(functionStartDirectStream, (JSC::JSGlobalObject * lexic
|
||||
}
|
||||
|
||||
void ${className}::unref() {
|
||||
if (!m_sinkPtr)
|
||||
return;
|
||||
if (!m_sinkPtr) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_refCount = std::max(0, m_refCount - 1);
|
||||
if (!m_refCount)
|
||||
{
|
||||
m_refCount = std::max(0, m_refCount - 1);
|
||||
if (!m_refCount) {
|
||||
${name}__updateRef(m_sinkPtr, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JSC_DEFINE_HOST_FUNCTION(${name}__ref, (JSC::JSGlobalObject * lexicalGlobalObject, JSC::CallFrame *callFrame))
|
||||
|
||||
Reference in New Issue
Block a user