mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 18:38:55 +00:00
Keep event loop alive when refConcurrently has been called (#14068)
This commit is contained in:
@@ -1288,8 +1288,14 @@ pub const EventLoop = struct {
|
||||
_ = this.tickConcurrentWithCount();
|
||||
}
|
||||
|
||||
/// Check whether refConcurrently has been called but the change has not yet been applied to the
|
||||
/// underlying event loop's `active` counter
|
||||
pub fn hasPendingRefs(this: *const EventLoop) bool {
|
||||
return this.concurrent_ref.load(.seq_cst) > 0;
|
||||
}
|
||||
|
||||
fn updateCounts(this: *EventLoop) void {
|
||||
const delta = this.concurrent_ref.swap(0, .monotonic);
|
||||
const delta = this.concurrent_ref.swap(0, .seq_cst);
|
||||
const loop = this.virtual_machine.event_loop_handle.?;
|
||||
if (comptime Environment.isWindows) {
|
||||
if (delta > 0) {
|
||||
@@ -1642,14 +1648,13 @@ pub const EventLoop = struct {
|
||||
}
|
||||
|
||||
pub fn refConcurrently(this: *EventLoop) void {
|
||||
// TODO maybe this should be AcquireRelease
|
||||
_ = this.concurrent_ref.fetchAdd(1, .monotonic);
|
||||
_ = this.concurrent_ref.fetchAdd(1, .seq_cst);
|
||||
this.wakeup();
|
||||
}
|
||||
|
||||
pub fn unrefConcurrently(this: *EventLoop) void {
|
||||
// TODO maybe this should be AcquireRelease
|
||||
_ = this.concurrent_ref.fetchSub(1, .monotonic);
|
||||
_ = this.concurrent_ref.fetchSub(1, .seq_cst);
|
||||
this.wakeup();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -935,8 +935,12 @@ pub const VirtualMachine = struct {
|
||||
|
||||
pub fn isEventLoopAlive(vm: *const VirtualMachine) bool {
|
||||
return vm.unhandled_error_counter == 0 and
|
||||
(vm.event_loop_handle.?.isActive() or
|
||||
vm.active_tasks + vm.event_loop.tasks.count + vm.event_loop.immediate_tasks.count + vm.event_loop.next_immediate_tasks.count > 0);
|
||||
(@intFromBool(vm.event_loop_handle.?.isActive()) +
|
||||
vm.active_tasks +
|
||||
vm.event_loop.tasks.count +
|
||||
vm.event_loop.immediate_tasks.count +
|
||||
vm.event_loop.next_immediate_tasks.count +
|
||||
@intFromBool(vm.event_loop.hasPendingRefs()) > 0);
|
||||
}
|
||||
|
||||
pub fn wakeup(this: *VirtualMachine) void {
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
napi_value fail(napi_env env, const char *msg) {
|
||||
napi_value result;
|
||||
@@ -374,6 +375,8 @@ struct AsyncWorkData {
|
||||
napi_deferred deferred;
|
||||
napi_async_work work;
|
||||
|
||||
AsyncWorkData() : result(0), deferred(nullptr), work(nullptr) {}
|
||||
|
||||
static void execute(napi_env env, void *data) {
|
||||
AsyncWorkData *async_work_data = reinterpret_cast<AsyncWorkData *>(data);
|
||||
async_work_data->result = 42;
|
||||
@@ -397,7 +400,8 @@ struct AsyncWorkData {
|
||||
|
||||
napi_value create_promise(const Napi::CallbackInfo &info) {
|
||||
napi_env env = info.Env();
|
||||
auto *data = new AsyncWorkData;
|
||||
auto *data = new AsyncWorkData();
|
||||
|
||||
napi_value promise;
|
||||
|
||||
assert(napi_create_promise(env, &data->deferred, &promise) == napi_ok);
|
||||
@@ -408,10 +412,92 @@ napi_value create_promise(const Napi::CallbackInfo &info) {
|
||||
assert(napi_create_async_work(env, nullptr, resource_name,
|
||||
AsyncWorkData::execute, AsyncWorkData::complete,
|
||||
data, &data->work) == napi_ok);
|
||||
|
||||
assert(napi_queue_async_work(env, data->work) == napi_ok);
|
||||
return promise;
|
||||
}
|
||||
|
||||
struct ThreadsafeFunctionData {
|
||||
napi_threadsafe_function tsfn;
|
||||
napi_deferred deferred;
|
||||
|
||||
static void thread_entry(ThreadsafeFunctionData *data) {
|
||||
using namespace std::chrono_literals;
|
||||
std::this_thread::sleep_for(10ms);
|
||||
// nonblocking means it will return an error if the threadsafe function's
|
||||
// queue is full, which it should never do because we only use it once and
|
||||
// we init with a capacity of 1
|
||||
assert(napi_call_threadsafe_function(data->tsfn, nullptr,
|
||||
napi_tsfn_nonblocking) == napi_ok);
|
||||
}
|
||||
|
||||
static void tsfn_finalize_callback(napi_env env, void *finalize_data,
|
||||
void *finalize_hint) {
|
||||
printf("tsfn_finalize_callback\n");
|
||||
ThreadsafeFunctionData *data =
|
||||
reinterpret_cast<ThreadsafeFunctionData *>(finalize_data);
|
||||
delete data;
|
||||
}
|
||||
|
||||
static void tsfn_callback(napi_env env, napi_value js_callback, void *context,
|
||||
void *data) {
|
||||
// context == ThreadsafeFunctionData pointer
|
||||
// data == nullptr
|
||||
printf("tsfn_callback\n");
|
||||
ThreadsafeFunctionData *tsfn_data =
|
||||
reinterpret_cast<ThreadsafeFunctionData *>(context);
|
||||
|
||||
napi_value recv;
|
||||
assert(napi_get_undefined(env, &recv) == napi_ok);
|
||||
|
||||
// call our JS function with undefined for this and no arguments
|
||||
napi_value js_result;
|
||||
assert(napi_call_function(env, recv, js_callback, 0, nullptr, &js_result) ==
|
||||
napi_ok);
|
||||
|
||||
// resolve the promise with the return value of the JS function
|
||||
assert(napi_resolve_deferred(env, tsfn_data->deferred, js_result) ==
|
||||
napi_ok);
|
||||
|
||||
// clean up the threadsafe function
|
||||
assert(napi_release_threadsafe_function(tsfn_data->tsfn, napi_tsfn_abort) ==
|
||||
napi_ok);
|
||||
}
|
||||
};
|
||||
|
||||
napi_value
|
||||
create_promise_with_threadsafe_function(const Napi::CallbackInfo &info) {
|
||||
napi_env env = info.Env();
|
||||
ThreadsafeFunctionData *tsfn_data = new ThreadsafeFunctionData;
|
||||
|
||||
napi_value async_resource_name;
|
||||
assert(napi_create_string_utf8(
|
||||
env, "napitests::create_promise_with_threadsafe_function",
|
||||
NAPI_AUTO_LENGTH, &async_resource_name) == napi_ok);
|
||||
|
||||
// this is called directly, without the GC callback, so argument 0 is a JS
|
||||
// callback used to resolve the promise
|
||||
assert(napi_create_threadsafe_function(
|
||||
env, info[0], nullptr, async_resource_name,
|
||||
// max_queue_size, initial_thread_count
|
||||
1, 1,
|
||||
// thread_finalize_data, thread_finalize_cb
|
||||
tsfn_data, ThreadsafeFunctionData::tsfn_finalize_callback,
|
||||
// context
|
||||
tsfn_data, ThreadsafeFunctionData::tsfn_callback,
|
||||
&tsfn_data->tsfn) == napi_ok);
|
||||
// create a promise we can return to JS and put the deferred counterpart in
|
||||
// tsfn_data
|
||||
napi_value promise;
|
||||
assert(napi_create_promise(env, &tsfn_data->deferred, &promise) == napi_ok);
|
||||
|
||||
// spawn and release std::thread
|
||||
std::thread secondary_thread(ThreadsafeFunctionData::thread_entry, tsfn_data);
|
||||
secondary_thread.detach();
|
||||
// return the promise to javascript
|
||||
return promise;
|
||||
}
|
||||
|
||||
napi_value test_napi_ref(const Napi::CallbackInfo &info) {
|
||||
napi_env env = info.Env();
|
||||
|
||||
@@ -511,6 +597,9 @@ Napi::Object InitAll(Napi::Env env, Napi::Object exports1) {
|
||||
exports.Set("get_class_with_constructor",
|
||||
Napi::Function::New(env, get_class_with_constructor));
|
||||
exports.Set("create_promise", Napi::Function::New(env, create_promise));
|
||||
exports.Set(
|
||||
"create_promise_with_threadsafe_function",
|
||||
Napi::Function::New(env, create_promise_with_threadsafe_function));
|
||||
exports.Set("test_napi_ref", Napi::Function::New(env, test_napi_ref));
|
||||
exports.Set("create_ref_with_finalizer",
|
||||
Napi::Function::New(env, create_ref_with_finalizer));
|
||||
|
||||
@@ -20,6 +20,7 @@ const result = fn.apply(null, [
|
||||
} else if (global.gc) {
|
||||
global.gc();
|
||||
}
|
||||
console.log("GC did run");
|
||||
},
|
||||
...eval(process.argv[3] ?? "[]"),
|
||||
]);
|
||||
|
||||
@@ -24,4 +24,11 @@ nativeTests.test_napi_handle_scope_finalizer = async () => {
|
||||
}
|
||||
};
|
||||
|
||||
nativeTests.test_promise_with_threadsafe_function = async () => {
|
||||
await new Promise(resolve => setTimeout(resolve, 1));
|
||||
// create_promise_with_threadsafe_function returns a promise that calls our function from another
|
||||
// thread (via napi_threadsafe_function) and resolves with its return value
|
||||
return await nativeTests.create_promise_with_threadsafe_function(() => 1234);
|
||||
};
|
||||
|
||||
module.exports = nativeTests;
|
||||
|
||||
@@ -60,11 +60,6 @@ describe("napi", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("threadsafe function does not hang on finalize", () => {
|
||||
const result = checkSameOutput("test_napi_threadsafe_function_does_not_hang_after_finalize", []);
|
||||
expect(result).toBe("success!");
|
||||
});
|
||||
|
||||
it("#1288", async () => {
|
||||
const result = checkSameOutput("self", []);
|
||||
expect(result).toBe("hello world!");
|
||||
@@ -125,6 +120,17 @@ describe("napi", () => {
|
||||
checkSameOutput("test_napi_handle_scope_finalizer", []);
|
||||
});
|
||||
});
|
||||
|
||||
describe("napi_threadsafe_function", () => {
|
||||
it("keeps the event loop alive without async_work", () => {
|
||||
checkSameOutput("test_promise_with_threadsafe_function", []);
|
||||
});
|
||||
|
||||
it("does not hang on finalize", () => {
|
||||
const result = checkSameOutput("test_napi_threadsafe_function_does_not_hang_after_finalize", []);
|
||||
expect(result).toBe("success!");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
function checkSameOutput(test: string, args: any[] | string) {
|
||||
|
||||
Reference in New Issue
Block a user