From 1011b44d789498de495d4f2b24e09a2929fed598 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Fri, 6 Sep 2024 17:52:38 -0700 Subject: [PATCH] fix(node:http) implement request.setTimeout and server.setTimeout (#13772) Co-authored-by: Jarred Sumner --- packages/bun-types/bun.d.ts | 15 +++ packages/bun-uws/src/HttpContext.h | 6 + packages/bun-uws/src/HttpResponse.h | 19 +++ packages/bun-uws/src/HttpResponseData.h | 14 +- src/bun.js/api/bun/ssl_wrapper.zig | 1 + src/bun.js/api/server.classes.ts | 4 + src/bun.js/api/server.zig | 164 ++++++++++++++++++++++++ src/bun.js/bindings/NodeHTTP.cpp | 66 +++++++++- src/bun.js/webcore/request.zig | 67 ++++++++++ src/deps/libuwsockets.cpp | 32 +++++ src/deps/uws.zig | 25 +++- src/js/node/http.ts | 65 +++++++++- test/js/bun/http/serve.test.ts | 17 +++ test/js/node/http/node-http.test.ts | 130 ++++++++++++------- 14 files changed, 567 insertions(+), 58 deletions(-) diff --git a/packages/bun-types/bun.d.ts b/packages/bun-types/bun.d.ts index f717f08ddc..5b97a155cf 100644 --- a/packages/bun-types/bun.d.ts +++ b/packages/bun-types/bun.d.ts @@ -2811,6 +2811,21 @@ declare module "bun" { */ requestIP(request: Request): SocketAddress | null; + /** + * Reset the idleTimeout of the given Request to the number in seconds. 0 means no timeout. + * + * @example + * ```js + * export default { + * async fetch(request, server) { + * server.timeout(request, 60); + * await Bun.sleep(30000); + * return new Response("30 seconds have passed"); + * } + * } + * ``` + */ + timeout(request: Request, seconds: number): void; /** * Undo a call to {@link Server.unref} * diff --git a/packages/bun-uws/src/HttpContext.h b/packages/bun-uws/src/HttpContext.h index 0011001469..338683f816 100644 --- a/packages/bun-uws/src/HttpContext.h +++ b/packages/bun-uws/src/HttpContext.h @@ -415,6 +415,12 @@ private: /* Force close rather than gracefully shutdown and risk confusing the client with a complete download */ AsyncSocket *asyncSocket = (AsyncSocket *) s; + // Node.js by default sclose the connection but they emit the timeout event before that + HttpResponseData *httpResponseData = (HttpResponseData *) asyncSocket->getAsyncSocketData(); + + if (httpResponseData->onTimeout) { + httpResponseData->onTimeout((HttpResponse *)s, httpResponseData->userData); + } return asyncSocket->close(); }); diff --git a/packages/bun-uws/src/HttpResponse.h b/packages/bun-uws/src/HttpResponse.h index 33bef0e202..a58a86bd6a 100644 --- a/packages/bun-uws/src/HttpResponse.h +++ b/packages/bun-uws/src/HttpResponse.h @@ -586,19 +586,38 @@ public: httpResponseData->onAborted = handler; return this; } + + HttpResponse *onTimeout(void* userData, HttpResponseData::OnTimeoutCallback handler) { + HttpResponseData *httpResponseData = getHttpResponseData(); + + httpResponseData->userData = userData; + httpResponseData->onTimeout = handler; + return this; + } + HttpResponse* clearOnWritableAndAborted() { HttpResponseData *httpResponseData = getHttpResponseData(); httpResponseData->onWritable = nullptr; httpResponseData->onAborted = nullptr; + httpResponseData->onTimeout = nullptr; + return this; } + HttpResponse* clearOnAborted() { HttpResponseData *httpResponseData = getHttpResponseData(); httpResponseData->onAborted = nullptr; return this; } + + HttpResponse* clearOnTimeout() { + HttpResponseData *httpResponseData = getHttpResponseData(); + + httpResponseData->onTimeout = nullptr; + return this; + } /* Attach a read handler for data sent. Will be called with FIN set true if last segment. */ void onData(void* userData, HttpResponseData::OnDataCallback handler) { HttpResponseData *data = getHttpResponseData(); diff --git a/packages/bun-uws/src/HttpResponseData.h b/packages/bun-uws/src/HttpResponseData.h index c227c93ef4..9613e84fe4 100644 --- a/packages/bun-uws/src/HttpResponseData.h +++ b/packages/bun-uws/src/HttpResponseData.h @@ -35,9 +35,9 @@ struct HttpResponseData : AsyncSocketData, HttpParser { public: using OnWritableCallback = bool (*)(uWS::HttpResponse*, uint64_t, void*); using OnAbortedCallback = void (*)(uWS::HttpResponse*, void*); + using OnTimeoutCallback = void (*)(uWS::HttpResponse*, void*); using OnDataCallback = void (*)(uWS::HttpResponse* response, const char* chunk, size_t chunk_length, bool, void*); - uint8_t idleTimeout = 10; // default HTTP_TIMEOUT 10 seconds - + /* When we are done with a response we mark it like so */ void markDone() { onAborted = nullptr; @@ -46,6 +46,9 @@ struct HttpResponseData : AsyncSocketData, HttpParser { /* Ignore data after this point */ inStream = nullptr; + // Ensure we don't call a timeout callback + onTimeout = nullptr; + /* We are done with this request */ this->state &= ~HttpResponseData::HTTP_RESPONSE_PENDING; } @@ -69,9 +72,8 @@ struct HttpResponseData : AsyncSocketData, HttpParser { return ret; } - /* Bits of status */ - enum : int32_t{ + enum : uint8_t { HTTP_STATUS_CALLED = 1, // used HTTP_WRITE_CALLED = 2, // used HTTP_END_CALLED = 4, // used @@ -86,6 +88,7 @@ struct HttpResponseData : AsyncSocketData, HttpParser { OnWritableCallback onWritable = nullptr; OnAbortedCallback onAborted = nullptr; OnDataCallback inStream = nullptr; + OnTimeoutCallback onTimeout = nullptr; /* Outgoing offset */ uint64_t offset = 0; @@ -93,7 +96,8 @@ struct HttpResponseData : AsyncSocketData, HttpParser { unsigned int received_bytes_per_timeout = 0; /* Current state (content-length sent, status sent, write called, etc */ - int state = 0; + uint8_t state = 0; + uint8_t idleTimeout = 10; // default HTTP_TIMEOUT 10 seconds #ifdef UWS_WITH_PROXY ProxyParser proxyParser; diff --git a/src/bun.js/api/bun/ssl_wrapper.zig b/src/bun.js/api/bun/ssl_wrapper.zig index 48c95c346d..93b46bb3dc 100644 --- a/src/bun.js/api/bun/ssl_wrapper.zig +++ b/src/bun.js/api/bun/ssl_wrapper.zig @@ -133,6 +133,7 @@ pub fn SSLWrapper(comptime T: type) type { // The peer might continue sending data for some period of time before handling the local application's shutdown indication. // This will start a full shutdown process if fast_shutdown = false, we can assume that the other side will complete the 2-step shutdown ASAP. const ret = BoringSSL.SSL_shutdown(ssl); + // when doing a fast shutdown we don't need to wait for the peer to send a shutdown so we just call SSL_shutdown again if (fast_shutdown) { // This allows for a more rapid shutdown process if the application does not wish to wait for the peer. // This alternative "fast shutdown" approach should only be done if it is known that the peer will not send more data, otherwise there is a risk of an application exposing itself to a truncation attack. diff --git a/src/bun.js/api/server.classes.ts b/src/bun.js/api/server.classes.ts index 3a267f0cfd..9182fa809e 100644 --- a/src/bun.js/api/server.classes.ts +++ b/src/bun.js/api/server.classes.ts @@ -36,6 +36,10 @@ function generate(name) { fn: "doRequestIP", length: 1, }, + timeout: { + fn: "doTimeout", + length: 2, + }, port: { getter: "getPort", }, diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index d57fbee820..cdbdf2c0d5 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -322,6 +322,7 @@ const StaticRoute = struct { fn onResponseComplete(this: *Route, resp: HTTPResponse) void { resp.clearAborted(); resp.clearOnWritable(); + resp.clearTimeout(); if (this.server) |server| { server.onStaticRequestComplete(); @@ -1700,6 +1701,7 @@ fn NewFlags(comptime debug_mode: bool) type { has_marked_complete: bool = false, has_marked_pending: bool = false, has_abort_handler: bool = false, + has_timeout_handler: bool = false, has_sendfile_ctx: bool = false, has_called_error_handler: bool = false, needs_content_length: bool = false, @@ -1741,6 +1743,52 @@ pub const AnyRequestContext = struct { pub fn get(self: AnyRequestContext, comptime T: type) ?*T { return self.tagged_pointer.get(T); } + + pub fn setTimeout(self: AnyRequestContext, seconds: c_uint) bool { + if (self.tagged_pointer.isNull()) { + return false; + } + + switch (self.tagged_pointer.tag()) { + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPServer.RequestContext))) => { + return self.tagged_pointer.as(HTTPServer.RequestContext).setTimeout(seconds); + }, + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPSServer.RequestContext))) => { + return self.tagged_pointer.as(HTTPSServer.RequestContext).setTimeout(seconds); + }, + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPServer.RequestContext))) => { + return self.tagged_pointer.as(DebugHTTPServer.RequestContext).setTimeout(seconds); + }, + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPSServer.RequestContext))) => { + return self.tagged_pointer.as(DebugHTTPSServer.RequestContext).setTimeout(seconds); + }, + else => @panic("Unexpected AnyRequestContext tag"), + } + return false; + } + + pub fn enableTimeoutEvents(self: AnyRequestContext) void { + if (self.tagged_pointer.isNull()) { + return; + } + + switch (self.tagged_pointer.tag()) { + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPServer.RequestContext))) => { + return self.tagged_pointer.as(HTTPServer.RequestContext).setTimeoutHandler(); + }, + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPSServer.RequestContext))) => { + return self.tagged_pointer.as(HTTPSServer.RequestContext).setTimeoutHandler(); + }, + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPServer.RequestContext))) => { + return self.tagged_pointer.as(DebugHTTPServer.RequestContext).setTimeoutHandler(); + }, + @field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPSServer.RequestContext))) => { + return self.tagged_pointer.as(DebugHTTPSServer.RequestContext).setTimeoutHandler(); + }, + else => @panic("Unexpected AnyRequestContext tag"), + } + } + pub fn getRemoteSocketInfo(self: AnyRequestContext) ?uws.SocketAddress { if (self.tagged_pointer.isNull()) { return null; @@ -1910,6 +1958,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + pub fn setTimeoutHandler(this: *RequestContext) void { + if (this.flags.has_timeout_handler) return; + if (this.resp) |resp| { + this.flags.has_timeout_handler = true; + resp.onTimeout(*RequestContext, RequestContext.onTimeout, this); + } + } + pub fn onResolve(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(JSC.conv) JSValue { ctxLog("onResolve", .{}); @@ -2331,12 +2387,35 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctxLog("create ({*})", .{this}); } + pub fn onTimeout(this: *RequestContext, resp: *App.Response) void { + assert(this.resp == resp); + assert(this.server != null); + + var any_js_calls = false; + var vm = this.server.?.vm; + const globalThis = this.server.?.globalThis; + defer { + // This is a task in the event loop. + // If we called into JavaScript, we must drain the microtask queue + if (any_js_calls) { + vm.drainMicrotasks(); + } + } + + if (this.request_weakref.get()) |request| { + if (request.internal_event_callback.trigger(Request.InternalJSEventCallback.EventType.timeout, globalThis)) { + any_js_calls = true; + } + } + } + pub fn onAbort(this: *RequestContext, resp: *App.Response) void { assert(this.resp == resp); assert(!this.flags.aborted); assert(this.server != null); // mark request as aborted this.flags.aborted = true; + this.detachResponse(); var any_js_calls = false; var vm = this.server.?.vm; @@ -2350,6 +2429,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.deref(); } + if (this.request_weakref.get()) |request| { + request.request_context = AnyRequestContext.Null; + if (request.internal_event_callback.trigger(Request.InternalJSEventCallback.EventType.abort, globalThis)) { + any_js_calls = true; + } + // we can already clean this strong refs + request.internal_event_callback.deinit(); + this.request_weakref.deinit(); + } // if signal is not aborted, abort the signal if (this.signal) |signal| { this.signal = null; @@ -2417,6 +2505,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.request_weakref.get()) |request| { request.request_context = AnyRequestContext.Null; + // we can already clean this strong refs + request.internal_event_callback.deinit(); this.request_weakref.deinit(); } @@ -3065,6 +3155,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp resp.clearAborted(); this.flags.has_abort_handler = false; } + if (this.flags.has_timeout_handler) { + resp.clearTimeout(); + this.flags.has_timeout_handler = false; + } } } @@ -4014,6 +4108,27 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return (this.resp orelse return null).getRemoteSocketInfo(); } + pub fn setTimeout(this: *RequestContext, seconds: c_uint) bool { + if (this.resp) |resp| { + resp.timeout(@min(seconds, 255)); + if (seconds > 0) { + + // we only set the timeout callback if we wanna the timeout event to be triggered + // the connection will be closed so the abort handler will be called after the timeout + if (this.request_weakref.get()) |req| { + if (req.internal_event_callback.hasCallback()) { + this.setTimeoutHandler(); + } + } + } else { + // if the timeout is 0, we don't need to trigger the timeout event + resp.clearTimeout(); + } + return true; + } + return false; + } + pub const Export = shim.exportFunctions(.{ .onResolve = onResolve, .onReject = onReject, @@ -5729,6 +5844,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp pub const doReload = onReload; pub const doFetch = onFetch; pub const doRequestIP = JSC.wrapInstanceMethod(ThisServer, "requestIP", false); + pub const doTimeout = JSC.wrapInstanceMethod(ThisServer, "timeout", false); pub fn doSubscriberCount(this: *ThisServer, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) JSC.JSValue { const arguments = callframe.arguments(1); @@ -5780,6 +5896,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp JSValue.jsNull(); } + pub fn timeout(this: *ThisServer, request: *JSC.WebCore.Request, seconds: JSValue) JSC.JSValue { + if (!seconds.isNumber()) { + this.globalThis.throw("timeout() requires a number", .{}); + return .zero; + } + const value = seconds.to(c_uint); + _ = request.request_context.setTimeout(value); + return JSValue.jsUndefined(); + } + + pub fn setIdleTimeout(this: *ThisServer, seconds: c_uint) void { + this.config.idleTimeout = @truncate(@min(seconds, 255)); + } + pub fn publish(this: *ThisServer, globalThis: *JSC.JSGlobalObject, topic: ZigString, message_value: JSValue, compress_value: ?JSValue, exception: JSC.C.ExceptionRef) JSValue { if (this.config.websocket == null) return JSValue.jsNumber(0); @@ -7110,3 +7240,37 @@ const welcome_page_html_gz = @embedFile("welcome-page.html.gz"); extern fn Bun__addInspector(bool, *anyopaque, *JSC.JSGlobalObject) void; const assert = bun.assert; + +pub export fn Server__setIdleTimeout( + server: JSC.JSValue, + seconds: JSC.JSValue, + globalThis: *JSC.JSGlobalObject, +) void { + if (!server.isObject()) { + globalThis.throw("Failed to set timeout: The 'this' value is not a Server.", .{}); + return; + } + + if (!seconds.isNumber()) { + globalThis.throw("Failed to set timeout: The provided value is not of type 'number'.", .{}); + return; + } + const value = seconds.to(c_uint); + if (server.as(HTTPServer)) |this| { + this.setIdleTimeout(value); + } else if (server.as(HTTPSServer)) |this| { + this.setIdleTimeout(value); + } else if (server.as(DebugHTTPServer)) |this| { + this.setIdleTimeout(value); + } else if (server.as(DebugHTTPSServer)) |this| { + this.setIdleTimeout(value); + } else { + globalThis.throw("Failed to set timeout: The 'this' value is not a Server.", .{}); + } +} + +comptime { + if (!JSC.is_bindgen) { + _ = Server__setIdleTimeout; + } +} diff --git a/src/bun.js/bindings/NodeHTTP.cpp b/src/bun.js/bindings/NodeHTTP.cpp index 2f5c810ebf..8a7b04ad5e 100644 --- a/src/bun.js/bindings/NodeHTTP.cpp +++ b/src/bun.js/bindings/NodeHTTP.cpp @@ -22,7 +22,9 @@ using namespace JSC; using namespace WebCore; extern "C" uWS::HttpRequest* Request__getUWSRequest(void*); - +extern "C" void Request__setInternalEventCallback(void*, EncodedJSValue, JSC::JSGlobalObject*); +extern "C" void Request__setTimeout(void*, EncodedJSValue, JSC::JSGlobalObject*); +extern "C" void Server__setIdleTimeout(EncodedJSValue, EncodedJSValue, JSC::JSGlobalObject*); static EncodedJSValue assignHeadersFromFetchHeaders(FetchHeaders& impl, JSObject* prototype, JSObject* objectValue, JSC::InternalFieldTuple* tuple, JSC::JSGlobalObject* globalObject, JSC::VM& vm) { auto scope = DECLARE_THROW_SCOPE(vm); @@ -322,6 +324,57 @@ JSC_DEFINE_HOST_FUNCTION(jsHTTPAssignHeaders, (JSGlobalObject * globalObject, Ca return JSValue::encode(jsNull()); } +JSC_DEFINE_HOST_FUNCTION(jsHTTPAssignEventCallback, (JSGlobalObject * globalObject, CallFrame* callFrame)) +{ + auto& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + + // This is an internal binding. + JSValue requestValue = callFrame->uncheckedArgument(0); + JSValue callback = callFrame->uncheckedArgument(1); + + ASSERT(callFrame->argumentCount() == 2); + + if (auto* jsRequest = jsDynamicCast(requestValue)) { + Request__setInternalEventCallback(jsRequest->wrapped(), JSValue::encode(callback), globalObject); + } + + return JSValue::encode(jsNull()); +} + +JSC_DEFINE_HOST_FUNCTION(jsHTTPSetTimeout, (JSGlobalObject * globalObject, CallFrame* callFrame)) +{ + auto& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + + // This is an internal binding. + JSValue requestValue = callFrame->uncheckedArgument(0); + JSValue seconds = callFrame->uncheckedArgument(1); + + ASSERT(callFrame->argumentCount() == 2); + + if (auto* jsRequest = jsDynamicCast(requestValue)) { + Request__setTimeout(jsRequest->wrapped(), JSValue::encode(seconds), globalObject); + } + + return JSValue::encode(jsUndefined()); +} +JSC_DEFINE_HOST_FUNCTION(jsHTTPSetServerIdleTimeout, (JSGlobalObject * globalObject, CallFrame* callFrame)) +{ + auto& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + + // This is an internal binding. + JSValue serverValue = callFrame->uncheckedArgument(0); + JSValue seconds = callFrame->uncheckedArgument(1); + + ASSERT(callFrame->argumentCount() == 2); + + Server__setIdleTimeout(JSValue::encode(serverValue), JSValue::encode(seconds), globalObject); + + return JSValue::encode(jsUndefined()); +} + JSC_DEFINE_HOST_FUNCTION(jsHTTPGetHeader, (JSGlobalObject * globalObject, CallFrame* callFrame)) { auto& vm = globalObject->vm(); @@ -418,6 +471,17 @@ JSValue createNodeHTTPInternalBinding(Zig::GlobalObject* globalObject) obj->putDirect( vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "assignHeaders"_s)), JSC::JSFunction::create(vm, globalObject, 2, "assignHeaders"_s, jsHTTPAssignHeaders, ImplementationVisibility::Public), 0); + obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "assignEventCallback"_s)), + JSC::JSFunction::create(vm, globalObject, 2, "assignEventCallback"_s, jsHTTPAssignEventCallback, ImplementationVisibility::Public), 0); + + obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setRequestTimeout"_s)), + JSC::JSFunction::create(vm, globalObject, 2, "setRequestTimeout"_s, jsHTTPSetTimeout, ImplementationVisibility::Public), 0); + + obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setServerIdleTimeout"_s)), + JSC::JSFunction::create(vm, globalObject, 2, "setServerIdleTimeout"_s, jsHTTPSetServerIdleTimeout, ImplementationVisibility::Public), 0); obj->putDirect( vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "Response"_s)), globalObject->JSResponseConstructor(), 0); diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig index 7898efa533..39264d5bef 100644 --- a/src/bun.js/webcore/request.zig +++ b/src/bun.js/webcore/request.zig @@ -62,6 +62,7 @@ pub const Request = struct { weak_ptr_data: bun.WeakPtrData = .{}, // We must report a consistent value for this reported_estimated_size: usize = 0, + internal_event_callback: InternalJSEventCallback = .{}, const RequestMixin = BodyMixin(@This()); pub usingnamespace JSC.Codegen.JSRequest; @@ -84,12 +85,70 @@ pub const Request = struct { return this.request_context.getRequest(); } + pub export fn Request__setInternalEventCallback( + this: *Request, + callback: JSC.JSValue, + globalThis: *JSC.JSGlobalObject, + ) void { + this.internal_event_callback = InternalJSEventCallback.init(callback, globalThis); + // we always have the abort event but we need to enable the timeout event as well in case of `node:http`.Server.setTimeout is set + this.request_context.enableTimeoutEvents(); + } + + pub export fn Request__setTimeout( + this: *Request, + seconds: JSC.JSValue, + globalThis: *JSC.JSGlobalObject, + ) void { + if (!seconds.isNumber()) { + globalThis.throw("Failed to set timeout: The provided value is not of type 'number'.", .{}); + return; + } + + this.setTimeout(seconds.to(c_uint)); + } + comptime { if (!JSC.is_bindgen) { _ = Request__getUWSRequest; + _ = Request__setInternalEventCallback; + _ = Request__setTimeout; } } + pub const InternalJSEventCallback = struct { + function: JSC.Strong = .{}, + + pub const EventType = enum(u8) { + timeout = 0, + abort = 1, + }; + pub fn init(function: JSC.JSValue, globalThis: *JSC.JSGlobalObject) InternalJSEventCallback { + return InternalJSEventCallback{ + .function = JSC.Strong.create(function, globalThis), + }; + } + + pub fn hasCallback(this: *InternalJSEventCallback) bool { + return this.function.has(); + } + + pub fn trigger(this: *InternalJSEventCallback, eventType: EventType, globalThis: *JSC.JSGlobalObject) bool { + if (this.function.get()) |callback| { + const result = callback.call(globalThis, JSC.JSValue.jsUndefined(), &.{JSC.JSValue.jsNumber(@intFromEnum(eventType))}); + if (result.toError()) |js_error| { + globalThis.throwValue(js_error); + } + return true; + } + return false; + } + + pub fn deinit(this: *InternalJSEventCallback) void { + this.function.deinit(); + } + }; + pub fn init( url: bun.String, headers: ?*FetchHeaders, @@ -290,6 +349,7 @@ pub const Request = struct { signal.unref(); this.signal = null; } + this.internal_event_callback.deinit(); } pub fn finalize(this: *Request) void { @@ -907,4 +967,11 @@ pub const Request = struct { this.cloneInto(req, allocator, globalThis, false); return req; } + + pub fn setTimeout( + this: *Request, + seconds: c_uint, + ) void { + _ = this.request_context.setTimeout(seconds); + } }; diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index 540ee430c3..bc9ff248f8 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1338,6 +1338,38 @@ extern "C" } } + void uws_res_on_timeout(int ssl, uws_res_r res, + void (*handler)(uws_res_r res, void *opcional_data), + void *opcional_data) + { + if (ssl) + { + uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; + auto* onTimeout = reinterpret_cast*, void*)>(handler); + if (handler) + { + uwsRes->onTimeout(opcional_data, onTimeout); + } + else + { + uwsRes->clearOnTimeout(); + } + } + else + { + uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; + auto* onTimeout = reinterpret_cast*, void*)>(handler); + if (handler) + { + uwsRes->onTimeout(opcional_data, onTimeout); + } + else + { + uwsRes->clearOnTimeout(); + } + } + } + void uws_res_on_data(int ssl, uws_res_r res, void (*handler)(uws_res_r res, const char *chunk, size_t chunk_length, bool is_end, diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 2824c3a7eb..c661645e8d 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -2555,6 +2555,12 @@ pub const AnyResponse = union(enum) { .TCP => |resp| resp.clearAborted(), }; } + pub fn clearTimeout(this: AnyResponse) void { + return switch (this) { + .SSL => |resp| resp.clearTimeout(), + .TCP => |resp| resp.clearTimeout(), + }; + } pub fn clearOnWritable(this: AnyResponse) void { return switch (this) { @@ -3076,7 +3082,22 @@ pub fn NewApp(comptime ssl: bool) type { pub fn clearAborted(res: *Response) void { uws_res_on_aborted(ssl_flag, res.downcast(), null, null); } + pub fn onTimeout(res: *Response, comptime UserDataType: type, comptime handler: fn (UserDataType, *Response) void, opcional_data: UserDataType) void { + const Wrapper = struct { + pub fn handle(this: *uws_res, user_data: ?*anyopaque) callconv(.C) void { + if (comptime UserDataType == void) { + @call(bun.callmod_inline, handler, .{ {}, castRes(this), {} }); + } else { + @call(bun.callmod_inline, handler, .{ @as(UserDataType, @ptrCast(@alignCast(user_data.?))), castRes(this) }); + } + } + }; + uws_res_on_timeout(ssl_flag, res.downcast(), Wrapper.handle, opcional_data); + } + pub fn clearTimeout(res: *Response) void { + uws_res_on_timeout(ssl_flag, res.downcast(), null, null); + } pub fn clearOnData(res: *Response) void { uws_res_on_data(ssl_flag, res.downcast(), null, null); } @@ -3400,6 +3421,8 @@ extern fn uws_res_has_responded(ssl: i32, res: *uws_res) bool; extern fn uws_res_on_writable(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, u64, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void; extern fn uws_res_clear_on_writable(ssl: i32, res: *uws_res) void; extern fn uws_res_on_aborted(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void; +extern fn uws_res_on_timeout(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void; + extern fn uws_res_on_data( ssl: i32, res: *uws_res, @@ -3481,7 +3504,7 @@ extern fn us_socket_mark_needs_more_not_ssl(socket: ?*uws_res) void; extern fn uws_res_state(ssl: c_int, res: *const uws_res) State; -pub const State = enum(i32) { +pub const State = enum(u8) { HTTP_STATUS_CALLED = 1, HTTP_WRITE_CALLED = 2, HTTP_END_CALLED = 4, diff --git a/src/js/node/http.ts b/src/js/node/http.ts index dcce2a1f56..0d72ccde78 100644 --- a/src/js/node/http.ts +++ b/src/js/node/http.ts @@ -11,6 +11,9 @@ const { getHeader, setHeader, assignHeaders: assignHeadersFast, + assignEventCallback, + setRequestTimeout, + setServerIdleTimeout, Response, Request, Headers, @@ -20,6 +23,9 @@ const { getHeader: (headers: Headers, name: string) => string | undefined; setHeader: (headers: Headers, name: string, value: string) => void; assignHeaders: (object: any, req: Request, headersTuple: any) => boolean; + assignEventCallback: (req: Request, callback: (event: number) => void) => void; + setRequestTimeout: (req: Request, timeout: number) => void; + setServerIdleTimeout: (server: any, timeout: number) => void; Response: (typeof globalThis)["Response"]; Request: (typeof globalThis)["Request"]; Headers: (typeof globalThis)["Headers"]; @@ -233,6 +239,11 @@ var FakeSocket = class Socket extends Duplex { } setTimeout(timeout, callback) { + const socketData = this[kInternalSocketData]; + if (!socketData) return; // sometimes 'this' is Socket not FakeSocket + + const [server, http_res, req] = socketData; + http_res?.req?.setTimeout(timeout, callback); return this; } @@ -421,6 +432,23 @@ function Server(options, callback) { return this; } +function onRequestEvent(event) { + const [server, http_res, req] = this.socket[kInternalSocketData]; + if (!http_res[finishedSymbol]) { + switch (event) { + case 0: // timeout + this.emit("timeout"); + server.emit("timeout", req.socket); + break; + case 1: // abort + this.complete = true; + this.emit("close"); + http_res[finishedSymbol] = true; + break; + } + } +} + Server.prototype = { ref() { this._unref = false; @@ -584,6 +612,7 @@ Server.prototype = { this.serverName = tls.serverName || host || "localhost"; } this[serverSymbol] = Bun.serve({ + idleTimeout: 0, // nodejs dont have a idleTimeout by default tls, port, hostname: host, @@ -636,6 +665,7 @@ Server.prototype = { const prevIsNextIncomingMessageHTTPS = isNextIncomingMessageHTTPS; isNextIncomingMessageHTTPS = isHTTPS; const http_req = new RequestClass(req); + assignEventCallback(req, onRequestEvent.bind(http_req)); isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS; const upgrade = http_req.headers.upgrade; @@ -683,7 +713,11 @@ Server.prototype = { }, setTimeout(msecs, callback) { - // TODO: + const server = this[serverSymbol]; + if (server) { + setServerIdleTimeout(server, Math.ceil(msecs / 1000)); + typeof callback === "function" && this.once("timeout", callback); + } return this; }, @@ -770,6 +804,7 @@ function IncomingMessage(req, defaultIncomingOpts) { this._dumped = false; this[noBodySymbol] = false; this[abortedSymbol] = false; + this.complete = false; Readable.$call(this); var { type = "request", [kInternalRequest]: nodeReq } = defaultIncomingOpts || {}; @@ -799,8 +834,6 @@ function IncomingMessage(req, defaultIncomingOpts) { type === "request" // TODO: Add logic for checking for body on response ? requestHasNoBody(this.method, this) : false; - - this.complete = !!this[noBodySymbol]; } IncomingMessage.prototype = { @@ -920,7 +953,11 @@ IncomingMessage.prototype = { // noop }, setTimeout(msecs, callback) { - // noop + const req = this[reqSymbol]; + if (req) { + setRequestTimeout(req, Math.ceil(msecs / 1000)); + typeof callback === "function" && this.once("timeout", callback); + } return this; }, get socket() { @@ -1145,6 +1182,10 @@ function emitCloseNT(self) { } } +function emitRequestCloseNT(self) { + self.emit("close"); +} + function onServerResponseClose() { // EventEmitter.emit makes a copy of the 'close' listeners array before // calling the listeners. detachSocket() unregisters onServerResponseClose @@ -1276,6 +1317,9 @@ function drainHeadersIfObservable() { } ServerResponse.prototype._final = function (callback) { + const req = this.req; + const shouldEmitClose = req && req.emit && !this[finishedSymbol]; + if (!this.headersSent) { var data = this[firstWriteSymbol] || ""; this[firstWriteSymbol] = undefined; @@ -1288,6 +1332,10 @@ ServerResponse.prototype._final = function (callback) { statusText: this.statusMessage ?? STATUS_CODES[this.statusCode], }), ); + if (shouldEmitClose) { + req.complete = true; + process.nextTick(emitRequestCloseNT, req); + } callback && callback(); return; } @@ -1295,7 +1343,10 @@ ServerResponse.prototype._final = function (callback) { this[finishedSymbol] = true; ensureReadableStreamController.$call(this, controller => { controller.end(); - + if (shouldEmitClose) { + req.complete = true; + process.nextTick(emitRequestCloseNT, req); + } callback(); const deferred = this[deferredSymbol]; if (deferred) { @@ -2157,6 +2208,10 @@ function _writeHead(statusCode, reason, obj, response) { // consisting only of the Status-Line and optional headers, and is // terminated by an empty line. response._hasBody = false; + const req = response.req; + if (req) { + req.complete = true; + } } } diff --git a/test/js/bun/http/serve.test.ts b/test/js/bun/http/serve.test.ts index 8da36c9277..1376c0a1c1 100644 --- a/test/js/bun/http/serve.test.ts +++ b/test/js/bun/http/serve.test.ts @@ -2038,3 +2038,20 @@ it("allow requestIP after async operation", async () => { expect(ip.address).toBeString(); expect(ip.family).toBeString(); }); + +it("allow custom timeout per request", async () => { + using server = Bun.serve({ + idleTimeout: 1, + port: 0, + async fetch(req, server) { + server.timeout(req, 60); + await Bun.sleep(10000); //uWS precision is not great + + return new Response("Hello, World!"); + }, + }); + expect(server.timeout).toBeFunction(); + const res = await fetch(new URL("/long-timeout", server.url.origin)); + expect(res.status).toBe(200); + expect(res.text()).resolves.toBe("Hello, World!"); +}, 20_000); diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index a2022c8180..fd55bf53ef 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -3,7 +3,7 @@ import { bunExe } from "bun:harness"; import { bunEnv, randomPort } from "harness"; import { createTest } from "node-harness"; import { spawnSync } from "node:child_process"; -import { EventEmitter } from "node:events"; +import { EventEmitter, once } from "node:events"; import nodefs, { unlinkSync } from "node:fs"; import http, { Agent, @@ -2150,51 +2150,6 @@ it("should error with faulty args", async () => { server.close(); }); -it("should mark complete true", async () => { - const { promise: serve, resolve: resolveServe } = Promise.withResolvers(); - const server = createServer(async (req, res) => { - let count = 0; - let data = ""; - req.on("data", chunk => { - data += chunk.toString(); - }); - while (!req.complete) { - await Bun.sleep(100); - count++; - if (count > 10) { - res.writeHead(500, { "Content-Type": "text/plain" }); - res.end("Request timeout"); - return; - } - } - res.writeHead(200, { "Content-Type": "text/plain" }); - res.end(data); - }); - - server.listen(0, () => { - resolveServe(`http://localhost:${server.address().port}`); - }); - - const url = await serve; - try { - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - name: "Hotel 1", - price: 100, - }), - }); - - expect(response.status).toBe(200); - expect(await response.text()).toBe('{"name":"Hotel 1","price":100}'); - } finally { - server.close(); - } -}); - it("should propagate exception in sync data handler", async () => { const { exitCode, stdout } = Bun.spawnSync({ cmd: [bunExe(), "run", path.join(import.meta.dir, "node-http-error-in-data-handler-fixture.1.js")], @@ -2365,3 +2320,86 @@ it("using node:http to do https: request fails", () => { message: `Protocol "https:" not supported. Expected "http:"`, }); }); + +it("should emit close, and complete should be true only after close #13373", async () => { + const server = http.createServer().listen(0); + try { + await once(server, "listening"); + fetch(`http://localhost:${server.address().port}`) + .then(res => res.text()) + .catch(() => {}); + + const [req, res] = await once(server, "request"); + expect(req.complete).toBe(false); + const closeEvent = once(req, "close"); + res.end("hi"); + + await closeEvent; + expect(req.complete).toBe(true); + } finally { + server.closeAllConnections(); + } +}); + +it("should emit close when connection is aborted", async () => { + const server = http.createServer().listen(0); + try { + await once(server, "listening"); + const controller = new AbortController(); + fetch(`http://localhost:${server.address().port}`, { signal: controller.signal }) + .then(res => res.text()) + .catch(() => {}); + + const [req, res] = await once(server, "request"); + expect(req.complete).toBe(false); + const closeEvent = once(req, "close"); + controller.abort(); + await closeEvent; + expect(req.complete).toBe(true); + } finally { + server.close(); + } +}); + +it("should emit timeout event", async () => { + const server = http.createServer().listen(0); + try { + await once(server, "listening"); + fetch(`http://localhost:${server.address().port}`) + .then(res => res.text()) + .catch(() => {}); + + const [req, res] = await once(server, "request"); + expect(req.complete).toBe(false); + let callBackCalled = false; + req.setTimeout(1000, () => { + callBackCalled = true; + }); + await once(req, "timeout"); + expect(callBackCalled).toBe(true); + } finally { + server.closeAllConnections(); + } +}, 12_000); + +it("should emit timeout event when using server.setTimeout", async () => { + const server = http.createServer().listen(0); + try { + await once(server, "listening"); + let callBackCalled = false; + server.setTimeout(1000, () => { + callBackCalled = true; + }); + fetch(`http://localhost:${server.address().port}`) + .then(res => res.text()) + .catch(() => {}); + + const [req, res] = await once(server, "request"); + expect(req.complete).toBe(false); + + await once(server, "timeout"); + expect(callBackCalled).toBe(true); + } finally { + server.closeAllConnections(); + } +}, 12_000);