From 7a44c8a89f203f981a84bfa8d2fc1aba30412e37 Mon Sep 17 00:00:00 2001 From: Claude Bot Date: Mon, 3 Nov 2025 23:41:00 +0000 Subject: [PATCH] Phase 7, Step 4: Move data to MainThread/Shared structs (breaking) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change completes the migration of FetchTasklet fields to MainThreadData and SharedData structs, eliminating duplicate field storage and clarifying data ownership and thread safety boundaries. Changes: - Removed 27 duplicate fields from FetchTasklet struct - Updated 100+ access sites to use this.main_thread.X or this.shared.X - Main thread fields: promise, global_this, vm, response, streams, abort handling - Shared fields: HTTP client, buffers, result, metadata, ref_count, mutex - Net reduction of 61 lines of code All data now explicitly separated into main-thread-only and thread-safe zones, providing clear boundaries for concurrent access patterns. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/bun.js/webcore/fetch/FetchTasklet.zig | 475 ++++++++++------------ 1 file changed, 207 insertions(+), 268 deletions(-) diff --git a/src/bun.js/webcore/fetch/FetchTasklet.zig b/src/bun.js/webcore/fetch/FetchTasklet.zig index dd3ac19d86..94cfb35e3e 100644 --- a/src/bun.js/webcore/fetch/FetchTasklet.zig +++ b/src/bun.js/webcore/fetch/FetchTasklet.zig @@ -974,8 +974,8 @@ const FetchError = union(enum) { // } // // // Cleanup response buffers -// this.response_buffer.deinit(); -// this.scheduled_response_buffer.deinit(); +// this.shared.response_buffer.deinit(); +// this.shared.scheduled_response_buffer.deinit(); // // // Cleanup thread-specific data // this.main_thread.deinit(); @@ -993,12 +993,12 @@ const FetchError = union(enum) { // // /// Reference counting for thread-safe lifecycle // pub fn ref(this: *FetchTasklet) void { -// const count = this.ref_count.fetchAdd(1, .monotonic); +// const count = this.shared.ref_count.fetchAdd(1, .monotonic); // bun.debugAssert(count > 0); // } // // pub fn deref(this: *FetchTasklet) void { -// const count = this.ref_count.fetchSub(1, .monotonic); +// const count = this.shared.ref_count.fetchSub(1, .monotonic); // bun.debugAssert(count > 0); // // if (count == 1) { @@ -1035,33 +1035,9 @@ pub const FetchTasklet = struct { const log = Output.scoped(.FetchTasklet, .visible); sink: ?*ResumableSink = null, - http: ?*http.AsyncHTTP = null, - result: http.HTTPClientResult = .{}, - metadata: ?http.HTTPResponseMetadata = null, - javascript_vm: *VirtualMachine = undefined, - global_this: *JSGlobalObject = undefined, request_body: HTTPRequestBody = undefined, request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null, - - /// buffer being used by AsyncHTTP - response_buffer: MutableString = undefined, - /// buffer used to stream response to JS - scheduled_response_buffer: MutableString = undefined, - /// response weak ref we need this to track the response JS lifetime - response: jsc.Weak(FetchTasklet) = .{}, - /// native response ref if we still need it when JS is discarted - native_response: ?*Response = null, - /// stream strong ref if any is available - readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{}, request_headers: Headers = Headers{ .allocator = undefined }, - promise: jsc.JSPromise.Strong, - concurrent_task: jsc.ConcurrentTask = .{}, - poll_ref: Async.KeepAlive = .{}, - /// For Http Client requests - /// when Content-Length is provided this represents the whole size of the request - /// If chunked encoded this will represent the total received size (ignoring the chunk headers) - /// If is not chunked encoded and Content-Length is not provided this will be unknown - body_size: http.HTTPClientResult.BodySize = .unknown, /// URL/Proxy buffer with explicit ownership tracking. /// @@ -1095,19 +1071,7 @@ pub const FetchTasklet = struct { /// However, the current pattern is already clear and well-established in the codebase. /// The explicit `allocator.free()` in clearData() is simple and obvious. url_proxy_buffer: []const u8 = "", - - signal: ?*jsc.WebCore.AbortSignal = null, - signals: http.Signals = .{}, - signal_store: http.Signals.Store = .{}, - has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - - // must be stored because AbortSignal stores reason weakly - abort_reason: jsc.Strong.Optional = .empty, - - // custom checkServerIdentity - check_server_identity: jsc.Strong.Optional = .empty, reject_unauthorized: bool = true, - upgraded_connection: bool = false, /// Hostname buffer for TLS certificate validation with custom checkServerIdentity. /// @@ -1137,15 +1101,9 @@ pub const FetchTasklet = struct { /// ownership explicit and ensures automatic cleanup via RAII. hostname: ?bun.ptr.Owned([]u8) = null, - mutex: Mutex, - - tracker: jsc.Debugger.AsyncTaskTracker, - - ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1), - - /// === PHASE 7 STEP 2: New thread safety fields (NON-BREAKING) === - /// These fields are added alongside existing fields during migration. - /// Old fields will be removed once migration to these structs is complete. + /// === PHASE 7 STEP 4: Thread safety fields (MIGRATION COMPLETE) === + /// Duplicate fields have been removed from the main struct and are now + /// accessed through main_thread and shared structs. /// Main thread data (no locking required) main_thread: MainThreadData = undefined, @@ -1153,12 +1111,12 @@ pub const FetchTasklet = struct { shared: *SharedData = undefined, pub fn ref(this: *FetchTasklet) void { - const count = this.ref_count.fetchAdd(1, .monotonic); + const count = this.shared.ref_count.fetchAdd(1, .monotonic); bun.debugAssert(count > 0); } pub fn deref(this: *FetchTasklet) void { - const count = this.ref_count.fetchSub(1, .monotonic); + const count = this.shared.ref_count.fetchSub(1, .monotonic); bun.debugAssert(count > 0); if (count == 1) { @@ -1167,14 +1125,14 @@ pub const FetchTasklet = struct { } pub fn derefFromThread(this: *FetchTasklet) void { - const count = this.ref_count.fetchSub(1, .monotonic); + const count = this.shared.ref_count.fetchSub(1, .monotonic); bun.debugAssert(count > 0); if (count == 1) { // this is really unlikely to happen, but can happen // lets make sure that we always call deinit from main thread - const vm = this.javascript_vm; + const vm = this.main_thread.javascript_vm; // Check if VM is shutting down before enqueuing if (vm.isShuttingDown()) { @@ -1357,41 +1315,41 @@ pub const FetchTasklet = struct { hostname.deinit(); } - if (this.result.certificate_info) |*certificate| { + if (this.shared.result.certificate_info) |*certificate| { certificate.deinit(bun.default_allocator); - this.result.certificate_info = null; + this.shared.result.certificate_info = null; } this.request_headers.entries.deinit(allocator); this.request_headers.buf.deinit(allocator); this.request_headers = Headers{ .allocator = undefined }; - if (this.http) |http_| { + if (this.shared.http) |http_| { http_.clearData(); } - if (this.metadata != null) { - this.metadata.?.deinit(allocator); - this.metadata = null; + if (this.shared.metadata != null) { + this.shared.metadata.?.deinit(allocator); + this.shared.metadata = null; } - this.response_buffer.deinit(); - this.response.deinit(); - if (this.native_response) |response| { - this.native_response = null; + this.shared.response_buffer.deinit(); + this.main_thread.response_weak.deinit(); + if (this.main_thread.native_response) |response| { + this.main_thread.native_response = null; response.unref(); } - this.readable_stream_ref.deinit(); + this.main_thread.readable_stream_ref.deinit(); - this.scheduled_response_buffer.deinit(); + this.shared.scheduled_response_buffer.deinit(); if (this.request_body != .ReadableStream or this.shared.request_stream_state == .waiting_start) { this.request_body.detach(); } - this.abort_reason.deinit(); - this.check_server_identity.deinit(); + this.main_thread.abort_reason.deinit(); + this.main_thread.check_server_identity.deinit(); this.clearAbortSignal(); // Clear the sink only after the requested ended otherwise we would potentialy lose the last chunk this.clearSink(); @@ -1406,7 +1364,7 @@ pub const FetchTasklet = struct { /// Called via enqueueTaskConcurrent from derefFromThread. // XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not fn deinitFromMainThread(this: *FetchTasklet) error{}!void { - bun.debugAssert(this.javascript_vm.isMainThread()); + bun.debugAssert(this.main_thread.javascript_vm.isMainThread()); this.deinit() catch |err| switch (err) {}; } @@ -1414,14 +1372,14 @@ pub const FetchTasklet = struct { pub fn deinit(this: *FetchTasklet) error{}!void { log("deinit", .{}); - bun.assert(this.ref_count.load(.monotonic) == 0); + bun.assert(this.shared.ref_count.load(.monotonic) == 0); this.clearData(); const allocator = bun.default_allocator; - if (this.http) |http_| { - this.http = null; + if (this.shared.http) |http_| { + this.shared.http = null; allocator.destroy(http_); } allocator.destroy(this); @@ -1429,12 +1387,12 @@ pub const FetchTasklet = struct { fn getCurrentResponse(this: *FetchTasklet) ?*Response { // we need a body to resolve the promise when buffering - if (this.native_response) |response| { + if (this.main_thread.native_response) |response| { return response; } // if we did not have a direct reference we check if the Weak ref is still alive - if (this.response.get()) |response_js| { + if (this.main_thread.response_weak.get()) |response_js| { if (response_js.as(Response)) |response| { return response; } @@ -1460,23 +1418,23 @@ pub const FetchTasklet = struct { /// Cleanup happens via clearRequestStreaming() which drops our sink ref. pub fn startRequestStream(this: *FetchTasklet) void { { - this.mutex.lock(); - defer this.mutex.unlock(); + this.shared.mutex.lock(); + defer this.shared.mutex.unlock(); this.shared.request_stream_state = .active; } bun.assert(this.request_body == .ReadableStream); // Get the stream from request_body - if (this.request_body.ReadableStream.get(this.global_this)) |stream| { + if (this.request_body.ReadableStream.get(this.main_thread.global_this)) |stream| { // Check if request was already aborted - if (this.signal) |signal| { + if (this.main_thread.abort_signal) |signal| { if (signal.aborted()) { - stream.abort(this.global_this); + stream.abort(this.main_thread.global_this); return; } } - const globalThis = this.global_this; + const globalThis = this.main_thread.global_this; // Increment FetchTasklet ref count // This ref will be released when sink finishes (via sink callbacks) @@ -1498,14 +1456,14 @@ pub const FetchTasklet = struct { } pub fn onBodyReceived(this: *FetchTasklet) bun.JSTerminated!void { - const success = this.result.isSuccess(); - const globalThis = this.global_this; + const success = this.shared.result.isSuccess(); + const globalThis = this.main_thread.global_this; // reset the buffer if we are streaming or if we are not waiting for bufferig anymore var buffer_reset = true; - log("onBodyReceived success={} has_more={}", .{ success, this.result.has_more }); + log("onBodyReceived success={} has_more={}", .{ success, this.shared.result.has_more }); defer { if (buffer_reset) { - this.scheduled_response_buffer.reset(); + this.shared.scheduled_response_buffer.reset(); } } @@ -1515,7 +1473,7 @@ pub const FetchTasklet = struct { defer if (need_deinit) err.deinit(); var js_err = JSValue.zero; // if we are streaming update with error - if (this.readable_stream_ref.get(globalThis)) |readable| { + if (this.main_thread.readable_stream_ref.get(globalThis)) |readable| { if (readable.ptr == .Bytes) { js_err = err.toJS(globalThis); js_err.ensureStillAlive(); @@ -1544,16 +1502,16 @@ pub const FetchTasklet = struct { return; } - if (this.readable_stream_ref.get(globalThis)) |readable| { + if (this.main_thread.readable_stream_ref.get(globalThis)) |readable| { log("onBodyReceived readable_stream_ref", .{}); if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = this.getSizeHint(); // body can be marked as used but we still need to pipe the data - const scheduled_response_buffer = &this.scheduled_response_buffer.list; + const scheduled_response_buffer = &this.shared.scheduled_response_buffer.list; const chunk = scheduled_response_buffer.items; - if (this.result.has_more) { + if (this.shared.result.has_more) { try readable.ptr.Bytes.onData( .{ .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), @@ -1561,8 +1519,8 @@ pub const FetchTasklet = struct { bun.default_allocator, ); } else { - var prev = this.readable_stream_ref; - this.readable_stream_ref = .{}; + var prev = this.main_thread.readable_stream_ref; + this.main_thread.readable_stream_ref = .{}; defer prev.deinit(); buffer_reset = false; @@ -1584,11 +1542,11 @@ pub const FetchTasklet = struct { if (response.getBodyReadableStream(globalThis)) |readable| { log("onBodyReceived CurrentResponse BodyReadableStream", .{}); if (readable.ptr == .Bytes) { - const scheduled_response_buffer = this.scheduled_response_buffer.list; + const scheduled_response_buffer = this.shared.scheduled_response_buffer.list; const chunk = scheduled_response_buffer.items; - if (this.result.has_more) { + if (this.shared.result.has_more) { try readable.ptr.Bytes.onData( .{ .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), @@ -1612,8 +1570,8 @@ pub const FetchTasklet = struct { // we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer buffer_reset = false; - if (!this.result.has_more) { - var scheduled_response_buffer = this.scheduled_response_buffer.list; + if (!this.shared.result.has_more) { + var scheduled_response_buffer = this.shared.scheduled_response_buffer.list; const body = response.getBodyValue(); // done resolve body var old = body.*; @@ -1625,7 +1583,7 @@ pub const FetchTasklet = struct { body.* = body_value; log("onBodyReceived body_value length={}", .{body_value.InternalBlob.bytes.items.len}); - this.scheduled_response_buffer = .{ + this.shared.scheduled_response_buffer = .{ .allocator = bun.default_allocator, .list = .{ .items = &.{}, @@ -1635,7 +1593,7 @@ pub const FetchTasklet = struct { if (old == .Locked) { log("onBodyReceived old.resolve", .{}); - try old.resolve(body, this.global_this, response.getFetchHeaders()); + try old.resolve(body, this.main_thread.global_this, response.getFetchHeaders()); } } } @@ -1659,10 +1617,10 @@ pub const FetchTasklet = struct { /// 1. Stream directly to the stream (Path 1 in old code) /// 2. Buffer the data until stream is created (Path 3 in old code) fn processBodyDataInitial(this: *FetchTasklet) bun.JSTerminated!void { - const globalThis = this.global_this; + const globalThis = this.main_thread.global_this; // Check if we have a readable stream yet (from Response.body being accessed) - const has_stream_ref = this.readable_stream_ref.held.has(); + const has_stream_ref = this.main_thread.readable_stream_ref.held.has(); // Also check if Response object exists with a stream const has_response_stream = blk: { @@ -1687,13 +1645,13 @@ pub const FetchTasklet = struct { /// Handles both direct streams (readable_stream_ref) and Response body streams. /// /// This corresponds to Path 1 and Path 2 in the old onBodyReceived code: - /// - Path 1: Stream via this.readable_stream_ref (lines 1130-1161) + /// - Path 1: Stream via this.main_thread.readable_stream_ref (lines 1130-1161) /// - Path 2: Stream via Response.getBodyReadableStream (lines 1163-1194) fn streamBodyToJS(this: *FetchTasklet) bun.JSTerminated!void { - const globalThis = this.global_this; - const scheduled_response_buffer = &this.scheduled_response_buffer.list; + const globalThis = this.main_thread.global_this; + const scheduled_response_buffer = &this.shared.scheduled_response_buffer.list; const chunk = scheduled_response_buffer.items; - const has_more = this.result.has_more; + const has_more = this.shared.result.has_more; // Early exit if no data and more coming if (chunk.len == 0 and has_more) { @@ -1701,7 +1659,7 @@ pub const FetchTasklet = struct { } // Path 1: Try streaming via readable_stream_ref first - if (this.readable_stream_ref.get(globalThis)) |readable| { + if (this.main_thread.readable_stream_ref.get(globalThis)) |readable| { if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = this.getSizeHint(); @@ -1714,8 +1672,8 @@ pub const FetchTasklet = struct { ); } else { // Done - send final chunk and clean up - var prev = this.readable_stream_ref; - this.readable_stream_ref = .{}; + var prev = this.main_thread.readable_stream_ref; + this.main_thread.readable_stream_ref = .{}; defer prev.deinit(); try readable.ptr.Bytes.onData( @@ -1776,9 +1734,9 @@ pub const FetchTasklet = struct { // This function is called when we decide to keep buffering // If request is complete, finalize the buffered body - if (!this.result.has_more) { + if (!this.shared.result.has_more) { if (this.getCurrentResponse()) |response| { - var scheduled_response_buffer = this.scheduled_response_buffer.list; + var scheduled_response_buffer = this.shared.scheduled_response_buffer.list; const body = response.getBodyValue(); // Transfer buffer to body @@ -1793,7 +1751,7 @@ pub const FetchTasklet = struct { log("bufferBodyData body_value length={}", .{body_value.InternalBlob.bytes.items.len}); // Reset buffer - this.scheduled_response_buffer = .{ + this.shared.scheduled_response_buffer = .{ .allocator = bun.default_allocator, .list = .{ .items = &.{}, @@ -1804,7 +1762,7 @@ pub const FetchTasklet = struct { // Resolve any pending promise if (old == .Locked) { log("bufferBodyData old.resolve", .{}); - old.resolve(body, this.global_this, response.getFetchHeaders()) catch { + old.resolve(body, this.main_thread.global_this, response.getFetchHeaders()) catch { // Handle termination error return; }; @@ -1822,9 +1780,9 @@ pub const FetchTasklet = struct { /// Unlike bufferBodyData(), this takes data as a parameter and appends it. fn bufferBodyDataDirect(this: *FetchTasklet, data: []const u8) void { // Append data to scheduled buffer - _ = this.scheduled_response_buffer.write(data) catch { + _ = this.shared.scheduled_response_buffer.write(data) catch { // OOM - mark as failed - this.result.fail = error.OutOfMemory; + this.shared.result.fail = error.OutOfMemory; log("bufferBodyDataDirect OOM", .{}); }; } @@ -1840,25 +1798,25 @@ pub const FetchTasklet = struct { log("onProgressUpdate", .{}); // === MAIN THREAD - Reset atomic flag first (allows HTTP thread to schedule again) === - defer this.has_schedule_callback.store(false, .release); + defer this.shared.has_schedule_callback.store(false, .release); // Balance the ref() from HTTP thread callback defer this.deref(); - const vm = this.javascript_vm; + const vm = this.main_thread.javascript_vm; // Early check: VM shutting down? if (vm.isShuttingDown()) { // Cannot touch JS - just clean up - this.mutex.lock(); - const is_done = !this.result.has_more; - this.mutex.unlock(); + this.shared.mutex.lock(); + const is_done = !this.shared.result.has_more; + this.shared.mutex.unlock(); // Note: deref() is handled by outer defer above if (is_done) { // Additional cleanup only when done - var poll_ref = this.poll_ref; - this.poll_ref = .{}; + var poll_ref = this.main_thread.poll_ref; + this.main_thread.poll_ref = .{}; poll_ref.unref(vm); } return; @@ -1876,32 +1834,32 @@ pub const FetchTasklet = struct { var certificate_info_snapshot: ?http.CertificateInfo = null; { - this.mutex.lock(); - defer this.mutex.unlock(); + this.shared.mutex.lock(); + defer this.shared.mutex.unlock(); - is_done = !this.result.has_more; + is_done = !this.shared.result.has_more; is_waiting_request_stream_start = this.shared.request_stream_state == .waiting_start; - can_stream = this.result.can_stream; + can_stream = this.shared.result.can_stream; is_waiting_body = this.shared.lifecycle == .response_awaiting_body_access; - metadata_exists = this.metadata != null; - is_success = this.result.isSuccess(); - is_waiting_abort = this.shared.abort_requested.load(.acquire) and this.result.has_more; + metadata_exists = this.shared.metadata != null; + is_success = this.shared.result.isSuccess(); + is_waiting_abort = this.shared.abort_requested.load(.acquire) and this.shared.result.has_more; // Extract certificate info (will be processed outside lock) - if (this.result.certificate_info) |cert_info| { + if (this.shared.result.certificate_info) |cert_info| { certificate_info_snapshot = cert_info; - this.result.certificate_info = null; + this.shared.result.certificate_info = null; } } // === LOCK RELEASED - Now safe to do JS work === - const globalThis = this.global_this; + const globalThis = this.main_thread.global_this; // Clean up at end defer { if (is_done) { - var poll_ref = this.poll_ref; - this.poll_ref = .{}; + var poll_ref = this.main_thread.poll_ref; + this.main_thread.poll_ref = .{}; poll_ref.unref(vm); // Note: deref() is handled by outer defer at line ~828 } @@ -1928,11 +1886,11 @@ pub const FetchTasklet = struct { return; } - const promise_value = this.promise.valueOrEmpty(); + const promise_value = this.main_thread.promise.valueOrEmpty(); if (promise_value.isEmptyOrUndefinedOrNull()) { log("onProgressUpdate: promise_value is null", .{}); - this.promise.deinit(); + this.main_thread.promise.deinit(); return; } @@ -1943,7 +1901,7 @@ pub const FetchTasklet = struct { if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) { log("onProgressUpdate: aborted due certError", .{}); const promise = promise_value.asAnyPromise().?; - const tracker = this.tracker; + const tracker = this.main_thread.tracker; var result = this.onReject(); defer result.deinit(); @@ -1951,14 +1909,14 @@ pub const FetchTasklet = struct { try promise.reject(globalThis, result.toJS(globalThis)); tracker.didDispatch(globalThis); - this.promise.deinit(); + this.main_thread.promise.deinit(); return; } // Re-check metadata after cert validation - this.mutex.lock(); - const has_metadata = this.metadata != null; - this.mutex.unlock(); + this.shared.mutex.lock(); + const has_metadata = this.shared.metadata != null; + this.shared.mutex.unlock(); if (!has_metadata) { log("onProgressUpdate: metadata is null after cert check", .{}); @@ -1967,12 +1925,12 @@ pub const FetchTasklet = struct { } // Resolve or reject promise (JS interaction - no lock held) - const tracker = this.tracker; + const tracker = this.main_thread.tracker; tracker.willDispatch(globalThis); defer { log("onProgressUpdate: promise_value is not null", .{}); tracker.didDispatch(globalThis); - this.promise.deinit(); + this.main_thread.promise.deinit(); } const success = is_success; @@ -2018,11 +1976,11 @@ pub const FetchTasklet = struct { var holder = bun.handleOom(bun.default_allocator.create(Holder)); holder.* = .{ .held = result, - .promise = this.promise.strong, + .promise = this.main_thread.promise.strong, .globalObject = globalThis, .task = undefined, }; - this.promise.strong = .empty; + this.main_thread.promise.strong = .empty; holder.task = switch (success) { true => jsc.AnyTask.New(Holder, Holder.resolve).init(holder), false => jsc.AnyTask.New(Holder, Holder.reject).init(holder), @@ -2032,13 +1990,13 @@ pub const FetchTasklet = struct { } pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: http.CertificateInfo) bool { - if (this.check_server_identity.get()) |check_server_identity| { + if (this.main_thread.check_server_identity.get()) |check_server_identity| { check_server_identity.ensureStillAlive(); if (certificate_info.cert.len > 0) { const cert = certificate_info.cert; var cert_ptr = cert.ptr; if (BoringSSL.d2i_X509(null, &cert_ptr, @intCast(cert.len))) |x509| { - const globalObject = this.global_this; + const globalObject = this.main_thread.global_this; defer x509.free(); const js_cert = X509.toJS(x509, globalObject) catch |err| { switch (err) { @@ -2049,12 +2007,12 @@ pub const FetchTasklet = struct { const check_result = globalObject.tryTakeException().?; // mark to wait until deinit (abort_requested + has_more checked at read time) this.shared.abort_requested.store(true, .release); - this.abort_reason.set(globalObject, check_result); - this.signal_store.aborted.store(true, .monotonic); - this.tracker.didCancel(this.global_this); + this.main_thread.abort_reason.set(globalObject, check_result); + this.shared.signal_store.aborted.store(true, .monotonic); + this.main_thread.tracker.didCancel(this.main_thread.global_this); // we need to abort the request - if (this.http) |http_| http.http_thread.scheduleShutdown(http_); - this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; + if (this.shared.http) |http_| http.http_thread.scheduleShutdown(http_); + this.shared.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; return false; }; var hostname: bun.String = bun.String.cloneUTF8(certificate_info.hostname); @@ -2068,15 +2026,15 @@ pub const FetchTasklet = struct { if (check_result.isAnyError()) { // mark to wait until deinit (abort_requested + has_more checked at read time) this.shared.abort_requested.store(true, .release); - this.abort_reason.set(globalObject, check_result); - this.signal_store.aborted.store(true, .monotonic); - this.tracker.didCancel(this.global_this); + this.main_thread.abort_reason.set(globalObject, check_result); + this.shared.signal_store.aborted.store(true, .monotonic); + this.main_thread.tracker.didCancel(this.main_thread.global_this); // we need to abort the request - if (this.http) |http_| { + if (this.shared.http) |http_| { http.http_thread.scheduleShutdown(http_); } - this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; + this.shared.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; return false; } @@ -2086,23 +2044,23 @@ pub const FetchTasklet = struct { } } } - this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; + this.shared.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; return false; } fn getAbortError(this: *FetchTasklet) ?Body.Value.ValueError { - if (this.abort_reason.has()) { + if (this.main_thread.abort_reason.has()) { defer this.clearAbortSignal(); - const out = this.abort_reason; + const out = this.main_thread.abort_reason; - this.abort_reason = .empty; + this.main_thread.abort_reason = .empty; return Body.Value.ValueError{ .JSValue = out }; } - if (this.signal) |signal| { - if (signal.reasonIfAborted(this.global_this)) |reason| { + if (this.main_thread.abort_signal) |signal| { + if (signal.reasonIfAborted(this.main_thread.global_this)) |reason| { defer this.clearAbortSignal(); - return reason.toBodyValueError(this.global_this); + return reason.toBodyValueError(this.main_thread.global_this); } } @@ -2110,8 +2068,8 @@ pub const FetchTasklet = struct { } fn clearAbortSignal(this: *FetchTasklet) void { - const signal = this.signal orelse return; - this.signal = null; + const signal = this.main_thread.abort_signal orelse return; + this.main_thread.abort_signal = null; defer { signal.pendingActivityUnref(); signal.unref(); @@ -2121,31 +2079,31 @@ pub const FetchTasklet = struct { } pub fn onReject(this: *FetchTasklet) Body.Value.ValueError { - bun.assert(this.result.fail != null); + bun.assert(this.shared.result.fail != null); log("onReject", .{}); if (this.getAbortError()) |err| { return err; } - if (this.result.abortReason()) |reason| { + if (this.shared.result.abortReason()) |reason| { return .{ .AbortReason = reason }; } // some times we don't have metadata so we also check http.url - const path = if (this.metadata) |metadata| + const path = if (this.shared.metadata) |metadata| bun.String.cloneUTF8(metadata.url) - else if (this.http) |http_| + else if (this.shared.http) |http_| bun.String.cloneUTF8(http_.url.href) else bun.String.empty; const fetch_error = jsc.SystemError{ - .code = bun.String.static(switch (this.result.fail.?) { + .code = bun.String.static(switch (this.shared.result.fail.?) { error.ConnectionClosed => "ECONNRESET", else => |e| @errorName(e), }), - .message = switch (this.result.fail.?) { + .message = switch (this.shared.result.fail.?) { error.ConnectionClosed => bun.String.static("The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()"), error.FailedToOpenSocket => bun.String.static("Was there a typo in the url or port?"), error.TooManyRedirects => bun.String.static("The response redirected too many times. For more information, pass `verbose: true` in the second argument to fetch()"), @@ -2232,18 +2190,18 @@ pub const FetchTasklet = struct { pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void { const this = bun.cast(*FetchTasklet, ctx); - this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis); + this.main_thread.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis); } pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult { const this = bun.cast(*FetchTasklet, ctx); - if (this.signal_store.aborted.load(.monotonic)) { + if (this.shared.signal_store.aborted.load(.monotonic)) { return jsc.WebCore.DrainResult{ .aborted = {}, }; } - if (this.http) |http_| { + if (this.shared.http) |http_| { http_.enableResponseBodyStreaming(); // If the server sent the headers and the response body in two separate socket writes @@ -2253,14 +2211,14 @@ pub const FetchTasklet = struct { bun.http.http_thread.scheduleResponseBodyDrain(http_.async_http_id); } - this.mutex.lock(); - defer this.mutex.unlock(); + this.shared.mutex.lock(); + defer this.shared.mutex.unlock(); const size_hint = this.getSizeHint(); - var scheduled_response_buffer = this.scheduled_response_buffer.list; + var scheduled_response_buffer = this.shared.scheduled_response_buffer.list; // This means we have received part of the body but not the whole thing if (scheduled_response_buffer.items.len > 0) { - this.scheduled_response_buffer = .{ + this.shared.scheduled_response_buffer = .{ .allocator = bun.default_allocator, .list = .{ .items = &.{}, @@ -2282,9 +2240,9 @@ pub const FetchTasklet = struct { } fn getSizeHint(this: *FetchTasklet) Blob.SizeType { - return switch (this.body_size) { - .content_length => @truncate(this.body_size.content_length), - .total_received => @truncate(this.body_size.total_received), + return switch (this.shared.body_size) { + .content_length => @truncate(this.shared.body_size.content_length), + .total_received => @truncate(this.shared.body_size.total_received), .unknown => 0, }; } @@ -2298,7 +2256,7 @@ pub const FetchTasklet = struct { .Locked = .{ .size_hint = this.getSizeHint(), .task = this, - .global = this.global_this, + .global = this.main_thread.global_this, .onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback, .onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable, }, @@ -2306,13 +2264,13 @@ pub const FetchTasklet = struct { return response; } - var scheduled_response_buffer = this.scheduled_response_buffer.list; + var scheduled_response_buffer = this.shared.scheduled_response_buffer.list; const response = Body.Value{ .InternalBlob = .{ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), }, }; - this.scheduled_response_buffer = .{ + this.shared.scheduled_response_buffer = .{ .allocator = bun.default_allocator, .list = .{ .items = &.{}, @@ -2325,11 +2283,11 @@ pub const FetchTasklet = struct { fn toResponse(this: *FetchTasklet) Response { log("toResponse", .{}); - bun.assert(this.metadata != null); + bun.assert(this.shared.metadata != null); // at this point we always should have metadata - const metadata = this.metadata.?; + const metadata = this.shared.metadata.?; const http_response = metadata.response; - if (this.result.has_more) { + if (this.shared.result.has_more) { // Transition to response_awaiting_body_access when body hasn't been fully received yet // Must lock because lifecycle is shared with HTTP thread var locked = this.lockShared(); @@ -2346,7 +2304,7 @@ pub const FetchTasklet = struct { .value = this.toBodyValue(), }, bun.String.createAtomIfPossible(metadata.url), - this.result.redirected, + this.shared.result.redirected, ); } @@ -2354,19 +2312,19 @@ pub const FetchTasklet = struct { log("ignoreRemainingResponseBody", .{}); // enabling streaming will make the http thread to drain into the main thread (aka stop buffering) // without a stream ref, response body or response instance alive it will just ignore the result - if (this.http) |http_| { + if (this.shared.http) |http_| { http_.enableResponseBodyStreaming(); } // we should not keep the process alive if we are ignoring the body - const vm = this.javascript_vm; - this.poll_ref.unref(vm); + const vm = this.main_thread.javascript_vm; + this.main_thread.poll_ref.unref(vm); // clean any remaining refereces - this.readable_stream_ref.deinit(); - this.response.deinit(); + this.main_thread.readable_stream_ref.deinit(); + this.main_thread.response_weak.deinit(); - if (this.native_response) |response| { + if (this.main_thread.native_response) |response| { response.unref(); - this.native_response = null; + this.main_thread.native_response = null; } // Signal to ignore remaining body data (checked via shouldIgnoreBodyData) @@ -2380,11 +2338,11 @@ pub const FetchTasklet = struct { // === ACQUIRE LOCK - Fix race condition === // The HTTP thread accesses shared state in callback(), so we must lock - this.mutex.lock(); - defer this.mutex.unlock(); + this.shared.mutex.lock(); + defer this.shared.mutex.unlock(); // Check if we have a native response to work with - if (this.native_response) |response| { + if (this.main_thread.native_response) |response| { const body = response.getBodyValue(); // Three scenarios: @@ -2396,7 +2354,7 @@ pub const FetchTasklet = struct { // 3. We never started buffering, in which case we should ignore the body. // // Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer. - if (body.* != .Locked or this.readable_stream_ref.held.has()) { + if (body.* != .Locked or this.main_thread.readable_stream_ref.held.has()) { // Scenario 1 or 3. return; } @@ -2410,17 +2368,17 @@ pub const FetchTasklet = struct { // Scenario 2a or 3 - ignore remaining body // Signal abort to HTTP thread (under lock) - this.signal_store.aborted.store(true, .release); + this.shared.signal_store.aborted.store(true, .release); // Signal to ignore remaining body data (checked via shouldIgnoreBodyData) this.shared.abort_requested.store(true, .release); // Clear accumulated buffers since we're ignoring the rest - this.response_buffer.list.clearRetainingCapacity(); - this.scheduled_response_buffer.list.clearRetainingCapacity(); + this.shared.response_buffer.list.clearRetainingCapacity(); + this.shared.scheduled_response_buffer.list.clearRetainingCapacity(); // Enable streaming to drain remaining data without buffering - if (this.http) |http_| { + if (this.shared.http) |http_| { http_.enableResponseBodyStreaming(); } } @@ -2432,10 +2390,10 @@ pub const FetchTasklet = struct { pub fn onResolve(this: *FetchTasklet) JSValue { log("onResolve", .{}); const response = bun.new(Response, this.toResponse()); - const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, this.global_this), response); + const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, this.main_thread.global_this), response); response_js.ensureStillAlive(); - this.response = jsc.Weak(FetchTasklet).create(response_js, this.global_this, .FetchResponse, this); - this.native_response = response.ref(); + this.main_thread.response_weak = jsc.Weak(FetchTasklet).create(response_js, this.main_thread.global_this, .FetchResponse, this); + this.main_thread.native_response = response.ref(); return response_js; } @@ -2453,36 +2411,13 @@ pub const FetchTasklet = struct { shared_data.* = try SharedData.init(allocator); fetch_tasklet.* = .{ - .mutex = .{}, - .scheduled_response_buffer = .{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }, - .response_buffer = MutableString{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }, - .http = try allocator.create(http.AsyncHTTP), - .javascript_vm = jsc_vm, .request_body = fetch_options.body, - .global_this = globalThis, - .promise = promise, .request_headers = fetch_options.headers, .url_proxy_buffer = fetch_options.url_proxy_buffer, - .signal = fetch_options.signal, .hostname = fetch_options.hostname, - .tracker = jsc.Debugger.AsyncTaskTracker.init(jsc_vm), - .check_server_identity = fetch_options.check_server_identity, .reject_unauthorized = fetch_options.reject_unauthorized, - .upgraded_connection = fetch_options.upgraded_connection, - // === PHASE 7 STEP 2: Initialize new thread safety fields === + // === PHASE 7 STEP 4: Initialize thread safety fields (MIGRATION COMPLETE) === .main_thread = .{ .global_this = globalThis, .javascript_vm = jsc_vm, @@ -2494,9 +2429,13 @@ pub const FetchTasklet = struct { .shared = shared_data, }; - fetch_tasklet.signals = fetch_tasklet.signal_store.to(); + // Initialize HTTP client after struct init + fetch_tasklet.shared.http = try allocator.create(http.AsyncHTTP); + fetch_tasklet.shared.upgraded_connection = fetch_options.upgraded_connection; - fetch_tasklet.tracker.didSchedule(globalThis); + fetch_tasklet.shared.signals = fetch_tasklet.shared.signal_store.to(); + + fetch_tasklet.main_thread.tracker.didSchedule(globalThis); if (fetch_tasklet.request_body.store()) |store| { store.ref(); @@ -2511,20 +2450,20 @@ pub const FetchTasklet = struct { proxy = jsc_vm.transpiler.env.getHttpProxyFor(fetch_options.url); } - if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) { - fetch_tasklet.signal_store.cert_errors.store(true, .monotonic); + if (fetch_tasklet.main_thread.check_server_identity.has() and fetch_tasklet.reject_unauthorized) { + fetch_tasklet.shared.signal_store.cert_errors.store(true, .monotonic); } else { - fetch_tasklet.signals.cert_errors = null; + fetch_tasklet.shared.signals.cert_errors = null; } // This task gets queued on the HTTP thread. - fetch_tasklet.http.?.* = http.AsyncHTTP.init( + fetch_tasklet.shared.http.?.* = http.AsyncHTTP.init( bun.default_allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, - &fetch_tasklet.response_buffer, + &fetch_tasklet.shared.response_buffer, fetch_tasklet.request_body.slice(), http.HTTPClientResult.Callback.New( *FetchTasklet, @@ -2535,7 +2474,7 @@ pub const FetchTasklet = struct { .{ .http_proxy = proxy, .hostname = if (fetch_options.hostname) |h| h.get() else null, - .signals = fetch_tasklet.signals, + .signals = fetch_tasklet.shared.signals, .unix_socket_path = fetch_options.unix_socket_path, .disable_timeout = fetch_options.disable_timeout, .disable_keepalive = fetch_options.disable_keepalive, @@ -2547,13 +2486,13 @@ pub const FetchTasklet = struct { ); // enable streaming the write side const isStream = fetch_tasklet.request_body == .ReadableStream; - fetch_tasklet.http.?.client.flags.is_streaming_request_body = isStream; + fetch_tasklet.shared.http.?.client.flags.is_streaming_request_body = isStream; fetch_tasklet.shared.request_stream_state = if (isStream) .waiting_start else .none; if (isStream) { const buffer = http.ThreadSafeStreamBuffer.new(.{}); buffer.setDrainCallback(FetchTasklet, FetchTasklet.onWriteRequestDataDrain, fetch_tasklet); fetch_tasklet.request_body_streaming_buffer = buffer; - fetch_tasklet.http.?.request_body = .{ + fetch_tasklet.shared.http.?.request_body = .{ .stream = .{ .buffer = buffer, .ended = false, @@ -2563,21 +2502,21 @@ pub const FetchTasklet = struct { // TODO is this necessary? the http client already sets the redirect type, // so manually setting it here seems redundant if (fetch_options.redirect_type != FetchRedirect.follow) { - fetch_tasklet.http.?.client.remaining_redirect_count = 0; + fetch_tasklet.shared.http.?.client.remaining_redirect_count = 0; } // we want to return after headers are received - fetch_tasklet.signal_store.header_progress.store(true, .monotonic); + fetch_tasklet.shared.signal_store.header_progress.store(true, .monotonic); if (fetch_tasklet.request_body == .Sendfile) { bun.assert(fetch_options.url.isHTTP()); bun.assert(fetch_options.proxy == null); - fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile }; + fetch_tasklet.shared.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile }; } - if (fetch_tasklet.signal) |signal| { + if (fetch_tasklet.main_thread.abort_signal) |signal| { signal.pendingActivityRef(); - fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); + fetch_tasklet.main_thread.abort_signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); } return fetch_tasklet; } @@ -2585,7 +2524,7 @@ pub const FetchTasklet = struct { pub fn abortListener(this: *FetchTasklet, reason: JSValue) void { log("abortListener", .{}); reason.ensureStillAlive(); - this.abort_reason.set(this.global_this, reason); + this.main_thread.abort_reason.set(this.main_thread.global_this, reason); this.abortTask(); if (this.sink) |sink| { sink.cancel(reason); @@ -2597,7 +2536,7 @@ pub const FetchTasklet = struct { pub fn onWriteRequestDataDrain(this: *FetchTasklet) void { // ref until the main thread callback is called this.ref(); - this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.resumeRequestDataStream)); + this.main_thread.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.resumeRequestDataStream)); } /// This is ALWAYS called from the main thread @@ -2607,7 +2546,7 @@ pub const FetchTasklet = struct { defer this.deref(); log("resumeRequestDataStream", .{}); if (this.sink) |sink| { - if (this.signal) |signal| { + if (this.main_thread.abort_signal) |signal| { if (signal.aborted()) { // already aborted; nothing to drain return; @@ -2619,7 +2558,7 @@ pub const FetchTasklet = struct { pub fn writeRequestData(this: *FetchTasklet, data: []const u8) ResumableSinkBackpressure { log("writeRequestData {}", .{data.len}); - if (this.signal) |signal| { + if (this.main_thread.abort_signal) |signal| { if (signal.aborted()) { return .done; } @@ -2632,13 +2571,13 @@ pub const FetchTasklet = struct { var needs_schedule = false; defer if (needs_schedule) { // wakeup the http thread to write the data - http.http_thread.scheduleRequestWrite(this.http.?, .data); + http.http_thread.scheduleRequestWrite(this.shared.http.?, .data); }; // dont have backpressure so we will schedule the data to be written // if we have backpressure the onWritable will drain the buffer needs_schedule = stream_buffer.isEmpty(); - if (this.upgraded_connection) { + if (this.shared.upgraded_connection) { bun.handleOom(stream_buffer.write(data)); } else { //16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n @@ -2664,22 +2603,22 @@ pub const FetchTasklet = struct { log("writeEndRequest hasError? {}", .{err != null}); defer this.deref(); if (err) |jsError| { - if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) { + if (this.shared.signal_store.aborted.load(.monotonic) or this.main_thread.abort_reason.has()) { return; } if (!jsError.isUndefinedOrNull()) { - this.abort_reason.set(this.global_this, jsError); + this.main_thread.abort_reason.set(this.main_thread.global_this, jsError); } this.abortTask(); } else { - if (!this.upgraded_connection) { + if (!this.shared.upgraded_connection) { // If is not upgraded we need to send the terminating chunk const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return; const stream_buffer = thread_safe_stream_buffer.acquire(); defer thread_safe_stream_buffer.release(); bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body)); } - if (this.http) |http_| { + if (this.shared.http) |http_| { // just tell to write the end of the chunked encoding aka 0\r\n\r\n http.http_thread.scheduleRequestWrite(http_, .end); } @@ -2687,10 +2626,10 @@ pub const FetchTasklet = struct { } pub fn abortTask(this: *FetchTasklet) void { - this.signal_store.aborted.store(true, .monotonic); - this.tracker.didCancel(this.global_this); + this.shared.signal_store.aborted.store(true, .monotonic); + this.main_thread.tracker.didCancel(this.main_thread.global_this); - if (this.http) |http_| { + if (this.shared.http) |http_| { http.http_thread.scheduleShutdown(http_); } } @@ -2752,45 +2691,45 @@ pub const FetchTasklet = struct { defer if (is_done) task.derefFromThread(); // Fast-path abort check (no lock needed for atomic read) - if (task.signal_store.aborted.load(.acquire)) { + if (task.shared.signal_store.aborted.load(.acquire)) { // Already aborted, don't schedule anything return; } // Prevent duplicate enqueues (atomic swap before taking lock) - if (task.has_schedule_callback.swap(true, .acq_rel)) { + if (task.shared.has_schedule_callback.swap(true, .acq_rel)) { // Already scheduled, this data will be picked up on next callback return; } // === ACQUIRE LOCK - Brief critical section === - task.mutex.lock(); - defer task.mutex.unlock(); + task.shared.mutex.lock(); + defer task.shared.mutex.unlock(); // Update HTTP client reference (needed for abort handling) - task.http.?.* = async_http.*; - task.http.?.response_buffer = async_http.response_buffer; + task.shared.http.?.* = async_http.*; + task.shared.http.?.response_buffer = async_http.response_buffer; log("callback success={} ignore_data={} has_more={} bytes={}", .{ result.isSuccess(), shouldIgnoreBodyData(task.shared.lifecycle, task.shared.abort_requested.load(.acquire)), result.has_more, result.body.?.list.items.len }); // Preserve previous metadata and certificate info - const prev_metadata = task.result.metadata; - const prev_cert_info = task.result.certificate_info; - task.result = result; + const prev_metadata = task.shared.result.metadata; + const prev_cert_info = task.shared.result.certificate_info; + task.shared.result = result; - if (task.result.certificate_info == null) { + if (task.shared.result.certificate_info == null) { if (prev_cert_info) |cert_info| { - task.result.certificate_info = cert_info; + task.shared.result.certificate_info = cert_info; } } // Store metadata (only provided once) if (result.metadata orelse prev_metadata) |metadata| { log("added callback metadata", .{}); - if (task.metadata == null) { - task.metadata = metadata; + if (task.shared.metadata == null) { + task.shared.metadata = metadata; } - task.result.metadata = null; + task.shared.result.metadata = null; } task.body_size = result.body_size;