mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 22:01:47 +00:00
Compare commits
4 Commits
claude/fix
...
ciro/http-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec6601eb41 | ||
|
|
da8c2273e4 | ||
|
|
15a89b98c6 | ||
|
|
682aa09944 |
2560
src/http.zig
2560
src/http.zig
File diff suppressed because it is too large
Load Diff
567
src/http/client/async_http.zig
Normal file
567
src/http/client/async_http.zig
Normal file
@@ -0,0 +1,567 @@
|
||||
const bun = @import("root").bun;
|
||||
const std = @import("std");
|
||||
const string = bun.string;
|
||||
const Environment = bun.Environment;
|
||||
const SSLConfig = bun.server.ServerConfig.SSLConfig;
|
||||
const picohttp = bun.picohttp;
|
||||
const JSC = bun.JSC;
|
||||
const assert = bun.assert;
|
||||
const Log = bun.logger.Log;
|
||||
const DotEnv = bun.DotEnv;
|
||||
const Task = bun.ThreadPool.Task;
|
||||
const Loc = bun.logger.Loc;
|
||||
const Batch = bun.ThreadPool.Batch;
|
||||
const FeatureFlags = bun.FeatureFlags;
|
||||
const MutableString = bun.MutableString;
|
||||
const Headers = JSC.WebCore.Headers;
|
||||
pub const UnboundedQueue = bun.UnboundedQueue;
|
||||
pub const Queue = UnboundedQueue(AsyncHTTP, .next);
|
||||
pub const ShutdownQueue = UnboundedQueue(AsyncHTTP, .next);
|
||||
pub const RequestWriteQueue = UnboundedQueue(AsyncHTTP, .next);
|
||||
const HTTPRequestBody = @import("./request_body.zig").HTTPRequestBody;
|
||||
const Method = @import("../method.zig").Method;
|
||||
const URL = bun.URL;
|
||||
const ThreadPool = bun.ThreadPool;
|
||||
const uws = bun.uws;
|
||||
const PercentEncoding = @import("../../url.zig").PercentEncoding;
|
||||
const http_thread = @import("./thread.zig").getHttpThread();
|
||||
const Signals = @import("./signals.zig").Signals;
|
||||
const HTTPClient = @import("../../http.zig").HTTPClient;
|
||||
var async_http_id_monotonic: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
|
||||
const default_allocator = bun.default_allocator;
|
||||
var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(default_allocator);
|
||||
const log = bun.Output.scoped(.fetch, false);
|
||||
|
||||
const HTTPCallbackPair = .{ *AsyncHTTP, HTTPClientResult };
|
||||
pub const HTTPChannel = @import("../../sync.zig").Channel(HTTPCallbackPair, .{ .Static = 1000 });
|
||||
const HTTPThread = @import("./thread.zig").HTTPThread;
|
||||
// 32 pointers much cheaper than 1000 pointers
/// Single-result channel used by `AsyncHTTP.sendSync` to block the calling
/// thread until the HTTP thread hands back one `HTTPClientResult`.
const SingleHTTPChannel = struct {
    const SingleHTTPCHannel_ = @import("../../sync.zig").Channel(HTTPClientResult, .{ .Static = 8 });
    channel: SingleHTTPCHannel_,

    /// No per-use state to clear; present so this type satisfies the
    /// channel-owner interface.
    pub fn reset(_: *@This()) void {}

    pub fn init() SingleHTTPChannel {
        return SingleHTTPChannel{ .channel = SingleHTTPCHannel_.init() };
    }
};
|
||||
|
||||
/// Pairs an `AsyncHTTP` request with the channel its result should be
/// delivered on. `callback` recovers the owning context from the embedded
/// `http` field via `@fieldParentPtr`, so `http` must stay embedded by value.
pub const HTTPChannelContext = struct {
    http: AsyncHTTP = undefined,
    channel: *HTTPChannel,

    pub fn callback(data: HTTPCallbackPair) void {
        // data.@"0" is the *AsyncHTTP embedded in this context.
        var this: *HTTPChannelContext = @fieldParentPtr("http", data.@"0");
        // NOTE(review): a failed writeItem (channel full/closed) is treated
        // as unreachable here — confirm the channel is sized for the load.
        this.channel.writeItem(data) catch unreachable;
    }
};
|
||||
|
||||
/// Verbosity level for HTTP request/response logging.
pub const HTTPVerboseLevel = enum {
    none,
    headers,
    curl,
};
|
||||
|
||||
/// Associates an in-flight socket with an `AsyncHTTP` id so an abort signal
/// can later find and close the socket.
pub fn registerAbortTracker(
    async_http_id: u32,
    socket: uws.InternalSocket,
) void {
    // NOTE(review): an OOM on the map insert is treated as unreachable —
    // the process would rather crash than silently drop the tracker.
    socket_async_http_abort_tracker.put(async_http_id, socket) catch unreachable;
}
|
||||
|
||||
/// Removes the socket tracked for `async_http_id`, if any. Safe to call
/// when the id was never registered (the swapRemove result is ignored).
pub fn unregisterAbortTracker(
    async_http_id: u32,
) void {
    _ = socket_async_http_abort_tracker.swapRemove(async_http_id);
}
|
||||
|
||||
const HTTPClientResult = @import("./result.zig").HTTPClientResult;
|
||||
/// Exposes the module-global id -> socket abort-tracker map to other files.
/// The map is not synchronized here; callers are expected to touch it only
/// from the HTTP thread.
pub fn getSocketAsyncHTTPAbortTracker() *std.AutoArrayHashMap(u32, uws.InternalSocket) {
    return &socket_async_http_abort_tracker;
}
|
||||
// Exists for heap stats reasons.
/// Heap wrapper around `AsyncHTTP` created via `bun.New` so allocations show
/// up under this named type in heap statistics. `onAsyncHTTPCallback`
/// recovers and destroys this wrapper via `@fieldParentPtr("async_http", ...)`.
pub const ThreadlocalAsyncHTTP = struct {
    async_http: AsyncHTTP,

    pub usingnamespace bun.New(@This());
};
|
||||
|
||||
/// Redirect policy matching the Fetch API's `redirect` option.
pub const FetchRedirect = enum(u8) {
    follow,
    manual,
    @"error",

    /// Maps the string form ("follow" | "manual" | "error") to the enum.
    pub const Map = bun.ComptimeStringMap(FetchRedirect, .{
        .{ "follow", .follow },
        .{ "manual", .manual },
        .{ "error", .@"error" },
    });
};
|
||||
|
||||
/// An HTTP request that runs on the shared HTTP thread. Construct with
/// `init` (or `initSync`), enqueue via `schedule`/`sendSync`; the result is
/// delivered through `result_callback` when the client finishes.
///
/// NOTE(review): `Encoding` (used by `response_encoding`) is not imported at
/// the top of this file — confirm it is in scope after the refactor.
pub const AsyncHTTP = struct {
    request: ?picohttp.Request = null,
    response: ?picohttp.Response = null,
    request_headers: Headers.Entry.List = .empty,
    response_headers: Headers.Entry.List = .empty,
    // Owned by the caller; response body bytes are accumulated here.
    response_buffer: *MutableString,
    request_body: HTTPRequestBody = .{ .bytes = "" },
    allocator: std.mem.Allocator,
    request_header_buf: string = "",
    method: Method = Method.GET,
    url: URL,
    http_proxy: ?URL = null,
    // For sendSync: points at the caller-owned AsyncHTTP this one mirrors.
    real: ?*AsyncHTTP = null,
    // Intrusive link used by the UnboundedQueue declarations above.
    next: ?*AsyncHTTP = null,

    task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP },
    result_callback: HTTPClientResult.Callback = undefined,

    redirected: bool = false,

    response_encoding: Encoding = Encoding.identity,
    verbose: HTTPVerboseLevel = .none,

    client: HTTPClient = undefined,
    waitingDeffered: bool = false, // (sic) field name kept for compatibility
    finalized: bool = false,
    err: ?anyerror = null,
    // Non-zero only for requests that can be aborted; see init().
    async_http_id: u32 = 0,

    state: AtomicState = AtomicState.init(State.pending),
    // Holds a start timestamp after onStart; converted to a duration in
    // onAsyncHTTPCallback.
    elapsed: u64 = 0,
    gzip_elapsed: u64 = 0,

    signals: Signals = .{},

    pub var active_requests_count = std.atomic.Value(usize).init(0);
    pub var max_simultaneous_requests = std.atomic.Value(usize).init(256);

    /// Reads BUN_CONFIG_MAX_HTTP_REQUESTS and, when valid (1..65535), stores
    /// it into `max_simultaneous_requests`. Logs an error for unparseable
    /// values and a warning for 0; both leave the current limit unchanged.
    pub fn loadEnv(allocator: std.mem.Allocator, logger: *Log, env: *DotEnv.Loader) void {
        if (env.get("BUN_CONFIG_MAX_HTTP_REQUESTS")) |max_http_requests| {
            const max = std.fmt.parseInt(u16, max_http_requests, 10) catch {
                logger.addErrorFmt(
                    null,
                    Loc.Empty,
                    allocator,
                    "BUN_CONFIG_MAX_HTTP_REQUESTS value \"{s}\" is not a valid integer between 1 and 65535",
                    .{max_http_requests},
                ) catch unreachable;
                return;
            };
            if (max == 0) {
                logger.addWarningFmt(
                    null,
                    Loc.Empty,
                    allocator,
                    "BUN_CONFIG_MAX_HTTP_REQUESTS value must be a number between 1 and 65535",
                    .{},
                ) catch unreachable;
                return;
            }
            AsyncHTTP.max_simultaneous_requests.store(max, .monotonic);
        }
    }

    /// Signals a waiting consumer that response headers are available.
    /// No-op when no header-progress signal was attached.
    pub fn signalHeaderProgress(this: *AsyncHTTP) void {
        var progress = this.signals.header_progress orelse return;
        progress.store(true, .release);
    }

    /// Switches the response into streaming mode so body chunks are
    /// delivered as they arrive. No-op when no streaming signal is attached.
    pub fn enableBodyStreaming(this: *AsyncHTTP) void {
        var stream = this.signals.body_streaming orelse return;
        stream.store(true, .release);
    }

    /// Releases per-request data (response headers, unix socket path) and
    /// clears the request/response references.
    pub fn clearData(this: *AsyncHTTP) void {
        this.response_headers.deinit(this.allocator);
        this.response_headers = .{};
        this.request = null;
        this.response = null;
        this.client.unix_socket_path.deinit();
        this.client.unix_socket_path = JSC.ZigString.Slice.empty;
    }

    pub const State = enum(u32) {
        pending = 0,
        scheduled = 1,
        sending = 2,
        success = 3,
        fail = 4,
    };
    const AtomicState = std.atomic.Value(State);

    pub const Options = struct {
        http_proxy: ?URL = null,
        hostname: ?[]u8 = null,
        signals: ?Signals = null,
        unix_socket_path: ?JSC.ZigString.Slice = null,
        disable_timeout: ?bool = null,
        verbose: ?HTTPVerboseLevel = null,
        disable_keepalive: ?bool = null,
        disable_decompression: ?bool = null,
        reject_unauthorized: ?bool = null,
        tls_props: ?*SSLConfig = null,
    };

    /// Self-owning context for fire-and-forget connection warmup; frees
    /// itself (and optionally the URL href) when the request completes.
    const Preconnect = struct {
        async_http: AsyncHTTP,
        response_buffer: MutableString,
        url: bun.URL,
        is_url_owned: bool,

        pub usingnamespace bun.New(@This());

        pub fn onResult(this: *Preconnect, _: *AsyncHTTP, _: HTTPClientResult) void {
            this.response_buffer.deinit();
            this.async_http.clearData();
            this.async_http.client.deinit();
            if (this.is_url_owned) {
                default_allocator.free(this.url.href);
            }

            this.destroy();
        }
    };

    /// Opens (and keeps warm) a connection to `url` without sending a real
    /// request. When `is_url_owned`, the href is freed once finished — even
    /// when preconnect support is compiled out.
    pub fn preconnect(
        url: URL,
        is_url_owned: bool,
    ) void {
        if (!FeatureFlags.is_fetch_preconnect_supported) {
            if (is_url_owned) {
                default_allocator.free(url.href);
            }

            return;
        }

        var this = Preconnect.new(.{
            .async_http = undefined,
            .response_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
            .url = url,
            .is_url_owned = is_url_owned,
        });

        this.async_http = AsyncHTTP.init(default_allocator, .GET, url, .{}, "", &this.response_buffer, "", HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this), .manual, .{});
        this.async_http.client.flags.is_preconnect_only = true;

        http_thread.schedule(Batch.from(&this.async_http.task));
    }

    /// Percent-decodes the proxy URL's credentials and builds a
    /// "Basic <base64>" authorization value. Returns null when the username
    /// is absent or out of range (>= 4096 chars) or when decoding fails;
    /// a password is included only when it is also in range. Caller owns
    /// the returned buffer (allocated with `allocator`).
    ///
    /// This helper deduplicates the identical logic that previously lived
    /// in both `init` and `reset`.
    fn buildProxyAuthorization(allocator: std.mem.Allocator, proxy: URL) ?[]u8 {
        // Username between 0 and 4096 chars
        if (proxy.username.len == 0 or proxy.username.len >= 4096) return null;

        // Decode username
        var username_buffer = std.mem.zeroes([4096]u8);
        var username_stream = std.io.fixedBufferStream(&username_buffer);
        const username_writer = username_stream.writer();
        const UserWriter = @TypeOf(username_writer);
        const username_len = PercentEncoding.decode(UserWriter, username_writer, proxy.username) catch {
            // Invalid proxy authorization
            return null;
        };
        const username = username_buffer[0..username_len];

        // Password between 0 and 4096 chars
        if (proxy.password.len > 0 and proxy.password.len < 4096) {
            // decode password
            var password_buffer = std.mem.zeroes([4096]u8);
            var password_stream = std.io.fixedBufferStream(&password_buffer);
            const password_writer = password_stream.writer();
            const PassWriter = @TypeOf(password_writer);
            const password_len = PercentEncoding.decode(PassWriter, password_writer, proxy.password) catch {
                // Invalid proxy authorization
                return null;
            };
            const password = password_buffer[0..password_len];

            // concat user and password
            const auth = std.fmt.allocPrint(allocator, "{s}:{s}", .{ username, password }) catch unreachable;
            defer allocator.free(auth);
            const size = std.base64.standard.Encoder.calcSize(auth.len);
            var buf = allocator.alloc(u8, size + "Basic ".len) catch unreachable;
            // NOTE(review): size is computed with the standard alphabet but
            // encoding uses url_safe — lengths match, but confirm url_safe
            // is intentional for Basic auth.
            const encoded = std.base64.url_safe.Encoder.encode(buf["Basic ".len..], auth);
            buf[0.."Basic ".len].* = "Basic ".*;
            return buf[0 .. "Basic ".len + encoded.len];
        }

        // only use user
        const size = std.base64.standard.Encoder.calcSize(username_len);
        var buf = allocator.alloc(u8, size + "Basic ".len) catch unreachable;
        const encoded = std.base64.url_safe.Encoder.encode(buf["Basic ".len..], username);
        buf[0.."Basic ".len].* = "Basic ".*;
        return buf[0 .. "Basic ".len + encoded.len];
    }

    /// Builds an AsyncHTTP plus its underlying HTTPClient. The returned
    /// value borrows everything it was handed except the
    /// proxy-authorization buffer it may allocate.
    pub fn init(
        allocator: std.mem.Allocator,
        method: Method,
        url: URL,
        headers: Headers.Entry.List,
        headers_buf: string,
        response_buffer: *MutableString,
        request_body: []const u8,
        callback: HTTPClientResult.Callback,
        redirect_type: FetchRedirect,
        options: Options,
    ) AsyncHTTP {
        var this = AsyncHTTP{
            .allocator = allocator,
            .url = url,
            .method = method,
            .request_headers = headers,
            .request_header_buf = headers_buf,
            .request_body = .{ .bytes = request_body },
            .response_buffer = response_buffer,
            .result_callback = callback,
            .http_proxy = options.http_proxy,
            .signals = options.signals orelse .{},
            // Only abortable requests need a unique id in the abort
            // tracker; everything else shares id 0.
            .async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id_monotonic.fetchAdd(1, .monotonic) else 0,
        };

        this.client = .{
            .allocator = allocator,
            .method = method,
            .url = url,
            .header_entries = headers,
            .header_buf = headers_buf,
            .hostname = options.hostname,
            .signals = options.signals orelse this.signals,
            .async_http_id = this.async_http_id,
            .http_proxy = this.http_proxy,
            .redirect_type = redirect_type,
        };
        if (options.unix_socket_path) |val| {
            assert(this.client.unix_socket_path.length() == 0);
            this.client.unix_socket_path = val;
        }
        if (options.disable_timeout) |val| {
            this.client.flags.disable_timeout = val;
        }
        if (options.verbose) |val| {
            this.client.verbose = val;
        }
        if (options.disable_decompression) |val| {
            this.client.flags.disable_decompression = val;
        }
        if (options.disable_keepalive) |val| {
            this.client.flags.disable_keepalive = val;
        }
        if (options.reject_unauthorized) |val| {
            this.client.flags.reject_unauthorized = val;
        }
        if (options.tls_props) |val| {
            this.client.tls_props = val;
        }

        if (options.http_proxy) |proxy| {
            // A failed credential decode silently skips proxy auth, as before.
            if (buildProxyAuthorization(allocator, proxy)) |auth| {
                this.client.proxy_authorization = auth;
            }
        }
        return this;
    }

    /// Like `init`, but without a completion callback — the caller is
    /// expected to use `sendSync`.
    pub fn initSync(
        allocator: std.mem.Allocator,
        method: Method,
        url: URL,
        headers: Headers.Entry.List,
        headers_buf: string,
        response_buffer: *MutableString,
        request_body: []const u8,
        http_proxy: ?URL,
        hostname: ?[]u8,
        redirect_type: FetchRedirect,
    ) AsyncHTTP {
        return @This().init(
            allocator,
            method,
            url,
            headers,
            headers_buf,
            response_buffer,
            request_body,
            undefined,
            redirect_type,
            .{
                .http_proxy = http_proxy,
                .hostname = hostname,
            },
        );
    }

    /// Rebuilds the underlying HTTPClient (preserving the abort flag) and
    /// re-derives proxy settings.
    ///
    /// Fix: the credential-decode failure paths previously did
    /// `return this;` from a `!void` function, which cannot compile; a
    /// failed decode now simply leaves proxy_authorization unset.
    fn reset(this: *AsyncHTTP) !void {
        const aborted = this.client.aborted;
        this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf, aborted);
        this.client.http_proxy = this.http_proxy;

        if (this.http_proxy) |proxy| {
            //TODO: need to understand how is possible to reuse Proxy with TSL, so disable keepalive if url is HTTPS
            this.client.flags.disable_keepalive = this.url.isHTTPS();
            if (buildProxyAuthorization(this.allocator, proxy)) |auth| {
                this.client.proxy_authorization = auth;
            }
        }
    }

    /// Marks the request scheduled and pushes its task onto `batch`.
    /// The allocator parameter is unused but kept for API compatibility.
    pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void {
        this.state.store(.scheduled, .monotonic);
        batch.push(ThreadPool.Batch.from(&this.task));
    }

    /// Result callback used by sendSync: copies the worker's state back into
    /// the caller-owned AsyncHTTP (via `real`) and wakes the waiting thread.
    fn sendSyncCallback(this: *SingleHTTPChannel, async_http: *AsyncHTTP, result: HTTPClientResult) void {
        async_http.real.?.* = async_http.*;
        async_http.real.?.response_buffer = async_http.response_buffer;
        this.channel.writeItem(result) catch unreachable;
    }

    /// Sends the request on the HTTP thread and blocks the calling thread
    /// until a response (or failure) arrives. Returns the response metadata
    /// on success; propagates the client's failure error otherwise.
    pub fn sendSync(this: *AsyncHTTP) anyerror!picohttp.Response {
        HTTPThread.init(&.{});

        var ctx = try default_allocator.create(SingleHTTPChannel);
        ctx.* = SingleHTTPChannel.init();
        this.result_callback = HTTPClientResult.Callback.New(
            *SingleHTTPChannel,
            sendSyncCallback,
        ).init(ctx);

        var batch = bun.ThreadPool.Batch{};
        this.schedule(default_allocator, &batch);
        http_thread.schedule(batch);

        // Blocks until sendSyncCallback writes the result.
        const result = ctx.channel.readItem() catch unreachable;
        if (result.fail) |err| {
            return err;
        }
        assert(result.metadata != null);
        return result.metadata.?.response;
    }

    /// Completion handler running on the HTTP thread: records timing and
    /// outcome, forwards the result to the user callback, and — once the
    /// final chunk arrived — tears down the client and its heap wrapper.
    pub fn onAsyncHTTPCallback(this: *AsyncHTTP, async_http: *AsyncHTTP, result: HTTPClientResult) void {
        assert(this.real != null);

        var callback = this.result_callback;
        // elapsed currently holds the start timestamp (set in onStart);
        // convert it to a duration, saturating to avoid underflow.
        this.elapsed = http_thread.timer.read() -| this.elapsed;

        // TODO: this condition seems wrong: if we started with a non-default value, we might
        // report a redirect even if none happened
        this.redirected = this.client.flags.redirected;
        if (result.isSuccess()) {
            this.err = null;
            if (result.metadata) |metadata| {
                this.response = metadata.response;
            }
            this.state.store(.success, .monotonic);
        } else {
            this.err = result.fail;
            this.response = null;
            this.state.store(State.fail, .monotonic);
        }

        if (comptime Environment.enable_logs) {
            if (socket_async_http_abort_tracker.count() > 0) {
                log("socket_async_http_abort_tracker count: {d}", .{socket_async_http_abort_tracker.count()});
            }
        }

        // Shrink the abort tracker if it ballooned and is now mostly empty.
        if (socket_async_http_abort_tracker.capacity() > 10_000 and socket_async_http_abort_tracker.count() < 100) {
            socket_async_http_abort_tracker.shrinkAndFree(socket_async_http_abort_tracker.count());
        }

        if (result.has_more) {
            // Streaming: more body chunks are coming; keep everything alive.
            callback.function(callback.ctx, async_http, result);
        } else {
            {
                this.client.deinit();
                // The AsyncHTTP is embedded in a ThreadlocalAsyncHTTP;
                // destroy that wrapper after the final callback returns.
                var threadlocal_http: *ThreadlocalAsyncHTTP = @fieldParentPtr("async_http", async_http);
                defer threadlocal_http.destroy();
                log("onAsyncHTTPCallback: {any}", .{std.fmt.fmtDuration(this.elapsed)});
                callback.function(callback.ctx, async_http, result);
            }

            const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .monotonic);
            assert(active_requests > 0);
        }

        // A slot just freed up: wake the loop if work is queued and we are
        // under the concurrency cap.
        if (!http_thread.queued_tasks.isEmpty() and AsyncHTTP.active_requests_count.load(.monotonic) < AsyncHTTP.max_simultaneous_requests.load(.monotonic)) {
            http_thread.loop.loop.wakeup();
        }
    }

    /// ThreadPool entry point; recovers the AsyncHTTP from its task field.
    pub fn startAsyncHTTP(task: *Task) void {
        var this: *AsyncHTTP = @fieldParentPtr("task", task);
        this.onStart();
    }

    /// Begins the request on the HTTP thread: bumps the active counter,
    /// stamps the start time, and hands off to the HTTPClient.
    pub fn onStart(this: *AsyncHTTP) void {
        _ = active_requests_count.fetchAdd(1, .monotonic);
        this.err = null;
        this.state.store(.sending, .monotonic);
        this.client.result_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPCallback).init(
            this,
        );

        this.elapsed = http_thread.timer.read();
        if (this.response_buffer.list.capacity == 0) {
            this.response_buffer.allocator = default_allocator;
        }
        this.client.start(this.request_body, this.response_buffer);
    }
};
|
||||
13
src/http/client/certificate_info.zig
Normal file
13
src/http/client/certificate_info.zig
Normal file
@@ -0,0 +1,13 @@
|
||||
const HTTPCertError = @import("./errors.zig").HTTPCertError;
|
||||
const std = @import("std");
|
||||
/// A peer certificate plus the validation error it triggered. All slices
/// are owned by this struct and must be freed via `deinit`.
pub const CertificateInfo = struct {
    cert: []const u8,
    cert_error: HTTPCertError,
    hostname: []const u8,

    /// Frees every owned slice. `allocator` must be the allocator that
    /// produced them.
    pub fn deinit(this: *const CertificateInfo, allocator: std.mem.Allocator) void {
        allocator.free(this.cert);
        allocator.free(this.cert_error.code);
        allocator.free(this.cert_error.reason);
        allocator.free(this.hostname);
    }
};
|
||||
11
src/http/client/errors.zig
Normal file
11
src/http/client/errors.zig
Normal file
@@ -0,0 +1,11 @@
|
||||
/// Errors that can occur while setting up an HTTP client connection:
/// opening the socket or loading/validating the certificate-authority file.
pub const InitError = error{
    FailedToOpenSocket,
    LoadCAFile,
    InvalidCAFile,
    InvalidCA,
};
|
||||
/// Certificate verification failure details surfaced from the TLS layer.
pub const HTTPCertError = struct {
    error_no: i32 = 0, // underlying library error number; 0 = none
    code: [:0]const u8 = "", // short machine-readable error code
    reason: [:0]const u8 = "", // human-readable description
};
|
||||
367
src/http/client/http_internal_state.zig
Normal file
367
src/http/client/http_internal_state.zig
Normal file
@@ -0,0 +1,367 @@
|
||||
const bun = @import("root").bun;
|
||||
const std = @import("std");
|
||||
|
||||
const picohttp = bun.picohttp;
|
||||
const MutableString = bun.MutableString;
|
||||
const HTTPResponseMetadata = @import("./result.zig").HTTPResponseMetadata;
|
||||
const CertificateInfo = @import("./certificate_info.zig").CertificateInfo;
|
||||
const Zlib = @import("./zlib.zig");
|
||||
const Brotli = bun.brotli;
|
||||
const default_allocator = bun.default_allocator;
|
||||
const assert = bun.assert;
|
||||
const Output = bun.Output;
|
||||
const FeatureFlags = bun.FeatureFlags;
|
||||
const log = bun.Output.scoped(.fetch, false);
|
||||
const HTTPRequestBody = @import("./request_body.zig").HTTPRequestBody;
|
||||
pub const extremely_verbose = false;
|
||||
const http_thread = @import("./thread.zig").getHttpThread();
|
||||
|
||||
/// Content/transfer encodings the HTTP client understands.
pub const Encoding = enum {
    identity,
    gzip,
    deflate,
    brotli,
    chunked,

    /// True when the libdeflate fast path can decode this encoding
    /// (the zlib-family formats only).
    pub fn canUseLibDeflate(this: Encoding) bool {
        return this == .gzip or this == .deflate;
    }

    /// True when a body with this encoding must be decompressed before it
    /// is handed to the caller.
    pub fn isCompressed(this: Encoding) bool {
        return switch (this) {
            .gzip, .deflate, .brotli => true,
            .identity, .chunked => false,
        };
    }
};
|
||||
/// Coarse connection lifecycle used by the HTTP client.
const Stage = enum(u8) {
    pending,
    connect,
    done,
    fail,
};
|
||||
|
||||
/// Fine-grained request/response progress, including the extra phases used
/// when tunneling through a proxy.
pub const HTTPStage = enum {
    pending,
    headers,
    body,
    body_chunk,
    fail,
    done,
    proxy_handshake,
    proxy_headers,
    proxy_body,
};
|
||||
|
||||
/// Streaming decompressor for response bodies: zlib for gzip/deflate,
/// brotli for br, or `.none` for uncompressed payloads. The reader is
/// created lazily on the first compressed chunk.
const Decompressor = union(enum) {
    zlib: *Zlib.ZlibReaderArrayList,
    brotli: *Brotli.BrotliReaderArrayList,
    none: void,

    pub fn deinit(this: *Decompressor) void {
        switch (this.*) {
            inline .brotli, .zlib => |that| {
                that.deinit();
                // Reset to .none so a later deinit or updateBuffers starts fresh.
                this.* = .{ .none = {} };
            },
            .none => {},
        }
    }

    /// Points the active decompressor at the next compressed input chunk
    /// (`buffer`) and at the output accumulator (`body_out_str`), creating
    /// the zlib/brotli reader on first use. No-op for uncompressed
    /// encodings; panics if called with a non-compression encoding after
    /// the isCompressed() guard (should be unreachable).
    pub fn updateBuffers(this: *Decompressor, encoding: Encoding, buffer: []const u8, body_out_str: *MutableString) !void {
        if (!encoding.isCompressed()) {
            return;
        }

        if (this.* == .none) {
            // First compressed chunk: construct the appropriate reader.
            switch (encoding) {
                .gzip, .deflate => {
                    this.* = .{
                        .zlib = try Zlib.ZlibReaderArrayList.initWithOptionsAndListAllocator(
                            buffer,
                            &body_out_str.list,
                            body_out_str.allocator,
                            default_allocator,
                            .{
                                // zlib.MAX_WBITS = 15
                                // to (de-)compress deflate format, use wbits = -zlib.MAX_WBITS
                                // to (de-)compress deflate format with headers we use wbits = 0 (we can detect the first byte using 120)
                                // to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
                                .windowBits = if (encoding == Encoding.gzip) Zlib.MAX_WBITS | 16 else (if (buffer.len > 1 and buffer[0] == 120) 0 else -Zlib.MAX_WBITS),
                            },
                        ),
                    };
                    return;
                },
                .brotli => {
                    this.* = .{
                        .brotli = try Brotli.BrotliReaderArrayList.newWithOptions(
                            buffer,
                            &body_out_str.list,
                            body_out_str.allocator,
                            .{},
                        ),
                    };
                    return;
                },
                else => @panic("Invalid encoding. This code should not be reachable"),
            }
        }

        // Subsequent chunks: feed the existing reader.
        switch (this.*) {
            .zlib => |reader| {
                // The previous chunk must have been fully consumed.
                assert(reader.zlib.avail_in == 0);
                reader.zlib.next_in = buffer.ptr;
                reader.zlib.avail_in = @as(u32, @truncate(buffer.len));

                const initial = body_out_str.list.items.len;
                body_out_str.list.expandToCapacity();
                // Guarantee at least some writable output space.
                if (body_out_str.list.capacity == initial) {
                    try body_out_str.list.ensureUnusedCapacity(body_out_str.allocator, 4096);
                    body_out_str.list.expandToCapacity();
                }
                reader.list = body_out_str.list;
                reader.zlib.next_out = @ptrCast(&body_out_str.list.items[initial]);
                reader.zlib.avail_out = @as(u32, @truncate(body_out_str.list.capacity - initial));
                // we reset the total out so we can track how much we decompressed this time
                reader.zlib.total_out = @truncate(initial);
            },
            .brotli => |reader| {
                reader.input = buffer;
                reader.total_in = 0;

                const initial = body_out_str.list.items.len;
                reader.list = body_out_str.list;
                reader.total_out = @truncate(initial);
            },
            else => @panic("Invalid encoding. This code should not be reachable"),
        }
    }

    /// Drives decompression of whatever was staged by updateBuffers;
    /// `is_done` tells the brotli reader the stream has ended.
    pub fn readAll(this: *Decompressor, is_done: bool) !void {
        switch (this.*) {
            .zlib => |zlib| try zlib.readAll(),
            .brotli => |brotli| try brotli.readAll(is_done),
            .none => {},
        }
    }
};
|
||||
|
||||
// TODO: reduce the size of this struct
|
||||
// Many of these fields can be moved to a packed struct and use less space
|
||||
pub const InternalState = struct {
|
||||
response_message_buffer: MutableString = undefined,
|
||||
/// pending response is the temporary storage for the response headers, url and status code
|
||||
/// this uses shared_response_headers_buf to store the headers
|
||||
/// this will be turned null once the metadata is cloned
|
||||
pending_response: ?picohttp.Response = null,
|
||||
|
||||
/// This is the cloned metadata containing the response headers, url and status code after the .headers phase are received
|
||||
/// will be turned null once returned to the user (the ownership is transferred to the user)
|
||||
/// this can happen after await fetch(...) and the body can continue streaming when this is already null
|
||||
/// the user will receive only chunks of the body stored in body_out_str
|
||||
cloned_metadata: ?HTTPResponseMetadata = null,
|
||||
flags: InternalStateFlags = InternalStateFlags{},
|
||||
|
||||
transfer_encoding: Encoding = Encoding.identity,
|
||||
encoding: Encoding = Encoding.identity,
|
||||
content_encoding_i: u8 = std.math.maxInt(u8),
|
||||
chunked_decoder: picohttp.phr_chunked_decoder = .{},
|
||||
decompressor: Decompressor = .{ .none = {} },
|
||||
stage: Stage = Stage.pending,
|
||||
/// This is owned by the user and should not be freed here
|
||||
body_out_str: ?*MutableString = null,
|
||||
compressed_body: MutableString = undefined,
|
||||
content_length: ?usize = null,
|
||||
total_body_received: usize = 0,
|
||||
request_body: []const u8 = "",
|
||||
original_request_body: HTTPRequestBody = .{ .bytes = "" },
|
||||
request_sent_len: usize = 0,
|
||||
fail: ?anyerror = null,
|
||||
request_stage: HTTPStage = .pending,
|
||||
response_stage: HTTPStage = .pending,
|
||||
certificate_info: ?CertificateInfo = null,
|
||||
|
||||
pub const InternalStateFlags = packed struct {
|
||||
allow_keepalive: bool = true,
|
||||
received_last_chunk: bool = false,
|
||||
did_set_content_encoding: bool = false,
|
||||
is_redirect_pending: bool = false,
|
||||
is_libdeflate_fast_path_disabled: bool = false,
|
||||
resend_request_body_on_redirect: bool = false,
|
||||
};
|
||||
|
||||
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
|
||||
return .{
|
||||
.original_request_body = body,
|
||||
.request_body = if (body == .bytes) body.bytes else "",
|
||||
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
.body_out_str = body_out_str,
|
||||
.stage = Stage.pending,
|
||||
.pending_response = null,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn isChunkedEncoding(this: *InternalState) bool {
|
||||
return this.transfer_encoding == Encoding.chunked;
|
||||
}
|
||||
|
||||
pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void {
|
||||
this.compressed_body.deinit();
|
||||
this.response_message_buffer.deinit();
|
||||
|
||||
const body_msg = this.body_out_str;
|
||||
if (body_msg) |body| body.reset();
|
||||
this.decompressor.deinit();
|
||||
|
||||
// just in case we check and free to avoid leaks
|
||||
if (this.cloned_metadata != null) {
|
||||
this.cloned_metadata.?.deinit(allocator);
|
||||
this.cloned_metadata = null;
|
||||
}
|
||||
|
||||
// if exists we own this info
|
||||
if (this.certificate_info) |info| {
|
||||
this.certificate_info = null;
|
||||
info.deinit(default_allocator);
|
||||
}
|
||||
|
||||
this.original_request_body.deinit();
|
||||
this.* = .{
|
||||
.body_out_str = body_msg,
|
||||
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
.original_request_body = .{ .bytes = "" },
|
||||
.request_body = "",
|
||||
.certificate_info = null,
|
||||
.flags = .{},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getBodyBuffer(this: *InternalState) *MutableString {
|
||||
if (this.encoding.isCompressed()) {
|
||||
return &this.compressed_body;
|
||||
}
|
||||
|
||||
return this.body_out_str.?;
|
||||
}
|
||||
|
||||
/// Whether the full response body has been received.
fn isDone(this: *InternalState) bool {
    // Chunked bodies are complete only once the terminating zero-length
    // chunk has been seen.
    if (this.isChunkedEncoding())
        return this.flags.received_last_chunk;

    // With a Content-Length we can compare byte counts directly.
    if (this.content_length) |content_length|
        return this.total_body_received >= content_length;

    // No length information (e.g. Content-Type: text/event-stream):
    // we are only done when the peer ends the stream (close/end/timeout).
    return this.flags.received_last_chunk;
}
|
||||
|
||||
/// Decompress `buffer` (gzip/deflate via the libdeflate fast path when
/// possible, otherwise the streaming decompressor, which also handles
/// brotli) and append the output to `body_out_str`.
/// `is_final_chunk` gates the libdeflate one-shot path, which requires the
/// entire compressed payload to be present.
/// Always resets `compressed_body` on exit (it has been consumed).
fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *MutableString, is_final_chunk: bool) !void {
    defer this.compressed_body.reset();
    var gzip_timer: std.time.Timer = undefined;

    if (extremely_verbose)
        gzip_timer = std.time.Timer.start() catch @panic("Timer failure");

    var still_needs_to_decompress = true;

    if (FeatureFlags.isLibdeflateEnabled()) {
        // Fast-path: use libdeflate (one-shot, whole-payload decompression).
        // The fast path is attempted at most once per response; on partial
        // success/failure we fall through to the streaming decompressor.
        if (is_final_chunk and !this.flags.is_libdeflate_fast_path_disabled and this.encoding.canUseLibDeflate() and this.isDone()) libdeflate: {
            this.flags.is_libdeflate_fast_path_disabled = true;

            log("Decompressing {d} bytes with libdeflate\n", .{buffer.len});
            var deflater = http_thread.deflater();

            // gzip stores the size of the uncompressed data in the last 4 bytes of the stream
            // But it's only valid if the stream is less than 4.7 GB, since it's 4 bytes.
            // If we know that the stream is going to be larger than our
            // pre-allocated buffer, then let's dynamically allocate the exact
            // size.
            if (this.encoding == Encoding.gzip and buffer.len > 16 and buffer.len < 1024 * 1024 * 1024) {
                // ISIZE field: little-endian u32 trailer of the gzip stream.
                const estimated_size: u32 = @bitCast(buffer[buffer.len - 4 ..][0..4].*);
                // Since this is arbitrary input from the internet, let's set an upper bound of 32 MB for the allocation size.
                if (estimated_size > deflater.shared_buffer.len and estimated_size < 32 * 1024 * 1024) {
                    try body_out_str.list.ensureTotalCapacityPrecise(body_out_str.allocator, estimated_size);
                    const result = deflater.decompressor.decompress(buffer, body_out_str.list.allocatedSlice(), .gzip);

                    if (result.status == .success) {
                        body_out_str.list.items.len = result.written;
                        still_needs_to_decompress = false;
                    }

                    // Either way, do not also try the shared-buffer one-shot:
                    // the payload is known to exceed it.
                    break :libdeflate;
                }
            }

            // Payload expected to fit the pre-allocated shared buffer.
            const result = deflater.decompressor.decompress(buffer, &deflater.shared_buffer, switch (this.encoding) {
                .gzip => .gzip,
                .deflate => .deflate,
                // canUseLibDeflate() ruled out other encodings above.
                else => unreachable,
            });

            if (result.status == .success) {
                try body_out_str.list.ensureTotalCapacityPrecise(body_out_str.allocator, result.written);
                body_out_str.list.appendSliceAssumeCapacity(deflater.shared_buffer[0..result.written]);
                still_needs_to_decompress = false;
            }
        }
    }

    // Slow path, or brotli: use the .decompressor
    if (still_needs_to_decompress) {
        log("Decompressing {d} bytes\n", .{buffer.len});
        if (body_out_str.list.capacity == 0) {
            // Heuristic pre-allocation: 1.5x the compressed size, clamped to
            // [32 bytes, 2 MB].
            const min = @min(@ceil(@as(f64, @floatFromInt(buffer.len)) * 1.5), @as(f64, 1024 * 1024 * 2));
            try body_out_str.growBy(@max(@as(usize, @intFromFloat(min)), 32));
        }

        try this.decompressor.updateBuffers(this.encoding, buffer, body_out_str);

        this.decompressor.readAll(this.isDone()) catch |err| {
            // ShortRead is expected mid-stream (more chunks coming); it is
            // only fatal once the body is complete.
            if (this.isDone() or error.ShortRead != err) {
                Output.prettyErrorln("<r><red>Decompression error: {s}<r>", .{bun.asByteSlice(@errorName(err))});
                Output.flush();
                return err;
            }
        };
    }

    if (extremely_verbose)
        this.gzip_elapsed = gzip_timer.read();
}
|
||||
|
||||
/// Convenience wrapper: decompress the accumulated buffer's contents into
/// `body_out_str`. See `decompressBytes` for the actual logic.
fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString, is_final_chunk: bool) !void {
    try this.decompressBytes(buffer.list.items, body_out_str, is_final_chunk);
}
|
||||
|
||||
/// Route one buffer of response body into the output string, decompressing
/// when the encoding calls for it. Returns true when the output buffer now
/// holds at least one byte. While a redirect is pending the body is
/// discarded and false is returned.
pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_chunk: bool) !bool {
    if (this.flags.is_redirect_pending) return false;

    const out = this.body_out_str.?;

    switch (this.encoding) {
        Encoding.brotli, Encoding.gzip, Encoding.deflate => try this.decompress(buffer, out, is_final_chunk),
        else => {
            // Uncompressed: the bytes may already live inside `out`
            // (written there directly); only copy when they do not.
            if (!out.owns(buffer.list.items)) {
                out.append(buffer.list.items) catch |err| {
                    Output.prettyErrorln("<r><red>Failed to append to body buffer: {s}<r>", .{bun.asByteSlice(@errorName(err))});
                    Output.flush();
                    return err;
                };
            }
        },
    }

    return out.list.items.len > 0;
}
|
||||
};
|
||||
36
src/http/client/init_options.zig
Normal file
36
src/http/client/init_options.zig
Normal file
@@ -0,0 +1,36 @@
|
||||
const bun = @import("root").bun;
|
||||
|
||||
const InitError = @import("./errors.zig").InitError;
|
||||
const stringZ = bun.stringZ;
|
||||
const Output = bun.Output;
|
||||
const Global = bun.Global;
|
||||
|
||||
/// Default init-error handler: print a description of the failure to
/// stderr, then crash the process. Never returns.
fn onInitErrorNoop(err: InitError, opts: InitOpts) noreturn {
    switch (err) {
        error.LoadCAFile => {
            // Distinguish "file missing" from "file unreadable/invalid".
            const ca_file_exists = bun.sys.existsZ(opts.abs_ca_file_name);
            if (ca_file_exists) {
                Output.err("HTTPThread", "failed to load CA file: '{s}'", .{opts.abs_ca_file_name});
            } else {
                Output.err("HTTPThread", "failed to find CA file: '{s}'", .{opts.abs_ca_file_name});
            }
        },
        error.InvalidCAFile => Output.err("HTTPThread", "the CA file is invalid: '{s}'", .{opts.abs_ca_file_name}),
        error.InvalidCA => Output.err("HTTPThread", "the provided CA is invalid", .{}),
        error.FailedToOpenSocket => Output.errGeneric("failed to start HTTP client thread", .{}),
    }
    Global.crash();
}
|
||||
|
||||
/// Options for initializing the HTTP client thread.
pub const InitOpts = struct {
    /// Extra root certificates (zero-terminated PEM strings) for the TLS context.
    ca: []stringZ = &.{},
    /// Absolute path to a CA bundle file, empty when unused.
    abs_ca_file_name: stringZ = &.{},
    /// Whether the thread is being started on behalf of the package installer.
    for_install: bool = false,

    /// Invoked when thread initialization fails; must not return.
    /// The default handler prints the error and crashes the process.
    onInitError: *const fn (err: InitError, opts: InitOpts) noreturn = &onInitErrorNoop,
};
|
||||
16
src/http/client/metadata.zig
Normal file
16
src/http/client/metadata.zig
Normal file
@@ -0,0 +1,16 @@
|
||||
const bun = @import("root").bun;
|
||||
const picohttp = bun.picohttp;
|
||||
const std = @import("std");
|
||||
/// Response metadata (URL, status line, headers) whose string data is backed
/// by a single owned buffer.
pub const HTTPResponseMetadata = struct {
    url: []const u8 = "",
    owned_buf: []u8 = "",
    response: picohttp.Response = .{},

    /// Free the owned backing buffer and the header list (when non-empty),
    /// then clear every field so the value can be reused or dropped safely.
    pub fn deinit(this: *HTTPResponseMetadata, allocator: std.mem.Allocator) void {
        const backing = this.owned_buf;
        const header_list = this.response.headers.list;
        if (backing.len > 0) allocator.free(backing);
        if (header_list.len > 0) allocator.free(header_list);
        this.owned_buf = &.{};
        this.url = "";
        this.response.headers = .{};
        this.response.status = "";
    }
};
|
||||
323
src/http/client/proxy_tunnel.zig
Normal file
323
src/http/client/proxy_tunnel.zig
Normal file
@@ -0,0 +1,323 @@
|
||||
const bun = @import("root").bun;
|
||||
const uws = bun.uws;
|
||||
const BoringSSL = bun.BoringSSL;
|
||||
const strings = bun.strings;
|
||||
const SSLWrapper = @import("../../bun.js/api/bun/ssl_wrapper.zig").SSLWrapper;
|
||||
const getHttpContext = @import("./thread.zig").getContext;
|
||||
const http_thread = @import("./http/client/thread.zig").getHttpThread();
|
||||
const NewHTTPContext = @import("./thread.zig").NewHTTPContext;
|
||||
const HTTPClient = @import("../../http.zig").HTTPClient;
|
||||
const SSLConfig = bun.server.ServerConfig.SSLConfig;
|
||||
const HTTPCertError = @import("./errors.zig").HTTPCertError;
|
||||
const log = bun.Output.scoped(.fetch, false);
|
||||
const getTempHostname = @import("../../http.zig").getTempHostname;
|
||||
/// TLS tunnel used when an HTTPS request goes through a proxy (CONNECT):
/// the SSLWrapper performs the TLS handshake/encryption in memory, and the
/// resulting ciphertext is shuttled over `socket`, the already-established
/// proxy connection (which may itself be plain TCP or TLS).
/// Reference-counted via bun.NewRefCounted; `_deinit` runs on the final deref.
const ProxyTunnel = struct {
    wrapper: ?ProxyTunnelWrapper = null,
    /// Error reported to the client when the tunnel closes; stays
    /// ConnectionClosed unless `close()` recorded something more specific.
    shutdown_err: anyerror = error.ConnectionClosed,
    // active socket is the socket that is currently being used
    socket: union(enum) {
        tcp: NewHTTPContext(false).HTTPSocket,
        ssl: NewHTTPContext(true).HTTPSocket,
        none: void,
    } = .{ .none = {} },
    /// Ciphertext that could not be written to the socket yet
    /// (backpressure); flushed from `onWritable`.
    write_buffer: bun.io.StreamBuffer = .{},
    ref_count: u32 = 1,

    const ProxyTunnelWrapper = SSLWrapper(*HTTPClient);

    usingnamespace bun.NewRefCounted(ProxyTunnel, _deinit, null);

    /// SSLWrapper callback: the TLS session is ready. Configure the client
    /// session with the destination hostname (for SNI / verification)
    /// before the handshake proceeds. IP addresses get an empty hostname.
    fn onOpen(this: *HTTPClient) void {
        this.state.response_stage = .proxy_handshake;
        this.state.request_stage = .proxy_handshake;
        if (this.proxy_tunnel) |proxy| {
            proxy.ref();
            defer proxy.deref();
            if (proxy.wrapper) |*wrapper| {
                var ssl_ptr = wrapper.ssl orelse return;
                const _hostname = this.hostname orelse this.url.hostname;

                var hostname: [:0]const u8 = "";
                var hostname_needs_free = false;
                const temp_hostname = getTempHostname();
                if (!strings.isIPAddress(_hostname)) {
                    // Use the shared scratch buffer when the name fits;
                    // otherwise fall back to a heap copy (freed below).
                    if (_hostname.len < temp_hostname.len) {
                        @memcpy(temp_hostname[0.._hostname.len], _hostname);
                        temp_hostname[_hostname.len] = 0;
                        hostname = temp_hostname[0.._hostname.len :0];
                    } else {
                        hostname = bun.default_allocator.dupeZ(u8, _hostname) catch unreachable;
                        hostname_needs_free = true;
                    }
                }

                defer if (hostname_needs_free) bun.default_allocator.free(hostname);
                ssl_ptr.configureHTTPClient(hostname);
            }
        }
    }

    /// SSLWrapper callback: plaintext decrypted from the tunnel. Dispatch
    /// it to the client's current response stage (headers or body).
    fn onData(this: *HTTPClient, decoded_data: []const u8) void {
        if (decoded_data.len == 0) return;
        log("onData decoded {}", .{decoded_data.len});

        if (this.proxy_tunnel) |proxy| {
            proxy.ref();
            defer proxy.deref();
            switch (this.state.response_stage) {
                .body => {
                    if (decoded_data.len == 0) return;
                    const report_progress = this.handleResponseBody(decoded_data, false) catch |err| {
                        proxy.close(err);
                        return;
                    };

                    if (report_progress) {
                        switch (proxy.socket) {
                            .ssl => |socket| {
                                this.progressUpdate(true, getHttpContext(true), socket);
                            },
                            .tcp => |socket| {
                                this.progressUpdate(false, getHttpContext(false), socket);
                            },
                            .none => {},
                        }
                        return;
                    }
                },
                .body_chunk => {
                    if (decoded_data.len == 0) return;
                    const report_progress = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| {
                        proxy.close(err);
                        return;
                    };

                    if (report_progress) {
                        switch (proxy.socket) {
                            .ssl => |socket| {
                                this.progressUpdate(true, getHttpContext(true), socket);
                            },
                            .tcp => |socket| {
                                this.progressUpdate(false, getHttpContext(false), socket);
                            },
                            .none => {},
                        }
                        return;
                    }
                },
                .proxy_headers => {
                    switch (proxy.socket) {
                        .ssl => |socket| {
                            this.handleOnDataHeaders(true, decoded_data, getHttpContext(true), socket);
                        },
                        .tcp => |socket| {
                            this.handleOnDataHeaders(false, decoded_data, getHttpContext(false), socket);
                        },
                        .none => {},
                    }
                },
                else => {
                    // Data arriving in any other stage is a protocol
                    // violation; drop the pending response and tear down.
                    this.state.pending_response = null;
                    proxy.close(error.UnexpectedData);
                },
            }
        }
    }

    /// SSLWrapper callback: the inner TLS handshake finished (successfully
    /// or not). On success, optionally verify the certificate/hostname,
    /// then start sending the request.
    fn onHandshake(this: *HTTPClient, handshake_success: bool, ssl_error: uws.us_bun_verify_error_t) void {
        if (this.proxy_tunnel) |proxy| {
            proxy.ref();
            defer proxy.deref();
            this.state.response_stage = .proxy_headers;
            this.state.request_stage = .proxy_headers;
            this.state.request_sent_len = 0;
            const handshake_error = HTTPCertError{
                .error_no = ssl_error.error_no,
                .code = if (ssl_error.code == null) "" else ssl_error.code[0..bun.len(ssl_error.code) :0],
                // FIXME(review): this guards on `code == null` but slices
                // `reason` — if `reason` can be null while `code` is not,
                // this dereferences null. Confirm against the uws contract;
                // `reason == null` looks like the intended check.
                .reason = if (ssl_error.code == null) "" else ssl_error.reason[0..bun.len(ssl_error.reason) :0],
            };
            if (handshake_success) {
                // handshake completed but we may have ssl errors
                this.flags.did_have_handshaking_error = handshake_error.error_no != 0;
                if (this.flags.reject_unauthorized) {
                    // only reject the connection if reject_unauthorized == true
                    if (this.flags.did_have_handshaking_error) {
                        proxy.close(BoringSSL.getCertErrorFromNo(handshake_error.error_no));
                        return;
                    }

                    // if checkServerIdentity returns false, we dont call open this means that the connection was rejected
                    bun.assert(proxy.wrapper != null);
                    const ssl_ptr = proxy.wrapper.?.ssl orelse return;

                    switch (proxy.socket) {
                        .ssl => |socket| {
                            if (!this.checkServerIdentity(true, socket, handshake_error, ssl_ptr, false)) {
                                this.flags.did_have_handshaking_error = true;
                                this.unregisterAbortTracker();
                                return;
                            }
                        },
                        .tcp => |socket| {
                            if (!this.checkServerIdentity(false, socket, handshake_error, ssl_ptr, false)) {
                                this.flags.did_have_handshaking_error = true;
                                this.unregisterAbortTracker();
                                return;
                            }
                        },
                        .none => {},
                    }
                }

                // Verified (or verification waived): kick off the request.
                switch (proxy.socket) {
                    .ssl => |socket| {
                        this.onWritable(true, true, socket);
                    },
                    .tcp => |socket| {
                        this.onWritable(true, false, socket);
                    },
                    .none => {},
                }
            } else {
                // if we are here is because server rejected us, and the error_no is the cause of this
                // if we set reject_unauthorized == false this means the server requires custom CA aka NODE_EXTRA_CA_CERTS
                if (this.flags.did_have_handshaking_error) {
                    proxy.close(BoringSSL.getCertErrorFromNo(handshake_error.error_no));
                    return;
                }
                // if handshake_success it self is false, this means that the connection was rejected
                proxy.close(error.ConnectionRefused);
                return;
            }
        }
    }

    /// SSLWrapper callback: ciphertext produced by the TLS layer, ready to
    /// be sent over the proxy socket. Anything the socket cannot take now
    /// is buffered and flushed later from `onWritable`.
    pub fn write(this: *HTTPClient, encoded_data: []const u8) void {
        if (this.proxy_tunnel) |proxy| {
            const written = switch (proxy.socket) {
                .ssl => |socket| socket.write(encoded_data, true),
                .tcp => |socket| socket.write(encoded_data, true),
                .none => 0,
            };
            const pending = encoded_data[@intCast(written)..];
            if (pending.len > 0) {
                // lets flush when we are truly writable
                proxy.write_buffer.write(pending) catch bun.outOfMemory();
            }
        }
    }

    /// SSLWrapper callback: the TLS session ended. Fail the client with the
    /// recorded shutdown error and detach the socket.
    fn onClose(this: *HTTPClient) void {
        if (this.proxy_tunnel) |proxy| {
            proxy.ref();
            // defer the proxy deref the proxy tunnel may still be in use after triggering the close callback
            defer http_thread.scheduleProxyDeref(proxy);
            const err = proxy.shutdown_err;
            switch (proxy.socket) {
                .ssl => |socket| {
                    this.closeAndFail(err, true, socket);
                },
                .tcp => |socket| {
                    this.closeAndFail(err, false, socket);
                },
                .none => {},
            }
            proxy.detachSocket();
        }
    }

    /// Create and start the tunnel on an established proxy socket.
    /// Certificate checking is done manually (see onHandshake), so the
    /// underlying context requests the cert but never auto-rejects.
    fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: SSLConfig) void {
        const proxy_tunnel = ProxyTunnel.new(.{});

        var custom_options = ssl_options;
        // we always request the cert so we can verify it and also we manually abort the connection if the hostname doesn't match
        custom_options.reject_unauthorized = 0;
        custom_options.request_cert = 1;
        proxy_tunnel.wrapper = SSLWrapper(*HTTPClient).init(custom_options, true, .{
            .onOpen = ProxyTunnel.onOpen,
            .onData = ProxyTunnel.onData,
            .onHandshake = ProxyTunnel.onHandshake,
            .onClose = ProxyTunnel.onClose,
            .write = ProxyTunnel.write,
            .ctx = this,
        }) catch |err| {
            if (err == error.OutOfMemory) {
                bun.outOfMemory();
            }

            // invalid TLS Options
            proxy_tunnel.detachAndDeref();
            this.closeAndFail(error.ConnectionRefused, is_ssl, socket);
            return;
        };
        this.proxy_tunnel = proxy_tunnel;
        if (is_ssl) {
            proxy_tunnel.socket = .{ .ssl = socket };
        } else {
            proxy_tunnel.socket = .{ .tcp = socket };
        }
        proxy_tunnel.wrapper.?.start();
    }

    /// Record `err` as the shutdown reason and fast-shutdown the TLS layer;
    /// `onClose` will deliver the error to the client.
    pub fn close(this: *ProxyTunnel, err: anyerror) void {
        this.shutdown_err = err;
        if (this.wrapper) |*wrapper| {
            // fast shutdown the connection
            _ = wrapper.shutdown(true);
        }
    }

    /// Socket became writable: flush buffered ciphertext, then let the TLS
    /// state machine make progress.
    pub fn onWritable(this: *ProxyTunnel, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
        this.ref();
        defer this.deref();
        const encoded_data = this.write_buffer.slice();
        if (encoded_data.len == 0) {
            return;
        }
        const written = socket.write(encoded_data, true);
        if (written == encoded_data.len) {
            this.write_buffer.reset();
            return;
        }

        // Partial write: advance the cursor past what was accepted.
        this.write_buffer.cursor += @intCast(written);
        if (this.wrapper) |*wrapper| {
            // Cycle to through the SSL state machine
            _ = wrapper.flush();
        }
    }

    /// Feed ciphertext received from the proxy socket into the TLS layer
    /// (decrypted output arrives via the onData callback).
    pub fn receiveData(this: *ProxyTunnel, buf: []const u8) void {
        this.ref();
        defer this.deref();
        if (this.wrapper) |*wrapper| {
            wrapper.receiveData(buf);
        }
    }

    /// Encrypt `buf` through the TLS layer for sending; returns the number
    /// of plaintext bytes accepted (0 when the tunnel is gone).
    pub fn writeData(this: *ProxyTunnel, buf: []const u8) !usize {
        if (this.wrapper) |*wrapper| {
            return try wrapper.writeData(buf);
        }
        return 0;
    }

    /// Forget the underlying socket (does not close it).
    pub fn detachSocket(this: *ProxyTunnel) void {
        this.socket = .{ .none = {} };
    }

    /// Detach the socket and drop this holder's reference.
    pub fn detachAndDeref(this: *ProxyTunnel) void {
        this.detachSocket();
        this.deref();
    }

    /// Final teardown, called by the refcount machinery on the last deref.
    fn _deinit(this: *ProxyTunnel) void {
        this.socket = .{ .none = {} };
        if (this.wrapper) |*wrapper| {
            wrapper.deinit();
            this.wrapper = null;
        }
        this.write_buffer.deinit();
        this.destroy();
    }
};
|
||||
37
src/http/client/request_body.zig
Normal file
37
src/http/client/request_body.zig
Normal file
@@ -0,0 +1,37 @@
|
||||
const bun = @import("root").bun;
|
||||
const std = @import("std");
|
||||
const Environment = bun.Environment;
|
||||
const FeatureFlags = bun.FeatureFlags;
|
||||
const Sendfile = @import("./sendfile.zig").Sendfile;
|
||||
/// The body of an outgoing HTTP request: an in-memory slice, a file served
/// via sendfile(2), or an incrementally-produced stream.
pub const HTTPRequestBody = union(enum) {
    bytes: []const u8,
    sendfile: Sendfile,
    stream: struct {
        buffer: bun.io.StreamBuffer,
        ended: bool,
        has_backpressure: bool = false,

        /// Done once the producer signalled the end AND every buffered
        /// byte has been flushed.
        pub fn hasEnded(this: *@This()) bool {
            return this.ended and this.buffer.isEmpty();
        }
    },

    /// Whether this body is the streaming variant.
    pub fn isStream(this: *const HTTPRequestBody) bool {
        return this.* == .stream;
    }

    /// Release owned buffers; `bytes` and `sendfile` own nothing here.
    pub fn deinit(this: *HTTPRequestBody) void {
        switch (this.*) {
            .bytes, .sendfile => {},
            .stream => |*stream| stream.buffer.deinit(),
        }
    }

    /// Total number of bytes this body will produce. Streams report
    /// `maxInt(usize)` because their length is unknown up front.
    pub fn len(this: *const HTTPRequestBody) usize {
        return switch (this.*) {
            .bytes => |bytes| bytes.len,
            .sendfile => |file| file.content_size,
            .stream => std.math.maxInt(usize),
        };
    }
};
|
||||
81
src/http/client/result.zig
Normal file
81
src/http/client/result.zig
Normal file
@@ -0,0 +1,81 @@
|
||||
const bun = @import("root").bun;
|
||||
const JSC = bun.JSC;
|
||||
const MutableString = bun.MutableString;
|
||||
const HTTPResponseMetadata = @import("./metadata.zig").HTTPResponseMetadata;
|
||||
const CertificateInfo = @import("./certificate_info.zig").CertificateInfo;
|
||||
const AsyncHTTP = @import("./async_http.zig").AsyncHTTP;
|
||||
/// Outcome of an HTTP request delivered to the requester's callback:
/// either a failure (`fail` set) or a (possibly partial, when `has_more`)
/// success carrying body bytes and owned metadata.
pub const HTTPClientResult = struct {
    body: ?*MutableString = null,
    has_more: bool = false,
    redirected: bool = false,
    can_stream: bool = false,

    fail: ?anyerror = null,

    /// Owns the response metadata aka headers, url and status code
    metadata: ?HTTPResponseMetadata = null,

    /// For Http Client requests
    /// when Content-Length is provided this represents the whole size of the request
    /// If chunked encoded this will represent the total received size (ignoring the chunk headers)
    /// If is not chunked encoded and Content-Length is not provided this will be unknown
    body_size: BodySize = .unknown,
    certificate_info: ?CertificateInfo = null,

    /// Map a timeout/abort failure to the corresponding JS abort reason,
    /// or null for any other outcome.
    pub fn abortReason(this: *const HTTPClientResult) ?JSC.CommonAbortReason {
        if (this.isTimeout()) return .Timeout;
        if (this.isAbort()) return .UserAbort;
        return null;
    }

    pub const BodySize = union(enum) {
        total_received: usize,
        content_length: usize,
        unknown: void,
    };

    /// True when no error was recorded.
    pub fn isSuccess(this: *const HTTPClientResult) bool {
        return this.fail == null;
    }

    /// True when the failure was a timeout.
    pub fn isTimeout(this: *const HTTPClientResult) bool {
        const err = this.fail orelse return false;
        return err == error.Timeout;
    }

    /// True when the failure was a user-initiated abort.
    pub fn isAbort(this: *const HTTPClientResult) bool {
        const err = this.fail orelse return false;
        return err == error.Aborted or err == error.AbortedBeforeConnecting;
    }

    /// Type-erased completion callback: `ctx` + function pointer.
    pub const Callback = struct {
        ctx: *anyopaque,
        function: Function,

        pub const Function = *const fn (*anyopaque, *AsyncHTTP, HTTPClientResult) void;

        pub fn run(self: Callback, async_http: *AsyncHTTP, result: HTTPClientResult) void {
            self.function(self.ctx, async_http, result);
        }

        /// Build a Callback wrapper that downcasts `ctx` back to `Type`
        /// before invoking `callback`.
        pub fn New(comptime Type: type, comptime callback: anytype) type {
            return struct {
                pub fn init(this: Type) Callback {
                    return Callback{
                        .ctx = this,
                        .function = @This().wrapped_callback,
                    };
                }

                pub fn wrapped_callback(ptr: *anyopaque, async_http: *AsyncHTTP, result: HTTPClientResult) void {
                    const casted: Type = @ptrCast(@alignCast(ptr));
                    @call(bun.callmod_inline, callback, .{ casted, async_http, result });
                }
            };
        }
    };
};
|
||||
76
src/http/client/sendfile.zig
Normal file
76
src/http/client/sendfile.zig
Normal file
@@ -0,0 +1,76 @@
|
||||
const bun = @import("root").bun;
|
||||
const std = @import("std");
|
||||
const Environment = bun.Environment;
|
||||
const FeatureFlags = bun.FeatureFlags;
|
||||
/// State for uploading a request body directly from a file descriptor via
/// the platform sendfile(2) syscall.
pub const Sendfile = struct {
    /// Source file being sent.
    fd: bun.FileDescriptor,
    /// Bytes not yet sent.
    remain: usize = 0,
    /// Current read offset into `fd`.
    offset: usize = 0,
    /// Total size of the upload (for Content-Length).
    content_size: usize = 0,

    /// sendfile is only usable for plain (non-TLS) HTTP, on non-Windows
    /// platforms, and only when the feature flag is enabled.
    pub fn isEligible(url: bun.URL) bool {
        if (comptime Environment.isWindows or !FeatureFlags.streaming_file_uploads_for_http_client) {
            return false;
        }
        return url.isHTTP() and url.href.len > 0;
    }

    /// Attempt one sendfile call toward `socket_fd`, advancing
    /// `offset`/`remain`. Returns `.done` when everything was sent,
    /// `.again` when the caller should retry later (EAGAIN/partial), or
    /// `.err` on failure.
    pub fn write(
        this: *Sendfile,
        socket_fd: bun.FileDescriptor,
    ) Status {
        const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63)));
        // TODO we should not need this int cast; improve the return type of `@min`
        const adjusted_count = @as(u63, @intCast(adjusted_count_temporary));

        if (Environment.isLinux) {
            var signed_offset = @as(i64, @intCast(this.offset));
            const begin = this.offset;
            const val =
                // this does the syscall directly, without libc
                std.os.linux.sendfile(socket_fd, this.fd.cast(), &signed_offset, this.remain);
            // Fix: was `@as(u66, ...)` — a u66 cannot coerce to the
            // u64-sized `offset` field; the kernel offset is a 64-bit value.
            this.offset = @as(u64, @intCast(signed_offset));

            const errcode = bun.C.getErrno(val);

            this.remain -|= @as(u64, @intCast(this.offset -| begin));

            if (errcode != .SUCCESS or this.remain == 0 or val == 0) {
                if (errcode == .SUCCESS) {
                    return .{ .done = {} };
                }

                return .{ .err = bun.errnoToZigErr(errcode) };
            }
        } else if (Environment.isPosix) {
            var sbytes: std.posix.off_t = adjusted_count;
            const signed_offset = @as(i64, @bitCast(@as(u64, this.offset)));
            const errcode = bun.C.getErrno(std.c.sendfile(
                this.fd.cast(),
                socket_fd,
                signed_offset,
                &sbytes,
                null,
                0,
            ));
            // On macOS/BSD, `sbytes` reports bytes written even on EAGAIN.
            const wrote = @as(u64, @intCast(sbytes));
            this.offset +|= wrote;
            this.remain -|= wrote;
            if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) {
                if (errcode == .SUCCESS) {
                    return .{ .done = {} };
                }

                return .{ .err = bun.errnoToZigErr(errcode) };
            }
        }

        // Partial progress (or EAGAIN): caller should wait for writability.
        return .{ .again = {} };
    }

    pub const Status = union(enum) {
        done: void,
        err: anyerror,
        again: void,
    };
};
|
||||
33
src/http/client/signals.zig
Normal file
33
src/http/client/signals.zig
Normal file
@@ -0,0 +1,33 @@
|
||||
const std = @import("std");
|
||||
|
||||
/// Cross-thread signal flags shared between the JS thread and the HTTP
/// client thread. Each field is an optional pointer to an atomic bool owned
/// elsewhere (typically a `Signals.Store`); null means "signal not wired".
pub const Signals = struct {
    header_progress: ?*std.atomic.Value(bool) = null,
    body_streaming: ?*std.atomic.Value(bool) = null,
    aborted: ?*std.atomic.Value(bool) = null,
    cert_errors: ?*std.atomic.Value(bool) = null,

    /// True when no signal pointer is wired at all.
    pub fn isEmpty(this: *const Signals) bool {
        return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
    }

    /// Owning storage for all four flags; hand out views via `to()`.
    pub const Store = struct {
        header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
        body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
        aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
        cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

        /// Borrow every flag as a `Signals` view; the Store must outlive it.
        pub fn to(this: *Store) Signals {
            return .{
                .header_progress = &this.header_progress,
                .body_streaming = &this.body_streaming,
                .aborted = &this.aborted,
                .cert_errors = &this.cert_errors,
            };
        }
    };

    /// Read one flag; an unwired (null) signal reads as false.
    pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals)) bool {
        // Fix: was `var ptr` — the pointer is never reassigned, and modern
        // Zig rejects a never-mutated `var`.
        const ptr: *std.atomic.Value(bool) = @field(this, @tagName(field)) orelse return false;
        return ptr.load(.monotonic);
    }
};
|
||||
973
src/http/client/thread.zig
Normal file
973
src/http/client/thread.zig
Normal file
@@ -0,0 +1,973 @@
|
||||
const bun = @import("root").bun;
|
||||
const std = @import("std");
|
||||
|
||||
const Global = bun.Global;
|
||||
const picohttp = bun.picohttp;
|
||||
const BoringSSL = bun.BoringSSL;
|
||||
const JSC = bun.JSC;
|
||||
const MutableString = bun.MutableString;
|
||||
const Environment = bun.Environment;
|
||||
const DeadSocket = opaque {};
|
||||
var dead_socket = @as(*DeadSocket, @ptrFromInt(1));
|
||||
const HiveArray = bun.HiveArray;
|
||||
const uws = bun.uws;
|
||||
const FeatureFlags = bun.FeatureFlags;
|
||||
const TaggedPointerUnion = bun.TaggedPointerUnion;
|
||||
const InitError = @import("./errors.zig").InitError;
|
||||
const InitOpts = @import("./init_options.zig").InitOpts;
|
||||
const SSLConfig = bun.server.ServerConfig.SSLConfig;
|
||||
const Output = bun.Output;
|
||||
const assert = bun.assert;
|
||||
const strings = bun.strings;
|
||||
const Batch = bun.ThreadPool.Batch;
|
||||
|
||||
pub var http_thread: HTTPThread = undefined;
|
||||
|
||||
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
|
||||
pub const Headers = JSC.WebCore.Headers;
|
||||
const HTTPCertError = @import("./errors.zig").HTTPCertError;
|
||||
const Queue = @import("./async_http.zig").Queue;
|
||||
const ThreadlocalAsyncHTTP = @import("./async_http.zig").ThreadlocalAsyncHTTP;
|
||||
const ProxyTunnel = @import("./proxy_tunnel.zig").ProxyTunnel;
|
||||
const AsyncHTTP = @import("./async_http.zig").AsyncHTTP;
|
||||
const socket_async_http_abort_tracker = AsyncHTTP.getSocketAsyncHTTPAbortTracker();
|
||||
const log = Output.scoped(.fetch, false);
|
||||
const HTTPClient = @import("../../http.zig").HTTPClient;
|
||||
pub const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
|
||||
const MAX_KEEPALIVE_HOSTNAME = 128;
|
||||
|
||||
/// Returns the thread-global socket context: HTTPS when `ssl`, plain HTTP
/// otherwise. Valid only after the HTTP thread has been initialized.
pub fn getContext(comptime ssl: bool) *NewHTTPContext(ssl) {
    return if (ssl) &http_thread.https_context else &http_thread.http_context;
}
|
||||
/// Accessor for the singleton HTTP client thread instance.
pub fn getHttpThread() *HTTPThread {
    return &http_thread;
}
|
||||
|
||||
pub fn NewHTTPContext(comptime ssl: bool) type {
|
||||
return struct {
|
||||
const pool_size = 64;
|
||||
const PooledSocket = struct {
|
||||
http_socket: HTTPSocket,
|
||||
hostname_buf: [MAX_KEEPALIVE_HOSTNAME]u8 = undefined,
|
||||
hostname_len: u8 = 0,
|
||||
port: u16 = 0,
|
||||
/// If you set `rejectUnauthorized` to `false`, the connection fails to verify,
|
||||
did_have_handshaking_error_while_reject_unauthorized_is_false: bool = false,
|
||||
};
|
||||
|
||||
pub fn markSocketAsDead(socket: HTTPSocket) void {
|
||||
if (socket.ext(**anyopaque)) |ctx| {
|
||||
ctx.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
|
||||
}
|
||||
}
|
||||
|
||||
fn terminateSocket(socket: HTTPSocket) void {
|
||||
markSocketAsDead(socket);
|
||||
socket.close(.failure);
|
||||
}
|
||||
|
||||
fn closeSocket(socket: HTTPSocket) void {
|
||||
markSocketAsDead(socket);
|
||||
socket.close(.normal);
|
||||
}
|
||||
|
||||
fn getTagged(ptr: *anyopaque) ActiveSocket {
|
||||
return ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
|
||||
}
|
||||
|
||||
pub fn getTaggedFromSocket(socket: HTTPSocket) ActiveSocket {
|
||||
if (socket.ext(anyopaque)) |ctx| {
|
||||
return getTagged(ctx);
|
||||
}
|
||||
return ActiveSocket.init(&dead_socket);
|
||||
}
|
||||
|
||||
pending_sockets: HiveArray(PooledSocket, pool_size) = .empty,
|
||||
us_socket_context: *uws.SocketContext,
|
||||
|
||||
const Context = @This();
|
||||
pub const HTTPSocket = uws.NewSocketHandler(ssl);
|
||||
|
||||
pub fn context() *@This() {
|
||||
return getContext(ssl);
|
||||
}
|
||||
|
||||
const ActiveSocket = TaggedPointerUnion(.{
|
||||
*DeadSocket,
|
||||
HTTPClient,
|
||||
PooledSocket,
|
||||
});
|
||||
const ssl_int = @as(c_int, @intFromBool(ssl));
|
||||
|
||||
pub fn sslCtx(this: *@This()) *BoringSSL.SSL_CTX {
|
||||
if (comptime !ssl) {
|
||||
unreachable;
|
||||
}
|
||||
|
||||
return @as(*BoringSSL.SSL_CTX, @ptrCast(this.us_socket_context.getNativeHandle(true)));
|
||||
}
|
||||
|
||||
pub fn deinit(this: *@This()) void {
|
||||
this.us_socket_context.deinit(ssl);
|
||||
uws.us_socket_context_free(@as(c_int, @intFromBool(ssl)), this.us_socket_context);
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
pub fn initWithClientConfig(this: *@This(), client: *HTTPClient) InitError!void {
|
||||
if (!comptime ssl) {
|
||||
@compileError("ssl only");
|
||||
}
|
||||
var opts = client.tls_props.?.asUSockets();
|
||||
opts.request_cert = 1;
|
||||
opts.reject_unauthorized = 0;
|
||||
try this.initWithOpts(&opts);
|
||||
}
|
||||
|
||||
fn initWithOpts(this: *@This(), opts: *const uws.us_bun_socket_context_options_t) InitError!void {
|
||||
if (!comptime ssl) {
|
||||
@compileError("ssl only");
|
||||
}
|
||||
|
||||
var err: uws.create_bun_socket_error_t = .none;
|
||||
const socket = uws.us_create_bun_socket_context(ssl_int, http_thread.loop.loop, @sizeOf(usize), opts.*, &err);
|
||||
if (socket == null) {
|
||||
return switch (err) {
|
||||
.load_ca_file => error.LoadCAFile,
|
||||
.invalid_ca_file => error.InvalidCAFile,
|
||||
.invalid_ca => error.InvalidCA,
|
||||
else => error.FailedToOpenSocket,
|
||||
};
|
||||
}
|
||||
this.us_socket_context = socket.?;
|
||||
this.sslCtx().setup();
|
||||
|
||||
HTTPSocket.configure(
|
||||
this.us_socket_context,
|
||||
false,
|
||||
anyopaque,
|
||||
Handler,
|
||||
);
|
||||
}
|
||||
|
||||
/// Initialize the shared HTTPS context using thread-level options:
/// an optional extra CA bundle and/or CA file path.
pub fn initWithThreadOpts(this: *@This(), init_opts: *const InitOpts) InitError!void {
    if (!comptime ssl) {
        @compileError("ssl only");
    }
    var opts: uws.us_bun_socket_context_options_t = .{
        .ca = if (init_opts.ca.len > 0) @ptrCast(init_opts.ca) else null,
        .ca_count = @intCast(init_opts.ca.len),
        .ca_file_name = if (init_opts.abs_ca_file_name.len > 0) init_opts.abs_ca_file_name else null,
        .request_cert = 1,
    };

    try this.initWithOpts(&opts);
}
|
||||
|
||||
/// Initialize the default context for this protocol flavor and install
/// the Handler callbacks. TLS contexts request the peer certificate (so
/// root certs are loaded) but do not auto-reject; hostname mismatches are
/// aborted manually in onHandshake.
pub fn init(this: *@This()) void {
    if (comptime ssl) {
        const opts: uws.us_bun_socket_context_options_t = .{
            // we request the cert so we load root certs and can verify it
            .request_cert = 1,
            // we manually abort the connection if the hostname doesn't match
            .reject_unauthorized = 0,
        };
        var err: uws.create_bun_socket_error_t = .none;
        this.us_socket_context = uws.us_create_bun_socket_context(ssl_int, http_thread.loop.loop, @sizeOf(usize), opts, &err).?;

        this.sslCtx().setup();
    } else {
        const opts: uws.us_socket_context_options_t = .{};
        this.us_socket_context = uws.us_create_socket_context(ssl_int, http_thread.loop.loop, @sizeOf(usize), opts).?;
    }

    HTTPSocket.configure(
        this.us_socket_context,
        false,
        anyopaque,
        Handler,
    );
}
|
||||
|
||||
/// Attempt to keep the socket alive by reusing it for another request.
/// If no space is available, close the socket.
///
/// If `did_have_handshaking_error_while_reject_unauthorized_is_false`
/// is set, then we can only reuse the socket for HTTP Keep Alive if
/// `reject_unauthorized` is set to `false`.
pub fn releaseSocket(this: *@This(), socket: HTTPSocket, did_have_handshaking_error_while_reject_unauthorized_is_false: bool, hostname: []const u8, port: u16) void {
    // log("releaseSocket(0x{})", .{bun.fmt.hexIntUpper(@intFromPtr(socket.socket))});

    if (comptime Environment.allow_assert) {
        assert(!socket.isClosed());
        assert(!socket.isShutdown());
        assert(socket.isEstablished());
    }
    assert(hostname.len > 0);
    assert(port > 0);

    // Only healthy sockets whose hostname fits the fixed-size pool slot
    // are eligible for keep-alive.
    if (hostname.len <= MAX_KEEPALIVE_HOSTNAME and !socket.isClosedOrHasError() and socket.isEstablished()) {
        if (this.pending_sockets.get()) |pending| {
            // Retag the socket's userdata so future callbacks route to the
            // pool entry instead of the finished client.
            if (socket.ext(**anyopaque)) |ctx| {
                ctx.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr());
            }
            socket.flush();
            socket.timeout(0);
            // Idle keep-alive sockets are reaped after 5 minutes.
            socket.setTimeoutMinutes(5);

            pending.http_socket = socket;
            pending.did_have_handshaking_error_while_reject_unauthorized_is_false = did_have_handshaking_error_while_reject_unauthorized_is_false;
            @memcpy(pending.hostname_buf[0..hostname.len], hostname);
            pending.hostname_len = @as(u8, @truncate(hostname.len));
            pending.port = port;

            // log("Keep-Alive release {s}:{d} (0x{})", .{ hostname, port, @intFromPtr(socket.socket) });
            return;
        }
    }

    closeSocket(socket);
}
|
||||
|
||||
/// Socket event callbacks for this context. Each socket's userdata is a
/// tagged pointer (ActiveSocket): either an in-flight HTTPClient, a
/// keep-alive PooledSocket, or a dead-socket marker.
pub const Handler = struct {
    pub fn onOpen(
        ptr: *anyopaque,
        socket: HTTPSocket,
    ) void {
        const active = getTagged(ptr);
        if (active.get(HTTPClient)) |client| {
            if (client.onOpen(comptime ssl, socket)) |_| {
                return;
            } else |_| {
                log("Unable to open socket", .{});
                terminateSocket(socket);
                return;
            }
        }

        if (active.get(PooledSocket)) |pooled| {
            // A pooled socket should never receive an open event; recycle
            // its pool slot.
            addMemoryBackToPool(pooled);
            return;
        }

        log("Unexpected open on unknown socket", .{});
        terminateSocket(socket);
    }

    pub fn onHandshake(
        ptr: *anyopaque,
        socket: HTTPSocket,
        success: i32,
        ssl_error: uws.us_bun_verify_error_t,
    ) void {
        const handshake_success = if (success == 1) true else false;

        const handshake_error = HTTPCertError{
            .error_no = ssl_error.error_no,
            .code = if (ssl_error.code == null) "" else ssl_error.code[0..bun.len(ssl_error.code) :0],
            // FIX: guard `reason` by its own nullity. Previously this
            // checked `ssl_error.code`, which could slice a null `reason`
            // pointer when `code` was set but `reason` was not.
            .reason = if (ssl_error.reason == null) "" else ssl_error.reason[0..bun.len(ssl_error.reason) :0],
        };

        const active = getTagged(ptr);
        if (active.get(HTTPClient)) |client| {
            // handshake completed but we may have ssl errors
            client.flags.did_have_handshaking_error = handshake_error.error_no != 0;

            if (handshake_success) {
                if (client.flags.reject_unauthorized) {
                    // only reject the connection if reject_unauthorized == true
                    if (client.flags.did_have_handshaking_error) {
                        client.closeAndFail(BoringSSL.getCertErrorFromNo(handshake_error.error_no), comptime ssl, socket);
                        return;
                    }

                    // if checkServerIdentity returns false, we dont call open this means that the connection was rejected
                    const ssl_ptr = @as(*BoringSSL.SSL, @ptrCast(socket.getNativeHandle()));
                    if (!client.checkServerIdentity(comptime ssl, socket, handshake_error, ssl_ptr, true)) {
                        client.flags.did_have_handshaking_error = true;
                        client.unregisterAbortTracker();
                        if (!socket.isClosed()) terminateSocket(socket);
                        return;
                    }
                }

                return client.firstCall(comptime ssl, socket);
            } else {
                // if we are here is because server rejected us, and the error_no is the cause of this
                // if we set reject_unauthorized == false this means the server requires custom CA aka NODE_EXTRA_CA_CERTS
                if (client.flags.did_have_handshaking_error) {
                    client.closeAndFail(BoringSSL.getCertErrorFromNo(handshake_error.error_no), comptime ssl, socket);
                    return;
                }
                // if handshake_success it self is false, this means that the connection was rejected
                client.closeAndFail(error.ConnectionRefused, comptime ssl, socket);
                return;
            }
        }

        if (socket.isClosed()) {
            markSocketAsDead(socket);
            if (active.get(PooledSocket)) |pooled| {
                addMemoryBackToPool(pooled);
            }

            return;
        }

        if (handshake_success) {
            if (active.is(PooledSocket)) {
                // Allow pooled sockets to be reused if the handshake was successful.
                socket.setTimeout(0);
                socket.setTimeoutMinutes(5);
                return;
            }
        }

        // Handshake failed (or unknown tag): recycle any pool slot and
        // drop the socket.
        if (active.get(PooledSocket)) |pooled| {
            addMemoryBackToPool(pooled);
        }

        terminateSocket(socket);
    }

    pub fn onClose(
        ptr: *anyopaque,
        socket: HTTPSocket,
        _: c_int,
        _: ?*anyopaque,
    ) void {
        const tagged = getTagged(ptr);
        markSocketAsDead(socket);

        if (tagged.get(HTTPClient)) |client| {
            return client.onClose(comptime ssl, socket);
        }

        if (tagged.get(PooledSocket)) |pooled| {
            addMemoryBackToPool(pooled);
        }

        return;
    }

    /// Return a keep-alive slot to this context's pool.
    fn addMemoryBackToPool(pooled: *PooledSocket) void {
        assert(context().pending_sockets.put(pooled));
    }

    pub fn onData(
        ptr: *anyopaque,
        socket: HTTPSocket,
        buf: []const u8,
    ) void {
        const tagged = getTagged(ptr);
        if (tagged.get(HTTPClient)) |client| {
            return client.onData(
                comptime ssl,
                buf,
                getContext(ssl),
                socket,
            );
        } else if (tagged.is(PooledSocket)) {
            // trailing zero is fine to ignore
            if (strings.eqlComptime(buf, end_of_chunked_http1_1_encoding_response_body)) {
                return;
            }

            log("Unexpected data on socket", .{});

            return;
        }
        log("Unexpected data on unknown socket", .{});
        terminateSocket(socket);
    }

    pub fn onWritable(
        ptr: *anyopaque,
        socket: HTTPSocket,
    ) void {
        const tagged = getTagged(ptr);
        if (tagged.get(HTTPClient)) |client| {
            return client.onWritable(
                false,
                comptime ssl,
                socket,
            );
        } else if (tagged.is(PooledSocket)) {
            // it's a keep-alive socket
        } else {
            // don't know what this is, let's close it
            log("Unexpected writable on socket", .{});
            terminateSocket(socket);
        }
    }

    pub fn onLongTimeout(
        ptr: *anyopaque,
        socket: HTTPSocket,
    ) void {
        const tagged = getTagged(ptr);
        if (tagged.get(HTTPClient)) |client| {
            return client.onTimeout(comptime ssl, socket);
        } else if (tagged.get(PooledSocket)) |pooled| {
            // If a socket has been sitting around for 5 minutes
            // Let's close it and remove it from the pool.
            addMemoryBackToPool(pooled);
        }

        terminateSocket(socket);
    }

    pub fn onConnectError(
        ptr: *anyopaque,
        socket: HTTPSocket,
        _: c_int,
    ) void {
        const tagged = getTagged(ptr);
        markSocketAsDead(socket);
        if (tagged.get(HTTPClient)) |client| {
            client.onConnectError();
        } else if (tagged.get(PooledSocket)) |pooled| {
            addMemoryBackToPool(pooled);
        }
        // us_connecting_socket_close is always called internally by uSockets
    }

    pub fn onEnd(
        _: *anyopaque,
        socket: HTTPSocket,
    ) void {
        // TCP fin must be closed, but we must keep the original tagged
        // pointer so that their onClose callback is called.
        //
        // Three possible states:
        // 1. HTTP Keep-Alive socket: it must be removed from the pool
        // 2. HTTP Client socket: it might need to be retried
        // 3. Dead socket: it is already marked as dead
        socket.close(.failure);
    }
};
|
||||
|
||||
/// Find a reusable keep-alive socket for `hostname:port`, removing its
/// entry from the pool. Entries with a prior handshake error are skipped
/// when `reject_unauthorized` is requested; dead or shutdown sockets
/// found along the way are discarded.
fn existingSocket(this: *@This(), reject_unauthorized: bool, hostname: []const u8, port: u16) ?HTTPSocket {
    if (hostname.len > MAX_KEEPALIVE_HOSTNAME)
        return null;

    // Iterate occupied pool slots (bits that are unset in `available`).
    var iter = this.pending_sockets.available.iterator(.{ .kind = .unset });

    while (iter.next()) |pending_socket_index| {
        var socket = this.pending_sockets.at(@as(u16, @intCast(pending_socket_index)));
        if (socket.port != port) {
            continue;
        }

        if (socket.did_have_handshaking_error_while_reject_unauthorized_is_false and reject_unauthorized) {
            continue;
        }

        if (strings.eqlLong(socket.hostname_buf[0..socket.hostname_len], hostname, true)) {
            const http_socket = socket.http_socket;
            // Release the slot either way: the socket is handed to the
            // caller below, or discarded as unusable.
            assert(context().pending_sockets.put(socket));

            if (http_socket.isClosed()) {
                markSocketAsDead(http_socket);
                continue;
            }

            if (http_socket.isShutdown() or http_socket.getError() != 0) {
                terminateSocket(http_socket);
                continue;
            }

            log("+ Keep-Alive reuse {s}:{d}", .{ hostname, port });
            return http_socket;
        }
    }

    return null;
}
|
||||
|
||||
/// Connect over a unix domain socket. Retry is disabled since unix
/// sockets are never taken from the keep-alive pool here.
pub fn connectSocket(this: *@This(), client: *HTTPClient, socket_path: []const u8) !HTTPSocket {
    client.connected_url = if (client.http_proxy) |proxy| proxy else client.url;
    const socket = try HTTPSocket.connectUnixAnon(
        socket_path,
        this.us_socket_context,
        ActiveSocket.init(client).ptr(),
        false, // dont allow half-open sockets
    );
    client.allow_retry = false;
    return socket;
}
|
||||
|
||||
/// Connect to `hostname_:port`, preferring a pooled keep-alive socket
/// when the client allows it; otherwise open a fresh TCP/TLS connection.
pub fn connect(this: *@This(), client: *HTTPClient, hostname_: []const u8, port: u16) !HTTPSocket {
    // Skip DNS for "localhost" when the feature flag hardcodes it.
    const hostname = if (FeatureFlags.hardcode_localhost_to_127_0_0_1 and strings.eqlComptime(hostname_, "localhost"))
        "127.0.0.1"
    else
        hostname_;

    client.connected_url = if (client.http_proxy) |proxy| proxy else client.url;
    client.connected_url.hostname = hostname;

    if (client.isKeepAlivePossible()) {
        if (this.existingSocket(client.flags.reject_unauthorized, hostname, port)) |sock| {
            // Retag the reused socket's userdata to point at this client.
            if (sock.ext(**anyopaque)) |ctx| {
                ctx.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
            }
            // Reused sockets may have died in the pool; allow one retry.
            client.allow_retry = true;
            try client.onOpen(comptime ssl, sock);
            if (comptime ssl) {
                // TLS handshake already completed on a reused socket.
                client.firstCall(comptime ssl, sock);
            }
            return sock;
        }
    }

    const socket = try HTTPSocket.connectAnon(
        hostname,
        port,
        this.us_socket_context,
        ActiveSocket.init(client).ptr(),
        false,
    );
    client.allow_retry = false;
    return socket;
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Singleton state for the dedicated HTTP client thread. All sockets are
/// created and driven on this thread's event loop; other threads hand
/// work over via the queued_* lists and then wake the loop.
pub const HTTPThread = struct {
    loop: *JSC.MiniEventLoop,
    http_context: NewHTTPContext(false),
    https_context: NewHTTPContext(true),

    // New AsyncHTTP requests pushed from other threads.
    queued_tasks: Queue = Queue{},

    // Cross-thread mailboxes, drained on the HTTP thread each tick.
    queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
    queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},

    queued_shutdowns_lock: bun.Mutex = .{},
    queued_writes_lock: bun.Mutex = .{},

    // Proxy tunnels to be dereffed on the HTTP thread.
    queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},

    // Set once the thread is up; gates loop wakeups.
    has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
    timer: std.time.Timer,
    // Lazily-created, thread-local helpers.
    lazy_libdeflater: ?*LibdeflateState = null,
    lazy_request_body_buffer: ?*HeapRequestBodyBuffer = null,
|
||||
|
||||
/// A heap-allocated 512 KiB scratch buffer for serializing large request
/// bodies, cached one-at-a-time on the HTTP thread via
/// `lazy_request_body_buffer`.
pub const HeapRequestBodyBuffer = struct {
    buffer: [512 * 1024]u8 = undefined,
    fixed_buffer_allocator: std.heap.FixedBufferAllocator,

    pub usingnamespace bun.New(@This());

    pub fn init() *@This() {
        var this = HeapRequestBodyBuffer.new(.{
            .fixed_buffer_allocator = undefined,
        });
        // The FBA must point into this heap allocation, so it is wired up
        // only after new() has produced a stable address.
        this.fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(&this.buffer);
        return this;
    }

    /// Return this buffer to the thread-local cache slot, or free it if
    /// the slot is already occupied.
    pub fn put(this: *@This()) void {
        if (http_thread.lazy_request_body_buffer == null) {
            // This case hypothetically should never happen
            this.fixed_buffer_allocator.reset();
            http_thread.lazy_request_body_buffer = this;
        } else {
            this.deinit();
        }
    }

    pub fn deinit(this: *@This()) void {
        this.destroy();
    }
};
|
||||
|
||||
/// Scratch storage used while sending a request body: either the cached
/// heap buffer (large bodies) or a stack-with-heap-fallback allocator
/// (small bodies).
pub const RequestBodyBuffer = union(enum) {
    heap: *HeapRequestBodyBuffer,
    stack: std.heap.StackFallbackAllocator(request_body_send_stack_buffer_size),

    /// Release the storage; heap buffers go back to the thread cache.
    pub fn deinit(this: *@This()) void {
        switch (this.*) {
            .heap => |heap_buffer| heap_buffer.put(),
            .stack => {},
        }
    }

    /// The full backing byte region, regardless of variant.
    pub fn allocatedSlice(this: *@This()) []u8 {
        switch (this.*) {
            .heap => |heap_buffer| return &heap_buffer.buffer,
            .stack => |*fallback| return &fallback.buffer,
        }
    }

    /// An allocator that hands out memory from this buffer.
    pub fn allocator(this: *@This()) std.mem.Allocator {
        switch (this.*) {
            .heap => |heap_buffer| return heap_buffer.fixed_buffer_allocator.allocator(),
            .stack => |*fallback| return fallback.get(),
        }
    }

    /// View the backing storage as an ArrayList of length zero whose
    /// capacity is the whole buffer, using this buffer's allocator.
    pub fn toArrayList(this: *@This()) std.ArrayList(u8) {
        var list = std.ArrayList(u8).fromOwnedSlice(this.allocator(), this.allocatedSlice());
        list.items.len = 0;
        return list;
    }
};
|
||||
|
||||
const threadlog = Output.scoped(.HTTPThread, true);

// A chunk of streaming request-body data queued from another thread;
// `ended` marks the final chunk of the stream.
const WriteMessage = struct {
    data: []const u8,
    async_http_id: u32,
    flags: packed struct {
        is_tls: bool,
        ended: bool,
    },
};

// A request to abort an in-flight request, identified by its id.
const ShutdownMessage = struct {
    async_http_id: u32,
    is_tls: bool,
};

// Lazily-allocated libdeflate decompressor plus its scratch buffer,
// owned by the HTTP thread.
pub const LibdeflateState = struct {
    decompressor: *bun.libdeflate.Decompressor = undefined,
    shared_buffer: [512 * 1024]u8 = undefined,

    pub usingnamespace bun.New(@This());
};

// Request bodies at or above this size use the heap buffer instead of
// the stack-fallback buffer.
const request_body_send_stack_buffer_size = 32 * 1024;
|
||||
|
||||
/// Pick scratch storage for serializing a request body: the (cached)
/// heap buffer for bodies that would overflow the stack buffer,
/// otherwise a stack-with-heap-fallback allocator.
pub inline fn getRequestBodySendBuffer(this: *@This(), estimated_size: usize) RequestBodyBuffer {
    if (estimated_size >= request_body_send_stack_buffer_size) {
        if (this.lazy_request_body_buffer == null) {
            log("Allocating HeapRequestBodyBuffer due to {d} bytes request body", .{estimated_size});
            return .{
                .heap = HeapRequestBodyBuffer.init(),
            };
        }

        // Take ownership of the cached buffer; it comes back via put().
        return .{ .heap = bun.take(&this.lazy_request_body_buffer).? };
    }
    return .{
        .stack = std.heap.stackFallback(request_body_send_stack_buffer_size, bun.default_allocator),
    };
}
|
||||
|
||||
/// Lazily create and return this thread's libdeflate state.
/// NOTE(review): the lazy init is unsynchronized — assumes this is only
/// called from the HTTP thread; confirm before calling elsewhere.
pub fn deflater(this: *@This()) *LibdeflateState {
    if (this.lazy_libdeflater == null) {
        this.lazy_libdeflater = LibdeflateState.new(.{
            .decompressor = bun.libdeflate.Decompressor.alloc() orelse bun.outOfMemory(),
        });
    }

    return this.lazy_libdeflater.?;
}
|
||||
|
||||
/// One-time bootstrap: populate the global `http_thread` struct, load
/// libdeflate, and spawn the detached OS thread that runs the loop.
fn initOnce(opts: *const InitOpts) void {
    http_thread = .{
        .loop = undefined,
        .http_context = .{
            .us_socket_context = undefined,
        },
        .https_context = .{
            .us_socket_context = undefined,
        },
        .timer = std.time.Timer.start() catch unreachable,
    };
    bun.libdeflate.load();
    const thread = std.Thread.spawn(
        .{
            .stack_size = bun.default_thread_stack_size,
        },
        onStart,
        .{opts.*},
    ) catch |err| Output.panic("Failed to start HTTP Client thread: {s}", .{@errorName(err)});
    thread.detach();
}
var init_once = bun.once(initOnce);

/// Idempotent public entry point; safe to call more than once.
pub fn init(opts: *const InitOpts) void {
    init_once.call(.{opts});
}
|
||||
|
||||
/// Entry point of the HTTP client thread: set up the mini event loop and
/// both socket contexts, then run the event loop forever.
pub fn onStart(opts: InitOpts) void {
    Output.Source.configureNamedThread("HTTP Client");

    const loop = bun.JSC.MiniEventLoop.initGlobal(null);

    if (Environment.isWindows) {
        // Networking on Windows requires %SystemRoot%; fail loudly rather
        // than with mysterious socket errors later.
        _ = std.process.getenvW(comptime bun.strings.w("SystemRoot")) orelse {
            bun.Output.errGeneric("The %SystemRoot% environment variable is not set. Bun needs this set in order for network requests to work.", .{});
            Global.crash();
        };
    }

    http_thread.loop = loop;
    http_thread.http_context.init();
    http_thread.https_context.initWithThreadOpts(&opts) catch |err| opts.onInitError(err, opts);
    http_thread.has_awoken.store(true, .monotonic);
    http_thread.processEvents();
}
|
||||
|
||||
/// Resolve which socket context should carry this request and open the
/// connection: unix socket, per-TLS-config custom context (cached in
/// custom_ssl_context_map), proxy target, or the default context.
pub fn connect(this: *@This(), client: *HTTPClient, comptime is_ssl: bool) !NewHTTPContext(is_ssl).HTTPSocket {
    if (client.unix_socket_path.length() > 0) {
        return try this.context(is_ssl).connectSocket(client, client.unix_socket_path.slice());
    }

    if (comptime is_ssl) {
        const needs_own_context = client.tls_props != null and client.tls_props.?.requires_custom_request_ctx;
        if (needs_own_context) {
            var requested_config = client.tls_props.?;
            // Reuse an existing custom context whose TLS config matches.
            for (custom_ssl_context_map.keys()) |other_config| {
                if (requested_config.isSame(other_config)) {
                    // we free the callers config since we have a existing one
                    // NOTE(review): requested_config was just unwrapped from
                    // client.tls_props, so this inequality looks always
                    // false — confirm the intended ownership rule.
                    if (requested_config != client.tls_props) {
                        requested_config.deinit();
                        bun.default_allocator.destroy(requested_config);
                    }
                    client.tls_props = other_config;
                    if (client.http_proxy) |url| {
                        return try custom_ssl_context_map.get(other_config).?.connect(client, url.hostname, url.getPortAuto());
                    } else {
                        return try custom_ssl_context_map.get(other_config).?.connect(client, client.url.hostname, client.url.getPortAuto());
                    }
                }
            }
            // we need the config so dont free it
            var custom_context = try bun.default_allocator.create(NewHTTPContext(is_ssl));
            custom_context.initWithClientConfig(client) catch |err| {
                client.tls_props = null;

                requested_config.deinit();
                bun.default_allocator.destroy(requested_config);
                bun.default_allocator.destroy(custom_context);

                // TODO: these error names reach js. figure out how they should be handled
                return switch (err) {
                    error.FailedToOpenSocket => |e| e,
                    error.InvalidCA => error.FailedToOpenSocket,
                    error.InvalidCAFile => error.FailedToOpenSocket,
                    error.LoadCAFile => error.FailedToOpenSocket,
                };
            };
            try custom_ssl_context_map.put(requested_config, custom_context);
            // We might deinit the socket context, so we disable keepalive to make sure we don't
            // free it while in use.
            client.flags.disable_keepalive = true;
            if (client.http_proxy) |url| {
                // https://github.com/oven-sh/bun/issues/11343
                if (url.protocol.len == 0 or strings.eqlComptime(url.protocol, "https") or strings.eqlComptime(url.protocol, "http")) {
                    return try this.context(is_ssl).connect(client, url.hostname, url.getPortAuto());
                }
                return error.UnsupportedProxyProtocol;
            }
            return try custom_context.connect(client, client.url.hostname, client.url.getPortAuto());
        }
    }
    if (client.http_proxy) |url| {
        if (url.href.len > 0) {
            // https://github.com/oven-sh/bun/issues/11343
            if (url.protocol.len == 0 or strings.eqlComptime(url.protocol, "https") or strings.eqlComptime(url.protocol, "http")) {
                return try this.context(is_ssl).connect(client, url.hostname, url.getPortAuto());
            }
            return error.UnsupportedProxyProtocol;
        }
    }
    return try this.context(is_ssl).connect(client, client.url.hostname, client.url.getPortAuto());
}
|
||||
|
||||
/// Select the plain or TLS context for this protocol flavor at comptime.
pub fn context(this: *@This(), comptime is_ssl: bool) *NewHTTPContext(is_ssl) {
    if (comptime is_ssl) {
        return &this.https_context;
    }
    return &this.http_context;
}
|
||||
|
||||
/// Runs on the HTTP thread each loop tick: process queued aborts and
/// streaming-body writes handed over from other threads, deref queued
/// proxy tunnels, then start queued requests up to the concurrency limit.
fn drainEvents(this: *@This()) void {
    {
        this.queued_shutdowns_lock.lock();
        defer this.queued_shutdowns_lock.unlock();
        for (this.queued_shutdowns.items) |http| {
            if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
                if (http.is_tls) {
                    const socket = uws.SocketTLS.fromAny(socket_ptr.value);
                    // do a fast shutdown here since we are aborting and we dont want to wait for the close_notify from the other side
                    socket.close(.failure);
                } else {
                    const socket = uws.SocketTCP.fromAny(socket_ptr.value);
                    socket.close(.failure);
                }
            }
        }
        this.queued_shutdowns.clearRetainingCapacity();
    }
    {
        this.queued_writes_lock.lock();
        defer this.queued_writes_lock.unlock();
        for (this.queued_writes.items) |write| {
            const ended = write.flags.ended;
            // Per-iteration defer: each message's data is freed after it
            // has been copied into the stream buffer (or skipped).
            defer if (!strings.eqlComptime(write.data, end_of_chunked_http1_1_encoding_response_body) and write.data.len > 0) {
                // "0\r\n\r\n" is always a static so no need to free
                bun.default_allocator.free(write.data);
            };
            if (socket_async_http_abort_tracker.get(write.async_http_id)) |socket_ptr| {
                if (write.flags.is_tls) {
                    const socket = uws.SocketTLS.fromAny(socket_ptr);
                    if (socket.isClosed() or socket.isShutdown()) {
                        continue;
                    }
                    const tagged = NewHTTPContext(true).getTaggedFromSocket(socket);
                    if (tagged.get(HTTPClient)) |client| {
                        if (client.state.original_request_body == .stream) {
                            var stream = &client.state.original_request_body.stream;
                            if (write.data.len > 0) {
                                stream.buffer.write(write.data) catch {};
                            }
                            stream.ended = ended;
                            // Only poke the client if the socket isn't
                            // already back-pressured.
                            if (!stream.has_backpressure) {
                                client.onWritable(
                                    false,
                                    true,
                                    socket,
                                );
                            }
                        }
                    }
                } else {
                    const socket = uws.SocketTCP.fromAny(socket_ptr);
                    if (socket.isClosed() or socket.isShutdown()) {
                        continue;
                    }
                    const tagged = NewHTTPContext(false).getTaggedFromSocket(socket);
                    if (tagged.get(HTTPClient)) |client| {
                        if (client.state.original_request_body == .stream) {
                            var stream = &client.state.original_request_body.stream;
                            if (write.data.len > 0) {
                                stream.buffer.write(write.data) catch {};
                            }
                            stream.ended = ended;
                            if (!stream.has_backpressure) {
                                client.onWritable(
                                    false,
                                    false,
                                    socket,
                                );
                            }
                        }
                    }
                }
            }
        }
        this.queued_writes.clearRetainingCapacity();
    }

    while (this.queued_proxy_deref.popOrNull()) |http| {
        http.deref();
    }

    var count: usize = 0;
    var active = AsyncHTTP.active_requests_count.load(.monotonic);
    const max = AsyncHTTP.max_simultaneous_requests.load(.monotonic);
    if (active >= max) return;
    defer {
        if (comptime Environment.allow_assert) {
            if (count > 0)
                log("Processed {d} tasks\n", .{count});
        }
    }

    // Start queued requests until the concurrency cap is reached.
    while (this.queued_tasks.pop()) |http| {
        var cloned = ThreadlocalAsyncHTTP.new(.{
            .async_http = http.*,
        });
        cloned.async_http.real = http;
        cloned.async_http.onStart();
        if (comptime Environment.allow_assert) {
            count += 1;
        }

        active += 1;
        if (active >= max) break;
    }
}
|
||||
|
||||
/// The HTTP thread's main loop: drain cross-thread queues, then block in
/// the event loop until work arrives or the loop is woken. Never returns.
fn processEvents(this: *@This()) noreturn {
    if (comptime Environment.isPosix) {
        this.loop.loop.num_polls = @max(2, this.loop.loop.num_polls);
    } else if (comptime Environment.isWindows) {
        this.loop.loop.inc();
    } else {
        @compileError("TODO:");
    }

    while (true) {
        this.drainEvents();

        var start_time: i128 = 0;
        if (comptime Environment.isDebug) {
            start_time = std.time.nanoTimestamp();
        }
        Output.flush();

        // Keep the loop alive across the tick.
        this.loop.loop.inc();
        this.loop.loop.tick();
        this.loop.loop.dec();

        // this.loop.run();
        if (comptime Environment.isDebug) {
            const end = std.time.nanoTimestamp();
            threadlog("Waited {any}\n", .{std.fmt.fmtDurationSigned(@as(i64, @truncate(end - start_time)))});
            Output.flush();
        }
    }
}
|
||||
|
||||
/// Queue an abort for the given request from any thread; the HTTP thread
/// performs the actual socket close in drainEvents.
pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
    {
        this.queued_shutdowns_lock.lock();
        defer this.queued_shutdowns_lock.unlock();
        this.queued_shutdowns.append(bun.default_allocator, .{
            .async_http_id = http.async_http_id,
            .is_tls = http.client.isHTTPS(),
        }) catch bun.outOfMemory();
    }
    // Nudge the event loop so the shutdown is handled promptly.
    this.wakeup();
}
|
||||
|
||||
/// Queue a chunk of streaming request-body data from any thread; the
/// HTTP thread consumes it in drainEvents. `ended` marks the final
/// chunk. Ownership of `data` transfers to the HTTP thread, which frees
/// non-static chunks after writing.
pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, data: []const u8, ended: bool) void {
    {
        this.queued_writes_lock.lock();
        defer this.queued_writes_lock.unlock();
        this.queued_writes.append(bun.default_allocator, .{
            .async_http_id = http.async_http_id,
            .data = data,
            .flags = .{
                .is_tls = http.client.isHTTPS(),
                .ended = ended,
            },
        }) catch bun.outOfMemory();
    }
    if (this.has_awoken.load(.monotonic))
        this.loop.loop.wakeup();
}
|
||||
|
||||
/// Queue a ProxyTunnel deref to be performed during drainEvents.
pub fn scheduleProxyDeref(this: *@This(), proxy: *ProxyTunnel) void {
    // this is always called on the http thread
    {
        this.queued_proxy_deref.append(bun.default_allocator, proxy) catch bun.outOfMemory();
    }
    if (this.has_awoken.load(.monotonic))
        this.loop.loop.wakeup();
}
|
||||
|
||||
/// Wake the HTTP thread's event loop, if the thread has started.
pub fn wakeup(this: *@This()) void {
    if (this.has_awoken.load(.monotonic))
        this.loop.loop.wakeup();
}
|
||||
|
||||
/// Push a batch of AsyncHTTP tasks onto the queue from any thread and
/// wake the HTTP thread to start them.
pub fn schedule(this: *@This(), batch: Batch) void {
    if (batch.len == 0) return;

    var remaining = batch;
    while (remaining.pop()) |task| {
        const http: *AsyncHTTP = @fieldParentPtr("task", task);
        this.queued_tasks.push(http);
    }

    // Same gated wakeup as the original: only after the thread is up.
    this.wakeup();
}
|
||||
};
|
||||
Reference in New Issue
Block a user