Implement node:worker_threads (#3923)

* Start to implement `worker_threads`

* more

* more!!

* more

* Update bundle_v2.zig

* delete outdated tests

* `receiveMessageOnPort`

* props test and export default

* fix merge

* not implemented tests

* individual imports

* `receiveMessageOnPort` tests

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: Dylan Conway <dylan.conway567@gmail.com>
This commit is contained in:
Jarred Sumner
2023-08-02 18:12:12 -07:00
committed by GitHub
parent 505e77c2d0
commit 207c7eb509
22 changed files with 802 additions and 80 deletions

View File

@@ -237,7 +237,7 @@ public:
};
}
extern "C" WebCore::Worker* WebWorker__getParentWorker(void*);
extern "C" void JSCInitialize(const char* envp[], size_t envc, void (*onCrash)(const char* ptr, size_t length))
{
if (has_loaded_jsc)
@@ -1586,18 +1586,46 @@ JSC_DEFINE_HOST_FUNCTION(functionCallNotImplemented,
return JSC::JSValue::encode(JSC::JSValue {});
}
JSC_DEFINE_HOST_FUNCTION(jsReceiveMessageOnPort, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
{
auto& vm = lexicalGlobalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
if (callFrame->argumentCount() < 1) {
throwTypeError(lexicalGlobalObject, scope, "receiveMessageOnPort needs 1 argument"_s);
return JSC::JSValue::encode(JSC::JSValue {});
}
auto port = callFrame->argument(0);
if (!port.isObject()) {
throwTypeError(lexicalGlobalObject, scope, "the \"port\" argument must be a MessagePort instance"_s);
return JSC::JSValue::encode(jsUndefined());
}
if (auto* messagePort = jsDynamicCast<JSMessagePort*>(port)) {
return JSC::JSValue::encode(messagePort->wrapped().tryTakeMessage(lexicalGlobalObject));
} else if (auto* broadcastChannel = jsDynamicCast<JSBroadcastChannel*>(port)) {
// TODO: support broadcast channels
return JSC::JSValue::encode(jsUndefined());
}
throwTypeError(lexicalGlobalObject, scope, "the \"port\" argument must be a MessagePort instance"_s);
return JSC::JSValue::encode(jsUndefined());
}
// we're trying out a new way to do this lazy loading
// this is $lazy() in js code
static JSC_DEFINE_HOST_FUNCTION(functionLazyLoad,
(JSC::JSGlobalObject * lexicalGlobalObject, JSC::CallFrame* callFrame))
{
JSC:
Zig::GlobalObject* globalObject = reinterpret_cast<Zig::GlobalObject*>(lexicalGlobalObject);
auto scope = DECLARE_THROW_SCOPE(globalObject->vm());
VM& vm = globalObject->vm();
switch (callFrame->argumentCount()) {
case 0: {
auto scope = DECLARE_THROW_SCOPE(globalObject->vm());
JSC::throwTypeError(globalObject, scope, "lazyLoad needs 1 argument (a string)"_s);
scope.release();
return JSC::JSValue::encode(JSC::JSValue {});
@@ -1607,7 +1635,6 @@ JSC:
if (moduleName.isNumber()) {
switch (moduleName.toInt32(globalObject)) {
case 0: {
auto scope = DECLARE_THROW_SCOPE(globalObject->vm());
JSC::throwTypeError(globalObject, scope, "lazyLoad expects a string"_s);
scope.release();
return JSC::JSValue::encode(JSC::JSValue {});
@@ -1634,7 +1661,6 @@ JSC:
auto string = moduleName.toWTFString(globalObject);
if (string.isNull()) {
auto scope = DECLARE_THROW_SCOPE(globalObject->vm());
JSC::throwTypeError(globalObject, scope, "lazyLoad expects a string"_s);
scope.release();
return JSC::JSValue::encode(JSC::JSValue {});
@@ -1644,6 +1670,32 @@ JSC:
return JSC::JSValue::encode(JSSQLStatementConstructor::create(vm, globalObject, JSSQLStatementConstructor::createStructure(vm, globalObject, globalObject->m_functionPrototype.get())));
}
if (string == "worker_threads"_s) {
JSValue workerData = jsUndefined();
JSValue threadId = jsNumber(0);
if (auto* worker = WebWorker__getParentWorker(globalObject->bunVM())) {
auto& options = worker->options();
if (worker && options.bun.data) {
auto ports = MessagePort::entanglePorts(*ScriptExecutionContext::getScriptExecutionContext(worker->clientIdentifier()), WTFMove(options.bun.dataMessagePorts));
RefPtr<WebCore::SerializedScriptValue> serialized = WTFMove(options.bun.data);
JSValue deserialized = serialized->deserialize(*globalObject, globalObject, WTFMove(ports));
RETURN_IF_EXCEPTION(scope, {});
workerData = deserialized;
}
threadId = jsNumber(worker->clientIdentifier());
}
JSArray* array = constructEmptyArray(globalObject, nullptr);
array->push(globalObject, workerData);
array->push(globalObject, threadId);
array->push(globalObject, JSFunction::create(vm, globalObject, 1, "receiveMessageOnPort"_s, jsReceiveMessageOnPort, ImplementationVisibility::Public, NoIntrinsic));
return JSC::JSValue::encode(array);
}
if (string == "pathToFileURL"_s) {
return JSValue::encode(
JSFunction::create(vm, globalObject, 1, pathToFileURLString, functionPathToFileURL, ImplementationVisibility::Public, NoIntrinsic));
@@ -3551,7 +3603,6 @@ void GlobalObject::finishCreation(VM& vm)
consoleObject->putDirectBuiltinFunction(vm, this, clientData->builtinNames().writePublicName(), consoleObjectWriteCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::ReadOnly | PropertyAttribute::DontDelete);
}
extern "C" WebCore::Worker* WebWorker__getParentWorker(void*);
JSC_DEFINE_HOST_FUNCTION(jsFunctionPostMessage,
(JSC::JSGlobalObject * leixcalGlobalObject, JSC::CallFrame* callFrame))
{
@@ -3596,7 +3647,7 @@ JSC_DEFINE_HOST_FUNCTION(jsFunctionPostMessage,
}
Vector<RefPtr<MessagePort>> ports;
ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*globalObject, value, WTFMove(transferList), ports);
ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*globalObject, value, WTFMove(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
if (serialized.hasException()) {
WebCore::propagateException(*globalObject, throwScope, serialized.releaseException());
return JSValue::encode(jsUndefined());

View File

@@ -55,6 +55,7 @@
#include <wtf/GetPtr.h>
#include <wtf/PointerPreparations.h>
#include <wtf/URL.h>
#include "SerializedScriptValue.h"
namespace WebCore {
using namespace JSC;
@@ -147,6 +148,51 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::const
options.bun.unref = !ref.toBoolean(lexicalGlobalObject);
RETURN_IF_EXCEPTION(throwScope, encodedJSValue());
}
auto workerData = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "workerData"_s));
if (!workerData) {
workerData = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "data"_s));
}
if (workerData) {
Vector<RefPtr<MessagePort>> ports;
Vector<JSC::Strong<JSC::JSObject>> transferList;
if (JSValue transferListValue = bunObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "transferList"_s))) {
if (transferListValue.isObject()) {
JSC::JSObject* transferListObject = transferListValue.getObject();
if (auto* transferListArray = jsDynamicCast<JSC::JSArray*>(transferListObject)) {
for (unsigned i = 0; i < transferListArray->length(); i++) {
JSC::JSValue transferListValue = transferListArray->get(lexicalGlobalObject, i);
if (transferListValue.isObject()) {
JSC::JSObject* transferListObject = transferListValue.getObject();
transferList.append(JSC::Strong<JSC::JSObject>(vm, transferListObject));
}
}
}
}
}
ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*lexicalGlobalObject, workerData, WTFMove(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
if (serialized.hasException()) {
WebCore::propagateException(*lexicalGlobalObject, throwScope, serialized.releaseException());
return encodedJSValue();
}
Vector<TransferredMessagePort> transferredPorts;
if (!ports.isEmpty()) {
auto disentangleResult = MessagePort::disentanglePorts(WTFMove(ports));
if (disentangleResult.hasException()) {
WebCore::propagateException(*lexicalGlobalObject, throwScope, disentangleResult.releaseException());
return encodedJSValue();
}
transferredPorts = disentangleResult.releaseReturnValue();
}
options.bun.data = WTFMove(serialized.releaseReturnValue());
options.bun.dataMessagePorts = WTFMove(transferredPorts);
}
}
}

View File

@@ -109,7 +109,6 @@ ScriptExecutionContextIdentifier MessagePort::contextIdForMessagePortId(MessageP
void MessagePort::notifyMessageAvailable(const MessagePortIdentifier& identifier)
{
ASSERT(isMainThread());
ScriptExecutionContextIdentifier scriptExecutionContextIdentifier;
{
Locker locker { allMessagePortsLock };
@@ -171,9 +170,7 @@ MessagePort::~MessagePort()
void MessagePort::entangle()
{
ScriptExecutionContext::ensureOnMainThread([identifier = m_identifier, remoteIdentifier = m_remoteIdentifier](ScriptExecutionContext& context) {
MessagePortChannelProvider::fromContext(context).entangleLocalPortInThisProcessToRemote(identifier, remoteIdentifier);
});
MessagePortChannelProvider::fromContext(*scriptExecutionContext()).entangleLocalPortInThisProcessToRemote(m_identifier, m_remoteIdentifier);
}
ExceptionOr<void> MessagePort::postMessage(JSC::JSGlobalObject& state, JSC::JSValue messageValue, StructuredSerializeOptions&& options)
@@ -265,9 +262,7 @@ void MessagePort::close()
return;
m_isDetached = true;
ScriptExecutionContext::ensureOnMainThread([identifier = m_identifier, protectedThis = Ref { *this }](ScriptExecutionContext& context) {
MessagePortChannelProvider::singleton().messagePortClosed(identifier);
});
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);
removeAllEventListeners();
}
@@ -316,6 +311,22 @@ void MessagePort::dispatchMessages()
MessagePortChannelProvider::fromContext(*context).takeAllMessagesForPort(m_identifier, WTFMove(messagesTakenHandler));
}
JSValue MessagePort::tryTakeMessage(JSGlobalObject* lexicalGlobalObject)
{
auto* context = scriptExecutionContext();
if (!context || context->activeDOMObjectsAreSuspended() || !isEntangled())
return jsUndefined();
std::optional<MessageWithMessagePorts> messageWithPorts = MessagePortChannelProvider::fromContext(*context).tryTakeMessageForPort(m_identifier);
if (!messageWithPorts)
return jsUndefined();
auto ports = MessagePort::entanglePorts(*context, WTFMove(messageWithPorts->transferredPorts));
auto message = messageWithPorts->message.releaseNonNull();
return message->deserialize(*lexicalGlobalObject, lexicalGlobalObject, WTFMove(ports), SerializationErrorMode::NonThrowing);
}
void MessagePort::dispatchEvent(Event& event)
{
if (m_isDetached) {

View File

@@ -96,6 +96,8 @@ public:
void dispatchEvent(Event&) final;
JSValue tryTakeMessage(JSGlobalObject*);
TransferredMessagePort disentangle();
static Ref<MessagePort> entangle(ScriptExecutionContext&, TransferredMessagePort&&);

View File

@@ -41,8 +41,6 @@ Ref<MessagePortChannel> MessagePortChannel::create(MessagePortChannelRegistry& r
MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
: m_registry(registry)
{
ASSERT(isMainThread());
relaxAdoptionRequirement();
m_ports[0] = port1;
@@ -62,7 +60,6 @@ MessagePortChannel::~MessagePortChannel()
std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
return m_processes[i];
@@ -70,15 +67,11 @@ std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const Messag
bool MessagePortChannel::includesPort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
return m_ports[0] == port || m_ports[1] == port;
}
void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& port, ProcessIdentifier process)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
@@ -92,8 +85,6 @@ void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& po
void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
LOG(MessagePorts, "MessagePortChannel %s (%p) disentangling port %s", logString().utf8().data(), this, port.logString().utf8().data());
ASSERT(port == m_ports[0] || port == m_ports[1]);
@@ -110,8 +101,6 @@ void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
void MessagePortChannel::closePort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
@@ -130,8 +119,6 @@ void MessagePortChannel::closePort(const MessagePortIdentifier& port)
bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
{
ASSERT(isMainThread());
ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]);
size_t i = remoteTarget == m_ports[0] ? 0 : 1;
@@ -149,8 +136,6 @@ bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message,
void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& callback)
{
ASSERT(isMainThread());
LOG(MessagePorts, "MessagePortChannel %p taking all messages for port %s", this, port.logString().utf8().data());
ASSERT(port == m_ports[0] || port == m_ports[1]);
@@ -181,10 +166,17 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por
});
}
bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
std::optional<MessageWithMessagePorts> MessagePortChannel::tryTakeMessageForPort(const MessagePortIdentifier port)
{
ASSERT(isMainThread());
return m_messageBatchesInFlight || !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty();
ASSERT(port == m_ports[0] || port == m_ports[1]);
size_t i = port == m_ports[0] ? 0 : 1;
if (m_pendingMessages[i].isEmpty())
return std::nullopt;
auto message = m_pendingMessages[i].first();
m_pendingMessages[i].remove(0);
return WTFMove(message);
}
} // namespace WebCore

View File

@@ -54,6 +54,7 @@ public:
bool postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier);
WEBCORE_EXPORT bool hasAnyMessagesPendingOrInFlight() const;

View File

@@ -27,6 +27,7 @@
#include "ProcessIdentifier.h"
#include "BunWorkerGlobalScope.h"
#include "MessageWithMessagePorts.h"
#include <wtf/CompletionHandler.h>
#include <wtf/Vector.h>
@@ -51,6 +52,7 @@ public:
virtual void messagePortClosed(const MessagePortIdentifier& local) = 0;
virtual void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) = 0;
virtual std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) = 0;
virtual void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) = 0;
};

View File

@@ -43,38 +43,28 @@ MessagePortChannelProviderImpl::~MessagePortChannelProviderImpl()
void MessagePortChannelProviderImpl::createNewMessagePortChannel(const MessagePortIdentifier& local, const MessagePortIdentifier& remote)
{
ScriptExecutionContext::ensureOnMainThread([registry = &m_registry, local, remote](ScriptExecutionContext& context) mutable {
registry->didCreateMessagePortChannel(local, remote);
});
m_registry.didCreateMessagePortChannel(local, remote);
}
void MessagePortChannelProviderImpl::entangleLocalPortInThisProcessToRemote(const MessagePortIdentifier& local, const MessagePortIdentifier& remote)
{
ScriptExecutionContext::ensureOnMainThread([registry = &m_registry, local, remote](ScriptExecutionContext& context) mutable {
registry->didEntangleLocalToRemote(local, remote, ProcessIdent::identifier());
});
m_registry.didEntangleLocalToRemote(local, remote, ProcessIdent::identifier());
}
void MessagePortChannelProviderImpl::messagePortDisentangled(const MessagePortIdentifier& local)
{
ScriptExecutionContext::ensureOnMainThread([registry = &m_registry, local](ScriptExecutionContext& context) mutable {
registry->didDisentangleMessagePort(local);
});
m_registry.didDisentangleMessagePort(local);
}
void MessagePortChannelProviderImpl::messagePortClosed(const MessagePortIdentifier& local)
{
ScriptExecutionContext::ensureOnMainThread([registry = &m_registry, local](ScriptExecutionContext& context) mutable {
registry->didCloseMessagePort(local);
});
m_registry.didCloseMessagePort(local);
}
void MessagePortChannelProviderImpl::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
{
ScriptExecutionContext::ensureOnMainThread([message = WTFMove(message), registry = &m_registry, remoteTarget](ScriptExecutionContext& context) mutable {
if (registry->didPostMessageToRemote(WTFMove(message), remoteTarget))
MessagePort::notifyMessageAvailable(remoteTarget);
});
if (m_registry.didPostMessageToRemote(WTFMove(message), remoteTarget))
MessagePort::notifyMessageAvailable(remoteTarget);
}
void MessagePortChannelProviderImpl::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& outerCallback)
@@ -85,9 +75,12 @@ void MessagePortChannelProviderImpl::takeAllMessagesForPort(const MessagePortIde
outerCallback(WTFMove(messages), WTFMove(messageDeliveryCallback));
};
ScriptExecutionContext::ensureOnMainThread([registry = &m_registry, port, callback = WTFMove(callback)](ScriptExecutionContext& context) mutable {
registry->takeAllMessagesForPort(port, WTFMove(callback));
});
m_registry.takeAllMessagesForPort(port, WTFMove(callback));
}
std::optional<MessageWithMessagePorts> MessagePortChannelProviderImpl::tryTakeMessageForPort(const MessagePortIdentifier& port)
{
return m_registry.tryTakeMessageForPort(port);
}
} // namespace WebCore

View File

@@ -27,6 +27,7 @@
#include "MessagePortChannelProvider.h"
#include "MessagePortChannelRegistry.h"
#include "MessageWithMessagePorts.h"
namespace WebCore {
@@ -42,6 +43,7 @@ private:
void messagePortClosed(const MessagePortIdentifier& local) final;
void postMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget) final;
void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&) final;
std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&) final;
MessagePortChannelRegistry m_registry;
};

View File

@@ -154,6 +154,20 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif
channel->takeAllMessagesForPort(port, WTFMove(callback));
}
std::optional<MessageWithMessagePorts> MessagePortChannelRegistry::tryTakeMessageForPort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
LOG(MessagePorts, "Registry: Trying to take a message for MessagePort %s", port.logString().utf8().data());
// The channel might be gone if the remote side was closed.
auto* channel = m_openChannels.get(port);
if (!channel)
return std::nullopt;
return channel->tryTakeMessageForPort(port);
}
MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());

View File

@@ -45,6 +45,7 @@ public:
WEBCORE_EXPORT void didCloseMessagePort(const MessagePortIdentifier& local);
WEBCORE_EXPORT bool didPostMessageToRemote(MessageWithMessagePorts&&, const MessagePortIdentifier& remoteTarget);
WEBCORE_EXPORT void takeAllMessagesForPort(const MessagePortIdentifier&, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&&);
WEBCORE_EXPORT std::optional<MessageWithMessagePorts> tryTakeMessageForPort(const MessagePortIdentifier&);
WEBCORE_EXPORT MessagePortChannel* existingChannelContainingPort(const MessagePortIdentifier&);

View File

@@ -186,7 +186,7 @@ ExceptionOr<void> Worker::postMessage(JSC::JSGlobalObject& state, JSC::JSValue m
return Exception { InvalidStateError, "Worker has been terminated"_s };
Vector<RefPtr<MessagePort>> ports;
auto serialized = SerializedScriptValue::create(state, messageValue, WTFMove(options.transfer), ports);
auto serialized = SerializedScriptValue::create(state, messageValue, WTFMove(options.transfer), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
if (serialized.hasException())
return serialized.releaseException();

View File

@@ -97,6 +97,7 @@ public:
void dispatchExit();
ScriptExecutionContext* scriptExecutionContext() const final { return ContextDestructionObserver::scriptExecutionContext(); }
ScriptExecutionContextIdentifier clientIdentifier() const { return m_clientIdentifier; }
WorkerOptions& options() { return m_options; }
private:
Worker(ScriptExecutionContext&, WorkerOptions&&);
@@ -119,7 +120,7 @@ private:
static void networkStateChanged(bool isOnLine);
// RefPtr<WorkerScriptLoader> m_scriptLoader;
const WorkerOptions m_options;
WorkerOptions m_options;
String m_identifier;
// WorkerGlobalScopeProxy& m_contextProxy; // The proxy outlives the worker to perform thread shutdown.
// std::optional<ContentSecurityPolicyResponseHeaders> m_contentSecurityPolicyResponseHeaders;

View File

@@ -1,12 +1,17 @@
#pragma once
#include "root.h"
#include "SerializedScriptValue.h"
#include "TransferredMessagePort.h"
#include "MessagePort.h"
namespace WebCore {
struct BunOptions {
bool mini { false };
bool unref { false };
RefPtr<SerializedScriptValue> data;
Vector<TransferredMessagePort> dataMessagePorts;
};
struct WorkerOptions {

View File

@@ -2408,8 +2408,8 @@ pub const HardcodedModule = enum {
@"node:util/types",
@"node:vm",
@"node:wasi",
@"node:worker_threads",
@"node:zlib",
@"node:worker_threads",
@"node:punycode",
undici,
ws,

View File

@@ -1 +1,296 @@
export default $lazy("masqueradesAsUndefined");
const { MessageChannel, BroadcastChannel } = globalThis;
function injectFakeEmitter(Class) {
function messageEventHandler(event: MessageEvent) {
return event.data;
}
function errorEventHandler(event: ErrorEvent) {
return event.error;
}
const wrappedListener = Symbol("wrappedListener");
function wrapped(run, listener) {
const callback = function (event) {
return listener(run(event));
};
listener[wrappedListener] = callback;
return callback;
}
function functionForEventType(event, listener) {
switch (event) {
case "error":
case "messageerror": {
return wrapped(errorEventHandler, listener);
}
default: {
return wrapped(messageEventHandler, listener);
}
}
}
Class.prototype.on = function (event, listener) {
this.addEventListener(event, functionForEventType(event, listener));
return this;
};
Class.prototype.off = function (event, listener) {
if (listener) {
this.removeEventListener(event, listener[wrappedListener] || listener);
} else {
this.removeEventListener(event);
}
return this;
};
Class.prototype.once = function (event, listener) {
this.addEventListener(event, functionForEventType(event, listener), { once: true });
return this;
};
function EventClass(eventName) {
if (eventName === "error" || eventName === "messageerror") {
return ErrorEvent;
}
return MessageEvent;
}
Class.prototype.emit = function (event, ...args) {
this.dispatchEvent(new (EventClass(event))(event, ...args));
return this;
};
Class.prototype.prependListener = Class.prototype.on;
Class.prototype.prependOnceListener = Class.prototype.once;
}
const _MessagePort = globalThis.MessagePort;
injectFakeEmitter(_MessagePort);
const MessagePort = _MessagePort;
const EventEmitter = require("node:events");
const isMainThread = Bun.isMainThread;
let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");
let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();
let resourceLimits = {};
let workerData = _workerData;
let threadId = _threadId;
function receiveMessageOnPort(port: MessagePort) {
let res = _receiveMessageOnPort(port);
if (!res) return undefined;
return {
message: res,
};
}
function fakeParentPort() {
const fake = Object.create(MessagePort.prototype);
Object.defineProperty(fake, "onmessage", {
get() {
return self.onmessage;
},
set(value) {
self.onmessage = value;
},
});
Object.defineProperty(fake, "onmessageerror", {
get() {
return self.onmessageerror;
},
set(value) {},
});
Object.defineProperty(fake, "postMessage", {
value(...args: any[]) {
return self.postMessage(...args);
},
});
Object.defineProperty(fake, "close", {
value() {
return process.exit(0);
},
});
Object.defineProperty(fake, "start", {
value() {},
});
Object.defineProperty(fake, "unref", {
value() {},
});
Object.defineProperty(fake, "ref", {
value() {},
});
Object.defineProperty(fake, "hasRef", {
value() {
return false;
},
});
Object.defineProperty(fake, "setEncoding", {
value() {},
});
Object.defineProperty(fake, "addEventListener", {
value: self.addEventListener.bind(self),
});
Object.defineProperty(fake, "removeEventListener", {
value: self.removeEventListener.bind(self),
});
return fake;
}
function getEnvironmentData() {
return process.env;
}
function setEnvironmentData(env: any) {
process.env = env;
}
function markAsUntransferable() {
throw new Error("markAsUntransferable is not implemented");
}
function moveMessagePortToContext() {
throw new Error("moveMessagePortToContext is not implemented");
}
const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");
const WebWorker = globalThis.Worker;
class Worker extends EventEmitter {
#worker: globalThis.Worker;
#performance;
#onExitPromise = undefined;
constructor(filename: string, options: any = {}) {
super();
this.#worker = new WebWorker(filename, {
...options,
});
this.#worker.addEventListener("close", this.#onClose.bind(this));
this.#worker.addEventListener("error", this.#onError.bind(this));
this.#worker.addEventListener("message", this.#onMessage.bind(this));
this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this));
this.#worker.addEventListener("open", this.#onOpen.bind(this));
}
ref() {
this.#worker.ref();
}
unref() {
this.#worker.unref();
}
get stdin() {
// TODO:
return null;
}
get stdout() {
// TODO:
return null;
}
get stderr() {
// TODO:
return null;
}
get performance() {
return (this.#performance ??= {
eventLoopUtilization() {
return {};
},
});
}
terminate() {
if (this.#onExitPromise) {
return this.#onExitPromise;
}
const { resolve, promise } = Promise.withResolvers();
this.#worker.addEventListener(
"close",
event => {
// TODO: exit code
resolve(0);
},
{ once: true },
);
return (this.#onExitPromise = promise);
}
postMessage(...args: any[]) {
return this.#worker.postMessage(...args);
}
#onClose() {
this.emit("exit");
}
#onError(event: ErrorEvent) {
// TODO: is this right?
this.emit("error", event);
}
#onMessage(event: MessageEvent) {
// TODO: is this right?
this.emit("message", event.data);
}
#onMessageError(event: Event) {
// TODO: is this right?
this.emit("messageerror", event.error || event);
}
#onOpen() {
// TODO: is this right?
this.emit("online");
}
getHeapSnapshot() {
return {};
}
}
export default {
Worker,
workerData,
parentPort,
resourceLimits,
isMainThread,
MessageChannel,
BroadcastChannel,
MessagePort,
getEnvironmentData,
setEnvironmentData,
getHeapSnapshot() {
return {};
},
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
SHARE_ENV,
threadId,
};

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,213 @@
var injectFakeEmitter = function(Class) {
function messageEventHandler(event) {
return event.data;
}
function errorEventHandler(event) {
return event.error;
}
const wrappedListener = Symbol("wrappedListener");
function wrapped(run, listener) {
const callback = function(event) {
return listener(run(event));
};
return listener[wrappedListener] = callback, callback;
}
function functionForEventType(event, listener) {
switch (event) {
case "error":
case "messageerror":
return wrapped(errorEventHandler, listener);
default:
return wrapped(messageEventHandler, listener);
}
}
Class.prototype.on = function(event, listener) {
return this.addEventListener(event, functionForEventType(event, listener)), this;
}, Class.prototype.off = function(event, listener) {
if (listener)
this.removeEventListener(event, listener[wrappedListener] || listener);
else
this.removeEventListener(event);
return this;
}, Class.prototype.once = function(event, listener) {
return this.addEventListener(event, functionForEventType(event, listener), { once: !0 }), this;
};
function EventClass(eventName) {
if (eventName === "error" || eventName === "messageerror")
return ErrorEvent;
return MessageEvent;
}
Class.prototype.emit = function(event, ...args) {
return this.dispatchEvent(new (EventClass(event))(event, ...args)), this;
}, Class.prototype.prependListener = Class.prototype.on, Class.prototype.prependOnceListener = Class.prototype.once;
};
import EventEmitter from "node:events";
function receiveMessageOnPort(port) {
let res = _receiveMessageOnPort(port);
if (!res)
return;
return {
message: res
};
}
var fakeParentPort = function() {
const fake = Object.create(MessagePort.prototype);
return Object.defineProperty(fake, "onmessage", {
get() {
return self.onmessage;
},
set(value) {
self.onmessage = value;
}
}), Object.defineProperty(fake, "onmessageerror", {
get() {
return self.onmessageerror;
},
set(value) {
}
}), Object.defineProperty(fake, "postMessage", {
value(...args) {
return self.postMessage(...args);
}
}), Object.defineProperty(fake, "close", {
value() {
return process.exit(0);
}
}), Object.defineProperty(fake, "start", {
value() {
}
}), Object.defineProperty(fake, "unref", {
value() {
}
}), Object.defineProperty(fake, "ref", {
value() {
}
}), Object.defineProperty(fake, "hasRef", {
value() {
return !1;
}
}), Object.defineProperty(fake, "setEncoding", {
value() {
}
}), Object.defineProperty(fake, "addEventListener", {
value: self.addEventListener.bind(self)
}), Object.defineProperty(fake, "removeEventListener", {
value: self.removeEventListener.bind(self)
}), fake;
};
function getEnvironmentData() {
return process.env;
}
function setEnvironmentData(env) {
process.env = env;
}
function markAsUntransferable() {
throw new Error("markAsUntransferable is not implemented");
}
function moveMessagePortToContext() {
throw new Error("moveMessagePortToContext is not implemented");
}
var { MessageChannel, BroadcastChannel } = globalThis, _MessagePort = globalThis.MessagePort;
injectFakeEmitter(_MessagePort);
var MessagePort = _MessagePort, isMainThread = Bun.isMainThread, [_workerData, _threadId, _receiveMessageOnPort] = globalThis[Symbol.for("Bun.lazy")]("worker_threads"), parentPort = isMainThread ? null : fakeParentPort(), resourceLimits = {}, workerData = _workerData, threadId = _threadId, SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"), WebWorker = globalThis.Worker;
class Worker extends EventEmitter {
#worker;
#performance;
#onExitPromise = void 0;
constructor(filename, options = {}) {
super();
this.#worker = new WebWorker(filename, {
...options
}), this.#worker.addEventListener("close", this.#onClose.bind(this)), this.#worker.addEventListener("error", this.#onError.bind(this)), this.#worker.addEventListener("message", this.#onMessage.bind(this)), this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this)), this.#worker.addEventListener("open", this.#onOpen.bind(this));
}
ref() {
this.#worker.ref();
}
unref() {
this.#worker.unref();
}
get stdin() {
return null;
}
get stdout() {
return null;
}
get stderr() {
return null;
}
get performance() {
return this.#performance ??= {
eventLoopUtilization() {
return {};
}
};
}
terminate() {
if (this.#onExitPromise)
return this.#onExitPromise;
const { resolve, promise } = Promise.withResolvers();
return this.#worker.addEventListener("close", (event) => {
resolve(0);
}, { once: !0 }), this.#onExitPromise = promise;
}
postMessage(...args) {
return this.#worker.postMessage(...args);
}
#onClose() {
this.emit("exit");
}
#onError(event) {
this.emit("error", event);
}
#onMessage(event) {
this.emit("message", event.data);
}
#onMessageError(event) {
this.emit("messageerror", event.error || event);
}
#onOpen() {
this.emit("online");
}
getHeapSnapshot() {
return {};
}
}
var worker_threads_default = {
Worker,
workerData,
parentPort,
resourceLimits,
isMainThread,
MessageChannel,
BroadcastChannel,
MessagePort,
getEnvironmentData,
setEnvironmentData,
getHeapSnapshot() {
return {};
},
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
SHARE_ENV,
threadId
};
export {
workerData,
threadId,
setEnvironmentData,
resourceLimits,
receiveMessageOnPort,
parentPort,
moveMessagePortToContext,
markAsUntransferable,
isMainThread,
getEnvironmentData,
worker_threads_default as default,
Worker,
SHARE_ENV,
MessagePort,
MessageChannel,
BroadcastChannel
};

View File

@@ -1,6 +0,0 @@
test("not implemented yet module masquerades as undefined in cjs and throws an error", () => {
const worker_threads = require("worker_threads");
expect(typeof worker_threads).toBe("undefined");
expect(typeof worker_threads.getEnvironmentData).toBe("undefined");
});

View File

@@ -1,16 +0,0 @@
import { expect, test } from "bun:test";
import * as worker_threads from "worker_threads";
import worker_threads_default from "worker_threads";
test("not implemented yet module masquerades as undefined and throws an error", () => {
expect(typeof worker_threads.default).toBe("undefined");
expect(typeof worker_threads_default).toBe("undefined");
expect(typeof worker_threads.getEnvironmentData).toBe("undefined");
expect(typeof worker_threads_default.getEnvironmentData).toBe("undefined");
});
test("esbuild functions with worker_threads stub", async () => {
const esbuild = await import("esbuild");
const result = await esbuild.transform('console . log( "hello world" )', { minify: true });
expect(result.code).toBe('console.log("hello world");\n');
});

View File

@@ -0,0 +1,8 @@
const wt = require("worker_threads");
wt.parentPort.on("message", e => {
let sharedBufferView = new Int32Array(e.sharedBuffer);
wt.workerData.postMessage("done!");
Atomics.add(sharedBufferView, 0, 1);
Atomics.notify(sharedBufferView, 0, Infinity);
});

View File

@@ -0,0 +1,107 @@
import wt from "worker_threads";
import {
getEnvironmentData,
isMainThread,
markAsUntransferable,
moveMessagePortToContext,
parentPort,
receiveMessageOnPort,
resourceLimits,
setEnvironmentData,
SHARE_ENV,
threadId,
workerData,
BroadcastChannel,
MessageChannel,
MessagePort,
Worker,
} from "worker_threads";
test("all properties are present", () => {
expect(wt).toHaveProperty("getEnvironmentData");
expect(wt).toHaveProperty("isMainThread");
expect(wt).toHaveProperty("markAsUntransferable");
expect(wt).toHaveProperty("moveMessagePortToContext");
expect(wt).toHaveProperty("parentPort");
expect(wt).toHaveProperty("receiveMessageOnPort");
expect(wt).toHaveProperty("resourceLimits");
expect(wt).toHaveProperty("SHARE_ENV");
expect(wt).toHaveProperty("setEnvironmentData");
expect(wt).toHaveProperty("threadId");
expect(wt).toHaveProperty("workerData");
expect(wt).toHaveProperty("BroadcastChannel");
expect(wt).toHaveProperty("MessageChannel");
expect(wt).toHaveProperty("MessagePort");
expect(wt).toHaveProperty("Worker");
expect(getEnvironmentData).toBeDefined();
expect(isMainThread).toBeDefined();
expect(markAsUntransferable).toBeDefined();
expect(moveMessagePortToContext).toBeDefined();
expect(parentPort).toBeDefined();
expect(receiveMessageOnPort).toBeDefined();
expect(resourceLimits).toBeDefined();
expect(SHARE_ENV).toBeDefined();
expect(setEnvironmentData).toBeDefined();
expect(threadId).toBeDefined();
expect(workerData).toBeUndefined();
expect(BroadcastChannel).toBeDefined();
expect(MessageChannel).toBeDefined();
expect(MessagePort).toBeDefined();
expect(Worker).toBeDefined();
expect(() => {
wt.markAsUntransferable();
}).toThrow("not implemented");
expect(() => {
wt.moveMessagePortToContext();
}).toThrow("not implemented");
});
test("receiveMessageOnPort works across threads", () => {
const { port1, port2 } = new MessageChannel();
var worker = new wt.Worker(new URL("./worker.js", import.meta.url).href, {
workerData: port2,
transferList: [port2],
});
let sharedBuffer = new SharedArrayBuffer(8);
let sharedBufferView = new Int32Array(sharedBuffer);
let msg = { sharedBuffer };
worker.postMessage(msg);
Atomics.wait(sharedBufferView, 0, 0);
const message = receiveMessageOnPort(port1);
expect(message).toBeDefined();
expect(message!.message).toBe("done!");
});
test("receiveMessageOnPort works with FIFO", () => {
const { port1, port2 } = new wt.MessageChannel();
const message1 = { hello: "world" };
const message2 = { foo: "bar" };
// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does
// when were using events.
expect(receiveMessageOnPort(port2)).toBe(undefined);
port1.postMessage(message1);
port1.postMessage(message2);
expect(receiveMessageOnPort(port2)).toStrictEqual({ message: message1 });
expect(receiveMessageOnPort(port2)).toStrictEqual({ message: message2 });
expect(receiveMessageOnPort(port2)).toBe(undefined);
expect(receiveMessageOnPort(port2)).toBe(undefined);
// Make sure message handlers arent called.
port2.on("message", () => {
expect().fail("message handler must not be called");
});
port1.postMessage(message1);
expect(receiveMessageOnPort(port2)).toStrictEqual({ message: message1 });
port1.close();
for (const value of [null, 0, -1, {}, []]) {
expect(() => {
// @ts-ignore
receiveMessageOnPort(value);
}).toThrow();
}
});