Compare commits

...

14 Commits

Author SHA1 Message Date
Ciro Spaciari
89f4352400 increase more 2025-03-31 14:33:27 -07:00
Ciro Spaciari
6af7132fd1 remove log 2025-03-31 13:55:15 -07:00
Ciro Spaciari
33348ee428 increase again 2025-03-31 13:54:15 -07:00
Ciro Spaciari
1bfa558454 update 2025-03-31 11:55:30 -07:00
Ciro Spaciari
a969862ef3 Merge branch 'main' into ciro/fix-cork-uncork-visibility 2025-03-31 11:49:02 -07:00
Jarred Sumner
9e80a81d1e Merge branch 'main' into ciro/fix-cork-uncork-visibility 2025-03-27 18:39:48 -07:00
Jarred Sumner
d8810d5689 Merge branch 'main' into ciro/fix-cork-uncork-visibility 2025-03-27 18:07:57 -07:00
Ciro Spaciari
fc07af6c66 fix end-cork 2025-03-27 16:44:55 -07:00
Ciro Spaciari
060f9ddc10 cork/uncork and writableCorked should be visible 2025-03-27 16:35:53 -07:00
Ciro Spaciari
42c9a9f854 let errors error in other cases 2025-03-27 16:09:09 -07:00
Ciro Spaciari
d1be55629d more 2025-03-27 15:47:38 -07:00
Ciro Spaciari
def59c1cc0 emit error instead of throwing 2025-03-27 15:15:05 -07:00
Ciro Spaciari
6aada0fbed test 2025-03-27 15:08:54 -07:00
Ciro Spaciari
57e924f60e test 2025-03-27 11:40:54 -07:00
3 changed files with 167 additions and 2 deletions

View File

@@ -70,6 +70,7 @@ const serverSymbol = Symbol.for("::bunternal::");
const kPendingCallbacks = Symbol("pendingCallbacks");
const kRequest = Symbol("request");
const kCloseCallback = Symbol("closeCallback");
const kCorked = Symbol("corked");
const kEmptyObject = Object.freeze(Object.create(null));
@@ -446,6 +447,9 @@ const NodeHTTPServerSocket = class Socket extends Duplex {
return this;
}
get writableHighWaterMark() {
return getPlatformHighWaterMark();
}
get remoteAddress() {
return this.address()?.address;
}
@@ -1774,7 +1778,7 @@ const OutgoingMessagePrototype = {
},
get writableHighWaterMark() {
return 16 * 1024;
return getPlatformHighWaterMark();
},
get writableNeedDrain() {
@@ -1785,9 +1789,21 @@ const OutgoingMessagePrototype = {
return this.finished;
},
get writableCorked() {
return this[kCorked];
},
get writableFinished() {
return this.finished && !!(this[kEmitState] & (1 << ClientRequestEmitState.finish));
},
cork() {
this[kCorked]++;
// corking is actually handled internally in the socket/handler
},
uncork() {
this[kCorked]--;
// uncorking is actually handled internally in the socket/handler
},
_send(data, encoding, callback, byteLength) {
if (this.destroyed) {
@@ -1796,6 +1812,7 @@ const OutgoingMessagePrototype = {
return this.write(data, encoding, callback);
},
end(chunk, encoding, callback) {
this[kCorked] = 0;
return this;
},
destroy(err?: Error) {
@@ -1942,6 +1959,20 @@ function allowWritesToContinue() {
this._callPendingCallbacks();
this.emit("drain");
}
function getPlatformHighWaterMark() {
// 256kb is for macOS
// this is not quite correct, but it's close enough for now
switch (process.platform) {
case "darwin":
return 256 * 1024;
case "win32":
// unix
default:
// just a high enough number that will hit backpressure
// 4MB was enought for musl but not for x64
return 16 * 1024 * 1024;
}
}
const ServerResponsePrototype = {
constructor: ServerResponse,
__proto__: OutgoingMessage.prototype,
@@ -2002,6 +2033,7 @@ const ServerResponsePrototype = {
// But we don't want it for the fetch() response version.
end(chunk, encoding, callback) {
const handle = this[kHandle];
this[kCorked] = 0;
if (handle?.aborted) {
return this;
}
@@ -2226,7 +2258,7 @@ const ServerResponsePrototype = {
},
get writableHighWaterMark() {
return 64 * 1024;
return getPlatformHighWaterMark();
},
get closed() {

View File

@@ -0,0 +1,95 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const REQ_TIMEOUT = common.platformTimeout(500); // Set max ms of request time before abort
// Set total allowed test timeout to avoid infinite loop
// that will hang test suite
const TOTAL_TEST_TIMEOUT = common.platformTimeout(1000);
// Placeholder for sockets handled, to make sure that we
// will reach a socket re-use case.
const handledSockets = new Set();
let metReusedSocket = false; // Flag for request loop termination.
const doubleEndResponse = (res) => {
// First end the request while sending some normal data
res.end('regular end of request', 'utf8', common.mustCall());
// Make sure the response socket is uncorked after first call of end
assert.strictEqual(res.writableCorked, 0);
res.end(); // Double end the response to prep for next socket re-use.
};
const sendDrainNeedingData = (res) => {
// Send data to socket more than the high watermark so that
// it definitely needs drain
const highWaterMark = res.socket.writableHighWaterMark;
const bufferToSend = Buffer.alloc(highWaterMark + 100);
const ret = res.write(bufferToSend); // Write the request data.
// Make sure that we had back pressure on response stream.
assert.strictEqual(ret, false);
res.once('drain', () => res.end()); // End on drain.
};
const server = http.createServer((req, res) => {
const { socket: responseSocket } = res;
if (handledSockets.has(responseSocket)) { // re-used socket, send big data!
metReusedSocket = true; // stop request loop
console.debug('FOUND REUSED SOCKET!');
sendDrainNeedingData(res);
} else { // not used again
// add to make sure we recognise it when we meet socket again
handledSockets.add(responseSocket);
doubleEndResponse(res);
}
});
server.listen(0); // Start the server on a random port.
const sendRequest = (agent) => new Promise((resolve, reject) => {
const timeout = setTimeout(common.mustNotCall(() => {
reject(new Error('Request timed out'));
}), REQ_TIMEOUT);
http.get({
port: server.address().port,
path: '/',
agent
}, common.mustCall((res) => {
const resData = [];
res.on('data', (data) => resData.push(data));
res.on('end', common.mustCall(() => {
const totalData = resData.reduce((total, elem) => total + elem.length, 0);
clearTimeout(timeout); // Cancel rejection timeout.
resolve(totalData); // fulfill promise
}));
}));
});
server.once('listening', async () => {
const testTimeout = setTimeout(common.mustNotCall(() => {
console.error('Test running for a while but could not met re-used socket');
process.exit(1);
}), TOTAL_TEST_TIMEOUT);
// Explicitly start agent to force socket reuse.
const agent = new http.Agent({ keepAlive: true });
// Start the request loop
let reqNo = 0;
while (!metReusedSocket) {
try {
console.log(`Sending req no ${++reqNo}`);
const totalData = await sendRequest(agent);
console.log(`${totalData} bytes were received for request ${reqNo}`);
} catch (err) {
console.error(err);
process.exit(1);
}
}
// Successfully tested conditions and ended loop
clearTimeout(testTimeout);
console.log('Closing server');
agent.destroy();
server.close();
});

View File

@@ -0,0 +1,38 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const onWriteAfterEndError = common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}, 2);
const server = http.createServer(common.mustCall(function(req, res) {
res.end('testing ended state', common.mustCall());
assert.strictEqual(res.writableCorked, 0);
res.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
assert.strictEqual(res.writableCorked, 0);
res.end('end', onWriteAfterEndError);
assert.strictEqual(res.writableCorked, 0);
res.on('error', onWriteAfterEndError);
res.on('finish', common.mustCall(() => {
res.end(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
server.close();
}));
}));
}));
server.listen(0);
server.on('listening', common.mustCall(function() {
http
.request({
port: server.address().port,
method: 'GET',
path: '/'
})
.end();
}));