From 4dfd87a3023c5a5af3cd97764d48f2a7cebcff99 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Thu, 25 Sep 2025 16:08:06 -0700 Subject: [PATCH] Fix aborting fetch() calls while the socket is connecting. Fix a thread-safety issue involving redirects and AbortSignal. (#22842) ### What does this PR do? When we added "happy eyeballs" support to fetch(), it meant that `onOpen` would not be called potentially for awhile. If the AbortSignal is aborted between `connect()` and the socket becoming readable/writable, then we would delay closing the connection until the connection opens. Fixing that fixes #18536. Separately, the `isHTTPS()` function used in abort and in request body streams was not thread safe. This caused a crash when many redirects happen simultaneously while either AbortSignal or request body messages are in-flight. This PR fixes https://github.com/oven-sh/bun/issues/14137 ### How did you verify your code works? There are tests --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Claude Bot Co-authored-by: Ciro Spaciari --- src/bun.js/webcore.zig | 1 + src/bun.js/webcore/ResumableSink.zig | 159 +++++++++--------- src/bun.js/webcore/fetch.zig | 99 +++++++---- src/http.zig | 72 +++++--- src/http/HTTPContext.zig | 61 +++---- src/http/HTTPThread.zig | 140 +++++++++------ src/http/InternalState.zig | 4 + src/js/builtins/ReadableStreamInternals.ts | 132 +++++++++------ src/s3/client.zig | 3 +- src/s3/multipart.zig | 17 +- .../io/fetch/fetch-abort-slow-connect.test.ts | 59 +++++++ test/js/bun/s3/s3-stream-leak-fixture.js | 2 +- test/js/bun/s3/s3.leak.test.ts | 3 +- test/js/web/fetch/fetch.test.ts | 35 ++++ 14 files changed, 497 insertions(+), 290 deletions(-) create mode 100644 test/js/bun/io/fetch/fetch-abort-slow-connect.test.ts diff --git a/src/bun.js/webcore.zig b/src/bun.js/webcore.zig index 9fcfebfc29..5bf55087b4 100644 --- a/src/bun.js/webcore.zig +++ b/src/bun.js/webcore.zig @@ -28,6 +28,7 @@ pub const Blob = @import("./webcore/Blob.zig"); pub const S3Stat = @import("./webcore/S3Stat.zig").S3Stat; pub const ResumableFetchSink = @import("./webcore/ResumableSink.zig").ResumableFetchSink; pub const ResumableS3UploadSink = @import("./webcore/ResumableSink.zig").ResumableS3UploadSink; +pub const ResumableSinkBackpressure = @import("./webcore/ResumableSink.zig").ResumableSinkBackpressure; pub const S3Client = @import("./webcore/S3Client.zig").S3Client; pub const Request = @import("./webcore/Request.zig"); pub const Body = @import("./webcore/Body.zig"); diff --git a/src/bun.js/webcore/ResumableSink.zig b/src/bun.js/webcore/ResumableSink.zig index ddbe325c40..3812777eb8 100644 --- a/src/bun.js/webcore/ResumableSink.zig +++ b/src/bun.js/webcore/ResumableSink.zig @@ -6,7 +6,7 @@ pub fn ResumableSink( comptime js: type, comptime Context: type, - comptime onWrite: fn (context: *Context, chunk: []const u8) bool, + comptime onWrite: fn (context: *Context, chunk: []const u8) ResumableSinkBackpressure, comptime onEnd: fn (context: *Context, err: ?jsc.JSValue) void, ) type { return struct { @@ -15,6 +15,8 @@ pub fn ResumableSink( pub const fromJS = js.fromJS; pub const fromJSDirect = js.fromJSDirect; + const ThisSink = @This(); + pub const new = bun.TrivialNew(@This()); const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{}); pub const ref = RefCount.ref; @@ -26,7 +28,7 @@ pub fn ResumableSink( const setStream = js.streamSetCached; const getStream = js.streamGetCached; ref_count: RefCount, - self: jsc.Strong.Optional = jsc.Strong.Optional.empty, + #js_this: jsc.JSRef = .empty(), // We can have a detached self, and still have a strong reference to the stream stream: jsc.WebCore.ReadableStream.Strong = .{}, globalThis: *jsc.JSGlobalObject, @@ -41,16 +43,16 @@ pub fn ResumableSink( done, }; - pub fn constructor(globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!*@This() { + pub fn constructor(globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!*ThisSink { return globalThis.throwInvalidArguments("ResumableSink is not constructable", .{}); } - pub fn init(globalThis: *jsc.JSGlobalObject, stream: jsc.WebCore.ReadableStream, context: *Context) *@This() { + pub fn init(globalThis: *jsc.JSGlobalObject, stream: jsc.WebCore.ReadableStream, context: *Context) *ThisSink { return initExactRefs(globalThis, stream, context, 1); } - pub fn initExactRefs(globalThis: *jsc.JSGlobalObject, stream: jsc.WebCore.ReadableStream, context: *Context, ref_count: u32) *@This() { - const this = @This().new(.{ + pub fn initExactRefs(globalThis: *jsc.JSGlobalObject, stream: jsc.WebCore.ReadableStream, context: *Context, ref_count: u32) *ThisSink { + const this: *ThisSink = ThisSink.new(.{ .globalThis = globalThis, .context = context, .ref_count = RefCount.initExactRefs(ref_count), @@ -123,13 +125,15 @@ pub fn ResumableSink( self.ensureStillAlive(); const js_stream = stream.toJS(); js_stream.ensureStillAlive(); - _ = Bun__assignStreamIntoResumableSink(globalThis, js_stream, self); - this.self = jsc.Strong.Optional.create(self, globalThis); + this.#js_this.setStrong(self, globalThis); setStream(self, globalThis, js_stream); + + _ = Bun__assignStreamIntoResumableSink(globalThis, js_stream, self); + return this; } - pub fn jsSetHandlers(_: *@This(), globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, this_value: jsc.JSValue) bun.JSError!jsc.JSValue { + pub fn jsSetHandlers(_: *ThisSink, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, this_value: jsc.JSValue) bun.JSError!jsc.JSValue { jsc.markBinding(@src()); const args = callframe.arguments(); @@ -149,7 +153,7 @@ pub fn ResumableSink( return .js_undefined; } - pub fn jsStart(this: *@This(), globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + pub fn jsStart(this: *ThisSink, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { jsc.markBinding(@src()); const args = callframe.arguments(); if (args.len > 0 and args[0].isObject()) { @@ -161,38 +165,43 @@ pub fn ResumableSink( return .js_undefined; } - pub fn jsWrite(this: *@This(), globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + pub fn jsWrite(this: *ThisSink, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { jsc.markBinding(@src()); const args = callframe.arguments(); // ignore any call if detached - if (!this.self.has() or this.status == .done) return .js_undefined; + if (this.isDetached()) return .js_undefined; if (args.len < 1) { return globalThis.throwInvalidArguments("ResumableSink.write requires at least 1 argument", .{}); } const buffer = args[0]; - buffer.ensureStillAlive(); - if (try jsc.Node.StringOrBuffer.fromJS(globalThis, bun.default_allocator, buffer)) |sb| { - defer sb.deinit(); - const bytes = sb.slice(); - log("jsWrite {}", .{bytes.len}); - const should_continue = onWrite(this.context, bytes); - if (!should_continue) { + const sb = try jsc.Node.StringOrBuffer.fromJS(globalThis, bun.default_allocator, buffer) orelse { + return globalThis.throwInvalidArguments("ResumableSink.write requires a string or buffer", .{}); + }; + + defer sb.deinit(); + const bytes = sb.slice(); + log("jsWrite {}", .{bytes.len}); + switch (onWrite(this.context, bytes)) { + .backpressure => { log("paused", .{}); this.status = .paused; - } - return .jsBoolean(should_continue); + }, + .done => {}, + .want_more => { + this.status = .started; + }, } - return globalThis.throwInvalidArguments("ResumableSink.write requires a string or buffer", .{}); + return .jsBoolean(this.status != .paused); } - pub fn jsEnd(this: *@This(), _: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + pub fn jsEnd(this: *ThisSink, _: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { jsc.markBinding(@src()); const args = callframe.arguments(); // ignore any call if detached - if (!this.self.has() or this.status == .done) return .js_undefined; + if (this.isDetached()) return .js_undefined; this.detachJS(); log("jsEnd {}", .{args.len}); this.status = .done; @@ -201,86 +210,73 @@ pub fn ResumableSink( return .js_undefined; } - pub fn drain(this: *@This()) void { + pub fn drain(this: *ThisSink) void { log("drain", .{}); if (this.status != .paused) { return; } - if (this.self.get()) |js_this| { + if (this.#js_this.tryGet()) |js_this| { const globalObject = this.globalThis; - const vm = globalObject.bunVM(); - vm.eventLoop().enter(); - defer vm.eventLoop().exit(); + if (getDrain(js_this)) |ondrain| { - if (ondrain.isCallable()) { - this.status = .started; - _ = ondrain.call(globalObject, .js_undefined, &.{.js_undefined}) catch |err| { - // should never happen - bun.debugAssert(false); - _ = globalObject.takeError(err); - }; - } + this.status = .started; + globalObject.bunVM().eventLoop().runCallback(ondrain, globalObject, .js_undefined, &.{ .js_undefined, .js_undefined }); } } } - pub fn cancel(this: *@This(), reason: jsc.JSValue) void { + pub fn cancel(this: *ThisSink, reason: jsc.JSValue) void { if (this.status == .piped) { reason.ensureStillAlive(); this.endPipe(reason); return; } - if (this.self.get()) |js_this| { + if (this.#js_this.tryGet()) |js_this| { this.status = .done; js_this.ensureStillAlive(); + const onCancelCallback = getCancel(js_this); const globalObject = this.globalThis; - const vm = globalObject.bunVM(); - vm.eventLoop().enter(); - defer vm.eventLoop().exit(); - if (getCancel(js_this)) |oncancel| { - oncancel.ensureStillAlive(); - // detach first so if cancel calls end will be a no-op - this.detachJS(); - // call onEnd to indicate the native side that the stream errored - onEnd(this.context, reason); - if (oncancel.isCallable()) { - _ = oncancel.call(globalObject, .js_undefined, &.{ .js_undefined, reason }) catch |err| { - // should never happen - bun.debugAssert(false); - _ = globalObject.takeError(err); - }; - } - } else { - // should never happen but lets call onEnd to indicate the native side that the stream errored - this.detachJS(); - onEnd(this.context, reason); + // detach first so if cancel calls end will be a no-op + this.detachJS(); + + // call onEnd to indicate the native side that the stream errored + onEnd(this.context, reason); + + js_this.ensureStillAlive(); + if (onCancelCallback) |callback| { + const event_loop = globalObject.bunVM().eventLoop(); + event_loop.runCallback(callback, globalObject, .js_undefined, &.{ .js_undefined, reason }); } } } - fn detachJS(this: *@This()) void { - if (this.self.trySwap()) |js_this| { + pub fn isDetached(this: *const ThisSink) bool { + return this.#js_this != .strong or this.status == .done; + } + + fn detachJS(this: *ThisSink) void { + if (this.#js_this.tryGet()) |js_this| { setDrain(js_this, this.globalThis, .zero); setCancel(js_this, this.globalThis, .zero); setStream(js_this, this.globalThis, .zero); - this.self.deinit(); - this.self = jsc.Strong.Optional.empty; + this.#js_this.downgrade(); } } - pub fn deinit(this: *@This()) void { + pub fn deinit(this: *ThisSink) void { this.detachJS(); this.stream.deinit(); bun.destroy(this); } - pub fn finalize(this: *@This()) void { + pub fn finalize(this: *ThisSink) void { + this.#js_this.finalize(); this.deref(); } fn onStreamPipe( - this: *@This(), + this: *ThisSink, stream: bun.webcore.streams.Result, allocator: std.mem.Allocator, ) void { @@ -298,7 +294,10 @@ pub fn ResumableSink( } const chunk = stream.slice(); log("onWrite {}", .{chunk.len}); - const stopStream = !onWrite(this.context, chunk); + + // TODO: should the "done" state also trigger `endPipe`? + _ = onWrite(this.context, chunk); + const is_done = stream.isDone(); if (is_done) { @@ -313,34 +312,31 @@ pub fn ResumableSink( break :brk_err null; }; this.endPipe(err); - } else if (stopStream) { - // dont make sense pausing the stream here - // it will be buffered in the pipe anyways } } - fn endPipe(this: *@This(), err: ?jsc.JSValue) void { + fn endPipe(this: *ThisSink, err: ?jsc.JSValue) void { log("endPipe", .{}); if (this.status != .piped) return; this.status = .done; - if (this.stream.get(this.globalThis)) |stream_| { + const globalObject = this.globalThis; + if (this.stream.get(globalObject)) |stream_| { if (stream_.ptr == .Bytes) { stream_.ptr.Bytes.pipe = .{}; } if (err != null) { - stream_.cancel(this.globalThis); + stream_.cancel(globalObject); } else { - stream_.done(this.globalThis); + stream_.done(globalObject); } var stream = this.stream; this.stream = .{}; stream.deinit(); } - // We ref when we attach the stream so we deref when we detach the stream - this.deref(); onEnd(this.context, err); - if (this.self.has()) { + + if (this.#js_this == .strong) { // JS owns the stream, so we need to detach the JS and let finalize handle the deref // this should not happen but lets handle it anyways this.detachJS(); @@ -348,10 +344,17 @@ pub fn ResumableSink( // no js attached, so we can just deref this.deref(); } + + // We ref when we attach the stream so we deref when we detach the stream + this.deref(); } }; } - +pub const ResumableSinkBackpressure = enum { + want_more, + backpressure, + done, +}; pub const ResumableFetchSink = ResumableSink(jsc.Codegen.JSResumableFetchSink, FetchTasklet, FetchTasklet.writeRequestData, FetchTasklet.writeEndRequest); pub const ResumableS3UploadSink = ResumableSink(jsc.Codegen.JSResumableS3UploadSink, S3UploadStreamWrapper, S3UploadStreamWrapper.writeRequestData, S3UploadStreamWrapper.writeEndRequest); diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index 633f17d16c..b48c2f406b 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -302,6 +302,8 @@ pub const FetchTasklet = struct { this.abort_reason.deinit(); this.check_server_identity.deinit(); this.clearAbortSignal(); + // Clear the sink only after the requested ended otherwise we would potentialy lose the last chunk + this.clearSink(); } pub fn deinit(this: *FetchTasklet) void { @@ -343,6 +345,13 @@ pub const FetchTasklet = struct { this.is_waiting_request_stream_start = false; bun.assert(this.request_body == .ReadableStream); if (this.request_body.ReadableStream.get(this.global_this)) |stream| { + if (this.signal) |signal| { + if (signal.aborted()) { + stream.abort(this.global_this); + return; + } + } + const globalThis = this.global_this; this.ref(); // lets only unref when sink is done // +1 because the task refs the sink @@ -1176,53 +1185,63 @@ pub const FetchTasklet = struct { pub fn resumeRequestDataStream(this: *FetchTasklet) void { // deref when done because we ref inside onWriteRequestDataDrain defer this.deref(); + log("resumeRequestDataStream", .{}); if (this.sink) |sink| { + if (this.signal) |signal| { + if (signal.aborted()) { + // already aborted; nothing to drain + return; + } + } sink.drain(); } } - pub fn writeRequestData(this: *FetchTasklet, data: []const u8) bool { + pub fn writeRequestData(this: *FetchTasklet, data: []const u8) ResumableSinkBackpressure { log("writeRequestData {}", .{data.len}); - if (this.request_body_streaming_buffer) |buffer| { - const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384; - const stream_buffer = buffer.acquire(); - var needs_schedule = false; - defer if (needs_schedule) { - // wakeup the http thread to write the data - http.http_thread.scheduleRequestWrite(this.http.?, .data); - }; - defer buffer.release(); - - // dont have backpressure so we will schedule the data to be written - // if we have backpressure the onWritable will drain the buffer - needs_schedule = stream_buffer.isEmpty(); - if (this.upgraded_connection) { - bun.handleOom(stream_buffer.write(data)); - } else { - //16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n - var formated_size_buffer: [18]u8 = undefined; - const formated_size = std.fmt.bufPrint( - formated_size_buffer[0..], - "{x}\r\n", - .{data.len}, - ) catch |err| switch (err) { - error.NoSpaceLeft => unreachable, - }; - bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2)); - stream_buffer.writeAssumeCapacity(formated_size); - stream_buffer.writeAssumeCapacity(data); - stream_buffer.writeAssumeCapacity("\r\n"); + if (this.signal) |signal| { + if (signal.aborted()) { + return .done; } - - // pause the stream if we hit the high water mark - return stream_buffer.size() >= highWaterMark; } - return false; + const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return .done; + const stream_buffer = thread_safe_stream_buffer.acquire(); + defer thread_safe_stream_buffer.release(); + const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384; + + var needs_schedule = false; + defer if (needs_schedule) { + // wakeup the http thread to write the data + http.http_thread.scheduleRequestWrite(this.http.?, .data); + }; + + // dont have backpressure so we will schedule the data to be written + // if we have backpressure the onWritable will drain the buffer + needs_schedule = stream_buffer.isEmpty(); + if (this.upgraded_connection) { + bun.handleOom(stream_buffer.write(data)); + } else { + //16 is the max size of a hex number size that represents 64 bits + 2 for the \r\n + var formated_size_buffer: [18]u8 = undefined; + const formated_size = std.fmt.bufPrint( + formated_size_buffer[0..], + "{x}\r\n", + .{data.len}, + ) catch |err| switch (err) { + error.NoSpaceLeft => unreachable, + }; + bun.handleOom(stream_buffer.ensureUnusedCapacity(formated_size.len + data.len + 2)); + stream_buffer.writeAssumeCapacity(formated_size); + stream_buffer.writeAssumeCapacity(data); + stream_buffer.writeAssumeCapacity("\r\n"); + } + + // pause the stream if we hit the high water mark + return if (stream_buffer.size() >= highWaterMark) .backpressure else .want_more; } pub fn writeEndRequest(this: *FetchTasklet, err: ?jsc.JSValue) void { log("writeEndRequest hasError? {}", .{err != null}); - this.clearSink(); defer this.deref(); if (err) |jsError| { if (this.signal_store.aborted.load(.monotonic) or this.abort_reason.has()) { @@ -1233,9 +1252,16 @@ pub const FetchTasklet = struct { } this.abortTask(); } else { + if (!this.upgraded_connection) { + // If is not upgraded we need to send the terminating chunk + const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return; + const stream_buffer = thread_safe_stream_buffer.acquire(); + defer thread_safe_stream_buffer.release(); + bun.handleOom(stream_buffer.write(http.end_of_chunked_http1_1_encoding_response_body)); + } if (this.http) |http_| { // just tell to write the end of the chunked encoding aka 0\r\n\r\n - http.http_thread.scheduleRequestWrite(http_, .endChunked); + http.http_thread.scheduleRequestWrite(http_, .end); } } } @@ -2743,6 +2769,7 @@ const JSType = jsc.C.JSType; const Body = jsc.WebCore.Body; const Request = jsc.WebCore.Request; const Response = jsc.WebCore.Response; +const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure; const Blob = jsc.WebCore.Blob; const AnyBlob = jsc.WebCore.Blob.Any; diff --git a/src/http.zig b/src/http.zig index b081db2ecc..c5cea683eb 100644 --- a/src/http.zig +++ b/src/http.zig @@ -6,7 +6,7 @@ pub var default_arena: Arena = undefined; pub var http_thread: HTTPThread = undefined; //TODO: this needs to be freed when Worker Threads are implemented -pub var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSocket).init(bun.default_allocator); +pub var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.AnySocket).init(bun.default_allocator); pub var async_http_id_monotonic: std.atomic.Value(u32) = std.atomic.Value(u32).init(0); const MAX_REDIRECT_URL_LENGTH = 128 * 1024; @@ -107,7 +107,10 @@ pub fn registerAbortTracker( socket: NewHTTPContext(is_ssl).HTTPSocket, ) void { if (client.signals.aborted != null) { - socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable; + switch (is_ssl) { + true => socket_async_http_abort_tracker.put(client.async_http_id, .{ .SocketTLS = socket }) catch unreachable, + false => socket_async_http_abort_tracker.put(client.async_http_id, .{ .SocketTCP = socket }) catch unreachable, + } } } @@ -139,6 +142,9 @@ pub fn onOpen( return error.ClientAborted; } + if (client.state.request_stage == .pending) + client.state.request_stage = .opened; + if (comptime is_ssl) { var ssl_ptr: *BoringSSL.SSL = @ptrCast(socket.getNativeHandle()); if (!ssl_ptr.isInitFinished()) { @@ -181,8 +187,11 @@ pub fn firstCall( } } - if (client.state.request_stage == .pending) { - client.onWritable(true, comptime is_ssl, socket); + switch (client.state.request_stage) { + .opened, .pending => { + client.onWritable(true, comptime is_ssl, socket); + }, + else => {}, } } pub fn onClose( @@ -724,10 +733,7 @@ pub fn doRedirect( log("close the tunnel in redirect", .{}); this.proxy_tunnel = null; tunnel.detachAndDeref(); - if (!socket.isClosed()) { - log("close socket in redirect", .{}); - NewHTTPContext(is_ssl).closeSocket(socket); - } + NewHTTPContext(is_ssl).closeSocket(socket); } else { // we need to clean the client reference before closing the socket because we are going to reuse the same ref in a another request if (this.isKeepAlivePossible()) { @@ -762,6 +768,8 @@ pub fn doRedirect( return this.start(.{ .bytes = request_body }, body_out_str); } + +/// **Not thread safe while request is in-flight** pub fn isHTTPS(this: *HTTPClient) bool { if (this.http_proxy) |proxy| { if (proxy.isHTTPS()) { @@ -774,6 +782,7 @@ pub fn isHTTPS(this: *HTTPClient) bool { } return false; } + pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void { body_out_str.reset(); @@ -788,6 +797,8 @@ pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableStr } fn start_(this: *HTTPClient, comptime is_ssl: bool) void { + this.unregisterAbortTracker(); + // mark that we are connecting this.flags.defer_fail_until_connecting_is_complete = true; // this will call .fail() if the connection fails in the middle of the function avoiding UAF with can happen when the connection is aborted @@ -819,6 +830,18 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void { this.fail(error.ConnectionClosed); return; } + + // If we haven't already called onOpen(), then that means we need to + // register the abort tracker. We need to do this in cases where the + // connection takes a long time to happen such as when it's not routable. + // See test/js/bun/io/fetch/fetch-abort-slow-connect.test.ts. + // + // We have to be careful here because if .connect() had finished + // synchronously, then this socket is on longer valid and the pointer points + // to invalid memory. + if (this.state.request_stage == .pending) { + this.registerAbortTracker(is_ssl, socket); + } } pub const HTTPResponseMetadata = struct { @@ -1003,8 +1026,8 @@ fn writeToSocketWithBufferFallback(comptime is_ssl: bool, socket: NewHTTPContext /// Write buffered data to the socket returning true if there is backpressure fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, buffer: *bun.io.StreamBuffer, data: []const u8) !bool { - if (buffer.isNotEmpty()) { - const to_send = buffer.slice(); + const to_send = buffer.slice(); + if (to_send.len > 0) { const amount = try writeToSocket(is_ssl, socket, to_send); this.state.request_sent_len += amount; buffer.cursor += amount; @@ -1020,6 +1043,7 @@ fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: Ne buffer.reset(); } } + // ok we flushed all pending data so we can reset the backpressure if (data.len > 0) { // no backpressure everything was sended so we can just try to send @@ -1109,7 +1133,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } switch (this.state.request_stage) { - .pending, .headers => { + .pending, .headers, .opened => { log("sendInitialRequestPayload", .{}); this.setTimeout(socket, 5); const result = sendInitialRequestPayload(this, is_first_call, is_ssl, socket) catch |err| { @@ -1164,13 +1188,15 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s switch (this.state.original_request_body) { .bytes => { const to_send = this.state.request_body; - const sent = writeToSocket(is_ssl, socket, to_send) catch |err| { - this.closeAndFail(err, is_ssl, socket); - return; - }; + if (to_send.len > 0) { + const sent = writeToSocket(is_ssl, socket, to_send) catch |err| { + this.closeAndFail(err, is_ssl, socket); + return; + }; - this.state.request_sent_len += sent; - this.state.request_body = this.state.request_body[sent..]; + this.state.request_sent_len += sent; + this.state.request_body = this.state.request_body[sent..]; + } if (this.state.request_body.len == 0) { this.state.request_stage = .done; @@ -1312,9 +1338,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { log("closeAndFail: {s}", .{@errorName(err)}); - if (!socket.isClosed()) { - NewHTTPContext(is_ssl).terminateSocket(socket); - } + NewHTTPContext(is_ssl).terminateSocket(socket); this.fail(err); } @@ -1684,10 +1708,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon this.proxy_tunnel = null; tunnel.shutdown(); tunnel.detachAndDeref(); - if (!socket.isClosed()) { - log("close socket", .{}); - NewHTTPContext(is_ssl).closeSocket(socket); - } + NewHTTPContext(is_ssl).closeSocket(socket); } else { if (this.isKeepAlivePossible() and !socket.isClosedOrHasError()) { log("release socket", .{}); @@ -1697,8 +1718,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon this.connected_url.hostname, this.connected_url.getPortAuto(), ); - } else if (!socket.isClosed()) { - log("close socket", .{}); + } else { NewHTTPContext(is_ssl).closeSocket(socket); } } diff --git a/src/http/HTTPContext.zig b/src/http/HTTPContext.zig index bc36f9abef..8cf733037e 100644 --- a/src/http/HTTPContext.zig +++ b/src/http/HTTPContext.zig @@ -10,10 +10,18 @@ pub fn NewHTTPContext(comptime ssl: bool) type { did_have_handshaking_error_while_reject_unauthorized_is_false: bool = false, }; - pub fn markSocketAsDead(socket: HTTPSocket) void { - if (socket.ext(**anyopaque)) |ctx| { - ctx.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr()); + pub fn markTaggedSocketAsDead(socket: HTTPSocket, tagged: ActiveSocket) void { + if (tagged.is(PooledSocket)) { + Handler.addMemoryBackToPool(tagged.as(PooledSocket)); } + + if (socket.ext(**anyopaque)) |ctx| { + ctx.* = bun.cast(**anyopaque, ActiveSocket.init(dead_socket).ptr()); + } + } + + pub fn markSocketAsDead(socket: HTTPSocket) void { + markTaggedSocketAsDead(socket, getTaggedFromSocket(socket)); } pub fn terminateSocket(socket: HTTPSocket) void { @@ -34,7 +42,7 @@ pub fn NewHTTPContext(comptime ssl: bool) type { if (socket.ext(anyopaque)) |ctx| { return getTagged(ctx); } - return ActiveSocket.init(&dead_socket); + return ActiveSocket.init(dead_socket); } pub const PooledSocketHiveAllocator = bun.HiveArray(PooledSocket, pool_size); @@ -54,7 +62,7 @@ pub fn NewHTTPContext(comptime ssl: bool) type { } const ActiveSocket = TaggedPointerUnion(.{ - *DeadSocket, + DeadSocket, HTTPClient, PooledSocket, }); @@ -208,11 +216,6 @@ pub fn NewHTTPContext(comptime ssl: bool) type { } } - if (active.get(PooledSocket)) |pooled| { - addMemoryBackToPool(pooled); - return; - } - log("Unexpected open on unknown socket", .{}); terminateSocket(socket); } @@ -268,9 +271,6 @@ pub fn NewHTTPContext(comptime ssl: bool) type { if (socket.isClosed()) { markSocketAsDead(socket); - if (active.get(PooledSocket)) |pooled| { - addMemoryBackToPool(pooled); - } return; } @@ -284,10 +284,6 @@ pub fn NewHTTPContext(comptime ssl: bool) type { } } - if (active.get(PooledSocket)) |pooled| { - addMemoryBackToPool(pooled); - } - terminateSocket(socket); } pub fn onClose( @@ -302,12 +298,6 @@ pub fn NewHTTPContext(comptime ssl: bool) type { if (tagged.get(HTTPClient)) |client| { return client.onClose(comptime ssl, socket); } - - if (tagged.get(PooledSocket)) |pooled| { - addMemoryBackToPool(pooled); - } - - return; } fn addMemoryBackToPool(pooled: *PooledSocket) void { @@ -366,10 +356,6 @@ pub fn NewHTTPContext(comptime ssl: bool) type { const tagged = getTagged(ptr); if (tagged.get(HTTPClient)) |client| { return client.onTimeout(comptime ssl, socket); - } else if (tagged.get(PooledSocket)) |pooled| { - // If a socket has been sitting around for 5 minutes - // Let's close it and remove it from the pool. - addMemoryBackToPool(pooled); } terminateSocket(socket); @@ -380,16 +366,14 @@ pub fn NewHTTPContext(comptime ssl: bool) type { _: c_int, ) void { const tagged = getTagged(ptr); - markSocketAsDead(socket); + markTaggedSocketAsDead(socket, tagged); if (tagged.get(HTTPClient)) |client| { client.onConnectError(); - } else if (tagged.get(PooledSocket)) |pooled| { - addMemoryBackToPool(pooled); } // us_connecting_socket_close is always called internally by uSockets } pub fn onEnd( - _: *anyopaque, + ptr: *anyopaque, socket: HTTPSocket, ) void { // TCP fin must be closed, but we must keep the original tagged @@ -399,7 +383,14 @@ pub fn NewHTTPContext(comptime ssl: bool) type { // 1. HTTP Keep-Alive socket: it must be removed from the pool // 2. HTTP Client socket: it might need to be retried // 3. Dead socket: it is already marked as dead + const tagged = getTagged(ptr); + markTaggedSocketAsDead(socket, tagged); socket.close(.failure); + + if (tagged.get(HTTPClient)) |client| { + client.onClose(comptime ssl, socket); + return; + } } }; @@ -489,8 +480,12 @@ pub fn NewHTTPContext(comptime ssl: bool) type { }; } -const DeadSocket = opaque {}; -var dead_socket = @as(*DeadSocket, @ptrFromInt(1)); +const DeadSocket = struct { + garbage: u8 = 0, + pub var dead_socket: DeadSocket = .{}; +}; + +var dead_socket = &DeadSocket.dead_socket; const log = bun.Output.scoped(.HTTPContext, .hidden); const HTTPCertError = @import("./HTTPCertError.zig"); diff --git a/src/http/HTTPThread.zig b/src/http/HTTPThread.zig index 23f0514621..c65dd26ec8 100644 --- a/src/http/HTTPThread.zig +++ b/src/http/HTTPThread.zig @@ -13,8 +13,7 @@ queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(Wri queued_shutdowns_lock: bun.Mutex = .{}, queued_writes_lock: bun.Mutex = .{}, - -queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){}, +queued_threadlocal_proxy_derefs: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){}, has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), timer: std.time.Timer, @@ -82,21 +81,15 @@ pub const RequestBodyBuffer = union(enum) { const threadlog = Output.scoped(.HTTPThread, .hidden); const WriteMessage = struct { async_http_id: u32, - flags: packed struct(u8) { - is_tls: bool, - type: Type, - _: u5 = 0, - }, + message_type: Type, pub const Type = enum(u2) { data = 0, end = 1, - endChunked = 2, }; }; const ShutdownMessage = struct { async_http_id: u32, - is_tls: bool, }; pub const LibdeflateState = struct { @@ -285,62 +278,96 @@ pub fn context(this: *@This(), comptime is_ssl: bool) *NewHTTPContext(is_ssl) { return if (is_ssl) &this.https_context else &this.http_context; } -fn drainEvents(this: *@This()) void { - { - this.queued_shutdowns_lock.lock(); - defer this.queued_shutdowns_lock.unlock(); - for (this.queued_shutdowns.items) |http| { +fn drainQueuedShutdowns(this: *@This()) void { + while (true) { + // socket.close() can potentially be slow + // Let's not block other threads while this runs. + var queued_shutdowns = brk: { + this.queued_shutdowns_lock.lock(); + defer this.queued_shutdowns_lock.unlock(); + const shutdowns = this.queued_shutdowns; + this.queued_shutdowns = .{}; + break :brk shutdowns; + }; + defer queued_shutdowns.deinit(bun.default_allocator); + + for (queued_shutdowns.items) |http| { if (bun.http.socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| { - if (http.is_tls) { - const socket = uws.SocketTLS.fromAny(socket_ptr.value); - // do a fast shutdown here since we are aborting and we dont want to wait for the close_notify from the other side - socket.close(.failure); - } else { - const socket = uws.SocketTCP.fromAny(socket_ptr.value); - socket.close(.failure); + switch (socket_ptr.value) { + inline .SocketTLS, .SocketTCP => |socket, tag| { + const is_tls = tag == .SocketTLS; + const HTTPContext = HTTPThread.NewHTTPContext(comptime is_tls); + const tagged = HTTPContext.getTaggedFromSocket(socket); + if (tagged.get(HTTPClient)) |client| { + // If we only call socket.close(), then it won't + // call `onClose` if this happens before `onOpen` is + // called. + // + client.closeAndAbort(comptime is_tls, socket); + continue; + } + socket.close(.failure); + }, } } } - this.queued_shutdowns.clearRetainingCapacity(); + if (queued_shutdowns.items.len == 0) { + break; + } + threadlog("drained {d} queued shutdowns", .{queued_shutdowns.items.len}); } - { - this.queued_writes_lock.lock(); - defer this.queued_writes_lock.unlock(); - for (this.queued_writes.items) |write| { - const flags = write.flags; - const messageType = flags.type; - const ended = messageType == .end or messageType == .endChunked; +} + +fn drainQueuedWrites(this: *@This()) void { + while (true) { + var queued_writes = brk: { + this.queued_writes_lock.lock(); + defer this.queued_writes_lock.unlock(); + const writes = this.queued_writes; + this.queued_writes = .{}; + break :brk writes; + }; + defer queued_writes.deinit(bun.default_allocator); + for (queued_writes.items) |write| { + const message = write.message_type; + const ended = message == .end; if (bun.http.socket_async_http_abort_tracker.get(write.async_http_id)) |socket_ptr| { - switch (flags.is_tls) { - inline true, false => |is_tls| { - const socket = uws.NewSocketHandler(is_tls).fromAny(socket_ptr); + switch (socket_ptr) { + inline .SocketTLS, .SocketTCP => |socket, tag| { + const is_tls = tag == .SocketTLS; if (socket.isClosed() or socket.isShutdown()) { continue; } - const tagged = NewHTTPContext(is_tls).getTaggedFromSocket(socket); + const tagged = NewHTTPContext(comptime is_tls).getTaggedFromSocket(socket); if (tagged.get(HTTPClient)) |client| { if (client.state.original_request_body == .stream) { var stream = &client.state.original_request_body.stream; stream.ended = ended; - if (messageType == .endChunked and client.flags.upgrade_state != .upgraded) { - // only send the 0-length chunk if the request body is chunked and not upgraded - client.writeToStream(is_tls, socket, bun.http.end_of_chunked_http1_1_encoding_response_body); - } else { - client.flushStream(is_tls, socket); - } + + client.flushStream(is_tls, socket); } } }, } } } - this.queued_writes.clearRetainingCapacity(); + if (queued_writes.items.len == 0) { + break; + } + threadlog("drained {d} queued writes", .{queued_writes.items.len}); } +} - while (this.queued_proxy_deref.pop()) |http| { +fn drainEvents(this: *@This()) void { + // Process any pending writes **before** aborting. + this.drainQueuedWrites(); + this.drainQueuedShutdowns(); + + for (this.queued_threadlocal_proxy_derefs.items) |http| { http.deref(); } + this.queued_threadlocal_proxy_derefs.clearRetainingCapacity(); var count: usize = 0; var active = AsyncHTTP.active_requests_count.load(.monotonic); @@ -379,6 +406,14 @@ fn processEvents(this: *@This()) noreturn { while (true) { this.drainEvents(); + if (comptime Environment.isDebug and bun.asan.enabled) { + for (bun.http.socket_async_http_abort_tracker.keys(), bun.http.socket_async_http_abort_tracker.values()) |http_id, socket| { + if (socket.socket().get()) |usocket| { + _ = http_id; + bun.asan.assertUnpoisoned(usocket); + } + } + } var start_time: i128 = 0; if (comptime Environment.isDebug) { @@ -390,6 +425,15 @@ fn processEvents(this: *@This()) noreturn { this.loop.loop.tick(); this.loop.loop.dec(); + if (comptime Environment.isDebug and bun.asan.enabled) { + for (bun.http.socket_async_http_abort_tracker.keys(), bun.http.socket_async_http_abort_tracker.values()) |http_id, socket| { + if (socket.socket().get()) |usocket| { + _ = http_id; + bun.asan.assertUnpoisoned(usocket); + } + } + } + // this.loop.run(); if (comptime Environment.isDebug) { const end = std.time.nanoTimestamp(); @@ -400,12 +444,12 @@ fn processEvents(this: *@This()) noreturn { } pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void { + threadlog("scheduleShutdown {d}", .{http.async_http_id}); { this.queued_shutdowns_lock.lock(); defer this.queued_shutdowns_lock.unlock(); this.queued_shutdowns.append(bun.default_allocator, .{ .async_http_id = http.async_http_id, - .is_tls = http.client.isHTTPS(), }) catch |err| bun.handleOom(err); } if (this.has_awoken.load(.monotonic)) @@ -418,10 +462,7 @@ pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, messageType: Write defer this.queued_writes_lock.unlock(); this.queued_writes.append(bun.default_allocator, .{ .async_http_id = http.async_http_id, - .flags = .{ - .is_tls = http.client.isHTTPS(), - .type = messageType, - }, + .message_type = messageType, }) catch |err| bun.handleOom(err); } if (this.has_awoken.load(.monotonic)) @@ -429,10 +470,8 @@ pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, messageType: Write } pub fn scheduleProxyDeref(this: *@This(), proxy: *ProxyTunnel) void { - // this is always called on the http thread - { - bun.handleOom(this.queued_proxy_deref.append(bun.default_allocator, proxy)); - } + // this is always called on the http thread, + bun.handleOom(this.queued_threadlocal_proxy_derefs.append(bun.default_allocator, proxy)); if (this.has_awoken.load(.monotonic)) this.loop.loop.wakeup(); } @@ -473,7 +512,6 @@ const Global = bun.Global; const Output = bun.Output; const jsc = bun.jsc; const strings = bun.strings; -const uws = bun.uws; const Arena = bun.allocators.MimallocArena; const Batch = bun.ThreadPool.Batch; const UnboundedQueue = bun.threading.UnboundedQueue; diff --git a/src/http/InternalState.zig b/src/http/InternalState.zig index dab63e9053..30fed9fd3a 100644 --- a/src/http/InternalState.zig +++ b/src/http/InternalState.zig @@ -221,6 +221,10 @@ const log = Output.scoped(.HTTPInternalState, .hidden); const HTTPStage = enum { pending, + + /// The `onOpen` callback has been called for the first time. + opened, + headers, body, body_chunk, diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index abb4873b22..ff1e9130c4 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -1204,7 +1204,10 @@ export function onCloseDirectStream(reason) { stream = undefined; return thisResult; }; - } else if (this._pendingRead) { + // We will close after the next $pull is called otherwise we would lost the last chunk + return; + } + if (this._pendingRead) { var read = this._pendingRead; this._pendingRead = undefined; $putByIdDirectPrivate(this, "pull", $noopDoneFunction); @@ -1796,6 +1799,66 @@ export function readableStreamFromAsyncIterator(target, fn) { throw new TypeError("Expected an async generator"); } + var runningAsyncIteratorPromise; + async function runAsyncIterator(controller) { + var closingError: Error | undefined, value, done, immediateTask; + + try { + while (!cancelled && !done) { + const promise = iter.next(controller); + + if (cancelled) { + return; + } + + if ($isPromise(promise) && $isPromiseFulfilled(promise)) { + clearImmediate(immediateTask); + ({ value, done } = $getPromiseInternalField(promise, $promiseFieldReactionsOrResult)); + $assert(!$isPromise(value), "Expected a value, not a promise"); + } else { + immediateTask = setImmediate(() => immediateTask && controller?.flush?.(true)); + ({ value, done } = await promise); + + if (cancelled) { + return; + } + } + + if (!$isUndefinedOrNull(value)) { + controller.write(value); + } + } + } catch (e) { + closingError = e; + } finally { + clearImmediate(immediateTask); + immediateTask = undefined; + // "iter" will be undefined if the stream was closed above. + + // Stream was closed before we tried writing to it. + if (closingError?.code === "ERR_INVALID_THIS") { + await iter?.return?.(); + return; + } + + if (closingError) { + try { + await iter.throw?.(closingError); + } finally { + iter = undefined; + // eslint-disable-next-line no-throw-literal + throw closingError; + } + } else { + await controller.end(); + if (iter) { + await iter.return?.(); + } + } + iter = undefined; + } + } + return new ReadableStream({ type: "direct", @@ -1826,62 +1889,23 @@ export function readableStreamFromAsyncIterator(target, fn) { }, async pull(controller) { - var closingError: Error | undefined, value, done, immediateTask; - - try { - while (!cancelled && !done) { - const promise = iter.next(controller); - - if (cancelled) { - return; - } - - if ($isPromise(promise) && $isPromiseFulfilled(promise)) { - clearImmediate(immediateTask); - ({ value, done } = $getPromiseInternalField(promise, $promiseFieldReactionsOrResult)); - $assert(!$isPromise(value), "Expected a value, not a promise"); - } else { - immediateTask = setImmediate(() => immediateTask && controller?.flush?.(true)); - ({ value, done } = await promise); - - if (cancelled) { - return; - } - } - - if (!$isUndefinedOrNull(value)) { - controller.write(value); + // pull() may be called multiple times before a single call completes. + // + // But, we only call into the stream once while a stream is in-progress. + if (!runningAsyncIteratorPromise) { + const asyncIteratorPromise = runAsyncIterator(controller); + runningAsyncIteratorPromise = asyncIteratorPromise; + try { + const result = await asyncIteratorPromise; + return result; + } finally { + if (runningAsyncIteratorPromise === asyncIteratorPromise) { + runningAsyncIteratorPromise = undefined; } } - } catch (e) { - closingError = e; - } finally { - clearImmediate(immediateTask); - immediateTask = undefined; - // "iter" will be undefined if the stream was closed above. - - // Stream was closed before we tried writing to it. - if (closingError?.code === "ERR_INVALID_THIS") { - await iter?.return?.(); - return; - } - - if (closingError) { - try { - await iter.throw?.(closingError); - } finally { - iter = undefined; - // eslint-disable-next-line no-throw-literal - throw closingError; - } - } else { - await controller.end(); - if (iter) { - await iter.return?.(); - } - } - iter = undefined; } + + return runningAsyncIteratorPromise; }, }); } diff --git a/src/s3/client.zig b/src/s3/client.zig index 1117409b8a..d08e774070 100644 --- a/src/s3/client.zig +++ b/src/s3/client.zig @@ -359,7 +359,7 @@ pub const S3UploadStreamWrapper = struct { } } - pub fn writeRequestData(this: *@This(), data: []const u8) bool { + pub fn writeRequestData(this: *@This(), data: []const u8) ResumableSinkBackpressure { log("writeRequestData {}", .{data.len}); return bun.handleOom(this.task.writeBytes(data, false)); } @@ -685,3 +685,4 @@ const std = @import("std"); const bun = @import("bun"); const jsc = bun.jsc; const picohttp = bun.picohttp; +const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure; diff --git a/src/s3/multipart.zig b/src/s3/multipart.zig index acc303469b..66b012a195 100644 --- a/src/s3/multipart.zig +++ b/src/s3/multipart.zig @@ -704,8 +704,8 @@ pub const MultiPartUpload = struct { utf16, }; - fn write(this: *@This(), chunk: []const u8, is_last: bool, comptime encoding: WriteEncoding) bun.OOM!bool { - if (this.ended) return true; // no backpressure since we are done + fn write(this: *@This(), chunk: []const u8, is_last: bool, comptime encoding: WriteEncoding) bun.OOM!ResumableSinkBackpressure { + if (this.ended) return .done; // no backpressure since we are done // we may call done inside processBuffered so we ensure that we keep a ref until we are done this.ref(); defer this.deref(); @@ -715,7 +715,7 @@ pub const MultiPartUpload = struct { if (this.buffered.size() > 0) { this.processBuffered(this.partSizeInBytes()); } - return !this.hasBackpressure(); + return if (this.hasBackpressure()) .backpressure else .want_more; } if (is_last) { this.ended = true; @@ -729,7 +729,7 @@ pub const MultiPartUpload = struct { this.processBuffered(this.partSizeInBytes()); } else { // still have more data and receive empty, nothing todo here - if (chunk.len == 0) return this.hasBackpressure(); + if (chunk.len == 0) return if (this.hasBackpressure()) .backpressure else .want_more; switch (encoding) { .bytes => try this.buffered.write(chunk), .latin1 => try this.buffered.writeLatin1(chunk, true), @@ -743,18 +743,18 @@ pub const MultiPartUpload = struct { // wait for more } - return !this.hasBackpressure(); + return if (this.hasBackpressure()) .backpressure else .want_more; } - pub fn writeLatin1(this: *@This(), chunk: []const u8, is_last: bool) bun.OOM!bool { + pub fn writeLatin1(this: *@This(), chunk: []const u8, is_last: bool) bun.OOM!ResumableSinkBackpressure { return try this.write(chunk, is_last, .latin1); } - pub fn writeUTF16(this: *@This(), chunk: []const u8, is_last: bool) bun.OOM!bool { + pub fn writeUTF16(this: *@This(), chunk: []const u8, is_last: bool) bun.OOM!ResumableSinkBackpressure { return try this.write(chunk, is_last, .utf16); } - pub fn writeBytes(this: *@This(), chunk: []const u8, is_last: bool) bun.OOM!bool { + pub fn writeBytes(this: *@This(), chunk: []const u8, is_last: bool) bun.OOM!ResumableSinkBackpressure { return try this.write(chunk, is_last, .bytes); } }; @@ -772,3 +772,4 @@ const executeSimpleS3Request = S3SimpleRequest.executeSimpleS3Request; const bun = @import("bun"); const jsc = bun.jsc; const strings = bun.strings; +const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure; diff --git a/test/js/bun/io/fetch/fetch-abort-slow-connect.test.ts b/test/js/bun/io/fetch/fetch-abort-slow-connect.test.ts new file mode 100644 index 0000000000..ef4f31e199 --- /dev/null +++ b/test/js/bun/io/fetch/fetch-abort-slow-connect.test.ts @@ -0,0 +1,59 @@ +import { expect, test } from "bun:test"; + +test.concurrent("fetch aborts when connect() returns EINPROGRESS but never completes", async () => { + // Use TEST-NET-1 (192.0.2.0/24) from RFC 5737 + // These IPs are reserved for documentation and testing. + // Connecting to them will cause connect() to return EINPROGRESS + // but the connection will never complete because there's no route. + const nonRoutableIP = "192.0.2.1"; + const port = 80; + + const start = performance.now(); + try { + await fetch(`http://${nonRoutableIP}:${port}/`, { + signal: AbortSignal.timeout(50), + }); + expect.unreachable("Fetch should have aborted"); + } catch (e: any) { + const elapsed = performance.now() - start; + expect(e.name).toBe("TimeoutError"); + expect(elapsed).toBeLessThan(1000); // But not more than 1000ms + } +}); + +test.concurrent("fetch aborts immediately during EINPROGRESS connect", async () => { + const nonRoutableIP = "192.0.2.1"; + const port = 80; + + // Start the fetch + const fetchPromise = fetch(`http://${nonRoutableIP}:${port}/`, { + signal: AbortSignal.timeout(1), + }); + + const start = performance.now(); + try { + await fetchPromise; + expect.unreachable("Fetch should have aborted"); + } catch (e: any) { + const elapsed = performance.now() - start; + expect(e.name).toBe("TimeoutError"); + expect(elapsed).toBeLessThan(1000); // Should reject very quickly after abort + } +}); + +test.concurrent("pre-aborted signal prevents connection attempt", async () => { + const nonRoutableIP = "192.0.2.1"; + const port = 80; + + const start = performance.now(); + try { + await fetch(`http://${nonRoutableIP}:${port}/`, { + signal: AbortSignal.abort(), + }); + expect.unreachable("Fetch should have aborted"); + } catch (e: any) { + const elapsed = performance.now() - start; + expect(e.name).toBe("AbortError"); + expect(elapsed).toBeLessThan(10); // Should fail immediately + } +}); diff --git a/test/js/bun/s3/s3-stream-leak-fixture.js b/test/js/bun/s3/s3-stream-leak-fixture.js index b5052d5d86..b440f8334b 100644 --- a/test/js/bun/s3/s3-stream-leak-fixture.js +++ b/test/js/bun/s3/s3-stream-leak-fixture.js @@ -33,7 +33,7 @@ async function run(inputType) { const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; if (rss > MAX_ALLOWED_MEMORY_USAGE) { await s3file.unlink(); - throw new Error("Memory usage is too high"); + throw new Error("RSS reached " + rss + "MB"); } } await run(new Buffer(1024 * 1024 * 1, "A".charCodeAt(0)).toString("utf-8")); diff --git a/test/js/bun/s3/s3.leak.test.ts b/test/js/bun/s3/s3.leak.test.ts index ac3688996c..65a655ad0e 100644 --- a/test/js/bun/s3/s3.leak.test.ts +++ b/test/js/bun/s3/s3.leak.test.ts @@ -33,13 +33,12 @@ describe.skipIf(!s3Options.accessKeyId)("s3", () => { AWS_ENDPOINT: s3Options.endpoint, AWS_BUCKET: S3Bucket, }, - stderr: "pipe", + stderr: "inherit", stdout: "inherit", stdin: "ignore", }, ); expect(exitCode).toBe(0); - expect(stderr.toString()).toBe(""); }, 30 * 1000, ); diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts index 90e7728673..c307515cb3 100644 --- a/test/js/web/fetch/fetch.test.ts +++ b/test/js/web/fetch/fetch.test.ts @@ -295,10 +295,45 @@ describe("AbortSignal", () => { method: "POST", body: new ReadableStream({ pull(event_controller) { + console.count("pull"); event_controller.enqueue(new Uint8Array([1, 2, 3, 4])); //this will abort immediately should abort before connected controller.abort(); }, + cancel(reason) { + console.log("cancel", reason); + }, + }), + signal: controller.signal, + }); + expect.unreachable(); + } catch (ex: any) { + expect(ex?.message).toEqual("The operation was aborted."); + expect(ex?.name).toEqual("AbortError"); + expect(ex?.constructor.name).toEqual("DOMException"); + } + }); + + it("abort while uploading prevents pull() from being called", async () => { + const controller = new AbortController(); + await fetch(`http://localhost:${server.port}`, { + method: "POST", + body: new Blob(["a"]), + }); + + try { + await fetch(`http://localhost:${server.port}`, { + method: "POST", + body: new ReadableStream({ + async pull(event_controller) { + expect(controller.signal.aborted).toBeFalse(); + const chunk = Buffer.alloc(256 * 1024, "abc"); + for (let i = 0; i < 64; i++) { + event_controller.enqueue(chunk); + } + //this will abort immediately should abort before connected + controller.abort(); + }, }), signal: controller.signal, });