diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index ca50477234..7a0f7a793e 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -1288,8 +1288,14 @@ pub const EventLoop = struct { _ = this.tickConcurrentWithCount(); } + /// Check whether refConcurrently has been called but the change has not yet been applied to the + /// underlying event loop's `active` counter + pub fn hasPendingRefs(this: *const EventLoop) bool { + return this.concurrent_ref.load(.seq_cst) > 0; + } + fn updateCounts(this: *EventLoop) void { - const delta = this.concurrent_ref.swap(0, .monotonic); + const delta = this.concurrent_ref.swap(0, .seq_cst); const loop = this.virtual_machine.event_loop_handle.?; if (comptime Environment.isWindows) { if (delta > 0) { @@ -1642,14 +1648,13 @@ pub const EventLoop = struct { } pub fn refConcurrently(this: *EventLoop) void { - // TODO maybe this should be AcquireRelease - _ = this.concurrent_ref.fetchAdd(1, .monotonic); + _ = this.concurrent_ref.fetchAdd(1, .seq_cst); this.wakeup(); } pub fn unrefConcurrently(this: *EventLoop) void { // TODO maybe this should be AcquireRelease - _ = this.concurrent_ref.fetchSub(1, .monotonic); + _ = this.concurrent_ref.fetchSub(1, .seq_cst); this.wakeup(); } }; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 94c07d4d24..fe9b945d4d 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -935,8 +935,12 @@ pub const VirtualMachine = struct { pub fn isEventLoopAlive(vm: *const VirtualMachine) bool { return vm.unhandled_error_counter == 0 and - (vm.event_loop_handle.?.isActive() or - vm.active_tasks + vm.event_loop.tasks.count + vm.event_loop.immediate_tasks.count + vm.event_loop.next_immediate_tasks.count > 0); + (@intFromBool(vm.event_loop_handle.?.isActive()) + + vm.active_tasks + + vm.event_loop.tasks.count + + vm.event_loop.immediate_tasks.count + + vm.event_loop.next_immediate_tasks.count + + @intFromBool(vm.event_loop.hasPendingRefs()) > 0); } pub fn wakeup(this: *VirtualMachine) void { diff --git a/test/napi/napi-app/main.cpp b/test/napi/napi-app/main.cpp index f60f989748..bf197d08f8 100644 --- a/test/napi/napi-app/main.cpp +++ b/test/napi/napi-app/main.cpp @@ -8,6 +8,7 @@ #include #include #include +#include napi_value fail(napi_env env, const char *msg) { napi_value result; @@ -374,6 +375,8 @@ struct AsyncWorkData { napi_deferred deferred; napi_async_work work; + AsyncWorkData() : result(0), deferred(nullptr), work(nullptr) {} + static void execute(napi_env env, void *data) { AsyncWorkData *async_work_data = reinterpret_cast(data); async_work_data->result = 42; @@ -397,7 +400,8 @@ struct AsyncWorkData { napi_value create_promise(const Napi::CallbackInfo &info) { napi_env env = info.Env(); - auto *data = new AsyncWorkData; + auto *data = new AsyncWorkData(); + napi_value promise; assert(napi_create_promise(env, &data->deferred, &promise) == napi_ok); @@ -408,10 +412,92 @@ napi_value create_promise(const Napi::CallbackInfo &info) { assert(napi_create_async_work(env, nullptr, resource_name, AsyncWorkData::execute, AsyncWorkData::complete, data, &data->work) == napi_ok); + assert(napi_queue_async_work(env, data->work) == napi_ok); return promise; } +struct ThreadsafeFunctionData { + napi_threadsafe_function tsfn; + napi_deferred deferred; + + static void thread_entry(ThreadsafeFunctionData *data) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(10ms); + // nonblocking means it will return an error if the threadsafe function's + // queue is full, which it should never do because we only use it once and + // we init with a capacity of 1 + assert(napi_call_threadsafe_function(data->tsfn, nullptr, + napi_tsfn_nonblocking) == napi_ok); + } + + static void tsfn_finalize_callback(napi_env env, void *finalize_data, + void *finalize_hint) { + printf("tsfn_finalize_callback\n"); + ThreadsafeFunctionData *data = + reinterpret_cast(finalize_data); + delete data; + } + + static void tsfn_callback(napi_env env, napi_value js_callback, void *context, + void *data) { + // context == ThreadsafeFunctionData pointer + // data == nullptr + printf("tsfn_callback\n"); + ThreadsafeFunctionData *tsfn_data = + reinterpret_cast(context); + + napi_value recv; + assert(napi_get_undefined(env, &recv) == napi_ok); + + // call our JS function with undefined for this and no arguments + napi_value js_result; + assert(napi_call_function(env, recv, js_callback, 0, nullptr, &js_result) == + napi_ok); + + // resolve the promise with the return value of the JS function + assert(napi_resolve_deferred(env, tsfn_data->deferred, js_result) == + napi_ok); + + // clean up the threadsafe function + assert(napi_release_threadsafe_function(tsfn_data->tsfn, napi_tsfn_abort) == + napi_ok); + } +}; + +napi_value +create_promise_with_threadsafe_function(const Napi::CallbackInfo &info) { + napi_env env = info.Env(); + ThreadsafeFunctionData *tsfn_data = new ThreadsafeFunctionData; + + napi_value async_resource_name; + assert(napi_create_string_utf8( + env, "napitests::create_promise_with_threadsafe_function", + NAPI_AUTO_LENGTH, &async_resource_name) == napi_ok); + + // this is called directly, without the GC callback, so argument 0 is a JS + // callback used to resolve the promise + assert(napi_create_threadsafe_function( + env, info[0], nullptr, async_resource_name, + // max_queue_size, initial_thread_count + 1, 1, + // thread_finalize_data, thread_finalize_cb + tsfn_data, ThreadsafeFunctionData::tsfn_finalize_callback, + // context + tsfn_data, ThreadsafeFunctionData::tsfn_callback, + &tsfn_data->tsfn) == napi_ok); + // create a promise we can return to JS and put the deferred counterpart in + // tsfn_data + napi_value promise; + assert(napi_create_promise(env, &tsfn_data->deferred, &promise) == napi_ok); + + // spawn and release std::thread + std::thread secondary_thread(ThreadsafeFunctionData::thread_entry, tsfn_data); + secondary_thread.detach(); + // return the promise to javascript + return promise; +} + napi_value test_napi_ref(const Napi::CallbackInfo &info) { napi_env env = info.Env(); @@ -511,6 +597,9 @@ Napi::Object InitAll(Napi::Env env, Napi::Object exports1) { exports.Set("get_class_with_constructor", Napi::Function::New(env, get_class_with_constructor)); exports.Set("create_promise", Napi::Function::New(env, create_promise)); + exports.Set( + "create_promise_with_threadsafe_function", + Napi::Function::New(env, create_promise_with_threadsafe_function)); exports.Set("test_napi_ref", Napi::Function::New(env, test_napi_ref)); exports.Set("create_ref_with_finalizer", Napi::Function::New(env, create_ref_with_finalizer)); diff --git a/test/napi/napi-app/main.js b/test/napi/napi-app/main.js index aa5dc05706..0f8aeffa86 100644 --- a/test/napi/napi-app/main.js +++ b/test/napi/napi-app/main.js @@ -20,6 +20,7 @@ const result = fn.apply(null, [ } else if (global.gc) { global.gc(); } + console.log("GC did run"); }, ...eval(process.argv[3] ?? "[]"), ]); diff --git a/test/napi/napi-app/module.js b/test/napi/napi-app/module.js index a0353c7fce..d4295bcf76 100644 --- a/test/napi/napi-app/module.js +++ b/test/napi/napi-app/module.js @@ -24,4 +24,11 @@ nativeTests.test_napi_handle_scope_finalizer = async () => { } }; +nativeTests.test_promise_with_threadsafe_function = async () => { + await new Promise(resolve => setTimeout(resolve, 1)); + // create_promise_with_threadsafe_function returns a promise that calls our function from another + // thread (via napi_threadsafe_function) and resolves with its return value + return await nativeTests.create_promise_with_threadsafe_function(() => 1234); +}; + module.exports = nativeTests; diff --git a/test/napi/napi.test.ts b/test/napi/napi.test.ts index 2d9d9a6d19..ab8660a69c 100644 --- a/test/napi/napi.test.ts +++ b/test/napi/napi.test.ts @@ -60,11 +60,6 @@ describe("napi", () => { }); }); - it("threadsafe function does not hang on finalize", () => { - const result = checkSameOutput("test_napi_threadsafe_function_does_not_hang_after_finalize", []); - expect(result).toBe("success!"); - }); - it("#1288", async () => { const result = checkSameOutput("self", []); expect(result).toBe("hello world!"); @@ -125,6 +120,17 @@ describe("napi", () => { checkSameOutput("test_napi_handle_scope_finalizer", []); }); }); + + describe("napi_threadsafe_function", () => { + it("keeps the event loop alive without async_work", () => { + checkSameOutput("test_promise_with_threadsafe_function", []); + }); + + it("does not hang on finalize", () => { + const result = checkSameOutput("test_napi_threadsafe_function_does_not_hang_after_finalize", []); + expect(result).toBe("success!"); + }); + }); }); function checkSameOutput(test: string, args: any[] | string) {