From a5712b92b8a3cf2f972628c81c2ca435e5878f4c Mon Sep 17 00:00:00 2001
From: Ciro Spaciari
Date: Fri, 12 Dec 2025 11:10:22 -0800
Subject: [PATCH] Fix 100% CPU usage with idle WebSocket connections on macOS
 (kqueue) (#25475)

### What does this PR do?

Fixes a bug where idle WebSocket connections would cause 100% CPU usage on macOS and other BSD systems using kqueue.

**Root cause:** The kqueue event filter comparison was using bitwise AND (`&`) instead of equality (`==`) when checking the filter type. Combined with missing `EV_ONESHOT` flags on writable events, this caused the event loop to continuously spin even when no actual I/O was pending.

**Changes:**

1. **Fixed filter comparison** in `epoll_kqueue.c`: Changed `filter & EVFILT_READ` to `filter == EVFILT_READ` (same for `EVFILT_WRITE`). The filter field is a value, not a bitmask.
2. **Added `EV_ONESHOT` flag** to writable events: kqueue writable events now use one-shot mode to prevent continuous triggering.
3. **Re-arm writable events when needed**: After a one-shot writable event fires, the code now properly updates the poll state and re-arms the writable event if another write is still pending.
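As an illustration of the root cause (not part of the patch): on macOS and the BSDs the kqueue filter constants are small negative values (`EVFILT_READ` is `-1` and `EVFILT_WRITE` is `-2` in `<sys/event.h>`), so ANDing one filter value with another is almost always non-zero. A minimal standalone sketch of the old check versus the new one:

```c
#include <stdint.h>
#include <stdio.h>
#include <sys/event.h>

int main(void) {
    // A kevent that fired for writability carries filter == EVFILT_WRITE (-2 on macOS/BSD).
    int16_t filter = EVFILT_WRITE;

    // Old check: (-2 & -1) == -2, which is non-zero, so a write event is also
    // reported as readable (and a read event as writable).
    printf("filter & EVFILT_READ  -> %d\n", (filter & EVFILT_READ) != 0);  // prints 1 (wrong)

    // Fixed check: the filter field holds exactly one value, so compare it.
    printf("filter == EVFILT_READ -> %d\n", filter == EVFILT_READ);        // prints 0 (correct)
    return 0;
}
```

Because every event was reported as both readable and writable, sockets with nothing to write kept receiving writable notifications, which, combined with the missing `EV_ONESHOT`, is consistent with the spinning described above.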
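Likewise, a rough sketch of the one-shot pattern that changes 2 and 3 adopt, using hypothetical helper names (`arm_writable_once`, `on_writable_event`) rather than the actual usockets functions: with `EV_ONESHOT`, kqueue deletes the filter after delivering it once, so the dispatch path only re-registers it while a write is still pending.

```c
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical helper: ask kqueue to report "fd is writable" at most once.
// After the event is delivered the filter is removed, so an idle socket can
// no longer wake the loop on every iteration.
static void arm_writable_once(int kq, int fd, void *udata) {
    struct kevent ev;
    EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, udata);
    kevent(kq, &ev, 1, NULL, 0, NULL);
}

// Hypothetical dispatch path: flush as much as possible and only re-arm the
// writable filter when the write did not complete.
static void on_writable_event(int kq, int fd, const char *buf, size_t len, void *udata) {
    ssize_t n = write(fd, buf, len);
    if (n < 0 || (size_t)n < len) {
        arm_writable_once(kq, fd, udata); /* data still pending: re-arm */
    }
    /* Otherwise leave the filter deleted; the socket stays quiet while idle. */
}
```

This mirrors the `loop.c` change below: if `last_write_failed` is still set after the `on_writable` callback runs, the poll is switched back to include `LIBUS_SOCKET_WRITABLE`, which re-adds the one-shot writable filter.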
### How did you verify your code works?

Added a test that:

1. Creates a TLS WebSocket server and client
2. Sends messages then lets the connection sit idle
3. Measures CPU usage over 3 seconds
4. Fails if CPU usage exceeds 2% (expected is ~0.XX% when idle)
---
 .../bun-usockets/src/eventing/epoll_kqueue.c  |  8 +--
 packages/bun-usockets/src/loop.c              | 11 +++-
 test/js/bun/http/bun-server.test.ts           |  6 +-
 test/js/bun/http/bun-websocket-cpu-fixture.js | 65 +++++++++++++++++++
 4 files changed, 84 insertions(+), 6 deletions(-)
 create mode 100644 test/js/bun/http/bun-websocket-cpu-fixture.js

diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c
index 4c59ca6382..e796b16c05 100644
--- a/packages/bun-usockets/src/eventing/epoll_kqueue.c
+++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c
@@ -228,8 +228,8 @@ void us_loop_run(struct us_loop_t *loop) {
         // > Instead, the filter will aggregate the events into a single kevent struct
         // Note: EV_ERROR only sets the error in data as part of changelist. Not in this call!
         int events = 0
-          | ((filter & EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0)
-          | ((filter & EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0);
+          | ((filter == EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0)
+          | ((filter == EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0);
         const int error = (flags & (EV_ERROR)) ? ((int)fflags || 1) : 0;
         const int eof = (flags & (EV_EOF));
 #endif
@@ -360,11 +360,11 @@ int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_d
     if(!is_readable && !is_writable) {
         if(!(old_events & LIBUS_SOCKET_WRITABLE)) {
             // if we are not reading or writing, we need to add writable to receive FIN
-            EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (uint64_t)(void*)user_data, 0, 0);
+            EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, (uint64_t)(void*)user_data, 0, 0);
         }
     } else if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
         /* Do they differ in writable? */
-        EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
+        EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD | EV_ONESHOT : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
     }
     int ret;
     do {
diff --git a/packages/bun-usockets/src/loop.c b/packages/bun-usockets/src/loop.c
index 621f118b47..1f4c232474 100644
--- a/packages/bun-usockets/src/loop.c
+++ b/packages/bun-usockets/src/loop.c
@@ -375,7 +375,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
             /* Note: if we failed a write as a socket of one loop then adopted
              * to another loop, this will be wrong. Absurd case though */
             loop->data.last_write_failed = 0;
-
+            #ifdef LIBUS_USE_KQUEUE
+            /* Kqueue is one-shot so is not writable anymore */
+            p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0);
+            #endif
+
             s = s->context->on_writable(s);

             if (!s || us_socket_is_closed(0, s)) {
@@ -385,6 +389,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
             /* If we have no failed write or if we shut down, then stop polling for more writable */
             if (!loop->data.last_write_failed || us_socket_is_shut_down(0, s)) {
                 us_poll_change(&s->p, loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE);
+            } else {
+                #ifdef LIBUS_USE_KQUEUE
+                /* Kqueue one-shot writable needs to be re-enabled */
+                us_poll_change(&s->p, loop, us_poll_events(&s->p) | LIBUS_SOCKET_WRITABLE);
+                #endif
             }
         }

diff --git a/test/js/bun/http/bun-server.test.ts b/test/js/bun/http/bun-server.test.ts
index 68922965e7..f1c1e02b61 100644
--- a/test/js/bun/http/bun-server.test.ts
+++ b/test/js/bun/http/bun-server.test.ts
@@ -1,9 +1,13 @@
 import type { Server, ServerWebSocket, Socket } from "bun";
 import { describe, expect, test } from "bun:test";
-import { bunEnv, bunExe, rejectUnauthorizedScope, tempDirWithFiles, tls } from "harness";
+import { bunEnv, bunExe, bunRun, rejectUnauthorizedScope, tempDirWithFiles, tls } from "harness";
 import path from "path";

 describe.concurrent("Server", () => {
+  test("should not use 100% CPU when websocket is idle", async () => {
+    const { stderr } = bunRun(path.join(import.meta.dir, "bun-websocket-cpu-fixture.js"));
+    expect(stderr).toBe("");
+  });
   test("normlizes incoming request URLs", async () => {
     using server = Bun.serve({
       fetch(request) {
diff --git a/test/js/bun/http/bun-websocket-cpu-fixture.js b/test/js/bun/http/bun-websocket-cpu-fixture.js
new file mode 100644
index 0000000000..a2d03babfe
--- /dev/null
+++ b/test/js/bun/http/bun-websocket-cpu-fixture.js
@@ -0,0 +1,65 @@
+import path from "path";
+
+const server = Bun.serve({
+  port: 0,
+  idleTimeout: 100,
+  tls: {
+    cert: Bun.file(path.join(import.meta.dir, "fixtures", "cert.pem")),
+    key: Bun.file(path.join(import.meta.dir, "fixtures", "cert.key")),
+  },
+  fetch(req, server) {
+    if (server.upgrade(req)) {
+      return;
+    }
+    return new Response("Upgrade failed", { status: 500 });
+  },
+  websocket: {
+    idleTimeout: 120,
+    open(ws) {},
+    message(ws, message) {
+      ws.send(message);
+    },
+  },
+});
+
+const ws = new WebSocket(`wss://${server.hostname}:${server.port}`, { tls: { rejectUnauthorized: false } });
+await Bun.sleep(1000);
+for (let i = 0; i < 1000; i++) {
+  ws.send("hello");
+}
+let bytesReceived = 0;
+ws.onmessage = event => {
+  bytesReceived += event.data.length;
+};
+
+let previousUsage = process.cpuUsage();
+let previousTime = Date.now();
+
+let count = 0;
+setInterval(() => {
+  count++;
+
+  const currentUsage = process.cpuUsage(previousUsage);
+  const currentTime = Date.now();
+
+  const userCpuTime = currentUsage.user; // microseconds
+  const systemCpuTime = currentUsage.system; // microseconds
+  const totalCpuTime = userCpuTime + systemCpuTime;
+
+  const timeDeltaMs = currentTime - previousTime; // milliseconds
+  const timeDeltaMicroseconds = timeDeltaMs * 1000; // convert to microseconds
+
+  // Calculate percentage for the current process
+  const cpuUsagePercentage = (totalCpuTime / timeDeltaMicroseconds) * 100;
+
+  console.log(`CPU Usage: ${cpuUsagePercentage.toFixed(2)}%`);
+
+  previousUsage = process.cpuUsage(); // Update for the next interval
+  previousTime = currentTime;
+
+  if (count == 3) {
+    server.stop(true);
+    // The expected value is around 0.XX%, but we allow a 2% margin of error to account for potential flakiness.
+    process.exit(cpuUsagePercentage < 2 ? 0 : 1);
+  }
+}, 1000);