Compare commits

...

11 Commits

Author        SHA1        Message                                                 Date
cirospaciari  ac48476aa6  revert abortListener                                    2023-09-07 18:11:21 -04:00
cirospaciari  a5a4d51788  revert fn abortListener                                 2023-09-07 18:09:49 -04:00
cirospaciari  7ff0530925  format                                                  2023-09-07 18:04:23 -04:00
cirospaciari  2c88a1138a  metadata can be null and restore fn fail                2023-09-07 18:01:07 -04:00
cirospaciari  0a125c514d  only schedule if queue is not empty                     2023-09-07 17:41:50 -04:00
cirospaciari  29c5645f50  resolve promise/stream on abort always                  2023-09-07 17:11:53 -03:00
cirospaciari  e9db9f32b1  fix error when not locked but with readable available   2023-09-07 16:43:24 -03:00
cirospaciari  3260fdd790  consume multiple, only 1 task per tick                  2023-09-07 16:21:42 -03:00
cirospaciari  ab0cb34cf0  fix partial body                                        2023-09-07 15:53:01 -03:00
cirospaciari  78b31e588a  move body_size                                          2023-09-07 15:01:24 -03:00
cirospaciari  2332bb47b2  no mutex aproach                                        2023-09-07 14:40:54 -03:00
2 changed files with 224 additions and 188 deletions

View File

@@ -618,18 +618,67 @@ pub const Fetch = struct {
}
}
const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue;
pub const HTTPClientQueueResult = struct {
body: MutableString,
has_more: bool,
redirected: bool,
fail: anyerror,
certificate_info: ?HTTPClient.CertificateInfo,
/// For HTTP client requests:
/// when Content-Length is provided this represents the whole size of the request.
/// If chunk-encoded this will represent the total received size (ignoring the chunk headers).
/// If it is not chunk-encoded and Content-Length is not provided, this will be unknown.
body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
next: ?*HTTPClientQueueResult = null,
pub fn isSuccess(this: *HTTPClientQueueResult) bool {
return this.fail == error.NoError;
}
pub fn isTimeout(this: *HTTPClientQueueResult) bool {
return this.fail == error.Timeout;
}
pub fn isAbort(this: *HTTPClientQueueResult) bool {
return this.fail == error.Aborted;
}
fn getSizeHint(this: *HTTPClientQueueResult) Blob.SizeType {
return switch (this.body_size) {
.content_length => @truncate(this.body_size.content_length),
.total_received => @truncate(this.body_size.total_received),
else => 0,
};
}
pub fn deinit(this: *HTTPClientQueueResult) void {
this.body.deinit();
if (this.certificate_info) |certificate_info| {
defer certificate_info.deinit(bun.default_allocator);
}
bun.default_allocator.destroy(this);
}
};
const HTTPClientResultQueue = UnboundedQueue(HTTPClientQueueResult, .next);
pub const FetchTasklet = struct {
const log = Output.scoped(.FetchTasklet, false);
http: ?*HTTPClient.AsyncHTTP = null,
result: HTTPClient.HTTPClientResult = .{},
result_queue: HTTPClientResultQueue = HTTPClientResultQueue{},
metadata: ?HTTPClient.HTTPResponseMetadata = null,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
/// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
/// buffer used to stream response to JS
// all scheduled buffers are stored here when not streaming
scheduled_response_buffer: MutableString = undefined,
/// response strong ref
response: JSC.Strong = .{},
@@ -640,12 +689,7 @@ pub const Fetch = struct {
concurrent_task: JSC.ConcurrentTask = .{},
poll_ref: JSC.PollRef = .{},
memory_reporter: *JSC.MemoryReportingAllocator,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
/// If is not chunked encoded and Content-Length is not provided this will be unknown
body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
size_hint: Blob.SizeType,
/// This is url + proxy memory buffer and is owned by FetchTasklet
/// We always clone url and proxy (if informed)
url_proxy_buffer: []const u8 = "",
@@ -653,7 +697,6 @@ pub const Fetch = struct {
signal: ?*JSC.WebCore.AbortSignal = null,
signals: HTTPClient.Signals = .{},
signal_store: HTTPClient.Signals.Store = .{},
has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
abort_reason: JSValue = JSValue.zero,
@@ -665,7 +708,6 @@ pub const Fetch = struct {
hostname: ?[]u8 = null,
is_waiting_body: bool = false,
is_waiting_abort: bool = false,
mutex: Mutex,
tracker: JSC.AsyncTaskTracker,
@@ -704,6 +746,59 @@ pub const Fetch = struct {
return FetchTasklet{};
}
fn getFullResponseBodyValue(this: *FetchTasklet, result: *HTTPClientQueueResult) Body.Value {
var body_value: Body.Value = undefined;
if (this.scheduled_response_buffer.list.items.len > 0) {
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
this.memory_reporter.discard(this.scheduled_response_buffer.list.allocatedSlice());
body_value = Body.Value{
.InternalBlob = .{
.bytes = this.scheduled_response_buffer.list.toManaged(bun.default_allocator),
},
};
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.list = .{
.items = &.{},
.capacity = 0,
},
};
} else {
body_value = Body.Value{
.InternalBlob = .{
.bytes = result.body.list.toManaged(bun.default_allocator),
},
};
}
result.body = MutableString{
.allocator = bun.default_allocator,
.list = .{
.items = &.{},
.capacity = 0,
},
};
return body_value;
}
fn writeChunkToReadable(readable: JSC.WebCore.ReadableStream, chunk: []const u8, comptime has_more: bool) void {
if (has_more) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
} else {
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
}
}
fn clearData(this: *FetchTasklet) void {
log("clearData", .{});
const allocator = this.memory_reporter.allocator();
@@ -731,10 +826,10 @@ pub const Fetch = struct {
}
this.response_buffer.deinit();
this.scheduled_response_buffer.deinit();
this.response.deinit();
this.readable_stream_ref.deinit();
this.scheduled_response_buffer.deinit();
this.request_body.detach();
if (this.abort_reason != .zero)
@@ -759,14 +854,11 @@ pub const Fetch = struct {
bun.default_allocator.destroy(reporter);
}
pub fn onBodyReceived(this: *FetchTasklet) void {
this.mutex.lock();
const success = this.result.isSuccess();
pub fn onBodyReceived(this: *FetchTasklet, result: *HTTPClientQueueResult) void {
const success = result.isSuccess();
const globalThis = this.global_this;
const is_done = !success or !this.result.has_more;
const is_done = !success or !result.has_more;
defer {
this.has_schedule_callback.store(false, .Monotonic);
this.mutex.unlock();
if (is_done) {
var vm = globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -776,8 +868,17 @@ pub const Fetch = struct {
}
if (!success) {
const err = this.onReject();
const err = this.onReject(result);
err.ensureStillAlive();
if (this.readable_stream_ref.get()) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
return;
}
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
const body = response.body;
@@ -804,30 +905,18 @@ pub const Fetch = struct {
if (this.readable_stream_ref.get()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
// body can be marked as used but we still need to pipe the data
var scheduled_response_buffer = this.scheduled_response_buffer.list;
const chunk = scheduled_response_buffer.items;
if (this.result.has_more) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
// clean for reuse later
readable.ptr.Bytes.size_hint = result.getSizeHint();
if (this.scheduled_response_buffer.list.items.len > 0) {
writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true);
this.scheduled_response_buffer.reset();
} else {
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
}
if (result.has_more) {
writeChunkToReadable(readable, result.body.list.items, true);
} else {
writeChunkToReadable(readable, result.body.list.items, false);
}
return;
}
}
@@ -838,57 +927,34 @@ pub const Fetch = struct {
if (body.value == .Locked) {
if (body.value.Locked.readable) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
var scheduled_response_buffer = this.scheduled_response_buffer.list;
const chunk = scheduled_response_buffer.items;
if (this.result.has_more) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
// clean for reuse later
readable.ptr.Bytes.size_hint = result.getSizeHint();
if (this.scheduled_response_buffer.list.items.len > 0) {
writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true);
this.scheduled_response_buffer.reset();
}
if (result.has_more) {
writeChunkToReadable(readable, result.body.list.items, true);
} else {
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
writeChunkToReadable(readable, result.body.list.items, false);
}
return;
}
} else {
response.body.value.Locked.size_hint = this.getSizeHint();
response.body.value.Locked.size_hint = result.getSizeHint();
}
// we will reach here when not streaming
if (!this.result.has_more) {
var scheduled_response_buffer = this.scheduled_response_buffer.list;
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
if (result.has_more) {
// buffer more data
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
} else {
// done resolve body
var old = body.value;
var body_value = Body.Value{
.InternalBlob = .{
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
},
};
response.body.value = body_value;
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.list = .{
.items = &.{},
.capacity = 0,
},
};
response.body.value = this.getFullResponseBodyValue(result);
if (old == .Locked) {
old.resolve(&response.body.value, this.global_this);
@@ -899,25 +965,19 @@ pub const Fetch = struct {
}
}
pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
log("onProgressUpdate", .{});
pub fn processResult(this: *FetchTasklet, result: *HTTPClientQueueResult) void {
if (this.is_waiting_body) {
return this.onBodyReceived();
return this.onBodyReceived(result);
}
// if we abort because of cert error
// we wait the Http Client because we already have the response
// we just need to deinit
const globalThis = this.global_this;
this.mutex.lock();
if (this.is_waiting_abort) {
// has_more will be false when the request is aborted/finished
if (this.result.has_more) {
this.mutex.unlock();
if (result.has_more) {
return;
}
this.mutex.unlock();
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
@@ -936,35 +996,31 @@ pub const Fetch = struct {
if (promise_value.isEmptyOrUndefinedOrNull()) {
log("onProgressUpdate: promise_value is null", .{});
ref.strong.deinit();
this.has_schedule_callback.store(false, .Monotonic);
this.mutex.unlock();
poll_ref.unref(vm);
this.clearData();
this.deinit();
return;
}
if (this.result.certificate_info) |certificate_info| {
this.result.certificate_info = null;
if (result.certificate_info) |certificate_info| {
result.certificate_info = null;
defer certificate_info.deinit(bun.default_allocator);
// we receive some error
if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) {
if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info, result)) {
log("onProgressUpdate: aborted due certError", .{});
// we need to abort the request
const promise = promise_value.asAnyPromise().?;
const tracker = this.tracker;
const result = this.onReject();
const js_result = this.onReject(result);
result.ensureStillAlive();
js_result.ensureStillAlive();
promise_value.ensureStillAlive();
promise.reject(globalThis, result);
promise.reject(globalThis, js_result);
tracker.didDispatch(globalThis);
ref.strong.deinit();
this.has_schedule_callback.store(false, .Monotonic);
this.mutex.unlock();
if (this.is_waiting_abort) {
return;
}
@@ -977,48 +1033,59 @@ pub const Fetch = struct {
// everything ok
if (this.metadata == null) {
log("onProgressUpdate: metadata is null", .{});
this.has_schedule_callback.store(false, .Monotonic);
// cannot continue without metadata
this.mutex.unlock();
if (result.body.list.items.len > 0) {
// looks like we have some data to save until next time
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
}
return;
}
}
const promise = promise_value.asAnyPromise().?;
const tracker = this.tracker;
tracker.willDispatch(globalThis);
defer {
log("onProgressUpdate: promise_value is not null", .{});
tracker.didDispatch(globalThis);
ref.strong.deinit();
this.has_schedule_callback.store(false, .Monotonic);
this.mutex.unlock();
if (!this.is_waiting_body) {
poll_ref.unref(vm);
this.clearData();
this.deinit();
}
}
const success = this.result.isSuccess();
const result = switch (success) {
true => this.onResolve(),
false => this.onReject(),
const success = result.isSuccess();
const js_result = switch (success) {
true => this.onResolve(result),
false => this.onReject(result),
};
result.ensureStillAlive();
promise_value.ensureStillAlive();
const promise = promise_value.asAnyPromise() orelse return;
js_result.ensureStillAlive();
switch (success) {
true => {
promise.resolve(globalThis, result);
promise.resolve(globalThis, js_result);
},
false => {
promise.reject(globalThis, result);
promise.reject(globalThis, js_result);
},
}
}
pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
log("onProgressUpdate", .{});
pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: HTTPClient.CertificateInfo) bool {
while (this.result_queue.pop()) |result| {
defer result.deinit();
this.processResult(result);
}
}
pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: HTTPClient.CertificateInfo, result: *HTTPClientQueueResult) bool {
if (this.check_server_identity.get()) |check_server_identity| {
check_server_identity.ensureStillAlive();
if (certificate_info.cert.len > 0) {
@@ -1036,7 +1103,7 @@ pub const Fetch = struct {
// if check failed abort the request
if (check_result.isAnyError()) {
// mark to wait until deinit
this.is_waiting_abort = this.result.has_more;
this.is_waiting_abort = result.has_more;
check_result.ensureStillAlive();
check_result.protect();
@@ -1048,17 +1115,17 @@ pub const Fetch = struct {
if (this.http != null) {
HTTPClient.http_thread.scheduleShutdown(this.http.?);
}
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
return false;
}
return true;
}
}
}
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
return false;
}
pub fn onReject(this: *FetchTasklet) JSValue {
pub fn onReject(this: *FetchTasklet, result: *HTTPClientQueueResult) JSValue {
log("onReject", .{});
if (this.signal) |signal| {
@@ -1070,12 +1137,12 @@ pub const Fetch = struct {
return this.abort_reason;
}
if (this.result.isTimeout()) {
if (result.isTimeout()) {
// Timeout without reason
return JSC.WebCore.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this);
}
if (this.result.isAbort()) {
if (result.isAbort()) {
// Abort without reason
return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
}
@@ -1092,8 +1159,8 @@ pub const Fetch = struct {
}
const fetch_error = JSC.SystemError{
.code = bun.String.static(@errorName(this.result.fail)),
.message = switch (this.result.fail) {
.code = bun.String.static(@errorName(result.fail)),
.message = switch (result.fail) {
error.ConnectionClosed => bun.String.static("The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()"),
error.FailedToOpenSocket => bun.String.static("Was there a typo in the url or port?"),
error.TooManyRedirects => bun.String.static("The response redirected too many times. For more information, pass `verbose: true` in the second argument to fetch()"),
@@ -1122,14 +1189,10 @@ pub const Fetch = struct {
};
}
this.mutex.lock();
defer this.mutex.unlock();
const size_hint = this.getSizeHint();
var scheduled_response_buffer = this.scheduled_response_buffer.list;
var first_packed_buffer = this.scheduled_response_buffer.list;
// This means we have received part of the body but not the whole thing
if (scheduled_response_buffer.items.len > 0) {
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
if (first_packed_buffer.items.len > 0) {
this.memory_reporter.discard(first_packed_buffer.allocatedSlice());
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.list = .{
@@ -1140,84 +1203,62 @@ pub const Fetch = struct {
return .{
.owned = .{
.list = scheduled_response_buffer.toManaged(bun.default_allocator),
.size_hint = size_hint,
.list = first_packed_buffer.toManaged(bun.default_allocator),
.size_hint = this.size_hint,
},
};
}
return .{
.estimated_size = size_hint,
.estimated_size = this.size_hint,
};
}
fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
return switch (this.body_size) {
.content_length => @truncate(this.body_size.content_length),
.total_received => @truncate(this.body_size.total_received),
else => 0,
};
}
fn toBodyValue(this: *FetchTasklet) Body.Value {
fn toBodyValue(this: *FetchTasklet, result: *HTTPClientQueueResult) Body.Value {
if (this.is_waiting_body) {
const response = Body.Value{
.Locked = .{
.size_hint = this.getSizeHint(),
.size_hint = result.getSizeHint(),
.task = this,
.global = this.global_this,
.onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback,
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
},
};
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
return response;
}
var scheduled_response_buffer = this.scheduled_response_buffer.list;
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
const response = Body.Value{
.InternalBlob = .{
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
},
};
this.scheduled_response_buffer = .{
.allocator = this.memory_reporter.allocator(),
.list = .{
.items = &.{},
.capacity = 0,
},
};
return response;
return this.getFullResponseBodyValue(result);
}
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator, result: *HTTPClientQueueResult) Response {
log("toResponse", .{});
std.debug.assert(this.metadata != null);
// at this point we always should have metadata
var metadata = this.metadata.?;
const http_response = metadata.response;
this.is_waiting_body = this.result.has_more;
this.is_waiting_body = result.has_more;
return Response{
.allocator = allocator,
.url = bun.String.createAtomIfPossible(metadata.url),
.status_text = bun.String.createAtomIfPossible(http_response.status),
.redirected = this.result.redirected,
.redirected = result.redirected,
.body = .{
.init = .{
.headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
.status_code = @as(u16, @truncate(http_response.status_code)),
},
.value = this.toBodyValue(),
.value = this.toBodyValue(result),
},
};
}
pub fn onResolve(this: *FetchTasklet) JSValue {
pub fn onResolve(this: *FetchTasklet, result: *HTTPClientQueueResult) JSValue {
log("onResolve", .{});
const allocator = bun.default_allocator;
var response = allocator.create(Response) catch unreachable;
response.* = this.toResponse(allocator);
response.* = this.toResponse(allocator, result);
const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response);
response_js.ensureStillAlive();
this.response = JSC.Strong.create(response_js, this.global_this);
@@ -1234,8 +1275,8 @@ pub const Fetch = struct {
var fetch_tasklet = try allocator.create(FetchTasklet);
fetch_tasklet.* = .{
.mutex = Mutex.init(),
.scheduled_response_buffer = .{
.size_hint = 0,
.scheduled_response_buffer = MutableString{
.allocator = fetch_options.memory_reporter.allocator(),
.list = .{
.items = &.{},
@@ -1339,11 +1380,13 @@ pub const Fetch = struct {
pub fn abortListener(this: *FetchTasklet, reason: JSValue) void {
log("abortListener", .{});
const globalThis = this.global_this;
reason.ensureStillAlive();
this.abort_reason = reason;
reason.protect();
this.abort_reason = reason;
this.signal_store.aborted.store(true, .Monotonic);
this.tracker.didCancel(this.global_this);
this.tracker.didCancel(globalThis);
if (this.http != null) {
HTTPClient.http_thread.scheduleShutdown(this.http.?);
@@ -1396,10 +1439,7 @@ pub const Fetch = struct {
}
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
task.mutex.lock();
defer task.mutex.unlock();
log("callback success {} has_more {} bytes {}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len });
task.result = result;
// metadata should be provided only once so we preserve it until we consume it
if (result.metadata) |metadata| {
@@ -1407,23 +1447,24 @@ pub const Fetch = struct {
std.debug.assert(task.metadata == null);
task.metadata = metadata;
}
task.body_size = result.body_size;
const success = result.isSuccess();
task.response_buffer = result.body.?.*;
if (success) {
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
}
// reset for reuse
var item = bun.default_allocator.create(HTTPClientQueueResult) catch @panic("OOM");
item.* = .{
.body_size = result.body_size,
.has_more = result.has_more,
.redirected = result.redirected,
.fail = result.fail,
.certificate_info = result.certificate_info,
.body = MutableString.initCopy(bun.default_allocator, result.body.?.*.list.items) catch @panic("OOM"),
};
task.size_hint = item.getSizeHint();
const is_empty = task.result_queue.isEmpty();
task.result_queue.push(item);
task.response_buffer.reset();
if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| {
if (has_schedule_callback) {
return;
}
if (is_empty) {
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
};
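
The producer/consumer pattern introduced above replaces the old mutex plus `has_schedule_callback` flag: in `callback`, the HTTP thread copies each result into an `HTTPClientQueueResult`, pushes it onto the `UnboundedQueue`, and enqueues a concurrent task only when the queue was empty before the push; in `onProgressUpdate`, the JS thread then drains every queued result in a single tick via `processResult`. Below is a minimal, self-contained sketch of that scheduling discipline. It is illustrative only: `Result`, `ResultQueue`, `onHttpResult`, `drain`, and the mutex-guarded FIFO standing in for Bun's lock-free `UnboundedQueue` are hypothetical simplifications, not Bun's actual types.

const std = @import("std");

// Hypothetical stand-in for HTTPClientQueueResult: an intrusive queue node.
const Result = struct {
    body: []const u8,
    has_more: bool,
    next: ?*Result = null,
};

// Mutex-guarded intrusive FIFO, used here only to keep the sketch short;
// the real code uses a lock-free UnboundedQueue with the same contract.
const ResultQueue = struct {
    head: ?*Result = null,
    tail: ?*Result = null,
    lock: std.Thread.Mutex = .{},

    /// Returns true if the queue was empty before this push, i.e. the
    /// producer is responsible for scheduling exactly one drain task.
    fn push(self: *ResultQueue, item: *Result) bool {
        self.lock.lock();
        defer self.lock.unlock();
        const was_empty = self.head == null;
        if (self.tail) |tail| tail.next = item else self.head = item;
        self.tail = item;
        return was_empty;
    }

    fn pop(self: *ResultQueue) ?*Result {
        self.lock.lock();
        defer self.lock.unlock();
        const item = self.head orelse return null;
        self.head = item.next;
        if (self.head == null) self.tail = null;
        item.next = null;
        return item;
    }
};

// Producer side (HTTP thread): push, then schedule only on the
// empty -> non-empty transition, mirroring `callback` above.
fn onHttpResult(queue: *ResultQueue, item: *Result, scheduleDrain: *const fn () void) void {
    if (queue.push(item)) scheduleDrain();
}

// Consumer side (JS thread): drain everything in one tick,
// mirroring the loop in `onProgressUpdate`.
fn drain(queue: *ResultQueue, processResult: *const fn (*Result) void) void {
    while (queue.pop()) |result| processResult(result);
}

test "only the first push of a batch schedules a drain" {
    var queue = ResultQueue{};
    var a = Result{ .body = "chunk-1", .has_more = true };
    var b = Result{ .body = "chunk-2", .has_more = false };
    try std.testing.expect(queue.push(&a)); // was empty: producer schedules
    try std.testing.expect(!queue.push(&b)); // a drain is already scheduled
    var drained: usize = 0;
    while (queue.pop()) |_| drained += 1;
    try std.testing.expectEqual(@as(usize, 2), drained);
}

Because only the empty-to-non-empty transition schedules work, at most one drain task is in flight per batch of results, which is what the "consume multiple, only 1 task per tick" commit refers to.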

View File

@@ -1324,10 +1324,6 @@ pub const InternalState = struct {
reader.deinit();
}
// if we are holding a cloned_metadata we need to deinit it
// this should never happen because we should always return the metadata to the user
std.debug.assert(this.cloned_metadata == null);
// just in case we check and free to avoid leaks
if (this.cloned_metadata != null) {
this.cloned_metadata.?.deinit(allocator);
this.cloned_metadata = null;
@@ -2865,10 +2861,6 @@ fn fail(this: *HTTPClient, err: anyerror) void {
if (this.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
this.state.reset(this.allocator);
this.proxy_tunneling = false;
this.state.request_stage = .fail;
this.state.response_stage = .fail;
this.state.fail = err;
@@ -2876,6 +2868,9 @@ fn fail(this: *HTTPClient, err: anyerror) void {
const callback = this.result_callback;
const result = this.toResult();
this.state.reset(this.allocator);
this.proxy_tunneling = false;
callback.run(result);
}
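
In this second file, `fail` now calls `toResult()` before `this.state.reset(this.allocator)` and `this.proxy_tunneling = false`, so the result is captured while the state that owns its buffers is still intact. The snippet below is a small, hypothetical sketch of that snapshot-before-reset ordering, assuming a `State` that owns a heap buffer; none of these names are Bun's.

const std = @import("std");

// Hypothetical state that owns a heap buffer, loosely mirroring InternalState.
const State = struct {
    body: std.ArrayList(u8),

    fn reset(self: *State) void {
        // Frees the buffer; any slice taken from it earlier would now dangle.
        self.body.clearAndFree();
    }
};

// Snapshot the bytes the caller needs *before* resetting the state that owns
// them; resetting first would hand the callback a freed slice.
fn finish(state: *State, allocator: std.mem.Allocator) ![]u8 {
    const snapshot = try allocator.dupe(u8, state.body.items);
    state.reset();
    return snapshot;
}

test "snapshot survives reset" {
    var state = State{ .body = std.ArrayList(u8).init(std.testing.allocator) };
    try state.body.appendSlice("partial body");
    const out = try finish(&state, std.testing.allocator);
    defer std.testing.allocator.free(out);
    try std.testing.expectEqualStrings("partial body", out);
}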