import { Socket } from "bun"; import { describe, expect, it } from "bun:test"; import { createReadStream, readFileSync } from "fs"; import { gcTick, isWindows, tempDirWithFilesAnon } from "harness"; import http from "http"; import type { AddressInfo } from "net"; import path, { join } from "path"; import { pipeline } from "stream"; import zlib from "zlib"; const files = [ join(import.meta.dir, "fixture.html"), join(import.meta.dir, "fixture.png"), join(import.meta.dir, "fixture.png.gz"), ]; const fixtures = { "fixture": readFileSync(join(import.meta.dir, "fixture.html")), "fixture.png": readFileSync(join(import.meta.dir, "fixture.png")), "fixture.png.gz": readFileSync(join(import.meta.dir, "fixture.png.gz")), }; const invalid = Buffer.from([0xc0]); const bigText = Buffer.alloc(1 * 1024 * 1024, "a"); const smallText = Buffer.alloc(16 * "Hello".length, "Hello"); const empty = Buffer.alloc(0); describe.concurrent("fetch() with streaming", () => { [-1, 0, 20, 50, 100].forEach(timeout => { // This test is flaky. // Sometimes, we don't throw if signal.abort(). We need to fix that. it.todo(`should be able to fail properly when reading from readable stream with timeout ${timeout}`, async () => { using server = Bun.serve({ port: 0, async fetch(req) { return new Response( new ReadableStream({ async start(controller) { controller.enqueue("Hello, World!"); await Bun.sleep(1000); controller.enqueue("Hello, World!"); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); const server_url = `http://${server.hostname}:${server.port}`; try { const res = await fetch(server_url, { signal: timeout < 0 ? AbortSignal.abort() : AbortSignal.timeout(timeout), }); const reader = res.body?.getReader(); let results = []; while (true) { const { done, data } = await reader?.read(); if (data) results.push(data); if (done) break; } expect.unreachable(); } catch (err: any) { if (timeout < 0) { if (err.name !== "AbortError") throw err; expect(err.message).toBe("The operation was aborted."); } else { if (err.name !== "TimeoutError") throw err; expect(err.message).toBe("The operation timed out."); } } }); }); it(`should be locked after start buffering`, async () => { using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ async start(controller) { controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); const server_url = `http://${server.hostname}:${server.port}`; const res = await fetch(server_url, {}); const promise = res.text(); expect(async () => res.body?.getReader()).toThrow("ReadableStream is locked"); await promise; }); it(`should be locked after start buffering when calling getReader`, async () => { using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ async start(controller) { controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.enqueue("Hello, World!"); await Bun.sleep(10); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); const server_url = `http://${server.hostname}:${server.port}`; const res = await fetch(server_url); var body = res.body as ReadableStream; const promise = res.text(); expect(() => body.getReader()).toThrow("ReadableStream is locked"); await promise; }); it("can deflate with and without headers #4478", async () => { { using server = Bun.serve({ port: 0, fetch(req) { if (req.url.endsWith("/with_headers")) { const content = zlib.deflateSync(Buffer.from("Hello, World")); return new Response(content, { headers: { "Content-Type": "text/plain", "Content-Encoding": "deflate", "Access-Control-Allow-Origin": "*", }, }); } const content = zlib.deflateRawSync(Buffer.from("Hello, World")); return new Response(content, { headers: { "Content-Type": "text/plain", "Content-Encoding": "deflate", "Access-Control-Allow-Origin": "*", }, }); }, }); const url = `http://${server.hostname}:${server.port}/`; expect(await fetch(`${url}with_headers`).then(res => res.text())).toBe("Hello, World"); expect(await fetch(url).then(res => res.text())).toBe("Hello, World"); } }); for (let file of files) { it("stream can handle response.body + await response.something() #4500", async () => { let server: ReturnType | null = null; try { const errorHandler = (err: any) => expect(err).toBeUndefined(); server = http .createServer(function (req, res) { res.writeHead(200, { "Content-Type": "text/plain" }); pipeline(createReadStream(file), res, errorHandler); }) .listen(0); const address = server.address() as AddressInfo; let url; if (address.family == "IPv4") { url = `http://${address.address}:${address.port}`; } else { url = `http://[${address.address}]:${address.port}`; } async function getRequestLen(url: string) { const response = await fetch(url); const hasBody = response.body; if (hasBody) { const res = await response.blob(); return res.size; } return 0; } for (let i = 0; i < 10; i++) { let len = await getRequestLen(url); if (len <= 0) { throw new Error("Request length is 0"); } await Bun.sleep(50); } expect(true).toBe(true); } finally { server?.closeAllConnections(); } }); } it("stream still works after response get out of scope", async () => { { const content = "Hello, world!\n".repeat(5); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = Buffer.from(content, "utf8"); const size = data.byteLength / 5; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 5)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain" } }, ); }, }); async function getReader() { return (await fetch(`http://${server.hostname}:${server.port}`, {})).body?.getReader(); } gcTick(false); const reader = await getReader(); gcTick(false); var chunks = []; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { chunks.push(value); } if (done) { break; } } gcTick(false); expect(chunks.length).toBeGreaterThan(1); expect(Buffer.concat(chunks).toString("utf8")).toBe(content); } }); it("response inspected size should reflect stream state", async () => { { const content = "Bun!\n".repeat(4); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = Buffer.from(content, "utf8"); const size = data.byteLength / 5; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 4)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain" } }, ); }, }); function inspectBytes(response: Response) { const match = /Response \(([0-9]+ )bytes\)/g.exec( Bun.inspect(response, { depth: 0, }), ); if (!match) return 0; return parseInt(match[1]?.trim(), 10); } const res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); gcTick(false); let size = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { size += value.length; } expect(inspectBytes(res)).toBe(size); if (done) { break; } } gcTick(false); } }); it("can handle multiple simultaneos requests", async () => { { const content = "Hello, world!\n".repeat(5); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = Buffer.from(content, "utf8"); const size = data.byteLength / 5; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 5)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); const server_url = `http://${server.hostname}:${server.port}`; async function doRequest() { await Bun.sleep(10); const res = await fetch(server_url); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); let parts = 0; while (true) { const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); parts++; } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe(content); expect(parts).toBeGreaterThan(1); } await Promise.all([doRequest(), doRequest(), doRequest(), doRequest(), doRequest(), doRequest()]); } }); it(`can handle transforms`, async () => { { const content = "Hello, world!\n".repeat(5); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = Buffer.from(content, "utf8"); const size = data.byteLength / 5; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 5)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); const server_url = `http://${server.hostname}:${server.port}`; const res = await fetch(server_url); const transform = new TransformStream({ transform(chunk, controller) { controller.enqueue(Buffer.from(chunk).toString("utf8").toUpperCase()); }, }); const reader = res.body?.pipeThrough(transform).getReader(); let result = ""; while (true) { const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { result += value; } if (done) { break; } } gcTick(false); expect(result).toBe(content.toUpperCase()); } }); it(`can handle gz images`, async () => { { using server = Bun.serve({ port: 0, fetch(req) { const data = fixtures["fixture.png.gz"]; return new Response(data, { status: 200, headers: { "Content-Type": "text/plain", "Content-Encoding": "gzip", }, }); }, }); const server_url = `http://${server.hostname}:${server.port}`; const res = await fetch(server_url); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer).toEqual(fixtures["fixture.png"]); } }); it(`can proxy fetch with Bun.serve`, async () => { { const content = "a".repeat(64 * 1024); using server_original = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = Buffer.from(content, "utf8"); const size = data.byteLength / 5; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 5)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); using server = Bun.serve({ port: 0, async fetch(req) { const response = await fetch(`http://${server_original.hostname}:${server_original.port}`, {}); await Bun.sleep(10); return new Response(response.body, { status: 200, headers: { "Content-Type": "text/plain", }, }); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); let parts = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); parts++; } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe(content); expect(parts).toBeGreaterThanOrEqual(1); } }); const matrix = [ { name: "small", data: fixtures["fixture"] }, { name: "small text", data: smallText }, { name: "big text", data: bigText }, { name: "img", data: fixtures["fixture.png"] }, { name: "empty", data: empty }, ]; for (let i = 0; i < matrix.length; i++) { const fixture = matrix[i]; for (let j = 0; j < matrix.length; j++) { const fixtureb = matrix[j]; it(`can handle fixture ${fixture.name} x ${fixtureb.name}`, async () => { { //@ts-ignore const data = fixture.data; //@ts-ignore const data_b = fixtureb.data; const content = Buffer.concat([data, data_b]); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { controller.write(data); await controller.flush(); await Bun.sleep(100); controller.write(data_b); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", }, }, ); }, }); const server_url = `http://${server.hostname}:${server.port}`; const res = await fetch(server_url); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer).toEqual(content); } }); } } const types = [ { headers: {}, compression: "no" }, { headers: { "Content-Encoding": "gzip" }, compression: "gzip" }, { headers: { "Content-Encoding": "gzip" }, compression: "gzip-libdeflate" }, { headers: { "Content-Encoding": "deflate" }, compression: "deflate" }, { headers: { "Content-Encoding": "deflate" }, compression: "deflate-libdeflate" }, { headers: { "Content-Encoding": "deflate" }, compression: "deflate_with_headers" }, { headers: { "Content-Encoding": "br" }, compression: "br" }, { headers: { "Content-Encoding": "zstd" }, compression: "zstd" }, ] as const; function compress(compression, data: Uint8Array) { switch (compression) { case "gzip-libdeflate": case "gzip": return Bun.gzipSync(data, { library: compression === "gzip-libdeflate" ? "libdeflate" : "zlib", level: 1, // fastest compression }); case "deflate-libdeflate": case "deflate": return Bun.deflateSync(data, { library: compression === "deflate-libdeflate" ? "libdeflate" : "zlib", level: 1, // fastest compression }); case "deflate_with_headers": return zlib.deflateSync(data, { level: 1, // fastest compression }); case "br": return zlib.brotliCompressSync(data, { params: { [zlib.constants.BROTLI_PARAM_QUALITY]: 0, [zlib.constants.BROTLI_PARAM_MODE]: zlib.constants.BROTLI_MODE_GENERIC, [zlib.constants.BROTLI_PARAM_SIZE_HINT]: 0, }, }); case "zstd": return zlib.zstdCompressSync(data, {}); default: return data; } } for (const { headers, compression, skip } of types) { const test = skip ? it.skip : it; test(`with invalid utf8 with ${compression} compression`, async () => { const content = Buffer.concat([invalid, Buffer.from("Hello, world!\n".repeat(5), "utf8"), invalid]); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = compress(compression, content); const size = data.byteLength / 4; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 4)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", ...headers, }, }, ); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer).toEqual(content); }); test(`chunked response works (single chunk) with ${compression} compression`, async () => { const content = "Hello, world!\n".repeat(5); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = compress(compression, Buffer.from(content, "utf8")); controller.write(data); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", ...headers, }, }, ); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const result = await res.text(); gcTick(false); expect(result).toBe(content); res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); let parts = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); parts++; } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe(content); expect(parts).toBe(1); }); test(`chunked response works (multiple chunks) with ${compression} compression`, async () => { const content = "Hello, world!\n".repeat(5); using server = Bun.serve({ port: 0, fetch(req) { return new Response( new ReadableStream({ type: "direct", async pull(controller) { const data = compress(compression, Buffer.from(content, "utf8")); const size = data.byteLength / 5; controller.write(data.slice(0, size)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size, size * 2)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 2, size * 3)); await controller.flush(); await Bun.sleep(100); controller.write(data.slice(size * 3, size * 5)); await controller.flush(); controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", ...headers, }, }, ); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const result = await res.text(); gcTick(false); expect(result).toBe(content); res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); let parts = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } parts++; if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe(content); expect(parts).toBeGreaterThan(1); }); test(`Content-Length response works (single part) with ${compression} compression`, async () => { const content = "a".repeat(1024); using server = Bun.serve({ port: 0, fetch(req) { return new Response(compress(compression, Buffer.from(content)), { status: 200, headers: { "Content-Type": "text/plain", ...headers, }, }); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const result = await res.text(); gcTick(false); expect(result).toBe(content); res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); let parts = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); parts++; } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe(content); expect(parts).toBe(1); }); test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => { const rawBytes = Buffer.allocUnsafe(1024 * 1024); // Random data doesn't compress well. We need enough random data that // the compressed data is larger than 64 bytes. require("crypto").randomFillSync(rawBytes); const content = rawBytes.toString("hex"); const contentBuffer = Buffer.from(content); const data = compress(compression, contentBuffer); var onReceivedHeaders = Promise.withResolvers(); using server = Bun.serve({ port: 0, async fetch(req) { return new Response( new ReadableStream({ async pull(controller) { // Ensure we actually send it over the network in multiple chunks. let tenth = (data.length / 10) | 0; let remaining = data; while (remaining.length > 0) { const chunk = remaining.subarray(0, Math.min(tenth, remaining.length)); controller.enqueue(chunk); if (remaining === data) { await onReceivedHeaders.promise; } remaining = remaining.subarray(chunk.length); await Bun.sleep(1); } controller.close(); }, }), { status: 200, headers: { "Content-Type": "text/plain", ...headers, }, }, ); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); let onReceiveHeadersResolve = onReceivedHeaders.resolve; onReceivedHeaders = Promise.withResolvers(); onReceiveHeadersResolve(); gcTick(false); const result = await res.text(); gcTick(false); expect(result).toBe(content); res = await fetch(`http://${server.hostname}:${server.port}`, {}); onReceiveHeadersResolve = onReceivedHeaders.resolve; onReceivedHeaders = Promise.withResolvers(); onReceiveHeadersResolve(); gcTick(false); const reader = res.body?.getReader(); let chunks: Uint8Array[] = []; let currentRange = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { chunks.push(value); // Check the content is what is expected at this time. // We're avoiding calling .buffer since that changes the internal representation in JSC and we want to test the raw data. expect(contentBuffer.compare(value, undefined, undefined, currentRange, currentRange + value.length)).toEqual( 0, ); currentRange += value.length; } if (done) { break; } } gcTick(false); expect(Buffer.concat(chunks).toString("utf8")).toBe(content); expect(chunks.length).toBeGreaterThan(1); currentRange = 0; for (const chunk of chunks) { // Check that each chunk hasn't been modified. // We want to be 100% sure that there is no accidental memory re-use here. expect(contentBuffer.compare(chunk, undefined, undefined, currentRange, currentRange + chunk.length)).toEqual( 0, ); currentRange += chunk.length; } }); test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { const parts = 5; const content = "Hello".repeat(parts); using server = Bun.listen({ port: 0, hostname: "0.0.0.0", socket: { async open(socket) { var corked: any[] = []; var cork = true; async function write(chunk: any) { await new Promise((resolve, reject) => { if (cork) { corked.push(chunk); } if (!cork && corked.length) { socket.write(corked.join("")); corked.length = 0; socket.flush(); } if (!cork) { socket.write(chunk); socket.flush(); } resolve(); }); } const compressed = compress(compression, Buffer.from(content, "utf8")); await write("HTTP/1.1 200 OK\r\n"); await write("Content-Type: text/plain\r\n"); for (const [key, value] of Object.entries(headers)) { await write(key + ": " + value + "\r\n"); } await write("Content-Length: " + compressed.byteLength + "\r\n"); await write("\r\n"); const size = compressed.byteLength / 5; for (var i = 0; i < 5; i++) { cork = false; await write(compressed.slice(size * i, size * (i + 1))); } await write("Extra Data!"); await write("Extra Data!"); socket.flush(); }, drain(socket) {}, }, }); const res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe(content); }); test(`Missing data should timeout on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => { const parts = 5; const content = "Hello".repeat(parts); using server = Bun.listen({ port: 0, hostname: "0.0.0.0", socket: { async open(socket) { var corked: any[] = []; var cork = true; async function write(chunk: any) { await new Promise((resolve, reject) => { if (cork) { corked.push(chunk); } if (!cork && corked.length) { socket.write(corked.join("")); corked.length = 0; socket.flush(); } if (!cork) { socket.write(chunk); socket.flush(); } resolve(); }); } const compressed = compress(compression, Buffer.from(content, "utf8")); await write("HTTP/1.1 200 OK\r\n"); await write("Content-Type: text/plain\r\n"); for (const [key, value] of Object.entries(headers)) { await write(key + ": " + value + "\r\n"); } // 10 extra missing bytes that we will never sent await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); await write("\r\n"); const size = compressed.byteLength / 5; for (var i = 0; i < 5; i++) { cork = false; await write(compressed.slice(size * i, size * (i + 1))); } socket.flush(); }, drain(socket) {}, }, }); try { const res = await fetch(`http://${server.hostname}:${server.port}`, { signal: AbortSignal.timeout(1000), }); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe("unreachable"); } catch (err) { expect((err as Error).name).toBe("TimeoutError"); } }); if (compression !== "no") { test(`can handle corrupted ${compression} compression`, async () => { { const parts = 5; const content = "Hello".repeat(parts); using server = Bun.listen({ port: 0, hostname: "0.0.0.0", socket: { async open(socket) { var corked: any[] = []; var cork = true; async function write(chunk: any) { await new Promise((resolve, reject) => { if (cork) { corked.push(chunk); } if (!cork && corked.length) { socket.write(corked.join("")); corked.length = 0; socket.flush(); } if (!cork) { socket.write(chunk); socket.flush(); } resolve(); }); } const compressed = compress(compression, Buffer.from(content, "utf8")); await write("HTTP/1.1 200 OK\r\n"); await write("Content-Type: text/plain\r\n"); for (const [key, value] of Object.entries(headers)) { await write(key + ": " + value + "\r\n"); } // 10 extra missing bytes that we will never sent in this case we will wait to close await write("Content-Length: " + compressed.byteLength + "\r\n"); await write("\r\n"); const size = compressed.byteLength / 5; compressed[0] = 0; // corrupt data cork = false; for (var i = 0; i < 5; i++) { compressed[size * i] = 0; // corrupt data even more await write(compressed.slice(size * i, size * (i + 1))); } socket.flush(); }, drain(socket) {}, }, }); try { const res = await fetch(`http://${server.hostname}:${server.port}`, {}); gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { gcTick(false); const read_promise = reader?.read(); const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe("unreachable"); } catch (err) { if (compression === "br") { expect((err as Error).name).toBe("Error"); expect((err as Error).code).toBe("BrotliDecompressionError"); } else if (compression === "deflate-libdeflate") { expect((err as Error).name).toBe("Error"); expect((err as Error).code).toBe("ZlibError"); } else if (compression === "zstd") { expect((err as Error).name).toBe("Error"); expect((err as Error).code).toBe("ZstdDecompressionError"); } else { expect((err as Error).name).toBe("Error"); expect((err as Error).code).toBe("ZlibError"); } } } }); } test(`can handle socket close with ${compression} compression`, async () => { const parts = 5; const content = "Hello".repeat(parts); const { promise, resolve: resolveSocket } = Promise.withResolvers(); using server = Bun.listen({ port: 0, hostname: "0.0.0.0", socket: { async open(socket) { var corked: any[] = []; var cork = true; async function write(chunk: any) { await new Promise((resolve, reject) => { if (cork) { corked.push(chunk); } if (!cork && corked.length) { socket.write(corked.join("")); corked.length = 0; socket.flush(); } if (!cork) { socket.write(chunk); socket.flush(); } resolve(); }); } const compressed = compress(compression, Buffer.from(content, "utf8")); await write("HTTP/1.1 200 OK\r\n"); await write("Content-Type: text/plain\r\n"); for (const [key, value] of Object.entries(headers)) { await write(key + ": " + value + "\r\n"); } // 10 extra missing bytes that we will never sent in this case we will wait to close await write("Content-Length: " + compressed.byteLength + 10 + "\r\n"); await write("\r\n"); resolveSocket(socket); const size = compressed.byteLength / 5; for (var i = 0; i < 5; i++) { cork = false; await write(compressed.slice(size * i, size * (i + 1))); } socket.flush(); }, drain(socket) {}, }, }); let socket: Socket | null = null; try { const res = await fetch(`http://${server.hostname}:${server.port}`, {}); socket = await promise; gcTick(false); const reader = res.body?.getReader(); let buffer = Buffer.alloc(0); while (true) { gcTick(false); const read_promise = reader?.read(); socket?.end(); socket = null; const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult; if (value) { buffer = Buffer.concat([buffer, value]); } if (done) { break; } } gcTick(false); expect(buffer.toString("utf8")).toBe("unreachable"); } catch (err) { expect((err as Error).name).toBe("Error"); expect((err as Error).code).toBe("ECONNRESET"); } }); } it.skipIf( // The C program is POSIX only isWindows, )("should drain response body from HTTP thread when server sends chunk then stops (chunked encoding)", async () => { // This test reproduces a bug where the HTTP client wasn't asking the HTTP thread // to drain pending response body bytes. If the server sent headers + first chunk, // then stopped sending data (but kept connection open), the read would hang forever. // // We use a C server with blocking sockets instead of Bun.listen because Bun's sockets // are non-blocking and event-driven, which makes it difficult to reliably reproduce // the exact timing conditions needed to trigger this bug. The C server uses blocking // write() calls that ensure data is buffered in the kernel before the server stops // sending, forcing the HTTP client to drain the response body from the HTTP thread. const dir = tempDirWithFilesAnon({ "a": "// a" }); { await using proc = Bun.spawn({ cmd: [ "cc", "-Wno-error", "-w", path.join(import.meta.dirname, "http-chunked-server.c"), "-o", "http-chunked-server", ], cwd: dir, stdout: "inherit", stderr: "inherit", stdin: "ignore", }); expect(await proc.exited).toBe(0); } await using server = Bun.spawn({ cmd: [path.join(dir, "http-chunked-server")], stdout: "pipe", stderr: "inherit", stdin: "ignore", }); const url = new URL("http://127.0.0.1:" + (await server.stdout.text()).trim()); const response = await fetch(url.toString(), {}); const reader = response.body!.getReader(); // Read the data - this should not hang const result = (await reader.read()) as ReadableStreamDefaultReadResult; // Verify we got the data without hanging expect(result.done).toBe(false); expect(result.value).toBeDefined(); expect(new TextDecoder().decode(result.value!)).toBe("hello\n"); server.kill("SIGTERM"); }); });