Files
bun.sh/src/bun.js/event_loop.zig
2022-10-09 02:02:47 -07:00

800 lines
28 KiB
Zig

const std = @import("std");
const JSC = @import("javascript_core");
const JSGlobalObject = JSC.JSGlobalObject;
const VirtualMachine = JSC.VirtualMachine;
const Lock = @import("../lock.zig").Lock;
const Microtask = JSC.Microtask;
const bun = @import("../global.zig");
const Environment = bun.Environment;
const Fetch = JSC.WebCore.Fetch;
const WebCore = JSC.WebCore;
const Bun = JSC.API.Bun;
const TaggedPointerUnion = @import("../tagged_pointer.zig").TaggedPointerUnion;
const typeBaseName = @import("../meta.zig").typeBaseName;
const CopyFilePromiseTask = WebCore.Blob.Store.CopyFile.CopyFilePromiseTask;
const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask;
const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask;
const WriteFileTask = WebCore.Blob.Store.WriteFile.WriteFileTask;
const napi_async_work = JSC.napi.napi_async_work;
const FetchTasklet = Fetch.FetchTasklet;
const JSValue = JSC.JSValue;
const js = JSC.C;
pub const WorkPool = @import("../work_pool.zig").WorkPool;
pub const WorkPoolTask = @import("../work_pool.zig").Task;
const NetworkThread = @import("http").NetworkThread;
const uws = @import("uws");
pub fn ConcurrentPromiseTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *JSC.EventLoop,
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
concurrent_task: JSC.ConcurrentTask = .{},
// This is a poll because we want it to enter the uSockets loop
ref: JSC.PollRef = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.event_loop,
.ctx = value,
.allocator = allocator,
.promise = JSValue.createInternalPromise(globalThis),
.globalThis = globalThis,
};
this.promise.protect();
this.ref.ref(this.event_loop.virtual_machine);
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx);
this.onFinish();
}
pub fn runFromJS(this: *This) void {
var promise_value = this.promise;
this.ref.unref(this.event_loop.virtual_machine);
promise_value.ensureStillAlive();
promise_value.unprotect();
var promise = promise_value.asInternalPromise() orelse {
if (comptime @hasDecl(Context, "deinit")) {
@call(.{}, Context.deinit, .{this.ctx});
}
return;
};
var ctx = this.ctx;
ctx.then(promise);
}
pub fn schedule(this: *This) void {
WorkPool.schedule(&this.task);
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
this.allocator.destroy(this);
}
};
}
pub fn IOTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: NetworkThread.Task = .{ .callback = runFromThreadPool },
event_loop: *JSC.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
concurrent_task: ConcurrentTask = .{},
// This is a poll because we want it to enter the uSockets loop
ref: JSC.PollRef = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = globalThis.bunVM().eventLoop(),
.ctx = value,
.allocator = allocator,
.globalThis = globalThis,
};
this.ref.ref(this.event_loop.virtual_machine);
return this;
}
pub fn runFromThreadPool(task: *NetworkThread.Task) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx, this);
}
pub fn runFromJS(this: *This) void {
var ctx = this.ctx;
this.ref.unref(this.event_loop.virtual_machine);
ctx.then(this.globalThis);
}
pub fn schedule(this: *This) void {
this.ref.ref(this.event_loop.virtual_machine);
NetworkThread.init() catch return;
NetworkThread.global.schedule(NetworkThread.Batch.from(&this.task));
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
var allocator = this.allocator;
this.ref.unref(this.event_loop.virtual_machine);
this.* = undefined;
allocator.destroy(this);
}
};
}
pub const AnyTask = struct {
ctx: ?*anyopaque,
callback: fn (*anyopaque) void,
pub fn run(this: *AnyTask) void {
@setRuntimeSafety(false);
this.callback(this.ctx.?);
}
pub fn New(comptime Type: type, comptime Callback: anytype) type {
return struct {
pub fn init(ctx: *Type) AnyTask {
return AnyTask{
.callback = wrap,
.ctx = ctx,
};
}
pub fn wrap(this: ?*anyopaque) void {
Callback(@ptrCast(*Type, @alignCast(@alignOf(Type), this.?)));
}
};
}
};
pub const CppTask = opaque {
extern fn Bun__performTask(globalObject: *JSGlobalObject, task: *CppTask) void;
pub fn run(this: *CppTask, global: *JSGlobalObject) void {
JSC.markBinding();
Bun__performTask(global, this);
}
};
const ThreadSafeFunction = JSC.napi.ThreadSafeFunction;
const MicrotaskForDefaultGlobalObject = JSC.MicrotaskForDefaultGlobalObject;
const HotReloadTask = JSC.HotReloader.HotReloadTask;
// const PromiseTask = JSInternalPromise.Completion.PromiseTask;
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
Microtask,
MicrotaskForDefaultGlobalObject,
AsyncTransformTask,
ReadFileTask,
CopyFilePromiseTask,
WriteFileTask,
AnyTask,
napi_async_work,
ThreadSafeFunction,
CppTask,
HotReloadTask,
// PromiseTask,
// TimeoutTasklet,
});
const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
pub const ConcurrentTask = struct {
task: Task = undefined,
next: ?*ConcurrentTask = null,
pub const Queue = UnboundedQueue(ConcurrentTask, .next);
pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask {
this.* = .{
.task = Task.init(of),
.next = null,
};
return this;
}
};
const AsyncIO = @import("io");
pub const EventLoop = struct {
tasks: Queue = undefined,
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
global: *JSGlobalObject = undefined,
virtual_machine: *VirtualMachine = undefined,
waker: ?AsyncIO.Waker = null,
start_server_on_next_tick: bool = false,
defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
pending_processes_to_exit: std.AutoArrayHashMap(*JSC.Subprocess, void) = undefined,
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
pub fn tickWithCount(this: *EventLoop) u32 {
var global = this.global;
var global_vm = global.vm();
var counter: usize = 0;
while (this.tasks.readItem()) |task| {
defer counter += 1;
switch (task.tag()) {
.Microtask => {
var micro: *Microtask = task.as(Microtask);
micro.run(global);
},
.MicrotaskForDefaultGlobalObject => {
var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject);
micro.run(global);
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onDone();
fetch_task.deinit();
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
.HotReloadTask => {
var transform_task: *HotReloadTask = task.get(HotReloadTask).?;
transform_task.*.run();
transform_task.deinit();
// special case: we return
return 0;
},
@field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
},
@field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
var any: *CppTask = task.get(CppTask).?;
any.run(global);
},
else => if (Environment.allow_assert) {
bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())});
} else unreachable,
}
global_vm.releaseWeakRefs();
global_vm.drainMicrotasks();
}
if (this.tasks.count == 0) {
this.tasks.head = 0;
}
return @truncate(u32, counter);
}
pub fn tickConcurrent(this: *EventLoop) void {
_ = this.tickConcurrentWithCount();
}
pub fn tickConcurrentWithCount(this: *EventLoop) usize {
var concurrent = this.concurrent_tasks.popBatch();
const count = concurrent.count;
if (count == 0)
return 0;
var iter = concurrent.iterator();
const start_count = this.tasks.count;
if (start_count == 0) {
this.tasks.head = 0;
}
this.tasks.ensureUnusedCapacity(count) catch unreachable;
var writable = this.tasks.writableSlice(0);
while (iter.next()) |task| {
writable[0] = task.task;
writable = writable[1..];
this.tasks.count += 1;
if (writable.len == 0) break;
}
return this.tasks.count - start_count;
}
pub fn autoTick(this: *EventLoop) void {
if (this.virtual_machine.uws_event_loop.?.num_polls > 0 or this.virtual_machine.uws_event_loop.?.active > 0) {
this.virtual_machine.uws_event_loop.?.tick();
this.afterUSocketsTick();
}
}
// TODO: fix this technical debt
pub fn tick(this: *EventLoop) void {
var ctx = this.virtual_machine;
this.tickConcurrent();
var global_vm = ctx.global.vm();
while (true) {
while (this.tickWithCount() > 0) {
this.tickConcurrent();
} else {
global_vm.releaseWeakRefs();
global_vm.drainMicrotasks();
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
break;
}
this.global.vm().doWork();
while (this.tickWithCount() > 0) {
this.tickConcurrent();
}
this.global.handleRejectedPromises();
}
pub fn runUSocketsLoop(this: *EventLoop) void {
var ctx = this.virtual_machine;
ctx.global.vm().releaseWeakRefs();
ctx.global.vm().drainMicrotasks();
if (ctx.poller.loop != null and ctx.poller.loop.?.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (ctx.uws_event_loop.?.num_polls > 0 or this.start_server_on_next_tick))) {
if (this.tickConcurrentWithCount() > 0) {
this.tick();
} else {
if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) {
_ = ctx.global.vm().runGC(true);
}
}
ctx.is_us_loop_entered = true;
this.start_server_on_next_tick = false;
ctx.enterUWSLoop();
ctx.is_us_loop_entered = false;
}
}
// TODO: fix this technical debt
pub fn waitForPromise(this: *EventLoop, promise: *JSC.JSInternalPromise) void {
switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
if (promise.status(this.global.vm()) == .Pending) {
this.autoTick();
}
}
},
else => {},
}
}
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
while (this.tasks.count > 0) {
this.tick();
if (this.virtual_machine.uws_event_loop != null) {
this.runUSocketsLoop();
}
} else {
if (this.virtual_machine.uws_event_loop != null) {
this.runUSocketsLoop();
}
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
this.tasks.writeItem(task) catch unreachable;
}
pub fn ensureWaker(this: *EventLoop) void {
JSC.markBinding();
if (this.virtual_machine.uws_event_loop == null) {
var actual = uws.Loop.get().?;
this.virtual_machine.uws_event_loop = actual;
// _ = actual.addPostHandler(*JSC.EventLoop, this, JSC.EventLoop.afterUSocketsTick);
// _ = actual.addPreHandler(*JSC.VM, this.virtual_machine.global.vm(), JSC.VM.drainMicrotasks);
}
}
pub fn afterUSocketsTick(this: *EventLoop) void {
const processes = this.pending_processes_to_exit.keys();
if (processes.len > 0) {
for (processes) |process| {
process.onExitNotification();
}
this.pending_processes_to_exit.clearRetainingCapacity();
}
}
pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void {
JSC.markBinding();
this.concurrent_tasks.push(task);
if (this.virtual_machine.uws_event_loop) |loop| {
loop.wakeup();
}
}
};
pub const Poller = struct {
/// kqueue() or epoll()
/// 0 == unset
loop: ?*uws.Loop = null,
pub fn dispatchKQueueEvent(loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void {
if (comptime !Environment.isMac) {
unreachable;
}
var ptr = Pollable.from(@intToPtr(?*anyopaque, kqueue_event.udata));
switch (ptr.tag()) {
@field(Pollable.Tag, "FileBlobLoader") => {
var loader = ptr.as(FileBlobLoader);
loader.poll_ref.deactivate(loop);
loader.onPoll(@bitCast(i64, kqueue_event.data), kqueue_event.flags);
},
@field(Pollable.Tag, "Subprocess") => {
var loader = ptr.as(JSC.Subprocess);
loader.poll_ref.deactivate(loop);
// kqueue sends the same notification multiple times in the same tick potentially
// so we have to dedupe it
_ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable;
},
@field(Pollable.Tag, "BufferedInput") => {
var loader = ptr.as(JSC.Subprocess.BufferedInput);
loader.poll_ref.deactivate(loop);
loader.onReady(@bitCast(i64, kqueue_event.data));
},
@field(Pollable.Tag, "BufferedOutput") => {
var loader = ptr.as(JSC.Subprocess.BufferedOutput);
loader.poll_ref.deactivate(loop);
loader.ready(@bitCast(i64, kqueue_event.data));
},
@field(Pollable.Tag, "FileSink") => {
var loader = ptr.as(JSC.WebCore.FileSink);
loader.poll_ref.deactivate(loop);
loader.onPoll(0, 0);
},
else => |tag| {
bun.Output.panic(
"Internal error\nUnknown pollable tag: {d}\n",
.{@enumToInt(tag)},
);
},
}
}
fn dispatchEpollEvent(loop: *uws.Loop, epoll_event: *linux.epoll_event) void {
var ptr = Pollable.from(@intToPtr(?*anyopaque, epoll_event.data.ptr));
switch (ptr.tag()) {
@field(Pollable.Tag, "FileBlobLoader") => {
var loader = ptr.as(FileBlobLoader);
loader.poll_ref.deactivate(loop);
loader.onPoll(0, 0);
},
@field(Pollable.Tag, "Subprocess") => {
var loader = ptr.as(JSC.Subprocess);
loader.poll_ref.deactivate(loop);
// kqueue sends the same notification multiple times in the same tick potentially
// so we have to dedupe it
_ = loader.globalThis.bunVM().eventLoop().pending_processes_to_exit.getOrPut(loader) catch unreachable;
},
@field(Pollable.Tag, "FileSink") => {
var loader = ptr.as(JSC.WebCore.FileSink);
loader.poll_ref.deactivate(loop);
loader.onPoll(0, 0);
},
@field(Pollable.Tag, "BufferedInput") => {
var loader = ptr.as(JSC.Subprocess.BufferedInput);
loader.poll_ref.deactivate(loop);
loader.onReady(0);
},
@field(Pollable.Tag, "BufferedOutput") => {
var loader = ptr.as(JSC.Subprocess.BufferedOutput);
loader.poll_ref.deactivate(loop);
loader.ready(0);
},
else => unreachable,
}
}
const timeout = std.mem.zeroes(std.os.timespec);
const linux = std.os.linux;
const FileBlobLoader = JSC.WebCore.FileBlobLoader;
const FileSink = JSC.WebCore.FileSink;
const Subprocess = JSC.Subprocess;
const BufferedInput = Subprocess.BufferedInput;
const BufferedOutput = Subprocess.BufferedOutput;
/// epoll only allows one pointer
/// We unfortunately need two pointers: one for a function call and one for the context
/// We use a tagged pointer union and then call the function with the context pointer
pub const Pollable = TaggedPointerUnion(.{
FileBlobLoader,
FileSink,
Subprocess,
BufferedInput,
BufferedOutput,
});
const Kevent = std.os.Kevent;
const kevent = std.c.kevent;
pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) {
if (this.loop == null) {
this.loop = uws.Loop.get();
JSC.VirtualMachine.vm.uws_event_loop = this.loop.?;
}
const watcher_fd = this.loop.?.fd;
if (comptime Environment.isLinux) {
const flags: u32 = switch (flag) {
.process, .read => linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.ONESHOT,
.write => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | linux.EPOLL.ONESHOT,
};
var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @ptrToInt(Pollable.init(ctx).ptr()) } };
const ctl = linux.epoll_ctl(
watcher_fd,
linux.EPOLL.CTL_ADD,
fd,
&event,
);
if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
return errno;
}
ctx.poll_ref.activate(this.loop.?);
return JSC.Maybe(void).success;
} else if (comptime Environment.isMac) {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
changelist[0] = switch (flag) {
.read => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_READ,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(ctx).ptr()),
.flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
.ext = .{ 0, 0 },
},
.write => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_WRITE,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(ctx).ptr()),
.flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
.ext = .{ 0, 0 },
},
.process => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_PROC,
.data = 0,
.fflags = std.c.NOTE_EXIT,
.udata = @ptrToInt(Pollable.init(ctx).ptr()),
.flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT,
.ext = .{ 0, 0 },
},
};
// output events only include change errors
const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
// The kevent() system call returns the number of events placed in
// the eventlist, up to the value given by nevents. If the time
// limit expires, then kevent() returns 0.
const rc = std.os.system.kevent64(
watcher_fd,
&changelist,
1,
// The same array may be used for the changelist and eventlist.
&changelist,
1,
KEVENT_FLAG_ERROR_EVENTS,
&timeout,
);
// If an error occurs while
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
// with EV_ERROR set in flags and the system error in data.
if (changelist[0].flags == std.c.EV_ERROR) {
return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
// Otherwise, -1 will be returned, and errno will be set to
// indicate the error condition.
}
const errno = std.c.getErrno(rc);
if (errno == .SUCCESS) {
ctx.poll_ref.activate(this.loop.?);
return JSC.Maybe(void).success;
}
switch (rc) {
std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
else => unreachable,
}
} else {
@compileError("TODO: Poller");
}
}
pub fn unwatch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, comptime ContextType: type, ctx: *ContextType) JSC.Maybe(void) {
if (this.loop == null) {
this.loop = uws.Loop.get();
JSC.VirtualMachine.vm.uws_event_loop = this.loop.?;
}
const watcher_fd = this.loop.?.fd;
if (comptime Environment.isLinux) {
const ctl = linux.epoll_ctl(
watcher_fd,
linux.EPOLL.CTL_DEL,
fd,
null,
);
if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
return errno;
}
ctx.poll_ref.deactivate(this.loop.?);
return JSC.Maybe(void).success;
} else if (comptime Environment.isMac) {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
changelist[0] = switch (flag) {
.read => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_READ,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(ctx).ptr()),
.flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
.ext = .{ 0, 0 },
},
.write => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_WRITE,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(ctx).ptr()),
.flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
.ext = .{ 0, 0 },
},
.process => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_PROC,
.data = 0,
.fflags = std.c.NOTE_EXIT,
.udata = @ptrToInt(Pollable.init(ctx).ptr()),
.flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
.ext = .{ 0, 0 },
},
};
// output events only include change errors
const KEVENT_FLAG_ERROR_EVENTS = 0x000002;
// The kevent() system call returns the number of events placed in
// the eventlist, up to the value given by nevents. If the time
// limit expires, then kevent() returns 0.
const rc = std.os.system.kevent64(
watcher_fd,
&changelist,
1,
// The same array may be used for the changelist and eventlist.
&changelist,
1,
KEVENT_FLAG_ERROR_EVENTS,
&timeout,
);
// If an error occurs while
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
// with EV_ERROR set in flags and the system error in data.
if (changelist[0].flags == std.c.EV_ERROR) {
return JSC.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
// Otherwise, -1 will be returned, and errno will be set to
// indicate the error condition.
}
const errno = std.c.getErrno(rc);
if (errno == .SUCCESS) {
ctx.poll_ref.deactivate(this.loop.?);
return JSC.Maybe(void).success;
}
switch (rc) {
std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
else => unreachable,
}
} else {
@compileError("TODO: Poller");
}
}
pub fn tick(this: *Poller) void {
var loop = this.loop orelse return;
if (loop.active == 0) return;
loop.tick();
}
pub fn onTick(loop: *uws.Loop, tagged_pointer: ?*anyopaque) callconv(.C) void {
_ = loop;
_ = tagged_pointer;
if (comptime Environment.isMac)
dispatchKQueueEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)])
else if (comptime Environment.isLinux)
dispatchEpollEvent(loop, &loop.ready_polls[@intCast(usize, loop.current_ready_poll)]);
}
pub const Flag = enum {
read,
write,
process,
};
comptime {
@export(onTick, .{ .name = "Bun__internal_dispatch_ready_poll" });
}
};