fix(node:http) properly signal server drain after draining the socket buffer (#18479)

This commit is contained in:
Ciro Spaciari
2025-03-26 16:41:17 -07:00
committed by GitHub
parent 7740271359
commit a548c2ec54
11 changed files with 306 additions and 30 deletions

View File

@@ -182,9 +182,9 @@ public:
}
/* Returns the user space backpressure. */
unsigned int getBufferedAmount() {
size_t getBufferedAmount() {
/* We return the actual amount of bytes in backbuffer, including pendingRemoval */
return (unsigned int) getAsyncSocketData()->buffer.totalLength();
return getAsyncSocketData()->buffer.totalLength();
}
/* Returns the text representation of an IPv4 or IPv6 address */
@@ -222,6 +222,63 @@ public:
return addressAsText(getRemoteAddress());
}
/**
* Flushes the socket buffer by writing as much data as possible to the underlying socket.
*
* @return The total number of bytes successfully written to the socket
*/
size_t flush() {
/* Check if socket is valid for operations */
if (us_socket_is_closed(SSL, (us_socket_t *) this)) {
/* Socket is closed, no flushing is possible */
return 0;
}
/* Get the associated asynchronous socket data structure */
AsyncSocketData<SSL> *asyncSocketData = getAsyncSocketData();
size_t total_written = 0;
/* Continue flushing as long as we have data in the buffer */
while (asyncSocketData->buffer.length()) {
/* Get current buffer size */
size_t buffer_len = asyncSocketData->buffer.length();
/* Limit write size to INT_MAX as the underlying socket API uses int for length */
int max_flush_len = std::min(buffer_len, (size_t)INT_MAX);
/* Attempt to write data to the socket */
int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), max_flush_len, 0);
total_written += written;
/* Check if we couldn't write the entire buffer */
if ((unsigned int) written < buffer_len) {
/* Remove the successfully written data from the buffer */
asyncSocketData->buffer.erase((unsigned int) written);
/* If we wrote less than we attempted, the socket buffer is likely full
* likely is used as an optimization hint to the compiler
* since written < buffer_len is very likely to be true
*/
if(written < max_flush_len) {
[[likely]]
/* Cannot write more at this time, return what we've written so far */
return total_written;
}
/* If we wrote exactly max_flush_len, we might be able to write more, so continue
* This is unlikely to happen, because this would be INT_MAX bytes, which is unlikely to be written in one go
* but we keep this check for completeness
*/
continue;
}
/* Successfully wrote the entire buffer, clear the buffer */
asyncSocketData->buffer.clear();
}
/* Return the total number of bytes written during this flush operation */
return total_written;
}
/* Write in three levels of prioritization: cork-buffer, syscall, socket-buffer. Always drain if possible.
* Returns pair of bytes written (anywhere) and wheter or not this call resulted in the polling for
* writable (or we are in a state that implies polling for writable). */
@@ -233,7 +290,6 @@ public:
LoopData *loopData = getLoopData();
AsyncSocketData<SSL> *asyncSocketData = getAsyncSocketData();
/* We are limited if we have a per-socket buffer */
if (asyncSocketData->buffer.length()) {
size_t buffer_len = asyncSocketData->buffer.length();
@@ -261,7 +317,7 @@ public:
asyncSocketData->buffer.clear();
}
if (length) {
if (length) {
if (loopData->isCorkedWith(this)) {
/* We are corked */
if (LoopData::CORK_BUFFER_SIZE - loopData->getCorkOffset() >= (unsigned int) length) {

View File

@@ -365,11 +365,32 @@ private:
auto *asyncSocket = reinterpret_cast<AsyncSocket<SSL> *>(s);
auto *httpResponseData = reinterpret_cast<HttpResponseData<SSL> *>(asyncSocket->getAsyncSocketData());
/* Attempt to drain the socket buffer before triggering onWritable callback */
size_t bufferedAmount = asyncSocket->getBufferedAmount();
if (bufferedAmount > 0) {
/* Try to flush pending data from the socket's buffer to the network */
bufferedAmount -= asyncSocket->flush();
/* Check if there's still data waiting to be sent after flush attempt */
if (bufferedAmount > 0) {
/* Socket buffer is not completely empty yet
* - Reset the timeout to prevent premature connection closure
* - This allows time for another writable event or new request
* - Return the socket to indicate we're still processing
*/
reinterpret_cast<HttpResponse<SSL> *>(s)->resetTimeout();
return s;
}
/* If bufferedAmount is now 0, we've successfully flushed everything
* and will fall through to the next section of code
*/
}
/* Ask the developer to write data and return success (true) or failure (false), OR skip sending anything and return success (true). */
if (httpResponseData->onWritable) {
/* We are now writable, so hang timeout again, the user does not have to do anything so we should hang until end or tryEnd rearms timeout */
us_socket_timeout(SSL, s, 0);
/* We expect the developer to return whether or not write was successful (true).
* If write was never called, the developer should still return true so that we may drain. */
bool success = httpResponseData->callOnWritable(reinterpret_cast<HttpResponse<SSL> *>(asyncSocket), httpResponseData->offset);
@@ -384,7 +405,7 @@ private:
}
/* Drain any socket buffer, this might empty our backpressure and thus finish the request */
/*auto [written, failed] = */asyncSocket->write(nullptr, 0, true, 0);
asyncSocket->flush();
/* Should we close this connection after a response - and is this response really done? */
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {

View File

@@ -122,15 +122,10 @@ public:
/* We do not have tryWrite-like functionalities, so ignore optional in this path */
/* Do not allow sending 0 chunk here */
if (data.length()) {
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
/* Ignoring optional for now */
Super::write(data.data(), (int) data.length());
}
/* Write the chunked data if there is any (this will not send zero chunks) */
this->write(data, nullptr);
/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);
@@ -480,6 +475,40 @@ public:
return true;
}
size_t length = data.length();
// Special handling for extremely large data (greater than UINT_MAX bytes)
// most clients expect a max of UINT_MAX, so we need to split the write into multiple writes
if (length > UINT_MAX) {
bool has_failed = false;
size_t total_written = 0;
// Process full-sized chunks until remaining data is less than UINT_MAX
while (length > UINT_MAX) {
size_t written = 0;
// Write a UINT_MAX-sized chunk and check for failure
// even after failure we continue writing because the data will be buffered
if(!this->write(data.substr(0, UINT_MAX), &written)) {
has_failed = true;
}
total_written += written;
length -= UINT_MAX;
data = data.substr(UINT_MAX);
}
// Handle the final chunk (less than UINT_MAX bytes)
if (length > 0) {
size_t written = 0;
if(!this->write(data, &written)) {
has_failed = true;
}
total_written += written;
}
if (writtenPtr) {
*writtenPtr = total_written;
}
return !has_failed;
}
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER) && !httpResponseData->fromAncientRequest) {
@@ -499,17 +528,36 @@ public:
Super::write("\r\n", 2);
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
size_t total_written = 0;
bool has_failed = false;
auto [written, failed] = Super::write(data.data(), (int) data.length());
// Handle data larger than INT_MAX by writing it in chunks of INT_MAX bytes
while (length > INT_MAX) {
// Write the maximum allowed chunk size (INT_MAX)
auto [written, failed] = Super::write(data.data(), INT_MAX);
// If the write failed, set the has_failed flag we continue writting because the data will be buffered
has_failed = has_failed || failed;
total_written += written;
length -= INT_MAX;
data = data.substr(INT_MAX);
}
// Handle the remaining data (less than INT_MAX bytes)
if (length > 0) {
// Write the final chunk with exact remaining length
auto [written, failed] = Super::write(data.data(), (int) length);
has_failed = has_failed || failed;
total_written += written;
}
/* Reset timeout on each sended chunk */
this->resetTimeout();
if (writtenPtr) {
*writtenPtr = written;
*writtenPtr = total_written;
}
/* If we did not fail the write, accept more */
return !failed;
return !has_failed;
}
/* Get the current byte write offset for this Http response */

View File

@@ -339,7 +339,7 @@ private:
/* We store old backpressure since it is unclear whether write drained anything,
* however, in case of coming here with 0 backpressure we still need to emit drain event */
unsigned int backpressure = asyncSocket->getBufferedAmount();
size_t backpressure = asyncSocket->getBufferedAmount();
/* Drain as much as possible */
asyncSocket->write(nullptr, 0);