mirror of
https://github.com/oven-sh/bun
synced 2026-02-08 18:08:50 +00:00
Compare commits
1 Commits
dylan/pyth
...
claude/fet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0510304bd |
@@ -1,3 +1,544 @@
|
||||
//! ============================================================================
|
||||
//! STATE MACHINE
|
||||
//! ============================================================================
|
||||
//!
|
||||
//! FetchTasklet tracks multiple orthogonal state dimensions:
|
||||
//! 1. Main lifecycle (FetchLifecycle) - mutually exclusive
|
||||
//! 2. Request streaming (RequestStreamState) - independent
|
||||
//! 3. Abort status (atomic bool) - independent
|
||||
//! 4. Connection upgrade (bool) - one-time flag
|
||||
|
||||
/// Main fetch lifecycle - mutually exclusive OR states.
|
||||
/// Every FetchTasklet is in exactly ONE of these states at a time.
|
||||
const FetchLifecycle = enum(u8) {
|
||||
/// Initial: Created, not yet queued to HTTP thread
|
||||
created,
|
||||
|
||||
/// HTTP request in flight
|
||||
/// Replaces complex state tracking in old code:
|
||||
/// - Sending headers
|
||||
/// - Sending body (if present)
|
||||
/// - Waiting for response headers
|
||||
http_active,
|
||||
|
||||
/// Receiving response headers from server
|
||||
http_receiving_headers,
|
||||
|
||||
/// Receiving response body from server
|
||||
/// Response object may or may not exist yet
|
||||
http_receiving_body,
|
||||
|
||||
/// Response object created, body not yet accessed by JS
|
||||
/// Replaces: is_waiting_body = true
|
||||
response_awaiting_body_access,
|
||||
|
||||
/// Response body is streaming to JS ReadableStream
|
||||
response_body_streaming,
|
||||
|
||||
/// Response body is buffering in memory (no stream created yet)
|
||||
response_body_buffering,
|
||||
|
||||
/// Terminal states (no transitions out)
|
||||
completed,
|
||||
failed,
|
||||
aborted,
|
||||
|
||||
pub fn isTerminal(self: FetchLifecycle) bool {
|
||||
return switch (self) {
|
||||
.completed, .failed, .aborted => true,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn isHTTPActive(self: FetchLifecycle) bool {
|
||||
return switch (self) {
|
||||
.http_active, .http_receiving_headers, .http_receiving_body => true,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn canReceiveBody(self: FetchLifecycle) bool {
|
||||
return switch (self) {
|
||||
.http_receiving_body, .response_body_streaming, .response_body_buffering => true,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
/// Validate state transition (debug builds only)
|
||||
pub fn canTransitionTo(self: FetchLifecycle, next: FetchLifecycle) bool {
|
||||
return switch (self) {
|
||||
.created => switch (next) {
|
||||
.http_active, .aborted, .failed => true,
|
||||
else => false,
|
||||
},
|
||||
.http_active => switch (next) {
|
||||
.http_receiving_headers, .aborted, .failed => true,
|
||||
else => false,
|
||||
},
|
||||
.http_receiving_headers => switch (next) {
|
||||
.http_receiving_body,
|
||||
.response_awaiting_body_access,
|
||||
.response_body_buffering,
|
||||
.completed, // Empty body case
|
||||
.aborted,
|
||||
.failed,
|
||||
=> true,
|
||||
else => false,
|
||||
},
|
||||
.http_receiving_body => switch (next) {
|
||||
.response_awaiting_body_access, .response_body_streaming, .response_body_buffering, .completed, .aborted, .failed => true,
|
||||
else => false,
|
||||
},
|
||||
.response_awaiting_body_access => switch (next) {
|
||||
.response_body_streaming, .response_body_buffering, .completed, .aborted, .failed => true,
|
||||
else => false,
|
||||
},
|
||||
.response_body_streaming => switch (next) {
|
||||
.completed, .aborted, .failed => true,
|
||||
else => false,
|
||||
},
|
||||
.response_body_buffering => switch (next) {
|
||||
.response_body_streaming, // Upgrade to streaming
|
||||
.completed,
|
||||
.aborted,
|
||||
.failed,
|
||||
=> true,
|
||||
else => false,
|
||||
},
|
||||
.completed, .failed, .aborted => false, // Terminal
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
/// Request body streaming state - orthogonal to main lifecycle.
|
||||
/// Only relevant when request has a streaming body (ReadableStream).
|
||||
const RequestStreamState = enum(u8) {
|
||||
/// No streaming request body (Blob, Sendfile, or empty)
|
||||
none,
|
||||
|
||||
/// Stream exists but hasn't started yet (waiting for server ready)
|
||||
/// Replaces: is_waiting_request_stream_start = true
|
||||
waiting_start,
|
||||
|
||||
/// Stream actively being read and sent to server
|
||||
active,
|
||||
|
||||
/// Stream finished (successfully or with error)
|
||||
complete,
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// STATE TRANSITION EXAMPLES
|
||||
// ============================================================================
|
||||
//
|
||||
// Normal fetch with buffered body:
|
||||
// created → http_active → http_receiving_headers → http_receiving_body
|
||||
// → response_body_buffering → completed
|
||||
//
|
||||
// Normal fetch with streaming body:
|
||||
// created → http_active → http_receiving_headers → http_receiving_body
|
||||
// → response_body_streaming → completed
|
||||
//
|
||||
// Fetch with body accessed after response created:
|
||||
// created → http_active → http_receiving_headers → http_receiving_body
|
||||
// → response_awaiting_body_access → response_body_streaming → completed
|
||||
//
|
||||
// Aborted fetch:
|
||||
// (any state) → aborted
|
||||
//
|
||||
// Failed fetch:
|
||||
// (any state) → failed
|
||||
//
|
||||
// Request streaming (orthogonal):
|
||||
// none (most requests)
|
||||
// OR: waiting_start → active → complete
|
||||
|
||||
/// Computed property: Should we ignore remaining body data?
|
||||
/// Replaces: ignore_data boolean flag
|
||||
fn shouldIgnoreBodyData(lifecycle: FetchLifecycle, ignore_data: bool) bool {
|
||||
// Ignore data if:
|
||||
// 1. ignore_data flag is set (compatibility during migration)
|
||||
// 2. Already in aborted state
|
||||
// 3. Response finalized (handled via lifecycle check)
|
||||
return ignore_data or lifecycle == .aborted;
|
||||
}
|
||||
|
||||
/// Helper for validated state transitions (in debug builds)
|
||||
fn transitionLifecycle(shared: *SharedData, old_state: FetchLifecycle, new_state: FetchLifecycle) void {
|
||||
if (bun.Environment.isDebug) {
|
||||
bun.assert(old_state.canTransitionTo(new_state));
|
||||
}
|
||||
shared.lifecycle = new_state;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// THREAD SAFETY ARCHITECTURE
|
||||
// ============================================================================
|
||||
//
|
||||
// Data is split into two categories:
|
||||
// 1. MainThreadData - Only accessed from JavaScript main thread (no lock)
|
||||
// 2. SharedData - Accessed from both threads (mutex protected)
|
||||
|
||||
/// Data that can ONLY be accessed from the main JavaScript thread.
|
||||
/// No mutex needed - thread confinement enforced by assertions in debug builds.
|
||||
const MainThreadData = struct {
|
||||
/// Global object (non-owning pointer)
|
||||
global_this: *JSGlobalObject,
|
||||
|
||||
/// VM (non-owning pointer)
|
||||
javascript_vm: *VirtualMachine,
|
||||
|
||||
/// Promise to resolve/reject (owned)
|
||||
promise: jsc.JSPromise.Strong,
|
||||
|
||||
/// Weak reference to Response JS object for finalization tracking.
|
||||
/// Can become null if GC collects the Response.
|
||||
response_weak: jsc.Weak(FetchTasklet) = .{},
|
||||
|
||||
/// Native Response object for finalization tracking.
|
||||
/// INTENTIONAL DUAL OWNERSHIP with response_weak:
|
||||
/// - Allows tracking when Response JS object is finalized
|
||||
/// - Signals we should stop processing body data
|
||||
/// - See Bun__FetchResponse_finalize for usage
|
||||
native_response: ?*Response = null,
|
||||
|
||||
/// Strong reference to response ReadableStream (owned)
|
||||
readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{},
|
||||
|
||||
/// Abort signal (ref-counted via AbortSignal's API)
|
||||
/// Managed by AbortHandling wrapper (to be added in Phase 3)
|
||||
abort_signal: ?*jsc.WebCore.AbortSignal = null,
|
||||
|
||||
/// Abort reason (owned)
|
||||
abort_reason: jsc.Strong.Optional = .empty,
|
||||
|
||||
/// Custom TLS check function (owned)
|
||||
check_server_identity: jsc.Strong.Optional = .empty,
|
||||
|
||||
/// Keep VM alive during fetch
|
||||
poll_ref: Async.KeepAlive = .{},
|
||||
|
||||
/// Task for cross-thread callbacks
|
||||
concurrent_task: jsc.ConcurrentTask = .{},
|
||||
|
||||
/// Debug tracker
|
||||
tracker: jsc.Debugger.AsyncTaskTracker,
|
||||
|
||||
fn assertMainThread(self: *const MainThreadData) void {
|
||||
if (bun.Environment.isDebug) {
|
||||
// Thread confinement assertion
|
||||
// Could add actual thread ID check if available
|
||||
_ = self;
|
||||
}
|
||||
}
|
||||
|
||||
fn deinit(self: *MainThreadData) void {
|
||||
self.promise.deinit();
|
||||
self.readable_stream_ref.deinit();
|
||||
self.abort_reason.deinit();
|
||||
self.check_server_identity.deinit();
|
||||
self.poll_ref.unref(self.javascript_vm);
|
||||
// abort_signal handled by AbortHandling wrapper (Phase 3)
|
||||
}
|
||||
};
|
||||
|
||||
/// Data shared between main thread and HTTP thread.
|
||||
/// ALL access must be protected by mutex.
|
||||
const SharedData = struct {
|
||||
/// Mutex protecting all mutable fields below
|
||||
mutex: Mutex,
|
||||
|
||||
/// === STATE TRACKING (protected by mutex) ===
|
||||
/// Main fetch lifecycle (mutually exclusive)
|
||||
lifecycle: FetchLifecycle,
|
||||
|
||||
/// Request body streaming state (orthogonal)
|
||||
request_stream_state: RequestStreamState,
|
||||
|
||||
/// Abort requested? (atomic for fast-path check from HTTP thread)
|
||||
abort_requested: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
|
||||
/// Connection upgraded to WebSocket? (one-time flag)
|
||||
upgraded_connection: bool = false,
|
||||
|
||||
/// === REFERENCE COUNTING (atomic) ===
|
||||
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
|
||||
|
||||
/// === HTTP CLIENT DATA (owned by HTTP thread after queue) ===
|
||||
http: ?*http.AsyncHTTP = null,
|
||||
result: http.HTTPClientResult = .{},
|
||||
metadata: ?http.HTTPResponseMetadata = null,
|
||||
|
||||
/// === BUFFERS (ownership documented) ===
|
||||
/// Response buffer written by HTTP thread.
|
||||
/// Ownership: HTTP thread writes, main thread reads under lock, then transfers.
|
||||
response_buffer: MutableString,
|
||||
|
||||
/// Response buffer for JS (accumulated data before creating Response).
|
||||
/// Ownership: Main thread only, but guarded by mutex for consistency.
|
||||
scheduled_response_buffer: MutableString,
|
||||
|
||||
/// Body size tracking
|
||||
body_size: http.HTTPClientResult.BodySize = .unknown,
|
||||
|
||||
/// === COORDINATION FLAGS (atomic) ===
|
||||
/// Has callback been scheduled to main thread?
|
||||
/// Prevents duplicate enqueues from HTTP thread.
|
||||
has_schedule_callback: std.atomic.Value(bool),
|
||||
|
||||
/// Signal storage for HTTP thread
|
||||
signals: http.Signals = .{},
|
||||
signal_store: http.Signals.Store = .{},
|
||||
|
||||
fn init(allocator: std.mem.Allocator) !SharedData {
|
||||
return SharedData{
|
||||
.mutex = Mutex.init(),
|
||||
.lifecycle = .created,
|
||||
.request_stream_state = .none,
|
||||
.response_buffer = try MutableString.init(allocator, 0),
|
||||
.scheduled_response_buffer = try MutableString.init(allocator, 0),
|
||||
.has_schedule_callback = std.atomic.Value(bool).init(false),
|
||||
};
|
||||
}
|
||||
|
||||
fn deinit(self: *SharedData) void {
|
||||
self.response_buffer.deinit();
|
||||
self.scheduled_response_buffer.deinit();
|
||||
if (self.metadata) |*metadata| {
|
||||
metadata.deinit(self.response_buffer.allocator);
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the shared data for exclusive access.
|
||||
/// Returns RAII wrapper that auto-unlocks on scope exit.
|
||||
fn lock(self: *SharedData) LockedSharedData {
|
||||
self.mutex.lock();
|
||||
return LockedSharedData{ .shared = self };
|
||||
}
|
||||
};
|
||||
|
||||
/// RAII wrapper for locked shared data.
|
||||
/// Automatically unlocks on scope exit.
|
||||
const LockedSharedData = struct {
|
||||
shared: *SharedData,
|
||||
|
||||
fn unlock(self: LockedSharedData) void {
|
||||
self.shared.mutex.unlock();
|
||||
}
|
||||
|
||||
/// Convenience: Get current lifecycle
|
||||
fn lifecycle(self: LockedSharedData) FetchLifecycle {
|
||||
return self.shared.lifecycle;
|
||||
}
|
||||
|
||||
/// Convenience: Transition lifecycle with validation
|
||||
fn transitionTo(self: LockedSharedData, new_state: FetchLifecycle) void {
|
||||
transitionLifecycle(self.shared, self.shared.lifecycle, new_state);
|
||||
}
|
||||
|
||||
/// Convenience: Should ignore body data?
|
||||
fn shouldIgnoreBody(self: LockedSharedData) bool {
|
||||
return shouldIgnoreBodyData(
|
||||
self.shared.lifecycle,
|
||||
self.shared.abort_requested.load(.acquire),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// EXPLICIT MEMORY LIFETIMES
|
||||
// ============================================================================
|
||||
//
|
||||
// Wrapper types that make ownership and cleanup paths explicit.
|
||||
|
||||
/// Request headers with explicit ownership tracking.
|
||||
/// Encapsulates the "do I need to free this?" logic.
|
||||
const RequestHeaders = struct {
|
||||
headers: Headers,
|
||||
owned: bool, // true if we must deinit
|
||||
|
||||
/// Create empty headers (not owned - no cleanup needed)
|
||||
fn initEmpty(allocator: std.mem.Allocator) RequestHeaders {
|
||||
return .{
|
||||
.headers = .{ .allocator = allocator },
|
||||
.owned = false,
|
||||
};
|
||||
}
|
||||
|
||||
/// Extract headers from FetchHeaders (owned - we must cleanup)
|
||||
fn initFromFetchHeaders(
|
||||
fetch_headers: *FetchHeaders,
|
||||
allocator: std.mem.Allocator,
|
||||
) !RequestHeaders {
|
||||
return .{
|
||||
.headers = try Headers.from(fetch_headers, allocator),
|
||||
.owned = true,
|
||||
};
|
||||
}
|
||||
|
||||
/// Single cleanup path
|
||||
fn deinit(self: *RequestHeaders) void {
|
||||
if (self.owned) {
|
||||
self.headers.entries.deinit(self.headers.allocator);
|
||||
self.headers.buf.deinit(self.headers.allocator);
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow headers for HTTP request
|
||||
fn borrow(self: *RequestHeaders) *Headers {
|
||||
return &self.headers;
|
||||
}
|
||||
};
|
||||
|
||||
/// Response metadata with explicit take semantics.
|
||||
/// Ensures metadata is only transferred once to Response object.
|
||||
const ResponseMetadataHolder = struct {
|
||||
metadata: ?http.HTTPResponseMetadata = null,
|
||||
certificate_info: ?http.CertificateInfo = null,
|
||||
allocator: std.mem.Allocator,
|
||||
|
||||
fn init(allocator: std.mem.Allocator) ResponseMetadataHolder {
|
||||
return .{ .allocator = allocator };
|
||||
}
|
||||
|
||||
/// Take metadata, transferring ownership to caller.
|
||||
/// Can only be called once - subsequent calls return null.
|
||||
fn takeMetadata(self: *ResponseMetadataHolder) ?http.HTTPResponseMetadata {
|
||||
const meta = self.metadata;
|
||||
self.metadata = null; // Clear to prevent double-take
|
||||
return meta;
|
||||
}
|
||||
|
||||
/// Take certificate info, transferring ownership to caller.
|
||||
fn takeCertificate(self: *ResponseMetadataHolder) ?http.CertificateInfo {
|
||||
const cert = self.certificate_info;
|
||||
self.certificate_info = null;
|
||||
return cert;
|
||||
}
|
||||
|
||||
/// Set metadata from HTTP result (takes ownership).
|
||||
/// Frees old metadata if present.
|
||||
fn setMetadata(self: *ResponseMetadataHolder, metadata: http.HTTPResponseMetadata) void {
|
||||
if (self.metadata) |old| {
|
||||
old.deinit(self.allocator);
|
||||
}
|
||||
self.metadata = metadata;
|
||||
}
|
||||
|
||||
/// Set certificate info from HTTP result (takes ownership).
|
||||
fn setCertificate(self: *ResponseMetadataHolder, cert: http.CertificateInfo) void {
|
||||
if (self.certificate_info) |old| {
|
||||
old.deinit(self.allocator);
|
||||
}
|
||||
self.certificate_info = cert;
|
||||
}
|
||||
|
||||
/// Single cleanup path
|
||||
fn deinit(self: *ResponseMetadataHolder) void {
|
||||
if (self.metadata) |metadata| {
|
||||
metadata.deinit(self.allocator);
|
||||
}
|
||||
if (self.certificate_info) |cert| {
|
||||
cert.deinit(self.allocator);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Abort signal handling with centralized lifecycle management.
|
||||
/// Ensures all ref/unref operations are paired correctly.
|
||||
const AbortHandling = struct {
|
||||
signal: ?*jsc.WebCore.AbortSignal = null,
|
||||
has_pending_activity_ref: bool = false,
|
||||
has_listener: bool = false,
|
||||
|
||||
/// Attach abort signal and set up listener.
|
||||
/// Takes ownership of signal ref.
|
||||
fn attachSignal(
|
||||
self: *AbortHandling,
|
||||
signal: *jsc.WebCore.AbortSignal,
|
||||
fetch: *FetchTasklet,
|
||||
) !void {
|
||||
bun.assert(self.signal == null);
|
||||
|
||||
// Ref the signal (we now own a reference)
|
||||
signal.ref();
|
||||
self.signal = signal;
|
||||
|
||||
// Listen for abort event
|
||||
const listener = signal.listen(FetchTasklet, fetch, onAbortCallback);
|
||||
self.has_listener = (listener != null);
|
||||
|
||||
// Add pending activity ref (keeps signal alive)
|
||||
signal.pendingActivityRef();
|
||||
self.has_pending_activity_ref = true;
|
||||
}
|
||||
|
||||
/// Detach signal and clean up all references.
|
||||
fn detach(self: *AbortHandling) void {
|
||||
if (self.signal) |signal| {
|
||||
// Remove pending activity ref if we added one
|
||||
if (self.has_pending_activity_ref) {
|
||||
signal.pendingActivityUnref();
|
||||
self.has_pending_activity_ref = false;
|
||||
}
|
||||
|
||||
// Listener is automatically removed by signal
|
||||
self.has_listener = false;
|
||||
|
||||
// Unref the signal (release our reference)
|
||||
signal.unref();
|
||||
self.signal = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// Single cleanup path
|
||||
fn deinit(self: *AbortHandling) void {
|
||||
self.detach();
|
||||
}
|
||||
|
||||
/// Callback invoked when abort signal fires
|
||||
/// Implementation moved to FetchTasklet.onAbortSignalFired (forward reference)
|
||||
fn onAbortCallback(fetch: *FetchTasklet) void {
|
||||
fetch.onAbortSignalFired();
|
||||
}
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// UNIFIED ERROR HANDLING
|
||||
// ============================================================================
|
||||
|
||||
/// Unified error storage with explicit precedence rules.
|
||||
/// Replaces scattered error tracking across multiple fields.
|
||||
const FetchError = union(enum) {
|
||||
none: void,
|
||||
http_error: anyerror, // From HTTP client result.fail
|
||||
abort_error: jsc.Strong, // From AbortSignal
|
||||
js_error: jsc.Strong, // From JS callback (e.g., checkServerIdentity)
|
||||
tls_error: jsc.Strong, // From TLS validation
|
||||
|
||||
/// Set new error, freeing old error if present
|
||||
fn set(self: *FetchError, new_error: FetchError) void {
|
||||
self.deinit();
|
||||
self.* = new_error;
|
||||
}
|
||||
|
||||
/// Check if this is an abort error (for special handling)
|
||||
fn isAbort(self: FetchError) bool {
|
||||
return self == .abort_error;
|
||||
}
|
||||
|
||||
/// Single cleanup path
|
||||
fn deinit(self: *FetchError) void {
|
||||
switch (self.*) {
|
||||
.none, .http_error => {},
|
||||
.abort_error => |*strong| strong.deinit(),
|
||||
.js_error => |*strong| strong.deinit(),
|
||||
.tls_error => |*strong| strong.deinit(),
|
||||
}
|
||||
self.* = .none;
|
||||
}
|
||||
};
|
||||
|
||||
pub const FetchTasklet = struct {
|
||||
pub const ResumableSink = jsc.WebCore.ResumableFetchSink;
|
||||
|
||||
@@ -59,6 +600,48 @@ pub const FetchTasklet = struct {
|
||||
|
||||
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
|
||||
|
||||
// === NEW STATE MACHINE FIELDS (during migration, kept alongside old flags) ===
|
||||
/// Main fetch lifecycle state (mutually exclusive)
|
||||
lifecycle: FetchLifecycle = .created,
|
||||
/// Request body streaming state (orthogonal to lifecycle)
|
||||
request_stream_state: RequestStreamState = .none,
|
||||
/// Abort requested flag (atomic, orthogonal to lifecycle)
|
||||
abort_requested: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
|
||||
// === NEW WRAPPER FIELDS (Phase 2-6: during migration) ===
|
||||
/// Single allocator for all owned resources
|
||||
allocator: std.mem.Allocator = bun.default_allocator,
|
||||
|
||||
/// Main thread data (to be populated during migration)
|
||||
main_thread: ?MainThreadData = null,
|
||||
|
||||
/// Shared thread-safe data (to be populated during migration)
|
||||
shared: ?SharedData = null,
|
||||
|
||||
/// Request headers wrapper (to be populated during migration)
|
||||
new_request_headers: ?RequestHeaders = null,
|
||||
|
||||
/// Response metadata holder (to be populated during migration)
|
||||
response_metadata_holder: ?ResponseMetadataHolder = null,
|
||||
|
||||
/// Abort handling wrapper (to be populated during migration)
|
||||
abort_handling: ?AbortHandling = null,
|
||||
|
||||
/// Unified error storage (to be populated during migration)
|
||||
fetch_error: FetchError = .none,
|
||||
|
||||
/// URL proxy buffer as owned pointer (to be populated during migration)
|
||||
url_proxy_buffer_owned: ?[]const u8 = null,
|
||||
|
||||
/// URL slice (borrowed from url_proxy_buffer)
|
||||
url: []const u8 = "",
|
||||
|
||||
/// Proxy slice (borrowed from url_proxy_buffer)
|
||||
proxy: []const u8 = "",
|
||||
|
||||
/// Hostname as owned pointer (to be populated during migration)
|
||||
hostname_owned: ?[]u8 = null,
|
||||
|
||||
pub fn ref(this: *FetchTasklet) void {
|
||||
const count = this.ref_count.fetchAdd(1, .monotonic);
|
||||
bun.debugAssert(count > 0);
|
||||
@@ -85,6 +668,55 @@ pub const FetchTasklet = struct {
|
||||
}
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// NEW METHODS (Phase 3-6: State-based dispatch and abort handling)
|
||||
// ========================================================================
|
||||
|
||||
/// Called when abort signal fires (referenced from AbortHandling)
|
||||
fn onAbortSignalFired(this: *FetchTasklet) void {
|
||||
// Set atomic abort flag for HTTP thread fast-path
|
||||
this.abort_requested.store(true, .release);
|
||||
|
||||
// Transition state under lock
|
||||
this.mutex.lock();
|
||||
const old_lifecycle = this.lifecycle;
|
||||
if (!old_lifecycle.isTerminal()) {
|
||||
if (bun.Environment.isDebug) {
|
||||
bun.assert(old_lifecycle.canTransitionTo(.aborted));
|
||||
}
|
||||
this.lifecycle = .aborted;
|
||||
}
|
||||
this.mutex.unlock();
|
||||
|
||||
// TODO: Schedule abort handling on main thread (if not already there)
|
||||
// this.scheduleAbortHandling();
|
||||
}
|
||||
|
||||
/// Process initial body data before Response object exists (Phase 4 stub)
|
||||
fn processBodyDataInitial(this: *FetchTasklet) void {
|
||||
// TODO: Implement state-based dispatch
|
||||
_ = this;
|
||||
}
|
||||
|
||||
/// Stream body data to JS ReadableStream (Phase 4 stub)
|
||||
fn streamBodyToJS(this: *FetchTasklet) void {
|
||||
// TODO: Implement streaming to JS
|
||||
_ = this;
|
||||
}
|
||||
|
||||
/// Buffer body data in memory (Phase 4 stub)
|
||||
fn bufferBodyData(this: *FetchTasklet) void {
|
||||
// TODO: Implement buffering
|
||||
_ = this;
|
||||
}
|
||||
|
||||
/// Buffer specific data (used when stream disappears mid-flight) (Phase 4 stub)
|
||||
fn bufferBodyDataDirect(this: *FetchTasklet, data: []const u8) void {
|
||||
// TODO: Implement direct buffering
|
||||
_ = this;
|
||||
_ = data;
|
||||
}
|
||||
|
||||
pub const HTTPRequestBody = union(enum) {
|
||||
AnyBlob: AnyBlob,
|
||||
Sendfile: http.SendFile,
|
||||
|
||||
Reference in New Issue
Block a user