Compare commits

...

7 Commits

Author SHA1 Message Date
Jarred Sumner
69b3653b7b Faster bun install on Windows 2024-05-23 02:29:50 -07:00
Jarred Sumner
628de36ad3 Fix windows errors 2024-05-22 23:22:13 -07:00
Jarred Sumner
dd39c1a0f6 Slight syscall reduction 2024-05-22 21:18:15 -07:00
Jarred Sumner
47ce51c43b Ensure nonblocking is set 2024-05-22 20:38:12 -07:00
Jarred Sumner
5dcc621224 Merge branch 'main' into georgijs/less-eyeballs 2024-05-22 19:23:09 -07:00
Jarred Sumner
98ae204cf7 Merge branch 'main' into georgijs/less-eyeballs 2024-05-22 19:16:57 -07:00
Georgijs Vilums
4113da1b2e limit concurrent connection count for happy eyeballs 2024-05-22 18:12:44 -07:00
5 changed files with 250 additions and 71 deletions

View File

@@ -276,6 +276,20 @@ LIBUS_SOCKET_DESCRIPTOR apple_no_sigpipe(LIBUS_SOCKET_DESCRIPTOR fd) {
return fd;
}
static LIBUS_SOCKET_DESCRIPTOR win32_set_nonblocking(LIBUS_SOCKET_DESCRIPTOR fd) {
#if _WIN32
if (fd != LIBUS_SOCKET_ERROR) {
// libuv will set non-blocking, but only on poll init!
// we need it to be set on connect as well
DWORD yes = 1;
ioctlsocket(fd, FIONBIO, &yes);
}
return fd;
#else
return fd;
#endif
}
LIBUS_SOCKET_DESCRIPTOR bsd_set_nonblocking(LIBUS_SOCKET_DESCRIPTOR fd) {
#ifdef _WIN32
/* Libuv will set windows sockets as non-blocking */
@@ -300,15 +314,14 @@ void bsd_socket_flush(LIBUS_SOCKET_DESCRIPTOR fd) {
}
LIBUS_SOCKET_DESCRIPTOR bsd_create_socket(int domain, int type, int protocol) {
// returns INVALID_SOCKET on error
int flags = 0;
#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK)
flags = SOCK_CLOEXEC | SOCK_NONBLOCK;
#endif
int flags = SOCK_CLOEXEC | SOCK_NONBLOCK;
LIBUS_SOCKET_DESCRIPTOR created_fd = socket(domain, type | flags, protocol);
return apple_no_sigpipe(created_fd);
#else
LIBUS_SOCKET_DESCRIPTOR created_fd = socket(domain, type, protocol);
return bsd_set_nonblocking(apple_no_sigpipe(created_fd));
#endif
}
void bsd_close_socket(LIBUS_SOCKET_DESCRIPTOR fd) {
@@ -888,20 +901,34 @@ int bsd_disconnect_udp_socket(LIBUS_SOCKET_DESCRIPTOR fd) {
// return 0; // no ecn defaults to 0
// }
static int bsd_do_connect_raw(struct sockaddr_storage *addr, LIBUS_SOCKET_DESCRIPTOR fd)
static int bsd_do_connect_raw(LIBUS_SOCKET_DESCRIPTOR fd, struct sockaddr *addr, size_t namelen)
{
int namelen = addr->ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
#ifdef _WIN32
do {
if (connect(fd, (struct sockaddr *)addr, namelen) == 0 || WSAGetLastError() == WSAEINPROGRESS) {
while (1) {
if (connect(fd, (struct sockaddr *)addr, namelen) == 0) {
return 0;
}
} while (WSAGetLastError() == WSAEINTR);
return WSAGetLastError();
int err = WSAGetLastError();
switch (err) {
case WSAEINPROGRESS:
case WSAEWOULDBLOCK:
case WSAEALREADY: {
return 0;
}
case WSAEINTR: {
continue;
}
default: {
return err;
}
}
}
#else
do {
if (connect(fd, (struct sockaddr *)addr, namelen) == 0 || errno == EINPROGRESS) {
if (connect(fd, (struct sockaddr *)addr, namelen) == 0 || errno == EINPROGRESS || errno == EAGAIN) {
return 0;
}
} while (errno == EINTR);
@@ -952,6 +979,7 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_connect_socket(struct sockaddr_storage *addr,
}
#ifdef _WIN32
win32_set_nonblocking(fd);
// On windows we can't connect to the null address directly.
// To match POSIX behavior, we need to connect to localhost instead.
@@ -982,8 +1010,9 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_connect_socket(struct sockaddr_storage *addr,
}
#endif
int rc = bsd_do_connect_raw(fd, (struct sockaddr*) addr, addr->ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
if (bsd_do_connect_raw(addr, fd) != 0) {
if (rc != 0) {
bsd_close_socket(fd);
return LIBUS_SOCKET_ERROR;
}
@@ -997,16 +1026,10 @@ static LIBUS_SOCKET_DESCRIPTOR internal_bsd_create_connect_socket_unix(const cha
return LIBUS_SOCKET_ERROR;
}
if (connect(fd, (struct sockaddr *)server_address, addrlen) != 0 && errno != EINPROGRESS) {
#if defined(_WIN32)
int shouldSimulateENOENT = WSAGetLastError() == WSAENETDOWN;
#endif
win32_set_nonblocking(fd);
if (bsd_do_connect_raw(fd, (struct sockaddr *)server_address, addrlen) != 0) {
bsd_close_socket(fd);
#if defined(_WIN32)
if (shouldSimulateENOENT) {
SetLastError(ERROR_PATH_NOT_FOUND);
}
#endif
return LIBUS_SOCKET_ERROR;
}

View File

@@ -25,6 +25,8 @@
#include <arpa/inet.h>
#endif
#define CONCURRENT_CONNECTIONS 2
int default_is_low_prio_handler(struct us_socket_t *s) {
return 0;
}
@@ -443,9 +445,9 @@ void *us_socket_context_connect(int ssl, struct us_socket_context_t *context, co
}
// if there is only one result we can immediately connect
if (result->info && result->info->ai_next == NULL) {
if (result->entries && result->entries->info.ai_next == NULL) {
struct sockaddr_storage addr;
init_addr_with_port(result->info, port, &addr);
init_addr_with_port(&result->entries->info, port, &addr);
*is_connecting = 1;
struct us_socket_t *s = us_socket_context_connect_resolved_dns(context, &addr, options, socket_ext_size);
Bun__addrinfo_freeRequest(ai_req, s == NULL);
@@ -474,37 +476,16 @@ void *us_socket_context_connect(int ssl, struct us_socket_context_t *context, co
return c;
}
void us_internal_socket_after_resolve(struct us_connecting_socket_t *c) {
// make sure to decrement the active_handles counter, no matter what
#ifdef _WIN32
c->context->loop->uv_loop->active_handles--;
#else
c->context->loop->num_polls--;
#endif
c->pending_resolve_callback = 0;
// if the socket was closed while we were resolving the address, free it
if (c->closed) {
us_connecting_socket_free(c);
return;
}
struct addrinfo_result *result = Bun__addrinfo_getRequestResult(c->addrinfo_req);
if (result->error) {
c->error = result->error;
c->context->on_connect_error(c, result->error);
Bun__addrinfo_freeRequest(c->addrinfo_req, 0);
us_connecting_socket_close(0, c);
return;
}
int error = 0;
for (struct addrinfo *info = result->info; info; info = info->ai_next) {
int start_connections(struct us_connecting_socket_t *c, int count) {
int opened = 0;
for (; c->addrinfo_head != NULL && opened < count; c->addrinfo_head = c->addrinfo_head->ai_next) {
struct sockaddr_storage addr;
init_addr_with_port(info, c->port, &addr);
init_addr_with_port(c->addrinfo_head, c->port, &addr);
LIBUS_SOCKET_DESCRIPTOR connect_socket_fd = bsd_create_connect_socket(&addr, c->options);
if (connect_socket_fd == LIBUS_SOCKET_ERROR) {
continue;
}
++opened;
bsd_socket_nodelay(connect_socket_fd, 1);
struct us_socket_t *s = (struct us_socket_t *)us_create_poll(c->context->loop, 0, sizeof(struct us_socket_t) + c->socket_ext_size);
@@ -530,20 +511,72 @@ void us_internal_socket_after_resolve(struct us_connecting_socket_t *c) {
us_poll_init(&s->p, connect_socket_fd, POLL_TYPE_SEMI_SOCKET);
us_poll_start(&s->p, s->context->loop, LIBUS_SOCKET_WRITABLE);
}
return opened;
}
if (!c->connecting_head) {
c->error = error;
c->context->on_connect_error(c, error);
Bun__addrinfo_freeRequest(c->addrinfo_req, 1);
void us_internal_socket_after_resolve(struct us_connecting_socket_t *c) {
// make sure to decrement the active_handles counter, no matter what
#ifdef _WIN32
c->context->loop->uv_loop->active_handles--;
#else
c->context->loop->num_polls--;
#endif
c->pending_resolve_callback = 0;
// if the socket was closed while we were resolving the address, free it
if (c->closed) {
us_connecting_socket_free(c);
return;
}
struct addrinfo_result *result = Bun__addrinfo_getRequestResult(c->addrinfo_req);
if (result->error) {
c->error = result->error;
c->context->on_connect_error(c, result->error);
Bun__addrinfo_freeRequest(c->addrinfo_req, 0);
us_connecting_socket_close(0, c);
return;
}
Bun__addrinfo_freeRequest(c->addrinfo_req, 0);
c->addrinfo_head = &result->entries->info;
int opened = start_connections(c, CONCURRENT_CONNECTIONS);
if (opened == 0) {
c->error = ECONNREFUSED;
c->context->on_connect_error(c, ECONNREFUSED);
Bun__addrinfo_freeRequest(c->addrinfo_req, 1);
us_connecting_socket_close(0, c);
return;
}
}
void us_internal_socket_after_open(struct us_socket_t *s, int error) {
struct us_connecting_socket_t *c = s->connect_state;
struct us_connecting_socket_t *c = s->connect_state;
#if _WIN32
// libuv doesn't give us a way to know if a non-blockingly connected socket failed to connect
// It shows up as writable.
//
// TODO: Submit PR to libuv to allow uv_poll to poll for connect and connect_fail
//
// AFD_POLL_CONNECT
// AFD_POLL_CONNECT_FAIL
//
if (error == 0) {
if (recv( us_poll_fd((struct us_poll_t*)s), NULL, 0, MSG_PUSH_IMMEDIATE ) == SOCKET_ERROR) {
// When a socket is not connected, this function returns WSAENOTCONN.
error = WSAGetLastError();
switch (error) {
case WSAEWOULDBLOCK:
case WSAEINTR: {
error = 0;
break;
}
default: {
break;
}
}
}
}
#endif
/* It is perfectly possible to come here with an error */
if (error) {
@@ -565,9 +598,17 @@ void us_internal_socket_after_open(struct us_socket_t *s, int error) {
}
}
us_socket_close(0, s, 0, 0);
// there are no further attempting to connect
if (!c->connecting_head) {
c->context->on_connect_error(c, error);
us_connecting_socket_close(0, c);
// start opening the next batch of connections
int opened = start_connections(c, CONCURRENT_CONNECTIONS);
// we have run out of addresses to attempt, signal the connection error
if (opened == 0) {
c->error = ECONNREFUSED;
c->context->on_connect_error(c, error);
Bun__addrinfo_freeRequest(c->addrinfo_req, ECONNREFUSED);
us_connecting_socket_close(0, c);
}
}
} else {
s->context->on_socket_connect_error(s, error);
@@ -594,6 +635,7 @@ void us_internal_socket_after_open(struct us_socket_t *s, int error) {
}
}
// now that the socket is open, we can release the associated us_connecting_socket_t if it exists
Bun__addrinfo_freeRequest(c->addrinfo_req, 0);
us_connecting_socket_free(c);
s->connect_state = NULL;
}

View File

@@ -76,8 +76,12 @@ void Bun__lock(uint32_t *lock);
void Bun__unlock(uint32_t *lock);
struct addrinfo_request;
struct addrinfo_result_entry {
struct addrinfo info;
struct sockaddr_storage _storage;
};
struct addrinfo_result {
struct addrinfo *info;
struct addrinfo_result_entry* entries;
int error;
};
@@ -158,6 +162,7 @@ struct us_connecting_socket_t {
unsigned char long_timeout;
uint16_t port;
int error;
struct addrinfo *addrinfo_head;
};
struct us_wrapped_socket_context_t {

View File

@@ -1226,7 +1226,7 @@ pub const InternalDNS = struct {
};
const Result = extern struct {
info: ?*std.c.addrinfo,
info: ?[*]ResultEntry,
err: c_int,
};
@@ -1279,7 +1279,7 @@ pub const InternalDNS = struct {
bun.assert(this.notify.items.len == 0);
if (this.result) |res| {
if (res.info) |info| {
std.c.freeaddrinfo(info);
bun.default_allocator.destroy(&info[0]);
}
}
if (this.key.host) |host| {
@@ -1432,11 +1432,84 @@ pub const InternalDNS = struct {
}
};
const ResultEntry = extern struct {
info: std.c.addrinfo,
addr: std.c.sockaddr.storage,
};
// re-order result to interleave ipv4 and ipv6 (also pack into a single allocation)
fn processResults(info: *std.c.addrinfo) []ResultEntry {
var count: usize = 0;
var info_: ?*std.c.addrinfo = info;
while (info_) |ai| {
count += 1;
info_ = ai.next;
}
var results = bun.default_allocator.alloc(ResultEntry, count) catch bun.outOfMemory();
// copy results
var i: usize = 0;
info_ = info;
while (info_) |ai| {
results[i].info = ai.*;
if (ai.addr) |addr| {
if (ai.family == std.c.AF.INET) {
const addr_in: *std.c.sockaddr.in = @ptrCast(&results[i].addr);
addr_in.* = @as(*std.c.sockaddr.in, @alignCast(@ptrCast(addr))).*;
} else if (ai.family == std.c.AF.INET6) {
const addr_in: *std.c.sockaddr.in6 = @ptrCast(&results[i].addr);
addr_in.* = @as(*std.c.sockaddr.in6, @alignCast(@ptrCast(addr))).*;
}
} else {
results[i].addr = std.mem.zeroes(std.c.sockaddr.storage);
}
i += 1;
info_ = ai.next;
}
// sort (interleave ipv4 and ipv6)
var want: usize = std.c.AF.INET6;
for (0..count) |idx| {
if (results[idx].info.family == want) continue;
for (idx + 1..count) |j| {
if (results[j].info.family == want) {
std.mem.swap(ResultEntry, &results[idx], &results[j]);
want = if (want == std.c.AF.INET6) std.c.AF.INET else std.c.AF.INET6;
}
} else {
// the rest of the list is all one address family
break;
}
}
// set up pointers
for (results, 0..) |*entry, idx| {
entry.info.canonname = null;
if (idx + 1 < count) {
entry.info.next = &results[idx + 1].info;
} else {
entry.info.next = null;
}
if (entry.info.addr != null) {
entry.info.addr = @alignCast(@ptrCast(&entry.addr));
}
}
return results;
}
fn afterResult(req: *Request, info: ?*std.c.addrinfo, err: c_int) void {
const results: ?[*]ResultEntry = if (info) |ai| brk: {
const res = processResults(ai);
std.c.freeaddrinfo(ai);
break :brk res.ptr;
} else null;
global_cache.lock.lock();
req.result = .{
.info = info,
.info = results,
.err = err,
};
var notify = req.notify;

View File

@@ -657,13 +657,50 @@ pub const PackageManifest = struct {
}
}
fn writeFile(this: *const PackageManifest, tmp_path: [:0]const u8, tmpdir: std.fs.Dir) !void {
var tmpfile = try tmpdir.createFileZ(tmp_path, .{
.truncate = true,
});
defer tmpfile.close();
const writer = tmpfile.writer();
fn writeFile(this: *const PackageManifest, tmp_path: [:0]const u8, tmpdir: std.fs.Dir, cache_dir: std.fs.Dir, outpath: [:0]const u8) !void {
var realpath_buf: bun.PathBuffer = undefined;
var realpath2_buf: bun.PathBuffer = undefined;
const tmp_path_partial = try bun.getFdPath(tmpdir, &realpath_buf);
const tmp_path_abs = bun.path.joinAbsStringBufZ(tmp_path_partial, &realpath2_buf, &.{ tmp_path_partial, tmp_path }, .auto);
const cache_dir_abs = try bun.getFdPath(cache_dir, &realpath_buf);
const cache_path_abs = bun.path.joinAbsStringBufZ(cache_dir_abs, &realpath2_buf, &.{ cache_dir_abs, outpath }, .auto);
const fields = .{
"string_buf",
"versions",
"external_strings",
"external_strings_for_versions",
"package_versions",
"extern_strings_bin_entries",
};
var estimated_byte_length: usize = 0;
inline for (fields) |field| {
estimated_byte_length += std.mem.sliceAsBytes(@field(this, field)).len;
}
var buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, estimated_byte_length);
defer buffer.deinit();
const writer = &buffer.writer();
try Serializer.write(this, @TypeOf(writer), writer);
// Do not forget to buffer writes!
// PS C:\bun> hyperfine "bun-debug install --ignore-scripts" "bun install --ignore-scripts" --prepare="del /s /q bun.lockb && del /s /q C:\Users\window\.bun\install\cache"
// Benchmark 1: bun-debug install --ignore-scripts
// Time (mean ± σ): 1.266 s ± 0.284 s [User: 1.631 s, System: 0.205 s]
// Range (min … max): 1.071 s … 1.804 s 10 runs
// Warning: Statistical outliers were detected. Consider re-running this benchmark on a quiet system without any interferences from other programs. It might help to use the '--warmup' or '--prepare' options.
// Benchmark 2: bun install --ignore-scripts
// Time (mean ± σ): 3.202 s ± 0.095 s [User: 0.255 s, System: 0.172 s]
// Range (min … max): 3.058 s … 3.371 s 10 runs
// Summary
// bun-debug install --ignore-scripts ran
// 2.53 ± 0.57 times faster than bun install --ignore-scripts
const file = try bun.sys.File.openat(tmpdir, tmp_path, std.os.O.CREAT | std.os.O.TRUNC, 0).unwrap();
{
errdefer file.close();
try file.writeAll(buffer.items).unwrap();
}
try file.closeAndMoveTo(tmp_path_abs, cache_path_abs);
}
pub fn save(this: *const PackageManifest, tmpdir: std.fs.Dir, cache_dir: std.fs.Dir) !void {
@@ -678,9 +715,8 @@ pub const PackageManifest = struct {
try dest_path_stream_writer.print("{any}.npm-{any}", .{ hex_fmt, hex_timestamp_fmt });
try dest_path_stream_writer.writeByte(0);
const tmp_path: [:0]u8 = dest_path_buf[0 .. dest_path_stream.pos - 1 :0];
try writeFile(this, tmp_path, tmpdir);
const out_path = std.fmt.bufPrintZ(&out_path_buf, "{any}.npm", .{hex_fmt}) catch unreachable;
try std.os.renameatZ(tmpdir.fd, tmp_path, cache_dir.fd, out_path);
try writeFile(this, tmp_path, tmpdir, cache_dir, out_path);
}
pub fn load(allocator: std.mem.Allocator, cache_dir: std.fs.Dir, package_name: string) !?PackageManifest {