fix(node:http) implement request.setTimeout and server.setTimeout (#13772)

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
Ciro Spaciari
2024-09-06 17:52:38 -07:00
committed by GitHub
parent cbb57e5c5b
commit 1011b44d78
14 changed files with 567 additions and 58 deletions

View File

@@ -2811,6 +2811,21 @@ declare module "bun" {
*/
requestIP(request: Request): SocketAddress | null;
/**
* Reset the idleTimeout of the given Request to the number in seconds. 0 means no timeout.
*
* @example
* ```js
* export default {
* async fetch(request, server) {
* server.timeout(request, 60);
* await Bun.sleep(30000);
* return new Response("30 seconds have passed");
* }
* }
* ```
*/
timeout(request: Request, seconds: number): void;
/**
* Undo a call to {@link Server.unref}
*

View File

@@ -415,6 +415,12 @@ private:
/* Force close rather than gracefully shutdown and risk confusing the client with a complete download */
AsyncSocket<SSL> *asyncSocket = (AsyncSocket<SSL> *) s;
// Node.js by default sclose the connection but they emit the timeout event before that
HttpResponseData<SSL> *httpResponseData = (HttpResponseData<SSL> *) asyncSocket->getAsyncSocketData();
if (httpResponseData->onTimeout) {
httpResponseData->onTimeout((HttpResponse<SSL> *)s, httpResponseData->userData);
}
return asyncSocket->close();
});

View File

@@ -586,19 +586,38 @@ public:
httpResponseData->onAborted = handler;
return this;
}
HttpResponse *onTimeout(void* userData, HttpResponseData<SSL>::OnTimeoutCallback handler) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->userData = userData;
httpResponseData->onTimeout = handler;
return this;
}
HttpResponse* clearOnWritableAndAborted() {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->onWritable = nullptr;
httpResponseData->onAborted = nullptr;
httpResponseData->onTimeout = nullptr;
return this;
}
HttpResponse* clearOnAborted() {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->onAborted = nullptr;
return this;
}
HttpResponse* clearOnTimeout() {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->onTimeout = nullptr;
return this;
}
/* Attach a read handler for data sent. Will be called with FIN set true if last segment. */
void onData(void* userData, HttpResponseData<SSL>::OnDataCallback handler) {
HttpResponseData<SSL> *data = getHttpResponseData();

View File

@@ -35,9 +35,9 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
public:
using OnWritableCallback = bool (*)(uWS::HttpResponse<SSL>*, uint64_t, void*);
using OnAbortedCallback = void (*)(uWS::HttpResponse<SSL>*, void*);
using OnTimeoutCallback = void (*)(uWS::HttpResponse<SSL>*, void*);
using OnDataCallback = void (*)(uWS::HttpResponse<SSL>* response, const char* chunk, size_t chunk_length, bool, void*);
uint8_t idleTimeout = 10; // default HTTP_TIMEOUT 10 seconds
/* When we are done with a response we mark it like so */
void markDone() {
onAborted = nullptr;
@@ -46,6 +46,9 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
/* Ignore data after this point */
inStream = nullptr;
// Ensure we don't call a timeout callback
onTimeout = nullptr;
/* We are done with this request */
this->state &= ~HttpResponseData<SSL>::HTTP_RESPONSE_PENDING;
}
@@ -69,9 +72,8 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
return ret;
}
/* Bits of status */
enum : int32_t{
enum : uint8_t {
HTTP_STATUS_CALLED = 1, // used
HTTP_WRITE_CALLED = 2, // used
HTTP_END_CALLED = 4, // used
@@ -86,6 +88,7 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
OnWritableCallback onWritable = nullptr;
OnAbortedCallback onAborted = nullptr;
OnDataCallback inStream = nullptr;
OnTimeoutCallback onTimeout = nullptr;
/* Outgoing offset */
uint64_t offset = 0;
@@ -93,7 +96,8 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
unsigned int received_bytes_per_timeout = 0;
/* Current state (content-length sent, status sent, write called, etc */
int state = 0;
uint8_t state = 0;
uint8_t idleTimeout = 10; // default HTTP_TIMEOUT 10 seconds
#ifdef UWS_WITH_PROXY
ProxyParser proxyParser;

View File

@@ -133,6 +133,7 @@ pub fn SSLWrapper(comptime T: type) type {
// The peer might continue sending data for some period of time before handling the local application's shutdown indication.
// This will start a full shutdown process if fast_shutdown = false, we can assume that the other side will complete the 2-step shutdown ASAP.
const ret = BoringSSL.SSL_shutdown(ssl);
// when doing a fast shutdown we don't need to wait for the peer to send a shutdown so we just call SSL_shutdown again
if (fast_shutdown) {
// This allows for a more rapid shutdown process if the application does not wish to wait for the peer.
// This alternative "fast shutdown" approach should only be done if it is known that the peer will not send more data, otherwise there is a risk of an application exposing itself to a truncation attack.

View File

@@ -36,6 +36,10 @@ function generate(name) {
fn: "doRequestIP",
length: 1,
},
timeout: {
fn: "doTimeout",
length: 2,
},
port: {
getter: "getPort",
},

View File

@@ -322,6 +322,7 @@ const StaticRoute = struct {
fn onResponseComplete(this: *Route, resp: HTTPResponse) void {
resp.clearAborted();
resp.clearOnWritable();
resp.clearTimeout();
if (this.server) |server| {
server.onStaticRequestComplete();
@@ -1700,6 +1701,7 @@ fn NewFlags(comptime debug_mode: bool) type {
has_marked_complete: bool = false,
has_marked_pending: bool = false,
has_abort_handler: bool = false,
has_timeout_handler: bool = false,
has_sendfile_ctx: bool = false,
has_called_error_handler: bool = false,
needs_content_length: bool = false,
@@ -1741,6 +1743,52 @@ pub const AnyRequestContext = struct {
pub fn get(self: AnyRequestContext, comptime T: type) ?*T {
return self.tagged_pointer.get(T);
}
pub fn setTimeout(self: AnyRequestContext, seconds: c_uint) bool {
if (self.tagged_pointer.isNull()) {
return false;
}
switch (self.tagged_pointer.tag()) {
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPServer.RequestContext))) => {
return self.tagged_pointer.as(HTTPServer.RequestContext).setTimeout(seconds);
},
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPSServer.RequestContext))) => {
return self.tagged_pointer.as(HTTPSServer.RequestContext).setTimeout(seconds);
},
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPServer.RequestContext))) => {
return self.tagged_pointer.as(DebugHTTPServer.RequestContext).setTimeout(seconds);
},
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPSServer.RequestContext))) => {
return self.tagged_pointer.as(DebugHTTPSServer.RequestContext).setTimeout(seconds);
},
else => @panic("Unexpected AnyRequestContext tag"),
}
return false;
}
pub fn enableTimeoutEvents(self: AnyRequestContext) void {
if (self.tagged_pointer.isNull()) {
return;
}
switch (self.tagged_pointer.tag()) {
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPServer.RequestContext))) => {
return self.tagged_pointer.as(HTTPServer.RequestContext).setTimeoutHandler();
},
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(HTTPSServer.RequestContext))) => {
return self.tagged_pointer.as(HTTPSServer.RequestContext).setTimeoutHandler();
},
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPServer.RequestContext))) => {
return self.tagged_pointer.as(DebugHTTPServer.RequestContext).setTimeoutHandler();
},
@field(Pointer.Tag, bun.meta.typeBaseName(@typeName(DebugHTTPSServer.RequestContext))) => {
return self.tagged_pointer.as(DebugHTTPSServer.RequestContext).setTimeoutHandler();
},
else => @panic("Unexpected AnyRequestContext tag"),
}
}
pub fn getRemoteSocketInfo(self: AnyRequestContext) ?uws.SocketAddress {
if (self.tagged_pointer.isNull()) {
return null;
@@ -1910,6 +1958,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
pub fn setTimeoutHandler(this: *RequestContext) void {
if (this.flags.has_timeout_handler) return;
if (this.resp) |resp| {
this.flags.has_timeout_handler = true;
resp.onTimeout(*RequestContext, RequestContext.onTimeout, this);
}
}
pub fn onResolve(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(JSC.conv) JSValue {
ctxLog("onResolve", .{});
@@ -2331,12 +2387,35 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctxLog("create<d> ({*})<r>", .{this});
}
pub fn onTimeout(this: *RequestContext, resp: *App.Response) void {
assert(this.resp == resp);
assert(this.server != null);
var any_js_calls = false;
var vm = this.server.?.vm;
const globalThis = this.server.?.globalThis;
defer {
// This is a task in the event loop.
// If we called into JavaScript, we must drain the microtask queue
if (any_js_calls) {
vm.drainMicrotasks();
}
}
if (this.request_weakref.get()) |request| {
if (request.internal_event_callback.trigger(Request.InternalJSEventCallback.EventType.timeout, globalThis)) {
any_js_calls = true;
}
}
}
pub fn onAbort(this: *RequestContext, resp: *App.Response) void {
assert(this.resp == resp);
assert(!this.flags.aborted);
assert(this.server != null);
// mark request as aborted
this.flags.aborted = true;
this.detachResponse();
var any_js_calls = false;
var vm = this.server.?.vm;
@@ -2350,6 +2429,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.deref();
}
if (this.request_weakref.get()) |request| {
request.request_context = AnyRequestContext.Null;
if (request.internal_event_callback.trigger(Request.InternalJSEventCallback.EventType.abort, globalThis)) {
any_js_calls = true;
}
// we can already clean this strong refs
request.internal_event_callback.deinit();
this.request_weakref.deinit();
}
// if signal is not aborted, abort the signal
if (this.signal) |signal| {
this.signal = null;
@@ -2417,6 +2505,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.request_weakref.get()) |request| {
request.request_context = AnyRequestContext.Null;
// we can already clean this strong refs
request.internal_event_callback.deinit();
this.request_weakref.deinit();
}
@@ -3065,6 +3155,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
resp.clearAborted();
this.flags.has_abort_handler = false;
}
if (this.flags.has_timeout_handler) {
resp.clearTimeout();
this.flags.has_timeout_handler = false;
}
}
}
@@ -4014,6 +4108,27 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return (this.resp orelse return null).getRemoteSocketInfo();
}
pub fn setTimeout(this: *RequestContext, seconds: c_uint) bool {
if (this.resp) |resp| {
resp.timeout(@min(seconds, 255));
if (seconds > 0) {
// we only set the timeout callback if we wanna the timeout event to be triggered
// the connection will be closed so the abort handler will be called after the timeout
if (this.request_weakref.get()) |req| {
if (req.internal_event_callback.hasCallback()) {
this.setTimeoutHandler();
}
}
} else {
// if the timeout is 0, we don't need to trigger the timeout event
resp.clearTimeout();
}
return true;
}
return false;
}
pub const Export = shim.exportFunctions(.{
.onResolve = onResolve,
.onReject = onReject,
@@ -5729,6 +5844,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
pub const doReload = onReload;
pub const doFetch = onFetch;
pub const doRequestIP = JSC.wrapInstanceMethod(ThisServer, "requestIP", false);
pub const doTimeout = JSC.wrapInstanceMethod(ThisServer, "timeout", false);
pub fn doSubscriberCount(this: *ThisServer, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) JSC.JSValue {
const arguments = callframe.arguments(1);
@@ -5780,6 +5896,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
JSValue.jsNull();
}
pub fn timeout(this: *ThisServer, request: *JSC.WebCore.Request, seconds: JSValue) JSC.JSValue {
if (!seconds.isNumber()) {
this.globalThis.throw("timeout() requires a number", .{});
return .zero;
}
const value = seconds.to(c_uint);
_ = request.request_context.setTimeout(value);
return JSValue.jsUndefined();
}
pub fn setIdleTimeout(this: *ThisServer, seconds: c_uint) void {
this.config.idleTimeout = @truncate(@min(seconds, 255));
}
pub fn publish(this: *ThisServer, globalThis: *JSC.JSGlobalObject, topic: ZigString, message_value: JSValue, compress_value: ?JSValue, exception: JSC.C.ExceptionRef) JSValue {
if (this.config.websocket == null)
return JSValue.jsNumber(0);
@@ -7110,3 +7240,37 @@ const welcome_page_html_gz = @embedFile("welcome-page.html.gz");
extern fn Bun__addInspector(bool, *anyopaque, *JSC.JSGlobalObject) void;
const assert = bun.assert;
pub export fn Server__setIdleTimeout(
server: JSC.JSValue,
seconds: JSC.JSValue,
globalThis: *JSC.JSGlobalObject,
) void {
if (!server.isObject()) {
globalThis.throw("Failed to set timeout: The 'this' value is not a Server.", .{});
return;
}
if (!seconds.isNumber()) {
globalThis.throw("Failed to set timeout: The provided value is not of type 'number'.", .{});
return;
}
const value = seconds.to(c_uint);
if (server.as(HTTPServer)) |this| {
this.setIdleTimeout(value);
} else if (server.as(HTTPSServer)) |this| {
this.setIdleTimeout(value);
} else if (server.as(DebugHTTPServer)) |this| {
this.setIdleTimeout(value);
} else if (server.as(DebugHTTPSServer)) |this| {
this.setIdleTimeout(value);
} else {
globalThis.throw("Failed to set timeout: The 'this' value is not a Server.", .{});
}
}
comptime {
if (!JSC.is_bindgen) {
_ = Server__setIdleTimeout;
}
}

View File

@@ -22,7 +22,9 @@ using namespace JSC;
using namespace WebCore;
extern "C" uWS::HttpRequest* Request__getUWSRequest(void*);
extern "C" void Request__setInternalEventCallback(void*, EncodedJSValue, JSC::JSGlobalObject*);
extern "C" void Request__setTimeout(void*, EncodedJSValue, JSC::JSGlobalObject*);
extern "C" void Server__setIdleTimeout(EncodedJSValue, EncodedJSValue, JSC::JSGlobalObject*);
static EncodedJSValue assignHeadersFromFetchHeaders(FetchHeaders& impl, JSObject* prototype, JSObject* objectValue, JSC::InternalFieldTuple* tuple, JSC::JSGlobalObject* globalObject, JSC::VM& vm)
{
auto scope = DECLARE_THROW_SCOPE(vm);
@@ -322,6 +324,57 @@ JSC_DEFINE_HOST_FUNCTION(jsHTTPAssignHeaders, (JSGlobalObject * globalObject, Ca
return JSValue::encode(jsNull());
}
JSC_DEFINE_HOST_FUNCTION(jsHTTPAssignEventCallback, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
auto& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
// This is an internal binding.
JSValue requestValue = callFrame->uncheckedArgument(0);
JSValue callback = callFrame->uncheckedArgument(1);
ASSERT(callFrame->argumentCount() == 2);
if (auto* jsRequest = jsDynamicCast<WebCore::JSRequest*>(requestValue)) {
Request__setInternalEventCallback(jsRequest->wrapped(), JSValue::encode(callback), globalObject);
}
return JSValue::encode(jsNull());
}
JSC_DEFINE_HOST_FUNCTION(jsHTTPSetTimeout, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
auto& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
// This is an internal binding.
JSValue requestValue = callFrame->uncheckedArgument(0);
JSValue seconds = callFrame->uncheckedArgument(1);
ASSERT(callFrame->argumentCount() == 2);
if (auto* jsRequest = jsDynamicCast<WebCore::JSRequest*>(requestValue)) {
Request__setTimeout(jsRequest->wrapped(), JSValue::encode(seconds), globalObject);
}
return JSValue::encode(jsUndefined());
}
JSC_DEFINE_HOST_FUNCTION(jsHTTPSetServerIdleTimeout, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
auto& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
// This is an internal binding.
JSValue serverValue = callFrame->uncheckedArgument(0);
JSValue seconds = callFrame->uncheckedArgument(1);
ASSERT(callFrame->argumentCount() == 2);
Server__setIdleTimeout(JSValue::encode(serverValue), JSValue::encode(seconds), globalObject);
return JSValue::encode(jsUndefined());
}
JSC_DEFINE_HOST_FUNCTION(jsHTTPGetHeader, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
auto& vm = globalObject->vm();
@@ -418,6 +471,17 @@ JSValue createNodeHTTPInternalBinding(Zig::GlobalObject* globalObject)
obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "assignHeaders"_s)),
JSC::JSFunction::create(vm, globalObject, 2, "assignHeaders"_s, jsHTTPAssignHeaders, ImplementationVisibility::Public), 0);
obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "assignEventCallback"_s)),
JSC::JSFunction::create(vm, globalObject, 2, "assignEventCallback"_s, jsHTTPAssignEventCallback, ImplementationVisibility::Public), 0);
obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setRequestTimeout"_s)),
JSC::JSFunction::create(vm, globalObject, 2, "setRequestTimeout"_s, jsHTTPSetTimeout, ImplementationVisibility::Public), 0);
obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setServerIdleTimeout"_s)),
JSC::JSFunction::create(vm, globalObject, 2, "setServerIdleTimeout"_s, jsHTTPSetServerIdleTimeout, ImplementationVisibility::Public), 0);
obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "Response"_s)),
globalObject->JSResponseConstructor(), 0);

View File

@@ -62,6 +62,7 @@ pub const Request = struct {
weak_ptr_data: bun.WeakPtrData = .{},
// We must report a consistent value for this
reported_estimated_size: usize = 0,
internal_event_callback: InternalJSEventCallback = .{},
const RequestMixin = BodyMixin(@This());
pub usingnamespace JSC.Codegen.JSRequest;
@@ -84,12 +85,70 @@ pub const Request = struct {
return this.request_context.getRequest();
}
pub export fn Request__setInternalEventCallback(
this: *Request,
callback: JSC.JSValue,
globalThis: *JSC.JSGlobalObject,
) void {
this.internal_event_callback = InternalJSEventCallback.init(callback, globalThis);
// we always have the abort event but we need to enable the timeout event as well in case of `node:http`.Server.setTimeout is set
this.request_context.enableTimeoutEvents();
}
pub export fn Request__setTimeout(
this: *Request,
seconds: JSC.JSValue,
globalThis: *JSC.JSGlobalObject,
) void {
if (!seconds.isNumber()) {
globalThis.throw("Failed to set timeout: The provided value is not of type 'number'.", .{});
return;
}
this.setTimeout(seconds.to(c_uint));
}
comptime {
if (!JSC.is_bindgen) {
_ = Request__getUWSRequest;
_ = Request__setInternalEventCallback;
_ = Request__setTimeout;
}
}
pub const InternalJSEventCallback = struct {
function: JSC.Strong = .{},
pub const EventType = enum(u8) {
timeout = 0,
abort = 1,
};
pub fn init(function: JSC.JSValue, globalThis: *JSC.JSGlobalObject) InternalJSEventCallback {
return InternalJSEventCallback{
.function = JSC.Strong.create(function, globalThis),
};
}
pub fn hasCallback(this: *InternalJSEventCallback) bool {
return this.function.has();
}
pub fn trigger(this: *InternalJSEventCallback, eventType: EventType, globalThis: *JSC.JSGlobalObject) bool {
if (this.function.get()) |callback| {
const result = callback.call(globalThis, JSC.JSValue.jsUndefined(), &.{JSC.JSValue.jsNumber(@intFromEnum(eventType))});
if (result.toError()) |js_error| {
globalThis.throwValue(js_error);
}
return true;
}
return false;
}
pub fn deinit(this: *InternalJSEventCallback) void {
this.function.deinit();
}
};
pub fn init(
url: bun.String,
headers: ?*FetchHeaders,
@@ -290,6 +349,7 @@ pub const Request = struct {
signal.unref();
this.signal = null;
}
this.internal_event_callback.deinit();
}
pub fn finalize(this: *Request) void {
@@ -907,4 +967,11 @@ pub const Request = struct {
this.cloneInto(req, allocator, globalThis, false);
return req;
}
pub fn setTimeout(
this: *Request,
seconds: c_uint,
) void {
_ = this.request_context.setTimeout(seconds);
}
};

View File

@@ -1338,6 +1338,38 @@ extern "C"
}
}
void uws_res_on_timeout(int ssl, uws_res_r res,
void (*handler)(uws_res_r res, void *opcional_data),
void *opcional_data)
{
if (ssl)
{
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
auto* onTimeout = reinterpret_cast<void (*)(uWS::HttpResponse<true>*, void*)>(handler);
if (handler)
{
uwsRes->onTimeout(opcional_data, onTimeout);
}
else
{
uwsRes->clearOnTimeout();
}
}
else
{
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
auto* onTimeout = reinterpret_cast<void (*)(uWS::HttpResponse<false>*, void*)>(handler);
if (handler)
{
uwsRes->onTimeout(opcional_data, onTimeout);
}
else
{
uwsRes->clearOnTimeout();
}
}
}
void uws_res_on_data(int ssl, uws_res_r res,
void (*handler)(uws_res_r res, const char *chunk,
size_t chunk_length, bool is_end,

View File

@@ -2555,6 +2555,12 @@ pub const AnyResponse = union(enum) {
.TCP => |resp| resp.clearAborted(),
};
}
pub fn clearTimeout(this: AnyResponse) void {
return switch (this) {
.SSL => |resp| resp.clearTimeout(),
.TCP => |resp| resp.clearTimeout(),
};
}
pub fn clearOnWritable(this: AnyResponse) void {
return switch (this) {
@@ -3076,7 +3082,22 @@ pub fn NewApp(comptime ssl: bool) type {
pub fn clearAborted(res: *Response) void {
uws_res_on_aborted(ssl_flag, res.downcast(), null, null);
}
pub fn onTimeout(res: *Response, comptime UserDataType: type, comptime handler: fn (UserDataType, *Response) void, opcional_data: UserDataType) void {
const Wrapper = struct {
pub fn handle(this: *uws_res, user_data: ?*anyopaque) callconv(.C) void {
if (comptime UserDataType == void) {
@call(bun.callmod_inline, handler, .{ {}, castRes(this), {} });
} else {
@call(bun.callmod_inline, handler, .{ @as(UserDataType, @ptrCast(@alignCast(user_data.?))), castRes(this) });
}
}
};
uws_res_on_timeout(ssl_flag, res.downcast(), Wrapper.handle, opcional_data);
}
pub fn clearTimeout(res: *Response) void {
uws_res_on_timeout(ssl_flag, res.downcast(), null, null);
}
pub fn clearOnData(res: *Response) void {
uws_res_on_data(ssl_flag, res.downcast(), null, null);
}
@@ -3400,6 +3421,8 @@ extern fn uws_res_has_responded(ssl: i32, res: *uws_res) bool;
extern fn uws_res_on_writable(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, u64, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void;
extern fn uws_res_clear_on_writable(ssl: i32, res: *uws_res) void;
extern fn uws_res_on_aborted(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void;
extern fn uws_res_on_timeout(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void;
extern fn uws_res_on_data(
ssl: i32,
res: *uws_res,
@@ -3481,7 +3504,7 @@ extern fn us_socket_mark_needs_more_not_ssl(socket: ?*uws_res) void;
extern fn uws_res_state(ssl: c_int, res: *const uws_res) State;
pub const State = enum(i32) {
pub const State = enum(u8) {
HTTP_STATUS_CALLED = 1,
HTTP_WRITE_CALLED = 2,
HTTP_END_CALLED = 4,

View File

@@ -11,6 +11,9 @@ const {
getHeader,
setHeader,
assignHeaders: assignHeadersFast,
assignEventCallback,
setRequestTimeout,
setServerIdleTimeout,
Response,
Request,
Headers,
@@ -20,6 +23,9 @@ const {
getHeader: (headers: Headers, name: string) => string | undefined;
setHeader: (headers: Headers, name: string, value: string) => void;
assignHeaders: (object: any, req: Request, headersTuple: any) => boolean;
assignEventCallback: (req: Request, callback: (event: number) => void) => void;
setRequestTimeout: (req: Request, timeout: number) => void;
setServerIdleTimeout: (server: any, timeout: number) => void;
Response: (typeof globalThis)["Response"];
Request: (typeof globalThis)["Request"];
Headers: (typeof globalThis)["Headers"];
@@ -233,6 +239,11 @@ var FakeSocket = class Socket extends Duplex {
}
setTimeout(timeout, callback) {
const socketData = this[kInternalSocketData];
if (!socketData) return; // sometimes 'this' is Socket not FakeSocket
const [server, http_res, req] = socketData;
http_res?.req?.setTimeout(timeout, callback);
return this;
}
@@ -421,6 +432,23 @@ function Server(options, callback) {
return this;
}
function onRequestEvent(event) {
const [server, http_res, req] = this.socket[kInternalSocketData];
if (!http_res[finishedSymbol]) {
switch (event) {
case 0: // timeout
this.emit("timeout");
server.emit("timeout", req.socket);
break;
case 1: // abort
this.complete = true;
this.emit("close");
http_res[finishedSymbol] = true;
break;
}
}
}
Server.prototype = {
ref() {
this._unref = false;
@@ -584,6 +612,7 @@ Server.prototype = {
this.serverName = tls.serverName || host || "localhost";
}
this[serverSymbol] = Bun.serve<any>({
idleTimeout: 0, // nodejs dont have a idleTimeout by default
tls,
port,
hostname: host,
@@ -636,6 +665,7 @@ Server.prototype = {
const prevIsNextIncomingMessageHTTPS = isNextIncomingMessageHTTPS;
isNextIncomingMessageHTTPS = isHTTPS;
const http_req = new RequestClass(req);
assignEventCallback(req, onRequestEvent.bind(http_req));
isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS;
const upgrade = http_req.headers.upgrade;
@@ -683,7 +713,11 @@ Server.prototype = {
},
setTimeout(msecs, callback) {
// TODO:
const server = this[serverSymbol];
if (server) {
setServerIdleTimeout(server, Math.ceil(msecs / 1000));
typeof callback === "function" && this.once("timeout", callback);
}
return this;
},
@@ -770,6 +804,7 @@ function IncomingMessage(req, defaultIncomingOpts) {
this._dumped = false;
this[noBodySymbol] = false;
this[abortedSymbol] = false;
this.complete = false;
Readable.$call(this);
var { type = "request", [kInternalRequest]: nodeReq } = defaultIncomingOpts || {};
@@ -799,8 +834,6 @@ function IncomingMessage(req, defaultIncomingOpts) {
type === "request" // TODO: Add logic for checking for body on response
? requestHasNoBody(this.method, this)
: false;
this.complete = !!this[noBodySymbol];
}
IncomingMessage.prototype = {
@@ -920,7 +953,11 @@ IncomingMessage.prototype = {
// noop
},
setTimeout(msecs, callback) {
// noop
const req = this[reqSymbol];
if (req) {
setRequestTimeout(req, Math.ceil(msecs / 1000));
typeof callback === "function" && this.once("timeout", callback);
}
return this;
},
get socket() {
@@ -1145,6 +1182,10 @@ function emitCloseNT(self) {
}
}
function emitRequestCloseNT(self) {
self.emit("close");
}
function onServerResponseClose() {
// EventEmitter.emit makes a copy of the 'close' listeners array before
// calling the listeners. detachSocket() unregisters onServerResponseClose
@@ -1276,6 +1317,9 @@ function drainHeadersIfObservable() {
}
ServerResponse.prototype._final = function (callback) {
const req = this.req;
const shouldEmitClose = req && req.emit && !this[finishedSymbol];
if (!this.headersSent) {
var data = this[firstWriteSymbol] || "";
this[firstWriteSymbol] = undefined;
@@ -1288,6 +1332,10 @@ ServerResponse.prototype._final = function (callback) {
statusText: this.statusMessage ?? STATUS_CODES[this.statusCode],
}),
);
if (shouldEmitClose) {
req.complete = true;
process.nextTick(emitRequestCloseNT, req);
}
callback && callback();
return;
}
@@ -1295,7 +1343,10 @@ ServerResponse.prototype._final = function (callback) {
this[finishedSymbol] = true;
ensureReadableStreamController.$call(this, controller => {
controller.end();
if (shouldEmitClose) {
req.complete = true;
process.nextTick(emitRequestCloseNT, req);
}
callback();
const deferred = this[deferredSymbol];
if (deferred) {
@@ -2157,6 +2208,10 @@ function _writeHead(statusCode, reason, obj, response) {
// consisting only of the Status-Line and optional headers, and is
// terminated by an empty line.
response._hasBody = false;
const req = response.req;
if (req) {
req.complete = true;
}
}
}

View File

@@ -2038,3 +2038,20 @@ it("allow requestIP after async operation", async () => {
expect(ip.address).toBeString();
expect(ip.family).toBeString();
});
it("allow custom timeout per request", async () => {
using server = Bun.serve({
idleTimeout: 1,
port: 0,
async fetch(req, server) {
server.timeout(req, 60);
await Bun.sleep(10000); //uWS precision is not great
return new Response("Hello, World!");
},
});
expect(server.timeout).toBeFunction();
const res = await fetch(new URL("/long-timeout", server.url.origin));
expect(res.status).toBe(200);
expect(res.text()).resolves.toBe("Hello, World!");
}, 20_000);

View File

@@ -3,7 +3,7 @@ import { bunExe } from "bun:harness";
import { bunEnv, randomPort } from "harness";
import { createTest } from "node-harness";
import { spawnSync } from "node:child_process";
import { EventEmitter } from "node:events";
import { EventEmitter, once } from "node:events";
import nodefs, { unlinkSync } from "node:fs";
import http, {
Agent,
@@ -2150,51 +2150,6 @@ it("should error with faulty args", async () => {
server.close();
});
it("should mark complete true", async () => {
const { promise: serve, resolve: resolveServe } = Promise.withResolvers();
const server = createServer(async (req, res) => {
let count = 0;
let data = "";
req.on("data", chunk => {
data += chunk.toString();
});
while (!req.complete) {
await Bun.sleep(100);
count++;
if (count > 10) {
res.writeHead(500, { "Content-Type": "text/plain" });
res.end("Request timeout");
return;
}
}
res.writeHead(200, { "Content-Type": "text/plain" });
res.end(data);
});
server.listen(0, () => {
resolveServe(`http://localhost:${server.address().port}`);
});
const url = await serve;
try {
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
name: "Hotel 1",
price: 100,
}),
});
expect(response.status).toBe(200);
expect(await response.text()).toBe('{"name":"Hotel 1","price":100}');
} finally {
server.close();
}
});
it("should propagate exception in sync data handler", async () => {
const { exitCode, stdout } = Bun.spawnSync({
cmd: [bunExe(), "run", path.join(import.meta.dir, "node-http-error-in-data-handler-fixture.1.js")],
@@ -2365,3 +2320,86 @@ it("using node:http to do https: request fails", () => {
message: `Protocol "https:" not supported. Expected "http:"`,
});
});
it("should emit close, and complete should be true only after close #13373", async () => {
const server = http.createServer().listen(0);
try {
await once(server, "listening");
fetch(`http://localhost:${server.address().port}`)
.then(res => res.text())
.catch(() => {});
const [req, res] = await once(server, "request");
expect(req.complete).toBe(false);
const closeEvent = once(req, "close");
res.end("hi");
await closeEvent;
expect(req.complete).toBe(true);
} finally {
server.closeAllConnections();
}
});
it("should emit close when connection is aborted", async () => {
const server = http.createServer().listen(0);
try {
await once(server, "listening");
const controller = new AbortController();
fetch(`http://localhost:${server.address().port}`, { signal: controller.signal })
.then(res => res.text())
.catch(() => {});
const [req, res] = await once(server, "request");
expect(req.complete).toBe(false);
const closeEvent = once(req, "close");
controller.abort();
await closeEvent;
expect(req.complete).toBe(true);
} finally {
server.close();
}
});
it("should emit timeout event", async () => {
const server = http.createServer().listen(0);
try {
await once(server, "listening");
fetch(`http://localhost:${server.address().port}`)
.then(res => res.text())
.catch(() => {});
const [req, res] = await once(server, "request");
expect(req.complete).toBe(false);
let callBackCalled = false;
req.setTimeout(1000, () => {
callBackCalled = true;
});
await once(req, "timeout");
expect(callBackCalled).toBe(true);
} finally {
server.closeAllConnections();
}
}, 12_000);
it("should emit timeout event when using server.setTimeout", async () => {
const server = http.createServer().listen(0);
try {
await once(server, "listening");
let callBackCalled = false;
server.setTimeout(1000, () => {
callBackCalled = true;
});
fetch(`http://localhost:${server.address().port}`)
.then(res => res.text())
.catch(() => {});
const [req, res] = await once(server, "request");
expect(req.complete).toBe(false);
await once(server, "timeout");
expect(callBackCalled).toBe(true);
} finally {
server.closeAllConnections();
}
}, 12_000);