diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index c6bc8f1df7..78af6dac27 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1283,7 +1283,6 @@ fn NewFlags(comptime debug_mode: bool) type { has_marked_pending: bool = false, has_abort_handler: bool = false, has_sendfile_ctx: bool = false, - has_pending_read: bool = false, has_called_error_handler: bool = false, needs_content_length: bool = false, needs_content_range: bool = false, @@ -1422,11 +1421,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp /// We can only safely free once the request body promise is finalized /// and the response is rejected response_jsvalue: JSC.JSValue = JSC.JSValue.zero, - pending_promises_for_abort: u8 = 0, + ref_count: u8 = 1, response_ptr: ?*JSC.WebCore.Response = null, blob: JSC.WebCore.AnyBlob = JSC.WebCore.AnyBlob{ .Blob = .{} }, - promise: ?*JSC.JSValue = null, sendfile: SendfileContext = undefined, request_body: ?*JSC.WebCore.BodyValueRef = null, @@ -1476,15 +1474,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const result = arguments.ptr[0]; result.ensureStillAlive(); - ctx.pending_promises_for_abort -|= 1; + defer ctx.deref(); if (ctx.isAbortedOrEnded()) { - ctx.finalizeForAbort(); + ctx.deref(); return JSValue.jsUndefined(); } if (ctx.didUpgradeWebSocket()) { - ctx.finalize(); + ctx.deref(); return JSValue.jsUndefined(); } @@ -1538,10 +1536,75 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctx.render(response); } - pub fn finalizeForAbort(this: *RequestContext) void { - streamLog("finalizeForAbort", .{}); - this.pending_promises_for_abort -|= 1; - if (this.pending_promises_for_abort == 0) this.finalize(); + pub fn shouldRenderMissing(this: *RequestContext) bool { + // If we did not respond yet, we should render missing + // To allow this all the conditions above should be true: + // 1 - still has a response (not detached) + // 2 - not aborted + // 3 - not marked completed + // 4 - not marked pending + // 5 - is the only reference of the context + // 6 - is not waiting for request body + // 7 - did not call sendfile + return this.resp != null and !this.flags.aborted and !this.flags.has_marked_complete and !this.flags.has_marked_pending and this.ref_count == 1 and !this.flags.is_waiting_for_request_body and !this.flags.has_sendfile_ctx; + } + + pub fn isDeadRequest(this: *RequestContext) bool { + // check if has pending promise or extra reference (aka not the only reference) + if (this.ref_count > 1) return false; + // check if the body is Locked (streaming) + if (this.request_body) |body| { + if (body.value == .Locked) { + return false; + } + } + + return true; + } + + /// destroy RequestContext, should be only called by deref or if defer_deinit_until_callback_completes is ref is set to true + fn deinit(this: *RequestContext) void { + this.detachResponse(); + // TODO: has_marked_complete is doing something? + this.flags.has_marked_complete = true; + + if (this.defer_deinit_until_callback_completes) |defer_deinit| { + defer_deinit.* = true; + ctxLog("deferred deinit ({*})", .{this}); + return; + } + + ctxLog("deinit ({*})", .{this}); + if (comptime Environment.allow_assert) + assert(this.flags.has_finalized); + + this.request_body_buf.clearAndFree(this.allocator); + this.response_buf_owned.clearAndFree(this.allocator); + + if (this.request_body) |body| { + _ = body.unref(); + this.request_body = null; + } + + const server = this.server; + server.request_pool_allocator.put(this); + server.onRequestComplete(); + } + + pub fn deref(this: *RequestContext) void { + streamLog("deref", .{}); + bun.assert(this.ref_count > 0); + const ref_count = this.ref_count; + this.ref_count -= 1; + if (ref_count == 1) { + this.finalizeWithoutDeinit(); + this.deinit(); + } + } + + pub fn ref(this: *RequestContext) void { + streamLog("ref", .{}); + this.ref_count += 1; } pub fn onReject(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(JSC.conv) JSValue { @@ -1551,7 +1614,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp var ctx = arguments.ptr[1].asPromisePtr(@This()); const err = arguments.ptr[0]; - ctx.pending_promises_for_abort -|= 1; + defer ctx.deref(); handleReject(ctx, if (!err.isEmptyOrUndefinedOrNull()) err else JSC.JSValue.jsUndefined()); return JSValue.jsUndefined(); @@ -1559,7 +1622,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp fn handleReject(ctx: *RequestContext, value: JSC.JSValue) void { if (ctx.isAbortedOrEnded()) { - ctx.finalizeForAbort(); + ctx.deref(); return; } @@ -1582,13 +1645,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // check again in case it get aborted after runErrorHandler if (ctx.isAbortedOrEnded()) { - ctx.finalizeForAbort(); + ctx.deref(); return; } // I don't think this case happens? if (ctx.didUpgradeWebSocket()) { - ctx.finalize(); + ctx.deref(); return; } @@ -1602,7 +1665,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (ctx.resp) |resp| { resp.runCorkedWithType(*RequestContext, renderMissingCorked, ctx); } - ctx.finalize(); + ctx.deref(); } pub fn renderMissingCorked(ctx: *RequestContext) void { @@ -1721,7 +1784,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return; } } - this.finalize(); + this.deref(); } /// Drain a partial response buffer @@ -1739,40 +1802,27 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_for_request_body) { - this.flags.is_waiting_for_request_body = false; - resp.clearOnData(); - } - resp.end(data, closeConnection); this.detachResponse(); + resp.end(data, closeConnection); } } pub fn endStream(this: *RequestContext, closeConnection: bool) void { ctxLog("endStream", .{}); if (this.resp) |resp| { - if (this.flags.is_waiting_for_request_body) { - this.flags.is_waiting_for_request_body = false; - resp.clearOnData(); - } - + this.detachResponse(); // This will send a terminating 0\r\n\r\n chunk to the client // We only want to do that if they're still expecting a body // We cannot call this function if the Content-Length header was previously set if (resp.state().isResponsePending()) resp.endStream(closeConnection); - this.detachResponse(); } } pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_for_request_body) { - this.flags.is_waiting_for_request_body = false; - resp.clearOnData(); - } - resp.endWithoutBody(closeConnection); this.detachResponse(); + resp.endWithoutBody(closeConnection); } } @@ -1781,11 +1831,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp assert(this.resp == resp); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return false; } this.end("", this.shouldCloseConnection()); - this.finalize(); + this.deref(); return false; } @@ -1795,7 +1845,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp assert(this.resp == resp); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return false; } @@ -1805,7 +1855,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.method == .HEAD) { this.end("", this.shouldCloseConnection()); - this.finalize(); + this.deref(); return false; } @@ -1816,7 +1866,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctxLog("onWritableCompleteResponseBuffer", .{}); assert(this.resp == resp); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return false; } return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp); @@ -1834,27 +1884,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctxLog("create ({*})", .{this}); } - pub fn isDeadRequest(this: *RequestContext) bool { - if (this.pending_promises_for_abort > 0 or this.flags.has_pending_read) return false; - - if (this.promise != null) { - return false; - } - - if (this.request_body) |body| { - if (body.value == .Locked) { - return false; - } - } - - return true; - } - pub fn onAbort(this: *RequestContext, resp: *App.Response) void { assert(this.resp == resp); assert(!this.flags.aborted); // mark request as aborted this.flags.aborted = true; + // we should not use the response anymore + this.resp = null; var any_js_calls = false; var vm = this.server.vm; defer { @@ -1886,46 +1922,33 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // if we can, free the request now. if (this.isDeadRequest()) { this.finalizeWithoutDeinit(); - this.deinit(); + this.deref(); } else { - this.pending_promises_for_abort = 0; + this.ref(); + defer this.deref(); // if we cannot, we have to reject pending promises // first, we reject the request body promise if (this.request_body) |body| { // User called .blob(), .json(), text(), or .arrayBuffer() on the Request object // but we received nothing or the connection was aborted - if (body.value == .Locked) { - // the promise is pending - if (body.value.Locked.action != .none or body.value.Locked.promise != null) { - this.pending_promises_for_abort += 1; - } else if (body.value.Locked.readable.get()) |readable| { - readable.abort(this.server.globalThis); - body.value.Locked.readable.deinit(); - any_js_calls = true; - } body.value.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + any_js_calls = true; } } if (this.response_ptr) |response| { if (response.body.value == .Locked) { - if (response.body.value.Locked.readable.get()) |readable| { - defer response.body.value.Locked.readable.deinit(); + var strong_readable = response.body.value.Locked.readable; + response.body.value.Locked.readable = .{}; + defer strong_readable.deinit(); + if (strong_readable.get()) |readable| { readable.abort(this.server.globalThis); any_js_calls = true; } } } - - // then, we reject the response promise - if (this.promise) |promise| { - this.pending_promises_for_abort += 1; - this.promise = null; - promise.asAnyPromise().?.reject(this.server.globalThis, JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis)); - any_js_calls = true; - } } } @@ -1975,16 +1998,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } - if (this.promise) |promise| { - ctxLog("finalizeWithoutDeinit: this.promise != null", .{}); - this.promise = null; - - if (promise.asAnyPromise()) |prom| { - prom.rejectAsHandled(this.server.globalThis, (JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis))); - } - JSC.C.JSValueUnprotect(this.server.globalThis, promise.asObjectRef()); - } - if (this.byte_stream) |stream| { ctxLog("finalizeWithoutDeinit: stream != null", .{}); @@ -2007,48 +2020,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } } - pub fn finalize(this: *RequestContext) void { - ctxLog("finalize ({*})", .{this}); - this.finalizeWithoutDeinit(); - this.deinit(); - } - - pub fn deinit(this: *RequestContext) void { - ctxLog("deinit ({*})", .{this}); - - if (!this.isDeadRequest()) { - ctxLog("deinit ({*}) waiting request", .{this}); - return; - } - - if (!this.flags.has_marked_complete) this.server.onRequestComplete(); - this.flags.has_marked_complete = true; - - this.detachResponse(); - - if (this.defer_deinit_until_callback_completes) |defer_deinit| { - defer_deinit.* = true; - ctxLog("deferred deinit ({*})", .{this}); - return; - } - - ctxLog("deinit ({*})", .{this}); - if (comptime Environment.allow_assert) - assert(this.flags.has_finalized); - - if (comptime Environment.allow_assert) - assert(this.flags.has_marked_complete); - - this.request_body_buf.clearAndFree(this.allocator); - this.response_buf_owned.clearAndFree(this.allocator); - - if (this.request_body) |body| { - _ = body.unref(); - this.request_body = null; - } - - this.server.request_pool_allocator.put(this); - } fn writeHeaders( this: *RequestContext, @@ -2086,7 +2057,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.sendfile.auto_close) _ = bun.sys.close(this.sendfile.fd); this.sendfile = undefined; - this.finalize(); + this.deref(); } const separator: string = "\r\n"; const separator_iovec = [1]std.posix.iovec_const{.{ @@ -2163,7 +2134,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctxLog("onWritableBytes", .{}); assert(this.resp == resp); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return false; } @@ -2181,7 +2152,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const bytes = bytes_[@min(bytes_.len, @as(usize, @truncate(write_offset)))..]; if (resp.tryEnd(bytes, bytes_.len, this.shouldCloseConnection())) { - this.finalize(); + this.deref(); return true; } else { this.flags.has_marked_pending = true; @@ -2197,7 +2168,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const bytes = bytes_[@min(bytes_.len, @as(usize, @truncate(write_offset)))..]; if (resp.tryEnd(bytes, bytes_.len, this.shouldCloseConnection())) { this.response_buf_owned.items.len = 0; - this.finalize(); + this.deref(); } else { this.flags.has_marked_pending = true; resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this); @@ -2331,7 +2302,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn doSendfile(this: *RequestContext, blob: Blob) void { if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } @@ -2344,15 +2315,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } this.setAbortHandler(); - this.flags.has_pending_read = true; + this.ref(); this.blob.Blob.doReadFileInternal(*RequestContext, this, onReadFile, this.server.globalThis); } pub fn onReadFile(this: *RequestContext, result: Blob.ReadFile.ResultType) void { - this.flags.has_pending_read = false; + defer this.deref(); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } @@ -2405,7 +2376,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp fn renderWithBlobFromBodyValue(this: *RequestContext) void { if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } @@ -2434,7 +2405,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.isAbortedOrEnded()) { stream.cancel(this.server.globalThis); this.readable_stream_ref.deinit(); - this.finalizeForAbort(); + this.deref(); return; } const resp = this.resp.?; @@ -2502,7 +2473,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp stream.done(this.server.globalThis); this.readable_stream_ref.deinit(); this.endStream(this.shouldCloseConnection()); - this.finalize(); + this.deref(); return; } @@ -2524,7 +2495,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // TODO: should this timeout? this.setAbortHandler(); - this.pending_promises_for_abort += 1; + this.ref(); this.response_ptr.?.body.value = .{ .Locked = .{ .readable = JSC.WebCore.ReadableStream.Strong.init(stream, globalThis), @@ -2576,7 +2547,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp defer this.readable_stream_ref.deinit(); response_stream.sink.markDone(); - this.finalizeForAbort(); + this.deref(); response_stream.sink.onFirstWrite = null; response_stream.sink.finalize(); @@ -2645,7 +2616,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } fn detachResponse(this: *RequestContext) void { - this.resp = null; + if (this.resp) |resp| { + this.resp = null; + + // onAbort should have set this to null + bun.assert(!this.flags.aborted); + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; + resp.clearOnData(); + } + if (this.flags.has_abort_handler) { + resp.clearAborted(); + this.flags.has_abort_handler = false; + } + } } fn isAbortedOrEnded(this: *const RequestContext) bool { @@ -2678,7 +2662,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctx.drainMicrotasks(); if (ctx.isAbortedOrEnded()) { - ctx.finalizeForAbort(); + ctx.deref(); return; } @@ -2687,7 +2671,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // just ignore the Response object. It doesn't do anything. // it's better to do that than to throw an error if (ctx.didUpgradeWebSocket()) { - ctx.finalize(); + ctx.deref(); return; } @@ -2740,7 +2724,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // just ignore the Response object. It doesn't do anything. // it's better to do that than to throw an error if (ctx.didUpgradeWebSocket()) { - ctx.finalize(); + ctx.deref(); return; } @@ -2785,7 +2769,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (wait_for_promise) { - ctx.pending_promises_for_abort += 1; + ctx.ref(); response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject); return; } @@ -2820,7 +2804,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onResolve({any})", .{wrote_anything}); //aborted so call finalizeForAbort if (req.isAbortedOrEnded()) { - req.finalizeForAbort(); + req.deref(); return; } const resp = req.resp.?; @@ -2835,14 +2819,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp req.endStream(req.shouldCloseConnection()); } - req.finalize(); + req.deref(); } pub fn onResolveStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(JSC.conv) JSValue { streamLog("onResolveStream", .{}); var args = callframe.arguments(2); var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This()); - req.pending_promises_for_abort -|= 1; + defer req.deref(); req.handleResolveStream(); return JSValue.jsUndefined(); } @@ -2850,7 +2834,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onRejectStream", .{}); const args = callframe.arguments(2); var req = args.ptr[args.len - 1].asPromisePtr(@This()); - req.pending_promises_for_abort -|= 1; + defer req.deref(); const err = args.ptr[0]; req.handleRejectStream(globalThis, err); return JSValue.jsUndefined(); @@ -2881,7 +2865,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // aborted so call finalizeForAbort if (req.isAbortedOrEnded()) { - req.finalizeForAbort(); + req.deref(); return; } @@ -2899,7 +2883,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp req.server.vm.runErrorHandler(err, &exception_list); } } - req.finalize(); + req.deref(); } pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void { @@ -2914,7 +2898,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const err = value.Error; _ = value.use(); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } this.runErrorHandler(err); @@ -2932,7 +2916,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .Locked => |*lock| { if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } @@ -3047,7 +3031,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } const resp = this.resp.?; @@ -3060,7 +3044,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (resp.write(chunk)) { if (stream.isDone()) { this.endStream(this.shouldCloseConnection()); - this.finalize(); + this.deref(); } } else { // when it's the last one, we just want to know if it's done @@ -3095,7 +3079,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctxLog("doRender", .{}); if (this.isAbortedOrEnded()) { - this.finalizeForAbort(); + this.deref(); return; } var response = this.response_ptr.?; @@ -3123,7 +3107,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, } } - this.finalize(); + this.deref(); } pub fn runErrorHandler( @@ -3281,7 +3265,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // Promise is not fulfilled yet { ctx.flags.is_error_promise_pending = true; - ctx.pending_promises_for_abort += 1; + ctx.ref(); promise_js.then( ctx.server.globalThis, ctx, @@ -3435,7 +3419,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } - this.finalize(); + this.deref(); } pub fn render(this: *RequestContext, response: *JSC.WebCore.Response) void { @@ -6347,7 +6331,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp return; } - if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body and !ctx.flags.has_sendfile_ctx) { + if (ctx.shouldRenderMissing()) { ctx.renderMissing(); return; } @@ -6416,7 +6400,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp return; } - if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body and !ctx.flags.has_sendfile_ctx) { + if (ctx.shouldRenderMissing()) { ctx.renderMissing(); return; } diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index 6bb0d6b831..e4ff382112 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -861,7 +861,7 @@ pub const Body = struct { error_instance.ensureStillAlive(); if (this.* == .Locked) { var locked = this.Locked; - + // will be unprotected by body value deinit error_instance.protect(); this.* = .{ .Error = error_instance }; diff --git a/test/js/bun/http/bun-server.test.ts b/test/js/bun/http/bun-server.test.ts index 4d4cc571ea..30318d15d7 100644 --- a/test/js/bun/http/bun-server.test.ts +++ b/test/js/bun/http/bun-server.test.ts @@ -1,4 +1,4 @@ -import type { ServerWebSocket, Server } from "bun"; +import type { ServerWebSocket, Server, Socket } from "bun"; import { describe, expect, test } from "bun:test"; import { bunExe, bunEnv, rejectUnauthorizedScope } from "harness"; import path from "path"; @@ -543,3 +543,83 @@ test("should be able to async upgrade using custom protocol", async () => { expect(await promise).toBe(true); }); + +test("should be able to abrubtly close a upload request", async () => { + const { promise, resolve } = Promise.withResolvers(); + using server = Bun.serve({ + port: 0, + hostname: "localhost", + maxRequestBodySize: 1024 * 1024 * 1024 * 16, + async fetch(req) { + let total_size = 0; + req.signal.addEventListener("abort", resolve); + + for await (const chunk of req.body as ReadableStream) { + total_size += chunk.length; + if (total_size > 1024 * 1024 * 1024) { + return new Response("too big", { status: 413 }); + } + } + + return new Response("Received " + total_size); + }, + }); + // ~100KB + const chunk = Buffer.alloc(1024 * 100, "a"); + // ~1GB + const MAX_PAYLOAD = 1024 * 1024 * 1024; + const request = Buffer.from( + `POST / HTTP/1.1\r\nHost: ${server.hostname}:${server.port}\r\nContent-Length: ${MAX_PAYLOAD}\r\n\r\n`, + ); + + type SocketInfo = { state: number; pending: Buffer | null }; + function tryWritePending(socket: Socket) { + if (socket.data.pending === null) { + // first write + socket.data.pending = request; + } + const data = socket.data.pending as Buffer; + const written = socket.write(data); + if (written < data.byteLength) { + // partial write + socket.data.pending = data.slice(0, written); + return false; + } + + // full write got to next state + if (socket.data.state === 0) { + // request sent -> send chunk + socket.data.pending = chunk; + } else { + // chunk sent -> delay shutdown + setTimeout(() => socket.shutdown(), 100); + } + socket.data.state++; + socket.flush(); + return true; + } + + function trySend(socket: Socket) { + while (socket.data.state < 2) { + if (!tryWritePending(socket)) { + return; + } + } + return; + } + await Bun.connect({ + hostname: server.hostname, + port: server.port, + data: { + state: 0, + pending: null, + } as SocketInfo, + socket: { + open: trySend, + drain: trySend, + data(socket, data) {}, + }, + }); + await promise; + expect().pass(); +});