Compare commits

...

31 Commits

Author SHA1 Message Date
Ciro Spaciari
e3b2e254b1 Merge branch 'main' into ciro/fix-http-chunked-delay 2024-12-05 17:02:50 -08:00
Ciro Spaciari
f4b27f6108 Merge branch 'main' into ciro/fix-http-chunked-delay 2024-12-04 18:37:54 -08:00
Ciro Spaciari
62b69840cc Merge branch 'main' into ciro/fix-http-chunked-delay 2024-12-04 06:31:45 -08:00
cirospaciari
68ba3671ae Apply formatting changes 2024-09-09 18:26:44 +00:00
Ciro Spaciari
8531376b86 opsie 2024-09-09 11:24:56 -07:00
Ciro Spaciari
82dfdf5c66 comments 2024-09-09 11:24:56 -07:00
Ciro Spaciari
0fc33af53a noop 2024-09-09 11:24:56 -07:00
Ciro Spaciari
4120d400b5 TODO 2024-09-09 11:24:56 -07:00
Ciro Spaciari
cf937804a2 avoid setTimeout if Content-Length is present 2024-09-09 11:24:56 -07:00
Ciro Spaciari
dc53c32a52 dont break stuff 2024-09-09 11:24:56 -07:00
Ciro Spaciari
7fbe40f466 todo 2024-09-09 11:24:56 -07:00
Ciro Spaciari
edfde229c7 remove extra flushs 2024-09-09 11:24:56 -07:00
Ciro Spaciari
5ebc12e57e dont need this here 2024-09-09 11:24:56 -07:00
Ciro Spaciari
3d684f6eec flush every write 2024-09-09 11:24:56 -07:00
Ciro Spaciari
2011788fe6 1 is fine 2024-09-09 11:24:56 -07:00
Ciro Spaciari
8295d802b0 undo unnecessary change 2024-09-09 11:24:56 -07:00
Ciro Spaciari
dc6c53824e await flush 2024-09-09 11:24:56 -07:00
Ciro Spaciari
111e320e56 always flush 2024-09-09 11:24:56 -07:00
Ciro Spaciari
4eb9f4aac4 writev 2024-09-09 11:24:56 -07:00
Ciro Spaciari
7e55176cc3 revert 2024-09-09 11:24:56 -07:00
Ciro Spaciari
bcff56a4b6 still fails sometimes 2024-09-09 11:24:56 -07:00
Ciro Spaciari
6f5df574e5 test + revert flush and investigate 2024-09-09 11:24:56 -07:00
Felix Zedén Yverås
bc503ce8ec test: add failing test for #13696
It seems that the http module does not stream the initial few chunks of
a streaming http response in real-time. This has caused issues when
using bun with testcontainers.

While I am not qualified to locate and resolve the root cause of this
issue, I can provide a breaking test, in the hopes it will help somebody
else locate and fix the issue.
2024-09-09 11:24:56 -07:00
Ciro Spaciari
528ad62d1e remember #3458 2024-09-09 11:24:56 -07:00
Ciro Spaciari
876ff4c199 test for headersSent 2024-09-09 11:24:56 -07:00
Ciro Spaciari
1ec4a5ce3e setTimeout vs nextTick 2024-09-09 11:24:56 -07:00
Ciro Spaciari
67b72b0515 opsie 2024-09-09 11:24:56 -07:00
Ciro Spaciari
5f3444ae3b cleanup 2024-09-09 11:24:56 -07:00
Ciro Spaciari
907509a36c set headersSent 2024-09-09 11:24:56 -07:00
Ciro Spaciari
71af9ef496 finished 2024-09-09 11:24:56 -07:00
Ciro Spaciari
4e213a12e4 flush first chunk on nextTick 2024-09-09 11:24:56 -07:00
2 changed files with 118 additions and 1 deletions

View File

@@ -1246,10 +1246,53 @@ ServerResponse.prototype._implicitHeader = function () {
this.writeHead(this.statusCode);
};
// Commit the response for `self` as a chunked/streamed reply if the headers
// have not been sent yet. Scheduled via `setTimeout(flushFirstWrite, 1, this)`
// from _write/_writev so that consecutive synchronous writes can still be
// coalesced into the first chunk before the response is committed.
// No-op if the response was already flushed (headersSent === true).
function flushFirstWrite(self) {
// headersSent = already flushed the first write
if (self.headersSent) return;
self.headersSent = true;
let firstWrite = self[firstWriteSymbol];
// at this point, the user did not call end and we have not flushed the first write
// we need to flush it now and behave like chunked encoding
self._reply(
new Response(
new ReadableStream({
type: "direct",
pull: async controller => {
// Stash the controller so subsequent _write/_writev calls can push
// chunks straight into the open stream.
self[controllerSymbol] = controller;
if (firstWrite) {
controller.write(firstWrite);
await controller.flush(); // flush the first write
}
// Release the buffered first chunk so it can be garbage-collected.
firstWrite = undefined;
if (!self[finishedSymbol]) {
// Response not finished: park this pull on a deferred promise that
// end() resolves later, keeping the stream open for more writes.
const { promise, resolve } = $newPromiseCapability(GlobalPromise);
self[deferredSymbol] = resolve;
return await promise;
}
},
}),
{
headers: self[headersSymbol],
status: self.statusCode,
// NOTE(review): falls back to the standard reason phrase when the user
// did not set statusMessage.
statusText: self.statusMessage ?? STATUS_CODES[self.statusCode],
},
),
);
}
ServerResponse.prototype._write = function (chunk, encoding, callback) {
if (this[firstWriteSymbol] === undefined && !this.headersSent) {
this[firstWriteSymbol] = chunk;
callback();
const headers = this[headersSymbol];
const hasContentLength = headers && headers.has("Content-Length");
if (hasContentLength) {
// wait for .end()
return;
}
// We still want to wait for more writes in case the user makes two consecutive writes,
// but if the user delays too long we need to flush
setTimeout(flushFirstWrite, 1, this);
return;
}
@@ -1263,6 +1306,17 @@ ServerResponse.prototype._writev = function (chunks, callback) {
if (chunks.length === 1 && !this.headersSent && this[firstWriteSymbol] === undefined) {
this[firstWriteSymbol] = chunks[0].chunk;
callback();
const headers = this[headersSymbol];
const hasContentLength = headers && headers.has("Content-Length");
if (hasContentLength) {
// wait for .end()
return;
}
// We still want to wait for more writes in case the user makes two consecutive writes,
// but if the user delays too long we need to flush
setTimeout(flushFirstWrite, 1, this);
return;
}
@@ -1270,7 +1324,6 @@ ServerResponse.prototype._writev = function (chunks, callback) {
for (const chunk of chunks) {
controller.write(chunk.chunk);
}
callback();
});
};

View File

@@ -2347,6 +2347,22 @@ it("should emit close when connection is aborted", async () => {
}
});
it("must set headersSent to true after headers are sent #3458", async () => {
  const server = createServer().listen(0);
  try {
    await once(server, "listening");
    // Kick off a client request; the body itself is irrelevant to this test.
    fetch(`http://localhost:${server.address().port}`).then(res => res.text());
    const [req, res] = await once(server, "request");
    // Before the response is flushed, headers must not be marked as sent.
    expect(res.headersSent).toBe(false);
    const ended = Promise.withResolvers();
    res.end("OK", ended.resolve);
    await ended.promise;
    // Once end()'s callback has fired, the headers are on the wire.
    expect(res.headersSent).toBe(true);
  } finally {
    server.close();
  }
});
it("should emit timeout event", async () => {
const server = http.createServer().listen(0);
try {
@@ -2425,6 +2441,54 @@ it("must set headersSent to true after headers are sent when using chunk encoded
server.close();
}
});
it("response body streaming is immediate (#13696)", async () => {
  const totalChunks = 10;
  const spacing = 50; // ms the server waits between chunk writes
  const acceptableDelay = 20; // max ms a chunk may be delayed end-to-end
  let totalSize = 0;
  let receivedSize = 0;
  // `server` stays undefined if createServer/listen throws, so the finally
  // block must not assume it was assigned (the original `let server: Server;`
  // would raise a TypeError in `finally`, masking the real failure).
  let server: Server | undefined;
  try {
    server = createServer(async (req, res) => {
      res.writeHead(200, { "Content-Type": "text/plain" });
      for (let i = 0; i < totalChunks; i++) {
        // Each chunk carries its send timestamp so the client can measure
        // how long it was held up in transit.
        const payload = `${new Date().getTime().toString()}\n`;
        totalSize += payload.length;
        res.write(payload);
        if (i + 1 < totalChunks) await Bun.sleep(spacing);
      }
      res.end();
    });
    const url = await listen(server);
    const res = await fetch(url);
    const reader = res.body.getReader();
    const decoder = new TextDecoder();
    let receivedChunks = 0;
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      receivedChunks++;
      receivedSize += value.byteLength;
      // Verify that chunks are not held up longer than necessary
      // at the receiver. This is likely to be in the single digits.
      //
      // #13696: Bun would delay the initial chunks and then send multiple
      // chunks before real-time streaming started working.
      expect(new Date().getTime() - Number.parseInt(decoder.decode(value).trimEnd(), 10)).toBeLessThan(acceptableDelay);
    }
    // Verify that the correct number of chunks were sent (in case server
    // decides to send no chunks at all).
    expect(receivedChunks).toEqual(totalChunks);
    // Also verify the total size in case some data was lost.
    expect(receivedSize).toEqual(totalSize);
  } finally {
    // Optional chaining: server may never have been assigned (see above).
    server?.close();
  }
});
it("should work when sending https.request with agent:false", async () => {
const { promise, resolve, reject } = Promise.withResolvers();