diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c index 4c59ca6382..e796b16c05 100644 --- a/packages/bun-usockets/src/eventing/epoll_kqueue.c +++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c @@ -228,8 +228,8 @@ void us_loop_run(struct us_loop_t *loop) { // > Instead, the filter will aggregate the events into a single kevent struct // Note: EV_ERROR only sets the error in data as part of changelist. Not in this call! int events = 0 - | ((filter & EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0) - | ((filter & EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0); + | ((filter == EVFILT_READ) ? LIBUS_SOCKET_READABLE : 0) + | ((filter == EVFILT_WRITE) ? LIBUS_SOCKET_WRITABLE : 0); const int error = (flags & (EV_ERROR)) ? ((int)fflags || 1) : 0; const int eof = (flags & (EV_EOF)); #endif @@ -360,11 +360,11 @@ int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_d if(!is_readable && !is_writable) { if(!(old_events & LIBUS_SOCKET_WRITABLE)) { // if we are not reading or writing, we need to add writable to receive FIN - EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (uint64_t)(void*)user_data, 0, 0); + EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, (uint64_t)(void*)user_data, 0, 0); } } else if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) { /* Do they differ in writable? */ - EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0); + EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD | EV_ONESHOT : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0); } int ret; do { diff --git a/packages/bun-usockets/src/loop.c b/packages/bun-usockets/src/loop.c index 621f118b47..1f4c232474 100644 --- a/packages/bun-usockets/src/loop.c +++ b/packages/bun-usockets/src/loop.c @@ -375,7 +375,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in /* Note: if we failed a write as a socket of one loop then adopted * to another loop, this will be wrong. Absurd case though */ loop->data.last_write_failed = 0; - + #ifdef LIBUS_USE_KQUEUE + /* Kqueue is one-shot so is not writable anymore */ + p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0); + #endif + s = s->context->on_writable(s); if (!s || us_socket_is_closed(0, s)) { @@ -385,6 +389,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in /* If we have no failed write or if we shut down, then stop polling for more writable */ if (!loop->data.last_write_failed || us_socket_is_shut_down(0, s)) { us_poll_change(&s->p, loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE); + } else { + #ifdef LIBUS_USE_KQUEUE + /* Kqueue one-shot writable needs to be re-enabled */ + us_poll_change(&s->p, loop, us_poll_events(&s->p) | LIBUS_SOCKET_WRITABLE); + #endif } } diff --git a/test/js/bun/http/bun-server.test.ts b/test/js/bun/http/bun-server.test.ts index 68922965e7..f1c1e02b61 100644 --- a/test/js/bun/http/bun-server.test.ts +++ b/test/js/bun/http/bun-server.test.ts @@ -1,9 +1,13 @@ import type { Server, ServerWebSocket, Socket } from "bun"; import { describe, expect, test } from "bun:test"; -import { bunEnv, bunExe, rejectUnauthorizedScope, tempDirWithFiles, tls } from "harness"; +import { bunEnv, bunExe, bunRun, rejectUnauthorizedScope, tempDirWithFiles, tls } from "harness"; import path from "path"; describe.concurrent("Server", () => { + test("should not use 100% CPU when websocket is idle", async () => { + const { stderr } = bunRun(path.join(import.meta.dir, "bun-websocket-cpu-fixture.js")); + expect(stderr).toBe(""); + }); test("normlizes incoming request URLs", async () => { using server = Bun.serve({ fetch(request) { diff --git a/test/js/bun/http/bun-websocket-cpu-fixture.js b/test/js/bun/http/bun-websocket-cpu-fixture.js new file mode 100644 index 0000000000..a2d03babfe --- /dev/null +++ b/test/js/bun/http/bun-websocket-cpu-fixture.js @@ -0,0 +1,65 @@ +import path from "path"; + +const server = Bun.serve({ + port: 0, + idleTimeout: 100, + tls: { + cert: Bun.file(path.join(import.meta.dir, "fixtures", "cert.pem")), + key: Bun.file(path.join(import.meta.dir, "fixtures", "cert.key")), + }, + fetch(req, server) { + if (server.upgrade(req)) { + return; + } + return new Response("Upgrade failed", { status: 500 }); + }, + websocket: { + idleTimeout: 120, + open(ws) {}, + message(ws, message) { + ws.send(message); + }, + }, +}); + +const ws = new WebSocket(`wss://${server.hostname}:${server.port}`, { tls: { rejectUnauthorized: false } }); +await Bun.sleep(1000); +for (let i = 0; i < 1000; i++) { + ws.send("hello"); +} +let bytesReceived = 0; +ws.onmessage = event => { + bytesReceived += event.data.length; +}; + +let previousUsage = process.cpuUsage(); +let previousTime = Date.now(); + +let count = 0; +setInterval(() => { + count++; + + const currentUsage = process.cpuUsage(previousUsage); + const currentTime = Date.now(); + + const userCpuTime = currentUsage.user; // microseconds + const systemCpuTime = currentUsage.system; // microseconds + const totalCpuTime = userCpuTime + systemCpuTime; + + const timeDeltaMs = currentTime - previousTime; // milliseconds + const timeDeltaMicroseconds = timeDeltaMs * 1000; // convert to microseconds + + // Calculate percentage for the current process + const cpuUsagePercentage = (totalCpuTime / timeDeltaMicroseconds) * 100; + + console.log(`CPU Usage: ${cpuUsagePercentage.toFixed(2)}%`); + + previousUsage = process.cpuUsage(); // Update for the next interval + previousTime = currentTime; + + if (count == 3) { + server.stop(true); + // The expected value is around 0.XX%, but we allow a 2% margin of error to account for potential flakiness. + process.exit(cpuUsagePercentage < 2 ? 0 : 1); + } +}, 1000);