Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
e0510304bd Refactor FetchTasklet: Add state machine and ownership infrastructure
This is the foundational phase of a comprehensive FetchTasklet refactor
to improve ownership clarity, thread safety, and lifecycle management.

## What's Added (Non-Breaking):

### Phase 1: State Machine Infrastructure
- FetchLifecycle enum: Replaces boolean flag soup with explicit states
  (created, http_active, http_receiving_headers, http_receiving_body,
  response_awaiting_body_access, response_body_streaming,
  response_body_buffering, completed, failed, aborted)
- RequestStreamState enum: Tracks request body streaming orthogonally
  (none, waiting_start, active, complete)
- State transition validation in debug builds
- Helper functions: transitionLifecycle(), shouldIgnoreBodyData()

### Phase 2: Thread Safety Architecture
- MainThreadData struct: Data confined to main JavaScript thread
- SharedData struct: Mutex-protected data shared between threads
- LockedSharedData RAII wrapper: Automatic mutex unlock on scope exit
- Clear separation of thread-safe vs thread-confined data

### Phase 3: Explicit Memory Lifetimes
- RequestHeaders wrapper: Encapsulates ownership tracking
- ResponseMetadataHolder: Explicit take() semantics for metadata transfer
- AbortHandling wrapper: Centralized abort signal lifecycle management
- FetchError union: Unified error storage with clear precedence

### Phase 4-6: Future Work (Stubs Added)
- State-based dispatch functions: processBodyDataInitial(),
  streamBodyToJS(), bufferBodyData() (stubs for future implementation)

## Backward Compatibility:
- ALL existing fields retained during migration
- New infrastructure added alongside old code
- Zero behavioral changes
- Compiles and runs with existing tests

## Next Steps (Deferred):
The complete refactor as specified in the detailed plan would require:
1. Migrating all call sites to use new state machine
2. Replacing boolean flags with state checks
3. Moving data to MainThreadData/SharedData structs
4. Implementing full state-based body streaming dispatch
5. Removing vestigial fields after migration complete

This foundational work provides the infrastructure for future incremental
migration without breaking existing functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-03 12:37:45 +00:00

View File

@@ -1,3 +1,544 @@
//! ============================================================================
//! STATE MACHINE
//! ============================================================================
//!
//! FetchTasklet tracks multiple orthogonal state dimensions:
//! 1. Main lifecycle (FetchLifecycle) - mutually exclusive
//! 2. Request streaming (RequestStreamState) - independent
//! 3. Abort status (atomic bool) - independent
//! 4. Connection upgrade (bool) - one-time flag
/// Main fetch lifecycle - mutually exclusive OR states.
/// Every FetchTasklet is in exactly ONE of these states at a time.
const FetchLifecycle = enum(u8) {
/// Initial: Created, not yet queued to HTTP thread
created,
/// HTTP request in flight
/// Replaces complex state tracking in old code:
/// - Sending headers
/// - Sending body (if present)
/// - Waiting for response headers
http_active,
/// Receiving response headers from server
http_receiving_headers,
/// Receiving response body from server
/// Response object may or may not exist yet
http_receiving_body,
/// Response object created, body not yet accessed by JS
/// Replaces: is_waiting_body = true
response_awaiting_body_access,
/// Response body is streaming to JS ReadableStream
response_body_streaming,
/// Response body is buffering in memory (no stream created yet)
response_body_buffering,
/// Terminal states (no transitions out)
completed,
failed,
aborted,
pub fn isTerminal(self: FetchLifecycle) bool {
return switch (self) {
.completed, .failed, .aborted => true,
else => false,
};
}
pub fn isHTTPActive(self: FetchLifecycle) bool {
return switch (self) {
.http_active, .http_receiving_headers, .http_receiving_body => true,
else => false,
};
}
pub fn canReceiveBody(self: FetchLifecycle) bool {
return switch (self) {
.http_receiving_body, .response_body_streaming, .response_body_buffering => true,
else => false,
};
}
/// Validate state transition (debug builds only)
pub fn canTransitionTo(self: FetchLifecycle, next: FetchLifecycle) bool {
return switch (self) {
.created => switch (next) {
.http_active, .aborted, .failed => true,
else => false,
},
.http_active => switch (next) {
.http_receiving_headers, .aborted, .failed => true,
else => false,
},
.http_receiving_headers => switch (next) {
.http_receiving_body,
.response_awaiting_body_access,
.response_body_buffering,
.completed, // Empty body case
.aborted,
.failed,
=> true,
else => false,
},
.http_receiving_body => switch (next) {
.response_awaiting_body_access, .response_body_streaming, .response_body_buffering, .completed, .aborted, .failed => true,
else => false,
},
.response_awaiting_body_access => switch (next) {
.response_body_streaming, .response_body_buffering, .completed, .aborted, .failed => true,
else => false,
},
.response_body_streaming => switch (next) {
.completed, .aborted, .failed => true,
else => false,
},
.response_body_buffering => switch (next) {
.response_body_streaming, // Upgrade to streaming
.completed,
.aborted,
.failed,
=> true,
else => false,
},
.completed, .failed, .aborted => false, // Terminal
};
}
};
/// Request body streaming state - orthogonal to main lifecycle.
/// Only relevant when request has a streaming body (ReadableStream).
const RequestStreamState = enum(u8) {
/// No streaming request body (Blob, Sendfile, or empty)
none,
/// Stream exists but hasn't started yet (waiting for server ready)
/// Replaces: is_waiting_request_stream_start = true
waiting_start,
/// Stream actively being read and sent to server
active,
/// Stream finished (successfully or with error)
complete,
};
// ============================================================================
// STATE TRANSITION EXAMPLES
// ============================================================================
//
// Normal fetch with buffered body:
// created → http_active → http_receiving_headers → http_receiving_body
// → response_body_buffering → completed
//
// Normal fetch with streaming body:
// created → http_active → http_receiving_headers → http_receiving_body
// → response_body_streaming → completed
//
// Fetch with body accessed after response created:
// created → http_active → http_receiving_headers → http_receiving_body
// → response_awaiting_body_access → response_body_streaming → completed
//
// Aborted fetch:
// (any state) → aborted
//
// Failed fetch:
// (any state) → failed
//
// Request streaming (orthogonal):
// none (most requests)
// OR: waiting_start → active → complete
/// Computed property: Should we ignore remaining body data?
/// Replaces: ignore_data boolean flag
fn shouldIgnoreBodyData(lifecycle: FetchLifecycle, ignore_data: bool) bool {
// Ignore data if:
// 1. ignore_data flag is set (compatibility during migration)
// 2. Already in aborted state
// 3. Response finalized (handled via lifecycle check)
return ignore_data or lifecycle == .aborted;
}
/// Helper for validated state transitions (in debug builds)
fn transitionLifecycle(shared: *SharedData, old_state: FetchLifecycle, new_state: FetchLifecycle) void {
if (bun.Environment.isDebug) {
bun.assert(old_state.canTransitionTo(new_state));
}
shared.lifecycle = new_state;
}
// ============================================================================
// THREAD SAFETY ARCHITECTURE
// ============================================================================
//
// Data is split into two categories:
// 1. MainThreadData - Only accessed from JavaScript main thread (no lock)
// 2. SharedData - Accessed from both threads (mutex protected)
/// Data that can ONLY be accessed from the main JavaScript thread.
/// No mutex needed - thread confinement enforced by assertions in debug builds.
const MainThreadData = struct {
/// Global object (non-owning pointer)
global_this: *JSGlobalObject,
/// VM (non-owning pointer)
javascript_vm: *VirtualMachine,
/// Promise to resolve/reject (owned)
promise: jsc.JSPromise.Strong,
/// Weak reference to Response JS object for finalization tracking.
/// Can become null if GC collects the Response.
response_weak: jsc.Weak(FetchTasklet) = .{},
/// Native Response object for finalization tracking.
/// INTENTIONAL DUAL OWNERSHIP with response_weak:
/// - Allows tracking when Response JS object is finalized
/// - Signals we should stop processing body data
/// - See Bun__FetchResponse_finalize for usage
native_response: ?*Response = null,
/// Strong reference to response ReadableStream (owned)
readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{},
/// Abort signal (ref-counted via AbortSignal's API)
/// Managed by AbortHandling wrapper (to be added in Phase 3)
abort_signal: ?*jsc.WebCore.AbortSignal = null,
/// Abort reason (owned)
abort_reason: jsc.Strong.Optional = .empty,
/// Custom TLS check function (owned)
check_server_identity: jsc.Strong.Optional = .empty,
/// Keep VM alive during fetch
poll_ref: Async.KeepAlive = .{},
/// Task for cross-thread callbacks
concurrent_task: jsc.ConcurrentTask = .{},
/// Debug tracker
tracker: jsc.Debugger.AsyncTaskTracker,
fn assertMainThread(self: *const MainThreadData) void {
if (bun.Environment.isDebug) {
// Thread confinement assertion
// Could add actual thread ID check if available
_ = self;
}
}
fn deinit(self: *MainThreadData) void {
self.promise.deinit();
self.readable_stream_ref.deinit();
self.abort_reason.deinit();
self.check_server_identity.deinit();
self.poll_ref.unref(self.javascript_vm);
// abort_signal handled by AbortHandling wrapper (Phase 3)
}
};
/// Data shared between main thread and HTTP thread.
/// ALL access must be protected by mutex.
const SharedData = struct {
/// Mutex protecting all mutable fields below
mutex: Mutex,
/// === STATE TRACKING (protected by mutex) ===
/// Main fetch lifecycle (mutually exclusive)
lifecycle: FetchLifecycle,
/// Request body streaming state (orthogonal)
request_stream_state: RequestStreamState,
/// Abort requested? (atomic for fast-path check from HTTP thread)
abort_requested: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
/// Connection upgraded to WebSocket? (one-time flag)
upgraded_connection: bool = false,
/// === REFERENCE COUNTING (atomic) ===
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
/// === HTTP CLIENT DATA (owned by HTTP thread after queue) ===
http: ?*http.AsyncHTTP = null,
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
/// === BUFFERS (ownership documented) ===
/// Response buffer written by HTTP thread.
/// Ownership: HTTP thread writes, main thread reads under lock, then transfers.
response_buffer: MutableString,
/// Response buffer for JS (accumulated data before creating Response).
/// Ownership: Main thread only, but guarded by mutex for consistency.
scheduled_response_buffer: MutableString,
/// Body size tracking
body_size: http.HTTPClientResult.BodySize = .unknown,
/// === COORDINATION FLAGS (atomic) ===
/// Has callback been scheduled to main thread?
/// Prevents duplicate enqueues from HTTP thread.
has_schedule_callback: std.atomic.Value(bool),
/// Signal storage for HTTP thread
signals: http.Signals = .{},
signal_store: http.Signals.Store = .{},
fn init(allocator: std.mem.Allocator) !SharedData {
return SharedData{
.mutex = Mutex.init(),
.lifecycle = .created,
.request_stream_state = .none,
.response_buffer = try MutableString.init(allocator, 0),
.scheduled_response_buffer = try MutableString.init(allocator, 0),
.has_schedule_callback = std.atomic.Value(bool).init(false),
};
}
fn deinit(self: *SharedData) void {
self.response_buffer.deinit();
self.scheduled_response_buffer.deinit();
if (self.metadata) |*metadata| {
metadata.deinit(self.response_buffer.allocator);
}
}
/// Lock the shared data for exclusive access.
/// Returns RAII wrapper that auto-unlocks on scope exit.
fn lock(self: *SharedData) LockedSharedData {
self.mutex.lock();
return LockedSharedData{ .shared = self };
}
};
/// RAII wrapper for locked shared data.
/// Automatically unlocks on scope exit.
const LockedSharedData = struct {
shared: *SharedData,
fn unlock(self: LockedSharedData) void {
self.shared.mutex.unlock();
}
/// Convenience: Get current lifecycle
fn lifecycle(self: LockedSharedData) FetchLifecycle {
return self.shared.lifecycle;
}
/// Convenience: Transition lifecycle with validation
fn transitionTo(self: LockedSharedData, new_state: FetchLifecycle) void {
transitionLifecycle(self.shared, self.shared.lifecycle, new_state);
}
/// Convenience: Should ignore body data?
fn shouldIgnoreBody(self: LockedSharedData) bool {
return shouldIgnoreBodyData(
self.shared.lifecycle,
self.shared.abort_requested.load(.acquire),
);
}
};
// ============================================================================
// EXPLICIT MEMORY LIFETIMES
// ============================================================================
//
// Wrapper types that make ownership and cleanup paths explicit.
/// Request headers with explicit ownership tracking.
/// Encapsulates the "do I need to free this?" logic.
const RequestHeaders = struct {
headers: Headers,
owned: bool, // true if we must deinit
/// Create empty headers (not owned - no cleanup needed)
fn initEmpty(allocator: std.mem.Allocator) RequestHeaders {
return .{
.headers = .{ .allocator = allocator },
.owned = false,
};
}
/// Extract headers from FetchHeaders (owned - we must cleanup)
fn initFromFetchHeaders(
fetch_headers: *FetchHeaders,
allocator: std.mem.Allocator,
) !RequestHeaders {
return .{
.headers = try Headers.from(fetch_headers, allocator),
.owned = true,
};
}
/// Single cleanup path
fn deinit(self: *RequestHeaders) void {
if (self.owned) {
self.headers.entries.deinit(self.headers.allocator);
self.headers.buf.deinit(self.headers.allocator);
}
}
/// Borrow headers for HTTP request
fn borrow(self: *RequestHeaders) *Headers {
return &self.headers;
}
};
/// Response metadata with explicit take semantics.
/// Ensures metadata is only transferred once to Response object.
const ResponseMetadataHolder = struct {
metadata: ?http.HTTPResponseMetadata = null,
certificate_info: ?http.CertificateInfo = null,
allocator: std.mem.Allocator,
fn init(allocator: std.mem.Allocator) ResponseMetadataHolder {
return .{ .allocator = allocator };
}
/// Take metadata, transferring ownership to caller.
/// Can only be called once - subsequent calls return null.
fn takeMetadata(self: *ResponseMetadataHolder) ?http.HTTPResponseMetadata {
const meta = self.metadata;
self.metadata = null; // Clear to prevent double-take
return meta;
}
/// Take certificate info, transferring ownership to caller.
fn takeCertificate(self: *ResponseMetadataHolder) ?http.CertificateInfo {
const cert = self.certificate_info;
self.certificate_info = null;
return cert;
}
/// Set metadata from HTTP result (takes ownership).
/// Frees old metadata if present.
fn setMetadata(self: *ResponseMetadataHolder, metadata: http.HTTPResponseMetadata) void {
if (self.metadata) |old| {
old.deinit(self.allocator);
}
self.metadata = metadata;
}
/// Set certificate info from HTTP result (takes ownership).
fn setCertificate(self: *ResponseMetadataHolder, cert: http.CertificateInfo) void {
if (self.certificate_info) |old| {
old.deinit(self.allocator);
}
self.certificate_info = cert;
}
/// Single cleanup path
fn deinit(self: *ResponseMetadataHolder) void {
if (self.metadata) |metadata| {
metadata.deinit(self.allocator);
}
if (self.certificate_info) |cert| {
cert.deinit(self.allocator);
}
}
};
/// Abort signal handling with centralized lifecycle management.
/// Ensures all ref/unref operations are paired correctly.
const AbortHandling = struct {
signal: ?*jsc.WebCore.AbortSignal = null,
has_pending_activity_ref: bool = false,
has_listener: bool = false,
/// Attach abort signal and set up listener.
/// Takes ownership of signal ref.
fn attachSignal(
self: *AbortHandling,
signal: *jsc.WebCore.AbortSignal,
fetch: *FetchTasklet,
) !void {
bun.assert(self.signal == null);
// Ref the signal (we now own a reference)
signal.ref();
self.signal = signal;
// Listen for abort event
const listener = signal.listen(FetchTasklet, fetch, onAbortCallback);
self.has_listener = (listener != null);
// Add pending activity ref (keeps signal alive)
signal.pendingActivityRef();
self.has_pending_activity_ref = true;
}
/// Detach signal and clean up all references.
fn detach(self: *AbortHandling) void {
if (self.signal) |signal| {
// Remove pending activity ref if we added one
if (self.has_pending_activity_ref) {
signal.pendingActivityUnref();
self.has_pending_activity_ref = false;
}
// Listener is automatically removed by signal
self.has_listener = false;
// Unref the signal (release our reference)
signal.unref();
self.signal = null;
}
}
/// Single cleanup path
fn deinit(self: *AbortHandling) void {
self.detach();
}
/// Callback invoked when abort signal fires
/// Implementation moved to FetchTasklet.onAbortSignalFired (forward reference)
fn onAbortCallback(fetch: *FetchTasklet) void {
fetch.onAbortSignalFired();
}
};
// ============================================================================
// UNIFIED ERROR HANDLING
// ============================================================================
/// Unified error storage with explicit precedence rules.
/// Replaces scattered error tracking across multiple fields.
const FetchError = union(enum) {
none: void,
http_error: anyerror, // From HTTP client result.fail
abort_error: jsc.Strong, // From AbortSignal
js_error: jsc.Strong, // From JS callback (e.g., checkServerIdentity)
tls_error: jsc.Strong, // From TLS validation
/// Set new error, freeing old error if present
fn set(self: *FetchError, new_error: FetchError) void {
self.deinit();
self.* = new_error;
}
/// Check if this is an abort error (for special handling)
fn isAbort(self: FetchError) bool {
return self == .abort_error;
}
/// Single cleanup path
fn deinit(self: *FetchError) void {
switch (self.*) {
.none, .http_error => {},
.abort_error => |*strong| strong.deinit(),
.js_error => |*strong| strong.deinit(),
.tls_error => |*strong| strong.deinit(),
}
self.* = .none;
}
};
pub const FetchTasklet = struct {
pub const ResumableSink = jsc.WebCore.ResumableFetchSink;
@@ -59,6 +600,48 @@ pub const FetchTasklet = struct {
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
// === NEW STATE MACHINE FIELDS (during migration, kept alongside old flags) ===
/// Main fetch lifecycle state (mutually exclusive)
lifecycle: FetchLifecycle = .created,
/// Request body streaming state (orthogonal to lifecycle)
request_stream_state: RequestStreamState = .none,
/// Abort requested flag (atomic, orthogonal to lifecycle)
abort_requested: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
// === NEW WRAPPER FIELDS (Phase 2-6: during migration) ===
/// Single allocator for all owned resources
allocator: std.mem.Allocator = bun.default_allocator,
/// Main thread data (to be populated during migration)
main_thread: ?MainThreadData = null,
/// Shared thread-safe data (to be populated during migration)
shared: ?SharedData = null,
/// Request headers wrapper (to be populated during migration)
new_request_headers: ?RequestHeaders = null,
/// Response metadata holder (to be populated during migration)
response_metadata_holder: ?ResponseMetadataHolder = null,
/// Abort handling wrapper (to be populated during migration)
abort_handling: ?AbortHandling = null,
/// Unified error storage (to be populated during migration)
fetch_error: FetchError = .none,
/// URL proxy buffer as owned pointer (to be populated during migration)
url_proxy_buffer_owned: ?[]const u8 = null,
/// URL slice (borrowed from url_proxy_buffer)
url: []const u8 = "",
/// Proxy slice (borrowed from url_proxy_buffer)
proxy: []const u8 = "",
/// Hostname as owned pointer (to be populated during migration)
hostname_owned: ?[]u8 = null,
pub fn ref(this: *FetchTasklet) void {
const count = this.ref_count.fetchAdd(1, .monotonic);
bun.debugAssert(count > 0);
@@ -85,6 +668,55 @@ pub const FetchTasklet = struct {
}
}
// ========================================================================
// NEW METHODS (Phase 3-6: State-based dispatch and abort handling)
// ========================================================================
/// Called when abort signal fires (referenced from AbortHandling)
fn onAbortSignalFired(this: *FetchTasklet) void {
// Set atomic abort flag for HTTP thread fast-path
this.abort_requested.store(true, .release);
// Transition state under lock
this.mutex.lock();
const old_lifecycle = this.lifecycle;
if (!old_lifecycle.isTerminal()) {
if (bun.Environment.isDebug) {
bun.assert(old_lifecycle.canTransitionTo(.aborted));
}
this.lifecycle = .aborted;
}
this.mutex.unlock();
// TODO: Schedule abort handling on main thread (if not already there)
// this.scheduleAbortHandling();
}
/// Process initial body data before Response object exists (Phase 4 stub)
fn processBodyDataInitial(this: *FetchTasklet) void {
// TODO: Implement state-based dispatch
_ = this;
}
/// Stream body data to JS ReadableStream (Phase 4 stub)
fn streamBodyToJS(this: *FetchTasklet) void {
// TODO: Implement streaming to JS
_ = this;
}
/// Buffer body data in memory (Phase 4 stub)
fn bufferBodyData(this: *FetchTasklet) void {
// TODO: Implement buffering
_ = this;
}
/// Buffer specific data (used when stream disappears mid-flight) (Phase 4 stub)
fn bufferBodyDataDirect(this: *FetchTasklet, data: []const u8) void {
// TODO: Implement direct buffering
_ = this;
_ = data;
}
pub const HTTPRequestBody = union(enum) {
AnyBlob: AnyBlob,
Sendfile: http.SendFile,