mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 23:18:47 +00:00
Compare commits
11 Commits
claude/fix
...
ciro/queue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac48476aa6 | ||
|
|
a5a4d51788 | ||
|
|
7ff0530925 | ||
|
|
2c88a1138a | ||
|
|
0a125c514d | ||
|
|
29c5645f50 | ||
|
|
e9db9f32b1 | ||
|
|
3260fdd790 | ||
|
|
ab0cb34cf0 | ||
|
|
78b31e588a | ||
|
|
2332bb47b2 |
@@ -618,18 +618,67 @@ pub const Fetch = struct {
|
||||
}
|
||||
}
|
||||
|
||||
const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue;
|
||||
|
||||
pub const HTTPClientQueueResult = struct {
|
||||
body: MutableString,
|
||||
has_more: bool,
|
||||
redirected: bool,
|
||||
fail: anyerror,
|
||||
certificate_info: ?HTTPClient.CertificateInfo,
|
||||
/// For Http Client requests
|
||||
/// when Content-Length is provided this represents the whole size of the request
|
||||
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
|
||||
/// If is not chunked encoded and Content-Length is not provided this will be unknown
|
||||
body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
|
||||
|
||||
next: ?*HTTPClientQueueResult = null,
|
||||
|
||||
pub fn isSuccess(this: *HTTPClientQueueResult) bool {
|
||||
return this.fail == error.NoError;
|
||||
}
|
||||
|
||||
pub fn isTimeout(this: *HTTPClientQueueResult) bool {
|
||||
return this.fail == error.Timeout;
|
||||
}
|
||||
|
||||
pub fn isAbort(this: *HTTPClientQueueResult) bool {
|
||||
return this.fail == error.Aborted;
|
||||
}
|
||||
|
||||
fn getSizeHint(this: *HTTPClientQueueResult) Blob.SizeType {
|
||||
return switch (this.body_size) {
|
||||
.content_length => @truncate(this.body_size.content_length),
|
||||
.total_received => @truncate(this.body_size.total_received),
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(this: *HTTPClientQueueResult) void {
|
||||
this.body.deinit();
|
||||
if (this.certificate_info) |certificate_info| {
|
||||
defer certificate_info.deinit(bun.default_allocator);
|
||||
}
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
};
|
||||
|
||||
const HTTPClientResultQueue = UnboundedQueue(HTTPClientQueueResult, .next);
|
||||
|
||||
pub const FetchTasklet = struct {
|
||||
const log = Output.scoped(.FetchTasklet, false);
|
||||
|
||||
http: ?*HTTPClient.AsyncHTTP = null,
|
||||
result: HTTPClient.HTTPClientResult = .{},
|
||||
|
||||
result_queue: HTTPClientResultQueue = HTTPClientResultQueue{},
|
||||
|
||||
metadata: ?HTTPClient.HTTPResponseMetadata = null,
|
||||
javascript_vm: *VirtualMachine = undefined,
|
||||
global_this: *JSGlobalObject = undefined,
|
||||
request_body: HTTPRequestBody = undefined,
|
||||
/// buffer being used by AsyncHTTP
|
||||
response_buffer: MutableString = undefined,
|
||||
/// buffer used to stream response to JS
|
||||
// all shedule buffers are stored here when not streaming
|
||||
scheduled_response_buffer: MutableString = undefined,
|
||||
/// response strong ref
|
||||
response: JSC.Strong = .{},
|
||||
@@ -640,12 +689,7 @@ pub const Fetch = struct {
|
||||
concurrent_task: JSC.ConcurrentTask = .{},
|
||||
poll_ref: JSC.PollRef = .{},
|
||||
memory_reporter: *JSC.MemoryReportingAllocator,
|
||||
/// For Http Client requests
|
||||
/// when Content-Length is provided this represents the whole size of the request
|
||||
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
|
||||
/// If is not chunked encoded and Content-Length is not provided this will be unknown
|
||||
body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
|
||||
|
||||
size_hint: Blob.SizeType,
|
||||
/// This is url + proxy memory buffer and is owned by FetchTasklet
|
||||
/// We always clone url and proxy (if informed)
|
||||
url_proxy_buffer: []const u8 = "",
|
||||
@@ -653,7 +697,6 @@ pub const Fetch = struct {
|
||||
signal: ?*JSC.WebCore.AbortSignal = null,
|
||||
signals: HTTPClient.Signals = .{},
|
||||
signal_store: HTTPClient.Signals.Store = .{},
|
||||
has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
|
||||
|
||||
// must be stored because AbortSignal stores reason weakly
|
||||
abort_reason: JSValue = JSValue.zero,
|
||||
@@ -665,7 +708,6 @@ pub const Fetch = struct {
|
||||
hostname: ?[]u8 = null,
|
||||
is_waiting_body: bool = false,
|
||||
is_waiting_abort: bool = false,
|
||||
mutex: Mutex,
|
||||
|
||||
tracker: JSC.AsyncTaskTracker,
|
||||
|
||||
@@ -704,6 +746,59 @@ pub const Fetch = struct {
|
||||
return FetchTasklet{};
|
||||
}
|
||||
|
||||
fn getFullResponseBodyValue(this: *FetchTasklet, result: *HTTPClientQueueResult) Body.Value {
|
||||
var body_value: Body.Value = undefined;
|
||||
if (this.scheduled_response_buffer.list.items.len > 0) {
|
||||
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
|
||||
this.memory_reporter.discard(this.scheduled_response_buffer.list.allocatedSlice());
|
||||
body_value = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = this.scheduled_response_buffer.list.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
} else {
|
||||
body_value = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = result.body.list.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
result.body = MutableString{
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
return body_value;
|
||||
}
|
||||
|
||||
fn writeChunkToReadable(readable: JSC.WebCore.ReadableStream, chunk: []const u8, comptime has_more: bool) void {
|
||||
if (has_more) {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary = bun.ByteList.initConst(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
} else {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary_and_done = bun.ByteList.initConst(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn clearData(this: *FetchTasklet) void {
|
||||
log("clearData", .{});
|
||||
const allocator = this.memory_reporter.allocator();
|
||||
@@ -731,10 +826,10 @@ pub const Fetch = struct {
|
||||
}
|
||||
|
||||
this.response_buffer.deinit();
|
||||
this.scheduled_response_buffer.deinit();
|
||||
this.response.deinit();
|
||||
this.readable_stream_ref.deinit();
|
||||
|
||||
this.scheduled_response_buffer.deinit();
|
||||
this.request_body.detach();
|
||||
|
||||
if (this.abort_reason != .zero)
|
||||
@@ -759,14 +854,11 @@ pub const Fetch = struct {
|
||||
bun.default_allocator.destroy(reporter);
|
||||
}
|
||||
|
||||
pub fn onBodyReceived(this: *FetchTasklet) void {
|
||||
this.mutex.lock();
|
||||
const success = this.result.isSuccess();
|
||||
pub fn onBodyReceived(this: *FetchTasklet, result: *HTTPClientQueueResult) void {
|
||||
const success = result.isSuccess();
|
||||
const globalThis = this.global_this;
|
||||
const is_done = !success or !this.result.has_more;
|
||||
const is_done = !success or !result.has_more;
|
||||
defer {
|
||||
this.has_schedule_callback.store(false, .Monotonic);
|
||||
this.mutex.unlock();
|
||||
if (is_done) {
|
||||
var vm = globalThis.bunVM();
|
||||
this.poll_ref.unref(vm);
|
||||
@@ -776,8 +868,17 @@ pub const Fetch = struct {
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
const err = this.onReject();
|
||||
const err = this.onReject(result);
|
||||
err.ensureStillAlive();
|
||||
if (this.readable_stream_ref.get()) |readable| {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.err = .{ .JSValue = err },
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (this.response.get()) |response_js| {
|
||||
if (response_js.as(Response)) |response| {
|
||||
const body = response.body;
|
||||
@@ -804,30 +905,18 @@ pub const Fetch = struct {
|
||||
|
||||
if (this.readable_stream_ref.get()) |readable| {
|
||||
if (readable.ptr == .Bytes) {
|
||||
readable.ptr.Bytes.size_hint = this.getSizeHint();
|
||||
// body can be marked as used but we still need to pipe the data
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
|
||||
const chunk = scheduled_response_buffer.items;
|
||||
|
||||
if (this.result.has_more) {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary = bun.ByteList.initConst(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
|
||||
// clean for reuse later
|
||||
readable.ptr.Bytes.size_hint = result.getSizeHint();
|
||||
if (this.scheduled_response_buffer.list.items.len > 0) {
|
||||
writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true);
|
||||
this.scheduled_response_buffer.reset();
|
||||
} else {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary_and_done = bun.ByteList.initConst(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
}
|
||||
|
||||
if (result.has_more) {
|
||||
writeChunkToReadable(readable, result.body.list.items, true);
|
||||
} else {
|
||||
writeChunkToReadable(readable, result.body.list.items, false);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -838,57 +927,34 @@ pub const Fetch = struct {
|
||||
if (body.value == .Locked) {
|
||||
if (body.value.Locked.readable) |readable| {
|
||||
if (readable.ptr == .Bytes) {
|
||||
readable.ptr.Bytes.size_hint = this.getSizeHint();
|
||||
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
|
||||
const chunk = scheduled_response_buffer.items;
|
||||
|
||||
if (this.result.has_more) {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary = bun.ByteList.initConst(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
|
||||
// clean for reuse later
|
||||
readable.ptr.Bytes.size_hint = result.getSizeHint();
|
||||
if (this.scheduled_response_buffer.list.items.len > 0) {
|
||||
writeChunkToReadable(readable, this.scheduled_response_buffer.list.items, true);
|
||||
this.scheduled_response_buffer.reset();
|
||||
}
|
||||
|
||||
if (result.has_more) {
|
||||
writeChunkToReadable(readable, result.body.list.items, true);
|
||||
} else {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary_and_done = bun.ByteList.initConst(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
writeChunkToReadable(readable, result.body.list.items, false);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
response.body.value.Locked.size_hint = this.getSizeHint();
|
||||
response.body.value.Locked.size_hint = result.getSizeHint();
|
||||
}
|
||||
|
||||
// we will reach here when not streaming
|
||||
if (!this.result.has_more) {
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
|
||||
if (result.has_more) {
|
||||
// buffer more data
|
||||
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
|
||||
} else {
|
||||
|
||||
// done resolve body
|
||||
var old = body.value;
|
||||
var body_value = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
response.body.value = body_value;
|
||||
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
response.body.value = this.getFullResponseBodyValue(result);
|
||||
|
||||
if (old == .Locked) {
|
||||
old.resolve(&response.body.value, this.global_this);
|
||||
@@ -899,25 +965,19 @@ pub const Fetch = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onProgressUpdate(this: *FetchTasklet) void {
|
||||
JSC.markBinding(@src());
|
||||
log("onProgressUpdate", .{});
|
||||
pub fn processResult(this: *FetchTasklet, result: *HTTPClientQueueResult) void {
|
||||
if (this.is_waiting_body) {
|
||||
return this.onBodyReceived();
|
||||
return this.onBodyReceived(result);
|
||||
}
|
||||
// if we abort because of cert error
|
||||
// we wait the Http Client because we already have the response
|
||||
// we just need to deinit
|
||||
const globalThis = this.global_this;
|
||||
this.mutex.lock();
|
||||
|
||||
if (this.is_waiting_abort) {
|
||||
// has_more will be false when the request is aborted/finished
|
||||
if (this.result.has_more) {
|
||||
this.mutex.unlock();
|
||||
if (result.has_more) {
|
||||
return;
|
||||
}
|
||||
this.mutex.unlock();
|
||||
var poll_ref = this.poll_ref;
|
||||
var vm = globalThis.bunVM();
|
||||
|
||||
@@ -936,35 +996,31 @@ pub const Fetch = struct {
|
||||
if (promise_value.isEmptyOrUndefinedOrNull()) {
|
||||
log("onProgressUpdate: promise_value is null", .{});
|
||||
ref.strong.deinit();
|
||||
this.has_schedule_callback.store(false, .Monotonic);
|
||||
this.mutex.unlock();
|
||||
poll_ref.unref(vm);
|
||||
this.clearData();
|
||||
this.deinit();
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.result.certificate_info) |certificate_info| {
|
||||
this.result.certificate_info = null;
|
||||
if (result.certificate_info) |certificate_info| {
|
||||
result.certificate_info = null;
|
||||
defer certificate_info.deinit(bun.default_allocator);
|
||||
|
||||
// we receive some error
|
||||
if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) {
|
||||
if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info, result)) {
|
||||
log("onProgressUpdate: aborted due certError", .{});
|
||||
// we need to abort the request
|
||||
const promise = promise_value.asAnyPromise().?;
|
||||
const tracker = this.tracker;
|
||||
const result = this.onReject();
|
||||
const js_result = this.onReject(result);
|
||||
|
||||
result.ensureStillAlive();
|
||||
js_result.ensureStillAlive();
|
||||
promise_value.ensureStillAlive();
|
||||
|
||||
promise.reject(globalThis, result);
|
||||
promise.reject(globalThis, js_result);
|
||||
|
||||
tracker.didDispatch(globalThis);
|
||||
ref.strong.deinit();
|
||||
this.has_schedule_callback.store(false, .Monotonic);
|
||||
this.mutex.unlock();
|
||||
if (this.is_waiting_abort) {
|
||||
return;
|
||||
}
|
||||
@@ -977,48 +1033,59 @@ pub const Fetch = struct {
|
||||
// everything ok
|
||||
if (this.metadata == null) {
|
||||
log("onProgressUpdate: metadata is null", .{});
|
||||
this.has_schedule_callback.store(false, .Monotonic);
|
||||
// cannot continue without metadata
|
||||
this.mutex.unlock();
|
||||
if (result.body.list.items.len > 0) {
|
||||
// looks like we have some data to save until next time
|
||||
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const promise = promise_value.asAnyPromise().?;
|
||||
const tracker = this.tracker;
|
||||
tracker.willDispatch(globalThis);
|
||||
defer {
|
||||
log("onProgressUpdate: promise_value is not null", .{});
|
||||
tracker.didDispatch(globalThis);
|
||||
ref.strong.deinit();
|
||||
this.has_schedule_callback.store(false, .Monotonic);
|
||||
this.mutex.unlock();
|
||||
if (!this.is_waiting_body) {
|
||||
poll_ref.unref(vm);
|
||||
this.clearData();
|
||||
this.deinit();
|
||||
}
|
||||
}
|
||||
const success = this.result.isSuccess();
|
||||
const result = switch (success) {
|
||||
true => this.onResolve(),
|
||||
false => this.onReject(),
|
||||
const success = result.isSuccess();
|
||||
|
||||
const js_result = switch (success) {
|
||||
true => this.onResolve(result),
|
||||
false => this.onReject(result),
|
||||
};
|
||||
result.ensureStillAlive();
|
||||
|
||||
promise_value.ensureStillAlive();
|
||||
const promise = promise_value.asAnyPromise() orelse return;
|
||||
|
||||
js_result.ensureStillAlive();
|
||||
|
||||
switch (success) {
|
||||
true => {
|
||||
promise.resolve(globalThis, result);
|
||||
promise.resolve(globalThis, js_result);
|
||||
},
|
||||
false => {
|
||||
promise.reject(globalThis, result);
|
||||
promise.reject(globalThis, js_result);
|
||||
},
|
||||
}
|
||||
}
|
||||
pub fn onProgressUpdate(this: *FetchTasklet) void {
|
||||
JSC.markBinding(@src());
|
||||
log("onProgressUpdate", .{});
|
||||
|
||||
pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: HTTPClient.CertificateInfo) bool {
|
||||
while (this.result_queue.pop()) |result| {
|
||||
defer result.deinit();
|
||||
this.processResult(result);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: HTTPClient.CertificateInfo, result: *HTTPClientQueueResult) bool {
|
||||
if (this.check_server_identity.get()) |check_server_identity| {
|
||||
check_server_identity.ensureStillAlive();
|
||||
if (certificate_info.cert.len > 0) {
|
||||
@@ -1036,7 +1103,7 @@ pub const Fetch = struct {
|
||||
// if check failed abort the request
|
||||
if (check_result.isAnyError()) {
|
||||
// mark to wait until deinit
|
||||
this.is_waiting_abort = this.result.has_more;
|
||||
this.is_waiting_abort = result.has_more;
|
||||
|
||||
check_result.ensureStillAlive();
|
||||
check_result.protect();
|
||||
@@ -1048,17 +1115,17 @@ pub const Fetch = struct {
|
||||
if (this.http != null) {
|
||||
HTTPClient.http_thread.scheduleShutdown(this.http.?);
|
||||
}
|
||||
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
return false;
|
||||
}
|
||||
pub fn onReject(this: *FetchTasklet) JSValue {
|
||||
pub fn onReject(this: *FetchTasklet, result: *HTTPClientQueueResult) JSValue {
|
||||
log("onReject", .{});
|
||||
|
||||
if (this.signal) |signal| {
|
||||
@@ -1070,12 +1137,12 @@ pub const Fetch = struct {
|
||||
return this.abort_reason;
|
||||
}
|
||||
|
||||
if (this.result.isTimeout()) {
|
||||
if (result.isTimeout()) {
|
||||
// Timeout without reason
|
||||
return JSC.WebCore.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this);
|
||||
}
|
||||
|
||||
if (this.result.isAbort()) {
|
||||
if (result.isAbort()) {
|
||||
// Abort without reason
|
||||
return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
|
||||
}
|
||||
@@ -1092,8 +1159,8 @@ pub const Fetch = struct {
|
||||
}
|
||||
|
||||
const fetch_error = JSC.SystemError{
|
||||
.code = bun.String.static(@errorName(this.result.fail)),
|
||||
.message = switch (this.result.fail) {
|
||||
.code = bun.String.static(@errorName(result.fail)),
|
||||
.message = switch (result.fail) {
|
||||
error.ConnectionClosed => bun.String.static("The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()"),
|
||||
error.FailedToOpenSocket => bun.String.static("Was there a typo in the url or port?"),
|
||||
error.TooManyRedirects => bun.String.static("The response redirected too many times. For more information, pass `verbose: true` in the second argument to fetch()"),
|
||||
@@ -1122,14 +1189,10 @@ pub const Fetch = struct {
|
||||
};
|
||||
}
|
||||
|
||||
this.mutex.lock();
|
||||
defer this.mutex.unlock();
|
||||
const size_hint = this.getSizeHint();
|
||||
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
var first_packed_buffer = this.scheduled_response_buffer.list;
|
||||
// This means we have received part of the body but not the whole thing
|
||||
if (scheduled_response_buffer.items.len > 0) {
|
||||
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
|
||||
if (first_packed_buffer.items.len > 0) {
|
||||
this.memory_reporter.discard(first_packed_buffer.allocatedSlice());
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.list = .{
|
||||
@@ -1140,84 +1203,62 @@ pub const Fetch = struct {
|
||||
|
||||
return .{
|
||||
.owned = .{
|
||||
.list = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
.size_hint = size_hint,
|
||||
.list = first_packed_buffer.toManaged(bun.default_allocator),
|
||||
.size_hint = this.size_hint,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return .{
|
||||
.estimated_size = size_hint,
|
||||
.estimated_size = this.size_hint,
|
||||
};
|
||||
}
|
||||
|
||||
fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
|
||||
return switch (this.body_size) {
|
||||
.content_length => @truncate(this.body_size.content_length),
|
||||
.total_received => @truncate(this.body_size.total_received),
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
|
||||
fn toBodyValue(this: *FetchTasklet) Body.Value {
|
||||
fn toBodyValue(this: *FetchTasklet, result: *HTTPClientQueueResult) Body.Value {
|
||||
if (this.is_waiting_body) {
|
||||
const response = Body.Value{
|
||||
.Locked = .{
|
||||
.size_hint = this.getSizeHint(),
|
||||
.size_hint = result.getSizeHint(),
|
||||
.task = this,
|
||||
.global = this.global_this,
|
||||
.onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback,
|
||||
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
|
||||
},
|
||||
};
|
||||
_ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
|
||||
return response;
|
||||
}
|
||||
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice());
|
||||
const response = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = this.memory_reporter.allocator(),
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
|
||||
return response;
|
||||
return this.getFullResponseBodyValue(result);
|
||||
}
|
||||
|
||||
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
|
||||
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator, result: *HTTPClientQueueResult) Response {
|
||||
log("toResponse", .{});
|
||||
std.debug.assert(this.metadata != null);
|
||||
// at this point we always should have metadata
|
||||
var metadata = this.metadata.?;
|
||||
const http_response = metadata.response;
|
||||
this.is_waiting_body = this.result.has_more;
|
||||
this.is_waiting_body = result.has_more;
|
||||
return Response{
|
||||
.allocator = allocator,
|
||||
.url = bun.String.createAtomIfPossible(metadata.url),
|
||||
.status_text = bun.String.createAtomIfPossible(http_response.status),
|
||||
.redirected = this.result.redirected,
|
||||
.redirected = result.redirected,
|
||||
.body = .{
|
||||
.init = .{
|
||||
.headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
|
||||
.status_code = @as(u16, @truncate(http_response.status_code)),
|
||||
},
|
||||
.value = this.toBodyValue(),
|
||||
.value = this.toBodyValue(result),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn onResolve(this: *FetchTasklet) JSValue {
|
||||
pub fn onResolve(this: *FetchTasklet, result: *HTTPClientQueueResult) JSValue {
|
||||
log("onResolve", .{});
|
||||
const allocator = bun.default_allocator;
|
||||
var response = allocator.create(Response) catch unreachable;
|
||||
response.* = this.toResponse(allocator);
|
||||
response.* = this.toResponse(allocator, result);
|
||||
const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response);
|
||||
response_js.ensureStillAlive();
|
||||
this.response = JSC.Strong.create(response_js, this.global_this);
|
||||
@@ -1234,8 +1275,8 @@ pub const Fetch = struct {
|
||||
var fetch_tasklet = try allocator.create(FetchTasklet);
|
||||
|
||||
fetch_tasklet.* = .{
|
||||
.mutex = Mutex.init(),
|
||||
.scheduled_response_buffer = .{
|
||||
.size_hint = 0,
|
||||
.scheduled_response_buffer = MutableString{
|
||||
.allocator = fetch_options.memory_reporter.allocator(),
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
@@ -1339,11 +1380,13 @@ pub const Fetch = struct {
|
||||
|
||||
pub fn abortListener(this: *FetchTasklet, reason: JSValue) void {
|
||||
log("abortListener", .{});
|
||||
const globalThis = this.global_this;
|
||||
reason.ensureStillAlive();
|
||||
this.abort_reason = reason;
|
||||
reason.protect();
|
||||
this.abort_reason = reason;
|
||||
|
||||
this.signal_store.aborted.store(true, .Monotonic);
|
||||
this.tracker.didCancel(this.global_this);
|
||||
this.tracker.didCancel(globalThis);
|
||||
|
||||
if (this.http != null) {
|
||||
HTTPClient.http_thread.scheduleShutdown(this.http.?);
|
||||
@@ -1396,10 +1439,7 @@ pub const Fetch = struct {
|
||||
}
|
||||
|
||||
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
|
||||
task.mutex.lock();
|
||||
defer task.mutex.unlock();
|
||||
log("callback success {} has_more {} bytes {}", .{ result.isSuccess(), result.has_more, result.body.?.list.items.len });
|
||||
task.result = result;
|
||||
|
||||
// metadata should be provided only once so we preserve it until we consume it
|
||||
if (result.metadata) |metadata| {
|
||||
@@ -1407,23 +1447,24 @@ pub const Fetch = struct {
|
||||
std.debug.assert(task.metadata == null);
|
||||
task.metadata = metadata;
|
||||
}
|
||||
task.body_size = result.body_size;
|
||||
|
||||
const success = result.isSuccess();
|
||||
task.response_buffer = result.body.?.*;
|
||||
if (success) {
|
||||
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
|
||||
}
|
||||
// reset for reuse
|
||||
var item = bun.default_allocator.create(HTTPClientQueueResult) catch @panic("OOM");
|
||||
item.* = .{
|
||||
.body_size = result.body_size,
|
||||
.has_more = result.has_more,
|
||||
.redirected = result.redirected,
|
||||
.fail = result.fail,
|
||||
.certificate_info = result.certificate_info,
|
||||
.body = MutableString.initCopy(bun.default_allocator, result.body.?.*.list.items) catch @panic("OOM"),
|
||||
};
|
||||
task.size_hint = item.getSizeHint();
|
||||
const is_empty = task.result_queue.isEmpty();
|
||||
task.result_queue.push(item);
|
||||
task.response_buffer.reset();
|
||||
|
||||
if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| {
|
||||
if (has_schedule_callback) {
|
||||
return;
|
||||
}
|
||||
if (is_empty) {
|
||||
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
|
||||
}
|
||||
|
||||
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -1324,10 +1324,6 @@ pub const InternalState = struct {
|
||||
reader.deinit();
|
||||
}
|
||||
|
||||
// if we are holding a cloned_metadata we need to deinit it
|
||||
// this should never happen because we should always return the metadata to the user
|
||||
std.debug.assert(this.cloned_metadata == null);
|
||||
// just in case we check and free to avoid leaks
|
||||
if (this.cloned_metadata != null) {
|
||||
this.cloned_metadata.?.deinit(allocator);
|
||||
this.cloned_metadata = null;
|
||||
@@ -2865,10 +2861,6 @@ fn fail(this: *HTTPClient, err: anyerror) void {
|
||||
if (this.signals.aborted != null) {
|
||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||
}
|
||||
|
||||
this.state.reset(this.allocator);
|
||||
this.proxy_tunneling = false;
|
||||
|
||||
this.state.request_stage = .fail;
|
||||
this.state.response_stage = .fail;
|
||||
this.state.fail = err;
|
||||
@@ -2876,6 +2868,9 @@ fn fail(this: *HTTPClient, err: anyerror) void {
|
||||
|
||||
const callback = this.result_callback;
|
||||
const result = this.toResult();
|
||||
this.state.reset(this.allocator);
|
||||
this.proxy_tunneling = false;
|
||||
|
||||
callback.run(result);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user