napi_threadsafe_function

This commit is contained in:
Jarred Sumner
2022-05-05 03:20:03 -07:00
parent b487eb7e13
commit 30ca112260
2 changed files with 254 additions and 15 deletions

View File

@@ -480,11 +480,12 @@ pub const SavedSourceMap = struct {
const uws = @import("uws");
pub const AnyTask = struct {
ctx: *anyopaque,
ctx: ?*anyopaque,
callback: fn (*anyopaque) void,
pub fn run(this: *AnyTask) void {
this.callback(this.ctx);
@setRuntimeSafety(false);
this.callback(this.ctx.?);
}
pub fn New(comptime Type: type, comptime Callback: anytype) type {

View File

@@ -2,10 +2,13 @@ const std = @import("std");
const JSC = @import("javascript_core");
const strings = @import("strings");
const bun = @import("../global.zig");
const Lock = @import("../lock.zig").Lock;
const JSValue = JSC.JSValue;
const ZigString = JSC.ZigString;
const TODO_EXCEPTION: JSC.C.ExceptionRef = null;
const Channel = @import("../sync.zig").Channel;
pub const napi_env = *JSC.JSGlobalObject;
pub const napi_ref = struct_napi_ref__;
pub const napi_handle_scope = napi_env;
@@ -634,6 +637,7 @@ pub export fn napi_get_array_length(env: napi_env, value: napi_value, result: [*
return .ok;
}
pub export fn napi_strict_equals(env: napi_env, lhs: napi_value, rhs: napi_value, result: *bool) napi_status {
// there is some nuance with NaN here i'm not sure about
result.* = lhs.isSameValue(rhs, env);
return .ok;
}
@@ -977,17 +981,17 @@ pub const napi_async_work = struct {
);
}
};
pub const struct_napi_threadsafe_function__ = opaque {};
pub const napi_threadsafe_function = ?*struct_napi_threadsafe_function__;
pub const napi_tsfn_release: c_int = 0;
pub const napi_tsfn_abort: c_int = 1;
pub const napi_threadsafe_function_release_mode = c_uint;
pub const napi_threadsafe_function = *ThreadSafeFunction;
pub const napi_threadsafe_function_release_mode = enum(c_uint) {
release = 0,
abort = 1,
};
pub const napi_tsfn_nonblocking: c_int = 0;
pub const napi_tsfn_blocking: c_int = 1;
pub const napi_threadsafe_function_call_mode = c_uint;
pub const napi_async_execute_callback = ?fn (napi_env, ?*anyopaque) callconv(.C) void;
pub const napi_async_complete_callback = ?fn (napi_env, napi_status, ?*anyopaque) callconv(.C) void;
pub const napi_threadsafe_function_call_js = ?fn (napi_env, napi_value, ?*anyopaque, ?*anyopaque) callconv(.C) void;
pub const napi_threadsafe_function_call_js = fn (napi_env, napi_value, ?*anyopaque, ?*anyopaque) callconv(.C) void;
pub const napi_node_version = extern struct {
major: u32,
minor: u32,
@@ -1141,15 +1145,249 @@ pub export fn napi_remove_env_cleanup_hook(env: napi_env, fun: ?fn (?*anyopaque)
return .ok;
}
pub const Finalizer = struct {
fun: napi_finalize,
ctx: ?*anyopaque = null,
};
// TODO: generate comptime version of this instead of runtime checking
pub const ThreadSafeFunction = struct {
/// thread-safe functions can be "referenced" and "unreferenced". A
/// "referenced" thread-safe function will cause the event loop on the thread
/// on which it is created to remain alive until the thread-safe function is
/// destroyed. In contrast, an "unreferenced" thread-safe function will not
/// prevent the event loop from exiting. The APIs napi_ref_threadsafe_function
/// and napi_unref_threadsafe_function exist for this purpose.
///
/// Neither does napi_unref_threadsafe_function mark the thread-safe
/// functions as able to be destroyed nor does napi_ref_threadsafe_function
/// prevent it from being destroyed.
ref_for_process_exit: bool = false,
owning_threads: std.AutoArrayHashMapUnmanaged(u64) = .{},
owning_thread_lock: Lock = Lock.init(),
event_loop: *JSC.VirtualMachine.EventLoop,
finalizer: ?*Finalizer = null,
javascript_function: JSValue,
finalizer_task: JSC.AnyTask = undefined,
finalizer: Finalizer = Finalizer{ .fun = null, .ctx = null },
channel: Queue,
ctx: ?*anyopaque = null,
call_js: ?napi_threadsafe_function_call_js = null,
const ThreadSafeFunctionTask = JSC.AnyTask.New(@This(), call);
pub const Queue = union(enum) {
sized: Channel(?*anyopaque, .Slice),
unsized: Channel(?*anyopaque, .Slice),
pub fn isClosed(this: *const @This()) bool {
return @atomicLoad(
bool,
switch (this) {
.sized => &this.size.is_closed,
.unsized => &this.unsized.is_closed,
},
.SeqCst,
);
}
pub fn close(this: *@This()) bool {
switch (this) {
.sized => this.size.close(),
.unsized => this.unsized.close(),
}
}
pub fn init(size: usize, allocator: std.mem.Allocator) @This() {
switch (size) {
0 => {
return .{
.unsized = Channel(?*anyopaque, .Dynamic).init(allocator),
};
},
else => {
var slice = allocator.alloc(?*anyopaque, size) catch unreachable;
return .{
.sized = Channel(?*anyopaque, .Slice).init(slice),
};
},
}
}
pub fn writeItem(this: *@This(), value: ?*anyopaque) !void {
switch (this.*) {
.sized => try this.sized.writeItem(value),
.unsized => try this.unsized.writeItem(value),
}
}
pub fn readItem(this: *@This()) !?*anyopaque {
switch (this.*) {
.sized => try this.sized.readItem(),
.unsized => try this.unsized.readItem(),
}
}
pub fn tryWriteItem(this: *@This(), value: ?*anyopaque) !bool {
switch (this.*) {
.sized => try this.sized.tryWriteItem(value),
.unsized => try this.unsized.tryWriteItem(value),
}
}
pub fn tryReadItem(this: *@This()) !??*anyopaque {
switch (this.*) {
.sized => try this.sized.tryReadItem(),
.unsized => try this.unsized.tryReadItem(),
}
}
};
pub fn call(this: *ThreadSafeFunction) void {
var task = this.channel.tryReadItem() catch null orelse return;
if (this.call_js) |cb| {
cb(this.event_loop.global, this.javascript_function, task, this.ctx);
} else {
// TODO: wrapper that reports errors
_ = JSC.C.JSObjectCallAsFunction(
this.event_loop.global.ref(),
this.javascript_function.asObjectRef(),
JSC.JSValue.jsUndefined().asObjectRef(),
0,
null,
null,
);
}
}
pub fn enqueue(this: *ThreadSafeFunction, ctx: ?*anyopaque, block: bool) !void {
if (block) {
try this.channel.writeItem(JSC.AnyTask{ .ctx = ctx, .run = this.call });
} else {
if (!this.channel.tryWriteItem(JSC.AnyTask{ .ctx = ctx, .run = this.call })) {
return error.WouldBlock;
}
}
this.event_loop.enqueueTaskConcurrent(ThreadSafeFunction.init(this));
}
pub fn finalize(opaq: *anyopaque) void {
var this = bun.cast(*ThreadSafeFunction, opaq);
if (this.finalizer.fun) |fun| {
fun(this.finalizer.ctx);
}
JSC.C.JSValueUnprotect(this.event_loop.global.ref(), this.javascript_function.asObjectRef());
bun.default_allocator.destroy(this);
}
pub fn ref(this: *ThreadSafeFunction) void {
this.ref_for_process_exit = true;
}
pub fn unref(this: *ThreadSafeFunction) void {
this.ref_for_process_exit = false;
}
pub fn acquire(this: *ThreadSafeFunction) !void {
this.owning_thread_lock.lock();
defer this.owning_thread_lock.unlock();
if (this.channel.isClosed())
return error.Closed;
_ = this.owning_threads.getOrPut(bun.default_allocator, std.Thread.getCurrentId()) catch unreachable;
}
pub fn release(this: *ThreadSafeFunction, mode: napi_threadsafe_function_release_mode) void {
this.owning_thread_lock.lock();
defer this.owning_thread_lock.unlock();
if (!this.owning_threads.swapRemove(std.Thread.getCurrentId()))
return;
if (mode == .abort) {
this.channel.close();
}
if (this.owning_threads.count() == 0) {
this.finalizer_task = JSC.AnyTask{ .ctx = this, .callback = finalize };
this.event_loop.enqueueTaskConcurrent(&this.finalizer_task);
return;
}
}
};
pub extern fn napi_open_callback_scope(env: napi_env, resource_object: napi_value, context: napi_async_context, result: [*c]napi_callback_scope) napi_status;
pub extern fn napi_close_callback_scope(env: napi_env, scope: napi_callback_scope) napi_status;
pub extern fn napi_create_threadsafe_function(env: napi_env, func: napi_value, async_resource: napi_value, async_resource_name: napi_value, max_queue_size: usize, initial_thread_count: usize, thread_finalize_data: ?*anyopaque, thread_finalize_cb: napi_finalize, context: ?*anyopaque, call_js_cb: napi_threadsafe_function_call_js, result: [*c]napi_threadsafe_function) napi_status;
pub extern fn napi_get_threadsafe_function_context(func: napi_threadsafe_function, result: [*]*anyopaque) napi_status;
pub extern fn napi_call_threadsafe_function(func: napi_threadsafe_function, data: ?*anyopaque, is_blocking: napi_threadsafe_function_call_mode) napi_status;
pub extern fn napi_acquire_threadsafe_function(func: napi_threadsafe_function) napi_status;
pub extern fn napi_release_threadsafe_function(func: napi_threadsafe_function, mode: napi_threadsafe_function_release_mode) napi_status;
pub extern fn napi_unref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status;
pub extern fn napi_ref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status;
pub export fn napi_create_threadsafe_function(
env: napi_env,
func: napi_value,
_: napi_value,
_: napi_value,
max_queue_size: usize,
initial_thread_count: usize,
thread_finalize_data: ?*anyopaque,
thread_finalize_cb: napi_finalize,
context: ?*anyopaque,
call_js_cb: ?napi_threadsafe_function_call_js,
result: *napi_threadsafe_function,
) napi_status {
// TODO: don't do this
// just have a GC hook for this...
JSC.C.JSValueProtect(env.ref(), func.asObjectRef());
var function = bun.default_allocator.create(ThreadSafeFunction) catch return .generic_failure;
function.* = .{
.event_loop = JSC.VirtualMachine.vm.eventLoop(),
.javascript_function = func,
.call_js = call_js_cb,
.ctx = context,
.queue = ThreadSafeFunction.Queue.init(max_queue_size, bun.default_allocator),
.owning_threads = .{},
};
function.owning_threads.ensureTotalCapacity(bun.default_allocator, initial_thread_count) catch return .generic_failure;
function.finalizer = .{ .ctx = thread_finalize_data, .fun = thread_finalize_cb };
result.* = function;
return .ok;
}
pub export fn napi_get_threadsafe_function_context(func: napi_threadsafe_function, result: *?*anyopaque) napi_status {
result.* = func.ctx;
return .ok;
}
pub export fn napi_call_threadsafe_function(func: napi_threadsafe_function, data: ?*anyopaque, is_blocking: napi_threadsafe_function_call_mode) napi_status {
func.enqueue(data, is_blocking) catch |err| {
switch (err) {
error.WouldBlock => {
return napi_status.queue_full;
},
error.Closing => {
return napi_status.closing;
},
}
};
return .ok;
}
pub export fn napi_acquire_threadsafe_function(func: napi_threadsafe_function) napi_status {
func.acquire() catch return .closing;
return .ok;
}
pub export fn napi_release_threadsafe_function(func: napi_threadsafe_function, mode: napi_threadsafe_function_release_mode) napi_status {
func.release(mode);
return .ok;
}
pub export fn napi_unref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status {
std.debug.assert(func.event_loop.global == env);
func.unref();
return .ok;
}
pub export fn napi_ref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status {
std.debug.assert(func.event_loop.global == env);
func.ref();
return .ok;
}
pub export fn napi_add_async_cleanup_hook(_: napi_env, _: napi_async_cleanup_hook, _: ?*anyopaque, _: [*c]napi_async_cleanup_hook_handle) napi_status {
// TODO: