Mirror of https://github.com/oven-sh/bun (synced 2026-02-06 08:58:52 +00:00)

Compare commits: ali/react...claude/fet (7 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 3fcee1d344 | |
| | b810d9f2ad | |
| | 27ff770bb8 | |
| | 9715f88f2e | |
| | b16253807a | |
| | d65d2b688c | |
| | c72291b3d5 | |
@@ -62,30 +62,272 @@ pub const FetchTasklet = struct {
pub const ResumableSink = jsc.WebCore.ResumableFetchSink;

const log = Output.scoped(.FetchTasklet, .visible);

/// State machine for explicit lifecycle tracking
pub const State = enum {
    Scheduled, // queued on HTTP thread; no headers yet
    HaveHeaders, // metadata set; decide buffering vs streaming
    Streaming, // streaming to JS ReadableStream or sink
    Buffering, // buffering body in-memory (promise pending)
    Completed, // success (body fully delivered)
    Aborted, // user abort or cert/transport failure
    Ignored, // response finalized & body intentionally dropped
    Destroying, // final main-thread teardown is scheduled
};

/// Helper to prevent duplicate event loop callbacks
pub const ScheduleGuard = struct {
    flag: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

    pub fn trySet(self: *ScheduleGuard) bool {
        return self.flag.cmpxchgStrong(false, true, .acquire, .monotonic) == null;
    }

    pub fn clear(self: *ScheduleGuard) void {
        self.flag.store(false, .monotonic);
    }
};
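
// Usage sketch (illustrative only, not part of the patch): the HTTP thread calls
// trySet() before enqueueing the concurrent task, and onProgressUpdate() calls
// clear() once the callback runs on the main thread, mirroring the schedule_guard
// usage further down in this diff. `notifyProgress` is a hypothetical helper name.
fn notifyProgress(task: *FetchTasklet) void {
    // Only the first caller wins; duplicate progress events are coalesced
    // into a single event-loop callback.
    if (!task.schedule_guard.trySet()) return;
    task.javascript_vm.eventLoop().enqueueTaskConcurrent(
        task.concurrent_task.from(task, .manual_deinit),
    );
}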

/// Abort signal state with single owner semantics
pub const AbortState = struct {
    signal: ?*jsc.WebCore.AbortSignal = null,

    pub fn attach(self: *AbortState, signal: *jsc.WebCore.AbortSignal) void {
        self.signal = signal;
        signal.pendingActivityRef();
    }

    pub fn takeReason(self: *AbortState, global: *JSGlobalObject) ?Body.Value.ValueError {
        defer if (self.signal) |s| {
            s.pendingActivityUnref();
            s.unref();
            self.signal = null;
        };
        if (self.signal) |s| {
            if (s.reasonIfAborted(global)) |r| {
                return Body.Value.ValueError{ .JSValue = r };
            }
        }
        return null;
    }

    pub fn clear(self: *AbortState) void {
        if (self.signal) |s| {
            s.pendingActivityUnref();
            s.unref();
            self.signal = null;
        }
    }
};
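
// Usage sketch (illustrative only, not part of the patch): AbortState has a single
// owner, so attach() is called once when the fetch is created and takeReason()
// consumes the signal on a terminal transition. The defer inside takeReason()
// drops the refs whether or not a reason was present, so a later clear() is a
// no-op. `handleFailure` is a hypothetical helper name.
fn handleFailure(task: *FetchTasklet, global: *JSGlobalObject) void {
    if (task.abort_state.takeReason(global)) |reason| {
        // Reject with the user's abort reason instead of a generic network error.
        _ = reason;
    }
    // Safe even after takeReason(); the signal ref has already been released.
    task.abort_state.clear();
}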

/// JS-side owned resources (main-thread only)
pub const JsRefs = struct {
    global_this: *JSGlobalObject,
    vm: *VirtualMachine,
    promise: jsc.JSPromise.Strong,
    response_weak: jsc.Weak(FetchTasklet) = .{},
    native_response: ?*Response = null,
    readable_stream: jsc.WebCore.ReadableStream.Strong = .{},
    sink: ?*ResumableSink = null,
    abort_reason: jsc.Strong.Optional = .empty,
    check_server_identity: jsc.Strong.Optional = .empty,
    poll_ref: Async.KeepAlive = .{},
    tracker: jsc.Debugger.AsyncTaskTracker,

    pub fn deinit(self: *JsRefs) void {
        if (self.sink) |s| {
            self.sink = null;
            s.deref();
        }
        self.readable_stream.deinit();
        self.response_weak.deinit();
        if (self.native_response) |r| {
            self.native_response = null;
            r.unref();
        }
        self.abort_reason.deinit();
        self.check_server_identity.deinit();
        // poll_ref unref is done by owner on terminal transition
    }
};

/// Network-side owned resources (http-thread owner, dropped on main thread)
pub const NetRefs = struct {
    http: ?*http.AsyncHTTP = null,
    req_stream_buf: ?*http.ThreadSafeStreamBuffer = null,

    pub fn deinit(self: *NetRefs) void {
        if (self.req_stream_buf) |b| {
            self.req_stream_buf = null;
            b.clearDrainCallback();
            b.deref();
        }
        if (self.http) |h| {
            self.http = null;
            bun.default_allocator.destroy(h);
        }
    }
};

/// Buffers and metadata owned by task
pub const Buffers = struct {
    // bytes HTTP->JS
    scheduled: MutableString,
    scratch: MutableString,

    // metadata & allocs owned by task
    metadata: ?http.HTTPResponseMetadata = null,
    url_proxy: []const u8 = "",
    hostname: ?[]u8 = null,
    request_headers: Headers = Headers{ .allocator = undefined },

    pub fn deinit(self: *Buffers) void {
        if (self.url_proxy.len > 0) {
            bun.default_allocator.free(self.url_proxy);
            self.url_proxy = "";
        }
        if (self.hostname) |hn| {
            bun.default_allocator.free(hn);
            self.hostname = null;
        }
        if (self.metadata) |*m| {
            m.deinit(bun.default_allocator);
            self.metadata = null;
        }
        self.request_headers.entries.deinit(bun.default_allocator);
        self.request_headers.buf.deinit(bun.default_allocator);
        self.scheduled.deinit();
        self.scratch.deinit();
    }
};

/// Request body with move-once semantics
pub const RequestBodyOwner = union(enum) {
    None,
    AnyBlob: AnyBlob,
    Sendfile: http.SendFile,
    ReadableStream: jsc.WebCore.ReadableStream.Strong,

    pub fn moveToSink(self: *RequestBodyOwner, sink: *ResumableSink) void {
        if (self.* == .ReadableStream) {
            // sink takes ownership; nothing else may detach later
            _ = sink;
            // TODO: implement sink.adoptReadableStream when ready
            self.* = .None;
        }
    }

    pub fn moveToHttp(self: *RequestBodyOwner, http_: *http.AsyncHTTP) void {
        switch (self.*) {
            .AnyBlob => |b| http_.request_body = .{ .bytes = b.slice() },
            .Sendfile => |sf| http_.request_body = .{ .sendfile = sf },
            .ReadableStream => {
                // HTTP will pull from ThreadSafeStreamBuffer; stream stays None here
            },
            .None => {},
        }
        // After move, task doesn't own/detach it anymore
        self.* = .None;
    }

    pub fn deinit(self: *RequestBodyOwner) void {
        switch (self.*) {
            .AnyBlob => |*b| b.detach(),
            .Sendfile => |*sf| {
                if (@max(sf.offset, sf.remain) > 0) sf.fd.close();
                sf.offset = 0;
                sf.remain = 0;
            },
            .ReadableStream => |*s| s.deinit(),
            .None => {},
        }
        self.* = .None;
    }

    // Helper methods to maintain compatibility with existing code
    pub fn fromHTTPRequestBody(body: HTTPRequestBody) RequestBodyOwner {
        return switch (body) {
            .AnyBlob => |b| .{ .AnyBlob = b },
            .Sendfile => |sf| .{ .Sendfile = sf },
            .ReadableStream => |s| .{ .ReadableStream = s },
        };
    }

    pub fn needsToReadFile(self: *const RequestBodyOwner) bool {
        return switch (self.*) {
            .AnyBlob => |*blob| blob.needsToReadFile(),
            else => false,
        };
    }

    pub fn isS3(self: *const RequestBodyOwner) bool {
        return switch (self.*) {
            .AnyBlob => |*blob| blob.isS3(),
            else => false,
        };
    }

    pub fn hasContentTypeFromUser(self: *const RequestBodyOwner) bool {
        return switch (self.*) {
            .AnyBlob => |*blob| blob.hasContentTypeFromUser(),
            else => false,
        };
    }

    pub fn slice(self: *const RequestBodyOwner) []const u8 {
        return switch (self.*) {
            .AnyBlob => |*blob| blob.slice(),
            else => "",
        };
    }

    pub fn hasBody(self: *const RequestBodyOwner) bool {
        return switch (self.*) {
            .AnyBlob => |*blob| blob.size() > 0,
            .ReadableStream => |*stream| stream.has(),
            .Sendfile => true,
            .None => false,
        };
    }
};
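
// Usage sketch (illustrative only, not part of the patch): a blob or sendfile body
// is moved into AsyncHTTP exactly once; after moveToHttp() the owner is .None, so
// the later req_body.deinit() in FetchTasklet.deinit() has nothing left to detach
// and the body cannot be freed twice. `attachRequestBody` is a hypothetical helper name.
fn attachRequestBody(task: *FetchTasklet, async_http: *http.AsyncHTTP, body: HTTPRequestBody) void {
    task.req_body = RequestBodyOwner.fromHTTPRequestBody(body);
    // Ownership of the bytes / sendfile handle passes to the HTTP client here.
    task.req_body.moveToHttp(async_http);
    // task.req_body is now .None, so deinit() is a no-op for it.
}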

// ---- Refactored ownership groups ----
js: JsRefs = undefined,
net: NetRefs = undefined,
buffers: Buffers = undefined,
req_body: RequestBodyOwner = .None,
abort_state: AbortState = .{},
state: State = .Scheduled,
schedule_guard: ScheduleGuard = .{},

// ---- OLD fields retained for compatibility during migration ----
// TODO: Remove these as we migrate all accessors to use bags
sink: ?*ResumableSink = null,
http: ?*http.AsyncHTTP = null,
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null,

/// buffer being used by AsyncHTTP
/// buffer being used by AsyncHTTP (kept for now, will move to buffers.scratch)
response_buffer: MutableString = undefined,
/// buffer used to stream response to JS
/// buffer used to stream response to JS (maps to buffers.scheduled)
scheduled_response_buffer: MutableString = undefined,
/// response weak ref we need this to track the response JS lifetime
/// response weak ref (maps to js.response_weak)
response: jsc.Weak(FetchTasklet) = .{},
/// native response ref if we still need it when JS is discarted
/// native response ref (maps to js.native_response)
native_response: ?*Response = null,
ignore_data: bool = false,
/// stream strong ref if any is available
/// stream strong ref (maps to js.readable_stream)
readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{},
request_headers: Headers = Headers{ .allocator = undefined },
promise: jsc.JSPromise.Strong,
concurrent_task: jsc.ConcurrentTask = .{},
poll_ref: Async.KeepAlive = .{},

// Shared fields (not in bags)
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
@@ -240,6 +482,23 @@ pub const FetchTasklet = struct {
    return FetchTasklet{};
}

fn setState(this: *FetchTasklet, next: State) void {
    // Debug builds assert legal transitions
    if (Environment.isDebug) {
        switch (this.state) {
            .Scheduled => bun.assert(next == .HaveHeaders or next == .Aborted or next == .Destroying),
            .HaveHeaders => bun.assert(next == .Streaming or next == .Buffering or next == .Aborted or next == .Ignored or next == .Destroying),
            .Streaming => bun.assert(next == .Completed or next == .Aborted or next == .Ignored or next == .Destroying),
            .Buffering => bun.assert(next == .Completed or next == .Aborted or next == .Ignored or next == .Destroying),
            .Completed => bun.assert(next == .Destroying),
            .Aborted, .Ignored => bun.assert(next == .Destroying),
            .Destroying => {}, // terminal
        }
    }
    log("setState: {s} -> {s}", .{ @tagName(this.state), @tagName(next) });
    this.state = next;
}
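
// Transition sketch (illustrative only, not part of the patch): the happy path for
// a streamed response walks Scheduled -> HaveHeaders -> Streaming -> Completed ->
// Destroying, each step driven by code further below (header handling,
// onReadableStreamAvailable, onProgressUpdate, deinit); any other order trips the
// debug assertions above. `exampleHappyPath` is a hypothetical helper name.
fn exampleHappyPath(task: *FetchTasklet) void {
    task.setState(.HaveHeaders); // response metadata arrived
    task.setState(.Streaming); // a ReadableStream was handed to JS
    task.setState(.Completed); // body fully delivered
    task.setState(.Destroying); // final main-thread teardown
}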

fn clearSink(this: *FetchTasklet) void {
    if (this.sink) |sink| {
        this.sink = null;
@@ -310,16 +569,39 @@ pub const FetchTasklet = struct {
    log("deinit", .{});

    bun.assert(this.ref_count.load(.monotonic) == 0);
    this.setState(.Destroying);

    this.clearData();

    const allocator = bun.default_allocator;

    if (this.http) |http_| {
        this.http = null;
        allocator.destroy(http_);
    // Clear certificate info first (not in bags)
    if (this.result.certificate_info) |*certificate| {
        certificate.deinit(bun.default_allocator);
        this.result.certificate_info = null;
    }
    allocator.destroy(this);

    // Clear old response_buffer (will be migrated to buffers.scratch later)
    this.response_buffer.deinit();

    // Deinit request body owner
    this.req_body.deinit();

    // Deinit ownership bags (single drop point for each domain)
    this.buffers.deinit();
    this.net.deinit();
    this.js.deinit();

    // Unref poll_ref (done here to avoid process-keepalive leaks)
    var poll_ref = this.poll_ref;
    this.poll_ref = .{};
    poll_ref.unref(this.javascript_vm);

    // Clear abort signal
    this.abort_state.clear();
    if (this.signal) |signal| {
        signal.pendingActivityUnref();
        signal.unref();
        this.signal = null;
    }

    bun.default_allocator.destroy(this);
}

fn getCurrentResponse(this: *FetchTasklet) ?*Response {
@@ -505,7 +787,7 @@ pub const FetchTasklet = struct {
    jsc.markBinding(@src());
    log("onProgressUpdate", .{});
    this.mutex.lock();
    this.has_schedule_callback.store(false, .monotonic);
    this.schedule_guard.clear();
    const is_done = !this.result.has_more;

    const vm = this.javascript_vm;
@@ -589,6 +871,11 @@ pub const FetchTasklet = struct {
        this.promise.deinit();
    }
    const success = this.result.isSuccess();

    // Transition to Completed state on success, stay in current state on error
    if (success and is_done) {
        this.setState(.Completed);
    }
    const result = switch (success) {
        true => jsc.Strong.Optional.create(this.onResolve(), globalThis),
        false => brk: {
@@ -853,6 +1140,11 @@ pub const FetchTasklet = struct {
pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void {
    const this = bun.cast(*FetchTasklet, ctx);
    this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);

    // Transition to Streaming state when ReadableStream is created
    if (this.state == .HaveHeaders) {
        this.setState(.Streaming);
    }
}

pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult {
@@ -950,6 +1242,16 @@ pub const FetchTasklet = struct {
    const metadata = this.metadata.?;
    const http_response = metadata.response;
    this.is_waiting_body = this.result.has_more;

    // Transition to appropriate state based on body handling
    if (this.is_waiting_body) {
        // We're either buffering or streaming - will be determined in onResolve
        // For now, mark that we have headers
        if (this.state == .Scheduled) {
            this.setState(.HaveHeaders);
        }
    }

    return Response.init(
        .{
            .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
@@ -966,6 +1268,10 @@ pub const FetchTasklet = struct {

fn ignoreRemainingResponseBody(this: *FetchTasklet) void {
    log("ignoreRemainingResponseBody", .{});

    // Transition to Ignored state
    this.setState(.Ignored);

    // enabling streaming will make the http thread to drain into the main thread (aka stop buffering)
    // without a stream ref, response body or response instance alive it will just ignore the result
    if (this.http) |http_| {
@@ -983,6 +1289,7 @@ pub const FetchTasklet = struct {
        this.native_response = null;
    }

    // Note: ignore_data flag kept for now for HTTP callback compatibility
    this.ignore_data = true;
}
@@ -1039,6 +1346,39 @@ pub const FetchTasklet = struct {
    var fetch_tasklet = try allocator.create(FetchTasklet);

    fetch_tasklet.* = .{
        // Initialize ownership bags
        .js = .{
            .global_this = globalThis,
            .vm = jsc_vm,
            .promise = promise,
            .tracker = jsc.Debugger.AsyncTaskTracker.init(jsc_vm),
            .check_server_identity = fetch_options.check_server_identity,
        },
        .net = .{
            .http = try allocator.create(http.AsyncHTTP),
        },
        .buffers = .{
            .scheduled = .{
                .allocator = bun.default_allocator,
                .list = .{
                    .items = &.{},
                    .capacity = 0,
                },
            },
            .scratch = .{
                .allocator = bun.default_allocator,
                .list = .{
                    .items = &.{},
                    .capacity = 0,
                },
            },
            .url_proxy = fetch_options.url_proxy_buffer,
            .hostname = fetch_options.hostname,
            .request_headers = fetch_options.headers,
        },
        .req_body = RequestBodyOwner.fromHTTPRequestBody(fetch_options.body),

        // Old fields for compatibility
        .mutex = .{},
        .scheduled_response_buffer = .{
            .allocator = bun.default_allocator,
@@ -1262,6 +1602,9 @@ pub const FetchTasklet = struct {
}

pub fn abortTask(this: *FetchTasklet) void {
    // Transition to Aborted state
    this.setState(.Aborted);

    this.signal_store.aborted.store(true, .monotonic);
    this.tracker.didCancel(this.global_this);
@@ -1360,7 +1703,10 @@ pub const FetchTasklet = struct {
    const success = result.isSuccess();
    task.response_buffer = result.body.?.*;

    if (task.ignore_data) {
    // Check state instead of ignore_data flag (though flag is kept for now for compatibility)
    const should_ignore = task.state == .Ignored or task.ignore_data;

    if (should_ignore) {
        task.response_buffer.reset();

        if (task.scheduled_response_buffer.list.capacity > 0) {
@@ -1385,10 +1731,9 @@ pub const FetchTasklet = struct {
        task.response_buffer.reset();
    }

    if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
        if (has_schedule_callback) {
            return;
        }
    // Use schedule_guard to prevent duplicate callbacks
    if (!task.schedule_guard.trySet()) {
        return;
    }

    task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));