Fix assertion failure in spawn-related tests (#4400)

* Clean up some of the event loop code

* Support timeouts

* Defer freeing FilePoll

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2023-08-29 21:17:56 -07:00
committed by GitHub
parent a846852818
commit c028b206bc
9 changed files with 167 additions and 72 deletions

View File

@@ -24,10 +24,13 @@
void Bun__internal_dispatch_ready_poll(void* loop, void* poll);
// void Bun__internal_dispatch_ready_poll(void* loop, void* poll) {}
void us_loop_run_bun_tick(struct us_loop_t *loop);
#ifndef WIN32
/* Cannot include this one on Windows */
#include <unistd.h>
#include <stdint.h>
#endif
void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs);
/* Pointer tags are used to indicate a Bun pointer versus a uSockets pointer */
#define UNSET_BITS_49_UNTIL_64 0x0000FFFFFFFFFFFF
@@ -172,7 +175,7 @@ void us_loop_run(struct us_loop_t *loop) {
}
void us_loop_run_bun_tick(struct us_loop_t *loop) {
void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs) {
us_loop_integrate(loop);
if (loop->num_polls == 0)
@@ -183,10 +186,20 @@ void us_loop_run_bun_tick(struct us_loop_t *loop) {
/* Fetch ready polls */
#ifdef LIBUS_USE_EPOLL
loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
if (timeoutMs > 0) {
loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, (int)timeoutMs);
} else {
loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
}
#else
struct timespec ts = {0, 0};
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
if (timeoutMs > 0) {
struct timespec ts = {0, 0};
ts.tv_sec = timeoutMs / 1000;
ts.tv_nsec = (timeoutMs % 1000) * 1000000;
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, &ts);
} else {
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
}
#endif
/* Iterate ready polls, dispatching them by type */

View File

@@ -1706,6 +1706,7 @@ pub const FilePoll = struct {
/// on macOS kevent64 has an extra pointer field so we use it for that
/// linux doesn't have a field like that
generation_number: KQueueGenerationNumber = 0,
next_to_free: ?*FilePoll = null,
const FileReader = JSC.WebCore.FileReader;
const FileSink = JSC.WebCore.FileSink;
@@ -1789,20 +1790,21 @@ pub const FilePoll = struct {
this.deinitWithVM(vm);
}
pub fn deinitWithoutVM(this: *FilePoll, loop: *uws.Loop, polls: *JSC.FilePoll.HiveArray) void {
fn deinitPossiblyDefer(this: *FilePoll, vm: *JSC.VirtualMachine, loop: *uws.Loop, polls: *JSC.FilePoll.Store) void {
if (this.isRegistered()) {
_ = this.unregister(loop);
}
this.owner = Deactivated.owner;
const was_ever_registered = this.flags.contains(.was_ever_registered);
this.flags = Flags.Set{};
this.fd = invalid_fd;
polls.put(this);
polls.put(this, vm, was_ever_registered);
}
pub fn deinitWithVM(this: *FilePoll, vm: *JSC.VirtualMachine) void {
var loop = vm.event_loop_handle.?;
this.deinitWithoutVM(loop, vm.rareData().filePolls(vm));
this.deinitPossiblyDefer(vm, loop, vm.rareData().filePolls(vm));
}
pub fn isRegistered(this: *const FilePoll) bool {
@@ -1888,6 +1890,9 @@ pub const FilePoll = struct {
nonblocking,
was_ever_registered,
ignore_updates,
pub fn poll(this: Flags) Flags {
return switch (this) {
.readable => .poll_readable,
@@ -1949,7 +1954,64 @@ pub const FilePoll = struct {
}
};
pub const HiveArray = bun.HiveArray(FilePoll, 128).Fallback;
const HiveArray = bun.HiveArray(FilePoll, 128).Fallback;
// We defer freeing FilePoll until the end of the next event loop iteration
// This ensures that we don't free a FilePoll before the next callback is called
pub const Store = struct {
hive: HiveArray,
pending_free_head: ?*FilePoll = null,
pending_free_tail: ?*FilePoll = null,
const log = Output.scoped(.FilePoll, false);
pub fn init(allocator: std.mem.Allocator) Store {
return .{
.hive = HiveArray.init(allocator),
};
}
pub fn get(this: *Store) *FilePoll {
return this.hive.get();
}
pub fn processDeferredFrees(this: *Store) void {
var next = this.pending_free_head;
while (next) |current| {
next = current.next_to_free;
current.next_to_free = null;
this.hive.put(current);
}
this.pending_free_head = null;
this.pending_free_tail = null;
}
pub fn put(this: *Store, poll: *FilePoll, vm: *JSC.VirtualMachine, ever_registered: bool) void {
if (!ever_registered) {
this.hive.put(poll);
return;
}
std.debug.assert(poll.next_to_free == null);
if (this.pending_free_tail) |tail| {
std.debug.assert(this.pending_free_head != null);
std.debug.assert(tail.next_to_free == null);
tail.next_to_free = poll;
}
if (this.pending_free_head == null) {
this.pending_free_head = poll;
std.debug.assert(this.pending_free_tail == null);
}
poll.flags.insert(.ignore_updates);
this.pending_free_tail = poll;
std.debug.assert(vm.after_event_loop_callback == null or vm.after_event_loop_callback == @as(?JSC.OpaqueCallback, @ptrCast(&processDeferredFrees)));
vm.after_event_loop_callback = @ptrCast(&processDeferredFrees);
vm.after_event_loop_callback_ctx = this;
}
};
const log = Output.scoped(.FilePoll, false);
@@ -1999,7 +2061,6 @@ pub const FilePoll = struct {
pub fn activate(this: *FilePoll, loop: *uws.Loop) void {
loop.num_polls += @as(i32, @intFromBool(!this.flags.contains(.has_incremented_poll_count)));
loop.active += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count)));
this.flags.insert(.has_incremented_poll_count);
}
@@ -2012,6 +2073,8 @@ pub const FilePoll = struct {
poll.fd = @intCast(fd);
poll.flags = Flags.Set.init(flags);
poll.owner = owner;
poll.next_to_free = null;
if (KQueueGenerationNumber != u0) {
max_generation_number +%= 1;
poll.generation_number = max_generation_number;
@@ -2052,7 +2115,11 @@ pub const FilePoll = struct {
if (tag.tag() != @field(Pollable.Tag, "FilePoll"))
return;
var file_poll = tag.as(FilePoll);
var file_poll: *FilePoll = tag.as(FilePoll);
if (file_poll.flags.contains(.ignore_updates)) {
return;
}
if (comptime Environment.isMac)
onKQueueEvent(file_poll, loop, &loop.ready_polls[@as(usize, @intCast(loop.current_ready_poll))])
else if (comptime Environment.isLinux)
@@ -2107,7 +2174,7 @@ pub const FilePoll = struct {
@as(std.os.fd_t, @intCast(fd)),
&event,
);
this.flags.insert(.was_ever_registered);
if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
return errno;
}
@@ -2180,6 +2247,8 @@ pub const FilePoll = struct {
}
};
this.flags.insert(.was_ever_registered);
// If an error occurs while
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
@@ -2349,7 +2418,6 @@ pub const FilePoll = struct {
this.flags.remove(.poll_writable);
this.flags.remove(.poll_process);
this.flags.remove(.poll_machport);
if (this.isActive())
this.deactivate(loop);

View File

@@ -737,6 +737,31 @@ pub const EventLoop = struct {
if (loop.num_polls > 0 or loop.active > 0) {
loop.tick();
this.processGCTimer();
ctx.onAfterEventLoop();
// this.afterUSocketsTick();
}
}
pub fn autoTickWithTimeout(this: *EventLoop, timeoutMs: i64) void {
var ctx = this.virtual_machine;
var loop = ctx.event_loop_handle.?;
// Some tasks need to keep the event loop alive for one more tick.
// We want to keep the event loop alive long enough to process those ticks and any microtasks
//
// BUT. We don't actually have an idle event in that case.
// That means the process will be waiting forever on nothing.
// So we need to drain the counter immediately before entering uSockets loop
const pending_unref = ctx.pending_unref_counter;
if (pending_unref > 0) {
ctx.pending_unref_counter = 0;
loop.unrefCount(pending_unref);
}
if (loop.num_polls > 0 or loop.active > 0) {
loop.tickWithTimeout(timeoutMs);
this.processGCTimer();
ctx.onAfterEventLoop();
// this.afterUSocketsTick();
}
}
@@ -761,6 +786,7 @@ pub const EventLoop = struct {
loop.tick();
this.processGCTimer();
ctx.onAfterEventLoop();
this.tickConcurrent();
this.tick();
}
@@ -783,6 +809,7 @@ pub const EventLoop = struct {
if (loop.active > 0) {
loop.tick();
this.processGCTimer();
ctx.onAfterEventLoop();
// this.afterUSocketsTick();
}
}
@@ -817,26 +844,6 @@ pub const EventLoop = struct {
this.global.handleRejectedPromises();
}
pub fn runUSocketsLoop(this: *EventLoop) void {
var ctx = this.virtual_machine;
ctx.global.vm().releaseWeakRefs();
ctx.global.vm().drainMicrotasks();
var loop = ctx.event_loop_handle orelse return;
if (loop.active > 0 or (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered and (loop.num_polls > 0 or this.start_server_on_next_tick))) {
if (this.tickConcurrentWithCount() > 0) {
this.tick();
}
ctx.is_us_loop_entered = true;
this.start_server_on_next_tick = false;
ctx.enterUWSLoop();
ctx.is_us_loop_entered = false;
ctx.autoGarbageCollect();
}
}
pub fn waitForPromise(this: *EventLoop, promise: JSC.AnyPromise) void {
switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
@@ -852,6 +859,8 @@ pub const EventLoop = struct {
}
}
// TODO: this implementation is terrible
// we should not be checking the millitimestamp every time
pub fn waitForPromiseWithTimeout(this: *EventLoop, promise: JSC.AnyPromise, timeout: u32) bool {
return switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
@@ -862,12 +871,13 @@ pub const EventLoop = struct {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
if (std.time.milliTimestamp() - start_time > timeout) {
return false;
}
if (promise.status(this.global.vm()) == .Pending) {
this.autoTick();
const remaining = std.time.milliTimestamp() - start_time;
if (remaining >= timeout) {
return false;
}
this.autoTickWithTimeout(remaining);
}
}
return true;
@@ -876,21 +886,6 @@ pub const EventLoop = struct {
};
}
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
while (this.tasks.count > 0) {
this.tick();
if (this.virtual_machine.event_loop_handle != null) {
this.runUSocketsLoop();
}
} else {
if (this.virtual_machine.event_loop_handle != null) {
this.runUSocketsLoop();
}
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
this.tasks.writeItem(task) catch unreachable;
}

View File

@@ -449,6 +449,9 @@ pub const VirtualMachine = struct {
transpiler_store: JSC.RuntimeTranspilerStore,
after_event_loop_callback_ctx: ?*anyopaque = null,
after_event_loop_callback: ?OpaqueCallback = null,
/// The arguments used to launch the process _after_ the script name and bun and any flags applied to Bun
/// "bun run foo --bar"
/// ["--bar"]
@@ -479,7 +482,6 @@ pub const VirtualMachine = struct {
active_tasks: usize = 0,
rare_data: ?*JSC.RareData = null,
us_loop_reference_count: usize = 0,
is_us_loop_entered: bool = false,
pending_internal_promise: *JSC.JSInternalPromise = undefined,
auto_install_dependencies: bool = false,
@@ -533,6 +535,21 @@ pub const VirtualMachine = struct {
return this.rareData().mimeTypeFromString(this.allocator, str);
}
pub fn onAfterEventLoop(this: *VirtualMachine) void {
if (this.after_event_loop_callback) |cb| {
var ctx = this.after_event_loop_callback_ctx;
this.after_event_loop_callback = null;
this.after_event_loop_callback_ctx = null;
cb(ctx);
}
}
pub fn isEventLoopAlive(vm: *const VirtualMachine) bool {
return vm.active_tasks > 0 or
vm.event_loop_handle.?.active > 0 or
vm.event_loop.tasks.count > 0;
}
const SourceMapHandlerGetter = struct {
vm: *VirtualMachine,
printer: *js_printer.BufferPrinter,
@@ -732,7 +749,7 @@ pub const VirtualMachine = struct {
this.exit_handler.dispatchOnBeforeExit();
var dispatch = false;
while (true) {
while (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) : (dispatch = true) {
while (this.isEventLoopAlive()) : (dispatch = true) {
this.tick();
this.eventLoop().autoTickActive();
}
@@ -741,7 +758,7 @@ pub const VirtualMachine = struct {
this.exit_handler.dispatchOnBeforeExit();
dispatch = false;
if (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) continue;
if (this.isEventLoopAlive()) continue;
}
break;
@@ -884,7 +901,7 @@ pub const VirtualMachine = struct {
this.eventLoop().tick();
while (true) {
while (this.eventLoop().tasks.count > 0 or this.active_tasks > 0 or this.event_loop_handle.?.active > 0) {
while (this.isEventLoopAlive()) {
this.tick();
this.eventLoop().autoTickActive();
}

View File

@@ -26,7 +26,7 @@ hot_map: ?HotMap = null,
tail_cleanup_hook: ?*CleanupHook = null,
cleanup_hook: ?*CleanupHook = null,
file_polls_: ?*JSC.FilePoll.HiveArray = null,
file_polls_: ?*JSC.FilePoll.Store = null,
global_dns_data: ?*JSC.DNS.GlobalData = null,
@@ -102,10 +102,10 @@ pub const HotMap = struct {
}
};
pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.HiveArray {
pub fn filePolls(this: *RareData, vm: *JSC.VirtualMachine) *JSC.FilePoll.Store {
return this.file_polls_ orelse {
this.file_polls_ = vm.allocator.create(JSC.FilePoll.HiveArray) catch unreachable;
this.file_polls_.?.* = JSC.FilePoll.HiveArray.init(vm.allocator);
this.file_polls_ = vm.allocator.create(JSC.FilePoll.Store) catch unreachable;
this.file_polls_.?.* = JSC.FilePoll.Store.init(vm.allocator);
return this.file_polls_.?;
};
}

View File

@@ -263,8 +263,7 @@ pub const WebWorker = struct {
this.setStatus(.running);
// don't run the GC if we don't actually need to
if (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or
vm.event_loop_handle.?.active > 0 or
if (vm.isEventLoopAlive() or
vm.eventLoop().tickConcurrentWithCount() > 0)
{
vm.global.vm().releaseWeakRefs();
@@ -275,7 +274,7 @@ pub const WebWorker = struct {
// always doing a first tick so we call CppTask without delay after dispatchOnline
vm.tick();
while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
while (vm.isEventLoopAlive()) {
vm.tick();
if (this.requested_terminate) break;
vm.eventLoop().autoTickActive();

View File

@@ -297,8 +297,7 @@ pub const Run = struct {
}
// don't run the GC if we don't actually need to
if (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or
vm.event_loop_handle.?.active > 0 or
if (vm.isEventLoopAlive() or
vm.eventLoop().tickConcurrentWithCount() > 0)
{
vm.global.vm().releaseWeakRefs();
@@ -315,7 +314,7 @@ pub const Run = struct {
}
while (true) {
while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
while (vm.isEventLoopAlive()) {
vm.tick();
// Report exceptions in hot-reloaded modules
@@ -343,7 +342,7 @@ pub const Run = struct {
vm.onUnhandledError(this.vm.global, this.vm.pending_internal_promise.result(vm.global.vm()));
}
} else {
while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
while (vm.isEventLoopAlive()) {
vm.tick();
vm.eventLoop().autoTickActive();
}

View File

@@ -835,7 +835,7 @@ pub const TestCommand = struct {
vm.eventLoop().tickPossiblyForever();
while (true) {
while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.event_loop_handle.?.active > 0) {
while (vm.isEventLoopAlive()) {
vm.tick();
vm.eventLoop().autoTickActive();
}

View File

@@ -827,7 +827,11 @@ pub const Loop = extern struct {
}
pub fn tick(this: *Loop) void {
us_loop_run_bun_tick(this);
us_loop_run_bun_tick(this, 0);
}
pub fn tickWithTimeout(this: *Loop, timeoutMs: i64) void {
us_loop_run_bun_tick(this, timeoutMs);
}
pub fn nextTick(this: *Loop, comptime UserType: type, user_data: UserType, comptime deferCallback: fn (ctx: UserType) void) void {
@@ -889,7 +893,7 @@ pub const Loop = extern struct {
extern fn us_loop_free(loop: ?*Loop) void;
extern fn us_loop_ext(loop: ?*Loop) ?*anyopaque;
extern fn us_loop_run(loop: ?*Loop) void;
extern fn us_loop_run_bun_tick(loop: ?*Loop) void;
extern fn us_loop_run_bun_tick(loop: ?*Loop, timouetMs: i64) void;
extern fn us_wakeup_loop(loop: ?*Loop) void;
extern fn us_loop_integrate(loop: ?*Loop) void;
extern fn us_loop_iteration_number(loop: ?*Loop) c_longlong;