Fix aborting fetch() calls while the socket is connecting. Fix a thread-safety issue involving redirects and AbortSignal. (#22842)

### What does this PR do?

When we added "happy eyeballs" support to fetch(), it meant that
`onOpen` would potentially not be called for a while. If the AbortSignal
is aborted between `connect()` and the socket becoming
readable/writable, then we would delay closing the connection until the
connection opens. Fixing that fixes #18536.

Separately, the `isHTTPS()` function used in abort and in request body
streams was not thread-safe. This caused a crash when many redirects
happened simultaneously while either AbortSignal or request body messages
were in flight.
This PR fixes https://github.com/oven-sh/bun/issues/14137



### How did you verify your code works?

There are tests

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Claude Bot <claude-bot@bun.sh>
Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com>
This commit is contained in:
Jarred Sumner
2025-09-25 16:08:06 -07:00
committed by GitHub
parent 20854fb285
commit 4dfd87a302
14 changed files with 497 additions and 290 deletions

View File

@@ -13,8 +13,7 @@ queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(Wri
queued_shutdowns_lock: bun.Mutex = .{},
queued_writes_lock: bun.Mutex = .{},
queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
queued_threadlocal_proxy_derefs: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
timer: std.time.Timer,
@@ -82,21 +81,15 @@ pub const RequestBodyBuffer = union(enum) {
const threadlog = Output.scoped(.HTTPThread, .hidden);
const WriteMessage = struct {
async_http_id: u32,
flags: packed struct(u8) {
is_tls: bool,
type: Type,
_: u5 = 0,
},
message_type: Type,
pub const Type = enum(u2) {
data = 0,
end = 1,
endChunked = 2,
};
};
const ShutdownMessage = struct {
async_http_id: u32,
is_tls: bool,
};
pub const LibdeflateState = struct {
@@ -285,62 +278,96 @@ pub fn context(this: *@This(), comptime is_ssl: bool) *NewHTTPContext(is_ssl) {
return if (is_ssl) &this.https_context else &this.http_context;
}
fn drainEvents(this: *@This()) void {
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
for (this.queued_shutdowns.items) |http| {
fn drainQueuedShutdowns(this: *@This()) void {
while (true) {
// socket.close() can potentially be slow
// Let's not block other threads while this runs.
var queued_shutdowns = brk: {
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
const shutdowns = this.queued_shutdowns;
this.queued_shutdowns = .{};
break :brk shutdowns;
};
defer queued_shutdowns.deinit(bun.default_allocator);
for (queued_shutdowns.items) |http| {
if (bun.http.socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
if (http.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
// do a fast shutdown here since we are aborting and we dont want to wait for the close_notify from the other side
socket.close(.failure);
} else {
const socket = uws.SocketTCP.fromAny(socket_ptr.value);
socket.close(.failure);
switch (socket_ptr.value) {
inline .SocketTLS, .SocketTCP => |socket, tag| {
const is_tls = tag == .SocketTLS;
const HTTPContext = HTTPThread.NewHTTPContext(comptime is_tls);
const tagged = HTTPContext.getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
// If we only call socket.close(), then it won't
// call `onClose` if this happens before `onOpen` is
// called.
//
client.closeAndAbort(comptime is_tls, socket);
continue;
}
socket.close(.failure);
},
}
}
}
this.queued_shutdowns.clearRetainingCapacity();
if (queued_shutdowns.items.len == 0) {
break;
}
threadlog("drained {d} queued shutdowns", .{queued_shutdowns.items.len});
}
{
this.queued_writes_lock.lock();
defer this.queued_writes_lock.unlock();
for (this.queued_writes.items) |write| {
const flags = write.flags;
const messageType = flags.type;
const ended = messageType == .end or messageType == .endChunked;
}
fn drainQueuedWrites(this: *@This()) void {
while (true) {
var queued_writes = brk: {
this.queued_writes_lock.lock();
defer this.queued_writes_lock.unlock();
const writes = this.queued_writes;
this.queued_writes = .{};
break :brk writes;
};
defer queued_writes.deinit(bun.default_allocator);
for (queued_writes.items) |write| {
const message = write.message_type;
const ended = message == .end;
if (bun.http.socket_async_http_abort_tracker.get(write.async_http_id)) |socket_ptr| {
switch (flags.is_tls) {
inline true, false => |is_tls| {
const socket = uws.NewSocketHandler(is_tls).fromAny(socket_ptr);
switch (socket_ptr) {
inline .SocketTLS, .SocketTCP => |socket, tag| {
const is_tls = tag == .SocketTLS;
if (socket.isClosed() or socket.isShutdown()) {
continue;
}
const tagged = NewHTTPContext(is_tls).getTaggedFromSocket(socket);
const tagged = NewHTTPContext(comptime is_tls).getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
if (client.state.original_request_body == .stream) {
var stream = &client.state.original_request_body.stream;
stream.ended = ended;
if (messageType == .endChunked and client.flags.upgrade_state != .upgraded) {
// only send the 0-length chunk if the request body is chunked and not upgraded
client.writeToStream(is_tls, socket, bun.http.end_of_chunked_http1_1_encoding_response_body);
} else {
client.flushStream(is_tls, socket);
}
client.flushStream(is_tls, socket);
}
}
},
}
}
}
this.queued_writes.clearRetainingCapacity();
if (queued_writes.items.len == 0) {
break;
}
threadlog("drained {d} queued writes", .{queued_writes.items.len});
}
}
while (this.queued_proxy_deref.pop()) |http| {
fn drainEvents(this: *@This()) void {
// Process any pending writes **before** aborting.
this.drainQueuedWrites();
this.drainQueuedShutdowns();
for (this.queued_threadlocal_proxy_derefs.items) |http| {
http.deref();
}
this.queued_threadlocal_proxy_derefs.clearRetainingCapacity();
var count: usize = 0;
var active = AsyncHTTP.active_requests_count.load(.monotonic);
@@ -379,6 +406,14 @@ fn processEvents(this: *@This()) noreturn {
while (true) {
this.drainEvents();
if (comptime Environment.isDebug and bun.asan.enabled) {
for (bun.http.socket_async_http_abort_tracker.keys(), bun.http.socket_async_http_abort_tracker.values()) |http_id, socket| {
if (socket.socket().get()) |usocket| {
_ = http_id;
bun.asan.assertUnpoisoned(usocket);
}
}
}
var start_time: i128 = 0;
if (comptime Environment.isDebug) {
@@ -390,6 +425,15 @@ fn processEvents(this: *@This()) noreturn {
this.loop.loop.tick();
this.loop.loop.dec();
if (comptime Environment.isDebug and bun.asan.enabled) {
for (bun.http.socket_async_http_abort_tracker.keys(), bun.http.socket_async_http_abort_tracker.values()) |http_id, socket| {
if (socket.socket().get()) |usocket| {
_ = http_id;
bun.asan.assertUnpoisoned(usocket);
}
}
}
// this.loop.run();
if (comptime Environment.isDebug) {
const end = std.time.nanoTimestamp();
@@ -400,12 +444,12 @@ fn processEvents(this: *@This()) noreturn {
}
pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
threadlog("scheduleShutdown {d}", .{http.async_http_id});
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
this.queued_shutdowns.append(bun.default_allocator, .{
.async_http_id = http.async_http_id,
.is_tls = http.client.isHTTPS(),
}) catch |err| bun.handleOom(err);
}
if (this.has_awoken.load(.monotonic))
@@ -418,10 +462,7 @@ pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, messageType: Write
defer this.queued_writes_lock.unlock();
this.queued_writes.append(bun.default_allocator, .{
.async_http_id = http.async_http_id,
.flags = .{
.is_tls = http.client.isHTTPS(),
.type = messageType,
},
.message_type = messageType,
}) catch |err| bun.handleOom(err);
}
if (this.has_awoken.load(.monotonic))
@@ -429,10 +470,8 @@ pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, messageType: Write
}
pub fn scheduleProxyDeref(this: *@This(), proxy: *ProxyTunnel) void {
// this is always called on the http thread
{
bun.handleOom(this.queued_proxy_deref.append(bun.default_allocator, proxy));
}
// this is always called on the http thread,
bun.handleOom(this.queued_threadlocal_proxy_derefs.append(bun.default_allocator, proxy));
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
@@ -473,7 +512,6 @@ const Global = bun.Global;
const Output = bun.Output;
const jsc = bun.jsc;
const strings = bun.strings;
const uws = bun.uws;
const Arena = bun.allocators.MimallocArena;
const Batch = bun.ThreadPool.Batch;
const UnboundedQueue = bun.threading.UnboundedQueue;