fix(install/fetch) proper handle proxy (#19771)

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
Co-authored-by: Jarred-Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: cirospaciari <6379399+cirospaciari@users.noreply.github.com>
Co-authored-by: Meghan Denny <meghan@bun.sh>
This commit is contained in:
Ciro Spaciari
2025-05-20 21:11:22 -07:00
committed by GitHub
parent 497360d543
commit 0e883c935c
4 changed files with 310 additions and 101 deletions

View File

@@ -4,6 +4,7 @@ const BoringSSL = bun.BoringSSL.c;
const X509 = @import("./x509.zig");
const JSC = bun.JSC;
const uws = bun.uws;
const log = bun.Output.scoped(.SSLWrapper, true);
/// Mimics the behavior of openssl.c in uSockets, wrapping data that can be received from any where (network, DuplexStream, etc)
pub fn SSLWrapper(comptime T: type) type {
@@ -25,7 +26,9 @@ pub fn SSLWrapper(comptime T: type) type {
return struct {
const This = @This();
const BUFFER_SIZE = 16384;
// 64kb nice buffer size for SSL reads and writes, should be enough for most cases
// in reads we loop until we have no more data to read and in writes we loop until we have no more data to write/backpressure
const BUFFER_SIZE = 65536;
handlers: Handlers,
ssl: ?*BoringSSL.SSL,
@@ -110,6 +113,12 @@ pub fn SSLWrapper(comptime T: type) type {
// start the handshake
this.handleTraffic();
}
pub fn startWithPayload(this: *This, payload: []const u8) void {
this.handlers.onOpen(this.handlers.ctx);
this.receiveData(payload);
// start the handshake
this.handleTraffic();
}
/// Shutdown the read direction of the SSL (fake it just for convenience)
pub fn shutdownRead(this: *This) void {
@@ -180,10 +189,15 @@ pub fn SSLWrapper(comptime T: type) type {
// Return if we have pending data to be read or write
pub fn hasPendingData(this: *const This) bool {
const ssl = this.ssl orelse return false;
return BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_wbio(ssl)) > 0 or BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_rbio(ssl)) > 0;
}
/// Return if we buffered data inside the BIO read buffer, not necessarily will return data to read
/// this dont reflect SSL_pending()
fn hasPendingRead(this: *const This) bool {
const ssl = this.ssl orelse return false;
return BoringSSL.BIO_ctrl_pending(BoringSSL.SSL_get_rbio(ssl)) > 0;
}
// We sent or received a shutdown (closing or closed)
pub fn isShutdown(this: *const This) bool {
return this.flags.closed_notified or this.flags.received_ssl_shutdown or this.flags.sent_ssl_shutdown;
@@ -382,18 +396,12 @@ pub fn SSLWrapper(comptime T: type) type {
// read data from the input BIO
while (true) {
log("handleReading", .{});
const ssl = this.ssl orelse return false;
const input = BoringSSL.SSL_get_rbio(ssl) orelse return true;
const pending = BoringSSL.BIO_ctrl_pending(input);
if (pending <= 0) {
// no data to write
break;
}
const available = buffer[read..];
const just_read = BoringSSL.SSL_read(ssl, available.ptr, @intCast(available.len));
log("just read {d}", .{just_read});
if (just_read <= 0) {
const err = BoringSSL.SSL_get_error(ssl, just_read);
BoringSSL.ERR_clear_error();
@@ -424,11 +432,13 @@ pub fn SSLWrapper(comptime T: type) type {
// flush the reading
if (read > 0) {
log("triggering data callback (read {d})", .{read});
this.triggerDataCallback(buffer[0..read]);
}
this.triggerCloseCallback();
return false;
} else {
log("wanna read/write just break", .{});
// we wanna read/write just break
break;
}
@@ -438,6 +448,7 @@ pub fn SSLWrapper(comptime T: type) type {
read += @intCast(just_read);
if (read == buffer.len) {
log("triggering data callback (read {d}) and resetting read buffer", .{read});
// we filled the buffer
this.triggerDataCallback(buffer[0..read]);
read = 0;
@@ -445,41 +456,45 @@ pub fn SSLWrapper(comptime T: type) type {
}
// we finished reading
if (read > 0) {
log("triggering data callback (read {d})", .{read});
this.triggerDataCallback(buffer[0..read]);
}
return true;
}
fn handleWriting(this: *This, buffer: *[BUFFER_SIZE]u8) void {
var read: usize = 0;
while (true) {
const ssl = this.ssl orelse return;
const output = BoringSSL.SSL_get_wbio(ssl) orelse return;
// read data from the output BIO
const pending = BoringSSL.BIO_ctrl_pending(output);
if (pending <= 0) {
// no data to write
const available = buffer[read..];
const just_read = BoringSSL.BIO_read(output, available.ptr, @intCast(available.len));
if (just_read > 0) {
read += @intCast(just_read);
if (read == buffer.len) {
this.triggerWannaWriteCallback(buffer[0..read]);
read = 0;
}
} else {
break;
}
// limit the read to the buffer size
const len = @min(pending, buffer.len);
const pending_buffer = buffer[0..len];
const read = BoringSSL.BIO_read(output, pending_buffer.ptr, len);
if (read > 0) {
this.triggerWannaWriteCallback(buffer[0..@intCast(read)]);
}
}
if (read > 0) {
this.triggerWannaWriteCallback(buffer[0..read]);
}
}
fn handleTraffic(this: *This) void {
// always handle the handshake first
if (this.updateHandshakeState()) {
// shared stack buffer for reading and writing
var buffer: [BUFFER_SIZE]u8 = undefined;
// drain the input BIO first
this.handleWriting(&buffer);
// drain the output BIO
if (this.handleReading(&buffer)) {
// drain the output BIO in loop, because read can trigger writing and vice versa
while (this.hasPendingRead() and this.handleReading(&buffer)) {
// read data can trigger writing so we need to handle it
this.handleWriting(&buffer);
}

View File

@@ -241,6 +241,7 @@ const ProxyTunnel = struct {
const ProxyTunnelWrapper = SSLWrapper(*HTTPClient);
fn onOpen(this: *HTTPClient) void {
log("ProxyTunnel onOpen", .{});
this.state.response_stage = .proxy_handshake;
this.state.request_stage = .proxy_handshake;
if (this.proxy_tunnel) |proxy| {
@@ -271,13 +272,13 @@ const ProxyTunnel = struct {
fn onData(this: *HTTPClient, decoded_data: []const u8) void {
if (decoded_data.len == 0) return;
log("onData decoded {}", .{decoded_data.len});
log("ProxyTunnel onData decoded {}", .{decoded_data.len});
if (this.proxy_tunnel) |proxy| {
proxy.ref();
defer proxy.deref();
switch (this.state.response_stage) {
.body => {
log("ProxyTunnel onData body", .{});
if (decoded_data.len == 0) return;
const report_progress = this.handleResponseBody(decoded_data, false) catch |err| {
proxy.close(err);
@@ -298,6 +299,7 @@ const ProxyTunnel = struct {
}
},
.body_chunk => {
log("ProxyTunnel onData body_chunk", .{});
if (decoded_data.len == 0) return;
const report_progress = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| {
proxy.close(err);
@@ -318,6 +320,7 @@ const ProxyTunnel = struct {
}
},
.proxy_headers => {
log("ProxyTunnel onData proxy_headers", .{});
switch (proxy.socket) {
.ssl => |socket| {
this.handleOnDataHeaders(true, decoded_data, &http_thread.https_context, socket);
@@ -329,6 +332,7 @@ const ProxyTunnel = struct {
}
},
else => {
log("ProxyTunnel onData unexpected data", .{});
this.state.pending_response = null;
proxy.close(error.UnexpectedData);
},
@@ -338,6 +342,7 @@ const ProxyTunnel = struct {
fn onHandshake(this: *HTTPClient, handshake_success: bool, ssl_error: uws.us_bun_verify_error_t) void {
if (this.proxy_tunnel) |proxy| {
log("ProxyTunnel onHandshake", .{});
proxy.ref();
defer proxy.deref();
this.state.response_stage = .proxy_headers;
@@ -349,6 +354,7 @@ const ProxyTunnel = struct {
.reason = if (ssl_error.code == null) "" else ssl_error.reason[0..bun.len(ssl_error.reason) :0],
};
if (handshake_success) {
log("ProxyTunnel onHandshake success", .{});
// handshake completed but we may have ssl errors
this.flags.did_have_handshaking_error = handshake_error.error_no != 0;
if (this.flags.reject_unauthorized) {
@@ -365,13 +371,16 @@ const ProxyTunnel = struct {
switch (proxy.socket) {
.ssl => |socket| {
if (!this.checkServerIdentity(true, socket, handshake_error, ssl_ptr, false)) {
log("ProxyTunnel onHandshake checkServerIdentity failed", .{});
this.flags.did_have_handshaking_error = true;
this.unregisterAbortTracker();
return;
}
},
.tcp => |socket| {
if (!this.checkServerIdentity(false, socket, handshake_error, ssl_ptr, false)) {
log("ProxyTunnel onHandshake checkServerIdentity failed", .{});
this.flags.did_have_handshaking_error = true;
this.unregisterAbortTracker();
return;
@@ -391,9 +400,10 @@ const ProxyTunnel = struct {
.none => {},
}
} else {
log("ProxyTunnel onHandshake failed", .{});
// if we are here is because server rejected us, and the error_no is the cause of this
// if we set reject_unauthorized == false this means the server requires custom CA aka NODE_EXTRA_CA_CERTS
if (this.flags.did_have_handshaking_error) {
if (this.flags.did_have_handshaking_error and handshake_error.error_no != 0) {
proxy.close(BoringSSL.getCertErrorFromNo(handshake_error.error_no));
return;
}
@@ -407,8 +417,8 @@ const ProxyTunnel = struct {
pub fn write(this: *HTTPClient, encoded_data: []const u8) void {
if (this.proxy_tunnel) |proxy| {
const written = switch (proxy.socket) {
.ssl => |socket| socket.write(encoded_data, true),
.tcp => |socket| socket.write(encoded_data, true),
.ssl => |socket| socket.write(encoded_data, false),
.tcp => |socket| socket.write(encoded_data, false),
.none => 0,
};
const pending = encoded_data[@intCast(written)..];
@@ -420,6 +430,7 @@ const ProxyTunnel = struct {
}
fn onClose(this: *HTTPClient) void {
log("ProxyTunnel onClose {s}", .{if (this.proxy_tunnel == null) "tunnel is detached" else "tunnel exists"});
if (this.proxy_tunnel) |proxy| {
proxy.ref();
// defer the proxy deref the proxy tunnel may still be in use after triggering the close callback
@@ -438,7 +449,7 @@ const ProxyTunnel = struct {
}
}
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: JSC.API.ServerConfig.SSLConfig) void {
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: JSC.API.ServerConfig.SSLConfig, start_payload: []const u8) void {
const proxy_tunnel = bun.new(ProxyTunnel, .{
.ref_count = .init(),
});
@@ -470,11 +481,21 @@ const ProxyTunnel = struct {
} else {
proxy_tunnel.socket = .{ .tcp = socket };
}
proxy_tunnel.wrapper.?.start();
if (start_payload.len > 0) {
log("proxy tunnel start with payload", .{});
proxy_tunnel.wrapper.?.startWithPayload(start_payload);
} else {
log("proxy tunnel start", .{});
proxy_tunnel.wrapper.?.start();
}
}
pub fn close(this: *ProxyTunnel, err: anyerror) void {
this.shutdown_err = err;
this.shutdown();
}
pub fn shutdown(this: *ProxyTunnel) void {
if (this.wrapper) |*wrapper| {
// fast shutdown the connection
_ = wrapper.shutdown(true);
@@ -482,8 +503,14 @@ const ProxyTunnel = struct {
}
pub fn onWritable(this: *ProxyTunnel, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
log("ProxyTunnel onWritable", .{});
this.ref();
defer this.deref();
defer if (this.wrapper) |*wrapper| {
// Cycle to through the SSL state machine
_ = wrapper.flush();
};
const encoded_data = this.write_buffer.slice();
if (encoded_data.len == 0) {
return;
@@ -491,13 +518,8 @@ const ProxyTunnel = struct {
const written = socket.write(encoded_data, true);
if (written == encoded_data.len) {
this.write_buffer.reset();
return;
}
this.write_buffer.cursor += @intCast(written);
if (this.wrapper) |*wrapper| {
// Cycle to through the SSL state machine
_ = wrapper.flush();
} else {
this.write_buffer.cursor += @intCast(written);
}
}
@@ -513,7 +535,7 @@ const ProxyTunnel = struct {
if (this.wrapper) |*wrapper| {
return try wrapper.writeData(buf);
}
return 0;
return error.ConnectionClosed;
}
pub fn detachSocket(this: *ProxyTunnel) void {
@@ -734,11 +756,14 @@ fn NewHTTPContext(comptime ssl: bool) type {
pending.hostname_len = @as(u8, @truncate(hostname.len));
pending.port = port;
// log("Keep-Alive release {s}:{d} (0x{})", .{ hostname, port, @intFromPtr(socket.socket) });
log("Keep-Alive release {s}:{d}", .{
hostname,
port,
});
return;
}
}
log("close socket", .{});
closeSocket(socket);
}
@@ -1695,6 +1720,7 @@ pub fn onClose(
if (client.proxy_tunnel) |tunnel| {
client.proxy_tunnel = null;
// always detach the socket from the tunnel onClose (timeout, connectError will call fail that will do the same)
tunnel.shutdown();
tunnel.detachAndDeref();
}
const in_progress = client.state.stage != .done and client.state.stage != .fail and client.state.flags.is_redirect_pending == false;
@@ -2087,6 +2113,7 @@ pub const InternalState = struct {
.request_body = "",
.certificate_info = null,
.flags = .{},
.total_body_received = 0,
};
}
@@ -2293,7 +2320,8 @@ pub fn isKeepAlivePossible(this: *HTTPClient) bool {
// TODO keepalive for unix sockets
if (this.unix_socket_path.length() > 0) return false;
// is not possible to reuse Proxy with TSL, so disable keepalive if url is tunneling HTTPS
if (this.http_proxy != null and this.url.isHTTPS()) {
if (this.proxy_tunnel != null or (this.http_proxy != null and this.url.isHTTPS())) {
log("Keep-Alive release (proxy tunneling https)", .{});
return false;
}
@@ -3042,17 +3070,28 @@ pub fn doRedirect(
assert(this.redirect_type == FetchRedirect.follow);
this.unregisterAbortTracker();
// we need to clean the client reference before closing the socket because we are going to reuse the same ref in a another request
if (this.isKeepAlivePossible()) {
assert(this.connected_url.hostname.len > 0);
ctx.releaseSocket(
socket,
this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized,
this.connected_url.hostname,
this.connected_url.getPortAuto(),
);
if (this.proxy_tunnel) |tunnel| {
log("close the tunnel in redirect", .{});
this.proxy_tunnel = null;
tunnel.detachAndDeref();
if (!socket.isClosed()) {
log("close socket in redirect", .{});
NewHTTPContext(is_ssl).closeSocket(socket);
}
} else {
NewHTTPContext(is_ssl).closeSocket(socket);
// we need to clean the client reference before closing the socket because we are going to reuse the same ref in a another request
if (this.isKeepAlivePossible()) {
log("Keep-Alive release in redirect", .{});
assert(this.connected_url.hostname.len > 0);
ctx.releaseSocket(
socket,
this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized,
this.connected_url.hostname,
this.connected_url.getPortAuto(),
);
} else {
NewHTTPContext(is_ssl).closeSocket(socket);
}
}
this.connected_url = URL{};
@@ -3218,10 +3257,12 @@ noinline fn sendInitialRequestPayload(this: *HTTPClient, comptime is_first_call:
if (this.http_proxy) |_| {
if (this.url.isHTTPS()) {
log("start proxy tunneling (https proxy)", .{});
//DO the tunneling!
this.flags.proxy_tunneling = true;
try writeProxyConnect(@TypeOf(writer), writer, this);
} else {
log("start proxy request (http proxy)", .{});
// HTTP do not need tunneling with CONNECT just a slightly different version of the request
try writeProxyRequest(
@TypeOf(writer),
@@ -3231,6 +3272,7 @@ noinline fn sendInitialRequestPayload(this: *HTTPClient, comptime is_first_call:
);
}
} else {
log("normal request", .{});
try writeRequest(
@TypeOf(writer),
writer,
@@ -3314,6 +3356,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
switch (this.state.request_stage) {
.pending, .headers => {
log("sendInitialRequestPayload", .{});
this.setTimeout(socket, 5);
const result = sendInitialRequestPayload(this, is_first_call, is_ssl, socket) catch |err| {
this.closeAndFail(err, is_ssl, socket);
@@ -3361,6 +3404,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
},
.body => {
log("send body", .{});
this.setTimeout(socket, 5);
switch (this.state.original_request_body) {
@@ -3428,6 +3472,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
},
.proxy_body => {
log("send proxy body", .{});
if (this.proxy_tunnel) |proxy| {
switch (this.state.original_request_body) {
.bytes => {
@@ -3447,6 +3492,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
.stream => {
var stream = &this.state.original_request_body.stream;
stream.has_backpressure = false;
this.setTimeout(socket, 5);
// to simplify things here the buffer contains the raw data we just need to flush to the socket it
if (stream.buffer.isNotEmpty()) {
@@ -3457,9 +3503,6 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
if (amount < to_send.len) {
stream.has_backpressure = true;
}
if (stream.buffer.isEmpty()) {
stream.buffer.reset();
}
}
if (stream.hasEnded()) {
this.state.request_stage = .done;
@@ -3474,6 +3517,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
},
.proxy_headers => {
log("send proxy headers", .{});
if (this.proxy_tunnel) |proxy| {
this.setTimeout(socket, 5);
var stack_buffer = std.heap.stackFallback(1024 * 16, bun.default_allocator);
@@ -3513,6 +3557,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
if (comptime is_first_call) {
if (amount == 0) {
// don't worry about it
log("is_first_call and amount == 0", .{});
return;
}
}
@@ -3560,10 +3605,11 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc
this.fail(err);
}
fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, start_payload: []const u8) void {
log("startProxyHandshake", .{});
// if we have options we pass them (ca, reject_unauthorized, etc) otherwise use the default
const ssl_options = if (this.tls_props != null) this.tls_props.?.* else JSC.API.ServerConfig.SSLConfig.zero;
ProxyTunnel.start(this, is_ssl, socket, ssl_options);
ProxyTunnel.start(this, is_ssl, socket, ssl_options, start_payload);
}
inline fn handleShortRead(
@@ -3592,6 +3638,7 @@ pub fn handleOnDataHeaders(
ctx: *NewHTTPContext(is_ssl),
socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
log("handleOnDataHeaders", .{});
var to_read = incoming_data;
var amount_read: usize = 0;
var needs_move = true;
@@ -3608,6 +3655,7 @@ pub fn handleOnDataHeaders(
// minimal http/1.1 request size is 16 bytes without headers and 26 with Host header
// if is less than 16 will always be a ShortRead
if (to_read.len < 16) {
log("handleShortRead", .{});
this.handleShortRead(is_ssl, incoming_data, socket, needs_move);
return;
}
@@ -3633,9 +3681,11 @@ pub fn handleOnDataHeaders(
const body_buf = to_read[@min(@as(usize, @intCast(response.bytes_read)), to_read.len)..];
// handle the case where we have a 100 Continue
if (response.status_code == 100) {
if (response.status_code >= 100 and response.status_code < 200) {
log("information headers", .{});
// we still can have the 200 OK in the same buffer sometimes
if (body_buf.len > 0) {
log("information headers with body", .{});
this.onData(is_ssl, body_buf, ctx, socket);
}
return;
@@ -3675,7 +3725,7 @@ pub fn handleOnDataHeaders(
if (this.flags.proxy_tunneling and this.proxy_tunnel == null) {
// we are proxing we dont need to cloneMetadata yet
this.startProxyHandshake(is_ssl, socket);
this.startProxyHandshake(is_ssl, socket, body_buf);
return;
}
@@ -3736,6 +3786,13 @@ pub fn onData(
return;
}
if (this.proxy_tunnel) |proxy| {
// if we have a tunnel we dont care about the other stages, we will just tunnel the data
this.setTimeout(socket, 5);
proxy.receiveData(incoming_data);
return;
}
switch (this.state.response_stage) {
.pending, .headers => {
this.handleOnDataHeaders(is_ssl, incoming_data, ctx, socket);
@@ -3743,47 +3800,32 @@ pub fn onData(
.body => {
this.setTimeout(socket, 5);
if (this.proxy_tunnel) |proxy| {
proxy.receiveData(incoming_data);
} else {
const report_progress = this.handleResponseBody(incoming_data, false) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
const report_progress = this.handleResponseBody(incoming_data, false) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
return;
}
if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
return;
}
},
.body_chunk => {
this.setTimeout(socket, 5);
if (this.proxy_tunnel) |proxy| {
proxy.receiveData(incoming_data);
} else {
const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
return;
}
if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
return;
}
},
.fail => {},
.proxy_headers, .proxy_handshake => {
this.setTimeout(socket, 5);
if (this.proxy_tunnel) |proxy| {
proxy.receiveData(incoming_data);
}
return;
},
else => {
this.state.pending_response = null;
this.closeAndFail(error.UnexpectedData, is_ssl, socket);
@@ -3815,6 +3857,7 @@ fn fail(this: *HTTPClient, err: anyerror) void {
if (this.proxy_tunnel) |tunnel| {
this.proxy_tunnel = null;
tunnel.shutdown();
// always detach the socket from the tunnel in case of fail
tunnel.detachAndDeref();
}
@@ -3898,16 +3941,28 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
if (is_done) {
this.unregisterAbortTracker();
if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) {
ctx.releaseSocket(
socket,
this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized,
this.connected_url.hostname,
this.connected_url.getPortAuto(),
);
} else if (!socket.isClosed()) {
NewHTTPContext(is_ssl).closeSocket(socket);
if (this.proxy_tunnel) |tunnel| {
log("close the tunnel", .{});
this.proxy_tunnel = null;
tunnel.shutdown();
tunnel.detachAndDeref();
if (!socket.isClosed()) {
log("close socket", .{});
NewHTTPContext(is_ssl).closeSocket(socket);
}
} else {
if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) {
log("release socket", .{});
ctx.releaseSocket(
socket,
this.flags.did_have_handshaking_error and !this.flags.reject_unauthorized,
this.connected_url.hostname,
this.connected_url.getPortAuto(),
);
} else if (!socket.isClosed()) {
log("close socket", .{});
NewHTTPContext(is_ssl).closeSocket(socket);
}
}
this.state.reset(this.allocator);
@@ -3915,6 +3970,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
this.state.request_stage = .done;
this.state.stage = .done;
this.flags.proxy_tunneling = false;
log("done", .{});
}
result.body.?.* = body;
@@ -4070,6 +4126,7 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_
fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !void {
if (!this.state.isChunkedEncoding()) {
this.state.total_body_received += incoming_data.len;
log("handleResponseBodyFromSinglePacket {d}", .{this.state.total_body_received});
}
defer {
if (this.progress_node) |progress| {
@@ -4120,7 +4177,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
}
this.state.total_body_received += remainder.len;
log("handleResponseBodyFromMultiplePackets {d}", .{this.state.total_body_received});
if (this.progress_node) |progress| {
progress.activate();
progress.setCompletedItems(this.state.total_body_received);
@@ -4180,6 +4237,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
);
buffer.list.items.len -|= incoming_data.len - bytes_decoded;
this.state.total_body_received += bytes_decoded;
log("handleResponseBodyChunkedEncodingFromMultiplePackets {d}", .{this.state.total_body_received});
buffer_ptr.* = buffer;
@@ -4258,7 +4316,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
);
buffer.len -|= incoming_data.len - bytes_decoded;
this.state.total_body_received += bytes_decoded;
log("handleResponseBodyChunkedEncodingFromSinglePacket {d}", .{this.state.total_body_received});
switch (pret) {
// Invalid HTTP response body
-1 => {

View File

@@ -0,0 +1,114 @@
import { beforeAll, it } from "bun:test";
import { exec, execSync } from "child_process";
import { rm } from "fs/promises";
import { bunEnv, bunExe, isLinux, tempDirWithFiles } from "harness";
import { join } from "path";
import { promisify } from "util";
const execAsync = promisify(exec);
const dockerCLI = Bun.which("docker") as string;
const SQUID_URL = "http://127.0.0.1:3128";
function isDockerEnabled(): boolean {
if (!dockerCLI) {
return false;
}
// TODO: investigate why its not starting on Linux arm64
if (isLinux && process.arch === "arm64") {
return false;
}
try {
const info = execSync(`${dockerCLI} info`, { stdio: ["ignore", "pipe", "inherit"] });
return info.toString().indexOf("Server Version:") !== -1;
} catch {
return false;
}
}
if (isDockerEnabled()) {
beforeAll(async () => {
async function isSquidRunning() {
const text = await fetch(SQUID_URL)
.then(res => res.text())
.catch(() => {});
return text?.includes("squid") ?? false;
}
if (!(await isSquidRunning())) {
// try to create or error if is already created
await execAsync(
`${dockerCLI} run -d --name squid-container -e TZ=UTC -p 3128:3128 ubuntu/squid:5.2-22.04_beta`,
).catch(() => {});
async function waitForSquid(max_wait = 60_000) {
const start = Date.now();
while (true) {
if (await isSquidRunning()) {
return;
}
if (Date.now() - start > max_wait) {
throw new Error("Squid did not start in time");
}
await Bun.sleep(1000);
}
}
// wait for squid to start
await waitForSquid();
}
});
it("bun install with proxy with big packages", async () => {
const files = {
"package.json": JSON.stringify({
"name": "test-install",
"module": "index.ts",
"type": "module",
"private": true,
"dependencies": {
"gastby": "1.0.1",
"mitata": "1.0.34",
"next.js": "1.0.3",
"react": "19.1.0",
"react-dom": "19.1.0",
"@types/react": "18.3.3",
"esbuild": "0.21.4",
"peechy": "0.4.34",
"prettier": "3.5.3",
"prettier-plugin-organize-imports": "4.0.0",
"source-map-js": "1.2.0",
"typescript": "5.7.2",
},
}),
};
const promises = new Array(5);
// this repro a hang when using a proxy, we run multiple times to make sure it's not a flaky test
for (let i = 0; i < 5; i++) {
const package_dir = tempDirWithFiles("codex-" + i, files);
const { exited } = Bun.spawn([bunExe(), "install", "--ignore-scripts"], {
cwd: package_dir,
// @ts-ignore
env: {
...bunEnv,
BUN_INSTALL_CACHE_DIR: join(package_dir, ".bun-install-cache"),
TMPDIR: join(package_dir, ".tmp"),
BUN_TMPDIR: join(package_dir, ".tmp"),
HTTPS_PROXY: SQUID_URL,
HTTP_PROXY: SQUID_URL,
},
stdio: ["inherit", "inherit", "inherit"],
timeout: 20_000,
});
promises[i] = exited
.then(r => {
if (r !== 0) {
throw new Error("failed to install with exit code " + r);
}
})
.finally(() => {
return rm(package_dir, { recursive: true, force: true });
});
}
await Promise.all(promises);
}, 60_000);
}

View File

@@ -205,6 +205,28 @@ export async function makeTree(base: string, tree: DirectoryTree) {
}
}
export function makeTreeSync(base: string, tree: DirectoryTree) {
const isDirectoryTree = (value: string | DirectoryTree | Buffer): value is DirectoryTree =>
typeof value === "object" && value && typeof value?.byteLength === "undefined";
for (const [name, raw_contents] of Object.entries(tree)) {
const contents = (typeof raw_contents === "function" ? raw_contents({ root: base }) : raw_contents) as string;
const joined = join(base, name);
if (name.includes("/")) {
const dir = dirname(name);
if (dir !== name && dir !== ".") {
fs.mkdirSync(join(base, dir), { recursive: true });
}
}
if (isDirectoryTree(contents)) {
fs.mkdirSync(joined);
makeTreeSync(joined, contents);
continue;
}
fs.writeFileSync(joined, contents);
}
}
/**
* Recursively create files within a new temporary directory.
*
@@ -224,7 +246,7 @@ export async function makeTree(base: string, tree: DirectoryTree) {
*/
export function tempDirWithFiles(basename: string, files: DirectoryTree): string {
const base = fs.mkdtempSync(join(fs.realpathSync(os.tmpdir()), basename + "_"));
makeTree(base, files);
makeTreeSync(base, files);
return base;
}