Compare commits

...

17 Commits

Author SHA1 Message Date
Jarred Sumner
2d78d45ed8 Make AbortSignal.timeout(0) slower 2024-12-05 21:55:45 -08:00
Jarred Sumner
b224ebadb8 Revert "[build images]"
This reverts commit fc6eb9959b.
2024-12-05 21:41:03 -08:00
Jarred Sumner
fc6eb9959b [build images] 2024-12-05 21:40:29 -08:00
Jarred Sumner
6553052a32 Update event_loop.zig 2024-12-05 20:44:26 -08:00
Jarred Sumner
cd4a7caf37 Merge branch 'main' into jarred/change-timers 2024-12-05 20:43:20 -08:00
Jarred Sumner
318cefb211 Implement tickWithTimeout on Windows 2024-12-05 19:08:34 -08:00
Jarred Sumner
c69912e57a Update event_loop.zig 2024-12-05 18:45:49 -08:00
Jarred Sumner
0d06f2d32b Merge branch 'main' into jarred/change-timers 2024-12-05 18:43:01 -08:00
Jarred Sumner
3cbd73176f Revert "maybe fix windows"
This reverts commit 35d16bea36.
2024-12-05 18:40:35 -08:00
Dylan Conway
35d16bea36 maybe fix windows 2024-11-25 15:11:54 -08:00
Jarred Sumner
5e191eaf3a Add test that timed out in v1.1.36 2024-11-24 05:43:09 -08:00
Jarred Sumner
a91f59b9a2 Register for AbortSignal before the DNS completes 2024-11-24 05:29:14 -08:00
Jarred Sumner
6e7362c241 Delete uws.Timer 2024-11-24 05:16:16 -08:00
Jarred Sumner
b5bed9ec05 Make this test better 2024-11-24 04:30:18 -08:00
Jarred Sumner
8869ddafba Enable this test 2024-11-24 04:03:58 -08:00
Jarred Sumner
f8e87e34d6 Update http-hello.js 2024-11-24 02:51:04 -08:00
Jarred Sumner
712c5bc80e Make AbortSignal.timeout() 10x faster 2024-11-24 02:40:16 -08:00
14 changed files with 684 additions and 535 deletions

View File

@@ -1,7 +1,8 @@
var i = 0;
export default {
port: 3002,
fetch(req) {
if (i++ === 200_000 - 1) queueMicrotask(() => process.exit(0));
if (i++ === 200_000 - 1) setTimeout(() => process.exit(0), 0);
return new Response("Hello, World!" + i);
},
};

View File

@@ -32,6 +32,7 @@ struct us_loop_t {
uv_prepare_t *uv_pre;
uv_check_t *uv_check;
uv_timer_t *uv_tick_with_timeout_timer;
};
// it is no longer valid to cast a pointer to us_poll_t to a pointer of

View File

@@ -130,33 +130,13 @@ pub const All = struct {
}
pub fn getTimeout(this: *const All, spec: *timespec) bool {
if (this.active_timer_count == 0) {
return false;
}
if (this.timers.peek()) |min| {
const now = timespec.now();
switch (now.order(&min.next)) {
.gt, .eq => {
spec.* = .{ .nsec = 0, .sec = 0 };
return true;
},
.lt => {
spec.* = min.next.duration(&now);
return true;
},
}
}
return false;
}
export fn Bun__internal_drainTimers(vm: *VirtualMachine) callconv(.C) void {
drainTimers(&vm.timer, vm);
}
comptime {
_ = &Bun__internal_drainTimers;
const min = this.timers.peek() orelse return false;
const now = timespec.now();
spec.* = switch (now.order(&min.next)) {
.gt, .eq => .{ .nsec = 0, .sec = 0 },
.lt => min.next.duration(&now),
};
return true;
}
pub fn drainTimers(this: *All, vm: *VirtualMachine) void {
@@ -715,7 +695,7 @@ const heap = bun.io.heap;
pub const EventLoopTimer = struct {
/// The absolute time to fire this timer next.
next: timespec,
next: timespec = .{},
/// Internal heap fields.
heap: heap.IntrusiveField(EventLoopTimer) = .{},
@@ -724,6 +704,44 @@ pub const EventLoopTimer = struct {
tag: Tag = .TimerCallback,
pub fn cancel(this: *@This(), vm: *JSC.VirtualMachine) void {
if (this.state == .ACTIVE) {
vm.timer.remove(this);
}
}
pub fn set(this: *@This(), vm: *JSC.VirtualMachine, interval: i32) void {
// if the timer is active we need to remove it
if (this.state == .ACTIVE) {
vm.timer.remove(this);
}
// if the interval is 0 means that we stop the timer
if (interval == 0) {
return;
}
// reschedule the timer
this.next = bun.timespec.msFromNow(interval);
vm.timer.insert(this);
}
pub fn setWithTimespec(this: *@This(), vm: *JSC.VirtualMachine, next: *const timespec, interval: i32) void {
// if the timer is active we need to remove it
if (this.state == .ACTIVE) {
vm.timer.remove(this);
}
// if the interval is 0 means that we stop the timer
if (interval == 0) {
return;
}
// reschedule the timer
this.next = next.addMs(interval);
vm.timer.insert(this);
}
pub const Tag = if (Environment.isWindows) enum {
TimerCallback,
TimerObject,
@@ -731,7 +749,9 @@ pub const EventLoopTimer = struct {
StatWatcherScheduler,
UpgradedDuplex,
WindowsNamedPipe,
GCTimer,
GCRepeatingTimer,
TimeoutTask,
pub fn Type(comptime T: Tag) type {
return switch (T) {
.TimerCallback => TimerCallback,
@@ -740,6 +760,9 @@ pub const EventLoopTimer = struct {
.StatWatcherScheduler => StatWatcherScheduler,
.UpgradedDuplex => uws.UpgradedDuplex,
.WindowsNamedPipe => uws.WindowsNamedPipe,
.GCTimer => JSC.GarbageCollectionController,
.GCRepeatingTimer => JSC.GarbageCollectionController,
.TimeoutTask => JSC.EventLoop.TimeoutTask,
};
}
} else enum {
@@ -748,7 +771,9 @@ pub const EventLoopTimer = struct {
TestRunner,
StatWatcherScheduler,
UpgradedDuplex,
GCTimer,
GCRepeatingTimer,
TimeoutTask,
pub fn Type(comptime T: Tag) type {
return switch (T) {
.TimerCallback => TimerCallback,
@@ -756,6 +781,9 @@ pub const EventLoopTimer = struct {
.TestRunner => JSC.Jest.TestRunner,
.StatWatcherScheduler => StatWatcherScheduler,
.UpgradedDuplex => uws.UpgradedDuplex,
.GCTimer => JSC.GarbageCollectionController,
.GCRepeatingTimer => JSC.GarbageCollectionController,
.TimeoutTask => JSC.EventLoop.TimeoutTask,
};
}
};
@@ -808,8 +836,33 @@ pub const EventLoopTimer = struct {
pub fn fire(this: *EventLoopTimer, now: *const timespec, vm: *VirtualMachine) Arm {
switch (this.tag) {
.GCTimer => {
const gc_controller: *JSC.GarbageCollectionController = @fieldParentPtr("gc_timer", this);
this.state = .FIRED;
gc_controller.onGCTimer();
return .disarm;
},
.GCRepeatingTimer => {
const gc_controller: *JSC.GarbageCollectionController = @fieldParentPtr("gc_repeating_timer", this);
this.state = .FIRED;
gc_controller.onGCRepeatingTimer(now);
if (this.state == .FIRED) {
// It's repeating, so let's repeat it.
this.next = now.addMs(gc_controller.repeatingTimeInterval());
vm.timer.insert(this);
}
return .disarm;
},
.TimeoutTask => {
const timeout_task: *JSC.EventLoop.TimeoutTask = @fieldParentPtr("event_loop_timer", this);
this.state = .FIRED;
timeout_task.schedule(vm);
return .disarm;
},
inline else => |t| {
var container: *t.Type() = @alignCast(@fieldParentPtr("event_loop_timer", this));
if (comptime t.Type() == TimerObject) {
return container.fire(now, vm);
}

View File

@@ -72,12 +72,8 @@ Ref<AbortSignal> AbortSignal::timeout(ScriptExecutionContext& context, uint64_t
signal->setHasActiveTimeoutTimer(false);
};
if (milliseconds == 0) {
// immediately write to task queue
context.postTask(WTFMove(action));
} else {
context.postTaskOnTimeout(WTFMove(action), Seconds::fromMilliseconds(milliseconds));
}
// Act like setTimeout(0) and always tick the event loop.
context.postTaskOnTimeout(WTFMove(action), Seconds::fromMilliseconds(milliseconds == 0 ? 1 : milliseconds));
return signal;
}
@@ -102,7 +98,7 @@ Ref<AbortSignal> AbortSignal::any(ScriptExecutionContext& context, const Vector<
AbortSignal::AbortSignal(ScriptExecutionContext* context, Aborted aborted, JSC::JSValue reason)
: ContextDestructionObserver(context)
, m_reason(reason)
, m_aborted(aborted == Aborted::Yes)
, m_flags(aborted == Aborted::Yes ? static_cast<uint8_t>(Flags::Aborted) : static_cast<uint8_t>(Flags::None))
{
ASSERT(reason);
}
@@ -145,11 +141,11 @@ void AbortSignal::addDependentSignal(AbortSignal& signal)
void AbortSignal::signalAbort(JSC::JSValue reason)
{
// 1. If signal's aborted flag is set, then return.
if (m_aborted)
if (aborted())
return;
// 2. Set signals aborted flag.
m_aborted = true;
setFlag(Flags::Aborted);
m_sourceSignals.clear();
// FIXME: This code is wrong: we should emit a write-barrier. Otherwise, GC can collect it.
@@ -178,7 +174,7 @@ void AbortSignal::signalAbort(JSC::JSValue reason)
void AbortSignal::signalAbort(JSC::JSGlobalObject* globalObject, CommonAbortReason reason)
{
// 1. If signal's aborted flag is set, then return.
if (m_aborted)
if (aborted())
return;
m_commonReason = reason;
@@ -218,7 +214,10 @@ void AbortSignal::signalFollow(AbortSignal& signal)
void AbortSignal::eventListenersDidChange()
{
m_hasAbortEventListener = hasEventListeners(eventNames().abortEvent);
if (hasEventListeners(eventNames().abortEvent))
setFlag(Flags::HasAbortEventListener);
else
clearFlag(Flags::HasAbortEventListener);
}
uint32_t AbortSignal::addAbortAlgorithmToSignal(AbortSignal& signal, Ref<AbortAlgorithm>&& algorithm)

View File

@@ -76,7 +76,7 @@ public:
void signalAbort(JSC::JSValue reason);
void signalFollow(AbortSignal&);
bool aborted() const { return m_aborted; }
bool aborted() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::Aborted); }
const JSValueInWrappedObject& reason() const { return m_reason; }
JSValue jsReason(JSC::JSGlobalObject& globalObject);
CommonAbortReason commonReason() const { return m_commonReason; }
@@ -84,8 +84,8 @@ public:
void cleanNativeBindings(void* ref);
void addNativeCallback(NativeCallbackTuple callback) { m_native_callbacks.append(callback); }
bool hasActiveTimeoutTimer() const { return m_hasActiveTimeoutTimer; }
bool hasAbortEventListener() const { return m_hasAbortEventListener; }
bool hasActiveTimeoutTimer() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::HasActiveTimeoutTimer); }
bool hasAbortEventListener() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::HasAbortEventListener); }
using RefCounted::deref;
using RefCounted::ref;
@@ -106,7 +106,7 @@ public:
void incrementPendingActivityCount() { ++pendingActivityCount; }
void decrementPendingActivityCount() { --pendingActivityCount; }
bool hasPendingActivity() const { return pendingActivityCount > 0; }
bool isDependent() const { return m_isDependent; }
bool isDependent() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::IsDependent); }
private:
enum class Aborted : bool {
@@ -114,10 +114,24 @@ private:
Yes
};
explicit AbortSignal(ScriptExecutionContext*, Aborted = Aborted::No, JSC::JSValue reason = JSC::jsUndefined());
enum class Flags : uint8_t {
None = 0,
Aborted = 2,
HasActiveTimeoutTimer = 4,
HasAbortEventListener = 8,
IsDependent = 16,
};
void setFlag(Flags flags)
{
m_flags.fetch_or(static_cast<uint8_t>(flags));
}
void clearFlag(Flags flags)
{
m_flags.fetch_and(~static_cast<uint8_t>(flags));
}
void setHasActiveTimeoutTimer(bool hasActiveTimeoutTimer) { m_hasActiveTimeoutTimer = hasActiveTimeoutTimer; }
void markAsDependent() { m_isDependent = true; }
void setHasActiveTimeoutTimer(bool hasActiveTimeoutTimer) { setFlag(Flags::HasActiveTimeoutTimer); }
void markAsDependent() { setFlag(Flags::IsDependent); }
void addSourceSignal(AbortSignal&);
void addDependentSignal(AbortSignal&);
@@ -137,10 +151,7 @@ private:
Vector<NativeCallbackTuple, 2> m_native_callbacks;
std::atomic<uint32_t> pendingActivityCount { 0 };
uint32_t m_algorithmIdentifier { 0 };
bool m_aborted : 1 = false;
bool m_hasActiveTimeoutTimer : 1 = false;
bool m_hasAbortEventListener : 1 = false;
bool m_isDependent : 1 = false;
std::atomic<uint8_t> m_flags { 0 };
};
WebCoreOpaqueRoot root(AbortSignal*);

View File

@@ -21,6 +21,7 @@ const FetchTasklet = Fetch.FetchTasklet;
const JSValue = JSC.JSValue;
const js = JSC.C;
const Waker = bun.Async.Waker;
const log = bun.Output.scoped(.EventLoop, false);
pub const WorkPool = @import("../work_pool.zig").WorkPool;
pub const WorkPoolTask = @import("../work_pool.zig").Task;
@@ -532,20 +533,23 @@ pub const ConcurrentTask = struct {
// This type must be unique per JavaScript thread
pub const GarbageCollectionController = struct {
gc_timer: *uws.Timer = undefined,
gc_timer: EventLoopTimer = .{
.tag = .GCTimer,
},
gc_last_heap_size: usize = 0,
gc_last_heap_size_on_repeating_timer: usize = 0,
heap_size_didnt_change_for_repeating_timer_ticks_count: u8 = 0,
gc_timer_state: GCTimerState = GCTimerState.pending,
gc_repeating_timer: *uws.Timer = undefined,
gc_repeating_timer: EventLoopTimer = .{
.tag = .GCRepeatingTimer,
},
gc_timer_interval: i32 = 0,
gc_repeating_timer_fast: bool = true,
disabled: bool = false,
pub fn init(this: *GarbageCollectionController, vm: *VirtualMachine) void {
const actual = uws.Loop.get();
this.gc_timer = uws.Timer.createFallthrough(actual, this);
this.gc_repeating_timer = uws.Timer.createFallthrough(actual, this);
actual.internal_loop_data.jsc_vm = vm.jsc;
if (comptime Environment.isDebug) {
@@ -567,21 +571,22 @@ pub const GarbageCollectionController = struct {
this.disabled = vm.bundler.env.has("BUN_GC_TIMER_DISABLE");
if (!this.disabled)
this.gc_repeating_timer.set(this, onGCRepeatingTimer, gc_timer_interval, gc_timer_interval);
this.gc_repeating_timer.set(vm, gc_timer_interval);
}
pub fn scheduleGCTimer(this: *GarbageCollectionController) void {
pub fn scheduleGCTimer(this: *GarbageCollectionController, now: *const bun.timespec) void {
this.gc_timer_state = .scheduled;
this.gc_timer.set(this, onGCTimer, 16, 0);
this.gc_timer.setWithTimespec(this.bunVM(), now, 16);
}
pub fn bunVM(this: *GarbageCollectionController) *VirtualMachine {
return @alignCast(@fieldParentPtr("gc_controller", this));
}
pub fn onGCTimer(timer: *uws.Timer) callconv(.C) void {
var this = timer.as(*GarbageCollectionController);
pub fn onGCTimer(this: *GarbageCollectionController) void {
if (this.disabled) return;
log("onGCTimer()", .{});
this.gc_timer_state = .run_on_next_tick;
}
@@ -596,20 +601,35 @@ pub const GarbageCollectionController = struct {
//
// When the heap size is increasing, we always switch to fast mode
// When the heap size has been the same or less for 30 seconds, we switch to slow mode
pub fn updateGCRepeatTimer(this: *GarbageCollectionController, comptime setting: @Type(.EnumLiteral)) void {
pub fn updateGCRepeatTimer(this: *GarbageCollectionController, cached_timespec: ?*const bun.timespec, comptime setting: @Type(.EnumLiteral)) ?bun.timespec {
if (setting == .fast and !this.gc_repeating_timer_fast) {
this.gc_repeating_timer_fast = true;
this.gc_repeating_timer.set(this, onGCRepeatingTimer, this.gc_timer_interval, this.gc_timer_interval);
const ts = if (cached_timespec != null) cached_timespec.?.* else bun.timespec.now();
this.gc_repeating_timer.setWithTimespec(this.bunVM(), &ts, this.repeatingTimeInterval());
this.heap_size_didnt_change_for_repeating_timer_ticks_count = 0;
return ts;
} else if (setting == .slow and this.gc_repeating_timer_fast) {
this.gc_repeating_timer_fast = false;
this.gc_repeating_timer.set(this, onGCRepeatingTimer, 30_000, 30_000);
const ts = if (cached_timespec != null) cached_timespec.?.* else bun.timespec.now();
this.gc_repeating_timer.setWithTimespec(this.bunVM(), &ts, this.repeatingTimeInterval());
this.heap_size_didnt_change_for_repeating_timer_ticks_count = 0;
return ts;
}
return null;
}
pub fn onGCRepeatingTimer(timer: *uws.Timer) callconv(.C) void {
var this = timer.as(*GarbageCollectionController);
pub fn repeatingTimeInterval(this: *const GarbageCollectionController) i32 {
return if (this.gc_repeating_timer_fast) this.gc_timer_interval else 30_000;
}
pub fn onGCRepeatingTimer(this: *GarbageCollectionController, now: *const bun.timespec) void {
if (comptime Environment.enable_logs)
log("onGCRepeatingTimer(at: {d}ms, heap: {d}) - repeating every {d}ms", .{
now.ms(),
this.gc_last_heap_size_on_repeating_timer,
this.repeatingTimeInterval(),
});
const prev_heap_size = this.gc_last_heap_size_on_repeating_timer;
this.performGC();
this.gc_last_heap_size_on_repeating_timer = this.gc_last_heap_size;
@@ -617,11 +637,11 @@ pub const GarbageCollectionController = struct {
this.heap_size_didnt_change_for_repeating_timer_ticks_count +|= 1;
if (this.heap_size_didnt_change_for_repeating_timer_ticks_count >= 30) {
// make the timer interval longer
this.updateGCRepeatTimer(.slow);
_ = this.updateGCRepeatTimer(now, .slow);
}
} else {
this.heap_size_didnt_change_for_repeating_timer_ticks_count = 0;
this.updateGCRepeatTimer(.fast);
_ = this.updateGCRepeatTimer(now, .fast);
}
}
@@ -638,8 +658,9 @@ pub const GarbageCollectionController = struct {
.run_on_next_tick => {
// When memory usage is not stable, run the GC more.
if (this_heap_size != prev) {
this.scheduleGCTimer();
this.updateGCRepeatTimer(.fast);
const now = bun.timespec.now();
this.scheduleGCTimer(&now);
_ = this.updateGCRepeatTimer(&now, .fast);
} else {
this.gc_timer_state = .pending;
}
@@ -648,18 +669,18 @@ pub const GarbageCollectionController = struct {
},
.pending => {
if (this_heap_size != prev) {
this.updateGCRepeatTimer(.fast);
const ts = this.updateGCRepeatTimer(null, .fast);
if (this_heap_size > prev * 2) {
this.performGC();
} else {
this.scheduleGCTimer();
this.scheduleGCTimer(&(ts orelse bun.timespec.now()));
}
}
},
.scheduled => {
if (this_heap_size > prev * 2) {
this.updateGCRepeatTimer(.fast);
_ = this.updateGCRepeatTimer(null, .fast);
this.performGC();
}
},
@@ -774,7 +795,6 @@ pub const EventLoop = struct {
global: *JSGlobalObject = undefined,
virtual_machine: *JSC.VirtualMachine = undefined,
waker: ?Waker = null,
forever_timer: ?*uws.Timer = null,
deferred_tasks: DeferredTaskQueue = .{},
uws_loop: if (Environment.isWindows) ?*uws.Loop else void = if (Environment.isWindows) null else {},
@@ -838,7 +858,6 @@ pub const EventLoop = struct {
}
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
const log = bun.Output.scoped(.EventLoop, false);
pub fn tickWhilePaused(this: *EventLoop, done: *bool) void {
while (!done.*) {
@@ -891,6 +910,343 @@ pub const EventLoop = struct {
return result;
}
pub fn runTask(this: *EventLoop, task: Task, global: *JSC.JSGlobalObject, jsc_vm: *JSC.VM, virtual_machine: *JSC.VirtualMachine) bool {
switch (task.tag()) {
@field(Task.Tag, typeBaseName(@typeName(ShellAsync))) => {
var shell_ls_task: *ShellAsync = task.get(ShellAsync).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellAsyncSubprocessDone))) => {
var shell_ls_task: *ShellAsyncSubprocessDone = task.get(ShellAsyncSubprocessDone).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellIOWriterAsyncDeinit))) => {
var shell_ls_task: *ShellIOWriterAsyncDeinit = task.get(ShellIOWriterAsyncDeinit).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellIOReaderAsyncDeinit))) => {
var shell_ls_task: *ShellIOReaderAsyncDeinit = task.get(ShellIOReaderAsyncDeinit).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellCondExprStatTask))) => {
var shell_ls_task: *ShellCondExprStatTask = task.get(ShellCondExprStatTask).?;
shell_ls_task.task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellCpTask))) => {
var shell_ls_task: *ShellCpTask = task.get(ShellCpTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellTouchTask))) => {
var shell_ls_task: *ShellTouchTask = task.get(ShellTouchTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellMkdirTask))) => {
var shell_ls_task: *ShellMkdirTask = task.get(ShellMkdirTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellLsTask))) => {
var shell_ls_task: *ShellLsTask = task.get(ShellLsTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellMvBatchedTask))) => {
var shell_mv_batched_task: *ShellMvBatchedTask = task.get(ShellMvBatchedTask).?;
shell_mv_batched_task.task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellMvCheckTargetTask))) => {
var shell_mv_check_target_task: *ShellMvCheckTargetTask = task.get(ShellMvCheckTargetTask).?;
shell_mv_check_target_task.task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellRmTask))) => {
var shell_rm_task: *ShellRmTask = task.get(ShellRmTask).?;
shell_rm_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellRmDirTask))) => {
var shell_rm_task: *ShellRmDirTask = task.get(ShellRmDirTask).?;
shell_rm_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellGlobTask))) => {
var shell_glob_task: *ShellGlobTask = task.get(ShellGlobTask).?;
shell_glob_task.runFromMainThread();
shell_glob_task.deinit();
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onProgressUpdate();
},
@field(Task.Tag, @typeName(AsyncGlobWalkTask)) => {
var globWalkTask: *AsyncGlobWalkTask = task.get(AsyncGlobWalkTask).?;
globWalkTask.*.runFromJS();
globWalkTask.deinit();
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
const transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
},
.ThreadSafeFunction => {
var transform_task: *ThreadSafeFunction = task.as(ThreadSafeFunction);
transform_task.onDispatch();
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, bun.meta.typeBaseName(@typeName(JSCDeferredWorkTask))) => {
var jsc_task: *JSCDeferredWorkTask = task.get(JSCDeferredWorkTask).?;
JSC.markBinding(@src());
jsc_task.run();
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, @typeName(HotReloadTask)) => {
const transform_task: *HotReloadTask = task.get(HotReloadTask).?;
transform_task.run();
transform_task.deinit();
// special case: we return
return false;
},
@field(Task.Tag, typeBaseName(@typeName(bun.bake.DevServer.HotReloadEvent))) => {
const hmr_task: *bun.bake.DevServer.HotReloadEvent = task.get(bun.bake.DevServer.HotReloadEvent).?;
hmr_task.run();
},
@field(Task.Tag, typeBaseName(@typeName(FSWatchTask))) => {
var transform_task: *FSWatchTask = task.get(FSWatchTask).?;
transform_task.*.run();
transform_task.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
},
@field(Task.Tag, typeBaseName(@typeName(ManagedTask))) => {
var any: *ManagedTask = task.get(ManagedTask).?;
any.run();
},
@field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
var any: *CppTask = task.get(CppTask).?;
any.run(global);
},
@field(Task.Tag, typeBaseName(@typeName(PollPendingModulesTask))) => {
virtual_machine.modules.onPoll();
},
@field(Task.Tag, typeBaseName(@typeName(GetAddrInfoRequestTask))) => {
if (Environment.os == .windows) @panic("This should not be reachable on Windows");
var any: *GetAddrInfoRequestTask = task.get(GetAddrInfoRequestTask).?;
any.runFromJS();
any.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(Stat))) => {
var any: *Stat = task.get(Stat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lstat))) => {
var any: *Lstat = task.get(Lstat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fstat))) => {
var any: *Fstat = task.get(Fstat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Open))) => {
var any: *Open = task.get(Open).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(ReadFile))) => {
var any: *ReadFile = task.get(ReadFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(WriteFile))) => {
var any: *WriteFile = task.get(WriteFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(CopyFile))) => {
var any: *CopyFile = task.get(CopyFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Read))) => {
var any: *Read = task.get(Read).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Write))) => {
var any: *Write = task.get(Write).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Truncate))) => {
var any: *Truncate = task.get(Truncate).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Writev))) => {
var any: *Writev = task.get(Writev).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Readv))) => {
var any: *Readv = task.get(Readv).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Rename))) => {
var any: *Rename = task.get(Rename).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(FTruncate))) => {
var any: *FTruncate = task.get(FTruncate).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Readdir))) => {
var any: *Readdir = task.get(Readdir).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(ReaddirRecursive))) => {
var any: *ReaddirRecursive = task.get(ReaddirRecursive).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Close))) => {
var any: *Close = task.get(Close).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Rm))) => {
var any: *Rm = task.get(Rm).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Rmdir))) => {
var any: *Rmdir = task.get(Rmdir).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Chown))) => {
var any: *Chown = task.get(Chown).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(FChown))) => {
var any: *FChown = task.get(FChown).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Utimes))) => {
var any: *Utimes = task.get(Utimes).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lutimes))) => {
var any: *Lutimes = task.get(Lutimes).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Chmod))) => {
var any: *Chmod = task.get(Chmod).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fchmod))) => {
var any: *Fchmod = task.get(Fchmod).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Link))) => {
var any: *Link = task.get(Link).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Symlink))) => {
var any: *Symlink = task.get(Symlink).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Readlink))) => {
var any: *Readlink = task.get(Readlink).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Realpath))) => {
var any: *Realpath = task.get(Realpath).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Mkdir))) => {
var any: *Mkdir = task.get(Mkdir).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fsync))) => {
var any: *Fsync = task.get(Fsync).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fdatasync))) => {
var any: *Fdatasync = task.get(Fdatasync).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Access))) => {
var any: *Access = task.get(Access).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(AppendFile))) => {
var any: *AppendFile = task.get(AppendFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Mkdtemp))) => {
var any: *Mkdtemp = task.get(Mkdtemp).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Exists))) => {
var any: *Exists = task.get(Exists).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Futimes))) => {
var any: *Futimes = task.get(Futimes).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lchmod))) => {
var any: *Lchmod = task.get(Lchmod).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lchown))) => {
var any: *Lchown = task.get(Lchown).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Unlink))) => {
var any: *Unlink = task.get(Unlink).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(NativeZlib))) => {
var any: *NativeZlib = task.get(NativeZlib).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(NativeBrotli))) => {
var any: *NativeBrotli = task.get(NativeBrotli).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(ProcessWaiterThreadTask))) => {
bun.markPosixOnly();
var any: *ProcessWaiterThreadTask = task.get(ProcessWaiterThreadTask).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(RuntimeTranspilerStore))) => {
var any: *RuntimeTranspilerStore = task.get(RuntimeTranspilerStore).?;
any.drain();
},
@field(Task.Tag, typeBaseName(@typeName(TimerObject))) => {
var any: *TimerObject = task.get(TimerObject).?;
any.runImmediateTask(virtual_machine);
},
@field(Task.Tag, typeBaseName(@typeName(ServerAllConnectionsClosedTask))) => {
var any: *ServerAllConnectionsClosedTask = task.get(ServerAllConnectionsClosedTask).?;
any.runFromJSThread(virtual_machine);
},
@field(Task.Tag, typeBaseName(@typeName(bun.bundle_v2.DeferredBatchTask))) => {
var any: *bun.bundle_v2.DeferredBatchTask = task.get(bun.bundle_v2.DeferredBatchTask).?;
any.runOnJSThread();
},
else => {
bun.Output.panic("Unexpected tag: {s}", .{@tagName(task.tag())});
},
}
this.drainMicrotasksWithGlobal(global, jsc_vm);
return true;
}
fn tickQueueWithCount(this: *EventLoop, virtual_machine: *VirtualMachine, comptime queue_name: []const u8) u32 {
var global = this.global;
const global_vm = global.vm();
@@ -928,341 +1284,12 @@ pub const EventLoop = struct {
}
while (@field(this, queue_name).readItem()) |task| {
defer counter += 1;
switch (task.tag()) {
@field(Task.Tag, typeBaseName(@typeName(ShellAsync))) => {
var shell_ls_task: *ShellAsync = task.get(ShellAsync).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellAsyncSubprocessDone))) => {
var shell_ls_task: *ShellAsyncSubprocessDone = task.get(ShellAsyncSubprocessDone).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellIOWriterAsyncDeinit))) => {
var shell_ls_task: *ShellIOWriterAsyncDeinit = task.get(ShellIOWriterAsyncDeinit).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellIOReaderAsyncDeinit))) => {
var shell_ls_task: *ShellIOReaderAsyncDeinit = task.get(ShellIOReaderAsyncDeinit).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellCondExprStatTask))) => {
var shell_ls_task: *ShellCondExprStatTask = task.get(ShellCondExprStatTask).?;
shell_ls_task.task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellCpTask))) => {
var shell_ls_task: *ShellCpTask = task.get(ShellCpTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellTouchTask))) => {
var shell_ls_task: *ShellTouchTask = task.get(ShellTouchTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellMkdirTask))) => {
var shell_ls_task: *ShellMkdirTask = task.get(ShellMkdirTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellLsTask))) => {
var shell_ls_task: *ShellLsTask = task.get(ShellLsTask).?;
shell_ls_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellMvBatchedTask))) => {
var shell_mv_batched_task: *ShellMvBatchedTask = task.get(ShellMvBatchedTask).?;
shell_mv_batched_task.task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellMvCheckTargetTask))) => {
var shell_mv_check_target_task: *ShellMvCheckTargetTask = task.get(ShellMvCheckTargetTask).?;
shell_mv_check_target_task.task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellRmTask))) => {
var shell_rm_task: *ShellRmTask = task.get(ShellRmTask).?;
shell_rm_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellRmDirTask))) => {
var shell_rm_task: *ShellRmDirTask = task.get(ShellRmDirTask).?;
shell_rm_task.runFromMainThread();
},
@field(Task.Tag, typeBaseName(@typeName(ShellGlobTask))) => {
var shell_glob_task: *ShellGlobTask = task.get(ShellGlobTask).?;
shell_glob_task.runFromMainThread();
shell_glob_task.deinit();
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onProgressUpdate();
},
@field(Task.Tag, @typeName(AsyncGlobWalkTask)) => {
var globWalkTask: *AsyncGlobWalkTask = task.get(AsyncGlobWalkTask).?;
globWalkTask.*.runFromJS();
globWalkTask.deinit();
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
const transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
},
.ThreadSafeFunction => {
var transform_task: *ThreadSafeFunction = task.as(ThreadSafeFunction);
transform_task.onDispatch();
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, bun.meta.typeBaseName(@typeName(JSCDeferredWorkTask))) => {
var jsc_task: *JSCDeferredWorkTask = task.get(JSCDeferredWorkTask).?;
JSC.markBinding(@src());
jsc_task.run();
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
},
@field(Task.Tag, @typeName(HotReloadTask)) => {
const transform_task: *HotReloadTask = task.get(HotReloadTask).?;
transform_task.run();
transform_task.deinit();
// special case: we return
return 0;
},
@field(Task.Tag, typeBaseName(@typeName(bun.bake.DevServer.HotReloadEvent))) => {
const hmr_task: *bun.bake.DevServer.HotReloadEvent = task.get(bun.bake.DevServer.HotReloadEvent).?;
hmr_task.run();
},
@field(Task.Tag, typeBaseName(@typeName(FSWatchTask))) => {
var transform_task: *FSWatchTask = task.get(FSWatchTask).?;
transform_task.*.run();
transform_task.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
},
@field(Task.Tag, typeBaseName(@typeName(ManagedTask))) => {
var any: *ManagedTask = task.get(ManagedTask).?;
any.run();
},
@field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
var any: *CppTask = task.get(CppTask).?;
any.run(global);
},
@field(Task.Tag, typeBaseName(@typeName(PollPendingModulesTask))) => {
virtual_machine.modules.onPoll();
},
@field(Task.Tag, typeBaseName(@typeName(GetAddrInfoRequestTask))) => {
if (Environment.os == .windows) @panic("This should not be reachable on Windows");
var any: *GetAddrInfoRequestTask = task.get(GetAddrInfoRequestTask).?;
any.runFromJS();
any.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(Stat))) => {
var any: *Stat = task.get(Stat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lstat))) => {
var any: *Lstat = task.get(Lstat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fstat))) => {
var any: *Fstat = task.get(Fstat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Open))) => {
var any: *Open = task.get(Open).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(ReadFile))) => {
var any: *ReadFile = task.get(ReadFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(WriteFile))) => {
var any: *WriteFile = task.get(WriteFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(CopyFile))) => {
var any: *CopyFile = task.get(CopyFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Read))) => {
var any: *Read = task.get(Read).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Write))) => {
var any: *Write = task.get(Write).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Truncate))) => {
var any: *Truncate = task.get(Truncate).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Writev))) => {
var any: *Writev = task.get(Writev).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Readv))) => {
var any: *Readv = task.get(Readv).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Rename))) => {
var any: *Rename = task.get(Rename).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(FTruncate))) => {
var any: *FTruncate = task.get(FTruncate).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Readdir))) => {
var any: *Readdir = task.get(Readdir).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(ReaddirRecursive))) => {
var any: *ReaddirRecursive = task.get(ReaddirRecursive).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Close))) => {
var any: *Close = task.get(Close).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Rm))) => {
var any: *Rm = task.get(Rm).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Rmdir))) => {
var any: *Rmdir = task.get(Rmdir).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Chown))) => {
var any: *Chown = task.get(Chown).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(FChown))) => {
var any: *FChown = task.get(FChown).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Utimes))) => {
var any: *Utimes = task.get(Utimes).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lutimes))) => {
var any: *Lutimes = task.get(Lutimes).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Chmod))) => {
var any: *Chmod = task.get(Chmod).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fchmod))) => {
var any: *Fchmod = task.get(Fchmod).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Link))) => {
var any: *Link = task.get(Link).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Symlink))) => {
var any: *Symlink = task.get(Symlink).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Readlink))) => {
var any: *Readlink = task.get(Readlink).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Realpath))) => {
var any: *Realpath = task.get(Realpath).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Mkdir))) => {
var any: *Mkdir = task.get(Mkdir).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fsync))) => {
var any: *Fsync = task.get(Fsync).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fdatasync))) => {
var any: *Fdatasync = task.get(Fdatasync).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Access))) => {
var any: *Access = task.get(Access).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(AppendFile))) => {
var any: *AppendFile = task.get(AppendFile).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Mkdtemp))) => {
var any: *Mkdtemp = task.get(Mkdtemp).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Exists))) => {
var any: *Exists = task.get(Exists).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Futimes))) => {
var any: *Futimes = task.get(Futimes).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lchmod))) => {
var any: *Lchmod = task.get(Lchmod).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Lchown))) => {
var any: *Lchown = task.get(Lchown).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Unlink))) => {
var any: *Unlink = task.get(Unlink).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(NativeZlib))) => {
var any: *NativeZlib = task.get(NativeZlib).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(NativeBrotli))) => {
var any: *NativeBrotli = task.get(NativeBrotli).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(ProcessWaiterThreadTask))) => {
bun.markPosixOnly();
var any: *ProcessWaiterThreadTask = task.get(ProcessWaiterThreadTask).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(RuntimeTranspilerStore))) => {
var any: *RuntimeTranspilerStore = task.get(RuntimeTranspilerStore).?;
any.drain();
},
@field(Task.Tag, typeBaseName(@typeName(TimerObject))) => {
var any: *TimerObject = task.get(TimerObject).?;
any.runImmediateTask(virtual_machine);
},
@field(Task.Tag, typeBaseName(@typeName(ServerAllConnectionsClosedTask))) => {
var any: *ServerAllConnectionsClosedTask = task.get(ServerAllConnectionsClosedTask).?;
any.runFromJSThread(virtual_machine);
},
@field(Task.Tag, typeBaseName(@typeName(bun.bundle_v2.DeferredBatchTask))) => {
var any: *bun.bundle_v2.DeferredBatchTask = task.get(bun.bundle_v2.DeferredBatchTask).?;
any.runOnJSThread();
},
else => {
bun.Output.panic("Unexpected tag: {s}", .{@tagName(task.tag())});
},
if (!this.runTask(task, global, global_vm, virtual_machine)) {
// Yield when hot reloading.
return 0;
}
this.drainMicrotasksWithGlobal(global, global_vm);
counter += 1;
}
@field(this, queue_name).head = if (@field(this, queue_name).count == 0) 0 else @field(this, queue_name).head;
@@ -1430,18 +1457,14 @@ pub const EventLoop = struct {
}
}
this.processGCTimer();
if (!loop.isActive()) {
if (this.forever_timer == null) {
var t = uws.Timer.create(loop, this);
t.set(this, &noopForeverTimer, 1000 * 60 * 4, 1000 * 60 * 4);
this.forever_timer = t;
}
const timespec: bun.timespec = bun.timespec.msFromNow(std.time.ms_per_week);
loop.tickWithTimeout(&timespec);
} else {
loop.tick();
}
this.processGCTimer();
this.processGCTimer();
loop.tick();
ctx.onAfterEventLoop();
this.tickConcurrent();
this.tick();
@@ -1561,18 +1584,31 @@ pub const EventLoop = struct {
this.next_immediate_tasks.writeItem(task) catch unreachable;
}
pub const TimeoutTask = struct {
task: Task,
event_loop_timer: EventLoopTimer = .{
.tag = .TimeoutTask,
},
pub usingnamespace bun.New(@This());
pub fn create(vm: *JSC.VirtualMachine, task: Task, timeout: i32) void {
const this = TimeoutTask.new(.{
.task = task,
});
this.event_loop_timer.set(vm, timeout);
}
pub fn schedule(this: *TimeoutTask, vm: *JSC.VirtualMachine) void {
const task = this.task;
this.destroy();
_ = vm.event_loop.runTask(task, vm.global, vm.jsc, vm);
}
};
pub fn enqueueTaskWithTimeout(this: *EventLoop, task: Task, timeout: i32) void {
// TODO: make this more efficient!
const loop = this.virtual_machine.uwsLoop();
var timer = uws.Timer.createFallthrough(loop, task.ptr());
timer.set(task.ptr(), callTask, timeout, 0);
}
pub fn callTask(timer: *uws.Timer) callconv(.C) void {
const task = Task.from(timer.as(*anyopaque));
defer timer.deinit(true);
JSC.VirtualMachine.get().enqueueTask(task);
TimeoutTask.create(this.virtual_machine, task, timeout);
}
pub fn ensureWaker(this: *EventLoop) void {
@@ -2293,3 +2329,5 @@ pub const EventLoopTaskPtr = union {
js: *ConcurrentTask,
mini: *JSC.AnyTaskWithExtraContext,
};
pub const EventLoopTimer = @import("./api/Timer.zig").EventLoopTimer;

View File

@@ -443,10 +443,6 @@ pub const StatWatcher = struct {
) catch |err| this.globalThis.reportActiveExceptionAsUnhandled(err);
}
pub fn onTimerInterval(timer: *uws.Timer) callconv(.C) void {
timer.ext(StatWatcher).?.restat();
}
pub fn init(args: Arguments) !*StatWatcher {
log("init", .{});

View File

@@ -1631,8 +1631,6 @@ pub const TestCommand = struct {
vm_.runWithAPILock(Context, &ctx, Context.begin);
}
fn timerNoop(_: *uws.Timer) callconv(.C) void {}
pub fn run(
reporter: *CommandLineReporter,
vm: *JSC.VirtualMachine,

View File

@@ -45,7 +45,9 @@ fn NativeSocketHandleType(comptime ssl: bool) type {
pub const InternalLoopData = extern struct {
pub const us_internal_async = opaque {};
sweep_timer: ?*Timer,
/// Do not use uws.Timer in any new code.
sweep_timer: ?*anyopaque,
wakeup_async: ?*us_internal_async,
last_write_failed: i32,
head: ?*SocketContext,
@@ -2305,47 +2307,6 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type {
pub const SocketTCP = NewSocketHandler(false);
pub const SocketTLS = NewSocketHandler(true);
pub const Timer = opaque {
pub fn create(loop: *Loop, ptr: anytype) *Timer {
const Type = @TypeOf(ptr);
// never fallthrough poll
// the problem is uSockets hardcodes it on the other end
// so we can never free non-fallthrough polls
return us_create_timer(loop, 0, @sizeOf(Type)) orelse std.debug.panic("us_create_timer: returned null: {d}", .{std.c._errno().*});
}
pub fn createFallthrough(loop: *Loop, ptr: anytype) *Timer {
const Type = @TypeOf(ptr);
// never fallthrough poll
// the problem is uSockets hardcodes it on the other end
// so we can never free non-fallthrough polls
return us_create_timer(loop, 1, @sizeOf(Type)) orelse std.debug.panic("us_create_timer: returned null: {d}", .{std.c._errno().*});
}
pub fn set(this: *Timer, ptr: anytype, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void {
us_timer_set(this, cb, ms, repeat_ms);
const value_ptr = us_timer_ext(this);
@setRuntimeSafety(false);
@as(*@TypeOf(ptr), @ptrCast(@alignCast(value_ptr))).* = ptr;
}
pub fn deinit(this: *Timer, comptime fallthrough: bool) void {
debug("Timer.deinit()", .{});
us_timer_close(this, @intFromBool(fallthrough));
}
pub fn ext(this: *Timer, comptime Type: type) ?*Type {
return @as(*Type, @ptrCast(@alignCast(us_timer_ext(this).*.?)));
}
pub fn as(this: *Timer, comptime Type: type) Type {
@setRuntimeSafety(false);
return @as(*?Type, @ptrCast(@alignCast(us_timer_ext(this)))).*.?;
}
};
pub const SocketContext = opaque {
pub fn getNativeHandle(this: *SocketContext, comptime ssl: bool) *anyopaque {
return us_socket_context_get_native_handle(@intFromBool(ssl), this).?;
@@ -2606,11 +2567,6 @@ pub const PosixLoop = extern struct {
extern fn uws_loop_defer(loop: *Loop, ctx: *anyopaque, cb: *const (fn (ctx: *anyopaque) callconv(.C) void)) void;
extern fn us_create_timer(loop: ?*Loop, fallthrough: i32, ext_size: c_uint) ?*Timer;
extern fn us_timer_ext(timer: ?*Timer) *?*anyopaque;
extern fn us_timer_close(timer: ?*Timer, fallthrough: i32) void;
extern fn us_timer_set(timer: ?*Timer, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void;
extern fn us_timer_loop(t: ?*Timer) ?*Loop;
pub const us_socket_context_options_t = extern struct {
key_file_name: [*c]const u8 = null,
cert_file_name: [*c]const u8 = null,
@@ -4239,6 +4195,7 @@ pub const WindowsLoop = extern struct {
is_default: c_int,
pre: *uv.uv_prepare_t,
check: *uv.uv_check_t,
tick_with_timeout_timer: ?*uv.Timer = null,
pub fn get() *WindowsLoop {
return uws_get_loop_with_native(bun.windows.libuv.Loop.get());
@@ -4268,8 +4225,30 @@ pub const WindowsLoop = extern struct {
pub const wake = wakeup;
pub fn tickWithTimeout(this: *WindowsLoop, _: ?*const bun.timespec) void {
fn timeoutTimer(this: *WindowsLoop) ?*uv.Timer {
return this.tick_with_timeout_timer orelse {
this.tick_with_timeout_timer = bun.create(bun.default_allocator, uv.Timer, std.mem.zeroes(uv.Timer));
this.tick_with_timeout_timer.?.init(this.uv_loop);
return this.tick_with_timeout_timer.?;
};
}
fn onTimeout(_: *uv.Timer) callconv(.C) void {}
pub fn tickWithTimeout(this: *WindowsLoop, timeout: ?*const bun.timespec) void {
var did_schedule_timeout_timer = false;
if (timeout) |t| {
if (this.timeoutTimer()) |timer| {
timer.start(t.ms(), 0, onTimeout);
timer.ref();
did_schedule_timeout_timer = true;
}
}
us_loop_run(this);
if (did_schedule_timeout_timer) {
this.tick_with_timeout_timer.?.unref();
}
}
pub fn tickWithoutIdle(this: *WindowsLoop) void {

View File

@@ -49,8 +49,14 @@ const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion;
const DeadSocket = opaque {};
var dead_socket = @as(*DeadSocket, @ptrFromInt(1));
//TODO: this needs to be freed when Worker Threads are implemented
var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(bun.default_allocator);
var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
var socket_async_http_abort_tracker = std.AutoHashMap(u32, uws.InternalSocket).init(bun.default_allocator);
const AsyncHTTPID = u31;
fn nextAsyncHttpID() AsyncHTTPID {
const container = struct {
pub var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
};
return @truncate(container.async_http_id.fetchAdd(1, .monotonic) % std.math.maxInt(u31));
}
const MAX_REDIRECT_URL_LENGTH = 128 * 1024;
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
@@ -1052,18 +1058,19 @@ pub const HTTPThread = struct {
lazy_libdeflater: ?*LibdeflateState = null,
const threadlog = Output.scoped(.HTTPThread, true);
const ShutdownMessage = packed struct(u32) {
async_http_id: AsyncHTTPID,
is_tls: bool,
};
const WriteMessage = struct {
data: []const u8,
async_http_id: u32,
async_http_id: AsyncHTTPID,
flags: packed struct {
is_tls: bool,
ended: bool,
},
};
const ShutdownMessage = struct {
async_http_id: u32,
is_tls: bool,
};
pub const LibdeflateState = struct {
decompressor: *bun.libdeflate.Decompressor = undefined,
@@ -1230,10 +1237,17 @@ pub const HTTPThread = struct {
fn drainEvents(this: *@This()) void {
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
for (this.queued_shutdowns.items) |http| {
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
var shutdowns = brk: {
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
const shutdowns = this.queued_shutdowns;
this.queued_shutdowns = .{};
break :brk shutdowns;
};
defer shutdowns.deinit(bun.default_allocator);
for (shutdowns.items) |http| {
if (socket_async_http_abort_tracker.fetchRemove(http.async_http_id)) |socket_ptr| {
if (http.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
// do a fast shutdown here since we are aborting and we dont want to wait for the close_notify from the other side
@@ -1244,7 +1258,6 @@ pub const HTTPThread = struct {
}
}
}
this.queued_shutdowns.clearRetainingCapacity();
}
{
this.queued_writes_lock.lock();
@@ -1506,6 +1519,7 @@ fn registerAbortTracker(
socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
if (client.signals.aborted != null) {
client.state.flags.did_register_abort_tracker = true;
socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable;
}
}
@@ -1514,7 +1528,8 @@ fn unregisterAbortTracker(
client: *HTTPClient,
) void {
if (client.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(client.async_http_id);
client.state.flags.did_register_abort_tracker = false;
_ = socket_async_http_abort_tracker.remove(client.async_http_id);
}
}
@@ -1924,6 +1939,10 @@ pub const InternalState = struct {
is_redirect_pending: bool = false,
is_libdeflate_fast_path_disabled: bool = false,
resend_request_body_on_redirect: bool = false,
/// Attempt to register to receive AbortSignal while the socket has not connected yet.
/// It's fine for this to be wrong sometimes.
did_register_abort_tracker: bool = false,
};
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
@@ -2149,7 +2168,7 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunnel: ?*ProxyTunnel = null,
signals: Signals = .{},
async_http_id: u32 = 0,
async_http_id: AsyncHTTPID = 0,
hostname: ?[]u8 = null,
unix_socket_path: JSC.ZigString.Slice = JSC.ZigString.Slice.empty,
@@ -2320,7 +2339,7 @@ pub const AsyncHTTP = struct {
waitingDeffered: bool = false,
finalized: bool = false,
err: ?anyerror = null,
async_http_id: u32 = 0,
async_http_id: AsyncHTTPID = 0,
state: AtomicState = AtomicState.init(State.pending),
elapsed: u64 = 0,
@@ -2468,7 +2487,7 @@ pub const AsyncHTTP = struct {
.result_callback = callback,
.http_proxy = options.http_proxy,
.signals = options.signals orelse .{},
.async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id.fetchAdd(1, .monotonic) else 0,
.async_http_id = if (options.signals != null and options.signals.?.aborted != null) nextAsyncHttpID() else 0,
};
this.client = .{
@@ -2721,7 +2740,9 @@ pub const AsyncHTTP = struct {
}
if (socket_async_http_abort_tracker.capacity() > 10_000 and socket_async_http_abort_tracker.count() < 100) {
socket_async_http_abort_tracker.shrinkAndFree(socket_async_http_abort_tracker.count());
const cloned = socket_async_http_abort_tracker.clone() catch bun.outOfMemory();
socket_async_http_abort_tracker.deinit();
socket_async_http_abort_tracker = cloned;
}
if (result.has_more) {
@@ -2991,6 +3012,15 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
this.fail(error.ConnectionClosed);
return;
}
// A connecting socket won't have called onOpen yet.
// So if you want to be able to call AbortSignal.timeout() on it, we need to
// manually register the abort tracker.
if (socket.socket == .connecting) {
if (!socket.isClosed() and !this.state.flags.did_register_abort_tracker) {
this.registerAbortTracker(is_ssl, socket);
}
}
}
const Task = ThreadPool.Task;

View File

@@ -1,23 +1,53 @@
import type { Server } from "bun";
for (let beforeHeaders of [true, false]) {
for (let abortTimeout of [1, 2, 0]) {
const count = 100;
let defer = Promise.withResolvers();
const log = `[${beforeHeaders ? "beforeHeaders" : "afterHeaders"}] ${abortTimeout} timeout`;
console.time(log);
using server = Bun.serve({
port: 0,
idleTimeout: 0,
development: false,
using server = Bun.serve({
port: 0,
async fetch() {
const signal = AbortSignal.timeout(1);
return await fetch("https://example.com", { signal });
},
});
async fetch() {
if (beforeHeaders) {
await defer.promise;
throw new Error("Never going to happen");
} else {
return new Response(
new ReadableStream({
async pull(controller) {
controller.enqueue("a");
await defer.promise;
},
}),
);
}
},
});
function hostname(server: Server) {
if (server.hostname.startsWith(":")) return `[${server.hostname}]`;
return server.hostname;
let responses = new Array(count);
for (let i = 0; i < count; i++) {
const defer2 = Promise.withResolvers();
fetch(server.url, { signal: AbortSignal.timeout(abortTimeout) })
.then(response => {
if (beforeHeaders) {
defer2.reject(new Error("One of the requests succeeded"));
} else {
return response.arrayBuffer();
}
})
.catch(err => {
if (err.name !== "TimeoutError") {
defer2.reject(err);
} else {
defer2.resolve();
}
});
responses[i] = defer2.promise;
}
await Promise.all(responses);
console.timeEnd(log);
}
}
let url = `http://${hostname(server)}:${server.port}/`;
const responses: Response[] = [];
for (let i = 0; i < 10; i++) {
responses.push(await fetch(url));
}
// we fail if any of the requests succeeded
process.exit(responses.every(res => res.status === 500) ? 0 : 1);

View File

@@ -28,16 +28,7 @@ describe("AbortSignal", () => {
cwd: tmpdir(),
});
const exitCode = await Promise.race([
server.exited,
(async () => {
await Bun.sleep(5000);
server.kill();
return 2;
})(),
]);
expect(exitCode).toBe(0);
expect(await server.exited).toBe(0);
});
test("AbortSignal.any() should fire abort event", async () => {

View File

@@ -110,15 +110,11 @@ describe("AbortSignal", () => {
expect(() => AbortSignal.timeout(timeout)).toThrow(TypeError);
});
}
// FIXME: test runner hangs when this is enabled
test.skip("timeout works", done => {
test("timeout works", done => {
const abort = AbortSignal.timeout(1);
abort.addEventListener("abort", event => {
done();
});
// AbortSignal.timeout doesn't keep the event loop / process alive
// so we set a no-op timeout
setTimeout(() => {}, 10);
});
});
describe("prototype", () => {

View File

@@ -0,0 +1,26 @@
import { test, expect } from "bun:test";
for (let timeout of [1, 2, 0]) {
test(`AbortSignal.timeout(${timeout})`, async () => {
const count = 10_000;
const promises = new Array(count);
const signals = new Array(count);
console.time("[" + count + "x] " + "AbortSignal.timeout(" + timeout + ")");
for (let i = 0; i < count; i++) {
const signal = AbortSignal.timeout(timeout);
const { promise, resolve, reject } = Promise.withResolvers();
promises[i] = promise;
signals[i] = signal;
signal.addEventListener("abort", () => {
resolve();
});
}
console.timeEnd("[" + count + "x] " + "AbortSignal.timeout(" + timeout + ")");
console.time("[" + count + "x] " + "await Promise.all(promises)");
await Promise.all(promises);
console.timeEnd("[" + count + "x] " + "await Promise.all(promises)");
});
}