node:http improvements (#17093)

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
Co-authored-by: Pham Minh Triet <92496972+Nanome203@users.noreply.github.com>
Co-authored-by: snwy <snwy@snwy.me>
Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com>
Co-authored-by: cirospaciari <cirospaciari@users.noreply.github.com>
Co-authored-by: Ben Grant <ben@bun.sh>
This commit is contained in:
Kai Tamkun
2025-03-10 20:19:29 -07:00
committed by GitHub
parent 013fdddc6e
commit 4a0e982bb2
83 changed files with 6236 additions and 1888 deletions

View File

@@ -189,6 +189,10 @@ public:
* This function should probably be optimized a lot in future releases,
* it could be O(1) with a hash map of fullnames and their counts. */
unsigned int numSubscribers(std::string_view topic) {
if (!topicTree) {
return 0;
}
Topic *t = topicTree->lookupTopic(topic);
if (t) {
return (unsigned int) t->size();
@@ -606,6 +610,10 @@ public:
return std::move(*this);
}
void setOnClose(HttpContextData<SSL>::OnSocketClosedCallback onClose) {
httpContext->getSocketContextData()->onSocketClosed = onClose;
}
TemplatedApp &&run() {
uWS::run();
return std::move(*this);

View File

@@ -30,6 +30,9 @@
#include <iostream>
#include "libusockets.h"
#include "bun-usockets/src/internal/internal.h"
#include "LoopData.h"
#include "AsyncSocketData.h"
@@ -54,28 +57,6 @@ struct AsyncSocket {
template <typename, typename> friend struct TopicTree;
template <bool> friend struct HttpResponse;
private:
/* Helper, do not use directly (todo: move to uSockets or de-crazify) */
void throttle_helper(int toggle) {
/* These should be exposed by uSockets */
static thread_local int us_events[2] = {0, 0};
struct us_poll_t *p = (struct us_poll_t *) this;
struct us_loop_t *loop = us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this));
if (toggle) {
/* Pause */
int events = us_poll_events(p);
if (events) {
us_events[getBufferedAmount() ? 1 : 0] = events;
}
us_poll_change(p, loop, 0);
} else {
/* Resume */
int events = us_events[getBufferedAmount() ? 1 : 0];
us_poll_change(p, loop, events);
}
}
public:
/* Returns SSL pointer or FD as pointer */
@@ -105,13 +86,13 @@ public:
/* Experimental pause */
us_socket_t *pause() {
throttle_helper(1);
us_socket_pause(SSL, (us_socket_t *) this);
return (us_socket_t *) this;
}
/* Experimental resume */
us_socket_t *resume() {
throttle_helper(0);
us_socket_resume(SSL, (us_socket_t *) this);
return (us_socket_t *) this;
}

View File

@@ -115,9 +115,8 @@ private:
us_socket_context_on_close(SSL, getSocketContext(), [](us_socket_t *s, int /*code*/, void */*reason*/) {
((AsyncSocket<SSL> *)s)->uncorkWithoutSending();
/* Get socket ext */
HttpResponseData<SSL> *httpResponseData = (HttpResponseData<SSL> *) us_socket_ext(SSL, s);
auto *httpResponseData = reinterpret_cast<HttpResponseData<SSL> *>(us_socket_ext(SSL, s));
/* Call filter */
HttpContextData<SSL> *httpContextData = getSocketContextDataS(s);
@@ -130,6 +129,9 @@ private:
httpResponseData->onAborted((HttpResponse<SSL> *)s, httpResponseData->userData);
}
if (httpResponseData->socketData && httpContextData->onSocketClosed) {
httpContextData->onSocketClosed(httpResponseData->socketData, SSL, s);
}
/* Destruct socket ext */
httpResponseData->~HttpResponseData<SSL>();
@@ -171,7 +173,7 @@ private:
proxyParser = &httpResponseData->proxyParser;
#endif
/* The return value is entirely up to us to interpret. The HttpParser only care for whether the returned value is DIFFERENT or not from passed user */
/* The return value is entirely up to us to interpret. The HttpParser cares only for whether the returned value is DIFFERENT from passed user */
auto [err, returnedSocket] = httpResponseData->consumePostPadded(data, (unsigned int) length, s, proxyParser, [httpContextData](void *s, HttpRequest *httpRequest) -> void * {
/* For every request we reset the timeout and hang until user makes action */
/* Warning: if we are in shutdown state, resetting the timer is a security issue! */
@@ -182,7 +184,7 @@ private:
httpResponseData->offset = 0;
/* Are we not ready for another request yet? Terminate the connection.
* Important for denying async pipelining until, if ever, we want to suppot it.
* Important for denying async pipelining until, if ever, we want to support it.
* Otherwise requests can get mixed up on the same connection. We still support sync pipelining. */
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) {
us_socket_close(SSL, (us_socket_t *) s, 0, nullptr);
@@ -416,7 +418,7 @@ private:
/* Force close rather than gracefully shutdown and risk confusing the client with a complete download */
AsyncSocket<SSL> *asyncSocket = (AsyncSocket<SSL> *) s;
// Node.js by default sclose the connection but they emit the timeout event before that
// Node.js by default closes the connection but they emit the timeout event before that
HttpResponseData<SSL> *httpResponseData = (HttpResponseData<SSL> *) asyncSocket->getAsyncSocketData();
if (httpResponseData->onTimeout) {

View File

@@ -27,6 +27,7 @@ namespace uWS {
template<bool> struct HttpResponse;
struct HttpRequest;
template <bool SSL>
struct alignas(16) HttpContextData {
template <bool> friend struct HttpContext;
@@ -34,6 +35,7 @@ struct alignas(16) HttpContextData {
template <bool> friend struct TemplatedApp;
private:
std::vector<MoveOnlyFunction<void(HttpResponse<SSL> *, int)>> filterHandlers;
using OnSocketClosedCallback = void (*)(void* userData, int is_ssl, struct us_socket_t *rawSocket);
MoveOnlyFunction<void(const char *hostname)> missingServerNameHandler;
@@ -51,6 +53,9 @@ private:
bool isParsingHttp = false;
bool rejectUnauthorized = false;
/* Used to simulate Node.js socket events. */
OnSocketClosedCallback onSocketClosed = nullptr;
// TODO: SNI
void clearRoutes() {
this->router = HttpRouter<RouterData>{};

View File

@@ -239,7 +239,7 @@ namespace uWS
}
return unsignedIntegerValue;
}
static inline uint64_t hasLess(uint64_t x, uint64_t n) {
return (((x)-~0ULL/255*(n))&~(x)&~0ULL/255*128);
}
@@ -283,7 +283,7 @@ namespace uWS
}
return false;
}
static inline void *consumeFieldName(char *p) {
/* Best case fast path (particularly useful with clang) */
while (true) {
@@ -323,14 +323,14 @@ namespace uWS
uint64_t http;
__builtin_memcpy(&http, data, sizeof(uint64_t));
uint32_t first_four_bytes = http & static_cast<uint32_t>(0xFFFFFFFF);
// check if any of the first four bytes are > non-ascii
if ((first_four_bytes & 0x80808080) != 0) [[unlikely]] {
return 0;
}
first_four_bytes |= 0x20202020; // Lowercase the first four bytes
static constexpr char http_lowercase_bytes[4] = {'h', 't', 't', 'p'};
static constexpr uint32_t http_lowercase_bytes_int = __builtin_bit_cast(uint32_t, http_lowercase_bytes);
if (first_four_bytes == http_lowercase_bytes_int) [[likely]] {
@@ -343,7 +343,7 @@ namespace uWS
static constexpr char S_colon_slash_slash[4] = {'S', ':', '/', '/'};
static constexpr uint32_t S_colon_slash_slash_int = __builtin_bit_cast(uint32_t, S_colon_slash_slash);
// Extract the last four bytes from the uint64_t
const uint32_t last_four_bytes = (http >> 32) & static_cast<uint32_t>(0xFFFFFFFF);
return (last_four_bytes == s_colon_slash_slash_int) || (last_four_bytes == S_colon_slash_slash_int);
@@ -361,7 +361,7 @@ namespace uWS
if (&data[1] == end) [[unlikely]] {
return nullptr;
}
if (data[0] == 32 && (__builtin_expect(data[1] == '/', 1) || isHTTPorHTTPSPrefixForProxies(data + 1, end) == 1)) [[likely]] {
header.key = {start, (size_t) (data - start)};
data++;
@@ -536,7 +536,7 @@ namespace uWS
while (headers->value.length() && headers->value.front() < 33) {
headers->value.remove_prefix(1);
}
headers++;
/* We definitely have at least one header (or request line), so check if we are done */
@@ -598,7 +598,7 @@ namespace uWS
for (HttpRequest::Header *h = req->headers; (++h)->key.length(); ) {
req->bf.add(h->key);
}
/* Break if no host header (but we can have empty string which is different from nullptr) */
if (!req->getHeader("host").data()) {
return {HTTP_ERROR_400_BAD_REQUEST, FULLPTR};
@@ -611,11 +611,12 @@ namespace uWS
* ought to be handled as an error. */
std::string_view transferEncodingString = req->getHeader("transfer-encoding");
std::string_view contentLengthString = req->getHeader("content-length");
auto transferEncodingStringLen = transferEncodingString.length();
auto contentLengthStringLen = contentLengthString.length();
if (transferEncodingStringLen && contentLengthStringLen) {
/* Returning fullptr is the same as calling the errorHandler */
/* We could be smart and set an error in the context along with this, to indicate what
/* We could be smart and set an error in the context along with this, to indicate what
* http error response we might want to return */
return {HTTP_ERROR_400_BAD_REQUEST, FULLPTR};
}
@@ -623,7 +624,7 @@ namespace uWS
/* Parse query */
const char *querySeparatorPtr = (const char *) memchr(req->headers->value.data(), '?', req->headers->value.length());
req->querySeparator = (unsigned int) ((querySeparatorPtr ? querySeparatorPtr : req->headers->value.data() + req->headers->value.length()) - req->headers->value.data());
// lets check if content len is valid before calling requestHandler
if(contentLengthStringLen) {
remainingStreamingBytes = toUnsignedInteger(contentLengthString);
@@ -633,6 +634,14 @@ namespace uWS
}
}
// lets check if content len is valid before calling requestHandler
if(contentLengthStringLen) {
remainingStreamingBytes = toUnsignedInteger(contentLengthString);
if (remainingStreamingBytes == UINT64_MAX) {
/* Parser error */
return {HTTP_ERROR_400_BAD_REQUEST, FULLPTR};
}
}
/* If returned socket is not what we put in we need
* to break here as we either have upgraded to
* WebSockets or otherwise closed the socket. */
@@ -654,7 +663,7 @@ namespace uWS
if (transferEncodingStringLen) {
/* If a proxy sent us the transfer-encoding header that 100% means it must be chunked or else the proxy is
* not RFC 9112 compliant. Therefore it is always better to assume this is the case, since that entirely eliminates
* not RFC 9112 compliant. Therefore it is always better to assume this is the case, since that entirely eliminates
* all forms of transfer-encoding obfuscation tricks. We just rely on the header. */
/* RFC 9112 6.3
@@ -683,7 +692,6 @@ namespace uWS
consumedTotal += consumed;
}
} else if (contentLengthStringLen) {
if constexpr (!ConsumeMinimally) {
unsigned int emittable = (unsigned int) std::min<uint64_t>(remainingStreamingBytes, length);
dataHandler(user, std::string_view(data, emittable), emittable == remainingStreamingBytes);

View File

@@ -81,8 +81,12 @@ public:
/* Called only once per request */
void writeMark() {
if (getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WROTE_DATE_HEADER) {
return;
}
/* Date is always written */
writeHeader("Date", std::string_view(((LoopData *) us_loop_ext(us_socket_context_loop(SSL, (us_socket_context(SSL, (us_socket_t *) this)))))->date, 29));
getHttpResponseData()->state |= HttpResponseData<SSL>::HTTP_WROTE_DATE_HEADER;
}
/* Returns true on success, indicating that it might be feasible to write more data.
@@ -113,7 +117,8 @@ public:
httpResponseData->state |= HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE;
}
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED) {
/* if write was called and there was previously no Content-Length header set */
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED && !(httpResponseData->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER)) {
/* We do not have tryWrite-like functionalities, so ignore optional in this path */
@@ -145,6 +150,8 @@ public:
}
}
}
} else {
this->uncork();
}
/* tryEnd can never fail when in chunked mode, since we do not have tryWrite (yet), only write */
@@ -152,7 +159,7 @@ public:
return true;
} else {
/* Write content-length on first call */
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_END_CALLED)) {
if (!(httpResponseData->state & (HttpResponseData<SSL>::HTTP_END_CALLED))) {
/* Write mark, this propagates to WebSockets too */
writeMark();
@@ -162,7 +169,8 @@ public:
Super::write("Content-Length: ", 16);
writeUnsigned64(totalSize);
Super::write("\r\n\r\n", 4);
} else {
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER;
} else if (!(httpResponseData->state & (HttpResponseData<SSL>::HTTP_WRITE_CALLED))) {
Super::write("\r\n", 2);
}
@@ -207,6 +215,8 @@ public:
}
}
}
} else {
this->uncork();
}
}
@@ -231,7 +241,7 @@ public:
/* Manually upgrade to WebSocket. Typically called in upgrade handler. Immediately calls open handler.
* NOTE: Will invalidate 'this' as socket might change location in memory. Throw away after use. */
template <typename UserData>
void upgrade(UserData &&userData, std::string_view secWebSocketKey, std::string_view secWebSocketProtocol,
us_socket_t *upgrade(UserData &&userData, std::string_view secWebSocketKey, std::string_view secWebSocketProtocol,
std::string_view secWebSocketExtensions,
struct us_socket_context_t *webSocketContext) {
@@ -313,8 +323,8 @@ public:
bool wasCorked = Super::isCorked();
/* Adopting a socket invalidates it, do not rely on it directly to carry any data */
WebSocket<SSL, true, UserData> *webSocket = (WebSocket<SSL, true, UserData> *) us_socket_context_adopt_socket(SSL,
(us_socket_context_t *) webSocketContext, (us_socket_t *) this, sizeof(WebSocketData) + sizeof(UserData));
us_socket_t *usSocket = us_socket_context_adopt_socket(SSL, (us_socket_context_t *) webSocketContext, (us_socket_t *) this, sizeof(WebSocketData) + sizeof(UserData));
WebSocket<SSL, true, UserData> *webSocket = (WebSocket<SSL, true, UserData> *) usSocket;
/* For whatever reason we were corked, update cork to the new socket */
if (wasCorked) {
@@ -344,6 +354,8 @@ public:
if (webSocketContextData->openHandler) {
webSocketContextData->openHandler(webSocket);
}
return usSocket;
}
/* Immediately terminate this Http response */
@@ -427,7 +439,7 @@ public:
/* End the response with an optional data chunk. Always starts a timeout. */
void end(std::string_view data = {}, bool closeConnection = false) {
internalEnd(data, data.length(), false, true, closeConnection);
internalEnd(data, data.length(), false, !(this->getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER), closeConnection);
}
/* Try and end the response. Returns [true, true] on success.
@@ -441,12 +453,12 @@ public:
bool sendTerminatingChunk(bool closeConnection = false) {
writeStatus(HTTP_200_OK);
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
if (!(httpResponseData->state & (HttpResponseData<SSL>::HTTP_WRITE_CALLED | HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER))) {
/* Write mark on first call to write */
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
/* This will be sent always when state is HTTP_WRITE_CALLED inside internalEnd, so no need to write the terminating 0 chunk here */
@@ -456,33 +468,46 @@ public:
}
/* Write parts of the response in chunking fashion. Starts timeout if failed. */
bool write(std::string_view data) {
bool write(std::string_view data, size_t *writtenPtr = nullptr) {
writeStatus(HTTP_200_OK);
/* Do not allow sending 0 chunks, they mark end of response */
if (data.empty()) {
if (writtenPtr) {
*writtenPtr = 0;
}
/* If you called us, then according to you it was fine to call us so it's fine to still call us */
return true;
}
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER)) {
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();
writeHeader("Transfer-Encoding", "chunked");
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
} else if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
Super::write("\r\n", 2);
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
auto [written, failed] = Super::write(data.data(), (int) data.length());
/* Reset timeout on each sended chunk */
this->resetTimeout();
if (writtenPtr) {
*writtenPtr = written;
}
/* If we did not fail the write, accept more */
return !failed;
}
@@ -515,7 +540,7 @@ public:
Super::cork();
handler();
/* The only way we could possibly have changed the corked socket during handler call, would be if
/* The only way we could possibly have changed the corked socket during handler call, would be if
* the HTTP socket was upgraded to WebSocket and caused a realloc. Because of this we cannot use "this"
* from here downwards. The corking is done with corkUnchecked() in upgrade. It steals cork. */
auto *newCorkedSocket = loopData->getCorkedSocket();
@@ -582,7 +607,7 @@ public:
/* Attach handler for aborted HTTP request */
HttpResponse *onAborted(void* userData, HttpResponseData<SSL>::OnAbortedCallback handler) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->userData = userData;
httpResponseData->onAborted = handler;
return this;
@@ -590,7 +615,7 @@ public:
HttpResponse *onTimeout(void* userData, HttpResponseData<SSL>::OnTimeoutCallback handler) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->userData = userData;
httpResponseData->onTimeout = handler;
return this;
@@ -620,7 +645,7 @@ public:
return this;
}
/* Attach a read handler for data sent. Will be called with FIN set true if last segment. */
void onData(void* userData, HttpResponseData<SSL>::OnDataCallback handler) {
void onData(void* userData, HttpResponseData<SSL>::OnDataCallback handler) {
HttpResponseData<SSL> *data = getHttpResponseData();
data->userData = userData;
data->inStream = handler;
@@ -629,6 +654,17 @@ public:
data->received_bytes_per_timeout = 0;
}
void* getSocketData() {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
return httpResponseData->socketData;
}
void setSocketData(void* socketData) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
httpResponseData->socketData = socketData;
}
void setWriteOffset(uint64_t offset) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();

View File

@@ -77,11 +77,14 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
HTTP_WRITE_CALLED = 2, // used
HTTP_END_CALLED = 4, // used
HTTP_RESPONSE_PENDING = 8, // used
HTTP_CONNECTION_CLOSE = 16 // used
HTTP_CONNECTION_CLOSE = 16, // used
HTTP_WROTE_CONTENT_LENGTH_HEADER = 32, // used
HTTP_WROTE_DATE_HEADER = 64, // used
};
/* Shared context pointer */
void* userData = nullptr;
void* socketData = nullptr;
/* Per socket event handlers */
OnWritableCallback onWritable = nullptr;

View File

@@ -24,6 +24,7 @@
#include "LoopData.h"
#include <libusockets.h>
#include <iostream>
#include "AsyncSocket.h"
extern "C" int bun_is_exiting();
@@ -52,6 +53,15 @@ private:
for (auto &p : loopData->preHandlers) {
p.second((Loop *) loop);
}
void *corkedSocket = loopData->getCorkedSocket();
if (corkedSocket) {
if (loopData->isCorkedSSL()) {
((uWS::AsyncSocket<true> *) corkedSocket)->uncork();
} else {
((uWS::AsyncSocket<false> *) corkedSocket)->uncork();
}
}
}
static void postCb(us_loop_t *loop) {
@@ -148,6 +158,10 @@ public:
getLazyLoop().loop = nullptr;
}
static LoopData* data(struct us_loop_t *loop) {
return (LoopData *) us_loop_ext(loop);
}
void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);

View File

@@ -63,6 +63,7 @@ public:
}
delete [] corkBuffer;
}
void* getCorkedSocket() {
return this->corkedSocket;
}