This commit is contained in:
Ciro Spaciari
2025-11-10 16:56:48 -08:00
parent d748af1df8
commit a46702194b
3 changed files with 22 additions and 14 deletions

View File

@@ -10,7 +10,9 @@ hostname: ?[]u8 = null,
/// We always clone url and proxy (if informed)
url_proxy_buffer: []const u8 = "",
state: enum {
state: RequestState = .created,
const RequestState = enum {
created,
enqueued,
// information_headers,
@@ -19,7 +21,7 @@ state: enum {
// sending_trailer_headers,
failed,
done,
} = .created,
};
fn parent(this: *FetchTaskletRequest) *FetchTasklet {
return @fieldParentPtr("request", this);
@@ -62,7 +64,7 @@ pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableS
// dont have backpressure so we will schedule the data to be written
// if we have backpressure the onWritable will drain the buffer
needs_schedule = stream_buffer.isEmpty();
if (this.upgraded_connection) {
if (tasklet.response.flags.upgraded_connection) {
bun.handleOom(stream_buffer.write(data));
} else {
//16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n
@@ -86,24 +88,25 @@ pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableS
pub fn writeEndRequest(this: *FetchTaskletRequest, err: ?jsc.JSValue) void {
log("writeEndRequest hasError? {}", .{err != null});
defer this.deref();
const tasklet = this.parent();
defer tasklet.deref();
if (err) |jsError| {
if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) {
if (tasklet.isAborted()) {
return;
}
if (!jsError.isUndefinedOrNull()) {
this.abort_reason.set(this.global_this, jsError);
tasklet.abort_reason.set(tasklet.global_this, jsError);
}
this.abortTask();
tasklet.abortTask();
} else {
if (!this.upgraded_connection) {
if (!tasklet.response.flags.upgraded_connection) {
// If is not upgraded we need to send the terminating chunk
const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return;
const thread_safe_stream_buffer = tasklet.shared.request_body_streaming_buffer orelse return;
const stream_buffer = thread_safe_stream_buffer.acquire();
defer thread_safe_stream_buffer.release();
bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body));
}
if (this.http) |http_| {
if (tasklet.http) |http_| {
// just tell to write the end of the chunked encoding aka 0\r\n\r\n
http.http_thread.scheduleRequestWrite(http_, .end);
}
@@ -145,4 +148,4 @@ const Headers = http.Headers;
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;
const ResumableSink = jsc.WebCore.ResumableFetchSink;
const log = bun.Output.scoped(.FetchTaskletRequest, .visible);
const FetchTasklet = @import("../FetchTasklet.zig").FetchTasklet;
const FetchTasklet = @import("../FetchTasklet.zig");

View File

@@ -1,6 +1,12 @@
const Response = @This();
/// buffer used to stream response to JS
scheduled_response_buffer: MutableString = undefined,
scheduled_response_buffer: MutableString = .{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
},
check_server_identity: jsc.Strong.Optional = .empty,
flags: Flags = .{},
/// stream strong ref if any is available

View File

@@ -1,6 +1,5 @@
const SharedData = @This();
mutex: bun.Mutex,
mutex: bun.Mutex = .{},
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
/// buffer being used by AsyncHTTP