Compare commits

...

1 Commit

Author SHA1 Message Date
Claude Bot
32ee722ad4 fix(socket): resolve HTTP streaming hangs on Windows
Three fixes to address issue #27010, where HTTP requests using the Node.js
http module would hang on Windows after receiving the first data chunk:

1. Force writable poll re-registration on libuv/Windows: Add
   us_poll_change_force() that unconditionally restarts uv_poll to work
   around WSAPoll not reliably delivering writable events after partial
   writes. Previously, only kqueue had explicit re-registration.

2. Enable repeat-recv on Windows: The recv-loop optimization that reads
   multiple times when a large amount of data is available was entirely
   disabled on Windows, so each recv required a full event loop
   iteration, degrading streaming performance.

3. Fix bsd_write2 partial write handling on Windows: The two-send
   fallback (since Windows lacks writev) now properly propagates errors
   from the first send call instead of passing negative values through.

Closes #27010

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-13 23:15:34 +00:00
6 changed files with 183 additions and 4 deletions

View File

@@ -810,11 +810,19 @@ ssize_t bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_le
#else
ssize_t bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length) {
    ssize_t written = bsd_send(fd, header, header_length);
    if (written < 0) {
        return written;
    }
    if (written == header_length) {
        ssize_t second_write = bsd_send(fd, payload, payload_length);
        if (second_write > 0) {
            written += second_write;
        } else if (second_write < 0) {
            /* The first write succeeded but the second failed with an error.
             * Return the header bytes written so the caller knows partial
             * progress was made. */
        }
        /* If second_write == 0 (would-block), also just return header_length */
    }
    return written;
}
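
The contract worth noting: once any header bytes have gone out, the return value reports combined progress across both buffers rather than an error code, so a caller distinguishes three outcomes. A minimal sketch of caller-side handling (the on_socket_error / retry helpers are hypothetical, not part of this diff):

/* Sketch: interpreting bsd_write2's return value. Helper names are hypothetical. */
ssize_t written = bsd_write2(fd, header, header_length, payload, payload_length);
if (written < 0) {
    on_socket_error(fd);                      /* the first send failed outright */
} else if (written < (ssize_t) (header_length + payload_length)) {
    schedule_writable_retry(fd, written);     /* partial progress: resume from `written` */
} else {
    on_fully_flushed(fd);                     /* header and payload both sent */
}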

View File

@@ -495,6 +495,12 @@ void us_poll_change(struct us_poll_t *p, struct us_loop_t *loop, int events) {
    }
}
/* On epoll/kqueue, force is the same as regular change since the kernel
 * handles level/edge triggering correctly. */
void us_poll_change_force(struct us_poll_t *p, struct us_loop_t *loop, int events) {
    us_poll_change(p, loop, events);
}
void us_poll_stop(struct us_poll_t *p, struct us_loop_t *loop) {
int old_events = us_poll_events(p);
int new_events = 0;

View File

@@ -115,6 +115,18 @@ void us_poll_change(struct us_poll_t *p, struct us_loop_t *loop, int events) {
    }
}
/* Like us_poll_change, but always calls uv_poll_start even if events haven't changed.
 * This is needed on Windows where WSAPoll may not reliably re-trigger writable events
 * after a partial write without an explicit poll restart. */
void us_poll_change_force(struct us_poll_t *p, struct us_loop_t *loop, int events) {
    if (!p->uv_p) return;
    p->poll_type =
        us_internal_poll_type(p) |
        ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) |
        ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
    uv_poll_start(p->uv_p, events, poll_cb);
}
void us_poll_stop(struct us_poll_t *p, struct us_loop_t *loop) {
    if (!p->uv_p) return;
    uv_poll_stop(p->uv_p);
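
For contrast: as the comment above notes, the regular us_poll_change skips the update when the requested event mask already matches the current one, and that no-op is exactly what lets WSAPoll miss the writable edge. A simplified side-by-side (illustrative, not the verbatim library code):

/* Illustrative contrast, simplified from the two entry points above: */
void change(struct us_poll_t *p, int events) {
    if (us_poll_events(p) == events) return;  /* may skip re-arming entirely */
    uv_poll_start(p->uv_p, events, poll_cb);
}
void change_force(struct us_poll_t *p, int events) {
    uv_poll_start(p->uv_p, events, poll_cb);  /* always restarts the uv_poll watcher */
}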

View File

@@ -399,6 +399,9 @@ void us_poll_start(us_poll_r p, us_loop_r loop, int events) nonnull_fn_decl;
/* Returns 0 if successful */
int us_poll_start_rc(us_poll_r p, us_loop_r loop, int events) nonnull_fn_decl;
void us_poll_change(us_poll_r p, us_loop_r loop, int events) nonnull_fn_decl;
/* Like us_poll_change but unconditionally restarts the poll even if events match.
* Needed on Windows where WSAPoll may miss writable events without explicit restart. */
void us_poll_change_force(us_poll_r p, us_loop_r loop, int events) nonnull_fn_decl;
void us_poll_stop(us_poll_r p, struct us_loop_t *loop) nonnull_fn_decl;
/* Return what events we are polling for */

View File

@@ -418,9 +418,11 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
    if (!s->flags.last_write_failed || us_socket_is_shut_down(0, s)) {
        us_poll_change(&s->p, loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE);
    } else {
#if defined(LIBUS_USE_KQUEUE) || defined(LIBUS_USE_LIBUV)
        /* Kqueue: one-shot writable needs to be re-registered.
         * Libuv/Windows: WSAPoll may not reliably deliver writable events
         * after a partial write without re-registration. Force it. */
        us_poll_change_force(&s->p, loop, us_poll_events(&s->p) | LIBUS_SOCKET_WRITABLE);
#endif
    }
}
@@ -509,8 +511,8 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
    if (s && s->flags.adopted && s->prev) {
        s = s->prev;
    }
    // loop->num_ready_polls isn't accessible on Windows.
#ifndef WIN32
    // Rare case: we're reading a lot of data, there's more to be read, and either:
    // - the socket has hung up, so we will never get more data from it (this only
    //   applies to macOS, which delivers the event in the same tick; Linux does not), or
    // - the event loop isn't very busy, so we can read multiple times in a row
@@ -529,6 +531,20 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
        }
    }
#undef LOOP_ISNT_VERY_BUSY_THRESHOLD
#else
    // On Windows, we don't have num_ready_polls, but we still need to
    // drain available data to avoid requiring a full event loop iteration
    // for each recv. This is critical for streaming performance.
    if (
        s && length >= (LIBUS_RECV_BUFFER_LENGTH - 24 * 1024) && length <= LIBUS_RECV_BUFFER_LENGTH &&
        !us_socket_is_closed(0, s)
    ) {
        repeat_recv_count++;
        // Limit to 10 iterations to avoid starving other sockets
        if (repeat_recv_count <= 10) {
            continue;
        }
    }
#endif
    } else if (!length) {
        eof = 1; // let's handle EOF in the same place
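
Pulled out of the dispatch loop, the Windows branch amounts to a capped drain pattern (a simplified standalone sketch; on_data and the recv plumbing are illustrative):

/* Simplified sketch of the capped drain loop; on_data is hypothetical. */
int repeat_recv_count = 0;
for (;;) {
    ssize_t length = recv(fd, buf, LIBUS_RECV_BUFFER_LENGTH, 0);
    if (length <= 0) break;                       /* error/EOF: handled by the normal path */
    on_data(buf, (int) length);
    int nearly_full_read = length >= LIBUS_RECV_BUFFER_LENGTH - 24 * 1024;
    if (!nearly_full_read || ++repeat_recv_count > 10) break;  /* cap avoids starving other sockets */
}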

View File

@@ -0,0 +1,134 @@
import { expect, test } from "bun:test";
import http from "node:http";
// Regression test for https://github.com/oven-sh/bun/issues/27010
// HTTP requests hanging on Windows when making multiple concurrent large
// streaming GET requests using the Node.js http module.
test("multiple concurrent streaming HTTP requests complete without hanging", async () => {
const TWO_MIB = 2 * 1024 * 1024;
const CHUNK_SIZE = 64 * 1024;
const ZERO_CHUNK = new Uint8Array(CHUNK_SIZE);
// Start a streaming HTTP server
using server = Bun.serve({
port: 0,
fetch(req) {
const url = new URL(req.url);
if (url.pathname !== "/stream") {
return new Response("OK");
}
let sent = 0;
const stream = new ReadableStream({
pull: async controller => {
const remaining = TWO_MIB - sent;
if (remaining <= 0) {
controller.close();
return;
}
// Small delay to simulate realistic streaming
await new Promise(resolve => setTimeout(resolve, 1));
const n = Math.min(remaining, ZERO_CHUNK.byteLength);
controller.enqueue(n === ZERO_CHUNK.byteLength ? ZERO_CHUNK : ZERO_CHUNK.subarray(0, n));
sent += n;
},
});
return new Response(stream, {
headers: {
"content-type": "application/octet-stream",
"cache-control": "no-store",
},
});
},
});
const url = `http://localhost:${server.port}/stream`;
const NUM_WORKERS = 3;
const NUM_ITERATIONS = 2;
function downloadOnce(): Promise<number> {
return new Promise((resolve, reject) => {
const req = http.get(url, res => {
let total = 0;
res.on("data", (chunk: Buffer) => {
total += chunk.length;
});
res.on("end", () => {
resolve(total);
});
res.on("error", (err: Error) => {
reject(err);
});
});
req.on("error", (err: Error) => {
reject(err);
});
});
}
async function workerLoop(): Promise<void> {
for (let iter = 0; iter < NUM_ITERATIONS; iter++) {
const bytes = await downloadOnce();
expect(bytes).toBe(TWO_MIB);
}
}
const promises: Promise<void>[] = [];
for (let w = 0; w < NUM_WORKERS; w++) {
promises.push(workerLoop());
}
await Promise.all(promises);
}, 30_000);
test("streaming HTTP response delivers all chunks via node:http", async () => {
const TOTAL_SIZE = 512 * 1024; // 512KB
const CHUNK_SIZE = 16 * 1024; // 16KB chunks
using server = Bun.serve({
port: 0,
fetch(req) {
let sent = 0;
const stream = new ReadableStream({
pull: async controller => {
if (sent >= TOTAL_SIZE) {
controller.close();
return;
}
// Create a chunk with incrementing byte pattern for verification
const n = Math.min(TOTAL_SIZE - sent, CHUNK_SIZE);
const chunk = new Uint8Array(n);
chunk.fill((sent / CHUNK_SIZE) & 0xff);
controller.enqueue(chunk);
sent += n;
},
});
return new Response(stream);
},
});
const url = `http://localhost:${server.port}/`;
const result = await new Promise<Buffer>((resolve, reject) => {
const req = http.get(url, res => {
const chunks: Buffer[] = [];
res.on("data", (chunk: Buffer) => {
chunks.push(chunk);
});
res.on("end", () => {
resolve(Buffer.concat(chunks));
});
res.on("error", reject);
});
req.on("error", reject);
});
expect(result.length).toBe(TOTAL_SIZE);
// Verify chunk pattern integrity
for (let i = 0; i < TOTAL_SIZE / CHUNK_SIZE; i++) {
const offset = i * CHUNK_SIZE;
const expectedByte = i & 0xff;
expect(result[offset]).toBe(expectedByte);
expect(result[offset + CHUNK_SIZE - 1]).toBe(expectedByte);
}
});