Fix 100% CPU usage with idle WebSocket connections on macOS (kqueue) (#25475)

### What does this PR do?

Fixes a bug where idle WebSocket connections would cause 100% CPU usage
on macOS and other BSD systems using kqueue.

**Root cause:** The kqueue event filter comparison was using bitwise AND
(`&`) instead of equality (`==`) when checking the filter type. Combined
with missing `EV_ONESHOT` flags on writable events, this caused the
event loop to continuously spin even when no actual I/O was pending.

**Changes:**
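1. **Fixed filter comparison** in `epoll_kqueue.c`: Changed `filter &
EVFILT_READ` to `filter == EVFILT_READ` (same for `EVFILT_WRITE`). The
filter field is a single value, not a bitmask: on macOS `EVFILT_READ` is
`-1` and `EVFILT_WRITE` is `-2`, so both `&` tests were non-zero for
either filter and every read or write event was reported as both
readable and writable.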

2. **Added `EV_ONESHOT` flag** to writable events: kqueue writable
events now use one-shot mode to prevent continuous triggering.

3. **Re-arm writable events when needed**: After a one-shot writable
event fires, the code now properly updates the poll state and re-arms
the writable event if another write is still pending.

### How did you verify your code works?

Added a test that:
1. Creates a TLS WebSocket server and client
2. Sends messages then lets the connection sit idle
3. Samples CPU usage once per second for 3 seconds
4. Fails if the final sample is 2% or higher (expected is ~0.XX% when idle)
This commit is contained in:
Ciro Spaciari
2025-12-12 11:10:22 -08:00
committed by GitHub
parent 7dcd49f832
commit a5712b92b8
4 changed files with 84 additions and 6 deletions

View File

@@ -228,8 +228,8 @@ void us_loop_run(struct us_loop_t *loop) {
// > Instead, the filter will aggregate the events into a single kevent struct
// Note: EV_ERROR only sets the error in data as part of changelist. Not in this call!
int events = 0
| ((filter & EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0)
| ((filter & EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0);
| ((filter == EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0)
| ((filter == EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0);
const int error = (flags & (EV_ERROR)) ? ((int)fflags || 1) : 0;
const int eof = (flags & (EV_EOF));
#endif
@@ -360,11 +360,11 @@ int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_d
if(!is_readable && !is_writable) {
if(!(old_events & LIBUS_SOCKET_WRITABLE)) {
// if we are not reading or writing, we need to add writable to receive FIN
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (uint64_t)(void*)user_data, 0, 0);
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, (uint64_t)(void*)user_data, 0, 0);
}
} else if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
/* Do they differ in writable? */
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD | EV_ONESHOT : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
}
int ret;
do {

View File

@@ -375,7 +375,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
/* Note: if we failed a write as a socket of one loop then adopted
* to another loop, this will be wrong. Absurd case though */
loop->data.last_write_failed = 0;
#ifdef LIBUS_USE_KQUEUE
/* Kqueue writable events are registered one-shot, so once one has fired this poll is no longer polling for writable */
p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0);
#endif
s = s->context->on_writable(s);
if (!s || us_socket_is_closed(0, s)) {
@@ -385,6 +389,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
/* If we have no failed write or if we shut down, then stop polling for more writable */
if (!loop->data.last_write_failed || us_socket_is_shut_down(0, s)) {
us_poll_change(&s->p, loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE);
} else {
#ifdef LIBUS_USE_KQUEUE
/* Kqueue one-shot writable needs to be re-enabled */
us_poll_change(&s->p, loop, us_poll_events(&s->p) | LIBUS_SOCKET_WRITABLE);
#endif
}
}

View File

@@ -1,9 +1,13 @@
import type { Server, ServerWebSocket, Socket } from "bun";
import { describe, expect, test } from "bun:test";
import { bunEnv, bunExe, rejectUnauthorizedScope, tempDirWithFiles, tls } from "harness";
import { bunEnv, bunExe, bunRun, rejectUnauthorizedScope, tempDirWithFiles, tls } from "harness";
import path from "path";
describe.concurrent("Server", () => {
// Regression test for the idle-WebSocket CPU spin on kqueue (#25475).
// Runs the fixture script through the harness's bunRun helper and requires that
// it wrote nothing to stderr.
// NOTE(review): bunRun presumably spawns the fixture as a separate process; how it
// treats a non-zero exit code is not visible here — confirm against harness.
test("should not use 100% CPU when websocket is idle", async () => {
const { stderr } = bunRun(path.join(import.meta.dir, "bun-websocket-cpu-fixture.js"));
expect(stderr).toBe("");
});
test("normlizes incoming request URLs", async () => {
using server = Bun.serve({
fetch(request) {

View File

@@ -0,0 +1,65 @@
import path from "path";
// Directory holding the TLS certificate and key used by the echo server below.
const fixturesDir = path.join(import.meta.dir, "fixtures");

// TLS WebSocket echo server listening on an OS-assigned port (port: 0).
// Every HTTP request is treated as an upgrade attempt; each WebSocket message
// is sent straight back to its sender.
// NOTE(review): the idleTimeout values are presumably seconds, chosen to be far
// longer than this script runs — confirm against the Bun.serve documentation.
const server = Bun.serve({
  port: 0,
  idleTimeout: 100,
  tls: {
    cert: Bun.file(path.join(fixturesDir, "cert.pem")),
    key: Bun.file(path.join(fixturesDir, "cert.key")),
  },
  fetch(request, srv) {
    // A successful upgrade hands the connection to the websocket handlers, so
    // no Response is returned in that case.
    return srv.upgrade(request) ? undefined : new Response("Upgrade failed", { status: 500 });
  },
  websocket: {
    idleTimeout: 120,
    open(socket) {},
    message(socket, payload) {
      socket.send(payload);
    },
  },
});
// Client side of the echo connection. Certificate verification is disabled
// because the server above uses a fixture certificate.
const ws = new WebSocket(`wss://${server.hostname}:${server.port}`, { tls: { rejectUnauthorized: false } });

// Tally of echoed data. It is written here but not read anywhere in the
// visible script; the handler is attached before anything is sent so no echo
// can arrive unobserved.
let bytesReceived = 0;
ws.onmessage = event => {
  bytesReceived += event.data.length;
};

// Wait for the connection to actually open instead of sleeping for a fixed
// second: a fixed delay is both slower than necessary and racy on a slow
// machine, where send() could run while the socket is still connecting.
await new Promise((resolve, reject) => {
  ws.addEventListener("open", () => resolve(), { once: true });
  ws.addEventListener("error", () => reject(new Error("WebSocket failed to connect")), { once: true });
});

// Generate some traffic; after the echoes drain the connection sits idle.
for (let i = 0; i < 1000; i++) {
  ws.send("hello");
}
// Sample this process's CPU usage at a fixed interval. Every sample is logged,
// but only the final one decides the exit code.
const SAMPLE_INTERVAL_MS = 1000;
const SAMPLE_COUNT = 3;
// The expected value is around 0.XX%, but we allow a 2% margin of error to account for potential flakiness.
const MAX_IDLE_CPU_PERCENT = 2;

let previousUsage = process.cpuUsage();
// performance.now() is monotonic, so a wall-clock adjustment during the run
// cannot distort the elapsed time the percentage is computed from.
let previousTime = performance.now();
let count = 0;

setInterval(() => {
  count++;

  // Passing the previous reading makes cpuUsage() return the delta since then.
  const { user, system } = process.cpuUsage(previousUsage); // microseconds
  const currentTime = performance.now();
  const elapsedMicroseconds = (currentTime - previousTime) * 1000;

  // Calculate percentage for the current process
  const cpuUsagePercentage = ((user + system) / elapsedMicroseconds) * 100;
  console.log(`CPU Usage: ${cpuUsagePercentage.toFixed(2)}%`);

  // Update the baseline for the next interval
  previousUsage = process.cpuUsage();
  previousTime = currentTime;

  if (count === SAMPLE_COUNT) {
    server.stop(true);
    const isIdle = cpuUsagePercentage < MAX_IDLE_CPU_PERCENT;
    if (!isIdle) {
      // Report the failure on stderr as well as through the exit code, so a
      // caller that only inspects stderr still sees it.
      console.error(
        `Idle CPU usage too high: ${cpuUsagePercentage.toFixed(2)}% (limit ${MAX_IDLE_CPU_PERCENT}%)`,
      );
    }
    process.exit(isIdle ? 0 : 1);
  }
}, SAMPLE_INTERVAL_MS);