Compare commits

...

4 Commits

Author SHA1 Message Date
Jarred Sumner
51286a9f4c Different implementation 2023-12-19 00:47:25 -08:00
Jarred Sumner
4748c090db Fix assertion 2023-12-18 21:03:26 -08:00
Jarred Sumner
e33003f2e2 Further efficiency improvement to timers 2023-12-18 05:10:40 -08:00
Jarred Sumner
cb61d75529 Use timer heap for setTimeout 2023-12-18 03:51:39 -08:00
7 changed files with 639 additions and 99 deletions

View File

@@ -903,3 +903,4 @@ pub const FilePoll = struct {
};
pub const Waker = bun.AsyncIO.Waker;
pub const Timer = bun.io.Timer;

View File

@@ -3272,8 +3272,8 @@ pub const Timer = struct {
Debugger.willDispatchAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = this.id, .kind = kind }));
}
const result = callback.callWithGlobalThis(
globalThis,
globalThis.queueMicrotask(
callback,
args,
);
@@ -3281,50 +3281,318 @@ pub const Timer = struct {
Debugger.didDispatchAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = this.id, .kind = kind }));
}
if (result.isEmptyOrUndefinedOrNull() or !result.isCell()) {
this.deinit();
return;
}
this.deinit();
if (result.isAnyError()) {
vm.onUnhandledError(globalThis, result);
this.deinit();
return;
}
// if (result.isEmptyOrUndefinedOrNull() or !result.isCell()) {
// this.deinit();
// return;
// }
if (result.asAnyPromise()) |promise| {
switch (promise.status(globalThis.vm())) {
.Rejected => {
this.deinit();
vm.onUnhandledError(globalThis, promise.result(globalThis.vm()));
},
.Fulfilled => {
this.deinit();
// if (result.isAnyError()) {
// vm.onUnhandledError(globalThis, result);
// this.deinit();
// return;
// }
// get the value out of the promise
_ = promise.result(globalThis.vm());
},
.Pending => {
result.then(globalThis, this, CallbackJob__onResolve, CallbackJob__onReject);
},
}
} else {
this.deinit();
}
// if (result.asAnyPromise()) |promise| {
// switch (promise.status(globalThis.vm())) {
// .Rejected => {
// this.deinit();
// vm.onUnhandledError(globalThis, promise.result(globalThis.vm()));
// },
// .Fulfilled => {
// this.deinit();
// // get the value out of the promise
// _ = promise.result(globalThis.vm());
// },
// .Pending => {
// result.then(globalThis, this, CallbackJob__onResolve, CallbackJob__onReject);
// },
// }
// } else {
// this.deinit();
// }
}
};
fn msToTimespec(ms: usize) std.os.timespec {
var now: std.os.timespec = undefined;
// std.time.Instant.now uses a different clock on macOS than monotonic
bun.io.Loop.updateTimespec(&now);
var increment = std.os.timespec{
// nanosecond from ms milliseconds
.tv_nsec = if (ms > 0) @intCast((ms % std.time.ms_per_s) *| std.time.ns_per_ms) else 0,
.tv_sec = @intCast(ms / std.time.ms_per_s),
};
increment.tv_nsec +|= now.tv_nsec;
increment.tv_sec +|= now.tv_sec;
if (increment.tv_nsec >= std.time.ns_per_s) {
increment.tv_nsec -= std.time.ns_per_s;
increment.tv_sec +|= 1;
}
return increment;
}
fn msExpiryToTimespec(ms: usize) std.os.timespec {
return std.os.timespec{
// nanosecond from ms milliseconds
.tv_nsec = @intCast((ms % std.time.ms_per_s) *| std.time.ns_per_ms),
.tv_sec = @intCast(ms / std.time.ms_per_s),
};
}
pub const TimerObject = struct {
id: i32 = -1,
kind: Timeout.Kind = .setTimeout,
ref_count: u16 = 1,
interval: i32 = 0,
expiry: u64 = 0,
// we do not allow the timer to be refreshed after we call clearInterval/clearTimeout
has_cleaned_up: bool = false,
heap: bun.io.heap.IntrusiveField(TimerObject) = .{},
keep_alive: Async.KeepAlive = Async.KeepAlive.init(),
pub usingnamespace JSC.Codegen.JSTimeout;
pub fn init(globalThis: *JSGlobalObject, id: i32, kind: Timeout.Kind, interval: i32, callback: JSValue, arguments: JSValue) JSValue {
pub const TimerObjectHeap = bun.io.heap.Intrusive(TimerObject, void, isLessThan);
fn isLessThan(_: void, a: *TimerObject, b: *TimerObject) bool {
return a.expiry < b.expiry;
}
pub const TimerMap = struct {
object: JSC.Strong = .{},
count: u32 = 0,
heap: TimerObjectHeap = .{
.context = {},
},
current_min_timestamp: u64 = 0,
pub const ScheduledTimer = struct {
request: bun.io.Request = .{
.callback = &onRequest,
},
timer: bun.io.Timer = .{
.next = std.mem.zeroes(std.os.timespec),
.tag = .ScheduledTimer,
},
map: *TimerMap,
pub fn callback(this: *ScheduledTimer) bun.io.Timer.Arm {
var vm = @fieldParentPtr(JSC.VirtualMachine, "timer_heap", this.map);
vm.eventLoop().enqueueTaskConcurrent(JSC.ConcurrentTask.createFrom(this.map));
bun.default_allocator.destroy(this);
return .{ .disarm = {} };
}
fn onRequest(req: *bun.io.Request) bun.io.Action {
var this: *ScheduledTimer = @fieldParentPtr(ScheduledTimer, "request", req);
return bun.io.Action{
.timer = &this.timer,
};
}
};
pub fn cancel(this: *TimerMap, jsc_vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, id: u32) void {
if (this.count == 0) return;
const value = this.object.get().?.getDirectIndex(globalObject, id);
if (value.isEmptyOrUndefinedOrNull()) {
return;
}
if (value.as(TimerObject)) |timer| {
timer.has_cleaned_up = true;
jsc_vm.timer_heap.heap.remove(timer);
this.object.get().?.putIndex(globalObject, id, .undefined);
this.count -= 1;
if (this.count == 0) {
this.object.deinit();
} else {
this.scheduleNextTimer(jsc_vm, msToTimespec(0));
}
return;
}
}
pub fn insert(this: *TimerMap, jsc_vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, id: u32, timer_js: JSC.JSValue, timer_object: *TimerObject) void {
if (this.count == 0) {
this.object = JSC.Strong.create(JSC.JSValue.createEmptyObject(globalObject, 32), globalObject);
}
this.object.get().?.putIndex(globalObject, id, timer_js);
this.count += 1;
const now: std.os.timespec = msToTimespec(@intCast(timer_object.interval));
timer_object.expiry = timespecToMs(now);
jsc_vm.timer_heap.heap.insert(timer_object);
this.scheduleNextTimer(jsc_vm, now);
}
pub fn popLessThan(this: *TimerMap, globalObject: *JSC.JSGlobalObject, expiry: u64) ?struct {
/// timer object
JSValue,
*TimerObject,
/// callback
JSValue,
/// arguments
JSValue,
} {
if (this.heap.peek()) |element| {
if (element.expiry < expiry) {
_ = this.heap.deleteMin();
const el = this.object.get().?.getDirectIndex(globalObject, @intCast(element.id));
if (el.isEmptyOrUndefinedOrNull()) {
return null;
}
if (element.kind != .setInterval) {
this.count -= 1;
this.object.get().?.putIndex(globalObject, @intCast(element.id), .undefined);
}
return .{
el,
element,
TimerObject.callbackGetCached(el) orelse .undefined,
TimerObject.argumentsGetCached(el) orelse .undefined,
};
}
}
return null;
}
fn timespecToMs(ts: std.os.timespec) u64 {
const max = std.math.maxInt(u64);
const s_ns = std.math.mul(
u64,
@as(u64, @intCast(ts.tv_sec)),
std.time.ns_per_s,
) catch return max;
return std.math.add(u64, s_ns, @as(u64, @intCast(ts.tv_nsec))) catch
max;
}
pub fn runFromJSThread(this: *TimerMap) void {
const now = msToTimespec(0);
const ns = timespecToMs(now);
const jsc_vm = @fieldParentPtr(JSC.VirtualMachine, "timer_heap", this);
this.flushPendingTimers(jsc_vm, jsc_vm.global, ns);
}
pub fn scheduleNextTimer(this: *TimerMap, vm: *JSC.VirtualMachine, now: std.os.timespec) void {
_ = vm;
const next = this.heap.peek() orelse return;
const now_ns = brk: {
const max = std.math.maxInt(u64);
const s_ns = std.math.mul(
u64,
@as(u64, @intCast(now.tv_sec)),
std.time.ns_per_s,
) catch break :brk max;
break :brk std.math.add(u64, s_ns, @as(u64, @intCast(now.tv_nsec))) catch
max;
};
if (this.current_min_timestamp == 0 or this.current_min_timestamp > next.expiry or now_ns > next.expiry) {
this.current_min_timestamp = next.expiry;
const timer = bun.default_allocator.create(ScheduledTimer) catch bun.outOfMemory();
timer.* = .{
.map = this,
};
timer.timer.next = now;
bun.io.Loop.get().schedule(&timer.request);
}
}
pub fn flushPendingTimers(this: *TimerMap, jsc_vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, until_expiry: u64) void {
while (this.count > 0) {
const timer_value, const timer_object, const cb, const arguments = this.popLessThan(globalObject, until_expiry) orelse return;
timer_value.ensureStillAlive();
if (timer_object.kind == .setInterval) {
var now: std.os.timespec = undefined;
// std.time.Instant.now uses a different clock on macOS than monotonic
bun.io.Loop.updateTimespec(&now);
const interval_ms: u64 = @intCast(timer_object.interval);
now.tv_nsec += @intCast(interval_ms * 1000000);
now.tv_sec += @intCast(@as(u64, @intCast(now.tv_nsec)) / 1000000000);
now.tv_nsec = @rem(now.tv_nsec, 1000000000);
timer_object.expiry = timespecToMs(now);
this.heap.insert(timer_object);
this.scheduleNextTimer(jsc_vm, now);
} else {
timer_object.keep_alive.unref(jsc_vm);
}
var args_buf: [8]JSC.JSValue = undefined;
var args: []JSC.JSValue = &.{};
var args_needs_deinit = false;
defer if (args_needs_deinit) bun.default_allocator.free(args);
if (!arguments.isEmptyOrUndefinedOrNull()) {
// Bun.sleep passes a Promise
if (arguments.jsType() == .JSPromise) {
args_buf[0] = arguments;
args = args_buf[0..1];
} else {
const count = arguments.getLength(globalObject);
if (count > 0) {
if (count > args_buf.len) {
args = bun.default_allocator.alloc(JSC.JSValue, count) catch unreachable;
args_needs_deinit = true;
} else {
args = args_buf[0..count];
}
var arg = args.ptr;
var i: u32 = 0;
while (i < count) : (i += 1) {
arg[0] = JSC.JSObject.getIndex(arguments, globalObject, @as(u32, @truncate(i)));
arg += 1;
}
}
}
}
if (jsc_vm.isInspectorEnabled()) {
Debugger.willDispatchAsyncCall(globalObject, .DOMTimer, Timeout.ID.asyncID(.{ .id = timer_object.id, .kind = timer_object.kind }));
}
timer_value.ensureStillAlive();
const result = cb.callWithGlobalThis(globalObject, args);
if (jsc_vm.isInspectorEnabled()) {
Debugger.didDispatchAsyncCall(globalObject, .DOMTimer, Timeout.ID.asyncID(.{ .id = timer_object.id, .kind = timer_object.kind }));
}
if (result.isAnyError()) {
jsc_vm.onUnhandledError(globalObject, result);
return;
}
jsc_vm.drainMicrotasks();
}
}
};
pub fn init(globalThis: *JSGlobalObject, id: i32, kind: Timeout.Kind, interval: i32, callback: JSValue, arguments: JSValue, out: **TimerObject) JSValue {
var timer = globalThis.allocator().create(TimerObject) catch unreachable;
timer.* = .{
.id = id,
@@ -3336,6 +3604,7 @@ pub const Timer = struct {
TimerObject.argumentsSetCached(timer_js, globalThis, arguments);
TimerObject.callbackSetCached(timer_js, globalThis, callback);
timer_js.ensureStillAlive();
out.* = timer;
return timer_js;
}
@@ -3427,8 +3696,8 @@ pub const Timer = struct {
var timeout = Timeout{
.callback = JSC.Strong.create(callback, globalThis),
.globalThis = globalThis,
.timer = uws.Timer.create(
vm.uwsLoop(),
.timer = Timeout.TimerReference.create(
vm.eventLoop(),
id,
),
};
@@ -3450,12 +3719,7 @@ pub const Timer = struct {
map.put(vm.allocator, this.id, timeout) catch unreachable;
timeout.timer.set(
id,
Timeout.run,
this.interval,
@as(i32, @intFromBool(this.kind == .setInterval)) * this.interval,
);
timeout.timer.?.schedule(this.interval);
return this_value;
}
return JSValue.jsUndefined();
@@ -3503,12 +3767,89 @@ pub const Timer = struct {
pub const Timeout = struct {
callback: JSC.Strong = .{},
globalThis: *JSC.JSGlobalObject,
timer: *uws.Timer,
timer: ?*TimerReference = null,
did_unref_timer: bool = false,
poll_ref: Async.KeepAlive = Async.KeepAlive.init(),
arguments: JSC.Strong = .{},
has_scheduled_job: bool = false,
pub const TimerReference = struct {
id: ID = .{ .id = 0 },
cancelled: bool = false,
event_loop: *JSC.EventLoop,
timer: bun.io.Timer = .{
.tag = .TimerReference,
.next = std.mem.zeroes(std.os.timespec),
},
request: bun.io.Request = .{
.callback = &onRequest,
},
interval: i32 = -1,
concurrent_task: JSC.ConcurrentTask = undefined,
next: ?*TimerReference = null,
pub const Pool = bun.HiveArray(TimerReference, 1024).Fallback;
fn onRequest(req: *bun.io.Request) bun.io.Action {
var this: *TimerReference = @fieldParentPtr(TimerReference, "request", req);
if (this.timer.state == .CANCELLED) {
this.deinit();
return bun.io.Action{
.timer_cancelled = {},
};
}
return bun.io.Action{
.timer = &this.timer,
};
}
pub fn callback(this: *TimerReference) bun.io.Timer.Arm {
_ = this;
// TODO:
return .{ .disarm = {} };
}
pub fn runFromJSThread(this: *TimerReference) void {
const timer_id = this.id;
const vm = this.event_loop.virtual_machine;
if (this.cancelled) {
this.deinit();
return;
}
if (Timeout.runFromConcurrentTask(timer_id, vm) and !this.cancelled) {
this.request = .{
.callback = &onRequest,
};
this.schedule(null);
} else {
this.deinit();
}
}
pub fn deinit(this: *TimerReference) void {
this.event_loop.timerReferencePool().put(this);
}
pub fn create(event_loop: *JSC.EventLoop, id: ID) *TimerReference {
const timer = event_loop.timerReferencePool().get();
timer.* = .{
.id = id,
.event_loop = event_loop,
};
return timer;
}
pub fn schedule(this: *TimerReference, interval: ?i32) void {
std.debug.assert(!this.cancelled);
this.timer.state = .PENDING;
this.timer.next = msToTimespec(@intCast(@max(interval orelse this.interval, 1)));
bun.io.Loop.get().schedule(&this.request);
}
};
pub const Kind = enum(u32) {
setTimeout,
setInterval,
@@ -3535,8 +3876,99 @@ pub const Timer = struct {
// use the threadlocal despite being slow on macOS
// to handle the timeout being cancelled after already enqueued
var vm = JSC.VirtualMachine.get();
const vm = JSC.VirtualMachine.get();
runWithIDAndVM(timer_id, vm);
}
pub fn runFromConcurrentTask(timer_id: ID, vm: *JSC.VirtualMachine) bool {
const repeats = timer_id.repeats();
var map = vm.timer.maps.get(timer_id.kind);
const this_: ?Timeout = map.get(
timer_id.id,
) orelse return false;
var this = this_ orelse
return false;
const globalThis = this.globalThis;
// Disable thundering herd of setInterval() calls
// Skip setInterval() calls when the previous one has not been run yet.
if (repeats and this.has_scheduled_job) {
return false;
}
const cb: CallbackJob = .{
.callback = if (repeats)
JSC.Strong.create(
this.callback.get() orelse {
// if the callback was freed, that's an error
if (comptime Environment.allow_assert)
unreachable;
this.deinit();
_ = map.swapRemove(timer_id.id);
return false;
},
globalThis,
)
else
this.callback,
.arguments = if (repeats and this.arguments.has())
JSC.Strong.create(
this.arguments.get() orelse {
// if the arguments freed, that's an error
if (comptime Environment.allow_assert)
unreachable;
this.deinit();
_ = map.swapRemove(timer_id.id);
return false;
},
globalThis,
)
else
this.arguments,
.globalThis = globalThis,
.id = timer_id.id,
.kind = timer_id.kind,
};
var reschedule = false;
// This allows us to:
// - free the memory before the job is run
// - reuse the JSC.Strong
if (!repeats) {
this.callback = .{};
this.arguments = .{};
map.put(vm.allocator, timer_id.id, null) catch unreachable;
this.deinit();
} else {
this.has_scheduled_job = true;
map.put(vm.allocator, timer_id.id, this) catch {};
reschedule = true;
}
// TODO: remove this memory allocation!
var job = vm.allocator.create(CallbackJob) catch @panic(
"Out of memory while allocating Timeout",
);
job.* = cb;
job.task = CallbackJob.Task.init(job);
job.ref.ref(vm);
if (vm.isInspectorEnabled()) {
Debugger.didScheduleAsyncCall(globalThis, .DOMTimer, timer_id.asyncID(), !repeats);
}
job.perform();
return reschedule;
}
pub fn runWithIDAndVM(timer_id: ID, vm: *JSC.VirtualMachine) void {
const repeats = timer_id.repeats();
var map = vm.timer.maps.get(timer_id.kind);
@@ -3625,7 +4057,9 @@ pub const Timer = struct {
this.poll_ref.unref(vm);
this.timer.deinit(false);
if (this.timer) |timer| {
timer.cancelled = true;
}
if (comptime Environment.isPosix)
// balance double unreffing in doUnref
@@ -3683,8 +4117,8 @@ pub const Timer = struct {
var timeout = Timeout{
.callback = JSC.Strong.create(callback, globalThis),
.globalThis = globalThis,
.timer = uws.Timer.create(
vm.uwsLoop(),
.timer = Timeout.TimerReference.create(
vm.eventLoop(),
Timeout.ID{
.id = id,
.kind = kind,
@@ -3703,15 +4137,7 @@ pub const Timer = struct {
Debugger.didScheduleAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = id, .kind = kind }), !repeat);
}
timeout.timer.set(
Timeout.ID{
.id = id,
.kind = kind,
},
Timeout.run,
interval,
@as(i32, @intFromBool(kind == .setInterval)) * interval,
);
timeout.timer.?.schedule(interval);
}
pub fn setImmediate(
@@ -3726,11 +4152,11 @@ pub const Timer = struct {
const interval: i32 = 0;
const wrappedCallback = callback.withAsyncContextIfNeeded(globalThis);
var timer_object: *TimerObject = undefined;
const timer_js = TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments, &timer_object);
timer_js.ensureStillAlive();
Timer.set(id, globalThis, wrappedCallback, interval, arguments, false) catch
return JSValue.jsUndefined();
return TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments);
return timer_js;
}
comptime {
@@ -3746,8 +4172,9 @@ pub const Timer = struct {
arguments: JSValue,
) callconv(.C) JSValue {
JSC.markBinding(@src());
const id = globalThis.bunVM().timer.last_id;
globalThis.bunVM().timer.last_id +%= 1;
var vm = globalThis.bunVM();
const id = vm.timer.last_id;
vm.timer.last_id +%= 1;
const interval: i32 = @max(
countdown.coerce(i32, globalThis),
@@ -3757,10 +4184,12 @@ pub const Timer = struct {
const wrappedCallback = callback.withAsyncContextIfNeeded(globalThis);
Timer.set(id, globalThis, wrappedCallback, interval, arguments, false) catch
return JSValue.jsUndefined();
return TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments);
var timer_object: *TimerObject = undefined;
const timer_js = TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments, &timer_object);
timer_js.ensureStillAlive();
vm.timer_heap.insert(vm, globalThis, @intCast(id), timer_js, timer_object);
timer_object.keep_alive.ref(vm);
return timer_js;
}
pub fn setInterval(
globalThis: *JSGlobalObject,
@@ -3769,8 +4198,9 @@ pub const Timer = struct {
arguments: JSValue,
) callconv(.C) JSValue {
JSC.markBinding(@src());
const id = globalThis.bunVM().timer.last_id;
globalThis.bunVM().timer.last_id +%= 1;
const vm = globalThis.bunVM();
const id = vm.timer.last_id;
vm.timer.last_id +%= 1;
const wrappedCallback = callback.withAsyncContextIfNeeded(globalThis);
@@ -3780,10 +4210,13 @@ pub const Timer = struct {
countdown.coerce(i32, globalThis),
1,
);
Timer.set(id, globalThis, wrappedCallback, interval, arguments, true) catch
return JSValue.jsUndefined();
return TimerObject.init(globalThis, id, .setInterval, interval, wrappedCallback, arguments);
var timer_object: *TimerObject = undefined;
const timer_js = TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments, &timer_object);
timer_js.ensureStillAlive();
vm.timer_heap.insert(vm, globalThis, @intCast(id), timer_js, timer_object);
timer_object.keep_alive.ref(vm);
return timer_js;
}
pub fn clearTimer(timer_id_value: JSValue, globalThis: *JSGlobalObject, repeats: bool) void {

View File

@@ -272,8 +272,7 @@ JSC_DEFINE_HOST_FUNCTION(functionBunSleepThenCallback,
{
JSC::VM& vm = globalObject->vm();
RELEASE_ASSERT(callFrame->argumentCount() == 1);
JSPromise* promise = jsCast<JSC::JSPromise*>(callFrame->argument(0));
JSPromise* promise = jsDynamicCast<JSC::JSPromise*>(callFrame->argument(0));
RELEASE_ASSERT(promise);
promise->resolve(globalObject, JSC::jsUndefined());

View File

@@ -2417,8 +2417,8 @@ CPP_DECL void JSC__JSValue__putIndex(JSC__JSValue JSValue0, JSC__JSGlobalObject*
{
JSC::JSValue value = JSC::JSValue::decode(JSValue0);
JSC::JSValue value2 = JSC::JSValue::decode(JSValue3);
JSC::JSArray* array = JSC::jsCast<JSC::JSArray*>(value);
array->putDirectIndex(arg1, arg2, value2);
JSC::JSObject* object = value.getObject();
object->putDirectIndex(arg1, arg2, value2);
}
CPP_DECL void JSC__JSValue__push(JSC__JSValue JSValue0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue3)

View File

@@ -339,6 +339,8 @@ const Lchmod = JSC.Node.Async.lchmod;
const Lchown = JSC.Node.Async.lchown;
const Unlink = JSC.Node.Async.unlink;
const WaitPidResultTask = JSC.Subprocess.WaiterThread.WaitPidResultTask;
const TimerReference = JSC.BunTimer.Timeout.TimerReference;
const TimerMap = JSC.BunTimer.TimerObject.TimerMap;
// Task.get(ReadFileTask) -> ?ReadFileTask
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
@@ -398,6 +400,8 @@ pub const Task = TaggedPointerUnion(.{
Lchown,
Unlink,
WaitPidResultTask,
TimerReference,
TimerMap,
});
const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
pub const ConcurrentTask = struct {
@@ -623,6 +627,22 @@ pub const EventLoop = struct {
forever_timer: ?*uws.Timer = null,
deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{},
uws_loop: if (Environment.isWindows) *uws.Loop else void = undefined,
timer_reference_pool: if (Environment.isPosix) ?*bun.JSC.BunTimer.Timeout.TimerReference.Pool else void = if (Environment.isPosix) null else undefined,
pub fn timerReferencePool(this: *EventLoop) *bun.JSC.BunTimer.Timeout.TimerReference.Pool {
if (comptime !Environment.isPosix) {
@compileError("This function is only available on POSIX platforms");
}
return this.timer_reference_pool orelse brk: {
const _pool = bun.default_allocator.create(bun.JSC.BunTimer.Timeout.TimerReference.Pool) catch bun.outOfMemory();
_pool.* = bun.JSC.BunTimer.Timeout.TimerReference.Pool.init(bun.default_allocator);
this.timer_reference_pool = _pool;
break :brk _pool;
};
}
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
const log = bun.Output.scoped(.EventLoop, false);
@@ -919,6 +939,20 @@ pub const EventLoop = struct {
var any: *WaitPidResultTask = task.get(WaitPidResultTask).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(TimerReference))) => {
var any: *TimerReference = task.get(TimerReference).?;
var next_ = any.next;
any.runFromJSThread();
while (next_) |next| {
next_ = next.next;
next.runFromJSThread();
}
},
@field(Task.Tag, typeBaseName(@typeName(TimerMap))) => {
var any: *TimerMap = task.get(TimerMap).?;
any.runFromJSThread();
},
else => if (Environment.allow_assert) {
bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())});
} else {
@@ -1301,6 +1335,18 @@ pub const EventLoop = struct {
this.concurrent_tasks.push(task);
this.wakeup();
}
pub fn enqueueTaskConcurrentBatch(this: *EventLoop, batch: ConcurrentTask.Queue.Batch) void {
JSC.markBinding(@src());
if (comptime Environment.allow_assert) {
if (this.virtual_machine.has_terminated) {
@panic("EventLoop.enqueueTaskConcurrent: VM has terminated");
}
}
this.concurrent_tasks.pushBatch(batch.front.?, batch.last.?, batch.count);
this.wakeup();
}
};
pub const MiniEventLoop = struct {

View File

@@ -498,6 +498,8 @@ pub const VirtualMachine = struct {
hot_reload: bun.CLI.Command.HotReload = .none,
jsc: *JSC.VM = undefined,
timer_heap: bun.JSC.BunTimer.TimerObject.TimerMap = .{},
/// hide bun:wrap from stack traces
/// bun:wrap is very noisy
hide_bun_stackframes: bool = true,

View File

@@ -3,7 +3,7 @@ const std = @import("std");
const sys = bun.sys;
const linux = std.os.linux;
const Environment = bun.Environment;
const heap = @import("./heap.zig");
pub const heap = @import("./heap.zig");
const JSC = bun.JSC;
const log = bun.Output.scoped(.loop, false);
@@ -92,6 +92,8 @@ pub const Loop = struct {
@compileError("Epoll is Linux-Only");
}
this.updateNow();
while (true) {
// Process pending requests
@@ -127,22 +129,12 @@ pub const Loop = struct {
this.active -= 1;
close.onDone(close.ctx);
},
.timer_cancelled => {},
.timer => |timer| {
while (true) {
switch (timer.state) {
.PENDING => {
timer.state = .ACTIVE;
if (Timer.less({}, timer, &.{ .next = this.cached_now })) {
if (timer.fire() == .rearm) {
if (timer.reset) |reset| {
timer.next = reset;
timer.reset = null;
continue;
}
}
break;
}
this.timers.insert(timer);
},
.ACTIVE => {
@@ -174,7 +166,9 @@ pub const Loop = struct {
@as(u64, @intCast(this.cached_now.tv_nsec)) / std.time.ns_per_ms;
const ms_next = @as(u64, @intCast(t.next.tv_sec)) * std.time.ms_per_s +
@as(u64, @intCast(t.next.tv_nsec)) / std.time.ns_per_ms;
break :timeout @as(i32, @intCast(ms_next -| ms_now));
const out = @as(i32, @intCast(ms_next -| ms_now));
break :timeout @max(out, 0);
};
var events: [256]EventType = undefined;
@@ -230,6 +224,8 @@ pub const Loop = struct {
@compileError("Kqueue is MacOS-Only");
}
this.updateNow();
while (true) {
var stack_fallback = std.heap.stackFallback(@sizeOf([256]EventType), bun.default_allocator);
var events_list: std.ArrayList(EventType) = std.ArrayList(EventType).initCapacity(stack_fallback.get(), 256) catch unreachable;
@@ -270,6 +266,7 @@ pub const Loop = struct {
&events_list.items.ptr[i],
);
},
.close => |close| {
if (close.poll.flags.contains(.poll_readable) or close.poll.flags.contains(.poll_writable)) {
const i = events_list.items.len;
@@ -285,22 +282,12 @@ pub const Loop = struct {
}
close.onDone(close.ctx);
},
.timer_cancelled => {},
.timer => |timer| {
while (true) {
switch (timer.state) {
.PENDING => {
timer.state = .ACTIVE;
if (Timer.less({}, timer, &.{ .next = this.cached_now })) {
if (timer.fire() == .rearm) {
if (timer.reset) |reset| {
timer.next = reset;
timer.reset = null;
continue;
}
}
break;
}
this.timers.insert(timer);
},
.ACTIVE => {
@@ -331,6 +318,15 @@ pub const Loop = struct {
out.tv_sec = t.next.tv_sec -| this.cached_now.tv_sec;
out.tv_nsec = t.next.tv_nsec -| this.cached_now.tv_nsec;
if (out.tv_nsec < 0) {
out.tv_sec -= 1;
out.tv_nsec += std.time.ns_per_s;
}
if (out.tv_sec < 0) {
break :timeout null;
}
break :timeout out;
};
@@ -367,6 +363,9 @@ pub const Loop = struct {
fn drainExpiredTimers(this: *Loop) void {
const now = Timer{ .next = this.cached_now };
var current_batch = JSC.ConcurrentTask.Queue.Batch{};
var prev_event_loop: ?*JSC.EventLoop = null;
// Run our expired timers
while (this.timers.peek()) |t| {
if (!Timer.less({}, t, &now)) break;
@@ -377,7 +376,10 @@ pub const Loop = struct {
// Mark completion as done
t.state = .FIRED;
switch (t.fire()) {
switch (t.fire(
&current_batch,
&prev_event_loop,
)) {
.disarm => {},
.rearm => |new| {
t.next = new;
@@ -387,14 +389,22 @@ pub const Loop = struct {
},
}
}
if (prev_event_loop) |event_loop| {
event_loop.enqueueTaskConcurrent(current_batch.front.?);
}
}
fn updateNow(this: *Loop) void {
updateTimespec(&this.cached_now);
}
pub fn updateTimespec(timespec: *os.timespec) void {
if (comptime Environment.isLinux) {
const rc = linux.clock_gettime(linux.CLOCK.MONOTONIC, &this.cached_now);
const rc = linux.clock_gettime(linux.CLOCK.MONOTONIC, timespec);
assert(rc == 0);
} else if (comptime Environment.isMac) {
std.os.clock_gettime(std.os.CLOCK.MONOTONIC, &this.cached_now) catch {};
std.os.clock_gettime(std.os.CLOCK.MONOTONIC, timespec) catch {};
} else {
@compileError("TODO: implement poll for this platform");
}
@@ -416,6 +426,7 @@ pub const Action = union(enum) {
writable: FileAction,
close: CloseAction,
timer: *Timer,
timer_cancelled: void,
pub const FileAction = struct {
fd: bun.FileDescriptor,
@@ -482,6 +493,9 @@ const Pollable = struct {
}
};
const TimerReference = bun.JSC.BunTimer.Timeout.TimerReference;
const ScheduledTimer = bun.JSC.BunTimer.TimerObject.TimerMap.ScheduledTimer;
pub const Timer = struct {
/// The absolute time to fire this timer next.
next: os.timespec,
@@ -501,10 +515,14 @@ pub const Timer = struct {
pub const Tag = enum {
TimerCallback,
TimerReference,
ScheduledTimer,
pub fn Type(comptime T: Tag) type {
return switch (T) {
.TimerCallback => TimerCallback,
.TimerReference => TimerReference,
.ScheduledTimer => ScheduledTimer,
};
}
};
@@ -556,10 +574,51 @@ pub const Timer = struct {
disarm,
};
pub fn fire(this: *Timer) Arm {
pub fn fire(this: *Timer, batch: *JSC.ConcurrentTask.Queue.Batch, event_loop: *?*JSC.EventLoop) Arm {
if (comptime Environment.allow_assert) {
if (comptime Environment.isPosix) {
const timer = std.time.Instant{ .timestamp = this.next };
var now = std.time.Instant{ .timestamp = undefined };
Loop.updateTimespec(&now.timestamp);
if (timer.order(now) != .lt) {
bun.Output.panic("Timer fired {} too early", .{bun.fmt.fmtDuration(timer.since(now))});
}
}
}
switch (this.tag) {
inline else => |t| {
var container: *t.Type() = @fieldParentPtr(t.Type(), "timer", this);
if (comptime t.Type() == ScheduledTimer) {
return container.callback();
} else if (comptime @hasDecl(t.Type(), "callback")) {
const concurrent_task = container.concurrent_task.from(container, .manual_deinit);
if (event_loop.*) |loop| {
// If they are different event loops, we have to drain the batch right here.
if (loop != container.event_loop) {
loop.enqueueTaskConcurrent(batch.front.?);
event_loop.* = container.event_loop;
}
if (batch.front == null) {
batch.front = concurrent_task;
}
} else {
batch.front = concurrent_task;
event_loop.* = container.event_loop;
}
if (batch.last) |last_task| {
var last = @fieldParentPtr(t.Type(), "concurrent_task", last_task);
last.next = container;
}
batch.last = concurrent_task;
batch.count += 1;
return container.callback();
}
return container.callback(container);
},
}