Mirror of https://github.com/oven-sh/bun, synced 2026-02-02 23:18:47 +00:00

Compare commits: dylan/byte ... jarred/red (4 commits)

Commits: 51286a9f4c, 4748c090db, e33003f2e2, cb61d75529
@@ -903,3 +903,4 @@ pub const FilePoll = struct {
};

pub const Waker = bun.AsyncIO.Waker;
pub const Timer = bun.io.Timer;
@@ -3272,8 +3272,8 @@ pub const Timer = struct {
        Debugger.willDispatchAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = this.id, .kind = kind }));
    }

    const result = callback.callWithGlobalThis(
        globalThis,
    globalThis.queueMicrotask(
        callback,
        args,
    );
@@ -3281,50 +3281,318 @@ pub const Timer = struct {
        Debugger.didDispatchAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = this.id, .kind = kind }));
    }

    if (result.isEmptyOrUndefinedOrNull() or !result.isCell()) {
        this.deinit();
        return;
    }
    this.deinit();

    if (result.isAnyError()) {
        vm.onUnhandledError(globalThis, result);
        this.deinit();
        return;
    }
    // if (result.isEmptyOrUndefinedOrNull() or !result.isCell()) {
    //     this.deinit();
    //     return;
    // }

    if (result.asAnyPromise()) |promise| {
        switch (promise.status(globalThis.vm())) {
            .Rejected => {
                this.deinit();
                vm.onUnhandledError(globalThis, promise.result(globalThis.vm()));
            },
            .Fulfilled => {
                this.deinit();
                // if (result.isAnyError()) {
                //     vm.onUnhandledError(globalThis, result);
                //     this.deinit();
                //     return;
                // }

                // get the value out of the promise
                _ = promise.result(globalThis.vm());
            },
            .Pending => {
                result.then(globalThis, this, CallbackJob__onResolve, CallbackJob__onReject);
            },
        }
    } else {
        this.deinit();
    }
    // if (result.asAnyPromise()) |promise| {
    //     switch (promise.status(globalThis.vm())) {
    //         .Rejected => {
    //             this.deinit();
    //             vm.onUnhandledError(globalThis, promise.result(globalThis.vm()));
    //         },
    //         .Fulfilled => {
    //             this.deinit();

    //             // get the value out of the promise
    //             _ = promise.result(globalThis.vm());
    //         },
    //         .Pending => {
    //             result.then(globalThis, this, CallbackJob__onResolve, CallbackJob__onReject);
    //         },
    //     }
    // } else {
    //     this.deinit();
    // }
}
};

fn msToTimespec(ms: usize) std.os.timespec {
    var now: std.os.timespec = undefined;
    // std.time.Instant.now uses a different clock on macOS than monotonic
    bun.io.Loop.updateTimespec(&now);

    var increment = std.os.timespec{
        // nanosecond from ms milliseconds
        .tv_nsec = if (ms > 0) @intCast((ms % std.time.ms_per_s) *| std.time.ns_per_ms) else 0,
        .tv_sec = @intCast(ms / std.time.ms_per_s),
    };

    increment.tv_nsec +|= now.tv_nsec;
    increment.tv_sec +|= now.tv_sec;

    if (increment.tv_nsec >= std.time.ns_per_s) {
        increment.tv_nsec -= std.time.ns_per_s;
        increment.tv_sec +|= 1;
    }

    return increment;
}

fn msExpiryToTimespec(ms: usize) std.os.timespec {
    return std.os.timespec{
        // nanosecond from ms milliseconds
        .tv_nsec = @intCast((ms % std.time.ms_per_s) *| std.time.ns_per_ms),
        .tv_sec = @intCast(ms / std.time.ms_per_s),
    };
}
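As a standalone illustration of the millisecond-to-timespec split used by `msToTimespec` and `msExpiryToTimespec` above (a minimal sketch, not part of the diff; `msToParts` is a hypothetical helper): whole seconds go into `tv_sec`, the remainder becomes nanoseconds, and Zig's saturating operators (`*|`, `+|`) clamp instead of overflowing.

```zig
const std = @import("std");

// Hypothetical helper: same arithmetic as the diff's msToTimespec.
fn msToParts(ms: u64) struct { sec: u64, nsec: u64 } {
    return .{
        .sec = ms / std.time.ms_per_s,
        // saturating multiply, mirroring the `*|` in the diff
        .nsec = (ms % std.time.ms_per_s) *| std.time.ns_per_ms,
    };
}

test "1500 ms splits into 1 s + 500_000_000 ns" {
    const parts = msToParts(1500);
    try std.testing.expectEqual(@as(u64, 1), parts.sec);
    try std.testing.expectEqual(@as(u64, 500_000_000), parts.nsec);
}
```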
pub const TimerObject = struct {
    id: i32 = -1,
    kind: Timeout.Kind = .setTimeout,
    ref_count: u16 = 1,
    interval: i32 = 0,
    expiry: u64 = 0,
    // we do not allow the timer to be refreshed after we call clearInterval/clearTimeout
    has_cleaned_up: bool = false,
    heap: bun.io.heap.IntrusiveField(TimerObject) = .{},
    keep_alive: Async.KeepAlive = Async.KeepAlive.init(),

    pub usingnamespace JSC.Codegen.JSTimeout;

    pub fn init(globalThis: *JSGlobalObject, id: i32, kind: Timeout.Kind, interval: i32, callback: JSValue, arguments: JSValue) JSValue {
    pub const TimerObjectHeap = bun.io.heap.Intrusive(TimerObject, void, isLessThan);

    fn isLessThan(_: void, a: *TimerObject, b: *TimerObject) bool {
        return a.expiry < b.expiry;
    }
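The `isLessThan` comparator makes the intrusive heap a min-heap keyed on `expiry`, so the soonest-due timer is always at the top. A small illustration of the same ordering rule (an assumption for clarity, using `std.PriorityQueue` rather than bun's intrusive heap):

```zig
const std = @import("std");

// Same ordering rule as isLessThan above: smaller expiry comes first.
fn earliestFirst(_: void, a: u64, b: u64) std.math.Order {
    return std.math.order(a, b);
}

test "the timer with the smallest expiry pops first" {
    var queue = std.PriorityQueue(u64, void, earliestFirst).init(std.testing.allocator, {});
    defer queue.deinit();

    try queue.add(300); // expiries, e.g. in milliseconds
    try queue.add(100);
    try queue.add(200);

    try std.testing.expectEqual(@as(u64, 100), queue.remove());
}
```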
    pub const TimerMap = struct {
        object: JSC.Strong = .{},
        count: u32 = 0,
        heap: TimerObjectHeap = .{
            .context = {},
        },

        current_min_timestamp: u64 = 0,

        pub const ScheduledTimer = struct {
            request: bun.io.Request = .{
                .callback = &onRequest,
            },

            timer: bun.io.Timer = .{
                .next = std.mem.zeroes(std.os.timespec),
                .tag = .ScheduledTimer,
            },

            map: *TimerMap,

            pub fn callback(this: *ScheduledTimer) bun.io.Timer.Arm {
                var vm = @fieldParentPtr(JSC.VirtualMachine, "timer_heap", this.map);
                vm.eventLoop().enqueueTaskConcurrent(JSC.ConcurrentTask.createFrom(this.map));
                bun.default_allocator.destroy(this);

                return .{ .disarm = {} };
            }

            fn onRequest(req: *bun.io.Request) bun.io.Action {
                var this: *ScheduledTimer = @fieldParentPtr(ScheduledTimer, "request", req);
                return bun.io.Action{
                    .timer = &this.timer,
                };
            }
        };

        pub fn cancel(this: *TimerMap, jsc_vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, id: u32) void {
            if (this.count == 0) return;

            const value = this.object.get().?.getDirectIndex(globalObject, id);

            if (value.isEmptyOrUndefinedOrNull()) {
                return;
            }

            if (value.as(TimerObject)) |timer| {
                timer.has_cleaned_up = true;
                jsc_vm.timer_heap.heap.remove(timer);
                this.object.get().?.putIndex(globalObject, id, .undefined);
                this.count -= 1;
                if (this.count == 0) {
                    this.object.deinit();
                } else {
                    this.scheduleNextTimer(jsc_vm, msToTimespec(0));
                }
                return;
            }
        }

        pub fn insert(this: *TimerMap, jsc_vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, id: u32, timer_js: JSC.JSValue, timer_object: *TimerObject) void {
            if (this.count == 0) {
                this.object = JSC.Strong.create(JSC.JSValue.createEmptyObject(globalObject, 32), globalObject);
            }

            this.object.get().?.putIndex(globalObject, id, timer_js);
            this.count += 1;

            const now: std.os.timespec = msToTimespec(@intCast(timer_object.interval));
            timer_object.expiry = timespecToMs(now);
            jsc_vm.timer_heap.heap.insert(timer_object);

            this.scheduleNextTimer(jsc_vm, now);
        }

        pub fn popLessThan(this: *TimerMap, globalObject: *JSC.JSGlobalObject, expiry: u64) ?struct {
            /// timer object
            JSValue,

            *TimerObject,
            /// callback
            JSValue,
            /// arguments
            JSValue,
        } {
            if (this.heap.peek()) |element| {
                if (element.expiry < expiry) {
                    _ = this.heap.deleteMin();
                    const el = this.object.get().?.getDirectIndex(globalObject, @intCast(element.id));
                    if (el.isEmptyOrUndefinedOrNull()) {
                        return null;
                    }
                    if (element.kind != .setInterval) {
                        this.count -= 1;
                        this.object.get().?.putIndex(globalObject, @intCast(element.id), .undefined);
                    }

                    return .{
                        el,
                        element,
                        TimerObject.callbackGetCached(el) orelse .undefined,
                        TimerObject.argumentsGetCached(el) orelse .undefined,
                    };
                }
            }

            return null;
        }

        fn timespecToMs(ts: std.os.timespec) u64 {
            const max = std.math.maxInt(u64);
            const s_ns = std.math.mul(
                u64,
                @as(u64, @intCast(ts.tv_sec)),
                std.time.ns_per_s,
            ) catch return max;
            return std.math.add(u64, s_ns, @as(u64, @intCast(ts.tv_nsec))) catch
                max;
        }
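Worth noting from the code itself: `timespecToMs` computes `tv_sec * ns_per_s + tv_nsec`, i.e. nanoseconds, and its caller below stores the result in a variable named `ns`. The conversion is overflow-checked and saturates to `maxInt(u64)`. A minimal standalone sketch of that pattern (`toNanos` is a hypothetical helper, not from the diff):

```zig
const std = @import("std");

// Checked conversion: any overflow saturates to maxInt(u64) instead of wrapping.
fn toNanos(sec: u64, nsec: u64) u64 {
    const max = std.math.maxInt(u64);
    const s_ns = std.math.mul(u64, sec, std.time.ns_per_s) catch return max;
    return std.math.add(u64, s_ns, nsec) catch max;
}

test "overflow saturates instead of wrapping" {
    try std.testing.expectEqual(@as(u64, 1_500_000_000), toNanos(1, 500_000_000));
    try std.testing.expectEqual(@as(u64, std.math.maxInt(u64)), toNanos(std.math.maxInt(u64), 0));
}
```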
        pub fn runFromJSThread(this: *TimerMap) void {
            const now = msToTimespec(0);
            const ns = timespecToMs(now);

            const jsc_vm = @fieldParentPtr(JSC.VirtualMachine, "timer_heap", this);

            this.flushPendingTimers(jsc_vm, jsc_vm.global, ns);
        }

        pub fn scheduleNextTimer(this: *TimerMap, vm: *JSC.VirtualMachine, now: std.os.timespec) void {
            _ = vm;

            const next = this.heap.peek() orelse return;

            const now_ns = brk: {
                const max = std.math.maxInt(u64);
                const s_ns = std.math.mul(
                    u64,
                    @as(u64, @intCast(now.tv_sec)),
                    std.time.ns_per_s,
                ) catch break :brk max;
                break :brk std.math.add(u64, s_ns, @as(u64, @intCast(now.tv_nsec))) catch
                    max;
            };

            if (this.current_min_timestamp == 0 or this.current_min_timestamp > next.expiry or now_ns > next.expiry) {
                this.current_min_timestamp = next.expiry;
                const timer = bun.default_allocator.create(ScheduledTimer) catch bun.outOfMemory();
                timer.* = .{
                    .map = this,
                };
                timer.timer.next = now;
                bun.io.Loop.get().schedule(&timer.request);
            }
        }

        pub fn flushPendingTimers(this: *TimerMap, jsc_vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, until_expiry: u64) void {
            while (this.count > 0) {
                const timer_value, const timer_object, const cb, const arguments = this.popLessThan(globalObject, until_expiry) orelse return;
                timer_value.ensureStillAlive();

                if (timer_object.kind == .setInterval) {
                    var now: std.os.timespec = undefined;
                    // std.time.Instant.now uses a different clock on macOS than monotonic
                    bun.io.Loop.updateTimespec(&now);

                    const interval_ms: u64 = @intCast(timer_object.interval);
                    now.tv_nsec += @intCast(interval_ms * 1000000);
                    now.tv_sec += @intCast(@as(u64, @intCast(now.tv_nsec)) / 1000000000);
                    now.tv_nsec = @rem(now.tv_nsec, 1000000000);

                    timer_object.expiry = timespecToMs(now);
                    this.heap.insert(timer_object);
                    this.scheduleNextTimer(jsc_vm, now);
                } else {
                    timer_object.keep_alive.unref(jsc_vm);
                }

                var args_buf: [8]JSC.JSValue = undefined;
                var args: []JSC.JSValue = &.{};
                var args_needs_deinit = false;
                defer if (args_needs_deinit) bun.default_allocator.free(args);

                if (!arguments.isEmptyOrUndefinedOrNull()) {
                    // Bun.sleep passes a Promise
                    if (arguments.jsType() == .JSPromise) {
                        args_buf[0] = arguments;
                        args = args_buf[0..1];
                    } else {
                        const count = arguments.getLength(globalObject);
                        if (count > 0) {
                            if (count > args_buf.len) {
                                args = bun.default_allocator.alloc(JSC.JSValue, count) catch unreachable;
                                args_needs_deinit = true;
                            } else {
                                args = args_buf[0..count];
                            }
                            var arg = args.ptr;
                            var i: u32 = 0;
                            while (i < count) : (i += 1) {
                                arg[0] = JSC.JSObject.getIndex(arguments, globalObject, @as(u32, @truncate(i)));
                                arg += 1;
                            }
                        }
                    }
                }

                if (jsc_vm.isInspectorEnabled()) {
                    Debugger.willDispatchAsyncCall(globalObject, .DOMTimer, Timeout.ID.asyncID(.{ .id = timer_object.id, .kind = timer_object.kind }));
                }
                timer_value.ensureStillAlive();
                const result = cb.callWithGlobalThis(globalObject, args);

                if (jsc_vm.isInspectorEnabled()) {
                    Debugger.didDispatchAsyncCall(globalObject, .DOMTimer, Timeout.ID.asyncID(.{ .id = timer_object.id, .kind = timer_object.kind }));
                }

                if (result.isAnyError()) {
                    jsc_vm.onUnhandledError(globalObject, result);
                    return;
                }

                jsc_vm.drainMicrotasks();
            }
        }
    };
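The guard in `scheduleNextTimer` above only arms a new OS timer when doing so can matter: nothing is armed yet (`current_min_timestamp == 0`), the new heap head expires sooner than what is armed, or the armed deadline has already passed. A minimal sketch of that decision (a hypothetical `needsRearm` helper, not from the diff):

```zig
const std = @import("std");

// Same three-way condition as scheduleNextTimer above, on plain integers.
fn needsRearm(armed_expiry: u64, head_expiry: u64, now: u64) bool {
    return armed_expiry == 0 or armed_expiry > head_expiry or now > head_expiry;
}

test "re-arm only when the head moved earlier" {
    try std.testing.expect(needsRearm(0, 500, 100)); // nothing armed yet
    try std.testing.expect(needsRearm(900, 500, 100)); // new head fires sooner
    try std.testing.expect(!needsRearm(400, 500, 100)); // armed timer fires first
}
```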
    pub fn init(globalThis: *JSGlobalObject, id: i32, kind: Timeout.Kind, interval: i32, callback: JSValue, arguments: JSValue, out: **TimerObject) JSValue {
        var timer = globalThis.allocator().create(TimerObject) catch unreachable;
        timer.* = .{
            .id = id,
@@ -3336,6 +3604,7 @@ pub const Timer = struct {
        TimerObject.argumentsSetCached(timer_js, globalThis, arguments);
        TimerObject.callbackSetCached(timer_js, globalThis, callback);
        timer_js.ensureStillAlive();
        out.* = timer;
        return timer_js;
    }
@@ -3427,8 +3696,8 @@ pub const Timer = struct {
    var timeout = Timeout{
        .callback = JSC.Strong.create(callback, globalThis),
        .globalThis = globalThis,
        .timer = uws.Timer.create(
            vm.uwsLoop(),
        .timer = Timeout.TimerReference.create(
            vm.eventLoop(),
            id,
        ),
    };
@@ -3450,12 +3719,7 @@ pub const Timer = struct {

    map.put(vm.allocator, this.id, timeout) catch unreachable;

    timeout.timer.set(
        id,
        Timeout.run,
        this.interval,
        @as(i32, @intFromBool(this.kind == .setInterval)) * this.interval,
    );
    timeout.timer.?.schedule(this.interval);
    return this_value;
}
return JSValue.jsUndefined();
@@ -3503,12 +3767,89 @@ pub const Timer = struct {
pub const Timeout = struct {
    callback: JSC.Strong = .{},
    globalThis: *JSC.JSGlobalObject,
    timer: *uws.Timer,
    timer: ?*TimerReference = null,
    did_unref_timer: bool = false,
    poll_ref: Async.KeepAlive = Async.KeepAlive.init(),
    arguments: JSC.Strong = .{},
    has_scheduled_job: bool = false,

    pub const TimerReference = struct {
        id: ID = .{ .id = 0 },
        cancelled: bool = false,

        event_loop: *JSC.EventLoop,
        timer: bun.io.Timer = .{
            .tag = .TimerReference,
            .next = std.mem.zeroes(std.os.timespec),
        },
        request: bun.io.Request = .{
            .callback = &onRequest,
        },
        interval: i32 = -1,
        concurrent_task: JSC.ConcurrentTask = undefined,
        next: ?*TimerReference = null,

        pub const Pool = bun.HiveArray(TimerReference, 1024).Fallback;

        fn onRequest(req: *bun.io.Request) bun.io.Action {
            var this: *TimerReference = @fieldParentPtr(TimerReference, "request", req);
            if (this.timer.state == .CANCELLED) {
                this.deinit();
                return bun.io.Action{
                    .timer_cancelled = {},
                };
            }
            return bun.io.Action{
                .timer = &this.timer,
            };
        }

        pub fn callback(this: *TimerReference) bun.io.Timer.Arm {
            _ = this;

            // TODO:
            return .{ .disarm = {} };
        }

        pub fn runFromJSThread(this: *TimerReference) void {
            const timer_id = this.id;
            const vm = this.event_loop.virtual_machine;

            if (this.cancelled) {
                this.deinit();
                return;
            }
            if (Timeout.runFromConcurrentTask(timer_id, vm) and !this.cancelled) {
                this.request = .{
                    .callback = &onRequest,
                };
                this.schedule(null);
            } else {
                this.deinit();
            }
        }

        pub fn deinit(this: *TimerReference) void {
            this.event_loop.timerReferencePool().put(this);
        }

        pub fn create(event_loop: *JSC.EventLoop, id: ID) *TimerReference {
            const timer = event_loop.timerReferencePool().get();
            timer.* = .{
                .id = id,
                .event_loop = event_loop,
            };
            return timer;
        }

        pub fn schedule(this: *TimerReference, interval: ?i32) void {
            std.debug.assert(!this.cancelled);
            this.timer.state = .PENDING;
            this.timer.next = msToTimespec(@intCast(@max(interval orelse this.interval, 1)));
            bun.io.Loop.get().schedule(&this.request);
        }
    };
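`TimerReference.deinit` returns the object to a per-event-loop pool (`bun.HiveArray(...).Fallback`) instead of freeing it, so hot setTimeout paths avoid allocator round-trips. A simplified illustration of that recycling pattern (an assumption using `std.heap.MemoryPool`, not bun's HiveArray; reuse of the exact slot is an implementation detail):

```zig
const std = @import("std");

test "pooled timer objects are recycled, not freed" {
    var pool = std.heap.MemoryPool(u64).init(std.testing.allocator);
    defer pool.deinit();

    const first = try pool.create();
    pool.destroy(first); // like TimerReference.deinit(): back into the pool
    const second = try pool.create(); // hot path: no allocator round-trip
    try std.testing.expect(first == second); // MemoryPool pops the freed slot first
}
```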
    pub const Kind = enum(u32) {
        setTimeout,
        setInterval,
@@ -3535,8 +3876,99 @@ pub const Timer = struct {

        // use the threadlocal despite being slow on macOS
        // to handle the timeout being cancelled after already enqueued
        var vm = JSC.VirtualMachine.get();
        const vm = JSC.VirtualMachine.get();

        runWithIDAndVM(timer_id, vm);
    }

    pub fn runFromConcurrentTask(timer_id: ID, vm: *JSC.VirtualMachine) bool {
        const repeats = timer_id.repeats();

        var map = vm.timer.maps.get(timer_id.kind);

        const this_: ?Timeout = map.get(
            timer_id.id,
        ) orelse return false;
        var this = this_ orelse
            return false;

        const globalThis = this.globalThis;

        // Disable thundering herd of setInterval() calls
        // Skip setInterval() calls when the previous one has not been run yet.
        if (repeats and this.has_scheduled_job) {
            return false;
        }

        const cb: CallbackJob = .{
            .callback = if (repeats)
                JSC.Strong.create(
                    this.callback.get() orelse {
                        // if the callback was freed, that's an error
                        if (comptime Environment.allow_assert)
                            unreachable;

                        this.deinit();
                        _ = map.swapRemove(timer_id.id);
                        return false;
                    },
                    globalThis,
                )
            else
                this.callback,
            .arguments = if (repeats and this.arguments.has())
                JSC.Strong.create(
                    this.arguments.get() orelse {
                        // if the arguments were freed, that's an error
                        if (comptime Environment.allow_assert)
                            unreachable;

                        this.deinit();
                        _ = map.swapRemove(timer_id.id);
                        return false;
                    },
                    globalThis,
                )
            else
                this.arguments,
            .globalThis = globalThis,
            .id = timer_id.id,
            .kind = timer_id.kind,
        };

        var reschedule = false;
        // This allows us to:
        //  - free the memory before the job is run
        //  - reuse the JSC.Strong
        if (!repeats) {
            this.callback = .{};
            this.arguments = .{};
            map.put(vm.allocator, timer_id.id, null) catch unreachable;
            this.deinit();
        } else {
            this.has_scheduled_job = true;
            map.put(vm.allocator, timer_id.id, this) catch {};
            reschedule = true;
        }

        // TODO: remove this memory allocation!
        var job = vm.allocator.create(CallbackJob) catch @panic(
            "Out of memory while allocating Timeout",
        );
        job.* = cb;
        job.task = CallbackJob.Task.init(job);
        job.ref.ref(vm);

        if (vm.isInspectorEnabled()) {
            Debugger.didScheduleAsyncCall(globalThis, .DOMTimer, timer_id.asyncID(), !repeats);
        }

        job.perform();

        return reschedule;
    }
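The `has_scheduled_job` flag above is the whole "thundering herd" defense: while one setInterval tick is still queued, further ticks are dropped rather than stacking up behind a slow callback. A minimal sketch of that guard (hypothetical `Interval` type, not from the diff):

```zig
const std = @import("std");

const Interval = struct {
    has_scheduled_job: bool = false,

    // Mirrors the `repeats and this.has_scheduled_job` early-return above.
    fn tryScheduleTick(this: *Interval) bool {
        if (this.has_scheduled_job) return false; // previous tick not run yet
        this.has_scheduled_job = true;
        return true;
    }

    fn tickRan(this: *Interval) void {
        this.has_scheduled_job = false;
    }
};

test "a queued tick suppresses further ticks" {
    var interval = Interval{};
    try std.testing.expect(interval.tryScheduleTick());
    try std.testing.expect(!interval.tryScheduleTick()); // dropped, not queued
    interval.tickRan();
    try std.testing.expect(interval.tryScheduleTick());
}
```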
    pub fn runWithIDAndVM(timer_id: ID, vm: *JSC.VirtualMachine) void {
        const repeats = timer_id.repeats();

        var map = vm.timer.maps.get(timer_id.kind);
@@ -3625,7 +4057,9 @@ pub const Timer = struct {

        this.poll_ref.unref(vm);

        this.timer.deinit(false);
        if (this.timer) |timer| {
            timer.cancelled = true;
        }

        if (comptime Environment.isPosix)
            // balance double unreffing in doUnref
@@ -3683,8 +4117,8 @@ pub const Timer = struct {
        var timeout = Timeout{
            .callback = JSC.Strong.create(callback, globalThis),
            .globalThis = globalThis,
            .timer = uws.Timer.create(
                vm.uwsLoop(),
            .timer = Timeout.TimerReference.create(
                vm.eventLoop(),
                Timeout.ID{
                    .id = id,
                    .kind = kind,
@@ -3703,15 +4137,7 @@ pub const Timer = struct {
            Debugger.didScheduleAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = id, .kind = kind }), !repeat);
        }

        timeout.timer.set(
            Timeout.ID{
                .id = id,
                .kind = kind,
            },
            Timeout.run,
            interval,
            @as(i32, @intFromBool(kind == .setInterval)) * interval,
        );
        timeout.timer.?.schedule(interval);
    }
    pub fn setImmediate(
@@ -3726,11 +4152,11 @@ pub const Timer = struct {
        const interval: i32 = 0;

        const wrappedCallback = callback.withAsyncContextIfNeeded(globalThis);
        var timer_object: *TimerObject = undefined;
        const timer_js = TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments, &timer_object);
        timer_js.ensureStillAlive();

        Timer.set(id, globalThis, wrappedCallback, interval, arguments, false) catch
            return JSValue.jsUndefined();

        return TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments);
        return timer_js;
    }

    comptime {
@@ -3746,8 +4172,9 @@ pub const Timer = struct {
        arguments: JSValue,
    ) callconv(.C) JSValue {
        JSC.markBinding(@src());
        const id = globalThis.bunVM().timer.last_id;
        globalThis.bunVM().timer.last_id +%= 1;
        var vm = globalThis.bunVM();
        const id = vm.timer.last_id;
        vm.timer.last_id +%= 1;

        const interval: i32 = @max(
            countdown.coerce(i32, globalThis),
@@ -3757,10 +4184,12 @@ pub const Timer = struct {

        const wrappedCallback = callback.withAsyncContextIfNeeded(globalThis);

        Timer.set(id, globalThis, wrappedCallback, interval, arguments, false) catch
            return JSValue.jsUndefined();

        return TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments);
        var timer_object: *TimerObject = undefined;
        const timer_js = TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments, &timer_object);
        timer_js.ensureStillAlive();
        vm.timer_heap.insert(vm, globalThis, @intCast(id), timer_js, timer_object);
        timer_object.keep_alive.ref(vm);
        return timer_js;
    }
    pub fn setInterval(
        globalThis: *JSGlobalObject,
@@ -3769,8 +4198,9 @@ pub const Timer = struct {
        arguments: JSValue,
    ) callconv(.C) JSValue {
        JSC.markBinding(@src());
        const id = globalThis.bunVM().timer.last_id;
        globalThis.bunVM().timer.last_id +%= 1;
        const vm = globalThis.bunVM();
        const id = vm.timer.last_id;
        vm.timer.last_id +%= 1;

        const wrappedCallback = callback.withAsyncContextIfNeeded(globalThis);

@@ -3780,10 +4210,13 @@ pub const Timer = struct {
            countdown.coerce(i32, globalThis),
            1,
        );
        Timer.set(id, globalThis, wrappedCallback, interval, arguments, true) catch
            return JSValue.jsUndefined();

        return TimerObject.init(globalThis, id, .setInterval, interval, wrappedCallback, arguments);
        var timer_object: *TimerObject = undefined;
        const timer_js = TimerObject.init(globalThis, id, .setTimeout, interval, wrappedCallback, arguments, &timer_object);
        timer_js.ensureStillAlive();
        vm.timer_heap.insert(vm, globalThis, @intCast(id), timer_js, timer_object);
        timer_object.keep_alive.ref(vm);
        return timer_js;
    }

    pub fn clearTimer(timer_id_value: JSValue, globalThis: *JSGlobalObject, repeats: bool) void {
@@ -272,8 +272,7 @@ JSC_DEFINE_HOST_FUNCTION(functionBunSleepThenCallback,
{
    JSC::VM& vm = globalObject->vm();

    RELEASE_ASSERT(callFrame->argumentCount() == 1);
    JSPromise* promise = jsCast<JSC::JSPromise*>(callFrame->argument(0));
    JSPromise* promise = jsDynamicCast<JSC::JSPromise*>(callFrame->argument(0));
    RELEASE_ASSERT(promise);

    promise->resolve(globalObject, JSC::jsUndefined());

@@ -2417,8 +2417,8 @@ CPP_DECL void JSC__JSValue__putIndex(JSC__JSValue JSValue0, JSC__JSGlobalObject*
{
    JSC::JSValue value = JSC::JSValue::decode(JSValue0);
    JSC::JSValue value2 = JSC::JSValue::decode(JSValue3);
    JSC::JSArray* array = JSC::jsCast<JSC::JSArray*>(value);
    array->putDirectIndex(arg1, arg2, value2);
    JSC::JSObject* object = value.getObject();
    object->putDirectIndex(arg1, arg2, value2);
}

CPP_DECL void JSC__JSValue__push(JSC__JSValue JSValue0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue3)
@@ -339,6 +339,8 @@ const Lchmod = JSC.Node.Async.lchmod;
const Lchown = JSC.Node.Async.lchown;
const Unlink = JSC.Node.Async.unlink;
const WaitPidResultTask = JSC.Subprocess.WaiterThread.WaitPidResultTask;
const TimerReference = JSC.BunTimer.Timeout.TimerReference;
const TimerMap = JSC.BunTimer.TimerObject.TimerMap;
// Task.get(ReadFileTask) -> ?ReadFileTask
pub const Task = TaggedPointerUnion(.{
    FetchTasklet,
@@ -398,6 +400,8 @@ pub const Task = TaggedPointerUnion(.{
    Lchown,
    Unlink,
    WaitPidResultTask,
    TimerReference,
    TimerMap,
});
const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
pub const ConcurrentTask = struct {
@@ -623,6 +627,22 @@ pub const EventLoop = struct {
    forever_timer: ?*uws.Timer = null,
    deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{},
    uws_loop: if (Environment.isWindows) *uws.Loop else void = undefined,

    timer_reference_pool: if (Environment.isPosix) ?*bun.JSC.BunTimer.Timeout.TimerReference.Pool else void = if (Environment.isPosix) null else undefined,

    pub fn timerReferencePool(this: *EventLoop) *bun.JSC.BunTimer.Timeout.TimerReference.Pool {
        if (comptime !Environment.isPosix) {
            @compileError("This function is only available on POSIX platforms");
        }

        return this.timer_reference_pool orelse brk: {
            const _pool = bun.default_allocator.create(bun.JSC.BunTimer.Timeout.TimerReference.Pool) catch bun.outOfMemory();
            _pool.* = bun.JSC.BunTimer.Timeout.TimerReference.Pool.init(bun.default_allocator);
            this.timer_reference_pool = _pool;
            break :brk _pool;
        };
    }

    pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
    const log = bun.Output.scoped(.EventLoop, false);

@@ -919,6 +939,20 @@ pub const EventLoop = struct {
        var any: *WaitPidResultTask = task.get(WaitPidResultTask).?;
        any.runFromJSThread();
    },
    @field(Task.Tag, typeBaseName(@typeName(TimerReference))) => {
        var any: *TimerReference = task.get(TimerReference).?;
        var next_ = any.next;
        any.runFromJSThread();
        while (next_) |next| {
            next_ = next.next;
            next.runFromJSThread();
        }
    },
    @field(Task.Tag, typeBaseName(@typeName(TimerMap))) => {
        var any: *TimerMap = task.get(TimerMap).?;
        any.runFromJSThread();
    },
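In the `TimerReference` case above, timers that fired in the same poll are linked through their `next` pointers, so dequeuing one task runs the whole chain on the JS thread. A self-contained sketch of that walk (hypothetical `Node` type; the loop shape matches the diff):

```zig
const std = @import("std");

const Node = struct {
    ran: bool = false,
    next: ?*Node = null,

    fn runFromJSThread(this: *Node) void {
        this.ran = true;
    }
};

test "one dequeued task runs the whole linked batch" {
    var c = Node{};
    var b = Node{ .next = &c };
    var a = Node{ .next = &b };

    // Same traversal as the TimerReference case: capture `next` first,
    // then run, so running a node can safely reschedule or recycle it.
    var next_: ?*Node = a.next;
    a.runFromJSThread();
    while (next_) |next| {
        next_ = next.next;
        next.runFromJSThread();
    }

    try std.testing.expect(a.ran and b.ran and c.ran);
}
```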
    else => if (Environment.allow_assert) {
        bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())});
    } else {
@@ -1301,6 +1335,18 @@ pub const EventLoop = struct {
        this.concurrent_tasks.push(task);
        this.wakeup();
    }

    pub fn enqueueTaskConcurrentBatch(this: *EventLoop, batch: ConcurrentTask.Queue.Batch) void {
        JSC.markBinding(@src());
        if (comptime Environment.allow_assert) {
            if (this.virtual_machine.has_terminated) {
                @panic("EventLoop.enqueueTaskConcurrent: VM has terminated");
            }
        }

        this.concurrent_tasks.pushBatch(batch.front.?, batch.last.?, batch.count);
        this.wakeup();
    }
};

pub const MiniEventLoop = struct {

@@ -498,6 +498,8 @@ pub const VirtualMachine = struct {
    hot_reload: bun.CLI.Command.HotReload = .none,
    jsc: *JSC.VM = undefined,

    timer_heap: bun.JSC.BunTimer.TimerObject.TimerMap = .{},

    /// hide bun:wrap from stack traces
    /// bun:wrap is very noisy
    hide_bun_stackframes: bool = true,

src/io/io.zig
@@ -3,7 +3,7 @@ const std = @import("std");
const sys = bun.sys;
const linux = std.os.linux;
const Environment = bun.Environment;
const heap = @import("./heap.zig");
pub const heap = @import("./heap.zig");
const JSC = bun.JSC;

const log = bun.Output.scoped(.loop, false);
@@ -92,6 +92,8 @@ pub const Loop = struct {
        @compileError("Epoll is Linux-Only");
    }

    this.updateNow();

    while (true) {

        // Process pending requests
@@ -127,22 +129,12 @@ pub const Loop = struct {
            this.active -= 1;
            close.onDone(close.ctx);
        },
        .timer_cancelled => {},
        .timer => |timer| {
            while (true) {
                switch (timer.state) {
                    .PENDING => {
                        timer.state = .ACTIVE;
                        if (Timer.less({}, timer, &.{ .next = this.cached_now })) {
                            if (timer.fire() == .rearm) {
                                if (timer.reset) |reset| {
                                    timer.next = reset;
                                    timer.reset = null;
                                    continue;
                                }
                            }

                            break;
                        }
                        this.timers.insert(timer);
                    },
                    .ACTIVE => {
@@ -174,7 +166,9 @@ pub const Loop = struct {
            @as(u64, @intCast(this.cached_now.tv_nsec)) / std.time.ns_per_ms;
        const ms_next = @as(u64, @intCast(t.next.tv_sec)) * std.time.ms_per_s +
            @as(u64, @intCast(t.next.tv_nsec)) / std.time.ns_per_ms;
        break :timeout @as(i32, @intCast(ms_next -| ms_now));
        const out = @as(i32, @intCast(ms_next -| ms_now));

        break :timeout @max(out, 0);
    };
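The `@max(out, 0)` added above clamps the epoll wait timeout: if the next timer is already past due, the loop should poll with a timeout of 0 (check and return), never a negative value, which epoll treats as "block indefinitely". A minimal sketch of the clamped computation (hypothetical `pollTimeoutMs` helper, mirroring the diff):

```zig
const std = @import("std");

fn pollTimeoutMs(now_ms: u64, next_ms: u64) i32 {
    const out: i32 = @intCast(next_ms -| now_ms); // saturating sub: no underflow
    return @max(out, 0); // belt-and-braces, as in the diff
}

test "an already-expired timer polls with a zero timeout" {
    try std.testing.expectEqual(@as(i32, 0), pollTimeoutMs(500, 200));
    try std.testing.expectEqual(@as(i32, 300), pollTimeoutMs(200, 500));
}
```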
    var events: [256]EventType = undefined;
@@ -230,6 +224,8 @@ pub const Loop = struct {
        @compileError("Kqueue is MacOS-Only");
    }

    this.updateNow();

    while (true) {
        var stack_fallback = std.heap.stackFallback(@sizeOf([256]EventType), bun.default_allocator);
        var events_list: std.ArrayList(EventType) = std.ArrayList(EventType).initCapacity(stack_fallback.get(), 256) catch unreachable;
@@ -270,6 +266,7 @@ pub const Loop = struct {
            &events_list.items.ptr[i],
        );
    },

    .close => |close| {
        if (close.poll.flags.contains(.poll_readable) or close.poll.flags.contains(.poll_writable)) {
            const i = events_list.items.len;
@@ -285,22 +282,12 @@ pub const Loop = struct {
        }
        close.onDone(close.ctx);
    },
    .timer_cancelled => {},
    .timer => |timer| {
        while (true) {
            switch (timer.state) {
                .PENDING => {
                    timer.state = .ACTIVE;
                    if (Timer.less({}, timer, &.{ .next = this.cached_now })) {
                        if (timer.fire() == .rearm) {
                            if (timer.reset) |reset| {
                                timer.next = reset;
                                timer.reset = null;
                                continue;
                            }
                        }

                        break;
                    }
                    this.timers.insert(timer);
                },
                .ACTIVE => {
@@ -331,6 +318,15 @@ pub const Loop = struct {
        out.tv_sec = t.next.tv_sec -| this.cached_now.tv_sec;
        out.tv_nsec = t.next.tv_nsec -| this.cached_now.tv_nsec;

        if (out.tv_nsec < 0) {
            out.tv_sec -= 1;
            out.tv_nsec += std.time.ns_per_s;
        }

        if (out.tv_sec < 0) {
            break :timeout null;
        }

        break :timeout out;
    };
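The normalization added above handles the classic timespec subtraction pitfall: subtracting field by field can leave `tv_nsec` negative, so a second is borrowed from `tv_sec`. A standalone sketch of that borrow step (hypothetical `Spec`/`diff` names, same arithmetic as the diff):

```zig
const std = @import("std");

const Spec = struct { sec: i64, nsec: i64 };

fn diff(a: Spec, b: Spec) Spec {
    var out = Spec{ .sec = a.sec - b.sec, .nsec = a.nsec - b.nsec };
    if (out.nsec < 0) {
        out.sec -= 1; // borrow one second...
        out.nsec += std.time.ns_per_s; // ...and pay it back in nanoseconds
    }
    return out;
}

test "borrow normalizes negative nanoseconds" {
    const out = diff(.{ .sec = 2, .nsec = 100 }, .{ .sec = 1, .nsec = 200 });
    try std.testing.expectEqual(@as(i64, 0), out.sec);
    try std.testing.expectEqual(@as(i64, std.time.ns_per_s - 100), out.nsec);
}
```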
@@ -367,6 +363,9 @@ pub const Loop = struct {
    fn drainExpiredTimers(this: *Loop) void {
        const now = Timer{ .next = this.cached_now };

        var current_batch = JSC.ConcurrentTask.Queue.Batch{};
        var prev_event_loop: ?*JSC.EventLoop = null;

        // Run our expired timers
        while (this.timers.peek()) |t| {
            if (!Timer.less({}, t, &now)) break;
@@ -377,7 +376,10 @@ pub const Loop = struct {
            // Mark completion as done
            t.state = .FIRED;

            switch (t.fire()) {
            switch (t.fire(
                &current_batch,
                &prev_event_loop,
            )) {
                .disarm => {},
                .rearm => |new| {
                    t.next = new;
@@ -387,14 +389,22 @@ pub const Loop = struct {
                },
            }
        }

        if (prev_event_loop) |event_loop| {
            event_loop.enqueueTaskConcurrent(current_batch.front.?);
        }
    }

    fn updateNow(this: *Loop) void {
        updateTimespec(&this.cached_now);
    }

    pub fn updateTimespec(timespec: *os.timespec) void {
        if (comptime Environment.isLinux) {
            const rc = linux.clock_gettime(linux.CLOCK.MONOTONIC, &this.cached_now);
            const rc = linux.clock_gettime(linux.CLOCK.MONOTONIC, timespec);
            assert(rc == 0);
        } else if (comptime Environment.isMac) {
            std.os.clock_gettime(std.os.CLOCK.MONOTONIC, &this.cached_now) catch {};
            std.os.clock_gettime(std.os.CLOCK.MONOTONIC, timespec) catch {};
        } else {
            @compileError("TODO: implement poll for this platform");
        }
@@ -416,6 +426,7 @@ pub const Action = union(enum) {
    writable: FileAction,
    close: CloseAction,
    timer: *Timer,
    timer_cancelled: void,

    pub const FileAction = struct {
        fd: bun.FileDescriptor,
@@ -482,6 +493,9 @@ const Pollable = struct {
    }
};

const TimerReference = bun.JSC.BunTimer.Timeout.TimerReference;
const ScheduledTimer = bun.JSC.BunTimer.TimerObject.TimerMap.ScheduledTimer;
pub const Timer = struct {
    /// The absolute time to fire this timer next.
    next: os.timespec,
@@ -501,10 +515,14 @@ pub const Timer = struct {

    pub const Tag = enum {
        TimerCallback,
        TimerReference,
        ScheduledTimer,

        pub fn Type(comptime T: Tag) type {
            return switch (T) {
                .TimerCallback => TimerCallback,
                .TimerReference => TimerReference,
                .ScheduledTimer => ScheduledTimer,
            };
        }
    };
@@ -556,10 +574,51 @@ pub const Timer = struct {
        disarm,
    };

    pub fn fire(this: *Timer) Arm {
    pub fn fire(this: *Timer, batch: *JSC.ConcurrentTask.Queue.Batch, event_loop: *?*JSC.EventLoop) Arm {
        if (comptime Environment.allow_assert) {
            if (comptime Environment.isPosix) {
                const timer = std.time.Instant{ .timestamp = this.next };
                var now = std.time.Instant{ .timestamp = undefined };
                Loop.updateTimespec(&now.timestamp);

                if (timer.order(now) != .lt) {
                    bun.Output.panic("Timer fired {} too early", .{bun.fmt.fmtDuration(timer.since(now))});
                }
            }
        }

        switch (this.tag) {
            inline else => |t| {
                var container: *t.Type() = @fieldParentPtr(t.Type(), "timer", this);
                if (comptime t.Type() == ScheduledTimer) {
                    return container.callback();
                } else if (comptime @hasDecl(t.Type(), "callback")) {
                    const concurrent_task = container.concurrent_task.from(container, .manual_deinit);
                    if (event_loop.*) |loop| {
                        // If they are different event loops, we have to drain the batch right here.
                        if (loop != container.event_loop) {
                            loop.enqueueTaskConcurrent(batch.front.?);
                            event_loop.* = container.event_loop;
                        }

                        if (batch.front == null) {
                            batch.front = concurrent_task;
                        }
                    } else {
                        batch.front = concurrent_task;
                        event_loop.* = container.event_loop;
                    }

                    if (batch.last) |last_task| {
                        var last = @fieldParentPtr(t.Type(), "concurrent_task", last_task);
                        last.next = container;
                    }

                    batch.last = concurrent_task;
                    batch.count += 1;
                    return container.callback();
                }

                return container.callback(container);
            },
        }
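The batching policy in `fire()` above: tasks destined for the same event loop accumulate into one batch, and when a timer belongs to a different loop, the batch drains to the previous loop before a new one starts. A simplified model of that grouping (hypothetical `flushSizes` helper on plain integers, not the diff's types):

```zig
const std = @import("std");

// Group a stream of loop-owner ids into per-loop batches, recording each
// flush size, like drainExpiredTimers + fire() do with ConcurrentTask batches.
fn flushSizes(owners: []const u8, out: *std.ArrayList(usize)) !void {
    var batch_len: usize = 0;
    var current: ?u8 = null;
    for (owners) |owner| {
        if (current != null and current.? != owner) {
            try out.append(batch_len); // drain to the previous loop first
            batch_len = 0;
        }
        current = owner;
        batch_len += 1;
    }
    if (batch_len > 0) try out.append(batch_len); // final drain after the walk
}

test "timers for the same loop share one batch" {
    var flushes = std.ArrayList(usize).init(std.testing.allocator);
    defer flushes.deinit();

    try flushSizes(&.{ 1, 1, 2, 2, 2 }, &flushes);
    try std.testing.expectEqualSlices(usize, &.{ 2, 3 }, flushes.items);
}
```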