Compare commits

...

4 Commits

Author SHA1 Message Date
Ciro Spaciari
ec6601eb41 more 2025-03-13 19:13:33 -07:00
Ciro Spaciari
da8c2273e4 more 2025-03-13 16:52:38 -07:00
Ciro Spaciari
15a89b98c6 more 2025-03-13 16:41:04 -07:00
Ciro Spaciari
682aa09944 wip 2025-03-12 18:06:15 -07:00
13 changed files with 2603 additions and 2490 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,567 @@
const bun = @import("root").bun;
const std = @import("std");
const string = bun.string;
const Environment = bun.Environment;
const SSLConfig = bun.server.ServerConfig.SSLConfig;
const picohttp = bun.picohttp;
const JSC = bun.JSC;
const assert = bun.assert;
const Log = bun.logger.Log;
const DotEnv = bun.DotEnv;
const Task = bun.ThreadPool.Task;
const Loc = bun.logger.Loc;
const Batch = bun.ThreadPool.Batch;
const FeatureFlags = bun.FeatureFlags;
const MutableString = bun.MutableString;
const Headers = JSC.WebCore.Headers;
pub const UnboundedQueue = bun.UnboundedQueue;
pub const Queue = UnboundedQueue(AsyncHTTP, .next);
pub const ShutdownQueue = UnboundedQueue(AsyncHTTP, .next);
pub const RequestWriteQueue = UnboundedQueue(AsyncHTTP, .next);
const HTTPRequestBody = @import("./request_body.zig").HTTPRequestBody;
const Method = @import("../method.zig").Method;
const URL = bun.URL;
const ThreadPool = bun.ThreadPool;
const uws = bun.uws;
const PercentEncoding = @import("../../url.zig").PercentEncoding;
const http_thread = @import("./thread.zig").getHttpThread();
const Signals = @import("./signals.zig").Signals;
const HTTPClient = @import("../../http.zig").HTTPClient;
var async_http_id_monotonic: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
const default_allocator = bun.default_allocator;
var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(default_allocator);
const log = bun.Output.scoped(.fetch, false);
const HTTPCallbackPair = .{ *AsyncHTTP, HTTPClientResult };
pub const HTTPChannel = @import("../../sync.zig").Channel(HTTPCallbackPair, .{ .Static = 1000 });
const HTTPThread = @import("./thread.zig").HTTPThread;
// 32 pointers much cheaper than 1000 pointers
const SingleHTTPChannel = struct {
const SingleHTTPCHannel_ = @import("../../sync.zig").Channel(HTTPClientResult, .{ .Static = 8 });
channel: SingleHTTPCHannel_,
pub fn reset(_: *@This()) void {}
pub fn init() SingleHTTPChannel {
return SingleHTTPChannel{ .channel = SingleHTTPCHannel_.init() };
}
};
pub const HTTPChannelContext = struct {
http: AsyncHTTP = undefined,
channel: *HTTPChannel,
pub fn callback(data: HTTPCallbackPair) void {
var this: *HTTPChannelContext = @fieldParentPtr("http", data.@"0");
this.channel.writeItem(data) catch unreachable;
}
};
pub const HTTPVerboseLevel = enum {
none,
headers,
curl,
};
pub fn registerAbortTracker(
async_http_id: u32,
socket: uws.InternalSocket,
) void {
socket_async_http_abort_tracker.put(async_http_id, socket) catch unreachable;
}
pub fn unregisterAbortTracker(
async_http_id: u32,
) void {
_ = socket_async_http_abort_tracker.swapRemove(async_http_id);
}
const HTTPClientResult = @import("./result.zig").HTTPClientResult;
pub fn getSocketAsyncHTTPAbortTracker() *std.AutoArrayHashMap(u32, uws.InternalSocket) {
return &socket_async_http_abort_tracker;
}
// Exists for heap stats reasons.
pub const ThreadlocalAsyncHTTP = struct {
async_http: AsyncHTTP,
pub usingnamespace bun.New(@This());
};
pub const FetchRedirect = enum(u8) {
follow,
manual,
@"error",
pub const Map = bun.ComptimeStringMap(FetchRedirect, .{
.{ "follow", .follow },
.{ "manual", .manual },
.{ "error", .@"error" },
});
};
pub const AsyncHTTP = struct {
request: ?picohttp.Request = null,
response: ?picohttp.Response = null,
request_headers: Headers.Entry.List = .empty,
response_headers: Headers.Entry.List = .empty,
response_buffer: *MutableString,
request_body: HTTPRequestBody = .{ .bytes = "" },
allocator: std.mem.Allocator,
request_header_buf: string = "",
method: Method = Method.GET,
url: URL,
http_proxy: ?URL = null,
real: ?*AsyncHTTP = null,
next: ?*AsyncHTTP = null,
task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP },
result_callback: HTTPClientResult.Callback = undefined,
redirected: bool = false,
response_encoding: Encoding = Encoding.identity,
verbose: HTTPVerboseLevel = .none,
client: HTTPClient = undefined,
waitingDeffered: bool = false,
finalized: bool = false,
err: ?anyerror = null,
async_http_id: u32 = 0,
state: AtomicState = AtomicState.init(State.pending),
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
signals: Signals = .{},
pub var active_requests_count = std.atomic.Value(usize).init(0);
pub var max_simultaneous_requests = std.atomic.Value(usize).init(256);
pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
if (env.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
logger.addErrorFmt(
null,
Loc.Empty,
allocator,
"BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
.{max_http_requests},
) catch unreachable;
return;
};
if (max == 0) {
logger.addWarningFmt(
null,
Loc.Empty,
allocator,
"BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
.{},
) catch unreachable;
return;
}
AsyncHTTP.max_simultaneous_requests.store(max, .monotonic);
}
}
pub fn signalHeaderProgress(this: *AsyncHTTP) void {
var progress = this.signals.header_progress orelse return;
progress.store(true, .release);
}
pub fn enableBodyStreaming(this: *AsyncHTTP) void {
var stream = this.signals.body_streaming orelse return;
stream.store(true, .release);
}
pub fn clearData(this: *AsyncHTTP) void {
this.response_headers.deinit(this.allocator);
this.response_headers = .{};
this.request = null;
this.response = null;
this.client.unix_socket_path.deinit();
this.client.unix_socket_path = JSC.ZigString.Slice.empty;
}
pub const State = enum(u32) {
pending = 0,
scheduled = 1,
sending = 2,
success = 3,
fail = 4,
};
const AtomicState = std.atomic.Value(State);
pub const Options = struct {
http_proxy: ?URL = null,
hostname: ?[]u8 = null,
signals: ?Signals = null,
unix_socket_path: ?JSC.ZigString.Slice = null,
disable_timeout: ?bool = null,
verbose: ?HTTPVerboseLevel = null,
disable_keepalive: ?bool = null,
disable_decompression: ?bool = null,
reject_unauthorized: ?bool = null,
tls_props: ?*SSLConfig = null,
};
const Preconnect = struct {
async_http: AsyncHTTP,
response_buffer: MutableString,
url: bun.URL,
is_url_owned: bool,
pub usingnamespace bun.New(@This());
pub fn onResult(this: *Preconnect, _: *AsyncHTTP, _: HTTPClientResult) void {
this.response_buffer.deinit();
this.async_http.clearData();
this.async_http.client.deinit();
if (this.is_url_owned) {
default_allocator.free(this.url.href);
}
this.destroy();
}
};
pub fn preconnect(
url: URL,
is_url_owned: bool,
) void {
if (!FeatureFlags.is_fetch_preconnect_supported) {
if (is_url_owned) {
default_allocator.free(url.href);
}
return;
}
var this = Preconnect.new(.{
.async_http = undefined,
.response_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.url = url,
.is_url_owned = is_url_owned,
});
this.async_http = AsyncHTTP.init(default_allocator, .GET, url, .{}, "", &this.response_buffer, "", HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this), .manual, .{});
this.async_http.client.flags.is_preconnect_only = true;
http_thread.schedule(Batch.from(&this.async_http.task));
}
pub fn init(
allocator: std.mem.Allocator,
method: Method,
url: URL,
headers: Headers.Entry.List,
headers_buf: string,
response_buffer: *MutableString,
request_body: []const u8,
callback: HTTPClientResult.Callback,
redirect_type: FetchRedirect,
options: Options,
) AsyncHTTP {
var this = AsyncHTTP{
.allocator = allocator,
.url = url,
.method = method,
.request_headers = headers,
.request_header_buf = headers_buf,
.request_body = .{ .bytes = request_body },
.response_buffer = response_buffer,
.result_callback = callback,
.http_proxy = options.http_proxy,
.signals = options.signals orelse .{},
.async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id_monotonic.fetchAdd(1, .monotonic) else 0,
};
this.client = .{
.allocator = allocator,
.method = method,
.url = url,
.header_entries = headers,
.header_buf = headers_buf,
.hostname = options.hostname,
.signals = options.signals orelse this.signals,
.async_http_id = this.async_http_id,
.http_proxy = this.http_proxy,
.redirect_type = redirect_type,
};
if (options.unix_socket_path) |val| {
assert(this.client.unix_socket_path.length() == 0);
this.client.unix_socket_path = val;
}
if (options.disable_timeout) |val| {
this.client.flags.disable_timeout = val;
}
if (options.verbose) |val| {
this.client.verbose = val;
}
if (options.disable_decompression) |val| {
this.client.flags.disable_decompression = val;
}
if (options.disable_keepalive) |val| {
this.client.flags.disable_keepalive = val;
}
if (options.reject_unauthorized) |val| {
this.client.flags.reject_unauthorized = val;
}
if (options.tls_props) |val| {
this.client.tls_props = val;
}
if (options.http_proxy) |proxy| {
// Username between 0 and 4096 chars
if (proxy.username.len > 0 and proxy.username.len < 4096) {
// Password between 0 and 4096 chars
if (proxy.password.len > 0 and proxy.password.len < 4096) {
// decode password
var password_buffer = std.mem.zeroes([4096]u8);
var password_stream = std.io.fixedBufferStream(&password_buffer);
const password_writer = password_stream.writer();
const PassWriter = @TypeOf(password_writer);
const password_len = PercentEncoding.decode(PassWriter, password_writer, proxy.password) catch {
// Invalid proxy authorization
return this;
};
const password = password_buffer[0..password_len];
// Decode username
var username_buffer = std.mem.zeroes([4096]u8);
var username_stream = std.io.fixedBufferStream(&username_buffer);
const username_writer = username_stream.writer();
const UserWriter = @TypeOf(username_writer);
const username_len = PercentEncoding.decode(UserWriter, username_writer, proxy.username) catch {
// Invalid proxy authorization
return this;
};
const username = username_buffer[0..username_len];
// concat user and password
const auth = std.fmt.allocPrint(allocator, "{s}:{s}", .{ username, password }) catch unreachable;
defer allocator.free(auth);
const size = std.base64.standard.Encoder.calcSize(auth.len);
var buf = this.allocator.alloc(u8, size + "Basic ".len) catch unreachable;
const encoded = std.base64.url_safe.Encoder.encode(buf["Basic ".len..], auth);
buf[0.."Basic ".len].* = "Basic ".*;
this.client.proxy_authorization = buf[0 .. "Basic ".len + encoded.len];
} else {
//Decode username
var username_buffer = std.mem.zeroes([4096]u8);
var username_stream = std.io.fixedBufferStream(&username_buffer);
const username_writer = username_stream.writer();
const UserWriter = @TypeOf(username_writer);
const username_len = PercentEncoding.decode(UserWriter, username_writer, proxy.username) catch {
// Invalid proxy authorization
return this;
};
const username = username_buffer[0..username_len];
// only use user
const size = std.base64.standard.Encoder.calcSize(username_len);
var buf = allocator.alloc(u8, size + "Basic ".len) catch unreachable;
const encoded = std.base64.url_safe.Encoder.encode(buf["Basic ".len..], username);
buf[0.."Basic ".len].* = "Basic ".*;
this.client.proxy_authorization = buf[0 .. "Basic ".len + encoded.len];
}
}
}
return this;
}
pub fn initSync(
allocator: std.mem.Allocator,
method: Method,
url: URL,
headers: Headers.Entry.List,
headers_buf: string,
response_buffer: *MutableString,
request_body: []const u8,
http_proxy: ?URL,
hostname: ?[]u8,
redirect_type: FetchRedirect,
) AsyncHTTP {
return @This().init(
allocator,
method,
url,
headers,
headers_buf,
response_buffer,
request_body,
undefined,
redirect_type,
.{
.http_proxy = http_proxy,
.hostname = hostname,
},
);
}
fn reset(this: *AsyncHTTP) !void {
const aborted = this.client.aborted;
this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf, aborted);
this.client.http_proxy = this.http_proxy;
if (this.http_proxy) |proxy| {
//TODO: need to understand how is possible to reuse Proxy with TSL, so disable keepalive if url is HTTPS
this.client.flags.disable_keepalive = this.url.isHTTPS();
// Username between 0 and 4096 chars
if (proxy.username.len > 0 and proxy.username.len < 4096) {
// Password between 0 and 4096 chars
if (proxy.password.len > 0 and proxy.password.len < 4096) {
// decode password
var password_buffer = std.mem.zeroes([4096]u8);
var password_stream = std.io.fixedBufferStream(&password_buffer);
const password_writer = password_stream.writer();
const PassWriter = @TypeOf(password_writer);
const password_len = PercentEncoding.decode(PassWriter, password_writer, proxy.password) catch {
// Invalid proxy authorization
return this;
};
const password = password_buffer[0..password_len];
// Decode username
var username_buffer = std.mem.zeroes([4096]u8);
var username_stream = std.io.fixedBufferStream(&username_buffer);
const username_writer = username_stream.writer();
const UserWriter = @TypeOf(username_writer);
const username_len = PercentEncoding.decode(UserWriter, username_writer, proxy.username) catch {
// Invalid proxy authorization
return this;
};
const username = username_buffer[0..username_len];
// concat user and password
const auth = std.fmt.allocPrint(this.allocator, "{s}:{s}", .{ username, password }) catch unreachable;
defer this.allocator.free(auth);
const size = std.base64.standard.Encoder.calcSize(auth.len);
var buf = this.allocator.alloc(u8, size + "Basic ".len) catch unreachable;
const encoded = std.base64.url_safe.Encoder.encode(buf["Basic ".len..], auth);
buf[0.."Basic ".len].* = "Basic ".*;
this.client.proxy_authorization = buf[0 .. "Basic ".len + encoded.len];
} else {
//Decode username
var username_buffer = std.mem.zeroes([4096]u8);
var username_stream = std.io.fixedBufferStream(&username_buffer);
const username_writer = username_stream.writer();
const UserWriter = @TypeOf(username_writer);
const username_len = PercentEncoding.decode(UserWriter, username_writer, proxy.username) catch {
// Invalid proxy authorization
return this;
};
const username = username_buffer[0..username_len];
// only use user
const size = std.base64.standard.Encoder.calcSize(username_len);
var buf = this.allocator.alloc(u8, size + "Basic ".len) catch unreachable;
const encoded = std.base64.url_safe.Encoder.encode(buf["Basic ".len..], username);
buf[0.."Basic ".len].* = "Basic ".*;
this.client.proxy_authorization = buf[0 .. "Basic ".len + encoded.len];
}
}
}
}
pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void {
this.state.store(.scheduled, .monotonic);
batch.push(ThreadPool.Batch.from(&this.task));
}
fn sendSyncCallback(this: *SingleHTTPChannel, async_http: *AsyncHTTP, result: HTTPClientResult) void {
async_http.real.?.* = async_http.*;
async_http.real.?.response_buffer = async_http.response_buffer;
this.channel.writeItem(result) catch unreachable;
}
pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response {
HTTPThread.init(&.{});
var ctx = try default_allocator.create(SingleHTTPChannel);
ctx.* = SingleHTTPChannel.init();
this.result_callback = HTTPClientResult.Callback.New(
*SingleHTTPChannel,
sendSyncCallback,
).init(ctx);
var batch = bun.ThreadPool.Batch{};
this.schedule(default_allocator, &batch);
http_thread.schedule(batch);
const result = ctx.channel.readItem() catch unreachable;
if (result.fail) |err| {
return err;
}
assert(result.metadata != null);
return result.metadata.?.response;
}
pub fn onAsyncHTTPCallback(this: *AsyncHTTP, async_http: *AsyncHTTP, result: HTTPClientResult) void {
assert(this.real != null);
var callback = this.result_callback;
this.elapsed = http_thread.timer.read() -| this.elapsed;
// TODO: this condition seems wrong: if we started with a non-default value, we might
// report a redirect even if none happened
this.redirected = this.client.flags.redirected;
if (result.isSuccess()) {
this.err = null;
if (result.metadata) |metadata| {
this.response = metadata.response;
}
this.state.store(.success, .monotonic);
} else {
this.err = result.fail;
this.response = null;
this.state.store(State.fail, .monotonic);
}
if (comptime Environment.enable_logs) {
if (socket_async_http_abort_tracker.count() > 0) {
log("socket_async_http_abort_tracker count: {d}", .{socket_async_http_abort_tracker.count()});
}
}
if (socket_async_http_abort_tracker.capacity() > 10_000 and socket_async_http_abort_tracker.count() < 100) {
socket_async_http_abort_tracker.shrinkAndFree(socket_async_http_abort_tracker.count());
}
if (result.has_more) {
callback.function(callback.ctx, async_http, result);
} else {
{
this.client.deinit();
var threadlocal_http: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", async_http);
defer threadlocal_http.destroy();
log("onAsyncHTTPCallback: {any}", .{std.fmt.fmtDuration(this.elapsed)});
callback.function(callback.ctx, async_http, result);
}
const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .monotonic);
assert(active_requests > 0);
}
if (!http_thread.queued_tasks.isEmpty() and AsyncHTTP.active_requests_count.load(.monotonic) < AsyncHTTP.max_simultaneous_requests.load(.monotonic)) {
http_thread.loop.loop.wakeup();
}
}
pub fn startAsyncHTTP(task: *Task) void {
var this: *AsyncHTTP = @fieldParentPtr("task", task);
this.onStart();
}
pub fn onStart(this: *AsyncHTTP) void {
_ = active_requests_count.fetchAdd(1, .monotonic);
this.err = null;
this.state.store(.sending, .monotonic);
this.client.result_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPCallback).init(
this,
);
this.elapsed = http_thread.timer.read();
if (this.response_buffer.list.capacity == 0) {
this.response_buffer.allocator = default_allocator;
}
this.client.start(this.request_body, this.response_buffer);
}
};

View File

@@ -0,0 +1,13 @@
const HTTPCertError = @import("./errors.zig").HTTPCertError;
const std = @import("std");
pub const CertificateInfo = struct {
cert: []const u8,
cert_error: HTTPCertError,
hostname: []const u8,
pub fn deinit(this: *const CertificateInfo, allocator: std.mem.Allocator) void {
allocator.free(this.cert);
allocator.free(this.cert_error.code);
allocator.free(this.cert_error.reason);
allocator.free(this.hostname);
}
};

View File

@@ -0,0 +1,11 @@
pub const InitError = error{
FailedToOpenSocket,
LoadCAFile,
InvalidCAFile,
InvalidCA,
};
pub const HTTPCertError = struct {
error_no: i32 = 0,
code: [:0]const u8 = "",
reason: [:0]const u8 = "",
};

View File

@@ -0,0 +1,367 @@
const bun = @import("root").bun;
const std = @import("std");
const picohttp = bun.picohttp;
const MutableString = bun.MutableString;
const HTTPResponseMetadata = @import("./result.zig").HTTPResponseMetadata;
const CertificateInfo = @import("./certificate_info.zig").CertificateInfo;
const Zlib = @import("./zlib.zig");
const Brotli = bun.brotli;
const default_allocator = bun.default_allocator;
const assert = bun.assert;
const Output = bun.Output;
const FeatureFlags = bun.FeatureFlags;
const log = bun.Output.scoped(.fetch, false);
const HTTPRequestBody = @import("./request_body.zig").HTTPRequestBody;
pub const extremely_verbose = false;
const http_thread = @import("./thread.zig").getHttpThread();
pub const Encoding = enum {
identity,
gzip,
deflate,
brotli,
chunked,
pub fn canUseLibDeflate(this: Encoding) bool {
return switch (this) {
.gzip, .deflate => true,
else => false,
};
}
pub fn isCompressed(this: Encoding) bool {
return switch (this) {
.brotli, .gzip, .deflate => true,
else => false,
};
}
};
const Stage = enum(u8) {
pending,
connect,
done,
fail,
};
pub const HTTPStage = enum {
pending,
headers,
body,
body_chunk,
fail,
done,
proxy_handshake,
proxy_headers,
proxy_body,
};
const Decompressor = union(enum) {
zlib: *Zlib.ZlibReaderArrayList,
brotli: *Brotli.BrotliReaderArrayList,
none: void,
pub fn deinit(this: *Decompressor) void {
switch (this.*) {
inline .brotli, .zlib => |that| {
that.deinit();
this.* = .{ .none = {} };
},
.none => {},
}
}
pub fn updateBuffers(this: *Decompressor, encoding: Encoding, buffer: []const u8, body_out_str: *MutableString) !void {
if (!encoding.isCompressed()) {
return;
}
if (this.* == .none) {
switch (encoding) {
.gzip, .deflate => {
this.* = .{
.zlib = try Zlib.ZlibReaderArrayList.initWithOptionsAndListAllocator(
buffer,
&body_out_str.list,
body_out_str.allocator,
default_allocator,
.{
// zlib.MAX_WBITS = 15
// to (de-)compress deflate format, use wbits = -zlib.MAX_WBITS
// to (de-)compress deflate format with headers we use wbits = 0 (we can detect the first byte using 120)
// to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
.windowBits = if (encoding == Encoding.gzip) Zlib.MAX_WBITS | 16 else (if (buffer.len > 1 and buffer[0] == 120) 0 else -Zlib.MAX_WBITS),
},
),
};
return;
},
.brotli => {
this.* = .{
.brotli = try Brotli.BrotliReaderArrayList.newWithOptions(
buffer,
&body_out_str.list,
body_out_str.allocator,
.{},
),
};
return;
},
else => @panic("Invalid encoding. This code should not be reachable"),
}
}
switch (this.*) {
.zlib => |reader| {
assert(reader.zlib.avail_in == 0);
reader.zlib.next_in = buffer.ptr;
reader.zlib.avail_in = @as(u32, @truncate(buffer.len));
const initial = body_out_str.list.items.len;
body_out_str.list.expandToCapacity();
if (body_out_str.list.capacity == initial) {
try body_out_str.list.ensureUnusedCapacity(body_out_str.allocator, 4096);
body_out_str.list.expandToCapacity();
}
reader.list = body_out_str.list;
reader.zlib.next_out = @ptrCast(&body_out_str.list.items[initial]);
reader.zlib.avail_out = @as(u32, @truncate(body_out_str.list.capacity - initial));
// we reset the total out so we can track how much we decompressed this time
reader.zlib.total_out = @truncate(initial);
},
.brotli => |reader| {
reader.input = buffer;
reader.total_in = 0;
const initial = body_out_str.list.items.len;
reader.list = body_out_str.list;
reader.total_out = @truncate(initial);
},
else => @panic("Invalid encoding. This code should not be reachable"),
}
}
pub fn readAll(this: *Decompressor, is_done: bool) !void {
switch (this.*) {
.zlib => |zlib| try zlib.readAll(),
.brotli => |brotli| try brotli.readAll(is_done),
.none => {},
}
}
};
// TODO: reduce the size of this struct
// Many of these fields can be moved to a packed struct and use less space
pub const InternalState = struct {
response_message_buffer: MutableString = undefined,
/// pending response is the temporary storage for the response headers, url and status code
/// this uses shared_response_headers_buf to store the headers
/// this will be turned null once the metadata is cloned
pending_response: ?picohttp.Response = null,
/// This is the cloned metadata containing the response headers, url and status code after the .headers phase are received
/// will be turned null once returned to the user (the ownership is transferred to the user)
/// this can happen after await fetch(...) and the body can continue streaming when this is already null
/// the user will receive only chunks of the body stored in body_out_str
cloned_metadata: ?HTTPResponseMetadata = null,
flags: InternalStateFlags = InternalStateFlags{},
transfer_encoding: Encoding = Encoding.identity,
encoding: Encoding = Encoding.identity,
content_encoding_i: u8 = std.math.maxInt(u8),
chunked_decoder: picohttp.phr_chunked_decoder = .{},
decompressor: Decompressor = .{ .none = {} },
stage: Stage = Stage.pending,
/// This is owned by the user and should not be freed here
body_out_str: ?*MutableString = null,
compressed_body: MutableString = undefined,
content_length: ?usize = null,
total_body_received: usize = 0,
request_body: []const u8 = "",
original_request_body: HTTPRequestBody = .{ .bytes = "" },
request_sent_len: usize = 0,
fail: ?anyerror = null,
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
certificate_info: ?CertificateInfo = null,
pub const InternalStateFlags = packed struct {
allow_keepalive: bool = true,
received_last_chunk: bool = false,
did_set_content_encoding: bool = false,
is_redirect_pending: bool = false,
is_libdeflate_fast_path_disabled: bool = false,
resend_request_body_on_redirect: bool = false,
};
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
return .{
.original_request_body = body,
.request_body = if (body == .bytes) body.bytes else "",
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.body_out_str = body_out_str,
.stage = Stage.pending,
.pending_response = null,
};
}
pub fn isChunkedEncoding(this: *InternalState) bool {
return this.transfer_encoding == Encoding.chunked;
}
pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void {
this.compressed_body.deinit();
this.response_message_buffer.deinit();
const body_msg = this.body_out_str;
if (body_msg) |body| body.reset();
this.decompressor.deinit();
// just in case we check and free to avoid leaks
if (this.cloned_metadata != null) {
this.cloned_metadata.?.deinit(allocator);
this.cloned_metadata = null;
}
// if exists we own this info
if (this.certificate_info) |info| {
this.certificate_info = null;
info.deinit(default_allocator);
}
this.original_request_body.deinit();
this.* = .{
.body_out_str = body_msg,
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
.original_request_body = .{ .bytes = "" },
.request_body = "",
.certificate_info = null,
.flags = .{},
};
}
pub fn getBodyBuffer(this: *InternalState) *MutableString {
if (this.encoding.isCompressed()) {
return &this.compressed_body;
}
return this.body_out_str.?;
}
fn isDone(this: *InternalState) bool {
if (this.isChunkedEncoding()) {
return this.flags.received_last_chunk;
}
if (this.content_length) |content_length| {
return this.total_body_received >= content_length;
}
// Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
return this.flags.received_last_chunk;
}
fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *MutableString, is_final_chunk: bool) !void {
defer this.compressed_body.reset();
var gzip_timer: std.time.Timer = undefined;
if (extremely_verbose)
gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
var still_needs_to_decompress = true;
if (FeatureFlags.isLibdeflateEnabled()) {
// Fast-path: use libdeflate
if (is_final_chunk and !this.flags.is_libdeflate_fast_path_disabled and this.encoding.canUseLibDeflate() and this.isDone()) libdeflate: {
this.flags.is_libdeflate_fast_path_disabled = true;
log("Decompressing {d} bytes with libdeflate\n", .{buffer.len});
var deflater = http_thread.deflater();
// gzip stores the size of the uncompressed data in the last 4 bytes of the stream
// But it's only valid if the stream is less than 4.7 GB, since it's 4 bytes.
// If we know that the stream is going to be larger than our
// pre-allocated buffer, then let's dynamically allocate the exact
// size.
if (this.encoding == Encoding.gzip and buffer.len > 16 and buffer.len < 1024 * 1024 * 1024) {
const estimated_size: u32 = @bitCast(buffer[buffer.len - 4 ..][0..4].*);
// Since this is arbtirary input from the internet, let's set an upper bound of 32 MB for the allocation size.
if (estimated_size > deflater.shared_buffer.len and estimated_size < 32 * 1024 * 1024) {
try body_out_str.list.ensureTotalCapacityPrecise(body_out_str.allocator, estimated_size);
const result = deflater.decompressor.decompress(buffer, body_out_str.list.allocatedSlice(), .gzip);
if (result.status == .success) {
body_out_str.list.items.len = result.written;
still_needs_to_decompress = false;
}
break :libdeflate;
}
}
const result = deflater.decompressor.decompress(buffer, &deflater.shared_buffer, switch (this.encoding) {
.gzip => .gzip,
.deflate => .deflate,
else => unreachable,
});
if (result.status == .success) {
try body_out_str.list.ensureTotalCapacityPrecise(body_out_str.allocator, result.written);
body_out_str.list.appendSliceAssumeCapacity(deflater.shared_buffer[0..result.written]);
still_needs_to_decompress = false;
}
}
}
// Slow path, or brotli: use the .decompressor
if (still_needs_to_decompress) {
log("Decompressing {d} bytes\n", .{buffer.len});
if (body_out_str.list.capacity == 0) {
const min = @min(@ceil(@as(f64, @floatFromInt(buffer.len)) * 1.5), @as(f64, 1024 * 1024 * 2));
try body_out_str.growBy(@max(@as(usize, @intFromFloat(min)), 32));
}
try this.decompressor.updateBuffers(this.encoding, buffer, body_out_str);
this.decompressor.readAll(this.isDone()) catch |err| {
if (this.isDone() or error.ShortRead != err) {
Output.prettyErrorln("<r><red>Decompression error: {s}<r>", .{bun.asByteSlice(@errorName(err))});
Output.flush();
return err;
}
};
}
if (extremely_verbose)
this.gzip_elapsed = gzip_timer.read();
}
fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString, is_final_chunk: bool) !void {
try this.decompressBytes(buffer.list.items, body_out_str, is_final_chunk);
}
pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_chunk: bool) !bool {
if (this.flags.is_redirect_pending) return false;
var body_out_str = this.body_out_str.?;
switch (this.encoding) {
Encoding.brotli, Encoding.gzip, Encoding.deflate => {
try this.decompress(buffer, body_out_str, is_final_chunk);
},
else => {
if (!body_out_str.owns(buffer.list.items)) {
body_out_str.append(buffer.list.items) catch |err| {
Output.prettyErrorln("<r><red>Failed to append to body buffer: {s}<r>", .{bun.asByteSlice(@errorName(err))});
Output.flush();
return err;
};
}
},
}
return this.body_out_str.?.list.items.len > 0;
}
};

View File

@@ -0,0 +1,36 @@
const bun = @import("root").bun;
const InitError = @import("./errors.zig").InitError;
const stringZ = bun.stringZ;
const Output = bun.Output;
const Global = bun.Global;
fn onInitErrorNoop(err: InitError, opts: InitOpts) noreturn {
switch (err) {
error.LoadCAFile => {
if (!bun.sys.existsZ(opts.abs_ca_file_name)) {
Output.err("HTTPThread", "failed to find CA file: '{s}'", .{opts.abs_ca_file_name});
} else {
Output.err("HTTPThread", "failed to load CA file: '{s}'", .{opts.abs_ca_file_name});
}
},
error.InvalidCAFile => {
Output.err("HTTPThread", "the CA file is invalid: '{s}'", .{opts.abs_ca_file_name});
},
error.InvalidCA => {
Output.err("HTTPThread", "the provided CA is invalid", .{});
},
error.FailedToOpenSocket => {
Output.errGeneric("failed to start HTTP client thread", .{});
},
}
Global.crash();
}
pub const InitOpts = struct {
ca: []stringZ = &.{},
abs_ca_file_name: stringZ = &.{},
for_install: bool = false,
onInitError: *const fn (err: InitError, opts: InitOpts) noreturn = &onInitErrorNoop,
};

View File

@@ -0,0 +1,16 @@
const bun = @import("root").bun;
const picohttp = bun.picohttp;
const std = @import("std");
pub const HTTPResponseMetadata = struct {
url: []const u8 = "",
owned_buf: []u8 = "",
response: picohttp.Response = .{},
pub fn deinit(this: *HTTPResponseMetadata, allocator: std.mem.Allocator) void {
if (this.owned_buf.len > 0) allocator.free(this.owned_buf);
if (this.response.headers.list.len > 0) allocator.free(this.response.headers.list);
this.owned_buf = &.{};
this.url = "";
this.response.headers = .{};
this.response.status = "";
}
};

View File

@@ -0,0 +1,323 @@
const bun = @import("root").bun;
const uws = bun.uws;
const BoringSSL = bun.BoringSSL;
const strings = bun.strings;
const SSLWrapper = @import("../../bun.js/api/bun/ssl_wrapper.zig").SSLWrapper;
const getHttpContext = @import("./thread.zig").getContext;
const http_thread = @import("./http/client/thread.zig").getHttpThread();
const NewHTTPContext = @import("./thread.zig").NewHTTPContext;
const HTTPClient = @import("../../http.zig").HTTPClient;
const SSLConfig = bun.server.ServerConfig.SSLConfig;
const HTTPCertError = @import("./errors.zig").HTTPCertError;
const log = bun.Output.scoped(.fetch, false);
const getTempHostname = @import("../../http.zig").getTempHostname;
const ProxyTunnel = struct {
wrapper: ?ProxyTunnelWrapper = null,
shutdown_err: anyerror = error.ConnectionClosed,
// active socket is the socket that is currently being used
socket: union(enum) {
tcp: NewHTTPContext(false).HTTPSocket,
ssl: NewHTTPContext(true).HTTPSocket,
none: void,
} = .{ .none = {} },
write_buffer: bun.io.StreamBuffer = .{},
ref_count: u32 = 1,
const ProxyTunnelWrapper = SSLWrapper(*HTTPClient);
usingnamespace bun.NewRefCounted(ProxyTunnel, _deinit, null);
fn onOpen(this: *HTTPClient) void {
this.state.response_stage = .proxy_handshake;
this.state.request_stage = .proxy_handshake;
if (this.proxy_tunnel) |proxy| {
proxy.ref();
defer proxy.deref();
if (proxy.wrapper) |*wrapper| {
var ssl_ptr = wrapper.ssl orelse return;
const _hostname = this.hostname orelse this.url.hostname;
var hostname: [:0]const u8 = "";
var hostname_needs_free = false;
const temp_hostname = getTempHostname();
if (!strings.isIPAddress(_hostname)) {
if (_hostname.len < temp_hostname.len) {
@memcpy(temp_hostname[0.._hostname.len], _hostname);
temp_hostname[_hostname.len] = 0;
hostname = temp_hostname[0.._hostname.len :0];
} else {
hostname = bun.default_allocator.dupeZ(u8, _hostname) catch unreachable;
hostname_needs_free = true;
}
}
defer if (hostname_needs_free) bun.default_allocator.free(hostname);
ssl_ptr.configureHTTPClient(hostname);
}
}
}
fn onData(this: *HTTPClient, decoded_data: []const u8) void {
if (decoded_data.len == 0) return;
log("onData decoded {}", .{decoded_data.len});
if (this.proxy_tunnel) |proxy| {
proxy.ref();
defer proxy.deref();
switch (this.state.response_stage) {
.body => {
if (decoded_data.len == 0) return;
const report_progress = this.handleResponseBody(decoded_data, false) catch |err| {
proxy.close(err);
return;
};
if (report_progress) {
switch (proxy.socket) {
.ssl => |socket| {
this.progressUpdate(true, getHttpContext(true), socket);
},
.tcp => |socket| {
this.progressUpdate(false, getHttpContext(false), socket);
},
.none => {},
}
return;
}
},
.body_chunk => {
if (decoded_data.len == 0) return;
const report_progress = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| {
proxy.close(err);
return;
};
if (report_progress) {
switch (proxy.socket) {
.ssl => |socket| {
this.progressUpdate(true, getHttpContext(true), socket);
},
.tcp => |socket| {
this.progressUpdate(false, getHttpContext(false), socket);
},
.none => {},
}
return;
}
},
.proxy_headers => {
switch (proxy.socket) {
.ssl => |socket| {
this.handleOnDataHeaders(true, decoded_data, getHttpContext(true), socket);
},
.tcp => |socket| {
this.handleOnDataHeaders(false, decoded_data, getHttpContext(false), socket);
},
.none => {},
}
},
else => {
this.state.pending_response = null;
proxy.close(error.UnexpectedData);
},
}
}
}
fn onHandshake(this: *HTTPClient, handshake_success: bool, ssl_error: uws.us_bun_verify_error_t) void {
if (this.proxy_tunnel) |proxy| {
proxy.ref();
defer proxy.deref();
this.state.response_stage = .proxy_headers;
this.state.request_stage = .proxy_headers;
this.state.request_sent_len = 0;
const handshake_error = HTTPCertError{
.error_no = ssl_error.error_no,
.code = if (ssl_error.code == null) "" else ssl_error.code[0..bun.len(ssl_error.code) :0],
.reason = if (ssl_error.code == null) "" else ssl_error.reason[0..bun.len(ssl_error.reason) :0],
};
if (handshake_success) {
// handshake completed but we may have ssl errors
this.flags.did_have_handshaking_error = handshake_error.error_no != 0;
if (this.flags.reject_unauthorized) {
// only reject the connection if reject_unauthorized == true
if (this.flags.did_have_handshaking_error) {
proxy.close(BoringSSL.getCertErrorFromNo(handshake_error.error_no));
return;
}
// if checkServerIdentity returns false, we dont call open this means that the connection was rejected
bun.assert(proxy.wrapper != null);
const ssl_ptr = proxy.wrapper.?.ssl orelse return;
switch (proxy.socket) {
.ssl => |socket| {
if (!this.checkServerIdentity(true, socket, handshake_error, ssl_ptr, false)) {
this.flags.did_have_handshaking_error = true;
this.unregisterAbortTracker();
return;
}
},
.tcp => |socket| {
if (!this.checkServerIdentity(false, socket, handshake_error, ssl_ptr, false)) {
this.flags.did_have_handshaking_error = true;
this.unregisterAbortTracker();
return;
}
},
.none => {},
}
}
switch (proxy.socket) {
.ssl => |socket| {
this.onWritable(true, true, socket);
},
.tcp => |socket| {
this.onWritable(true, false, socket);
},
.none => {},
}
} else {
// if we are here is because server rejected us, and the error_no is the cause of this
// if we set reject_unauthorized == false this means the server requires custom CA aka NODE_EXTRA_CA_CERTS
if (this.flags.did_have_handshaking_error) {
proxy.close(BoringSSL.getCertErrorFromNo(handshake_error.error_no));
return;
}
// if handshake_success it self is false, this means that the connection was rejected
proxy.close(error.ConnectionRefused);
return;
}
}
}
pub fn write(this: *HTTPClient, encoded_data: []const u8) void {
if (this.proxy_tunnel) |proxy| {
const written = switch (proxy.socket) {
.ssl => |socket| socket.write(encoded_data, true),
.tcp => |socket| socket.write(encoded_data, true),
.none => 0,
};
const pending = encoded_data[@intCast(written)..];
if (pending.len > 0) {
// lets flush when we are truly writable
proxy.write_buffer.write(pending) catch bun.outOfMemory();
}
}
}
fn onClose(this: *HTTPClient) void {
if (this.proxy_tunnel) |proxy| {
proxy.ref();
// defer the proxy deref the proxy tunnel may still be in use after triggering the close callback
defer http_thread.scheduleProxyDeref(proxy);
const err = proxy.shutdown_err;
switch (proxy.socket) {
.ssl => |socket| {
this.closeAndFail(err, true, socket);
},
.tcp => |socket| {
this.closeAndFail(err, false, socket);
},
.none => {},
}
proxy.detachSocket();
}
}
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: SSLConfig) void {
const proxy_tunnel = ProxyTunnel.new(.{});
var custom_options = ssl_options;
// we always request the cert so we can verify it and also we manually abort the connection if the hostname doesn't match
custom_options.reject_unauthorized = 0;
custom_options.request_cert = 1;
proxy_tunnel.wrapper = SSLWrapper(*HTTPClient).init(custom_options, true, .{
.onOpen = ProxyTunnel.onOpen,
.onData = ProxyTunnel.onData,
.onHandshake = ProxyTunnel.onHandshake,
.onClose = ProxyTunnel.onClose,
.write = ProxyTunnel.write,
.ctx = this,
}) catch |err| {
if (err == error.OutOfMemory) {
bun.outOfMemory();
}
// invalid TLS Options
proxy_tunnel.detachAndDeref();
this.closeAndFail(error.ConnectionRefused, is_ssl, socket);
return;
};
this.proxy_tunnel = proxy_tunnel;
if (is_ssl) {
proxy_tunnel.socket = .{ .ssl = socket };
} else {
proxy_tunnel.socket = .{ .tcp = socket };
}
proxy_tunnel.wrapper.?.start();
}
pub fn close(this: *ProxyTunnel, err: anyerror) void {
this.shutdown_err = err;
if (this.wrapper) |*wrapper| {
// fast shutdown the connection
_ = wrapper.shutdown(true);
}
}
pub fn onWritable(this: *ProxyTunnel, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
this.ref();
defer this.deref();
const encoded_data = this.write_buffer.slice();
if (encoded_data.len == 0) {
return;
}
const written = socket.write(encoded_data, true);
if (written == encoded_data.len) {
this.write_buffer.reset();
return;
}
this.write_buffer.cursor += @intCast(written);
if (this.wrapper) |*wrapper| {
// Cycle to through the SSL state machine
_ = wrapper.flush();
}
}
pub fn receiveData(this: *ProxyTunnel, buf: []const u8) void {
this.ref();
defer this.deref();
if (this.wrapper) |*wrapper| {
wrapper.receiveData(buf);
}
}
pub fn writeData(this: *ProxyTunnel, buf: []const u8) !usize {
if (this.wrapper) |*wrapper| {
return try wrapper.writeData(buf);
}
return 0;
}
pub fn detachSocket(this: *ProxyTunnel) void {
this.socket = .{ .none = {} };
}
pub fn detachAndDeref(this: *ProxyTunnel) void {
this.detachSocket();
this.deref();
}
fn _deinit(this: *ProxyTunnel) void {
this.socket = .{ .none = {} };
if (this.wrapper) |*wrapper| {
wrapper.deinit();
this.wrapper = null;
}
this.write_buffer.deinit();
this.destroy();
}
};

View File

@@ -0,0 +1,37 @@
const bun = @import("root").bun;
const std = @import("std");
const Environment = bun.Environment;
const FeatureFlags = bun.FeatureFlags;
const Sendfile = @import("./sendfile.zig").Sendfile;
pub const HTTPRequestBody = union(enum) {
bytes: []const u8,
sendfile: Sendfile,
stream: struct {
buffer: bun.io.StreamBuffer,
ended: bool,
has_backpressure: bool = false,
pub fn hasEnded(this: *@This()) bool {
return this.ended and this.buffer.isEmpty();
}
},
pub fn isStream(this: *const HTTPRequestBody) bool {
return this.* == .stream;
}
pub fn deinit(this: *HTTPRequestBody) void {
switch (this.*) {
.sendfile, .bytes => {},
.stream => |*stream| stream.buffer.deinit(),
}
}
pub fn len(this: *const HTTPRequestBody) usize {
return switch (this.*) {
.bytes => this.bytes.len,
.sendfile => this.sendfile.content_size,
// unknow amounts
.stream => std.math.maxInt(usize),
};
}
};

View File

@@ -0,0 +1,81 @@
const bun = @import("root").bun;
const JSC = bun.JSC;
const MutableString = bun.MutableString;
const HTTPResponseMetadata = @import("./metadata.zig").HTTPResponseMetadata;
const CertificateInfo = @import("./certificate_info.zig").CertificateInfo;
const AsyncHTTP = @import("./async_http.zig").AsyncHTTP;
pub const HTTPClientResult = struct {
body: ?*MutableString = null,
has_more: bool = false,
redirected: bool = false,
can_stream: bool = false,
fail: ?anyerror = null,
/// Owns the response metadata aka headers, url and status code
metadata: ?HTTPResponseMetadata = null,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
/// If is not chunked encoded and Content-Length is not provided this will be unknown
body_size: BodySize = .unknown,
certificate_info: ?CertificateInfo = null,
pub fn abortReason(this: *const HTTPClientResult) ?JSC.CommonAbortReason {
if (this.isTimeout()) {
return .Timeout;
}
if (this.isAbort()) {
return .UserAbort;
}
return null;
}
pub const BodySize = union(enum) {
total_received: usize,
content_length: usize,
unknown: void,
};
pub fn isSuccess(this: *const HTTPClientResult) bool {
return this.fail == null;
}
pub fn isTimeout(this: *const HTTPClientResult) bool {
return if (this.fail) |e| e == error.Timeout else false;
}
pub fn isAbort(this: *const HTTPClientResult) bool {
return if (this.fail) |e| (e == error.Aborted or e == error.AbortedBeforeConnecting) else false;
}
pub const Callback = struct {
ctx: *anyopaque,
function: Function,
pub const Function = *const fn (*anyopaque, *AsyncHTTP, HTTPClientResult) void;
pub fn run(self: Callback, async_http: *AsyncHTTP, result: HTTPClientResult) void {
self.function(self.ctx, async_http, result);
}
pub fn New(comptime Type: type, comptime callback: anytype) type {
return struct {
pub fn init(this: Type) Callback {
return Callback{
.ctx = this,
.function = @This().wrapped_callback,
};
}
pub fn wrapped_callback(ptr: *anyopaque, async_http: *AsyncHTTP, result: HTTPClientResult) void {
const casted = @as(Type, @ptrCast(@alignCast(ptr)));
@call(bun.callmod_inline, callback, .{ casted, async_http, result });
}
};
}
};
};

View File

@@ -0,0 +1,76 @@
const bun = @import("root").bun;
const std = @import("std");
const Environment = bun.Environment;
const FeatureFlags = bun.FeatureFlags;
pub const Sendfile = struct {
fd: bun.FileDescriptor,
remain: usize = 0,
offset: usize = 0,
content_size: usize = 0,
pub fn isEligible(url: bun.URL) bool {
if (comptime Environment.isWindows or !FeatureFlags.streaming_file_uploads_for_http_client) {
return false;
}
return url.isHTTP() and url.href.len > 0;
}
pub fn write(
this: *Sendfile,
socket_fd: bun.FileDescriptor,
) Status {
const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63)));
// TODO we should not need this int cast; improve the return type of `@min`
const adjusted_count = @as(u63, @intCast(adjusted_count_temporary));
if (Environment.isLinux) {
var signed_offset = @as(i64, @intCast(this.offset));
const begin = this.offset;
const val =
// this does the syscall directly, without libc
std.os.linux.sendfile(socket_fd, this.fd.cast(), &signed_offset, this.remain);
this.offset = @as(u66, @intCast(signed_offset));
const errcode = bun.C.getErrno(val);
this.remain -|= @as(u64, @intCast(this.offset -| begin));
if (errcode != .SUCCESS or this.remain == 0 or val == 0) {
if (errcode == .SUCCESS) {
return .{ .done = {} };
}
return .{ .err = bun.errnoToZigErr(errcode) };
}
} else if (Environment.isPosix) {
var sbytes: std.posix.off_t = adjusted_count;
const signed_offset = @as(i64, @bitCast(@as(u64, this.offset)));
const errcode = bun.C.getErrno(std.c.sendfile(
this.fd.cast(),
socket_fd,
signed_offset,
&sbytes,
null,
0,
));
const wrote = @as(u64, @intCast(sbytes));
this.offset +|= wrote;
this.remain -|= wrote;
if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) {
if (errcode == .SUCCESS) {
return .{ .done = {} };
}
return .{ .err = bun.errnoToZigErr(errcode) };
}
}
return .{ .again = {} };
}
pub const Status = union(enum) {
done: void,
err: anyerror,
again: void,
};
};

View File

@@ -0,0 +1,33 @@
const std = @import("std");
pub const Signals = struct {
header_progress: ?*std.atomic.Value(bool) = null,
body_streaming: ?*std.atomic.Value(bool) = null,
aborted: ?*std.atomic.Value(bool) = null,
cert_errors: ?*std.atomic.Value(bool) = null,
pub fn isEmpty(this: *const Signals) bool {
return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
}
pub const Store = struct {
header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
pub fn to(this: *Store) Signals {
return .{
.header_progress = &this.header_progress,
.body_streaming = &this.body_streaming,
.aborted = &this.aborted,
.cert_errors = &this.cert_errors,
};
}
};
pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals)) bool {
var ptr: *std.atomic.Value(bool) = @field(this, @tagName(field)) orelse return false;
return ptr.load(.monotonic);
}
};

973
src/http/client/thread.zig Normal file
View File

@@ -0,0 +1,973 @@
const bun = @import("root").bun;
const std = @import("std");
const Global = bun.Global;
const picohttp = bun.picohttp;
const BoringSSL = bun.BoringSSL;
const JSC = bun.JSC;
const MutableString = bun.MutableString;
const Environment = bun.Environment;
const DeadSocket = opaque {};
var dead_socket = @as(*DeadSocket, @ptrFromInt(1));
const HiveArray = bun.HiveArray;
const uws = bun.uws;
const FeatureFlags = bun.FeatureFlags;
const TaggedPointerUnion = bun.TaggedPointerUnion;
const InitError = @import("./errors.zig").InitError;
const InitOpts = @import("./init_options.zig").InitOpts;
const SSLConfig = bun.server.ServerConfig.SSLConfig;
const Output = bun.Output;
const assert = bun.assert;
const strings = bun.strings;
const Batch = bun.ThreadPool.Batch;
pub var http_thread: HTTPThread = undefined;
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
pub const Headers = JSC.WebCore.Headers;
const HTTPCertError = @import("./errors.zig").HTTPCertError;
const Queue = @import("./async_http.zig").Queue;
const ThreadlocalAsyncHTTP = @import("./async_http.zig").ThreadlocalAsyncHTTP;
const ProxyTunnel = @import("./proxy_tunnel.zig").ProxyTunnel;
const AsyncHTTP = @import("./async_http.zig").AsyncHTTP;
const socket_async_http_abort_tracker = AsyncHTTP.getSocketAsyncHTTPAbortTracker();
const log = Output.scoped(.fetch, false);
const HTTPClient = @import("../../http.zig").HTTPClient;
pub const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
const MAX_KEEPALIVE_HOSTNAME = 128;
pub fn getContext(comptime ssl: bool) *NewHTTPContext(ssl) {
return if (ssl) &http_thread.https_context else &http_thread.http_context;
}
pub fn getHttpThread() *HTTPThread {
return &http_thread;
}
pub fn NewHTTPContext(comptime ssl: bool) type {
return struct {
const pool_size = 64;
const PooledSocket = struct {
http_socket: HTTPSocket,
hostname_buf: [MAX_KEEPALIVE_HOSTNAME]u8 = undefined,
hostname_len: u8 = 0,
port: u16 = 0,
/// If you set `rejectUnauthorized` to `false`, the connection fails to verify,
did_have_handshaking_error_while_reject_unauthorized_is_false: bool = false,
};
pub fn markSocketAsDead(socket: HTTPSocket) void {
if (socket.ext(**anyopaque)) |ctx| {
ctx.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
}
}
fn terminateSocket(socket: HTTPSocket) void {
markSocketAsDead(socket);
socket.close(.failure);
}
fn closeSocket(socket: HTTPSocket) void {
markSocketAsDead(socket);
socket.close(.normal);
}
fn getTagged(ptr: *anyopaque) ActiveSocket {
return ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
}
pub fn getTaggedFromSocket(socket: HTTPSocket) ActiveSocket {
if (socket.ext(anyopaque)) |ctx| {
return getTagged(ctx);
}
return ActiveSocket.init(&dead_socket);
}
pending_sockets: HiveArray(PooledSocket, pool_size) = .empty,
us_socket_context: *uws.SocketContext,
const Context = @This();
pub const HTTPSocket = uws.NewSocketHandler(ssl);
pub fn context() *@This() {
return getContext(ssl);
}
const ActiveSocket = TaggedPointerUnion(.{
*DeadSocket,
HTTPClient,
PooledSocket,
});
const ssl_int = @as(c_int, @intFromBool(ssl));
pub fn sslCtx(this: *@This()) *BoringSSL.SSL_CTX {
if (comptime !ssl) {
unreachable;
}
return @as(*BoringSSL.SSL_CTX, @ptrCast(this.us_socket_context.getNativeHandle(true)));
}
pub fn deinit(this: *@This()) void {
this.us_socket_context.deinit(ssl);
uws.us_socket_context_free(@as(c_int, @intFromBool(ssl)), this.us_socket_context);
bun.default_allocator.destroy(this);
}
pub fn initWithClientConfig(this: *@This(), client: *HTTPClient) InitError!void {
if (!comptime ssl) {
@compileError("ssl only");
}
var opts = client.tls_props.?.asUSockets();
opts.request_cert = 1;
opts.reject_unauthorized = 0;
try this.initWithOpts(&opts);
}
fn initWithOpts(this: *@This(), opts: *const uws.us_bun_socket_context_options_t) InitError!void {
if (!comptime ssl) {
@compileError("ssl only");
}
var err: uws.create_bun_socket_error_t = .none;
const socket = uws.us_create_bun_socket_context(ssl_int, http_thread.loop.loop, @sizeOf(usize), opts.*, &err);
if (socket == null) {
return switch (err) {
.load_ca_file => error.LoadCAFile,
.invalid_ca_file => error.InvalidCAFile,
.invalid_ca => error.InvalidCA,
else => error.FailedToOpenSocket,
};
}
this.us_socket_context = socket.?;
this.sslCtx().setup();
HTTPSocket.configure(
this.us_socket_context,
false,
anyopaque,
Handler,
);
}
pub fn initWithThreadOpts(this: *@This(), init_opts: *const InitOpts) InitError!void {
if (!comptime ssl) {
@compileError("ssl only");
}
var opts: uws.us_bun_socket_context_options_t = .{
.ca = if (init_opts.ca.len > 0) @ptrCast(init_opts.ca) else null,
.ca_count = @intCast(init_opts.ca.len),
.ca_file_name = if (init_opts.abs_ca_file_name.len > 0) init_opts.abs_ca_file_name else null,
.request_cert = 1,
};
try this.initWithOpts(&opts);
}
pub fn init(this: *@This()) void {
if (comptime ssl) {
const opts: uws.us_bun_socket_context_options_t = .{
// we request the cert so we load root certs and can verify it
.request_cert = 1,
// we manually abort the connection if the hostname doesn't match
.reject_unauthorized = 0,
};
var err: uws.create_bun_socket_error_t = .none;
this.us_socket_context = uws.us_create_bun_socket_context(ssl_int, http_thread.loop.loop, @sizeOf(usize), opts, &err).?;
this.sslCtx().setup();
} else {
const opts: uws.us_socket_context_options_t = .{};
this.us_socket_context = uws.us_create_socket_context(ssl_int, http_thread.loop.loop, @sizeOf(usize), opts).?;
}
HTTPSocket.configure(
this.us_socket_context,
false,
anyopaque,
Handler,
);
}
/// Attempt to keep the socket alive by reusing it for another request.
/// If no space is available, close the socket.
///
/// If `did_have_handshaking_error_while_reject_unauthorized_is_false`
/// is set, then we can only reuse the socket for HTTP Keep Alive if
/// `reject_unauthorized` is set to `false`.
pub fn releaseSocket(this: *@This(), socket: HTTPSocket, did_have_handshaking_error_while_reject_unauthorized_is_false: bool, hostname: []const u8, port: u16) void {
// log("releaseSocket(0x{})", .{bun.fmt.hexIntUpper(@intFromPtr(socket.socket))});
if (comptime Environment.allow_assert) {
assert(!socket.isClosed());
assert(!socket.isShutdown());
assert(socket.isEstablished());
}
assert(hostname.len > 0);
assert(port > 0);
if (hostname.len <= MAX_KEEPALIVE_HOSTNAME and !socket.isClosedOrHasError() and socket.isEstablished()) {
if (this.pending_sockets.get()) |pending| {
if (socket.ext(**anyopaque)) |ctx| {
ctx.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr());
}
socket.flush();
socket.timeout(0);
socket.setTimeoutMinutes(5);
pending.http_socket = socket;
pending.did_have_handshaking_error_while_reject_unauthorized_is_false = did_have_handshaking_error_while_reject_unauthorized_is_false;
@memcpy(pending.hostname_buf[0..hostname.len], hostname);
pending.hostname_len = @as(u8, @truncate(hostname.len));
pending.port = port;
// log("Keep-Alive release {s}:{d} (0x{})", .{ hostname, port, @intFromPtr(socket.socket) });
return;
}
}
closeSocket(socket);
}
pub const Handler = struct {
pub fn onOpen(
ptr: *anyopaque,
socket: HTTPSocket,
) void {
const active = getTagged(ptr);
if (active.get(HTTPClient)) |client| {
if (client.onOpen(comptime ssl, socket)) |_| {
return;
} else |_| {
log("Unable to open socket", .{});
terminateSocket(socket);
return;
}
}
if (active.get(PooledSocket)) |pooled| {
addMemoryBackToPool(pooled);
return;
}
log("Unexpected open on unknown socket", .{});
terminateSocket(socket);
}
pub fn onHandshake(
ptr: *anyopaque,
socket: HTTPSocket,
success: i32,
ssl_error: uws.us_bun_verify_error_t,
) void {
const handshake_success = if (success == 1) true else false;
const handshake_error = HTTPCertError{
.error_no = ssl_error.error_no,
.code = if (ssl_error.code == null) "" else ssl_error.code[0..bun.len(ssl_error.code) :0],
.reason = if (ssl_error.code == null) "" else ssl_error.reason[0..bun.len(ssl_error.reason) :0],
};
const active = getTagged(ptr);
if (active.get(HTTPClient)) |client| {
// handshake completed but we may have ssl errors
client.flags.did_have_handshaking_error = handshake_error.error_no != 0;
if (handshake_success) {
if (client.flags.reject_unauthorized) {
// only reject the connection if reject_unauthorized == true
if (client.flags.did_have_handshaking_error) {
client.closeAndFail(BoringSSL.getCertErrorFromNo(handshake_error.error_no), comptime ssl, socket);
return;
}
// if checkServerIdentity returns false, we dont call open this means that the connection was rejected
const ssl_ptr = @as(*BoringSSL.SSL, @ptrCast(socket.getNativeHandle()));
if (!client.checkServerIdentity(comptime ssl, socket, handshake_error, ssl_ptr, true)) {
client.flags.did_have_handshaking_error = true;
client.unregisterAbortTracker();
if (!socket.isClosed()) terminateSocket(socket);
return;
}
}
return client.firstCall(comptime ssl, socket);
} else {
// if we are here is because server rejected us, and the error_no is the cause of this
// if we set reject_unauthorized == false this means the server requires custom CA aka NODE_EXTRA_CA_CERTS
if (client.flags.did_have_handshaking_error) {
client.closeAndFail(BoringSSL.getCertErrorFromNo(handshake_error.error_no), comptime ssl, socket);
return;
}
// if handshake_success it self is false, this means that the connection was rejected
client.closeAndFail(error.ConnectionRefused, comptime ssl, socket);
return;
}
}
if (socket.isClosed()) {
markSocketAsDead(socket);
if (active.get(PooledSocket)) |pooled| {
addMemoryBackToPool(pooled);
}
return;
}
if (handshake_success) {
if (active.is(PooledSocket)) {
// Allow pooled sockets to be reused if the handshake was successful.
socket.setTimeout(0);
socket.setTimeoutMinutes(5);
return;
}
}
if (active.get(PooledSocket)) |pooled| {
addMemoryBackToPool(pooled);
}
terminateSocket(socket);
}
pub fn onClose(
ptr: *anyopaque,
socket: HTTPSocket,
_: c_int,
_: ?*anyopaque,
) void {
const tagged = getTagged(ptr);
markSocketAsDead(socket);
if (tagged.get(HTTPClient)) |client| {
return client.onClose(comptime ssl, socket);
}
if (tagged.get(PooledSocket)) |pooled| {
addMemoryBackToPool(pooled);
}
return;
}
fn addMemoryBackToPool(pooled: *PooledSocket) void {
assert(context().pending_sockets.put(pooled));
}
pub fn onData(
ptr: *anyopaque,
socket: HTTPSocket,
buf: []const u8,
) void {
const tagged = getTagged(ptr);
if (tagged.get(HTTPClient)) |client| {
return client.onData(
comptime ssl,
buf,
getContext(ssl),
socket,
);
} else if (tagged.is(PooledSocket)) {
// trailing zero is fine to ignore
if (strings.eqlComptime(buf, end_of_chunked_http1_1_encoding_response_body)) {
return;
}
log("Unexpected data on socket", .{});
return;
}
log("Unexpected data on unknown socket", .{});
terminateSocket(socket);
}
pub fn onWritable(
ptr: *anyopaque,
socket: HTTPSocket,
) void {
const tagged = getTagged(ptr);
if (tagged.get(HTTPClient)) |client| {
return client.onWritable(
false,
comptime ssl,
socket,
);
} else if (tagged.is(PooledSocket)) {
// it's a keep-alive socket
} else {
// don't know what this is, let's close it
log("Unexpected writable on socket", .{});
terminateSocket(socket);
}
}
pub fn onLongTimeout(
ptr: *anyopaque,
socket: HTTPSocket,
) void {
const tagged = getTagged(ptr);
if (tagged.get(HTTPClient)) |client| {
return client.onTimeout(comptime ssl, socket);
} else if (tagged.get(PooledSocket)) |pooled| {
// If a socket has been sitting around for 5 minutes
// Let's close it and remove it from the pool.
addMemoryBackToPool(pooled);
}
terminateSocket(socket);
}
pub fn onConnectError(
ptr: *anyopaque,
socket: HTTPSocket,
_: c_int,
) void {
const tagged = getTagged(ptr);
markSocketAsDead(socket);
if (tagged.get(HTTPClient)) |client| {
client.onConnectError();
} else if (tagged.get(PooledSocket)) |pooled| {
addMemoryBackToPool(pooled);
}
// us_connecting_socket_close is always called internally by uSockets
}
pub fn onEnd(
_: *anyopaque,
socket: HTTPSocket,
) void {
// TCP fin must be closed, but we must keep the original tagged
// pointer so that their onClose callback is called.
//
// Three possible states:
// 1. HTTP Keep-Alive socket: it must be removed from the pool
// 2. HTTP Client socket: it might need to be retried
// 3. Dead socket: it is already marked as dead
socket.close(.failure);
}
};
fn existingSocket(this: *@This(), reject_unauthorized: bool, hostname: []const u8, port: u16) ?HTTPSocket {
if (hostname.len > MAX_KEEPALIVE_HOSTNAME)
return null;
var iter = this.pending_sockets.available.iterator(.{ .kind = .unset });
while (iter.next()) |pending_socket_index| {
var socket = this.pending_sockets.at(@as(u16, @intCast(pending_socket_index)));
if (socket.port != port) {
continue;
}
if (socket.did_have_handshaking_error_while_reject_unauthorized_is_false and reject_unauthorized) {
continue;
}
if (strings.eqlLong(socket.hostname_buf[0..socket.hostname_len], hostname, true)) {
const http_socket = socket.http_socket;
assert(context().pending_sockets.put(socket));
if (http_socket.isClosed()) {
markSocketAsDead(http_socket);
continue;
}
if (http_socket.isShutdown() or http_socket.getError() != 0) {
terminateSocket(http_socket);
continue;
}
log("+ Keep-Alive reuse {s}:{d}", .{ hostname, port });
return http_socket;
}
}
return null;
}
pub fn connectSocket(this: *@This(), client: *HTTPClient, socket_path: []const u8) !HTTPSocket {
client.connected_url = if (client.http_proxy) |proxy| proxy else client.url;
const socket = try HTTPSocket.connectUnixAnon(
socket_path,
this.us_socket_context,
ActiveSocket.init(client).ptr(),
false, // dont allow half-open sockets
);
client.allow_retry = false;
return socket;
}
pub fn connect(this: *@This(), client: *HTTPClient, hostname_: []const u8, port: u16) !HTTPSocket {
const hostname = if (FeatureFlags.hardcode_localhost_to_127_0_0_1 and strings.eqlComptime(hostname_, "localhost"))
"127.0.0.1"
else
hostname_;
client.connected_url = if (client.http_proxy) |proxy| proxy else client.url;
client.connected_url.hostname = hostname;
if (client.isKeepAlivePossible()) {
if (this.existingSocket(client.flags.reject_unauthorized, hostname, port)) |sock| {
if (sock.ext(**anyopaque)) |ctx| {
ctx.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
}
client.allow_retry = true;
try client.onOpen(comptime ssl, sock);
if (comptime ssl) {
client.firstCall(comptime ssl, sock);
}
return sock;
}
}
const socket = try HTTPSocket.connectAnon(
hostname,
port,
this.us_socket_context,
ActiveSocket.init(client).ptr(),
false,
);
client.allow_retry = false;
return socket;
}
};
}
pub const HTTPThread = struct {
loop: *JSC.MiniEventLoop,
http_context: NewHTTPContext(false),
https_context: NewHTTPContext(true),
queued_tasks: Queue = Queue{},
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},
queued_shutdowns_lock: bun.Mutex = .{},
queued_writes_lock: bun.Mutex = .{},
queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
timer: std.time.Timer,
lazy_libdeflater: ?*LibdeflateState = null,
lazy_request_body_buffer: ?*HeapRequestBodyBuffer = null,
pub const HeapRequestBodyBuffer = struct {
buffer: [512 * 1024]u8 = undefined,
fixed_buffer_allocator: std.heap.FixedBufferAllocator,
pub usingnamespace bun.New(@This());
pub fn init() *@This() {
var this = HeapRequestBodyBuffer.new(.{
.fixed_buffer_allocator = undefined,
});
this.fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(&this.buffer);
return this;
}
pub fn put(this: *@This()) void {
if (http_thread.lazy_request_body_buffer == null) {
// This case hypothetically should never happen
this.fixed_buffer_allocator.reset();
http_thread.lazy_request_body_buffer = this;
} else {
this.deinit();
}
}
pub fn deinit(this: *@This()) void {
this.destroy();
}
};
pub const RequestBodyBuffer = union(enum) {
heap: *HeapRequestBodyBuffer,
stack: std.heap.StackFallbackAllocator(request_body_send_stack_buffer_size),
pub fn deinit(this: *@This()) void {
switch (this.*) {
.heap => |heap| heap.put(),
.stack => {},
}
}
pub fn allocatedSlice(this: *@This()) []u8 {
return switch (this.*) {
.heap => |heap| &heap.buffer,
.stack => |*stack| &stack.buffer,
};
}
pub fn allocator(this: *@This()) std.mem.Allocator {
return switch (this.*) {
.heap => |heap| heap.fixed_buffer_allocator.allocator(),
.stack => |*stack| stack.get(),
};
}
pub fn toArrayList(this: *@This()) std.ArrayList(u8) {
var arraylist = std.ArrayList(u8).fromOwnedSlice(this.allocator(), this.allocatedSlice());
arraylist.items.len = 0;
return arraylist;
}
};
const threadlog = Output.scoped(.HTTPThread, true);
const WriteMessage = struct {
data: []const u8,
async_http_id: u32,
flags: packed struct {
is_tls: bool,
ended: bool,
},
};
const ShutdownMessage = struct {
async_http_id: u32,
is_tls: bool,
};
pub const LibdeflateState = struct {
decompressor: *bun.libdeflate.Decompressor = undefined,
shared_buffer: [512 * 1024]u8 = undefined,
pub usingnamespace bun.New(@This());
};
const request_body_send_stack_buffer_size = 32 * 1024;
pub inline fn getRequestBodySendBuffer(this: *@This(), estimated_size: usize) RequestBodyBuffer {
if (estimated_size >= request_body_send_stack_buffer_size) {
if (this.lazy_request_body_buffer == null) {
log("Allocating HeapRequestBodyBuffer due to {d} bytes request body", .{estimated_size});
return .{
.heap = HeapRequestBodyBuffer.init(),
};
}
return .{ .heap = bun.take(&this.lazy_request_body_buffer).? };
}
return .{
.stack = std.heap.stackFallback(request_body_send_stack_buffer_size, bun.default_allocator),
};
}
pub fn deflater(this: *@This()) *LibdeflateState {
if (this.lazy_libdeflater == null) {
this.lazy_libdeflater = LibdeflateState.new(.{
.decompressor = bun.libdeflate.Decompressor.alloc() orelse bun.outOfMemory(),
});
}
return this.lazy_libdeflater.?;
}
fn initOnce(opts: *const InitOpts) void {
http_thread = .{
.loop = undefined,
.http_context = .{
.us_socket_context = undefined,
},
.https_context = .{
.us_socket_context = undefined,
},
.timer = std.time.Timer.start() catch unreachable,
};
bun.libdeflate.load();
const thread = std.Thread.spawn(
.{
.stack_size = bun.default_thread_stack_size,
},
onStart,
.{opts.*},
) catch |err| Output.panic("Failed to start HTTP Client thread: {s}", .{@errorName(err)});
thread.detach();
}
var init_once = bun.once(initOnce);
pub fn init(opts: *const InitOpts) void {
init_once.call(.{opts});
}
pub fn onStart(opts: InitOpts) void {
Output.Source.configureNamedThread("HTTP Client");
const loop = bun.JSC.MiniEventLoop.initGlobal(null);
if (Environment.isWindows) {
_ = std.process.getenvW(comptime bun.strings.w("SystemRoot")) orelse {
bun.Output.errGeneric("The %SystemRoot% environment variable is not set. Bun needs this set in order for network requests to work.", .{});
Global.crash();
};
}
http_thread.loop = loop;
http_thread.http_context.init();
http_thread.https_context.initWithThreadOpts(&opts) catch |err| opts.onInitError(err, opts);
http_thread.has_awoken.store(true, .monotonic);
http_thread.processEvents();
}
pub fn connect(this: *@This(), client: *HTTPClient, comptime is_ssl: bool) !NewHTTPContext(is_ssl).HTTPSocket {
if (client.unix_socket_path.length() > 0) {
return try this.context(is_ssl).connectSocket(client, client.unix_socket_path.slice());
}
if (comptime is_ssl) {
const needs_own_context = client.tls_props != null and client.tls_props.?.requires_custom_request_ctx;
if (needs_own_context) {
var requested_config = client.tls_props.?;
for (custom_ssl_context_map.keys()) |other_config| {
if (requested_config.isSame(other_config)) {
// we free the callers config since we have a existing one
if (requested_config != client.tls_props) {
requested_config.deinit();
bun.default_allocator.destroy(requested_config);
}
client.tls_props = other_config;
if (client.http_proxy) |url| {
return try custom_ssl_context_map.get(other_config).?.connect(client, url.hostname, url.getPortAuto());
} else {
return try custom_ssl_context_map.get(other_config).?.connect(client, client.url.hostname, client.url.getPortAuto());
}
}
}
// we need the config so dont free it
var custom_context = try bun.default_allocator.create(NewHTTPContext(is_ssl));
custom_context.initWithClientConfig(client) catch |err| {
client.tls_props = null;
requested_config.deinit();
bun.default_allocator.destroy(requested_config);
bun.default_allocator.destroy(custom_context);
// TODO: these error names reach js. figure out how they should be handled
return switch (err) {
error.FailedToOpenSocket => |e| e,
error.InvalidCA => error.FailedToOpenSocket,
error.InvalidCAFile => error.FailedToOpenSocket,
error.LoadCAFile => error.FailedToOpenSocket,
};
};
try custom_ssl_context_map.put(requested_config, custom_context);
// We might deinit the socket context, so we disable keepalive to make sure we don't
// free it while in use.
client.flags.disable_keepalive = true;
if (client.http_proxy) |url| {
// https://github.com/oven-sh/bun/issues/11343
if (url.protocol.len == 0 or strings.eqlComptime(url.protocol, "https") or strings.eqlComptime(url.protocol, "http")) {
return try this.context(is_ssl).connect(client, url.hostname, url.getPortAuto());
}
return error.UnsupportedProxyProtocol;
}
return try custom_context.connect(client, client.url.hostname, client.url.getPortAuto());
}
}
if (client.http_proxy) |url| {
if (url.href.len > 0) {
// https://github.com/oven-sh/bun/issues/11343
if (url.protocol.len == 0 or strings.eqlComptime(url.protocol, "https") or strings.eqlComptime(url.protocol, "http")) {
return try this.context(is_ssl).connect(client, url.hostname, url.getPortAuto());
}
return error.UnsupportedProxyProtocol;
}
}
return try this.context(is_ssl).connect(client, client.url.hostname, client.url.getPortAuto());
}
pub fn context(this: *@This(), comptime is_ssl: bool) *NewHTTPContext(is_ssl) {
return if (is_ssl) &this.https_context else &this.http_context;
}
fn drainEvents(this: *@This()) void {
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
for (this.queued_shutdowns.items) |http| {
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
if (http.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
// do a fast shutdown here since we are aborting and we dont want to wait for the close_notify from the other side
socket.close(.failure);
} else {
const socket = uws.SocketTCP.fromAny(socket_ptr.value);
socket.close(.failure);
}
}
}
this.queued_shutdowns.clearRetainingCapacity();
}
{
this.queued_writes_lock.lock();
defer this.queued_writes_lock.unlock();
for (this.queued_writes.items) |write| {
const ended = write.flags.ended;
defer if (!strings.eqlComptime(write.data, end_of_chunked_http1_1_encoding_response_body) and write.data.len > 0) {
// "0\r\n\r\n" is always a static so no need to free
bun.default_allocator.free(write.data);
};
if (socket_async_http_abort_tracker.get(write.async_http_id)) |socket_ptr| {
if (write.flags.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr);
if (socket.isClosed() or socket.isShutdown()) {
continue;
}
const tagged = NewHTTPContext(true).getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
if (client.state.original_request_body == .stream) {
var stream = &client.state.original_request_body.stream;
if (write.data.len > 0) {
stream.buffer.write(write.data) catch {};
}
stream.ended = ended;
if (!stream.has_backpressure) {
client.onWritable(
false,
true,
socket,
);
}
}
}
} else {
const socket = uws.SocketTCP.fromAny(socket_ptr);
if (socket.isClosed() or socket.isShutdown()) {
continue;
}
const tagged = NewHTTPContext(false).getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
if (client.state.original_request_body == .stream) {
var stream = &client.state.original_request_body.stream;
if (write.data.len > 0) {
stream.buffer.write(write.data) catch {};
}
stream.ended = ended;
if (!stream.has_backpressure) {
client.onWritable(
false,
false,
socket,
);
}
}
}
}
}
}
this.queued_writes.clearRetainingCapacity();
}
while (this.queued_proxy_deref.popOrNull()) |http| {
http.deref();
}
var count: usize = 0;
var active = AsyncHTTP.active_requests_count.load(.monotonic);
const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
if (active >= max) return;
defer {
if (comptime Environment.allow_assert) {
if (count > 0)
log("Processed {d} tasks\n", .{count});
}
}
while (this.queued_tasks.pop()) |http| {
var cloned = ThreadlocalAsyncHTTP.new(.{
.async_http = http.*,
});
cloned.async_http.real = http;
cloned.async_http.onStart();
if (comptime Environment.allow_assert) {
count += 1;
}
active += 1;
if (active >= max) break;
}
}
fn processEvents(this: *@This()) noreturn {
if (comptime Environment.isPosix) {
this.loop.loop.num_polls = @max(2, this.loop.loop.num_polls);
} else if (comptime Environment.isWindows) {
this.loop.loop.inc();
} else {
@compileError("TODO:");
}
while (true) {
this.drainEvents();
var start_time: i128 = 0;
if (comptime Environment.isDebug) {
start_time = std.time.nanoTimestamp();
}
Output.flush();
this.loop.loop.inc();
this.loop.loop.tick();
this.loop.loop.dec();
// this.loop.run();
if (comptime Environment.isDebug) {
const end = std.time.nanoTimestamp();
threadlog("Waited {any}\n", .{std.fmt.fmtDurationSigned(@as(i64, @truncate(end - start_time)))});
Output.flush();
}
}
}
pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
this.queued_shutdowns.append(bun.default_allocator, .{
.async_http_id = http.async_http_id,
.is_tls = http.client.isHTTPS(),
}) catch bun.outOfMemory();
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, data: []const u8, ended: bool) void {
{
this.queued_writes_lock.lock();
defer this.queued_writes_lock.unlock();
this.queued_writes.append(bun.default_allocator, .{
.async_http_id = http.async_http_id,
.data = data,
.flags = .{
.is_tls = http.client.isHTTPS(),
.ended = ended,
},
}) catch bun.outOfMemory();
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
pub fn scheduleProxyDeref(this: *@This(), proxy: *ProxyTunnel) void {
// this is always called on the http thread
{
this.queued_proxy_deref.append(bun.default_allocator, proxy) catch bun.outOfMemory();
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
pub fn wakeup(this: *@This()) void {
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
pub fn schedule(this: *@This(), batch: Batch) void {
if (batch.len == 0)
return;
{
var batch_ = batch;
while (batch_.pop()) |task| {
const http: *AsyncHTTP = @fieldParentPtr("task", task);
this.queued_tasks.push(http);
}
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
};