fix streaming issue (#23289)

### What does this PR do?

### How did you verify your code works?
This commit is contained in:
Jarred Sumner
2025-10-06 05:39:22 -07:00
committed by GitHub
parent b81018707d
commit 08cee69ff4
8 changed files with 328 additions and 69 deletions

View File

@@ -10,9 +10,12 @@ queued_tasks: Queue = Queue{},
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},
queued_response_body_drains: std.ArrayListUnmanaged(DrainMessage) = std.ArrayListUnmanaged(DrainMessage){},
queued_shutdowns_lock: bun.Mutex = .{},
queued_writes_lock: bun.Mutex = .{},
queued_response_body_drains_lock: bun.Mutex = .{},
queued_threadlocal_proxy_derefs: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
@@ -88,6 +91,9 @@ const WriteMessage = struct {
end = 1,
};
};
const DrainMessage = struct {
async_http_id: u32,
};
const ShutdownMessage = struct {
async_http_id: u32,
};
@@ -359,8 +365,43 @@ fn drainQueuedWrites(this: *@This()) void {
}
}
fn drainQueuedHTTPResponseBodyDrains(this: *@This()) void {
while (true) {
// socket.close() can potentially be slow
// Let's not block other threads while this runs.
var queued_response_body_drains = brk: {
this.queued_response_body_drains_lock.lock();
defer this.queued_response_body_drains_lock.unlock();
const drains = this.queued_response_body_drains;
this.queued_response_body_drains = .{};
break :brk drains;
};
defer queued_response_body_drains.deinit(bun.default_allocator);
for (queued_response_body_drains.items) |drain| {
if (bun.http.socket_async_http_abort_tracker.get(drain.async_http_id)) |socket_ptr| {
switch (socket_ptr) {
inline .SocketTLS, .SocketTCP => |socket, tag| {
const is_tls = tag == .SocketTLS;
const HTTPContext = HTTPThread.NewHTTPContext(comptime is_tls);
const tagged = HTTPContext.getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
client.drainResponseBody(comptime is_tls, socket);
}
},
}
}
}
if (queued_response_body_drains.items.len == 0) {
break;
}
threadlog("drained {d} queued drains", .{queued_response_body_drains.items.len});
}
}
fn drainEvents(this: *@This()) void {
// Process any pending writes **before** aborting.
this.drainQueuedHTTPResponseBodyDrains();
this.drainQueuedWrites();
this.drainQueuedShutdowns();
@@ -443,6 +484,18 @@ fn processEvents(this: *@This()) noreturn {
}
}
pub fn scheduleResponseBodyDrain(this: *@This(), async_http_id: u32) void {
{
this.queued_response_body_drains_lock.lock();
defer this.queued_response_body_drains_lock.unlock();
this.queued_response_body_drains.append(bun.default_allocator, .{
.async_http_id = async_http_id,
}) catch |err| bun.handleOom(err);
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
threadlog("scheduleShutdown {d}", .{http.async_http_id});
{