bun.sh/src/bun.js/webcore/fetch/FetchTasklet.zig
Claude Bot 8a0681e0c8 Phase 2, Step 2.3: Fix Cross-Thread Deallocation
Fix a potential memory leak and use-after-free in derefFromThread when the VM
is shutting down. The function now checks the VM's shutdown status before
enqueueing cleanup tasks, intentionally leaking memory (detected by ASAN)
rather than risking use-after-free crashes.

Changes:
- Add VM shutdown pre-check before enqueueTaskConcurrent
- Add debug logging for intentional leaks during shutdown
- Add deinitFromMainThread helper with main thread assertion
- Early return on shutdown to prevent unsafe cleanup attempts

This is safer than attempting cleanup during VM shutdown and makes the
memory safety trade-off explicit: leak detection over undefined behavior.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-03 14:15:13 +00:00


// ============================================================================
// STATE MACHINE
// ============================================================================
//
// FetchTasklet tracks multiple orthogonal state dimensions:
// 1. Main lifecycle (FetchLifecycle) - mutually exclusive
// 2. Request streaming (RequestStreamState) - independent
// 3. Abort status (atomic bool) - independent
// 4. Connection upgrade (bool) - one-time flag
/// Main fetch lifecycle - mutually exclusive OR states.
/// Every FetchTasklet is in exactly ONE of these states at a time.
const FetchLifecycle = enum(u8) {
/// Initial: Created, not yet queued to HTTP thread
created,
/// HTTP request in flight
/// Replaces complex state tracking in old code:
/// - Sending headers
/// - Sending body (if present)
/// - Waiting for response headers
http_active,
/// Receiving response headers from server
http_receiving_headers,
/// Receiving response body from server
/// Response object may or may not exist yet
http_receiving_body,
/// Response object created, body not yet accessed by JS
/// Replaces: is_waiting_body = true
response_awaiting_body_access,
/// Response body is streaming to JS ReadableStream
response_body_streaming,
/// Response body is buffering in memory (no stream created yet)
response_body_buffering,
/// Terminal states (no transitions out)
completed,
failed,
aborted,
pub fn isTerminal(self: FetchLifecycle) bool {
return switch (self) {
.completed, .failed, .aborted => true,
else => false,
};
}
pub fn isHTTPActive(self: FetchLifecycle) bool {
return switch (self) {
.http_active, .http_receiving_headers, .http_receiving_body => true,
else => false,
};
}
pub fn canReceiveBody(self: FetchLifecycle) bool {
return switch (self) {
.http_receiving_body, .response_body_streaming, .response_body_buffering => true,
else => false,
};
}
/// Validate state transition (debug builds only)
pub fn canTransitionTo(self: FetchLifecycle, next: FetchLifecycle) bool {
return switch (self) {
.created => switch (next) {
.http_active, .aborted, .failed => true,
else => false,
},
.http_active => switch (next) {
.http_receiving_headers, .aborted, .failed => true,
else => false,
},
.http_receiving_headers => switch (next) {
.http_receiving_body,
.response_awaiting_body_access,
.response_body_buffering,
.completed, // Empty body case
.aborted,
.failed,
=> true,
else => false,
},
.http_receiving_body => switch (next) {
.response_awaiting_body_access, .response_body_streaming, .response_body_buffering, .completed, .aborted, .failed => true,
else => false,
},
.response_awaiting_body_access => switch (next) {
.response_body_streaming, .response_body_buffering, .completed, .aborted, .failed => true,
else => false,
},
.response_body_streaming => switch (next) {
.completed, .aborted, .failed => true,
else => false,
},
.response_body_buffering => switch (next) {
.response_body_streaming, // Upgrade to streaming
.completed,
.aborted,
.failed,
=> true,
else => false,
},
.completed, .failed, .aborted => false, // Terminal
};
}
};
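// Sanity-check sketch for the predicate helpers above; uses only declarations
// defined in this file.
test "FetchLifecycle predicates (sanity-check sketch)" {
try std.testing.expect(FetchLifecycle.created.canTransitionTo(.http_active));
try std.testing.expect(!FetchLifecycle.created.canTransitionTo(.completed));
try std.testing.expect(!FetchLifecycle.completed.canTransitionTo(.created));
try std.testing.expect(FetchLifecycle.aborted.isTerminal());
try std.testing.expect(FetchLifecycle.http_receiving_body.isHTTPActive());
try std.testing.expect(FetchLifecycle.response_body_streaming.canReceiveBody());
}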
/// Request body streaming state - orthogonal to main lifecycle.
/// Only relevant when request has a streaming body (ReadableStream).
const RequestStreamState = enum(u8) {
/// No streaming request body (Blob, Sendfile, or empty)
none,
/// Stream exists but hasn't started yet (waiting for server ready)
/// Replaces: is_waiting_request_stream_start = true
waiting_start,
/// Stream actively being read and sent to server
active,
/// Stream finished (successfully or with error)
complete,
};
// ============================================================================
// STATE TRANSITION EXAMPLES
// ============================================================================
//
// Normal fetch with buffered body:
// created → http_active → http_receiving_headers → http_receiving_body
// → response_body_buffering → completed
//
// Normal fetch with streaming body:
// created → http_active → http_receiving_headers → http_receiving_body
// → response_body_streaming → completed
//
// Fetch with body accessed after response created:
// created → http_active → http_receiving_headers → http_receiving_body
// → response_awaiting_body_access → response_body_streaming → completed
//
// Aborted fetch:
// (any state) → aborted
//
// Failed fetch:
// (any state) → failed
//
// Request streaming (orthogonal):
// none (most requests)
// OR: waiting_start → active → complete
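// Sanity-check sketch: the "normal fetch with buffered body" path documented
// above validates step by step under canTransitionTo.
test "buffered-body lifecycle path (sanity-check sketch)" {
const path = [_]FetchLifecycle{
.created,
.http_active,
.http_receiving_headers,
.http_receiving_body,
.response_body_buffering,
.completed,
};
for (path[0 .. path.len - 1], path[1..]) |from, to| {
try std.testing.expect(from.canTransitionTo(to));
}
}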
/// Helper for validated state transitions (in debug builds)
fn transitionLifecycle(shared: *SharedData, old_state: FetchLifecycle, new_state: FetchLifecycle) void {
if (bun.Environment.isDebug) {
bun.assert(old_state.canTransitionTo(new_state));
}
shared.lifecycle = new_state;
}
/// Computed property: Should we ignore remaining body data?
/// Replaces: ignore_data boolean flag
fn shouldIgnoreBodyData(lifecycle: FetchLifecycle, abort_requested: bool) bool {
// Ignore data if:
// 1. Abort was requested
// 2. Already in aborted state
// 3. Response finalized (handled via lifecycle check)
return abort_requested or lifecycle == .aborted;
}
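// Sanity-check sketch for the computed property above: an abort request or the
// aborted state means remaining body data is ignored.
test "shouldIgnoreBodyData (sanity-check sketch)" {
try std.testing.expect(shouldIgnoreBodyData(.aborted, false));
try std.testing.expect(shouldIgnoreBodyData(.http_receiving_body, true));
try std.testing.expect(!shouldIgnoreBodyData(.http_receiving_body, false));
}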
// ============================================================================
// THREAD SAFETY ARCHITECTURE
// ============================================================================
//
// Data is split into two categories:
// 1. MainThreadData - Only accessed from JavaScript main thread (no lock)
// 2. SharedData - Accessed from both threads (mutex protected)
/// Data that can ONLY be accessed from the main JavaScript thread.
/// No mutex needed - thread confinement enforced by assertions in debug builds.
const MainThreadData = struct {
/// Global object (non-owning pointer)
global_this: *JSGlobalObject,
/// VM (non-owning pointer)
javascript_vm: *VirtualMachine,
/// Promise to resolve/reject (owned)
promise: jsc.JSPromise.Strong,
/// Weak reference to Response JS object for finalization tracking.
/// Can become null if GC collects the Response.
response_weak: jsc.Weak(FetchTasklet) = .{},
/// Native Response object for finalization tracking.
/// INTENTIONAL DUAL OWNERSHIP with response_weak:
/// - Allows tracking when Response JS object is finalized
/// - Signals we should stop processing body data
/// - See Bun__FetchResponse_finalize for usage
native_response: ?*Response = null,
/// Strong reference to response ReadableStream (owned)
readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{},
/// Abort signal (ref-counted via AbortSignal's API)
/// Managed by AbortHandling wrapper
abort_signal: ?*jsc.WebCore.AbortSignal = null,
/// Abort reason (owned)
abort_reason: jsc.Strong.Optional = .empty,
/// Custom TLS check function (owned)
check_server_identity: jsc.Strong.Optional = .empty,
/// Keep VM alive during fetch
poll_ref: Async.KeepAlive = .{},
/// Task for cross-thread callbacks
concurrent_task: jsc.ConcurrentTask = .{},
/// Debug tracker
tracker: jsc.Debugger.AsyncTaskTracker,
fn assertMainThread(self: *const MainThreadData) void {
if (bun.Environment.isDebug) {
// Thread confinement assertion
// Could add actual thread ID check if available
_ = self;
}
}
fn deinit(self: *MainThreadData) void {
self.promise.deinit();
self.readable_stream_ref.deinit();
self.abort_reason.deinit();
self.check_server_identity.deinit();
self.poll_ref.unref(self.javascript_vm);
// abort_signal handled by AbortHandling wrapper
}
};
/// Data shared between main thread and HTTP thread.
/// ALL access must be protected by mutex.
const SharedData = struct {
/// Mutex protecting all mutable fields below
mutex: bun.Mutex,
/// === STATE TRACKING (protected by mutex) ===
/// Main fetch lifecycle (mutually exclusive)
lifecycle: FetchLifecycle,
/// Request body streaming state (orthogonal)
request_stream_state: RequestStreamState,
/// Abort requested? (atomic for fast-path check from HTTP thread)
abort_requested: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
/// Connection upgraded to WebSocket? (one-time flag)
upgraded_connection: bool = false,
/// === REFERENCE COUNTING (atomic) ===
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
/// === HTTP CLIENT DATA (owned by HTTP thread after queue) ===
http: ?*http.AsyncHTTP = null,
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
/// === BUFFERS (ownership documented) ===
/// Response buffer written by HTTP thread.
/// Ownership: HTTP thread writes, main thread reads under lock, then transfers.
response_buffer: MutableString,
/// Response buffer for JS (accumulated data before creating Response).
/// Ownership: Main thread only, but guarded by mutex for consistency.
scheduled_response_buffer: MutableString,
/// Body size tracking
body_size: http.HTTPClientResult.BodySize = .unknown,
/// === COORDINATION FLAGS (atomic) ===
/// Has callback been scheduled to main thread?
/// Prevents duplicate enqueues from HTTP thread.
has_schedule_callback: std.atomic.Value(bool),
/// Signal storage for HTTP thread
signals: http.Signals = .{},
signal_store: http.Signals.Store = .{},
fn init(allocator: std.mem.Allocator) !SharedData {
return SharedData{
.mutex = bun.Mutex.init(),
.lifecycle = .created,
.request_stream_state = .none,
.response_buffer = try MutableString.init(allocator, 0),
.scheduled_response_buffer = try MutableString.init(allocator, 0),
.has_schedule_callback = std.atomic.Value(bool).init(false),
};
}
fn deinit(self: *SharedData) void {
self.response_buffer.deinit();
self.scheduled_response_buffer.deinit();
if (self.metadata) |*metadata| {
metadata.deinit(self.response_buffer.allocator);
}
}
/// Lock the shared data for exclusive access.
/// Returns RAII wrapper that auto-unlocks on scope exit.
fn lock(self: *SharedData) LockedSharedData {
self.mutex.lock();
return LockedSharedData{ .shared = self };
}
};
/// RAII wrapper for locked shared data.
/// Automatically unlocks on scope exit.
const LockedSharedData = struct {
shared: *SharedData,
fn unlock(self: LockedSharedData) void {
self.shared.mutex.unlock();
}
/// Convenience: Get current lifecycle
fn lifecycle(self: LockedSharedData) FetchLifecycle {
return self.shared.lifecycle;
}
/// Convenience: Transition lifecycle with validation
fn transitionTo(self: LockedSharedData, new_state: FetchLifecycle) void {
transitionLifecycle(self.shared, self.shared.lifecycle, new_state);
}
/// Convenience: Should ignore body data?
fn shouldIgnoreBody(self: LockedSharedData) bool {
return shouldIgnoreBodyData(
self.shared.lifecycle,
self.shared.abort_requested.load(.acquire),
);
}
};
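// Minimal usage sketch for the lock/transition helpers above. The
// empty-MutableString field initializers mirror the pattern FetchTasklet.get()
// uses below, so no allocation or deinit is needed here.
test "SharedData lock and lifecycle transition (usage sketch)" {
var shared = SharedData{
.mutex = .{},
.lifecycle = .created,
.request_stream_state = .none,
.response_buffer = .{
.allocator = std.testing.allocator,
.list = .{ .items = &.{}, .capacity = 0 },
},
.scheduled_response_buffer = .{
.allocator = std.testing.allocator,
.list = .{ .items = &.{}, .capacity = 0 },
},
.has_schedule_callback = std.atomic.Value(bool).init(false),
};
const locked = shared.lock();
defer locked.unlock();
try std.testing.expectEqual(FetchLifecycle.created, locked.lifecycle());
locked.transitionTo(.http_active);
try std.testing.expect(!locked.shouldIgnoreBody());
}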
pub const FetchTasklet = struct {
pub const ResumableSink = jsc.WebCore.ResumableFetchSink;
const log = Output.scoped(.FetchTasklet, .visible);
sink: ?*ResumableSink = null,
http: ?*http.AsyncHTTP = null,
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null,
/// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
/// buffer used to stream response to JS
scheduled_response_buffer: MutableString = undefined,
/// response weak ref; we need this to track the Response JS object's lifetime
response: jsc.Weak(FetchTasklet) = .{},
/// native response ref if we still need it when the JS object is discarded
native_response: ?*Response = null,
ignore_data: bool = false,
/// stream strong ref if any is available
readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{},
request_headers: Headers = Headers{ .allocator = undefined },
promise: jsc.JSPromise.Strong,
concurrent_task: jsc.ConcurrentTask = .{},
poll_ref: Async.KeepAlive = .{},
/// For Http Client requests:
/// - when Content-Length is provided, this represents the whole size
/// - if chunked encoding is used, this represents the total received size (ignoring the chunk headers)
/// - if not chunked encoded and Content-Length is not provided, this will be unknown
body_size: http.HTTPClientResult.BodySize = .unknown,
/// This is url + proxy memory buffer and is owned by FetchTasklet
/// We always clone url and proxy (if provided)
url_proxy_buffer: []const u8 = "",
signal: ?*jsc.WebCore.AbortSignal = null,
signals: http.Signals = .{},
signal_store: http.Signals.Store = .{},
has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
// must be stored because AbortSignal stores reason weakly
abort_reason: jsc.Strong.Optional = .empty,
// custom checkServerIdentity
check_server_identity: jsc.Strong.Optional = .empty,
reject_unauthorized: bool = true,
upgraded_connection: bool = false,
// Custom Hostname
hostname: ?[]u8 = null,
is_waiting_body: bool = false,
is_waiting_abort: bool = false,
is_waiting_request_stream_start: bool = false,
mutex: Mutex,
tracker: jsc.Debugger.AsyncTaskTracker,
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
pub fn ref(this: *FetchTasklet) void {
const count = this.ref_count.fetchAdd(1, .monotonic);
bun.debugAssert(count > 0);
}
pub fn deref(this: *FetchTasklet) void {
const count = this.ref_count.fetchSub(1, .monotonic);
bun.debugAssert(count > 0);
if (count == 1) {
this.deinit() catch |err| switch (err) {};
}
}
pub fn derefFromThread(this: *FetchTasklet) void {
const count = this.ref_count.fetchSub(1, .monotonic);
bun.debugAssert(count > 0);
if (count == 1) {
// this is really unlikely to happen, but it can happen
// let's make sure that we always call deinit from the main thread
const vm = this.javascript_vm;
// Check if VM is shutting down before enqueuing
if (vm.isShuttingDown()) {
// VM is shutting down - cannot safely enqueue task to main thread
// This will be detected as a leak by ASAN, which is correct behavior.
// Better to leak than use-after-free.
if (bun.Environment.isDebug) {
bun.Output.err(
"LEAK",
"FetchTasklet leaked during VM shutdown (addr=0x{x})",
.{@intFromPtr(this)},
);
}
// Intentional leak - safer than use-after-free
return;
}
vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.deinitFromMainThread));
}
}
pub const HTTPRequestBody = union(enum) {
AnyBlob: AnyBlob,
Sendfile: http.SendFile,
ReadableStream: jsc.WebCore.ReadableStream.Strong,
pub const Empty: HTTPRequestBody = .{ .AnyBlob = .{ .Blob = .{} } };
pub fn store(this: *HTTPRequestBody) ?*Blob.Store {
return switch (this.*) {
.AnyBlob => this.AnyBlob.store(),
else => null,
};
}
pub fn slice(this: *const HTTPRequestBody) []const u8 {
return switch (this.*) {
.AnyBlob => this.AnyBlob.slice(),
else => "",
};
}
pub fn detach(this: *HTTPRequestBody) void {
switch (this.*) {
.AnyBlob => this.AnyBlob.detach(),
.ReadableStream => |*stream| {
stream.deinit();
},
.Sendfile => {
if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0)
this.Sendfile.fd.close();
this.Sendfile.offset = 0;
this.Sendfile.remain = 0;
},
}
}
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody {
var body_value = try Body.Value.fromJS(globalThis, value);
if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) {
return globalThis.ERR(.BODY_ALREADY_USED, "body already used", .{}).throw();
}
if (body_value == .Locked) {
if (body_value.Locked.readable.has()) {
// just grab the ref
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
}
const readable = try body_value.toReadableStream(globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) {
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
}
}
return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
}
pub fn needsToReadFile(this: *HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |blob| blob.needsToReadFile(),
else => false,
};
}
pub fn isS3(this: *const HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |*blob| blob.isS3(),
else => false,
};
}
pub fn hasContentTypeFromUser(this: *HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |blob| blob.hasContentTypeFromUser(),
else => false,
};
}
pub fn getAnyBlob(this: *HTTPRequestBody) ?*AnyBlob {
return switch (this.*) {
.AnyBlob => &this.AnyBlob,
else => null,
};
}
pub fn hasBody(this: *HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |blob| blob.size() > 0,
.ReadableStream => |*stream| stream.has(),
.Sendfile => true,
};
}
};
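// Small usage sketch for the Empty sentinel above; it assumes a
// default-initialized Blob reports zero size and has no store.
test "HTTPRequestBody.Empty carries no body (usage sketch)" {
var body = HTTPRequestBody.Empty;
try std.testing.expect(!body.hasBody());
try std.testing.expect(body.store() == null);
}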
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
return FetchTasklet{};
}
fn clearSink(this: *FetchTasklet) void {
if (this.sink) |sink| {
this.sink = null;
sink.deref();
}
if (this.request_body_streaming_buffer) |buffer| {
this.request_body_streaming_buffer = null;
buffer.clearDrainCallback();
buffer.deref();
}
}
fn clearData(this: *FetchTasklet) void {
log("clearData ", .{});
const allocator = bun.default_allocator;
if (this.url_proxy_buffer.len > 0) {
allocator.free(this.url_proxy_buffer);
this.url_proxy_buffer.len = 0;
}
if (this.hostname) |hostname| {
allocator.free(hostname);
this.hostname = null;
}
if (this.result.certificate_info) |*certificate| {
certificate.deinit(bun.default_allocator);
this.result.certificate_info = null;
}
this.request_headers.entries.deinit(allocator);
this.request_headers.buf.deinit(allocator);
this.request_headers = Headers{ .allocator = undefined };
if (this.http) |http_| {
http_.clearData();
}
if (this.metadata != null) {
this.metadata.?.deinit(allocator);
this.metadata = null;
}
this.response_buffer.deinit();
this.response.deinit();
if (this.native_response) |response| {
this.native_response = null;
response.unref();
}
this.readable_stream_ref.deinit();
this.scheduled_response_buffer.deinit();
if (this.request_body != .ReadableStream or this.is_waiting_request_stream_start) {
this.request_body.detach();
}
this.abort_reason.deinit();
this.check_server_identity.deinit();
this.clearAbortSignal();
// Clear the sink only after the request ended, otherwise we could potentially lose the last chunk
this.clearSink();
}
/// Helper function to ensure deinit happens on main thread.
/// Called via enqueueTaskConcurrent from derefFromThread.
// XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not
fn deinitFromMainThread(this: *FetchTasklet) error{}!void {
bun.debugAssert(this.javascript_vm.isMainThread());
this.deinit() catch |err| switch (err) {};
}
// XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not
pub fn deinit(this: *FetchTasklet) error{}!void {
log("deinit", .{});
bun.assert(this.ref_count.load(.monotonic) == 0);
this.clearData();
const allocator = bun.default_allocator;
if (this.http) |http_| {
this.http = null;
allocator.destroy(http_);
}
allocator.destroy(this);
}
fn getCurrentResponse(this: *FetchTasklet) ?*Response {
// we need a body to resolve the promise when buffering
if (this.native_response) |response| {
return response;
}
// if we did not have a direct reference we check if the Weak ref is still alive
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
return response;
}
}
return null;
}
pub fn startRequestStream(this: *FetchTasklet) void {
this.is_waiting_request_stream_start = false;
bun.assert(this.request_body == .ReadableStream);
if (this.request_body.ReadableStream.get(this.global_this)) |stream| {
if (this.signal) |signal| {
if (signal.aborted()) {
stream.abort(this.global_this);
return;
}
}
const globalThis = this.global_this;
this.ref(); // let's only unref when the sink is done
// +1 because the task refs the sink
const sink = ResumableSink.initExactRefs(globalThis, stream, this, 2);
this.sink = sink;
}
}
pub fn onBodyReceived(this: *FetchTasklet) bun.JSTerminated!void {
const success = this.result.isSuccess();
const globalThis = this.global_this;
// reset the buffer if we are streaming or if we are no longer waiting for buffering
var buffer_reset = true;
log("onBodyReceived success={} has_more={}", .{ success, this.result.has_more });
defer {
if (buffer_reset) {
this.scheduled_response_buffer.reset();
}
}
if (!success) {
var err = this.onReject();
var need_deinit = true;
defer if (need_deinit) err.deinit();
var js_err = JSValue.zero;
// if we are streaming, update with the error
if (this.readable_stream_ref.get(globalThis)) |readable| {
if (readable.ptr == .Bytes) {
js_err = err.toJS(globalThis);
js_err.ensureStillAlive();
try readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = js_err },
},
bun.default_allocator,
);
}
}
if (this.sink) |sink| {
if (js_err == .zero) {
js_err = err.toJS(globalThis);
js_err.ensureStillAlive();
}
sink.cancel(js_err);
return;
}
// if we are buffering, resolve the promise
if (this.getCurrentResponse()) |response| {
need_deinit = false; // body value now owns the error
const body = response.getBodyValue();
try body.toErrorInstance(err, globalThis);
}
return;
}
if (this.readable_stream_ref.get(globalThis)) |readable| {
log("onBodyReceived readable_stream_ref", .{});
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
// body can be marked as used but we still need to pipe the data
const scheduled_response_buffer = &this.scheduled_response_buffer.list;
const chunk = scheduled_response_buffer.items;
if (this.result.has_more) {
try readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk),
},
bun.default_allocator,
);
} else {
var prev = this.readable_stream_ref;
this.readable_stream_ref = .{};
defer prev.deinit();
buffer_reset = false;
try readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk),
},
bun.default_allocator,
);
}
return;
}
}
if (this.getCurrentResponse()) |response| {
log("onBodyReceived Current Response", .{});
const sizeHint = this.getSizeHint();
response.setSizeHint(sizeHint);
if (response.getBodyReadableStream(globalThis)) |readable| {
log("onBodyReceived CurrentResponse BodyReadableStream", .{});
if (readable.ptr == .Bytes) {
const scheduled_response_buffer = this.scheduled_response_buffer.list;
const chunk = scheduled_response_buffer.items;
if (this.result.has_more) {
try readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk),
},
bun.default_allocator,
);
} else {
readable.value.ensureStillAlive();
response.detachReadableStream(globalThis);
try readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk),
},
bun.default_allocator,
);
}
return;
}
}
// we reach here when not streaming; this is also the only case where we don't want to reset the buffer
buffer_reset = false;
if (!this.result.has_more) {
var scheduled_response_buffer = this.scheduled_response_buffer.list;
const body = response.getBodyValue();
// done, resolve the body
var old = body.*;
const body_value = Body.Value{
.InternalBlob = .{
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
},
};
body.* = body_value;
log("onBodyReceived body_value length={}", .{body_value.InternalBlob.bytes.items.len});
this.scheduled_response_buffer = .{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
};
if (old == .Locked) {
log("onBodyReceived old.resolve", .{});
try old.resolve(body, this.global_this, response.getFetchHeaders());
}
}
}
}
pub fn onProgressUpdate(this: *FetchTasklet) bun.JSTerminated!void {
jsc.markBinding(@src());
log("onProgressUpdate", .{});
this.mutex.lock();
this.has_schedule_callback.store(false, .monotonic);
const is_done = !this.result.has_more;
const vm = this.javascript_vm;
// vm is shutting down, we cannot touch JS
if (vm.isShuttingDown()) {
this.mutex.unlock();
if (is_done) {
this.deref();
}
return;
}
const globalThis = this.global_this;
defer {
this.mutex.unlock();
// if we are not done we wait until the next call
if (is_done) {
var poll_ref = this.poll_ref;
this.poll_ref = .{};
poll_ref.unref(vm);
this.deref();
}
}
if (this.is_waiting_request_stream_start and this.result.can_stream) {
// start streaming
this.startRequestStream();
}
// if we have already responded with the metadata but still need to process the body
if (this.is_waiting_body) {
try this.onBodyReceived();
return;
}
if (this.metadata == null and this.result.isSuccess()) return;
// if we abort because of a cert error
// we wait for the Http Client because we already have the response
// we just need to deinit
if (this.is_waiting_abort) {
return;
}
const promise_value = this.promise.valueOrEmpty();
if (promise_value.isEmptyOrUndefinedOrNull()) {
log("onProgressUpdate: promise_value is null", .{});
this.promise.deinit();
return;
}
if (this.result.certificate_info) |certificate_info| {
this.result.certificate_info = null;
defer certificate_info.deinit(bun.default_allocator);
// we may have received a certificate error
if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) {
log("onProgressUpdate: aborted due certError", .{});
// we need to abort the request
const promise = promise_value.asAnyPromise().?;
const tracker = this.tracker;
var result = this.onReject();
defer result.deinit();
promise_value.ensureStillAlive();
try promise.reject(globalThis, result.toJS(globalThis));
tracker.didDispatch(globalThis);
this.promise.deinit();
return;
}
// everything ok
if (this.metadata == null) {
log("onProgressUpdate: metadata is null", .{});
return;
}
}
const tracker = this.tracker;
tracker.willDispatch(globalThis);
defer {
log("onProgressUpdate: promise_value is not null", .{});
tracker.didDispatch(globalThis);
this.promise.deinit();
}
const success = this.result.isSuccess();
const result = switch (success) {
true => jsc.Strong.Optional.create(this.onResolve(), globalThis),
false => brk: {
// in this case we want a jsc.Strong.Optional so we just convert it
var value = this.onReject();
const err = value.toJS(globalThis);
if (this.sink) |sink| {
sink.cancel(err);
}
break :brk value.JSValue;
},
};
promise_value.ensureStillAlive();
const Holder = struct {
held: jsc.Strong.Optional,
promise: jsc.Strong.Optional,
globalObject: *jsc.JSGlobalObject,
task: jsc.AnyTask,
pub fn resolve(self: *@This()) bun.JSTerminated!void {
// cleanup
defer bun.default_allocator.destroy(self);
defer self.held.deinit();
defer self.promise.deinit();
// resolve the promise
var prom = self.promise.swap().asAnyPromise().?;
const res = self.held.swap();
res.ensureStillAlive();
try prom.resolve(self.globalObject, res);
}
pub fn reject(self: *@This()) bun.JSTerminated!void {
// cleanup
defer bun.default_allocator.destroy(self);
defer self.held.deinit();
defer self.promise.deinit();
// reject the promise
var prom = self.promise.swap().asAnyPromise().?;
const res = self.held.swap();
res.ensureStillAlive();
try prom.reject(self.globalObject, res);
}
};
var holder = bun.handleOom(bun.default_allocator.create(Holder));
holder.* = .{
.held = result,
// we need the promise to be alive until the task is done
.promise = this.promise.strong,
.globalObject = globalThis,
.task = undefined,
};
this.promise.strong = .empty;
holder.task = switch (success) {
true => jsc.AnyTask.New(Holder, Holder.resolve).init(holder),
false => jsc.AnyTask.New(Holder, Holder.reject).init(holder),
};
vm.enqueueTask(jsc.Task.init(&holder.task));
}
pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: http.CertificateInfo) bool {
if (this.check_server_identity.get()) |check_server_identity| {
check_server_identity.ensureStillAlive();
if (certificate_info.cert.len > 0) {
const cert = certificate_info.cert;
var cert_ptr = cert.ptr;
if (BoringSSL.d2i_X509(null, &cert_ptr, @intCast(cert.len))) |x509| {
const globalObject = this.global_this;
defer x509.free();
const js_cert = X509.toJS(x509, globalObject) catch |err| {
switch (err) {
error.JSError => {},
error.OutOfMemory => globalObject.throwOutOfMemory() catch {},
error.JSTerminated => {},
}
const check_result = globalObject.tryTakeException().?;
// mark to wait until deinit
this.is_waiting_abort = this.result.has_more;
this.abort_reason.set(globalObject, check_result);
this.signal_store.aborted.store(true, .monotonic);
this.tracker.didCancel(this.global_this);
// we need to abort the request
if (this.http) |http_| http.http_thread.scheduleShutdown(http_);
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
return false;
};
var hostname: bun.String = bun.String.cloneUTF8(certificate_info.hostname);
defer hostname.deref();
const js_hostname = hostname.toJS(globalObject);
js_hostname.ensureStillAlive();
js_cert.ensureStillAlive();
const check_result = check_server_identity.call(globalObject, .js_undefined, &.{ js_hostname, js_cert }) catch |err| globalObject.takeException(err);
// > Returns <Error> object [...] on failure
if (check_result.isAnyError()) {
// mark to wait until deinit
this.is_waiting_abort = this.result.has_more;
this.abort_reason.set(globalObject, check_result);
this.signal_store.aborted.store(true, .monotonic);
this.tracker.didCancel(this.global_this);
// we need to abort the request
if (this.http) |http_| {
http.http_thread.scheduleShutdown(http_);
}
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
return false;
}
// > On success, returns <undefined>
// We treat any non-error value as a success.
return true;
}
}
}
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
return false;
}
fn getAbortError(this: *FetchTasklet) ?Body.Value.ValueError {
if (this.abort_reason.has()) {
defer this.clearAbortSignal();
const out = this.abort_reason;
this.abort_reason = .empty;
return Body.Value.ValueError{ .JSValue = out };
}
if (this.signal) |signal| {
if (signal.reasonIfAborted(this.global_this)) |reason| {
defer this.clearAbortSignal();
return reason.toBodyValueError(this.global_this);
}
}
return null;
}
fn clearAbortSignal(this: *FetchTasklet) void {
const signal = this.signal orelse return;
this.signal = null;
defer {
signal.pendingActivityUnref();
signal.unref();
}
signal.cleanNativeBindings(this);
}
pub fn onReject(this: *FetchTasklet) Body.Value.ValueError {
bun.assert(this.result.fail != null);
log("onReject", .{});
if (this.getAbortError()) |err| {
return err;
}
if (this.result.abortReason()) |reason| {
return .{ .AbortReason = reason };
}
// sometimes we don't have metadata, so we also check http.url
const path = if (this.metadata) |metadata|
bun.String.cloneUTF8(metadata.url)
else if (this.http) |http_|
bun.String.cloneUTF8(http_.url.href)
else
bun.String.empty;
const fetch_error = jsc.SystemError{
.code = bun.String.static(switch (this.result.fail.?) {
error.ConnectionClosed => "ECONNRESET",
else => |e| @errorName(e),
}),
.message = switch (this.result.fail.?) {
error.ConnectionClosed => bun.String.static("The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()"),
error.FailedToOpenSocket => bun.String.static("Was there a typo in the url or port?"),
error.TooManyRedirects => bun.String.static("The response redirected too many times. For more information, pass `verbose: true` in the second argument to fetch()"),
error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"),
error.RedirectURLInvalid => bun.String.static("Redirect URL in Location header is invalid."),
error.UNABLE_TO_GET_ISSUER_CERT => bun.String.static("unable to get issuer certificate"),
error.UNABLE_TO_GET_CRL => bun.String.static("unable to get certificate CRL"),
error.UNABLE_TO_DECRYPT_CERT_SIGNATURE => bun.String.static("unable to decrypt certificate's signature"),
error.UNABLE_TO_DECRYPT_CRL_SIGNATURE => bun.String.static("unable to decrypt CRL's signature"),
error.UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY => bun.String.static("unable to decode issuer public key"),
error.CERT_SIGNATURE_FAILURE => bun.String.static("certificate signature failure"),
error.CRL_SIGNATURE_FAILURE => bun.String.static("CRL signature failure"),
error.CERT_NOT_YET_VALID => bun.String.static("certificate is not yet valid"),
error.CRL_NOT_YET_VALID => bun.String.static("CRL is not yet valid"),
error.CERT_HAS_EXPIRED => bun.String.static("certificate has expired"),
error.CRL_HAS_EXPIRED => bun.String.static("CRL has expired"),
error.ERROR_IN_CERT_NOT_BEFORE_FIELD => bun.String.static("format error in certificate's notBefore field"),
error.ERROR_IN_CERT_NOT_AFTER_FIELD => bun.String.static("format error in certificate's notAfter field"),
error.ERROR_IN_CRL_LAST_UPDATE_FIELD => bun.String.static("format error in CRL's lastUpdate field"),
error.ERROR_IN_CRL_NEXT_UPDATE_FIELD => bun.String.static("format error in CRL's nextUpdate field"),
error.OUT_OF_MEM => bun.String.static("out of memory"),
error.DEPTH_ZERO_SELF_SIGNED_CERT => bun.String.static("self signed certificate"),
error.SELF_SIGNED_CERT_IN_CHAIN => bun.String.static("self signed certificate in certificate chain"),
error.UNABLE_TO_GET_ISSUER_CERT_LOCALLY => bun.String.static("unable to get local issuer certificate"),
error.UNABLE_TO_VERIFY_LEAF_SIGNATURE => bun.String.static("unable to verify the first certificate"),
error.CERT_CHAIN_TOO_LONG => bun.String.static("certificate chain too long"),
error.CERT_REVOKED => bun.String.static("certificate revoked"),
error.INVALID_CA => bun.String.static("invalid CA certificate"),
error.INVALID_NON_CA => bun.String.static("invalid non-CA certificate (has CA markings)"),
error.PATH_LENGTH_EXCEEDED => bun.String.static("path length constraint exceeded"),
error.PROXY_PATH_LENGTH_EXCEEDED => bun.String.static("proxy path length constraint exceeded"),
error.PROXY_CERTIFICATES_NOT_ALLOWED => bun.String.static("proxy certificates not allowed, please set the appropriate flag"),
error.INVALID_PURPOSE => bun.String.static("unsupported certificate purpose"),
error.CERT_UNTRUSTED => bun.String.static("certificate not trusted"),
error.CERT_REJECTED => bun.String.static("certificate rejected"),
error.APPLICATION_VERIFICATION => bun.String.static("application verification failure"),
error.SUBJECT_ISSUER_MISMATCH => bun.String.static("subject issuer mismatch"),
error.AKID_SKID_MISMATCH => bun.String.static("authority and subject key identifier mismatch"),
error.AKID_ISSUER_SERIAL_MISMATCH => bun.String.static("authority and issuer serial number mismatch"),
error.KEYUSAGE_NO_CERTSIGN => bun.String.static("key usage does not include certificate signing"),
error.UNABLE_TO_GET_CRL_ISSUER => bun.String.static("unable to get CRL issuer certificate"),
error.UNHANDLED_CRITICAL_EXTENSION => bun.String.static("unhandled critical extension"),
error.KEYUSAGE_NO_CRL_SIGN => bun.String.static("key usage does not include CRL signing"),
error.KEYUSAGE_NO_DIGITAL_SIGNATURE => bun.String.static("key usage does not include digital signature"),
error.UNHANDLED_CRITICAL_CRL_EXTENSION => bun.String.static("unhandled critical CRL extension"),
error.INVALID_EXTENSION => bun.String.static("invalid or inconsistent certificate extension"),
error.INVALID_POLICY_EXTENSION => bun.String.static("invalid or inconsistent certificate policy extension"),
error.NO_EXPLICIT_POLICY => bun.String.static("no explicit policy"),
error.DIFFERENT_CRL_SCOPE => bun.String.static("Different CRL scope"),
error.UNSUPPORTED_EXTENSION_FEATURE => bun.String.static("Unsupported extension feature"),
error.UNNESTED_RESOURCE => bun.String.static("RFC 3779 resource not subset of parent's resources"),
error.PERMITTED_VIOLATION => bun.String.static("permitted subtree violation"),
error.EXCLUDED_VIOLATION => bun.String.static("excluded subtree violation"),
error.SUBTREE_MINMAX => bun.String.static("name constraints minimum and maximum not supported"),
error.UNSUPPORTED_CONSTRAINT_TYPE => bun.String.static("unsupported name constraint type"),
error.UNSUPPORTED_CONSTRAINT_SYNTAX => bun.String.static("unsupported or invalid name constraint syntax"),
error.UNSUPPORTED_NAME_SYNTAX => bun.String.static("unsupported or invalid name syntax"),
error.CRL_PATH_VALIDATION_ERROR => bun.String.static("CRL path validation error"),
error.SUITE_B_INVALID_VERSION => bun.String.static("Suite B: certificate version invalid"),
error.SUITE_B_INVALID_ALGORITHM => bun.String.static("Suite B: invalid public key algorithm"),
error.SUITE_B_INVALID_CURVE => bun.String.static("Suite B: invalid ECC curve"),
error.SUITE_B_INVALID_SIGNATURE_ALGORITHM => bun.String.static("Suite B: invalid signature algorithm"),
error.SUITE_B_LOS_NOT_ALLOWED => bun.String.static("Suite B: curve not allowed for this LOS"),
error.SUITE_B_CANNOT_SIGN_P_384_WITH_P_256 => bun.String.static("Suite B: cannot sign P-384 with P-256"),
error.HOSTNAME_MISMATCH => bun.String.static("Hostname mismatch"),
error.EMAIL_MISMATCH => bun.String.static("Email address mismatch"),
error.IP_ADDRESS_MISMATCH => bun.String.static("IP address mismatch"),
error.INVALID_CALL => bun.String.static("Invalid certificate verification context"),
error.STORE_LOOKUP => bun.String.static("Issuer certificate lookup error"),
error.NAME_CONSTRAINTS_WITHOUT_SANS => bun.String.static("Issuer has name constraints but leaf has no SANs"),
error.UNKNOWN_CERTIFICATE_VERIFICATION_ERROR => bun.String.static("unknown certificate verification error"),
else => |e| bun.String.createFormat("{s} fetching \"{}\". For more information, pass `verbose: true` in the second argument to fetch()", .{
@errorName(e),
path,
}) catch |err| bun.handleOom(err),
},
.path = path,
};
return .{ .SystemError = fetch_error };
}
pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void {
const this = bun.cast(*FetchTasklet, ctx);
this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);
}
pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult {
const this = bun.cast(*FetchTasklet, ctx);
if (this.signal_store.aborted.load(.monotonic)) {
return jsc.WebCore.DrainResult{
.aborted = {},
};
}
if (this.http) |http_| {
http_.enableResponseBodyStreaming();
// If the server sent the headers and the response body in two separate socket writes
// and if the server doesn't close the connection by itself
// and doesn't send any follow-up data
// then we must make sure the HTTP thread flushes.
bun.http.http_thread.scheduleResponseBodyDrain(http_.async_http_id);
}
this.mutex.lock();
defer this.mutex.unlock();
const size_hint = this.getSizeHint();
var scheduled_response_buffer = this.scheduled_response_buffer.list;
// This means we have received part of the body but not the whole thing
if (scheduled_response_buffer.items.len > 0) {
this.scheduled_response_buffer = .{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
};
return .{
.owned = .{
.list = scheduled_response_buffer.toManaged(bun.default_allocator),
.size_hint = size_hint,
},
};
}
return .{
.estimated_size = size_hint,
};
}
fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
return switch (this.body_size) {
.content_length => @truncate(this.body_size.content_length),
.total_received => @truncate(this.body_size.total_received),
.unknown => 0,
};
}
fn toBodyValue(this: *FetchTasklet) Body.Value {
if (this.getAbortError()) |err| {
return .{ .Error = err };
}
if (this.is_waiting_body) {
const response = Body.Value{
.Locked = .{
.size_hint = this.getSizeHint(),
.task = this,
.global = this.global_this,
.onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback,
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
},
};
return response;
}
var scheduled_response_buffer = this.scheduled_response_buffer.list;
const response = Body.Value{
.InternalBlob = .{
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
},
};
this.scheduled_response_buffer = .{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
};
return response;
}
fn toResponse(this: *FetchTasklet) Response {
log("toResponse", .{});
bun.assert(this.metadata != null);
// at this point we should always have metadata
const metadata = this.metadata.?;
const http_response = metadata.response;
this.is_waiting_body = this.result.has_more;
return Response.init(
.{
.headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
.status_code = @as(u16, @truncate(http_response.status_code)),
.status_text = bun.String.createAtomIfPossible(http_response.status),
},
Body{
.value = this.toBodyValue(),
},
bun.String.createAtomIfPossible(metadata.url),
this.result.redirected,
);
}
fn ignoreRemainingResponseBody(this: *FetchTasklet) void {
log("ignoreRemainingResponseBody", .{});
// enabling streaming makes the http thread drain into the main thread (i.e. stop buffering)
// without a stream ref, response body, or response instance alive, it will just ignore the result
if (this.http) |http_| {
http_.enableResponseBodyStreaming();
}
// we should not keep the process alive if we are ignoring the body
const vm = this.javascript_vm;
this.poll_ref.unref(vm);
// clean up any remaining references
this.readable_stream_ref.deinit();
this.response.deinit();
if (this.native_response) |response| {
response.unref();
this.native_response = null;
}
this.ignore_data = true;
}
/// Called when Response JS object is garbage collected.
/// This is our signal to stop processing body data.
export fn Bun__FetchResponse_finalize(this: *FetchTasklet) callconv(.C) void {
log("onResponseFinalize", .{});
// === ACQUIRE LOCK - Fix race condition ===
// The HTTP thread accesses shared state in callback(), so we must lock
this.mutex.lock();
defer this.mutex.unlock();
// Check if we have a native response to work with
if (this.native_response) |response| {
const body = response.getBodyValue();
// Three scenarios:
//
// 1. We are streaming, in which case we should not ignore the body.
// 2. We were buffering, in which case
// 2a. if we have no promise, we should ignore the body.
// 2b. if we have a promise, we should keep loading the body.
// 3. We never started buffering, in which case we should ignore the body.
//
// Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer.
if (body.* != .Locked or this.readable_stream_ref.held.has()) {
// Scenario 1 or 3.
return;
}
if (body.Locked.promise) |promise| {
if (!promise.isEmptyOrUndefinedOrNull()) {
// Scenario 2b - promise exists, keep loading
return;
}
}
// Scenario 2a or 3 - ignore remaining body
// Signal abort to HTTP thread (under lock)
this.signal_store.aborted.store(true, .release);
// Set ignore_data flag to stop buffering
this.ignore_data = true;
// Clear accumulated buffers since we're ignoring the rest
this.response_buffer.list.clearRetainingCapacity();
this.scheduled_response_buffer.list.clearRetainingCapacity();
// Enable streaming to drain remaining data without buffering
if (this.http) |http_| {
http_.enableResponseBodyStreaming();
}
}
}
comptime {
_ = Bun__FetchResponse_finalize;
}
pub fn onResolve(this: *FetchTasklet) JSValue {
log("onResolve", .{});
const response = bun.new(Response, this.toResponse());
const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, this.global_this), response);
response_js.ensureStillAlive();
this.response = jsc.Weak(FetchTasklet).create(response_js, this.global_this, .FetchResponse, this);
this.native_response = response.ref();
return response_js;
}
pub fn get(
allocator: std.mem.Allocator,
globalThis: *jsc.JSGlobalObject,
fetch_options: *const FetchOptions,
promise: jsc.JSPromise.Strong,
) !*FetchTasklet {
var jsc_vm = globalThis.bunVM();
var fetch_tasklet = try allocator.create(FetchTasklet);
fetch_tasklet.* = .{
.mutex = .{},
.scheduled_response_buffer = .{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
},
.response_buffer = MutableString{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
},
.http = try allocator.create(http.AsyncHTTP),
.javascript_vm = jsc_vm,
.request_body = fetch_options.body,
.global_this = globalThis,
.promise = promise,
.request_headers = fetch_options.headers,
.url_proxy_buffer = fetch_options.url_proxy_buffer,
.signal = fetch_options.signal,
.hostname = fetch_options.hostname,
.tracker = jsc.Debugger.AsyncTaskTracker.init(jsc_vm),
.check_server_identity = fetch_options.check_server_identity,
.reject_unauthorized = fetch_options.reject_unauthorized,
.upgraded_connection = fetch_options.upgraded_connection,
};
fetch_tasklet.signals = fetch_tasklet.signal_store.to();
fetch_tasklet.tracker.didSchedule(globalThis);
if (fetch_tasklet.request_body.store()) |store| {
store.ref();
}
var proxy: ?ZigURL = null;
if (fetch_options.proxy) |proxy_opt| {
if (!proxy_opt.isEmpty()) { // if it is empty, just ignore the proxy
proxy = fetch_options.proxy orelse jsc_vm.transpiler.env.getHttpProxyFor(fetch_options.url);
}
} else {
proxy = jsc_vm.transpiler.env.getHttpProxyFor(fetch_options.url);
}
if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) {
fetch_tasklet.signal_store.cert_errors.store(true, .monotonic);
} else {
fetch_tasklet.signals.cert_errors = null;
}
// This task gets queued on the HTTP thread.
fetch_tasklet.http.?.* = http.AsyncHTTP.init(
bun.default_allocator,
fetch_options.method,
fetch_options.url,
fetch_options.headers.entries,
fetch_options.headers.buf.items,
&fetch_tasklet.response_buffer,
fetch_tasklet.request_body.slice(),
http.HTTPClientResult.Callback.New(
*FetchTasklet,
// handles response events (on headers, on body, etc.)
FetchTasklet.callback,
).init(fetch_tasklet),
fetch_options.redirect_type,
.{
.http_proxy = proxy,
.hostname = fetch_options.hostname,
.signals = fetch_tasklet.signals,
.unix_socket_path = fetch_options.unix_socket_path,
.disable_timeout = fetch_options.disable_timeout,
.disable_keepalive = fetch_options.disable_keepalive,
.disable_decompression = fetch_options.disable_decompression,
.reject_unauthorized = fetch_options.reject_unauthorized,
.verbose = fetch_options.verbose,
.tls_props = fetch_options.ssl_config,
},
);
// enable streaming on the write side
const isStream = fetch_tasklet.request_body == .ReadableStream;
fetch_tasklet.http.?.client.flags.is_streaming_request_body = isStream;
fetch_tasklet.is_waiting_request_stream_start = isStream;
if (isStream) {
const buffer = http.ThreadSafeStreamBuffer.new(.{});
buffer.setDrainCallback(FetchTasklet, FetchTasklet.onWriteRequestDataDrain, fetch_tasklet);
fetch_tasklet.request_body_streaming_buffer = buffer;
fetch_tasklet.http.?.request_body = .{
.stream = .{
.buffer = buffer,
.ended = false,
},
};
}
// TODO is this necessary? the http client already sets the redirect type,
// so manually setting it here seems redundant
if (fetch_options.redirect_type != FetchRedirect.follow) {
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
}
// we want to return after headers are received
fetch_tasklet.signal_store.header_progress.store(true, .monotonic);
if (fetch_tasklet.request_body == .Sendfile) {
bun.assert(fetch_options.url.isHTTP());
bun.assert(fetch_options.proxy == null);
fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile };
}
if (fetch_tasklet.signal) |signal| {
signal.pendingActivityRef();
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
}
return fetch_tasklet;
}
pub fn abortListener(this: *FetchTasklet, reason: JSValue) void {
log("abortListener", .{});
reason.ensureStillAlive();
this.abort_reason.set(this.global_this, reason);
this.abortTask();
if (this.sink) |sink| {
sink.cancel(reason);
return;
}
}
/// This is ALWAYS called from the http thread and we cannot touch the buffer here because it is locked
pub fn onWriteRequestDataDrain(this: *FetchTasklet) void {
// ref until the main thread callback is called
this.ref();
this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.resumeRequestDataStream));
}
/// This is ALWAYS called from the main thread
// XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not
pub fn resumeRequestDataStream(this: *FetchTasklet) error{}!void {
// deref when done because we ref inside onWriteRequestDataDrain
defer this.deref();
log("resumeRequestDataStream", .{});
if (this.sink) |sink| {
if (this.signal) |signal| {
if (signal.aborted()) {
// already aborted; nothing to drain
return;
}
}
sink.drain();
}
}
pub fn writeRequestData(this: *FetchTasklet, data: []const u8) ResumableSinkBackpressure {
log("writeRequestData {}", .{data.len});
if (this.signal) |signal| {
if (signal.aborted()) {
return .done;
}
}
const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return .done;
const stream_buffer = thread_safe_stream_buffer.acquire();
defer thread_safe_stream_buffer.release();
const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384;
var needs_schedule = false;
defer if (needs_schedule) {
// wake up the http thread to write the data
http.http_thread.scheduleRequestWrite(this.http.?, .data);
};
// we don't have backpressure, so we schedule the data to be written
// if we have backpressure, onWritable will drain the buffer
needs_schedule = stream_buffer.isEmpty();
if (this.upgraded_connection) {
bun.handleOom(stream_buffer.write(data));
} else {
// 16 is the max length of a hex number representing 64 bits, + 2 for the \r\n
var formatted_size_buffer: [18]u8 = undefined;
const formatted_size = std.fmt.bufPrint(
formatted_size_buffer[0..],
"{x}\r\n",
.{data.len},
) catch |err| switch (err) {
error.NoSpaceLeft => unreachable,
};
bun.handleOom(stream_buffer.ensureUnusedCapacity(formatted_size.len + data.len + 2));
stream_buffer.writeAssumeCapacity(formatted_size);
stream_buffer.writeAssumeCapacity(data);
stream_buffer.writeAssumeCapacity("\r\n");
}
// pause the stream if we hit the high water mark
return if (stream_buffer.size() >= highWaterMark) .backpressure else .want_more;
}
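// Framing sketch: for non-upgraded connections, writeRequestData above emits each
// chunk as "<hex length>\r\n<data>\r\n"; the 26-byte length used here is illustrative only.
test "chunked-encoding size prefix (framing sketch)" {
var prefix_buffer: [18]u8 = undefined;
const prefix = try std.fmt.bufPrint(prefix_buffer[0..], "{x}\r\n", .{@as(usize, 26)});
try std.testing.expectEqualStrings("1a\r\n", prefix);
}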
pub fn writeEndRequest(this: *FetchTasklet, err: ?jsc.JSValue) void {
log("writeEndRequest hasError? {}", .{err != null});
defer this.deref();
if (err) |jsError| {
if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) {
return;
}
if (!jsError.isUndefinedOrNull()) {
this.abort_reason.set(this.global_this, jsError);
}
this.abortTask();
} else {
if (!this.upgraded_connection) {
// If it is not upgraded we need to send the terminating chunk
const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return;
const stream_buffer = thread_safe_stream_buffer.acquire();
defer thread_safe_stream_buffer.release();
bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body));
}
if (this.http) |http_| {
// just tell it to write the end of the chunked encoding, i.e. 0\r\n\r\n
http.http_thread.scheduleRequestWrite(http_, .end);
}
}
}
pub fn abortTask(this: *FetchTasklet) void {
this.signal_store.aborted.store(true, .monotonic);
this.tracker.didCancel(this.global_this);
if (this.http) |http_| {
http.http_thread.scheduleShutdown(http_);
}
}
const FetchOptions = struct {
method: Method,
headers: Headers,
body: HTTPRequestBody,
disable_timeout: bool,
disable_keepalive: bool,
disable_decompression: bool,
reject_unauthorized: bool,
url: ZigURL,
verbose: http.HTTPVerboseLevel = .none,
redirect_type: FetchRedirect = FetchRedirect.follow,
proxy: ?ZigURL = null,
url_proxy_buffer: []const u8 = "",
signal: ?*jsc.WebCore.AbortSignal = null,
globalThis: ?*JSGlobalObject,
// Custom Hostname
hostname: ?[]u8 = null,
check_server_identity: jsc.Strong.Optional = .empty,
unix_socket_path: ZigString.Slice,
ssl_config: ?*SSLConfig = null,
upgraded_connection: bool = false,
};
pub fn queue(
allocator: std.mem.Allocator,
global: *JSGlobalObject,
fetch_options: *const FetchOptions,
promise: jsc.JSPromise.Strong,
) !*FetchTasklet {
http.HTTPThread.init(&.{});
var node = try get(
allocator,
global,
fetch_options,
promise,
);
var batch = bun.ThreadPool.Batch{};
node.http.?.schedule(allocator, &batch);
node.poll_ref.ref(global.bunVM());
// increment ref so we can keep it alive until the http client is done
node.ref();
http.http_thread.schedule(batch);
return node;
}
/// Called from HTTP thread. Handles HTTP events received from socket.
pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
// at this point only this thread is accessing result, so there is no race condition
const is_done = !result.has_more;
// we are done with the http client so we can deref our side
// this is an atomic operation and will enqueue a task to deinit on the main thread
defer if (is_done) task.derefFromThread();
task.mutex.lock();
// we need to unlock before task.deref();
defer task.mutex.unlock();
task.http.?.* = async_http.*;
task.http.?.response_buffer = async_http.response_buffer;
log("callback success={} ignore_data={} has_more={} bytes={}", .{ result.isSuccess(), task.ignore_data, result.has_more, result.body.?.list.items.len });
const prev_metadata = task.result.metadata;
const prev_cert_info = task.result.certificate_info;
task.result = result;
// Preserve pending certificate info if it was provided in the previous update.
if (task.result.certificate_info == null) {
if (prev_cert_info) |cert_info| {
task.result.certificate_info = cert_info;
}
}
// metadata should be provided only once
if (result.metadata orelse prev_metadata) |metadata| {
log("added callback metadata", .{});
if (task.metadata == null) {
task.metadata = metadata;
}
task.result.metadata = null;
}
task.body_size = result.body_size;
const success = result.isSuccess();
task.response_buffer = result.body.?.*;
if (task.ignore_data) {
task.response_buffer.reset();
if (task.scheduled_response_buffer.list.capacity > 0) {
task.scheduled_response_buffer.deinit();
task.scheduled_response_buffer = .{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
};
}
if (success and result.has_more) {
// we are ignoring the body, so skip scheduling a callback for intermediate data; we only signal once result.has_more is false
return;
}
} else {
if (success) {
_ = bun.handleOom(task.scheduled_response_buffer.write(task.response_buffer.list.items));
}
// reset for reuse
task.response_buffer.reset();
}
if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
if (has_schedule_callback) {
return;
}
}
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
};
const X509 = @import("../../api/bun/x509.zig");
const std = @import("std");
const Method = @import("../../../http/Method.zig").Method;
const ZigURL = @import("../../../url.zig").URL;
const bun = @import("bun");
const Async = bun.Async;
const MutableString = bun.MutableString;
const Mutex = bun.Mutex;
const Output = bun.Output;
const BoringSSL = bun.BoringSSL.c;
const FetchHeaders = bun.webcore.FetchHeaders;
const SSLConfig = bun.api.server.ServerConfig.SSLConfig;
const http = bun.http;
const FetchRedirect = http.FetchRedirect;
const Headers = bun.http.Headers;
const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSPromise = jsc.JSPromise;
const JSValue = jsc.JSValue;
const VirtualMachine = jsc.VirtualMachine;
const ZigString = jsc.ZigString;
const Body = jsc.WebCore.Body;
const Response = jsc.WebCore.Response;
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;
const Blob = jsc.WebCore.Blob;
const AnyBlob = jsc.WebCore.Blob.Any;