From eee5d4fb4a73c1cff88e51cd1650e1b152320814 Mon Sep 17 00:00:00 2001 From: 190n Date: Tue, 8 Apr 2025 05:29:53 -0700 Subject: [PATCH] node:worker_threads low-hanging fruit (#18758) Co-authored-by: 190n <7763597+190n@users.noreply.github.com> Co-authored-by: Ashcon Partovi Co-authored-by: Jarred Sumner Co-authored-by: Dylan Conway <35280289+dylan-conway@users.noreply.github.com> Co-authored-by: Don Isaac Co-authored-by: chloe caruso --- src/bun.js/bindings/ZigGlobalObject.cpp | 28 ++++++-- src/bun.js/bindings/headers.h | 2 +- src/bun.js/bindings/webcore/JSWorker.cpp | 65 ++++++++++++------- .../webcore/SerializedScriptValue.cpp | 2 +- src/bun.js/bindings/webcore/Worker.cpp | 59 ++++++++++------- src/bun.js/bindings/webcore/WorkerOptions.h | 23 ++++--- src/bun.js/javascript.zig | 27 +++++++- src/bun.js/node/node_net_binding.zig | 12 ++-- src/bun.js/node/types.zig | 20 +++--- src/bun.js/web_worker.zig | 28 ++++---- src/bun_js.zig | 8 ++- src/cli/test_command.zig | 3 + src/js/node/fs.promises.ts | 9 ++- src/js/node/worker_threads.ts | 51 ++++++++++++--- src/shell/interpreter.zig | 6 +- ...test-worker-abort-on-uncaught-exception.js | 12 ++++ .../parallel/test-worker-console-listeners.js | 16 +++++ .../node/test/parallel/test-worker-memory.js | 51 +++++++++++++++ ...orker-message-channel-sharedarraybuffer.js | 2 +- .../test/parallel/test-worker-process-argv.js | 49 ++++++++++++++ .../parallel/test-worker-terminate-timers.js | 2 +- ...est-worker-workerdata-sharedarraybuffer.js | 2 +- .../eval-source-leak-fixture.js | 40 ++++++++++++ .../node/worker_threads/fixture-execargv.js | 13 ++++ .../worker_threads/worker_threads.test.ts | 45 +++++++++++++ 25 files changed, 459 insertions(+), 116 deletions(-) create mode 100644 test/js/node/test/parallel/test-worker-abort-on-uncaught-exception.js create mode 100644 test/js/node/test/parallel/test-worker-console-listeners.js create mode 100644 test/js/node/test/parallel/test-worker-memory.js create mode 100644 test/js/node/test/parallel/test-worker-process-argv.js create mode 100644 test/js/node/worker_threads/eval-source-leak-fixture.js create mode 100644 test/js/node/worker_threads/fixture-execargv.js diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 21f373f960..26c282c5ca 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -988,26 +988,25 @@ extern "C" JSC__JSGlobalObject* Zig__GlobalObject__create(void* console_client, const auto initializeWorker = [&](WebCore::Worker& worker) -> void { auto& options = worker.options(); - if (options.bun.env) { - auto map = WTFMove(options.bun.env); - auto size = map->size(); + if (options.env.has_value()) { + HashMap map = WTFMove(*std::exchange(options.env, std::nullopt)); + auto size = map.size(); // In theory, a GC could happen before we finish putting all the properties on the object. // So we use a MarkedArgumentBuffer to ensure that the strings are not collected and we immediately put them on the object. MarkedArgumentBuffer strings; - strings.ensureCapacity(map->size()); - for (const auto& value : map->values()) { + strings.ensureCapacity(size); + for (const auto& value : map.values()) { strings.append(jsString(vm, value)); } auto env = JSC::constructEmptyObject(globalObject, globalObject->objectPrototype(), size >= JSFinalObject::maxInlineCapacity ? JSFinalObject::maxInlineCapacity : size); size_t i = 0; - for (auto k : *map) { + for (auto k : map) { // They can have environment variables with numbers as keys. // So we must use putDirectMayBeIndex to handle that. env->putDirectMayBeIndex(globalObject, JSC::Identifier::fromString(vm, WTFMove(k.key)), strings.at(i++)); } - map->clear(); globalObject->m_processEnvObject.set(vm, globalObject, env); } @@ -4677,6 +4676,21 @@ bool GlobalObject::hasNapiFinalizers() const return false; } +extern "C" void Zig__GlobalObject__destructOnExit(Zig::GlobalObject* globalObject) +{ + auto& vm = JSC::getVM(globalObject); + if (vm.entryScope) { + // Exiting while running JavaScript code (e.g. `process.exit()`), so we can't destroy it + // just now. Perhaps later in this case we can defer destruction to run later. + return; + } + gcUnprotect(globalObject); + globalObject = nullptr; + vm.heap.collectNow(JSC::Sync, JSC::CollectionScope::Full); + vm.derefSuppressingSaferCPPChecking(); + vm.derefSuppressingSaferCPPChecking(); +} + #include "ZigGeneratedClasses+lazyStructureImpl.h" #include "ZigGlobalObject.lut.h" diff --git a/src/bun.js/bindings/headers.h b/src/bun.js/bindings/headers.h index a2b20170aa..6725daa117 100644 --- a/src/bun.js/bindings/headers.h +++ b/src/bun.js/bindings/headers.h @@ -741,7 +741,7 @@ ZIG_DECL size_t Bun__WebSocketClientTLS__memoryCost(WebSocketClientTLS* arg0); #ifdef __cplusplus -ZIG_DECL void Bun__Process__exit(JSC__JSGlobalObject* arg0, unsigned char arg1); +ZIG_DECL /*[[noreturn]]*/ void Bun__Process__exit(JSC__JSGlobalObject* arg0, uint8_t arg1); // TODO(@190n) figure out why with a real [[noreturn]] annotation this trips ASan before calling the function ZIG_DECL JSC__JSValue Bun__Process__getArgv(JSC__JSGlobalObject* arg0); ZIG_DECL JSC__JSValue Bun__Process__getArgv0(JSC__JSGlobalObject* arg0); ZIG_DECL JSC__JSValue Bun__Process__getCwd(JSC__JSGlobalObject* arg0); diff --git a/src/bun.js/bindings/webcore/JSWorker.cpp b/src/bun.js/bindings/webcore/JSWorker.cpp index 223aab4249..676ea4543f 100644 --- a/src/bun.js/bindings/webcore/JSWorker.cpp +++ b/src/bun.js/bindings/webcore/JSWorker.cpp @@ -41,6 +41,7 @@ #include "JSDOMOperation.h" #include "JSDOMWrapperCache.h" #include "JSEventListener.h" +#include "NodeValidator.h" #include "StructuredSerializeOptions.h" #include "JSWorkerOptions.h" #include "ScriptExecutionContext.h" @@ -128,7 +129,6 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor:: EnsureStillAliveScope argument1 = callFrame->argument(1); auto options = WorkerOptions {}; - options.bun.unref = false; if (JSObject* optionsObject = JSC::jsDynamicCast(argument1.value())) { if (auto nameValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, vm.propertyNames->name)) { @@ -139,12 +139,19 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor:: } if (auto miniModeValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "smol"_s))) { - options.bun.mini = miniModeValue.toBoolean(lexicalGlobalObject); + options.mini = miniModeValue.toBoolean(lexicalGlobalObject); } + RETURN_IF_EXCEPTION(throwScope, {}); if (auto ref = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "ref"_s))) { - options.bun.unref = !ref.toBoolean(lexicalGlobalObject); + options.unref = !ref.toBoolean(lexicalGlobalObject); } + RETURN_IF_EXCEPTION(throwScope, {}); + + if (auto eval = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "eval"_s))) { + options.evalMode = eval.toBoolean(lexicalGlobalObject); + } + RETURN_IF_EXCEPTION(throwScope, {}); if (auto preloadModulesValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "preload"_s))) { if (!preloadModulesValue.isUndefinedOrNull()) { @@ -152,14 +159,14 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor:: auto str = preloadModulesValue.toWTFString(lexicalGlobalObject); RETURN_IF_EXCEPTION(throwScope, {}); if (!str.isEmpty()) { - options.bun.preloadModules.append(str); + options.preloadModules.append(str); } } else if (auto* array = jsDynamicCast(preloadModulesValue)) { std::optional> seq = convert>(*lexicalGlobalObject, array); RETURN_IF_EXCEPTION(throwScope, {}); if (seq) { - options.bun.preloadModules = WTFMove(*seq); - options.bun.preloadModules.removeAllMatching([](const String& str) { + options.preloadModules = WTFMove(*seq); + options.preloadModules.removeAllMatching([](const String& str) { return str.isEmpty(); }); } @@ -211,13 +218,17 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor:: transferredPorts = disentangleResult.releaseReturnValue(); } - options.bun.data = serialized.releaseReturnValue(); - options.bun.dataMessagePorts = WTFMove(transferredPorts); + options.data = serialized.releaseReturnValue(); + options.dataMessagePorts = WTFMove(transferredPorts); } auto* globalObject = jsCast(lexicalGlobalObject); auto envValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "env"_s)); RETURN_IF_EXCEPTION(throwScope, {}); + // for now, we don't permit SHARE_ENV, because the behavior isn't implemented + if (envValue && !(envValue.isObject() || envValue.isUndefinedOrNull())) { + return Bun::ERR::INVALID_ARG_TYPE(throwScope, globalObject, "options.env"_s, "object or one of undefined, null, or worker_threads.SHARE_ENV"_s, envValue); + } JSObject* envObject = nullptr; if (envValue && envValue.isCell()) { @@ -246,35 +257,43 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor:: env.add(key.impl()->isolatedCopy(), str); } - options.bun.env = std::make_unique>(WTFMove(env)); + options.env.emplace(WTFMove(env)); } + // needed to match the coercion behavior of `String(value)`, which returns a descriptive + // string for Symbols instead of throwing like JSValue::toString does. + // may throw an exception! + auto coerceToIsolatedString = [lexicalGlobalObject](JSValue v) -> String { + String original = v.isSymbol() ? asSymbol(v)->descriptiveString() : v.toWTFString(lexicalGlobalObject); + return original.isolatedCopy(); + }; + JSValue argvValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "argv"_s)); RETURN_IF_EXCEPTION(throwScope, {}); - if (argvValue && argvValue.isCell() && argvValue.asCell()->type() == JSC::JSType::ArrayType) { - Vector argv; - forEachInIterable(lexicalGlobalObject, argvValue, [&argv](JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalObject, JSC::JSValue nextValue) { + if (argvValue && argvValue.pureToBoolean() != TriState::False) { + Bun::V::validateArray(throwScope, globalObject, argvValue, "options.argv"_s, jsNumber(0)); + RETURN_IF_EXCEPTION(throwScope, {}); + forEachInIterable(lexicalGlobalObject, argvValue, [&options, &coerceToIsolatedString](JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalObject, JSC::JSValue nextValue) { auto scope = DECLARE_THROW_SCOPE(vm); - String str = nextValue.toWTFString(lexicalGlobalObject).isolatedCopy(); - if (UNLIKELY(scope.exception())) - return; - argv.append(str); + String str = coerceToIsolatedString(nextValue); + RETURN_IF_EXCEPTION(scope, ); + options.argv.append(str); }); - options.bun.argv = std::make_unique>(WTFMove(argv)); } JSValue execArgvValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "execArgv"_s)); RETURN_IF_EXCEPTION(throwScope, {}); - if (execArgvValue && execArgvValue.isCell() && execArgvValue.asCell()->type() == JSC::JSType::ArrayType) { + if (execArgvValue && execArgvValue.pureToBoolean() != TriState::False) { Vector execArgv; - forEachInIterable(lexicalGlobalObject, execArgvValue, [&execArgv](JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalObject, JSC::JSValue nextValue) { + Bun::V::validateArray(throwScope, globalObject, execArgvValue, "options.execArgv"_s, jsNumber(0)); + RETURN_IF_EXCEPTION(throwScope, {}); + forEachInIterable(lexicalGlobalObject, execArgvValue, [&execArgv, &coerceToIsolatedString](JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalObject, JSC::JSValue nextValue) { auto scope = DECLARE_THROW_SCOPE(vm); - String str = nextValue.toWTFString(lexicalGlobalObject).isolatedCopy(); - if (UNLIKELY(scope.exception())) - return; + String str = coerceToIsolatedString(nextValue); + RETURN_IF_EXCEPTION(scope, ); execArgv.append(str); }); - options.bun.execArgv = std::make_unique>(WTFMove(execArgv)); + options.execArgv.emplace(WTFMove(execArgv)); } } diff --git a/src/bun.js/bindings/webcore/SerializedScriptValue.cpp b/src/bun.js/bindings/webcore/SerializedScriptValue.cpp index 40852b8bc2..152b97ac59 100644 --- a/src/bun.js/bindings/webcore/SerializedScriptValue.cpp +++ b/src/bun.js/bindings/webcore/SerializedScriptValue.cpp @@ -5578,7 +5578,7 @@ ExceptionOr> SerializedScriptValue::create(JSGlobalOb } if (auto port = JSMessagePort::toWrapped(vm, transferable.get())) { if (port->isDetached()) - return Exception { DataCloneError, "MessagePort is detached"_s }; + return Exception { DataCloneError, "MessagePort in transfer list is already detached"_s }; messagePorts.append(WTFMove(port)); continue; } diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index 38bcd17573..d904af0266 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -118,12 +118,14 @@ extern "C" void* WebWorker__create( uint32_t contextId, bool miniMode, bool unrefByDefault, - StringImpl* argvPtr, - uint32_t argvLen, - StringImpl* execArgvPtr, - uint32_t execArgvLen, + bool evalMode, + StringImpl** argvPtr, + size_t argvLen, + bool defaultExecArgv, + StringImpl** execArgvPtr, + size_t execArgvLen, BunString* preloadModulesPtr, - uint32_t preloadModulesLen); + size_t preloadModulesLen); extern "C" void WebWorker__setRef( void* worker, bool ref); @@ -161,26 +163,31 @@ ExceptionOr> Worker::create(ScriptExecutionContext& context, const S BunString errorMessage = BunStringEmpty; BunString nameStr = Bun::toString(worker->m_options.name); - bool miniMode = worker->m_options.bun.mini; - bool unrefByDefault = worker->m_options.bun.unref; - - Vector* argv = worker->m_options.bun.argv.get(); - Vector* execArgv = worker->m_options.bun.execArgv.get(); - Vector* preloadModuleStrings = &worker->m_options.bun.preloadModules; + auto& preloadModuleStrings = worker->m_options.preloadModules; Vector preloadModules; - preloadModules.reserveInitialCapacity(preloadModuleStrings->size()); - for (auto& str : *preloadModuleStrings) { + preloadModules.reserveInitialCapacity(preloadModuleStrings.size()); + for (auto& str : preloadModuleStrings) { if (str.startsWith("file://"_s)) { WTF::URL urlObject = WTF::URL(str); if (!urlObject.isValid()) { return Exception { TypeError, makeString("Invalid file URL: \""_s, str, '"') }; } + // We need to replace the string inside preloadModuleStrings (this line replaces because + // we are iterating by-ref). Otherwise, the string returned by fileSystemPath() will be + // freed in this block, before it is used by Zig code. str = urlObject.fileSystemPath(); } preloadModules.append(Bun::toString(str)); } + // try to ensure the cast from String* to StringImpl** is sane + static_assert(sizeof(WTF::String) == sizeof(WTF::StringImpl*)); + std::span execArgv = worker->m_options.execArgv + .transform([](Vector& vec) -> std::span { + return { reinterpret_cast(vec.data()), vec.size() }; + }) + .value_or(std::span {}); void* impl = WebWorker__create( worker.ptr(), jsCast(context.jsGlobalObject())->bunVM(), @@ -189,18 +196,20 @@ ExceptionOr> Worker::create(ScriptExecutionContext& context, const S &errorMessage, static_cast(context.identifier()), static_cast(worker->m_clientIdentifier), - miniMode, - unrefByDefault, - argv ? reinterpret_cast(argv->data()) : nullptr, - argv ? static_cast(argv->size()) : 0, - execArgv ? reinterpret_cast(execArgv->data()) : nullptr, - execArgv ? static_cast(execArgv->size()) : 0, - preloadModules.size() ? preloadModules.data() : nullptr, - static_cast(preloadModules.size())); + worker->m_options.mini, + worker->m_options.unref, + worker->m_options.evalMode, + reinterpret_cast(worker->m_options.argv.data()), + worker->m_options.argv.size(), + !worker->m_options.execArgv.has_value(), + execArgv.data(), + execArgv.size(), + preloadModules.data(), + preloadModules.size()); // now referenced by Zig worker->ref(); - preloadModuleStrings->clear(); + preloadModuleStrings.clear(); if (!impl) { return Exception { TypeError, errorMessage.toWTFString(BunString::ZeroCopy) }; @@ -498,9 +507,9 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject) if (auto* worker = WebWorker__getParentWorker(globalObject->bunVM())) { auto& options = worker->options(); - if (worker && options.bun.data) { - auto ports = MessagePort::entanglePorts(*ScriptExecutionContext::getScriptExecutionContext(worker->clientIdentifier()), WTFMove(options.bun.dataMessagePorts)); - RefPtr serialized = WTFMove(options.bun.data); + if (worker && options.data) { + auto ports = MessagePort::entanglePorts(*ScriptExecutionContext::getScriptExecutionContext(worker->clientIdentifier()), WTFMove(options.dataMessagePorts)); + RefPtr serialized = WTFMove(options.data); JSValue deserialized = serialized->deserialize(*globalObject, globalObject, WTFMove(ports)); RETURN_IF_EXCEPTION(scope, {}); workerData = deserialized; diff --git a/src/bun.js/bindings/webcore/WorkerOptions.h b/src/bun.js/bindings/webcore/WorkerOptions.h index de6d9ba0fe..43249a3466 100644 --- a/src/bun.js/bindings/webcore/WorkerOptions.h +++ b/src/bun.js/bindings/webcore/WorkerOptions.h @@ -7,23 +7,22 @@ namespace WebCore { -struct BunOptions { +struct WorkerOptions { + String name; bool mini { false }; bool unref { false }; + // Most of our code doesn't care whether `eval` was passed, because worker_threads.ts + // automatically passes a Blob URL instead of a file path if `eval` is true. But, if `eval` is + // true, then we need to make sure that `process.argv` contains "[worker eval]" instead of the + // Blob URL. + bool evalMode { false }; RefPtr data; Vector dataMessagePorts; Vector preloadModules; - std::unique_ptr> env { nullptr }; - std::unique_ptr> argv { nullptr }; - std::unique_ptr> execArgv { nullptr }; -}; - -struct WorkerOptions { - // WorkerType type { WorkerType::Classic }; - // FetchRequestCredentials credentials { FetchRequestCredentials::SameOrigin }; - String name; - - BunOptions bun {}; + std::optional> env; // TODO(@190n) allow shared + Vector argv; + // If nullopt, inherit execArgv from the parent thread + std::optional> execArgv; }; } // namespace WebCore diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index a201648914..d390c024a3 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -910,6 +910,12 @@ pub const VirtualMachine = struct { // if one disconnect event listener should be ignored channel_ref_should_ignore_one_disconnect_event_listener: bool = false, + /// Whether this VM should be destroyed after it exits, even if it is the main thread's VM. + /// Worker VMs are always destroyed on exit, regardless of this setting. Setting this to + /// true may expose bugs that would otherwise only occur using Workers. Controlled by + /// Options.destruct_main_thread_on_exit. + destruct_main_thread_on_exit: bool, + /// A set of extensions that exist in the require.extensions map. Keys /// contain the leading '.'. Value is either a loader for built in /// functions, or an index into JSCommonJSExtensions. @@ -1229,7 +1235,6 @@ pub const VirtualMachine = struct { extern fn Bun__handleUncaughtException(*JSGlobalObject, err: JSValue, is_rejection: c_int) c_int; extern fn Bun__handleUnhandledRejection(*JSGlobalObject, reason: JSValue, promise: JSValue) c_int; - extern fn Bun__Process__exit(*JSGlobalObject, code: c_int) noreturn; export fn Bun__VirtualMachine__exitDuringUncaughtException(this: *JSC.VirtualMachine) void { this.exit_on_uncaught_exception = true; @@ -1269,12 +1274,12 @@ pub const VirtualMachine = struct { if (this.is_handling_uncaught_exception) { this.runErrorHandler(err, null); - Bun__Process__exit(globalObject, 7); + JSC.Process.exit(globalObject, 7); @panic("Uncaught exception while handling uncaught exception"); } if (this.exit_on_uncaught_exception) { this.runErrorHandler(err, null); - Bun__Process__exit(globalObject, 1); + JSC.Process.exit(globalObject, 1); @panic("made it past Bun__Process__exit"); } this.is_handling_uncaught_exception = true; @@ -1283,6 +1288,7 @@ pub const VirtualMachine = struct { if (!handled) { // TODO maybe we want a separate code path for uncaught exceptions this.unhandled_error_counter += 1; + this.exit_handler.exit_code = 1; this.onUnhandledRejection(this, globalObject, err); } return handled; @@ -1455,7 +1461,13 @@ pub const VirtualMachine = struct { } } + extern fn Zig__GlobalObject__destructOnExit(*JSGlobalObject) void; + pub fn globalExit(this: *VirtualMachine) noreturn { + if (this.destruct_main_thread_on_exit and this.is_main_thread) { + Zig__GlobalObject__destructOnExit(this.global); + this.deinit(); + } bun.Global.exit(this.exit_handler.exit_code); } @@ -1954,6 +1966,7 @@ pub const VirtualMachine = struct { .ref_strings_mutex = .{}, .standalone_module_graph = opts.graph.?, .debug_thread_id = if (Environment.allow_assert) std.Thread.getCurrentId(), + .destruct_main_thread_on_exit = opts.destruct_main_thread_on_exit, }; vm.source_mappings.init(&vm.saved_source_map_table); vm.regular_event_loop.tasks = EventLoop.Queue.init( @@ -2026,6 +2039,10 @@ pub const VirtualMachine = struct { graph: ?*bun.StandaloneModuleGraph = null, debugger: bun.CLI.Command.Debugger = .{ .unspecified = {} }, is_main_thread: bool = false, + /// Whether this VM should be destroyed after it exits, even if it is the main thread's VM. + /// Worker VMs are always destroyed on exit, regardless of this setting. Setting this to + /// true may expose bugs that would otherwise only occur using Workers. + destruct_main_thread_on_exit: bool = false, }; pub var is_smol_mode = false; @@ -2076,6 +2093,7 @@ pub const VirtualMachine = struct { .ref_strings = JSC.RefString.Map.init(allocator), .ref_strings_mutex = .{}, .debug_thread_id = if (Environment.allow_assert) std.Thread.getCurrentId(), + .destruct_main_thread_on_exit = opts.destruct_main_thread_on_exit, }; vm.source_mappings.init(&vm.saved_source_map_table); vm.regular_event_loop.tasks = EventLoop.Queue.init( @@ -2238,6 +2256,8 @@ pub const VirtualMachine = struct { .standalone_module_graph = worker.parent.standalone_module_graph, .worker = worker, .debug_thread_id = if (Environment.allow_assert) std.Thread.getCurrentId(), + // This option is irrelevant for Workers + .destruct_main_thread_on_exit = false, }; vm.source_mappings.init(&vm.saved_source_map_table); vm.regular_event_loop.tasks = EventLoop.Queue.init( @@ -2331,6 +2351,7 @@ pub const VirtualMachine = struct { .ref_strings = JSC.RefString.Map.init(allocator), .ref_strings_mutex = .{}, .debug_thread_id = if (Environment.allow_assert) std.Thread.getCurrentId(), + .destruct_main_thread_on_exit = opts.destruct_main_thread_on_exit, }; vm.source_mappings.init(&vm.saved_source_map_table); vm.regular_event_loop.tasks = EventLoop.Queue.init( diff --git a/src/bun.js/node/node_net_binding.zig b/src/bun.js/node/node_net_binding.zig index 23f1cb0be0..eb038a8511 100644 --- a/src/bun.js/node/node_net_binding.zig +++ b/src/bun.js/node/node_net_binding.zig @@ -40,10 +40,14 @@ pub fn setDefaultAutoSelectFamily(global: *JSC.JSGlobalObject) JSC.JSValue { }).setter, 1, .{}); } -// -// - -pub var autoSelectFamilyAttemptTimeoutDefault: u32 = 250; +/// This is only used to provide the getDefaultAutoSelectFamilyAttemptTimeout and +/// setDefaultAutoSelectFamilyAttemptTimeout functions, not currently read by any other code. It's +/// `threadlocal` because Node.js expects each Worker to have its own copy of this, and currently +/// it can only be accessed by accessor functions which run on each Worker's main JavaScript thread. +/// +/// If this becomes used in more places, and especially if it can be read by other threads, we may +/// need to store it as a field in the VirtualMachine instead of in a `threadlocal`. +pub threadlocal var autoSelectFamilyAttemptTimeoutDefault: u32 = 250; pub fn getDefaultAutoSelectFamilyAttemptTimeout(global: *JSC.JSGlobalObject) JSC.JSValue { return JSC.JSFunction.create(global, "getDefaultAutoSelectFamilyAttemptTimeout", (struct { diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 208a07952b..6f2e737e4f 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -1778,7 +1778,7 @@ pub const Process = struct { var args_count: usize = vm.argv.len; if (vm.worker) |worker| { - args_count = if (worker.argv) |argv| argv.len else 0; + args_count = worker.argv.len; } const args = allocator.alloc( @@ -1787,8 +1787,7 @@ pub const Process = struct { // argv also omits the script name args_count + 2, ) catch bun.outOfMemory(); - var args_list = std.ArrayListUnmanaged(bun.String){ .items = args, .capacity = args.len }; - args_list.items.len = 0; + var args_list: std.ArrayListUnmanaged(bun.String) = .initBuffer(args); if (vm.standalone_module_graph != null) { // Don't break user's code because they did process.argv.slice(2) @@ -1807,16 +1806,18 @@ pub const Process = struct { !strings.endsWithComptime(vm.main, bun.pathLiteral("/[eval]")) and !strings.endsWithComptime(vm.main, bun.pathLiteral("/[stdin]"))) { - args_list.appendAssumeCapacity(bun.String.fromUTF8(vm.main)); + if (vm.worker != null and vm.worker.?.eval_mode) { + args_list.appendAssumeCapacity(bun.String.static("[worker eval]")); + } else { + args_list.appendAssumeCapacity(bun.String.fromUTF8(vm.main)); + } } defer allocator.free(args); if (vm.worker) |worker| { - if (worker.argv) |argv| { - for (argv) |arg| { - args_list.appendAssumeCapacity(bun.String.init(arg)); - } + for (worker.argv) |arg| { + args_list.appendAssumeCapacity(bun.String.init(arg)); } } else { for (vm.argv) |arg| { @@ -1888,7 +1889,8 @@ pub const Process = struct { } } - pub fn exit(globalObject: *JSC.JSGlobalObject, code: u8) callconv(.C) void { + // TODO(@190n) this may need to be noreturn + pub fn exit(globalObject: *JSC.JSGlobalObject, code: u8) callconv(.c) void { var vm = globalObject.bunVM(); if (vm.worker) |worker| { vm.exit_handler.exit_code = code; diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index a8b6295520..dc27084e0e 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -27,7 +27,12 @@ pub const WebWorker = struct { arena: ?bun.MimallocArena = null, name: [:0]const u8 = "Worker", cpp_worker: *anyopaque, - mini: bool = false, + mini: bool, + // Most of our code doesn't care whether `eval` was passed, because worker_threads.ts + // automatically passes a Blob URL instead of a file path if `eval` is true. But, if `eval` is + // true, then we need to make sure that `process.argv` contains "[worker eval]" instead of the + // Blob URL. + eval_mode: bool, /// `user_keep_alive` is the state of the user's .ref()/.unref() calls /// if false, then the parent poll will always be unref, otherwise the worker's event loop will keep the poll alive. @@ -35,7 +40,8 @@ pub const WebWorker = struct { worker_event_loop_running: bool = true, parent_poll_ref: Async.KeepAlive = .{}, - argv: ?[]const WTFStringImpl, + // kept alive by C++ Worker object + argv: []const WTFStringImpl, execArgv: ?[]const WTFStringImpl, pub const Status = enum(u8) { @@ -178,12 +184,14 @@ pub const WebWorker = struct { this_context_id: u32, mini: bool, default_unref: bool, + eval_mode: bool, argv_ptr: ?[*]WTFStringImpl, - argv_len: u32, + argv_len: usize, + inherit_execArgv: bool, execArgv_ptr: ?[*]WTFStringImpl, - execArgv_len: u32, + execArgv_len: usize, preload_modules_ptr: ?[*]bun.String, - preload_modules_len: u32, + preload_modules_len: usize, ) callconv(.C) ?*WebWorker { JSC.markBinding(@src()); log("[{d}] WebWorker.create", .{this_context_id}); @@ -195,10 +203,7 @@ pub const WebWorker = struct { defer parent.transpiler.setLog(prev_log); defer temp_log.deinit(); - const preload_modules = if (preload_modules_ptr) |ptr| - ptr[0..preload_modules_len] - else - &.{}; + const preload_modules = if (preload_modules_ptr) |ptr| ptr[0..preload_modules_len] else &.{}; const path = resolveEntryPointSpecifier(parent, spec_slice.slice(), error_message, &temp_log) orelse { return null; @@ -228,6 +233,7 @@ pub const WebWorker = struct { .parent_context_id = parent_context_id, .execution_context_id = this_context_id, .mini = mini, + .eval_mode = eval_mode, .specifier = bun.default_allocator.dupe(u8, path) catch bun.outOfMemory(), .store_fd = parent.transpiler.resolver.store_fd, .name = brk: { @@ -238,8 +244,8 @@ pub const WebWorker = struct { }, .user_keep_alive = !default_unref, .worker_event_loop_running = true, - .argv = if (argv_ptr) |ptr| ptr[0..argv_len] else null, - .execArgv = if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else null, + .argv = if (argv_ptr) |ptr| ptr[0..argv_len] else &.{}, + .execArgv = if (inherit_execArgv) null else (if (execArgv_ptr) |ptr| ptr[0..execArgv_len] else &.{}), .preloads = preloads.items, }; diff --git a/src/bun_js.zig b/src/bun_js.zig index cc8c570148..d1d40c3a68 100644 --- a/src/bun_js.zig +++ b/src/bun_js.zig @@ -68,6 +68,7 @@ pub const Run = struct { .args = ctx.args, .graph = graph_ptr, .is_main_thread = true, + .destruct_main_thread_on_exit = bun.getRuntimeFeatureFlag("BUN_DESTRUCT_VM_ON_EXIT"), }), .arena = arena, .ctx = ctx, @@ -205,6 +206,7 @@ pub const Run = struct { .debugger = ctx.runtime_options.debugger, .dns_result_order = DNSResolver.Order.fromStringOrDie(ctx.runtime_options.dns_result_order), .is_main_thread = true, + .destruct_main_thread_on_exit = bun.getRuntimeFeatureFlag("BUN_DESTRUCT_VM_ON_EXIT"), }, ), .arena = arena, @@ -307,6 +309,8 @@ pub const Run = struct { this.entry_path = vm.transpiler.fs.top_level_dir; } + var printed_sourcemap_warning_and_version = false; + if (vm.loadEntryPoint(this.entry_path)) |promise| { if (promise.status(vm.global.vm()) == .rejected) { const handled = vm.uncaughtException(vm.global, promise.result(vm.global.vm()), true); @@ -320,6 +324,7 @@ pub const Run = struct { vm.onExit(); if (run.any_unhandled) { + printed_sourcemap_warning_and_version = true; bun.JSC.SavedSourceMap.MissingSourceMapNoteInfo.print(); Output.prettyErrorln( @@ -350,6 +355,7 @@ pub const Run = struct { vm.exit_handler.exit_code = 1; vm.onExit(); if (run.any_unhandled) { + printed_sourcemap_warning_and_version = true; bun.JSC.SavedSourceMap.MissingSourceMapNoteInfo.print(); Output.prettyErrorln( @@ -437,7 +443,7 @@ pub const Run = struct { vm.global.handleRejectedPromises(); vm.onExit(); - if (this.any_unhandled and this.vm.exit_handler.exit_code == 0) { + if (this.any_unhandled and !printed_sourcemap_warning_and_version) { this.vm.exit_handler.exit_code = 1; bun.JSC.SavedSourceMap.MissingSourceMapNoteInfo.print(); diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index 0a715c9242..959f846f3f 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -1268,6 +1268,7 @@ pub const TestCommand = struct { .smol = ctx.runtime_options.smol, .debugger = ctx.runtime_options.debugger, .is_main_thread = true, + .destruct_main_thread_on_exit = bun.getRuntimeFeatureFlag("BUN_DESTRUCT_VM_ON_EXIT"), }, ); vm.argv = ctx.passthrough; @@ -1583,6 +1584,8 @@ pub const TestCommand = struct { Global.exit(1); } else if (reporter.jest.unhandled_errors_between_tests > 0) { Global.exit(reporter.jest.unhandled_errors_between_tests); + } else { + vm.runWithAPILock(JSC.VirtualMachine, vm, JSC.VirtualMachine.globalExit); } } diff --git a/src/js/node/fs.promises.ts b/src/js/node/fs.promises.ts index f987507fa7..6910a28852 100644 --- a/src/js/node/fs.promises.ts +++ b/src/js/node/fs.promises.ts @@ -161,7 +161,7 @@ const exports = { mkdtemp: asyncWrap(fs.mkdtemp, "mkdtemp"), statfs: asyncWrap(fs.statfs, "statfs"), open: async (path, flags = "r", mode = 0o666) => { - return new FileHandle(await fs.open(path, flags, mode), flags); + return new private_symbols.FileHandle(await fs.open(path, flags, mode), flags); }, read: asyncWrap(fs.read, "read"), write: asyncWrap(fs.write, "write"), @@ -252,7 +252,7 @@ function asyncWrap(fn: any, name: string) { // Partially taken from https://github.com/nodejs/node/blob/c25878d370/lib/internal/fs/promises.js#L148 // These functions await the result so that errors propagate correctly with // async stack traces and so that the ref counting is correct. - var FileHandle = (private_symbols.FileHandle = class FileHandle extends EventEmitter { + class FileHandle extends EventEmitter { constructor(fd, flag) { super(); this[kFd] = fd ? fd : -1; @@ -274,6 +274,8 @@ function asyncWrap(fn: any, name: string) { [kFlag]; [kClosePromise]; [kRefs]; + // needs to exist for https://github.com/nodejs/node/blob/8641d941893/test/parallel/test-worker-message-port-transfer-fake-js-transferable.js to pass + [Symbol("messaging_transfer_symbol")]() {} async appendFile(data, options) { const fd = this[kFd]; @@ -597,7 +599,8 @@ function asyncWrap(fn: any, name: string) { this.close().$then(this[kCloseResolve], this[kCloseReject]); } } - }); + } + private_symbols.FileHandle = FileHandle; } function throwEBADFIfNecessary(fn: string, fd) { diff --git a/src/js/node/worker_threads.ts b/src/js/node/worker_threads.ts index b7287c585b..59cb63c677 100644 --- a/src/js/node/worker_threads.ts +++ b/src/js/node/worker_threads.ts @@ -5,6 +5,7 @@ type WebWorker = InstanceType; const EventEmitter = require("node:events"); const { throwNotImplemented, warnNotImplementedOnce } = require("internal/shared"); +const { validateObject, validateBoolean } = require("internal/validators"); const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis; const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV"); @@ -14,6 +15,10 @@ const { 0: _workerData, 1: _threadId, 2: _receiveMessageOnPort } = $cpp("Worker. type NodeWorkerOptions = import("node:worker_threads").WorkerOptions; +// Used to ensure that Blobs created to hold the source code for `eval: true` Workers get cleaned up +// after their Worker exits +let urlRevokeRegistry: FinalizationRegistry | undefined = undefined; + function injectFakeEmitter(Class) { function messageEventHandler(event: MessageEvent) { return event.data; @@ -127,7 +132,7 @@ function fakeParentPort() { const postMessage = $newCppFunction("ZigGlobalObject.cpp", "jsFunctionPostMessage", 1); Object.defineProperty(fake, "postMessage", { value(...args: [any, any]) { - return postMessage(...args); + return postMessage.$apply(null, args); }, }); @@ -205,6 +210,7 @@ class Worker extends EventEmitter { // either is the exit code if exited, a promise resolving to the exit code, or undefined if we haven't sent .terminate() yet #onExitPromise: Promise | number | undefined = undefined; #urlToRevoke = ""; + #isRunning = false; constructor(filename: string, options: NodeWorkerOptions = {}) { super(); @@ -217,6 +223,8 @@ class Worker extends EventEmitter { const builtinsGeneratorHatesEval = "ev" + "a" + "l"[0]; if (options && builtinsGeneratorHatesEval in options) { if (options[builtinsGeneratorHatesEval]) { + // TODO: consider doing this step in native code and letting the Blob be cleaned up by the + // C++ Worker object's destructor const blob = new Blob([filename], { type: "" }); this.#urlToRevoke = filename = URL.createObjectURL(blob); } else { @@ -224,7 +232,6 @@ class Worker extends EventEmitter { // we convert the code to a blob, it will succeed. this.#urlToRevoke = filename; } - delete options[builtinsGeneratorHatesEval]; } try { this.#worker = new WebWorker(filename, options); @@ -241,10 +248,12 @@ class Worker extends EventEmitter { this.#worker.addEventListener("open", this.#onOpen.bind(this), { once: true }); if (this.#urlToRevoke) { - const url = this.#urlToRevoke; - new FinalizationRegistry(url => { - URL.revokeObjectURL(url); - }).register(this.#worker, url); + if (!urlRevokeRegistry) { + urlRevokeRegistry = new FinalizationRegistry(url => { + URL.revokeObjectURL(url); + }); + } + urlRevokeRegistry.register(this.#worker, this.#urlToRevoke); } } @@ -288,7 +297,17 @@ class Worker extends EventEmitter { }); } - terminate() { + terminate(callback: unknown) { + this.#isRunning = false; + if (typeof callback === "function") { + process.emitWarning( + "Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.", + "DeprecationWarning", + "DEP0132", + ); + this.#worker.addEventListener("close", event => callback(null, event.code), { once: true }); + } + const onExitPromise = this.#onExitPromise; if (onExitPromise) { return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise); @@ -308,15 +327,17 @@ class Worker extends EventEmitter { } postMessage(...args: [any, any]) { - return this.#worker.postMessage(...args); + return this.#worker.postMessage.$apply(this.#worker, args); } #onClose(e) { + this.#isRunning = false; this.#onExitPromise = e.code; this.emit("exit", e.code); } #onError(event: ErrorEvent) { + this.#isRunning = false; let error = event?.error; if (!error) { error = new Error(event.message, { cause: event }); @@ -339,10 +360,22 @@ class Worker extends EventEmitter { } #onOpen() { + this.#isRunning = true; this.emit("online"); } - async getHeapSnapshot() { + getHeapSnapshot(options: any) { + if (options !== undefined) { + // These errors must be thrown synchronously. + validateObject(options, "options"); + validateBoolean(options.exposeInternals, "options.exposeInternals"); + validateBoolean(options.exposeNumericValues, "options.exposeNumericValues"); + } + if (!this.#isRunning) { + const err = new Error("Worker instance not running"); + err.code = "ERR_WORKER_NOT_RUNNING"; + return Promise.$reject(err); + } throwNotImplemented("worker_threads.Worker.getHeapSnapshot"); } } diff --git a/src/shell/interpreter.zig b/src/shell/interpreter.zig index 90baae6466..bf8bf42f18 100644 --- a/src/shell/interpreter.zig +++ b/src/shell/interpreter.zig @@ -2045,10 +2045,8 @@ pub const Interpreter = struct { } if (vm.worker) |worker| { - if (worker.argv) |argv| { - if (int >= argv.len) return ""; - return this.base.interpreter.getVmArgsUtf8(argv, int); - } + if (int >= worker.argv.len) return ""; + return this.base.interpreter.getVmArgsUtf8(worker.argv, int); } const argv = vm.argv; if (int >= argv.len) return ""; diff --git a/test/js/node/test/parallel/test-worker-abort-on-uncaught-exception.js b/test/js/node/test/parallel/test-worker-abort-on-uncaught-exception.js new file mode 100644 index 0000000000..7518d37552 --- /dev/null +++ b/test/js/node/test/parallel/test-worker-abort-on-uncaught-exception.js @@ -0,0 +1,12 @@ +// Flags: --abort-on-uncaught-exception +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker } = require('worker_threads'); + +// Tests that --abort-on-uncaught-exception does not apply to +// Workers. + +const w = new Worker('throw new Error()', { eval: true }); +w.on('error', common.mustCall()); +w.on('exit', common.mustCall((code) => assert.strictEqual(code, 1))); diff --git a/test/js/node/test/parallel/test-worker-console-listeners.js b/test/js/node/test/parallel/test-worker-console-listeners.js new file mode 100644 index 0000000000..9f0d6e4824 --- /dev/null +++ b/test/js/node/test/parallel/test-worker-console-listeners.js @@ -0,0 +1,16 @@ +'use strict'; +const common = require('../common'); +const { Worker, isMainThread } = require('worker_threads'); +const EventEmitter = require('events'); + +if (isMainThread) { + process.on('warning', common.mustNotCall('unexpected warning')); + + for (let i = 0; i < EventEmitter.defaultMaxListeners; ++i) { + const worker = new Worker(__filename); + + worker.on('exit', common.mustCall(() => { + console.log('a'); // This console.log() is part of the test. + })); + } +} diff --git a/test/js/node/test/parallel/test-worker-memory.js b/test/js/node/test/parallel/test-worker-memory.js new file mode 100644 index 0000000000..8c38409a26 --- /dev/null +++ b/test/js/node/test/parallel/test-worker-memory.js @@ -0,0 +1,51 @@ +'use strict'; +const common = require('../common'); +if (common.isIBMi) + common.skip('On IBMi, the rss memory always returns zero'); + +const assert = require('assert'); +const util = require('util'); +const { Worker } = require('worker_threads'); + +let numWorkers = +process.env.JOBS || require('os').availableParallelism(); +if (numWorkers > 20) { + // Cap the number of workers at 20 (as an even divisor of 60 used as + // the total number of workers started) otherwise the test fails on + // machines with high core counts. + numWorkers = 20; +} + +// Verify that a Worker's memory isn't kept in memory after the thread finishes. + +function run(n, done) { + console.log(`run() called with n=${n} (numWorkers=${numWorkers})`); + if (n <= 0) + return done(); + const worker = new Worker( + 'require(\'worker_threads\').parentPort.postMessage(2 + 2)', + { eval: true }); + worker.on('message', common.mustCall((value) => { + assert.strictEqual(value, 4); + })); + worker.on('exit', common.mustCall(() => { + run(n - 1, done); + })); +} + +const startStats = process.memoryUsage(); +let finished = 0; +for (let i = 0; i < numWorkers; ++i) { + run(60 / numWorkers, () => { + console.log(`done() called (finished=${finished})`); + if (++finished === numWorkers) { + const finishStats = process.memoryUsage(); + // A typical value for this ratio would be ~1.15. + // 5 as a upper limit is generous, but the main point is that we + // don't have the memory of 50 Isolates/Node.js environments just lying + // around somewhere. + assert.ok(finishStats.rss / startStats.rss < 5, + 'Unexpected memory overhead: ' + + util.inspect([startStats, finishStats])); + } + }); +} diff --git a/test/js/node/test/parallel/test-worker-message-channel-sharedarraybuffer.js b/test/js/node/test/parallel/test-worker-message-channel-sharedarraybuffer.js index 220aa978b1..6ee577d447 100644 --- a/test/js/node/test/parallel/test-worker-message-channel-sharedarraybuffer.js +++ b/test/js/node/test/parallel/test-worker-message-channel-sharedarraybuffer.js @@ -19,7 +19,7 @@ const { Worker } = require('worker_threads'); `, { eval: true }); w.on('message', common.mustCall(() => { assert.strictEqual(local.toString(), 'Hello world!'); - global.gc(); + globalThis.gc(); w.terminate(); })); w.postMessage({ sharedArrayBuffer }); diff --git a/test/js/node/test/parallel/test-worker-process-argv.js b/test/js/node/test/parallel/test-worker-process-argv.js new file mode 100644 index 0000000000..0e541c453e --- /dev/null +++ b/test/js/node/test/parallel/test-worker-process-argv.js @@ -0,0 +1,49 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker, isMainThread, workerData } = require('worker_threads'); + +if (isMainThread) { + assert.throws(() => { + new Worker(__filename, { argv: 'foo' }); + }, { + code: 'ERR_INVALID_ARG_TYPE' + }); + + [ + new Worker(__filename, { + argv: [null, 'foo', 123, Symbol('bar')], + // Asserts only if the worker is started by the test. + workerData: 'assert-argv' + }), + new Worker(` + const assert = require('assert'); + assert.deepStrictEqual( + process.argv, + [process.execPath, '[worker eval]'] + ); + `, { + eval: true + }), + new Worker(` + const assert = require('assert'); + assert.deepStrictEqual( + process.argv, + [process.execPath, '[worker eval]', 'null', 'foo', '123', + String(Symbol('bar'))] + ); + `, { + argv: [null, 'foo', 123, Symbol('bar')], + eval: true + }), + ].forEach((worker) => { + worker.on('exit', common.mustCall((code) => { + assert.strictEqual(code, 0); + })); + }); +} else if (workerData === 'assert-argv') { + assert.deepStrictEqual( + process.argv, + [process.execPath, __filename, 'null', 'foo', '123', String(Symbol('bar'))] + ); +} diff --git a/test/js/node/test/parallel/test-worker-terminate-timers.js b/test/js/node/test/parallel/test-worker-terminate-timers.js index defaadf9fe..62360a6cdb 100644 --- a/test/js/node/test/parallel/test-worker-terminate-timers.js +++ b/test/js/node/test/parallel/test-worker-terminate-timers.js @@ -8,7 +8,7 @@ for (const fn of ['setTimeout', 'setImmediate', 'setInterval']) { const worker = new Worker(` const { parentPort } = require('worker_threads'); ${fn}(() => { - parentPort.postMessage({}); + require('worker_threads').parentPort.postMessage({}); while (true); });`, { eval: true }); diff --git a/test/js/node/test/parallel/test-worker-workerdata-sharedarraybuffer.js b/test/js/node/test/parallel/test-worker-workerdata-sharedarraybuffer.js index 4e3d508ac9..4f1b332461 100644 --- a/test/js/node/test/parallel/test-worker-workerdata-sharedarraybuffer.js +++ b/test/js/node/test/parallel/test-worker-workerdata-sharedarraybuffer.js @@ -23,7 +23,7 @@ const { Worker } = require('worker_threads'); }); w.on('message', common.mustCall(() => { assert.strictEqual(local.toString(), 'Hello world!'); - global.gc(); + globalThis.gc(); w.terminate(); })); w.postMessage({}); diff --git a/test/js/node/worker_threads/eval-source-leak-fixture.js b/test/js/node/worker_threads/eval-source-leak-fixture.js new file mode 100644 index 0000000000..63758a66a8 --- /dev/null +++ b/test/js/node/worker_threads/eval-source-leak-fixture.js @@ -0,0 +1,40 @@ +// Create a worker with extremely large source code which completes instantly and the `eval` option +// set to true. Ensure that the Blob created to hold the source code is not kept in memory after the +// worker exits. +const { Worker } = require("node:worker_threads"); + +const eachSizeMiB = 100; +const iterations = 5; + +function test() { + const code = " ".repeat(eachSizeMiB * 1024 * 1024); + return new Promise((resolve, reject) => { + const worker = new Worker(code, { eval: true }); + worker.on("exit", () => resolve()); + worker.on("error", e => reject(e)); + }); +} + +async function reallyGC() { + for (let i = 0; i < 3; i++) { + await Bun.sleep(5); + Bun.gc(true); + } +} + +// warmup +await test(); +await reallyGC(); + +const before = process.memoryUsage.rss(); +for (let i = 0; i < iterations; i++) { + await test(); + await reallyGC(); +} +const after = process.memoryUsage.rss(); +// The bug is that the source code passed to `new Worker` would never be freed. +// If this bug is present, then the memory growth likely won't be much more than the total amount +// of source code, but it's impossible for the memory growth to be less than the source code size. +// On macOS before fixing this bug, deltaMiB was around 503. +const deltaMiB = (after - before) / 1024 / 1024; +if (deltaMiB >= eachSizeMiB * iterations) throw new Error(`leaked ${deltaMiB} MiB`); diff --git a/test/js/node/worker_threads/fixture-execargv.js b/test/js/node/worker_threads/fixture-execargv.js new file mode 100644 index 0000000000..907e79b2c1 --- /dev/null +++ b/test/js/node/worker_threads/fixture-execargv.js @@ -0,0 +1,13 @@ +import assert from "node:assert"; +import { Worker } from "node:worker_threads"; + +// parent thread needs to have nonempty execArgv, otherwise the test is faulty +assert(process.execArgv.length > 0); + +const execArgvToPass = JSON.parse(process.argv[2]); +new Worker("console.log(JSON.stringify(process.execArgv));", { eval: true, execArgv: execArgvToPass }).on( + "error", + e => { + throw e; + }, +); diff --git a/test/js/node/worker_threads/worker_threads.test.ts b/test/js/node/worker_threads/worker_threads.test.ts index 38857a83fd..4e36578c46 100644 --- a/test/js/node/worker_threads/worker_threads.test.ts +++ b/test/js/node/worker_threads/worker_threads.test.ts @@ -1,3 +1,4 @@ +import { bunEnv, bunExe } from "harness"; import fs from "node:fs"; import { join, relative, resolve } from "node:path"; import wt, { @@ -246,3 +247,47 @@ test("support worker eval that throws", async () => { expect(result.toString()).toInclude(`error: Unexpected throw`); await worker.terminate(); }); + +describe("execArgv option", async () => { + // this needs to be a subprocess to ensure that the parent's execArgv is not empty + // otherwise we could not distinguish between the worker inheriting the parent's execArgv + // vs. the worker getting a fresh empty execArgv + async function run(execArgv: string, expected: string) { + const proc = Bun.spawn({ + // pass --smol so that the parent thread has some known, non-empty execArgv + cmd: [bunExe(), "--smol", "fixture-execargv.js", execArgv], + env: bunEnv, + cwd: __dirname, + }); + await proc.exited; + expect(proc.exitCode).toBe(0); + expect(await new Response(proc.stdout).text()).toBe(expected); + } + + it("inherits the parent's execArgv when falsy or unspecified", async () => { + await run("null", '["--smol"]\n'); + await run("0", '["--smol"]\n'); + }); + it("provides empty execArgv when passed an empty array", async () => { + // empty array should result in empty execArgv, not inherited from parent thread + await run("[]", "[]\n"); + }); + it("can specify an array of strings", async () => { + await run('["--no-warnings"]', '["--no-warnings"]\n'); + }); + // TODO(@190n) get our handling of non-string array elements in line with Node's +}); + +test("eval does not leak source code", async () => { + const proc = Bun.spawn({ + cmd: [bunExe(), "eval-source-leak-fixture.js"], + env: bunEnv, + cwd: __dirname, + stderr: "pipe", + stdout: "ignore", + }); + await proc.exited; + const errors = await new Response(proc.stderr).text(); + if (errors.length > 0) throw new Error(errors); + expect(proc.exitCode).toBe(0); +});