mirror of
https://github.com/oven-sh/bun
synced 2026-02-20 15:51:46 +00:00
Compare commits
1 Commits
claude/fix
...
jarred/fet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
633a457e72 |
@@ -192,7 +192,7 @@ pub fn main() anyerror!void {
|
||||
try channel.buffer.ensureTotalCapacity(args.count);
|
||||
|
||||
try NetworkThread.init();
|
||||
if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests.store(args.concurrency, .monotonic);
|
||||
if (args.concurrency > 0) HTTP.max_simultaneous_requests.store(args.concurrency, .monotonic);
|
||||
const Group = struct {
|
||||
response_body: MutableString = undefined,
|
||||
context: HTTP.HTTPChannelContext = undefined,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
const std = @import("std");
|
||||
const Api = @import("../../api/schema.zig").Api;
|
||||
const bun = @import("root").bun;
|
||||
const MimeType = bun.http.MimeType;
|
||||
const MimeType = http.MimeType;
|
||||
const ZigURL = @import("../../url.zig").URL;
|
||||
const http = bun.http;
|
||||
const FetchRedirect = http.FetchRedirect;
|
||||
@@ -57,6 +57,7 @@ const Async = bun.Async;
|
||||
const BoringSSL = bun.BoringSSL;
|
||||
const X509 = @import("../api/bun/x509.zig");
|
||||
const PosixToWinNormalizer = bun.path.PosixToWinNormalizer;
|
||||
const Sendfile = http.Sendfile;
|
||||
|
||||
pub const Response = struct {
|
||||
const ResponseMixin = BodyMixin(@This());
|
||||
@@ -720,7 +721,7 @@ pub const Fetch = struct {
|
||||
pub const FetchTasklet = struct {
|
||||
const log = Output.scoped(.FetchTasklet, false);
|
||||
|
||||
http: ?*http.AsyncHTTP = null,
|
||||
pending_request: ?*http.PendingHTTPRequest = null,
|
||||
result: http.HTTPClientResult = .{},
|
||||
metadata: ?http.HTTPResponseMetadata = null,
|
||||
javascript_vm: *VirtualMachine = undefined,
|
||||
@@ -741,7 +742,6 @@ pub const Fetch = struct {
|
||||
promise: JSC.JSPromise.Strong,
|
||||
concurrent_task: JSC.ConcurrentTask = .{},
|
||||
poll_ref: Async.KeepAlive = .{},
|
||||
memory_reporter: *JSC.MemoryReportingAllocator,
|
||||
/// For Http Client requests
|
||||
/// when Content-Length is provided this represents the whole size of the request
|
||||
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
|
||||
@@ -753,8 +753,6 @@ pub const Fetch = struct {
|
||||
url_proxy_buffer: []const u8 = "",
|
||||
|
||||
signal: ?*JSC.WebCore.AbortSignal = null,
|
||||
signals: http.Signals = .{},
|
||||
signal_store: http.Signals.Store = .{},
|
||||
has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
|
||||
// must be stored because AbortSignal stores reason weakly
|
||||
@@ -773,23 +771,11 @@ pub const Fetch = struct {
|
||||
|
||||
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
|
||||
|
||||
pub fn ref(this: *FetchTasklet) void {
|
||||
const count = this.ref_count.fetchAdd(1, .monotonic);
|
||||
bun.debugAssert(count > 0);
|
||||
}
|
||||
|
||||
pub fn deref(this: *FetchTasklet) void {
|
||||
const count = this.ref_count.fetchSub(1, .monotonic);
|
||||
bun.debugAssert(count > 0);
|
||||
|
||||
if (count == 1) {
|
||||
this.deinit();
|
||||
}
|
||||
}
|
||||
pub usingnamespace bun.NewThreadSafeRefCounted(@This(), @This().deinit);
|
||||
|
||||
pub const HTTPRequestBody = union(enum) {
|
||||
AnyBlob: AnyBlob,
|
||||
Sendfile: http.Sendfile,
|
||||
Sendfile: Sendfile,
|
||||
|
||||
pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store {
|
||||
return switch (this.*) {
|
||||
@@ -824,7 +810,7 @@ pub const Fetch = struct {
|
||||
|
||||
fn clearData(this: *FetchTasklet) void {
|
||||
log("clearData", .{});
|
||||
const allocator = this.memory_reporter.allocator();
|
||||
const allocator = bun.default_allocator;
|
||||
if (this.url_proxy_buffer.len > 0) {
|
||||
allocator.free(this.url_proxy_buffer);
|
||||
this.url_proxy_buffer.len = 0;
|
||||
@@ -844,12 +830,14 @@ pub const Fetch = struct {
|
||||
this.request_headers.buf.deinit(allocator);
|
||||
this.request_headers = Headers{ .allocator = undefined };
|
||||
|
||||
if (this.http != null) {
|
||||
this.http.?.clearData();
|
||||
if (this.pending_request) |request| {
|
||||
this.pending_request = null;
|
||||
request.ignoreRemainingResponseBody();
|
||||
request.deref();
|
||||
}
|
||||
|
||||
if (this.metadata != null) {
|
||||
this.metadata.?.deinit(allocator);
|
||||
if (this.metadata) |*metadata| {
|
||||
metadata.deinit(allocator);
|
||||
this.metadata = null;
|
||||
}
|
||||
|
||||
@@ -886,13 +874,11 @@ pub const Fetch = struct {
|
||||
|
||||
this.clearData();
|
||||
|
||||
var reporter = this.memory_reporter;
|
||||
const allocator = reporter.allocator();
|
||||
|
||||
if (this.http) |http_| allocator.destroy(http_);
|
||||
allocator.destroy(this);
|
||||
// reporter.assert();
|
||||
bun.default_allocator.destroy(reporter);
|
||||
if (this.pending_request) |request| {
|
||||
this.pending_request = null;
|
||||
request.deref();
|
||||
}
|
||||
this.destroy();
|
||||
}
|
||||
|
||||
fn getCurrentResponse(this: *FetchTasklet) ?*Response {
|
||||
@@ -1029,7 +1015,6 @@ pub const Fetch = struct {
|
||||
buffer_reset = false;
|
||||
if (!this.result.has_more) {
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
|
||||
|
||||
// done resolve body
|
||||
var old = body.value;
|
||||
@@ -1041,7 +1026,7 @@ pub const Fetch = struct {
|
||||
response.body.value = body_value;
|
||||
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
@@ -1229,12 +1214,11 @@ pub const Fetch = struct {
|
||||
check_result.ensureStillAlive();
|
||||
check_result.protect();
|
||||
this.abort_reason = check_result;
|
||||
this.signal_store.aborted.store(true, .monotonic);
|
||||
this.tracker.didCancel(this.global_this);
|
||||
|
||||
// we need to abort the request
|
||||
if (this.http != null) {
|
||||
http.http_thread.scheduleShutdown(this.http.?);
|
||||
if (this.pending_request) |request| {
|
||||
request.abort();
|
||||
}
|
||||
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
return false;
|
||||
@@ -1286,8 +1270,6 @@ pub const Fetch = struct {
|
||||
// some times we don't have metadata so we also check http.url
|
||||
const path = if (this.metadata) |metadata|
|
||||
bun.String.createUTF8(metadata.url)
|
||||
else if (this.http) |http_|
|
||||
bun.String.createUTF8(http_.url.href)
|
||||
else
|
||||
bun.String.empty;
|
||||
|
||||
@@ -1383,12 +1365,24 @@ pub const Fetch = struct {
|
||||
this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, this.global_this);
|
||||
}
|
||||
|
||||
pub fn isAborted(this: *const FetchTasklet) bool {
|
||||
if (this.abort_reason != .zero) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (this.pending_request) |request| {
|
||||
return request.isAborted();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
|
||||
const this = bun.cast(*FetchTasklet, ctx);
|
||||
if (this.http) |http_| {
|
||||
http_.enableBodyStreaming();
|
||||
if (this.pending_request) |request| {
|
||||
request.enableBodyStreaming();
|
||||
}
|
||||
if (this.signal_store.aborted.load(.monotonic)) {
|
||||
if (this.isAborted()) {
|
||||
return JSC.WebCore.DrainResult{
|
||||
.aborted = {},
|
||||
};
|
||||
@@ -1401,9 +1395,8 @@ pub const Fetch = struct {
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
// This means we have received part of the body but not the whole thing
|
||||
if (scheduled_response_buffer.items.len > 0) {
|
||||
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
@@ -1449,14 +1442,13 @@ pub const Fetch = struct {
|
||||
}
|
||||
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
|
||||
const response = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
@@ -1489,10 +1481,12 @@ pub const Fetch = struct {
|
||||
|
||||
fn ignoreRemainingResponseBody(this: *FetchTasklet) void {
|
||||
log("ignoreRemainingResponseBody", .{});
|
||||
// enabling streaming will make the http thread to drain into the main thread (aka stop buffering)
|
||||
// without a stream ref, response body or response instance alive it will just ignore the result
|
||||
if (this.http) |http_| {
|
||||
http_.enableBodyStreaming();
|
||||
|
||||
if (this.pending_request) |request| {
|
||||
// enabling streaming will make the http thread to drain into the main thread (aka stop buffering)
|
||||
// without a stream ref, response body or response instance alive it will just ignore the result
|
||||
// TODO: use ignoreRemainingResponseBody() instead of enableBodyStreaming()
|
||||
request.enableBodyStreaming();
|
||||
}
|
||||
// we should not keep the process alive if we are ignoring the body
|
||||
const vm = this.global_this.bunVM();
|
||||
@@ -1558,26 +1552,24 @@ pub const Fetch = struct {
|
||||
fetch_options: FetchOptions,
|
||||
promise: JSC.JSPromise.Strong,
|
||||
) !*FetchTasklet {
|
||||
_ = allocator; // autofix
|
||||
var jsc_vm = globalThis.bunVM();
|
||||
var fetch_tasklet = try allocator.create(FetchTasklet);
|
||||
|
||||
fetch_tasklet.* = .{
|
||||
var fetch_tasklet = FetchTasklet.new(.{
|
||||
.mutex = Mutex.init(),
|
||||
.scheduled_response_buffer = .{
|
||||
.allocator = fetch_options.memory_reporter.allocator(),
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
},
|
||||
.response_buffer = MutableString{
|
||||
.allocator = fetch_options.memory_reporter.allocator(),
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
},
|
||||
.http = try allocator.create(http.AsyncHTTP),
|
||||
.javascript_vm = jsc_vm,
|
||||
.request_body = fetch_options.body,
|
||||
.global_this = globalThis,
|
||||
@@ -1587,12 +1579,9 @@ pub const Fetch = struct {
|
||||
.signal = fetch_options.signal,
|
||||
.hostname = fetch_options.hostname,
|
||||
.tracker = JSC.AsyncTaskTracker.init(jsc_vm),
|
||||
.memory_reporter = fetch_options.memory_reporter,
|
||||
.check_server_identity = fetch_options.check_server_identity,
|
||||
.reject_unauthorized = fetch_options.reject_unauthorized,
|
||||
};
|
||||
|
||||
fetch_tasklet.signals = fetch_tasklet.signal_store.to();
|
||||
});
|
||||
|
||||
fetch_tasklet.tracker.didSchedule(globalThis);
|
||||
|
||||
@@ -1609,54 +1598,45 @@ pub const Fetch = struct {
|
||||
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
|
||||
}
|
||||
|
||||
if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) {
|
||||
fetch_tasklet.signal_store.cert_errors.store(true, .monotonic);
|
||||
} else {
|
||||
fetch_tasklet.signals.cert_errors = null;
|
||||
}
|
||||
const flags = http.Signals.Flags{
|
||||
.cert_errors = fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized,
|
||||
.header_progress = true,
|
||||
};
|
||||
|
||||
fetch_tasklet.http.?.* = http.AsyncHTTP.init(
|
||||
fetch_options.memory_reporter.allocator(),
|
||||
fetch_options.method,
|
||||
fetch_options.url,
|
||||
fetch_options.headers.entries,
|
||||
fetch_options.headers.buf.items,
|
||||
&fetch_tasklet.response_buffer,
|
||||
fetch_tasklet.request_body.slice(),
|
||||
http.HTTPClientResult.Callback.New(
|
||||
*FetchTasklet,
|
||||
FetchTasklet.callback,
|
||||
).init(fetch_tasklet),
|
||||
fetch_options.redirect_type,
|
||||
.{
|
||||
fetch_tasklet.pending_request = http.fetch(
|
||||
&.{
|
||||
.method = fetch_options.method,
|
||||
.url = fetch_options.url,
|
||||
.redirect_type = fetch_options.redirect_type,
|
||||
.headers = fetch_options.headers.entries,
|
||||
.headers_buf = fetch_options.headers.buf.items,
|
||||
.flags = flags,
|
||||
.request_body = brk: {
|
||||
if (fetch_tasklet.request_body == .Sendfile) {
|
||||
bun.assert(fetch_options.url.isHTTP());
|
||||
bun.assert(fetch_options.proxy == null);
|
||||
break :brk .{ .sendfile = fetch_tasklet.request_body.Sendfile };
|
||||
}
|
||||
|
||||
break :brk .{ .bytes = fetch_tasklet.request_body.slice() };
|
||||
},
|
||||
.http_proxy = proxy,
|
||||
.hostname = fetch_options.hostname,
|
||||
.signals = fetch_tasklet.signals,
|
||||
.unix_socket_path = fetch_options.unix_socket_path,
|
||||
.disable_timeout = fetch_options.disable_timeout,
|
||||
.disable_keepalive = fetch_options.disable_keepalive,
|
||||
.disable_decompression = fetch_options.disable_decompression,
|
||||
.reject_unauthorized = fetch_options.reject_unauthorized,
|
||||
.verbose = fetch_options.verbose,
|
||||
.tls_props = fetch_options.ssl_config,
|
||||
.tls_config = fetch_options.ssl_config,
|
||||
},
|
||||
|
||||
http.HTTPClientResult.Callback.New(
|
||||
*FetchTasklet,
|
||||
FetchTasklet.callback,
|
||||
).init(fetch_tasklet),
|
||||
);
|
||||
|
||||
// TODO is this necessary? the http client already sets the redirect type,
|
||||
// so manually setting it here seems redundant
|
||||
if (fetch_options.redirect_type != FetchRedirect.follow) {
|
||||
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
|
||||
}
|
||||
|
||||
// we want to return after headers are received
|
||||
fetch_tasklet.signal_store.header_progress.store(true, .monotonic);
|
||||
|
||||
if (fetch_tasklet.request_body == .Sendfile) {
|
||||
bun.assert(fetch_options.url.isHTTP());
|
||||
bun.assert(fetch_options.proxy == null);
|
||||
fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile };
|
||||
}
|
||||
|
||||
if (fetch_tasklet.signal) |signal| {
|
||||
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
|
||||
}
|
||||
@@ -1668,11 +1648,9 @@ pub const Fetch = struct {
|
||||
reason.ensureStillAlive();
|
||||
this.abort_reason = reason;
|
||||
reason.protect();
|
||||
this.signal_store.aborted.store(true, .monotonic);
|
||||
this.tracker.didCancel(this.global_this);
|
||||
|
||||
if (this.http != null) {
|
||||
http.http_thread.scheduleShutdown(this.http.?);
|
||||
if (this.pending_request) |request| {
|
||||
request.abort();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1693,7 +1671,6 @@ pub const Fetch = struct {
|
||||
globalThis: ?*JSGlobalObject,
|
||||
// Custom Hostname
|
||||
hostname: ?[]u8 = null,
|
||||
memory_reporter: *JSC.MemoryReportingAllocator,
|
||||
check_server_identity: JSC.Strong = .{},
|
||||
unix_socket_path: ZigString.Slice,
|
||||
ssl_config: ?*SSLConfig = null,
|
||||
@@ -1713,29 +1690,23 @@ pub const Fetch = struct {
|
||||
promise,
|
||||
);
|
||||
|
||||
var batch = bun.ThreadPool.Batch{};
|
||||
node.http.?.schedule(allocator, &batch);
|
||||
node.poll_ref.ref(global.bunVM());
|
||||
|
||||
http.http_thread.schedule(batch);
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
|
||||
pub fn callback(task: *FetchTasklet, pending_request: *http.PendingHTTPRequest, result: http.HTTPClientResult) void {
|
||||
task.ref();
|
||||
|
||||
task.mutex.lock();
|
||||
defer task.mutex.unlock();
|
||||
|
||||
task.http.?.* = async_http.*;
|
||||
task.http.?.response_buffer = async_http.response_buffer;
|
||||
|
||||
log("callback success {} has_more {} bytes {}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len });
|
||||
|
||||
const prev_metadata = task.result.metadata;
|
||||
const prev_cert_info = task.result.certificate_info;
|
||||
task.result = result;
|
||||
pending_request.result.metadata = null;
|
||||
|
||||
// Preserve pending certificate info if it was preovided in the previous update.
|
||||
if (task.result.certificate_info == null) {
|
||||
@@ -1765,7 +1736,7 @@ pub const Fetch = struct {
|
||||
if (task.scheduled_response_buffer.list.capacity > 0) {
|
||||
task.scheduled_response_buffer.deinit();
|
||||
task.scheduled_response_buffer = .{
|
||||
.allocator = task.memory_reporter.allocator(),
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
@@ -1810,7 +1781,7 @@ pub const Fetch = struct {
|
||||
var blob = Blob.init(data, allocator, globalThis);
|
||||
|
||||
var allocated = false;
|
||||
const mime_type = bun.http.MimeType.init(data_url.mime_type, allocator, &allocated);
|
||||
const mime_type = MimeType.init(data_url.mime_type, allocator, &allocated);
|
||||
blob.content_type = mime_type.value;
|
||||
if (allocated) {
|
||||
blob.content_type_allocated = true;
|
||||
@@ -1882,7 +1853,7 @@ pub const Fetch = struct {
|
||||
return .zero;
|
||||
}
|
||||
|
||||
bun.http.AsyncHTTP.preconnect(url, true);
|
||||
bun.http.preconnect(url, true);
|
||||
return .undefined;
|
||||
}
|
||||
|
||||
@@ -1910,7 +1881,7 @@ pub const Fetch = struct {
|
||||
|
||||
memory_reporter.report(globalThis.vm());
|
||||
|
||||
if (is_error) bun.default_allocator.destroy(memory_reporter);
|
||||
bun.default_allocator.destroy(memory_reporter);
|
||||
}
|
||||
|
||||
if (arguments.len == 0) {
|
||||
@@ -2697,7 +2668,7 @@ pub const Fetch = struct {
|
||||
.result => |fd| fd,
|
||||
};
|
||||
|
||||
if (proxy == null and bun.http.Sendfile.isEligible(url)) {
|
||||
if (proxy == null and Sendfile.isEligible(url)) {
|
||||
use_sendfile: {
|
||||
const stat: bun.Stat = switch (bun.sys.fstat(opened_fd)) {
|
||||
.result => |result| result,
|
||||
@@ -2788,13 +2759,13 @@ pub const Fetch = struct {
|
||||
const promise_val = promise.value();
|
||||
|
||||
_ = FetchTasklet.queue(
|
||||
allocator,
|
||||
bun.default_allocator,
|
||||
globalThis,
|
||||
.{
|
||||
.method = method,
|
||||
.url = url,
|
||||
.headers = headers orelse Headers{
|
||||
.allocator = allocator,
|
||||
.allocator = bun.default_allocator,
|
||||
},
|
||||
.body = http_body,
|
||||
.disable_keepalive = disable_keepalive,
|
||||
@@ -2809,7 +2780,6 @@ pub const Fetch = struct {
|
||||
.globalThis = globalThis,
|
||||
.ssl_config = ssl_config,
|
||||
.hostname = hostname,
|
||||
.memory_reporter = memory_reporter,
|
||||
.check_server_identity = if (check_server_identity.isEmptyOrUndefinedOrNull()) .{} else JSC.Strong.create(check_server_identity, globalThis),
|
||||
.unix_socket_path = unix_socket_path,
|
||||
},
|
||||
|
||||
@@ -28,7 +28,6 @@ const bundler = bun.bundler;
|
||||
const DotEnv = @import("env_loader.zig");
|
||||
const which = @import("which.zig").which;
|
||||
const JSC = bun.JSC;
|
||||
const AsyncHTTP = bun.http.AsyncHTTP;
|
||||
const Arena = @import("./mimalloc_arena.zig").Arena;
|
||||
|
||||
const OpaqueWrap = JSC.OpaqueWrap;
|
||||
@@ -112,7 +111,7 @@ pub const Run = struct {
|
||||
failWithBuildError(vm);
|
||||
};
|
||||
|
||||
AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
|
||||
bun.http.loadEnv(vm.allocator, vm.log, b.env);
|
||||
|
||||
vm.loadExtraEnv();
|
||||
vm.is_main_thread = true;
|
||||
@@ -146,7 +145,7 @@ pub const Run = struct {
|
||||
Global.exit(1);
|
||||
}
|
||||
|
||||
AsyncHTTP.preconnect(url, false);
|
||||
bun.http.preconnect(url, false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -255,7 +254,7 @@ pub const Run = struct {
|
||||
failWithBuildError(vm);
|
||||
};
|
||||
|
||||
AsyncHTTP.loadEnv(vm.allocator, vm.log, b.env);
|
||||
bun.http.loadEnv(vm.allocator, vm.log, b.env);
|
||||
|
||||
vm.loadExtraEnv();
|
||||
vm.is_main_thread = true;
|
||||
|
||||
@@ -1967,23 +1967,17 @@ pub const Example = struct {
|
||||
mutable.* = try MutableString.init(ctx.allocator, 8192);
|
||||
|
||||
// ensure very stable memory address
|
||||
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
ctx.allocator,
|
||||
.GET,
|
||||
api_url,
|
||||
header_entries,
|
||||
headers_buf,
|
||||
mutable,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.progress_node = progress;
|
||||
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
|
||||
|
||||
const response = try async_http.sendSync(true);
|
||||
const response = try bun.http.fetchSync(&.{
|
||||
.allocator = ctx.allocator,
|
||||
.method = .GET,
|
||||
.url = api_url,
|
||||
.headers = header_entries,
|
||||
.headers_buf = headers_buf,
|
||||
.progress_node = progress,
|
||||
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
|
||||
.http_proxy = http_proxy,
|
||||
.redirect_type = .follow,
|
||||
}, mutable);
|
||||
|
||||
switch (response.status_code) {
|
||||
404 => return error.GitHubRepositoryNotFound,
|
||||
@@ -2043,24 +2037,17 @@ pub const Example = struct {
|
||||
|
||||
var http_proxy: ?URL = env_loader.getHttpProxy(url);
|
||||
|
||||
// ensure very stable memory address
|
||||
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
ctx.allocator,
|
||||
.GET,
|
||||
url,
|
||||
.{},
|
||||
"",
|
||||
mutable,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.progress_node = progress;
|
||||
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
|
||||
|
||||
var response = try async_http.sendSync(true);
|
||||
var response = try bun.http.fetchSync(&.{
|
||||
.allocator = ctx.allocator,
|
||||
.method = .GET,
|
||||
.url = url,
|
||||
.headers = .{},
|
||||
.headers_buf = "",
|
||||
.progress_node = progress,
|
||||
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
|
||||
.http_proxy = http_proxy,
|
||||
.redirect_type = .follow,
|
||||
}, mutable);
|
||||
|
||||
switch (response.status_code) {
|
||||
404 => return error.ExampleNotFound,
|
||||
@@ -2134,24 +2121,19 @@ pub const Example = struct {
|
||||
|
||||
http_proxy = env_loader.getHttpProxy(parsed_tarball_url);
|
||||
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
ctx.allocator,
|
||||
.GET,
|
||||
parsed_tarball_url,
|
||||
.{},
|
||||
"",
|
||||
mutable,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.progress_node = progress;
|
||||
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
|
||||
|
||||
refresher.maybeRefresh();
|
||||
|
||||
response = try async_http.sendSync(true);
|
||||
response = try bun.http.fetchSync(&.{
|
||||
.allocator = ctx.allocator,
|
||||
.method = .GET,
|
||||
.url = parsed_tarball_url,
|
||||
.headers = .{},
|
||||
.headers_buf = "",
|
||||
.progress_node = progress,
|
||||
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
|
||||
.http_proxy = http_proxy,
|
||||
.redirect_type = .follow,
|
||||
}, mutable);
|
||||
|
||||
refresher.maybeRefresh();
|
||||
|
||||
@@ -2172,29 +2154,20 @@ pub const Example = struct {
|
||||
|
||||
const http_proxy: ?URL = env_loader.getHttpProxy(url);
|
||||
|
||||
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
|
||||
const mutable = try ctx.allocator.create(MutableString);
|
||||
mutable.* = try MutableString.init(ctx.allocator, 2048);
|
||||
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
ctx.allocator,
|
||||
.GET,
|
||||
url,
|
||||
.{},
|
||||
"",
|
||||
mutable,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
|
||||
|
||||
if (Output.enable_ansi_colors) {
|
||||
async_http.client.progress_node = progress_node;
|
||||
}
|
||||
|
||||
const response = async_http.sendSync(true) catch |err| {
|
||||
const response = bun.http.fetchSync(&.{
|
||||
.allocator = ctx.allocator,
|
||||
.method = .GET,
|
||||
.url = url,
|
||||
.headers = .{},
|
||||
.headers_buf = "",
|
||||
.progress_node = progress_node,
|
||||
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
|
||||
.http_proxy = http_proxy,
|
||||
.redirect_type = .follow,
|
||||
}, mutable) catch |err| {
|
||||
switch (err) {
|
||||
error.WouldBlock => {
|
||||
Output.prettyErrorln("Request timed out while trying to fetch examples list. Please try again", .{});
|
||||
|
||||
@@ -234,24 +234,20 @@ pub const UpgradeCommand = struct {
|
||||
|
||||
var metadata_body = try MutableString.init(allocator, 2048);
|
||||
|
||||
// ensure very stable memory address
|
||||
var async_http: *HTTP.AsyncHTTP = try allocator.create(HTTP.AsyncHTTP);
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
allocator,
|
||||
.GET,
|
||||
api_url,
|
||||
header_entries,
|
||||
headers_buf,
|
||||
const response = try bun.http.fetchSync(
|
||||
&.{
|
||||
.allocator = allocator,
|
||||
.progress_node = progress,
|
||||
.headers = header_entries,
|
||||
.headers_buf = headers_buf,
|
||||
.http_proxy = http_proxy,
|
||||
.url = api_url,
|
||||
.method = .GET,
|
||||
.redirect_type = .follow,
|
||||
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
|
||||
},
|
||||
&metadata_body,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
|
||||
|
||||
if (!silent) async_http.client.progress_node = progress.?;
|
||||
const response = try async_http.sendSync(true);
|
||||
|
||||
switch (response.status_code) {
|
||||
404 => return error.HTTP404,
|
||||
@@ -514,26 +510,20 @@ pub const UpgradeCommand = struct {
|
||||
var refresher = Progress{};
|
||||
var progress = refresher.start("Downloading", version.size);
|
||||
refresher.refresh();
|
||||
var async_http = try ctx.allocator.create(HTTP.AsyncHTTP);
|
||||
var zip_file_buffer = try ctx.allocator.create(MutableString);
|
||||
zip_file_buffer.* = try MutableString.init(ctx.allocator, @max(version.size, 1024));
|
||||
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
ctx.allocator,
|
||||
.GET,
|
||||
zip_url,
|
||||
.{},
|
||||
"",
|
||||
zip_file_buffer,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.progress_node = progress;
|
||||
async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized();
|
||||
|
||||
const response = try async_http.sendSync(true);
|
||||
const response = try bun.http.fetchSync(&.{
|
||||
.allocator = ctx.allocator,
|
||||
.progress_node = progress,
|
||||
.headers = .{},
|
||||
.headers_buf = "",
|
||||
.http_proxy = http_proxy,
|
||||
.url = zip_url,
|
||||
.method = .GET,
|
||||
.redirect_type = .follow,
|
||||
.reject_unauthorized = env_loader.getTLSRejectUnauthorized(),
|
||||
}, zip_file_buffer);
|
||||
|
||||
switch (response.status_code) {
|
||||
404 => {
|
||||
|
||||
@@ -143,8 +143,6 @@ pub fn downloadToPath(this: *const CompileTarget, env: *bun.DotEnv.Loader, alloc
|
||||
{
|
||||
refresher.refresh();
|
||||
|
||||
// TODO: This is way too much code necessary to send a single HTTP request...
|
||||
var async_http = try allocator.create(HTTP.AsyncHTTP);
|
||||
var compressed_archive_bytes = try allocator.create(MutableString);
|
||||
compressed_archive_bytes.* = try MutableString.init(allocator, 24 * 1024 * 1024);
|
||||
var url_buffer: [2048]u8 = undefined;
|
||||
@@ -155,22 +153,15 @@ pub fn downloadToPath(this: *const CompileTarget, env: *bun.DotEnv.Loader, alloc
|
||||
defer progress.end();
|
||||
const http_proxy: ?bun.URL = env.getHttpProxy(url);
|
||||
|
||||
async_http.* = HTTP.AsyncHTTP.initSync(
|
||||
allocator,
|
||||
.GET,
|
||||
url,
|
||||
.{},
|
||||
"",
|
||||
compressed_archive_bytes,
|
||||
"",
|
||||
http_proxy,
|
||||
null,
|
||||
HTTP.FetchRedirect.follow,
|
||||
);
|
||||
async_http.client.progress_node = progress;
|
||||
async_http.client.reject_unauthorized = env.getTLSRejectUnauthorized();
|
||||
|
||||
const response = try async_http.sendSync(true);
|
||||
const response = try bun.http.fetchSync(&.{
|
||||
.method = .GET,
|
||||
.url = url,
|
||||
.headers = .{},
|
||||
.headers_buf = "",
|
||||
.http_proxy = http_proxy,
|
||||
.progress_node = progress,
|
||||
.reject_unauthorized = env.getTLSRejectUnauthorized(),
|
||||
}, compressed_archive_bytes);
|
||||
|
||||
switch (response.status_code) {
|
||||
404 => {
|
||||
|
||||
@@ -138,6 +138,17 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type {
|
||||
socket: InternalSocket,
|
||||
const ThisSocket = @This();
|
||||
|
||||
pub fn isSSL() bool {
|
||||
return is_ssl;
|
||||
}
|
||||
|
||||
pub fn toAnySocket(this: ThisSocket) uws.AnySocket {
|
||||
return switch (is_ssl) {
|
||||
true => uws.AnySocket{ .SocketTLS = this },
|
||||
false => uws.AnySocket{ .SocketTCP = this },
|
||||
};
|
||||
}
|
||||
|
||||
pub fn verifyError(this: ThisSocket) us_bun_verify_error_t {
|
||||
const socket = this.socket.get() orelse return std.mem.zeroes(us_bun_verify_error_t);
|
||||
const ssl_error: us_bun_verify_error_t = uws.us_socket_verify_error(comptime ssl_int, socket);
|
||||
|
||||
678
src/http.zig
678
src/http.zig
@@ -47,9 +47,6 @@ const Batch = bun.ThreadPool.Batch;
|
||||
const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion;
|
||||
const DeadSocket = opaque {};
|
||||
var dead_socket = @as(*DeadSocket, @ptrFromInt(1));
|
||||
//TODO: this needs to be freed when Worker Threads are implemented
|
||||
var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(bun.default_allocator);
|
||||
var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
|
||||
const MAX_REDIRECT_URL_LENGTH = 128 * 1024;
|
||||
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
|
||||
|
||||
@@ -65,35 +62,245 @@ var shared_response_headers_buf: [256]picohttp.Header = undefined;
|
||||
|
||||
const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
|
||||
|
||||
pub const Signals = struct {
|
||||
header_progress: ?*std.atomic.Value(bool) = null,
|
||||
body_streaming: ?*std.atomic.Value(bool) = null,
|
||||
aborted: ?*std.atomic.Value(bool) = null,
|
||||
cert_errors: ?*std.atomic.Value(bool) = null,
|
||||
const AbortTracker = struct {
|
||||
map: std.AutoArrayHashMap(u32, uws.AnySocket) = std.AutoArrayHashMap(u32, uws.AnySocket).init(bun.default_allocator),
|
||||
last_id: std.atomic.Value(usize) = std.atomic.Value(usize).init(1),
|
||||
|
||||
pub fn isEmpty(this: *const Signals) bool {
|
||||
return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
|
||||
pub fn nextId(this: *AbortTracker) u32 {
|
||||
// The max size is u32
|
||||
// We want this number to never be 0, as we use 0 to indicate no id.
|
||||
const id: u32 = @intCast((this.last_id.fetchAdd(1, .monotonic) % (std.math.maxInt(u32) - 1)) + 1);
|
||||
bun.debugAssert(id > 0);
|
||||
return id;
|
||||
}
|
||||
|
||||
pub const Store = struct {
|
||||
header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
pub fn put(this: *AbortTracker, id: u32, socket: uws.AnySocket) void {
|
||||
if (id > 0) {
|
||||
this.map.put(id, socket) catch bun.outOfMemory();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to(this: *Store) Signals {
|
||||
return .{
|
||||
.header_progress = &this.header_progress,
|
||||
.body_streaming = &this.body_streaming,
|
||||
.aborted = &this.aborted,
|
||||
.cert_errors = &this.cert_errors,
|
||||
};
|
||||
pub fn remove(this: *AbortTracker, id: u32) void {
|
||||
if (id > 0) {
|
||||
_ = this.map.swapRemove(id);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fetchRemove(this: *AbortTracker, id: u32) ?uws.AnySocket {
|
||||
if (id > 0) {
|
||||
if (this.map.fetchSwapRemove(id)) |val| {
|
||||
return val.value;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
pub fn garbageCollect(this: *AbortTracker) void {
|
||||
if (this.map.capacity() > 10_000 and this.map.count() < 1000) {
|
||||
this.map.shrinkAndFree(this.map.count());
|
||||
}
|
||||
}
|
||||
};
|
||||
var abort_tracker: AbortTracker = .{};
|
||||
|
||||
pub const HTTPClientRequest = struct {
|
||||
method: Method = .GET,
|
||||
url: URL = .{},
|
||||
headers: Headers.Entries = .{},
|
||||
headers_buf: string = "",
|
||||
request_body: HTTPRequestBody = .{ .bytes = "" },
|
||||
redirect_type: FetchRedirect = .follow,
|
||||
http_proxy: ?URL = null,
|
||||
hostname: ?[]u8 = null,
|
||||
signals: ?Signals = null,
|
||||
unix_socket_path: ?JSC.ZigString.Slice = null,
|
||||
disable_timeout: ?bool = null,
|
||||
verbose: ?HTTPVerboseLevel = null,
|
||||
disable_keepalive: ?bool = null,
|
||||
disable_decompression: ?bool = null,
|
||||
tls_config: ?*SSLConfig = null,
|
||||
receives_updates: bool = false,
|
||||
reject_unauthorized: ?bool = null,
|
||||
allocator: std.mem.Allocator = bun.default_allocator,
|
||||
flags: Signals.Flags = .{},
|
||||
progress_node: ?*Progress.Node = null,
|
||||
};
|
||||
|
||||
pub fn fetchBatched(this: *const HTTPClientRequest, callback: HTTPClientResult.Callback, task: ?*?*ThreadPool.Task) *PendingHTTPRequest {
|
||||
var pending_http = PendingHTTPRequest.new(.{
|
||||
.signals_store = .{},
|
||||
.signals = undefined,
|
||||
.result = .{},
|
||||
.token = if (this.receives_updates) abort_tracker.nextId() else 0,
|
||||
});
|
||||
pending_http.signals_store.flags.raw = @bitCast(this.flags);
|
||||
pending_http.signals = pending_http.signals_store.signals();
|
||||
const _http = AsyncHTTP.init(this, pending_http, callback);
|
||||
if (task) |task_| {
|
||||
task_.* = &_http.task;
|
||||
pending_http.ref();
|
||||
} else {
|
||||
var empty_batch: ThreadPool.Batch = .{};
|
||||
_http.schedule(&empty_batch);
|
||||
http_thread.schedule(empty_batch);
|
||||
}
|
||||
|
||||
return pending_http;
|
||||
}
|
||||
|
||||
pub inline fn fetch(this: *const HTTPClientRequest, callback: HTTPClientResult.Callback) *PendingHTTPRequest {
|
||||
return fetchBatched(this, callback, null);
|
||||
}
|
||||
|
||||
pub fn fetchSync(this: *const HTTPClientRequest, body: *MutableString) !picohttp.Response {
|
||||
// This leaks PendingHTTPRequest, but we always shutdown the whole program after this so it's fine.
|
||||
var pending_http = PendingHTTPRequest.new(.{
|
||||
.signals_store = .{},
|
||||
.signals = undefined,
|
||||
.result = .{},
|
||||
.token = 0,
|
||||
});
|
||||
pending_http.signals_store.flags.raw = @bitCast(this.flags);
|
||||
pending_http.response_body = body.*;
|
||||
pending_http.signals.flags = &pending_http.signals_store.flags;
|
||||
var _http = AsyncHTTP.init(this, pending_http, undefined);
|
||||
const response = try _http.sendSync();
|
||||
body.* = pending_http.response_body;
|
||||
return response;
|
||||
}
|
||||
|
||||
pub fn preconnect(
|
||||
url: URL,
|
||||
is_url_owned: bool,
|
||||
) void {
|
||||
if (!FeatureFlags.is_fetch_preconnect_supported) {
|
||||
if (is_url_owned) {
|
||||
bun.default_allocator.free(url.href);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
var this = Preconnect.new(.{
|
||||
.pending_http_request = PendingHTTPRequest.new(.{
|
||||
.signals_store = .{},
|
||||
.signals = undefined,
|
||||
.result = .{},
|
||||
.token = 0,
|
||||
.response_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
}),
|
||||
.response_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
.url = url,
|
||||
.is_url_owned = is_url_owned,
|
||||
});
|
||||
this.pending_http_request.signals = this.pending_http_request.signals_store.signals();
|
||||
var http_ = AsyncHTTP.init(
|
||||
&.{
|
||||
.url = url,
|
||||
.method = .GET,
|
||||
},
|
||||
this.pending_http_request,
|
||||
HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this),
|
||||
);
|
||||
http_.client.is_preconnect_only = true;
|
||||
http_thread.schedule(Batch.from(&http_.task));
|
||||
}
|
||||
|
||||
pub const PendingHTTPRequest = struct {
|
||||
response_body: MutableString = .{
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{},
|
||||
},
|
||||
signals_store: Signals.Store = .{},
|
||||
signals: Signals,
|
||||
result: HTTPClientResult = .{
|
||||
.has_more = true,
|
||||
},
|
||||
token: u32 = 0,
|
||||
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
|
||||
elapsed: u64 = 0,
|
||||
|
||||
pub fn isAborted(this: *const PendingHTTPRequest) bool {
|
||||
return this.signals.get(.aborted);
|
||||
}
|
||||
|
||||
pub fn err(this: *const PendingHTTPRequest) ?anyerror {
|
||||
return this.result.fail;
|
||||
}
|
||||
|
||||
pub fn response(this: *const PendingHTTPRequest) ?picohttp.Response {
|
||||
if (this.result.metadata) |*metadata| {
|
||||
return metadata.response;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
pub usingnamespace bun.NewThreadSafeRefCounted(@This(), @This().deinit);
|
||||
|
||||
pub fn abort(this: *PendingHTTPRequest) void {
|
||||
const state = this.signals.fetchSet(.aborted, true);
|
||||
if (!state.aborted and !state.done and this.token > 0) {
|
||||
http_thread.scheduleShutdown(this.token);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enableBodyStreaming(this: *PendingHTTPRequest) void {
|
||||
this.signals.set(.body_streaming, true);
|
||||
}
|
||||
|
||||
pub fn ignoreRemainingResponseBody(this: *PendingHTTPRequest) void {
|
||||
this.signals.set(.ignore_remaining_response_body, true);
|
||||
}
|
||||
|
||||
fn deinit(this: *PendingHTTPRequest) void {
|
||||
if (this.result.metadata) |*metadata| {
|
||||
metadata.deinit(bun.default_allocator);
|
||||
}
|
||||
|
||||
this.response_body.deinit();
|
||||
|
||||
this.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
pub const Signals = struct {
|
||||
flags: *std.atomic.Value(u8),
|
||||
|
||||
pub const Flags = packed struct(u8) {
|
||||
_padding: u2 = 0,
|
||||
header_progress: bool = false,
|
||||
body_streaming: bool = false,
|
||||
aborted: bool = false,
|
||||
cert_errors: bool = false,
|
||||
done: bool = false,
|
||||
ignore_remaining_response_body: bool = false,
|
||||
};
|
||||
|
||||
pub const Store = struct {
|
||||
flags: std.atomic.Value(u8) = .{ .raw = 0 },
|
||||
|
||||
pub fn signals(this: *@This()) Signals {
|
||||
return Signals{ .flags = &this.flags };
|
||||
}
|
||||
};
|
||||
|
||||
pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals)) bool {
|
||||
var ptr: *std.atomic.Value(bool) = @field(this, @tagName(field)) orelse return false;
|
||||
return ptr.load(.monotonic);
|
||||
pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals.Flags)) bool {
|
||||
const flags: Flags = @bitCast(this.flags.load(.monotonic));
|
||||
return @field(flags, @tagName(field));
|
||||
}
|
||||
|
||||
pub fn fetchSet(this: Signals, comptime field: std.meta.FieldEnum(Signals.Flags), value: bool) Flags {
|
||||
const fieldIndex: u8 = std.meta.fieldIndex(Flags, @tagName(field)) orelse @compileError(std.fmt.comptimePrint("Field \"{s}\" not found", .{@tagName(field)}));
|
||||
if (value) {
|
||||
return @bitCast(this.flags.fetchOr(@as(u8, 1) << fieldIndex, .monotonic));
|
||||
} else {
|
||||
return @bitCast(this.flags.fetchAnd(~(@as(u8, 1) << fieldIndex), .monotonic));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(this: Signals, comptime field: std.meta.FieldEnum(Signals.Flags), value: bool) void {
|
||||
_ = this.fetchSet(field, value);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -355,7 +562,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
|
||||
socket.close(.normal);
|
||||
}
|
||||
|
||||
fn getTagged(ptr: *anyopaque) ActiveSocket {
|
||||
pub fn getTagged(ptr: *anyopaque) ActiveSocket {
|
||||
return ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
|
||||
}
|
||||
|
||||
@@ -771,7 +978,7 @@ pub const HTTPThread = struct {
|
||||
|
||||
queued_tasks: Queue = Queue{},
|
||||
|
||||
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
|
||||
queued_shutdowns: std.ArrayListUnmanaged(u32) = std.ArrayListUnmanaged(u32){},
|
||||
queued_shutdowns_lock: bun.Lock = bun.Lock.init(),
|
||||
|
||||
has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
@@ -779,11 +986,6 @@ pub const HTTPThread = struct {
|
||||
|
||||
lazy_libdeflater: ?*LibdeflateState = null,
|
||||
|
||||
const ShutdownMessage = struct {
|
||||
async_http_id: u32,
|
||||
is_tls: bool,
|
||||
};
|
||||
|
||||
const threadlog = Output.scoped(.HTTPThread, true);
|
||||
|
||||
pub const LibdeflateState = struct {
|
||||
@@ -897,45 +1099,45 @@ pub const HTTPThread = struct {
|
||||
{
|
||||
this.queued_shutdowns_lock.lock();
|
||||
defer this.queued_shutdowns_lock.unlock();
|
||||
for (this.queued_shutdowns.items) |http| {
|
||||
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
|
||||
if (http.is_tls) {
|
||||
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
|
||||
socket.shutdown();
|
||||
socket.shutdownRead();
|
||||
} else {
|
||||
const socket = uws.SocketTCP.fromAny(socket_ptr.value);
|
||||
socket.shutdown();
|
||||
socket.shutdownRead();
|
||||
for (this.queued_shutdowns.items) |token| {
|
||||
if (abort_tracker.fetchRemove(token)) |any_socket| {
|
||||
switch (any_socket) {
|
||||
inline .SocketTCP, .SocketTLS => |socket| {
|
||||
const tagged = NewHTTPContext(comptime @TypeOf(socket).isSSL()).getTagged(socket.ext(anyopaque));
|
||||
if (tagged.get(HTTPClient)) |client| {
|
||||
client.allow_retry = false;
|
||||
}
|
||||
|
||||
socket.close(.failure);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
this.queued_shutdowns.clearRetainingCapacity();
|
||||
}
|
||||
|
||||
var count: usize = 0;
|
||||
var active = AsyncHTTP.active_requests_count.load(.monotonic);
|
||||
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
|
||||
if (active >= max) return;
|
||||
defer {
|
||||
if (comptime Environment.allow_assert) {
|
||||
if (count > 0)
|
||||
log("Processed {d} tasks\n", .{count});
|
||||
if (this.queued_shutdowns.capacity > 1024) {
|
||||
this.queued_shutdowns.shrinkAndFree(bun.default_allocator, 10);
|
||||
}
|
||||
}
|
||||
|
||||
while (this.queued_tasks.pop()) |http| {
|
||||
var cloned = ThreadlocalAsyncHTTP.new(.{
|
||||
.async_http = http.*,
|
||||
});
|
||||
cloned.async_http.real = http;
|
||||
cloned.async_http.onStart();
|
||||
if (comptime Environment.allow_assert) {
|
||||
count += 1;
|
||||
var stack_queue = std.heap.stackFallback(2048, bun.default_allocator);
|
||||
var queue = std.ArrayList(*AsyncHTTP).init(stack_queue.get());
|
||||
defer queue.deinit();
|
||||
|
||||
while (active_requests_count.load(.monotonic) < max_simultaneous_requests.load(.monotonic)) {
|
||||
const to_clone_count = max_simultaneous_requests.load(.monotonic) - active_requests_count.load(.monotonic);
|
||||
if (to_clone_count == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
active += 1;
|
||||
if (active >= max) break;
|
||||
var any = false;
|
||||
while (this.queued_tasks.pop()) |http| {
|
||||
any = true;
|
||||
http.onStart();
|
||||
|
||||
if (active_requests_count.load(.monotonic) >= max_simultaneous_requests.load(.monotonic)) break;
|
||||
}
|
||||
|
||||
if (!any) break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -970,14 +1172,12 @@ pub const HTTPThread = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
|
||||
pub fn scheduleShutdown(this: *@This(), token: u32) void {
|
||||
{
|
||||
bun.debugAssert(token > 0);
|
||||
this.queued_shutdowns_lock.lock();
|
||||
defer this.queued_shutdowns_lock.unlock();
|
||||
this.queued_shutdowns.append(bun.default_allocator, .{
|
||||
.async_http_id = http.async_http_id,
|
||||
.is_tls = http.client.isHTTPS(),
|
||||
}) catch bun.outOfMemory();
|
||||
this.queued_shutdowns.append(bun.default_allocator, token) catch bun.outOfMemory();
|
||||
}
|
||||
if (this.has_awoken.load(.monotonic))
|
||||
this.loop.loop.wakeup();
|
||||
@@ -1087,9 +1287,7 @@ pub fn onOpen(
|
||||
assert(is_ssl == client.url.isHTTPS());
|
||||
}
|
||||
}
|
||||
if (client.signals.aborted != null) {
|
||||
socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable;
|
||||
}
|
||||
abort_tracker.put(client.token, socket.toAnySocket());
|
||||
log("Connected {s} \n", .{client.url.href});
|
||||
|
||||
if (client.signals.get(.aborted)) {
|
||||
@@ -1673,7 +1871,6 @@ state: InternalState = .{},
|
||||
|
||||
did_have_handshaking_error: bool = false,
|
||||
tls_props: ?*SSLConfig = null,
|
||||
result_callback: HTTPClientResult.Callback = undefined,
|
||||
|
||||
/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since.
|
||||
/// This is a workaround for that.
|
||||
@@ -1685,8 +1882,10 @@ http_proxy: ?URL = null,
|
||||
proxy_authorization: ?[]u8 = null,
|
||||
proxy_tunneling: bool = false,
|
||||
proxy_tunnel: ?ProxyTunnel = null,
|
||||
signals: Signals = .{},
|
||||
async_http_id: u32 = 0,
|
||||
signals: Signals = .{
|
||||
.flags = undefined,
|
||||
},
|
||||
token: u32 = 0,
|
||||
hostname: ?[]u8 = null,
|
||||
reject_unauthorized: bool = true,
|
||||
unix_socket_path: JSC.ZigString.Slice = JSC.ZigString.Slice.empty,
|
||||
@@ -1831,7 +2030,53 @@ pub const HTTPChannelContext = struct {
|
||||
}
|
||||
};
|
||||
|
||||
pub const AsyncHTTP = struct {
|
||||
pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
|
||||
if (env.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
|
||||
const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
|
||||
logger.addErrorFmt(
|
||||
null,
|
||||
Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
|
||||
.{max_http_requests},
|
||||
) catch unreachable;
|
||||
return;
|
||||
};
|
||||
if (max == 0) {
|
||||
logger.addWarningFmt(
|
||||
null,
|
||||
Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
|
||||
.{},
|
||||
) catch unreachable;
|
||||
return;
|
||||
}
|
||||
max_simultaneous_requests.store(max, .monotonic);
|
||||
}
|
||||
}
|
||||
|
||||
const Preconnect = struct {
|
||||
pending_http_request: *PendingHTTPRequest,
|
||||
response_buffer: MutableString,
|
||||
url: bun.URL,
|
||||
is_url_owned: bool,
|
||||
|
||||
pub usingnamespace bun.New(@This());
|
||||
|
||||
pub fn onResult(this: *Preconnect, _: *PendingHTTPRequest, _: HTTPClientResult) void {
|
||||
this.response_buffer.deinit();
|
||||
if (this.is_url_owned) {
|
||||
bun.default_allocator.free(this.url.href);
|
||||
}
|
||||
this.pending_http_request.deref();
|
||||
this.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
// *** WARNING: Do not expose this struct outside of this file ***
|
||||
// We should delete this struct entirely later.
|
||||
const AsyncHTTP = struct {
|
||||
request: ?picohttp.Request = null,
|
||||
response: ?picohttp.Response = null,
|
||||
request_headers: Headers.Entries = Headers.Entries{},
|
||||
@@ -1843,8 +2088,8 @@ pub const AsyncHTTP = struct {
|
||||
method: Method = Method.GET,
|
||||
url: URL,
|
||||
http_proxy: ?URL = null,
|
||||
real: ?*AsyncHTTP = null,
|
||||
next: ?*AsyncHTTP = null,
|
||||
free_request_headers_buf: bool = false,
|
||||
|
||||
task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP },
|
||||
result_callback: HTTPClientResult.Callback = undefined,
|
||||
@@ -1856,63 +2101,19 @@ pub const AsyncHTTP = struct {
|
||||
|
||||
client: HTTPClient = undefined,
|
||||
err: ?anyerror = null,
|
||||
async_http_id: u32 = 0,
|
||||
token: u32 = 0,
|
||||
|
||||
state: AtomicState = AtomicState.init(State.pending),
|
||||
elapsed: u64 = 0,
|
||||
gzip_elapsed: u64 = 0,
|
||||
|
||||
signals: Signals = .{},
|
||||
signals: Signals = .{
|
||||
.flags = undefined,
|
||||
},
|
||||
|
||||
pub var active_requests_count = std.atomic.Value(usize).init(0);
|
||||
pub var max_simultaneous_requests = std.atomic.Value(usize).init(256);
|
||||
pending_http_request: ?*PendingHTTPRequest = null,
|
||||
|
||||
pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
|
||||
if (env.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
|
||||
const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
|
||||
logger.addErrorFmt(
|
||||
null,
|
||||
Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
|
||||
.{max_http_requests},
|
||||
) catch unreachable;
|
||||
return;
|
||||
};
|
||||
if (max == 0) {
|
||||
logger.addWarningFmt(
|
||||
null,
|
||||
Loc.Empty,
|
||||
allocator,
|
||||
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
|
||||
.{},
|
||||
) catch unreachable;
|
||||
return;
|
||||
}
|
||||
AsyncHTTP.max_simultaneous_requests.store(max, .monotonic);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn signalHeaderProgress(this: *AsyncHTTP) void {
|
||||
@fence(.release);
|
||||
var progress = this.signals.header_progress orelse return;
|
||||
progress.store(true, .release);
|
||||
}
|
||||
|
||||
pub fn enableBodyStreaming(this: *AsyncHTTP) void {
|
||||
@fence(.release);
|
||||
var stream = this.signals.body_streaming orelse return;
|
||||
stream.store(true, .release);
|
||||
}
|
||||
|
||||
pub fn clearData(this: *AsyncHTTP) void {
|
||||
this.response_headers.deinit(this.allocator);
|
||||
this.response_headers = .{};
|
||||
this.request = null;
|
||||
this.response = null;
|
||||
this.client.unix_socket_path.deinit();
|
||||
this.client.unix_socket_path = JSC.ZigString.Slice.empty;
|
||||
}
|
||||
pub usingnamespace bun.New(@This());
|
||||
|
||||
pub const State = enum(u32) {
|
||||
pending = 0,
|
||||
@@ -1923,88 +2124,41 @@ pub const AsyncHTTP = struct {
|
||||
};
|
||||
const AtomicState = std.atomic.Value(State);
|
||||
|
||||
pub const Options = struct {
|
||||
http_proxy: ?URL = null,
|
||||
hostname: ?[]u8 = null,
|
||||
signals: ?Signals = null,
|
||||
unix_socket_path: ?JSC.ZigString.Slice = null,
|
||||
disable_timeout: ?bool = null,
|
||||
verbose: ?HTTPVerboseLevel = null,
|
||||
disable_keepalive: ?bool = null,
|
||||
disable_decompression: ?bool = null,
|
||||
reject_unauthorized: ?bool = null,
|
||||
tls_props: ?*SSLConfig = null,
|
||||
};
|
||||
|
||||
const Preconnect = struct {
|
||||
async_http: AsyncHTTP,
|
||||
response_buffer: MutableString,
|
||||
url: bun.URL,
|
||||
is_url_owned: bool,
|
||||
|
||||
pub usingnamespace bun.New(@This());
|
||||
|
||||
pub fn onResult(this: *Preconnect, _: *AsyncHTTP, _: HTTPClientResult) void {
|
||||
this.response_buffer.deinit();
|
||||
this.async_http.clearData();
|
||||
this.async_http.client.deinit();
|
||||
if (this.is_url_owned) {
|
||||
bun.default_allocator.free(this.url.href);
|
||||
}
|
||||
|
||||
this.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
pub fn preconnect(
|
||||
url: URL,
|
||||
is_url_owned: bool,
|
||||
) void {
|
||||
if (!FeatureFlags.is_fetch_preconnect_supported) {
|
||||
if (is_url_owned) {
|
||||
bun.default_allocator.free(url.href);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
var this = Preconnect.new(.{
|
||||
.async_http = undefined,
|
||||
.response_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
.url = url,
|
||||
.is_url_owned = is_url_owned,
|
||||
});
|
||||
|
||||
this.async_http = AsyncHTTP.init(bun.default_allocator, .GET, url, .{}, "", &this.response_buffer, "", HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this), .manual, .{});
|
||||
this.async_http.client.is_preconnect_only = true;
|
||||
|
||||
http_thread.schedule(Batch.from(&this.async_http.task));
|
||||
pub fn init(
|
||||
options: *const HTTPClientRequest,
|
||||
pending_http_request: *PendingHTTPRequest,
|
||||
callback: HTTPClientResult.Callback,
|
||||
) *AsyncHTTP {
|
||||
var this = AsyncHTTP.new(undefined);
|
||||
_ = this.create(options, pending_http_request, callback);
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
method: Method,
|
||||
url: URL,
|
||||
headers: Headers.Entries,
|
||||
headers_buf: string,
|
||||
response_buffer: *MutableString,
|
||||
request_body: []const u8,
|
||||
pub fn create(
|
||||
this: *AsyncHTTP,
|
||||
options: *const HTTPClientRequest,
|
||||
pending_http_request: *PendingHTTPRequest,
|
||||
callback: HTTPClientResult.Callback,
|
||||
redirect_type: FetchRedirect,
|
||||
options: Options,
|
||||
) AsyncHTTP {
|
||||
var this = AsyncHTTP{
|
||||
) *AsyncHTTP {
|
||||
const method = options.method;
|
||||
const url = options.url;
|
||||
const headers = options.headers;
|
||||
const headers_buf = options.headers_buf;
|
||||
const redirect_type = options.redirect_type;
|
||||
const allocator = bun.default_allocator;
|
||||
this.* = .{
|
||||
.allocator = allocator,
|
||||
.url = url,
|
||||
.method = method,
|
||||
.request_headers = headers,
|
||||
.request_header_buf = headers_buf,
|
||||
.request_body = .{ .bytes = request_body },
|
||||
.response_buffer = response_buffer,
|
||||
.request_body = options.request_body,
|
||||
.response_buffer = &pending_http_request.response_body,
|
||||
.result_callback = callback,
|
||||
.http_proxy = options.http_proxy,
|
||||
.signals = options.signals orelse .{},
|
||||
.async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id.fetchAdd(1, .monotonic) else 0,
|
||||
.signals = pending_http_request.signals,
|
||||
.token = pending_http_request.token,
|
||||
.pending_http_request = pending_http_request,
|
||||
};
|
||||
|
||||
this.client = .{
|
||||
@@ -2015,9 +2169,10 @@ pub const AsyncHTTP = struct {
|
||||
.header_buf = headers_buf,
|
||||
.hostname = options.hostname,
|
||||
.signals = options.signals orelse this.signals,
|
||||
.async_http_id = this.async_http_id,
|
||||
.token = this.token,
|
||||
.http_proxy = this.http_proxy,
|
||||
.redirect_type = redirect_type,
|
||||
.progress_node = options.progress_node,
|
||||
};
|
||||
if (options.unix_socket_path) |val| {
|
||||
assert(this.client.unix_socket_path.length() == 0);
|
||||
@@ -2038,7 +2193,7 @@ pub const AsyncHTTP = struct {
|
||||
if (options.reject_unauthorized) |val| {
|
||||
this.client.reject_unauthorized = val;
|
||||
}
|
||||
if (options.tls_props) |val| {
|
||||
if (options.tls_config) |val| {
|
||||
this.client.tls_props = val;
|
||||
}
|
||||
|
||||
@@ -2116,10 +2271,16 @@ pub const AsyncHTTP = struct {
|
||||
if (this.http_proxy) |proxy| {
|
||||
//TODO: need to understand how is possible to reuse Proxy with TSL, so disable keepalive if url is HTTPS
|
||||
this.client.disable_keepalive = this.url.isHTTPS();
|
||||
if (this.client.proxy_authorization) |prev| {
|
||||
bun.default_allocator.free(prev);
|
||||
}
|
||||
this.client.proxy_authorization = null;
|
||||
|
||||
// Username between 0 and 4096 chars
|
||||
if (proxy.username.len > 0 and proxy.username.len < 4096) {
|
||||
// Password between 0 and 4096 chars
|
||||
if (proxy.password.len > 0 and proxy.password.len < 4096) {
|
||||
|
||||
// decode password
|
||||
var password_buffer = std.mem.zeroes([4096]u8);
|
||||
var password_stream = std.io.fixedBufferStream(&password_buffer);
|
||||
@@ -2174,18 +2335,18 @@ pub const AsyncHTTP = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void {
|
||||
pub fn schedule(this: *AsyncHTTP, batch: *ThreadPool.Batch) void {
|
||||
this.state.store(.scheduled, .monotonic);
|
||||
this.pending_http_request.?.ref();
|
||||
batch.push(ThreadPool.Batch.from(&this.task));
|
||||
}
|
||||
|
||||
fn sendSyncCallback(this: *SingleHTTPChannel, async_http: *AsyncHTTP, result: HTTPClientResult) void {
|
||||
async_http.real.?.* = async_http.*;
|
||||
async_http.real.?.response_buffer = async_http.response_buffer;
|
||||
fn sendSyncCallback(this: *SingleHTTPChannel, pending_http_request: *PendingHTTPRequest, result: HTTPClientResult) void {
|
||||
_ = pending_http_request; // autofix
|
||||
this.channel.writeItem(result) catch unreachable;
|
||||
}
|
||||
|
||||
pub fn sendSync(this: *AsyncHTTP, comptime _: bool) anyerror!picohttp.Response {
|
||||
pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response {
|
||||
HTTPThread.init();
|
||||
|
||||
var ctx = try bun.default_allocator.create(SingleHTTPChannel);
|
||||
@@ -2196,7 +2357,7 @@ pub const AsyncHTTP = struct {
|
||||
).init(ctx);
|
||||
|
||||
var batch = bun.ThreadPool.Batch{};
|
||||
this.schedule(bun.default_allocator, &batch);
|
||||
this.schedule(&batch);
|
||||
http_thread.schedule(batch);
|
||||
while (true) {
|
||||
const result: HTTPClientResult = ctx.channel.readItem() catch unreachable;
|
||||
@@ -2208,9 +2369,7 @@ pub const AsyncHTTP = struct {
|
||||
unreachable;
|
||||
}
|
||||
|
||||
pub fn onAsyncHTTPCallback(this: *AsyncHTTP, async_http: *AsyncHTTP, result: HTTPClientResult) void {
|
||||
assert(this.real != null);
|
||||
|
||||
pub fn onAsyncHTTPCallback(this: *AsyncHTTP, result: HTTPClientResult, is_owned_by_self: bool) void {
|
||||
var callback = this.result_callback;
|
||||
this.elapsed = http_thread.timer.read() -| this.elapsed;
|
||||
|
||||
@@ -2228,33 +2387,36 @@ pub const AsyncHTTP = struct {
|
||||
this.response = null;
|
||||
this.state.store(State.fail, .monotonic);
|
||||
}
|
||||
|
||||
if (comptime Environment.enable_logs) {
|
||||
if (socket_async_http_abort_tracker.count() > 0) {
|
||||
log("socket_async_http_abort_tracker count: {d}", .{socket_async_http_abort_tracker.count()});
|
||||
}
|
||||
}
|
||||
|
||||
if (socket_async_http_abort_tracker.capacity() > 10_000 and socket_async_http_abort_tracker.count() < 100) {
|
||||
socket_async_http_abort_tracker.shrinkAndFree(socket_async_http_abort_tracker.count());
|
||||
}
|
||||
abort_tracker.garbageCollect();
|
||||
|
||||
if (result.has_more) {
|
||||
callback.function(callback.ctx, async_http, result);
|
||||
callback.function(callback.ctx, this.pending_http_request.?, result);
|
||||
} else {
|
||||
{
|
||||
this.client.deinit();
|
||||
var threadlocal_http: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", async_http);
|
||||
defer threadlocal_http.destroy();
|
||||
log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
|
||||
callback.function(callback.ctx, async_http, result);
|
||||
log("onAsyncHTTPCallback({*}): {}", .{ this, bun.fmt.fmtDuration(this.elapsed) });
|
||||
var pending = this.pending_http_request.?;
|
||||
pending.elapsed = this.elapsed;
|
||||
pending.signals.set(.done, true);
|
||||
defer pending.deref();
|
||||
this.pending_http_request = null;
|
||||
callback.function(callback.ctx, pending, result);
|
||||
if (is_owned_by_self) {
|
||||
if (this.free_request_headers_buf) {
|
||||
this.allocator.free(this.request_header_buf);
|
||||
}
|
||||
|
||||
var threadlocal_http: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", this);
|
||||
|
||||
threadlocal_http.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .monotonic);
|
||||
const active_requests = active_requests_count.fetchSub(1, .monotonic);
|
||||
assert(active_requests > 0);
|
||||
}
|
||||
|
||||
if (!http_thread.queued_tasks.isEmpty() and AsyncHTTP.active_requests_count.load(.monotonic) < AsyncHTTP.max_simultaneous_requests.load(.monotonic)) {
|
||||
if (!http_thread.queued_tasks.isEmpty() and active_requests_count.load(.monotonic) < max_simultaneous_requests.load(.monotonic)) {
|
||||
http_thread.loop.loop.wakeup();
|
||||
}
|
||||
}
|
||||
@@ -2268,10 +2430,6 @@ pub const AsyncHTTP = struct {
|
||||
_ = active_requests_count.fetchAdd(1, .monotonic);
|
||||
this.err = null;
|
||||
this.state.store(.sending, .monotonic);
|
||||
this.client.result_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPCallback).init(
|
||||
this,
|
||||
);
|
||||
|
||||
this.elapsed = http_thread.timer.read();
|
||||
if (this.response_buffer.list.capacity == 0) {
|
||||
this.response_buffer.allocator = default_allocator;
|
||||
@@ -2429,6 +2587,7 @@ pub fn doRedirect(
|
||||
return;
|
||||
}
|
||||
this.state.reset(this.allocator);
|
||||
|
||||
// also reset proxy to redirect
|
||||
this.proxy_tunneling = false;
|
||||
if (this.proxy_tunnel != null) {
|
||||
@@ -2436,11 +2595,19 @@ pub fn doRedirect(
|
||||
tunnel.deinit();
|
||||
this.proxy_tunnel = null;
|
||||
}
|
||||
if (this.signals.aborted != null) {
|
||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||
}
|
||||
abort_tracker.remove(this.token);
|
||||
|
||||
return this.start(.{ .bytes = request_body }, body_out_str);
|
||||
// We have to clone it because it will get destructed once it finishes.
|
||||
var clone = ThreadlocalAsyncHTTP.new(.{
|
||||
.async_http = this.async_http().*,
|
||||
});
|
||||
var parent: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", this.async_http());
|
||||
parent.destroy();
|
||||
|
||||
clone.async_http.client.start(.{ .bytes = request_body }, body_out_str);
|
||||
}
|
||||
pub fn async_http(this: *HTTPClient) *AsyncHTTP {
|
||||
return @fieldParentPtr("client", this);
|
||||
}
|
||||
pub fn isHTTPS(this: *HTTPClient) bool {
|
||||
if (this.http_proxy) |proxy| {
|
||||
@@ -2477,20 +2644,21 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
|
||||
// Aborted before connecting
|
||||
if (this.signals.get(.aborted)) {
|
||||
this.fail(error.AbortedBeforeConnecting);
|
||||
// After fail is called, this memory is no longer accessible.
|
||||
return;
|
||||
}
|
||||
|
||||
var socket = http_thread.connect(this, is_ssl) catch |err| {
|
||||
bun.handleErrorReturnTrace(err, @errorReturnTrace());
|
||||
|
||||
this.fail(err);
|
||||
// After fail is called, this memory is no longer accessible.
|
||||
return;
|
||||
};
|
||||
|
||||
if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) {
|
||||
NewHTTPContext(is_ssl).markSocketAsDead(socket);
|
||||
this.fail(error.ConnectionClosed);
|
||||
assert(this.state.fail != null);
|
||||
// After fail is called, this memory is no longer accessible.
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -2501,6 +2669,9 @@ pub const HTTPResponseMetadata = struct {
|
||||
url: []const u8 = "",
|
||||
owned_buf: []u8 = "",
|
||||
response: picohttp.Response = .{},
|
||||
redirect_count: u32 = 0,
|
||||
done: bool = false,
|
||||
|
||||
pub fn deinit(this: *HTTPResponseMetadata, allocator: std.mem.Allocator) void {
|
||||
if (this.owned_buf.len > 0) allocator.free(this.owned_buf);
|
||||
if (this.response.headers.len > 0) allocator.free(this.response.headers);
|
||||
@@ -2533,7 +2704,7 @@ fn printResponse(response: picohttp.Response) void {
|
||||
|
||||
pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
log("onPreconnect({})", .{this.url});
|
||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||
abort_tracker.remove(this.token);
|
||||
const ctx = if (comptime is_ssl) &http_thread.https_context else &http_thread.http_context;
|
||||
ctx.releaseSocket(
|
||||
socket,
|
||||
@@ -2547,7 +2718,7 @@ pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCon
|
||||
this.state.request_stage = .done;
|
||||
this.state.stage = .done;
|
||||
this.proxy_tunneling = false;
|
||||
this.result_callback.run(@fieldParentPtr("client", this), HTTPClientResult{ .fail = null, .metadata = null, .has_more = false });
|
||||
this.async_http().onAsyncHTTPCallback(.{ .fail = null, .metadata = null, .has_more = false }, false);
|
||||
}
|
||||
|
||||
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
@@ -3172,21 +3343,18 @@ pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCo
|
||||
}
|
||||
|
||||
fn fail(this: *HTTPClient, err: anyerror) void {
|
||||
if (this.signals.aborted != null) {
|
||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||
}
|
||||
abort_tracker.remove(this.token);
|
||||
|
||||
this.state.request_stage = .fail;
|
||||
this.state.response_stage = .fail;
|
||||
this.state.fail = err;
|
||||
this.state.stage = .fail;
|
||||
|
||||
const callback = this.result_callback;
|
||||
const result = this.toResult();
|
||||
this.state.reset(this.allocator);
|
||||
this.proxy_tunneling = false;
|
||||
|
||||
callback.run(@fieldParentPtr("client", this), result);
|
||||
this.async_http().onAsyncHTTPCallback(result, !this.is_preconnect_only);
|
||||
}
|
||||
|
||||
// We have to clone metadata immediately after use
|
||||
@@ -3245,14 +3413,13 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (this.signals.aborted != null and is_done) {
|
||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||
|
||||
if (is_done) {
|
||||
abort_tracker.remove(this.token);
|
||||
}
|
||||
|
||||
log("progressUpdate {}", .{is_done});
|
||||
|
||||
const callback = this.result_callback;
|
||||
|
||||
if (is_done) {
|
||||
if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) {
|
||||
ctx.releaseSocket(
|
||||
@@ -3273,7 +3440,8 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
|
||||
}
|
||||
|
||||
result.body.?.* = body;
|
||||
callback.run(@fieldParentPtr("client", this), result);
|
||||
|
||||
this.async_http().onAsyncHTTPCallback(result, !this.is_preconnect_only);
|
||||
|
||||
if (comptime print_every > 0) {
|
||||
print_every_i += 1;
|
||||
@@ -3301,6 +3469,9 @@ pub const HTTPClientResult = struct {
|
||||
/// If is not chunked encoded and Content-Length is not provided this will be unknown
|
||||
body_size: BodySize = .unknown,
|
||||
redirected: bool = false,
|
||||
aborted: bool = false,
|
||||
retry_count: u32 = 0,
|
||||
|
||||
certificate_info: ?CertificateInfo = null,
|
||||
|
||||
pub const BodySize = union(enum) {
|
||||
@@ -3325,10 +3496,10 @@ pub const HTTPClientResult = struct {
|
||||
ctx: *anyopaque,
|
||||
function: Function,
|
||||
|
||||
pub const Function = *const fn (*anyopaque, *AsyncHTTP, HTTPClientResult) void;
|
||||
pub const Function = *const fn (*anyopaque, *PendingHTTPRequest, HTTPClientResult) void;
|
||||
|
||||
pub fn run(self: Callback, async_http: *AsyncHTTP, result: HTTPClientResult) void {
|
||||
self.function(self.ctx, async_http, result);
|
||||
pub fn run(self: Callback, pending_request: *PendingHTTPRequest, result: HTTPClientResult) void {
|
||||
self.function(self.ctx, pending_request, result);
|
||||
}
|
||||
|
||||
pub fn New(comptime Type: type, comptime callback: anytype) type {
|
||||
@@ -3340,9 +3511,9 @@ pub const HTTPClientResult = struct {
|
||||
};
|
||||
}
|
||||
|
||||
pub fn wrapped_callback(ptr: *anyopaque, async_http: *AsyncHTTP, result: HTTPClientResult) void {
|
||||
pub fn wrapped_callback(ptr: *anyopaque, parent: *PendingHTTPRequest, result: HTTPClientResult) void {
|
||||
const casted = @as(Type, @ptrCast(@alignCast(ptr)));
|
||||
@call(bun.callmod_inline, callback, .{ casted, async_http, result });
|
||||
@call(bun.callmod_inline, callback, .{ casted, parent, result });
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -4008,3 +4179,6 @@ const ThreadlocalAsyncHTTP = struct {
|
||||
async_http: AsyncHTTP,
|
||||
pub usingnamespace bun.New(@This());
|
||||
};
|
||||
|
||||
pub var active_requests_count = std.atomic.Value(usize).init(0);
|
||||
pub var max_simultaneous_requests = std.atomic.Value(usize).init(256);
|
||||
|
||||
@@ -38,7 +38,6 @@ const FileSystem = Fs.FileSystem;
|
||||
const Lock = @import("../lock.zig").Lock;
|
||||
const URL = @import("../url.zig").URL;
|
||||
const HTTP = bun.http;
|
||||
const AsyncHTTP = HTTP.AsyncHTTP;
|
||||
const HTTPChannel = HTTP.HTTPChannel;
|
||||
|
||||
const HeaderBuilder = HTTP.HeaderBuilder;
|
||||
@@ -232,7 +231,8 @@ pub const Aligner = struct {
|
||||
};
|
||||
|
||||
const NetworkTask = struct {
|
||||
http: AsyncHTTP = undefined,
|
||||
http: *HTTP.PendingHTTPRequest,
|
||||
http_task: ?*ThreadPool.Task = null,
|
||||
task_id: u64,
|
||||
url_buf: []const u8 = &[_]u8{},
|
||||
retried: u16 = 0,
|
||||
@@ -259,10 +259,11 @@ const NetworkTask = struct {
|
||||
};
|
||||
pub const DedupeMap = std.HashMap(u64, DedupeMapEntry, IdentityContext(u64), 80);
|
||||
|
||||
pub fn notify(this: *NetworkTask, async_http: *AsyncHTTP, _: anytype) void {
|
||||
pub fn notify(this: *NetworkTask, pending_http_request: *HTTP.PendingHTTPRequest, _: anytype) void {
|
||||
defer this.package_manager.wake();
|
||||
async_http.real.?.* = async_http.*;
|
||||
async_http.real.?.response_buffer = async_http.response_buffer;
|
||||
this.http = pending_http_request;
|
||||
bun.debugAssert(this.http == pending_http_request);
|
||||
this.response_buffer = pending_http_request.response_body;
|
||||
this.package_manager.async_network_task_queue.push(this);
|
||||
}
|
||||
|
||||
@@ -422,18 +423,24 @@ const NetworkTask = struct {
|
||||
header_builder.content = GlobalStringBuilder{ .ptr = @as([*]u8, @ptrFromInt(@intFromPtr(bun.span(default_headers_buf).ptr))), .len = default_headers_buf.len, .cap = default_headers_buf.len };
|
||||
}
|
||||
|
||||
this.response_buffer = try MutableString.init(allocator, 0);
|
||||
this.allocator = allocator;
|
||||
|
||||
const url = URL.parse(this.url_buf);
|
||||
this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len], &this.response_buffer, "", this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{
|
||||
.http_proxy = this.package_manager.httpProxy(url),
|
||||
});
|
||||
this.http.client.reject_unauthorized = this.package_manager.tlsRejectUnauthorized();
|
||||
|
||||
if (PackageManager.verbose_install) {
|
||||
this.http.client.verbose = .headers;
|
||||
}
|
||||
const verbose: HTTP.HTTPVerboseLevel = if (PackageManager.verbose_install) .headers else .none;
|
||||
this.http = bun.http.fetchBatched(
|
||||
&.{
|
||||
.allocator = allocator,
|
||||
.method = .GET,
|
||||
.url = url,
|
||||
.headers = header_builder.entries,
|
||||
.headers_buf = header_builder.content.ptr.?[0..header_builder.content.len],
|
||||
.reject_unauthorized = this.package_manager.tlsRejectUnauthorized(),
|
||||
.http_proxy = this.package_manager.httpProxy(url),
|
||||
.verbose = verbose,
|
||||
},
|
||||
this.getCompletionCallback(),
|
||||
&this.http_task,
|
||||
);
|
||||
|
||||
this.callback = .{
|
||||
.package_manifest = .{
|
||||
@@ -441,17 +448,6 @@ const NetworkTask = struct {
|
||||
.loaded_manifest = if (loaded_manifest) |manifest| manifest.* else null,
|
||||
},
|
||||
};
|
||||
|
||||
if (PackageManager.verbose_install) {
|
||||
this.http.verbose = .headers;
|
||||
this.http.client.verbose = .headers;
|
||||
}
|
||||
|
||||
// Incase the ETag causes invalidation, we fallback to the last modified date.
|
||||
if (last_modified.len != 0 and bun.getRuntimeFeatureFlag("BUN_FEATURE_FLAG_LAST_MODIFIED_PRETEND_304")) {
|
||||
this.http.client.force_last_modified = true;
|
||||
this.http.client.if_modified_since = last_modified;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn getCompletionCallback(this: *NetworkTask) HTTP.HTTPClientResult.Callback {
|
||||
@@ -459,7 +455,11 @@ const NetworkTask = struct {
|
||||
}
|
||||
|
||||
pub fn schedule(this: *NetworkTask, batch: *ThreadPool.Batch) void {
|
||||
this.http.schedule(this.allocator, batch);
|
||||
batch.push(.{
|
||||
.head = this.http_task.?,
|
||||
.tail = this.http_task.?,
|
||||
.len = 1,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn forTarball(
|
||||
@@ -510,13 +510,21 @@ const NetworkTask = struct {
|
||||
|
||||
const url = URL.parse(this.url_buf);
|
||||
|
||||
this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_buf, &this.response_buffer, "", this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{
|
||||
.http_proxy = this.package_manager.httpProxy(url),
|
||||
});
|
||||
this.http.client.reject_unauthorized = this.package_manager.tlsRejectUnauthorized();
|
||||
if (PackageManager.verbose_install) {
|
||||
this.http.client.verbose = .headers;
|
||||
}
|
||||
const verbose: HTTP.HTTPVerboseLevel = if (PackageManager.verbose_install) .headers else .none;
|
||||
this.http = bun.http.fetchBatched(
|
||||
&.{
|
||||
.allocator = allocator,
|
||||
.method = .GET,
|
||||
.url = url,
|
||||
.headers = header_builder.entries,
|
||||
.headers_buf = header_buf,
|
||||
.reject_unauthorized = this.package_manager.tlsRejectUnauthorized(),
|
||||
.http_proxy = this.package_manager.httpProxy(url),
|
||||
.verbose = verbose,
|
||||
},
|
||||
this.getCompletionCallback(),
|
||||
&this.http_task,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -699,7 +707,7 @@ pub const Task = struct {
|
||||
const package_manifest = Npm.Registry.getPackageMetadata(
|
||||
allocator,
|
||||
manager.scopeForPackageName(manifest.name.slice()),
|
||||
manifest.network.http.response.?,
|
||||
manifest.network.http.response().?,
|
||||
body,
|
||||
&this.log,
|
||||
manifest.name.slice(),
|
||||
@@ -4275,6 +4283,7 @@ pub const PackageManager = struct {
|
||||
.task_id = task_id,
|
||||
.callback = undefined,
|
||||
.allocator = this.allocator,
|
||||
.http = undefined,
|
||||
.package_manager = this,
|
||||
.apply_patch_task = if (patch_name_and_version_hash) |h| brk: {
|
||||
const patch_hash = this.lockfile.patched_dependencies.get(h).?.patchfileHash().?;
|
||||
@@ -5229,6 +5238,7 @@ pub const PackageManager = struct {
|
||||
|
||||
var network_task = this.getNetworkTask();
|
||||
network_task.* = .{
|
||||
.http = undefined,
|
||||
.package_manager = &PackageManager.instance, // https://github.com/ziglang/zig/issues/14005
|
||||
.callback = undefined,
|
||||
.task_id = task_id,
|
||||
@@ -6167,18 +6177,18 @@ pub const PackageManager = struct {
|
||||
}
|
||||
}
|
||||
|
||||
if (!has_network_error and task.http.response == null) {
|
||||
if (!has_network_error and task.http.response() == null) {
|
||||
has_network_error = true;
|
||||
const min = manager.options.min_simultaneous_requests;
|
||||
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
|
||||
const max = HTTP.max_simultaneous_requests.load(.monotonic);
|
||||
if (max > min) {
|
||||
AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
|
||||
HTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle retry-able errors.
|
||||
if (task.http.response == null or task.http.response.?.status_code > 499) {
|
||||
const err = task.http.err orelse error.HTTPError;
|
||||
if (task.http.response() == null or task.http.response().?.status_code > 499) {
|
||||
const err = task.http.err() orelse error.HTTPError;
|
||||
|
||||
if (task.retried < manager.options.max_retry_count) {
|
||||
task.retried += 1;
|
||||
@@ -6198,9 +6208,9 @@ pub const PackageManager = struct {
|
||||
}
|
||||
}
|
||||
|
||||
const response = task.http.response orelse {
|
||||
const response = task.http.response() orelse {
|
||||
// Handle non-retry-able errors.
|
||||
const err = task.http.err orelse error.HTTPError;
|
||||
const err = task.http.err() orelse error.HTTPError;
|
||||
|
||||
if (@TypeOf(callbacks.onPackageManifestError) != void) {
|
||||
callbacks.onPackageManifestError(
|
||||
@@ -6272,7 +6282,7 @@ pub const PackageManager = struct {
|
||||
logger.Loc.Empty,
|
||||
manager.allocator,
|
||||
"<r><red><b>GET<r><red> {s}<d> - {d}<r>",
|
||||
.{ task.http.client.url.href, response.status_code },
|
||||
.{ task.url_buf, response.status_code },
|
||||
) catch bun.outOfMemory();
|
||||
} else {
|
||||
manager.log.addWarningFmt(
|
||||
@@ -6280,7 +6290,7 @@ pub const PackageManager = struct {
|
||||
logger.Loc.Empty,
|
||||
manager.allocator,
|
||||
"<r><yellow><b>GET<r><yellow> {s}<d> - {d}<r>",
|
||||
.{ task.http.client.url.href, response.status_code },
|
||||
.{ task.url_buf, response.status_code },
|
||||
) catch bun.outOfMemory();
|
||||
}
|
||||
if (manager.subcommand != .remove) {
|
||||
@@ -6343,17 +6353,17 @@ pub const PackageManager = struct {
|
||||
manager.task_batch.push(ThreadPool.Batch.from(manager.enqueueParseNPMPackage(task.task_id, name, task)));
|
||||
},
|
||||
.extract => |*extract| {
|
||||
if (!has_network_error and task.http.response == null) {
|
||||
if (!has_network_error and task.http.response() == null) {
|
||||
has_network_error = true;
|
||||
const min = manager.options.min_simultaneous_requests;
|
||||
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
|
||||
const max = HTTP.max_simultaneous_requests.load(.monotonic);
|
||||
if (max > min) {
|
||||
AsyncHTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
|
||||
HTTP.max_simultaneous_requests.store(@max(min, max / 2), .monotonic);
|
||||
}
|
||||
}
|
||||
|
||||
if (task.http.response == null or task.http.response.?.status_code > 499) {
|
||||
const err = task.http.err orelse error.TarballFailedToDownload;
|
||||
if (task.http.response() == null or task.http.response().?.status_code > 499) {
|
||||
const err = task.http.err() orelse error.TarballFailedToDownload;
|
||||
|
||||
if (task.retried < manager.options.max_retry_count) {
|
||||
task.retried += 1;
|
||||
@@ -6379,8 +6389,8 @@ pub const PackageManager = struct {
|
||||
}
|
||||
}
|
||||
|
||||
const response = task.http.response orelse {
|
||||
const err = task.http.err orelse error.TarballFailedToDownload;
|
||||
const response = task.http.response() orelse {
|
||||
const err = task.http.err() orelse error.TarballFailedToDownload;
|
||||
|
||||
if (@TypeOf(callbacks.onPackageDownloadError) != void) {
|
||||
const package_id = manager.lockfile.buffers.resolutions.items[extract.dependency_id];
|
||||
@@ -6466,7 +6476,7 @@ pub const PackageManager = struct {
|
||||
manager.allocator,
|
||||
"<r><red><b>GET<r><red> {s}<d> - {d}<r>",
|
||||
.{
|
||||
task.http.client.url.href,
|
||||
task.url_buf,
|
||||
response.status_code,
|
||||
},
|
||||
) catch bun.outOfMemory();
|
||||
@@ -6477,7 +6487,7 @@ pub const PackageManager = struct {
|
||||
manager.allocator,
|
||||
"<r><yellow><b>GET<r><yellow> {s}<d> - {d}<r>",
|
||||
.{
|
||||
task.http.client.url.href,
|
||||
task.url_buf,
|
||||
response.status_code,
|
||||
},
|
||||
) catch bun.outOfMemory();
|
||||
@@ -7170,7 +7180,7 @@ pub const PackageManager = struct {
|
||||
if (std.fmt.parseInt(u16, retry_count, 10)) |int| this.max_retry_count = int else |_| {}
|
||||
}
|
||||
|
||||
AsyncHTTP.loadEnv(allocator, log, env);
|
||||
bun.http.loadEnv(allocator, log, env);
|
||||
|
||||
if (env.get("BUN_CONFIG_SKIP_SAVE_LOCKFILE")) |check_bool| {
|
||||
this.do.save_lockfile = strings.eqlComptime(check_bool, "0");
|
||||
|
||||
@@ -40,7 +40,6 @@ const Fs = @import("../fs.zig");
|
||||
const FileSystem = Fs.FileSystem;
|
||||
const Lock = @import("../lock.zig").Lock;
|
||||
const URL = @import("../url.zig").URL;
|
||||
const AsyncHTTP = bun.http.AsyncHTTP;
|
||||
const HTTPChannel = bun.http.HTTPChannel;
|
||||
|
||||
const Integrity = @import("./integrity.zig").Integrity;
|
||||
|
||||
Reference in New Issue
Block a user