mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 23:18:47 +00:00
Compare commits
4 Commits
claude/fix
...
claude/rea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ac4770a9e | ||
|
|
63d7670757 | ||
|
|
a06735ba70 | ||
|
|
ab776f8526 |
@@ -47,6 +47,7 @@ const {
|
||||
const { Agent, NODE_HTTP_WARNING } = require("node:_http_agent");
|
||||
const { IncomingMessage } = require("node:_http_incoming");
|
||||
const { OutgoingMessage } = require("node:_http_outgoing");
|
||||
const { freeParser, parsers, HTTPParser, isLenient, prepareError } = require("node:_http_common");
|
||||
|
||||
const globalReportError = globalThis.reportError;
|
||||
const setTimeout = globalThis.setTimeout;
|
||||
@@ -55,6 +56,29 @@ const fetch = Bun.fetch;
|
||||
|
||||
const { URL } = globalThis;
|
||||
|
||||
// HTTP parser constants for lenient parsing
|
||||
const kLenientNone = 0;
|
||||
const kLenientHeaders = 1 << 0;
|
||||
const kLenientChunkedLength = 1 << 1;
|
||||
const kLenientTransferEncoding = 1 << 2;
|
||||
const kLenientVersion = 1 << 3;
|
||||
const kLenientDataAfterClose = 1 << 4;
|
||||
const kLenientOptionalLFAfterCR = 1 << 5;
|
||||
const kLenientOptionalCRLFAfterChunk = 1 << 6;
|
||||
const kLenientOptionalCRBeforeLF = 1 << 7;
|
||||
const kLenientSpacesAfterChunkSize = 1 << 8;
|
||||
|
||||
const kLenientAll =
|
||||
kLenientHeaders |
|
||||
kLenientChunkedLength |
|
||||
kLenientTransferEncoding |
|
||||
kLenientVersion |
|
||||
kLenientDataAfterClose |
|
||||
kLenientOptionalLFAfterCR |
|
||||
kLenientOptionalCRLFAfterChunk |
|
||||
kLenientOptionalCRBeforeLF |
|
||||
kLenientSpacesAfterChunkSize;
|
||||
|
||||
// Primordials
|
||||
const ObjectAssign = Object.assign;
|
||||
const RegExpPrototypeExec = RegExp.prototype.exec;
|
||||
@@ -67,6 +91,157 @@ function emitErrorEventNT(self, err) {
|
||||
}
|
||||
}
|
||||
|
||||
function statusIsInformational(status) {
|
||||
return status >= 100 && status < 200;
|
||||
}
|
||||
|
||||
// Parser callback for handling incoming responses (from Node.js implementation)
|
||||
function parserOnIncomingClient(res, shouldKeepAlive) {
|
||||
const socket = this.socket;
|
||||
const req = socket._httpMessage;
|
||||
|
||||
if (req.res) {
|
||||
// We already have a response object, something is wrong
|
||||
socket.destroy();
|
||||
return 0;
|
||||
}
|
||||
req.res = res;
|
||||
res.req = req;
|
||||
|
||||
// Handle upgrade responses
|
||||
if (res.upgrade) {
|
||||
return 2; // Skip body and treat as Upgrade
|
||||
}
|
||||
|
||||
// Handle CONNECT method responses
|
||||
if (req.method === "CONNECT") {
|
||||
res.upgrade = true;
|
||||
return 2; // Skip body and treat as Upgrade
|
||||
}
|
||||
|
||||
// Handle informational responses (1xx status codes)
|
||||
if (statusIsInformational(res.statusCode)) {
|
||||
req.res = null; // Clear res so we can handle the final response
|
||||
if (res.statusCode === 100) {
|
||||
req.emit("continue");
|
||||
}
|
||||
req.emit("information", {
|
||||
statusCode: res.statusCode,
|
||||
statusMessage: res.statusMessage,
|
||||
httpVersion: res.httpVersion,
|
||||
httpVersionMajor: res.httpVersionMajor,
|
||||
httpVersionMinor: res.httpVersionMinor,
|
||||
headers: res.headers,
|
||||
rawHeaders: res.rawHeaders,
|
||||
});
|
||||
return 1; // Skip body but don't treat as Upgrade
|
||||
}
|
||||
|
||||
// Emit the response event
|
||||
process.nextTick(() => {
|
||||
if (!req.aborted && !req.emit("response", res)) {
|
||||
// If no listeners, dump the response
|
||||
res._dump();
|
||||
}
|
||||
});
|
||||
|
||||
return 0; // No special treatment
|
||||
}
|
||||
|
||||
// Socket event handlers (from Node.js implementation)
|
||||
function socketOnData(d) {
|
||||
const socket = this;
|
||||
const req = socket._httpMessage;
|
||||
const parser = socket.parser;
|
||||
|
||||
if (!parser) return;
|
||||
|
||||
const ret = parser.execute(d);
|
||||
if (ret instanceof Error) {
|
||||
prepareError(ret, parser, d);
|
||||
freeParser(parser, req, socket);
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle upgrades/CONNECT
|
||||
if (parser.incoming && parser.incoming.upgrade) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
function socketOnEnd() {
|
||||
const socket = this;
|
||||
const req = socket._httpMessage;
|
||||
const parser = socket.parser;
|
||||
|
||||
if (!req.res && !req._hadError) {
|
||||
req._hadError = true;
|
||||
emitErrorEventNT(req, new ConnResetException("socket hang up"));
|
||||
}
|
||||
if (parser) {
|
||||
parser.finish();
|
||||
freeParser(parser, req, socket);
|
||||
}
|
||||
}
|
||||
|
||||
function socketOnError(err) {
|
||||
const socket = this;
|
||||
const req = socket._httpMessage;
|
||||
|
||||
if (req) {
|
||||
req.emit("error", err);
|
||||
}
|
||||
}
|
||||
|
||||
function socketOnClose() {
|
||||
const socket = this;
|
||||
const req = socket._httpMessage;
|
||||
const parser = socket.parser;
|
||||
|
||||
if (parser) {
|
||||
freeParser(parser, req, socket);
|
||||
}
|
||||
|
||||
if (req) {
|
||||
if (!req.res || !req.res.complete) {
|
||||
req.emit("error", new ConnResetException("socket hang up"));
|
||||
}
|
||||
req.emit("close");
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize parser on socket connection (from Node.js implementation)
|
||||
function tickOnSocket(req, socket) {
|
||||
const parser = parsers.alloc();
|
||||
const lenient = req.insecureHTTPParser === undefined ? isLenient() : req.insecureHTTPParser;
|
||||
|
||||
// Initialize parser for response parsing
|
||||
parser.initialize(
|
||||
HTTPParser.RESPONSE,
|
||||
undefined, // asyncResource - not implemented
|
||||
req.maxHeaderSize || 0,
|
||||
lenient ? kLenientAll : kLenientNone,
|
||||
);
|
||||
|
||||
parser.socket = socket;
|
||||
parser.outgoing = req;
|
||||
req.parser = parser;
|
||||
socket.parser = parser;
|
||||
socket._httpMessage = req;
|
||||
|
||||
if (typeof req.maxHeadersCount === "number") {
|
||||
parser.maxHeaderPairs = req.maxHeadersCount << 1;
|
||||
}
|
||||
parser.joinDuplicateHeaders = req.joinDuplicateHeaders;
|
||||
parser.onIncoming = parserOnIncomingClient;
|
||||
|
||||
// Emit socket event
|
||||
process.nextTick(() => {
|
||||
req.emit("socket", socket);
|
||||
});
|
||||
}
|
||||
|
||||
function ClientRequest(input, options, cb) {
|
||||
if (!(this instanceof ClientRequest)) {
|
||||
return new (ClientRequest as any)(input, options, cb);
|
||||
@@ -94,7 +269,9 @@ function ClientRequest(input, options, cb) {
|
||||
const pushChunk = chunk => {
|
||||
this[kBodyChunks].push(chunk);
|
||||
if (writeCount > 1) {
|
||||
startFetch();
|
||||
connectToServer().catch(err => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
}
|
||||
resolveNextChunk?.(false);
|
||||
};
|
||||
@@ -188,7 +365,9 @@ function ClientRequest(input, options, cb) {
|
||||
this[kAbortController].signal.addEventListener("abort", onAbort, {
|
||||
once: true,
|
||||
});
|
||||
startFetch();
|
||||
connectToServer().catch(err => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -258,7 +437,7 @@ function ClientRequest(input, options, cb) {
|
||||
|
||||
let fetching = false;
|
||||
|
||||
const startFetch = (customBody?) => {
|
||||
const connectToServer = async () => {
|
||||
if (fetching) {
|
||||
return false;
|
||||
}
|
||||
@@ -266,274 +445,81 @@ function ClientRequest(input, options, cb) {
|
||||
fetching = true;
|
||||
|
||||
const method = this[kMethod];
|
||||
|
||||
let keepalive = true;
|
||||
const agentKeepalive = this[kAgent]?.keepalive;
|
||||
if (agentKeepalive !== undefined) {
|
||||
keepalive = agentKeepalive;
|
||||
}
|
||||
|
||||
const protocol = this[kProtocol];
|
||||
const path = this[kPath];
|
||||
let host = this[kHost];
|
||||
const host = this[kHost];
|
||||
const port = this[kPort];
|
||||
const socketPath = this[kSocketPath];
|
||||
|
||||
const getURL = host => {
|
||||
if (isIPv6(host)) {
|
||||
host = `[${host}]`;
|
||||
}
|
||||
|
||||
if (path.startsWith("http://") || path.startsWith("https://")) {
|
||||
return [path, `${protocol}//${host}${this[kUseDefaultPort] ? "" : ":" + this[kPort]}`];
|
||||
} else {
|
||||
let proxy: string | undefined;
|
||||
const url = `${protocol}//${host}${this[kUseDefaultPort] ? "" : ":" + this[kPort]}${path}`;
|
||||
// support agent proxy url/string for http/https
|
||||
try {
|
||||
// getters can throw
|
||||
const agentProxy = this[kAgent]?.proxy;
|
||||
// this should work for URL like objects and strings
|
||||
proxy = agentProxy?.href || agentProxy;
|
||||
} catch {}
|
||||
return [url, proxy];
|
||||
}
|
||||
};
|
||||
|
||||
const go = (url, proxy, softFail = false) => {
|
||||
const tls =
|
||||
protocol === "https:" && this[kTls] ? { ...this[kTls], serverName: this[kTls].servername } : undefined;
|
||||
|
||||
const fetchOptions: any = {
|
||||
method,
|
||||
headers: this.getHeaders(),
|
||||
redirect: "manual",
|
||||
signal: this[kAbortController]?.signal,
|
||||
// Timeouts are handled via this.setTimeout.
|
||||
timeout: false,
|
||||
// Disable auto gzip/deflate
|
||||
decompress: false,
|
||||
keepalive,
|
||||
// Create connection options for Bun.connect()
|
||||
let connectionOptions;
|
||||
if (socketPath) {
|
||||
connectionOptions = { unix: socketPath };
|
||||
} else {
|
||||
connectionOptions = {
|
||||
hostname: host,
|
||||
port: port,
|
||||
};
|
||||
let keepOpen = false;
|
||||
// no body and not finished
|
||||
const isDuplex = customBody === undefined && !this.finished;
|
||||
|
||||
if (isDuplex) {
|
||||
fetchOptions.duplex = "half";
|
||||
keepOpen = true;
|
||||
// Add TLS options for HTTPS
|
||||
if (protocol === "https:" && this[kTls]) {
|
||||
connectionOptions.tls = { ...this[kTls], serverName: this[kTls].servername };
|
||||
}
|
||||
|
||||
if (method !== "GET" && method !== "HEAD" && method !== "OPTIONS") {
|
||||
const self = this;
|
||||
if (customBody !== undefined) {
|
||||
fetchOptions.body = customBody;
|
||||
} else if (isDuplex) {
|
||||
fetchOptions.body = async function* () {
|
||||
while (self[kBodyChunks]?.length > 0) {
|
||||
yield self[kBodyChunks].shift();
|
||||
}
|
||||
|
||||
if (self[kBodyChunks]?.length === 0) {
|
||||
self.emit("drain");
|
||||
}
|
||||
|
||||
while (!self.finished) {
|
||||
yield await new Promise(resolve => {
|
||||
resolveNextChunk = end => {
|
||||
resolveNextChunk = undefined;
|
||||
if (end) {
|
||||
resolve(undefined);
|
||||
} else {
|
||||
resolve(self[kBodyChunks].shift());
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
if (self[kBodyChunks]?.length === 0) {
|
||||
self.emit("drain");
|
||||
}
|
||||
}
|
||||
|
||||
handleResponse?.();
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (tls) {
|
||||
fetchOptions.tls = tls;
|
||||
}
|
||||
|
||||
if (!!$debug) {
|
||||
fetchOptions.verbose = true;
|
||||
}
|
||||
|
||||
if (proxy) {
|
||||
fetchOptions.proxy = proxy;
|
||||
}
|
||||
|
||||
const socketPath = this[kSocketPath];
|
||||
|
||||
if (socketPath) {
|
||||
fetchOptions.unix = socketPath;
|
||||
}
|
||||
|
||||
//@ts-ignore
|
||||
this[kFetchRequest] = fetch(url, fetchOptions).then(response => {
|
||||
if (this.aborted) {
|
||||
maybeEmitClose();
|
||||
return;
|
||||
}
|
||||
|
||||
handleResponse = () => {
|
||||
this[kFetchRequest] = null;
|
||||
this[kClearTimeout]();
|
||||
handleResponse = undefined;
|
||||
|
||||
const prevIsHTTPS = getIsNextIncomingMessageHTTPS();
|
||||
setIsNextIncomingMessageHTTPS(response.url.startsWith("https:"));
|
||||
var res = (this.res = new IncomingMessage(response, {
|
||||
[typeSymbol]: NodeHTTPIncomingRequestType.FetchResponse,
|
||||
[reqSymbol]: this,
|
||||
}));
|
||||
setIsNextIncomingMessageHTTPS(prevIsHTTPS);
|
||||
res.req = this;
|
||||
let timer;
|
||||
res.setTimeout = (msecs, callback) => {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
timer = setTimeout(() => {
|
||||
if (res.complete) {
|
||||
return;
|
||||
}
|
||||
res.emit("timeout");
|
||||
callback?.();
|
||||
}, msecs);
|
||||
};
|
||||
process.nextTick(
|
||||
(self, res) => {
|
||||
// If the user did not listen for the 'response' event, then they
|
||||
// can't possibly read the data, so we ._dump() it into the void
|
||||
// so that the socket doesn't hang there in a paused state.
|
||||
const contentLength = res.headers["content-length"];
|
||||
if (contentLength && isNaN(Number(contentLength))) {
|
||||
emitErrorEventNT(self, $HPE_UNEXPECTED_CONTENT_LENGTH("Parse Error"));
|
||||
|
||||
res.complete = true;
|
||||
maybeEmitClose();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (self.aborted || !self.emit("response", res)) {
|
||||
res._dump();
|
||||
}
|
||||
} finally {
|
||||
maybeEmitClose();
|
||||
if (res.statusCode === 304) {
|
||||
res.complete = true;
|
||||
maybeEmitClose();
|
||||
return;
|
||||
}
|
||||
}
|
||||
},
|
||||
this,
|
||||
res,
|
||||
);
|
||||
};
|
||||
|
||||
if (!keepOpen) {
|
||||
handleResponse();
|
||||
}
|
||||
|
||||
onEnd();
|
||||
});
|
||||
|
||||
if (!softFail) {
|
||||
// Don't emit an error if we're iterating over multiple possible addresses and we haven't reached the end yet.
|
||||
// This is for the happy eyeballs implementation.
|
||||
this[kFetchRequest]
|
||||
.catch(err => {
|
||||
if (err.code === "ConnectionRefused") {
|
||||
err = new Error("ECONNREFUSED");
|
||||
err.code = "ECONNREFUSED";
|
||||
}
|
||||
// Node treats AbortError separately.
|
||||
// The "abort" listener on the abort controller should have called this
|
||||
if (isAbortError(err)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!!$debug) globalReportError(err);
|
||||
|
||||
try {
|
||||
this.emit("error", err);
|
||||
} catch (_err) {
|
||||
void _err;
|
||||
}
|
||||
})
|
||||
.finally(() => {
|
||||
if (!keepOpen) {
|
||||
fetching = false;
|
||||
this[kFetchRequest] = null;
|
||||
this[kClearTimeout]();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return this[kFetchRequest];
|
||||
};
|
||||
|
||||
if (isIP(host) || !options.lookup) {
|
||||
// Don't need to bother with lookup if it's already an IP address or no lookup function is provided.
|
||||
const [url, proxy] = getURL(host);
|
||||
go(url, proxy, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Use Bun.connect() to create a real TCP socket
|
||||
try {
|
||||
options.lookup(host, { all: true }, (err, results) => {
|
||||
if (err) {
|
||||
if (!!$debug) globalReportError(err);
|
||||
process.nextTick((self, err) => self.emit("error", err), this, err);
|
||||
return;
|
||||
}
|
||||
const socket = await Bun.connect({
|
||||
...connectionOptions,
|
||||
socket: {
|
||||
open: socket => {
|
||||
// Initialize the HTTP parser
|
||||
tickOnSocket(this, socket);
|
||||
|
||||
let candidates = results.sort((a, b) => b.family - a.family); // prefer IPv6
|
||||
// Send the HTTP request
|
||||
const requestLine = `${method} ${path} HTTP/1.1\r\n`;
|
||||
const headers = this.getHeaders();
|
||||
let headerString = "";
|
||||
|
||||
const fail = (message, name, code, syscall) => {
|
||||
const error = new Error(message);
|
||||
error.name = name;
|
||||
error.code = code;
|
||||
error.syscall = syscall;
|
||||
if (!!$debug) globalReportError(error);
|
||||
process.nextTick((self, err) => self.emit("error", err), this, error);
|
||||
};
|
||||
for (const [key, value] of Object.entries(headers)) {
|
||||
headerString += `${key}: ${value}\r\n`;
|
||||
}
|
||||
|
||||
if (candidates.length === 0) {
|
||||
fail("No records found", "DNSException", "ENOTFOUND", "getaddrinfo");
|
||||
return;
|
||||
}
|
||||
// Add Host header if not present
|
||||
if (!this.hasHeader("Host")) {
|
||||
headerString += `Host: ${host}${port !== 80 && port !== 443 ? `:${port}` : ""}\r\n`;
|
||||
}
|
||||
|
||||
if (!this.hasHeader("Host")) {
|
||||
this.setHeader("Host", `${host}:${port}`);
|
||||
}
|
||||
headerString += "\r\n";
|
||||
|
||||
// We want to try all possible addresses, beginning with the IPv6 ones, until one succeeds.
|
||||
// All addresses except for the last are allowed to "soft fail" -- instead of reporting
|
||||
// an error to the user, we'll just skip to the next address.
|
||||
// The last address is required to work, and if it fails we'll throw an error.
|
||||
const requestHeader = requestLine + headerString;
|
||||
socket.write(requestHeader);
|
||||
|
||||
const iterate = () => {
|
||||
if (candidates.length === 0) {
|
||||
// If we get to this point, it means that none of the addresses could be connected to.
|
||||
fail(`connect ECONNREFUSED ${host}:${port}`, "Error", "ECONNREFUSED", "connect");
|
||||
return;
|
||||
}
|
||||
// Send request body if present
|
||||
if (this[kBodyChunks] && this[kBodyChunks].length > 0) {
|
||||
for (const chunk of this[kBodyChunks]) {
|
||||
socket.write(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
const [url, proxy] = getURL(candidates.shift().address);
|
||||
go(url, proxy, candidates.length > 0).catch(iterate);
|
||||
};
|
||||
// Store socket reference
|
||||
this.socket = socket;
|
||||
socket._httpMessage = this;
|
||||
},
|
||||
|
||||
iterate();
|
||||
data: (socket, data) => {
|
||||
socketOnData.$call(socket, data);
|
||||
},
|
||||
end: socket => {
|
||||
socketOnEnd.$call(socket);
|
||||
},
|
||||
error: (socket, error) => {
|
||||
socketOnError.$call(socket, error);
|
||||
},
|
||||
close: socket => {
|
||||
socketOnClose.$call(socket);
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return true;
|
||||
@@ -544,27 +530,19 @@ function ClientRequest(input, options, cb) {
|
||||
}
|
||||
};
|
||||
|
||||
let onEnd = () => {};
|
||||
let handleResponse: (() => void) | undefined = () => {};
|
||||
|
||||
const send = () => {
|
||||
this.finished = true;
|
||||
this[kAbortController] ??= new AbortController();
|
||||
this[kAbortController].signal.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
var body = this[kBodyChunks] && this[kBodyChunks].length > 1 ? new Blob(this[kBodyChunks]) : this[kBodyChunks]?.[0];
|
||||
|
||||
try {
|
||||
startFetch(body);
|
||||
onEnd = () => {
|
||||
handleResponse?.();
|
||||
};
|
||||
} catch (err) {
|
||||
if (!!$debug) globalReportError(err);
|
||||
this.emit("error", err);
|
||||
} finally {
|
||||
process.nextTick(maybeEmitFinish.bind(this));
|
||||
}
|
||||
connectToServer()
|
||||
.catch(err => {
|
||||
if (!!$debug) globalReportError(err);
|
||||
this.emit("error", err);
|
||||
})
|
||||
.finally(() => {
|
||||
process.nextTick(maybeEmitFinish.bind(this));
|
||||
});
|
||||
};
|
||||
|
||||
// --- For faking the events in the right order ---
|
||||
@@ -805,6 +783,11 @@ function ClientRequest(input, options, cb) {
|
||||
this[kHost] = host;
|
||||
this[kProtocol] = protocol;
|
||||
|
||||
// Initialize socket-related properties
|
||||
this.socket = null;
|
||||
this.parser = null;
|
||||
this._hadError = false;
|
||||
|
||||
if (options.timeout !== undefined) {
|
||||
const timeout = getTimerDuration(options.timeout, "timeout");
|
||||
this.timeout = timeout;
|
||||
|
||||
Reference in New Issue
Block a user