Compare commits

...

5 Commits

Author SHA1 Message Date
Jarred Sumner
568ab792ba Update fetch.stream.test.ts 2025-10-04 02:37:21 -07:00
Jarred Sumner
15ffd1b565 Update fetch.stream.test.ts 2025-10-04 02:36:30 -07:00
Jarred Sumner
5a8e41876c Update fetch.stream.test.ts 2025-10-04 01:46:36 -07:00
Jarred Sumner
3a0def4805 Fixup 2025-10-04 01:36:17 -07:00
Jarred Sumner
aea0f5ac83 Deflake fetch.stream.test.ts 2025-10-04 01:13:59 -07:00
2 changed files with 85 additions and 59 deletions

View File

@@ -179,8 +179,11 @@ pub fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *
this.decompressor.readAll(this.isDone()) catch |err| {
if (this.isDone() or error.ShortRead != err) {
Output.prettyErrorln("<r><red>Decompression error: {s}<r>", .{bun.asByteSlice(@errorName(err))});
Output.flush();
if (shouldLogDecompressionErrorsToStderr()) {
Output.prettyErrorln("<r><red>Decompression error: {s}<r>", .{bun.asByteSlice(@errorName(err))});
Output.flush();
}
return err;
}
};
@@ -206,8 +209,11 @@ pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_c
else => {
if (!body_out_str.owns(buffer.list.items)) {
body_out_str.append(buffer.list.items) catch |err| {
Output.prettyErrorln("<r><red>Failed to append to body buffer: {s}<r>", .{bun.asByteSlice(@errorName(err))});
Output.flush();
if (shouldLogDecompressionErrorsToStderr()) {
Output.prettyErrorln("<r><red>Failed to append to body buffer: {s}<r>", .{bun.asByteSlice(@errorName(err))});
Output.flush();
}
return err;
};
}
@@ -219,6 +225,16 @@ pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_c
const log = Output.scoped(.HTTPInternalState, .hidden);
// Doing this has caught bugs.
// But we should generally not do this.
// For now, we can leave it in for non-runtime usage of Bun.
fn shouldLogDecompressionErrorsToStderr() bool {
return switch (bun.cli.Cli.cmd orelse .AutoCommand) {
.RunCommand, .AutoCommand => false,
else => true,
};
}
const HTTPStage = enum {
pending,

View File

@@ -26,54 +26,58 @@ const bigText = Buffer.alloc(1 * 1024 * 1024, "a");
const smallText = Buffer.alloc(16 * "Hello".length, "Hello");
const empty = Buffer.alloc(0);
describe.concurrent("fetch() with streaming", () => {
[-1, 0, 20, 50, 100].forEach(timeout => {
it(`should be able to fail properly when reading from readable stream with timeout ${timeout}`, async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(
new ReadableStream({
async start(controller) {
controller.enqueue("Hello, World!");
await Bun.sleep(1000);
controller.enqueue("Hello, World!");
controller.close();
describe.concurrent(() => {
[100, 50, 20, 0, -1].forEach(timeout => {
["pull", "start"].forEach(via => {
it(`should be able to fail properly when reading from readable stream via ${via} with timeout ${timeout}`, async () => {
let ran;
using server = Bun.serve({
port: 0,
async fetch(req) {
async function fn(controller) {
if (ran) return;
ran = true;
controller.enqueue("Hello, World!");
}
return new Response(
new ReadableStream({
[via]: fn,
}),
{
status: 200,
headers: {
"Content-Type": "text/plain",
},
},
}),
{
status: 200,
headers: {
"Content-Type": "text/plain",
},
},
);
},
});
const server_url = `http://${server.hostname}:${server.port}`;
try {
const res = await fetch(server_url, {
signal: timeout < 0 ? AbortSignal.abort() : AbortSignal.timeout(timeout),
);
},
});
const reader = res.body?.getReader();
let results = [];
while (true) {
const { done, data } = await reader?.read();
if (data) results.push(data);
if (done) break;
const server_url = server.url.href;
try {
const signal = timeout < 0 ? AbortSignal.abort() : AbortSignal.timeout(timeout);
const res = await fetch(server_url, {
signal,
});
const reader = res.body?.getReader();
if (!signal.aborted) await new Promise(resolve => signal.addEventListener("abort", resolve, { once: true }));
while (true) {
const { done } = await reader?.read();
if (done) break;
}
expect.unreachable();
} catch (err: any) {
if (timeout < 0) {
if (err.name !== "AbortError") throw err;
expect(err.message).toBe("The operation was aborted.");
} else {
if (err.name !== "TimeoutError") throw err;
expect(err.message).toBe("The operation timed out.");
}
}
expect.unreachable();
} catch (err: any) {
if (timeout < 0) {
if (err.name !== "AbortError") throw err;
expect(err.message).toBe("The operation was aborted.");
} else {
if (err.name !== "TimeoutError") throw err;
expect(err.message).toBe("The operation timed out.");
}
}
});
});
});
@@ -104,7 +108,7 @@ describe.concurrent("fetch() with streaming", () => {
);
},
});
const server_url = `http://${server.hostname}:${server.port}`;
const server_url = server.url.href;
const res = await fetch(server_url, {});
const promise = res.text();
expect(async () => res.body?.getReader()).toThrow("ReadableStream is locked");
@@ -139,7 +143,7 @@ describe.concurrent("fetch() with streaming", () => {
},
});
const server_url = `http://${server.hostname}:${server.port}`;
const server_url = server.url.href;
const res = await fetch(server_url);
var body = res.body as ReadableStream<Uint8Array>;
const promise = res.text();
@@ -172,7 +176,7 @@ describe.concurrent("fetch() with streaming", () => {
});
},
});
const url = `http://${server.hostname}:${server.port}/`;
const url = server.url.href;
expect(await fetch(`${url}with_headers`).then(res => res.text())).toBe("Hello, World");
expect(await fetch(url).then(res => res.text())).toBe("Hello, World");
}
@@ -184,15 +188,19 @@ describe.concurrent("fetch() with streaming", () => {
try {
const errorHandler = (err: any) => expect(err).toBeUndefined();
server = http
.createServer(function (req, res) {
res.writeHead(200, { "Content-Type": "text/plain" });
const address = await new Promise<AddressInfo>((resolve, reject) => {
server = http
.createServer(function (req, res) {
res.writeHead(200, { "Content-Type": "text/plain" });
pipeline(createReadStream(file), res, errorHandler);
})
.listen(0);
pipeline(createReadStream(file), res, errorHandler);
})
.listen(0, err => {
if (err) reject(err);
else resolve(server!.address() as AddressInfo);
});
});
const address = server.address() as AddressInfo;
let url;
if (address.family == "IPv4") {
url = `http://${address.address}:${address.port}`;
@@ -256,8 +264,10 @@ describe.concurrent("fetch() with streaming", () => {
},
});
const url = server.url.href;
async function getReader() {
return (await fetch(`http://${server.hostname}:${server.port}`, {})).body?.getReader();
return (await fetch(url, {})).body?.getReader();
}
gcTick(false);
const reader = await getReader();