mirror of
https://github.com/oven-sh/bun
synced 2026-02-13 20:39:05 +00:00
[install] reduce parallel HTTP requests under heavy load (#2536)
* [install] reduce parallel HTTP requests under heavy load * make `max_simultaneous_requests` atomic
This commit is contained in:
@@ -198,7 +198,7 @@ pub fn main() anyerror!void {
|
||||
try channel.buffer.ensureTotalCapacity(args.count);
|
||||
|
||||
try NetworkThread.init();
|
||||
if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests = args.concurrency;
|
||||
if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests.store(args.concurrency, .Monotonic);
|
||||
const Group = struct {
|
||||
response_body: MutableString = undefined,
|
||||
context: HTTP.HTTPChannelContext = undefined,
|
||||
|
||||
@@ -510,9 +510,6 @@ pub const EventList = struct {
|
||||
var retry_remaining: usize = 10;
|
||||
const rand = random.random();
|
||||
retry: while (retry_remaining > 0) {
|
||||
this.async_http.max_retry_count = 0;
|
||||
this.async_http.retries_count = 0;
|
||||
|
||||
const response = this.async_http.sendSync(true) catch |err| {
|
||||
if (FeatureFlags.verbose_analytics) {
|
||||
Output.prettyErrorln("[Analytics] failed due to error {s} ({d} retries remain)", .{ @errorName(err), retry_remaining });
|
||||
|
||||
@@ -104,33 +104,8 @@ pub const Run = struct {
|
||||
Output.prettyErrorln("\n", .{});
|
||||
Global.exit(1);
|
||||
};
|
||||
AsyncHTTP.max_simultaneous_requests = 255;
|
||||
|
||||
if (b.env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
|
||||
load: {
|
||||
AsyncHTTP.max_simultaneous_requests = std.fmt.parseInt(u16, max_http_requests, 10) catch {
|
||||
vm.log.addErrorFmt(
|
||||
null,
|
||||
logger.Loc.Empty,
|
||||
vm.allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
|
||||
.{max_http_requests},
|
||||
) catch unreachable;
|
||||
break :load;
|
||||
};
|
||||
|
||||
if (AsyncHTTP.max_simultaneous_requests == 0) {
|
||||
vm.log.addWarningFmt(
|
||||
null,
|
||||
logger.Loc.Empty,
|
||||
vm.allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
|
||||
.{},
|
||||
) catch unreachable;
|
||||
AsyncHTTP.max_simultaneous_requests = 255;
|
||||
}
|
||||
}
|
||||
}
|
||||
AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
|
||||
|
||||
vm.loadExtraEnv();
|
||||
vm.is_main_thread = true;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
const picohttp = @import("bun").picohttp;
|
||||
const bun = @import("bun");
|
||||
const picohttp = bun.picohttp;
|
||||
const JSC = bun.JSC;
|
||||
const string = bun.string;
|
||||
const Output = bun.Output;
|
||||
@@ -10,6 +10,9 @@ const MutableString = bun.MutableString;
|
||||
const FeatureFlags = bun.FeatureFlags;
|
||||
const stringZ = bun.stringZ;
|
||||
const C = bun.C;
|
||||
const Loc = bun.logger.Loc;
|
||||
const Log = bun.logger.Log;
|
||||
const DotEnv = @import("./env_loader.zig");
|
||||
const std = @import("std");
|
||||
const URL = @import("./url.zig").URL;
|
||||
pub const Method = @import("./http/method.zig").Method;
|
||||
@@ -18,16 +21,16 @@ const Lock = @import("./lock.zig").Lock;
|
||||
const HTTPClient = @This();
|
||||
const Zlib = @import("./zlib.zig");
|
||||
const StringBuilder = @import("./string_builder.zig");
|
||||
const AsyncIO = @import("bun").AsyncIO;
|
||||
const ThreadPool = @import("bun").ThreadPool;
|
||||
const BoringSSL = @import("bun").BoringSSL;
|
||||
const AsyncIO = bun.AsyncIO;
|
||||
const ThreadPool = bun.ThreadPool;
|
||||
const BoringSSL = bun.BoringSSL;
|
||||
pub const NetworkThread = @import("./network_thread.zig");
|
||||
const ObjectPool = @import("./pool.zig").ObjectPool;
|
||||
const SOCK = os.SOCK;
|
||||
const Arena = @import("./mimalloc_arena.zig").Arena;
|
||||
const ZlibPool = @import("./http/zlib.zig");
|
||||
const URLBufferPool = ObjectPool([4096]u8, null, false, 10);
|
||||
const uws = @import("bun").uws;
|
||||
const uws = bun.uws;
|
||||
pub const MimeType = @import("./http/mime_type.zig");
|
||||
pub const URLPath = @import("./http/url_path.zig");
|
||||
// This becomes Arena.allocator
|
||||
@@ -570,8 +573,9 @@ pub const HTTPThread = struct {
|
||||
}
|
||||
|
||||
var count: usize = 0;
|
||||
var remaining: usize = AsyncHTTP.max_simultaneous_requests - AsyncHTTP.active_requests_count.loadUnchecked();
|
||||
if (remaining == 0) return;
|
||||
var active = AsyncHTTP.active_requests_count.load(.Monotonic);
|
||||
const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
|
||||
if (active >= max) return;
|
||||
defer {
|
||||
if (comptime Environment.allow_assert) {
|
||||
if (count > 0)
|
||||
@@ -588,8 +592,8 @@ pub const HTTPThread = struct {
|
||||
count += 1;
|
||||
}
|
||||
|
||||
remaining -= 1;
|
||||
if (remaining == 0) break;
|
||||
active += 1;
|
||||
if (active >= max) break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1171,7 +1175,6 @@ pub const AsyncHTTP = struct {
|
||||
allocator: std.mem.Allocator,
|
||||
request_header_buf: string = "",
|
||||
method: Method = Method.GET,
|
||||
max_retry_count: u32 = 0,
|
||||
url: URL,
|
||||
http_proxy: ?URL = null,
|
||||
real: ?*AsyncHTTP = null,
|
||||
@@ -1185,7 +1188,6 @@ pub const AsyncHTTP = struct {
|
||||
redirected: bool = false,
|
||||
|
||||
response_encoding: Encoding = Encoding.identity,
|
||||
retries_count: u32 = 0,
|
||||
verbose: bool = false,
|
||||
|
||||
client: HTTPClient = undefined,
|
||||
@@ -1197,7 +1199,33 @@ pub const AsyncHTTP = struct {
|
||||
gzip_elapsed: u64 = 0,
|
||||
|
||||
pub var active_requests_count = std.atomic.Atomic(usize).init(0);
|
||||
pub var max_simultaneous_requests: usize = 256;
|
||||
pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
|
||||
|
||||
pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
|
||||
if (env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
|
||||
const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
|
||||
logger.addErrorFmt(
|
||||
null,
|
||||
Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
|
||||
.{max_http_requests},
|
||||
) catch unreachable;
|
||||
return;
|
||||
};
|
||||
if (max == 0) {
|
||||
logger.addWarningFmt(
|
||||
null,
|
||||
Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
|
||||
.{},
|
||||
) catch unreachable;
|
||||
return;
|
||||
}
|
||||
AsyncHTTP.max_simultaneous_requests.store(max, .Monotonic);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(this: *AsyncHTTP) void {
|
||||
this.response_headers.deinit(this.allocator);
|
||||
@@ -1357,7 +1385,7 @@ pub const AsyncHTTP = struct {
|
||||
|
||||
completion.function(completion.ctx, result);
|
||||
|
||||
if (active_requests == AsyncHTTP.max_simultaneous_requests) {
|
||||
if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
|
||||
http_thread.drainEvents();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ const C = bun.C;
|
||||
const std = @import("std");
|
||||
|
||||
const JSLexer = bun.js_lexer;
|
||||
const logger = @import("bun").logger;
|
||||
const logger = bun.logger;
|
||||
|
||||
const js_parser = bun.js_parser;
|
||||
const json_parser = bun.JSON;
|
||||
@@ -30,23 +30,23 @@ const NodeModuleBundle = @import("../node_module_bundle.zig").NodeModuleBundle;
|
||||
const DotEnv = @import("../env_loader.zig");
|
||||
const which = @import("../which.zig").which;
|
||||
const Run = @import("../bun_js.zig").Run;
|
||||
const HeaderBuilder = @import("bun").HTTP.HeaderBuilder;
|
||||
const Fs = @import("../fs.zig");
|
||||
const FileSystem = Fs.FileSystem;
|
||||
const Lock = @import("../lock.zig").Lock;
|
||||
const URL = @import("../url.zig").URL;
|
||||
const AsyncHTTP = @import("bun").HTTP.AsyncHTTP;
|
||||
const HTTPChannel = @import("bun").HTTP.HTTPChannel;
|
||||
const NetworkThread = @import("bun").HTTP.NetworkThread;
|
||||
const HTTP = @import("bun").HTTP;
|
||||
const HTTP = bun.HTTP;
|
||||
const AsyncHTTP = HTTP.AsyncHTTP;
|
||||
const HTTPChannel = HTTP.HTTPChannel;
|
||||
const NetworkThread = HTTP.NetworkThread;
|
||||
const HeaderBuilder = HTTP.HeaderBuilder;
|
||||
|
||||
const Integrity = @import("./integrity.zig").Integrity;
|
||||
const clap = @import("bun").clap;
|
||||
const clap = bun.clap;
|
||||
const ExtractTarball = @import("./extract_tarball.zig");
|
||||
const Npm = @import("./npm.zig");
|
||||
const Bitset = @import("./bit_set.zig").DynamicBitSetUnmanaged;
|
||||
const z_allocator = @import("../memory_allocator.zig").z_allocator;
|
||||
const Syscall = @import("bun").JSC.Node.Syscall;
|
||||
const Syscall = bun.JSC.Node.Syscall;
|
||||
const RunCommand = @import("../cli/run_command.zig").RunCommand;
|
||||
threadlocal var initialized_store = false;
|
||||
const Futex = @import("../futex.zig");
|
||||
@@ -160,6 +160,7 @@ const NetworkTask = struct {
|
||||
http: AsyncHTTP = undefined,
|
||||
task_id: u64,
|
||||
url_buf: []const u8 = &[_]u8{},
|
||||
retried: u16 = 0,
|
||||
allocator: std.mem.Allocator,
|
||||
request_buffer: MutableString = undefined,
|
||||
response_buffer: MutableString = undefined,
|
||||
@@ -339,7 +340,6 @@ const NetworkTask = struct {
|
||||
this.package_manager.httpProxy(url),
|
||||
null,
|
||||
);
|
||||
this.http.max_retry_count = this.package_manager.options.max_retry_count;
|
||||
this.callback = .{
|
||||
.package_manifest = .{
|
||||
.name = try strings.StringOrTinyString.initAppendIfNeeded(name, *FileSystem.FilenameStore, &FileSystem.FilenameStore.instance),
|
||||
@@ -416,7 +416,6 @@ const NetworkTask = struct {
|
||||
this.package_manager.httpProxy(url),
|
||||
null,
|
||||
);
|
||||
this.http.max_retry_count = this.package_manager.options.max_retry_count;
|
||||
this.callback = .{ .extract = tarball };
|
||||
}
|
||||
};
|
||||
@@ -1559,7 +1558,7 @@ const TaskCallbackList = std.ArrayListUnmanaged(TaskCallbackContext);
|
||||
const TaskDependencyQueue = std.HashMapUnmanaged(u64, TaskCallbackList, IdentityContext(u64), 80);
|
||||
const TaskChannel = sync.Channel(Task, .{ .Static = 4096 });
|
||||
const NetworkChannel = sync.Channel(*NetworkTask, .{ .Static = 8192 });
|
||||
const ThreadPool = @import("bun").ThreadPool;
|
||||
const ThreadPool = bun.ThreadPool;
|
||||
const PackageManifestMap = std.HashMapUnmanaged(PackageNameHash, Npm.PackageManifest, IdentityContext(PackageNameHash), 80);
|
||||
const RepositoryMap = std.HashMapUnmanaged(u64, std.os.fd_t, IdentityContext(u64), 80);
|
||||
|
||||
@@ -1568,7 +1567,7 @@ pub const CacheLevel = struct {
|
||||
use_etag: bool,
|
||||
use_last_modified: bool,
|
||||
};
|
||||
const AsyncIO = @import("bun").AsyncIO;
|
||||
const AsyncIO = bun.AsyncIO;
|
||||
const Waker = AsyncIO.Waker;
|
||||
|
||||
// We can't know all the packages we need until we've downloaded all the packages
|
||||
@@ -3577,6 +3576,7 @@ pub const PackageManager = struct {
|
||||
comptime log_level: Options.LogLevel,
|
||||
) anyerror!void {
|
||||
var has_updated_this_run = false;
|
||||
var has_network_error = false;
|
||||
|
||||
var timestamp_this_tick: ?u32 = null;
|
||||
|
||||
@@ -3598,7 +3598,28 @@ pub const PackageManager = struct {
|
||||
const response = task.http.response orelse {
|
||||
const err = task.http.err orelse error.HTTPError;
|
||||
|
||||
if (@TypeOf(callbacks.onPackageManifestError) != void) {
|
||||
if (task.retried < manager.options.max_retry_count) {
|
||||
task.retried += 1;
|
||||
if (!has_network_error) {
|
||||
has_network_error = true;
|
||||
const min = manager.options.min_simultaneous_requests;
|
||||
const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
|
||||
if (max > min) {
|
||||
AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .Monotonic);
|
||||
}
|
||||
}
|
||||
manager.enqueueNetworkTask(task);
|
||||
|
||||
if (manager.options.log_level.isVerbose()) {
|
||||
manager.log.addWarningFmt(
|
||||
null,
|
||||
logger.Loc.Empty,
|
||||
manager.allocator,
|
||||
"<r><yellow>warn:<r> {s} downloading package manifest <b>{s}<r>",
|
||||
.{ bun.span(@errorName(err)), name.slice() },
|
||||
) catch unreachable;
|
||||
}
|
||||
} else if (@TypeOf(callbacks.onPackageManifestError) != void) {
|
||||
callbacks.onPackageManifestError(
|
||||
extract_ctx,
|
||||
name.slice(),
|
||||
@@ -3607,8 +3628,7 @@ pub const PackageManager = struct {
|
||||
);
|
||||
} else if (comptime log_level != .silent) {
|
||||
const fmt = "\n<r><red>error<r>: {s} downloading package manifest <b>{s}<r>\n";
|
||||
const error_name: string = bun.span(@errorName(err));
|
||||
const args = .{ error_name, name.slice() };
|
||||
const args = .{ bun.span(@errorName(err)), name.slice() };
|
||||
if (comptime log_level.showProgress()) {
|
||||
Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
|
||||
} else {
|
||||
@@ -3759,9 +3779,34 @@ pub const PackageManager = struct {
|
||||
.extract => |extract| {
|
||||
const response = task.http.response orelse {
|
||||
const err = task.http.err orelse error.TarballFailedToDownload;
|
||||
const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
|
||||
|
||||
if (@TypeOf(callbacks.onPackageDownloadError) != void) {
|
||||
if (task.retried < manager.options.max_retry_count) {
|
||||
task.retried += 1;
|
||||
if (!has_network_error) {
|
||||
has_network_error = true;
|
||||
const min = manager.options.min_simultaneous_requests;
|
||||
const max = AsyncHTTP.max_simultaneous_requests.load(.Monotonic);
|
||||
if (max > min) {
|
||||
AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .Monotonic);
|
||||
}
|
||||
}
|
||||
manager.enqueueNetworkTask(task);
|
||||
|
||||
if (manager.options.log_level.isVerbose()) {
|
||||
manager.log.addWarningFmt(
|
||||
null,
|
||||
logger.Loc.Empty,
|
||||
manager.allocator,
|
||||
"<r><yellow>warn:<r> {s} downloading tarball <b>{s}@{s}<r>",
|
||||
.{
|
||||
bun.span(@errorName(err)),
|
||||
extract.name.slice(),
|
||||
extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items),
|
||||
},
|
||||
) catch unreachable;
|
||||
}
|
||||
} else if (@TypeOf(callbacks.onPackageDownloadError) != void) {
|
||||
const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
|
||||
callbacks.onPackageDownloadError(
|
||||
extract_ctx,
|
||||
package_id,
|
||||
@@ -3770,18 +3815,18 @@ pub const PackageManager = struct {
|
||||
err,
|
||||
task.url_buf,
|
||||
);
|
||||
} else {
|
||||
} else if (comptime log_level != .silent) {
|
||||
const fmt = "\n<r><red>error<r>: {s} downloading tarball <b>{s}@{s}<r>\n";
|
||||
const error_name: string = bun.span(@errorName(err));
|
||||
const args = .{ error_name, extract.name.slice(), extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items) };
|
||||
|
||||
if (comptime log_level != .silent) {
|
||||
if (comptime log_level.showProgress()) {
|
||||
Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
|
||||
} else {
|
||||
Output.prettyErrorln(fmt, args);
|
||||
Output.flush();
|
||||
}
|
||||
const args = .{
|
||||
bun.span(@errorName(err)),
|
||||
extract.name.slice(),
|
||||
extract.resolution.fmt(manager.lockfile.buffers.string_bytes.items),
|
||||
};
|
||||
if (comptime log_level.showProgress()) {
|
||||
Output.prettyWithPrinterFn(fmt, args, Progress.log, &manager.progress);
|
||||
} else {
|
||||
Output.prettyErrorln(fmt, args);
|
||||
Output.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4194,6 +4239,7 @@ pub const PackageManager = struct {
|
||||
// 2. Has a platform and/or os specified, which evaluates to not disabled
|
||||
native_bin_link_allowlist: []const PackageNameHash = &default_native_bin_link_allowlist,
|
||||
max_retry_count: u16 = 5,
|
||||
min_simultaneous_requests: usize = 4,
|
||||
|
||||
pub fn shouldPrintCommandName(this: *const Options) bool {
|
||||
return this.log_level != .silent and this.do.summary;
|
||||
@@ -4497,9 +4543,7 @@ pub const PackageManager = struct {
|
||||
}
|
||||
|
||||
if (env.map.get("BUN_CONFIG_HTTP_RETRY_COUNT")) |retry_count| {
|
||||
if (std.fmt.parseInt(i32, retry_count, 10)) |int| {
|
||||
this.max_retry_count = @intCast(u16, @min(@max(int, 0), 65355));
|
||||
} else |_| {}
|
||||
if (std.fmt.parseInt(u16, retry_count, 10)) |int| this.max_retry_count = int else |_| {}
|
||||
}
|
||||
|
||||
if (env.map.get("BUN_CONFIG_LINK_NATIVE_BINS")) |native_packages| {
|
||||
@@ -4522,31 +4566,7 @@ pub const PackageManager = struct {
|
||||
// this.enable.deduplicate_packages = false;
|
||||
// }
|
||||
|
||||
if (env.map.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
|
||||
load: {
|
||||
AsyncHTTP.max_simultaneous_requests = std.fmt.parseInt(u16, max_http_requests, 10) catch {
|
||||
log.addErrorFmt(
|
||||
null,
|
||||
logger.Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
|
||||
.{max_http_requests},
|
||||
) catch unreachable;
|
||||
break :load;
|
||||
};
|
||||
|
||||
if (AsyncHTTP.max_simultaneous_requests == 0) {
|
||||
log.addWarningFmt(
|
||||
null,
|
||||
logger.Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
|
||||
.{},
|
||||
) catch unreachable;
|
||||
AsyncHTTP.max_simultaneous_requests = 255;
|
||||
}
|
||||
}
|
||||
}
|
||||
AsyncHTTP.loadEnv(allocator, log, env);
|
||||
|
||||
if (env.map.get("BUN_CONFIG_SKIP_SAVE_LOCKFILE")) |check_bool| {
|
||||
this.do.save_lockfile = strings.eqlComptime(check_bool, "0");
|
||||
|
||||
Reference in New Issue
Block a user