Files
bun.sh/test/js/node/http/node-http-connect.test.ts
Ciro Spaciari 85271f9dd9 fix(node:http) allow CONNECT in node http/https servers (#22756)
### What does this PR do?
Fixes https://github.com/oven-sh/bun/issues/22755
Fixes https://github.com/oven-sh/bun/issues/19790
Fixes https://github.com/oven-sh/bun/issues/16372
### How did you verify your code works?

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-09-23 16:46:59 -07:00

465 lines
14 KiB
TypeScript

import { describe, expect, test } from "bun:test";
import { bunEnv, bunExe, nodeExe } from "harness";
import http from "http";
import { once } from "node:events";
import type { AddressInfo } from "node:net";
import net from "node:net";
import { join } from "node:path";
function connectClient(proxyAddress: AddressInfo, targetAddress: AddressInfo, add_http_prefix: boolean) {
const client = net.connect({ port: proxyAddress.port, host: proxyAddress.address }, () => {
client.write(
`CONNECT ${add_http_prefix ? "http://" : ""}${targetAddress.address}:${targetAddress.port} HTTP/1.1\r\nHost: ${targetAddress.address}:${targetAddress.port}\r\nProxy-Authorization: Basic dXNlcjpwYXNzd29yZA==\r\n\r\n`,
);
});
const received: string[] = [];
const { promise, resolve, reject } = Promise.withResolvers<string>();
client.on("data", data => {
if (data.toString().includes("200 Connection established")) {
client.write("GET / HTTP/1.1\r\nHost: www.example.com:80\r\nConnection: close\r\n\r\n");
}
received.push(data.toString());
});
client.on("error", reject);
client.on("end", () => {
resolve(received.join(""));
});
return promise;
}
const BIG_DATA = Buffer.alloc(1024 * 1024 * 64, "bun").toString();
describe("HTTP server CONNECT", () => {
test("should handle backpressure", async () => {
const responseHeader = "HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n";
await using proxyServer = http.createServer((req, res) => {
res.end("Hello World from proxy server");
});
await using targetServer = net.createServer(socket => {
socket.write(responseHeader, () => {
socket.write(BIG_DATA, () => {
//TODO: is this a net bug? on windows the connection is closed before everything is sended
Bun.sleep(100).then(() => {
socket.end();
});
});
});
});
let proxyHeaders = {};
proxyServer.on("connect", (req, socket, head) => {
proxyHeaders = req.headers;
const [host, port] = req.url?.split(":") ?? [];
const serverSocket = net.connect(parseInt(port), host, async () => {
socket.write(`HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n`);
serverSocket.pipe(socket);
socket.pipe(serverSocket);
});
serverSocket.on("error", err => {
socket.end("HTTP/1.1 502 Bad Gateway\r\n\r\n");
});
socket.on("error", err => {
serverSocket.destroy();
});
socket.on("end", () => serverSocket.end());
serverSocket.on("end", () => socket.end());
});
await once(proxyServer.listen(0, "127.0.0.1"), "listening");
const proxyAddress = proxyServer.address() as AddressInfo;
await once(targetServer.listen(0, "127.0.0.1"), "listening");
const targetAddress = targetServer.address() as AddressInfo;
{
const response = await connectClient(proxyAddress, targetAddress, false);
expect(proxyHeaders["proxy-authorization"]).toBe("Basic dXNlcjpwYXNzd29yZA==");
expect(response).toContain("HTTP/1.1 200 OK");
expect(response.length).toBeGreaterThan(responseHeader.length + BIG_DATA.length);
expect(response).toContain(BIG_DATA);
}
});
test("should handle data, drain, end and close events", async () => {
await using proxyServer = http.createServer((req, res) => {
res.end("Hello World from proxy server");
});
await once(proxyServer.listen(0, "127.0.0.1"), "listening");
const proxyAddress = proxyServer.address() as AddressInfo;
let data_received: string[] = [];
let client_data_received: string[] = [];
let proxy_drain_received = false;
let proxy_end_received = false;
const { promise, resolve, reject } = Promise.withResolvers<string>();
const { promise: clientPromise, resolve: clientResolve, reject: clientReject } = Promise.withResolvers<string>();
const clientSocket = net.connect(proxyAddress.port, proxyAddress.address, () => {
clientSocket.on("error", clientReject);
clientSocket.on("data", chunk => {
client_data_received.push(chunk?.toString());
});
clientSocket.on("end", () => {
clientSocket.end();
clientResolve(client_data_received.join(""));
});
clientSocket.write("CONNECT localhost:80 HTTP/1.1\r\nHost: localhost:80\r\nConnection: close\r\n\r\n");
});
proxyServer.on("connect", (req, socket, head) => {
expect(head).toBeInstanceOf(Buffer);
socket.on("data", chunk => {
data_received.push(chunk?.toString());
});
socket.on("end", () => {
proxy_end_received = true;
});
socket.on("close", () => {
resolve(data_received.join(""));
});
socket.on("drain", () => {
proxy_drain_received = true;
socket.end();
});
socket.on("error", reject);
proxy_drain_received = false;
// write until backpressure
while (socket.write(BIG_DATA)) {}
clientSocket.write("Hello World");
});
expect(await promise).toContain("Hello World");
expect(await clientPromise).toContain(BIG_DATA);
expect(proxy_drain_received).toBe(true);
expect(proxy_end_received).toBe(true);
});
test("should handle CONNECT with invalid target", async () => {
await using proxyServer = http.createServer((req, res) => {
res.end("Hello World from proxy server");
});
proxyServer.on("connect", (req, socket, head) => {
const [host, port] = req.url?.split(":") ?? [];
const serverSocket = net.connect(parseInt(port) || 80, host, () => {
socket.write(`HTTP/1.1 200 Connection established\r\n\r\n`);
serverSocket.pipe(socket);
socket.pipe(serverSocket);
});
serverSocket.on("error", err => {
socket.write("HTTP/1.1 502 Bad Gateway\r\n\r\n");
socket.end();
});
socket.on("error", () => serverSocket.destroy());
});
await once(proxyServer.listen(0, "127.0.0.1"), "listening");
const proxyAddress = proxyServer.address() as AddressInfo;
const client = net.connect(proxyAddress.port, proxyAddress.address, () => {
client.write("CONNECT invalid.host.that.does.not.exist:9999 HTTP/1.1\r\nHost: invalid.host:9999\r\n\r\n");
});
const { promise, resolve } = Promise.withResolvers<string>();
const received: string[] = [];
client.on("data", data => {
received.push(data.toString());
});
client.on("end", () => {
resolve(received.join(""));
});
const response = await promise;
expect(response).toContain("502 Bad Gateway");
});
// TODO: timeout is not supported in bun socket yet
test.todo("should handle socket timeout", async () => {
await using proxyServer = http.createServer();
let timeoutFired = false;
proxyServer.on("connect", (req, socket, head) => {
socket.setTimeout(100);
socket.on("timeout", () => {
timeoutFired = true;
socket.write("HTTP/1.1 408 Request Timeout\r\n\r\n");
socket.end();
});
// Don't send any response immediately
});
await once(proxyServer.listen(0, "127.0.0.1"), "listening");
const proxyAddress = proxyServer.address() as AddressInfo;
const client = net.connect(proxyAddress.port, proxyAddress.address, () => {
client.write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com\r\n\r\n");
});
const { promise, resolve } = Promise.withResolvers<string>();
const received: string[] = [];
client.on("data", data => {
received.push(data.toString());
});
client.on("end", () => {
resolve(received.join(""));
});
const response = await promise;
expect(timeoutFired).toBe(true);
expect(response).toContain("408 Request Timeout");
});
//TODO pause and resume only not supported in bun socket yet
test.todo("should handle socket pause and resume", async () => {
await using proxyServer = http.createServer();
let pauseCount = 0;
let resumeCount = 0;
proxyServer.on("connect", (req, socket, head) => {
socket.write("HTTP/1.1 200 Connection established\r\n\r\n");
// Simulate backpressure scenario
const interval = setInterval(() => {
const canWrite = socket.write("X".repeat(1024));
if (!canWrite) {
pauseCount++;
socket.pause();
setTimeout(() => {
resumeCount++;
socket.resume();
}, 50);
}
}, 10);
socket.on("end", () => {
clearInterval(interval);
socket.end();
});
});
await once(proxyServer.listen(0, "127.0.0.1"), "listening");
const proxyAddress = proxyServer.address() as AddressInfo;
const client = net.connect(proxyAddress.port, proxyAddress.address, () => {
client.write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com\r\n\r\n");
setTimeout(() => client.end(), 200);
});
const { promise, resolve } = Promise.withResolvers<number>();
let bytesReceived = 0;
client.on("data", data => {
bytesReceived += data.length;
});
client.on("end", () => {
resolve(bytesReceived);
});
const totalBytes = await promise;
expect(totalBytes).toBeGreaterThan(0);
expect(pauseCount).toBeGreaterThan(0);
expect(resumeCount).toBeGreaterThan(0);
});
test("should handle malformed CONNECT requests", async () => {
await using proxyServer = http.createServer();
proxyServer.on("connect", (req, socket, head) => {
// This shouldn't be reached for malformed requests
socket.write("HTTP/1.1 200 Connection established\r\n\r\n");
socket.end();
});
await once(proxyServer.listen(0, "127.0.0.1"), "listening");
const proxyAddress = proxyServer.address() as AddressInfo;
// Test various malformed requests
const malformedRequests = [
"CONNECT\r\n\r\n", // Missing target
"CONNECT example.com HTTP/1.1\r\n\r\n", // Missing port
"CONNECT :80 HTTP/1.1\r\n\r\n", // Missing host
"CONNEC example.com:80 HTTP/1.1\r\n\r\n", // Typo in method
"CONNECT example.com:80\r\n\r\n", // Missing HTTP version
];
for (const request of malformedRequests) {
const client = net.connect(proxyAddress.port, proxyAddress.address, () => {
client.write(request);
});
const { promise, resolve } = Promise.withResolvers<string>();
const received: string[] = [];
client.on("data", data => {
received.push(data.toString());
});
client.on("end", () => {
resolve(received.join(""));
});
client.on("error", () => {
resolve("CONNECTION_ERROR");
});
setTimeout(() => {
client.end();
resolve(received.join("") || "TIMEOUT");
}, 100);
const response = await promise;
// Should either get an error response or timeout/connection error
expect(response).not.toContain("200 Connection established");
}
});
});
/**
* Test variations using normal HTTP requests and res.socket
* These tests should run in both Node.js and Bun
*/
describe("HTTP server socket access via normal requests", () => {
//TODO: right now http server socket dont emit error event
test.todo("should handle socket errors during normal requests", async () => {
let errorHandled = false;
await using server = http.createServer((req, res) => {
const socket = res.socket!;
socket.on("error", err => {
errorHandled = true;
});
// Simulate an error condition
setTimeout(() => {
socket.destroy(new Error("Simulated error"));
}, 50);
});
await once(server.listen(0, "127.0.0.1"), "listening");
const serverAddress = server.address() as AddressInfo;
const client = net.connect(serverAddress.port, serverAddress.address, () => {
client.write("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
});
const { promise, resolve } = Promise.withResolvers<boolean>();
client.on("error", () => {
resolve(true);
});
client.on("close", () => {
resolve(false);
});
await promise;
expect(errorHandled).toBe(true);
});
test.todo("should handle socket pause/resume during request", async () => {
const largeData = Buffer.alloc(1024 * 1024, "x").toString();
let pauseCount = 0;
let resumeCount = 0;
await using server = http.createServer((req, res) => {
const socket = res.socket!;
// Monitor socket state
const originalPause = socket.pause.bind(socket);
const originalResume = socket.resume.bind(socket);
socket.pause = function () {
pauseCount++;
return originalPause();
};
socket.resume = function () {
resumeCount++;
return originalResume();
};
// Send large response to trigger backpressure
res.writeHead(200, { "Content-Type": "text/plain" });
const sendData = () => {
let ok = true;
while (ok) {
ok = res.write(largeData);
if (!ok) {
// Wait for drain event
res.once("drain", sendData);
break;
}
}
};
sendData();
setTimeout(() => res.end(), 100);
});
await once(server.listen(0, "127.0.0.1"), "listening");
const serverAddress = server.address() as AddressInfo;
const client = net.connect(serverAddress.port, serverAddress.address, () => {
client.write("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
});
const { promise, resolve } = Promise.withResolvers<number>();
let bytesReceived = 0;
// Slow reader to trigger backpressure
client.on("data", chunk => {
bytesReceived += chunk.length;
client.pause();
setTimeout(() => client.resume(), 10);
});
client.on("end", () => {
resolve(bytesReceived);
});
const total = await promise;
expect(total).toBeGreaterThan(0);
});
});
describe("Should be compatible with node.js", () => {
test("tests should run on node.js", async () => {
const process = Bun.spawn({
cmd: [nodeExe(), "--test", join(import.meta.dir, "node-http-connect.node.mts")],
stdout: "inherit",
stderr: "inherit",
stdin: "ignore",
env: bunEnv,
});
expect(await process.exited).toBe(0);
});
test("tests should run on bun", async () => {
const process = Bun.spawn({
cmd: [bunExe(), "test", join(import.meta.dir, "node-http-connect.node.mts")],
stdout: "inherit",
stderr: "inherit",
stdin: "ignore",
env: bunEnv,
});
expect(await process.exited).toBe(0);
});
});