mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
Fix flaky request.signal implementation
This commit is contained in:
@@ -1690,6 +1690,26 @@ pub const AbortSignal = extern opaque {
|
||||
pub const name = "JSC::AbortSignal";
|
||||
pub const namespace = "JSC";
|
||||
|
||||
pub fn listen(
|
||||
this: *AbortSignal,
|
||||
comptime Context: type,
|
||||
ctx: *Context,
|
||||
comptime cb: *const fn (*Context, JSValue) void,
|
||||
) *AbortSignal {
|
||||
const Wrapper = struct {
|
||||
const call = cb;
|
||||
pub fn callback(
|
||||
ptr: ?*anyopaque,
|
||||
reason: JSValue,
|
||||
) callconv(.C) void {
|
||||
var val = bun.cast(*Context, ptr.?);
|
||||
call(val, reason);
|
||||
}
|
||||
};
|
||||
|
||||
return this.addListener(@ptrCast(?*anyopaque, ctx), Wrapper.callback);
|
||||
}
|
||||
|
||||
pub fn addListener(
|
||||
this: *AbortSignal,
|
||||
ctx: ?*anyopaque,
|
||||
@@ -1709,10 +1729,12 @@ pub const AbortSignal = extern opaque {
|
||||
return cppFn("signal", .{ this, reason });
|
||||
}
|
||||
|
||||
/// This function is not threadsafe. aborted is a boolean, not an atomic!
|
||||
pub fn aborted(this: *AbortSignal) bool {
|
||||
return cppFn("aborted", .{this});
|
||||
}
|
||||
|
||||
/// This function is not threadsafe. JSValue cannot safely be passed between threads.
|
||||
pub fn abortReason(this: *AbortSignal) JSValue {
|
||||
return cppFn("abortReason", .{this});
|
||||
}
|
||||
@@ -1734,11 +1756,11 @@ pub const AbortSignal = extern opaque {
|
||||
}
|
||||
|
||||
pub fn toJS(this: *AbortSignal, global: *JSGlobalObject) JSValue {
|
||||
return cppFn("toJS", .{this, global});
|
||||
return cppFn("toJS", .{ this, global });
|
||||
}
|
||||
|
||||
pub fn create(global: *JSGlobalObject) JSValue {
|
||||
return cppFn("create", .{ global });
|
||||
return cppFn("create", .{global});
|
||||
}
|
||||
|
||||
pub fn createAbortError(message: *const ZigString, code: *const ZigString, global: *JSGlobalObject) JSValue {
|
||||
@@ -1749,20 +1771,7 @@ pub const AbortSignal = extern opaque {
|
||||
return cppFn("createTimeoutError", .{ message, code, global });
|
||||
}
|
||||
|
||||
pub const Extern = [_][]const u8{
|
||||
"createAbortError",
|
||||
"createTimeoutError",
|
||||
"create",
|
||||
"ref",
|
||||
"unref",
|
||||
"signal",
|
||||
"abortReason",
|
||||
"aborted",
|
||||
"addListener",
|
||||
"fromJS",
|
||||
"toJS",
|
||||
"cleanNativeBindings"
|
||||
};
|
||||
pub const Extern = [_][]const u8{ "createAbortError", "createTimeoutError", "create", "ref", "unref", "signal", "abortReason", "aborted", "addListener", "fromJS", "toJS", "cleanNativeBindings" };
|
||||
};
|
||||
|
||||
pub const JSPromise = extern struct {
|
||||
@@ -3567,14 +3576,7 @@ pub const JSValue = enum(JSValueReprInt) {
|
||||
return cppFn("eqlCell", .{ this, other });
|
||||
}
|
||||
|
||||
pub const BuiltinName = enum(u8) {
|
||||
method,
|
||||
headers,
|
||||
status,
|
||||
url,
|
||||
body,
|
||||
data
|
||||
};
|
||||
pub const BuiltinName = enum(u8) { method, headers, status, url, body, data };
|
||||
|
||||
// intended to be more lightweight than ZigString
|
||||
pub fn fastGet(this: JSValue, global: *JSGlobalObject, builtin_name: BuiltinName) ?JSValue {
|
||||
|
||||
@@ -260,7 +260,7 @@ pub const Request = struct {
|
||||
return ZigString.init(Properties.UTF8.navigate).toValue(globalThis);
|
||||
}
|
||||
|
||||
pub fn finalize(this: *Request) callconv(.C) void {
|
||||
pub fn finalizeWithoutDeinit(this: *Request) void {
|
||||
if (this.headers) |headers| {
|
||||
headers.deref();
|
||||
this.headers = null;
|
||||
@@ -275,7 +275,10 @@ pub const Request = struct {
|
||||
_ = signal.unref();
|
||||
this.signal = null;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalize(this: *Request) callconv(.C) void {
|
||||
this.finalizeWithoutDeinit();
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
@@ -402,10 +405,7 @@ pub const Request = struct {
|
||||
if (Body.Value.fromJS(globalThis, body_)) |body| {
|
||||
request.body = body;
|
||||
} else {
|
||||
if (request.headers) |head| {
|
||||
head.deref();
|
||||
}
|
||||
|
||||
request.finalizeWithoutDeinit();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -419,6 +419,19 @@ pub const Request = struct {
|
||||
}
|
||||
},
|
||||
else => {
|
||||
if (arguments[1].get(globalThis, "signal")) |signal_| {
|
||||
if (AbortSignal.fromJS(signal_)) |signal| {
|
||||
//Keep it alive
|
||||
signal_.ensureStillAlive();
|
||||
request.signal = signal.ref();
|
||||
} else {
|
||||
globalThis.throw("Failed to construct 'Request': member signal is not of type AbortSignal.", .{});
|
||||
|
||||
request.finalizeWithoutDeinit();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
if (Body.Init.init(getAllocator(globalThis), globalThis, arguments[1], arguments[1].jsType()) catch null) |req_init| {
|
||||
request.headers = req_init.headers;
|
||||
request.method = req_init.method;
|
||||
@@ -428,26 +441,7 @@ pub const Request = struct {
|
||||
if (Body.Value.fromJS(globalThis, body_)) |body| {
|
||||
request.body = body;
|
||||
} else {
|
||||
if (request.headers) |head| {
|
||||
head.deref();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
if (arguments[1].get(globalThis, "signal")) |signal_| {
|
||||
if (AbortSignal.fromJS(signal_)) |signal| {
|
||||
//Keep it alive
|
||||
signal_.ensureStillAlive();
|
||||
request.signal = signal.ref();
|
||||
} else {
|
||||
globalThis.throw("Failed to construct 'Request': member signal is not of type AbortSignal.", .{});
|
||||
|
||||
if (request.headers) |head| {
|
||||
head.deref();
|
||||
}
|
||||
|
||||
request.finalizeWithoutDeinit();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -623,6 +623,12 @@ pub const Fetch = struct {
|
||||
/// We always clone url and proxy (if informed)
|
||||
url_proxy_buffer: []const u8 = "",
|
||||
|
||||
signal: ?*JSC.AbortSignal = null,
|
||||
aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
|
||||
|
||||
// must be stored because AbortSignal stores reason weakly
|
||||
abort_reason: JSValue = JSValue.zero,
|
||||
|
||||
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
|
||||
return FetchTasklet{};
|
||||
}
|
||||
@@ -641,6 +647,14 @@ pub const Fetch = struct {
|
||||
this.result.deinitMetadata();
|
||||
this.response_buffer.deinit();
|
||||
this.request_body.detach();
|
||||
|
||||
if (this.abort_reason != .zero) this.abort_reason.unprotect();
|
||||
|
||||
if (this.signal) |signal| {
|
||||
signal.cleanNativeBindings(this);
|
||||
_ = signal.unref();
|
||||
this.signal = null;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(this: *FetchTasklet) void {
|
||||
@@ -688,29 +702,23 @@ pub const Fetch = struct {
|
||||
}
|
||||
|
||||
pub fn onReject(this: *FetchTasklet) JSValue {
|
||||
if (this.signal) |signal| {
|
||||
_ = signal.unref();
|
||||
this.signal = null;
|
||||
}
|
||||
|
||||
if (!this.abort_reason.isEmptyOrUndefinedOrNull()) {
|
||||
return this.abort_reason;
|
||||
}
|
||||
|
||||
if (this.result.isTimeout()) {
|
||||
//Timeout with reason
|
||||
if (this.result.reason) |exception| {
|
||||
if (!exception.isEmptyOrUndefinedOrNull()) {
|
||||
return exception;
|
||||
}
|
||||
}
|
||||
//Timeout without reason
|
||||
const exception = JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this);
|
||||
return exception;
|
||||
// Timeout without reason
|
||||
return JSC.AbortSignal.createTimeoutError(JSC.ZigString.static("The operation timed out"), &JSC.ZigString.Empty, this.global_this);
|
||||
}
|
||||
|
||||
if (this.result.isAbort()) {
|
||||
//Abort can be TimeoutError (AbortSignal.timeout(ms)) or AbortError so we need to detect
|
||||
if (this.result.reason) |exception| {
|
||||
if (!exception.isEmptyOrUndefinedOrNull()) {
|
||||
return exception;
|
||||
}
|
||||
}
|
||||
|
||||
//Abort without reason
|
||||
const exception = JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
|
||||
return exception;
|
||||
// Abort without reason
|
||||
return JSC.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
|
||||
}
|
||||
|
||||
const fetch_error = JSC.SystemError{
|
||||
@@ -793,6 +801,7 @@ pub const Fetch = struct {
|
||||
.request_headers = fetch_options.headers,
|
||||
.ref = JSC.napi.Ref.create(globalThis, promise),
|
||||
.url_proxy_buffer = fetch_options.url_proxy_buffer,
|
||||
.signal = fetch_options.signal,
|
||||
};
|
||||
|
||||
if (fetch_tasklet.request_body.store()) |store| {
|
||||
@@ -808,12 +817,24 @@ pub const Fetch = struct {
|
||||
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
|
||||
}
|
||||
|
||||
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New(
|
||||
*FetchTasklet,
|
||||
FetchTasklet.callback,
|
||||
).init(
|
||||
fetch_tasklet,
|
||||
), proxy, fetch_options.signal);
|
||||
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
|
||||
allocator,
|
||||
fetch_options.method,
|
||||
fetch_options.url,
|
||||
fetch_options.headers.entries,
|
||||
fetch_options.headers.buf.items,
|
||||
&fetch_tasklet.response_buffer,
|
||||
fetch_tasklet.request_body.slice(),
|
||||
fetch_options.timeout,
|
||||
HTTPClient.HTTPClientResult.Callback.New(
|
||||
*FetchTasklet,
|
||||
FetchTasklet.callback,
|
||||
).init(
|
||||
fetch_tasklet,
|
||||
),
|
||||
proxy,
|
||||
if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
|
||||
);
|
||||
|
||||
if (!fetch_options.follow_redirects) {
|
||||
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
|
||||
@@ -822,10 +843,35 @@ pub const Fetch = struct {
|
||||
fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout;
|
||||
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
|
||||
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
|
||||
|
||||
if (fetch_tasklet.signal) |signal| {
|
||||
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
|
||||
}
|
||||
return fetch_tasklet;
|
||||
}
|
||||
|
||||
const FetchOptions = struct { method: Method, headers: Headers, body: AnyBlob, timeout: usize, disable_timeout: bool, disable_keepalive: bool, url: ZigURL, verbose: bool = false, follow_redirects: bool = true, proxy: ?ZigURL = null, url_proxy_buffer: []const u8 = "", signal: ?*JSC.AbortSignal = null, globalThis: ?*JSGlobalObject };
|
||||
pub fn abortListener(this: *FetchTasklet, reason: JSValue) void {
|
||||
reason.ensureStillAlive();
|
||||
this.abort_reason = reason;
|
||||
reason.protect();
|
||||
this.aborted.store(true, .Monotonic);
|
||||
}
|
||||
|
||||
const FetchOptions = struct {
|
||||
method: Method,
|
||||
headers: Headers,
|
||||
body: AnyBlob,
|
||||
timeout: usize,
|
||||
disable_timeout: bool,
|
||||
disable_keepalive: bool,
|
||||
url: ZigURL,
|
||||
verbose: bool = false,
|
||||
follow_redirects: bool = true,
|
||||
proxy: ?ZigURL = null,
|
||||
url_proxy_buffer: []const u8 = "",
|
||||
signal: ?*JSC.AbortSignal = null,
|
||||
globalThis: ?*JSGlobalObject,
|
||||
};
|
||||
|
||||
pub fn queue(
|
||||
allocator: std.mem.Allocator,
|
||||
@@ -1205,9 +1251,23 @@ pub const Fetch = struct {
|
||||
_ = FetchTasklet.queue(
|
||||
default_allocator,
|
||||
globalThis,
|
||||
.{ .method = method, .url = url, .headers = headers orelse Headers{
|
||||
.allocator = bun.default_allocator,
|
||||
}, .body = body, .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, .follow_redirects = follow_redirects, .verbose = verbose, .proxy = proxy, .url_proxy_buffer = url_proxy_buffer, .signal = signal, .globalThis = globalThis },
|
||||
.{
|
||||
.method = method,
|
||||
.url = url,
|
||||
.headers = headers orelse Headers{
|
||||
.allocator = bun.default_allocator,
|
||||
},
|
||||
.body = body,
|
||||
.timeout = std.time.ns_per_hour,
|
||||
.disable_keepalive = disable_keepalive,
|
||||
.disable_timeout = disable_timeout,
|
||||
.follow_redirects = follow_redirects,
|
||||
.verbose = verbose,
|
||||
.proxy = proxy,
|
||||
.url_proxy_buffer = url_proxy_buffer,
|
||||
.signal = signal,
|
||||
.globalThis = globalThis,
|
||||
},
|
||||
promise_val,
|
||||
) catch unreachable;
|
||||
return promise_val.asRef();
|
||||
|
||||
@@ -635,6 +635,11 @@ pub fn onOpen(
|
||||
|
||||
log("Connected {s} \n", .{client.url.href});
|
||||
|
||||
if (client.hasSignalAborted()) {
|
||||
client.closeAndAbort(comptime is_ssl, socket);
|
||||
return;
|
||||
}
|
||||
|
||||
if (comptime is_ssl) {
|
||||
var ssl: *BoringSSL.SSL = @ptrCast(*BoringSSL.SSL, socket.getNativeHandle());
|
||||
if (!ssl.isInitFinished()) {
|
||||
@@ -661,7 +666,7 @@ pub fn onOpen(
|
||||
ssl.configureHTTPClient(hostname);
|
||||
}
|
||||
}
|
||||
client.addAbortSignalEventListenner(is_ssl, socket);
|
||||
|
||||
if (client.state.request_stage == .pending) {
|
||||
client.onWritable(true, comptime is_ssl, socket);
|
||||
}
|
||||
@@ -695,7 +700,7 @@ pub fn onClose(
|
||||
}
|
||||
|
||||
if (in_progress) {
|
||||
client.fail(error.ConnectionClosed, null);
|
||||
client.fail(error.ConnectionClosed);
|
||||
}
|
||||
}
|
||||
pub fn onTimeout(
|
||||
@@ -707,7 +712,7 @@ pub fn onTimeout(
|
||||
log("Timeout {s}\n", .{client.url.href});
|
||||
|
||||
if (client.state.stage != .done and client.state.stage != .fail) {
|
||||
client.fail(error.Timeout, null);
|
||||
client.fail(error.Timeout);
|
||||
}
|
||||
}
|
||||
pub fn onConnectError(
|
||||
@@ -719,7 +724,7 @@ pub fn onConnectError(
|
||||
log("onConnectError {s}\n", .{client.url.href});
|
||||
|
||||
if (client.state.stage != .done and client.state.stage != .fail)
|
||||
client.fail(error.ConnectionRefused, null);
|
||||
client.fail(error.ConnectionRefused);
|
||||
}
|
||||
pub fn onEnd(
|
||||
client: *HTTPClient,
|
||||
@@ -729,7 +734,7 @@ pub fn onEnd(
|
||||
log("onEnd {s}\n", .{client.url.href});
|
||||
|
||||
if (client.state.stage != .done and client.state.stage != .fail)
|
||||
client.fail(error.ConnectionClosed, null);
|
||||
client.fail(error.ConnectionClosed);
|
||||
}
|
||||
|
||||
pub inline fn getAllocator() std.mem.Allocator {
|
||||
@@ -994,87 +999,26 @@ http_proxy: ?URL = null,
|
||||
proxy_authorization: ?[]u8 = null,
|
||||
proxy_tunneling: bool = false,
|
||||
proxy_tunnel: ?ProxyTunnel = null,
|
||||
signal: ?*JSC.AbortSignal = null,
|
||||
abort_handler: ?*anyopaque = null,
|
||||
abort_handler_deinit: ?*const fn (?*anyopaque) void = null,
|
||||
pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*JSC.AbortSignal) HTTPClient {
|
||||
return HTTPClient{ .allocator = allocator, .method = method, .url = url, .header_entries = header_entries, .header_buf = header_buf, .signal = signal, .abort_handler = null, .abort_handler_deinit = null };
|
||||
}
|
||||
aborted: ?*std.atomic.Atomic(bool) = null,
|
||||
|
||||
pub fn ClientSocketAbortHandler(comptime is_ssl: bool) type {
|
||||
return struct {
|
||||
client: *HTTPClient,
|
||||
socket: NewHTTPContext(is_ssl).HTTPSocket,
|
||||
|
||||
pub fn init(client: *HTTPClient, socket: NewHTTPContext(is_ssl).HTTPSocket) !*@This() {
|
||||
var ctx = try client.allocator.create(@This());
|
||||
ctx.client = client;
|
||||
ctx.socket = socket;
|
||||
return ctx;
|
||||
}
|
||||
|
||||
pub fn onAborted(this: ?*anyopaque, reason: JSC.JSValue) callconv(.C) void {
|
||||
log("onAborted", .{});
|
||||
if (this) |this_| {
|
||||
const self = bun.cast(*@This(), this_);
|
||||
if (self.client.state.response_stage != .done and self.client.state.response_stage != .fail) {
|
||||
self.client.closeAndAbort(reason, is_ssl, self.socket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(this: ?*anyopaque) void {
|
||||
if (this) |this_| {
|
||||
var self = bun.cast(*@This(), this_);
|
||||
const allocator = self.client.allocator;
|
||||
allocator.destroy(self);
|
||||
}
|
||||
}
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
method: Method,
|
||||
url: URL,
|
||||
header_entries: Headers.Entries,
|
||||
header_buf: string,
|
||||
signal: ?*std.atomic.Atomic(bool),
|
||||
) HTTPClient {
|
||||
return HTTPClient{
|
||||
.allocator = allocator,
|
||||
.method = method,
|
||||
.url = url,
|
||||
.header_entries = header_entries,
|
||||
.header_buf = header_buf,
|
||||
.aborted = signal,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn addAbortSignalEventListenner(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
if (this.signal) |signal| {
|
||||
const aborted = signal.aborted();
|
||||
if (aborted) {
|
||||
log("addAbortSignalEventListenner already aborted!", .{});
|
||||
const reason = signal.abortReason();
|
||||
this.closeAndAbort(reason, is_ssl, socket);
|
||||
return;
|
||||
}
|
||||
|
||||
const handler = ClientSocketAbortHandler(is_ssl).init(this, socket) catch unreachable;
|
||||
this.abort_handler = bun.cast(*anyopaque, handler);
|
||||
this.abort_handler_deinit = ClientSocketAbortHandler(is_ssl).deinit;
|
||||
_ = signal.addListener(this.abort_handler.?, ClientSocketAbortHandler(is_ssl).onAborted);
|
||||
log("addAbortSignalEventListenner added!", .{});
|
||||
return;
|
||||
}
|
||||
log("addAbortSignalEventListenner (signal == null)", .{});
|
||||
}
|
||||
|
||||
pub fn hasSignalAborted(this: *HTTPClient) ?JSC.JSValue {
|
||||
if (this.signal) |signal| {
|
||||
const aborted = signal.aborted();
|
||||
log("hasSignalAborted {any}", .{aborted});
|
||||
if (aborted) {
|
||||
return signal.abortReason();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
log("hasSignalAborted (signal == null)", .{});
|
||||
return null;
|
||||
}
|
||||
|
||||
pub fn deinitSignal(this: *HTTPClient) void {
|
||||
if (this.signal != null) {
|
||||
var signal = this.signal.?;
|
||||
const ctx = bun.cast(*anyopaque, this);
|
||||
signal.cleanNativeBindings(ctx);
|
||||
_ = signal.unref();
|
||||
this.signal = null;
|
||||
}
|
||||
}
|
||||
pub fn deinit(this: *HTTPClient) void {
|
||||
if (this.redirect) |redirect| {
|
||||
redirect.release();
|
||||
@@ -1089,13 +1033,6 @@ pub fn deinit(this: *HTTPClient) void {
|
||||
this.proxy_tunnel = null;
|
||||
}
|
||||
|
||||
this.deinitSignal();
|
||||
|
||||
if (this.abort_handler != null and this.abort_handler_deinit != null) {
|
||||
this.abort_handler_deinit.?(this.abort_handler.?);
|
||||
this.abort_handler = null;
|
||||
this.abort_handler_deinit = null;
|
||||
}
|
||||
this.state.compressed_body.deinit();
|
||||
this.state.response_message_buffer.deinit();
|
||||
}
|
||||
@@ -1255,8 +1192,30 @@ pub const AsyncHTTP = struct {
|
||||
};
|
||||
const AtomicState = std.atomic.Atomic(State);
|
||||
|
||||
pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, callback: HTTPClientResult.Callback, http_proxy: ?URL, signal: ?*JSC.AbortSignal) AsyncHTTP {
|
||||
var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy };
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
method: Method,
|
||||
url: URL,
|
||||
headers: Headers.Entries,
|
||||
headers_buf: string,
|
||||
response_buffer: *MutableString,
|
||||
request_body: []const u8,
|
||||
timeout: usize,
|
||||
callback: HTTPClientResult.Callback,
|
||||
http_proxy: ?URL,
|
||||
signal: ?*std.atomic.Atomic(bool),
|
||||
) AsyncHTTP {
|
||||
var this = AsyncHTTP{
|
||||
.allocator = allocator,
|
||||
.url = url,
|
||||
.method = method,
|
||||
.request_headers = headers,
|
||||
.request_header_buf = headers_buf,
|
||||
.request_body = request_body,
|
||||
.response_buffer = response_buffer,
|
||||
.completion_callback = callback,
|
||||
.http_proxy = http_proxy,
|
||||
};
|
||||
this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal);
|
||||
this.client.timeout = timeout;
|
||||
this.client.http_proxy = this.http_proxy;
|
||||
@@ -1293,7 +1252,8 @@ pub const AsyncHTTP = struct {
|
||||
|
||||
fn reset(this: *AsyncHTTP) !void {
|
||||
const timeout = this.timeout;
|
||||
this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf);
|
||||
var aborted = this.client.aborted;
|
||||
this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf, aborted);
|
||||
this.client.timeout = timeout;
|
||||
this.client.http_proxy = this.http_proxy;
|
||||
if (this.http_proxy) |proxy| {
|
||||
@@ -1409,6 +1369,10 @@ pub const AsyncHTTP = struct {
|
||||
}
|
||||
};
|
||||
|
||||
pub fn hasSignalAborted(this: *const HTTPClient) bool {
|
||||
return (this.aborted orelse return false).load(.Monotonic);
|
||||
}
|
||||
|
||||
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
|
||||
var header_count: usize = 0;
|
||||
var header_entries = this.header_entries.slice();
|
||||
@@ -1520,7 +1484,7 @@ pub fn doRedirect(this: *HTTPClient) void {
|
||||
std.debug.assert(this.follow_redirects);
|
||||
|
||||
if (this.remaining_redirect_count == 0) {
|
||||
this.fail(error.TooManyRedirects, null);
|
||||
this.fail(error.TooManyRedirects);
|
||||
return;
|
||||
}
|
||||
this.state.reset();
|
||||
@@ -1557,18 +1521,18 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString)
|
||||
|
||||
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
|
||||
// Aborted before connecting
|
||||
if (this.hasSignalAborted()) |reason| {
|
||||
this.fail(error.Aborted, reason);
|
||||
if (this.hasSignalAborted()){
|
||||
this.fail(error.Aborted);
|
||||
return;
|
||||
}
|
||||
|
||||
var socket = http_thread.connect(this, is_ssl) catch |err| {
|
||||
this.fail(err, null);
|
||||
this.fail(err);
|
||||
return;
|
||||
};
|
||||
|
||||
if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) {
|
||||
this.fail(error.ConnectionClosed, null);
|
||||
this.fail(error.ConnectionClosed);
|
||||
std.debug.assert(this.state.fail != error.NoError);
|
||||
}
|
||||
}
|
||||
@@ -1594,8 +1558,8 @@ fn printResponse(response: picohttp.Response) void {
|
||||
}
|
||||
|
||||
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
if (this.hasSignalAborted()) |reason| {
|
||||
this.closeAndAbort(reason, is_ssl, socket);
|
||||
if (this.hasSignalAborted()) {
|
||||
this.closeAndAbort(is_ssl, socket);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1840,7 +1804,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
||||
|
||||
pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
if (this.state.stage != .fail and this.state.stage != .done) {
|
||||
log("closeAndFail", .{});
|
||||
log("closeAndFail: {s}", .{@errorName(err)});
|
||||
if (!socket.isClosed()) {
|
||||
socket.ext(**anyopaque).?.* = bun.cast(
|
||||
**anyopaque,
|
||||
@@ -1848,7 +1812,7 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc
|
||||
);
|
||||
socket.close(0, null);
|
||||
}
|
||||
this.fail(err, null);
|
||||
this.fail(err);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1903,8 +1867,8 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
|
||||
}
|
||||
|
||||
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
if (this.hasSignalAborted()) |reason| {
|
||||
this.closeAndAbort(reason, is_ssl, socket);
|
||||
if (this.hasSignalAborted()) {
|
||||
this.closeAndAbort(is_ssl, socket);
|
||||
return;
|
||||
}
|
||||
switch (this.state.response_stage) {
|
||||
@@ -2140,34 +2104,22 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
|
||||
}
|
||||
}
|
||||
|
||||
pub fn closeAndAbort(this: *HTTPClient, reason: JSC.JSValue, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
if (this.state.stage != .fail and this.state.stage != .done) {
|
||||
log("closeAndAbort", .{});
|
||||
if (!socket.isClosed()) {
|
||||
socket.ext(**anyopaque).?.* = bun.cast(
|
||||
**anyopaque,
|
||||
NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
|
||||
);
|
||||
socket.close(0, null);
|
||||
}
|
||||
this.fail(error.Aborted, reason);
|
||||
}
|
||||
pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
this.closeAndFail(error.Aborted, comptime is_ssl, socket);
|
||||
}
|
||||
|
||||
fn fail(this: *HTTPClient, err: anyerror, reason: ?JSC.JSValue) void {
|
||||
fn fail(this: *HTTPClient, err: anyerror) void {
|
||||
this.state.request_stage = .fail;
|
||||
this.state.response_stage = .fail;
|
||||
this.state.fail = err;
|
||||
this.state.stage = .fail;
|
||||
|
||||
const callback = this.completion_callback;
|
||||
const result = this.toResult(this.cloned_metadata, reason);
|
||||
const result = this.toResult(this.cloned_metadata);
|
||||
this.state.reset();
|
||||
this.proxy_tunneling = false;
|
||||
|
||||
callback.run(result);
|
||||
|
||||
this.deinitSignal();
|
||||
}
|
||||
|
||||
// We have to clone metadata immediately after use
|
||||
@@ -2203,12 +2155,10 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss
|
||||
if (this.state.stage != .done and this.state.stage != .fail) {
|
||||
log("done", .{});
|
||||
|
||||
this.deinitSignal();
|
||||
|
||||
var out_str = this.state.body_out_str.?;
|
||||
var body = out_str.*;
|
||||
this.cloned_metadata.response = this.state.pending_response;
|
||||
const result = this.toResult(this.cloned_metadata, null);
|
||||
const result = this.toResult(this.cloned_metadata);
|
||||
const callback = this.completion_callback;
|
||||
|
||||
socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
|
||||
@@ -2251,7 +2201,6 @@ pub const HTTPClientResult = struct {
|
||||
fail: anyerror = error.NoError,
|
||||
redirected: bool = false,
|
||||
headers_buf: []picohttp.Header = &.{},
|
||||
reason: ?JSC.JSValue = null,
|
||||
|
||||
pub fn isSuccess(this: *const HTTPClientResult) bool {
|
||||
return this.fail == error.NoError;
|
||||
@@ -2304,7 +2253,7 @@ pub const HTTPClientResult = struct {
|
||||
};
|
||||
};
|
||||
|
||||
pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.JSValue) HTTPClientResult {
|
||||
pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult {
|
||||
return HTTPClientResult{
|
||||
.body = this.state.body_out_str,
|
||||
.response = metadata.response,
|
||||
@@ -2312,7 +2261,6 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata, reason: ?JSC.
|
||||
.redirected = this.remaining_redirect_count != default_redirect_count,
|
||||
.href = metadata.url,
|
||||
.fail = this.state.fail,
|
||||
.reason = reason,
|
||||
.headers_buf = metadata.response.headers,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,21 +1,26 @@
|
||||
import { afterAll, beforeAll, describe, expect, it, test } from "bun:test";
|
||||
import { afterAll, afterEach, beforeAll, describe, expect, it, test, beforeEach } from "bun:test";
|
||||
import fs, { chmodSync, unlinkSync } from "fs";
|
||||
import { mkfifo } from "mkfifo";
|
||||
import { gc, withoutAggressiveGC } from "./gc";
|
||||
|
||||
const sleep = countdown => {
|
||||
return Bun.sleep(countdown);
|
||||
};
|
||||
|
||||
const exampleFixture = fs.readFileSync(
|
||||
import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) + "/fetch.js.txt",
|
||||
"utf8",
|
||||
);
|
||||
|
||||
var cachedServer;
|
||||
function getServer(handler) {
|
||||
cachedServer ||= Bun.serve(handler);
|
||||
cachedServer.reload(handler);
|
||||
return cachedServer;
|
||||
function getServer({ port, ...rest }) {
|
||||
return (cachedServer = Bun.serve({
|
||||
...rest,
|
||||
port: 0,
|
||||
}));
|
||||
}
|
||||
|
||||
afterAll(() => {
|
||||
afterEach(() => {
|
||||
cachedServer?.stop?.(true);
|
||||
});
|
||||
|
||||
@@ -63,9 +68,9 @@ describe("AbortSignalStreamTest", async () => {
|
||||
} catch (ex) {
|
||||
error = ex;
|
||||
}
|
||||
expect(error instanceof DOMException).toBeTruthy();
|
||||
expect(error.name).toBe("AbortError");
|
||||
expect(error.message).toBe("The operation was aborted.");
|
||||
expect(error instanceof DOMException).toBeTruthy();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,7 +82,6 @@ describe("AbortSignalStreamTest", async () => {
|
||||
});
|
||||
|
||||
describe("AbortSignalDirectStreamTest", () => {
|
||||
const port = 74322;
|
||||
async function abortOnStage(body, stage) {
|
||||
let error = undefined;
|
||||
var abortController = new AbortController();
|
||||
@@ -119,9 +123,9 @@ describe("AbortSignalDirectStreamTest", () => {
|
||||
} catch (ex) {
|
||||
error = ex;
|
||||
}
|
||||
expect(error instanceof DOMException).toBeTruthy();
|
||||
expect(error.name).toBe("AbortError");
|
||||
expect(error.message).toBe("The operation was aborted.");
|
||||
expect(error instanceof DOMException).toBeTruthy();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +137,39 @@ describe("AbortSignalDirectStreamTest", () => {
|
||||
});
|
||||
|
||||
describe("AbortSignal", () => {
|
||||
var server;
|
||||
beforeEach(() => {
|
||||
server = getServer({
|
||||
async fetch(request) {
|
||||
if (request.url.endsWith("/nodelay")) {
|
||||
return new Response("Hello");
|
||||
}
|
||||
if (request.url.endsWith("/stream")) {
|
||||
const reader = request.body.getReader();
|
||||
const body = new ReadableStream({
|
||||
async pull(controller) {
|
||||
if (!reader) controller.close();
|
||||
const { done, value } = await reader.read();
|
||||
// When no more data needs to be consumed, close the stream
|
||||
if (done) {
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
// Enqueue the next data chunk into our target stream
|
||||
controller.enqueue(value);
|
||||
},
|
||||
});
|
||||
return new Response(body);
|
||||
}
|
||||
if (request.method.toUpperCase() === "POST") {
|
||||
const body = await request.text();
|
||||
return new Response(body);
|
||||
}
|
||||
await sleep(15);
|
||||
return new Response("Hello");
|
||||
},
|
||||
});
|
||||
});
|
||||
it("AbortError", async () => {
|
||||
let name;
|
||||
try {
|
||||
@@ -140,7 +177,7 @@ describe("AbortSignal", () => {
|
||||
const signal = controller.signal;
|
||||
|
||||
async function manualAbort() {
|
||||
await Bun.sleep(10);
|
||||
await sleep(1);
|
||||
controller.abort();
|
||||
}
|
||||
await Promise.all([
|
||||
@@ -172,8 +209,9 @@ describe("AbortSignal", () => {
|
||||
try {
|
||||
var controller = new AbortController();
|
||||
const signal = controller.signal;
|
||||
|
||||
async function manualAbort() {
|
||||
await Bun.sleep(10);
|
||||
await sleep(10);
|
||||
controller.abort("My Reason");
|
||||
}
|
||||
await Promise.all([
|
||||
@@ -197,7 +235,7 @@ describe("AbortSignal", () => {
|
||||
});
|
||||
|
||||
async function manualAbort() {
|
||||
await Bun.sleep(10);
|
||||
await sleep(10);
|
||||
controller.abort();
|
||||
}
|
||||
await Promise.all([
|
||||
@@ -231,9 +269,9 @@ describe("AbortSignal", () => {
|
||||
error = ex;
|
||||
}
|
||||
|
||||
expect(error instanceof DOMException).toBeTruthy();
|
||||
expect(error.name).toBe("AbortError");
|
||||
expect(error.message).toBe("The operation was aborted.");
|
||||
expect(error instanceof DOMException).toBeTruthy();
|
||||
});
|
||||
|
||||
it("TimeoutError", async () => {
|
||||
@@ -253,7 +291,7 @@ describe("AbortSignal", () => {
|
||||
const signal = controller.signal;
|
||||
const request = new Request(`http://127.0.0.1:${server.port}`, { signal });
|
||||
async function manualAbort() {
|
||||
await Bun.sleep(10);
|
||||
await sleep(10);
|
||||
controller.abort();
|
||||
}
|
||||
await Promise.all([fetch(request).then(res => res.text()), manualAbort()]);
|
||||
@@ -443,7 +481,7 @@ describe("fetch", () => {
|
||||
await fetch(url, { body: "buntastic" });
|
||||
expect(false).toBe(true);
|
||||
} catch (exception) {
|
||||
expect(exception instanceof TypeError).toBe(true);
|
||||
expect(exception.name).toBe("TypeError");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user