Compare commits

...

1 Commit

Author SHA1 Message Date
Jarred Sumner
633a457e72 Expose fewer details of the HTTP client outside the HTTP thread 2024-07-26 05:20:52 -07:00
10 changed files with 666 additions and 549 deletions

View File

@@ -192,7 +192,7 @@ pub fn main() anyerror!void {
try channel.buffer.ensureTotalCapacity(args.count);
try NetworkThread.init();
if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests.store(args.concurrency, .monotonic);
if (args.concurrency > 0) HTTP.max_simultaneous_requests.store(args.concurrency, .monotonic);
const Group = struct {
response_body: MutableString = undefined,
context: HTTP.HTTPChannelContext = undefined,

View File

@@ -1,7 +1,7 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const bun = @import("root").bun;
const MimeType = bun.http.MimeType;
const MimeType = http.MimeType;
const ZigURL = @import("../../url.zig").URL;
const http = bun.http;
const FetchRedirect = http.FetchRedirect;
@@ -57,6 +57,7 @@ const Async = bun.Async;
const BoringSSL = bun.BoringSSL;
const X509 = @import("../api/bun/x509.zig");
const PosixToWinNormalizer = bun.path.PosixToWinNormalizer;
const Sendfile = http.Sendfile;
pub const Response = struct {
const ResponseMixin = BodyMixin(@This());
@@ -720,7 +721,7 @@ pub const Fetch = struct {
pub const FetchTasklet = struct {
const log = Output.scoped(.FetchTasklet, false);
http: ?*http.AsyncHTTP = null,
pending_request: ?*http.PendingHTTPRequest = null,
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
javascript_vm: *VirtualMachine = undefined,
@@ -741,7 +742,6 @@ pub const Fetch = struct {
promise: JSC.JSPromise.Strong,
concurrent_task: JSC.ConcurrentTask = .{},
poll_ref: Async.KeepAlive = .{},
memory_reporter: *JSC.MemoryReportingAllocator,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
@@ -753,8 +753,6 @@ pub const Fetch = struct {
url_proxy_buffer: []const u8 = "",
signal: ?*JSC.WebCore.AbortSignal = null,
signals: http.Signals = .{},
signal_store: http.Signals.Store = .{},
has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
// must be stored because AbortSignal stores reason weakly
@@ -773,23 +771,11 @@ pub const Fetch = struct {
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
pub fn ref(this: *FetchTasklet) void {
const count = this.ref_count.fetchAdd(1, .monotonic);
bun.debugAssert(count > 0);
}
pub fn deref(this: *FetchTasklet) void {
const count = this.ref_count.fetchSub(1, .monotonic);
bun.debugAssert(count > 0);
if (count == 1) {
this.deinit();
}
}
pub usingnamespace bun.NewThreadSafeRefCounted(@This(), @This().deinit);
pub const HTTPRequestBody = union(enum) {
AnyBlob: AnyBlob,
Sendfile: http.Sendfile,
Sendfile: Sendfile,
pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store {
return switch (this.*) {
@@ -824,7 +810,7 @@ pub const Fetch = struct {
fn clearData(this: *FetchTasklet) void {
log("clearData", .{});
const allocator = this.memory_reporter.allocator();
const allocator = bun.default_allocator;
if (this.url_proxy_buffer.len > 0) {
allocator.free(this.url_proxy_buffer);
this.url_proxy_buffer.len = 0;
@@ -844,12 +830,14 @@ pub const Fetch = struct {
this.request_headers.buf.deinit(allocator);
this.request_headers = Headers{ .allocator = undefined };
if (this.http != null) {
this.http.?.clearData();
if (this.pending_request) |request| {
this.pending_request = null;
request.ignoreRemainingResponseBody();
request.deref();
}
if (this.metadata != null) {
this.metadata.?.deinit(allocator);
if (this.metadata) |*metadata| {
metadata.deinit(allocator);
this.metadata = null;
}
@@ -886,13 +874,11 @@ pub const Fetch = struct {
this.clearData();
var reporter = this.memory_reporter;
const allocator = reporter.allocator();
if (this.http) |http_| allocator.destroy(http_);
allocator.destroy(this);
// reporter.assert();
bun.default_allocator.destroy(reporter);
if (this.pending_request) |request| {
this.pending_request = null;
request.deref();
}
this.destroy();
}
fn getCurrentResponse(this: *FetchTasklet) ?*Response {
@@ -1029,7 +1015,6 @@ pub const Fetch = struct {
buffer_reset = false;
if (!this.result.has_more) {
var scheduled_response_buffer = this.scheduled_response_buffer.list;
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
// done resolve body
var old = body.value;
@@ -1041,7 +1026,7 @@ pub const Fetch = struct {
response.body.value = body_value;
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
@@ -1229,12 +1214,11 @@ pub const Fetch = struct {
check_result.ensureStillAlive();
check_result.protect();
this.abort_reason = check_result;
this.signal_store.aborted.store(true, .monotonic);
this.tracker.didCancel(this.global_this);
// we need to abort the request
if (this.http != null) {
http.http_thread.scheduleShutdown(this.http.?);
if (this.pending_request) |request| {
request.abort();
}
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
return false;
@@ -1286,8 +1270,6 @@ pub const Fetch = struct {
// sometimes we don't have metadata so we also check http.url
const path = if (this.metadata) |metadata|
bun.String.createUTF8(metadata.url)
else if (this.http) |http_|
bun.String.createUTF8(http_.url.href)
else
bun.String.empty;
@@ -1383,12 +1365,24 @@ pub const Fetch = struct {
this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, this.global_this);
}
pub fn isAborted(this: *const FetchTasklet) bool {
if (this.abort_reason != .zero) {
return true;
}
if (this.pending_request) |request| {
return request.isAborted();
}
return false;
}
pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
const this = bun.cast(*FetchTasklet, ctx);
if (this.http) |http_| {
http_.enableBodyStreaming();
if (this.pending_request) |request| {
request.enableBodyStreaming();
}
if (this.signal_store.aborted.load(.monotonic)) {
if (this.isAborted()) {
return JSC.WebCore.DrainResult{
.aborted = {},
};
@@ -1401,9 +1395,8 @@ pub const Fetch = struct {
var scheduled_response_buffer = this.scheduled_response_buffer.list;
// This means we have received part of the body but not the whole thing
if (scheduled_response_buffer.items.len > 0) {
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
@@ -1449,14 +1442,13 @@ pub const Fetch = struct {
}
var scheduled_response_buffer = this.scheduled_response_buffer.list;
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
const response = Body.Value{
.InternalBlob = .{
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
},
};
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
@@ -1489,10 +1481,12 @@ pub const Fetch = struct {
fn ignoreRemainingResponseBody(this: *FetchTasklet) void {
log("ignoreRemainingResponseBody", .{});
// enabling streaming will make the http thread drain into the main thread (aka stop buffering)
// without a stream ref, response body or response instance alive it will just ignore the result
if (this.http) |http_| {
http_.enableBodyStreaming();
if (this.pending_request) |request| {
// enabling streaming will make the http thread drain into the main thread (aka stop buffering)
// without a stream ref, response body or response instance alive it will just ignore the result
// TODO: use ignoreRemainingResponseBody() instead of enableBodyStreaming()
request.enableBodyStreaming();
}
// we should not keep the process alive if we are ignoring the body
const vm = this.global_this.bunVM();
@@ -1558,26 +1552,24 @@ pub const Fetch = struct {
fetch_options: FetchOptions,
promise: JSC.JSPromise.Strong,
) !*FetchTasklet {
_ = allocator; // autofix
var jsc_vm = globalThis.bunVM();
var fetch_tasklet = try allocator.create(FetchTasklet);
fetch_tasklet.* = .{
var fetch_tasklet = FetchTasklet.new(.{
.mutex = Mutex.init(),
.scheduled_response_buffer = .{
.allocator = fetch_options.memory_reporter.allocator(),
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
},
.response_buffer = MutableString{
.allocator = fetch_options.memory_reporter.allocator(),
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
},
.http = try allocator.create(http.AsyncHTTP),
.javascript_vm = jsc_vm,
.request_body = fetch_options.body,
.global_this = globalThis,
@@ -1587,12 +1579,9 @@ pub const Fetch = struct {
.signal = fetch_options.signal,
.hostname = fetch_options.hostname,
.tracker = JSC.AsyncTaskTracker.init(jsc_vm),
.memory_reporter = fetch_options.memory_reporter,
.check_server_identity = fetch_options.check_server_identity,
.reject_unauthorized = fetch_options.reject_unauthorized,
};
fetch_tasklet.signals = fetch_tasklet.signal_store.to();
});
fetch_tasklet.tracker.didSchedule(globalThis);
@@ -1609,54 +1598,45 @@ pub const Fetch = struct {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
}
if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) {
fetch_tasklet.signal_store.cert_errors.store(true, .monotonic);
} else {
fetch_tasklet.signals.cert_errors = null;
}
const flags = http.Signals.Flags{
.cert_errors = fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized,
.header_progress = true,
};
fetch_tasklet.http.?.* = http.AsyncHTTP.init(
fetch_options.memory_reporter.allocator(),
fetch_options.method,
fetch_options.url,
fetch_options.headers.entries,
fetch_options.headers.buf.items,
&fetch_tasklet.response_buffer,
fetch_tasklet.request_body.slice(),
http.HTTPClientResult.Callback.New(
*FetchTasklet,
FetchTasklet.callback,
).init(fetch_tasklet),
fetch_options.redirect_type,
.{
fetch_tasklet.pending_request = http.fetch(
&.{
.method = fetch_options.method,
.url = fetch_options.url,
.redirect_type = fetch_options.redirect_type,
.headers = fetch_options.headers.entries,
.headers_buf = fetch_options.headers.buf.items,
.flags = flags,
.request_body = brk: {
if (fetch_tasklet.request_body == .Sendfile) {
bun.assert(fetch_options.url.isHTTP());
bun.assert(fetch_options.proxy == null);
break :brk .{ .sendfile = fetch_tasklet.request_body.Sendfile };
}
break :brk .{ .bytes = fetch_tasklet.request_body.slice() };
},
.http_proxy = proxy,
.hostname = fetch_options.hostname,
.signals = fetch_tasklet.signals,
.unix_socket_path = fetch_options.unix_socket_path,
.disable_timeout = fetch_options.disable_timeout,
.disable_keepalive = fetch_options.disable_keepalive,
.disable_decompression = fetch_options.disable_decompression,
.reject_unauthorized = fetch_options.reject_unauthorized,
.verbose = fetch_options.verbose,
.tls_props = fetch_options.ssl_config,
.tls_config = fetch_options.ssl_config,
},
http.HTTPClientResult.Callback.New(
*FetchTasklet,
FetchTasklet.callback,
).init(fetch_tasklet),
);
// TODO is this necessary? the http client already sets the redirect type,
// so manually setting it here seems redundant
if (fetch_options.redirect_type != FetchRedirect.follow) {
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
}
// we want to return after headers are received
fetch_tasklet.signal_store.header_progress.store(true, .monotonic);
if (fetch_tasklet.request_body == .Sendfile) {
bun.assert(fetch_options.url.isHTTP());
bun.assert(fetch_options.proxy == null);
fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile };
}
if (fetch_tasklet.signal) |signal| {
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
}
@@ -1668,11 +1648,9 @@ pub const Fetch = struct {
reason.ensureStillAlive();
this.abort_reason = reason;
reason.protect();
this.signal_store.aborted.store(true, .monotonic);
this.tracker.didCancel(this.global_this);
if (this.http != null) {
http.http_thread.scheduleShutdown(this.http.?);
if (this.pending_request) |request| {
request.abort();
}
}
@@ -1693,7 +1671,6 @@ pub const Fetch = struct {
globalThis: ?*JSGlobalObject,
// Custom Hostname
hostname: ?[]u8 = null,
memory_reporter: *JSC.MemoryReportingAllocator,
check_server_identity: JSC.Strong = .{},
unix_socket_path: ZigString.Slice,
ssl_config: ?*SSLConfig = null,
@@ -1713,29 +1690,23 @@ pub const Fetch = struct {
promise,
);
var batch = bun.ThreadPool.Batch{};
node.http.?.schedule(allocator, &batch);
node.poll_ref.ref(global.bunVM());
http.http_thread.schedule(batch);
return node;
}
pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
pub fn callback(task: *FetchTasklet, pending_request: *http.PendingHTTPRequest, result: http.HTTPClientResult) void {
task.ref();
task.mutex.lock();
defer task.mutex.unlock();
task.http.?.* = async_http.*;
task.http.?.response_buffer = async_http.response_buffer;
log("callback success {} has_more {} bytes {}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len });
const prev_metadata = task.result.metadata;
const prev_cert_info = task.result.certificate_info;
task.result = result;
pending_request.result.metadata = null;
// Preserve pending certificate info if it was provided in the previous update.
if (task.result.certificate_info == null) {
@@ -1765,7 +1736,7 @@ pub const Fetch = struct {
if (task.scheduled_response_buffer.list.capacity > 0) {
task.scheduled_response_buffer.deinit();
task.scheduled_response_buffer = .{
.allocator = task.memory_reporter.allocator(),
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
@@ -1810,7 +1781,7 @@ pub const Fetch = struct {
var blob = Blob.init(data, allocator, globalThis);
var allocated = false;
const mime_type = bun.http.MimeType.init(data_url.mime_type, allocator, &allocated);
const mime_type = MimeType.init(data_url.mime_type, allocator, &allocated);
blob.content_type = mime_type.value;
if (allocated) {
blob.content_type_allocated = true;
@@ -1882,7 +1853,7 @@ pub const Fetch = struct {
return .zero;
}
bun.http.AsyncHTTP.preconnect(url, true);
bun.http.preconnect(url, true);
return .undefined;
}
@@ -1910,7 +1881,7 @@ pub const Fetch = struct {
memory_reporter.report(globalThis.vm());
if (is_error) bun.default_allocator.destroy(memory_reporter);
bun.default_allocator.destroy(memory_reporter);
}
if (arguments.len == 0) {
@@ -2697,7 +2668,7 @@ pub const Fetch = struct {
.result => |fd| fd,
};
if (proxy == null and bun.http.Sendfile.isEligible(url)) {
if (proxy == null and Sendfile.isEligible(url)) {
use_sendfile: {
const stat: bun.Stat = switch (bun.sys.fstat(opened_fd)) {
.result => |result| result,
@@ -2788,13 +2759,13 @@ pub const Fetch = struct {
const promise_val = promise.value();
_ = FetchTasklet.queue(
allocator,
bun.default_allocator,
globalThis,
.{
.method = method,
.url = url,
.headers = headers orelse Headers{
.allocator = allocator,
.allocator = bun.default_allocator,
},
.body = http_body,
.disable_keepalive = disable_keepalive,
@@ -2809,7 +2780,6 @@ pub const Fetch = struct {
.globalThis = globalThis,
.ssl_config = ssl_config,
.hostname = hostname,
.memory_reporter = memory_reporter,
.check_server_identity = if (check_server_identity.isEmptyOrUndefinedOrNull()) .{} else JSC.Strong.create(check_server_identity, globalThis),
.unix_socket_path = unix_socket_path,
},

View File

@@ -28,7 +28,6 @@ const bundler = bun.bundler;
const DotEnv = @import("env_loader.zig");
const which = @import("which.zig").which;
const JSC = bun.JSC;
const AsyncHTTP = bun.http.AsyncHTTP;
const Arena = @import("./mimalloc_arena.zig").Arena;
const OpaqueWrap = JSC.OpaqueWrap;
@@ -112,7 +111,7 @@ pub const Run = struct {
failWithBuildError(vm);
};
AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
bun.http.loadEnv(vm.allocator, vm.log, b.env);
vm.loadExtraEnv();
vm.is_main_thread = true;
@@ -146,7 +145,7 @@ pub const Run = struct {
Global.exit(1);
}
AsyncHTTP.preconnect(url, false);
bun.http.preconnect(url, false);
}
}
@@ -255,7 +254,7 @@ pub const Run = struct {
failWithBuildError(vm);
};
AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
bun.http.loadEnv(vm.allocator, vm.log, b.env);
vm.loadExtraEnv();
vm.is_main_thread = true;

View File

@@ -1967,23 +1967,17 @@ pub const Example = struct {
mutable.* = try MutableString.init(ctx.allocator, 8192);
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
api_url,
header_entries,
headers_buf,
mutable,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.progress_node = progress;
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
const response = try async_http.sendSync(true);
const response = try bun.http.fetchSync(&.{
.allocator = ctx.allocator,
.method = .GET,
.url = api_url,
.headers = header_entries,
.headers_buf = headers_buf,
.progress_node = progress,
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
.http_proxy = http_proxy,
.redirect_type = .follow,
}, mutable);
switch (response.status_code) {
404 => return error.GitHubRepositoryNotFound,
@@ -2043,24 +2037,17 @@ pub const Example = struct {
var http_proxy: ?URL = env_loader.getHttpProxy(url);
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
url,
.{},
"",
mutable,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.progress_node = progress;
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
var response = try async_http.sendSync(true);
var response = try bun.http.fetchSync(&.{
.allocator = ctx.allocator,
.method = .GET,
.url = url,
.headers = .{},
.headers_buf = "",
.progress_node = progress,
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
.http_proxy = http_proxy,
.redirect_type = .follow,
}, mutable);
switch (response.status_code) {
404 => return error.ExampleNotFound,
@@ -2134,24 +2121,19 @@ pub const Example = struct {
http_proxy = env_loader.getHttpProxy(parsed_tarball_url);
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
parsed_tarball_url,
.{},
"",
mutable,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.progress_node = progress;
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
refresher.maybeRefresh();
response = try async_http.sendSync(true);
response = try bun.http.fetchSync(&.{
.allocator = ctx.allocator,
.method = .GET,
.url = parsed_tarball_url,
.headers = .{},
.headers_buf = "",
.progress_node = progress,
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
.http_proxy = http_proxy,
.redirect_type = .follow,
}, mutable);
refresher.maybeRefresh();
@@ -2172,29 +2154,20 @@ pub const Example = struct {
const http_proxy: ?URL = env_loader.getHttpProxy(url);
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
const mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 2048);
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
url,
.{},
"",
mutable,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
if (Output.enable_ansi_colors) {
async_http.client.progress_node = progress_node;
}
const response = async_http.sendSync(true) catch |err| {
const response = bun.http.fetchSync(&.{
.allocator = ctx.allocator,
.method = .GET,
.url = url,
.headers = .{},
.headers_buf = "",
.progress_node = progress_node,
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
.http_proxy = http_proxy,
.redirect_type = .follow,
}, mutable) catch |err| {
switch (err) {
error.WouldBlock => {
Output.prettyErrorln("Request timed out while trying to fetch examples list. Please try again", .{});

View File

@@ -234,24 +234,20 @@ pub const UpgradeCommand = struct {
var metadata_body = try MutableString.init(allocator, 2048);
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = try allocator.create(HTTP.AsyncHTTP);
async_http.* = HTTP.AsyncHTTP.initSync(
allocator,
.GET,
api_url,
header_entries,
headers_buf,
const response = try bun.http.fetchSync(
&.{
.allocator = allocator,
.progress_node = progress,
.headers = header_entries,
.headers_buf = headers_buf,
.http_proxy = http_proxy,
.url = api_url,
.method = .GET,
.redirect_type = .follow,
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
},
&metadata_body,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
if (!silent) async_http.client.progress_node = progress.?;
const response = try async_http.sendSync(true);
switch (response.status_code) {
404 => return error.HTTP404,
@@ -514,26 +510,20 @@ pub const UpgradeCommand = struct {
var refresher = Progress{};
var progress = refresher.start("Downloading", version.size);
refresher.refresh();
var async_http = try ctx.allocator.create(HTTP.AsyncHTTP);
var zip_file_buffer = try ctx.allocator.create(MutableString);
zip_file_buffer.* = try MutableString.init(ctx.allocator, @max(version.size, 1024));
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
zip_url,
.{},
"",
zip_file_buffer,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.progress_node = progress;
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
const response = try async_http.sendSync(true);
const response = try bun.http.fetchSync(&.{
.allocator = ctx.allocator,
.progress_node = progress,
.headers = .{},
.headers_buf = "",
.http_proxy = http_proxy,
.url = zip_url,
.method = .GET,
.redirect_type = .follow,
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
}, zip_file_buffer);
switch (response.status_code) {
404 => {

View File

@@ -143,8 +143,6 @@ pub fn downloadToPath(this: *const CompileTarget, env: *bun.DotEnv.Loader, alloc
{
refresher.refresh();
// TODO: This is way too much code necessary to send a single HTTP request...
var async_http = try allocator.create(HTTP.AsyncHTTP);
var compressed_archive_bytes = try allocator.create(MutableString);
compressed_archive_bytes.* = try MutableString.init(allocator, 24 * 1024 * 1024);
var url_buffer: [2048]u8 = undefined;
@@ -155,22 +153,15 @@ pub fn downloadToPath(this: *const CompileTarget, env: *bun.DotEnv.Loader, alloc
defer progress.end();
const http_proxy: ?bun.URL = env.getHttpProxy(url);
async_http.* = HTTP.AsyncHTTP.initSync(
allocator,
.GET,
url,
.{},
"",
compressed_archive_bytes,
"",
http_proxy,
null,
HTTP.FetchRedirect.follow,
);
async_http.client.progress_node = progress;
async_http.client.reject_unauthorized = env.getTLSRejectUnauthorized();
const response = try async_http.sendSync(true);
const response = try bun.http.fetchSync(&.{
.method = .GET,
.url = url,
.headers = .{},
.headers_buf = "",
.http_proxy = http_proxy,
.progress_node = progress,
.reject_unauthorized = env.getTLSRejectUnauthorized(),
}, compressed_archive_bytes);
switch (response.status_code) {
404 => {

View File

@@ -138,6 +138,17 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type {
socket: InternalSocket,
const ThisSocket = @This();
pub fn isSSL() bool {
return is_ssl;
}
pub fn toAnySocket(this: ThisSocket) uws.AnySocket {
return switch (is_ssl) {
true => uws.AnySocket{ .SocketTLS = this },
false => uws.AnySocket{ .SocketTCP = this },
};
}
pub fn verifyError(this: ThisSocket) us_bun_verify_error_t {
const socket = this.socket.get() orelse return std.mem.zeroes(us_bun_verify_error_t);
const ssl_error: us_bun_verify_error_t = uws.us_socket_verify_error(comptime ssl_int, socket);

View File

@@ -47,9 +47,6 @@ const Batch = bun.ThreadPool.Batch;
const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion;
const DeadSocket = opaque {};
var dead_socket = @as(*DeadSocket, @ptrFromInt(1));
//TODO: this needs to be freed when Worker Threads are implemented
var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(bun.default_allocator);
var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
const MAX_REDIRECT_URL_LENGTH = 128 * 1024;
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
@@ -65,35 +62,245 @@ var shared_response_headers_buf: [256]picohttp.Header = undefined;
const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
pub const Signals = struct {
header_progress: ?*std.atomic.Value(bool) = null,
body_streaming: ?*std.atomic.Value(bool) = null,
aborted: ?*std.atomic.Value(bool) = null,
cert_errors: ?*std.atomic.Value(bool) = null,
const AbortTracker = struct {
map: std.AutoArrayHashMap(u32, uws.AnySocket) = std.AutoArrayHashMap(u32, uws.AnySocket).init(bun.default_allocator),
last_id: std.atomic.Value(usize) = std.atomic.Value(usize).init(1),
pub fn isEmpty(this: *const Signals) bool {
return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
pub fn nextId(this: *AbortTracker) u32 {
// The max size is u32
// We want this number to never be 0, as we use 0 to indicate no id.
const id: u32 = @intCast((this.last_id.fetchAdd(1, .monotonic) % (std.math.maxInt(u32) - 1)) + 1);
bun.debugAssert(id > 0);
return id;
}
pub const Store = struct {
header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
pub fn put(this: *AbortTracker, id: u32, socket: uws.AnySocket) void {
if (id > 0) {
this.map.put(id, socket) catch bun.outOfMemory();
}
}
pub fn to(this: *Store) Signals {
return .{
.header_progress = &this.header_progress,
.body_streaming = &this.body_streaming,
.aborted = &this.aborted,
.cert_errors = &this.cert_errors,
};
pub fn remove(this: *AbortTracker, id: u32) void {
if (id > 0) {
_ = this.map.swapRemove(id);
}
}
pub fn fetchRemove(this: *AbortTracker, id: u32) ?uws.AnySocket {
if (id > 0) {
if (this.map.fetchSwapRemove(id)) |val| {
return val.value;
}
}
return null;
}
pub fn garbageCollect(this: *AbortTracker) void {
if (this.map.capacity() > 10_000 and this.map.count() < 1000) {
this.map.shrinkAndFree(this.map.count());
}
}
};
var abort_tracker: AbortTracker = .{};
pub const HTTPClientRequest = struct {
method: Method = .GET,
url: URL = .{},
headers: Headers.Entries = .{},
headers_buf: string = "",
request_body: HTTPRequestBody = .{ .bytes = "" },
redirect_type: FetchRedirect = .follow,
http_proxy: ?URL = null,
hostname: ?[]u8 = null,
signals: ?Signals = null,
unix_socket_path: ?JSC.ZigString.Slice = null,
disable_timeout: ?bool = null,
verbose: ?HTTPVerboseLevel = null,
disable_keepalive: ?bool = null,
disable_decompression: ?bool = null,
tls_config: ?*SSLConfig = null,
receives_updates: bool = false,
reject_unauthorized: ?bool = null,
allocator: std.mem.Allocator = bun.default_allocator,
flags: Signals.Flags = .{},
progress_node: ?*Progress.Node = null,
};
pub fn fetchBatched(this: *const HTTPClientRequest, callback: HTTPClientResult.Callback, task: ?*?*ThreadPool.Task) *PendingHTTPRequest {
var pending_http = PendingHTTPRequest.new(.{
.signals_store = .{},
.signals = undefined,
.result = .{},
.token = if (this.receives_updates) abort_tracker.nextId() else 0,
});
pending_http.signals_store.flags.raw = @bitCast(this.flags);
pending_http.signals = pending_http.signals_store.signals();
const _http = AsyncHTTP.init(this, pending_http, callback);
if (task) |task_| {
task_.* = &_http.task;
pending_http.ref();
} else {
var empty_batch: ThreadPool.Batch = .{};
_http.schedule(&empty_batch);
http_thread.schedule(empty_batch);
}
return pending_http;
}
pub inline fn fetch(this: *const HTTPClientRequest, callback: HTTPClientResult.Callback) *PendingHTTPRequest {
return fetchBatched(this, callback, null);
}
pub fn fetchSync(this: *const HTTPClientRequest, body: *MutableString) !picohttp.Response {
// This leaks PendingHTTPRequest, but we always shut down the whole program after this so it's fine.
var pending_http = PendingHTTPRequest.new(.{
.signals_store = .{},
.signals = undefined,
.result = .{},
.token = 0,
});
pending_http.signals_store.flags.raw = @bitCast(this.flags);
pending_http.response_body = body.*;
pending_http.signals.flags = &pending_http.signals_store.flags;
var _http = AsyncHTTP.init(this, pending_http, undefined);
const response = try _http.sendSync();
body.* = pending_http.response_body;
return response;
}
pub fn preconnect(
url: URL,
is_url_owned: bool,
) void {
if (!FeatureFlags.is_fetch_preconnect_supported) {
if (is_url_owned) {
bun.default_allocator.free(url.href);
}
return;
}
var this = Preconnect.new(.{
.pending_http_request = PendingHTTPRequest.new(.{
.signals_store = .{},
.signals = undefined,
.result = .{},
.token = 0,
.response_body = MutableString{ .allocator = default_allocator, .list = .{} },
}),
.response_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.url = url,
.is_url_owned = is_url_owned,
});
this.pending_http_request.signals = this.pending_http_request.signals_store.signals();
var http_ = AsyncHTTP.init(
&.{
.url = url,
.method = .GET,
},
this.pending_http_request,
HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this),
);
http_.client.is_preconnect_only = true;
http_thread.schedule(Batch.from(&http_.task));
}
pub const PendingHTTPRequest = struct {
response_body: MutableString = .{
.allocator = bun.default_allocator,
.list = .{},
},
signals_store: Signals.Store = .{},
signals: Signals,
result: HTTPClientResult = .{
.has_more = true,
},
token: u32 = 0,
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
elapsed: u64 = 0,
pub fn isAborted(this: *const PendingHTTPRequest) bool {
return this.signals.get(.aborted);
}
pub fn err(this: *const PendingHTTPRequest) ?anyerror {
return this.result.fail;
}
pub fn response(this: *const PendingHTTPRequest) ?picohttp.Response {
if (this.result.metadata) |*metadata| {
return metadata.response;
}
return null;
}
pub usingnamespace bun.NewThreadSafeRefCounted(@This(), @This().deinit);
pub fn abort(this: *PendingHTTPRequest) void {
const state = this.signals.fetchSet(.aborted, true);
if (!state.aborted and !state.done and this.token > 0) {
http_thread.scheduleShutdown(this.token);
}
}
pub fn enableBodyStreaming(this: *PendingHTTPRequest) void {
this.signals.set(.body_streaming, true);
}
pub fn ignoreRemainingResponseBody(this: *PendingHTTPRequest) void {
this.signals.set(.ignore_remaining_response_body, true);
}
fn deinit(this: *PendingHTTPRequest) void {
if (this.result.metadata) |*metadata| {
metadata.deinit(bun.default_allocator);
}
this.response_body.deinit();
this.destroy();
}
};
/// Thread-safe flag set shared between a PendingHTTPRequest and the HTTP
/// thread. All flags live in a single atomic byte so readers and writers on
/// different threads never tear.
pub const Signals = struct {
    flags: *std.atomic.Value(u8),

    pub const Flags = packed struct(u8) {
        _padding: u2 = 0,
        header_progress: bool = false,
        body_streaming: bool = false,
        aborted: bool = false,
        cert_errors: bool = false,
        done: bool = false,
        ignore_remaining_response_body: bool = false,
    };

    /// Owns the backing atomic byte; hand out views via signals().
    pub const Store = struct {
        flags: std.atomic.Value(u8) = .{ .raw = 0 },

        pub fn signals(this: *@This()) Signals {
            return Signals{ .flags = &this.flags };
        }
    };

    /// Reads one flag with a monotonic load.
    pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals.Flags)) bool {
        const flags: Flags = @bitCast(this.flags.load(.monotonic));
        return @field(flags, @tagName(field));
    }

    /// Atomically sets or clears one flag and returns the flags as they were
    /// before the update.
    pub fn fetchSet(this: Signals, comptime field: std.meta.FieldEnum(Signals.Flags), value: bool) Flags {
        // Use the packed-struct *bit* offset, not std.meta.fieldIndex: the
        // leading `_padding: u2` makes the two differ, and get() reads the
        // byte via @bitCast, so the shift must match the bit layout.
        const bit = @bitOffsetOf(Flags, @tagName(field));
        const mask: u8 = @as(u8, 1) << bit;
        if (value) {
            return @bitCast(this.flags.fetchOr(mask, .monotonic));
        } else {
            return @bitCast(this.flags.fetchAnd(~mask, .monotonic));
        }
    }

    /// Sets or clears one flag, discarding the previous value.
    pub fn set(this: Signals, comptime field: std.meta.FieldEnum(Signals.Flags), value: bool) void {
        _ = this.fetchSet(field, value);
    }
};
@@ -355,7 +562,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
socket.close(.normal);
}
/// Recovers the tagged ActiveSocket previously stashed in a socket's
/// extension pointer. (A stale duplicate private `fn getTagged` header with
/// no body — merge residue — was removed; it left an unbalanced brace and a
/// duplicate symbol.)
pub fn getTagged(ptr: *anyopaque) ActiveSocket {
    return ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
}
@@ -771,7 +978,7 @@ pub const HTTPThread = struct {
queued_tasks: Queue = Queue{},
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_shutdowns: std.ArrayListUnmanaged(u32) = std.ArrayListUnmanaged(u32){},
queued_shutdowns_lock: bun.Lock = bun.Lock.init(),
has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
@@ -779,11 +986,6 @@ pub const HTTPThread = struct {
lazy_libdeflater: ?*LibdeflateState = null,
// NOTE(review): queued_shutdowns now stores bare u32 tokens (see the
// `queued_shutdowns: std.ArrayListUnmanaged(u32)` field above), so this
// struct appears to be leftover from the previous design — confirm it is
// unreferenced and delete if so.
const ShutdownMessage = struct {
    // Identifier of the AsyncHTTP whose socket should be shut down.
    async_http_id: u32,
    // Whether the socket is TLS (previously selected SocketTLS vs SocketTCP teardown).
    is_tls: bool,
};
const threadlog = Output.scoped(.HTTPThread, true);
pub const LibdeflateState = struct {
@@ -897,45 +1099,45 @@ pub const HTTPThread = struct {
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
for (this.queued_shutdowns.items) |http| {
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
if (http.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
socket.shutdown();
socket.shutdownRead();
} else {
const socket = uws.SocketTCP.fromAny(socket_ptr.value);
socket.shutdown();
socket.shutdownRead();
for (this.queued_shutdowns.items) |token| {
if (abort_tracker.fetchRemove(token)) |any_socket| {
switch (any_socket) {
inline .SocketTCP, .SocketTLS => |socket| {
const tagged = NewHTTPContext(comptime @TypeOf(socket).isSSL()).getTagged(socket.ext(anyopaque));
if (tagged.get(HTTPClient)) |client| {
client.allow_retry = false;
}
socket.close(.failure);
},
}
}
}
this.queued_shutdowns.clearRetainingCapacity();
}
var count: usize = 0;
var active = AsyncHTTP.active_requests_count.load(.monotonic);
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
if (active >= max) return;
defer {
if (comptime Environment.allow_assert) {
if (count > 0)
log("Processed {d} tasks\n", .{count});
if (this.queued_shutdowns.capacity > 1024) {
this.queued_shutdowns.shrinkAndFree(bun.default_allocator, 10);
}
}
while (this.queued_tasks.pop()) |http| {
var cloned = ThreadlocalAsyncHTTP.new(.{
.async_http = http.*,
});
cloned.async_http.real = http;
cloned.async_http.onStart();
if (comptime Environment.allow_assert) {
count += 1;
var stack_queue = std.heap.stackFallback(2048, bun.default_allocator);
var queue = std.ArrayList(*AsyncHTTP).init(stack_queue.get());
defer queue.deinit();
while (active_requests_count.load(.monotonic) < max_simultaneous_requests.load(.monotonic)) {
const to_clone_count = max_simultaneous_requests.load(.monotonic) - active_requests_count.load(.monotonic);
if (to_clone_count == 0) {
break;
}
active += 1;
if (active >= max) break;
var any = false;
while (this.queued_tasks.pop()) |http| {
any = true;
http.onStart();
if (active_requests_count.load(.monotonic) >= max_simultaneous_requests.load(.monotonic)) break;
}
if (!any) break;
}
}
@@ -970,14 +1172,12 @@ pub const HTTPThread = struct {
}
}
pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
pub fn scheduleShutdown(this: *@This(), token: u32) void {
{
bun.debugAssert(token > 0);
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
this.queued_shutdowns.append(bun.default_allocator, .{
.async_http_id = http.async_http_id,
.is_tls = http.client.isHTTPS(),
}) catch bun.outOfMemory();
this.queued_shutdowns.append(bun.default_allocator, token) catch bun.outOfMemory();
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
@@ -1087,9 +1287,7 @@ pub fn onOpen(
assert(is_ssl == client.url.isHTTPS());
}
}
if (client.signals.aborted != null) {
socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable;
}
abort_tracker.put(client.token, socket.toAnySocket());
log("Connected {s} \n", .{client.url.href});
if (client.signals.get(.aborted)) {
@@ -1673,7 +1871,6 @@ state: InternalState = .{},
did_have_handshaking_error: bool = false,
tls_props: ?*SSLConfig = null,
result_callback: HTTPClientResult.Callback = undefined,
/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since.
/// This is a workaround for that.
@@ -1685,8 +1882,10 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunneling: bool = false,
proxy_tunnel: ?ProxyTunnel = null,
signals: Signals = .{},
async_http_id: u32 = 0,
signals: Signals = .{
.flags = undefined,
},
token: u32 = 0,
hostname: ?[]u8 = null,
reject_unauthorized: bool = true,
unix_socket_path: JSC.ZigString.Slice = JSC.ZigString.Slice.empty,
@@ -1831,7 +2030,53 @@ pub const HTTPChannelContext = struct {
}
};
pub const AsyncHTTP = struct {
/// Reads BUN_CONFIG_MAX_HTTP_REQUESTS from the environment and, when it is a
/// valid integer in 1..65535, stores it into `max_simultaneous_requests`.
/// An unparseable value logs an error and a value of 0 logs a warning; in
/// both cases the current limit is left untouched.
pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
    const raw = env.get("BUN_CONFIG_MAX_HTTP_REQUESTS") orelse return;

    const parsed = std.fmt.parseInt(u16, raw, 10) catch {
        logger.addErrorFmt(
            null,
            Loc.Empty,
            allocator,
            "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
            .{raw},
        ) catch unreachable;
        return;
    };

    if (parsed == 0) {
        logger.addWarningFmt(
            null,
            Loc.Empty,
            allocator,
            "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
            .{},
        ) catch unreachable;
        return;
    }

    max_simultaneous_requests.store(parsed, .monotonic);
}
/// Context for a fire-and-forget preconnect: keeps the pending request and
/// its (discarded) response buffer alive until the result callback runs,
/// then tears everything down.
const Preconnect = struct {
    // Ref-counted handle to the in-flight request; deref'd in onResult.
    pending_http_request: *PendingHTTPRequest,
    // Body sink for the preconnect; contents are never read.
    response_buffer: MutableString,
    url: bun.URL,
    // When true, onResult frees url.href with the default allocator.
    is_url_owned: bool,

    pub usingnamespace bun.New(@This());

    // Result callback: releases the buffer, the URL (if owned), the request
    // reference, and finally this Preconnect itself. destroy() frees `this`,
    // so it must remain the last statement.
    pub fn onResult(this: *Preconnect, _: *PendingHTTPRequest, _: HTTPClientResult) void {
        this.response_buffer.deinit();
        if (this.is_url_owned) {
            bun.default_allocator.free(this.url.href);
        }
        this.pending_http_request.deref();
        this.destroy();
    }
};
// *** WARNING: Do not expose this struct outside of this file ***
// We should delete this struct entirely later.
const AsyncHTTP = struct {
request: ?picohttp.Request = null,
response: ?picohttp.Response = null,
request_headers: Headers.Entries = Headers.Entries{},
@@ -1843,8 +2088,8 @@ pub const AsyncHTTP = struct {
method: Method = Method.GET,
url: URL,
http_proxy: ?URL = null,
real: ?*AsyncHTTP = null,
next: ?*AsyncHTTP = null,
free_request_headers_buf: bool = false,
task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP },
result_callback: HTTPClientResult.Callback = undefined,
@@ -1856,63 +2101,19 @@ pub const AsyncHTTP = struct {
client: HTTPClient = undefined,
err: ?anyerror = null,
async_http_id: u32 = 0,
token: u32 = 0,
state: AtomicState = AtomicState.init(State.pending),
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
signals: Signals = .{},
signals: Signals = .{
.flags = undefined,
},
pub var active_requests_count = std.atomic.Value(usize).init(0);
pub var max_simultaneous_requests = std.atomic.Value(usize).init(256);
pending_http_request: ?*PendingHTTPRequest = null,
pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
if (env.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
logger.addErrorFmt(
null,
Loc.Empty,
allocator,
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
.{max_http_requests},
) catch unreachable;
return;
};
if (max == 0) {
logger.addWarningFmt(
null,
Loc.Empty,
allocator,
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
.{},
) catch unreachable;
return;
}
AsyncHTTP.max_simultaneous_requests.store(max, .monotonic);
}
}
pub fn signalHeaderProgress(this: *AsyncHTTP) void {
@fence(.release);
var progress = this.signals.header_progress orelse return;
progress.store(true, .release);
}
pub fn enableBodyStreaming(this: *AsyncHTTP) void {
@fence(.release);
var stream = this.signals.body_streaming orelse return;
stream.store(true, .release);
}
pub fn clearData(this: *AsyncHTTP) void {
this.response_headers.deinit(this.allocator);
this.response_headers = .{};
this.request = null;
this.response = null;
this.client.unix_socket_path.deinit();
this.client.unix_socket_path = JSC.ZigString.Slice.empty;
}
pub usingnamespace bun.New(@This());
pub const State = enum(u32) {
pending = 0,
@@ -1923,88 +2124,41 @@ pub const AsyncHTTP = struct {
};
const AtomicState = std.atomic.Value(State);
pub const Options = struct {
http_proxy: ?URL = null,
hostname: ?[]u8 = null,
signals: ?Signals = null,
unix_socket_path: ?JSC.ZigString.Slice = null,
disable_timeout: ?bool = null,
verbose: ?HTTPVerboseLevel = null,
disable_keepalive: ?bool = null,
disable_decompression: ?bool = null,
reject_unauthorized: ?bool = null,
tls_props: ?*SSLConfig = null,
};
const Preconnect = struct {
async_http: AsyncHTTP,
response_buffer: MutableString,
url: bun.URL,
is_url_owned: bool,
pub usingnamespace bun.New(@This());
pub fn onResult(this: *Preconnect, _: *AsyncHTTP, _: HTTPClientResult) void {
this.response_buffer.deinit();
this.async_http.clearData();
this.async_http.client.deinit();
if (this.is_url_owned) {
bun.default_allocator.free(this.url.href);
}
this.destroy();
}
};
pub fn preconnect(
url: URL,
is_url_owned: bool,
) void {
if (!FeatureFlags.is_fetch_preconnect_supported) {
if (is_url_owned) {
bun.default_allocator.free(url.href);
}
return;
}
var this = Preconnect.new(.{
.async_http = undefined,
.response_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.url = url,
.is_url_owned = is_url_owned,
});
this.async_http = AsyncHTTP.init(bun.default_allocator, .GET, url, .{}, "", &this.response_buffer, "", HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this), .manual, .{});
this.async_http.client.is_preconnect_only = true;
http_thread.schedule(Batch.from(&this.async_http.task));
pub fn init(
options: *const HTTPClientRequest,
pending_http_request: *PendingHTTPRequest,
callback: HTTPClientResult.Callback,
) *AsyncHTTP {
var this = AsyncHTTP.new(undefined);
_ = this.create(options, pending_http_request, callback);
return this;
}
pub fn init(
allocator: std.mem.Allocator,
method: Method,
url: URL,
headers: Headers.Entries,
headers_buf: string,
response_buffer: *MutableString,
request_body: []const u8,
pub fn create(
this: *AsyncHTTP,
options: *const HTTPClientRequest,
pending_http_request: *PendingHTTPRequest,
callback: HTTPClientResult.Callback,
redirect_type: FetchRedirect,
options: Options,
) AsyncHTTP {
var this = AsyncHTTP{
) *AsyncHTTP {
const method = options.method;
const url = options.url;
const headers = options.headers;
const headers_buf = options.headers_buf;
const redirect_type = options.redirect_type;
const allocator = bun.default_allocator;
this.* = .{
.allocator = allocator,
.url = url,
.method = method,
.request_headers = headers,
.request_header_buf = headers_buf,
.request_body = .{ .bytes = request_body },
.response_buffer = response_buffer,
.request_body = options.request_body,
.response_buffer = &pending_http_request.response_body,
.result_callback = callback,
.http_proxy = options.http_proxy,
.signals = options.signals orelse .{},
.async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id.fetchAdd(1, .monotonic) else 0,
.signals = pending_http_request.signals,
.token = pending_http_request.token,
.pending_http_request = pending_http_request,
};
this.client = .{
@@ -2015,9 +2169,10 @@ pub const AsyncHTTP = struct {
.header_buf = headers_buf,
.hostname = options.hostname,
.signals = options.signals orelse this.signals,
.async_http_id = this.async_http_id,
.token = this.token,
.http_proxy = this.http_proxy,
.redirect_type = redirect_type,
.progress_node = options.progress_node,
};
if (options.unix_socket_path) |val| {
assert(this.client.unix_socket_path.length() == 0);
@@ -2038,7 +2193,7 @@ pub const AsyncHTTP = struct {
if (options.reject_unauthorized) |val| {
this.client.reject_unauthorized = val;
}
if (options.tls_props) |val| {
if (options.tls_config) |val| {
this.client.tls_props = val;
}
@@ -2116,10 +2271,16 @@ pub const AsyncHTTP = struct {
if (this.http_proxy) |proxy| {
//TODO: need to understand how is possible to reuse Proxy with TSL, so disable keepalive if url is HTTPS
this.client.disable_keepalive = this.url.isHTTPS();
if (this.client.proxy_authorization) |prev| {
bun.default_allocator.free(prev);
}
this.client.proxy_authorization = null;
// Username between 0 and 4096 chars
if (proxy.username.len > 0 and proxy.username.len < 4096) {
// Password between 0 and 4096 chars
if (proxy.password.len > 0 and proxy.password.len < 4096) {
// decode password
var password_buffer = std.mem.zeroes([4096]u8);
var password_stream = std.io.fixedBufferStream(&password_buffer);
@@ -2174,18 +2335,18 @@ pub const AsyncHTTP = struct {
}
}
/// Marks this request as scheduled, takes a reference on the pending request
/// (released when the result callback completes), and pushes the task onto
/// `batch`. (A stale pre-refactor signature line that still took an unused
/// allocator parameter was removed — it duplicated this definition.)
pub fn schedule(this: *AsyncHTTP, batch: *ThreadPool.Batch) void {
    this.state.store(.scheduled, .monotonic);
    this.pending_http_request.?.ref();
    batch.push(ThreadPool.Batch.from(&this.task));
}
/// Channel bridge used by sendSync: forwards the completed result to the
/// blocking reader. (Stale pre-refactor lines that copied state through the
/// removed `async_http.real` pointer were deleted along with the old
/// *AsyncHTTP-typed signature.)
fn sendSyncCallback(this: *SingleHTTPChannel, pending_http_request: *PendingHTTPRequest, result: HTTPClientResult) void {
    _ = pending_http_request;
    this.channel.writeItem(result) catch unreachable;
}
pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response {
pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response {
HTTPThread.init();
var ctx = try bun.default_allocator.create(SingleHTTPChannel);
@@ -2196,7 +2357,7 @@ pub const AsyncHTTP = struct {
).init(ctx);
var batch = bun.ThreadPool.Batch{};
this.schedule(bun.default_allocator, &batch);
this.schedule(&batch);
http_thread.schedule(batch);
while (true) {
const result: HTTPClientResult = ctx.channel.readItem() catch unreachable;
@@ -2208,9 +2369,7 @@ pub const AsyncHTTP = struct {
unreachable;
}
pub fn onAsyncHTTPCallback(this: *AsyncHTTP, async_http: *AsyncHTTP, result: HTTPClientResult) void {
assert(this.real != null);
pub fn onAsyncHTTPCallback(this: *AsyncHTTP, result: HTTPClientResult, is_owned_by_self: bool) void {
var callback = this.result_callback;
this.elapsed = http_thread.timer.read() -| this.elapsed;
@@ -2228,33 +2387,36 @@ pub const AsyncHTTP = struct {
this.response = null;
this.state.store(State.fail, .monotonic);
}
if (comptime Environment.enable_logs) {
if (socket_async_http_abort_tracker.count() > 0) {
log("socket_async_http_abort_tracker count: {d}", .{socket_async_http_abort_tracker.count()});
}
}
if (socket_async_http_abort_tracker.capacity() > 10_000 and socket_async_http_abort_tracker.count() < 100) {
socket_async_http_abort_tracker.shrinkAndFree(socket_async_http_abort_tracker.count());
}
abort_tracker.garbageCollect();
if (result.has_more) {
callback.function(callback.ctx, async_http, result);
callback.function(callback.ctx, this.pending_http_request.?, result);
} else {
{
this.client.deinit();
var threadlocal_http: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", async_http);
defer threadlocal_http.destroy();
log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
callback.function(callback.ctx, async_http, result);
log("onAsyncHTTPCallback({*}): {}", .{ this, bun.fmt.fmtDuration(this.elapsed) });
var pending = this.pending_http_request.?;
pending.elapsed = this.elapsed;
pending.signals.set(.done, true);
defer pending.deref();
this.pending_http_request = null;
callback.function(callback.ctx, pending, result);
if (is_owned_by_self) {
if (this.free_request_headers_buf) {
this.allocator.free(this.request_header_buf);
}
var threadlocal_http: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", this);
threadlocal_http.destroy();
}
}
const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .monotonic);
const active_requests = active_requests_count.fetchSub(1, .monotonic);
assert(active_requests > 0);
}
if (!http_thread.queued_tasks.isEmpty() and AsyncHTTP.active_requests_count.load(.monotonic) < AsyncHTTP.max_simultaneous_requests.load(.monotonic)) {
if (!http_thread.queued_tasks.isEmpty() and active_requests_count.load(.monotonic) < max_simultaneous_requests.load(.monotonic)) {
http_thread.loop.loop.wakeup();
}
}
@@ -2268,10 +2430,6 @@ pub const AsyncHTTP = struct {
_ = active_requests_count.fetchAdd(1, .monotonic);
this.err = null;
this.state.store(.sending, .monotonic);
this.client.result_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPCallback).init(
this,
);
this.elapsed = http_thread.timer.read();
if (this.response_buffer.list.capacity == 0) {
this.response_buffer.allocator = default_allocator;
@@ -2429,6 +2587,7 @@ pub fn doRedirect(
return;
}
this.state.reset(this.allocator);
// also reset proxy to redirect
this.proxy_tunneling = false;
if (this.proxy_tunnel != null) {
@@ -2436,11 +2595,19 @@ pub fn doRedirect(
tunnel.deinit();
this.proxy_tunnel = null;
}
if (this.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
abort_tracker.remove(this.token);
return this.start(.{ .bytes = request_body }, body_out_str);
// We have to clone it because it will get destructed once it finishes.
var clone = ThreadlocalAsyncHTTP.new(.{
.async_http = this.async_http().*,
});
var parent: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", this.async_http());
parent.destroy();
clone.async_http.client.start(.{ .bytes = request_body }, body_out_str);
}
/// Returns the AsyncHTTP that embeds this client as its `client` field.
/// Only valid when `this` actually lives inside an AsyncHTTP (which is how
/// the HTTP thread constructs clients).
pub fn async_http(this: *HTTPClient) *AsyncHTTP {
    return @fieldParentPtr("client", this);
}
pub fn isHTTPS(this: *HTTPClient) bool {
if (this.http_proxy) |proxy| {
@@ -2477,20 +2644,21 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
// Aborted before connecting
if (this.signals.get(.aborted)) {
this.fail(error.AbortedBeforeConnecting);
// After fail is called, this memory is no longer accessible.
return;
}
var socket = http_thread.connect(this, is_ssl) catch |err| {
bun.handleErrorReturnTrace(err, @errorReturnTrace());
this.fail(err);
// After fail is called, this memory is no longer accessible.
return;
};
if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) {
NewHTTPContext(is_ssl).markSocketAsDead(socket);
this.fail(error.ConnectionClosed);
assert(this.state.fail != null);
// After fail is called, this memory is no longer accessible.
return;
}
}
@@ -2501,6 +2669,9 @@ pub const HTTPResponseMetadata = struct {
url: []const u8 = "",
owned_buf: []u8 = "",
response: picohttp.Response = .{},
redirect_count: u32 = 0,
done: bool = false,
pub fn deinit(this: *HTTPResponseMetadata, allocator: std.mem.Allocator) void {
if (this.owned_buf.len > 0) allocator.free(this.owned_buf);
if (this.response.headers.len > 0) allocator.free(this.response.headers);
@@ -2533,7 +2704,7 @@ fn printResponse(response: picohttp.Response) void {
pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
log("onPreconnect({})", .{this.url});
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
abort_tracker.remove(this.token);
const ctx = if (comptime is_ssl) &http_thread.https_context else &http_thread.http_context;
ctx.releaseSocket(
socket,
@@ -2547,7 +2718,7 @@ pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCon
this.state.request_stage = .done;
this.state.stage = .done;
this.proxy_tunneling = false;
this.result_callback.run(@fieldParentPtr("client", this), HTTPClientResult{ .fail = null, .metadata = null, .has_more = false });
this.async_http().onAsyncHTTPCallback(.{ .fail = null, .metadata = null, .has_more = false }, false);
}
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
@@ -3172,21 +3343,18 @@ pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCo
}
/// Transitions the client into the failed state and reports `err` upward.
/// After onAsyncHTTPCallback runs, `this` may have been freed — do not touch
/// it. (Interleaved pre-refactor lines were removed: the optional-signal
/// guard over the old socket_async_http_abort_tracker, the unused
/// `const callback`, and the old `callback.run(...)` dispatch, which would
/// have reported the result twice alongside the new path.)
fn fail(this: *HTTPClient, err: anyerror) void {
    // The socket can no longer be aborted externally; drop it from the tracker.
    abort_tracker.remove(this.token);

    this.state.request_stage = .fail;
    this.state.response_stage = .fail;
    this.state.fail = err;
    this.state.stage = .fail;

    const result = this.toResult();
    this.state.reset(this.allocator);
    this.proxy_tunneling = false;

    // Hands ownership to the async layer; frees `this` unless preconnect-only.
    this.async_http().onAsyncHTTPCallback(result, !this.is_preconnect_only);
}
// We have to clone metadata immediately after use
@@ -3245,14 +3413,13 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
}
return;
}
if (this.signals.aborted != null and is_done) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
if (is_done) {
abort_tracker.remove(this.token);
}
log("progressUpdate {}", .{is_done});
const callback = this.result_callback;
if (is_done) {
if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) {
ctx.releaseSocket(
@@ -3273,7 +3440,8 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
}
result.body.?.* = body;
callback.run(@fieldParentPtr("client", this), result);
this.async_http().onAsyncHTTPCallback(result, !this.is_preconnect_only);
if (comptime print_every > 0) {
print_every_i += 1;
@@ -3301,6 +3469,9 @@ pub const HTTPClientResult = struct {
/// If is not chunked encoded and Content-Length is not provided this will be unknown
body_size: BodySize = .unknown,
redirected: bool = false,
aborted: bool = false,
retry_count: u32 = 0,
certificate_info: ?CertificateInfo = null,
pub const BodySize = union(enum) {
@@ -3325,10 +3496,10 @@ pub const HTTPClientResult = struct {
ctx: *anyopaque,
function: Function,
/// Result callbacks receive the opaque context, the pending request, and the
/// final result. (The stale *AsyncHTTP-typed duplicate of this decl and the
/// old `run` header/body — left without a closing brace by the refactor —
/// were removed.)
pub const Function = *const fn (*anyopaque, *PendingHTTPRequest, HTTPClientResult) void;

/// Invokes the stored function with this callback's context.
pub fn run(self: Callback, pending_request: *PendingHTTPRequest, result: HTTPClientResult) void {
    self.function(self.ctx, pending_request, result);
}
pub fn New(comptime Type: type, comptime callback: anytype) type {
@@ -3340,9 +3511,9 @@ pub const HTTPClientResult = struct {
};
}
/// Type-erasure shim: casts `ptr` back to the concrete context type and
/// forwards to `callback`. (The stale *AsyncHTTP-typed duplicate header and
/// its old @call line were removed.)
pub fn wrapped_callback(ptr: *anyopaque, parent: *PendingHTTPRequest, result: HTTPClientResult) void {
    const casted = @as(Type, @ptrCast(@alignCast(ptr)));
    @call(bun.callmod_inline, callback, .{ casted, parent, result });
}
};
}
@@ -4008,3 +4179,6 @@ const ThreadlocalAsyncHTTP = struct {
async_http: AsyncHTTP,
pub usingnamespace bun.New(@This());
};
/// Number of HTTP requests currently executing on the HTTP thread
/// (incremented when a task starts, decremented in onAsyncHTTPCallback).
pub var active_requests_count = std.atomic.Value(usize).init(0);
/// Upper bound on concurrently executing requests; tunable via the
/// BUN_CONFIG_MAX_HTTP_REQUESTS environment variable (see loadEnv).
pub var max_simultaneous_requests = std.atomic.Value(usize).init(256);

View File

@@ -38,7 +38,6 @@ const FileSystem = Fs.FileSystem;
const Lock = @import("../lock.zig").Lock;
const URL = @import("../url.zig").URL;
const HTTP = bun.http;
const AsyncHTTP = HTTP.AsyncHTTP;
const HTTPChannel = HTTP.HTTPChannel;
const HeaderBuilder = HTTP.HeaderBuilder;
@@ -232,7 +231,8 @@ pub const Aligner = struct {
};
const NetworkTask = struct {
http: AsyncHTTP = undefined,
http: *HTTP.PendingHTTPRequest,
http_task: ?*ThreadPool.Task = null,
task_id: u64,
url_buf: []const u8 = &[_]u8{},
retried: u16 = 0,
@@ -259,10 +259,11 @@ const NetworkTask = struct {
};
pub const DedupeMap = std.HashMap(u64, DedupeMapEntry, IdentityContext(u64), 80);
/// Completion callback invoked when the HTTP request finishes: records the
/// finished request, snapshots its response body, and queues this task for
/// the package manager to drain. (Stale pre-refactor lines copying through
/// the removed `async_http.real` pointer were deleted, as was a
/// `bun.debugAssert(this.http == pending_http_request)` that was tautological
/// immediately after the assignment above it.)
pub fn notify(this: *NetworkTask, pending_http_request: *HTTP.PendingHTTPRequest, _: anytype) void {
    defer this.package_manager.wake();
    this.http = pending_http_request;
    // Copy the response body handle so later parsing reads the final buffer.
    this.response_buffer = pending_http_request.response_body;
    this.package_manager.async_network_task_queue.push(this);
}
@@ -422,18 +423,24 @@ const NetworkTask = struct {
header_builder.content = GlobalStringBuilder{ .ptr = @as([*]u8, @ptrFromInt(@intFromPtr(bun.span(default_headers_buf).ptr))), .len = default_headers_buf.len, .cap = default_headers_buf.len };
}
this.response_buffer = try MutableString.init(allocator, 0);
this.allocator = allocator;
const url = URL.parse(this.url_buf);
this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len], &this.response_buffer, "", this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{
.http_proxy = this.package_manager.httpProxy(url),
});
this.http.client.reject_unauthorized = this.package_manager.tlsRejectUnauthorized();
if (PackageManager.verbose_install) {
this.http.client.verbose = .headers;
}
const verbose: HTTP.HTTPVerboseLevel = if (PackageManager.verbose_install) .headers else .none;
this.http = bun.http.fetchBatched(
&.{
.allocator = allocator,
.method = .GET,
.url = url,
.headers = header_builder.entries,
.headers_buf = header_builder.content.ptr.?[0..header_builder.content.len],
.reject_unauthorized = this.package_manager.tlsRejectUnauthorized(),
.http_proxy = this.package_manager.httpProxy(url),
.verbose = verbose,
},
this.getCompletionCallback(),
&this.http_task,
);
this.callback = .{
.package_manifest = .{
@@ -441,17 +448,6 @@ const NetworkTask = struct {
.loaded_manifest = if (loaded_manifest) |manifest| manifest.* else null,
},
};
if (PackageManager.verbose_install) {
this.http.verbose = .headers;
this.http.client.verbose = .headers;
}
// Incase the ETag causes invalidation, we fallback to the last modified date.
if (last_modified.len != 0 and bun.getRuntimeFeatureFlag("BUN_FEATURE_FLAG_LAST_MODIFIED_PRETEND_304")) {
this.http.client.force_last_modified = true;
this.http.client.if_modified_since = last_modified;
}
}
pub fn getCompletionCallback(this: *NetworkTask) HTTP.HTTPClientResult.Callback {
@@ -459,7 +455,11 @@ const NetworkTask = struct {
}
/// Enqueues this task's prepared HTTP thread-pool task onto `batch`.
/// `http_task` must have been populated by a prior fetchBatched call.
/// (A stale pre-refactor body line, `this.http.schedule(this.allocator,
/// batch)`, was removed: `http` is now a *HTTP.PendingHTTPRequest with no
/// such method, and keeping both bodies would have scheduled twice.)
pub fn schedule(this: *NetworkTask, batch: *ThreadPool.Batch) void {
    batch.push(.{
        .head = this.http_task.?,
        .tail = this.http_task.?,
        .len = 1,
    });
}
pub fn forTarball(
@@ -510,13 +510,21 @@ const NetworkTask = struct {
const url = URL.parse(this.url_buf);
this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_buf, &this.response_buffer, "", this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{
.http_proxy = this.package_manager.httpProxy(url),
});
this.http.client.reject_unauthorized = this.package_manager.tlsRejectUnauthorized();
if (PackageManager.verbose_install) {
this.http.client.verbose = .headers;
}
const verbose: HTTP.HTTPVerboseLevel = if (PackageManager.verbose_install) .headers else .none;
this.http = bun.http.fetchBatched(
&.{
.allocator = allocator,
.method = .GET,
.url = url,
.headers = header_builder.entries,
.headers_buf = header_buf,
.reject_unauthorized = this.package_manager.tlsRejectUnauthorized(),
.http_proxy = this.package_manager.httpProxy(url),
.verbose = verbose,
},
this.getCompletionCallback(),
&this.http_task,
);
}
};
@@ -699,7 +707,7 @@ pub const Task = struct {
const package_manifest = Npm.Registry.getPackageMetadata(
allocator,
manager.scopeForPackageName(manifest.name.slice()),
manifest.network.http.response.?,
manifest.network.http.response().?,
body,
&this.log,
manifest.name.slice(),
@@ -4275,6 +4283,7 @@ pub const PackageManager = struct {
.task_id = task_id,
.callback = undefined,
.allocator = this.allocator,
.http = undefined,
.package_manager = this,
.apply_patch_task = if (patch_name_and_version_hash) |h| brk: {
const patch_hash = this.lockfile.patched_dependencies.get(h).?.patchfileHash().?;
@@ -5229,6 +5238,7 @@ pub const PackageManager = struct {
var network_task = this.getNetworkTask();
network_task.* = .{
.http = undefined,
.package_manager = &PackageManager.instance, // https://github.com/ziglang/zig/issues/14005
.callback = undefined,
.task_id = task_id,
@@ -6167,18 +6177,18 @@ pub const PackageManager = struct {
}
}
if (!has_network_error and task.http.response == null) {
if (!has_network_error and task.http.response() == null) {
has_network_error = true;
const min = manager.options.min_simultaneous_requests;
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
const max = HTTP.max_simultaneous_requests.load(.monotonic);
if (max > min) {
AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
HTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
}
}
// Handle retry-able errors.
if (task.http.response == null or task.http.response.?.status_code > 499) {
const err = task.http.err orelse error.HTTPError;
if (task.http.response() == null or task.http.response().?.status_code > 499) {
const err = task.http.err() orelse error.HTTPError;
if (task.retried < manager.options.max_retry_count) {
task.retried += 1;
@@ -6198,9 +6208,9 @@ pub const PackageManager = struct {
}
}
const response = task.http.response orelse {
const response = task.http.response() orelse {
// Handle non-retry-able errors.
const err = task.http.err orelse error.HTTPError;
const err = task.http.err() orelse error.HTTPError;
if (@TypeOf(callbacks.onPackageManifestError) != void) {
callbacks.onPackageManifestError(
@@ -6272,7 +6282,7 @@ pub const PackageManager = struct {
logger.Loc.Empty,
manager.allocator,
"<r><red><b>GET<r><red> {s}<d> - {d}<r>",
.{ task.http.client.url.href, response.status_code },
.{ task.url_buf, response.status_code },
) catch bun.outOfMemory();
} else {
manager.log.addWarningFmt(
@@ -6280,7 +6290,7 @@ pub const PackageManager = struct {
logger.Loc.Empty,
manager.allocator,
"<r><yellow><b>GET<r><yellow> {s}<d> - {d}<r>",
.{ task.http.client.url.href, response.status_code },
.{ task.url_buf, response.status_code },
) catch bun.outOfMemory();
}
if (manager.subcommand != .remove) {
@@ -6343,17 +6353,17 @@ pub const PackageManager = struct {
manager.task_batch.push(ThreadPool.Batch.from(manager.enqueueParseNPMPackage(task.task_id, name, task)));
},
.extract => |*extract| {
if (!has_network_error and task.http.response == null) {
if (!has_network_error and task.http.response() == null) {
has_network_error = true;
const min = manager.options.min_simultaneous_requests;
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
const max = HTTP.max_simultaneous_requests.load(.monotonic);
if (max > min) {
AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
HTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
}
}
if (task.http.response == null or task.http.response.?.status_code > 499) {
const err = task.http.err orelse error.TarballFailedToDownload;
if (task.http.response() == null or task.http.response().?.status_code > 499) {
const err = task.http.err() orelse error.TarballFailedToDownload;
if (task.retried < manager.options.max_retry_count) {
task.retried += 1;
@@ -6379,8 +6389,8 @@ pub const PackageManager = struct {
}
}
const response = task.http.response orelse {
const err = task.http.err orelse error.TarballFailedToDownload;
const response = task.http.response() orelse {
const err = task.http.err() orelse error.TarballFailedToDownload;
if (@TypeOf(callbacks.onPackageDownloadError) != void) {
const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
@@ -6466,7 +6476,7 @@ pub const PackageManager = struct {
manager.allocator,
"<r><red><b>GET<r><red> {s}<d> - {d}<r>",
.{
task.http.client.url.href,
task.url_buf,
response.status_code,
},
) catch bun.outOfMemory();
@@ -6477,7 +6487,7 @@ pub const PackageManager = struct {
manager.allocator,
"<r><yellow><b>GET<r><yellow> {s}<d> - {d}<r>",
.{
task.http.client.url.href,
task.url_buf,
response.status_code,
},
) catch bun.outOfMemory();
@@ -7170,7 +7180,7 @@ pub const PackageManager = struct {
if (std.fmt.parseInt(u16, retry_count, 10)) |int| this.max_retry_count = int else |_| {}
}
AsyncHTTP.loadEnv(allocator, log, env);
bun.http.loadEnv(allocator, log, env);
if (env.get("BUN_CONFIG_SKIP_SAVE_LOCKFILE")) |check_bool| {
this.do.save_lockfile = strings.eqlComptime(check_bool, "0");

View File

@@ -40,7 +40,6 @@ const Fs = @import("../fs.zig");
const FileSystem = Fs.FileSystem;
const Lock = @import("../lock.zig").Lock;
const URL = @import("../url.zig").URL;
const AsyncHTTP = bun.http.AsyncHTTP;
const HTTPChannel = bun.http.HTTPChannel;
const Integrity = @import("./integrity.zig").Integrity;