mirror of
https://github.com/oven-sh/bun
synced 2026-02-07 01:18:51 +00:00
Compare commits
16 Commits
dylan/pyth
...
ciro/fetch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bd677f861 | ||
|
|
5ffc79e1bb | ||
|
|
28dc3a17bc | ||
|
|
b233b42045 | ||
|
|
f478f84d62 | ||
|
|
b8c281a313 | ||
|
|
a46702194b | ||
|
|
d748af1df8 | ||
|
|
ccda3dbff9 | ||
|
|
feefe6a7d0 | ||
|
|
27d69e91a8 | ||
|
|
97704c5819 | ||
|
|
6bcca4dcdd | ||
|
|
1cef837b1a | ||
|
|
bf58c78065 | ||
|
|
9e23b1c4b4 |
@@ -196,7 +196,7 @@ pub fn tickQueueWithCount(this: *EventLoop, virtual_machine: *VirtualMachine, co
|
||||
},
|
||||
@field(Task.Tag, @typeName(FetchTasklet)) => {
|
||||
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
|
||||
try fetch_task.onProgressUpdate();
|
||||
try fetch_task.updateLifeCycle();
|
||||
},
|
||||
@field(Task.Tag, @typeName(S3HttpSimpleTask)) => {
|
||||
var s3_task: *S3HttpSimpleTask = task.get(S3HttpSimpleTask).?;
|
||||
|
||||
@@ -355,13 +355,13 @@ pub const ResumableSinkBackpressure = enum {
|
||||
backpressure,
|
||||
done,
|
||||
};
|
||||
pub const ResumableFetchSink = ResumableSink(jsc.Codegen.JSResumableFetchSink, FetchTasklet, FetchTasklet.writeRequestData, FetchTasklet.writeEndRequest);
|
||||
pub const ResumableFetchSink = ResumableSink(jsc.Codegen.JSResumableFetchSink, FetchRequest, FetchRequest.writeRequestData, FetchRequest.writeEndRequest);
|
||||
pub const ResumableS3UploadSink = ResumableSink(jsc.Codegen.JSResumableS3UploadSink, S3UploadStreamWrapper, S3UploadStreamWrapper.writeRequestData, S3UploadStreamWrapper.writeEndRequest);
|
||||
|
||||
extern fn Bun__assignStreamIntoResumableSink(globalThis: *jsc.JSGlobalObject, stream: jsc.JSValue, sink: jsc.JSValue) jsc.JSValue;
|
||||
|
||||
const std = @import("std");
|
||||
const FetchTasklet = @import("./fetch.zig").FetchTasklet;
|
||||
const FetchRequest = @import("./fetch/tasklet/Request.zig");
|
||||
const S3UploadStreamWrapper = @import("../../s3/client.zig").S3UploadStreamWrapper;
|
||||
|
||||
const bun = @import("bun");
|
||||
|
||||
@@ -58,7 +58,7 @@ pub const fetch_type_error_strings: JSTypeErrorEnum = brk: {
|
||||
break :brk errors;
|
||||
};
|
||||
|
||||
pub const FetchTasklet = @import("./fetch/FetchTasklet.zig").FetchTasklet;
|
||||
pub const FetchTasklet = @import("./fetch/FetchTasklet.zig");
|
||||
|
||||
fn dataURLResponse(
|
||||
_data_url: DataURL,
|
||||
@@ -197,7 +197,7 @@ pub fn Bun__fetch_(
|
||||
// which is important for FormData.
|
||||
// https://github.com/oven-sh/bun/issues/2264
|
||||
//
|
||||
var body: FetchTasklet.HTTPRequestBody = FetchTasklet.HTTPRequestBody.Empty;
|
||||
var body: HTTPRequestBody = HTTPRequestBody.Empty;
|
||||
|
||||
var disable_timeout = false;
|
||||
var disable_keepalive = false;
|
||||
@@ -733,7 +733,7 @@ pub fn Bun__fetch_(
|
||||
if (options_object) |options| {
|
||||
if (try options.fastGet(globalThis, .body)) |body__| {
|
||||
if (!body__.isUndefined()) {
|
||||
break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
|
||||
break :extract_body try HTTPRequestBody.fromJS(ctx, body__);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -751,30 +751,30 @@ pub fn Bun__fetch_(
|
||||
|
||||
if (bodyValue.* == .Locked) {
|
||||
if (req.getBodyReadableStream(globalThis)) |readable| {
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis) };
|
||||
break :extract_body HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis) };
|
||||
}
|
||||
if (bodyValue.Locked.readable.has()) {
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(bodyValue.Locked.readable.get(globalThis).?, globalThis) };
|
||||
break :extract_body HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(bodyValue.Locked.readable.get(globalThis).?, globalThis) };
|
||||
}
|
||||
const readable = try bodyValue.toReadableStream(globalThis);
|
||||
if (!readable.isEmptyOrUndefinedOrNull() and bodyValue.* == .Locked and bodyValue.Locked.readable.has()) {
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(bodyValue.Locked.readable.get(globalThis).?, globalThis) };
|
||||
break :extract_body HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(bodyValue.Locked.readable.get(globalThis).?, globalThis) };
|
||||
}
|
||||
}
|
||||
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .AnyBlob = bodyValue.useAsAnyBlob() };
|
||||
break :extract_body HTTPRequestBody{ .AnyBlob = bodyValue.useAsAnyBlob() };
|
||||
}
|
||||
|
||||
if (request_init_object) |req| {
|
||||
if (try req.fastGet(globalThis, .body)) |body__| {
|
||||
if (!body__.isUndefined()) {
|
||||
break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
|
||||
break :extract_body try HTTPRequestBody.fromJS(ctx, body__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break :extract_body null;
|
||||
} orelse FetchTasklet.HTTPRequestBody.Empty;
|
||||
} orelse HTTPRequestBody.Empty;
|
||||
|
||||
if (globalThis.hasException()) {
|
||||
is_error = true;
|
||||
@@ -1371,7 +1371,7 @@ pub fn Bun__fetch_(
|
||||
body.detach();
|
||||
} else {
|
||||
// These are single-use, and have effectively been moved to the FetchTasklet.
|
||||
body = FetchTasklet.HTTPRequestBody.Empty;
|
||||
body = HTTPRequestBody.Empty;
|
||||
}
|
||||
proxy = null;
|
||||
url_proxy_buffer = "";
|
||||
@@ -1425,3 +1425,4 @@ const Response = jsc.WebCore.Response;
|
||||
|
||||
const Blob = jsc.WebCore.Blob;
|
||||
const AnyBlob = jsc.WebCore.Blob.Any;
|
||||
const HTTPRequestBody = @import("fetch/tasklet/HTTPRequestBody.zig").HTTPRequestBody;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
98
src/bun.js/webcore/fetch/tasklet/HTTPRequestBody.zig
Normal file
98
src/bun.js/webcore/fetch/tasklet/HTTPRequestBody.zig
Normal file
@@ -0,0 +1,98 @@
|
||||
pub const HTTPRequestBody = union(enum) {
|
||||
AnyBlob: AnyBlob,
|
||||
Sendfile: http.SendFile,
|
||||
ReadableStream: jsc.WebCore.ReadableStream.Strong,
|
||||
|
||||
pub const Empty: HTTPRequestBody = .{ .AnyBlob = .{ .Blob = .{} } };
|
||||
|
||||
pub fn store(this: *HTTPRequestBody) ?*Blob.Store {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => this.AnyBlob.store(),
|
||||
else => null,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn slice(this: *const HTTPRequestBody) []const u8 {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => this.AnyBlob.slice(),
|
||||
else => "",
|
||||
};
|
||||
}
|
||||
|
||||
pub fn detach(this: *HTTPRequestBody) void {
|
||||
switch (this.*) {
|
||||
.AnyBlob => this.AnyBlob.detach(),
|
||||
.ReadableStream => |*stream| {
|
||||
stream.deinit();
|
||||
},
|
||||
.Sendfile => {
|
||||
if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0)
|
||||
this.Sendfile.fd.close();
|
||||
this.Sendfile.offset = 0;
|
||||
this.Sendfile.remain = 0;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody {
|
||||
var body_value = try Body.Value.fromJS(globalThis, value);
|
||||
if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) {
|
||||
return globalThis.ERR(.BODY_ALREADY_USED, "body already used", .{}).throw();
|
||||
}
|
||||
if (body_value == .Locked) {
|
||||
if (body_value.Locked.readable.has()) {
|
||||
// just grab the ref
|
||||
return HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
|
||||
}
|
||||
const readable = try body_value.toReadableStream(globalThis);
|
||||
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) {
|
||||
return HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
|
||||
}
|
||||
}
|
||||
return HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
|
||||
}
|
||||
|
||||
pub fn needsToReadFile(this: *HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |blob| blob.needsToReadFile(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn isS3(this: *const HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |*blob| blob.isS3(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasContentTypeFromUser(this: *HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |blob| blob.hasContentTypeFromUser(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getAnyBlob(this: *HTTPRequestBody) ?*AnyBlob {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => &this.AnyBlob,
|
||||
else => null,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasBody(this: *HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |blob| blob.size() > 0,
|
||||
.ReadableStream => |*stream| stream.has(),
|
||||
.Sendfile => true,
|
||||
};
|
||||
}
|
||||
};
|
||||
const bun = @import("bun");
|
||||
const http = bun.http;
|
||||
const jsc = bun.jsc;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const JSValue = jsc.JSValue;
|
||||
const Body = bun.webcore.Body;
|
||||
const AnyBlob = jsc.WebCore.Blob.Any;
|
||||
const Blob = bun.webcore.Blob;
|
||||
151
src/bun.js/webcore/fetch/tasklet/Request.zig
Normal file
151
src/bun.js/webcore/fetch/tasklet/Request.zig
Normal file
@@ -0,0 +1,151 @@
|
||||
const FetchTaskletRequest = @This();
|
||||
request_body: ?HTTPRequestBody = null,
|
||||
request_headers: Headers = Headers{ .allocator = undefined },
|
||||
sink: ?*ResumableSink = null,
|
||||
|
||||
// Custom Hostname
|
||||
hostname: ?[]u8 = null,
|
||||
/// This is url + proxy memory buffer and is owned by FetchTasklet
|
||||
/// We always clone url and proxy (if informed)
|
||||
url_proxy_buffer: []const u8 = "",
|
||||
|
||||
state: RequestState = .created,
|
||||
|
||||
is_waiting_request_stream_start: bool = false,
|
||||
const RequestState = enum {
|
||||
created,
|
||||
enqueued,
|
||||
// information_headers,
|
||||
headers_sent,
|
||||
sending_body, // can be sent with the headers or separately
|
||||
// sending_trailer_headers,
|
||||
failed,
|
||||
done,
|
||||
};
|
||||
|
||||
fn parent(this: *FetchTaskletRequest) *FetchTasklet {
|
||||
return @fieldParentPtr("request", this);
|
||||
}
|
||||
|
||||
pub fn startRequestStream(this: *FetchTaskletRequest) void {
|
||||
this.is_waiting_request_stream_start = false;
|
||||
bun.assert(this.request_body != null and this.request_body.? == .ReadableStream);
|
||||
const tasklet = this.parent();
|
||||
const globalThis = tasklet.global_this;
|
||||
if (this.request_body.?.ReadableStream.get(globalThis)) |stream| {
|
||||
if (tasklet.isAborted()) {
|
||||
stream.abort(globalThis);
|
||||
return;
|
||||
}
|
||||
|
||||
tasklet.ref(); // lets only unref when sink is done
|
||||
// +1 because the task refs the sink
|
||||
const sink = ResumableSink.initExactRefs(globalThis, stream, this, 2);
|
||||
this.sink = sink;
|
||||
}
|
||||
}
|
||||
pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableSinkBackpressure {
|
||||
log("writeRequestData {}", .{data.len});
|
||||
const tasklet = this.parent();
|
||||
if (tasklet.isAborted()) {
|
||||
return .done;
|
||||
}
|
||||
const thread_safe_stream_buffer = tasklet.shared.request_body_streaming_buffer orelse return .done;
|
||||
const stream_buffer = thread_safe_stream_buffer.acquire();
|
||||
defer thread_safe_stream_buffer.release();
|
||||
const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384;
|
||||
|
||||
var needs_schedule = false;
|
||||
defer if (needs_schedule) {
|
||||
// wakeup the http thread to write the data
|
||||
http.http_thread.scheduleRequestWrite(tasklet.http.?, .data);
|
||||
};
|
||||
|
||||
// dont have backpressure so we will schedule the data to be written
|
||||
// if we have backpressure the onWritable will drain the buffer
|
||||
needs_schedule = stream_buffer.isEmpty();
|
||||
if (tasklet.response.flags.upgraded_connection) {
|
||||
bun.handleOom(stream_buffer.write(data));
|
||||
} else {
|
||||
//16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n
|
||||
var formated_size_buffer: [18]u8 = undefined;
|
||||
const formated_size = std.fmt.bufPrint(
|
||||
formated_size_buffer[0..],
|
||||
"{x}\r\n",
|
||||
.{data.len},
|
||||
) catch |err| switch (err) {
|
||||
error.NoSpaceLeft => unreachable,
|
||||
};
|
||||
bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2));
|
||||
stream_buffer.writeAssumeCapacity(formated_size);
|
||||
stream_buffer.writeAssumeCapacity(data);
|
||||
stream_buffer.writeAssumeCapacity("\r\n");
|
||||
}
|
||||
|
||||
// pause the stream if we hit the high water mark
|
||||
return if (stream_buffer.size() >= highWaterMark) .backpressure else .want_more;
|
||||
}
|
||||
|
||||
pub fn writeEndRequest(this: *FetchTaskletRequest, err: ?jsc.JSValue) void {
|
||||
log("writeEndRequest hasError? {}", .{err != null});
|
||||
const tasklet = this.parent();
|
||||
defer tasklet.deref();
|
||||
if (err) |jsError| {
|
||||
if (tasklet.isAborted()) {
|
||||
return;
|
||||
}
|
||||
if (!jsError.isUndefinedOrNull()) {
|
||||
tasklet.abort_reason.set(tasklet.global_this, jsError);
|
||||
}
|
||||
tasklet.abortTask();
|
||||
} else {
|
||||
if (!tasklet.response.flags.upgraded_connection) {
|
||||
// If is not upgraded we need to send the terminating chunk
|
||||
const thread_safe_stream_buffer = tasklet.shared.request_body_streaming_buffer orelse return;
|
||||
const stream_buffer = thread_safe_stream_buffer.acquire();
|
||||
defer thread_safe_stream_buffer.release();
|
||||
bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body));
|
||||
}
|
||||
if (tasklet.http) |http_| {
|
||||
// just tell to write the end of the chunked encoding aka 0\r\n\r\n
|
||||
http.http_thread.scheduleRequestWrite(http_, .end);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is ALWAYS called from the main thread
|
||||
pub fn resumeRequestDataStream(this: *FetchTaskletRequest) void {
|
||||
// deref when done because we ref inside SharedData.resumeRequestDataStream
|
||||
const tasklet = this.parent();
|
||||
defer tasklet.deref();
|
||||
if (tasklet.isAborted()) {
|
||||
// already aborted; nothing to drain
|
||||
return;
|
||||
}
|
||||
log("resumeRequestDataStream", .{});
|
||||
if (this.sink) |sink| {
|
||||
sink.drain();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(this: *FetchTaskletRequest) void {
|
||||
if (this.request_body) |body| {
|
||||
body.detach();
|
||||
this.request_body = null;
|
||||
}
|
||||
if (this.request_body_streaming_buffer) |buffer| {
|
||||
this.request_body_streaming_buffer = null;
|
||||
buffer.deref();
|
||||
}
|
||||
}
|
||||
|
||||
const HTTPRequestBody = @import("./HTTPRequestBody.zig").HTTPRequestBody;
|
||||
const std = @import("std");
|
||||
const bun = @import("bun");
|
||||
const jsc = bun.jsc;
|
||||
const http = bun.http;
|
||||
const Headers = http.Headers;
|
||||
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;
|
||||
const ResumableSink = jsc.WebCore.ResumableFetchSink;
|
||||
const log = bun.Output.scoped(.FetchTaskletRequest, .visible);
|
||||
const FetchTasklet = @import("../FetchTasklet.zig");
|
||||
425
src/bun.js/webcore/fetch/tasklet/Response.zig
Normal file
425
src/bun.js/webcore/fetch/tasklet/Response.zig
Normal file
@@ -0,0 +1,425 @@
|
||||
const Response = @This();
|
||||
|
||||
/// buffer used to stream response to JS
|
||||
scheduled_response_buffer: MutableString = .{
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
},
|
||||
check_server_identity: jsc.Strong.Optional = .empty,
|
||||
flags: Flags = .{},
|
||||
/// stream strong ref if any is available
|
||||
readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{},
|
||||
/// response weak ref we need this to track the response JS lifetime
|
||||
response: jsc.Weak(FetchTasklet) = .{},
|
||||
/// native response ref if we still need it when JS is discarted
|
||||
native_response: ?*jsc.WebCore.Response = null,
|
||||
|
||||
/// For Http Client requests
|
||||
/// when Content-Length is provided this represents the whole size of the request
|
||||
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
|
||||
/// If is not chunked encoded and Content-Length is not provided this will be unknown
|
||||
body_size: http.HTTPClientResult.BodySize = .unknown,
|
||||
|
||||
state: ResponseState = .created,
|
||||
|
||||
const ResponseState = enum {
|
||||
created,
|
||||
enqueued,
|
||||
// information_headers,
|
||||
headers_received,
|
||||
receiving_body, // can be sent with the headers or separately
|
||||
// receiving_trailer_headers,
|
||||
failed,
|
||||
done,
|
||||
};
|
||||
|
||||
pub const Flags = packed struct(u8) {
|
||||
// most of these should be replaced with states instead
|
||||
ignore_data: bool = false,
|
||||
upgraded_connection: bool = false,
|
||||
reject_unauthorized: bool = true,
|
||||
is_waiting_body: bool = false,
|
||||
is_waiting_abort: bool = false,
|
||||
_padding: u3 = 0,
|
||||
};
|
||||
|
||||
fn parent(this: *Response) *FetchTasklet {
|
||||
return @fieldParentPtr("response", this);
|
||||
}
|
||||
|
||||
pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void {
|
||||
const this = bun.cast(*FetchTasklet, ctx);
|
||||
this.response.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);
|
||||
}
|
||||
|
||||
pub fn checkServerIdentity(this: *Response, certificate_info: http.CertificateInfo) bool {
|
||||
const tasklet = this.parent();
|
||||
if (this.check_server_identity.get()) |check_server_identity| {
|
||||
check_server_identity.ensureStillAlive();
|
||||
if (certificate_info.cert.len > 0) {
|
||||
const cert = certificate_info.cert;
|
||||
var cert_ptr = cert.ptr;
|
||||
if (BoringSSL.d2i_X509(null, &cert_ptr, @intCast(cert.len))) |x509| {
|
||||
const globalObject = tasklet.global_this;
|
||||
defer x509.free();
|
||||
const js_cert = X509.toJS(x509, globalObject) catch |err| {
|
||||
switch (err) {
|
||||
error.JSError => {},
|
||||
error.OutOfMemory => globalObject.throwOutOfMemory() catch {},
|
||||
error.JSTerminated => {},
|
||||
}
|
||||
const check_result = globalObject.tryTakeException().?;
|
||||
// mark to wait until deinit
|
||||
this.flags.is_waiting_abort = tasklet.shared.result.has_more;
|
||||
tasklet.abort_reason.set(globalObject, check_result);
|
||||
tasklet.shared.signal_store.aborted.store(true, .monotonic);
|
||||
tasklet.tracker.didCancel(tasklet.global_this);
|
||||
// we need to abort the request
|
||||
if (tasklet.http) |http_| http.http_thread.scheduleShutdown(http_);
|
||||
tasklet.shared.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
return false;
|
||||
};
|
||||
var hostname: bun.String = bun.String.cloneUTF8(certificate_info.hostname);
|
||||
defer hostname.deref();
|
||||
const js_hostname = hostname.toJS(globalObject);
|
||||
js_hostname.ensureStillAlive();
|
||||
js_cert.ensureStillAlive();
|
||||
const check_result = check_server_identity.call(globalObject, .js_undefined, &.{ js_hostname, js_cert }) catch |err| globalObject.takeException(err);
|
||||
|
||||
// > Returns <Error> object [...] on failure
|
||||
if (check_result.isAnyError()) {
|
||||
// mark to wait until deinit
|
||||
this.flags.is_waiting_abort = tasklet.shared.result.has_more;
|
||||
tasklet.abort_reason.set(globalObject, check_result);
|
||||
tasklet.shared.signal_store.aborted.store(true, .monotonic);
|
||||
tasklet.tracker.didCancel(tasklet.global_this);
|
||||
|
||||
// we need to abort the request
|
||||
if (tasklet.http) |http_| {
|
||||
http.http_thread.scheduleShutdown(http_);
|
||||
}
|
||||
tasklet.shared.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
return false;
|
||||
}
|
||||
|
||||
// > On success, returns <undefined>
|
||||
// We treat any non-error value as a success.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
tasklet.shared.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
|
||||
return false;
|
||||
}
|
||||
|
||||
fn getCurrentResponse(this: *Response) ?*jsc.WebCore.Response {
|
||||
// we need a body to resolve the promise when buffering
|
||||
if (this.native_response) |response| {
|
||||
return response;
|
||||
}
|
||||
|
||||
// if we did not have a direct reference we check if the Weak ref is still alive
|
||||
if (this.response.get()) |response_js| {
|
||||
if (response_js.as(jsc.WebCore.Response)) |response| {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
pub fn onBodyReceived(this: *Response) bun.JSTerminated!void {
|
||||
const tasklet = this.parent();
|
||||
const success = tasklet.shared.result.isSuccess();
|
||||
const globalThis = tasklet.global_this;
|
||||
// reset the buffer if we are streaming or if we are not waiting for bufferig anymore
|
||||
var buffer_reset = true;
|
||||
log("onBodyReceived success={} has_more={}", .{ success, tasklet.shared.result.has_more });
|
||||
defer {
|
||||
if (buffer_reset) {
|
||||
this.scheduled_response_buffer.reset();
|
||||
}
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
var err = tasklet.onReject();
|
||||
var need_deinit = true;
|
||||
defer if (need_deinit) err.deinit();
|
||||
var js_err = JSValue.zero;
|
||||
// if we are streaming update with error
|
||||
if (this.readable_stream_ref.get(globalThis)) |readable| {
|
||||
if (readable.ptr == .Bytes) {
|
||||
js_err = err.toJS(globalThis);
|
||||
js_err.ensureStillAlive();
|
||||
try readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.err = .{ .JSValue = js_err },
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (tasklet.request.sink) |sink| {
|
||||
if (js_err == .zero) {
|
||||
js_err = err.toJS(globalThis);
|
||||
js_err.ensureStillAlive();
|
||||
}
|
||||
sink.cancel(js_err);
|
||||
return;
|
||||
}
|
||||
// if we are buffering resolve the promise
|
||||
if (this.getCurrentResponse()) |response| {
|
||||
need_deinit = false; // body value now owns the error
|
||||
const body = response.getBodyValue();
|
||||
try body.toErrorInstance(err, globalThis);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.readable_stream_ref.get(globalThis)) |readable| {
|
||||
log("onBodyReceived readable_stream_ref", .{});
|
||||
if (readable.ptr == .Bytes) {
|
||||
readable.ptr.Bytes.size_hint = this.getSizeHint();
|
||||
// body can be marked as used but we still need to pipe the data
|
||||
const scheduled_response_buffer = &this.scheduled_response_buffer.list;
|
||||
|
||||
const chunk = scheduled_response_buffer.items;
|
||||
|
||||
if (tasklet.shared.result.has_more) {
|
||||
try readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
} else {
|
||||
var prev = this.readable_stream_ref;
|
||||
this.readable_stream_ref = .{};
|
||||
defer prev.deinit();
|
||||
buffer_reset = false;
|
||||
|
||||
try readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.getCurrentResponse()) |response| {
|
||||
log("onBodyReceived Current Response", .{});
|
||||
const sizeHint = this.getSizeHint();
|
||||
response.setSizeHint(sizeHint);
|
||||
if (response.getBodyReadableStream(globalThis)) |readable| {
|
||||
log("onBodyReceived CurrentResponse BodyReadableStream", .{});
|
||||
if (readable.ptr == .Bytes) {
|
||||
const scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
|
||||
const chunk = scheduled_response_buffer.items;
|
||||
|
||||
if (tasklet.shared.result.has_more) {
|
||||
try readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
} else {
|
||||
readable.value.ensureStillAlive();
|
||||
response.detachReadableStream(globalThis);
|
||||
try readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk),
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer
|
||||
buffer_reset = false;
|
||||
if (!tasklet.shared.result.has_more) {
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
const body = response.getBodyValue();
|
||||
// done resolve body
|
||||
var old = body.*;
|
||||
const body_value = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
body.* = body_value;
|
||||
log("onBodyReceived body_value length={}", .{body_value.InternalBlob.bytes.items.len});
|
||||
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
|
||||
if (old == .Locked) {
|
||||
log("onBodyReceived old.resolve", .{});
|
||||
try old.resolve(body, tasklet.global_this, response.getFetchHeaders());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult {
|
||||
const this = bun.cast(*FetchTasklet, ctx);
|
||||
if (this.shared.signal_store.aborted.load(.monotonic)) {
|
||||
return jsc.WebCore.DrainResult{
|
||||
.aborted = {},
|
||||
};
|
||||
}
|
||||
|
||||
if (this.http) |http_| {
|
||||
http_.enableResponseBodyStreaming();
|
||||
|
||||
// If the server sent the headers and the response body in two separate socket writes
|
||||
// and if the server doesn't close the connection by itself
|
||||
// and doesn't send any follow-up data
|
||||
// then we must make sure the HTTP thread flushes.
|
||||
bun.http.http_thread.scheduleResponseBodyDrain(http_.async_http_id);
|
||||
}
|
||||
|
||||
this.mutex.lock();
|
||||
defer this.mutex.unlock();
|
||||
const size_hint = this.response.getSizeHint();
|
||||
|
||||
var scheduled_response_buffer = this.response.scheduled_response_buffer.list;
|
||||
// This means we have received part of the body but not the whole thing
|
||||
if (scheduled_response_buffer.items.len > 0) {
|
||||
this.response.scheduled_response_buffer = .{
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
|
||||
return .{
|
||||
.owned = .{
|
||||
.list = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
.size_hint = size_hint,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return .{
|
||||
.estimated_size = size_hint,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getSizeHint(this: *Response) Blob.SizeType {
|
||||
return switch (this.body_size) {
|
||||
.content_length => @truncate(this.body_size.content_length),
|
||||
.total_received => @truncate(this.body_size.total_received),
|
||||
.unknown => 0,
|
||||
};
|
||||
}
|
||||
|
||||
fn toBodyValue(this: *Response) Body.Value {
|
||||
const tasklet = this.parent();
|
||||
if (tasklet.getAbortError()) |err| {
|
||||
return .{ .Error = err };
|
||||
}
|
||||
if (this.flags.is_waiting_body) {
|
||||
const response = Body.Value{
|
||||
.Locked = .{
|
||||
.size_hint = this.getSizeHint(),
|
||||
.task = this,
|
||||
.global = tasklet.global_this,
|
||||
.onStartStreaming = onStartStreamingHTTPResponseBodyCallback,
|
||||
.onReadableStreamAvailable = onReadableStreamAvailable,
|
||||
},
|
||||
};
|
||||
return response;
|
||||
}
|
||||
|
||||
var scheduled_response_buffer = this.scheduled_response_buffer.list;
|
||||
const response = Body.Value{
|
||||
.InternalBlob = .{
|
||||
.bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
|
||||
},
|
||||
};
|
||||
this.scheduled_response_buffer = .{
|
||||
.allocator = bun.default_allocator,
|
||||
.list = .{
|
||||
.items = &.{},
|
||||
.capacity = 0,
|
||||
},
|
||||
};
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
pub fn ignoreRemainingResponseBody(this: *Response) void {
|
||||
log("ignoreRemainingResponseBody", .{});
|
||||
const tasklet = this.parent();
|
||||
// enabling streaming will make the http thread to drain into the main thread (aka stop buffering)
|
||||
// without a stream ref, response body or response instance alive it will just ignore the result
|
||||
if (tasklet.http) |http_| {
|
||||
http_.enableResponseBodyStreaming();
|
||||
}
|
||||
// we should not keep the process alive if we are ignoring the body
|
||||
const vm = tasklet.javascript_vm;
|
||||
tasklet.poll_ref.unref(vm);
|
||||
// clean any remaining refereces
|
||||
this.readable_stream_ref.deinit();
|
||||
this.response.deinit();
|
||||
|
||||
if (this.native_response) |response| {
|
||||
response.unref();
|
||||
this.native_response = null;
|
||||
}
|
||||
|
||||
this.flags.ignore_data = true;
|
||||
}
|
||||
|
||||
pub fn toResponse(this: *Response) jsc.WebCore.Response {
|
||||
log("toResponse", .{});
|
||||
const tasklet = this.parent();
|
||||
bun.assert(tasklet.shared.metadata != null);
|
||||
// at this point we always should have metadata
|
||||
const metadata = tasklet.shared.metadata.?;
|
||||
const http_response = metadata.response;
|
||||
this.flags.is_waiting_body = tasklet.shared.result.has_more;
|
||||
return jsc.WebCore.Response.init(
|
||||
.{
|
||||
.headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
|
||||
.status_code = @as(u16, @truncate(http_response.status_code)),
|
||||
.status_text = bun.String.createAtomIfPossible(http_response.status),
|
||||
},
|
||||
Body{
|
||||
.value = this.toBodyValue(),
|
||||
},
|
||||
bun.String.createAtomIfPossible(metadata.url),
|
||||
tasklet.shared.result.redirected,
|
||||
);
|
||||
}
|
||||
|
||||
const bun = @import("bun");
|
||||
const MutableString = bun.MutableString;
|
||||
const jsc = bun.jsc;
|
||||
const http = bun.http;
|
||||
pub const ResumableSink = jsc.WebCore.ResumableFetchSink;
|
||||
const FetchTasklet = @import("../FetchTasklet.zig");
|
||||
const log = bun.Output.scoped(.FetchTaskletResponse, .visible);
|
||||
const BoringSSL = bun.BoringSSL.c;
|
||||
const FetchHeaders = bun.webcore.FetchHeaders;
|
||||
const Body = jsc.WebCore.Body;
|
||||
const X509 = @import("../../../api/bun/x509.zig");
|
||||
const JSValue = jsc.JSValue;
|
||||
const Blob = jsc.WebCore.Blob;
|
||||
32
src/bun.js/webcore/fetch/tasklet/SharedData.zig
Normal file
32
src/bun.js/webcore/fetch/tasklet/SharedData.zig
Normal file
@@ -0,0 +1,32 @@
|
||||
const SharedData = @This();
|
||||
mutex: bun.Mutex = .{},
|
||||
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
|
||||
|
||||
/// buffer being used by AsyncHTTP
|
||||
response_buffer: bun.MutableString = undefined,
|
||||
request_body_streaming_buffer: ?*bun.http.ThreadSafeStreamBuffer = null,
|
||||
result: bun.http.HTTPClientResult = .{},
|
||||
metadata: ?bun.http.HTTPResponseMetadata = null,
|
||||
|
||||
signals: bun.http.Signals = .{},
|
||||
signal_store: bun.http.Signals.Store = .{},
|
||||
has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
|
||||
fn parent(this: *SharedData) *FetchTasklet {
|
||||
return @fieldParentPtr("shared", this);
|
||||
}
|
||||
|
||||
/// This is ALWAYS called from the http thread and we cannot touch the buffer here because is locked
|
||||
pub fn resumeRequestDataStream(this: *SharedData) void {
|
||||
// ref until the main thread callback is called
|
||||
const tasklet = this.parent();
|
||||
tasklet.ref();
|
||||
tasklet.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(&tasklet.request, FetchTaskletRequest.resumeRequestDataStream));
|
||||
}
|
||||
|
||||
const std = @import("std");
|
||||
const bun = @import("bun");
|
||||
const jsc = bun.jsc;
|
||||
const FetchTasklet = @import("../FetchTasklet.zig");
|
||||
const FetchTaskletRequest = @import("./Request.zig");
|
||||
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;
|
||||
Reference in New Issue
Block a user