From 0e883c935cd0eef7d4bd201744f5f967264fc48c Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Tue, 20 May 2025 21:11:22 -0700 Subject: [PATCH] fix(install/fetch) proper handle proxy (#19771) Co-authored-by: Jarred Sumner Co-authored-by: Jarred-Sumner <709451+Jarred-Sumner@users.noreply.github.com> Co-authored-by: cirospaciari <6379399+cirospaciari@users.noreply.github.com> Co-authored-by: Meghan Denny --- src/bun.js/api/bun/ssl_wrapper.zig | 63 ++++--- src/http.zig | 210 +++++++++++++-------- test/cli/install/bun-install-proxy.test.ts | 114 +++++++++++ test/harness.ts | 24 ++- 4 files changed, 310 insertions(+), 101 deletions(-) create mode 100644 test/cli/install/bun-install-proxy.test.ts diff --git a/src/bun.js/api/bun/ssl_wrapper.zig b/src/bun.js/api/bun/ssl_wrapper.zig index 6c62116674..c1f561ce2a 100644 --- a/src/bun.js/api/bun/ssl_wrapper.zig +++ b/src/bun.js/api/bun/ssl_wrapper.zig @@ -4,6 +4,7 @@ const BoringSSL = bun.BoringSSL.c; const X509 = @import("./x509.zig"); const JSC = bun.JSC; const uws = bun.uws; +const log = bun.Output.scoped(.SSLWrapper, true); /// Mimics the behavior of openssl.c in uSockets, wrapping data that can be received from any where (network, DuplexStream, etc) pub fn SSLWrapper(comptime T: type) type { @@ -25,7 +26,9 @@ pub fn SSLWrapper(comptime T: type) type { return struct { const This = @This(); - const BUFFER_SIZE = 16384; + // 64kb nice buffer size for SSL reads and writes, should be enough for most cases + // in reads we loop until we have no more data to read and in writes we loop until we have no more data to write/backpressure + const BUFFER_SIZE = 65536; handlers: Handlers, ssl: ?*BoringSSL.SSL, @@ -110,6 +113,12 @@ pub fn SSLWrapper(comptime T: type) type { // start the handshake this.handleTraffic(); } + pub fn startWithPayload(this: *This, payload: []const u8) void { + this.handlers.onOpen(this.handlers.ctx); + this.receiveData(payload); + // start the handshake + this.handleTraffic(); + } /// Shutdown the read direction of the SSL (fake it just for convenience) pub fn shutdownRead(this: *This) void { @@ -180,10 +189,15 @@ pub fn SSLWrapper(comptime T: type) type { // Return if we have pending data to be read or write pub fn hasPendingData(this: *const This) bool { const ssl = this.ssl orelse return false; - return BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_wbio(ssl)) > 0 or BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_rbio(ssl)) > 0; } + /// Return if we buffered data inside the BIO read buffer, not necessarily will return data to read + /// this dont reflect SSL_pending() + fn hasPendingRead(this: *const This) bool { + const ssl = this.ssl orelse return false; + return BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_rbio(ssl)) > 0; + } // We sent or received a shutdown (closing or closed) pub fn isShutdown(this: *const This) bool { return this.flags.closed_notified or this.flags.received_ssl_shutdown or this.flags.sent_ssl_shutdown; @@ -382,18 +396,12 @@ pub fn SSLWrapper(comptime T: type) type { // read data from the input BIO while (true) { + log("handleReading", .{}); const ssl = this.ssl orelse return false; - const input = BoringSSL.SSL_get_rbio(ssl) orelse return true; - - const pending = BoringSSL.BIO_ctrl_pending(input); - if (pending <= 0) { - // no data to write - break; - } const available = buffer[read..]; const just_read = BoringSSL.SSL_read(ssl, available.ptr, @intCast(available.len)); - + log("just read {d}", .{just_read}); if (just_read <= 0) { const err = BoringSSL.SSL_get_error(ssl, just_read); BoringSSL.ERR_clear_error(); @@ -424,11 +432,13 @@ pub fn SSLWrapper(comptime T: type) type { // flush the reading if (read > 0) { + log("triggering data callback (read {d})", .{read}); this.triggerDataCallback(buffer[0..read]); } this.triggerCloseCallback(); return false; } else { + log("wanna read/write just break", .{}); // we wanna read/write just break break; } @@ -438,6 +448,7 @@ pub fn SSLWrapper(comptime T: type) type { read += @intCast(just_read); if (read == buffer.len) { + log("triggering data callback (read {d}) and resetting read buffer", .{read}); // we filled the buffer this.triggerDataCallback(buffer[0..read]); read = 0; @@ -445,41 +456,45 @@ pub fn SSLWrapper(comptime T: type) type { } // we finished reading if (read > 0) { + log("triggering data callback (read {d})", .{read}); this.triggerDataCallback(buffer[0..read]); } return true; } fn handleWriting(this: *This, buffer: *[BUFFER_SIZE]u8) void { + var read: usize = 0; while (true) { const ssl = this.ssl orelse return; - const output = BoringSSL.SSL_get_wbio(ssl) orelse return; - // read data from the output BIO - const pending = BoringSSL.BIO_ctrl_pending(output); - if (pending <= 0) { - // no data to write + const available = buffer[read..]; + const just_read = BoringSSL.BIO_read(output, available.ptr, @intCast(available.len)); + if (just_read > 0) { + read += @intCast(just_read); + if (read == buffer.len) { + this.triggerWannaWriteCallback(buffer[0..read]); + read = 0; + } + } else { break; } - // limit the read to the buffer size - const len = @min(pending, buffer.len); - const pending_buffer = buffer[0..len]; - const read = BoringSSL.BIO_read(output, pending_buffer.ptr, len); - if (read > 0) { - this.triggerWannaWriteCallback(buffer[0..@intCast(read)]); - } + } + if (read > 0) { + this.triggerWannaWriteCallback(buffer[0..read]); } } fn handleTraffic(this: *This) void { + // always handle the handshake first if (this.updateHandshakeState()) { // shared stack buffer for reading and writing var buffer: [BUFFER_SIZE]u8 = undefined; // drain the input BIO first this.handleWriting(&buffer); - // drain the output BIO - if (this.handleReading(&buffer)) { + + // drain the output BIO in loop, because read can trigger writing and vice versa + while (this.hasPendingRead() and this.handleReading(&buffer)) { // read data can trigger writing so we need to handle it this.handleWriting(&buffer); } diff --git a/src/http.zig b/src/http.zig index 8401a55cdc..892908c553 100644 --- a/src/http.zig +++ b/src/http.zig @@ -241,6 +241,7 @@ const ProxyTunnel = struct { const ProxyTunnelWrapper = SSLWrapper(*HTTPClient); fn onOpen(this: *HTTPClient) void { + log("ProxyTunnel onOpen", .{}); this.state.response_stage = .proxy_handshake; this.state.request_stage = .proxy_handshake; if (this.proxy_tunnel) |proxy| { @@ -271,13 +272,13 @@ const ProxyTunnel = struct { fn onData(this: *HTTPClient, decoded_data: []const u8) void { if (decoded_data.len == 0) return; - log("onData decoded {}", .{decoded_data.len}); - + log("ProxyTunnel onData decoded {}", .{decoded_data.len}); if (this.proxy_tunnel) |proxy| { proxy.ref(); defer proxy.deref(); switch (this.state.response_stage) { .body => { + log("ProxyTunnel onData body", .{}); if (decoded_data.len == 0) return; const report_progress = this.handleResponseBody(decoded_data, false) catch |err| { proxy.close(err); @@ -298,6 +299,7 @@ const ProxyTunnel = struct { } }, .body_chunk => { + log("ProxyTunnel onData body_chunk", .{}); if (decoded_data.len == 0) return; const report_progress = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| { proxy.close(err); @@ -318,6 +320,7 @@ const ProxyTunnel = struct { } }, .proxy_headers => { + log("ProxyTunnel onData proxy_headers", .{}); switch (proxy.socket) { .ssl => |socket| { this.handleOnDataHeaders(true, decoded_data, &http_thread.https_context, socket); @@ -329,6 +332,7 @@ const ProxyTunnel = struct { } }, else => { + log("ProxyTunnel onData unexpected data", .{}); this.state.pending_response = null; proxy.close(error.UnexpectedData); }, @@ -338,6 +342,7 @@ const ProxyTunnel = struct { fn onHandshake(this: *HTTPClient, handshake_success: bool, ssl_error: uws.us_bun_verify_error_t) void { if (this.proxy_tunnel) |proxy| { + log("ProxyTunnel onHandshake", .{}); proxy.ref(); defer proxy.deref(); this.state.response_stage = .proxy_headers; @@ -349,6 +354,7 @@ const ProxyTunnel = struct { .reason = if (ssl_error.code == null) "" else ssl_error.reason[0..bun.len(ssl_error.reason) :0], }; if (handshake_success) { + log("ProxyTunnel onHandshake success", .{}); // handshake completed but we may have ssl errors this.flags.did_have_handshaking_error = handshake_error.error_no != 0; if (this.flags.reject_unauthorized) { @@ -365,13 +371,16 @@ const ProxyTunnel = struct { switch (proxy.socket) { .ssl => |socket| { if (!this.checkServerIdentity(true, socket, handshake_error, ssl_ptr, false)) { + log("ProxyTunnel onHandshake checkServerIdentity failed", .{}); this.flags.did_have_handshaking_error = true; + this.unregisterAbortTracker(); return; } }, .tcp => |socket| { if (!this.checkServerIdentity(false, socket, handshake_error, ssl_ptr, false)) { + log("ProxyTunnel onHandshake checkServerIdentity failed", .{}); this.flags.did_have_handshaking_error = true; this.unregisterAbortTracker(); return; @@ -391,9 +400,10 @@ const ProxyTunnel = struct { .none => {}, } } else { + log("ProxyTunnel onHandshake failed", .{}); // if we are here is because server rejected us, and the error_no is the cause of this // if we set reject_unauthorized == false this means the server requires custom CA aka NODE_EXTRA_CA_CERTS - if (this.flags.did_have_handshaking_error) { + if (this.flags.did_have_handshaking_error and handshake_error.error_no != 0) { proxy.close(BoringSSL.getCertErrorFromNo(handshake_error.error_no)); return; } @@ -407,8 +417,8 @@ const ProxyTunnel = struct { pub fn write(this: *HTTPClient, encoded_data: []const u8) void { if (this.proxy_tunnel) |proxy| { const written = switch (proxy.socket) { - .ssl => |socket| socket.write(encoded_data, true), - .tcp => |socket| socket.write(encoded_data, true), + .ssl => |socket| socket.write(encoded_data, false), + .tcp => |socket| socket.write(encoded_data, false), .none => 0, }; const pending = encoded_data[@intCast(written)..]; @@ -420,6 +430,7 @@ const ProxyTunnel = struct { } fn onClose(this: *HTTPClient) void { + log("ProxyTunnel onClose {s}", .{if (this.proxy_tunnel == null) "tunnel is detached" else "tunnel exists"}); if (this.proxy_tunnel) |proxy| { proxy.ref(); // defer the proxy deref the proxy tunnel may still be in use after triggering the close callback @@ -438,7 +449,7 @@ const ProxyTunnel = struct { } } - fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: JSC.API.ServerConfig.SSLConfig) void { + fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: JSC.API.ServerConfig.SSLConfig, start_payload: []const u8) void { const proxy_tunnel = bun.new(ProxyTunnel, .{ .ref_count = .init(), }); @@ -470,11 +481,21 @@ const ProxyTunnel = struct { } else { proxy_tunnel.socket = .{ .tcp = socket }; } - proxy_tunnel.wrapper.?.start(); + if (start_payload.len > 0) { + log("proxy tunnel start with payload", .{}); + proxy_tunnel.wrapper.?.startWithPayload(start_payload); + } else { + log("proxy tunnel start", .{}); + proxy_tunnel.wrapper.?.start(); + } } pub fn close(this: *ProxyTunnel, err: anyerror) void { this.shutdown_err = err; + this.shutdown(); + } + + pub fn shutdown(this: *ProxyTunnel) void { if (this.wrapper) |*wrapper| { // fast shutdown the connection _ = wrapper.shutdown(true); @@ -482,8 +503,14 @@ const ProxyTunnel = struct { } pub fn onWritable(this: *ProxyTunnel, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + log("ProxyTunnel onWritable", .{}); this.ref(); defer this.deref(); + defer if (this.wrapper) |*wrapper| { + // Cycle to through the SSL state machine + _ = wrapper.flush(); + }; + const encoded_data = this.write_buffer.slice(); if (encoded_data.len == 0) { return; @@ -491,13 +518,8 @@ const ProxyTunnel = struct { const written = socket.write(encoded_data, true); if (written == encoded_data.len) { this.write_buffer.reset(); - return; - } - - this.write_buffer.cursor += @intCast(written); - if (this.wrapper) |*wrapper| { - // Cycle to through the SSL state machine - _ = wrapper.flush(); + } else { + this.write_buffer.cursor += @intCast(written); } } @@ -513,7 +535,7 @@ const ProxyTunnel = struct { if (this.wrapper) |*wrapper| { return try wrapper.writeData(buf); } - return 0; + return error.ConnectionClosed; } pub fn detachSocket(this: *ProxyTunnel) void { @@ -734,11 +756,14 @@ fn NewHTTPContext(comptime ssl: bool) type { pending.hostname_len = @as(u8, @truncate(hostname.len)); pending.port = port; - // log("Keep-Alive release {s}:{d} (0x{})", .{ hostname, port, @intFromPtr(socket.socket) }); + log("Keep-Alive release {s}:{d}", .{ + hostname, + port, + }); return; } } - + log("close socket", .{}); closeSocket(socket); } @@ -1695,6 +1720,7 @@ pub fn onClose( if (client.proxy_tunnel) |tunnel| { client.proxy_tunnel = null; // always detach the socket from the tunnel onClose (timeout, connectError will call fail that will do the same) + tunnel.shutdown(); tunnel.detachAndDeref(); } const in_progress = client.state.stage != .done and client.state.stage != .fail and client.state.flags.is_redirect_pending == false; @@ -2087,6 +2113,7 @@ pub const InternalState = struct { .request_body = "", .certificate_info = null, .flags = .{}, + .total_body_received = 0, }; } @@ -2293,7 +2320,8 @@ pub fn isKeepAlivePossible(this: *HTTPClient) bool { // TODO keepalive for unix sockets if (this.unix_socket_path.length() > 0) return false; // is not possible to reuse Proxy with TSL, so disable keepalive if url is tunneling HTTPS - if (this.http_proxy != null and this.url.isHTTPS()) { + if (this.proxy_tunnel != null or (this.http_proxy != null and this.url.isHTTPS())) { + log("Keep-Alive release (proxy tunneling https)", .{}); return false; } @@ -3042,17 +3070,28 @@ pub fn doRedirect( assert(this.redirect_type == FetchRedirect.follow); this.unregisterAbortTracker(); - // we need to clean the client reference before closing the socket because we are going to reuse the same ref in a another request - if (this.isKeepAlivePossible()) { - assert(this.connected_url.hostname.len > 0); - ctx.releaseSocket( - socket, - this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized, - this.connected_url.hostname, - this.connected_url.getPortAuto(), - ); + if (this.proxy_tunnel) |tunnel| { + log("close the tunnel in redirect", .{}); + this.proxy_tunnel = null; + tunnel.detachAndDeref(); + if (!socket.isClosed()) { + log("close socket in redirect", .{}); + NewHTTPContext(is_ssl).closeSocket(socket); + } } else { - NewHTTPContext(is_ssl).closeSocket(socket); + // we need to clean the client reference before closing the socket because we are going to reuse the same ref in a another request + if (this.isKeepAlivePossible()) { + log("Keep-Alive release in redirect", .{}); + assert(this.connected_url.hostname.len > 0); + ctx.releaseSocket( + socket, + this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); + } else { + NewHTTPContext(is_ssl).closeSocket(socket); + } } this.connected_url = URL{}; @@ -3218,10 +3257,12 @@ noinline fn sendInitialRequestPayload(this: *HTTPClient, comptime is_first_call: if (this.http_proxy) |_| { if (this.url.isHTTPS()) { + log("start proxy tunneling (https proxy)", .{}); //DO the tunneling! this.flags.proxy_tunneling = true; try writeProxyConnect(@TypeOf(writer), writer, this); } else { + log("start proxy request (http proxy)", .{}); // HTTP do not need tunneling with CONNECT just a slightly different version of the request try writeProxyRequest( @TypeOf(writer), @@ -3231,6 +3272,7 @@ noinline fn sendInitialRequestPayload(this: *HTTPClient, comptime is_first_call: ); } } else { + log("normal request", .{}); try writeRequest( @TypeOf(writer), writer, @@ -3314,6 +3356,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s switch (this.state.request_stage) { .pending, .headers => { + log("sendInitialRequestPayload", .{}); this.setTimeout(socket, 5); const result = sendInitialRequestPayload(this, is_first_call, is_ssl, socket) catch |err| { this.closeAndFail(err, is_ssl, socket); @@ -3361,6 +3404,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } }, .body => { + log("send body", .{}); this.setTimeout(socket, 5); switch (this.state.original_request_body) { @@ -3428,6 +3472,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } }, .proxy_body => { + log("send proxy body", .{}); if (this.proxy_tunnel) |proxy| { switch (this.state.original_request_body) { .bytes => { @@ -3447,6 +3492,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s .stream => { var stream = &this.state.original_request_body.stream; stream.has_backpressure = false; + this.setTimeout(socket, 5); // to simplify things here the buffer contains the raw data we just need to flush to the socket it if (stream.buffer.isNotEmpty()) { @@ -3457,9 +3503,6 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s if (amount < to_send.len) { stream.has_backpressure = true; } - if (stream.buffer.isEmpty()) { - stream.buffer.reset(); - } } if (stream.hasEnded()) { this.state.request_stage = .done; @@ -3474,6 +3517,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } }, .proxy_headers => { + log("send proxy headers", .{}); if (this.proxy_tunnel) |proxy| { this.setTimeout(socket, 5); var stack_buffer = std.heap.stackFallback(1024 * 16, bun.default_allocator); @@ -3513,6 +3557,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s if (comptime is_first_call) { if (amount == 0) { // don't worry about it + log("is_first_call and amount == 0", .{}); return; } } @@ -3560,10 +3605,11 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc this.fail(err); } -fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { +fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, start_payload: []const u8) void { + log("startProxyHandshake", .{}); // if we have options we pass them (ca, reject_unauthorized, etc) otherwise use the default const ssl_options = if (this.tls_props != null) this.tls_props.?.* else JSC.API.ServerConfig.SSLConfig.zero; - ProxyTunnel.start(this, is_ssl, socket, ssl_options); + ProxyTunnel.start(this, is_ssl, socket, ssl_options, start_payload); } inline fn handleShortRead( @@ -3592,6 +3638,7 @@ pub fn handleOnDataHeaders( ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket, ) void { + log("handleOnDataHeaders", .{}); var to_read = incoming_data; var amount_read: usize = 0; var needs_move = true; @@ -3608,6 +3655,7 @@ pub fn handleOnDataHeaders( // minimal http/1.1 request size is 16 bytes without headers and 26 with Host header // if is less than 16 will always be a ShortRead if (to_read.len < 16) { + log("handleShortRead", .{}); this.handleShortRead(is_ssl, incoming_data, socket, needs_move); return; } @@ -3633,9 +3681,11 @@ pub fn handleOnDataHeaders( const body_buf = to_read[@min(@as(usize, @intCast(response.bytes_read)), to_read.len)..]; // handle the case where we have a 100 Continue - if (response.status_code == 100) { + if (response.status_code >= 100 and response.status_code < 200) { + log("information headers", .{}); // we still can have the 200 OK in the same buffer sometimes if (body_buf.len > 0) { + log("information headers with body", .{}); this.onData(is_ssl, body_buf, ctx, socket); } return; @@ -3675,7 +3725,7 @@ pub fn handleOnDataHeaders( if (this.flags.proxy_tunneling and this.proxy_tunnel == null) { // we are proxing we dont need to cloneMetadata yet - this.startProxyHandshake(is_ssl, socket); + this.startProxyHandshake(is_ssl, socket, body_buf); return; } @@ -3736,6 +3786,13 @@ pub fn onData( return; } + if (this.proxy_tunnel) |proxy| { + // if we have a tunnel we dont care about the other stages, we will just tunnel the data + this.setTimeout(socket, 5); + proxy.receiveData(incoming_data); + return; + } + switch (this.state.response_stage) { .pending, .headers => { this.handleOnDataHeaders(is_ssl, incoming_data, ctx, socket); @@ -3743,47 +3800,32 @@ pub fn onData( .body => { this.setTimeout(socket, 5); - if (this.proxy_tunnel) |proxy| { - proxy.receiveData(incoming_data); - } else { - const report_progress = this.handleResponseBody(incoming_data, false) catch |err| { - this.closeAndFail(err, is_ssl, socket); - return; - }; + const report_progress = this.handleResponseBody(incoming_data, false) catch |err| { + this.closeAndFail(err, is_ssl, socket); + return; + }; - if (report_progress) { - this.progressUpdate(is_ssl, ctx, socket); - return; - } + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); + return; } }, .body_chunk => { this.setTimeout(socket, 5); - if (this.proxy_tunnel) |proxy| { - proxy.receiveData(incoming_data); - } else { - const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| { - this.closeAndFail(err, is_ssl, socket); - return; - }; + const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| { + this.closeAndFail(err, is_ssl, socket); + return; + }; - if (report_progress) { - this.progressUpdate(is_ssl, ctx, socket); - return; - } + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); + return; } }, .fail => {}, - .proxy_headers, .proxy_handshake => { - this.setTimeout(socket, 5); - if (this.proxy_tunnel) |proxy| { - proxy.receiveData(incoming_data); - } - return; - }, else => { this.state.pending_response = null; this.closeAndFail(error.UnexpectedData, is_ssl, socket); @@ -3815,6 +3857,7 @@ fn fail(this: *HTTPClient, err: anyerror) void { if (this.proxy_tunnel) |tunnel| { this.proxy_tunnel = null; + tunnel.shutdown(); // always detach the socket from the tunnel in case of fail tunnel.detachAndDeref(); } @@ -3898,16 +3941,28 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon if (is_done) { this.unregisterAbortTracker(); - - if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) { - ctx.releaseSocket( - socket, - this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized, - this.connected_url.hostname, - this.connected_url.getPortAuto(), - ); - } else if (!socket.isClosed()) { - NewHTTPContext(is_ssl).closeSocket(socket); + if (this.proxy_tunnel) |tunnel| { + log("close the tunnel", .{}); + this.proxy_tunnel = null; + tunnel.shutdown(); + tunnel.detachAndDeref(); + if (!socket.isClosed()) { + log("close socket", .{}); + NewHTTPContext(is_ssl).closeSocket(socket); + } + } else { + if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) { + log("release socket", .{}); + ctx.releaseSocket( + socket, + this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); + } else if (!socket.isClosed()) { + log("close socket", .{}); + NewHTTPContext(is_ssl).closeSocket(socket); + } } this.state.reset(this.allocator); @@ -3915,6 +3970,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon this.state.request_stage = .done; this.state.stage = .done; this.flags.proxy_tunneling = false; + log("done", .{}); } result.body.?.* = body; @@ -4070,6 +4126,7 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !void { if (!this.state.isChunkedEncoding()) { this.state.total_body_received += incoming_data.len; + log("handleResponseBodyFromSinglePacket {d}", .{this.state.total_body_received}); } defer { if (this.progress_node) |progress| { @@ -4120,7 +4177,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con } this.state.total_body_received += remainder.len; - + log("handleResponseBodyFromMultiplePackets {d}", .{this.state.total_body_received}); if (this.progress_node) |progress| { progress.activate(); progress.setCompletedItems(this.state.total_body_received); @@ -4180,6 +4237,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets( ); buffer.list.items.len -|= incoming_data.len - bytes_decoded; this.state.total_body_received += bytes_decoded; + log("handleResponseBodyChunkedEncodingFromMultiplePackets {d}", .{this.state.total_body_received}); buffer_ptr.* = buffer; @@ -4258,7 +4316,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( ); buffer.len -|= incoming_data.len - bytes_decoded; this.state.total_body_received += bytes_decoded; - + log("handleResponseBodyChunkedEncodingFromSinglePacket {d}", .{this.state.total_body_received}); switch (pret) { // Invalid HTTP response body -1 => { diff --git a/test/cli/install/bun-install-proxy.test.ts b/test/cli/install/bun-install-proxy.test.ts new file mode 100644 index 0000000000..00d11cd18a --- /dev/null +++ b/test/cli/install/bun-install-proxy.test.ts @@ -0,0 +1,114 @@ +import { beforeAll, it } from "bun:test"; +import { exec, execSync } from "child_process"; +import { rm } from "fs/promises"; +import { bunEnv, bunExe, isLinux, tempDirWithFiles } from "harness"; +import { join } from "path"; +import { promisify } from "util"; +const execAsync = promisify(exec); +const dockerCLI = Bun.which("docker") as string; +const SQUID_URL = "http://127.0.0.1:3128"; +function isDockerEnabled(): boolean { + if (!dockerCLI) { + return false; + } + + // TODO: investigate why its not starting on Linux arm64 + if (isLinux && process.arch === "arm64") { + return false; + } + + try { + const info = execSync(`${dockerCLI} info`, { stdio: ["ignore", "pipe", "inherit"] }); + return info.toString().indexOf("Server Version:") !== -1; + } catch { + return false; + } +} +if (isDockerEnabled()) { + beforeAll(async () => { + async function isSquidRunning() { + const text = await fetch(SQUID_URL) + .then(res => res.text()) + .catch(() => {}); + return text?.includes("squid") ?? false; + } + if (!(await isSquidRunning())) { + // try to create or error if is already created + await execAsync( + `${dockerCLI} run -d --name squid-container -e TZ=UTC -p 3128:3128 ubuntu/squid:5.2-22.04_beta`, + ).catch(() => {}); + + async function waitForSquid(max_wait = 60_000) { + const start = Date.now(); + while (true) { + if (await isSquidRunning()) { + return; + } + if (Date.now() - start > max_wait) { + throw new Error("Squid did not start in time"); + } + + await Bun.sleep(1000); + } + } + // wait for squid to start + await waitForSquid(); + } + }); + + it("bun install with proxy with big packages", async () => { + const files = { + "package.json": JSON.stringify({ + "name": "test-install", + "module": "index.ts", + "type": "module", + "private": true, + "dependencies": { + "gastby": "1.0.1", + "mitata": "1.0.34", + "next.js": "1.0.3", + "react": "19.1.0", + "react-dom": "19.1.0", + "@types/react": "18.3.3", + "esbuild": "0.21.4", + "peechy": "0.4.34", + "prettier": "3.5.3", + "prettier-plugin-organize-imports": "4.0.0", + "source-map-js": "1.2.0", + "typescript": "5.7.2", + }, + }), + }; + const promises = new Array(5); + // this repro a hang when using a proxy, we run multiple times to make sure it's not a flaky test + for (let i = 0; i < 5; i++) { + const package_dir = tempDirWithFiles("codex-" + i, files); + + const { exited } = Bun.spawn([bunExe(), "install", "--ignore-scripts"], { + cwd: package_dir, + // @ts-ignore + env: { + ...bunEnv, + BUN_INSTALL_CACHE_DIR: join(package_dir, ".bun-install-cache"), + TMPDIR: join(package_dir, ".tmp"), + BUN_TMPDIR: join(package_dir, ".tmp"), + HTTPS_PROXY: SQUID_URL, + HTTP_PROXY: SQUID_URL, + }, + stdio: ["inherit", "inherit", "inherit"], + timeout: 20_000, + }); + promises[i] = exited + .then(r => { + if (r !== 0) { + throw new Error("failed to install with exit code " + r); + } + }) + .finally(() => { + return rm(package_dir, { recursive: true, force: true }); + }); + } + + await Promise.all(promises); + }, 60_000); +} diff --git a/test/harness.ts b/test/harness.ts index 175e64401b..e5acad6f1d 100644 --- a/test/harness.ts +++ b/test/harness.ts @@ -205,6 +205,28 @@ export async function makeTree(base: string, tree: DirectoryTree) { } } +export function makeTreeSync(base: string, tree: DirectoryTree) { + const isDirectoryTree = (value: string | DirectoryTree | Buffer): value is DirectoryTree => + typeof value === "object" && value && typeof value?.byteLength === "undefined"; + + for (const [name, raw_contents] of Object.entries(tree)) { + const contents = (typeof raw_contents === "function" ? raw_contents({ root: base }) : raw_contents) as string; + const joined = join(base, name); + if (name.includes("/")) { + const dir = dirname(name); + if (dir !== name && dir !== ".") { + fs.mkdirSync(join(base, dir), { recursive: true }); + } + } + if (isDirectoryTree(contents)) { + fs.mkdirSync(joined); + makeTreeSync(joined, contents); + continue; + } + fs.writeFileSync(joined, contents); + } +} + /** * Recursively create files within a new temporary directory. * @@ -224,7 +246,7 @@ export async function makeTree(base: string, tree: DirectoryTree) { */ export function tempDirWithFiles(basename: string, files: DirectoryTree): string { const base = fs.mkdtempSync(join(fs.realpathSync(os.tmpdir()), basename + "_")); - makeTree(base, files); + makeTreeSync(base, files); return base; }