diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index b48c2f406b..0b041a974c 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -865,17 +865,24 @@ pub const FetchTasklet = struct { this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis); } - pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult { + pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult { const this = bun.cast(*FetchTasklet, ctx); - if (this.http) |http_| { - http_.enableBodyStreaming(); - } if (this.signal_store.aborted.load(.monotonic)) { return jsc.WebCore.DrainResult{ .aborted = {}, }; } + if (this.http) |http_| { + http_.enableResponseBodyStreaming(); + + // If the server sent the headers and the response body in two separate socket writes + // and if the server doesn't close the connection by itself + // and doesn't send any follow-up data + // then we must make sure the HTTP thread flushes. 
+ bun.http.http_thread.scheduleResponseBodyDrain(http_.async_http_id); + } + this.mutex.lock(); defer this.mutex.unlock(); const size_hint = this.getSizeHint(); @@ -923,7 +930,7 @@ pub const FetchTasklet = struct { .size_hint = this.getSizeHint(), .task = this, .global = this.global_this, - .onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback, + .onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback, .onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable, }, }; @@ -974,7 +981,7 @@ pub const FetchTasklet = struct { // enabling streaming will make the http thread to drain into the main thread (aka stop buffering) // without a stream ref, response body or response instance alive it will just ignore the result if (this.http) |http_| { - http_.enableBodyStreaming(); + http_.enableResponseBodyStreaming(); } // we should not keep the process alive if we are ignoring the body const vm = this.javascript_vm; @@ -1338,7 +1345,7 @@ pub const FetchTasklet = struct { task.http.?.* = async_http.*; task.http.?.response_buffer = async_http.response_buffer; - log("callback success={} has_more={} bytes={}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len }); + log("callback success={} ignore_data={} has_more={} bytes={}", .{ result.isSuccess(), task.ignore_data, result.has_more, result.body.?.list.items.len }); const prev_metadata = task.result.metadata; const prev_cert_info = task.result.certificate_info; diff --git a/src/http.zig b/src/http.zig index c5cea683eb..833dacd709 100644 --- a/src/http.zig +++ b/src/http.zig @@ -1684,6 +1684,89 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, minutes: c_uint) void { socket.setTimeoutMinutes(minutes); } +pub fn drainResponseBody(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + + // Find out if we should not send any update. 
+ switch (this.state.stage) { + .done, .fail => return, + else => {}, + } + + if (this.state.fail != null) { + // If there's any error at all, do not drain. + return; + } + + // If there's a pending redirect, then don't bother to send a response body + // as that wouldn't make sense and I want to defensively avoid edgecases + // from that. + if (this.state.flags.is_redirect_pending) { + return; + } + + const body_out_str = this.state.body_out_str orelse return; + if (body_out_str.list.items.len == 0) { + // No update! Don't do anything. + return; + } + + this.sendProgressUpdateWithoutStageCheck(is_ssl, http_thread.context(is_ssl), socket); +} + +fn sendProgressUpdateWithoutStageCheck(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { + const out_str = this.state.body_out_str.?; + const body = out_str.*; + const result = this.toResult(); + const is_done = !result.has_more; + + log("progressUpdate {}", .{is_done}); + + const callback = this.result_callback; + + if (is_done) { + this.unregisterAbortTracker(); + if (this.proxy_tunnel) |tunnel| { + log("close the tunnel", .{}); + this.proxy_tunnel = null; + tunnel.shutdown(); + tunnel.detachAndDeref(); + NewHTTPContext(is_ssl).closeSocket(socket); + } else { + if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) { + log("release socket", .{}); + ctx.releaseSocket( + socket, + this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); + } else { + NewHTTPContext(is_ssl).closeSocket(socket); + } + } + + this.state.reset(this.allocator); + this.state.response_stage = .done; + this.state.request_stage = .done; + this.state.stage = .done; + this.flags.proxy_tunneling = false; + log("done", .{}); + } + + result.body.?.* = body; + callback.run(@fieldParentPtr("client", this), result); + + if (comptime print_every > 0) { + print_every_i += 1; + if 
(print_every_i % print_every == 0) { + Output.prettyln("Heap stats for HTTP thread\n", .{}); + Output.flush(); + default_arena.dumpThreadStats(); + print_every_i = 0; + } + } +} + pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { if (this.state.stage != .done and this.state.stage != .fail) { if (this.state.flags.is_redirect_pending and this.state.fail == null) { @@ -1692,57 +1775,8 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon } return; } - const out_str = this.state.body_out_str.?; - const body = out_str.*; - const result = this.toResult(); - const is_done = !result.has_more; - log("progressUpdate {}", .{is_done}); - - const callback = this.result_callback; - - if (is_done) { - this.unregisterAbortTracker(); - if (this.proxy_tunnel) |tunnel| { - log("close the tunnel", .{}); - this.proxy_tunnel = null; - tunnel.shutdown(); - tunnel.detachAndDeref(); - NewHTTPContext(is_ssl).closeSocket(socket); - } else { - if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) { - log("release socket", .{}); - ctx.releaseSocket( - socket, - this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized, - this.connected_url.hostname, - this.connected_url.getPortAuto(), - ); - } else { - NewHTTPContext(is_ssl).closeSocket(socket); - } - } - - this.state.reset(this.allocator); - this.state.response_stage = .done; - this.state.request_stage = .done; - this.state.stage = .done; - this.flags.proxy_tunneling = false; - log("done", .{}); - } - - result.body.?.* = body; - callback.run(@fieldParentPtr("client", this), result); - - if (comptime print_every > 0) { - print_every_i += 1; - if (print_every_i % print_every == 0) { - Output.prettyln("Heap stats for HTTP thread\n", .{}); - Output.flush(); - default_arena.dumpThreadStats(); - print_every_i = 0; - } - } + this.sendProgressUpdateWithoutStageCheck(is_ssl, ctx, socket); } } @@ -1944,7 
+1978,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con // done or streaming const is_done = content_length != null and this.state.total_body_received >= content_length.?; - if (is_done or this.signals.get(.body_streaming) or content_length == null) { + if (is_done or this.signals.get(.response_body_streaming) or content_length == null) { const is_final_chunk = is_done; const processed = try this.state.processBodyBuffer(buffer.*, is_final_chunk); @@ -2010,7 +2044,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets( progress.context.maybeRefresh(); } // streaming chunks - if (this.signals.get(.body_streaming)) { + if (this.signals.get(.response_body_streaming)) { // If we're streaming, we cannot use the libdeflate fast path this.state.flags.is_libdeflate_fast_path_disabled = true; return try this.state.processBodyBuffer(buffer, false); @@ -2091,7 +2125,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( try body_buffer.appendSliceExact(buffer); // streaming chunks - if (this.signals.get(.body_streaming)) { + if (this.signals.get(.response_body_streaming)) { // If we're streaming, we cannot use the libdeflate fast path this.state.flags.is_libdeflate_fast_path_disabled = true; diff --git a/src/http/AsyncHTTP.zig b/src/http/AsyncHTTP.zig index 3a41a67663..52e7356af0 100644 --- a/src/http/AsyncHTTP.zig +++ b/src/http/AsyncHTTP.zig @@ -68,8 +68,8 @@ pub fn signalHeaderProgress(this: *AsyncHTTP) void { progress.store(true, .release); } -pub fn enableBodyStreaming(this: *AsyncHTTP) void { - var stream = this.signals.body_streaming orelse return; +pub fn enableResponseBodyStreaming(this: *AsyncHTTP) void { + var stream = this.signals.response_body_streaming orelse return; stream.store(true, .release); } diff --git a/src/http/HTTPThread.zig b/src/http/HTTPThread.zig index c65dd26ec8..40453f8927 100644 --- a/src/http/HTTPThread.zig +++ b/src/http/HTTPThread.zig @@ -10,9 +10,12 @@ queued_tasks: Queue = Queue{}, 
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){}, queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){}, +queued_response_body_drains: std.ArrayListUnmanaged(DrainMessage) = std.ArrayListUnmanaged(DrainMessage){}, queued_shutdowns_lock: bun.Mutex = .{}, queued_writes_lock: bun.Mutex = .{}, +queued_response_body_drains_lock: bun.Mutex = .{}, + queued_threadlocal_proxy_derefs: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){}, has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), @@ -88,6 +91,9 @@ const WriteMessage = struct { end = 1, }; }; +const DrainMessage = struct { + async_http_id: u32, +}; const ShutdownMessage = struct { async_http_id: u32, }; @@ -359,8 +365,43 @@ fn drainQueuedWrites(this: *@This()) void { } } +fn drainQueuedHTTPResponseBodyDrains(this: *@This()) void { + while (true) { + // socket.close() can potentially be slow + // Let's not block other threads while this runs. 
+ var queued_response_body_drains = brk: { + this.queued_response_body_drains_lock.lock(); + defer this.queued_response_body_drains_lock.unlock(); + const drains = this.queued_response_body_drains; + this.queued_response_body_drains = .{}; + break :brk drains; + }; + defer queued_response_body_drains.deinit(bun.default_allocator); + + for (queued_response_body_drains.items) |drain| { + if (bun.http.socket_async_http_abort_tracker.get(drain.async_http_id)) |socket_ptr| { + switch (socket_ptr) { + inline .SocketTLS, .SocketTCP => |socket, tag| { + const is_tls = tag == .SocketTLS; + const HTTPContext = HTTPThread.NewHTTPContext(comptime is_tls); + const tagged = HTTPContext.getTaggedFromSocket(socket); + if (tagged.get(HTTPClient)) |client| { + client.drainResponseBody(comptime is_tls, socket); + } + }, + } + } + } + if (queued_response_body_drains.items.len == 0) { + break; + } + threadlog("drained {d} queued drains", .{queued_response_body_drains.items.len}); + } +} + fn drainEvents(this: *@This()) void { // Process any pending writes **before** aborting. 
+ this.drainQueuedHTTPResponseBodyDrains(); this.drainQueuedWrites(); this.drainQueuedShutdowns(); @@ -443,6 +484,18 @@ fn processEvents(this: *@This()) noreturn { } } +pub fn scheduleResponseBodyDrain(this: *@This(), async_http_id: u32) void { + { + this.queued_response_body_drains_lock.lock(); + defer this.queued_response_body_drains_lock.unlock(); + this.queued_response_body_drains.append(bun.default_allocator, .{ + .async_http_id = async_http_id, + }) catch |err| bun.handleOom(err); + } + if (this.has_awoken.load(.monotonic)) + this.loop.loop.wakeup(); +} + pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void { threadlog("scheduleShutdown {d}", .{http.async_http_id}); { diff --git a/src/http/Signals.zig b/src/http/Signals.zig index bf8d1d8360..b59147e773 100644 --- a/src/http/Signals.zig +++ b/src/http/Signals.zig @@ -1,24 +1,24 @@ const Signals = @This(); header_progress: ?*std.atomic.Value(bool) = null, -body_streaming: ?*std.atomic.Value(bool) = null, +response_body_streaming: ?*std.atomic.Value(bool) = null, aborted: ?*std.atomic.Value(bool) = null, cert_errors: ?*std.atomic.Value(bool) = null, upgraded: ?*std.atomic.Value(bool) = null, pub fn isEmpty(this: *const Signals) bool { - return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null and this.upgraded == null; + return this.aborted == null and this.response_body_streaming == null and this.header_progress == null and this.cert_errors == null and this.upgraded == null; } pub const Store = struct { header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + response_body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), upgraded: std.atomic.Value(bool) = 
std.atomic.Value(bool).init(false), pub fn to(this: *Store) Signals { return .{ .header_progress = &this.header_progress, - .body_streaming = &this.body_streaming, + .response_body_streaming = &this.response_body_streaming, .aborted = &this.aborted, .cert_errors = &this.cert_errors, .upgraded = &this.upgraded, diff --git a/src/s3/client.zig b/src/s3/client.zig index d08e774070..9908aef366 100644 --- a/src/s3/client.zig +++ b/src/s3/client.zig @@ -585,7 +585,7 @@ pub fn downloadStream( }, ); // enable streaming - task.http.enableBodyStreaming(); + task.http.enableResponseBodyStreaming(); // queue http request bun.http.HTTPThread.init(&.{}); var batch = bun.ThreadPool.Batch{}; diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts index 52f2b66c7c..bdbd646b65 100644 --- a/test/js/web/fetch/fetch.stream.test.ts +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -1,10 +1,10 @@ import { Socket } from "bun"; import { describe, expect, it } from "bun:test"; import { createReadStream, readFileSync } from "fs"; -import { gcTick } from "harness"; +import { gcTick, isWindows, tempDirWithFilesAnon } from "harness"; import http from "http"; import type { AddressInfo } from "net"; -import { join } from "path"; +import path, { join } from "path"; import { pipeline } from "stream"; import zlib from "zlib"; @@ -1347,4 +1347,58 @@ describe.concurrent("fetch() with streaming", () => { } }); } + + it.skipIf( + // The C program is POSIX only + isWindows, + )("should drain response body from HTTP thread when server sends chunk then stops (chunked encoding)", async () => { + // This test reproduces a bug where the HTTP client wasn't asking the HTTP thread + // to drain pending response body bytes. If the server sent headers + first chunk, + // then stopped sending data (but kept connection open), the read would hang forever. 
+ // + // We use a C server with blocking sockets instead of Bun.listen because Bun's sockets + // are non-blocking and event-driven, which makes it difficult to reliably reproduce + // the exact timing conditions needed to trigger this bug. The C server uses blocking + // write() calls that ensure data is buffered in the kernel before the server stops + // sending, forcing the HTTP client to drain the response body from the HTTP thread. + const dir = tempDirWithFilesAnon({ "a": "// a" }); + { + await using proc = Bun.spawn({ + cmd: [ + "cc", + "-Wno-error", + "-w", + path.join(import.meta.dirname, "http-chunked-server.c"), + "-o", + "http-chunked-server", + ], + cwd: dir, + stdout: "inherit", + stderr: "inherit", + stdin: "ignore", + }); + expect(await proc.exited).toBe(0); + } + + await using server = Bun.spawn({ + cmd: [path.join(dir, "http-chunked-server")], + stdout: "pipe", + stderr: "inherit", + stdin: "ignore", + }); + + const url = new URL("http://127.0.0.1:" + (await server.stdout.text()).trim()); + + const response = await fetch(url.toString(), {}); + const reader = response.body!.getReader(); + + // Read the data - this should not hang + const result = (await reader.read()) as ReadableStreamDefaultReadResult<Uint8Array>; + + // Verify we got the data without hanging + expect(result.done).toBe(false); + expect(result.value).toBeDefined(); + expect(new TextDecoder().decode(result.value!)).toBe("hello\n"); + server.kill("SIGTERM"); + }); }); diff --git a/test/js/web/fetch/http-chunked-server.c b/test/js/web/fetch/http-chunked-server.c new file mode 100644 index 0000000000..3a16c29888 --- /dev/null +++ b/test/js/web/fetch/http-chunked-server.c @@ -0,0 +1,111 @@ +// Simple HTTP server that reproduces a streaming response body bug. +// +// This server uses blocking sockets to send an HTTP response with chunked +// encoding, then keeps the connection open without sending more data.
This +// reproduces a bug where Bun's HTTP client wasn't draining pending response +// body bytes from the HTTP thread when the server stopped sending data but +// kept the connection alive. +// +// The server: +// 1. Binds to a random port and prints it to stdout +// 2. Accepts one connection +// 3. Sends HTTP headers with Transfer-Encoding: chunked +// 4. Sends one chunk containing "hello\n" +// 5. Keeps the connection open indefinitely before closing +// +// Without the fix, step 4 would cause the client to hang indefinitely waiting +// for data that's already been received by the HTTP thread but not drained. + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <unistd.h> + +int main(int argc, char *argv[]) { + (void)argc; + (void)argv; + + int server_fd, client_fd; + struct sockaddr_in address; + int opt = 1; + int addrlen = sizeof(address); + char buffer[1024] = {0}; + + // Create socket + if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + + // Set socket options + if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) { + perror("setsockopt"); + exit(EXIT_FAILURE); + } + + address.sin_family = AF_INET; + address.sin_addr.s_addr = inet_addr("127.0.0.1"); + address.sin_port = htons(0); // Use port 0 to get random port + + // Bind socket + if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) { + perror("bind failed"); + exit(EXIT_FAILURE); + } + + // Get the actual port number + socklen_t len = sizeof(address); + if (getsockname(server_fd, (struct sockaddr *)&address, &len) == -1) { + perror("getsockname"); + exit(EXIT_FAILURE); + } + + // Print port to stdout so test can read it + printf("%d\n", ntohs(address.sin_port)); + fflush(stdout); + // Close stdout so we can simply read it.
+ close(1); + + // Listen + if (listen(server_fd, 1) < 0) { + perror("listen"); + exit(EXIT_FAILURE); + } + + // Accept connection + if ((client_fd = accept(server_fd, (struct sockaddr *)&address, + (socklen_t *)&addrlen)) < 0) { + perror("accept"); + exit(EXIT_FAILURE); + } + + // Read the HTTP request (we don't care about it) + read(client_fd, buffer, 1024); + + // Send HTTP response headers with chunked encoding + const char *headers = "HTTP/1.1 200 OK\r\n" + "Content-Type: text/event-stream\r\n" + "Cache-Control: no-store\r\n" + "Connection: keep-alive\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n"; + + write(client_fd, headers, strlen(headers)); + + // Send first chunk: "hello\n" is 6 bytes + const char *chunk = "6\r\nhello\n\r\n"; + write(client_fd, chunk, strlen(chunk)); + + // Important: Don't close the connection! + // Just sleep to keep it open + sleep(9999999); + + // Clean up + close(client_fd); + close(server_fd); + + return 0; +}