diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index 3150f4e431..7f4e8be9ab 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -58,1342 +58,7 @@ pub const fetch_type_error_strings: JSTypeErrorEnum = brk: { break :brk errors; }; -pub const FetchTasklet = struct { - pub const ResumableSink = jsc.WebCore.ResumableFetchSink; - - const log = Output.scoped(.FetchTasklet, .visible); - sink: ?*ResumableSink = null, - http: ?*http.AsyncHTTP = null, - result: http.HTTPClientResult = .{}, - metadata: ?http.HTTPResponseMetadata = null, - javascript_vm: *VirtualMachine = undefined, - global_this: *JSGlobalObject = undefined, - request_body: HTTPRequestBody = undefined, - request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null, - - /// buffer being used by AsyncHTTP - response_buffer: MutableString = undefined, - /// buffer used to stream response to JS - scheduled_response_buffer: MutableString = undefined, - /// response weak ref we need this to track the response JS lifetime - response: jsc.Weak(FetchTasklet) = .{}, - /// native response ref if we still need it when JS is discarted - native_response: ?*Response = null, - ignore_data: bool = false, - /// stream strong ref if any is available - readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{}, - request_headers: Headers = Headers{ .allocator = undefined }, - promise: jsc.JSPromise.Strong, - concurrent_task: jsc.ConcurrentTask = .{}, - poll_ref: Async.KeepAlive = .{}, - /// For Http Client requests - /// when Content-Length is provided this represents the whole size of the request - /// If chunked encoded this will represent the total received size (ignoring the chunk headers) - /// If is not chunked encoded and Content-Length is not provided this will be unknown - body_size: http.HTTPClientResult.BodySize = .unknown, - - /// This is url + proxy memory buffer and is owned by FetchTasklet - /// We always clone url and proxy (if informed) - 
url_proxy_buffer: []const u8 = "", - - signal: ?*jsc.WebCore.AbortSignal = null, - signals: http.Signals = .{}, - signal_store: http.Signals.Store = .{}, - has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - - // must be stored because AbortSignal stores reason weakly - abort_reason: jsc.Strong.Optional = .empty, - - // custom checkServerIdentity - check_server_identity: jsc.Strong.Optional = .empty, - reject_unauthorized: bool = true, - upgraded_connection: bool = false, - // Custom Hostname - hostname: ?[]u8 = null, - is_waiting_body: bool = false, - is_waiting_abort: bool = false, - is_waiting_request_stream_start: bool = false, - mutex: Mutex, - - tracker: jsc.Debugger.AsyncTaskTracker, - - ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1), - - pub fn ref(this: *FetchTasklet) void { - const count = this.ref_count.fetchAdd(1, .monotonic); - bun.debugAssert(count > 0); - } - - pub fn deref(this: *FetchTasklet) void { - const count = this.ref_count.fetchSub(1, .monotonic); - bun.debugAssert(count > 0); - - if (count == 1) { - this.deinit() catch |err| switch (err) {}; - } - } - - pub fn derefFromThread(this: *FetchTasklet) void { - const count = this.ref_count.fetchSub(1, .monotonic); - bun.debugAssert(count > 0); - - if (count == 1) { - // this is really unlikely to happen, but can happen - // lets make sure that we always call deinit from main thread - - this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.deinit)); - } - } - - pub const HTTPRequestBody = union(enum) { - AnyBlob: AnyBlob, - Sendfile: http.SendFile, - ReadableStream: jsc.WebCore.ReadableStream.Strong, - - pub const Empty: HTTPRequestBody = .{ .AnyBlob = .{ .Blob = .{} } }; - - pub fn store(this: *HTTPRequestBody) ?*Blob.Store { - return switch (this.*) { - .AnyBlob => this.AnyBlob.store(), - else => null, - }; - } - - pub fn slice(this: *const HTTPRequestBody) []const u8 { - return switch (this.*) 
{ - .AnyBlob => this.AnyBlob.slice(), - else => "", - }; - } - - pub fn detach(this: *HTTPRequestBody) void { - switch (this.*) { - .AnyBlob => this.AnyBlob.detach(), - .ReadableStream => |*stream| { - stream.deinit(); - }, - .Sendfile => { - if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0) - this.Sendfile.fd.close(); - this.Sendfile.offset = 0; - this.Sendfile.remain = 0; - }, - } - } - - pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody { - var body_value = try Body.Value.fromJS(globalThis, value); - if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) { - return globalThis.ERR(.BODY_ALREADY_USED, "body already used", .{}).throw(); - } - if (body_value == .Locked) { - if (body_value.Locked.readable.has()) { - // just grab the ref - return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable }; - } - const readable = try body_value.toReadableStream(globalThis); - if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) { - return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable }; - } - } - return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() }; - } - - pub fn needsToReadFile(this: *HTTPRequestBody) bool { - return switch (this.*) { - .AnyBlob => |blob| blob.needsToReadFile(), - else => false, - }; - } - - pub fn isS3(this: *const HTTPRequestBody) bool { - return switch (this.*) { - .AnyBlob => |*blob| blob.isS3(), - else => false, - }; - } - - pub fn hasContentTypeFromUser(this: *HTTPRequestBody) bool { - return switch (this.*) { - .AnyBlob => |blob| blob.hasContentTypeFromUser(), - else => false, - }; - } - - pub fn getAnyBlob(this: *HTTPRequestBody) ?*AnyBlob { - return switch (this.*) { - .AnyBlob => &this.AnyBlob, - else => null, - }; - } - - pub fn hasBody(this: *HTTPRequestBody) bool { - return switch (this.*) { - 
.AnyBlob => |blob| blob.size() > 0, - .ReadableStream => |*stream| stream.has(), - .Sendfile => true, - }; - } - }; - - pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { - return FetchTasklet{}; - } - - fn clearSink(this: *FetchTasklet) void { - if (this.sink) |sink| { - this.sink = null; - sink.deref(); - } - if (this.request_body_streaming_buffer) |buffer| { - this.request_body_streaming_buffer = null; - buffer.clearDrainCallback(); - buffer.deref(); - } - } - - fn clearData(this: *FetchTasklet) void { - log("clearData ", .{}); - const allocator = bun.default_allocator; - if (this.url_proxy_buffer.len > 0) { - allocator.free(this.url_proxy_buffer); - this.url_proxy_buffer.len = 0; - } - - if (this.hostname) |hostname| { - allocator.free(hostname); - this.hostname = null; - } - - if (this.result.certificate_info) |*certificate| { - certificate.deinit(bun.default_allocator); - this.result.certificate_info = null; - } - - this.request_headers.entries.deinit(allocator); - this.request_headers.buf.deinit(allocator); - this.request_headers = Headers{ .allocator = undefined }; - - if (this.http) |http_| { - http_.clearData(); - } - - if (this.metadata != null) { - this.metadata.?.deinit(allocator); - this.metadata = null; - } - - this.response_buffer.deinit(); - this.response.deinit(); - if (this.native_response) |response| { - this.native_response = null; - - response.unref(); - } - - this.readable_stream_ref.deinit(); - - this.scheduled_response_buffer.deinit(); - if (this.request_body != .ReadableStream or this.is_waiting_request_stream_start) { - this.request_body.detach(); - } - - this.abort_reason.deinit(); - this.check_server_identity.deinit(); - this.clearAbortSignal(); - // Clear the sink only after the requested ended otherwise we would potentialy lose the last chunk - this.clearSink(); - } - - // XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not - pub fn deinit(this: 
*FetchTasklet) error{}!void { - log("deinit", .{}); - - bun.assert(this.ref_count.load(.monotonic) == 0); - - this.clearData(); - - const allocator = bun.default_allocator; - - if (this.http) |http_| { - this.http = null; - allocator.destroy(http_); - } - allocator.destroy(this); - } - - fn getCurrentResponse(this: *FetchTasklet) ?*Response { - // we need a body to resolve the promise when buffering - if (this.native_response) |response| { - return response; - } - - // if we did not have a direct reference we check if the Weak ref is still alive - if (this.response.get()) |response_js| { - if (response_js.as(Response)) |response| { - return response; - } - } - - return null; - } - - pub fn startRequestStream(this: *FetchTasklet) void { - this.is_waiting_request_stream_start = false; - bun.assert(this.request_body == .ReadableStream); - if (this.request_body.ReadableStream.get(this.global_this)) |stream| { - if (this.signal) |signal| { - if (signal.aborted()) { - stream.abort(this.global_this); - return; - } - } - - const globalThis = this.global_this; - this.ref(); // lets only unref when sink is done - // +1 because the task refs the sink - const sink = ResumableSink.initExactRefs(globalThis, stream, this, 2); - this.sink = sink; - } - } - - pub fn onBodyReceived(this: *FetchTasklet) bun.JSTerminated!void { - const success = this.result.isSuccess(); - const globalThis = this.global_this; - // reset the buffer if we are streaming or if we are not waiting for bufferig anymore - var buffer_reset = true; - log("onBodyReceived success={} has_more={}", .{ success, this.result.has_more }); - defer { - if (buffer_reset) { - this.scheduled_response_buffer.reset(); - } - } - - if (!success) { - var err = this.onReject(); - var need_deinit = true; - defer if (need_deinit) err.deinit(); - var js_err = JSValue.zero; - // if we are streaming update with error - if (this.readable_stream_ref.get(globalThis)) |readable| { - if (readable.ptr == .Bytes) { - js_err = 
err.toJS(globalThis); - js_err.ensureStillAlive(); - try readable.ptr.Bytes.onData( - .{ - .err = .{ .JSValue = js_err }, - }, - bun.default_allocator, - ); - } - } - if (this.sink) |sink| { - if (js_err == .zero) { - js_err = err.toJS(globalThis); - js_err.ensureStillAlive(); - } - sink.cancel(js_err); - return; - } - // if we are buffering resolve the promise - if (this.getCurrentResponse()) |response| { - need_deinit = false; // body value now owns the error - const body = response.getBodyValue(); - try body.toErrorInstance(err, globalThis); - } - return; - } - - if (this.readable_stream_ref.get(globalThis)) |readable| { - log("onBodyReceived readable_stream_ref", .{}); - if (readable.ptr == .Bytes) { - readable.ptr.Bytes.size_hint = this.getSizeHint(); - // body can be marked as used but we still need to pipe the data - const scheduled_response_buffer = &this.scheduled_response_buffer.list; - - const chunk = scheduled_response_buffer.items; - - if (this.result.has_more) { - try readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), - }, - bun.default_allocator, - ); - } else { - var prev = this.readable_stream_ref; - this.readable_stream_ref = .{}; - defer prev.deinit(); - buffer_reset = false; - - try readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk), - }, - bun.default_allocator, - ); - } - return; - } - } - - if (this.getCurrentResponse()) |response| { - log("onBodyReceived Current Response", .{}); - const sizeHint = this.getSizeHint(); - response.setSizeHint(sizeHint); - if (response.getBodyReadableStream(globalThis)) |readable| { - log("onBodyReceived CurrentResponse BodyReadableStream", .{}); - if (readable.ptr == .Bytes) { - const scheduled_response_buffer = this.scheduled_response_buffer.list; - - const chunk = scheduled_response_buffer.items; - - if (this.result.has_more) { - try readable.ptr.Bytes.onData( - .{ - .temporary = 
bun.ByteList.fromBorrowedSliceDangerous(chunk), - }, - bun.default_allocator, - ); - } else { - readable.value.ensureStillAlive(); - response.detachReadableStream(globalThis); - try readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk), - }, - bun.default_allocator, - ); - } - - return; - } - } - - // we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer - buffer_reset = false; - if (!this.result.has_more) { - var scheduled_response_buffer = this.scheduled_response_buffer.list; - const body = response.getBodyValue(); - // done resolve body - var old = body.*; - const body_value = Body.Value{ - .InternalBlob = .{ - .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), - }, - }; - body.* = body_value; - log("onBodyReceived body_value length={}", .{body_value.InternalBlob.bytes.items.len}); - - this.scheduled_response_buffer = .{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; - - if (old == .Locked) { - log("onBodyReceived old.resolve", .{}); - try old.resolve(body, this.global_this, response.getFetchHeaders()); - } - } - } - } - - pub fn onProgressUpdate(this: *FetchTasklet) bun.JSTerminated!void { - jsc.markBinding(@src()); - log("onProgressUpdate", .{}); - this.mutex.lock(); - this.has_schedule_callback.store(false, .monotonic); - const is_done = !this.result.has_more; - - const vm = this.javascript_vm; - // vm is shutting down we cannot touch JS - if (vm.isShuttingDown()) { - this.mutex.unlock(); - if (is_done) { - this.deref(); - } - return; - } - - const globalThis = this.global_this; - defer { - this.mutex.unlock(); - // if we are not done we wait until the next call - if (is_done) { - var poll_ref = this.poll_ref; - this.poll_ref = .{}; - poll_ref.unref(vm); - this.deref(); - } - } - if (this.is_waiting_request_stream_start and this.result.can_stream) { - // start streaming - this.startRequestStream(); - 
} - // if we already respond the metadata and still need to process the body - if (this.is_waiting_body) { - try this.onBodyReceived(); - return; - } - if (this.metadata == null and this.result.isSuccess()) return; - - // if we abort because of cert error - // we wait the Http Client because we already have the response - // we just need to deinit - if (this.is_waiting_abort) { - return; - } - const promise_value = this.promise.valueOrEmpty(); - - if (promise_value.isEmptyOrUndefinedOrNull()) { - log("onProgressUpdate: promise_value is null", .{}); - this.promise.deinit(); - return; - } - - if (this.result.certificate_info) |certificate_info| { - this.result.certificate_info = null; - defer certificate_info.deinit(bun.default_allocator); - - // we receive some error - if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) { - log("onProgressUpdate: aborted due certError", .{}); - // we need to abort the request - const promise = promise_value.asAnyPromise().?; - const tracker = this.tracker; - var result = this.onReject(); - defer result.deinit(); - - promise_value.ensureStillAlive(); - try promise.reject(globalThis, result.toJS(globalThis)); - - tracker.didDispatch(globalThis); - this.promise.deinit(); - return; - } - // everything ok - if (this.metadata == null) { - log("onProgressUpdate: metadata is null", .{}); - return; - } - } - - const tracker = this.tracker; - tracker.willDispatch(globalThis); - defer { - log("onProgressUpdate: promise_value is not null", .{}); - tracker.didDispatch(globalThis); - this.promise.deinit(); - } - const success = this.result.isSuccess(); - const result = switch (success) { - true => jsc.Strong.Optional.create(this.onResolve(), globalThis), - false => brk: { - // in this case we wanna a jsc.Strong.Optional so we just convert it - var value = this.onReject(); - const err = value.toJS(globalThis); - if (this.sink) |sink| { - sink.cancel(err); - } - break :brk value.JSValue; - }, - }; - - 
promise_value.ensureStillAlive(); - const Holder = struct { - held: jsc.Strong.Optional, - promise: jsc.Strong.Optional, - globalObject: *jsc.JSGlobalObject, - task: jsc.AnyTask, - - pub fn resolve(self: *@This()) bun.JSTerminated!void { - // cleanup - defer bun.default_allocator.destroy(self); - defer self.held.deinit(); - defer self.promise.deinit(); - // resolve the promise - var prom = self.promise.swap().asAnyPromise().?; - const res = self.held.swap(); - res.ensureStillAlive(); - try prom.resolve(self.globalObject, res); - } - - pub fn reject(self: *@This()) bun.JSTerminated!void { - // cleanup - defer bun.default_allocator.destroy(self); - defer self.held.deinit(); - defer self.promise.deinit(); - - // reject the promise - var prom = self.promise.swap().asAnyPromise().?; - const res = self.held.swap(); - res.ensureStillAlive(); - try prom.reject(self.globalObject, res); - } - }; - var holder = bun.handleOom(bun.default_allocator.create(Holder)); - holder.* = .{ - .held = result, - // we need the promise to be alive until the task is done - .promise = this.promise.strong, - .globalObject = globalThis, - .task = undefined, - }; - this.promise.strong = .empty; - holder.task = switch (success) { - true => jsc.AnyTask.New(Holder, Holder.resolve).init(holder), - false => jsc.AnyTask.New(Holder, Holder.reject).init(holder), - }; - - vm.enqueueTask(jsc.Task.init(&holder.task)); - } - - pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: http.CertificateInfo) bool { - if (this.check_server_identity.get()) |check_server_identity| { - check_server_identity.ensureStillAlive(); - if (certificate_info.cert.len > 0) { - const cert = certificate_info.cert; - var cert_ptr = cert.ptr; - if (BoringSSL.d2i_X509(null, &cert_ptr, @intCast(cert.len))) |x509| { - const globalObject = this.global_this; - defer x509.free(); - const js_cert = X509.toJS(x509, globalObject) catch |err| { - switch (err) { - error.JSError => {}, - error.OutOfMemory => 
globalObject.throwOutOfMemory() catch {}, - error.JSTerminated => {}, - } - const check_result = globalObject.tryTakeException().?; - // mark to wait until deinit - this.is_waiting_abort = this.result.has_more; - this.abort_reason.set(globalObject, check_result); - this.signal_store.aborted.store(true, .monotonic); - this.tracker.didCancel(this.global_this); - // we need to abort the request - if (this.http) |http_| http.http_thread.scheduleShutdown(http_); - this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; - return false; - }; - var hostname: bun.String = bun.String.cloneUTF8(certificate_info.hostname); - defer hostname.deref(); - const js_hostname = hostname.toJS(globalObject); - js_hostname.ensureStillAlive(); - js_cert.ensureStillAlive(); - const check_result = check_server_identity.call(globalObject, .js_undefined, &.{ js_hostname, js_cert }) catch |err| globalObject.takeException(err); - - // > Returns object [...] on failure - if (check_result.isAnyError()) { - // mark to wait until deinit - this.is_waiting_abort = this.result.has_more; - this.abort_reason.set(globalObject, check_result); - this.signal_store.aborted.store(true, .monotonic); - this.tracker.didCancel(this.global_this); - - // we need to abort the request - if (this.http) |http_| { - http.http_thread.scheduleShutdown(http_); - } - this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; - return false; - } - - // > On success, returns - // We treat any non-error value as a success. 
- return true; - } - } - } - this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID; - return false; - } - - fn getAbortError(this: *FetchTasklet) ?Body.Value.ValueError { - if (this.abort_reason.has()) { - defer this.clearAbortSignal(); - const out = this.abort_reason; - - this.abort_reason = .empty; - return Body.Value.ValueError{ .JSValue = out }; - } - - if (this.signal) |signal| { - if (signal.reasonIfAborted(this.global_this)) |reason| { - defer this.clearAbortSignal(); - return reason.toBodyValueError(this.global_this); - } - } - - return null; - } - - fn clearAbortSignal(this: *FetchTasklet) void { - const signal = this.signal orelse return; - this.signal = null; - defer { - signal.pendingActivityUnref(); - signal.unref(); - } - - signal.cleanNativeBindings(this); - } - - pub fn onReject(this: *FetchTasklet) Body.Value.ValueError { - bun.assert(this.result.fail != null); - log("onReject", .{}); - - if (this.getAbortError()) |err| { - return err; - } - - if (this.result.abortReason()) |reason| { - return .{ .AbortReason = reason }; - } - - // some times we don't have metadata so we also check http.url - const path = if (this.metadata) |metadata| - bun.String.cloneUTF8(metadata.url) - else if (this.http) |http_| - bun.String.cloneUTF8(http_.url.href) - else - bun.String.empty; - - const fetch_error = jsc.SystemError{ - .code = bun.String.static(switch (this.result.fail.?) { - error.ConnectionClosed => "ECONNRESET", - else => |e| @errorName(e), - }), - .message = switch (this.result.fail.?) { - error.ConnectionClosed => bun.String.static("The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()"), - error.FailedToOpenSocket => bun.String.static("Was there a typo in the url or port?"), - error.TooManyRedirects => bun.String.static("The response redirected too many times. 
For more information, pass `verbose: true` in the second argument to fetch()"), - error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"), - error.RedirectURLInvalid => bun.String.static("Redirect URL in Location header is invalid."), - - error.UNABLE_TO_GET_ISSUER_CERT => bun.String.static("unable to get issuer certificate"), - error.UNABLE_TO_GET_CRL => bun.String.static("unable to get certificate CRL"), - error.UNABLE_TO_DECRYPT_CERT_SIGNATURE => bun.String.static("unable to decrypt certificate's signature"), - error.UNABLE_TO_DECRYPT_CRL_SIGNATURE => bun.String.static("unable to decrypt CRL's signature"), - error.UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY => bun.String.static("unable to decode issuer public key"), - error.CERT_SIGNATURE_FAILURE => bun.String.static("certificate signature failure"), - error.CRL_SIGNATURE_FAILURE => bun.String.static("CRL signature failure"), - error.CERT_NOT_YET_VALID => bun.String.static("certificate is not yet valid"), - error.CRL_NOT_YET_VALID => bun.String.static("CRL is not yet valid"), - error.CERT_HAS_EXPIRED => bun.String.static("certificate has expired"), - error.CRL_HAS_EXPIRED => bun.String.static("CRL has expired"), - error.ERROR_IN_CERT_NOT_BEFORE_FIELD => bun.String.static("format error in certificate's notBefore field"), - error.ERROR_IN_CERT_NOT_AFTER_FIELD => bun.String.static("format error in certificate's notAfter field"), - error.ERROR_IN_CRL_LAST_UPDATE_FIELD => bun.String.static("format error in CRL's lastUpdate field"), - error.ERROR_IN_CRL_NEXT_UPDATE_FIELD => bun.String.static("format error in CRL's nextUpdate field"), - error.OUT_OF_MEM => bun.String.static("out of memory"), - error.DEPTH_ZERO_SELF_SIGNED_CERT => bun.String.static("self signed certificate"), - error.SELF_SIGNED_CERT_IN_CHAIN => bun.String.static("self signed certificate in certificate chain"), - error.UNABLE_TO_GET_ISSUER_CERT_LOCALLY => bun.String.static("unable to get local issuer 
certificate"), - error.UNABLE_TO_VERIFY_LEAF_SIGNATURE => bun.String.static("unable to verify the first certificate"), - error.CERT_CHAIN_TOO_LONG => bun.String.static("certificate chain too long"), - error.CERT_REVOKED => bun.String.static("certificate revoked"), - error.INVALID_CA => bun.String.static("invalid CA certificate"), - error.INVALID_NON_CA => bun.String.static("invalid non-CA certificate (has CA markings)"), - error.PATH_LENGTH_EXCEEDED => bun.String.static("path length constraint exceeded"), - error.PROXY_PATH_LENGTH_EXCEEDED => bun.String.static("proxy path length constraint exceeded"), - error.PROXY_CERTIFICATES_NOT_ALLOWED => bun.String.static("proxy certificates not allowed, please set the appropriate flag"), - error.INVALID_PURPOSE => bun.String.static("unsupported certificate purpose"), - error.CERT_UNTRUSTED => bun.String.static("certificate not trusted"), - error.CERT_REJECTED => bun.String.static("certificate rejected"), - error.APPLICATION_VERIFICATION => bun.String.static("application verification failure"), - error.SUBJECT_ISSUER_MISMATCH => bun.String.static("subject issuer mismatch"), - error.AKID_SKID_MISMATCH => bun.String.static("authority and subject key identifier mismatch"), - error.AKID_ISSUER_SERIAL_MISMATCH => bun.String.static("authority and issuer serial number mismatch"), - error.KEYUSAGE_NO_CERTSIGN => bun.String.static("key usage does not include certificate signing"), - error.UNABLE_TO_GET_CRL_ISSUER => bun.String.static("unable to get CRL issuer certificate"), - error.UNHANDLED_CRITICAL_EXTENSION => bun.String.static("unhandled critical extension"), - error.KEYUSAGE_NO_CRL_SIGN => bun.String.static("key usage does not include CRL signing"), - error.KEYUSAGE_NO_DIGITAL_SIGNATURE => bun.String.static("key usage does not include digital signature"), - error.UNHANDLED_CRITICAL_CRL_EXTENSION => bun.String.static("unhandled critical CRL extension"), - error.INVALID_EXTENSION => bun.String.static("invalid or inconsistent 
certificate extension"), - error.INVALID_POLICY_EXTENSION => bun.String.static("invalid or inconsistent certificate policy extension"), - error.NO_EXPLICIT_POLICY => bun.String.static("no explicit policy"), - error.DIFFERENT_CRL_SCOPE => bun.String.static("Different CRL scope"), - error.UNSUPPORTED_EXTENSION_FEATURE => bun.String.static("Unsupported extension feature"), - error.UNNESTED_RESOURCE => bun.String.static("RFC 3779 resource not subset of parent's resources"), - error.PERMITTED_VIOLATION => bun.String.static("permitted subtree violation"), - error.EXCLUDED_VIOLATION => bun.String.static("excluded subtree violation"), - error.SUBTREE_MINMAX => bun.String.static("name constraints minimum and maximum not supported"), - error.UNSUPPORTED_CONSTRAINT_TYPE => bun.String.static("unsupported name constraint type"), - error.UNSUPPORTED_CONSTRAINT_SYNTAX => bun.String.static("unsupported or invalid name constraint syntax"), - error.UNSUPPORTED_NAME_SYNTAX => bun.String.static("unsupported or invalid name syntax"), - error.CRL_PATH_VALIDATION_ERROR => bun.String.static("CRL path validation error"), - error.SUITE_B_INVALID_VERSION => bun.String.static("Suite B: certificate version invalid"), - error.SUITE_B_INVALID_ALGORITHM => bun.String.static("Suite B: invalid public key algorithm"), - error.SUITE_B_INVALID_CURVE => bun.String.static("Suite B: invalid ECC curve"), - error.SUITE_B_INVALID_SIGNATURE_ALGORITHM => bun.String.static("Suite B: invalid signature algorithm"), - error.SUITE_B_LOS_NOT_ALLOWED => bun.String.static("Suite B: curve not allowed for this LOS"), - error.SUITE_B_CANNOT_SIGN_P_384_WITH_P_256 => bun.String.static("Suite B: cannot sign P-384 with P-256"), - error.HOSTNAME_MISMATCH => bun.String.static("Hostname mismatch"), - error.EMAIL_MISMATCH => bun.String.static("Email address mismatch"), - error.IP_ADDRESS_MISMATCH => bun.String.static("IP address mismatch"), - error.INVALID_CALL => bun.String.static("Invalid certificate verification context"), - 
error.STORE_LOOKUP => bun.String.static("Issuer certificate lookup error"), - error.NAME_CONSTRAINTS_WITHOUT_SANS => bun.String.static("Issuer has name constraints but leaf has no SANs"), - error.UNKNOWN_CERTIFICATE_VERIFICATION_ERROR => bun.String.static("unknown certificate verification error"), - - else => |e| bun.String.createFormat("{s} fetching \"{}\". For more information, pass `verbose: true` in the second argument to fetch()", .{ - @errorName(e), - path, - }) catch |err| bun.handleOom(err), - }, - .path = path, - }; - - return .{ .SystemError = fetch_error }; - } - - pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void { - const this = bun.cast(*FetchTasklet, ctx); - this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis); - } - - pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult { - const this = bun.cast(*FetchTasklet, ctx); - if (this.signal_store.aborted.load(.monotonic)) { - return jsc.WebCore.DrainResult{ - .aborted = {}, - }; - } - - if (this.http) |http_| { - http_.enableResponseBodyStreaming(); - - // If the server sent the headers and the response body in two separate socket writes - // and if the server doesn't close the connection by itself - // and doesn't send any follow-up data - // then we must make sure the HTTP thread flushes. 
- bun.http.http_thread.scheduleResponseBodyDrain(http_.async_http_id); - } - - this.mutex.lock(); - defer this.mutex.unlock(); - const size_hint = this.getSizeHint(); - - var scheduled_response_buffer = this.scheduled_response_buffer.list; - // This means we have received part of the body but not the whole thing - if (scheduled_response_buffer.items.len > 0) { - this.scheduled_response_buffer = .{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; - - return .{ - .owned = .{ - .list = scheduled_response_buffer.toManaged(bun.default_allocator), - .size_hint = size_hint, - }, - }; - } - - return .{ - .estimated_size = size_hint, - }; - } - - fn getSizeHint(this: *FetchTasklet) Blob.SizeType { - return switch (this.body_size) { - .content_length => @truncate(this.body_size.content_length), - .total_received => @truncate(this.body_size.total_received), - .unknown => 0, - }; - } - - fn toBodyValue(this: *FetchTasklet) Body.Value { - if (this.getAbortError()) |err| { - return .{ .Error = err }; - } - if (this.is_waiting_body) { - const response = Body.Value{ - .Locked = .{ - .size_hint = this.getSizeHint(), - .task = this, - .global = this.global_this, - .onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback, - .onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable, - }, - }; - return response; - } - - var scheduled_response_buffer = this.scheduled_response_buffer.list; - const response = Body.Value{ - .InternalBlob = .{ - .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), - }, - }; - this.scheduled_response_buffer = .{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; - - return response; - } - - fn toResponse(this: *FetchTasklet) Response { - log("toResponse", .{}); - bun.assert(this.metadata != null); - // at this point we always should have metadata - const metadata = this.metadata.?; - const http_response = metadata.response; - 
this.is_waiting_body = this.result.has_more; - return Response.init( - .{ - .headers = FetchHeaders.createFromPicoHeaders(http_response.headers), - .status_code = @as(u16, @truncate(http_response.status_code)), - .status_text = bun.String.createAtomIfPossible(http_response.status), - }, - Body{ - .value = this.toBodyValue(), - }, - bun.String.createAtomIfPossible(metadata.url), - this.result.redirected, - ); - } - - fn ignoreRemainingResponseBody(this: *FetchTasklet) void { - log("ignoreRemainingResponseBody", .{}); - // enabling streaming will make the http thread to drain into the main thread (aka stop buffering) - // without a stream ref, response body or response instance alive it will just ignore the result - if (this.http) |http_| { - http_.enableResponseBodyStreaming(); - } - // we should not keep the process alive if we are ignoring the body - const vm = this.javascript_vm; - this.poll_ref.unref(vm); - // clean any remaining refereces - this.readable_stream_ref.deinit(); - this.response.deinit(); - - if (this.native_response) |response| { - response.unref(); - this.native_response = null; - } - - this.ignore_data = true; - } - - export fn Bun__FetchResponse_finalize(this: *FetchTasklet) callconv(.C) void { - log("onResponseFinalize", .{}); - if (this.native_response) |response| { - const body = response.getBodyValue(); - // Three scenarios: - // - // 1. We are streaming, in which case we should not ignore the body. - // 2. We were buffering, in which case - // 2a. if we have no promise, we should ignore the body. - // 2b. if we have a promise, we should keep loading the body. - // 3. We never started buffering, in which case we should ignore the body. - // - // Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer. - if (body.* != .Locked or this.readable_stream_ref.held.has()) { - // Scenario 1 or 3. - return; - } - - if (body.Locked.promise) |promise| { - if (promise.isEmptyOrUndefinedOrNull()) { - // Scenario 2b. 
- this.ignoreRemainingResponseBody(); - } - } else { - // Scenario 3. - this.ignoreRemainingResponseBody(); - } - } - } - comptime { - _ = Bun__FetchResponse_finalize; - } - - pub fn onResolve(this: *FetchTasklet) JSValue { - log("onResolve", .{}); - const response = bun.new(Response, this.toResponse()); - const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, this.global_this), response); - response_js.ensureStillAlive(); - this.response = jsc.Weak(FetchTasklet).create(response_js, this.global_this, .FetchResponse, this); - this.native_response = response.ref(); - return response_js; - } - - pub fn get( - allocator: std.mem.Allocator, - globalThis: *jsc.JSGlobalObject, - fetch_options: *const FetchOptions, - promise: jsc.JSPromise.Strong, - ) !*FetchTasklet { - var jsc_vm = globalThis.bunVM(); - var fetch_tasklet = try allocator.create(FetchTasklet); - - fetch_tasklet.* = .{ - .mutex = .{}, - .scheduled_response_buffer = .{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }, - .response_buffer = MutableString{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }, - .http = try allocator.create(http.AsyncHTTP), - .javascript_vm = jsc_vm, - .request_body = fetch_options.body, - .global_this = globalThis, - .promise = promise, - .request_headers = fetch_options.headers, - .url_proxy_buffer = fetch_options.url_proxy_buffer, - .signal = fetch_options.signal, - .hostname = fetch_options.hostname, - .tracker = jsc.Debugger.AsyncTaskTracker.init(jsc_vm), - .check_server_identity = fetch_options.check_server_identity, - .reject_unauthorized = fetch_options.reject_unauthorized, - .upgraded_connection = fetch_options.upgraded_connection, - }; - - fetch_tasklet.signals = fetch_tasklet.signal_store.to(); - - fetch_tasklet.tracker.didSchedule(globalThis); - - if (fetch_tasklet.request_body.store()) |store| { - store.ref(); - } - - var proxy: ?ZigURL = null; - if 
(fetch_options.proxy) |proxy_opt| { - if (!proxy_opt.isEmpty()) { //if is empty just ignore proxy - proxy = fetch_options.proxy orelse jsc_vm.transpiler.env.getHttpProxyFor(fetch_options.url); - } - } else { - proxy = jsc_vm.transpiler.env.getHttpProxyFor(fetch_options.url); - } - - if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) { - fetch_tasklet.signal_store.cert_errors.store(true, .monotonic); - } else { - fetch_tasklet.signals.cert_errors = null; - } - - // This task gets queued on the HTTP thread. - fetch_tasklet.http.?.* = http.AsyncHTTP.init( - bun.default_allocator, - fetch_options.method, - fetch_options.url, - fetch_options.headers.entries, - fetch_options.headers.buf.items, - &fetch_tasklet.response_buffer, - fetch_tasklet.request_body.slice(), - http.HTTPClientResult.Callback.New( - *FetchTasklet, - // handles response events (on headers, on body, etc.) - FetchTasklet.callback, - ).init(fetch_tasklet), - fetch_options.redirect_type, - .{ - .http_proxy = proxy, - .hostname = fetch_options.hostname, - .signals = fetch_tasklet.signals, - .unix_socket_path = fetch_options.unix_socket_path, - .disable_timeout = fetch_options.disable_timeout, - .disable_keepalive = fetch_options.disable_keepalive, - .disable_decompression = fetch_options.disable_decompression, - .reject_unauthorized = fetch_options.reject_unauthorized, - .verbose = fetch_options.verbose, - .tls_props = fetch_options.ssl_config, - }, - ); - // enable streaming the write side - const isStream = fetch_tasklet.request_body == .ReadableStream; - fetch_tasklet.http.?.client.flags.is_streaming_request_body = isStream; - fetch_tasklet.is_waiting_request_stream_start = isStream; - if (isStream) { - const buffer = http.ThreadSafeStreamBuffer.new(.{}); - buffer.setDrainCallback(FetchTasklet, FetchTasklet.onWriteRequestDataDrain, fetch_tasklet); - fetch_tasklet.request_body_streaming_buffer = buffer; - fetch_tasklet.http.?.request_body = .{ - .stream = .{ - .buffer = 
buffer, - .ended = false, - }, - }; - } - // TODO is this necessary? the http client already sets the redirect type, - // so manually setting it here seems redundant - if (fetch_options.redirect_type != FetchRedirect.follow) { - fetch_tasklet.http.?.client.remaining_redirect_count = 0; - } - - // we want to return after headers are received - fetch_tasklet.signal_store.header_progress.store(true, .monotonic); - - if (fetch_tasklet.request_body == .Sendfile) { - bun.assert(fetch_options.url.isHTTP()); - bun.assert(fetch_options.proxy == null); - fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile }; - } - - if (fetch_tasklet.signal) |signal| { - signal.pendingActivityRef(); - fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener); - } - return fetch_tasklet; - } - - pub fn abortListener(this: *FetchTasklet, reason: JSValue) void { - log("abortListener", .{}); - reason.ensureStillAlive(); - this.abort_reason.set(this.global_this, reason); - this.abortTask(); - if (this.sink) |sink| { - sink.cancel(reason); - return; - } - } - - /// This is ALWAYS called from the http thread and we cannot touch the buffer here because is locked - pub fn onWriteRequestDataDrain(this: *FetchTasklet) void { - // ref until the main thread callback is called - this.ref(); - this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.resumeRequestDataStream)); - } - - /// This is ALWAYS called from the main thread - // XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not - pub fn resumeRequestDataStream(this: *FetchTasklet) error{}!void { - // deref when done because we ref inside onWriteRequestDataDrain - defer this.deref(); - log("resumeRequestDataStream", .{}); - if (this.sink) |sink| { - if (this.signal) |signal| { - if (signal.aborted()) { - // already aborted; nothing to drain - return; - } - } - 
sink.drain(); - } - } - - pub fn writeRequestData(this: *FetchTasklet, data: []const u8) ResumableSinkBackpressure { - log("writeRequestData {}", .{data.len}); - if (this.signal) |signal| { - if (signal.aborted()) { - return .done; - } - } - const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return .done; - const stream_buffer = thread_safe_stream_buffer.acquire(); - defer thread_safe_stream_buffer.release(); - const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384; - - var needs_schedule = false; - defer if (needs_schedule) { - // wakeup the http thread to write the data - http.http_thread.scheduleRequestWrite(this.http.?, .data); - }; - - // dont have backpressure so we will schedule the data to be written - // if we have backpressure the onWritable will drain the buffer - needs_schedule = stream_buffer.isEmpty(); - if (this.upgraded_connection) { - bun.handleOom(stream_buffer.write(data)); - } else { - //16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n - var formated_size_buffer: [18]u8 = undefined; - const formated_size = std.fmt.bufPrint( - formated_size_buffer[0..], - "{x}\r\n", - .{data.len}, - ) catch |err| switch (err) { - error.NoSpaceLeft => unreachable, - }; - bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2)); - stream_buffer.writeAssumeCapacity(formated_size); - stream_buffer.writeAssumeCapacity(data); - stream_buffer.writeAssumeCapacity("\r\n"); - } - - // pause the stream if we hit the high water mark - return if (stream_buffer.size() >= highWaterMark) .backpressure else .want_more; - } - - pub fn writeEndRequest(this: *FetchTasklet, err: ?jsc.JSValue) void { - log("writeEndRequest hasError? 
{}", .{err != null}); - defer this.deref(); - if (err) |jsError| { - if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) { - return; - } - if (!jsError.isUndefinedOrNull()) { - this.abort_reason.set(this.global_this, jsError); - } - this.abortTask(); - } else { - if (!this.upgraded_connection) { - // If is not upgraded we need to send the terminating chunk - const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return; - const stream_buffer = thread_safe_stream_buffer.acquire(); - defer thread_safe_stream_buffer.release(); - bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body)); - } - if (this.http) |http_| { - // just tell to write the end of the chunked encoding aka 0\r\n\r\n - http.http_thread.scheduleRequestWrite(http_, .end); - } - } - } - - pub fn abortTask(this: *FetchTasklet) void { - this.signal_store.aborted.store(true, .monotonic); - this.tracker.didCancel(this.global_this); - - if (this.http) |http_| { - http.http_thread.scheduleShutdown(http_); - } - } - - const FetchOptions = struct { - method: Method, - headers: Headers, - body: HTTPRequestBody, - disable_timeout: bool, - disable_keepalive: bool, - disable_decompression: bool, - reject_unauthorized: bool, - url: ZigURL, - verbose: http.HTTPVerboseLevel = .none, - redirect_type: FetchRedirect = FetchRedirect.follow, - proxy: ?ZigURL = null, - url_proxy_buffer: []const u8 = "", - signal: ?*jsc.WebCore.AbortSignal = null, - globalThis: ?*JSGlobalObject, - // Custom Hostname - hostname: ?[]u8 = null, - check_server_identity: jsc.Strong.Optional = .empty, - unix_socket_path: ZigString.Slice, - ssl_config: ?*SSLConfig = null, - upgraded_connection: bool = false, - }; - - pub fn queue( - allocator: std.mem.Allocator, - global: *JSGlobalObject, - fetch_options: *const FetchOptions, - promise: jsc.JSPromise.Strong, - ) !*FetchTasklet { - http.HTTPThread.init(&.{}); - var node = try get( - allocator, - global, - fetch_options, - 
promise, - ); - - var batch = bun.ThreadPool.Batch{}; - node.http.?.schedule(allocator, &batch); - node.poll_ref.ref(global.bunVM()); - - // increment ref so we can keep it alive until the http client is done - node.ref(); - http.http_thread.schedule(batch); - - return node; - } - - /// Called from HTTP thread. Handles HTTP events received from socket. - pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void { - // at this point only this thread is accessing result to is no race condition - const is_done = !result.has_more; - // we are done with the http client so we can deref our side - // this is a atomic operation and will enqueue a task to deinit on the main thread - defer if (is_done) task.derefFromThread(); - - task.mutex.lock(); - // we need to unlock before task.deref(); - defer task.mutex.unlock(); - task.http.?.* = async_http.*; - task.http.?.response_buffer = async_http.response_buffer; - - log("callback success={} ignore_data={} has_more={} bytes={}", .{ result.isSuccess(), task.ignore_data, result.has_more, result.body.?.list.items.len }); - - const prev_metadata = task.result.metadata; - const prev_cert_info = task.result.certificate_info; - task.result = result; - - // Preserve pending certificate info if it was preovided in the previous update. 
- if (task.result.certificate_info == null) { - if (prev_cert_info) |cert_info| { - task.result.certificate_info = cert_info; - } - } - - // metadata should be provided only once - if (result.metadata orelse prev_metadata) |metadata| { - log("added callback metadata", .{}); - if (task.metadata == null) { - task.metadata = metadata; - } - - task.result.metadata = null; - } - - task.body_size = result.body_size; - - const success = result.isSuccess(); - task.response_buffer = result.body.?.*; - - if (task.ignore_data) { - task.response_buffer.reset(); - - if (task.scheduled_response_buffer.list.capacity > 0) { - task.scheduled_response_buffer.deinit(); - task.scheduled_response_buffer = .{ - .allocator = bun.default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, - }; - } - if (success and result.has_more) { - // we are ignoring the body so we should not receive more data, so will only signal when result.has_more = true - return; - } - } else { - if (success) { - _ = bun.handleOom(task.scheduled_response_buffer.write(task.response_buffer.list.items)); - } - // reset for reuse - task.response_buffer.reset(); - } - - if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| { - if (has_schedule_callback) { - return; - } - } - - task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit)); - } -}; +pub const FetchTasklet = @import("./fetch/FetchTasklet.zig").FetchTasklet; fn dataURLResponse( _data_url: DataURL, @@ -2728,21 +1393,16 @@ fn setHeaders(headers: *?Headers, new_headers: []const picohttp.Header, allocato const string = []const u8; -const X509 = @import("../api/bun/x509.zig"); const std = @import("std"); const DataURL = @import("../../resolver/data_url.zig").DataURL; const Method = @import("../../http/Method.zig").Method; const ZigURL = @import("../../url.zig").URL; const bun = @import("bun"); -const Async = bun.Async; const Environment = bun.Environment; 
-const MutableString = bun.MutableString; -const Mutex = bun.Mutex; const Output = bun.Output; const picohttp = bun.picohttp; const s3 = bun.S3; -const BoringSSL = bun.BoringSSL.c; const FetchHeaders = bun.webcore.FetchHeaders; const PosixToWinNormalizer = bun.path.PosixToWinNormalizer; const SSLConfig = bun.api.server.ServerConfig.SSLConfig; @@ -2762,7 +1422,6 @@ const JSType = jsc.C.JSType; const Body = jsc.WebCore.Body; const Request = jsc.WebCore.Request; const Response = jsc.WebCore.Response; -const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure; const Blob = jsc.WebCore.Blob; const AnyBlob = jsc.WebCore.Blob.Any; diff --git a/src/bun.js/webcore/fetch/FetchTasklet.zig b/src/bun.js/webcore/fetch/FetchTasklet.zig new file mode 100644 index 0000000000..b4f79058a4 --- /dev/null +++ b/src/bun.js/webcore/fetch/FetchTasklet.zig @@ -0,0 +1,1368 @@ +pub const FetchTasklet = struct { + pub const ResumableSink = jsc.WebCore.ResumableFetchSink; + + const log = Output.scoped(.FetchTasklet, .visible); + sink: ?*ResumableSink = null, + http: ?*http.AsyncHTTP = null, + result: http.HTTPClientResult = .{}, + metadata: ?http.HTTPResponseMetadata = null, + javascript_vm: *VirtualMachine = undefined, + global_this: *JSGlobalObject = undefined, + request_body: HTTPRequestBody = undefined, + request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null, + + /// buffer being used by AsyncHTTP + response_buffer: MutableString = undefined, + /// buffer used to stream response to JS + scheduled_response_buffer: MutableString = undefined, + /// response weak ref we need this to track the response JS lifetime + response: jsc.Weak(FetchTasklet) = .{}, + /// native response ref if we still need it when JS is discarted + native_response: ?*Response = null, + ignore_data: bool = false, + /// stream strong ref if any is available + readable_stream_ref: jsc.WebCore.ReadableStream.Strong = .{}, + request_headers: Headers = Headers{ .allocator = undefined }, + promise: 
jsc.JSPromise.Strong, + concurrent_task: jsc.ConcurrentTask = .{}, + poll_ref: Async.KeepAlive = .{}, + /// For Http Client requests + /// when Content-Length is provided this represents the whole size of the request + /// If chunked encoded this will represent the total received size (ignoring the chunk headers) + /// If is not chunked encoded and Content-Length is not provided this will be unknown + body_size: http.HTTPClientResult.BodySize = .unknown, + + /// This is url + proxy memory buffer and is owned by FetchTasklet + /// We always clone url and proxy (if informed) + url_proxy_buffer: []const u8 = "", + + signal: ?*jsc.WebCore.AbortSignal = null, + signals: http.Signals = .{}, + signal_store: http.Signals.Store = .{}, + has_schedule_callback: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + + // must be stored because AbortSignal stores reason weakly + abort_reason: jsc.Strong.Optional = .empty, + + // custom checkServerIdentity + check_server_identity: jsc.Strong.Optional = .empty, + reject_unauthorized: bool = true, + upgraded_connection: bool = false, + // Custom Hostname + hostname: ?[]u8 = null, + is_waiting_body: bool = false, + is_waiting_abort: bool = false, + is_waiting_request_stream_start: bool = false, + mutex: Mutex, + + tracker: jsc.Debugger.AsyncTaskTracker, + + ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1), + + pub fn ref(this: *FetchTasklet) void { + const count = this.ref_count.fetchAdd(1, .monotonic); + bun.debugAssert(count > 0); + } + + pub fn deref(this: *FetchTasklet) void { + const count = this.ref_count.fetchSub(1, .monotonic); + bun.debugAssert(count > 0); + + if (count == 1) { + this.deinit() catch |err| switch (err) {}; + } + } + + pub fn derefFromThread(this: *FetchTasklet) void { + const count = this.ref_count.fetchSub(1, .monotonic); + bun.debugAssert(count > 0); + + if (count == 1) { + // this is really unlikely to happen, but can happen + // lets make sure that we always call deinit from main 
thread + + this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.deinit)); + } + } + + pub const HTTPRequestBody = union(enum) { + AnyBlob: AnyBlob, + Sendfile: http.SendFile, + ReadableStream: jsc.WebCore.ReadableStream.Strong, + + pub const Empty: HTTPRequestBody = .{ .AnyBlob = .{ .Blob = .{} } }; + + pub fn store(this: *HTTPRequestBody) ?*Blob.Store { + return switch (this.*) { + .AnyBlob => this.AnyBlob.store(), + else => null, + }; + } + + pub fn slice(this: *const HTTPRequestBody) []const u8 { + return switch (this.*) { + .AnyBlob => this.AnyBlob.slice(), + else => "", + }; + } + + pub fn detach(this: *HTTPRequestBody) void { + switch (this.*) { + .AnyBlob => this.AnyBlob.detach(), + .ReadableStream => |*stream| { + stream.deinit(); + }, + .Sendfile => { + if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0) + this.Sendfile.fd.close(); + this.Sendfile.offset = 0; + this.Sendfile.remain = 0; + }, + } + } + + pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody { + var body_value = try Body.Value.fromJS(globalThis, value); + if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) { + return globalThis.ERR(.BODY_ALREADY_USED, "body already used", .{}).throw(); + } + if (body_value == .Locked) { + if (body_value.Locked.readable.has()) { + // just grab the ref + return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable }; + } + const readable = try body_value.toReadableStream(globalThis); + if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) { + return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable }; + } + } + return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() }; + } + + pub fn needsToReadFile(this: *HTTPRequestBody) bool { + return switch (this.*) { + .AnyBlob => 
|blob| blob.needsToReadFile(), + else => false, + }; + } + + pub fn isS3(this: *const HTTPRequestBody) bool { + return switch (this.*) { + .AnyBlob => |*blob| blob.isS3(), + else => false, + }; + } + + pub fn hasContentTypeFromUser(this: *HTTPRequestBody) bool { + return switch (this.*) { + .AnyBlob => |blob| blob.hasContentTypeFromUser(), + else => false, + }; + } + + pub fn getAnyBlob(this: *HTTPRequestBody) ?*AnyBlob { + return switch (this.*) { + .AnyBlob => &this.AnyBlob, + else => null, + }; + } + + pub fn hasBody(this: *HTTPRequestBody) bool { + return switch (this.*) { + .AnyBlob => |blob| blob.size() > 0, + .ReadableStream => |*stream| stream.has(), + .Sendfile => true, + }; + } + }; + + pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { + return FetchTasklet{}; + } + + fn clearSink(this: *FetchTasklet) void { + if (this.sink) |sink| { + this.sink = null; + sink.deref(); + } + if (this.request_body_streaming_buffer) |buffer| { + this.request_body_streaming_buffer = null; + buffer.clearDrainCallback(); + buffer.deref(); + } + } + + fn clearData(this: *FetchTasklet) void { + log("clearData ", .{}); + const allocator = bun.default_allocator; + if (this.url_proxy_buffer.len > 0) { + allocator.free(this.url_proxy_buffer); + this.url_proxy_buffer.len = 0; + } + + if (this.hostname) |hostname| { + allocator.free(hostname); + this.hostname = null; + } + + if (this.result.certificate_info) |*certificate| { + certificate.deinit(bun.default_allocator); + this.result.certificate_info = null; + } + + this.request_headers.entries.deinit(allocator); + this.request_headers.buf.deinit(allocator); + this.request_headers = Headers{ .allocator = undefined }; + + if (this.http) |http_| { + http_.clearData(); + } + + if (this.metadata != null) { + this.metadata.?.deinit(allocator); + this.metadata = null; + } + + this.response_buffer.deinit(); + this.response.deinit(); + if (this.native_response) |response| { + this.native_response = null; + + response.unref(); + } + + 
this.readable_stream_ref.deinit(); + + this.scheduled_response_buffer.deinit(); + if (this.request_body != .ReadableStream or this.is_waiting_request_stream_start) { + this.request_body.detach(); + } + + this.abort_reason.deinit(); + this.check_server_identity.deinit(); + this.clearAbortSignal(); + // Clear the sink only after the requested ended otherwise we would potentialy lose the last chunk + this.clearSink(); + } + + // XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not + pub fn deinit(this: *FetchTasklet) error{}!void { + log("deinit", .{}); + + bun.assert(this.ref_count.load(.monotonic) == 0); + + this.clearData(); + + const allocator = bun.default_allocator; + + if (this.http) |http_| { + this.http = null; + allocator.destroy(http_); + } + allocator.destroy(this); + } + + fn getCurrentResponse(this: *FetchTasklet) ?*Response { + // we need a body to resolve the promise when buffering + if (this.native_response) |response| { + return response; + } + + // if we did not have a direct reference we check if the Weak ref is still alive + if (this.response.get()) |response_js| { + if (response_js.as(Response)) |response| { + return response; + } + } + + return null; + } + + pub fn startRequestStream(this: *FetchTasklet) void { + this.is_waiting_request_stream_start = false; + bun.assert(this.request_body == .ReadableStream); + if (this.request_body.ReadableStream.get(this.global_this)) |stream| { + if (this.signal) |signal| { + if (signal.aborted()) { + stream.abort(this.global_this); + return; + } + } + + const globalThis = this.global_this; + this.ref(); // lets only unref when sink is done + // +1 because the task refs the sink + const sink = ResumableSink.initExactRefs(globalThis, stream, this, 2); + this.sink = sink; + } + } + + pub fn onBodyReceived(this: *FetchTasklet) bun.JSTerminated!void { + const success = this.result.isSuccess(); + const globalThis = this.global_this; + // 
reset the buffer if we are streaming or if we are not waiting for bufferig anymore + var buffer_reset = true; + log("onBodyReceived success={} has_more={}", .{ success, this.result.has_more }); + defer { + if (buffer_reset) { + this.scheduled_response_buffer.reset(); + } + } + + if (!success) { + var err = this.onReject(); + var need_deinit = true; + defer if (need_deinit) err.deinit(); + var js_err = JSValue.zero; + // if we are streaming update with error + if (this.readable_stream_ref.get(globalThis)) |readable| { + if (readable.ptr == .Bytes) { + js_err = err.toJS(globalThis); + js_err.ensureStillAlive(); + try readable.ptr.Bytes.onData( + .{ + .err = .{ .JSValue = js_err }, + }, + bun.default_allocator, + ); + } + } + if (this.sink) |sink| { + if (js_err == .zero) { + js_err = err.toJS(globalThis); + js_err.ensureStillAlive(); + } + sink.cancel(js_err); + return; + } + // if we are buffering resolve the promise + if (this.getCurrentResponse()) |response| { + need_deinit = false; // body value now owns the error + const body = response.getBodyValue(); + try body.toErrorInstance(err, globalThis); + } + return; + } + + if (this.readable_stream_ref.get(globalThis)) |readable| { + log("onBodyReceived readable_stream_ref", .{}); + if (readable.ptr == .Bytes) { + readable.ptr.Bytes.size_hint = this.getSizeHint(); + // body can be marked as used but we still need to pipe the data + const scheduled_response_buffer = &this.scheduled_response_buffer.list; + + const chunk = scheduled_response_buffer.items; + + if (this.result.has_more) { + try readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), + }, + bun.default_allocator, + ); + } else { + var prev = this.readable_stream_ref; + this.readable_stream_ref = .{}; + defer prev.deinit(); + buffer_reset = false; + + try readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk), + }, + bun.default_allocator, + ); + } + return; + } + } + + if 
(this.getCurrentResponse()) |response| { + log("onBodyReceived Current Response", .{}); + const sizeHint = this.getSizeHint(); + response.setSizeHint(sizeHint); + if (response.getBodyReadableStream(globalThis)) |readable| { + log("onBodyReceived CurrentResponse BodyReadableStream", .{}); + if (readable.ptr == .Bytes) { + const scheduled_response_buffer = this.scheduled_response_buffer.list; + + const chunk = scheduled_response_buffer.items; + + if (this.result.has_more) { + try readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.fromBorrowedSliceDangerous(chunk), + }, + bun.default_allocator, + ); + } else { + readable.value.ensureStillAlive(); + response.detachReadableStream(globalThis); + try readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk), + }, + bun.default_allocator, + ); + } + + return; + } + } + + // we will reach here when not streaming, this is also the only case we dont wanna to reset the buffer + buffer_reset = false; + if (!this.result.has_more) { + var scheduled_response_buffer = this.scheduled_response_buffer.list; + const body = response.getBodyValue(); + // done resolve body + var old = body.*; + const body_value = Body.Value{ + .InternalBlob = .{ + .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), + }, + }; + body.* = body_value; + log("onBodyReceived body_value length={}", .{body_value.InternalBlob.bytes.items.len}); + + this.scheduled_response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + + if (old == .Locked) { + log("onBodyReceived old.resolve", .{}); + try old.resolve(body, this.global_this, response.getFetchHeaders()); + } + } + } + } + + pub fn onProgressUpdate(this: *FetchTasklet) bun.JSTerminated!void { + jsc.markBinding(@src()); + log("onProgressUpdate", .{}); + this.mutex.lock(); + this.has_schedule_callback.store(false, .monotonic); + const is_done = !this.result.has_more; + + const vm = 
this.javascript_vm; + // vm is shutting down we cannot touch JS + if (vm.isShuttingDown()) { + this.mutex.unlock(); + if (is_done) { + this.deref(); + } + return; + } + + const globalThis = this.global_this; + defer { + this.mutex.unlock(); + // if we are not done we wait until the next call + if (is_done) { + var poll_ref = this.poll_ref; + this.poll_ref = .{}; + poll_ref.unref(vm); + this.deref(); + } + } + if (this.is_waiting_request_stream_start and this.result.can_stream) { + // start streaming + this.startRequestStream(); + } + // if we already respond the metadata and still need to process the body + if (this.is_waiting_body) { + try this.onBodyReceived(); + return; + } + if (this.metadata == null and this.result.isSuccess()) return; + + // if we abort because of cert error + // we wait the Http Client because we already have the response + // we just need to deinit + if (this.is_waiting_abort) { + return; + } + const promise_value = this.promise.valueOrEmpty(); + + if (promise_value.isEmptyOrUndefinedOrNull()) { + log("onProgressUpdate: promise_value is null", .{}); + this.promise.deinit(); + return; + } + + if (this.result.certificate_info) |certificate_info| { + this.result.certificate_info = null; + defer certificate_info.deinit(bun.default_allocator); + + // we receive some error + if (this.reject_unauthorized and !this.checkServerIdentity(certificate_info)) { + log("onProgressUpdate: aborted due certError", .{}); + // we need to abort the request + const promise = promise_value.asAnyPromise().?; + const tracker = this.tracker; + var result = this.onReject(); + defer result.deinit(); + + promise_value.ensureStillAlive(); + try promise.reject(globalThis, result.toJS(globalThis)); + + tracker.didDispatch(globalThis); + this.promise.deinit(); + return; + } + // everything ok + if (this.metadata == null) { + log("onProgressUpdate: metadata is null", .{}); + return; + } + } + + const tracker = this.tracker; + tracker.willDispatch(globalThis); + defer { + 
log("onProgressUpdate: promise_value is not null", .{});
            tracker.didDispatch(globalThis);
            this.promise.deinit();
        }
        const success = this.result.isSuccess();
        const result = switch (success) {
            true => jsc.Strong.Optional.create(this.onResolve(), globalThis),
            false => brk: {
                // in this case we wanna a jsc.Strong.Optional so we just convert it
                var value = this.onReject();
                const err = value.toJS(globalThis);
                if (this.sink) |sink| {
                    sink.cancel(err);
                }
                break :brk value.JSValue;
            },
        };

        promise_value.ensureStillAlive();
        const Holder = struct {
            held: jsc.Strong.Optional,
            promise: jsc.Strong.Optional,
            globalObject: *jsc.JSGlobalObject,
            task: jsc.AnyTask,

            pub fn resolve(self: *@This()) bun.JSTerminated!void {
                // cleanup
                defer bun.default_allocator.destroy(self);
                defer self.held.deinit();
                defer self.promise.deinit();
                // resolve the promise
                var prom = self.promise.swap().asAnyPromise().?;
                const res = self.held.swap();
                res.ensureStillAlive();
                try prom.resolve(self.globalObject, res);
            }

            pub fn reject(self: *@This()) bun.JSTerminated!void {
                // cleanup
                defer bun.default_allocator.destroy(self);
                defer self.held.deinit();
                defer self.promise.deinit();

                // reject the promise
                var prom = self.promise.swap().asAnyPromise().?;
                const res = self.held.swap();
                res.ensureStillAlive();
                try prom.reject(self.globalObject, res);
            }
        };
        var holder = bun.handleOom(bun.default_allocator.create(Holder));
        holder.* = .{
            .held = result,
            // we need the promise to be alive until the task is done
            .promise = this.promise.strong,
            .globalObject = globalThis,
            .task = undefined,
        };
        this.promise.strong = .empty;
        holder.task = switch (success) {
            true => jsc.AnyTask.New(Holder, Holder.resolve).init(holder),
            false => jsc.AnyTask.New(Holder, Holder.reject).init(holder),
        };

        vm.enqueueTask(jsc.Task.init(&holder.task));
    }

    /// Runs the stored `check_server_identity` callback with the hostname and the
    /// peer certificate taken from `certificate_info`.
    ///
    /// Returns `true` when the callback returns any non-error value. In every
    /// other case `result.fail` is set to `error.ERR_TLS_CERT_ALTNAME_INVALID`
    /// and `false` is returned; that includes: no callback stored, an empty
    /// certificate, a certificate that `d2i_X509` cannot parse, a failure while
    /// converting the certificate to a JS value, and an error value returned
    /// (or thrown) by the callback. In the last two cases the request is also
    /// marked aborted and the HTTP client is asked to shut down.
    pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: http.CertificateInfo) bool {
        if (this.check_server_identity.get()) |check_server_identity| {
            check_server_identity.ensureStillAlive();
            if (certificate_info.cert.len > 0) {
                const cert = certificate_info.cert;
                var cert_ptr = cert.ptr;
                if (BoringSSL.d2i_X509(null, &cert_ptr, @intCast(cert.len))) |x509| {
                    const globalObject = this.global_this;
                    defer x509.free();
                    const js_cert = X509.toJS(x509, globalObject) catch |err| {
                        switch (err) {
                            error.JSError => {},
                            error.OutOfMemory => globalObject.throwOutOfMemory() catch {},
                            error.JSTerminated => {},
                        }
                        const check_result = globalObject.tryTakeException().?;
                        // mark to wait until deinit
                        this.is_waiting_abort = this.result.has_more;
                        this.abort_reason.set(globalObject, check_result);
                        this.signal_store.aborted.store(true, .monotonic);
                        this.tracker.didCancel(this.global_this);
                        // we need to abort the request
                        if (this.http) |http_| http.http_thread.scheduleShutdown(http_);
                        this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
                        return false;
                    };
                    var hostname: bun.String = bun.String.cloneUTF8(certificate_info.hostname);
                    defer hostname.deref();
                    const js_hostname = hostname.toJS(globalObject);
                    js_hostname.ensureStillAlive();
                    js_cert.ensureStillAlive();
                    const check_result = check_server_identity.call(globalObject, .js_undefined, &.{ js_hostname, js_cert }) catch |err| globalObject.takeException(err);

                    // > Returns object [...] on failure
                    if (check_result.isAnyError()) {
                        // mark to wait until deinit
                        this.is_waiting_abort = this.result.has_more;
                        this.abort_reason.set(globalObject, check_result);
                        this.signal_store.aborted.store(true, .monotonic);
                        this.tracker.didCancel(this.global_this);

                        // we need to abort the request
                        if (this.http) |http_| {
                            http.http_thread.scheduleShutdown(http_);
                        }
                        this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
                        return false;
                    }

                    // > On success, returns
                    // We treat any non-error value as a success.
                    return true;
                }
            }
        }
        this.result.fail = error.ERR_TLS_CERT_ALTNAME_INVALID;
        return false;
    }

    /// Returns the pending abort error, if any, and detaches the abort signal.
    ///
    /// A reason stored in `abort_reason` wins and is moved out of the tasklet
    /// (the field is reset to `.empty`). Otherwise the attached signal, if it
    /// reports an abort reason, supplies the error. Returns `null` when
    /// neither source has a reason.
    fn getAbortError(this: *FetchTasklet) ?Body.Value.ValueError {
        if (this.abort_reason.has()) {
            defer this.clearAbortSignal();
            const out = this.abort_reason;

            this.abort_reason = .empty;
            return Body.Value.ValueError{ .JSValue = out };
        }

        if (this.signal) |signal| {
            if (signal.reasonIfAborted(this.global_this)) |reason| {
                defer this.clearAbortSignal();
                return reason.toBodyValueError(this.global_this);
            }
        }

        return null;
    }

    /// Detaches this tasklet from its abort signal, if one is attached: clears
    /// `this.signal`, removes the native bindings for this tasklet, then drops
    /// the pending-activity ref and the ref held on the signal.
    fn clearAbortSignal(this: *FetchTasklet) void {
        const signal = this.signal orelse return;
        this.signal = null;
        defer {
            signal.pendingActivityUnref();
            signal.unref();
        }

        signal.cleanNativeBindings(this);
    }

    /// Builds the value the fetch promise is rejected with.
    ///
    /// Precedence: the abort error from `getAbortError`, then the abort reason
    /// carried by `result`, then a `SystemError` whose code and message are
    /// derived from `result.fail`. Asserts that `result.fail` is set.
    pub fn onReject(this: *FetchTasklet) Body.Value.ValueError {
        bun.assert(this.result.fail != null);
        log("onReject", .{});

        if (this.getAbortError()) |err| {
            return err;
        }

        if (this.result.abortReason()) |reason| {
            return .{ .AbortReason = reason };
        }

        // some times we don't have metadata so we also check http.url
        const path = if (this.metadata) |metadata|
            bun.String.cloneUTF8(metadata.url)
        else if (this.http) |http_|
            bun.String.cloneUTF8(http_.url.href)
        else
            bun.String.empty;

        const fetch_error = jsc.SystemError{
            .code = bun.String.static(switch (this.result.fail.?) {
                error.ConnectionClosed => "ECONNRESET",
                else => |e| @errorName(e),
            }),
            .message = switch (this.result.fail.?) {
                error.ConnectionClosed => bun.String.static("The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()"),
                error.FailedToOpenSocket => bun.String.static("Was there a typo in the url or port?"),
                error.TooManyRedirects => bun.String.static("The response redirected too many times. For more information, pass `verbose: true` in the second argument to fetch()"),
                error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"),
                error.RedirectURLInvalid => bun.String.static("Redirect URL in Location header is invalid."),

                error.UNABLE_TO_GET_ISSUER_CERT => bun.String.static("unable to get issuer certificate"),
                error.UNABLE_TO_GET_CRL => bun.String.static("unable to get certificate CRL"),
                error.UNABLE_TO_DECRYPT_CERT_SIGNATURE => bun.String.static("unable to decrypt certificate's signature"),
                error.UNABLE_TO_DECRYPT_CRL_SIGNATURE => bun.String.static("unable to decrypt CRL's signature"),
                error.UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY => bun.String.static("unable to decode issuer public key"),
                error.CERT_SIGNATURE_FAILURE => bun.String.static("certificate signature failure"),
                error.CRL_SIGNATURE_FAILURE => bun.String.static("CRL signature failure"),
                error.CERT_NOT_YET_VALID => bun.String.static("certificate is not yet valid"),
                error.CRL_NOT_YET_VALID => bun.String.static("CRL is not yet valid"),
                error.CERT_HAS_EXPIRED => bun.String.static("certificate has expired"),
                error.CRL_HAS_EXPIRED => bun.String.static("CRL has expired"),
                error.ERROR_IN_CERT_NOT_BEFORE_FIELD => bun.String.static("format error in certificate's notBefore field"),
                error.ERROR_IN_CERT_NOT_AFTER_FIELD => bun.String.static("format error in certificate's notAfter field"),
                error.ERROR_IN_CRL_LAST_UPDATE_FIELD => bun.String.static("format error in CRL's lastUpdate field"),
                error.ERROR_IN_CRL_NEXT_UPDATE_FIELD => bun.String.static("format error in CRL's nextUpdate field"),
                error.OUT_OF_MEM => bun.String.static("out of memory"),
                error.DEPTH_ZERO_SELF_SIGNED_CERT => bun.String.static("self signed certificate"),
                error.SELF_SIGNED_CERT_IN_CHAIN => bun.String.static("self signed certificate in certificate chain"),
                error.UNABLE_TO_GET_ISSUER_CERT_LOCALLY => bun.String.static("unable to get local issuer certificate"),
                error.UNABLE_TO_VERIFY_LEAF_SIGNATURE => bun.String.static("unable to verify the first certificate"),
                error.CERT_CHAIN_TOO_LONG => bun.String.static("certificate chain too long"),
                error.CERT_REVOKED => bun.String.static("certificate revoked"),
                error.INVALID_CA => bun.String.static("invalid CA certificate"),
                error.INVALID_NON_CA => bun.String.static("invalid non-CA certificate (has CA markings)"),
                error.PATH_LENGTH_EXCEEDED => bun.String.static("path length constraint exceeded"),
                error.PROXY_PATH_LENGTH_EXCEEDED => bun.String.static("proxy path length constraint exceeded"),
                error.PROXY_CERTIFICATES_NOT_ALLOWED => bun.String.static("proxy certificates not allowed, please set the appropriate flag"),
                error.INVALID_PURPOSE => bun.String.static("unsupported certificate purpose"),
                error.CERT_UNTRUSTED => bun.String.static("certificate not trusted"),
                error.CERT_REJECTED => bun.String.static("certificate rejected"),
                error.APPLICATION_VERIFICATION => bun.String.static("application verification failure"),
                error.SUBJECT_ISSUER_MISMATCH => bun.String.static("subject issuer mismatch"),
                error.AKID_SKID_MISMATCH => bun.String.static("authority and subject key identifier mismatch"),
                error.AKID_ISSUER_SERIAL_MISMATCH => bun.String.static("authority and issuer serial number mismatch"),
                error.KEYUSAGE_NO_CERTSIGN => bun.String.static("key usage does not include certificate signing"),
                error.UNABLE_TO_GET_CRL_ISSUER => bun.String.static("unable to get CRL issuer certificate"),
                error.UNHANDLED_CRITICAL_EXTENSION => bun.String.static("unhandled critical extension"),
                error.KEYUSAGE_NO_CRL_SIGN => bun.String.static("key usage does not include CRL signing"),
                error.KEYUSAGE_NO_DIGITAL_SIGNATURE => bun.String.static("key usage does not include digital signature"),
                error.UNHANDLED_CRITICAL_CRL_EXTENSION => bun.String.static("unhandled critical CRL extension"),
                error.INVALID_EXTENSION => bun.String.static("invalid or inconsistent certificate extension"),
                error.INVALID_POLICY_EXTENSION => bun.String.static("invalid or inconsistent certificate policy extension"),
                error.NO_EXPLICIT_POLICY => bun.String.static("no explicit policy"),
                error.DIFFERENT_CRL_SCOPE => bun.String.static("Different CRL scope"),
                error.UNSUPPORTED_EXTENSION_FEATURE => bun.String.static("Unsupported extension feature"),
                error.UNNESTED_RESOURCE => bun.String.static("RFC 3779 resource not subset of parent's resources"),
                error.PERMITTED_VIOLATION => bun.String.static("permitted subtree violation"),
                error.EXCLUDED_VIOLATION => bun.String.static("excluded subtree violation"),
                error.SUBTREE_MINMAX => bun.String.static("name constraints minimum and maximum not supported"),
                error.UNSUPPORTED_CONSTRAINT_TYPE => bun.String.static("unsupported name constraint type"),
                error.UNSUPPORTED_CONSTRAINT_SYNTAX => bun.String.static("unsupported or invalid name constraint syntax"),
                error.UNSUPPORTED_NAME_SYNTAX => bun.String.static("unsupported or invalid name syntax"),
                error.CRL_PATH_VALIDATION_ERROR => bun.String.static("CRL path validation error"),
                error.SUITE_B_INVALID_VERSION => bun.String.static("Suite B: certificate version invalid"),
                error.SUITE_B_INVALID_ALGORITHM => bun.String.static("Suite B: invalid public key algorithm"),
                error.SUITE_B_INVALID_CURVE => bun.String.static("Suite B: invalid ECC curve"),
                error.SUITE_B_INVALID_SIGNATURE_ALGORITHM => bun.String.static("Suite B: invalid signature algorithm"),
                error.SUITE_B_LOS_NOT_ALLOWED => bun.String.static("Suite B: curve not allowed for this LOS"),
                error.SUITE_B_CANNOT_SIGN_P_384_WITH_P_256 => bun.String.static("Suite B: cannot sign P-384 with P-256"),
                error.HOSTNAME_MISMATCH => bun.String.static("Hostname mismatch"),
                error.EMAIL_MISMATCH => bun.String.static("Email address mismatch"),
                error.IP_ADDRESS_MISMATCH => bun.String.static("IP address mismatch"),
                error.INVALID_CALL => bun.String.static("Invalid certificate verification context"),
                error.STORE_LOOKUP => bun.String.static("Issuer certificate lookup error"),
                error.NAME_CONSTRAINTS_WITHOUT_SANS => bun.String.static("Issuer has name constraints but leaf has no SANs"),
                error.UNKNOWN_CERTIFICATE_VERIFICATION_ERROR => bun.String.static("unknown certificate verification error"),

                else => |e| bun.String.createFormat("{s} fetching \"{}\". For more information, pass `verbose: true` in the second argument to fetch()", .{
                    @errorName(e),
                    path,
                }) catch |err| bun.handleOom(err),
            },
            .path = path,
        };

        return .{ .SystemError = fetch_error };
    }

    /// `Body.Value.Locked` hook: stores a strong reference to the readable
    /// stream created for the response body. `ctx` is the `*FetchTasklet`.
    pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void {
        const this = bun.cast(*FetchTasklet, ctx);
        this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);
    }

    /// `Body.Value.Locked` hook invoked when the response body starts being
    /// streamed. `ctx` is the `*FetchTasklet`.
    ///
    /// Returns `.aborted` when the request was already aborted. Otherwise it
    /// switches the HTTP client to streaming mode and hands over whatever was
    /// buffered so far (`.owned`), or only a size estimate when nothing is
    /// buffered yet (`.estimated_size`).
    pub fn onStartStreamingHTTPResponseBodyCallback(ctx: *anyopaque) jsc.WebCore.DrainResult {
        const this = bun.cast(*FetchTasklet, ctx);
        if (this.signal_store.aborted.load(.monotonic)) {
            return jsc.WebCore.DrainResult{
                .aborted = {},
            };
        }

        if (this.http) |http_| {
            http_.enableResponseBodyStreaming();

            // If the server sent the headers and the response body in two separate socket writes
            // and if the server doesn't close the connection by itself
            // and doesn't send any follow-up data
            // then we must make sure the HTTP thread flushes.
            bun.http.http_thread.scheduleResponseBodyDrain(http_.async_http_id);
        }

        this.mutex.lock();
        defer this.mutex.unlock();
        const size_hint = this.getSizeHint();

        var scheduled_response_buffer = this.scheduled_response_buffer.list;
        // This means we have received part of the body but not the whole thing
        if (scheduled_response_buffer.items.len > 0) {
            this.scheduled_response_buffer = .{
                .allocator = bun.default_allocator,
                .list = .{
                    .items = &.{},
                    .capacity = 0,
                },
            };

            return .{
                .owned = .{
                    .list = scheduled_response_buffer.toManaged(bun.default_allocator),
                    .size_hint = size_hint,
                },
            };
        }

        return .{
            .estimated_size = size_hint,
        };
    }

    /// Returns `body_size` as a `Blob.SizeType`, truncating the stored value;
    /// an unknown size yields 0.
    fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
        return switch (this.body_size) {
            .content_length => @truncate(this.body_size.content_length),
            .total_received => @truncate(this.body_size.total_received),
            .unknown => 0,
        };
    }

    /// Produces the `Body.Value` for the response: an `.Error` when an abort
    /// is pending, a `.Locked` body wired to this tasklet's streaming hooks
    /// while more data is expected (`is_waiting_body`), or otherwise an
    /// `.InternalBlob` that takes over `scheduled_response_buffer` (which is
    /// reset to an empty buffer).
    fn toBodyValue(this: *FetchTasklet) Body.Value {
        if (this.getAbortError()) |err| {
            return .{ .Error = err };
        }
        if (this.is_waiting_body) {
            const response = Body.Value{
                .Locked = .{
                    .size_hint = this.getSizeHint(),
                    .task = this,
                    .global = this.global_this,
                    .onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback,
                    .onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
                },
            };
            return response;
        }

        var scheduled_response_buffer = this.scheduled_response_buffer.list;
        const response = Body.Value{
            .InternalBlob = .{
                .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
            },
        };
        this.scheduled_response_buffer = .{
            .allocator = bun.default_allocator,
            .list = .{
                .items = &.{},
                .capacity = 0,
            },
        };

        return response;
    }

    /// Builds the native `Response` from the stored metadata and the current
    /// body state. Requires `metadata` to be set; also records in
    /// `is_waiting_body` whether more body data is expected.
    fn toResponse(this: *FetchTasklet) Response {
        log("toResponse", .{});
        bun.assert(this.metadata != null);
        // at this point we always should have metadata
        const metadata = this.metadata.?;
        const http_response = metadata.response;
        this.is_waiting_body = this.result.has_more;
        return Response.init(
            .{
                .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
                .status_code = @as(u16, @truncate(http_response.status_code)),
                .status_text = bun.String.createAtomIfPossible(http_response.status),
            },
            Body{
                .value = this.toBodyValue(),
            },
            bun.String.createAtomIfPossible(metadata.url),
            this.result.redirected,
        );
    }

    /// Stops retaining the response body: enables streaming on the HTTP client,
    /// releases the keep-alive ref and every reference to the stream/response,
    /// and sets `ignore_data` so `callback` discards further data.
    fn ignoreRemainingResponseBody(this: *FetchTasklet) void {
        log("ignoreRemainingResponseBody", .{});
        // enabling streaming will make the http thread to drain into the main thread (aka stop buffering)
        // without a stream ref, response body or response instance alive it will just ignore the result
        if (this.http) |http_| {
            http_.enableResponseBodyStreaming();
        }
        // we should not keep the process alive if we are ignoring the body
        const vm = this.javascript_vm;
        this.poll_ref.unref(vm);
        // clean any remaining references
        this.readable_stream_ref.deinit();
        this.response.deinit();

        if (this.native_response) |response| {
            response.unref();
            this.native_response = null;
        }

        this.ignore_data = true;
    }

    /// Exported finalizer hook for the JS `Response` created by this tasklet.
    /// Decides whether the remaining response body can be dropped.
    export fn Bun__FetchResponse_finalize(this: *FetchTasklet) callconv(.C) void {
        log("onResponseFinalize", .{});
        if (this.native_response) |response| {
            const body = response.getBodyValue();
            // Three scenarios:
            //
            // 1. We are streaming, in which case we should not ignore the body.
            // 2. We were buffering, in which case
            //    2a. if we have no promise, we should ignore the body.
            //    2b. if we have a promise, we should keep loading the body.
            // 3. We never started buffering, in which case we should ignore the body.
            //
            // Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer.
            if (body.* != .Locked or this.readable_stream_ref.held.has()) {
                // Scenario 1 or 3.
                // NOTE(review): this path returns without ignoring the body, while
                // the list above says scenario 3 should ignore it — confirm which
                // of the two is intended.
                return;
            }

            if (body.Locked.promise) |promise| {
                if (promise.isEmptyOrUndefinedOrNull()) {
                    // Scenario 2a: the promise slot holds no usable promise.
                    this.ignoreRemainingResponseBody();
                }
                // Scenario 2b otherwise: a live promise exists, keep loading.
            } else {
                // Scenario 3.
                this.ignoreRemainingResponseBody();
            }
        }
    }
    comptime {
        _ = Bun__FetchResponse_finalize;
    }

    /// Creates the JS `Response` the promise resolves with. The tasklet keeps
    /// a weak reference to the JS object and a ref on the native `Response`.
    pub fn onResolve(this: *FetchTasklet) JSValue {
        log("onResolve", .{});
        const response = bun.new(Response, this.toResponse());
        const response_js = Response.makeMaybePooled(@as(*jsc.JSGlobalObject, this.global_this), response);
        response_js.ensureStillAlive();
        this.response = jsc.Weak(FetchTasklet).create(response_js, this.global_this, .FetchResponse, this);
        this.native_response = response.ref();
        return response_js;
    }

    /// Allocates and initializes a `FetchTasklet` together with its
    /// `AsyncHTTP` client from `fetch_options`. Nothing is scheduled here;
    /// see `queue`.
    ///
    /// Returns an allocation error if either allocation fails; in that case
    /// the tasklet allocation itself is released before returning.
    pub fn get(
        allocator: std.mem.Allocator,
        globalThis: *jsc.JSGlobalObject,
        fetch_options: *const FetchOptions,
        promise: jsc.JSPromise.Strong,
    ) !*FetchTasklet {
        var jsc_vm = globalThis.bunVM();
        var fetch_tasklet = try allocator.create(FetchTasklet);
        // The only later fallible step is the AsyncHTTP allocation inside the
        // initializer below; without this the tasklet would leak when it fails.
        errdefer allocator.destroy(fetch_tasklet);

        fetch_tasklet.* = .{
            .mutex = .{},
            .scheduled_response_buffer = .{
                .allocator = bun.default_allocator,
                .list = .{
                    .items = &.{},
                    .capacity = 0,
                },
            },
            .response_buffer = MutableString{
                .allocator = bun.default_allocator,
                .list = .{
                    .items = &.{},
                    .capacity = 0,
                },
            },
            .http = try allocator.create(http.AsyncHTTP),
            .javascript_vm = jsc_vm,
            .request_body = fetch_options.body,
            .global_this = globalThis,
            .promise = promise,
            .request_headers = fetch_options.headers,
            .url_proxy_buffer = fetch_options.url_proxy_buffer,
            .signal = fetch_options.signal,
            .hostname = fetch_options.hostname,
            .tracker = jsc.Debugger.AsyncTaskTracker.init(jsc_vm),
            .check_server_identity = fetch_options.check_server_identity,
            .reject_unauthorized = fetch_options.reject_unauthorized,
            .upgraded_connection = fetch_options.upgraded_connection,
        };

        fetch_tasklet.signals = fetch_tasklet.signal_store.to();

        fetch_tasklet.tracker.didSchedule(globalThis);

        if (fetch_tasklet.request_body.store()) |store| {
            store.ref();
        }

        var proxy: ?ZigURL = null;
        if (fetch_options.proxy) |proxy_opt| {
            // An explicitly provided but empty proxy URL means "no proxy":
            // `proxy` stays null and the environment is not consulted.
            if (!proxy_opt.isEmpty()) {
                proxy = proxy_opt;
            }
        } else {
            proxy = jsc_vm.transpiler.env.getHttpProxyFor(fetch_options.url);
        }

        if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) {
            fetch_tasklet.signal_store.cert_errors.store(true, .monotonic);
        } else {
            fetch_tasklet.signals.cert_errors = null;
        }

        // This task gets queued on the HTTP thread.
        fetch_tasklet.http.?.* = http.AsyncHTTP.init(
            bun.default_allocator,
            fetch_options.method,
            fetch_options.url,
            fetch_options.headers.entries,
            fetch_options.headers.buf.items,
            &fetch_tasklet.response_buffer,
            fetch_tasklet.request_body.slice(),
            http.HTTPClientResult.Callback.New(
                *FetchTasklet,
                // handles response events (on headers, on body, etc.)
                FetchTasklet.callback,
            ).init(fetch_tasklet),
            fetch_options.redirect_type,
            .{
                .http_proxy = proxy,
                .hostname = fetch_options.hostname,
                .signals = fetch_tasklet.signals,
                .unix_socket_path = fetch_options.unix_socket_path,
                .disable_timeout = fetch_options.disable_timeout,
                .disable_keepalive = fetch_options.disable_keepalive,
                .disable_decompression = fetch_options.disable_decompression,
                .reject_unauthorized = fetch_options.reject_unauthorized,
                .verbose = fetch_options.verbose,
                .tls_props = fetch_options.ssl_config,
            },
        );
        // enable streaming the write side
        const isStream = fetch_tasklet.request_body == .ReadableStream;
        fetch_tasklet.http.?.client.flags.is_streaming_request_body = isStream;
        fetch_tasklet.is_waiting_request_stream_start = isStream;
        if (isStream) {
            const buffer = http.ThreadSafeStreamBuffer.new(.{});
            buffer.setDrainCallback(FetchTasklet, FetchTasklet.onWriteRequestDataDrain, fetch_tasklet);
            fetch_tasklet.request_body_streaming_buffer = buffer;
            fetch_tasklet.http.?.request_body = .{
                .stream = .{
                    .buffer = buffer,
                    .ended = false,
                },
            };
        }
        // TODO is this necessary? the http client already sets the redirect type,
        // so manually setting it here seems redundant
        if (fetch_options.redirect_type != FetchRedirect.follow) {
            fetch_tasklet.http.?.client.remaining_redirect_count = 0;
        }

        // we want to return after headers are received
        fetch_tasklet.signal_store.header_progress.store(true, .monotonic);

        if (fetch_tasklet.request_body == .Sendfile) {
            bun.assert(fetch_options.url.isHTTP());
            bun.assert(fetch_options.proxy == null);
            fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile };
        }

        if (fetch_tasklet.signal) |signal| {
            signal.pendingActivityRef();
            fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
        }
        return fetch_tasklet;
    }

    /// Abort-signal listener: remembers `reason`, aborts the HTTP task and
    /// cancels the request-body sink when one is attached.
    pub fn abortListener(this: *FetchTasklet, reason: JSValue) void {
        log("abortListener", .{});
        reason.ensureStillAlive();
        this.abort_reason.set(this.global_this, reason);
        this.abortTask();
        if (this.sink) |sink| {
            sink.cancel(reason);
        }
    }

    /// This is ALWAYS called from the http thread and we cannot touch the buffer here because is locked
    pub fn onWriteRequestDataDrain(this: *FetchTasklet) void {
        // ref until the main thread callback is called
        this.ref();
        this.javascript_vm.eventLoop().enqueueTaskConcurrent(jsc.ConcurrentTask.fromCallback(this, FetchTasklet.resumeRequestDataStream));
    }

    /// This is ALWAYS called from the main thread
    // XXX: 'fn (*FetchTasklet) error{}!void' coerces to 'fn (*FetchTasklet) bun.JSError!void' but 'fn (*FetchTasklet) void' does not
    pub fn resumeRequestDataStream(this: *FetchTasklet) error{}!void {
        // deref when done because we ref inside onWriteRequestDataDrain
        defer this.deref();
        log("resumeRequestDataStream", .{});
        if (this.sink) |sink| {
            if (this.signal) |signal| {
                if (signal.aborted()) {
                    // already aborted; nothing to drain
                    return;
                }
            }
            sink.drain();
        }
    }

    /// Appends `data` to the streaming request buffer and reports whether the
    /// producer should pause.
    ///
    /// Returns `.done` when the signal is already aborted or there is no
    /// streaming buffer. Unless the connection is upgraded, `data` is framed
    /// as one HTTP/1.1 chunk (hex length, CRLF, payload, CRLF). Returns
    /// `.backpressure` once the buffered size reaches the sink's high water
    /// mark (16384 when no sink is attached), `.want_more` otherwise.
    pub fn writeRequestData(this: *FetchTasklet, data: []const u8) ResumableSinkBackpressure {
        log("writeRequestData {}", .{data.len});
        if (this.signal) |signal| {
            if (signal.aborted()) {
                return .done;
            }
        }
        const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return .done;
        const stream_buffer = thread_safe_stream_buffer.acquire();
        defer thread_safe_stream_buffer.release();
        const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384;

        var needs_schedule = false;
        defer if (needs_schedule) {
            // wakeup the http thread to write the data
            http.http_thread.scheduleRequestWrite(this.http.?, .data);
        };

        // dont have backpressure so we will schedule the data to be written
        // if we have backpressure the onWritable will drain the buffer
        needs_schedule = stream_buffer.isEmpty();
        if (this.upgraded_connection) {
            bun.handleOom(stream_buffer.write(data));
        } else {
            //16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n
            var formated_size_buffer: [18]u8 = undefined;
            const formated_size = std.fmt.bufPrint(
                formated_size_buffer[0..],
                "{x}\r\n",
                .{data.len},
            ) catch |err| switch (err) {
                error.NoSpaceLeft => unreachable,
            };
            bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2));
            stream_buffer.writeAssumeCapacity(formated_size);
            stream_buffer.writeAssumeCapacity(data);
            stream_buffer.writeAssumeCapacity("\r\n");
        }

        // pause the stream if we hit the high water mark
        return if (stream_buffer.size() >= highWaterMark) .backpressure else .want_more;
    }

    /// Finishes the streamed request body and drops one reference on the
    /// tasklet.
    ///
    /// With `err` set, the request is aborted (keeping `err` as the abort
    /// reason unless it is undefined/null) if it was not aborted already.
    /// Without it, the terminating chunk is written for non-upgraded
    /// connections and the HTTP thread is asked to flush the end of the body.
    pub fn writeEndRequest(this: *FetchTasklet, err: ?jsc.JSValue) void {
        log("writeEndRequest hasError? {}", .{err != null});
        defer this.deref();
        if (err) |jsError| {
            if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) {
                return;
            }
            if (!jsError.isUndefinedOrNull()) {
                this.abort_reason.set(this.global_this, jsError);
            }
            this.abortTask();
        } else {
            if (!this.upgraded_connection) {
                // If is not upgraded we need to send the terminating chunk
                const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return;
                const stream_buffer = thread_safe_stream_buffer.acquire();
                defer thread_safe_stream_buffer.release();
                bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body));
            }
            if (this.http) |http_| {
                // just tell to write the end of the chunked encoding aka 0\r\n\r\n
                http.http_thread.scheduleRequestWrite(http_, .end);
            }
        }
    }

    /// Marks the request as aborted, notifies the debugger tracker and asks
    /// the HTTP thread to shut the client down when one exists.
    pub fn abortTask(this: *FetchTasklet) void {
        this.signal_store.aborted.store(true, .monotonic);
        this.tracker.didCancel(this.global_this);

        if (this.http) |http_| {
            http.http_thread.scheduleShutdown(http_);
        }
    }

    /// Inputs consumed by `get`/`queue` to configure a fetch request.
    const FetchOptions = struct {
        method: Method,
        headers: Headers,
        body: HTTPRequestBody,
        disable_timeout: bool,
        disable_keepalive: bool,
        disable_decompression: bool,
        reject_unauthorized: bool,
        url: ZigURL,
        verbose: http.HTTPVerboseLevel = .none,
        redirect_type: FetchRedirect = FetchRedirect.follow,
        proxy: ?ZigURL = null,
        url_proxy_buffer: []const u8 = "",
        signal: ?*jsc.WebCore.AbortSignal = null,
        globalThis: ?*JSGlobalObject,
        // Custom Hostname
        hostname: ?[]u8 = null,
        check_server_identity: jsc.Strong.Optional = .empty,
        unix_socket_path: ZigString.Slice,
        ssl_config: ?*SSLConfig = null,
        upgraded_connection: bool = false,
    };

    /// Creates a tasklet via `get` and schedules its request on the HTTP
    /// thread. The returned tasklet holds a keep-alive ref on the VM and one
    /// extra reference that `callback` releases when the client is done.
    pub fn queue(
        allocator: std.mem.Allocator,
        global: *JSGlobalObject,
        fetch_options: *const FetchOptions,
        promise: jsc.JSPromise.Strong,
    ) !*FetchTasklet {
        http.HTTPThread.init(&.{});
        var node = try get(
            allocator,
            global,
            fetch_options,
            promise,
        );

        var batch = bun.ThreadPool.Batch{};
        node.http.?.schedule(allocator, &batch);
        node.poll_ref.ref(global.bunVM());

        // increment ref so we can keep it alive until the http client is done
        node.ref();
        http.http_thread.schedule(batch);

        return node;
    }

    /// Called from HTTP thread. Handles HTTP events received from socket.
    ///
    /// Copies the client state and `result` into the tasklet under `mutex`,
    /// moves received body bytes into `scheduled_response_buffer` (or discards
    /// them when `ignore_data` is set) and enqueues a main-thread task unless
    /// one is already scheduled.
    pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
        // at this point only this thread is accessing result to is no race condition
        const is_done = !result.has_more;
        // we are done with the http client so we can deref our side
        // this is a atomic operation and will enqueue a task to deinit on the main thread
        defer if (is_done) task.derefFromThread();

        task.mutex.lock();
        // we need to unlock before task.deref();
        defer task.mutex.unlock();
        task.http.?.* = async_http.*;
        task.http.?.response_buffer = async_http.response_buffer;

        log("callback success={} ignore_data={} has_more={} bytes={}", .{ result.isSuccess(), task.ignore_data, result.has_more, result.body.?.list.items.len });

        const prev_metadata = task.result.metadata;
        const prev_cert_info = task.result.certificate_info;
        task.result = result;

        // Preserve pending certificate info if it was provided in the previous update.
        if (task.result.certificate_info == null) {
            if (prev_cert_info) |cert_info| {
                task.result.certificate_info = cert_info;
            }
        }

        // metadata should be provided only once
        if (result.metadata orelse prev_metadata) |metadata| {
            log("added callback metadata", .{});
            if (task.metadata == null) {
                task.metadata = metadata;
            }

            task.result.metadata = null;
        }

        task.body_size = result.body_size;

        const success = result.isSuccess();
        task.response_buffer = result.body.?.*;

        if (task.ignore_data) {
            task.response_buffer.reset();

            if (task.scheduled_response_buffer.list.capacity > 0) {
                task.scheduled_response_buffer.deinit();
                task.scheduled_response_buffer = .{
                    .allocator = bun.default_allocator,
                    .list = .{
                        .items = &.{},
                        .capacity = 0,
                    },
                };
            }
            if (success and result.has_more) {
                // we are ignoring the body so we should not receive more data, so will only signal when result.has_more = true
                return;
            }
        } else {
            if (success) {
                _ = bun.handleOom(task.scheduled_response_buffer.write(task.response_buffer.list.items));
            }
            // reset for reuse
            task.response_buffer.reset();
        }

        if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
            if (has_schedule_callback) {
                return;
            }
        }

        task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
    }
};

const X509 = @import("../../api/bun/x509.zig");
const std = @import("std");
const Method = @import("../../../http/Method.zig").Method;
const ZigURL = @import("../../../url.zig").URL;

const bun = @import("bun");
const Async = bun.Async;
const MutableString = bun.MutableString;
const Mutex = bun.Mutex;
const Output = bun.Output;
const BoringSSL = bun.BoringSSL.c;
const FetchHeaders = bun.webcore.FetchHeaders;
const SSLConfig = bun.api.server.ServerConfig.SSLConfig;

const http = bun.http;
const FetchRedirect = http.FetchRedirect;
const Headers = bun.http.Headers;

const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSPromise = jsc.JSPromise;
const JSValue = jsc.JSValue;
const VirtualMachine = jsc.VirtualMachine;
const ZigString = jsc.ZigString;

const Body = jsc.WebCore.Body;
const Response = jsc.WebCore.Response;
const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;

const Blob = jsc.WebCore.Blob;
const AnyBlob = jsc.WebCore.Blob.Any;