This commit is contained in:
Ciro Spaciari
2025-11-10 16:18:07 -08:00
parent feefe6a7d0
commit ccda3dbff9
4 changed files with 24 additions and 17 deletions

View File

@@ -437,7 +437,7 @@ pub fn get(
fetch_tasklet.is_waiting_request_stream_start = isStream;
if (isStream) {
const buffer = http.ThreadSafeStreamBuffer.new(.{});
buffer.setDrainCallback(FetchTasklet, FetchTasklet.onWriteRequestDataDrain, fetch_tasklet);
buffer.setDrainCallback(FetchTaskletSharedData, FetchTaskletSharedData.resumeRequestDataStream, &fetch_tasklet.shared);
fetch_tasklet.request_body_streaming_buffer = buffer;
fetch_tasklet.http.?.request_body = .{
.stream = .{

View File

@@ -1,6 +1,5 @@
const FetchTaskletRequest = @This();
request_body: ?HTTPRequestBody = null,
request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null,
request_headers: Headers = Headers{ .allocator = undefined },
sink: ?*ResumableSink = null,
metadata: ?http.HTTPResponseMetadata = null,
@@ -29,16 +28,15 @@ fn parent(this: *FetchTaskletRequest) *FetchTasklet {
pub fn startRequestStream(this: *FetchTaskletRequest) void {
this.is_waiting_request_stream_start = false;
bun.assert(this.request_body == .ReadableStream);
const tasklet = this.parent();
if (this.request_body.ReadableStream.get(this.global_this)) |stream| {
if (this.signal) |signal| {
if (signal.aborted()) {
stream.abort(this.global_this);
return;
}
const globalThis = tasklet.global_this;
if (tasklet.isAborted()) {
stream.abort(globalThis);
return;
}
const globalThis = this.global_this;
this.ref(); // lets only unref when sink is done
tasklet.ref(); // lets only unref when sink is done
// +1 because the task refs the sink
const sink = ResumableSink.initExactRefs(globalThis, stream, this, 2);
this.sink = sink;
@@ -46,12 +44,11 @@ pub fn startRequestStream(this: *FetchTaskletRequest) void {
}
pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableSinkBackpressure {
log("writeRequestData {}", .{data.len});
if (this.signal) |signal| {
if (signal.aborted()) {
return .done;
}
const tasklet = this.parent();
if (tasklet.isAborted()) {
return .done;
}
const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return .done;
const thread_safe_stream_buffer = tasklet.shared.request_body_streaming_buffer orelse return .done;
const stream_buffer = thread_safe_stream_buffer.acquire();
defer thread_safe_stream_buffer.release();
const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384;
@@ -59,7 +56,7 @@ pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableS
var needs_schedule = false;
defer if (needs_schedule) {
// wakeup the http thread to write the data
http.http_thread.scheduleRequestWrite(this.http.?, .data);
http.http_thread.scheduleRequestWrite(tasklet.http.?, .data);
};
// dont have backpressure so we will schedule the data to be written
@@ -115,7 +112,7 @@ pub fn writeEndRequest(this: *FetchTaskletRequest, err: ?jsc.JSValue) void {
/// This is ALWAYS called from the main thread
pub fn resumeRequestDataStream(this: *FetchTaskletRequest) void {
// deref when done because we ref inside onWriteRequestDataDrain
// deref when done because we ref inside SharedData.resumeRequestDataStream
const tasklet = this.parent();
defer tasklet.deref();
if (tasklet.isAborted()) {

View File

@@ -528,3 +528,11 @@ const MutableString = bun.MutableString;
const jsc = bun.jsc;
const http = bun.http;
pub const ResumableSink = jsc.WebCore.ResumableFetchSink;
const FetchTasklet = @import("../FetchTasklet.zig").FetchTasklet;
const log = bun.Output.scoped(.FetchTaskletResponse, .visible);
const BoringSSL = bun.BoringSSL.c;
const FetchHeaders = bun.webcore.FetchHeaders;
const Body = jsc.WebCore.Body;
const X509 = @import("../../../api/bun/x509.zig").X509;
const JSValue = jsc.JSValue;
const Blob = jsc.WebCore.Blob;

View File

@@ -5,6 +5,7 @@ ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
/// buffer being used by AsyncHTTP
response_buffer: bun.MutableString = undefined,
request_body_streaming_buffer: ?*bun.http.ThreadSafeStreamBuffer = null,
result: bun.http.HTTPClientResult = .{},
signals: bun.http.Signals = .{},
@@ -16,7 +17,7 @@ fn parent(this: *SharedData) *FetchTasklet {
}
/// This is ALWAYS called from the http thread and we cannot touch the buffer here because is locked
pub fn onWriteRequestDataDrain(this: *SharedData) void {
pub fn resumeRequestDataStream(this: *SharedData) void {
// ref until the main thread callback is called
const tasklet = this.parent();
tasklet.ref();
@@ -27,3 +28,4 @@ const std = @import("std");
const bun = @import("bun");
const jsc = bun.jsc;
const FetchTasklet = @import("../FetchTasklet.zig");
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;