New HTTP client (#1231)

* wip

* It mostly works!

* Support `bun install`

* Support `bun create`

* Support chunked transfer encoding

* Handle Keep Alive when redirecting to a different domain

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2022-09-11 13:37:17 -07:00
committed by GitHub
parent 8b91360a33
commit 9a5aa059f9
17 changed files with 1578 additions and 762 deletions

View File

@@ -16,7 +16,7 @@ ARG WEBKIT_URL="https://github.com/oven-sh/WebKit/releases/download/$WEBKIT_TAG/
ARG ZIG_URL="https://github.com/oven-sh/zig/releases/download/$ZIG_TAG/zig-linux-$BUILDARCH.zip"
ARG GIT_SHA=""
ARG BUN_BASE_VERSION=0.1
ARG BUN_BASE_VERSION=10.
FROM bitnami/minideb:bullseye as bun-base

View File

@@ -140,6 +140,7 @@ fn addInternalPackages(step: *std.build.LibExeObjStep, _: std.mem.Allocator, tar
io,
boringssl,
thread_pool,
uws,
};
thread_pool.dependencies = &.{ io, http };
http.dependencies = &.{
@@ -148,6 +149,7 @@ fn addInternalPackages(step: *std.build.LibExeObjStep, _: std.mem.Allocator, tar
io,
boringssl,
thread_pool,
uws,
};
thread_pool.dependencies = &.{ io, http };

View File

@@ -17,7 +17,7 @@ const Method = @import("../src/http/method.zig").Method;
const ColonListType = @import("../src/cli/colon_list_type.zig").ColonListType;
const HeadersTuple = ColonListType(string, noop_resolver);
const path_handler = @import("../src/resolver/resolve_path.zig");
const NetworkThread = @import("http").NetworkThread;
const HTTPThread = @import("http").HTTPThread;
const HTTP = @import("http");
fn noop_resolver(in: string) !string {
return in;
@@ -184,7 +184,7 @@ pub fn main() anyerror!void {
try channel.buffer.ensureTotalCapacity(1);
try NetworkThread.init();
try HTTPThread.init();
var ctx = try default_allocator.create(HTTP.HTTPChannelContext);
ctx.* = .{
@@ -202,12 +202,12 @@ pub fn main() anyerror!void {
),
};
ctx.http.callback = HTTP.HTTPChannelContext.callback;
var batch = NetworkThread.Batch{};
var batch = HTTPThread.Batch{};
ctx.http.schedule(default_allocator, &batch);
ctx.http.client.verbose = args.verbose;
ctx.http.verbose = args.verbose;
NetworkThread.global.schedule(batch);
HTTPThread.global.schedule(batch);
while (true) {
while (channel.tryReadItem() catch null) |http| {

View File

@@ -486,7 +486,7 @@ pub const EventLoop = struct {
}
if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) {
if (this.waker) |waker| {
if (this.waker) |*waker| {
waker.wake() catch unreachable;
}
}

View File

@@ -517,6 +517,7 @@ pub const Fetch = struct {
pub const FetchTasklet = struct {
promise: *JSPromise = undefined,
http: HTTPClient.AsyncHTTP = undefined,
result: HTTPClient.HTTPClientResult = undefined,
status: Status = Status.pending,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
@@ -592,12 +593,13 @@ pub const Fetch = struct {
this.blob_store = null;
store.deref();
}
defer this.result.deinitMetadata();
const fetch_error = std.fmt.allocPrint(
default_allocator,
"fetch() failed {s}\nurl: \"{s}\"",
.{
@errorName(this.http.err orelse error.HTTPFail),
this.http.url.href,
this.result.href,
},
) catch unreachable;
return ZigString.init(fetch_error).toErrorInstance(this.global_this);
@@ -611,11 +613,12 @@ pub const Fetch = struct {
this.blob_store = null;
store.deref();
}
defer this.result.deinitMetadata();
response.* = Response{
.allocator = allocator,
.url = allocator.dupe(u8, this.http.url.href) catch unreachable,
.url = allocator.dupe(u8, this.result.href) catch unreachable,
.status_text = allocator.dupe(u8, http_response.status) catch unreachable,
.redirected = this.http.redirect_count > 0,
.redirected = this.http.redirected,
.body = .{
.init = .{
.headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers),
@@ -645,7 +648,7 @@ pub const Fetch = struct {
// linked_list.data.pooled_body = BodyPool.get(allocator);
linked_list.data.blob_store = request_body_store;
linked_list.data.response_buffer = MutableString.initEmpty(allocator);
linked_list.data.http = try HTTPClient.AsyncHTTP.init(
linked_list.data.http = HTTPClient.AsyncHTTP.init(
allocator,
method,
url,
@@ -653,10 +656,16 @@ pub const Fetch = struct {
headers_buf,
&linked_list.data.response_buffer,
request_body orelse &linked_list.data.empty_request_body,
timeout,
undefined,
);
linked_list.data.context = .{ .tasklet = &linked_list.data };
linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New(
*FetchTasklet,
FetchTasklet.callback,
).init(
&linked_list.data,
);
return linked_list;
}
@@ -672,21 +681,20 @@ pub const Fetch = struct {
timeout: usize,
request_body_store: ?*Blob.Store,
) !*FetchTasklet.Pool.Node {
try NetworkThread.init();
try HTTPClient.HTTPThread.init();
var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store);
node.data.global_this = global;
node.data.http.callback = callback;
var batch = NetworkThread.Batch{};
node.data.http.schedule(allocator, &batch);
NetworkThread.global.schedule(batch);
HTTPClient.http_thread.schedule(batch);
VirtualMachine.vm.active_tasks +|= 1;
return node;
}
pub fn callback(http_: *HTTPClient.AsyncHTTP) void {
var task: *FetchTasklet = @fieldParentPtr(FetchTasklet, "http", http_);
@atomicStore(Status, &task.status, Status.done, .Monotonic);
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
task.response_buffer = result.body.?.*;
task.result = result;
task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task));
}
};

View File

@@ -251,7 +251,7 @@ pub const CreateCommand = struct {
@setCold(true);
Global.configureAllocator(.{ .long_running = false });
try NetworkThread.init();
try HTTP.HTTPThread.init();
var create_options = try CreateOptions.parse(ctx, false);
const positionals = create_options.positionals;
@@ -1849,7 +1849,16 @@ pub const Example = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, &request_body, 60 * std.time.ns_per_min);
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
api_url,
header_entries,
headers_buf,
mutable,
&request_body,
60 * std.time.ns_per_min,
);
async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
@@ -1912,7 +1921,7 @@ pub const Example = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.client.progress_node = progress;
var response = try async_http.sendSync(true);
@@ -1984,7 +1993,7 @@ pub const Example = struct {
mutable.reset();
// ensure very stable memory address
async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.client.progress_node = progress;
refresher.maybeRefresh();
@@ -2013,7 +2022,16 @@ pub const Example = struct {
var mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 2048);
async_http.* = try HTTP.AsyncHTTP.init(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
url,
.{},
"",
mutable,
&request_body,
60 * std.time.ns_per_min,
);
if (Output.enable_ansi_colors) {
async_http.client.progress_node = progress_node;

View File

@@ -211,7 +211,16 @@ pub const UpgradeCommand = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = try HTTP.AsyncHTTP.init(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, &request_body, 60 * std.time.ns_per_min);
async_http.* = HTTP.AsyncHTTP.initSync(
allocator,
.GET,
api_url,
header_entries,
headers_buf,
&metadata_body,
&request_body,
60 * std.time.ns_per_min,
);
if (!silent) async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
@@ -434,7 +443,7 @@ pub const UpgradeCommand = struct {
zip_file_buffer.* = try MutableString.init(ctx.allocator, @maximum(version.size, 1024));
var request_buffer = try MutableString.init(ctx.allocator, 0);
async_http.* = try HTTP.AsyncHTTP.init(
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
.GET,
URL.parse(version.zip_url),

View File

@@ -8,6 +8,7 @@ const Environment = @import("../global.zig").Environment;
const fmt = std.fmt;
const assert = std.debug.assert;
const StringBuilder = @import("../string_builder.zig");
pub const Header = struct {
name: []const u8,
@@ -33,6 +34,18 @@ pub const Header = struct {
}
}
pub fn count(this: *const Header, builder: *StringBuilder) void {
builder.count(this.name);
builder.count(this.value);
}
pub fn clone(this: *const Header, builder: *StringBuilder) Header {
return .{
.name = builder.append(this.name),
.value = builder.append(this.value),
};
}
comptime {
assert(@sizeOf(Header) == @sizeOf(c.phr_header));
assert(@alignOf(Header) == @alignOf(c.phr_header));
@@ -44,6 +57,21 @@ pub const Request = struct {
path: []const u8,
minor_version: usize,
headers: []const Header,
bytes_read: u32 = 0,
pub fn clone(this: *const Request, headers: []Header, builder: *StringBuilder) Request {
for (this.headers) |header, i| {
headers[i] = header.clone(builder);
}
return .{
.method = builder.append(this.method),
.path = builder.append(this.path),
.minor_version = this.minor_version,
.headers = headers,
.bytes_read = this.bytes_read,
};
}
pub fn format(self: Request, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void {
try fmt.format(writer, "{s} {s}\n", .{ self.method, self.path });
@@ -83,16 +111,17 @@ pub const Request = struct {
.path = path,
.minor_version = @intCast(usize, minor_version),
.headers = src[0..num_headers],
.bytes_read = @intCast(u32, rc),
},
};
}
};
pub const Response = struct {
minor_version: usize,
status_code: usize,
status: []const u8,
headers: []Header,
minor_version: usize = 0,
status_code: usize = 0,
status: []const u8 = "",
headers: []Header = &.{},
bytes_read: c_int = 0,
pub fn format(self: Response, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void {
@@ -103,6 +132,27 @@ pub const Response = struct {
}
}
pub fn count(this: *const Response, builder: *StringBuilder) void {
builder.count(this.status);
for (this.headers) |header| {
header.count(builder);
}
}
pub fn clone(this: *const Response, headers: []Header, builder: *StringBuilder) Response {
var that = this.*;
that.status = builder.append(this.status);
for (this.headers) |header, i| {
headers[i] = header.clone(builder);
}
that.headers = headers[0..this.headers.len];
return that;
}
pub fn parseParts(buf: []const u8, src: []Header, offset: ?*usize) !Response {
var minor_version: c_int = 1;
var status_code: c_int = 0;

View File

@@ -10,10 +10,10 @@ pub extern fn phr_parse_request(buf: [*c]const u8, len: usize, method: [*c][*c]c
pub extern fn phr_parse_response(_buf: [*c]const u8, len: usize, minor_version: [*c]c_int, status: [*c]c_int, msg: [*c][*c]const u8, msg_len: [*c]usize, headers: [*c]struct_phr_header, num_headers: [*c]usize, last_len: usize) c_int;
pub extern fn phr_parse_headers(buf: [*c]const u8, len: usize, headers: [*c]struct_phr_header, num_headers: [*c]usize, last_len: usize) c_int;
pub const struct_phr_chunked_decoder = extern struct {
bytes_left_in_chunk: usize,
consume_trailer: u8,
_hex_count: u8,
_state: u8,
bytes_left_in_chunk: usize = 0,
consume_trailer: u8 = 0,
_hex_count: u8 = 0,
_state: u8 = 0,
};
pub extern fn phr_decode_chunked(decoder: *struct_phr_chunked_decoder, buf: [*]u8, bufsz: *usize) isize;
pub extern fn phr_decode_chunked_is_in_data(decoder: *struct_phr_chunked_decoder) c_int;

View File

@@ -25,12 +25,17 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return us_socket_timeout(comptime ssl_int, this.socket, seconds);
}
pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType {
const alignment = if (ContextType == *anyopaque)
@sizeOf(usize)
else
std.meta.alignment(ContextType);
var ptr = us_socket_ext(
comptime ssl_int,
this.socket,
) orelse return null;
return @ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), ptr));
return @ptrCast(*ContextType, @alignCast(alignment, ptr));
}
pub fn context(this: ThisSocket) *us_socket_context_t {
return us_socket_context(
@@ -126,28 +131,51 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return holder;
}
pub fn connectAnon(
host: []const u8,
port: c_int,
socket_ctx: *us_socket_context_t,
ptr: *anyopaque,
) ?ThisSocket {
var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator);
var allocator = stack_fallback.get();
var host_ = allocator.dupeZ(u8, host) catch return null;
defer allocator.free(host_);
var socket = us_socket_context_connect(comptime ssl_int, socket_ctx, host_, port, null, 0, @sizeOf(*anyopaque)) orelse return null;
const socket_ = ThisSocket{ .socket = socket };
var holder = socket_.ext(*anyopaque) orelse {
if (comptime bun.Environment.allow_assert) unreachable;
_ = us_socket_close_connecting(comptime ssl_int, socket);
return null;
};
holder.* = ptr;
return socket_;
}
pub fn configure(
ctx: *us_socket_context_t,
comptime ContextType: type,
comptime onOpen: anytype,
comptime onClose: anytype,
comptime onData: anytype,
comptime onWritable: anytype,
comptime onTimeout: anytype,
comptime onConnectError: anytype,
comptime onEnd: anytype,
comptime Fields: anytype,
) void {
const field_type = comptime if (@TypeOf(Fields) != type) @TypeOf(Fields) else Fields;
const SocketHandler = struct {
const alignment = if (ContextType == anyopaque)
@sizeOf(usize)
else
std.meta.alignment(ContextType);
pub fn on_open(socket: *Socket, _: c_int, _: [*c]u8, _: c_int) callconv(.C) ?*Socket {
onOpen(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onOpen(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
pub fn on_close(socket: *Socket, code: c_int, reason: ?*anyopaque) callconv(.C) ?*Socket {
onClose(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onClose(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
code,
reason,
@@ -155,57 +183,57 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return socket;
}
pub fn on_data(socket: *Socket, buf: ?[*]u8, len: c_int) callconv(.C) ?*Socket {
onData(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onData(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
buf.?[0..@intCast(usize, len)],
);
return socket;
}
pub fn on_writable(socket: *Socket) callconv(.C) ?*Socket {
onWritable(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onWritable(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
pub fn on_timeout(socket: *Socket) callconv(.C) ?*Socket {
onTimeout(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onTimeout(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
pub fn on_connect_error(socket: *Socket, code: c_int) callconv(.C) ?*Socket {
onConnectError(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onConnectError(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
code,
);
return socket;
}
pub fn on_end(socket: *Socket) callconv(.C) ?*Socket {
onEnd(
@ptrCast(*ContextType, @alignCast(std.meta.alignment(ContextType), us_socket_ext(comptime ssl_int, socket).?)),
Fields.onEnd(
@ptrCast(*ContextType, @alignCast(alignment, us_socket_ext(comptime ssl_int, socket).?)),
ThisSocket{ .socket = socket },
);
return socket;
}
};
if (comptime @typeInfo(@TypeOf(onOpen)) != .Null)
if (comptime @hasDecl(field_type, "onOpen") and @typeInfo(@TypeOf(field_type.onOpen)) != .Null)
us_socket_context_on_open(ssl_int, ctx, SocketHandler.on_open);
if (comptime @typeInfo(@TypeOf(onClose)) != .Null)
if (comptime @hasDecl(field_type, "onClose") and @typeInfo(@TypeOf(field_type.onClose)) != .Null)
us_socket_context_on_close(ssl_int, ctx, SocketHandler.on_close);
if (comptime @typeInfo(@TypeOf(onData)) != .Null)
if (comptime @hasDecl(field_type, "onData") and @typeInfo(@TypeOf(field_type.onData)) != .Null)
us_socket_context_on_data(ssl_int, ctx, SocketHandler.on_data);
if (comptime @typeInfo(@TypeOf(onWritable)) != .Null)
if (comptime @hasDecl(field_type, "onWritable") and @typeInfo(@TypeOf(field_type.onWritable)) != .Null)
us_socket_context_on_writable(ssl_int, ctx, SocketHandler.on_writable);
if (comptime @typeInfo(@TypeOf(onTimeout)) != .Null)
if (comptime @hasDecl(field_type, "onTimeout") and @typeInfo(@TypeOf(field_type.onTimeout)) != .Null)
us_socket_context_on_timeout(ssl_int, ctx, SocketHandler.on_timeout);
if (comptime @typeInfo(@TypeOf(onConnectError)) != .Null)
if (comptime @hasDecl(field_type, "onConnectError") and @typeInfo(@TypeOf(field_type.onConnectError)) != .Null)
us_socket_context_on_connect_error(ssl_int, ctx, SocketHandler.on_connect_error);
if (comptime @typeInfo(@TypeOf(onEnd)) != .Null)
if (comptime @hasDecl(field_type, "onEnd") and @typeInfo(@TypeOf(field_type.onEnd)) != .Null)
us_socket_context_on_end(ssl_int, ctx, SocketHandler.on_end);
}
@@ -316,7 +344,7 @@ extern fn us_socket_context_add_server_name(ssl: c_int, context: ?*us_socket_con
extern fn us_socket_context_remove_server_name(ssl: c_int, context: ?*us_socket_context_t, hostname_pattern: [*c]const u8) void;
extern fn us_socket_context_on_server_name(ssl: c_int, context: ?*us_socket_context_t, cb: ?fn (?*us_socket_context_t, [*c]const u8) callconv(.C) void) void;
extern fn us_socket_context_get_native_handle(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
pub extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) void;
extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: fn (*Socket, c_int, [*c]u8, c_int) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: fn (*Socket, c_int, ?*anyopaque) callconv(.C) ?*Socket) void;

106
src/hive_array.zig Normal file
View File

@@ -0,0 +1,106 @@
const std = @import("std");
const assert = std.debug.assert;
const mem = std.mem;
const testing = std.testing;
/// An array that efficiently tracks which elements are in use.
/// The pointers are intended to be stable
/// Sorta related to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p0447r15.html
pub fn HiveArray(comptime T: type, comptime capacity: u16) type {
return struct {
const Self = @This();
buffer: [capacity]T = undefined,
available: std.bit_set.IntegerBitSet(capacity) = std.bit_set.IntegerBitSet(capacity).initFull(),
pub const size = capacity;
pub fn init() Self {
return .{};
}
pub fn get(self: *Self) ?*T {
const index = self.available.findFirstSet() orelse return null;
self.available.unset(index);
return &self.buffer[index];
}
pub fn at(self: *Self, index: u16) *T {
assert(index < capacity);
return &self.buffer[index];
}
pub fn claim(self: *Self, index: u16) void {
assert(index < capacity);
assert(self.available.isSet(index));
self.available.unset(index);
}
pub fn indexOf(self: *const Self, value: *const T) ?u63 {
const start = &self.buffer;
const end = @ptrCast([*]const T, start) + capacity;
if (!(@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end)))
return null;
// aligned to the size of T
const index = (@ptrToInt(value) - @ptrToInt(start)) / @sizeOf(T);
assert(index < capacity);
assert(&self.buffer[index] == value);
return @truncate(u63, index);
}
pub fn in(self: *const Self, value: *const T) bool {
const start = &self.buffer;
const end = @ptrCast([*]const T, start) + capacity;
return (@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end));
}
pub fn put(self: *Self, value: *T) bool {
const index = self.indexOf(value) orelse return false;
assert(!self.available.isSet(index));
assert(&self.buffer[index] == value);
value.* = undefined;
self.available.set(index);
return true;
}
};
}
test "HiveArray" {
const size = 64;
// Choose an integer with a weird alignment
const Int = u127;
var a = HiveArray(Int, size).init();
{
var b = a.get().?;
try testing.expect(a.get().? != b);
try testing.expectEqual(a.indexOf(b), 0);
try testing.expect(a.put(b));
try testing.expect(a.get().? == b);
var c = a.get().?;
c.* = 123;
var d: Int = 12345;
try testing.expect(a.put(&d) == false);
try testing.expect(a.in(&d) == false);
}
a.available = @TypeOf(a.available).initFull();
{
var i: u63 = 0;
while (i < size) {
var b = a.get().?;
try testing.expectEqual(a.indexOf(b), i);
try testing.expect(a.put(b));
try testing.expect(a.get().? == b);
i = i + 1;
}
i = 0;
while (i < size) : (i += 1) {
try testing.expect(a.get() == null);
}
}
}

View File

@@ -145,7 +145,19 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type {
vm.uws_event_loop = loop;
Socket.configure(ctx, HTTPClient, handleOpen, handleClose, handleData, handleWritable, handleTimeout, handleConnectError, handleEnd);
Socket.configure(
ctx,
HTTPClient,
.{
.onOpen = handleOpen,
.onClose = handleClose,
.onData = handleData,
.onWritable = handleWritable,
.onTimeout = handleTimeout,
.onConnectError = handleConnectError,
.onEnd = handleEnd,
},
);
if (is_new_loop) {
vm.prepareLoop();
}
@@ -805,13 +817,14 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
Socket.configure(
ctx,
WebSocket,
null,
handleClose,
handleData,
handleWritable,
handleTimeout,
handleConnectError,
handleEnd,
.{
.onClose = handleClose,
.onData = handleData,
.onWritable = handleWritable,
.onTimeout = handleTimeout,
.onConnectError = handleConnectError,
.onEnd = handleEnd,
},
);
}

View File

@@ -20,6 +20,8 @@ pub fn init(allocator: std.mem.Allocator) ZlibPool {
}
pub fn get(this: *ZlibPool) !*MutableString {
std.debug.assert(loaded);
switch (this.items.items.len) {
0 => {
var mutable = try getAllocator().create(MutableString);
@@ -35,6 +37,7 @@ pub fn get(this: *ZlibPool) !*MutableString {
}
pub fn put(this: *ZlibPool, mutable: *MutableString) !void {
std.debug.assert(loaded);
mutable.reset();
try this.items.append(mutable);
}

File diff suppressed because it is too large Load Diff

View File

@@ -40,6 +40,7 @@ const URL = @import("../url.zig").URL;
const AsyncHTTP = @import("http").AsyncHTTP;
const HTTPChannel = @import("http").HTTPChannel;
const NetworkThread = @import("http").NetworkThread;
const HTTP = @import("http");
const Integrity = @import("./integrity.zig").Integrity;
const clap = @import("clap");
@@ -183,9 +184,9 @@ const NetworkTask = struct {
binlink: void,
},
pub fn notify(http: *AsyncHTTP) void {
pub fn notify(this: *NetworkTask, _: anytype) void {
defer PackageManager.instance.wake();
PackageManager.instance.network_channel.writeItem(@fieldParentPtr(NetworkTask, "http", http)) catch {};
PackageManager.instance.network_channel.writeItem(this) catch {};
}
// We must use a less restrictive Accept header value
@@ -319,7 +320,7 @@ const NetworkTask = struct {
this.request_buffer = try MutableString.init(allocator, 0);
this.response_buffer = try MutableString.init(allocator, 0);
this.allocator = allocator;
this.http = try AsyncHTTP.init(
this.http = AsyncHTTP.init(
allocator,
.GET,
URL.parse(this.url_buf),
@@ -328,6 +329,7 @@ const NetworkTask = struct {
&this.response_buffer,
&this.request_buffer,
0,
this.getCompletionCallback(),
);
this.http.max_retry_count = PackageManager.instance.options.max_retry_count;
this.callback = .{
@@ -347,8 +349,10 @@ const NetworkTask = struct {
this.http.client.force_last_modified = true;
this.http.client.if_modified_since = last_modified;
}
}
this.http.callback = notify;
pub fn getCompletionCallback(this: *NetworkTask) HTTP.HTTPClientResult.Callback {
return HTTP.HTTPClientResult.Callback.New(*NetworkTask, notify).init(this);
}
pub fn schedule(this: *NetworkTask, batch: *ThreadPool.Batch) void {
@@ -399,7 +403,7 @@ const NetworkTask = struct {
header_buf = header_builder.content.ptr.?[0..header_builder.content.len];
}
this.http = try AsyncHTTP.init(
this.http = AsyncHTTP.init(
allocator,
.GET,
URL.parse(this.url_buf),
@@ -408,8 +412,8 @@ const NetworkTask = struct {
&this.response_buffer,
&this.request_buffer,
0,
this.getCompletionCallback(),
);
this.http.callback = notify;
this.http.max_retry_count = PackageManager.instance.options.max_retry_count;
this.callback = .{ .extract = tarball };
}
@@ -2424,7 +2428,7 @@ pub const PackageManager = struct {
manager.pending_tasks += @truncate(u32, count);
manager.total_tasks += @truncate(u32, count);
manager.network_resolve_batch.push(manager.network_tarball_batch);
NetworkThread.global.schedule(manager.network_resolve_batch);
HTTP.http_thread.schedule(manager.network_resolve_batch);
manager.network_tarball_batch = .{};
manager.network_resolve_batch = .{};
return count;
@@ -2463,7 +2467,7 @@ pub const PackageManager = struct {
this.pending_tasks += @truncate(u32, count);
this.total_tasks += @truncate(u32, count);
this.network_resolve_batch.push(this.network_tarball_batch);
NetworkThread.global.schedule(this.network_resolve_batch);
HTTP.http_thread.schedule(this.network_resolve_batch);
this.network_tarball_batch = .{};
this.network_resolve_batch = .{};
}
@@ -2831,7 +2835,7 @@ pub const PackageManager = struct {
manager.total_tasks += @truncate(u32, count);
manager.thread_pool.schedule(batch);
manager.network_resolve_batch.push(manager.network_tarball_batch);
NetworkThread.global.schedule(manager.network_resolve_batch);
HTTP.http_thread.schedule(manager.network_resolve_batch);
manager.network_tarball_batch = .{};
manager.network_resolve_batch = .{};
@@ -3611,7 +3615,7 @@ pub const PackageManager = struct {
cli: CommandLineArguments,
) !*PackageManager {
// assume that spawning a thread will take a lil so we do that asap
try NetworkThread.warmup();
try HTTP.HTTPThread.init();
if (cli.global) {
var explicit_global_dir: string = "";

View File

@@ -501,13 +501,16 @@ pub const Waker = struct {
kq: os.fd_t,
machport: *anyopaque = undefined,
machport_buf: []u8 = &.{},
has_pending_wake: bool = false,
const zeroed = std.mem.zeroes([16]Kevent64);
pub fn wake(this: Waker) !void {
if (!io_darwin_schedule_wakeup(this.machport)) {
return error.WakeUpFailed;
pub fn wake(this: *Waker) !void {
if (io_darwin_schedule_wakeup(this.machport)) {
this.has_pending_wake = false;
return;
}
this.has_pending_wake = true;
}
pub fn wait(this: Waker) !usize {

View File

@@ -26,6 +26,79 @@ pub var global: NetworkThread = undefined;
pub var global_loaded: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0);
const log = Output.scoped(.NetworkThread, true);
const Global = @import("./global.zig").Global;
pub fn onStartIOThread(waker: AsyncIO.Waker) void {
NetworkThread.address_list_cached = NetworkThread.AddressListCache.init(@import("./global.zig").default_allocator);
AsyncIO.global = AsyncIO.init(1024, 0, waker) catch |err| {
log: {
if (comptime Environment.isLinux) {
if (err == error.SystemOutdated) {
Output.prettyErrorln(
\\<red>error<r>: Linux kernel version doesn't support io_uring, which Bun depends on.
\\
\\ To fix this error: please upgrade to a newer Linux kernel.
\\
\\ If you're using Windows Subsystem for Linux, here's how:
\\ 1. Open PowerShell as an administrator
\\ 2. Run this:
\\ wsl --update
\\ wsl --shutdown
\\
\\ Please make sure you're using WSL version 2 (not WSL 1). To check: wsl -l -v
\\ If you are on WSL 1, update to WSL 2 with the following commands:
\\ 1. wsl --set-default-version 2
\\ 2. wsl --set-version [distro_name] 2
\\ 3. Now follow the WSL 2 instructions above.
\\ Where [distro_name] is one of the names from the list given by: wsl -l -v
\\
\\ If that doesn't work (and you're on a Windows machine), try this:
\\ 1. Open Windows Update
\\ 2. Download any updates to Windows Subsystem for Linux
\\
\\ If you're still having trouble, ask for help in bun's discord https://bun.sh/discord
, .{});
break :log;
} else if (err == error.SystemResources) {
Output.prettyErrorln(
\\<red>error<r>: memlock limit exceeded
\\
\\To fix this error: <b>please increase the memlock limit<r> or upgrade to Linux kernel 5.11+
\\
\\If Bun is running inside Docker, make sure to set the memlock limit to unlimited (-1)
\\
\\ docker run --rm --init --ulimit memlock=-1:-1 jarredsumner/bun:edge
\\
\\To bump the memlock limit, check one of the following:
\\ /etc/security/limits.conf
\\ /etc/systemd/user.conf
\\ /etc/systemd/system.conf
\\
\\You can also try running bun as root.
\\
\\If running many copies of Bun via exec or spawn, be sure that O_CLOEXEC is set so
\\that resources are not leaked when the child process exits.
\\
\\Why does this happen?
\\
\\Bun uses io_uring and io_uring accounts memory it
\\needs under the rlimit memlocked option, which can be
\\quite low on some setups (64K).
\\
\\
, .{});
break :log;
}
}
Output.prettyErrorln("<r><red>error<r>: Failed to initialize network thread: <red><b>{s}<r>.\nHTTP requests will not work. Please file an issue and run strace().", .{@errorName(err)});
}
Global.exit(1);
};
AsyncIO.global_loaded = true;
NetworkThread.global.io = &AsyncIO.global;
NetworkThread.global.processEvents();
}
fn queueEvents(this: *@This()) void {
this.queued_tasks_mutex.lock();
@@ -41,6 +114,7 @@ pub fn processEvents(this: *@This()) void {
processEvents_(this) catch {};
unreachable;
}
/// Should only be called on the HTTP thread!
fn processEvents_(this: *@This()) !void {
while (true) {
@@ -164,7 +238,7 @@ pub fn init() !void {
@compileLog("TODO: Waker");
}
global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, HTTP.onThreadStartNew, .{
global.thread = try std.Thread.spawn(.{ .stack_size = 64 * 1024 * 1024 }, onStartIOThread, .{
global.waker,
});
global.thread.detach();