diff --git a/packages/bun-uws/src/HttpContext.h b/packages/bun-uws/src/HttpContext.h index 68f72c36df..0011001469 100644 --- a/packages/bun-uws/src/HttpContext.h +++ b/packages/bun-uws/src/HttpContext.h @@ -305,7 +305,7 @@ private: /* Timeout on uncork failure */ auto [written, failed] = ((AsyncSocket *) returnedSocket)->uncork(); - if (failed) { + if (written > 0 || failed) { /* All Http sockets timeout by this, and this behavior match the one in HttpResponse::cork */ ((HttpResponse *) s)->resetTimeout(); } diff --git a/packages/bun-uws/src/HttpResponse.h b/packages/bun-uws/src/HttpResponse.h index 9d583c337b..33bef0e202 100644 --- a/packages/bun-uws/src/HttpResponse.h +++ b/packages/bun-uws/src/HttpResponse.h @@ -188,11 +188,8 @@ public: /* Success is when we wrote the entire thing without any failures */ bool success = written == data.length() && !failed; - - /* If we are now at the end, start a timeout. Also start a timeout if we failed. */ - if (!success || httpResponseData->offset == totalSize) { - this->resetTimeout(); - } + /* Reset the timeout on each tryEnd */ + this->resetTimeout(); /* Remove onAborted function if we reach the end */ if (httpResponseData->offset == totalSize) { @@ -482,9 +479,8 @@ public: Super::write("\r\n", 2); auto [written, failed] = Super::write(data.data(), (int) data.length()); - if (failed) { - this->resetTimeout(); - } + /* Reset timeout on each sended chunk */ + this->resetTimeout(); /* If we did not fail the write, accept more */ return !failed; @@ -539,7 +535,7 @@ public: return static_cast(newCorkedSocket); } - if (failed) { + if (written > 0 || failed) { /* For now we only have one single timeout so let's use it */ /* This behavior should equal the behavior in HttpContext when uncorking fails */ this->resetTimeout(); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index d110bc15b6..a41e266f33 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2522,6 +2522,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } fn registerAutoFlusher(this: *@This()) void { + // if we enqueue data we should reset the timeout + this.res.resetTimeout(); if (!this.auto_flusher.registered) AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM()); } diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index 2e238bd746..3e3d527096 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1161,7 +1161,15 @@ extern "C" uwsRes->resetTimeout(); } } - + void uws_res_reset_timeout(int ssl, uws_res_r res) { + if (ssl) { + uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; + uwsRes->resetTimeout(); + } else { + uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; + uwsRes->resetTimeout(); + } + } void uws_res_timeout(int ssl, uws_res_r res, uint8_t seconds) { if (ssl) { uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 9d0b1b1c55..5bfa67193f 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -2270,6 +2270,9 @@ pub fn NewApp(comptime ssl: bool) type { pub fn timeout(res: *Response, seconds: u8) void { uws_res_timeout(ssl_flag, res.downcast(), seconds); } + pub fn resetTimeout(res: *Response) void { + uws_res_reset_timeout(ssl_flag, res.downcast()); + } pub fn write(res: *Response, data: []const u8) bool { return uws_res_write(ssl_flag, res.downcast(), data.ptr, data.len); } @@ -2683,7 +2686,7 @@ extern fn uws_res_write_header_int(ssl: i32, res: *uws_res, key: [*c]const u8, k extern fn uws_res_end_without_body(ssl: i32, res: *uws_res, close_connection: bool) void; extern fn uws_res_end_sendfile(ssl: i32, res: *uws_res, write_offset: u64, close_connection: bool) void; extern fn uws_res_timeout(ssl: i32, res: *uws_res, timeout: u8) void; - +extern fn uws_res_reset_timeout(ssl: i32, res: *uws_res) void; extern fn uws_res_write(ssl: i32, res: *uws_res, data: [*c]const u8, length: usize) bool; extern fn uws_res_get_write_offset(ssl: i32, res: *uws_res) u64; extern fn uws_res_override_write_offset(ssl: i32, res: *uws_res, u64) void; diff --git a/test/js/bun/http/serve.test.ts b/test/js/bun/http/serve.test.ts index ec19fbcb68..5c49b238b7 100644 --- a/test/js/bun/http/serve.test.ts +++ b/test/js/bun/http/serve.test.ts @@ -1892,3 +1892,55 @@ it("should allow use of custom timeout", async () => { } await Promise.all([testTimeout("/ok", true), testTimeout("/timeout", false)]); }, 10_000); + +it("should reset timeout after writes", async () => { + // the default is 10s so we send 15 + // this test should take 15s at most + const CHUNKS = 15; + const payload = Buffer.from(`data: ${Date.now()}\n\n`); + using server = Bun.serve({ + idleTimeout: 5, + port: 0, + fetch(request, server) { + let controller!: ReadableStreamDefaultController; + let count = CHUNKS; + let interval = setInterval(() => { + controller.enqueue(payload); + count--; + if (count == 0) { + clearInterval(interval); + interval = null; + controller.close(); + return; + } + }, 1000); + return new Response( + new ReadableStream({ + start(_controller) { + controller = _controller; + }, + cancel(controller) { + if (interval) clearInterval(interval); + }, + }), + { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + }, + }, + ); + }, + }); + let received = 0; + const response = await fetch(server.url); + const stream = response.body.getReader(); + const decoder = new TextDecoder(); + while (true) { + const { done, value } = await stream.read(); + received += value?.length || 0; + if (done) break; + } + + expect(received).toBe(CHUNKS * payload.byteLength); +}, 20_000);