This commit is contained in:
Ciro Spaciari
2025-11-10 15:42:15 -08:00
parent 27d69e91a8
commit feefe6a7d0
4 changed files with 603 additions and 576 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -42,14 +42,14 @@ pub const HTTPRequestBody = union(enum) {
if (body_value == .Locked) {
if (body_value.Locked.readable.has()) {
// just grab the ref
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
return HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
}
const readable = try body_value.toReadableStream(globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) {
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
return HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
}
}
return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
return HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
}
pub fn needsToReadFile(this: *HTTPRequestBody) bool {

View File

@@ -21,6 +21,11 @@ state: enum {
failed,
done,
} = .created,
fn parent(this: *FetchTaskletRequest) *FetchTasklet {
return @fieldParentPtr("request", this);
}
pub fn startRequestStream(this: *FetchTaskletRequest) void {
this.is_waiting_request_stream_start = false;
bun.assert(this.request_body == .ReadableStream);
@@ -108,6 +113,21 @@ pub fn writeEndRequest(this: *FetchTaskletRequest, err: ?jsc.JSValue) void {
}
}
/// This is ALWAYS called from the main thread
pub fn resumeRequestDataStream(this: *FetchTaskletRequest) void {
// deref when done because we ref inside onWriteRequestDataDrain
const tasklet = this.parent();
defer tasklet.deref();
if (tasklet.isAborted()) {
// already aborted; nothing to drain
return;
}
log("resumeRequestDataStream", .{});
if (this.sink) |sink| {
sink.drain();
}
}
pub fn deinit(this: *FetchTaskletRequest) void {
if (this.request_body) |body| {
body.detach();
@@ -128,3 +148,4 @@ const Headers = http.Headers;
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;
const ResumableSink = jsc.WebCore.ResumableFetchSink;
const log = bun.Output.scoped(.FetchTaskletRequest, .visible);
const FetchTasklet = @import("../FetchTasklet.zig").FetchTasklet;

View File

@@ -5,13 +5,25 @@ ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
/// buffer being used by AsyncHTTP
response_buffer: bun.MutableString = undefined,
result: bun.http.HTTPClientResult = .{},
signals: bun.http.Signals = .{},
signal_store: bun.http.Signals.Store = .{},
has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
fn parent(this: *SharedData) *FetchTasklet {
return @fieldParentPtr("shared", this);
}
/// This is ALWAYS called from the http thread and we cannot touch the buffer here because is locked
pub fn onWriteRequestDataDrain(this: *FetchTasklet) void {
pub fn onWriteRequestDataDrain(this: *SharedData) void {
// ref until the main thread callback is called
this.ref();
this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.resumeRequestDataStream));
const tasklet = this.parent();
tasklet.ref();
tasklet.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(&tasklet.request, tasklet.request.resumeRequestDataStream));
}
const std = @import("std");
const bun = @import("bun");
const jsc = bun.jsc;
const FetchTasklet = @import("../FetchTasklet.zig");