diff --git a/src/bun.js/api/server/NodeHTTPResponse.zig b/src/bun.js/api/server/NodeHTTPResponse.zig index 67bc27c3a0..707683602a 100644 --- a/src/bun.js/api/server/NodeHTTPResponse.zig +++ b/src/bun.js/api/server/NodeHTTPResponse.zig @@ -193,6 +193,9 @@ pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_proto if (this.raw_response) |raw_response| { this.raw_response = null; this.flags.upgraded = true; + // Unref the poll_ref since the socket is now upgraded to WebSocket + // and will have its own lifecycle management + this.poll_ref.unref(this.server.globalThis().bunVM()); _ = raw_response.upgrade(*ServerWebSocket, ws, websocket_key, sec_websocket_protocol_value, sec_websocket_extensions_value, upgrade_ctx); } return true; diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index dd54426e9a..bd963d0282 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -633,6 +633,25 @@ pub fn unrefConcurrently(this: *EventLoop) void { this.wakeup(); } +/// Testing API to expose event loop state +pub fn getActiveTasks(globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const vm = globalObject.bunVM(); + const event_loop = vm.event_loop; + + const result = jsc.JSValue.createEmptyObject(globalObject, 3); + result.put(globalObject, jsc.ZigString.static("activeTasks"), jsc.JSValue.jsNumber(vm.active_tasks)); + result.put(globalObject, jsc.ZigString.static("concurrentRef"), jsc.JSValue.jsNumber(event_loop.concurrent_ref.load(.seq_cst))); + + // Get num_polls from uws loop (POSIX) or active_handles from libuv (Windows) + const num_polls: i32 = if (Environment.isWindows) + @intCast(bun.windows.libuv.Loop.get().active_handles) + else + uws.Loop.get().num_polls; + result.put(globalObject, jsc.ZigString.static("numPolls"), jsc.JSValue.jsNumber(num_polls)); + + return result; +} + pub const AnyEventLoop = @import("./event_loop/AnyEventLoop.zig").AnyEventLoop; pub const ConcurrentPromiseTask = @import("./event_loop/ConcurrentPromiseTask.zig").ConcurrentPromiseTask; pub const WorkTask = @import("./event_loop/WorkTask.zig").WorkTask; diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index 89b22f5841..35b7cd0916 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -211,6 +211,9 @@ export const structuredCloneAdvanced: ( export const lsanDoLeakCheck = $newCppFunction("InternalForTesting.cpp", "jsFunction_lsanDoLeakCheck", 1); +export const getEventLoopStats: () => { activeTasks: number; concurrentRef: number; numPolls: number } = + $newZigFunction("event_loop.zig", "getActiveTasks", 0); + export const hostedGitInfo = { parseUrl: $newZigFunction("hosted_git_info.zig", "TestingAPIs.jsParseUrl", 1), fromUrl: $newZigFunction("hosted_git_info.zig", "TestingAPIs.jsFromUrl", 1), diff --git a/test/js/node/http/node-http-with-ws.test.ts b/test/js/node/http/node-http-with-ws.test.ts index f4cc63d242..a3ef8cac6a 100644 --- a/test/js/node/http/node-http-with-ws.test.ts +++ b/test/js/node/http/node-http-with-ws.test.ts @@ -1,10 +1,65 @@ import { expect, test } from "bun:test"; -import { tls as options } from "harness"; +import { bunEnv, bunExe, tls as options } from "harness"; import https from "https"; import type { AddressInfo } from "node:net"; import tls from "tls"; import { WebSocketServer } from "ws"; -test("should not crash when closing sockets after upgrade", async () => { + +test.concurrent("WebSocket upgrade should unref poll_ref from response", async () => { + // Regression test for bug where poll_ref was not unref'd on WebSocket upgrade + // The bug: NodeHTTPResponse.poll_ref stayed active after upgrade + // This test verifies activeTasks is correctly decremented after upgrade + const script = /* js */ ` + const http = require("http"); + const { WebSocketServer } = require("ws"); + const { getEventLoopStats } = require("bun:internal-for-testing"); + + const server = http.createServer(); + const wsServer = new WebSocketServer({ server }); + + let initialStats; + process.exitCode = 1; + + wsServer.on("connection", (ws) => { + // After WebSocket upgrade completes, check active tasks + const stats = getEventLoopStats(); + ws.close(); + wsServer.close(); + server.close(); + + // With the bug: poll_ref from NodeHTTPResponse stays active (activeTasks = 1) + // With the fix: poll_ref.unref() was called on upgrade (activeTasks should be 0) + if (stats.activeTasks !== initialStats.activeTasks) { + console.error("BUG_DETECTED: activeTasks=" + stats.activeTasks + " (expected 0 after upgrade)"); + process.exit(1); + } + + process.exitCode = 0; + }); + + initialStats = getEventLoopStats(); + server.listen(0, "127.0.0.1", () => { + const port = server.address().port; + const ws = new WebSocket("ws://127.0.0.1:" + port); + }); + `; + + await using proc = Bun.spawn({ + cmd: [bunExe(), "-e", script], + env: bunEnv, + stdout: "pipe", + stderr: "pipe", + }); + + const [stderr, exitCode] = await Promise.all([proc.stderr.text(), proc.exited]); + + // Should exit cleanly without detecting the bug + expect(stderr).not.toContain("BUG_DETECTED"); + expect(stderr).toBe(""); + expect(exitCode).toBe(0); +}); + +test.concurrent("should not crash when closing sockets after upgrade", async () => { const { promise, resolve } = Promise.withResolvers(); let http_sockets: tls.TLSSocket[] = [];