mirror of https://github.com/oven-sh/bun
synced 2026-02-16 22:01:47 +00:00

Compare commits: bun-v1.3.1...jarred/cha (17 commits)

- 2d78d45ed8
- b224ebadb8
- fc6eb9959b
- 6553052a32
- cd4a7caf37
- 318cefb211
- c69912e57a
- 0d06f2d32b
- 3cbd73176f
- 35d16bea36
- 5e191eaf3a
- a91f59b9a2
- 6e7362c241
- b5bed9ec05
- 8869ddafba
- f8e87e34d6
- 712c5bc80e
@@ -1,7 +1,8 @@
 var i = 0;
 export default {
   port: 3002,
   fetch(req) {
-    if (i++ === 200_000 - 1) queueMicrotask(() => process.exit(0));
+    if (i++ === 200_000 - 1) setTimeout(() => process.exit(0), 0);
     return new Response("Hello, World!" + i);
   },
 };
@@ -32,6 +32,7 @@ struct us_loop_t {

     uv_prepare_t *uv_pre;
     uv_check_t *uv_check;
+    uv_timer_t *uv_tick_with_timeout_timer;
 };

 // it is no longer valid to cast a pointer to us_poll_t to a pointer of
@@ -130,33 +130,13 @@ pub const All = struct {
     }

     pub fn getTimeout(this: *const All, spec: *timespec) bool {
         if (this.active_timer_count == 0) {
             return false;
         }

-        if (this.timers.peek()) |min| {
-            const now = timespec.now();
-            switch (now.order(&min.next)) {
-                .gt, .eq => {
-                    spec.* = .{ .nsec = 0, .sec = 0 };
-                    return true;
-                },
-                .lt => {
-                    spec.* = min.next.duration(&now);
-                    return true;
-                },
-            }
-        }
-
-        return false;
-    }
-
-    export fn Bun__internal_drainTimers(vm: *VirtualMachine) callconv(.C) void {
-        drainTimers(&vm.timer, vm);
-    }
-
-    comptime {
-        _ = &Bun__internal_drainTimers;
+        const min = this.timers.peek() orelse return false;
+        const now = timespec.now();
+        spec.* = switch (now.order(&min.next)) {
+            .gt, .eq => .{ .nsec = 0, .sec = 0 },
+            .lt => min.next.duration(&now),
+        };
+        return true;
     }

     pub fn drainTimers(this: *All, vm: *VirtualMachine) void {
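The rewritten getTimeout above folds the optional-unwrap and the per-branch returns into a single `orelse return` plus a switch expression. A self-contained Zig sketch of the same shape (illustrative names, not Bun's types):

```zig
const std = @import("std");

// Same control flow as the new getTimeout(): bail out early, then compute
// the poll timeout as one switch expression over the time ordering.
fn nextTimeoutMs(deadlines: []const i64, now: i64) ?i64 {
    if (deadlines.len == 0) return null; // mirrors the active_timer_count == 0 check
    const min = std.mem.min(i64, deadlines);
    return switch (std.math.order(now, min)) {
        .gt, .eq => 0, // a timer is already due: poll with a zero timeout
        .lt => min - now, // sleep until the nearest deadline
    };
}

test "timeout derivation" {
    try std.testing.expectEqual(@as(?i64, 5), nextTimeoutMs(&.{ 15, 30 }, 10));
    try std.testing.expectEqual(@as(?i64, 0), nextTimeoutMs(&.{ 15, 30 }, 20));
    try std.testing.expectEqual(@as(?i64, null), nextTimeoutMs(&.{}, 20));
}
```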
@@ -715,7 +695,7 @@ const heap = bun.io.heap;

 pub const EventLoopTimer = struct {
     /// The absolute time to fire this timer next.
-    next: timespec,
+    next: timespec = .{},

     /// Internal heap fields.
     heap: heap.IntrusiveField(EventLoopTimer) = .{},
@@ -724,6 +704,44 @@ pub const EventLoopTimer = struct {

     tag: Tag = .TimerCallback,

+    pub fn cancel(this: *@This(), vm: *JSC.VirtualMachine) void {
+        if (this.state == .ACTIVE) {
+            vm.timer.remove(this);
+        }
+    }
+
+    pub fn set(this: *@This(), vm: *JSC.VirtualMachine, interval: i32) void {
+        // if the timer is active we need to remove it
+        if (this.state == .ACTIVE) {
+            vm.timer.remove(this);
+        }
+
+        // an interval of 0 means that we stop the timer
+        if (interval == 0) {
+            return;
+        }
+
+        // reschedule the timer
+        this.next = bun.timespec.msFromNow(interval);
+        vm.timer.insert(this);
+    }
+
+    pub fn setWithTimespec(this: *@This(), vm: *JSC.VirtualMachine, next: *const timespec, interval: i32) void {
+        // if the timer is active we need to remove it
+        if (this.state == .ACTIVE) {
+            vm.timer.remove(this);
+        }
+
+        // an interval of 0 means that we stop the timer
+        if (interval == 0) {
+            return;
+        }
+
+        // reschedule the timer
+        this.next = next.addMs(interval);
+        vm.timer.insert(this);
+    }
+
     pub const Tag = if (Environment.isWindows) enum {
         TimerCallback,
         TimerObject,
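The new set()/setWithTimespec() methods encode two invariants: an ACTIVE timer is removed before being re-inserted, and an interval of 0 is a stop. A standalone sketch of that contract (a plain list stands in for Bun's intrusive heap; none of these names are Bun's):

```zig
const std = @import("std");

// Models the set() contract from the diff: re-setting an active timer first
// removes it (no double-insert), and interval == 0 stops the timer.
const SketchTimer = struct {
    deadline_ms: i64 = 0,
    active: bool = false,
};

const Timers = struct {
    list: std.ArrayList(*SketchTimer),

    fn set(self: *Timers, timer: *SketchTimer, now_ms: i64, interval_ms: i64) !void {
        if (timer.active) self.remove(timer); // never double-insert
        if (interval_ms == 0) return; // an interval of 0 means "stop"
        timer.deadline_ms = now_ms + interval_ms;
        timer.active = true;
        try self.list.append(timer);
    }

    fn remove(self: *Timers, timer: *SketchTimer) void {
        for (self.list.items, 0..) |t, i| {
            if (t == timer) {
                _ = self.list.swapRemove(i);
                timer.active = false;
                return;
            }
        }
    }
};

test "re-setting an active timer does not double-insert" {
    var timers = Timers{ .list = std.ArrayList(*SketchTimer).init(std.testing.allocator) };
    defer timers.list.deinit();
    var t = SketchTimer{};
    try timers.set(&t, 0, 16);
    try timers.set(&t, 4, 16); // reschedule
    try std.testing.expectEqual(@as(usize, 1), timers.list.items.len);
    try timers.set(&t, 8, 0); // interval 0 stops the timer
    try std.testing.expectEqual(@as(usize, 0), timers.list.items.len);
}
```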
@@ -731,7 +749,9 @@ pub const EventLoopTimer = struct {
         StatWatcherScheduler,
         UpgradedDuplex,
         WindowsNamedPipe,
-
+        GCTimer,
+        GCRepeatingTimer,
+        TimeoutTask,
         pub fn Type(comptime T: Tag) type {
             return switch (T) {
                 .TimerCallback => TimerCallback,
@@ -740,6 +760,9 @@ pub const EventLoopTimer = struct {
                 .StatWatcherScheduler => StatWatcherScheduler,
                 .UpgradedDuplex => uws.UpgradedDuplex,
                 .WindowsNamedPipe => uws.WindowsNamedPipe,
+                .GCTimer => JSC.GarbageCollectionController,
+                .GCRepeatingTimer => JSC.GarbageCollectionController,
+                .TimeoutTask => JSC.EventLoop.TimeoutTask,
             };
         }
     } else enum {
@@ -748,7 +771,9 @@ pub const EventLoopTimer = struct {
         TestRunner,
         StatWatcherScheduler,
         UpgradedDuplex,
-
+        GCTimer,
+        GCRepeatingTimer,
+        TimeoutTask,
        pub fn Type(comptime T: Tag) type {
            return switch (T) {
                .TimerCallback => TimerCallback,
@@ -756,6 +781,9 @@ pub const EventLoopTimer = struct {
                .TestRunner => JSC.Jest.TestRunner,
                .StatWatcherScheduler => StatWatcherScheduler,
                .UpgradedDuplex => uws.UpgradedDuplex,
+               .GCTimer => JSC.GarbageCollectionController,
+               .GCRepeatingTimer => JSC.GarbageCollectionController,
+               .TimeoutTask => JSC.EventLoop.TimeoutTask,
            };
        }
    };
@@ -808,8 +836,33 @@ pub const EventLoopTimer = struct {

     pub fn fire(this: *EventLoopTimer, now: *const timespec, vm: *VirtualMachine) Arm {
         switch (this.tag) {
+            .GCTimer => {
+                const gc_controller: *JSC.GarbageCollectionController = @fieldParentPtr("gc_timer", this);
+                this.state = .FIRED;
+                gc_controller.onGCTimer();
+                return .disarm;
+            },
+            .GCRepeatingTimer => {
+                const gc_controller: *JSC.GarbageCollectionController = @fieldParentPtr("gc_repeating_timer", this);
+                this.state = .FIRED;
+                gc_controller.onGCRepeatingTimer(now);
+
+                if (this.state == .FIRED) {
+                    // It's repeating, so let's repeat it.
+                    this.next = now.addMs(gc_controller.repeatingTimeInterval());
+                    vm.timer.insert(this);
+                }
+                return .disarm;
+            },
+            .TimeoutTask => {
+                const timeout_task: *JSC.EventLoop.TimeoutTask = @fieldParentPtr("event_loop_timer", this);
+                this.state = .FIRED;
+                timeout_task.schedule(vm);
+                return .disarm;
+            },
             inline else => |t| {
                 var container: *t.Type() = @alignCast(@fieldParentPtr("event_loop_timer", this));

                 if (comptime t.Type() == TimerObject) {
                     return container.fire(now, vm);
                 }
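fire() recovers the owning object from the embedded timer with @fieldParentPtr, the same intrusive trick the new GCTimer/GCRepeatingTimer arms use for their named fields. Reduced to a runnable Zig example (illustrative types, not Bun's):

```zig
const std = @import("std");

// A Timer is embedded by value inside its owner; given only a *Timer, the
// owner is recovered from the field offset with @fieldParentPtr.
const Timer = struct { armed: bool = false };

const Owner = struct {
    name: []const u8,
    event_loop_timer: Timer = .{},
};

fn ownerOf(timer: *Timer) *Owner {
    return @alignCast(@fieldParentPtr("event_loop_timer", timer));
}

pub fn main() void {
    var owner = Owner{ .name = "gc_controller" };
    const recovered = ownerOf(&owner.event_loop_timer);
    recovered.event_loop_timer.armed = true; // proves we got the same object back
    std.debug.print("recovered {s}, armed={}\n", .{ recovered.name, owner.event_loop_timer.armed });
}
```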
@@ -72,12 +72,8 @@ Ref<AbortSignal> AbortSignal::timeout(ScriptExecutionContext& context, uint64_t
         signal->setHasActiveTimeoutTimer(false);
     };

-    if (milliseconds == 0) {
-        // immediately write to task queue
-        context.postTask(WTFMove(action));
-    } else {
-        context.postTaskOnTimeout(WTFMove(action), Seconds::fromMilliseconds(milliseconds));
-    }
+    // Act like setTimeout(0) and always tick the event loop.
+    context.postTaskOnTimeout(WTFMove(action), Seconds::fromMilliseconds(milliseconds == 0 ? 1 : milliseconds));

     return signal;
 }
@@ -102,7 +98,7 @@ Ref<AbortSignal> AbortSignal::any(ScriptExecutionContext& context, const Vector<
 AbortSignal::AbortSignal(ScriptExecutionContext* context, Aborted aborted, JSC::JSValue reason)
     : ContextDestructionObserver(context)
     , m_reason(reason)
-    , m_aborted(aborted == Aborted::Yes)
+    , m_flags(aborted == Aborted::Yes ? static_cast<uint8_t>(Flags::Aborted) : static_cast<uint8_t>(Flags::None))
 {
     ASSERT(reason);
 }
@@ -145,11 +141,11 @@ void AbortSignal::addDependentSignal(AbortSignal& signal)
 void AbortSignal::signalAbort(JSC::JSValue reason)
 {
     // 1. If signal's aborted flag is set, then return.
-    if (m_aborted)
+    if (aborted())
         return;

     // 2. Set signal’s aborted flag.
-    m_aborted = true;
+    setFlag(Flags::Aborted);
     m_sourceSignals.clear();

     // FIXME: This code is wrong: we should emit a write-barrier. Otherwise, GC can collect it.
@@ -178,7 +174,7 @@ void AbortSignal::signalAbort(JSC::JSValue reason)
 void AbortSignal::signalAbort(JSC::JSGlobalObject* globalObject, CommonAbortReason reason)
 {
     // 1. If signal's aborted flag is set, then return.
-    if (m_aborted)
+    if (aborted())
         return;

     m_commonReason = reason;
@@ -218,7 +214,10 @@ void AbortSignal::signalFollow(AbortSignal& signal)

 void AbortSignal::eventListenersDidChange()
 {
-    m_hasAbortEventListener = hasEventListeners(eventNames().abortEvent);
+    if (hasEventListeners(eventNames().abortEvent))
+        setFlag(Flags::HasAbortEventListener);
+    else
+        clearFlag(Flags::HasAbortEventListener);
 }

 uint32_t AbortSignal::addAbortAlgorithmToSignal(AbortSignal& signal, Ref<AbortAlgorithm>&& algorithm)
@@ -76,7 +76,7 @@ public:
     void signalAbort(JSC::JSValue reason);
     void signalFollow(AbortSignal&);

-    bool aborted() const { return m_aborted; }
+    bool aborted() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::Aborted); }
     const JSValueInWrappedObject& reason() const { return m_reason; }
     JSValue jsReason(JSC::JSGlobalObject& globalObject);
     CommonAbortReason commonReason() const { return m_commonReason; }
@@ -84,8 +84,8 @@ public:
     void cleanNativeBindings(void* ref);
     void addNativeCallback(NativeCallbackTuple callback) { m_native_callbacks.append(callback); }

-    bool hasActiveTimeoutTimer() const { return m_hasActiveTimeoutTimer; }
-    bool hasAbortEventListener() const { return m_hasAbortEventListener; }
+    bool hasActiveTimeoutTimer() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::HasActiveTimeoutTimer); }
+    bool hasAbortEventListener() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::HasAbortEventListener); }

     using RefCounted::deref;
     using RefCounted::ref;
@@ -106,7 +106,7 @@ public:
     void incrementPendingActivityCount() { ++pendingActivityCount; }
     void decrementPendingActivityCount() { --pendingActivityCount; }
     bool hasPendingActivity() const { return pendingActivityCount > 0; }
-    bool isDependent() const { return m_isDependent; }
+    bool isDependent() const { return m_flags.load(std::memory_order_seq_cst) & static_cast<uint8_t>(Flags::IsDependent); }

 private:
     enum class Aborted : bool {
@@ -114,10 +114,24 @@ private:
         Yes
     };
     explicit AbortSignal(ScriptExecutionContext*, Aborted = Aborted::No, JSC::JSValue reason = JSC::jsUndefined());
+    enum class Flags : uint8_t {
+        None = 0,
+        Aborted = 2,
+        HasActiveTimeoutTimer = 4,
+        HasAbortEventListener = 8,
+        IsDependent = 16,
+    };
+    void setFlag(Flags flags)
+    {
+        m_flags.fetch_or(static_cast<uint8_t>(flags));
+    }
+    void clearFlag(Flags flags)
+    {
+        m_flags.fetch_and(~static_cast<uint8_t>(flags));
+    }

-    void setHasActiveTimeoutTimer(bool hasActiveTimeoutTimer) { m_hasActiveTimeoutTimer = hasActiveTimeoutTimer; }
-
-    void markAsDependent() { m_isDependent = true; }
+    void setHasActiveTimeoutTimer(bool hasActiveTimeoutTimer) { setFlag(Flags::HasActiveTimeoutTimer); }
+    void markAsDependent() { setFlag(Flags::IsDependent); }
     void addSourceSignal(AbortSignal&);
     void addDependentSignal(AbortSignal&);
@@ -137,10 +151,7 @@ private:
     Vector<NativeCallbackTuple, 2> m_native_callbacks;
     std::atomic<uint32_t> pendingActivityCount { 0 };
     uint32_t m_algorithmIdentifier { 0 };
-    bool m_aborted : 1 = false;
-    bool m_hasActiveTimeoutTimer : 1 = false;
-    bool m_hasAbortEventListener : 1 = false;
-    bool m_isDependent : 1 = false;
+    std::atomic<uint8_t> m_flags { 0 };
 };

 WebCoreOpaqueRoot root(AbortSignal*);
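The AbortSignal hunks collapse four one-bit bools into a single std::atomic<uint8_t> driven by fetch_or/fetch_and. The same pattern ported to Zig as a self-contained sketch (not Bun's code):

```zig
const std = @import("std");

// Four booleans collapsed into one atomic byte, mirroring the diff's
// m_flags / setFlag / clearFlag: set with fetch-OR, clear with fetch-AND.
const Flags = struct {
    const aborted: u8 = 2;
    const has_active_timeout_timer: u8 = 4;
    const has_abort_event_listener: u8 = 8;
    const is_dependent: u8 = 16;

    bits: std.atomic.Value(u8) = std.atomic.Value(u8).init(0),

    fn set(self: *Flags, mask: u8) void {
        _ = self.bits.fetchOr(mask, .seq_cst);
    }

    fn clear(self: *Flags, mask: u8) void {
        _ = self.bits.fetchAnd(~mask, .seq_cst);
    }

    fn isSet(self: *const Flags, mask: u8) bool {
        return self.bits.load(.seq_cst) & mask != 0;
    }
};

test "flag round-trip" {
    var flags = Flags{};
    flags.set(Flags.aborted);
    try std.testing.expect(flags.isSet(Flags.aborted));
    flags.clear(Flags.aborted);
    try std.testing.expect(!flags.isSet(Flags.aborted));
}
```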
@@ -21,6 +21,7 @@ const FetchTasklet = Fetch.FetchTasklet;
 const JSValue = JSC.JSValue;
 const js = JSC.C;
 const Waker = bun.Async.Waker;
+const log = bun.Output.scoped(.EventLoop, false);

 pub const WorkPool = @import("../work_pool.zig").WorkPool;
 pub const WorkPoolTask = @import("../work_pool.zig").Task;
@@ -532,20 +533,23 @@ pub const ConcurrentTask = struct {

 // This type must be unique per JavaScript thread
 pub const GarbageCollectionController = struct {
-    gc_timer: *uws.Timer = undefined,
+    gc_timer: EventLoopTimer = .{
+        .tag = .GCTimer,
+    },
     gc_last_heap_size: usize = 0,
     gc_last_heap_size_on_repeating_timer: usize = 0,
     heap_size_didnt_change_for_repeating_timer_ticks_count: u8 = 0,
     gc_timer_state: GCTimerState = GCTimerState.pending,
-    gc_repeating_timer: *uws.Timer = undefined,
+    gc_repeating_timer: EventLoopTimer = .{
+        .tag = .GCRepeatingTimer,
+    },
     gc_timer_interval: i32 = 0,
     gc_repeating_timer_fast: bool = true,
     disabled: bool = false,

     pub fn init(this: *GarbageCollectionController, vm: *VirtualMachine) void {
         const actual = uws.Loop.get();
-        this.gc_timer = uws.Timer.createFallthrough(actual, this);
-        this.gc_repeating_timer = uws.Timer.createFallthrough(actual, this);

         actual.internal_loop_data.jsc_vm = vm.jsc;

         if (comptime Environment.isDebug) {
@@ -567,21 +571,22 @@ pub const GarbageCollectionController = struct {
         this.disabled = vm.bundler.env.has("BUN_GC_TIMER_DISABLE");

         if (!this.disabled)
-            this.gc_repeating_timer.set(this, onGCRepeatingTimer, gc_timer_interval, gc_timer_interval);
+            this.gc_repeating_timer.set(vm, gc_timer_interval);
     }

-    pub fn scheduleGCTimer(this: *GarbageCollectionController) void {
+    pub fn scheduleGCTimer(this: *GarbageCollectionController, now: *const bun.timespec) void {
         this.gc_timer_state = .scheduled;
-        this.gc_timer.set(this, onGCTimer, 16, 0);
+        this.gc_timer.setWithTimespec(this.bunVM(), now, 16);
     }

     pub fn bunVM(this: *GarbageCollectionController) *VirtualMachine {
         return @alignCast(@fieldParentPtr("gc_controller", this));
     }

-    pub fn onGCTimer(timer: *uws.Timer) callconv(.C) void {
-        var this = timer.as(*GarbageCollectionController);
+    pub fn onGCTimer(this: *GarbageCollectionController) void {
         if (this.disabled) return;

         log("onGCTimer()", .{});
         this.gc_timer_state = .run_on_next_tick;
     }
@@ -596,20 +601,35 @@ pub const GarbageCollectionController = struct {
     //
     // When the heap size is increasing, we always switch to fast mode
     // When the heap size has been the same or less for 30 seconds, we switch to slow mode
-    pub fn updateGCRepeatTimer(this: *GarbageCollectionController, comptime setting: @Type(.EnumLiteral)) void {
+    pub fn updateGCRepeatTimer(this: *GarbageCollectionController, cached_timespec: ?*const bun.timespec, comptime setting: @Type(.EnumLiteral)) ?bun.timespec {
         if (setting == .fast and !this.gc_repeating_timer_fast) {
             this.gc_repeating_timer_fast = true;
-            this.gc_repeating_timer.set(this, onGCRepeatingTimer, this.gc_timer_interval, this.gc_timer_interval);
+            const ts = if (cached_timespec != null) cached_timespec.?.* else bun.timespec.now();
+            this.gc_repeating_timer.setWithTimespec(this.bunVM(), &ts, this.repeatingTimeInterval());
             this.heap_size_didnt_change_for_repeating_timer_ticks_count = 0;
+            return ts;
         } else if (setting == .slow and this.gc_repeating_timer_fast) {
             this.gc_repeating_timer_fast = false;
-            this.gc_repeating_timer.set(this, onGCRepeatingTimer, 30_000, 30_000);
+            const ts = if (cached_timespec != null) cached_timespec.?.* else bun.timespec.now();
+            this.gc_repeating_timer.setWithTimespec(this.bunVM(), &ts, this.repeatingTimeInterval());
             this.heap_size_didnt_change_for_repeating_timer_ticks_count = 0;
+            return ts;
         }

+        return null;
     }

-    pub fn onGCRepeatingTimer(timer: *uws.Timer) callconv(.C) void {
-        var this = timer.as(*GarbageCollectionController);
+    pub fn repeatingTimeInterval(this: *const GarbageCollectionController) i32 {
+        return if (this.gc_repeating_timer_fast) this.gc_timer_interval else 30_000;
+    }
+
+    pub fn onGCRepeatingTimer(this: *GarbageCollectionController, now: *const bun.timespec) void {
+        if (comptime Environment.enable_logs)
+            log("onGCRepeatingTimer(at: {d}ms, heap: {d}) - repeating every {d}ms", .{
+                now.ms(),
+                this.gc_last_heap_size_on_repeating_timer,
+                this.repeatingTimeInterval(),
+            });
         const prev_heap_size = this.gc_last_heap_size_on_repeating_timer;
         this.performGC();
         this.gc_last_heap_size_on_repeating_timer = this.gc_last_heap_size;
@@ -617,11 +637,11 @@ pub const GarbageCollectionController = struct {
         this.heap_size_didnt_change_for_repeating_timer_ticks_count +|= 1;
         if (this.heap_size_didnt_change_for_repeating_timer_ticks_count >= 30) {
             // make the timer interval longer
-            this.updateGCRepeatTimer(.slow);
+            _ = this.updateGCRepeatTimer(now, .slow);
         }
     } else {
         this.heap_size_didnt_change_for_repeating_timer_ticks_count = 0;
-        this.updateGCRepeatTimer(.fast);
+        _ = this.updateGCRepeatTimer(now, .fast);
     }
 }
@@ -638,8 +658,9 @@ pub const GarbageCollectionController = struct {
             .run_on_next_tick => {
                 // When memory usage is not stable, run the GC more.
                 if (this_heap_size != prev) {
-                    this.scheduleGCTimer();
-                    this.updateGCRepeatTimer(.fast);
+                    const now = bun.timespec.now();
+                    this.scheduleGCTimer(&now);
+                    _ = this.updateGCRepeatTimer(&now, .fast);
                 } else {
                     this.gc_timer_state = .pending;
                 }
@@ -648,18 +669,18 @@ pub const GarbageCollectionController = struct {
             },
             .pending => {
                 if (this_heap_size != prev) {
-                    this.updateGCRepeatTimer(.fast);
+                    const ts = this.updateGCRepeatTimer(null, .fast);

                     if (this_heap_size > prev * 2) {
                         this.performGC();
                     } else {
-                        this.scheduleGCTimer();
+                        this.scheduleGCTimer(&(ts orelse bun.timespec.now()));
                     }
                 }
             },
             .scheduled => {
                 if (this_heap_size > prev * 2) {
-                    this.updateGCRepeatTimer(.fast);
+                    _ = this.updateGCRepeatTimer(null, .fast);
                     this.performGC();
                 }
             },
@@ -774,7 +795,6 @@ pub const EventLoop = struct {
 global: *JSGlobalObject = undefined,
 virtual_machine: *JSC.VirtualMachine = undefined,
 waker: ?Waker = null,
-forever_timer: ?*uws.Timer = null,
 deferred_tasks: DeferredTaskQueue = .{},
 uws_loop: if (Environment.isWindows) ?*uws.Loop else void = if (Environment.isWindows) null else {},
@@ -838,7 +858,6 @@ pub const EventLoop = struct {
     }

     pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
-    const log = bun.Output.scoped(.EventLoop, false);

     pub fn tickWhilePaused(this: *EventLoop, done: *bool) void {
         while (!done.*) {
@@ -891,6 +910,343 @@ pub const EventLoop = struct {
         return result;
     }

+    pub fn runTask(this: *EventLoop, task: Task, global: *JSC.JSGlobalObject, jsc_vm: *JSC.VM, virtual_machine: *JSC.VirtualMachine) bool {
+        switch (task.tag()) {
+            @field(Task.Tag, typeBaseName(@typeName(ShellAsync))) => {
+                var shell_ls_task: *ShellAsync = task.get(ShellAsync).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellAsyncSubprocessDone))) => {
+                var shell_ls_task: *ShellAsyncSubprocessDone = task.get(ShellAsyncSubprocessDone).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellIOWriterAsyncDeinit))) => {
+                var shell_ls_task: *ShellIOWriterAsyncDeinit = task.get(ShellIOWriterAsyncDeinit).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellIOReaderAsyncDeinit))) => {
+                var shell_ls_task: *ShellIOReaderAsyncDeinit = task.get(ShellIOReaderAsyncDeinit).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellCondExprStatTask))) => {
+                var shell_ls_task: *ShellCondExprStatTask = task.get(ShellCondExprStatTask).?;
+                shell_ls_task.task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellCpTask))) => {
+                var shell_ls_task: *ShellCpTask = task.get(ShellCpTask).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellTouchTask))) => {
+                var shell_ls_task: *ShellTouchTask = task.get(ShellTouchTask).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellMkdirTask))) => {
+                var shell_ls_task: *ShellMkdirTask = task.get(ShellMkdirTask).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellLsTask))) => {
+                var shell_ls_task: *ShellLsTask = task.get(ShellLsTask).?;
+                shell_ls_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellMvBatchedTask))) => {
+                var shell_mv_batched_task: *ShellMvBatchedTask = task.get(ShellMvBatchedTask).?;
+                shell_mv_batched_task.task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellMvCheckTargetTask))) => {
+                var shell_mv_check_target_task: *ShellMvCheckTargetTask = task.get(ShellMvCheckTargetTask).?;
+                shell_mv_check_target_task.task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellRmTask))) => {
+                var shell_rm_task: *ShellRmTask = task.get(ShellRmTask).?;
+                shell_rm_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellRmDirTask))) => {
+                var shell_rm_task: *ShellRmDirTask = task.get(ShellRmDirTask).?;
+                shell_rm_task.runFromMainThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ShellGlobTask))) => {
+                var shell_glob_task: *ShellGlobTask = task.get(ShellGlobTask).?;
+                shell_glob_task.runFromMainThread();
+                shell_glob_task.deinit();
+            },
+            .FetchTasklet => {
+                var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
+                fetch_task.onProgressUpdate();
+            },
+            @field(Task.Tag, @typeName(AsyncGlobWalkTask)) => {
+                var globWalkTask: *AsyncGlobWalkTask = task.get(AsyncGlobWalkTask).?;
+                globWalkTask.*.runFromJS();
+                globWalkTask.deinit();
+            },
+            @field(Task.Tag, @typeName(AsyncTransformTask)) => {
+                var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
+                transform_task.*.runFromJS();
+                transform_task.deinit();
+            },
+            @field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
+                var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
+                transform_task.*.runFromJS();
+                transform_task.deinit();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
+                const transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
+                transform_task.*.runFromJS();
+            },
+            .ThreadSafeFunction => {
+                var transform_task: *ThreadSafeFunction = task.as(ThreadSafeFunction);
+                transform_task.onDispatch();
+            },
+            @field(Task.Tag, @typeName(ReadFileTask)) => {
+                var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
+                transform_task.*.runFromJS();
+                transform_task.deinit();
+            },
+            @field(Task.Tag, bun.meta.typeBaseName(@typeName(JSCDeferredWorkTask))) => {
+                var jsc_task: *JSCDeferredWorkTask = task.get(JSCDeferredWorkTask).?;
+                JSC.markBinding(@src());
+                jsc_task.run();
+            },
+            @field(Task.Tag, @typeName(WriteFileTask)) => {
+                var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
+                transform_task.*.runFromJS();
+                transform_task.deinit();
+            },
+            @field(Task.Tag, @typeName(HotReloadTask)) => {
+                const transform_task: *HotReloadTask = task.get(HotReloadTask).?;
+                transform_task.run();
+                transform_task.deinit();
+                // special case: we return
+                return false;
+            },
+            @field(Task.Tag, typeBaseName(@typeName(bun.bake.DevServer.HotReloadEvent))) => {
+                const hmr_task: *bun.bake.DevServer.HotReloadEvent = task.get(bun.bake.DevServer.HotReloadEvent).?;
+                hmr_task.run();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(FSWatchTask))) => {
+                var transform_task: *FSWatchTask = task.get(FSWatchTask).?;
+                transform_task.*.run();
+                transform_task.deinit();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
+                var any: *AnyTask = task.get(AnyTask).?;
+                any.run();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ManagedTask))) => {
+                var any: *ManagedTask = task.get(ManagedTask).?;
+                any.run();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
+                var any: *CppTask = task.get(CppTask).?;
+                any.run(global);
+            },
+            @field(Task.Tag, typeBaseName(@typeName(PollPendingModulesTask))) => {
+                virtual_machine.modules.onPoll();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(GetAddrInfoRequestTask))) => {
+                if (Environment.os == .windows) @panic("This should not be reachable on Windows");
+
+                var any: *GetAddrInfoRequestTask = task.get(GetAddrInfoRequestTask).?;
+                any.runFromJS();
+                any.deinit();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Stat))) => {
+                var any: *Stat = task.get(Stat).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Lstat))) => {
+                var any: *Lstat = task.get(Lstat).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Fstat))) => {
+                var any: *Fstat = task.get(Fstat).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Open))) => {
+                var any: *Open = task.get(Open).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ReadFile))) => {
+                var any: *ReadFile = task.get(ReadFile).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(WriteFile))) => {
+                var any: *WriteFile = task.get(WriteFile).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(CopyFile))) => {
+                var any: *CopyFile = task.get(CopyFile).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Read))) => {
+                var any: *Read = task.get(Read).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Write))) => {
+                var any: *Write = task.get(Write).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Truncate))) => {
+                var any: *Truncate = task.get(Truncate).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Writev))) => {
+                var any: *Writev = task.get(Writev).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Readv))) => {
+                var any: *Readv = task.get(Readv).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Rename))) => {
+                var any: *Rename = task.get(Rename).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(FTruncate))) => {
+                var any: *FTruncate = task.get(FTruncate).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Readdir))) => {
+                var any: *Readdir = task.get(Readdir).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ReaddirRecursive))) => {
+                var any: *ReaddirRecursive = task.get(ReaddirRecursive).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Close))) => {
+                var any: *Close = task.get(Close).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Rm))) => {
+                var any: *Rm = task.get(Rm).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Rmdir))) => {
+                var any: *Rmdir = task.get(Rmdir).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Chown))) => {
+                var any: *Chown = task.get(Chown).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(FChown))) => {
+                var any: *FChown = task.get(FChown).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Utimes))) => {
+                var any: *Utimes = task.get(Utimes).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Lutimes))) => {
+                var any: *Lutimes = task.get(Lutimes).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Chmod))) => {
+                var any: *Chmod = task.get(Chmod).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Fchmod))) => {
+                var any: *Fchmod = task.get(Fchmod).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Link))) => {
+                var any: *Link = task.get(Link).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Symlink))) => {
+                var any: *Symlink = task.get(Symlink).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Readlink))) => {
+                var any: *Readlink = task.get(Readlink).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Realpath))) => {
+                var any: *Realpath = task.get(Realpath).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Mkdir))) => {
+                var any: *Mkdir = task.get(Mkdir).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Fsync))) => {
+                var any: *Fsync = task.get(Fsync).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Fdatasync))) => {
+                var any: *Fdatasync = task.get(Fdatasync).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Access))) => {
+                var any: *Access = task.get(Access).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(AppendFile))) => {
+                var any: *AppendFile = task.get(AppendFile).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Mkdtemp))) => {
+                var any: *Mkdtemp = task.get(Mkdtemp).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Exists))) => {
+                var any: *Exists = task.get(Exists).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Futimes))) => {
+                var any: *Futimes = task.get(Futimes).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Lchmod))) => {
+                var any: *Lchmod = task.get(Lchmod).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Lchown))) => {
+                var any: *Lchown = task.get(Lchown).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(Unlink))) => {
+                var any: *Unlink = task.get(Unlink).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(NativeZlib))) => {
+                var any: *NativeZlib = task.get(NativeZlib).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(NativeBrotli))) => {
+                var any: *NativeBrotli = task.get(NativeBrotli).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ProcessWaiterThreadTask))) => {
+                bun.markPosixOnly();
+                var any: *ProcessWaiterThreadTask = task.get(ProcessWaiterThreadTask).?;
+                any.runFromJSThread();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(RuntimeTranspilerStore))) => {
+                var any: *RuntimeTranspilerStore = task.get(RuntimeTranspilerStore).?;
+                any.drain();
+            },
+            @field(Task.Tag, typeBaseName(@typeName(TimerObject))) => {
+                var any: *TimerObject = task.get(TimerObject).?;
+                any.runImmediateTask(virtual_machine);
+            },
+            @field(Task.Tag, typeBaseName(@typeName(ServerAllConnectionsClosedTask))) => {
+                var any: *ServerAllConnectionsClosedTask = task.get(ServerAllConnectionsClosedTask).?;
+                any.runFromJSThread(virtual_machine);
+            },
+            @field(Task.Tag, typeBaseName(@typeName(bun.bundle_v2.DeferredBatchTask))) => {
+                var any: *bun.bundle_v2.DeferredBatchTask = task.get(bun.bundle_v2.DeferredBatchTask).?;
+                any.runOnJSThread();
+            },
+
+            else => {
+                bun.Output.panic("Unexpected tag: {s}", .{@tagName(task.tag())});
+            },
+        }
+
+        this.drainMicrotasksWithGlobal(global, jsc_vm);
+        return true;
+    }
     fn tickQueueWithCount(this: *EventLoop, virtual_machine: *VirtualMachine, comptime queue_name: []const u8) u32 {
         var global = this.global;
         const global_vm = global.vm();
@@ -928,341 +1284,12 @@ pub const EventLoop = struct {
         }

         while (@field(this, queue_name).readItem()) |task| {
-            defer counter += 1;
-            switch (task.tag()) {
-                (… removed: the same per-tag task switch shown verbatim in
-                 runTask above, previously duplicated inline here …)
-            }
+            if (!this.runTask(task, global, global_vm, virtual_machine)) {
+                // Yield when hot reloading.
+                return 0;
+            }

             this.drainMicrotasksWithGlobal(global, global_vm);
+            counter += 1;
         }

         @field(this, queue_name).head = if (@field(this, queue_name).count == 0) 0 else @field(this, queue_name).head;
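The two hunks above de-duplicate a several-hundred-line switch: tickQueueWithCount now calls the extracted runTask, whose bool return encodes the one special case (hot reload) where the caller must stop draining. A reduced, runnable model of that control flow (illustrative names):

```zig
const std = @import("std");

// runTask() dispatches one task and returns false when the caller must
// yield immediately; drain() mirrors the rewritten tickQueueWithCount loop.
const Task = union(enum) {
    log: []const u8,
    hot_reload,
};

fn runTask(task: Task) bool {
    switch (task) {
        .log => |msg| std.debug.print("{s}\n", .{msg}),
        .hot_reload => return false, // special case: caller must yield
    }
    return true;
}

fn drain(tasks: []const Task) u32 {
    var counter: u32 = 0;
    for (tasks) |task| {
        if (!runTask(task)) return counter; // yield when hot reloading
        counter += 1;
    }
    return counter;
}

pub fn main() void {
    const tasks = [_]Task{ .{ .log = "a" }, .hot_reload, .{ .log = "never runs" } };
    std.debug.print("ran {d} tasks\n", .{drain(&tasks)});
}
```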
@@ -1430,18 +1457,14 @@ pub const EventLoop = struct {
             }
         }

         this.processGCTimer();
         if (!loop.isActive()) {
-            if (this.forever_timer == null) {
-                var t = uws.Timer.create(loop, this);
-                t.set(this, &noopForeverTimer, 1000 * 60 * 4, 1000 * 60 * 4);
-                this.forever_timer = t;
-            }
+            const timespec: bun.timespec = bun.timespec.msFromNow(std.time.ms_per_week);
+            loop.tickWithTimeout(&timespec);
+        } else {
             loop.tick();
         }

-        this.processGCTimer();
-
         ctx.onAfterEventLoop();
         this.tickConcurrent();
         this.tick();
@@ -1561,18 +1584,31 @@ pub const EventLoop = struct {
         this.next_immediate_tasks.writeItem(task) catch unreachable;
     }

+    pub const TimeoutTask = struct {
+        task: Task,
+        event_loop_timer: EventLoopTimer = .{
+            .tag = .TimeoutTask,
+        },
+
+        pub usingnamespace bun.New(@This());
+
+        pub fn create(vm: *JSC.VirtualMachine, task: Task, timeout: i32) void {
+            const this = TimeoutTask.new(.{
+                .task = task,
+            });
+            this.event_loop_timer.set(vm, timeout);
+        }
+
+        pub fn schedule(this: *TimeoutTask, vm: *JSC.VirtualMachine) void {
+            const task = this.task;
+            this.destroy();
+
+            _ = vm.event_loop.runTask(task, vm.global, vm.jsc, vm);
+        }
+    };
+
     pub fn enqueueTaskWithTimeout(this: *EventLoop, task: Task, timeout: i32) void {
-        // TODO: make this more efficient!
-        const loop = this.virtual_machine.uwsLoop();
-        var timer = uws.Timer.createFallthrough(loop, task.ptr());
-        timer.set(task.ptr(), callTask, timeout, 0);
-    }
-
-    pub fn callTask(timer: *uws.Timer) callconv(.C) void {
-        const task = Task.from(timer.as(*anyopaque));
-        defer timer.deinit(true);
-
-        JSC.VirtualMachine.get().enqueueTask(task);
+        TimeoutTask.create(this.virtual_machine, task, timeout);
     }

     pub fn ensureWaker(this: *EventLoop) void {
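enqueueTaskWithTimeout now routes through TimeoutTask and the VM's timer heap instead of allocating a uws timer per call. The lifecycle is create → timer fires → schedule, and schedule frees the wrapper before running the payload. A reduced, allocator-only model (no real timer is armed here):

```zig
const std = @import("std");

// Models the TimeoutTask lifecycle: create() heap-allocates a wrapper; when
// it "fires", schedule() moves the payload out, destroys the wrapper first,
// and only then runs the task, so the task body may enqueue new timeouts.
const TimeoutTask = struct {
    task: []const u8,

    fn create(allocator: std.mem.Allocator, task: []const u8) !*TimeoutTask {
        const this = try allocator.create(TimeoutTask);
        this.* = .{ .task = task };
        return this; // the real implementation also arms event_loop_timer here
    }

    fn schedule(this: *TimeoutTask, allocator: std.mem.Allocator) void {
        const task = this.task; // copy the payload out
        allocator.destroy(this); // free before running, as in the diff
        std.debug.print("running: {s}\n", .{task});
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const t = try TimeoutTask.create(gpa.allocator(), "enqueueTaskWithTimeout payload");
    t.schedule(gpa.allocator());
}
```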
@@ -2293,3 +2329,5 @@ pub const EventLoopTaskPtr = union {
     js: *ConcurrentTask,
     mini: *JSC.AnyTaskWithExtraContext,
 };
+
+pub const EventLoopTimer = @import("./api/Timer.zig").EventLoopTimer;
@@ -443,10 +443,6 @@ pub const StatWatcher = struct {
     ) catch |err| this.globalThis.reportActiveExceptionAsUnhandled(err);
 }

-pub fn onTimerInterval(timer: *uws.Timer) callconv(.C) void {
-    timer.ext(StatWatcher).?.restat();
-}
-
 pub fn init(args: Arguments) !*StatWatcher {
     log("init", .{});
@@ -1631,8 +1631,6 @@ pub const TestCommand = struct {
         vm_.runWithAPILock(Context, &ctx, Context.begin);
     }

-    fn timerNoop(_: *uws.Timer) callconv(.C) void {}
-
     pub fn run(
         reporter: *CommandLineReporter,
         vm: *JSC.VirtualMachine,
@@ -45,7 +45,9 @@ fn NativeSocketHandleType(comptime ssl: bool) type {
 pub const InternalLoopData = extern struct {
     pub const us_internal_async = opaque {};

-    sweep_timer: ?*Timer,
+    /// Do not use uws.Timer in any new code.
+    sweep_timer: ?*anyopaque,
+
     wakeup_async: ?*us_internal_async,
     last_write_failed: i32,
     head: ?*SocketContext,
@@ -2305,47 +2307,6 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type {
 pub const SocketTCP = NewSocketHandler(false);
 pub const SocketTLS = NewSocketHandler(true);

-pub const Timer = opaque {
-    pub fn create(loop: *Loop, ptr: anytype) *Timer {
-        const Type = @TypeOf(ptr);
-
-        // never fallthrough poll
-        // the problem is uSockets hardcodes it on the other end
-        // so we can never free non-fallthrough polls
-        return us_create_timer(loop, 0, @sizeOf(Type)) orelse std.debug.panic("us_create_timer: returned null: {d}", .{std.c._errno().*});
-    }
-
-    pub fn createFallthrough(loop: *Loop, ptr: anytype) *Timer {
-        const Type = @TypeOf(ptr);
-
-        // never fallthrough poll
-        // the problem is uSockets hardcodes it on the other end
-        // so we can never free non-fallthrough polls
-        return us_create_timer(loop, 1, @sizeOf(Type)) orelse std.debug.panic("us_create_timer: returned null: {d}", .{std.c._errno().*});
-    }
-
-    pub fn set(this: *Timer, ptr: anytype, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void {
-        us_timer_set(this, cb, ms, repeat_ms);
-        const value_ptr = us_timer_ext(this);
-        @setRuntimeSafety(false);
-        @as(*@TypeOf(ptr), @ptrCast(@alignCast(value_ptr))).* = ptr;
-    }
-
-    pub fn deinit(this: *Timer, comptime fallthrough: bool) void {
-        debug("Timer.deinit()", .{});
-        us_timer_close(this, @intFromBool(fallthrough));
-    }
-
-    pub fn ext(this: *Timer, comptime Type: type) ?*Type {
-        return @as(*Type, @ptrCast(@alignCast(us_timer_ext(this).*.?)));
-    }
-
-    pub fn as(this: *Timer, comptime Type: type) Type {
-        @setRuntimeSafety(false);
-        return @as(*?Type, @ptrCast(@alignCast(us_timer_ext(this)))).*.?;
-    }
-};
-
 pub const SocketContext = opaque {
     pub fn getNativeHandle(this: *SocketContext, comptime ssl: bool) *anyopaque {
         return us_socket_context_get_native_handle(@intFromBool(ssl), this).?;
@@ -2606,11 +2567,6 @@ pub const PosixLoop = extern struct {

 extern fn uws_loop_defer(loop: *Loop, ctx: *anyopaque, cb: *const (fn (ctx: *anyopaque) callconv(.C) void)) void;

-extern fn us_create_timer(loop: ?*Loop, fallthrough: i32, ext_size: c_uint) ?*Timer;
-extern fn us_timer_ext(timer: ?*Timer) *?*anyopaque;
-extern fn us_timer_close(timer: ?*Timer, fallthrough: i32) void;
-extern fn us_timer_set(timer: ?*Timer, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void;
-extern fn us_timer_loop(t: ?*Timer) ?*Loop;
 pub const us_socket_context_options_t = extern struct {
     key_file_name: [*c]const u8 = null,
     cert_file_name: [*c]const u8 = null,
@@ -4239,6 +4195,7 @@ pub const WindowsLoop = extern struct {
     is_default: c_int,
     pre: *uv.uv_prepare_t,
     check: *uv.uv_check_t,
+    tick_with_timeout_timer: ?*uv.Timer = null,

     pub fn get() *WindowsLoop {
         return uws_get_loop_with_native(bun.windows.libuv.Loop.get());
@@ -4268,8 +4225,30 @@ pub const WindowsLoop = extern struct {

     pub const wake = wakeup;

-    pub fn tickWithTimeout(this: *WindowsLoop, _: ?*const bun.timespec) void {
+    fn timeoutTimer(this: *WindowsLoop) ?*uv.Timer {
+        return this.tick_with_timeout_timer orelse {
+            this.tick_with_timeout_timer = bun.create(bun.default_allocator, uv.Timer, std.mem.zeroes(uv.Timer));
+            this.tick_with_timeout_timer.?.init(this.uv_loop);
+            return this.tick_with_timeout_timer.?;
+        };
+    }
+
+    fn onTimeout(_: *uv.Timer) callconv(.C) void {}
+
+    pub fn tickWithTimeout(this: *WindowsLoop, timeout: ?*const bun.timespec) void {
+        var did_schedule_timeout_timer = false;
+        if (timeout) |t| {
+            if (this.timeoutTimer()) |timer| {
+                timer.start(t.ms(), 0, onTimeout);
+                timer.ref();
+                did_schedule_timeout_timer = true;
+            }
+        }
         us_loop_run(this);
+
+        if (did_schedule_timeout_timer) {
+            this.tick_with_timeout_timer.?.unref();
+        }
     }

     pub fn tickWithoutIdle(this: *WindowsLoop) void {
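tickWithTimeout on Windows lazily creates one reusable uv timer, refs it only while a timeout is pending, and unrefs it after the tick so an idle timer cannot keep the loop alive. A standalone model of that ref/unref discipline (integers stand in for the libuv handles):

```zig
const std = @import("std");

// Models tickWithTimeout(): arm + ref a wake-up timer only when a timeout
// was requested, run the loop once, then unref so the timer never keeps an
// otherwise-idle loop alive.
const Loop = struct {
    wake_timer: ?i64 = null, // stands in for ?*uv.Timer
    refs: u32 = 0,

    fn tickWithTimeout(this: *Loop, timeout_ms: ?i64) void {
        var did_schedule = false;
        if (timeout_ms) |ms| {
            this.wake_timer = ms; // lazily "create" + start the timer
            this.refs += 1; // timer.ref(): keep the loop alive for the wake-up
            did_schedule = true;
        }
        std.debug.print("tick (refs={d})\n", .{this.refs}); // us_loop_run stand-in
        if (did_schedule) this.refs -= 1; // timer.unref() after the tick
    }
};

pub fn main() void {
    var loop = Loop{};
    loop.tickWithTimeout(1000);
    loop.tickWithTimeout(null);
}
```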
src/http.zig (+64 −64)
@@ -49,8 +49,14 @@ const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion;
const DeadSocket = opaque {};
var dead_socket = @as(*DeadSocket, @ptrFromInt(1));
//TODO: this needs to be freed when Worker Threads are implemented
var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(bun.default_allocator);
var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
var socket_async_http_abort_tracker = std.AutoHashMap(u32, uws.InternalSocket).init(bun.default_allocator);

const AsyncHTTPID = u31;

fn nextAsyncHttpID() AsyncHTTPID {
    const container = struct {
        pub var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
    };
    return @truncate(container.async_http_id.fetchAdd(1, .monotonic) % std.math.maxInt(u31));
}

const MAX_REDIRECT_URL_LENGTH = 128 * 1024;
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
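
nextAsyncHttpID() wraps a monotonically increasing atomic counter into 31 bits, so an id always fits in a u31 and leaves a spare bit free when packed next to a flag. The same wrap-around scheme sketched in TypeScript (hypothetical names, not the actual implementation):

let counter = 0; // backing 32-bit counter

function nextAsyncHttpID(): number {
  // Keep ids strictly below 2**31 - 1 so they always fit in 31 bits.
  const id = counter % (2 ** 31 - 1);
  counter = (counter + 1) >>> 0; // wrap the backing counter at 2**32
  return id;
}
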
@@ -1052,18 +1058,19 @@ pub const HTTPThread = struct {
    lazy_libdeflater: ?*LibdeflateState = null,

    const threadlog = Output.scoped(.HTTPThread, true);

    const ShutdownMessage = packed struct(u32) {
        async_http_id: AsyncHTTPID,
        is_tls: bool,
    };
    const WriteMessage = struct {
        data: []const u8,
        async_http_id: u32,
        async_http_id: AsyncHTTPID,
        flags: packed struct {
            is_tls: bool,
            ended: bool,
        },
    };
    const ShutdownMessage = struct {
        async_http_id: u32,
        is_tls: bool,
    };

    pub const LibdeflateState = struct {
        decompressor: *bun.libdeflate.Decompressor = undefined,
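
Because AsyncHTTPID is a u31, the new `packed struct(u32) { async_http_id: AsyncHTTPID, is_tls: bool }` occupies exactly 32 bits and can be copied around as a single integer. An equivalent manual encoding in TypeScript (hypothetical helpers, shown only to make the bit layout concrete):

function packShutdown(asyncHttpId: number, isTls: boolean): number {
  // Low 31 bits hold the id; the top bit holds the is_tls flag.
  return ((asyncHttpId & 0x7fffffff) | (isTls ? 0x80000000 : 0)) >>> 0;
}

function unpackShutdown(msg: number): { asyncHttpId: number; isTls: boolean } {
  return { asyncHttpId: msg & 0x7fffffff, isTls: msg >>> 31 === 1 };
}
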
@@ -1230,10 +1237,17 @@ pub const HTTPThread = struct {

    fn drainEvents(this: *@This()) void {
        {
            this.queued_shutdowns_lock.lock();
            defer this.queued_shutdowns_lock.unlock();
            for (this.queued_shutdowns.items) |http| {
                if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
        var shutdowns = brk: {
            this.queued_shutdowns_lock.lock();
            defer this.queued_shutdowns_lock.unlock();
            const shutdowns = this.queued_shutdowns;
            this.queued_shutdowns = .{};
            break :brk shutdowns;
        };
        defer shutdowns.deinit(bun.default_allocator);

        for (shutdowns.items) |http| {
            if (socket_async_http_abort_tracker.fetchRemove(http.async_http_id)) |socket_ptr| {
                if (http.is_tls) {
                    const socket = uws.SocketTLS.fromAny(socket_ptr.value);
                    // Do a fast shutdown here, since we are aborting and don't want to wait for the close_notify from the other side.
@@ -1244,7 +1258,6 @@ pub const HTTPThread = struct {
                }
            }
        }
        this.queued_shutdowns.clearRetainingCapacity();
    }
    {
        this.queued_writes_lock.lock();
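
The rewritten drainEvents() holds queued_shutdowns_lock only long enough to swap the queue out, then iterates the batch with the lock released, so producer threads are never blocked behind socket shutdown work and the old clearRetainingCapacity() call becomes unnecessary. The same take-ownership pattern in a single-threaded TypeScript sketch (hypothetical names):

let queuedShutdowns: number[] = [];

function drainShutdowns(abort: (id: number) => void) {
  // Take ownership of the current batch and reset the shared queue;
  // anything enqueued while we iterate lands in the fresh array.
  const batch = queuedShutdowns;
  queuedShutdowns = [];
  for (const id of batch) abort(id);
}
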
@@ -1506,6 +1519,7 @@ fn registerAbortTracker(
    socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
    if (client.signals.aborted != null) {
        client.state.flags.did_register_abort_tracker = true;
        socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable;
    }
}

@@ -1514,7 +1528,8 @@ fn unregisterAbortTracker(
    client: *HTTPClient,
) void {
    if (client.signals.aborted != null) {
        _ = socket_async_http_abort_tracker.swapRemove(client.async_http_id);
        client.state.flags.did_register_abort_tracker = false;
        _ = socket_async_http_abort_tracker.remove(client.async_http_id);
    }
}
@@ -1924,6 +1939,10 @@ pub const InternalState = struct {
    is_redirect_pending: bool = false,
    is_libdeflate_fast_path_disabled: bool = false,
    resend_request_body_on_redirect: bool = false,

    /// Attempt to register to receive AbortSignal while the socket has not connected yet.
    /// It's fine for this to be wrong sometimes.
    did_register_abort_tracker: bool = false,
};

pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
@@ -2149,7 +2168,7 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunnel: ?*ProxyTunnel = null,
signals: Signals = .{},
async_http_id: u32 = 0,
async_http_id: AsyncHTTPID = 0,
hostname: ?[]u8 = null,
unix_socket_path: JSC.ZigString.Slice = JSC.ZigString.Slice.empty,
@@ -2320,7 +2339,7 @@ pub const AsyncHTTP = struct {
    waitingDeffered: bool = false,
    finalized: bool = false,
    err: ?anyerror = null,
    async_http_id: u32 = 0,
    async_http_id: AsyncHTTPID = 0,

    state: AtomicState = AtomicState.init(State.pending),
    elapsed: u64 = 0,
@@ -2468,7 +2487,7 @@ pub const AsyncHTTP = struct {
    .result_callback = callback,
    .http_proxy = options.http_proxy,
    .signals = options.signals orelse .{},
    .async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id.fetchAdd(1, .monotonic) else 0,
    .async_http_id = if (options.signals != null and options.signals.?.aborted != null) nextAsyncHttpID() else 0,
};

this.client = .{
@@ -2721,7 +2740,9 @@ pub const AsyncHTTP = struct {
    }

    if (socket_async_http_abort_tracker.capacity() > 10_000 and socket_async_http_abort_tracker.count() < 100) {
        socket_async_http_abort_tracker.shrinkAndFree(socket_async_http_abort_tracker.count());
        const cloned = socket_async_http_abort_tracker.clone() catch bun.outOfMemory();
        socket_async_http_abort_tracker.deinit();
        socket_async_http_abort_tracker = cloned;
    }

    if (result.has_more) {
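
The diff replaces shrinkAndFree() (an AutoArrayHashMap API that std.AutoHashMap lacks) with a clone-and-swap: clone() allocates for the live entries rather than the old capacity, so swapping the clone in is what releases the oversized table. The closest JS analogue is rebuilding a Map from its surviving entries (illustrative only; JS engines manage Map storage internally):

let abortTracker = new Map<number, unknown>();

function maybeCompact() {
  // Rebuild from the live entries so internal storage accumulated
  // while the map was large can be released (engine-dependent).
  abortTracker = new Map(abortTracker);
}
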
@@ -2991,6 +3012,15 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
        this.fail(error.ConnectionClosed);
        return;
    }

    // A connecting socket won't have called onOpen yet,
    // so if we want AbortSignal.timeout() to work on it, we need to
    // register the abort tracker manually.
    if (socket.socket == .connecting) {
        if (!socket.isClosed() and !this.state.flags.did_register_abort_tracker) {
            this.registerAbortTracker(is_ssl, socket);
        }
    }
}

const Task = ThreadPool.Task;
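
Registering the tracker while the socket is still connecting is what lets AbortSignal.timeout() cancel a fetch() whose TCP/TLS handshake has not completed yet. Observable from JavaScript (standalone sketch; the host is just an example):

try {
  // The timeout may fire before the connection is even established.
  await fetch("https://example.com", { signal: AbortSignal.timeout(1) });
} catch (err) {
  console.log((err as Error).name); // "TimeoutError"
}
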
@@ -1,23 +1,53 @@
import type { Server } from "bun";
for (let beforeHeaders of [true, false]) {
  for (let abortTimeout of [1, 2, 0]) {
    const count = 100;
    let defer = Promise.withResolvers();
    const log = `[${beforeHeaders ? "beforeHeaders" : "afterHeaders"}] ${abortTimeout} timeout`;
    console.time(log);
    using server = Bun.serve({
      port: 0,
      idleTimeout: 0,
      development: false,

using server = Bun.serve({
  port: 0,
  async fetch() {
    const signal = AbortSignal.timeout(1);
    return await fetch("https://example.com", { signal });
  },
});

      async fetch() {
        if (beforeHeaders) {
          await defer.promise;
          throw new Error("Never going to happen");
        } else {
          return new Response(
            new ReadableStream({
              async pull(controller) {
                controller.enqueue("a");
                await defer.promise;
              },
            }),
          );
        }
      },
    });

function hostname(server: Server) {
  if (server.hostname.startsWith(":")) return `[${server.hostname}]`;
  return server.hostname;

    let responses = new Array(count);

    for (let i = 0; i < count; i++) {
      const defer2 = Promise.withResolvers();
      fetch(server.url, { signal: AbortSignal.timeout(abortTimeout) })
        .then(response => {
          if (beforeHeaders) {
            defer2.reject(new Error("One of the requests succeeded"));
          } else {
            return response.arrayBuffer();
          }
        })
        .catch(err => {
          if (err.name !== "TimeoutError") {
            defer2.reject(err);
          } else {
            defer2.resolve();
          }
        });
      responses[i] = defer2.promise;
    }
    await Promise.all(responses);
    console.timeEnd(log);
  }
}

let url = `http://${hostname(server)}:${server.port}/`;

const responses: Response[] = [];
for (let i = 0; i < 10; i++) {
  responses.push(await fetch(url));
}
// we fail if any of the requests succeeded
process.exit(responses.every(res => res.status === 500) ? 0 : 1);
@@ -28,16 +28,7 @@ describe("AbortSignal", () => {
      cwd: tmpdir(),
    });

    const exitCode = await Promise.race([
      server.exited,
      (async () => {
        await Bun.sleep(5000);
        server.kill();
        return 2;
      })(),
    ]);

    expect(exitCode).toBe(0);
    expect(await server.exited).toBe(0);
  });

  test("AbortSignal.any() should fire abort event", async () => {

@@ -110,15 +110,11 @@ describe("AbortSignal", () => {
      expect(() => AbortSignal.timeout(timeout)).toThrow(TypeError);
    });
  }
  // FIXME: test runner hangs when this is enabled
  test.skip("timeout works", done => {
  test("timeout works", done => {
    const abort = AbortSignal.timeout(1);
    abort.addEventListener("abort", event => {
      done();
    });
    // AbortSignal.timeout doesn't keep the event loop / process alive,
    // so we set a no-op timeout.
    setTimeout(() => {}, 10);
  });
});
describe("prototype", () => {
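
The no-op setTimeout in that test matters: AbortSignal.timeout() schedules its abort on a timer that does not keep the process alive, so a script with no other pending work can exit before the event fires. A quick way to observe this (illustrative standalone snippet):

// Likely prints nothing: the signal's internal timer does not keep
// the process alive, so the script may exit before the abort fires.
AbortSignal.timeout(10).addEventListener("abort", () => {
  console.log("aborted");
});
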
test/js/web/fetch/abortsignal-standalone.test.ts (new file, 26 lines)
@@ -0,0 +1,26 @@
import { test, expect } from "bun:test";

for (let timeout of [1, 2, 0]) {
  test(`AbortSignal.timeout(${timeout})`, async () => {
    const count = 10_000;

    const promises = new Array(count);

    const signals = new Array(count);
    console.time("[" + count + "x] " + "AbortSignal.timeout(" + timeout + ")");
    for (let i = 0; i < count; i++) {
      const signal = AbortSignal.timeout(timeout);
      const { promise, resolve, reject } = Promise.withResolvers();
      promises[i] = promise;
      signals[i] = signal;
      signal.addEventListener("abort", () => {
        resolve();
      });
    }
    console.timeEnd("[" + count + "x] " + "AbortSignal.timeout(" + timeout + ")");

    console.time("[" + count + "x] " + "await Promise.all(promises)");
    await Promise.all(promises);
    console.timeEnd("[" + count + "x] " + "await Promise.all(promises)");
  });
}