diff --git a/src/bun.js/webcore/fetch/FetchTasklet.zig b/src/bun.js/webcore/fetch/FetchTasklet.zig index 1dcff13eec..81d7656978 100644 --- a/src/bun.js/webcore/fetch/FetchTasklet.zig +++ b/src/bun.js/webcore/fetch/FetchTasklet.zig @@ -815,51 +815,101 @@ pub const FetchTasklet = struct { } } + /// Called on main thread when HTTP thread has data ready. + /// THREAD SAFETY: This runs on main thread, must minimize lock holding time. pub fn onProgressUpdate(this: *FetchTasklet) bun.JSTerminated!void { jsc.markBinding(@src()); log("onProgressUpdate", .{}); - this.mutex.lock(); - this.has_schedule_callback.store(false, .monotonic); - const is_done = !this.result.has_more; + + // === MAIN THREAD - Reset atomic flag first (allows HTTP thread to schedule again) === + defer this.has_schedule_callback.store(false, .release); + + // Balance the ref() from HTTP thread callback + defer this.deref(); const vm = this.javascript_vm; - // vm is shutting down we cannot touch JS + + // Early check: VM shutting down? if (vm.isShuttingDown()) { + // Cannot touch JS - just clean up + this.mutex.lock(); + const is_done = !this.result.has_more; this.mutex.unlock(); + + // Note: deref() is handled by outer defer above if (is_done) { - this.deref(); + // Additional cleanup only when done + var poll_ref = this.poll_ref; + this.poll_ref = .{}; + poll_ref.unref(vm); } return; } + // === ACQUIRE LOCK - Brief critical section to read state === + // Copy state out from under lock, then release before doing JS work + var is_done: bool = undefined; + var is_waiting_request_stream_start: bool = undefined; + var can_stream: bool = undefined; + var is_waiting_body: bool = undefined; + var metadata_exists: bool = undefined; + var is_success: bool = undefined; + var is_waiting_abort: bool = undefined; + var certificate_info_snapshot: ?http.CertificateInfo = null; + + { + this.mutex.lock(); + defer this.mutex.unlock(); + + is_done = !this.result.has_more; + is_waiting_request_stream_start = this.is_waiting_request_stream_start; + can_stream = this.result.can_stream; + is_waiting_body = this.is_waiting_body; + metadata_exists = this.metadata != null; + is_success = this.result.isSuccess(); + is_waiting_abort = this.is_waiting_abort; + + // Extract certificate info (will be processed outside lock) + if (this.result.certificate_info) |cert_info| { + certificate_info_snapshot = cert_info; + this.result.certificate_info = null; + } + } + // === LOCK RELEASED - Now safe to do JS work === + const globalThis = this.global_this; + + // Clean up at end defer { - this.mutex.unlock(); - // if we are not done we wait until the next call if (is_done) { var poll_ref = this.poll_ref; this.poll_ref = .{}; poll_ref.unref(vm); - this.deref(); + // Note: deref() is handled by outer defer at line ~828 } } - if (this.is_waiting_request_stream_start and this.result.can_stream) { - // start streaming + + // Handle request stream start (requires JS interaction) + if (is_waiting_request_stream_start and can_stream) { this.startRequestStream(); } - // if we already respond the metadata and still need to process the body - if (this.is_waiting_body) { + + // Handle body data already received + if (is_waiting_body) { try this.onBodyReceived(); return; } - if (this.metadata == null and this.result.isSuccess()) return; - // if we abort because of cert error - // we wait the Http Client because we already have the response - // we just need to deinit - if (this.is_waiting_abort) { + // Early exit if no metadata yet + if (!metadata_exists and is_success) { return; } + + // Waiting for abort to complete + if (is_waiting_abort) { + return; + } + const promise_value = this.promise.valueOrEmpty(); if (promise_value.isEmptyOrUndefinedOrNull()) { @@ -868,14 +918,12 @@ pub const FetchTasklet = struct { return; } - if (this.result.certificate_info) |certificate_info| { - this.result.certificate_info = null; + // Process certificate validation (requires JS call - outside lock!) + if (certificate_info_snapshot) |certificate_info| { defer certificate_info.deinit(bun.default_allocator); - // we receive some error if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) { log("onProgressUpdate: aborted due certError", .{}); - // we need to abort the request const promise = promise_value.asAnyPromise().?; const tracker = this.tracker; var result = this.onReject(); @@ -888,13 +936,19 @@ pub const FetchTasklet = struct { this.promise.deinit(); return; } - // everything ok - if (this.metadata == null) { - log("onProgressUpdate: metadata is null", .{}); + + // Re-check metadata after cert validation + this.mutex.lock(); + const has_metadata = this.metadata != null; + this.mutex.unlock(); + + if (!has_metadata) { + log("onProgressUpdate: metadata is null after cert check", .{}); return; } } + // Resolve or reject promise (JS interaction - no lock held) const tracker = this.tracker; tracker.willDispatch(globalThis); defer { @@ -902,11 +956,11 @@ pub const FetchTasklet = struct { tracker.didDispatch(globalThis); this.promise.deinit(); } - const success = this.result.isSuccess(); + + const success = is_success; const result = switch (success) { true => jsc.Strong.Optional.create(this.onResolve(), globalThis), false => brk: { - // in this case we wanna a jsc.Strong.Optional so we just convert it var value = this.onReject(); const err = value.toJS(globalThis); if (this.sink) |sink| { @@ -924,11 +978,9 @@ pub const FetchTasklet = struct { task: jsc.AnyTask, pub fn resolve(self: *@This()) bun.JSTerminated!void { - // cleanup defer bun.default_allocator.destroy(self); defer self.held.deinit(); defer self.promise.deinit(); - // resolve the promise var prom = self.promise.swap().asAnyPromise().?; const res = self.held.swap(); res.ensureStillAlive(); @@ -936,12 +988,9 @@ pub const FetchTasklet = struct { } pub fn reject(self: *@This()) bun.JSTerminated!void { - // cleanup defer bun.default_allocator.destroy(self); defer self.held.deinit(); defer self.promise.deinit(); - - // reject the promise var prom = self.promise.swap().asAnyPromise().?; const res = self.held.swap(); res.ensureStillAlive(); @@ -951,7 +1000,6 @@ pub const FetchTasklet = struct { var holder = bun.handleOom(bun.default_allocator.create(Holder)); holder.* = .{ .held = result, - // we need the promise to be alive until the task is done .promise = this.promise.strong, .globalObject = globalThis, .task = undefined, @@ -1656,48 +1704,63 @@ pub const FetchTasklet = struct { } /// Called from HTTP thread. Handles HTTP events received from socket. + /// THREAD SAFETY: This runs on HTTP thread, must minimize work under lock. pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void { - // at this point only this thread is accessing result to is no race condition + // === HTTP THREAD - Fast-path checks before lock === + const is_done = !result.has_more; - // we are done with the http client so we can deref our side - // this is a atomic operation and will enqueue a task to deinit on the main thread defer if (is_done) task.derefFromThread(); + // Fast-path abort check (no lock needed for atomic read) + if (task.signal_store.aborted.load(.acquire)) { + // Already aborted, don't schedule anything + return; + } + + // Prevent duplicate enqueues (atomic swap before taking lock) + if (task.has_schedule_callback.swap(true, .acq_rel)) { + // Already scheduled, this data will be picked up on next callback + return; + } + + // === ACQUIRE LOCK - Brief critical section === task.mutex.lock(); - // we need to unlock before task.deref(); defer task.mutex.unlock(); + + // Update HTTP client reference (needed for abort handling) task.http.?.* = async_http.*; task.http.?.response_buffer = async_http.response_buffer; log("callback success={} ignore_data={} has_more={} bytes={}", .{ result.isSuccess(), task.ignore_data, result.has_more, result.body.?.list.items.len }); + // Preserve previous metadata and certificate info const prev_metadata = task.result.metadata; const prev_cert_info = task.result.certificate_info; task.result = result; - // Preserve pending certificate info if it was preovided in the previous update. if (task.result.certificate_info == null) { if (prev_cert_info) |cert_info| { task.result.certificate_info = cert_info; } } - // metadata should be provided only once + // Store metadata (only provided once) if (result.metadata orelse prev_metadata) |metadata| { log("added callback metadata", .{}); if (task.metadata == null) { task.metadata = metadata; } - task.result.metadata = null; } task.body_size = result.body_size; + // Copy response body data to shared buffer const success = result.isSuccess(); task.response_buffer = result.body.?.*; if (task.ignore_data) { + // Ignoring data - clear buffers task.response_buffer.reset(); if (task.scheduled_response_buffer.list.capacity > 0) { @@ -1710,24 +1773,37 @@ pub const FetchTasklet = struct { }, }; } + if (success and result.has_more) { - // we are ignoring the body so we should not receive more data, so will only signal when result.has_more = true + // Ignoring body with more data - don't schedule callback + // Reset flag so future callbacks can schedule + task.has_schedule_callback.store(false, .release); return; } } else { + // Accumulate data into scheduled buffer if (success) { - _ = bun.handleOom(task.scheduled_response_buffer.write(task.response_buffer.list.items)); + // Handle OOM gracefully under lock + _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch blk: { + // OOM while copying data - mark as failed + task.result.fail = error.OutOfMemory; + // Continue to schedule callback so main thread can handle error + break :blk 0; + }; } - // reset for reuse + // Reset for reuse by HTTP client task.response_buffer.reset(); } - if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| { - if (has_schedule_callback) { - return; - } - } + // === RELEASE LOCK - Schedule to main thread outside lock === + // Lock is automatically released by defer above + // Keep tasklet alive during main thread callback + // This will be balanced by deref() in onProgressUpdate + task.ref(); + + // Enqueue callback to main thread + // Note: concurrent_task.from() does not allocate, safe to call here task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit)); } };