diff --git a/src/bun.js/webcore/fetch/tasklet/Request.zig b/src/bun.js/webcore/fetch/tasklet/Request.zig index 43925e5848..03f8f8f1a0 100644 --- a/src/bun.js/webcore/fetch/tasklet/Request.zig +++ b/src/bun.js/webcore/fetch/tasklet/Request.zig @@ -10,7 +10,9 @@ hostname: ?[]u8 = null, /// We always clone url and proxy (if informed) url_proxy_buffer: []const u8 = "", -state: enum { +state: RequestState = .created, + +const RequestState = enum { created, enqueued, // information_headers, @@ -19,7 +21,7 @@ state: enum { // sending_trailer_headers, failed, done, -} = .created, +}; fn parent(this: *FetchTaskletRequest) *FetchTasklet { return @fieldParentPtr("request", this); @@ -62,7 +64,7 @@ pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableS // dont have backpressure so we will schedule the data to be written // if we have backpressure the onWritable will drain the buffer needs_schedule = stream_buffer.isEmpty(); - if (this.upgraded_connection) { + if (tasklet.response.flags.upgraded_connection) { bun.handleOom(stream_buffer.write(data)); } else { //16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n @@ -86,24 +88,25 @@ pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableS pub fn writeEndRequest(this: *FetchTaskletRequest, err: ?jsc.JSValue) void { log("writeEndRequest hasError? {}", .{err != null}); - defer this.deref(); + const tasklet = this.parent(); + defer tasklet.deref(); if (err) |jsError| { - if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) { + if (tasklet.isAborted()) { return; } if (!jsError.isUndefinedOrNull()) { - this.abort_reason.set(this.global_this, jsError); + tasklet.abort_reason.set(tasklet.global_this, jsError); } - this.abortTask(); + tasklet.abortTask(); } else { - if (!this.upgraded_connection) { + if (!tasklet.response.flags.upgraded_connection) { // If is not upgraded we need to send the terminating chunk - const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return; + const thread_safe_stream_buffer = tasklet.shared.request_body_streaming_buffer orelse return; const stream_buffer = thread_safe_stream_buffer.acquire(); defer thread_safe_stream_buffer.release(); bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body)); } - if (this.http) |http_| { + if (tasklet.http) |http_| { // just tell to write the end of the chunked encoding aka 0\r\n\r\n http.http_thread.scheduleRequestWrite(http_, .end); } @@ -145,4 +148,4 @@ const Headers = http.Headers; const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure; const ResumableSink = jsc.WebCore.ResumableFetchSink; const log = bun.Output.scoped(.FetchTaskletRequest, .visible); -const FetchTasklet = @import("../FetchTasklet.zig").FetchTasklet; +const FetchTasklet = @import("../FetchTasklet.zig"); diff --git a/src/bun.js/webcore/fetch/tasklet/Response.zig b/src/bun.js/webcore/fetch/tasklet/Response.zig index 067f235da3..be12d33f7c 100644 --- a/src/bun.js/webcore/fetch/tasklet/Response.zig +++ b/src/bun.js/webcore/fetch/tasklet/Response.zig @@ -1,6 +1,12 @@ const Response = @This(); /// buffer used to stream response to JS -scheduled_response_buffer: MutableString = undefined, +scheduled_response_buffer: MutableString = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, +}, check_server_identity: jsc.Strong.Optional = .empty, flags: Flags = .{}, /// stream strong ref if any is available diff --git a/src/bun.js/webcore/fetch/tasklet/SharedData.zig b/src/bun.js/webcore/fetch/tasklet/SharedData.zig index 5d0529bc05..0a6987f1b8 100644 --- a/src/bun.js/webcore/fetch/tasklet/SharedData.zig +++ b/src/bun.js/webcore/fetch/tasklet/SharedData.zig @@ -1,6 +1,5 @@ const SharedData = @This(); -mutex: bun.Mutex, - +mutex: bun.Mutex = .{}, ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1), /// buffer being used by AsyncHTTP