Compare commits

...

2 Commits

Author SHA1 Message Date
Claude Bot
74b6e704d8 Coalesce kqueue events to handle multiple filters per fd
On kqueue (macOS), a single file descriptor can have multiple events
(EVFILT_READ and EVFILT_WRITE) which arrive as separate kevent structures.
This could cause the same poll to be dispatched multiple times per event
loop tick.

This change implements event coalescing:
1. Sort events by udata (poll pointer) using qsort
2. Coalesce adjacent events with matching udata into a single logical event
3. Mark duplicate events as skipped
4. Dispatch only the coalesced events

This ensures each poll is dispatched at most once per event loop iteration,
with all of its ready events (readable/writable/error/eof) combined.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-23 05:21:02 +00:00
Jarred Sumner
9e0d78e711 Fix kqueue readable & writable check 2025-10-22 20:46:14 -07:00

View File

@@ -24,6 +24,26 @@
void Bun__internal_dispatch_ready_poll(void* loop, void* poll);
// void Bun__internal_dispatch_ready_poll(void* loop, void* poll) {}
#ifdef LIBUS_USE_KQUEUE
/* Structure for coalescing kqueue events */
struct KQueueEvent {
unsigned int writable : 1;
unsigned int readable : 1;
unsigned int error : 1;
unsigned int eof : 1;
unsigned int skipped : 1;
};
/* Comparison function for sorting kevents by udata */
static int kqueue_event_compare(const void *a, const void *b) {
const struct kevent64_s *ea = (const struct kevent64_s *)a;
const struct kevent64_s *eb = (const struct kevent64_s *)b;
if (ea->udata < eb->udata) return -1;
if (ea->udata > eb->udata) return 1;
return 0;
}
#endif
#ifndef WIN32
/* Cannot include this one on Windows */
#include <unistd.h>
@@ -203,10 +223,61 @@ void us_loop_run(struct us_loop_t *loop) {
do {
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
} while (IS_EINTR(loop->num_ready_polls));
/* For kqueue, coalesce events by udata to handle multiple filters per fd */
struct KQueueEvent coalesced_events[1024];
memset(coalesced_events, 0, sizeof(coalesced_events));
/* First pass: sort events by udata */
if (loop->num_ready_polls > 0) {
qsort(loop->ready_polls, loop->num_ready_polls, sizeof(struct kevent64_s), kqueue_event_compare);
}
/* Second pass: coalesce events and fill KQueueEvent struct */
uint64_t prev_udata = 0;
int prev_index = -1;
for (int i = 0; i < loop->num_ready_polls; i++) {
const struct kevent64_s* current_kevent = &loop->ready_polls[i];
const uint64_t current_udata = current_kevent->udata;
const int16_t filter = current_kevent->filter;
const uint16_t flags = current_kevent->flags;
const uint32_t fflags = current_kevent->fflags;
int target_index = i;
/* If this udata matches previous, coalesce into previous event */
if (i > 0 && current_udata == prev_udata && prev_index >= 0) {
target_index = prev_index;
coalesced_events[i].skipped = 1;
} else {
prev_index = i;
prev_udata = current_udata;
}
/* Accumulate event flags */
if (filter == EVFILT_READ) {
coalesced_events[target_index].readable = 1;
} else if (filter == EVFILT_WRITE) {
coalesced_events[target_index].writable = 1;
}
if (flags & EV_ERROR) {
coalesced_events[target_index].error = 1;
}
if (flags & EV_EOF) {
coalesced_events[target_index].eof = 1;
}
}
#endif
/* Iterate ready polls, dispatching them by type */
for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
#ifdef LIBUS_USE_KQUEUE
/* Skip coalesced events */
if (coalesced_events[loop->current_ready_poll].skipped) {
continue;
}
#endif
struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
/* Any ready poll marked with nullptr will be ignored */
if (LIKELY(poll)) {
@@ -219,19 +290,12 @@ void us_loop_run(struct us_loop_t *loop) {
const int error = events & EPOLLERR;
const int eof = events & EPOLLHUP;
#else
const struct kevent64_s* current_kevent = &loop->ready_polls[loop->current_ready_poll];
const int16_t filter = current_kevent->filter;
const uint16_t flags = current_kevent->flags;
const uint32_t fflags = current_kevent->fflags;
// > Multiple events which trigger the filter do not result in multiple kevents being placed on the kqueue
// > Instead, the filter will aggregate the events into a single kevent struct
// Note: EV_ERROR only sets the error in data as part of changelist. Not in this call!
/* Use coalesced event data */
int events = 0
| ((filter & EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0)
| ((filter & EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0);
const int error = (flags & (EV_ERROR)) ? ((int)fflags || 1) : 0;
const int eof = (flags & (EV_EOF));
| (coalesced_events[loop->current_ready_poll].readable ? LIBUS_SOCKET_READABLE : 0)
| (coalesced_events[loop->current_ready_poll].writable ? LIBUS_SOCKET_WRITABLE : 0);
const int error = coalesced_events[loop->current_ready_poll].error;
const int eof = coalesced_events[loop->current_ready_poll].eof;
#endif
/* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
events &= us_poll_events(poll);
@@ -264,7 +328,7 @@ void us_loop_run_bun_tick(struct us_loop_t *loop, const struct timespec* timeout
us_internal_loop_pre(loop);
if (loop->data.jsc_vm)
if (loop->data.jsc_vm)
Bun__JSC_onBeforeWait(loop->data.jsc_vm);
/* Fetch ready polls */
@@ -274,11 +338,62 @@ void us_loop_run_bun_tick(struct us_loop_t *loop, const struct timespec* timeout
do {
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, timeout);
} while (IS_EINTR(loop->num_ready_polls));
/* For kqueue, coalesce events by udata to handle multiple filters per fd */
struct KQueueEvent coalesced_events[1024];
memset(coalesced_events, 0, sizeof(coalesced_events));
/* First pass: sort events by udata */
if (loop->num_ready_polls > 0) {
qsort(loop->ready_polls, loop->num_ready_polls, sizeof(struct kevent64_s), kqueue_event_compare);
}
/* Second pass: coalesce events and fill KQueueEvent struct */
uint64_t prev_udata = 0;
int prev_index = -1;
for (int i = 0; i < loop->num_ready_polls; i++) {
const struct kevent64_s* current_kevent = &loop->ready_polls[i];
const uint64_t current_udata = current_kevent->udata;
const int16_t filter = current_kevent->filter;
const uint16_t flags = current_kevent->flags;
const uint32_t fflags = current_kevent->fflags;
int target_index = i;
/* If this udata matches previous, coalesce into previous event */
if (i > 0 && current_udata == prev_udata && prev_index >= 0) {
target_index = prev_index;
coalesced_events[i].skipped = 1;
} else {
prev_index = i;
prev_udata = current_udata;
}
/* Accumulate event flags */
if (filter == EVFILT_READ) {
coalesced_events[target_index].readable = 1;
} else if (filter == EVFILT_WRITE) {
coalesced_events[target_index].writable = 1;
}
if (flags & EV_ERROR) {
coalesced_events[target_index].error = 1;
}
if (flags & EV_EOF) {
coalesced_events[target_index].eof = 1;
}
}
#endif
/* Iterate ready polls, dispatching them by type */
for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
#ifdef LIBUS_USE_KQUEUE
/* Skip coalesced events */
if (coalesced_events[loop->current_ready_poll].skipped) {
continue;
}
#endif
struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
/* Any ready poll marked with nullptr will be ignored */
if (LIKELY(poll)) {
@@ -291,21 +406,12 @@ void us_loop_run_bun_tick(struct us_loop_t *loop, const struct timespec* timeout
const int error = events & EPOLLERR;
const int eof = events & EPOLLHUP;
#else
const struct kevent64_s* current_kevent = &loop->ready_polls[loop->current_ready_poll];
const int16_t filter = current_kevent->filter;
const uint16_t flags = current_kevent->flags;
const uint32_t fflags = current_kevent->fflags;
// > Multiple events which trigger the filter do not result in multiple kevents being placed on the kqueue
// > Instead, the filter will aggregate the events into a single kevent struct
/* Use coalesced event data */
int events = 0
| ((filter & EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0)
| ((filter & EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0);
// Note: EV_ERROR only sets the error in data as part of changelist. Not in this call!
const int error = (flags & (EV_ERROR)) ? ((int)fflags || 1) : 0;
const int eof = (flags & (EV_EOF));
| (coalesced_events[loop->current_ready_poll].readable ? LIBUS_SOCKET_READABLE : 0)
| (coalesced_events[loop->current_ready_poll].writable ? LIBUS_SOCKET_WRITABLE : 0);
const int error = coalesced_events[loop->current_ready_poll].error;
const int eof = coalesced_events[loop->current_ready_poll].eof;
#endif
/* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
events &= us_poll_events(poll);
@@ -360,11 +466,11 @@ int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_d
if(!is_readable && !is_writable) {
if(!(old_events & LIBUS_SOCKET_WRITABLE)) {
// if we are not reading or writing, we need to add writable to receive FIN
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (uint64_t)(void*)user_data, 0, 0);
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, (uint64_t)(void*)user_data, 0, 0);
}
} else if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
/* Do they differ in writable? */
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? (EV_ADD | EV_ONESHOT) : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
}
int ret;
do {