fix(serve) fix abrupt close when downloading data (#12581)

Co-authored-by: cirospaciari <cirospaciari@users.noreply.github.com>
This commit is contained in:
Ciro Spaciari
2024-07-16 16:39:37 -07:00
committed by GitHub
parent ff0dc62314
commit 37036f2eb0
3 changed files with 226 additions and 162 deletions

View File

@@ -1283,7 +1283,6 @@ fn NewFlags(comptime debug_mode: bool) type {
has_marked_pending: bool = false,
has_abort_handler: bool = false,
has_sendfile_ctx: bool = false,
has_pending_read: bool = false,
has_called_error_handler: bool = false,
needs_content_length: bool = false,
needs_content_range: bool = false,
@@ -1422,11 +1421,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
/// We can only safely free once the request body promise is finalized
/// and the response is rejected
response_jsvalue: JSC.JSValue = JSC.JSValue.zero,
pending_promises_for_abort: u8 = 0,
ref_count: u8 = 1,
response_ptr: ?*JSC.WebCore.Response = null,
blob: JSC.WebCore.AnyBlob = JSC.WebCore.AnyBlob{ .Blob = .{} },
promise: ?*JSC.JSValue = null,
sendfile: SendfileContext = undefined,
request_body: ?*JSC.WebCore.BodyValueRef = null,
@@ -1476,15 +1474,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const result = arguments.ptr[0];
result.ensureStillAlive();
ctx.pending_promises_for_abort -|= 1;
defer ctx.deref();
if (ctx.isAbortedOrEnded()) {
ctx.finalizeForAbort();
ctx.deref();
return JSValue.jsUndefined();
}
if (ctx.didUpgradeWebSocket()) {
ctx.finalize();
ctx.deref();
return JSValue.jsUndefined();
}
@@ -1538,10 +1536,75 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.render(response);
}
pub fn finalizeForAbort(this: *RequestContext) void {
streamLog("finalizeForAbort", .{});
this.pending_promises_for_abort -|= 1;
if (this.pending_promises_for_abort == 0) this.finalize();
pub fn shouldRenderMissing(this: *RequestContext) bool {
// If we did not respond yet, we should render missing
// To allow this all the conditions above should be true:
// 1 - still has a response (not detached)
// 2 - not aborted
// 3 - not marked completed
// 4 - not marked pending
// 5 - is the only reference of the context
// 6 - is not waiting for request body
// 7 - did not call sendfile
return this.resp != null and !this.flags.aborted and !this.flags.has_marked_complete and !this.flags.has_marked_pending and this.ref_count == 1 and !this.flags.is_waiting_for_request_body and !this.flags.has_sendfile_ctx;
}
pub fn isDeadRequest(this: *RequestContext) bool {
// check if has pending promise or extra reference (aka not the only reference)
if (this.ref_count > 1) return false;
// check if the body is Locked (streaming)
if (this.request_body) |body| {
if (body.value == .Locked) {
return false;
}
}
return true;
}
/// destroy RequestContext, should be only called by deref or if defer_deinit_until_callback_completes is ref is set to true
fn deinit(this: *RequestContext) void {
this.detachResponse();
// TODO: has_marked_complete is doing something?
this.flags.has_marked_complete = true;
if (this.defer_deinit_until_callback_completes) |defer_deinit| {
defer_deinit.* = true;
ctxLog("deferred deinit <d> ({*})<r>", .{this});
return;
}
ctxLog("deinit<d> ({*})<r>", .{this});
if (comptime Environment.allow_assert)
assert(this.flags.has_finalized);
this.request_body_buf.clearAndFree(this.allocator);
this.response_buf_owned.clearAndFree(this.allocator);
if (this.request_body) |body| {
_ = body.unref();
this.request_body = null;
}
const server = this.server;
server.request_pool_allocator.put(this);
server.onRequestComplete();
}
pub fn deref(this: *RequestContext) void {
streamLog("deref", .{});
bun.assert(this.ref_count > 0);
const ref_count = this.ref_count;
this.ref_count -= 1;
if (ref_count == 1) {
this.finalizeWithoutDeinit();
this.deinit();
}
}
pub fn ref(this: *RequestContext) void {
streamLog("ref", .{});
this.ref_count += 1;
}
pub fn onReject(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(JSC.conv) JSValue {
@@ -1551,7 +1614,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
var ctx = arguments.ptr[1].asPromisePtr(@This());
const err = arguments.ptr[0];
ctx.pending_promises_for_abort -|= 1;
defer ctx.deref();
handleReject(ctx, if (!err.isEmptyOrUndefinedOrNull()) err else JSC.JSValue.jsUndefined());
return JSValue.jsUndefined();
@@ -1559,7 +1622,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
fn handleReject(ctx: *RequestContext, value: JSC.JSValue) void {
if (ctx.isAbortedOrEnded()) {
ctx.finalizeForAbort();
ctx.deref();
return;
}
@@ -1582,13 +1645,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// check again in case it get aborted after runErrorHandler
if (ctx.isAbortedOrEnded()) {
ctx.finalizeForAbort();
ctx.deref();
return;
}
// I don't think this case happens?
if (ctx.didUpgradeWebSocket()) {
ctx.finalize();
ctx.deref();
return;
}
@@ -1602,7 +1665,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (ctx.resp) |resp| {
resp.runCorkedWithType(*RequestContext, renderMissingCorked, ctx);
}
ctx.finalize();
ctx.deref();
}
pub fn renderMissingCorked(ctx: *RequestContext) void {
@@ -1721,7 +1784,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return;
}
}
this.finalize();
this.deref();
}
/// Drain a partial response buffer
@@ -1739,40 +1802,27 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void {
if (this.resp) |resp| {
if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.end(data, closeConnection);
this.detachResponse();
resp.end(data, closeConnection);
}
}
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
ctxLog("endStream", .{});
if (this.resp) |resp| {
if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
this.detachResponse();
// This will send a terminating 0\r\n\r\n chunk to the client
// We only want to do that if they're still expecting a body
// We cannot call this function if the Content-Length header was previously set
if (resp.state().isResponsePending())
resp.endStream(closeConnection);
this.detachResponse();
}
}
pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endWithoutBody(closeConnection);
this.detachResponse();
resp.endWithoutBody(closeConnection);
}
}
@@ -1781,11 +1831,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
assert(this.resp == resp);
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return false;
}
this.end("", this.shouldCloseConnection());
this.finalize();
this.deref();
return false;
}
@@ -1795,7 +1845,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
assert(this.resp == resp);
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return false;
}
@@ -1805,7 +1855,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.method == .HEAD) {
this.end("", this.shouldCloseConnection());
this.finalize();
this.deref();
return false;
}
@@ -1816,7 +1866,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctxLog("onWritableCompleteResponseBuffer", .{});
assert(this.resp == resp);
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return false;
}
return this.sendWritableBytesForCompleteResponseBuffer(this.response_buf_owned.items, write_offset, resp);
@@ -1834,27 +1884,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctxLog("create<d> ({*})<r>", .{this});
}
pub fn isDeadRequest(this: *RequestContext) bool {
if (this.pending_promises_for_abort > 0 or this.flags.has_pending_read) return false;
if (this.promise != null) {
return false;
}
if (this.request_body) |body| {
if (body.value == .Locked) {
return false;
}
}
return true;
}
pub fn onAbort(this: *RequestContext, resp: *App.Response) void {
assert(this.resp == resp);
assert(!this.flags.aborted);
// mark request as aborted
this.flags.aborted = true;
// we should not use the response anymore
this.resp = null;
var any_js_calls = false;
var vm = this.server.vm;
defer {
@@ -1886,46 +1922,33 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// if we can, free the request now.
if (this.isDeadRequest()) {
this.finalizeWithoutDeinit();
this.deinit();
this.deref();
} else {
this.pending_promises_for_abort = 0;
this.ref();
defer this.deref();
// if we cannot, we have to reject pending promises
// first, we reject the request body promise
if (this.request_body) |body| {
// User called .blob(), .json(), text(), or .arrayBuffer() on the Request object
// but we received nothing or the connection was aborted
if (body.value == .Locked) {
// the promise is pending
if (body.value.Locked.action != .none or body.value.Locked.promise != null) {
this.pending_promises_for_abort += 1;
} else if (body.value.Locked.readable.get()) |readable| {
readable.abort(this.server.globalThis);
body.value.Locked.readable.deinit();
any_js_calls = true;
}
body.value.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
any_js_calls = true;
}
}
if (this.response_ptr) |response| {
if (response.body.value == .Locked) {
if (response.body.value.Locked.readable.get()) |readable| {
defer response.body.value.Locked.readable.deinit();
var strong_readable = response.body.value.Locked.readable;
response.body.value.Locked.readable = .{};
defer strong_readable.deinit();
if (strong_readable.get()) |readable| {
readable.abort(this.server.globalThis);
any_js_calls = true;
}
}
}
// then, we reject the response promise
if (this.promise) |promise| {
this.pending_promises_for_abort += 1;
this.promise = null;
promise.asAnyPromise().?.reject(this.server.globalThis, JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis));
any_js_calls = true;
}
}
}
@@ -1975,16 +1998,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
if (this.promise) |promise| {
ctxLog("finalizeWithoutDeinit: this.promise != null", .{});
this.promise = null;
if (promise.asAnyPromise()) |prom| {
prom.rejectAsHandled(this.server.globalThis, (JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis)));
}
JSC.C.JSValueUnprotect(this.server.globalThis, promise.asObjectRef());
}
if (this.byte_stream) |stream| {
ctxLog("finalizeWithoutDeinit: stream != null", .{});
@@ -2007,48 +2020,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
}
pub fn finalize(this: *RequestContext) void {
ctxLog("finalize<d> ({*})<r>", .{this});
this.finalizeWithoutDeinit();
this.deinit();
}
pub fn deinit(this: *RequestContext) void {
ctxLog("deinit<d> ({*})<r>", .{this});
if (!this.isDeadRequest()) {
ctxLog("deinit<d> ({*})<r> waiting request", .{this});
return;
}
if (!this.flags.has_marked_complete) this.server.onRequestComplete();
this.flags.has_marked_complete = true;
this.detachResponse();
if (this.defer_deinit_until_callback_completes) |defer_deinit| {
defer_deinit.* = true;
ctxLog("deferred deinit <d> ({*})<r>", .{this});
return;
}
ctxLog("deinit<d> ({*})<r>", .{this});
if (comptime Environment.allow_assert)
assert(this.flags.has_finalized);
if (comptime Environment.allow_assert)
assert(this.flags.has_marked_complete);
this.request_body_buf.clearAndFree(this.allocator);
this.response_buf_owned.clearAndFree(this.allocator);
if (this.request_body) |body| {
_ = body.unref();
this.request_body = null;
}
this.server.request_pool_allocator.put(this);
}
fn writeHeaders(
this: *RequestContext,
@@ -2086,7 +2057,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.sendfile.auto_close)
_ = bun.sys.close(this.sendfile.fd);
this.sendfile = undefined;
this.finalize();
this.deref();
}
const separator: string = "\r\n";
const separator_iovec = [1]std.posix.iovec_const{.{
@@ -2163,7 +2134,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctxLog("onWritableBytes", .{});
assert(this.resp == resp);
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return false;
}
@@ -2181,7 +2152,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const bytes = bytes_[@min(bytes_.len, @as(usize, @truncate(write_offset)))..];
if (resp.tryEnd(bytes, bytes_.len, this.shouldCloseConnection())) {
this.finalize();
this.deref();
return true;
} else {
this.flags.has_marked_pending = true;
@@ -2197,7 +2168,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const bytes = bytes_[@min(bytes_.len, @as(usize, @truncate(write_offset)))..];
if (resp.tryEnd(bytes, bytes_.len, this.shouldCloseConnection())) {
this.response_buf_owned.items.len = 0;
this.finalize();
this.deref();
} else {
this.flags.has_marked_pending = true;
resp.onWritable(*RequestContext, onWritableCompleteResponseBuffer, this);
@@ -2331,7 +2302,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn doSendfile(this: *RequestContext, blob: Blob) void {
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
@@ -2344,15 +2315,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
this.setAbortHandler();
this.flags.has_pending_read = true;
this.ref();
this.blob.Blob.doReadFileInternal(*RequestContext, this, onReadFile, this.server.globalThis);
}
pub fn onReadFile(this: *RequestContext, result: Blob.ReadFile.ResultType) void {
this.flags.has_pending_read = false;
defer this.deref();
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
@@ -2405,7 +2376,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
fn renderWithBlobFromBodyValue(this: *RequestContext) void {
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
@@ -2434,7 +2405,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.isAbortedOrEnded()) {
stream.cancel(this.server.globalThis);
this.readable_stream_ref.deinit();
this.finalizeForAbort();
this.deref();
return;
}
const resp = this.resp.?;
@@ -2502,7 +2473,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
stream.done(this.server.globalThis);
this.readable_stream_ref.deinit();
this.endStream(this.shouldCloseConnection());
this.finalize();
this.deref();
return;
}
@@ -2524,7 +2495,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// TODO: should this timeout?
this.setAbortHandler();
this.pending_promises_for_abort += 1;
this.ref();
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = JSC.WebCore.ReadableStream.Strong.init(stream, globalThis),
@@ -2576,7 +2547,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
defer this.readable_stream_ref.deinit();
response_stream.sink.markDone();
this.finalizeForAbort();
this.deref();
response_stream.sink.onFirstWrite = null;
response_stream.sink.finalize();
@@ -2645,7 +2616,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
fn detachResponse(this: *RequestContext) void {
this.resp = null;
if (this.resp) |resp| {
this.resp = null;
// onAbort should have set this to null
bun.assert(!this.flags.aborted);
if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
if (this.flags.has_abort_handler) {
resp.clearAborted();
this.flags.has_abort_handler = false;
}
}
}
fn isAbortedOrEnded(this: *const RequestContext) bool {
@@ -2678,7 +2662,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.drainMicrotasks();
if (ctx.isAbortedOrEnded()) {
ctx.finalizeForAbort();
ctx.deref();
return;
}
@@ -2687,7 +2671,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// just ignore the Response object. It doesn't do anything.
// it's better to do that than to throw an error
if (ctx.didUpgradeWebSocket()) {
ctx.finalize();
ctx.deref();
return;
}
@@ -2740,7 +2724,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// just ignore the Response object. It doesn't do anything.
// it's better to do that than to throw an error
if (ctx.didUpgradeWebSocket()) {
ctx.finalize();
ctx.deref();
return;
}
@@ -2785,7 +2769,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (wait_for_promise) {
ctx.pending_promises_for_abort += 1;
ctx.ref();
response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject);
return;
}
@@ -2820,7 +2804,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onResolve({any})", .{wrote_anything});
//aborted so call finalizeForAbort
if (req.isAbortedOrEnded()) {
req.finalizeForAbort();
req.deref();
return;
}
const resp = req.resp.?;
@@ -2835,14 +2819,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
req.endStream(req.shouldCloseConnection());
}
req.finalize();
req.deref();
}
pub fn onResolveStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(JSC.conv) JSValue {
streamLog("onResolveStream", .{});
var args = callframe.arguments(2);
var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
req.pending_promises_for_abort -|= 1;
defer req.deref();
req.handleResolveStream();
return JSValue.jsUndefined();
}
@@ -2850,7 +2834,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onRejectStream", .{});
const args = callframe.arguments(2);
var req = args.ptr[args.len - 1].asPromisePtr(@This());
req.pending_promises_for_abort -|= 1;
defer req.deref();
const err = args.ptr[0];
req.handleRejectStream(globalThis, err);
return JSValue.jsUndefined();
@@ -2881,7 +2865,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// aborted so call finalizeForAbort
if (req.isAbortedOrEnded()) {
req.finalizeForAbort();
req.deref();
return;
}
@@ -2899,7 +2883,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
req.server.vm.runErrorHandler(err, &exception_list);
}
}
req.finalize();
req.deref();
}
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
@@ -2914,7 +2898,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const err = value.Error;
_ = value.use();
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
this.runErrorHandler(err);
@@ -2932,7 +2916,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.Locked => |*lock| {
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
@@ -3047,7 +3031,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
const resp = this.resp.?;
@@ -3060,7 +3044,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (resp.write(chunk)) {
if (stream.isDone()) {
this.endStream(this.shouldCloseConnection());
this.finalize();
this.deref();
}
} else {
// when it's the last one, we just want to know if it's done
@@ -3095,7 +3079,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctxLog("doRender", .{});
if (this.isAbortedOrEnded()) {
this.finalizeForAbort();
this.deref();
return;
}
var response = this.response_ptr.?;
@@ -3123,7 +3107,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
}
}
this.finalize();
this.deref();
}
pub fn runErrorHandler(
@@ -3281,7 +3265,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// Promise is not fulfilled yet
{
ctx.flags.is_error_promise_pending = true;
ctx.pending_promises_for_abort += 1;
ctx.ref();
promise_js.then(
ctx.server.globalThis,
ctx,
@@ -3435,7 +3419,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
this.finalize();
this.deref();
}
pub fn render(this: *RequestContext, response: *JSC.WebCore.Response) void {
@@ -6347,7 +6331,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
return;
}
if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body and !ctx.flags.has_sendfile_ctx) {
if (ctx.shouldRenderMissing()) {
ctx.renderMissing();
return;
}
@@ -6416,7 +6400,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
return;
}
if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body and !ctx.flags.has_sendfile_ctx) {
if (ctx.shouldRenderMissing()) {
ctx.renderMissing();
return;
}

View File

@@ -861,7 +861,7 @@ pub const Body = struct {
error_instance.ensureStillAlive();
if (this.* == .Locked) {
var locked = this.Locked;
// will be unprotected by body value deinit
error_instance.protect();
this.* = .{ .Error = error_instance };

View File

@@ -1,4 +1,4 @@
import type { ServerWebSocket, Server } from "bun";
import type { ServerWebSocket, Server, Socket } from "bun";
import { describe, expect, test } from "bun:test";
import { bunExe, bunEnv, rejectUnauthorizedScope } from "harness";
import path from "path";
@@ -543,3 +543,83 @@ test("should be able to async upgrade using custom protocol", async () => {
expect(await promise).toBe(true);
});
test("should be able to abrubtly close a upload request", async () => {
const { promise, resolve } = Promise.withResolvers();
using server = Bun.serve({
port: 0,
hostname: "localhost",
maxRequestBodySize: 1024 * 1024 * 1024 * 16,
async fetch(req) {
let total_size = 0;
req.signal.addEventListener("abort", resolve);
for await (const chunk of req.body as ReadableStream) {
total_size += chunk.length;
if (total_size > 1024 * 1024 * 1024) {
return new Response("too big", { status: 413 });
}
}
return new Response("Received " + total_size);
},
});
// ~100KB
const chunk = Buffer.alloc(1024 * 100, "a");
// ~1GB
const MAX_PAYLOAD = 1024 * 1024 * 1024;
const request = Buffer.from(
`POST / HTTP/1.1\r\nHost: ${server.hostname}:${server.port}\r\nContent-Length: ${MAX_PAYLOAD}\r\n\r\n`,
);
type SocketInfo = { state: number; pending: Buffer | null };
function tryWritePending(socket: Socket<SocketInfo>) {
if (socket.data.pending === null) {
// first write
socket.data.pending = request;
}
const data = socket.data.pending as Buffer;
const written = socket.write(data);
if (written < data.byteLength) {
// partial write
socket.data.pending = data.slice(0, written);
return false;
}
// full write got to next state
if (socket.data.state === 0) {
// request sent -> send chunk
socket.data.pending = chunk;
} else {
// chunk sent -> delay shutdown
setTimeout(() => socket.shutdown(), 100);
}
socket.data.state++;
socket.flush();
return true;
}
function trySend(socket: Socket<SocketInfo>) {
while (socket.data.state < 2) {
if (!tryWritePending(socket)) {
return;
}
}
return;
}
await Bun.connect({
hostname: server.hostname,
port: server.port,
data: {
state: 0,
pending: null,
} as SocketInfo,
socket: {
open: trySend,
drain: trySend,
data(socket, data) {},
},
});
await promise;
expect().pass();
});